1use arrow::datatypes::{DataType, TimeUnit};
19use datafusion_expr::planner::{
20 PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr,
21};
22use sqlparser::ast::{
23 AccessExpr, BinaryOperator, CastFormat, CastKind, DataType as SQLDataType,
24 DictionaryField, Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, MapEntry,
25 StructField, Subscript, TrimWhereField, Value,
26};
27
28use datafusion_common::{
29 internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema,
30 Result, ScalarValue,
31};
32
33use datafusion_expr::expr::ScalarFunction;
34use datafusion_expr::expr::{InList, WildcardOptions};
35use datafusion_expr::{
36 lit, Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like, Literal,
37 Operator, TryCast,
38};
39
40use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
41
42mod binary_op;
43mod function;
44mod grouping_set;
45mod identifier;
46mod order_by;
47mod subquery;
48mod substring;
49mod unary_op;
50mod value;
51
52impl<S: ContextProvider> SqlToRel<'_, S> {
53 pub(crate) fn sql_expr_to_logical_expr_with_alias(
54 &self,
55 sql: SQLExprWithAlias,
56 schema: &DFSchema,
57 planner_context: &mut PlannerContext,
58 ) -> Result<Expr> {
59 let mut expr =
60 self.sql_expr_to_logical_expr(sql.expr, schema, planner_context)?;
61 if let Some(alias) = sql.alias {
62 expr = expr.alias(alias.value);
63 }
64 Ok(expr)
65 }
66 pub(crate) fn sql_expr_to_logical_expr(
67 &self,
68 sql: SQLExpr,
69 schema: &DFSchema,
70 planner_context: &mut PlannerContext,
71 ) -> Result<Expr> {
72 enum StackEntry {
73 SQLExpr(Box<SQLExpr>),
74 Operator(BinaryOperator),
75 }
76
77 let mut stack = vec![StackEntry::SQLExpr(Box::new(sql))];
82 let mut eval_stack = vec![];
83
84 while let Some(entry) = stack.pop() {
85 match entry {
86 StackEntry::SQLExpr(sql_expr) => {
87 match *sql_expr {
88 SQLExpr::BinaryOp { left, op, right } => {
89 stack.push(StackEntry::Operator(op));
92 stack.push(StackEntry::SQLExpr(right));
93 stack.push(StackEntry::SQLExpr(left));
94 }
95 _ => {
96 let expr = self.sql_expr_to_logical_expr_internal(
97 *sql_expr,
98 schema,
99 planner_context,
100 )?;
101 eval_stack.push(expr);
102 }
103 }
104 }
105 StackEntry::Operator(op) => {
106 let right = eval_stack.pop().unwrap();
107 let left = eval_stack.pop().unwrap();
108 let expr = self.build_logical_expr(op, left, right, schema)?;
109 eval_stack.push(expr);
110 }
111 }
112 }
113
114 assert_eq!(1, eval_stack.len());
115 let expr = eval_stack.pop().unwrap();
116 Ok(expr)
117 }
118
119 fn build_logical_expr(
120 &self,
121 op: BinaryOperator,
122 left: Expr,
123 right: Expr,
124 schema: &DFSchema,
125 ) -> Result<Expr> {
126 let mut binary_expr = RawBinaryExpr { op, left, right };
128 for planner in self.context_provider.get_expr_planners() {
129 match planner.plan_binary_op(binary_expr, schema)? {
130 PlannerResult::Planned(expr) => {
131 return Ok(expr);
132 }
133 PlannerResult::Original(expr) => {
134 binary_expr = expr;
135 }
136 }
137 }
138
139 let RawBinaryExpr { op, left, right } = binary_expr;
140 Ok(Expr::BinaryExpr(BinaryExpr::new(
141 Box::new(left),
142 self.parse_sql_binary_op(op)?,
143 Box::new(right),
144 )))
145 }
146
147 pub fn sql_to_expr_with_alias(
148 &self,
149 sql: SQLExprWithAlias,
150 schema: &DFSchema,
151 planner_context: &mut PlannerContext,
152 ) -> Result<Expr> {
153 let mut expr =
154 self.sql_expr_to_logical_expr_with_alias(sql, schema, planner_context)?;
155 expr = self.rewrite_partial_qualifier(expr, schema);
156 self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
157 let (expr, _) = expr.infer_placeholder_types(schema)?;
158 Ok(expr)
159 }
160
161 pub fn sql_to_expr(
163 &self,
164 sql: SQLExpr,
165 schema: &DFSchema,
166 planner_context: &mut PlannerContext,
167 ) -> Result<Expr> {
168 let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?;
170 expr = self.rewrite_partial_qualifier(expr, schema);
171 self.validate_schema_satisfies_exprs(schema, std::slice::from_ref(&expr))?;
172 let (expr, _) = expr.infer_placeholder_types(schema)?;
173 Ok(expr)
174 }
175
176 fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchema) -> Expr {
178 match expr {
179 Expr::Column(col) => match &col.relation {
180 Some(q) => {
181 match schema.iter().find(|(qualifier, field)| match qualifier {
182 Some(field_q) => {
183 field.name() == &col.name
184 && field_q.to_string().ends_with(&format!(".{q}"))
185 }
186 _ => false,
187 }) {
188 Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
189 None => Expr::Column(col),
190 }
191 }
192 None => Expr::Column(col),
193 },
194 _ => expr,
195 }
196 }
197
198 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
201 fn sql_expr_to_logical_expr_internal(
202 &self,
203 sql: SQLExpr,
204 schema: &DFSchema,
205 planner_context: &mut PlannerContext,
206 ) -> Result<Expr> {
207 match sql {
213 SQLExpr::Value(value) => {
214 self.parse_value(value, planner_context.prepare_param_data_types())
215 }
216 SQLExpr::Extract { field, expr, .. } => {
217 let mut extract_args = vec![
218 Expr::Literal(ScalarValue::from(format!("{field}"))),
219 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
220 ];
221
222 for planner in self.context_provider.get_expr_planners() {
223 match planner.plan_extract(extract_args)? {
224 PlannerResult::Planned(expr) => return Ok(expr),
225 PlannerResult::Original(args) => {
226 extract_args = args;
227 }
228 }
229 }
230
231 not_impl_err!("Extract not supported by ExprPlanner: {extract_args:?}")
232 }
233
234 SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
235 SQLExpr::Interval(interval) => self.sql_interval_to_expr(false, interval),
236 SQLExpr::Identifier(id) => {
237 self.sql_identifier_to_expr(id, schema, planner_context)
238 }
239
240 SQLExpr::CompoundFieldAccess { root, access_chain } => self
242 .sql_compound_field_access_to_expr(
243 *root,
244 access_chain,
245 schema,
246 planner_context,
247 ),
248
249 SQLExpr::CompoundIdentifier(ids) => {
250 self.sql_compound_identifier_to_expr(ids, schema, planner_context)
251 }
252
253 SQLExpr::Case {
254 operand,
255 conditions,
256 results,
257 else_result,
258 } => self.sql_case_identifier_to_expr(
259 operand,
260 conditions,
261 results,
262 else_result,
263 schema,
264 planner_context,
265 ),
266
267 SQLExpr::Cast {
268 kind: CastKind::Cast | CastKind::DoubleColon,
269 expr,
270 data_type,
271 format,
272 } => self.sql_cast_to_expr(*expr, data_type, format, schema, planner_context),
273
274 SQLExpr::Cast {
275 kind: CastKind::TryCast | CastKind::SafeCast,
276 expr,
277 data_type,
278 format,
279 } => {
280 if let Some(format) = format {
281 return not_impl_err!("CAST with format is not supported: {format}");
282 }
283
284 Ok(Expr::TryCast(TryCast::new(
285 Box::new(self.sql_expr_to_logical_expr(
286 *expr,
287 schema,
288 planner_context,
289 )?),
290 self.convert_data_type(&data_type)?,
291 )))
292 }
293
294 SQLExpr::TypedString { data_type, value } => Ok(Expr::Cast(Cast::new(
295 Box::new(lit(value)),
296 self.convert_data_type(&data_type)?,
297 ))),
298
299 SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
300 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
301 ))),
302
303 SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
304 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
305 ))),
306
307 SQLExpr::IsDistinctFrom(left, right) => {
308 Ok(Expr::BinaryExpr(BinaryExpr::new(
309 Box::new(self.sql_expr_to_logical_expr(
310 *left,
311 schema,
312 planner_context,
313 )?),
314 Operator::IsDistinctFrom,
315 Box::new(self.sql_expr_to_logical_expr(
316 *right,
317 schema,
318 planner_context,
319 )?),
320 )))
321 }
322
323 SQLExpr::IsNotDistinctFrom(left, right) => {
324 Ok(Expr::BinaryExpr(BinaryExpr::new(
325 Box::new(self.sql_expr_to_logical_expr(
326 *left,
327 schema,
328 planner_context,
329 )?),
330 Operator::IsNotDistinctFrom,
331 Box::new(self.sql_expr_to_logical_expr(
332 *right,
333 schema,
334 planner_context,
335 )?),
336 )))
337 }
338
339 SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(
340 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
341 ))),
342
343 SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(
344 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
345 ))),
346
347 SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(
348 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
349 ))),
350
351 SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(
352 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
353 ))),
354
355 SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(
356 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
357 ))),
358
359 SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(
360 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
361 ))),
362
363 SQLExpr::UnaryOp { op, expr } => {
364 self.parse_sql_unary_op(op, *expr, schema, planner_context)
365 }
366
367 SQLExpr::Between {
368 expr,
369 negated,
370 low,
371 high,
372 } => Ok(Expr::Between(Between::new(
373 Box::new(self.sql_expr_to_logical_expr(
374 *expr,
375 schema,
376 planner_context,
377 )?),
378 negated,
379 Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
380 Box::new(self.sql_expr_to_logical_expr(
381 *high,
382 schema,
383 planner_context,
384 )?),
385 ))),
386
387 SQLExpr::InList {
388 expr,
389 list,
390 negated,
391 } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
392
393 SQLExpr::Like {
394 negated,
395 expr,
396 pattern,
397 escape_char,
398 any,
399 } => self.sql_like_to_expr(
400 negated,
401 *expr,
402 *pattern,
403 escape_char,
404 schema,
405 planner_context,
406 false,
407 any,
408 ),
409
410 SQLExpr::ILike {
411 negated,
412 expr,
413 pattern,
414 escape_char,
415 any,
416 } => self.sql_like_to_expr(
417 negated,
418 *expr,
419 *pattern,
420 escape_char,
421 schema,
422 planner_context,
423 true,
424 any,
425 ),
426
427 SQLExpr::SimilarTo {
428 negated,
429 expr,
430 pattern,
431 escape_char,
432 } => self.sql_similarto_to_expr(
433 negated,
434 *expr,
435 *pattern,
436 escape_char,
437 schema,
438 planner_context,
439 ),
440
441 SQLExpr::BinaryOp { .. } => {
442 internal_err!("binary_op should be handled by sql_expr_to_logical_expr.")
443 }
444
445 #[cfg(feature = "unicode_expressions")]
446 SQLExpr::Substring {
447 expr,
448 substring_from,
449 substring_for,
450 special: _,
451 } => self.sql_substring_to_expr(
452 expr,
453 substring_from,
454 substring_for,
455 schema,
456 planner_context,
457 ),
458
459 #[cfg(not(feature = "unicode_expressions"))]
460 SQLExpr::Substring { .. } => {
461 internal_err!(
462 "statement substring requires compilation with feature flag: unicode_expressions."
463 )
464 }
465
466 SQLExpr::Trim {
467 expr,
468 trim_where,
469 trim_what,
470 trim_characters,
471 } => self.sql_trim_to_expr(
472 *expr,
473 trim_where,
474 trim_what,
475 trim_characters,
476 schema,
477 planner_context,
478 ),
479
480 SQLExpr::Function(function) => {
481 self.sql_function_to_expr(function, schema, planner_context)
482 }
483
484 SQLExpr::Rollup(exprs) => {
485 self.sql_rollup_to_expr(exprs, schema, planner_context)
486 }
487 SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema, planner_context),
488 SQLExpr::GroupingSets(exprs) => {
489 self.sql_grouping_sets_to_expr(exprs, schema, planner_context)
490 }
491
492 SQLExpr::Floor {
493 expr,
494 field: _field,
495 } => self.sql_fn_name_to_expr(*expr, "floor", schema, planner_context),
496 SQLExpr::Ceil {
497 expr,
498 field: _field,
499 } => self.sql_fn_name_to_expr(*expr, "ceil", schema, planner_context),
500 SQLExpr::Overlay {
501 expr,
502 overlay_what,
503 overlay_from,
504 overlay_for,
505 } => self.sql_overlay_to_expr(
506 *expr,
507 *overlay_what,
508 *overlay_from,
509 overlay_for,
510 schema,
511 planner_context,
512 ),
513 SQLExpr::Nested(e) => {
514 self.sql_expr_to_logical_expr(*e, schema, planner_context)
515 }
516
517 SQLExpr::Exists { subquery, negated } => {
518 self.parse_exists_subquery(*subquery, negated, schema, planner_context)
519 }
520 SQLExpr::InSubquery {
521 expr,
522 subquery,
523 negated,
524 } => {
525 self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context)
526 }
527 SQLExpr::Subquery(subquery) => {
528 self.parse_scalar_subquery(*subquery, schema, planner_context)
529 }
530
531 SQLExpr::Struct { values, fields } => {
532 self.parse_struct(schema, planner_context, values, fields)
533 }
534 SQLExpr::Position { expr, r#in } => {
535 self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
536 }
537 SQLExpr::AtTimeZone {
538 timestamp,
539 time_zone,
540 } => Ok(Expr::Cast(Cast::new(
541 Box::new(self.sql_expr_to_logical_expr_internal(
542 *timestamp,
543 schema,
544 planner_context,
545 )?),
546 match *time_zone {
547 SQLExpr::Value(Value::SingleQuotedString(s)) => {
548 DataType::Timestamp(TimeUnit::Nanosecond, Some(s.into()))
549 }
550 _ => {
551 return not_impl_err!(
552 "Unsupported ast node in sqltorel: {time_zone:?}"
553 )
554 }
555 },
556 ))),
557 SQLExpr::Dictionary(fields) => {
558 self.try_plan_dictionary_literal(fields, schema, planner_context)
559 }
560 SQLExpr::Map(map) => {
561 self.try_plan_map_literal(map.entries, schema, planner_context)
562 }
563 SQLExpr::AnyOp {
564 left,
565 compare_op,
566 right,
567 is_some: _,
570 } => {
571 let mut binary_expr = RawBinaryExpr {
572 op: compare_op,
573 left: self.sql_expr_to_logical_expr(
574 *left,
575 schema,
576 planner_context,
577 )?,
578 right: self.sql_expr_to_logical_expr(
579 *right,
580 schema,
581 planner_context,
582 )?,
583 };
584 for planner in self.context_provider.get_expr_planners() {
585 match planner.plan_any(binary_expr)? {
586 PlannerResult::Planned(expr) => {
587 return Ok(expr);
588 }
589 PlannerResult::Original(expr) => {
590 binary_expr = expr;
591 }
592 }
593 }
594 not_impl_err!("AnyOp not supported by ExprPlanner: {binary_expr:?}")
595 }
596 #[expect(deprecated)]
597 SQLExpr::Wildcard(_token) => Ok(Expr::Wildcard {
598 qualifier: None,
599 options: Box::new(WildcardOptions::default()),
600 }),
601 #[expect(deprecated)]
602 SQLExpr::QualifiedWildcard(object_name, _token) => Ok(Expr::Wildcard {
603 qualifier: Some(self.object_name_to_table_reference(object_name)?),
604 options: Box::new(WildcardOptions::default()),
605 }),
606 SQLExpr::Tuple(values) => self.parse_tuple(schema, planner_context, values),
607 _ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
608 }
609 }
610
611 fn parse_struct(
613 &self,
614 schema: &DFSchema,
615 planner_context: &mut PlannerContext,
616 values: Vec<SQLExpr>,
617 fields: Vec<StructField>,
618 ) -> Result<Expr> {
619 if !fields.is_empty() {
620 return not_impl_err!("Struct fields are not supported yet");
621 }
622 let is_named_struct = values
623 .iter()
624 .any(|value| matches!(value, SQLExpr::Named { .. }));
625
626 let mut create_struct_args = if is_named_struct {
627 self.create_named_struct_expr(values, schema, planner_context)?
628 } else {
629 self.create_struct_expr(values, schema, planner_context)?
630 };
631
632 for planner in self.context_provider.get_expr_planners() {
633 match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
634 PlannerResult::Planned(expr) => return Ok(expr),
635 PlannerResult::Original(args) => create_struct_args = args,
636 }
637 }
638 not_impl_err!("Struct not supported by ExprPlanner: {create_struct_args:?}")
639 }
640
641 fn parse_tuple(
642 &self,
643 schema: &DFSchema,
644 planner_context: &mut PlannerContext,
645 values: Vec<SQLExpr>,
646 ) -> Result<Expr> {
647 match values.first() {
648 Some(SQLExpr::Identifier(_)) | Some(SQLExpr::Value(_)) => {
649 self.parse_struct(schema, planner_context, values, vec![])
650 }
651 None => not_impl_err!("Empty tuple not supported yet"),
652 _ => {
653 not_impl_err!("Only identifiers and literals are supported in tuples")
654 }
655 }
656 }
657
658 fn sql_position_to_expr(
659 &self,
660 substr_expr: SQLExpr,
661 str_expr: SQLExpr,
662 schema: &DFSchema,
663 planner_context: &mut PlannerContext,
664 ) -> Result<Expr> {
665 let substr =
666 self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
667 let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;
668 let mut position_args = vec![fullstr, substr];
669 for planner in self.context_provider.get_expr_planners() {
670 match planner.plan_position(position_args)? {
671 PlannerResult::Planned(expr) => return Ok(expr),
672 PlannerResult::Original(args) => {
673 position_args = args;
674 }
675 }
676 }
677
678 not_impl_err!("Position not supported by ExprPlanner: {position_args:?}")
679 }
680
681 fn try_plan_dictionary_literal(
682 &self,
683 fields: Vec<DictionaryField>,
684 schema: &DFSchema,
685 planner_context: &mut PlannerContext,
686 ) -> Result<Expr> {
687 let mut keys = vec![];
688 let mut values = vec![];
689 for field in fields {
690 let key = lit(field.key.value);
691 let value =
692 self.sql_expr_to_logical_expr(*field.value, schema, planner_context)?;
693 keys.push(key);
694 values.push(value);
695 }
696
697 let mut raw_expr = RawDictionaryExpr { keys, values };
698
699 for planner in self.context_provider.get_expr_planners() {
700 match planner.plan_dictionary_literal(raw_expr, schema)? {
701 PlannerResult::Planned(expr) => {
702 return Ok(expr);
703 }
704 PlannerResult::Original(expr) => raw_expr = expr,
705 }
706 }
707 not_impl_err!("Dictionary not supported by ExprPlanner: {raw_expr:?}")
708 }
709
710 fn try_plan_map_literal(
711 &self,
712 entries: Vec<MapEntry>,
713 schema: &DFSchema,
714 planner_context: &mut PlannerContext,
715 ) -> Result<Expr> {
716 let mut exprs: Vec<_> = entries
717 .into_iter()
718 .flat_map(|entry| vec![entry.key, entry.value].into_iter())
719 .map(|expr| self.sql_expr_to_logical_expr(*expr, schema, planner_context))
720 .collect::<Result<Vec<_>>>()?;
721 for planner in self.context_provider.get_expr_planners() {
722 match planner.plan_make_map(exprs)? {
723 PlannerResult::Planned(expr) => {
724 return Ok(expr);
725 }
726 PlannerResult::Original(expr) => exprs = expr,
727 }
728 }
729 not_impl_err!("MAP not supported by ExprPlanner: {exprs:?}")
730 }
731
732 fn create_named_struct_expr(
735 &self,
736 values: Vec<SQLExpr>,
737 input_schema: &DFSchema,
738 planner_context: &mut PlannerContext,
739 ) -> Result<Vec<Expr>> {
740 Ok(values
741 .into_iter()
742 .enumerate()
743 .map(|(i, value)| {
744 let args = if let SQLExpr::Named { expr, name } = value {
745 [
746 name.value.lit(),
747 self.sql_expr_to_logical_expr(
748 *expr,
749 input_schema,
750 planner_context,
751 )?,
752 ]
753 } else {
754 [
755 format!("c{i}").lit(),
756 self.sql_expr_to_logical_expr(
757 value,
758 input_schema,
759 planner_context,
760 )?,
761 ]
762 };
763
764 Ok(args)
765 })
766 .collect::<Result<Vec<_>>>()?
767 .into_iter()
768 .flatten()
769 .collect())
770 }
771
772 fn create_struct_expr(
776 &self,
777 values: Vec<SQLExpr>,
778 input_schema: &DFSchema,
779 planner_context: &mut PlannerContext,
780 ) -> Result<Vec<Expr>> {
781 values
782 .into_iter()
783 .map(|value| {
784 self.sql_expr_to_logical_expr(value, input_schema, planner_context)
785 })
786 .collect::<Result<Vec<_>>>()
787 }
788
789 fn sql_in_list_to_expr(
790 &self,
791 expr: SQLExpr,
792 list: Vec<SQLExpr>,
793 negated: bool,
794 schema: &DFSchema,
795 planner_context: &mut PlannerContext,
796 ) -> Result<Expr> {
797 let list_expr = list
798 .into_iter()
799 .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
800 .collect::<Result<Vec<_>>>()?;
801
802 Ok(Expr::InList(InList::new(
803 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
804 list_expr,
805 negated,
806 )))
807 }
808
809 #[allow(clippy::too_many_arguments)]
810 fn sql_like_to_expr(
811 &self,
812 negated: bool,
813 expr: SQLExpr,
814 pattern: SQLExpr,
815 escape_char: Option<String>,
816 schema: &DFSchema,
817 planner_context: &mut PlannerContext,
818 case_insensitive: bool,
819 any: bool,
820 ) -> Result<Expr> {
821 if any {
822 return not_impl_err!("ANY in LIKE expression");
823 }
824 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
825 let escape_char = if let Some(char) = escape_char {
826 if char.len() != 1 {
827 return plan_err!("Invalid escape character in LIKE expression");
828 }
829 Some(char.chars().next().unwrap())
830 } else {
831 None
832 };
833 Ok(Expr::Like(Like::new(
834 negated,
835 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
836 Box::new(pattern),
837 escape_char,
838 case_insensitive,
839 )))
840 }
841
842 fn sql_similarto_to_expr(
843 &self,
844 negated: bool,
845 expr: SQLExpr,
846 pattern: SQLExpr,
847 escape_char: Option<String>,
848 schema: &DFSchema,
849 planner_context: &mut PlannerContext,
850 ) -> Result<Expr> {
851 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
852 let pattern_type = pattern.get_type(schema)?;
853 if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
854 return plan_err!("Invalid pattern in SIMILAR TO expression");
855 }
856 let escape_char = if let Some(char) = escape_char {
857 if char.len() != 1 {
858 return plan_err!("Invalid escape character in SIMILAR TO expression");
859 }
860 Some(char.chars().next().unwrap())
861 } else {
862 None
863 };
864 Ok(Expr::SimilarTo(Like::new(
865 negated,
866 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
867 Box::new(pattern),
868 escape_char,
869 false,
870 )))
871 }
872
873 fn sql_trim_to_expr(
874 &self,
875 expr: SQLExpr,
876 trim_where: Option<TrimWhereField>,
877 trim_what: Option<Box<SQLExpr>>,
878 trim_characters: Option<Vec<SQLExpr>>,
879 schema: &DFSchema,
880 planner_context: &mut PlannerContext,
881 ) -> Result<Expr> {
882 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
883 let args = match (trim_what, trim_characters) {
884 (Some(to_trim), None) => {
885 let to_trim =
886 self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
887 Ok(vec![arg, to_trim])
888 }
889 (None, Some(trim_characters)) => {
890 if let Some(first) = trim_characters.first() {
891 let to_trim = self.sql_expr_to_logical_expr(
892 first.clone(),
893 schema,
894 planner_context,
895 )?;
896 Ok(vec![arg, to_trim])
897 } else {
898 plan_err!("TRIM CHARACTERS cannot be empty")
899 }
900 }
901 (Some(_), Some(_)) => {
902 plan_err!("Both TRIM and TRIM CHARACTERS cannot be specified")
903 }
904 (None, None) => Ok(vec![arg]),
905 }?;
906
907 let fun_name = match trim_where {
908 Some(TrimWhereField::Leading) => "ltrim",
909 Some(TrimWhereField::Trailing) => "rtrim",
910 Some(TrimWhereField::Both) => "btrim",
911 None => "trim",
912 };
913 let fun = self
914 .context_provider
915 .get_function_meta(fun_name)
916 .ok_or_else(|| {
917 internal_datafusion_err!("Unable to find expected '{fun_name}' function")
918 })?;
919
920 Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
921 }
922
923 fn sql_overlay_to_expr(
924 &self,
925 expr: SQLExpr,
926 overlay_what: SQLExpr,
927 overlay_from: SQLExpr,
928 overlay_for: Option<Box<SQLExpr>>,
929 schema: &DFSchema,
930 planner_context: &mut PlannerContext,
931 ) -> Result<Expr> {
932 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
933 let what_arg =
934 self.sql_expr_to_logical_expr(overlay_what, schema, planner_context)?;
935 let from_arg =
936 self.sql_expr_to_logical_expr(overlay_from, schema, planner_context)?;
937 let mut overlay_args = match overlay_for {
938 Some(for_expr) => {
939 let for_expr =
940 self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
941 vec![arg, what_arg, from_arg, for_expr]
942 }
943 None => vec![arg, what_arg, from_arg],
944 };
945 for planner in self.context_provider.get_expr_planners() {
946 match planner.plan_overlay(overlay_args)? {
947 PlannerResult::Planned(expr) => return Ok(expr),
948 PlannerResult::Original(args) => overlay_args = args,
949 }
950 }
951 not_impl_err!("Overlay not supported by ExprPlanner: {overlay_args:?}")
952 }
953
954 fn sql_cast_to_expr(
955 &self,
956 expr: SQLExpr,
957 data_type: SQLDataType,
958 format: Option<CastFormat>,
959 schema: &DFSchema,
960 planner_context: &mut PlannerContext,
961 ) -> Result<Expr> {
962 if let Some(format) = format {
963 return not_impl_err!("CAST with format is not supported: {format}");
964 }
965
966 let dt = self.convert_data_type(&data_type)?;
967 let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
968
969 let expr = match &dt {
972 DataType::Timestamp(TimeUnit::Nanosecond, tz)
973 if expr.get_type(schema)? == DataType::Int64 =>
974 {
975 Expr::Cast(Cast::new(
976 Box::new(expr),
977 DataType::Timestamp(TimeUnit::Second, tz.clone()),
978 ))
979 }
980 _ => expr,
981 };
982
983 Ok(Expr::Cast(Cast::new(Box::new(expr), dt)))
984 }
985
986 fn sql_compound_field_access_to_expr(
987 &self,
988 root: SQLExpr,
989 access_chain: Vec<AccessExpr>,
990 schema: &DFSchema,
991 planner_context: &mut PlannerContext,
992 ) -> Result<Expr> {
993 let mut root = self.sql_expr_to_logical_expr(root, schema, planner_context)?;
994 let fields = access_chain
995 .into_iter()
996 .map(|field| match field {
997 AccessExpr::Subscript(subscript) => {
998 match subscript {
999 Subscript::Index { index } => {
1000 match index {
1002 SQLExpr::Value(
1003 Value::SingleQuotedString(s)
1004 | Value::DoubleQuotedString(s),
1005 ) => Ok(Some(GetFieldAccess::NamedStructField {
1006 name: ScalarValue::from(s),
1007 })),
1008 SQLExpr::JsonAccess { .. } => {
1009 not_impl_err!("JsonAccess")
1010 }
1011 _ => Ok(Some(GetFieldAccess::ListIndex {
1013 key: Box::new(self.sql_expr_to_logical_expr(
1014 index,
1015 schema,
1016 planner_context,
1017 )?),
1018 })),
1019 }
1020 }
1021 Subscript::Slice {
1022 lower_bound,
1023 upper_bound,
1024 stride,
1025 } => {
1026 let lower_bound = if let Some(lower_bound) = lower_bound {
1028 self.sql_expr_to_logical_expr(
1029 lower_bound,
1030 schema,
1031 planner_context,
1032 )
1033 } else {
1034 not_impl_err!("Slice subscript requires a lower bound")
1035 }?;
1036
1037 let upper_bound = if let Some(upper_bound) = upper_bound {
1039 self.sql_expr_to_logical_expr(
1040 upper_bound,
1041 schema,
1042 planner_context,
1043 )
1044 } else {
1045 not_impl_err!("Slice subscript requires an upper bound")
1046 }?;
1047
1048 let stride = if let Some(stride) = stride {
1050 self.sql_expr_to_logical_expr(
1051 stride,
1052 schema,
1053 planner_context,
1054 )?
1055 } else {
1056 lit(1i64)
1057 };
1058
1059 Ok(Some(GetFieldAccess::ListRange {
1060 start: Box::new(lower_bound),
1061 stop: Box::new(upper_bound),
1062 stride: Box::new(stride),
1063 }))
1064 }
1065 }
1066 }
1067 AccessExpr::Dot(expr) => {
1068 let expr =
1069 self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
1070 match expr {
1071 Expr::Column(Column {
1072 name,
1073 relation,
1074 spans,
1075 }) => {
1076 if let Some(relation) = &relation {
1077 if relation.table() == root.schema_name().to_string() {
1082 root = Expr::Column(Column {
1083 name,
1084 relation: Some(relation.clone()),
1085 spans,
1086 });
1087 Ok(None)
1088 } else {
1089 plan_err!(
1090 "table name mismatch: {} != {}",
1091 relation.table(),
1092 root.schema_name()
1093 )
1094 }
1095 } else {
1096 Ok(Some(GetFieldAccess::NamedStructField {
1097 name: ScalarValue::from(name),
1098 }))
1099 }
1100 }
1101 _ => not_impl_err!(
1102 "Dot access not supported for non-column expr: {expr:?}"
1103 ),
1104 }
1105 }
1106 })
1107 .collect::<Result<Vec<_>>>()?;
1108
1109 fields
1110 .into_iter()
1111 .flatten()
1112 .try_fold(root, |expr, field_access| {
1113 let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
1114 for planner in self.context_provider.get_expr_planners() {
1115 match planner.plan_field_access(field_access_expr, schema)? {
1116 PlannerResult::Planned(expr) => return Ok(expr),
1117 PlannerResult::Original(expr) => {
1118 field_access_expr = expr;
1119 }
1120 }
1121 }
1122 not_impl_err!(
1123 "GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}"
1124 )
1125 })
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use std::collections::HashMap;
1132 use std::sync::Arc;
1133
1134 use arrow::datatypes::{Field, Schema};
1135 use sqlparser::dialect::GenericDialect;
1136 use sqlparser::parser::Parser;
1137
1138 use datafusion_common::config::ConfigOptions;
1139 use datafusion_common::TableReference;
1140 use datafusion_expr::logical_plan::builder::LogicalTableSource;
1141 use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
1142
1143 use super::*;
1144
1145 struct TestContextProvider {
1146 options: ConfigOptions,
1147 tables: HashMap<String, Arc<dyn TableSource>>,
1148 }
1149
1150 impl TestContextProvider {
1151 pub fn new() -> Self {
1152 let mut tables = HashMap::new();
1153 tables.insert(
1154 "table1".to_string(),
1155 create_table_source(vec![Field::new(
1156 "column1".to_string(),
1157 DataType::Utf8,
1158 false,
1159 )]),
1160 );
1161
1162 Self {
1163 options: Default::default(),
1164 tables,
1165 }
1166 }
1167 }
1168
1169 impl ContextProvider for TestContextProvider {
1170 fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
1171 match self.tables.get(name.table()) {
1172 Some(table) => Ok(Arc::clone(table)),
1173 _ => plan_err!("Table not found: {}", name.table()),
1174 }
1175 }
1176
1177 fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
1178 None
1179 }
1180
1181 fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
1182 match name {
1183 "sum" => Some(datafusion_functions_aggregate::sum::sum_udaf()),
1184 _ => None,
1185 }
1186 }
1187
1188 fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
1189 None
1190 }
1191
1192 fn options(&self) -> &ConfigOptions {
1193 &self.options
1194 }
1195
1196 fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
1197 None
1198 }
1199
1200 fn udf_names(&self) -> Vec<String> {
1201 Vec::new()
1202 }
1203
1204 fn udaf_names(&self) -> Vec<String> {
1205 vec!["sum".to_string()]
1206 }
1207
1208 fn udwf_names(&self) -> Vec<String> {
1209 Vec::new()
1210 }
1211 }
1212
1213 fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
1214 Arc::new(LogicalTableSource::new(Arc::new(
1215 Schema::new_with_metadata(fields, HashMap::new()),
1216 )))
1217 }
1218
1219 macro_rules! test_stack_overflow {
1220 ($num_expr:expr) => {
1221 paste::item! {
1222 #[test]
1223 fn [<test_stack_overflow_ $num_expr>]() {
1224 let schema = DFSchema::empty();
1225 let mut planner_context = PlannerContext::default();
1226
1227 let expr_str = (0..$num_expr)
1228 .map(|i| format!("column1 = 'value{:?}'", i))
1229 .collect::<Vec<String>>()
1230 .join(" OR ");
1231
1232 let dialect = GenericDialect{};
1233 let mut parser = Parser::new(&dialect)
1234 .try_with_sql(expr_str.as_str())
1235 .unwrap();
1236 let sql_expr = parser.parse_expr().unwrap();
1237
1238 let context_provider = TestContextProvider::new();
1239 let sql_to_rel = SqlToRel::new(&context_provider);
1240
1241 sql_to_rel.sql_expr_to_logical_expr(
1243 sql_expr,
1244 &schema,
1245 &mut planner_context,
1246 ).unwrap();
1247 }
1248 }
1249 };
1250 }
1251
1252 test_stack_overflow!(64);
1253 test_stack_overflow!(128);
1254 test_stack_overflow!(256);
1255 test_stack_overflow!(512);
1256 test_stack_overflow!(1024);
1257 test_stack_overflow!(2048);
1258 test_stack_overflow!(4096);
1259 test_stack_overflow!(8192);
1260 #[test]
1261 fn test_sql_to_expr_with_alias() {
1262 let schema = DFSchema::empty();
1263 let mut planner_context = PlannerContext::default();
1264
1265 let expr_str = "SUM(int_col) as sum_int_col";
1266
1267 let dialect = GenericDialect {};
1268 let mut parser = Parser::new(&dialect).try_with_sql(expr_str).unwrap();
1269 let sql_expr = parser.parse_expr_with_alias().unwrap();
1271
1272 let context_provider = TestContextProvider::new();
1273 let sql_to_rel = SqlToRel::new(&context_provider);
1274
1275 let expr = sql_to_rel
1276 .sql_expr_to_logical_expr_with_alias(sql_expr, &schema, &mut planner_context)
1277 .unwrap();
1278
1279 assert!(matches!(expr, Expr::Alias(_)));
1280 }
1281}