1use std::any::Any;
21use std::borrow::Cow;
22use std::cmp::Ordering;
23use std::collections::{HashMap, HashSet};
24use std::iter::once;
25use std::sync::Arc;
26
27use crate::dml::CopyTo;
28use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
29use crate::expr_rewriter::{
30 coerce_plan_expr_for_schema, normalize_col,
31 normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
32 rewrite_sort_cols_by_aggs,
33};
34use crate::logical_plan::{
35 Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
36 JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
37 Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
38 Window,
39};
40use crate::select_expr::SelectExpr;
41use crate::utils::{
42 can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
43 expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
44 group_window_expr_by_sort_keys,
45};
46use crate::{
47 DmlStatement, ExplainOption, Expr, ExprSchemable, Operator, RecursiveQuery,
48 Statement, TableProviderFilterPushDown, TableSource, WriteOp, and, binary_expr, lit,
49};
50
51use super::dml::InsertOp;
52use arrow::compute::can_cast_types;
53use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
54use datafusion_common::display::ToStringifiedPlan;
55use datafusion_common::file_options::file_type::FileType;
56use datafusion_common::metadata::FieldMetadata;
57use datafusion_common::{
58 Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue,
59 TableReference, ToDFSchema, UnnestOptions, exec_err,
60 get_target_functional_dependencies, internal_datafusion_err, plan_datafusion_err,
61 plan_err,
62};
63use datafusion_expr_common::type_coercion::binary::type_union_resolution;
64
65use indexmap::IndexSet;
66
/// Default table name used for relations that have no explicit name.
pub const UNNAMED_TABLE: &str = "?table?";
69
/// Options that control how a [`LogicalPlanBuilder`] constructs plan nodes.
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    // When true, `aggregate` adds group-by expressions implied by the
    // schema's functional dependencies that were not listed explicitly.
    add_implicit_group_by_exprs: bool,
}
77
78impl LogicalPlanBuilderOptions {
79 pub fn new() -> Self {
80 Default::default()
81 }
82
83 pub fn with_add_implicit_group_by_exprs(mut self, add: bool) -> Self {
85 self.add_implicit_group_by_exprs = add;
86 self
87 }
88}
89
/// Builder for [`LogicalPlan`]s: holds the current plan root plus the
/// options used when extending it with further operators.
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    // Current root of the plan under construction.
    plan: Arc<LogicalPlan>,
    // Knobs controlling how subsequent operators are built.
    options: LogicalPlanBuilderOptions,
}
132
133impl LogicalPlanBuilder {
    /// Creates a builder rooted at `plan` with default options.
    pub fn new(plan: LogicalPlan) -> Self {
        Self {
            plan: Arc::new(plan),
            options: LogicalPlanBuilderOptions::default(),
        }
    }

    /// Like [`Self::new`] but avoids re-wrapping when the plan is already shared.
    pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
        Self {
            plan,
            options: LogicalPlanBuilderOptions::default(),
        }
    }

    /// Replaces the builder options.
    pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
        self.options = options;
        self
    }

    /// Returns the schema of the current plan root.
    pub fn schema(&self) -> &DFSchemaRef {
        self.plan.schema()
    }

    /// Returns a reference to the current plan root.
    pub fn plan(&self) -> &LogicalPlan {
        &self.plan
    }

    /// Creates an empty relation; `produce_one_row` selects between one
    /// zero-column row and no rows at all.
    pub fn empty(produce_one_row: bool) -> Self {
        Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row,
            schema: DFSchemaRef::new(DFSchema::empty()),
        }))
    }
174
    /// Converts this builder's plan (the static/base term) into a
    /// `WITH RECURSIVE` query named `name`, paired with `recursive_term`.
    ///
    /// Errors if the two terms do not have the same number of columns; the
    /// recursive term is coerced to the static term's schema.
    pub fn to_recursive_query(
        self,
        name: String,
        recursive_term: LogicalPlan,
        is_distinct: bool,
    ) -> Result<Self> {
        // Both terms must produce the same number of columns (UNION semantics).
        let static_fields_len = self.plan.schema().fields().len();
        let recursive_fields_len = recursive_term.schema().fields().len();
        if static_fields_len != recursive_fields_len {
            return plan_err!(
                "Non-recursive term and recursive term must have the same number of columns ({} != {})",
                static_fields_len,
                recursive_fields_len
            );
        }
        // Coerce the recursive term so both sides share the static term's types.
        let coerced_recursive_term =
            coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
        Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
            name,
            static_term: self.plan,
            recursive_term: Arc::new(coerced_recursive_term),
            is_distinct,
        })))
    }
203
204 pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
212 if values.is_empty() {
213 return plan_err!("Values list cannot be empty");
214 }
215 let n_cols = values[0].len();
216 if n_cols == 0 {
217 return plan_err!("Values list cannot be zero length");
218 }
219 for (i, row) in values.iter().enumerate() {
220 if row.len() != n_cols {
221 return plan_err!(
222 "Inconsistent data length across values list: got {} values in row {} but expected {}",
223 row.len(),
224 i,
225 n_cols
226 );
227 }
228 }
229
230 Self::infer_data(values)
232 }
233
    /// Creates a `VALUES` list plan whose column types come from `schema`
    /// instead of being inferred from the data.
    ///
    /// Errors when the list is empty or any row's width differs from the
    /// schema's field count.
    pub fn values_with_schema(
        values: Vec<Vec<Expr>>,
        schema: &DFSchemaRef,
    ) -> Result<Self> {
        if values.is_empty() {
            return plan_err!("Values list cannot be empty");
        }
        // Row width is validated against the schema, not the first row.
        let n_cols = schema.fields().len();
        if n_cols == 0 {
            return plan_err!("Values list cannot be zero length");
        }
        for (i, row) in values.iter().enumerate() {
            if row.len() != n_cols {
                return plan_err!(
                    "Inconsistent data length across values list: got {} values in row {} but expected {}",
                    row.len(),
                    i,
                    n_cols
                );
            }
        }

        Self::infer_values_from_schema(values, schema)
    }
268
    /// Validates `values` against a known `schema`: every expression must
    /// already have the field's type or be castable to it.
    fn infer_values_from_schema(
        values: Vec<Vec<Expr>>,
        schema: &DFSchema,
    ) -> Result<Self> {
        let n_cols = values[0].len();
        let mut fields = ValuesFields::new();
        for j in 0..n_cols {
            let field_type = schema.field(j).data_type();
            let field_nullable = schema.field(j).is_nullable();
            for row in values.iter() {
                let value = &row[j];
                let data_type = value.get_type(schema)?;

                // Reject values that can neither match nor cast to the
                // declared column type.
                if !data_type.equals_datatype(field_type)
                    && !can_cast_types(&data_type, field_type)
                {
                    return exec_err!(
                        "type mismatch and can't cast to got {} and {}",
                        data_type,
                        field_type
                    );
                }
            }
            fields.push(field_type.to_owned(), field_nullable);
        }

        Self::infer_inner(values, fields, schema)
    }
297
    /// Infers one output type per column of a `VALUES` list by unioning the
    /// types of all non-null entries; field metadata must be identical down
    /// a column. Columns containing only nulls become `DataType::Null`.
    fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
        let n_cols = values[0].len();
        // Literal values are typed against an empty schema.
        let schema = DFSchema::empty();
        let mut fields = ValuesFields::new();

        for j in 0..n_cols {
            let mut common_type: Option<DataType> = None;
            let mut common_metadata: Option<FieldMetadata> = None;
            for (i, row) in values.iter().enumerate() {
                let value = &row[j];
                // All entries in a column must carry identical field metadata.
                let metadata = value.metadata(&schema)?;
                if let Some(ref cm) = common_metadata {
                    if &metadata != cm {
                        return plan_err!(
                            "Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}",
                            cm,
                            metadata
                        );
                    }
                } else {
                    common_metadata = Some(metadata.clone());
                }
                let data_type = value.get_type(&schema)?;
                // Null entries adopt whatever type the rest of the column
                // resolves to.
                if data_type == DataType::Null {
                    continue;
                }

                if let Some(prev_type) = common_type {
                    // Widen to a type both entries can be represented in.
                    let data_types = vec![prev_type.clone(), data_type.clone()];
                    let Some(new_type) = type_union_resolution(&data_types) else {
                        return plan_err!(
                            "Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}"
                        );
                    };
                    common_type = Some(new_type);
                } else {
                    common_type = Some(data_type);
                }
            }
            // VALUES columns are always nullable.
            fields.push_with_metadata(
                common_type.unwrap_or(DataType::Null),
                true,
                common_metadata,
            );
        }

        Self::infer_inner(values, fields, &schema)
    }
349
    /// Finalizes a `VALUES` plan: rewrites every entry to its column's
    /// resolved type (null literals are re-typed, everything else is cast)
    /// and builds the unqualified output schema.
    fn infer_inner(
        mut values: Vec<Vec<Expr>>,
        fields: ValuesFields,
        schema: &DFSchema,
    ) -> Result<Self> {
        let fields = fields.into_fields();
        for row in &mut values {
            for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
                if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
                    // Replace untyped nulls with a typed null of the column type.
                    row[j] = Expr::Literal(
                        ScalarValue::try_from(field_type)?,
                        metadata.clone(),
                    );
                } else {
                    // `take` moves the expr out so we can cast without cloning.
                    row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
                }
            }
        }

        let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
        let schema = DFSchemaRef::new(dfschema);

        Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
    }
375
    /// Scans `table_source` as `table_name`, optionally projecting columns.
    pub fn scan(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        Self::scan_with_filters(table_name, table_source, projection, vec![])
    }

    /// Creates a `COPY TO` plan writing `input` to `output_url`.
    pub fn copy_to(
        input: LogicalPlan,
        output_url: String,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
        partition_by: Vec<String>,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
            Arc::new(input),
            output_url,
            partition_by,
            file_type,
            options,
        ))))
    }

    /// Creates an `INSERT INTO` plan feeding `input` into `target`.
    pub fn insert_into(
        input: LogicalPlan,
        table_name: impl Into<TableReference>,
        target: Arc<dyn TableSource>,
        insert_op: InsertOp,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
            table_name.into(),
            target,
            WriteOp::Insert(insert_op),
            Arc::new(input),
        ))))
    }

    /// Like [`Self::scan`] but with pushed-down `filters`.
    pub fn scan_with_filters(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
    }

    /// Like [`Self::scan_with_filters`] but also limits the scan to `fetch` rows.
    pub fn scan_with_filters_fetch(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        Self::scan_with_filters_inner(
            table_name,
            table_source,
            projection,
            filters,
            fetch,
        )
    }
506
    /// Shared implementation for the `scan_*` constructors.
    ///
    /// If the source exposes its own logical plan and no filters were pushed
    /// down, the scan is inlined as that plan (projected and aliased to the
    /// table name) instead of producing a `TableScan` node.
    fn scan_with_filters_inner(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_scan =
            TableScan::try_new(table_name, table_source, projection, filters, fetch)?;

        // Only inline when there are no filters: filters are attached to
        // the TableScan node and would otherwise be lost.
        if table_scan.filters.is_empty()
            && let Some(p) = table_scan.source.get_logical_plan()
        {
            let sub_plan = p.into_owned();

            if let Some(proj) = table_scan.projection {
                // Re-create the projection on top of the inlined plan.
                let projection_exprs = proj
                    .into_iter()
                    .map(|i| {
                        Expr::Column(Column::from(sub_plan.schema().qualified_field(i)))
                    })
                    .collect::<Vec<_>>();
                return Self::new(sub_plan)
                    .project(projection_exprs)?
                    .alias(table_scan.table_name);
            }

            // Alias so columns remain addressable under the table name.
            return Self::new(sub_plan).alias(table_scan.table_name);
        }

        Ok(Self::new(LogicalPlan::TableScan(table_scan)))
    }
543
    /// Wraps `input` in one `Window` node per group of window expressions
    /// that share sort keys, ordering the groups by their sort keys.
    pub fn window_plan(
        input: LogicalPlan,
        window_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<LogicalPlan> {
        let mut plan = input;
        let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
        // Compare sort keys element-wise; on a common-prefix tie the group
        // with MORE keys sorts first (note `key_b` vs `key_a`).
        groups.sort_by(|(key_a, _), (key_b, _)| {
            for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
                let key_ordering = compare_sort_expr(first, second, plan.schema());
                match key_ordering {
                    Ordering::Less => {
                        return Ordering::Less;
                    }
                    Ordering::Greater => {
                        return Ordering::Greater;
                    }
                    Ordering::Equal => {}
                }
            }
            key_b.len().cmp(&key_a.len())
        });
        // Stack one Window node per group on top of the running plan.
        for (_, exprs) in groups {
            let window_exprs = exprs.into_iter().collect::<Vec<_>>();
            plan = LogicalPlanBuilder::from(plan)
                .window(window_exprs)?
                .build()?;
        }
        Ok(plan)
    }
581
    /// Applies a projection over the current plan.
    pub fn project(
        self,
        expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
    ) -> Result<Self> {
        project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }

    /// Applies a projection where each expression carries a flag selecting
    /// whether it should be validated (see the free `project_with_validation`).
    pub fn project_with_validation(
        self,
        expr: Vec<(impl Into<SelectExpr>, bool)>,
    ) -> Result<Self> {
        project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
    }

    /// Projects the columns at the given schema `indices`.
    pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
        let exprs: Vec<_> = indices
            .into_iter()
            .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
            .collect();
        self.project(exprs)
    }

    /// Applies a filter predicate (`WHERE` clause) to the current plan.
    pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
        let expr = normalize_col(expr.into(), &self.plan)?;
        Filter::try_new(expr, self.plan)
            .map(LogicalPlan::Filter)
            .map(Self::new)
    }
615
616 pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
618 let expr = normalize_col(expr.into(), &self.plan)?;
619 Filter::try_new(expr, self.plan)
620 .map(LogicalPlan::Filter)
621 .map(Self::from)
622 }
623
    /// Wraps the plan in a `PREPARE` statement with the given parameter fields.
    pub fn prepare(self, name: String, fields: Vec<FieldRef>) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
            Prepare {
                name,
                fields,
                input: self.plan,
            },
        ))))
    }

    /// Limits output to `fetch` rows after skipping `skip` rows.
    pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
        // A skip of 0 is the identity; omit the expression entirely.
        let skip_expr = if skip == 0 {
            None
        } else {
            Some(lit(skip as i64))
        };
        let fetch_expr = fetch.map(|f| lit(f as i64));
        self.limit_by_expr(skip_expr, fetch_expr)
    }

    /// Like [`Self::limit`], but with arbitrary expressions for skip/fetch.
    pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Limit(Limit {
            skip: skip.map(Box::new),
            fetch: fetch.map(Box::new),
            input: self.plan,
        })))
    }

    /// Wraps the plan in a subquery alias (`AS alias`).
    pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
        subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
    }
666
    /// Rewrites `curr_plan` so that `missing_cols` (columns referenced by an
    /// ORDER BY but absent from the output) become available, by extending
    /// the nearest projection whose input can supply them.
    ///
    /// Recurses through non-projection nodes; `is_distinct` records whether
    /// a `Distinct` was crossed, in which case adding columns could change
    /// results and must pass `ambiguous_distinct_check` first.
    fn add_missing_columns(
        curr_plan: LogicalPlan,
        missing_cols: &IndexSet<Column>,
        is_distinct: bool,
    ) -> Result<LogicalPlan> {
        match curr_plan {
            // A projection whose input supplies every missing column can be
            // extended in place.
            LogicalPlan::Projection(Projection {
                input,
                mut expr,
                schema: _,
            }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
                let mut missing_exprs = missing_cols
                    .iter()
                    .map(|c| normalize_col(Expr::Column(c.clone()), &input))
                    .collect::<Result<Vec<_>>>()?;

                // Do not re-add columns the projection already outputs.
                missing_exprs.retain(|e| !expr.contains(e));
                if is_distinct {
                    Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
                }
                expr.extend(missing_exprs);
                project(Arc::unwrap_or_clone(input), expr)
            }
            _ => {
                // Keep recursing; remember if we pass through a Distinct.
                let is_distinct =
                    is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
                let new_inputs = curr_plan
                    .inputs()
                    .into_iter()
                    .map(|input_plan| {
                        Self::add_missing_columns(
                            (*input_plan).clone(),
                            missing_cols,
                            is_distinct,
                        )
                    })
                    .collect::<Result<Vec<_>>>()?;
                curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
            }
        }
    }
739
740 fn ambiguous_distinct_check(
741 missing_exprs: &[Expr],
742 missing_cols: &IndexSet<Column>,
743 projection_exprs: &[Expr],
744 ) -> Result<()> {
745 if missing_exprs.is_empty() {
746 return Ok(());
747 }
748
749 let all_aliases = missing_exprs.iter().all(|e| {
757 projection_exprs.iter().any(|proj_expr| {
758 if let Expr::Alias(Alias { expr, .. }) = proj_expr {
759 e == expr.as_ref()
760 } else {
761 false
762 }
763 })
764 });
765 if all_aliases {
766 return Ok(());
767 }
768
769 let missing_col_names = missing_cols
770 .iter()
771 .map(|col| col.flat_name())
772 .collect::<String>();
773
774 plan_err!(
775 "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list"
776 )
777 }
778
    /// Sorts ascending, nulls-last, by the given expressions.
    pub fn sort_by(
        self,
        expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
    ) -> Result<Self> {
        self.sort(
            expr.into_iter()
                .map(|e| e.into().sort(true, false))
                .collect::<Vec<SortExpr>>(),
        )
    }

    /// Sorts by the given sort expressions, without a fetch limit.
    pub fn sort(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
    ) -> Result<Self> {
        self.sort_with_limit(sorts, None)
    }
797
    /// Sorts by `sorts`, optionally fetching only the first `fetch` rows.
    ///
    /// ORDER BY may reference columns not in the current output (e.g. after
    /// a projection); those are temporarily added below the sort and then
    /// stripped again by an outer projection of the original columns.
    pub fn sort_with_limit(
        self,
        sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
        fetch: Option<usize>,
    ) -> Result<Self> {
        // Rewrite sort keys that refer to aggregate expressions.
        let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;

        let schema = self.plan.schema();

        // Collect referenced columns the current schema cannot resolve.
        let mut missing_cols: IndexSet<Column> = IndexSet::new();
        sorts.iter().try_for_each::<_, Result<()>>(|sort| {
            let columns = sort.expr.column_refs();

            missing_cols.extend(
                columns
                    .into_iter()
                    .filter(|c| !schema.has_column(c))
                    .cloned(),
            );

            Ok(())
        })?;

        if missing_cols.is_empty() {
            // Simple case: sort directly over the current plan.
            return Ok(Self::new(LogicalPlan::Sort(Sort {
                expr: normalize_sorts(sorts, &self.plan)?,
                input: self.plan,
                fetch,
            })));
        }

        // Remember the original output columns so we can restore them later.
        let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

        let is_distinct = false;
        let plan = Self::add_missing_columns(
            Arc::unwrap_or_clone(self.plan),
            &missing_cols,
            is_distinct,
        )?;

        let sort_plan = LogicalPlan::Sort(Sort {
            expr: normalize_sorts(sorts, &plan)?,
            input: Arc::new(plan),
            fetch,
        });

        // Strip the helper columns again, restoring the original schema.
        Projection::try_new(new_expr, Arc::new(sort_plan))
            .map(LogicalPlan::Projection)
            .map(Self::new)
    }
851
    /// `UNION ALL` with another plan (columns matched by position).
    pub fn union(self, plan: LogicalPlan) -> Result<Self> {
        union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
    }

    /// `UNION ALL BY NAME`: columns are matched by name, not position.
    pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
        union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
    }

    /// `UNION BY NAME` with duplicate elimination.
    pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
        let right_plan: LogicalPlan = plan;

        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
            union_by_name(left_plan, right_plan)?,
        )))))
    }

    /// `UNION` (distinct) with another plan.
    pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
        let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
        let right_plan: LogicalPlan = plan;

        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
            union(left_plan, right_plan)?,
        )))))
    }

    /// Removes duplicate rows (`SELECT DISTINCT`).
    pub fn distinct(self) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
    }

    /// `DISTINCT ON (on_expr)` projecting `select_expr`, optionally picking
    /// the row within each group according to `sort_expr`.
    pub fn distinct_on(
        self,
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
    ) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
            DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
        ))))
    }
899
    /// Joins with `right` on equality of the given key columns; nulls never
    /// compare equal in the keys.
    pub fn join(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
    ) -> Result<Self> {
        self.join_detailed(
            right,
            join_type,
            join_keys,
            filter,
            NullEquality::NullEqualsNothing,
        )
    }

    /// Joins with `right` on an arbitrary boolean condition; the `on_exprs`
    /// are AND-ed together into a single filter.
    pub fn join_on(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        on_exprs: impl IntoIterator<Item = Expr>,
    ) -> Result<Self> {
        let filter = on_exprs.into_iter().reduce(Expr::and);

        self.join_detailed(
            right,
            join_type,
            (Vec::<Column>::new(), Vec::<Column>::new()),
            filter,
            NullEquality::NullEqualsNothing,
        )
    }

    /// Resolves an unqualified column against `plan`'s schema (with
    /// ambiguity checking); already-qualified columns pass through.
    pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
        if column.relation.is_some() {
            return Ok(column);
        }

        let schema = plan.schema();
        let fallback_schemas = plan.fallback_normalize_schemas();
        let using_columns = plan.using_columns()?;
        column.normalize_with_schemas_and_ambiguity_check(
            &[&[schema], &fallback_schemas],
            &using_columns,
        )
    }
1000
    /// Like [`Self::join`], but lets the caller choose how nulls compare in
    /// the join keys.
    pub fn join_detailed(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
        null_equality: NullEquality,
    ) -> Result<Self> {
        self.join_detailed_with_options(
            right,
            join_type,
            join_keys,
            filter,
            null_equality,
            false,
        )
    }
1024
    /// Full-control join: equi-keys, optional filter, join type, null
    /// comparison semantics, and a null-aware flag.
    ///
    /// Each key pair is resolved to (left, right): qualified keys are
    /// checked against both schemas (and swapped if supplied in reversed
    /// order); unqualified keys are normalized against whichever side can
    /// resolve them.
    pub fn join_detailed_with_options(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
        null_equality: NullEquality,
        null_aware: bool,
    ) -> Result<Self> {
        if join_keys.0.len() != join_keys.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        // Normalize the filter against the combined (left, right) schemas.
        let filter = if let Some(expr) = filter {
            let filter = normalize_col_with_schemas_and_ambiguity_check(
                expr,
                &[&[self.schema(), right.schema()]],
                &[],
            )?;
            Some(filter)
        } else {
            None
        };

        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
            join_keys
                .0
                .into_iter()
                .zip(join_keys.1)
                .map(|(l, r)| {
                    let l = l.into();
                    let r = r.into();

                    match (&l.relation, &r.relation) {
                        // Both keys qualified: determine which side each
                        // belongs to, swapping if they were given reversed.
                        (Some(lr), Some(rr)) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only the first key is qualified.
                        (Some(lr), None) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);

                            match (l_is_left, l_is_right) {
                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only the second key is qualified.
                        (None, Some(rr)) => {
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (r_is_left, r_is_right) {
                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Neither qualified: try resolving `l` on the left;
                        // if that fails, fall back to the right and swap.
                        (None, None) => {
                            let mut swap = false;
                            let left_key = Self::normalize(&self.plan, l.clone())
                                .or_else(|_| {
                                    swap = true;
                                    Self::normalize(&right, l)
                                });
                            if swap {
                                (Self::normalize(&self.plan, r), left_key)
                            } else {
                                (left_key, Self::normalize(&right, r))
                            }
                        }
                    }
                })
                .unzip();

        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

        let on: Vec<_> = left_keys
            .into_iter()
            .zip(right_keys)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();
        let join_schema =
            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

        // Inner joins may be condition-free (cross join); every other join
        // type requires some condition.
        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
            return plan_err!("join condition should not be empty");
        }

        Ok(Self::new(LogicalPlan::Join(Join {
            left: self.plan,
            right: Arc::new(right),
            on,
            filter,
            join_type,
            join_constraint: JoinConstraint::On,
            schema: DFSchemaRef::new(join_schema),
            null_equality,
            null_aware,
        })))
    }
1153
    /// `JOIN ... USING(columns)`: equality on identically-named columns.
    ///
    /// Hashable keys become equi-join keys; non-hashable ones are folded
    /// into the join filter. If no hashable key remains, a cross join plus
    /// filter is produced instead.
    pub fn join_using(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        using_keys: Vec<Column>,
    ) -> Result<Self> {
        let left_keys: Vec<Column> = using_keys
            .clone()
            .into_iter()
            .map(|c| Self::normalize(&self.plan, c))
            .collect::<Result<_>>()?;
        let right_keys: Vec<Column> = using_keys
            .into_iter()
            .map(|c| Self::normalize(&right, c))
            .collect::<Result<_>>()?;

        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
        let mut join_on: Vec<(Expr, Expr)> = vec![];
        let mut filters: Option<Expr> = None;
        for (l, r) in &on {
            if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        l,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
            } else if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                // NOTE(review): this branch looks up `r` (the right-side
                // key) in the LEFT schema and pushes the pair swapped as
                // (r, l). If `r` is absent from the left schema the `?`
                // propagates an error instead of falling through to the
                // filter case below — confirm this asymmetry with the
                // first branch is intended.
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        r,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
            } else {
                // Not hashable: turn the equality into a filter predicate.
                let expr = binary_expr(
                    Expr::Column(l.clone()),
                    Operator::Eq,
                    Expr::Column(r.clone()),
                );
                match filters {
                    None => filters = Some(expr),
                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
                }
            }
        }

        if join_on.is_empty() {
            // No usable equi-key: cross join, then filter on the predicates.
            let join = Self::from(self.plan).cross_join(right)?;
            join.filter(filters.ok_or_else(|| {
                internal_datafusion_err!("filters should not be None here")
            })?)
        } else {
            let join = Join::try_new(
                self.plan,
                Arc::new(right),
                join_on,
                filters,
                join_type,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            Ok(Self::new(LogicalPlan::Join(join)))
        }
    }
1230
    /// Cross join (cartesian product) with `right`.
    pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
        // A cross join is an inner join with no keys and no filter.
        let join = Join::try_new(
            self.plan,
            Arc::new(right),
            vec![],
            None,
            JoinType::Inner,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
            false,
        )?;

        Ok(Self::new(LogicalPlan::Join(join)))
    }

    /// Repartitions the plan's output with the given scheme.
    pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
        Ok(Self::new(LogicalPlan::Repartition(Repartition {
            input: self.plan,
            partitioning_scheme,
        })))
    }

    /// Adds a `Window` node evaluating the given window expressions.
    pub fn window(
        self,
        window_expr: impl IntoIterator<Item = impl Into<Expr>>,
    ) -> Result<Self> {
        let window_expr = normalize_cols(window_expr, &self.plan)?;
        // Window expressions must have distinct output names.
        validate_unique_names("Windows", &window_expr)?;
        Ok(Self::new(LogicalPlan::Window(Window::try_new(
            window_expr,
            self.plan,
        )?)))
    }
1267
    /// Adds an `Aggregate` node grouping by `group_expr` and computing
    /// `aggr_expr`. When the builder options enable it, group-by
    /// expressions implied by functional dependencies are added implicitly.
    pub fn aggregate(
        self,
        group_expr: impl IntoIterator<Item = impl Into<Expr>>,
        aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
    ) -> Result<Self> {
        let group_expr = normalize_cols(group_expr, &self.plan)?;
        let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;

        let group_expr = if self.options.add_implicit_group_by_exprs {
            add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
        } else {
            group_expr
        };

        Aggregate::try_new(self.plan, group_expr, aggr_expr)
            .map(LogicalPlan::Aggregate)
            .map(Self::new)
    }
1289
    /// Wraps the plan in `EXPLAIN` (or `EXPLAIN ANALYZE` when `analyze`).
    pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
        self.explain_option_format(
            ExplainOption::default()
                .with_verbose(verbose)
                .with_analyze(analyze),
        )
    }

    /// Wraps the plan in `EXPLAIN` using the full set of explain options.
    pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
        let schema = LogicalPlan::explain_schema();
        let schema = schema.to_dfschema_ref()?;

        if explain_option.analyze {
            // EXPLAIN ANALYZE executes the plan and reports metrics.
            Ok(Self::new(LogicalPlan::Analyze(Analyze {
                verbose: explain_option.verbose,
                input: self.plan,
                schema,
            })))
        } else {
            // Plain EXPLAIN records the initial plan now; optimizer passes
            // append further stringified plans later.
            let stringified_plans =
                vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];

            Ok(Self::new(LogicalPlan::Explain(Explain {
                verbose: explain_option.verbose,
                plan: self.plan,
                explain_format: explain_option.format,
                stringified_plans,
                schema,
                logical_optimization_succeeded: false,
            })))
        }
    }
1332
    /// `INTERSECT` (or `INTERSECT ALL`) of two plans.
    pub fn intersect(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        // INTERSECT is a left-semi join on all columns.
        LogicalPlanBuilder::intersect_or_except(
            left_plan,
            right_plan,
            JoinType::LeftSemi,
            is_all,
        )
    }

    /// `EXCEPT` (or `EXCEPT ALL`) of two plans.
    pub fn except(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        // EXCEPT is a left-anti join on all columns.
        LogicalPlanBuilder::intersect_or_except(
            left_plan,
            right_plan,
            JoinType::LeftAnti,
            is_all,
        )
    }

    /// Shared implementation of INTERSECT/EXCEPT via semi/anti joins on all
    /// columns with null-equals-null key semantics. Non-ALL variants add a
    /// DISTINCT on the left input.
    fn intersect_or_except(
        left_plan: LogicalPlan,
        right_plan: LogicalPlan,
        join_type: JoinType,
        is_all: bool,
    ) -> Result<LogicalPlan> {
        let left_len = left_plan.schema().fields().len();
        let right_len = right_plan.schema().fields().len();

        if left_len != right_len {
            return plan_err!(
                "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
            );
        }

        // Requalify sides so set operations over the same table don't
        // produce ambiguous column references.
        let left_builder = LogicalPlanBuilder::from(left_plan);
        let right_builder = LogicalPlanBuilder::from(right_plan);
        let (left_builder, right_builder, _requalified) =
            requalify_sides_if_needed(left_builder, right_builder)?;
        let left_plan = left_builder.build()?;
        let right_plan = right_builder.build()?;

        // Pair columns positionally as join keys.
        let join_keys = left_plan
            .schema()
            .fields()
            .iter()
            .zip(right_plan.schema().fields().iter())
            .map(|(left_field, right_field)| {
                (
                    (Column::from_name(left_field.name())),
                    (Column::from_name(right_field.name())),
                )
            })
            .unzip();
        if is_all {
            LogicalPlanBuilder::from(left_plan)
                .join_detailed(
                    right_plan,
                    join_type,
                    join_keys,
                    None,
                    NullEquality::NullEqualsNull,
                )?
                .build()
        } else {
            LogicalPlanBuilder::from(left_plan)
                .distinct()?
                .join_detailed(
                    right_plan,
                    join_type,
                    join_keys,
                    None,
                    NullEquality::NullEqualsNull,
                )?
                .build()
        }
    }
1421
    /// Consumes the builder, returning the completed plan.
    pub fn build(self) -> Result<LogicalPlan> {
        Ok(Arc::unwrap_or_clone(self.plan))
    }
1426
    /// Joins with `right` using expression (not just column) equi-keys.
    ///
    /// Each key expression is normalized against its own side's schema and
    /// validated to reference exactly one input.
    pub fn join_with_expr_keys(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
        filter: Option<Expr>,
    ) -> Result<Self> {
        if equi_exprs.0.len() != equi_exprs.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        let join_key_pairs = equi_exprs
            .0
            .into_iter()
            .zip(equi_exprs.1)
            .map(|(l, r)| {
                let left_key = l.into();
                let right_key = r.into();
                let mut left_using_columns = HashSet::new();
                expr_to_columns(&left_key, &mut left_using_columns)?;
                let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
                    left_key,
                    &[&[self.plan.schema()]],
                    &[],
                )?;

                let mut right_using_columns = HashSet::new();
                expr_to_columns(&right_key, &mut right_using_columns)?;
                let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
                    right_key,
                    &[&[right.schema()]],
                    &[],
                )?;

                // Reject keys that straddle both inputs or neither.
                find_valid_equijoin_key_pair(
                    &normalized_left_key,
                    &normalized_right_key,
                    self.plan.schema(),
                    right.schema(),
                )?.ok_or_else(||
                    plan_datafusion_err!(
                        "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
                    ))
            })
            .collect::<Result<Vec<_>>>()?;

        let join = Join::try_new(
            self.plan,
            Arc::new(right),
            join_key_pairs,
            filter,
            join_type,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
            false,
        )?;

        Ok(Self::new(LogicalPlan::Join(join)))
    }
1501
    /// Unnests a single column with default options.
    pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
        unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
    }

    /// Unnests a single column with the given options.
    pub fn unnest_column_with_options(
        self,
        column: impl Into<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        unnest_with_options(
            Arc::unwrap_or_clone(self.plan),
            vec![column.into()],
            options,
        )
        .map(Self::new)
    }

    /// Unnests multiple columns at once with the given options.
    pub fn unnest_columns_with_options(
        self,
        columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
            .map(Self::new)
    }
1530}
1531
1532impl From<LogicalPlan> for LogicalPlanBuilder {
1533 fn from(plan: LogicalPlan) -> Self {
1534 LogicalPlanBuilder::new(plan)
1535 }
1536}
1537
1538impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1539 fn from(plan: Arc<LogicalPlan>) -> Self {
1540 LogicalPlanBuilder::new_from_arc(plan)
1541 }
1542}
1543
/// Accumulates the fields of a `VALUES` node's schema; fields are
/// auto-named `column1`, `column2`, ... in insertion order.
#[derive(Default)]
struct ValuesFields {
    // Fields collected so far, in positional order.
    inner: Vec<Field>,
}
1549
1550impl ValuesFields {
1551 pub fn new() -> Self {
1552 Self::default()
1553 }
1554
1555 pub fn push(&mut self, data_type: DataType, nullable: bool) {
1556 self.push_with_metadata(data_type, nullable, None);
1557 }
1558
1559 pub fn push_with_metadata(
1560 &mut self,
1561 data_type: DataType,
1562 nullable: bool,
1563 metadata: Option<FieldMetadata>,
1564 ) {
1565 let name = format!("column{}", self.inner.len() + 1);
1568 let mut field = Field::new(name, data_type, nullable);
1569 if let Some(metadata) = metadata {
1570 field.set_metadata(metadata.to_hashmap());
1571 }
1572 self.inner.push(field);
1573 }
1574
1575 pub fn into_fields(self) -> Fields {
1576 self.inner.into()
1577 }
1578}
1579
1580pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1592 let mut name_map = HashMap::<&str, usize>::new();
1600 let mut seen = HashSet::<Cow<String>>::new();
1602
1603 fields
1604 .iter()
1605 .map(|field| {
1606 let original_name = field.name();
1607 let mut name = Cow::Borrowed(original_name);
1608
1609 let count = name_map.entry(original_name).or_insert(0);
1610
1611 while seen.contains(&name) {
1613 *count += 1;
1614 name = Cow::Owned(format!("{original_name}:{count}"));
1615 }
1616
1617 seen.insert(name.clone());
1618
1619 match name {
1620 Cow::Borrowed(_) => None,
1621 Cow::Owned(alias) => Some(alias),
1622 }
1623 })
1624 .collect()
1625}
1626
1627fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1628 let mut table_references = schema
1629 .iter()
1630 .filter_map(|(qualifier, _)| qualifier)
1631 .collect::<Vec<_>>();
1632 table_references.dedup();
1633 let table_reference = if table_references.len() == 1 {
1634 table_references.pop().cloned()
1635 } else {
1636 None
1637 };
1638
1639 (
1640 table_reference,
1641 Arc::new(Field::new("mark", DataType::Boolean, false)),
1642 )
1643}
1644
/// Creates the output schema of a join: left-side fields first, then
/// right-side fields, with nullability widened on whichever side(s) may fail
/// to produce a match for the given `join_type`. Semi/anti/mark joins only
/// emit one side (mark joins add a synthetic boolean "mark" column).
pub fn build_join_schema(
    left: &DFSchema,
    right: &DFSchema,
    join_type: &JoinType,
) -> Result<DFSchema> {
    // Clones `fields`, forcing each field nullable — used for the side that
    // may be absent in an outer join.
    fn nullify_fields<'a>(
        fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
    ) -> Vec<(Option<TableReference>, Arc<Field>)> {
        fields
            .map(|(q, f)| {
                let field = f.as_ref().clone().with_nullable(true);
                (q.cloned(), Arc::new(field))
            })
            .collect()
    }

    let right_fields = right.iter();
    let left_fields = left.iter();

    let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
        JoinType::Inner => {
            // Both sides always match: nullability unchanged.
            let left_fields = left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            let right_fields = right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            left_fields.into_iter().chain(right_fields).collect()
        }
        JoinType::Left => {
            // Right side may be missing: its fields become nullable.
            let left_fields = left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            left_fields
                .into_iter()
                .chain(nullify_fields(right_fields))
                .collect()
        }
        JoinType::Right => {
            // Left side may be missing: its fields become nullable.
            let right_fields = right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect::<Vec<_>>();
            nullify_fields(left_fields)
                .into_iter()
                .chain(right_fields)
                .collect()
        }
        JoinType::Full => {
            // Either side may be missing: everything becomes nullable.
            nullify_fields(left_fields)
                .into_iter()
                .chain(nullify_fields(right_fields))
                .collect()
        }
        JoinType::LeftSemi | JoinType::LeftAnti => {
            // Only the left side is emitted.
            left_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect()
        }
        // Left side plus a synthetic non-null boolean "mark" column.
        JoinType::LeftMark => left_fields
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .chain(once(mark_field(right)))
            .collect(),
        JoinType::RightSemi | JoinType::RightAnti => {
            // Only the right side is emitted.
            right_fields
                .map(|(q, f)| (q.cloned(), Arc::clone(f)))
                .collect()
        }
        // Right side plus the synthetic "mark" column.
        JoinType::RightMark => right_fields
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .chain(once(mark_field(left)))
            .collect(),
    };
    // Combine both inputs' functional dependencies; the left field count is
    // needed to offset right-side indices in the joined schema.
    let func_dependencies = left.functional_dependencies().join(
        right.functional_dependencies(),
        join_type,
        left.fields().len(),
    );

    // Merge schema-level metadata. `schema2` is chained last, so on duplicate
    // keys its entries win: the right schema for right-flavored joins, the
    // left schema otherwise.
    let (schema1, schema2) = match join_type {
        JoinType::Right
        | JoinType::RightSemi
        | JoinType::RightAnti
        | JoinType::RightMark => (left, right),
        _ => (right, left),
    };

    let metadata = schema1
        .metadata()
        .clone()
        .into_iter()
        .chain(schema2.metadata().clone())
        .collect();

    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
    dfschema.with_functional_dependencies(func_dependencies)
}
1750
1751pub fn requalify_sides_if_needed(
1761 left: LogicalPlanBuilder,
1762 right: LogicalPlanBuilder,
1763) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1764 let left_cols = left.schema().columns();
1765 let right_cols = right.schema().columns();
1766
1767 for l in &left_cols {
1781 for r in &right_cols {
1782 if l.name != r.name {
1783 continue;
1784 }
1785
1786 match (&l.relation, &r.relation) {
1788 (Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
1790 return Ok((
1791 left.alias(TableReference::bare("left"))?,
1792 right.alias(TableReference::bare("right"))?,
1793 true,
1794 ));
1795 }
1796 (None, None) => {
1798 return Ok((
1799 left.alias(TableReference::bare("left"))?,
1800 right.alias(TableReference::bare("right"))?,
1801 true,
1802 ));
1803 }
1804 (Some(_), None) | (None, Some(_)) => {
1806 return Ok((
1807 left.alias(TableReference::bare("left"))?,
1808 right.alias(TableReference::bare("right"))?,
1809 true,
1810 ));
1811 }
1812 _ => {}
1814 }
1815 }
1816 }
1817
1818 Ok((left, right, false))
1820}
1821pub fn add_group_by_exprs_from_dependencies(
1831 mut group_expr: Vec<Expr>,
1832 schema: &DFSchemaRef,
1833) -> Result<Vec<Expr>> {
1834 let mut group_by_field_names = group_expr
1837 .iter()
1838 .map(|e| e.schema_name().to_string())
1839 .collect::<Vec<_>>();
1840
1841 if let Some(target_indices) =
1842 get_target_functional_dependencies(schema, &group_by_field_names)
1843 {
1844 for idx in target_indices {
1845 let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1846 let expr_name = expr.schema_name().to_string();
1847 if !group_by_field_names.contains(&expr_name) {
1848 group_by_field_names.push(expr_name);
1849 group_expr.push(expr);
1850 }
1851 }
1852 }
1853 Ok(group_expr)
1854}
1855
1856pub fn validate_unique_names<'a>(
1858 node_name: &str,
1859 expressions: impl IntoIterator<Item = &'a Expr>,
1860) -> Result<()> {
1861 let mut unique_names = HashMap::new();
1862
1863 expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1864 let name = expr.schema_name().to_string();
1865 match unique_names.get(&name) {
1866 None => {
1867 unique_names.insert(name, (position, expr));
1868 Ok(())
1869 },
1870 Some((existing_position, existing_expr)) => {
1871 plan_err!("{node_name} require unique expression names \
1872 but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1873 at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1874 )
1875 }
1876 }
1877 })
1878}
1879
1880pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1892 Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1893 Arc::new(left_plan),
1894 Arc::new(right_plan),
1895 ])?))
1896}
1897
1898pub fn union_by_name(
1901 left_plan: LogicalPlan,
1902 right_plan: LogicalPlan,
1903) -> Result<LogicalPlan> {
1904 Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1905 Arc::new(left_plan),
1906 Arc::new(right_plan),
1907 ])?))
1908}
1909
1910pub fn project(
1916 plan: LogicalPlan,
1917 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1918) -> Result<LogicalPlan> {
1919 project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1920}
1921
1922fn project_with_validation(
1930 plan: LogicalPlan,
1931 expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1932) -> Result<LogicalPlan> {
1933 let mut projected_expr = vec![];
1934 for (e, validate) in expr {
1935 let e = e.into();
1936 match e {
1937 SelectExpr::Wildcard(opt) => {
1938 let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1939
1940 let expanded = if let Some(replace) = opt.replace {
1943 replace_columns(expanded, &replace)?
1944 } else {
1945 expanded
1946 };
1947
1948 for e in expanded {
1949 if validate {
1950 projected_expr
1951 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1952 } else {
1953 projected_expr.push(e)
1954 }
1955 }
1956 }
1957 SelectExpr::QualifiedWildcard(table_ref, opt) => {
1958 let expanded =
1959 expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1960
1961 let expanded = if let Some(replace) = opt.replace {
1964 replace_columns(expanded, &replace)?
1965 } else {
1966 expanded
1967 };
1968
1969 for e in expanded {
1970 if validate {
1971 projected_expr
1972 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1973 } else {
1974 projected_expr.push(e)
1975 }
1976 }
1977 }
1978 SelectExpr::Expression(e) => {
1979 if validate {
1980 projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1981 } else {
1982 projected_expr.push(e)
1983 }
1984 }
1985 }
1986 }
1987 validate_unique_names("Projections", projected_expr.iter())?;
1988
1989 Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1990}
1991
1992fn replace_columns(
1997 mut exprs: Vec<Expr>,
1998 replace: &PlannedReplaceSelectItem,
1999) -> Result<Vec<Expr>> {
2000 for expr in exprs.iter_mut() {
2001 if let Expr::Column(Column { name, .. }) = expr
2002 && let Some((_, new_expr)) = replace
2003 .items()
2004 .iter()
2005 .zip(replace.expressions().iter())
2006 .find(|(item, _)| item.column_name.value == *name)
2007 {
2008 *expr = new_expr.clone().alias(name.clone())
2009 }
2010 }
2011 Ok(exprs)
2012}
2013
2014pub fn subquery_alias(
2016 plan: LogicalPlan,
2017 alias: impl Into<TableReference>,
2018) -> Result<LogicalPlan> {
2019 SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
2020}
2021
/// Creates a table-scan builder over `table_schema`, optionally named and
/// projected; an unnamed scan uses [`UNNAMED_TABLE`]. No filters are pushed.
pub fn table_scan(
    name: Option<impl Into<TableReference>>,
    table_schema: &Schema,
    projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
    table_scan_with_filters(name, table_schema, projection, vec![])
}
2031
2032pub fn table_scan_with_filters(
2036 name: Option<impl Into<TableReference>>,
2037 table_schema: &Schema,
2038 projection: Option<Vec<usize>>,
2039 filters: Vec<Expr>,
2040) -> Result<LogicalPlanBuilder> {
2041 let table_source = table_source(table_schema);
2042 let name = name
2043 .map(|n| n.into())
2044 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2045 LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
2046}
2047
2048pub fn table_scan_with_filter_and_fetch(
2052 name: Option<impl Into<TableReference>>,
2053 table_schema: &Schema,
2054 projection: Option<Vec<usize>>,
2055 filters: Vec<Expr>,
2056 fetch: Option<usize>,
2057) -> Result<LogicalPlanBuilder> {
2058 let table_source = table_source(table_schema);
2059 let name = name
2060 .map(|n| n.into())
2061 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2062 LogicalPlanBuilder::scan_with_filters_fetch(
2063 name,
2064 table_source,
2065 projection,
2066 filters,
2067 fetch,
2068 )
2069}
2070
2071pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
2072 let table_schema = Arc::new(table_schema.clone());
2074 Arc::new(LogicalTableSource {
2075 table_schema,
2076 constraints: Default::default(),
2077 })
2078}
2079
2080pub fn table_source_with_constraints(
2081 table_schema: &Schema,
2082 constraints: Constraints,
2083) -> Arc<dyn TableSource> {
2084 let table_schema = Arc::new(table_schema.clone());
2086 Arc::new(LogicalTableSource {
2087 table_schema,
2088 constraints,
2089 })
2090}
2091
2092pub fn wrap_projection_for_join_if_necessary(
2094 join_keys: &[Expr],
2095 input: LogicalPlan,
2096) -> Result<(LogicalPlan, Vec<Column>, bool)> {
2097 let input_schema = input.schema();
2098 let alias_join_keys: Vec<Expr> = join_keys
2099 .iter()
2100 .map(|key| {
2101 if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2110 let alias = format!("{key}");
2111 key.clone().alias(alias)
2112 } else {
2113 key.clone()
2114 }
2115 })
2116 .collect::<Vec<_>>();
2117
2118 let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2119 let plan = if need_project {
2120 let mut projection = input_schema
2122 .columns()
2123 .into_iter()
2124 .map(Expr::Column)
2125 .collect::<Vec<_>>();
2126 let join_key_items = alias_join_keys
2127 .iter()
2128 .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2129 .cloned()
2130 .collect::<HashSet<Expr>>();
2131 projection.extend(join_key_items);
2132
2133 LogicalPlanBuilder::from(input)
2134 .project(projection.into_iter().map(SelectExpr::from))?
2135 .build()?
2136 } else {
2137 input
2138 };
2139
2140 let join_on = alias_join_keys
2141 .into_iter()
2142 .map(|key| {
2143 if let Some(col) = key.try_as_col() {
2144 Ok(col.clone())
2145 } else {
2146 let name = key.schema_name().to_string();
2147 Ok(Column::from_name(name))
2148 }
2149 })
2150 .collect::<Result<Vec<_>>>()?;
2151
2152 Ok((plan, join_on, need_project))
2153}
2154
/// Minimal [`TableSource`] backed only by a schema and optional constraints,
/// suitable for logical planning and tests (it provides no data).
pub struct LogicalTableSource {
    // Schema reported by this source.
    table_schema: SchemaRef,
    // Table constraints (e.g. primary keys) reported by this source.
    constraints: Constraints,
}
2162
2163impl LogicalTableSource {
2164 pub fn new(table_schema: SchemaRef) -> Self {
2166 Self {
2167 table_schema,
2168 constraints: Constraints::default(),
2169 }
2170 }
2171
2172 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2173 self.constraints = constraints;
2174 self
2175 }
2176}
2177
2178impl TableSource for LogicalTableSource {
2179 fn as_any(&self) -> &dyn Any {
2180 self
2181 }
2182
2183 fn schema(&self) -> SchemaRef {
2184 Arc::clone(&self.table_schema)
2185 }
2186
2187 fn constraints(&self) -> Option<&Constraints> {
2188 Some(&self.constraints)
2189 }
2190
2191 fn supports_filters_pushdown(
2192 &self,
2193 filters: &[&Expr],
2194 ) -> Result<Vec<TableProviderFilterPushDown>> {
2195 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2196 }
2197}
2198
/// Creates an [`Unnest`] plan over `columns` using default [`UnnestOptions`].
pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
    unnest_with_options(input, columns, UnnestOptions::default())
}
2203
2204pub fn get_struct_unnested_columns(
2205 col_name: &String,
2206 inner_fields: &Fields,
2207) -> Vec<Column> {
2208 inner_fields
2209 .iter()
2210 .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2211 .collect()
2212}
2213
2214pub fn unnest_with_options(
2244 input: LogicalPlan,
2245 columns_to_unnest: Vec<Column>,
2246 options: UnnestOptions,
2247) -> Result<LogicalPlan> {
2248 Ok(LogicalPlan::Unnest(Unnest::try_new(
2249 Arc::new(input),
2250 columns_to_unnest,
2251 options,
2252 )?))
2253}
2254
2255#[cfg(test)]
2256mod tests {
2257 use std::vec;
2258
2259 use super::*;
2260 use crate::lit_with_metadata;
2261 use crate::logical_plan::StringifiedPlan;
2262 use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2263
2264 use crate::test::function_stub::sum;
2265 use datafusion_common::{
2266 Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
2267 };
2268 use insta::assert_snapshot;
2269
    // Filter + project over a projected scan renders in the expected order.
    #[test]
    fn plan_builder_simple() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .filter(col("state").eq(lit("CO")))?
                .project(vec![col("id")])?
                .build()?;

        assert_snapshot!(plan, @r#"
        Projection: employee_csv.id
          Filter: employee_csv.state = Utf8("CO")
            TableScan: employee_csv projection=[id, state]
        "#);

        Ok(())
    }
2286
    #[test]
    fn plan_builder_schema() {
        let schema = employee_schema();
        let projection = None;
        let plan =
            LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
                .unwrap();
        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");

        // An upper-cased scan name still yields lowercase `employee_csv`
        // qualifiers in the schema (see the identical snapshot below).
        let projection = None;
        let plan =
            LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
                .unwrap();
        assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
    }
2304
    // An empty table name is rejected at planning time.
    #[test]
    fn plan_builder_empty_name() {
        let schema = employee_schema();
        let projection = None;
        let err =
            LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
        assert_snapshot!(
            err.strip_backtrace(),
            @"Error during planning: table_name cannot be empty"
        );
    }
2316
    // The (asc, nulls_first) flags of `Sort::new` render as
    // "ASC NULLS FIRST" / "DESC NULLS LAST" respectively.
    #[test]
    fn plan_builder_sort() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
                .sort(vec![
                    expr::Sort::new(col("state"), true, true),
                    expr::Sort::new(col("salary"), false, false),
                ])?
                .build()?;

        assert_snapshot!(plan, @r"
        Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
          TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2334
    // Chained `union` calls nest left-deep: each call wraps the previous
    // result and the new input in another Union node.
    #[test]
    fn plan_builder_union() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        let plan = plan
            .clone()
            .union(plan.clone().build()?)?
            .union(plan.clone().build()?)?
            .union(plan.build()?)?
            .build()?;

        assert_snapshot!(plan, @r"
        Union
          Union
            Union
              TableScan: employee_csv projection=[state, salary]
              TableScan: employee_csv projection=[state, salary]
            TableScan: employee_csv projection=[state, salary]
          TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2359
    // `union_distinct` wraps every Union level in its own Distinct node.
    #[test]
    fn plan_builder_union_distinct() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;

        let plan = plan
            .clone()
            .union_distinct(plan.clone().build()?)?
            .union_distinct(plan.clone().build()?)?
            .union_distinct(plan.build()?)?
            .build()?;

        assert_snapshot!(plan, @r"
        Distinct:
          Union
            Distinct:
              Union
                Distinct:
                  Union
                    TableScan: employee_csv projection=[state, salary]
                    TableScan: employee_csv projection=[state, salary]
                TableScan: employee_csv projection=[state, salary]
            TableScan: employee_csv projection=[state, salary]
        ");

        Ok(())
    }
2387
    // `distinct()` adds a Distinct node on top of the current plan.
    #[test]
    fn plan_builder_simple_distinct() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .filter(col("state").eq(lit("CO")))?
                .project(vec![col("id")])?
                .distinct()?
                .build()?;

        assert_snapshot!(plan, @r#"
        Distinct:
          Projection: employee_csv.id
            Filter: employee_csv.state = Utf8("CO")
              TableScan: employee_csv projection=[id, state]
        "#);

        Ok(())
    }
2406
    // An EXISTS filter renders its correlated subquery inline under the Filter.
    #[test]
    fn exists_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        // Correlated: references `bar.a` from the outer query.
        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(exists(Arc::new(subquery)))?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Filter: EXISTS (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.a
                TableScan: foo
          Projection: bar.a
            TableScan: bar
        ");

        Ok(())
    }
2434
    // An IN-subquery filter renders the correlated subquery under the Filter.
    #[test]
    fn filter_in_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        // Correlated: references `bar.a` from the outer query.
        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("a")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        // SELECT a FROM bar WHERE a IN (SELECT a FROM foo WHERE a = bar.a)
        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![col("a")])?
            .filter(in_subquery(col("a"), Arc::new(subquery)))?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Filter: bar.a IN (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.a
                TableScan: foo
          Projection: bar.a
            TableScan: bar
        ");

        Ok(())
    }
2463
    // A scalar subquery in the projection list renders under the Projection.
    #[test]
    fn select_scalar_subquery() -> Result<()> {
        let foo = test_table_scan_with_name("foo")?;
        let bar = test_table_scan_with_name("bar")?;

        let subquery = LogicalPlanBuilder::from(foo)
            .project(vec![col("b")])?
            .filter(col("a").eq(col("bar.a")))?
            .build()?;

        // SELECT (SELECT b FROM foo WHERE a = bar.a) FROM bar
        let outer_query = LogicalPlanBuilder::from(bar)
            .project(vec![scalar_subquery(Arc::new(subquery))])?
            .build()?;

        assert_snapshot!(outer_query, @r"
        Projection: (<subquery>)
          Subquery:
            Filter: foo.a = bar.a
              Projection: foo.b
                TableScan: foo
          TableScan: bar
        ");

        Ok(())
    }
2490
    // Projecting two expressions that resolve to the same name ("id") must
    // fail with an AmbiguousReference schema error naming the clashing column.
    #[test]
    fn projection_non_unique_names() -> Result<()> {
        let plan = table_scan(
            Some("employee_csv"),
            &employee_schema(),
            // project `id` and `first_name` by column index
            Some(vec![0, 1]),
        )?
        .project(vec![col("id"), col("first_name").alias("id")]);

        match plan {
            Err(DataFusionError::SchemaError(err, _)) => {
                if let SchemaError::AmbiguousReference { field } = *err {
                    let Column {
                        relation,
                        name,
                        spans: _,
                    } = *field;
                    let Some(TableReference::Bare { table }) = relation else {
                        return plan_err!(
                            "wrong relation: {relation:?}, expected table name"
                        );
                    };
                    assert_eq!(*"employee_csv", *table);
                    assert_eq!("id", &name);
                    Ok(())
                } else {
                    plan_err!("Plan should have returned an DataFusionError::SchemaError")
                }
            }
            _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
        }
    }
2525
    // Five-column, all-non-nullable schema shared by most tests in this module.
    fn employee_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, false),
            Field::new("last_name", DataType::Utf8, false),
            Field::new("state", DataType::Utf8, false),
            Field::new("salary", DataType::Int32, false),
        ])
    }
2535
2536 #[test]
2537 fn stringified_plan() {
2538 let stringified_plan =
2539 StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2540 assert!(stringified_plan.should_display(true));
2541 assert!(!stringified_plan.should_display(false)); let stringified_plan =
2544 StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2545 assert!(stringified_plan.should_display(true));
2546 assert!(stringified_plan.should_display(false)); let stringified_plan =
2549 StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2550 assert!(stringified_plan.should_display(true));
2551 assert!(!stringified_plan.should_display(false)); let stringified_plan =
2554 StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2555 assert!(stringified_plan.should_display(true));
2556 assert!(stringified_plan.should_display(false)); let stringified_plan = StringifiedPlan::new(
2559 PlanType::OptimizedLogicalPlan {
2560 optimizer_name: "random opt pass".into(),
2561 },
2562 "...the plan...",
2563 );
2564 assert!(stringified_plan.should_display(true));
2565 assert!(!stringified_plan.should_display(false));
2566 }
2567
    // Builds an unprojected scan over a (a, b, c) UInt32 table with `name`.
    fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::UInt32, false),
            Field::new("b", DataType::UInt32, false),
            Field::new("c", DataType::UInt32, false),
        ]);
        table_scan(Some(name), &schema, None)?.build()
    }
2576
    // INTERSECT of inputs with different column counts is a planning error.
    #[test]
    fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
        let plan1 =
            table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
        let plan2 =
            table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;

        let err_msg1 =
            LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
                .unwrap_err();

        assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");

        Ok(())
    }
2592
    // Exercises unnest over scalars (error), lists, structs, chained calls,
    // multi-column calls, missing columns, and recursive (multi-depth) unnest.
    #[test]
    fn plan_builder_unnest() -> Result<()> {
        // Unnesting a scalar column is rejected with an internal error.
        let err = nested_table_scan("test_table")?
            .unnest_column("scalar")
            .unwrap_err();

        let DataFusionError::Internal(desc) = err else {
            return plan_err!("Plan should have returned an DataFusionError::Internal");
        };

        // Strip the backtrace portion of the message before snapshotting.
        let desc = (*desc
            .split(DataFusionError::BACK_TRACE_SEP)
            .collect::<Vec<&str>>()
            .first()
            .unwrap_or(&""))
        .to_string();

        assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");

        // Unnesting a list column yields its element type in the schema.
        let plan = nested_table_scan("test_table")?
            .unnest_column("strings")?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[test_table.strings|depth=1] structs[]
          TableScan: test_table
        ");

        let field = plan.schema().field_with_name(None, "strings").unwrap();
        assert_eq!(&DataType::Utf8, field.data_type());

        // Unnesting a struct column flattens it into one column per member.
        let plan = nested_table_scan("test_table")?
            .unnest_column("struct_singular")?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[] structs[test_table.struct_singular]
          TableScan: test_table
        ");

        for field_name in &["a", "b"] {
            // Flattened columns are named "struct_singular.<member>".
            let field = plan
                .schema()
                .field_with_name(None, &format!("struct_singular.{field_name}"))
                .unwrap();
            assert_eq!(&DataType::UInt32, field.data_type());
        }

        // Chained unnest calls stack one Unnest node per call.
        let plan = nested_table_scan("test_table")?
            .unnest_column("strings")?
            .unnest_column("structs")?
            .unnest_column("struct_singular")?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[] structs[test_table.struct_singular]
          Unnest: lists[test_table.structs|depth=1] structs[]
            Unnest: lists[test_table.strings|depth=1] structs[]
              TableScan: test_table
        ");

        let field = plan.schema().field_with_name(None, "structs").unwrap();
        assert!(matches!(field.data_type(), DataType::Struct(_)));

        // Unnesting multiple columns in one call yields a single Unnest node.
        let cols = vec!["strings", "structs", "struct_singular"]
            .into_iter()
            .map(|c| c.into())
            .collect();

        let plan = nested_table_scan("test_table")?
            .unnest_columns_with_options(cols, UnnestOptions::default())?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
          TableScan: test_table
        ");

        // Unnesting a column that does not exist is an error.
        let plan = nested_table_scan("test_table")?.unnest_column("missing");
        assert!(plan.is_err());

        // Recursive unnest: each configured depth produces its own output column.
        let plan = nested_table_scan("test_table")?
            .unnest_columns_with_options(
                vec!["stringss".into(), "struct_singular".into()],
                UnnestOptions::default()
                    .with_recursions(RecursionUnnestOption {
                        input_column: "stringss".into(),
                        output_column: "stringss_depth_1".into(),
                        depth: 1,
                    })
                    .with_recursions(RecursionUnnestOption {
                        input_column: "stringss".into(),
                        output_column: "stringss_depth_2".into(),
                        depth: 2,
                    }),
            )?
            .build()?;

        assert_snapshot!(plan, @r"
        Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
          TableScan: test_table
        ");

        // Depth 1 unwraps one list level; depth 2 reaches the string elements.
        let field = plan
            .schema()
            .field_with_name(None, "stringss_depth_1")
            .unwrap();
        assert_eq!(
            &DataType::new_list(DataType::Utf8, false),
            field.data_type()
        );
        let field = plan
            .schema()
            .field_with_name(None, "stringss_depth_2")
            .unwrap();
        assert_eq!(&DataType::Utf8, field.data_type());
        for field_name in &["a", "b"] {
            let field = plan
                .schema()
                .field_with_name(None, &format!("struct_singular.{field_name}"))
                .unwrap();
            assert_eq!(&DataType::UInt32, field.data_type());
        }

        Ok(())
    }
2731
    // Builds a scan over a table containing a scalar column, list columns
    // ("strings", "structs", doubly-nested "stringss"), and a struct column;
    // used by the unnest tests above.
    fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
        let struct_field_in_list = Field::new_struct(
            "item",
            vec![
                Field::new("a", DataType::UInt32, false),
                Field::new("b", DataType::UInt32, false),
            ],
            false,
        );
        let string_field = Field::new_list_field(DataType::Utf8, false);
        let strings_field = Field::new_list("item", string_field.clone(), false);
        let schema = Schema::new(vec![
            Field::new("scalar", DataType::UInt32, false),
            Field::new_list("strings", string_field, false),
            Field::new_list("structs", struct_field_in_list, false),
            Field::new(
                "struct_singular",
                DataType::Struct(Fields::from(vec![
                    Field::new("a", DataType::UInt32, false),
                    Field::new("b", DataType::UInt32, false),
                ])),
                false,
            ),
            // List of lists of strings, for the recursive-unnest cases.
            Field::new_list("stringss", strings_field, false),
        ]);

        table_scan(Some(table_name), &schema, None)
    }
2762
    // Union of two identical cross joins keeps both subtrees (and their
    // SubqueryAlias qualifiers) intact.
    #[test]
    fn test_union_after_join() -> Result<()> {
        let values = vec![vec![lit(1)]];

        let left = LogicalPlanBuilder::values(values.clone())?
            .alias("left")?
            .build()?;
        let right = LogicalPlanBuilder::values(values)?
            .alias("right")?
            .build()?;

        let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;

        let plan = LogicalPlanBuilder::from(join.clone())
            .union(join)?
            .build()?;

        assert_snapshot!(plan, @r"
        Union
          Cross Join:
            SubqueryAlias: left
              Values: (Int32(1))
            SubqueryAlias: right
              Values: (Int32(1))
          Cross Join:
            SubqueryAlias: left
              Values: (Int32(1))
            SubqueryAlias: right
              Values: (Int32(1))
        ");

        Ok(())
    }
2796
    // `Arc<LogicalPlan>` converts into a builder without altering the plan.
    #[test]
    fn plan_builder_from_logical_plan() -> Result<()> {
        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
                .sort(vec![
                    expr::Sort::new(col("state"), true, true),
                    expr::Sort::new(col("salary"), false, false),
                ])?
                .build()?;

        let plan_expected = format!("{plan}");
        let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
        assert_eq!(plan_expected, format!("{}", plan_builder.plan));

        Ok(())
    }
2813
    // With default options, grouping on a primary key does NOT pull the
    // functionally-dependent columns into the group-by list.
    #[test]
    fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
        let constraints =
            Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
        let table_source = table_source_with_constraints(&employee_schema(), constraints);

        let plan =
            LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
                .aggregate(vec![col("id")], vec![sum(col("salary"))])?
                .build()?;

        assert_snapshot!(plan, @r"
        Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
          TableScan: employee_csv projection=[id, state, salary]
        ");

        Ok(())
    }
2832
2833 #[test]
2834 fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2835 let constraints =
2836 Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2837 let table_source = table_source_with_constraints(&employee_schema(), constraints);
2838
2839 let options =
2840 LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2841 let plan =
2842 LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2843 .with_options(options)
2844 .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2845 .build()?;
2846
2847 assert_snapshot!(plan, @r"
2848 Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2849 TableScan: employee_csv projection=[id, state, salary]
2850 ");
2851
2852 Ok(())
2853 }
2854
2855 #[test]
2856 fn test_join_metadata() -> Result<()> {
2857 let left_schema = DFSchema::new_with_metadata(
2858 vec![(None, Arc::new(Field::new("a", DataType::Int32, false)))],
2859 HashMap::from([("key".to_string(), "left".to_string())]),
2860 )?;
2861 let right_schema = DFSchema::new_with_metadata(
2862 vec![(None, Arc::new(Field::new("b", DataType::Int32, false)))],
2863 HashMap::from([("key".to_string(), "right".to_string())]),
2864 )?;
2865
2866 let join_schema =
2867 build_join_schema(&left_schema, &right_schema, &JoinType::Left)?;
2868 assert_eq!(
2869 join_schema.metadata(),
2870 &HashMap::from([("key".to_string(), "left".to_string())])
2871 );
2872 let join_schema =
2873 build_join_schema(&left_schema, &right_schema, &JoinType::Right)?;
2874 assert_eq!(
2875 join_schema.metadata(),
2876 &HashMap::from([("key".to_string(), "right".to_string())])
2877 );
2878
2879 Ok(())
2880 }
2881
2882 #[test]
2883 fn test_values_metadata() -> Result<()> {
2884 let metadata: HashMap<String, String> =
2885 [("ARROW:extension:metadata".to_string(), "test".to_string())]
2886 .into_iter()
2887 .collect();
2888 let metadata = FieldMetadata::from(metadata);
2889 let values = LogicalPlanBuilder::values(vec![
2890 vec![lit_with_metadata(1, Some(metadata.clone()))],
2891 vec![lit_with_metadata(2, Some(metadata.clone()))],
2892 ])?
2893 .build()?;
2894 assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());
2895
2896 let metadata2: HashMap<String, String> =
2898 [("ARROW:extension:metadata".to_string(), "test2".to_string())]
2899 .into_iter()
2900 .collect();
2901 let metadata2 = FieldMetadata::from(metadata2);
2902 assert!(
2903 LogicalPlanBuilder::values(vec![
2904 vec![lit_with_metadata(1, Some(metadata.clone()))],
2905 vec![lit_with_metadata(2, Some(metadata2.clone()))],
2906 ])
2907 .is_err()
2908 );
2909
2910 Ok(())
2911 }
2912
2913 #[test]
2914 fn test_unique_field_aliases() {
2915 let t1_field_1 = Field::new("a", DataType::Int32, false);
2916 let t2_field_1 = Field::new("a", DataType::Int32, false);
2917 let t2_field_3 = Field::new("a", DataType::Int32, false);
2918 let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2919 let t1_field_2 = Field::new("b", DataType::Int32, false);
2920 let t2_field_2 = Field::new("b", DataType::Int32, false);
2921
2922 let fields = vec![
2923 t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2924 ];
2925 let fields = Fields::from(fields);
2926
2927 let remove_redundant = unique_field_aliases(&fields);
2928
2929 assert_eq!(
2936 remove_redundant,
2937 vec![
2938 None,
2939 Some("a:1".to_string()),
2940 None,
2941 Some("b:1".to_string()),
2942 Some("a:2".to_string()),
2943 Some("a:1:1".to_string()),
2944 ]
2945 );
2946 }
2947}