1use arrow::datatypes::{DataType, TimeUnit};
19use datafusion_expr::planner::{
20 PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr,
21};
22use sqlparser::ast::{
23 AccessExpr, BinaryOperator, CastFormat, CastKind, DataType as SQLDataType,
24 DictionaryField, Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, MapEntry,
25 StructField, Subscript, TrimWhereField, Value, ValueWithSpan,
26};
27
28use datafusion_common::{
29 internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
30 ScalarValue,
31};
32
33use datafusion_expr::expr::ScalarFunction;
34use datafusion_expr::expr::{InList, WildcardOptions};
35use datafusion_expr::{
36 lit, Between, BinaryExpr, Cast, Expr, ExprSchemable, GetFieldAccess, Like, Literal,
37 Operator, TryCast,
38};
39
40use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
41
42mod binary_op;
43mod function;
44mod grouping_set;
45mod identifier;
46mod order_by;
47mod subquery;
48mod substring;
49mod unary_op;
50mod value;
51
52impl<S: ContextProvider> SqlToRel<'_, S> {
53 pub(crate) fn sql_expr_to_logical_expr_with_alias(
54 &self,
55 sql: SQLExprWithAlias,
56 schema: &DFSchema,
57 planner_context: &mut PlannerContext,
58 ) -> Result<Expr> {
59 let mut expr =
60 self.sql_expr_to_logical_expr(sql.expr, schema, planner_context)?;
61 if let Some(alias) = sql.alias {
62 expr = expr.alias(alias.value);
63 }
64 Ok(expr)
65 }
66 pub(crate) fn sql_expr_to_logical_expr(
67 &self,
68 sql: SQLExpr,
69 schema: &DFSchema,
70 planner_context: &mut PlannerContext,
71 ) -> Result<Expr> {
72 enum StackEntry {
73 SQLExpr(Box<SQLExpr>),
74 Operator(BinaryOperator),
75 }
76
77 let mut stack = vec![StackEntry::SQLExpr(Box::new(sql))];
82 let mut eval_stack = vec![];
83
84 while let Some(entry) = stack.pop() {
85 match entry {
86 StackEntry::SQLExpr(sql_expr) => {
87 match *sql_expr {
88 SQLExpr::BinaryOp { left, op, right } => {
89 stack.push(StackEntry::Operator(op));
92 stack.push(StackEntry::SQLExpr(right));
93 stack.push(StackEntry::SQLExpr(left));
94 }
95 _ => {
96 let expr = self.sql_expr_to_logical_expr_internal(
97 *sql_expr,
98 schema,
99 planner_context,
100 )?;
101 eval_stack.push(expr);
102 }
103 }
104 }
105 StackEntry::Operator(op) => {
106 let right = eval_stack.pop().unwrap();
107 let left = eval_stack.pop().unwrap();
108 let expr = self.build_logical_expr(op, left, right, schema)?;
109 eval_stack.push(expr);
110 }
111 }
112 }
113
114 assert_eq!(1, eval_stack.len());
115 let expr = eval_stack.pop().unwrap();
116 Ok(expr)
117 }
118
119 fn build_logical_expr(
120 &self,
121 op: BinaryOperator,
122 left: Expr,
123 right: Expr,
124 schema: &DFSchema,
125 ) -> Result<Expr> {
126 let mut binary_expr = RawBinaryExpr { op, left, right };
128 for planner in self.context_provider.get_expr_planners() {
129 match planner.plan_binary_op(binary_expr, schema)? {
130 PlannerResult::Planned(expr) => {
131 return Ok(expr);
132 }
133 PlannerResult::Original(expr) => {
134 binary_expr = expr;
135 }
136 }
137 }
138
139 let RawBinaryExpr { op, left, right } = binary_expr;
140 Ok(Expr::BinaryExpr(BinaryExpr::new(
141 Box::new(left),
142 self.parse_sql_binary_op(op)?,
143 Box::new(right),
144 )))
145 }
146
147 pub fn sql_to_expr_with_alias(
148 &self,
149 sql: SQLExprWithAlias,
150 schema: &DFSchema,
151 planner_context: &mut PlannerContext,
152 ) -> Result<Expr> {
153 let mut expr =
154 self.sql_expr_to_logical_expr_with_alias(sql, schema, planner_context)?;
155 expr = self.rewrite_partial_qualifier(expr, schema);
156 self.validate_schema_satisfies_exprs(schema, &[expr.clone()])?;
157 let (expr, _) = expr.infer_placeholder_types(schema)?;
158 Ok(expr)
159 }
160
161 pub fn sql_to_expr(
163 &self,
164 sql: SQLExpr,
165 schema: &DFSchema,
166 planner_context: &mut PlannerContext,
167 ) -> Result<Expr> {
168 let mut expr = self.sql_expr_to_logical_expr(sql, schema, planner_context)?;
170 expr = self.rewrite_partial_qualifier(expr, schema);
171 self.validate_schema_satisfies_exprs(schema, std::slice::from_ref(&expr))?;
172 let (expr, _) = expr.infer_placeholder_types(schema)?;
173 Ok(expr)
174 }
175
176 fn rewrite_partial_qualifier(&self, expr: Expr, schema: &DFSchema) -> Expr {
178 match expr {
179 Expr::Column(col) => match &col.relation {
180 Some(q) => {
181 match schema.iter().find(|(qualifier, field)| match qualifier {
182 Some(field_q) => {
183 field.name() == &col.name
184 && field_q.to_string().ends_with(&format!(".{q}"))
185 }
186 _ => false,
187 }) {
188 Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
189 None => Expr::Column(col),
190 }
191 }
192 None => Expr::Column(col),
193 },
194 _ => expr,
195 }
196 }
197
198 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
201 fn sql_expr_to_logical_expr_internal(
202 &self,
203 sql: SQLExpr,
204 schema: &DFSchema,
205 planner_context: &mut PlannerContext,
206 ) -> Result<Expr> {
207 match sql {
213 SQLExpr::Value(value) => {
214 self.parse_value(value.into(), planner_context.prepare_param_data_types())
215 }
216 SQLExpr::Extract { field, expr, .. } => {
217 let mut extract_args = vec![
218 Expr::Literal(ScalarValue::from(format!("{field}")), None),
219 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
220 ];
221
222 for planner in self.context_provider.get_expr_planners() {
223 match planner.plan_extract(extract_args)? {
224 PlannerResult::Planned(expr) => return Ok(expr),
225 PlannerResult::Original(args) => {
226 extract_args = args;
227 }
228 }
229 }
230
231 not_impl_err!("Extract not supported by ExprPlanner: {extract_args:?}")
232 }
233
234 SQLExpr::Array(arr) => self.sql_array_literal(arr.elem, schema),
235 SQLExpr::Interval(interval) => self.sql_interval_to_expr(false, interval),
236 SQLExpr::Identifier(id) => {
237 self.sql_identifier_to_expr(id, schema, planner_context)
238 }
239
240 SQLExpr::CompoundFieldAccess { root, access_chain } => self
242 .sql_compound_field_access_to_expr(
243 *root,
244 access_chain,
245 schema,
246 planner_context,
247 ),
248
249 SQLExpr::CompoundIdentifier(ids) => {
250 self.sql_compound_identifier_to_expr(ids, schema, planner_context)
251 }
252
253 SQLExpr::Case {
254 operand,
255 conditions,
256 else_result,
257 case_token: _,
258 end_token: _,
259 } => self.sql_case_identifier_to_expr(
260 operand,
261 conditions,
262 else_result,
263 schema,
264 planner_context,
265 ),
266
267 SQLExpr::Cast {
268 kind: CastKind::Cast | CastKind::DoubleColon,
269 expr,
270 data_type,
271 format,
272 } => self.sql_cast_to_expr(*expr, data_type, format, schema, planner_context),
273
274 SQLExpr::Cast {
275 kind: CastKind::TryCast | CastKind::SafeCast,
276 expr,
277 data_type,
278 format,
279 } => {
280 if let Some(format) = format {
281 return not_impl_err!("CAST with format is not supported: {format}");
282 }
283
284 Ok(Expr::TryCast(TryCast::new(
285 Box::new(self.sql_expr_to_logical_expr(
286 *expr,
287 schema,
288 planner_context,
289 )?),
290 self.convert_data_type(&data_type)?,
291 )))
292 }
293
294 SQLExpr::TypedString { data_type, value } => Ok(Expr::Cast(Cast::new(
295 Box::new(lit(value.into_string().unwrap())),
296 self.convert_data_type(&data_type)?,
297 ))),
298
299 SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
300 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
301 ))),
302
303 SQLExpr::IsNotNull(expr) => Ok(Expr::IsNotNull(Box::new(
304 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
305 ))),
306
307 SQLExpr::IsDistinctFrom(left, right) => {
308 Ok(Expr::BinaryExpr(BinaryExpr::new(
309 Box::new(self.sql_expr_to_logical_expr(
310 *left,
311 schema,
312 planner_context,
313 )?),
314 Operator::IsDistinctFrom,
315 Box::new(self.sql_expr_to_logical_expr(
316 *right,
317 schema,
318 planner_context,
319 )?),
320 )))
321 }
322
323 SQLExpr::IsNotDistinctFrom(left, right) => {
324 Ok(Expr::BinaryExpr(BinaryExpr::new(
325 Box::new(self.sql_expr_to_logical_expr(
326 *left,
327 schema,
328 planner_context,
329 )?),
330 Operator::IsNotDistinctFrom,
331 Box::new(self.sql_expr_to_logical_expr(
332 *right,
333 schema,
334 planner_context,
335 )?),
336 )))
337 }
338
339 SQLExpr::IsTrue(expr) => Ok(Expr::IsTrue(Box::new(
340 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
341 ))),
342
343 SQLExpr::IsFalse(expr) => Ok(Expr::IsFalse(Box::new(
344 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
345 ))),
346
347 SQLExpr::IsNotTrue(expr) => Ok(Expr::IsNotTrue(Box::new(
348 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
349 ))),
350
351 SQLExpr::IsNotFalse(expr) => Ok(Expr::IsNotFalse(Box::new(
352 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
353 ))),
354
355 SQLExpr::IsUnknown(expr) => Ok(Expr::IsUnknown(Box::new(
356 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
357 ))),
358
359 SQLExpr::IsNotUnknown(expr) => Ok(Expr::IsNotUnknown(Box::new(
360 self.sql_expr_to_logical_expr(*expr, schema, planner_context)?,
361 ))),
362
363 SQLExpr::UnaryOp { op, expr } => {
364 self.parse_sql_unary_op(op, *expr, schema, planner_context)
365 }
366
367 SQLExpr::Between {
368 expr,
369 negated,
370 low,
371 high,
372 } => Ok(Expr::Between(Between::new(
373 Box::new(self.sql_expr_to_logical_expr(
374 *expr,
375 schema,
376 planner_context,
377 )?),
378 negated,
379 Box::new(self.sql_expr_to_logical_expr(*low, schema, planner_context)?),
380 Box::new(self.sql_expr_to_logical_expr(
381 *high,
382 schema,
383 planner_context,
384 )?),
385 ))),
386
387 SQLExpr::InList {
388 expr,
389 list,
390 negated,
391 } => self.sql_in_list_to_expr(*expr, list, negated, schema, planner_context),
392
393 SQLExpr::Like {
394 negated,
395 expr,
396 pattern,
397 escape_char,
398 any,
399 } => self.sql_like_to_expr(
400 negated,
401 *expr,
402 *pattern,
403 escape_char,
404 schema,
405 planner_context,
406 false,
407 any,
408 ),
409
410 SQLExpr::ILike {
411 negated,
412 expr,
413 pattern,
414 escape_char,
415 any,
416 } => self.sql_like_to_expr(
417 negated,
418 *expr,
419 *pattern,
420 escape_char,
421 schema,
422 planner_context,
423 true,
424 any,
425 ),
426
427 SQLExpr::SimilarTo {
428 negated,
429 expr,
430 pattern,
431 escape_char,
432 } => self.sql_similarto_to_expr(
433 negated,
434 *expr,
435 *pattern,
436 escape_char,
437 schema,
438 planner_context,
439 ),
440
441 SQLExpr::BinaryOp { .. } => {
442 internal_err!("binary_op should be handled by sql_expr_to_logical_expr.")
443 }
444
445 #[cfg(feature = "unicode_expressions")]
446 SQLExpr::Substring {
447 expr,
448 substring_from,
449 substring_for,
450 special: _,
451 shorthand: _,
452 } => self.sql_substring_to_expr(
453 expr,
454 substring_from,
455 substring_for,
456 schema,
457 planner_context,
458 ),
459
460 #[cfg(not(feature = "unicode_expressions"))]
461 SQLExpr::Substring { .. } => {
462 internal_err!(
463 "statement substring requires compilation with feature flag: unicode_expressions."
464 )
465 }
466
467 SQLExpr::Trim {
468 expr,
469 trim_where,
470 trim_what,
471 trim_characters,
472 } => self.sql_trim_to_expr(
473 *expr,
474 trim_where,
475 trim_what,
476 trim_characters,
477 schema,
478 planner_context,
479 ),
480
481 SQLExpr::Function(function) => {
482 self.sql_function_to_expr(function, schema, planner_context)
483 }
484
485 SQLExpr::Rollup(exprs) => {
486 self.sql_rollup_to_expr(exprs, schema, planner_context)
487 }
488 SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema, planner_context),
489 SQLExpr::GroupingSets(exprs) => {
490 self.sql_grouping_sets_to_expr(exprs, schema, planner_context)
491 }
492
493 SQLExpr::Floor {
494 expr,
495 field: _field,
496 } => self.sql_fn_name_to_expr(*expr, "floor", schema, planner_context),
497 SQLExpr::Ceil {
498 expr,
499 field: _field,
500 } => self.sql_fn_name_to_expr(*expr, "ceil", schema, planner_context),
501 SQLExpr::Overlay {
502 expr,
503 overlay_what,
504 overlay_from,
505 overlay_for,
506 } => self.sql_overlay_to_expr(
507 *expr,
508 *overlay_what,
509 *overlay_from,
510 overlay_for,
511 schema,
512 planner_context,
513 ),
514 SQLExpr::Nested(e) => {
515 self.sql_expr_to_logical_expr(*e, schema, planner_context)
516 }
517
518 SQLExpr::Exists { subquery, negated } => {
519 self.parse_exists_subquery(*subquery, negated, schema, planner_context)
520 }
521 SQLExpr::InSubquery {
522 expr,
523 subquery,
524 negated,
525 } => {
526 self.parse_in_subquery(*expr, *subquery, negated, schema, planner_context)
527 }
528 SQLExpr::Subquery(subquery) => {
529 self.parse_scalar_subquery(*subquery, schema, planner_context)
530 }
531
532 SQLExpr::Struct { values, fields } => {
533 self.parse_struct(schema, planner_context, values, fields)
534 }
535 SQLExpr::Position { expr, r#in } => {
536 self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
537 }
538 SQLExpr::AtTimeZone {
539 timestamp,
540 time_zone,
541 } => Ok(Expr::Cast(Cast::new(
542 Box::new(self.sql_expr_to_logical_expr_internal(
543 *timestamp,
544 schema,
545 planner_context,
546 )?),
547 match *time_zone {
548 SQLExpr::Value(ValueWithSpan {
549 value: Value::SingleQuotedString(s),
550 span: _,
551 }) => DataType::Timestamp(TimeUnit::Nanosecond, Some(s.into())),
552 _ => {
553 return not_impl_err!(
554 "Unsupported ast node in sqltorel: {time_zone:?}"
555 )
556 }
557 },
558 ))),
559 SQLExpr::Dictionary(fields) => {
560 self.try_plan_dictionary_literal(fields, schema, planner_context)
561 }
562 SQLExpr::Map(map) => {
563 self.try_plan_map_literal(map.entries, schema, planner_context)
564 }
565 SQLExpr::AnyOp {
566 left,
567 compare_op,
568 right,
569 is_some: _,
572 } => {
573 let mut binary_expr = RawBinaryExpr {
574 op: compare_op,
575 left: self.sql_expr_to_logical_expr(
576 *left,
577 schema,
578 planner_context,
579 )?,
580 right: self.sql_expr_to_logical_expr(
581 *right,
582 schema,
583 planner_context,
584 )?,
585 };
586 for planner in self.context_provider.get_expr_planners() {
587 match planner.plan_any(binary_expr)? {
588 PlannerResult::Planned(expr) => {
589 return Ok(expr);
590 }
591 PlannerResult::Original(expr) => {
592 binary_expr = expr;
593 }
594 }
595 }
596 not_impl_err!("AnyOp not supported by ExprPlanner: {binary_expr:?}")
597 }
598 #[expect(deprecated)]
599 SQLExpr::Wildcard(_token) => Ok(Expr::Wildcard {
600 qualifier: None,
601 options: Box::new(WildcardOptions::default()),
602 }),
603 #[expect(deprecated)]
604 SQLExpr::QualifiedWildcard(object_name, _token) => Ok(Expr::Wildcard {
605 qualifier: Some(self.object_name_to_table_reference(object_name)?),
606 options: Box::new(WildcardOptions::default()),
607 }),
608 SQLExpr::Tuple(values) => self.parse_tuple(schema, planner_context, values),
609 _ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
610 }
611 }
612
613 fn parse_struct(
615 &self,
616 schema: &DFSchema,
617 planner_context: &mut PlannerContext,
618 values: Vec<SQLExpr>,
619 fields: Vec<StructField>,
620 ) -> Result<Expr> {
621 if !fields.is_empty() {
622 return not_impl_err!("Struct fields are not supported yet");
623 }
624 let is_named_struct = values
625 .iter()
626 .any(|value| matches!(value, SQLExpr::Named { .. }));
627
628 let mut create_struct_args = if is_named_struct {
629 self.create_named_struct_expr(values, schema, planner_context)?
630 } else {
631 self.create_struct_expr(values, schema, planner_context)?
632 };
633
634 for planner in self.context_provider.get_expr_planners() {
635 match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
636 PlannerResult::Planned(expr) => return Ok(expr),
637 PlannerResult::Original(args) => create_struct_args = args,
638 }
639 }
640 not_impl_err!("Struct not supported by ExprPlanner: {create_struct_args:?}")
641 }
642
643 fn parse_tuple(
644 &self,
645 schema: &DFSchema,
646 planner_context: &mut PlannerContext,
647 values: Vec<SQLExpr>,
648 ) -> Result<Expr> {
649 match values.first() {
650 Some(SQLExpr::Identifier(_))
651 | Some(SQLExpr::Value(_))
652 | Some(SQLExpr::CompoundIdentifier(_)) => {
653 self.parse_struct(schema, planner_context, values, vec![])
654 }
655 None => not_impl_err!("Empty tuple not supported yet"),
656 _ => {
657 not_impl_err!("Only identifiers and literals are supported in tuples")
658 }
659 }
660 }
661
662 fn sql_position_to_expr(
663 &self,
664 substr_expr: SQLExpr,
665 str_expr: SQLExpr,
666 schema: &DFSchema,
667 planner_context: &mut PlannerContext,
668 ) -> Result<Expr> {
669 let substr =
670 self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?;
671 let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?;
672 let mut position_args = vec![fullstr, substr];
673 for planner in self.context_provider.get_expr_planners() {
674 match planner.plan_position(position_args)? {
675 PlannerResult::Planned(expr) => return Ok(expr),
676 PlannerResult::Original(args) => {
677 position_args = args;
678 }
679 }
680 }
681
682 not_impl_err!("Position not supported by ExprPlanner: {position_args:?}")
683 }
684
685 fn try_plan_dictionary_literal(
686 &self,
687 fields: Vec<DictionaryField>,
688 schema: &DFSchema,
689 planner_context: &mut PlannerContext,
690 ) -> Result<Expr> {
691 let mut keys = vec![];
692 let mut values = vec![];
693 for field in fields {
694 let key = lit(field.key.value);
695 let value =
696 self.sql_expr_to_logical_expr(*field.value, schema, planner_context)?;
697 keys.push(key);
698 values.push(value);
699 }
700
701 let mut raw_expr = RawDictionaryExpr { keys, values };
702
703 for planner in self.context_provider.get_expr_planners() {
704 match planner.plan_dictionary_literal(raw_expr, schema)? {
705 PlannerResult::Planned(expr) => {
706 return Ok(expr);
707 }
708 PlannerResult::Original(expr) => raw_expr = expr,
709 }
710 }
711 not_impl_err!("Dictionary not supported by ExprPlanner: {raw_expr:?}")
712 }
713
714 fn try_plan_map_literal(
715 &self,
716 entries: Vec<MapEntry>,
717 schema: &DFSchema,
718 planner_context: &mut PlannerContext,
719 ) -> Result<Expr> {
720 let mut exprs: Vec<_> = entries
721 .into_iter()
722 .flat_map(|entry| vec![entry.key, entry.value].into_iter())
723 .map(|expr| self.sql_expr_to_logical_expr(*expr, schema, planner_context))
724 .collect::<Result<Vec<_>>>()?;
725 for planner in self.context_provider.get_expr_planners() {
726 match planner.plan_make_map(exprs)? {
727 PlannerResult::Planned(expr) => {
728 return Ok(expr);
729 }
730 PlannerResult::Original(expr) => exprs = expr,
731 }
732 }
733 not_impl_err!("MAP not supported by ExprPlanner: {exprs:?}")
734 }
735
736 fn create_named_struct_expr(
739 &self,
740 values: Vec<SQLExpr>,
741 input_schema: &DFSchema,
742 planner_context: &mut PlannerContext,
743 ) -> Result<Vec<Expr>> {
744 Ok(values
745 .into_iter()
746 .enumerate()
747 .map(|(i, value)| {
748 let args = if let SQLExpr::Named { expr, name } = value {
749 [
750 name.value.lit(),
751 self.sql_expr_to_logical_expr(
752 *expr,
753 input_schema,
754 planner_context,
755 )?,
756 ]
757 } else {
758 [
759 format!("c{i}").lit(),
760 self.sql_expr_to_logical_expr(
761 value,
762 input_schema,
763 planner_context,
764 )?,
765 ]
766 };
767
768 Ok(args)
769 })
770 .collect::<Result<Vec<_>>>()?
771 .into_iter()
772 .flatten()
773 .collect())
774 }
775
776 fn create_struct_expr(
780 &self,
781 values: Vec<SQLExpr>,
782 input_schema: &DFSchema,
783 planner_context: &mut PlannerContext,
784 ) -> Result<Vec<Expr>> {
785 values
786 .into_iter()
787 .map(|value| {
788 self.sql_expr_to_logical_expr(value, input_schema, planner_context)
789 })
790 .collect::<Result<Vec<_>>>()
791 }
792
793 fn sql_in_list_to_expr(
794 &self,
795 expr: SQLExpr,
796 list: Vec<SQLExpr>,
797 negated: bool,
798 schema: &DFSchema,
799 planner_context: &mut PlannerContext,
800 ) -> Result<Expr> {
801 let list_expr = list
802 .into_iter()
803 .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
804 .collect::<Result<Vec<_>>>()?;
805
806 Ok(Expr::InList(InList::new(
807 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
808 list_expr,
809 negated,
810 )))
811 }
812
813 #[allow(clippy::too_many_arguments)]
814 fn sql_like_to_expr(
815 &self,
816 negated: bool,
817 expr: SQLExpr,
818 pattern: SQLExpr,
819 escape_char: Option<Value>,
820 schema: &DFSchema,
821 planner_context: &mut PlannerContext,
822 case_insensitive: bool,
823 any: bool,
824 ) -> Result<Expr> {
825 if any {
826 return not_impl_err!("ANY in LIKE expression");
827 }
828 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
829 let escape_char = match escape_char {
830 Some(Value::SingleQuotedString(char)) if char.len() == 1 => {
831 Some(char.chars().next().unwrap())
832 }
833 Some(value) => return plan_err!("Invalid escape character in LIKE expression. Expected a single character wrapped with single quotes, got {value}"),
834 None => None,
835 };
836 Ok(Expr::Like(Like::new(
837 negated,
838 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
839 Box::new(pattern),
840 escape_char,
841 case_insensitive,
842 )))
843 }
844
845 fn sql_similarto_to_expr(
846 &self,
847 negated: bool,
848 expr: SQLExpr,
849 pattern: SQLExpr,
850 escape_char: Option<Value>,
851 schema: &DFSchema,
852 planner_context: &mut PlannerContext,
853 ) -> Result<Expr> {
854 let pattern = self.sql_expr_to_logical_expr(pattern, schema, planner_context)?;
855 let pattern_type = pattern.get_type(schema)?;
856 if pattern_type != DataType::Utf8 && pattern_type != DataType::Null {
857 return plan_err!("Invalid pattern in SIMILAR TO expression");
858 }
859 let escape_char = match escape_char {
860 Some(Value::SingleQuotedString(char)) if char.len() == 1 => {
861 Some(char.chars().next().unwrap())
862 }
863 Some(value) => return plan_err!("Invalid escape character in SIMILAR TO expression. Expected a single character wrapped with single quotes, got {value}"),
864 None => None,
865 };
866 Ok(Expr::SimilarTo(Like::new(
867 negated,
868 Box::new(self.sql_expr_to_logical_expr(expr, schema, planner_context)?),
869 Box::new(pattern),
870 escape_char,
871 false,
872 )))
873 }
874
875 fn sql_trim_to_expr(
876 &self,
877 expr: SQLExpr,
878 trim_where: Option<TrimWhereField>,
879 trim_what: Option<Box<SQLExpr>>,
880 trim_characters: Option<Vec<SQLExpr>>,
881 schema: &DFSchema,
882 planner_context: &mut PlannerContext,
883 ) -> Result<Expr> {
884 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
885 let args = match (trim_what, trim_characters) {
886 (Some(to_trim), None) => {
887 let to_trim =
888 self.sql_expr_to_logical_expr(*to_trim, schema, planner_context)?;
889 Ok(vec![arg, to_trim])
890 }
891 (None, Some(trim_characters)) => {
892 if let Some(first) = trim_characters.first() {
893 let to_trim = self.sql_expr_to_logical_expr(
894 first.clone(),
895 schema,
896 planner_context,
897 )?;
898 Ok(vec![arg, to_trim])
899 } else {
900 plan_err!("TRIM CHARACTERS cannot be empty")
901 }
902 }
903 (Some(_), Some(_)) => {
904 plan_err!("Both TRIM and TRIM CHARACTERS cannot be specified")
905 }
906 (None, None) => Ok(vec![arg]),
907 }?;
908
909 let fun_name = match trim_where {
910 Some(TrimWhereField::Leading) => "ltrim",
911 Some(TrimWhereField::Trailing) => "rtrim",
912 Some(TrimWhereField::Both) => "btrim",
913 None => "trim",
914 };
915 let fun = self
916 .context_provider
917 .get_function_meta(fun_name)
918 .ok_or_else(|| {
919 internal_datafusion_err!("Unable to find expected '{fun_name}' function")
920 })?;
921
922 Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
923 }
924
925 fn sql_overlay_to_expr(
926 &self,
927 expr: SQLExpr,
928 overlay_what: SQLExpr,
929 overlay_from: SQLExpr,
930 overlay_for: Option<Box<SQLExpr>>,
931 schema: &DFSchema,
932 planner_context: &mut PlannerContext,
933 ) -> Result<Expr> {
934 let arg = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
935 let what_arg =
936 self.sql_expr_to_logical_expr(overlay_what, schema, planner_context)?;
937 let from_arg =
938 self.sql_expr_to_logical_expr(overlay_from, schema, planner_context)?;
939 let mut overlay_args = match overlay_for {
940 Some(for_expr) => {
941 let for_expr =
942 self.sql_expr_to_logical_expr(*for_expr, schema, planner_context)?;
943 vec![arg, what_arg, from_arg, for_expr]
944 }
945 None => vec![arg, what_arg, from_arg],
946 };
947 for planner in self.context_provider.get_expr_planners() {
948 match planner.plan_overlay(overlay_args)? {
949 PlannerResult::Planned(expr) => return Ok(expr),
950 PlannerResult::Original(args) => overlay_args = args,
951 }
952 }
953 not_impl_err!("Overlay not supported by ExprPlanner: {overlay_args:?}")
954 }
955
956 fn sql_cast_to_expr(
957 &self,
958 expr: SQLExpr,
959 data_type: SQLDataType,
960 format: Option<CastFormat>,
961 schema: &DFSchema,
962 planner_context: &mut PlannerContext,
963 ) -> Result<Expr> {
964 if let Some(format) = format {
965 return not_impl_err!("CAST with format is not supported: {format}");
966 }
967
968 let dt = self.convert_data_type(&data_type)?;
969 let expr = self.sql_expr_to_logical_expr(expr, schema, planner_context)?;
970
971 let expr = match &dt {
974 DataType::Timestamp(TimeUnit::Nanosecond, tz)
975 if expr.get_type(schema)? == DataType::Int64 =>
976 {
977 Expr::Cast(Cast::new(
978 Box::new(expr),
979 DataType::Timestamp(TimeUnit::Second, tz.clone()),
980 ))
981 }
982 _ => expr,
983 };
984
985 Ok(Expr::Cast(Cast::new(Box::new(expr), dt)))
986 }
987
988 fn extract_root_and_access_chain(
1006 &self,
1007 root: SQLExpr,
1008 mut access_chain: Vec<AccessExpr>,
1009 schema: &DFSchema,
1010 planner_context: &mut PlannerContext,
1011 ) -> Result<(Expr, Vec<AccessExpr>)> {
1012 let SQLExpr::Identifier(root_ident) = root else {
1013 let root = self.sql_expr_to_logical_expr(root, schema, planner_context)?;
1014 return Ok((root, access_chain));
1015 };
1016
1017 let mut compound_idents = vec![root_ident];
1018 let first_non_ident = access_chain
1019 .iter()
1020 .position(|access| !matches!(access, AccessExpr::Dot(SQLExpr::Identifier(_))))
1021 .unwrap_or(access_chain.len());
1022 for access in access_chain.drain(0..first_non_ident) {
1023 if let AccessExpr::Dot(SQLExpr::Identifier(ident)) = access {
1024 compound_idents.push(ident);
1025 } else {
1026 return internal_err!("Expected identifier in access chain");
1027 }
1028 }
1029
1030 let root = if compound_idents.len() == 1 {
1031 self.sql_identifier_to_expr(
1032 compound_idents.pop().unwrap(),
1033 schema,
1034 planner_context,
1035 )?
1036 } else {
1037 self.sql_compound_identifier_to_expr(
1038 compound_idents,
1039 schema,
1040 planner_context,
1041 )?
1042 };
1043 Ok((root, access_chain))
1044 }
1045
1046 fn sql_compound_field_access_to_expr(
1047 &self,
1048 root: SQLExpr,
1049 access_chain: Vec<AccessExpr>,
1050 schema: &DFSchema,
1051 planner_context: &mut PlannerContext,
1052 ) -> Result<Expr> {
1053 let (root, access_chain) = self.extract_root_and_access_chain(
1054 root,
1055 access_chain,
1056 schema,
1057 planner_context,
1058 )?;
1059 let fields = access_chain
1060 .into_iter()
1061 .map(|field| match field {
1062 AccessExpr::Subscript(subscript) => {
1063 match subscript {
1064 Subscript::Index { index } => {
1065 match index {
1067 SQLExpr::Value(ValueWithSpan {
1068 value:
1069 Value::SingleQuotedString(s)
1070 | Value::DoubleQuotedString(s),
1071 span: _,
1072 }) => Ok(Some(GetFieldAccess::NamedStructField {
1073 name: ScalarValue::from(s),
1074 })),
1075 SQLExpr::JsonAccess { .. } => {
1076 not_impl_err!("JsonAccess")
1077 }
1078 _ => Ok(Some(GetFieldAccess::ListIndex {
1080 key: Box::new(self.sql_expr_to_logical_expr(
1081 index,
1082 schema,
1083 planner_context,
1084 )?),
1085 })),
1086 }
1087 }
1088 Subscript::Slice {
1089 lower_bound,
1090 upper_bound,
1091 stride,
1092 } => {
1093 let lower_bound = if let Some(lower_bound) = lower_bound {
1095 self.sql_expr_to_logical_expr(
1096 lower_bound,
1097 schema,
1098 planner_context,
1099 )
1100 } else {
1101 not_impl_err!("Slice subscript requires a lower bound")
1102 }?;
1103
1104 let upper_bound = if let Some(upper_bound) = upper_bound {
1106 self.sql_expr_to_logical_expr(
1107 upper_bound,
1108 schema,
1109 planner_context,
1110 )
1111 } else {
1112 not_impl_err!("Slice subscript requires an upper bound")
1113 }?;
1114
1115 let stride = if let Some(stride) = stride {
1117 self.sql_expr_to_logical_expr(
1118 stride,
1119 schema,
1120 planner_context,
1121 )?
1122 } else {
1123 lit(1i64)
1124 };
1125
1126 Ok(Some(GetFieldAccess::ListRange {
1127 start: Box::new(lower_bound),
1128 stop: Box::new(upper_bound),
1129 stride: Box::new(stride),
1130 }))
1131 }
1132 }
1133 }
1134 AccessExpr::Dot(expr) => match expr {
1135 SQLExpr::Value(ValueWithSpan {
1136 value: Value::SingleQuotedString(s) | Value::DoubleQuotedString(s),
1137 span : _
1138 }) => Ok(Some(GetFieldAccess::NamedStructField {
1139 name: ScalarValue::from(s),
1140 })),
1141 _ => {
1142 not_impl_err!(
1143 "Dot access not supported for non-string expr: {expr:?}"
1144 )
1145 }
1146 },
1147 })
1148 .collect::<Result<Vec<_>>>()?;
1149
1150 fields
1151 .into_iter()
1152 .flatten()
1153 .try_fold(root, |expr, field_access| {
1154 let mut field_access_expr = RawFieldAccessExpr { expr, field_access };
1155 for planner in self.context_provider.get_expr_planners() {
1156 match planner.plan_field_access(field_access_expr, schema)? {
1157 PlannerResult::Planned(expr) => return Ok(expr),
1158 PlannerResult::Original(expr) => {
1159 field_access_expr = expr;
1160 }
1161 }
1162 }
1163 not_impl_err!(
1164 "GetFieldAccess not supported by ExprPlanner: {field_access_expr:?}"
1165 )
1166 })
1167 }
1168}
1169
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::datatypes::{Field, Schema};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;

    use datafusion_common::config::ConfigOptions;
    use datafusion_common::TableReference;
    use datafusion_expr::logical_plan::builder::LogicalTableSource;
    use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};

    use super::*;

    /// Minimal [`ContextProvider`] for these tests: one table (`table1`) and
    /// one aggregate function (`sum`).
    struct TestContextProvider {
        options: ConfigOptions,
        tables: HashMap<String, Arc<dyn TableSource>>,
    }

    impl TestContextProvider {
        /// Creates a provider whose `table1` has a single non-nullable `Utf8`
        /// column named `column1`.
        pub fn new() -> Self {
            let column = Field::new("column1".to_string(), DataType::Utf8, false);
            let tables = HashMap::from([(
                "table1".to_string(),
                create_table_source(vec![column]),
            )]);

            Self {
                options: ConfigOptions::default(),
                tables,
            }
        }
    }

    impl ContextProvider for TestContextProvider {
        fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
            if let Some(source) = self.tables.get(name.table()) {
                Ok(Arc::clone(source))
            } else {
                plan_err!("Table not found: {}", name.table())
            }
        }

        fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
            None
        }

        fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
            if name == "sum" {
                Some(datafusion_functions_aggregate::sum::sum_udaf())
            } else {
                None
            }
        }

        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
            None
        }

        fn options(&self) -> &ConfigOptions {
            &self.options
        }

        fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
            None
        }

        fn udf_names(&self) -> Vec<String> {
            vec![]
        }

        fn udaf_names(&self) -> Vec<String> {
            vec![String::from("sum")]
        }

        fn udwf_names(&self) -> Vec<String> {
            vec![]
        }
    }

    /// Wraps `fields` in a schema with empty metadata and exposes it as a
    /// table source.
    fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
        let schema = Schema::new_with_metadata(fields, HashMap::new());
        Arc::new(LogicalTableSource::new(Arc::new(schema)))
    }

    // Generates a test that plans `$num_expr` equality predicates joined by
    // `OR` and requires planning to succeed.
    macro_rules! test_stack_overflow {
        ($num_expr:expr) => {
            paste::item! {
                #[test]
                fn [<test_stack_overflow_ $num_expr>]() {
                    let predicates: Vec<String> = (0..$num_expr)
                        .map(|i| format!("column1 = 'value{:?}'", i))
                        .collect();
                    let expr_str = predicates.join(" OR ");

                    let dialect = GenericDialect {};
                    let sql_expr = Parser::new(&dialect)
                        .try_with_sql(expr_str.as_str())
                        .unwrap()
                        .parse_expr()
                        .unwrap();

                    let context_provider = TestContextProvider::new();
                    let sql_to_rel = SqlToRel::new(&context_provider);

                    let schema = DFSchema::empty();
                    let mut planner_context = PlannerContext::default();
                    sql_to_rel
                        .sql_expr_to_logical_expr(sql_expr, &schema, &mut planner_context)
                        .unwrap();
                }
            }
        };
    }

    test_stack_overflow!(64);
    test_stack_overflow!(128);
    test_stack_overflow!(256);
    test_stack_overflow!(512);
    test_stack_overflow!(1024);
    test_stack_overflow!(2048);
    test_stack_overflow!(4096);
    test_stack_overflow!(8192);

    /// An aliased expression must plan to an `Expr::Alias`.
    #[test]
    fn test_sql_to_expr_with_alias() {
        let dialect = GenericDialect {};
        let sql_expr = Parser::new(&dialect)
            .try_with_sql("SUM(int_col) as sum_int_col")
            .unwrap()
            .parse_expr_with_alias()
            .unwrap();

        let context_provider = TestContextProvider::new();
        let sql_to_rel = SqlToRel::new(&context_provider);

        let schema = DFSchema::empty();
        let mut planner_context = PlannerContext::default();
        let planned = sql_to_rel
            .sql_expr_to_logical_expr_with_alias(sql_expr, &schema, &mut planner_context)
            .unwrap();

        assert!(matches!(planned, Expr::Alias(_)));
    }
}