1use std::vec;
21
22use arrow::datatypes::{
23 DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE,
24};
25use datafusion_common::tree_node::{
26 Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
27};
28use datafusion_common::{
29 exec_err, internal_err, plan_err, Column, DFSchemaRef, DataFusionError, Diagnostic,
30 HashMap, Result, ScalarValue,
31};
32use datafusion_expr::builder::get_struct_unnested_columns;
33use datafusion_expr::expr::{
34 Alias, GroupingSet, Unnest, WindowFunction, WindowFunctionParams,
35};
36use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs};
37use datafusion_expr::{
38 col, expr_vec_fmt, ColumnUnnestList, Expr, ExprSchemable, LogicalPlan,
39};
40
41use indexmap::IndexMap;
42use sqlparser::ast::{Ident, Value};
43
44pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
46 expr.clone()
47 .transform_up(|nested_expr| {
48 match nested_expr {
49 Expr::Column(col) => {
50 let (qualifier, field) =
51 plan.schema().qualified_field_from_column(&col)?;
52 Ok(Transformed::yes(Expr::Column(Column::from((
53 qualifier, field,
54 )))))
55 }
56 _ => {
57 Ok(Transformed::no(nested_expr))
59 }
60 }
61 })
62 .data()
63}
64
65pub(crate) fn rebase_expr(
80 expr: &Expr,
81 base_exprs: &[Expr],
82 plan: &LogicalPlan,
83) -> Result<Expr> {
84 expr.clone()
85 .transform_down(|nested_expr| {
86 if base_exprs.contains(&nested_expr) {
87 Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?))
88 } else {
89 Ok(Transformed::no(nested_expr))
90 }
91 })
92 .data()
93}
94
/// Identifies which SQL clause is being validated, so that the error raised
/// for a column that is neither grouped nor aggregated names the right clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CheckColumnsSatisfyExprsPurpose {
    /// The check is performed on the `SELECT` list.
    ProjectionMustReferenceAggregate,
    /// The check is performed on the `HAVING` clause.
    HavingMustReferenceAggregate,
}
100
101impl CheckColumnsSatisfyExprsPurpose {
102 fn message_prefix(&self) -> &'static str {
103 match self {
104 CheckColumnsSatisfyExprsPurpose::ProjectionMustReferenceAggregate => {
105 "Column in SELECT must be in GROUP BY or an aggregate function"
106 }
107 CheckColumnsSatisfyExprsPurpose::HavingMustReferenceAggregate => {
108 "Column in HAVING must be in GROUP BY or an aggregate function"
109 }
110 }
111 }
112
113 fn diagnostic_message(&self, expr: &Expr) -> String {
114 format!("'{expr}' must appear in GROUP BY clause because it's not an aggregate expression")
115 }
116}
117
118pub(crate) fn check_columns_satisfy_exprs(
121 columns: &[Expr],
122 exprs: &[Expr],
123 purpose: CheckColumnsSatisfyExprsPurpose,
124) -> Result<()> {
125 columns.iter().try_for_each(|c| match c {
126 Expr::Column(_) => Ok(()),
127 _ => internal_err!("Expr::Column are required"),
128 })?;
129 let column_exprs = find_column_exprs(exprs);
130 for e in &column_exprs {
131 match e {
132 Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
133 for e in exprs {
134 check_column_satisfies_expr(columns, e, purpose)?;
135 }
136 }
137 Expr::GroupingSet(GroupingSet::Cube(exprs)) => {
138 for e in exprs {
139 check_column_satisfies_expr(columns, e, purpose)?;
140 }
141 }
142 Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
143 for exprs in lists_of_exprs {
144 for e in exprs {
145 check_column_satisfies_expr(columns, e, purpose)?;
146 }
147 }
148 }
149 _ => check_column_satisfies_expr(columns, e, purpose)?,
150 }
151 }
152 Ok(())
153}
154
155fn check_column_satisfies_expr(
156 columns: &[Expr],
157 expr: &Expr,
158 purpose: CheckColumnsSatisfyExprsPurpose,
159) -> Result<()> {
160 if !columns.contains(expr) {
161 return plan_err!(
162 "{}: While expanding wildcard, column \"{}\" must appear in the GROUP BY clause or must be part of an aggregate function, currently only \"{}\" appears in the SELECT clause satisfies this requirement",
163 purpose.message_prefix(),
164 expr,
165 expr_vec_fmt!(columns)
166 )
167 .map_err(|err| {
168 let diagnostic = Diagnostic::new_error(
169 purpose.diagnostic_message(expr),
170 expr.spans().and_then(|spans| spans.first()),
171 )
172 .with_help(format!("Either add '{expr}' to GROUP BY clause, or use an aggregare function like ANY_VALUE({expr})"), None);
173 err.with_diagnostic(diagnostic)
174 });
175 }
176 Ok(())
177}
178
179pub(crate) fn extract_aliases(exprs: &[Expr]) -> HashMap<String, Expr> {
182 exprs
183 .iter()
184 .filter_map(|expr| match expr {
185 Expr::Alias(Alias { expr, name, .. }) => Some((name.clone(), *expr.clone())),
186 _ => None,
187 })
188 .collect::<HashMap<String, Expr>>()
189}
190
191pub(crate) fn resolve_positions_to_exprs(
196 expr: Expr,
197 select_exprs: &[Expr],
198) -> Result<Expr> {
199 match expr {
200 Expr::Literal(ScalarValue::Int64(Some(position)))
203 if position > 0_i64 && position <= select_exprs.len() as i64 =>
204 {
205 let index = (position - 1) as usize;
206 let select_expr = &select_exprs[index];
207 Ok(match select_expr {
208 Expr::Alias(Alias { expr, .. }) => *expr.clone(),
209 _ => select_expr.clone(),
210 })
211 }
212 Expr::Literal(ScalarValue::Int64(Some(position))) => plan_err!(
213 "Cannot find column with position {} in SELECT clause. Valid columns: 1 to {}",
214 position, select_exprs.len()
215 ),
216 _ => Ok(expr),
217 }
218}
219
220pub(crate) fn resolve_aliases_to_exprs(
223 expr: Expr,
224 aliases: &HashMap<String, Expr>,
225) -> Result<Expr> {
226 expr.transform_up(|nested_expr| match nested_expr {
227 Expr::Column(c) if c.relation.is_none() => {
228 if let Some(aliased_expr) = aliases.get(&c.name) {
229 Ok(Transformed::yes(aliased_expr.clone()))
230 } else {
231 Ok(Transformed::no(Expr::Column(c)))
232 }
233 }
234 _ => Ok(Transformed::no(nested_expr)),
235 })
236 .data()
237}
238
239pub fn window_expr_common_partition_keys(window_exprs: &[Expr]) -> Result<&[Expr]> {
242 let all_partition_keys = window_exprs
243 .iter()
244 .map(|expr| match expr {
245 Expr::WindowFunction(WindowFunction {
246 params: WindowFunctionParams { partition_by, .. },
247 ..
248 }) => Ok(partition_by),
249 Expr::Alias(Alias { expr, .. }) => match expr.as_ref() {
250 Expr::WindowFunction(WindowFunction {
251 params: WindowFunctionParams { partition_by, .. },
252 ..
253 }) => Ok(partition_by),
254 expr => exec_err!("Impossibly got non-window expr {expr:?}"),
255 },
256 expr => exec_err!("Impossibly got non-window expr {expr:?}"),
257 })
258 .collect::<Result<Vec<_>>>()?;
259 let result = all_partition_keys
260 .iter()
261 .min_by_key(|s| s.len())
262 .ok_or_else(|| {
263 DataFusionError::Execution("No window expressions found".to_owned())
264 })?;
265 Ok(result)
266}
267
268pub(crate) fn make_decimal_type(
271 precision: Option<u64>,
272 scale: Option<u64>,
273) -> Result<DataType> {
274 let (precision, scale) = match (precision, scale) {
276 (Some(p), Some(s)) => (p as u8, s as i8),
277 (Some(p), None) => (p as u8, 0),
278 (None, Some(_)) => {
279 return plan_err!("Cannot specify only scale for decimal data type")
280 }
281 (None, None) => (DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE),
282 };
283
284 if precision == 0
285 || precision > DECIMAL256_MAX_PRECISION
286 || scale.unsigned_abs() > precision
287 {
288 plan_err!(
289 "Decimal(precision = {precision}, scale = {scale}) should satisfy `0 < precision <= 76`, and `scale <= precision`."
290 )
291 } else if precision > DECIMAL128_MAX_PRECISION
292 && precision <= DECIMAL256_MAX_PRECISION
293 {
294 Ok(DataType::Decimal256(precision, scale))
295 } else {
296 Ok(DataType::Decimal128(precision, scale))
297 }
298}
299
300pub(crate) fn normalize_ident(id: Ident) -> String {
302 match id.quote_style {
303 Some(_) => id.value,
304 None => id.value.to_ascii_lowercase(),
305 }
306}
307
308pub(crate) fn value_to_string(value: &Value) -> Option<String> {
309 match value {
310 Value::SingleQuotedString(s) => Some(s.to_string()),
311 Value::DollarQuotedString(s) => Some(s.to_string()),
312 Value::Number(_, _) | Value::Boolean(_) => Some(value.to_string()),
313 Value::UnicodeStringLiteral(s) => Some(s.to_string()),
314 Value::EscapedStringLiteral(s) => Some(s.to_string()),
315 Value::DoubleQuotedString(_)
316 | Value::NationalStringLiteral(_)
317 | Value::SingleQuotedByteStringLiteral(_)
318 | Value::DoubleQuotedByteStringLiteral(_)
319 | Value::TripleSingleQuotedString(_)
320 | Value::TripleDoubleQuotedString(_)
321 | Value::TripleSingleQuotedByteStringLiteral(_)
322 | Value::TripleDoubleQuotedByteStringLiteral(_)
323 | Value::SingleQuotedRawStringLiteral(_)
324 | Value::DoubleQuotedRawStringLiteral(_)
325 | Value::TripleSingleQuotedRawStringLiteral(_)
326 | Value::TripleDoubleQuotedRawStringLiteral(_)
327 | Value::HexStringLiteral(_)
328 | Value::Null
329 | Value::Placeholder(_) => None,
330 }
331}
332
333pub(crate) fn rewrite_recursive_unnests_bottom_up(
334 input: &LogicalPlan,
335 unnest_placeholder_columns: &mut IndexMap<Column, Option<Vec<ColumnUnnestList>>>,
336 inner_projection_exprs: &mut Vec<Expr>,
337 original_exprs: &[Expr],
338) -> Result<Vec<Expr>> {
339 Ok(original_exprs
340 .iter()
341 .map(|expr| {
342 rewrite_recursive_unnest_bottom_up(
343 input,
344 unnest_placeholder_columns,
345 inner_projection_exprs,
346 expr,
347 )
348 })
349 .collect::<Result<Vec<_>>>()?
350 .into_iter()
351 .flatten()
352 .collect::<Vec<_>>())
353}
354
/// Prefix of the synthetic column names generated while rewriting `unnest`
/// expressions, e.g. `__unnest_placeholder(col)` and
/// `__unnest_placeholder(col,depth=1)`.
pub const UNNEST_PLACEHOLDER: &str = "__unnest_placeholder";
356
357struct RecursiveUnnestRewriter<'a> {
362 input_schema: &'a DFSchemaRef,
363 root_expr: &'a Expr,
364 top_most_unnest: Option<Unnest>,
366 consecutive_unnest: Vec<Option<Unnest>>,
367 inner_projection_exprs: &'a mut Vec<Expr>,
368 columns_unnestings: &'a mut IndexMap<Column, Option<Vec<ColumnUnnestList>>>,
369 transformed_root_exprs: Option<Vec<Expr>>,
370}
371impl RecursiveUnnestRewriter<'_> {
372 fn get_latest_consecutive_unnest(&self) -> Vec<Unnest> {
379 self.consecutive_unnest
380 .iter()
381 .rev()
382 .skip_while(|item| item.is_none())
383 .take_while(|item| item.is_some())
384 .to_owned()
385 .cloned()
386 .map(|item| item.unwrap())
387 .collect()
388 }
389
390 fn transform(
391 &mut self,
392 level: usize,
393 alias_name: String,
394 expr_in_unnest: &Expr,
395 struct_allowed: bool,
396 ) -> Result<Vec<Expr>> {
397 let inner_expr_name = expr_in_unnest.schema_name().to_string();
398
399 let placeholder_name = format!("{UNNEST_PLACEHOLDER}({})", inner_expr_name);
403 let post_unnest_name =
404 format!("{UNNEST_PLACEHOLDER}({},depth={})", inner_expr_name, level);
405 let placeholder_column = Column::from_name(placeholder_name.clone());
408
409 let (data_type, _) = expr_in_unnest.data_type_and_nullable(self.input_schema)?;
410
411 match data_type {
412 DataType::Struct(inner_fields) => {
413 if !struct_allowed {
414 return internal_err!("unnest on struct can only be applied at the root level of select expression");
415 }
416 push_projection_dedupl(
417 self.inner_projection_exprs,
418 expr_in_unnest.clone().alias(placeholder_name.clone()),
419 );
420 self.columns_unnestings
421 .insert(Column::from_name(placeholder_name.clone()), None);
422 Ok(
423 get_struct_unnested_columns(&placeholder_name, &inner_fields)
424 .into_iter()
425 .map(Expr::Column)
426 .collect(),
427 )
428 }
429 DataType::List(_)
430 | DataType::FixedSizeList(_, _)
431 | DataType::LargeList(_) => {
432 push_projection_dedupl(
433 self.inner_projection_exprs,
434 expr_in_unnest.clone().alias(placeholder_name.clone()),
435 );
436
437 let post_unnest_expr = col(post_unnest_name.clone()).alias(alias_name);
438 let list_unnesting = self
439 .columns_unnestings
440 .entry(placeholder_column)
441 .or_insert(Some(vec![]));
442 let unnesting = ColumnUnnestList {
443 output_column: Column::from_name(post_unnest_name),
444 depth: level,
445 };
446 let list_unnestings = list_unnesting.as_mut().unwrap();
447 if !list_unnestings.contains(&unnesting) {
448 list_unnestings.push(unnesting);
449 }
450 Ok(vec![post_unnest_expr])
451 }
452 _ => {
453 internal_err!("unnest on non-list or struct type is not supported")
454 }
455 }
456 }
457}
458
459impl TreeNodeRewriter for RecursiveUnnestRewriter<'_> {
460 type Node = Expr;
461
462 fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
467 if let Expr::Unnest(ref unnest_expr) = expr {
468 let (data_type, _) =
469 unnest_expr.expr.data_type_and_nullable(self.input_schema)?;
470 self.consecutive_unnest.push(Some(unnest_expr.clone()));
471 if let DataType::Struct(_) = data_type {
481 self.consecutive_unnest.push(None);
482 }
483 if self.top_most_unnest.is_none() {
484 self.top_most_unnest = Some(unnest_expr.clone());
485 }
486
487 Ok(Transformed::no(expr))
488 } else {
489 self.consecutive_unnest.push(None);
490 Ok(Transformed::no(expr))
491 }
492 }
493
494 fn f_up(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
524 if let Expr::Unnest(ref traversing_unnest) = expr {
525 if traversing_unnest == self.top_most_unnest.as_ref().unwrap() {
526 self.top_most_unnest = None;
527 }
528 let unnest_stack = self.get_latest_consecutive_unnest();
536
537 if traversing_unnest == unnest_stack.last().unwrap() {
543 let most_inner = unnest_stack.first().unwrap();
544 let inner_expr = most_inner.expr.as_ref();
545 let unnest_recursion = unnest_stack.len();
552 let struct_allowed = (&expr == self.root_expr) && unnest_recursion == 1;
553
554 let mut transformed_exprs = self.transform(
555 unnest_recursion,
556 expr.schema_name().to_string(),
557 inner_expr,
558 struct_allowed,
559 )?;
560 if struct_allowed {
561 self.transformed_root_exprs = Some(transformed_exprs.clone());
562 }
563 return Ok(Transformed::new(
564 transformed_exprs.swap_remove(0),
565 true,
566 TreeNodeRecursion::Continue,
567 ));
568 }
569 } else {
570 self.consecutive_unnest.push(None);
571 }
572
573 if matches!(&expr, Expr::Column(_)) && self.top_most_unnest.is_none() {
578 push_projection_dedupl(self.inner_projection_exprs, expr.clone());
579 }
580
581 Ok(Transformed::no(expr))
582 }
583}
584
585fn push_projection_dedupl(projection: &mut Vec<Expr>, expr: Expr) {
586 let schema_name = expr.schema_name().to_string();
587 if !projection
588 .iter()
589 .any(|e| e.schema_name().to_string() == schema_name)
590 {
591 projection.push(expr);
592 }
593}
594pub(crate) fn rewrite_recursive_unnest_bottom_up(
604 input: &LogicalPlan,
605 unnest_placeholder_columns: &mut IndexMap<Column, Option<Vec<ColumnUnnestList>>>,
606 inner_projection_exprs: &mut Vec<Expr>,
607 original_expr: &Expr,
608) -> Result<Vec<Expr>> {
609 let mut rewriter = RecursiveUnnestRewriter {
610 input_schema: input.schema(),
611 root_expr: original_expr,
612 top_most_unnest: None,
613 consecutive_unnest: vec![],
614 inner_projection_exprs,
615 columns_unnestings: unnest_placeholder_columns,
616 transformed_root_exprs: None,
617 };
618
619 let Transformed {
629 data: transformed_expr,
630 transformed,
631 tnr: _,
632 } = original_expr.clone().rewrite(&mut rewriter)?;
633
634 if !transformed {
635 #[expect(deprecated)]
637 if matches!(&transformed_expr, Expr::Column(_))
638 || matches!(&transformed_expr, Expr::Wildcard { .. })
639 {
640 push_projection_dedupl(inner_projection_exprs, transformed_expr.clone());
641 Ok(vec![transformed_expr])
642 } else {
643 let column_name = transformed_expr.schema_name().to_string();
646 push_projection_dedupl(inner_projection_exprs, transformed_expr);
647 Ok(vec![Expr::Column(Column::from_name(column_name))])
648 }
649 } else {
650 if let Some(transformed_root_exprs) = rewriter.transformed_root_exprs {
651 return Ok(transformed_root_exprs);
652 }
653 Ok(vec![transformed_expr])
654 }
655}
656
#[cfg(test)]
mod tests {
    use std::{ops::Add, sync::Arc};

    use arrow::datatypes::{DataType as ArrowDataType, Field, Fields, Schema};
    use datafusion_common::{Column, DFSchema, Result};
    use datafusion_expr::{
        col, lit, unnest, ColumnUnnestList, EmptyRelation, LogicalPlan,
    };
    use datafusion_functions::core::expr_ext::FieldAccessor;
    use datafusion_functions_aggregate::expr_fn::count;

    use crate::utils::{resolve_positions_to_exprs, rewrite_recursive_unnest_bottom_up};
    use indexmap::IndexMap;

    /// Wraps `schema` in an empty relation that serves as the rewrite input.
    fn empty_relation(schema: Schema) -> Result<LogicalPlan> {
        Ok(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::try_from(schema)?),
        }))
    }

    /// Builds the list type whose items are described by `item`.
    fn list_of(item: Field) -> ArrowDataType {
        ArrowDataType::List(Arc::new(item))
    }

    /// Asserts that the registered unnestings `r`, rendered one string per
    /// placeholder column, match the expected strings `l`.
    fn column_unnests_eq(
        l: Vec<&str>,
        r: &IndexMap<Column, Option<Vec<ColumnUnnestList>>>,
    ) {
        let actual: Vec<String> = r
            .iter()
            .map(|(column, unnestings)| match unnestings {
                None => column.to_string(),
                Some(list) => {
                    let rendered = list
                        .iter()
                        .map(|unnesting| unnesting.to_string())
                        .collect::<Vec<String>>()
                        .join(", ");
                    format!("{column}=>[{rendered}]")
                }
            })
            .collect();
        let expected: Vec<String> = l.iter().map(|s| s.to_string()).collect();
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_transform_bottom_unnest_recursive() -> Result<()> {
        let elements = Field::new("elements", ArrowDataType::Int64, true);
        let rows = Field::new("2d_col", list_of(elements), true);
        let schema = Schema::new(vec![
            Field::new("3d_col", list_of(rows), true),
            Field::new("i64_col", ArrowDataType::Int64, true),
        ]);
        let input = empty_relation(schema)?;

        let mut unnest_placeholder_columns = IndexMap::new();
        let mut inner_projection_exprs = vec![];

        // unnest(unnest(3d_col)) + unnest(unnest(3d_col)) + i64_col
        let original_expr = unnest(unnest(col("3d_col")))
            .add(unnest(unnest(col("3d_col"))))
            .add(col("i64_col"));
        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
            &input,
            &mut unnest_placeholder_columns,
            &mut inner_projection_exprs,
            &original_expr,
        )?;

        // Both occurrences collapse into the same depth-2 placeholder column.
        let depth_two = col("__unnest_placeholder(3d_col,depth=2)")
            .alias("UNNEST(UNNEST(3d_col))");
        assert_eq!(
            transformed_exprs,
            vec![depth_two.clone().add(depth_two).add(col("i64_col"))]
        );
        column_unnests_eq(
            vec![
                "__unnest_placeholder(3d_col)=>[__unnest_placeholder(3d_col,depth=2)|depth=2]",
            ],
            &unnest_placeholder_columns,
        );

        assert_eq!(
            inner_projection_exprs,
            vec![
                col("3d_col").alias("__unnest_placeholder(3d_col)"),
                col("i64_col")
            ]
        );

        // A second rewrite on the same column with a different depth.
        let original_expr_2 = unnest(col("3d_col")).alias("2d_col");
        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
            &input,
            &mut unnest_placeholder_columns,
            &mut inner_projection_exprs,
            &original_expr_2,
        )?;

        assert_eq!(
            transformed_exprs,
            vec![
                (col("__unnest_placeholder(3d_col,depth=1)").alias("UNNEST(3d_col)"))
                    .alias("2d_col")
            ]
        );
        column_unnests_eq(
            vec!["__unnest_placeholder(3d_col)=>[__unnest_placeholder(3d_col,depth=2)|depth=2, __unnest_placeholder(3d_col,depth=1)|depth=1]"],
            &unnest_placeholder_columns,
        );
        // The inner projection is unchanged: the placeholder already exists.
        assert_eq!(
            inner_projection_exprs,
            vec![
                col("3d_col").alias("__unnest_placeholder(3d_col)"),
                col("i64_col")
            ]
        );

        Ok(())
    }

    #[test]
    fn test_transform_bottom_unnest() -> Result<()> {
        let struct_type = ArrowDataType::Struct(Fields::from(vec![
            Field::new("field1", ArrowDataType::Int32, false),
            Field::new("field2", ArrowDataType::Int32, false),
        ]));
        let schema = Schema::new(vec![
            Field::new("struct_col", struct_type, false),
            Field::new(
                "array_col",
                list_of(Field::new_list_field(ArrowDataType::Int64, true)),
                true,
            ),
            Field::new("int_col", ArrowDataType::Int32, false),
        ]);
        let input = empty_relation(schema)?;

        let mut unnest_placeholder_columns = IndexMap::new();
        let mut inner_projection_exprs = vec![];

        // unnest(struct_col) expands into one column per struct field.
        let original_expr = unnest(col("struct_col"));
        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
            &input,
            &mut unnest_placeholder_columns,
            &mut inner_projection_exprs,
            &original_expr,
        )?;
        assert_eq!(
            transformed_exprs,
            vec![
                col("__unnest_placeholder(struct_col).field1"),
                col("__unnest_placeholder(struct_col).field2"),
            ]
        );
        column_unnests_eq(
            vec!["__unnest_placeholder(struct_col)"],
            &unnest_placeholder_columns,
        );
        assert_eq!(
            inner_projection_exprs,
            vec![col("struct_col").alias("__unnest_placeholder(struct_col)"),]
        );

        // unnest(array_col) + 1
        let original_expr = unnest(col("array_col")).add(lit(1i64));
        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
            &input,
            &mut unnest_placeholder_columns,
            &mut inner_projection_exprs,
            &original_expr,
        )?;
        column_unnests_eq(
            vec![
                "__unnest_placeholder(struct_col)",
                "__unnest_placeholder(array_col)=>[__unnest_placeholder(array_col,depth=1)|depth=1]",
            ],
            &unnest_placeholder_columns,
        );
        assert_eq!(
            transformed_exprs,
            vec![col("__unnest_placeholder(array_col,depth=1)")
                .alias("UNNEST(array_col)")
                .add(lit(1i64))]
        );

        assert_eq!(
            inner_projection_exprs,
            vec![
                col("struct_col").alias("__unnest_placeholder(struct_col)"),
                col("array_col").alias("__unnest_placeholder(array_col)")
            ]
        );

        Ok(())
    }

    #[test]
    fn test_transform_non_consecutive_unnests() -> Result<()> {
        // struct_list: List<Struct<subfield1: List<Int64>, subfield2: List<Utf8>>>
        let subfield1 = Field::new(
            "subfield1",
            list_of(Field::new("i64_element", ArrowDataType::Int64, true)),
            true,
        );
        let subfield2 = Field::new(
            "subfield2",
            list_of(Field::new("utf8_element", ArrowDataType::Utf8, true)),
            true,
        );
        let element = Field::new(
            "element",
            ArrowDataType::Struct(Fields::from(vec![subfield1, subfield2])),
            true,
        );
        let schema = Schema::new(vec![
            Field::new("struct_list", list_of(element), true),
            Field::new("int_col", ArrowDataType::Int32, false),
        ]);
        let input = empty_relation(schema)?;

        let mut unnest_placeholder_columns = IndexMap::new();
        let mut inner_projection_exprs = vec![];

        // unnest(unnest(struct_list)['subfield1']): the two unnest calls are
        // separated by a field access, so only the inner one is rewritten.
        let select_expr1 = unnest(unnest(col("struct_list")).field("subfield1"));
        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
            &input,
            &mut unnest_placeholder_columns,
            &mut inner_projection_exprs,
            &select_expr1,
        )?;
        assert_eq!(
            transformed_exprs,
            vec![unnest(
                col("__unnest_placeholder(struct_list,depth=1)")
                    .alias("UNNEST(struct_list)")
                    .field("subfield1")
            )]
        );

        column_unnests_eq(
            vec![
                "__unnest_placeholder(struct_list)=>[__unnest_placeholder(struct_list,depth=1)|depth=1]",
            ],
            &unnest_placeholder_columns,
        );

        assert_eq!(
            inner_projection_exprs,
            vec![col("struct_list").alias("__unnest_placeholder(struct_list)")]
        );

        // Same shape on the other subfield: no new placeholder is registered.
        let select_expr2 = unnest(unnest(col("struct_list")).field("subfield2"));
        let transformed_exprs = rewrite_recursive_unnest_bottom_up(
            &input,
            &mut unnest_placeholder_columns,
            &mut inner_projection_exprs,
            &select_expr2,
        )?;
        assert_eq!(
            transformed_exprs,
            vec![unnest(
                col("__unnest_placeholder(struct_list,depth=1)")
                    .alias("UNNEST(struct_list)")
                    .field("subfield2")
            )]
        );

        column_unnests_eq(
            vec![
                "__unnest_placeholder(struct_list)=>[__unnest_placeholder(struct_list,depth=1)|depth=1]",
            ],
            &unnest_placeholder_columns,
        );

        assert_eq!(
            inner_projection_exprs,
            vec![col("struct_list").alias("__unnest_placeholder(struct_list)")]
        );

        Ok(())
    }

    #[test]
    fn test_resolve_positions_to_exprs() -> Result<()> {
        let select_exprs = vec![col("c1"), col("c2"), count(lit(1))];

        // A position inside the valid range resolves to the select expression.
        let resolved = resolve_positions_to_exprs(lit(1i64), &select_exprs)?;
        assert_eq!(resolved, col("c1"));

        // Negative positions are rejected.
        let resolved = resolve_positions_to_exprs(lit(-1i64), &select_exprs);
        assert!(resolved.is_err_and(|e| e.message().contains(
            "Cannot find column with position -1 in SELECT clause. Valid columns: 1 to 3"
        )));

        // So are positions past the end of the select list.
        let resolved = resolve_positions_to_exprs(lit(5i64), &select_exprs);
        assert!(resolved.is_err_and(|e| e.message().contains(
            "Cannot find column with position 5 in SELECT clause. Valid columns: 1 to 3"
        )));

        // Literals that are not Int64 are returned untouched.
        let resolved = resolve_positions_to_exprs(lit("text"), &select_exprs)?;
        assert_eq!(resolved, lit("text"));

        // Non-literal expressions are returned untouched as well.
        let resolved = resolve_positions_to_exprs(col("fake"), &select_exprs)?;
        assert_eq!(resolved, col("fake"));

        Ok(())
    }
}