use std::any::Any;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use crate::dml::CopyTo;
use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
use crate::expr_rewriter::{
    coerce_plan_expr_for_schema, normalize_col,
    normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
    rewrite_sort_cols_by_aggs,
};
use crate::logical_plan::{
    Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
    JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
    Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
    Window,
};
use crate::select_expr::SelectExpr;
use crate::utils::{
    can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
    expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
    group_window_expr_by_sort_keys,
};
use crate::{
    and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
    Statement, TableProviderFilterPushDown, TableSource, WriteOp,
};

use super::dml::InsertOp;
use super::plan::{ColumnUnnestList, ExplainFormat};
use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{
    exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
    plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
    DataFusionError, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
};
use datafusion_expr_common::type_coercion::binary::type_union_resolution;

use indexmap::IndexSet;

pub const UNNAMED_TABLE: &str = "?table?";

#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    pub fn new() -> Self {
        Default::default()
    }

    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
        self.add_implicit_group_by_exprs = add;
        self
    }
}

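/// Builder for [`LogicalPlan`]s. Each method consumes the builder and returns a
/// new one wrapping the extended plan. A minimal usage sketch (the `employee`
/// table name and its Arrow `schema` are assumptions for illustration),
/// equivalent to `SELECT id FROM employee WHERE state = 'CO'`:
///
/// ```ignore
/// let plan = table_scan(Some("employee"), &schema, None)?
///     .filter(col("state").eq(lit("CO")))? // WHERE state = 'CO'
///     .project(vec![col("id")])?           // SELECT id
///     .build()?;
/// ```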
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    plan: Arc<LogicalPlan>,
    options: LogicalPlanBuilderOptions,
}

impl LogicalPlanBuilder {
    pub fn new(plan: LogicalPlan) -> Self {
        Self {
            plan: Arc::new(plan),
            options: LogicalPlanBuilderOptions::default(),
        }
    }

    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
        Self {
            plan,
            options: LogicalPlanBuilderOptions::default(),
        }
    }

    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
        self.options = options;
        self
    }

    pub fn schema(&self) -> &DFSchemaRef {
        self.plan.schema()
    }

    pub fn plan(&self) -> &LogicalPlan {
        &self.plan
    }

    pub fn empty(produce_one_row: bool) -> Self {
        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row,
            schema: DFSchemaRef::new(DFSchema::empty()),
        }))
    }

    pub fn to_recursive_query(
        self,
        name: String,
        recursive_term: LogicalPlan,
        is_distinct: bool,
    ) -> Result<Self> {
        if is_distinct {
            return not_impl_err!(
                "Recursive queries with a distinct 'UNION' (in which the previous iteration's results are de-duplicated) are not supported"
            );
        }
        let static_fields_len = self.plan.schema().fields().len();
        let recursive_fields_len = recursive_term.schema().fields().len();
        if static_fields_len != recursive_fields_len {
            return plan_err!(
                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
                static_fields_len, recursive_fields_len
            );
        }
        let coerced_recursive_term =
            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
            name,
            static_term: self.plan,
            recursive_term: Arc::new(coerced_recursive_term),
            is_distinct,
        })))
    }

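    /// Create a values-list relation, equivalent to SQL's `VALUES`. Column
    /// types are inferred across rows and columns are named `column1`,
    /// `column2`, and so on. A small sketch:
    ///
    /// ```ignore
    /// // VALUES (1, 'a'), (2, 'b')
    /// let plan = LogicalPlanBuilder::values(vec![
    ///     vec![lit(1), lit("a")],
    ///     vec![lit(2), lit("b")],
    /// ])?
    /// .build()?;
    /// ```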
    pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
        if values.is_empty() {
            return plan_err!("Values list cannot be empty");
        }
        let n_cols = values[0].len();
        if n_cols == 0 {
            return plan_err!("Values list cannot be zero length");
        }
        for (i, row) in values.iter().enumerate() {
            if row.len() != n_cols {
                return plan_err!(
                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
                    row.len(),
                    i,
                    n_cols
                );
            }
        }

        Self::infer_data(values)
    }

    pub fn values_with_schema(
        values: Vec<Vec<Expr>>,
        schema: &DFSchemaRef,
    ) -> Result<Self> {
        if values.is_empty() {
            return plan_err!("Values list cannot be empty");
        }
        let n_cols = schema.fields().len();
        if n_cols == 0 {
            return plan_err!("Values list cannot be zero length");
        }
        for (i, row) in values.iter().enumerate() {
            if row.len() != n_cols {
                return plan_err!(
                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
                    row.len(),
                    i,
                    n_cols
                );
            }
        }

        Self::infer_values_from_schema(values, schema)
    }

    fn infer_values_from_schema(
        values: Vec<Vec<Expr>>,
        schema: &DFSchema,
    ) -> Result<Self> {
        let n_cols = values[0].len();
        let mut fields = ValuesFields::new();
        for j in 0..n_cols {
            let field_type = schema.field(j).data_type();
            let field_nullable = schema.field(j).is_nullable();
            for row in values.iter() {
                let value = &row[j];
                let data_type = value.get_type(schema)?;

                if !data_type.equals_datatype(field_type)
                    && !can_cast_types(&data_type, field_type)
                {
                    return exec_err!(
                        "type mismatch: cannot cast value of type {} to field type {}",
                        data_type,
                        field_type
                    );
                }
            }
            fields.push(field_type.to_owned(), field_nullable);
        }

        Self::infer_inner(values, fields, schema)
    }

    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
        let n_cols = values[0].len();
        let schema = DFSchema::empty();
        let mut fields = ValuesFields::new();

        for j in 0..n_cols {
            let mut common_type: Option<DataType> = None;
            for (i, row) in values.iter().enumerate() {
                let value = &row[j];
                let data_type = value.get_type(&schema)?;
                if data_type == DataType::Null {
                    continue;
                }

                if let Some(prev_type) = common_type {
                    let data_types = vec![prev_type.clone(), data_type.clone()];
                    let Some(new_type) = type_union_resolution(&data_types) else {
                        return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
                    };
                    common_type = Some(new_type);
                } else {
                    common_type = Some(data_type);
                }
            }
            fields.push(common_type.unwrap_or(DataType::Null), true);
        }

        Self::infer_inner(values, fields, &schema)
    }

    fn infer_inner(
        mut values: Vec<Vec<Expr>>,
        fields: ValuesFields,
        schema: &DFSchema,
    ) -> Result<Self> {
        let fields = fields.into_fields();
        for row in &mut values {
            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
                    row[j] = Expr::Literal(
                        ScalarValue::try_from(field_type)?,
                        metadata.clone(),
                    );
                } else {
                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
                }
            }
        }

        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
        let schema = DFSchemaRef::new(dfschema);

        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
    }

    pub fn scan(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        Self::scan_with_filters(table_name, table_source, projection, vec![])
    }

    pub fn copy_to(
        input: LogicalPlan,
        output_url: String,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
        partition_by: Vec<String>,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Copy(CopyTo {
            input: Arc::new(input),
            output_url,
            partition_by,
            file_type,
            options,
        })))
    }

    pub fn insert_into(
        input: LogicalPlan,
        table_name: impl Into<TableReference>,
        target: Arc<dyn TableSource>,
        insert_op: InsertOp,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
            table_name.into(),
            target,
            WriteOp::Insert(insert_op),
            Arc::new(input),
        ))))
    }

    pub fn scan_with_filters(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
    }

    pub fn scan_with_filters_fetch(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(
            table_name,
            table_source,
            projection,
            filters,
            fetch,
        )
    }

    fn scan_with_filters_inner(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_scan =
            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;

        if table_scan.filters.is_empty() {
            if let Some(p) = table_scan.source.get_logical_plan() {
                let sub_plan = p.into_owned();

                if let Some(proj) = table_scan.projection {
                    let projection_exprs = proj
                        .into_iter()
                        .map(|i| {
                            Expr::Column(Column::from(
                                sub_plan.schema().qualified_field(i),
                            ))
                        })
                        .collect::<Vec<_>>();
                    return Self::new(sub_plan)
                        .project(projection_exprs)?
                        .alias(table_scan.table_name);
                }

                return Self::new(sub_plan).alias(table_scan.table_name);
            }
        }

        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
    }

    pub fn window_plan(
        input: LogicalPlan,
        window_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<LogicalPlan> {
        let mut plan = input;
        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
        groups.sort_by(|(key_a, _), (key_b, _)| {
            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
                let key_ordering = compare_sort_expr(first, second, plan.schema());
                match key_ordering {
                    Ordering::Less => {
                        return Ordering::Less;
                    }
                    Ordering::Greater => {
                        return Ordering::Greater;
                    }
                    Ordering::Equal => {}
                }
            }
            key_b.len().cmp(&key_a.len())
        });
        for (_, exprs) in groups {
            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
            plan = LogicalPlanBuilder::from(plan)
                .window(window_exprs)?
                .build()?;
        }
        Ok(plan)
    }

    pub fn project(
        self,
        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<Self> {
        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }

    pub fn project_with_validation(
        self,
        expr: Vec<(impl Into<SelectExpr>, bool)>,
    ) -> Result<Self> {
        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }

    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
        let exprs: Vec<_> = indices
            .into_iter()
            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
            .collect();
        self.project(exprs)
    }

    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
        let expr = normalize_col(expr.into(), &self.plan)?;
        Filter::try_new(expr, self.plan)
            .map(LogicalPlan::Filter)
            .map(Self::new)
    }

    pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
        let expr = normalize_col(expr.into(), &self.plan)?;
        Filter::try_new(expr, self.plan)
            .map(LogicalPlan::Filter)
            .map(Self::from)
    }

    pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
            Prepare {
                name,
                data_types,
                input: self.plan,
            },
        ))))
    }

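    /// Limit the number of rows returned: skip the first `skip` rows, then
    /// return at most `fetch` rows (`None` means no upper bound). Sketch:
    ///
    /// ```ignore
    /// // LIMIT 10 OFFSET 2
    /// let plan = builder.limit(2, Some(10))?.build()?;
    /// ```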
    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
        let skip_expr = if skip == 0 {
            None
        } else {
            Some(lit(skip as i64))
        };
        let fetch_expr = fetch.map(|f| lit(f as i64));
        self.limit_by_expr(skip_expr, fetch_expr)
    }

    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Limit(Limit {
            skip: skip.map(Box::new),
            fetch: fetch.map(Box::new),
            input: self.plan,
        })))
    }

    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
    }

    fn add_missing_columns(
        curr_plan: LogicalPlan,
        missing_cols: &IndexSet<Column>,
        is_distinct: bool,
    ) -> Result<LogicalPlan> {
        match curr_plan {
            LogicalPlan::Projection(Projection {
                input,
                mut expr,
                schema: _,
            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
                let mut missing_exprs = missing_cols
                    .iter()
                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
                    .collect::<Result<Vec<_>>>()?;

                missing_exprs.retain(|e| !expr.contains(e));
                if is_distinct {
                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
                }
                expr.extend(missing_exprs);
                project(Arc::unwrap_or_clone(input), expr)
            }
            _ => {
                let is_distinct =
                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
                let new_inputs = curr_plan
                    .inputs()
                    .into_iter()
                    .map(|input_plan| {
                        Self::add_missing_columns(
                            (*input_plan).clone(),
                            missing_cols,
                            is_distinct,
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;
                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
            }
        }
    }

    fn ambiguous_distinct_check(
        missing_exprs: &[Expr],
        missing_cols: &IndexSet<Column>,
        projection_exprs: &[Expr],
    ) -> Result<()> {
        if missing_exprs.is_empty() {
            return Ok(());
        }

        let all_aliases = missing_exprs.iter().all(|e| {
            projection_exprs.iter().any(|proj_expr| {
                if let Expr::Alias(Alias { expr, .. }) = proj_expr {
                    e == expr.as_ref()
                } else {
                    false
                }
            })
        });
        if all_aliases {
            return Ok(());
        }

        let missing_col_names = missing_cols
            .iter()
            .map(|col| col.flat_name())
            .collect::<Vec<_>>()
            .join(", ");

        plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
    }

    pub fn sort_by(
        self,
        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
    ) -> Result<Self> {
        self.sort(
            expr.into_iter()
                .map(|e| e.into().sort(true, false))
                .collect::<Vec<SortExpr>>(),
        )
    }

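    /// Sort the plan by the given sort expressions. Sketch:
    ///
    /// ```ignore
    /// // ORDER BY state ASC NULLS FIRST, salary DESC NULLS LAST
    /// let plan = builder
    ///     .sort(vec![
    ///         col("state").sort(true, true),
    ///         col("salary").sort(false, false),
    ///     ])?
    ///     .build()?;
    /// ```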
    pub fn sort(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
    ) -> Result<Self> {
        self.sort_with_limit(sorts, None)
    }

    pub fn sort_with_limit(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;

        let schema = self.plan.schema();

        let mut missing_cols: IndexSet<Column> = IndexSet::new();
        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
            let columns = sort.expr.column_refs();

            missing_cols.extend(
                columns
                    .into_iter()
                    .filter(|c| !schema.has_column(c))
                    .cloned(),
            );

            Ok(())
        })?;

        if missing_cols.is_empty() {
            return Ok(Self::new(LogicalPlan::Sort(Sort {
                expr: normalize_sorts(sorts, &self.plan)?,
                input: self.plan,
                fetch,
            })));
        }

        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

        let is_distinct = false;
        let plan = Self::add_missing_columns(
            Arc::unwrap_or_clone(self.plan),
            &missing_cols,
            is_distinct,
        )?;

        let sort_plan = LogicalPlan::Sort(Sort {
            expr: normalize_sorts(sorts, &plan)?,
            input: Arc::new(plan),
            fetch,
        });

        Projection::try_new(new_expr, Arc::new(sort_plan))
            .map(LogicalPlan::Projection)
            .map(Self::new)
    }

    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
    }

    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
    }

    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
        let right_plan: LogicalPlan = plan;

        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
            union_by_name(left_plan, right_plan)?,
        )))))
    }

    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
        let right_plan: LogicalPlan = plan;

        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
            union(left_plan, right_plan)?,
        )))))
    }

    pub fn distinct(self) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
    }

    pub fn distinct_on(
        self,
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
        ))))
    }

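    /// Join the current plan with `right` on equality of the given key
    /// columns; the two key lists are matched positionally. Sketch (`left`,
    /// `right`, and the shared `id` column are assumptions for illustration):
    ///
    /// ```ignore
    /// // ... JOIN right ON left.id = right.id
    /// let plan = LogicalPlanBuilder::from(left)
    ///     .join(right, JoinType::Inner, (vec!["id"], vec!["id"]), None)?
    ///     .build()?;
    /// ```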
    pub fn join(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
    ) -> Result<Self> {
        self.join_detailed(right, join_type, join_keys, filter, false)
    }

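    /// Join the current plan with `right` using arbitrary expressions; the
    /// expressions are AND-ed together into a single join condition. Sketch
    /// (the qualified `l.*`/`r.*` columns are assumptions for illustration):
    ///
    /// ```ignore
    /// // ... JOIN right ON l.a = r.a AND l.b = r.b
    /// let plan = LogicalPlanBuilder::from(left)
    ///     .join_on(right, JoinType::Inner, vec![
    ///         col("l.a").eq(col("r.a")),
    ///         col("l.b").eq(col("r.b")),
    ///     ])?
    ///     .build()?;
    /// ```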
    pub fn join_on(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        on_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<Self> {
        let filter = on_exprs.into_iter().reduce(Expr::and);

        self.join_detailed(
            right,
            join_type,
            (Vec::<Column>::new(), Vec::<Column>::new()),
            filter,
            false,
        )
    }

    pub(crate) fn normalize(
        plan: &LogicalPlan,
        column: impl Into<Column>,
    ) -> Result<Column> {
        let column = column.into();
        if column.relation.is_some() {
            return Ok(column);
        }

        let schema = plan.schema();
        let fallback_schemas = plan.fallback_normalize_schemas();
        let using_columns = plan.using_columns()?;
        column.normalize_with_schemas_and_ambiguity_check(
            &[&[schema], &fallback_schemas],
            &using_columns,
        )
    }

    pub fn join_detailed(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
        null_equals_null: bool,
    ) -> Result<Self> {
        if join_keys.0.len() != join_keys.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        let filter = if let Some(expr) = filter {
            let filter = normalize_col_with_schemas_and_ambiguity_check(
                expr,
                &[&[self.schema(), right.schema()]],
                &[],
            )?;
            Some(filter)
        } else {
            None
        };

        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
            join_keys
                .0
                .into_iter()
                .zip(join_keys.1)
                .map(|(l, r)| {
                    let l = l.into();
                    let r = r.into();

                    match (&l.relation, &r.relation) {
                        (Some(lr), Some(rr)) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        (Some(lr), None) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);

                            match (l_is_left, l_is_right) {
                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        (None, Some(rr)) => {
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (r_is_left, r_is_right) {
                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        (None, None) => {
                            let mut swap = false;
                            let left_key = Self::normalize(&self.plan, l.clone())
                                .or_else(|_| {
                                    swap = true;
                                    Self::normalize(&right, l)
                                });
                            if swap {
                                (Self::normalize(&self.plan, r), left_key)
                            } else {
                                (left_key, Self::normalize(&right, r))
                            }
                        }
                    }
                })
                .unzip();

        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

        let on: Vec<_> = left_keys
            .into_iter()
            .zip(right_keys)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();
        let join_schema =
            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
            return plan_err!("join condition should not be empty");
        }

        Ok(Self::new(LogicalPlan::Join(Join {
            left: self.plan,
            right: Arc::new(right),
            on,
            filter,
            join_type,
            join_constraint: JoinConstraint::On,
            schema: DFSchemaRef::new(join_schema),
            null_equals_null,
        })))
    }

    pub fn join_using(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        using_keys: Vec<impl Into<Column> + Clone>,
    ) -> Result<Self> {
        let left_keys: Vec<Column> = using_keys
            .clone()
            .into_iter()
            .map(|c| Self::normalize(&self.plan, c))
            .collect::<Result<_>>()?;
        let right_keys: Vec<Column> = using_keys
            .into_iter()
            .map(|c| Self::normalize(&right, c))
            .collect::<Result<_>>()?;

        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
        let mut join_on: Vec<(Expr, Expr)> = vec![];
        let mut filters: Option<Expr> = None;
        for (l, r) in &on {
            if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        l,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
            } else if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        r,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
            } else {
                let expr = binary_expr(
                    Expr::Column(l.clone()),
                    Operator::Eq,
                    Expr::Column(r.clone()),
                );
                match filters {
                    None => filters = Some(expr),
                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
                }
            }
        }

        if join_on.is_empty() {
            let join = Self::from(self.plan).cross_join(right)?;
            join.filter(filters.ok_or_else(|| {
                DataFusionError::Internal("filters should not be None here".to_string())
            })?)
        } else {
            let join = Join::try_new(
                self.plan,
                Arc::new(right),
                join_on,
                filters,
                join_type,
                JoinConstraint::Using,
                false,
            )?;

            Ok(Self::new(LogicalPlan::Join(join)))
        }
    }

    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
        let join = Join::try_new(
            self.plan,
            Arc::new(right),
            vec![],
            None,
            JoinType::Inner,
            JoinConstraint::On,
            false,
        )?;

        Ok(Self::new(LogicalPlan::Join(join)))
    }

    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Repartition(Repartition {
            input: self.plan,
            partitioning_scheme,
        })))
    }

    pub fn window(
        self,
        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
    ) -> Result<Self> {
        let window_expr = normalize_cols(window_expr, &self.plan)?;
        validate_unique_names("Windows", &window_expr)?;
        Ok(Self::new(LogicalPlan::Window(Window::try_new(
            window_expr,
            self.plan,
        )?)))
    }

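    /// Apply an aggregate: group on the `group_expr` expressions and compute
    /// the `aggr_expr` aggregates. Sketch, assuming a `sum` aggregate
    /// expression constructor is in scope:
    ///
    /// ```ignore
    /// // SELECT state, sum(salary) FROM ... GROUP BY state
    /// let plan = builder
    ///     .aggregate(vec![col("state")], vec![sum(col("salary"))])?
    ///     .build()?;
    /// ```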
    pub fn aggregate(
        self,
        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
    ) -> Result<Self> {
        let group_expr = normalize_cols(group_expr, &self.plan)?;
        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;

        let group_expr = if self.options.add_implicit_group_by_exprs {
            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
        } else {
            group_expr
        };

        Aggregate::try_new(self.plan, group_expr, aggr_expr)
            .map(LogicalPlan::Aggregate)
            .map(Self::new)
    }

    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
        let schema = LogicalPlan::explain_schema();
        let schema = schema.to_dfschema_ref()?;

        if analyze {
            Ok(Self::new(LogicalPlan::Analyze(Analyze {
                verbose,
                input: self.plan,
                schema,
            })))
        } else {
            let stringified_plans =
                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

            Ok(Self::new(LogicalPlan::Explain(Explain {
                verbose,
                plan: self.plan,
                explain_format: ExplainFormat::Indent,
                stringified_plans,
                schema,
                logical_optimization_succeeded: false,
            })))
        }
    }

    pub fn intersect(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        LogicalPlanBuilder::intersect_or_except(
            left_plan,
            right_plan,
            JoinType::LeftSemi,
            is_all,
        )
    }

    pub fn except(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        LogicalPlanBuilder::intersect_or_except(
            left_plan,
            right_plan,
            JoinType::LeftAnti,
            is_all,
        )
    }

    fn intersect_or_except(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        join_type: JoinType,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        let left_len = left_plan.schema().fields().len();
        let right_len = right_plan.schema().fields().len();

        if left_len != right_len {
            return plan_err!(
                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
            );
        }

        let join_keys = left_plan
            .schema()
            .fields()
            .iter()
            .zip(right_plan.schema().fields().iter())
            .map(|(left_field, right_field)| {
                (
                    (Column::from_name(left_field.name())),
                    (Column::from_name(right_field.name())),
                )
            })
            .unzip();
        if is_all {
            LogicalPlanBuilder::from(left_plan)
                .join_detailed(right_plan, join_type, join_keys, None, true)?
                .build()
        } else {
            LogicalPlanBuilder::from(left_plan)
                .distinct()?
                .join_detailed(right_plan, join_type, join_keys, None, true)?
                .build()
        }
    }

    pub fn build(self) -> Result<LogicalPlan> {
        Ok(Arc::unwrap_or_clone(self.plan))
    }

    pub fn join_with_expr_keys(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
        filter: Option<Expr>,
    ) -> Result<Self> {
        if equi_exprs.0.len() != equi_exprs.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        let join_key_pairs = equi_exprs
            .0
            .into_iter()
            .zip(equi_exprs.1)
            .map(|(l, r)| {
                let left_key = l.into();
                let right_key = r.into();
                let mut left_using_columns = HashSet::new();
                expr_to_columns(&left_key, &mut left_using_columns)?;
                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
                    left_key,
                    &[&[self.plan.schema()]],
                    &[],
                )?;

                let mut right_using_columns = HashSet::new();
                expr_to_columns(&right_key, &mut right_using_columns)?;
                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
                    right_key,
                    &[&[right.schema()]],
                    &[],
                )?;

                find_valid_equijoin_key_pair(
                    &normalized_left_key,
                    &normalized_right_key,
                    self.plan.schema(),
                    right.schema(),
                )?
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
                    )
                })
            })
            .collect::<Result<Vec<_>>>()?;

        let join = Join::try_new(
            self.plan,
            Arc::new(right),
            join_key_pairs,
            filter,
            join_type,
            JoinConstraint::On,
            false,
        )?;

        Ok(Self::new(LogicalPlan::Join(join)))
    }

    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
    }

    pub fn unnest_column_with_options(
        self,
        column: impl Into<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        unnest_with_options(
            Arc::unwrap_or_clone(self.plan),
            vec![column.into()],
            options,
        )
        .map(Self::new)
    }

    pub fn unnest_columns_with_options(
        self,
        columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
            .map(Self::new)
    }
}

impl From<LogicalPlan> for LogicalPlanBuilder {
    fn from(plan: LogicalPlan) -> Self {
        LogicalPlanBuilder::new(plan)
    }
}

impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
    fn from(plan: Arc<LogicalPlan>) -> Self {
        LogicalPlanBuilder::new_from_arc(plan)
    }
}

#[derive(Default)]
struct ValuesFields {
    inner: Vec<Field>,
}

impl ValuesFields {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn push(&mut self, data_type: DataType, nullable: bool) {
        let name = format!("column{}", self.inner.len() + 1);
        self.inner.push(Field::new(name, data_type, nullable));
    }

    pub fn into_fields(self) -> Fields {
        self.inner.into()
    }
}

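/// Rename duplicated field names so every field in the result is unique, by
/// suffixing repeats with `:<n>`. For example, fields named `a, a` become
/// `a, a:1`.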
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
    let mut name_map = HashMap::new();
    let mut seen: HashSet<String> = HashSet::new();

    fields
        .into_iter()
        .map(|field| {
            let base_name = field.name();
            let count = name_map.entry(base_name.clone()).or_insert(0);
            let mut new_name = base_name.clone();

            while seen.contains(&new_name) {
                *count += 1;
                new_name = format!("{base_name}:{count}");
            }

            seen.insert(new_name.clone());

            let mut modified_field =
                Field::new(&new_name, field.data_type().clone(), field.is_nullable());
            modified_field.set_metadata(field.metadata().clone());
            modified_field
        })
        .collect()
}

fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
    let mut table_references = schema
        .iter()
        .filter_map(|(qualifier, _)| qualifier)
        .collect::<Vec<_>>();
    table_references.dedup();
    let table_reference = if table_references.len() == 1 {
        table_references.pop().cloned()
    } else {
        None
    };

    (
        table_reference,
        Arc::new(Field::new("mark", DataType::Boolean, false)),
    )
}

pub fn build_join_schema(
    left: &DFSchema,
    right: &DFSchema,
    join_type: &JoinType,
) -> Result<DFSchema> {
    fn nullify_fields<'a>(
        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
        fields
            .map(|(q, f)| {
                let field = f.as_ref().clone().with_nullable(true);
                (q.cloned(), Arc::new(field))
            })
            .collect()
    }

    let right_fields = right.iter();
    let left_fields = left.iter();

    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
        JoinType::Inner => {
            let left_fields = left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            let right_fields = right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            left_fields.into_iter().chain(right_fields).collect()
        }
        JoinType::Left => {
            let left_fields = left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            left_fields
                .into_iter()
                .chain(nullify_fields(right_fields))
                .collect()
        }
        JoinType::Right => {
            let right_fields = right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            nullify_fields(left_fields)
                .into_iter()
                .chain(right_fields)
                .collect()
        }
        JoinType::Full => {
            nullify_fields(left_fields)
                .into_iter()
                .chain(nullify_fields(right_fields))
                .collect()
        }
        JoinType::LeftSemi | JoinType::LeftAnti => {
            left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect()
        }
        JoinType::LeftMark => left_fields
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .chain(once(mark_field(right)))
            .collect(),
        JoinType::RightSemi | JoinType::RightAnti => {
            right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect()
        }
    };
    let func_dependencies = left.functional_dependencies().join(
        right.functional_dependencies(),
        join_type,
        left.fields().len(),
    );

    let (schema1, schema2) = match join_type {
        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
        _ => (right, left),
    };

    let metadata = schema1
        .metadata()
        .clone()
        .into_iter()
        .chain(schema2.metadata().clone())
        .collect();

    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
    dfschema.with_functional_dependencies(func_dependencies)
}

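/// Add to `group_expr` any columns that the schema's functional dependencies
/// make redundant to list explicitly. For example, if `id` is a primary key,
/// `GROUP BY id` already determines every other column, so a query like
/// `SELECT id, name ... GROUP BY id` is valid and `name` is appended here.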
pub fn add_group_by_exprs_from_dependencies(
    mut group_expr: Vec<Expr>,
    schema: &DFSchemaRef,
) -> Result<Vec<Expr>> {
    let mut group_by_field_names = group_expr
        .iter()
        .map(|e| e.schema_name().to_string())
        .collect::<Vec<_>>();

    if let Some(target_indices) =
        get_target_functional_dependencies(schema, &group_by_field_names)
    {
        for idx in target_indices {
            let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
            let expr_name = expr.schema_name().to_string();
            if !group_by_field_names.contains(&expr_name) {
                group_by_field_names.push(expr_name);
                group_expr.push(expr);
            }
        }
    }
    Ok(group_expr)
}

pub fn validate_unique_names<'a>(
    node_name: &str,
    expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<()> {
    let mut unique_names = HashMap::new();

    expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
        let name = expr.schema_name().to_string();
        match unique_names.get(&name) {
            None => {
                unique_names.insert(name, (position, expr));
                Ok(())
            }
            Some((existing_position, existing_expr)) => {
                plan_err!("{node_name} require unique expression names \
                but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
                at position {position} have the same name. Consider aliasing (\"AS\") one of them."
                )
            }
        }
    })
}

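/// Union two [`LogicalPlan`]s with loose type coercion, so the inputs only
/// need compatible (not identical) column types. Sketch:
///
/// ```ignore
/// let plan = union(left_plan, right_plan)?; // left UNION ALL right
/// ```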
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
    Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
        Arc::new(left_plan),
        Arc::new(right_plan),
    ])?))
}

pub fn union_by_name(
    left_plan: LogicalPlan,
    right_plan: LogicalPlan,
) -> Result<LogicalPlan> {
    Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
        Arc::new(left_plan),
        Arc::new(right_plan),
    ])?))
}

pub fn project(
    plan: LogicalPlan,
    expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
) -> Result<LogicalPlan> {
    project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
}

fn project_with_validation(
    plan: LogicalPlan,
    expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
) -> Result<LogicalPlan> {
    let mut projected_expr = vec![];
    for (e, validate) in expr {
        let e = e.into();
        match e {
            SelectExpr::Wildcard(opt) => {
                let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;

                let expanded = if let Some(replace) = opt.replace {
                    replace_columns(expanded, &replace)?
                } else {
                    expanded
                };

                for e in expanded {
                    if validate {
                        projected_expr
                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
                    } else {
                        projected_expr.push(e)
                    }
                }
            }
            SelectExpr::QualifiedWildcard(table_ref, opt) => {
                let expanded =
                    expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;

                let expanded = if let Some(replace) = opt.replace {
                    replace_columns(expanded, &replace)?
                } else {
                    expanded
                };

                for e in expanded {
                    if validate {
                        projected_expr
                            .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
                    } else {
                        projected_expr.push(e)
                    }
                }
            }
            SelectExpr::Expression(e) => {
                if validate {
                    projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
                } else {
                    projected_expr.push(e)
                }
            }
        }
    }
    validate_unique_names("Projections", projected_expr.iter())?;

    Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
}

fn replace_columns(
    mut exprs: Vec<Expr>,
    replace: &PlannedReplaceSelectItem,
) -> Result<Vec<Expr>> {
    for expr in exprs.iter_mut() {
        if let Expr::Column(Column { name, .. }) = expr {
            if let Some((_, new_expr)) = replace
                .items()
                .iter()
                .zip(replace.expressions().iter())
                .find(|(item, _)| item.column_name.value == *name)
            {
                *expr = new_expr.clone().alias(name.clone())
            }
        }
    }
    Ok(exprs)
}

pub fn subquery_alias(
    plan: LogicalPlan,
    alias: impl Into<TableReference>,
) -> Result<LogicalPlan> {
    SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
}

pub fn table_scan(
    name: Option<impl Into<TableReference>>,
    table_schema: &Schema,
    projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
    table_scan_with_filters(name, table_schema, projection, vec![])
}

pub fn table_scan_with_filters(
    name: Option<impl Into<TableReference>>,
    table_schema: &Schema,
    projection: Option<Vec<usize>>,
    filters: Vec<Expr>,
) -> Result<LogicalPlanBuilder> {
    let table_source = table_source(table_schema);
    let name = name
        .map(|n| n.into())
        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
    LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
}

pub fn table_scan_with_filter_and_fetch(
    name: Option<impl Into<TableReference>>,
    table_schema: &Schema,
    projection: Option<Vec<usize>>,
    filters: Vec<Expr>,
    fetch: Option<usize>,
) -> Result<LogicalPlanBuilder> {
    let table_source = table_source(table_schema);
    let name = name
        .map(|n| n.into())
        .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
    LogicalPlanBuilder::scan_with_filters_fetch(
        name,
        table_source,
        projection,
        filters,
        fetch,
    )
}

pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
    let table_schema = Arc::new(table_schema.clone());
    Arc::new(LogicalTableSource {
        table_schema,
        constraints: Default::default(),
    })
}

pub fn table_source_with_constraints(
    table_schema: &Schema,
    constraints: Constraints,
) -> Arc<dyn TableSource> {
    let table_schema = Arc::new(table_schema.clone());
    Arc::new(LogicalTableSource {
        table_schema,
        constraints,
    })
}

pub fn wrap_projection_for_join_if_necessary(
    join_keys: &[Expr],
    input: LogicalPlan,
) -> Result<(LogicalPlan, Vec<Column>, bool)> {
    let input_schema = input.schema();
    let alias_join_keys: Vec<Expr> = join_keys
        .iter()
        .map(|key| {
            if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
                let alias = format!("{key}");
                key.clone().alias(alias)
            } else {
                key.clone()
            }
        })
        .collect::<Vec<_>>();

    let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
    let plan = if need_project {
        let mut projection = input_schema
            .columns()
            .into_iter()
            .map(Expr::Column)
            .collect::<Vec<_>>();
        let join_key_items = alias_join_keys
            .iter()
            .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
            .cloned()
            .collect::<HashSet<Expr>>();
        projection.extend(join_key_items);

        LogicalPlanBuilder::from(input)
            .project(projection.into_iter().map(SelectExpr::from))?
            .build()?
    } else {
        input
    };

    let join_on = alias_join_keys
        .into_iter()
        .map(|key| {
            if let Some(col) = key.try_as_col() {
                Ok(col.clone())
            } else {
                let name = key.schema_name().to_string();
                Ok(Column::from_name(name))
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok((plan, join_on, need_project))
}

pub struct LogicalTableSource {
    table_schema: SchemaRef,
    constraints: Constraints,
}

impl LogicalTableSource {
    pub fn new(table_schema: SchemaRef) -> Self {
        Self {
            table_schema,
            constraints: Constraints::default(),
        }
    }

    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }
}

impl TableSource for LogicalTableSource {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
    }
}

pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
    unnest_with_options(input, columns, UnnestOptions::default())
}

fn get_unnested_list_datatype_recursive(
    data_type: &DataType,
    depth: usize,
) -> Result<DataType> {
    match data_type {
        DataType::List(field)
        | DataType::FixedSizeList(field, _)
        | DataType::LargeList(field) => {
            if depth == 1 {
                return Ok(field.data_type().clone());
            }
            return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
        }
        _ => {}
    };

    internal_err!("trying to unnest on invalid data type {:?}", data_type)
}

pub fn get_struct_unnested_columns(
    col_name: &String,
    inner_fields: &Fields,
) -> Vec<Column> {
    inner_fields
        .iter()
        .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
        .collect()
}

pub fn get_unnested_columns(
    col_name: &String,
    data_type: &DataType,
    depth: usize,
) -> Result<Vec<(Column, Arc<Field>)>> {
    let mut qualified_columns = Vec::with_capacity(1);

    match data_type {
        DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
            let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
            let new_field = Arc::new(Field::new(col_name, data_type, true));
            let column = Column::from_name(col_name);
            qualified_columns.push((column, new_field));
        }
        DataType::Struct(fields) => {
            qualified_columns.extend(fields.iter().map(|f| {
                let new_name = format!("{}.{}", col_name, f.name());
                let column = Column::from_name(&new_name);
                let new_field = f.as_ref().clone().with_name(new_name);
                (column, Arc::new(new_field))
            }))
        }
        _ => {
            return internal_err!(
                "trying to unnest on invalid data type {:?}",
                data_type
            );
        }
    };
    Ok(qualified_columns)
}

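/// Create an `Unnest` plan that flattens the given list and struct columns,
/// producing one output row per nested element. A sketch, assuming `input`
/// has a list column named `tags`:
///
/// ```ignore
/// // Roughly: SELECT unnest(tags), ... FROM input
/// let plan = unnest_with_options(
///     input,
///     vec![Column::from_name("tags")],
///     UnnestOptions::default(),
/// )?;
/// ```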
pub fn unnest_with_options(
    input: LogicalPlan,
    columns_to_unnest: Vec<Column>,
    options: UnnestOptions,
) -> Result<LogicalPlan> {
    let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
    let mut struct_columns = vec![];
    let indices_to_unnest = columns_to_unnest
        .iter()
        .map(|c| Ok((input.schema().index_of_column(c)?, c)))
        .collect::<Result<HashMap<usize, &Column>>>()?;

    let input_schema = input.schema();

    let mut dependency_indices = vec![];
    let fields = input_schema
        .iter()
        .enumerate()
        .map(|(index, (original_qualifier, original_field))| {
            match indices_to_unnest.get(&index) {
                Some(column_to_unnest) => {
                    let recursions_on_column = options
                        .recursions
                        .iter()
                        .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                        .collect::<Vec<_>>();
                    let mut transformed_columns = recursions_on_column
                        .iter()
                        .map(|r| {
                            list_columns.push((
                                index,
                                ColumnUnnestList {
                                    output_column: r.output_column.clone(),
                                    depth: r.depth,
                                },
                            ));
                            Ok(get_unnested_columns(
                                &r.output_column.name,
                                original_field.data_type(),
                                r.depth,
                            )?
                            .into_iter()
                            .next()
                            .unwrap())
                        })
                        .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                    if transformed_columns.is_empty() {
                        transformed_columns = get_unnested_columns(
                            &column_to_unnest.name,
                            original_field.data_type(),
                            1,
                        )?;
                        match original_field.data_type() {
                            DataType::Struct(_) => {
                                struct_columns.push(index);
                            }
                            DataType::List(_)
                            | DataType::FixedSizeList(_, _)
                            | DataType::LargeList(_) => {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: Column::from_name(
                                            &column_to_unnest.name,
                                        ),
                                        depth: 1,
                                    },
                                ));
                            }
                            _ => {}
                        };
                    }

                    dependency_indices
                        .extend(std::iter::repeat_n(index, transformed_columns.len()));
                    Ok(transformed_columns
                        .iter()
                        .map(|(col, field)| (col.relation.to_owned(), field.to_owned()))
                        .collect())
                }
                None => {
                    dependency_indices.push(index);
                    Ok(vec![(
                        original_qualifier.cloned(),
                        Arc::clone(original_field),
                    )])
                }
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    let metadata = input_schema.metadata().clone();
    let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
    let deps = input_schema.functional_dependencies().clone();
    let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

    Ok(LogicalPlan::Unnest(Unnest {
        input: Arc::new(input),
        exec_columns: columns_to_unnest,
        list_type_columns: list_columns,
        struct_type_columns: struct_columns,
        dependency_indices,
        schema,
        options,
    }))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::logical_plan::StringifiedPlan;
    use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};

    use crate::test::function_stub::sum;
    use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};
    use insta::assert_snapshot;

    #[test]
    fn plan_builder_simple() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .filter(col("state").eq(lit("CO")))?
                .project(vec![col("id")])?
                .build()?;

        assert_snapshot!(plan, @r#"
        Projection: employee_csv.id
          Filter: employee_csv.state = Utf8("CO")
            TableScan: employee_csv projection=[id, state]
        "#);

        Ok(())
    }

    #[test]
    fn plan_builder_schema() {
        let schema = employee_schema();
        let projection = None;
        let plan =
            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
                .unwrap();
        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");

        let projection = None;
        let plan =
            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
                .unwrap();
        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
    }

    #[test]
    fn plan_builder_empty_name() {
        let schema = employee_schema();
        let projection = None;
        let err =
            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
        assert_snapshot!(
            err.strip_backtrace(),
            @"Error during planning: table_name cannot be empty"
        );
    }

    #[test]
    fn plan_builder_sort() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
                .sort(vec![
                    expr::Sort::new(col("state"), true, true),
                    expr::Sort::new(col("salary"), false, false),
                ])?
                .build()?;

        assert_snapshot!(plan, @r"
        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
          TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }

    #[test]
    fn plan_builder_union() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        let plan = plan
            .clone()
            .union(plan.clone().build()?)?
            .union(plan.clone().build()?)?
            .union(plan.build()?)?
            .build()?;

        assert_snapshot!(plan, @r"
        Union
          Union
            Union
              TableScan: employee_csv projection=[state, salary]
              TableScan: employee_csv projection=[state, salary]
            TableScan: employee_csv projection=[state, salary]
          TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }

    #[test]
    fn plan_builder_union_distinct() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        let plan = plan
            .clone()
            .union_distinct(plan.clone().build()?)?
            .union_distinct(plan.clone().build()?)?
            .union_distinct(plan.build()?)?
            .build()?;

        assert_snapshot!(plan, @r"
        Distinct:
          Union
            Distinct:
              Union
                Distinct:
                  Union
                    TableScan: employee_csv projection=[state, salary]
                    TableScan: employee_csv projection=[state, salary]
                  TableScan: employee_csv projection=[state, salary]
                TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }

    #[test]
    fn plan_builder_simple_distinct() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .filter(col("state").eq(lit("CO")))?
                .project(vec![col("id")])?
                .distinct()?
                .build()?;

        assert_snapshot!(plan, @r#"
        Distinct:
          Projection: employee_csv.id
            Filter: employee_csv.state = Utf8("CO")
              TableScan: employee_csv projection=[id, state]
        "#);

        Ok(())
    }

    #[test]
    fn exists_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(exists(Arc::new(subquery)))?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Filter: EXISTS (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.a
                TableScan: foo
          Projection: bar.a
            TableScan: bar
        ");

        Ok(())
    }

    #[test]
    fn filter_in_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(in_subquery(col("a"), Arc::new(subquery)))?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Filter: bar.a IN (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.a
                TableScan: foo
          Projection: bar.a
            TableScan: bar
        ");

        Ok(())
    }

    #[test]
    fn select_scalar_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("b")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![scalar_subquery(Arc::new(subquery))])?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Projection: (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.b
                TableScan: foo
          TableScan: bar
        ");

        Ok(())
    }

    #[test]
    fn projection_non_unique_names() -> Result<()> {
        let plan = table_scan(
            Some("employee_csv"),
            &employee_schema(),
            Some(vec![0, 1]),
        )?
        .project(vec![col("id"), col("first_name").alias("id")]);

        match plan {
            Err(DataFusionError::SchemaError(
                SchemaError::AmbiguousReference {
                    field:
                        Column {
                            relation: Some(TableReference::Bare { table }),
                            name,
                            spans: _,
                        },
                },
                _,
            )) => {
                assert_eq!(*"employee_csv", *table);
                assert_eq!("id", &name);
                Ok(())
            }
            _ => plan_err!("Plan should have returned a DataFusionError::SchemaError"),
        }
    }

    fn employee_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, false),
            Field::new("last_name", DataType::Utf8, false),
            Field::new("state", DataType::Utf8, false),
            Field::new("salary", DataType::Int32, false),
        ])
    }

    #[test]
    fn stringified_plan() {
        let stringified_plan =
            StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
        assert!(stringified_plan.should_display(true));
        assert!(!stringified_plan.should_display(false));

        let stringified_plan =
            StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
        assert!(stringified_plan.should_display(true));
        assert!(stringified_plan.should_display(false));

        let stringified_plan =
            StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
        assert!(stringified_plan.should_display(true));
        assert!(!stringified_plan.should_display(false));

        let stringified_plan =
            StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
        assert!(stringified_plan.should_display(true));
        assert!(stringified_plan.should_display(false));

        let stringified_plan = StringifiedPlan::new(
            PlanType::OptimizedLogicalPlan {
                optimizer_name: "random opt pass".into(),
            },
            "...the plan...",
        );
        assert!(stringified_plan.should_display(true));
        assert!(!stringified_plan.should_display(false));
    }
2568
    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::UInt32, false),
            Field::new("c", DataType::UInt32, false),
        ]);
        table_scan(Some(name), &schema, None)?.build()
    }

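    // INTERSECT (and EXCEPT) require both inputs to produce the same number of
    // columns; a 1-column vs 2-column pair is rejected while building the plan.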
    #[test]
    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
        let plan1 =
            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
        let plan2 =
            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;

        let err_msg1 =
            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
                .unwrap_err();

        assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");

        Ok(())
    }

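    // Unnest is only defined for list and struct columns. The cases below walk
    // through: rejecting a scalar column, unnesting a list, unnesting a struct
    // into per-field columns, chained and batched unnests, and depth-aware
    // recursive unnesting configured via `UnnestOptions::with_recursions`.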
    #[test]
    fn plan_builder_unnest() -> Result<()> {
        // Unnesting a scalar column should fail.
        let err = nested_table_scan("test_table")?
            .unnest_column("scalar")
            .unwrap_err();

        let DataFusionError::Internal(desc) = err else {
            return plan_err!("Plan should have returned a DataFusionError::Internal");
        };

        // Keep only the message part, dropping any attached backtrace.
        let desc = desc
            .split(DataFusionError::BACK_TRACE_SEP)
            .collect::<Vec<&str>>()
            .first()
            .unwrap_or(&"")
            .to_string();

        assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");

        // Unnesting the strings list.
        let plan = nested_table_scan("test_table")?
            .unnest_column("strings")?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[test_table.strings|depth=1] structs[]
          TableScan: test_table
        ");

        // The unnested field should be a scalar.
        let field = plan.schema().field_with_name(None, "strings").unwrap();
        assert_eq!(&DataType::Utf8, field.data_type());

        // Unnesting the singular struct column yields one column per subfield.
        let plan = nested_table_scan("test_table")?
            .unnest_column("struct_singular")?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[] structs[test_table.struct_singular]
          TableScan: test_table
        ");

        for field_name in &["a", "b"] {
            // Each unnested struct field should be a scalar.
            let field = plan
                .schema()
                .field_with_name(None, &format!("struct_singular.{field_name}"))
                .unwrap();
            assert_eq!(&DataType::UInt32, field.data_type());
        }

        // Unnesting multiple fields in separate plans.
        let plan = nested_table_scan("test_table")?
            .unnest_column("strings")?
            .unnest_column("structs")?
            .unnest_column("struct_singular")?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[] structs[test_table.struct_singular]
          Unnest: lists[test_table.structs|depth=1] structs[]
            Unnest: lists[test_table.strings|depth=1] structs[]
              TableScan: test_table
        ");

        // The unnested struct list field should be a struct.
        let field = plan.schema().field_with_name(None, "structs").unwrap();
        assert!(matches!(field.data_type(), DataType::Struct(_)));

        // Unnesting multiple fields at the same time.
        let cols = vec!["strings", "structs", "struct_singular"]
            .into_iter()
            .map(|c| c.into())
            .collect();

        let plan = nested_table_scan("test_table")?
            .unnest_columns_with_options(cols, UnnestOptions::default())?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
          TableScan: test_table
        ");

        // Unnesting a missing column should fail.
        let plan = nested_table_scan("test_table")?.unnest_column("missing");
        assert!(plan.is_err());

        // Simultaneously unnesting a list (at two depths) and a struct column.
        let plan = nested_table_scan("test_table")?
            .unnest_columns_with_options(
                vec!["stringss".into(), "struct_singular".into()],
                UnnestOptions::default()
                    .with_recursions(RecursionUnnestOption {
                        input_column: "stringss".into(),
                        output_column: "stringss_depth_1".into(),
                        depth: 1,
                    })
                    .with_recursions(RecursionUnnestOption {
                        input_column: "stringss".into(),
                        output_column: "stringss_depth_2".into(),
                        depth: 2,
                    }),
            )?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
          TableScan: test_table
        ");

        // Check the output columns: depth 1 peels one list layer, depth 2 both.
        let field = plan
            .schema()
            .field_with_name(None, "stringss_depth_1")
            .unwrap();
        assert_eq!(
            &DataType::new_list(DataType::Utf8, false),
            field.data_type()
        );
        let field = plan
            .schema()
            .field_with_name(None, "stringss_depth_2")
            .unwrap();
        assert_eq!(&DataType::Utf8, field.data_type());
        for field_name in &["a", "b"] {
            let field = plan
                .schema()
                .field_with_name(None, &format!("struct_singular.{field_name}"))
                .unwrap();
            assert_eq!(&DataType::UInt32, field.data_type());
        }

        Ok(())
    }

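    // Builds a scan over a table with a scalar column, a list of strings, a
    // list of structs, a plain struct (`struct_singular`), and a doubly nested
    // list of strings (`stringss`: List<List<Utf8>>).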
    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
        let struct_field_in_list = Field::new_struct(
            "item",
            vec![
                Field::new("a", DataType::UInt32, false),
                Field::new("b", DataType::UInt32, false),
            ],
            false,
        );
        let string_field = Field::new_list_field(DataType::Utf8, false);
        let strings_field = Field::new_list("item", string_field.clone(), false);
        let schema = Schema::new(vec![
            Field::new("scalar", DataType::UInt32, false),
            Field::new_list("strings", string_field, false),
            Field::new_list("structs", struct_field_in_list, false),
            Field::new(
                "struct_singular",
                DataType::Struct(Fields::from(vec![
                    Field::new("a", DataType::UInt32, false),
                    Field::new("b", DataType::UInt32, false),
                ])),
                false,
            ),
            Field::new_list("stringss", strings_field, false),
        ]);

        table_scan(Some(table_name), &schema, None)
    }

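    // Unioning a join with itself must keep the `left`/`right` subquery
    // aliases intact in both branches of the union.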
    #[test]
    fn test_union_after_join() -> Result<()> {
        let values = vec![vec![lit(1)]];

        let left = LogicalPlanBuilder::values(values.clone())?
            .alias("left")?
            .build()?;
        let right = LogicalPlanBuilder::values(values)?
            .alias("right")?
            .build()?;

        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;

        let plan = LogicalPlanBuilder::from(join.clone())
            .union(join)?
            .build()?;

        assert_snapshot!(plan, @r"
        Union
          Cross Join:
            SubqueryAlias: left
              Values: (Int32(1))
            SubqueryAlias: right
              Values: (Int32(1))
          Cross Join:
            SubqueryAlias: left
              Values: (Int32(1))
            SubqueryAlias: right
              Values: (Int32(1))
        ");

        Ok(())
    }

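    // `change_redundant_column` deduplicates repeated field names by appending
    // `:<n>` counters; note the pre-existing `a:1` is itself suffixed to
    // `a:1:1` rather than colliding with the generated `a:1`.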
    #[test]
    fn test_change_redundant_column() -> Result<()> {
        let t1_field_1 = Field::new("a", DataType::Int32, false);
        let t2_field_1 = Field::new("a", DataType::Int32, false);
        let t2_field_3 = Field::new("a", DataType::Int32, false);
        let t2_field_4 = Field::new("a:1", DataType::Int32, false);
        let t1_field_2 = Field::new("b", DataType::Int32, false);
        let t2_field_2 = Field::new("b", DataType::Int32, false);

        let field_vec = vec![
            t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
        ];
        let remove_redundant = change_redundant_column(&Fields::from(field_vec));

        assert_eq!(
            remove_redundant,
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("a:1", DataType::Int32, false),
                Field::new("b", DataType::Int32, false),
                Field::new("b:1", DataType::Int32, false),
                Field::new("a:2", DataType::Int32, false),
                Field::new("a:1:1", DataType::Int32, false),
            ]
        );
        Ok(())
    }

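    // An `Arc<LogicalPlan>` converts straight into a builder via `From`, so an
    // existing plan can be extended without rebuilding it; the round trip must
    // not change how the plan displays.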
    #[test]
    fn plan_builder_from_logical_plan() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
                .sort(vec![
                    expr::Sort::new(col("state"), true, true),
                    expr::Sort::new(col("salary"), false, false),
                ])?
                .build()?;

        let plan_expected = format!("{plan}");
        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
        assert_eq!(plan_expected, format!("{}", plan_builder.plan));

        Ok(())
    }

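    // By default the builder does not expand the GROUP BY: grouping by the
    // primary key `id` leaves the functionally dependent columns (`state`,
    // `salary`) out of the aggregate's group expressions.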
    #[test]
    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
        let constraints =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
        let table_source = table_source_with_constraints(&employee_schema(), constraints);

        let plan =
            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
                .build()?;

        assert_snapshot!(plan, @r"
        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
          TableScan: employee_csv projection=[id, state, salary]
        ");

        Ok(())
    }

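    // With `with_add_implicit_group_by_exprs(true)`, columns functionally
    // dependent on the primary-key grouping expression (`state`, `salary`)
    // are added to the GROUP BY automatically.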
    #[test]
    fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
        let constraints =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
        let table_source = table_source_with_constraints(&employee_schema(), constraints);

        let options =
            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
        let plan =
            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
                .with_options(options)
                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
                .build()?;

        assert_snapshot!(plan, @r"
        Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
          TableScan: employee_csv projection=[id, state, salary]
        ");

        Ok(())
    }

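    // `build_join_schema` resolves conflicting schema metadata by join type:
    // as asserted below, a LEFT join keeps the left input's value for the
    // shared key and a RIGHT join keeps the right input's.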
    #[test]
    fn test_join_metadata() -> Result<()> {
        let left_schema = DFSchema::new_with_metadata(
            vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
            HashMap::from([("key".to_string(), "left".to_string())]),
        )?;
        let right_schema = DFSchema::new_with_metadata(
            vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
            HashMap::from([("key".to_string(), "right".to_string())]),
        )?;

        let join_schema =
            build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
        assert_eq!(
            join_schema.metadata(),
            &HashMap::from([("key".to_string(), "left".to_string())])
        );
        let join_schema =
            build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
        assert_eq!(
            join_schema.metadata(),
            &HashMap::from([("key".to_string(), "right".to_string())])
        );

        Ok(())
    }
}