1use std::any::Any;
21use std::borrow::Cow;
22use std::cmp::Ordering;
23use std::collections::{HashMap, HashSet};
24use std::iter::once;
25use std::sync::Arc;
26
27use crate::dml::CopyTo;
28use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
29use crate::expr_rewriter::{
30 coerce_plan_expr_for_schema, normalize_col,
31 normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
32 rewrite_sort_cols_by_aggs,
33};
34use crate::logical_plan::{
35 Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
36 JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
37 Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
38 Window,
39};
40use crate::select_expr::SelectExpr;
41use crate::utils::{
42 can_hash, columnize_expr, compare_sort_expr, expand_qualified_wildcard,
43 expand_wildcard, expr_to_columns, find_valid_equijoin_key_pair,
44 group_window_expr_by_sort_keys,
45};
46use crate::{
47 and, binary_expr, lit, DmlStatement, ExplainOption, Expr, ExprSchemable, Operator,
48 RecursiveQuery, Statement, TableProviderFilterPushDown, TableSource, WriteOp,
49};
50
51use super::dml::InsertOp;
52use arrow::compute::can_cast_types;
53use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
54use datafusion_common::display::ToStringifiedPlan;
55use datafusion_common::file_options::file_type::FileType;
56use datafusion_common::metadata::FieldMetadata;
57use datafusion_common::{
58 exec_err, get_target_functional_dependencies, internal_datafusion_err, not_impl_err,
59 plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
60 NullEquality, Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
61};
62use datafusion_expr_common::type_coercion::binary::type_union_resolution;
63
64use indexmap::IndexSet;
65
/// Placeholder table name (`"?table?"`).
///
/// NOTE(review): presumably used for tables that were not given an explicit
/// name; no use site is visible in this part of the file, so confirm against
/// callers.
pub const UNNAMED_TABLE: &str = "?table?";
68
/// Options that tune how `LogicalPlanBuilder` constructs plans.
#[derive(Default, Debug, Clone)]
pub struct LogicalPlanBuilderOptions {
    /// When set, `LogicalPlanBuilder::aggregate` extends the supplied group-by
    /// expressions through `add_group_by_exprs_from_dependencies`.
    add_implicit_group_by_exprs: bool,
}

impl LogicalPlanBuilderOptions {
    /// Returns options with every flag at its default value (`false`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Consumes the options and returns them with
    /// `add_implicit_group_by_exprs` set to `add`.
    pub fn with_add_implicit_group_by_exprs(self, add: bool) -> Self {
        Self {
            add_implicit_group_by_exprs: add,
        }
    }
}
88
/// Builder that wraps a [`LogicalPlan`] and produces new plans by stacking
/// further operators (projection, filter, join, ...) on top of it.
#[derive(Debug, Clone)]
pub struct LogicalPlanBuilder {
    /// The plan built so far, held behind an `Arc`.
    plan: Arc<LogicalPlan>,
    /// Options consulted by builder methods (currently read by `aggregate`).
    options: LogicalPlanBuilderOptions,
}
131
132impl LogicalPlanBuilder {
133 pub fn new(plan: LogicalPlan) -> Self {
135 Self {
136 plan: Arc::new(plan),
137 options: LogicalPlanBuilderOptions::default(),
138 }
139 }
140
141 pub fn new_from_arc(plan: Arc<LogicalPlan>) -> Self {
143 Self {
144 plan,
145 options: LogicalPlanBuilderOptions::default(),
146 }
147 }
148
149 pub fn with_options(mut self, options: LogicalPlanBuilderOptions) -> Self {
150 self.options = options;
151 self
152 }
153
154 pub fn schema(&self) -> &DFSchemaRef {
156 self.plan.schema()
157 }
158
159 pub fn plan(&self) -> &LogicalPlan {
161 &self.plan
162 }
163
164 pub fn empty(produce_one_row: bool) -> Self {
168 Self::new(LogicalPlan::EmptyRelation(EmptyRelation {
169 produce_one_row,
170 schema: DFSchemaRef::new(DFSchema::empty()),
171 }))
172 }
173
174 pub fn to_recursive_query(
177 self,
178 name: String,
179 recursive_term: LogicalPlan,
180 is_distinct: bool,
181 ) -> Result<Self> {
182 if is_distinct {
184 return not_impl_err!(
185 "Recursive queries with a distinct 'UNION' (in which the previous iteration's results will be de-duplicated) is not supported"
186 );
187 }
188 let static_fields_len = self.plan.schema().fields().len();
190 let recursive_fields_len = recursive_term.schema().fields().len();
191 if static_fields_len != recursive_fields_len {
192 return plan_err!(
193 "Non-recursive term and recursive term must have the same number of columns ({} != {})",
194 static_fields_len, recursive_fields_len
195 );
196 }
197 let coerced_recursive_term =
199 coerce_plan_expr_for_schema(recursive_term, self.plan.schema())?;
200 Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
201 name,
202 static_term: self.plan,
203 recursive_term: Arc::new(coerced_recursive_term),
204 is_distinct,
205 })))
206 }
207
208 pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
216 if values.is_empty() {
217 return plan_err!("Values list cannot be empty");
218 }
219 let n_cols = values[0].len();
220 if n_cols == 0 {
221 return plan_err!("Values list cannot be zero length");
222 }
223 for (i, row) in values.iter().enumerate() {
224 if row.len() != n_cols {
225 return plan_err!(
226 "Inconsistent data length across values list: got {} values in row {} but expected {}",
227 row.len(),
228 i,
229 n_cols
230 );
231 }
232 }
233
234 Self::infer_data(values)
236 }
237
238 pub fn values_with_schema(
248 values: Vec<Vec<Expr>>,
249 schema: &DFSchemaRef,
250 ) -> Result<Self> {
251 if values.is_empty() {
252 return plan_err!("Values list cannot be empty");
253 }
254 let n_cols = schema.fields().len();
255 if n_cols == 0 {
256 return plan_err!("Values list cannot be zero length");
257 }
258 for (i, row) in values.iter().enumerate() {
259 if row.len() != n_cols {
260 return plan_err!(
261 "Inconsistent data length across values list: got {} values in row {} but expected {}",
262 row.len(),
263 i,
264 n_cols
265 );
266 }
267 }
268
269 Self::infer_values_from_schema(values, schema)
271 }
272
273 fn infer_values_from_schema(
274 values: Vec<Vec<Expr>>,
275 schema: &DFSchema,
276 ) -> Result<Self> {
277 let n_cols = values[0].len();
278 let mut fields = ValuesFields::new();
279 for j in 0..n_cols {
280 let field_type = schema.field(j).data_type();
281 let field_nullable = schema.field(j).is_nullable();
282 for row in values.iter() {
283 let value = &row[j];
284 let data_type = value.get_type(schema)?;
285
286 if !data_type.equals_datatype(field_type)
287 && !can_cast_types(&data_type, field_type)
288 {
289 return exec_err!(
290 "type mismatch and can't cast to got {} and {}",
291 data_type,
292 field_type
293 );
294 }
295 }
296 fields.push(field_type.to_owned(), field_nullable);
297 }
298
299 Self::infer_inner(values, fields, schema)
300 }
301
302 fn infer_data(values: Vec<Vec<Expr>>) -> Result<Self> {
303 let n_cols = values[0].len();
304 let schema = DFSchema::empty();
305 let mut fields = ValuesFields::new();
306
307 for j in 0..n_cols {
308 let mut common_type: Option<DataType> = None;
309 let mut common_metadata: Option<FieldMetadata> = None;
310 for (i, row) in values.iter().enumerate() {
311 let value = &row[j];
312 let metadata = value.metadata(&schema)?;
313 if let Some(ref cm) = common_metadata {
314 if &metadata != cm {
315 return plan_err!("Inconsistent metadata across values list at row {i} column {j}. Was {:?} but found {:?}", cm, metadata);
316 }
317 } else {
318 common_metadata = Some(metadata.clone());
319 }
320 let data_type = value.get_type(&schema)?;
321 if data_type == DataType::Null {
322 continue;
323 }
324
325 if let Some(prev_type) = common_type {
326 let data_types = vec![prev_type.clone(), data_type.clone()];
328 let Some(new_type) = type_union_resolution(&data_types) else {
329 return plan_err!("Inconsistent data type across values list at row {i} column {j}. Was {prev_type} but found {data_type}");
330 };
331 common_type = Some(new_type);
332 } else {
333 common_type = Some(data_type);
334 }
335 }
336 fields.push_with_metadata(
339 common_type.unwrap_or(DataType::Null),
340 true,
341 common_metadata,
342 );
343 }
344
345 Self::infer_inner(values, fields, &schema)
346 }
347
348 fn infer_inner(
349 mut values: Vec<Vec<Expr>>,
350 fields: ValuesFields,
351 schema: &DFSchema,
352 ) -> Result<Self> {
353 let fields = fields.into_fields();
354 for row in &mut values {
356 for (j, field_type) in fields.iter().map(|f| f.data_type()).enumerate() {
357 if let Expr::Literal(ScalarValue::Null, metadata) = &row[j] {
358 row[j] = Expr::Literal(
359 ScalarValue::try_from(field_type)?,
360 metadata.clone(),
361 );
362 } else {
363 row[j] = std::mem::take(&mut row[j]).cast_to(field_type, schema)?;
364 }
365 }
366 }
367
368 let dfschema = DFSchema::from_unqualified_fields(fields, HashMap::new())?;
369 let schema = DFSchemaRef::new(dfschema);
370
371 Ok(Self::new(LogicalPlan::Values(Values { schema, values })))
372 }
373
374 pub fn scan(
407 table_name: impl Into<TableReference>,
408 table_source: Arc<dyn TableSource>,
409 projection: Option<Vec<usize>>,
410 ) -> Result<Self> {
411 Self::scan_with_filters(table_name, table_source, projection, vec![])
412 }
413
414 pub fn copy_to(
416 input: LogicalPlan,
417 output_url: String,
418 file_type: Arc<dyn FileType>,
419 options: HashMap<String, String>,
420 partition_by: Vec<String>,
421 ) -> Result<Self> {
422 Ok(Self::new(LogicalPlan::Copy(CopyTo::new(
423 Arc::new(input),
424 output_url,
425 partition_by,
426 file_type,
427 options,
428 ))))
429 }
430
431 pub fn insert_into(
465 input: LogicalPlan,
466 table_name: impl Into<TableReference>,
467 target: Arc<dyn TableSource>,
468 insert_op: InsertOp,
469 ) -> Result<Self> {
470 Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
471 table_name.into(),
472 target,
473 WriteOp::Insert(insert_op),
474 Arc::new(input),
475 ))))
476 }
477
478 pub fn scan_with_filters(
480 table_name: impl Into<TableReference>,
481 table_source: Arc<dyn TableSource>,
482 projection: Option<Vec<usize>>,
483 filters: Vec<Expr>,
484 ) -> Result<Self> {
485 Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
486 }
487
488 pub fn scan_with_filters_fetch(
490 table_name: impl Into<TableReference>,
491 table_source: Arc<dyn TableSource>,
492 projection: Option<Vec<usize>>,
493 filters: Vec<Expr>,
494 fetch: Option<usize>,
495 ) -> Result<Self> {
496 Self::scan_with_filters_inner(
497 table_name,
498 table_source,
499 projection,
500 filters,
501 fetch,
502 )
503 }
504
505 fn scan_with_filters_inner(
506 table_name: impl Into<TableReference>,
507 table_source: Arc<dyn TableSource>,
508 projection: Option<Vec<usize>>,
509 filters: Vec<Expr>,
510 fetch: Option<usize>,
511 ) -> Result<Self> {
512 let table_scan =
513 TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
514
515 if table_scan.filters.is_empty() {
517 if let Some(p) = table_scan.source.get_logical_plan() {
518 let sub_plan = p.into_owned();
519
520 if let Some(proj) = table_scan.projection {
521 let projection_exprs = proj
522 .into_iter()
523 .map(|i| {
524 Expr::Column(Column::from(
525 sub_plan.schema().qualified_field(i),
526 ))
527 })
528 .collect::<Vec<_>>();
529 return Self::new(sub_plan)
530 .project(projection_exprs)?
531 .alias(table_scan.table_name);
532 }
533
534 return Self::new(sub_plan).alias(table_scan.table_name);
538 }
539 }
540
541 Ok(Self::new(LogicalPlan::TableScan(table_scan)))
542 }
543
544 pub fn window_plan(
546 input: LogicalPlan,
547 window_exprs: impl IntoIterator<Item = Expr>,
548 ) -> Result<LogicalPlan> {
549 let mut plan = input;
550 let mut groups = group_window_expr_by_sort_keys(window_exprs)?;
551 groups.sort_by(|(key_a, _), (key_b, _)| {
557 for ((first, _), (second, _)) in key_a.iter().zip(key_b.iter()) {
558 let key_ordering = compare_sort_expr(first, second, plan.schema());
559 match key_ordering {
560 Ordering::Less => {
561 return Ordering::Less;
562 }
563 Ordering::Greater => {
564 return Ordering::Greater;
565 }
566 Ordering::Equal => {}
567 }
568 }
569 key_b.len().cmp(&key_a.len())
570 });
571 for (_, exprs) in groups {
572 let window_exprs = exprs.into_iter().collect::<Vec<_>>();
573 plan = LogicalPlanBuilder::from(plan)
576 .window(window_exprs)?
577 .build()?;
578 }
579 Ok(plan)
580 }
581
582 pub fn project(
584 self,
585 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
586 ) -> Result<Self> {
587 project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
588 }
589
590 pub fn project_with_validation(
593 self,
594 expr: Vec<(impl Into<SelectExpr>, bool)>,
595 ) -> Result<Self> {
596 project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
597 }
598
599 pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
601 let exprs: Vec<_> = indices
602 .into_iter()
603 .map(|x| Expr::Column(Column::from(self.plan.schema().qualified_field(x))))
604 .collect();
605 self.project(exprs)
606 }
607
608 pub fn filter(self, expr: impl Into<Expr>) -> Result<Self> {
610 let expr = normalize_col(expr.into(), &self.plan)?;
611 Filter::try_new(expr, self.plan)
612 .map(LogicalPlan::Filter)
613 .map(Self::new)
614 }
615
616 pub fn having(self, expr: impl Into<Expr>) -> Result<Self> {
618 let expr = normalize_col(expr.into(), &self.plan)?;
619 Filter::try_new(expr, self.plan)
620 .map(LogicalPlan::Filter)
621 .map(Self::from)
622 }
623
624 pub fn prepare(self, name: String, fields: Vec<FieldRef>) -> Result<Self> {
626 Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
627 Prepare {
628 name,
629 fields,
630 input: self.plan,
631 },
632 ))))
633 }
634
635 pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<Self> {
642 let skip_expr = if skip == 0 {
643 None
644 } else {
645 Some(lit(skip as i64))
646 };
647 let fetch_expr = fetch.map(|f| lit(f as i64));
648 self.limit_by_expr(skip_expr, fetch_expr)
649 }
650
651 pub fn limit_by_expr(self, skip: Option<Expr>, fetch: Option<Expr>) -> Result<Self> {
655 Ok(Self::new(LogicalPlan::Limit(Limit {
656 skip: skip.map(Box::new),
657 fetch: fetch.map(Box::new),
658 input: self.plan,
659 })))
660 }
661
662 pub fn alias(self, alias: impl Into<TableReference>) -> Result<Self> {
664 subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new)
665 }
666
667 fn add_missing_columns(
696 curr_plan: LogicalPlan,
697 missing_cols: &IndexSet<Column>,
698 is_distinct: bool,
699 ) -> Result<LogicalPlan> {
700 match curr_plan {
701 LogicalPlan::Projection(Projection {
702 input,
703 mut expr,
704 schema: _,
705 }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => {
706 let mut missing_exprs = missing_cols
707 .iter()
708 .map(|c| normalize_col(Expr::Column(c.clone()), &input))
709 .collect::<Result<Vec<_>>>()?;
710
711 missing_exprs.retain(|e| !expr.contains(e));
715 if is_distinct {
716 Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?;
717 }
718 expr.extend(missing_exprs);
719 project(Arc::unwrap_or_clone(input), expr)
720 }
721 _ => {
722 let is_distinct =
723 is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_));
724 let new_inputs = curr_plan
725 .inputs()
726 .into_iter()
727 .map(|input_plan| {
728 Self::add_missing_columns(
729 (*input_plan).clone(),
730 missing_cols,
731 is_distinct,
732 )
733 })
734 .collect::<Result<Vec<_>>>()?;
735 curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs)
736 }
737 }
738 }
739
740 fn ambiguous_distinct_check(
741 missing_exprs: &[Expr],
742 missing_cols: &IndexSet<Column>,
743 projection_exprs: &[Expr],
744 ) -> Result<()> {
745 if missing_exprs.is_empty() {
746 return Ok(());
747 }
748
749 let all_aliases = missing_exprs.iter().all(|e| {
757 projection_exprs.iter().any(|proj_expr| {
758 if let Expr::Alias(Alias { expr, .. }) = proj_expr {
759 e == expr.as_ref()
760 } else {
761 false
762 }
763 })
764 });
765 if all_aliases {
766 return Ok(());
767 }
768
769 let missing_col_names = missing_cols
770 .iter()
771 .map(|col| col.flat_name())
772 .collect::<String>();
773
774 plan_err!("For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list")
775 }
776
777 pub fn sort_by(
779 self,
780 expr: impl IntoIterator<Item = impl Into<Expr>> + Clone,
781 ) -> Result<Self> {
782 self.sort(
783 expr.into_iter()
784 .map(|e| e.into().sort(true, false))
785 .collect::<Vec<SortExpr>>(),
786 )
787 }
788
789 pub fn sort(
790 self,
791 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
792 ) -> Result<Self> {
793 self.sort_with_limit(sorts, None)
794 }
795
796 pub fn sort_with_limit(
798 self,
799 sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
800 fetch: Option<usize>,
801 ) -> Result<Self> {
802 let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;
803
804 let schema = self.plan.schema();
805
806 let mut missing_cols: IndexSet<Column> = IndexSet::new();
808 sorts.iter().try_for_each::<_, Result<()>>(|sort| {
809 let columns = sort.expr.column_refs();
810
811 missing_cols.extend(
812 columns
813 .into_iter()
814 .filter(|c| !schema.has_column(c))
815 .cloned(),
816 );
817
818 Ok(())
819 })?;
820
821 if missing_cols.is_empty() {
822 return Ok(Self::new(LogicalPlan::Sort(Sort {
823 expr: normalize_sorts(sorts, &self.plan)?,
824 input: self.plan,
825 fetch,
826 })));
827 }
828
829 let new_expr = schema.columns().into_iter().map(Expr::Column).collect();
831
832 let is_distinct = false;
833 let plan = Self::add_missing_columns(
834 Arc::unwrap_or_clone(self.plan),
835 &missing_cols,
836 is_distinct,
837 )?;
838
839 let sort_plan = LogicalPlan::Sort(Sort {
840 expr: normalize_sorts(sorts, &plan)?,
841 input: Arc::new(plan),
842 fetch,
843 });
844
845 Projection::try_new(new_expr, Arc::new(sort_plan))
846 .map(LogicalPlan::Projection)
847 .map(Self::new)
848 }
849
850 pub fn union(self, plan: LogicalPlan) -> Result<Self> {
852 union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
853 }
854
855 pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
857 union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
858 }
859
860 pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
862 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
863 let right_plan: LogicalPlan = plan;
864
865 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
866 union_by_name(left_plan, right_plan)?,
867 )))))
868 }
869
870 pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
872 let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
873 let right_plan: LogicalPlan = plan;
874
875 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
876 union(left_plan, right_plan)?,
877 )))))
878 }
879
880 pub fn distinct(self) -> Result<Self> {
882 Ok(Self::new(LogicalPlan::Distinct(Distinct::All(self.plan))))
883 }
884
885 pub fn distinct_on(
888 self,
889 on_expr: Vec<Expr>,
890 select_expr: Vec<Expr>,
891 sort_expr: Option<Vec<SortExpr>>,
892 ) -> Result<Self> {
893 Ok(Self::new(LogicalPlan::Distinct(Distinct::On(
894 DistinctOn::try_new(on_expr, select_expr, sort_expr, self.plan)?,
895 ))))
896 }
897
898 pub fn join(
912 self,
913 right: LogicalPlan,
914 join_type: JoinType,
915 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
916 filter: Option<Expr>,
917 ) -> Result<Self> {
918 self.join_detailed(
919 right,
920 join_type,
921 join_keys,
922 filter,
923 NullEquality::NullEqualsNothing,
924 )
925 }
926
927 pub fn join_on(
968 self,
969 right: LogicalPlan,
970 join_type: JoinType,
971 on_exprs: impl IntoIterator<Item = Expr>,
972 ) -> Result<Self> {
973 let filter = on_exprs.into_iter().reduce(Expr::and);
974
975 self.join_detailed(
976 right,
977 join_type,
978 (Vec::<Column>::new(), Vec::<Column>::new()),
979 filter,
980 NullEquality::NullEqualsNothing,
981 )
982 }
983
984 pub(crate) fn normalize(plan: &LogicalPlan, column: Column) -> Result<Column> {
985 if column.relation.is_some() {
986 return Ok(column);
988 }
989
990 let schema = plan.schema();
991 let fallback_schemas = plan.fallback_normalize_schemas();
992 let using_columns = plan.using_columns()?;
993 column.normalize_with_schemas_and_ambiguity_check(
994 &[&[schema], &fallback_schemas],
995 &using_columns,
996 )
997 }
998
999 pub fn join_detailed(
1006 self,
1007 right: LogicalPlan,
1008 join_type: JoinType,
1009 join_keys: (Vec<impl Into<Column>>, Vec<impl Into<Column>>),
1010 filter: Option<Expr>,
1011 null_equality: NullEquality,
1012 ) -> Result<Self> {
1013 if join_keys.0.len() != join_keys.1.len() {
1014 return plan_err!("left_keys and right_keys were not the same length");
1015 }
1016
1017 let filter = if let Some(expr) = filter {
1018 let filter = normalize_col_with_schemas_and_ambiguity_check(
1019 expr,
1020 &[&[self.schema(), right.schema()]],
1021 &[],
1022 )?;
1023 Some(filter)
1024 } else {
1025 None
1026 };
1027
1028 let (left_keys, right_keys): (Vec<Result<Column>>, Vec<Result<Column>>) =
1029 join_keys
1030 .0
1031 .into_iter()
1032 .zip(join_keys.1)
1033 .map(|(l, r)| {
1034 let l = l.into();
1035 let r = r.into();
1036
1037 match (&l.relation, &r.relation) {
1038 (Some(lr), Some(rr)) => {
1039 let l_is_left =
1040 self.plan.schema().field_with_qualified_name(lr, &l.name);
1041 let l_is_right =
1042 right.schema().field_with_qualified_name(lr, &l.name);
1043 let r_is_left =
1044 self.plan.schema().field_with_qualified_name(rr, &r.name);
1045 let r_is_right =
1046 right.schema().field_with_qualified_name(rr, &r.name);
1047
1048 match (l_is_left, l_is_right, r_is_left, r_is_right) {
1049 (_, Ok(_), Ok(_), _) => (Ok(r), Ok(l)),
1050 (Ok(_), _, _, Ok(_)) => (Ok(l), Ok(r)),
1051 _ => (
1052 Self::normalize(&self.plan, l),
1053 Self::normalize(&right, r),
1054 ),
1055 }
1056 }
1057 (Some(lr), None) => {
1058 let l_is_left =
1059 self.plan.schema().field_with_qualified_name(lr, &l.name);
1060 let l_is_right =
1061 right.schema().field_with_qualified_name(lr, &l.name);
1062
1063 match (l_is_left, l_is_right) {
1064 (Ok(_), _) => (Ok(l), Self::normalize(&right, r)),
1065 (_, Ok(_)) => (Self::normalize(&self.plan, r), Ok(l)),
1066 _ => (
1067 Self::normalize(&self.plan, l),
1068 Self::normalize(&right, r),
1069 ),
1070 }
1071 }
1072 (None, Some(rr)) => {
1073 let r_is_left =
1074 self.plan.schema().field_with_qualified_name(rr, &r.name);
1075 let r_is_right =
1076 right.schema().field_with_qualified_name(rr, &r.name);
1077
1078 match (r_is_left, r_is_right) {
1079 (Ok(_), _) => (Ok(r), Self::normalize(&right, l)),
1080 (_, Ok(_)) => (Self::normalize(&self.plan, l), Ok(r)),
1081 _ => (
1082 Self::normalize(&self.plan, l),
1083 Self::normalize(&right, r),
1084 ),
1085 }
1086 }
1087 (None, None) => {
1088 let mut swap = false;
1089 let left_key = Self::normalize(&self.plan, l.clone())
1090 .or_else(|_| {
1091 swap = true;
1092 Self::normalize(&right, l)
1093 });
1094 if swap {
1095 (Self::normalize(&self.plan, r), left_key)
1096 } else {
1097 (left_key, Self::normalize(&right, r))
1098 }
1099 }
1100 }
1101 })
1102 .unzip();
1103
1104 let left_keys = left_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1105 let right_keys = right_keys.into_iter().collect::<Result<Vec<Column>>>()?;
1106
1107 let on: Vec<_> = left_keys
1108 .into_iter()
1109 .zip(right_keys)
1110 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
1111 .collect();
1112 let join_schema =
1113 build_join_schema(self.plan.schema(), right.schema(), &join_type)?;
1114
1115 if join_type != JoinType::Inner && on.is_empty() && filter.is_none() {
1117 return plan_err!("join condition should not be empty");
1118 }
1119
1120 Ok(Self::new(LogicalPlan::Join(Join {
1121 left: self.plan,
1122 right: Arc::new(right),
1123 on,
1124 filter,
1125 join_type,
1126 join_constraint: JoinConstraint::On,
1127 schema: DFSchemaRef::new(join_schema),
1128 null_equality,
1129 })))
1130 }
1131
1132 pub fn join_using(
1134 self,
1135 right: LogicalPlan,
1136 join_type: JoinType,
1137 using_keys: Vec<Column>,
1138 ) -> Result<Self> {
1139 let left_keys: Vec<Column> = using_keys
1140 .clone()
1141 .into_iter()
1142 .map(|c| Self::normalize(&self.plan, c))
1143 .collect::<Result<_>>()?;
1144 let right_keys: Vec<Column> = using_keys
1145 .into_iter()
1146 .map(|c| Self::normalize(&right, c))
1147 .collect::<Result<_>>()?;
1148
1149 let on: Vec<(_, _)> = left_keys.into_iter().zip(right_keys).collect();
1150 let mut join_on: Vec<(Expr, Expr)> = vec![];
1151 let mut filters: Option<Expr> = None;
1152 for (l, r) in &on {
1153 if self.plan.schema().has_column(l)
1154 && right.schema().has_column(r)
1155 && can_hash(
1156 datafusion_common::ExprSchema::field_from_column(
1157 self.plan.schema(),
1158 l,
1159 )?
1160 .data_type(),
1161 )
1162 {
1163 join_on.push((Expr::Column(l.clone()), Expr::Column(r.clone())));
1164 } else if self.plan.schema().has_column(l)
1165 && right.schema().has_column(r)
1166 && can_hash(
1167 datafusion_common::ExprSchema::field_from_column(
1168 self.plan.schema(),
1169 r,
1170 )?
1171 .data_type(),
1172 )
1173 {
1174 join_on.push((Expr::Column(r.clone()), Expr::Column(l.clone())));
1175 } else {
1176 let expr = binary_expr(
1177 Expr::Column(l.clone()),
1178 Operator::Eq,
1179 Expr::Column(r.clone()),
1180 );
1181 match filters {
1182 None => filters = Some(expr),
1183 Some(filter_expr) => filters = Some(and(expr, filter_expr)),
1184 }
1185 }
1186 }
1187
1188 if join_on.is_empty() {
1189 let join = Self::from(self.plan).cross_join(right)?;
1190 join.filter(filters.ok_or_else(|| {
1191 internal_datafusion_err!("filters should not be None here")
1192 })?)
1193 } else {
1194 let join = Join::try_new(
1195 self.plan,
1196 Arc::new(right),
1197 join_on,
1198 filters,
1199 join_type,
1200 JoinConstraint::Using,
1201 NullEquality::NullEqualsNothing,
1202 )?;
1203
1204 Ok(Self::new(LogicalPlan::Join(join)))
1205 }
1206 }
1207
1208 pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
1210 let join = Join::try_new(
1211 self.plan,
1212 Arc::new(right),
1213 vec![],
1214 None,
1215 JoinType::Inner,
1216 JoinConstraint::On,
1217 NullEquality::NullEqualsNothing,
1218 )?;
1219
1220 Ok(Self::new(LogicalPlan::Join(join)))
1221 }
1222
1223 pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<Self> {
1225 Ok(Self::new(LogicalPlan::Repartition(Repartition {
1226 input: self.plan,
1227 partitioning_scheme,
1228 })))
1229 }
1230
1231 pub fn window(
1233 self,
1234 window_expr: impl IntoIterator<Item = impl Into<Expr>>,
1235 ) -> Result<Self> {
1236 let window_expr = normalize_cols(window_expr, &self.plan)?;
1237 validate_unique_names("Windows", &window_expr)?;
1238 Ok(Self::new(LogicalPlan::Window(Window::try_new(
1239 window_expr,
1240 self.plan,
1241 )?)))
1242 }
1243
1244 pub fn aggregate(
1248 self,
1249 group_expr: impl IntoIterator<Item = impl Into<Expr>>,
1250 aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
1251 ) -> Result<Self> {
1252 let group_expr = normalize_cols(group_expr, &self.plan)?;
1253 let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
1254
1255 let group_expr = if self.options.add_implicit_group_by_exprs {
1256 add_group_by_exprs_from_dependencies(group_expr, self.plan.schema())?
1257 } else {
1258 group_expr
1259 };
1260
1261 Aggregate::try_new(self.plan, group_expr, aggr_expr)
1262 .map(LogicalPlan::Aggregate)
1263 .map(Self::new)
1264 }
1265
1266 pub fn explain(self, verbose: bool, analyze: bool) -> Result<Self> {
1273 self.explain_option_format(
1275 ExplainOption::default()
1276 .with_verbose(verbose)
1277 .with_analyze(analyze),
1278 )
1279 }
1280
1281 pub fn explain_option_format(self, explain_option: ExplainOption) -> Result<Self> {
1285 let schema = LogicalPlan::explain_schema();
1286 let schema = schema.to_dfschema_ref()?;
1287
1288 if explain_option.analyze {
1289 Ok(Self::new(LogicalPlan::Analyze(Analyze {
1290 verbose: explain_option.verbose,
1291 input: self.plan,
1292 schema,
1293 })))
1294 } else {
1295 let stringified_plans =
1296 vec![self.plan.to_stringified(PlanType::InitialLogicalPlan)];
1297
1298 Ok(Self::new(LogicalPlan::Explain(Explain {
1299 verbose: explain_option.verbose,
1300 plan: self.plan,
1301 explain_format: explain_option.format,
1302 stringified_plans,
1303 schema,
1304 logical_optimization_succeeded: false,
1305 })))
1306 }
1307 }
1308
1309 pub fn intersect(
1311 left_plan: LogicalPlan,
1312 right_plan: LogicalPlan,
1313 is_all: bool,
1314 ) -> Result<LogicalPlan> {
1315 LogicalPlanBuilder::intersect_or_except(
1316 left_plan,
1317 right_plan,
1318 JoinType::LeftSemi,
1319 is_all,
1320 )
1321 }
1322
1323 pub fn except(
1325 left_plan: LogicalPlan,
1326 right_plan: LogicalPlan,
1327 is_all: bool,
1328 ) -> Result<LogicalPlan> {
1329 LogicalPlanBuilder::intersect_or_except(
1330 left_plan,
1331 right_plan,
1332 JoinType::LeftAnti,
1333 is_all,
1334 )
1335 }
1336
1337 fn intersect_or_except(
1339 left_plan: LogicalPlan,
1340 right_plan: LogicalPlan,
1341 join_type: JoinType,
1342 is_all: bool,
1343 ) -> Result<LogicalPlan> {
1344 let left_len = left_plan.schema().fields().len();
1345 let right_len = right_plan.schema().fields().len();
1346
1347 if left_len != right_len {
1348 return plan_err!(
1349 "INTERSECT/EXCEPT query must have the same number of columns. Left is {left_len} and right is {right_len}."
1350 );
1351 }
1352
1353 let join_keys = left_plan
1354 .schema()
1355 .fields()
1356 .iter()
1357 .zip(right_plan.schema().fields().iter())
1358 .map(|(left_field, right_field)| {
1359 (
1360 (Column::from_name(left_field.name())),
1361 (Column::from_name(right_field.name())),
1362 )
1363 })
1364 .unzip();
1365 if is_all {
1366 LogicalPlanBuilder::from(left_plan)
1367 .join_detailed(
1368 right_plan,
1369 join_type,
1370 join_keys,
1371 None,
1372 NullEquality::NullEqualsNull,
1373 )?
1374 .build()
1375 } else {
1376 LogicalPlanBuilder::from(left_plan)
1377 .distinct()?
1378 .join_detailed(
1379 right_plan,
1380 join_type,
1381 join_keys,
1382 None,
1383 NullEquality::NullEqualsNull,
1384 )?
1385 .build()
1386 }
1387 }
1388
1389 pub fn build(self) -> Result<LogicalPlan> {
1391 Ok(Arc::unwrap_or_clone(self.plan))
1392 }
1393
1394 pub fn join_with_expr_keys(
1409 self,
1410 right: LogicalPlan,
1411 join_type: JoinType,
1412 equi_exprs: (Vec<impl Into<Expr>>, Vec<impl Into<Expr>>),
1413 filter: Option<Expr>,
1414 ) -> Result<Self> {
1415 if equi_exprs.0.len() != equi_exprs.1.len() {
1416 return plan_err!("left_keys and right_keys were not the same length");
1417 }
1418
1419 let join_key_pairs = equi_exprs
1420 .0
1421 .into_iter()
1422 .zip(equi_exprs.1)
1423 .map(|(l, r)| {
1424 let left_key = l.into();
1425 let right_key = r.into();
1426 let mut left_using_columns = HashSet::new();
1427 expr_to_columns(&left_key, &mut left_using_columns)?;
1428 let normalized_left_key = normalize_col_with_schemas_and_ambiguity_check(
1429 left_key,
1430 &[&[self.plan.schema()]],
1431 &[],
1432 )?;
1433
1434 let mut right_using_columns = HashSet::new();
1435 expr_to_columns(&right_key, &mut right_using_columns)?;
1436 let normalized_right_key = normalize_col_with_schemas_and_ambiguity_check(
1437 right_key,
1438 &[&[right.schema()]],
1439 &[],
1440 )?;
1441
1442 find_valid_equijoin_key_pair(
1444 &normalized_left_key,
1445 &normalized_right_key,
1446 self.plan.schema(),
1447 right.schema(),
1448 )?.ok_or_else(||
1449 plan_datafusion_err!(
1450 "can't create join plan, join key should belong to one input, error key: ({normalized_left_key},{normalized_right_key})"
1451 ))
1452 })
1453 .collect::<Result<Vec<_>>>()?;
1454
1455 let join = Join::try_new(
1456 self.plan,
1457 Arc::new(right),
1458 join_key_pairs,
1459 filter,
1460 join_type,
1461 JoinConstraint::On,
1462 NullEquality::NullEqualsNothing,
1463 )?;
1464
1465 Ok(Self::new(LogicalPlan::Join(join)))
1466 }
1467
1468 pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
1470 unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
1471 }
1472
1473 pub fn unnest_column_with_options(
1475 self,
1476 column: impl Into<Column>,
1477 options: UnnestOptions,
1478 ) -> Result<Self> {
1479 unnest_with_options(
1480 Arc::unwrap_or_clone(self.plan),
1481 vec![column.into()],
1482 options,
1483 )
1484 .map(Self::new)
1485 }
1486
1487 pub fn unnest_columns_with_options(
1489 self,
1490 columns: Vec<Column>,
1491 options: UnnestOptions,
1492 ) -> Result<Self> {
1493 unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
1494 .map(Self::new)
1495 }
1496}
1497
1498impl From<LogicalPlan> for LogicalPlanBuilder {
1499 fn from(plan: LogicalPlan) -> Self {
1500 LogicalPlanBuilder::new(plan)
1501 }
1502}
1503
1504impl From<Arc<LogicalPlan>> for LogicalPlanBuilder {
1505 fn from(plan: Arc<LogicalPlan>) -> Self {
1506 LogicalPlanBuilder::new_from_arc(plan)
1507 }
1508}
1509
/// Ordered accumulator of [`Field`]s.
///
/// Fields are appended through the `push*` methods, which name each field
/// from its 1-based position (`column1`, `column2`, ...), and the collection
/// is finally converted into [`Fields`] with `into_fields`.
#[derive(Default)]
struct ValuesFields {
    /// Fields collected so far, in insertion order.
    inner: Vec<Field>,
}
1515
1516impl ValuesFields {
1517 pub fn new() -> Self {
1518 Self::default()
1519 }
1520
1521 pub fn push(&mut self, data_type: DataType, nullable: bool) {
1522 self.push_with_metadata(data_type, nullable, None);
1523 }
1524
1525 pub fn push_with_metadata(
1526 &mut self,
1527 data_type: DataType,
1528 nullable: bool,
1529 metadata: Option<FieldMetadata>,
1530 ) {
1531 let name = format!("column{}", self.inner.len() + 1);
1534 let mut field = Field::new(name, data_type, nullable);
1535 if let Some(metadata) = metadata {
1536 field.set_metadata(metadata.to_hashmap());
1537 }
1538 self.inner.push(field);
1539 }
1540
1541 pub fn into_fields(self) -> Fields {
1542 self.inner.into()
1543 }
1544}
1545
1546pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1558 let mut name_map = HashMap::<&str, usize>::new();
1566 let mut seen = HashSet::<Cow<String>>::new();
1568
1569 fields
1570 .iter()
1571 .map(|field| {
1572 let original_name = field.name();
1573 let mut name = Cow::Borrowed(original_name);
1574
1575 let count = name_map.entry(original_name).or_insert(0);
1576
1577 while seen.contains(&name) {
1579 *count += 1;
1580 name = Cow::Owned(format!("{original_name}:{count}"));
1581 }
1582
1583 seen.insert(name.clone());
1584
1585 match name {
1586 Cow::Borrowed(_) => None,
1587 Cow::Owned(alias) => Some(alias),
1588 }
1589 })
1590 .collect()
1591}
1592
1593fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
1594 let mut table_references = schema
1595 .iter()
1596 .filter_map(|(qualifier, _)| qualifier)
1597 .collect::<Vec<_>>();
1598 table_references.dedup();
1599 let table_reference = if table_references.len() == 1 {
1600 table_references.pop().cloned()
1601 } else {
1602 None
1603 };
1604
1605 (
1606 table_reference,
1607 Arc::new(Field::new("mark", DataType::Boolean, false)),
1608 )
1609}
1610
1611pub fn build_join_schema(
1614 left: &DFSchema,
1615 right: &DFSchema,
1616 join_type: &JoinType,
1617) -> Result<DFSchema> {
1618 fn nullify_fields<'a>(
1619 fields: impl Iterator<Item = (Option<&'a TableReference>, &'a Arc<Field>)>,
1620 ) -> Vec<(Option<TableReference>, Arc<Field>)> {
1621 fields
1622 .map(|(q, f)| {
1623 let field = f.as_ref().clone().with_nullable(true);
1625 (q.cloned(), Arc::new(field))
1626 })
1627 .collect()
1628 }
1629
1630 let right_fields = right.iter();
1631 let left_fields = left.iter();
1632
1633 let qualified_fields: Vec<(Option<TableReference>, Arc<Field>)> = match join_type {
1634 JoinType::Inner => {
1635 let left_fields = left_fields
1637 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1638 .collect::<Vec<_>>();
1639 let right_fields = right_fields
1640 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1641 .collect::<Vec<_>>();
1642 left_fields.into_iter().chain(right_fields).collect()
1643 }
1644 JoinType::Left => {
1645 let left_fields = left_fields
1647 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1648 .collect::<Vec<_>>();
1649 left_fields
1650 .into_iter()
1651 .chain(nullify_fields(right_fields))
1652 .collect()
1653 }
1654 JoinType::Right => {
1655 let right_fields = right_fields
1657 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1658 .collect::<Vec<_>>();
1659 nullify_fields(left_fields)
1660 .into_iter()
1661 .chain(right_fields)
1662 .collect()
1663 }
1664 JoinType::Full => {
1665 nullify_fields(left_fields)
1667 .into_iter()
1668 .chain(nullify_fields(right_fields))
1669 .collect()
1670 }
1671 JoinType::LeftSemi | JoinType::LeftAnti => {
1672 left_fields
1674 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1675 .collect()
1676 }
1677 JoinType::LeftMark => left_fields
1678 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1679 .chain(once(mark_field(right)))
1680 .collect(),
1681 JoinType::RightSemi | JoinType::RightAnti => {
1682 right_fields
1684 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1685 .collect()
1686 }
1687 JoinType::RightMark => right_fields
1688 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
1689 .chain(once(mark_field(left)))
1690 .collect(),
1691 };
1692 let func_dependencies = left.functional_dependencies().join(
1693 right.functional_dependencies(),
1694 join_type,
1695 left.fields().len(),
1696 );
1697
1698 let (schema1, schema2) = match join_type {
1699 JoinType::Right
1700 | JoinType::RightSemi
1701 | JoinType::RightAnti
1702 | JoinType::RightMark => (left, right),
1703 _ => (right, left),
1704 };
1705
1706 let metadata = schema1
1707 .metadata()
1708 .clone()
1709 .into_iter()
1710 .chain(schema2.metadata().clone())
1711 .collect();
1712
1713 let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1714 dfschema.with_functional_dependencies(func_dependencies)
1715}
1716
1717pub fn requalify_sides_if_needed(
1727 left: LogicalPlanBuilder,
1728 right: LogicalPlanBuilder,
1729) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1730 let left_cols = left.schema().columns();
1731 let right_cols = right.schema().columns();
1732 if left_cols.iter().any(|l| {
1733 right_cols.iter().any(|r| {
1734 l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
1735 })
1736 }) {
1737 Ok((
1740 left.alias(TableReference::bare("left"))?,
1741 right.alias(TableReference::bare("right"))?,
1742 true,
1743 ))
1744 } else {
1745 Ok((left, right, false))
1746 }
1747}
1748
1749pub fn add_group_by_exprs_from_dependencies(
1759 mut group_expr: Vec<Expr>,
1760 schema: &DFSchemaRef,
1761) -> Result<Vec<Expr>> {
1762 let mut group_by_field_names = group_expr
1765 .iter()
1766 .map(|e| e.schema_name().to_string())
1767 .collect::<Vec<_>>();
1768
1769 if let Some(target_indices) =
1770 get_target_functional_dependencies(schema, &group_by_field_names)
1771 {
1772 for idx in target_indices {
1773 let expr = Expr::Column(Column::from(schema.qualified_field(idx)));
1774 let expr_name = expr.schema_name().to_string();
1775 if !group_by_field_names.contains(&expr_name) {
1776 group_by_field_names.push(expr_name);
1777 group_expr.push(expr);
1778 }
1779 }
1780 }
1781 Ok(group_expr)
1782}
1783
1784pub fn validate_unique_names<'a>(
1786 node_name: &str,
1787 expressions: impl IntoIterator<Item = &'a Expr>,
1788) -> Result<()> {
1789 let mut unique_names = HashMap::new();
1790
1791 expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
1792 let name = expr.schema_name().to_string();
1793 match unique_names.get(&name) {
1794 None => {
1795 unique_names.insert(name, (position, expr));
1796 Ok(())
1797 },
1798 Some((existing_position, existing_expr)) => {
1799 plan_err!("{node_name} require unique expression names \
1800 but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
1801 at position {position} have the same name. Consider aliasing (\"AS\") one of them."
1802 )
1803 }
1804 }
1805 })
1806}
1807
1808pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1820 Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1821 Arc::new(left_plan),
1822 Arc::new(right_plan),
1823 ])?))
1824}
1825
1826pub fn union_by_name(
1829 left_plan: LogicalPlan,
1830 right_plan: LogicalPlan,
1831) -> Result<LogicalPlan> {
1832 Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
1833 Arc::new(left_plan),
1834 Arc::new(right_plan),
1835 ])?))
1836}
1837
1838pub fn project(
1844 plan: LogicalPlan,
1845 expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
1846) -> Result<LogicalPlan> {
1847 project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1848}
1849
1850fn project_with_validation(
1858 plan: LogicalPlan,
1859 expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1860) -> Result<LogicalPlan> {
1861 let mut projected_expr = vec![];
1862 for (e, validate) in expr {
1863 let e = e.into();
1864 match e {
1865 SelectExpr::Wildcard(opt) => {
1866 let expanded = expand_wildcard(plan.schema(), &plan, Some(&opt))?;
1867
1868 let expanded = if let Some(replace) = opt.replace {
1871 replace_columns(expanded, &replace)?
1872 } else {
1873 expanded
1874 };
1875
1876 for e in expanded {
1877 if validate {
1878 projected_expr
1879 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1880 } else {
1881 projected_expr.push(e)
1882 }
1883 }
1884 }
1885 SelectExpr::QualifiedWildcard(table_ref, opt) => {
1886 let expanded =
1887 expand_qualified_wildcard(&table_ref, plan.schema(), Some(&opt))?;
1888
1889 let expanded = if let Some(replace) = opt.replace {
1892 replace_columns(expanded, &replace)?
1893 } else {
1894 expanded
1895 };
1896
1897 for e in expanded {
1898 if validate {
1899 projected_expr
1900 .push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1901 } else {
1902 projected_expr.push(e)
1903 }
1904 }
1905 }
1906 SelectExpr::Expression(e) => {
1907 if validate {
1908 projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1909 } else {
1910 projected_expr.push(e)
1911 }
1912 }
1913 }
1914 }
1915 validate_unique_names("Projections", projected_expr.iter())?;
1916
1917 Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
1918}
1919
1920fn replace_columns(
1925 mut exprs: Vec<Expr>,
1926 replace: &PlannedReplaceSelectItem,
1927) -> Result<Vec<Expr>> {
1928 for expr in exprs.iter_mut() {
1929 if let Expr::Column(Column { name, .. }) = expr {
1930 if let Some((_, new_expr)) = replace
1931 .items()
1932 .iter()
1933 .zip(replace.expressions().iter())
1934 .find(|(item, _)| item.column_name.value == *name)
1935 {
1936 *expr = new_expr.clone().alias(name.clone())
1937 }
1938 }
1939 }
1940 Ok(exprs)
1941}
1942
1943pub fn subquery_alias(
1945 plan: LogicalPlan,
1946 alias: impl Into<TableReference>,
1947) -> Result<LogicalPlan> {
1948 SubqueryAlias::try_new(Arc::new(plan), alias).map(LogicalPlan::SubqueryAlias)
1949}
1950
1951pub fn table_scan(
1954 name: Option<impl Into<TableReference>>,
1955 table_schema: &Schema,
1956 projection: Option<Vec<usize>>,
1957) -> Result<LogicalPlanBuilder> {
1958 table_scan_with_filters(name, table_schema, projection, vec![])
1959}
1960
1961pub fn table_scan_with_filters(
1965 name: Option<impl Into<TableReference>>,
1966 table_schema: &Schema,
1967 projection: Option<Vec<usize>>,
1968 filters: Vec<Expr>,
1969) -> Result<LogicalPlanBuilder> {
1970 let table_source = table_source(table_schema);
1971 let name = name
1972 .map(|n| n.into())
1973 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1974 LogicalPlanBuilder::scan_with_filters(name, table_source, projection, filters)
1975}
1976
1977pub fn table_scan_with_filter_and_fetch(
1981 name: Option<impl Into<TableReference>>,
1982 table_schema: &Schema,
1983 projection: Option<Vec<usize>>,
1984 filters: Vec<Expr>,
1985 fetch: Option<usize>,
1986) -> Result<LogicalPlanBuilder> {
1987 let table_source = table_source(table_schema);
1988 let name = name
1989 .map(|n| n.into())
1990 .unwrap_or_else(|| TableReference::bare(UNNAMED_TABLE));
1991 LogicalPlanBuilder::scan_with_filters_fetch(
1992 name,
1993 table_source,
1994 projection,
1995 filters,
1996 fetch,
1997 )
1998}
1999
2000pub fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
2001 let table_schema = Arc::new(table_schema.clone());
2003 Arc::new(LogicalTableSource {
2004 table_schema,
2005 constraints: Default::default(),
2006 })
2007}
2008
2009pub fn table_source_with_constraints(
2010 table_schema: &Schema,
2011 constraints: Constraints,
2012) -> Arc<dyn TableSource> {
2013 let table_schema = Arc::new(table_schema.clone());
2015 Arc::new(LogicalTableSource {
2016 table_schema,
2017 constraints,
2018 })
2019}
2020
2021pub fn wrap_projection_for_join_if_necessary(
2023 join_keys: &[Expr],
2024 input: LogicalPlan,
2025) -> Result<(LogicalPlan, Vec<Column>, bool)> {
2026 let input_schema = input.schema();
2027 let alias_join_keys: Vec<Expr> = join_keys
2028 .iter()
2029 .map(|key| {
2030 if matches!(key, Expr::Cast(_)) || matches!(key, Expr::TryCast(_)) {
2039 let alias = format!("{key}");
2040 key.clone().alias(alias)
2041 } else {
2042 key.clone()
2043 }
2044 })
2045 .collect::<Vec<_>>();
2046
2047 let need_project = join_keys.iter().any(|key| !matches!(key, Expr::Column(_)));
2048 let plan = if need_project {
2049 let mut projection = input_schema
2051 .columns()
2052 .into_iter()
2053 .map(Expr::Column)
2054 .collect::<Vec<_>>();
2055 let join_key_items = alias_join_keys
2056 .iter()
2057 .flat_map(|expr| expr.try_as_col().is_none().then_some(expr))
2058 .cloned()
2059 .collect::<HashSet<Expr>>();
2060 projection.extend(join_key_items);
2061
2062 LogicalPlanBuilder::from(input)
2063 .project(projection.into_iter().map(SelectExpr::from))?
2064 .build()?
2065 } else {
2066 input
2067 };
2068
2069 let join_on = alias_join_keys
2070 .into_iter()
2071 .map(|key| {
2072 if let Some(col) = key.try_as_col() {
2073 Ok(col.clone())
2074 } else {
2075 let name = key.schema_name().to_string();
2076 Ok(Column::from_name(name))
2077 }
2078 })
2079 .collect::<Result<Vec<_>>>()?;
2080
2081 Ok((plan, join_on, need_project))
2082}
2083
/// A table source that carries only a schema and a set of constraints.
///
/// It is the source created by the `table_source*` helper functions in this
/// file.
pub struct LogicalTableSource {
    /// Schema reported by the source.
    table_schema: SchemaRef,
    /// Constraints reported by the source.
    constraints: Constraints,
}
2091
2092impl LogicalTableSource {
2093 pub fn new(table_schema: SchemaRef) -> Self {
2095 Self {
2096 table_schema,
2097 constraints: Constraints::default(),
2098 }
2099 }
2100
2101 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
2102 self.constraints = constraints;
2103 self
2104 }
2105}
2106
2107impl TableSource for LogicalTableSource {
2108 fn as_any(&self) -> &dyn Any {
2109 self
2110 }
2111
2112 fn schema(&self) -> SchemaRef {
2113 Arc::clone(&self.table_schema)
2114 }
2115
2116 fn constraints(&self) -> Option<&Constraints> {
2117 Some(&self.constraints)
2118 }
2119
2120 fn supports_filters_pushdown(
2121 &self,
2122 filters: &[&Expr],
2123 ) -> Result<Vec<TableProviderFilterPushDown>> {
2124 Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
2125 }
2126}
2127
2128pub fn unnest(input: LogicalPlan, columns: Vec<Column>) -> Result<LogicalPlan> {
2130 unnest_with_options(input, columns, UnnestOptions::default())
2131}
2132
2133pub fn get_struct_unnested_columns(
2134 col_name: &String,
2135 inner_fields: &Fields,
2136) -> Vec<Column> {
2137 inner_fields
2138 .iter()
2139 .map(|f| Column::from_name(format!("{}.{}", col_name, f.name())))
2140 .collect()
2141}
2142
2143pub fn unnest_with_options(
2173 input: LogicalPlan,
2174 columns_to_unnest: Vec<Column>,
2175 options: UnnestOptions,
2176) -> Result<LogicalPlan> {
2177 Ok(LogicalPlan::Unnest(Unnest::try_new(
2178 Arc::new(input),
2179 columns_to_unnest,
2180 options,
2181 )?))
2182}
2183
2184#[cfg(test)]
2185mod tests {
2186 use std::vec;
2187
2188 use super::*;
2189 use crate::lit_with_metadata;
2190 use crate::logical_plan::StringifiedPlan;
2191 use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
2192
2193 use crate::test::function_stub::sum;
2194 use datafusion_common::{
2195 Constraint, DataFusionError, RecursionUnnestOption, SchemaError,
2196 };
2197 use insta::assert_snapshot;
2198
2199 #[test]
2200 fn plan_builder_simple() -> Result<()> {
2201 let plan =
2202 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2203 .filter(col("state").eq(lit("CO")))?
2204 .project(vec![col("id")])?
2205 .build()?;
2206
2207 assert_snapshot!(plan, @r#"
2208 Projection: employee_csv.id
2209 Filter: employee_csv.state = Utf8("CO")
2210 TableScan: employee_csv projection=[id, state]
2211 "#);
2212
2213 Ok(())
2214 }
2215
2216 #[test]
2217 fn plan_builder_schema() {
2218 let schema = employee_schema();
2219 let projection = None;
2220 let plan =
2221 LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
2222 .unwrap();
2223 assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2224
2225 let projection = None;
2228 let plan =
2229 LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
2230 .unwrap();
2231 assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
2232 }
2233
2234 #[test]
2235 fn plan_builder_empty_name() {
2236 let schema = employee_schema();
2237 let projection = None;
2238 let err =
2239 LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
2240 assert_snapshot!(
2241 err.strip_backtrace(),
2242 @"Error during planning: table_name cannot be empty"
2243 );
2244 }
2245
2246 #[test]
2247 fn plan_builder_sort() -> Result<()> {
2248 let plan =
2249 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2250 .sort(vec![
2251 expr::Sort::new(col("state"), true, true),
2252 expr::Sort::new(col("salary"), false, false),
2253 ])?
2254 .build()?;
2255
2256 assert_snapshot!(plan, @r"
2257 Sort: employee_csv.state ASC NULLS FIRST, employee_csv.salary DESC NULLS LAST
2258 TableScan: employee_csv projection=[state, salary]
2259 ");
2260
2261 Ok(())
2262 }
2263
2264 #[test]
2265 fn plan_builder_union() -> Result<()> {
2266 let plan =
2267 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2268
2269 let plan = plan
2270 .clone()
2271 .union(plan.clone().build()?)?
2272 .union(plan.clone().build()?)?
2273 .union(plan.build()?)?
2274 .build()?;
2275
2276 assert_snapshot!(plan, @r"
2277 Union
2278 Union
2279 Union
2280 TableScan: employee_csv projection=[state, salary]
2281 TableScan: employee_csv projection=[state, salary]
2282 TableScan: employee_csv projection=[state, salary]
2283 TableScan: employee_csv projection=[state, salary]
2284 ");
2285
2286 Ok(())
2287 }
2288
2289 #[test]
2290 fn plan_builder_union_distinct() -> Result<()> {
2291 let plan =
2292 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?;
2293
2294 let plan = plan
2295 .clone()
2296 .union_distinct(plan.clone().build()?)?
2297 .union_distinct(plan.clone().build()?)?
2298 .union_distinct(plan.build()?)?
2299 .build()?;
2300
2301 assert_snapshot!(plan, @r"
2302 Distinct:
2303 Union
2304 Distinct:
2305 Union
2306 Distinct:
2307 Union
2308 TableScan: employee_csv projection=[state, salary]
2309 TableScan: employee_csv projection=[state, salary]
2310 TableScan: employee_csv projection=[state, salary]
2311 TableScan: employee_csv projection=[state, salary]
2312 ");
2313
2314 Ok(())
2315 }
2316
2317 #[test]
2318 fn plan_builder_simple_distinct() -> Result<()> {
2319 let plan =
2320 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
2321 .filter(col("state").eq(lit("CO")))?
2322 .project(vec![col("id")])?
2323 .distinct()?
2324 .build()?;
2325
2326 assert_snapshot!(plan, @r#"
2327 Distinct:
2328 Projection: employee_csv.id
2329 Filter: employee_csv.state = Utf8("CO")
2330 TableScan: employee_csv projection=[id, state]
2331 "#);
2332
2333 Ok(())
2334 }
2335
2336 #[test]
2337 fn exists_subquery() -> Result<()> {
2338 let foo = test_table_scan_with_name("foo")?;
2339 let bar = test_table_scan_with_name("bar")?;
2340
2341 let subquery = LogicalPlanBuilder::from(foo)
2342 .project(vec![col("a")])?
2343 .filter(col("a").eq(col("bar.a")))?
2344 .build()?;
2345
2346 let outer_query = LogicalPlanBuilder::from(bar)
2347 .project(vec![col("a")])?
2348 .filter(exists(Arc::new(subquery)))?
2349 .build()?;
2350
2351 assert_snapshot!(outer_query, @r"
2352 Filter: EXISTS (<subquery>)
2353 Subquery:
2354 Filter: foo.a = bar.a
2355 Projection: foo.a
2356 TableScan: foo
2357 Projection: bar.a
2358 TableScan: bar
2359 ");
2360
2361 Ok(())
2362 }
2363
2364 #[test]
2365 fn filter_in_subquery() -> Result<()> {
2366 let foo = test_table_scan_with_name("foo")?;
2367 let bar = test_table_scan_with_name("bar")?;
2368
2369 let subquery = LogicalPlanBuilder::from(foo)
2370 .project(vec![col("a")])?
2371 .filter(col("a").eq(col("bar.a")))?
2372 .build()?;
2373
2374 let outer_query = LogicalPlanBuilder::from(bar)
2376 .project(vec![col("a")])?
2377 .filter(in_subquery(col("a"), Arc::new(subquery)))?
2378 .build()?;
2379
2380 assert_snapshot!(outer_query, @r"
2381 Filter: bar.a IN (<subquery>)
2382 Subquery:
2383 Filter: foo.a = bar.a
2384 Projection: foo.a
2385 TableScan: foo
2386 Projection: bar.a
2387 TableScan: bar
2388 ");
2389
2390 Ok(())
2391 }
2392
2393 #[test]
2394 fn select_scalar_subquery() -> Result<()> {
2395 let foo = test_table_scan_with_name("foo")?;
2396 let bar = test_table_scan_with_name("bar")?;
2397
2398 let subquery = LogicalPlanBuilder::from(foo)
2399 .project(vec![col("b")])?
2400 .filter(col("a").eq(col("bar.a")))?
2401 .build()?;
2402
2403 let outer_query = LogicalPlanBuilder::from(bar)
2405 .project(vec![scalar_subquery(Arc::new(subquery))])?
2406 .build()?;
2407
2408 assert_snapshot!(outer_query, @r"
2409 Projection: (<subquery>)
2410 Subquery:
2411 Filter: foo.a = bar.a
2412 Projection: foo.b
2413 TableScan: foo
2414 TableScan: bar
2415 ");
2416
2417 Ok(())
2418 }
2419
2420 #[test]
2421 fn projection_non_unique_names() -> Result<()> {
2422 let plan = table_scan(
2423 Some("employee_csv"),
2424 &employee_schema(),
2425 Some(vec![0, 1]),
2427 )?
2428 .project(vec![col("id"), col("first_name").alias("id")]);
2430
2431 match plan {
2432 Err(DataFusionError::SchemaError(err, _)) => {
2433 if let SchemaError::AmbiguousReference { field } = *err {
2434 let Column {
2435 relation,
2436 name,
2437 spans: _,
2438 } = *field;
2439 let Some(TableReference::Bare { table }) = relation else {
2440 return plan_err!(
2441 "wrong relation: {relation:?}, expected table name"
2442 );
2443 };
2444 assert_eq!(*"employee_csv", *table);
2445 assert_eq!("id", &name);
2446 Ok(())
2447 } else {
2448 plan_err!("Plan should have returned an DataFusionError::SchemaError")
2449 }
2450 }
2451 _ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2452 }
2453 }
2454
2455 fn employee_schema() -> Schema {
2456 Schema::new(vec![
2457 Field::new("id", DataType::Int32, false),
2458 Field::new("first_name", DataType::Utf8, false),
2459 Field::new("last_name", DataType::Utf8, false),
2460 Field::new("state", DataType::Utf8, false),
2461 Field::new("salary", DataType::Int32, false),
2462 ])
2463 }
2464
2465 #[test]
2466 fn stringified_plan() {
2467 let stringified_plan =
2468 StringifiedPlan::new(PlanType::InitialLogicalPlan, "...the plan...");
2469 assert!(stringified_plan.should_display(true));
2470 assert!(!stringified_plan.should_display(false)); let stringified_plan =
2473 StringifiedPlan::new(PlanType::FinalLogicalPlan, "...the plan...");
2474 assert!(stringified_plan.should_display(true));
2475 assert!(stringified_plan.should_display(false)); let stringified_plan =
2478 StringifiedPlan::new(PlanType::InitialPhysicalPlan, "...the plan...");
2479 assert!(stringified_plan.should_display(true));
2480 assert!(!stringified_plan.should_display(false)); let stringified_plan =
2483 StringifiedPlan::new(PlanType::FinalPhysicalPlan, "...the plan...");
2484 assert!(stringified_plan.should_display(true));
2485 assert!(stringified_plan.should_display(false)); let stringified_plan = StringifiedPlan::new(
2488 PlanType::OptimizedLogicalPlan {
2489 optimizer_name: "random opt pass".into(),
2490 },
2491 "...the plan...",
2492 );
2493 assert!(stringified_plan.should_display(true));
2494 assert!(!stringified_plan.should_display(false));
2495 }
2496
2497 fn test_table_scan_with_name(name: &str) -> Result<LogicalPlan> {
2498 let schema = Schema::new(vec![
2499 Field::new("a", DataType::UInt32, false),
2500 Field::new("b", DataType::UInt32, false),
2501 Field::new("c", DataType::UInt32, false),
2502 ]);
2503 table_scan(Some(name), &schema, None)?.build()
2504 }
2505
2506 #[test]
2507 fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
2508 let plan1 =
2509 table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
2510 let plan2 =
2511 table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;
2512
2513 let err_msg1 =
2514 LogicalPlanBuilder::intersect(plan1.build()?, plan2.build()?, true)
2515 .unwrap_err();
2516
2517 assert_snapshot!(err_msg1.strip_backtrace(), @"Error during planning: INTERSECT/EXCEPT query must have the same number of columns. Left is 1 and right is 2.");
2518
2519 Ok(())
2520 }
2521
2522 #[test]
2523 fn plan_builder_unnest() -> Result<()> {
2524 let err = nested_table_scan("test_table")?
2526 .unnest_column("scalar")
2527 .unwrap_err();
2528
2529 let DataFusionError::Internal(desc) = err else {
2530 return plan_err!("Plan should have returned an DataFusionError::Internal");
2531 };
2532
2533 let desc = (*desc
2534 .split(DataFusionError::BACK_TRACE_SEP)
2535 .collect::<Vec<&str>>()
2536 .first()
2537 .unwrap_or(&""))
2538 .to_string();
2539
2540 assert_snapshot!(desc, @"trying to unnest on invalid data type UInt32");
2541
2542 let plan = nested_table_scan("test_table")?
2544 .unnest_column("strings")?
2545 .build()?;
2546
2547 assert_snapshot!(plan, @r"
2548 Unnest: lists[test_table.strings|depth=1] structs[]
2549 TableScan: test_table
2550 ");
2551
2552 let field = plan.schema().field_with_name(None, "strings").unwrap();
2554 assert_eq!(&DataType::Utf8, field.data_type());
2555
2556 let plan = nested_table_scan("test_table")?
2558 .unnest_column("struct_singular")?
2559 .build()?;
2560
2561 assert_snapshot!(plan, @r"
2562 Unnest: lists[] structs[test_table.struct_singular]
2563 TableScan: test_table
2564 ");
2565
2566 for field_name in &["a", "b"] {
2567 let field = plan
2569 .schema()
2570 .field_with_name(None, &format!("struct_singular.{field_name}"))
2571 .unwrap();
2572 assert_eq!(&DataType::UInt32, field.data_type());
2573 }
2574
2575 let plan = nested_table_scan("test_table")?
2577 .unnest_column("strings")?
2578 .unnest_column("structs")?
2579 .unnest_column("struct_singular")?
2580 .build()?;
2581
2582 assert_snapshot!(plan, @r"
2583 Unnest: lists[] structs[test_table.struct_singular]
2584 Unnest: lists[test_table.structs|depth=1] structs[]
2585 Unnest: lists[test_table.strings|depth=1] structs[]
2586 TableScan: test_table
2587 ");
2588
2589 let field = plan.schema().field_with_name(None, "structs").unwrap();
2591 assert!(matches!(field.data_type(), DataType::Struct(_)));
2592
2593 let cols = vec!["strings", "structs", "struct_singular"]
2595 .into_iter()
2596 .map(|c| c.into())
2597 .collect();
2598
2599 let plan = nested_table_scan("test_table")?
2600 .unnest_columns_with_options(cols, UnnestOptions::default())?
2601 .build()?;
2602
2603 assert_snapshot!(plan, @r"
2604 Unnest: lists[test_table.strings|depth=1, test_table.structs|depth=1] structs[test_table.struct_singular]
2605 TableScan: test_table
2606 ");
2607
2608 let plan = nested_table_scan("test_table")?.unnest_column("missing");
2610 assert!(plan.is_err());
2611
2612 let plan = nested_table_scan("test_table")?
2614 .unnest_columns_with_options(
2615 vec!["stringss".into(), "struct_singular".into()],
2616 UnnestOptions::default()
2617 .with_recursions(RecursionUnnestOption {
2618 input_column: "stringss".into(),
2619 output_column: "stringss_depth_1".into(),
2620 depth: 1,
2621 })
2622 .with_recursions(RecursionUnnestOption {
2623 input_column: "stringss".into(),
2624 output_column: "stringss_depth_2".into(),
2625 depth: 2,
2626 }),
2627 )?
2628 .build()?;
2629
2630 assert_snapshot!(plan, @r"
2631 Unnest: lists[test_table.stringss|depth=1, test_table.stringss|depth=2] structs[test_table.struct_singular]
2632 TableScan: test_table
2633 ");
2634
2635 let field = plan
2637 .schema()
2638 .field_with_name(None, "stringss_depth_1")
2639 .unwrap();
2640 assert_eq!(
2641 &DataType::new_list(DataType::Utf8, false),
2642 field.data_type()
2643 );
2644 let field = plan
2645 .schema()
2646 .field_with_name(None, "stringss_depth_2")
2647 .unwrap();
2648 assert_eq!(&DataType::Utf8, field.data_type());
2649 for field_name in &["a", "b"] {
2651 let field = plan
2652 .schema()
2653 .field_with_name(None, &format!("struct_singular.{field_name}"))
2654 .unwrap();
2655 assert_eq!(&DataType::UInt32, field.data_type());
2656 }
2657
2658 Ok(())
2659 }
2660
2661 fn nested_table_scan(table_name: &str) -> Result<LogicalPlanBuilder> {
2662 let struct_field_in_list = Field::new_struct(
2665 "item",
2666 vec![
2667 Field::new("a", DataType::UInt32, false),
2668 Field::new("b", DataType::UInt32, false),
2669 ],
2670 false,
2671 );
2672 let string_field = Field::new_list_field(DataType::Utf8, false);
2673 let strings_field = Field::new_list("item", string_field.clone(), false);
2674 let schema = Schema::new(vec![
2675 Field::new("scalar", DataType::UInt32, false),
2676 Field::new_list("strings", string_field, false),
2677 Field::new_list("structs", struct_field_in_list, false),
2678 Field::new(
2679 "struct_singular",
2680 DataType::Struct(Fields::from(vec![
2681 Field::new("a", DataType::UInt32, false),
2682 Field::new("b", DataType::UInt32, false),
2683 ])),
2684 false,
2685 ),
2686 Field::new_list("stringss", strings_field, false),
2687 ]);
2688
2689 table_scan(Some(table_name), &schema, None)
2690 }
2691
2692 #[test]
2693 fn test_union_after_join() -> Result<()> {
2694 let values = vec![vec![lit(1)]];
2695
2696 let left = LogicalPlanBuilder::values(values.clone())?
2697 .alias("left")?
2698 .build()?;
2699 let right = LogicalPlanBuilder::values(values)?
2700 .alias("right")?
2701 .build()?;
2702
2703 let join = LogicalPlanBuilder::from(left).cross_join(right)?.build()?;
2704
2705 let plan = LogicalPlanBuilder::from(join.clone())
2706 .union(join)?
2707 .build()?;
2708
2709 assert_snapshot!(plan, @r"
2710 Union
2711 Cross Join:
2712 SubqueryAlias: left
2713 Values: (Int32(1))
2714 SubqueryAlias: right
2715 Values: (Int32(1))
2716 Cross Join:
2717 SubqueryAlias: left
2718 Values: (Int32(1))
2719 SubqueryAlias: right
2720 Values: (Int32(1))
2721 ");
2722
2723 Ok(())
2724 }
2725
2726 #[test]
2727 fn plan_builder_from_logical_plan() -> Result<()> {
2728 let plan =
2729 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3, 4]))?
2730 .sort(vec![
2731 expr::Sort::new(col("state"), true, true),
2732 expr::Sort::new(col("salary"), false, false),
2733 ])?
2734 .build()?;
2735
2736 let plan_expected = format!("{plan}");
2737 let plan_builder: LogicalPlanBuilder = Arc::new(plan).into();
2738 assert_eq!(plan_expected, format!("{}", plan_builder.plan));
2739
2740 Ok(())
2741 }
2742
2743 #[test]
2744 fn plan_builder_aggregate_without_implicit_group_by_exprs() -> Result<()> {
2745 let constraints =
2746 Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2747 let table_source = table_source_with_constraints(&employee_schema(), constraints);
2748
2749 let plan =
2750 LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2751 .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2752 .build()?;
2753
2754 assert_snapshot!(plan, @r"
2755 Aggregate: groupBy=[[employee_csv.id]], aggr=[[sum(employee_csv.salary)]]
2756 TableScan: employee_csv projection=[id, state, salary]
2757 ");
2758
2759 Ok(())
2760 }
2761
2762 #[test]
2763 fn plan_builder_aggregate_with_implicit_group_by_exprs() -> Result<()> {
2764 let constraints =
2765 Constraints::new_unverified(vec![Constraint::PrimaryKey(vec![0])]);
2766 let table_source = table_source_with_constraints(&employee_schema(), constraints);
2767
2768 let options =
2769 LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
2770 let plan =
2771 LogicalPlanBuilder::scan("employee_csv", table_source, Some(vec![0, 3, 4]))?
2772 .with_options(options)
2773 .aggregate(vec![col("id")], vec![sum(col("salary"))])?
2774 .build()?;
2775
2776 assert_snapshot!(plan, @r"
2777 Aggregate: groupBy=[[employee_csv.id, employee_csv.state, employee_csv.salary]], aggr=[[sum(employee_csv.salary)]]
2778 TableScan: employee_csv projection=[id, state, salary]
2779 ");
2780
2781 Ok(())
2782 }
2783
/// `build_join_schema` is expected to take its schema-level metadata from the
/// left input for a left join and from the right input for a right join.
#[test]
fn test_join_metadata() -> Result<()> {
    // Builds a schema with one unqualified, non-nullable Int32 column whose
    // schema metadata is the single entry `key -> side`.
    let schema_tagged = |column: &str, side: &str| {
        DFSchema::new_with_metadata(
            vec![(None, Arc::new(Field::new(column, DataType::Int32, false)))],
            HashMap::from([("key".to_string(), side.to_string())]),
        )
    };
    let left_schema = schema_tagged("a", "left")?;
    let right_schema = schema_tagged("b", "right")?;

    // Each join type paired with the side whose metadata must survive.
    for (join_type, winning_side) in
        [(JoinType::Left, "left"), (JoinType::Right, "right")]
    {
        let join_schema = build_join_schema(&left_schema, &right_schema, &join_type)?;
        assert_eq!(
            join_schema.metadata(),
            &HashMap::from([("key".to_string(), winning_side.to_string())])
        );
    }

    Ok(())
}
2810
/// A VALUES plan whose literals all carry the same field metadata exposes that
/// metadata on its output field; rows carrying different metadata are rejected.
#[test]
fn test_values_metadata() -> Result<()> {
    // Field metadata holding a single `ARROW:extension:metadata` entry.
    let tagged = |value: &str| {
        FieldMetadata::from(HashMap::from([(
            "ARROW:extension:metadata".to_string(),
            value.to_string(),
        )]))
    };

    // Both rows share the same metadata: the plan builds and keeps it.
    let metadata = tagged("test");
    let uniform_rows = vec![
        vec![lit_with_metadata(1, Some(metadata.clone()))],
        vec![lit_with_metadata(2, Some(metadata.clone()))],
    ];
    let values = LogicalPlanBuilder::values(uniform_rows)?.build()?;
    assert_eq!(*values.schema().field(0).metadata(), metadata.to_hashmap());

    // The second row carries different metadata: building must fail.
    let metadata2 = tagged("test2");
    let mixed_rows = vec![
        vec![lit_with_metadata(1, Some(metadata))],
        vec![lit_with_metadata(2, Some(metadata2))],
    ];
    assert!(LogicalPlanBuilder::values(mixed_rows).is_err());

    Ok(())
}
2839
/// Checks the aliases `unique_field_aliases` proposes for a field list that
/// contains repeated names, including a field literally named `a:1` that
/// clashes with an alias generated for an earlier duplicate of `a`.
#[test]
fn test_unique_field_aliases() {
    // Field names in input order; every field is a non-nullable Int32.
    let fields: Vec<Field> = ["a", "a", "b", "b", "a", "a:1"]
        .into_iter()
        .map(|name| Field::new(name, DataType::Int32, false))
        .collect();
    let fields = Fields::from(fields);

    let remove_redundant = unique_field_aliases(&fields);

    // `None` means the field keeps its name; `Some(alias)` is the replacement.
    let expected: Vec<Option<String>> =
        [None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]
            .into_iter()
            .map(|alias| alias.map(String::from))
            .collect();

    assert_eq!(remove_redundant, expected);
}
2874}