1use std::sync::Arc;
19
20use crate::ScalarFunctionExpr;
21use crate::{
22 PhysicalExpr,
23 expressions::{self, Column, Literal, binary, like, similar_to},
24};
25
26use arrow::datatypes::Schema;
27use datafusion_common::config::ConfigOptions;
28use datafusion_common::metadata::FieldMetadata;
29use datafusion_common::{
30 DFSchema, Result, ScalarValue, ToDFSchema, exec_err, not_impl_err, plan_err,
31};
32use datafusion_expr::execution_props::ExecutionProps;
33use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
34use datafusion_expr::var_provider::VarType;
35use datafusion_expr::var_provider::is_system_variables;
36use datafusion_expr::{
37 Between, BinaryExpr, Expr, Like, Operator, TryCast, binary_expr, lit,
38};
39
40#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
109pub fn create_physical_expr(
110 e: &Expr,
111 input_dfschema: &DFSchema,
112 execution_props: &ExecutionProps,
113) -> Result<Arc<dyn PhysicalExpr>> {
114 let input_schema = input_dfschema.as_arrow();
115
116 match e {
117 Expr::Alias(Alias { expr, metadata, .. }) => {
118 if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
119 let new_metadata = FieldMetadata::merge_options(
120 prior_metadata.as_ref(),
121 metadata.as_ref(),
122 );
123 Ok(Arc::new(Literal::new_with_metadata(
124 v.clone(),
125 new_metadata,
126 )))
127 } else {
128 Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
129 }
130 }
131 Expr::Column(c) => {
132 let idx = input_dfschema.index_of_column(c)?;
133 Ok(Arc::new(Column::new(&c.name, idx)))
134 }
135 Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
136 value.clone(),
137 metadata.clone(),
138 ))),
139 Expr::ScalarVariable(_, variable_names) => {
140 if is_system_variables(variable_names) {
141 match execution_props.get_var_provider(VarType::System) {
142 Some(provider) => {
143 let scalar_value = provider.get_value(variable_names.clone())?;
144 Ok(Arc::new(Literal::new(scalar_value)))
145 }
146 _ => plan_err!("No system variable provider found"),
147 }
148 } else {
149 match execution_props.get_var_provider(VarType::UserDefined) {
150 Some(provider) => {
151 let scalar_value = provider.get_value(variable_names.clone())?;
152 Ok(Arc::new(Literal::new(scalar_value)))
153 }
154 _ => plan_err!("No user defined variable provider found"),
155 }
156 }
157 }
158 Expr::IsTrue(expr) => {
159 let binary_op = binary_expr(
160 expr.as_ref().clone(),
161 Operator::IsNotDistinctFrom,
162 lit(true),
163 );
164 create_physical_expr(&binary_op, input_dfschema, execution_props)
165 }
166 Expr::IsNotTrue(expr) => {
167 let binary_op =
168 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
169 create_physical_expr(&binary_op, input_dfschema, execution_props)
170 }
171 Expr::IsFalse(expr) => {
172 let binary_op = binary_expr(
173 expr.as_ref().clone(),
174 Operator::IsNotDistinctFrom,
175 lit(false),
176 );
177 create_physical_expr(&binary_op, input_dfschema, execution_props)
178 }
179 Expr::IsNotFalse(expr) => {
180 let binary_op =
181 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
182 create_physical_expr(&binary_op, input_dfschema, execution_props)
183 }
184 Expr::IsUnknown(expr) => {
185 let binary_op = binary_expr(
186 expr.as_ref().clone(),
187 Operator::IsNotDistinctFrom,
188 Expr::Literal(ScalarValue::Boolean(None), None),
189 );
190 create_physical_expr(&binary_op, input_dfschema, execution_props)
191 }
192 Expr::IsNotUnknown(expr) => {
193 let binary_op = binary_expr(
194 expr.as_ref().clone(),
195 Operator::IsDistinctFrom,
196 Expr::Literal(ScalarValue::Boolean(None), None),
197 );
198 create_physical_expr(&binary_op, input_dfschema, execution_props)
199 }
200 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
201 let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
203 let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
204 binary(lhs, *op, rhs, input_schema)
212 }
213 Expr::Like(Like {
214 negated,
215 expr,
216 pattern,
217 escape_char,
218 case_insensitive,
219 }) => {
220 if escape_char.unwrap_or('\\') != '\\' {
222 return exec_err!(
223 "LIKE does not support escape_char other than the backslash (\\)"
224 );
225 }
226 let physical_expr =
227 create_physical_expr(expr, input_dfschema, execution_props)?;
228 let physical_pattern =
229 create_physical_expr(pattern, input_dfschema, execution_props)?;
230 like(
231 *negated,
232 *case_insensitive,
233 physical_expr,
234 physical_pattern,
235 input_schema,
236 )
237 }
238 Expr::SimilarTo(Like {
239 negated,
240 expr,
241 pattern,
242 escape_char,
243 case_insensitive,
244 }) => {
245 if escape_char.is_some() {
246 return exec_err!("SIMILAR TO does not support escape_char yet");
247 }
248 let physical_expr =
249 create_physical_expr(expr, input_dfschema, execution_props)?;
250 let physical_pattern =
251 create_physical_expr(pattern, input_dfschema, execution_props)?;
252 similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
253 }
254 Expr::Case(case) => {
255 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
256 Some(create_physical_expr(
257 e.as_ref(),
258 input_dfschema,
259 execution_props,
260 )?)
261 } else {
262 None
263 };
264 let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
265 .when_then_expr
266 .iter()
267 .map(|(w, t)| (w.as_ref(), t.as_ref()))
268 .unzip();
269 let when_expr =
270 create_physical_exprs(when_expr, input_dfschema, execution_props)?;
271 let then_expr =
272 create_physical_exprs(then_expr, input_dfschema, execution_props)?;
273 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
274 when_expr
275 .iter()
276 .zip(then_expr.iter())
277 .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
278 .collect();
279 let else_expr: Option<Arc<dyn PhysicalExpr>> =
280 if let Some(e) = &case.else_expr {
281 Some(create_physical_expr(
282 e.as_ref(),
283 input_dfschema,
284 execution_props,
285 )?)
286 } else {
287 None
288 };
289 Ok(expressions::case(expr, when_then_expr, else_expr)?)
290 }
291 Expr::Cast(Cast { expr, data_type }) => expressions::cast(
292 create_physical_expr(expr, input_dfschema, execution_props)?,
293 input_schema,
294 data_type.clone(),
295 ),
296 Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
297 create_physical_expr(expr, input_dfschema, execution_props)?,
298 input_schema,
299 data_type.clone(),
300 ),
301 Expr::Not(expr) => {
302 expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
303 }
304 Expr::Negative(expr) => expressions::negative(
305 create_physical_expr(expr, input_dfschema, execution_props)?,
306 input_schema,
307 ),
308 Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
309 expr,
310 input_dfschema,
311 execution_props,
312 )?),
313 Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
314 expr,
315 input_dfschema,
316 execution_props,
317 )?),
318 Expr::ScalarFunction(ScalarFunction { func, args }) => {
319 let physical_args =
320 create_physical_exprs(args, input_dfschema, execution_props)?;
321 let config_options = match execution_props.config_options.as_ref() {
322 Some(config_options) => Arc::clone(config_options),
323 None => Arc::new(ConfigOptions::default()),
324 };
325
326 Ok(Arc::new(ScalarFunctionExpr::try_new(
327 Arc::clone(func),
328 physical_args,
329 input_schema,
330 config_options,
331 )?))
332 }
333 Expr::Between(Between {
334 expr,
335 negated,
336 low,
337 high,
338 }) => {
339 let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
340 let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
341 let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
342
343 let binary_expr = binary(
345 binary(
346 Arc::clone(&value_expr),
347 Operator::GtEq,
348 low_expr,
349 input_schema,
350 )?,
351 Operator::And,
352 binary(
353 Arc::clone(&value_expr),
354 Operator::LtEq,
355 high_expr,
356 input_schema,
357 )?,
358 input_schema,
359 );
360
361 if *negated {
362 expressions::not(binary_expr?)
363 } else {
364 binary_expr
365 }
366 }
367 Expr::InList(InList {
368 expr,
369 list,
370 negated,
371 }) => match expr.as_ref() {
372 Expr::Literal(ScalarValue::Utf8(None), _) => {
373 Ok(expressions::lit(ScalarValue::Boolean(None)))
374 }
375 _ => {
376 let value_expr =
377 create_physical_expr(expr, input_dfschema, execution_props)?;
378
379 let list_exprs =
380 create_physical_exprs(list, input_dfschema, execution_props)?;
381 expressions::in_list(value_expr, list_exprs, negated, input_schema)
382 }
383 },
384 Expr::Placeholder(Placeholder { id, .. }) => {
385 exec_err!("Placeholder '{id}' was not provided a value for execution.")
386 }
387 other => {
388 not_impl_err!("Physical plan does not support logical expression {other:?}")
389 }
390 }
391}
392
393pub fn create_physical_exprs<'a, I>(
395 exprs: I,
396 input_dfschema: &DFSchema,
397 execution_props: &ExecutionProps,
398) -> Result<Vec<Arc<dyn PhysicalExpr>>>
399where
400 I: IntoIterator<Item = &'a Expr>,
401{
402 exprs
403 .into_iter()
404 .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
405 .collect()
406}
407
408pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
410 let df_schema = schema.clone().to_dfschema().unwrap();
412 let execution_props = ExecutionProps::new();
413 create_physical_expr(expr, &df_schema, &execution_props).unwrap()
414}
415
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{Operator, col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a qualified schema and checks the boolean
    /// result produced for a four-row batch.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical =
            create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters: ArrayRef =
            Arc::new(StringArray::from_iter_values(vec!["A", "B", "C", "D"]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        let actual = physical
            .evaluate(&batch)?
            .into_array(4)
            .expect("Failed to convert to array");
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));

        assert_eq!(&actual, &expected);

        Ok(())
    }

    /// Plans a left-deep chain of 1000 additions, `(((a + a) + a) + ...)`.
    ///
    /// The test is ignored unless the `recursive_protection` feature is
    /// enabled.
    #[test]
    #[cfg_attr(not(feature = "recursive_protection"), ignore)]
    fn test_deeply_nested_binary_expr() -> Result<()> {
        let depth = 1000;

        // Each step wraps the expression built so far as the left operand.
        let nested = (0..depth).fold(col("a"), |acc, _| {
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(acc),
                op: Operator::Plus,
                right: Box::new(col("a")),
            })
        });

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let df_schema = DFSchema::try_from(schema)?;

        // Only successful planning matters; the result itself is not inspected.
        let _planned = create_physical_expr(&nested, &df_schema, &ExecutionProps::new())?;

        Ok(())
    }
}