1use arrow::datatypes::{DataType, TimeUnit};
19use datafusion_expr::planner::{
20 PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr,
21};
22use sqlparser::ast::{
23 AccessExpr, BinaryOperator, CastFormat, CastKind, CeilFloorKind,
24 DataType as SQLDataType, DateTimeField, DictionaryField, Expr as SQLExpr,
25 ExprWithAlias as SQLExprWithAlias, MapEntry, StructField, Subscript, TrimWhereField,
26 TypedString, Value, ValueWithSpan,
27};
28
29use datafusion_common::{
30 DFSchema, Result, ScalarValue, internal_datafusion_err, internal_err, not_impl_err,
31 plan_err,
32};
33
34use datafusion_expr::expr::ScalarFunction;
35use datafusion_expr::expr::{InList, WildcardOptions};
36use datafusion_expr::{
37 Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like, Literal,
38 Operator, TryCast, lit,
39};
40
41use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
42
43mod binary_op;
44mod function;
45mod grouping_set;
46mod identifier;
47mod order_by;
48mod subquery;
49mod substring;
50mod unary_op;
51mod value;
52
53impl<S: ContextProvider> SqlToRel<'_, S> {
54 pub(crate) fn sql_expr_to_logical_expr_with_alias(
55 &self,
56 sql: SQLExprWithAlias,
57 schema: &DFSchema,
58 planner_context: &mut PlannerContext,
59 ) -> Result<Expr> {
60 let mut expr =
61 self.sql_expr_to_logical_expr(sql.expr, schema, planner_context)?;
62 if let Some(alias) = sql.alias {
63 expr = expr.alias(alias.value);
64 }
65 Ok(expr)
66 }
67 pub(crate) fn sql_expr_to_logical_expr(
68 &self,
69 sql: SQLExpr,
70 schema: &DFSchema,
71 planner_context: &mut PlannerContext,
72 ) -> Result<Expr> {
73 enum StackEntry {
74 SQLExpr(Box<SQLExpr>),
75 Operator(BinaryOperator),
76 }
77
78 let mut stack = vec![StackEntry::SQLExpr(Box::new(sql))];
83 let mut eval_stack = vec![];
84
85 while let Some(entry) = stack.pop() {
86 match entry {
87 StackEntry::SQLExpr(sql_expr) => {
88 match *sql_expr {
89 SQLExpr::BinaryOp { left, op, right } => {
90 stack.push(StackEntry::Operator(op));
93 stack.push(StackEntry::SQLExpr(right));
94 stack.push(StackEntry::SQLExpr(left));
95 }
96 _ => {
97 let expr = self.sql_expr_to_logical_expr_internal(
98 *sql_expr,
99 schema,
100 planner_context,
101 )?;
102 eval_stack.push(expr);
103 }
104 }
105 }
106 StackEntry::Operator(op) => {
107 let right = eval_stack.pop().unwrap();
108 let left = eval_stack.pop().unwrap();
109 let expr = self.build_logical_expr(op, left, right, schema)?;
110 eval_stack.push(expr);
111 }
112 }
113 }
114
115 assert_eq!(1, eval_stack.len());
116 let expr = eval_stack.pop().unwrap();
117 Ok(expr)
118 }
119
120 fn build_logical_expr(
121 &self,
122 op: BinaryOperator,
123 left: Expr,
124 right: Expr,
125 schema: &DFSchema,
126 ) -> Result<Expr> {
127 let mut binary_expr = RawBinaryExpr { op, left, right };
129 for planner in self.context_provider.get_expr_planners() {
130 match planner.plan_binary_op(binary_expr, schema)? {
131 PlannerResult::Planned(expr) => {
132 return Ok(expr);
133 }
134 PlannerResult::Original(expr) => {
135 binary_expr = expr;
136 }
137 }
138 }
139
140 let RawBinaryExpr { op, left, right } = binary_expr;
141 Ok(Expr::BinaryExpr(BinaryExpr::new(
142 Box::new(left),
143 self.parse_sql_binary_op(&op)?,
144 Box::new(right),
145 )))
146 }
147
148 pub fn sql_to_expr_with_alias(
149 &self,
150 sql: SQLExprWithAlias,
151 schema: &DFSchema,
152 planner_context: &mut PlannerContext,
153 ) -> Result<Expr> {
154 let mut expr =
155 self.sql_expr_to_logical_expr_with_alias(sql, schema, planner_context)?;
156 expr = self.rewrite_partial_qualifier(expr, schema);
157 self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
158 let (expr, _) = expr.infer_placeholder_types(schema)?;
159 Ok(expr)
160 }
161
162 pub fn sql_to_expr(
164 &self,
165 sql: SQLExpr,
166 schema: &DFSchema,
167 planner_context: &mut PlannerContext,
168 ) -> Result<Expr> {
169 let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?;
171 expr = self.rewrite_partial_qualifier(expr, schema);
172 self.validate_schema_satisfies_exprs(schema, std::slice::from_ref(&expr))?;
173 let (expr, _) = expr.infer_placeholder_types(schema)?;
174 Ok(expr)
175 }
176
177 fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchema) -> Expr {
179 match expr {
180 Expr::Column(col) => match &col.relation {
181 Some(q) => {
182 match schema.iter().find(|(qualifier, field)| match qualifier {
183 Some(field_q) => {
184 field.name() == &col.name
185 && field_q.to_string().ends_with(&format!(".{q}"))
186 }
187 _ => false,
188 }) {
189 Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
190 None => Expr::Column(col),
191 }
192 }
193 None => Expr::Column(col),
194 },
195 _ => expr,
196 }
197 }
198
199 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
202 fn sql_expr_to_logical_expr_internal(
203 &self,
204 sql: SQLExpr,
205 schema: &DFSchema,
206 planner_context: &mut PlannerContext,
207 ) -> Result<Expr> {
208 match sql {
214 SQLExpr::Value(value) => {
215 self.parse_value(value.into(), planner_context.prepare_param_data_types())
216 }
217 SQLExpr::Extract { field, expr, .. } => {
218 let mut extract_args = vec![
219 Expr::Literal(ScalarValue::from(format!("{field}")), None),
220 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
221 ];
222
223 for planner in self.context_provider.get_expr_planners() {
224 match planner.plan_extract(extract_args)? {
225 PlannerResult::Planned(expr) => return Ok(expr),
226 PlannerResult::Original(args) => {
227 extract_args = args;
228 }
229 }
230 }
231
232 not_impl_err!("Extract not supported by ExprPlanner: {extract_args:?}")
233 }
234
235 SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
236 SQLExpr::Interval(interval) => self.sql_interval_to_expr(false, interval),
237 SQLExpr::Identifier(id) => {
238 self.sql_identifier_to_expr(id, schema, planner_context)
239 }
240
241 SQLExpr::CompoundFieldAccess { root, access_chain } => self
243 .sql_compound_field_access_to_expr(
244 *root,
245 access_chain,
246 schema,
247 planner_context,
248 ),
249
250 SQLExpr::CompoundIdentifier(ids) => {
251 self.sql_compound_identifier_to_expr(ids, schema, planner_context)
252 }
253
254 SQLExpr::Case {
255 operand,
256 conditions,
257 else_result,
258 case_token: _,
259 end_token: _,
260 } => self.sql_case_identifier_to_expr(
261 operand,
262 conditions,
263 else_result,
264 schema,
265 planner_context,
266 ),
267
268 SQLExpr::Cast {
269 kind: CastKind::Cast | CastKind::DoubleColon,
270 expr,
271 data_type,
272 format,
273 } => {
274 self.sql_cast_to_expr(*expr, &data_type, format, schema, planner_context)
275 }
276
277 SQLExpr::Cast {
278 kind: CastKind::TryCast | CastKind::SafeCast,
279 expr,
280 data_type,
281 format,
282 } => {
283 if let Some(format) = format {
284 return not_impl_err!("CAST with format is not supported: {format}");
285 }
286
287 Ok(Expr::TryCast(TryCast::new(
288 Box::new(self.sql_expr_to_logical_expr(
289 *expr,
290 schema,
291 planner_context,
292 )?),
293 self.convert_data_type_to_field(&data_type)?
294 .data_type()
295 .clone(),
296 )))
297 }
298
299 SQLExpr::TypedString(TypedString {
300 data_type,
301 value,
302 uses_odbc_syntax: _,
303 }) => Ok(Expr::Cast(Cast::new(
304 Box::new(lit(value.into_string().unwrap())),
305 self.convert_data_type_to_field(&data_type)?
306 .data_type()
307 .clone(),
308 ))),
309
310 SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
311 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
312 ))),
313
314 SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
315 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
316 ))),
317
318 SQLExpr::IsDistinctFrom(left, right) => {
319 Ok(Expr::BinaryExpr(BinaryExpr::new(
320 Box::new(self.sql_expr_to_logical_expr(
321 *left,
322 schema,
323 planner_context,
324 )?),
325 Operator::IsDistinctFrom,
326 Box::new(self.sql_expr_to_logical_expr(
327 *right,
328 schema,
329 planner_context,
330 )?),
331 )))
332 }
333
334 SQLExpr::IsNotDistinctFrom(left, right) => {
335 Ok(Expr::BinaryExpr(BinaryExpr::new(
336 Box::new(self.sql_expr_to_logical_expr(
337 *left,
338 schema,
339 planner_context,
340 )?),
341 Operator::IsNotDistinctFrom,
342 Box::new(self.sql_expr_to_logical_expr(
343 *right,
344 schema,
345 planner_context,
346 )?),
347 )))
348 }
349
350 SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(
351 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
352 ))),
353
354 SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(
355 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
356 ))),
357
358 SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(
359 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
360 ))),
361
362 SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(
363 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
364 ))),
365
366 SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(
367 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
368 ))),
369
370 SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(
371 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
372 ))),
373
374 SQLExpr::UnaryOp { op, expr } => {
375 self.parse_sql_unary_op(op, *expr, schema, planner_context)
376 }
377
378 SQLExpr::Between {
379 expr,
380 negated,
381 low,
382 high,
383 } => Ok(Expr::Between(Between::new(
384 Box::new(self.sql_expr_to_logical_expr(
385 *expr,
386 schema,
387 planner_context,
388 )?),
389 negated,
390 Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
391 Box::new(self.sql_expr_to_logical_expr(
392 *high,
393 schema,
394 planner_context,
395 )?),
396 ))),
397
398 SQLExpr::InList {
399 expr,
400 list,
401 negated,
402 } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
403
404 SQLExpr::Like {
405 negated,
406 expr,
407 pattern,
408 escape_char,
409 any,
410 } => self.sql_like_to_expr(
411 negated,
412 *expr,
413 *pattern,
414 escape_char,
415 schema,
416 planner_context,
417 false,
418 any,
419 ),
420
421 SQLExpr::ILike {
422 negated,
423 expr,
424 pattern,
425 escape_char,
426 any,
427 } => self.sql_like_to_expr(
428 negated,
429 *expr,
430 *pattern,
431 escape_char,
432 schema,
433 planner_context,
434 true,
435 any,
436 ),
437
438 SQLExpr::SimilarTo {
439 negated,
440 expr,
441 pattern,
442 escape_char,
443 } => self.sql_similarto_to_expr(
444 negated,
445 *expr,
446 *pattern,
447 escape_char,
448 schema,
449 planner_context,
450 ),
451
452 SQLExpr::BinaryOp { .. } => {
453 internal_err!("binary_op should be handled by sql_expr_to_logical_expr.")
454 }
455
456 #[cfg(feature = "unicode_expressions")]
457 SQLExpr::Substring {
458 expr,
459 substring_from,
460 substring_for,
461 special: _,
462 shorthand: _,
463 } => self.sql_substring_to_expr(
464 expr,
465 substring_from,
466 substring_for,
467 schema,
468 planner_context,
469 ),
470
471 #[cfg(not(feature = "unicode_expressions"))]
472 SQLExpr::Substring { .. } => {
473 internal_err!(
474 "statement substring requires compilation with feature flag: unicode_expressions."
475 )
476 }
477
478 SQLExpr::Trim {
479 expr,
480 trim_where,
481 trim_what,
482 trim_characters,
483 } => self.sql_trim_to_expr(
484 *expr,
485 trim_where,
486 trim_what,
487 trim_characters,
488 schema,
489 planner_context,
490 ),
491
492 SQLExpr::Function(function) => {
493 self.sql_function_to_expr(function, schema, planner_context)
494 }
495
496 SQLExpr::Rollup(exprs) => {
497 self.sql_rollup_to_expr(exprs, schema, planner_context)
498 }
499 SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema, planner_context),
500 SQLExpr::GroupingSets(exprs) => {
501 self.sql_grouping_sets_to_expr(exprs, schema, planner_context)
502 }
503
504 SQLExpr::Floor { expr, field } => match field {
505 CeilFloorKind::DateTimeField(DateTimeField::NoDateTime) => {
506 self.sql_fn_name_to_expr(*expr, "floor", schema, planner_context)
507 }
508 CeilFloorKind::DateTimeField(_) => {
509 not_impl_err!("FLOOR with datetime is not supported")
510 }
511 CeilFloorKind::Scale(_) => {
512 not_impl_err!("FLOOR with scale is not supported")
513 }
514 },
515 SQLExpr::Ceil { expr, field } => match field {
516 CeilFloorKind::DateTimeField(DateTimeField::NoDateTime) => {
517 self.sql_fn_name_to_expr(*expr, "ceil", schema, planner_context)
518 }
519 CeilFloorKind::DateTimeField(_) => {
520 not_impl_err!("CEIL with datetime is not supported")
521 }
522 CeilFloorKind::Scale(_) => {
523 not_impl_err!("CEIL with scale is not supported")
524 }
525 },
526 SQLExpr::Overlay {
527 expr,
528 overlay_what,
529 overlay_from,
530 overlay_for,
531 } => self.sql_overlay_to_expr(
532 *expr,
533 *overlay_what,
534 *overlay_from,
535 overlay_for,
536 schema,
537 planner_context,
538 ),
539 SQLExpr::Nested(e) => {
540 self.sql_expr_to_logical_expr(*e, schema, planner_context)
541 }
542
543 SQLExpr::Exists { subquery, negated } => {
544 self.parse_exists_subquery(*subquery, negated, schema, planner_context)
545 }
546 SQLExpr::InSubquery {
547 expr,
548 subquery,
549 negated,
550 } => {
551 self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context)
552 }
553 SQLExpr::Subquery(subquery) => {
554 self.parse_scalar_subquery(*subquery, schema, planner_context)
555 }
556
557 SQLExpr::Struct { values, fields } => {
558 self.parse_struct(schema, planner_context, values, &fields)
559 }
560 SQLExpr::Position { expr, r#in } => {
561 self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
562 }
563 SQLExpr::AtTimeZone {
564 timestamp,
565 time_zone,
566 } => Ok(Expr::Cast(Cast::new(
567 Box::new(self.sql_expr_to_logical_expr_internal(
568 *timestamp,
569 schema,
570 planner_context,
571 )?),
572 match *time_zone {
573 SQLExpr::Value(ValueWithSpan {
574 value: Value::SingleQuotedString(s),
575 span: _,
576 }) => DataType::Timestamp(TimeUnit::Nanosecond, Some(s.into())),
577 _ => {
578 return not_impl_err!(
579 "Unsupported ast node in sqltorel: {time_zone:?}"
580 );
581 }
582 },
583 ))),
584 SQLExpr::Dictionary(fields) => {
585 self.try_plan_dictionary_literal(fields, schema, planner_context)
586 }
587 SQLExpr::Map(map) => {
588 self.try_plan_map_literal(map.entries, schema, planner_context)
589 }
590 SQLExpr::AnyOp {
591 left,
592 compare_op,
593 right,
594 is_some: _,
597 } => {
598 let mut binary_expr = RawBinaryExpr {
599 op: compare_op,
600 left: self.sql_expr_to_logical_expr(
601 *left,
602 schema,
603 planner_context,
604 )?,
605 right: self.sql_expr_to_logical_expr(
606 *right,
607 schema,
608 planner_context,
609 )?,
610 };
611 for planner in self.context_provider.get_expr_planners() {
612 match planner.plan_any(binary_expr)? {
613 PlannerResult::Planned(expr) => {
614 return Ok(expr);
615 }
616 PlannerResult::Original(expr) => {
617 binary_expr = expr;
618 }
619 }
620 }
621 not_impl_err!("AnyOp not supported by ExprPlanner: {binary_expr:?}")
622 }
623 #[expect(deprecated)]
624 SQLExpr::Wildcard(_token) => Ok(Expr::Wildcard {
625 qualifier: None,
626 options: Box::new(WildcardOptions::default()),
627 }),
628 #[expect(deprecated)]
629 SQLExpr::QualifiedWildcard(object_name, _token) => Ok(Expr::Wildcard {
630 qualifier: Some(self.object_name_to_table_reference(object_name)?),
631 options: Box::new(WildcardOptions::default()),
632 }),
633 SQLExpr::Tuple(values) => self.parse_tuple(schema, planner_context, values),
634 _ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
635 }
636 }
637
638 fn parse_struct(
640 &self,
641 schema: &DFSchema,
642 planner_context: &mut PlannerContext,
643 values: Vec<SQLExpr>,
644 fields: &[StructField],
645 ) -> Result<Expr> {
646 if !fields.is_empty() {
647 return not_impl_err!("Struct fields are not supported yet");
648 }
649 let is_named_struct = values
650 .iter()
651 .any(|value| matches!(value, SQLExpr::Named { .. }));
652
653 let mut create_struct_args = if is_named_struct {
654 self.create_named_struct_expr(values, schema, planner_context)?
655 } else {
656 self.create_struct_expr(values, schema, planner_context)?
657 };
658
659 for planner in self.context_provider.get_expr_planners() {
660 match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
661 PlannerResult::Planned(expr) => return Ok(expr),
662 PlannerResult::Original(args) => create_struct_args = args,
663 }
664 }
665 not_impl_err!("Struct not supported by ExprPlanner: {create_struct_args:?}")
666 }
667
668 fn parse_tuple(
669 &self,
670 schema: &DFSchema,
671 planner_context: &mut PlannerContext,
672 values: Vec<SQLExpr>,
673 ) -> Result<Expr> {
674 match values.first() {
675 Some(SQLExpr::Identifier(_))
676 | Some(SQLExpr::Value(_))
677 | Some(SQLExpr::CompoundIdentifier(_)) => {
678 self.parse_struct(schema, planner_context, values, &[])
679 }
680 None => not_impl_err!("Empty tuple not supported yet"),
681 _ => {
682 not_impl_err!("Only identifiers and literals are supported in tuples")
683 }
684 }
685 }
686
687 fn sql_position_to_expr(
688 &self,
689 substr_expr: SQLExpr,
690 str_expr: SQLExpr,
691 schema: &DFSchema,
692 planner_context: &mut PlannerContext,
693 ) -> Result<Expr> {
694 let substr =
695 self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
696 let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;
697 let mut position_args = vec![fullstr, substr];
698 for planner in self.context_provider.get_expr_planners() {
699 match planner.plan_position(position_args)? {
700 PlannerResult::Planned(expr) => return Ok(expr),
701 PlannerResult::Original(args) => {
702 position_args = args;
703 }
704 }
705 }
706
707 not_impl_err!("Position not supported by ExprPlanner: {position_args:?}")
708 }
709
710 fn try_plan_dictionary_literal(
711 &self,
712 fields: Vec<DictionaryField>,
713 schema: &DFSchema,
714 planner_context: &mut PlannerContext,
715 ) -> Result<Expr> {
716 let mut keys = vec![];
717 let mut values = vec![];
718 for field in fields {
719 let key = lit(field.key.value);
720 let value =
721 self.sql_expr_to_logical_expr(*field.value, schema, planner_context)?;
722 keys.push(key);
723 values.push(value);
724 }
725
726 let mut raw_expr = RawDictionaryExpr { keys, values };
727
728 for planner in self.context_provider.get_expr_planners() {
729 match planner.plan_dictionary_literal(raw_expr, schema)? {
730 PlannerResult::Planned(expr) => {
731 return Ok(expr);
732 }
733 PlannerResult::Original(expr) => raw_expr = expr,
734 }
735 }
736 not_impl_err!("Dictionary not supported by ExprPlanner: {raw_expr:?}")
737 }
738
739 fn try_plan_map_literal(
740 &self,
741 entries: Vec<MapEntry>,
742 schema: &DFSchema,
743 planner_context: &mut PlannerContext,
744 ) -> Result<Expr> {
745 let mut exprs: Vec<_> = entries
746 .into_iter()
747 .flat_map(|entry| vec![entry.key, entry.value].into_iter())
748 .map(|expr| self.sql_expr_to_logical_expr(*expr, schema, planner_context))
749 .collect::<Result<Vec<_>>>()?;
750 for planner in self.context_provider.get_expr_planners() {
751 match planner.plan_make_map(exprs)? {
752 PlannerResult::Planned(expr) => {
753 return Ok(expr);
754 }
755 PlannerResult::Original(expr) => exprs = expr,
756 }
757 }
758 not_impl_err!("MAP not supported by ExprPlanner: {exprs:?}")
759 }
760
761 fn create_named_struct_expr(
764 &self,
765 values: Vec<SQLExpr>,
766 input_schema: &DFSchema,
767 planner_context: &mut PlannerContext,
768 ) -> Result<Vec<Expr>> {
769 Ok(values
770 .into_iter()
771 .enumerate()
772 .map(|(i, value)| {
773 let args = if let SQLExpr::Named { expr, name } = value {
774 [
775 name.value.lit(),
776 self.sql_expr_to_logical_expr(
777 *expr,
778 input_schema,
779 planner_context,
780 )?,
781 ]
782 } else {
783 [
784 format!("c{i}").lit(),
785 self.sql_expr_to_logical_expr(
786 value,
787 input_schema,
788 planner_context,
789 )?,
790 ]
791 };
792
793 Ok(args)
794 })
795 .collect::<Result<Vec<_>>>()?
796 .into_iter()
797 .flatten()
798 .collect())
799 }
800
801 fn create_struct_expr(
805 &self,
806 values: Vec<SQLExpr>,
807 input_schema: &DFSchema,
808 planner_context: &mut PlannerContext,
809 ) -> Result<Vec<Expr>> {
810 values
811 .into_iter()
812 .map(|value| {
813 self.sql_expr_to_logical_expr(value, input_schema, planner_context)
814 })
815 .collect::<Result<Vec<_>>>()
816 }
817
818 fn sql_in_list_to_expr(
819 &self,
820 expr: SQLExpr,
821 list: Vec<SQLExpr>,
822 negated: bool,
823 schema: &DFSchema,
824 planner_context: &mut PlannerContext,
825 ) -> Result<Expr> {
826 let list_expr = list
827 .into_iter()
828 .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
829 .collect::<Result<Vec<_>>>()?;
830
831 Ok(Expr::InList(InList::new(
832 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
833 list_expr,
834 negated,
835 )))
836 }
837
838 #[expect(clippy::too_many_arguments)]
839 fn sql_like_to_expr(
840 &self,
841 negated: bool,
842 expr: SQLExpr,
843 pattern: SQLExpr,
844 escape_char: Option<Value>,
845 schema: &DFSchema,
846 planner_context: &mut PlannerContext,
847 case_insensitive: bool,
848 any: bool,
849 ) -> Result<Expr> {
850 if any {
851 return not_impl_err!("ANY in LIKE expression");
852 }
853 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
854 let escape_char = match escape_char {
855 Some(Value::SingleQuotedString(char)) if char.len() == 1 => {
856 Some(char.chars().next().unwrap())
857 }
858 Some(value) => {
859 return plan_err!(
860 "Invalid escape character in LIKE expression. Expected a single character wrapped with single quotes, got {value}"
861 );
862 }
863 None => None,
864 };
865 Ok(Expr::Like(Like::new(
866 negated,
867 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
868 Box::new(pattern),
869 escape_char,
870 case_insensitive,
871 )))
872 }
873
874 fn sql_similarto_to_expr(
875 &self,
876 negated: bool,
877 expr: SQLExpr,
878 pattern: SQLExpr,
879 escape_char: Option<Value>,
880 schema: &DFSchema,
881 planner_context: &mut PlannerContext,
882 ) -> Result<Expr> {
883 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
884 let pattern_type = pattern.get_type(schema)?;
885 if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
886 return plan_err!("Invalid pattern in SIMILAR TO expression");
887 }
888 let escape_char = match escape_char {
889 Some(Value::SingleQuotedString(char)) if char.len() == 1 => {
890 Some(char.chars().next().unwrap())
891 }
892 Some(value) => {
893 return plan_err!(
894 "Invalid escape character in SIMILAR TO expression. Expected a single character wrapped with single quotes, got {value}"
895 );
896 }
897 None => None,
898 };
899 Ok(Expr::SimilarTo(Like::new(
900 negated,
901 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
902 Box::new(pattern),
903 escape_char,
904 false,
905 )))
906 }
907
908 fn sql_trim_to_expr(
909 &self,
910 expr: SQLExpr,
911 trim_where: Option<TrimWhereField>,
912 trim_what: Option<Box<SQLExpr>>,
913 trim_characters: Option<Vec<SQLExpr>>,
914 schema: &DFSchema,
915 planner_context: &mut PlannerContext,
916 ) -> Result<Expr> {
917 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
918 let args = match (trim_what, trim_characters) {
919 (Some(to_trim), None) => {
920 let to_trim =
921 self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
922 Ok(vec![arg, to_trim])
923 }
924 (None, Some(trim_characters)) => {
925 if let Some(first) = trim_characters.first() {
926 let to_trim = self.sql_expr_to_logical_expr(
927 first.clone(),
928 schema,
929 planner_context,
930 )?;
931 Ok(vec![arg, to_trim])
932 } else {
933 plan_err!("TRIM CHARACTERS cannot be empty")
934 }
935 }
936 (Some(_), Some(_)) => {
937 plan_err!("Both TRIM and TRIM CHARACTERS cannot be specified")
938 }
939 (None, None) => Ok(vec![arg]),
940 }?;
941
942 let fun_name = match trim_where {
943 Some(TrimWhereField::Leading) => "ltrim",
944 Some(TrimWhereField::Trailing) => "rtrim",
945 Some(TrimWhereField::Both) => "btrim",
946 None => "trim",
947 };
948 let fun = self
949 .context_provider
950 .get_function_meta(fun_name)
951 .ok_or_else(|| {
952 internal_datafusion_err!("Unable to find expected '{fun_name}' function")
953 })?;
954
955 Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
956 }
957
958 fn sql_overlay_to_expr(
959 &self,
960 expr: SQLExpr,
961 overlay_what: SQLExpr,
962 overlay_from: SQLExpr,
963 overlay_for: Option<Box<SQLExpr>>,
964 schema: &DFSchema,
965 planner_context: &mut PlannerContext,
966 ) -> Result<Expr> {
967 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
968 let what_arg =
969 self.sql_expr_to_logical_expr(overlay_what, schema, planner_context)?;
970 let from_arg =
971 self.sql_expr_to_logical_expr(overlay_from, schema, planner_context)?;
972 let mut overlay_args = match overlay_for {
973 Some(for_expr) => {
974 let for_expr =
975 self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
976 vec![arg, what_arg, from_arg, for_expr]
977 }
978 None => vec![arg, what_arg, from_arg],
979 };
980 for planner in self.context_provider.get_expr_planners() {
981 match planner.plan_overlay(overlay_args)? {
982 PlannerResult::Planned(expr) => return Ok(expr),
983 PlannerResult::Original(args) => overlay_args = args,
984 }
985 }
986 not_impl_err!("Overlay not supported by ExprPlanner: {overlay_args:?}")
987 }
988
989 fn sql_cast_to_expr(
990 &self,
991 expr: SQLExpr,
992 data_type: &SQLDataType,
993 format: Option<CastFormat>,
994 schema: &DFSchema,
995 planner_context: &mut PlannerContext,
996 ) -> Result<Expr> {
997 if let Some(format) = format {
998 return not_impl_err!("CAST with format is not supported: {format}");
999 }
1000
1001 let dt = self.convert_data_type_to_field(data_type)?;
1002 let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
1003
1004 let expr = match dt.data_type() {
1007 DataType::Timestamp(TimeUnit::Nanosecond, tz)
1008 if expr.get_type(schema)? == DataType::Int64 =>
1009 {
1010 Expr::Cast(Cast::new(
1011 Box::new(expr),
1012 DataType::Timestamp(TimeUnit::Second, tz.clone()),
1013 ))
1014 }
1015 _ => expr,
1016 };
1017
1018 Ok(Expr::Cast(Cast::new(
1021 Box::new(expr),
1022 dt.data_type().clone(),
1023 )))
1024 }
1025
1026 fn extract_root_and_access_chain(
1044 &self,
1045 root: SQLExpr,
1046 mut access_chain: Vec<AccessExpr>,
1047 schema: &DFSchema,
1048 planner_context: &mut PlannerContext,
1049 ) -> Result<(Expr, Vec<AccessExpr>)> {
1050 let SQLExpr::Identifier(root_ident) = root else {
1051 let root = self.sql_expr_to_logical_expr(root, schema, planner_context)?;
1052 return Ok((root, access_chain));
1053 };
1054
1055 let mut compound_idents = vec![root_ident];
1056 let first_non_ident = access_chain
1057 .iter()
1058 .position(|access| !matches!(access, AccessExpr::Dot(SQLExpr::Identifier(_))))
1059 .unwrap_or(access_chain.len());
1060 for access in access_chain.drain(0..first_non_ident) {
1061 if let AccessExpr::Dot(SQLExpr::Identifier(ident)) = access {
1062 compound_idents.push(ident);
1063 } else {
1064 return internal_err!("Expected identifier in access chain");
1065 }
1066 }
1067
1068 let root = if compound_idents.len() == 1 {
1069 self.sql_identifier_to_expr(
1070 compound_idents.pop().unwrap(),
1071 schema,
1072 planner_context,
1073 )?
1074 } else {
1075 self.sql_compound_identifier_to_expr(
1076 compound_idents,
1077 schema,
1078 planner_context,
1079 )?
1080 };
1081 Ok((root, access_chain))
1082 }
1083
1084 fn sql_compound_field_access_to_expr(
1085 &self,
1086 root: SQLExpr,
1087 access_chain: Vec<AccessExpr>,
1088 schema: &DFSchema,
1089 planner_context: &mut PlannerContext,
1090 ) -> Result<Expr> {
1091 let (root, access_chain) = self.extract_root_and_access_chain(
1092 root,
1093 access_chain,
1094 schema,
1095 planner_context,
1096 )?;
1097 let fields = access_chain
1098 .into_iter()
1099 .map(|field| match field {
1100 AccessExpr::Subscript(subscript) => {
1101 match subscript {
1102 Subscript::Index { index } => {
1103 match index {
1105 SQLExpr::Value(ValueWithSpan {
1106 value:
1107 Value::SingleQuotedString(s)
1108 | Value::DoubleQuotedString(s),
1109 span: _,
1110 }) => Ok(Some(GetFieldAccess::NamedStructField {
1111 name: ScalarValue::from(s),
1112 })),
1113 SQLExpr::JsonAccess { .. } => {
1114 not_impl_err!("JsonAccess")
1115 }
1116 _ => Ok(Some(GetFieldAccess::ListIndex {
1118 key: Box::new(self.sql_expr_to_logical_expr(
1119 index,
1120 schema,
1121 planner_context,
1122 )?),
1123 })),
1124 }
1125 }
1126 Subscript::Slice {
1127 lower_bound,
1128 upper_bound,
1129 stride,
1130 } => {
1131 let lower_bound = if let Some(lower_bound) = lower_bound {
1133 self.sql_expr_to_logical_expr(
1134 lower_bound,
1135 schema,
1136 planner_context,
1137 )
1138 } else {
1139 not_impl_err!("Slice subscript requires a lower bound")
1140 }?;
1141
1142 let upper_bound = if let Some(upper_bound) = upper_bound {
1144 self.sql_expr_to_logical_expr(
1145 upper_bound,
1146 schema,
1147 planner_context,
1148 )
1149 } else {
1150 not_impl_err!("Slice subscript requires an upper bound")
1151 }?;
1152
1153 let stride = if let Some(stride) = stride {
1155 self.sql_expr_to_logical_expr(
1156 stride,
1157 schema,
1158 planner_context,
1159 )?
1160 } else {
1161 lit(1i64)
1162 };
1163
1164 Ok(Some(GetFieldAccess::ListRange {
1165 start: Box::new(lower_bound),
1166 stop: Box::new(upper_bound),
1167 stride: Box::new(stride),
1168 }))
1169 }
1170 }
1171 }
1172 AccessExpr::Dot(expr) => match expr {
1173 SQLExpr::Value(ValueWithSpan {
1174 value: Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
1175 span : _
1176 }) => Ok(Some(GetFieldAccess::NamedStructField {
1177 name: ScalarValue::from(s),
1178 })),
1179 _ => {
1180 not_impl_err!(
1181 "Dot access not supported for non-string expr: {expr:?}"
1182 )
1183 }
1184 },
1185 })
1186 .collect::<Result<Vec<_>>>()?;
1187
1188 fields
1189 .into_iter()
1190 .flatten()
1191 .try_fold(root, |expr, field_access| {
1192 let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
1193 for planner in self.context_provider.get_expr_planners() {
1194 match planner.plan_field_access(field_access_expr, schema)? {
1195 PlannerResult::Planned(expr) => return Ok(expr),
1196 PlannerResult::Original(expr) => {
1197 field_access_expr = expr;
1198 }
1199 }
1200 }
1201 not_impl_err!(
1202 "GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}"
1203 )
1204 })
1205 }
1206}
1207
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::datatypes::{Field, Schema};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;

    use datafusion_common::TableReference;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::logical_plan::builder::LogicalTableSource;
    use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};

    use super::*;

    /// Minimal [`ContextProvider`] for these tests: one table (`table1`) and
    /// one aggregate function (`sum`).
    struct TestContextProvider {
        options: ConfigOptions,
        tables: HashMap<String, Arc<dyn TableSource>>,
    }

    impl TestContextProvider {
        /// Creates a provider whose only table is `table1`, with a single
        /// non-nullable `Utf8` column named `column1`.
        pub fn new() -> Self {
            let column1 = Field::new("column1".to_string(), DataType::Utf8, false);
            let tables = HashMap::from([(
                "table1".to_string(),
                create_table_source(vec![column1]),
            )]);

            Self {
                options: ConfigOptions::default(),
                tables,
            }
        }
    }

    impl ContextProvider for TestContextProvider {
        /// Looks the table up by its bare table name.
        fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
            if let Some(source) = self.tables.get(name.table()) {
                Ok(Arc::clone(source))
            } else {
                plan_err!("Table not found: {}", name.table())
            }
        }

        /// No scalar functions are registered.
        fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
            None
        }

        /// Only `sum` is registered.
        fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
            if name == "sum" {
                Some(datafusion_functions_aggregate::sum::sum_udaf())
            } else {
                None
            }
        }

        /// No variables are known.
        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
            None
        }

        fn options(&self) -> &ConfigOptions {
            &self.options
        }

        /// No window functions are registered.
        fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
            None
        }

        fn udf_names(&self) -> Vec<String> {
            vec![]
        }

        fn udaf_names(&self) -> Vec<String> {
            vec![String::from("sum")]
        }

        fn udwf_names(&self) -> Vec<String> {
            vec![]
        }
    }

    /// Wraps `fields` in a schema without metadata and exposes it as a table
    /// source.
    fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
        let schema = Schema::new_with_metadata(fields, HashMap::new());
        Arc::new(LogicalTableSource::new(Arc::new(schema)))
    }

    // Generates a test named `test_stack_overflow_<N>` that plans `N`
    // equality predicates joined by `OR` and expects planning to succeed.
    macro_rules! test_stack_overflow {
        ($num_expr:expr) => {
            paste::item! {
                #[test]
                fn [<test_stack_overflow_ $num_expr>]() {
                    let schema = DFSchema::empty();
                    let mut planner_context = PlannerContext::default();

                    let predicates: Vec<String> = (0..$num_expr)
                        .map(|i| format!("column1 = 'value{:?}'", i))
                        .collect();
                    let expr_str = predicates.join(" OR ");

                    let dialect = GenericDialect {};
                    let sql_expr = Parser::new(&dialect)
                        .try_with_sql(expr_str.as_str())
                        .unwrap()
                        .parse_expr()
                        .unwrap();

                    let context_provider = TestContextProvider::new();
                    let sql_to_rel = SqlToRel::new(&context_provider);

                    let planned = sql_to_rel.sql_expr_to_logical_expr(
                        sql_expr,
                        &schema,
                        &mut planner_context,
                    );
                    planned.unwrap();
                }
            }
        };
    }

    test_stack_overflow!(64);
    test_stack_overflow!(128);
    test_stack_overflow!(256);
    test_stack_overflow!(512);
    test_stack_overflow!(1024);
    test_stack_overflow!(2048);
    test_stack_overflow!(4096);
    test_stack_overflow!(8192);

    /// An aliased aggregate call must be planned as `Expr::Alias`.
    #[test]
    fn test_sql_to_expr_with_alias() {
        let schema = DFSchema::empty();
        let mut planner_context = PlannerContext::default();

        let expr_str = "SUM(int_col) as sum_int_col";

        let dialect = GenericDialect {};
        let sql_expr = Parser::new(&dialect)
            .try_with_sql(expr_str)
            .unwrap()
            .parse_expr_with_alias()
            .unwrap();

        let context_provider = TestContextProvider::new();
        let sql_to_rel = SqlToRel::new(&context_provider);

        let planned = sql_to_rel
            .sql_expr_to_logical_expr_with_alias(sql_expr, &schema, &mut planner_context)
            .unwrap();

        assert!(matches!(planned, Expr::Alias(_)));
    }
}