use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::sync::{Arc, LazyLock};

use super::dml::CopyTo;
use super::invariants::{
    assert_always_invariants_at_current_node, assert_executable_invariants,
    InvariantLevel,
};
use super::DdlStatement;
use crate::builder::{change_redundant_column, unnest_with_options};
use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams};
use crate::expr_rewriter::{
    create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
};
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::logical_plan::{DmlStatement, Statement};
use crate::utils::{
    enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
    grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
};
use crate::{
    build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
    CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
    Operator, Prepare, TableProviderFilterPushDown, TableSource,
    WindowFunctionDefinition,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::cse::{NormalizeEq, Normalizeable};
use datafusion_common::tree_node::{
    Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
};
use datafusion_common::{
    aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
    DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
    FunctionalDependencies, NullEquality, ParamValues, Result, ScalarValue, Spans,
    TableReference, UnnestOptions,
};
use indexmap::IndexSet;

use crate::display::PgJsonVisitor;

pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    Projection(Projection),
    Filter(Filter),
    Window(Window),
    Aggregate(Aggregate),
    Sort(Sort),
    Join(Join),
    Repartition(Repartition),
    Union(Union),
    TableScan(TableScan),
    EmptyRelation(EmptyRelation),
    Subquery(Subquery),
    SubqueryAlias(SubqueryAlias),
    Limit(Limit),
    Statement(Statement),
    Values(Values),
    Explain(Explain),
    Analyze(Analyze),
    Extension(Extension),
    Distinct(Distinct),
    Dml(DmlStatement),
    Ddl(DdlStatement),
    Copy(CopyTo),
    DescribeTable(DescribeTable),
    Unnest(Unnest),
    RecursiveQuery(RecursiveQuery),
}

impl Default for LogicalPlan {
    fn default() -> Self {
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        })
    }
}

impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        f(self)
    }

    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        f(self)
    }
}

impl LogicalPlan {
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.schema()
            }
        }
    }

    pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
        match self {
            LogicalPlan::Window(_)
            | LogicalPlan::Projection(_)
            | LogicalPlan::Aggregate(_)
            | LogicalPlan::Unnest(_)
            | LogicalPlan::Join(_) => self
                .inputs()
                .iter()
                .map(|input| input.schema().as_ref())
                .collect(),
            _ => vec![],
        }
    }

    pub fn explain_schema() -> SchemaRef {
        SchemaRef::new(Schema::new(vec![
            Field::new("plan_type", DataType::Utf8, false),
            Field::new("plan", DataType::Utf8, false),
        ]))
    }

    pub fn describe_schema() -> Schema {
        Schema::new(vec![
            Field::new("column_name", DataType::Utf8, false),
            Field::new("data_type", DataType::Utf8, false),
            Field::new("is_nullable", DataType::Utf8, false),
        ])
    }

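    /// Returns all expressions (non-recursively) evaluated by this plan node;
    /// expressions belonging to child nodes are not included.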
    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        self.apply_expressions(|e| {
            exprs.push(e.clone());
            Ok(TreeNodeRecursion::Continue)
        })
        // The closure above always returns Ok, so this cannot fail.
        .unwrap();
        exprs
    }

    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        self.apply_expressions(|e| {
            find_out_reference_exprs(e).into_iter().for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        self.inputs()
            .into_iter()
            .flat_map(|child| child.all_out_ref_exprs())
            .for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
        exprs
    }

    pub fn inputs(&self) -> Vec<&LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
            LogicalPlan::Window(Window { input, .. }) => vec![input],
            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
            LogicalPlan::Extension(extension) => extension.node.inputs(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                inputs.iter().map(|arc| arc.as_ref()).collect()
            }
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => vec![input],
            LogicalPlan::Explain(explain) => vec![&explain.plan],
            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
            LogicalPlan::Dml(write) => vec![&write.input],
            LogicalPlan::Copy(copy) => vec![&copy.input],
            LogicalPlan::Ddl(ddl) => ddl.inputs(),
            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                static_term,
                recursive_term,
                ..
            }) => vec![static_term, recursive_term],
            LogicalPlan::Statement(stmt) => stmt.inputs(),
            LogicalPlan::TableScan { .. }
            | LogicalPlan::EmptyRelation { .. }
            | LogicalPlan::Values { .. }
            | LogicalPlan::DescribeTable(_) => vec![],
        }
    }

    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
        let mut using_columns: Vec<HashSet<Column>> = vec![];

        self.apply_with_subqueries(|plan| {
            if let LogicalPlan::Join(Join {
                join_constraint: JoinConstraint::Using,
                on,
                ..
            }) = plan
            {
                let columns =
                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
                        let Some(l) = l.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {l:?}"
                            );
                        };
                        let Some(r) = r.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {r:?}"
                            );
                        };
                        accumu.insert(l.to_owned());
                        accumu.insert(r.to_owned());
                        Result::<_, DataFusionError>::Ok(accumu)
                    })?;
                using_columns.push(columns);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;

        Ok(using_columns)
    }

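    /// Returns the first output expression of this plan node, or `None` for
    /// nodes (such as subqueries, DDL, and DML) without a usable head
    /// expression.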
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }

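    /// Recomputes this node's output schema from its current expressions and
    /// inputs, so the plan stays consistent after its children have been
    /// rewritten.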
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            LogicalPlan::Values(Values { schema, values }) => {
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter { predicate, input }) => {
                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equality,
            }) => {
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equality,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            LogicalPlan::Extension(Extension { node }) => {
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                let first_input_schema = inputs[0].schema();
                if schema.fields().len() == first_input_schema.fields().len() {
                    // Schema width still matches the inputs; keep the
                    // existing schema unchanged.
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }

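    /// Returns a new `LogicalPlan` of the same kind, rebuilt from `expr` and
    /// `inputs` (expected in the same order as returned by
    /// [`Self::expressions`] and [`Self::inputs`]).
    ///
    /// A minimal sketch of the usual round-trip pattern (an identity rewrite;
    /// `plan` is assumed to be any `LogicalPlan`):
    /// ```ignore
    /// let exprs = plan.expressions();
    /// let inputs: Vec<_> = plan.inputs().into_iter().cloned().collect();
    /// let rebuilt = plan.with_new_exprs(exprs, inputs)?;
    /// ```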
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // `expr` holds the group expressions followed by the
                // aggregate expressions.
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema =
                    build_join_schema(left.schema(), right.schema(), join_type)?;

                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                // Anything beyond the equi-join key pairs is the optional
                // join filter.
                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!(
                            "Expected a pair of expressions to construct the join on expression"
                        )?
                    };

                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                if old_expr_len != expr.len() {
                    return internal_err!(
                        "Invalid number of new Limit expressions: expected {}, got {}",
                        old_expr_len,
                        expr.len()
                    );
                }
                // `fetch` comes last in `expr`, so pop it before `skip`.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // If the inputs were not pruned, keep the existing schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        let sort_expr =
                            expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(
                            sort_expr.is_empty(),
                            "with_new_exprs for Distinct does not support sort expressions"
                        );
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None,
                            Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name,
                data_types,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    data_types: data_types.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }

    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        match check {
            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
            InvariantLevel::Executable => assert_executable_invariants(self),
        }
    }

    #[inline]
    #[allow(clippy::needless_pass_by_value)]
    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
        if !expr.is_empty() {
            return internal_err!("{self:?} should have no exprs, got {:?}", expr);
        }
        Ok(())
    }

    #[inline]
    #[allow(clippy::needless_pass_by_value)]
    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
        if !inputs.is_empty() {
            return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
        }
        Ok(())
    }

    #[inline]
    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
        if expr.len() != 1 {
            return internal_err!(
                "{self:?} should have exactly one expr, got {:?}",
                expr
            );
        }
        Ok(expr.remove(0))
    }

    #[inline]
    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
        if inputs.len() != 1 {
            return internal_err!(
                "{self:?} should have exactly one input, got {:?}",
                inputs
            );
        }
        Ok(inputs.remove(0))
    }

    #[inline]
    fn only_two_inputs(
        &self,
        mut inputs: Vec<LogicalPlan>,
    ) -> Result<(LogicalPlan, LogicalPlan)> {
        if inputs.len() != 2 {
            return internal_err!(
                "{self:?} should have exactly two inputs, got {:?}",
                inputs
            );
        }
        let right = inputs.remove(1);
        let left = inputs.remove(0);
        Ok((left, right))
    }

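    /// Replaces placeholder parameters (such as `$1`) with the given values
    /// and, for `Prepare` statements, verifies the parameter types and
    /// returns the inner plan.
    ///
    /// A minimal sketch (assumes the plan contains a single `$1`
    /// placeholder):
    /// ```ignore
    /// use datafusion_common::ScalarValue;
    ///
    /// let bound = plan.with_param_values(vec![ScalarValue::Int32(Some(42))])?;
    /// ```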
    pub fn with_param_values(
        self,
        param_values: impl Into<ParamValues>,
    ) -> Result<LogicalPlan> {
        let param_values = param_values.into();
        let plan_with_values = self.replace_params_with_values(&param_values)?;

        Ok(
            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
                plan_with_values
            {
                param_values.verify(&prepare_lp.data_types)?;
                Arc::unwrap_or_clone(prepare_lp.input)
            } else {
                plan_with_values
            },
        )
    }

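    /// Returns an upper bound on the number of rows this plan can emit, or
    /// `None` when no bound can be inferred (for example, a table scan
    /// without a `fetch` limit).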
    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
        match self {
            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
            LogicalPlan::Filter(filter) => {
                if filter.is_scalar() {
                    Some(1)
                } else {
                    filter.input.max_rows()
                }
            }
            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
            LogicalPlan::Aggregate(Aggregate {
                input, group_expr, ..
            }) => {
                // A GROUP BY with only literal keys produces at most one group.
                if group_expr
                    .iter()
                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
                {
                    Some(1)
                } else {
                    input.max_rows()
                }
            }
            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
                match (fetch, input.max_rows()) {
                    (Some(fetch_limit), Some(input_max)) => {
                        Some(input_max.min(*fetch_limit))
                    }
                    (Some(fetch_limit), None) => Some(*fetch_limit),
                    (None, Some(input_max)) => Some(input_max),
                    (None, None) => None,
                }
            }
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
                JoinType::Left | JoinType::Right | JoinType::Full => {
                    match (left.max_rows()?, right.max_rows()?, join_type) {
                        (0, 0, _) => Some(0),
                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
                        (left_max, right_max, _) => Some(left_max * right_max),
                    }
                }
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.max_rows()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.max_rows()
                }
            },
            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                inputs.iter().try_fold(0usize, |mut acc, plan| {
                    acc += plan.max_rows()?;
                    Some(acc)
                })
            }
            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
            LogicalPlan::EmptyRelation(_) => Some(0),
            LogicalPlan::RecursiveQuery(_) => None,
            LogicalPlan::Subquery(_) => None,
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
                Ok(FetchType::Literal(s)) => s,
                _ => None,
            },
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => input.max_rows(),
            LogicalPlan::Values(v) => Some(v.values.len()),
            LogicalPlan::Unnest(_) => None,
            LogicalPlan::Ddl(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Extension(_) => None,
        }
    }

    pub fn contains_outer_reference(&self) -> bool {
        let mut contains = false;
        self.apply_expressions(|expr| {
            Ok(if expr.contains_outer() {
                contains = true;
                TreeNodeRecursion::Stop
            } else {
                TreeNodeRecursion::Continue
            })
        })
        .unwrap();
        contains
    }

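    /// Returns this node's output expressions paired with the output
    /// [`Column`] each is addressable as; only `Aggregate` and `Window`
    /// nodes currently yield such pairs.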
    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
        match self {
            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
                .output_expressions()?
                .into_iter()
                .zip(self.schema().columns())
                .collect()),
            LogicalPlan::Window(Window {
                window_expr,
                input,
                schema,
            }) => {
                let mut output_exprs = input.columnized_output_exprs()?;
                let input_len = input.schema().fields().len();
                output_exprs.extend(
                    window_expr
                        .iter()
                        .zip(schema.columns().into_iter().skip(input_len)),
                );
                Ok(output_exprs)
            }
            _ => Ok(vec![]),
        }
    }
}

impl LogicalPlan {
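    /// Walks the plan (including subqueries) and replaces
    /// [`Expr::Placeholder`] expressions with literal values taken from
    /// `param_values`, inferring placeholder types from the schema first.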
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    Ok(Transformed::no(e))
                } else {
                    let original_name = name_preserver.save(&e);
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let value = param_values.get_placeholders_with_values(&id)?;
                            Ok(Transformed::yes(Expr::Literal(value, None)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    // Preserve the original expression name after rewriting.
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })
        })
        .map(|res| res.data)
    }

    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
        let mut param_names = HashSet::new();
        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
                        param_names.insert(id.clone());
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_names)
    }

    pub fn get_parameter_types(
        &self,
    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
        let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();

        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
                        let prev = param_types.get(id);
                        match (prev, data_type) {
                            (Some(Some(prev)), Some(dt)) => {
                                if prev != dt {
                                    plan_err!("Conflicting types for {id}")?;
                                }
                            }
                            (_, Some(dt)) => {
                                param_types.insert(id.clone(), Some(dt.clone()));
                            }
                            _ => {
                                param_types.insert(id.clone(), None);
                            }
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_types)
    }

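    /// Formats the plan as an indented tree, one line per node, with children
    /// indented below their parent.
    ///
    /// Illustrative output shape (assuming a projection over a filtered
    /// scan):
    /// ```text
    /// Projection: employee.id
    ///   Filter: employee.state = Utf8("CO")
    ///     TableScan: employee
    /// ```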
    pub fn display_indent(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let with_schema = false;
                let mut visitor = IndentVisitor::new(f, with_schema);
                match self.0.visit_with_subqueries(&mut visitor) {
                    Ok(_) => Ok(()),
                    Err(_) => Err(fmt::Error),
                }
            }
        }
        Wrapper(self)
    }

    pub fn display_indent_schema(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let with_schema = true;
                let mut visitor = IndentVisitor::new(f, with_schema);
                match self.0.visit_with_subqueries(&mut visitor) {
                    Ok(_) => Ok(()),
                    Err(_) => Err(fmt::Error),
                }
            }
        }
        Wrapper(self)
    }

    pub fn display_pg_json(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let mut visitor = PgJsonVisitor::new(f);
                visitor.with_schema(true);
                match self.0.visit_with_subqueries(&mut visitor) {
                    Ok(_) => Ok(()),
                    Err(_) => Err(fmt::Error),
                }
            }
        }
        Wrapper(self)
    }

    pub fn display_graphviz(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let mut visitor = GraphvizVisitor::new(f);

                visitor.start_graph()?;

                visitor.pre_visit_plan("LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                visitor.set_with_schema(true);
                visitor.pre_visit_plan("Detailed LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                visitor.end_graph()?;
                Ok(())
            }
        }
        Wrapper(self)
    }

    pub fn display(&self) -> impl Display + '_ {
        // Wrapper so that a single plan node can be formatted with `Display`
        // without recursing into its children.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
                    }
                    LogicalPlan::Values(Values { ref values, .. }) => {
                        let str_values: Vec<_> = values
                            .iter()
                            // Show at most five rows to keep the output readable.
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        let ellipsis = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), ellipsis)
                    }

                    LogicalPlan::TableScan(TableScan {
                        ref source,
                        ref table_name,
                        ref projection,
                        ref filters,
                        ref fetch,
                        ..
                    }) => {
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
                        write!(f, "Projection:")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ",")?;
                            }
                            write!(f, " {expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(
                            f,
                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
                            file_type.get_ext()
                        )
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: ref expr,
                        ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window {
                        ref window_expr, ..
                    }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        ref group_expr,
                        ref aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: ref keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        let join_type = if filter.is_none()
                            && keys.is_empty()
                            && matches!(join_type, JoinType::Inner)
                        {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(
                                    f,
                                    "{} Join: {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr
                                )
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit
                                .skip
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit
                                .fetch
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}")
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            if let Some(sort_expr) = sort_expr {
                                expr_vec_fmt!(sort_expr)
                            } else {
                                "".to_string()
                            },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices,
                        ..
                    }) => {
                        let input_columns = plan.schema().columns();
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i, unnest_info)| {
                                format!(
                                    "{}|depth={}",
                                    &input_columns[*i].to_string(),
                                    unnest_info.depth
                                )
                            })
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        write!(
                            f,
                            "Unnest: lists[{}] structs[{}]",
                            expr_vec_fmt!(list_type_columns),
                            expr_vec_fmt!(struct_type_columns)
                        )
                    }
                }
            }
        }
        Wrapper(self)
    }
}

impl Display for LogicalPlan {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        self.display_indent().fmt(f)
    }
}

impl ToStringifiedPlan for LogicalPlan {
    fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
        StringifiedPlan::new(plan_type, self.display_indent().to_string())
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    pub produce_one_row: bool,
    pub schema: DFSchemaRef,
}

impl PartialOrd for EmptyRelation {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.produce_one_row.partial_cmp(&other.produce_one_row)
    }
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    pub name: String,
    pub static_term: Arc<LogicalPlan>,
    pub recursive_term: Arc<LogicalPlan>,
    pub is_distinct: bool,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    pub schema: DFSchemaRef,
    pub values: Vec<Vec<Expr>>,
}

impl PartialOrd for Values {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.values.partial_cmp(&other.values)
    }
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
#[non_exhaustive]
pub struct Projection {
    pub expr: Vec<Expr>,
    pub input: Arc<LogicalPlan>,
    pub schema: DFSchemaRef,
}

impl PartialOrd for Projection {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match self.expr.partial_cmp(&other.expr) {
            Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
            cmp => cmp,
        }
    }
}

impl Projection {
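    /// Creates a new `Projection`, computing the output schema from `expr`
    /// against the input plan's schema.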
    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let projection_schema = projection_schema(&input, &expr)?;
        Self::try_new_with_schema(expr, input, projection_schema)
    }

    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        #[expect(deprecated)]
        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
            && expr.len() != schema.fields().len()
        {
            return plan_err!(
                "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
                expr.len(),
                schema.fields().len()
            );
        }
        Ok(Self {
            expr,
            input,
            schema,
        })
    }

    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
        Self {
            expr,
            input,
            schema,
        }
    }
}

pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
    let metadata = input.schema().metadata().clone();

    let schema =
        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
            .with_functional_dependencies(calc_func_dependencies_for_project(
                exprs, input,
            )?)?;

    Ok(Arc::new(schema))
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct SubqueryAlias {
    pub input: Arc<LogicalPlan>,
    pub alias: TableReference,
    pub schema: DFSchemaRef,
}

impl SubqueryAlias {
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();
        let fields = change_redundant_column(plan.schema().fields());
        let meta_data = plan.schema().as_ref().metadata().clone();
        let schema: Schema =
            DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
        let func_dependencies = plan.schema().functional_dependencies().clone();
        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}

impl PartialOrd for SubqueryAlias {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match self.input.partial_cmp(&other.input) {
            Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
            cmp => cmp,
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    pub predicate: Expr,
    pub input: Arc<LogicalPlan>,
}

impl Filter {
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
    pub fn try_new_with_having(
        predicate: Expr,
        input: Arc<LogicalPlan>,
    ) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::Boolean | DataType::Null => true,
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
            if !Filter::is_allowed_filter_type(&predicate_type) {
                return plan_err!(
                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
                );
            }
        }

        Ok(Self {
            predicate: predicate.unalias_nested().data,
            input,
        })
    }

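    /// Returns true when this filter is guaranteed to select at most one row:
    /// the equality predicates must pin down every column of some
    /// non-nullable unique key of the input.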
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        let functional_dependencies = self.input.schema().functional_dependencies();
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    pub input: Arc<LogicalPlan>,
    pub window_expr: Vec<Expr>,
    pub schema: DFSchemaRef,
}

impl Window {
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                let Expr::WindowFunction(window_fun) = expr else {
                    return None;
                };
                let WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                } = window_fun.as_ref()
                else {
                    return None;
                };
                if udwf.name() == "row_number" && partition_by.is_empty() {
                    Some(idx + input_len)
                } else {
                    None
                }
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
            return plan_err!(
                "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
                window_expr.len(),
                schema.fields().len() - input.schema().fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}

impl PartialOrd for Window {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match self.input.partial_cmp(&other.input) {
            Some(Ordering::Equal) => self.window_expr.partial_cmp(&other.window_expr),
            cmp => cmp,
        }
    }
}

#[derive(Clone)]
pub struct TableScan {
    pub table_name: TableReference,
    pub source: Arc<dyn TableSource>,
    pub projection: Option<Vec<usize>>,
    pub projected_schema: DFSchemaRef,
    pub filters: Vec<Expr>,
    pub fetch: Option<usize>,
}

impl Debug for TableScan {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("TableScan")
            .field("table_name", &self.table_name)
            .field("source", &"...")
            .field("projection", &self.projection)
            .field("projected_schema", &self.projected_schema)
            .field("filters", &self.filters)
            .field("fetch", &self.fetch)
            .finish_non_exhaustive()
    }
}

impl PartialEq for TableScan {
    fn eq(&self, other: &Self) -> bool {
        self.table_name == other.table_name
            && self.projection == other.projection
            && self.projected_schema == other.projected_schema
            && self.filters == other.filters
            && self.fetch == other.fetch
    }
}

impl Eq for TableScan {}

impl PartialOrd for TableScan {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Compare on a projection of the fields that implement `PartialOrd`;
        // `source` and `projected_schema` are deliberately excluded.
        #[derive(PartialEq, PartialOrd)]
        struct ComparableTableScan<'a> {
            pub table_name: &'a TableReference,
            pub projection: &'a Option<Vec<usize>>,
            pub filters: &'a Vec<Expr>,
            pub fetch: &'a Option<usize>,
        }
        let comparable_self = ComparableTableScan {
            table_name: &self.table_name,
            projection: &self.projection,
            filters: &self.filters,
            fetch: &self.fetch,
        };
        let comparable_other = ComparableTableScan {
            table_name: &other.table_name,
            projection: &other.projection,
            filters: &other.filters,
            fetch: &other.fetch,
        };
        comparable_self.partial_cmp(&comparable_other)
    }
}

impl Hash for TableScan {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.table_name.hash(state);
        self.projection.hash(state);
        self.projected_schema.hash(state);
        self.filters.hash(state);
        self.fetch.hash(state);
    }
}

impl TableScan {
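    /// Creates a new `TableScan`, deriving the projected schema and its
    /// functional dependencies from the table source (and the projection, if
    /// one is given).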
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}

2642#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2644pub struct Repartition {
2645 pub input: Arc<LogicalPlan>,
2647 pub partitioning_scheme: Partitioning,
2649}
2650
2651#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2653pub struct Union {
2654 pub inputs: Vec<Arc<LogicalPlan>>,
2656 pub schema: DFSchemaRef,
2658}
2659
2660impl Union {
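    /// Construct a `Union` whose inputs are matched by position and must have
    /// identical column types (see [`Union::derive_schema_from_inputs`]).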
2661 fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2663 let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2664 Ok(Union { inputs, schema })
2665 }
2666
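    /// Like [`Union::try_new`], but does not require the input column types to
    /// match: each output field takes the first input's type, and callers are
    /// expected to coerce the inputs afterwards.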
2667 pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2672 let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2673 Ok(Union { inputs, schema })
2674 }
2675
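    /// Construct a `Union` whose inputs are matched by column name rather than
    /// by position. Each input is wrapped in a `Projection` that reorders its
    /// columns to the union schema and fills in columns it lacks with `NULL`
    /// literals (see [`Union::rewrite_inputs_from_schema`]).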
2676 pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2680 let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2681 let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2682
2683 Ok(Union { inputs, schema })
2684 }
2685
2686 fn rewrite_inputs_from_schema(
2690 schema: &Arc<DFSchema>,
2691 inputs: Vec<Arc<LogicalPlan>>,
2692 ) -> Result<Vec<Arc<LogicalPlan>>> {
2693 let schema_width = schema.iter().count();
2694 let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2695 for input in inputs {
2696 let mut expr = Vec::with_capacity(schema_width);
2700 for column in schema.columns() {
2701 if input
2702 .schema()
2703 .has_column_with_unqualified_name(column.name())
2704 {
2705 expr.push(Expr::Column(column));
2706 } else {
2707 expr.push(
2708 Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2709 );
2710 }
2711 }
2712 wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2713 Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2714 )));
2715 }
2716
2717 Ok(wrapped_inputs)
2718 }
2719
2720 fn derive_schema_from_inputs(
2729 inputs: &[Arc<LogicalPlan>],
2730 loose_types: bool,
2731 by_name: bool,
2732 ) -> Result<DFSchemaRef> {
2733 if inputs.len() < 2 {
2734 return plan_err!("UNION requires at least two inputs");
2735 }
2736
2737 if by_name {
2738 Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2739 } else {
2740 Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2741 }
2742 }
2743
2744 fn derive_schema_from_inputs_by_name(
2745 inputs: &[Arc<LogicalPlan>],
2746 loose_types: bool,
2747 ) -> Result<DFSchemaRef> {
2748 type FieldData<'a> =
2749 (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2750 let mut cols: Vec<(&str, FieldData)> = Vec::new();
2751 for input in inputs.iter() {
2752 for field in input.schema().fields() {
2753 if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2754 cols.iter_mut().find(|(name, _)| name == field.name())
2755 {
2756 if !loose_types && *data_type != field.data_type() {
2757 return plan_err!(
2758 "Found different types for field {}",
2759 field.name()
2760 );
2761 }
2762
2763 metadata.push(field.metadata());
2764 *is_nullable |= field.is_nullable();
2767 *occurrences += 1;
2768 } else {
2769 cols.push((
2770 field.name(),
2771 (
2772 field.data_type(),
2773 field.is_nullable(),
2774 vec![field.metadata()],
2775 1,
2776 ),
2777 ));
2778 }
2779 }
2780 }
2781
2782 let union_fields = cols
2783 .into_iter()
2784 .map(
2785 |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
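                    // A column missing from some inputs is padded with NULL
                    // literals by `rewrite_inputs_from_schema`, so it must be
                    // nullable in the union schema.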
2786 let final_is_nullable = if occurrences == inputs.len() {
2790 is_nullable
2791 } else {
2792 true
2793 };
2794
2795 let mut field =
2796 Field::new(name, data_type.clone(), final_is_nullable);
2797 field.set_metadata(intersect_maps(unmerged_metadata));
2798
2799 (None, Arc::new(field))
2800 },
2801 )
2802 .collect::<Vec<(Option<TableReference>, _)>>();
2803
2804 let union_schema_metadata =
2805 intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2806
2807 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2809 let schema = Arc::new(schema);
2810
2811 Ok(schema)
2812 }
2813
2814 fn derive_schema_from_inputs_by_position(
2815 inputs: &[Arc<LogicalPlan>],
2816 loose_types: bool,
2817 ) -> Result<DFSchemaRef> {
2818 let first_schema = inputs[0].schema();
2819 let fields_count = first_schema.fields().len();
2820 for input in inputs.iter().skip(1) {
2821 if fields_count != input.schema().fields().len() {
2822 return plan_err!(
2823 "UNION queries have different number of columns: \
2824 left has {} columns whereas right has {} columns",
2825 fields_count,
2826 input.schema().fields().len()
2827 );
2828 }
2829 }
2830
2831 let mut name_counts: HashMap<String, usize> = HashMap::new();
2832 let union_fields = (0..fields_count)
2833 .map(|i| {
2834 let fields = inputs
2835 .iter()
2836 .map(|input| input.schema().field(i))
2837 .collect::<Vec<_>>();
2838 let first_field = fields[0];
2839 let base_name = first_field.name().to_string();
2840
2841 let data_type = if loose_types {
2842 first_field.data_type()
2846 } else {
2847 fields.iter().skip(1).try_fold(
2848 first_field.data_type(),
2849 |acc, field| {
2850 if acc != field.data_type() {
2851 return plan_err!(
2852 "UNION field {i} have different type in inputs: \
2853 left has {} whereas right has {}",
2854 first_field.data_type(),
2855 field.data_type()
2856 );
2857 }
2858 Ok(acc)
2859 },
2860 )?
2861 };
2862 let nullable = fields.iter().any(|field| field.is_nullable());
2863
2864 let name = if let Some(count) = name_counts.get_mut(&base_name) {
2866 *count += 1;
2867 format!("{base_name}_{count}")
2868 } else {
2869 name_counts.insert(base_name.clone(), 0);
2870 base_name
2871 };
2872
2873 let mut field = Field::new(&name, data_type.clone(), nullable);
2874 let field_metadata =
2875 intersect_maps(fields.iter().map(|field| field.metadata()));
2876 field.set_metadata(field_metadata);
2877 Ok((None, Arc::new(field)))
2878 })
2879 .collect::<Result<_>>()?;
2880 let union_schema_metadata =
2881 intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2882
2883 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2885 let schema = Arc::new(schema);
2886
2887 Ok(schema)
2888 }
2889}
2890
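/// Returns the entries present in every input map with identical values, i.e.
/// the intersection of the maps. Used to merge field and schema metadata
/// across `UNION` inputs.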
2891fn intersect_maps<'a>(
2892 inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
2893) -> HashMap<String, String> {
2894 let mut inputs = inputs.into_iter();
2895 let mut merged: HashMap<String, String> = inputs.next().cloned().unwrap_or_default();
2896 for input in inputs {
2897 merged.retain(|k, v| input.get(k) == Some(&*v));
2902 }
2903 merged
2904}
2905
2906impl PartialOrd for Union {
2908 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2909 self.inputs.partial_cmp(&other.inputs)
2910 }
2911}
2912
2913#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2936pub struct DescribeTable {
2937 pub schema: Arc<Schema>,
2939 pub output_schema: DFSchemaRef,
2941}
2942
2943impl PartialOrd for DescribeTable {
2946 fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
2947 None
2949 }
2950}
2951
2952#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2954pub enum ExplainFormat {
2955 Indent,
2972 Tree,
2996 PostgresJSON,
3044 Graphviz,
3081}
3082
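/// Parses a case-insensitive format name into an [`ExplainFormat`].
///
/// Illustrative usage mirroring the match arms below (not a doc-test):
///
/// ```ignore
/// use std::str::FromStr;
///
/// let format = ExplainFormat::from_str("PgJson")?;
/// assert_eq!(format, ExplainFormat::PostgresJSON);
/// ```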
3083impl FromStr for ExplainFormat {
3085 type Err = DataFusionError;
3086
3087 fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
3088 match format.to_lowercase().as_str() {
3089 "indent" => Ok(ExplainFormat::Indent),
3090 "tree" => Ok(ExplainFormat::Tree),
3091 "pgjson" => Ok(ExplainFormat::PostgresJSON),
3092 "graphviz" => Ok(ExplainFormat::Graphviz),
3093 _ => {
3094 plan_err!("Invalid explain format. Expected 'indent', 'tree', 'pgjson' or 'graphviz'. Got '{format}'")
3095 }
3096 }
3097 }
3098}
3099
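/// Options controlling the output of an `EXPLAIN` statement, set via the
/// builder-style `with_*` methods below.
///
/// Illustrative sketch (not a doc-test):
///
/// ```ignore
/// let options = ExplainOption::default()
///     .with_verbose(true)
///     .with_format(ExplainFormat::Tree);
/// ```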
3100#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3102pub struct ExplainOption {
3103 pub verbose: bool,
3105 pub analyze: bool,
3107 pub format: ExplainFormat,
3109}
3110
3111impl Default for ExplainOption {
3112 fn default() -> Self {
3113 ExplainOption {
3114 verbose: false,
3115 analyze: false,
3116 format: ExplainFormat::Indent,
3117 }
3118 }
3119}
3120
3121impl ExplainOption {
3122 pub fn with_verbose(mut self, verbose: bool) -> Self {
3124 self.verbose = verbose;
3125 self
3126 }
3127
3128 pub fn with_analyze(mut self, analyze: bool) -> Self {
3130 self.analyze = analyze;
3131 self
3132 }
3133
3134 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3136 self.format = format;
3137 self
3138 }
3139}
3140
3141#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3148pub struct Explain {
3149 pub verbose: bool,
3151 pub explain_format: ExplainFormat,
3154 pub plan: Arc<LogicalPlan>,
3156 pub stringified_plans: Vec<StringifiedPlan>,
3158 pub schema: DFSchemaRef,
3160 pub logical_optimization_succeeded: bool,
3162}
3163
3164impl PartialOrd for Explain {
3166 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3167 #[derive(PartialEq, PartialOrd)]
3168 struct ComparableExplain<'a> {
3169 pub verbose: &'a bool,
3171 pub plan: &'a Arc<LogicalPlan>,
3173 pub stringified_plans: &'a Vec<StringifiedPlan>,
3175 pub logical_optimization_succeeded: &'a bool,
3177 }
3178 let comparable_self = ComparableExplain {
3179 verbose: &self.verbose,
3180 plan: &self.plan,
3181 stringified_plans: &self.stringified_plans,
3182 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3183 };
3184 let comparable_other = ComparableExplain {
3185 verbose: &other.verbose,
3186 plan: &other.plan,
3187 stringified_plans: &other.stringified_plans,
3188 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3189 };
3190 comparable_self.partial_cmp(&comparable_other)
3191 }
3192}
3193
3194#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3197pub struct Analyze {
3198 pub verbose: bool,
3200 pub input: Arc<LogicalPlan>,
3202 pub schema: DFSchemaRef,
3204}
3205
3206impl PartialOrd for Analyze {
3208 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3209 match self.verbose.partial_cmp(&other.verbose) {
3210 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3211 cmp => cmp,
3212 }
3213 }
3214}
3215
3216#[allow(clippy::derived_hash_with_manual_eq)]
3221#[derive(Debug, Clone, Eq, Hash)]
3222pub struct Extension {
3223 pub node: Arc<dyn UserDefinedLogicalNode>,
3225}
3226
3227impl PartialEq for Extension {
3231 fn eq(&self, other: &Self) -> bool {
3232 self.node.eq(&other.node)
3233 }
3234}
3235
3236impl PartialOrd for Extension {
3237 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3238 self.node.partial_cmp(&other.node)
3239 }
3240}
3241
3242#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3244pub struct Limit {
3245 pub skip: Option<Box<Expr>>,
3247 pub fetch: Option<Box<Expr>>,
3250 pub input: Arc<LogicalPlan>,
3252}
3253
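/// Normalized result of inspecting the `skip` (`OFFSET`) expression of a
/// [`Limit`]: either a literal row count or an unsupported (non-literal)
/// expression.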
3254pub enum SkipType {
3256 Literal(usize),
3258 UnsupportedExpr,
3260}
3261
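/// Normalized result of inspecting the `fetch` (`LIMIT`) expression of a
/// [`Limit`]; `Literal(None)` means no limit was specified.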
3262pub enum FetchType {
3264 Literal(Option<usize>),
3267 UnsupportedExpr,
3269}
3270
3271impl Limit {
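    /// Returns the number of rows to skip as a [`SkipType`]. A missing `skip`
    /// expression is treated as `OFFSET 0`; a negative literal is a plan error.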
3272 pub fn get_skip_type(&self) -> Result<SkipType> {
3274 match self.skip.as_deref() {
3275 Some(expr) => match *expr {
3276 Expr::Literal(ScalarValue::Int64(s), _) => {
3277 let s = s.unwrap_or(0);
3279 if s >= 0 {
3280 Ok(SkipType::Literal(s as usize))
3281 } else {
3282 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3283 }
3284 }
3285 _ => Ok(SkipType::UnsupportedExpr),
3286 },
3287 None => Ok(SkipType::Literal(0)),
3289 }
3290 }
3291
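    /// Returns the maximum number of rows to fetch as a [`FetchType`].
    ///
    /// Illustrative sketch of consuming the result, assuming a `Limit` node
    /// bound to `limit` (not a doc-test):
    ///
    /// ```ignore
    /// match limit.get_fetch_type()? {
    ///     FetchType::Literal(Some(n)) => println!("LIMIT {n}"),
    ///     FetchType::Literal(None) => println!("no limit"),
    ///     FetchType::UnsupportedExpr => println!("non-literal limit"),
    /// }
    /// ```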
3292 pub fn get_fetch_type(&self) -> Result<FetchType> {
3294 match self.fetch.as_deref() {
3295 Some(expr) => match *expr {
3296 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3297 if s >= 0 {
3298 Ok(FetchType::Literal(Some(s as usize)))
3299 } else {
3300 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3301 }
3302 }
3303 Expr::Literal(ScalarValue::Int64(None), _) => {
3304 Ok(FetchType::Literal(None))
3305 }
3306 _ => Ok(FetchType::UnsupportedExpr),
3307 },
3308 None => Ok(FetchType::Literal(None)),
3309 }
3310 }
3311}
3312
3313#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3315pub enum Distinct {
3316 All(Arc<LogicalPlan>),
3318 On(DistinctOn),
3320}
3321
3322impl Distinct {
3323 pub fn input(&self) -> &Arc<LogicalPlan> {
3325 match self {
3326 Distinct::All(input) => input,
3327 Distinct::On(DistinctOn { input, .. }) => input,
3328 }
3329 }
3330}
3331
3332#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3334pub struct DistinctOn {
3335 pub on_expr: Vec<Expr>,
3337 pub select_expr: Vec<Expr>,
3339 pub sort_expr: Option<Vec<SortExpr>>,
3343 pub input: Arc<LogicalPlan>,
3345 pub schema: DFSchemaRef,
3347}
3348
3349impl DistinctOn {
3350 pub fn try_new(
3352 on_expr: Vec<Expr>,
3353 select_expr: Vec<Expr>,
3354 sort_expr: Option<Vec<SortExpr>>,
3355 input: Arc<LogicalPlan>,
3356 ) -> Result<Self> {
3357 if on_expr.is_empty() {
3358 return plan_err!("No `ON` expressions provided");
3359 }
3360
3361 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3362 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3363 .into_iter()
3364 .collect();
3365
3366 let dfschema = DFSchema::new_with_metadata(
3367 qualified_fields,
3368 input.schema().metadata().clone(),
3369 )?;
3370
3371 let mut distinct_on = DistinctOn {
3372 on_expr,
3373 select_expr,
3374 sort_expr: None,
3375 input,
3376 schema: Arc::new(dfschema),
3377 };
3378
3379 if let Some(sort_expr) = sort_expr {
3380 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3381 }
3382
3383 Ok(distinct_on)
3384 }
3385
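    /// Install a new sort expression, validating that the `ON` expressions
    /// exactly match the leading `ORDER BY` expressions, as `SELECT DISTINCT
    /// ON` requires.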
3386 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3390 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3391
3392 let mut matched = true;
3394 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3395 if on != &sort.expr {
3396 matched = false;
3397 break;
3398 }
3399 }
3400
3401 if self.on_expr.len() > sort_expr.len() || !matched {
3402 return plan_err!(
3403 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3404 );
3405 }
3406
3407 self.sort_expr = Some(sort_expr);
3408 Ok(self)
3409 }
3410}
3411
3412impl PartialOrd for DistinctOn {
3414 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3415 #[derive(PartialEq, PartialOrd)]
3416 struct ComparableDistinctOn<'a> {
3417 pub on_expr: &'a Vec<Expr>,
3419 pub select_expr: &'a Vec<Expr>,
3421 pub sort_expr: &'a Option<Vec<SortExpr>>,
3425 pub input: &'a Arc<LogicalPlan>,
3427 }
3428 let comparable_self = ComparableDistinctOn {
3429 on_expr: &self.on_expr,
3430 select_expr: &self.select_expr,
3431 sort_expr: &self.sort_expr,
3432 input: &self.input,
3433 };
3434 let comparable_other = ComparableDistinctOn {
3435 on_expr: &other.on_expr,
3436 select_expr: &other.select_expr,
3437 sort_expr: &other.sort_expr,
3438 input: &other.input,
3439 };
3440 comparable_self.partial_cmp(&comparable_other)
3441 }
3442}
3443
3444#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3457#[non_exhaustive]
3459pub struct Aggregate {
3460 pub input: Arc<LogicalPlan>,
3462 pub group_expr: Vec<Expr>,
3464 pub aggr_expr: Vec<Expr>,
3466 pub schema: DFSchemaRef,
3468}
3469
3470impl Aggregate {
3471 pub fn try_new(
3473 input: Arc<LogicalPlan>,
3474 group_expr: Vec<Expr>,
3475 aggr_expr: Vec<Expr>,
3476 ) -> Result<Self> {
3477 let group_expr = enumerate_grouping_sets(group_expr)?;
3478
3479 let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3480
3481 let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3482
3483 let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3484
3485 if is_grouping_set {
3487 qualified_fields = qualified_fields
3488 .into_iter()
3489 .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3490 .collect::<Vec<_>>();
3491 qualified_fields.push((
3492 None,
3493 Field::new(
3494 Self::INTERNAL_GROUPING_ID,
3495 Self::grouping_id_type(qualified_fields.len()),
3496 false,
3497 )
3498 .into(),
3499 ));
3500 }
3501
3502 qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3503
3504 let schema = DFSchema::new_with_metadata(
3505 qualified_fields,
3506 input.schema().metadata().clone(),
3507 )?;
3508
3509 Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3510 }
3511
3512 pub fn try_new_with_schema(
3518 input: Arc<LogicalPlan>,
3519 group_expr: Vec<Expr>,
3520 aggr_expr: Vec<Expr>,
3521 schema: DFSchemaRef,
3522 ) -> Result<Self> {
3523 if group_expr.is_empty() && aggr_expr.is_empty() {
3524 return plan_err!(
3525 "Aggregate requires at least one grouping or aggregate expression"
3526 );
3527 }
3528 let group_expr_count = grouping_set_expr_count(&group_expr)?;
3529 if schema.fields().len() != group_expr_count + aggr_expr.len() {
3530 return plan_err!(
3531 "Aggregate schema has wrong number of fields. Expected {} got {}",
3532 group_expr_count + aggr_expr.len(),
3533 schema.fields().len()
3534 );
3535 }
3536
3537 let aggregate_func_dependencies =
3538 calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3539 let new_schema = schema.as_ref().clone();
3540 let schema = Arc::new(
3541 new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3542 );
3543 Ok(Self {
3544 input,
3545 group_expr,
3546 aggr_expr,
3547 schema,
3548 })
3549 }
3550
3551 fn is_grouping_set(&self) -> bool {
3552 matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3553 }
3554
3555 fn output_expressions(&self) -> Result<Vec<&Expr>> {
3557 static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3558 Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3559 });
3560 let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3561 if self.is_grouping_set() {
3562 exprs.push(&INTERNAL_ID_EXPR);
3563 }
3564 exprs.extend(self.aggr_expr.iter());
        debug_assert_eq!(exprs.len(), self.schema.fields().len());
3566 Ok(exprs)
3567 }
3568
3569 pub fn group_expr_len(&self) -> Result<usize> {
3573 grouping_set_expr_count(&self.group_expr)
3574 }
3575
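    /// Smallest unsigned integer type wide enough to hold one bit per group
    /// expression; used for the internal grouping-id column. For example, 10
    /// group expressions need a `UInt16`, while 33 need a `UInt64`.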
3576 pub fn grouping_id_type(group_exprs: usize) -> DataType {
3581 if group_exprs <= 8 {
3582 DataType::UInt8
3583 } else if group_exprs <= 16 {
3584 DataType::UInt16
3585 } else if group_exprs <= 32 {
3586 DataType::UInt32
3587 } else {
3588 DataType::UInt64
3589 }
3590 }
3591
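    /// Name of the internal column added when the aggregate uses grouping
    /// sets; it identifies which grouping set produced each output row.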
3592 pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3610}
3611
3612impl PartialOrd for Aggregate {
3614 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3615 match self.input.partial_cmp(&other.input) {
3616 Some(Ordering::Equal) => {
3617 match self.group_expr.partial_cmp(&other.group_expr) {
3618 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3619 cmp => cmp,
3620 }
3621 }
3622 cmp => cmp,
3623 }
3624 }
3625}
3626
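/// Returns `true` if any of the group expressions is a `GroupingSet`
/// (`GROUPING SETS`, `ROLLUP`, or `CUBE`).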
3627fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3629 group_expr
3630 .iter()
3631 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3632}
3633
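/// Calculates the functional dependencies of an aggregate's output schema.
/// Dependencies are only propagated for plain `GROUP BY` expressions; grouping
/// sets NULL-expand their inputs, so no dependencies can be preserved.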
3634fn calc_func_dependencies_for_aggregate(
3636 group_expr: &[Expr],
3638 input: &LogicalPlan,
3640 aggr_schema: &DFSchema,
3642) -> Result<FunctionalDependencies> {
3643 if !contains_grouping_set(group_expr) {
3649 let group_by_expr_names = group_expr
3650 .iter()
3651 .map(|item| item.schema_name().to_string())
3652 .collect::<IndexSet<_>>()
3653 .into_iter()
3654 .collect::<Vec<_>>();
3655 let aggregate_func_dependencies = aggregate_functional_dependencies(
3656 input.schema(),
3657 &group_by_expr_names,
3658 aggr_schema,
3659 );
3660 Ok(aggregate_func_dependencies)
3661 } else {
3662 Ok(FunctionalDependencies::empty())
3663 }
3664}
3665
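/// Projects the input's functional dependencies onto the indices of the
/// projection expressions that resolve to input fields (including through
/// wildcards and aliases); other expressions contribute no dependencies.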
3666fn calc_func_dependencies_for_project(
3669 exprs: &[Expr],
3670 input: &LogicalPlan,
3671) -> Result<FunctionalDependencies> {
3672 let input_fields = input.schema().field_names();
3673 let proj_indices = exprs
3675 .iter()
3676 .map(|expr| match expr {
3677 #[expect(deprecated)]
3678 Expr::Wildcard { qualifier, options } => {
3679 let wildcard_fields = exprlist_to_fields(
3680 vec![&Expr::Wildcard {
3681 qualifier: qualifier.clone(),
3682 options: options.clone(),
3683 }],
3684 input,
3685 )?;
3686 Ok::<_, DataFusionError>(
3687 wildcard_fields
3688 .into_iter()
3689 .filter_map(|(qualifier, f)| {
3690 let flat_name = qualifier
3691 .map(|t| format!("{}.{}", t, f.name()))
3692 .unwrap_or_else(|| f.name().clone());
3693 input_fields.iter().position(|item| *item == flat_name)
3694 })
3695 .collect::<Vec<_>>(),
3696 )
3697 }
3698 Expr::Alias(alias) => {
3699 let name = format!("{}", alias.expr);
3700 Ok(input_fields
3701 .iter()
3702 .position(|item| *item == name)
3703 .map(|i| vec![i])
3704 .unwrap_or(vec![]))
3705 }
3706 _ => {
3707 let name = format!("{expr}");
3708 Ok(input_fields
3709 .iter()
3710 .position(|item| *item == name)
3711 .map(|i| vec![i])
3712 .unwrap_or(vec![]))
3713 }
3714 })
3715 .collect::<Result<Vec<_>>>()?
3716 .into_iter()
3717 .flatten()
3718 .collect::<Vec<_>>();
3719
3720 Ok(input
3721 .schema()
3722 .functional_dependencies()
3723 .project_functional_dependencies(&proj_indices, exprs.len()))
3724}
3725
3726#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3728pub struct Sort {
3729 pub expr: Vec<SortExpr>,
3731 pub input: Arc<LogicalPlan>,
3733 pub fetch: Option<usize>,
3735}
3736
3737#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3739pub struct Join {
3740 pub left: Arc<LogicalPlan>,
3742 pub right: Arc<LogicalPlan>,
3744 pub on: Vec<(Expr, Expr)>,
3746 pub filter: Option<Expr>,
3748 pub join_type: JoinType,
3750 pub join_constraint: JoinConstraint,
3752 pub schema: DFSchemaRef,
3754 pub null_equality: NullEquality,
3756}
3757
3758impl Join {
3759 pub fn try_new(
3778 left: Arc<LogicalPlan>,
3779 right: Arc<LogicalPlan>,
3780 on: Vec<(Expr, Expr)>,
3781 filter: Option<Expr>,
3782 join_type: JoinType,
3783 join_constraint: JoinConstraint,
3784 null_equality: NullEquality,
3785 ) -> Result<Self> {
3786 let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3787
3788 Ok(Join {
3789 left,
3790 right,
3791 on,
3792 filter,
3793 join_type,
3794 join_constraint,
3795 schema: Arc::new(join_schema),
3796 null_equality,
3797 })
3798 }
3799
3800 pub fn try_new_with_project_input(
3803 original: &LogicalPlan,
3804 left: Arc<LogicalPlan>,
3805 right: Arc<LogicalPlan>,
3806 column_on: (Vec<Column>, Vec<Column>),
3807 ) -> Result<(Self, bool)> {
3808 let original_join = match original {
3809 LogicalPlan::Join(join) => join,
3810 _ => return plan_err!("Could not create join with project input"),
3811 };
3812
3813 let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3814 let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3815
3816 let mut requalified = false;
3817
3818 if original_join.join_type == JoinType::Inner
3821 || original_join.join_type == JoinType::Left
3822 || original_join.join_type == JoinType::Right
3823 || original_join.join_type == JoinType::Full
3824 {
3825 (left_sch, right_sch, requalified) =
3826 requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3827 }
3828
3829 let on: Vec<(Expr, Expr)> = column_on
3830 .0
3831 .into_iter()
3832 .zip(column_on.1)
3833 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3834 .collect();
3835
3836 let join_schema = build_join_schema(
3837 left_sch.schema(),
3838 right_sch.schema(),
3839 &original_join.join_type,
3840 )?;
3841
3842 Ok((
3843 Join {
3844 left,
3845 right,
3846 on,
3847 filter: original_join.filter.clone(),
3848 join_type: original_join.join_type,
3849 join_constraint: original_join.join_constraint,
3850 schema: Arc::new(join_schema),
3851 null_equality: original_join.null_equality,
3852 },
3853 requalified,
3854 ))
3855 }
3856}
3857
3858impl PartialOrd for Join {
3860 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3861 #[derive(PartialEq, PartialOrd)]
3862 struct ComparableJoin<'a> {
3863 pub left: &'a Arc<LogicalPlan>,
3865 pub right: &'a Arc<LogicalPlan>,
3867 pub on: &'a Vec<(Expr, Expr)>,
3869 pub filter: &'a Option<Expr>,
3871 pub join_type: &'a JoinType,
3873 pub join_constraint: &'a JoinConstraint,
3875 pub null_equality: &'a NullEquality,
3877 }
3878 let comparable_self = ComparableJoin {
3879 left: &self.left,
3880 right: &self.right,
3881 on: &self.on,
3882 filter: &self.filter,
3883 join_type: &self.join_type,
3884 join_constraint: &self.join_constraint,
3885 null_equality: &self.null_equality,
3886 };
3887 let comparable_other = ComparableJoin {
3888 left: &other.left,
3889 right: &other.right,
3890 on: &other.on,
3891 filter: &other.filter,
3892 join_type: &other.join_type,
3893 join_constraint: &other.join_constraint,
3894 null_equality: &other.null_equality,
3895 };
3896 comparable_self.partial_cmp(&comparable_other)
3897 }
3898}
3899
3900#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
3902pub struct Subquery {
3903 pub subquery: Arc<LogicalPlan>,
3905 pub outer_ref_columns: Vec<Expr>,
3907 pub spans: Spans,
3909}
3910
3911impl Normalizeable for Subquery {
3912 fn can_normalize(&self) -> bool {
3913 false
3914 }
3915}
3916
3917impl NormalizeEq for Subquery {
3918 fn normalize_eq(&self, other: &Self) -> bool {
3919 *self.subquery == *other.subquery
3921 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3922 && self
3923 .outer_ref_columns
3924 .iter()
3925 .zip(other.outer_ref_columns.iter())
3926 .all(|(a, b)| a.normalize_eq(b))
3927 }
3928}
3929
3930impl Subquery {
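    /// Extracts the scalar [`Subquery`] from an expression, looking through
    /// any enclosing casts; returns a plan error for other expressions.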
3931 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3932 match plan {
3933 Expr::ScalarSubquery(it) => Ok(it),
3934 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3935 _ => plan_err!("Could not coerce into ScalarSubquery!"),
3936 }
3937 }
3938
3939 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3940 Subquery {
3941 subquery: plan,
3942 outer_ref_columns: self.outer_ref_columns.clone(),
3943 spans: Spans::new(),
3944 }
3945 }
3946}
3947
3948impl Debug for Subquery {
3949 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3950 write!(f, "<subquery>")
3951 }
3952}
3953
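/// How a `Repartition` node redistributes its input: round-robin into a fixed
/// number of partitions, hash-partitioned on a set of expressions, or
/// distributed by key (`DISTRIBUTE BY`) without a fixed partition count.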
3954#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3960pub enum Partitioning {
3961 RoundRobinBatch(usize),
3963 Hash(Vec<Expr>, usize),
3966 DistributeBy(Vec<Expr>),
3968}
3969
3970#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
3990pub struct ColumnUnnestList {
3991 pub output_column: Column,
3992 pub depth: usize,
3993}
3994
3995impl Display for ColumnUnnestList {
3996 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3997 write!(f, "{}|depth={}", self.output_column, self.depth)
3998 }
3999}
4000
4001#[derive(Debug, Clone, PartialEq, Eq, Hash)]
4004pub struct Unnest {
4005 pub input: Arc<LogicalPlan>,
4007 pub exec_columns: Vec<Column>,
4009 pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
4012 pub struct_type_columns: Vec<usize>,
4015 pub dependency_indices: Vec<usize>,
4018 pub schema: DFSchemaRef,
4020 pub options: UnnestOptions,
4022}
4023
4024impl PartialOrd for Unnest {
4026 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4027 #[derive(PartialEq, PartialOrd)]
4028 struct ComparableUnnest<'a> {
4029 pub input: &'a Arc<LogicalPlan>,
4031 pub exec_columns: &'a Vec<Column>,
4033 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4036 pub struct_type_columns: &'a Vec<usize>,
4039 pub dependency_indices: &'a Vec<usize>,
4042 pub options: &'a UnnestOptions,
4044 }
4045 let comparable_self = ComparableUnnest {
4046 input: &self.input,
4047 exec_columns: &self.exec_columns,
4048 list_type_columns: &self.list_type_columns,
4049 struct_type_columns: &self.struct_type_columns,
4050 dependency_indices: &self.dependency_indices,
4051 options: &self.options,
4052 };
4053 let comparable_other = ComparableUnnest {
4054 input: &other.input,
4055 exec_columns: &other.exec_columns,
4056 list_type_columns: &other.list_type_columns,
4057 struct_type_columns: &other.struct_type_columns,
4058 dependency_indices: &other.dependency_indices,
4059 options: &other.options,
4060 };
4061 comparable_self.partial_cmp(&comparable_other)
4062 }
4063}
4064
4065impl Unnest {
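    /// Create a new `Unnest` over `exec_columns` of `input`, deriving the
    /// output schema by replacing each unnested list column with its element
    /// type (to the requested depth) and each struct column with one column
    /// per struct field.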
4066 pub fn try_new(
4067 input: Arc<LogicalPlan>,
4068 exec_columns: Vec<Column>,
4069 options: UnnestOptions,
4070 ) -> Result<Self> {
4071 if exec_columns.is_empty() {
4072 return plan_err!("unnest plan requires at least 1 column to unnest");
4073 }
4074
4075 let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
4076 let mut struct_columns = vec![];
4077 let indices_to_unnest = exec_columns
4078 .iter()
4079 .map(|c| Ok((input.schema().index_of_column(c)?, c)))
4080 .collect::<Result<HashMap<usize, &Column>>>()?;
4081
4082 let input_schema = input.schema();
4083
4084 let mut dependency_indices = vec![];
4085 let fields = input_schema
4101 .iter()
4102 .enumerate()
4103 .map(|(index, (original_qualifier, original_field))| {
4104 match indices_to_unnest.get(&index) {
4105 Some(column_to_unnest) => {
4106 let recursions_on_column = options
4107 .recursions
4108 .iter()
                            .filter(|p| &p.input_column == *column_to_unnest)
4110 .collect::<Vec<_>>();
4111 let mut transformed_columns = recursions_on_column
4112 .iter()
4113 .map(|r| {
4114 list_columns.push((
4115 index,
4116 ColumnUnnestList {
4117 output_column: r.output_column.clone(),
4118 depth: r.depth,
4119 },
4120 ));
4121 Ok(get_unnested_columns(
4122 &r.output_column.name,
4123 original_field.data_type(),
4124 r.depth,
4125 )?
4126 .into_iter()
4127 .next()
                                // Unnesting a list column to a given depth
                                // always yields exactly one output column.
                                .unwrap())
                            })
4130 .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
4131 if transformed_columns.is_empty() {
4132 transformed_columns = get_unnested_columns(
4133 &column_to_unnest.name,
4134 original_field.data_type(),
4135 1,
4136 )?;
4137 match original_field.data_type() {
4138 DataType::Struct(_) => {
4139 struct_columns.push(index);
4140 }
4141 DataType::List(_)
4142 | DataType::FixedSizeList(_, _)
4143 | DataType::LargeList(_) => {
4144 list_columns.push((
4145 index,
4146 ColumnUnnestList {
4147 output_column: Column::from_name(
4148 &column_to_unnest.name,
4149 ),
4150 depth: 1,
4151 },
4152 ));
4153 }
4154 _ => {}
4155 };
4156 }
4157
4158 dependency_indices.extend(std::iter::repeat_n(
4160 index,
4161 transformed_columns.len(),
4162 ));
4163 Ok(transformed_columns
4164 .iter()
4165 .map(|(col, field)| {
4166 (col.relation.to_owned(), field.to_owned())
4167 })
4168 .collect())
4169 }
4170 None => {
4171 dependency_indices.push(index);
4172 Ok(vec![(
4173 original_qualifier.cloned(),
4174 Arc::clone(original_field),
4175 )])
4176 }
4177 }
4178 })
4179 .collect::<Result<Vec<_>>>()?
4180 .into_iter()
4181 .flatten()
4182 .collect::<Vec<_>>();
4183
4184 let metadata = input_schema.metadata().clone();
4185 let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
4186 let deps = input_schema.functional_dependencies().clone();
4188 let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
4189
4190 Ok(Unnest {
4191 input,
4192 exec_columns,
4193 list_type_columns: list_columns,
4194 struct_type_columns: struct_columns,
4195 dependency_indices,
4196 schema,
4197 options,
4198 })
4199 }
4200}
4201
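/// Returns the output column(s) produced by unnesting `col_name` to `depth`:
/// one column with the element type for list-like columns, or one column per
/// field (named `col.field`) for struct columns.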
4202fn get_unnested_columns(
    col_name: &str,
4212 data_type: &DataType,
4213 depth: usize,
4214) -> Result<Vec<(Column, Arc<Field>)>> {
4215 let mut qualified_columns = Vec::with_capacity(1);
4216
4217 match data_type {
4218 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4219 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
            let new_field = Arc::new(Field::new(
                col_name,
                data_type,
                // Unnested values are always nullable: an empty (or NULL)
                // input list yields a NULL output value.
                true,
            ));
4226 let column = Column::from_name(col_name);
4227 qualified_columns.push((column, new_field));
4229 }
4230 DataType::Struct(fields) => {
4231 qualified_columns.extend(fields.iter().map(|f| {
4232 let new_name = format!("{}.{}", col_name, f.name());
4233 let column = Column::from_name(&new_name);
4234 let new_field = f.as_ref().clone().with_name(new_name);
4235 (column, Arc::new(new_field))
4237 }))
4238 }
4239 _ => {
4240 return internal_err!(
4241 "trying to unnest on invalid data type {:?}",
4242 data_type
4243 );
4244 }
4245 };
4246 Ok(qualified_columns)
4247}
4248
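/// Peels `depth` levels of `List`/`FixedSizeList`/`LargeList` off `data_type`,
/// returning the element type at that depth, or an error if the type is not
/// list-like.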
4249fn get_unnested_list_datatype_recursive(
4252 data_type: &DataType,
4253 depth: usize,
4254) -> Result<DataType> {
4255 match data_type {
4256 DataType::List(field)
4257 | DataType::FixedSizeList(field, _)
4258 | DataType::LargeList(field) => {
4259 if depth == 1 {
4260 return Ok(field.data_type().clone());
4261 }
4262 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4263 }
4264 _ => {}
4265 };
4266
4267 internal_err!("trying to unnest on invalid data type {:?}", data_type)
4268}
4269
4270#[cfg(test)]
4271mod tests {
4272
4273 use super::*;
4274 use crate::builder::LogicalTableSource;
4275 use crate::logical_plan::table_scan;
4276 use crate::{
4277 binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4278 GroupingSet,
4279 };
4280
4281 use datafusion_common::tree_node::{
4282 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4283 };
4284 use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4285 use insta::{assert_debug_snapshot, assert_snapshot};
4286
4287 use crate::test::function_stub::count;
4288
4289 fn employee_schema() -> Schema {
4290 Schema::new(vec![
4291 Field::new("id", DataType::Int32, false),
4292 Field::new("first_name", DataType::Utf8, false),
4293 Field::new("last_name", DataType::Utf8, false),
4294 Field::new("state", DataType::Utf8, false),
4295 Field::new("salary", DataType::Int32, false),
4296 ])
4297 }
4298
4299 fn display_plan() -> Result<LogicalPlan> {
4300 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4301 .build()?;
4302
4303 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4304 .filter(in_subquery(col("state"), Arc::new(plan1)))?
4305 .project(vec![col("id")])?
4306 .build()
4307 }
4308
4309 #[test]
4310 fn test_display_indent() -> Result<()> {
4311 let plan = display_plan()?;
4312
4313 assert_snapshot!(plan.display_indent(), @r"
4314 Projection: employee_csv.id
4315 Filter: employee_csv.state IN (<subquery>)
4316 Subquery:
4317 TableScan: employee_csv projection=[state]
4318 TableScan: employee_csv projection=[id, state]
4319 ");
4320 Ok(())
4321 }
4322
4323 #[test]
4324 fn test_display_indent_schema() -> Result<()> {
4325 let plan = display_plan()?;
4326
4327 assert_snapshot!(plan.display_indent_schema(), @r"
4328 Projection: employee_csv.id [id:Int32]
4329 Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4330 Subquery: [state:Utf8]
4331 TableScan: employee_csv projection=[state] [state:Utf8]
4332 TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4333 ");
4334 Ok(())
4335 }
4336
4337 #[test]
4338 fn test_display_subquery_alias() -> Result<()> {
4339 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4340 .build()?;
4341 let plan1 = Arc::new(plan1);
4342
4343 let plan =
4344 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4345 .project(vec![col("id"), exists(plan1).alias("exists")])?
4346 .build();
4347
4348 assert_snapshot!(plan?.display_indent(), @r"
4349 Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4350 Subquery:
4351 TableScan: employee_csv projection=[state]
4352 TableScan: employee_csv projection=[id, state]
4353 ");
4354 Ok(())
4355 }
4356
4357 #[test]
4358 fn test_display_graphviz() -> Result<()> {
4359 let plan = display_plan()?;
4360
4361 assert_snapshot!(plan.display_graphviz(), @r#"
4364 // Begin DataFusion GraphViz Plan,
4365 // display it online here: https://dreampuf.github.io/GraphvizOnline
4366
4367 digraph {
4368 subgraph cluster_1
4369 {
4370 graph[label="LogicalPlan"]
4371 2[shape=box label="Projection: employee_csv.id"]
4372 3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4373 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4374 4[shape=box label="Subquery:"]
4375 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4376 5[shape=box label="TableScan: employee_csv projection=[state]"]
4377 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4378 6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4379 3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4380 }
4381 subgraph cluster_7
4382 {
4383 graph[label="Detailed LogicalPlan"]
4384 8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4385 9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4386 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4387 10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4388 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4389 11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4390 10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4391 12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4392 9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4393 }
4394 }
4395 // End DataFusion GraphViz Plan
4396 "#);
4397 Ok(())
4398 }
4399
4400 #[test]
4401 fn test_display_pg_json() -> Result<()> {
4402 let plan = display_plan()?;
4403
4404 assert_snapshot!(plan.display_pg_json(), @r#"
4405 [
4406 {
4407 "Plan": {
4408 "Expressions": [
4409 "employee_csv.id"
4410 ],
4411 "Node Type": "Projection",
4412 "Output": [
4413 "id"
4414 ],
4415 "Plans": [
4416 {
4417 "Condition": "employee_csv.state IN (<subquery>)",
4418 "Node Type": "Filter",
4419 "Output": [
4420 "id",
4421 "state"
4422 ],
4423 "Plans": [
4424 {
4425 "Node Type": "Subquery",
4426 "Output": [
4427 "state"
4428 ],
4429 "Plans": [
4430 {
4431 "Node Type": "TableScan",
4432 "Output": [
4433 "state"
4434 ],
4435 "Plans": [],
4436 "Relation Name": "employee_csv"
4437 }
4438 ]
4439 },
4440 {
4441 "Node Type": "TableScan",
4442 "Output": [
4443 "id",
4444 "state"
4445 ],
4446 "Plans": [],
4447 "Relation Name": "employee_csv"
4448 }
4449 ]
4450 }
4451 ]
4452 }
4453 }
4454 ]
4455 "#);
4456 Ok(())
4457 }
4458
4459 #[derive(Debug, Default)]
4461 struct OkVisitor {
4462 strings: Vec<String>,
4463 }
4464
4465 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4466 type Node = LogicalPlan;
4467
4468 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4469 let s = match plan {
4470 LogicalPlan::Projection { .. } => "pre_visit Projection",
4471 LogicalPlan::Filter { .. } => "pre_visit Filter",
4472 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4473 _ => {
4474 return not_impl_err!("unknown plan type");
4475 }
4476 };
4477
4478 self.strings.push(s.into());
4479 Ok(TreeNodeRecursion::Continue)
4480 }
4481
4482 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4483 let s = match plan {
4484 LogicalPlan::Projection { .. } => "post_visit Projection",
4485 LogicalPlan::Filter { .. } => "post_visit Filter",
4486 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4487 _ => {
4488 return not_impl_err!("unknown plan type");
4489 }
4490 };
4491
4492 self.strings.push(s.into());
4493 Ok(TreeNodeRecursion::Continue)
4494 }
4495 }
4496
4497 #[test]
4498 fn visit_order() {
4499 let mut visitor = OkVisitor::default();
4500 let plan = test_plan();
4501 let res = plan.visit_with_subqueries(&mut visitor);
4502 assert!(res.is_ok());
4503
4504 assert_debug_snapshot!(visitor.strings, @r#"
4505 [
4506 "pre_visit Projection",
4507 "pre_visit Filter",
4508 "pre_visit TableScan",
4509 "post_visit TableScan",
4510 "post_visit Filter",
4511 "post_visit Projection",
4512 ]
4513 "#);
4514 }
4515
4516 #[derive(Debug, Default)]
4517 struct OptionalCounter {
4519 val: Option<usize>,
4520 }
4521
4522 impl OptionalCounter {
4523 fn new(val: usize) -> Self {
4524 Self { val: Some(val) }
4525 }
4526 fn dec(&mut self) -> bool {
4528 if Some(0) == self.val {
4529 true
4530 } else {
4531 self.val = self.val.take().map(|i| i - 1);
4532 false
4533 }
4534 }
4535 }
4536
4537 #[derive(Debug, Default)]
4538 struct StoppingVisitor {
4540 inner: OkVisitor,
4541 return_false_from_pre_in: OptionalCounter,
4543 return_false_from_post_in: OptionalCounter,
4545 }
4546
4547 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4548 type Node = LogicalPlan;
4549
4550 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4551 if self.return_false_from_pre_in.dec() {
4552 return Ok(TreeNodeRecursion::Stop);
4553 }
4554 self.inner.f_down(plan)?;
4555
4556 Ok(TreeNodeRecursion::Continue)
4557 }
4558
4559 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4560 if self.return_false_from_post_in.dec() {
4561 return Ok(TreeNodeRecursion::Stop);
4562 }
4563
4564 self.inner.f_up(plan)
4565 }
4566 }
4567
4568 #[test]
4570 fn early_stopping_pre_visit() {
4571 let mut visitor = StoppingVisitor {
4572 return_false_from_pre_in: OptionalCounter::new(2),
4573 ..Default::default()
4574 };
4575 let plan = test_plan();
4576 let res = plan.visit_with_subqueries(&mut visitor);
4577 assert!(res.is_ok());
4578
4579 assert_debug_snapshot!(
4580 visitor.inner.strings,
4581 @r#"
4582 [
4583 "pre_visit Projection",
4584 "pre_visit Filter",
4585 ]
4586 "#
4587 );
4588 }
4589
4590 #[test]
4591 fn early_stopping_post_visit() {
4592 let mut visitor = StoppingVisitor {
4593 return_false_from_post_in: OptionalCounter::new(1),
4594 ..Default::default()
4595 };
4596 let plan = test_plan();
4597 let res = plan.visit_with_subqueries(&mut visitor);
4598 assert!(res.is_ok());
4599
4600 assert_debug_snapshot!(
4601 visitor.inner.strings,
4602 @r#"
4603 [
4604 "pre_visit Projection",
4605 "pre_visit Filter",
4606 "pre_visit TableScan",
4607 "post_visit TableScan",
4608 ]
4609 "#
4610 );
4611 }
4612
4613 #[derive(Debug, Default)]
4614 struct ErrorVisitor {
4616 inner: OkVisitor,
4617 return_error_from_pre_in: OptionalCounter,
4619 return_error_from_post_in: OptionalCounter,
4621 }
4622
4623 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4624 type Node = LogicalPlan;
4625
4626 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4627 if self.return_error_from_pre_in.dec() {
4628 return not_impl_err!("Error in pre_visit");
4629 }
4630
4631 self.inner.f_down(plan)
4632 }
4633
4634 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4635 if self.return_error_from_post_in.dec() {
4636 return not_impl_err!("Error in post_visit");
4637 }
4638
4639 self.inner.f_up(plan)
4640 }
4641 }
4642
4643 #[test]
4644 fn error_pre_visit() {
4645 let mut visitor = ErrorVisitor {
4646 return_error_from_pre_in: OptionalCounter::new(2),
4647 ..Default::default()
4648 };
4649 let plan = test_plan();
4650 let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4651 assert_snapshot!(
4652 res.strip_backtrace(),
4653 @"This feature is not implemented: Error in pre_visit"
4654 );
4655 assert_debug_snapshot!(
4656 visitor.inner.strings,
4657 @r#"
4658 [
4659 "pre_visit Projection",
4660 "pre_visit Filter",
4661 ]
4662 "#
4663 );
4664 }
4665
4666 #[test]
4667 fn error_post_visit() {
4668 let mut visitor = ErrorVisitor {
4669 return_error_from_post_in: OptionalCounter::new(1),
4670 ..Default::default()
4671 };
4672 let plan = test_plan();
4673 let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4674 assert_snapshot!(
4675 res.strip_backtrace(),
4676 @"This feature is not implemented: Error in post_visit"
4677 );
4678 assert_debug_snapshot!(
4679 visitor.inner.strings,
4680 @r#"
4681 [
4682 "pre_visit Projection",
4683 "pre_visit Filter",
4684 "pre_visit TableScan",
4685 "post_visit TableScan",
4686 ]
4687 "#
4688 );
4689 }
4690
4691 #[test]
4692 fn projection_expr_schema_mismatch() -> Result<()> {
4693 let empty_schema = Arc::new(DFSchema::empty());
4694 let p = Projection::try_new_with_schema(
4695 vec![col("a")],
4696 Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4697 produce_one_row: false,
4698 schema: Arc::clone(&empty_schema),
4699 })),
4700 empty_schema,
4701 );
4702 assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4703 Ok(())
4704 }
4705
4706 fn test_plan() -> LogicalPlan {
4707 let schema = Schema::new(vec![
4708 Field::new("id", DataType::Int32, false),
4709 Field::new("state", DataType::Utf8, false),
4710 ]);
4711
4712 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4713 .unwrap()
4714 .filter(col("state").eq(lit("CO")))
4715 .unwrap()
4716 .project(vec![col("id")])
4717 .unwrap()
4718 .build()
4719 .unwrap()
4720 }
4721
4722 #[test]
4723 fn test_replace_invalid_placeholder() {
4724 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4726
4727 let plan = table_scan(TableReference::none(), &schema, None)
4728 .unwrap()
4729 .filter(col("id").eq(placeholder("")))
4730 .unwrap()
4731 .build()
4732 .unwrap();
4733
4734 let param_values = vec![ScalarValue::Int32(Some(42))];
        plan.replace_params_with_values(&param_values.clone().into())
4736 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4737
4738 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4740
4741 let plan = table_scan(TableReference::none(), &schema, None)
4742 .unwrap()
4743 .filter(col("id").eq(placeholder("$0")))
4744 .unwrap()
4745 .build()
4746 .unwrap();
4747
        plan.replace_params_with_values(&param_values.clone().into())
4749 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4750
4751 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4753
4754 let plan = table_scan(TableReference::none(), &schema, None)
4755 .unwrap()
4756 .filter(col("id").eq(placeholder("$00")))
4757 .unwrap()
4758 .build()
4759 .unwrap();
4760
        plan.replace_params_with_values(&param_values.into())
4762 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4763 }
4764
4765 #[test]
4766 fn test_nullable_schema_after_grouping_set() {
4767 let schema = Schema::new(vec![
4768 Field::new("foo", DataType::Int32, false),
4769 Field::new("bar", DataType::Int32, false),
4770 ]);
4771
4772 let plan = table_scan(TableReference::none(), &schema, None)
4773 .unwrap()
4774 .aggregate(
4775 vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4776 vec![col("foo")],
4777 vec![col("bar")],
4778 ]))],
4779 vec![count(lit(true))],
4780 )
4781 .unwrap()
4782 .build()
4783 .unwrap();
4784
4785 let output_schema = plan.schema();
4786
4787 assert!(output_schema
4788 .field_with_name(None, "foo")
4789 .unwrap()
            .is_nullable());
4791 assert!(output_schema
4792 .field_with_name(None, "bar")
4793 .unwrap()
4794 .is_nullable());
4795 }
4796
4797 #[test]
4798 fn test_filter_is_scalar() {
4799 let schema =
4801 Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
4802
4803 let source = Arc::new(LogicalTableSource::new(schema));
4804 let schema = Arc::new(
4805 DFSchema::try_from_qualified_schema(
4806 TableReference::bare("tab"),
4807 &source.schema(),
4808 )
4809 .unwrap(),
4810 );
4811 let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4812 table_name: TableReference::bare("tab"),
4813 source: Arc::clone(&source) as Arc<dyn TableSource>,
4814 projection: None,
4815 projected_schema: Arc::clone(&schema),
4816 filters: vec![],
4817 fetch: None,
4818 }));
4819 let col = schema.field_names()[0].clone();
4820
4821 let filter = Filter::try_new(
4822 Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
4823 scan,
4824 )
4825 .unwrap();
4826 assert!(!filter.is_scalar());
4827 let unique_schema = Arc::new(
4828 schema
4829 .as_ref()
4830 .clone()
4831 .with_functional_dependencies(
4832 FunctionalDependencies::new_from_constraints(
4833 Some(&Constraints::new_unverified(vec![Constraint::Unique(
4834 vec![0],
4835 )])),
4836 1,
4837 ),
4838 )
4839 .unwrap(),
4840 );
4841 let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4842 table_name: TableReference::bare("tab"),
4843 source,
4844 projection: None,
4845 projected_schema: Arc::clone(&unique_schema),
4846 filters: vec![],
4847 fetch: None,
4848 }));
4849 let col = schema.field_names()[0].clone();
4850
4851 let filter =
4852 Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
4853 assert!(filter.is_scalar());
4854 }
4855
4856 #[test]
4857 fn test_transform_explain() {
4858 let schema = Schema::new(vec![
4859 Field::new("foo", DataType::Int32, false),
4860 Field::new("bar", DataType::Int32, false),
4861 ]);
4862
4863 let plan = table_scan(TableReference::none(), &schema, None)
4864 .unwrap()
4865 .explain(false, false)
4866 .unwrap()
4867 .build()
4868 .unwrap();
4869
4870 let external_filter = col("foo").eq(lit(true));
4871
4872 let plan = plan
4875 .transform(|plan| match plan {
4876 LogicalPlan::TableScan(table) => {
4877 let filter = Filter::try_new(
4878 external_filter.clone(),
4879 Arc::new(LogicalPlan::TableScan(table)),
4880 )
4881 .unwrap();
4882 Ok(Transformed::yes(LogicalPlan::Filter(filter)))
4883 }
4884 x => Ok(Transformed::no(x)),
4885 })
4886 .data()
4887 .unwrap();
4888
4889 let actual = format!("{}", plan.display_indent());
4890 assert_snapshot!(actual, @r"
4891 Explain
4892 Filter: foo = Boolean(true)
4893 TableScan: ?table?
4894 ")
4895 }
4896
4897 #[test]
4898 fn test_plan_partial_ord() {
4899 let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
4900 produce_one_row: false,
4901 schema: Arc::new(DFSchema::empty()),
4902 });
4903
4904 let describe_table = LogicalPlan::DescribeTable(DescribeTable {
4905 schema: Arc::new(Schema::new(vec![Field::new(
4906 "foo",
4907 DataType::Int32,
4908 false,
4909 )])),
4910 output_schema: DFSchemaRef::new(DFSchema::empty()),
4911 });
4912
4913 let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
4914 schema: Arc::new(Schema::new(vec![Field::new(
4915 "foo",
4916 DataType::Int32,
4917 false,
4918 )])),
4919 output_schema: DFSchemaRef::new(DFSchema::empty()),
4920 });
4921
4922 assert_eq!(
4923 empty_relation.partial_cmp(&describe_table),
4924 Some(Ordering::Less)
4925 );
4926 assert_eq!(
4927 describe_table.partial_cmp(&empty_relation),
4928 Some(Ordering::Greater)
4929 );
4930 assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
4931 }
4932
4933 #[test]
4934 fn test_limit_with_new_children() {
4935 let input = Arc::new(LogicalPlan::Values(Values {
4936 schema: Arc::new(DFSchema::empty()),
4937 values: vec![vec![]],
4938 }));
4939 let cases = [
4940 LogicalPlan::Limit(Limit {
4941 skip: None,
4942 fetch: None,
4943 input: Arc::clone(&input),
4944 }),
4945 LogicalPlan::Limit(Limit {
4946 skip: None,
4947 fetch: Some(Box::new(Expr::Literal(
4948 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4949 None,
4950 ))),
4951 input: Arc::clone(&input),
4952 }),
4953 LogicalPlan::Limit(Limit {
4954 skip: Some(Box::new(Expr::Literal(
4955 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4956 None,
4957 ))),
4958 fetch: None,
4959 input: Arc::clone(&input),
4960 }),
4961 LogicalPlan::Limit(Limit {
4962 skip: Some(Box::new(Expr::Literal(
4963 ScalarValue::new_one(&DataType::UInt32).unwrap(),
4964 None,
4965 ))),
4966 fetch: Some(Box::new(Expr::Literal(
4967 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4968 None,
4969 ))),
4970 input,
4971 }),
4972 ];
4973
4974 for limit in cases {
4975 let new_limit = limit
4976 .with_new_exprs(
4977 limit.expressions(),
4978 limit.inputs().into_iter().cloned().collect(),
4979 )
4980 .unwrap();
4981 assert_eq!(limit, new_limit);
4982 }
4983 }
4984
4985 #[test]
4986 fn test_with_subqueries_jump() {
4987 let subquery_schema =
4992 Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
4993
4994 let subquery_plan =
4995 table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
4996 .unwrap()
4997 .filter(col("sub_id").eq(lit(0)))
4998 .unwrap()
4999 .build()
5000 .unwrap();
5001
5002 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5003
5004 let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
5005 .unwrap()
5006 .filter(col("id").eq(lit(0)))
5007 .unwrap()
5008 .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
5009 .unwrap()
5010 .build()
5011 .unwrap();
5012
5013 let mut filter_found = false;
5014 plan.apply_with_subqueries(|plan| {
5015 match plan {
5016 LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5017 LogicalPlan::Filter(..) => filter_found = true,
5018 _ => {}
5019 }
5020 Ok(TreeNodeRecursion::Continue)
5021 })
5022 .unwrap();
5023 assert!(!filter_found);
5024
5025 struct ProjectJumpVisitor {
5026 filter_found: bool,
5027 }
5028
5029 impl ProjectJumpVisitor {
5030 fn new() -> Self {
5031 Self {
5032 filter_found: false,
5033 }
5034 }
5035 }
5036
5037 impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
5038 type Node = LogicalPlan;
5039
5040 fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
5041 match node {
5042 LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5043 LogicalPlan::Filter(..) => self.filter_found = true,
5044 _ => {}
5045 }
5046 Ok(TreeNodeRecursion::Continue)
5047 }
5048 }
5049
5050 let mut visitor = ProjectJumpVisitor::new();
5051 plan.visit_with_subqueries(&mut visitor).unwrap();
5052 assert!(!visitor.filter_found);
5053
5054 let mut filter_found = false;
5055 plan.clone()
5056 .transform_down_with_subqueries(|plan| {
5057 match plan {
5058 LogicalPlan::Projection(..) => {
5059 return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
5060 }
5061 LogicalPlan::Filter(..) => filter_found = true,
5062 _ => {}
5063 }
5064 Ok(Transformed::no(plan))
5065 })
5066 .unwrap();
5067 assert!(!filter_found);
5068
5069 let mut filter_found = false;
5070 plan.clone()
5071 .transform_down_up_with_subqueries(
5072 |plan| {
5073 match plan {
5074 LogicalPlan::Projection(..) => {
5075 return Ok(Transformed::new(
5076 plan,
5077 false,
5078 TreeNodeRecursion::Jump,
5079 ))
5080 }
5081 LogicalPlan::Filter(..) => filter_found = true,
5082 _ => {}
5083 }
5084 Ok(Transformed::no(plan))
5085 },
5086 |plan| Ok(Transformed::no(plan)),
5087 )
5088 .unwrap();
5089 assert!(!filter_found);
5090
5091 struct ProjectJumpRewriter {
5092 filter_found: bool,
5093 }
5094
5095 impl ProjectJumpRewriter {
5096 fn new() -> Self {
5097 Self {
5098 filter_found: false,
5099 }
5100 }
5101 }
5102
5103 impl TreeNodeRewriter for ProjectJumpRewriter {
5104 type Node = LogicalPlan;
5105
5106 fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
5107 match node {
5108 LogicalPlan::Projection(..) => {
5109 return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
5110 }
5111 LogicalPlan::Filter(..) => self.filter_found = true,
5112 _ => {}
5113 }
5114 Ok(Transformed::no(node))
5115 }
5116 }
5117
5118 let mut rewriter = ProjectJumpRewriter::new();
5119 plan.rewrite_with_subqueries(&mut rewriter).unwrap();
5120 assert!(!rewriter.filter_found);
5121 }
5122
5123 #[test]
5124 fn test_with_unresolved_placeholders() {
5125 let field_name = "id";
5126 let placeholder_value = "$1";
5127 let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5128
5129 let plan = table_scan(TableReference::none(), &schema, None)
5130 .unwrap()
5131 .filter(col(field_name).eq(placeholder(placeholder_value)))
5132 .unwrap()
5133 .build()
5134 .unwrap();
5135
5136 let params = plan.get_parameter_types().unwrap();
5138 assert_eq!(params.len(), 1);
5139
5140 let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5141 assert_eq!(parameter_type, None);
5142 }
5143
5144 #[test]
5145 fn test_join_with_new_exprs() -> Result<()> {
5146 fn create_test_join(
5147 on: Vec<(Expr, Expr)>,
5148 filter: Option<Expr>,
5149 ) -> Result<LogicalPlan> {
5150 let schema = Schema::new(vec![
5151 Field::new("a", DataType::Int32, false),
5152 Field::new("b", DataType::Int32, false),
5153 ]);
5154
5155 let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5156 let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5157
5158 Ok(LogicalPlan::Join(Join {
5159 left: Arc::new(
5160 table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5161 ),
5162 right: Arc::new(
5163 table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5164 ),
5165 on,
5166 filter,
5167 join_type: JoinType::Inner,
5168 join_constraint: JoinConstraint::On,
5169 schema: Arc::new(left_schema.join(&right_schema)?),
5170 null_equality: NullEquality::NullEqualsNothing,
5171 }))
5172 }
5173
5174 {
5175 let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5176 let LogicalPlan::Join(join) = join.with_new_exprs(
5177 join.expressions(),
5178 join.inputs().into_iter().cloned().collect(),
5179 )?
5180 else {
5181 unreachable!()
5182 };
5183 assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5184 assert_eq!(join.filter, None);
5185 }
5186
5187 {
5188 let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5189 let LogicalPlan::Join(join) = join.with_new_exprs(
5190 join.expressions(),
5191 join.inputs().into_iter().cloned().collect(),
5192 )?
5193 else {
5194 unreachable!()
5195 };
5196 assert_eq!(join.on, vec![]);
5197 assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5198 }
5199
5200 {
            let join = create_test_join(
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        }

        {
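            // Supplying new expressions: the first four pair up into equi-join
            // keys and the trailing expression becomes the filter.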
            let join = create_test_join(
                vec![(col("t1.a"), col("t2.a")), (col("t1.b"), col("t2.b"))],
                None,
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                vec![
                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
                    col("t1.b"),
                    col("t2.b"),
                    lit(true),
                ],
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(
                join.on,
                vec![
                    (
                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
                    ),
                    (col("t1.b"), col("t2.b"))
                ]
            );
            assert_eq!(join.filter, Some(lit(true)));
        }

        Ok(())
    }

    #[test]
    fn test_join_try_new() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
        ];

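        // Every join type must produce the expected output schema while
        // preserving the arguments passed to `try_new`.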
        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_scan.clone()),
                Arc::new(right_scan.clone()),
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

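            // Semi and anti joins expose only one side's columns; a mark join
            // appends an extra "mark" column.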
            match join_type {
                JoinType::LeftSemi | JoinType::LeftAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                }
                JoinType::RightSemi | JoinType::RightAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from right table"
                    );
                }
                JoinType::LeftMark => {
                    assert_eq!(join.schema.fields().len(), 3);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "mark",
                        "Third field should be the mark column"
                    );

                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                }
                _ => {
                    assert_eq!(join.schema.fields().len(), 4);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "a",
                        "Third field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[3].name(),
                        "b",
                        "Fourth field should be 'b' from right table"
                    );

                    if join_type == JoinType::Left {
                        // LEFT join: the left side keeps its nullability while
                        // the right side becomes nullable.
                        assert!(!fields[0].is_nullable());
                        assert!(!fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    } else if join_type == JoinType::Right {
                        // RIGHT join: the mirror image.
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(!fields[2].is_nullable());
                        assert!(!fields[3].is_nullable());
                    } else if join_type == JoinType::Full {
                        // FULL join: both sides become nullable.
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    }
                }
            }

            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
            assert_eq!(join.join_type, join_type);
            assert_eq!(join.join_constraint, JoinConstraint::On);
            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
        }

        Ok(())
    }

    #[test]
    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Int32, false),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("category", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        {
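            // USING join: both copies of the overlapping join column stay in
            // the schema.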
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();

            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.join_constraint, JoinConstraint::Using);
        }

        {
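            // ON join with a filter over the overlapping "value" columns.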
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                Some(col("t1.value").lt(col("t2.value"))),
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
        }

        {
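            // The requested null-equality semantics are stored on the join.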
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNull,
            )?;

            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
        }

        Ok(())
    }

    #[test]
    fn test_join_try_new_schema_validation() -> Result<()> {
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("category", DataType::Utf8, true),
            Field::new("code", DataType::Int16, false),
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                Some(col("t1.value").gt(lit(5.0))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type:?} join");

            for (i, field) in fields.iter().enumerate() {
                let expected_nullable = match (i, &join_type) {
                    // Left-side fields (0-2) become nullable when the right
                    // side preserves unmatched rows; right-side fields (3-5)
                    // when the left side does. Fields 2 and 4 are already
                    // nullable in their input schemas.
                    (0, JoinType::Right | JoinType::Full) => true,
                    (1, JoinType::Right | JoinType::Full) => true,
                    (2, _) => true,
                    (3, JoinType::Left | JoinType::Full) => true,
                    (4, _) => true,
                    (5, JoinType::Left | JoinType::Full) => true,
                    _ => false,
                };

                assert_eq!(
                    field.is_nullable(),
                    expected_nullable,
                    "Field {} ({}) nullability incorrect for {:?} join",
                    i,
                    field.name(),
                    join_type
                );
            }
        }

        let using_join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::Using,
            NullEquality::NullEqualsNothing,
        )?;

        assert_eq!(
            using_join.schema.fields().len(),
            6,
            "USING join should have all fields"
        );
        assert_eq!(using_join.join_constraint, JoinConstraint::Using);

        Ok(())
    }
}