1use std::sync::Arc;
19
20use crate::ScalarFunctionExpr;
21use crate::{
22 expressions::{self, binary, like, similar_to, Column, Literal},
23 PhysicalExpr,
24};
25
26use arrow::datatypes::Schema;
27use datafusion_common::config::ConfigOptions;
28use datafusion_common::metadata::FieldMetadata;
29use datafusion_common::{
30 exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
31};
32use datafusion_expr::execution_props::ExecutionProps;
33use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
34use datafusion_expr::var_provider::is_system_variables;
35use datafusion_expr::var_provider::VarType;
36use datafusion_expr::{
37 binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
38};
39
40pub fn create_physical_expr(
109 e: &Expr,
110 input_dfschema: &DFSchema,
111 execution_props: &ExecutionProps,
112) -> Result<Arc<dyn PhysicalExpr>> {
113 let input_schema = input_dfschema.as_arrow();
114
115 match e {
116 Expr::Alias(Alias { expr, metadata, .. }) => {
117 if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
118 let new_metadata = FieldMetadata::merge_options(
119 prior_metadata.as_ref(),
120 metadata.as_ref(),
121 );
122 Ok(Arc::new(Literal::new_with_metadata(
123 v.clone(),
124 new_metadata,
125 )))
126 } else {
127 Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
128 }
129 }
130 Expr::Column(c) => {
131 let idx = input_dfschema.index_of_column(c)?;
132 Ok(Arc::new(Column::new(&c.name, idx)))
133 }
134 Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
135 value.clone(),
136 metadata.clone(),
137 ))),
138 Expr::ScalarVariable(_, variable_names) => {
139 if is_system_variables(variable_names) {
140 match execution_props.get_var_provider(VarType::System) {
141 Some(provider) => {
142 let scalar_value = provider.get_value(variable_names.clone())?;
143 Ok(Arc::new(Literal::new(scalar_value)))
144 }
145 _ => plan_err!("No system variable provider found"),
146 }
147 } else {
148 match execution_props.get_var_provider(VarType::UserDefined) {
149 Some(provider) => {
150 let scalar_value = provider.get_value(variable_names.clone())?;
151 Ok(Arc::new(Literal::new(scalar_value)))
152 }
153 _ => plan_err!("No user defined variable provider found"),
154 }
155 }
156 }
157 Expr::IsTrue(expr) => {
158 let binary_op = binary_expr(
159 expr.as_ref().clone(),
160 Operator::IsNotDistinctFrom,
161 lit(true),
162 );
163 create_physical_expr(&binary_op, input_dfschema, execution_props)
164 }
165 Expr::IsNotTrue(expr) => {
166 let binary_op =
167 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
168 create_physical_expr(&binary_op, input_dfschema, execution_props)
169 }
170 Expr::IsFalse(expr) => {
171 let binary_op = binary_expr(
172 expr.as_ref().clone(),
173 Operator::IsNotDistinctFrom,
174 lit(false),
175 );
176 create_physical_expr(&binary_op, input_dfschema, execution_props)
177 }
178 Expr::IsNotFalse(expr) => {
179 let binary_op =
180 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
181 create_physical_expr(&binary_op, input_dfschema, execution_props)
182 }
183 Expr::IsUnknown(expr) => {
184 let binary_op = binary_expr(
185 expr.as_ref().clone(),
186 Operator::IsNotDistinctFrom,
187 Expr::Literal(ScalarValue::Boolean(None), None),
188 );
189 create_physical_expr(&binary_op, input_dfschema, execution_props)
190 }
191 Expr::IsNotUnknown(expr) => {
192 let binary_op = binary_expr(
193 expr.as_ref().clone(),
194 Operator::IsDistinctFrom,
195 Expr::Literal(ScalarValue::Boolean(None), None),
196 );
197 create_physical_expr(&binary_op, input_dfschema, execution_props)
198 }
199 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
200 let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
202 let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
203 binary(lhs, *op, rhs, input_schema)
211 }
212 Expr::Like(Like {
213 negated,
214 expr,
215 pattern,
216 escape_char,
217 case_insensitive,
218 }) => {
219 if escape_char.unwrap_or('\\') != '\\' {
221 return exec_err!(
222 "LIKE does not support escape_char other than the backslash (\\)"
223 );
224 }
225 let physical_expr =
226 create_physical_expr(expr, input_dfschema, execution_props)?;
227 let physical_pattern =
228 create_physical_expr(pattern, input_dfschema, execution_props)?;
229 like(
230 *negated,
231 *case_insensitive,
232 physical_expr,
233 physical_pattern,
234 input_schema,
235 )
236 }
237 Expr::SimilarTo(Like {
238 negated,
239 expr,
240 pattern,
241 escape_char,
242 case_insensitive,
243 }) => {
244 if escape_char.is_some() {
245 return exec_err!("SIMILAR TO does not support escape_char yet");
246 }
247 let physical_expr =
248 create_physical_expr(expr, input_dfschema, execution_props)?;
249 let physical_pattern =
250 create_physical_expr(pattern, input_dfschema, execution_props)?;
251 similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
252 }
253 Expr::Case(case) => {
254 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
255 Some(create_physical_expr(
256 e.as_ref(),
257 input_dfschema,
258 execution_props,
259 )?)
260 } else {
261 None
262 };
263 let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
264 .when_then_expr
265 .iter()
266 .map(|(w, t)| (w.as_ref(), t.as_ref()))
267 .unzip();
268 let when_expr =
269 create_physical_exprs(when_expr, input_dfschema, execution_props)?;
270 let then_expr =
271 create_physical_exprs(then_expr, input_dfschema, execution_props)?;
272 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
273 when_expr
274 .iter()
275 .zip(then_expr.iter())
276 .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
277 .collect();
278 let else_expr: Option<Arc<dyn PhysicalExpr>> =
279 if let Some(e) = &case.else_expr {
280 Some(create_physical_expr(
281 e.as_ref(),
282 input_dfschema,
283 execution_props,
284 )?)
285 } else {
286 None
287 };
288 Ok(expressions::case(expr, when_then_expr, else_expr)?)
289 }
290 Expr::Cast(Cast { expr, data_type }) => expressions::cast(
291 create_physical_expr(expr, input_dfschema, execution_props)?,
292 input_schema,
293 data_type.clone(),
294 ),
295 Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
296 create_physical_expr(expr, input_dfschema, execution_props)?,
297 input_schema,
298 data_type.clone(),
299 ),
300 Expr::Not(expr) => {
301 expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
302 }
303 Expr::Negative(expr) => expressions::negative(
304 create_physical_expr(expr, input_dfschema, execution_props)?,
305 input_schema,
306 ),
307 Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
308 expr,
309 input_dfschema,
310 execution_props,
311 )?),
312 Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
313 expr,
314 input_dfschema,
315 execution_props,
316 )?),
317 Expr::ScalarFunction(ScalarFunction { func, args }) => {
318 let physical_args =
319 create_physical_exprs(args, input_dfschema, execution_props)?;
320 let config_options = match execution_props.config_options.as_ref() {
321 Some(config_options) => Arc::clone(config_options),
322 None => Arc::new(ConfigOptions::default()),
323 };
324
325 Ok(Arc::new(ScalarFunctionExpr::try_new(
326 Arc::clone(func),
327 physical_args,
328 input_schema,
329 config_options,
330 )?))
331 }
332 Expr::Between(Between {
333 expr,
334 negated,
335 low,
336 high,
337 }) => {
338 let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
339 let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
340 let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
341
342 let binary_expr = binary(
344 binary(
345 Arc::clone(&value_expr),
346 Operator::GtEq,
347 low_expr,
348 input_schema,
349 )?,
350 Operator::And,
351 binary(
352 Arc::clone(&value_expr),
353 Operator::LtEq,
354 high_expr,
355 input_schema,
356 )?,
357 input_schema,
358 );
359
360 if *negated {
361 expressions::not(binary_expr?)
362 } else {
363 binary_expr
364 }
365 }
366 Expr::InList(InList {
367 expr,
368 list,
369 negated,
370 }) => match expr.as_ref() {
371 Expr::Literal(ScalarValue::Utf8(None), _) => {
372 Ok(expressions::lit(ScalarValue::Boolean(None)))
373 }
374 _ => {
375 let value_expr =
376 create_physical_expr(expr, input_dfschema, execution_props)?;
377
378 let list_exprs =
379 create_physical_exprs(list, input_dfschema, execution_props)?;
380 expressions::in_list(value_expr, list_exprs, negated, input_schema)
381 }
382 },
383 Expr::Placeholder(Placeholder { id, .. }) => {
384 exec_err!("Placeholder '{id}' was not provided a value for execution.")
385 }
386 other => {
387 not_impl_err!("Physical plan does not support logical expression {other:?}")
388 }
389 }
390}
391
392pub fn create_physical_exprs<'a, I>(
394 exprs: I,
395 input_dfschema: &DFSchema,
396 execution_props: &ExecutionProps,
397) -> Result<Vec<Arc<dyn PhysicalExpr>>>
398where
399 I: IntoIterator<Item = &'a Expr>,
400{
401 exprs
402 .into_iter()
403 .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
404 .collect()
405}
406
407pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
409 let df_schema = schema.clone().to_dfschema().unwrap();
411 let execution_props = ExecutionProps::new();
412 create_physical_expr(expr, &df_schema, &execution_props).unwrap()
413}
414
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` and evaluates it over a four-row batch, expecting
    /// only the first row to match.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));

        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical =
            create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters = StringArray::from_iter_values(vec!["A", "B", "C", "D"]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(letters)])?;

        let actual = physical
            .evaluate(&batch)?
            .into_array(4)
            .expect("Failed to convert to array");

        assert_eq!(&actual, &expected);

        Ok(())
    }
}