1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29 InvariantLevel, assert_always_invariants_at_current_node,
30 assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34 Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35 intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38 NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48 BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
49 LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50 WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61 Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62 FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63 ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64 assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
/// A `LogicalPlan` is a node in a tree of relational operators (such as
/// `Projection` or `Filter`).
///
/// Each variant wraps a struct carrying that operator's state; any child
/// plans are stored inside those structs and are reachable via
/// [`LogicalPlan::inputs`]. The node's output schema is available via
/// [`LogicalPlan::schema`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions on its input.
    Projection(Projection),
    /// Filters rows from its input using a boolean predicate.
    Filter(Filter),
    /// Evaluates window functions over its input.
    Window(Window),
    /// Aggregates its input based on grouping and aggregate expressions.
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions.
    Sort(Sort),
    /// Joins two inputs on equi-join keys and/or a filter expression.
    Join(Join),
    /// Repartitions its input according to a partitioning scheme.
    Repartition(Repartition),
    /// Union of multiple inputs with compatible schemas.
    Union(Union),
    /// Scans rows from a table source, with an optional projection.
    TableScan(TableScan),
    /// A relation with no columns; produces zero rows, or a single
    /// placeholder row when `produce_one_row` is set.
    EmptyRelation(EmptyRelation),
    /// The plan of a subquery expression.
    Subquery(Subquery),
    /// Aliases its input under a new relation name.
    SubqueryAlias(SubqueryAlias),
    /// Skips and/or limits the number of rows from its input.
    Limit(Limit),
    /// A statement such as `PREPARE` or `EXECUTE`.
    Statement(Statement),
    /// A literal set of rows (`VALUES` lists).
    Values(Values),
    /// Produces a textual representation of a plan (`EXPLAIN`).
    Explain(Explain),
    /// Runs the contained plan and reports execution metrics (`ANALYZE`).
    Analyze(Analyze),
    /// A user-defined (extension) logical node.
    Extension(Extension),
    /// Removes duplicate rows, over all columns or a `DISTINCT ON` list.
    Distinct(Distinct),
    /// A data-modification statement (e.g. `INSERT`).
    Dml(DmlStatement),
    /// A data-definition statement (e.g. `CREATE TABLE`).
    Ddl(DdlStatement),
    /// Copies query results to an output destination (`COPY TO`).
    Copy(CopyTo),
    /// Describes the schema of a table (`DESCRIBE`).
    DescribeTable(DescribeTable),
    /// Unnests nested columns into multiple rows.
    Unnest(Unnest),
    /// A recursive query: a static term plus a recursive term
    /// (`WITH RECURSIVE`).
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296 fn default() -> Self {
297 LogicalPlan::EmptyRelation(EmptyRelation {
298 produce_one_row: false,
299 schema: Arc::new(DFSchema::empty()),
300 })
301 }
302}
303
304impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
305 fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
306 &'a self,
307 mut f: F,
308 ) -> Result<TreeNodeRecursion> {
309 f(self)
310 }
311
312 fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
313 self,
314 mut f: F,
315 ) -> Result<Transformed<Self>> {
316 f(self)
317 }
318}
319
320impl LogicalPlan {
    /// Get a reference to this node's output schema.
    ///
    /// Most nodes store their output schema directly; nodes that do not
    /// change the shape of their input (e.g. `Filter`, `Sort`, `Limit`,
    /// `Repartition`) delegate to their input's schema.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            // A table scan reports its *projected* schema, not the full
            // table schema.
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            // A recursive query takes its schema from the static
            // (non-recursive) term.
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.schema()
            }
        }
    }
359
360 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
363 match self {
364 LogicalPlan::Window(_)
365 | LogicalPlan::Projection(_)
366 | LogicalPlan::Aggregate(_)
367 | LogicalPlan::Unnest(_)
368 | LogicalPlan::Join(_) => self
369 .inputs()
370 .iter()
371 .map(|input| input.schema().as_ref())
372 .collect(),
373 _ => vec![],
374 }
375 }
376
377 pub fn explain_schema() -> SchemaRef {
379 SchemaRef::new(Schema::new(vec![
380 Field::new("plan_type", DataType::Utf8, false),
381 Field::new("plan", DataType::Utf8, false),
382 ]))
383 }
384
385 pub fn describe_schema() -> Schema {
387 Schema::new(vec![
388 Field::new("column_name", DataType::Utf8, false),
389 Field::new("data_type", DataType::Utf8, false),
390 Field::new("is_nullable", DataType::Utf8, false),
391 ])
392 }
393
394 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
411 let mut exprs = vec![];
412 self.apply_expressions(|e| {
413 exprs.push(e.clone());
414 Ok(TreeNodeRecursion::Continue)
415 })
416 .unwrap();
418 exprs
419 }
420
    /// Returns all outer-reference (correlated) expressions found in this
    /// node's expressions and, recursively, in all of its children.
    ///
    /// Duplicates are removed while preserving first-seen order; the
    /// linear `contains` scan is O(n²) but the number of outer references
    /// in a plan is expected to be small.
    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        // Collect outer references from this node's own expressions first.
        self.apply_expressions(|e| {
            find_out_reference_exprs(e).into_iter().for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        // Then append (deduplicated) outer references from all children.
        self.inputs()
            .into_iter()
            .flat_map(|child| child.all_out_ref_exprs())
            .for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
        exprs
    }
445
446 pub fn inputs(&self) -> Vec<&LogicalPlan> {
450 match self {
451 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
452 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
453 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
454 LogicalPlan::Window(Window { input, .. }) => vec![input],
455 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
456 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
457 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
458 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
459 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
460 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
461 LogicalPlan::Extension(extension) => extension.node.inputs(),
462 LogicalPlan::Union(Union { inputs, .. }) => {
463 inputs.iter().map(|arc| arc.as_ref()).collect()
464 }
465 LogicalPlan::Distinct(
466 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
467 ) => vec![input],
468 LogicalPlan::Explain(explain) => vec![&explain.plan],
469 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
470 LogicalPlan::Dml(write) => vec![&write.input],
471 LogicalPlan::Copy(copy) => vec![©.input],
472 LogicalPlan::Ddl(ddl) => ddl.inputs(),
473 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
474 LogicalPlan::RecursiveQuery(RecursiveQuery {
475 static_term,
476 recursive_term,
477 ..
478 }) => vec![static_term, recursive_term],
479 LogicalPlan::Statement(stmt) => stmt.inputs(),
480 LogicalPlan::TableScan { .. }
482 | LogicalPlan::EmptyRelation { .. }
483 | LogicalPlan::Values { .. }
484 | LogicalPlan::DescribeTable(_) => vec![],
485 }
486 }
487
    /// Returns, for every `USING`-constrained join in this plan (including
    /// joins inside subqueries), the set of columns named by that join's
    /// equi-join keys — one `HashSet` per join.
    ///
    /// # Errors
    /// Returns an internal error if a `USING` join key is not a column
    /// expression.
    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
        let mut using_columns: Vec<HashSet<Column>> = vec![];

        self.apply_with_subqueries(|plan| {
            if let LogicalPlan::Join(Join {
                join_constraint: JoinConstraint::Using,
                on,
                ..
            }) = plan
            {
                // Fold both sides of every equi-join pair into one set.
                let columns =
                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
                        let Some(l) = l.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {l:?}"
                            );
                        };
                        let Some(r) = r.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {r:?}"
                            );
                        };
                        accumu.insert(l.to_owned());
                        accumu.insert(r.to_owned());
                        Result::<_, DataFusionError>::Ok(accumu)
                    })?;
                using_columns.push(columns);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;

        Ok(using_columns)
    }
523
    /// Returns the first output expression of this plan node, if it has
    /// one.
    ///
    /// NOTE(review): the `Projection`/`Aggregate`/`DistinctOn` arms index
    /// `[0]` without a bounds check and would panic on an empty expression
    /// list — presumably those lists are guaranteed non-empty by
    /// construction; confirm before relying on this with hand-built plans.
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                // Group-by expressions come first in the output; with no
                // grouping, the aggregates themselves are the output.
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // Pass-through nodes: delegate to the input.
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                // Semi/anti/mark joins output only one side's columns.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Requalify the inner head expression under the alias.
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            // Remaining nodes have no meaningful "head" expression.
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
600
    /// Returns this plan with its output schema recomputed from its
    /// current expressions and inputs.
    ///
    /// Nodes whose schema is derived (`Projection`, `Filter`, `Window`,
    /// `Aggregate`, `Join`, `SubqueryAlias`, `DistinctOn`, `Unnest`) are
    /// rebuilt through their constructors; nodes whose schema is static or
    /// stored elsewhere are returned unchanged.
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            // `Values` keeps its stored schema as-is.
            LogicalPlan::Values(Values { schema, values }) => {
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter { predicate, input }) => {
                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equality,
            }) => {
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                // Equi-join keys stored in `on` must be unaliased.
                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equality,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            LogicalPlan::Extension(Extension { node }) => {
                // Rebuild the user-defined node from its own exprs/inputs
                // so it can recompute any derived state.
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                let first_input_schema = inputs[0].schema();
                // Keep the stored schema while the field count still
                // matches the inputs; otherwise recompute it.
                if schema.fields().len() == first_input_schema.fields().len() {
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }
756
    /// Returns a new `LogicalPlan` based on `self` with its expressions
    /// and inputs replaced.
    ///
    /// `expr` must contain exactly the expressions (in the same order)
    /// that [`Self::expressions`] returns for this node, and `inputs` must
    /// match the arity of [`Self::inputs`]; the per-variant arms below
    /// split `expr` back into its positional components.
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    // Rows were flattened into `expr`; chunk them back into
                    // rows of `fields().len()` values each.
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                // Round-robin carries no expressions; hash/distribute-by
                // take the new expressions as their partition keys.
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                // NOTE(review): internal invariant — panics (not errors) on
                // a mismatched expression count.
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // `expr` layout: group exprs followed by aggregate exprs.
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    // Keep each sort's options; swap in the new child expr.
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                // `expr` layout: flattened (left, right) equi-join pairs,
                // optionally followed by a single filter expression.
                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!(
                            "Expected a pair of expressions to construct the join on expression"
                        )?
                    };

                    // Keys stored in `on` must be unaliased.
                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                assert_eq_or_internal_err!(
                    old_expr_len,
                    expr.len(),
                    "Invalid number of new Limit expressions: expected {}, got {}",
                    old_expr_len,
                    expr.len()
                );
                // `expr` is ordered [skip, fetch], so pop fetch first.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        // NOTE(review): constraints are reset to default
                        // rather than carried over — confirm intentional.
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // Keep the stored schema while the field count still
                // matches the (possibly rewritten) inputs; otherwise adopt
                // the first input's schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        // `expr` layout: on_expr ++ select_expr ++ sort_expr.
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(
                            sort_expr.is_empty(),
                            "with_new_exprs for Distinct does not support sort expressions"
                        );
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            // Sort expressions are not supported here.
                            None, Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name, fields, ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    fields: fields.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            // Remaining leaf-like nodes accept neither exprs nor inputs.
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // Rebuild via the builder so the unnested schema is
                // recomputed from the new input.
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1152
1153 pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1155 match check {
1156 InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1157 InvariantLevel::Executable => assert_executable_invariants(self),
1158 }
1159 }
1160
1161 #[inline]
1163 #[expect(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1165 assert_or_internal_err!(
1166 expr.is_empty(),
1167 "{self:?} should have no exprs, got {:?}",
1168 expr
1169 );
1170 Ok(())
1171 }
1172
1173 #[inline]
1175 #[expect(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1177 assert_or_internal_err!(
1178 inputs.is_empty(),
1179 "{self:?} should have no inputs, got: {:?}",
1180 inputs
1181 );
1182 Ok(())
1183 }
1184
1185 #[inline]
1187 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1188 assert_eq_or_internal_err!(
1189 expr.len(),
1190 1,
1191 "{self:?} should have exactly one expr, got {:?}",
1192 &expr
1193 );
1194 Ok(expr.remove(0))
1195 }
1196
1197 #[inline]
1199 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1200 assert_eq_or_internal_err!(
1201 inputs.len(),
1202 1,
1203 "{self:?} should have exactly one input, got {:?}",
1204 &inputs
1205 );
1206 Ok(inputs.remove(0))
1207 }
1208
1209 #[inline]
1211 fn only_two_inputs(
1212 &self,
1213 mut inputs: Vec<LogicalPlan>,
1214 ) -> Result<(LogicalPlan, LogicalPlan)> {
1215 assert_eq_or_internal_err!(
1216 inputs.len(),
1217 2,
1218 "{self:?} should have exactly two inputs, got {:?}",
1219 &inputs
1220 );
1221 let right = inputs.remove(1);
1222 let left = inputs.remove(0);
1223 Ok((left, right))
1224 }
1225
1226 pub fn with_param_values(
1279 self,
1280 param_values: impl Into<ParamValues>,
1281 ) -> Result<LogicalPlan> {
1282 let param_values = param_values.into();
1283 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1284
1285 Ok(
1287 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1288 plan_with_values
1289 {
1290 param_values.verify_fields(&prepare_lp.fields)?;
1291 Arc::unwrap_or_clone(prepare_lp.input)
1293 } else {
1294 plan_with_values
1295 },
1296 )
1297 }
1298
    /// Returns the maximum number of rows this plan can output, if known.
    ///
    /// `None` means the row count is unbounded or cannot be determined.
    ///
    /// NOTE(review): the join/union arms multiply/sum estimates with
    /// unchecked arithmetic — very large estimates could overflow; confirm
    /// upstream bounds before relying on this for huge plans.
    pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
        match self {
            LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
            LogicalPlan::Filter(filter) => {
                // A provably-scalar filter yields at most one row.
                if filter.is_scalar() {
                    Some(1)
                } else {
                    filter.input.max_rows()
                }
            }
            LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
            LogicalPlan::Aggregate(Aggregate {
                input, group_expr, ..
            }) => {
                // Grouping only on literals produces a single group.
                if group_expr
                    .iter()
                    .all(|expr| matches!(expr, Expr::Literal(_, _)))
                {
                    Some(1)
                } else {
                    input.max_rows()
                }
            }
            LogicalPlan::Sort(Sort { input, fetch, .. }) => {
                // A fetch clause caps the input's row count.
                match (fetch, input.max_rows()) {
                    (Some(fetch_limit), Some(input_max)) => {
                        Some(input_max.min(*fetch_limit))
                    }
                    (Some(fetch_limit), None) => Some(*fetch_limit),
                    (None, Some(input_max)) => Some(input_max),
                    (None, None) => None,
                }
            }
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
                JoinType::Left | JoinType::Right | JoinType::Full => {
                    // Outer joins can emit rows for the preserved side even
                    // when the other side is empty.
                    match (left.max_rows()?, right.max_rows()?, join_type) {
                        (0, 0, _) => Some(0),
                        (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
                        (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
                        (left_max, right_max, _) => Some(left_max * right_max),
                    }
                }
                // Semi/anti/mark joins output at most one side's rows.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.max_rows()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.max_rows()
                }
            },
            LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                // Sum of all inputs; `None` if any input is unbounded.
                inputs.iter().try_fold(0usize, |mut acc, plan| {
                    acc += plan.max_rows()?;
                    Some(acc)
                })
            }
            LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
            LogicalPlan::EmptyRelation(_) => Some(0),
            LogicalPlan::RecursiveQuery(_) => None,
            LogicalPlan::Subquery(_) => None,
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
            LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
                Ok(FetchType::Literal(s)) => s,
                _ => None,
            },
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => input.max_rows(),
            LogicalPlan::Values(v) => Some(v.values.len()),
            LogicalPlan::Unnest(_) => None,
            // Remaining nodes: no meaningful row bound.
            LogicalPlan::Ddl(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Extension(_) => None,
        }
    }
1390
1391 pub fn skip(&self) -> Result<Option<usize>> {
1396 match self {
1397 LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
1398 SkipType::Literal(0) => Ok(None),
1399 SkipType::Literal(n) => Ok(Some(n)),
1400 SkipType::UnsupportedExpr => Ok(None),
1401 },
1402 LogicalPlan::Sort(_) => Ok(None),
1403 LogicalPlan::TableScan(_) => Ok(None),
1404 LogicalPlan::Projection(_) => Ok(None),
1405 LogicalPlan::Filter(_) => Ok(None),
1406 LogicalPlan::Window(_) => Ok(None),
1407 LogicalPlan::Aggregate(_) => Ok(None),
1408 LogicalPlan::Join(_) => Ok(None),
1409 LogicalPlan::Repartition(_) => Ok(None),
1410 LogicalPlan::Union(_) => Ok(None),
1411 LogicalPlan::EmptyRelation(_) => Ok(None),
1412 LogicalPlan::Subquery(_) => Ok(None),
1413 LogicalPlan::SubqueryAlias(_) => Ok(None),
1414 LogicalPlan::Statement(_) => Ok(None),
1415 LogicalPlan::Values(_) => Ok(None),
1416 LogicalPlan::Explain(_) => Ok(None),
1417 LogicalPlan::Analyze(_) => Ok(None),
1418 LogicalPlan::Extension(_) => Ok(None),
1419 LogicalPlan::Distinct(_) => Ok(None),
1420 LogicalPlan::Dml(_) => Ok(None),
1421 LogicalPlan::Ddl(_) => Ok(None),
1422 LogicalPlan::Copy(_) => Ok(None),
1423 LogicalPlan::DescribeTable(_) => Ok(None),
1424 LogicalPlan::Unnest(_) => Ok(None),
1425 LogicalPlan::RecursiveQuery(_) => Ok(None),
1426 }
1427 }
1428
1429 pub fn fetch(&self) -> Result<Option<usize>> {
1435 match self {
1436 LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
1437 LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
1438 LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
1439 FetchType::Literal(s) => Ok(s),
1440 FetchType::UnsupportedExpr => Ok(None),
1441 },
1442 LogicalPlan::Projection(_) => Ok(None),
1443 LogicalPlan::Filter(_) => Ok(None),
1444 LogicalPlan::Window(_) => Ok(None),
1445 LogicalPlan::Aggregate(_) => Ok(None),
1446 LogicalPlan::Join(_) => Ok(None),
1447 LogicalPlan::Repartition(_) => Ok(None),
1448 LogicalPlan::Union(_) => Ok(None),
1449 LogicalPlan::EmptyRelation(_) => Ok(None),
1450 LogicalPlan::Subquery(_) => Ok(None),
1451 LogicalPlan::SubqueryAlias(_) => Ok(None),
1452 LogicalPlan::Statement(_) => Ok(None),
1453 LogicalPlan::Values(_) => Ok(None),
1454 LogicalPlan::Explain(_) => Ok(None),
1455 LogicalPlan::Analyze(_) => Ok(None),
1456 LogicalPlan::Extension(_) => Ok(None),
1457 LogicalPlan::Distinct(_) => Ok(None),
1458 LogicalPlan::Dml(_) => Ok(None),
1459 LogicalPlan::Ddl(_) => Ok(None),
1460 LogicalPlan::Copy(_) => Ok(None),
1461 LogicalPlan::DescribeTable(_) => Ok(None),
1462 LogicalPlan::Unnest(_) => Ok(None),
1463 LogicalPlan::RecursiveQuery(_) => Ok(None),
1464 }
1465 }
1466
1467 pub fn contains_outer_reference(&self) -> bool {
1469 let mut contains = false;
1470 self.apply_expressions(|expr| {
1471 Ok(if expr.contains_outer() {
1472 contains = true;
1473 TreeNodeRecursion::Stop
1474 } else {
1475 TreeNodeRecursion::Continue
1476 })
1477 })
1478 .unwrap();
1479 contains
1480 }
1481
1482 pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1490 match self {
1491 LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1492 .output_expressions()?
1493 .into_iter()
1494 .zip(self.schema().columns())
1495 .collect()),
1496 LogicalPlan::Window(Window {
1497 window_expr,
1498 input,
1499 schema,
1500 }) => {
1501 let mut output_exprs = input.columnized_output_exprs()?;
1509 let input_len = input.schema().fields().len();
1510 output_exprs.extend(
1511 window_expr
1512 .iter()
1513 .zip(schema.columns().into_iter().skip(input_len)),
1514 );
1515 Ok(output_exprs)
1516 }
1517 _ => Ok(vec![]),
1518 }
1519 }
1520}
1521
1522impl LogicalPlan {
    /// Replaces each [`Expr::Placeholder`] in the plan (including subqueries)
    /// with the matching value from `param_values`, then recomputes schemas.
    ///
    /// Placeholder types are inferred from the node's schema before the
    /// substitution; errors if a placeholder has no value in `param_values`.
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    // No placeholders: leave the expression untouched so its
                    // display name is preserved.
                    Ok(Transformed::no(e))
                } else {
                    let original_name = name_preserver.save(&e);
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let (value, metadata) = param_values
                                .get_placeholders_with_values(&id)?
                                .into_inner();
                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    // Substitution changes the expression's generated name;
                    // restore the original so output column names are stable.
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })?
            .map_data(|plan| plan.update_schema_data_type())
        })
        .map(|res| res.data)
    }
1563
1564 fn update_schema_data_type(self) -> Result<LogicalPlan> {
1570 match self {
1571 LogicalPlan::Values(Values { values, schema: _ }) => {
1575 LogicalPlanBuilder::values(values)?.build()
1576 }
1577 plan => plan.recompute_schema(),
1579 }
1580 }
1581
1582 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1584 let mut param_names = HashSet::new();
1585 self.apply_with_subqueries(|plan| {
1586 plan.apply_expressions(|expr| {
1587 expr.apply(|expr| {
1588 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1589 param_names.insert(id.clone());
1590 }
1591 Ok(TreeNodeRecursion::Continue)
1592 })
1593 })
1594 })
1595 .map(|_| param_names)
1596 }
1597
1598 pub fn get_parameter_types(
1603 &self,
1604 ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1605 let mut parameter_fields = self.get_parameter_fields()?;
1606 Ok(parameter_fields
1607 .drain()
1608 .map(|(name, maybe_field)| {
1609 (name, maybe_field.map(|field| field.data_type().clone()))
1610 })
1611 .collect())
1612 }
1613
    /// Walks the plan (including subqueries) and returns a map from
    /// placeholder id to its inferred field (`None` while the type is still
    /// unknown).
    ///
    /// Errors if the same placeholder id appears with conflicting types.
    pub fn get_parameter_fields(
        &self,
    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();

        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
                        let prev = param_types.get(id);
                        match (prev, field) {
                            // Already recorded with a concrete field: the new
                            // occurrence must agree (type and metadata).
                            (Some(Some(prev)), Some(field)) => {
                                check_metadata_with_storage_equal(
                                    (field.data_type(), Some(field.metadata())),
                                    (prev.data_type(), Some(prev.metadata())),
                                    "parameter",
                                    &format!(": Conflicting types for id {id}"),
                                )?;
                            }
                            // Previously unknown (or unseen): record the
                            // concrete field.
                            (_, Some(field)) => {
                                param_types.insert(id.clone(), Some(Arc::clone(field)));
                            }
                            // No type information available for this id yet.
                            _ => {
                                param_types.insert(id.clone(), None);
                            }
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_types)
    }
1648
1649 pub fn display_indent(&self) -> impl Display + '_ {
1681 struct Wrapper<'a>(&'a LogicalPlan);
1684 impl Display for Wrapper<'_> {
1685 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1686 let with_schema = false;
1687 let mut visitor = IndentVisitor::new(f, with_schema);
1688 match self.0.visit_with_subqueries(&mut visitor) {
1689 Ok(_) => Ok(()),
1690 Err(_) => Err(fmt::Error),
1691 }
1692 }
1693 }
1694 Wrapper(self)
1695 }
1696
1697 pub fn display_indent_schema(&self) -> impl Display + '_ {
1727 struct Wrapper<'a>(&'a LogicalPlan);
1730 impl Display for Wrapper<'_> {
1731 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1732 let with_schema = true;
1733 let mut visitor = IndentVisitor::new(f, with_schema);
1734 match self.0.visit_with_subqueries(&mut visitor) {
1735 Ok(_) => Ok(()),
1736 Err(_) => Err(fmt::Error),
1737 }
1738 }
1739 }
1740 Wrapper(self)
1741 }
1742
1743 pub fn display_pg_json(&self) -> impl Display + '_ {
1747 struct Wrapper<'a>(&'a LogicalPlan);
1750 impl Display for Wrapper<'_> {
1751 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1752 let mut visitor = PgJsonVisitor::new(f);
1753 visitor.with_schema(true);
1754 match self.0.visit_with_subqueries(&mut visitor) {
1755 Ok(_) => Ok(()),
1756 Err(_) => Err(fmt::Error),
1757 }
1758 }
1759 }
1760 Wrapper(self)
1761 }
1762
1763 pub fn display_graphviz(&self) -> impl Display + '_ {
1793 struct Wrapper<'a>(&'a LogicalPlan);
1796 impl Display for Wrapper<'_> {
1797 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1798 let mut visitor = GraphvizVisitor::new(f);
1799
1800 visitor.start_graph()?;
1801
1802 visitor.pre_visit_plan("LogicalPlan")?;
1803 self.0
1804 .visit_with_subqueries(&mut visitor)
1805 .map_err(|_| fmt::Error)?;
1806 visitor.post_visit_plan()?;
1807
1808 visitor.set_with_schema(true);
1809 visitor.pre_visit_plan("Detailed LogicalPlan")?;
1810 self.0
1811 .visit_with_subqueries(&mut visitor)
1812 .map_err(|_| fmt::Error)?;
1813 visitor.post_visit_plan()?;
1814
1815 visitor.end_graph()?;
1816 Ok(())
1817 }
1818 }
1819 Wrapper(self)
1820 }
1821
    /// Returns a value that formats ONLY this node (no children), e.g.
    /// `Projection: id`. Used by the indent/graphviz visitors.
    pub fn display(&self) -> impl Display + '_ {
        // Adapter so the node can be handed to `format!` etc.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(EmptyRelation {
                        produce_one_row,
                        schema: _,
                    }) => {
                        let rows = if *produce_one_row { 1 } else { 0 };
                        write!(f, "EmptyRelation: rows={rows}")
                    }
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
                    }
                    LogicalPlan::Values(Values { values, .. }) => {
                        // Show at most the first 5 rows, with "..." when
                        // truncated.
                        let str_values: Vec<_> = values
                            .iter()
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        let eclipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
                    }

                    LogicalPlan::TableScan(TableScan {
                        source,
                        table_name,
                        projection,
                        filters,
                        fetch,
                        ..
                    }) => {
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            // Classify each filter by how the source can
                            // apply it (exact / inexact / not at all) and
                            // print each group separately. Classification
                            // errors are ignored: the groups simply stay
                            // empty.
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { expr, .. }) => {
                        write!(f, "Projection:")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ",")?;
                            }
                            write!(f, " {expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(
                            f,
                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
                            file_type.get_ext()
                        )
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: expr, ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window { window_expr, .. }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        group_expr,
                        aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        // An inner join with no keys and no filter is
                        // displayed as a cross join.
                        let join_type = if filter.is_none()
                            && keys.is_empty()
                            && matches!(join_type, JoinType::Inner)
                        {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(
                                    f,
                                    "{} Join: {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr
                                )
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        // Prefer literal skip/fetch values; fall back to the
                        // raw expression text (or "None") when not literal.
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit
                                .skip
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit
                                .fetch
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            if let Some(sort_expr) = sort_expr {
                                expr_vec_fmt!(sort_expr)
                            } else {
                                "".to_string()
                            },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices,
                        ..
                    }) => {
                        // Render unnested columns by name: list columns also
                        // show their unnest depth.
                        let input_columns = plan.schema().columns();
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i, unnest_info)| {
                                format!(
                                    "{}|depth={}",
                                    &input_columns[*i].to_string(),
                                    unnest_info.depth
                                )
                            })
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        write!(
                            f,
                            "Unnest: lists[{}] structs[{}]",
                            expr_vec_fmt!(list_type_columns),
                            expr_vec_fmt!(struct_type_columns)
                        )
                    }
                }
            }
        }
        Wrapper(self)
    }
2182}
2183
2184impl Display for LogicalPlan {
2185 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2186 self.display_indent().fmt(f)
2187 }
2188}
2189
2190impl ToStringifiedPlan for LogicalPlan {
2191 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2192 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2193 }
2194}
2195
/// A relation with no data and a known schema; may optionally produce a
/// single placeholder row (e.g. for `SELECT 1`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to produce one placeholder row (true) or no rows (false).
    pub produce_one_row: bool,
    /// The output schema of this relation.
    pub schema: DFSchemaRef,
}
2206
2207impl PartialOrd for EmptyRelation {
2209 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2210 self.produce_one_row
2211 .partial_cmp(&other.produce_one_row)
2212 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2214 }
2215}
2216
/// A recursive query (`WITH RECURSIVE`): evaluates the static term once, then
/// repeatedly evaluates the recursive term until a fixed point.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the recursive CTE being defined.
    pub name: String,
    /// The non-recursive (base case) part of the query.
    pub static_term: Arc<LogicalPlan>,
    /// The recursive part, which may reference `name`.
    pub recursive_term: Arc<LogicalPlan>,
    /// Whether duplicate rows are eliminated between iterations
    /// (UNION vs UNION ALL semantics).
    pub is_distinct: bool,
}
2252
/// An inline table of constant rows, e.g. `VALUES (1, 'a'), (2, 'b')`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The output schema describing the value columns.
    pub schema: DFSchemaRef,
    /// The rows; each inner `Vec` is one row of expressions.
    pub values: Vec<Vec<Expr>>,
}
2263
2264impl PartialOrd for Values {
2266 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2267 self.values
2268 .partial_cmp(&other.values)
2269 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2271 }
2272}
2273
/// Evaluates an arbitrary list of expressions over its input
/// (the `SELECT` list).
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
#[non_exhaustive]
pub struct Projection {
    /// The expressions to evaluate, one per output field.
    pub expr: Vec<Expr>,
    /// The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    /// The output schema; must have one field per expression.
    pub schema: DFSchemaRef,
}
2287
2288impl PartialOrd for Projection {
2290 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2291 match self.expr.partial_cmp(&other.expr) {
2292 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2293 cmp => cmp,
2294 }
2295 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2297 }
2298}
2299
2300impl Projection {
2301 pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2303 let projection_schema = projection_schema(&input, &expr)?;
2304 Self::try_new_with_schema(expr, input, projection_schema)
2305 }
2306
2307 pub fn try_new_with_schema(
2309 expr: Vec<Expr>,
2310 input: Arc<LogicalPlan>,
2311 schema: DFSchemaRef,
2312 ) -> Result<Self> {
2313 #[expect(deprecated)]
2314 if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2315 && expr.len() != schema.fields().len()
2316 {
2317 return plan_err!(
2318 "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
2319 expr.len(),
2320 schema.fields().len()
2321 );
2322 }
2323 Ok(Self {
2324 expr,
2325 input,
2326 schema,
2327 })
2328 }
2329
2330 pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2332 let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2333 Self {
2334 expr,
2335 input,
2336 schema,
2337 }
2338 }
2339}
2340
2341pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2361 let metadata = input.schema().metadata().clone();
2363
2364 let schema =
2366 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2367 .with_functional_dependencies(calc_func_dependencies_for_project(
2368 exprs, input,
2369 )?)?;
2370
2371 Ok(Arc::new(schema))
2372}
2373
/// Assigns an alias (qualifier) to the output of an input plan,
/// e.g. `SELECT ... FROM (subquery) AS t`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The plan being aliased.
    pub input: Arc<LogicalPlan>,
    /// The alias applied as qualifier to every output field.
    pub alias: TableReference,
    /// The input schema requalified with `alias`.
    pub schema: DFSchemaRef,
}
2386
impl SubqueryAlias {
    /// Create a new `SubqueryAlias`, requalifying every field of the input's
    /// schema with `alias`.
    ///
    /// If the input exposes duplicate field names, a projection is inserted
    /// first that renames the duplicates to unique names, since a qualified
    /// schema cannot hold two fields with the same name.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();

        // `aliases[i]` is `Some(new_name)` only for fields that need renaming
        // to make all names unique.
        let aliases = unique_field_aliases(plan.schema().fields());
        let is_projection_needed = aliases.iter().any(Option::is_some);

        let plan = if is_projection_needed {
            // Build a pass-through projection that aliases the duplicated
            // columns to their unique names.
            let projection_expressions = aliases
                .iter()
                .zip(plan.schema().iter())
                .map(|(alias, (qualifier, field))| {
                    let column =
                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
                    match alias {
                        None => column,
                        Some(alias) => {
                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
                        }
                    }
                })
                .collect();
            let projection = Projection::try_new(projection_expressions, plan)?;
            Arc::new(LogicalPlan::Projection(projection))
        } else {
            plan
        };

        let fields = plan.schema().fields().clone();
        let meta_data = plan.schema().metadata().clone();
        let func_dependencies = plan.schema().functional_dependencies().clone();

        // Strip existing qualifiers, then requalify everything with `alias`,
        // preserving metadata and functional dependencies.
        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
        let schema = schema.as_arrow();

        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2443
2444impl PartialOrd for SubqueryAlias {
2446 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2447 match self.input.partial_cmp(&other.input) {
2448 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2449 cmp => cmp,
2450 }
2451 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2453 }
2454}
2455
/// Filters rows from its input that do not satisfy `predicate`
/// (the `WHERE` clause).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The boolean predicate applied to each row.
    pub predicate: Expr,
    /// The incoming logical plan.
    pub input: Arc<LogicalPlan>,
}
2475
impl Filter {
    /// Create a new filter operator.
    ///
    /// Errors if the predicate's type is known and is not boolean-compatible.
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Returns true if `data_type` is usable as a filter predicate type:
    /// boolean, null, or a dictionary whose value type is itself usable.
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::Boolean | DataType::Null => true,
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        // Only reject when the type can actually be resolved; if `get_type`
        // errors (e.g. an unresolved column) the predicate is accepted here
        // and validated later.
        if let Ok(predicate_type) = predicate.get_type(input.schema())
            && !Filter::is_allowed_filter_type(&predicate_type)
        {
            return plan_err!(
                "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
            );
        }

        Ok(Self {
            // Aliases inside a predicate carry no meaning; strip them.
            predicate: predicate.unalias_nested().data,
            input,
        })
    }

    /// Returns true if this filter is guaranteed to produce at most one row:
    /// i.e. some unique key of the input has every column bound by an
    /// equality predicate.
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        let functional_dependencies = self.input.schema().functional_dependencies();
        // Candidate keys: single-mode dependencies that determine every
        // output field and whose source columns are all non-nullable.
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect the schema indices of columns bound by an equality against
        // a non-column expression in the (AND-split) predicate.
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // Tautologies like `a = a` do not pin a value.
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    // `col = col` does not bind either side to a constant.
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // If every column of some unique key is bound, at most one row can
        // pass the filter.
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2589
/// Evaluates window functions over its input, appending one output column per
/// window expression.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    /// The window function expressions to evaluate.
    pub window_expr: Vec<Expr>,
    /// Output schema: all input fields followed by one field per window
    /// expression.
    pub schema: DFSchemaRef,
}
2613
impl Window {
    /// Create a new window operator, computing the output schema as the input
    /// fields followed by one field per window expression.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Window functions do not change the row count, so the input's
        // functional dependencies carry over; widen their target range to
        // cover the appended columns.
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // `row_number` without PARTITION BY yields a distinct value per row,
        // so its output column forms a new unique key.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                let Expr::WindowFunction(window_fun) = expr else {
                    return None;
                };
                let WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                } = window_fun.as_ref()
                else {
                    return None;
                };
                if udwf.name() == "row_number" && partition_by.is_empty() {
                    // The new column lives after all input fields.
                    Some(idx + input_len)
                } else {
                    None
                }
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            // Each new key determines every column of the output schema.
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        // FILTER (WHERE ...) is only valid on aggregate functions used as
        // window functions; reject it on pure window functions.
        if let Some(e) = window_expr.iter().find(|e| {
            matches!(
                e,
                Expr::WindowFunction(wf)
                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
                        && wf.params.filter.is_some()
            )
        }) {
            return plan_err!(
                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
            );
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Create a new window operator with a caller-supplied schema, validating
    /// that it has one field per input field plus one per window expression.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        let input_fields_count = input.schema().fields().len();
        if schema.fields().len() != input_fields_count + window_expr.len() {
            return plan_err!(
                "Window schema has wrong number of fields. Expected {} got {}",
                input_fields_count + window_expr.len(),
                schema.fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2723
2724impl PartialOrd for Window {
2726 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2727 match self.input.partial_cmp(&other.input)? {
2728 Ordering::Equal => {} not_equal => return Some(not_equal),
2730 }
2731
2732 match self.window_expr.partial_cmp(&other.window_expr)? {
2733 Ordering::Equal => {} not_equal => return Some(not_equal),
2735 }
2736
2737 if self == other {
2740 Some(Ordering::Equal)
2741 } else {
2742 None
2743 }
2744 }
2745}
2746
/// Produces rows from a table provider, optionally with a column projection,
/// pushed-down filters, and a row limit.
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table as referenced in the plan.
    pub table_name: TableReference,
    /// The source of the table's data and schema.
    pub source: Arc<dyn TableSource>,
    /// Optional indices of the source columns to read.
    pub projection: Option<Vec<usize>>,
    /// The schema after applying `projection`, qualified by `table_name`.
    pub projected_schema: DFSchemaRef,
    /// Filters offered to the source for push-down.
    pub filters: Vec<Expr>,
    /// Optional cap on the number of rows to produce.
    pub fetch: Option<usize>,
}
2763
2764impl Debug for TableScan {
2765 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2766 f.debug_struct("TableScan")
2767 .field("table_name", &self.table_name)
2768 .field("source", &"...")
2769 .field("projection", &self.projection)
2770 .field("projected_schema", &self.projected_schema)
2771 .field("filters", &self.filters)
2772 .field("fetch", &self.fetch)
2773 .finish_non_exhaustive()
2774 }
2775}
2776
2777impl PartialEq for TableScan {
2778 fn eq(&self, other: &Self) -> bool {
2779 self.table_name == other.table_name
2780 && self.projection == other.projection
2781 && self.projected_schema == other.projected_schema
2782 && self.filters == other.filters
2783 && self.fetch == other.fetch
2784 }
2785}
2786
// Equality ignores `source` (see `PartialEq` above); the remaining compared
// fields are all `Eq`, so the reflexivity/transitivity requirements hold.
impl Eq for TableScan {}
2788
2789impl PartialOrd for TableScan {
2792 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2793 #[derive(PartialEq, PartialOrd)]
2794 struct ComparableTableScan<'a> {
2795 pub table_name: &'a TableReference,
2797 pub projection: &'a Option<Vec<usize>>,
2799 pub filters: &'a Vec<Expr>,
2801 pub fetch: &'a Option<usize>,
2803 }
2804 let comparable_self = ComparableTableScan {
2805 table_name: &self.table_name,
2806 projection: &self.projection,
2807 filters: &self.filters,
2808 fetch: &self.fetch,
2809 };
2810 let comparable_other = ComparableTableScan {
2811 table_name: &other.table_name,
2812 projection: &other.projection,
2813 filters: &other.filters,
2814 fetch: &other.fetch,
2815 };
2816 comparable_self
2817 .partial_cmp(&comparable_other)
2818 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2820 }
2821}
2822
2823impl Hash for TableScan {
2824 fn hash<H: Hasher>(&self, state: &mut H) {
2825 self.table_name.hash(state);
2826 self.projection.hash(state);
2827 self.projected_schema.hash(state);
2828 self.filters.hash(state);
2829 self.fetch.hash(state);
2830 }
2831}
2832
impl TableScan {
    /// Create a new `TableScan`, computing the projected, qualified schema
    /// (with functional dependencies) from the source and the optional
    /// projection.
    ///
    /// Errors if `table_name` is empty or the schema cannot be built.
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        // Derive uniqueness/key information from the source's constraints.
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                // Narrow the dependencies to the projected column indices.
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                // Build the projected schema, qualifying every field with the
                // table name.
                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                // No projection: qualify the full source schema.
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2886
/// Repartitions the input according to a partitioning scheme
/// (round-robin, hash, or distribute-by).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    /// How the rows are redistributed across partitions.
    pub partitioning_scheme: Partitioning,
}
2895
/// Concatenates the rows of two or more inputs (`UNION ALL` semantics;
/// deduplication is expressed by wrapping in `Distinct`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// The plans whose rows are combined; at least two.
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// The common output schema of all inputs.
    pub schema: DFSchemaRef,
}
2904
impl Union {
    /// Creates a `Union` whose schema is derived positionally from the
    /// inputs, requiring each column's type to match exactly across inputs.
    pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
        Ok(Union { inputs, schema })
    }

    /// Like `try_new`, but skips the exact-type check: the first input's
    /// type is used for each output column even if later inputs differ.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
        Ok(Union { inputs, schema })
    }

    /// Creates a union that matches columns by *name* rather than position
    /// (`UNION BY NAME`); each input is wrapped in a projection that supplies
    /// NULL for columns it does not provide.
    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

        Ok(Union { inputs, schema })
    }

    /// Wraps each input in a projection that emits the union's columns in
    /// `schema` order, filling columns missing from that input with NULL
    /// literals aliased to the output name.
    fn rewrite_inputs_from_schema(
        schema: &Arc<DFSchema>,
        inputs: Vec<Arc<LogicalPlan>>,
    ) -> Result<Vec<Arc<LogicalPlan>>> {
        let schema_width = schema.iter().count();
        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            let mut expr = Vec::with_capacity(schema_width);
            // Column matching is by unqualified name only.
            for column in schema.columns() {
                if input
                    .schema()
                    .has_column_with_unqualified_name(column.name())
                {
                    expr.push(Expr::Column(column));
                } else {
                    expr.push(
                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
                    );
                }
            }
            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
            )));
        }

        Ok(wrapped_inputs)
    }

    /// Derives the union's output schema from its inputs.
    ///
    /// `loose_types` disables the exact-type check; `by_name` selects
    /// name-based (instead of positional) column matching.
    ///
    /// # Errors
    /// Returns a plan error when fewer than two inputs are given, or when
    /// the inputs' column counts/types are incompatible.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
        by_name: bool,
    ) -> Result<DFSchemaRef> {
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }

        if by_name {
            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
        } else {
            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
        }
    }

    /// Name-based schema derivation: the output contains every distinct
    /// column name, in first-seen order across all inputs.
    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // (data type, nullability, per-input metadata, number of inputs
        // that contain this column name)
        type FieldData<'a> =
            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
        // A Vec (not a map) keeps the first-seen column order.
        let mut cols: Vec<(&str, FieldData)> = Vec::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
                    cols.iter_mut().find(|(name, _)| name == field.name())
                {
                    if !loose_types && *data_type != field.data_type() {
                        return plan_err!(
                            "Found different types for field {}",
                            field.name()
                        );
                    }

                    metadata.push(field.metadata());
                    // Nullable if the column is nullable in any input.
                    *is_nullable |= field.is_nullable();
                    *occurrences += 1;
                } else {
                    cols.push((
                        field.name(),
                        (
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                            1,
                        ),
                    ));
                }
            }
        }

        let union_fields = cols
            .into_iter()
            .map(
                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
                    // A column absent from some input is NULL-filled there,
                    // so it must be nullable in the union schema.
                    let final_is_nullable = if occurrences == inputs.len() {
                        is_nullable
                    } else {
                        true
                    };

                    let mut field =
                        Field::new(name, data_type.clone(), final_is_nullable);
                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));

                    (None, Arc::new(field))
                },
            )
            .collect::<Vec<(Option<TableReference>, _)>>();

        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }

    /// Positional schema derivation: all inputs must have the same column
    /// count; output names come from the first input (deduplicated).
    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        let mut name_counts: HashMap<String, usize> = HashMap::new();
        let union_fields = (0..fields_count)
            .map(|i| {
                // The i-th field of every input.
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let base_name = first_field.name().to_string();

                let data_type = if loose_types {
                    first_field.data_type()
                } else {
                    // Strict mode: every input must agree on the type.
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                let nullable = fields.iter().any(|field| field.is_nullable());

                // Deduplicate repeated output names: the first occurrence
                // keeps the base name, later ones become `name_1`, `name_2`, ...
                let name = if let Some(count) = name_counts.get_mut(&base_name) {
                    *count += 1;
                    format!("{base_name}_{count}")
                } else {
                    name_counts.insert(base_name.clone(), 0);
                    base_name
                };

                let mut field = Field::new(&name, data_type.clone(), nullable);
                let field_metadata = intersect_metadata_for_union(
                    fields.iter().map(|field| field.metadata()),
                );
                field.set_metadata(field_metadata);
                Ok((None, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}
3139
3140impl PartialOrd for Union {
3142 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3143 self.inputs
3144 .partial_cmp(&other.inputs)
3145 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3147 }
3148}
3149
/// Produces a description of a table's schema (`DESCRIBE <table>`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// The Arrow schema of the table being described.
    pub schema: Arc<Schema>,
    /// The schema of the rows this node itself produces.
    pub output_schema: DFSchemaRef,
}
3179
impl PartialOrd for DescribeTable {
    // `DescribeTable` nodes are deliberately treated as incomparable: its
    // fields (Arrow schema, output schema) carry no ordering here.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
3188
/// Options controlling how an `EXPLAIN` statement behaves and is rendered.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Include additional detail in the output.
    pub verbose: bool,
    /// `EXPLAIN ANALYZE`: execute the plan and report runtime statistics.
    pub analyze: bool,
    /// Output format for the rendered plan.
    pub format: ExplainFormat,
}
3199
impl Default for ExplainOption {
    /// Non-verbose, non-analyze, rendered in the `Indent` format.
    fn default() -> Self {
        ExplainOption {
            verbose: false,
            analyze: false,
            format: ExplainFormat::Indent,
        }
    }
}
3209
3210impl ExplainOption {
3211 pub fn with_verbose(mut self, verbose: bool) -> Self {
3213 self.verbose = verbose;
3214 self
3215 }
3216
3217 pub fn with_analyze(mut self, analyze: bool) -> Self {
3219 self.analyze = analyze;
3220 self
3221 }
3222
3223 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3225 self.format = format;
3226 self
3227 }
3228}
3229
/// Produces a textual representation of its input plan
/// (the result of an `EXPLAIN` statement).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Include additional detail in the output.
    pub verbose: bool,
    /// Output format for the rendered plans.
    pub explain_format: ExplainFormat,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// Rendered representations collected for the different plan stages.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// The output schema of the explain rows.
    pub schema: DFSchemaRef,
    /// Whether logical optimization completed successfully.
    pub logical_optimization_succeeded: bool,
}
3252
3253impl PartialOrd for Explain {
3255 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3256 #[derive(PartialEq, PartialOrd)]
3257 struct ComparableExplain<'a> {
3258 pub verbose: &'a bool,
3260 pub plan: &'a Arc<LogicalPlan>,
3262 pub stringified_plans: &'a Vec<StringifiedPlan>,
3264 pub logical_optimization_succeeded: &'a bool,
3266 }
3267 let comparable_self = ComparableExplain {
3268 verbose: &self.verbose,
3269 plan: &self.plan,
3270 stringified_plans: &self.stringified_plans,
3271 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3272 };
3273 let comparable_other = ComparableExplain {
3274 verbose: &other.verbose,
3275 plan: &other.plan,
3276 stringified_plans: &other.stringified_plans,
3277 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3278 };
3279 comparable_self
3280 .partial_cmp(&comparable_other)
3281 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3283 }
3284}
3285
/// Executes its input plan and reports metrics
/// (the result of `EXPLAIN ANALYZE`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Include additional detail in the output.
    pub verbose: bool,
    /// The plan to execute and measure.
    pub input: Arc<LogicalPlan>,
    /// The output schema of the report rows.
    pub schema: DFSchemaRef,
}
3297
3298impl PartialOrd for Analyze {
3300 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3301 match self.verbose.partial_cmp(&other.verbose) {
3302 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3303 cmp => cmp,
3304 }
3305 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3307 }
3308}
3309
/// A user-defined logical plan node — the extension point for operators
/// that are not built into the core plan enum.
#[allow(clippy::allow_attributes)]
// `Hash` is derived while `PartialEq` is implemented manually (it delegates
// to the node's own equality), so this lint is suppressed deliberately.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-provided node implementation.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3321
impl PartialEq for Extension {
    /// Delegates to the dynamic node's own equality.
    fn eq(&self, other: &Self) -> bool {
        self.node.eq(&other.node)
    }
}
3330
impl PartialOrd for Extension {
    /// Delegates to the dynamic node's own partial ordering.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3336
/// Skips then limits rows: `LIMIT <fetch> OFFSET <skip>`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip (OFFSET) as an expression; `None` means 0.
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to return (LIMIT); `None` means no limit.
    pub fetch: Option<Box<Expr>>,
    /// The plan being limited.
    pub input: Arc<LogicalPlan>,
}
3348
/// Classification of a `Limit`'s `skip` (OFFSET) expression;
/// see `Limit::get_skip_type`.
pub enum SkipType {
    /// A concrete, non-negative row count.
    Literal(usize),
    /// The skip expression is not an `Int64` literal.
    UnsupportedExpr,
}
3356
/// Classification of a `Limit`'s `fetch` (LIMIT) expression;
/// see `Limit::get_fetch_type`.
pub enum FetchType {
    /// A concrete limit, or `None` when no limit applies.
    Literal(Option<usize>),
    /// The fetch expression is not an `Int64` literal.
    UnsupportedExpr,
}
3365
3366impl Limit {
3367 pub fn get_skip_type(&self) -> Result<SkipType> {
3369 match self.skip.as_deref() {
3370 Some(expr) => match *expr {
3371 Expr::Literal(ScalarValue::Int64(s), _) => {
3372 let s = s.unwrap_or(0);
3374 if s >= 0 {
3375 Ok(SkipType::Literal(s as usize))
3376 } else {
3377 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3378 }
3379 }
3380 _ => Ok(SkipType::UnsupportedExpr),
3381 },
3382 None => Ok(SkipType::Literal(0)),
3384 }
3385 }
3386
3387 pub fn get_fetch_type(&self) -> Result<FetchType> {
3389 match self.fetch.as_deref() {
3390 Some(expr) => match *expr {
3391 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3392 if s >= 0 {
3393 Ok(FetchType::Literal(Some(s as usize)))
3394 } else {
3395 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3396 }
3397 }
3398 Expr::Literal(ScalarValue::Int64(None), _) => {
3399 Ok(FetchType::Literal(None))
3400 }
3401 _ => Ok(FetchType::UnsupportedExpr),
3402 },
3403 None => Ok(FetchType::Literal(None)),
3404 }
3405 }
3406}
3407
/// Removes duplicate rows from the input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `SELECT DISTINCT`: deduplicate across all columns.
    All(Arc<LogicalPlan>),
    /// `SELECT DISTINCT ON (..)`: deduplicate across chosen expressions.
    On(DistinctOn),
}
3416
3417impl Distinct {
3418 pub fn input(&self) -> &Arc<LogicalPlan> {
3420 match self {
3421 Distinct::All(input) => input,
3422 Distinct::On(DistinctOn { input, .. }) => input,
3423 }
3424 }
3425}
3426
/// The `SELECT DISTINCT ON (..)` variant of `Distinct`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// Expressions whose distinct values select which rows are kept.
    pub on_expr: Vec<Expr>,
    /// Expressions produced for each kept row.
    pub select_expr: Vec<Expr>,
    /// Optional ordering deciding which row survives per distinct group;
    /// must begin with the `on_expr` expressions.
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The plan being deduplicated.
    pub input: Arc<LogicalPlan>,
    /// Output schema, derived from `select_expr`.
    pub schema: DFSchemaRef,
}
3443
3444impl DistinctOn {
3445 pub fn try_new(
3447 on_expr: Vec<Expr>,
3448 select_expr: Vec<Expr>,
3449 sort_expr: Option<Vec<SortExpr>>,
3450 input: Arc<LogicalPlan>,
3451 ) -> Result<Self> {
3452 if on_expr.is_empty() {
3453 return plan_err!("No `ON` expressions provided");
3454 }
3455
3456 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3457 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3458 .into_iter()
3459 .collect();
3460
3461 let dfschema = DFSchema::new_with_metadata(
3462 qualified_fields,
3463 input.schema().metadata().clone(),
3464 )?;
3465
3466 let mut distinct_on = DistinctOn {
3467 on_expr,
3468 select_expr,
3469 sort_expr: None,
3470 input,
3471 schema: Arc::new(dfschema),
3472 };
3473
3474 if let Some(sort_expr) = sort_expr {
3475 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3476 }
3477
3478 Ok(distinct_on)
3479 }
3480
3481 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3485 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3486
3487 let mut matched = true;
3489 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3490 if on != &sort.expr {
3491 matched = false;
3492 break;
3493 }
3494 }
3495
3496 if self.on_expr.len() > sort_expr.len() || !matched {
3497 return plan_err!(
3498 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3499 );
3500 }
3501
3502 self.sort_expr = Some(sort_expr);
3503 Ok(self)
3504 }
3505}
3506
impl PartialOrd for DistinctOn {
    /// Compares `on_expr`, `select_expr`, `sort_expr`, then `input`; the
    /// derived `schema` is excluded. `Equal` is only reported when the
    /// complete nodes are equal.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        #[derive(PartialEq, PartialOrd)]
        struct ComparableDistinctOn<'a> {
            pub on_expr: &'a Vec<Expr>,
            pub select_expr: &'a Vec<Expr>,
            pub sort_expr: &'a Option<Vec<SortExpr>>,
            pub input: &'a Arc<LogicalPlan>,
        }
        let comparable_self = ComparableDistinctOn {
            on_expr: &self.on_expr,
            select_expr: &self.select_expr,
            sort_expr: &self.sort_expr,
            input: &self.input,
        };
        let comparable_other = ComparableDistinctOn {
            on_expr: &other.on_expr,
            select_expr: &other.select_expr,
            sort_expr: &other.sort_expr,
            input: &other.input,
        };
        comparable_self
            .partial_cmp(&comparable_other)
            .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}
3541
/// Groups the input by `group_expr` and evaluates `aggr_expr` per group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// Non-exhaustive so instances must be built via the `try_new*` constructors.
#[non_exhaustive]
pub struct Aggregate {
    /// The plan being aggregated.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions (may be a single grouping set).
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions evaluated per group.
    pub aggr_expr: Vec<Expr>,
    /// Output schema: group columns, then (for grouping sets) the internal
    /// grouping-id column, then aggregate columns.
    pub schema: DFSchemaRef,
}
3567
impl Aggregate {
    /// Creates an `Aggregate`, deriving the output schema from the group
    /// and aggregate expressions.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        // Expand grouping-set shorthand (CUBE / ROLLUP) into explicit sets.
        let group_expr = enumerate_grouping_sets(group_expr)?;

        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        if is_grouping_set {
            // All group columns become nullable (not every grouping set
            // includes every column), and an internal column is appended
            // that identifies the grouping set each row belongs to.
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    // Sized by the number of group expressions (len() is
                    // taken before the push).
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Creates an `Aggregate` with a pre-computed `schema`, validating the
    /// field count and attaching the aggregate's functional dependencies.
    ///
    /// # Errors
    /// Returns a plan error if both expression lists are empty, or if the
    /// schema's field count does not match the expressions.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression. \
                Aggregate without grouping expressions nor aggregate expressions is \
                logically equivalent to, but less efficient than, VALUES producing \
                single row. Please use VALUES instead."
            );
        }
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// True when the group expressions are a single grouping set.
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }

    /// Returns the expression backing each output field, in schema order:
    /// group expressions, the internal grouping id (if any), aggregates.
    fn output_expressions(&self) -> Result<Vec<&Expr>> {
        // Shared singleton so a `&'static Expr` can be pushed alongside
        // borrowed expressions.
        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
        });
        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
        if self.is_grouping_set() {
            exprs.push(&INTERNAL_ID_EXPR);
        }
        exprs.extend(self.aggr_expr.iter());
        debug_assert!(exprs.len() == self.schema.fields().len());
        Ok(exprs)
    }

    /// Number of group-by output columns (grouping sets are counted by
    /// their distinct expressions).
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }

    /// Smallest unsigned integer type with at least one bit per group
    /// expression, used for the internal grouping-id column.
    pub fn grouping_id_type(group_exprs: usize) -> DataType {
        if group_exprs <= 8 {
            DataType::UInt8
        } else if group_exprs <= 16 {
            DataType::UInt16
        } else if group_exprs <= 32 {
            DataType::UInt32
        } else {
            DataType::UInt64
        }
    }

    /// Name of the internal column added when grouping sets are used.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
3713
3714impl PartialOrd for Aggregate {
3716 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3717 match self.input.partial_cmp(&other.input) {
3718 Some(Ordering::Equal) => {
3719 match self.group_expr.partial_cmp(&other.group_expr) {
3720 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3721 cmp => cmp,
3722 }
3723 }
3724 cmp => cmp,
3725 }
3726 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3728 }
3729}
3730
/// Returns true if any expression in `group_expr` is a grouping set
/// (`GROUPING SETS` / `CUBE` / `ROLLUP`).
fn contains_grouping_set(group_expr: &[Expr]) -> bool {
    group_expr
        .iter()
        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
}
3737
3738fn calc_func_dependencies_for_aggregate(
3740 group_expr: &[Expr],
3742 input: &LogicalPlan,
3744 aggr_schema: &DFSchema,
3746) -> Result<FunctionalDependencies> {
3747 if !contains_grouping_set(group_expr) {
3753 let group_by_expr_names = group_expr
3754 .iter()
3755 .map(|item| item.schema_name().to_string())
3756 .collect::<IndexSet<_>>()
3757 .into_iter()
3758 .collect::<Vec<_>>();
3759 let aggregate_func_dependencies = aggregate_functional_dependencies(
3760 input.schema(),
3761 &group_by_expr_names,
3762 aggr_schema,
3763 );
3764 Ok(aggregate_func_dependencies)
3765 } else {
3766 Ok(FunctionalDependencies::empty())
3767 }
3768}
3769
/// Computes the functional dependencies of a `Projection`'s output by
/// mapping each projected expression back to an input field index (matched
/// by display name) and projecting the input's dependencies through those
/// indices.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // For each projected expression, the input field indices it maps to
    // (empty when the expression is not a direct passthrough of a field).
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                // Expand the wildcard and locate each expanded field in the
                // input by its flattened (optionally qualified) name.
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            Expr::Alias(alias) => {
                // An alias is matched via its underlying expression's name.
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            _ => {
                let name = format!("{expr}");
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, exprs.len()))
}
3829
/// Sorts the input by the given sort expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// Sort expressions, in precedence order.
    pub expr: Vec<SortExpr>,
    /// The plan being sorted.
    pub input: Arc<LogicalPlan>,
    /// Optional maximum number of sorted rows to produce.
    pub fetch: Option<usize>,
}
3840
/// Joins two plans on equi-join keys plus an optional extra filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input.
    pub left: Arc<LogicalPlan>,
    /// Right input.
    pub right: Arc<LogicalPlan>,
    /// Equi-join key pairs: `(left expression, right expression)`.
    pub on: Vec<(Expr, Expr)>,
    /// Additional (non-equi) join condition, if any.
    pub filter: Option<Expr>,
    /// Kind of join (inner, left, right, full, semi, anti, ...).
    pub join_type: JoinType,
    /// How the join condition was specified syntactically.
    pub join_constraint: JoinConstraint,
    /// Output schema built from both inputs according to the join type.
    pub schema: DFSchemaRef,
    /// Whether NULL join keys compare equal when matching rows.
    pub null_equality: NullEquality,
}
3861
3862impl Join {
3863 pub fn try_new(
3882 left: Arc<LogicalPlan>,
3883 right: Arc<LogicalPlan>,
3884 on: Vec<(Expr, Expr)>,
3885 filter: Option<Expr>,
3886 join_type: JoinType,
3887 join_constraint: JoinConstraint,
3888 null_equality: NullEquality,
3889 ) -> Result<Self> {
3890 let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3891
3892 Ok(Join {
3893 left,
3894 right,
3895 on,
3896 filter,
3897 join_type,
3898 join_constraint,
3899 schema: Arc::new(join_schema),
3900 null_equality,
3901 })
3902 }
3903
3904 pub fn try_new_with_project_input(
3907 original: &LogicalPlan,
3908 left: Arc<LogicalPlan>,
3909 right: Arc<LogicalPlan>,
3910 column_on: (Vec<Column>, Vec<Column>),
3911 ) -> Result<(Self, bool)> {
3912 let original_join = match original {
3913 LogicalPlan::Join(join) => join,
3914 _ => return plan_err!("Could not create join with project input"),
3915 };
3916
3917 let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3918 let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3919
3920 let mut requalified = false;
3921
3922 if original_join.join_type == JoinType::Inner
3925 || original_join.join_type == JoinType::Left
3926 || original_join.join_type == JoinType::Right
3927 || original_join.join_type == JoinType::Full
3928 {
3929 (left_sch, right_sch, requalified) =
3930 requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3931 }
3932
3933 let on: Vec<(Expr, Expr)> = column_on
3934 .0
3935 .into_iter()
3936 .zip(column_on.1)
3937 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3938 .collect();
3939
3940 let join_schema = build_join_schema(
3941 left_sch.schema(),
3942 right_sch.schema(),
3943 &original_join.join_type,
3944 )?;
3945
3946 Ok((
3947 Join {
3948 left,
3949 right,
3950 on,
3951 filter: original_join.filter.clone(),
3952 join_type: original_join.join_type,
3953 join_constraint: original_join.join_constraint,
3954 schema: Arc::new(join_schema),
3955 null_equality: original_join.null_equality,
3956 },
3957 requalified,
3958 ))
3959 }
3960}
3961
impl PartialOrd for Join {
    /// Compares all fields except the derived `schema`. `Equal` is only
    /// reported when the complete nodes are equal.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        #[derive(PartialEq, PartialOrd)]
        struct ComparableJoin<'a> {
            pub left: &'a Arc<LogicalPlan>,
            pub right: &'a Arc<LogicalPlan>,
            pub on: &'a Vec<(Expr, Expr)>,
            pub filter: &'a Option<Expr>,
            pub join_type: &'a JoinType,
            pub join_constraint: &'a JoinConstraint,
            pub null_equality: &'a NullEquality,
        }
        let comparable_self = ComparableJoin {
            left: &self.left,
            right: &self.right,
            on: &self.on,
            filter: &self.filter,
            join_type: &self.join_type,
            join_constraint: &self.join_constraint,
            null_equality: &self.null_equality,
        };
        let comparable_other = ComparableJoin {
            left: &other.left,
            right: &other.right,
            on: &other.on,
            filter: &other.filter,
            join_type: &other.join_type,
            join_constraint: &other.join_constraint,
            null_equality: &other.null_equality,
        };
        comparable_self
            .partial_cmp(&comparable_other)
            .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}
4006
/// A subquery referenced from within an expression.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery's plan.
    pub subquery: Arc<LogicalPlan>,
    /// Expressions in the subquery that reference columns of the outer
    /// query (correlated references).
    pub outer_ref_columns: Vec<Expr>,
    /// Source-location spans for diagnostics.
    pub spans: Spans,
}
4017
impl Normalizeable for Subquery {
    // Subqueries opt out of CSE expression normalization.
    fn can_normalize(&self) -> bool {
        false
    }
}
4023
4024impl NormalizeEq for Subquery {
4025 fn normalize_eq(&self, other: &Self) -> bool {
4026 *self.subquery == *other.subquery
4028 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
4029 && self
4030 .outer_ref_columns
4031 .iter()
4032 .zip(other.outer_ref_columns.iter())
4033 .all(|(a, b)| a.normalize_eq(b))
4034 }
4035}
4036
impl Subquery {
    /// Extracts the scalar subquery from `plan`, looking through casts.
    ///
    /// # Errors
    /// Returns a plan error when the expression is neither a scalar
    /// subquery nor a cast of one.
    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
        match plan {
            Expr::ScalarSubquery(it) => Ok(it),
            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
            _ => plan_err!("Could not coerce into ScalarSubquery!"),
        }
    }

    /// Returns a copy of this subquery with its plan replaced; the spans
    /// are reset rather than carried over.
    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
        Subquery {
            subquery: plan,
            outer_ref_columns: self.outer_ref_columns.clone(),
            spans: Spans::new(),
        }
    }
}
4054
4055impl Debug for Subquery {
4056 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4057 write!(f, "<subquery>")
4058 }
4059}
4060
/// How a `Repartition` node distributes rows across partitions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Round-robin distribution into the given number of partitions.
    RoundRobinBatch(usize),
    /// Hash the given expressions into the given number of partitions.
    Hash(Vec<Expr>, usize),
    /// `DISTRIBUTE BY`: partition by the given expressions; the partition
    /// count is determined elsewhere.
    DistributeBy(Vec<Expr>),
}
4076
/// A list column being unnested to a specific depth
/// (displayed as `<column>|depth=<n>`).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The resulting output column.
    pub output_column: Column,
    /// How many nesting levels of the list are unwrapped.
    pub depth: usize,
}
4101
4102impl Display for ColumnUnnestList {
4103 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4104 write!(f, "{}|depth={}", self.output_column, self.depth)
4105 }
4106}
4107
/// Unnests list columns into multiple rows and struct columns into
/// multiple columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The plan being unnested.
    pub input: Arc<LogicalPlan>,
    /// The columns requested for unnesting.
    pub exec_columns: Vec<Column>,
    /// List-typed columns to unnest: input schema index plus the output
    /// column and unnest depth.
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Input schema indices of struct-typed columns being unnested.
    pub struct_type_columns: Vec<usize>,
    /// For each output column, the index of the input column it derives
    /// from.
    pub dependency_indices: Vec<usize>,
    /// Output schema after unnesting.
    pub schema: DFSchemaRef,
    /// Unnest behavior options (including recursive depth requests).
    pub options: UnnestOptions,
}
4130
impl PartialOrd for Unnest {
    /// Compares all fields except the derived `schema`. `Equal` is only
    /// reported when the complete nodes are equal.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        #[derive(PartialEq, PartialOrd)]
        struct ComparableUnnest<'a> {
            pub input: &'a Arc<LogicalPlan>,
            pub exec_columns: &'a Vec<Column>,
            pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
            pub struct_type_columns: &'a Vec<usize>,
            pub dependency_indices: &'a Vec<usize>,
            pub options: &'a UnnestOptions,
        }
        let comparable_self = ComparableUnnest {
            input: &self.input,
            exec_columns: &self.exec_columns,
            list_type_columns: &self.list_type_columns,
            struct_type_columns: &self.struct_type_columns,
            dependency_indices: &self.dependency_indices,
            options: &self.options,
        };
        let comparable_other = ComparableUnnest {
            input: &other.input,
            exec_columns: &other.exec_columns,
            list_type_columns: &other.list_type_columns,
            struct_type_columns: &other.struct_type_columns,
            dependency_indices: &other.dependency_indices,
            options: &other.options,
        };
        comparable_self
            .partial_cmp(&comparable_other)
            .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}
4174
impl Unnest {
    /// Creates an `Unnest` node that unnests `exec_columns` of `input`.
    ///
    /// Derives the output schema by replacing each unnested column with its
    /// unnested counterpart(s) — the list element column(s) and/or one
    /// column per struct field — and records, per output column, which
    /// input column it depends on.
    ///
    /// # Errors
    /// Returns a plan error when `exec_columns` is empty, and an error when
    /// a column cannot be resolved or has a non-unnestable data type.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Map: input schema index -> column requested for unnesting.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        let mut dependency_indices = vec![];
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Explicit recursive unnest requests targeting this
                        // column (caller-chosen depths), if any.
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                // A list unnest yields exactly one column.
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap()) })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        if transformed_columns.is_empty() {
                            // No explicit recursion requested: default to a
                            // depth-1 unnest driven by the column's type.
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                _ => {}
                            };
                        }

                        // Every output column produced here depends on this
                        // input column.
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Column not being unnested: passed through as-is.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // The input's functional dependencies are carried over unchanged.
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4311
4312fn get_unnested_columns(
4321 col_name: &String,
4322 data_type: &DataType,
4323 depth: usize,
4324) -> Result<Vec<(Column, Arc<Field>)>> {
4325 let mut qualified_columns = Vec::with_capacity(1);
4326
4327 match data_type {
4328 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4329 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4330 let new_field = Arc::new(Field::new(
4331 col_name, data_type,
4332 true,
4335 ));
4336 let column = Column::from_name(col_name);
4337 qualified_columns.push((column, new_field));
4339 }
4340 DataType::Struct(fields) => {
4341 qualified_columns.extend(fields.iter().map(|f| {
4342 let new_name = format!("{}.{}", col_name, f.name());
4343 let column = Column::from_name(&new_name);
4344 let new_field = f.as_ref().clone().with_name(new_name);
4345 (column, Arc::new(new_field))
4347 }))
4348 }
4349 _ => {
4350 return internal_err!("trying to unnest on invalid data type {data_type}");
4351 }
4352 };
4353 Ok(qualified_columns)
4354}
4355
4356fn get_unnested_list_datatype_recursive(
4359 data_type: &DataType,
4360 depth: usize,
4361) -> Result<DataType> {
4362 match data_type {
4363 DataType::List(field)
4364 | DataType::FixedSizeList(field, _)
4365 | DataType::LargeList(field) => {
4366 if depth == 1 {
4367 return Ok(field.data_type().clone());
4368 }
4369 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4370 }
4371 _ => {}
4372 };
4373
4374 internal_err!("trying to unnest on invalid data type {data_type}")
4375}
4376
4377#[cfg(test)]
4378mod tests {
4379 use super::*;
4380 use crate::builder::LogicalTableSource;
4381 use crate::logical_plan::table_scan;
4382 use crate::select_expr::SelectExpr;
4383 use crate::test::function_stub::{count, count_udaf};
4384 use crate::{
4385 GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4386 scalar_subquery,
4387 };
4388 use datafusion_common::metadata::ScalarAndMetadata;
4389 use datafusion_common::tree_node::{
4390 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4391 };
4392 use datafusion_common::{Constraint, ScalarValue, not_impl_err};
4393 use insta::{assert_debug_snapshot, assert_snapshot};
4394 use std::hash::DefaultHasher;
4395
4396 fn employee_schema() -> Schema {
4397 Schema::new(vec![
4398 Field::new("id", DataType::Int32, false),
4399 Field::new("first_name", DataType::Utf8, false),
4400 Field::new("last_name", DataType::Utf8, false),
4401 Field::new("state", DataType::Utf8, false),
4402 Field::new("salary", DataType::Int32, false),
4403 ])
4404 }
4405
4406 fn display_plan() -> Result<LogicalPlan> {
4407 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4408 .build()?;
4409
4410 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4411 .filter(in_subquery(col("state"), Arc::new(plan1)))?
4412 .project(vec![col("id")])?
4413 .build()
4414 }
4415
    /// `display_indent` renders the plan tree as one line per node, indented
    /// by depth, with subqueries shown inline beneath their parent.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        // Inline insta snapshot of the expected rendering.
        assert_snapshot!(plan.display_indent(), @r"
        Projection: employee_csv.id
          Filter: employee_csv.state IN (<subquery>)
            Subquery:
              TableScan: employee_csv projection=[state]
            TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4429
    /// Like `test_display_indent`, but each node also shows its output
    /// schema (`[name:Type]`) via `display_indent_schema`.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: employee_csv.id [id:Int32]
          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
            Subquery: [state:Utf8]
              TableScan: employee_csv projection=[state] [state:Utf8]
            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
        ");
        Ok(())
    }
4443
    /// An aliased EXISTS-subquery expression renders as
    /// `EXISTS (<subquery>) AS <alias>` with the subquery body indented below.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        assert_snapshot!(plan?.display_indent(), @r"
        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
          Subquery:
            TableScan: employee_csv projection=[state]
          TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4463
    /// `display_graphviz` emits two subgraphs: a plain one and a "Detailed"
    /// one that appends each node's schema; node ids are assigned in
    /// traversal order across both clusters.
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_graphviz(), @r#"
        // Begin DataFusion GraphViz Plan,
        // display it online here: https://dreampuf.github.io/GraphvizOnline

        digraph {
          subgraph cluster_1
          {
            graph[label="LogicalPlan"]
            2[shape=box label="Projection: employee_csv.id"]
            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
            4[shape=box label="Subquery:"]
            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
            5[shape=box label="TableScan: employee_csv projection=[state]"]
            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
          }
          subgraph cluster_7
          {
            graph[label="Detailed LogicalPlan"]
            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
          }
        }
        // End DataFusion GraphViz Plan
        "#);
        Ok(())
    }
4506
    /// `display_pg_json` renders the plan as Postgres-EXPLAIN-style JSON:
    /// one top-level object with nested "Plans" arrays mirroring the tree,
    /// subqueries included as child plans of the filter.
    #[test]
    fn test_display_pg_json() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_pg_json(), @r#"
        [
          {
            "Plan": {
              "Expressions": [
                "employee_csv.id"
              ],
              "Node Type": "Projection",
              "Output": [
                "id"
              ],
              "Plans": [
                {
                  "Condition": "employee_csv.state IN (<subquery>)",
                  "Node Type": "Filter",
                  "Output": [
                    "id",
                    "state"
                  ],
                  "Plans": [
                    {
                      "Node Type": "Subquery",
                      "Output": [
                        "state"
                      ],
                      "Plans": [
                        {
                          "Node Type": "TableScan",
                          "Output": [
                            "state"
                          ],
                          "Plans": [],
                          "Relation Name": "employee_csv"
                        }
                      ]
                    },
                    {
                      "Node Type": "TableScan",
                      "Output": [
                        "id",
                        "state"
                      ],
                      "Plans": [],
                      "Relation Name": "employee_csv"
                    }
                  ]
                }
              ]
            }
          }
        ]
        "#);
        Ok(())
    }
4565
    /// Visitor that records traversal order by pushing a
    /// "pre_visit …"/"post_visit …" label for each node into `strings`.
    #[derive(Debug, Default)]
    struct OkVisitor {
        // Labels in the order the visitor callbacks fired.
        strings: Vec<String>,
    }
4571
4572 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4573 type Node = LogicalPlan;
4574
4575 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4576 let s = match plan {
4577 LogicalPlan::Projection { .. } => "pre_visit Projection",
4578 LogicalPlan::Filter { .. } => "pre_visit Filter",
4579 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4580 _ => {
4581 return not_impl_err!("unknown plan type");
4582 }
4583 };
4584
4585 self.strings.push(s.into());
4586 Ok(TreeNodeRecursion::Continue)
4587 }
4588
4589 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4590 let s = match plan {
4591 LogicalPlan::Projection { .. } => "post_visit Projection",
4592 LogicalPlan::Filter { .. } => "post_visit Filter",
4593 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4594 _ => {
4595 return not_impl_err!("unknown plan type");
4596 }
4597 };
4598
4599 self.strings.push(s.into());
4600 Ok(TreeNodeRecursion::Continue)
4601 }
4602 }
4603
    /// A full traversal visits nodes depth-first: pre-visit on the way
    /// down, post-visit on the way back up.
    #[test]
    fn visit_order() {
        let mut visitor = OkVisitor::default();
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(visitor.strings, @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
            "post_visit Filter",
            "post_visit Projection",
        ]
        "#);
    }
4622
4623 #[derive(Debug, Default)]
4624 struct OptionalCounter {
4626 val: Option<usize>,
4627 }
4628
4629 impl OptionalCounter {
4630 fn new(val: usize) -> Self {
4631 Self { val: Some(val) }
4632 }
4633 fn dec(&mut self) -> bool {
4635 if Some(0) == self.val {
4636 true
4637 } else {
4638 self.val = self.val.take().map(|i| i - 1);
4639 false
4640 }
4641 }
4642 }
4643
    /// Visitor that delegates to `OkVisitor` but returns
    /// `TreeNodeRecursion::Stop` once the configured counter expires.
    #[derive(Debug, Default)]
    struct StoppingVisitor {
        inner: OkVisitor,
        /// When exhausted, `f_down` stops the traversal.
        return_false_from_pre_in: OptionalCounter,
        /// When exhausted, `f_up` stops the traversal.
        return_false_from_post_in: OptionalCounter,
    }
4653
    impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
        type Node = LogicalPlan;

        /// Stops traversal when the pre-visit counter expires; otherwise
        /// records the node via the inner visitor and continues.
        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if self.return_false_from_pre_in.dec() {
                return Ok(TreeNodeRecursion::Stop);
            }
            self.inner.f_down(plan)?;

            Ok(TreeNodeRecursion::Continue)
        }

        /// Stops traversal when the post-visit counter expires.
        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if self.return_false_from_post_in.dec() {
                return Ok(TreeNodeRecursion::Stop);
            }

            self.inner.f_up(plan)
        }
    }
4674
    /// Stopping in pre-visit after two nodes prevents the third node (and
    /// all post-visits) from being recorded.
    #[test]
    fn early_stopping_pre_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4696
    /// Stopping in post-visit after one up-call: all pre-visits ran, but
    /// only the first post-visit (TableScan) was recorded.
    #[test]
    fn early_stopping_post_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4719
    /// Visitor that delegates to `OkVisitor` but returns an error once the
    /// configured counter expires, to test error propagation mid-traversal.
    #[derive(Debug, Default)]
    struct ErrorVisitor {
        inner: OkVisitor,
        /// When exhausted, `f_down` returns a NotImplemented error.
        return_error_from_pre_in: OptionalCounter,
        /// When exhausted, `f_up` returns a NotImplemented error.
        return_error_from_post_in: OptionalCounter,
    }
4729
    impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
        type Node = LogicalPlan;

        /// Errors out when the pre-visit counter expires; otherwise records
        /// the node via the inner visitor.
        fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if self.return_error_from_pre_in.dec() {
                return not_impl_err!("Error in pre_visit");
            }

            self.inner.f_down(plan)
        }

        /// Errors out when the post-visit counter expires.
        fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
            if self.return_error_from_post_in.dec() {
                return not_impl_err!("Error in post_visit");
            }

            self.inner.f_up(plan)
        }
    }
4749
    /// An error raised in pre-visit aborts the traversal and is surfaced
    /// to the caller; only the nodes visited before the error are recorded.
    #[test]
    fn error_pre_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in pre_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4772
    /// An error raised in post-visit aborts the traversal after the first
    /// up-call; all pre-visits plus one post-visit were recorded.
    #[test]
    fn error_post_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in post_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4797
4798 #[test]
4799 fn test_partial_eq_hash_and_partial_ord() {
4800 let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4801 produce_one_row: true,
4802 schema: Arc::new(DFSchema::empty()),
4803 }));
4804
4805 let count_window_function = |schema| {
4806 Window::try_new_with_schema(
4807 vec![Expr::WindowFunction(Box::new(WindowFunction::new(
4808 WindowFunctionDefinition::AggregateUDF(count_udaf()),
4809 vec![],
4810 )))],
4811 Arc::clone(&empty_values),
4812 Arc::new(schema),
4813 )
4814 .unwrap()
4815 };
4816
4817 let schema_without_metadata = || {
4818 DFSchema::from_unqualified_fields(
4819 vec![Field::new("count", DataType::Int64, false)].into(),
4820 HashMap::new(),
4821 )
4822 .unwrap()
4823 };
4824
4825 let schema_with_metadata = || {
4826 DFSchema::from_unqualified_fields(
4827 vec![Field::new("count", DataType::Int64, false)].into(),
4828 [("key".to_string(), "value".to_string())].into(),
4829 )
4830 .unwrap()
4831 };
4832
4833 let f = count_window_function(schema_without_metadata());
4835
4836 let f2 = count_window_function(schema_without_metadata());
4838 assert_eq!(f, f2);
4839 assert_eq!(hash(&f), hash(&f2));
4840 assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));
4841
4842 let o = count_window_function(schema_with_metadata());
4844 assert_ne!(f, o);
4845 assert_ne!(hash(&f), hash(&o)); assert_eq!(f.partial_cmp(&o), None);
4847 }
4848
4849 fn hash<T: Hash>(value: &T) -> u64 {
4850 let hasher = &mut DefaultHasher::new();
4851 value.hash(hasher);
4852 hasher.finish()
4853 }
4854
    /// `Projection::try_new_with_schema` rejects a schema whose field count
    /// does not match the number of projection expressions.
    #[test]
    fn projection_expr_schema_mismatch() -> Result<()> {
        let empty_schema = Arc::new(DFSchema::empty());
        // One expression (`a`) against an empty (zero-field) schema.
        let p = Projection::try_new_with_schema(
            vec![col("a")],
            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::clone(&empty_schema),
            })),
            empty_schema,
        );
        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
        Ok(())
    }
4869
4870 fn test_plan() -> LogicalPlan {
4871 let schema = Schema::new(vec![
4872 Field::new("id", DataType::Int32, false),
4873 Field::new("state", DataType::Utf8, false),
4874 ]);
4875
4876 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4877 .unwrap()
4878 .filter(col("state").eq(lit("CO")))
4879 .unwrap()
4880 .project(vec![col("id")])
4881 .unwrap()
4882 .build()
4883 .unwrap()
4884 }
4885
4886 #[test]
4887 fn test_replace_invalid_placeholder() {
4888 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4890
4891 let plan = table_scan(TableReference::none(), &schema, None)
4892 .unwrap()
4893 .filter(col("id").eq(placeholder("")))
4894 .unwrap()
4895 .build()
4896 .unwrap();
4897
4898 let param_values = vec![ScalarValue::Int32(Some(42))];
4899 plan.replace_params_with_values(¶m_values.clone().into())
4900 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4901
4902 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4904
4905 let plan = table_scan(TableReference::none(), &schema, None)
4906 .unwrap()
4907 .filter(col("id").eq(placeholder("$0")))
4908 .unwrap()
4909 .build()
4910 .unwrap();
4911
4912 plan.replace_params_with_values(¶m_values.clone().into())
4913 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4914
4915 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4917
4918 let plan = table_scan(TableReference::none(), &schema, None)
4919 .unwrap()
4920 .filter(col("id").eq(placeholder("$00")))
4921 .unwrap()
4922 .build()
4923 .unwrap();
4924
4925 plan.replace_params_with_values(¶m_values.into())
4926 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4927 }
4928
    /// Binding a parameter whose value carries metadata that does not match
    /// the prepared statement's declared field metadata must fail.
    #[test]
    fn test_replace_placeholder_mismatched_metadata() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("$1")))
            .unwrap()
            .build()
            .unwrap();
        // Prepare declares $1 as a plain Int32 field with no metadata.
        let prepared_builder = LogicalPlanBuilder::new(plan)
            .prepare(
                "".to_string(),
                vec![Field::new("", DataType::Int32, true).into()],
            )
            .unwrap();

        // The bound value carries extra metadata, conflicting with the
        // metadata-free prepared field.
        let mut scalar_meta = HashMap::new();
        scalar_meta.insert("some_key".to_string(), "some_value".to_string());
        let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
            ScalarValue::Int32(Some(42)),
            Some(scalar_meta.into()),
        )]);
        prepared_builder
            .plan()
            .clone()
            .with_param_values(param_values)
            .expect_err("prepared field metadata mismatch unexpectedly succeeded");
    }
4960
    /// Unresolved placeholders project as nullable Null columns; after
    /// binding values the schema reflects the concrete literal types and
    /// each value is aliased back to its placeholder name.
    #[test]
    fn test_replace_placeholder_empty_relation_valid_schema() {
        let plan = LogicalPlanBuilder::empty(false)
            .project(vec![
                SelectExpr::from(placeholder("$1")),
                SelectExpr::from(placeholder("$2")),
            ])
            .unwrap()
            .build()
            .unwrap();

        // Before binding: both columns are Null-typed and nullable (`;N`).
        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: $1, $2 [$1:Null;N, $2:Null;N]
          EmptyRelation: rows=0 []
        ");

        let plan = plan
            .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
            .unwrap();

        // After binding: literals aliased as $1/$2 with concrete types.
        assert_snapshot!(plan.display_indent_schema(), @r#"
        Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
          EmptyRelation: rows=0 []
        "#);
    }
4989
    /// Grouping sets make every grouping column nullable in the output
    /// schema (each set leaves the other set's columns as NULL), even when
    /// the input columns are non-nullable.
    #[test]
    fn test_nullable_schema_after_grouping_set() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .aggregate(
                // GROUPING SETS ((foo), (bar))
                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
                    vec![col("foo")],
                    vec![col("bar")],
                ]))],
                vec![count(lit(true))],
            )
            .unwrap()
            .build()
            .unwrap();

        let output_schema = plan.schema();

        assert!(
            output_schema
                .field_with_name(None, "foo")
                .unwrap()
                .is_nullable(),
        );
        assert!(
            output_schema
                .field_with_name(None, "bar")
                .unwrap()
                .is_nullable()
        );
    }
5025
    /// `Filter::is_scalar` is false for a plain scan, but true when the
    /// input schema carries a functional dependency marking the filtered
    /// column as unique (equality on a unique key yields at most one row).
    #[test]
    fn test_filter_is_scalar() {
        // Plan a filter on a basic table scan with no constraints.
        let schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = Arc::new(LogicalTableSource::new(schema));
        let schema = Arc::new(
            DFSchema::try_from_qualified_schema(
                TableReference::bare("tab"),
                &source.schema(),
            )
            .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(&schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        let filter = Filter::try_new(
            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
            scan,
        )
        .unwrap();
        // No uniqueness information: the filter cannot be proven scalar.
        assert!(!filter.is_scalar());

        // Same scan, but the schema now declares column 0 as unique.
        let unique_schema = Arc::new(
            schema
                .as_ref()
                .clone()
                .with_functional_dependencies(
                    FunctionalDependencies::new_from_constraints(
                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
                            vec![0],
                        )])),
                        1,
                    ),
                )
                .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source,
            projection: None,
            projected_schema: Arc::clone(&unique_schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        let filter =
            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
        // Equality on the unique column makes the result scalar.
        assert!(filter.is_scalar());
    }
5084
    /// `transform` rewrites nodes *inside* an `Explain` wrapper: the
    /// TableScan child is replaced by a Filter over it, and the Explain
    /// node's stored plan reflects the rewrite.
    #[test]
    fn test_transform_explain() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .explain(false, false)
            .unwrap()
            .build()
            .unwrap();

        let external_filter = col("foo").eq(lit(true));

        // Wrap every TableScan encountered during the rewrite in a Filter.
        let plan = plan
            .transform(|plan| match plan {
                LogicalPlan::TableScan(table) => {
                    let filter = Filter::try_new(
                        external_filter.clone(),
                        Arc::new(LogicalPlan::TableScan(table)),
                    )
                    .unwrap();
                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
                }
                x => Ok(Transformed::no(x)),
            })
            .data()
            .unwrap();

        let actual = format!("{}", plan.display_indent());
        assert_snapshot!(actual, @r"
        Explain
          Filter: foo = Boolean(true)
            TableScan: ?table?
        ")
    }
5125
    /// `PartialOrd` between plan variants orders by variant; two
    /// `DescribeTable` nodes compare as `None` here (not comparable), even
    /// when built from identical inputs.
    #[test]
    fn test_plan_partial_ord() {
        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
            schema: Arc::new(Schema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                false,
            )])),
            output_schema: DFSchemaRef::new(DFSchema::empty()),
        });

        // Structurally identical to `describe_table`.
        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
            schema: Arc::new(Schema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                false,
            )])),
            output_schema: DFSchemaRef::new(DFSchema::empty()),
        });

        assert_eq!(
            empty_relation.partial_cmp(&describe_table),
            Some(Ordering::Less)
        );
        assert_eq!(
            describe_table.partial_cmp(&empty_relation),
            Some(Ordering::Greater)
        );
        // DescribeTable vs DescribeTable is not comparable.
        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
    }
5161
    /// Round-tripping a `Limit` through `with_new_exprs` with its own
    /// expressions and inputs must reproduce an equal node for every
    /// combination of present/absent skip and fetch.
    #[test]
    fn test_limit_with_new_children() {
        let input = Arc::new(LogicalPlan::Values(Values {
            schema: Arc::new(DFSchema::empty()),
            values: vec![vec![]],
        }));
        // Cover: neither / fetch only / skip only / both.
        let cases = [
            LogicalPlan::Limit(Limit {
                skip: None,
                fetch: None,
                input: Arc::clone(&input),
            }),
            LogicalPlan::Limit(Limit {
                skip: None,
                fetch: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                input: Arc::clone(&input),
            }),
            LogicalPlan::Limit(Limit {
                skip: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                fetch: None,
                input: Arc::clone(&input),
            }),
            LogicalPlan::Limit(Limit {
                skip: Some(Box::new(Expr::Literal(
                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
                    None,
                ))),
                fetch: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                input,
            }),
        ];

        for limit in cases {
            let new_limit = limit
                .with_new_exprs(
                    limit.expressions(),
                    limit.inputs().into_iter().cloned().collect(),
                )
                .unwrap();
            assert_eq!(limit, new_limit);
        }
    }
5213
    /// `TreeNodeRecursion::Jump` issued at the Projection must skip the
    /// Projection's children — including its scalar subquery and the Filter
    /// inside it — across all four subquery-aware traversal APIs:
    /// `apply_with_subqueries`, `visit_with_subqueries`,
    /// `transform_down[_up]_with_subqueries`, and `rewrite_with_subqueries`.
    #[test]
    fn test_with_subqueries_jump() {
        // Subquery: scan + filter on `sub_id`.
        let subquery_schema =
            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

        let subquery_plan =
            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
                .unwrap()
                .filter(col("sub_id").eq(lit(0)))
                .unwrap()
                .build()
                .unwrap();

        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        // Outer plan: Projection (contains the subquery) over Filter over scan.
        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
            .unwrap()
            .filter(col("id").eq(lit(0)))
            .unwrap()
            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
            .unwrap()
            .build()
            .unwrap();

        // 1) Closure-based apply: Jump at Projection hides both Filters.
        let mut filter_found = false;
        plan.apply_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        assert!(!filter_found);

        struct ProjectJumpVisitor {
            filter_found: bool,
        }

        impl ProjectJumpVisitor {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
                match node {
                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }

        // 2) Visitor-based traversal.
        let mut visitor = ProjectJumpVisitor::new();
        plan.visit_with_subqueries(&mut visitor).unwrap();
        assert!(!visitor.filter_found);

        // 3) Top-down transform.
        let mut filter_found = false;
        plan.clone()
            .transform_down_with_subqueries(|plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            plan,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            })
            .unwrap();
        assert!(!filter_found);

        // 4) Combined down/up transform (up closure is a no-op).
        let mut filter_found = false;
        plan.clone()
            .transform_down_up_with_subqueries(
                |plan| {
                    match plan {
                        LogicalPlan::Projection(..) => {
                            return Ok(Transformed::new(
                                plan,
                                false,
                                TreeNodeRecursion::Jump,
                            ));
                        }
                        LogicalPlan::Filter(..) => filter_found = true,
                        _ => {}
                    }
                    Ok(Transformed::no(plan))
                },
                |plan| Ok(Transformed::no(plan)),
            )
            .unwrap();
        assert!(!filter_found);

        struct ProjectJumpRewriter {
            filter_found: bool,
        }

        impl ProjectJumpRewriter {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl TreeNodeRewriter for ProjectJumpRewriter {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            node,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(node))
            }
        }

        // 5) Rewriter-based traversal.
        let mut rewriter = ProjectJumpRewriter::new();
        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
        assert!(!rewriter.filter_found);
    }
5359
    /// `get_parameter_fields` surfaces unresolved placeholders; a
    /// placeholder with no inferable type maps to `None`.
    #[test]
    fn test_with_unresolved_placeholders() {
        let field_name = "id";
        let placeholder_value = "$1";
        let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col(field_name).eq(placeholder(placeholder_value)))
            .unwrap()
            .build()
            .unwrap();

        // Exactly one placeholder ($1) is discovered.
        let params = plan.get_parameter_fields().unwrap();
        assert_eq!(params.len(), 1);

        // Its field/type is unresolved at this point.
        let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
        assert_eq!(parameter_type, None);
    }
5380
    /// `with_new_exprs` on a Join must rebuild the `on` pairs and optional
    /// filter from the flat expression list, for every combination of
    /// equi-keys and filter, and must accept replacement expressions.
    #[test]
    fn test_join_with_new_exprs() -> Result<()> {
        // Builds an inner join of t1/t2 (both with columns a, b) using the
        // given equi-join keys and optional filter.
        fn create_test_join(
            on: Vec<(Expr, Expr)>,
            filter: Option<Expr>,
        ) -> Result<LogicalPlan> {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Int32, false),
            ]);

            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;

            Ok(LogicalPlan::Join(Join {
                left: Arc::new(
                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
                ),
                right: Arc::new(
                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
                ),
                on,
                filter,
                join_type: JoinType::Inner,
                join_constraint: JoinConstraint::On,
                schema: Arc::new(left_schema.join(&right_schema)?),
                null_equality: NullEquality::NullEqualsNothing,
            }))
        }

        // Round-trip: keys only, no filter.
        {
            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
            assert_eq!(join.filter, None);
        }

        // Round-trip: filter only, no keys.
        {
            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![]);
            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
        }

        // Round-trip: keys and filter together.
        {
            let join = create_test_join(
                vec![(col("t1.a"), (col("t2.a")))],
                Some(col("t1.b").gt(col("t2.b"))),
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        }

        // Replacement: new expressions re-pair into `on` (two pairs) with
        // the trailing expression becoming the filter.
        {
            let join = create_test_join(
                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
                None,
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                vec![
                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
                    col("t1.b"),
                    col("t2.b"),
                    lit(true),
                ],
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(
                join.on,
                vec![
                    (
                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
                    ),
                    (col("t1.b"), (col("t2.b")))
                ]
            );
            assert_eq!(join.filter, Some(lit(true)));
        }

        Ok(())
    }
5486
    /// Exercises `Join::try_new` for every join type and checks the output
    /// schema: field count, names, nullability, and that the join metadata
    /// (`on`, `filter`, type, constraint, null equality) is preserved.
    #[test]
    fn test_join_try_new() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_scan.clone()),
                Arc::new(right_scan.clone()),
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
            )?;

            match join_type {
                // Left semi/anti joins output only the left side's columns.
                JoinType::LeftSemi | JoinType::LeftAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                }
                // Right semi/anti joins output only the right side's columns.
                JoinType::RightSemi | JoinType::RightAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from right table"
                    );
                }
                // Left mark join: left columns plus a non-nullable "mark" flag.
                JoinType::LeftMark => {
                    assert_eq!(join.schema.fields().len(), 3);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "mark",
                        "Third field should be the mark column"
                    );

                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                }
                // Inner/Left/Right/Full: all four columns, with the outer
                // side(s) becoming nullable.
                _ => {
                    assert_eq!(join.schema.fields().len(), 4);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "a",
                        "Third field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[3].name(),
                        "b",
                        "Fourth field should be 'b' from right table"
                    );

                    if join_type == JoinType::Left {
                        assert!(!fields[0].is_nullable());
                        assert!(!fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    } else if join_type == JoinType::Right {
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(!fields[2].is_nullable());
                        assert!(!fields[3].is_nullable());
                    } else if join_type == JoinType::Full {
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    }
                }
            }

            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
            assert_eq!(join.join_type, join_type);
            assert_eq!(join.join_constraint, JoinConstraint::On);
            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
        }

        Ok(())
    }
5633
5634 #[test]
5635 fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5636 let left_schema = Schema::new(vec![
5637 Field::new("id", DataType::Int32, false), Field::new("name", DataType::Utf8, false), Field::new("value", DataType::Int32, false), ]);
5641
5642 let right_schema = Schema::new(vec![
5643 Field::new("id", DataType::Int32, false), Field::new("category", DataType::Utf8, false), Field::new("value", DataType::Float64, true), ]);
5647
5648 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5649
5650 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5651
5652 {
5654 let join = Join::try_new(
5657 Arc::new(left_plan.clone()),
5658 Arc::new(right_plan.clone()),
5659 vec![(col("t1.id"), col("t2.id"))],
5660 None,
5661 JoinType::Inner,
5662 JoinConstraint::Using,
5663 NullEquality::NullEqualsNothing,
5664 )?;
5665
5666 let fields = join.schema.fields();
5667
5668 assert_eq!(fields.len(), 6);
5669
5670 assert_eq!(
5671 fields[0].name(),
5672 "id",
5673 "First field should be 'id' from left table"
5674 );
5675 assert_eq!(
5676 fields[1].name(),
5677 "name",
5678 "Second field should be 'name' from left table"
5679 );
5680 assert_eq!(
5681 fields[2].name(),
5682 "value",
5683 "Third field should be 'value' from left table"
5684 );
5685 assert_eq!(
5686 fields[3].name(),
5687 "id",
5688 "Fourth field should be 'id' from right table"
5689 );
5690 assert_eq!(
5691 fields[4].name(),
5692 "category",
5693 "Fifth field should be 'category' from right table"
5694 );
5695 assert_eq!(
5696 fields[5].name(),
5697 "value",
5698 "Sixth field should be 'value' from right table"
5699 );
5700
5701 assert_eq!(join.join_constraint, JoinConstraint::Using);
5702 }
5703
5704 {
5706 let join = Join::try_new(
5708 Arc::new(left_plan.clone()),
5709 Arc::new(right_plan.clone()),
5710 vec![(col("t1.id"), col("t2.id"))], Some(col("t1.value").lt(col("t2.value"))), JoinType::Inner,
5713 JoinConstraint::On,
5714 NullEquality::NullEqualsNothing,
5715 )?;
5716
5717 let fields = join.schema.fields();
5718 assert_eq!(fields.len(), 6);
5719
5720 assert_eq!(
5721 fields[0].name(),
5722 "id",
5723 "First field should be 'id' from left table"
5724 );
5725 assert_eq!(
5726 fields[1].name(),
5727 "name",
5728 "Second field should be 'name' from left table"
5729 );
5730 assert_eq!(
5731 fields[2].name(),
5732 "value",
5733 "Third field should be 'value' from left table"
5734 );
5735 assert_eq!(
5736 fields[3].name(),
5737 "id",
5738 "Fourth field should be 'id' from right table"
5739 );
5740 assert_eq!(
5741 fields[4].name(),
5742 "category",
5743 "Fifth field should be 'category' from right table"
5744 );
5745 assert_eq!(
5746 fields[5].name(),
5747 "value",
5748 "Sixth field should be 'value' from right table"
5749 );
5750
5751 assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5752 }
5753
5754 {
5756 let join = Join::try_new(
5757 Arc::new(left_plan.clone()),
5758 Arc::new(right_plan.clone()),
5759 vec![(col("t1.id"), col("t2.id"))],
5760 None,
5761 JoinType::Inner,
5762 JoinConstraint::On,
5763 NullEquality::NullEqualsNull,
5764 )?;
5765
5766 assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5767 }
5768
5769 Ok(())
5770 }
5771
5772 #[test]
5773 fn test_join_try_new_schema_validation() -> Result<()> {
5774 let left_schema = Schema::new(vec![
5775 Field::new("id", DataType::Int32, false),
5776 Field::new("name", DataType::Utf8, false),
5777 Field::new("value", DataType::Float64, true),
5778 ]);
5779
5780 let right_schema = Schema::new(vec![
5781 Field::new("id", DataType::Int32, false),
5782 Field::new("category", DataType::Utf8, true),
5783 Field::new("code", DataType::Int16, false),
5784 ]);
5785
5786 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5787
5788 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5789
5790 let join_types = vec![
5791 JoinType::Inner,
5792 JoinType::Left,
5793 JoinType::Right,
5794 JoinType::Full,
5795 ];
5796
5797 for join_type in join_types {
5798 let join = Join::try_new(
5799 Arc::new(left_plan.clone()),
5800 Arc::new(right_plan.clone()),
5801 vec![(col("t1.id"), col("t2.id"))],
5802 Some(col("t1.value").gt(lit(5.0))),
5803 join_type,
5804 JoinConstraint::On,
5805 NullEquality::NullEqualsNothing,
5806 )?;
5807
5808 let fields = join.schema.fields();
5809 assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");
5810
5811 for (i, field) in fields.iter().enumerate() {
5812 let expected_nullable = match (i, &join_type) {
5813 (0, JoinType::Right | JoinType::Full) => true, (1, JoinType::Right | JoinType::Full) => true, (2, _) => true, (3, JoinType::Left | JoinType::Full) => true, (4, _) => true, (5, JoinType::Left | JoinType::Full) => true, _ => false,
5824 };
5825
5826 assert_eq!(
5827 field.is_nullable(),
5828 expected_nullable,
5829 "Field {} ({}) nullability incorrect for {:?} join",
5830 i,
5831 field.name(),
5832 join_type
5833 );
5834 }
5835 }
5836
5837 let using_join = Join::try_new(
5838 Arc::new(left_plan.clone()),
5839 Arc::new(right_plan.clone()),
5840 vec![(col("t1.id"), col("t2.id"))],
5841 None,
5842 JoinType::Inner,
5843 JoinConstraint::Using,
5844 NullEquality::NullEqualsNothing,
5845 )?;
5846
5847 assert_eq!(
5848 using_join.schema.fields().len(),
5849 6,
5850 "USING join should have all fields"
5851 );
5852 assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5853
5854 Ok(())
5855 }
5856}