1use std::any::Any;
21use std::cmp::Ordering;
22use std::collections::{HashMap, HashSet};
23use std::iter::once;
24use std::sync::Arc;
25
26use crate::dml::CopyTo;
27use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
28use crate::expr_rewriter::{
29 coerce_plan_expr_for_schema, normalize_col,
30 normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
31 rewrite_sort_cols_by_aggs,
32};
33use crate::logical_plan::{
34 Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
35 JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
36 Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37 Window,
38};
39use crate::select_expr::SelectExpr;
40use crate::utils::{
41 can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
42 expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
43 group_window_expr_by_sort_keys,
44};
45use crate::{
46 and, binary_expr, lit, DmlStatement, ExplainOption, Expr, ExprSchemable, Operator,
47 RecursiveQuery, Statement, TableProviderFilterPushDown, TableSource, WriteOp,
48};
49
50use super::dml::InsertOp;
51use arrow::compute::can_cast_types;
52use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
53use datafusion_common::display::ToStringifiedPlan;
54use datafusion_common::file_options::file_type::FileType;
55use datafusion_common::{
56 exec_err, get_target_functional_dependencies, not_impl_err, plan_datafusion_err,
57 plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, NullEquality,
58 Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
59};
60use datafusion_expr_common::type_coercion::binary::type_union_resolution;
61
62use indexmap::IndexSet;
63
/// Placeholder table name, the literal string `?table?`.
// NOTE(review): presumably used for plans whose table has no user-supplied
// name; the call sites are not visible in this chunk — confirm there.
pub const UNNAMED_TABLE: &str = "?table?";
66
/// Knobs that change how [`LogicalPlanBuilder`] constructs certain plan nodes.
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// When `true`, `LogicalPlanBuilder::aggregate` passes its GROUP BY list
    /// through `add_group_by_exprs_from_dependencies` using the input schema.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    /// Options with every setting at its default value (all flags `false`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Return these options with `add_implicit_group_by_exprs` set to `add`.
    pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
        self.add_implicit_group_by_exprs = add;
        self
    }
}
86
/// Incrementally builds a [`LogicalPlan`].
///
/// Most methods consume the builder and return a new one whose plan is a
/// new node placed on top of the current plan; call `build` to obtain the
/// finished plan.
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    /// The plan built so far. Kept behind an `Arc` so it can be moved directly
    /// into the `Arc<LogicalPlan>` inputs of new nodes without a deep copy.
    plan: Arc<LogicalPlan>,
    /// Options consulted by some builder methods (currently `aggregate`).
    options: LogicalPlanBuilderOptions,
}
129
130impl LogicalPlanBuilder {
131 pub fn new(plan: LogicalPlan) -> Self {
133 Self {
134 plan: Arc::new(plan),
135 options: LogicalPlanBuilderOptions::default(),
136 }
137 }
138
139 pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
141 Self {
142 plan,
143 options: LogicalPlanBuilderOptions::default(),
144 }
145 }
146
147 pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
148 self.options = options;
149 self
150 }
151
152 pub fn schema(&self) -> &DFSchemaRef {
154 self.plan.schema()
155 }
156
157 pub fn plan(&self) -> &LogicalPlan {
159 &self.plan
160 }
161
162 pub fn empty(produce_one_row: bool) -> Self {
166 Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
167 produce_one_row,
168 schema: DFSchemaRef::new(DFSchema::empty()),
169 }))
170 }
171
172 pub fn to_recursive_query(
175 self,
176 name: String,
177 recursive_term: LogicalPlan,
178 is_distinct: bool,
179 ) -> Result<Self> {
180 if is_distinct {
182 return not_impl_err!(
183 "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported"
184 );
185 }
186 let static_fields_len = self.plan.schema().fields().len();
188 let recursive_fields_len = recursive_term.schema().fields().len();
189 if static_fields_len != recursive_fields_len {
190 return plan_err!(
191 "Non-recursive term and recursive term must have the same number of columns ({} != {})",
192 static_fields_len, recursive_fields_len
193 );
194 }
195 let coerced_recursive_term =
197 coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
198 Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
199 name,
200 static_term: self.plan,
201 recursive_term: Arc::new(coerced_recursive_term),
202 is_distinct,
203 })))
204 }
205
206 pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
214 if values.is_empty() {
215 return plan_err!("Values list cannot be empty");
216 }
217 let n_cols = values[0].len();
218 if n_cols == 0 {
219 return plan_err!("Values list cannot be zero length");
220 }
221 for (i, row) in values.iter().enumerate() {
222 if row.len() != n_cols {
223 return plan_err!(
224 "Inconsistent data length across values list: got {} values in row {} but expected {}",
225 row.len(),
226 i,
227 n_cols
228 );
229 }
230 }
231
232 Self::infer_data(values)
234 }
235
236 pub fn values_with_schema(
246 values: Vec<Vec<Expr>>,
247 schema: &DFSchemaRef,
248 ) -> Result<Self> {
249 if values.is_empty() {
250 return plan_err!("Values list cannot be empty");
251 }
252 let n_cols = schema.fields().len();
253 if n_cols == 0 {
254 return plan_err!("Values list cannot be zero length");
255 }
256 for (i, row) in values.iter().enumerate() {
257 if row.len() != n_cols {
258 return plan_err!(
259 "Inconsistent data length across values list: got {} values in row {} but expected {}",
260 row.len(),
261 i,
262 n_cols
263 );
264 }
265 }
266
267 Self::infer_values_from_schema(values, schema)
269 }
270
271 fn infer_values_from_schema(
272 values: Vec<Vec<Expr>>,
273 schema: &DFSchema,
274 ) -> Result<Self> {
275 let n_cols = values[0].len();
276 let mut fields = ValuesFields::new();
277 for j in 0..n_cols {
278 let field_type = schema.field(j).data_type();
279 let field_nullable = schema.field(j).is_nullable();
280 for row in values.iter() {
281 let value = &row[j];
282 let data_type = value.get_type(schema)?;
283
284 if !data_type.equals_datatype(field_type) {
285 if can_cast_types(&data_type, field_type) {
286 } else {
287 return exec_err!(
288 "type mismatch and can't cast to got {} and {}",
289 data_type,
290 field_type
291 );
292 }
293 }
294 }
295 fields.push(field_type.to_owned(), field_nullable);
296 }
297
298 Self::infer_inner(values, fields, schema)
299 }
300
301 fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
302 let n_cols = values[0].len();
303 let schema = DFSchema::empty();
304 let mut fields = ValuesFields::new();
305
306 for j in 0..n_cols {
307 let mut common_type: Option<DataType> = None;
308 for (i, row) in values.iter().enumerate() {
309 let value = &row[j];
310 let data_type = value.get_type(&schema)?;
311 if data_type == DataType::Null {
312 continue;
313 }
314
315 if let Some(prev_type) = common_type {
316 let data_types = vec![prev_type.clone(), data_type.clone()];
318 let Some(new_type) = type_union_resolution(&data_types) else {
319 return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
320 };
321 common_type = Some(new_type);
322 } else {
323 common_type = Some(data_type);
324 }
325 }
326 fields.push(common_type.unwrap_or(DataType::Null), true);
329 }
330
331 Self::infer_inner(values, fields, &schema)
332 }
333
334 fn infer_inner(
335 mut values: Vec<Vec<Expr>>,
336 fields: ValuesFields,
337 schema: &DFSchema,
338 ) -> Result<Self> {
339 let fields = fields.into_fields();
340 for row in &mut values {
342 for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
343 if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
344 row[j] = Expr::Literal(
345 ScalarValue::try_from(field_type)?,
346 metadata.clone(),
347 );
348 } else {
349 row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
350 }
351 }
352 }
353
354 let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
355 let schema = DFSchemaRef::new(dfschema);
356
357 Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
358 }
359
360 pub fn scan(
393 table_name: impl Into<TableReference>,
394 table_source: Arc<dyn TableSource>,
395 projection: Option<Vec<usize>>,
396 ) -> Result<Self> {
397 Self::scan_with_filters(table_name, table_source, projection, vec![])
398 }
399
400 pub fn copy_to(
402 input: LogicalPlan,
403 output_url: String,
404 file_type: Arc<dyn FileType>,
405 options: HashMap<String, String>,
406 partition_by: Vec<String>,
407 ) -> Result<Self> {
408 Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
409 Arc::new(input),
410 output_url,
411 partition_by,
412 file_type,
413 options,
414 ))))
415 }
416
417 pub fn insert_into(
452 input: LogicalPlan,
453 table_name: impl Into<TableReference>,
454 target: Arc<dyn TableSource>,
455 insert_op: InsertOp,
456 ) -> Result<Self> {
457 Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
458 table_name.into(),
459 target,
460 WriteOp::Insert(insert_op),
461 Arc::new(input),
462 ))))
463 }
464
465 pub fn scan_with_filters(
467 table_name: impl Into<TableReference>,
468 table_source: Arc<dyn TableSource>,
469 projection: Option<Vec<usize>>,
470 filters: Vec<Expr>,
471 ) -> Result<Self> {
472 Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
473 }
474
475 pub fn scan_with_filters_fetch(
477 table_name: impl Into<TableReference>,
478 table_source: Arc<dyn TableSource>,
479 projection: Option<Vec<usize>>,
480 filters: Vec<Expr>,
481 fetch: Option<usize>,
482 ) -> Result<Self> {
483 Self::scan_with_filters_inner(
484 table_name,
485 table_source,
486 projection,
487 filters,
488 fetch,
489 )
490 }
491
492 fn scan_with_filters_inner(
493 table_name: impl Into<TableReference>,
494 table_source: Arc<dyn TableSource>,
495 projection: Option<Vec<usize>>,
496 filters: Vec<Expr>,
497 fetch: Option<usize>,
498 ) -> Result<Self> {
499 let table_scan =
500 TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
501
502 if table_scan.filters.is_empty() {
504 if let Some(p) = table_scan.source.get_logical_plan() {
505 let sub_plan = p.into_owned();
506
507 if let Some(proj) = table_scan.projection {
508 let projection_exprs = proj
509 .into_iter()
510 .map(|i| {
511 Expr::Column(Column::from(
512 sub_plan.schema().qualified_field(i),
513 ))
514 })
515 .collect::<Vec<_>>();
516 return Self::new(sub_plan)
517 .project(projection_exprs)?
518 .alias(table_scan.table_name);
519 }
520
521 return Self::new(sub_plan).alias(table_scan.table_name);
525 }
526 }
527
528 Ok(Self::new(LogicalPlan::TableScan(table_scan)))
529 }
530
531 pub fn window_plan(
533 input: LogicalPlan,
534 window_exprs: impl IntoIterator<Item = Expr>,
535 ) -> Result<LogicalPlan> {
536 let mut plan = input;
537 let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
538 groups.sort_by(|(key_a, _), (key_b, _)| {
544 for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
545 let key_ordering = compare_sort_expr(first, second, plan.schema());
546 match key_ordering {
547 Ordering::Less => {
548 return Ordering::Less;
549 }
550 Ordering::Greater => {
551 return Ordering::Greater;
552 }
553 Ordering::Equal => {}
554 }
555 }
556 key_b.len().cmp(&key_a.len())
557 });
558 for (_, exprs) in groups {
559 let window_exprs = exprs.into_iter().collect::<Vec<_>>();
560 plan = LogicalPlanBuilder::from(plan)
563 .window(window_exprs)?
564 .build()?;
565 }
566 Ok(plan)
567 }
568
569 pub fn project(
571 self,
572 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
573 ) -> Result<Self> {
574 project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
575 }
576
577 pub fn project_with_validation(
580 self,
581 expr: Vec<(impl Into<SelectExpr>, bool)>,
582 ) -> Result<Self> {
583 project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
584 }
585
586 pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
588 let exprs: Vec<_> = indices
589 .into_iter()
590 .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
591 .collect();
592 self.project(exprs)
593 }
594
595 pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
597 let expr = normalize_col(expr.into(), &self.plan)?;
598 Filter::try_new(expr, self.plan)
599 .map(LogicalPlan::Filter)
600 .map(Self::new)
601 }
602
603 pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
605 let expr = normalize_col(expr.into(), &self.plan)?;
606 Filter::try_new(expr, self.plan)
607 .map(LogicalPlan::Filter)
608 .map(Self::from)
609 }
610
611 pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
613 Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
614 Prepare {
615 name,
616 data_types,
617 input: self.plan,
618 },
619 ))))
620 }
621
622 pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
629 let skip_expr = if skip == 0 {
630 None
631 } else {
632 Some(lit(skip as i64))
633 };
634 let fetch_expr = fetch.map(|f| lit(f as i64));
635 self.limit_by_expr(skip_expr, fetch_expr)
636 }
637
638 pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
642 Ok(Self::new(LogicalPlan::Limit(Limit {
643 skip: skip.map(Box::new),
644 fetch: fetch.map(Box::new),
645 input: self.plan,
646 })))
647 }
648
649 pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
651 subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
652 }
653
654 fn add_missing_columns(
683 curr_plan: LogicalPlan,
684 missing_cols: &IndexSet<Column>,
685 is_distinct: bool,
686 ) -> Result<LogicalPlan> {
687 match curr_plan {
688 LogicalPlan::Projection(Projection {
689 input,
690 mut expr,
691 schema: _,
692 }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
693 let mut missing_exprs = missing_cols
694 .iter()
695 .map(|c| normalize_col(Expr::Column(c.clone()), &input))
696 .collect::<Result<Vec<_>>>()?;
697
698 missing_exprs.retain(|e| !expr.contains(e));
702 if is_distinct {
703 Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
704 }
705 expr.extend(missing_exprs);
706 project(Arc::unwrap_or_clone(input), expr)
707 }
708 _ => {
709 let is_distinct =
710 is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
711 let new_inputs = curr_plan
712 .inputs()
713 .into_iter()
714 .map(|input_plan| {
715 Self::add_missing_columns(
716 (*input_plan).clone(),
717 missing_cols,
718 is_distinct,
719 )
720 })
721 .collect::<Result<Vec<_>>>()?;
722 curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
723 }
724 }
725 }
726
727 fn ambiguous_distinct_check(
728 missing_exprs: &[Expr],
729 missing_cols: &IndexSet<Column>,
730 projection_exprs: &[Expr],
731 ) -> Result<()> {
732 if missing_exprs.is_empty() {
733 return Ok(());
734 }
735
736 let all_aliases = missing_exprs.iter().all(|e| {
744 projection_exprs.iter().any(|proj_expr| {
745 if let Expr::Alias(Alias { expr, .. }) = proj_expr {
746 e == expr.as_ref()
747 } else {
748 false
749 }
750 })
751 });
752 if all_aliases {
753 return Ok(());
754 }
755
756 let missing_col_names = missing_cols
757 .iter()
758 .map(|col| col.flat_name())
759 .collect::<String>();
760
761 plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
762 }
763
764 pub fn sort_by(
766 self,
767 expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
768 ) -> Result<Self> {
769 self.sort(
770 expr.into_iter()
771 .map(|e| e.into().sort(true, false))
772 .collect::<Vec<SortExpr>>(),
773 )
774 }
775
776 pub fn sort(
777 self,
778 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
779 ) -> Result<Self> {
780 self.sort_with_limit(sorts, None)
781 }
782
783 pub fn sort_with_limit(
785 self,
786 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
787 fetch: Option<usize>,
788 ) -> Result<Self> {
789 let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
790
791 let schema = self.plan.schema();
792
793 let mut missing_cols: IndexSet<Column> = IndexSet::new();
795 sorts.iter().try_for_each::<_, Result<()>>(|sort| {
796 let columns = sort.expr.column_refs();
797
798 missing_cols.extend(
799 columns
800 .into_iter()
801 .filter(|c| !schema.has_column(c))
802 .cloned(),
803 );
804
805 Ok(())
806 })?;
807
808 if missing_cols.is_empty() {
809 return Ok(Self::new(LogicalPlan::Sort(Sort {
810 expr: normalize_sorts(sorts, &self.plan)?,
811 input: self.plan,
812 fetch,
813 })));
814 }
815
816 let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
818
819 let is_distinct = false;
820 let plan = Self::add_missing_columns(
821 Arc::unwrap_or_clone(self.plan),
822 &missing_cols,
823 is_distinct,
824 )?;
825
826 let sort_plan = LogicalPlan::Sort(Sort {
827 expr: normalize_sorts(sorts, &plan)?,
828 input: Arc::new(plan),
829 fetch,
830 });
831
832 Projection::try_new(new_expr, Arc::new(sort_plan))
833 .map(LogicalPlan::Projection)
834 .map(Self::new)
835 }
836
837 pub fn union(self, plan: LogicalPlan) -> Result<Self> {
839 union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
840 }
841
842 pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
844 union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
845 }
846
847 pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
849 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
850 let right_plan: LogicalPlan = plan;
851
852 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
853 union_by_name(left_plan, right_plan)?,
854 )))))
855 }
856
857 pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
859 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
860 let right_plan: LogicalPlan = plan;
861
862 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
863 union(left_plan, right_plan)?,
864 )))))
865 }
866
867 pub fn distinct(self) -> Result<Self> {
869 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
870 }
871
872 pub fn distinct_on(
875 self,
876 on_expr: Vec<Expr>,
877 select_expr: Vec<Expr>,
878 sort_expr: Option<Vec<SortExpr>>,
879 ) -> Result<Self> {
880 Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
881 DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
882 ))))
883 }
884
885 pub fn join(
899 self,
900 right: LogicalPlan,
901 join_type: JoinType,
902 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
903 filter: Option<Expr>,
904 ) -> Result<Self> {
905 self.join_detailed(
906 right,
907 join_type,
908 join_keys,
909 filter,
910 NullEquality::NullEqualsNothing,
911 )
912 }
913
914 pub fn join_on(
955 self,
956 right: LogicalPlan,
957 join_type: JoinType,
958 on_exprs: impl IntoIterator<Item = Expr>,
959 ) -> Result<Self> {
960 let filter = on_exprs.into_iter().reduce(Expr::and);
961
962 self.join_detailed(
963 right,
964 join_type,
965 (Vec::<Column>::new(), Vec::<Column>::new()),
966 filter,
967 NullEquality::NullEqualsNothing,
968 )
969 }
970
971 pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
972 if column.relation.is_some() {
973 return Ok(column);
975 }
976
977 let schema = plan.schema();
978 let fallback_schemas = plan.fallback_normalize_schemas();
979 let using_columns = plan.using_columns()?;
980 column.normalize_with_schemas_and_ambiguity_check(
981 &[&[schema], &fallback_schemas],
982 &using_columns,
983 )
984 }
985
986 pub fn join_detailed(
993 self,
994 right: LogicalPlan,
995 join_type: JoinType,
996 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
997 filter: Option<Expr>,
998 null_equality: NullEquality,
999 ) -> Result<Self> {
1000 if join_keys.0.len() != join_keys.1.len() {
1001 return plan_err!("left_keys and right_keys were not the same length");
1002 }
1003
1004 let filter = if let Some(expr) = filter {
1005 let filter = normalize_col_with_schemas_and_ambiguity_check(
1006 expr,
1007 &[&[self.schema(), right.schema()]],
1008 &[],
1009 )?;
1010 Some(filter)
1011 } else {
1012 None
1013 };
1014
1015 let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
1016 join_keys
1017 .0
1018 .into_iter()
1019 .zip(join_keys.1)
1020 .map(|(l, r)| {
1021 let l = l.into();
1022 let r = r.into();
1023
1024 match (&l.relation, &r.relation) {
1025 (Some(lr), Some(rr)) => {
1026 let l_is_left =
1027 self.plan.schema().field_with_qualified_name(lr, &l.name);
1028 let l_is_right =
1029 right.schema().field_with_qualified_name(lr, &l.name);
1030 let r_is_left =
1031 self.plan.schema().field_with_qualified_name(rr, &r.name);
1032 let r_is_right =
1033 right.schema().field_with_qualified_name(rr, &r.name);
1034
1035 match (l_is_left, l_is_right, r_is_left, r_is_right) {
1036 (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
1037 (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
1038 _ => (
1039 Self::normalize(&self.plan, l),
1040 Self::normalize(&right, r),
1041 ),
1042 }
1043 }
1044 (Some(lr), None) => {
1045 let l_is_left =
1046 self.plan.schema().field_with_qualified_name(lr, &l.name);
1047 let l_is_right =
1048 right.schema().field_with_qualified_name(lr, &l.name);
1049
1050 match (l_is_left, l_is_right) {
1051 (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1052 (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1053 _ => (
1054 Self::normalize(&self.plan, l),
1055 Self::normalize(&right, r),
1056 ),
1057 }
1058 }
1059 (None, Some(rr)) => {
1060 let r_is_left =
1061 self.plan.schema().field_with_qualified_name(rr, &r.name);
1062 let r_is_right =
1063 right.schema().field_with_qualified_name(rr, &r.name);
1064
1065 match (r_is_left, r_is_right) {
1066 (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1067 (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1068 _ => (
1069 Self::normalize(&self.plan, l),
1070 Self::normalize(&right, r),
1071 ),
1072 }
1073 }
1074 (None, None) => {
1075 let mut swap = false;
1076 let left_key = Self::normalize(&self.plan, l.clone())
1077 .or_else(|_| {
1078 swap = true;
1079 Self::normalize(&right, l)
1080 });
1081 if swap {
1082 (Self::normalize(&self.plan, r), left_key)
1083 } else {
1084 (left_key, Self::normalize(&right, r))
1085 }
1086 }
1087 }
1088 })
1089 .unzip();
1090
1091 let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1092 let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1093
1094 let on: Vec<_> = left_keys
1095 .into_iter()
1096 .zip(right_keys)
1097 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1098 .collect();
1099 let join_schema =
1100 build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1101
1102 if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
1104 return plan_err!("join condition should not be empty");
1105 }
1106
1107 Ok(Self::new(LogicalPlan::Join(Join {
1108 left: self.plan,
1109 right: Arc::new(right),
1110 on,
1111 filter,
1112 join_type,
1113 join_constraint: JoinConstraint::On,
1114 schema: DFSchemaRef::new(join_schema),
1115 null_equality,
1116 })))
1117 }
1118
1119 pub fn join_using(
1121 self,
1122 right: LogicalPlan,
1123 join_type: JoinType,
1124 using_keys: Vec<Column>,
1125 ) -> Result<Self> {
1126 let left_keys: Vec<Column> = using_keys
1127 .clone()
1128 .into_iter()
1129 .map(|c| Self::normalize(&self.plan, c))
1130 .collect::<Result<_>>()?;
1131 let right_keys: Vec<Column> = using_keys
1132 .into_iter()
1133 .map(|c| Self::normalize(&right, c))
1134 .collect::<Result<_>>()?;
1135
1136 let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1137 let mut join_on: Vec<(Expr, Expr)> = vec![];
1138 let mut filters: Option<Expr> = None;
1139 for (l, r) in &on {
1140 if self.plan.schema().has_column(l)
1141 && right.schema().has_column(r)
1142 && can_hash(
1143 datafusion_common::ExprSchema::field_from_column(
1144 self.plan.schema(),
1145 l,
1146 )?
1147 .data_type(),
1148 )
1149 {
1150 join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1151 } else if self.plan.schema().has_column(l)
1152 && right.schema().has_column(r)
1153 && can_hash(
1154 datafusion_common::ExprSchema::field_from_column(
1155 self.plan.schema(),
1156 r,
1157 )?
1158 .data_type(),
1159 )
1160 {
1161 join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1162 } else {
1163 let expr = binary_expr(
1164 Expr::Column(l.clone()),
1165 Operator::Eq,
1166 Expr::Column(r.clone()),
1167 );
1168 match filters {
1169 None => filters = Some(expr),
1170 Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1171 }
1172 }
1173 }
1174
1175 if join_on.is_empty() {
1176 let join = Self::from(self.plan).cross_join(right)?;
1177 join.filter(filters.ok_or_else(|| {
1178 DataFusionError::Internal("filters should not be None here".to_string())
1179 })?)
1180 } else {
1181 let join = Join::try_new(
1182 self.plan,
1183 Arc::new(right),
1184 join_on,
1185 filters,
1186 join_type,
1187 JoinConstraint::Using,
1188 NullEquality::NullEqualsNothing,
1189 )?;
1190
1191 Ok(Self::new(LogicalPlan::Join(join)))
1192 }
1193 }
1194
1195 pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1197 let join = Join::try_new(
1198 self.plan,
1199 Arc::new(right),
1200 vec![],
1201 None,
1202 JoinType::Inner,
1203 JoinConstraint::On,
1204 NullEquality::NullEqualsNothing,
1205 )?;
1206
1207 Ok(Self::new(LogicalPlan::Join(join)))
1208 }
1209
1210 pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1212 Ok(Self::new(LogicalPlan::Repartition(Repartition {
1213 input: self.plan,
1214 partitioning_scheme,
1215 })))
1216 }
1217
1218 pub fn window(
1220 self,
1221 window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1222 ) -> Result<Self> {
1223 let window_expr = normalize_cols(window_expr, &self.plan)?;
1224 validate_unique_names("Windows", &window_expr)?;
1225 Ok(Self::new(LogicalPlan::Window(Window::try_new(
1226 window_expr,
1227 self.plan,
1228 )?)))
1229 }
1230
1231 pub fn aggregate(
1235 self,
1236 group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1237 aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1238 ) -> Result<Self> {
1239 let group_expr = normalize_cols(group_expr, &self.plan)?;
1240 let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1241
1242 let group_expr = if self.options.add_implicit_group_by_exprs {
1243 add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1244 } else {
1245 group_expr
1246 };
1247
1248 Aggregate::try_new(self.plan, group_expr, aggr_expr)
1249 .map(LogicalPlan::Aggregate)
1250 .map(Self::new)
1251 }
1252
1253 pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1260 self.explain_option_format(
1262 ExplainOption::default()
1263 .with_verbose(verbose)
1264 .with_analyze(analyze),
1265 )
1266 }
1267
1268 pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1272 let schema = LogicalPlan::explain_schema();
1273 let schema = schema.to_dfschema_ref()?;
1274
1275 if explain_option.analyze {
1276 Ok(Self::new(LogicalPlan::Analyze(Analyze {
1277 verbose: explain_option.verbose,
1278 input: self.plan,
1279 schema,
1280 })))
1281 } else {
1282 let stringified_plans =
1283 vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1284
1285 Ok(Self::new(LogicalPlan::Explain(Explain {
1286 verbose: explain_option.verbose,
1287 plan: self.plan,
1288 explain_format: explain_option.format,
1289 stringified_plans,
1290 schema,
1291 logical_optimization_succeeded: false,
1292 })))
1293 }
1294 }
1295
1296 pub fn intersect(
1298 left_plan: LogicalPlan,
1299 right_plan: LogicalPlan,
1300 is_all: bool,
1301 ) -> Result<LogicalPlan> {
1302 LogicalPlanBuilder::intersect_or_except(
1303 left_plan,
1304 right_plan,
1305 JoinType::LeftSemi,
1306 is_all,
1307 )
1308 }
1309
1310 pub fn except(
1312 left_plan: LogicalPlan,
1313 right_plan: LogicalPlan,
1314 is_all: bool,
1315 ) -> Result<LogicalPlan> {
1316 LogicalPlanBuilder::intersect_or_except(
1317 left_plan,
1318 right_plan,
1319 JoinType::LeftAnti,
1320 is_all,
1321 )
1322 }
1323
1324 fn intersect_or_except(
1326 left_plan: LogicalPlan,
1327 right_plan: LogicalPlan,
1328 join_type: JoinType,
1329 is_all: bool,
1330 ) -> Result<LogicalPlan> {
1331 let left_len = left_plan.schema().fields().len();
1332 let right_len = right_plan.schema().fields().len();
1333
1334 if left_len != right_len {
1335 return plan_err!(
1336 "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1337 );
1338 }
1339
1340 let join_keys = left_plan
1341 .schema()
1342 .fields()
1343 .iter()
1344 .zip(right_plan.schema().fields().iter())
1345 .map(|(left_field, right_field)| {
1346 (
1347 (Column::from_name(left_field.name())),
1348 (Column::from_name(right_field.name())),
1349 )
1350 })
1351 .unzip();
1352 if is_all {
1353 LogicalPlanBuilder::from(left_plan)
1354 .join_detailed(
1355 right_plan,
1356 join_type,
1357 join_keys,
1358 None,
1359 NullEquality::NullEqualsNull,
1360 )?
1361 .build()
1362 } else {
1363 LogicalPlanBuilder::from(left_plan)
1364 .distinct()?
1365 .join_detailed(
1366 right_plan,
1367 join_type,
1368 join_keys,
1369 None,
1370 NullEquality::NullEqualsNull,
1371 )?
1372 .build()
1373 }
1374 }
1375
1376 pub fn build(self) -> Result<LogicalPlan> {
1378 Ok(Arc::unwrap_or_clone(self.plan))
1379 }
1380
1381 pub fn join_with_expr_keys(
1396 self,
1397 right: LogicalPlan,
1398 join_type: JoinType,
1399 equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1400 filter: Option<Expr>,
1401 ) -> Result<Self> {
1402 if equi_exprs.0.len() != equi_exprs.1.len() {
1403 return plan_err!("left_keys and right_keys were not the same length");
1404 }
1405
1406 let join_key_pairs = equi_exprs
1407 .0
1408 .into_iter()
1409 .zip(equi_exprs.1)
1410 .map(|(l, r)| {
1411 let left_key = l.into();
1412 let right_key = r.into();
1413 let mut left_using_columns = HashSet::new();
1414 expr_to_columns(&left_key, &mut left_using_columns)?;
1415 let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1416 left_key,
1417 &[&[self.plan.schema()]],
1418 &[],
1419 )?;
1420
1421 let mut right_using_columns = HashSet::new();
1422 expr_to_columns(&right_key, &mut right_using_columns)?;
1423 let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1424 right_key,
1425 &[&[right.schema()]],
1426 &[],
1427 )?;
1428
1429 find_valid_equijoin_key_pair(
1431 &normalized_left_key,
1432 &normalized_right_key,
1433 self.plan.schema(),
1434 right.schema(),
1435 )?.ok_or_else(||
1436 plan_datafusion_err!(
1437 "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1438 ))
1439 })
1440 .collect::<Result<Vec<_>>>()?;
1441
1442 let join = Join::try_new(
1443 self.plan,
1444 Arc::new(right),
1445 join_key_pairs,
1446 filter,
1447 join_type,
1448 JoinConstraint::On,
1449 NullEquality::NullEqualsNothing,
1450 )?;
1451
1452 Ok(Self::new(LogicalPlan::Join(join)))
1453 }
1454
1455 pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1457 unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1458 }
1459
1460 pub fn unnest_column_with_options(
1462 self,
1463 column: impl Into<Column>,
1464 options: UnnestOptions,
1465 ) -> Result<Self> {
1466 unnest_with_options(
1467 Arc::unwrap_or_clone(self.plan),
1468 vec![column.into()],
1469 options,
1470 )
1471 .map(Self::new)
1472 }
1473
1474 pub fn unnest_columns_with_options(
1476 self,
1477 columns: Vec<Column>,
1478 options: UnnestOptions,
1479 ) -> Result<Self> {
1480 unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1481 .map(Self::new)
1482 }
1483}
1484
1485impl From<LogicalPlan> for LogicalPlanBuilder {
1486 fn from(plan: LogicalPlan) -> Self {
1487 LogicalPlanBuilder::new(plan)
1488 }
1489}
1490
1491impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1492 fn from(plan: Arc<LogicalPlan>) -> Self {
1493 LogicalPlanBuilder::new_from_arc(plan)
1494 }
1495}
1496
1497#[derive(Default)]
1499struct ValuesFields {
1500 inner: Vec<Field>,
1501}
1502
1503impl ValuesFields {
1504 pub fn new() -> Self {
1505 Self::default()
1506 }
1507
1508 pub fn push(&mut self, data_type: DataType, nullable: bool) {
1509 let name = format!("column{}", self.inner.len() + 1);
1512 self.inner.push(Field::new(name, data_type, nullable));
1513 }
1514
1515 pub fn into_fields(self) -> Fields {
1516 self.inner.into()
1517 }
1518}
1519
1520pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1529 let mut name_map = HashMap::new();
1530 let mut seen: HashSet<String> = HashSet::new();
1531
1532 fields
1533 .into_iter()
1534 .map(|field| {
1535 let base_name = field.name();
1536 let count = name_map.entry(base_name.clone()).or_insert(0);
1537 let mut new_name = base_name.clone();
1538
1539 while seen.contains(&new_name) {
1541 *count += 1;
1542 new_name = format!("{base_name}:{count}");
1543 }
1544
1545 seen.insert(new_name.clone());
1546
1547 let mut modified_field =
1548 Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1549 modified_field.set_metadata(field.metadata().clone());
1550 modified_field
1551 })
1552 .collect()
1553}
1554
1555fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1556 let mut table_references = schema
1557 .iter()
1558 .filter_map(|(qualifier, _)| qualifier)
1559 .collect::<Vec<_>>();
1560 table_references.dedup();
1561 let table_reference = if table_references.len() == 1 {
1562 table_references.pop().cloned()
1563 } else {
1564 None
1565 };
1566
1567 (
1568 table_reference,
1569 Arc::new(Field::new("mark", DataType::Boolean, false)),
1570 )
1571}
1572
1573pub fn build_join_schema(
1576 left: &DFSchema,
1577 right: &DFSchema,
1578 join_type: &JoinType,
1579) -> Result<DFSchema> {
1580 fn nullify_fields<'a>(
1581 fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1582 ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1583 fields
1584 .map(|(q, f)| {
1585 let field = f.as_ref().clone().with_nullable(true);
1587 (q.cloned(), Arc::new(field))
1588 })
1589 .collect()
1590 }
1591
1592 let right_fields = right.iter();
1593 let left_fields = left.iter();
1594
1595 let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1596 JoinType::Inner => {
1597 let left_fields = left_fields
1599 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1600 .collect::<Vec<_>>();
1601 let right_fields = right_fields
1602 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1603 .collect::<Vec<_>>();
1604 left_fields.into_iter().chain(right_fields).collect()
1605 }
1606 JoinType::Left => {
1607 let left_fields = left_fields
1609 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1610 .collect::<Vec<_>>();
1611 left_fields
1612 .into_iter()
1613 .chain(nullify_fields(right_fields))
1614 .collect()
1615 }
1616 JoinType::Right => {
1617 let right_fields = right_fields
1619 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1620 .collect::<Vec<_>>();
1621 nullify_fields(left_fields)
1622 .into_iter()
1623 .chain(right_fields)
1624 .collect()
1625 }
1626 JoinType::Full => {
1627 nullify_fields(left_fields)
1629 .into_iter()
1630 .chain(nullify_fields(right_fields))
1631 .collect()
1632 }
1633 JoinType::LeftSemi | JoinType::LeftAnti => {
1634 left_fields
1636 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1637 .collect()
1638 }
1639 JoinType::LeftMark => left_fields
1640 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1641 .chain(once(mark_field(right)))
1642 .collect(),
1643 JoinType::RightSemi | JoinType::RightAnti => {
1644 right_fields
1646 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1647 .collect()
1648 }
1649 JoinType::RightMark => right_fields
1650 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1651 .chain(once(mark_field(left)))
1652 .collect(),
1653 };
1654 let func_dependencies = left.functional_dependencies().join(
1655 right.functional_dependencies(),
1656 join_type,
1657 left.fields().len(),
1658 );
1659
1660 let (schema1, schema2) = match join_type {
1661 JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
1662 _ => (right, left),
1663 };
1664
1665 let metadata = schema1
1666 .metadata()
1667 .clone()
1668 .into_iter()
1669 .chain(schema2.metadata().clone())
1670 .collect();
1671
1672 let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1673 dfschema.with_functional_dependencies(func_dependencies)
1674}
1675
1676pub fn requalify_sides_if_needed(
1686 left: LogicalPlanBuilder,
1687 right: LogicalPlanBuilder,
1688) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1689 let left_cols = left.schema().columns();
1690 let right_cols = right.schema().columns();
1691 if left_cols.iter().any(|l| {
1692 right_cols.iter().any(|r| {
1693 l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
1694 })
1695 }) {
1696 Ok((
1699 left.alias(TableReference::bare("left"))?,
1700 right.alias(TableReference::bare("right"))?,
1701 true,
1702 ))
1703 } else {
1704 Ok((left, right, false))
1705 }
1706}
1707
1708pub fn add_group_by_exprs_from_dependencies(
1718 mut group_expr: Vec<Expr>,
1719 schema: &DFSchemaRef,
1720) -> Result<Vec<Expr>> {
1721 let mut group_by_field_names = group_expr
1724 .iter()
1725 .map(|e| e.schema_name().to_string())
1726 .collect::<Vec<_>>();
1727
1728 if let Some(target_indices) =
1729 get_target_functional_dependencies(schema, &group_by_field_names)
1730 {
1731 for idx in target_indices {
1732 let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1733 let expr_name = expr.schema_name().to_string();
1734 if !group_by_field_names.contains(&expr_name) {
1735 group_by_field_names.push(expr_name);
1736 group_expr.push(expr);
1737 }
1738 }
1739 }
1740 Ok(group_expr)
1741}
1742
1743pub fn validate_unique_names<'a>(
1745 node_name: &str,
1746 expressions: impl IntoIterator<Item = &'a Expr>,
1747) -> Result<()> {
1748 let mut unique_names = HashMap::new();
1749
1750 expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1751 let name = expr.schema_name().to_string();
1752 match unique_names.get(&name) {
1753 None => {
1754 unique_names.insert(name, (position, expr));
1755 Ok(())
1756 },
1757 Some((existing_position, existing_expr)) => {
1758 plan_err!("{node_name} require unique expression names \
1759 but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1760 at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1761 )
1762 }
1763 }
1764 })
1765}
1766
1767pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1779 Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1780 Arc::new(left_plan),
1781 Arc::new(right_plan),
1782 ])?))
1783}
1784
1785pub fn union_by_name(
1788 left_plan: LogicalPlan,
1789 right_plan: LogicalPlan,
1790) -> Result<LogicalPlan> {
1791 Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1792 Arc::new(left_plan),
1793 Arc::new(right_plan),
1794 ])?))
1795}
1796
1797pub fn project(
1803 plan: LogicalPlan,
1804 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1805) -> Result<LogicalPlan> {
1806 project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1807}
1808
1809fn project_with_validation(
1817 plan: LogicalPlan,
1818 expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1819) -> Result<LogicalPlan> {
1820 let mut projected_expr = vec![];
1821 for (e, validate) in expr {
1822 let e = e.into();
1823 match e {
1824 SelectExpr::Wildcard(opt) => {
1825 let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1826
1827 let expanded = if let Some(replace) = opt.replace {
1830 replace_columns(expanded, &replace)?
1831 } else {
1832 expanded
1833 };
1834
1835 for e in expanded {
1836 if validate {
1837 projected_expr
1838 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1839 } else {
1840 projected_expr.push(e)
1841 }
1842 }
1843 }
1844 SelectExpr::QualifiedWildcard(table_ref, opt) => {
1845 let expanded =
1846 expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1847
1848 let expanded = if let Some(replace) = opt.replace {
1851 replace_columns(expanded, &replace)?
1852 } else {
1853 expanded
1854 };
1855
1856 for e in expanded {
1857 if validate {
1858 projected_expr
1859 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1860 } else {
1861 projected_expr.push(e)
1862 }
1863 }
1864 }
1865 SelectExpr::Expression(e) => {
1866 if validate {
1867 projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1868 } else {
1869 projected_expr.push(e)
1870 }
1871 }
1872 }
1873 }
1874 validate_unique_names("Projections", projected_expr.iter())?;
1875
1876 Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1877}
1878
1879fn replace_columns(
1884 mut exprs: Vec<Expr>,
1885 replace: &PlannedReplaceSelectItem,
1886) -> Result<Vec<Expr>> {
1887 for expr in exprs.iter_mut() {
1888 if let Expr::Column(Column { name, .. }) = expr {
1889 if let Some((_, new_expr)) = replace
1890 .items()
1891 .iter()
1892 .zip(replace.expressions().iter())
1893 .find(|(item, _)| item.column_name.value == *name)
1894 {
1895 *expr = new_expr.clone().alias(name.clone())
1896 }
1897 }
1898 }
1899 Ok(exprs)
1900}
1901
1902pub fn subquery_alias(
1904 plan: LogicalPlan,
1905 alias: impl Into<TableReference>,
1906) -> Result<LogicalPlan> {
1907 SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1908}
1909
1910pub fn table_scan(
1913 name: Option<impl Into<TableReference>>,
1914 table_schema: &Schema,
1915 projection: Option<Vec<usize>>,
1916) -> Result<LogicalPlanBuilder> {
1917 table_scan_with_filters(name, table_schema, projection, vec![])
1918}
1919
1920pub fn table_scan_with_filters(
1924 name: Option<impl Into<TableReference>>,
1925 table_schema: &Schema,
1926 projection: Option<Vec<usize>>,
1927 filters: Vec<Expr>,
1928) -> Result<LogicalPlanBuilder> {
1929 let table_source = table_source(table_schema);
1930 let name = name
1931 .map(|n| n.into())
1932 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1933 LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1934}
1935
1936pub fn table_scan_with_filter_and_fetch(
1940 name: Option<impl Into<TableReference>>,
1941 table_schema: &Schema,
1942 projection: Option<Vec<usize>>,
1943 filters: Vec<Expr>,
1944 fetch: Option<usize>,
1945) -> Result<LogicalPlanBuilder> {
1946 let table_source = table_source(table_schema);
1947 let name = name
1948 .map(|n| n.into())
1949 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1950 LogicalPlanBuilder::scan_with_filters_fetch(
1951 name,
1952 table_source,
1953 projection,
1954 filters,
1955 fetch,
1956 )
1957}
1958
1959pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
1960 let table_schema = Arc::new(table_schema.clone());
1961 Arc::new(LogicalTableSource {
1962 table_schema,
1963 constraints: Default::default(),
1964 })
1965}
1966
1967pub fn table_source_with_constraints(
1968 table_schema: &Schema,
1969 constraints: Constraints,
1970) -> Arc<dyn TableSource> {
1971 let table_schema = Arc::new(table_schema.clone());
1972 Arc::new(LogicalTableSource {
1973 table_schema,
1974 constraints,
1975 })
1976}
1977
1978pub fn wrap_projection_for_join_if_necessary(
1980 join_keys: &[Expr],
1981 input: LogicalPlan,
1982) -> Result<(LogicalPlan, Vec<Column>, bool)> {
1983 let input_schema = input.schema();
1984 let alias_join_keys: Vec<Expr> = join_keys
1985 .iter()
1986 .map(|key| {
1987 if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
1996 let alias = format!("{key}");
1997 key.clone().alias(alias)
1998 } else {
1999 key.clone()
2000 }
2001 })
2002 .collect::<Vec<_>>();
2003
2004 let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2005 let plan = if need_project {
2006 let mut projection = input_schema
2008 .columns()
2009 .into_iter()
2010 .map(Expr::Column)
2011 .collect::<Vec<_>>();
2012 let join_key_items = alias_join_keys
2013 .iter()
2014 .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2015 .cloned()
2016 .collect::<HashSet<Expr>>();
2017 projection.extend(join_key_items);
2018
2019 LogicalPlanBuilder::from(input)
2020 .project(projection.into_iter().map(SelectExpr::from))?
2021 .build()?
2022 } else {
2023 input
2024 };
2025
2026 let join_on = alias_join_keys
2027 .into_iter()
2028 .map(|key| {
2029 if let Some(col) = key.try_as_col() {
2030 Ok(col.clone())
2031 } else {
2032 let name = key.schema_name().to_string();
2033 Ok(Column::from_name(name))
2034 }
2035 })
2036 .collect::<Result<Vec<_>>>()?;
2037
2038 Ok((plan, join_on, need_project))
2039}
2040
2041pub struct LogicalTableSource {
2045 table_schema: SchemaRef,
2046 constraints: Constraints,
2047}
2048
2049impl LogicalTableSource {
2050 pub fn new(table_schema: SchemaRef) -> Self {
2052 Self {
2053 table_schema,
2054 constraints: Constraints::default(),
2055 }
2056 }
2057
2058 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2059 self.constraints = constraints;
2060 self
2061 }
2062}
2063
2064impl TableSource for LogicalTableSource {
2065 fn as_any(&self) -> &dyn Any {
2066 self
2067 }
2068
2069 fn schema(&self) -> SchemaRef {
2070 Arc::clone(&self.table_schema)
2071 }
2072
2073 fn constraints(&self) -> Option<&Constraints> {
2074 Some(&self.constraints)
2075 }
2076
2077 fn supports_filters_pushdown(
2078 &self,
2079 filters: &[&Expr],
2080 ) -> Result<Vec<TableProviderFilterPushDown>> {
2081 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2082 }
2083}
2084
2085pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2087 unnest_with_options(input, columns, UnnestOptions::default())
2088}
2089
2090pub fn get_struct_unnested_columns(
2091 col_name: &String,
2092 inner_fields: &Fields,
2093) -> Vec<Column> {
2094 inner_fields
2095 .iter()
2096 .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2097 .collect()
2098}
2099
2100pub fn unnest_with_options(
2130 input: LogicalPlan,
2131 columns_to_unnest: Vec<Column>,
2132 options: UnnestOptions,
2133) -> Result<LogicalPlan> {
2134 Ok(LogicalPlan::Unnest(Unnest::try_new(
2135 Arc::new(input),
2136 columns_to_unnest,
2137 options,
2138 )?))
2139}
2140
2141#[cfg(test)]
2142mod tests {
2143 use super::*;
2144 use crate::logical_plan::StringifiedPlan;
2145 use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2146
2147 use crate::test::function_stub::sum;
2148 use datafusion_common::{Constraint, RecursionUnnestOption, SchemaError};
2149 use insta::assert_snapshot;
2150
2151 #[test]
2152 fn plan_builder_simple() -> Result<()> {
2153 let plan =
2154 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2155 .filter(col("state").eq(lit("CO")))?
2156 .project(vec![col("id")])?
2157 .build()?;
2158
2159 assert_snapshot!(plan, @r#"
2160 Projection: employee_csv.id
2161 Filter: employee_csv.state = Utf8("CO")
2162 TableScan: employee_csv projection=[id, state]
2163 "#);
2164
2165 Ok(())
2166 }
2167
2168 #[test]
2169 fn plan_builder_schema() {
2170 let schema = employee_schema();
2171 let projection = None;
2172 let plan =
2173 LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2174 .unwrap();
2175 assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2176
2177 let projection = None;
2180 let plan =
2181 LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2182 .unwrap();
2183 assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2184 }
2185
2186 #[test]
2187 fn plan_builder_empty_name() {
2188 let schema = employee_schema();
2189 let projection = None;
2190 let err =
2191 LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2192 assert_snapshot!(
2193 err.strip_backtrace(),
2194 @"Error during planning: table_name cannot be empty"
2195 );
2196 }
2197
2198 #[test]
2199 fn plan_builder_sort() -> Result<()> {
2200 let plan =
2201 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2202 .sort(vec![
2203 expr::Sort::new(col("state"), true, true),
2204 expr::Sort::new(col("salary"), false, false),
2205 ])?
2206 .build()?;
2207
2208 assert_snapshot!(plan, @r"
2209 Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
2210 TableScan: employee_csv projection=[state, salary]
2211 ");
2212
2213 Ok(())
2214 }
2215
2216 #[test]
2217 fn plan_builder_union() -> Result<()> {
2218 let plan =
2219 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2220
2221 let plan = plan
2222 .clone()
2223 .union(plan.clone().build()?)?
2224 .union(plan.clone().build()?)?
2225 .union(plan.build()?)?
2226 .build()?;
2227
2228 assert_snapshot!(plan, @r"
2229 Union
2230 Union
2231 Union
2232 TableScan: employee_csv projection=[state, salary]
2233 TableScan: employee_csv projection=[state, salary]
2234 TableScan: employee_csv projection=[state, salary]
2235 TableScan: employee_csv projection=[state, salary]
2236 ");
2237
2238 Ok(())
2239 }
2240
2241 #[test]
2242 fn plan_builder_union_distinct() -> Result<()> {
2243 let plan =
2244 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2245
2246 let plan = plan
2247 .clone()
2248 .union_distinct(plan.clone().build()?)?
2249 .union_distinct(plan.clone().build()?)?
2250 .union_distinct(plan.build()?)?
2251 .build()?;
2252
2253 assert_snapshot!(plan, @r"
2254 Distinct:
2255 Union
2256 Distinct:
2257 Union
2258 Distinct:
2259 Union
2260 TableScan: employee_csv projection=[state, salary]
2261 TableScan: employee_csv projection=[state, salary]
2262 TableScan: employee_csv projection=[state, salary]
2263 TableScan: employee_csv projection=[state, salary]
2264 ");
2265
2266 Ok(())
2267 }
2268
2269 #[test]
2270 fn plan_builder_simple_distinct() -> Result<()> {
2271 let plan =
2272 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2273 .filter(col("state").eq(lit("CO")))?
2274 .project(vec![col("id")])?
2275 .distinct()?
2276 .build()?;
2277
2278 assert_snapshot!(plan, @r#"
2279 Distinct:
2280 Projection: employee_csv.id
2281 Filter: employee_csv.state = Utf8("CO")
2282 TableScan: employee_csv projection=[id, state]
2283 "#);
2284
2285 Ok(())
2286 }
2287
2288 #[test]
2289 fn exists_subquery() -> Result<()> {
2290 let foo = test_table_scan_with_name("foo")?;
2291 let bar = test_table_scan_with_name("bar")?;
2292
2293 let subquery = LogicalPlanBuilder::from(foo)
2294 .project(vec![col("a")])?
2295 .filter(col("a").eq(col("bar.a")))?
2296 .build()?;
2297
2298 let outer_query = LogicalPlanBuilder::from(bar)
2299 .project(vec![col("a")])?
2300 .filter(exists(Arc::new(subquery)))?
2301 .build()?;
2302
2303 assert_snapshot!(outer_query, @r"
2304 Filter: EXISTS (<subquery>)
2305 Subquery:
2306 Filter: foo.a = bar.a
2307 Projection: foo.a
2308 TableScan: foo
2309 Projection: bar.a
2310 TableScan: bar
2311 ");
2312
2313 Ok(())
2314 }
2315
2316 #[test]
2317 fn filter_in_subquery() -> Result<()> {
2318 let foo = test_table_scan_with_name("foo")?;
2319 let bar = test_table_scan_with_name("bar")?;
2320
2321 let subquery = LogicalPlanBuilder::from(foo)
2322 .project(vec![col("a")])?
2323 .filter(col("a").eq(col("bar.a")))?
2324 .build()?;
2325
2326 let outer_query = LogicalPlanBuilder::from(bar)
2328 .project(vec![col("a")])?
2329 .filter(in_subquery(col("a"), Arc::new(subquery)))?
2330 .build()?;
2331
2332 assert_snapshot!(outer_query, @r"
2333 Filter: bar.a IN (<subquery>)
2334 Subquery:
2335 Filter: foo.a = bar.a
2336 Projection: foo.a
2337 TableScan: foo
2338 Projection: bar.a
2339 TableScan: bar
2340 ");
2341
2342 Ok(())
2343 }
2344
2345 #[test]
2346 fn select_scalar_subquery() -> Result<()> {
2347 let foo = test_table_scan_with_name("foo")?;
2348 let bar = test_table_scan_with_name("bar")?;
2349
2350 let subquery = LogicalPlanBuilder::from(foo)
2351 .project(vec![col("b")])?
2352 .filter(col("a").eq(col("bar.a")))?
2353 .build()?;
2354
2355 let outer_query = LogicalPlanBuilder::from(bar)
2357 .project(vec![scalar_subquery(Arc::new(subquery))])?
2358 .build()?;
2359
2360 assert_snapshot!(outer_query, @r"
2361 Projection: (<subquery>)
2362 Subquery:
2363 Filter: foo.a = bar.a
2364 Projection: foo.b
2365 TableScan: foo
2366 TableScan: bar
2367 ");
2368
2369 Ok(())
2370 }
2371
2372 #[test]
2373 fn projection_non_unique_names() -> Result<()> {
2374 let plan = table_scan(
2375 Some("employee_csv"),
2376 &employee_schema(),
2377 Some(vec![0, 1]),
2379 )?
2380 .project(vec![col("id"), col("first_name").alias("id")]);
2382
2383 match plan {
2384 Err(DataFusionError::SchemaError(err, _)) => {
2385 if let SchemaError::AmbiguousReference { field } = *err {
2386 let Column {
2387 relation,
2388 name,
2389 spans: _,
2390 } = *field;
2391 let Some(TableReference::Bare { table }) = relation else {
2392 return plan_err!(
2393 "wrong relation: {relation:?}, expected table name"
2394 );
2395 };
2396 assert_eq!(*"employee_csv", *table);
2397 assert_eq!("id", &name);
2398 Ok(())
2399 } else {
2400 plan_err!("Plan should have returned an DataFusionError::SchemaError")
2401 }
2402 }
2403 _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2404 }
2405 }
2406
2407 fn employee_schema() -> Schema {
2408 Schema::new(vec![
2409 Field::new("id", DataType::Int32, false),
2410 Field::new("first_name", DataType::Utf8, false),
2411 Field::new("last_name", DataType::Utf8, false),
2412 Field::new("state", DataType::Utf8, false),
2413 Field::new("salary", DataType::Int32, false),
2414 ])
2415 }
2416
2417 #[test]
2418 fn stringified_plan() {
2419 let stringified_plan =
2420 StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2421 assert!(stringified_plan.should_display(true));
2422 assert!(!stringified_plan.should_display(false)); let stringified_plan =
2425 StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2426 assert!(stringified_plan.should_display(true));
2427 assert!(stringified_plan.should_display(false)); let stringified_plan =
2430 StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2431 assert!(stringified_plan.should_display(true));
2432 assert!(!stringified_plan.should_display(false)); let stringified_plan =
2435 StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2436 assert!(stringified_plan.should_display(true));
2437 assert!(stringified_plan.should_display(false)); let stringified_plan = StringifiedPlan::new(
2440 PlanType::OptimizedLogicalPlan {
2441 optimizer_name: "random opt pass".into(),
2442 },
2443 "...the plan...",
2444 );
2445 assert!(stringified_plan.should_display(true));
2446 assert!(!stringified_plan.should_display(false));
2447 }
2448
2449 fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2450 let schema = Schema::new(vec![
2451 Field::new("a", DataType::UInt32, false),
2452 Field::new("b", DataType::UInt32, false),
2453 Field::new("c", DataType::UInt32, false),
2454 ]);
2455 table_scan(Some(name), &schema, None)?.build()
2456 }
2457
2458 #[test]
2459 fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2460 let plan1 =
2461 table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2462 let plan2 =
2463 table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2464
2465 let err_msg1 =
2466 LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2467 .unwrap_err();
2468
2469 assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2470
2471 Ok(())
2472 }
2473
2474 #[test]
2475 fn plan_builder_unnest() -> Result<()> {
2476 let err = nested_table_scan("test_table")?
2478 .unnest_column("scalar")
2479 .unwrap_err();
2480
2481 let DataFusionError::Internal(desc) = err else {
2482 return plan_err!("Plan should have returned an DataFusionError::Internal");
2483 };
2484
2485 let desc = desc
2486 .split(DataFusionError::BACK_TRACE_SEP)
2487 .collect::<Vec<&str>>()
2488 .first()
2489 .unwrap_or(&"")
2490 .to_string();
2491
2492 assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2493
2494 let plan = nested_table_scan("test_table")?
2496 .unnest_column("strings")?
2497 .build()?;
2498
2499 assert_snapshot!(plan, @r"
2500 Unnest: lists[test_table.strings|depth=1] structs[]
2501 TableScan: test_table
2502 ");
2503
2504 let field = plan.schema().field_with_name(None, "strings").unwrap();
2506 assert_eq!(&DataType::Utf8, field.data_type());
2507
2508 let plan = nested_table_scan("test_table")?
2510 .unnest_column("struct_singular")?
2511 .build()?;
2512
2513 assert_snapshot!(plan, @r"
2514 Unnest: lists[] structs[test_table.struct_singular]
2515 TableScan: test_table
2516 ");
2517
2518 for field_name in &["a", "b"] {
2519 let field = plan
2521 .schema()
2522 .field_with_name(None, &format!("struct_singular.{field_name}"))
2523 .unwrap();
2524 assert_eq!(&DataType::UInt32, field.data_type());
2525 }
2526
2527 let plan = nested_table_scan("test_table")?
2529 .unnest_column("strings")?
2530 .unnest_column("structs")?
2531 .unnest_column("struct_singular")?
2532 .build()?;
2533
2534 assert_snapshot!(plan, @r"
2535 Unnest: lists[] structs[test_table.struct_singular]
2536 Unnest: lists[test_table.structs|depth=1] structs[]
2537 Unnest: lists[test_table.strings|depth=1] structs[]
2538 TableScan: test_table
2539 ");
2540
2541 let field = plan.schema().field_with_name(None, "structs").unwrap();
2543 assert!(matches!(field.data_type(), DataType::Struct(_)));
2544
2545 let cols = vec!["strings", "structs", "struct_singular"]
2547 .into_iter()
2548 .map(|c| c.into())
2549 .collect();
2550
2551 let plan = nested_table_scan("test_table")?
2552 .unnest_columns_with_options(cols, UnnestOptions::default())?
2553 .build()?;
2554
2555 assert_snapshot!(plan, @r"
2556 Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2557 TableScan: test_table
2558 ");
2559
2560 let plan = nested_table_scan("test_table")?.unnest_column("missing");
2562 assert!(plan.is_err());
2563
2564 let plan = nested_table_scan("test_table")?
2566 .unnest_columns_with_options(
2567 vec!["stringss".into(), "struct_singular".into()],
2568 UnnestOptions::default()
2569 .with_recursions(RecursionUnnestOption {
2570 input_column: "stringss".into(),
2571 output_column: "stringss_depth_1".into(),
2572 depth: 1,
2573 })
2574 .with_recursions(RecursionUnnestOption {
2575 input_column: "stringss".into(),
2576 output_column: "stringss_depth_2".into(),
2577 depth: 2,
2578 }),
2579 )?
2580 .build()?;
2581
2582 assert_snapshot!(plan, @r"
2583 Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2584 TableScan: test_table
2585 ");
2586
2587 let field = plan
2589 .schema()
2590 .field_with_name(None, "stringss_depth_1")
2591 .unwrap();
2592 assert_eq!(
2593 &DataType::new_list(DataType::Utf8, false),
2594 field.data_type()
2595 );
2596 let field = plan
2597 .schema()
2598 .field_with_name(None, "stringss_depth_2")
2599 .unwrap();
2600 assert_eq!(&DataType::Utf8, field.data_type());
2601 for field_name in &["a", "b"] {
2603 let field = plan
2604 .schema()
2605 .field_with_name(None, &format!("struct_singular.{field_name}"))
2606 .unwrap();
2607 assert_eq!(&DataType::UInt32, field.data_type());
2608 }
2609
2610 Ok(())
2611 }
2612
2613 fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2614 let struct_field_in_list = Field::new_struct(
2617 "item",
2618 vec![
2619 Field::new("a", DataType::UInt32, false),
2620 Field::new("b", DataType::UInt32, false),
2621 ],
2622 false,
2623 );
2624 let string_field = Field::new_list_field(DataType::Utf8, false);
2625 let strings_field = Field::new_list("item", string_field.clone(), false);
2626 let schema = Schema::new(vec![
2627 Field::new("scalar", DataType::UInt32, false),
2628 Field::new_list("strings", string_field, false),
2629 Field::new_list("structs", struct_field_in_list, false),
2630 Field::new(
2631 "struct_singular",
2632 DataType::Struct(Fields::from(vec![
2633 Field::new("a", DataType::UInt32, false),
2634 Field::new("b", DataType::UInt32, false),
2635 ])),
2636 false,
2637 ),
2638 Field::new_list("stringss", strings_field, false),
2639 ]);
2640
2641 table_scan(Some(table_name), &schema, None)
2642 }
2643
2644 #[test]
2645 fn test_union_after_join() -> Result<()> {
2646 let values = vec![vec![lit(1)]];
2647
2648 let left = LogicalPlanBuilder::values(values.clone())?
2649 .alias("left")?
2650 .build()?;
2651 let right = LogicalPlanBuilder::values(values)?
2652 .alias("right")?
2653 .build()?;
2654
2655 let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2656
2657 let plan = LogicalPlanBuilder::from(join.clone())
2658 .union(join)?
2659 .build()?;
2660
2661 assert_snapshot!(plan, @r"
2662 Union
2663 Cross Join:
2664 SubqueryAlias: left
2665 Values: (Int32(1))
2666 SubqueryAlias: right
2667 Values: (Int32(1))
2668 Cross Join:
2669 SubqueryAlias: left
2670 Values: (Int32(1))
2671 SubqueryAlias: right
2672 Values: (Int32(1))
2673 ");
2674
2675 Ok(())
2676 }
2677
2678 #[test]
2679 fn test_change_redundant_column() -> Result<()> {
2680 let t1_field_1 = Field::new("a", DataType::Int32, false);
2681 let t2_field_1 = Field::new("a", DataType::Int32, false);
2682 let t2_field_3 = Field::new("a", DataType::Int32, false);
2683 let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2684 let t1_field_2 = Field::new("b", DataType::Int32, false);
2685 let t2_field_2 = Field::new("b", DataType::Int32, false);
2686
2687 let field_vec = vec![
2688 t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2689 ];
2690 let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2691
2692 assert_eq!(
2693 remove_redundant,
2694 vec![
2695 Field::new("a", DataType::Int32, false),
2696 Field::new("a:1", DataType::Int32, false),
2697 Field::new("b", DataType::Int32, false),
2698 Field::new("b:1", DataType::Int32, false),
2699 Field::new("a:2", DataType::Int32, false),
2700 Field::new("a:1:1", DataType::Int32, false),
2701 ]
2702 );
2703 Ok(())
2704 }
2705
2706 #[test]
2707 fn plan_builder_from_logical_plan() -> Result<()> {
2708 let plan =
2709 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2710 .sort(vec![
2711 expr::Sort::new(col("state"), true, true),
2712 expr::Sort::new(col("salary"), false, false),
2713 ])?
2714 .build()?;
2715
2716 let plan_expected = format!("{plan}");
2717 let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2718 assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2719
2720 Ok(())
2721 }
2722
2723 #[test]
2724 fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2725 let constraints =
2726 Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2727 let table_source = table_source_with_constraints(&employee_schema(), constraints);
2728
2729 let plan =
2730 LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2731 .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2732 .build()?;
2733
2734 assert_snapshot!(plan, @r"
2735 Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2736 TableScan: employee_csv projection=[id, state, salary]
2737 ");
2738
2739 Ok(())
2740 }
2741
2742 #[test]
2743 fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2744 let constraints =
2745 Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2746 let table_source = table_source_with_constraints(&employee_schema(), constraints);
2747
2748 let options =
2749 LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2750 let plan =
2751 LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2752 .with_options(options)
2753 .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2754 .build()?;
2755
2756 assert_snapshot!(plan, @r"
2757 Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2758 TableScan: employee_csv projection=[id, state, salary]
2759 ");
2760
2761 Ok(())
2762 }
2763
2764 #[test]
2765 fn test_join_metadata() -> Result<()> {
2766 let left_schema = DFSchema::new_with_metadata(
2767 vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2768 HashMap::from([("key".to_string(), "left".to_string())]),
2769 )?;
2770 let right_schema = DFSchema::new_with_metadata(
2771 vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2772 HashMap::from([("key".to_string(), "right".to_string())]),
2773 )?;
2774
2775 let join_schema =
2776 build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2777 assert_eq!(
2778 join_schema.metadata(),
2779 &HashMap::from([("key".to_string(), "left".to_string())])
2780 );
2781 let join_schema =
2782 build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2783 assert_eq!(
2784 join_schema.metadata(),
2785 &HashMap::from([("key".to_string(), "right".to_string())])
2786 );
2787
2788 Ok(())
2789 }
2790}