1use std::sync::Arc;
19
20use crate::ScalarFunctionExpr;
21use crate::{
22 expressions::{self, binary, like, similar_to, Column, Literal},
23 PhysicalExpr,
24};
25
26use arrow::datatypes::Schema;
27use datafusion_common::config::ConfigOptions;
28use datafusion_common::{
29 exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
30};
31use datafusion_expr::execution_props::ExecutionProps;
32use datafusion_expr::expr::{
33 Alias, Cast, FieldMetadata, InList, Placeholder, ScalarFunction,
34};
35use datafusion_expr::var_provider::is_system_variables;
36use datafusion_expr::var_provider::VarType;
37use datafusion_expr::{
38 binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
39};
40
41pub fn create_physical_expr(
110 e: &Expr,
111 input_dfschema: &DFSchema,
112 execution_props: &ExecutionProps,
113) -> Result<Arc<dyn PhysicalExpr>> {
114 let input_schema: &Schema = &input_dfschema.into();
115
116 match e {
117 Expr::Alias(Alias { expr, metadata, .. }) => {
118 if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
119 let new_metadata = FieldMetadata::merge_options(
120 prior_metadata.as_ref(),
121 metadata.as_ref(),
122 );
123 Ok(Arc::new(Literal::new_with_metadata(
124 v.clone(),
125 new_metadata,
126 )))
127 } else {
128 Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
129 }
130 }
131 Expr::Column(c) => {
132 let idx = input_dfschema.index_of_column(c)?;
133 Ok(Arc::new(Column::new(&c.name, idx)))
134 }
135 Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
136 value.clone(),
137 metadata.clone(),
138 ))),
139 Expr::ScalarVariable(_, variable_names) => {
140 if is_system_variables(variable_names) {
141 match execution_props.get_var_provider(VarType::System) {
142 Some(provider) => {
143 let scalar_value = provider.get_value(variable_names.clone())?;
144 Ok(Arc::new(Literal::new(scalar_value)))
145 }
146 _ => plan_err!("No system variable provider found"),
147 }
148 } else {
149 match execution_props.get_var_provider(VarType::UserDefined) {
150 Some(provider) => {
151 let scalar_value = provider.get_value(variable_names.clone())?;
152 Ok(Arc::new(Literal::new(scalar_value)))
153 }
154 _ => plan_err!("No user defined variable provider found"),
155 }
156 }
157 }
158 Expr::IsTrue(expr) => {
159 let binary_op = binary_expr(
160 expr.as_ref().clone(),
161 Operator::IsNotDistinctFrom,
162 lit(true),
163 );
164 create_physical_expr(&binary_op, input_dfschema, execution_props)
165 }
166 Expr::IsNotTrue(expr) => {
167 let binary_op =
168 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
169 create_physical_expr(&binary_op, input_dfschema, execution_props)
170 }
171 Expr::IsFalse(expr) => {
172 let binary_op = binary_expr(
173 expr.as_ref().clone(),
174 Operator::IsNotDistinctFrom,
175 lit(false),
176 );
177 create_physical_expr(&binary_op, input_dfschema, execution_props)
178 }
179 Expr::IsNotFalse(expr) => {
180 let binary_op =
181 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
182 create_physical_expr(&binary_op, input_dfschema, execution_props)
183 }
184 Expr::IsUnknown(expr) => {
185 let binary_op = binary_expr(
186 expr.as_ref().clone(),
187 Operator::IsNotDistinctFrom,
188 Expr::Literal(ScalarValue::Boolean(None), None),
189 );
190 create_physical_expr(&binary_op, input_dfschema, execution_props)
191 }
192 Expr::IsNotUnknown(expr) => {
193 let binary_op = binary_expr(
194 expr.as_ref().clone(),
195 Operator::IsDistinctFrom,
196 Expr::Literal(ScalarValue::Boolean(None), None),
197 );
198 create_physical_expr(&binary_op, input_dfschema, execution_props)
199 }
200 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
201 let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
203 let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
204 binary(lhs, *op, rhs, input_schema)
212 }
213 Expr::Like(Like {
214 negated,
215 expr,
216 pattern,
217 escape_char,
218 case_insensitive,
219 }) => {
220 if escape_char.unwrap_or('\\') != '\\' {
222 return exec_err!(
223 "LIKE does not support escape_char other than the backslash (\\)"
224 );
225 }
226 let physical_expr =
227 create_physical_expr(expr, input_dfschema, execution_props)?;
228 let physical_pattern =
229 create_physical_expr(pattern, input_dfschema, execution_props)?;
230 like(
231 *negated,
232 *case_insensitive,
233 physical_expr,
234 physical_pattern,
235 input_schema,
236 )
237 }
238 Expr::SimilarTo(Like {
239 negated,
240 expr,
241 pattern,
242 escape_char,
243 case_insensitive,
244 }) => {
245 if escape_char.is_some() {
246 return exec_err!("SIMILAR TO does not support escape_char yet");
247 }
248 let physical_expr =
249 create_physical_expr(expr, input_dfschema, execution_props)?;
250 let physical_pattern =
251 create_physical_expr(pattern, input_dfschema, execution_props)?;
252 similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
253 }
254 Expr::Case(case) => {
255 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
256 Some(create_physical_expr(
257 e.as_ref(),
258 input_dfschema,
259 execution_props,
260 )?)
261 } else {
262 None
263 };
264 let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
265 .when_then_expr
266 .iter()
267 .map(|(w, t)| (w.as_ref(), t.as_ref()))
268 .unzip();
269 let when_expr =
270 create_physical_exprs(when_expr, input_dfschema, execution_props)?;
271 let then_expr =
272 create_physical_exprs(then_expr, input_dfschema, execution_props)?;
273 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
274 when_expr
275 .iter()
276 .zip(then_expr.iter())
277 .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
278 .collect();
279 let else_expr: Option<Arc<dyn PhysicalExpr>> =
280 if let Some(e) = &case.else_expr {
281 Some(create_physical_expr(
282 e.as_ref(),
283 input_dfschema,
284 execution_props,
285 )?)
286 } else {
287 None
288 };
289 Ok(expressions::case(expr, when_then_expr, else_expr)?)
290 }
291 Expr::Cast(Cast { expr, data_type }) => expressions::cast(
292 create_physical_expr(expr, input_dfschema, execution_props)?,
293 input_schema,
294 data_type.clone(),
295 ),
296 Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
297 create_physical_expr(expr, input_dfschema, execution_props)?,
298 input_schema,
299 data_type.clone(),
300 ),
301 Expr::Not(expr) => {
302 expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
303 }
304 Expr::Negative(expr) => expressions::negative(
305 create_physical_expr(expr, input_dfschema, execution_props)?,
306 input_schema,
307 ),
308 Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
309 expr,
310 input_dfschema,
311 execution_props,
312 )?),
313 Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
314 expr,
315 input_dfschema,
316 execution_props,
317 )?),
318 Expr::ScalarFunction(ScalarFunction { func, args }) => {
319 let physical_args =
320 create_physical_exprs(args, input_dfschema, execution_props)?;
321 let config_options = match execution_props.config_options.as_ref() {
322 Some(config_options) => Arc::clone(config_options),
323 None => Arc::new(ConfigOptions::default()),
324 };
325
326 Ok(Arc::new(ScalarFunctionExpr::try_new(
327 Arc::clone(func),
328 physical_args,
329 input_schema,
330 config_options,
331 )?))
332 }
333 Expr::Between(Between {
334 expr,
335 negated,
336 low,
337 high,
338 }) => {
339 let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
340 let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
341 let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
342
343 let binary_expr = binary(
345 binary(
346 Arc::clone(&value_expr),
347 Operator::GtEq,
348 low_expr,
349 input_schema,
350 )?,
351 Operator::And,
352 binary(
353 Arc::clone(&value_expr),
354 Operator::LtEq,
355 high_expr,
356 input_schema,
357 )?,
358 input_schema,
359 );
360
361 if *negated {
362 expressions::not(binary_expr?)
363 } else {
364 binary_expr
365 }
366 }
367 Expr::InList(InList {
368 expr,
369 list,
370 negated,
371 }) => match expr.as_ref() {
372 Expr::Literal(ScalarValue::Utf8(None), _) => {
373 Ok(expressions::lit(ScalarValue::Boolean(None)))
374 }
375 _ => {
376 let value_expr =
377 create_physical_expr(expr, input_dfschema, execution_props)?;
378
379 let list_exprs =
380 create_physical_exprs(list, input_dfschema, execution_props)?;
381 expressions::in_list(value_expr, list_exprs, negated, input_schema)
382 }
383 },
384 Expr::Placeholder(Placeholder { id, .. }) => {
385 exec_err!("Placeholder '{id}' was not provided a value for execution.")
386 }
387 other => {
388 not_impl_err!("Physical plan does not support logical expression {other:?}")
389 }
390 }
391}
392
393pub fn create_physical_exprs<'a, I>(
395 exprs: I,
396 input_dfschema: &DFSchema,
397 execution_props: &ExecutionProps,
398) -> Result<Vec<Arc<dyn PhysicalExpr>>>
399where
400 I: IntoIterator<Item = &'a Expr>,
401{
402 exprs
403 .into_iter()
404 .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
405 .collect()
406}
407
408pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
410 let df_schema = schema.clone().to_dfschema().unwrap();
411 let execution_props = ExecutionProps::new();
412 create_physical_expr(expr, &df_schema, &execution_props).unwrap()
413}
414
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a qualified schema and checks that
    /// evaluating it over a four-row batch yields one boolean per row.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let predicate = col("letter").eq(lit("A"));

        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let qualified_schema = DFSchema::try_from_qualified_schema("data", &schema)?;
        let physical_predicate =
            create_physical_expr(&predicate, &qualified_schema, &ExecutionProps::new())?;

        let letters = ["A", "B", "C", "D"];
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(StringArray::from_iter_values(letters))],
        )?;

        let actual = physical_predicate
            .evaluate(&batch)?
            .into_array(letters.len())
            .expect("Failed to convert to array");

        // Only the first row holds "A".
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));
        assert_eq!(&actual, &expected);

        Ok(())
    }
}