1use std::sync::Arc;
19
20use crate::scalar_subquery::ScalarSubqueryExpr;
21use crate::{HigherOrderFunctionExpr, ScalarFunctionExpr};
22use crate::{
23 PhysicalExpr,
24 expressions::{self, Column, Literal, binary, like, similar_to},
25};
26
27use arrow::datatypes::Schema;
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::datatype::FieldExt;
30use datafusion_common::metadata::{FieldMetadata, format_type_and_metadata};
31use datafusion_common::{
32 DFSchema, Result, ScalarValue, TableReference, ToDFSchema, exec_err,
33 internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
34};
35use datafusion_expr::execution_props::ExecutionProps;
36use datafusion_expr::expr::{
37 Alias, Cast, HigherOrderFunction, InList, Lambda, LambdaVariable, Placeholder,
38 ScalarFunction,
39};
40use datafusion_expr::var_provider::VarType;
41use datafusion_expr::var_provider::is_system_variables;
42use datafusion_expr::{
43 Between, BinaryExpr, Expr, ExprSchemable, Like, Operator, TryCast, binary_expr, lit,
44};
45
46#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
115pub fn create_physical_expr(
116 e: &Expr,
117 input_dfschema: &DFSchema,
118 execution_props: &ExecutionProps,
119) -> Result<Arc<dyn PhysicalExpr>> {
120 let input_schema = input_dfschema.as_arrow();
121
122 match e {
123 Expr::Alias(Alias { expr, metadata, .. }) => {
124 if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
125 let new_metadata = FieldMetadata::merge_options(
126 prior_metadata.as_ref(),
127 metadata.as_ref(),
128 );
129 Ok(Arc::new(Literal::new_with_metadata(
130 v.clone(),
131 new_metadata,
132 )))
133 } else {
134 Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
135 }
136 }
137 Expr::Column(c) => {
138 let idx = input_dfschema.index_of_column(c)?;
139 Ok(Arc::new(Column::new(&c.name, idx)))
140 }
141 Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
142 value.clone(),
143 metadata.clone(),
144 ))),
145 Expr::ScalarVariable(_, variable_names) => {
146 if is_system_variables(variable_names) {
147 match execution_props.get_var_provider(VarType::System) {
148 Some(provider) => {
149 let scalar_value = provider.get_value(variable_names.clone())?;
150 Ok(Arc::new(Literal::new(scalar_value)))
151 }
152 _ => plan_err!("No system variable provider found"),
153 }
154 } else {
155 match execution_props.get_var_provider(VarType::UserDefined) {
156 Some(provider) => {
157 let scalar_value = provider.get_value(variable_names.clone())?;
158 Ok(Arc::new(Literal::new(scalar_value)))
159 }
160 _ => plan_err!("No user defined variable provider found"),
161 }
162 }
163 }
164 Expr::IsTrue(expr) => {
165 let binary_op = binary_expr(
166 expr.as_ref().clone(),
167 Operator::IsNotDistinctFrom,
168 lit(true),
169 );
170 create_physical_expr(&binary_op, input_dfschema, execution_props)
171 }
172 Expr::IsNotTrue(expr) => {
173 let binary_op =
174 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
175 create_physical_expr(&binary_op, input_dfschema, execution_props)
176 }
177 Expr::IsFalse(expr) => {
178 let binary_op = binary_expr(
179 expr.as_ref().clone(),
180 Operator::IsNotDistinctFrom,
181 lit(false),
182 );
183 create_physical_expr(&binary_op, input_dfschema, execution_props)
184 }
185 Expr::IsNotFalse(expr) => {
186 let binary_op =
187 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
188 create_physical_expr(&binary_op, input_dfschema, execution_props)
189 }
190 Expr::IsUnknown(expr) => {
191 let binary_op = binary_expr(
192 expr.as_ref().clone(),
193 Operator::IsNotDistinctFrom,
194 Expr::Literal(ScalarValue::Boolean(None), None),
195 );
196 create_physical_expr(&binary_op, input_dfschema, execution_props)
197 }
198 Expr::IsNotUnknown(expr) => {
199 let binary_op = binary_expr(
200 expr.as_ref().clone(),
201 Operator::IsDistinctFrom,
202 Expr::Literal(ScalarValue::Boolean(None), None),
203 );
204 create_physical_expr(&binary_op, input_dfschema, execution_props)
205 }
206 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
207 let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
209 let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
210 binary(lhs, *op, rhs, input_schema)
218 }
219 Expr::Like(Like {
220 negated,
221 expr,
222 pattern,
223 escape_char,
224 case_insensitive,
225 }) => {
226 if escape_char.unwrap_or('\\') != '\\' {
228 return exec_err!(
229 "LIKE does not support escape_char other than the backslash (\\)"
230 );
231 }
232 let physical_expr =
233 create_physical_expr(expr, input_dfschema, execution_props)?;
234 let physical_pattern =
235 create_physical_expr(pattern, input_dfschema, execution_props)?;
236 like(
237 *negated,
238 *case_insensitive,
239 physical_expr,
240 physical_pattern,
241 input_schema,
242 )
243 }
244 Expr::SimilarTo(Like {
245 negated,
246 expr,
247 pattern,
248 escape_char,
249 case_insensitive,
250 }) => {
251 if escape_char.is_some() {
252 return exec_err!("SIMILAR TO does not support escape_char yet");
253 }
254 let physical_expr =
255 create_physical_expr(expr, input_dfschema, execution_props)?;
256 let physical_pattern =
257 create_physical_expr(pattern, input_dfschema, execution_props)?;
258 similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
259 }
260 Expr::Case(case) => {
261 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
262 Some(create_physical_expr(
263 e.as_ref(),
264 input_dfschema,
265 execution_props,
266 )?)
267 } else {
268 None
269 };
270 let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
271 .when_then_expr
272 .iter()
273 .map(|(w, t)| (w.as_ref(), t.as_ref()))
274 .unzip();
275 let when_expr =
276 create_physical_exprs(when_expr, input_dfschema, execution_props)?;
277 let then_expr =
278 create_physical_exprs(then_expr, input_dfschema, execution_props)?;
279 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
280 when_expr
281 .iter()
282 .zip(then_expr.iter())
283 .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
284 .collect();
285 let else_expr: Option<Arc<dyn PhysicalExpr>> =
286 if let Some(e) = &case.else_expr {
287 Some(create_physical_expr(
288 e.as_ref(),
289 input_dfschema,
290 execution_props,
291 )?)
292 } else {
293 None
294 };
295 Ok(expressions::case(expr, when_then_expr, else_expr)?)
296 }
297 Expr::Cast(Cast { expr, field }) => expressions::cast_with_target_field(
298 create_physical_expr(expr, input_dfschema, execution_props)?,
299 input_schema,
300 Arc::clone(field),
301 None,
302 ),
303 Expr::TryCast(TryCast { expr, field }) => {
304 if !field.metadata().is_empty() {
305 let (_, src_field) = expr.to_field(input_dfschema)?;
306 return plan_err!(
307 "TryCast from {} to {} is not supported",
308 format_type_and_metadata(
309 src_field.data_type(),
310 Some(src_field.metadata()),
311 ),
312 format_type_and_metadata(field.data_type(), Some(field.metadata()))
313 );
314 }
315
316 expressions::try_cast(
317 create_physical_expr(expr, input_dfschema, execution_props)?,
318 input_schema,
319 field.data_type().clone(),
320 )
321 }
322 Expr::Not(expr) => {
323 expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
324 }
325 Expr::Negative(expr) => expressions::negative(
326 create_physical_expr(expr, input_dfschema, execution_props)?,
327 input_schema,
328 ),
329 Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
330 expr,
331 input_dfschema,
332 execution_props,
333 )?),
334 Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
335 expr,
336 input_dfschema,
337 execution_props,
338 )?),
339 Expr::ScalarFunction(ScalarFunction { func, args }) => {
340 let physical_args =
341 create_physical_exprs(args, input_dfschema, execution_props)?;
342 let config_options = match execution_props.config_options.as_ref() {
343 Some(config_options) => Arc::clone(config_options),
344 None => Arc::new(ConfigOptions::default()),
345 };
346
347 Ok(Arc::new(ScalarFunctionExpr::try_new(
348 Arc::clone(func),
349 physical_args,
350 input_schema,
351 config_options,
352 )?))
353 }
354 Expr::Between(Between {
355 expr,
356 negated,
357 low,
358 high,
359 }) => {
360 let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
361 let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
362 let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
363
364 let binary_expr = binary(
366 binary(
367 Arc::clone(&value_expr),
368 Operator::GtEq,
369 low_expr,
370 input_schema,
371 )?,
372 Operator::And,
373 binary(
374 Arc::clone(&value_expr),
375 Operator::LtEq,
376 high_expr,
377 input_schema,
378 )?,
379 input_schema,
380 );
381
382 if *negated {
383 expressions::not(binary_expr?)
384 } else {
385 binary_expr
386 }
387 }
388 Expr::InList(InList {
389 expr,
390 list,
391 negated,
392 }) => match expr.as_ref() {
393 Expr::Literal(ScalarValue::Utf8(None), _) => {
394 Ok(expressions::lit(ScalarValue::Boolean(None)))
395 }
396 _ => {
397 let value_expr =
398 create_physical_expr(expr, input_dfschema, execution_props)?;
399
400 let list_exprs =
401 create_physical_exprs(list, input_dfschema, execution_props)?;
402 expressions::in_list(value_expr, list_exprs, negated, input_schema)
403 }
404 },
405 Expr::ScalarSubquery(sq) => {
406 match execution_props.subquery_indexes.get(sq) {
407 Some(&index) => {
408 let schema = sq.subquery.schema();
409 if schema.fields().len() != 1 {
410 return plan_err!(
411 "Scalar subquery must return exactly one column, got {}",
412 schema.fields().len()
413 );
414 }
415 let dt = schema.field(0).data_type().clone();
416 let nullable = schema.field(0).is_nullable();
417 Ok(Arc::new(ScalarSubqueryExpr::new(
418 dt,
419 nullable,
420 index,
421 execution_props.subquery_results.clone(),
422 )))
423 }
424 None => {
425 not_impl_err!(
429 "Physical plan does not support logical expression {e:?}"
430 )
431 }
432 }
433 }
434 Expr::Placeholder(Placeholder { id, .. }) => {
435 exec_err!("Placeholder '{id}' was not provided a value for execution.")
436 }
437 Expr::HigherOrderFunction(invocation @ HigherOrderFunction { func, args }) => {
438 let num_lambdas = args
439 .iter()
440 .filter(|arg| matches!(arg, Expr::Lambda(_)))
441 .count();
442
443 let mut lambda_parameters =
444 invocation.lambda_parameters(input_dfschema)?.into_iter();
445
446 if num_lambdas > lambda_parameters.len() {
447 return plan_err!(
448 "{} lambda_parameters returned only {} values for {num_lambdas} lambdas",
449 func.name(),
450 lambda_parameters.len()
451 );
452 }
453
454 let lambda_qualifier = 1 + input_dfschema
455 .iter()
456 .filter_map(|(qualifier, _field)| {
457 qualifier.and_then(|tbl| {
458 tbl.table().strip_prefix("lambda_")?.parse::<usize>().ok()
459 })
460 })
461 .max()
462 .unwrap_or_default();
463
464 let qualifier = TableReference::bare(format!("lambda_{lambda_qualifier}"));
465
466 let physical_args = args
467 .iter()
468 .map(|arg| match arg {
469 Expr::Lambda(lambda) => {
470 let lambda_parameters = lambda_parameters
471 .next()
472 .ok_or_else(|| {
473 internal_datafusion_err!(
474 "lambda_parameters len should have been checked above"
475 )
476 })?
477 .into_iter()
478 .zip(&lambda.params)
479 .map(|(field, name)| {
480 (Some(qualifier.clone()), field.renamed(name.as_str()))
481 });
482
483 let new_fields = input_dfschema
484 .iter()
485 .map(|(tbl, field)| (tbl.cloned(), Arc::clone(field)))
486 .chain(lambda_parameters)
487 .collect();
488
489 let lambda_schema = DFSchema::new_with_metadata(
490 new_fields,
491 input_dfschema.metadata().clone(),
492 )?;
493
494 let execution_props = execution_props
495 .clone()
496 .with_qualified_lambda_variables(&qualifier, &lambda.params);
497
498 create_physical_expr(arg, &lambda_schema, &execution_props)
499 }
500 _ => create_physical_expr(arg, input_dfschema, execution_props),
501 })
502 .collect::<Result<_>>()?;
503
504 let config_options = match execution_props.config_options.as_ref() {
505 Some(config_options) => Arc::clone(config_options),
506 None => Arc::new(ConfigOptions::default()),
507 };
508
509 Ok(Arc::new(HigherOrderFunctionExpr::try_new_with_schema(
510 Arc::clone(func),
511 physical_args,
512 input_schema,
513 config_options,
514 )?))
515 }
516 Expr::Lambda(Lambda { params, body }) => expressions::lambda(
517 params,
518 create_physical_expr(body, input_dfschema, execution_props)?,
519 ),
520 Expr::LambdaVariable(LambdaVariable {
521 name,
522 field,
523 spans: _,
524 }) => {
525 let field = field.as_ref().ok_or_else(|| {
526 plan_datafusion_err!("unresolved LambdaVariable {name}")
527 })?;
528
529 let qualifier = execution_props
530 .lambda_variable_qualifier
531 .get(name)
532 .ok_or_else(|| {
533 plan_datafusion_err!("qualifier for lambda variable {name} not found")
534 })?;
535
536 let index = input_dfschema
537 .index_of_column_by_name(Some(qualifier), name)
538 .ok_or_else(|| {
539 plan_datafusion_err!(
540 "lambda variable {qualifier}.{name} not found in planning schema"
541 )
542 })?;
543
544 let schema_field = input_dfschema.field(index);
545
546 let renamed_field = Arc::clone(field).renamed(name);
554
555 if &renamed_field != schema_field {
556 return plan_err!(
557 "LambdaVariable field and schema field mismatch {} != {}",
558 renamed_field,
559 schema_field
560 );
561 }
562
563 Ok(Arc::new(expressions::LambdaVariable::new(
564 index,
565 Arc::clone(schema_field),
566 )))
567 }
568 other => {
569 not_impl_err!("Physical plan does not support logical expression {other:?}")
570 }
571 }
572}
573
574pub fn create_physical_exprs<'a, I>(
576 exprs: I,
577 input_dfschema: &DFSchema,
578 execution_props: &ExecutionProps,
579) -> Result<Vec<Arc<dyn PhysicalExpr>>>
580where
581 I: IntoIterator<Item = &'a Expr>,
582{
583 exprs
584 .into_iter()
585 .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
586 .collect()
587}
588
589pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
591 let df_schema = schema.clone().to_dfschema().unwrap();
593 let execution_props = ExecutionProps::new();
594 create_physical_expr(expr, &df_schema, &execution_props).unwrap()
595}
596
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};
    use datafusion_expr::col;

    use super::*;

    /// Schema with a single non-nullable `Int32` column named `a`.
    fn test_cast_schema() -> Schema {
        let column_a = Field::new("a", DataType::Int32, false);
        Schema::new(vec![column_a])
    }

    /// Plans `expr` against `schema` using default execution properties.
    fn lower_cast_expr(expr: &Expr, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
        let df_schema = DFSchema::try_from(schema.clone())?;
        let props = ExecutionProps::new();
        create_physical_expr(expr, &df_schema, &props)
    }

    /// Downcasts a planned expression to the `CastExpr` produced for CAST.
    fn as_planner_cast(physical: &Arc<dyn PhysicalExpr>) -> &expressions::CastExpr {
        let maybe_cast = physical.downcast_ref::<expressions::CastExpr>();
        maybe_cast.expect("planner should lower logical CAST to CastExpr")
    }

    /// Nullable target field carrying a single `target_meta` metadata entry.
    fn meta_target_field(name: &str, data_type: DataType, meta: &str) -> Arc<Field> {
        let metadata = [("target_meta".to_string(), meta.to_string())].into();
        Arc::new(Field::new(name, data_type, true).with_metadata(metadata))
    }

    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let predicate = col("letter").eq(lit("A"));

        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let qualified = DFSchema::try_from_qualified_schema("data", &schema)?;
        let physical = create_physical_expr(&predicate, &qualified, &ExecutionProps::new())?;

        let letters: ArrayRef =
            Arc::new(StringArray::from_iter_values(["A", "B", "C", "D"]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        let evaluated = physical
            .evaluate(&batch)?
            .into_array(4)
            .expect("Failed to convert to array");
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));

        assert_eq!(&evaluated, &expected);
        Ok(())
    }

    #[test]
    fn test_cast_lowering_preserves_target_field_metadata() -> Result<()> {
        let schema = test_cast_schema();
        let target = meta_target_field("cast_target", DataType::Int64, "1");
        let logical = Expr::Cast(Cast::new_from_field(
            Box::new(col("a")),
            Arc::clone(&target),
        ));

        let physical = lower_cast_expr(&logical, &schema)?;

        assert_eq!(as_planner_cast(&physical).target_field(), &target);
        assert_eq!(physical.return_field(&schema)?, target);
        assert!(physical.nullable(&schema)?);
        Ok(())
    }

    #[test]
    fn test_cast_lowering_preserves_standard_cast_semantics() -> Result<()> {
        let schema = test_cast_schema();
        let logical = Expr::Cast(Cast::new(Box::new(col("a")), DataType::Int64));

        let physical = lower_cast_expr(&logical, &schema)?;
        let planned_cast = as_planner_cast(&physical);
        let output_field = physical.return_field(&schema)?;

        assert_eq!(planned_cast.cast_type(), &DataType::Int64);
        assert_eq!(output_field.name(), "a");
        assert_eq!(output_field.data_type(), &DataType::Int64);
        assert!(!physical.nullable(&schema)?);
        Ok(())
    }

    #[test]
    fn test_cast_lowering_preserves_same_type_field_semantics() -> Result<()> {
        let schema = test_cast_schema();
        let target = meta_target_field("same_type_cast", DataType::Int32, "same-type");
        let logical = Expr::Cast(Cast::new_from_field(
            Box::new(col("a")),
            Arc::clone(&target),
        ));

        let physical = lower_cast_expr(&logical, &schema)?;

        assert_eq!(as_planner_cast(&physical).target_field(), &target);
        assert_eq!(physical.return_field(&schema)?, target);
        assert!(physical.nullable(&schema)?);
        Ok(())
    }

    /// Plans a left-deep `a + a + ... + a` chain of `DEPTH` additions; only
    /// run when recursion protection is compiled in.
    #[test]
    #[cfg_attr(not(feature = "recursive_protection"), ignore)]
    fn test_deeply_nested_binary_expr() -> Result<()> {
        const DEPTH: usize = 1000;

        let nested = (0..DEPTH).fold(col("a"), |acc, _| {
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(acc),
                op: Operator::Plus,
                right: Box::new(col("a")),
            })
        });

        let df_schema = DFSchema::try_from(Schema::new(vec![Field::new(
            "a",
            DataType::Int32,
            false,
        )]))?;

        let _physical_expr =
            create_physical_expr(&nested, &df_schema, &ExecutionProps::new())?;
        Ok(())
    }
}