1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::str::FromStr;
25use std::sync::{Arc, LazyLock};
26
27use super::dml::CopyTo;
28use super::invariants::{
29 assert_always_invariants_at_current_node, assert_executable_invariants,
30 InvariantLevel,
31};
32use super::DdlStatement;
33use crate::builder::{change_redundant_column, unnest_with_options};
34use crate::expr::{
35 intersect_metadata_for_union, Placeholder, Sort as SortExpr, WindowFunction,
36 WindowFunctionParams,
37};
38use crate::expr_rewriter::{
39 create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
40};
41use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
42use crate::logical_plan::extension::UserDefinedLogicalNode;
43use crate::logical_plan::{DmlStatement, Statement};
44use crate::utils::{
45 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
46 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
47};
48use crate::{
49 build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
50 CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
51 Operator, Prepare, TableProviderFilterPushDown, TableSource,
52 WindowFunctionDefinition,
53};
54
55use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
56use datafusion_common::cse::{NormalizeEq, Normalizeable};
57use datafusion_common::tree_node::{
58 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61 aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
62 DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
63 FunctionalDependencies, NullEquality, ParamValues, Result, ScalarValue, Spans,
64 TableReference, UnnestOptions,
65};
66use indexmap::IndexSet;
67
68use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
/// A node in a tree of relational operators; each variant wraps the
/// operator-specific state (see the wrapped struct for details).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates a list of expressions over its input (a `SELECT` list).
    Projection(Projection),
    /// Keeps only input rows for which a predicate evaluates to true.
    Filter(Filter),
    /// Evaluates window functions over its input.
    Window(Window),
    /// Groups input rows and evaluates aggregate expressions.
    Aggregate(Aggregate),
    /// Orders its input by sort expressions.
    Sort(Sort),
    /// Combines two inputs on equi-join keys and/or a join filter.
    Join(Join),
    /// Redistributes its input according to a partitioning scheme.
    Repartition(Repartition),
    /// Concatenates the rows of multiple inputs with a common schema.
    Union(Union),
    /// Reads rows from a [`TableSource`], possibly projected/filtered.
    TableScan(TableScan),
    /// A relation with no underlying source; may produce zero or one row.
    EmptyRelation(EmptyRelation),
    /// A scalar or correlated subquery expression's plan.
    Subquery(Subquery),
    /// Assigns an alias (qualifier) to its input's schema.
    SubqueryAlias(SubqueryAlias),
    /// Skips and/or limits the number of rows from its input.
    Limit(Limit),
    /// A SQL statement such as `PREPARE` or `EXECUTE`.
    Statement(Statement),
    /// Literal rows produced by a `VALUES` clause.
    Values(Values),
    /// Produces a textual explanation of a plan instead of running it.
    Explain(Explain),
    /// Runs a plan and reports execution statistics.
    Analyze(Analyze),
    /// A user-defined logical node (see [`UserDefinedLogicalNode`]).
    Extension(Extension),
    /// Removes duplicate rows, either over all columns or `DISTINCT ON`.
    Distinct(Distinct),
    /// A data-modification statement (INSERT/UPDATE/DELETE).
    Dml(DmlStatement),
    /// A data-definition statement (CREATE TABLE/VIEW, etc.).
    Ddl(DdlStatement),
    /// Copies query results to an external destination (`COPY TO`).
    Copy(CopyTo),
    /// Describes a table's columns and types.
    DescribeTable(DescribeTable),
    /// Expands list/struct columns into multiple rows (`UNNEST`).
    Unnest(Unnest),
    /// A recursive CTE: a static (base) term plus a recursive term.
    RecursiveQuery(RecursiveQuery),
}
295
296impl Default for LogicalPlan {
297 fn default() -> Self {
298 LogicalPlan::EmptyRelation(EmptyRelation {
299 produce_one_row: false,
300 schema: Arc::new(DFSchema::empty()),
301 })
302 }
303}
304
/// Treats a single `LogicalPlan` as a one-element tree-node container so it
/// can participate in the generic `TreeNode` traversal/transformation APIs.
impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    /// Applies `f` to this plan (the container holds exactly one element).
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        f(self)
    }

    /// Maps this plan through `f`, returning the (possibly transformed) plan.
    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        f(self)
    }
}
320
321impl LogicalPlan {
    /// Returns the schema describing this plan node's output.
    ///
    /// Nodes that do not change their input's schema (`Filter`, `Sort`,
    /// `Limit`, `Repartition`, `Distinct::All`) delegate to their input;
    /// all other nodes store or compute their own schema.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            // TableScan exposes the schema *after* projection.
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            // A recursive query's output schema is that of its base term.
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.schema()
            }
        }
    }
360
361 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
364 match self {
365 LogicalPlan::Window(_)
366 | LogicalPlan::Projection(_)
367 | LogicalPlan::Aggregate(_)
368 | LogicalPlan::Unnest(_)
369 | LogicalPlan::Join(_) => self
370 .inputs()
371 .iter()
372 .map(|input| input.schema().as_ref())
373 .collect(),
374 _ => vec![],
375 }
376 }
377
378 pub fn explain_schema() -> SchemaRef {
380 SchemaRef::new(Schema::new(vec![
381 Field::new("plan_type", DataType::Utf8, false),
382 Field::new("plan", DataType::Utf8, false),
383 ]))
384 }
385
386 pub fn describe_schema() -> Schema {
388 Schema::new(vec![
389 Field::new("column_name", DataType::Utf8, false),
390 Field::new("data_type", DataType::Utf8, false),
391 Field::new("is_nullable", DataType::Utf8, false),
392 ])
393 }
394
395 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
412 let mut exprs = vec![];
413 self.apply_expressions(|e| {
414 exprs.push(e.clone());
415 Ok(TreeNodeRecursion::Continue)
416 })
417 .unwrap();
419 exprs
420 }
421
    /// Returns all outer-reference expressions found in this node and,
    /// recursively, in all of its children, de-duplicated while preserving
    /// first-seen order.
    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        // Collect outer references from this node's own expressions.
        self.apply_expressions(|e| {
            find_out_reference_exprs(e).into_iter().for_each(|e| {
                // Linear `contains` keeps insertion order for dedup.
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
            Ok(TreeNodeRecursion::Continue)
        })
        // closure is infallible, so the traversal cannot fail
        .unwrap();
        // Then recurse into children, merging with the same dedup rule.
        self.inputs()
            .into_iter()
            .flat_map(|child| child.all_out_ref_exprs())
            .for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
        exprs
    }
446
447 pub fn inputs(&self) -> Vec<&LogicalPlan> {
451 match self {
452 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
453 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
454 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
455 LogicalPlan::Window(Window { input, .. }) => vec![input],
456 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
457 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
458 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
459 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
460 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
461 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
462 LogicalPlan::Extension(extension) => extension.node.inputs(),
463 LogicalPlan::Union(Union { inputs, .. }) => {
464 inputs.iter().map(|arc| arc.as_ref()).collect()
465 }
466 LogicalPlan::Distinct(
467 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
468 ) => vec![input],
469 LogicalPlan::Explain(explain) => vec![&explain.plan],
470 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
471 LogicalPlan::Dml(write) => vec![&write.input],
472 LogicalPlan::Copy(copy) => vec![©.input],
473 LogicalPlan::Ddl(ddl) => ddl.inputs(),
474 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
475 LogicalPlan::RecursiveQuery(RecursiveQuery {
476 static_term,
477 recursive_term,
478 ..
479 }) => vec![static_term, recursive_term],
480 LogicalPlan::Statement(stmt) => stmt.inputs(),
481 LogicalPlan::TableScan { .. }
483 | LogicalPlan::EmptyRelation { .. }
484 | LogicalPlan::Values { .. }
485 | LogicalPlan::DescribeTable(_) => vec![],
486 }
487 }
488
    /// Returns, for every `USING` join in this plan (including subqueries),
    /// the set of columns named in that join's `USING` clause.
    ///
    /// # Errors
    /// Returns an internal error if a `USING` join key is not a plain column.
    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
        let mut using_columns: Vec<HashSet<Column>> = vec![];

        self.apply_with_subqueries(|plan| {
            // Only joins declared with the USING constraint contribute.
            if let LogicalPlan::Join(Join {
                join_constraint: JoinConstraint::Using,
                on,
                ..
            }) = plan
            {
                let columns =
                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
                        // USING keys must be bare columns on both sides.
                        let Some(l) = l.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {l:?}"
                            );
                        };
                        let Some(r) = r.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {r:?}"
                            );
                        };
                        accumu.insert(l.to_owned());
                        accumu.insert(r.to_owned());
                        Result::<_, DataFusionError>::Ok(accumu)
                    })?;
                using_columns.push(columns);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;

        Ok(using_columns)
    }
524
    /// Returns the expression producing the *first* output column of this
    /// plan, or `None` when no meaningful head expression exists (e.g.
    /// subqueries, DDL, empty relations).
    ///
    /// NOTE(review): several arms index `[0]` into expression lists and
    /// assume they are non-empty (e.g. a projection with at least one
    /// expression); an empty list would panic — confirm invariants upstream.
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                // Group expressions come first in aggregate output.
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // Pass-through nodes delegate to their input.
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    // If the left side produces no columns, the head column
                    // comes from the right side.
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                // Semi/anti/mark joins output columns of one side only.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Re-qualify the inner head expression with the alias.
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
601
    /// Rebuilds this node's schema from its current inputs and expressions.
    ///
    /// Useful after inputs have been rewritten in place. Nodes whose schema
    /// does not depend on their inputs (or that store no schema) are
    /// returned unchanged.
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            // Nodes rebuilt via their `try_new` constructor derive the new
            // schema from the (possibly changed) input.
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            LogicalPlan::Values(Values { schema, values }) => {
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter { predicate, input }) => {
                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equality,
            }) => {
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                // Strip aliases from equi-join keys.
                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equality,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            LogicalPlan::Extension(Extension { node }) => {
                // Defer to the user-defined node to rebuild itself from its
                // own expressions and (cloned) inputs.
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                let first_input_schema = inputs[0].schema();
                if schema.fields().len() == first_input_schema.fields().len() {
                    // Field count unchanged: keep the stored union schema.
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }
757
    /// Returns a copy of this node with its expressions and inputs replaced.
    ///
    /// `expr` must contain exactly the expressions (and in the same order)
    /// that [`Self::expressions`] returns for this node; `inputs` exactly
    /// the children [`Self::inputs`] returns. Schemas are recomputed where
    /// they depend on the new children.
    ///
    /// # Errors
    /// Returns an internal error when the number of expressions or inputs
    /// does not match what this node requires.
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    // Rows arrive flattened in `expr`; regroup by field count.
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                // Hash / DistributeBy carry the partition expressions.
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // `expr` holds group exprs followed by aggregate exprs.
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    // Keep each sort's options, swapping only the expression.
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                // Layout of `expr`: interleaved equi-join key pairs,
                // optionally followed by one trailing filter expression.
                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!("Expected a pair of expressions to construct the join on expression")?
                    };

                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                if old_expr_len != expr.len() {
                    return internal_err!(
                        "Invalid number of new Limit expressions: expected {}, got {}",
                        old_expr_len,
                        expr.len()
                    );
                }
                // `fetch` is the last element of `expr`, so pop it first.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // Keep the stored (qualified) union schema while the field
                // count matches; otherwise adopt the first input's schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        // `expr` = on_expr ++ select_expr ++ sort_expr.
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name,
                data_types,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    data_types: data_types.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1150
1151 pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1153 match check {
1154 InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1155 InvariantLevel::Executable => assert_executable_invariants(self),
1156 }
1157 }
1158
1159 #[inline]
1161 #[allow(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1163 if !expr.is_empty() {
1164 return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1165 }
1166 Ok(())
1167 }
1168
1169 #[inline]
1171 #[allow(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1173 if !inputs.is_empty() {
1174 return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1175 }
1176 Ok(())
1177 }
1178
1179 #[inline]
1181 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1182 if expr.len() != 1 {
1183 return internal_err!(
1184 "{self:?} should have exactly one expr, got {:?}",
1185 expr
1186 );
1187 }
1188 Ok(expr.remove(0))
1189 }
1190
1191 #[inline]
1193 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1194 if inputs.len() != 1 {
1195 return internal_err!(
1196 "{self:?} should have exactly one input, got {:?}",
1197 inputs
1198 );
1199 }
1200 Ok(inputs.remove(0))
1201 }
1202
1203 #[inline]
1205 fn only_two_inputs(
1206 &self,
1207 mut inputs: Vec<LogicalPlan>,
1208 ) -> Result<(LogicalPlan, LogicalPlan)> {
1209 if inputs.len() != 2 {
1210 return internal_err!(
1211 "{self:?} should have exactly two inputs, got {:?}",
1212 inputs
1213 );
1214 }
1215 let right = inputs.remove(1);
1216 let left = inputs.remove(0);
1217 Ok((left, right))
1218 }
1219
1220 pub fn with_param_values(
1274 self,
1275 param_values: impl Into<ParamValues>,
1276 ) -> Result<LogicalPlan> {
1277 let param_values = param_values.into();
1278 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1279
1280 Ok(
1282 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1283 plan_with_values
1284 {
1285 param_values.verify(&prepare_lp.data_types)?;
1286 Arc::unwrap_or_clone(prepare_lp.input)
1288 } else {
1289 plan_with_values
1290 },
1291 )
1292 }
1293
    /// Returns an upper bound on the number of rows this plan can produce,
    /// or `None` when no bound can be determined.
    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
        match self {
            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
            LogicalPlan::Filter(filter) => {
                // A scalar filter (per `Filter::is_scalar`) yields at most one row.
                if filter.is_scalar() {
                    Some(1)
                } else {
                    filter.input.max_rows()
                }
            }
            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
            LogicalPlan::Aggregate(Aggregate {
                input, group_expr, ..
            }) => {
                // All-literal group keys collapse to a single group.
                if group_expr
                    .iter()
                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
                {
                    Some(1)
                } else {
                    input.max_rows()
                }
            }
            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
                // Bound is min(input bound, fetch) when both are known.
                match (fetch, input.max_rows()) {
                    (Some(fetch_limit), Some(input_max)) => {
                        Some(input_max.min(*fetch_limit))
                    }
                    (Some(fetch_limit), None) => Some(*fetch_limit),
                    (None, Some(input_max)) => Some(input_max),
                    (None, None) => None,
                }
            }
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
                JoinType::Left | JoinType::Right | JoinType::Full => {
                    // Outer joins can preserve unmatched rows of one side
                    // even when the other side is empty.
                    match (left.max_rows()?, right.max_rows()?, join_type) {
                        (0, 0, _) => Some(0),
                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
                        (left_max, right_max, _) => Some(left_max * right_max),
                    }
                }
                // Semi/anti/mark joins never multiply rows.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.max_rows()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.max_rows()
                }
            },
            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                // Sum of bounds; unknown if any input is unbounded.
                inputs.iter().try_fold(0usize, |mut acc, plan| {
                    acc += plan.max_rows()?;
                    Some(acc)
                })
            }
            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
            LogicalPlan::EmptyRelation(_) => Some(0),
            LogicalPlan::RecursiveQuery(_) => None,
            LogicalPlan::Subquery(_) => None,
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
                Ok(FetchType::Literal(s)) => s,
                _ => None,
            },
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => input.max_rows(),
            LogicalPlan::Values(v) => Some(v.values.len()),
            LogicalPlan::Unnest(_) => None,
            LogicalPlan::Ddl(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Extension(_) => None,
        }
    }
1385
1386 pub fn contains_outer_reference(&self) -> bool {
1388 let mut contains = false;
1389 self.apply_expressions(|expr| {
1390 Ok(if expr.contains_outer() {
1391 contains = true;
1392 TreeNodeRecursion::Stop
1393 } else {
1394 TreeNodeRecursion::Continue
1395 })
1396 })
1397 .unwrap();
1398 contains
1399 }
1400
    /// Returns pairs of (output expression, output column) for nodes whose
    /// output expressions can be referenced as columns by parent nodes
    /// (`Aggregate` and `Window`); other nodes return an empty vector.
    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
        match self {
            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
                .output_expressions()?
                .into_iter()
                .zip(self.schema().columns())
                .collect()),
            LogicalPlan::Window(Window {
                window_expr,
                input,
                schema,
            }) => {
                // Window output = input columns followed by window exprs,
                // so pair window exprs with the trailing schema columns.
                let mut output_exprs = input.columnized_output_exprs()?;
                let input_len = input.schema().fields().len();
                output_exprs.extend(
                    window_expr
                        .iter()
                        .zip(schema.columns().into_iter().skip(input_len)),
                );
                Ok(output_exprs)
            }
            _ => Ok(vec![]),
        }
    }
1439}
1440
1441impl LogicalPlan {
    /// Walks the plan bottom-up (including subqueries) and replaces every
    /// [`Expr::Placeholder`] with its value from `param_values`, preserving
    /// each rewritten expression's original display name.
    ///
    /// # Errors
    /// Returns an error if a placeholder's type cannot be inferred or no
    /// value exists for a placeholder id.
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // First resolve placeholder data types against the schema.
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    Ok(Transformed::no(e))
                } else {
                    // Remember the display name so the substitution does not
                    // change the output column name.
                    let original_name = name_preserver.save(&e);
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let value = param_values.get_placeholders_with_values(&id)?;
                            Ok(Transformed::yes(Expr::Literal(value, None)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })
        })
        .map(|res| res.data)
    }
1479
1480 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1482 let mut param_names = HashSet::new();
1483 self.apply_with_subqueries(|plan| {
1484 plan.apply_expressions(|expr| {
1485 expr.apply(|expr| {
1486 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1487 param_names.insert(id.clone());
1488 }
1489 Ok(TreeNodeRecursion::Continue)
1490 })
1491 })
1492 })
1493 .map(|_| param_names)
1494 }
1495
    /// Returns a map from placeholder id to its inferred data type (`None`
    /// when the type could not be inferred), walking the whole plan
    /// including subqueries.
    ///
    /// # Errors
    /// Returns a plan error if the same placeholder id appears with two
    /// conflicting data types.
    pub fn get_parameter_types(
        &self,
    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
        let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();

        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
                        let prev = param_types.get(id);
                        match (prev, data_type) {
                            // Seen before with a concrete type: the new
                            // occurrence must agree with it.
                            (Some(Some(prev)), Some(dt)) => {
                                if prev != dt {
                                    plan_err!("Conflicting types for {id}")?;
                                }
                            }
                            // First concrete type for this id (or an
                            // upgrade from previously-unknown).
                            (_, Some(dt)) => {
                                param_types.insert(id.clone(), Some(dt.clone()));
                            }
                            // Type is still unknown for this id.
                            _ => {
                                param_types.insert(id.clone(), None);
                            }
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_types)
    }
1527
1528 pub fn display_indent(&self) -> impl Display + '_ {
1560 struct Wrapper<'a>(&'a LogicalPlan);
1563 impl Display for Wrapper<'_> {
1564 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1565 let with_schema = false;
1566 let mut visitor = IndentVisitor::new(f, with_schema);
1567 match self.0.visit_with_subqueries(&mut visitor) {
1568 Ok(_) => Ok(()),
1569 Err(_) => Err(fmt::Error),
1570 }
1571 }
1572 }
1573 Wrapper(self)
1574 }
1575
1576 pub fn display_indent_schema(&self) -> impl Display + '_ {
1603 struct Wrapper<'a>(&'a LogicalPlan);
1606 impl Display for Wrapper<'_> {
1607 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1608 let with_schema = true;
1609 let mut visitor = IndentVisitor::new(f, with_schema);
1610 match self.0.visit_with_subqueries(&mut visitor) {
1611 Ok(_) => Ok(()),
1612 Err(_) => Err(fmt::Error),
1613 }
1614 }
1615 }
1616 Wrapper(self)
1617 }
1618
1619 pub fn display_pg_json(&self) -> impl Display + '_ {
1623 struct Wrapper<'a>(&'a LogicalPlan);
1626 impl Display for Wrapper<'_> {
1627 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1628 let mut visitor = PgJsonVisitor::new(f);
1629 visitor.with_schema(true);
1630 match self.0.visit_with_subqueries(&mut visitor) {
1631 Ok(_) => Ok(()),
1632 Err(_) => Err(fmt::Error),
1633 }
1634 }
1635 }
1636 Wrapper(self)
1637 }
1638
    /// Returns a value whose `Display` renders this plan in Graphviz `dot`
    /// format, containing two sub-graphs: a compact rendering and a
    /// detailed one that includes schema information.
    pub fn display_graphviz(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let mut visitor = GraphvizVisitor::new(f);

                visitor.start_graph()?;

                // First pass: plan nodes without schema details.
                visitor.pre_visit_plan("LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                // Second pass: same plan, annotated with schemas.
                visitor.set_with_schema(true);
                visitor.pre_visit_plan("Detailed LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                visitor.end_graph()?;
                Ok(())
            }
        }
        Wrapper(self)
    }
1697
    /// Returns a value whose `Display` produces a one-line description of
    /// this node only (children are not included); the indent/graphviz
    /// renderers handle recursion themselves.
    pub fn display(&self) -> impl Display + '_ {
        // Boilerplate wrapper so `Display` can be implemented for a borrow.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, schema: _ }) => {
                        let rows = if *produce_one_row { 1 } else { 0 };
                        write!(f, "EmptyRelation: rows={rows}")
                    },
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
                    }
                    LogicalPlan::Values(Values { ref values, .. }) => {
                        // Only the first 5 rows are rendered to keep output short.
                        let str_values: Vec<_> = values
                            .iter()
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        // Truncation marker appended when rows were elided
                        // (variable name sic: "ellipsis").
                        let eclipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
                    }

                    LogicalPlan::TableScan(TableScan {
                        ref source,
                        ref table_name,
                        ref projection,
                        ref filters,
                        ref fetch,
                        ..
                    }) => {
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            // Classify filters by how far the source can push
                            // them down, so each class is shown separately.
                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
                        write!(f, "Projection:")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ",")?;
                            }
                            write!(f, " {expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: ref expr,
                        ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window {
                        ref window_expr, ..
                    }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        ref group_expr,
                        ref aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: ref keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        // An inner join with no keys and no filter is
                        // rendered as a cross join.
                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(
                                    f,
                                    "{} Join: {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr
                                )
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        // Prefer the resolved literal value; fall back to
                        // rendering the raw expression when it is not a
                        // plain literal.
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
                        };
                        write!(
                            f,
                            "Limit: skip={skip_str}, fetch={fetch_str}",
                        )
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    // Extension nodes render themselves.
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices, .. }) => {
                        let input_columns = plan.schema().columns();
                        // List columns also show their unnesting depth.
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i,unnest_info)|
                                format!("{}|depth={}", &input_columns[*i].to_string(),
                                unnest_info.depth))
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        write!(f, "Unnest: lists[{}] structs[{}]",
                        expr_vec_fmt!(list_type_columns),
                        expr_vec_fmt!(struct_type_columns))
                    }
                }
            }
        }
        Wrapper(self)
    }
2035}
2036
2037impl Display for LogicalPlan {
2038 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2039 self.display_indent().fmt(f)
2040 }
2041}
2042
2043impl ToStringifiedPlan for LogicalPlan {
2044 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2045 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2046 }
2047}
2048
/// A relation with a known schema that produces either zero rows or a
/// single empty row.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// When true the relation emits one (empty) row; when false, none.
    pub produce_one_row: bool,
    /// The schema description of the output of this relation.
    pub schema: DFSchemaRef,
}
2059
2060impl PartialOrd for EmptyRelation {
2062 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2063 self.produce_one_row.partial_cmp(&other.produce_one_row)
2064 }
2065}
2066
/// A recursive query node (e.g. `WITH RECURSIVE` — presumably; confirm
/// against the planner): evaluates a static term once and a recursive
/// term repeatedly.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the recursive relation
    pub name: String,
    /// The static term (base case of the recursion)
    pub static_term: Arc<LogicalPlan>,
    /// The recursive term (evaluated against previous iterations)
    pub recursive_term: Arc<LogicalPlan>,
    /// Whether output rows should be deduplicated
    /// (`UNION` vs `UNION ALL` semantics)
    pub is_distinct: bool,
}
2102
/// A constant inline table of literal rows (e.g. `VALUES (1, 'a')`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The schema describing the produced columns
    pub schema: DFSchemaRef,
    /// The literal rows: one `Vec<Expr>` per row
    pub values: Vec<Vec<Expr>>,
}
2113
2114impl PartialOrd for Values {
2116 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2117 self.values.partial_cmp(&other.values)
2118 }
2119}
2120
/// Evaluates a list of expressions on its input, producing one output
/// column per expression (relational projection).
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// Forces construction through the `try_new*` constructors so the
// expression/schema invariant is always checked.
#[non_exhaustive]
pub struct Projection {
    /// The list of expressions to evaluate
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2134
2135impl PartialOrd for Projection {
2137 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2138 match self.expr.partial_cmp(&other.expr) {
2139 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2140 cmp => cmp,
2141 }
2142 }
2143}
2144
impl Projection {
    /// Create a new `Projection`, computing the output schema from `expr`
    /// evaluated against `input`'s schema.
    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let projection_schema = projection_schema(&input, &expr)?;
        Self::try_new_with_schema(expr, input, projection_schema)
    }

    /// Create a new `Projection` with a pre-computed output schema.
    ///
    /// # Errors
    /// Returns a plan error when the number of expressions does not match
    /// the number of schema fields (wildcards are exempt, since a single
    /// wildcard expands to many fields).
    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        #[expect(deprecated)]
        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
            && expr.len() != schema.fields().len()
        {
            return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
        }
        Ok(Self {
            expr,
            input,
            schema,
        })
    }

    /// Create a projection that simply selects every column of `schema`
    /// (one column reference per field); infallible by construction.
    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
        Self {
            expr,
            input,
            schema,
        }
    }
}
2181
2182pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2196 let metadata = input.schema().metadata().clone();
2197
2198 let schema =
2199 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2200 .with_functional_dependencies(calc_func_dependencies_for_project(
2201 exprs, input,
2202 )?)?;
2203
2204 Ok(Arc::new(schema))
2205}
2206
/// Aliased subquery: wraps `input` so all of its columns are re-qualified
/// with `alias`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// Forces construction through `try_new`, which builds the re-qualified
// schema.
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias applied to the input relation
    pub alias: TableReference,
    /// The schema with fields qualified by `alias`
    pub schema: DFSchemaRef,
}
2219
impl SubqueryAlias {
    /// Creates a `SubqueryAlias` over `plan`, re-qualifying every field
    /// with `alias` while preserving the plan's metadata and functional
    /// dependencies.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();
        // De-duplicate redundant column names first so the qualified
        // schema is well formed.
        let fields = change_redundant_column(plan.schema().fields());
        let meta_data = plan.schema().as_ref().metadata().clone();
        // Strip existing qualifiers by round-tripping through an Arrow
        // `Schema`, then re-qualify everything with the new alias.
        let schema: Schema =
            DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
        let func_dependencies = plan.schema().functional_dependencies().clone();
        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2244
2245impl PartialOrd for SubqueryAlias {
2247 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2248 match self.input.partial_cmp(&other.input) {
2249 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2250 cmp => cmp,
2251 }
2252 }
2253}
2254
/// Filters rows from its input that do not satisfy `predicate`
/// (relational selection / SQL `WHERE`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
// Forces construction through `try_new`, which checks the predicate type.
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression; must evaluate to a boolean value
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2274
impl Filter {
    /// Create a new filter operator.
    ///
    /// # Errors
    /// Returns an error when the predicate's type can be determined and is
    /// not boolean-compatible.
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Kept for backwards compatibility; now identical to [`Filter::try_new`].
    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Returns true when `data_type` is acceptable for a filter predicate:
    /// boolean, null, or a dictionary whose value type is itself allowed.
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::Boolean | DataType::Null => true,
            // Recurse into the dictionary's value type.
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    /// Shared constructor: validates the predicate's type (when it can be
    /// resolved) and strips nested aliases from the predicate.
    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        // If the predicate's type cannot be resolved against the input
        // schema, validation is skipped rather than failing here.
        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
            if !Filter::is_allowed_filter_type(&predicate_type) {
                return plan_err!(
                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
                );
            }
        }

        Ok(Self {
            // Aliases inside a predicate carry no meaning; remove them.
            predicate: predicate.unalias_nested().data,
            input,
        })
    }

    /// Heuristically determines whether this filter returns at most one
    /// row: true when some non-nullable single-row unique key of the input
    /// is fully pinned by equality predicates against non-column values.
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        let functional_dependencies = self.input.schema().functional_dependencies();
        // Candidate keys: single-mode dependencies that determine every
        // field and whose source columns cannot be NULL.
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect columns constrained as `col = <non-column expr>` by any
        // conjunct of the predicate.
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // A trivially-true equality (x = x) pins nothing.
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    // col = col does not pin either side to a constant.
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // If every source column of some unique key is pinned, the filter
        // selects at most a single row.
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2388
/// Evaluates window function expressions over its input, appending one
/// output column per window expression after the input's columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions (one appended column each)
    pub window_expr: Vec<Expr>,
    /// The output schema: input fields followed by the window fields
    pub schema: DFSchemaRef,
}
2412
impl Window {
    /// Create a new window operator over `input`, computing the output
    /// schema (input fields followed by one field per window expression)
    /// and the resulting functional dependencies.
    ///
    /// # Errors
    /// Rejects FILTER clauses on non-aggregate window functions, and
    /// propagates schema-construction errors.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        // Window output fields are appended after the input's fields.
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Existing dependencies remain valid; widen their target range to
        // cover the newly appended window columns.
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // A `row_number` UDWF with no PARTITION BY yields a distinct value
        // per row, so its output column functionally determines all columns.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                let Expr::WindowFunction(window_fun) = expr else {
                    return None;
                };
                let WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                } = window_fun.as_ref()
                else {
                    return None;
                };
                if udwf.name() == "row_number" && partition_by.is_empty() {
                    // Index of this window column in the output schema.
                    Some(idx + input_len)
                } else {
                    None
                }
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            // Each new key determines every output column.
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        // FILTER (WHERE ...) is only valid on aggregate functions used as
        // window functions; reject it on pure window functions.
        if let Some(e) = window_expr.iter().find(|e| {
            matches!(
                e,
                Expr::WindowFunction(wf)
                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
                        && wf.params.filter.is_some()
            )
        }) {
            return plan_err!(
                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
            );
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Create a window operator with a pre-computed schema.
    ///
    /// # Errors
    /// Returns a plan error when the number of window expressions does not
    /// match the number of schema fields beyond the input's fields.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
            return plan_err!(
                "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
                window_expr.len(),
                schema.fields().len() - input.schema().fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2516
2517impl PartialOrd for Window {
2519 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2520 match self.input.partial_cmp(&other.input)? {
2521 Ordering::Equal => {} not_equal => return Some(not_equal),
2523 }
2524
2525 match self.window_expr.partial_cmp(&other.window_expr)? {
2526 Ordering::Equal => {} not_equal => return Some(not_equal),
2528 }
2529
2530 if self == other {
2533 Some(Ordering::Equal)
2534 } else {
2535 None
2536 }
2537 }
2538}
2539
/// Produces rows from a table provider.
// Only `Clone` can be derived: `source` is a trait object, so `Debug`,
// `PartialEq`, `Eq`, `PartialOrd` and `Hash` are implemented manually
// below (all of them skip `source`).
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The source of the table
    pub source: Arc<dyn TableSource>,
    /// Optional column indices to use as a projection
    pub projection: Option<Vec<usize>>,
    /// The schema description of the output (after projection)
    pub projected_schema: DFSchemaRef,
    /// Optional expressions to be used as filters by the table provider
    pub filters: Vec<Expr>,
    /// Optional maximum number of rows to read
    pub fetch: Option<usize>,
}
2556
impl Debug for TableScan {
    // Manual impl because `source` (a trait object) is not `Debug`; it is
    // rendered as "..." instead.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("TableScan")
            .field("table_name", &self.table_name)
            .field("source", &"...")
            .field("projection", &self.projection)
            .field("projected_schema", &self.projected_schema)
            .field("filters", &self.filters)
            .field("fetch", &self.fetch)
            .finish_non_exhaustive()
    }
}
2569
2570impl PartialEq for TableScan {
2571 fn eq(&self, other: &Self) -> bool {
2572 self.table_name == other.table_name
2573 && self.projection == other.projection
2574 && self.projected_schema == other.projected_schema
2575 && self.filters == other.filters
2576 && self.fetch == other.fetch
2577 }
2578}
2579
// The manual `eq` above is a true equivalence relation over the compared
// fields (everything except `source`), so `Eq` is sound.
impl Eq for TableScan {}
2581
2582impl PartialOrd for TableScan {
2585 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2586 #[derive(PartialEq, PartialOrd)]
2587 struct ComparableTableScan<'a> {
2588 pub table_name: &'a TableReference,
2590 pub projection: &'a Option<Vec<usize>>,
2592 pub filters: &'a Vec<Expr>,
2594 pub fetch: &'a Option<usize>,
2596 }
2597 let comparable_self = ComparableTableScan {
2598 table_name: &self.table_name,
2599 projection: &self.projection,
2600 filters: &self.filters,
2601 fetch: &self.fetch,
2602 };
2603 let comparable_other = ComparableTableScan {
2604 table_name: &other.table_name,
2605 projection: &other.projection,
2606 filters: &other.filters,
2607 fetch: &other.fetch,
2608 };
2609 comparable_self.partial_cmp(&comparable_other)
2610 }
2611}
2612
2613impl Hash for TableScan {
2614 fn hash<H: Hasher>(&self, state: &mut H) {
2615 self.table_name.hash(state);
2616 self.projection.hash(state);
2617 self.projected_schema.hash(state);
2618 self.filters.hash(state);
2619 self.fetch.hash(state);
2620 }
2621}
2622
impl TableScan {
    /// Initialize a `TableScan` with an appropriate schema from the given
    /// source and optional projection, deriving functional dependencies
    /// from the source's constraints.
    ///
    /// # Errors
    /// Returns a plan error when `table_name` is empty.
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                // Re-map the dependencies onto the projected column indices.
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                // Build a DFSchema of only the projected fields, each
                // qualified with the table name.
                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                // No projection: expose the full source schema, qualified.
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2676
/// Repartitions its input according to a partitioning scheme
/// (round-robin, hash, or distribute-by).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme to apply
    pub partitioning_scheme: Partitioning,
}
2685
/// Combines the rows of two or more inputs that share a compatible schema
/// (SQL `UNION [ALL]`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// The input plans to merge (at least two)
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// The unified output schema, shared by all inputs
    pub schema: DFSchemaRef,
}
2694
impl Union {
    /// Constructs a new `Union`, deriving the schema from the inputs with
    /// strict (positional, type-checked) matching.
    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs a new `Union` with loose type matching: inputs need not
    /// have identical types; the produced schema takes each column's type
    /// from the first input.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
        Ok(Union { inputs, schema })
    }

    /// Constructs a new `Union` where inputs are matched by column *name*
    /// rather than position (`UNION BY NAME` style); columns missing from
    /// an input are padded with NULL literals.
    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
        // Wrap each input in a projection that reorders/pads its columns
        // to match the unified schema.
        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

        Ok(Union { inputs, schema })
    }

    /// Projects every input to the unified by-name schema: columns present
    /// in the input are referenced by name; absent columns become NULL
    /// literals aliased to the expected output name.
    fn rewrite_inputs_from_schema(
        schema: &Arc<DFSchema>,
        inputs: Vec<Arc<LogicalPlan>>,
    ) -> Result<Vec<Arc<LogicalPlan>>> {
        let schema_width = schema.iter().count();
        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            let mut expr = Vec::with_capacity(schema_width);
            for column in schema.columns() {
                if input
                    .schema()
                    .has_column_with_unqualified_name(column.name())
                {
                    expr.push(Expr::Column(column));
                } else {
                    // Input lacks this column: substitute NULL under the
                    // expected output name.
                    expr.push(
                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
                    );
                }
            }
            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
            )));
        }

        Ok(wrapped_inputs)
    }

    /// Derives the union's output schema. `loose_types` skips exact type
    /// matching; `by_name` unifies columns by name instead of by position.
    ///
    /// # Errors
    /// Returns a plan error when fewer than two inputs are provided.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
        by_name: bool,
    ) -> Result<DFSchemaRef> {
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }

        if by_name {
            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
        } else {
            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
        }
    }

    /// By-name schema derivation: one output column per distinct column
    /// name across all inputs, in first-seen order.
    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // Per column name: (data type, nullable, per-input metadata,
        // number of inputs containing the column).
        type FieldData<'a> =
            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
        // A Vec (not a map) keeps first-seen column order stable.
        let mut cols: Vec<(&str, FieldData)> = Vec::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
                    cols.iter_mut().find(|(name, _)| name == field.name())
                {
                    if !loose_types && *data_type != field.data_type() {
                        return plan_err!(
                            "Found different types for field {}",
                            field.name()
                        );
                    }

                    metadata.push(field.metadata());
                    // Nullable if nullable in any input.
                    *is_nullable |= field.is_nullable();
                    *occurrences += 1;
                } else {
                    cols.push((
                        field.name(),
                        (
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                            1,
                        ),
                    ));
                }
            }
        }

        let union_fields = cols
            .into_iter()
            .map(
                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
                    // A column missing from some input gets padded with
                    // NULLs there, so it must be nullable in the union.
                    let final_is_nullable = if occurrences == inputs.len() {
                        is_nullable
                    } else {
                        true
                    };

                    let mut field =
                        Field::new(name, data_type.clone(), final_is_nullable);
                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));

                    (None, Arc::new(field))
                },
            )
            .collect::<Vec<(Option<TableReference>, _)>>();

        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        // Note: no functional dependencies are attached to the result.
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }

    /// By-position schema derivation: all inputs must have the same column
    /// count; column `i` of the union combines column `i` of each input.
    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        // All inputs must agree on the number of columns.
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        // Tracks repeated output names so duplicates get a `_N` suffix.
        let mut name_counts: HashMap<String, usize> = HashMap::new();
        let union_fields = (0..fields_count)
            .map(|i| {
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let base_name = first_field.name().to_string();

                let data_type = if loose_types {
                    // Loose mode: take the first input's type as-is
                    // (presumably unified by later type coercion — confirm).
                    first_field.data_type()
                } else {
                    // Strict mode: all inputs must have the identical type.
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                let nullable = fields.iter().any(|field| field.is_nullable());

                // Disambiguate duplicate column names with a `_N` suffix.
                let name = if let Some(count) = name_counts.get_mut(&base_name) {
                    *count += 1;
                    format!("{base_name}_{count}")
                } else {
                    name_counts.insert(base_name.clone(), 0);
                    base_name
                };

                let mut field = Field::new(&name, data_type.clone(), nullable);
                let field_metadata = intersect_metadata_for_union(
                    fields.iter().map(|field| field.metadata()),
                );
                field.set_metadata(field_metadata);
                Ok((None, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        // Note: no functional dependencies are attached to the result.
        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}
2928
2929impl PartialOrd for Union {
2931 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2932 self.inputs.partial_cmp(&other.inputs)
2933 }
2934}
2935
/// Produces a description (name/type/etc.) of a table's schema, e.g. for
/// `DESCRIBE <table>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// The schema of the table being described
    pub schema: Arc<Schema>,
    /// The output schema of the DESCRIBE result itself
    pub output_schema: DFSchemaRef,
}
2965
impl PartialOrd for DescribeTable {
    /// `DescribeTable` instances have no meaningful ordering, so any two
    /// are treated as incomparable (`None`).
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
2974
/// Output formats supported by `EXPLAIN` (see [`FromStr`] impl for the
/// accepted string spellings: `indent`, `tree`, `pgjson`, `graphviz`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExplainFormat {
    /// Indented text, one plan node per line.
    Indent,
    /// Tree-style rendering.
    Tree,
    /// PostgreSQL-compatible JSON.
    PostgresJSON,
    /// Graphviz `dot` output.
    Graphviz,
}
3105
3106impl FromStr for ExplainFormat {
3108 type Err = DataFusionError;
3109
3110 fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
3111 match format.to_lowercase().as_str() {
3112 "indent" => Ok(ExplainFormat::Indent),
3113 "tree" => Ok(ExplainFormat::Tree),
3114 "pgjson" => Ok(ExplainFormat::PostgresJSON),
3115 "graphviz" => Ok(ExplainFormat::Graphviz),
3116 _ => {
3117 plan_err!("Invalid explain format. Expected 'indent', 'tree', 'pgjson' or 'graphviz'. Got '{format}'")
3118 }
3119 }
3120 }
3121}
3122
/// Options controlling how an `EXPLAIN` statement is rendered.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include additional detail in the output.
    pub verbose: bool,
    /// Execute the plan and report actual metrics (`EXPLAIN ANALYZE`).
    pub analyze: bool,
    /// The output format to render the plan in.
    pub format: ExplainFormat,
}
3133
3134impl Default for ExplainOption {
3135 fn default() -> Self {
3136 ExplainOption {
3137 verbose: false,
3138 analyze: false,
3139 format: ExplainFormat::Indent,
3140 }
3141 }
3142}
3143
3144impl ExplainOption {
3145 pub fn with_verbose(mut self, verbose: bool) -> Self {
3147 self.verbose = verbose;
3148 self
3149 }
3150
3151 pub fn with_analyze(mut self, analyze: bool) -> Self {
3153 self.analyze = analyze;
3154 self
3155 }
3156
3157 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3159 self.format = format;
3160 self
3161 }
3162}
3163
/// Logical node for an `EXPLAIN` statement: carries the plan being
/// explained plus the textual renderings produced for it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Whether to include additional detail.
    pub verbose: bool,
    /// Output format for the rendered plan.
    pub explain_format: ExplainFormat,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// Rendered plan strings accumulated across planning stages.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Output schema of the EXPLAIN relation itself.
    pub schema: DFSchemaRef,
    /// Whether logical optimization completed without error.
    pub logical_optimization_succeeded: bool,
}
3186
3187impl PartialOrd for Explain {
3189 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3190 #[derive(PartialEq, PartialOrd)]
3191 struct ComparableExplain<'a> {
3192 pub verbose: &'a bool,
3194 pub plan: &'a Arc<LogicalPlan>,
3196 pub stringified_plans: &'a Vec<StringifiedPlan>,
3198 pub logical_optimization_succeeded: &'a bool,
3200 }
3201 let comparable_self = ComparableExplain {
3202 verbose: &self.verbose,
3203 plan: &self.plan,
3204 stringified_plans: &self.stringified_plans,
3205 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3206 };
3207 let comparable_other = ComparableExplain {
3208 verbose: &other.verbose,
3209 plan: &other.plan,
3210 stringified_plans: &other.stringified_plans,
3211 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3212 };
3213 comparable_self.partial_cmp(&comparable_other)
3214 }
3215}
3216
/// Logical node for `EXPLAIN ANALYZE`: runs the input plan and reports
/// execution metrics.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Whether to include additional detail in the report.
    pub verbose: bool,
    /// The plan to execute and analyze.
    pub input: Arc<LogicalPlan>,
    /// Output schema of the analyze report.
    pub schema: DFSchemaRef,
}
3228
3229impl PartialOrd for Analyze {
3231 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3232 match self.verbose.partial_cmp(&other.verbose) {
3233 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3234 cmp => cmp,
3235 }
3236 }
3237}
3238
/// Wrapper for a user-defined logical plan node (extension point).
// Hash is derived while PartialEq is manual (delegated to the trait object),
// hence the allow below.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-defined node implementation.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3249
impl PartialEq for Extension {
    /// Equality is delegated to the user-defined node's own `eq`.
    fn eq(&self, other: &Self) -> bool {
        self.node.eq(&other.node)
    }
}
3258
impl PartialOrd for Extension {
    /// Ordering is delegated to the user-defined node's own `partial_cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3264
/// Logical node applying `LIMIT`/`OFFSET` to its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip (OFFSET), as an expression; `None` means 0.
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to return (LIMIT); `None` means unlimited.
    pub fetch: Option<Box<Expr>>,
    /// The plan being limited.
    pub input: Arc<LogicalPlan>,
}
3276
/// Resolved form of [`Limit::skip`], see [`Limit::get_skip_type`].
pub enum SkipType {
    /// A non-negative literal offset.
    Literal(usize),
    /// The skip expression is not an `Int64` literal.
    UnsupportedExpr,
}
3284
/// Resolved form of [`Limit::fetch`], see [`Limit::get_fetch_type`].
pub enum FetchType {
    /// A literal limit; `None` means no limit.
    Literal(Option<usize>),
    /// The fetch expression is not an `Int64` literal.
    UnsupportedExpr,
}
3293
3294impl Limit {
3295 pub fn get_skip_type(&self) -> Result<SkipType> {
3297 match self.skip.as_deref() {
3298 Some(expr) => match *expr {
3299 Expr::Literal(ScalarValue::Int64(s), _) => {
3300 let s = s.unwrap_or(0);
3302 if s >= 0 {
3303 Ok(SkipType::Literal(s as usize))
3304 } else {
3305 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3306 }
3307 }
3308 _ => Ok(SkipType::UnsupportedExpr),
3309 },
3310 None => Ok(SkipType::Literal(0)),
3312 }
3313 }
3314
3315 pub fn get_fetch_type(&self) -> Result<FetchType> {
3317 match self.fetch.as_deref() {
3318 Some(expr) => match *expr {
3319 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3320 if s >= 0 {
3321 Ok(FetchType::Literal(Some(s as usize)))
3322 } else {
3323 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3324 }
3325 }
3326 Expr::Literal(ScalarValue::Int64(None), _) => {
3327 Ok(FetchType::Literal(None))
3328 }
3329 _ => Ok(FetchType::UnsupportedExpr),
3330 },
3331 None => Ok(FetchType::Literal(None)),
3332 }
3333 }
3334}
3335
/// Logical node removing duplicate rows from its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `SELECT DISTINCT`: dedup over all output columns.
    All(Arc<LogicalPlan>),
    /// `SELECT DISTINCT ON (...)`: dedup over specific expressions.
    On(DistinctOn),
}
3344
3345impl Distinct {
3346 pub fn input(&self) -> &Arc<LogicalPlan> {
3348 match self {
3349 Distinct::All(input) => input,
3350 Distinct::On(DistinctOn { input, .. }) => input,
3351 }
3352 }
3353}
3354
/// Logical node for `SELECT DISTINCT ON (<on_expr>) <select_expr> ...`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// Expressions that determine row uniqueness.
    pub on_expr: Vec<Expr>,
    /// Expressions projected for each retained row.
    pub select_expr: Vec<Expr>,
    /// Optional sort order; its prefix must match `on_expr`
    /// (enforced by [`DistinctOn::with_sort_expr`]).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The plan being deduplicated.
    pub input: Arc<LogicalPlan>,
    /// Output schema, derived from `select_expr`.
    pub schema: DFSchemaRef,
}
3371
3372impl DistinctOn {
3373 pub fn try_new(
3375 on_expr: Vec<Expr>,
3376 select_expr: Vec<Expr>,
3377 sort_expr: Option<Vec<SortExpr>>,
3378 input: Arc<LogicalPlan>,
3379 ) -> Result<Self> {
3380 if on_expr.is_empty() {
3381 return plan_err!("No `ON` expressions provided");
3382 }
3383
3384 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3385 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3386 .into_iter()
3387 .collect();
3388
3389 let dfschema = DFSchema::new_with_metadata(
3390 qualified_fields,
3391 input.schema().metadata().clone(),
3392 )?;
3393
3394 let mut distinct_on = DistinctOn {
3395 on_expr,
3396 select_expr,
3397 sort_expr: None,
3398 input,
3399 schema: Arc::new(dfschema),
3400 };
3401
3402 if let Some(sort_expr) = sort_expr {
3403 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3404 }
3405
3406 Ok(distinct_on)
3407 }
3408
3409 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3413 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3414
3415 let mut matched = true;
3417 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3418 if on != &sort.expr {
3419 matched = false;
3420 break;
3421 }
3422 }
3423
3424 if self.on_expr.len() > sort_expr.len() || !matched {
3425 return plan_err!(
3426 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3427 );
3428 }
3429
3430 self.sort_expr = Some(sort_expr);
3431 Ok(self)
3432 }
3433}
3434
3435impl PartialOrd for DistinctOn {
3437 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3438 #[derive(PartialEq, PartialOrd)]
3439 struct ComparableDistinctOn<'a> {
3440 pub on_expr: &'a Vec<Expr>,
3442 pub select_expr: &'a Vec<Expr>,
3444 pub sort_expr: &'a Option<Vec<SortExpr>>,
3448 pub input: &'a Arc<LogicalPlan>,
3450 }
3451 let comparable_self = ComparableDistinctOn {
3452 on_expr: &self.on_expr,
3453 select_expr: &self.select_expr,
3454 sort_expr: &self.sort_expr,
3455 input: &self.input,
3456 };
3457 let comparable_other = ComparableDistinctOn {
3458 on_expr: &other.on_expr,
3459 select_expr: &other.select_expr,
3460 sort_expr: &other.sort_expr,
3461 input: &other.input,
3462 };
3463 comparable_self.partial_cmp(&comparable_other)
3464 }
3465}
3466
/// Logical node computing aggregates over groups of rows
/// (`GROUP BY` plus aggregate expressions).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// Construct via `try_new`/`try_new_with_schema` so invariants hold.
#[non_exhaustive]
pub struct Aggregate {
    /// The plan being aggregated.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions (may be a single `Expr::GroupingSet`).
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions.
    pub aggr_expr: Vec<Expr>,
    /// Output schema: group fields, then (for grouping sets) the internal
    /// grouping id, then aggregate fields.
    pub schema: DFSchemaRef,
}
3492
impl Aggregate {
    /// Creates an `Aggregate`, deriving the output schema from the grouping
    /// and aggregate expressions.
    ///
    /// For grouping sets, all group fields become nullable and an internal
    /// `__grouping_id` field is appended after them.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        let group_expr = enumerate_grouping_sets(group_expr)?;

        // After enumeration, a grouping-set query is a single GroupingSet expr.
        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        if is_grouping_set {
            // Any group key can be absent in some grouping set, so all group
            // output fields must be nullable.
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            // Internal bitmask column identifying which grouping set each row
            // belongs to; its integer width depends on the number of group exprs.
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Creates an `Aggregate` with a caller-provided output schema, validating
    /// that the field count matches the expressions and attaching the
    /// functional dependencies implied by the group-by keys.
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression. \
                 Aggregate without grouping expressions nor aggregate expressions is \
                 logically equivalent to, but less efficient than, VALUES producing \
                 single row. Please use VALUES instead."
            );
        }
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// True when the grouping expressions are a single grouping set.
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }

    /// Returns the expressions corresponding (in order) to the output schema
    /// fields: group exprs, the internal grouping id (grouping sets only),
    /// then aggregate exprs.
    fn output_expressions(&self) -> Result<Vec<&Expr>> {
        // Shared static so we can return a reference with the right lifetime.
        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
        });
        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
        if self.is_grouping_set() {
            exprs.push(&INTERNAL_ID_EXPR);
        }
        exprs.extend(self.aggr_expr.iter());
        debug_assert!(exprs.len() == self.schema.fields().len());
        Ok(exprs)
    }

    /// Number of output fields contributed by the grouping expressions
    /// (counting grouping-set expansion).
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }

    /// Smallest unsigned integer type whose bit width can hold one bit per
    /// grouping expression (used for the internal grouping id).
    pub fn grouping_id_type(group_exprs: usize) -> DataType {
        if group_exprs <= 8 {
            DataType::UInt8
        } else if group_exprs <= 16 {
            DataType::UInt16
        } else if group_exprs <= 32 {
            DataType::UInt32
        } else {
            DataType::UInt64
        }
    }

    /// Name of the internal column identifying the grouping set of each row.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
3637
3638impl PartialOrd for Aggregate {
3640 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3641 match self.input.partial_cmp(&other.input) {
3642 Some(Ordering::Equal) => {
3643 match self.group_expr.partial_cmp(&other.group_expr) {
3644 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3645 cmp => cmp,
3646 }
3647 }
3648 cmp => cmp,
3649 }
3650 }
3651}
3652
3653fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3655 group_expr
3656 .iter()
3657 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3658}
3659
3660fn calc_func_dependencies_for_aggregate(
3662 group_expr: &[Expr],
3664 input: &LogicalPlan,
3666 aggr_schema: &DFSchema,
3668) -> Result<FunctionalDependencies> {
3669 if !contains_grouping_set(group_expr) {
3675 let group_by_expr_names = group_expr
3676 .iter()
3677 .map(|item| item.schema_name().to_string())
3678 .collect::<IndexSet<_>>()
3679 .into_iter()
3680 .collect::<Vec<_>>();
3681 let aggregate_func_dependencies = aggregate_functional_dependencies(
3682 input.schema(),
3683 &group_by_expr_names,
3684 aggr_schema,
3685 );
3686 Ok(aggregate_func_dependencies)
3687 } else {
3688 Ok(FunctionalDependencies::empty())
3689 }
3690}
3691
/// Computes the functional dependencies of a projection's output by mapping
/// each projected expression back to the input field index it passes through,
/// then projecting the input's dependencies onto those indices.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // For each expression: zero, one, or (for wildcards) many input indices.
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                // Expand the wildcard to its fields, then map each flat name
                // back to its position in the input schema.
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            Expr::Alias(alias) => {
                // Match on the aliased (inner) expression's rendered name.
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            _ => {
                // Non-pass-through expressions simply won't match any field
                // and contribute no indices.
                let name = format!("{expr}");
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, exprs.len()))
}
3751
/// Logical node sorting its input by the given sort expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// Sort expressions, in precedence order.
    pub expr: Vec<SortExpr>,
    /// The plan being sorted.
    pub input: Arc<LogicalPlan>,
    /// Optional limit on the number of rows to produce (top-k).
    pub fetch: Option<usize>,
}
3762
/// Logical node joining two plans on equality pairs plus an optional filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input plan.
    pub left: Arc<LogicalPlan>,
    /// Right input plan.
    pub right: Arc<LogicalPlan>,
    /// Equi-join key pairs: (left expression, right expression).
    pub on: Vec<(Expr, Expr)>,
    /// Additional non-equi join filter, if any.
    pub filter: Option<Expr>,
    /// Join type (inner, left, right, full, semi, anti, ...).
    pub join_type: JoinType,
    /// Syntactic constraint the join came from (ON vs USING).
    pub join_constraint: JoinConstraint,
    /// Output schema, built from both sides per `join_type`.
    pub schema: DFSchemaRef,
    /// Whether NULL keys compare equal during the join.
    pub null_equality: NullEquality,
}
3783
3784impl Join {
3785 pub fn try_new(
3804 left: Arc<LogicalPlan>,
3805 right: Arc<LogicalPlan>,
3806 on: Vec<(Expr, Expr)>,
3807 filter: Option<Expr>,
3808 join_type: JoinType,
3809 join_constraint: JoinConstraint,
3810 null_equality: NullEquality,
3811 ) -> Result<Self> {
3812 let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3813
3814 Ok(Join {
3815 left,
3816 right,
3817 on,
3818 filter,
3819 join_type,
3820 join_constraint,
3821 schema: Arc::new(join_schema),
3822 null_equality,
3823 })
3824 }
3825
3826 pub fn try_new_with_project_input(
3829 original: &LogicalPlan,
3830 left: Arc<LogicalPlan>,
3831 right: Arc<LogicalPlan>,
3832 column_on: (Vec<Column>, Vec<Column>),
3833 ) -> Result<(Self, bool)> {
3834 let original_join = match original {
3835 LogicalPlan::Join(join) => join,
3836 _ => return plan_err!("Could not create join with project input"),
3837 };
3838
3839 let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3840 let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3841
3842 let mut requalified = false;
3843
3844 if original_join.join_type == JoinType::Inner
3847 || original_join.join_type == JoinType::Left
3848 || original_join.join_type == JoinType::Right
3849 || original_join.join_type == JoinType::Full
3850 {
3851 (left_sch, right_sch, requalified) =
3852 requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3853 }
3854
3855 let on: Vec<(Expr, Expr)> = column_on
3856 .0
3857 .into_iter()
3858 .zip(column_on.1)
3859 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3860 .collect();
3861
3862 let join_schema = build_join_schema(
3863 left_sch.schema(),
3864 right_sch.schema(),
3865 &original_join.join_type,
3866 )?;
3867
3868 Ok((
3869 Join {
3870 left,
3871 right,
3872 on,
3873 filter: original_join.filter.clone(),
3874 join_type: original_join.join_type,
3875 join_constraint: original_join.join_constraint,
3876 schema: Arc::new(join_schema),
3877 null_equality: original_join.null_equality,
3878 },
3879 requalified,
3880 ))
3881 }
3882}
3883
3884impl PartialOrd for Join {
3886 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3887 #[derive(PartialEq, PartialOrd)]
3888 struct ComparableJoin<'a> {
3889 pub left: &'a Arc<LogicalPlan>,
3891 pub right: &'a Arc<LogicalPlan>,
3893 pub on: &'a Vec<(Expr, Expr)>,
3895 pub filter: &'a Option<Expr>,
3897 pub join_type: &'a JoinType,
3899 pub join_constraint: &'a JoinConstraint,
3901 pub null_equality: &'a NullEquality,
3903 }
3904 let comparable_self = ComparableJoin {
3905 left: &self.left,
3906 right: &self.right,
3907 on: &self.on,
3908 filter: &self.filter,
3909 join_type: &self.join_type,
3910 join_constraint: &self.join_constraint,
3911 null_equality: &self.null_equality,
3912 };
3913 let comparable_other = ComparableJoin {
3914 left: &other.left,
3915 right: &other.right,
3916 on: &other.on,
3917 filter: &other.filter,
3918 join_type: &other.join_type,
3919 join_constraint: &other.join_constraint,
3920 null_equality: &other.null_equality,
3921 };
3922 comparable_self.partial_cmp(&comparable_other)
3923 }
3924}
3925
/// A subquery embedded in an expression, possibly correlated with the outer
/// query through `outer_ref_columns`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery's plan.
    pub subquery: Arc<LogicalPlan>,
    /// Expressions referencing columns of the outer query (correlation).
    pub outer_ref_columns: Vec<Expr>,
    /// Source-location spans for diagnostics.
    pub spans: Spans,
}
3936
impl Normalizeable for Subquery {
    /// Subqueries are never candidates for CSE normalization.
    fn can_normalize(&self) -> bool {
        false
    }
}
3942
3943impl NormalizeEq for Subquery {
3944 fn normalize_eq(&self, other: &Self) -> bool {
3945 *self.subquery == *other.subquery
3947 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3948 && self
3949 .outer_ref_columns
3950 .iter()
3951 .zip(other.outer_ref_columns.iter())
3952 .all(|(a, b)| a.normalize_eq(b))
3953 }
3954}
3955
3956impl Subquery {
3957 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3958 match plan {
3959 Expr::ScalarSubquery(it) => Ok(it),
3960 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3961 _ => plan_err!("Could not coerce into ScalarSubquery!"),
3962 }
3963 }
3964
3965 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3966 Subquery {
3967 subquery: plan,
3968 outer_ref_columns: self.outer_ref_columns.clone(),
3969 spans: Spans::new(),
3970 }
3971 }
3972}
3973
3974impl Debug for Subquery {
3975 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3976 write!(f, "<subquery>")
3977 }
3978}
3979
/// How data is repartitioned across output partitions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Distribute batches round-robin across the given number of partitions.
    RoundRobinBatch(usize),
    /// Hash the given expressions into the given number of partitions.
    Hash(Vec<Expr>, usize),
    /// `DISTRIBUTE BY`: partition by the given expressions.
    DistributeBy(Vec<Expr>),
}
3995
/// Describes one unnested list output column and the list-nesting depth it
/// was unnested at (rendered as `<column>|depth=<n>`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The output column produced by the unnest.
    pub output_column: Column,
    /// How many list levels were unwrapped to produce it.
    pub depth: usize,
}
4020
4021impl Display for ColumnUnnestList {
4022 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4023 write!(f, "{}|depth={}", self.output_column, self.depth)
4024 }
4025}
4026
/// Logical node expanding list and struct columns into multiple rows/columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The plan being unnested.
    pub input: Arc<LogicalPlan>,
    /// Columns requested for unnesting.
    pub exec_columns: Vec<Column>,
    /// (input field index, output list description) for list-typed columns.
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Input field indices of struct-typed columns being flattened.
    pub struct_type_columns: Vec<usize>,
    /// For each output field, the index of the input field it derives from.
    pub dependency_indices: Vec<usize>,
    /// Output schema after unnesting.
    pub schema: DFSchemaRef,
    /// Unnest options (e.g. recursion depth per column).
    pub options: UnnestOptions,
}
4049
4050impl PartialOrd for Unnest {
4052 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4053 #[derive(PartialEq, PartialOrd)]
4054 struct ComparableUnnest<'a> {
4055 pub input: &'a Arc<LogicalPlan>,
4057 pub exec_columns: &'a Vec<Column>,
4059 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4062 pub struct_type_columns: &'a Vec<usize>,
4065 pub dependency_indices: &'a Vec<usize>,
4068 pub options: &'a UnnestOptions,
4070 }
4071 let comparable_self = ComparableUnnest {
4072 input: &self.input,
4073 exec_columns: &self.exec_columns,
4074 list_type_columns: &self.list_type_columns,
4075 struct_type_columns: &self.struct_type_columns,
4076 dependency_indices: &self.dependency_indices,
4077 options: &self.options,
4078 };
4079 let comparable_other = ComparableUnnest {
4080 input: &other.input,
4081 exec_columns: &other.exec_columns,
4082 list_type_columns: &other.list_type_columns,
4083 struct_type_columns: &other.struct_type_columns,
4084 dependency_indices: &other.dependency_indices,
4085 options: &other.options,
4086 };
4087 comparable_self.partial_cmp(&comparable_other)
4088 }
4089}
4090
impl Unnest {
    /// Builds an `Unnest` node over `exec_columns` of `input`, computing the
    /// output schema and the bookkeeping vectors (`list_type_columns`,
    /// `struct_type_columns`, `dependency_indices`).
    ///
    /// Errors if `exec_columns` is empty or a column is not in the input
    /// schema.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Map input field index -> column requested for unnesting.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        let mut dependency_indices = vec![];
        // Walk the input fields in order; each maps to one or more output
        // fields. NOTE: the closures below also push into `list_columns`,
        // `struct_columns` and `dependency_indices` as a side effect, so the
        // iteration order matters.
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Explicit recursive unnest requests for this column.
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                // List unnests produce exactly one column.
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap())
                            })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        if transformed_columns.is_empty() {
                            // No explicit recursion: default single-level unnest.
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                _ => {}
                            };
                        }

                        // Every produced output field depends on this input field.
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Untouched column: passes through unchanged.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // Functional dependencies are carried over from the input schema.
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4227
4228fn get_unnested_columns(
4237 col_name: &String,
4238 data_type: &DataType,
4239 depth: usize,
4240) -> Result<Vec<(Column, Arc<Field>)>> {
4241 let mut qualified_columns = Vec::with_capacity(1);
4242
4243 match data_type {
4244 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4245 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4246 let new_field = Arc::new(Field::new(
4247 col_name, data_type,
4248 true,
4251 ));
4252 let column = Column::from_name(col_name);
4253 qualified_columns.push((column, new_field));
4255 }
4256 DataType::Struct(fields) => {
4257 qualified_columns.extend(fields.iter().map(|f| {
4258 let new_name = format!("{}.{}", col_name, f.name());
4259 let column = Column::from_name(&new_name);
4260 let new_field = f.as_ref().clone().with_name(new_name);
4261 (column, Arc::new(new_field))
4263 }))
4264 }
4265 _ => {
4266 return internal_err!(
4267 "trying to unnest on invalid data type {:?}",
4268 data_type
4269 );
4270 }
4271 };
4272 Ok(qualified_columns)
4273}
4274
4275fn get_unnested_list_datatype_recursive(
4278 data_type: &DataType,
4279 depth: usize,
4280) -> Result<DataType> {
4281 match data_type {
4282 DataType::List(field)
4283 | DataType::FixedSizeList(field, _)
4284 | DataType::LargeList(field) => {
4285 if depth == 1 {
4286 return Ok(field.data_type().clone());
4287 }
4288 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4289 }
4290 _ => {}
4291 };
4292
4293 internal_err!("trying to unnest on invalid data type {:?}", data_type)
4294}
4295
4296#[cfg(test)]
4297mod tests {
4298 use super::*;
4299 use crate::builder::LogicalTableSource;
4300 use crate::logical_plan::table_scan;
4301 use crate::test::function_stub::{count, count_udaf};
4302 use crate::{
4303 binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4304 GroupingSet,
4305 };
4306 use datafusion_common::tree_node::{
4307 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4308 };
4309 use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4310 use insta::{assert_debug_snapshot, assert_snapshot};
4311 use std::hash::DefaultHasher;
4312
    /// Fixture schema shared by the display tests below.
    fn employee_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, false),
            Field::new("last_name", DataType::Utf8, false),
            Field::new("state", DataType::Utf8, false),
            Field::new("salary", DataType::Int32, false),
        ])
    }
4322
    /// Builds a projection-over-filter plan containing an IN subquery, used
    /// by the display format tests.
    fn display_plan() -> Result<LogicalPlan> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;

        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
            .filter(in_subquery(col("state"), Arc::new(plan1)))?
            .project(vec![col("id")])?
            .build()
    }
4332
    /// Indented text rendering of the display fixture plan.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent(), @r"
        Projection: employee_csv.id
          Filter: employee_csv.state IN (<subquery>)
            Subquery:
              TableScan: employee_csv projection=[state]
            TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4346
    /// Indented rendering with per-node schema annotations.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: employee_csv.id [id:Int32]
          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
            Subquery: [state:Utf8]
              TableScan: employee_csv projection=[state] [state:Utf8]
            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
        ");
        Ok(())
    }
4360
    /// An aliased EXISTS subquery inside a projection renders as a child node.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        assert_snapshot!(plan?.display_indent(), @r"
        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
          Subquery:
            TableScan: employee_csv projection=[state]
          TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4380
    /// Graphviz rendering emits two subgraphs: plain and schema-annotated.
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_graphviz(), @r#"
        // Begin DataFusion GraphViz Plan,
        // display it online here: https://dreampuf.github.io/GraphvizOnline

        digraph {
          subgraph cluster_1
          {
            graph[label="LogicalPlan"]
            2[shape=box label="Projection: employee_csv.id"]
            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
            4[shape=box label="Subquery:"]
            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
            5[shape=box label="TableScan: employee_csv projection=[state]"]
            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
          }
          subgraph cluster_7
          {
            graph[label="Detailed LogicalPlan"]
            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
          }
        }
        // End DataFusion GraphViz Plan
        "#);
        Ok(())
    }
4423
    /// PostgreSQL-style JSON rendering of the display fixture plan.
    #[test]
    fn test_display_pg_json() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_pg_json(), @r#"
        [
          {
            "Plan": {
              "Expressions": [
                "employee_csv.id"
              ],
              "Node Type": "Projection",
              "Output": [
                "id"
              ],
              "Plans": [
                {
                  "Condition": "employee_csv.state IN (<subquery>)",
                  "Node Type": "Filter",
                  "Output": [
                    "id",
                    "state"
                  ],
                  "Plans": [
                    {
                      "Node Type": "Subquery",
                      "Output": [
                        "state"
                      ],
                      "Plans": [
                        {
                          "Node Type": "TableScan",
                          "Output": [
                            "state"
                          ],
                          "Plans": [],
                          "Relation Name": "employee_csv"
                        }
                      ]
                    },
                    {
                      "Node Type": "TableScan",
                      "Output": [
                        "id",
                        "state"
                      ],
                      "Plans": [],
                      "Relation Name": "employee_csv"
                    }
                  ]
                }
              ]
            }
          }
        ]
        "#);
        Ok(())
    }
4482
    /// Test visitor that records the name of every node it visits, in order.
    #[derive(Debug, Default)]
    struct OkVisitor {
        // "pre_visit X" / "post_visit X" entries, in traversal order.
        strings: Vec<String>,
    }
4488
    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
        type Node = LogicalPlan;

        /// Records "pre_visit <node>" on the way down; errors on any node
        /// type the test plan is not expected to contain.
        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            let s = match plan {
                LogicalPlan::Projection { .. } => "pre_visit Projection",
                LogicalPlan::Filter { .. } => "pre_visit Filter",
                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
                _ => {
                    return not_impl_err!("unknown plan type");
                }
            };

            self.strings.push(s.into());
            Ok(TreeNodeRecursion::Continue)
        }

        /// Records "post_visit <node>" on the way back up; errors on any node
        /// type the test plan is not expected to contain.
        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            let s = match plan {
                LogicalPlan::Projection { .. } => "post_visit Projection",
                LogicalPlan::Filter { .. } => "post_visit Filter",
                LogicalPlan::TableScan { .. } => "post_visit TableScan",
                _ => {
                    return not_impl_err!("unknown plan type");
                }
            };

            self.strings.push(s.into());
            Ok(TreeNodeRecursion::Continue)
        }
    }
4520
    #[test]
    fn visit_order() {
        // `visit_with_subqueries` must call `f_down` parent-first and `f_up`
        // child-first: classic pre-order / post-order pairing.
        let mut visitor = OkVisitor::default();
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(visitor.strings, @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
            "post_visit Filter",
            "post_visit Projection",
        ]
        "#);
    }
4539
4540 #[derive(Debug, Default)]
4541 struct OptionalCounter {
4543 val: Option<usize>,
4544 }
4545
4546 impl OptionalCounter {
4547 fn new(val: usize) -> Self {
4548 Self { val: Some(val) }
4549 }
4550 fn dec(&mut self) -> bool {
4552 if Some(0) == self.val {
4553 true
4554 } else {
4555 self.val = self.val.take().map(|i| i - 1);
4556 false
4557 }
4558 }
4559 }
4560
4561 #[derive(Debug, Default)]
4562 struct StoppingVisitor {
4564 inner: OkVisitor,
4565 return_false_from_pre_in: OptionalCounter,
4567 return_false_from_post_in: OptionalCounter,
4569 }
4570
4571 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4572 type Node = LogicalPlan;
4573
4574 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4575 if self.return_false_from_pre_in.dec() {
4576 return Ok(TreeNodeRecursion::Stop);
4577 }
4578 self.inner.f_down(plan)?;
4579
4580 Ok(TreeNodeRecursion::Continue)
4581 }
4582
4583 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4584 if self.return_false_from_post_in.dec() {
4585 return Ok(TreeNodeRecursion::Stop);
4586 }
4587
4588 self.inner.f_up(plan)
4589 }
4590 }
4591
    #[test]
    fn early_stopping_pre_visit() {
        // The counter fires on the third `f_down` call, so the TableScan is
        // never recorded and no `f_up` calls happen at all.
        let mut visitor = StoppingVisitor {
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4613
    #[test]
    fn early_stopping_post_visit() {
        // The counter fires on the second `f_up` call: the whole pre-order
        // pass completes, but ascent stops after the TableScan's post-visit.
        let mut visitor = StoppingVisitor {
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4636
4637 #[derive(Debug, Default)]
4638 struct ErrorVisitor {
4640 inner: OkVisitor,
4641 return_error_from_pre_in: OptionalCounter,
4643 return_error_from_post_in: OptionalCounter,
4645 }
4646
4647 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4648 type Node = LogicalPlan;
4649
4650 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4651 if self.return_error_from_pre_in.dec() {
4652 return not_impl_err!("Error in pre_visit");
4653 }
4654
4655 self.inner.f_down(plan)
4656 }
4657
4658 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4659 if self.return_error_from_post_in.dec() {
4660 return not_impl_err!("Error in post_visit");
4661 }
4662
4663 self.inner.f_up(plan)
4664 }
4665 }
4666
    #[test]
    fn error_pre_visit() {
        // Error injected on the third `f_down`: the traversal aborts with the
        // error, and only the first two pre-visits were recorded.
        let mut visitor = ErrorVisitor {
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in pre_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4689
    #[test]
    fn error_post_visit() {
        // Error injected on the second `f_up`: descent completes, the
        // TableScan's post-visit runs, then the traversal aborts.
        let mut visitor = ErrorVisitor {
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in post_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4714
    #[test]
    fn test_partial_eq_hash_and_partial_ord() {
        let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: true,
            schema: Arc::new(DFSchema::empty()),
        }));

        // Builds a `Window` node computing `count()` over the empty input,
        // with the caller-supplied output schema.
        let count_window_function = |schema| {
            Window::try_new_with_schema(
                vec![Expr::WindowFunction(Box::new(WindowFunction::new(
                    WindowFunctionDefinition::AggregateUDF(count_udaf()),
                    vec![],
                )))],
                Arc::clone(&empty_values),
                Arc::new(schema),
            )
            .unwrap()
        };

        let schema_without_metadata = || {
            DFSchema::from_unqualified_fields(
                vec![Field::new("count", DataType::Int64, false)].into(),
                HashMap::new(),
            )
            .unwrap()
        };

        let schema_with_metadata = || {
            DFSchema::from_unqualified_fields(
                vec![Field::new("count", DataType::Int64, false)].into(),
                [("key".to_string(), "value".to_string())].into(),
            )
            .unwrap()
        };

        // Two independently-built nodes with identical schemas must agree
        // under Eq, Hash and PartialOrd.
        let f = count_window_function(schema_without_metadata());

        let f2 = count_window_function(schema_without_metadata());
        assert_eq!(f, f2);
        assert_eq!(hash(&f), hash(&f2));
        assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));

        // Schema metadata participates in equality and hashing, and makes
        // the two nodes incomparable under `partial_cmp`.
        let o = count_window_function(schema_with_metadata());
        assert_ne!(f, o);
        assert_ne!(hash(&f), hash(&o));
        assert_eq!(f.partial_cmp(&o), None);
    }
4765
4766 fn hash<T: Hash>(value: &T) -> u64 {
4767 let hasher = &mut DefaultHasher::new();
4768 value.hash(hasher);
4769 hasher.finish()
4770 }
4771
    #[test]
    fn projection_expr_schema_mismatch() -> Result<()> {
        // One projection expression paired with a zero-field schema must be
        // rejected by `try_new_with_schema` with a planning error.
        let empty_schema = Arc::new(DFSchema::empty());
        let p = Projection::try_new_with_schema(
            vec![col("a")],
            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::clone(&empty_schema),
            })),
            empty_schema,
        );
        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
        Ok(())
    }
4786
    /// Builds the three-node plan used by the visitor tests:
    /// `Projection(id) <- Filter(state = 'CO') <- TableScan[id, state]`.
    fn test_plan() -> LogicalPlan {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("state", DataType::Utf8, false),
        ]);

        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
            .unwrap()
            .filter(col("state").eq(lit("CO")))
            .unwrap()
            .project(vec![col("id")])
            .unwrap()
            .build()
            .unwrap()
    }
4802
4803 #[test]
4804 fn test_replace_invalid_placeholder() {
4805 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4807
4808 let plan = table_scan(TableReference::none(), &schema, None)
4809 .unwrap()
4810 .filter(col("id").eq(placeholder("")))
4811 .unwrap()
4812 .build()
4813 .unwrap();
4814
4815 let param_values = vec![ScalarValue::Int32(Some(42))];
4816 plan.replace_params_with_values(¶m_values.clone().into())
4817 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4818
4819 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4821
4822 let plan = table_scan(TableReference::none(), &schema, None)
4823 .unwrap()
4824 .filter(col("id").eq(placeholder("$0")))
4825 .unwrap()
4826 .build()
4827 .unwrap();
4828
4829 plan.replace_params_with_values(¶m_values.clone().into())
4830 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4831
4832 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4834
4835 let plan = table_scan(TableReference::none(), &schema, None)
4836 .unwrap()
4837 .filter(col("id").eq(placeholder("$00")))
4838 .unwrap()
4839 .build()
4840 .unwrap();
4841
4842 plan.replace_params_with_values(¶m_values.into())
4843 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4844 }
4845
4846 #[test]
4847 fn test_nullable_schema_after_grouping_set() {
4848 let schema = Schema::new(vec![
4849 Field::new("foo", DataType::Int32, false),
4850 Field::new("bar", DataType::Int32, false),
4851 ]);
4852
4853 let plan = table_scan(TableReference::none(), &schema, None)
4854 .unwrap()
4855 .aggregate(
4856 vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4857 vec![col("foo")],
4858 vec![col("bar")],
4859 ]))],
4860 vec![count(lit(true))],
4861 )
4862 .unwrap()
4863 .build()
4864 .unwrap();
4865
4866 let output_schema = plan.schema();
4867
4868 assert!(output_schema
4869 .field_with_name(None, "foo")
4870 .unwrap()
4871 .is_nullable(),);
4872 assert!(output_schema
4873 .field_with_name(None, "bar")
4874 .unwrap()
4875 .is_nullable());
4876 }
4877
    #[test]
    fn test_filter_is_scalar() {
        // A filter is "scalar" (guaranteed to produce at most one row) only
        // when its predicate pins down a column that is unique in the input.
        let schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = Arc::new(LogicalTableSource::new(schema));
        let schema = Arc::new(
            DFSchema::try_from_qualified_schema(
                TableReference::bare("tab"),
                &source.schema(),
            )
            .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(&schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // `id = 1` without a uniqueness constraint: not scalar.
        let filter = Filter::try_new(
            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
            scan,
        )
        .unwrap();
        assert!(!filter.is_scalar());
        // Same schema, but now with a Unique constraint on column 0, which
        // yields a functional dependency the filter can exploit.
        let unique_schema = Arc::new(
            schema
                .as_ref()
                .clone()
                .with_functional_dependencies(
                    FunctionalDependencies::new_from_constraints(
                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
                            vec![0],
                        )])),
                        1,
                    ),
                )
                .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source,
            projection: None,
            projected_schema: Arc::clone(&unique_schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // `id = 1` on a unique column: at most one row, hence scalar.
        let filter =
            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
        assert!(filter.is_scalar());
    }
4936
    #[test]
    fn test_transform_explain() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        // Wrap the scan in an Explain node, then check that `transform`
        // still reaches and rewrites the TableScan *inside* the Explain.
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .explain(false, false)
            .unwrap()
            .build()
            .unwrap();

        let external_filter = col("foo").eq(lit(true));

        // Replace every TableScan with Filter(external_filter, TableScan).
        let plan = plan
            .transform(|plan| match plan {
                LogicalPlan::TableScan(table) => {
                    let filter = Filter::try_new(
                        external_filter.clone(),
                        Arc::new(LogicalPlan::TableScan(table)),
                    )
                    .unwrap();
                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
                }
                x => Ok(Transformed::no(x)),
            })
            .data()
            .unwrap();

        let actual = format!("{}", plan.display_indent());
        assert_snapshot!(actual, @r"
        Explain
          Filter: foo = Boolean(true)
            TableScan: ?table?
        ")
    }
4977
4978 #[test]
4979 fn test_plan_partial_ord() {
4980 let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
4981 produce_one_row: false,
4982 schema: Arc::new(DFSchema::empty()),
4983 });
4984
4985 let describe_table = LogicalPlan::DescribeTable(DescribeTable {
4986 schema: Arc::new(Schema::new(vec![Field::new(
4987 "foo",
4988 DataType::Int32,
4989 false,
4990 )])),
4991 output_schema: DFSchemaRef::new(DFSchema::empty()),
4992 });
4993
4994 let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
4995 schema: Arc::new(Schema::new(vec![Field::new(
4996 "foo",
4997 DataType::Int32,
4998 false,
4999 )])),
5000 output_schema: DFSchemaRef::new(DFSchema::empty()),
5001 });
5002
5003 assert_eq!(
5004 empty_relation.partial_cmp(&describe_table),
5005 Some(Ordering::Less)
5006 );
5007 assert_eq!(
5008 describe_table.partial_cmp(&empty_relation),
5009 Some(Ordering::Greater)
5010 );
5011 assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5012 }
5013
5014 #[test]
5015 fn test_limit_with_new_children() {
5016 let input = Arc::new(LogicalPlan::Values(Values {
5017 schema: Arc::new(DFSchema::empty()),
5018 values: vec![vec![]],
5019 }));
5020 let cases = [
5021 LogicalPlan::Limit(Limit {
5022 skip: None,
5023 fetch: None,
5024 input: Arc::clone(&input),
5025 }),
5026 LogicalPlan::Limit(Limit {
5027 skip: None,
5028 fetch: Some(Box::new(Expr::Literal(
5029 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5030 None,
5031 ))),
5032 input: Arc::clone(&input),
5033 }),
5034 LogicalPlan::Limit(Limit {
5035 skip: Some(Box::new(Expr::Literal(
5036 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5037 None,
5038 ))),
5039 fetch: None,
5040 input: Arc::clone(&input),
5041 }),
5042 LogicalPlan::Limit(Limit {
5043 skip: Some(Box::new(Expr::Literal(
5044 ScalarValue::new_one(&DataType::UInt32).unwrap(),
5045 None,
5046 ))),
5047 fetch: Some(Box::new(Expr::Literal(
5048 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5049 None,
5050 ))),
5051 input,
5052 }),
5053 ];
5054
5055 for limit in cases {
5056 let new_limit = limit
5057 .with_new_exprs(
5058 limit.expressions(),
5059 limit.inputs().into_iter().cloned().collect(),
5060 )
5061 .unwrap();
5062 assert_eq!(limit, new_limit);
5063 }
5064 }
5065
    #[test]
    fn test_with_subqueries_jump() {
        // `Jump` returned from a pre-visit of the Projection must skip the
        // whole subtree below it — including the scalar subquery embedded in
        // the projection expressions and the Filter beneath it. Verified for
        // all four traversal APIs: apply / visit / transform / rewrite.
        let subquery_schema =
            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

        let subquery_plan =
            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
                .unwrap()
                .filter(col("sub_id").eq(lit(0)))
                .unwrap()
                .build()
                .unwrap();

        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        // Projection (with subquery) <- Filter <- TableScan
        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
            .unwrap()
            .filter(col("id").eq(lit(0)))
            .unwrap()
            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
            .unwrap()
            .build()
            .unwrap();

        // 1) closure-based `apply_with_subqueries`
        let mut filter_found = false;
        plan.apply_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        assert!(!filter_found);

        // 2) visitor-based `visit_with_subqueries`
        struct ProjectJumpVisitor {
            filter_found: bool,
        }

        impl ProjectJumpVisitor {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
                match node {
                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }

        let mut visitor = ProjectJumpVisitor::new();
        plan.visit_with_subqueries(&mut visitor).unwrap();
        assert!(!visitor.filter_found);

        // 3) closure-based transforms (top-down, and top-down + bottom-up)
        let mut filter_found = false;
        plan.clone()
            .transform_down_with_subqueries(|plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            })
            .unwrap();
        assert!(!filter_found);

        let mut filter_found = false;
        plan.clone()
            .transform_down_up_with_subqueries(
                |plan| {
                    match plan {
                        LogicalPlan::Projection(..) => {
                            return Ok(Transformed::new(
                                plan,
                                false,
                                TreeNodeRecursion::Jump,
                            ))
                        }
                        LogicalPlan::Filter(..) => filter_found = true,
                        _ => {}
                    }
                    Ok(Transformed::no(plan))
                },
                |plan| Ok(Transformed::no(plan)),
            )
            .unwrap();
        assert!(!filter_found);

        // 4) rewriter-based `rewrite_with_subqueries`
        struct ProjectJumpRewriter {
            filter_found: bool,
        }

        impl ProjectJumpRewriter {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl TreeNodeRewriter for ProjectJumpRewriter {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
                    }
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(node))
            }
        }

        let mut rewriter = ProjectJumpRewriter::new();
        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
        assert!(!rewriter.filter_found);
    }
5203
5204 #[test]
5205 fn test_with_unresolved_placeholders() {
5206 let field_name = "id";
5207 let placeholder_value = "$1";
5208 let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5209
5210 let plan = table_scan(TableReference::none(), &schema, None)
5211 .unwrap()
5212 .filter(col(field_name).eq(placeholder(placeholder_value)))
5213 .unwrap()
5214 .build()
5215 .unwrap();
5216
5217 let params = plan.get_parameter_types().unwrap();
5219 assert_eq!(params.len(), 1);
5220
5221 let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5222 assert_eq!(parameter_type, None);
5223 }
5224
    #[test]
    fn test_join_with_new_exprs() -> Result<()> {
        // Builds an inner join of t1 x t2 (both with columns a, b) with the
        // given equi-join keys and optional filter.
        fn create_test_join(
            on: Vec<(Expr, Expr)>,
            filter: Option<Expr>,
        ) -> Result<LogicalPlan> {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Int32, false),
            ]);

            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;

            Ok(LogicalPlan::Join(Join {
                left: Arc::new(
                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
                ),
                right: Arc::new(
                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
                ),
                on,
                filter,
                join_type: JoinType::Inner,
                join_constraint: JoinConstraint::On,
                schema: Arc::new(left_schema.join(&right_schema)?),
                null_equality: NullEquality::NullEqualsNothing,
            }))
        }

        // Round-trip: `with_new_exprs(expressions(), inputs())` preserves
        // `on` keys when there is no filter...
        {
            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
            assert_eq!(join.filter, None);
        }

        // ...preserves a filter when there are no `on` keys...
        {
            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![]);
            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
        }

        // ...and preserves both together.
        {
            let join = create_test_join(
                vec![(col("t1.a"), (col("t2.a")))],
                Some(col("t1.b").gt(col("t2.b"))),
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        }

        // New expressions are consumed pairwise into `on` keys, with a
        // trailing expression becoming the filter.
        {
            let join = create_test_join(
                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
                None,
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                vec![
                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
                    col("t1.b"),
                    col("t2.b"),
                    lit(true),
                ],
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(
                join.on,
                vec![
                    (
                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
                    ),
                    (col("t1.b"), (col("t2.b")))
                ]
            );
            assert_eq!(join.filter, Some(lit(true)));
        }

        Ok(())
    }
5330
    #[test]
    fn test_join_try_new() -> Result<()> {
        // Checks that `Join::try_new` builds the correct output schema
        // (field set and nullability) for every join type.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_scan.clone()),
                Arc::new(right_scan.clone()),
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            match join_type {
                // Semi/anti joins only emit the probe side's columns.
                JoinType::LeftSemi | JoinType::LeftAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                }
                JoinType::RightSemi | JoinType::RightAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from right table"
                    );
                }
                // Mark join: left columns plus a synthetic boolean "mark".
                JoinType::LeftMark => {
                    assert_eq!(join.schema.fields().len(), 3);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "mark",
                        "Third field should be the mark column"
                    );

                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                }
                // Inner/Left/Right/Full: all four columns, left then right.
                _ => {
                    assert_eq!(join.schema.fields().len(), 4);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "a",
                        "Third field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[3].name(),
                        "b",
                        "Fourth field should be 'b' from right table"
                    );

                    // Outer joins make the non-preserved side nullable.
                    if join_type == JoinType::Left {
                        assert!(!fields[0].is_nullable());
                        assert!(!fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    } else if join_type == JoinType::Right {
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(!fields[2].is_nullable());
                        assert!(!fields[3].is_nullable());
                    } else if join_type == JoinType::Full {
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    }
                }
            }

            // Non-schema properties pass through unchanged.
            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
            assert_eq!(join.join_type, join_type);
            assert_eq!(join.join_constraint, JoinConstraint::On);
            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
        }

        Ok(())
    }
5477
    #[test]
    fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
        // Both inputs share the column names "id" and "value" (with differing
        // types for "value") to exercise overlapping-name handling.
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Int32, false),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("category", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        // USING join: both "id" columns are still present in the schema.
        {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::Using,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();

            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.join_constraint, JoinConstraint::Using);
        }

        // ON join with an extra filter on the overlapping "value" columns.
        {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                Some(col("t1.value").lt(col("t2.value"))),
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6);

            assert_eq!(
                fields[0].name(),
                "id",
                "First field should be 'id' from left table"
            );
            assert_eq!(
                fields[1].name(),
                "name",
                "Second field should be 'name' from left table"
            );
            assert_eq!(
                fields[2].name(),
                "value",
                "Third field should be 'value' from left table"
            );
            assert_eq!(
                fields[3].name(),
                "id",
                "Fourth field should be 'id' from right table"
            );
            assert_eq!(
                fields[4].name(),
                "category",
                "Fifth field should be 'category' from right table"
            );
            assert_eq!(
                fields[5].name(),
                "value",
                "Sixth field should be 'value' from right table"
            );

            assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
        }

        // Null-equality semantics are passed through verbatim.
        {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                None,
                JoinType::Inner,
                JoinConstraint::On,
                NullEquality::NullEqualsNull,
            )?;

            assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
        }

        Ok(())
    }
5615
    #[test]
    fn test_join_try_new_schema_validation() -> Result<()> {
        // Left field 2 ("value") and right field 1 ("category") are already
        // nullable in the inputs; the rest start out non-nullable.
        let left_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]);

        let right_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("category", DataType::Utf8, true),
            Field::new("code", DataType::Int16, false),
        ]);

        let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;

        let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_plan.clone()),
                Arc::new(right_plan.clone()),
                vec![(col("t1.id"), col("t2.id"))],
                Some(col("t1.value").gt(lit(5.0))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            let fields = join.schema.fields();
            assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type:?} join");

            for (i, field) in fields.iter().enumerate() {
                // Fields 0-2 come from the left side, 3-5 from the right.
                // A field is nullable when it was nullable in its input
                // (fields 2 and 4) or its side is not preserved by the join
                // type (left side for Right/Full, right side for Left/Full).
                let expected_nullable = match (i, &join_type) {
                    (0, JoinType::Right | JoinType::Full) => true,
                    (1, JoinType::Right | JoinType::Full) => true,
                    (2, _) => true,
                    (3, JoinType::Left | JoinType::Full) => true,
                    (4, _) => true,
                    (5, JoinType::Left | JoinType::Full) => true,
                    _ => false,
                };

                assert_eq!(
                    field.is_nullable(),
                    expected_nullable,
                    "Field {} ({}) nullability incorrect for {:?} join",
                    i,
                    field.name(),
                    join_type
                );
            }
        }

        // USING joins keep every field from both sides.
        let using_join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::Using,
            NullEquality::NullEqualsNothing,
        )?;

        assert_eq!(
            using_join.schema.fields().len(),
            6,
            "USING join should have all fields"
        );
        assert_eq!(using_join.join_constraint, JoinConstraint::Using);

        Ok(())
    }
5700}