1use std::any::Any;
21use std::borrow::Cow;
22use std::cmp::Ordering;
23use std::collections::{HashMap, HashSet};
24use std::iter::once;
25use std::sync::Arc;
26
27use crate::dml::CopyTo;
28use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
29use crate::expr_rewriter::{
30 coerce_plan_expr_for_schema, normalize_col,
31 normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
32 rewrite_sort_cols_by_aggs,
33};
34use crate::logical_plan::{
35 Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
36 JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
37 Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
38 Window,
39};
40use crate::select_expr::SelectExpr;
41use crate::utils::{
42 can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
43 expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
44 group_window_expr_by_sort_keys,
45};
46use crate::{
47 DmlStatement, ExplainOption, Expr, ExprSchemable, Operator, RecursiveQuery,
48 Statement, TableProviderFilterPushDown, TableSource, WriteOp, and, binary_expr, lit,
49};
50
51use super::dml::InsertOp;
52use arrow::compute::can_cast_types;
53use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
54use datafusion_common::display::ToStringifiedPlan;
55use datafusion_common::file_options::file_type::FileType;
56use datafusion_common::metadata::FieldMetadata;
57use datafusion_common::{
58 Column, Constraints, DFSchema, DFSchemaRef, NullEquality, Result, ScalarValue,
59 TableReference, ToDFSchema, UnnestOptions, exec_err,
60 get_target_functional_dependencies, internal_datafusion_err, plan_datafusion_err,
61 plan_err,
62};
63use datafusion_expr_common::type_coercion::binary::type_union_resolution;
64
65use indexmap::IndexSet;
66
/// Placeholder table name (`"?table?"`).
///
/// NOTE(review): presumably used for relations that have no name of their
/// own; the usage sites are not visible in this part of the file.
pub const UNNAMED_TABLE: &str = "?table?";
69
/// Options that tune how a [`LogicalPlanBuilder`] constructs plans.
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// When `true`, `LogicalPlanBuilder::aggregate` extends the GROUP BY
    /// expressions through `add_group_by_exprs_from_dependencies` before the
    /// aggregate node is created. Defaults to `false`.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    /// Creates the default option set (all flags disabled).
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the options with the implicit GROUP BY flag set to `add`.
    pub fn with_add_implicit_group_by_exprs(self, add: bool) -> Self {
        let mut updated = self;
        updated.add_implicit_group_by_exprs = add;
        updated
    }
}
89
/// Builder that incrementally wraps a [`LogicalPlan`] in further plan nodes.
///
/// Most methods consume the builder and return a new one whose plan has the
/// previous plan as input; `build` extracts the finished plan.
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    /// The plan constructed so far, held behind an `Arc`.
    plan: Arc<LogicalPlan>,
    /// Options consulted by builder methods (read by `aggregate`).
    options: LogicalPlanBuilderOptions,
}
132
133impl LogicalPlanBuilder {
134 pub fn new(plan: LogicalPlan) -> Self {
136 Self {
137 plan: Arc::new(plan),
138 options: LogicalPlanBuilderOptions::default(),
139 }
140 }
141
142 pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
144 Self {
145 plan,
146 options: LogicalPlanBuilderOptions::default(),
147 }
148 }
149
150 pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
151 self.options = options;
152 self
153 }
154
155 pub fn schema(&self) -> &DFSchemaRef {
157 self.plan.schema()
158 }
159
160 pub fn plan(&self) -> &LogicalPlan {
162 &self.plan
163 }
164
165 pub fn empty(produce_one_row: bool) -> Self {
169 Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
170 produce_one_row,
171 schema: DFSchemaRef::new(DFSchema::empty()),
172 }))
173 }
174
175 pub fn to_recursive_query(
178 self,
179 name: String,
180 recursive_term: LogicalPlan,
181 is_distinct: bool,
182 ) -> Result<Self> {
183 let static_fields_len = self.plan.schema().fields().len();
185 let recursive_fields_len = recursive_term.schema().fields().len();
186 if static_fields_len != recursive_fields_len {
187 return plan_err!(
188 "Non-recursive term and recursive term must have the same number of columns ({} != {})",
189 static_fields_len,
190 recursive_fields_len
191 );
192 }
193 let coerced_recursive_term =
195 coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
196 Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
197 name,
198 static_term: self.plan,
199 recursive_term: Arc::new(coerced_recursive_term),
200 is_distinct,
201 })))
202 }
203
204 pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
212 if values.is_empty() {
213 return plan_err!("Values list cannot be empty");
214 }
215 let n_cols = values[0].len();
216 if n_cols == 0 {
217 return plan_err!("Values list cannot be zero length");
218 }
219 for (i, row) in values.iter().enumerate() {
220 if row.len() != n_cols {
221 return plan_err!(
222 "Inconsistent data length across values list: got {} values in row {} but expected {}",
223 row.len(),
224 i,
225 n_cols
226 );
227 }
228 }
229
230 Self::infer_data(values)
232 }
233
234 pub fn values_with_schema(
244 values: Vec<Vec<Expr>>,
245 schema: &DFSchemaRef,
246 ) -> Result<Self> {
247 if values.is_empty() {
248 return plan_err!("Values list cannot be empty");
249 }
250 let n_cols = schema.fields().len();
251 if n_cols == 0 {
252 return plan_err!("Values list cannot be zero length");
253 }
254 for (i, row) in values.iter().enumerate() {
255 if row.len() != n_cols {
256 return plan_err!(
257 "Inconsistent data length across values list: got {} values in row {} but expected {}",
258 row.len(),
259 i,
260 n_cols
261 );
262 }
263 }
264
265 Self::infer_values_from_schema(values, schema)
267 }
268
269 fn infer_values_from_schema(
270 values: Vec<Vec<Expr>>,
271 schema: &DFSchema,
272 ) -> Result<Self> {
273 let n_cols = values[0].len();
274 let mut fields = ValuesFields::new();
275 for j in 0..n_cols {
276 let field_type = schema.field(j).data_type();
277 let field_nullable = schema.field(j).is_nullable();
278 for row in values.iter() {
279 let value = &row[j];
280 let data_type = value.get_type(schema)?;
281
282 if !data_type.equals_datatype(field_type)
283 && !can_cast_types(&data_type, field_type)
284 {
285 return exec_err!(
286 "type mismatch and can't cast to got {} and {}",
287 data_type,
288 field_type
289 );
290 }
291 }
292 fields.push(field_type.to_owned(), field_nullable);
293 }
294
295 Self::infer_inner(values, fields, schema)
296 }
297
298 fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
299 let n_cols = values[0].len();
300 let schema = DFSchema::empty();
301 let mut fields = ValuesFields::new();
302
303 for j in 0..n_cols {
304 let mut common_type: Option<DataType> = None;
305 let mut common_metadata: Option<FieldMetadata> = None;
306 for (i, row) in values.iter().enumerate() {
307 let value = &row[j];
308 let metadata = value.metadata(&schema)?;
309 if let Some(ref cm) = common_metadata {
310 if &metadata != cm {
311 return plan_err!(
312 "Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}",
313 cm,
314 metadata
315 );
316 }
317 } else {
318 common_metadata = Some(metadata.clone());
319 }
320 let data_type = value.get_type(&schema)?;
321 if data_type == DataType::Null {
322 continue;
323 }
324
325 if let Some(prev_type) = common_type {
326 let data_types = vec![prev_type.clone(), data_type.clone()];
328 let Some(new_type) = type_union_resolution(&data_types) else {
329 return plan_err!(
330 "Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}"
331 );
332 };
333 common_type = Some(new_type);
334 } else {
335 common_type = Some(data_type);
336 }
337 }
338 fields.push_with_metadata(
341 common_type.unwrap_or(DataType::Null),
342 true,
343 common_metadata,
344 );
345 }
346
347 Self::infer_inner(values, fields, &schema)
348 }
349
350 fn infer_inner(
351 mut values: Vec<Vec<Expr>>,
352 fields: ValuesFields,
353 schema: &DFSchema,
354 ) -> Result<Self> {
355 let fields = fields.into_fields();
356 for row in &mut values {
358 for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
359 if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
360 row[j] = Expr::Literal(
361 ScalarValue::try_from(field_type)?,
362 metadata.clone(),
363 );
364 } else {
365 row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
366 }
367 }
368 }
369
370 let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
371 let schema = DFSchemaRef::new(dfschema);
372
373 Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
374 }
375
376 pub fn scan(
409 table_name: impl Into<TableReference>,
410 table_source: Arc<dyn TableSource>,
411 projection: Option<Vec<usize>>,
412 ) -> Result<Self> {
413 Self::scan_with_filters(table_name, table_source, projection, vec![])
414 }
415
416 pub fn copy_to(
418 input: LogicalPlan,
419 output_url: String,
420 file_type: Arc<dyn FileType>,
421 options: HashMap<String, String>,
422 partition_by: Vec<String>,
423 ) -> Result<Self> {
424 Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
425 Arc::new(input),
426 output_url,
427 partition_by,
428 file_type,
429 options,
430 ))))
431 }
432
433 pub fn insert_into(
467 input: LogicalPlan,
468 table_name: impl Into<TableReference>,
469 target: Arc<dyn TableSource>,
470 insert_op: InsertOp,
471 ) -> Result<Self> {
472 Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
473 table_name.into(),
474 target,
475 WriteOp::Insert(insert_op),
476 Arc::new(input),
477 ))))
478 }
479
480 pub fn scan_with_filters(
482 table_name: impl Into<TableReference>,
483 table_source: Arc<dyn TableSource>,
484 projection: Option<Vec<usize>>,
485 filters: Vec<Expr>,
486 ) -> Result<Self> {
487 Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
488 }
489
490 pub fn scan_with_filters_fetch(
492 table_name: impl Into<TableReference>,
493 table_source: Arc<dyn TableSource>,
494 projection: Option<Vec<usize>>,
495 filters: Vec<Expr>,
496 fetch: Option<usize>,
497 ) -> Result<Self> {
498 Self::scan_with_filters_inner(
499 table_name,
500 table_source,
501 projection,
502 filters,
503 fetch,
504 )
505 }
506
507 fn scan_with_filters_inner(
508 table_name: impl Into<TableReference>,
509 table_source: Arc<dyn TableSource>,
510 projection: Option<Vec<usize>>,
511 filters: Vec<Expr>,
512 fetch: Option<usize>,
513 ) -> Result<Self> {
514 let table_scan =
515 TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
516
517 if table_scan.filters.is_empty()
519 && let Some(p) = table_scan.source.get_logical_plan()
520 {
521 let sub_plan = p.into_owned();
522
523 if let Some(proj) = table_scan.projection {
524 let projection_exprs = proj
525 .into_iter()
526 .map(|i| {
527 Expr::Column(Column::from(sub_plan.schema().qualified_field(i)))
528 })
529 .collect::<Vec<_>>();
530 return Self::new(sub_plan)
531 .project(projection_exprs)?
532 .alias(table_scan.table_name);
533 }
534
535 return Self::new(sub_plan).alias(table_scan.table_name);
539 }
540
541 Ok(Self::new(LogicalPlan::TableScan(table_scan)))
542 }
543
544 pub fn window_plan(
546 input: LogicalPlan,
547 window_exprs: impl IntoIterator<Item = Expr>,
548 ) -> Result<LogicalPlan> {
549 let mut plan = input;
550 let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
551 groups.sort_by(|(key_a, _), (key_b, _)| {
557 for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
558 let key_ordering = compare_sort_expr(first, second, plan.schema());
559 match key_ordering {
560 Ordering::Less => {
561 return Ordering::Less;
562 }
563 Ordering::Greater => {
564 return Ordering::Greater;
565 }
566 Ordering::Equal => {}
567 }
568 }
569 key_b.len().cmp(&key_a.len())
570 });
571 for (_, exprs) in groups {
572 let window_exprs = exprs.into_iter().collect::<Vec<_>>();
573 plan = LogicalPlanBuilder::from(plan)
576 .window(window_exprs)?
577 .build()?;
578 }
579 Ok(plan)
580 }
581
582 pub fn project(
584 self,
585 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
586 ) -> Result<Self> {
587 project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
588 }
589
590 pub fn project_with_validation(
593 self,
594 expr: Vec<(impl Into<SelectExpr>, bool)>,
595 ) -> Result<Self> {
596 project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
597 }
598
599 pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
601 let exprs: Vec<_> = indices
602 .into_iter()
603 .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
604 .collect();
605 self.project(exprs)
606 }
607
608 pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
610 let expr = normalize_col(expr.into(), &self.plan)?;
611 Filter::try_new(expr, self.plan)
612 .map(LogicalPlan::Filter)
613 .map(Self::new)
614 }
615
616 pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
618 let expr = normalize_col(expr.into(), &self.plan)?;
619 Filter::try_new(expr, self.plan)
620 .map(LogicalPlan::Filter)
621 .map(Self::from)
622 }
623
624 pub fn prepare(self, name: String, fields: Vec<FieldRef>) -> Result<Self> {
626 Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
627 Prepare {
628 name,
629 fields,
630 input: self.plan,
631 },
632 ))))
633 }
634
635 pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
642 let skip_expr = if skip == 0 {
643 None
644 } else {
645 Some(lit(skip as i64))
646 };
647 let fetch_expr = fetch.map(|f| lit(f as i64));
648 self.limit_by_expr(skip_expr, fetch_expr)
649 }
650
651 pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
655 Ok(Self::new(LogicalPlan::Limit(Limit {
656 skip: skip.map(Box::new),
657 fetch: fetch.map(Box::new),
658 input: self.plan,
659 })))
660 }
661
662 pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
664 subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
665 }
666
667 fn add_missing_columns(
696 curr_plan: LogicalPlan,
697 missing_cols: &IndexSet<Column>,
698 is_distinct: bool,
699 ) -> Result<LogicalPlan> {
700 match curr_plan {
701 LogicalPlan::Projection(Projection {
702 input,
703 mut expr,
704 schema: _,
705 }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
706 let mut missing_exprs = missing_cols
707 .iter()
708 .map(|c| normalize_col(Expr::Column(c.clone()), &input))
709 .collect::<Result<Vec<_>>>()?;
710
711 missing_exprs.retain(|e| !expr.contains(e));
715 if is_distinct {
716 Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
717 }
718 expr.extend(missing_exprs);
719 project(Arc::unwrap_or_clone(input), expr)
720 }
721 _ => {
722 let is_distinct =
723 is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
724 let new_inputs = curr_plan
725 .inputs()
726 .into_iter()
727 .map(|input_plan| {
728 Self::add_missing_columns(
729 (*input_plan).clone(),
730 missing_cols,
731 is_distinct,
732 )
733 })
734 .collect::<Result<Vec<_>>>()?;
735 curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
736 }
737 }
738 }
739
740 fn ambiguous_distinct_check(
741 missing_exprs: &[Expr],
742 missing_cols: &IndexSet<Column>,
743 projection_exprs: &[Expr],
744 ) -> Result<()> {
745 if missing_exprs.is_empty() {
746 return Ok(());
747 }
748
749 let all_aliases = missing_exprs.iter().all(|e| {
757 projection_exprs.iter().any(|proj_expr| {
758 if let Expr::Alias(Alias { expr, .. }) = proj_expr {
759 e == expr.as_ref()
760 } else {
761 false
762 }
763 })
764 });
765 if all_aliases {
766 return Ok(());
767 }
768
769 let missing_col_names = missing_cols
770 .iter()
771 .map(|col| col.flat_name())
772 .collect::<String>();
773
774 plan_err!(
775 "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list"
776 )
777 }
778
779 pub fn sort_by(
781 self,
782 expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
783 ) -> Result<Self> {
784 self.sort(
785 expr.into_iter()
786 .map(|e| e.into().sort(true, false))
787 .collect::<Vec<SortExpr>>(),
788 )
789 }
790
791 pub fn sort(
792 self,
793 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
794 ) -> Result<Self> {
795 self.sort_with_limit(sorts, None)
796 }
797
798 pub fn sort_with_limit(
800 self,
801 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
802 fetch: Option<usize>,
803 ) -> Result<Self> {
804 let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
805
806 let schema = self.plan.schema();
807
808 let mut missing_cols: IndexSet<Column> = IndexSet::new();
810 sorts.iter().try_for_each::<_, Result<()>>(|sort| {
811 let columns = sort.expr.column_refs();
812
813 missing_cols.extend(
814 columns
815 .into_iter()
816 .filter(|c| !schema.has_column(c))
817 .cloned(),
818 );
819
820 Ok(())
821 })?;
822
823 if missing_cols.is_empty() {
824 return Ok(Self::new(LogicalPlan::Sort(Sort {
825 expr: normalize_sorts(sorts, &self.plan)?,
826 input: self.plan,
827 fetch,
828 })));
829 }
830
831 let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
833
834 let is_distinct = false;
835 let plan = Self::add_missing_columns(
836 Arc::unwrap_or_clone(self.plan),
837 &missing_cols,
838 is_distinct,
839 )?;
840
841 let sort_plan = LogicalPlan::Sort(Sort {
842 expr: normalize_sorts(sorts, &plan)?,
843 input: Arc::new(plan),
844 fetch,
845 });
846
847 Projection::try_new(new_expr, Arc::new(sort_plan))
848 .map(LogicalPlan::Projection)
849 .map(Self::new)
850 }
851
852 pub fn union(self, plan: LogicalPlan) -> Result<Self> {
854 union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
855 }
856
857 pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
859 union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
860 }
861
862 pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
864 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
865 let right_plan: LogicalPlan = plan;
866
867 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
868 union_by_name(left_plan, right_plan)?,
869 )))))
870 }
871
872 pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
874 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
875 let right_plan: LogicalPlan = plan;
876
877 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
878 union(left_plan, right_plan)?,
879 )))))
880 }
881
882 pub fn distinct(self) -> Result<Self> {
884 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
885 }
886
887 pub fn distinct_on(
890 self,
891 on_expr: Vec<Expr>,
892 select_expr: Vec<Expr>,
893 sort_expr: Option<Vec<SortExpr>>,
894 ) -> Result<Self> {
895 Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
896 DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
897 ))))
898 }
899
900 pub fn join(
914 self,
915 right: LogicalPlan,
916 join_type: JoinType,
917 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
918 filter: Option<Expr>,
919 ) -> Result<Self> {
920 self.join_detailed(
921 right,
922 join_type,
923 join_keys,
924 filter,
925 NullEquality::NullEqualsNothing,
926 )
927 }
928
929 pub fn join_on(
970 self,
971 right: LogicalPlan,
972 join_type: JoinType,
973 on_exprs: impl IntoIterator<Item = Expr>,
974 ) -> Result<Self> {
975 let filter = on_exprs.into_iter().reduce(Expr::and);
976
977 self.join_detailed(
978 right,
979 join_type,
980 (Vec::<Column>::new(), Vec::<Column>::new()),
981 filter,
982 NullEquality::NullEqualsNothing,
983 )
984 }
985
986 pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
987 if column.relation.is_some() {
988 return Ok(column);
990 }
991
992 let schema = plan.schema();
993 let fallback_schemas = plan.fallback_normalize_schemas();
994 let using_columns = plan.using_columns()?;
995 column.normalize_with_schemas_and_ambiguity_check(
996 &[&[schema], &fallback_schemas],
997 &using_columns,
998 )
999 }
1000
    /// Joins the current plan with `right` on pairs of column keys, with an
    /// explicit `null_equality` setting.
    ///
    /// `join_keys.0[i]` is paired with `join_keys.1[i]`. For every pair the
    /// method works out which key belongs to which input and swaps the pair
    /// when the keys were given in the opposite order, so the resulting `on`
    /// entries are `(left column, right column)`.
    ///
    /// # Errors
    /// Returns a plan error when the two key lists differ in length or when a
    /// non-inner join has neither keys nor a filter. Errors from normalizing
    /// the filter or a key, or from building the join schema, are propagated.
    pub fn join_detailed(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
        filter: Option<Expr>,
        null_equality: NullEquality,
    ) -> Result<Self> {
        if join_keys.0.len() != join_keys.1.len() {
            return plan_err!("left_keys and right_keys were not the same length");
        }

        // The filter may reference columns of either input.
        let filter = if let Some(expr) = filter {
            let filter = normalize_col_with_schemas_and_ambiguity_check(
                expr,
                &[&[self.schema(), right.schema()]],
                &[],
            )?;
            Some(filter)
        } else {
            None
        };

        // Resolution is done per pair; failures are kept as `Err` entries and
        // surfaced after the loop so both lists stay aligned for `unzip`.
        let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
            join_keys
                .0
                .into_iter()
                .zip(join_keys.1)
                .map(|(l, r)| {
                    let l = l.into();
                    let r = r.into();

                    match (&l.relation, &r.relation) {
                        // Both keys are qualified: look each one up in both
                        // schemas to decide which side it belongs to.
                        (Some(lr), Some(rr)) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (l_is_left, l_is_right, r_is_left, r_is_right) {
                                // `l` is in the right input and `r` in the
                                // left one: swap the pair.
                                (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
                                // Keys are already in left/right order.
                                (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
                                // Neither layout matched: fall back to
                                // `normalize` (a no-op for qualified columns).
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only `l` is qualified: its side decides where the
                        // unqualified `r` is resolved.
                        (Some(lr), None) => {
                            let l_is_left =
                                self.plan.schema().field_with_qualified_name(lr, &l.name);
                            let l_is_right =
                                right.schema().field_with_qualified_name(lr, &l.name);

                            match (l_is_left, l_is_right) {
                                (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
                                // `l` belongs to the right input: swap, and
                                // resolve `r` against the left input.
                                (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Only `r` is qualified: mirror image of the case above.
                        (None, Some(rr)) => {
                            let r_is_left =
                                self.plan.schema().field_with_qualified_name(rr, &r.name);
                            let r_is_right =
                                right.schema().field_with_qualified_name(rr, &r.name);

                            match (r_is_left, r_is_right) {
                                // `r` belongs to the left input: swap, and
                                // resolve `l` against the right input.
                                (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
                                (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
                                _ => (
                                    Self::normalize(&self.plan, l),
                                    Self::normalize(&right, r),
                                ),
                            }
                        }
                        // Neither key is qualified: try `l` on the left input
                        // first; if that fails, try it on the right input and
                        // swap the pair.
                        (None, None) => {
                            let mut swap = false;
                            let left_key = Self::normalize(&self.plan, l.clone())
                                .or_else(|_| {
                                    swap = true;
                                    Self::normalize(&right, l)
                                });
                            if swap {
                                (Self::normalize(&self.plan, r), left_key)
                            } else {
                                (left_key, Self::normalize(&right, r))
                            }
                        }
                    }
                })
                .unzip();

        // Surface the first resolution error of each side, left side first.
        let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
        let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;

        let on: Vec<_> = left_keys
            .into_iter()
            .zip(right_keys)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();
        let join_schema =
            build_join_schema(self.plan.schema(), right.schema(), &join_type)?;

        // Only an inner join may have no condition at all.
        if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
            return plan_err!("join condition should not be empty");
        }

        Ok(Self::new(LogicalPlan::Join(Join {
            left: self.plan,
            right: Arc::new(right),
            on,
            filter,
            join_type,
            join_constraint: JoinConstraint::On,
            schema: DFSchemaRef::new(join_schema),
            null_equality,
        })))
    }
1133
    /// Joins the current plan with `right` on columns that carry the same
    /// name on both sides (a USING join).
    ///
    /// Each key in `using_keys` is normalized once against the left plan and
    /// once against the right plan. A pair becomes an equi-join key when both
    /// columns exist and the key type passes `can_hash`; otherwise it is
    /// turned into an `l = r` predicate that is ANDed into the join filter.
    ///
    /// # Errors
    /// Propagates normalization and join construction errors. Returns an
    /// internal error when no equi-join key and no filter were produced
    /// (for example when `using_keys` is empty).
    pub fn join_using(
        self,
        right: LogicalPlan,
        join_type: JoinType,
        using_keys: Vec<Column>,
    ) -> Result<Self> {
        // Resolve every USING column on each side independently.
        let left_keys: Vec<Column> = using_keys
            .clone()
            .into_iter()
            .map(|c| Self::normalize(&self.plan, c))
            .collect::<Result<_>>()?;
        let right_keys: Vec<Column> = using_keys
            .into_iter()
            .map(|c| Self::normalize(&right, c))
            .collect::<Result<_>>()?;

        let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
        let mut join_on: Vec<(Expr, Expr)> = vec![];
        let mut filters: Option<Expr> = None;
        for (l, r) in &on {
            // Hashable key present on both sides: use it as an equi-join key.
            if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        l,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
            // NOTE(review): this branch repeats the same presence checks but
            // looks `r` up in the LEFT schema and pushes the pair swapped;
            // confirm that this is the intended fallback.
            } else if self.plan.schema().has_column(l)
                && right.schema().has_column(r)
                && can_hash(
                    datafusion_common::ExprSchema::field_from_column(
                        self.plan.schema(),
                        r,
                    )?
                    .data_type(),
                )
            {
                join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
            } else {
                // Not usable as an equi-join key: express the equality as a
                // filter predicate instead.
                let expr = binary_expr(
                    Expr::Column(l.clone()),
                    Operator::Eq,
                    Expr::Column(r.clone()),
                );
                match filters {
                    None => filters = Some(expr),
                    Some(filter_expr) => filters = Some(and(expr, filter_expr)),
                }
            }
        }

        if join_on.is_empty() {
            // No equi-join keys: cross join followed by the equality filter.
            // NOTE(review): `join_type` is not used on this path.
            let join = Self::from(self.plan).cross_join(right)?;
            join.filter(filters.ok_or_else(|| {
                internal_datafusion_err!("filters should not be None here")
            })?)
        } else {
            let join = Join::try_new(
                self.plan,
                Arc::new(right),
                join_on,
                filters,
                join_type,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
            )?;

            Ok(Self::new(LogicalPlan::Join(join)))
        }
    }
1209
1210 pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1212 let join = Join::try_new(
1213 self.plan,
1214 Arc::new(right),
1215 vec![],
1216 None,
1217 JoinType::Inner,
1218 JoinConstraint::On,
1219 NullEquality::NullEqualsNothing,
1220 )?;
1221
1222 Ok(Self::new(LogicalPlan::Join(join)))
1223 }
1224
1225 pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1227 Ok(Self::new(LogicalPlan::Repartition(Repartition {
1228 input: self.plan,
1229 partitioning_scheme,
1230 })))
1231 }
1232
1233 pub fn window(
1235 self,
1236 window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1237 ) -> Result<Self> {
1238 let window_expr = normalize_cols(window_expr, &self.plan)?;
1239 validate_unique_names("Windows", &window_expr)?;
1240 Ok(Self::new(LogicalPlan::Window(Window::try_new(
1241 window_expr,
1242 self.plan,
1243 )?)))
1244 }
1245
1246 pub fn aggregate(
1250 self,
1251 group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1252 aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1253 ) -> Result<Self> {
1254 let group_expr = normalize_cols(group_expr, &self.plan)?;
1255 let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1256
1257 let group_expr = if self.options.add_implicit_group_by_exprs {
1258 add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1259 } else {
1260 group_expr
1261 };
1262
1263 Aggregate::try_new(self.plan, group_expr, aggr_expr)
1264 .map(LogicalPlan::Aggregate)
1265 .map(Self::new)
1266 }
1267
1268 pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1275 self.explain_option_format(
1277 ExplainOption::default()
1278 .with_verbose(verbose)
1279 .with_analyze(analyze),
1280 )
1281 }
1282
1283 pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1287 let schema = LogicalPlan::explain_schema();
1288 let schema = schema.to_dfschema_ref()?;
1289
1290 if explain_option.analyze {
1291 Ok(Self::new(LogicalPlan::Analyze(Analyze {
1292 verbose: explain_option.verbose,
1293 input: self.plan,
1294 schema,
1295 })))
1296 } else {
1297 let stringified_plans =
1298 vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1299
1300 Ok(Self::new(LogicalPlan::Explain(Explain {
1301 verbose: explain_option.verbose,
1302 plan: self.plan,
1303 explain_format: explain_option.format,
1304 stringified_plans,
1305 schema,
1306 logical_optimization_succeeded: false,
1307 })))
1308 }
1309 }
1310
1311 pub fn intersect(
1313 left_plan: LogicalPlan,
1314 right_plan: LogicalPlan,
1315 is_all: bool,
1316 ) -> Result<LogicalPlan> {
1317 LogicalPlanBuilder::intersect_or_except(
1318 left_plan,
1319 right_plan,
1320 JoinType::LeftSemi,
1321 is_all,
1322 )
1323 }
1324
1325 pub fn except(
1327 left_plan: LogicalPlan,
1328 right_plan: LogicalPlan,
1329 is_all: bool,
1330 ) -> Result<LogicalPlan> {
1331 LogicalPlanBuilder::intersect_or_except(
1332 left_plan,
1333 right_plan,
1334 JoinType::LeftAnti,
1335 is_all,
1336 )
1337 }
1338
1339 fn intersect_or_except(
1341 left_plan: LogicalPlan,
1342 right_plan: LogicalPlan,
1343 join_type: JoinType,
1344 is_all: bool,
1345 ) -> Result<LogicalPlan> {
1346 let left_len = left_plan.schema().fields().len();
1347 let right_len = right_plan.schema().fields().len();
1348
1349 if left_len != right_len {
1350 return plan_err!(
1351 "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1352 );
1353 }
1354
1355 let left_builder = LogicalPlanBuilder::from(left_plan);
1358 let right_builder = LogicalPlanBuilder::from(right_plan);
1359 let (left_builder, right_builder, _requalified) =
1360 requalify_sides_if_needed(left_builder, right_builder)?;
1361 let left_plan = left_builder.build()?;
1362 let right_plan = right_builder.build()?;
1363
1364 let join_keys = left_plan
1365 .schema()
1366 .fields()
1367 .iter()
1368 .zip(right_plan.schema().fields().iter())
1369 .map(|(left_field, right_field)| {
1370 (
1371 (Column::from_name(left_field.name())),
1372 (Column::from_name(right_field.name())),
1373 )
1374 })
1375 .unzip();
1376 if is_all {
1377 LogicalPlanBuilder::from(left_plan)
1378 .join_detailed(
1379 right_plan,
1380 join_type,
1381 join_keys,
1382 None,
1383 NullEquality::NullEqualsNull,
1384 )?
1385 .build()
1386 } else {
1387 LogicalPlanBuilder::from(left_plan)
1388 .distinct()?
1389 .join_detailed(
1390 right_plan,
1391 join_type,
1392 join_keys,
1393 None,
1394 NullEquality::NullEqualsNull,
1395 )?
1396 .build()
1397 }
1398 }
1399
1400 pub fn build(self) -> Result<LogicalPlan> {
1402 Ok(Arc::unwrap_or_clone(self.plan))
1403 }
1404
1405 pub fn join_with_expr_keys(
1420 self,
1421 right: LogicalPlan,
1422 join_type: JoinType,
1423 equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1424 filter: Option<Expr>,
1425 ) -> Result<Self> {
1426 if equi_exprs.0.len() != equi_exprs.1.len() {
1427 return plan_err!("left_keys and right_keys were not the same length");
1428 }
1429
1430 let join_key_pairs = equi_exprs
1431 .0
1432 .into_iter()
1433 .zip(equi_exprs.1)
1434 .map(|(l, r)| {
1435 let left_key = l.into();
1436 let right_key = r.into();
1437 let mut left_using_columns = HashSet::new();
1438 expr_to_columns(&left_key, &mut left_using_columns)?;
1439 let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1440 left_key,
1441 &[&[self.plan.schema()]],
1442 &[],
1443 )?;
1444
1445 let mut right_using_columns = HashSet::new();
1446 expr_to_columns(&right_key, &mut right_using_columns)?;
1447 let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1448 right_key,
1449 &[&[right.schema()]],
1450 &[],
1451 )?;
1452
1453 find_valid_equijoin_key_pair(
1455 &normalized_left_key,
1456 &normalized_right_key,
1457 self.plan.schema(),
1458 right.schema(),
1459 )?.ok_or_else(||
1460 plan_datafusion_err!(
1461 "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1462 ))
1463 })
1464 .collect::<Result<Vec<_>>>()?;
1465
1466 let join = Join::try_new(
1467 self.plan,
1468 Arc::new(right),
1469 join_key_pairs,
1470 filter,
1471 join_type,
1472 JoinConstraint::On,
1473 NullEquality::NullEqualsNothing,
1474 )?;
1475
1476 Ok(Self::new(LogicalPlan::Join(join)))
1477 }
1478
1479 pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1481 unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1482 }
1483
1484 pub fn unnest_column_with_options(
1486 self,
1487 column: impl Into<Column>,
1488 options: UnnestOptions,
1489 ) -> Result<Self> {
1490 unnest_with_options(
1491 Arc::unwrap_or_clone(self.plan),
1492 vec![column.into()],
1493 options,
1494 )
1495 .map(Self::new)
1496 }
1497
1498 pub fn unnest_columns_with_options(
1500 self,
1501 columns: Vec<Column>,
1502 options: UnnestOptions,
1503 ) -> Result<Self> {
1504 unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1505 .map(Self::new)
1506 }
1507}
1508
1509impl From<LogicalPlan> for LogicalPlanBuilder {
1510 fn from(plan: LogicalPlan) -> Self {
1511 LogicalPlanBuilder::new(plan)
1512 }
1513}
1514
1515impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1516 fn from(plan: Arc<LogicalPlan>) -> Self {
1517 LogicalPlanBuilder::new_from_arc(plan)
1518 }
1519}
1520
1521#[derive(Default)]
1523struct ValuesFields {
1524 inner: Vec<Field>,
1525}
1526
1527impl ValuesFields {
1528 pub fn new() -> Self {
1529 Self::default()
1530 }
1531
1532 pub fn push(&mut self, data_type: DataType, nullable: bool) {
1533 self.push_with_metadata(data_type, nullable, None);
1534 }
1535
1536 pub fn push_with_metadata(
1537 &mut self,
1538 data_type: DataType,
1539 nullable: bool,
1540 metadata: Option<FieldMetadata>,
1541 ) {
1542 let name = format!("column{}", self.inner.len() + 1);
1545 let mut field = Field::new(name, data_type, nullable);
1546 if let Some(metadata) = metadata {
1547 field.set_metadata(metadata.to_hashmap());
1548 }
1549 self.inner.push(field);
1550 }
1551
1552 pub fn into_fields(self) -> Fields {
1553 self.inner.into()
1554 }
1555}
1556
1557pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1569 let mut name_map = HashMap::<&str, usize>::new();
1577 let mut seen = HashSet::<Cow<String>>::new();
1579
1580 fields
1581 .iter()
1582 .map(|field| {
1583 let original_name = field.name();
1584 let mut name = Cow::Borrowed(original_name);
1585
1586 let count = name_map.entry(original_name).or_insert(0);
1587
1588 while seen.contains(&name) {
1590 *count += 1;
1591 name = Cow::Owned(format!("{original_name}:{count}"));
1592 }
1593
1594 seen.insert(name.clone());
1595
1596 match name {
1597 Cow::Borrowed(_) => None,
1598 Cow::Owned(alias) => Some(alias),
1599 }
1600 })
1601 .collect()
1602}
1603
1604fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1605 let mut table_references = schema
1606 .iter()
1607 .filter_map(|(qualifier, _)| qualifier)
1608 .collect::<Vec<_>>();
1609 table_references.dedup();
1610 let table_reference = if table_references.len() == 1 {
1611 table_references.pop().cloned()
1612 } else {
1613 None
1614 };
1615
1616 (
1617 table_reference,
1618 Arc::new(Field::new("mark", DataType::Boolean, false)),
1619 )
1620}
1621
1622pub fn build_join_schema(
1625 left: &DFSchema,
1626 right: &DFSchema,
1627 join_type: &JoinType,
1628) -> Result<DFSchema> {
1629 fn nullify_fields<'a>(
1630 fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1631 ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1632 fields
1633 .map(|(q, f)| {
1634 let field = f.as_ref().clone().with_nullable(true);
1636 (q.cloned(), Arc::new(field))
1637 })
1638 .collect()
1639 }
1640
1641 let right_fields = right.iter();
1642 let left_fields = left.iter();
1643
1644 let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1645 JoinType::Inner => {
1646 let left_fields = left_fields
1648 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1649 .collect::<Vec<_>>();
1650 let right_fields = right_fields
1651 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1652 .collect::<Vec<_>>();
1653 left_fields.into_iter().chain(right_fields).collect()
1654 }
1655 JoinType::Left => {
1656 let left_fields = left_fields
1658 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1659 .collect::<Vec<_>>();
1660 left_fields
1661 .into_iter()
1662 .chain(nullify_fields(right_fields))
1663 .collect()
1664 }
1665 JoinType::Right => {
1666 let right_fields = right_fields
1668 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1669 .collect::<Vec<_>>();
1670 nullify_fields(left_fields)
1671 .into_iter()
1672 .chain(right_fields)
1673 .collect()
1674 }
1675 JoinType::Full => {
1676 nullify_fields(left_fields)
1678 .into_iter()
1679 .chain(nullify_fields(right_fields))
1680 .collect()
1681 }
1682 JoinType::LeftSemi | JoinType::LeftAnti => {
1683 left_fields
1685 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1686 .collect()
1687 }
1688 JoinType::LeftMark => left_fields
1689 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1690 .chain(once(mark_field(right)))
1691 .collect(),
1692 JoinType::RightSemi | JoinType::RightAnti => {
1693 right_fields
1695 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1696 .collect()
1697 }
1698 JoinType::RightMark => right_fields
1699 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1700 .chain(once(mark_field(left)))
1701 .collect(),
1702 };
1703 let func_dependencies = left.functional_dependencies().join(
1704 right.functional_dependencies(),
1705 join_type,
1706 left.fields().len(),
1707 );
1708
1709 let (schema1, schema2) = match join_type {
1710 JoinType::Right
1711 | JoinType::RightSemi
1712 | JoinType::RightAnti
1713 | JoinType::RightMark => (left, right),
1714 _ => (right, left),
1715 };
1716
1717 let metadata = schema1
1718 .metadata()
1719 .clone()
1720 .into_iter()
1721 .chain(schema2.metadata().clone())
1722 .collect();
1723
1724 let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1725 dfschema.with_functional_dependencies(func_dependencies)
1726}
1727
1728pub fn requalify_sides_if_needed(
1738 left: LogicalPlanBuilder,
1739 right: LogicalPlanBuilder,
1740) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1741 let left_cols = left.schema().columns();
1742 let right_cols = right.schema().columns();
1743
1744 for l in &left_cols {
1758 for r in &right_cols {
1759 if l.name != r.name {
1760 continue;
1761 }
1762
1763 match (&l.relation, &r.relation) {
1765 (Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
1767 return Ok((
1768 left.alias(TableReference::bare("left"))?,
1769 right.alias(TableReference::bare("right"))?,
1770 true,
1771 ));
1772 }
1773 (None, None) => {
1775 return Ok((
1776 left.alias(TableReference::bare("left"))?,
1777 right.alias(TableReference::bare("right"))?,
1778 true,
1779 ));
1780 }
1781 (Some(_), None) | (None, Some(_)) => {
1783 return Ok((
1784 left.alias(TableReference::bare("left"))?,
1785 right.alias(TableReference::bare("right"))?,
1786 true,
1787 ));
1788 }
1789 _ => {}
1791 }
1792 }
1793 }
1794
1795 Ok((left, right, false))
1797}
1798pub fn add_group_by_exprs_from_dependencies(
1808 mut group_expr: Vec<Expr>,
1809 schema: &DFSchemaRef,
1810) -> Result<Vec<Expr>> {
1811 let mut group_by_field_names = group_expr
1814 .iter()
1815 .map(|e| e.schema_name().to_string())
1816 .collect::<Vec<_>>();
1817
1818 if let Some(target_indices) =
1819 get_target_functional_dependencies(schema, &group_by_field_names)
1820 {
1821 for idx in target_indices {
1822 let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1823 let expr_name = expr.schema_name().to_string();
1824 if !group_by_field_names.contains(&expr_name) {
1825 group_by_field_names.push(expr_name);
1826 group_expr.push(expr);
1827 }
1828 }
1829 }
1830 Ok(group_expr)
1831}
1832
1833pub fn validate_unique_names<'a>(
1835 node_name: &str,
1836 expressions: impl IntoIterator<Item = &'a Expr>,
1837) -> Result<()> {
1838 let mut unique_names = HashMap::new();
1839
1840 expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1841 let name = expr.schema_name().to_string();
1842 match unique_names.get(&name) {
1843 None => {
1844 unique_names.insert(name, (position, expr));
1845 Ok(())
1846 },
1847 Some((existing_position, existing_expr)) => {
1848 plan_err!("{node_name} require unique expression names \
1849 but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1850 at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1851 )
1852 }
1853 }
1854 })
1855}
1856
1857pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1869 Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1870 Arc::new(left_plan),
1871 Arc::new(right_plan),
1872 ])?))
1873}
1874
1875pub fn union_by_name(
1878 left_plan: LogicalPlan,
1879 right_plan: LogicalPlan,
1880) -> Result<LogicalPlan> {
1881 Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1882 Arc::new(left_plan),
1883 Arc::new(right_plan),
1884 ])?))
1885}
1886
1887pub fn project(
1893 plan: LogicalPlan,
1894 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1895) -> Result<LogicalPlan> {
1896 project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1897}
1898
1899fn project_with_validation(
1907 plan: LogicalPlan,
1908 expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1909) -> Result<LogicalPlan> {
1910 let mut projected_expr = vec![];
1911 for (e, validate) in expr {
1912 let e = e.into();
1913 match e {
1914 SelectExpr::Wildcard(opt) => {
1915 let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1916
1917 let expanded = if let Some(replace) = opt.replace {
1920 replace_columns(expanded, &replace)?
1921 } else {
1922 expanded
1923 };
1924
1925 for e in expanded {
1926 if validate {
1927 projected_expr
1928 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1929 } else {
1930 projected_expr.push(e)
1931 }
1932 }
1933 }
1934 SelectExpr::QualifiedWildcard(table_ref, opt) => {
1935 let expanded =
1936 expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1937
1938 let expanded = if let Some(replace) = opt.replace {
1941 replace_columns(expanded, &replace)?
1942 } else {
1943 expanded
1944 };
1945
1946 for e in expanded {
1947 if validate {
1948 projected_expr
1949 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1950 } else {
1951 projected_expr.push(e)
1952 }
1953 }
1954 }
1955 SelectExpr::Expression(e) => {
1956 if validate {
1957 projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1958 } else {
1959 projected_expr.push(e)
1960 }
1961 }
1962 }
1963 }
1964 validate_unique_names("Projections", projected_expr.iter())?;
1965
1966 Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1967}
1968
1969fn replace_columns(
1974 mut exprs: Vec<Expr>,
1975 replace: &PlannedReplaceSelectItem,
1976) -> Result<Vec<Expr>> {
1977 for expr in exprs.iter_mut() {
1978 if let Expr::Column(Column { name, .. }) = expr
1979 && let Some((_, new_expr)) = replace
1980 .items()
1981 .iter()
1982 .zip(replace.expressions().iter())
1983 .find(|(item, _)| item.column_name.value == *name)
1984 {
1985 *expr = new_expr.clone().alias(name.clone())
1986 }
1987 }
1988 Ok(exprs)
1989}
1990
1991pub fn subquery_alias(
1993 plan: LogicalPlan,
1994 alias: impl Into<TableReference>,
1995) -> Result<LogicalPlan> {
1996 SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1997}
1998
1999pub fn table_scan(
2002 name: Option<impl Into<TableReference>>,
2003 table_schema: &Schema,
2004 projection: Option<Vec<usize>>,
2005) -> Result<LogicalPlanBuilder> {
2006 table_scan_with_filters(name, table_schema, projection, vec![])
2007}
2008
2009pub fn table_scan_with_filters(
2013 name: Option<impl Into<TableReference>>,
2014 table_schema: &Schema,
2015 projection: Option<Vec<usize>>,
2016 filters: Vec<Expr>,
2017) -> Result<LogicalPlanBuilder> {
2018 let table_source = table_source(table_schema);
2019 let name = name
2020 .map(|n| n.into())
2021 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2022 LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
2023}
2024
2025pub fn table_scan_with_filter_and_fetch(
2029 name: Option<impl Into<TableReference>>,
2030 table_schema: &Schema,
2031 projection: Option<Vec<usize>>,
2032 filters: Vec<Expr>,
2033 fetch: Option<usize>,
2034) -> Result<LogicalPlanBuilder> {
2035 let table_source = table_source(table_schema);
2036 let name = name
2037 .map(|n| n.into())
2038 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
2039 LogicalPlanBuilder::scan_with_filters_fetch(
2040 name,
2041 table_source,
2042 projection,
2043 filters,
2044 fetch,
2045 )
2046}
2047
2048pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
2049 let table_schema = Arc::new(table_schema.clone());
2051 Arc::new(LogicalTableSource {
2052 table_schema,
2053 constraints: Default::default(),
2054 })
2055}
2056
2057pub fn table_source_with_constraints(
2058 table_schema: &Schema,
2059 constraints: Constraints,
2060) -> Arc<dyn TableSource> {
2061 let table_schema = Arc::new(table_schema.clone());
2063 Arc::new(LogicalTableSource {
2064 table_schema,
2065 constraints,
2066 })
2067}
2068
2069pub fn wrap_projection_for_join_if_necessary(
2071 join_keys: &[Expr],
2072 input: LogicalPlan,
2073) -> Result<(LogicalPlan, Vec<Column>, bool)> {
2074 let input_schema = input.schema();
2075 let alias_join_keys: Vec<Expr> = join_keys
2076 .iter()
2077 .map(|key| {
2078 if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2087 let alias = format!("{key}");
2088 key.clone().alias(alias)
2089 } else {
2090 key.clone()
2091 }
2092 })
2093 .collect::<Vec<_>>();
2094
2095 let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2096 let plan = if need_project {
2097 let mut projection = input_schema
2099 .columns()
2100 .into_iter()
2101 .map(Expr::Column)
2102 .collect::<Vec<_>>();
2103 let join_key_items = alias_join_keys
2104 .iter()
2105 .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2106 .cloned()
2107 .collect::<HashSet<Expr>>();
2108 projection.extend(join_key_items);
2109
2110 LogicalPlanBuilder::from(input)
2111 .project(projection.into_iter().map(SelectExpr::from))?
2112 .build()?
2113 } else {
2114 input
2115 };
2116
2117 let join_on = alias_join_keys
2118 .into_iter()
2119 .map(|key| {
2120 if let Some(col) = key.try_as_col() {
2121 Ok(col.clone())
2122 } else {
2123 let name = key.schema_name().to_string();
2124 Ok(Column::from_name(name))
2125 }
2126 })
2127 .collect::<Result<Vec<_>>>()?;
2128
2129 Ok((plan, join_on, need_project))
2130}
2131
2132pub struct LogicalTableSource {
2136 table_schema: SchemaRef,
2137 constraints: Constraints,
2138}
2139
2140impl LogicalTableSource {
2141 pub fn new(table_schema: SchemaRef) -> Self {
2143 Self {
2144 table_schema,
2145 constraints: Constraints::default(),
2146 }
2147 }
2148
2149 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2150 self.constraints = constraints;
2151 self
2152 }
2153}
2154
2155impl TableSource for LogicalTableSource {
2156 fn as_any(&self) -> &dyn Any {
2157 self
2158 }
2159
2160 fn schema(&self) -> SchemaRef {
2161 Arc::clone(&self.table_schema)
2162 }
2163
2164 fn constraints(&self) -> Option<&Constraints> {
2165 Some(&self.constraints)
2166 }
2167
2168 fn supports_filters_pushdown(
2169 &self,
2170 filters: &[&Expr],
2171 ) -> Result<Vec<TableProviderFilterPushDown>> {
2172 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2173 }
2174}
2175
2176pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2178 unnest_with_options(input, columns, UnnestOptions::default())
2179}
2180
2181pub fn get_struct_unnested_columns(
2182 col_name: &String,
2183 inner_fields: &Fields,
2184) -> Vec<Column> {
2185 inner_fields
2186 .iter()
2187 .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2188 .collect()
2189}
2190
2191pub fn unnest_with_options(
2221 input: LogicalPlan,
2222 columns_to_unnest: Vec<Column>,
2223 options: UnnestOptions,
2224) -> Result<LogicalPlan> {
2225 Ok(LogicalPlan::Unnest(Unnest::try_new(
2226 Arc::new(input),
2227 columns_to_unnest,
2228 options,
2229 )?))
2230}
2231
2232#[cfg(test)]
2233mod tests {
2234 use std::vec;
2235
2236 use super::*;
2237 use crate::lit_with_metadata;
2238 use crate::logical_plan::StringifiedPlan;
2239 use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2240
2241 use crate::test::function_stub::sum;
2242 use datafusion_common::{
2243 Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
2244 };
2245 use insta::assert_snapshot;
2246
2247 #[test]
2248 fn plan_builder_simple() -> Result<()> {
2249 let plan =
2250 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2251 .filter(col("state").eq(lit("CO")))?
2252 .project(vec![col("id")])?
2253 .build()?;
2254
2255 assert_snapshot!(plan, @r#"
2256 Projection: employee_csv.id
2257 Filter: employee_csv.state = Utf8("CO")
2258 TableScan: employee_csv projection=[id, state]
2259 "#);
2260
2261 Ok(())
2262 }
2263
2264 #[test]
2265 fn plan_builder_schema() {
2266 let schema = employee_schema();
2267 let projection = None;
2268 let plan =
2269 LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2270 .unwrap();
2271 assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2272
2273 let projection = None;
2276 let plan =
2277 LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2278 .unwrap();
2279 assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2280 }
2281
2282 #[test]
2283 fn plan_builder_empty_name() {
2284 let schema = employee_schema();
2285 let projection = None;
2286 let err =
2287 LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2288 assert_snapshot!(
2289 err.strip_backtrace(),
2290 @"Error during planning: table_name cannot be empty"
2291 );
2292 }
2293
2294 #[test]
2295 fn plan_builder_sort() -> Result<()> {
2296 let plan =
2297 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2298 .sort(vec![
2299 expr::Sort::new(col("state"), true, true),
2300 expr::Sort::new(col("salary"), false, false),
2301 ])?
2302 .build()?;
2303
2304 assert_snapshot!(plan, @r"
2305 Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
2306 TableScan: employee_csv projection=[state, salary]
2307 ");
2308
2309 Ok(())
2310 }
2311
2312 #[test]
2313 fn plan_builder_union() -> Result<()> {
2314 let plan =
2315 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2316
2317 let plan = plan
2318 .clone()
2319 .union(plan.clone().build()?)?
2320 .union(plan.clone().build()?)?
2321 .union(plan.build()?)?
2322 .build()?;
2323
2324 assert_snapshot!(plan, @r"
2325 Union
2326 Union
2327 Union
2328 TableScan: employee_csv projection=[state, salary]
2329 TableScan: employee_csv projection=[state, salary]
2330 TableScan: employee_csv projection=[state, salary]
2331 TableScan: employee_csv projection=[state, salary]
2332 ");
2333
2334 Ok(())
2335 }
2336
2337 #[test]
2338 fn plan_builder_union_distinct() -> Result<()> {
2339 let plan =
2340 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2341
2342 let plan = plan
2343 .clone()
2344 .union_distinct(plan.clone().build()?)?
2345 .union_distinct(plan.clone().build()?)?
2346 .union_distinct(plan.build()?)?
2347 .build()?;
2348
2349 assert_snapshot!(plan, @r"
2350 Distinct:
2351 Union
2352 Distinct:
2353 Union
2354 Distinct:
2355 Union
2356 TableScan: employee_csv projection=[state, salary]
2357 TableScan: employee_csv projection=[state, salary]
2358 TableScan: employee_csv projection=[state, salary]
2359 TableScan: employee_csv projection=[state, salary]
2360 ");
2361
2362 Ok(())
2363 }
2364
2365 #[test]
2366 fn plan_builder_simple_distinct() -> Result<()> {
2367 let plan =
2368 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2369 .filter(col("state").eq(lit("CO")))?
2370 .project(vec![col("id")])?
2371 .distinct()?
2372 .build()?;
2373
2374 assert_snapshot!(plan, @r#"
2375 Distinct:
2376 Projection: employee_csv.id
2377 Filter: employee_csv.state = Utf8("CO")
2378 TableScan: employee_csv projection=[id, state]
2379 "#);
2380
2381 Ok(())
2382 }
2383
2384 #[test]
2385 fn exists_subquery() -> Result<()> {
2386 let foo = test_table_scan_with_name("foo")?;
2387 let bar = test_table_scan_with_name("bar")?;
2388
2389 let subquery = LogicalPlanBuilder::from(foo)
2390 .project(vec![col("a")])?
2391 .filter(col("a").eq(col("bar.a")))?
2392 .build()?;
2393
2394 let outer_query = LogicalPlanBuilder::from(bar)
2395 .project(vec![col("a")])?
2396 .filter(exists(Arc::new(subquery)))?
2397 .build()?;
2398
2399 assert_snapshot!(outer_query, @r"
2400 Filter: EXISTS (<subquery>)
2401 Subquery:
2402 Filter: foo.a = bar.a
2403 Projection: foo.a
2404 TableScan: foo
2405 Projection: bar.a
2406 TableScan: bar
2407 ");
2408
2409 Ok(())
2410 }
2411
2412 #[test]
2413 fn filter_in_subquery() -> Result<()> {
2414 let foo = test_table_scan_with_name("foo")?;
2415 let bar = test_table_scan_with_name("bar")?;
2416
2417 let subquery = LogicalPlanBuilder::from(foo)
2418 .project(vec![col("a")])?
2419 .filter(col("a").eq(col("bar.a")))?
2420 .build()?;
2421
2422 let outer_query = LogicalPlanBuilder::from(bar)
2424 .project(vec![col("a")])?
2425 .filter(in_subquery(col("a"), Arc::new(subquery)))?
2426 .build()?;
2427
2428 assert_snapshot!(outer_query, @r"
2429 Filter: bar.a IN (<subquery>)
2430 Subquery:
2431 Filter: foo.a = bar.a
2432 Projection: foo.a
2433 TableScan: foo
2434 Projection: bar.a
2435 TableScan: bar
2436 ");
2437
2438 Ok(())
2439 }
2440
2441 #[test]
2442 fn select_scalar_subquery() -> Result<()> {
2443 let foo = test_table_scan_with_name("foo")?;
2444 let bar = test_table_scan_with_name("bar")?;
2445
2446 let subquery = LogicalPlanBuilder::from(foo)
2447 .project(vec![col("b")])?
2448 .filter(col("a").eq(col("bar.a")))?
2449 .build()?;
2450
2451 let outer_query = LogicalPlanBuilder::from(bar)
2453 .project(vec![scalar_subquery(Arc::new(subquery))])?
2454 .build()?;
2455
2456 assert_snapshot!(outer_query, @r"
2457 Projection: (<subquery>)
2458 Subquery:
2459 Filter: foo.a = bar.a
2460 Projection: foo.b
2461 TableScan: foo
2462 TableScan: bar
2463 ");
2464
2465 Ok(())
2466 }
2467
2468 #[test]
2469 fn projection_non_unique_names() -> Result<()> {
2470 let plan = table_scan(
2471 Some("employee_csv"),
2472 &employee_schema(),
2473 Some(vec![0, 1]),
2475 )?
2476 .project(vec![col("id"), col("first_name").alias("id")]);
2478
2479 match plan {
2480 Err(DataFusionError::SchemaError(err, _)) => {
2481 if let SchemaError::AmbiguousReference { field } = *err {
2482 let Column {
2483 relation,
2484 name,
2485 spans: _,
2486 } = *field;
2487 let Some(TableReference::Bare { table }) = relation else {
2488 return plan_err!(
2489 "wrong relation: {relation:?}, expected table name"
2490 );
2491 };
2492 assert_eq!(*"employee_csv", *table);
2493 assert_eq!("id", &name);
2494 Ok(())
2495 } else {
2496 plan_err!("Plan should have returned an DataFusionError::SchemaError")
2497 }
2498 }
2499 _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2500 }
2501 }
2502
2503 fn employee_schema() -> Schema {
2504 Schema::new(vec![
2505 Field::new("id", DataType::Int32, false),
2506 Field::new("first_name", DataType::Utf8, false),
2507 Field::new("last_name", DataType::Utf8, false),
2508 Field::new("state", DataType::Utf8, false),
2509 Field::new("salary", DataType::Int32, false),
2510 ])
2511 }
2512
2513 #[test]
2514 fn stringified_plan() {
2515 let stringified_plan =
2516 StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2517 assert!(stringified_plan.should_display(true));
2518 assert!(!stringified_plan.should_display(false)); let stringified_plan =
2521 StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2522 assert!(stringified_plan.should_display(true));
2523 assert!(stringified_plan.should_display(false)); let stringified_plan =
2526 StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2527 assert!(stringified_plan.should_display(true));
2528 assert!(!stringified_plan.should_display(false)); let stringified_plan =
2531 StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2532 assert!(stringified_plan.should_display(true));
2533 assert!(stringified_plan.should_display(false)); let stringified_plan = StringifiedPlan::new(
2536 PlanType::OptimizedLogicalPlan {
2537 optimizer_name: "random opt pass".into(),
2538 },
2539 "...the plan...",
2540 );
2541 assert!(stringified_plan.should_display(true));
2542 assert!(!stringified_plan.should_display(false));
2543 }
2544
2545 fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2546 let schema = Schema::new(vec![
2547 Field::new("a", DataType::UInt32, false),
2548 Field::new("b", DataType::UInt32, false),
2549 Field::new("c", DataType::UInt32, false),
2550 ]);
2551 table_scan(Some(name), &schema, None)?.build()
2552 }
2553
2554 #[test]
2555 fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2556 let plan1 =
2557 table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2558 let plan2 =
2559 table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2560
2561 let err_msg1 =
2562 LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2563 .unwrap_err();
2564
2565 assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2566
2567 Ok(())
2568 }
2569
2570 #[test]
2571 fn plan_builder_unnest() -> Result<()> {
2572 let err = nested_table_scan("test_table")?
2574 .unnest_column("scalar")
2575 .unwrap_err();
2576
2577 let DataFusionError::Internal(desc) = err else {
2578 return plan_err!("Plan should have returned an DataFusionError::Internal");
2579 };
2580
2581 let desc = (*desc
2582 .split(DataFusionError::BACK_TRACE_SEP)
2583 .collect::<Vec<&str>>()
2584 .first()
2585 .unwrap_or(&""))
2586 .to_string();
2587
2588 assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2589
2590 let plan = nested_table_scan("test_table")?
2592 .unnest_column("strings")?
2593 .build()?;
2594
2595 assert_snapshot!(plan, @r"
2596 Unnest: lists[test_table.strings|depth=1] structs[]
2597 TableScan: test_table
2598 ");
2599
2600 let field = plan.schema().field_with_name(None, "strings").unwrap();
2602 assert_eq!(&DataType::Utf8, field.data_type());
2603
2604 let plan = nested_table_scan("test_table")?
2606 .unnest_column("struct_singular")?
2607 .build()?;
2608
2609 assert_snapshot!(plan, @r"
2610 Unnest: lists[] structs[test_table.struct_singular]
2611 TableScan: test_table
2612 ");
2613
2614 for field_name in &["a", "b"] {
2615 let field = plan
2617 .schema()
2618 .field_with_name(None, &format!("struct_singular.{field_name}"))
2619 .unwrap();
2620 assert_eq!(&DataType::UInt32, field.data_type());
2621 }
2622
2623 let plan = nested_table_scan("test_table")?
2625 .unnest_column("strings")?
2626 .unnest_column("structs")?
2627 .unnest_column("struct_singular")?
2628 .build()?;
2629
2630 assert_snapshot!(plan, @r"
2631 Unnest: lists[] structs[test_table.struct_singular]
2632 Unnest: lists[test_table.structs|depth=1] structs[]
2633 Unnest: lists[test_table.strings|depth=1] structs[]
2634 TableScan: test_table
2635 ");
2636
2637 let field = plan.schema().field_with_name(None, "structs").unwrap();
2639 assert!(matches!(field.data_type(), DataType::Struct(_)));
2640
2641 let cols = vec!["strings", "structs", "struct_singular"]
2643 .into_iter()
2644 .map(|c| c.into())
2645 .collect();
2646
2647 let plan = nested_table_scan("test_table")?
2648 .unnest_columns_with_options(cols, UnnestOptions::default())?
2649 .build()?;
2650
2651 assert_snapshot!(plan, @r"
2652 Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2653 TableScan: test_table
2654 ");
2655
2656 let plan = nested_table_scan("test_table")?.unnest_column("missing");
2658 assert!(plan.is_err());
2659
2660 let plan = nested_table_scan("test_table")?
2662 .unnest_columns_with_options(
2663 vec!["stringss".into(), "struct_singular".into()],
2664 UnnestOptions::default()
2665 .with_recursions(RecursionUnnestOption {
2666 input_column: "stringss".into(),
2667 output_column: "stringss_depth_1".into(),
2668 depth: 1,
2669 })
2670 .with_recursions(RecursionUnnestOption {
2671 input_column: "stringss".into(),
2672 output_column: "stringss_depth_2".into(),
2673 depth: 2,
2674 }),
2675 )?
2676 .build()?;
2677
2678 assert_snapshot!(plan, @r"
2679 Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2680 TableScan: test_table
2681 ");
2682
2683 let field = plan
2685 .schema()
2686 .field_with_name(None, "stringss_depth_1")
2687 .unwrap();
2688 assert_eq!(
2689 &DataType::new_list(DataType::Utf8, false),
2690 field.data_type()
2691 );
2692 let field = plan
2693 .schema()
2694 .field_with_name(None, "stringss_depth_2")
2695 .unwrap();
2696 assert_eq!(&DataType::Utf8, field.data_type());
2697 for field_name in &["a", "b"] {
2699 let field = plan
2700 .schema()
2701 .field_with_name(None, &format!("struct_singular.{field_name}"))
2702 .unwrap();
2703 assert_eq!(&DataType::UInt32, field.data_type());
2704 }
2705
2706 Ok(())
2707 }
2708
2709 fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2710 let struct_field_in_list = Field::new_struct(
2713 "item",
2714 vec![
2715 Field::new("a", DataType::UInt32, false),
2716 Field::new("b", DataType::UInt32, false),
2717 ],
2718 false,
2719 );
2720 let string_field = Field::new_list_field(DataType::Utf8, false);
2721 let strings_field = Field::new_list("item", string_field.clone(), false);
2722 let schema = Schema::new(vec![
2723 Field::new("scalar", DataType::UInt32, false),
2724 Field::new_list("strings", string_field, false),
2725 Field::new_list("structs", struct_field_in_list, false),
2726 Field::new(
2727 "struct_singular",
2728 DataType::Struct(Fields::from(vec![
2729 Field::new("a", DataType::UInt32, false),
2730 Field::new("b", DataType::UInt32, false),
2731 ])),
2732 false,
2733 ),
2734 Field::new_list("stringss", strings_field, false),
2735 ]);
2736
2737 table_scan(Some(table_name), &schema, None)
2738 }
2739
2740 #[test]
2741 fn test_union_after_join() -> Result<()> {
2742 let values = vec![vec![lit(1)]];
2743
2744 let left = LogicalPlanBuilder::values(values.clone())?
2745 .alias("left")?
2746 .build()?;
2747 let right = LogicalPlanBuilder::values(values)?
2748 .alias("right")?
2749 .build()?;
2750
2751 let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2752
2753 let plan = LogicalPlanBuilder::from(join.clone())
2754 .union(join)?
2755 .build()?;
2756
2757 assert_snapshot!(plan, @r"
2758 Union
2759 Cross Join:
2760 SubqueryAlias: left
2761 Values: (Int32(1))
2762 SubqueryAlias: right
2763 Values: (Int32(1))
2764 Cross Join:
2765 SubqueryAlias: left
2766 Values: (Int32(1))
2767 SubqueryAlias: right
2768 Values: (Int32(1))
2769 ");
2770
2771 Ok(())
2772 }
2773
2774 #[test]
2775 fn plan_builder_from_logical_plan() -> Result<()> {
2776 let plan =
2777 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2778 .sort(vec![
2779 expr::Sort::new(col("state"), true, true),
2780 expr::Sort::new(col("salary"), false, false),
2781 ])?
2782 .build()?;
2783
2784 let plan_expected = format!("{plan}");
2785 let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2786 assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2787
2788 Ok(())
2789 }
2790
2791 #[test]
2792 fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2793 let constraints =
2794 Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2795 let table_source = table_source_with_constraints(&employee_schema(), constraints);
2796
2797 let plan =
2798 LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2799 .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2800 .build()?;
2801
2802 assert_snapshot!(plan, @r"
2803 Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2804 TableScan: employee_csv projection=[id, state, salary]
2805 ");
2806
2807 Ok(())
2808 }
2809
/// With `add_implicit_group_by_exprs` enabled and a primary key on the
/// grouped column, the expected plan groups on `state` and `salary` in
/// addition to the explicitly requested `id`.
#[test]
fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
    // Source column 0 is declared as the primary key.
    let primary_key = Constraint::PrimaryKey(vec![0]);
    let source = table_source_with_constraints(
        &employee_schema(),
        Constraints::new_unverified(vec![primary_key]),
    );

    // Options are applied to the builder before the aggregate is added.
    let builder = LogicalPlanBuilder::scan("employee_csv", source, Some(vec![0, 3, 4]))?
        .with_options(
            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true),
        );
    let group_by = vec![col("id")];
    let aggregates = vec![sum(col("salary"))];
    let plan = builder.aggregate(group_by, aggregates)?.build()?;

    assert_snapshot!(plan, @r"
    Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
      TableScan: employee_csv projection=[id, state, salary]
    ");

    Ok(())
}
2831
/// The schema built for a join carries the schema-level metadata of one
/// input: the left input's for a left join, the right input's for a right
/// join.
#[test]
fn test_join_metadata() -> Result<()> {
    // Builds the schema-level metadata map `{"key": tag}`.
    let tagged = |tag: &str| HashMap::from([("key".to_string(), tag.to_string())]);
    // Builds a schema with one unqualified, non-nullable Int32 column and
    // the metadata map for `tag`.
    let single_column_schema = |column: &str, tag: &str| {
        let field = Arc::new(Field::new(column, DataType::Int32, false));
        DFSchema::new_with_metadata(vec![(None, field)], tagged(tag))
    };

    let left_schema = single_column_schema("a", "left")?;
    let right_schema = single_column_schema("b", "right")?;

    let cases = [(JoinType::Left, "left"), (JoinType::Right, "right")];
    for (join_type, winning_tag) in cases {
        let join_schema = build_join_schema(&left_schema, &right_schema, &join_type)?;
        assert_eq!(join_schema.metadata(), &tagged(winning_tag));
    }

    Ok(())
}
2858
/// `LogicalPlanBuilder::values` exposes the literals' field metadata on the
/// output column when every row agrees, and returns an error when rows carry
/// different metadata for the same column.
#[test]
fn test_values_metadata() -> Result<()> {
    // Builds field metadata holding a single extension-metadata entry.
    let extension_metadata = |value: &str| {
        FieldMetadata::from(HashMap::from([(
            "ARROW:extension:metadata".to_string(),
            value.to_string(),
        )]))
    };
    let metadata = extension_metadata("test");
    let metadata2 = extension_metadata("test2");

    // Both rows share the same metadata: the schema field reports it.
    let matching_rows = vec![
        vec![lit_with_metadata(1, Some(metadata.clone()))],
        vec![lit_with_metadata(2, Some(metadata.clone()))],
    ];
    let values = LogicalPlanBuilder::values(matching_rows)?.build()?;
    assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());

    // Rows disagree on metadata: building the VALUES node must fail.
    let mismatched_rows = vec![
        vec![lit_with_metadata(1, Some(metadata))],
        vec![lit_with_metadata(2, Some(metadata2))],
    ];
    let outcome = LogicalPlanBuilder::values(mismatched_rows);
    assert!(outcome.is_err());

    Ok(())
}
2889
/// `unique_field_aliases` yields one entry per input field: `None` for the
/// first `a` and the first `b`, and a suffixed alias for every later field
/// in this input.
#[test]
fn test_unique_field_aliases() {
    // Input order matters: the literal "a:1" arrives after the second "a".
    let names = ["a", "a", "b", "b", "a", "a:1"];
    let columns: Vec<Field> = names
        .into_iter()
        .map(|name| Field::new(name, DataType::Int32, false))
        .collect();
    let fields = Fields::from(columns);

    let aliases = unique_field_aliases(&fields);

    // The repeated "a" fields become "a:1" and "a:2", the repeated "b"
    // becomes "b:1", and the field literally named "a:1" becomes "a:1:1".
    let expected: Vec<Option<String>> =
        [None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]
            .into_iter()
            .map(|alias| alias.map(String::from))
            .collect();
    assert_eq!(aliases, expected);
}
2924}