1use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::ScalarFunctionExpr;
22use crate::{
23 expressions::{self, binary, like, similar_to, Column, Literal},
24 PhysicalExpr,
25};
26
27use arrow::datatypes::Schema;
28use datafusion_common::{
29 exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
30};
31use datafusion_expr::execution_props::ExecutionProps;
32use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
33use datafusion_expr::var_provider::is_system_variables;
34use datafusion_expr::var_provider::VarType;
35use datafusion_expr::{
36 binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
37};
38
39pub fn create_physical_expr(
108 e: &Expr,
109 input_dfschema: &DFSchema,
110 execution_props: &ExecutionProps,
111) -> Result<Arc<dyn PhysicalExpr>> {
112 let input_schema: &Schema = &input_dfschema.into();
113
114 match e {
115 Expr::Alias(Alias { expr, metadata, .. }) => {
116 if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
117 let mut new_metadata = prior_metadata
118 .as_ref()
119 .map(|m| {
120 m.iter()
121 .map(|(k, v)| (k.clone(), v.clone()))
122 .collect::<HashMap<String, String>>()
123 })
124 .unwrap_or_default();
125 if let Some(metadata) = metadata {
126 new_metadata.extend(metadata.clone());
127 }
128 let new_metadata = match new_metadata.is_empty() {
129 true => None,
130 false => Some(new_metadata),
131 };
132
133 Ok(Arc::new(Literal::new_with_metadata(
134 v.clone(),
135 new_metadata,
136 )))
137 } else {
138 Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
139 }
140 }
141 Expr::Column(c) => {
142 let idx = input_dfschema.index_of_column(c)?;
143 Ok(Arc::new(Column::new(&c.name, idx)))
144 }
145 Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
146 value.clone(),
147 metadata
148 .as_ref()
149 .map(|m| m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()),
150 ))),
151 Expr::ScalarVariable(_, variable_names) => {
152 if is_system_variables(variable_names) {
153 match execution_props.get_var_provider(VarType::System) {
154 Some(provider) => {
155 let scalar_value = provider.get_value(variable_names.clone())?;
156 Ok(Arc::new(Literal::new(scalar_value)))
157 }
158 _ => plan_err!("No system variable provider found"),
159 }
160 } else {
161 match execution_props.get_var_provider(VarType::UserDefined) {
162 Some(provider) => {
163 let scalar_value = provider.get_value(variable_names.clone())?;
164 Ok(Arc::new(Literal::new(scalar_value)))
165 }
166 _ => plan_err!("No user defined variable provider found"),
167 }
168 }
169 }
170 Expr::IsTrue(expr) => {
171 let binary_op = binary_expr(
172 expr.as_ref().clone(),
173 Operator::IsNotDistinctFrom,
174 lit(true),
175 );
176 create_physical_expr(&binary_op, input_dfschema, execution_props)
177 }
178 Expr::IsNotTrue(expr) => {
179 let binary_op =
180 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
181 create_physical_expr(&binary_op, input_dfschema, execution_props)
182 }
183 Expr::IsFalse(expr) => {
184 let binary_op = binary_expr(
185 expr.as_ref().clone(),
186 Operator::IsNotDistinctFrom,
187 lit(false),
188 );
189 create_physical_expr(&binary_op, input_dfschema, execution_props)
190 }
191 Expr::IsNotFalse(expr) => {
192 let binary_op =
193 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
194 create_physical_expr(&binary_op, input_dfschema, execution_props)
195 }
196 Expr::IsUnknown(expr) => {
197 let binary_op = binary_expr(
198 expr.as_ref().clone(),
199 Operator::IsNotDistinctFrom,
200 Expr::Literal(ScalarValue::Boolean(None), None),
201 );
202 create_physical_expr(&binary_op, input_dfschema, execution_props)
203 }
204 Expr::IsNotUnknown(expr) => {
205 let binary_op = binary_expr(
206 expr.as_ref().clone(),
207 Operator::IsDistinctFrom,
208 Expr::Literal(ScalarValue::Boolean(None), None),
209 );
210 create_physical_expr(&binary_op, input_dfschema, execution_props)
211 }
212 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
213 let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
215 let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
216 binary(lhs, *op, rhs, input_schema)
224 }
225 Expr::Like(Like {
226 negated,
227 expr,
228 pattern,
229 escape_char,
230 case_insensitive,
231 }) => {
232 if escape_char.unwrap_or('\\') != '\\' {
234 return exec_err!(
235 "LIKE does not support escape_char other than the backslash (\\)"
236 );
237 }
238 let physical_expr =
239 create_physical_expr(expr, input_dfschema, execution_props)?;
240 let physical_pattern =
241 create_physical_expr(pattern, input_dfschema, execution_props)?;
242 like(
243 *negated,
244 *case_insensitive,
245 physical_expr,
246 physical_pattern,
247 input_schema,
248 )
249 }
250 Expr::SimilarTo(Like {
251 negated,
252 expr,
253 pattern,
254 escape_char,
255 case_insensitive,
256 }) => {
257 if escape_char.is_some() {
258 return exec_err!("SIMILAR TO does not support escape_char yet");
259 }
260 let physical_expr =
261 create_physical_expr(expr, input_dfschema, execution_props)?;
262 let physical_pattern =
263 create_physical_expr(pattern, input_dfschema, execution_props)?;
264 similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
265 }
266 Expr::Case(case) => {
267 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
268 Some(create_physical_expr(
269 e.as_ref(),
270 input_dfschema,
271 execution_props,
272 )?)
273 } else {
274 None
275 };
276 let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
277 .when_then_expr
278 .iter()
279 .map(|(w, t)| (w.as_ref(), t.as_ref()))
280 .unzip();
281 let when_expr =
282 create_physical_exprs(when_expr, input_dfschema, execution_props)?;
283 let then_expr =
284 create_physical_exprs(then_expr, input_dfschema, execution_props)?;
285 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
286 when_expr
287 .iter()
288 .zip(then_expr.iter())
289 .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
290 .collect();
291 let else_expr: Option<Arc<dyn PhysicalExpr>> =
292 if let Some(e) = &case.else_expr {
293 Some(create_physical_expr(
294 e.as_ref(),
295 input_dfschema,
296 execution_props,
297 )?)
298 } else {
299 None
300 };
301 Ok(expressions::case(expr, when_then_expr, else_expr)?)
302 }
303 Expr::Cast(Cast { expr, data_type }) => expressions::cast(
304 create_physical_expr(expr, input_dfschema, execution_props)?,
305 input_schema,
306 data_type.clone(),
307 ),
308 Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
309 create_physical_expr(expr, input_dfschema, execution_props)?,
310 input_schema,
311 data_type.clone(),
312 ),
313 Expr::Not(expr) => {
314 expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
315 }
316 Expr::Negative(expr) => expressions::negative(
317 create_physical_expr(expr, input_dfschema, execution_props)?,
318 input_schema,
319 ),
320 Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
321 expr,
322 input_dfschema,
323 execution_props,
324 )?),
325 Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
326 expr,
327 input_dfschema,
328 execution_props,
329 )?),
330 Expr::ScalarFunction(ScalarFunction { func, args }) => {
331 let physical_args =
332 create_physical_exprs(args, input_dfschema, execution_props)?;
333
334 Ok(Arc::new(ScalarFunctionExpr::try_new(
335 Arc::clone(func),
336 physical_args,
337 input_schema,
338 )?))
339 }
340 Expr::Between(Between {
341 expr,
342 negated,
343 low,
344 high,
345 }) => {
346 let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
347 let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
348 let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
349
350 let binary_expr = binary(
352 binary(
353 Arc::clone(&value_expr),
354 Operator::GtEq,
355 low_expr,
356 input_schema,
357 )?,
358 Operator::And,
359 binary(
360 Arc::clone(&value_expr),
361 Operator::LtEq,
362 high_expr,
363 input_schema,
364 )?,
365 input_schema,
366 );
367
368 if *negated {
369 expressions::not(binary_expr?)
370 } else {
371 binary_expr
372 }
373 }
374 Expr::InList(InList {
375 expr,
376 list,
377 negated,
378 }) => match expr.as_ref() {
379 Expr::Literal(ScalarValue::Utf8(None), _) => {
380 Ok(expressions::lit(ScalarValue::Boolean(None)))
381 }
382 _ => {
383 let value_expr =
384 create_physical_expr(expr, input_dfschema, execution_props)?;
385
386 let list_exprs =
387 create_physical_exprs(list, input_dfschema, execution_props)?;
388 expressions::in_list(value_expr, list_exprs, negated, input_schema)
389 }
390 },
391 Expr::Placeholder(Placeholder { id, .. }) => {
392 exec_err!("Placeholder '{id}' was not provided a value for execution.")
393 }
394 other => {
395 not_impl_err!("Physical plan does not support logical expression {other:?}")
396 }
397 }
398}
399
400pub fn create_physical_exprs<'a, I>(
402 exprs: I,
403 input_dfschema: &DFSchema,
404 execution_props: &ExecutionProps,
405) -> Result<Vec<Arc<dyn PhysicalExpr>>>
406where
407 I: IntoIterator<Item = &'a Expr>,
408{
409 exprs
410 .into_iter()
411 .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
412 .collect::<Result<Vec<_>>>()
413}
414
415pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
417 let df_schema = schema.clone().to_dfschema().unwrap();
418 let execution_props = ExecutionProps::new();
419 create_physical_expr(expr, &df_schema, &execution_props).unwrap()
420}
421
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a single-column Utf8 schema and checks
    /// that evaluating it over a four-row batch marks only the first row true.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        // Schema with one non-nullable string column, qualified as `data`.
        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical =
            create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters: ArrayRef = Arc::new(StringArray::from_iter_values(vec![
            "A", "B", "C", "D",
        ]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        let actual = physical
            .evaluate(&batch)?
            .into_array(4)
            .expect("Failed to convert to array");

        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));
        assert_eq!(&actual, &expected);

        Ok(())
    }
}