1use arrow::datatypes::{DataType, TimeUnit};
19use datafusion_expr::planner::{
20 PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr,
21};
22use sqlparser::ast::{
23 AccessExpr, BinaryOperator, CastFormat, CastKind, CeilFloorKind,
24 DataType as SQLDataType, DateTimeField, DictionaryField, Expr as SQLExpr,
25 ExprWithAlias as SQLExprWithAlias, JsonPath, MapEntry, StructField, Subscript,
26 TrimWhereField, TypedString, Value, ValueWithSpan,
27};
28
29use datafusion_common::{
30 DFSchema, Result, ScalarValue, internal_datafusion_err, internal_err, not_impl_err,
31 plan_err,
32};
33
34use datafusion_expr::expr::ScalarFunction;
35use datafusion_expr::expr::SetQuantifier;
36use datafusion_expr::expr::{InList, WildcardOptions};
37use datafusion_expr::{
38 Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like, Literal,
39 Operator, TryCast, lit, when,
40};
41
42use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
43use datafusion_functions_nested::expr_fn::{
44 array_has, array_max, array_min, array_position, cardinality,
45};
46
47mod binary_op;
48mod function;
49mod grouping_set;
50mod identifier;
51mod order_by;
52mod subquery;
53mod substring;
54mod unary_op;
55mod value;
56
57impl<S: ContextProvider> SqlToRel<'_, S> {
58 pub(crate) fn sql_expr_to_logical_expr_with_alias(
59 &self,
60 sql: SQLExprWithAlias,
61 schema: &DFSchema,
62 planner_context: &mut PlannerContext,
63 ) -> Result<Expr> {
64 let mut expr =
65 self.sql_expr_to_logical_expr(sql.expr, schema, planner_context)?;
66 if let Some(alias) = sql.alias {
67 expr = expr.alias(alias.value);
68 }
69 Ok(expr)
70 }
71 pub(crate) fn sql_expr_to_logical_expr(
72 &self,
73 sql: SQLExpr,
74 schema: &DFSchema,
75 planner_context: &mut PlannerContext,
76 ) -> Result<Expr> {
77 enum StackEntry {
78 SQLExpr(Box<SQLExpr>),
79 Operator(BinaryOperator),
80 }
81
82 let mut stack = vec![StackEntry::SQLExpr(Box::new(sql))];
87 let mut eval_stack = vec![];
88
89 while let Some(entry) = stack.pop() {
90 match entry {
91 StackEntry::SQLExpr(sql_expr) => {
92 match *sql_expr {
93 SQLExpr::BinaryOp { left, op, right } => {
94 stack.push(StackEntry::Operator(op));
97 stack.push(StackEntry::SQLExpr(right));
98 stack.push(StackEntry::SQLExpr(left));
99 }
100 _ => {
101 let expr = self.sql_expr_to_logical_expr_internal(
102 *sql_expr,
103 schema,
104 planner_context,
105 )?;
106 eval_stack.push(expr);
107 }
108 }
109 }
110 StackEntry::Operator(op) => {
111 let right = eval_stack.pop().unwrap();
112 let left = eval_stack.pop().unwrap();
113 let expr = self.build_logical_expr(op, left, right, schema)?;
114 eval_stack.push(expr);
115 }
116 }
117 }
118
119 assert_eq!(1, eval_stack.len());
120 let expr = eval_stack.pop().unwrap();
121 Ok(expr)
122 }
123
124 fn build_logical_expr(
125 &self,
126 op: BinaryOperator,
127 left: Expr,
128 right: Expr,
129 schema: &DFSchema,
130 ) -> Result<Expr> {
131 let mut binary_expr = RawBinaryExpr { op, left, right };
133 for planner in self.context_provider.get_expr_planners() {
134 match planner.plan_binary_op(binary_expr, schema)? {
135 PlannerResult::Planned(expr) => {
136 return Ok(expr);
137 }
138 PlannerResult::Original(expr) => {
139 binary_expr = expr;
140 }
141 }
142 }
143
144 let RawBinaryExpr { op, left, right } = binary_expr;
145 Ok(Expr::BinaryExpr(BinaryExpr::new(
146 Box::new(left),
147 self.parse_sql_binary_op(&op)?,
148 Box::new(right),
149 )))
150 }
151
152 pub fn sql_to_expr_with_alias(
153 &self,
154 sql: SQLExprWithAlias,
155 schema: &DFSchema,
156 planner_context: &mut PlannerContext,
157 ) -> Result<Expr> {
158 let mut expr =
159 self.sql_expr_to_logical_expr_with_alias(sql, schema, planner_context)?;
160 expr = self.rewrite_partial_qualifier(expr, schema);
161 self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
162 let (expr, _) = expr.infer_placeholder_types(schema)?;
163 Ok(expr)
164 }
165
166 pub fn sql_to_expr(
168 &self,
169 sql: SQLExpr,
170 schema: &DFSchema,
171 planner_context: &mut PlannerContext,
172 ) -> Result<Expr> {
173 let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?;
175 expr = self.rewrite_partial_qualifier(expr, schema);
176 self.validate_schema_satisfies_exprs(schema, std::slice::from_ref(&expr))?;
177 let (expr, _) = expr.infer_placeholder_types(schema)?;
178 Ok(expr)
179 }
180
181 fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchema) -> Expr {
183 match expr {
184 Expr::Column(col) => match &col.relation {
185 Some(q) => {
186 match schema.iter().find(|(qualifier, field)| match qualifier {
187 Some(field_q) => {
188 field.name() == &col.name
189 && field_q.to_string().ends_with(&format!(".{q}"))
190 }
191 _ => false,
192 }) {
193 Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
194 None => Expr::Column(col),
195 }
196 }
197 None => Expr::Column(col),
198 },
199 _ => expr,
200 }
201 }
202
203 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
206 fn sql_expr_to_logical_expr_internal(
207 &self,
208 sql: SQLExpr,
209 schema: &DFSchema,
210 planner_context: &mut PlannerContext,
211 ) -> Result<Expr> {
212 match sql {
218 SQLExpr::Value(value) => {
219 self.parse_value(value.into(), planner_context.prepare_param_data_types())
220 }
221 SQLExpr::Extract { field, expr, .. } => {
222 let mut extract_args = vec![
223 Expr::Literal(ScalarValue::from(format!("{field}")), None),
224 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
225 ];
226
227 for planner in self.context_provider.get_expr_planners() {
228 match planner.plan_extract(extract_args)? {
229 PlannerResult::Planned(expr) => return Ok(expr),
230 PlannerResult::Original(args) => {
231 extract_args = args;
232 }
233 }
234 }
235
236 not_impl_err!("Extract not supported by ExprPlanner: {extract_args:?}")
237 }
238
239 SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
240 SQLExpr::Interval(interval) => self.sql_interval_to_expr(false, interval),
241 SQLExpr::Identifier(id) => {
242 self.sql_identifier_to_expr(id, schema, planner_context)
243 }
244
245 SQLExpr::CompoundFieldAccess { root, access_chain } => self
247 .sql_compound_field_access_to_expr(
248 *root,
249 access_chain,
250 schema,
251 planner_context,
252 ),
253
254 SQLExpr::CompoundIdentifier(ids) => {
255 self.sql_compound_identifier_to_expr(ids, schema, planner_context)
256 }
257
258 SQLExpr::Case {
259 operand,
260 conditions,
261 else_result,
262 case_token: _,
263 end_token: _,
264 } => self.sql_case_identifier_to_expr(
265 operand,
266 conditions,
267 else_result,
268 schema,
269 planner_context,
270 ),
271
272 SQLExpr::Cast { array: true, .. } => {
273 not_impl_err!("`CAST(... AS type ARRAY`) not supported")
274 }
275
276 SQLExpr::Cast {
277 kind: CastKind::Cast | CastKind::DoubleColon,
278 expr,
279 data_type,
280 format,
281 array: false,
282 } => {
283 self.sql_cast_to_expr(*expr, &data_type, format, schema, planner_context)
284 }
285
286 SQLExpr::Cast {
287 kind: CastKind::TryCast | CastKind::SafeCast,
288 expr,
289 data_type,
290 format,
291 array: false,
292 } => {
293 if let Some(format) = format {
294 return not_impl_err!("CAST with format is not supported: {format}");
295 }
296
297 Ok(Expr::TryCast(TryCast::new_from_field(
298 Box::new(self.sql_expr_to_logical_expr(
299 *expr,
300 schema,
301 planner_context,
302 )?),
303 self.convert_data_type_to_field(&data_type)?,
304 )))
305 }
306
307 SQLExpr::TypedString(TypedString {
308 data_type,
309 value,
310 uses_odbc_syntax: _,
311 }) => {
312 let value = match value.into_string() {
313 Some(value) => value,
314 None => {
315 return plan_err!("Typed literal requires a string payload");
316 }
317 };
318
319 Ok(Expr::Cast(Cast::new_from_field(
320 Box::new(lit(value)),
321 self.convert_data_type_to_field(&data_type)?,
322 )))
323 }
324
325 SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
326 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
327 ))),
328
329 SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
330 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
331 ))),
332
333 SQLExpr::IsDistinctFrom(left, right) => {
334 Ok(Expr::BinaryExpr(BinaryExpr::new(
335 Box::new(self.sql_expr_to_logical_expr(
336 *left,
337 schema,
338 planner_context,
339 )?),
340 Operator::IsDistinctFrom,
341 Box::new(self.sql_expr_to_logical_expr(
342 *right,
343 schema,
344 planner_context,
345 )?),
346 )))
347 }
348
349 SQLExpr::IsNotDistinctFrom(left, right) => {
350 Ok(Expr::BinaryExpr(BinaryExpr::new(
351 Box::new(self.sql_expr_to_logical_expr(
352 *left,
353 schema,
354 planner_context,
355 )?),
356 Operator::IsNotDistinctFrom,
357 Box::new(self.sql_expr_to_logical_expr(
358 *right,
359 schema,
360 planner_context,
361 )?),
362 )))
363 }
364
365 SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(
366 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
367 ))),
368
369 SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(
370 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
371 ))),
372
373 SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(
374 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
375 ))),
376
377 SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(
378 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
379 ))),
380
381 SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(
382 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
383 ))),
384
385 SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(
386 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
387 ))),
388
389 SQLExpr::UnaryOp { op, expr } => {
390 self.parse_sql_unary_op(op, *expr, schema, planner_context)
391 }
392
393 SQLExpr::Between {
394 expr,
395 negated,
396 low,
397 high,
398 } => Ok(Expr::Between(Between::new(
399 Box::new(self.sql_expr_to_logical_expr(
400 *expr,
401 schema,
402 planner_context,
403 )?),
404 negated,
405 Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
406 Box::new(self.sql_expr_to_logical_expr(
407 *high,
408 schema,
409 planner_context,
410 )?),
411 ))),
412
413 SQLExpr::InList {
414 expr,
415 list,
416 negated,
417 } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
418
419 SQLExpr::Like {
420 negated,
421 expr,
422 pattern,
423 escape_char,
424 any,
425 } => self.sql_like_to_expr(
426 negated,
427 *expr,
428 *pattern,
429 escape_char,
430 schema,
431 planner_context,
432 false,
433 any,
434 ),
435
436 SQLExpr::ILike {
437 negated,
438 expr,
439 pattern,
440 escape_char,
441 any,
442 } => self.sql_like_to_expr(
443 negated,
444 *expr,
445 *pattern,
446 escape_char,
447 schema,
448 planner_context,
449 true,
450 any,
451 ),
452
453 SQLExpr::SimilarTo {
454 negated,
455 expr,
456 pattern,
457 escape_char,
458 } => self.sql_similarto_to_expr(
459 negated,
460 *expr,
461 *pattern,
462 escape_char,
463 schema,
464 planner_context,
465 ),
466
467 SQLExpr::BinaryOp { .. } => {
468 internal_err!("binary_op should be handled by sql_expr_to_logical_expr.")
469 }
470
471 #[cfg(feature = "unicode_expressions")]
472 SQLExpr::Substring {
473 expr,
474 substring_from,
475 substring_for,
476 special: _,
477 shorthand: _,
478 } => self.sql_substring_to_expr(
479 expr,
480 substring_from,
481 substring_for,
482 schema,
483 planner_context,
484 ),
485
486 #[cfg(not(feature = "unicode_expressions"))]
487 SQLExpr::Substring { .. } => {
488 internal_err!(
489 "statement substring requires compilation with feature flag: unicode_expressions."
490 )
491 }
492
493 SQLExpr::Trim {
494 expr,
495 trim_where,
496 trim_what,
497 trim_characters,
498 } => self.sql_trim_to_expr(
499 *expr,
500 trim_where,
501 trim_what,
502 trim_characters,
503 schema,
504 planner_context,
505 ),
506
507 SQLExpr::Function(function) => {
508 self.sql_function_to_expr(function, schema, planner_context)
509 }
510
511 SQLExpr::Rollup(exprs) => {
512 self.sql_rollup_to_expr(exprs, schema, planner_context)
513 }
514 SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema, planner_context),
515 SQLExpr::GroupingSets(exprs) => {
516 self.sql_grouping_sets_to_expr(exprs, schema, planner_context)
517 }
518
519 SQLExpr::Floor { expr, field } => match field {
520 CeilFloorKind::DateTimeField(DateTimeField::NoDateTime) => {
521 self.sql_fn_name_to_expr(*expr, "floor", schema, planner_context)
522 }
523 CeilFloorKind::DateTimeField(_) => {
524 not_impl_err!("FLOOR with datetime is not supported")
525 }
526 CeilFloorKind::Scale(_) => {
527 not_impl_err!("FLOOR with scale is not supported")
528 }
529 },
530 SQLExpr::Ceil { expr, field } => match field {
531 CeilFloorKind::DateTimeField(DateTimeField::NoDateTime) => {
532 self.sql_fn_name_to_expr(*expr, "ceil", schema, planner_context)
533 }
534 CeilFloorKind::DateTimeField(_) => {
535 not_impl_err!("CEIL with datetime is not supported")
536 }
537 CeilFloorKind::Scale(_) => {
538 not_impl_err!("CEIL with scale is not supported")
539 }
540 },
541 SQLExpr::Overlay {
542 expr,
543 overlay_what,
544 overlay_from,
545 overlay_for,
546 } => self.sql_overlay_to_expr(
547 *expr,
548 *overlay_what,
549 *overlay_from,
550 overlay_for,
551 schema,
552 planner_context,
553 ),
554 SQLExpr::Nested(e) => {
555 self.sql_expr_to_logical_expr(*e, schema, planner_context)
556 }
557
558 SQLExpr::Exists { subquery, negated } => {
559 self.parse_exists_subquery(*subquery, negated, schema, planner_context)
560 }
561 SQLExpr::InSubquery {
562 expr,
563 subquery,
564 negated,
565 } => {
566 self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context)
567 }
568 SQLExpr::Subquery(subquery) => {
569 self.parse_scalar_subquery(*subquery, schema, planner_context)
570 }
571
572 SQLExpr::Struct { values, fields } => {
573 self.parse_struct(schema, planner_context, values, &fields)
574 }
575 SQLExpr::Position { expr, r#in } => {
576 self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
577 }
578 SQLExpr::AtTimeZone {
579 timestamp,
580 time_zone,
581 } => Ok(Expr::Cast(Cast::new(
582 Box::new(self.sql_expr_to_logical_expr_internal(
583 *timestamp,
584 schema,
585 planner_context,
586 )?),
587 match *time_zone {
588 SQLExpr::Value(ValueWithSpan {
589 value: Value::SingleQuotedString(s),
590 span: _,
591 }) => DataType::Timestamp(TimeUnit::Nanosecond, Some(s.into())),
592 _ => {
593 return not_impl_err!(
594 "Unsupported ast node in sqltorel: {time_zone:?}"
595 );
596 }
597 },
598 ))),
599 SQLExpr::Dictionary(fields) => {
600 self.try_plan_dictionary_literal(fields, schema, planner_context)
601 }
602 SQLExpr::Map(map) => {
603 self.try_plan_map_literal(map.entries, schema, planner_context)
604 }
605 SQLExpr::AnyOp {
606 left,
607 compare_op,
608 right,
609 is_some: _,
612 } => match *right {
613 SQLExpr::Subquery(subquery) => self.parse_set_comparison_subquery(
614 *left,
615 *subquery,
616 &compare_op,
617 SetQuantifier::Any,
618 schema,
619 planner_context,
620 ),
621 _ => {
622 let left_expr = self.sql_to_expr(*left, schema, planner_context)?;
623 let right_expr = self.sql_to_expr(*right, schema, planner_context)?;
624 plan_any_op(left_expr, right_expr, &compare_op)
625 }
626 },
627 SQLExpr::AllOp {
628 left,
629 compare_op,
630 right,
631 } => match *right {
632 SQLExpr::Subquery(subquery) => self.parse_set_comparison_subquery(
633 *left,
634 *subquery,
635 &compare_op,
636 SetQuantifier::All,
637 schema,
638 planner_context,
639 ),
640 _ => {
641 let left_expr = self.sql_to_expr(*left, schema, planner_context)?;
642 let right_expr = self.sql_to_expr(*right, schema, planner_context)?;
643 plan_all_op(&left_expr, &right_expr, &compare_op)
644 }
645 },
646 #[expect(deprecated)]
647 SQLExpr::Wildcard(_token) => Ok(Expr::Wildcard {
648 qualifier: None,
649 options: Box::new(WildcardOptions::default()),
650 }),
651 #[expect(deprecated)]
652 SQLExpr::QualifiedWildcard(object_name, _token) => Ok(Expr::Wildcard {
653 qualifier: Some(self.object_name_to_table_reference(object_name)?),
654 options: Box::new(WildcardOptions::default()),
655 }),
656 SQLExpr::Tuple(values) => self.parse_tuple(schema, planner_context, values),
657 SQLExpr::JsonAccess { value, path } => {
658 self.parse_json_access(schema, planner_context, value, &path)
659 }
660 _ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
661 }
662 }
663
664 fn parse_json_access(
665 &self,
666 schema: &DFSchema,
667 planner_context: &mut PlannerContext,
668 value: Box<SQLExpr>,
669 path: &JsonPath,
670 ) -> Result<Expr> {
671 let json_path = path.to_string();
672 let json_path = if let Some(json_path) = json_path.strip_prefix(":") {
673 json_path.to_owned()
675 } else {
676 json_path
677 };
678 self.build_logical_expr(
679 BinaryOperator::Custom(":".to_owned()),
680 self.sql_to_expr(*value, schema, planner_context)?,
681 Expr::Literal(ScalarValue::Utf8(Some(json_path)), None),
683 schema,
684 )
685 }
686
687 fn parse_struct(
689 &self,
690 schema: &DFSchema,
691 planner_context: &mut PlannerContext,
692 values: Vec<SQLExpr>,
693 fields: &[StructField],
694 ) -> Result<Expr> {
695 if !fields.is_empty() {
696 return not_impl_err!("Struct fields are not supported yet");
697 }
698 let is_named_struct = values
699 .iter()
700 .any(|value| matches!(value, SQLExpr::Named { .. }));
701
702 let mut create_struct_args = if is_named_struct {
703 self.create_named_struct_expr(values, schema, planner_context)?
704 } else {
705 self.create_struct_expr(values, schema, planner_context)?
706 };
707
708 for planner in self.context_provider.get_expr_planners() {
709 match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
710 PlannerResult::Planned(expr) => return Ok(expr),
711 PlannerResult::Original(args) => create_struct_args = args,
712 }
713 }
714 not_impl_err!("Struct not supported by ExprPlanner: {create_struct_args:?}")
715 }
716
717 fn parse_tuple(
718 &self,
719 schema: &DFSchema,
720 planner_context: &mut PlannerContext,
721 values: Vec<SQLExpr>,
722 ) -> Result<Expr> {
723 match values.first() {
724 Some(SQLExpr::Identifier(_))
725 | Some(SQLExpr::Value(_))
726 | Some(SQLExpr::CompoundIdentifier(_)) => {
727 self.parse_struct(schema, planner_context, values, &[])
728 }
729 None => not_impl_err!("Empty tuple not supported yet"),
730 _ => {
731 not_impl_err!("Only identifiers and literals are supported in tuples")
732 }
733 }
734 }
735
736 fn sql_position_to_expr(
737 &self,
738 substr_expr: SQLExpr,
739 str_expr: SQLExpr,
740 schema: &DFSchema,
741 planner_context: &mut PlannerContext,
742 ) -> Result<Expr> {
743 let substr =
744 self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
745 let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;
746 let mut position_args = vec![fullstr, substr];
747 for planner in self.context_provider.get_expr_planners() {
748 match planner.plan_position(position_args)? {
749 PlannerResult::Planned(expr) => return Ok(expr),
750 PlannerResult::Original(args) => {
751 position_args = args;
752 }
753 }
754 }
755
756 not_impl_err!("Position not supported by ExprPlanner: {position_args:?}")
757 }
758
759 fn try_plan_dictionary_literal(
760 &self,
761 fields: Vec<DictionaryField>,
762 schema: &DFSchema,
763 planner_context: &mut PlannerContext,
764 ) -> Result<Expr> {
765 let mut keys = vec![];
766 let mut values = vec![];
767 for field in fields {
768 let key = lit(field.key.value);
769 let value =
770 self.sql_expr_to_logical_expr(*field.value, schema, planner_context)?;
771 keys.push(key);
772 values.push(value);
773 }
774
775 let mut raw_expr = RawDictionaryExpr { keys, values };
776
777 for planner in self.context_provider.get_expr_planners() {
778 match planner.plan_dictionary_literal(raw_expr, schema)? {
779 PlannerResult::Planned(expr) => {
780 return Ok(expr);
781 }
782 PlannerResult::Original(expr) => raw_expr = expr,
783 }
784 }
785 not_impl_err!("Dictionary not supported by ExprPlanner: {raw_expr:?}")
786 }
787
788 fn try_plan_map_literal(
789 &self,
790 entries: Vec<MapEntry>,
791 schema: &DFSchema,
792 planner_context: &mut PlannerContext,
793 ) -> Result<Expr> {
794 let mut exprs: Vec<_> = entries
795 .into_iter()
796 .flat_map(|entry| vec![entry.key, entry.value].into_iter())
797 .map(|expr| self.sql_expr_to_logical_expr(*expr, schema, planner_context))
798 .collect::<Result<Vec<_>>>()?;
799 for planner in self.context_provider.get_expr_planners() {
800 match planner.plan_make_map(exprs)? {
801 PlannerResult::Planned(expr) => {
802 return Ok(expr);
803 }
804 PlannerResult::Original(expr) => exprs = expr,
805 }
806 }
807 not_impl_err!("MAP not supported by ExprPlanner: {exprs:?}")
808 }
809
810 fn create_named_struct_expr(
813 &self,
814 values: Vec<SQLExpr>,
815 input_schema: &DFSchema,
816 planner_context: &mut PlannerContext,
817 ) -> Result<Vec<Expr>> {
818 Ok(values
819 .into_iter()
820 .enumerate()
821 .map(|(i, value)| {
822 let args = if let SQLExpr::Named { expr, name } = value {
823 [
824 name.value.lit(),
825 self.sql_expr_to_logical_expr(
826 *expr,
827 input_schema,
828 planner_context,
829 )?,
830 ]
831 } else {
832 [
833 format!("c{i}").lit(),
834 self.sql_expr_to_logical_expr(
835 value,
836 input_schema,
837 planner_context,
838 )?,
839 ]
840 };
841
842 Ok(args)
843 })
844 .collect::<Result<Vec<_>>>()?
845 .into_iter()
846 .flatten()
847 .collect())
848 }
849
850 fn create_struct_expr(
854 &self,
855 values: Vec<SQLExpr>,
856 input_schema: &DFSchema,
857 planner_context: &mut PlannerContext,
858 ) -> Result<Vec<Expr>> {
859 values
860 .into_iter()
861 .map(|value| {
862 self.sql_expr_to_logical_expr(value, input_schema, planner_context)
863 })
864 .collect::<Result<Vec<_>>>()
865 }
866
867 fn sql_in_list_to_expr(
868 &self,
869 expr: SQLExpr,
870 list: Vec<SQLExpr>,
871 negated: bool,
872 schema: &DFSchema,
873 planner_context: &mut PlannerContext,
874 ) -> Result<Expr> {
875 let list_expr = list
876 .into_iter()
877 .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
878 .collect::<Result<Vec<_>>>()?;
879
880 Ok(Expr::InList(InList::new(
881 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
882 list_expr,
883 negated,
884 )))
885 }
886
887 #[expect(clippy::too_many_arguments)]
888 fn sql_like_to_expr(
889 &self,
890 negated: bool,
891 expr: SQLExpr,
892 pattern: SQLExpr,
893 escape_char: Option<ValueWithSpan>,
894 schema: &DFSchema,
895 planner_context: &mut PlannerContext,
896 case_insensitive: bool,
897 any: bool,
898 ) -> Result<Expr> {
899 if any {
900 return not_impl_err!("ANY in LIKE expression");
901 }
902 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
903 let escape_char = match escape_char.map(|v| v.value) {
904 Some(Value::SingleQuotedString(char)) if char.len() == 1 => {
905 Some(char.chars().next().unwrap())
906 }
907 Some(value) => {
908 return plan_err!(
909 "Invalid escape character in LIKE expression. Expected a single character wrapped with single quotes, got {value}"
910 );
911 }
912 None => None,
913 };
914 Ok(Expr::Like(Like::new(
915 negated,
916 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
917 Box::new(pattern),
918 escape_char,
919 case_insensitive,
920 )))
921 }
922
923 fn sql_similarto_to_expr(
924 &self,
925 negated: bool,
926 expr: SQLExpr,
927 pattern: SQLExpr,
928 escape_char: Option<ValueWithSpan>,
929 schema: &DFSchema,
930 planner_context: &mut PlannerContext,
931 ) -> Result<Expr> {
932 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
933 let pattern_type = pattern.get_type(schema)?;
934 if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
935 return plan_err!("Invalid pattern in SIMILAR TO expression");
936 }
937 let escape_char = match escape_char.map(|v| v.value) {
938 Some(Value::SingleQuotedString(char)) if char.len() == 1 => {
939 Some(char.chars().next().unwrap())
940 }
941 Some(value) => {
942 return plan_err!(
943 "Invalid escape character in SIMILAR TO expression. Expected a single character wrapped with single quotes, got {value}"
944 );
945 }
946 None => None,
947 };
948 Ok(Expr::SimilarTo(Like::new(
949 negated,
950 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
951 Box::new(pattern),
952 escape_char,
953 false,
954 )))
955 }
956
957 fn sql_trim_to_expr(
958 &self,
959 expr: SQLExpr,
960 trim_where: Option<TrimWhereField>,
961 trim_what: Option<Box<SQLExpr>>,
962 trim_characters: Option<Vec<SQLExpr>>,
963 schema: &DFSchema,
964 planner_context: &mut PlannerContext,
965 ) -> Result<Expr> {
966 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
967 let args = match (trim_what, trim_characters) {
968 (Some(to_trim), None) => {
969 let to_trim =
970 self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
971 Ok(vec![arg, to_trim])
972 }
973 (None, Some(trim_characters)) => {
974 if let Some(first) = trim_characters.first() {
975 let to_trim = self.sql_expr_to_logical_expr(
976 first.clone(),
977 schema,
978 planner_context,
979 )?;
980 Ok(vec![arg, to_trim])
981 } else {
982 plan_err!("TRIM CHARACTERS cannot be empty")
983 }
984 }
985 (Some(_), Some(_)) => {
986 plan_err!("Both TRIM and TRIM CHARACTERS cannot be specified")
987 }
988 (None, None) => Ok(vec![arg]),
989 }?;
990
991 let fun_name = match trim_where {
992 Some(TrimWhereField::Leading) => "ltrim",
993 Some(TrimWhereField::Trailing) => "rtrim",
994 Some(TrimWhereField::Both) => "btrim",
995 None => "trim",
996 };
997 let fun = self
998 .context_provider
999 .get_function_meta(fun_name)
1000 .ok_or_else(|| {
1001 internal_datafusion_err!("Unable to find expected '{fun_name}' function")
1002 })?;
1003
1004 Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
1005 }
1006
1007 fn sql_overlay_to_expr(
1008 &self,
1009 expr: SQLExpr,
1010 overlay_what: SQLExpr,
1011 overlay_from: SQLExpr,
1012 overlay_for: Option<Box<SQLExpr>>,
1013 schema: &DFSchema,
1014 planner_context: &mut PlannerContext,
1015 ) -> Result<Expr> {
1016 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
1017 let what_arg =
1018 self.sql_expr_to_logical_expr(overlay_what, schema, planner_context)?;
1019 let from_arg =
1020 self.sql_expr_to_logical_expr(overlay_from, schema, planner_context)?;
1021 let mut overlay_args = match overlay_for {
1022 Some(for_expr) => {
1023 let for_expr =
1024 self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
1025 vec![arg, what_arg, from_arg, for_expr]
1026 }
1027 None => vec![arg, what_arg, from_arg],
1028 };
1029 for planner in self.context_provider.get_expr_planners() {
1030 match planner.plan_overlay(overlay_args)? {
1031 PlannerResult::Planned(expr) => return Ok(expr),
1032 PlannerResult::Original(args) => overlay_args = args,
1033 }
1034 }
1035 not_impl_err!("Overlay not supported by ExprPlanner: {overlay_args:?}")
1036 }
1037
1038 fn sql_cast_to_expr(
1039 &self,
1040 expr: SQLExpr,
1041 data_type: &SQLDataType,
1042 format: Option<CastFormat>,
1043 schema: &DFSchema,
1044 planner_context: &mut PlannerContext,
1045 ) -> Result<Expr> {
1046 if let Some(format) = format {
1047 return not_impl_err!("CAST with format is not supported: {format}");
1048 }
1049
1050 let dt = self.convert_data_type_to_field(data_type)?;
1051 let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
1052
1053 let expr = match dt.data_type() {
1056 DataType::Timestamp(TimeUnit::Nanosecond, tz)
1057 if expr.get_type(schema)? == DataType::Int64 =>
1058 {
1059 Expr::Cast(Cast::new(
1060 Box::new(expr),
1061 DataType::Timestamp(TimeUnit::Second, tz.clone()),
1062 ))
1063 }
1064 _ => expr,
1065 };
1066
1067 Ok(Expr::Cast(Cast::new_from_field(Box::new(expr), dt)))
1068 }
1069
1070 fn extract_root_and_access_chain(
1088 &self,
1089 root: SQLExpr,
1090 mut access_chain: Vec<AccessExpr>,
1091 schema: &DFSchema,
1092 planner_context: &mut PlannerContext,
1093 ) -> Result<(Expr, Vec<AccessExpr>)> {
1094 let SQLExpr::Identifier(root_ident) = root else {
1095 let root = self.sql_expr_to_logical_expr(root, schema, planner_context)?;
1096 return Ok((root, access_chain));
1097 };
1098
1099 let mut compound_idents = vec![root_ident];
1100 let first_non_ident = access_chain
1101 .iter()
1102 .position(|access| !matches!(access, AccessExpr::Dot(SQLExpr::Identifier(_))))
1103 .unwrap_or(access_chain.len());
1104 for access in access_chain.drain(0..first_non_ident) {
1105 if let AccessExpr::Dot(SQLExpr::Identifier(ident)) = access {
1106 compound_idents.push(ident);
1107 } else {
1108 return internal_err!("Expected identifier in access chain");
1109 }
1110 }
1111
1112 let root = if compound_idents.len() == 1 {
1113 self.sql_identifier_to_expr(
1114 compound_idents.pop().unwrap(),
1115 schema,
1116 planner_context,
1117 )?
1118 } else {
1119 self.sql_compound_identifier_to_expr(
1120 compound_idents,
1121 schema,
1122 planner_context,
1123 )?
1124 };
1125 Ok((root, access_chain))
1126 }
1127
1128 fn sql_compound_field_access_to_expr(
1129 &self,
1130 root: SQLExpr,
1131 access_chain: Vec<AccessExpr>,
1132 schema: &DFSchema,
1133 planner_context: &mut PlannerContext,
1134 ) -> Result<Expr> {
1135 let (root, access_chain) = self.extract_root_and_access_chain(
1136 root,
1137 access_chain,
1138 schema,
1139 planner_context,
1140 )?;
1141 let fields = access_chain
1142 .into_iter()
1143 .map(|field| match field {
1144 AccessExpr::Subscript(subscript) => {
1145 match subscript {
1146 Subscript::Index { index } => {
1147 match index {
1149 SQLExpr::Value(ValueWithSpan {
1150 value:
1151 Value::SingleQuotedString(s)
1152 | Value::DoubleQuotedString(s),
1153 span: _,
1154 }) => Ok(Some(GetFieldAccess::NamedStructField {
1155 name: ScalarValue::from(s),
1156 })),
1157 SQLExpr::JsonAccess { .. } => {
1158 not_impl_err!("JsonAccess")
1159 }
1160 _ => Ok(Some(GetFieldAccess::ListIndex {
1162 key: Box::new(self.sql_expr_to_logical_expr(
1163 index,
1164 schema,
1165 planner_context,
1166 )?),
1167 })),
1168 }
1169 }
1170 Subscript::Slice {
1171 lower_bound,
1172 upper_bound,
1173 stride,
1174 } => {
1175 let lower_bound = if let Some(lower_bound) = lower_bound {
1177 self.sql_expr_to_logical_expr(
1178 lower_bound,
1179 schema,
1180 planner_context,
1181 )
1182 } else {
1183 not_impl_err!("Slice subscript requires a lower bound")
1184 }?;
1185
1186 let upper_bound = if let Some(upper_bound) = upper_bound {
1188 self.sql_expr_to_logical_expr(
1189 upper_bound,
1190 schema,
1191 planner_context,
1192 )
1193 } else {
1194 not_impl_err!("Slice subscript requires an upper bound")
1195 }?;
1196
1197 let stride = if let Some(stride) = stride {
1199 self.sql_expr_to_logical_expr(
1200 stride,
1201 schema,
1202 planner_context,
1203 )?
1204 } else {
1205 lit(1i64)
1206 };
1207
1208 Ok(Some(GetFieldAccess::ListRange {
1209 start: Box::new(lower_bound),
1210 stop: Box::new(upper_bound),
1211 stride: Box::new(stride),
1212 }))
1213 }
1214 }
1215 }
1216 AccessExpr::Dot(expr) => match expr {
1217 SQLExpr::Value(ValueWithSpan {
1218 value: Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
1219 span : _
1220 }) => Ok(Some(GetFieldAccess::NamedStructField {
1221 name: ScalarValue::from(s),
1222 })),
1223 _ => {
1224 not_impl_err!(
1225 "Dot access not supported for non-string expr: {expr:?}"
1226 )
1227 }
1228 },
1229 })
1230 .collect::<Result<Vec<_>>>()?;
1231
1232 fields
1233 .into_iter()
1234 .flatten()
1235 .try_fold(root, |expr, field_access| {
1236 let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
1237 for planner in self.context_provider.get_expr_planners() {
1238 match planner.plan_field_access(field_access_expr, schema)? {
1239 PlannerResult::Planned(expr) => return Ok(expr),
1240 PlannerResult::Original(expr) => {
1241 field_access_expr = expr;
1242 }
1243 }
1244 }
1245 not_impl_err!(
1246 "GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}"
1247 )
1248 })
1249 }
1250}
1251
1252fn any_op_with_null_handling(bound: Expr, comparison: Expr, arr: Expr) -> Result<Expr> {
1262 when(bound.is_not_null(), comparison)
1263 .when(arr.is_not_null(), lit(false))
1264 .otherwise(lit(ScalarValue::Boolean(None)))
1265}
1266
1267fn plan_any_op(
1269 left_expr: Expr,
1270 right_expr: Expr,
1271 compare_op: &BinaryOperator,
1272) -> Result<Expr> {
1273 match compare_op {
1274 BinaryOperator::Eq => Ok(array_has(right_expr, left_expr)),
1275 BinaryOperator::NotEq => {
1276 let min = array_min(right_expr.clone());
1277 let max = array_max(right_expr.clone());
1278 let comparison = min
1280 .not_eq(left_expr.clone())
1281 .or(max.clone().not_eq(left_expr));
1282 any_op_with_null_handling(max, comparison, right_expr)
1283 }
1284 BinaryOperator::Gt => {
1285 let min = array_min(right_expr.clone());
1286 any_op_with_null_handling(min.clone(), min.lt(left_expr), right_expr)
1287 }
1288 BinaryOperator::Lt => {
1289 let max = array_max(right_expr.clone());
1290 any_op_with_null_handling(max.clone(), max.gt(left_expr), right_expr)
1291 }
1292 BinaryOperator::GtEq => {
1293 let min = array_min(right_expr.clone());
1294 any_op_with_null_handling(min.clone(), min.lt_eq(left_expr), right_expr)
1295 }
1296 BinaryOperator::LtEq => {
1297 let max = array_max(right_expr.clone());
1298 any_op_with_null_handling(max.clone(), max.gt_eq(left_expr), right_expr)
1299 }
1300 _ => plan_err!(
1301 "Unsupported AnyOp: '{compare_op}', only '=', '<>', '>', '<', '>=', '<=' are supported"
1302 ),
1303 }
1304}
1305
1306fn plan_all_op(
1316 needle: &Expr,
1317 haystack: &Expr,
1318 compare_op: &BinaryOperator,
1319) -> Result<Expr> {
1320 let null_arr_check = haystack.clone().is_null();
1321 let empty_check = cardinality(haystack.clone()).eq(lit(0u64));
1322 let null_lhs_check = needle.clone().is_null();
1323 let has_nulls =
1326 array_position(haystack.clone(), lit(ScalarValue::Null), lit(1i64)).is_not_null();
1327
1328 let decisive_condition = match compare_op {
1329 BinaryOperator::NotEq => array_has(haystack.clone(), needle.clone()),
1330 BinaryOperator::Eq => {
1331 let all_equal = array_min(haystack.clone())
1332 .eq(needle.clone())
1333 .and(array_max(haystack.clone()).eq(needle.clone()));
1334 Expr::Not(Box::new(all_equal))
1335 }
1336 BinaryOperator::Gt => {
1337 Expr::Not(Box::new(needle.clone().gt(array_max(haystack.clone()))))
1338 }
1339 BinaryOperator::Lt => {
1340 Expr::Not(Box::new(needle.clone().lt(array_min(haystack.clone()))))
1341 }
1342 BinaryOperator::GtEq => {
1343 Expr::Not(Box::new(needle.clone().gt_eq(array_max(haystack.clone()))))
1344 }
1345 BinaryOperator::LtEq => {
1346 Expr::Not(Box::new(needle.clone().lt_eq(array_min(haystack.clone()))))
1347 }
1348 _ => {
1349 return plan_err!(
1350 "Unsupported AllOp: '{compare_op}', only '=', '<>', '>', '<', '>=', '<=' are supported"
1351 );
1352 }
1353 };
1354
1355 let null_bool = lit(ScalarValue::Boolean(None));
1356 when(null_arr_check, null_bool.clone())
1357 .when(empty_check, lit(true))
1358 .when(null_lhs_check, null_bool.clone())
1359 .when(decisive_condition, lit(false))
1360 .when(has_nulls, null_bool)
1361 .otherwise(lit(true))
1362}
1363
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::datatypes::{Field, Schema};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;

    use datafusion_common::TableReference;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::logical_plan::builder::LogicalTableSource;
    use datafusion_expr::{
        AggregateUDF, HigherOrderUDF, ScalarUDF, TableSource, WindowUDF,
    };

    use super::*;

    /// Minimal `ContextProvider` for planner unit tests.
    ///
    /// It exposes a single table `table1`, which has one non-nullable `Utf8`
    /// column `column1`. The only registered function is the `sum` aggregate.
    struct TestContextProvider {
        options: ConfigOptions,
        tables: HashMap<String, Arc<dyn TableSource>>,
    }

    impl TestContextProvider {
        pub fn new() -> Self {
            let mut tables = HashMap::new();
            tables.insert(
                "table1".to_string(),
                create_table_source(vec![Field::new(
                    "column1".to_string(),
                    DataType::Utf8,
                    false,
                )]),
            );

            Self {
                options: Default::default(),
                tables,
            }
        }
    }

    impl ContextProvider for TestContextProvider {
        fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
            match self.tables.get(name.table()) {
                Some(table) => Ok(Arc::clone(table)),
                _ => plan_err!("Table not found: {}", name.table()),
            }
        }

        fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
            None
        }

        fn get_higher_order_meta(&self, _name: &str) -> Option<Arc<HigherOrderUDF>> {
            None
        }

        fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
            match name {
                "sum" => Some(datafusion_functions_aggregate::sum::sum_udaf()),
                _ => None,
            }
        }

        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
            None
        }

        fn options(&self) -> &ConfigOptions {
            &self.options
        }

        fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
            None
        }

        fn udf_names(&self) -> Vec<String> {
            Vec::new()
        }

        fn higher_order_function_names(&self) -> Vec<String> {
            Vec::new()
        }

        fn udaf_names(&self) -> Vec<String> {
            vec!["sum".to_string()]
        }

        fn udwf_names(&self) -> Vec<String> {
            Vec::new()
        }
    }

    /// Wraps `fields` in an in-memory `LogicalTableSource` with no schema metadata.
    fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
        Arc::new(LogicalTableSource::new(Arc::new(
            Schema::new_with_metadata(fields, HashMap::new()),
        )))
    }

    /// Generates a test that plans a chain of `$num_expr` `column1 = '...'`
    /// predicates joined by `OR`.
    ///
    /// This guards against the planner overflowing the stack on deeply nested
    /// binary expressions.
    macro_rules! test_stack_overflow {
        ($name:ident, $num_expr:expr) => {
            #[test]
            fn $name() {
                let schema = DFSchema::empty();
                let mut planner_context = PlannerContext::default();

                let expr_str = (0..$num_expr)
                    .map(|i| format!("column1 = 'value{:?}'", i))
                    .collect::<Vec<String>>()
                    .join(" OR ");

                let dialect = GenericDialect {};
                let mut parser = Parser::new(&dialect)
                    .try_with_sql(expr_str.as_str())
                    .unwrap();
                let sql_expr = parser.parse_expr().unwrap();

                let context_provider = TestContextProvider::new();
                let sql_to_rel = SqlToRel::new(&context_provider);

                sql_to_rel
                    .sql_expr_to_logical_expr(sql_expr, &schema, &mut planner_context)
                    .unwrap();
            }
        };
    }

    test_stack_overflow!(test_stack_overflow_64, 64);
    test_stack_overflow!(test_stack_overflow_128, 128);
    test_stack_overflow!(test_stack_overflow_256, 256);
    test_stack_overflow!(test_stack_overflow_512, 512);
    test_stack_overflow!(test_stack_overflow_1024, 1024);
    test_stack_overflow!(test_stack_overflow_2048, 2048);
    test_stack_overflow!(test_stack_overflow_4096, 4096);
    test_stack_overflow!(test_stack_overflow_8192, 8192);

    /// Parses `sql` with `parse_expr_with_alias` and plans it against an empty
    /// schema through `sql_expr_to_logical_expr_with_alias`.
    fn plan_with_alias(sql: &str) -> Expr {
        let schema = DFSchema::empty();
        let mut planner_context = PlannerContext::default();

        let dialect = GenericDialect {};
        let mut parser = Parser::new(&dialect).try_with_sql(sql).unwrap();
        let sql_expr = parser.parse_expr_with_alias().unwrap();

        let context_provider = TestContextProvider::new();
        let sql_to_rel = SqlToRel::new(&context_provider);

        sql_to_rel
            .sql_expr_to_logical_expr_with_alias(sql_expr, &schema, &mut planner_context)
            .unwrap()
    }

    #[test]
    fn test_sql_to_expr_with_alias() {
        // Check the alias name as well as the variant, so a wrong or dropped
        // alias name is caught.
        match plan_with_alias("SUM(int_col) as sum_int_col") {
            Expr::Alias(alias) => assert_eq!(alias.name, "sum_int_col"),
            other => panic!("expected an alias expression, got {other:?}"),
        }
    }

    #[test]
    fn test_sql_to_expr_without_alias() {
        // Without `AS`, the planned expression must not be wrapped in an alias.
        let expr = plan_with_alias("SUM(int_col)");
        assert!(!matches!(expr, Expr::Alias(_)));
    }
}