1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::str::FromStr;
25use std::sync::{Arc, LazyLock};
26
27use super::dml::CopyTo;
28use super::invariants::{
29 assert_always_invariants_at_current_node, assert_executable_invariants,
30 InvariantLevel,
31};
32use super::DdlStatement;
33use crate::builder::{unique_field_aliases, unnest_with_options};
34use crate::expr::{
35 intersect_metadata_for_union, Alias, Placeholder, Sort as SortExpr, WindowFunction,
36 WindowFunctionParams,
37};
38use crate::expr_rewriter::{
39 create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
40};
41use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
42use crate::logical_plan::extension::UserDefinedLogicalNode;
43use crate::logical_plan::{DmlStatement, Statement};
44use crate::utils::{
45 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
46 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
47};
48use crate::{
49 build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
50 CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
51 Operator, Prepare, TableProviderFilterPushDown, TableSource,
52 WindowFunctionDefinition,
53};
54
55use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
56use datafusion_common::cse::{NormalizeEq, Normalizeable};
57use datafusion_common::tree_node::{
58 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61 aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
62 DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
63 FunctionalDependencies, NullEquality, ParamValues, Result, ScalarValue, Spans,
64 TableReference, UnnestOptions,
65};
66use indexmap::IndexSet;
67
68use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
/// A node in a logical query plan tree.
///
/// Each variant wraps a struct describing one kind of operator. Child plans are
/// reachable through [`LogicalPlan::inputs`] and the node's output schema through
/// [`LogicalPlan::schema`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates a list of expressions over a single input; stores its own schema.
    Projection(Projection),
    /// Keeps input rows satisfying a predicate; its schema is the input's schema.
    Filter(Filter),
    /// Evaluates window expressions over a single input; stores its own schema.
    Window(Window),
    /// Groups the input by `group_expr` and computes `aggr_expr`; stores its own schema.
    Aggregate(Aggregate),
    /// Orders the input by sort expressions, with an optional `fetch`; schema is the input's.
    Sort(Sort),
    /// Combines a `left` and `right` input according to a join type, equi-join
    /// pairs (`on`) and an optional filter.
    Join(Join),
    /// Redistributes the input according to a partitioning scheme; schema is the input's.
    Repartition(Repartition),
    /// Concatenates the rows of several inputs.
    Union(Union),
    /// Reads from a table source, exposing `projected_schema`, pushed-down
    /// `filters` and an optional `fetch`.
    TableScan(TableScan),
    /// A leaf with no inputs; `produce_one_row` controls whether it emits a row.
    EmptyRelation(EmptyRelation),
    /// Wraps a subquery plan together with the outer columns it references.
    Subquery(Subquery),
    /// Re-qualifies the output of its input under an alias.
    SubqueryAlias(SubqueryAlias),
    /// Applies optional `skip` / `fetch` expressions to its input.
    Limit(Limit),
    /// A statement node (for example `Prepare` or `Execute`).
    Statement(Statement),
    /// Rows of literal expressions; a leaf node.
    Values(Values),
    /// Wraps a plan together with its stringified plans for `EXPLAIN`.
    Explain(Explain),
    /// Wraps a plan for `EXPLAIN ANALYZE`.
    Analyze(Analyze),
    /// A user-defined node (see [`UserDefinedLogicalNode`]).
    Extension(Extension),
    /// Removes duplicate rows, either over all columns or over `ON` expressions.
    Distinct(Distinct),
    /// A data-manipulation statement applied to a target table.
    Dml(DmlStatement),
    /// A data-definition statement.
    Ddl(DdlStatement),
    /// Writes the output of its input to an output URL.
    Copy(CopyTo),
    /// Describes a table; a leaf node with its own `output_schema`.
    DescribeTable(DescribeTable),
    /// Unnests the configured columns of its input; stores its own schema.
    Unnest(Unnest),
    /// A recursive query with a `static_term` and a `recursive_term`; its schema
    /// is taken from the static term.
    RecursiveQuery(RecursiveQuery),
}
295
296impl Default for LogicalPlan {
297 fn default() -> Self {
298 LogicalPlan::EmptyRelation(EmptyRelation {
299 produce_one_row: false,
300 schema: Arc::new(DFSchema::empty()),
301 })
302 }
303}
304
305impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
306 fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
307 &'a self,
308 mut f: F,
309 ) -> Result<TreeNodeRecursion> {
310 f(self)
311 }
312
313 fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
314 self,
315 mut f: F,
316 ) -> Result<Transformed<Self>> {
317 f(self)
318 }
319}
320
321impl LogicalPlan {
322 pub fn schema(&self) -> &DFSchemaRef {
324 match self {
325 LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
326 LogicalPlan::Values(Values { schema, .. }) => schema,
327 LogicalPlan::TableScan(TableScan {
328 projected_schema, ..
329 }) => projected_schema,
330 LogicalPlan::Projection(Projection { schema, .. }) => schema,
331 LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
332 LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
333 LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
334 LogicalPlan::Window(Window { schema, .. }) => schema,
335 LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
336 LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
337 LogicalPlan::Join(Join { schema, .. }) => schema,
338 LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
339 LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
340 LogicalPlan::Statement(statement) => statement.schema(),
341 LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
342 LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
343 LogicalPlan::Explain(explain) => &explain.schema,
344 LogicalPlan::Analyze(analyze) => &analyze.schema,
345 LogicalPlan::Extension(extension) => extension.node.schema(),
346 LogicalPlan::Union(Union { schema, .. }) => schema,
347 LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
348 output_schema
349 }
350 LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
351 LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
352 LogicalPlan::Ddl(ddl) => ddl.schema(),
353 LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
354 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
355 static_term.schema()
357 }
358 }
359 }
360
361 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
364 match self {
365 LogicalPlan::Window(_)
366 | LogicalPlan::Projection(_)
367 | LogicalPlan::Aggregate(_)
368 | LogicalPlan::Unnest(_)
369 | LogicalPlan::Join(_) => self
370 .inputs()
371 .iter()
372 .map(|input| input.schema().as_ref())
373 .collect(),
374 _ => vec![],
375 }
376 }
377
378 pub fn explain_schema() -> SchemaRef {
380 SchemaRef::new(Schema::new(vec![
381 Field::new("plan_type", DataType::Utf8, false),
382 Field::new("plan", DataType::Utf8, false),
383 ]))
384 }
385
386 pub fn describe_schema() -> Schema {
388 Schema::new(vec![
389 Field::new("column_name", DataType::Utf8, false),
390 Field::new("data_type", DataType::Utf8, false),
391 Field::new("is_nullable", DataType::Utf8, false),
392 ])
393 }
394
395 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
412 let mut exprs = vec![];
413 self.apply_expressions(|e| {
414 exprs.push(e.clone());
415 Ok(TreeNodeRecursion::Continue)
416 })
417 .unwrap();
419 exprs
420 }
421
422 pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
425 let mut exprs = vec![];
426 self.apply_expressions(|e| {
427 find_out_reference_exprs(e).into_iter().for_each(|e| {
428 if !exprs.contains(&e) {
429 exprs.push(e)
430 }
431 });
432 Ok(TreeNodeRecursion::Continue)
433 })
434 .unwrap();
436 self.inputs()
437 .into_iter()
438 .flat_map(|child| child.all_out_ref_exprs())
439 .for_each(|e| {
440 if !exprs.contains(&e) {
441 exprs.push(e)
442 }
443 });
444 exprs
445 }
446
447 pub fn inputs(&self) -> Vec<&LogicalPlan> {
451 match self {
452 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
453 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
454 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
455 LogicalPlan::Window(Window { input, .. }) => vec![input],
456 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
457 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
458 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
459 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
460 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
461 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
462 LogicalPlan::Extension(extension) => extension.node.inputs(),
463 LogicalPlan::Union(Union { inputs, .. }) => {
464 inputs.iter().map(|arc| arc.as_ref()).collect()
465 }
466 LogicalPlan::Distinct(
467 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
468 ) => vec![input],
469 LogicalPlan::Explain(explain) => vec![&explain.plan],
470 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
471 LogicalPlan::Dml(write) => vec![&write.input],
472 LogicalPlan::Copy(copy) => vec![©.input],
473 LogicalPlan::Ddl(ddl) => ddl.inputs(),
474 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
475 LogicalPlan::RecursiveQuery(RecursiveQuery {
476 static_term,
477 recursive_term,
478 ..
479 }) => vec![static_term, recursive_term],
480 LogicalPlan::Statement(stmt) => stmt.inputs(),
481 LogicalPlan::TableScan { .. }
483 | LogicalPlan::EmptyRelation { .. }
484 | LogicalPlan::Values { .. }
485 | LogicalPlan::DescribeTable(_) => vec![],
486 }
487 }
488
489 pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
491 let mut using_columns: Vec<HashSet<Column>> = vec![];
492
493 self.apply_with_subqueries(|plan| {
494 if let LogicalPlan::Join(Join {
495 join_constraint: JoinConstraint::Using,
496 on,
497 ..
498 }) = plan
499 {
500 let columns =
502 on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
503 let Some(l) = l.get_as_join_column() else {
504 return internal_err!(
505 "Invalid join key. Expected column, found {l:?}"
506 );
507 };
508 let Some(r) = r.get_as_join_column() else {
509 return internal_err!(
510 "Invalid join key. Expected column, found {r:?}"
511 );
512 };
513 accumu.insert(l.to_owned());
514 accumu.insert(r.to_owned());
515 Result::<_, DataFusionError>::Ok(accumu)
516 })?;
517 using_columns.push(columns);
518 }
519 Ok(TreeNodeRecursion::Continue)
520 })?;
521
522 Ok(using_columns)
523 }
524
525 pub fn head_output_expr(&self) -> Result<Option<Expr>> {
527 match self {
528 LogicalPlan::Projection(projection) => {
529 Ok(Some(projection.expr.as_slice()[0].clone()))
530 }
531 LogicalPlan::Aggregate(agg) => {
532 if agg.group_expr.is_empty() {
533 Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
534 } else {
535 Ok(Some(agg.group_expr.as_slice()[0].clone()))
536 }
537 }
538 LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
539 Ok(Some(select_expr[0].clone()))
540 }
541 LogicalPlan::Filter(Filter { input, .. })
542 | LogicalPlan::Distinct(Distinct::All(input))
543 | LogicalPlan::Sort(Sort { input, .. })
544 | LogicalPlan::Limit(Limit { input, .. })
545 | LogicalPlan::Repartition(Repartition { input, .. })
546 | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
547 LogicalPlan::Join(Join {
548 left,
549 right,
550 join_type,
551 ..
552 }) => match join_type {
553 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
554 if left.schema().fields().is_empty() {
555 right.head_output_expr()
556 } else {
557 left.head_output_expr()
558 }
559 }
560 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
561 left.head_output_expr()
562 }
563 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
564 right.head_output_expr()
565 }
566 },
567 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
568 static_term.head_output_expr()
569 }
570 LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
571 union.schema.qualified_field(0),
572 )))),
573 LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
574 table.projected_schema.qualified_field(0),
575 )))),
576 LogicalPlan::SubqueryAlias(subquery_alias) => {
577 let expr_opt = subquery_alias.input.head_output_expr()?;
578 expr_opt
579 .map(|expr| {
580 Ok(Expr::Column(create_col_from_scalar_expr(
581 &expr,
582 subquery_alias.alias.to_string(),
583 )?))
584 })
585 .map_or(Ok(None), |v| v.map(Some))
586 }
587 LogicalPlan::Subquery(_) => Ok(None),
588 LogicalPlan::EmptyRelation(_)
589 | LogicalPlan::Statement(_)
590 | LogicalPlan::Values(_)
591 | LogicalPlan::Explain(_)
592 | LogicalPlan::Analyze(_)
593 | LogicalPlan::Extension(_)
594 | LogicalPlan::Dml(_)
595 | LogicalPlan::Copy(_)
596 | LogicalPlan::Ddl(_)
597 | LogicalPlan::DescribeTable(_)
598 | LogicalPlan::Unnest(_) => Ok(None),
599 }
600 }
601
602 pub fn recompute_schema(self) -> Result<Self> {
625 match self {
626 LogicalPlan::Projection(Projection {
629 expr,
630 input,
631 schema: _,
632 }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
633 LogicalPlan::Dml(_) => Ok(self),
634 LogicalPlan::Copy(_) => Ok(self),
635 LogicalPlan::Values(Values { schema, values }) => {
636 Ok(LogicalPlan::Values(Values { schema, values }))
638 }
639 LogicalPlan::Filter(Filter { predicate, input }) => {
640 Filter::try_new(predicate, input).map(LogicalPlan::Filter)
641 }
642 LogicalPlan::Repartition(_) => Ok(self),
643 LogicalPlan::Window(Window {
644 input,
645 window_expr,
646 schema: _,
647 }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
648 LogicalPlan::Aggregate(Aggregate {
649 input,
650 group_expr,
651 aggr_expr,
652 schema: _,
653 }) => Aggregate::try_new(input, group_expr, aggr_expr)
654 .map(LogicalPlan::Aggregate),
655 LogicalPlan::Sort(_) => Ok(self),
656 LogicalPlan::Join(Join {
657 left,
658 right,
659 filter,
660 join_type,
661 join_constraint,
662 on,
663 schema: _,
664 null_equality,
665 }) => {
666 let schema =
667 build_join_schema(left.schema(), right.schema(), &join_type)?;
668
669 let new_on: Vec<_> = on
670 .into_iter()
671 .map(|equi_expr| {
672 (equi_expr.0.unalias(), equi_expr.1.unalias())
674 })
675 .collect();
676
677 Ok(LogicalPlan::Join(Join {
678 left,
679 right,
680 join_type,
681 join_constraint,
682 on: new_on,
683 filter,
684 schema: DFSchemaRef::new(schema),
685 null_equality,
686 }))
687 }
688 LogicalPlan::Subquery(_) => Ok(self),
689 LogicalPlan::SubqueryAlias(SubqueryAlias {
690 input,
691 alias,
692 schema: _,
693 }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
694 LogicalPlan::Limit(_) => Ok(self),
695 LogicalPlan::Ddl(_) => Ok(self),
696 LogicalPlan::Extension(Extension { node }) => {
697 let expr = node.expressions();
700 let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
701 Ok(LogicalPlan::Extension(Extension {
702 node: node.with_exprs_and_inputs(expr, inputs)?,
703 }))
704 }
705 LogicalPlan::Union(Union { inputs, schema }) => {
706 let first_input_schema = inputs[0].schema();
707 if schema.fields().len() == first_input_schema.fields().len() {
708 Ok(LogicalPlan::Union(Union { inputs, schema }))
710 } else {
711 Ok(LogicalPlan::Union(Union::try_new(inputs)?))
719 }
720 }
721 LogicalPlan::Distinct(distinct) => {
722 let distinct = match distinct {
723 Distinct::All(input) => Distinct::All(input),
724 Distinct::On(DistinctOn {
725 on_expr,
726 select_expr,
727 sort_expr,
728 input,
729 schema: _,
730 }) => Distinct::On(DistinctOn::try_new(
731 on_expr,
732 select_expr,
733 sort_expr,
734 input,
735 )?),
736 };
737 Ok(LogicalPlan::Distinct(distinct))
738 }
739 LogicalPlan::RecursiveQuery(_) => Ok(self),
740 LogicalPlan::Analyze(_) => Ok(self),
741 LogicalPlan::Explain(_) => Ok(self),
742 LogicalPlan::TableScan(_) => Ok(self),
743 LogicalPlan::EmptyRelation(_) => Ok(self),
744 LogicalPlan::Statement(_) => Ok(self),
745 LogicalPlan::DescribeTable(_) => Ok(self),
746 LogicalPlan::Unnest(Unnest {
747 input,
748 exec_columns,
749 options,
750 ..
751 }) => {
752 unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
754 }
755 }
756 }
757
758 pub fn with_new_exprs(
784 &self,
785 mut expr: Vec<Expr>,
786 inputs: Vec<LogicalPlan>,
787 ) -> Result<LogicalPlan> {
788 match self {
789 LogicalPlan::Projection(Projection { .. }) => {
792 let input = self.only_input(inputs)?;
793 Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
794 }
795 LogicalPlan::Dml(DmlStatement {
796 table_name,
797 target,
798 op,
799 ..
800 }) => {
801 self.assert_no_expressions(expr)?;
802 let input = self.only_input(inputs)?;
803 Ok(LogicalPlan::Dml(DmlStatement::new(
804 table_name.clone(),
805 Arc::clone(target),
806 op.clone(),
807 Arc::new(input),
808 )))
809 }
810 LogicalPlan::Copy(CopyTo {
811 input: _,
812 output_url,
813 file_type,
814 options,
815 partition_by,
816 output_schema: _,
817 }) => {
818 self.assert_no_expressions(expr)?;
819 let input = self.only_input(inputs)?;
820 Ok(LogicalPlan::Copy(CopyTo::new(
821 Arc::new(input),
822 output_url.clone(),
823 partition_by.clone(),
824 Arc::clone(file_type),
825 options.clone(),
826 )))
827 }
828 LogicalPlan::Values(Values { schema, .. }) => {
829 self.assert_no_inputs(inputs)?;
830 Ok(LogicalPlan::Values(Values {
831 schema: Arc::clone(schema),
832 values: expr
833 .chunks_exact(schema.fields().len())
834 .map(|s| s.to_vec())
835 .collect(),
836 }))
837 }
838 LogicalPlan::Filter { .. } => {
839 let predicate = self.only_expr(expr)?;
840 let input = self.only_input(inputs)?;
841
842 Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
843 }
844 LogicalPlan::Repartition(Repartition {
845 partitioning_scheme,
846 ..
847 }) => match partitioning_scheme {
848 Partitioning::RoundRobinBatch(n) => {
849 self.assert_no_expressions(expr)?;
850 let input = self.only_input(inputs)?;
851 Ok(LogicalPlan::Repartition(Repartition {
852 partitioning_scheme: Partitioning::RoundRobinBatch(*n),
853 input: Arc::new(input),
854 }))
855 }
856 Partitioning::Hash(_, n) => {
857 let input = self.only_input(inputs)?;
858 Ok(LogicalPlan::Repartition(Repartition {
859 partitioning_scheme: Partitioning::Hash(expr, *n),
860 input: Arc::new(input),
861 }))
862 }
863 Partitioning::DistributeBy(_) => {
864 let input = self.only_input(inputs)?;
865 Ok(LogicalPlan::Repartition(Repartition {
866 partitioning_scheme: Partitioning::DistributeBy(expr),
867 input: Arc::new(input),
868 }))
869 }
870 },
871 LogicalPlan::Window(Window { window_expr, .. }) => {
872 assert_eq!(window_expr.len(), expr.len());
873 let input = self.only_input(inputs)?;
874 Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
875 }
876 LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
877 let input = self.only_input(inputs)?;
878 let agg_expr = expr.split_off(group_expr.len());
880
881 Aggregate::try_new(Arc::new(input), expr, agg_expr)
882 .map(LogicalPlan::Aggregate)
883 }
884 LogicalPlan::Sort(Sort {
885 expr: sort_expr,
886 fetch,
887 ..
888 }) => {
889 let input = self.only_input(inputs)?;
890 Ok(LogicalPlan::Sort(Sort {
891 expr: expr
892 .into_iter()
893 .zip(sort_expr.iter())
894 .map(|(expr, sort)| sort.with_expr(expr))
895 .collect(),
896 input: Arc::new(input),
897 fetch: *fetch,
898 }))
899 }
900 LogicalPlan::Join(Join {
901 join_type,
902 join_constraint,
903 on,
904 null_equality,
905 ..
906 }) => {
907 let (left, right) = self.only_two_inputs(inputs)?;
908 let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
909
910 let equi_expr_count = on.len() * 2;
911 assert!(expr.len() >= equi_expr_count);
912
913 let filter_expr = if expr.len() > equi_expr_count {
916 expr.pop()
917 } else {
918 None
919 };
920
921 assert_eq!(expr.len(), equi_expr_count);
924 let mut new_on = Vec::with_capacity(on.len());
925 let mut iter = expr.into_iter();
926 while let Some(left) = iter.next() {
927 let Some(right) = iter.next() else {
928 internal_err!("Expected a pair of expressions to construct the join on expression")?
929 };
930
931 new_on.push((left.unalias(), right.unalias()));
933 }
934
935 Ok(LogicalPlan::Join(Join {
936 left: Arc::new(left),
937 right: Arc::new(right),
938 join_type: *join_type,
939 join_constraint: *join_constraint,
940 on: new_on,
941 filter: filter_expr,
942 schema: DFSchemaRef::new(schema),
943 null_equality: *null_equality,
944 }))
945 }
946 LogicalPlan::Subquery(Subquery {
947 outer_ref_columns,
948 spans,
949 ..
950 }) => {
951 self.assert_no_expressions(expr)?;
952 let input = self.only_input(inputs)?;
953 let subquery = LogicalPlanBuilder::from(input).build()?;
954 Ok(LogicalPlan::Subquery(Subquery {
955 subquery: Arc::new(subquery),
956 outer_ref_columns: outer_ref_columns.clone(),
957 spans: spans.clone(),
958 }))
959 }
960 LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
961 self.assert_no_expressions(expr)?;
962 let input = self.only_input(inputs)?;
963 SubqueryAlias::try_new(Arc::new(input), alias.clone())
964 .map(LogicalPlan::SubqueryAlias)
965 }
966 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
967 let old_expr_len = skip.iter().chain(fetch.iter()).count();
968 if old_expr_len != expr.len() {
969 return internal_err!(
970 "Invalid number of new Limit expressions: expected {}, got {}",
971 old_expr_len,
972 expr.len()
973 );
974 }
975 let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
977 let new_skip = skip.as_ref().and_then(|_| expr.pop());
978 let input = self.only_input(inputs)?;
979 Ok(LogicalPlan::Limit(Limit {
980 skip: new_skip.map(Box::new),
981 fetch: new_fetch.map(Box::new),
982 input: Arc::new(input),
983 }))
984 }
985 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
986 name,
987 if_not_exists,
988 or_replace,
989 column_defaults,
990 temporary,
991 ..
992 })) => {
993 self.assert_no_expressions(expr)?;
994 let input = self.only_input(inputs)?;
995 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
996 CreateMemoryTable {
997 input: Arc::new(input),
998 constraints: Constraints::default(),
999 name: name.clone(),
1000 if_not_exists: *if_not_exists,
1001 or_replace: *or_replace,
1002 column_defaults: column_defaults.clone(),
1003 temporary: *temporary,
1004 },
1005 )))
1006 }
1007 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1008 name,
1009 or_replace,
1010 definition,
1011 temporary,
1012 ..
1013 })) => {
1014 self.assert_no_expressions(expr)?;
1015 let input = self.only_input(inputs)?;
1016 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1017 input: Arc::new(input),
1018 name: name.clone(),
1019 or_replace: *or_replace,
1020 temporary: *temporary,
1021 definition: definition.clone(),
1022 })))
1023 }
1024 LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1025 node: e.node.with_exprs_and_inputs(expr, inputs)?,
1026 })),
1027 LogicalPlan::Union(Union { schema, .. }) => {
1028 self.assert_no_expressions(expr)?;
1029 let input_schema = inputs[0].schema();
1030 let schema = if schema.fields().len() == input_schema.fields().len() {
1032 Arc::clone(schema)
1033 } else {
1034 Arc::clone(input_schema)
1035 };
1036 Ok(LogicalPlan::Union(Union {
1037 inputs: inputs.into_iter().map(Arc::new).collect(),
1038 schema,
1039 }))
1040 }
1041 LogicalPlan::Distinct(distinct) => {
1042 let distinct = match distinct {
1043 Distinct::All(_) => {
1044 self.assert_no_expressions(expr)?;
1045 let input = self.only_input(inputs)?;
1046 Distinct::All(Arc::new(input))
1047 }
1048 Distinct::On(DistinctOn {
1049 on_expr,
1050 select_expr,
1051 ..
1052 }) => {
1053 let input = self.only_input(inputs)?;
1054 let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1055 let select_expr = expr.split_off(on_expr.len());
1056 assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
1057 Distinct::On(DistinctOn::try_new(
1058 expr,
1059 select_expr,
1060 None, Arc::new(input),
1062 )?)
1063 }
1064 };
1065 Ok(LogicalPlan::Distinct(distinct))
1066 }
1067 LogicalPlan::RecursiveQuery(RecursiveQuery {
1068 name, is_distinct, ..
1069 }) => {
1070 self.assert_no_expressions(expr)?;
1071 let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1072 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1073 name: name.clone(),
1074 static_term: Arc::new(static_term),
1075 recursive_term: Arc::new(recursive_term),
1076 is_distinct: *is_distinct,
1077 }))
1078 }
1079 LogicalPlan::Analyze(a) => {
1080 self.assert_no_expressions(expr)?;
1081 let input = self.only_input(inputs)?;
1082 Ok(LogicalPlan::Analyze(Analyze {
1083 verbose: a.verbose,
1084 schema: Arc::clone(&a.schema),
1085 input: Arc::new(input),
1086 }))
1087 }
1088 LogicalPlan::Explain(e) => {
1089 self.assert_no_expressions(expr)?;
1090 let input = self.only_input(inputs)?;
1091 Ok(LogicalPlan::Explain(Explain {
1092 verbose: e.verbose,
1093 plan: Arc::new(input),
1094 explain_format: e.explain_format.clone(),
1095 stringified_plans: e.stringified_plans.clone(),
1096 schema: Arc::clone(&e.schema),
1097 logical_optimization_succeeded: e.logical_optimization_succeeded,
1098 }))
1099 }
1100 LogicalPlan::Statement(Statement::Prepare(Prepare {
1101 name,
1102 data_types,
1103 ..
1104 })) => {
1105 self.assert_no_expressions(expr)?;
1106 let input = self.only_input(inputs)?;
1107 Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1108 name: name.clone(),
1109 data_types: data_types.clone(),
1110 input: Arc::new(input),
1111 })))
1112 }
1113 LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1114 self.assert_no_inputs(inputs)?;
1115 Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1116 name: name.clone(),
1117 parameters: expr,
1118 })))
1119 }
1120 LogicalPlan::TableScan(ts) => {
1121 self.assert_no_inputs(inputs)?;
1122 Ok(LogicalPlan::TableScan(TableScan {
1123 filters: expr,
1124 ..ts.clone()
1125 }))
1126 }
1127 LogicalPlan::EmptyRelation(_)
1128 | LogicalPlan::Ddl(_)
1129 | LogicalPlan::Statement(_)
1130 | LogicalPlan::DescribeTable(_) => {
1131 self.assert_no_expressions(expr)?;
1133 self.assert_no_inputs(inputs)?;
1134 Ok(self.clone())
1135 }
1136 LogicalPlan::Unnest(Unnest {
1137 exec_columns: columns,
1138 options,
1139 ..
1140 }) => {
1141 self.assert_no_expressions(expr)?;
1142 let input = self.only_input(inputs)?;
1143 let new_plan =
1145 unnest_with_options(input, columns.clone(), options.clone())?;
1146 Ok(new_plan)
1147 }
1148 }
1149 }
1150
1151 pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1153 match check {
1154 InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1155 InvariantLevel::Executable => assert_executable_invariants(self),
1156 }
1157 }
1158
1159 #[inline]
1161 #[allow(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1163 if !expr.is_empty() {
1164 return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1165 }
1166 Ok(())
1167 }
1168
1169 #[inline]
1171 #[allow(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1173 if !inputs.is_empty() {
1174 return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1175 }
1176 Ok(())
1177 }
1178
1179 #[inline]
1181 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1182 if expr.len() != 1 {
1183 return internal_err!(
1184 "{self:?} should have exactly one expr, got {:?}",
1185 expr
1186 );
1187 }
1188 Ok(expr.remove(0))
1189 }
1190
1191 #[inline]
1193 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1194 if inputs.len() != 1 {
1195 return internal_err!(
1196 "{self:?} should have exactly one input, got {:?}",
1197 inputs
1198 );
1199 }
1200 Ok(inputs.remove(0))
1201 }
1202
1203 #[inline]
1205 fn only_two_inputs(
1206 &self,
1207 mut inputs: Vec<LogicalPlan>,
1208 ) -> Result<(LogicalPlan, LogicalPlan)> {
1209 if inputs.len() != 2 {
1210 return internal_err!(
1211 "{self:?} should have exactly two inputs, got {:?}",
1212 inputs
1213 );
1214 }
1215 let right = inputs.remove(1);
1216 let left = inputs.remove(0);
1217 Ok((left, right))
1218 }
1219
1220 pub fn with_param_values(
1274 self,
1275 param_values: impl Into<ParamValues>,
1276 ) -> Result<LogicalPlan> {
1277 let param_values = param_values.into();
1278 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1279
1280 Ok(
1282 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1283 plan_with_values
1284 {
1285 param_values.verify(&prepare_lp.data_types)?;
1286 Arc::unwrap_or_clone(prepare_lp.input)
1288 } else {
1289 plan_with_values
1290 },
1291 )
1292 }
1293
1294 pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1299 match self {
1300 LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1301 LogicalPlan::Filter(filter) => {
1302 if filter.is_scalar() {
1303 Some(1)
1304 } else {
1305 filter.input.max_rows()
1306 }
1307 }
1308 LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1309 LogicalPlan::Aggregate(Aggregate {
1310 input, group_expr, ..
1311 }) => {
1312 if group_expr
1314 .iter()
1315 .all(|expr| matches!(expr, Expr::Literal(_, _)))
1316 {
1317 Some(1)
1318 } else {
1319 input.max_rows()
1320 }
1321 }
1322 LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1323 match (fetch, input.max_rows()) {
1324 (Some(fetch_limit), Some(input_max)) => {
1325 Some(input_max.min(*fetch_limit))
1326 }
1327 (Some(fetch_limit), None) => Some(*fetch_limit),
1328 (None, Some(input_max)) => Some(input_max),
1329 (None, None) => None,
1330 }
1331 }
1332 LogicalPlan::Join(Join {
1333 left,
1334 right,
1335 join_type,
1336 ..
1337 }) => match join_type {
1338 JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1339 JoinType::Left | JoinType::Right | JoinType::Full => {
1340 match (left.max_rows()?, right.max_rows()?, join_type) {
1341 (0, 0, _) => Some(0),
1342 (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1343 (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1344 (left_max, right_max, _) => Some(left_max * right_max),
1345 }
1346 }
1347 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1348 left.max_rows()
1349 }
1350 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1351 right.max_rows()
1352 }
1353 },
1354 LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1355 LogicalPlan::Union(Union { inputs, .. }) => {
1356 inputs.iter().try_fold(0usize, |mut acc, plan| {
1357 acc += plan.max_rows()?;
1358 Some(acc)
1359 })
1360 }
1361 LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1362 LogicalPlan::EmptyRelation(_) => Some(0),
1363 LogicalPlan::RecursiveQuery(_) => None,
1364 LogicalPlan::Subquery(_) => None,
1365 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1366 LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1367 Ok(FetchType::Literal(s)) => s,
1368 _ => None,
1369 },
1370 LogicalPlan::Distinct(
1371 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1372 ) => input.max_rows(),
1373 LogicalPlan::Values(v) => Some(v.values.len()),
1374 LogicalPlan::Unnest(_) => None,
1375 LogicalPlan::Ddl(_)
1376 | LogicalPlan::Explain(_)
1377 | LogicalPlan::Analyze(_)
1378 | LogicalPlan::Dml(_)
1379 | LogicalPlan::Copy(_)
1380 | LogicalPlan::DescribeTable(_)
1381 | LogicalPlan::Statement(_)
1382 | LogicalPlan::Extension(_) => None,
1383 }
1384 }
1385
1386 pub fn contains_outer_reference(&self) -> bool {
1388 let mut contains = false;
1389 self.apply_expressions(|expr| {
1390 Ok(if expr.contains_outer() {
1391 contains = true;
1392 TreeNodeRecursion::Stop
1393 } else {
1394 TreeNodeRecursion::Continue
1395 })
1396 })
1397 .unwrap();
1398 contains
1399 }
1400
1401 pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1409 match self {
1410 LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1411 .output_expressions()?
1412 .into_iter()
1413 .zip(self.schema().columns())
1414 .collect()),
1415 LogicalPlan::Window(Window {
1416 window_expr,
1417 input,
1418 schema,
1419 }) => {
1420 let mut output_exprs = input.columnized_output_exprs()?;
1428 let input_len = input.schema().fields().len();
1429 output_exprs.extend(
1430 window_expr
1431 .iter()
1432 .zip(schema.columns().into_iter().skip(input_len)),
1433 );
1434 Ok(output_exprs)
1435 }
1436 _ => Ok(vec![]),
1437 }
1438 }
1439}
1440
1441impl LogicalPlan {
1442 pub fn replace_params_with_values(
1449 self,
1450 param_values: &ParamValues,
1451 ) -> Result<LogicalPlan> {
1452 self.transform_up_with_subqueries(|plan| {
1453 let schema = Arc::clone(plan.schema());
1454 let name_preserver = NamePreserver::new(&plan);
1455 plan.map_expressions(|e| {
1456 let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1457 if !has_placeholder {
1458 Ok(Transformed::no(e))
1462 } else {
1463 let original_name = name_preserver.save(&e);
1464 let transformed_expr = e.transform_up(|e| {
1465 if let Expr::Placeholder(Placeholder { id, .. }) = e {
1466 let value = param_values.get_placeholders_with_values(&id)?;
1467 Ok(Transformed::yes(Expr::Literal(value, None)))
1468 } else {
1469 Ok(Transformed::no(e))
1470 }
1471 })?;
1472 Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1474 }
1475 })
1476 })
1477 .map(|res| res.data)
1478 }
1479
1480 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1482 let mut param_names = HashSet::new();
1483 self.apply_with_subqueries(|plan| {
1484 plan.apply_expressions(|expr| {
1485 expr.apply(|expr| {
1486 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1487 param_names.insert(id.clone());
1488 }
1489 Ok(TreeNodeRecursion::Continue)
1490 })
1491 })
1492 })
1493 .map(|_| param_names)
1494 }
1495
1496 pub fn get_parameter_types(
1498 &self,
1499 ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1500 let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();
1501
1502 self.apply_with_subqueries(|plan| {
1503 plan.apply_expressions(|expr| {
1504 expr.apply(|expr| {
1505 if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
1506 let prev = param_types.get(id);
1507 match (prev, data_type) {
1508 (Some(Some(prev)), Some(dt)) => {
1509 if prev != dt {
1510 plan_err!("Conflicting types for {id}")?;
1511 }
1512 }
1513 (_, Some(dt)) => {
1514 param_types.insert(id.clone(), Some(dt.clone()));
1515 }
1516 _ => {
1517 param_types.insert(id.clone(), None);
1518 }
1519 }
1520 }
1521 Ok(TreeNodeRecursion::Continue)
1522 })
1523 })
1524 })
1525 .map(|_| param_types)
1526 }
1527
1528 pub fn display_indent(&self) -> impl Display + '_ {
1560 struct Wrapper<'a>(&'a LogicalPlan);
1563 impl Display for Wrapper<'_> {
1564 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1565 let with_schema = false;
1566 let mut visitor = IndentVisitor::new(f, with_schema);
1567 match self.0.visit_with_subqueries(&mut visitor) {
1568 Ok(_) => Ok(()),
1569 Err(_) => Err(fmt::Error),
1570 }
1571 }
1572 }
1573 Wrapper(self)
1574 }
1575
1576 pub fn display_indent_schema(&self) -> impl Display + '_ {
1603 struct Wrapper<'a>(&'a LogicalPlan);
1606 impl Display for Wrapper<'_> {
1607 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1608 let with_schema = true;
1609 let mut visitor = IndentVisitor::new(f, with_schema);
1610 match self.0.visit_with_subqueries(&mut visitor) {
1611 Ok(_) => Ok(()),
1612 Err(_) => Err(fmt::Error),
1613 }
1614 }
1615 }
1616 Wrapper(self)
1617 }
1618
1619 pub fn display_pg_json(&self) -> impl Display + '_ {
1623 struct Wrapper<'a>(&'a LogicalPlan);
1626 impl Display for Wrapper<'_> {
1627 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1628 let mut visitor = PgJsonVisitor::new(f);
1629 visitor.with_schema(true);
1630 match self.0.visit_with_subqueries(&mut visitor) {
1631 Ok(_) => Ok(()),
1632 Err(_) => Err(fmt::Error),
1633 }
1634 }
1635 }
1636 Wrapper(self)
1637 }
1638
1639 pub fn display_graphviz(&self) -> impl Display + '_ {
1669 struct Wrapper<'a>(&'a LogicalPlan);
1672 impl Display for Wrapper<'_> {
1673 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1674 let mut visitor = GraphvizVisitor::new(f);
1675
1676 visitor.start_graph()?;
1677
1678 visitor.pre_visit_plan("LogicalPlan")?;
1679 self.0
1680 .visit_with_subqueries(&mut visitor)
1681 .map_err(|_| fmt::Error)?;
1682 visitor.post_visit_plan()?;
1683
1684 visitor.set_with_schema(true);
1685 visitor.pre_visit_plan("Detailed LogicalPlan")?;
1686 self.0
1687 .visit_with_subqueries(&mut visitor)
1688 .map_err(|_| fmt::Error)?;
1689 visitor.post_visit_plan()?;
1690
1691 visitor.end_graph()?;
1692 Ok(())
1693 }
1694 }
1695 Wrapper(self)
1696 }
1697
1698 pub fn display(&self) -> impl Display + '_ {
1720 struct Wrapper<'a>(&'a LogicalPlan);
1723 impl Display for Wrapper<'_> {
1724 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1725 match self.0 {
1726 LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, schema: _ }) => {
1727 let rows = if *produce_one_row { 1 } else { 0 };
1728 write!(f, "EmptyRelation: rows={rows}")
1729 },
1730 LogicalPlan::RecursiveQuery(RecursiveQuery {
1731 is_distinct, ..
1732 }) => {
1733 write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1734 }
1735 LogicalPlan::Values(Values { ref values, .. }) => {
1736 let str_values: Vec<_> = values
1737 .iter()
1738 .take(5)
1740 .map(|row| {
1741 let item = row
1742 .iter()
1743 .map(|expr| expr.to_string())
1744 .collect::<Vec<_>>()
1745 .join(", ");
1746 format!("({item})")
1747 })
1748 .collect();
1749
1750 let eclipse = if values.len() > 5 { "..." } else { "" };
1751 write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1752 }
1753
1754 LogicalPlan::TableScan(TableScan {
1755 ref source,
1756 ref table_name,
1757 ref projection,
1758 ref filters,
1759 ref fetch,
1760 ..
1761 }) => {
1762 let projected_fields = match projection {
1763 Some(indices) => {
1764 let schema = source.schema();
1765 let names: Vec<&str> = indices
1766 .iter()
1767 .map(|i| schema.field(*i).name().as_str())
1768 .collect();
1769 format!(" projection=[{}]", names.join(", "))
1770 }
1771 _ => "".to_string(),
1772 };
1773
1774 write!(f, "TableScan: {table_name}{projected_fields}")?;
1775
1776 if !filters.is_empty() {
1777 let mut full_filter = vec![];
1778 let mut partial_filter = vec![];
1779 let mut unsupported_filters = vec![];
1780 let filters: Vec<&Expr> = filters.iter().collect();
1781
1782 if let Ok(results) =
1783 source.supports_filters_pushdown(&filters)
1784 {
1785 filters.iter().zip(results.iter()).for_each(
1786 |(x, res)| match res {
1787 TableProviderFilterPushDown::Exact => {
1788 full_filter.push(x)
1789 }
1790 TableProviderFilterPushDown::Inexact => {
1791 partial_filter.push(x)
1792 }
1793 TableProviderFilterPushDown::Unsupported => {
1794 unsupported_filters.push(x)
1795 }
1796 },
1797 );
1798 }
1799
1800 if !full_filter.is_empty() {
1801 write!(
1802 f,
1803 ", full_filters=[{}]",
1804 expr_vec_fmt!(full_filter)
1805 )?;
1806 };
1807 if !partial_filter.is_empty() {
1808 write!(
1809 f,
1810 ", partial_filters=[{}]",
1811 expr_vec_fmt!(partial_filter)
1812 )?;
1813 }
1814 if !unsupported_filters.is_empty() {
1815 write!(
1816 f,
1817 ", unsupported_filters=[{}]",
1818 expr_vec_fmt!(unsupported_filters)
1819 )?;
1820 }
1821 }
1822
1823 if let Some(n) = fetch {
1824 write!(f, ", fetch={n}")?;
1825 }
1826
1827 Ok(())
1828 }
1829 LogicalPlan::Projection(Projection { ref expr, .. }) => {
1830 write!(f, "Projection:")?;
1831 for (i, expr_item) in expr.iter().enumerate() {
1832 if i > 0 {
1833 write!(f, ",")?;
1834 }
1835 write!(f, " {expr_item}")?;
1836 }
1837 Ok(())
1838 }
1839 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1840 write!(f, "Dml: op=[{op}] table=[{table_name}]")
1841 }
1842 LogicalPlan::Copy(CopyTo {
1843 input: _,
1844 output_url,
1845 file_type,
1846 options,
1847 ..
1848 }) => {
1849 let op_str = options
1850 .iter()
1851 .map(|(k, v)| format!("{k} {v}"))
1852 .collect::<Vec<String>>()
1853 .join(", ");
1854
1855 write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
1856 }
1857 LogicalPlan::Ddl(ddl) => {
1858 write!(f, "{}", ddl.display())
1859 }
1860 LogicalPlan::Filter(Filter {
1861 predicate: ref expr,
1862 ..
1863 }) => write!(f, "Filter: {expr}"),
1864 LogicalPlan::Window(Window {
1865 ref window_expr, ..
1866 }) => {
1867 write!(
1868 f,
1869 "WindowAggr: windowExpr=[[{}]]",
1870 expr_vec_fmt!(window_expr)
1871 )
1872 }
1873 LogicalPlan::Aggregate(Aggregate {
1874 ref group_expr,
1875 ref aggr_expr,
1876 ..
1877 }) => write!(
1878 f,
1879 "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1880 expr_vec_fmt!(group_expr),
1881 expr_vec_fmt!(aggr_expr)
1882 ),
1883 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1884 write!(f, "Sort: ")?;
1885 for (i, expr_item) in expr.iter().enumerate() {
1886 if i > 0 {
1887 write!(f, ", ")?;
1888 }
1889 write!(f, "{expr_item}")?;
1890 }
1891 if let Some(a) = fetch {
1892 write!(f, ", fetch={a}")?;
1893 }
1894
1895 Ok(())
1896 }
1897 LogicalPlan::Join(Join {
1898 on: ref keys,
1899 filter,
1900 join_constraint,
1901 join_type,
1902 ..
1903 }) => {
1904 let join_expr: Vec<String> =
1905 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1906 let filter_expr = filter
1907 .as_ref()
1908 .map(|expr| format!(" Filter: {expr}"))
1909 .unwrap_or_else(|| "".to_string());
1910 let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
1911 "Cross".to_string()
1912 } else {
1913 join_type.to_string()
1914 };
1915 match join_constraint {
1916 JoinConstraint::On => {
1917 write!(
1918 f,
1919 "{} Join: {}{}",
1920 join_type,
1921 join_expr.join(", "),
1922 filter_expr
1923 )
1924 }
1925 JoinConstraint::Using => {
1926 write!(
1927 f,
1928 "{} Join: Using {}{}",
1929 join_type,
1930 join_expr.join(", "),
1931 filter_expr,
1932 )
1933 }
1934 }
1935 }
1936 LogicalPlan::Repartition(Repartition {
1937 partitioning_scheme,
1938 ..
1939 }) => match partitioning_scheme {
1940 Partitioning::RoundRobinBatch(n) => {
1941 write!(f, "Repartition: RoundRobinBatch partition_count={n}")
1942 }
1943 Partitioning::Hash(expr, n) => {
1944 let hash_expr: Vec<String> =
1945 expr.iter().map(|e| format!("{e}")).collect();
1946 write!(
1947 f,
1948 "Repartition: Hash({}) partition_count={}",
1949 hash_expr.join(", "),
1950 n
1951 )
1952 }
1953 Partitioning::DistributeBy(expr) => {
1954 let dist_by_expr: Vec<String> =
1955 expr.iter().map(|e| format!("{e}")).collect();
1956 write!(
1957 f,
1958 "Repartition: DistributeBy({})",
1959 dist_by_expr.join(", "),
1960 )
1961 }
1962 },
1963 LogicalPlan::Limit(limit) => {
1964 let skip_str = match limit.get_skip_type() {
1966 Ok(SkipType::Literal(n)) => n.to_string(),
1967 _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
1968 };
1969 let fetch_str = match limit.get_fetch_type() {
1970 Ok(FetchType::Literal(Some(n))) => n.to_string(),
1971 Ok(FetchType::Literal(None)) => "None".to_string(),
1972 _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
1973 };
1974 write!(
1975 f,
1976 "Limit: skip={skip_str}, fetch={fetch_str}",
1977 )
1978 }
1979 LogicalPlan::Subquery(Subquery { .. }) => {
1980 write!(f, "Subquery:")
1981 }
1982 LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
1983 write!(f, "SubqueryAlias: {alias}")
1984 }
1985 LogicalPlan::Statement(statement) => {
1986 write!(f, "{}", statement.display())
1987 }
1988 LogicalPlan::Distinct(distinct) => match distinct {
1989 Distinct::All(_) => write!(f, "Distinct:"),
1990 Distinct::On(DistinctOn {
1991 on_expr,
1992 select_expr,
1993 sort_expr,
1994 ..
1995 }) => write!(
1996 f,
1997 "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
1998 expr_vec_fmt!(on_expr),
1999 expr_vec_fmt!(select_expr),
2000 if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
2001 ),
2002 },
2003 LogicalPlan::Explain { .. } => write!(f, "Explain"),
2004 LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2005 LogicalPlan::Union(_) => write!(f, "Union"),
2006 LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2007 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2008 write!(f, "DescribeTable")
2009 }
2010 LogicalPlan::Unnest(Unnest {
2011 input: plan,
2012 list_type_columns: list_col_indices,
2013 struct_type_columns: struct_col_indices, .. }) => {
2014 let input_columns = plan.schema().columns();
2015 let list_type_columns = list_col_indices
2016 .iter()
2017 .map(|(i,unnest_info)|
2018 format!("{}|depth={}", &input_columns[*i].to_string(),
2019 unnest_info.depth))
2020 .collect::<Vec<String>>();
2021 let struct_type_columns = struct_col_indices
2022 .iter()
2023 .map(|i| &input_columns[*i])
2024 .collect::<Vec<&Column>>();
2025 write!(f, "Unnest: lists[{}] structs[{}]",
2027 expr_vec_fmt!(list_type_columns),
2028 expr_vec_fmt!(struct_type_columns))
2029 }
2030 }
2031 }
2032 }
2033 Wrapper(self)
2034 }
2035}
2036
2037impl Display for LogicalPlan {
2038 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2039 self.display_indent().fmt(f)
2040 }
2041}
2042
2043impl ToStringifiedPlan for LogicalPlan {
2044 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2045 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2046 }
2047}
2048
2049#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2053pub struct EmptyRelation {
2054 pub produce_one_row: bool,
2056 pub schema: DFSchemaRef,
2058}
2059
2060impl PartialOrd for EmptyRelation {
2062 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2063 self.produce_one_row.partial_cmp(&other.produce_one_row)
2064 }
2065}
2066
2067#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2090pub struct RecursiveQuery {
2091 pub name: String,
2093 pub static_term: Arc<LogicalPlan>,
2095 pub recursive_term: Arc<LogicalPlan>,
2098 pub is_distinct: bool,
2101}
2102
2103#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2107pub struct Values {
2108 pub schema: DFSchemaRef,
2110 pub values: Vec<Vec<Expr>>,
2112}
2113
2114impl PartialOrd for Values {
2116 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2117 self.values.partial_cmp(&other.values)
2118 }
2119}
2120
2121#[derive(Clone, PartialEq, Eq, Hash, Debug)]
2124#[non_exhaustive]
2126pub struct Projection {
2127 pub expr: Vec<Expr>,
2129 pub input: Arc<LogicalPlan>,
2131 pub schema: DFSchemaRef,
2133}
2134
2135impl PartialOrd for Projection {
2137 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2138 match self.expr.partial_cmp(&other.expr) {
2139 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2140 cmp => cmp,
2141 }
2142 }
2143}
2144
2145impl Projection {
2146 pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2148 let projection_schema = projection_schema(&input, &expr)?;
2149 Self::try_new_with_schema(expr, input, projection_schema)
2150 }
2151
2152 pub fn try_new_with_schema(
2154 expr: Vec<Expr>,
2155 input: Arc<LogicalPlan>,
2156 schema: DFSchemaRef,
2157 ) -> Result<Self> {
2158 #[expect(deprecated)]
2159 if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2160 && expr.len() != schema.fields().len()
2161 {
2162 return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
2163 }
2164 Ok(Self {
2165 expr,
2166 input,
2167 schema,
2168 })
2169 }
2170
2171 pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2173 let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2174 Self {
2175 expr,
2176 input,
2177 schema,
2178 }
2179 }
2180}
2181
2182pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2196 let metadata = input.schema().metadata().clone();
2197
2198 let schema =
2199 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2200 .with_functional_dependencies(calc_func_dependencies_for_project(
2201 exprs, input,
2202 )?)?;
2203
2204 Ok(Arc::new(schema))
2205}
2206
2207#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2209#[non_exhaustive]
2211pub struct SubqueryAlias {
2212 pub input: Arc<LogicalPlan>,
2214 pub alias: TableReference,
2216 pub schema: DFSchemaRef,
2218}
2219
2220impl SubqueryAlias {
2221 pub fn try_new(
2222 plan: Arc<LogicalPlan>,
2223 alias: impl Into<TableReference>,
2224 ) -> Result<Self> {
2225 let alias = alias.into();
2226
2227 let aliases = unique_field_aliases(plan.schema().fields());
2233 let is_projection_needed = aliases.iter().any(Option::is_some);
2234
2235 let plan = if is_projection_needed {
2237 let projection_expressions = aliases
2238 .iter()
2239 .zip(plan.schema().iter())
2240 .map(|(alias, (qualifier, field))| {
2241 let column =
2242 Expr::Column(Column::new(qualifier.cloned(), field.name()));
2243 match alias {
2244 None => column,
2245 Some(alias) => {
2246 Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2247 }
2248 }
2249 })
2250 .collect();
2251 let projection = Projection::try_new(projection_expressions, plan)?;
2252 Arc::new(LogicalPlan::Projection(projection))
2253 } else {
2254 plan
2255 };
2256
2257 let fields = plan.schema().fields().clone();
2259 let meta_data = plan.schema().metadata().clone();
2260 let func_dependencies = plan.schema().functional_dependencies().clone();
2261
2262 let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2263 let schema = Schema::from(schema);
2264
2265 let schema = DFSchemaRef::new(
2266 DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
2267 .with_functional_dependencies(func_dependencies)?,
2268 );
2269 Ok(SubqueryAlias {
2270 input: plan,
2271 alias,
2272 schema,
2273 })
2274 }
2275}
2276
2277impl PartialOrd for SubqueryAlias {
2279 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2280 match self.input.partial_cmp(&other.input) {
2281 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2282 cmp => cmp,
2283 }
2284 }
2285}
2286
2287#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2299#[non_exhaustive]
2300pub struct Filter {
2301 pub predicate: Expr,
2303 pub input: Arc<LogicalPlan>,
2305}
2306
2307impl Filter {
2308 pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2313 Self::try_new_internal(predicate, input)
2314 }
2315
2316 #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2319 pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2320 Self::try_new_internal(predicate, input)
2321 }
2322
2323 fn is_allowed_filter_type(data_type: &DataType) -> bool {
2324 match data_type {
2325 DataType::Boolean | DataType::Null => true,
2327 DataType::Dictionary(_, value_type) => {
2328 Filter::is_allowed_filter_type(value_type.as_ref())
2329 }
2330 _ => false,
2331 }
2332 }
2333
2334 fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2335 if let Ok(predicate_type) = predicate.get_type(input.schema()) {
2340 if !Filter::is_allowed_filter_type(&predicate_type) {
2341 return plan_err!(
2342 "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2343 );
2344 }
2345 }
2346
2347 Ok(Self {
2348 predicate: predicate.unalias_nested().data,
2349 input,
2350 })
2351 }
2352
2353 fn is_scalar(&self) -> bool {
2369 let schema = self.input.schema();
2370
2371 let functional_dependencies = self.input.schema().functional_dependencies();
2372 let unique_keys = functional_dependencies.iter().filter(|dep| {
2373 let nullable = dep.nullable
2374 && dep
2375 .source_indices
2376 .iter()
2377 .any(|&source| schema.field(source).is_nullable());
2378 !nullable
2379 && dep.mode == Dependency::Single
2380 && dep.target_indices.len() == schema.fields().len()
2381 });
2382
2383 let exprs = split_conjunction(&self.predicate);
2384 let eq_pred_cols: HashSet<_> = exprs
2385 .iter()
2386 .filter_map(|expr| {
2387 let Expr::BinaryExpr(BinaryExpr {
2388 left,
2389 op: Operator::Eq,
2390 right,
2391 }) = expr
2392 else {
2393 return None;
2394 };
2395 if left == right {
2397 return None;
2398 }
2399
2400 match (left.as_ref(), right.as_ref()) {
2401 (Expr::Column(_), Expr::Column(_)) => None,
2402 (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2403 Some(schema.index_of_column(c).unwrap())
2404 }
2405 _ => None,
2406 }
2407 })
2408 .collect();
2409
2410 for key in unique_keys {
2413 if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2414 return true;
2415 }
2416 }
2417 false
2418 }
2419}
2420
2421#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2436pub struct Window {
2437 pub input: Arc<LogicalPlan>,
2439 pub window_expr: Vec<Expr>,
2441 pub schema: DFSchemaRef,
2443}
2444
2445impl Window {
2446 pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2448 let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2449 .schema()
2450 .iter()
2451 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2452 .collect();
2453 let input_len = fields.len();
2454 let mut window_fields = fields;
2455 let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2456 window_fields.extend_from_slice(expr_fields.as_slice());
2457 let metadata = input.schema().metadata().clone();
2458
2459 let mut window_func_dependencies =
2461 input.schema().functional_dependencies().clone();
2462 window_func_dependencies.extend_target_indices(window_fields.len());
2463
2464 let mut new_dependencies = window_expr
2468 .iter()
2469 .enumerate()
2470 .filter_map(|(idx, expr)| {
2471 let Expr::WindowFunction(window_fun) = expr else {
2472 return None;
2473 };
2474 let WindowFunction {
2475 fun: WindowFunctionDefinition::WindowUDF(udwf),
2476 params: WindowFunctionParams { partition_by, .. },
2477 } = window_fun.as_ref()
2478 else {
2479 return None;
2480 };
2481 if udwf.name() == "row_number" && partition_by.is_empty() {
2484 Some(idx + input_len)
2485 } else {
2486 None
2487 }
2488 })
2489 .map(|idx| {
2490 FunctionalDependence::new(vec![idx], vec![], false)
2491 .with_mode(Dependency::Single)
2492 })
2493 .collect::<Vec<_>>();
2494
2495 if !new_dependencies.is_empty() {
2496 for dependence in new_dependencies.iter_mut() {
2497 dependence.target_indices = (0..window_fields.len()).collect();
2498 }
2499 let new_deps = FunctionalDependencies::new(new_dependencies);
2501 window_func_dependencies.extend(new_deps);
2502 }
2503
2504 if let Some(e) = window_expr.iter().find(|e| {
2506 matches!(
2507 e,
2508 Expr::WindowFunction(wf)
2509 if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
2510 && wf.params.filter.is_some()
2511 )
2512 }) {
2513 return plan_err!(
2514 "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
2515 );
2516 }
2517
2518 Self::try_new_with_schema(
2519 window_expr,
2520 input,
2521 Arc::new(
2522 DFSchema::new_with_metadata(window_fields, metadata)?
2523 .with_functional_dependencies(window_func_dependencies)?,
2524 ),
2525 )
2526 }
2527
2528 pub fn try_new_with_schema(
2529 window_expr: Vec<Expr>,
2530 input: Arc<LogicalPlan>,
2531 schema: DFSchemaRef,
2532 ) -> Result<Self> {
2533 if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
2534 return plan_err!(
2535 "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
2536 window_expr.len(),
2537 schema.fields().len() - input.schema().fields().len()
2538 );
2539 }
2540
2541 Ok(Window {
2542 input,
2543 window_expr,
2544 schema,
2545 })
2546 }
2547}
2548
2549impl PartialOrd for Window {
2551 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2552 match self.input.partial_cmp(&other.input)? {
2553 Ordering::Equal => {} not_equal => return Some(not_equal),
2555 }
2556
2557 match self.window_expr.partial_cmp(&other.window_expr)? {
2558 Ordering::Equal => {} not_equal => return Some(not_equal),
2560 }
2561
2562 if self == other {
2565 Some(Ordering::Equal)
2566 } else {
2567 None
2568 }
2569 }
2570}
2571
2572#[derive(Clone)]
2574pub struct TableScan {
2575 pub table_name: TableReference,
2577 pub source: Arc<dyn TableSource>,
2579 pub projection: Option<Vec<usize>>,
2581 pub projected_schema: DFSchemaRef,
2583 pub filters: Vec<Expr>,
2585 pub fetch: Option<usize>,
2587}
2588
2589impl Debug for TableScan {
2590 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2591 f.debug_struct("TableScan")
2592 .field("table_name", &self.table_name)
2593 .field("source", &"...")
2594 .field("projection", &self.projection)
2595 .field("projected_schema", &self.projected_schema)
2596 .field("filters", &self.filters)
2597 .field("fetch", &self.fetch)
2598 .finish_non_exhaustive()
2599 }
2600}
2601
2602impl PartialEq for TableScan {
2603 fn eq(&self, other: &Self) -> bool {
2604 self.table_name == other.table_name
2605 && self.projection == other.projection
2606 && self.projected_schema == other.projected_schema
2607 && self.filters == other.filters
2608 && self.fetch == other.fetch
2609 }
2610}
2611
2612impl Eq for TableScan {}
2613
2614impl PartialOrd for TableScan {
2617 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2618 #[derive(PartialEq, PartialOrd)]
2619 struct ComparableTableScan<'a> {
2620 pub table_name: &'a TableReference,
2622 pub projection: &'a Option<Vec<usize>>,
2624 pub filters: &'a Vec<Expr>,
2626 pub fetch: &'a Option<usize>,
2628 }
2629 let comparable_self = ComparableTableScan {
2630 table_name: &self.table_name,
2631 projection: &self.projection,
2632 filters: &self.filters,
2633 fetch: &self.fetch,
2634 };
2635 let comparable_other = ComparableTableScan {
2636 table_name: &other.table_name,
2637 projection: &other.projection,
2638 filters: &other.filters,
2639 fetch: &other.fetch,
2640 };
2641 comparable_self.partial_cmp(&comparable_other)
2642 }
2643}
2644
2645impl Hash for TableScan {
2646 fn hash<H: Hasher>(&self, state: &mut H) {
2647 self.table_name.hash(state);
2648 self.projection.hash(state);
2649 self.projected_schema.hash(state);
2650 self.filters.hash(state);
2651 self.fetch.hash(state);
2652 }
2653}
2654
2655impl TableScan {
2656 pub fn try_new(
2659 table_name: impl Into<TableReference>,
2660 table_source: Arc<dyn TableSource>,
2661 projection: Option<Vec<usize>>,
2662 filters: Vec<Expr>,
2663 fetch: Option<usize>,
2664 ) -> Result<Self> {
2665 let table_name = table_name.into();
2666
2667 if table_name.table().is_empty() {
2668 return plan_err!("table_name cannot be empty");
2669 }
2670 let schema = table_source.schema();
2671 let func_dependencies = FunctionalDependencies::new_from_constraints(
2672 table_source.constraints(),
2673 schema.fields.len(),
2674 );
2675 let projected_schema = projection
2676 .as_ref()
2677 .map(|p| {
2678 let projected_func_dependencies =
2679 func_dependencies.project_functional_dependencies(p, p.len());
2680
2681 let df_schema = DFSchema::new_with_metadata(
2682 p.iter()
2683 .map(|i| {
2684 (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
2685 })
2686 .collect(),
2687 schema.metadata.clone(),
2688 )?;
2689 df_schema.with_functional_dependencies(projected_func_dependencies)
2690 })
2691 .unwrap_or_else(|| {
2692 let df_schema =
2693 DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2694 df_schema.with_functional_dependencies(func_dependencies)
2695 })?;
2696 let projected_schema = Arc::new(projected_schema);
2697
2698 Ok(Self {
2699 table_name,
2700 source: table_source,
2701 projection,
2702 projected_schema,
2703 filters,
2704 fetch,
2705 })
2706 }
2707}
2708
2709#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2711pub struct Repartition {
2712 pub input: Arc<LogicalPlan>,
2714 pub partitioning_scheme: Partitioning,
2716}
2717
2718#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2720pub struct Union {
2721 pub inputs: Vec<Arc<LogicalPlan>>,
2723 pub schema: DFSchemaRef,
2725}
2726
2727impl Union {
2728 fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2730 let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2731 Ok(Union { inputs, schema })
2732 }
2733
2734 pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2739 let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2740 Ok(Union { inputs, schema })
2741 }
2742
2743 pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2747 let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2748 let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2749
2750 Ok(Union { inputs, schema })
2751 }
2752
2753 fn rewrite_inputs_from_schema(
2757 schema: &Arc<DFSchema>,
2758 inputs: Vec<Arc<LogicalPlan>>,
2759 ) -> Result<Vec<Arc<LogicalPlan>>> {
2760 let schema_width = schema.iter().count();
2761 let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2762 for input in inputs {
2763 let mut expr = Vec::with_capacity(schema_width);
2767 for column in schema.columns() {
2768 if input
2769 .schema()
2770 .has_column_with_unqualified_name(column.name())
2771 {
2772 expr.push(Expr::Column(column));
2773 } else {
2774 expr.push(
2775 Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2776 );
2777 }
2778 }
2779 wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2780 Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2781 )));
2782 }
2783
2784 Ok(wrapped_inputs)
2785 }
2786
2787 fn derive_schema_from_inputs(
2796 inputs: &[Arc<LogicalPlan>],
2797 loose_types: bool,
2798 by_name: bool,
2799 ) -> Result<DFSchemaRef> {
2800 if inputs.len() < 2 {
2801 return plan_err!("UNION requires at least two inputs");
2802 }
2803
2804 if by_name {
2805 Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2806 } else {
2807 Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2808 }
2809 }
2810
2811 fn derive_schema_from_inputs_by_name(
2812 inputs: &[Arc<LogicalPlan>],
2813 loose_types: bool,
2814 ) -> Result<DFSchemaRef> {
2815 type FieldData<'a> =
2816 (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2817 let mut cols: Vec<(&str, FieldData)> = Vec::new();
2818 for input in inputs.iter() {
2819 for field in input.schema().fields() {
2820 if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2821 cols.iter_mut().find(|(name, _)| name == field.name())
2822 {
2823 if !loose_types && *data_type != field.data_type() {
2824 return plan_err!(
2825 "Found different types for field {}",
2826 field.name()
2827 );
2828 }
2829
2830 metadata.push(field.metadata());
2831 *is_nullable |= field.is_nullable();
2834 *occurrences += 1;
2835 } else {
2836 cols.push((
2837 field.name(),
2838 (
2839 field.data_type(),
2840 field.is_nullable(),
2841 vec![field.metadata()],
2842 1,
2843 ),
2844 ));
2845 }
2846 }
2847 }
2848
2849 let union_fields = cols
2850 .into_iter()
2851 .map(
2852 |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2853 let final_is_nullable = if occurrences == inputs.len() {
2857 is_nullable
2858 } else {
2859 true
2860 };
2861
2862 let mut field =
2863 Field::new(name, data_type.clone(), final_is_nullable);
2864 field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
2865
2866 (None, Arc::new(field))
2867 },
2868 )
2869 .collect::<Vec<(Option<TableReference>, _)>>();
2870
2871 let union_schema_metadata = intersect_metadata_for_union(
2872 inputs.iter().map(|input| input.schema().metadata()),
2873 );
2874
2875 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2877 let schema = Arc::new(schema);
2878
2879 Ok(schema)
2880 }
2881
2882 fn derive_schema_from_inputs_by_position(
2883 inputs: &[Arc<LogicalPlan>],
2884 loose_types: bool,
2885 ) -> Result<DFSchemaRef> {
2886 let first_schema = inputs[0].schema();
2887 let fields_count = first_schema.fields().len();
2888 for input in inputs.iter().skip(1) {
2889 if fields_count != input.schema().fields().len() {
2890 return plan_err!(
2891 "UNION queries have different number of columns: \
2892 left has {} columns whereas right has {} columns",
2893 fields_count,
2894 input.schema().fields().len()
2895 );
2896 }
2897 }
2898
2899 let mut name_counts: HashMap<String, usize> = HashMap::new();
2900 let union_fields = (0..fields_count)
2901 .map(|i| {
2902 let fields = inputs
2903 .iter()
2904 .map(|input| input.schema().field(i))
2905 .collect::<Vec<_>>();
2906 let first_field = fields[0];
2907 let base_name = first_field.name().to_string();
2908
2909 let data_type = if loose_types {
2910 first_field.data_type()
2914 } else {
2915 fields.iter().skip(1).try_fold(
2916 first_field.data_type(),
2917 |acc, field| {
2918 if acc != field.data_type() {
2919 return plan_err!(
2920 "UNION field {i} have different type in inputs: \
2921 left has {} whereas right has {}",
2922 first_field.data_type(),
2923 field.data_type()
2924 );
2925 }
2926 Ok(acc)
2927 },
2928 )?
2929 };
2930 let nullable = fields.iter().any(|field| field.is_nullable());
2931
2932 let name = if let Some(count) = name_counts.get_mut(&base_name) {
2934 *count += 1;
2935 format!("{base_name}_{count}")
2936 } else {
2937 name_counts.insert(base_name.clone(), 0);
2938 base_name
2939 };
2940
2941 let mut field = Field::new(&name, data_type.clone(), nullable);
2942 let field_metadata = intersect_metadata_for_union(
2943 fields.iter().map(|field| field.metadata()),
2944 );
2945 field.set_metadata(field_metadata);
2946 Ok((None, Arc::new(field)))
2947 })
2948 .collect::<Result<_>>()?;
2949 let union_schema_metadata = intersect_metadata_for_union(
2950 inputs.iter().map(|input| input.schema().metadata()),
2951 );
2952
2953 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2955 let schema = Arc::new(schema);
2956
2957 Ok(schema)
2958 }
2959}
2960
2961impl PartialOrd for Union {
2963 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2964 self.inputs.partial_cmp(&other.inputs)
2965 }
2966}
2967
/// Logical plan node carrying an Arrow schema together with this node's
/// output schema.
///
/// NOTE(review): presumably the node behind `DESCRIBE <table>`, with `schema`
/// being the described table's schema — confirm against the SQL planner.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Arrow schema held by this node (presumably the table being described).
    pub schema: Arc<Schema>,
    /// Output schema of this node.
    pub output_schema: DFSchemaRef,
}
2997
2998impl PartialOrd for DescribeTable {
3001 fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
3002 None
3004 }
3005}
3006
/// Output format of an `EXPLAIN` statement.
///
/// Parsed case-insensitively by the `FromStr` implementation from
/// `"indent"`, `"tree"`, `"pgjson"` and `"graphviz"`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExplainFormat {
    /// Selected by `"indent"`; also the format used by `ExplainOption::default()`.
    Indent,
    /// Selected by `"tree"`.
    Tree,
    /// Selected by `"pgjson"`.
    PostgresJSON,
    /// Selected by `"graphviz"`.
    Graphviz,
}
3137
3138impl FromStr for ExplainFormat {
3140 type Err = DataFusionError;
3141
3142 fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
3143 match format.to_lowercase().as_str() {
3144 "indent" => Ok(ExplainFormat::Indent),
3145 "tree" => Ok(ExplainFormat::Tree),
3146 "pgjson" => Ok(ExplainFormat::PostgresJSON),
3147 "graphviz" => Ok(ExplainFormat::Graphviz),
3148 _ => {
3149 plan_err!("Invalid explain format. Expected 'indent', 'tree', 'pgjson' or 'graphviz'. Got '{format}'")
3150 }
3151 }
3152 }
3153}
3154
/// Options for an `EXPLAIN` statement, adjustable via the `with_*` builders.
///
/// The default is `verbose: false`, `analyze: false`,
/// `format: ExplainFormat::Indent`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Whether verbose output is requested.
    pub verbose: bool,
    /// Whether `ANALYZE` is requested (presumably executes the plan — confirm).
    pub analyze: bool,
    /// Output format of the explanation.
    pub format: ExplainFormat,
}
3165
3166impl Default for ExplainOption {
3167 fn default() -> Self {
3168 ExplainOption {
3169 verbose: false,
3170 analyze: false,
3171 format: ExplainFormat::Indent,
3172 }
3173 }
3174}
3175
3176impl ExplainOption {
3177 pub fn with_verbose(mut self, verbose: bool) -> Self {
3179 self.verbose = verbose;
3180 self
3181 }
3182
3183 pub fn with_analyze(mut self, analyze: bool) -> Self {
3185 self.analyze = analyze;
3186 self
3187 }
3188
3189 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3191 self.format = format;
3192 self
3193 }
3194}
3195
/// Logical plan node for an `EXPLAIN` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Whether verbose output was requested.
    pub verbose: bool,
    /// Format used to render the explanation.
    pub explain_format: ExplainFormat,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// Textual renderings of plans collected for the output.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Output schema of this node.
    pub schema: DFSchemaRef,
    /// Whether logical optimization succeeded (presumably consulted by the
    /// physical planner — confirm).
    pub logical_optimization_succeeded: bool,
}
3218
3219impl PartialOrd for Explain {
3221 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3222 #[derive(PartialEq, PartialOrd)]
3223 struct ComparableExplain<'a> {
3224 pub verbose: &'a bool,
3226 pub plan: &'a Arc<LogicalPlan>,
3228 pub stringified_plans: &'a Vec<StringifiedPlan>,
3230 pub logical_optimization_succeeded: &'a bool,
3232 }
3233 let comparable_self = ComparableExplain {
3234 verbose: &self.verbose,
3235 plan: &self.plan,
3236 stringified_plans: &self.stringified_plans,
3237 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3238 };
3239 let comparable_other = ComparableExplain {
3240 verbose: &other.verbose,
3241 plan: &other.plan,
3242 stringified_plans: &other.stringified_plans,
3243 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3244 };
3245 comparable_self.partial_cmp(&comparable_other)
3246 }
3247}
3248
/// Logical plan node that analyzes `input`.
///
/// NOTE(review): presumably produced by `EXPLAIN ANALYZE` — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Whether verbose output was requested.
    pub verbose: bool,
    /// The plan being analyzed.
    pub input: Arc<LogicalPlan>,
    /// Output schema of this node.
    pub schema: DFSchemaRef,
}
3260
3261impl PartialOrd for Analyze {
3263 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3264 match self.verbose.partial_cmp(&other.verbose) {
3265 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3266 cmp => cmp,
3267 }
3268 }
3269}
3270
/// Logical plan node wrapping a user-defined node.
///
/// `Hash` is derived while `PartialEq` is implemented by hand (delegating to
/// the wrapped node), which is why the clippy lint below is allowed.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The wrapped user-defined node.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3281
3282impl PartialEq for Extension {
3286 fn eq(&self, other: &Self) -> bool {
3287 self.node.eq(&other.node)
3288 }
3289}
3290
3291impl PartialOrd for Extension {
3292 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3293 self.node.partial_cmp(&other.node)
3294 }
3295}
3296
/// Skips and/or limits the rows produced by `input`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip, interpreted by `Limit::get_skip_type`;
    /// `None` means no offset.
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows, interpreted by `Limit::get_fetch_type`;
    /// `None` means no limit.
    pub fetch: Option<Box<Expr>>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
}
3308
/// Classification of a `Limit`'s skip expression, returned by
/// `Limit::get_skip_type`.
pub enum SkipType {
    /// The skip is a known, non-negative number of rows.
    Literal(usize),
    /// The skip is an expression other than an `Int64` literal.
    UnsupportedExpr,
}
3316
/// Classification of a `Limit`'s fetch expression, returned by
/// `Limit::get_fetch_type`.
pub enum FetchType {
    /// The fetch is known: `Some(n)` rows, or `None` for no limit.
    Literal(Option<usize>),
    /// The fetch is an expression other than an `Int64` literal.
    UnsupportedExpr,
}
3325
3326impl Limit {
3327 pub fn get_skip_type(&self) -> Result<SkipType> {
3329 match self.skip.as_deref() {
3330 Some(expr) => match *expr {
3331 Expr::Literal(ScalarValue::Int64(s), _) => {
3332 let s = s.unwrap_or(0);
3334 if s >= 0 {
3335 Ok(SkipType::Literal(s as usize))
3336 } else {
3337 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3338 }
3339 }
3340 _ => Ok(SkipType::UnsupportedExpr),
3341 },
3342 None => Ok(SkipType::Literal(0)),
3344 }
3345 }
3346
3347 pub fn get_fetch_type(&self) -> Result<FetchType> {
3349 match self.fetch.as_deref() {
3350 Some(expr) => match *expr {
3351 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3352 if s >= 0 {
3353 Ok(FetchType::Literal(Some(s as usize)))
3354 } else {
3355 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3356 }
3357 }
3358 Expr::Literal(ScalarValue::Int64(None), _) => {
3359 Ok(FetchType::Literal(None))
3360 }
3361 _ => Ok(FetchType::UnsupportedExpr),
3362 },
3363 None => Ok(FetchType::Literal(None)),
3364 }
3365 }
3366}
3367
/// Removes duplicate rows from its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `DISTINCT` over the wrapped input plan (presumably over all
    /// columns — confirm).
    All(Arc<LogicalPlan>),
    /// `DISTINCT ON (...)`, see `DistinctOn`.
    On(DistinctOn),
}
3376
3377impl Distinct {
3378 pub fn input(&self) -> &Arc<LogicalPlan> {
3380 match self {
3381 Distinct::All(input) => input,
3382 Distinct::On(DistinctOn { input, .. }) => input,
3383 }
3384 }
3385}
3386
/// `SELECT DISTINCT ON (on_expr) select_expr ... [ORDER BY sort_expr]`.
///
/// Build it with `DistinctOn::try_new`, which normalizes `on_expr` and
/// validates `sort_expr`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `ON` expressions (normalized against `input` by `try_new`).
    pub on_expr: Vec<Expr>,
    /// The selected expressions; they determine `schema`'s fields.
    pub select_expr: Vec<Expr>,
    /// Optional `ORDER BY`; when present, `on_expr` must be a prefix of it.
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Output schema, derived from `select_expr` and `input`.
    pub schema: DFSchemaRef,
}
3403
3404impl DistinctOn {
3405 pub fn try_new(
3407 on_expr: Vec<Expr>,
3408 select_expr: Vec<Expr>,
3409 sort_expr: Option<Vec<SortExpr>>,
3410 input: Arc<LogicalPlan>,
3411 ) -> Result<Self> {
3412 if on_expr.is_empty() {
3413 return plan_err!("No `ON` expressions provided");
3414 }
3415
3416 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3417 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3418 .into_iter()
3419 .collect();
3420
3421 let dfschema = DFSchema::new_with_metadata(
3422 qualified_fields,
3423 input.schema().metadata().clone(),
3424 )?;
3425
3426 let mut distinct_on = DistinctOn {
3427 on_expr,
3428 select_expr,
3429 sort_expr: None,
3430 input,
3431 schema: Arc::new(dfschema),
3432 };
3433
3434 if let Some(sort_expr) = sort_expr {
3435 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3436 }
3437
3438 Ok(distinct_on)
3439 }
3440
3441 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3445 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3446
3447 let mut matched = true;
3449 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3450 if on != &sort.expr {
3451 matched = false;
3452 break;
3453 }
3454 }
3455
3456 if self.on_expr.len() > sort_expr.len() || !matched {
3457 return plan_err!(
3458 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3459 );
3460 }
3461
3462 self.sort_expr = Some(sort_expr);
3463 Ok(self)
3464 }
3465}
3466
3467impl PartialOrd for DistinctOn {
3469 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3470 #[derive(PartialEq, PartialOrd)]
3471 struct ComparableDistinctOn<'a> {
3472 pub on_expr: &'a Vec<Expr>,
3474 pub select_expr: &'a Vec<Expr>,
3476 pub sort_expr: &'a Option<Vec<SortExpr>>,
3480 pub input: &'a Arc<LogicalPlan>,
3482 }
3483 let comparable_self = ComparableDistinctOn {
3484 on_expr: &self.on_expr,
3485 select_expr: &self.select_expr,
3486 sort_expr: &self.sort_expr,
3487 input: &self.input,
3488 };
3489 let comparable_other = ComparableDistinctOn {
3490 on_expr: &other.on_expr,
3491 select_expr: &other.select_expr,
3492 sort_expr: &other.sort_expr,
3493 input: &other.input,
3494 };
3495 comparable_self.partial_cmp(&comparable_other)
3496 }
3497}
3498
/// Aggregates `input` by `group_expr`, computing `aggr_expr`.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must build it via
/// `Aggregate::try_new` or `Aggregate::try_new_with_schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct Aggregate {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions (possibly a single grouping-set expression).
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions.
    pub aggr_expr: Vec<Expr>,
    /// Output schema: grouping fields, then (for grouping sets) the internal
    /// grouping-id field, then aggregate fields.
    pub schema: DFSchemaRef,
}
3524
impl Aggregate {
    /// Builds an `Aggregate`, deriving its output schema from the expressions.
    ///
    /// Schema fields are, in order: the fields of the grouping expressions,
    /// then (only when the grouping is a single grouping-set expression) a
    /// non-nullable `INTERNAL_GROUPING_ID` field, then the fields of
    /// `aggr_expr`. Input schema metadata is carried over. Validation and
    /// functional-dependency computation are delegated to
    /// `try_new_with_schema`.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        let group_expr = enumerate_grouping_sets(group_expr)?;

        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        if is_grouping_set {
            // With grouping sets, any grouping column may be absent from a
            // given set, so every grouping field is made nullable.
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            // The grouping-id type is sized by the number of grouping fields
            // (see `grouping_id_type`).
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Builds an `Aggregate` from a caller-provided output schema.
    ///
    /// The schema is augmented with functional dependencies computed by
    /// `calc_func_dependencies_for_aggregate`.
    ///
    /// # Errors
    /// Plan error if there are neither grouping nor aggregate expressions, or
    /// if the schema's field count differs from the number of grouping
    /// output expressions plus aggregate expressions.
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression. \
                Aggregate without grouping expressions nor aggregate expressions is \
                logically equivalent to, but less efficient than, VALUES producing \
                single row. Please use VALUES instead."
            );
        }
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// True when the grouping is exactly one grouping-set expression.
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }

    /// Returns one expression per output field, in schema order: grouping
    /// expressions, a column reference to `INTERNAL_GROUPING_ID` (grouping
    /// sets only), then aggregate expressions.
    fn output_expressions(&self) -> Result<Vec<&Expr>> {
        // A `'static` expression so a reference to it can be returned
        // alongside references into `self`.
        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
        });
        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
        if self.is_grouping_set() {
            exprs.push(&INTERNAL_ID_EXPR);
        }
        exprs.extend(self.aggr_expr.iter());
        debug_assert!(exprs.len() == self.schema.fields().len());
        Ok(exprs)
    }

    /// Number of grouping output expressions, as computed by
    /// `grouping_set_expr_count`.
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }

    /// Smallest unsigned integer type for the grouping-id column given
    /// `group_exprs` grouping expressions: `UInt8` up to 8, `UInt16` up to 16,
    /// `UInt32` up to 32, otherwise `UInt64`.
    ///
    /// NOTE(review): the thresholds suggest one bit per grouping expression;
    /// more than 64 would not fit in `UInt64` — confirm how that is handled.
    pub fn grouping_id_type(group_exprs: usize) -> DataType {
        if group_exprs <= 8 {
            DataType::UInt8
        } else if group_exprs <= 16 {
            DataType::UInt16
        } else if group_exprs <= 32 {
            DataType::UInt32
        } else {
            DataType::UInt64
        }
    }

    /// Name of the internal grouping-id column added for grouping sets.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
3669
3670impl PartialOrd for Aggregate {
3672 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3673 match self.input.partial_cmp(&other.input) {
3674 Some(Ordering::Equal) => {
3675 match self.group_expr.partial_cmp(&other.group_expr) {
3676 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3677 cmp => cmp,
3678 }
3679 }
3680 cmp => cmp,
3681 }
3682 }
3683}
3684
3685fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3687 group_expr
3688 .iter()
3689 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3690}
3691
3692fn calc_func_dependencies_for_aggregate(
3694 group_expr: &[Expr],
3696 input: &LogicalPlan,
3698 aggr_schema: &DFSchema,
3700) -> Result<FunctionalDependencies> {
3701 if !contains_grouping_set(group_expr) {
3707 let group_by_expr_names = group_expr
3708 .iter()
3709 .map(|item| item.schema_name().to_string())
3710 .collect::<IndexSet<_>>()
3711 .into_iter()
3712 .collect::<Vec<_>>();
3713 let aggregate_func_dependencies = aggregate_functional_dependencies(
3714 input.schema(),
3715 &group_by_expr_names,
3716 aggr_schema,
3717 );
3718 Ok(aggregate_func_dependencies)
3719 } else {
3720 Ok(FunctionalDependencies::empty())
3721 }
3722}
3723
3724fn calc_func_dependencies_for_project(
3727 exprs: &[Expr],
3728 input: &LogicalPlan,
3729) -> Result<FunctionalDependencies> {
3730 let input_fields = input.schema().field_names();
3731 let proj_indices = exprs
3733 .iter()
3734 .map(|expr| match expr {
3735 #[expect(deprecated)]
3736 Expr::Wildcard { qualifier, options } => {
3737 let wildcard_fields = exprlist_to_fields(
3738 vec![&Expr::Wildcard {
3739 qualifier: qualifier.clone(),
3740 options: options.clone(),
3741 }],
3742 input,
3743 )?;
3744 Ok::<_, DataFusionError>(
3745 wildcard_fields
3746 .into_iter()
3747 .filter_map(|(qualifier, f)| {
3748 let flat_name = qualifier
3749 .map(|t| format!("{}.{}", t, f.name()))
3750 .unwrap_or_else(|| f.name().clone());
3751 input_fields.iter().position(|item| *item == flat_name)
3752 })
3753 .collect::<Vec<_>>(),
3754 )
3755 }
3756 Expr::Alias(alias) => {
3757 let name = format!("{}", alias.expr);
3758 Ok(input_fields
3759 .iter()
3760 .position(|item| *item == name)
3761 .map(|i| vec![i])
3762 .unwrap_or(vec![]))
3763 }
3764 _ => {
3765 let name = format!("{expr}");
3766 Ok(input_fields
3767 .iter()
3768 .position(|item| *item == name)
3769 .map(|i| vec![i])
3770 .unwrap_or(vec![]))
3771 }
3772 })
3773 .collect::<Result<Vec<_>>>()?
3774 .into_iter()
3775 .flatten()
3776 .collect::<Vec<_>>();
3777
3778 Ok(input
3779 .schema()
3780 .functional_dependencies()
3781 .project_functional_dependencies(&proj_indices, exprs.len()))
3782}
3783
/// Sorts the rows of `input` by `expr`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// Sort expressions, in priority order.
    pub expr: Vec<SortExpr>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Optional row count (presumably a top-N limit on the sorted output —
    /// confirm).
    pub fetch: Option<usize>,
}
3794
/// Joins `left` and `right`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input.
    pub left: Arc<LogicalPlan>,
    /// Right input.
    pub right: Arc<LogicalPlan>,
    /// Equi-join key pairs `(left_expr, right_expr)`.
    pub on: Vec<(Expr, Expr)>,
    /// Optional additional (non-equi) join condition.
    pub filter: Option<Expr>,
    /// Join type (inner, left, right, full, ...).
    pub join_type: JoinType,
    /// How the join condition was specified (presumably `ON` vs `USING` —
    /// confirm).
    pub join_constraint: JoinConstraint,
    /// Output schema, built by `build_join_schema` in the constructors.
    pub schema: DFSchemaRef,
    /// Null-equality semantics for the equi-join keys.
    pub null_equality: NullEquality,
}
3815
3816impl Join {
3817 pub fn try_new(
3836 left: Arc<LogicalPlan>,
3837 right: Arc<LogicalPlan>,
3838 on: Vec<(Expr, Expr)>,
3839 filter: Option<Expr>,
3840 join_type: JoinType,
3841 join_constraint: JoinConstraint,
3842 null_equality: NullEquality,
3843 ) -> Result<Self> {
3844 let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3845
3846 Ok(Join {
3847 left,
3848 right,
3849 on,
3850 filter,
3851 join_type,
3852 join_constraint,
3853 schema: Arc::new(join_schema),
3854 null_equality,
3855 })
3856 }
3857
3858 pub fn try_new_with_project_input(
3861 original: &LogicalPlan,
3862 left: Arc<LogicalPlan>,
3863 right: Arc<LogicalPlan>,
3864 column_on: (Vec<Column>, Vec<Column>),
3865 ) -> Result<(Self, bool)> {
3866 let original_join = match original {
3867 LogicalPlan::Join(join) => join,
3868 _ => return plan_err!("Could not create join with project input"),
3869 };
3870
3871 let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3872 let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3873
3874 let mut requalified = false;
3875
3876 if original_join.join_type == JoinType::Inner
3879 || original_join.join_type == JoinType::Left
3880 || original_join.join_type == JoinType::Right
3881 || original_join.join_type == JoinType::Full
3882 {
3883 (left_sch, right_sch, requalified) =
3884 requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3885 }
3886
3887 let on: Vec<(Expr, Expr)> = column_on
3888 .0
3889 .into_iter()
3890 .zip(column_on.1)
3891 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3892 .collect();
3893
3894 let join_schema = build_join_schema(
3895 left_sch.schema(),
3896 right_sch.schema(),
3897 &original_join.join_type,
3898 )?;
3899
3900 Ok((
3901 Join {
3902 left,
3903 right,
3904 on,
3905 filter: original_join.filter.clone(),
3906 join_type: original_join.join_type,
3907 join_constraint: original_join.join_constraint,
3908 schema: Arc::new(join_schema),
3909 null_equality: original_join.null_equality,
3910 },
3911 requalified,
3912 ))
3913 }
3914}
3915
3916impl PartialOrd for Join {
3918 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3919 #[derive(PartialEq, PartialOrd)]
3920 struct ComparableJoin<'a> {
3921 pub left: &'a Arc<LogicalPlan>,
3923 pub right: &'a Arc<LogicalPlan>,
3925 pub on: &'a Vec<(Expr, Expr)>,
3927 pub filter: &'a Option<Expr>,
3929 pub join_type: &'a JoinType,
3931 pub join_constraint: &'a JoinConstraint,
3933 pub null_equality: &'a NullEquality,
3935 }
3936 let comparable_self = ComparableJoin {
3937 left: &self.left,
3938 right: &self.right,
3939 on: &self.on,
3940 filter: &self.filter,
3941 join_type: &self.join_type,
3942 join_constraint: &self.join_constraint,
3943 null_equality: &self.null_equality,
3944 };
3945 let comparable_other = ComparableJoin {
3946 left: &other.left,
3947 right: &other.right,
3948 on: &other.on,
3949 filter: &other.filter,
3950 join_type: &other.join_type,
3951 join_constraint: &other.join_constraint,
3952 null_equality: &other.null_equality,
3953 };
3954 comparable_self.partial_cmp(&comparable_other)
3955 }
3956}
3957
/// A subquery plan together with the outer-query expressions it references.
///
/// `Debug` is implemented by hand and prints only `<subquery>`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery's plan.
    pub subquery: Arc<LogicalPlan>,
    /// Outer-query expressions referenced by the subquery.
    pub outer_ref_columns: Vec<Expr>,
    /// Source locations (presumably for diagnostics — confirm).
    pub spans: Spans,
}
3968
3969impl Normalizeable for Subquery {
3970 fn can_normalize(&self) -> bool {
3971 false
3972 }
3973}
3974
3975impl NormalizeEq for Subquery {
3976 fn normalize_eq(&self, other: &Self) -> bool {
3977 *self.subquery == *other.subquery
3979 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3980 && self
3981 .outer_ref_columns
3982 .iter()
3983 .zip(other.outer_ref_columns.iter())
3984 .all(|(a, b)| a.normalize_eq(b))
3985 }
3986}
3987
3988impl Subquery {
3989 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3990 match plan {
3991 Expr::ScalarSubquery(it) => Ok(it),
3992 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3993 _ => plan_err!("Could not coerce into ScalarSubquery!"),
3994 }
3995 }
3996
3997 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3998 Subquery {
3999 subquery: plan,
4000 outer_ref_columns: self.outer_ref_columns.clone(),
4001 spans: Spans::new(),
4002 }
4003 }
4004}
4005
4006impl Debug for Subquery {
4007 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4008 write!(f, "<subquery>")
4009 }
4010}
4011
/// Partitioning scheme for a repartition.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Round-robin distribution; the `usize` is presumably the number of
    /// partitions — confirm.
    RoundRobinBatch(usize),
    /// Hash distribution on the given expressions; the `usize` is presumably
    /// the number of partitions — confirm.
    Hash(Vec<Expr>, usize),
    /// Distribution by the given expressions without an explicit partition
    /// count.
    DistributeBy(Vec<Expr>),
}
4027
/// Output column and unnest depth for one unnested list column.
///
/// Displayed as `{output_column}|depth={depth}`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// Column the unnested values are written to.
    pub output_column: Column,
    /// Number of list levels unnested (1 = one level).
    pub depth: usize,
}
4052
4053impl Display for ColumnUnnestList {
4054 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4055 write!(f, "{}|depth={}", self.output_column, self.depth)
4056 }
4057}
4058
/// Unnests list and struct columns of `input`. Build it with `Unnest::try_new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Input columns to unnest.
    pub exec_columns: Vec<Column>,
    /// For each unnested list: `(input field index, output column and depth)`.
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Input field indices of unnested struct columns.
    pub struct_type_columns: Vec<usize>,
    /// For each output field, the index of the input field it was derived
    /// from.
    pub dependency_indices: Vec<usize>,
    /// Output schema.
    pub schema: DFSchemaRef,
    /// Unnest options (including per-column recursion settings).
    pub options: UnnestOptions,
}
4081
4082impl PartialOrd for Unnest {
4084 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4085 #[derive(PartialEq, PartialOrd)]
4086 struct ComparableUnnest<'a> {
4087 pub input: &'a Arc<LogicalPlan>,
4089 pub exec_columns: &'a Vec<Column>,
4091 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4094 pub struct_type_columns: &'a Vec<usize>,
4097 pub dependency_indices: &'a Vec<usize>,
4100 pub options: &'a UnnestOptions,
4102 }
4103 let comparable_self = ComparableUnnest {
4104 input: &self.input,
4105 exec_columns: &self.exec_columns,
4106 list_type_columns: &self.list_type_columns,
4107 struct_type_columns: &self.struct_type_columns,
4108 dependency_indices: &self.dependency_indices,
4109 options: &self.options,
4110 };
4111 let comparable_other = ComparableUnnest {
4112 input: &other.input,
4113 exec_columns: &other.exec_columns,
4114 list_type_columns: &other.list_type_columns,
4115 struct_type_columns: &other.struct_type_columns,
4116 dependency_indices: &other.dependency_indices,
4117 options: &other.options,
4118 };
4119 comparable_self.partial_cmp(&comparable_other)
4120 }
4121}
4122
4123impl Unnest {
4124 pub fn try_new(
4125 input: Arc<LogicalPlan>,
4126 exec_columns: Vec<Column>,
4127 options: UnnestOptions,
4128 ) -> Result<Self> {
4129 if exec_columns.is_empty() {
4130 return plan_err!("unnest plan requires at least 1 column to unnest");
4131 }
4132
4133 let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
4134 let mut struct_columns = vec![];
4135 let indices_to_unnest = exec_columns
4136 .iter()
4137 .map(|c| Ok((input.schema().index_of_column(c)?, c)))
4138 .collect::<Result<HashMap<usize, &Column>>>()?;
4139
4140 let input_schema = input.schema();
4141
4142 let mut dependency_indices = vec![];
4143 let fields = input_schema
4159 .iter()
4160 .enumerate()
4161 .map(|(index, (original_qualifier, original_field))| {
4162 match indices_to_unnest.get(&index) {
4163 Some(column_to_unnest) => {
4164 let recursions_on_column = options
4165 .recursions
4166 .iter()
4167 .filter(|p| -> bool { &p.input_column == *column_to_unnest })
4168 .collect::<Vec<_>>();
4169 let mut transformed_columns = recursions_on_column
4170 .iter()
4171 .map(|r| {
4172 list_columns.push((
4173 index,
4174 ColumnUnnestList {
4175 output_column: r.output_column.clone(),
4176 depth: r.depth,
4177 },
4178 ));
4179 Ok(get_unnested_columns(
4180 &r.output_column.name,
4181 original_field.data_type(),
4182 r.depth,
4183 )?
4184 .into_iter()
4185 .next()
4186 .unwrap()) })
4188 .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
4189 if transformed_columns.is_empty() {
4190 transformed_columns = get_unnested_columns(
4191 &column_to_unnest.name,
4192 original_field.data_type(),
4193 1,
4194 )?;
4195 match original_field.data_type() {
4196 DataType::Struct(_) => {
4197 struct_columns.push(index);
4198 }
4199 DataType::List(_)
4200 | DataType::FixedSizeList(_, _)
4201 | DataType::LargeList(_) => {
4202 list_columns.push((
4203 index,
4204 ColumnUnnestList {
4205 output_column: Column::from_name(
4206 &column_to_unnest.name,
4207 ),
4208 depth: 1,
4209 },
4210 ));
4211 }
4212 _ => {}
4213 };
4214 }
4215
4216 dependency_indices.extend(std::iter::repeat_n(
4218 index,
4219 transformed_columns.len(),
4220 ));
4221 Ok(transformed_columns
4222 .iter()
4223 .map(|(col, field)| {
4224 (col.relation.to_owned(), field.to_owned())
4225 })
4226 .collect())
4227 }
4228 None => {
4229 dependency_indices.push(index);
4230 Ok(vec![(
4231 original_qualifier.cloned(),
4232 Arc::clone(original_field),
4233 )])
4234 }
4235 }
4236 })
4237 .collect::<Result<Vec<_>>>()?
4238 .into_iter()
4239 .flatten()
4240 .collect::<Vec<_>>();
4241
4242 let metadata = input_schema.metadata().clone();
4243 let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
4244 let deps = input_schema.functional_dependencies().clone();
4246 let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
4247
4248 Ok(Unnest {
4249 input,
4250 exec_columns,
4251 list_type_columns: list_columns,
4252 struct_type_columns: struct_columns,
4253 dependency_indices,
4254 schema,
4255 options,
4256 })
4257 }
4258}
4259
4260fn get_unnested_columns(
4269 col_name: &String,
4270 data_type: &DataType,
4271 depth: usize,
4272) -> Result<Vec<(Column, Arc<Field>)>> {
4273 let mut qualified_columns = Vec::with_capacity(1);
4274
4275 match data_type {
4276 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4277 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4278 let new_field = Arc::new(Field::new(
4279 col_name, data_type,
4280 true,
4283 ));
4284 let column = Column::from_name(col_name);
4285 qualified_columns.push((column, new_field));
4287 }
4288 DataType::Struct(fields) => {
4289 qualified_columns.extend(fields.iter().map(|f| {
4290 let new_name = format!("{}.{}", col_name, f.name());
4291 let column = Column::from_name(&new_name);
4292 let new_field = f.as_ref().clone().with_name(new_name);
4293 (column, Arc::new(new_field))
4295 }))
4296 }
4297 _ => {
4298 return internal_err!(
4299 "trying to unnest on invalid data type {:?}",
4300 data_type
4301 );
4302 }
4303 };
4304 Ok(qualified_columns)
4305}
4306
4307fn get_unnested_list_datatype_recursive(
4310 data_type: &DataType,
4311 depth: usize,
4312) -> Result<DataType> {
4313 match data_type {
4314 DataType::List(field)
4315 | DataType::FixedSizeList(field, _)
4316 | DataType::LargeList(field) => {
4317 if depth == 1 {
4318 return Ok(field.data_type().clone());
4319 }
4320 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4321 }
4322 _ => {}
4323 };
4324
4325 internal_err!("trying to unnest on invalid data type {:?}", data_type)
4326}
4327
4328#[cfg(test)]
4329mod tests {
4330 use super::*;
4331 use crate::builder::LogicalTableSource;
4332 use crate::logical_plan::table_scan;
4333 use crate::test::function_stub::{count, count_udaf};
4334 use crate::{
4335 binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4336 GroupingSet,
4337 };
4338 use datafusion_common::tree_node::{
4339 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4340 };
4341 use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4342 use insta::{assert_debug_snapshot, assert_snapshot};
4343 use std::hash::DefaultHasher;
4344
4345 fn employee_schema() -> Schema {
4346 Schema::new(vec![
4347 Field::new("id", DataType::Int32, false),
4348 Field::new("first_name", DataType::Utf8, false),
4349 Field::new("last_name", DataType::Utf8, false),
4350 Field::new("state", DataType::Utf8, false),
4351 Field::new("salary", DataType::Int32, false),
4352 ])
4353 }
4354
4355 fn display_plan() -> Result<LogicalPlan> {
4356 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4357 .build()?;
4358
4359 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4360 .filter(in_subquery(col("state"), Arc::new(plan1)))?
4361 .project(vec![col("id")])?
4362 .build()
4363 }
4364
4365 #[test]
4366 fn test_display_indent() -> Result<()> {
4367 let plan = display_plan()?;
4368
4369 assert_snapshot!(plan.display_indent(), @r"
4370 Projection: employee_csv.id
4371 Filter: employee_csv.state IN (<subquery>)
4372 Subquery:
4373 TableScan: employee_csv projection=[state]
4374 TableScan: employee_csv projection=[id, state]
4375 ");
4376 Ok(())
4377 }
4378
4379 #[test]
4380 fn test_display_indent_schema() -> Result<()> {
4381 let plan = display_plan()?;
4382
4383 assert_snapshot!(plan.display_indent_schema(), @r"
4384 Projection: employee_csv.id [id:Int32]
4385 Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4386 Subquery: [state:Utf8]
4387 TableScan: employee_csv projection=[state] [state:Utf8]
4388 TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4389 ");
4390 Ok(())
4391 }
4392
4393 #[test]
4394 fn test_display_subquery_alias() -> Result<()> {
4395 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4396 .build()?;
4397 let plan1 = Arc::new(plan1);
4398
4399 let plan =
4400 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4401 .project(vec![col("id"), exists(plan1).alias("exists")])?
4402 .build();
4403
4404 assert_snapshot!(plan?.display_indent(), @r"
4405 Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4406 Subquery:
4407 TableScan: employee_csv projection=[state]
4408 TableScan: employee_csv projection=[id, state]
4409 ");
4410 Ok(())
4411 }
4412
4413 #[test]
4414 fn test_display_graphviz() -> Result<()> {
4415 let plan = display_plan()?;
4416
4417 assert_snapshot!(plan.display_graphviz(), @r#"
4420 // Begin DataFusion GraphViz Plan,
4421 // display it online here: https://dreampuf.github.io/GraphvizOnline
4422
4423 digraph {
4424 subgraph cluster_1
4425 {
4426 graph[label="LogicalPlan"]
4427 2[shape=box label="Projection: employee_csv.id"]
4428 3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4429 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4430 4[shape=box label="Subquery:"]
4431 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4432 5[shape=box label="TableScan: employee_csv projection=[state]"]
4433 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4434 6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4435 3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4436 }
4437 subgraph cluster_7
4438 {
4439 graph[label="Detailed LogicalPlan"]
4440 8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4441 9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4442 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4443 10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4444 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4445 11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4446 10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4447 12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4448 9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4449 }
4450 }
4451 // End DataFusion GraphViz Plan
4452 "#);
4453 Ok(())
4454 }
4455
4456 #[test]
4457 fn test_display_pg_json() -> Result<()> {
4458 let plan = display_plan()?;
4459
4460 assert_snapshot!(plan.display_pg_json(), @r#"
4461 [
4462 {
4463 "Plan": {
4464 "Expressions": [
4465 "employee_csv.id"
4466 ],
4467 "Node Type": "Projection",
4468 "Output": [
4469 "id"
4470 ],
4471 "Plans": [
4472 {
4473 "Condition": "employee_csv.state IN (<subquery>)",
4474 "Node Type": "Filter",
4475 "Output": [
4476 "id",
4477 "state"
4478 ],
4479 "Plans": [
4480 {
4481 "Node Type": "Subquery",
4482 "Output": [
4483 "state"
4484 ],
4485 "Plans": [
4486 {
4487 "Node Type": "TableScan",
4488 "Output": [
4489 "state"
4490 ],
4491 "Plans": [],
4492 "Relation Name": "employee_csv"
4493 }
4494 ]
4495 },
4496 {
4497 "Node Type": "TableScan",
4498 "Output": [
4499 "id",
4500 "state"
4501 ],
4502 "Plans": [],
4503 "Relation Name": "employee_csv"
4504 }
4505 ]
4506 }
4507 ]
4508 }
4509 }
4510 ]
4511 "#);
4512 Ok(())
4513 }
4514
4515 #[derive(Debug, Default)]
4517 struct OkVisitor {
4518 strings: Vec<String>,
4519 }
4520
4521 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4522 type Node = LogicalPlan;
4523
4524 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4525 let s = match plan {
4526 LogicalPlan::Projection { .. } => "pre_visit Projection",
4527 LogicalPlan::Filter { .. } => "pre_visit Filter",
4528 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4529 _ => {
4530 return not_impl_err!("unknown plan type");
4531 }
4532 };
4533
4534 self.strings.push(s.into());
4535 Ok(TreeNodeRecursion::Continue)
4536 }
4537
4538 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4539 let s = match plan {
4540 LogicalPlan::Projection { .. } => "post_visit Projection",
4541 LogicalPlan::Filter { .. } => "post_visit Filter",
4542 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4543 _ => {
4544 return not_impl_err!("unknown plan type");
4545 }
4546 };
4547
4548 self.strings.push(s.into());
4549 Ok(TreeNodeRecursion::Continue)
4550 }
4551 }
4552
/// Walks `test_plan()` with an [`OkVisitor`]. Each node is entered
/// top-down and left bottom-up, so the log reads as a mirrored sequence.
#[test]
fn visit_order() {
    let plan = test_plan();
    let mut visitor = OkVisitor::default();

    let outcome = plan.visit_with_subqueries(&mut visitor);
    assert!(outcome.is_ok());

    assert_debug_snapshot!(visitor.strings, @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
        "post_visit Filter",
        "post_visit Projection",
    ]
    "#);
}
4571
/// Optional countdown used by the test visitors to decide when to intervene.
#[derive(Debug, Default)]
struct OptionalCounter {
    /// Remaining count; `None` (the default) means the counter never fires.
    val: Option<usize>,
}

impl OptionalCounter {
    /// Creates a counter that fires after `val` non-firing calls to [`Self::dec`].
    fn new(val: usize) -> Self {
        Self { val: Some(val) }
    }

    /// Returns `true` once the count has reached zero (and on every call after
    /// that). Otherwise it decrements the count, if any, and returns `false`.
    fn dec(&mut self) -> bool {
        match self.val {
            Some(0) => true,
            Some(remaining) => {
                self.val = Some(remaining - 1);
                false
            }
            None => false,
        }
    }
}
4592
4593 #[derive(Debug, Default)]
4594 struct StoppingVisitor {
4596 inner: OkVisitor,
4597 return_false_from_pre_in: OptionalCounter,
4599 return_false_from_post_in: OptionalCounter,
4601 }
4602
4603 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4604 type Node = LogicalPlan;
4605
4606 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4607 if self.return_false_from_pre_in.dec() {
4608 return Ok(TreeNodeRecursion::Stop);
4609 }
4610 self.inner.f_down(plan)?;
4611
4612 Ok(TreeNodeRecursion::Continue)
4613 }
4614
4615 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4616 if self.return_false_from_post_in.dec() {
4617 return Ok(TreeNodeRecursion::Stop);
4618 }
4619
4620 self.inner.f_up(plan)
4621 }
4622 }
4623
/// Returning `Stop` from the third `f_down` (the TableScan) ends the walk
/// immediately. Only the two earlier pre-visits are recorded and no
/// post-visit happens.
#[test]
fn early_stopping_pre_visit() {
    let plan = test_plan();
    let mut visitor = StoppingVisitor {
        return_false_from_pre_in: OptionalCounter::new(2),
        ..Default::default()
    };

    assert!(plan.visit_with_subqueries(&mut visitor).is_ok());

    assert_debug_snapshot!(
        visitor.inner.strings,
        @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
    ]
    "#
    );
}
4644 }
4645
/// Returning `Stop` from the second `f_up` (the Filter) ends the walk after
/// only the TableScan has been post-visited.
#[test]
fn early_stopping_post_visit() {
    let plan = test_plan();
    let mut visitor = StoppingVisitor {
        return_false_from_post_in: OptionalCounter::new(1),
        ..Default::default()
    };

    assert!(plan.visit_with_subqueries(&mut visitor).is_ok());

    assert_debug_snapshot!(
        visitor.inner.strings,
        @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
    ]
    "#
    );
}
4668
4669 #[derive(Debug, Default)]
4670 struct ErrorVisitor {
4672 inner: OkVisitor,
4673 return_error_from_pre_in: OptionalCounter,
4675 return_error_from_post_in: OptionalCounter,
4677 }
4678
4679 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4680 type Node = LogicalPlan;
4681
4682 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4683 if self.return_error_from_pre_in.dec() {
4684 return not_impl_err!("Error in pre_visit");
4685 }
4686
4687 self.inner.f_down(plan)
4688 }
4689
4690 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4691 if self.return_error_from_post_in.dec() {
4692 return not_impl_err!("Error in post_visit");
4693 }
4694
4695 self.inner.f_up(plan)
4696 }
4697 }
4698
/// An error raised by the third `f_down` is propagated to the caller, and
/// traversal halts after the two successful pre-visits.
#[test]
fn error_pre_visit() {
    let plan = test_plan();
    let mut visitor = ErrorVisitor {
        return_error_from_pre_in: OptionalCounter::new(2),
        ..Default::default()
    };

    let error = plan.visit_with_subqueries(&mut visitor).unwrap_err();
    assert_snapshot!(
        error.strip_backtrace(),
        @"This feature is not implemented: Error in pre_visit"
    );

    assert_debug_snapshot!(
        visitor.inner.strings,
        @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
    ]
    "#
    );
}
4721
/// An error raised by the second `f_up` is propagated to the caller, and
/// traversal halts right after the TableScan's post-visit.
#[test]
fn error_post_visit() {
    let plan = test_plan();
    let mut visitor = ErrorVisitor {
        return_error_from_post_in: OptionalCounter::new(1),
        ..Default::default()
    };

    let error = plan.visit_with_subqueries(&mut visitor).unwrap_err();
    assert_snapshot!(
        error.strip_backtrace(),
        @"This feature is not implemented: Error in post_visit"
    );

    assert_debug_snapshot!(
        visitor.inner.strings,
        @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
    ]
    "#
    );
}
4746
/// Two `Window` nodes built identically are equal, hash equally and compare
/// as `Equal`. Schema-level metadata makes them unequal and incomparable.
#[test]
fn test_partial_eq_hash_and_partial_ord() {
    let input = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: true,
        schema: Arc::new(DFSchema::empty()),
    }));

    // A `count()` window over `input` with the given output schema.
    let window_with = |schema: DFSchema| {
        let count_expr = Expr::WindowFunction(Box::new(WindowFunction::new(
            WindowFunctionDefinition::AggregateUDF(count_udaf()),
            vec![],
        )));
        Window::try_new_with_schema(vec![count_expr], Arc::clone(&input), Arc::new(schema))
            .unwrap()
    };

    // Single non-nullable Int64 `count` field with the given schema metadata.
    let count_schema = |metadata: HashMap<String, String>| {
        DFSchema::from_unqualified_fields(
            vec![Field::new("count", DataType::Int64, false)].into(),
            metadata,
        )
        .unwrap()
    };

    let plain = window_with(count_schema(HashMap::new()));
    let plain_again = window_with(count_schema(HashMap::new()));
    assert_eq!(plain, plain_again);
    assert_eq!(hash(&plain), hash(&plain_again));
    assert_eq!(plain.partial_cmp(&plain_again), Some(Ordering::Equal));

    let with_metadata = window_with(count_schema(HashMap::from([(
        "key".to_string(),
        "value".to_string(),
    )])));
    assert_ne!(plain, with_metadata);
    // Distinct values may legally collide in hash, but these do not.
    assert_ne!(hash(&plain), hash(&with_metadata));
    assert_eq!(plain.partial_cmp(&with_metadata), None);
}
4797
/// Hashes `value` with a freshly created `DefaultHasher`, so equal values
/// always produce the same `u64`.
fn hash<T: Hash>(value: &T) -> u64 {
    let mut state = DefaultHasher::new();
    value.hash(&mut state);
    state.finish()
}
4803
/// A projection with one expression but a zero-field output schema is
/// rejected with a planning error.
#[test]
fn projection_expr_schema_mismatch() -> Result<()> {
    let empty_schema = Arc::new(DFSchema::empty());
    let input = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: Arc::clone(&empty_schema),
    }));

    let error =
        Projection::try_new_with_schema(vec![col("a")], input, empty_schema).unwrap_err();
    assert_snapshot!(error.strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
    Ok(())
}
4818
4819 fn test_plan() -> LogicalPlan {
4820 let schema = Schema::new(vec![
4821 Field::new("id", DataType::Int32, false),
4822 Field::new("state", DataType::Utf8, false),
4823 ]);
4824
4825 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4826 .unwrap()
4827 .filter(col("state").eq(lit("CO")))
4828 .unwrap()
4829 .project(vec![col("id")])
4830 .unwrap()
4831 .build()
4832 .unwrap()
4833 }
4834
/// Substituting parameters into a plan must fail when the plan references a
/// placeholder id that cannot be resolved against the supplied values.
#[test]
fn test_replace_invalid_placeholder() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let param_values = vec![ScalarValue::Int32(Some(42))];

    // None of these ids can be resolved against the single supplied value:
    // an empty id, and two spellings of position zero.
    for id in ["", "$0", "$00"] {
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder(id)))
            .unwrap()
            .build()
            .unwrap();

        let result = plan.replace_params_with_values(&param_values.clone().into());
        // Name the offending id so a failure points at the specific case.
        assert!(
            result.is_err(),
            "unexpectedly succeeded to replace an invalid placeholder {id:?}"
        );
    }
}
4877
/// With `GROUPING SETS ((foo), (bar))`, each grouping column is absent from
/// the other set. The output columns must therefore be nullable even though
/// the input columns are not.
#[test]
fn test_nullable_schema_after_grouping_set() {
    let schema = Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]);
    let grouping_sets = Expr::GroupingSet(GroupingSet::GroupingSets(vec![
        vec![col("foo")],
        vec![col("bar")],
    ]));

    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .aggregate(vec![grouping_sets], vec![count(lit(true))])
        .unwrap()
        .build()
        .unwrap();

    let output_schema = plan.schema();
    for name in ["foo", "bar"] {
        let field = output_schema.field_with_name(None, name).unwrap();
        assert!(field.is_nullable());
    }
}
4909
/// An equality filter on a column is "scalar" (at most one row) only when the
/// input schema declares that column unique.
#[test]
fn test_filter_is_scalar() {
    let arrow_schema =
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let source = Arc::new(LogicalTableSource::new(arrow_schema));
    let schema = Arc::new(
        DFSchema::try_from_qualified_schema(TableReference::bare("tab"), &source.schema())
            .unwrap(),
    );
    // Name of the table's only column, as reported by the qualified schema.
    let column_name = schema.field_names()[0].clone();

    // No uniqueness information: the filter may match many rows.
    let plain_scan = Arc::new(LogicalPlan::TableScan(TableScan {
        table_name: TableReference::bare("tab"),
        source: Arc::clone(&source) as Arc<dyn TableSource>,
        projection: None,
        projected_schema: Arc::clone(&schema),
        filters: vec![],
        fetch: None,
    }));
    let filter = Filter::try_new(
        Expr::Column(column_name.clone().into())
            .eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
        plain_scan,
    )
    .unwrap();
    assert!(!filter.is_scalar());

    // Declaring column 0 unique lets the filter be recognised as scalar.
    let unique_schema = Arc::new(
        schema
            .as_ref()
            .clone()
            .with_functional_dependencies(FunctionalDependencies::new_from_constraints(
                Some(&Constraints::new_unverified(vec![Constraint::Unique(vec![0])])),
                1,
            ))
            .unwrap(),
    );
    let unique_scan = Arc::new(LogicalPlan::TableScan(TableScan {
        table_name: TableReference::bare("tab"),
        source,
        projection: None,
        projected_schema: Arc::clone(&unique_schema),
        filters: vec![],
        fetch: None,
    }));
    let filter =
        Filter::try_new(Expr::Column(column_name.into()).eq(lit(1i32)), unique_scan).unwrap();
    assert!(filter.is_scalar());
}
4968
/// `transform` reaches the TableScan nested inside an `Explain` node, so a
/// Filter can be inserted beneath the Explain.
#[test]
fn test_transform_explain() {
    let schema = Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]);
    let explain = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .explain(false, false)
        .unwrap()
        .build()
        .unwrap();

    let external_filter = col("foo").eq(lit(true));

    // Wrap each TableScan in a Filter and leave every other node untouched.
    let wrap_scan = |node: LogicalPlan| {
        if let LogicalPlan::TableScan(scan) = node {
            let filter = Filter::try_new(
                external_filter.clone(),
                Arc::new(LogicalPlan::TableScan(scan)),
            )
            .unwrap();
            Ok(Transformed::yes(LogicalPlan::Filter(filter)))
        } else {
            Ok(Transformed::no(node))
        }
    };
    let transformed = explain.transform(wrap_scan).data().unwrap();

    let actual = transformed.display_indent().to_string();
    assert_snapshot!(actual, @r"
    Explain
      Filter: foo = Boolean(true)
        TableScan: ?table?
    ");
}
5009
/// `EmptyRelation` orders before `DescribeTable`. Two `DescribeTable` nodes
/// are incomparable even when built identically.
#[test]
fn test_plan_partial_ord() {
    let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: Arc::new(DFSchema::empty()),
    });

    // Builds a fresh, structurally identical DescribeTable on each call.
    let make_describe = || {
        LogicalPlan::DescribeTable(DescribeTable {
            schema: Arc::new(Schema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                false,
            )])),
            output_schema: DFSchemaRef::new(DFSchema::empty()),
        })
    };
    let describe_table = make_describe();
    let describe_table_clone = make_describe();

    assert_eq!(
        empty_relation.partial_cmp(&describe_table),
        Some(Ordering::Less)
    );
    assert_eq!(
        describe_table.partial_cmp(&empty_relation),
        Some(Ordering::Greater)
    );
    assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
}
5045
/// Rebuilding a `Limit` from its own expressions and inputs via
/// `with_new_exprs` reproduces the original node. This holds for every
/// combination of present and absent `skip` / `fetch`.
#[test]
fn test_limit_with_new_children() {
    let input = Arc::new(LogicalPlan::Values(Values {
        schema: Arc::new(DFSchema::empty()),
        values: vec![vec![]],
    }));

    // Boxed literal for a skip/fetch slot.
    let literal = |value: ScalarValue| Some(Box::new(Expr::Literal(value, None)));
    let ten = || ScalarValue::new_ten(&DataType::UInt32).unwrap();
    let one = || ScalarValue::new_one(&DataType::UInt32).unwrap();

    let skip_fetch_pairs = [
        (None, None),
        (None, literal(ten())),
        (literal(ten()), None),
        (literal(one()), literal(ten())),
    ];

    for (skip, fetch) in skip_fetch_pairs {
        let limit = LogicalPlan::Limit(Limit {
            skip,
            fetch,
            input: Arc::clone(&input),
        });
        let rebuilt = limit
            .with_new_exprs(
                limit.expressions(),
                limit.inputs().into_iter().cloned().collect(),
            )
            .unwrap();
        assert_eq!(limit, rebuilt);
    }
}
5097
/// Returning `TreeNodeRecursion::Jump` for the root Projection must skip its
/// subtree, including the scalar subquery in its expressions, for every
/// `*_with_subqueries` traversal API. Each check records whether a `Filter`
/// node was ever reached.
#[test]
fn test_with_subqueries_jump() {
    let subquery_schema =
        Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
    let subquery_plan =
        table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
            .unwrap()
            .filter(col("sub_id").eq(lit(0)))
            .unwrap()
            .build()
            .unwrap();

    // Projection(id, <scalar subquery>) <- Filter(id = 0) <- TableScan(id)
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
        .unwrap()
        .filter(col("id").eq(lit(0)))
        .unwrap()
        .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
        .unwrap()
        .build()
        .unwrap();

    // apply_with_subqueries
    let mut saw_filter = false;
    plan.apply_with_subqueries(|node| {
        if matches!(node, LogicalPlan::Projection(_)) {
            return Ok(TreeNodeRecursion::Jump);
        }
        saw_filter |= matches!(node, LogicalPlan::Filter(_));
        Ok(TreeNodeRecursion::Continue)
    })
    .unwrap();
    assert!(!saw_filter);

    // visit_with_subqueries
    #[derive(Default)]
    struct ProjectJumpVisitor {
        filter_found: bool,
    }

    impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
            if let LogicalPlan::Projection(_) = node {
                return Ok(TreeNodeRecursion::Jump);
            }
            self.filter_found |= matches!(node, LogicalPlan::Filter(_));
            Ok(TreeNodeRecursion::Continue)
        }
    }

    let mut visitor = ProjectJumpVisitor::default();
    plan.visit_with_subqueries(&mut visitor).unwrap();
    assert!(!visitor.filter_found);

    // transform_down_with_subqueries
    let mut saw_filter = false;
    plan.clone()
        .transform_down_with_subqueries(|node| {
            if let LogicalPlan::Projection(_) = node {
                return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
            }
            saw_filter |= matches!(node, LogicalPlan::Filter(_));
            Ok(Transformed::no(node))
        })
        .unwrap();
    assert!(!saw_filter);

    // transform_down_up_with_subqueries (the up pass is a no-op)
    let mut saw_filter = false;
    plan.clone()
        .transform_down_up_with_subqueries(
            |node| {
                if let LogicalPlan::Projection(_) = node {
                    return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
                }
                saw_filter |= matches!(node, LogicalPlan::Filter(_));
                Ok(Transformed::no(node))
            },
            |node| Ok(Transformed::no(node)),
        )
        .unwrap();
    assert!(!saw_filter);

    // rewrite_with_subqueries
    #[derive(Default)]
    struct ProjectJumpRewriter {
        filter_found: bool,
    }

    impl TreeNodeRewriter for ProjectJumpRewriter {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
            if let LogicalPlan::Projection(_) = node {
                return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
            }
            self.filter_found |= matches!(node, LogicalPlan::Filter(_));
            Ok(Transformed::no(node))
        }
    }

    let mut rewriter = ProjectJumpRewriter::default();
    plan.rewrite_with_subqueries(&mut rewriter).unwrap();
    assert!(!rewriter.filter_found);
}
5235
/// A plan that references a placeholder reports it from
/// `get_parameter_types`, with its type still unresolved (`None`).
#[test]
fn test_with_unresolved_placeholders() {
    let field_name = "id";
    let placeholder_value = "$1";
    let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);

    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col(field_name).eq(placeholder(placeholder_value)))
        .unwrap()
        .build()
        .unwrap();

    let params = plan.get_parameter_types().unwrap();
    assert_eq!(params.len(), 1);

    // Look the entry up by reference; no need to clone the map or the value.
    assert_eq!(params.get(placeholder_value), Some(&None));
}
5256
/// `Join::with_new_exprs` rebuilds a join from its flattened expression list.
/// That list is the `on` key pairs, followed by the filter if there is one.
#[test]
fn test_join_with_new_exprs() -> Result<()> {
    /// Builds an inner join of `t1(a, b)` and `t2(a, b)` with the given
    /// equi-join keys and residual filter.
    fn create_test_join(
        on: Vec<(Expr, Expr)>,
        filter: Option<Expr>,
    ) -> Result<LogicalPlan> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
        let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;

        let left = table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?;
        let right = table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?;
        let joined_schema = Arc::new(left_schema.join(&right_schema)?);

        Ok(LogicalPlan::Join(Join {
            left: Arc::new(left),
            right: Arc::new(right),
            on,
            filter,
            join_type: JoinType::Inner,
            join_constraint: JoinConstraint::On,
            schema: joined_schema,
            null_equality: NullEquality::NullEqualsNothing,
        }))
    }

    /// Calls `with_new_exprs` on `plan` with `exprs` and the plan's own
    /// inputs, and returns the resulting `Join` node.
    fn rebuild(plan: &LogicalPlan, exprs: Vec<Expr>) -> Result<Join> {
        let inputs = plan.inputs().into_iter().cloned().collect();
        match plan.with_new_exprs(exprs, inputs)? {
            LogicalPlan::Join(join) => Ok(join),
            _ => unreachable!(),
        }
    }

    // Only equi-join keys.
    let plan = create_test_join(vec![(col("t1.a"), col("t2.a"))], None)?;
    let join = rebuild(&plan, plan.expressions())?;
    assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
    assert_eq!(join.filter, None);

    // Only a filter.
    let plan = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
    let join = rebuild(&plan, plan.expressions())?;
    assert_eq!(join.on, vec![]);
    assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));

    // Keys and a filter.
    let plan = create_test_join(
        vec![(col("t1.a"), col("t2.a"))],
        Some(col("t1.b").gt(col("t2.b"))),
    )?;
    let join = rebuild(&plan, plan.expressions())?;
    assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
    assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));

    // Replaced key expressions plus a trailing extra expression, which
    // becomes the filter.
    let plan = create_test_join(
        vec![(col("t1.a"), col("t2.a")), (col("t1.b"), col("t2.b"))],
        None,
    )?;
    let join = rebuild(
        &plan,
        vec![
            binary_expr(col("t1.a"), Operator::Plus, lit(1)),
            binary_expr(col("t2.a"), Operator::Plus, lit(2)),
            col("t1.b"),
            col("t2.b"),
            lit(true),
        ],
    )?;
    assert_eq!(
        join.on,
        vec![
            (
                binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                binary_expr(col("t2.a"), Operator::Plus, lit(2))
            ),
            (col("t1.b"), col("t2.b"))
        ]
    );
    assert_eq!(join.filter, Some(lit(true)));

    Ok(())
}
5362
/// `Join::try_new` derives the output schema for every join type. The
/// schema covers which side's columns are kept and which become nullable.
/// The constructor arguments are also stored on the node unchanged.
#[test]
fn test_join_try_new() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
    let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

    let join_types = [
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftMark,
    ];

    for join_type in join_types {
        let join = Join::try_new(
            Arc::new(left_scan.clone()),
            Arc::new(right_scan.clone()),
            vec![(col("t1.a"), col("t2.a"))],
            Some(col("t1.b").gt(col("t2.b"))),
            join_type,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
        )?;
        let fields = join.schema.fields();

        match join_type {
            JoinType::LeftSemi | JoinType::LeftAnti => {
                assert_eq!(fields.len(), 2);
                assert_eq!(
                    fields[0].name(),
                    "a",
                    "First field should be 'a' from left table"
                );
                assert_eq!(
                    fields[1].name(),
                    "b",
                    "Second field should be 'b' from left table"
                );
            }
            JoinType::RightSemi | JoinType::RightAnti => {
                assert_eq!(fields.len(), 2);
                assert_eq!(
                    fields[0].name(),
                    "a",
                    "First field should be 'a' from right table"
                );
                assert_eq!(
                    fields[1].name(),
                    "b",
                    "Second field should be 'b' from right table"
                );
            }
            JoinType::LeftMark => {
                // Left columns plus a non-nullable boolean `mark` column.
                assert_eq!(fields.len(), 3);
                assert_eq!(
                    fields[0].name(),
                    "a",
                    "First field should be 'a' from left table"
                );
                assert_eq!(
                    fields[1].name(),
                    "b",
                    "Second field should be 'b' from left table"
                );
                assert_eq!(
                    fields[2].name(),
                    "mark",
                    "Third field should be the mark column"
                );
                assert!(fields.iter().all(|field| !field.is_nullable()));
            }
            _ => {
                assert_eq!(fields.len(), 4);
                assert_eq!(
                    fields[0].name(),
                    "a",
                    "First field should be 'a' from left table"
                );
                assert_eq!(
                    fields[1].name(),
                    "b",
                    "Second field should be 'b' from left table"
                );
                assert_eq!(
                    fields[2].name(),
                    "a",
                    "Third field should be 'a' from right table"
                );
                assert_eq!(
                    fields[3].name(),
                    "b",
                    "Fourth field should be 'b' from right table"
                );

                // Columns of a side that may be unmatched become nullable.
                // An inner join keeps the (non-nullable) input nullability;
                // previously that case was not checked at all.
                let expected_nullable = match join_type {
                    JoinType::Left => [false, false, true, true],
                    JoinType::Right => [true, true, false, false],
                    JoinType::Full => [true; 4],
                    _ => [false; 4],
                };
                for (i, (field, expected)) in
                    fields.iter().zip(expected_nullable).enumerate()
                {
                    assert_eq!(
                        field.is_nullable(),
                        expected,
                        "Field {i} ({}) nullability incorrect for {join_type:?} join",
                        field.name()
                    );
                }
            }
        }

        assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
        assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        assert_eq!(join.join_type, join_type);
        assert_eq!(join.join_constraint, JoinConstraint::On);
        assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
    }

    Ok(())
}
5509
/// `Join::try_new` keeps every column from both sides, including the
/// duplicated `id` and `value` names, under both `Using` and `On`
/// constraints. It also records the requested null-equality semantics.
#[test]
fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
    // `id` and `value` exist on both sides; `value` differs in type.
    let left_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Int32, false),
    ]);
    let right_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);
    let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
    let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

    // (expected column name, assertion message) in output order.
    let expected_columns = [
        ("id", "First field should be 'id' from left table"),
        ("name", "Second field should be 'name' from left table"),
        ("value", "Third field should be 'value' from left table"),
        ("id", "Fourth field should be 'id' from right table"),
        ("category", "Fifth field should be 'category' from right table"),
        ("value", "Sixth field should be 'value' from right table"),
    ];
    let assert_all_columns = |join: &Join| {
        let fields = join.schema.fields();
        assert_eq!(fields.len(), 6);
        for (field, (name, message)) in fields.iter().zip(expected_columns) {
            assert_eq!(field.name(), name, "{}", message);
        }
    };

    // USING (id): both `id` columns are still present in the schema.
    let using_join = Join::try_new(
        Arc::new(left_plan.clone()),
        Arc::new(right_plan.clone()),
        vec![(col("t1.id"), col("t2.id"))],
        None,
        JoinType::Inner,
        JoinConstraint::Using,
        NullEquality::NullEqualsNothing,
    )?;
    assert_all_columns(&using_join);
    assert_eq!(using_join.join_constraint, JoinConstraint::Using);

    // ON t1.id = t2.id with a residual filter on the overlapping `value`.
    let filtered_join = Join::try_new(
        Arc::new(left_plan.clone()),
        Arc::new(right_plan.clone()),
        vec![(col("t1.id"), col("t2.id"))],
        Some(col("t1.value").lt(col("t2.value"))),
        JoinType::Inner,
        JoinConstraint::On,
        NullEquality::NullEqualsNothing,
    )?;
    assert_all_columns(&filtered_join);
    assert_eq!(
        filtered_join.filter,
        Some(col("t1.value").lt(col("t2.value")))
    );

    // The requested null-equality mode is stored on the node.
    let null_safe_join = Join::try_new(
        Arc::new(left_plan),
        Arc::new(right_plan),
        vec![(col("t1.id"), col("t2.id"))],
        None,
        JoinType::Inner,
        JoinConstraint::On,
        NullEquality::NullEqualsNull,
    )?;
    assert_eq!(null_safe_join.null_equality, NullEquality::NullEqualsNull);

    Ok(())
}
5647
/// Checks per-column nullability of the joined schema for inner and outer
/// joins, and confirms a `USING` join keeps all six columns.
#[test]
fn test_join_try_new_schema_validation() -> Result<()> {
    let left_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);
    let right_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, true),
        Field::new("code", DataType::Int16, false),
    ]);
    let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
    let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

    for join_type in [
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
    ] {
        let join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            Some(col("t1.value").gt(lit(5.0))),
            join_type,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
        )?;

        let fields = join.schema.fields();
        assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type:?} join");

        // Left columns (0..3) become nullable when the left side may be
        // unmatched; right columns (3..6) when the right side may be.
        // Columns 2 (`value`) and 4 (`category`) are declared nullable, so
        // they always are.
        let left_may_be_null = matches!(join_type, JoinType::Right | JoinType::Full);
        let right_may_be_null = matches!(join_type, JoinType::Left | JoinType::Full);

        for (i, field) in fields.iter().enumerate() {
            let expected_nullable = match i {
                2 | 4 => true,
                0 | 1 => left_may_be_null,
                3 | 5 => right_may_be_null,
                _ => false,
            };

            assert_eq!(
                field.is_nullable(),
                expected_nullable,
                "Field {} ({}) nullability incorrect for {:?} join",
                i,
                field.name(),
                join_type
            );
        }
    }

    let using_join = Join::try_new(
        Arc::new(left_plan),
        Arc::new(right_plan),
        vec![(col("t1.id"), col("t2.id"))],
        None,
        JoinType::Inner,
        JoinConstraint::Using,
        NullEquality::NullEqualsNothing,
    )?;

    assert_eq!(
        using_join.schema.fields().len(),
        6,
        "USING join should have all fields"
    );
    assert_eq!(using_join.join_constraint, JoinConstraint::Using);

    Ok(())
}
5732}