1use arrow::datatypes::{DataType, TimeUnit};
19use datafusion_expr::planner::{
20 PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr,
21};
22use sqlparser::ast::{
23 AccessExpr, BinaryOperator, CastFormat, CastKind, CeilFloorKind,
24 DataType as SQLDataType, DateTimeField, DictionaryField, Expr as SQLExpr,
25 ExprWithAlias as SQLExprWithAlias, JsonPath, MapEntry, StructField, Subscript,
26 TrimWhereField, TypedString, Value, ValueWithSpan,
27};
28
29use datafusion_common::{
30 DFSchema, Result, ScalarValue, internal_datafusion_err, internal_err, not_impl_err,
31 plan_err,
32};
33
34use datafusion_expr::expr::ScalarFunction;
35use datafusion_expr::expr::SetQuantifier;
36use datafusion_expr::expr::{InList, WildcardOptions};
37use datafusion_expr::{
38 Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like, Literal,
39 Operator, TryCast, lit,
40};
41
42use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
43use datafusion_functions_nested::expr_fn::array_has;
44
45mod binary_op;
46mod function;
47mod grouping_set;
48mod identifier;
49mod order_by;
50mod subquery;
51mod substring;
52mod unary_op;
53mod value;
54
55impl<S: ContextProvider> SqlToRel<'_, S> {
56 pub(crate) fn sql_expr_to_logical_expr_with_alias(
57 &self,
58 sql: SQLExprWithAlias,
59 schema: &DFSchema,
60 planner_context: &mut PlannerContext,
61 ) -> Result<Expr> {
62 let mut expr =
63 self.sql_expr_to_logical_expr(sql.expr, schema, planner_context)?;
64 if let Some(alias) = sql.alias {
65 expr = expr.alias(alias.value);
66 }
67 Ok(expr)
68 }
69 pub(crate) fn sql_expr_to_logical_expr(
70 &self,
71 sql: SQLExpr,
72 schema: &DFSchema,
73 planner_context: &mut PlannerContext,
74 ) -> Result<Expr> {
75 enum StackEntry {
76 SQLExpr(Box<SQLExpr>),
77 Operator(BinaryOperator),
78 }
79
80 let mut stack = vec![StackEntry::SQLExpr(Box::new(sql))];
85 let mut eval_stack = vec![];
86
87 while let Some(entry) = stack.pop() {
88 match entry {
89 StackEntry::SQLExpr(sql_expr) => {
90 match *sql_expr {
91 SQLExpr::BinaryOp { left, op, right } => {
92 stack.push(StackEntry::Operator(op));
95 stack.push(StackEntry::SQLExpr(right));
96 stack.push(StackEntry::SQLExpr(left));
97 }
98 _ => {
99 let expr = self.sql_expr_to_logical_expr_internal(
100 *sql_expr,
101 schema,
102 planner_context,
103 )?;
104 eval_stack.push(expr);
105 }
106 }
107 }
108 StackEntry::Operator(op) => {
109 let right = eval_stack.pop().unwrap();
110 let left = eval_stack.pop().unwrap();
111 let expr = self.build_logical_expr(op, left, right, schema)?;
112 eval_stack.push(expr);
113 }
114 }
115 }
116
117 assert_eq!(1, eval_stack.len());
118 let expr = eval_stack.pop().unwrap();
119 Ok(expr)
120 }
121
122 fn build_logical_expr(
123 &self,
124 op: BinaryOperator,
125 left: Expr,
126 right: Expr,
127 schema: &DFSchema,
128 ) -> Result<Expr> {
129 let mut binary_expr = RawBinaryExpr { op, left, right };
131 for planner in self.context_provider.get_expr_planners() {
132 match planner.plan_binary_op(binary_expr, schema)? {
133 PlannerResult::Planned(expr) => {
134 return Ok(expr);
135 }
136 PlannerResult::Original(expr) => {
137 binary_expr = expr;
138 }
139 }
140 }
141
142 let RawBinaryExpr { op, left, right } = binary_expr;
143 Ok(Expr::BinaryExpr(BinaryExpr::new(
144 Box::new(left),
145 self.parse_sql_binary_op(&op)?,
146 Box::new(right),
147 )))
148 }
149
150 pub fn sql_to_expr_with_alias(
151 &self,
152 sql: SQLExprWithAlias,
153 schema: &DFSchema,
154 planner_context: &mut PlannerContext,
155 ) -> Result<Expr> {
156 let mut expr =
157 self.sql_expr_to_logical_expr_with_alias(sql, schema, planner_context)?;
158 expr = self.rewrite_partial_qualifier(expr, schema);
159 self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
160 let (expr, _) = expr.infer_placeholder_types(schema)?;
161 Ok(expr)
162 }
163
164 pub fn sql_to_expr(
166 &self,
167 sql: SQLExpr,
168 schema: &DFSchema,
169 planner_context: &mut PlannerContext,
170 ) -> Result<Expr> {
171 let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?;
173 expr = self.rewrite_partial_qualifier(expr, schema);
174 self.validate_schema_satisfies_exprs(schema, std::slice::from_ref(&expr))?;
175 let (expr, _) = expr.infer_placeholder_types(schema)?;
176 Ok(expr)
177 }
178
179 fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchema) -> Expr {
181 match expr {
182 Expr::Column(col) => match &col.relation {
183 Some(q) => {
184 match schema.iter().find(|(qualifier, field)| match qualifier {
185 Some(field_q) => {
186 field.name() == &col.name
187 && field_q.to_string().ends_with(&format!(".{q}"))
188 }
189 _ => false,
190 }) {
191 Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
192 None => Expr::Column(col),
193 }
194 }
195 None => Expr::Column(col),
196 },
197 _ => expr,
198 }
199 }
200
201 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
204 fn sql_expr_to_logical_expr_internal(
205 &self,
206 sql: SQLExpr,
207 schema: &DFSchema,
208 planner_context: &mut PlannerContext,
209 ) -> Result<Expr> {
210 match sql {
216 SQLExpr::Value(value) => {
217 self.parse_value(value.into(), planner_context.prepare_param_data_types())
218 }
219 SQLExpr::Extract { field, expr, .. } => {
220 let mut extract_args = vec![
221 Expr::Literal(ScalarValue::from(format!("{field}")), None),
222 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
223 ];
224
225 for planner in self.context_provider.get_expr_planners() {
226 match planner.plan_extract(extract_args)? {
227 PlannerResult::Planned(expr) => return Ok(expr),
228 PlannerResult::Original(args) => {
229 extract_args = args;
230 }
231 }
232 }
233
234 not_impl_err!("Extract not supported by ExprPlanner: {extract_args:?}")
235 }
236
237 SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
238 SQLExpr::Interval(interval) => self.sql_interval_to_expr(false, interval),
239 SQLExpr::Identifier(id) => {
240 self.sql_identifier_to_expr(id, schema, planner_context)
241 }
242
243 SQLExpr::CompoundFieldAccess { root, access_chain } => self
245 .sql_compound_field_access_to_expr(
246 *root,
247 access_chain,
248 schema,
249 planner_context,
250 ),
251
252 SQLExpr::CompoundIdentifier(ids) => {
253 self.sql_compound_identifier_to_expr(ids, schema, planner_context)
254 }
255
256 SQLExpr::Case {
257 operand,
258 conditions,
259 else_result,
260 case_token: _,
261 end_token: _,
262 } => self.sql_case_identifier_to_expr(
263 operand,
264 conditions,
265 else_result,
266 schema,
267 planner_context,
268 ),
269
270 SQLExpr::Cast { array: true, .. } => {
271 not_impl_err!("`CAST(... AS type ARRAY`) not supported")
272 }
273
274 SQLExpr::Cast {
275 kind: CastKind::Cast | CastKind::DoubleColon,
276 expr,
277 data_type,
278 format,
279 array: false,
280 } => {
281 self.sql_cast_to_expr(*expr, &data_type, format, schema, planner_context)
282 }
283
284 SQLExpr::Cast {
285 kind: CastKind::TryCast | CastKind::SafeCast,
286 expr,
287 data_type,
288 format,
289 array: false,
290 } => {
291 if let Some(format) = format {
292 return not_impl_err!("CAST with format is not supported: {format}");
293 }
294
295 Ok(Expr::TryCast(TryCast::new(
296 Box::new(self.sql_expr_to_logical_expr(
297 *expr,
298 schema,
299 planner_context,
300 )?),
301 self.convert_data_type_to_field(&data_type)?
302 .data_type()
303 .clone(),
304 )))
305 }
306
307 SQLExpr::TypedString(TypedString {
308 data_type,
309 value,
310 uses_odbc_syntax: _,
311 }) => Ok(Expr::Cast(Cast::new(
312 Box::new(lit(value.into_string().unwrap())),
313 self.convert_data_type_to_field(&data_type)?
314 .data_type()
315 .clone(),
316 ))),
317
318 SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
319 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
320 ))),
321
322 SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
323 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
324 ))),
325
326 SQLExpr::IsDistinctFrom(left, right) => {
327 Ok(Expr::BinaryExpr(BinaryExpr::new(
328 Box::new(self.sql_expr_to_logical_expr(
329 *left,
330 schema,
331 planner_context,
332 )?),
333 Operator::IsDistinctFrom,
334 Box::new(self.sql_expr_to_logical_expr(
335 *right,
336 schema,
337 planner_context,
338 )?),
339 )))
340 }
341
342 SQLExpr::IsNotDistinctFrom(left, right) => {
343 Ok(Expr::BinaryExpr(BinaryExpr::new(
344 Box::new(self.sql_expr_to_logical_expr(
345 *left,
346 schema,
347 planner_context,
348 )?),
349 Operator::IsNotDistinctFrom,
350 Box::new(self.sql_expr_to_logical_expr(
351 *right,
352 schema,
353 planner_context,
354 )?),
355 )))
356 }
357
358 SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(
359 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
360 ))),
361
362 SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(
363 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
364 ))),
365
366 SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(
367 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
368 ))),
369
370 SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(
371 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
372 ))),
373
374 SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(
375 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
376 ))),
377
378 SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(
379 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
380 ))),
381
382 SQLExpr::UnaryOp { op, expr } => {
383 self.parse_sql_unary_op(op, *expr, schema, planner_context)
384 }
385
386 SQLExpr::Between {
387 expr,
388 negated,
389 low,
390 high,
391 } => Ok(Expr::Between(Between::new(
392 Box::new(self.sql_expr_to_logical_expr(
393 *expr,
394 schema,
395 planner_context,
396 )?),
397 negated,
398 Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
399 Box::new(self.sql_expr_to_logical_expr(
400 *high,
401 schema,
402 planner_context,
403 )?),
404 ))),
405
406 SQLExpr::InList {
407 expr,
408 list,
409 negated,
410 } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
411
412 SQLExpr::Like {
413 negated,
414 expr,
415 pattern,
416 escape_char,
417 any,
418 } => self.sql_like_to_expr(
419 negated,
420 *expr,
421 *pattern,
422 escape_char,
423 schema,
424 planner_context,
425 false,
426 any,
427 ),
428
429 SQLExpr::ILike {
430 negated,
431 expr,
432 pattern,
433 escape_char,
434 any,
435 } => self.sql_like_to_expr(
436 negated,
437 *expr,
438 *pattern,
439 escape_char,
440 schema,
441 planner_context,
442 true,
443 any,
444 ),
445
446 SQLExpr::SimilarTo {
447 negated,
448 expr,
449 pattern,
450 escape_char,
451 } => self.sql_similarto_to_expr(
452 negated,
453 *expr,
454 *pattern,
455 escape_char,
456 schema,
457 planner_context,
458 ),
459
460 SQLExpr::BinaryOp { .. } => {
461 internal_err!("binary_op should be handled by sql_expr_to_logical_expr.")
462 }
463
464 #[cfg(feature = "unicode_expressions")]
465 SQLExpr::Substring {
466 expr,
467 substring_from,
468 substring_for,
469 special: _,
470 shorthand: _,
471 } => self.sql_substring_to_expr(
472 expr,
473 substring_from,
474 substring_for,
475 schema,
476 planner_context,
477 ),
478
479 #[cfg(not(feature = "unicode_expressions"))]
480 SQLExpr::Substring { .. } => {
481 internal_err!(
482 "statement substring requires compilation with feature flag: unicode_expressions."
483 )
484 }
485
486 SQLExpr::Trim {
487 expr,
488 trim_where,
489 trim_what,
490 trim_characters,
491 } => self.sql_trim_to_expr(
492 *expr,
493 trim_where,
494 trim_what,
495 trim_characters,
496 schema,
497 planner_context,
498 ),
499
500 SQLExpr::Function(function) => {
501 self.sql_function_to_expr(function, schema, planner_context)
502 }
503
504 SQLExpr::Rollup(exprs) => {
505 self.sql_rollup_to_expr(exprs, schema, planner_context)
506 }
507 SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema, planner_context),
508 SQLExpr::GroupingSets(exprs) => {
509 self.sql_grouping_sets_to_expr(exprs, schema, planner_context)
510 }
511
512 SQLExpr::Floor { expr, field } => match field {
513 CeilFloorKind::DateTimeField(DateTimeField::NoDateTime) => {
514 self.sql_fn_name_to_expr(*expr, "floor", schema, planner_context)
515 }
516 CeilFloorKind::DateTimeField(_) => {
517 not_impl_err!("FLOOR with datetime is not supported")
518 }
519 CeilFloorKind::Scale(_) => {
520 not_impl_err!("FLOOR with scale is not supported")
521 }
522 },
523 SQLExpr::Ceil { expr, field } => match field {
524 CeilFloorKind::DateTimeField(DateTimeField::NoDateTime) => {
525 self.sql_fn_name_to_expr(*expr, "ceil", schema, planner_context)
526 }
527 CeilFloorKind::DateTimeField(_) => {
528 not_impl_err!("CEIL with datetime is not supported")
529 }
530 CeilFloorKind::Scale(_) => {
531 not_impl_err!("CEIL with scale is not supported")
532 }
533 },
534 SQLExpr::Overlay {
535 expr,
536 overlay_what,
537 overlay_from,
538 overlay_for,
539 } => self.sql_overlay_to_expr(
540 *expr,
541 *overlay_what,
542 *overlay_from,
543 overlay_for,
544 schema,
545 planner_context,
546 ),
547 SQLExpr::Nested(e) => {
548 self.sql_expr_to_logical_expr(*e, schema, planner_context)
549 }
550
551 SQLExpr::Exists { subquery, negated } => {
552 self.parse_exists_subquery(*subquery, negated, schema, planner_context)
553 }
554 SQLExpr::InSubquery {
555 expr,
556 subquery,
557 negated,
558 } => {
559 self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context)
560 }
561 SQLExpr::Subquery(subquery) => {
562 self.parse_scalar_subquery(*subquery, schema, planner_context)
563 }
564
565 SQLExpr::Struct { values, fields } => {
566 self.parse_struct(schema, planner_context, values, &fields)
567 }
568 SQLExpr::Position { expr, r#in } => {
569 self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
570 }
571 SQLExpr::AtTimeZone {
572 timestamp,
573 time_zone,
574 } => Ok(Expr::Cast(Cast::new(
575 Box::new(self.sql_expr_to_logical_expr_internal(
576 *timestamp,
577 schema,
578 planner_context,
579 )?),
580 match *time_zone {
581 SQLExpr::Value(ValueWithSpan {
582 value: Value::SingleQuotedString(s),
583 span: _,
584 }) => DataType::Timestamp(TimeUnit::Nanosecond, Some(s.into())),
585 _ => {
586 return not_impl_err!(
587 "Unsupported ast node in sqltorel: {time_zone:?}"
588 );
589 }
590 },
591 ))),
592 SQLExpr::Dictionary(fields) => {
593 self.try_plan_dictionary_literal(fields, schema, planner_context)
594 }
595 SQLExpr::Map(map) => {
596 self.try_plan_map_literal(map.entries, schema, planner_context)
597 }
598 SQLExpr::AnyOp {
599 left,
600 compare_op,
601 right,
602 is_some: _,
605 } => match *right {
606 SQLExpr::Subquery(subquery) => self.parse_set_comparison_subquery(
607 *left,
608 *subquery,
609 &compare_op,
610 SetQuantifier::Any,
611 schema,
612 planner_context,
613 ),
614 _ => {
615 if compare_op != BinaryOperator::Eq {
616 plan_err!(
617 "Unsupported AnyOp: '{compare_op}', only '=' is supported"
618 )
619 } else {
620 let left_expr =
621 self.sql_to_expr(*left, schema, planner_context)?;
622 let right_expr =
623 self.sql_to_expr(*right, schema, planner_context)?;
624 Ok(array_has(right_expr, left_expr))
625 }
626 }
627 },
628 SQLExpr::AllOp {
629 left,
630 compare_op,
631 right,
632 } => match *right {
633 SQLExpr::Subquery(subquery) => self.parse_set_comparison_subquery(
634 *left,
635 *subquery,
636 &compare_op,
637 SetQuantifier::All,
638 schema,
639 planner_context,
640 ),
641 _ => not_impl_err!("ALL only supports subquery comparison currently"),
642 },
643 #[expect(deprecated)]
644 SQLExpr::Wildcard(_token) => Ok(Expr::Wildcard {
645 qualifier: None,
646 options: Box::new(WildcardOptions::default()),
647 }),
648 #[expect(deprecated)]
649 SQLExpr::QualifiedWildcard(object_name, _token) => Ok(Expr::Wildcard {
650 qualifier: Some(self.object_name_to_table_reference(object_name)?),
651 options: Box::new(WildcardOptions::default()),
652 }),
653 SQLExpr::Tuple(values) => self.parse_tuple(schema, planner_context, values),
654 SQLExpr::JsonAccess { value, path } => {
655 self.parse_json_access(schema, planner_context, value, &path)
656 }
657 _ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
658 }
659 }
660
661 fn parse_json_access(
662 &self,
663 schema: &DFSchema,
664 planner_context: &mut PlannerContext,
665 value: Box<SQLExpr>,
666 path: &JsonPath,
667 ) -> Result<Expr> {
668 let json_path = path.to_string();
669 let json_path = if let Some(json_path) = json_path.strip_prefix(":") {
670 json_path.to_owned()
672 } else {
673 json_path
674 };
675 self.build_logical_expr(
676 BinaryOperator::Custom(":".to_owned()),
677 self.sql_to_expr(*value, schema, planner_context)?,
678 Expr::Literal(ScalarValue::Utf8(Some(json_path)), None),
680 schema,
681 )
682 }
683
684 fn parse_struct(
686 &self,
687 schema: &DFSchema,
688 planner_context: &mut PlannerContext,
689 values: Vec<SQLExpr>,
690 fields: &[StructField],
691 ) -> Result<Expr> {
692 if !fields.is_empty() {
693 return not_impl_err!("Struct fields are not supported yet");
694 }
695 let is_named_struct = values
696 .iter()
697 .any(|value| matches!(value, SQLExpr::Named { .. }));
698
699 let mut create_struct_args = if is_named_struct {
700 self.create_named_struct_expr(values, schema, planner_context)?
701 } else {
702 self.create_struct_expr(values, schema, planner_context)?
703 };
704
705 for planner in self.context_provider.get_expr_planners() {
706 match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
707 PlannerResult::Planned(expr) => return Ok(expr),
708 PlannerResult::Original(args) => create_struct_args = args,
709 }
710 }
711 not_impl_err!("Struct not supported by ExprPlanner: {create_struct_args:?}")
712 }
713
714 fn parse_tuple(
715 &self,
716 schema: &DFSchema,
717 planner_context: &mut PlannerContext,
718 values: Vec<SQLExpr>,
719 ) -> Result<Expr> {
720 match values.first() {
721 Some(SQLExpr::Identifier(_))
722 | Some(SQLExpr::Value(_))
723 | Some(SQLExpr::CompoundIdentifier(_)) => {
724 self.parse_struct(schema, planner_context, values, &[])
725 }
726 None => not_impl_err!("Empty tuple not supported yet"),
727 _ => {
728 not_impl_err!("Only identifiers and literals are supported in tuples")
729 }
730 }
731 }
732
733 fn sql_position_to_expr(
734 &self,
735 substr_expr: SQLExpr,
736 str_expr: SQLExpr,
737 schema: &DFSchema,
738 planner_context: &mut PlannerContext,
739 ) -> Result<Expr> {
740 let substr =
741 self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
742 let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;
743 let mut position_args = vec![fullstr, substr];
744 for planner in self.context_provider.get_expr_planners() {
745 match planner.plan_position(position_args)? {
746 PlannerResult::Planned(expr) => return Ok(expr),
747 PlannerResult::Original(args) => {
748 position_args = args;
749 }
750 }
751 }
752
753 not_impl_err!("Position not supported by ExprPlanner: {position_args:?}")
754 }
755
756 fn try_plan_dictionary_literal(
757 &self,
758 fields: Vec<DictionaryField>,
759 schema: &DFSchema,
760 planner_context: &mut PlannerContext,
761 ) -> Result<Expr> {
762 let mut keys = vec![];
763 let mut values = vec![];
764 for field in fields {
765 let key = lit(field.key.value);
766 let value =
767 self.sql_expr_to_logical_expr(*field.value, schema, planner_context)?;
768 keys.push(key);
769 values.push(value);
770 }
771
772 let mut raw_expr = RawDictionaryExpr { keys, values };
773
774 for planner in self.context_provider.get_expr_planners() {
775 match planner.plan_dictionary_literal(raw_expr, schema)? {
776 PlannerResult::Planned(expr) => {
777 return Ok(expr);
778 }
779 PlannerResult::Original(expr) => raw_expr = expr,
780 }
781 }
782 not_impl_err!("Dictionary not supported by ExprPlanner: {raw_expr:?}")
783 }
784
785 fn try_plan_map_literal(
786 &self,
787 entries: Vec<MapEntry>,
788 schema: &DFSchema,
789 planner_context: &mut PlannerContext,
790 ) -> Result<Expr> {
791 let mut exprs: Vec<_> = entries
792 .into_iter()
793 .flat_map(|entry| vec![entry.key, entry.value].into_iter())
794 .map(|expr| self.sql_expr_to_logical_expr(*expr, schema, planner_context))
795 .collect::<Result<Vec<_>>>()?;
796 for planner in self.context_provider.get_expr_planners() {
797 match planner.plan_make_map(exprs)? {
798 PlannerResult::Planned(expr) => {
799 return Ok(expr);
800 }
801 PlannerResult::Original(expr) => exprs = expr,
802 }
803 }
804 not_impl_err!("MAP not supported by ExprPlanner: {exprs:?}")
805 }
806
807 fn create_named_struct_expr(
810 &self,
811 values: Vec<SQLExpr>,
812 input_schema: &DFSchema,
813 planner_context: &mut PlannerContext,
814 ) -> Result<Vec<Expr>> {
815 Ok(values
816 .into_iter()
817 .enumerate()
818 .map(|(i, value)| {
819 let args = if let SQLExpr::Named { expr, name } = value {
820 [
821 name.value.lit(),
822 self.sql_expr_to_logical_expr(
823 *expr,
824 input_schema,
825 planner_context,
826 )?,
827 ]
828 } else {
829 [
830 format!("c{i}").lit(),
831 self.sql_expr_to_logical_expr(
832 value,
833 input_schema,
834 planner_context,
835 )?,
836 ]
837 };
838
839 Ok(args)
840 })
841 .collect::<Result<Vec<_>>>()?
842 .into_iter()
843 .flatten()
844 .collect())
845 }
846
847 fn create_struct_expr(
851 &self,
852 values: Vec<SQLExpr>,
853 input_schema: &DFSchema,
854 planner_context: &mut PlannerContext,
855 ) -> Result<Vec<Expr>> {
856 values
857 .into_iter()
858 .map(|value| {
859 self.sql_expr_to_logical_expr(value, input_schema, planner_context)
860 })
861 .collect::<Result<Vec<_>>>()
862 }
863
864 fn sql_in_list_to_expr(
865 &self,
866 expr: SQLExpr,
867 list: Vec<SQLExpr>,
868 negated: bool,
869 schema: &DFSchema,
870 planner_context: &mut PlannerContext,
871 ) -> Result<Expr> {
872 let list_expr = list
873 .into_iter()
874 .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
875 .collect::<Result<Vec<_>>>()?;
876
877 Ok(Expr::InList(InList::new(
878 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
879 list_expr,
880 negated,
881 )))
882 }
883
884 #[expect(clippy::too_many_arguments)]
885 fn sql_like_to_expr(
886 &self,
887 negated: bool,
888 expr: SQLExpr,
889 pattern: SQLExpr,
890 escape_char: Option<Value>,
891 schema: &DFSchema,
892 planner_context: &mut PlannerContext,
893 case_insensitive: bool,
894 any: bool,
895 ) -> Result<Expr> {
896 if any {
897 return not_impl_err!("ANY in LIKE expression");
898 }
899 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
900 let escape_char = match escape_char {
901 Some(Value::SingleQuotedString(char)) if char.len() == 1 => {
902 Some(char.chars().next().unwrap())
903 }
904 Some(value) => {
905 return plan_err!(
906 "Invalid escape character in LIKE expression. Expected a single character wrapped with single quotes, got {value}"
907 );
908 }
909 None => None,
910 };
911 Ok(Expr::Like(Like::new(
912 negated,
913 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
914 Box::new(pattern),
915 escape_char,
916 case_insensitive,
917 )))
918 }
919
920 fn sql_similarto_to_expr(
921 &self,
922 negated: bool,
923 expr: SQLExpr,
924 pattern: SQLExpr,
925 escape_char: Option<Value>,
926 schema: &DFSchema,
927 planner_context: &mut PlannerContext,
928 ) -> Result<Expr> {
929 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
930 let pattern_type = pattern.get_type(schema)?;
931 if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
932 return plan_err!("Invalid pattern in SIMILAR TO expression");
933 }
934 let escape_char = match escape_char {
935 Some(Value::SingleQuotedString(char)) if char.len() == 1 => {
936 Some(char.chars().next().unwrap())
937 }
938 Some(value) => {
939 return plan_err!(
940 "Invalid escape character in SIMILAR TO expression. Expected a single character wrapped with single quotes, got {value}"
941 );
942 }
943 None => None,
944 };
945 Ok(Expr::SimilarTo(Like::new(
946 negated,
947 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
948 Box::new(pattern),
949 escape_char,
950 false,
951 )))
952 }
953
954 fn sql_trim_to_expr(
955 &self,
956 expr: SQLExpr,
957 trim_where: Option<TrimWhereField>,
958 trim_what: Option<Box<SQLExpr>>,
959 trim_characters: Option<Vec<SQLExpr>>,
960 schema: &DFSchema,
961 planner_context: &mut PlannerContext,
962 ) -> Result<Expr> {
963 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
964 let args = match (trim_what, trim_characters) {
965 (Some(to_trim), None) => {
966 let to_trim =
967 self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
968 Ok(vec![arg, to_trim])
969 }
970 (None, Some(trim_characters)) => {
971 if let Some(first) = trim_characters.first() {
972 let to_trim = self.sql_expr_to_logical_expr(
973 first.clone(),
974 schema,
975 planner_context,
976 )?;
977 Ok(vec![arg, to_trim])
978 } else {
979 plan_err!("TRIM CHARACTERS cannot be empty")
980 }
981 }
982 (Some(_), Some(_)) => {
983 plan_err!("Both TRIM and TRIM CHARACTERS cannot be specified")
984 }
985 (None, None) => Ok(vec![arg]),
986 }?;
987
988 let fun_name = match trim_where {
989 Some(TrimWhereField::Leading) => "ltrim",
990 Some(TrimWhereField::Trailing) => "rtrim",
991 Some(TrimWhereField::Both) => "btrim",
992 None => "trim",
993 };
994 let fun = self
995 .context_provider
996 .get_function_meta(fun_name)
997 .ok_or_else(|| {
998 internal_datafusion_err!("Unable to find expected '{fun_name}' function")
999 })?;
1000
1001 Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
1002 }
1003
1004 fn sql_overlay_to_expr(
1005 &self,
1006 expr: SQLExpr,
1007 overlay_what: SQLExpr,
1008 overlay_from: SQLExpr,
1009 overlay_for: Option<Box<SQLExpr>>,
1010 schema: &DFSchema,
1011 planner_context: &mut PlannerContext,
1012 ) -> Result<Expr> {
1013 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
1014 let what_arg =
1015 self.sql_expr_to_logical_expr(overlay_what, schema, planner_context)?;
1016 let from_arg =
1017 self.sql_expr_to_logical_expr(overlay_from, schema, planner_context)?;
1018 let mut overlay_args = match overlay_for {
1019 Some(for_expr) => {
1020 let for_expr =
1021 self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
1022 vec![arg, what_arg, from_arg, for_expr]
1023 }
1024 None => vec![arg, what_arg, from_arg],
1025 };
1026 for planner in self.context_provider.get_expr_planners() {
1027 match planner.plan_overlay(overlay_args)? {
1028 PlannerResult::Planned(expr) => return Ok(expr),
1029 PlannerResult::Original(args) => overlay_args = args,
1030 }
1031 }
1032 not_impl_err!("Overlay not supported by ExprPlanner: {overlay_args:?}")
1033 }
1034
1035 fn sql_cast_to_expr(
1036 &self,
1037 expr: SQLExpr,
1038 data_type: &SQLDataType,
1039 format: Option<CastFormat>,
1040 schema: &DFSchema,
1041 planner_context: &mut PlannerContext,
1042 ) -> Result<Expr> {
1043 if let Some(format) = format {
1044 return not_impl_err!("CAST with format is not supported: {format}");
1045 }
1046
1047 let dt = self.convert_data_type_to_field(data_type)?;
1048 let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
1049
1050 let expr = match dt.data_type() {
1053 DataType::Timestamp(TimeUnit::Nanosecond, tz)
1054 if expr.get_type(schema)? == DataType::Int64 =>
1055 {
1056 Expr::Cast(Cast::new(
1057 Box::new(expr),
1058 DataType::Timestamp(TimeUnit::Second, tz.clone()),
1059 ))
1060 }
1061 _ => expr,
1062 };
1063
1064 Ok(Expr::Cast(Cast::new(
1067 Box::new(expr),
1068 dt.data_type().clone(),
1069 )))
1070 }
1071
1072 fn extract_root_and_access_chain(
1090 &self,
1091 root: SQLExpr,
1092 mut access_chain: Vec<AccessExpr>,
1093 schema: &DFSchema,
1094 planner_context: &mut PlannerContext,
1095 ) -> Result<(Expr, Vec<AccessExpr>)> {
1096 let SQLExpr::Identifier(root_ident) = root else {
1097 let root = self.sql_expr_to_logical_expr(root, schema, planner_context)?;
1098 return Ok((root, access_chain));
1099 };
1100
1101 let mut compound_idents = vec![root_ident];
1102 let first_non_ident = access_chain
1103 .iter()
1104 .position(|access| !matches!(access, AccessExpr::Dot(SQLExpr::Identifier(_))))
1105 .unwrap_or(access_chain.len());
1106 for access in access_chain.drain(0..first_non_ident) {
1107 if let AccessExpr::Dot(SQLExpr::Identifier(ident)) = access {
1108 compound_idents.push(ident);
1109 } else {
1110 return internal_err!("Expected identifier in access chain");
1111 }
1112 }
1113
1114 let root = if compound_idents.len() == 1 {
1115 self.sql_identifier_to_expr(
1116 compound_idents.pop().unwrap(),
1117 schema,
1118 planner_context,
1119 )?
1120 } else {
1121 self.sql_compound_identifier_to_expr(
1122 compound_idents,
1123 schema,
1124 planner_context,
1125 )?
1126 };
1127 Ok((root, access_chain))
1128 }
1129
1130 fn sql_compound_field_access_to_expr(
1131 &self,
1132 root: SQLExpr,
1133 access_chain: Vec<AccessExpr>,
1134 schema: &DFSchema,
1135 planner_context: &mut PlannerContext,
1136 ) -> Result<Expr> {
1137 let (root, access_chain) = self.extract_root_and_access_chain(
1138 root,
1139 access_chain,
1140 schema,
1141 planner_context,
1142 )?;
1143 let fields = access_chain
1144 .into_iter()
1145 .map(|field| match field {
1146 AccessExpr::Subscript(subscript) => {
1147 match subscript {
1148 Subscript::Index { index } => {
1149 match index {
1151 SQLExpr::Value(ValueWithSpan {
1152 value:
1153 Value::SingleQuotedString(s)
1154 | Value::DoubleQuotedString(s),
1155 span: _,
1156 }) => Ok(Some(GetFieldAccess::NamedStructField {
1157 name: ScalarValue::from(s),
1158 })),
1159 SQLExpr::JsonAccess { .. } => {
1160 not_impl_err!("JsonAccess")
1161 }
1162 _ => Ok(Some(GetFieldAccess::ListIndex {
1164 key: Box::new(self.sql_expr_to_logical_expr(
1165 index,
1166 schema,
1167 planner_context,
1168 )?),
1169 })),
1170 }
1171 }
1172 Subscript::Slice {
1173 lower_bound,
1174 upper_bound,
1175 stride,
1176 } => {
1177 let lower_bound = if let Some(lower_bound) = lower_bound {
1179 self.sql_expr_to_logical_expr(
1180 lower_bound,
1181 schema,
1182 planner_context,
1183 )
1184 } else {
1185 not_impl_err!("Slice subscript requires a lower bound")
1186 }?;
1187
1188 let upper_bound = if let Some(upper_bound) = upper_bound {
1190 self.sql_expr_to_logical_expr(
1191 upper_bound,
1192 schema,
1193 planner_context,
1194 )
1195 } else {
1196 not_impl_err!("Slice subscript requires an upper bound")
1197 }?;
1198
1199 let stride = if let Some(stride) = stride {
1201 self.sql_expr_to_logical_expr(
1202 stride,
1203 schema,
1204 planner_context,
1205 )?
1206 } else {
1207 lit(1i64)
1208 };
1209
1210 Ok(Some(GetFieldAccess::ListRange {
1211 start: Box::new(lower_bound),
1212 stop: Box::new(upper_bound),
1213 stride: Box::new(stride),
1214 }))
1215 }
1216 }
1217 }
1218 AccessExpr::Dot(expr) => match expr {
1219 SQLExpr::Value(ValueWithSpan {
1220 value: Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
1221 span : _
1222 }) => Ok(Some(GetFieldAccess::NamedStructField {
1223 name: ScalarValue::from(s),
1224 })),
1225 _ => {
1226 not_impl_err!(
1227 "Dot access not supported for non-string expr: {expr:?}"
1228 )
1229 }
1230 },
1231 })
1232 .collect::<Result<Vec<_>>>()?;
1233
1234 fields
1235 .into_iter()
1236 .flatten()
1237 .try_fold(root, |expr, field_access| {
1238 let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
1239 for planner in self.context_provider.get_expr_planners() {
1240 match planner.plan_field_access(field_access_expr, schema)? {
1241 PlannerResult::Planned(expr) => return Ok(expr),
1242 PlannerResult::Original(expr) => {
1243 field_access_expr = expr;
1244 }
1245 }
1246 }
1247 not_impl_err!(
1248 "GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}"
1249 )
1250 })
1251 }
1252}
1253
1254#[cfg(test)]
1255mod tests {
1256 use std::collections::HashMap;
1257 use std::sync::Arc;
1258
1259 use arrow::datatypes::{Field, Schema};
1260 use sqlparser::dialect::GenericDialect;
1261 use sqlparser::parser::Parser;
1262
1263 use datafusion_common::TableReference;
1264 use datafusion_common::config::ConfigOptions;
1265 use datafusion_expr::logical_plan::builder::LogicalTableSource;
1266 use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
1267
1268 use super::*;
1269
1270 struct TestContextProvider {
1271 options: ConfigOptions,
1272 tables: HashMap<String, Arc<dyn TableSource>>,
1273 }
1274
1275 impl TestContextProvider {
1276 pub fn new() -> Self {
1277 let mut tables = HashMap::new();
1278 tables.insert(
1279 "table1".to_string(),
1280 create_table_source(vec![Field::new(
1281 "column1".to_string(),
1282 DataType::Utf8,
1283 false,
1284 )]),
1285 );
1286
1287 Self {
1288 options: Default::default(),
1289 tables,
1290 }
1291 }
1292 }
1293
1294 impl ContextProvider for TestContextProvider {
1295 fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
1296 match self.tables.get(name.table()) {
1297 Some(table) => Ok(Arc::clone(table)),
1298 _ => plan_err!("Table not found: {}", name.table()),
1299 }
1300 }
1301
1302 fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
1303 None
1304 }
1305
1306 fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
1307 match name {
1308 "sum" => Some(datafusion_functions_aggregate::sum::sum_udaf()),
1309 _ => None,
1310 }
1311 }
1312
1313 fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
1314 None
1315 }
1316
1317 fn options(&self) -> &ConfigOptions {
1318 &self.options
1319 }
1320
1321 fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
1322 None
1323 }
1324
1325 fn udf_names(&self) -> Vec<String> {
1326 Vec::new()
1327 }
1328
1329 fn udaf_names(&self) -> Vec<String> {
1330 vec!["sum".to_string()]
1331 }
1332
1333 fn udwf_names(&self) -> Vec<String> {
1334 Vec::new()
1335 }
1336 }
1337
1338 fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
1339 Arc::new(LogicalTableSource::new(Arc::new(
1340 Schema::new_with_metadata(fields, HashMap::new()),
1341 )))
1342 }
1343
1344 macro_rules! test_stack_overflow {
1345 ($num_expr:expr) => {
1346 paste::item! {
1347 #[test]
1348 fn [<test_stack_overflow_ $num_expr>]() {
1349 let schema = DFSchema::empty();
1350 let mut planner_context = PlannerContext::default();
1351
1352 let expr_str = (0..$num_expr)
1353 .map(|i| format!("column1 = 'value{:?}'", i))
1354 .collect::<Vec<String>>()
1355 .join(" OR ");
1356
1357 let dialect = GenericDialect{};
1358 let mut parser = Parser::new(&dialect)
1359 .try_with_sql(expr_str.as_str())
1360 .unwrap();
1361 let sql_expr = parser.parse_expr().unwrap();
1362
1363 let context_provider = TestContextProvider::new();
1364 let sql_to_rel = SqlToRel::new(&context_provider);
1365
1366 sql_to_rel.sql_expr_to_logical_expr(
1368 sql_expr,
1369 &schema,
1370 &mut planner_context,
1371 ).unwrap();
1372 }
1373 }
1374 };
1375 }
1376
1377 test_stack_overflow!(64);
1378 test_stack_overflow!(128);
1379 test_stack_overflow!(256);
1380 test_stack_overflow!(512);
1381 test_stack_overflow!(1024);
1382 test_stack_overflow!(2048);
1383 test_stack_overflow!(4096);
1384 test_stack_overflow!(8192);
/// An aliased aggregate call must be planned into an `Expr::Alias`.
#[test]
fn test_sql_to_expr_with_alias() {
    let schema = DFSchema::empty();
    let mut planner_context = PlannerContext::default();

    let dialect = GenericDialect {};
    let mut parser = Parser::new(&dialect)
        .try_with_sql("SUM(int_col) as sum_int_col")
        .unwrap();
    let aliased_sql = parser.parse_expr_with_alias().unwrap();

    let context_provider = TestContextProvider::new();
    let sql_to_rel = SqlToRel::new(&context_provider);

    let planned = sql_to_rel
        .sql_expr_to_logical_expr_with_alias(aliased_sql, &schema, &mut planner_context)
        .unwrap();

    assert!(matches!(planned, Expr::Alias(_)));
}
1406}