1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::str::FromStr;
25use std::sync::{Arc, LazyLock};
26
27use super::dml::CopyTo;
28use super::invariants::{
29 assert_always_invariants_at_current_node, assert_executable_invariants,
30 InvariantLevel,
31};
32use super::DdlStatement;
33use crate::builder::{change_redundant_column, unnest_with_options};
34use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams};
35use crate::expr_rewriter::{
36 create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
37};
38use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
39use crate::logical_plan::extension::UserDefinedLogicalNode;
40use crate::logical_plan::{DmlStatement, Statement};
41use crate::utils::{
42 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
43 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
44};
45use crate::{
46 build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
47 Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
48 TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
49};
50
51use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
52use datafusion_common::cse::{NormalizeEq, Normalizeable};
53use datafusion_common::tree_node::{
54 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
55};
56use datafusion_common::{
57 aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
58 DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
59 FunctionalDependencies, ParamValues, Result, ScalarValue, Spans, TableReference,
60 UnnestOptions,
61};
62use indexmap::IndexSet;
63
64use crate::display::PgJsonVisitor;
66pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
67pub use datafusion_common::{JoinConstraint, JoinType};
68
/// A `LogicalPlan` is a node in a tree of relational operators.
///
/// Each variant wraps a struct of the same name that carries that
/// operator's state (expressions, inputs, and — where the operator
/// changes its input's shape — a precomputed output schema).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates a list of expressions against its input.
    Projection(Projection),
    /// Filters rows from its input by a boolean predicate.
    Filter(Filter),
    /// Evaluates window functions over its input.
    Window(Window),
    /// Groups its input and evaluates aggregate expressions.
    Aggregate(Aggregate),
    /// Sorts its input by a list of sort expressions.
    Sort(Sort),
    /// Joins two inputs on equi-join keys and/or a filter.
    Join(Join),
    /// Redistributes its input according to a partitioning scheme.
    Repartition(Repartition),
    /// Concatenates multiple inputs with compatible schemas.
    Union(Union),
    /// Produces rows from a table provider, possibly projected/filtered.
    TableScan(TableScan),
    /// Produces zero or one empty row (see `produce_one_row`).
    EmptyRelation(EmptyRelation),
    /// A correlated or scalar subquery expression's plan.
    Subquery(Subquery),
    /// Aliases its input under a new relation name.
    SubqueryAlias(SubqueryAlias),
    /// Skips and/or limits the number of rows produced.
    Limit(Limit),
    /// A statement such as `PREPARE` or `EXECUTE`.
    Statement(Statement),
    /// A constant list of literal row values.
    Values(Values),
    /// `EXPLAIN` output for a wrapped plan.
    Explain(Explain),
    /// `EXPLAIN ANALYZE`: runs the plan and reports metrics.
    Analyze(Analyze),
    /// A user-defined logical node (extension point).
    Extension(Extension),
    /// Removes duplicate rows (`ALL`) or keeps one row per key (`ON`).
    Distinct(Distinct),
    /// Data-modification statement (INSERT/UPDATE/DELETE).
    Dml(DmlStatement),
    /// Data-definition statement (CREATE/DROP ...).
    Ddl(DdlStatement),
    /// `COPY TO` a destination URL.
    Copy(CopyTo),
    /// `DESCRIBE <table>` output.
    DescribeTable(DescribeTable),
    /// Unnests list/struct columns into rows.
    Unnest(Unnest),
    /// A recursive CTE: a static term plus a recursive term.
    RecursiveQuery(RecursiveQuery),
}
291
292impl Default for LogicalPlan {
293 fn default() -> Self {
294 LogicalPlan::EmptyRelation(EmptyRelation {
295 produce_one_row: false,
296 schema: Arc::new(DFSchema::empty()),
297 })
298 }
299}
300
/// Treats a single `LogicalPlan` as a one-element tree-node container:
/// both traversal and mapping simply apply the callback to `self`.
impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    /// Applies `f` to this plan and returns its recursion decision.
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        f(self)
    }

    /// Maps this plan through `f`, returning the (possibly) transformed plan.
    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        f(self)
    }
}
316
impl LogicalPlan {
    /// Returns the schema describing this plan node's output.
    ///
    /// Nodes that do not change their input's shape (`Filter`, `Sort`,
    /// `Limit`, `Repartition`, `Distinct::All`, `Copy`) delegate to their
    /// input's schema; nodes that compute a new shape return the schema
    /// stored on the node itself.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            // A recursive query's output shape is that of its static term.
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.schema()
            }
        }
    }
356
357 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
360 match self {
361 LogicalPlan::Window(_)
362 | LogicalPlan::Projection(_)
363 | LogicalPlan::Aggregate(_)
364 | LogicalPlan::Unnest(_)
365 | LogicalPlan::Join(_) => self
366 .inputs()
367 .iter()
368 .map(|input| input.schema().as_ref())
369 .collect(),
370 _ => vec![],
371 }
372 }
373
374 pub fn explain_schema() -> SchemaRef {
376 SchemaRef::new(Schema::new(vec![
377 Field::new("plan_type", DataType::Utf8, false),
378 Field::new("plan", DataType::Utf8, false),
379 ]))
380 }
381
382 pub fn describe_schema() -> Schema {
384 Schema::new(vec![
385 Field::new("column_name", DataType::Utf8, false),
386 Field::new("data_type", DataType::Utf8, false),
387 Field::new("is_nullable", DataType::Utf8, false),
388 ])
389 }
390
391 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
408 let mut exprs = vec![];
409 self.apply_expressions(|e| {
410 exprs.push(e.clone());
411 Ok(TreeNodeRecursion::Continue)
412 })
413 .unwrap();
415 exprs
416 }
417
418 pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
421 let mut exprs = vec![];
422 self.apply_expressions(|e| {
423 find_out_reference_exprs(e).into_iter().for_each(|e| {
424 if !exprs.contains(&e) {
425 exprs.push(e)
426 }
427 });
428 Ok(TreeNodeRecursion::Continue)
429 })
430 .unwrap();
432 self.inputs()
433 .into_iter()
434 .flat_map(|child| child.all_out_ref_exprs())
435 .for_each(|e| {
436 if !exprs.contains(&e) {
437 exprs.push(e)
438 }
439 });
440 exprs
441 }
442
    /// Returns references to this node's direct child plans.
    ///
    /// Leaf nodes (`TableScan`, `EmptyRelation`, `Values`,
    /// `DescribeTable`) return an empty vector; `Join` and
    /// `RecursiveQuery` return two children; everything else returns
    /// whatever children the node stores.
    pub fn inputs(&self) -> Vec<&LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
            LogicalPlan::Window(Window { input, .. }) => vec![input],
            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
            LogicalPlan::Extension(extension) => extension.node.inputs(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                inputs.iter().map(|arc| arc.as_ref()).collect()
            }
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => vec![input],
            LogicalPlan::Explain(explain) => vec![&explain.plan],
            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
            LogicalPlan::Dml(write) => vec![&write.input],
            LogicalPlan::Copy(copy) => vec![&copy.input],
            LogicalPlan::Ddl(ddl) => ddl.inputs(),
            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                static_term,
                recursive_term,
                ..
            }) => vec![static_term, recursive_term],
            LogicalPlan::Statement(stmt) => stmt.inputs(),
            // Leaf nodes: no children.
            LogicalPlan::TableScan { .. }
            | LogicalPlan::EmptyRelation { .. }
            | LogicalPlan::Values { .. }
            | LogicalPlan::DescribeTable(_) => vec![],
        }
    }
484
    /// Collects, for every `USING` join in this plan (including
    /// subqueries), the set of columns named in the join's `on` pairs.
    ///
    /// Returns one `HashSet` per `USING` join encountered, in traversal
    /// order. Errors if any `on` expression is not a plain join column.
    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
        let mut using_columns: Vec<HashSet<Column>> = vec![];

        self.apply_with_subqueries(|plan| {
            if let LogicalPlan::Join(Join {
                join_constraint: JoinConstraint::Using,
                on,
                ..
            }) = plan
            {
                // Fold both sides of every equi-pair into one set;
                // non-column keys are an internal error for USING joins.
                let columns =
                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
                        let Some(l) = l.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {l:?}"
                            );
                        };
                        let Some(r) = r.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {r:?}"
                            );
                        };
                        accumu.insert(l.to_owned());
                        accumu.insert(r.to_owned());
                        Result::<_, DataFusionError>::Ok(accumu)
                    })?;
                using_columns.push(columns);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;

        Ok(using_columns)
    }
520
    /// Returns the expression producing the first output column of this
    /// plan, or `Ok(None)` for nodes whose head output is not expressible
    /// (subqueries, DDL/DML, etc.).
    ///
    /// NOTE(review): the `Projection`, `Aggregate` and `Distinct::On`
    /// arms index `[0]` directly and would panic on an empty expression
    /// list — assumes those nodes are never constructed empty (TODO
    /// confirm against the constructors).
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                // With grouping, group columns come first in the output.
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // Pass-through nodes: delegate to the input.
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    // If the left side has no columns, the head column
                    // comes from the right side.
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                // Semi/anti/mark joins only emit one side's columns.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Re-qualify the input's head column under the alias.
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
595
    /// Rebuilds this node's cached schema from its (possibly changed)
    /// inputs and returns the updated plan.
    ///
    /// Nodes that derive their schema from their input (`Sort`, `Limit`,
    /// `Repartition`, leaves, ...) are returned unchanged; nodes that
    /// store a schema re-run their `try_new` constructor or rebuild the
    /// schema directly (`Join`, `Union`).
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            // Values keeps its stored schema as-is.
            LogicalPlan::Values(Values { schema, values }) => {
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter {
                predicate,
                input,
                having,
            }) => Filter::try_new_internal(predicate, input, having)
                .map(LogicalPlan::Filter),
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equals_null,
            }) => {
                // Rebuild the output schema from the (new) input schemas.
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                // Equi-join keys are stored unaliased.
                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equals_null,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            // Extension nodes rebuild themselves from their own
            // expressions and inputs.
            LogicalPlan::Extension(Extension { node }) => {
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                // Keep the stored schema only if the field count still
                // matches the first input; otherwise recompute it.
                let first_input_schema = inputs[0].schema();
                if schema.fields().len() == first_input_schema.fields().len() {
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            // Unnest is re-derived through the builder helper.
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }
754
    /// Returns a new plan of the same variant, built from `self`'s
    /// non-expression state plus the given replacement expressions and
    /// inputs.
    ///
    /// `expr` and `inputs` must have exactly the counts this variant
    /// expects (checked via the `assert_no_*`/`only_*` helpers and
    /// asserts below); the expression ordering must match what
    /// `expressions()` produced for this node.
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo {
                    input: Arc::new(input),
                    output_url: output_url.clone(),
                    file_type: Arc::clone(file_type),
                    options: options.clone(),
                    partition_by: partition_by.clone(),
                }))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                // Expressions arrive flattened row-major; re-chunk them
                // into rows of `fields().len()` values each.
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                // Round-robin carries no expressions.
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                // Hash/DistributeBy take the new expressions as keys.
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // Expressions are ordered [group_exprs..., aggr_exprs...];
                // split_off leaves the group exprs in `expr`.
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                // Keep each sort's direction/null ordering, swap in the
                // new child expression.
                Ok(LogicalPlan::Sort(Sort {
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equals_null,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                // Expression layout: interleaved equi-key pairs first,
                // then (optionally) one trailing filter expression.
                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!("Expected a pair of expressions to construct the join on expression")?
                    };

                    // Equi-join keys are stored unaliased.
                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equals_null: *null_equals_null,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                // One expression per present skip/fetch, in that order.
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                if old_expr_len != expr.len() {
                    return internal_err!(
                        "Invalid number of new Limit expressions: expected {}, got {}",
                        old_expr_len,
                        expr.len()
                    );
                }
                // Pop from the back: fetch was pushed last.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // NOTE(review): constraints are reset to empty here rather
                // than cloned from the original — confirm this is intended.
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        constraints: Constraints::empty(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                // Keep the stored schema only if the field count still
                // matches the first input's; otherwise adopt the input's.
                let input_schema = inputs[0].schema();
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        // Layout: [on_exprs..., select_exprs..., sort_exprs...];
                        // sort expressions are not supported here.
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, // no sort expressions accepted
                            Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name,
                data_types,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    data_types: data_types.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                // New expressions replace the pushed-down filters.
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            // Leaf/no-op nodes: nothing to replace, clone as-is.
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // Rebuild through the builder helper so the unnest schema
                // is re-derived from the new input.
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1146
1147 pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1149 match check {
1150 InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1151 InvariantLevel::Executable => assert_executable_invariants(self),
1152 }
1153 }
1154
1155 #[inline]
1157 #[allow(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1159 if !expr.is_empty() {
1160 return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1161 }
1162 Ok(())
1163 }
1164
1165 #[inline]
1167 #[allow(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1169 if !inputs.is_empty() {
1170 return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1171 }
1172 Ok(())
1173 }
1174
1175 #[inline]
1177 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1178 if expr.len() != 1 {
1179 return internal_err!(
1180 "{self:?} should have exactly one expr, got {:?}",
1181 expr
1182 );
1183 }
1184 Ok(expr.remove(0))
1185 }
1186
1187 #[inline]
1189 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1190 if inputs.len() != 1 {
1191 return internal_err!(
1192 "{self:?} should have exactly one input, got {:?}",
1193 inputs
1194 );
1195 }
1196 Ok(inputs.remove(0))
1197 }
1198
1199 #[inline]
1201 fn only_two_inputs(
1202 &self,
1203 mut inputs: Vec<LogicalPlan>,
1204 ) -> Result<(LogicalPlan, LogicalPlan)> {
1205 if inputs.len() != 2 {
1206 return internal_err!(
1207 "{self:?} should have exactly two inputs, got {:?}",
1208 inputs
1209 );
1210 }
1211 let right = inputs.remove(1);
1212 let left = inputs.remove(0);
1213 Ok((left, right))
1214 }
1215
1216 pub fn with_param_values(
1270 self,
1271 param_values: impl Into<ParamValues>,
1272 ) -> Result<LogicalPlan> {
1273 let param_values = param_values.into();
1274 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1275
1276 Ok(
1278 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1279 plan_with_values
1280 {
1281 param_values.verify(&prepare_lp.data_types)?;
1282 Arc::unwrap_or_clone(prepare_lp.input)
1284 } else {
1285 plan_with_values
1286 },
1287 )
1288 }
1289
    /// Returns an upper bound on the number of rows this plan can
    /// produce, or `None` when no bound can be determined.
    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
        match self {
            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
            LogicalPlan::Filter(filter) => {
                // A provably-scalar filter yields at most one row.
                if filter.is_scalar() {
                    Some(1)
                } else {
                    filter.input.max_rows()
                }
            }
            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
            LogicalPlan::Aggregate(Aggregate {
                input, group_expr, ..
            }) => {
                // All-literal grouping (including no grouping at all,
                // since `all` on empty is true) produces a single group.
                if group_expr
                    .iter()
                    .all(|expr| matches!(expr, Expr::Literal(_)))
                {
                    Some(1)
                } else {
                    input.max_rows()
                }
            }
            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
                // The tighter of the input bound and the fetch limit.
                match (fetch, input.max_rows()) {
                    (Some(fetch_limit), Some(input_max)) => {
                        Some(input_max.min(*fetch_limit))
                    }
                    (Some(fetch_limit), None) => Some(*fetch_limit),
                    (None, Some(input_max)) => Some(input_max),
                    (None, None) => None,
                }
            }
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
                JoinType::Left | JoinType::Right | JoinType::Full => {
                    // Outer joins still emit the preserved side's rows
                    // when the other side is empty.
                    match (left.max_rows()?, right.max_rows()?, join_type) {
                        (0, 0, _) => Some(0),
                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
                        (left_max, right_max, _) => Some(left_max * right_max),
                    }
                }
                // Semi/anti/mark joins never emit more than one side.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.max_rows()
                }
                JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
            },
            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                // Sum of the inputs' bounds; None if any input is unbounded.
                inputs.iter().try_fold(0usize, |mut acc, plan| {
                    acc += plan.max_rows()?;
                    Some(acc)
                })
            }
            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
            LogicalPlan::EmptyRelation(_) => Some(0),
            LogicalPlan::RecursiveQuery(_) => None,
            LogicalPlan::Subquery(_) => None,
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
                Ok(FetchType::Literal(s)) => s,
                _ => None,
            },
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => input.max_rows(),
            LogicalPlan::Values(v) => Some(v.values.len()),
            LogicalPlan::Unnest(_) => None,
            LogicalPlan::Ddl(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Extension(_) => None,
        }
    }
1379
1380 pub fn contains_outer_reference(&self) -> bool {
1382 let mut contains = false;
1383 self.apply_expressions(|expr| {
1384 Ok(if expr.contains_outer() {
1385 contains = true;
1386 TreeNodeRecursion::Stop
1387 } else {
1388 TreeNodeRecursion::Continue
1389 })
1390 })
1391 .unwrap();
1392 contains
1393 }
1394
    /// Returns pairs of (output expression, output column) for nodes
    /// whose outputs can be referenced as columns downstream.
    ///
    /// Only `Aggregate` and `Window` contribute; all other variants
    /// return an empty list. For `Window`, the input's pairs are kept
    /// and the window expressions are zipped with the schema columns
    /// that follow the input's columns.
    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
        match self {
            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
                .output_expressions()?
                .into_iter()
                .zip(self.schema().columns())
                .collect()),
            LogicalPlan::Window(Window {
                window_expr,
                input,
                schema,
            }) => {
                let mut output_exprs = input.columnized_output_exprs()?;
                // Window columns appear after the input's columns in the
                // output schema, so skip the first `input_len` columns.
                let input_len = input.schema().fields().len();
                output_exprs.extend(
                    window_expr
                        .iter()
                        .zip(schema.columns().into_iter().skip(input_len)),
                );
                Ok(output_exprs)
            }
            _ => Ok(vec![]),
        }
    }
}
1434
impl LogicalPlan {
    /// Replaces every `Expr::Placeholder` in the plan (including
    /// subqueries) with the corresponding literal from `param_values`.
    ///
    /// Placeholder types are inferred from each node's schema first;
    /// expressions without placeholders are passed through untouched.
    /// Expression display names are preserved across the rewrite via
    /// [`NamePreserver`].
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    // Fast path: nothing to substitute in this expression.
                    Ok(Transformed::no(e))
                } else {
                    let original_name = name_preserver.save(&e);
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let value = param_values.get_placeholders_with_values(&id)?;
                            Ok(Transformed::yes(Expr::Literal(value)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    // Restore the pre-rewrite display name.
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })
        })
        .map(|res| res.data)
    }
1473
1474 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1476 let mut param_names = HashSet::new();
1477 self.apply_with_subqueries(|plan| {
1478 plan.apply_expressions(|expr| {
1479 expr.apply(|expr| {
1480 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1481 param_names.insert(id.clone());
1482 }
1483 Ok(TreeNodeRecursion::Continue)
1484 })
1485 })
1486 })
1487 .map(|_| param_names)
1488 }
1489
    /// Walks the plan (including subqueries) and returns a map from each
    /// placeholder id to its inferred data type (`None` when no type is
    /// known yet).
    ///
    /// # Errors
    /// Returns a plan error if the same placeholder id appears with two
    /// conflicting data types.
    pub fn get_parameter_types(
        &self,
    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
        let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();

        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
                        let prev = param_types.get(id);
                        match (prev, data_type) {
                            (Some(Some(prev)), Some(dt)) => {
                                if prev != dt {
                                    // `?` propagates the plan error out of the walk.
                                    plan_err!("Conflicting types for {id}")?;
                                }
                            }
                            (_, Some(dt)) => {
                                param_types.insert(id.clone(), Some(dt.clone()));
                            }
                            _ => {
                                // Record the id even when its type is unknown.
                                param_types.insert(id.clone(), None);
                            }
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_types)
    }
1521
1522 pub fn display_indent(&self) -> impl Display + '_ {
1554 struct Wrapper<'a>(&'a LogicalPlan);
1557 impl Display for Wrapper<'_> {
1558 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1559 let with_schema = false;
1560 let mut visitor = IndentVisitor::new(f, with_schema);
1561 match self.0.visit_with_subqueries(&mut visitor) {
1562 Ok(_) => Ok(()),
1563 Err(_) => Err(fmt::Error),
1564 }
1565 }
1566 }
1567 Wrapper(self)
1568 }
1569
1570 pub fn display_indent_schema(&self) -> impl Display + '_ {
1597 struct Wrapper<'a>(&'a LogicalPlan);
1600 impl Display for Wrapper<'_> {
1601 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1602 let with_schema = true;
1603 let mut visitor = IndentVisitor::new(f, with_schema);
1604 match self.0.visit_with_subqueries(&mut visitor) {
1605 Ok(_) => Ok(()),
1606 Err(_) => Err(fmt::Error),
1607 }
1608 }
1609 }
1610 Wrapper(self)
1611 }
1612
1613 pub fn display_pg_json(&self) -> impl Display + '_ {
1617 struct Wrapper<'a>(&'a LogicalPlan);
1620 impl Display for Wrapper<'_> {
1621 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1622 let mut visitor = PgJsonVisitor::new(f);
1623 visitor.with_schema(true);
1624 match self.0.visit_with_subqueries(&mut visitor) {
1625 Ok(_) => Ok(()),
1626 Err(_) => Err(fmt::Error),
1627 }
1628 }
1629 }
1630 Wrapper(self)
1631 }
1632
    /// Return a displayable structure that renders the plan in graphviz
    /// DOT format, containing both a summary and a schema-annotated
    /// version of the plan.
    pub fn display_graphviz(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let mut visitor = GraphvizVisitor::new(f);

                visitor.start_graph()?;

                // First pass: the plan without schema details.
                visitor.pre_visit_plan("LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                // Second pass: the same plan annotated with schemas.
                visitor.set_with_schema(true);
                visitor.pre_visit_plan("Detailed LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                visitor.end_graph()?;
                Ok(())
            }
        }
        Wrapper(self)
    }
1691
    /// Return a displayable structure that formats ONLY this node (not its
    /// children) on a single line, as used by the indent/graphviz visitors.
    pub fn display(&self) -> impl Display + '_ {
        // Adapter so a `&LogicalPlan` can be handed out as `impl Display`.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={}", is_distinct)
                    }
                    LogicalPlan::Values(Values { ref values, .. }) => {
                        // Show at most the first 5 rows, then an ellipsis.
                        let str_values: Vec<_> = values
                            .iter()
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        let eclipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
                    }

                    LogicalPlan::TableScan(TableScan {
                        ref source,
                        ref table_name,
                        ref projection,
                        ref filters,
                        ref fetch,
                        ..
                    }) => {
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            // Group the filters by how far the table source
                            // reports it can push each one down.
                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
                        write!(f, "Projection: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: ref expr,
                        ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window {
                        ref window_expr, ..
                    }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        ref group_expr,
                        ref aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: ref keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        // An inner join with no keys and no filter is shown
                        // as a cross join.
                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(
                                    f,
                                    "{} Join: {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr
                                )
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        // Prefer the resolved literal value; fall back to
                        // printing the raw expression when it is not a literal.
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
                        };
                        write!(
                            f,
                            "Limit: skip={}, fetch={}", skip_str,fetch_str,
                        )
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices, .. }) => {
                        let input_columns = plan.schema().columns();
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i,unnest_info)|
                                format!("{}|depth={}", &input_columns[*i].to_string(),
                                unnest_info.depth))
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        write!(f, "Unnest: lists[{}] structs[{}]",
                        expr_vec_fmt!(list_type_columns),
                        expr_vec_fmt!(struct_type_columns))
                    }
                }
            }
        }
        Wrapper(self)
    }
2026}
2027
2028impl Display for LogicalPlan {
2029 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2030 self.display_indent().fmt(f)
2031 }
2032}
2033
2034impl ToStringifiedPlan for LogicalPlan {
2035 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2036 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2037 }
2038}
2039
/// A relation that produces no rows, or a single row when
/// `produce_one_row` is set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    // If true, the relation yields one row instead of none.
    pub produce_one_row: bool,
    // The output schema of this relation.
    pub schema: DFSchemaRef,
}
2048
2049impl PartialOrd for EmptyRelation {
2051 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2052 self.produce_one_row.partial_cmp(&other.produce_one_row)
2053 }
2054}
2055
/// A recursive query (`WITH RECURSIVE`): a static base term plus a term
/// that refers back to the named relation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    // Name of the recursive relation (the CTE name).
    pub name: String,
    // The non-recursive (base case) term.
    pub static_term: Arc<LogicalPlan>,
    // The term that references the relation by `name`.
    pub recursive_term: Arc<LogicalPlan>,
    // Whether duplicates are eliminated between iterations
    // (UNION vs UNION ALL semantics — NOTE(review): inferred from name).
    pub is_distinct: bool,
}
2091
/// A constant relation built from literal rows, e.g.
/// `VALUES (1, 2), (3, 4)`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    // The output schema of the rows.
    pub schema: DFSchemaRef,
    // One `Vec<Expr>` per row; one expression per column.
    pub values: Vec<Vec<Expr>>,
}
2102
2103impl PartialOrd for Values {
2105 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2106 self.values.partial_cmp(&other.values)
2107 }
2108}
2109
/// Evaluates an arbitrary list of expressions over its input
/// (a SELECT expression list).
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// Construction goes through `try_new`/`try_new_with_schema`, which
// validate the expression/schema invariant.
#[non_exhaustive]
pub struct Projection {
    // The list of expressions to evaluate.
    pub expr: Vec<Expr>,
    // The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    // The schema description of the output.
    pub schema: DFSchemaRef,
}
2123
2124impl PartialOrd for Projection {
2126 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2127 match self.expr.partial_cmp(&other.expr) {
2128 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2129 cmp => cmp,
2130 }
2131 }
2132}
2133
impl Projection {
    /// Create a new `Projection`, deriving the output schema from the
    /// expressions and the input plan.
    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let projection_schema = projection_schema(&input, &expr)?;
        Self::try_new_with_schema(expr, input, projection_schema)
    }

    /// Create a new `Projection` using the provided output schema.
    ///
    /// # Errors
    /// Returns a plan error when the number of expressions does not match
    /// the number of schema fields (unless a wildcard is present, since a
    /// wildcard expands to multiple fields).
    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        #[expect(deprecated)]
        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
            && expr.len() != schema.fields().len()
        {
            return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
        }
        Ok(Self {
            expr,
            input,
            schema,
        })
    }

    /// Create a new `Projection` that selects every column of `schema`
    /// from `input`, in order.
    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
        Self {
            expr,
            input,
            schema,
        }
    }
}
2170
/// Computes the output schema of projecting `exprs` over `input`,
/// carrying over the input schema's metadata and projecting its
/// functional dependencies.
///
/// # Errors
/// Returns an error when an expression cannot be resolved against the
/// input schema.
pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
    let metadata = input.schema().metadata().clone();

    let schema =
        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
            .with_functional_dependencies(calc_func_dependencies_for_project(
                exprs, input,
            )?)?;

    Ok(Arc::new(schema))
}
2195
/// Aliases the output of its input plan as a new relation name,
/// requalifying every output column with the alias.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct SubqueryAlias {
    // The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    // The alias for the input relation.
    pub alias: TableReference,
    // The schema with fields qualified by `alias`.
    pub schema: DFSchemaRef,
}
2208
impl SubqueryAlias {
    /// Create a new `SubqueryAlias`, requalifying every field of the input
    /// plan's schema with `alias`.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();
        // Deduplicate redundant column names before requalifying.
        let fields = change_redundant_column(plan.schema().fields());
        let meta_data = plan.schema().as_ref().metadata().clone();
        let schema: Schema =
            DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
        // Aliasing does not change functional dependencies, so carry them
        // over from the input schema.
        let func_dependencies = plan.schema().functional_dependencies().clone();
        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2233
2234impl PartialOrd for SubqueryAlias {
2236 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2237 match self.input.partial_cmp(&other.input) {
2238 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2239 cmp => cmp,
2240 }
2241 }
2242}
2243
/// Filters rows from its input that do not satisfy the boolean
/// `predicate` expression.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    // The boolean predicate applied to each row.
    pub predicate: Expr,
    // The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    // Whether this filter originated from a SQL `HAVING` clause.
    pub having: bool,
}
2265
impl Filter {
    /// Create a new filter, validating that `predicate` is boolean-typed
    /// against the input schema (when its type can be resolved).
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input, false)
    }

    /// Create a new filter originating from a SQL `HAVING` clause.
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input, true)
    }

    /// Returns true for types acceptable as a filter predicate: boolean,
    /// null, or a dictionary whose value type is itself acceptable.
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::Boolean | DataType::Null => true,
            // Dictionary-encoded predicates are checked recursively on
            // their value type.
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    fn try_new_internal(
        predicate: Expr,
        input: Arc<LogicalPlan>,
        having: bool,
    ) -> Result<Self> {
        // Only validate when the type resolves; expressions that fail to
        // type-check here are left for later analysis passes.
        if let Ok(predicate_type) = predicate.get_type(input.schema()) {
            if !Filter::is_allowed_filter_type(&predicate_type) {
                return plan_err!(
                    "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
                );
            }
        }

        Ok(Self {
            // Strip nested aliases: they carry no meaning in a predicate.
            predicate: predicate.unalias_nested().data,
            input,
            having,
        })
    }

    /// Returns true when this filter is guaranteed to produce at most one
    /// row: some non-nullable unique key of the input has every source
    /// column pinned to a constant by an equality conjunct.
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        let functional_dependencies = self.input.schema().functional_dependencies();
        // Candidate keys: non-nullable, single-mode dependencies that
        // determine every field of the schema.
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect the columns pinned by `col = <non-column>` conjuncts.
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // A trivially-true `x = x` conjunct constrains nothing.
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    // `col = col` does not pin either side to a constant.
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // Scalar iff some unique key has all its source columns pinned.
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2383
/// Window function evaluation: the output schema is the input schema
/// followed by one field per window expression.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    // The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    // The window function expressions, one output column each.
    pub window_expr: Vec<Expr>,
    // The output schema: input fields plus the window fields.
    pub schema: DFSchemaRef,
}
2407
impl Window {
    /// Create a new window operator, computing the output schema (and its
    /// functional dependencies) as the input schema plus one field per
    /// window expression.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Existing dependencies still hold; widen their targets to cover
        // the newly appended window fields.
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // An unpartitioned `row_number` is unique per row, so its output
        // column functionally determines every field of the schema.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                if let Expr::WindowFunction(WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                }) = expr
                {
                    if udwf.name() == "row_number" && partition_by.is_empty() {
                        // Window columns are appended after the input fields.
                        return Some(idx + input_len);
                    }
                }
                None
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Create a new window operator with a pre-computed schema.
    ///
    /// # Errors
    /// Returns a plan error when the number of window expressions does not
    /// match the number of fields `schema` adds beyond the input's.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
            return plan_err!(
                "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
                window_expr.len(),
                schema.fields().len() - input.schema().fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2492
2493impl PartialOrd for Window {
2495 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2496 match self.input.partial_cmp(&other.input) {
2497 Some(Ordering::Equal) => self.window_expr.partial_cmp(&other.window_expr),
2498 cmp => cmp,
2499 }
2500 }
2501}
2502
/// Produces rows by scanning a table source.
#[derive(Clone)]
pub struct TableScan {
    // The name of the table.
    pub table_name: TableReference,
    // The table source; a trait object, so it is excluded from the
    // Debug/PartialEq/PartialOrd/Hash impls below.
    pub source: Arc<dyn TableSource>,
    // Optional column indices to use as a projection.
    pub projection: Option<Vec<usize>>,
    // The schema description of the (projected) output.
    pub projected_schema: DFSchemaRef,
    // Optional expressions to be used as filters by the table source.
    pub filters: Vec<Expr>,
    // Optional maximum number of rows to read.
    pub fetch: Option<usize>,
}
2519
2520impl Debug for TableScan {
2521 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2522 f.debug_struct("TableScan")
2523 .field("table_name", &self.table_name)
2524 .field("source", &"...")
2525 .field("projection", &self.projection)
2526 .field("projected_schema", &self.projected_schema)
2527 .field("filters", &self.filters)
2528 .field("fetch", &self.fetch)
2529 .finish_non_exhaustive()
2530 }
2531}
2532
2533impl PartialEq for TableScan {
2534 fn eq(&self, other: &Self) -> bool {
2535 self.table_name == other.table_name
2536 && self.projection == other.projection
2537 && self.projected_schema == other.projected_schema
2538 && self.filters == other.filters
2539 && self.fetch == other.fetch
2540 }
2541}
2542
2543impl Eq for TableScan {}
2544
2545impl PartialOrd for TableScan {
2548 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2549 #[derive(PartialEq, PartialOrd)]
2550 struct ComparableTableScan<'a> {
2551 pub table_name: &'a TableReference,
2553 pub projection: &'a Option<Vec<usize>>,
2555 pub filters: &'a Vec<Expr>,
2557 pub fetch: &'a Option<usize>,
2559 }
2560 let comparable_self = ComparableTableScan {
2561 table_name: &self.table_name,
2562 projection: &self.projection,
2563 filters: &self.filters,
2564 fetch: &self.fetch,
2565 };
2566 let comparable_other = ComparableTableScan {
2567 table_name: &other.table_name,
2568 projection: &other.projection,
2569 filters: &other.filters,
2570 fetch: &other.fetch,
2571 };
2572 comparable_self.partial_cmp(&comparable_other)
2573 }
2574}
2575
2576impl Hash for TableScan {
2577 fn hash<H: Hasher>(&self, state: &mut H) {
2578 self.table_name.hash(state);
2579 self.projection.hash(state);
2580 self.projected_schema.hash(state);
2581 self.filters.hash(state);
2582 self.fetch.hash(state);
2583 }
2584}
2585
impl TableScan {
    /// Create a new `TableScan`, computing the projected schema (and the
    /// projected functional dependencies) from the source schema.
    ///
    /// # Errors
    /// Returns a plan error when `table_name` is empty or schema
    /// construction fails.
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                // Restrict the dependencies to the projected columns.
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                // No projection: qualify the full source schema.
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2639
/// Repartitions its input using the given partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    // The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    // The partitioning scheme to apply.
    pub partitioning_scheme: Partitioning,
}
2648
/// Union of two or more inputs with matching (or coercible) schemas.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    // The inputs to the union (at least two).
    pub inputs: Vec<Arc<LogicalPlan>>,
    // The union's output schema.
    pub schema: DFSchemaRef,
}
2657
impl Union {
    /// Create a union, deriving the schema positionally with strict type
    /// matching.
    fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
        Ok(Union { inputs, schema })
    }

    /// Create a union with loose typing: the first input's field type is
    /// used even when later inputs differ.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
        Ok(Union { inputs, schema })
    }

    /// Create a union matching columns by name (`UNION BY NAME`), wrapping
    /// each input in a projection that reorders its columns and fills
    /// missing ones with NULL.
    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

        Ok(Union { inputs, schema })
    }

    /// Wraps each input in a projection against `schema`: columns present
    /// in the input are selected by name; absent ones become aliased NULL
    /// literals.
    fn rewrite_inputs_from_schema(
        schema: &Arc<DFSchema>,
        inputs: Vec<Arc<LogicalPlan>>,
    ) -> Result<Vec<Arc<LogicalPlan>>> {
        let schema_width = schema.iter().count();
        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            let mut expr = Vec::with_capacity(schema_width);
            for column in schema.columns() {
                if input
                    .schema()
                    .has_column_with_unqualified_name(column.name())
                {
                    expr.push(Expr::Column(column));
                } else {
                    expr.push(Expr::Literal(ScalarValue::Null).alias(column.name()));
                }
            }
            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
            )));
        }

        Ok(wrapped_inputs)
    }

    /// Derives the union's output schema, either positionally or by name.
    ///
    /// # Errors
    /// Returns a plan error if fewer than two inputs are given.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
        by_name: bool,
    ) -> Result<DFSchemaRef> {
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }

        if by_name {
            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
        } else {
            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
        }
    }

    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // Per column name: (data type, nullability, metadata from each
        // input that has the column, number of inputs it appears in).
        type FieldData<'a> =
            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
        let mut cols: Vec<(&str, FieldData)> = Vec::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
                    cols.iter_mut().find(|(name, _)| name == field.name())
                {
                    if !loose_types && *data_type != field.data_type() {
                        return plan_err!(
                            "Found different types for field {}",
                            field.name()
                        );
                    }

                    metadata.push(field.metadata());
                    // Nullable if any occurrence is nullable.
                    *is_nullable |= field.is_nullable();
                    *occurrences += 1;
                } else {
                    cols.push((
                        field.name(),
                        (
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                            1,
                        ),
                    ));
                }
            }
        }

        let union_fields = cols
            .into_iter()
            .map(
                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
                    // A column absent from some input is null-filled there,
                    // so it must be nullable in the union schema.
                    let final_is_nullable = if occurrences == inputs.len() {
                        is_nullable
                    } else {
                        true
                    };

                    let mut field =
                        Field::new(name, data_type.clone(), final_is_nullable);
                    field.set_metadata(intersect_maps(unmerged_metadata));

                    (None, Arc::new(field))
                },
            )
            .collect::<Vec<(Option<TableReference>, _)>>();

        let union_schema_metadata =
            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }

    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        // Duplicate field names get a `_N` suffix to keep names unique.
        let mut name_counts: HashMap<String, usize> = HashMap::new();
        let union_fields = (0..fields_count)
            .map(|i| {
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let base_name = first_field.name().to_string();

                let data_type = if loose_types {
                    // Loose mode trusts the first input's type; differing
                    // inputs are expected to be coerced elsewhere.
                    first_field.data_type()
                } else {
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                let nullable = fields.iter().any(|field| field.is_nullable());

                let name = if let Some(count) = name_counts.get_mut(&base_name) {
                    *count += 1;
                    format!("{}_{}", base_name, count)
                } else {
                    name_counts.insert(base_name.clone(), 0);
                    base_name
                };

                let mut field = Field::new(&name, data_type.clone(), nullable);
                let field_metadata =
                    intersect_maps(fields.iter().map(|field| field.metadata()));
                field.set_metadata(field_metadata);
                Ok((None, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata =
            intersect_maps(inputs.iter().map(|input| input.schema().metadata()));

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}
2886
/// Returns the key/value pairs common to every input map (same key AND
/// same value). An empty input yields an empty map.
fn intersect_maps<'a>(
    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut remaining = inputs.into_iter();
    // Seed the accumulator with a clone of the first map, if any.
    let mut acc: HashMap<String, String> = match remaining.next() {
        Some(first) => first.clone(),
        None => HashMap::new(),
    };
    // Drop every entry that a later map is missing or disagrees on.
    for map in remaining {
        acc.retain(|key, value| map.get(key).map_or(false, |v| v == &*value));
    }
    acc
}
2901
2902impl PartialOrd for Union {
2904 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2905 self.inputs.partial_cmp(&other.inputs)
2906 }
2907}
2908
/// Describes the schema of a table (the `DESCRIBE <table>` output).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    // The schema of the table being described.
    pub schema: Arc<Schema>,
    // The schema of the DESCRIBE output itself.
    pub output_schema: DFSchemaRef,
}
2938
impl PartialOrd for DescribeTable {
    // `DescribeTable` holds only schemas, which have no ordering, so two
    // instances are always incomparable.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
2947
/// Output formats accepted by `EXPLAIN` (see the `FromStr` impl below for
/// the accepted format strings).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExplainFormat {
    /// Indented text, one line per plan node (`"indent"`).
    Indent,
    /// Tree-style rendering (`"tree"`).
    Tree,
    /// Postgres-compatible JSON (`"pgjson"`).
    PostgresJSON,
    /// Graphviz DOT output (`"graphviz"`).
    Graphviz,
}
3078
3079impl FromStr for ExplainFormat {
3081 type Err = DataFusionError;
3082
3083 fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
3084 match format.to_lowercase().as_str() {
3085 "indent" => Ok(ExplainFormat::Indent),
3086 "tree" => Ok(ExplainFormat::Tree),
3087 "pgjson" => Ok(ExplainFormat::PostgresJSON),
3088 "graphviz" => Ok(ExplainFormat::Graphviz),
3089 _ => {
3090 plan_err!("Invalid explain format. Expected 'indent', 'tree', 'pgjson' or 'graphviz'. Got '{format}'")
3091 }
3092 }
3093 }
3094}
3095
/// Produces a textual representation of its input plan (`EXPLAIN`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Whether to render additional per-stage detail
    pub verbose: bool,
    /// Which output format to render (indent, tree, pgjson, graphviz)
    pub explain_format: ExplainFormat,
    /// The plan being explained
    pub plan: Arc<LogicalPlan>,
    /// Rendered representations of `plan` collected so far
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Output schema of the EXPLAIN relation itself
    pub schema: DFSchemaRef,
    /// Whether logical optimization of `plan` completed successfully
    pub logical_optimization_succeeded: bool,
}
3118
3119impl PartialOrd for Explain {
3121 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3122 #[derive(PartialEq, PartialOrd)]
3123 struct ComparableExplain<'a> {
3124 pub verbose: &'a bool,
3126 pub plan: &'a Arc<LogicalPlan>,
3128 pub stringified_plans: &'a Vec<StringifiedPlan>,
3130 pub logical_optimization_succeeded: &'a bool,
3132 }
3133 let comparable_self = ComparableExplain {
3134 verbose: &self.verbose,
3135 plan: &self.plan,
3136 stringified_plans: &self.stringified_plans,
3137 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3138 };
3139 let comparable_other = ComparableExplain {
3140 verbose: &other.verbose,
3141 plan: &other.plan,
3142 stringified_plans: &other.stringified_plans,
3143 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3144 };
3145 comparable_self.partial_cmp(&comparable_other)
3146 }
3147}
3148
/// Runs the input plan and reports information about its execution
/// (the plan node behind `ANALYZE`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Whether to report additional detail
    pub verbose: bool,
    /// The plan to execute and analyze
    pub input: Arc<LogicalPlan>,
    /// Output schema of the ANALYZE relation itself
    pub schema: DFSchemaRef,
}
3160
3161impl PartialOrd for Analyze {
3163 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3164 match self.verbose.partial_cmp(&other.verbose) {
3165 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3166 cmp => cmp,
3167 }
3168 }
3169}
3170
/// A plan node defined outside of DataFusion's core, wrapping a
/// user-provided [`UserDefinedLogicalNode`].
// Hash is derived while PartialEq is written by hand (below); the lint is
// silenced because the node's own eq is expected to agree with its hash.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-defined node backing this plan node
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3181
impl PartialEq for Extension {
    /// Delegates equality to the wrapped user-defined node.
    fn eq(&self, other: &Self) -> bool {
        self.node.eq(&other.node)
    }
}
3190
impl PartialOrd for Extension {
    /// Delegates ordering to the wrapped user-defined node.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3196
/// Skips and/or limits rows from its input (`OFFSET` / `LIMIT`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before emitting any; `None` means skip nothing.
    /// Stored as an expression, which may or may not be a literal
    /// (see [`Limit::get_skip_type`]).
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to emit; `None` means no limit.
    /// Stored as an expression (see [`Limit::get_fetch_type`]).
    pub fetch: Option<Box<Expr>>,
    /// The plan whose output is limited
    pub input: Arc<LogicalPlan>,
}
3208
/// Resolved form of [`Limit::skip`].
pub enum SkipType {
    /// The skip count resolved to a concrete non-negative literal
    Literal(usize),
    /// The skip expression is not a literal this code can evaluate
    UnsupportedExpr,
}
3216
/// Resolved form of [`Limit::fetch`].
pub enum FetchType {
    /// The fetch count resolved to a literal; `None` means "no limit"
    Literal(Option<usize>),
    /// The fetch expression is not a literal this code can evaluate
    UnsupportedExpr,
}
3225
3226impl Limit {
3227 pub fn get_skip_type(&self) -> Result<SkipType> {
3229 match self.skip.as_deref() {
3230 Some(expr) => match *expr {
3231 Expr::Literal(ScalarValue::Int64(s)) => {
3232 let s = s.unwrap_or(0);
3234 if s >= 0 {
3235 Ok(SkipType::Literal(s as usize))
3236 } else {
3237 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3238 }
3239 }
3240 _ => Ok(SkipType::UnsupportedExpr),
3241 },
3242 None => Ok(SkipType::Literal(0)),
3244 }
3245 }
3246
3247 pub fn get_fetch_type(&self) -> Result<FetchType> {
3249 match self.fetch.as_deref() {
3250 Some(expr) => match *expr {
3251 Expr::Literal(ScalarValue::Int64(Some(s))) => {
3252 if s >= 0 {
3253 Ok(FetchType::Literal(Some(s as usize)))
3254 } else {
3255 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3256 }
3257 }
3258 Expr::Literal(ScalarValue::Int64(None)) => Ok(FetchType::Literal(None)),
3259 _ => Ok(FetchType::UnsupportedExpr),
3260 },
3261 None => Ok(FetchType::Literal(None)),
3262 }
3263 }
3264}
3265
/// Removes duplicate rows from its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `SELECT DISTINCT`: deduplicate on all output columns
    All(Arc<LogicalPlan>),
    /// `SELECT DISTINCT ON (...)`: deduplicate on specific expressions
    On(DistinctOn),
}
3274
impl Distinct {
    /// Returns the input plan, regardless of which variant this is.
    pub fn input(&self) -> &Arc<LogicalPlan> {
        match self {
            Distinct::All(input) => input,
            Distinct::On(DistinctOn { input, .. }) => input,
        }
    }
}
3284
/// The `SELECT DISTINCT ON (...)` form of [`Distinct`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The expressions duplicates are detected on (the `ON` list)
    pub on_expr: Vec<Expr>,
    /// The projected output expressions
    pub select_expr: Vec<Expr>,
    /// Optional ordering; its leading expressions must match `on_expr`
    /// (enforced by [`DistinctOn::with_sort_expr`])
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The plan being deduplicated
    pub input: Arc<LogicalPlan>,
    /// Output schema, derived from `select_expr`
    pub schema: DFSchemaRef,
}
3301
3302impl DistinctOn {
3303 pub fn try_new(
3305 on_expr: Vec<Expr>,
3306 select_expr: Vec<Expr>,
3307 sort_expr: Option<Vec<SortExpr>>,
3308 input: Arc<LogicalPlan>,
3309 ) -> Result<Self> {
3310 if on_expr.is_empty() {
3311 return plan_err!("No `ON` expressions provided");
3312 }
3313
3314 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3315 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3316 .into_iter()
3317 .collect();
3318
3319 let dfschema = DFSchema::new_with_metadata(
3320 qualified_fields,
3321 input.schema().metadata().clone(),
3322 )?;
3323
3324 let mut distinct_on = DistinctOn {
3325 on_expr,
3326 select_expr,
3327 sort_expr: None,
3328 input,
3329 schema: Arc::new(dfschema),
3330 };
3331
3332 if let Some(sort_expr) = sort_expr {
3333 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3334 }
3335
3336 Ok(distinct_on)
3337 }
3338
3339 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3343 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3344
3345 let mut matched = true;
3347 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3348 if on != &sort.expr {
3349 matched = false;
3350 break;
3351 }
3352 }
3353
3354 if self.on_expr.len() > sort_expr.len() || !matched {
3355 return plan_err!(
3356 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3357 );
3358 }
3359
3360 self.sort_expr = Some(sort_expr);
3361 Ok(self)
3362 }
3363}
3364
3365impl PartialOrd for DistinctOn {
3367 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3368 #[derive(PartialEq, PartialOrd)]
3369 struct ComparableDistinctOn<'a> {
3370 pub on_expr: &'a Vec<Expr>,
3372 pub select_expr: &'a Vec<Expr>,
3374 pub sort_expr: &'a Option<Vec<SortExpr>>,
3378 pub input: &'a Arc<LogicalPlan>,
3380 }
3381 let comparable_self = ComparableDistinctOn {
3382 on_expr: &self.on_expr,
3383 select_expr: &self.select_expr,
3384 sort_expr: &self.sort_expr,
3385 input: &self.input,
3386 };
3387 let comparable_other = ComparableDistinctOn {
3388 on_expr: &other.on_expr,
3389 select_expr: &other.select_expr,
3390 sort_expr: &other.sort_expr,
3391 input: &other.input,
3392 };
3393 comparable_self.partial_cmp(&comparable_other)
3394 }
3395}
3396
/// Aggregates its input: groups by `group_expr` and evaluates `aggr_expr`
/// per group. Construct via [`Aggregate::try_new`] so the schema and
/// functional dependencies stay consistent (hence `#[non_exhaustive]`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct Aggregate {
    /// The plan being aggregated
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions (may be a single `GroupingSet`)
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions evaluated per group
    pub aggr_expr: Vec<Expr>,
    /// Output schema: group fields, then (for grouping sets) the internal
    /// grouping id, then aggregate fields
    pub schema: DFSchemaRef,
}
3422
impl Aggregate {
    /// Creates an `Aggregate`, deriving the output schema from the grouping
    /// and aggregate expressions.
    ///
    /// For grouping sets, group fields are forced nullable (a row may not
    /// participate in every set) and a hidden grouping-id column is appended
    /// after them.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        // Expand CUBE/ROLLUP shorthand into explicit grouping sets.
        let group_expr = enumerate_grouping_sets(group_expr)?;

        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        // Flattened, distinct list of expressions referenced by the grouping.
        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        if is_grouping_set {
            // Every group field becomes nullable: a given output row only
            // belongs to one grouping set, so other group columns are NULL.
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            // NOTE: `grouping_id_type` is evaluated BEFORE the push completes,
            // so `qualified_fields.len()` here is the number of group fields —
            // exactly the bit-width the grouping id must cover.
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        // Aggregate output fields come after the group (+ grouping id) fields.
        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Creates an `Aggregate` with a caller-provided schema, validating the
    /// field count and recomputing the schema's functional dependencies.
    ///
    /// # Errors
    /// Returns a planning error if both expression lists are empty, or if the
    /// schema's field count does not match `group exprs + aggr exprs`.
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression"
            );
        }
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        // Re-derive functional dependencies (e.g. group keys determining the
        // other output columns) and attach them to the schema.
        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// True when the grouping is a single `GroupingSet` expression.
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }

    /// Returns the expressions that produce the output columns, in schema
    /// order: group exprs, then (for grouping sets) the internal grouping id
    /// column, then aggregate exprs.
    fn output_expressions(&self) -> Result<Vec<&Expr>> {
        // Shared, lazily-built column expression for the hidden grouping id.
        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
        });
        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
        if self.is_grouping_set() {
            exprs.push(&INTERNAL_ID_EXPR);
        }
        exprs.extend(self.aggr_expr.iter());
        debug_assert!(exprs.len() == self.schema.fields().len());
        Ok(exprs)
    }

    /// Number of group-by output columns (expanding grouping sets).
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }

    /// Smallest unsigned integer type whose bit-width can hold one bit per
    /// group expression (the grouping id is a bitmask).
    pub fn grouping_id_type(group_exprs: usize) -> DataType {
        if group_exprs <= 8 {
            DataType::UInt8
        } else if group_exprs <= 16 {
            DataType::UInt16
        } else if group_exprs <= 32 {
            DataType::UInt32
        } else {
            DataType::UInt64
        }
    }

    /// Name of the hidden output column identifying which grouping set a row
    /// belongs to.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
3564
3565impl PartialOrd for Aggregate {
3567 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3568 match self.input.partial_cmp(&other.input) {
3569 Some(Ordering::Equal) => {
3570 match self.group_expr.partial_cmp(&other.group_expr) {
3571 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3572 cmp => cmp,
3573 }
3574 }
3575 cmp => cmp,
3576 }
3577 }
3578}
3579
3580fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3582 group_expr
3583 .iter()
3584 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3585}
3586
3587fn calc_func_dependencies_for_aggregate(
3589 group_expr: &[Expr],
3591 input: &LogicalPlan,
3593 aggr_schema: &DFSchema,
3595) -> Result<FunctionalDependencies> {
3596 if !contains_grouping_set(group_expr) {
3602 let group_by_expr_names = group_expr
3603 .iter()
3604 .map(|item| item.schema_name().to_string())
3605 .collect::<IndexSet<_>>()
3606 .into_iter()
3607 .collect::<Vec<_>>();
3608 let aggregate_func_dependencies = aggregate_functional_dependencies(
3609 input.schema(),
3610 &group_by_expr_names,
3611 aggr_schema,
3612 );
3613 Ok(aggregate_func_dependencies)
3614 } else {
3615 Ok(FunctionalDependencies::empty())
3616 }
3617}
3618
/// Projects the input plan's functional dependencies onto a projection's
/// expression list.
///
/// For each projected expression, finds the index of the input field it
/// corresponds to (by name); expressions that do not map back to an input
/// field contribute no index. Wildcards expand to all of their fields.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // Calculate expression indices (if present) in the input schema.
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                // Expand the wildcard and map each resulting field back to
                // its position in the input schema by its flat name.
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            Expr::Alias(alias) => {
                // Use the *unaliased* expression's name so the alias still
                // maps back to the underlying input field.
                // NOTE(review): this arm duplicates the `_` arm below except
                // for which expression is stringified.
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            _ => {
                let name = format!("{}", expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, exprs.len()))
}
3678
/// Sorts its input according to a list of sort expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions, in precedence order
    pub expr: Vec<SortExpr>,
    /// The plan being sorted
    pub input: Arc<LogicalPlan>,
    /// Optional cap on the number of rows to produce after sorting (top-k)
    pub fetch: Option<usize>,
}
3689
/// Joins two logical plans on one or more equality pairs plus an optional
/// extra filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left side of the join
    pub left: Arc<LogicalPlan>,
    /// Right side of the join
    pub right: Arc<LogicalPlan>,
    /// Equijoin pairs: (left expression, right expression)
    pub on: Vec<(Expr, Expr)>,
    /// Additional non-equijoin predicate applied to joined rows
    pub filter: Option<Expr>,
    /// Join kind (inner, left, right, ...)
    pub join_type: JoinType,
    /// How the join was specified (`ON` vs `USING`)
    pub join_constraint: JoinConstraint,
    /// Output schema of the join
    pub schema: DFSchemaRef,
    /// Whether NULL == NULL is considered a match in the equijoin keys
    pub null_equals_null: bool,
}
3710
3711impl Join {
3712 pub fn try_new_with_project_input(
3714 original: &LogicalPlan,
3715 left: Arc<LogicalPlan>,
3716 right: Arc<LogicalPlan>,
3717 column_on: (Vec<Column>, Vec<Column>),
3718 ) -> Result<Self> {
3719 let original_join = match original {
3720 LogicalPlan::Join(join) => join,
3721 _ => return plan_err!("Could not create join with project input"),
3722 };
3723
3724 let on: Vec<(Expr, Expr)> = column_on
3725 .0
3726 .into_iter()
3727 .zip(column_on.1)
3728 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3729 .collect();
3730 let join_schema =
3731 build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;
3732
3733 Ok(Join {
3734 left,
3735 right,
3736 on,
3737 filter: original_join.filter.clone(),
3738 join_type: original_join.join_type,
3739 join_constraint: original_join.join_constraint,
3740 schema: Arc::new(join_schema),
3741 null_equals_null: original_join.null_equals_null,
3742 })
3743 }
3744}
3745
3746impl PartialOrd for Join {
3748 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3749 #[derive(PartialEq, PartialOrd)]
3750 struct ComparableJoin<'a> {
3751 pub left: &'a Arc<LogicalPlan>,
3753 pub right: &'a Arc<LogicalPlan>,
3755 pub on: &'a Vec<(Expr, Expr)>,
3757 pub filter: &'a Option<Expr>,
3759 pub join_type: &'a JoinType,
3761 pub join_constraint: &'a JoinConstraint,
3763 pub null_equals_null: &'a bool,
3765 }
3766 let comparable_self = ComparableJoin {
3767 left: &self.left,
3768 right: &self.right,
3769 on: &self.on,
3770 filter: &self.filter,
3771 join_type: &self.join_type,
3772 join_constraint: &self.join_constraint,
3773 null_equals_null: &self.null_equals_null,
3774 };
3775 let comparable_other = ComparableJoin {
3776 left: &other.left,
3777 right: &other.right,
3778 on: &other.on,
3779 filter: &other.filter,
3780 join_type: &other.join_type,
3781 join_constraint: &other.join_constraint,
3782 null_equals_null: &other.null_equals_null,
3783 };
3784 comparable_self.partial_cmp(&comparable_other)
3785 }
3786}
3787
/// A subquery referenced by an expression, together with the outer columns
/// it depends on.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery's plan
    pub subquery: Arc<LogicalPlan>,
    /// Outer-query column references used inside the subquery (correlation)
    pub outer_ref_columns: Vec<Expr>,
    /// Source spans for this subquery in the original SQL text
    pub spans: Spans,
}
3798
impl Normalizeable for Subquery {
    /// Subqueries are never normalized for common-subexpression elimination.
    fn can_normalize(&self) -> bool {
        false
    }
}
3804
impl NormalizeEq for Subquery {
    /// Two subqueries are normalize-equal when their plans are strictly equal
    /// and their outer reference columns are pairwise normalize-equal
    /// (same length, compared positionally).
    fn normalize_eq(&self, other: &Self) -> bool {
        *self.subquery == *other.subquery
            && self.outer_ref_columns.len() == other.outer_ref_columns.len()
            && self
                .outer_ref_columns
                .iter()
                .zip(other.outer_ref_columns.iter())
                .all(|(a, b)| a.normalize_eq(b))
    }
}
3817
3818impl Subquery {
3819 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3820 match plan {
3821 Expr::ScalarSubquery(it) => Ok(it),
3822 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3823 _ => plan_err!("Could not coerce into ScalarSubquery!"),
3824 }
3825 }
3826
3827 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3828 Subquery {
3829 subquery: plan,
3830 outer_ref_columns: self.outer_ref_columns.clone(),
3831 spans: Spans::new(),
3832 }
3833 }
3834}
3835
impl Debug for Subquery {
    /// Prints a fixed placeholder rather than the full plan, keeping debug
    /// output of expressions containing subqueries compact.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "<subquery>")
    }
}
3841
/// Logical partitioning schemes for repartitioning data.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Distribute batches round-robin across the given number of partitions
    RoundRobinBatch(usize),
    /// Partition by hashing the given expressions into the given number of
    /// partitions
    Hash(Vec<Expr>, usize),
    /// `DISTRIBUTE BY` on the given expressions (partition count decided
    /// elsewhere)
    DistributeBy(Vec<Expr>),
}
3857
/// Describes how deep a list-typed column is recursively unnested, and the
/// output column it produces.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The resulting output column
    pub output_column: Column,
    /// How many levels of nesting are unwrapped
    pub depth: usize,
}
3882
impl Display for ColumnUnnestList {
    /// Renders as `<column>|depth=<n>` for plan display output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}|depth={}", self.output_column, self.depth)
    }
}
3888
/// Unnests list- and struct-typed columns of its input into multiple rows /
/// flattened columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The columns to unnest
    pub exec_columns: Vec<Column>,
    /// List-typed columns with their unnest depth specs
    /// (NOTE(review): the usize is presumably the column's index in the
    /// input schema — confirm against the builder)
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Indices of struct-typed columns being flattened
    pub struct_type_columns: Vec<usize>,
    /// Indices tracking which input columns each output row depends on
    /// (NOTE(review): semantics inferred from the name — confirm)
    pub dependency_indices: Vec<usize>,
    /// Output schema after unnesting
    pub schema: DFSchemaRef,
    /// Options controlling unnest behavior (e.g. NULL handling)
    pub options: UnnestOptions,
}
3911
3912impl PartialOrd for Unnest {
3914 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3915 #[derive(PartialEq, PartialOrd)]
3916 struct ComparableUnnest<'a> {
3917 pub input: &'a Arc<LogicalPlan>,
3919 pub exec_columns: &'a Vec<Column>,
3921 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
3924 pub struct_type_columns: &'a Vec<usize>,
3927 pub dependency_indices: &'a Vec<usize>,
3930 pub options: &'a UnnestOptions,
3932 }
3933 let comparable_self = ComparableUnnest {
3934 input: &self.input,
3935 exec_columns: &self.exec_columns,
3936 list_type_columns: &self.list_type_columns,
3937 struct_type_columns: &self.struct_type_columns,
3938 dependency_indices: &self.dependency_indices,
3939 options: &self.options,
3940 };
3941 let comparable_other = ComparableUnnest {
3942 input: &other.input,
3943 exec_columns: &other.exec_columns,
3944 list_type_columns: &other.list_type_columns,
3945 struct_type_columns: &other.struct_type_columns,
3946 dependency_indices: &other.dependency_indices,
3947 options: &other.options,
3948 };
3949 comparable_self.partial_cmp(&comparable_other)
3950 }
3951}
3952
3953#[cfg(test)]
3954mod tests {
3955
3956 use super::*;
3957 use crate::builder::LogicalTableSource;
3958 use crate::logical_plan::table_scan;
3959 use crate::{
3960 binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
3961 GroupingSet,
3962 };
3963
3964 use datafusion_common::tree_node::{
3965 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
3966 };
3967 use datafusion_common::{not_impl_err, Constraint, ScalarValue};
3968
3969 use crate::test::function_stub::count;
3970
    /// Arrow schema shared by the table-scan fixtures in these tests.
    fn employee_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, false),
            Field::new("last_name", DataType::Utf8, false),
            Field::new("state", DataType::Utf8, false),
            Field::new("salary", DataType::Int32, false),
        ])
    }
3980
    /// Builds a small projection -> filter(IN subquery) -> scan plan used by
    /// the display tests below.
    fn display_plan() -> Result<LogicalPlan> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;

        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
            .filter(in_subquery(col("state"), Arc::new(plan1)))?
            .project(vec![col("id")])?
            .build()
    }
3990
    /// Checks the indented (one line per node) plan rendering.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        let expected = "Projection: employee_csv.id\
        \n  Filter: employee_csv.state IN (<subquery>)\
        \n    Subquery:\
        \n      TableScan: employee_csv projection=[state]\
        \n  TableScan: employee_csv projection=[id, state]";

        assert_eq!(expected, format!("{}", plan.display_indent()));
        Ok(())
    }
4004
    /// Checks the indented rendering that also shows each node's schema.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        let expected = "Projection: employee_csv.id [id:Int32]\
        \n  Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]\
        \n    Subquery: [state:Utf8]\
        \n      TableScan: employee_csv projection=[state] [state:Utf8]\
        \n  TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]";

        assert_eq!(expected, format!("{}", plan.display_indent_schema()));
        Ok(())
    }
4018
    /// Checks that a subquery aliased in a projection renders as a nested
    /// `Subquery:` block under the projection.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        let expected = "Projection: employee_csv.id, EXISTS (<subquery>) AS exists\
        \n  Subquery:\
        \n    TableScan: employee_csv projection=[state]\
        \n  TableScan: employee_csv projection=[id, state]";

        assert_eq!(expected, format!("{}", plan?.display_indent()));
        Ok(())
    }
4038
    /// Checks the Graphviz rendering: one cluster for the plain plan and one
    /// for the schema-annotated ("Detailed") plan.
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        let expected_graphviz = r#"
// Begin DataFusion GraphViz Plan,
// display it online here: https://dreampuf.github.io/GraphvizOnline

digraph {
  subgraph cluster_1
  {
    graph[label="LogicalPlan"]
    2[shape=box label="Projection: employee_csv.id"]
    3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
    2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
    4[shape=box label="Subquery:"]
    3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
    5[shape=box label="TableScan: employee_csv projection=[state]"]
    4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
    6[shape=box label="TableScan: employee_csv projection=[id, state]"]
    3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
  }
  subgraph cluster_7
  {
    graph[label="Detailed LogicalPlan"]
    8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
    9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
    8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
    10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
    9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
    11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
    10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
    12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
    9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
  }
}
// End DataFusion GraphViz Plan
"#;

        let graphviz = format!("{}", plan.display_graphviz());

        assert_eq!(expected_graphviz, graphviz);
        Ok(())
    }
4085
    /// Checks the Postgres-style JSON rendering of the plan.
    #[test]
    fn test_display_pg_json() -> Result<()> {
        let plan = display_plan()?;

        let expected_pg_json = r#"[
  {
    "Plan": {
      "Expressions": [
        "employee_csv.id"
      ],
      "Node Type": "Projection",
      "Output": [
        "id"
      ],
      "Plans": [
        {
          "Condition": "employee_csv.state IN (<subquery>)",
          "Node Type": "Filter",
          "Output": [
            "id",
            "state"
          ],
          "Plans": [
            {
              "Node Type": "Subquery",
              "Output": [
                "state"
              ],
              "Plans": [
                {
                  "Node Type": "TableScan",
                  "Output": [
                    "state"
                  ],
                  "Plans": [],
                  "Relation Name": "employee_csv"
                }
              ]
            },
            {
              "Node Type": "TableScan",
              "Output": [
                "id",
                "state"
              ],
              "Plans": [],
              "Relation Name": "employee_csv"
            }
          ]
        }
      ]
    }
  }
]"#;

        let pg_json = format!("{}", plan.display_pg_json());

        assert_eq!(expected_pg_json, pg_json);
        Ok(())
    }
4146
    /// Visitor that records pre/post visit order and never fails.
    #[derive(Debug, Default)]
    struct OkVisitor {
        // Log of "pre_visit ..." / "post_visit ..." entries, in visit order
        strings: Vec<String>,
    }
4152
    impl<'n> TreeNodeVisitor<'n> for OkVisitor {
        type Node = LogicalPlan;

        /// Records a "pre_visit" entry; errors on plan types the test plan
        /// is not expected to contain.
        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            let s = match plan {
                LogicalPlan::Projection { .. } => "pre_visit Projection",
                LogicalPlan::Filter { .. } => "pre_visit Filter",
                LogicalPlan::TableScan { .. } => "pre_visit TableScan",
                _ => {
                    return not_impl_err!("unknown plan type");
                }
            };

            self.strings.push(s.into());
            Ok(TreeNodeRecursion::Continue)
        }

        /// Records a "post_visit" entry; errors on unexpected plan types.
        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            let s = match plan {
                LogicalPlan::Projection { .. } => "post_visit Projection",
                LogicalPlan::Filter { .. } => "post_visit Filter",
                LogicalPlan::TableScan { .. } => "post_visit TableScan",
                _ => {
                    return not_impl_err!("unknown plan type");
                }
            };

            self.strings.push(s.into());
            Ok(TreeNodeRecursion::Continue)
        }
    }
4184
    /// Verifies pre-order then post-order traversal over the test plan.
    #[test]
    fn visit_order() {
        let mut visitor = OkVisitor::default();
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_eq!(
            visitor.strings,
            vec![
                "pre_visit Projection",
                "pre_visit Filter",
                "pre_visit TableScan",
                "post_visit TableScan",
                "post_visit Filter",
                "post_visit Projection",
            ]
        );
    }
4204
4205 #[derive(Debug, Default)]
4206 struct OptionalCounter {
4208 val: Option<usize>,
4209 }
4210
4211 impl OptionalCounter {
4212 fn new(val: usize) -> Self {
4213 Self { val: Some(val) }
4214 }
4215 fn dec(&mut self) -> bool {
4217 if Some(0) == self.val {
4218 true
4219 } else {
4220 self.val = self.val.take().map(|i| i - 1);
4221 false
4222 }
4223 }
4224 }
4225
    /// Visitor that stops traversal after a configurable number of pre- or
    /// post-visits, delegating the bookkeeping to an inner [`OkVisitor`].
    #[derive(Debug, Default)]
    struct StoppingVisitor {
        inner: OkVisitor,
        // When this counter fires, f_down returns `Stop`
        return_false_from_pre_in: OptionalCounter,
        // When this counter fires, f_up returns `Stop`
        return_false_from_post_in: OptionalCounter,
    }
4235
    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
        type Node = LogicalPlan;

        /// Stops traversal once the pre-visit counter fires; otherwise
        /// records the visit via the inner visitor.
        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if self.return_false_from_pre_in.dec() {
                return Ok(TreeNodeRecursion::Stop);
            }
            self.inner.f_down(plan)?;

            Ok(TreeNodeRecursion::Continue)
        }

        /// Stops traversal once the post-visit counter fires.
        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if self.return_false_from_post_in.dec() {
                return Ok(TreeNodeRecursion::Stop);
            }

            self.inner.f_up(plan)
        }
    }
4256
    /// Stopping in the third pre-visit leaves only the first two recorded.
    #[test]
    fn early_stopping_pre_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_eq!(
            visitor.inner.strings,
            vec!["pre_visit Projection", "pre_visit Filter"]
        );
    }
4273
    /// Stopping in the second post-visit halts after the first post-visit.
    #[test]
    fn early_stopping_post_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_eq!(
            visitor.inner.strings,
            vec![
                "pre_visit Projection",
                "pre_visit Filter",
                "pre_visit TableScan",
                "post_visit TableScan",
            ]
        );
    }
4294
    /// Visitor that returns an error after a configurable number of pre- or
    /// post-visits, delegating the bookkeeping to an inner [`OkVisitor`].
    #[derive(Debug, Default)]
    struct ErrorVisitor {
        inner: OkVisitor,
        // When this counter fires, f_down returns an error
        return_error_from_pre_in: OptionalCounter,
        // When this counter fires, f_up returns an error
        return_error_from_post_in: OptionalCounter,
    }
4304
    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
        type Node = LogicalPlan;

        /// Errors out once the pre-visit counter fires.
        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if self.return_error_from_pre_in.dec() {
                return not_impl_err!("Error in pre_visit");
            }

            self.inner.f_down(plan)
        }

        /// Errors out once the post-visit counter fires.
        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if self.return_error_from_post_in.dec() {
                return not_impl_err!("Error in post_visit");
            }

            self.inner.f_up(plan)
        }
    }
4324
    /// An error in the third pre-visit aborts traversal and surfaces the
    /// error; only the first two visits were recorded.
    #[test]
    fn error_pre_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_eq!(
            "This feature is not implemented: Error in pre_visit",
            res.strip_backtrace()
        );
        assert_eq!(
            visitor.inner.strings,
            vec!["pre_visit Projection", "pre_visit Filter"]
        );
    }
4342
    /// An error in the second post-visit aborts traversal after the first
    /// post-visit completed.
    #[test]
    fn error_post_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_eq!(
            "This feature is not implemented: Error in post_visit",
            res.strip_backtrace()
        );
        assert_eq!(
            visitor.inner.strings,
            vec![
                "pre_visit Projection",
                "pre_visit Filter",
                "pre_visit TableScan",
                "post_visit TableScan",
            ]
        );
    }
4365
    /// A projection whose expression count disagrees with its schema's field
    /// count must be rejected with a planning error.
    #[test]
    fn projection_expr_schema_mismatch() -> Result<()> {
        let empty_schema = Arc::new(DFSchema::empty());
        let p = Projection::try_new_with_schema(
            vec![col("a")],
            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::clone(&empty_schema),
            })),
            empty_schema,
        );
        assert_eq!(p.err().unwrap().strip_backtrace(), "Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
        Ok(())
    }
4380
    /// Builds the projection -> filter -> scan plan used by the visitor tests.
    fn test_plan() -> LogicalPlan {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("state", DataType::Utf8, false),
        ]);

        table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
            .unwrap()
            .filter(col("state").eq(lit("CO")))
            .unwrap()
            .project(vec![col("id")])
            .unwrap()
            .build()
            .unwrap()
    }
4396
    /// Placeholder ids must be of the form `$1`, `$2`, ...; empty, `$0`, and
    /// zero-padded ids must be rejected when substituting parameter values.
    #[test]
    fn test_replace_invalid_placeholder() {
        // test empty placeholder
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("")))
            .unwrap()
            .build()
            .unwrap();

        let param_values = vec![ScalarValue::Int32(Some(42))];
        plan.replace_params_with_values(&param_values.clone().into())
            .expect_err("unexpectedly succeeded to replace an invalid placeholder");

        // test $0 placeholder (placeholders are 1-indexed)
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("$0")))
            .unwrap()
            .build()
            .unwrap();

        plan.replace_params_with_values(&param_values.clone().into())
            .expect_err("unexpectedly succeeded to replace an invalid placeholder");

        // test $00 placeholder
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("$00")))
            .unwrap()
            .build()
            .unwrap();

        plan.replace_params_with_values(&param_values.into())
            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
    }
4439
    /// Grouping sets must force group-by columns nullable in the output
    /// schema, even when the source columns are non-nullable.
    #[test]
    fn test_nullable_schema_after_grouping_set() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .aggregate(
                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
                    vec![col("foo")],
                    vec![col("bar")],
                ]))],
                vec![count(lit(true))],
            )
            .unwrap()
            .build()
            .unwrap();

        let output_schema = plan.schema();

        assert!(output_schema
            .field_with_name(None, "foo")
            .unwrap()
            .is_nullable(),);
        assert!(output_schema
            .field_with_name(None, "bar")
            .unwrap()
            .is_nullable());
    }
4471
    #[test]
    fn test_filter_is_scalar() {
        // Case 1: a filter over a plain scan (no uniqueness information)
        // must NOT report itself as scalar.
        let schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = Arc::new(LogicalTableSource::new(schema));
        // Qualify the schema with table name "tab" so column references resolve.
        let schema = Arc::new(
            DFSchema::try_from_qualified_schema(
                TableReference::bare("tab"),
                &source.schema(),
            )
            .unwrap(),
        );
        // Build the TableScan node directly (not via the builder) so the
        // projected schema can be controlled explicitly.
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(&schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // Equality predicate on a non-unique column: any number of rows may match.
        let filter = Filter::try_new(
            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)))),
            scan,
        )
        .unwrap();
        assert!(!filter.is_scalar());
        // Case 2: same schema, but with a Unique constraint on column 0
        // recorded as a functional dependency.
        let unique_schema = Arc::new(
            schema
                .as_ref()
                .clone()
                .with_functional_dependencies(
                    FunctionalDependencies::new_from_constraints(
                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
                            vec![0],
                        )])),
                        1,
                    ),
                )
                .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source,
            projection: None,
            projected_schema: Arc::clone(&unique_schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // Equality on a unique column matches at most one row, so the
        // filter now reports scalar.
        let filter =
            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
        assert!(filter.is_scalar());
    }
4530
4531 #[test]
4532 fn test_transform_explain() {
4533 let schema = Schema::new(vec![
4534 Field::new("foo", DataType::Int32, false),
4535 Field::new("bar", DataType::Int32, false),
4536 ]);
4537
4538 let plan = table_scan(TableReference::none(), &schema, None)
4539 .unwrap()
4540 .explain(false, false)
4541 .unwrap()
4542 .build()
4543 .unwrap();
4544
4545 let external_filter = col("foo").eq(lit(true));
4546
4547 let plan = plan
4550 .transform(|plan| match plan {
4551 LogicalPlan::TableScan(table) => {
4552 let filter = Filter::try_new(
4553 external_filter.clone(),
4554 Arc::new(LogicalPlan::TableScan(table)),
4555 )
4556 .unwrap();
4557 Ok(Transformed::yes(LogicalPlan::Filter(filter)))
4558 }
4559 x => Ok(Transformed::no(x)),
4560 })
4561 .data()
4562 .unwrap();
4563
4564 let expected = "Explain\
4565 \n Filter: foo = Boolean(true)\
4566 \n TableScan: ?table?";
4567 let actual = format!("{}", plan.display_indent());
4568 assert_eq!(expected.to_string(), actual)
4569 }
4570
4571 #[test]
4572 fn test_plan_partial_ord() {
4573 let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
4574 produce_one_row: false,
4575 schema: Arc::new(DFSchema::empty()),
4576 });
4577
4578 let describe_table = LogicalPlan::DescribeTable(DescribeTable {
4579 schema: Arc::new(Schema::new(vec![Field::new(
4580 "foo",
4581 DataType::Int32,
4582 false,
4583 )])),
4584 output_schema: DFSchemaRef::new(DFSchema::empty()),
4585 });
4586
4587 let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
4588 schema: Arc::new(Schema::new(vec![Field::new(
4589 "foo",
4590 DataType::Int32,
4591 false,
4592 )])),
4593 output_schema: DFSchemaRef::new(DFSchema::empty()),
4594 });
4595
4596 assert_eq!(
4597 empty_relation.partial_cmp(&describe_table),
4598 Some(Ordering::Less)
4599 );
4600 assert_eq!(
4601 describe_table.partial_cmp(&empty_relation),
4602 Some(Ordering::Greater)
4603 );
4604 assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
4605 }
4606
4607 #[test]
4608 fn test_limit_with_new_children() {
4609 let input = Arc::new(LogicalPlan::Values(Values {
4610 schema: Arc::new(DFSchema::empty()),
4611 values: vec![vec![]],
4612 }));
4613 let cases = [
4614 LogicalPlan::Limit(Limit {
4615 skip: None,
4616 fetch: None,
4617 input: Arc::clone(&input),
4618 }),
4619 LogicalPlan::Limit(Limit {
4620 skip: None,
4621 fetch: Some(Box::new(Expr::Literal(
4622 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4623 ))),
4624 input: Arc::clone(&input),
4625 }),
4626 LogicalPlan::Limit(Limit {
4627 skip: Some(Box::new(Expr::Literal(
4628 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4629 ))),
4630 fetch: None,
4631 input: Arc::clone(&input),
4632 }),
4633 LogicalPlan::Limit(Limit {
4634 skip: Some(Box::new(Expr::Literal(
4635 ScalarValue::new_one(&DataType::UInt32).unwrap(),
4636 ))),
4637 fetch: Some(Box::new(Expr::Literal(
4638 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
4639 ))),
4640 input,
4641 }),
4642 ];
4643
4644 for limit in cases {
4645 let new_limit = limit
4646 .with_new_exprs(
4647 limit.expressions(),
4648 limit.inputs().into_iter().cloned().collect(),
4649 )
4650 .unwrap();
4651 assert_eq!(limit, new_limit);
4652 }
4653 }
4654
    #[test]
    fn test_with_subqueries_jump() {
        // Verifies TreeNodeRecursion::Jump semantics across all five
        // subquery-aware traversal APIs: returning Jump at the Projection
        // stops descent into its children (including the scalar subquery),
        // so the Filter nodes below are never observed — every
        // `filter_found` flag must stay false.
        let subquery_schema =
            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

        // Subquery: scan -> filter(sub_id = 0).
        let subquery_plan =
            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
                .unwrap()
                .filter(col("sub_id").eq(lit(0)))
                .unwrap()
                .build()
                .unwrap();

        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        // Outer plan: scan -> filter(id = 0) -> project(id, scalar subquery).
        // The Projection is the topmost node under which both filters live.
        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
            .unwrap()
            .filter(col("id").eq(lit(0)))
            .unwrap()
            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
            .unwrap()
            .build()
            .unwrap();

        // 1) apply_with_subqueries with a closure.
        let mut filter_found = false;
        plan.apply_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        assert!(!filter_found);

        // 2) visit_with_subqueries with a TreeNodeVisitor impl.
        struct ProjectJumpVisitor {
            // Set if any Filter node is reached (it should not be).
            filter_found: bool,
        }

        impl ProjectJumpVisitor {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
                match node {
                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }

        let mut visitor = ProjectJumpVisitor::new();
        plan.visit_with_subqueries(&mut visitor).unwrap();
        assert!(!visitor.filter_found);

        // 3) transform_down_with_subqueries: Jump is signalled via
        // Transformed::new(.., TreeNodeRecursion::Jump) without changing the node.
        let mut filter_found = false;
        plan.clone()
            .transform_down_with_subqueries(|plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(plan, false, TreeNodeRecursion::Jump))
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            })
            .unwrap();
        assert!(!filter_found);

        // 4) transform_down_up_with_subqueries: Jump in the down-closure must
        // also skip the children; the up-closure is a no-op here.
        let mut filter_found = false;
        plan.clone()
            .transform_down_up_with_subqueries(
                |plan| {
                    match plan {
                        LogicalPlan::Projection(..) => {
                            return Ok(Transformed::new(
                                plan,
                                false,
                                TreeNodeRecursion::Jump,
                            ))
                        }
                        LogicalPlan::Filter(..) => filter_found = true,
                        _ => {}
                    }
                    Ok(Transformed::no(plan))
                },
                |plan| Ok(Transformed::no(plan)),
            )
            .unwrap();
        assert!(!filter_found);

        // 5) rewrite_with_subqueries with a TreeNodeRewriter impl.
        struct ProjectJumpRewriter {
            // Set if any Filter node is reached (it should not be).
            filter_found: bool,
        }

        impl ProjectJumpRewriter {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl TreeNodeRewriter for ProjectJumpRewriter {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump))
                    }
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(node))
            }
        }

        let mut rewriter = ProjectJumpRewriter::new();
        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
        assert!(!rewriter.filter_found);
    }
4792
4793 #[test]
4794 fn test_with_unresolved_placeholders() {
4795 let field_name = "id";
4796 let placeholder_value = "$1";
4797 let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
4798
4799 let plan = table_scan(TableReference::none(), &schema, None)
4800 .unwrap()
4801 .filter(col(field_name).eq(placeholder(placeholder_value)))
4802 .unwrap()
4803 .build()
4804 .unwrap();
4805
4806 let params = plan.get_parameter_types().unwrap();
4808 assert_eq!(params.len(), 1);
4809
4810 let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
4811 assert_eq!(parameter_type, None);
4812 }
4813
    #[test]
    fn test_join_with_new_exprs() -> Result<()> {
        // Checks that `with_new_exprs` correctly re-assembles a Join's
        // flattened expression list back into `on` pairs plus an optional
        // trailing `filter`.

        // Builds an inner join between t1 and t2 (both with Int32 columns
        // a, b) using the given equi-join `on` pairs and optional `filter`.
        fn create_test_join(
            on: Vec<(Expr, Expr)>,
            filter: Option<Expr>,
        ) -> Result<LogicalPlan> {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Int32, false),
            ]);

            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;

            Ok(LogicalPlan::Join(Join {
                left: Arc::new(
                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
                ),
                right: Arc::new(
                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
                ),
                on,
                filter,
                join_type: JoinType::Inner,
                join_constraint: JoinConstraint::On,
                schema: Arc::new(left_schema.join(&right_schema)?),
                null_equals_null: false,
            }))
        }

        // Round-trip with only `on` pairs: both `on` and the absent filter
        // must be preserved.
        {
            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
            assert_eq!(join.filter, None);
        }

        // Round-trip with only a filter and no `on` pairs.
        {
            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![]);
            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
        }

        // Round-trip with both `on` pairs and a filter.
        {
            let join = create_test_join(
                vec![(col("t1.a"), (col("t2.a")))],
                Some(col("t1.b").gt(col("t2.b"))),
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        }

        // Replacement with NEW expressions: the flat list is interpreted as
        // interleaved (left, right) pairs for each `on` entry, with the final
        // expression becoming the filter.
        {
            let join = create_test_join(
                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
                None,
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                vec![
                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
                    col("t1.b"),
                    col("t2.b"),
                    lit(true),
                ],
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(
                join.on,
                vec![
                    (
                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
                    ),
                    (col("t1.b"), (col("t2.b")))
                ]
            );
            assert_eq!(join.filter, Some(lit(true)));
        }

        Ok(())
    }
4919}