1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::dml::CopyTo;
27use super::invariants::{
28 assert_always_invariants_at_current_node, assert_executable_invariants,
29 InvariantLevel,
30};
31use super::DdlStatement;
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34 intersect_metadata_for_union, Alias, Placeholder, Sort as SortExpr, WindowFunction,
35 WindowFunctionParams,
36};
37use crate::expr_rewriter::{
38 create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48 build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
49 CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
50 Operator, Prepare, TableProviderFilterPushDown, TableSource,
51 WindowFunctionDefinition,
52};
53
54use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
55use datafusion_common::cse::{NormalizeEq, Normalizeable};
56use datafusion_common::format::ExplainFormat;
57use datafusion_common::metadata::check_metadata_with_storage_equal;
58use datafusion_common::tree_node::{
59 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
60};
61use datafusion_common::{
62 aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
63 DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
64 FunctionalDependencies, NullEquality, ParamValues, Result, ScalarValue, Spans,
65 TableReference, UnnestOptions,
66};
67use indexmap::IndexSet;
68
69use crate::display::PgJsonVisitor;
71pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
72pub use datafusion_common::{JoinConstraint, JoinType};
73
74#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
207pub enum LogicalPlan {
208 Projection(Projection),
211 Filter(Filter),
220 Window(Window),
226 Aggregate(Aggregate),
232 Sort(Sort),
235 Join(Join),
238 Repartition(Repartition),
242 Union(Union),
246 TableScan(TableScan),
249 EmptyRelation(EmptyRelation),
253 Subquery(Subquery),
256 SubqueryAlias(SubqueryAlias),
258 Limit(Limit),
260 Statement(Statement),
262 Values(Values),
267 Explain(Explain),
270 Analyze(Analyze),
274 Extension(Extension),
277 Distinct(Distinct),
280 Dml(DmlStatement),
282 Ddl(DdlStatement),
284 Copy(CopyTo),
286 DescribeTable(DescribeTable),
289 Unnest(Unnest),
292 RecursiveQuery(RecursiveQuery),
294}
295
296impl Default for LogicalPlan {
297 fn default() -> Self {
298 LogicalPlan::EmptyRelation(EmptyRelation {
299 produce_one_row: false,
300 schema: Arc::new(DFSchema::empty()),
301 })
302 }
303}
304
305impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
306 fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
307 &'a self,
308 mut f: F,
309 ) -> Result<TreeNodeRecursion> {
310 f(self)
311 }
312
313 fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
314 self,
315 mut f: F,
316 ) -> Result<Transformed<Self>> {
317 f(self)
318 }
319}
320
321impl LogicalPlan {
322 pub fn schema(&self) -> &DFSchemaRef {
324 match self {
325 LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
326 LogicalPlan::Values(Values { schema, .. }) => schema,
327 LogicalPlan::TableScan(TableScan {
328 projected_schema, ..
329 }) => projected_schema,
330 LogicalPlan::Projection(Projection { schema, .. }) => schema,
331 LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
332 LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
333 LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
334 LogicalPlan::Window(Window { schema, .. }) => schema,
335 LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
336 LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
337 LogicalPlan::Join(Join { schema, .. }) => schema,
338 LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
339 LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
340 LogicalPlan::Statement(statement) => statement.schema(),
341 LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
342 LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
343 LogicalPlan::Explain(explain) => &explain.schema,
344 LogicalPlan::Analyze(analyze) => &analyze.schema,
345 LogicalPlan::Extension(extension) => extension.node.schema(),
346 LogicalPlan::Union(Union { schema, .. }) => schema,
347 LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
348 output_schema
349 }
350 LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
351 LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
352 LogicalPlan::Ddl(ddl) => ddl.schema(),
353 LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
354 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
355 static_term.schema()
357 }
358 }
359 }
360
361 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
364 match self {
365 LogicalPlan::Window(_)
366 | LogicalPlan::Projection(_)
367 | LogicalPlan::Aggregate(_)
368 | LogicalPlan::Unnest(_)
369 | LogicalPlan::Join(_) => self
370 .inputs()
371 .iter()
372 .map(|input| input.schema().as_ref())
373 .collect(),
374 _ => vec![],
375 }
376 }
377
378 pub fn explain_schema() -> SchemaRef {
380 SchemaRef::new(Schema::new(vec![
381 Field::new("plan_type", DataType::Utf8, false),
382 Field::new("plan", DataType::Utf8, false),
383 ]))
384 }
385
386 pub fn describe_schema() -> Schema {
388 Schema::new(vec![
389 Field::new("column_name", DataType::Utf8, false),
390 Field::new("data_type", DataType::Utf8, false),
391 Field::new("is_nullable", DataType::Utf8, false),
392 ])
393 }
394
395 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
412 let mut exprs = vec![];
413 self.apply_expressions(|e| {
414 exprs.push(e.clone());
415 Ok(TreeNodeRecursion::Continue)
416 })
417 .unwrap();
419 exprs
420 }
421
422 pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
425 let mut exprs = vec![];
426 self.apply_expressions(|e| {
427 find_out_reference_exprs(e).into_iter().for_each(|e| {
428 if !exprs.contains(&e) {
429 exprs.push(e)
430 }
431 });
432 Ok(TreeNodeRecursion::Continue)
433 })
434 .unwrap();
436 self.inputs()
437 .into_iter()
438 .flat_map(|child| child.all_out_ref_exprs())
439 .for_each(|e| {
440 if !exprs.contains(&e) {
441 exprs.push(e)
442 }
443 });
444 exprs
445 }
446
447 pub fn inputs(&self) -> Vec<&LogicalPlan> {
451 match self {
452 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
453 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
454 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
455 LogicalPlan::Window(Window { input, .. }) => vec![input],
456 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
457 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
458 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
459 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
460 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
461 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
462 LogicalPlan::Extension(extension) => extension.node.inputs(),
463 LogicalPlan::Union(Union { inputs, .. }) => {
464 inputs.iter().map(|arc| arc.as_ref()).collect()
465 }
466 LogicalPlan::Distinct(
467 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
468 ) => vec![input],
469 LogicalPlan::Explain(explain) => vec![&explain.plan],
470 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
471 LogicalPlan::Dml(write) => vec![&write.input],
472 LogicalPlan::Copy(copy) => vec![©.input],
473 LogicalPlan::Ddl(ddl) => ddl.inputs(),
474 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
475 LogicalPlan::RecursiveQuery(RecursiveQuery {
476 static_term,
477 recursive_term,
478 ..
479 }) => vec![static_term, recursive_term],
480 LogicalPlan::Statement(stmt) => stmt.inputs(),
481 LogicalPlan::TableScan { .. }
483 | LogicalPlan::EmptyRelation { .. }
484 | LogicalPlan::Values { .. }
485 | LogicalPlan::DescribeTable(_) => vec![],
486 }
487 }
488
489 pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
491 let mut using_columns: Vec<HashSet<Column>> = vec![];
492
493 self.apply_with_subqueries(|plan| {
494 if let LogicalPlan::Join(Join {
495 join_constraint: JoinConstraint::Using,
496 on,
497 ..
498 }) = plan
499 {
500 let columns =
502 on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
503 let Some(l) = l.get_as_join_column() else {
504 return internal_err!(
505 "Invalid join key. Expected column, found {l:?}"
506 );
507 };
508 let Some(r) = r.get_as_join_column() else {
509 return internal_err!(
510 "Invalid join key. Expected column, found {r:?}"
511 );
512 };
513 accumu.insert(l.to_owned());
514 accumu.insert(r.to_owned());
515 Result::<_, DataFusionError>::Ok(accumu)
516 })?;
517 using_columns.push(columns);
518 }
519 Ok(TreeNodeRecursion::Continue)
520 })?;
521
522 Ok(using_columns)
523 }
524
525 pub fn head_output_expr(&self) -> Result<Option<Expr>> {
527 match self {
528 LogicalPlan::Projection(projection) => {
529 Ok(Some(projection.expr.as_slice()[0].clone()))
530 }
531 LogicalPlan::Aggregate(agg) => {
532 if agg.group_expr.is_empty() {
533 Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
534 } else {
535 Ok(Some(agg.group_expr.as_slice()[0].clone()))
536 }
537 }
538 LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
539 Ok(Some(select_expr[0].clone()))
540 }
541 LogicalPlan::Filter(Filter { input, .. })
542 | LogicalPlan::Distinct(Distinct::All(input))
543 | LogicalPlan::Sort(Sort { input, .. })
544 | LogicalPlan::Limit(Limit { input, .. })
545 | LogicalPlan::Repartition(Repartition { input, .. })
546 | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
547 LogicalPlan::Join(Join {
548 left,
549 right,
550 join_type,
551 ..
552 }) => match join_type {
553 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
554 if left.schema().fields().is_empty() {
555 right.head_output_expr()
556 } else {
557 left.head_output_expr()
558 }
559 }
560 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
561 left.head_output_expr()
562 }
563 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
564 right.head_output_expr()
565 }
566 },
567 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
568 static_term.head_output_expr()
569 }
570 LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
571 union.schema.qualified_field(0),
572 )))),
573 LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
574 table.projected_schema.qualified_field(0),
575 )))),
576 LogicalPlan::SubqueryAlias(subquery_alias) => {
577 let expr_opt = subquery_alias.input.head_output_expr()?;
578 expr_opt
579 .map(|expr| {
580 Ok(Expr::Column(create_col_from_scalar_expr(
581 &expr,
582 subquery_alias.alias.to_string(),
583 )?))
584 })
585 .map_or(Ok(None), |v| v.map(Some))
586 }
587 LogicalPlan::Subquery(_) => Ok(None),
588 LogicalPlan::EmptyRelation(_)
589 | LogicalPlan::Statement(_)
590 | LogicalPlan::Values(_)
591 | LogicalPlan::Explain(_)
592 | LogicalPlan::Analyze(_)
593 | LogicalPlan::Extension(_)
594 | LogicalPlan::Dml(_)
595 | LogicalPlan::Copy(_)
596 | LogicalPlan::Ddl(_)
597 | LogicalPlan::DescribeTable(_)
598 | LogicalPlan::Unnest(_) => Ok(None),
599 }
600 }
601
602 pub fn recompute_schema(self) -> Result<Self> {
625 match self {
626 LogicalPlan::Projection(Projection {
629 expr,
630 input,
631 schema: _,
632 }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
633 LogicalPlan::Dml(_) => Ok(self),
634 LogicalPlan::Copy(_) => Ok(self),
635 LogicalPlan::Values(Values { schema, values }) => {
636 Ok(LogicalPlan::Values(Values { schema, values }))
638 }
639 LogicalPlan::Filter(Filter { predicate, input }) => {
640 Filter::try_new(predicate, input).map(LogicalPlan::Filter)
641 }
642 LogicalPlan::Repartition(_) => Ok(self),
643 LogicalPlan::Window(Window {
644 input,
645 window_expr,
646 schema: _,
647 }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
648 LogicalPlan::Aggregate(Aggregate {
649 input,
650 group_expr,
651 aggr_expr,
652 schema: _,
653 }) => Aggregate::try_new(input, group_expr, aggr_expr)
654 .map(LogicalPlan::Aggregate),
655 LogicalPlan::Sort(_) => Ok(self),
656 LogicalPlan::Join(Join {
657 left,
658 right,
659 filter,
660 join_type,
661 join_constraint,
662 on,
663 schema: _,
664 null_equality,
665 }) => {
666 let schema =
667 build_join_schema(left.schema(), right.schema(), &join_type)?;
668
669 let new_on: Vec<_> = on
670 .into_iter()
671 .map(|equi_expr| {
672 (equi_expr.0.unalias(), equi_expr.1.unalias())
674 })
675 .collect();
676
677 Ok(LogicalPlan::Join(Join {
678 left,
679 right,
680 join_type,
681 join_constraint,
682 on: new_on,
683 filter,
684 schema: DFSchemaRef::new(schema),
685 null_equality,
686 }))
687 }
688 LogicalPlan::Subquery(_) => Ok(self),
689 LogicalPlan::SubqueryAlias(SubqueryAlias {
690 input,
691 alias,
692 schema: _,
693 }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
694 LogicalPlan::Limit(_) => Ok(self),
695 LogicalPlan::Ddl(_) => Ok(self),
696 LogicalPlan::Extension(Extension { node }) => {
697 let expr = node.expressions();
700 let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
701 Ok(LogicalPlan::Extension(Extension {
702 node: node.with_exprs_and_inputs(expr, inputs)?,
703 }))
704 }
705 LogicalPlan::Union(Union { inputs, schema }) => {
706 let first_input_schema = inputs[0].schema();
707 if schema.fields().len() == first_input_schema.fields().len() {
708 Ok(LogicalPlan::Union(Union { inputs, schema }))
710 } else {
711 Ok(LogicalPlan::Union(Union::try_new(inputs)?))
719 }
720 }
721 LogicalPlan::Distinct(distinct) => {
722 let distinct = match distinct {
723 Distinct::All(input) => Distinct::All(input),
724 Distinct::On(DistinctOn {
725 on_expr,
726 select_expr,
727 sort_expr,
728 input,
729 schema: _,
730 }) => Distinct::On(DistinctOn::try_new(
731 on_expr,
732 select_expr,
733 sort_expr,
734 input,
735 )?),
736 };
737 Ok(LogicalPlan::Distinct(distinct))
738 }
739 LogicalPlan::RecursiveQuery(_) => Ok(self),
740 LogicalPlan::Analyze(_) => Ok(self),
741 LogicalPlan::Explain(_) => Ok(self),
742 LogicalPlan::TableScan(_) => Ok(self),
743 LogicalPlan::EmptyRelation(_) => Ok(self),
744 LogicalPlan::Statement(_) => Ok(self),
745 LogicalPlan::DescribeTable(_) => Ok(self),
746 LogicalPlan::Unnest(Unnest {
747 input,
748 exec_columns,
749 options,
750 ..
751 }) => {
752 unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
754 }
755 }
756 }
757
758 pub fn with_new_exprs(
784 &self,
785 mut expr: Vec<Expr>,
786 inputs: Vec<LogicalPlan>,
787 ) -> Result<LogicalPlan> {
788 match self {
789 LogicalPlan::Projection(Projection { .. }) => {
792 let input = self.only_input(inputs)?;
793 Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
794 }
795 LogicalPlan::Dml(DmlStatement {
796 table_name,
797 target,
798 op,
799 ..
800 }) => {
801 self.assert_no_expressions(expr)?;
802 let input = self.only_input(inputs)?;
803 Ok(LogicalPlan::Dml(DmlStatement::new(
804 table_name.clone(),
805 Arc::clone(target),
806 op.clone(),
807 Arc::new(input),
808 )))
809 }
810 LogicalPlan::Copy(CopyTo {
811 input: _,
812 output_url,
813 file_type,
814 options,
815 partition_by,
816 output_schema: _,
817 }) => {
818 self.assert_no_expressions(expr)?;
819 let input = self.only_input(inputs)?;
820 Ok(LogicalPlan::Copy(CopyTo::new(
821 Arc::new(input),
822 output_url.clone(),
823 partition_by.clone(),
824 Arc::clone(file_type),
825 options.clone(),
826 )))
827 }
828 LogicalPlan::Values(Values { schema, .. }) => {
829 self.assert_no_inputs(inputs)?;
830 Ok(LogicalPlan::Values(Values {
831 schema: Arc::clone(schema),
832 values: expr
833 .chunks_exact(schema.fields().len())
834 .map(|s| s.to_vec())
835 .collect(),
836 }))
837 }
838 LogicalPlan::Filter { .. } => {
839 let predicate = self.only_expr(expr)?;
840 let input = self.only_input(inputs)?;
841
842 Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
843 }
844 LogicalPlan::Repartition(Repartition {
845 partitioning_scheme,
846 ..
847 }) => match partitioning_scheme {
848 Partitioning::RoundRobinBatch(n) => {
849 self.assert_no_expressions(expr)?;
850 let input = self.only_input(inputs)?;
851 Ok(LogicalPlan::Repartition(Repartition {
852 partitioning_scheme: Partitioning::RoundRobinBatch(*n),
853 input: Arc::new(input),
854 }))
855 }
856 Partitioning::Hash(_, n) => {
857 let input = self.only_input(inputs)?;
858 Ok(LogicalPlan::Repartition(Repartition {
859 partitioning_scheme: Partitioning::Hash(expr, *n),
860 input: Arc::new(input),
861 }))
862 }
863 Partitioning::DistributeBy(_) => {
864 let input = self.only_input(inputs)?;
865 Ok(LogicalPlan::Repartition(Repartition {
866 partitioning_scheme: Partitioning::DistributeBy(expr),
867 input: Arc::new(input),
868 }))
869 }
870 },
871 LogicalPlan::Window(Window { window_expr, .. }) => {
872 assert_eq!(window_expr.len(), expr.len());
873 let input = self.only_input(inputs)?;
874 Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
875 }
876 LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
877 let input = self.only_input(inputs)?;
878 let agg_expr = expr.split_off(group_expr.len());
880
881 Aggregate::try_new(Arc::new(input), expr, agg_expr)
882 .map(LogicalPlan::Aggregate)
883 }
884 LogicalPlan::Sort(Sort {
885 expr: sort_expr,
886 fetch,
887 ..
888 }) => {
889 let input = self.only_input(inputs)?;
890 Ok(LogicalPlan::Sort(Sort {
891 expr: expr
892 .into_iter()
893 .zip(sort_expr.iter())
894 .map(|(expr, sort)| sort.with_expr(expr))
895 .collect(),
896 input: Arc::new(input),
897 fetch: *fetch,
898 }))
899 }
900 LogicalPlan::Join(Join {
901 join_type,
902 join_constraint,
903 on,
904 null_equality,
905 ..
906 }) => {
907 let (left, right) = self.only_two_inputs(inputs)?;
908 let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
909
910 let equi_expr_count = on.len() * 2;
911 assert!(expr.len() >= equi_expr_count);
912
913 let filter_expr = if expr.len() > equi_expr_count {
916 expr.pop()
917 } else {
918 None
919 };
920
921 assert_eq!(expr.len(), equi_expr_count);
924 let mut new_on = Vec::with_capacity(on.len());
925 let mut iter = expr.into_iter();
926 while let Some(left) = iter.next() {
927 let Some(right) = iter.next() else {
928 internal_err!("Expected a pair of expressions to construct the join on expression")?
929 };
930
931 new_on.push((left.unalias(), right.unalias()));
933 }
934
935 Ok(LogicalPlan::Join(Join {
936 left: Arc::new(left),
937 right: Arc::new(right),
938 join_type: *join_type,
939 join_constraint: *join_constraint,
940 on: new_on,
941 filter: filter_expr,
942 schema: DFSchemaRef::new(schema),
943 null_equality: *null_equality,
944 }))
945 }
946 LogicalPlan::Subquery(Subquery {
947 outer_ref_columns,
948 spans,
949 ..
950 }) => {
951 self.assert_no_expressions(expr)?;
952 let input = self.only_input(inputs)?;
953 let subquery = LogicalPlanBuilder::from(input).build()?;
954 Ok(LogicalPlan::Subquery(Subquery {
955 subquery: Arc::new(subquery),
956 outer_ref_columns: outer_ref_columns.clone(),
957 spans: spans.clone(),
958 }))
959 }
960 LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
961 self.assert_no_expressions(expr)?;
962 let input = self.only_input(inputs)?;
963 SubqueryAlias::try_new(Arc::new(input), alias.clone())
964 .map(LogicalPlan::SubqueryAlias)
965 }
966 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
967 let old_expr_len = skip.iter().chain(fetch.iter()).count();
968 if old_expr_len != expr.len() {
969 return internal_err!(
970 "Invalid number of new Limit expressions: expected {}, got {}",
971 old_expr_len,
972 expr.len()
973 );
974 }
975 let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
977 let new_skip = skip.as_ref().and_then(|_| expr.pop());
978 let input = self.only_input(inputs)?;
979 Ok(LogicalPlan::Limit(Limit {
980 skip: new_skip.map(Box::new),
981 fetch: new_fetch.map(Box::new),
982 input: Arc::new(input),
983 }))
984 }
985 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
986 name,
987 if_not_exists,
988 or_replace,
989 column_defaults,
990 temporary,
991 ..
992 })) => {
993 self.assert_no_expressions(expr)?;
994 let input = self.only_input(inputs)?;
995 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
996 CreateMemoryTable {
997 input: Arc::new(input),
998 constraints: Constraints::default(),
999 name: name.clone(),
1000 if_not_exists: *if_not_exists,
1001 or_replace: *or_replace,
1002 column_defaults: column_defaults.clone(),
1003 temporary: *temporary,
1004 },
1005 )))
1006 }
1007 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1008 name,
1009 or_replace,
1010 definition,
1011 temporary,
1012 ..
1013 })) => {
1014 self.assert_no_expressions(expr)?;
1015 let input = self.only_input(inputs)?;
1016 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1017 input: Arc::new(input),
1018 name: name.clone(),
1019 or_replace: *or_replace,
1020 temporary: *temporary,
1021 definition: definition.clone(),
1022 })))
1023 }
1024 LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1025 node: e.node.with_exprs_and_inputs(expr, inputs)?,
1026 })),
1027 LogicalPlan::Union(Union { schema, .. }) => {
1028 self.assert_no_expressions(expr)?;
1029 let input_schema = inputs[0].schema();
1030 let schema = if schema.fields().len() == input_schema.fields().len() {
1032 Arc::clone(schema)
1033 } else {
1034 Arc::clone(input_schema)
1035 };
1036 Ok(LogicalPlan::Union(Union {
1037 inputs: inputs.into_iter().map(Arc::new).collect(),
1038 schema,
1039 }))
1040 }
1041 LogicalPlan::Distinct(distinct) => {
1042 let distinct = match distinct {
1043 Distinct::All(_) => {
1044 self.assert_no_expressions(expr)?;
1045 let input = self.only_input(inputs)?;
1046 Distinct::All(Arc::new(input))
1047 }
1048 Distinct::On(DistinctOn {
1049 on_expr,
1050 select_expr,
1051 ..
1052 }) => {
1053 let input = self.only_input(inputs)?;
1054 let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1055 let select_expr = expr.split_off(on_expr.len());
1056 assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
1057 Distinct::On(DistinctOn::try_new(
1058 expr,
1059 select_expr,
1060 None, Arc::new(input),
1062 )?)
1063 }
1064 };
1065 Ok(LogicalPlan::Distinct(distinct))
1066 }
1067 LogicalPlan::RecursiveQuery(RecursiveQuery {
1068 name, is_distinct, ..
1069 }) => {
1070 self.assert_no_expressions(expr)?;
1071 let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1072 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1073 name: name.clone(),
1074 static_term: Arc::new(static_term),
1075 recursive_term: Arc::new(recursive_term),
1076 is_distinct: *is_distinct,
1077 }))
1078 }
1079 LogicalPlan::Analyze(a) => {
1080 self.assert_no_expressions(expr)?;
1081 let input = self.only_input(inputs)?;
1082 Ok(LogicalPlan::Analyze(Analyze {
1083 verbose: a.verbose,
1084 schema: Arc::clone(&a.schema),
1085 input: Arc::new(input),
1086 }))
1087 }
1088 LogicalPlan::Explain(e) => {
1089 self.assert_no_expressions(expr)?;
1090 let input = self.only_input(inputs)?;
1091 Ok(LogicalPlan::Explain(Explain {
1092 verbose: e.verbose,
1093 plan: Arc::new(input),
1094 explain_format: e.explain_format.clone(),
1095 stringified_plans: e.stringified_plans.clone(),
1096 schema: Arc::clone(&e.schema),
1097 logical_optimization_succeeded: e.logical_optimization_succeeded,
1098 }))
1099 }
1100 LogicalPlan::Statement(Statement::Prepare(Prepare {
1101 name, fields, ..
1102 })) => {
1103 self.assert_no_expressions(expr)?;
1104 let input = self.only_input(inputs)?;
1105 Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1106 name: name.clone(),
1107 fields: fields.clone(),
1108 input: Arc::new(input),
1109 })))
1110 }
1111 LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1112 self.assert_no_inputs(inputs)?;
1113 Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1114 name: name.clone(),
1115 parameters: expr,
1116 })))
1117 }
1118 LogicalPlan::TableScan(ts) => {
1119 self.assert_no_inputs(inputs)?;
1120 Ok(LogicalPlan::TableScan(TableScan {
1121 filters: expr,
1122 ..ts.clone()
1123 }))
1124 }
1125 LogicalPlan::EmptyRelation(_)
1126 | LogicalPlan::Ddl(_)
1127 | LogicalPlan::Statement(_)
1128 | LogicalPlan::DescribeTable(_) => {
1129 self.assert_no_expressions(expr)?;
1131 self.assert_no_inputs(inputs)?;
1132 Ok(self.clone())
1133 }
1134 LogicalPlan::Unnest(Unnest {
1135 exec_columns: columns,
1136 options,
1137 ..
1138 }) => {
1139 self.assert_no_expressions(expr)?;
1140 let input = self.only_input(inputs)?;
1141 let new_plan =
1143 unnest_with_options(input, columns.clone(), options.clone())?;
1144 Ok(new_plan)
1145 }
1146 }
1147 }
1148
1149 pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1151 match check {
1152 InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1153 InvariantLevel::Executable => assert_executable_invariants(self),
1154 }
1155 }
1156
1157 #[inline]
1159 #[expect(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1161 if !expr.is_empty() {
1162 return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1163 }
1164 Ok(())
1165 }
1166
1167 #[inline]
1169 #[expect(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1171 if !inputs.is_empty() {
1172 return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1173 }
1174 Ok(())
1175 }
1176
1177 #[inline]
1179 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1180 if expr.len() != 1 {
1181 return internal_err!(
1182 "{self:?} should have exactly one expr, got {:?}",
1183 expr
1184 );
1185 }
1186 Ok(expr.remove(0))
1187 }
1188
1189 #[inline]
1191 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1192 if inputs.len() != 1 {
1193 return internal_err!(
1194 "{self:?} should have exactly one input, got {:?}",
1195 inputs
1196 );
1197 }
1198 Ok(inputs.remove(0))
1199 }
1200
1201 #[inline]
1203 fn only_two_inputs(
1204 &self,
1205 mut inputs: Vec<LogicalPlan>,
1206 ) -> Result<(LogicalPlan, LogicalPlan)> {
1207 if inputs.len() != 2 {
1208 return internal_err!(
1209 "{self:?} should have exactly two inputs, got {:?}",
1210 inputs
1211 );
1212 }
1213 let right = inputs.remove(1);
1214 let left = inputs.remove(0);
1215 Ok((left, right))
1216 }
1217
1218 pub fn with_param_values(
1271 self,
1272 param_values: impl Into<ParamValues>,
1273 ) -> Result<LogicalPlan> {
1274 let param_values = param_values.into();
1275 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1276
1277 Ok(
1279 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1280 plan_with_values
1281 {
1282 param_values.verify_fields(&prepare_lp.fields)?;
1283 Arc::unwrap_or_clone(prepare_lp.input)
1285 } else {
1286 plan_with_values
1287 },
1288 )
1289 }
1290
1291 pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1296 match self {
1297 LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1298 LogicalPlan::Filter(filter) => {
1299 if filter.is_scalar() {
1300 Some(1)
1301 } else {
1302 filter.input.max_rows()
1303 }
1304 }
1305 LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1306 LogicalPlan::Aggregate(Aggregate {
1307 input, group_expr, ..
1308 }) => {
1309 if group_expr
1311 .iter()
1312 .all(|expr| matches!(expr, Expr::Literal(_, _)))
1313 {
1314 Some(1)
1315 } else {
1316 input.max_rows()
1317 }
1318 }
1319 LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1320 match (fetch, input.max_rows()) {
1321 (Some(fetch_limit), Some(input_max)) => {
1322 Some(input_max.min(*fetch_limit))
1323 }
1324 (Some(fetch_limit), None) => Some(*fetch_limit),
1325 (None, Some(input_max)) => Some(input_max),
1326 (None, None) => None,
1327 }
1328 }
1329 LogicalPlan::Join(Join {
1330 left,
1331 right,
1332 join_type,
1333 ..
1334 }) => match join_type {
1335 JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1336 JoinType::Left | JoinType::Right | JoinType::Full => {
1337 match (left.max_rows()?, right.max_rows()?, join_type) {
1338 (0, 0, _) => Some(0),
1339 (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1340 (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1341 (left_max, right_max, _) => Some(left_max * right_max),
1342 }
1343 }
1344 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1345 left.max_rows()
1346 }
1347 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1348 right.max_rows()
1349 }
1350 },
1351 LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1352 LogicalPlan::Union(Union { inputs, .. }) => {
1353 inputs.iter().try_fold(0usize, |mut acc, plan| {
1354 acc += plan.max_rows()?;
1355 Some(acc)
1356 })
1357 }
1358 LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1359 LogicalPlan::EmptyRelation(_) => Some(0),
1360 LogicalPlan::RecursiveQuery(_) => None,
1361 LogicalPlan::Subquery(_) => None,
1362 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1363 LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1364 Ok(FetchType::Literal(s)) => s,
1365 _ => None,
1366 },
1367 LogicalPlan::Distinct(
1368 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1369 ) => input.max_rows(),
1370 LogicalPlan::Values(v) => Some(v.values.len()),
1371 LogicalPlan::Unnest(_) => None,
1372 LogicalPlan::Ddl(_)
1373 | LogicalPlan::Explain(_)
1374 | LogicalPlan::Analyze(_)
1375 | LogicalPlan::Dml(_)
1376 | LogicalPlan::Copy(_)
1377 | LogicalPlan::DescribeTable(_)
1378 | LogicalPlan::Statement(_)
1379 | LogicalPlan::Extension(_) => None,
1380 }
1381 }
1382
1383 pub fn contains_outer_reference(&self) -> bool {
1385 let mut contains = false;
1386 self.apply_expressions(|expr| {
1387 Ok(if expr.contains_outer() {
1388 contains = true;
1389 TreeNodeRecursion::Stop
1390 } else {
1391 TreeNodeRecursion::Continue
1392 })
1393 })
1394 .unwrap();
1395 contains
1396 }
1397
1398 pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1406 match self {
1407 LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1408 .output_expressions()?
1409 .into_iter()
1410 .zip(self.schema().columns())
1411 .collect()),
1412 LogicalPlan::Window(Window {
1413 window_expr,
1414 input,
1415 schema,
1416 }) => {
1417 let mut output_exprs = input.columnized_output_exprs()?;
1425 let input_len = input.schema().fields().len();
1426 output_exprs.extend(
1427 window_expr
1428 .iter()
1429 .zip(schema.columns().into_iter().skip(input_len)),
1430 );
1431 Ok(output_exprs)
1432 }
1433 _ => Ok(vec![]),
1434 }
1435 }
1436}
1437
1438impl LogicalPlan {
1439 pub fn replace_params_with_values(
1446 self,
1447 param_values: &ParamValues,
1448 ) -> Result<LogicalPlan> {
1449 self.transform_up_with_subqueries(|plan| {
1450 let schema = Arc::clone(plan.schema());
1451 let name_preserver = NamePreserver::new(&plan);
1452 plan.map_expressions(|e| {
1453 let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1454 if !has_placeholder {
1455 Ok(Transformed::no(e))
1459 } else {
1460 let original_name = name_preserver.save(&e);
1461 let transformed_expr = e.transform_up(|e| {
1462 if let Expr::Placeholder(Placeholder { id, .. }) = e {
1463 let (value, metadata) = param_values
1464 .get_placeholders_with_values(&id)?
1465 .into_inner();
1466 Ok(Transformed::yes(Expr::Literal(value, metadata)))
1467 } else {
1468 Ok(Transformed::no(e))
1469 }
1470 })?;
1471 Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1473 }
1474 })
1475 })
1476 .map(|res| res.data)
1477 }
1478
1479 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1481 let mut param_names = HashSet::new();
1482 self.apply_with_subqueries(|plan| {
1483 plan.apply_expressions(|expr| {
1484 expr.apply(|expr| {
1485 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1486 param_names.insert(id.clone());
1487 }
1488 Ok(TreeNodeRecursion::Continue)
1489 })
1490 })
1491 })
1492 .map(|_| param_names)
1493 }
1494
1495 pub fn get_parameter_types(
1500 &self,
1501 ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1502 let mut parameter_fields = self.get_parameter_fields()?;
1503 Ok(parameter_fields
1504 .drain()
1505 .map(|(name, maybe_field)| {
1506 (name, maybe_field.map(|field| field.data_type().clone()))
1507 })
1508 .collect())
1509 }
1510
1511 pub fn get_parameter_fields(
1513 &self,
1514 ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
1515 let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
1516
1517 self.apply_with_subqueries(|plan| {
1518 plan.apply_expressions(|expr| {
1519 expr.apply(|expr| {
1520 if let Expr::Placeholder(Placeholder { id, field }) = expr {
1521 let prev = param_types.get(id);
1522 match (prev, field) {
1523 (Some(Some(prev)), Some(field)) => {
1524 check_metadata_with_storage_equal(
1525 (field.data_type(), Some(field.metadata())),
1526 (prev.data_type(), Some(prev.metadata())),
1527 "parameter",
1528 &format!(": Conflicting types for id {id}"),
1529 )?;
1530 }
1531 (_, Some(field)) => {
1532 param_types.insert(id.clone(), Some(Arc::clone(field)));
1533 }
1534 _ => {
1535 param_types.insert(id.clone(), None);
1536 }
1537 }
1538 }
1539 Ok(TreeNodeRecursion::Continue)
1540 })
1541 })
1542 })
1543 .map(|_| param_types)
1544 }
1545
1546 pub fn display_indent(&self) -> impl Display + '_ {
1578 struct Wrapper<'a>(&'a LogicalPlan);
1581 impl Display for Wrapper<'_> {
1582 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1583 let with_schema = false;
1584 let mut visitor = IndentVisitor::new(f, with_schema);
1585 match self.0.visit_with_subqueries(&mut visitor) {
1586 Ok(_) => Ok(()),
1587 Err(_) => Err(fmt::Error),
1588 }
1589 }
1590 }
1591 Wrapper(self)
1592 }
1593
1594 pub fn display_indent_schema(&self) -> impl Display + '_ {
1624 struct Wrapper<'a>(&'a LogicalPlan);
1627 impl Display for Wrapper<'_> {
1628 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1629 let with_schema = true;
1630 let mut visitor = IndentVisitor::new(f, with_schema);
1631 match self.0.visit_with_subqueries(&mut visitor) {
1632 Ok(_) => Ok(()),
1633 Err(_) => Err(fmt::Error),
1634 }
1635 }
1636 }
1637 Wrapper(self)
1638 }
1639
1640 pub fn display_pg_json(&self) -> impl Display + '_ {
1644 struct Wrapper<'a>(&'a LogicalPlan);
1647 impl Display for Wrapper<'_> {
1648 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1649 let mut visitor = PgJsonVisitor::new(f);
1650 visitor.with_schema(true);
1651 match self.0.visit_with_subqueries(&mut visitor) {
1652 Ok(_) => Ok(()),
1653 Err(_) => Err(fmt::Error),
1654 }
1655 }
1656 }
1657 Wrapper(self)
1658 }
1659
1660 pub fn display_graphviz(&self) -> impl Display + '_ {
1690 struct Wrapper<'a>(&'a LogicalPlan);
1693 impl Display for Wrapper<'_> {
1694 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1695 let mut visitor = GraphvizVisitor::new(f);
1696
1697 visitor.start_graph()?;
1698
1699 visitor.pre_visit_plan("LogicalPlan")?;
1700 self.0
1701 .visit_with_subqueries(&mut visitor)
1702 .map_err(|_| fmt::Error)?;
1703 visitor.post_visit_plan()?;
1704
1705 visitor.set_with_schema(true);
1706 visitor.pre_visit_plan("Detailed LogicalPlan")?;
1707 self.0
1708 .visit_with_subqueries(&mut visitor)
1709 .map_err(|_| fmt::Error)?;
1710 visitor.post_visit_plan()?;
1711
1712 visitor.end_graph()?;
1713 Ok(())
1714 }
1715 }
1716 Wrapper(self)
1717 }
1718
    /// Returns a `Display` adapter that writes a description of this node
    /// only: the variant label plus its key attributes. Child plans are not
    /// visited here.
    ///
    /// The exact text per variant is defined by the `write!` calls below;
    /// `Ddl`, `Statement` and `Extension` delegate to their own display
    /// routines.
    pub fn display(&self) -> impl Display + '_ {
        // Borrowing wrapper so the formatting logic can live in a `Display` impl.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(EmptyRelation { produce_one_row, schema: _ }) => {
                        // Shown as a row count: 1 when `produce_one_row` is set, else 0.
                        let rows = if *produce_one_row { 1 } else { 0 };
                        write!(f, "EmptyRelation: rows={rows}")
                    },
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
                    }
                    LogicalPlan::Values(Values { ref values, .. }) => {
                        let str_values: Vec<_> = values
                            .iter()
                            // Render at most 5 rows to keep the output short.
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        // Suffix signalling that rows beyond the first 5 were omitted.
                        let eclipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
                    }

                    LogicalPlan::TableScan(TableScan {
                        ref source,
                        ref table_name,
                        ref projection,
                        ref filters,
                        ref fetch,
                        ..
                    }) => {
                        // Projection indices are resolved to names via the source schema.
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            // Group the filters by the push-down level the source
                            // reports for each. If the source returns an error,
                            // all three groups stay empty and no filter is printed.
                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
                        // Expressions are separated by "," with a leading space each.
                        write!(f, "Projection:")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ",")?;
                            }
                            write!(f, " {expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        // Options are printed as "key value" pairs joined by ", ".
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: ref expr,
                        ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window {
                        ref window_expr, ..
                    }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        ref group_expr,
                        ref aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: ref keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        // An inner join with neither keys nor a filter is labelled "Cross".
                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(
                                    f,
                                    "{} Join: {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr
                                )
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        // Show `skip` and `fetch` as literals when they resolve to
                        // one; otherwise print the raw expression, or "None" when
                        // it is absent.
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
                        };
                        write!(
                            f,
                            "Limit: skip={skip_str}, fetch={fetch_str}",
                        )
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices, .. }) => {
                        // Both index lists are resolved against the columns of
                        // the input plan's schema.
                        let input_columns = plan.schema().columns();
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i,unnest_info)|
                                format!("{}|depth={}", &input_columns[*i].to_string(),
                                unnest_info.depth))
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        write!(f, "Unnest: lists[{}] structs[{}]",
                        expr_vec_fmt!(list_type_columns),
                        expr_vec_fmt!(struct_type_columns))
                    }
                }
            }
        }
        Wrapper(self)
    }
2056}
2057
2058impl Display for LogicalPlan {
2059 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2060 self.display_indent().fmt(f)
2061 }
2062}
2063
2064impl ToStringifiedPlan for LogicalPlan {
2065 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2066 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2067 }
2068}
2069
/// Logical plan node for a relation without an input plan.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether the relation yields a single row. `LogicalPlan::display`
    /// prints this as `rows=1` when set and `rows=0` otherwise.
    pub produce_one_row: bool,
    /// Schema associated with this node.
    pub schema: DFSchemaRef,
}
2080
2081impl PartialOrd for EmptyRelation {
2083 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2084 self.produce_one_row
2085 .partial_cmp(&other.produce_one_row)
2086 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2088 }
2089}
2090
/// Logical plan node made of a static term and a recursive term.
///
/// NOTE(review): presumably models a recursive CTE; the evaluation semantics
/// are not visible in this part of the file — confirm.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name attached to the query (presumably the CTE name — confirm).
    pub name: String,
    /// Plan of the static term.
    pub static_term: Arc<LogicalPlan>,
    /// Plan of the recursive term.
    pub recursive_term: Arc<LogicalPlan>,
    /// Distinct flag; `LogicalPlan::display` prints it as `is_distinct=...`.
    pub is_distinct: bool,
}
2126
/// Logical plan node holding rows of expressions.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// Schema associated with this node.
    pub schema: DFSchemaRef,
    /// The rows; each inner vector is one row of expressions.
    pub values: Vec<Vec<Expr>>,
}
2137
2138impl PartialOrd for Values {
2140 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2141 self.values
2142 .partial_cmp(&other.values)
2143 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2145 }
2146}
2147
/// Logical plan node that evaluates a list of expressions against its input.
// `#[non_exhaustive]`: other crates cannot build this with a struct literal
// and must use the constructors.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
#[non_exhaustive]
pub struct Projection {
    /// The projected expressions.
    pub expr: Vec<Expr>,
    /// The input plan the expressions are evaluated against.
    pub input: Arc<LogicalPlan>,
    /// Output schema; `Projection::try_new_with_schema` requires one field
    /// per expression unless a wildcard expression is present.
    pub schema: DFSchemaRef,
}
2161
2162impl PartialOrd for Projection {
2164 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2165 match self.expr.partial_cmp(&other.expr) {
2166 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2167 cmp => cmp,
2168 }
2169 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2171 }
2172}
2173
2174impl Projection {
2175 pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2177 let projection_schema = projection_schema(&input, &expr)?;
2178 Self::try_new_with_schema(expr, input, projection_schema)
2179 }
2180
2181 pub fn try_new_with_schema(
2183 expr: Vec<Expr>,
2184 input: Arc<LogicalPlan>,
2185 schema: DFSchemaRef,
2186 ) -> Result<Self> {
2187 #[expect(deprecated)]
2188 if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2189 && expr.len() != schema.fields().len()
2190 {
2191 return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
2192 }
2193 Ok(Self {
2194 expr,
2195 input,
2196 schema,
2197 })
2198 }
2199
2200 pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2202 let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2203 Self {
2204 expr,
2205 input,
2206 schema,
2207 }
2208 }
2209}
2210
2211pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2231 let metadata = input.schema().metadata().clone();
2233
2234 let schema =
2236 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2237 .with_functional_dependencies(calc_func_dependencies_for_project(
2238 exprs, input,
2239 )?)?;
2240
2241 Ok(Arc::new(schema))
2242}
2243
/// Logical plan node that exposes its input under a new qualifier.
// `#[non_exhaustive]`: other crates cannot build this with a struct literal
// and must use `SubqueryAlias::try_new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The input plan. `SubqueryAlias::try_new` may wrap the plan it was
    /// given in a `Projection` that renames fields.
    pub input: Arc<LogicalPlan>,
    /// The alias used to qualify every output field.
    pub alias: TableReference,
    /// Output schema: the input's fields re-qualified with `alias`.
    pub schema: DFSchemaRef,
}
2256
2257impl SubqueryAlias {
2258 pub fn try_new(
2259 plan: Arc<LogicalPlan>,
2260 alias: impl Into<TableReference>,
2261 ) -> Result<Self> {
2262 let alias = alias.into();
2263
2264 let aliases = unique_field_aliases(plan.schema().fields());
2270 let is_projection_needed = aliases.iter().any(Option::is_some);
2271
2272 let plan = if is_projection_needed {
2274 let projection_expressions = aliases
2275 .iter()
2276 .zip(plan.schema().iter())
2277 .map(|(alias, (qualifier, field))| {
2278 let column =
2279 Expr::Column(Column::new(qualifier.cloned(), field.name()));
2280 match alias {
2281 None => column,
2282 Some(alias) => {
2283 Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2284 }
2285 }
2286 })
2287 .collect();
2288 let projection = Projection::try_new(projection_expressions, plan)?;
2289 Arc::new(LogicalPlan::Projection(projection))
2290 } else {
2291 plan
2292 };
2293
2294 let fields = plan.schema().fields().clone();
2296 let meta_data = plan.schema().metadata().clone();
2297 let func_dependencies = plan.schema().functional_dependencies().clone();
2298
2299 let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2300 let schema = schema.as_arrow();
2301
2302 let schema = DFSchemaRef::new(
2303 DFSchema::try_from_qualified_schema(alias.clone(), schema)?
2304 .with_functional_dependencies(func_dependencies)?,
2305 );
2306 Ok(SubqueryAlias {
2307 input: plan,
2308 alias,
2309 schema,
2310 })
2311 }
2312}
2313
2314impl PartialOrd for SubqueryAlias {
2316 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2317 match self.input.partial_cmp(&other.input) {
2318 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2319 cmp => cmp,
2320 }
2321 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2323 }
2324}
2325
/// Logical plan node that applies a predicate to its input.
// `#[non_exhaustive]`: other crates cannot build this with a struct literal
// and must use `Filter::try_new`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The predicate expression. The constructors store it after
    /// `unalias_nested()`, and reject it when its type resolves to something
    /// other than Boolean, Null, or a dictionary of those.
    pub predicate: Expr,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
}
2345
2346impl Filter {
2347 pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2352 Self::try_new_internal(predicate, input)
2353 }
2354
2355 #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2358 pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2359 Self::try_new_internal(predicate, input)
2360 }
2361
2362 fn is_allowed_filter_type(data_type: &DataType) -> bool {
2363 match data_type {
2364 DataType::Boolean | DataType::Null => true,
2366 DataType::Dictionary(_, value_type) => {
2367 Filter::is_allowed_filter_type(value_type.as_ref())
2368 }
2369 _ => false,
2370 }
2371 }
2372
2373 fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2374 if let Ok(predicate_type) = predicate.get_type(input.schema()) {
2379 if !Filter::is_allowed_filter_type(&predicate_type) {
2380 return plan_err!(
2381 "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2382 );
2383 }
2384 }
2385
2386 Ok(Self {
2387 predicate: predicate.unalias_nested().data,
2388 input,
2389 })
2390 }
2391
2392 fn is_scalar(&self) -> bool {
2408 let schema = self.input.schema();
2409
2410 let functional_dependencies = self.input.schema().functional_dependencies();
2411 let unique_keys = functional_dependencies.iter().filter(|dep| {
2412 let nullable = dep.nullable
2413 && dep
2414 .source_indices
2415 .iter()
2416 .any(|&source| schema.field(source).is_nullable());
2417 !nullable
2418 && dep.mode == Dependency::Single
2419 && dep.target_indices.len() == schema.fields().len()
2420 });
2421
2422 let exprs = split_conjunction(&self.predicate);
2423 let eq_pred_cols: HashSet<_> = exprs
2424 .iter()
2425 .filter_map(|expr| {
2426 let Expr::BinaryExpr(BinaryExpr {
2427 left,
2428 op: Operator::Eq,
2429 right,
2430 }) = expr
2431 else {
2432 return None;
2433 };
2434 if left == right {
2436 return None;
2437 }
2438
2439 match (left.as_ref(), right.as_ref()) {
2440 (Expr::Column(_), Expr::Column(_)) => None,
2441 (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2442 Some(schema.index_of_column(c).unwrap())
2443 }
2444 _ => None,
2445 }
2446 })
2447 .collect();
2448
2449 for key in unique_keys {
2452 if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2453 return true;
2454 }
2455 }
2456 false
2457 }
2458}
2459
/// Logical plan node that evaluates window expressions over its input.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// The window expressions evaluated by this node.
    pub window_expr: Vec<Expr>,
    /// Output schema. `Window::try_new_with_schema` requires it to have as
    /// many fields as the input schema plus one per window expression.
    pub schema: DFSchemaRef,
}
2483
2484impl Window {
2485 pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2487 let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2488 .schema()
2489 .iter()
2490 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2491 .collect();
2492 let input_len = fields.len();
2493 let mut window_fields = fields;
2494 let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2495 window_fields.extend_from_slice(expr_fields.as_slice());
2496 let metadata = input.schema().metadata().clone();
2497
2498 let mut window_func_dependencies =
2500 input.schema().functional_dependencies().clone();
2501 window_func_dependencies.extend_target_indices(window_fields.len());
2502
2503 let mut new_dependencies = window_expr
2507 .iter()
2508 .enumerate()
2509 .filter_map(|(idx, expr)| {
2510 let Expr::WindowFunction(window_fun) = expr else {
2511 return None;
2512 };
2513 let WindowFunction {
2514 fun: WindowFunctionDefinition::WindowUDF(udwf),
2515 params: WindowFunctionParams { partition_by, .. },
2516 } = window_fun.as_ref()
2517 else {
2518 return None;
2519 };
2520 if udwf.name() == "row_number" && partition_by.is_empty() {
2523 Some(idx + input_len)
2524 } else {
2525 None
2526 }
2527 })
2528 .map(|idx| {
2529 FunctionalDependence::new(vec![idx], vec![], false)
2530 .with_mode(Dependency::Single)
2531 })
2532 .collect::<Vec<_>>();
2533
2534 if !new_dependencies.is_empty() {
2535 for dependence in new_dependencies.iter_mut() {
2536 dependence.target_indices = (0..window_fields.len()).collect();
2537 }
2538 let new_deps = FunctionalDependencies::new(new_dependencies);
2540 window_func_dependencies.extend(new_deps);
2541 }
2542
2543 if let Some(e) = window_expr.iter().find(|e| {
2545 matches!(
2546 e,
2547 Expr::WindowFunction(wf)
2548 if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
2549 && wf.params.filter.is_some()
2550 )
2551 }) {
2552 return plan_err!(
2553 "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
2554 );
2555 }
2556
2557 Self::try_new_with_schema(
2558 window_expr,
2559 input,
2560 Arc::new(
2561 DFSchema::new_with_metadata(window_fields, metadata)?
2562 .with_functional_dependencies(window_func_dependencies)?,
2563 ),
2564 )
2565 }
2566
2567 pub fn try_new_with_schema(
2573 window_expr: Vec<Expr>,
2574 input: Arc<LogicalPlan>,
2575 schema: DFSchemaRef,
2576 ) -> Result<Self> {
2577 let input_fields_count = input.schema().fields().len();
2578 if schema.fields().len() != input_fields_count + window_expr.len() {
2579 return plan_err!(
2580 "Window schema has wrong number of fields. Expected {} got {}",
2581 input_fields_count + window_expr.len(),
2582 schema.fields().len()
2583 );
2584 }
2585
2586 Ok(Window {
2587 input,
2588 window_expr,
2589 schema,
2590 })
2591 }
2592}
2593
2594impl PartialOrd for Window {
2596 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2597 match self.input.partial_cmp(&other.input)? {
2598 Ordering::Equal => {} not_equal => return Some(not_equal),
2600 }
2601
2602 match self.window_expr.partial_cmp(&other.window_expr)? {
2603 Ordering::Equal => {} not_equal => return Some(not_equal),
2605 }
2606
2607 if self == other {
2610 Some(Ordering::Equal)
2611 } else {
2612 None
2613 }
2614 }
2615}
2616
/// Logical plan node that reads from a [`TableSource`].
///
/// `Debug`, `PartialEq`, `PartialOrd` and `Hash` are implemented by hand
/// below; none of them looks at `source`.
#[derive(Clone)]
pub struct TableScan {
    /// Name of the table; also used to qualify the projected schema's fields.
    pub table_name: TableReference,
    /// The source the rows come from.
    pub source: Arc<dyn TableSource>,
    /// Optional indices into the source schema selecting the columns to read.
    pub projection: Option<Vec<usize>>,
    /// Schema of the scan output (the source schema after `projection`).
    pub projected_schema: DFSchemaRef,
    /// Filter expressions attached to the scan.
    pub filters: Vec<Expr>,
    /// Optional fetch count; `LogicalPlan::display` prints it as `fetch=<n>`.
    pub fetch: Option<usize>,
}
2633
2634impl Debug for TableScan {
2635 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2636 f.debug_struct("TableScan")
2637 .field("table_name", &self.table_name)
2638 .field("source", &"...")
2639 .field("projection", &self.projection)
2640 .field("projected_schema", &self.projected_schema)
2641 .field("filters", &self.filters)
2642 .field("fetch", &self.fetch)
2643 .finish_non_exhaustive()
2644 }
2645}
2646
2647impl PartialEq for TableScan {
2648 fn eq(&self, other: &Self) -> bool {
2649 self.table_name == other.table_name
2650 && self.projection == other.projection
2651 && self.projected_schema == other.projected_schema
2652 && self.filters == other.filters
2653 && self.fetch == other.fetch
2654 }
2655}
2656
// Marker impl: the hand-written `PartialEq` for `TableScan` is declared to be
// a full equivalence relation.
impl Eq for TableScan {}
2658
2659impl PartialOrd for TableScan {
2662 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2663 #[derive(PartialEq, PartialOrd)]
2664 struct ComparableTableScan<'a> {
2665 pub table_name: &'a TableReference,
2667 pub projection: &'a Option<Vec<usize>>,
2669 pub filters: &'a Vec<Expr>,
2671 pub fetch: &'a Option<usize>,
2673 }
2674 let comparable_self = ComparableTableScan {
2675 table_name: &self.table_name,
2676 projection: &self.projection,
2677 filters: &self.filters,
2678 fetch: &self.fetch,
2679 };
2680 let comparable_other = ComparableTableScan {
2681 table_name: &other.table_name,
2682 projection: &other.projection,
2683 filters: &other.filters,
2684 fetch: &other.fetch,
2685 };
2686 comparable_self
2687 .partial_cmp(&comparable_other)
2688 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2690 }
2691}
2692
2693impl Hash for TableScan {
2694 fn hash<H: Hasher>(&self, state: &mut H) {
2695 self.table_name.hash(state);
2696 self.projection.hash(state);
2697 self.projected_schema.hash(state);
2698 self.filters.hash(state);
2699 self.fetch.hash(state);
2700 }
2701}
2702
2703impl TableScan {
2704 pub fn try_new(
2707 table_name: impl Into<TableReference>,
2708 table_source: Arc<dyn TableSource>,
2709 projection: Option<Vec<usize>>,
2710 filters: Vec<Expr>,
2711 fetch: Option<usize>,
2712 ) -> Result<Self> {
2713 let table_name = table_name.into();
2714
2715 if table_name.table().is_empty() {
2716 return plan_err!("table_name cannot be empty");
2717 }
2718 let schema = table_source.schema();
2719 let func_dependencies = FunctionalDependencies::new_from_constraints(
2720 table_source.constraints(),
2721 schema.fields.len(),
2722 );
2723 let projected_schema = projection
2724 .as_ref()
2725 .map(|p| {
2726 let projected_func_dependencies =
2727 func_dependencies.project_functional_dependencies(p, p.len());
2728
2729 let df_schema = DFSchema::new_with_metadata(
2730 p.iter()
2731 .map(|i| {
2732 (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
2733 })
2734 .collect(),
2735 schema.metadata.clone(),
2736 )?;
2737 df_schema.with_functional_dependencies(projected_func_dependencies)
2738 })
2739 .unwrap_or_else(|| {
2740 let df_schema =
2741 DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2742 df_schema.with_functional_dependencies(func_dependencies)
2743 })?;
2744 let projected_schema = Arc::new(projected_schema);
2745
2746 Ok(Self {
2747 table_name,
2748 source: table_source,
2749 projection,
2750 projected_schema,
2751 filters,
2752 fetch,
2753 })
2754 }
2755}
2756
/// Logical plan node that repartitions its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// How the output is partitioned. `LogicalPlan::display` handles the
    /// `RoundRobinBatch`, `Hash` and `DistributeBy` schemes.
    pub partitioning_scheme: Partitioning,
}
2765
/// Logical plan node that combines several input plans.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// The input plans; the constructors require at least two.
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Output schema derived from the inputs.
    pub schema: DFSchemaRef,
}
2774
2775impl Union {
2776 pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2779 let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2780 Ok(Union { inputs, schema })
2781 }
2782
2783 pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2788 let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2789 Ok(Union { inputs, schema })
2790 }
2791
2792 pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2796 let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2797 let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2798
2799 Ok(Union { inputs, schema })
2800 }
2801
2802 fn rewrite_inputs_from_schema(
2806 schema: &Arc<DFSchema>,
2807 inputs: Vec<Arc<LogicalPlan>>,
2808 ) -> Result<Vec<Arc<LogicalPlan>>> {
2809 let schema_width = schema.iter().count();
2810 let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2811 for input in inputs {
2812 let mut expr = Vec::with_capacity(schema_width);
2816 for column in schema.columns() {
2817 if input
2818 .schema()
2819 .has_column_with_unqualified_name(column.name())
2820 {
2821 expr.push(Expr::Column(column));
2822 } else {
2823 expr.push(
2824 Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2825 );
2826 }
2827 }
2828 wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2829 Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2830 )));
2831 }
2832
2833 Ok(wrapped_inputs)
2834 }
2835
2836 fn derive_schema_from_inputs(
2845 inputs: &[Arc<LogicalPlan>],
2846 loose_types: bool,
2847 by_name: bool,
2848 ) -> Result<DFSchemaRef> {
2849 if inputs.len() < 2 {
2850 return plan_err!("UNION requires at least two inputs");
2851 }
2852
2853 if by_name {
2854 Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2855 } else {
2856 Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2857 }
2858 }
2859
2860 fn derive_schema_from_inputs_by_name(
2861 inputs: &[Arc<LogicalPlan>],
2862 loose_types: bool,
2863 ) -> Result<DFSchemaRef> {
2864 type FieldData<'a> =
2865 (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2866 let mut cols: Vec<(&str, FieldData)> = Vec::new();
2867 for input in inputs.iter() {
2868 for field in input.schema().fields() {
2869 if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2870 cols.iter_mut().find(|(name, _)| name == field.name())
2871 {
2872 if !loose_types && *data_type != field.data_type() {
2873 return plan_err!(
2874 "Found different types for field {}",
2875 field.name()
2876 );
2877 }
2878
2879 metadata.push(field.metadata());
2880 *is_nullable |= field.is_nullable();
2883 *occurrences += 1;
2884 } else {
2885 cols.push((
2886 field.name(),
2887 (
2888 field.data_type(),
2889 field.is_nullable(),
2890 vec![field.metadata()],
2891 1,
2892 ),
2893 ));
2894 }
2895 }
2896 }
2897
2898 let union_fields = cols
2899 .into_iter()
2900 .map(
2901 |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2902 let final_is_nullable = if occurrences == inputs.len() {
2906 is_nullable
2907 } else {
2908 true
2909 };
2910
2911 let mut field =
2912 Field::new(name, data_type.clone(), final_is_nullable);
2913 field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
2914
2915 (None, Arc::new(field))
2916 },
2917 )
2918 .collect::<Vec<(Option<TableReference>, _)>>();
2919
2920 let union_schema_metadata = intersect_metadata_for_union(
2921 inputs.iter().map(|input| input.schema().metadata()),
2922 );
2923
2924 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2926 let schema = Arc::new(schema);
2927
2928 Ok(schema)
2929 }
2930
2931 fn derive_schema_from_inputs_by_position(
2932 inputs: &[Arc<LogicalPlan>],
2933 loose_types: bool,
2934 ) -> Result<DFSchemaRef> {
2935 let first_schema = inputs[0].schema();
2936 let fields_count = first_schema.fields().len();
2937 for input in inputs.iter().skip(1) {
2938 if fields_count != input.schema().fields().len() {
2939 return plan_err!(
2940 "UNION queries have different number of columns: \
2941 left has {} columns whereas right has {} columns",
2942 fields_count,
2943 input.schema().fields().len()
2944 );
2945 }
2946 }
2947
2948 let mut name_counts: HashMap<String, usize> = HashMap::new();
2949 let union_fields = (0..fields_count)
2950 .map(|i| {
2951 let fields = inputs
2952 .iter()
2953 .map(|input| input.schema().field(i))
2954 .collect::<Vec<_>>();
2955 let first_field = fields[0];
2956 let base_name = first_field.name().to_string();
2957
2958 let data_type = if loose_types {
2959 first_field.data_type()
2963 } else {
2964 fields.iter().skip(1).try_fold(
2965 first_field.data_type(),
2966 |acc, field| {
2967 if acc != field.data_type() {
2968 return plan_err!(
2969 "UNION field {i} have different type in inputs: \
2970 left has {} whereas right has {}",
2971 first_field.data_type(),
2972 field.data_type()
2973 );
2974 }
2975 Ok(acc)
2976 },
2977 )?
2978 };
2979 let nullable = fields.iter().any(|field| field.is_nullable());
2980
2981 let name = if let Some(count) = name_counts.get_mut(&base_name) {
2983 *count += 1;
2984 format!("{base_name}_{count}")
2985 } else {
2986 name_counts.insert(base_name.clone(), 0);
2987 base_name
2988 };
2989
2990 let mut field = Field::new(&name, data_type.clone(), nullable);
2991 let field_metadata = intersect_metadata_for_union(
2992 fields.iter().map(|field| field.metadata()),
2993 );
2994 field.set_metadata(field_metadata);
2995 Ok((None, Arc::new(field)))
2996 })
2997 .collect::<Result<_>>()?;
2998 let union_schema_metadata = intersect_metadata_for_union(
2999 inputs.iter().map(|input| input.schema().metadata()),
3000 );
3001
3002 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
3004 let schema = Arc::new(schema);
3005
3006 Ok(schema)
3007 }
3008}
3009
3010impl PartialOrd for Union {
3012 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3013 self.inputs
3014 .partial_cmp(&other.inputs)
3015 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3017 }
3018}
3019
/// Logical plan node holding what is needed to describe a table.
///
/// NOTE(review): presumably produced for `DESCRIBE <table>` statements —
/// confirm against the SQL planner.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Arrow schema of the table being described (inferred from the field
    /// name and type — confirm).
    pub schema: Arc<Schema>,
    /// Schema of the rows this node produces (inferred from the field name —
    /// confirm).
    pub output_schema: DFSchemaRef,
}
3049
impl PartialOrd for DescribeTable {
    /// Always returns `None`: two `DescribeTable` nodes are never ordered
    /// relative to each other (not even a node against itself).
    ///
    /// NOTE(review): presumably because the node consists only of schemas,
    /// for which no meaningful ordering exists — confirm.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
3058
/// Options controlling an EXPLAIN request.
///
/// `Default` yields `verbose = false`, `analyze = false` and
/// `format = ExplainFormat::Indent`; the `with_*` methods override one field
/// each.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Whether verbose output is requested.
    pub verbose: bool,
    /// Whether the ANALYZE variant is requested (presumably: also run the
    /// plan and report on the execution — confirm with the consumer of this
    /// flag).
    pub analyze: bool,
    /// Output format used to render the explanation.
    pub format: ExplainFormat,
}
3069
3070impl Default for ExplainOption {
3071 fn default() -> Self {
3072 ExplainOption {
3073 verbose: false,
3074 analyze: false,
3075 format: ExplainFormat::Indent,
3076 }
3077 }
3078}
3079
3080impl ExplainOption {
3081 pub fn with_verbose(mut self, verbose: bool) -> Self {
3083 self.verbose = verbose;
3084 self
3085 }
3086
3087 pub fn with_analyze(mut self, analyze: bool) -> Self {
3089 self.analyze = analyze;
3090 self
3091 }
3092
3093 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3095 self.format = format;
3096 self
3097 }
3098}
3099
/// Logical plan node that explains another plan instead of running it.
///
/// NOTE(review): field descriptions below are derived from names and types
/// only; confirm against the code that builds and consumes this node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Whether verbose output was requested.
    pub verbose: bool,
    /// Format in which the explanation is rendered. Not part of the
    /// `PartialOrd` field comparison, but still checked through `==`.
    pub explain_format: ExplainFormat,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// Textual renderings of the plan collected so far.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Output schema of this node. Not part of the `PartialOrd` field
    /// comparison, but still checked through `==`.
    pub schema: DFSchemaRef,
    /// Whether logical optimization of `plan` succeeded (presumably recorded
    /// by the optimizer driver — confirm).
    pub logical_optimization_succeeded: bool,
}
3122
3123impl PartialOrd for Explain {
3125 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3126 #[derive(PartialEq, PartialOrd)]
3127 struct ComparableExplain<'a> {
3128 pub verbose: &'a bool,
3130 pub plan: &'a Arc<LogicalPlan>,
3132 pub stringified_plans: &'a Vec<StringifiedPlan>,
3134 pub logical_optimization_succeeded: &'a bool,
3136 }
3137 let comparable_self = ComparableExplain {
3138 verbose: &self.verbose,
3139 plan: &self.plan,
3140 stringified_plans: &self.stringified_plans,
3141 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3142 };
3143 let comparable_other = ComparableExplain {
3144 verbose: &other.verbose,
3145 plan: &other.plan,
3146 stringified_plans: &other.stringified_plans,
3147 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3148 };
3149 comparable_self
3150 .partial_cmp(&comparable_other)
3151 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3153 }
3154}
3155
/// Logical plan node wrapping an input plan for analysis.
///
/// NOTE(review): presumably backs `EXPLAIN ANALYZE`, i.e. runs `input` and
/// reports on the execution — confirm against the physical planner.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Whether verbose output was requested.
    pub verbose: bool,
    /// The plan being analyzed.
    pub input: Arc<LogicalPlan>,
    /// Output schema of this node. Ignored by the `PartialOrd` field
    /// comparison, but still checked through `==`.
    pub schema: DFSchemaRef,
}
3167
3168impl PartialOrd for Analyze {
3170 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3171 match self.verbose.partial_cmp(&other.verbose) {
3172 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3173 cmp => cmp,
3174 }
3175 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3177 }
3178}
3179
/// Logical plan node wrapping a user-defined node.
// `Hash` is derived while `PartialEq` is written by hand (it delegates to the
// wrapped node's own `eq`), which is what the clippy lint below would
// otherwise flag.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-defined node, stored as a shared trait object.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3190
impl PartialEq for Extension {
    /// Two extensions are equal when their wrapped nodes compare equal;
    /// the comparison is delegated to `node`'s own `eq`.
    fn eq(&self, other: &Self) -> bool {
        self.node.eq(&other.node)
    }
}
3199
impl PartialOrd for Extension {
    /// Ordering is delegated entirely to the wrapped node's `partial_cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3205
/// Logical plan node restricting the rows of its input with an optional
/// OFFSET (`skip`) and an optional LIMIT (`fetch`).
///
/// Both bounds are stored as expressions; use [`Limit::get_skip_type`] and
/// [`Limit::get_fetch_type`] to resolve them to literals where possible.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// OFFSET expression; `None` is treated as an offset of 0.
    pub skip: Option<Box<Expr>>,
    /// LIMIT expression; `None` means no limit.
    pub fetch: Option<Box<Expr>>,
    /// The plan whose output is being limited.
    pub input: Arc<LogicalPlan>,
}
3217
/// Result of resolving [`Limit::skip`], see [`Limit::get_skip_type`].
pub enum SkipType {
    /// The offset is a known, non-negative literal (0 when no offset is set
    /// or the literal is a NULL `Int64`).
    Literal(usize),
    /// The offset is an expression other than an `Int64` literal.
    UnsupportedExpr,
}
3225
/// Result of resolving [`Limit::fetch`], see [`Limit::get_fetch_type`].
pub enum FetchType {
    /// The limit is a known, non-negative literal; `None` when no limit is
    /// set or the literal is a NULL `Int64`.
    Literal(Option<usize>),
    /// The limit is an expression other than an `Int64` literal.
    UnsupportedExpr,
}
3234
3235impl Limit {
3236 pub fn get_skip_type(&self) -> Result<SkipType> {
3238 match self.skip.as_deref() {
3239 Some(expr) => match *expr {
3240 Expr::Literal(ScalarValue::Int64(s), _) => {
3241 let s = s.unwrap_or(0);
3243 if s >= 0 {
3244 Ok(SkipType::Literal(s as usize))
3245 } else {
3246 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3247 }
3248 }
3249 _ => Ok(SkipType::UnsupportedExpr),
3250 },
3251 None => Ok(SkipType::Literal(0)),
3253 }
3254 }
3255
3256 pub fn get_fetch_type(&self) -> Result<FetchType> {
3258 match self.fetch.as_deref() {
3259 Some(expr) => match *expr {
3260 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3261 if s >= 0 {
3262 Ok(FetchType::Literal(Some(s as usize)))
3263 } else {
3264 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3265 }
3266 }
3267 Expr::Literal(ScalarValue::Int64(None), _) => {
3268 Ok(FetchType::Literal(None))
3269 }
3270 _ => Ok(FetchType::UnsupportedExpr),
3271 },
3272 None => Ok(FetchType::Literal(None)),
3273 }
3274 }
3275}
3276
/// Logical plan node removing duplicate rows from its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain distinct over the wrapped input plan (presumably
    /// `SELECT DISTINCT ...` — confirm).
    All(Arc<LogicalPlan>),
    /// Distinct restricted to a set of `ON` expressions, see [`DistinctOn`].
    On(DistinctOn),
}
3285
3286impl Distinct {
3287 pub fn input(&self) -> &Arc<LogicalPlan> {
3289 match self {
3290 Distinct::All(input) => input,
3291 Distinct::On(DistinctOn { input, .. }) => input,
3292 }
3293 }
3294}
3295
/// Logical plan node for `SELECT DISTINCT ON (...)`.
///
/// Construct it with [`DistinctOn::try_new`], which normalizes the `ON`
/// expressions and derives `schema` from `select_expr`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `ON` expressions (never empty when built through `try_new`).
    pub on_expr: Vec<Expr>,
    /// The expressions that are selected; they determine `schema`.
    pub select_expr: Vec<Expr>,
    /// Optional ORDER BY expressions. When set through `with_sort_expr`,
    /// their leading entries are guaranteed to match `on_expr`.
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Output schema. Ignored by the `PartialOrd` field comparison, but still
    /// checked through `==`.
    pub schema: DFSchemaRef,
}
3312
3313impl DistinctOn {
3314 pub fn try_new(
3316 on_expr: Vec<Expr>,
3317 select_expr: Vec<Expr>,
3318 sort_expr: Option<Vec<SortExpr>>,
3319 input: Arc<LogicalPlan>,
3320 ) -> Result<Self> {
3321 if on_expr.is_empty() {
3322 return plan_err!("No `ON` expressions provided");
3323 }
3324
3325 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3326 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3327 .into_iter()
3328 .collect();
3329
3330 let dfschema = DFSchema::new_with_metadata(
3331 qualified_fields,
3332 input.schema().metadata().clone(),
3333 )?;
3334
3335 let mut distinct_on = DistinctOn {
3336 on_expr,
3337 select_expr,
3338 sort_expr: None,
3339 input,
3340 schema: Arc::new(dfschema),
3341 };
3342
3343 if let Some(sort_expr) = sort_expr {
3344 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3345 }
3346
3347 Ok(distinct_on)
3348 }
3349
3350 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3354 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3355
3356 let mut matched = true;
3358 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3359 if on != &sort.expr {
3360 matched = false;
3361 break;
3362 }
3363 }
3364
3365 if self.on_expr.len() > sort_expr.len() || !matched {
3366 return plan_err!(
3367 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3368 );
3369 }
3370
3371 self.sort_expr = Some(sort_expr);
3372 Ok(self)
3373 }
3374}
3375
3376impl PartialOrd for DistinctOn {
3378 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3379 #[derive(PartialEq, PartialOrd)]
3380 struct ComparableDistinctOn<'a> {
3381 pub on_expr: &'a Vec<Expr>,
3383 pub select_expr: &'a Vec<Expr>,
3385 pub sort_expr: &'a Option<Vec<SortExpr>>,
3389 pub input: &'a Arc<LogicalPlan>,
3391 }
3392 let comparable_self = ComparableDistinctOn {
3393 on_expr: &self.on_expr,
3394 select_expr: &self.select_expr,
3395 sort_expr: &self.sort_expr,
3396 input: &self.input,
3397 };
3398 let comparable_other = ComparableDistinctOn {
3399 on_expr: &other.on_expr,
3400 select_expr: &other.select_expr,
3401 sort_expr: &other.sort_expr,
3402 input: &other.input,
3403 };
3404 comparable_self
3405 .partial_cmp(&comparable_other)
3406 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3408 }
3409}
3410
/// Logical plan node that groups its input by `group_expr` and evaluates
/// `aggr_expr` per group.
///
/// The output schema lists the grouping columns first, then (only when the
/// grouping expression is a single `Expr::GroupingSet`) the internal
/// [`Aggregate::INTERNAL_GROUPING_ID`] column, and finally the aggregate
/// columns.
///
/// The struct is `#[non_exhaustive]`; build it with [`Aggregate::try_new`]
/// or [`Aggregate::try_new_with_schema`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct Aggregate {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions.
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions.
    pub aggr_expr: Vec<Expr>,
    /// Output schema, including the functional dependencies computed at
    /// construction time.
    pub schema: DFSchemaRef,
}
3436
3437impl Aggregate {
3438 pub fn try_new(
3440 input: Arc<LogicalPlan>,
3441 group_expr: Vec<Expr>,
3442 aggr_expr: Vec<Expr>,
3443 ) -> Result<Self> {
3444 let group_expr = enumerate_grouping_sets(group_expr)?;
3445
3446 let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3447
3448 let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3449
3450 let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3451
3452 if is_grouping_set {
3454 qualified_fields = qualified_fields
3455 .into_iter()
3456 .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3457 .collect::<Vec<_>>();
3458 qualified_fields.push((
3459 None,
3460 Field::new(
3461 Self::INTERNAL_GROUPING_ID,
3462 Self::grouping_id_type(qualified_fields.len()),
3463 false,
3464 )
3465 .into(),
3466 ));
3467 }
3468
3469 qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3470
3471 let schema = DFSchema::new_with_metadata(
3472 qualified_fields,
3473 input.schema().metadata().clone(),
3474 )?;
3475
3476 Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3477 }
3478
3479 pub fn try_new_with_schema(
3485 input: Arc<LogicalPlan>,
3486 group_expr: Vec<Expr>,
3487 aggr_expr: Vec<Expr>,
3488 schema: DFSchemaRef,
3489 ) -> Result<Self> {
3490 if group_expr.is_empty() && aggr_expr.is_empty() {
3491 return plan_err!(
3492 "Aggregate requires at least one grouping or aggregate expression. \
3493 Aggregate without grouping expressions nor aggregate expressions is \
3494 logically equivalent to, but less efficient than, VALUES producing \
3495 single row. Please use VALUES instead."
3496 );
3497 }
3498 let group_expr_count = grouping_set_expr_count(&group_expr)?;
3499 if schema.fields().len() != group_expr_count + aggr_expr.len() {
3500 return plan_err!(
3501 "Aggregate schema has wrong number of fields. Expected {} got {}",
3502 group_expr_count + aggr_expr.len(),
3503 schema.fields().len()
3504 );
3505 }
3506
3507 let aggregate_func_dependencies =
3508 calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3509 let new_schema = schema.as_ref().clone();
3510 let schema = Arc::new(
3511 new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3512 );
3513 Ok(Self {
3514 input,
3515 group_expr,
3516 aggr_expr,
3517 schema,
3518 })
3519 }
3520
3521 fn is_grouping_set(&self) -> bool {
3522 matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3523 }
3524
3525 fn output_expressions(&self) -> Result<Vec<&Expr>> {
3527 static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3528 Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3529 });
3530 let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3531 if self.is_grouping_set() {
3532 exprs.push(&INTERNAL_ID_EXPR);
3533 }
3534 exprs.extend(self.aggr_expr.iter());
3535 debug_assert!(exprs.len() == self.schema.fields().len());
3536 Ok(exprs)
3537 }
3538
3539 pub fn group_expr_len(&self) -> Result<usize> {
3543 grouping_set_expr_count(&self.group_expr)
3544 }
3545
3546 pub fn grouping_id_type(group_exprs: usize) -> DataType {
3551 if group_exprs <= 8 {
3552 DataType::UInt8
3553 } else if group_exprs <= 16 {
3554 DataType::UInt16
3555 } else if group_exprs <= 32 {
3556 DataType::UInt32
3557 } else {
3558 DataType::UInt64
3559 }
3560 }
3561
3562 pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3580}
3581
3582impl PartialOrd for Aggregate {
3584 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3585 match self.input.partial_cmp(&other.input) {
3586 Some(Ordering::Equal) => {
3587 match self.group_expr.partial_cmp(&other.group_expr) {
3588 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3589 cmp => cmp,
3590 }
3591 }
3592 cmp => cmp,
3593 }
3594 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3596 }
3597}
3598
3599fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3601 group_expr
3602 .iter()
3603 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3604}
3605
3606fn calc_func_dependencies_for_aggregate(
3608 group_expr: &[Expr],
3610 input: &LogicalPlan,
3612 aggr_schema: &DFSchema,
3614) -> Result<FunctionalDependencies> {
3615 if !contains_grouping_set(group_expr) {
3621 let group_by_expr_names = group_expr
3622 .iter()
3623 .map(|item| item.schema_name().to_string())
3624 .collect::<IndexSet<_>>()
3625 .into_iter()
3626 .collect::<Vec<_>>();
3627 let aggregate_func_dependencies = aggregate_functional_dependencies(
3628 input.schema(),
3629 &group_by_expr_names,
3630 aggr_schema,
3631 );
3632 Ok(aggregate_func_dependencies)
3633 } else {
3634 Ok(FunctionalDependencies::empty())
3635 }
3636}
3637
3638fn calc_func_dependencies_for_project(
3641 exprs: &[Expr],
3642 input: &LogicalPlan,
3643) -> Result<FunctionalDependencies> {
3644 let input_fields = input.schema().field_names();
3645 let proj_indices = exprs
3647 .iter()
3648 .map(|expr| match expr {
3649 #[expect(deprecated)]
3650 Expr::Wildcard { qualifier, options } => {
3651 let wildcard_fields = exprlist_to_fields(
3652 vec![&Expr::Wildcard {
3653 qualifier: qualifier.clone(),
3654 options: options.clone(),
3655 }],
3656 input,
3657 )?;
3658 Ok::<_, DataFusionError>(
3659 wildcard_fields
3660 .into_iter()
3661 .filter_map(|(qualifier, f)| {
3662 let flat_name = qualifier
3663 .map(|t| format!("{}.{}", t, f.name()))
3664 .unwrap_or_else(|| f.name().clone());
3665 input_fields.iter().position(|item| *item == flat_name)
3666 })
3667 .collect::<Vec<_>>(),
3668 )
3669 }
3670 Expr::Alias(alias) => {
3671 let name = format!("{}", alias.expr);
3672 Ok(input_fields
3673 .iter()
3674 .position(|item| *item == name)
3675 .map(|i| vec![i])
3676 .unwrap_or(vec![]))
3677 }
3678 _ => {
3679 let name = format!("{expr}");
3680 Ok(input_fields
3681 .iter()
3682 .position(|item| *item == name)
3683 .map(|i| vec![i])
3684 .unwrap_or(vec![]))
3685 }
3686 })
3687 .collect::<Result<Vec<_>>>()?
3688 .into_iter()
3689 .flatten()
3690 .collect::<Vec<_>>();
3691
3692 Ok(input
3693 .schema()
3694 .functional_dependencies()
3695 .project_functional_dependencies(&proj_indices, exprs.len()))
3696}
3697
/// Logical plan node that sorts its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions.
    pub expr: Vec<SortExpr>,
    /// The plan whose output is sorted.
    pub input: Arc<LogicalPlan>,
    /// Optional row count (presumably the maximum number of rows to emit
    /// after sorting — confirm with the consumers of this field).
    pub fetch: Option<usize>,
}
3708
/// Logical plan node joining two inputs.
///
/// NOTE(review): field descriptions below follow from names and types only;
/// confirm details against the join planner.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input plan.
    pub left: Arc<LogicalPlan>,
    /// Right input plan.
    pub right: Arc<LogicalPlan>,
    /// Pairs of (left, right) expressions the join is performed on
    /// (presumably equality conditions — confirm).
    pub on: Vec<(Expr, Expr)>,
    /// Optional additional filter expression.
    pub filter: Option<Expr>,
    /// Kind of join (inner, left, right, full, ...).
    pub join_type: JoinType,
    /// Join constraint.
    pub join_constraint: JoinConstraint,
    /// Output schema, built from both input schemas and `join_type`. Ignored
    /// by the `PartialOrd` field comparison, but still checked through `==`.
    pub schema: DFSchemaRef,
    /// How NULL values are treated when comparing join keys.
    pub null_equality: NullEquality,
}
3729
3730impl Join {
3731 pub fn try_new(
3750 left: Arc<LogicalPlan>,
3751 right: Arc<LogicalPlan>,
3752 on: Vec<(Expr, Expr)>,
3753 filter: Option<Expr>,
3754 join_type: JoinType,
3755 join_constraint: JoinConstraint,
3756 null_equality: NullEquality,
3757 ) -> Result<Self> {
3758 let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3759
3760 Ok(Join {
3761 left,
3762 right,
3763 on,
3764 filter,
3765 join_type,
3766 join_constraint,
3767 schema: Arc::new(join_schema),
3768 null_equality,
3769 })
3770 }
3771
3772 pub fn try_new_with_project_input(
3775 original: &LogicalPlan,
3776 left: Arc<LogicalPlan>,
3777 right: Arc<LogicalPlan>,
3778 column_on: (Vec<Column>, Vec<Column>),
3779 ) -> Result<(Self, bool)> {
3780 let original_join = match original {
3781 LogicalPlan::Join(join) => join,
3782 _ => return plan_err!("Could not create join with project input"),
3783 };
3784
3785 let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3786 let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3787
3788 let mut requalified = false;
3789
3790 if original_join.join_type == JoinType::Inner
3793 || original_join.join_type == JoinType::Left
3794 || original_join.join_type == JoinType::Right
3795 || original_join.join_type == JoinType::Full
3796 {
3797 (left_sch, right_sch, requalified) =
3798 requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3799 }
3800
3801 let on: Vec<(Expr, Expr)> = column_on
3802 .0
3803 .into_iter()
3804 .zip(column_on.1)
3805 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3806 .collect();
3807
3808 let join_schema = build_join_schema(
3809 left_sch.schema(),
3810 right_sch.schema(),
3811 &original_join.join_type,
3812 )?;
3813
3814 Ok((
3815 Join {
3816 left,
3817 right,
3818 on,
3819 filter: original_join.filter.clone(),
3820 join_type: original_join.join_type,
3821 join_constraint: original_join.join_constraint,
3822 schema: Arc::new(join_schema),
3823 null_equality: original_join.null_equality,
3824 },
3825 requalified,
3826 ))
3827 }
3828}
3829
3830impl PartialOrd for Join {
3832 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3833 #[derive(PartialEq, PartialOrd)]
3834 struct ComparableJoin<'a> {
3835 pub left: &'a Arc<LogicalPlan>,
3837 pub right: &'a Arc<LogicalPlan>,
3839 pub on: &'a Vec<(Expr, Expr)>,
3841 pub filter: &'a Option<Expr>,
3843 pub join_type: &'a JoinType,
3845 pub join_constraint: &'a JoinConstraint,
3847 pub null_equality: &'a NullEquality,
3849 }
3850 let comparable_self = ComparableJoin {
3851 left: &self.left,
3852 right: &self.right,
3853 on: &self.on,
3854 filter: &self.filter,
3855 join_type: &self.join_type,
3856 join_constraint: &self.join_constraint,
3857 null_equality: &self.null_equality,
3858 };
3859 let comparable_other = ComparableJoin {
3860 left: &other.left,
3861 right: &other.right,
3862 on: &other.on,
3863 filter: &other.filter,
3864 join_type: &other.join_type,
3865 join_constraint: &other.join_constraint,
3866 null_equality: &other.null_equality,
3867 };
3868 comparable_self
3869 .partial_cmp(&comparable_other)
3870 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3872 }
3873}
3874
/// A subquery embedded in an expression.
///
/// `Debug` is implemented by hand and prints only the `<subquery>`
/// placeholder, which is why it is not part of the derive list.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The plan of the subquery.
    pub subquery: Arc<LogicalPlan>,
    /// Expressions referencing the outer query (inferred from the field
    /// name — confirm).
    pub outer_ref_columns: Vec<Expr>,
    /// Source locations associated with the subquery (presumably used for
    /// diagnostics — confirm).
    pub spans: Spans,
}
3885
impl Normalizeable for Subquery {
    /// Subqueries always report that they cannot be normalized.
    fn can_normalize(&self) -> bool {
        false
    }
}
3891
3892impl NormalizeEq for Subquery {
3893 fn normalize_eq(&self, other: &Self) -> bool {
3894 *self.subquery == *other.subquery
3896 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3897 && self
3898 .outer_ref_columns
3899 .iter()
3900 .zip(other.outer_ref_columns.iter())
3901 .all(|(a, b)| a.normalize_eq(b))
3902 }
3903}
3904
3905impl Subquery {
3906 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3907 match plan {
3908 Expr::ScalarSubquery(it) => Ok(it),
3909 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3910 _ => plan_err!("Could not coerce into ScalarSubquery!"),
3911 }
3912 }
3913
3914 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3915 Subquery {
3916 subquery: plan,
3917 outer_ref_columns: self.outer_ref_columns.clone(),
3918 spans: Spans::new(),
3919 }
3920 }
3921}
3922
3923impl Debug for Subquery {
3924 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3925 write!(f, "<subquery>")
3926 }
3927}
3928
/// Logical partitioning schemes.
///
/// NOTE(review): the meaning of the `usize` payloads is inferred from the
/// variant names (presumably the number of partitions) — confirm.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Round-robin distribution of batches over the given number of
    /// partitions.
    RoundRobinBatch(usize),
    /// Hash partitioning on the given expressions into the given number of
    /// partitions.
    Hash(Vec<Expr>, usize),
    /// Partitioning by the given expressions without an explicit partition
    /// count.
    DistributeBy(Vec<Expr>),
}
3944
/// Describes one list-unnest output: the column it produces and how many
/// list levels are removed to produce it.
///
/// Displayed as `<output_column>|depth=<depth>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The column produced by the unnest.
    pub output_column: Column,
    /// Number of list nesting levels to unnest.
    pub depth: usize,
}
3969
3970impl Display for ColumnUnnestList {
3971 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3972 write!(f, "{}|depth={}", self.output_column, self.depth)
3973 }
3974}
3975
/// Logical plan node that unnests list and struct columns of its input.
///
/// Build it with [`Unnest::try_new`], which computes all derived fields.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// The columns to unnest, as requested by the caller.
    pub exec_columns: Vec<Column>,
    /// For every list unnest: the index of the source column in the input
    /// schema, paired with the output column and unnest depth.
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Input-schema indices of the struct columns being unnested.
    pub struct_type_columns: Vec<usize>,
    /// For every output column, the index of the input column it was derived
    /// from.
    pub dependency_indices: Vec<usize>,
    /// Output schema. Ignored by the `PartialOrd` field comparison, but still
    /// checked through `==`.
    pub schema: DFSchemaRef,
    /// Options of the unnest operation.
    pub options: UnnestOptions,
}
3998
3999impl PartialOrd for Unnest {
4001 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4002 #[derive(PartialEq, PartialOrd)]
4003 struct ComparableUnnest<'a> {
4004 pub input: &'a Arc<LogicalPlan>,
4006 pub exec_columns: &'a Vec<Column>,
4008 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4011 pub struct_type_columns: &'a Vec<usize>,
4014 pub dependency_indices: &'a Vec<usize>,
4017 pub options: &'a UnnestOptions,
4019 }
4020 let comparable_self = ComparableUnnest {
4021 input: &self.input,
4022 exec_columns: &self.exec_columns,
4023 list_type_columns: &self.list_type_columns,
4024 struct_type_columns: &self.struct_type_columns,
4025 dependency_indices: &self.dependency_indices,
4026 options: &self.options,
4027 };
4028 let comparable_other = ComparableUnnest {
4029 input: &other.input,
4030 exec_columns: &other.exec_columns,
4031 list_type_columns: &other.list_type_columns,
4032 struct_type_columns: &other.struct_type_columns,
4033 dependency_indices: &other.dependency_indices,
4034 options: &other.options,
4035 };
4036 comparable_self
4037 .partial_cmp(&comparable_other)
4038 .filter(|cmp| *cmp != Ordering::Equal || self == other)
4040 }
4041}
4042
4043impl Unnest {
4044 pub fn try_new(
4045 input: Arc<LogicalPlan>,
4046 exec_columns: Vec<Column>,
4047 options: UnnestOptions,
4048 ) -> Result<Self> {
4049 if exec_columns.is_empty() {
4050 return plan_err!("unnest plan requires at least 1 column to unnest");
4051 }
4052
4053 let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
4054 let mut struct_columns = vec![];
4055 let indices_to_unnest = exec_columns
4056 .iter()
4057 .map(|c| Ok((input.schema().index_of_column(c)?, c)))
4058 .collect::<Result<HashMap<usize, &Column>>>()?;
4059
4060 let input_schema = input.schema();
4061
4062 let mut dependency_indices = vec![];
4063 let fields = input_schema
4079 .iter()
4080 .enumerate()
4081 .map(|(index, (original_qualifier, original_field))| {
4082 match indices_to_unnest.get(&index) {
4083 Some(column_to_unnest) => {
4084 let recursions_on_column = options
4085 .recursions
4086 .iter()
4087 .filter(|p| -> bool { &p.input_column == *column_to_unnest })
4088 .collect::<Vec<_>>();
4089 let mut transformed_columns = recursions_on_column
4090 .iter()
4091 .map(|r| {
4092 list_columns.push((
4093 index,
4094 ColumnUnnestList {
4095 output_column: r.output_column.clone(),
4096 depth: r.depth,
4097 },
4098 ));
4099 Ok(get_unnested_columns(
4100 &r.output_column.name,
4101 original_field.data_type(),
4102 r.depth,
4103 )?
4104 .into_iter()
4105 .next()
4106 .unwrap()) })
4108 .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
4109 if transformed_columns.is_empty() {
4110 transformed_columns = get_unnested_columns(
4111 &column_to_unnest.name,
4112 original_field.data_type(),
4113 1,
4114 )?;
4115 match original_field.data_type() {
4116 DataType::Struct(_) => {
4117 struct_columns.push(index);
4118 }
4119 DataType::List(_)
4120 | DataType::FixedSizeList(_, _)
4121 | DataType::LargeList(_) => {
4122 list_columns.push((
4123 index,
4124 ColumnUnnestList {
4125 output_column: Column::from_name(
4126 &column_to_unnest.name,
4127 ),
4128 depth: 1,
4129 },
4130 ));
4131 }
4132 _ => {}
4133 };
4134 }
4135
4136 dependency_indices.extend(std::iter::repeat_n(
4138 index,
4139 transformed_columns.len(),
4140 ));
4141 Ok(transformed_columns
4142 .iter()
4143 .map(|(col, field)| {
4144 (col.relation.to_owned(), field.to_owned())
4145 })
4146 .collect())
4147 }
4148 None => {
4149 dependency_indices.push(index);
4150 Ok(vec![(
4151 original_qualifier.cloned(),
4152 Arc::clone(original_field),
4153 )])
4154 }
4155 }
4156 })
4157 .collect::<Result<Vec<_>>>()?
4158 .into_iter()
4159 .flatten()
4160 .collect::<Vec<_>>();
4161
4162 let metadata = input_schema.metadata().clone();
4163 let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
4164 let deps = input_schema.functional_dependencies().clone();
4166 let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
4167
4168 Ok(Unnest {
4169 input,
4170 exec_columns,
4171 list_type_columns: list_columns,
4172 struct_type_columns: struct_columns,
4173 dependency_indices,
4174 schema,
4175 options,
4176 })
4177 }
4178}
4179
4180fn get_unnested_columns(
4189 col_name: &String,
4190 data_type: &DataType,
4191 depth: usize,
4192) -> Result<Vec<(Column, Arc<Field>)>> {
4193 let mut qualified_columns = Vec::with_capacity(1);
4194
4195 match data_type {
4196 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4197 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4198 let new_field = Arc::new(Field::new(
4199 col_name, data_type,
4200 true,
4203 ));
4204 let column = Column::from_name(col_name);
4205 qualified_columns.push((column, new_field));
4207 }
4208 DataType::Struct(fields) => {
4209 qualified_columns.extend(fields.iter().map(|f| {
4210 let new_name = format!("{}.{}", col_name, f.name());
4211 let column = Column::from_name(&new_name);
4212 let new_field = f.as_ref().clone().with_name(new_name);
4213 (column, Arc::new(new_field))
4215 }))
4216 }
4217 _ => {
4218 return internal_err!("trying to unnest on invalid data type {data_type}");
4219 }
4220 };
4221 Ok(qualified_columns)
4222}
4223
4224fn get_unnested_list_datatype_recursive(
4227 data_type: &DataType,
4228 depth: usize,
4229) -> Result<DataType> {
4230 match data_type {
4231 DataType::List(field)
4232 | DataType::FixedSizeList(field, _)
4233 | DataType::LargeList(field) => {
4234 if depth == 1 {
4235 return Ok(field.data_type().clone());
4236 }
4237 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4238 }
4239 _ => {}
4240 };
4241
4242 internal_err!("trying to unnest on invalid data type {data_type}")
4243}
4244
4245#[cfg(test)]
4246mod tests {
4247 use super::*;
4248 use crate::builder::LogicalTableSource;
4249 use crate::logical_plan::table_scan;
4250 use crate::test::function_stub::{count, count_udaf};
4251 use crate::{
4252 binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4253 GroupingSet,
4254 };
4255 use datafusion_common::metadata::ScalarAndMetadata;
4256 use datafusion_common::tree_node::{
4257 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4258 };
4259 use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4260 use insta::{assert_debug_snapshot, assert_snapshot};
4261 use std::hash::DefaultHasher;
4262
4263 fn employee_schema() -> Schema {
4264 Schema::new(vec![
4265 Field::new("id", DataType::Int32, false),
4266 Field::new("first_name", DataType::Utf8, false),
4267 Field::new("last_name", DataType::Utf8, false),
4268 Field::new("state", DataType::Utf8, false),
4269 Field::new("salary", DataType::Int32, false),
4270 ])
4271 }
4272
4273 fn display_plan() -> Result<LogicalPlan> {
4274 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4275 .build()?;
4276
4277 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4278 .filter(in_subquery(col("state"), Arc::new(plan1)))?
4279 .project(vec![col("id")])?
4280 .build()
4281 }
4282
4283 #[test]
4284 fn test_display_indent() -> Result<()> {
4285 let plan = display_plan()?;
4286
4287 assert_snapshot!(plan.display_indent(), @r"
4288 Projection: employee_csv.id
4289 Filter: employee_csv.state IN (<subquery>)
4290 Subquery:
4291 TableScan: employee_csv projection=[state]
4292 TableScan: employee_csv projection=[id, state]
4293 ");
4294 Ok(())
4295 }
4296
4297 #[test]
4298 fn test_display_indent_schema() -> Result<()> {
4299 let plan = display_plan()?;
4300
4301 assert_snapshot!(plan.display_indent_schema(), @r"
4302 Projection: employee_csv.id [id:Int32]
4303 Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4304 Subquery: [state:Utf8]
4305 TableScan: employee_csv projection=[state] [state:Utf8]
4306 TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4307 ");
4308 Ok(())
4309 }
4310
4311 #[test]
4312 fn test_display_subquery_alias() -> Result<()> {
4313 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4314 .build()?;
4315 let plan1 = Arc::new(plan1);
4316
4317 let plan =
4318 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4319 .project(vec![col("id"), exists(plan1).alias("exists")])?
4320 .build();
4321
4322 assert_snapshot!(plan?.display_indent(), @r"
4323 Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4324 Subquery:
4325 TableScan: employee_csv projection=[state]
4326 TableScan: employee_csv projection=[id, state]
4327 ");
4328 Ok(())
4329 }
4330
4331 #[test]
4332 fn test_display_graphviz() -> Result<()> {
4333 let plan = display_plan()?;
4334
4335 assert_snapshot!(plan.display_graphviz(), @r#"
4338 // Begin DataFusion GraphViz Plan,
4339 // display it online here: https://dreampuf.github.io/GraphvizOnline
4340
4341 digraph {
4342 subgraph cluster_1
4343 {
4344 graph[label="LogicalPlan"]
4345 2[shape=box label="Projection: employee_csv.id"]
4346 3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4347 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4348 4[shape=box label="Subquery:"]
4349 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4350 5[shape=box label="TableScan: employee_csv projection=[state]"]
4351 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4352 6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4353 3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4354 }
4355 subgraph cluster_7
4356 {
4357 graph[label="Detailed LogicalPlan"]
4358 8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4359 9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4360 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4361 10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4362 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4363 11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4364 10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4365 12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4366 9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4367 }
4368 }
4369 // End DataFusion GraphViz Plan
4370 "#);
4371 Ok(())
4372 }
4373
/// Checks the `display_pg_json()` rendering of the plan returned by
/// `display_plan()` against an inline snapshot.
///
/// The expected JSON is a one-element array whose "Plan" object nests child
/// nodes under "Plans": Projection -> Filter -> [Subquery -> TableScan, TableScan].
#[test]
fn test_display_pg_json() -> Result<()> {
    let plan = display_plan()?;

    // Each node reports its "Node Type" and "Output" columns; the scans also
    // report a "Relation Name" and the filter its "Condition".
    assert_snapshot!(plan.display_pg_json(), @r#"
    [
    {
    "Plan": {
    "Expressions": [
    "employee_csv.id"
    ],
    "Node Type": "Projection",
    "Output": [
    "id"
    ],
    "Plans": [
    {
    "Condition": "employee_csv.state IN (<subquery>)",
    "Node Type": "Filter",
    "Output": [
    "id",
    "state"
    ],
    "Plans": [
    {
    "Node Type": "Subquery",
    "Output": [
    "state"
    ],
    "Plans": [
    {
    "Node Type": "TableScan",
    "Output": [
    "state"
    ],
    "Plans": [],
    "Relation Name": "employee_csv"
    }
    ]
    },
    {
    "Node Type": "TableScan",
    "Output": [
    "id",
    "state"
    ],
    "Plans": [],
    "Relation Name": "employee_csv"
    }
    ]
    }
    ]
    }
    }
    ]
    "#);
    Ok(())
}
4432
4433 #[derive(Debug, Default)]
4435 struct OkVisitor {
4436 strings: Vec<String>,
4437 }
4438
4439 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4440 type Node = LogicalPlan;
4441
4442 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4443 let s = match plan {
4444 LogicalPlan::Projection { .. } => "pre_visit Projection",
4445 LogicalPlan::Filter { .. } => "pre_visit Filter",
4446 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4447 _ => {
4448 return not_impl_err!("unknown plan type");
4449 }
4450 };
4451
4452 self.strings.push(s.into());
4453 Ok(TreeNodeRecursion::Continue)
4454 }
4455
4456 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4457 let s = match plan {
4458 LogicalPlan::Projection { .. } => "post_visit Projection",
4459 LogicalPlan::Filter { .. } => "post_visit Filter",
4460 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4461 _ => {
4462 return not_impl_err!("unknown plan type");
4463 }
4464 };
4465
4466 self.strings.push(s.into());
4467 Ok(TreeNodeRecursion::Continue)
4468 }
4469 }
4470
/// A full traversal of `test_plan()` visits every node top-down and then
/// bottom-up, in the order pinned by the snapshot.
#[test]
fn visit_order() {
    let plan = test_plan();
    let mut recorder = OkVisitor::default();

    let outcome = plan.visit_with_subqueries(&mut recorder);
    assert!(outcome.is_ok());

    assert_debug_snapshot!(recorder.strings, @r#"
    [
    "pre_visit Projection",
    "pre_visit Filter",
    "pre_visit TableScan",
    "post_visit TableScan",
    "post_visit Filter",
    "post_visit Projection",
    ]
    "#);
}
4489
/// Countdown that may be unset.
///
/// A default-constructed counter holds `None` and never fires.
#[derive(Debug, Default)]
struct OptionalCounter {
    /// Remaining calls before `dec` reports `true`; `None` means "never".
    val: Option<usize>,
}

impl OptionalCounter {
    /// Creates a counter that fires once `dec` has been called `val` times.
    fn new(val: usize) -> Self {
        Self { val: Some(val) }
    }

    /// Returns `true` when the counter has reached zero (and keeps returning
    /// `true` afterwards); otherwise decrements a set counter and returns
    /// `false`.
    fn dec(&mut self) -> bool {
        match self.val {
            Some(0) => true,
            Some(remaining) => {
                self.val = Some(remaining - 1);
                false
            }
            None => false,
        }
    }
}
4510
4511 #[derive(Debug, Default)]
4512 struct StoppingVisitor {
4514 inner: OkVisitor,
4515 return_false_from_pre_in: OptionalCounter,
4517 return_false_from_post_in: OptionalCounter,
4519 }
4520
4521 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4522 type Node = LogicalPlan;
4523
4524 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4525 if self.return_false_from_pre_in.dec() {
4526 return Ok(TreeNodeRecursion::Stop);
4527 }
4528 self.inner.f_down(plan)?;
4529
4530 Ok(TreeNodeRecursion::Continue)
4531 }
4532
4533 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4534 if self.return_false_from_post_in.dec() {
4535 return Ok(TreeNodeRecursion::Stop);
4536 }
4537
4538 self.inner.f_up(plan)
4539 }
4540 }
4541
/// Stopping from the third `f_down` call leaves only the first two
/// pre-visit labels in the log.
#[test]
fn early_stopping_pre_visit() {
    let plan = test_plan();
    let mut stopper = StoppingVisitor {
        return_false_from_pre_in: OptionalCounter::new(2),
        ..Default::default()
    };

    let outcome = plan.visit_with_subqueries(&mut stopper);
    assert!(outcome.is_ok());

    assert_debug_snapshot!(stopper.inner.strings, @r#"
    [
    "pre_visit Projection",
    "pre_visit Filter",
    ]
    "#);
}
4563
/// Stopping from the second `f_up` call leaves the three pre-visit labels
/// and a single post-visit label in the log.
#[test]
fn early_stopping_post_visit() {
    let plan = test_plan();
    let mut stopper = StoppingVisitor {
        return_false_from_post_in: OptionalCounter::new(1),
        ..Default::default()
    };

    let outcome = plan.visit_with_subqueries(&mut stopper);
    assert!(outcome.is_ok());

    assert_debug_snapshot!(stopper.inner.strings, @r#"
    [
    "pre_visit Projection",
    "pre_visit Filter",
    "pre_visit TableScan",
    "post_visit TableScan",
    ]
    "#);
}
4586
4587 #[derive(Debug, Default)]
4588 struct ErrorVisitor {
4590 inner: OkVisitor,
4591 return_error_from_pre_in: OptionalCounter,
4593 return_error_from_post_in: OptionalCounter,
4595 }
4596
4597 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4598 type Node = LogicalPlan;
4599
4600 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4601 if self.return_error_from_pre_in.dec() {
4602 return not_impl_err!("Error in pre_visit");
4603 }
4604
4605 self.inner.f_down(plan)
4606 }
4607
4608 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4609 if self.return_error_from_post_in.dec() {
4610 return not_impl_err!("Error in post_visit");
4611 }
4612
4613 self.inner.f_up(plan)
4614 }
4615 }
4616
/// An error raised from the third `f_down` call is propagated to the caller,
/// and only the first two pre-visit labels were recorded.
#[test]
fn error_pre_visit() {
    let plan = test_plan();
    let mut failing = ErrorVisitor {
        return_error_from_pre_in: OptionalCounter::new(2),
        ..Default::default()
    };

    let err = plan.visit_with_subqueries(&mut failing).unwrap_err();
    assert_snapshot!(
        err.strip_backtrace(),
        @"This feature is not implemented: Error in pre_visit"
    );

    assert_debug_snapshot!(failing.inner.strings, @r#"
    [
    "pre_visit Projection",
    "pre_visit Filter",
    ]
    "#);
}
4639
/// An error raised from the second `f_up` call is propagated to the caller,
/// after the three pre-visit labels and one post-visit label were recorded.
#[test]
fn error_post_visit() {
    let plan = test_plan();
    let mut failing = ErrorVisitor {
        return_error_from_post_in: OptionalCounter::new(1),
        ..Default::default()
    };

    let err = plan.visit_with_subqueries(&mut failing).unwrap_err();
    assert_snapshot!(
        err.strip_backtrace(),
        @"This feature is not implemented: Error in post_visit"
    );

    assert_debug_snapshot!(failing.inner.strings, @r#"
    [
    "pre_visit Projection",
    "pre_visit Filter",
    "pre_visit TableScan",
    "post_visit TableScan",
    ]
    "#);
}
4664
/// `Window` nodes that differ only in schema metadata must compare unequal,
/// hash differently and be unordered; identical ones must agree on all three.
#[test]
fn test_partial_eq_hash_and_partial_ord() {
    let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: true,
        schema: Arc::new(DFSchema::empty()),
    }));

    // Builds a single-column `count` schema carrying the given metadata.
    let count_schema = |metadata: HashMap<String, String>| {
        let fields = vec![Field::new("count", DataType::Int64, false)];
        DFSchema::from_unqualified_fields(fields.into(), metadata).unwrap()
    };

    // Builds a COUNT window over the shared empty input with `schema` attached.
    let window_with = |schema: DFSchema| {
        let count_call = Expr::WindowFunction(Box::new(WindowFunction::new(
            WindowFunctionDefinition::AggregateUDF(count_udaf()),
            vec![],
        )));
        Window::try_new_with_schema(
            vec![count_call],
            Arc::clone(&empty_values),
            Arc::new(schema),
        )
        .unwrap()
    };

    let baseline = window_with(count_schema(HashMap::new()));

    // Same construction, no metadata: equal in every respect.
    let twin = window_with(count_schema(HashMap::new()));
    assert_eq!(baseline, twin);
    assert_eq!(hash(&baseline), hash(&twin));
    assert_eq!(baseline.partial_cmp(&twin), Some(Ordering::Equal));

    // Same construction plus schema metadata: different in every respect.
    let with_metadata = window_with(count_schema(HashMap::from([(
        "key".to_string(),
        "value".to_string(),
    )])));
    assert_ne!(baseline, with_metadata);
    assert_ne!(hash(&baseline), hash(&with_metadata));
    assert_eq!(baseline.partial_cmp(&with_metadata), None);
}
4715
/// Hashes `value` with a freshly created `DefaultHasher` and returns the
/// resulting 64-bit digest.
fn hash<T>(value: &T) -> u64
where
    T: Hash,
{
    let mut state = DefaultHasher::new();
    value.hash(&mut state);
    state.finish()
}
4721
/// Building a projection with one expression over a zero-field schema must
/// be rejected with a planning error.
#[test]
fn projection_expr_schema_mismatch() -> Result<()> {
    let no_fields = Arc::new(DFSchema::empty());
    let input = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: Arc::clone(&no_fields),
    }));

    let err = Projection::try_new_with_schema(vec![col("a")], input, no_fields)
        .unwrap_err();
    assert_snapshot!(err.strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
    Ok(())
}
4736
4737 fn test_plan() -> LogicalPlan {
4738 let schema = Schema::new(vec![
4739 Field::new("id", DataType::Int32, false),
4740 Field::new("state", DataType::Utf8, false),
4741 ]);
4742
4743 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4744 .unwrap()
4745 .filter(col("state").eq(lit("CO")))
4746 .unwrap()
4747 .project(vec![col("id")])
4748 .unwrap()
4749 .build()
4750 .unwrap()
4751 }
4752
/// Replacing parameters must fail when the plan contains a placeholder whose
/// id is not a valid positional reference: empty, `$0`, or `$00`.
#[test]
fn test_replace_invalid_placeholder() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let param_values: ParamValues = vec![ScalarValue::Int32(Some(42))].into();

    // Each id is checked against its own freshly built plan.
    for invalid_id in ["", "$0", "$00"] {
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder(invalid_id)))
            .unwrap()
            .build()
            .unwrap();

        plan.replace_params_with_values(&param_values)
            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
    }
}
4795
/// Binding a parameter value whose metadata differs from the prepared
/// field's metadata must be rejected.
#[test]
fn test_replace_placeholder_mismatched_metadata() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

    // Plan with a single `$1` placeholder, prepared with a metadata-free field.
    let filtered_scan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col("id").eq(placeholder("$1")))
        .unwrap()
        .build()
        .unwrap();
    let prepared_builder = LogicalPlanBuilder::new(filtered_scan)
        .prepare(
            String::new(),
            vec![Field::new("", DataType::Int32, true).into()],
        )
        .unwrap();

    // The supplied value carries metadata the prepared field does not have.
    let scalar_meta =
        HashMap::from([("some_key".to_string(), "some_value".to_string())]);
    let bound_value =
        ScalarAndMetadata::new(ScalarValue::Int32(Some(42)), Some(scalar_meta.into()));
    let param_values = ParamValues::List(vec![bound_value]);

    prepared_builder
        .plan()
        .clone()
        .with_param_values(param_values)
        .expect_err("prepared field metadata mismatch unexpectedly succeeded");
}
4827
/// Grouping by `GROUPING SETS ((foo), (bar))` must make both group columns
/// nullable in the aggregate's output schema, even though the inputs are not.
#[test]
fn test_nullable_schema_after_grouping_set() {
    let schema = Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]);

    let grouping = Expr::GroupingSet(GroupingSet::GroupingSets(vec![
        vec![col("foo")],
        vec![col("bar")],
    ]));
    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .aggregate(vec![grouping], vec![count(lit(true))])
        .unwrap()
        .build()
        .unwrap();

    let output_schema = plan.schema();
    for column in ["foo", "bar"] {
        let field = output_schema.field_with_name(None, column).unwrap();
        assert!(field.is_nullable());
    }
}
4859
/// `Filter::is_scalar` is false for an equality filter over a plain column
/// and true once the scanned schema declares that column unique.
#[test]
fn test_filter_is_scalar() {
    let arrow_schema =
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let source = Arc::new(LogicalTableSource::new(arrow_schema));
    let schema = Arc::new(
        DFSchema::try_from_qualified_schema(
            TableReference::bare("tab"),
            &source.schema(),
        )
        .unwrap(),
    );

    // Scan of table `tab` exposing the given projected schema.
    let scan_with = |projected_schema: &DFSchemaRef| {
        Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(projected_schema),
            filters: vec![],
            fetch: None,
        }))
    };
    // Column expression for the first field of the qualified schema.
    let first_column = || Expr::Column(schema.field_names()[0].clone().into());

    // Without any uniqueness information the filter may match many rows.
    let plain_filter = Filter::try_new(
        first_column().eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
        scan_with(&schema),
    )
    .unwrap();
    assert!(!plain_filter.is_scalar());

    // Same schema, now with a unique constraint on column 0.
    let unique_on_first =
        Constraints::new_unverified(vec![Constraint::Unique(vec![0])]);
    let dependencies =
        FunctionalDependencies::new_from_constraints(Some(&unique_on_first), 1);
    let unique_schema = Arc::new(
        schema
            .as_ref()
            .clone()
            .with_functional_dependencies(dependencies)
            .unwrap(),
    );

    let unique_filter =
        Filter::try_new(first_column().eq(lit(1i32)), scan_with(&unique_schema))
            .unwrap();
    assert!(unique_filter.is_scalar());
}
4918
/// `transform` must descend through an `Explain` node: a filter wrapped
/// around the table scan shows up beneath the explain in the rendered plan.
#[test]
fn test_transform_explain() {
    let schema = Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]);

    let explained = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .explain(false, false)
        .unwrap()
        .build()
        .unwrap();

    let external_filter = col("foo").eq(lit(true));

    // Wrap every table scan in the external filter; leave other nodes alone.
    let rewritten = explained
        .transform(|node| match node {
            LogicalPlan::TableScan(scan) => {
                let wrapped = Filter::try_new(
                    external_filter.clone(),
                    Arc::new(LogicalPlan::TableScan(scan)),
                )
                .unwrap();
                Ok(Transformed::yes(LogicalPlan::Filter(wrapped)))
            }
            untouched => Ok(Transformed::no(untouched)),
        })
        .data()
        .unwrap();

    let actual = rewritten.display_indent().to_string();
    assert_snapshot!(actual, @r"
    Explain
    Filter: foo = Boolean(true)
    TableScan: ?table?
    ")
}
4959
/// `EmptyRelation` orders before `DescribeTable`, while two identically
/// constructed `DescribeTable` nodes are not comparable.
#[test]
fn test_plan_partial_ord() {
    let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: Arc::new(DFSchema::empty()),
    });

    // Both DescribeTable nodes are built the same way.
    let make_describe_table = || {
        let described = Schema::new(vec![Field::new("foo", DataType::Int32, false)]);
        LogicalPlan::DescribeTable(DescribeTable {
            schema: Arc::new(described),
            output_schema: DFSchemaRef::new(DFSchema::empty()),
        })
    };
    let describe_table = make_describe_table();
    let describe_table_clone = make_describe_table();

    assert_eq!(
        empty_relation.partial_cmp(&describe_table),
        Some(Ordering::Less)
    );
    assert_eq!(
        describe_table.partial_cmp(&empty_relation),
        Some(Ordering::Greater)
    );
    assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
}
4995
/// Rebuilding a `Limit` from its own expressions and inputs must yield an
/// equal plan for every combination of present/absent `skip` and `fetch`.
#[test]
fn test_limit_with_new_children() {
    let input = Arc::new(LogicalPlan::Values(Values {
        schema: Arc::new(DFSchema::empty()),
        values: vec![vec![]],
    }));

    let ten = || {
        Box::new(Expr::Literal(
            ScalarValue::new_ten(&DataType::UInt32).unwrap(),
            None,
        ))
    };
    let one = || {
        Box::new(Expr::Literal(
            ScalarValue::new_one(&DataType::UInt32).unwrap(),
            None,
        ))
    };

    // (skip, fetch) pairs: neither, fetch only, skip only, both.
    let skip_fetch_pairs = [
        (None, None),
        (None, Some(ten())),
        (Some(ten()), None),
        (Some(one()), Some(ten())),
    ];

    for (skip, fetch) in skip_fetch_pairs {
        let limit = LogicalPlan::Limit(Limit {
            skip,
            fetch,
            input: Arc::clone(&input),
        });
        let rebuilt = limit
            .with_new_exprs(
                limit.expressions(),
                limit.inputs().into_iter().cloned().collect(),
            )
            .unwrap();
        assert_eq!(limit, rebuilt);
    }
}
5047
/// Exercises `TreeNodeRecursion::Jump` returned from the root `Projection`
/// through five traversal entry points (`apply_with_subqueries`,
/// `visit_with_subqueries`, `transform_down_with_subqueries`,
/// `transform_down_up_with_subqueries`, `rewrite_with_subqueries`).
///
/// The plan has a `Filter` below the projection and another `Filter` inside a
/// scalar subquery of the projection; every section asserts that no `Filter`
/// was observed after jumping at the projection.
#[test]
fn test_with_subqueries_jump() {
    // Subquery plan: scan + filter on `sub_id`.
    let subquery_schema =
        Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

    let subquery_plan =
        table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
            .unwrap()
            .filter(col("sub_id").eq(lit(0)))
            .unwrap()
            .build()
            .unwrap();

    // Outer plan: scan + filter on `id`, projected together with the scalar subquery.
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

    let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
        .unwrap()
        .filter(col("id").eq(lit(0)))
        .unwrap()
        .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
        .unwrap()
        .build()
        .unwrap();

    // 1. Closure-based read-only traversal.
    let mut filter_found = false;
    plan.apply_with_subqueries(|plan| {
        match plan {
            LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
            LogicalPlan::Filter(..) => filter_found = true,
            _ => {}
        }
        Ok(TreeNodeRecursion::Continue)
    })
    .unwrap();
    assert!(!filter_found);

    // 2. Visitor-based read-only traversal.
    struct ProjectJumpVisitor {
        // Set when `f_down` sees a `Filter` node.
        filter_found: bool,
    }

    impl ProjectJumpVisitor {
        fn new() -> Self {
            Self {
                filter_found: false,
            }
        }
    }

    impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
            match node {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => self.filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        }
    }

    let mut visitor = ProjectJumpVisitor::new();
    plan.visit_with_subqueries(&mut visitor).unwrap();
    assert!(!visitor.filter_found);

    // 3. Top-down transform; the projection is returned unchanged with `Jump`.
    let mut filter_found = false;
    plan.clone()
        .transform_down_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => {
                    return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
                }
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(Transformed::no(plan))
        })
        .unwrap();
    assert!(!filter_found);

    // 4. Down/up transform; only the down closure inspects nodes, the up
    //    closure passes every node through unchanged.
    let mut filter_found = false;
    plan.clone()
        .transform_down_up_with_subqueries(
            |plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            plan,
                            false,
                            TreeNodeRecursion::Jump,
                        ))
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            },
            |plan| Ok(Transformed::no(plan)),
        )
        .unwrap();
    assert!(!filter_found);

    // 5. Rewriter-based transform.
    struct ProjectJumpRewriter {
        // Set when `f_down` sees a `Filter` node.
        filter_found: bool,
    }

    impl ProjectJumpRewriter {
        fn new() -> Self {
            Self {
                filter_found: false,
            }
        }
    }

    impl TreeNodeRewriter for ProjectJumpRewriter {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
            match node {
                LogicalPlan::Projection(..) => {
                    return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
                }
                LogicalPlan::Filter(..) => self.filter_found = true,
                _ => {}
            }
            Ok(Transformed::no(node))
        }
    }

    let mut rewriter = ProjectJumpRewriter::new();
    plan.rewrite_with_subqueries(&mut rewriter).unwrap();
    assert!(!rewriter.filter_found);
}
5185
/// A placeholder that has not been given a type is still reported by
/// `get_parameter_fields`, mapped to `None`.
#[test]
fn test_with_unresolved_placeholders() {
    let field_name = "id";
    let placeholder_value = "$1";
    let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);

    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col(field_name).eq(placeholder(placeholder_value)))
        .unwrap()
        .build()
        .unwrap();

    // Exactly one placeholder is reported.
    let params = plan.get_parameter_fields().unwrap();
    assert_eq!(params.len(), 1);

    // Look the entry up in place; only the looked-up value is cloned, not the
    // whole map.
    let parameter_type = params.get(placeholder_value).unwrap().clone();
    assert_eq!(parameter_type, None);
}
5206
/// `with_new_exprs` on a join must split the flat expression list back into
/// equijoin key pairs and an optional trailing filter.
#[test]
fn test_join_with_new_exprs() -> Result<()> {
    // Inner join of `t1(a, b)` and `t2(a, b)` with the given keys and filter.
    fn create_test_join(
        on: Vec<(Expr, Expr)>,
        filter: Option<Expr>,
    ) -> Result<LogicalPlan> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
        let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;

        let left = table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?;
        let right = table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?;
        let joined_schema = left_schema.join(&right_schema)?;

        Ok(LogicalPlan::Join(Join {
            left: Arc::new(left),
            right: Arc::new(right),
            on,
            filter,
            join_type: JoinType::Inner,
            join_constraint: JoinConstraint::On,
            schema: Arc::new(joined_schema),
            null_equality: NullEquality::NullEqualsNothing,
        }))
    }

    // Rebuilds `plan` with `exprs` and its current inputs, returning the join.
    fn rebuild(plan: &LogicalPlan, exprs: Vec<Expr>) -> Result<Join> {
        match plan.with_new_exprs(exprs, plan.inputs().into_iter().cloned().collect())? {
            LogicalPlan::Join(join) => Ok(join),
            _ => unreachable!(),
        }
    }

    let on_a = || (col("t1.a"), col("t2.a"));
    let on_b = || (col("t1.b"), col("t2.b"));

    // Keys only.
    let plan = create_test_join(vec![on_a()], None)?;
    let join = rebuild(&plan, plan.expressions())?;
    assert_eq!(join.on, vec![on_a()]);
    assert_eq!(join.filter, None);

    // Filter only.
    let plan = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
    let join = rebuild(&plan, plan.expressions())?;
    assert_eq!(join.on, vec![]);
    assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));

    // Keys and filter.
    let plan = create_test_join(vec![on_a()], Some(col("t1.b").gt(col("t2.b"))))?;
    let join = rebuild(&plan, plan.expressions())?;
    assert_eq!(join.on, vec![on_a()]);
    assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));

    // Replacement expressions: two key pairs followed by a filter.
    let plan = create_test_join(vec![on_a(), on_b()], None)?;
    let replacement = vec![
        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
        binary_expr(col("t2.a"), Operator::Plus, lit(2)),
        col("t1.b"),
        col("t2.b"),
        lit(true),
    ];
    let join = rebuild(&plan, replacement)?;
    assert_eq!(
        join.on,
        vec![
            (
                binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                binary_expr(col("t2.a"), Operator::Plus, lit(2))
            ),
            on_b()
        ]
    );
    assert_eq!(join.filter, Some(lit(true)));

    Ok(())
}
5312
/// `Join::try_new` must derive the right output schema (field count, names
/// and nullability) for every join type and keep the remaining arguments.
#[test]
fn test_join_try_new() -> Result<()> {
    // Checks the field count and each field name, in order, against
    // `expected` pairs of (name, failure message).
    fn assert_field_names(join: &Join, expected: &[(&str, &str)]) {
        let fields = join.schema.fields();
        assert_eq!(fields.len(), expected.len());
        for (field, (name, message)) in fields.iter().zip(expected) {
            assert_eq!(field.name(), *name, "{message}");
        }
    }

    // Checks the nullability of the leading fields, in order.
    fn assert_nullability(join: &Join, expected: &[bool]) {
        for (field, nullable) in join.schema.fields().iter().zip(expected) {
            assert_eq!(field.is_nullable(), *nullable);
        }
    }

    const LEFT_A: (&str, &str) = ("a", "First field should be 'a' from left table");
    const LEFT_B: (&str, &str) = ("b", "Second field should be 'b' from left table");

    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
    let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

    let join_types = [
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftMark,
    ];

    for join_type in join_types {
        let join = Join::try_new(
            Arc::new(left_scan.clone()),
            Arc::new(right_scan.clone()),
            vec![(col("t1.a"), col("t2.a"))],
            Some(col("t1.b").gt(col("t2.b"))),
            join_type,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
        )?;

        match join_type {
            // Semi/anti joins expose one side only.
            JoinType::LeftSemi | JoinType::LeftAnti => {
                assert_field_names(&join, &[LEFT_A, LEFT_B]);
            }
            JoinType::RightSemi | JoinType::RightAnti => {
                assert_field_names(
                    &join,
                    &[
                        ("a", "First field should be 'a' from right table"),
                        ("b", "Second field should be 'b' from right table"),
                    ],
                );
            }
            // Mark joins expose the left side plus a non-nullable mark column.
            JoinType::LeftMark => {
                assert_field_names(
                    &join,
                    &[
                        LEFT_A,
                        LEFT_B,
                        ("mark", "Third field should be the mark column"),
                    ],
                );
                assert_nullability(&join, &[false, false, false]);
            }
            // Everything else exposes both sides; outer joins make the
            // non-preserved side nullable.
            _ => {
                assert_field_names(
                    &join,
                    &[
                        LEFT_A,
                        LEFT_B,
                        ("a", "Third field should be 'a' from right table"),
                        ("b", "Fourth field should be 'b' from right table"),
                    ],
                );
                match join_type {
                    JoinType::Left => {
                        assert_nullability(&join, &[false, false, true, true]);
                    }
                    JoinType::Right => {
                        assert_nullability(&join, &[true, true, false, false]);
                    }
                    JoinType::Full => {
                        assert_nullability(&join, &[true, true, true, true]);
                    }
                    _ => {}
                }
            }
        }

        // The remaining constructor arguments are stored unchanged.
        assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
        assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        assert_eq!(join.join_type, join_type);
        assert_eq!(join.join_constraint, JoinConstraint::On);
        assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
    }

    Ok(())
}
5459
/// `Join::try_new` over two tables that share column names (`id`, `value`)
/// must keep all six columns, for both `USING` and `ON` constraints, and
/// must store the requested filter and null-equality setting.
#[test]
fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
    // Checks that the join schema is the left columns followed by the right ones.
    fn assert_all_columns_present(join: &Join) {
        let expected = [
            ("id", "First field should be 'id' from left table"),
            ("name", "Second field should be 'name' from left table"),
            ("value", "Third field should be 'value' from left table"),
            ("id", "Fourth field should be 'id' from right table"),
            ("category", "Fifth field should be 'category' from right table"),
            ("value", "Sixth field should be 'value' from right table"),
        ];
        let fields = join.schema.fields();
        assert_eq!(fields.len(), 6);
        for (field, (name, message)) in fields.iter().zip(expected) {
            assert_eq!(field.name(), name, "{message}");
        }
    }

    let left_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Int32, false),
    ]);
    let right_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);

    let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
    let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
    let id_keys = || vec![(col("t1.id"), col("t2.id"))];

    // USING constraint.
    let using_join = Join::try_new(
        Arc::new(left_plan.clone()),
        Arc::new(right_plan.clone()),
        id_keys(),
        None,
        JoinType::Inner,
        JoinConstraint::Using,
        NullEquality::NullEqualsNothing,
    )?;
    assert_all_columns_present(&using_join);
    assert_eq!(using_join.join_constraint, JoinConstraint::Using);

    // ON constraint with a filter over the overlapping `value` columns.
    let filtered_join = Join::try_new(
        Arc::new(left_plan.clone()),
        Arc::new(right_plan.clone()),
        id_keys(),
        Some(col("t1.value").lt(col("t2.value"))),
        JoinType::Inner,
        JoinConstraint::On,
        NullEquality::NullEqualsNothing,
    )?;
    assert_all_columns_present(&filtered_join);
    assert_eq!(
        filtered_join.filter,
        Some(col("t1.value").lt(col("t2.value")))
    );

    // Null-equals-null setting is stored as given.
    let null_equal_join = Join::try_new(
        Arc::new(left_plan.clone()),
        Arc::new(right_plan.clone()),
        id_keys(),
        None,
        JoinType::Inner,
        JoinConstraint::On,
        NullEquality::NullEqualsNull,
    )?;
    assert_eq!(null_equal_join.null_equality, NullEquality::NullEqualsNull);

    Ok(())
}
5597
/// For inner and outer joins, the joined schema must contain all six input
/// columns with nullability adjusted for the non-preserved side; a `USING`
/// join must also keep all fields.
#[test]
fn test_join_try_new_schema_validation() -> Result<()> {
    let left_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);
    let right_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, true),
        Field::new("code", DataType::Int16, false),
    ]);

    let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
    let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

    for join_type in [
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
    ] {
        let join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            Some(col("t1.value").gt(lit(5.0))),
            join_type,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
        )?;

        let fields = join.schema.fields();
        assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");

        // Columns declared non-nullable become nullable only when their side
        // is not preserved; `t1.value` and `t2.category` are always nullable.
        let left_made_nullable = matches!(join_type, JoinType::Right | JoinType::Full);
        let right_made_nullable = matches!(join_type, JoinType::Left | JoinType::Full);
        let expected = [
            left_made_nullable,  // t1.id
            left_made_nullable,  // t1.name
            true,                // t1.value
            right_made_nullable, // t2.id
            true,                // t2.category
            right_made_nullable, // t2.code
        ];

        for (i, (field, expected_nullable)) in fields.iter().zip(expected).enumerate() {
            assert_eq!(
                field.is_nullable(),
                expected_nullable,
                "Field {} ({}) nullability incorrect for {:?} join",
                i,
                field.name(),
                join_type
            );
        }
    }

    let using_join = Join::try_new(
        Arc::new(left_plan.clone()),
        Arc::new(right_plan.clone()),
        vec![(col("t1.id"), col("t2.id"))],
        None,
        JoinType::Inner,
        JoinConstraint::Using,
        NullEquality::NullEqualsNothing,
    )?;

    assert_eq!(
        using_join.schema.fields().len(),
        6,
        "USING join should have all fields"
    );
    assert_eq!(using_join.join_constraint, JoinConstraint::Using);

    Ok(())
}
5682}