1use std::borrow::Cow;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::iter::once;
24use std::sync::Arc;
25
26use crate::dml::CopyTo;
27use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
28use crate::expr_rewriter::{
29 coerce_plan_expr_for_schema, normalize_col,
30 normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
31 rewrite_sort_cols_by_aggs,
32};
33use crate::logical_plan::{
34 Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
35 JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
36 Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37 Window,
38};
39use crate::select_expr::SelectExpr;
40use crate::utils::{
41 can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
42 expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
43 group_window_expr_by_sort_keys,
44};
45use crate::{
46 DmlStatement, ExplainOption, Expr, ExprSchemable, Operator, RecursiveQuery,
47 Statement, TableProviderFilterPushDown, TableSource, WriteOp, and, binary_expr, lit,
48};
49
50use super::dml::InsertOp;
51use arrow::compute::can_cast_types;
52use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
53use datafusion_common::display::ToStringifiedPlan;
54use datafusion_common::file_options::file_type::FileType;
55use datafusion_common::metadata::FieldMetadata;
56use datafusion_common::{
57 Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue,
58 TableReference, ToDFSchema, UnnestOptions, exec_err,
59 get_target_functional_dependencies, internal_datafusion_err, plan_datafusion_err,
60 plan_err,
61};
62use datafusion_expr_common::type_coercion::binary::type_union_resolution;
63
64use indexmap::IndexSet;
65
/// Placeholder table name, `"?table?"`.
///
/// NOTE(review): presumably used for relations that were not given an explicit
/// name -- confirm against the call sites.
pub const UNNAMED_TABLE: &str = "?table?";
68
/// Options that tune how a `LogicalPlanBuilder` constructs plans.
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// When `true`, `aggregate` asks for additional group-by expressions to be
    /// derived from the input schema before building the aggregate node.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    /// Create options with every flag at its default (disabled) value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Return these options with the implicit group-by flag set to `add`.
    pub fn with_add_implicit_group_by_exprs(self, add: bool) -> Self {
        let mut options = self;
        options.add_implicit_group_by_exprs = add;
        options
    }
}
88
/// Builder that holds a [`LogicalPlan`] together with the
/// [`LogicalPlanBuilderOptions`] consulted while extending it.
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    /// The plan built so far, shared behind an `Arc`.
    plan: Arc<LogicalPlan>,
    /// Options read by builder methods (for example `aggregate`).
    options: LogicalPlanBuilderOptions,
}
131
132impl LogicalPlanBuilder {
133 pub fn new(plan: LogicalPlan) -> Self {
135 Self {
136 plan: Arc::new(plan),
137 options: LogicalPlanBuilderOptions::default(),
138 }
139 }
140
141 pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
143 Self {
144 plan,
145 options: LogicalPlanBuilderOptions::default(),
146 }
147 }
148
149 pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
150 self.options = options;
151 self
152 }
153
154 pub fn schema(&self) -> &DFSchemaRef {
156 self.plan.schema()
157 }
158
159 pub fn plan(&self) -> &LogicalPlan {
161 &self.plan
162 }
163
164 pub fn empty(produce_one_row: bool) -> Self {
168 Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
169 produce_one_row,
170 schema: DFSchemaRef::new(DFSchema::empty()),
171 }))
172 }
173
174 pub fn to_recursive_query(
177 self,
178 name: String,
179 recursive_term: LogicalPlan,
180 is_distinct: bool,
181 ) -> Result<Self> {
182 let static_fields_len = self.plan.schema().fields().len();
184 let recursive_fields_len = recursive_term.schema().fields().len();
185 if static_fields_len != recursive_fields_len {
186 return plan_err!(
187 "Non-recursive term and recursive term must have the same number of columns ({} != {})",
188 static_fields_len,
189 recursive_fields_len
190 );
191 }
192 let coerced_recursive_term =
194 coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
195 Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
196 name,
197 static_term: self.plan,
198 recursive_term: Arc::new(coerced_recursive_term),
199 is_distinct,
200 })))
201 }
202
203 pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
211 if values.is_empty() {
212 return plan_err!("Values list cannot be empty");
213 }
214 let n_cols = values[0].len();
215 if n_cols == 0 {
216 return plan_err!("Values list cannot be zero length");
217 }
218 for (i, row) in values.iter().enumerate() {
219 if row.len() != n_cols {
220 return plan_err!(
221 "Inconsistent data length across values list: got {} values in row {} but expected {}",
222 row.len(),
223 i,
224 n_cols
225 );
226 }
227 }
228
229 Self::infer_data(values)
231 }
232
233 pub fn values_with_schema(
243 values: Vec<Vec<Expr>>,
244 schema: &DFSchemaRef,
245 ) -> Result<Self> {
246 if values.is_empty() {
247 return plan_err!("Values list cannot be empty");
248 }
249 let n_cols = schema.fields().len();
250 if n_cols == 0 {
251 return plan_err!("Values list cannot be zero length");
252 }
253 for (i, row) in values.iter().enumerate() {
254 if row.len() != n_cols {
255 return plan_err!(
256 "Inconsistent data length across values list: got {} values in row {} but expected {}",
257 row.len(),
258 i,
259 n_cols
260 );
261 }
262 }
263
264 Self::infer_values_from_schema(values, schema)
266 }
267
268 fn infer_values_from_schema(
269 values: Vec<Vec<Expr>>,
270 schema: &DFSchema,
271 ) -> Result<Self> {
272 let n_cols = values[0].len();
273 let mut fields = ValuesFields::new();
274 for j in 0..n_cols {
275 let field_type = schema.field(j).data_type();
276 let field_nullable = schema.field(j).is_nullable();
277 for row in values.iter() {
278 let value = &row[j];
279 let data_type = value.get_type(schema)?;
280
281 if !data_type.equals_datatype(field_type)
282 && !can_cast_types(&data_type, field_type)
283 {
284 return exec_err!(
285 "type mismatch and can't cast to got {} and {}",
286 data_type,
287 field_type
288 );
289 }
290 }
291 fields.push(field_type.to_owned(), field_nullable);
292 }
293
294 Self::infer_inner(values, fields, schema)
295 }
296
297 fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
298 let n_cols = values[0].len();
299 let schema = DFSchema::empty();
300 let mut fields = ValuesFields::new();
301
302 for j in 0..n_cols {
303 let mut common_type: Option<DataType> = None;
304 let mut common_metadata: Option<FieldMetadata> = None;
305 for (i, row) in values.iter().enumerate() {
306 let value = &row[j];
307 let metadata = value.metadata(&schema)?;
308 if let Some(ref cm) = common_metadata {
309 if &metadata != cm {
310 return plan_err!(
311 "Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}",
312 cm,
313 metadata
314 );
315 }
316 } else {
317 common_metadata = Some(metadata.clone());
318 }
319 let data_type = value.get_type(&schema)?;
320 if data_type == DataType::Null {
321 continue;
322 }
323
324 if let Some(prev_type) = common_type {
325 let data_types = vec![prev_type.clone(), data_type.clone()];
327 let Some(new_type) = type_union_resolution(&data_types) else {
328 return plan_err!(
329 "Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}"
330 );
331 };
332 common_type = Some(new_type);
333 } else {
334 common_type = Some(data_type);
335 }
336 }
337 fields.push_with_metadata(
340 common_type.unwrap_or(DataType::Null),
341 true,
342 common_metadata,
343 );
344 }
345
346 Self::infer_inner(values, fields, &schema)
347 }
348
349 fn infer_inner(
350 mut values: Vec<Vec<Expr>>,
351 fields: ValuesFields,
352 schema: &DFSchema,
353 ) -> Result<Self> {
354 let fields = fields.into_fields();
355 for row in &mut values {
357 for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
358 if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
359 row[j] = Expr::Literal(
360 ScalarValue::try_from(field_type)?,
361 metadata.clone(),
362 );
363 } else {
364 row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
365 }
366 }
367 }
368
369 let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
370 let schema = DFSchemaRef::new(dfschema);
371
372 Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
373 }
374
375 pub fn scan(
408 table_name: impl Into<TableReference>,
409 table_source: Arc<dyn TableSource>,
410 projection: Option<Vec<usize>>,
411 ) -> Result<Self> {
412 Self::scan_with_filters(table_name, table_source, projection, vec![])
413 }
414
415 pub fn copy_to(
417 input: LogicalPlan,
418 output_url: String,
419 file_type: Arc<dyn FileType>,
420 options: HashMap<String, String>,
421 partition_by: Vec<String>,
422 ) -> Result<Self> {
423 Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
424 Arc::new(input),
425 output_url,
426 partition_by,
427 file_type,
428 options,
429 ))))
430 }
431
432 pub fn insert_into(
466 input: LogicalPlan,
467 table_name: impl Into<TableReference>,
468 target: Arc<dyn TableSource>,
469 insert_op: InsertOp,
470 ) -> Result<Self> {
471 Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
472 table_name.into(),
473 target,
474 WriteOp::Insert(insert_op),
475 Arc::new(input),
476 ))))
477 }
478
479 pub fn scan_with_filters(
481 table_name: impl Into<TableReference>,
482 table_source: Arc<dyn TableSource>,
483 projection: Option<Vec<usize>>,
484 filters: Vec<Expr>,
485 ) -> Result<Self> {
486 Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
487 }
488
489 pub fn scan_with_filters_fetch(
491 table_name: impl Into<TableReference>,
492 table_source: Arc<dyn TableSource>,
493 projection: Option<Vec<usize>>,
494 filters: Vec<Expr>,
495 fetch: Option<usize>,
496 ) -> Result<Self> {
497 Self::scan_with_filters_inner(
498 table_name,
499 table_source,
500 projection,
501 filters,
502 fetch,
503 )
504 }
505
506 fn scan_with_filters_inner(
507 table_name: impl Into<TableReference>,
508 table_source: Arc<dyn TableSource>,
509 projection: Option<Vec<usize>>,
510 filters: Vec<Expr>,
511 fetch: Option<usize>,
512 ) -> Result<Self> {
513 let table_scan =
514 TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
515
516 if table_scan.filters.is_empty()
518 && let Some(p) = table_scan.source.get_logical_plan()
519 {
520 let sub_plan = p.into_owned();
521
522 if let Some(proj) = table_scan.projection {
523 let projection_exprs = proj
524 .into_iter()
525 .map(|i| {
526 Expr::Column(Column::from(sub_plan.schema().qualified_field(i)))
527 })
528 .collect::<Vec<_>>();
529 return Self::new(sub_plan)
530 .project(projection_exprs)?
531 .alias(table_scan.table_name);
532 }
533
534 return Self::new(sub_plan).alias(table_scan.table_name);
538 }
539
540 Ok(Self::new(LogicalPlan::TableScan(table_scan)))
541 }
542
543 pub fn window_plan(
545 input: LogicalPlan,
546 window_exprs: impl IntoIterator<Item = Expr>,
547 ) -> Result<LogicalPlan> {
548 let mut plan = input;
549 let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
550 groups.sort_by(|(key_a, _), (key_b, _)| {
556 for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
557 let key_ordering = compare_sort_expr(first, second, plan.schema());
558 match key_ordering {
559 Ordering::Less => {
560 return Ordering::Less;
561 }
562 Ordering::Greater => {
563 return Ordering::Greater;
564 }
565 Ordering::Equal => {}
566 }
567 }
568 key_b.len().cmp(&key_a.len())
569 });
570 for (_, exprs) in groups {
571 let window_exprs = exprs.into_iter().collect::<Vec<_>>();
572 plan = LogicalPlanBuilder::from(plan)
575 .window(window_exprs)?
576 .build()?;
577 }
578 Ok(plan)
579 }
580
581 pub fn project(
583 self,
584 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
585 ) -> Result<Self> {
586 project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
587 }
588
589 pub fn project_with_validation(
592 self,
593 expr: Vec<(impl Into<SelectExpr>, bool)>,
594 ) -> Result<Self> {
595 project_with_validation(Arc::unwrap_or_clone(self.plan), expr, None)
596 .map(Self::new)
597 }
598
599 pub fn project_with_validation_and_schema(
602 self,
603 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
604 schema: &DFSchemaRef,
605 ) -> Result<Self> {
606 project_with_validation(
607 Arc::unwrap_or_clone(self.plan),
608 expr.into_iter().map(|e| (e, true)),
609 Some(schema),
610 )
611 .map(Self::new)
612 }
613
614 pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
616 let exprs: Vec<_> = indices
617 .into_iter()
618 .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
619 .collect();
620 self.project(exprs)
621 }
622
623 pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
625 let expr = normalize_col(expr.into(), &self.plan)?;
626 Filter::try_new(expr, self.plan)
627 .map(LogicalPlan::Filter)
628 .map(Self::new)
629 }
630
631 pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
633 let expr = normalize_col(expr.into(), &self.plan)?;
634 Filter::try_new(expr, self.plan)
635 .map(LogicalPlan::Filter)
636 .map(Self::from)
637 }
638
639 pub fn prepare(self, name: String, fields: Vec<FieldRef>) -> Result<Self> {
641 Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
642 Prepare {
643 name,
644 fields,
645 input: self.plan,
646 },
647 ))))
648 }
649
650 pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
657 let skip_expr = if skip == 0 {
658 None
659 } else {
660 Some(lit(skip as i64))
661 };
662 let fetch_expr = fetch.map(|f| lit(f as i64));
663 self.limit_by_expr(skip_expr, fetch_expr)
664 }
665
666 pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
670 Ok(Self::new(LogicalPlan::Limit(Limit {
671 skip: skip.map(Box::new),
672 fetch: fetch.map(Box::new),
673 input: self.plan,
674 })))
675 }
676
677 pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
679 subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
680 }
681
    /// Recursively add `missing_cols` to the projection(s) beneath `curr_plan`.
    ///
    /// When `curr_plan` is a projection whose input provides every missing
    /// column, the columns are appended to that projection. Otherwise the
    /// function recurses into every input of `curr_plan` and rebuilds the node
    /// with the rewritten inputs.
    ///
    /// `is_distinct` becomes `true` once a `Distinct` node has been passed on
    /// the way down; in that case adding columns is checked by
    /// `ambiguous_distinct_check`.
    fn add_missing_columns(
        curr_plan: LogicalPlan,
        missing_cols: &IndexSet<Column>,
        is_distinct: bool,
    ) -> Result<LogicalPlan> {
        match curr_plan {
            LogicalPlan::Projection(Projection {
                input,
                mut expr,
                schema: _,
            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
                let mut missing_exprs = missing_cols
                    .iter()
                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
                    .collect::<Result<Vec<_>>>()?;

                // Skip columns the projection already outputs so that no
                // duplicate expressions are added.
                missing_exprs.retain(|e| !expr.contains(e));
                if is_distinct {
                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
                }
                expr.extend(missing_exprs);
                project(Arc::unwrap_or_clone(input), expr)
            }
            _ => {
                // Remember that a DISTINCT was crossed for the nodes below.
                let is_distinct =
                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
                let new_inputs = curr_plan
                    .inputs()
                    .into_iter()
                    .map(|input_plan| {
                        Self::add_missing_columns(
                            (*input_plan).clone(),
                            missing_cols,
                            is_distinct,
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;
                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
            }
        }
    }
754
755 fn ambiguous_distinct_check(
756 missing_exprs: &[Expr],
757 missing_cols: &IndexSet<Column>,
758 projection_exprs: &[Expr],
759 ) -> Result<()> {
760 if missing_exprs.is_empty() {
761 return Ok(());
762 }
763
764 let all_aliases = missing_exprs.iter().all(|e| {
772 projection_exprs.iter().any(|proj_expr| {
773 if let Expr::Alias(Alias { expr, .. }) = proj_expr {
774 e == expr.as_ref()
775 } else {
776 false
777 }
778 })
779 });
780 if all_aliases {
781 return Ok(());
782 }
783
784 let missing_col_names = missing_cols
785 .iter()
786 .map(|col| col.flat_name())
787 .collect::<String>();
788
789 plan_err!(
790 "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list"
791 )
792 }
793
794 pub fn sort_by(
796 self,
797 expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
798 ) -> Result<Self> {
799 self.sort(
800 expr.into_iter()
801 .map(|e| e.into().sort(true, false))
802 .collect::<Vec<SortExpr>>(),
803 )
804 }
805
806 pub fn sort(
807 self,
808 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
809 ) -> Result<Self> {
810 self.sort_with_limit(sorts, None)
811 }
812
813 pub fn sort_with_limit(
815 self,
816 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
817 fetch: Option<usize>,
818 ) -> Result<Self> {
819 let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
820
821 let schema = self.plan.schema();
822
823 let mut missing_cols: IndexSet<Column> = IndexSet::new();
825 sorts.iter().try_for_each::<_, Result<()>>(|sort| {
826 let columns = sort.expr.column_refs();
827
828 missing_cols.extend(
829 columns
830 .into_iter()
831 .filter(|c| !schema.has_column(c))
832 .cloned(),
833 );
834
835 Ok(())
836 })?;
837
838 if missing_cols.is_empty() {
839 return Ok(Self::new(LogicalPlan::Sort(Sort {
840 expr: normalize_sorts(sorts, &self.plan)?,
841 input: self.plan,
842 fetch,
843 })));
844 }
845
846 let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
848
849 let is_distinct = false;
850 let plan = Self::add_missing_columns(
851 Arc::unwrap_or_clone(self.plan),
852 &missing_cols,
853 is_distinct,
854 )?;
855
856 let sort_plan = LogicalPlan::Sort(Sort {
857 expr: normalize_sorts(sorts, &plan)?,
858 input: Arc::new(plan),
859 fetch,
860 });
861
862 Projection::try_new(new_expr, Arc::new(sort_plan))
863 .map(LogicalPlan::Projection)
864 .map(Self::new)
865 }
866
867 pub fn union(self, plan: LogicalPlan) -> Result<Self> {
869 union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
870 }
871
872 pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
874 union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
875 }
876
877 pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
879 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
880 let right_plan: LogicalPlan = plan;
881
882 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
883 union_by_name(left_plan, right_plan)?,
884 )))))
885 }
886
887 pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
889 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
890 let right_plan: LogicalPlan = plan;
891
892 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
893 union(left_plan, right_plan)?,
894 )))))
895 }
896
897 pub fn distinct(self) -> Result<Self> {
899 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
900 }
901
902 pub fn distinct_on(
905 self,
906 on_expr: Vec<Expr>,
907 select_expr: Vec<Expr>,
908 sort_expr: Option<Vec<SortExpr>>,
909 ) -> Result<Self> {
910 Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
911 DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
912 ))))
913 }
914
915 pub fn join(
929 self,
930 right: LogicalPlan,
931 join_type: JoinType,
932 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
933 filter: Option<Expr>,
934 ) -> Result<Self> {
935 self.join_detailed(
936 right,
937 join_type,
938 join_keys,
939 filter,
940 NullEquality::NullEqualsNothing,
941 )
942 }
943
944 pub fn join_on(
985 self,
986 right: LogicalPlan,
987 join_type: JoinType,
988 on_exprs: impl IntoIterator<Item = Expr>,
989 ) -> Result<Self> {
990 let filter = on_exprs.into_iter().reduce(Expr::and);
991
992 self.join_detailed(
993 right,
994 join_type,
995 (Vec::<Column>::new(), Vec::<Column>::new()),
996 filter,
997 NullEquality::NullEqualsNothing,
998 )
999 }
1000
1001 pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
1002 if column.relation.is_some() {
1003 return Ok(column);
1005 }
1006
1007 let schema = plan.schema();
1008 let fallback_schemas = plan.fallback_normalize_schemas();
1009 let using_columns = plan.using_columns()?;
1010 column.normalize_with_schemas_and_ambiguity_check(
1011 &[&[schema], &fallback_schemas],
1012 &using_columns,
1013 )
1014 }
1015
1016 pub fn join_detailed(
1023 self,
1024 right: LogicalPlan,
1025 join_type: JoinType,
1026 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
1027 filter: Option<Expr>,
1028 null_equality: NullEquality,
1029 ) -> Result<Self> {
1030 self.join_detailed_with_options(
1031 right,
1032 join_type,
1033 join_keys,
1034 filter,
1035 null_equality,
1036 false,
1037 )
1038 }
1039
    /// Join the current plan with `right` on key columns, with explicit
    /// `null_equality` and `null_aware` settings.
    ///
    /// Each key pair is resolved to a (left column, right column) pair. A
    /// qualified key is looked up in both input schemas to decide which side
    /// it belongs to, so the two keys of a pair may be given in either order.
    /// Unqualified keys are normalized against the corresponding plan.
    ///
    /// # Errors
    /// Returns a plan error when the two key lists differ in length, when a
    /// key or the filter cannot be normalized, or when a non-inner join has
    /// neither keys nor a filter.
    pub fn join_detailed_with_options(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
        null_equality: NullEquality,
        null_aware: bool,
    ) -> Result<Self> {
        if join_keys.0.len() != join_keys.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        // Normalize filter columns against both join inputs.
        let filter = if let Some(expr) = filter {
            let filter = normalize_col_with_schemas_and_ambiguity_check(
                expr,
                &[&[self.schema(), right.schema()]],
                &[],
            )?;
            Some(filter)
        } else {
            None
        };

        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
            join_keys
                .0
                .into_iter()
                .zip(join_keys.1)
                .map(|(l, r)| {
                    let l = l.into();
                    let r = r.into();

                    match (&l.relation, &r.relation) {
                        // Both keys qualified: look each one up on both sides.
                        (Some(lr), Some(rr)) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                // Keys were given in swapped order.
                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only the first key is qualified.
                        (Some(lr), None) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);

                            match (l_is_left, l_is_right) {
                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only the second key is qualified.
                        (None, Some(rr)) => {
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (r_is_left, r_is_right) {
                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Neither key is qualified: try the first key on the
                        // left plan, and swap sides if that fails.
                        (None, None) => {
                            let mut swap = false;
                            let left_key = Self::normalize(&self.plan, l.clone())
                                .or_else(|_| {
                                    swap = true;
                                    Self::normalize(&right, l)
                                });
                            if swap {
                                (Self::normalize(&self.plan, r), left_key)
                            } else {
                                (left_key, Self::normalize(&right, r))
                            }
                        }
                    }
                })
                .unzip();

        // Surface the first normalization error on either side.
        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

        let on: Vec<_> = left_keys
            .into_iter()
            .zip(right_keys)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();
        let join_schema =
            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

        // Only an inner join may have no keys and no filter.
        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
            return plan_err!("join condition should not be empty");
        }

        Ok(Self::new(LogicalPlan::Join(Join {
            left: self.plan,
            right: Arc::new(right),
            on,
            filter,
            join_type,
            join_constraint: JoinConstraint::On,
            schema: DFSchemaRef::new(join_schema),
            null_equality,
            null_aware,
        })))
    }
1168
    /// Join the current plan with `right` on columns named in `using_keys`
    /// (a `USING` join).
    ///
    /// Each key is normalized against both plans. Pairs that pass the
    /// hashability checks below become equi-join keys; the remaining pairs are
    /// turned into `l = r` predicates and combined with `AND` into a filter.
    /// When no pair becomes an equi-join key, the result is a cross join
    /// followed by that filter.
    pub fn join_using(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        using_keys: Vec<Column>,
    ) -> Result<Self> {
        let left_keys: Vec<Column> = using_keys
            .clone()
            .into_iter()
            .map(|c| Self::normalize(&self.plan, c))
            .collect::<Result<_>>()?;
        let right_keys: Vec<Column> = using_keys
            .into_iter()
            .map(|c| Self::normalize(&right, c))
            .collect::<Result<_>>()?;

        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
        let mut join_on: Vec<(Expr, Expr)> = vec![];
        let mut filters: Option<Expr> = None;
        for (l, r) in &on {
            // Both columns exist and the left column's type can be hashed.
            if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        l,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
            // NOTE(review): this branch looks `r` up in the left schema and
            // pushes the pair swapped -- confirm this is intended.
            } else if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        r,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
            } else {
                // Fall back to an equality predicate in the join filter.
                let expr = binary_expr(
                    Expr::Column(l.clone()),
                    Operator::Eq,
                    Expr::Column(r.clone()),
                );
                match filters {
                    None => filters = Some(expr),
                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
                }
            }
        }

        if join_on.is_empty() {
            // No equi-join keys: cross join, then apply the filter.
            let join = Self::from(self.plan).cross_join(right)?;
            join.filter(filters.ok_or_else(|| {
                internal_datafusion_err!("filters should not be None here")
            })?)
        } else {
            let join = Join::try_new(
                self.plan,
                Arc::new(right),
                join_on,
                filters,
                join_type,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            Ok(Self::new(LogicalPlan::Join(join)))
        }
    }
1245
1246 pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1248 let join = Join::try_new(
1249 self.plan,
1250 Arc::new(right),
1251 vec![],
1252 None,
1253 JoinType::Inner,
1254 JoinConstraint::On,
1255 NullEquality::NullEqualsNothing,
1256 false, )?;
1258
1259 Ok(Self::new(LogicalPlan::Join(join)))
1260 }
1261
1262 pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1264 Ok(Self::new(LogicalPlan::Repartition(Repartition {
1265 input: self.plan,
1266 partitioning_scheme,
1267 })))
1268 }
1269
1270 pub fn window(
1272 self,
1273 window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1274 ) -> Result<Self> {
1275 let window_expr = normalize_cols(window_expr, &self.plan)?;
1276 validate_unique_names("Windows", &window_expr)?;
1277 Ok(Self::new(LogicalPlan::Window(Window::try_new(
1278 window_expr,
1279 self.plan,
1280 )?)))
1281 }
1282
1283 pub fn aggregate(
1287 self,
1288 group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1289 aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1290 ) -> Result<Self> {
1291 let group_expr = normalize_cols(group_expr, &self.plan)?;
1292 let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1293
1294 let group_expr = if self.options.add_implicit_group_by_exprs {
1295 add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1296 } else {
1297 group_expr
1298 };
1299
1300 Aggregate::try_new(self.plan, group_expr, aggr_expr)
1301 .map(LogicalPlan::Aggregate)
1302 .map(Self::new)
1303 }
1304
1305 pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1312 self.explain_option_format(
1314 ExplainOption::default()
1315 .with_verbose(verbose)
1316 .with_analyze(analyze),
1317 )
1318 }
1319
1320 pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1324 let schema = LogicalPlan::explain_schema();
1325 let schema = schema.to_dfschema_ref()?;
1326
1327 if explain_option.analyze {
1328 Ok(Self::new(LogicalPlan::Analyze(Analyze {
1329 verbose: explain_option.verbose,
1330 input: self.plan,
1331 schema,
1332 })))
1333 } else {
1334 let stringified_plans =
1335 vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1336
1337 Ok(Self::new(LogicalPlan::Explain(Explain {
1338 verbose: explain_option.verbose,
1339 plan: self.plan,
1340 explain_format: explain_option.format,
1341 stringified_plans,
1342 schema,
1343 logical_optimization_succeeded: false,
1344 })))
1345 }
1346 }
1347
1348 pub fn intersect(
1350 left_plan: LogicalPlan,
1351 right_plan: LogicalPlan,
1352 is_all: bool,
1353 ) -> Result<LogicalPlan> {
1354 LogicalPlanBuilder::intersect_or_except(
1355 left_plan,
1356 right_plan,
1357 JoinType::LeftSemi,
1358 is_all,
1359 )
1360 }
1361
1362 pub fn except(
1364 left_plan: LogicalPlan,
1365 right_plan: LogicalPlan,
1366 is_all: bool,
1367 ) -> Result<LogicalPlan> {
1368 LogicalPlanBuilder::intersect_or_except(
1369 left_plan,
1370 right_plan,
1371 JoinType::LeftAnti,
1372 is_all,
1373 )
1374 }
1375
1376 fn intersect_or_except(
1378 left_plan: LogicalPlan,
1379 right_plan: LogicalPlan,
1380 join_type: JoinType,
1381 is_all: bool,
1382 ) -> Result<LogicalPlan> {
1383 let left_len = left_plan.schema().fields().len();
1384 let right_len = right_plan.schema().fields().len();
1385
1386 if left_len != right_len {
1387 return plan_err!(
1388 "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1389 );
1390 }
1391
1392 let left_builder = LogicalPlanBuilder::from(left_plan);
1395 let right_builder = LogicalPlanBuilder::from(right_plan);
1396 let (left_builder, right_builder, _requalified) =
1397 requalify_sides_if_needed(left_builder, right_builder)?;
1398 let left_plan = left_builder.build()?;
1399 let right_plan = right_builder.build()?;
1400
1401 let join_keys = left_plan
1402 .schema()
1403 .fields()
1404 .iter()
1405 .zip(right_plan.schema().fields().iter())
1406 .map(|(left_field, right_field)| {
1407 (
1408 (Column::from_name(left_field.name())),
1409 (Column::from_name(right_field.name())),
1410 )
1411 })
1412 .unzip();
1413 if is_all {
1414 LogicalPlanBuilder::from(left_plan)
1415 .join_detailed(
1416 right_plan,
1417 join_type,
1418 join_keys,
1419 None,
1420 NullEquality::NullEqualsNull,
1421 )?
1422 .build()
1423 } else {
1424 LogicalPlanBuilder::from(left_plan)
1425 .distinct()?
1426 .join_detailed(
1427 right_plan,
1428 join_type,
1429 join_keys,
1430 None,
1431 NullEquality::NullEqualsNull,
1432 )?
1433 .build()
1434 }
1435 }
1436
1437 pub fn build(self) -> Result<LogicalPlan> {
1439 Ok(Arc::unwrap_or_clone(self.plan))
1440 }
1441
1442 pub fn join_with_expr_keys(
1457 self,
1458 right: LogicalPlan,
1459 join_type: JoinType,
1460 equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1461 filter: Option<Expr>,
1462 ) -> Result<Self> {
1463 if equi_exprs.0.len() != equi_exprs.1.len() {
1464 return plan_err!("left_keys and right_keys were not the same length");
1465 }
1466
1467 let join_key_pairs = equi_exprs
1468 .0
1469 .into_iter()
1470 .zip(equi_exprs.1)
1471 .map(|(l, r)| {
1472 let left_key = l.into();
1473 let right_key = r.into();
1474 let mut left_using_columns = HashSet::new();
1475 expr_to_columns(&left_key, &mut left_using_columns)?;
1476 let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1477 left_key,
1478 &[&[self.plan.schema()]],
1479 &[],
1480 )?;
1481
1482 let mut right_using_columns = HashSet::new();
1483 expr_to_columns(&right_key, &mut right_using_columns)?;
1484 let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1485 right_key,
1486 &[&[right.schema()]],
1487 &[],
1488 )?;
1489
1490 find_valid_equijoin_key_pair(
1492 &normalized_left_key,
1493 &normalized_right_key,
1494 self.plan.schema(),
1495 right.schema(),
1496 )?.ok_or_else(||
1497 plan_datafusion_err!(
1498 "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1499 ))
1500 })
1501 .collect::<Result<Vec<_>>>()?;
1502
1503 let join = Join::try_new(
1504 self.plan,
1505 Arc::new(right),
1506 join_key_pairs,
1507 filter,
1508 join_type,
1509 JoinConstraint::On,
1510 NullEquality::NullEqualsNothing,
1511 false, )?;
1513
1514 Ok(Self::new(LogicalPlan::Join(join)))
1515 }
1516
1517 pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1519 unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1520 }
1521
1522 pub fn unnest_column_with_options(
1524 self,
1525 column: impl Into<Column>,
1526 options: UnnestOptions,
1527 ) -> Result<Self> {
1528 unnest_with_options(
1529 Arc::unwrap_or_clone(self.plan),
1530 vec![column.into()],
1531 options,
1532 )
1533 .map(Self::new)
1534 }
1535
1536 pub fn unnest_columns_with_options(
1538 self,
1539 columns: Vec<Column>,
1540 options: UnnestOptions,
1541 ) -> Result<Self> {
1542 unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1543 .map(Self::new)
1544 }
1545}
1546
1547impl From<LogicalPlan> for LogicalPlanBuilder {
1548 fn from(plan: LogicalPlan) -> Self {
1549 LogicalPlanBuilder::new(plan)
1550 }
1551}
1552
1553impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1554 fn from(plan: Arc<LogicalPlan>) -> Self {
1555 LogicalPlanBuilder::new_from_arc(plan)
1556 }
1557}
1558
1559#[derive(Default)]
1561struct ValuesFields {
1562 inner: Vec<Field>,
1563}
1564
1565impl ValuesFields {
1566 pub fn new() -> Self {
1567 Self::default()
1568 }
1569
1570 pub fn push(&mut self, data_type: DataType, nullable: bool) {
1571 self.push_with_metadata(data_type, nullable, None);
1572 }
1573
1574 pub fn push_with_metadata(
1575 &mut self,
1576 data_type: DataType,
1577 nullable: bool,
1578 metadata: Option<FieldMetadata>,
1579 ) {
1580 let name = format!("column{}", self.inner.len() + 1);
1583 let mut field = Field::new(name, data_type, nullable);
1584 if let Some(metadata) = metadata {
1585 field.set_metadata(metadata.to_hashmap());
1586 }
1587 self.inner.push(field);
1588 }
1589
1590 pub fn into_fields(self) -> Fields {
1591 self.inner.into()
1592 }
1593}
1594
1595pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1607 let mut name_map = HashMap::<&str, usize>::new();
1615 let mut seen = HashSet::<Cow<String>>::new();
1617
1618 fields
1619 .iter()
1620 .map(|field| {
1621 let original_name = field.name();
1622 let mut name = Cow::Borrowed(original_name);
1623
1624 let count = name_map.entry(original_name).or_insert(0);
1625
1626 while seen.contains(&name) {
1628 *count += 1;
1629 name = Cow::Owned(format!("{original_name}:{count}"));
1630 }
1631
1632 seen.insert(name.clone());
1633
1634 match name {
1635 Cow::Borrowed(_) => None,
1636 Cow::Owned(alias) => Some(alias),
1637 }
1638 })
1639 .collect()
1640}
1641
1642fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1643 let mut table_references = schema
1644 .iter()
1645 .filter_map(|(qualifier, _)| qualifier)
1646 .collect::<Vec<_>>();
1647 table_references.dedup();
1648 let table_reference = if table_references.len() == 1 {
1649 table_references.pop().cloned()
1650 } else {
1651 None
1652 };
1653
1654 (
1655 table_reference,
1656 Arc::new(Field::new("mark", DataType::Boolean, false)),
1657 )
1658}
1659
1660pub fn build_join_schema(
1663 left: &DFSchema,
1664 right: &DFSchema,
1665 join_type: &JoinType,
1666) -> Result<DFSchema> {
1667 fn nullify_fields<'a>(
1668 fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1669 ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1670 fields
1671 .map(|(q, f)| {
1672 let field = f.as_ref().clone().with_nullable(true);
1674 (q.cloned(), Arc::new(field))
1675 })
1676 .collect()
1677 }
1678
1679 let right_fields = right.iter();
1680 let left_fields = left.iter();
1681
1682 let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1683 JoinType::Inner => {
1684 let left_fields = left_fields
1686 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1687 .collect::<Vec<_>>();
1688 let right_fields = right_fields
1689 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1690 .collect::<Vec<_>>();
1691 left_fields.into_iter().chain(right_fields).collect()
1692 }
1693 JoinType::Left => {
1694 let left_fields = left_fields
1696 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1697 .collect::<Vec<_>>();
1698 left_fields
1699 .into_iter()
1700 .chain(nullify_fields(right_fields))
1701 .collect()
1702 }
1703 JoinType::Right => {
1704 let right_fields = right_fields
1706 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1707 .collect::<Vec<_>>();
1708 nullify_fields(left_fields)
1709 .into_iter()
1710 .chain(right_fields)
1711 .collect()
1712 }
1713 JoinType::Full => {
1714 nullify_fields(left_fields)
1716 .into_iter()
1717 .chain(nullify_fields(right_fields))
1718 .collect()
1719 }
1720 JoinType::LeftSemi | JoinType::LeftAnti => {
1721 left_fields
1723 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1724 .collect()
1725 }
1726 JoinType::LeftMark => left_fields
1727 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1728 .chain(once(mark_field(right)))
1729 .collect(),
1730 JoinType::RightSemi | JoinType::RightAnti => {
1731 right_fields
1733 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1734 .collect()
1735 }
1736 JoinType::RightMark => right_fields
1737 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1738 .chain(once(mark_field(left)))
1739 .collect(),
1740 };
1741 let func_dependencies = left.functional_dependencies().join(
1742 right.functional_dependencies(),
1743 join_type,
1744 left.fields().len(),
1745 );
1746
1747 let (schema1, schema2) = match join_type {
1748 JoinType::Right
1749 | JoinType::RightSemi
1750 | JoinType::RightAnti
1751 | JoinType::RightMark => (left, right),
1752 _ => (right, left),
1753 };
1754
1755 let metadata = schema1
1756 .metadata()
1757 .clone()
1758 .into_iter()
1759 .chain(schema2.metadata().clone())
1760 .collect();
1761
1762 let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1763 dfschema.with_functional_dependencies(func_dependencies)
1764}
1765
1766pub fn requalify_sides_if_needed(
1776 left: LogicalPlanBuilder,
1777 right: LogicalPlanBuilder,
1778) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1779 let left_cols = left.schema().columns();
1780 let right_cols = right.schema().columns();
1781
1782 for l in &left_cols {
1796 for r in &right_cols {
1797 if l.name != r.name {
1798 continue;
1799 }
1800
1801 match (&l.relation, &r.relation) {
1803 (Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
1805 return Ok((
1806 left.alias(TableReference::bare("left"))?,
1807 right.alias(TableReference::bare("right"))?,
1808 true,
1809 ));
1810 }
1811 (None, None) => {
1813 return Ok((
1814 left.alias(TableReference::bare("left"))?,
1815 right.alias(TableReference::bare("right"))?,
1816 true,
1817 ));
1818 }
1819 (Some(_), None) | (None, Some(_)) => {
1821 return Ok((
1822 left.alias(TableReference::bare("left"))?,
1823 right.alias(TableReference::bare("right"))?,
1824 true,
1825 ));
1826 }
1827 _ => {}
1829 }
1830 }
1831 }
1832
1833 Ok((left, right, false))
1835}
1836pub fn add_group_by_exprs_from_dependencies(
1846 mut group_expr: Vec<Expr>,
1847 schema: &DFSchemaRef,
1848) -> Result<Vec<Expr>> {
1849 let mut group_by_field_names = group_expr
1852 .iter()
1853 .map(|e| e.schema_name().to_string())
1854 .collect::<Vec<_>>();
1855
1856 if let Some(target_indices) =
1857 get_target_functional_dependencies(schema, &group_by_field_names)
1858 {
1859 for idx in target_indices {
1860 let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1861 let expr_name = expr.schema_name().to_string();
1862 if !group_by_field_names.contains(&expr_name) {
1863 group_by_field_names.push(expr_name);
1864 group_expr.push(expr);
1865 }
1866 }
1867 }
1868 Ok(group_expr)
1869}
1870
1871pub fn validate_unique_names<'a>(
1873 node_name: &str,
1874 expressions: impl IntoIterator<Item = &'a Expr>,
1875) -> Result<()> {
1876 let mut unique_names = HashMap::new();
1877
1878 expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1879 let name = expr.schema_name().to_string();
1880 match unique_names.get(&name) {
1881 None => {
1882 unique_names.insert(name, (position, expr));
1883 Ok(())
1884 },
1885 Some((existing_position, existing_expr)) => {
1886 plan_err!("{node_name} require unique expression names \
1887 but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1888 at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1889 )
1890 }
1891 }
1892 })
1893}
1894
1895pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1907 Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1908 Arc::new(left_plan),
1909 Arc::new(right_plan),
1910 ])?))
1911}
1912
1913pub fn union_by_name(
1916 left_plan: LogicalPlan,
1917 right_plan: LogicalPlan,
1918) -> Result<LogicalPlan> {
1919 Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1920 Arc::new(left_plan),
1921 Arc::new(right_plan),
1922 ])?))
1923}
1924
1925pub fn project(
1931 plan: LogicalPlan,
1932 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1933) -> Result<LogicalPlan> {
1934 project_with_validation(plan, expr.into_iter().map(|e| (e, true)), None)
1935}
1936
1937fn project_with_validation(
1945 plan: LogicalPlan,
1946 expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1947 schema: Option<&DFSchemaRef>,
1948) -> Result<LogicalPlan> {
1949 let mut projected_expr = vec![];
1950 let mut has_wildcard = false;
1951 for (e, validate) in expr {
1952 let e = e.into();
1953 match e {
1954 SelectExpr::Wildcard(opt) => {
1955 has_wildcard = true;
1956 let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1957
1958 let expanded = if let Some(replace) = opt.replace {
1961 replace_columns(expanded, &replace)?
1962 } else {
1963 expanded
1964 };
1965
1966 for e in expanded {
1967 if validate {
1968 projected_expr
1969 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1970 } else {
1971 projected_expr.push(e)
1972 }
1973 }
1974 }
1975 SelectExpr::QualifiedWildcard(table_ref, opt) => {
1976 has_wildcard = true;
1977 let expanded =
1978 expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1979
1980 let expanded = if let Some(replace) = opt.replace {
1983 replace_columns(expanded, &replace)?
1984 } else {
1985 expanded
1986 };
1987
1988 for e in expanded {
1989 if validate {
1990 projected_expr
1991 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1992 } else {
1993 projected_expr.push(e)
1994 }
1995 }
1996 }
1997 SelectExpr::Expression(e) => {
1998 if validate {
1999 projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
2000 } else {
2001 projected_expr.push(e)
2002 }
2003 }
2004 }
2005 }
2006
2007 if has_wildcard && projected_expr.is_empty() && !plan.schema().fields().is_empty() {
2008 return plan_err!(
2009 "SELECT list is empty after resolving * expressions, \
2010 the wildcard expanded to zero columns"
2011 );
2012 }
2013
2014 if let Some(schema) = &schema {
2017 for (expr, field) in projected_expr.iter_mut().zip(schema.fields()) {
2018 if !matches!(expr, Expr::Column(_) | Expr::Alias(_)) {
2019 *expr = std::mem::take(expr).alias(field.name());
2020 }
2021 }
2022 }
2023
2024 validate_unique_names("Projections", projected_expr.iter())?;
2025
2026 Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
2027}
2028
2029fn replace_columns(
2034 mut exprs: Vec<Expr>,
2035 replace: &PlannedReplaceSelectItem,
2036) -> Result<Vec<Expr>> {
2037 for expr in exprs.iter_mut() {
2038 if let Expr::Column(Column { name, .. }) = expr
2039 && let Some((_, new_expr)) = replace
2040 .items()
2041 .iter()
2042 .zip(replace.expressions().iter())
2043 .find(|(item, _)| item.column_name.value == *name)
2044 {
2045 *expr = new_expr.clone().alias(name.clone())
2046 }
2047 }
2048 Ok(exprs)
2049}
2050
2051pub fn subquery_alias(
2053 plan: LogicalPlan,
2054 alias: impl Into<TableReference>,
2055) -> Result<LogicalPlan> {
2056 SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
2057}
2058
2059pub fn table_scan(
2062 name: Option<impl Into<TableReference>>,
2063 table_schema: &Schema,
2064 projection: Option<Vec<usize>>,
2065) -> Result<LogicalPlanBuilder> {
2066 table_scan_with_filters(name, table_schema, projection, vec![])
2067}
2068
2069pub fn table_scan_with_filters(
2073 name: Option<impl Into<TableReference>>,
2074 table_schema: &Schema,
2075 projection: Option<Vec<usize>>,
2076 filters: Vec<Expr>,
2077) -> Result<LogicalPlanBuilder> {
2078 let table_source = table_source(table_schema);
2079 let name = name
2080 .map(|n| n.into())
2081 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2082 LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
2083}
2084
2085pub fn table_scan_with_filter_and_fetch(
2089 name: Option<impl Into<TableReference>>,
2090 table_schema: &Schema,
2091 projection: Option<Vec<usize>>,
2092 filters: Vec<Expr>,
2093 fetch: Option<usize>,
2094) -> Result<LogicalPlanBuilder> {
2095 let table_source = table_source(table_schema);
2096 let name = name
2097 .map(|n| n.into())
2098 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2099 LogicalPlanBuilder::scan_with_filters_fetch(
2100 name,
2101 table_source,
2102 projection,
2103 filters,
2104 fetch,
2105 )
2106}
2107
2108pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
2109 let table_schema = Arc::new(table_schema.clone());
2111 Arc::new(LogicalTableSource {
2112 table_schema,
2113 constraints: Default::default(),
2114 })
2115}
2116
2117pub fn table_source_with_constraints(
2118 table_schema: &Schema,
2119 constraints: Constraints,
2120) -> Arc<dyn TableSource> {
2121 let table_schema = Arc::new(table_schema.clone());
2123 Arc::new(LogicalTableSource {
2124 table_schema,
2125 constraints,
2126 })
2127}
2128
2129pub fn wrap_projection_for_join_if_necessary(
2131 join_keys: &[Expr],
2132 input: LogicalPlan,
2133) -> Result<(LogicalPlan, Vec<Column>, bool)> {
2134 let input_schema = input.schema();
2135 let alias_join_keys: Vec<Expr> = join_keys
2136 .iter()
2137 .map(|key| {
2138 if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2147 let alias = format!("{key}");
2148 key.clone().alias(alias)
2149 } else {
2150 key.clone()
2151 }
2152 })
2153 .collect::<Vec<_>>();
2154
2155 let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2156 let plan = if need_project {
2157 let mut projection = input_schema
2159 .columns()
2160 .into_iter()
2161 .map(Expr::Column)
2162 .collect::<Vec<_>>();
2163 #[allow(clippy::allow_attributes, clippy::mutable_key_type)]
2164 let join_key_items = alias_join_keys
2166 .iter()
2167 .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2168 .cloned()
2169 .collect::<HashSet<Expr>>();
2170 projection.extend(join_key_items);
2171
2172 LogicalPlanBuilder::from(input)
2173 .project(projection.into_iter().map(SelectExpr::from))?
2174 .build()?
2175 } else {
2176 input
2177 };
2178
2179 let join_on = alias_join_keys
2180 .into_iter()
2181 .map(|key| {
2182 if let Some(col) = key.try_as_col() {
2183 Ok(col.clone())
2184 } else {
2185 let name = key.schema_name().to_string();
2186 Ok(Column::from_name(name))
2187 }
2188 })
2189 .collect::<Result<Vec<_>>>()?;
2190
2191 Ok((plan, join_on, need_project))
2192}
2193
2194pub struct LogicalTableSource {
2198 table_schema: SchemaRef,
2199 constraints: Constraints,
2200}
2201
2202impl LogicalTableSource {
2203 pub fn new(table_schema: SchemaRef) -> Self {
2205 Self {
2206 table_schema,
2207 constraints: Constraints::default(),
2208 }
2209 }
2210
2211 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2212 self.constraints = constraints;
2213 self
2214 }
2215}
2216
2217impl TableSource for LogicalTableSource {
2218 fn schema(&self) -> SchemaRef {
2219 Arc::clone(&self.table_schema)
2220 }
2221
2222 fn constraints(&self) -> Option<&Constraints> {
2223 Some(&self.constraints)
2224 }
2225
2226 fn supports_filters_pushdown(
2227 &self,
2228 filters: &[&Expr],
2229 ) -> Result<Vec<TableProviderFilterPushDown>> {
2230 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2231 }
2232}
2233
2234pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2236 unnest_with_options(input, columns, UnnestOptions::default())
2237}
2238
2239pub fn get_struct_unnested_columns(
2240 col_name: &String,
2241 inner_fields: &Fields,
2242) -> Vec<Column> {
2243 inner_fields
2244 .iter()
2245 .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2246 .collect()
2247}
2248
2249pub fn unnest_with_options(
2279 input: LogicalPlan,
2280 columns_to_unnest: Vec<Column>,
2281 options: UnnestOptions,
2282) -> Result<LogicalPlan> {
2283 Ok(LogicalPlan::Unnest(Unnest::try_new(
2284 Arc::new(input),
2285 columns_to_unnest,
2286 options,
2287 )?))
2288}
2289
2290#[cfg(test)]
2291mod tests {
2292 use std::vec;
2293
2294 use super::*;
2295 use crate::lit_with_metadata;
2296 use crate::logical_plan::StringifiedPlan;
2297 use crate::{col, expr, expr_fn::exists, in_subquery, scalar_subquery};
2298
2299 use crate::test::function_stub::sum;
2300 use datafusion_common::{
2301 Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
2302 };
2303 use insta::assert_snapshot;
2304
/// Scan -> filter -> project renders as a three-level plan tree.
#[test]
fn plan_builder_simple() -> Result<()> {
    let plan =
        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
            .filter(col("state").eq(lit("CO")))?
            .project(vec![col("id")])?
            .build()?;

    // Each child is nested two spaces below its parent in the plan display.
    assert_snapshot!(plan, @r#"
    Projection: employee_csv.id
      Filter: employee_csv.state = Utf8("CO")
        TableScan: employee_csv projection=[id, state]
    "#);

    Ok(())
}
2321
/// The scan schema qualifies every field with the table name; per the
/// expected snapshots, an upper-case table name yields the same lower-case
/// qualifiers.
#[test]
fn plan_builder_schema() {
    let schema = employee_schema();

    let lower =
        LogicalPlanBuilder::scan("employee_csv", table_source(&schema), None).unwrap();
    assert_snapshot!(lower.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");

    let upper =
        LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), None).unwrap();
    assert_snapshot!(upper.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
}
2339
/// Scanning with an empty table name is rejected with a planning error.
#[test]
fn plan_builder_empty_name() {
    let schema = employee_schema();
    let err = LogicalPlanBuilder::scan("", table_source(&schema), None).unwrap_err();
    assert_snapshot!(
        err.strip_backtrace(),
        @"Error during planning: table_name cannot be empty"
    );
}
2351
/// Sorting by two keys with different directions / null orderings.
#[test]
fn plan_builder_sort() -> Result<()> {
    let plan =
        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
            .sort(vec![
                expr::Sort::new(col("state"), true, true),
                expr::Sort::new(col("salary"), false, false),
            ])?
            .build()?;

    // The scan is nested two spaces below the sort in the plan display.
    assert_snapshot!(plan, @r"
    Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
      TableScan: employee_csv projection=[state, salary]
    ");

    Ok(())
}
2369
/// Chaining three `union` calls produces left-nested `Union` nodes.
#[test]
fn plan_builder_union() -> Result<()> {
    let plan =
        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

    let plan = plan
        .clone()
        .union(plan.clone().build()?)?
        .union(plan.clone().build()?)?
        .union(plan.build()?)?
        .build()?;

    // Each child is nested two spaces below its parent in the plan display.
    assert_snapshot!(plan, @r"
    Union
      Union
        Union
          TableScan: employee_csv projection=[state, salary]
          TableScan: employee_csv projection=[state, salary]
        TableScan: employee_csv projection=[state, salary]
      TableScan: employee_csv projection=[state, salary]
    ");

    Ok(())
}
2394
/// Chaining three `union_distinct` calls wraps every `Union` in a
/// `Distinct`.
#[test]
fn plan_builder_union_distinct() -> Result<()> {
    let plan =
        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

    let plan = plan
        .clone()
        .union_distinct(plan.clone().build()?)?
        .union_distinct(plan.clone().build()?)?
        .union_distinct(plan.build()?)?
        .build()?;

    // Each child is nested two spaces below its parent in the plan display.
    assert_snapshot!(plan, @r"
    Distinct:
      Union
        Distinct:
          Union
            Distinct:
              Union
                TableScan: employee_csv projection=[state, salary]
                TableScan: employee_csv projection=[state, salary]
            TableScan: employee_csv projection=[state, salary]
        TableScan: employee_csv projection=[state, salary]
    ");

    Ok(())
}
2422
/// `distinct` adds a `Distinct` node on top of the projected plan.
#[test]
fn plan_builder_simple_distinct() -> Result<()> {
    let plan =
        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
            .filter(col("state").eq(lit("CO")))?
            .project(vec![col("id")])?
            .distinct()?
            .build()?;

    // Each child is nested two spaces below its parent in the plan display.
    assert_snapshot!(plan, @r#"
    Distinct:
      Projection: employee_csv.id
        Filter: employee_csv.state = Utf8("CO")
          TableScan: employee_csv projection=[id, state]
    "#);

    Ok(())
}
2441
/// An `EXISTS` subquery referencing the outer table is rendered beneath
/// the filter that uses it.
#[test]
fn exists_subquery() -> Result<()> {
    let foo = test_table_scan_with_name("foo")?;
    let bar = test_table_scan_with_name("bar")?;

    let subquery = LogicalPlanBuilder::from(foo)
        .project(vec![col("a")])?
        .filter(col("a").eq(col("bar.a")))?
        .build()?;

    let outer_query = LogicalPlanBuilder::from(bar)
        .project(vec![col("a")])?
        .filter(exists(Arc::new(subquery)))?
        .build()?;

    // Each child is nested two spaces below its parent in the plan display.
    assert_snapshot!(outer_query, @r"
    Filter: EXISTS (<subquery>)
      Subquery:
        Filter: foo.a = bar.a
          Projection: foo.a
            TableScan: foo
      Projection: bar.a
        TableScan: bar
    ");

    Ok(())
}
2469
/// An `IN (<subquery>)` predicate is rendered with the subquery beneath
/// the filter that uses it.
#[test]
fn filter_in_subquery() -> Result<()> {
    let foo = test_table_scan_with_name("foo")?;
    let bar = test_table_scan_with_name("bar")?;

    let subquery = LogicalPlanBuilder::from(foo)
        .project(vec![col("a")])?
        .filter(col("a").eq(col("bar.a")))?
        .build()?;

    let outer_query = LogicalPlanBuilder::from(bar)
        .project(vec![col("a")])?
        .filter(in_subquery(col("a"), Arc::new(subquery)))?
        .build()?;

    // Each child is nested two spaces below its parent in the plan display.
    assert_snapshot!(outer_query, @r"
    Filter: bar.a IN (<subquery>)
      Subquery:
        Filter: foo.a = bar.a
          Projection: foo.a
            TableScan: foo
      Projection: bar.a
        TableScan: bar
    ");

    Ok(())
}
2498
/// A scalar subquery used as a projection item is rendered beneath the
/// projection.
#[test]
fn select_scalar_subquery() -> Result<()> {
    let foo = test_table_scan_with_name("foo")?;
    let bar = test_table_scan_with_name("bar")?;

    let subquery = LogicalPlanBuilder::from(foo)
        .project(vec![col("b")])?
        .filter(col("a").eq(col("bar.a")))?
        .build()?;

    let outer_query = LogicalPlanBuilder::from(bar)
        .project(vec![scalar_subquery(Arc::new(subquery))])?
        .build()?;

    // Each child is nested two spaces below its parent in the plan display.
    assert_snapshot!(outer_query, @r"
    Projection: (<subquery>)
      Subquery:
        Filter: foo.a = bar.a
          Projection: foo.b
            TableScan: foo
      TableScan: bar
    ");

    Ok(())
}
2525
/// Projecting two expressions that resolve to the same name must fail with
/// `SchemaError::AmbiguousReference` on `employee_csv.id`.
#[test]
fn projection_non_unique_names() -> Result<()> {
    let plan = table_scan(
        Some("employee_csv"),
        &employee_schema(),
        Some(vec![0, 1]),
    )?
    .project(vec![col("id"), col("first_name").alias("id")]);

    let Err(DataFusionError::SchemaError(err, _)) = plan else {
        return plan_err!("Plan should have returned an DataFusionError::SchemaError");
    };
    let SchemaError::AmbiguousReference { field } = *err else {
        return plan_err!("Plan should have returned an DataFusionError::SchemaError");
    };

    let Column {
        relation,
        name,
        spans: _,
    } = *field;
    let Some(TableReference::Bare { table }) = relation else {
        return plan_err!("wrong relation: {relation:?}, expected table name");
    };
    assert_eq!(*"employee_csv", *table);
    assert_eq!("id", &name);
    Ok(())
}
2560
2561 fn employee_schema() -> Schema {
2562 Schema::new(vec![
2563 Field::new("id", DataType::Int32, false),
2564 Field::new("first_name", DataType::Utf8, false),
2565 Field::new("last_name", DataType::Utf8, false),
2566 Field::new("state", DataType::Utf8, false),
2567 Field::new("salary", DataType::Int32, false),
2568 ])
2569 }
2570
/// Every plan type is displayed when `should_display` is called with
/// `true`; with `false` only the final logical and final physical plans
/// are.
#[test]
fn stringified_plan() {
    // (plan type, expected result of `should_display(false)`)
    let cases = [
        (PlanType::InitialLogicalPlan, false),
        (PlanType::FinalLogicalPlan, true),
        (PlanType::InitialPhysicalPlan, false),
        (PlanType::FinalPhysicalPlan, true),
        (
            PlanType::OptimizedLogicalPlan {
                optimizer_name: "random opt pass".into(),
            },
            false,
        ),
    ];

    for (plan_type, shown_for_false) in cases {
        let stringified_plan = StringifiedPlan::new(plan_type, "...the plan...");
        assert!(stringified_plan.should_display(true));
        assert_eq!(stringified_plan.should_display(false), shown_for_false);
    }
}
2602
2603 fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2604 let schema = Schema::new(vec![
2605 Field::new("a", DataType::UInt32, false),
2606 Field::new("b", DataType::UInt32, false),
2607 Field::new("c", DataType::UInt32, false),
2608 ]);
2609 table_scan(Some(name), &schema, None)?.build()
2610 }
2611
/// `intersect` over inputs with a different number of columns is rejected.
#[test]
fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
    let schema = employee_schema();
    let one_column = table_scan(TableReference::none(), &schema, Some(vec![3]))?;
    let two_columns = table_scan(TableReference::none(), &schema, Some(vec![3, 4]))?;

    let err =
        LogicalPlanBuilder::intersect(one_column.build()?, two_columns.build()?, true)
            .unwrap_err();

    assert_snapshot!(err.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");

    Ok(())
}
2627
/// Exercises the unnest builder methods on list, struct and nested-list
/// columns, including the failure cases.
#[test]
fn plan_builder_unnest() -> Result<()> {
    // Unnesting a scalar (UInt32) column is an internal error.
    let err = nested_table_scan("test_table")?
        .unnest_column("scalar")
        .unwrap_err();

    let DataFusionError::Internal(desc) = err else {
        return plan_err!("Plan should have returned an DataFusionError::Internal");
    };

    // Keep only the message part that precedes any backtrace.
    let desc = (*desc
        .split(DataFusionError::BACK_TRACE_SEP)
        .collect::<Vec<&str>>()
        .first()
        .unwrap_or(&""))
    .to_string();

    assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");

    // Unnesting the list of strings.
    let plan = nested_table_scan("test_table")?
        .unnest_column("strings")?
        .build()?;

    assert_snapshot!(plan, @r"
    Unnest: lists[test_table.strings|depth=1] structs[]
      TableScan: test_table
    ");

    // The unnested field has the list's element type.
    let field = plan.schema().field_with_name(None, "strings").unwrap();
    assert_eq!(&DataType::Utf8, field.data_type());

    // Unnesting the single struct column.
    let plan = nested_table_scan("test_table")?
        .unnest_column("struct_singular")?
        .build()?;

    assert_snapshot!(plan, @r"
    Unnest: lists[] structs[test_table.struct_singular]
      TableScan: test_table
    ");

    for field_name in &["a", "b"] {
        // Each struct member becomes its own `struct_singular.<member>` field.
        let field = plan
            .schema()
            .field_with_name(None, &format!("struct_singular.{field_name}"))
            .unwrap();
        assert_eq!(&DataType::UInt32, field.data_type());
    }

    // Unnesting several columns through separate, stacked Unnest nodes.
    let plan = nested_table_scan("test_table")?
        .unnest_column("strings")?
        .unnest_column("structs")?
        .unnest_column("struct_singular")?
        .build()?;

    assert_snapshot!(plan, @r"
    Unnest: lists[] structs[test_table.struct_singular]
      Unnest: lists[test_table.structs|depth=1] structs[]
        Unnest: lists[test_table.strings|depth=1] structs[]
          TableScan: test_table
    ");

    // After unnesting the list of structs, `structs` is a struct column.
    let field = plan.schema().field_with_name(None, "structs").unwrap();
    assert!(matches!(field.data_type(), DataType::Struct(_)));

    // Unnesting several columns in a single Unnest node.
    let cols = vec!["strings", "structs", "struct_singular"]
        .into_iter()
        .map(|c| c.into())
        .collect();

    let plan = nested_table_scan("test_table")?
        .unnest_columns_with_options(cols, UnnestOptions::default())?
        .build()?;

    assert_snapshot!(plan, @r"
    Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
      TableScan: test_table
    ");

    // Unnesting a column that does not exist fails.
    let plan = nested_table_scan("test_table")?.unnest_column("missing");
    assert!(plan.is_err());

    // Unnesting one nested list at two different depths together with a
    // struct column.
    let plan = nested_table_scan("test_table")?
        .unnest_columns_with_options(
            vec!["stringss".into(), "struct_singular".into()],
            UnnestOptions::default()
                .with_recursions(RecursionUnnestOption {
                    input_column: "stringss".into(),
                    output_column: "stringss_depth_1".into(),
                    depth: 1,
                })
                .with_recursions(RecursionUnnestOption {
                    input_column: "stringss".into(),
                    output_column: "stringss_depth_2".into(),
                    depth: 2,
                }),
        )?
        .build()?;

    assert_snapshot!(plan, @r"
    Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
      TableScan: test_table
    ");

    // Depth 1 peels one list level, depth 2 reaches the string elements.
    let field = plan
        .schema()
        .field_with_name(None, "stringss_depth_1")
        .unwrap();
    assert_eq!(
        &DataType::new_list(DataType::Utf8, false),
        field.data_type()
    );
    let field = plan
        .schema()
        .field_with_name(None, "stringss_depth_2")
        .unwrap();
    assert_eq!(&DataType::Utf8, field.data_type());
    for field_name in &["a", "b"] {
        let field = plan
            .schema()
            .field_with_name(None, &format!("struct_singular.{field_name}"))
            .unwrap();
        assert_eq!(&DataType::UInt32, field.data_type());
    }

    Ok(())
}
2766
2767 fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2768 let struct_field_in_list = Field::new_struct(
2771 "item",
2772 vec![
2773 Field::new("a", DataType::UInt32, false),
2774 Field::new("b", DataType::UInt32, false),
2775 ],
2776 false,
2777 );
2778 let string_field = Field::new_list_field(DataType::Utf8, false);
2779 let strings_field = Field::new_list("item", string_field.clone(), false);
2780 let schema = Schema::new(vec![
2781 Field::new("scalar", DataType::UInt32, false),
2782 Field::new_list("strings", string_field, false),
2783 Field::new_list("structs", struct_field_in_list, false),
2784 Field::new(
2785 "struct_singular",
2786 DataType::Struct(Fields::from(vec![
2787 Field::new("a", DataType::UInt32, false),
2788 Field::new("b", DataType::UInt32, false),
2789 ])),
2790 false,
2791 ),
2792 Field::new_list("stringss", strings_field, false),
2793 ]);
2794
2795 table_scan(Some(table_name), &schema, None)
2796 }
2797
/// A cross join of two aliased `VALUES` inputs can be unioned with itself.
#[test]
fn test_union_after_join() -> Result<()> {
    let values = vec![vec![lit(1)]];

    let left = LogicalPlanBuilder::values(values.clone())?
        .alias("left")?
        .build()?;
    let right = LogicalPlanBuilder::values(values)?
        .alias("right")?
        .build()?;

    let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;

    let plan = LogicalPlanBuilder::from(join.clone())
        .union(join)?
        .build()?;

    // Each child is nested two spaces below its parent in the plan display.
    assert_snapshot!(plan, @r"
    Union
      Cross Join:
        SubqueryAlias: left
          Values: (Int32(1))
        SubqueryAlias: right
          Values: (Int32(1))
      Cross Join:
        SubqueryAlias: left
          Values: (Int32(1))
        SubqueryAlias: right
          Values: (Int32(1))
    ");

    Ok(())
}
2831
/// Converting an `Arc<LogicalPlan>` into a builder keeps the plan intact
/// (compared through its display form).
#[test]
fn plan_builder_from_logical_plan() -> Result<()> {
    let sorted =
        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
            .sort(vec![
                expr::Sort::new(col("state"), true, true),
                expr::Sort::new(col("salary"), false, false),
            ])?
            .build()?;

    let expected = sorted.to_string();
    let builder = LogicalPlanBuilder::from(Arc::new(sorted));
    assert_eq!(expected, builder.plan.to_string());

    Ok(())
}
2848
/// With default builder options, aggregating over a table that has a
/// primary-key constraint on column index 0 groups only by the expression
/// that was explicitly requested.
#[test]
fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
    let primary_key = Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
    let source = table_source_with_constraints(&employee_schema(), primary_key);

    let scan = LogicalPlanBuilder::scan("employee_csv", source, Some(vec![0, 3, 4]))?;
    let aggregated = scan
        .aggregate(vec![col("id")], vec![sum(col("salary"))])?
        .build()?;

    assert_snapshot!(aggregated, @r"
    Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
      TableScan: employee_csv projection=[id, state, salary]
    ");

    Ok(())
}
2867
/// With `add_implicit_group_by_exprs` enabled, aggregating over a table that
/// has a primary-key constraint on column index 0 yields a group-by list that
/// also contains the other projected columns (`state` and `salary`).
#[test]
fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
    let primary_key = Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
    let source = table_source_with_constraints(&employee_schema(), primary_key);

    let implicit_group_by =
        LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);

    let scan = LogicalPlanBuilder::scan("employee_csv", source, Some(vec![0, 3, 4]))?;
    let aggregated = scan
        .with_options(implicit_group_by)
        .aggregate(vec![col("id")], vec![sum(col("salary"))])?
        .build()?;

    assert_snapshot!(aggregated, @r"
    Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
      TableScan: employee_csv projection=[id, state, salary]
    ");

    Ok(())
}
2889
/// The schema built for a left join carries the left input's schema metadata,
/// and the schema built for a right join carries the right input's.
#[test]
fn test_join_metadata() -> Result<()> {
    // Schema-level metadata with a single `key` entry naming the side.
    let tagged = |side: &str| HashMap::from([("key".to_string(), side.to_string())]);
    // One unqualified, non-nullable Int32 column plus the tagged metadata.
    let single_column = |name: &str, side: &str| {
        DFSchema::new_with_metadata(
            vec![(None, Arc::new(Field::new(name, DataType::Int32, false)))],
            tagged(side),
        )
    };

    let left_schema = single_column("a", "left")?;
    let right_schema = single_column("b", "right")?;

    for (join_type, winning_side) in [(JoinType::Left, "left"), (JoinType::Right, "right")]
    {
        let joined = build_join_schema(&left_schema, &right_schema, &join_type)?;
        assert_eq!(joined.metadata(), &tagged(winning_side));
    }

    Ok(())
}
2916
/// `VALUES` rows whose literals share the same field metadata produce a
/// column carrying that metadata; rows with differing metadata are rejected.
#[test]
fn test_values_metadata() -> Result<()> {
    // Field metadata holding a single extension-metadata entry.
    let extension_metadata = |value: &str| {
        FieldMetadata::from(HashMap::from([(
            "ARROW:extension:metadata".to_string(),
            value.to_string(),
        )]))
    };

    let metadata = extension_metadata("test");
    let uniform_rows = vec![
        vec![lit_with_metadata(1, Some(metadata.clone()))],
        vec![lit_with_metadata(2, Some(metadata.clone()))],
    ];
    let values = LogicalPlanBuilder::values(uniform_rows)?.build()?;
    assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());

    // Same key, different value: the two rows no longer agree.
    let metadata2 = extension_metadata("test2");
    let mixed_rows = vec![
        vec![lit_with_metadata(1, Some(metadata.clone()))],
        vec![lit_with_metadata(2, Some(metadata2.clone()))],
    ];
    let mixed = LogicalPlanBuilder::values(mixed_rows);
    assert!(mixed.is_err());

    Ok(())
}
2947
/// For the field names `[a, a, b, b, a, a:1]`, `unique_field_aliases` is
/// expected to return `[None, a:1, None, b:1, a:2, a:1:1]`: first occurrences
/// get no alias and later duplicates get a `:<n>` suffixed alias.
#[test]
fn test_unique_field_aliases() {
    let int_field = |name: &str| Field::new(name, DataType::Int32, false);
    let fields = Fields::from(
        ["a", "a", "b", "b", "a", "a:1"]
            .into_iter()
            .map(int_field)
            .collect::<Vec<_>>(),
    );

    let aliases = unique_field_aliases(&fields);

    let expected: Vec<Option<String>> =
        [None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]
            .into_iter()
            .map(|alias| alias.map(String::from))
            .collect();

    assert_eq!(aliases, expected);
}
2982}