1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29 InvariantLevel, assert_always_invariants_at_current_node,
30 assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34 Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35 intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38 NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48 BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
49 LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50 WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61 Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62 FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63 ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64 assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
/// A `LogicalPlan` is a node in a tree of relational operators (such as
/// Projection or Filter). Each variant wraps a struct holding that
/// operator's expressions, inputs and output schema.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates an arbitrary list of expressions on its input.
    Projection(Projection),
    /// Filters rows from its input by a boolean predicate.
    Filter(Filter),
    /// Evaluates window functions over its input.
    Window(Window),
    /// Aggregates its input by group expressions / aggregate expressions.
    Aggregate(Aggregate),
    /// Sorts its input according to a list of sort expressions.
    Sort(Sort),
    /// Joins two inputs.
    Join(Join),
    /// Repartitions its input according to a partitioning scheme.
    Repartition(Repartition),
    /// Concatenates multiple inputs with compatible schemas.
    Union(Union),
    /// Produces rows from a table provider (leaf node).
    TableScan(TableScan),
    /// Produces no rows, or a single empty row (leaf node).
    EmptyRelation(EmptyRelation),
    /// A subquery referenced from an expression.
    Subquery(Subquery),
    /// Aliases the relation produced by its input.
    SubqueryAlias(SubqueryAlias),
    /// Skips and/or limits the number of rows from its input.
    Limit(Limit),
    /// A SQL statement (e.g. PREPARE / EXECUTE / SET).
    Statement(Statement),
    /// Literal rows (`VALUES` clause) (leaf node).
    Values(Values),
    /// Produces a textual explanation of its wrapped plan.
    Explain(Explain),
    /// Runs its input and reports execution metrics.
    Analyze(Analyze),
    /// A user-defined (extension) logical node.
    Extension(Extension),
    /// Removes duplicate rows, optionally keyed by `DISTINCT ON` expressions.
    Distinct(Distinct),
    /// A data-modification statement (INSERT / UPDATE / DELETE).
    Dml(DmlStatement),
    /// A data-definition statement (CREATE TABLE / VIEW / ...).
    Ddl(DdlStatement),
    /// Copies its input to an external destination (`COPY TO`).
    Copy(CopyTo),
    /// Describes the schema of a table (leaf node).
    DescribeTable(DescribeTable),
    /// Unnests list/struct columns of its input into rows.
    Unnest(Unnest),
    /// A recursive (WITH RECURSIVE) query with static and recursive terms.
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296 fn default() -> Self {
297 LogicalPlan::EmptyRelation(EmptyRelation {
298 produce_one_row: false,
299 schema: Arc::new(DFSchema::empty()),
300 })
301 }
302}
303
impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    /// Applies `f` to this plan. A `LogicalPlan` is a "container" of exactly
    /// one element: itself.
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        f(self)
    }

    /// Maps this plan through `f`, consuming it and returning the
    /// (possibly transformed) result.
    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        f(self)
    }
}
319
320impl LogicalPlan {
    /// Returns a reference to this plan's output schema.
    ///
    /// Nodes that do not change their input's schema (e.g. `Filter`, `Sort`,
    /// `Limit`, `Repartition`) delegate to their input; others return the
    /// schema stored on (or derived for) the node itself.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            // TableScan's schema reflects any column projection applied.
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            // A recursive query's schema is that of its non-recursive term.
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.schema()
            }
        }
    }
359
360 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
363 match self {
364 LogicalPlan::Window(_)
365 | LogicalPlan::Projection(_)
366 | LogicalPlan::Aggregate(_)
367 | LogicalPlan::Unnest(_)
368 | LogicalPlan::Join(_) => self
369 .inputs()
370 .iter()
371 .map(|input| input.schema().as_ref())
372 .collect(),
373 _ => vec![],
374 }
375 }
376
377 pub fn explain_schema() -> SchemaRef {
379 SchemaRef::new(Schema::new(vec![
380 Field::new("plan_type", DataType::Utf8, false),
381 Field::new("plan", DataType::Utf8, false),
382 ]))
383 }
384
385 pub fn describe_schema() -> Schema {
387 Schema::new(vec![
388 Field::new("column_name", DataType::Utf8, false),
389 Field::new("data_type", DataType::Utf8, false),
390 Field::new("is_nullable", DataType::Utf8, false),
391 ])
392 }
393
394 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
411 let mut exprs = vec![];
412 self.apply_expressions(|e| {
413 exprs.push(e.clone());
414 Ok(TreeNodeRecursion::Continue)
415 })
416 .unwrap();
418 exprs
419 }
420
421 pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
424 let mut exprs = vec![];
425 self.apply_expressions(|e| {
426 find_out_reference_exprs(e).into_iter().for_each(|e| {
427 if !exprs.contains(&e) {
428 exprs.push(e)
429 }
430 });
431 Ok(TreeNodeRecursion::Continue)
432 })
433 .unwrap();
435 self.inputs()
436 .into_iter()
437 .flat_map(|child| child.all_out_ref_exprs())
438 .for_each(|e| {
439 if !exprs.contains(&e) {
440 exprs.push(e)
441 }
442 });
443 exprs
444 }
445
446 pub fn inputs(&self) -> Vec<&LogicalPlan> {
450 match self {
451 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
452 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
453 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
454 LogicalPlan::Window(Window { input, .. }) => vec![input],
455 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
456 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
457 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
458 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
459 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
460 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
461 LogicalPlan::Extension(extension) => extension.node.inputs(),
462 LogicalPlan::Union(Union { inputs, .. }) => {
463 inputs.iter().map(|arc| arc.as_ref()).collect()
464 }
465 LogicalPlan::Distinct(
466 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
467 ) => vec![input],
468 LogicalPlan::Explain(explain) => vec![&explain.plan],
469 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
470 LogicalPlan::Dml(write) => vec![&write.input],
471 LogicalPlan::Copy(copy) => vec![©.input],
472 LogicalPlan::Ddl(ddl) => ddl.inputs(),
473 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
474 LogicalPlan::RecursiveQuery(RecursiveQuery {
475 static_term,
476 recursive_term,
477 ..
478 }) => vec![static_term, recursive_term],
479 LogicalPlan::Statement(stmt) => stmt.inputs(),
480 LogicalPlan::TableScan { .. }
482 | LogicalPlan::EmptyRelation { .. }
483 | LogicalPlan::Values { .. }
484 | LogicalPlan::DescribeTable(_) => vec![],
485 }
486 }
487
    /// Returns the sets of columns joined via `USING` constraints anywhere in
    /// this plan tree (subqueries included), one `HashSet` per USING join.
    ///
    /// # Errors
    /// Returns an internal error if a USING join key is not a plain column.
    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
        let mut using_columns: Vec<HashSet<Column>> = vec![];

        self.apply_with_subqueries(|plan| {
            // Only joins declared with the USING constraint contribute.
            if let LogicalPlan::Join(Join {
                join_constraint: JoinConstraint::Using,
                on,
                ..
            }) = plan
            {
                let columns =
                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
                        // Both sides of a USING key must resolve to columns.
                        let Some(l) = l.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {l:?}"
                            );
                        };
                        let Some(r) = r.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {r:?}"
                            );
                        };
                        accumu.insert(l.to_owned());
                        accumu.insert(r.to_owned());
                        Result::<_, DataFusionError>::Ok(accumu)
                    })?;
                using_columns.push(columns);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;

        Ok(using_columns)
    }
523
    /// Returns the first output expression of this plan, or `Ok(None)` for
    /// nodes whose head expression cannot be determined (e.g. subqueries,
    /// DDL/DML, leaf statements).
    ///
    /// NOTE(review): the `[0]` indexing below assumes the respective
    /// expression lists are non-empty — verify callers uphold this.
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                // Group expressions come first in an aggregate's output, so
                // only fall back to aggregates when there are no groups.
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // Schema-preserving nodes delegate to their input.
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    // If the left side contributes no fields, the head
                    // expression must come from the right side.
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                // Semi/anti/mark joins output only one side's columns.
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Re-qualify the input's head expression with the alias.
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
600
    /// Rebuilds this node's output schema from its current expressions and
    /// inputs, returning the updated plan.
    ///
    /// Useful after a rewrite has changed inputs or expressions so that the
    /// cached schema may be stale. Nodes whose schema is not cached (or not
    /// derived from expressions) are returned unchanged.
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            // Rebuild via the node's own constructor, which re-derives the
            // schema from expressions + input.
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            LogicalPlan::Values(Values { schema, values }) => {
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter { predicate, input }) => {
                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equality,
                null_aware,
            }) => {
                // Derive the join schema from the (possibly new) inputs.
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                // Strip aliases from equi-join keys; only the underlying
                // expressions matter for the join condition.
                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equality,
                    null_aware,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            LogicalPlan::Extension(Extension { node }) => {
                // Round-trip the node through its own constructor so the
                // user-defined node can recompute whatever it caches.
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                // Keep the stored schema only while its arity still matches
                // the inputs; otherwise rebuild the union from scratch.
                let first_input_schema = inputs[0].schema();
                if schema.fields().len() == first_input_schema.fields().len() {
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }
758
    /// Returns a copy of this plan with its expressions replaced by `expr`
    /// and its inputs replaced by `inputs`.
    ///
    /// `expr` must have the same length and order as the list returned by
    /// this node's expression accessor, and `inputs` must match the arity of
    /// this node's children; mismatches produce internal errors (or panics
    /// via the `assert!`s on some variants).
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    // Re-chunk the flat expression list back into rows of
                    // `schema.fields().len()` values each.
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                // Round-robin carries no expressions; only the input changes.
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // `expr` holds group expressions followed by aggregate
                // expressions; split at the group boundary.
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    // Pair each new expression with the old sort options
                    // (direction / nulls ordering) at the same position.
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                null_aware,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                // Expression layout: flattened (left, right) equi-join pairs,
                // optionally followed by a single join filter at the end.
                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                // Consume expressions two at a time to rebuild the pairs.
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!(
                            "Expected a pair of expressions to construct the join on expression"
                        )?
                    };

                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                    null_aware: *null_aware,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                assert_eq_or_internal_err!(
                    old_expr_len,
                    expr.len(),
                    "Invalid number of new Limit expressions: expected {}, got {}",
                    old_expr_len,
                    expr.len()
                );
                // `expr` lists skip before fetch, so popping from the back
                // yields fetch first, then skip.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        // NOTE(review): constraints are reset to default here
                        // rather than copied from `self` — confirm intended.
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // Keep the stored schema only while its arity still matches
                // the new inputs; otherwise adopt the first input's schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        // `expr` layout: on_expr ++ select_expr ++ sort_expr.
                        // Peel off from the back: sort first, then select,
                        // leaving the on-expressions in `expr`.
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(
                            sort_expr.is_empty(),
                            "with_new_exprs for Distinct does not support sort expressions"
                        );
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name, fields, ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    fields: fields.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            // Remaining leaf-like nodes carry neither expressions nor inputs.
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1156
    /// Checks this plan node's invariants at the requested [`InvariantLevel`],
    /// returning an error if any invariant does not hold.
    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        match check {
            // Invariants that must hold for any logical plan.
            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
            // Stricter invariants that must hold before execution.
            InvariantLevel::Executable => assert_executable_invariants(self),
        }
    }
1164
1165 #[inline]
1167 #[expect(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1169 assert_or_internal_err!(
1170 expr.is_empty(),
1171 "{self:?} should have no exprs, got {:?}",
1172 expr
1173 );
1174 Ok(())
1175 }
1176
1177 #[inline]
1179 #[expect(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1181 assert_or_internal_err!(
1182 inputs.is_empty(),
1183 "{self:?} should have no inputs, got: {:?}",
1184 inputs
1185 );
1186 Ok(())
1187 }
1188
1189 #[inline]
1191 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1192 assert_eq_or_internal_err!(
1193 expr.len(),
1194 1,
1195 "{self:?} should have exactly one expr, got {:?}",
1196 &expr
1197 );
1198 Ok(expr.remove(0))
1199 }
1200
1201 #[inline]
1203 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1204 assert_eq_or_internal_err!(
1205 inputs.len(),
1206 1,
1207 "{self:?} should have exactly one input, got {:?}",
1208 &inputs
1209 );
1210 Ok(inputs.remove(0))
1211 }
1212
1213 #[inline]
1215 fn only_two_inputs(
1216 &self,
1217 mut inputs: Vec<LogicalPlan>,
1218 ) -> Result<(LogicalPlan, LogicalPlan)> {
1219 assert_eq_or_internal_err!(
1220 inputs.len(),
1221 2,
1222 "{self:?} should have exactly two inputs, got {:?}",
1223 &inputs
1224 );
1225 let right = inputs.remove(1);
1226 let left = inputs.remove(0);
1227 Ok((left, right))
1228 }
1229
1230 pub fn with_param_values(
1283 self,
1284 param_values: impl Into<ParamValues>,
1285 ) -> Result<LogicalPlan> {
1286 let param_values = param_values.into();
1287 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1288
1289 Ok(
1291 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1292 plan_with_values
1293 {
1294 param_values.verify_fields(&prepare_lp.fields)?;
1295 Arc::unwrap_or_clone(prepare_lp.input)
1297 } else {
1298 plan_with_values
1299 },
1300 )
1301 }
1302
1303 pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1308 match self {
1309 LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1310 LogicalPlan::Filter(filter) => {
1311 if filter.is_scalar() {
1312 Some(1)
1313 } else {
1314 filter.input.max_rows()
1315 }
1316 }
1317 LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1318 LogicalPlan::Aggregate(Aggregate {
1319 input, group_expr, ..
1320 }) => {
1321 if group_expr
1323 .iter()
1324 .all(|expr| matches!(expr, Expr::Literal(_, _)))
1325 {
1326 Some(1)
1327 } else {
1328 input.max_rows()
1329 }
1330 }
1331 LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1332 match (fetch, input.max_rows()) {
1333 (Some(fetch_limit), Some(input_max)) => {
1334 Some(input_max.min(*fetch_limit))
1335 }
1336 (Some(fetch_limit), None) => Some(*fetch_limit),
1337 (None, Some(input_max)) => Some(input_max),
1338 (None, None) => None,
1339 }
1340 }
1341 LogicalPlan::Join(Join {
1342 left,
1343 right,
1344 join_type,
1345 ..
1346 }) => match join_type {
1347 JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1348 JoinType::Left | JoinType::Right | JoinType::Full => {
1349 match (left.max_rows()?, right.max_rows()?, join_type) {
1350 (0, 0, _) => Some(0),
1351 (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1352 (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1353 (left_max, right_max, _) => Some(left_max * right_max),
1354 }
1355 }
1356 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1357 left.max_rows()
1358 }
1359 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1360 right.max_rows()
1361 }
1362 },
1363 LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1364 LogicalPlan::Union(Union { inputs, .. }) => {
1365 inputs.iter().try_fold(0usize, |mut acc, plan| {
1366 acc += plan.max_rows()?;
1367 Some(acc)
1368 })
1369 }
1370 LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1371 LogicalPlan::EmptyRelation(_) => Some(0),
1372 LogicalPlan::RecursiveQuery(_) => None,
1373 LogicalPlan::Subquery(_) => None,
1374 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1375 LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1376 Ok(FetchType::Literal(s)) => s,
1377 _ => None,
1378 },
1379 LogicalPlan::Distinct(
1380 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1381 ) => input.max_rows(),
1382 LogicalPlan::Values(v) => Some(v.values.len()),
1383 LogicalPlan::Unnest(_) => None,
1384 LogicalPlan::Ddl(_)
1385 | LogicalPlan::Explain(_)
1386 | LogicalPlan::Analyze(_)
1387 | LogicalPlan::Dml(_)
1388 | LogicalPlan::Copy(_)
1389 | LogicalPlan::DescribeTable(_)
1390 | LogicalPlan::Statement(_)
1391 | LogicalPlan::Extension(_) => None,
1392 }
1393 }
1394
    /// Returns the number of rows this plan skips, if any.
    ///
    /// Only `Limit` can skip rows; a literal skip of 0 or a non-literal skip
    /// expression is reported as `None`. The match is deliberately exhaustive
    /// so adding a new variant forces this method to be revisited.
    pub fn skip(&self) -> Result<Option<usize>> {
        match self {
            LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
                SkipType::Literal(0) => Ok(None),
                SkipType::Literal(n) => Ok(Some(n)),
                SkipType::UnsupportedExpr => Ok(None),
            },
            LogicalPlan::Sort(_) => Ok(None),
            LogicalPlan::TableScan(_) => Ok(None),
            LogicalPlan::Projection(_) => Ok(None),
            LogicalPlan::Filter(_) => Ok(None),
            LogicalPlan::Window(_) => Ok(None),
            LogicalPlan::Aggregate(_) => Ok(None),
            LogicalPlan::Join(_) => Ok(None),
            LogicalPlan::Repartition(_) => Ok(None),
            LogicalPlan::Union(_) => Ok(None),
            LogicalPlan::EmptyRelation(_) => Ok(None),
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::SubqueryAlias(_) => Ok(None),
            LogicalPlan::Statement(_) => Ok(None),
            LogicalPlan::Values(_) => Ok(None),
            LogicalPlan::Explain(_) => Ok(None),
            LogicalPlan::Analyze(_) => Ok(None),
            LogicalPlan::Extension(_) => Ok(None),
            LogicalPlan::Distinct(_) => Ok(None),
            LogicalPlan::Dml(_) => Ok(None),
            LogicalPlan::Ddl(_) => Ok(None),
            LogicalPlan::Copy(_) => Ok(None),
            LogicalPlan::DescribeTable(_) => Ok(None),
            LogicalPlan::Unnest(_) => Ok(None),
            LogicalPlan::RecursiveQuery(_) => Ok(None),
        }
    }
1432
    /// Returns the number of rows this plan fetches (limits to), if any.
    ///
    /// Only `Sort`, `TableScan` and `Limit` can carry a fetch; a non-literal
    /// limit expression is reported as `None`. The match is deliberately
    /// exhaustive so adding a new variant forces this method to be revisited.
    pub fn fetch(&self) -> Result<Option<usize>> {
        match self {
            LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
            LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
            LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
                FetchType::Literal(s) => Ok(s),
                FetchType::UnsupportedExpr => Ok(None),
            },
            LogicalPlan::Projection(_) => Ok(None),
            LogicalPlan::Filter(_) => Ok(None),
            LogicalPlan::Window(_) => Ok(None),
            LogicalPlan::Aggregate(_) => Ok(None),
            LogicalPlan::Join(_) => Ok(None),
            LogicalPlan::Repartition(_) => Ok(None),
            LogicalPlan::Union(_) => Ok(None),
            LogicalPlan::EmptyRelation(_) => Ok(None),
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::SubqueryAlias(_) => Ok(None),
            LogicalPlan::Statement(_) => Ok(None),
            LogicalPlan::Values(_) => Ok(None),
            LogicalPlan::Explain(_) => Ok(None),
            LogicalPlan::Analyze(_) => Ok(None),
            LogicalPlan::Extension(_) => Ok(None),
            LogicalPlan::Distinct(_) => Ok(None),
            LogicalPlan::Dml(_) => Ok(None),
            LogicalPlan::Ddl(_) => Ok(None),
            LogicalPlan::Copy(_) => Ok(None),
            LogicalPlan::DescribeTable(_) => Ok(None),
            LogicalPlan::Unnest(_) => Ok(None),
            LogicalPlan::RecursiveQuery(_) => Ok(None),
        }
    }
1470
1471 pub fn contains_outer_reference(&self) -> bool {
1473 let mut contains = false;
1474 self.apply_expressions(|expr| {
1475 Ok(if expr.contains_outer() {
1476 contains = true;
1477 TreeNodeRecursion::Stop
1478 } else {
1479 TreeNodeRecursion::Continue
1480 })
1481 })
1482 .unwrap();
1483 contains
1484 }
1485
1486 pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1494 match self {
1495 LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1496 .output_expressions()?
1497 .into_iter()
1498 .zip(self.schema().columns())
1499 .collect()),
1500 LogicalPlan::Window(Window {
1501 window_expr,
1502 input,
1503 schema,
1504 }) => {
1505 let mut output_exprs = input.columnized_output_exprs()?;
1513 let input_len = input.schema().fields().len();
1514 output_exprs.extend(
1515 window_expr
1516 .iter()
1517 .zip(schema.columns().into_iter().skip(input_len)),
1518 );
1519 Ok(output_exprs)
1520 }
1521 _ => Ok(vec![]),
1522 }
1523 }
1524}
1525
impl LogicalPlan {
    /// Return a copy of this plan with every [`Expr::Placeholder`] (e.g.
    /// `$1`, `$name`) replaced by the matching value from `param_values`,
    /// descending into subqueries as well.
    ///
    /// After substitution each rewritten node's schema is recomputed so the
    /// types of the inserted literals propagate through the plan.
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // Infer concrete types for untyped placeholders from the
                // schema; also learn whether any placeholder is present.
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    // No placeholder: skip the rewrite entirely.
                    Ok(Transformed::no(e))
                } else {
                    // Substituting a literal changes the expression's display
                    // name; remember the original so output names are stable.
                    let original_name = name_preserver.save(&e);
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let (value, metadata) = param_values
                                .get_placeholders_with_values(&id)?
                                .into_inner();
                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    // Restore the pre-substitution expression name.
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })?
            .map_data(|plan| plan.update_schema_data_type())
        })
        .map(|res| res.data)
    }

    /// Rebuild this node's schema after parameter substitution so that the
    /// substituted literals' data types are reflected in it.
    fn update_schema_data_type(self) -> Result<LogicalPlan> {
        match self {
            // `Values` derives its schema from the literal rows themselves,
            // so rebuild it from scratch via the builder.
            LogicalPlan::Values(Values { values, schema: _ }) => {
                LogicalPlanBuilder::values(values)?.build()
            }
            // Every other node can recompute its schema generically.
            plan => plan.recompute_schema(),
        }
    }

    /// Collect the ids of all placeholders that appear anywhere in this
    /// plan, including inside subqueries.
    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
        let mut param_names = HashSet::new();
        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
                        param_names.insert(id.clone());
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_names)
    }

    /// Return the [`DataType`] inferred for each placeholder id, or `None`
    /// for placeholders whose type could not be determined.
    pub fn get_parameter_types(
        &self,
    ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
        let mut parameter_fields = self.get_parameter_fields()?;
        Ok(parameter_fields
            .drain()
            .map(|(name, maybe_field)| {
                (name, maybe_field.map(|field| field.data_type().clone()))
            })
            .collect())
    }

    /// Return the full [`FieldRef`] (type + metadata) inferred for each
    /// placeholder id, erroring if one id is seen with conflicting types.
    pub fn get_parameter_fields(
        &self,
    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();

        self.apply_with_subqueries(|plan| {
            plan.apply_expressions(|expr| {
                expr.apply(|expr| {
                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
                        let prev = param_types.get(id);
                        match (prev, field) {
                            // Already recorded with a type: new occurrence
                            // must agree (type and storage metadata).
                            (Some(Some(prev)), Some(field)) => {
                                check_metadata_with_storage_equal(
                                    (field.data_type(), Some(field.metadata())),
                                    (prev.data_type(), Some(prev.metadata())),
                                    "parameter",
                                    &format!(": Conflicting types for id {id}"),
                                )?;
                            }
                            // First typed occurrence (or upgrades an
                            // untyped entry).
                            (_, Some(field)) => {
                                param_types.insert(id.clone(), Some(Arc::clone(field)));
                            }
                            // NOTE(review): an untyped occurrence here also
                            // overwrites a previously recorded type with
                            // `None` — confirm this is intended.
                            _ => {
                                param_types.insert(id.clone(), None);
                            }
                        }
                    }
                    Ok(TreeNodeRecursion::Continue)
                })
            })
        })
        .map(|_| param_types)
    }

    /// Return a displayable that renders the plan as an indented,
    /// one-node-per-line tree (without schema information).
    pub fn display_indent(&self) -> impl Display + '_ {
        // Wrapper exists only to attach a `Display` impl to a borrowed plan.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let with_schema = false;
                let mut visitor = IndentVisitor::new(f, with_schema);
                match self.0.visit_with_subqueries(&mut visitor) {
                    Ok(_) => Ok(()),
                    // `fmt::Error` carries no detail; the cause is dropped.
                    Err(_) => Err(fmt::Error),
                }
            }
        }
        Wrapper(self)
    }

    /// Like [`Self::display_indent`], but each node also shows its schema.
    pub fn display_indent_schema(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let with_schema = true;
                let mut visitor = IndentVisitor::new(f, with_schema);
                match self.0.visit_with_subqueries(&mut visitor) {
                    Ok(_) => Ok(()),
                    Err(_) => Err(fmt::Error),
                }
            }
        }
        Wrapper(self)
    }

    /// Render the plan through [`PgJsonVisitor`] with schemas enabled
    /// (a Postgres-style JSON explain rendering).
    pub fn display_pg_json(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let mut visitor = PgJsonVisitor::new(f);
                visitor.with_schema(true);
                match self.0.visit_with_subqueries(&mut visitor) {
                    Ok(_) => Ok(()),
                    Err(_) => Err(fmt::Error),
                }
            }
        }
        Wrapper(self)
    }

    /// Render the plan as a Graphviz graph containing two renderings of the
    /// tree: one plain, one annotated with schemas.
    pub fn display_graphviz(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                let mut visitor = GraphvizVisitor::new(f);

                visitor.start_graph()?;

                // First pass: node descriptions only.
                visitor.pre_visit_plan("LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                // Second pass: same tree, now including schemas.
                visitor.set_with_schema(true);
                visitor.pre_visit_plan("Detailed LogicalPlan")?;
                self.0
                    .visit_with_subqueries(&mut visitor)
                    .map_err(|_| fmt::Error)?;
                visitor.post_visit_plan()?;

                visitor.end_graph()?;
                Ok(())
            }
        }
        Wrapper(self)
    }

    /// Return a displayable one-line summary of THIS node only (children
    /// are not rendered; see [`Self::display_indent`] for the whole tree).
    pub fn display(&self) -> impl Display + '_ {
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(EmptyRelation {
                        produce_one_row,
                        schema: _,
                    }) => {
                        let rows = if *produce_one_row { 1 } else { 0 };
                        write!(f, "EmptyRelation: rows={rows}")
                    }
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
                    }
                    // Show at most the first 5 rows, then an ellipsis.
                    LogicalPlan::Values(Values { values, .. }) => {
                        let str_values: Vec<_> = values
                            .iter()
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        let eclipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
                    }

                    LogicalPlan::TableScan(TableScan {
                        source,
                        table_name,
                        projection,
                        filters,
                        fetch,
                        ..
                    }) => {
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            // Bucket the filters by how much of each the
                            // source can evaluate itself; if the source
                            // errors, no buckets are printed at all.
                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { expr, .. }) => {
                        write!(f, "Projection:")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ",")?;
                            }
                            write!(f, " {expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(
                            f,
                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
                            file_type.get_ext()
                        )
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: expr, ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window { window_expr, .. }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        group_expr,
                        aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        // An inner join with no keys and no filter is a
                        // cross join; render it as such.
                        let join_type = if filter.is_none()
                            && keys.is_empty()
                            && *join_type == JoinType::Inner
                        {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(f, "{join_type} Join:",)?;
                                if !join_expr.is_empty() || !filter_expr.is_empty() {
                                    write!(
                                        f,
                                        " {}{}",
                                        join_expr.join(", "),
                                        filter_expr
                                    )?;
                                }
                                Ok(())
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        // Fall back to the raw expression text when skip /
                        // fetch are not literal integers.
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit
                                .skip
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit
                                .fetch
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            if let Some(sort_expr) = sort_expr {
                                expr_vec_fmt!(sort_expr)
                            } else {
                                "".to_string()
                            },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices,
                        ..
                    }) => {
                        let input_columns = plan.schema().columns();
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i, unnest_info)| {
                                format!(
                                    "{}|depth={}",
                                    &input_columns[*i].to_string(),
                                    unnest_info.depth
                                )
                            })
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        write!(
                            f,
                            "Unnest: lists[{}] structs[{}]",
                            expr_vec_fmt!(list_type_columns),
                            expr_vec_fmt!(struct_type_columns)
                        )
                    }
                }
            }
        }
        Wrapper(self)
    }
}
2190
2191impl Display for LogicalPlan {
2192 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2193 self.display_indent().fmt(f)
2194 }
2195}
2196
2197impl ToStringifiedPlan for LogicalPlan {
2198 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2199 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2200 }
2201}
2202
/// A relation that scans no table: produces either zero rows or a single
/// row (when `produce_one_row` is true), with the given output schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to emit one row (with no source data) instead of zero rows.
    pub produce_one_row: bool,
    /// The output schema of this relation.
    pub schema: DFSchemaRef,
}
2213
2214impl PartialOrd for EmptyRelation {
2216 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2217 self.produce_one_row
2218 .partial_cmp(&other.produce_one_row)
2219 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2221 }
2222}
2223
/// A recursive query (`WITH RECURSIVE`): a static (base) term plus a
/// recursive term that refers back to `name`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name the recursive term uses to refer to this query.
    pub name: String,
    /// The non-recursive base case of the query.
    pub static_term: Arc<LogicalPlan>,
    /// The term that references `name` and is iterated.
    pub recursive_term: Arc<LogicalPlan>,
    /// Whether results are de-duplicated (presumably `UNION` vs
    /// `UNION ALL` semantics — confirm against the executor).
    pub is_distinct: bool,
}
2259
/// Literal rows, as in `VALUES (1, 'a'), (2, 'b')`: one inner `Vec<Expr>`
/// per row, all conforming to `schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The output schema describing each row's columns.
    pub schema: DFSchemaRef,
    /// The rows of literal expressions.
    pub values: Vec<Vec<Expr>>,
}
2270
2271impl PartialOrd for Values {
2273 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2274 self.values
2275 .partial_cmp(&other.values)
2276 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2278 }
2279}
2280
/// Evaluates a list of expressions against `input`, producing one output
/// column per expression, described by `schema`.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
#[non_exhaustive]
pub struct Projection {
    /// The expressions to evaluate, one per output field.
    pub expr: Vec<Expr>,
    /// The child plan the expressions are evaluated against.
    pub input: Arc<LogicalPlan>,
    /// The schema of the projection's output.
    pub schema: DFSchemaRef,
}
2294
2295impl PartialOrd for Projection {
2297 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2298 match self.expr.partial_cmp(&other.expr) {
2299 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2300 cmp => cmp,
2301 }
2302 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2304 }
2305}
2306
impl Projection {
    /// Create a new [`Projection`] over `input`, deriving the output schema
    /// from `expr`.
    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let projection_schema = projection_schema(&input, &expr)?;
        Self::try_new_with_schema(expr, input, projection_schema)
    }

    /// Create a new [`Projection`] with a caller-supplied output `schema`.
    ///
    /// Errors when the number of expressions does not match the number of
    /// schema fields; wildcard expressions are exempt from this arity check
    /// because a single wildcard expands to many fields later.
    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        // `Expr::Wildcard` is deprecated but may still reach this point.
        #[expect(deprecated)]
        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
            && expr.len() != schema.fields().len()
        {
            return plan_err!(
                "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
                expr.len(),
                schema.fields().len()
            );
        }
        Ok(Self {
            expr,
            input,
            schema,
        })
    }

    /// Create an identity projection that re-emits every column of `schema`
    /// as a plain column expression.
    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
        Self {
            expr,
            input,
            schema,
        }
    }
}
2347
2348pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2368 let metadata = input.schema().metadata().clone();
2370
2371 let schema =
2373 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2374 .with_functional_dependencies(calc_func_dependencies_for_project(
2375 exprs, input,
2376 )?)?;
2377
2378 Ok(Arc::new(schema))
2379}
2380
/// Assigns a new qualifier (`alias`) to the output of `input`, as in
/// `SELECT ... FROM (...) AS alias`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The plan being aliased.
    pub input: Arc<LogicalPlan>,
    /// The qualifier applied to every output column.
    pub alias: TableReference,
    /// The output schema, with all fields qualified by `alias`.
    pub schema: DFSchemaRef,
}
2393
impl SubqueryAlias {
    /// Wrap `plan` under a new qualifier `alias`, requalifying every output
    /// column with that alias.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();

        // Duplicate unqualified field names would collide once a single new
        // qualifier is applied, so compute disambiguating aliases first.
        let aliases = unique_field_aliases(plan.schema().fields());
        let is_projection_needed = aliases.iter().any(Option::is_some);

        // Insert a renaming projection only when at least one field needs a
        // fresh unique name; otherwise keep the plan untouched.
        let plan = if is_projection_needed {
            let projection_expressions = aliases
                .iter()
                .zip(plan.schema().iter())
                .map(|(alias, (qualifier, field))| {
                    let column =
                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
                    match alias {
                        None => column,
                        Some(alias) => {
                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
                        }
                    }
                })
                .collect();
            let projection = Projection::try_new(projection_expressions, plan)?;
            Arc::new(LogicalPlan::Projection(projection))
        } else {
            plan
        };

        // Rebuild the schema so every field is qualified by `alias`, while
        // carrying over metadata and functional dependencies.
        let fields = plan.schema().fields().clone();
        let meta_data = plan.schema().metadata().clone();
        let func_dependencies = plan.schema().functional_dependencies().clone();

        // Drop the old qualifiers, then requalify everything with `alias`.
        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
        let schema = schema.as_arrow();

        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2450
2451impl PartialOrd for SubqueryAlias {
2453 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2454 match self.input.partial_cmp(&other.input) {
2455 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2456 cmp => cmp,
2457 }
2458 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2460 }
2461}
2462
/// Filters rows of `input`, keeping only those for which `predicate`
/// evaluates to true.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The boolean predicate applied to each input row.
    pub predicate: Expr,
    /// The child plan supplying the rows.
    pub input: Arc<LogicalPlan>,
}
2482
impl Filter {
    /// Create a new [`Filter`] over `input`, validating that `predicate` is
    /// boolean-typed where its type can be resolved.
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Deprecated alias of [`Self::try_new`].
    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Returns true for types a filter predicate may evaluate to:
    /// `Boolean`, `Null`, or a dictionary whose value type qualifies
    /// (checked recursively).
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::Boolean | DataType::Null => true,
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        // Reject only when the type resolves AND is not boolean-like; a
        // predicate whose type cannot be resolved here is accepted as-is.
        if let Ok(predicate_type) = predicate.get_type(input.schema())
            && !Filter::is_allowed_filter_type(&predicate_type)
        {
            return plan_err!(
                "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
            );
        }

        Ok(Self {
            // Aliases carry no meaning inside a predicate; strip them.
            predicate: predicate.unalias_nested().data,
            input,
        })
    }

    /// Heuristic: does this filter guarantee at most one output row?
    ///
    /// Returns true when the conjunctive equality predicates pin every
    /// source column of some usable unique key — non-nullable sources,
    /// `Dependency::Single` mode, and determining the entire schema — to a
    /// constant value.
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        // A key is only usable if no source column can be NULL (NULL never
        // satisfies equality) and it determines every field of the schema.
        let functional_dependencies = self.input.schema().functional_dependencies();
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect the columns pinned by `col = <non-column>` (in either
        // operand order) conjuncts of the predicate.
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // A trivial self-equality pins nothing.
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    // Column-to-column equality does not pin a value.
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // At most one row if some unique key has all sources pinned.
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2596
/// Evaluates window functions over `input`, appending one output column
/// per window expression after the input's columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The child plan supplying the rows.
    pub input: Arc<LogicalPlan>,
    /// The window function expressions, one per appended output column.
    pub window_expr: Vec<Expr>,
    /// Output schema: input fields followed by the window fields.
    pub schema: DFSchemaRef,
}
2620
impl Window {
    /// Create a new [`Window`] over `input`, appending one field per window
    /// expression after the input's fields and deriving the combined
    /// schema's functional dependencies.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        // Output fields = input fields followed by the window-expr fields.
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Existing dependencies stay valid; widen their target range so
        // they also cover the newly appended window fields.
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // An un-partitioned `row_number` is unique per row, so it becomes a
        // new single-mode determinant for the whole output. The function is
        // identified by UDWF name — assumes standard row_number semantics.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                let Expr::WindowFunction(window_fun) = expr else {
                    return None;
                };
                let WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                } = window_fun.as_ref()
                else {
                    return None;
                };
                if udwf.name() == "row_number" && partition_by.is_empty() {
                    // Window columns start right after the input fields.
                    Some(idx + input_len)
                } else {
                    None
                }
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            // Each new determinant determines every output column.
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        // A FILTER clause is only accepted on aggregate functions used as
        // window functions; reject it on any other window function kind.
        if let Some(e) = window_expr.iter().find(|e| {
            matches!(
                e,
                Expr::WindowFunction(wf)
                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
                        && wf.params.filter.is_some()
            )
        }) {
            return plan_err!(
                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
            );
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Create a new [`Window`] with a pre-computed `schema`, validating only
    /// that its field count equals input fields plus window expressions.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        let input_fields_count = input.schema().fields().len();
        if schema.fields().len() != input_fields_count + window_expr.len() {
            return plan_err!(
                "Window schema has wrong number of fields. Expected {} got {}",
                input_fields_count + window_expr.len(),
                schema.fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2730
2731impl PartialOrd for Window {
2733 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2734 match self.input.partial_cmp(&other.input)? {
2735 Ordering::Equal => {} not_equal => return Some(not_equal),
2737 }
2738
2739 match self.window_expr.partial_cmp(&other.window_expr)? {
2740 Ordering::Equal => {} not_equal => return Some(not_equal),
2742 }
2743
2744 if self == other {
2747 Some(Ordering::Equal)
2748 } else {
2749 None
2750 }
2751 }
2752}
2753
/// Scans rows from a table provider, optionally projecting a subset of
/// columns, pushing down filters, and limiting the row count.
#[derive(Clone)]
pub struct TableScan {
    /// The (possibly qualified) name of the scanned table.
    pub table_name: TableReference,
    /// The table provider supplying the data and its schema.
    pub source: Arc<dyn TableSource>,
    /// Optional indices of source columns to project.
    pub projection: Option<Vec<usize>>,
    /// The schema after projection, qualified by `table_name`.
    pub projected_schema: DFSchemaRef,
    /// Filters that may be pushed down to the source.
    pub filters: Vec<Expr>,
    /// Optional maximum number of rows to produce.
    pub fetch: Option<usize>,
}
2770
2771impl Debug for TableScan {
2772 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2773 f.debug_struct("TableScan")
2774 .field("table_name", &self.table_name)
2775 .field("source", &"...")
2776 .field("projection", &self.projection)
2777 .field("projected_schema", &self.projected_schema)
2778 .field("filters", &self.filters)
2779 .field("fetch", &self.fetch)
2780 .finish_non_exhaustive()
2781 }
2782}
2783
2784impl PartialEq for TableScan {
2785 fn eq(&self, other: &Self) -> bool {
2786 self.table_name == other.table_name
2787 && self.projection == other.projection
2788 && self.projected_schema == other.projected_schema
2789 && self.filters == other.filters
2790 && self.fetch == other.fetch
2791 }
2792}
2793
2794impl Eq for TableScan {}
2795
2796impl PartialOrd for TableScan {
2799 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2800 #[derive(PartialEq, PartialOrd)]
2801 struct ComparableTableScan<'a> {
2802 pub table_name: &'a TableReference,
2804 pub projection: &'a Option<Vec<usize>>,
2806 pub filters: &'a Vec<Expr>,
2808 pub fetch: &'a Option<usize>,
2810 }
2811 let comparable_self = ComparableTableScan {
2812 table_name: &self.table_name,
2813 projection: &self.projection,
2814 filters: &self.filters,
2815 fetch: &self.fetch,
2816 };
2817 let comparable_other = ComparableTableScan {
2818 table_name: &other.table_name,
2819 projection: &other.projection,
2820 filters: &other.filters,
2821 fetch: &other.fetch,
2822 };
2823 comparable_self
2824 .partial_cmp(&comparable_other)
2825 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2827 }
2828}
2829
2830impl Hash for TableScan {
2831 fn hash<H: Hasher>(&self, state: &mut H) {
2832 self.table_name.hash(state);
2833 self.projection.hash(state);
2834 self.projected_schema.hash(state);
2835 self.filters.hash(state);
2836 self.fetch.hash(state);
2837 }
2838}
2839
impl TableScan {
    /// Create a new [`TableScan`] of `table_source` under `table_name`,
    /// computing the projected schema and the functional dependencies
    /// implied by the source's constraints.
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        // With a projection: qualify and keep only the selected fields and
        // restrict functional dependencies to those indices. Without one:
        // qualify the full source schema.
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2893
/// Redistributes the rows of `input` across partitions according to
/// `partitioning_scheme`; the output schema is unchanged.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The child plan whose rows are redistributed.
    pub input: Arc<LogicalPlan>,
    /// How the rows are assigned to output partitions.
    pub partitioning_scheme: Partitioning,
}
2902
/// Concatenates the rows of two or more inputs (`UNION ALL` shape; any
/// de-duplication is represented by a separate `Distinct` node).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// The input plans; construction requires at least two.
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// The unified output schema all inputs conform to.
    pub schema: DFSchemaRef,
}
2911
impl Union {
    /// Creates a `Union`, requiring all inputs to agree exactly on field
    /// types (fields are matched by position).
    pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
        Ok(Union { inputs, schema })
    }

    /// Creates a `Union` matching fields by position but tolerating type
    /// differences: the first input's type is used for each output field.
    pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
        Ok(Union { inputs, schema })
    }

    /// Creates a `Union` matching fields by name (`UNION BY NAME` style).
    /// Each input is wrapped in a projection producing the unified schema,
    /// with `NULL` for columns the input does not provide.
    pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
        let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
        let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

        Ok(Union { inputs, schema })
    }

    /// Wraps each input in a projection that emits every column of `schema`,
    /// substituting an aliased `NULL` literal for columns missing from that
    /// input (matched by unqualified name).
    fn rewrite_inputs_from_schema(
        schema: &Arc<DFSchema>,
        inputs: Vec<Arc<LogicalPlan>>,
    ) -> Result<Vec<Arc<LogicalPlan>>> {
        let schema_width = schema.iter().count();
        let mut wrapped_inputs = Vec::with_capacity(inputs.len());
        for input in inputs {
            let mut expr = Vec::with_capacity(schema_width);
            for column in schema.columns() {
                if input
                    .schema()
                    .has_column_with_unqualified_name(column.name())
                {
                    expr.push(Expr::Column(column));
                } else {
                    // The input lacks this column: fill with NULL aliased to
                    // the output column name.
                    expr.push(
                        Expr::Literal(ScalarValue::Null, None).alias(column.name()),
                    );
                }
            }
            // Use the union schema directly so the projection's output schema
            // is exactly the derived one.
            wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
                Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
            )));
        }

        Ok(wrapped_inputs)
    }

    /// Derives the union's output schema.
    ///
    /// * `loose_types` — when `true`, inputs may disagree on field types and
    ///   the first occurrence wins; when `false`, a mismatch is an error.
    /// * `by_name` — match fields by name instead of by position.
    ///
    /// Errors when fewer than two inputs are supplied.
    fn derive_schema_from_inputs(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
        by_name: bool,
    ) -> Result<DFSchemaRef> {
        if inputs.len() < 2 {
            return plan_err!("UNION requires at least two inputs");
        }

        if by_name {
            Self::derive_schema_from_inputs_by_name(inputs, loose_types)
        } else {
            Self::derive_schema_from_inputs_by_position(inputs, loose_types)
        }
    }

    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // Per output column: (data type, nullability, per-input metadata,
        // number of inputs containing the column).
        type FieldData<'a> =
            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
        // A Vec (not a map) preserves first-seen column order.
        let mut cols: Vec<(&str, FieldData)> = Vec::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
                    cols.iter_mut().find(|(name, _)| name == field.name())
                {
                    if !loose_types && *data_type != field.data_type() {
                        return plan_err!(
                            "Found different types for field {}",
                            field.name()
                        );
                    }

                    metadata.push(field.metadata());
                    // Nullable if any input's field is nullable.
                    *is_nullable |= field.is_nullable();
                    *occurrences += 1;
                } else {
                    cols.push((
                        field.name(),
                        (
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                            1,
                        ),
                    ));
                }
            }
        }

        let union_fields = cols
            .into_iter()
            .map(
                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
                    // A column absent from some input is filled with NULLs
                    // there, so it must be nullable in the union schema.
                    let final_is_nullable = if occurrences == inputs.len() {
                        is_nullable
                    } else {
                        true
                    };

                    let mut field =
                        Field::new(name, data_type.clone(), final_is_nullable);
                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));

                    (None, Arc::new(field))
                },
            )
            .collect::<Vec<(Option<TableReference>, _)>>();

        // Keep only metadata entries shared by every input schema.
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }

    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        // Positional union requires every input to have the same arity.
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        // Tracks repeated output names so later duplicates get a `_<n>`
        // suffix.
        let mut name_counts: HashMap<String, usize> = HashMap::new();
        let union_fields = (0..fields_count)
            .map(|i| {
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let base_name = first_field.name().to_string();

                let data_type = if loose_types {
                    // Loose mode: take the first input's type verbatim.
                    first_field.data_type()
                } else {
                    // Strict mode: all inputs must agree on the type.
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                let nullable = fields.iter().any(|field| field.is_nullable());

                let name = if let Some(count) = name_counts.get_mut(&base_name) {
                    *count += 1;
                    format!("{base_name}_{count}")
                } else {
                    name_counts.insert(base_name.clone(), 0);
                    base_name
                };

                let mut field = Field::new(&name, data_type.clone(), nullable);
                let field_metadata = intersect_metadata_for_union(
                    fields.iter().map(|field| field.metadata()),
                );
                field.set_metadata(field_metadata);
                Ok((None, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
}
3146
3147impl PartialOrd for Union {
3149 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3150 self.inputs
3151 .partial_cmp(&other.inputs)
3152 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3154 }
3155}
3156
/// Describes a table's columns (`DESCRIBE <table>`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Schema of the table being described.
    pub schema: Arc<Schema>,
    /// Schema of this node's own output (the description rows).
    pub output_schema: DFSchemaRef,
}
3186
impl PartialOrd for DescribeTable {
    /// `DescribeTable` has no meaningful ordering; always returns `None`.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
3195
/// Options controlling `EXPLAIN` output.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Whether verbose output was requested.
    pub verbose: bool,
    /// Whether `ANALYZE` was requested.
    pub analyze: bool,
    /// Output format; see [`ExplainFormat`].
    pub format: ExplainFormat,
}
3206
3207impl Default for ExplainOption {
3208 fn default() -> Self {
3209 ExplainOption {
3210 verbose: false,
3211 analyze: false,
3212 format: ExplainFormat::Indent,
3213 }
3214 }
3215}
3216
3217impl ExplainOption {
3218 pub fn with_verbose(mut self, verbose: bool) -> Self {
3220 self.verbose = verbose;
3221 self
3222 }
3223
3224 pub fn with_analyze(mut self, analyze: bool) -> Self {
3226 self.analyze = analyze;
3227 self
3228 }
3229
3230 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3232 self.format = format;
3233 self
3234 }
3235}
3236
/// Produces a textual representation of a plan (`EXPLAIN`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Whether verbose output was requested.
    pub verbose: bool,
    /// Format in which the plan is rendered.
    pub explain_format: ExplainFormat,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// Rendered plan text collected at the various planning stages.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Schema of the explain output rows.
    pub schema: DFSchemaRef,
    /// Whether logical optimization completed without error.
    pub logical_optimization_succeeded: bool,
}
3259
3260impl PartialOrd for Explain {
3262 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3263 #[derive(PartialEq, PartialOrd)]
3264 struct ComparableExplain<'a> {
3265 pub verbose: &'a bool,
3267 pub plan: &'a Arc<LogicalPlan>,
3269 pub stringified_plans: &'a Vec<StringifiedPlan>,
3271 pub logical_optimization_succeeded: &'a bool,
3273 }
3274 let comparable_self = ComparableExplain {
3275 verbose: &self.verbose,
3276 plan: &self.plan,
3277 stringified_plans: &self.stringified_plans,
3278 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3279 };
3280 let comparable_other = ComparableExplain {
3281 verbose: &other.verbose,
3282 plan: &other.plan,
3283 stringified_plans: &other.stringified_plans,
3284 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3285 };
3286 comparable_self
3287 .partial_cmp(&comparable_other)
3288 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3290 }
3291}
3292
/// Plan node for `EXPLAIN ANALYZE`-style analysis of `input`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Whether verbose output was requested.
    pub verbose: bool,
    /// The plan to analyze.
    pub input: Arc<LogicalPlan>,
    /// Schema of the textual output rows.
    pub schema: DFSchemaRef,
}
3304
3305impl PartialOrd for Analyze {
3307 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3308 match self.verbose.partial_cmp(&other.verbose) {
3309 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3310 cmp => cmp,
3311 }
3312 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3314 }
3315}
3316
/// Wrapper for user-defined (extension) logical plan nodes.
#[allow(clippy::allow_attributes)]
// `Hash` is derived while `PartialEq` is implemented manually (delegating to
// the node); node implementations are responsible for keeping them consistent.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-defined node implementation.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3328
impl PartialEq for Extension {
    /// Delegates equality to the wrapped node's own `eq`.
    fn eq(&self, other: &Self) -> bool {
        self.node.eq(&other.node)
    }
}
3337
impl PartialOrd for Extension {
    /// Delegates ordering to the wrapped node.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3343
/// Skips `skip` rows, then emits at most `fetch` rows (`LIMIT`/`OFFSET`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before emitting; `None` means skip nothing.
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to emit; `None` means no limit.
    pub fetch: Option<Box<Expr>>,
    /// The plan being limited.
    pub input: Arc<LogicalPlan>,
}
3355
/// Result of resolving a [`Limit`]'s `skip` expression.
pub enum SkipType {
    /// A concrete, non-negative number of rows to skip.
    Literal(usize),
    /// The skip expression could not be reduced to a literal row count.
    UnsupportedExpr,
}
3363
/// Result of resolving a [`Limit`]'s `fetch` expression.
pub enum FetchType {
    /// A concrete limit; `None` means "no limit".
    Literal(Option<usize>),
    /// The fetch expression could not be reduced to a literal row count.
    UnsupportedExpr,
}
3372
3373impl Limit {
3374 pub fn get_skip_type(&self) -> Result<SkipType> {
3376 match self.skip.as_deref() {
3377 Some(expr) => match *expr {
3378 Expr::Literal(ScalarValue::Int64(s), _) => {
3379 let s = s.unwrap_or(0);
3381 if s >= 0 {
3382 Ok(SkipType::Literal(s as usize))
3383 } else {
3384 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3385 }
3386 }
3387 _ => Ok(SkipType::UnsupportedExpr),
3388 },
3389 None => Ok(SkipType::Literal(0)),
3391 }
3392 }
3393
3394 pub fn get_fetch_type(&self) -> Result<FetchType> {
3396 match self.fetch.as_deref() {
3397 Some(expr) => match *expr {
3398 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3399 if s >= 0 {
3400 Ok(FetchType::Literal(Some(s as usize)))
3401 } else {
3402 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3403 }
3404 }
3405 Expr::Literal(ScalarValue::Int64(None), _) => {
3406 Ok(FetchType::Literal(None))
3407 }
3408 _ => Ok(FetchType::UnsupportedExpr),
3409 },
3410 None => Ok(FetchType::Literal(None)),
3411 }
3412 }
3413}
3414
/// Removes duplicate rows from the input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `SELECT DISTINCT`: deduplicate over all columns.
    All(Arc<LogicalPlan>),
    /// Postgres-style `SELECT DISTINCT ON (..)`.
    On(DistinctOn),
}
3423
3424impl Distinct {
3425 pub fn input(&self) -> &Arc<LogicalPlan> {
3427 match self {
3428 Distinct::All(input) => input,
3429 Distinct::On(DistinctOn { input, .. }) => input,
3430 }
3431 }
3432}
3433
/// `SELECT DISTINCT ON (..)`: one output row per distinct `on_expr` value.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// Expressions that define distinctness; must be non-empty.
    pub on_expr: Vec<Expr>,
    /// Expressions projected into the output.
    pub select_expr: Vec<Expr>,
    /// Optional ordering; its leading expressions must match `on_expr`.
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The plan being deduplicated.
    pub input: Arc<LogicalPlan>,
    /// Output schema built from `select_expr`.
    pub schema: DFSchemaRef,
}
3450
impl DistinctOn {
    /// Creates a `DISTINCT ON` node.
    ///
    /// `on_expr` must be non-empty; the output schema is built from
    /// `select_expr`. When `sort_expr` is given it is validated via
    /// [`Self::with_sort_expr`].
    pub fn try_new(
        on_expr: Vec<Expr>,
        select_expr: Vec<Expr>,
        sort_expr: Option<Vec<SortExpr>>,
        input: Arc<LogicalPlan>,
    ) -> Result<Self> {
        if on_expr.is_empty() {
            return plan_err!("No `ON` expressions provided");
        }

        // Resolve unqualified column references against the input schema.
        let on_expr = normalize_cols(on_expr, input.as_ref())?;
        let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
            .into_iter()
            .collect();

        let dfschema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        let mut distinct_on = DistinctOn {
            on_expr,
            select_expr,
            sort_expr: None,
            input,
            schema: Arc::new(dfschema),
        };

        if let Some(sort_expr) = sort_expr {
            distinct_on = distinct_on.with_sort_expr(sort_expr)?;
        }

        Ok(distinct_on)
    }

    /// Attaches a sort ordering, verifying that the leading sort expressions
    /// match the `ON` expressions in order (a `DISTINCT ON` requirement).
    pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
        let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;

        // The leading sort keys must be exactly the ON expressions, in order.
        let mut matched = true;
        for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
            if on != &sort.expr {
                matched = false;
                break;
            }
        }

        if self.on_expr.len() > sort_expr.len() || !matched {
            return plan_err!(
                "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
            );
        }

        self.sort_expr = Some(sort_expr);
        Ok(self)
    }
}
3513
3514impl PartialOrd for DistinctOn {
3516 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3517 #[derive(PartialEq, PartialOrd)]
3518 struct ComparableDistinctOn<'a> {
3519 pub on_expr: &'a Vec<Expr>,
3521 pub select_expr: &'a Vec<Expr>,
3523 pub sort_expr: &'a Option<Vec<SortExpr>>,
3527 pub input: &'a Arc<LogicalPlan>,
3529 }
3530 let comparable_self = ComparableDistinctOn {
3531 on_expr: &self.on_expr,
3532 select_expr: &self.select_expr,
3533 sort_expr: &self.sort_expr,
3534 input: &self.input,
3535 };
3536 let comparable_other = ComparableDistinctOn {
3537 on_expr: &other.on_expr,
3538 select_expr: &other.select_expr,
3539 sort_expr: &other.sort_expr,
3540 input: &other.input,
3541 };
3542 comparable_self
3543 .partial_cmp(&comparable_other)
3544 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3546 }
3547}
3548
/// Groups the input by `group_expr` and evaluates `aggr_expr` per group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// `#[non_exhaustive]` prevents literal construction outside this crate;
// use the `try_new*` constructors.
#[non_exhaustive]
pub struct Aggregate {
    /// The plan providing the rows to aggregate.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions (may contain grouping sets).
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions evaluated per group.
    pub aggr_expr: Vec<Expr>,
    /// Output schema: group fields, the internal grouping id (when grouping
    /// sets are used), then aggregate fields.
    pub schema: DFSchemaRef,
}
3574
impl Aggregate {
    /// Creates an aggregate, deriving the output schema from the grouping
    /// and aggregate expressions.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        // Expand CUBE/ROLLUP shorthand into explicit grouping sets.
        let group_expr = enumerate_grouping_sets(group_expr)?;

        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        if is_grouping_set {
            // Under grouping sets, force every group column to be nullable
            // and append the internal grouping-id column.
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Creates an aggregate with a caller-supplied `schema`, validating that
    /// the field count matches the grouping plus aggregate expressions and
    /// attaching the derived functional dependencies.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression. \
                Aggregate without grouping expressions nor aggregate expressions is \
                logically equivalent to, but less efficient than, VALUES producing \
                single row. Please use VALUES instead."
            );
        }
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// True when the group expressions are a single grouping set.
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }

    /// Returns one expression per output field: group expressions, the
    /// internal grouping id (only for grouping sets), then aggregates.
    fn output_expressions(&self) -> Result<Vec<&Expr>> {
        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
        });
        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
        if self.is_grouping_set() {
            exprs.push(&INTERNAL_ID_EXPR);
        }
        exprs.extend(self.aggr_expr.iter());
        debug_assert!(exprs.len() == self.schema.fields().len());
        Ok(exprs)
    }

    /// Number of group-by output columns (grouping sets counted expanded).
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }

    /// Unsigned integer type used for the internal grouping id, sized by the
    /// number of group expressions (8/16/32/64-bit thresholds).
    pub fn grouping_id_type(group_exprs: usize) -> DataType {
        if group_exprs <= 8 {
            DataType::UInt8
        } else if group_exprs <= 16 {
            DataType::UInt16
        } else if group_exprs <= 32 {
            DataType::UInt32
        } else {
            DataType::UInt64
        }
    }

    /// Name of the internal column carrying the grouping-set id.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
3720
3721impl PartialOrd for Aggregate {
3723 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3724 match self.input.partial_cmp(&other.input) {
3725 Some(Ordering::Equal) => {
3726 match self.group_expr.partial_cmp(&other.group_expr) {
3727 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3728 cmp => cmp,
3729 }
3730 }
3731 cmp => cmp,
3732 }
3733 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3735 }
3736}
3737
3738fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3740 group_expr
3741 .iter()
3742 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3743}
3744
3745fn calc_func_dependencies_for_aggregate(
3747 group_expr: &[Expr],
3749 input: &LogicalPlan,
3751 aggr_schema: &DFSchema,
3753) -> Result<FunctionalDependencies> {
3754 if !contains_grouping_set(group_expr) {
3760 let group_by_expr_names = group_expr
3761 .iter()
3762 .map(|item| item.schema_name().to_string())
3763 .collect::<IndexSet<_>>()
3764 .into_iter()
3765 .collect::<Vec<_>>();
3766 let aggregate_func_dependencies = aggregate_functional_dependencies(
3767 input.schema(),
3768 &group_by_expr_names,
3769 aggr_schema,
3770 );
3771 Ok(aggregate_func_dependencies)
3772 } else {
3773 Ok(FunctionalDependencies::empty())
3774 }
3775}
3776
/// Computes the functional dependencies of a projection's output by mapping
/// each projected expression back to the index of the input field it
/// reproduces, then projecting the input's dependencies onto those indices.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // For each projected expression, the input field indices it passes
    // through (empty when it does not directly reproduce an input field).
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                // A wildcard expands to all matching input fields.
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            // Fields are matched against the input's flat
                            // (optionally qualified) names.
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            Expr::Alias(alias) => {
                // Match the inner (un-aliased) expression's rendered name.
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            _ => {
                let name = format!("{expr}");
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, exprs.len()))
}
3836
/// Sorts the input by `expr`, optionally keeping only the first `fetch` rows.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// Sort expressions, in order of precedence.
    pub expr: Vec<SortExpr>,
    /// The plan being sorted.
    pub input: Arc<LogicalPlan>,
    /// Optional row limit applied together with the sort (top-k).
    pub fetch: Option<usize>,
}
3847
/// Joins two plans on equijoin key pairs plus an optional filter predicate.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input plan.
    pub left: Arc<LogicalPlan>,
    /// Right input plan.
    pub right: Arc<LogicalPlan>,
    /// Equijoin key pairs: `(left expr, right expr)`.
    pub on: Vec<(Expr, Expr)>,
    /// Additional non-equijoin predicate, if any.
    pub filter: Option<Expr>,
    /// Kind of join (inner, left, right, ...).
    pub join_type: JoinType,
    /// The syntactic join constraint; see [`JoinConstraint`].
    pub join_constraint: JoinConstraint,
    /// Output schema built from the input schemas and the join type.
    pub schema: DFSchemaRef,
    /// How NULLs compare in the join keys.
    pub null_equality: NullEquality,
    /// Whether the join uses null-aware semantics.
    pub null_aware: bool,
}
3876
impl Join {
    /// Creates a join, deriving the output schema from the inputs and
    /// `join_type`.
    #[expect(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<LogicalPlan>,
        right: Arc<LogicalPlan>,
        on: Vec<(Expr, Expr)>,
        filter: Option<Expr>,
        join_type: JoinType,
        join_constraint: JoinConstraint,
        null_equality: NullEquality,
        null_aware: bool,
    ) -> Result<Self> {
        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;

        Ok(Join {
            left,
            right,
            on,
            filter,
            join_type,
            join_constraint,
            schema: Arc::new(join_schema),
            null_equality,
            null_aware,
        })
    }

    /// Rebuilds a join from `original` with new (possibly projected) inputs
    /// and new equijoin key columns.
    ///
    /// Returns the join plus a flag reporting whether the two sides were
    /// requalified. Errors when `original` is not `LogicalPlan::Join`.
    pub fn try_new_with_project_input(
        original: &LogicalPlan,
        left: Arc<LogicalPlan>,
        right: Arc<LogicalPlan>,
        column_on: (Vec<Column>, Vec<Column>),
    ) -> Result<(Self, bool)> {
        let original_join = match original {
            LogicalPlan::Join(join) => join,
            _ => return plan_err!("Could not create join with project input"),
        };

        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));

        let mut requalified = false;

        // Requalification only applies to inner/left/right/full joins;
        // see `requalify_sides_if_needed`.
        if original_join.join_type == JoinType::Inner
            || original_join.join_type == JoinType::Left
            || original_join.join_type == JoinType::Right
            || original_join.join_type == JoinType::Full
        {
            (left_sch, right_sch, requalified) =
                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
        }

        // Turn the column pairs into equijoin key expressions.
        let on: Vec<(Expr, Expr)> = column_on
            .0
            .into_iter()
            .zip(column_on.1)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();

        // Schema is derived from the (possibly requalified) sides.
        let join_schema = build_join_schema(
            left_sch.schema(),
            right_sch.schema(),
            &original_join.join_type,
        )?;

        Ok((
            Join {
                left,
                right,
                on,
                filter: original_join.filter.clone(),
                join_type: original_join.join_type,
                join_constraint: original_join.join_constraint,
                schema: Arc::new(join_schema),
                null_equality: original_join.null_equality,
                null_aware: original_join.null_aware,
            },
            requalified,
        ))
    }
}
3981
3982impl PartialOrd for Join {
3984 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3985 #[derive(PartialEq, PartialOrd)]
3986 struct ComparableJoin<'a> {
3987 pub left: &'a Arc<LogicalPlan>,
3989 pub right: &'a Arc<LogicalPlan>,
3991 pub on: &'a Vec<(Expr, Expr)>,
3993 pub filter: &'a Option<Expr>,
3995 pub join_type: &'a JoinType,
3997 pub join_constraint: &'a JoinConstraint,
3999 pub null_equality: &'a NullEquality,
4001 }
4002 let comparable_self = ComparableJoin {
4003 left: &self.left,
4004 right: &self.right,
4005 on: &self.on,
4006 filter: &self.filter,
4007 join_type: &self.join_type,
4008 join_constraint: &self.join_constraint,
4009 null_equality: &self.null_equality,
4010 };
4011 let comparable_other = ComparableJoin {
4012 left: &other.left,
4013 right: &other.right,
4014 on: &other.on,
4015 filter: &other.filter,
4016 join_type: &other.join_type,
4017 join_constraint: &other.join_constraint,
4018 null_equality: &other.null_equality,
4019 };
4020 comparable_self
4021 .partial_cmp(&comparable_other)
4022 .filter(|cmp| *cmp != Ordering::Equal || self == other)
4024 }
4025}
4026
/// A subquery referenced from an outer query's expression.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The plan of the subquery itself.
    pub subquery: Arc<LogicalPlan>,
    /// Expressions in the subquery that reference outer-query columns.
    pub outer_ref_columns: Vec<Expr>,
    /// Source spans recorded for this subquery.
    pub spans: Spans,
}
4037
impl Normalizeable for Subquery {
    /// Subqueries are never candidates for CSE normalization.
    fn can_normalize(&self) -> bool {
        false
    }
}
4043
4044impl NormalizeEq for Subquery {
4045 fn normalize_eq(&self, other: &Self) -> bool {
4046 *self.subquery == *other.subquery
4048 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
4049 && self
4050 .outer_ref_columns
4051 .iter()
4052 .zip(other.outer_ref_columns.iter())
4053 .all(|(a, b)| a.normalize_eq(b))
4054 }
4055}
4056
4057impl Subquery {
4058 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
4059 match plan {
4060 Expr::ScalarSubquery(it) => Ok(it),
4061 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
4062 _ => plan_err!("Could not coerce into ScalarSubquery!"),
4063 }
4064 }
4065
4066 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
4067 Subquery {
4068 subquery: plan,
4069 outer_ref_columns: self.outer_ref_columns.clone(),
4070 spans: Spans::new(),
4071 }
4072 }
4073}
4074
4075impl Debug for Subquery {
4076 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4077 write!(f, "<subquery>")
4078 }
4079}
4080
/// Partitioning schemes usable by [`Repartition`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Distribute rows round-robin into the given number of partitions.
    RoundRobinBatch(usize),
    /// Hash-partition on the given expressions into the given partition
    /// count.
    Hash(Vec<Expr>, usize),
    /// `DISTRIBUTE BY` on the given expressions.
    DistributeBy(Vec<Expr>),
}
4096
/// An unnest target for a list-typed column, together with the recursion
/// depth at which it is unnested.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The column produced by the unnest.
    pub output_column: Column,
    /// How many list nesting levels are unwrapped.
    pub depth: usize,
}
4121
4122impl Display for ColumnUnnestList {
4123 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4124 write!(f, "{}|depth={}", self.output_column, self.depth)
4125 }
4126}
4127
/// Unnests list- and struct-typed columns into multiple rows/columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The plan providing the columns to unnest.
    pub input: Arc<LogicalPlan>,
    /// The columns requested to be unnested.
    pub exec_columns: Vec<Column>,
    /// For each list-typed column: its input-schema index and the unnest
    /// target (output column and depth).
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Input-schema indices of struct-typed columns being unnested.
    pub struct_type_columns: Vec<usize>,
    /// For each output field, the index of the input field it derives from.
    pub dependency_indices: Vec<usize>,
    /// Output schema after unnesting.
    pub schema: DFSchemaRef,
    /// Options controlling unnest behavior (e.g. recursion specs).
    pub options: UnnestOptions,
}
4150
4151impl PartialOrd for Unnest {
4153 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4154 #[derive(PartialEq, PartialOrd)]
4155 struct ComparableUnnest<'a> {
4156 pub input: &'a Arc<LogicalPlan>,
4158 pub exec_columns: &'a Vec<Column>,
4160 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4163 pub struct_type_columns: &'a Vec<usize>,
4166 pub dependency_indices: &'a Vec<usize>,
4169 pub options: &'a UnnestOptions,
4171 }
4172 let comparable_self = ComparableUnnest {
4173 input: &self.input,
4174 exec_columns: &self.exec_columns,
4175 list_type_columns: &self.list_type_columns,
4176 struct_type_columns: &self.struct_type_columns,
4177 dependency_indices: &self.dependency_indices,
4178 options: &self.options,
4179 };
4180 let comparable_other = ComparableUnnest {
4181 input: &other.input,
4182 exec_columns: &other.exec_columns,
4183 list_type_columns: &other.list_type_columns,
4184 struct_type_columns: &other.struct_type_columns,
4185 dependency_indices: &other.dependency_indices,
4186 options: &other.options,
4187 };
4188 comparable_self
4189 .partial_cmp(&comparable_other)
4190 .filter(|cmp| *cmp != Ordering::Equal || self == other)
4192 }
4193}
4194
impl Unnest {
    /// Creates an `Unnest` over `exec_columns` of `input`, computing the
    /// output schema and the bookkeeping indices used at execution time.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Map input-field index -> requested unnest column.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        // For each output field, the input-field index it derives from.
        let mut dependency_indices = vec![];
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Explicit recursive unnest specs for this column,
                        // if any were provided in the options.
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap()) })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        if transformed_columns.is_empty() {
                            // No explicit recursion: default to a depth-1
                            // unnest and classify the column as struct/list.
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                _ => {}
                            };
                        }

                        // Every column produced by this unnest depends on
                        // the same original input field.
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Column not being unnested: passed through as-is.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // Functional dependencies are carried over from the input schema.
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4331
4332fn get_unnested_columns(
4341 col_name: &String,
4342 data_type: &DataType,
4343 depth: usize,
4344) -> Result<Vec<(Column, Arc<Field>)>> {
4345 let mut qualified_columns = Vec::with_capacity(1);
4346
4347 match data_type {
4348 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4349 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4350 let new_field = Arc::new(Field::new(
4351 col_name, data_type,
4352 true,
4355 ));
4356 let column = Column::from_name(col_name);
4357 qualified_columns.push((column, new_field));
4359 }
4360 DataType::Struct(fields) => {
4361 qualified_columns.extend(fields.iter().map(|f| {
4362 let new_name = format!("{}.{}", col_name, f.name());
4363 let column = Column::from_name(&new_name);
4364 let new_field = f.as_ref().clone().with_name(new_name);
4365 (column, Arc::new(new_field))
4367 }))
4368 }
4369 _ => {
4370 return internal_err!("trying to unnest on invalid data type {data_type}");
4371 }
4372 };
4373 Ok(qualified_columns)
4374}
4375
4376fn get_unnested_list_datatype_recursive(
4379 data_type: &DataType,
4380 depth: usize,
4381) -> Result<DataType> {
4382 match data_type {
4383 DataType::List(field)
4384 | DataType::FixedSizeList(field, _)
4385 | DataType::LargeList(field) => {
4386 if depth == 1 {
4387 return Ok(field.data_type().clone());
4388 }
4389 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4390 }
4391 _ => {}
4392 };
4393
4394 internal_err!("trying to unnest on invalid data type {data_type}")
4395}
4396
4397#[cfg(test)]
4398mod tests {
4399 use super::*;
4400 use crate::builder::LogicalTableSource;
4401 use crate::logical_plan::table_scan;
4402 use crate::select_expr::SelectExpr;
4403 use crate::test::function_stub::{count, count_udaf};
4404 use crate::{
4405 GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4406 scalar_subquery,
4407 };
4408 use datafusion_common::metadata::ScalarAndMetadata;
4409 use datafusion_common::tree_node::{
4410 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4411 };
4412 use datafusion_common::{Constraint, ScalarValue, not_impl_err};
4413 use insta::{assert_debug_snapshot, assert_snapshot};
4414 use std::hash::DefaultHasher;
4415
4416 fn employee_schema() -> Schema {
4417 Schema::new(vec![
4418 Field::new("id", DataType::Int32, false),
4419 Field::new("first_name", DataType::Utf8, false),
4420 Field::new("last_name", DataType::Utf8, false),
4421 Field::new("state", DataType::Utf8, false),
4422 Field::new("salary", DataType::Int32, false),
4423 ])
4424 }
4425
4426 fn display_plan() -> Result<LogicalPlan> {
4427 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4428 .build()?;
4429
4430 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4431 .filter(in_subquery(col("state"), Arc::new(plan1)))?
4432 .project(vec![col("id")])?
4433 .build()
4434 }
4435
    /// Verifies the indented text rendering of a plan containing a
    /// subquery: the subquery appears as a nested section under the
    /// filter that references it.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent(), @r"
        Projection: employee_csv.id
          Filter: employee_csv.state IN (<subquery>)
            Subquery:
              TableScan: employee_csv projection=[state]
            TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4449
    /// Verifies the indented rendering that appends each node's output
    /// schema in `[name:Type]` form.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: employee_csv.id [id:Int32]
          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
            Subquery: [state:Utf8]
              TableScan: employee_csv projection=[state] [state:Utf8]
            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
        ");
        Ok(())
    }
4463
    /// An aliased `EXISTS (<subquery>)` expression must render its alias in
    /// the projection line and its subquery as a nested section.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        assert_snapshot!(plan?.display_indent(), @r"
        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
          Subquery:
            TableScan: employee_csv projection=[state]
          TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4483
    /// Verifies the GraphViz (dot) rendering: a terse `LogicalPlan`
    /// subgraph plus a "Detailed" subgraph that also shows each node's
    /// schema. Edges are drawn child -> parent with `dir=back`.
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_graphviz(), @r#"
        // Begin DataFusion GraphViz Plan,
        // display it online here: https://dreampuf.github.io/GraphvizOnline

        digraph {
          subgraph cluster_1
          {
            graph[label="LogicalPlan"]
            2[shape=box label="Projection: employee_csv.id"]
            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
            4[shape=box label="Subquery:"]
            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
            5[shape=box label="TableScan: employee_csv projection=[state]"]
            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
          }
          subgraph cluster_7
          {
            graph[label="Detailed LogicalPlan"]
            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
          }
        }
        // End DataFusion GraphViz Plan
        "#);
        Ok(())
    }
4526
    /// Verifies the Postgres-EXPLAIN-style JSON rendering of a plan with a
    /// subquery; the subquery appears as a nested "Plans" entry of the
    /// filter node.
    #[test]
    fn test_display_pg_json() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_pg_json(), @r#"
        [
          {
            "Plan": {
              "Expressions": [
                "employee_csv.id"
              ],
              "Node Type": "Projection",
              "Output": [
                "id"
              ],
              "Plans": [
                {
                  "Condition": "employee_csv.state IN (<subquery>)",
                  "Node Type": "Filter",
                  "Output": [
                    "id",
                    "state"
                  ],
                  "Plans": [
                    {
                      "Node Type": "Subquery",
                      "Output": [
                        "state"
                      ],
                      "Plans": [
                        {
                          "Node Type": "TableScan",
                          "Output": [
                            "state"
                          ],
                          "Plans": [],
                          "Relation Name": "employee_csv"
                        }
                      ]
                    },
                    {
                      "Node Type": "TableScan",
                      "Output": [
                        "id",
                        "state"
                      ],
                      "Plans": [],
                      "Relation Name": "employee_csv"
                    }
                  ]
                }
              ]
            }
          }
        ]
        "#);
        Ok(())
    }
4585
4586 #[derive(Debug, Default)]
4588 struct OkVisitor {
4589 strings: Vec<String>,
4590 }
4591
4592 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4593 type Node = LogicalPlan;
4594
4595 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4596 let s = match plan {
4597 LogicalPlan::Projection { .. } => "pre_visit Projection",
4598 LogicalPlan::Filter { .. } => "pre_visit Filter",
4599 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4600 _ => {
4601 return not_impl_err!("unknown plan type");
4602 }
4603 };
4604
4605 self.strings.push(s.into());
4606 Ok(TreeNodeRecursion::Continue)
4607 }
4608
4609 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4610 let s = match plan {
4611 LogicalPlan::Projection { .. } => "post_visit Projection",
4612 LogicalPlan::Filter { .. } => "post_visit Filter",
4613 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4614 _ => {
4615 return not_impl_err!("unknown plan type");
4616 }
4617 };
4618
4619 self.strings.push(s.into());
4620 Ok(TreeNodeRecursion::Continue)
4621 }
4622 }
4623
    /// The visitor must see nodes top-down in `f_down` and in reverse,
    /// bottom-up, in `f_up`.
    #[test]
    fn visit_order() {
        let mut visitor = OkVisitor::default();
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(visitor.strings, @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
            "post_visit Filter",
            "post_visit Projection",
        ]
        "#);
    }
4642
4643 #[derive(Debug, Default)]
4644 struct OptionalCounter {
4646 val: Option<usize>,
4647 }
4648
4649 impl OptionalCounter {
4650 fn new(val: usize) -> Self {
4651 Self { val: Some(val) }
4652 }
4653 fn dec(&mut self) -> bool {
4655 if Some(0) == self.val {
4656 true
4657 } else {
4658 self.val = self.val.take().map(|i| i - 1);
4659 false
4660 }
4661 }
4662 }
4663
4664 #[derive(Debug, Default)]
4665 struct StoppingVisitor {
4667 inner: OkVisitor,
4668 return_false_from_pre_in: OptionalCounter,
4670 return_false_from_post_in: OptionalCounter,
4672 }
4673
4674 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4675 type Node = LogicalPlan;
4676
4677 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4678 if self.return_false_from_pre_in.dec() {
4679 return Ok(TreeNodeRecursion::Stop);
4680 }
4681 self.inner.f_down(plan)?;
4682
4683 Ok(TreeNodeRecursion::Continue)
4684 }
4685
4686 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4687 if self.return_false_from_post_in.dec() {
4688 return Ok(TreeNodeRecursion::Stop);
4689 }
4690
4691 self.inner.f_up(plan)
4692 }
4693 }
4694
    /// Returning `Stop` from the third `f_down` call halts traversal: only
    /// the first two pre-visits are recorded and no post-visits run.
    #[test]
    fn early_stopping_pre_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4716
    /// Returning `Stop` from the second `f_up` call halts traversal after
    /// the TableScan's post-visit; no further post-visits are recorded.
    #[test]
    fn early_stopping_post_visit() {
        let mut visitor = StoppingVisitor {
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4739
4740 #[derive(Debug, Default)]
4741 struct ErrorVisitor {
4743 inner: OkVisitor,
4744 return_error_from_pre_in: OptionalCounter,
4746 return_error_from_post_in: OptionalCounter,
4748 }
4749
4750 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4751 type Node = LogicalPlan;
4752
4753 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4754 if self.return_error_from_pre_in.dec() {
4755 return not_impl_err!("Error in pre_visit");
4756 }
4757
4758 self.inner.f_down(plan)
4759 }
4760
4761 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4762 if self.return_error_from_post_in.dec() {
4763 return not_impl_err!("Error in post_visit");
4764 }
4765
4766 self.inner.f_up(plan)
4767 }
4768 }
4769
    /// An error raised from `f_down` aborts traversal and propagates to
    /// the caller; only the visits made before the failure are recorded.
    #[test]
    fn error_pre_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in pre_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4792
    /// An error raised from `f_up` aborts traversal and propagates to the
    /// caller; the recorded visits end at the failing post-visit.
    #[test]
    fn error_post_visit() {
        let mut visitor = ErrorVisitor {
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in post_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4817
4818 #[test]
4819 fn test_partial_eq_hash_and_partial_ord() {
4820 let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4821 produce_one_row: true,
4822 schema: Arc::new(DFSchema::empty()),
4823 }));
4824
4825 let count_window_function = |schema| {
4826 Window::try_new_with_schema(
4827 vec![Expr::WindowFunction(Box::new(WindowFunction::new(
4828 WindowFunctionDefinition::AggregateUDF(count_udaf()),
4829 vec![],
4830 )))],
4831 Arc::clone(&empty_values),
4832 Arc::new(schema),
4833 )
4834 .unwrap()
4835 };
4836
4837 let schema_without_metadata = || {
4838 DFSchema::from_unqualified_fields(
4839 vec![Field::new("count", DataType::Int64, false)].into(),
4840 HashMap::new(),
4841 )
4842 .unwrap()
4843 };
4844
4845 let schema_with_metadata = || {
4846 DFSchema::from_unqualified_fields(
4847 vec![Field::new("count", DataType::Int64, false)].into(),
4848 [("key".to_string(), "value".to_string())].into(),
4849 )
4850 .unwrap()
4851 };
4852
4853 let f = count_window_function(schema_without_metadata());
4855
4856 let f2 = count_window_function(schema_without_metadata());
4858 assert_eq!(f, f2);
4859 assert_eq!(hash(&f), hash(&f2));
4860 assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));
4861
4862 let o = count_window_function(schema_with_metadata());
4864 assert_ne!(f, o);
4865 assert_ne!(hash(&f), hash(&o)); assert_eq!(f.partial_cmp(&o), None);
4867 }
4868
4869 fn hash<T: Hash>(value: &T) -> u64 {
4870 let hasher = &mut DefaultHasher::new();
4871 value.hash(hasher);
4872 hasher.finish()
4873 }
4874
    /// `Projection::try_new_with_schema` must reject a schema whose field
    /// count (0 here) differs from the number of expressions (1).
    #[test]
    fn projection_expr_schema_mismatch() -> Result<()> {
        let empty_schema = Arc::new(DFSchema::empty());
        let p = Projection::try_new_with_schema(
            vec![col("a")],
            Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row: false,
                schema: Arc::clone(&empty_schema),
            })),
            empty_schema,
        );
        assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
        Ok(())
    }
4889
4890 fn test_plan() -> LogicalPlan {
4891 let schema = Schema::new(vec![
4892 Field::new("id", DataType::Int32, false),
4893 Field::new("state", DataType::Utf8, false),
4894 ]);
4895
4896 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4897 .unwrap()
4898 .filter(col("state").eq(lit("CO")))
4899 .unwrap()
4900 .project(vec![col("id")])
4901 .unwrap()
4902 .build()
4903 .unwrap()
4904 }
4905
4906 #[test]
4907 fn test_replace_invalid_placeholder() {
4908 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4910
4911 let plan = table_scan(TableReference::none(), &schema, None)
4912 .unwrap()
4913 .filter(col("id").eq(placeholder("")))
4914 .unwrap()
4915 .build()
4916 .unwrap();
4917
4918 let param_values = vec![ScalarValue::Int32(Some(42))];
4919 plan.replace_params_with_values(¶m_values.clone().into())
4920 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4921
4922 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4924
4925 let plan = table_scan(TableReference::none(), &schema, None)
4926 .unwrap()
4927 .filter(col("id").eq(placeholder("$0")))
4928 .unwrap()
4929 .build()
4930 .unwrap();
4931
4932 plan.replace_params_with_values(¶m_values.clone().into())
4933 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4934
4935 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4937
4938 let plan = table_scan(TableReference::none(), &schema, None)
4939 .unwrap()
4940 .filter(col("id").eq(placeholder("$00")))
4941 .unwrap()
4942 .build()
4943 .unwrap();
4944
4945 plan.replace_params_with_values(¶m_values.into())
4946 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4947 }
4948
    /// Binding a parameter value that carries metadata must fail when the
    /// prepared statement declared the parameter field without metadata.
    #[test]
    fn test_replace_placeholder_mismatched_metadata() {
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder("$1")))
            .unwrap()
            .build()
            .unwrap();
        // Prepare declares "$1" as a plain Int32 field with no metadata.
        let prepared_builder = LogicalPlanBuilder::new(plan)
            .prepare(
                "".to_string(),
                vec![Field::new("", DataType::Int32, true).into()],
            )
            .unwrap();

        // The supplied value carries metadata, mismatching the declaration.
        let mut scalar_meta = HashMap::new();
        scalar_meta.insert("some_key".to_string(), "some_value".to_string());
        let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
            ScalarValue::Int32(Some(42)),
            Some(scalar_meta.into()),
        )]);
        prepared_builder
            .plan()
            .clone()
            .with_param_values(param_values)
            .expect_err("prepared field metadata mismatch unexpectedly succeeded");
    }
4980
    /// Placeholders projected from an empty relation start as nullable
    /// `Null` columns and take concrete types once values are bound.
    #[test]
    fn test_replace_placeholder_empty_relation_valid_schema() {
        let plan = LogicalPlanBuilder::empty(false)
            .project(vec![
                SelectExpr::from(placeholder("$1")),
                SelectExpr::from(placeholder("$2")),
            ])
            .unwrap()
            .build()
            .unwrap();

        // Before binding: both columns are typed Null and nullable (`;N`).
        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: $1, $2 [$1:Null;N, $2:Null;N]
          EmptyRelation: rows=0 []
        ");

        let plan = plan
            .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
            .unwrap();

        // After binding: literals aliased to the placeholder names with the
        // value types Int32 and Utf8.
        assert_snapshot!(plan.display_indent_schema(), @r#"
        Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
          EmptyRelation: rows=0 []
        "#);
    }
5009
    /// Columns that appear in only some grouping sets must become nullable
    /// in the aggregate output schema, even though both are non-nullable
    /// in the input.
    #[test]
    fn test_nullable_schema_after_grouping_set() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        // GROUPING SETS ((foo), (bar)): each column is absent from one set.
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .aggregate(
                vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
                    vec![col("foo")],
                    vec![col("bar")],
                ]))],
                vec![count(lit(true))],
            )
            .unwrap()
            .build()
            .unwrap();

        let output_schema = plan.schema();

        assert!(
            output_schema
                .field_with_name(None, "foo")
                .unwrap()
                .is_nullable(),
        );
        assert!(
            output_schema
                .field_with_name(None, "bar")
                .unwrap()
                .is_nullable()
        );
    }
5045
    /// `Filter::is_scalar` is true only when an equality predicate on a
    /// unique column guarantees at most one output row.
    #[test]
    fn test_filter_is_scalar() {
        // Build a scan over a single Int32 column with no constraints.
        let schema =
            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        let source = Arc::new(LogicalTableSource::new(schema));
        let schema = Arc::new(
            DFSchema::try_from_qualified_schema(
                TableReference::bare("tab"),
                &source.schema(),
            )
            .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source: Arc::clone(&source) as Arc<dyn TableSource>,
            projection: None,
            projected_schema: Arc::clone(&schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // Without a uniqueness guarantee, `id = 1` may match many rows.
        let filter = Filter::try_new(
            Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
            scan,
        )
        .unwrap();
        assert!(!filter.is_scalar());
        // Same schema, but with a Unique constraint on column 0 expressed
        // as functional dependencies.
        let unique_schema = Arc::new(
            schema
                .as_ref()
                .clone()
                .with_functional_dependencies(
                    FunctionalDependencies::new_from_constraints(
                        Some(&Constraints::new_unverified(vec![Constraint::Unique(
                            vec![0],
                        )])),
                        1,
                    ),
                )
                .unwrap(),
        );
        let scan = Arc::new(LogicalPlan::TableScan(TableScan {
            table_name: TableReference::bare("tab"),
            source,
            projection: None,
            projected_schema: Arc::clone(&unique_schema),
            filters: vec![],
            fetch: None,
        }));
        let col = schema.field_names()[0].clone();

        // Equality on the unique column yields at most one row => scalar.
        let filter =
            Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
        assert!(filter.is_scalar());
    }
5104
    /// `transform` must reach nodes nested inside an `Explain` plan: the
    /// injected filter appears beneath the Explain node afterwards.
    #[test]
    fn test_transform_explain() {
        let schema = Schema::new(vec![
            Field::new("foo", DataType::Int32, false),
            Field::new("bar", DataType::Int32, false),
        ]);

        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .explain(false, false)
            .unwrap()
            .build()
            .unwrap();

        let external_filter = col("foo").eq(lit(true));

        // Wrap every TableScan in a Filter node using the external
        // predicate; all other nodes are left untouched.
        let plan = plan
            .transform(|plan| match plan {
                LogicalPlan::TableScan(table) => {
                    let filter = Filter::try_new(
                        external_filter.clone(),
                        Arc::new(LogicalPlan::TableScan(table)),
                    )
                    .unwrap();
                    Ok(Transformed::yes(LogicalPlan::Filter(filter)))
                }
                x => Ok(Transformed::no(x)),
            })
            .data()
            .unwrap();

        let actual = format!("{}", plan.display_indent());
        assert_snapshot!(actual, @r"
        Explain
          Filter: foo = Boolean(true)
            TableScan: ?table?
        ")
    }
5145
    /// `LogicalPlan`'s partial order distinguishes variants
    /// (EmptyRelation < DescribeTable), while two `DescribeTable` nodes —
    /// even with identical contents — are incomparable (`None`).
    #[test]
    fn test_plan_partial_ord() {
        let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let describe_table = LogicalPlan::DescribeTable(DescribeTable {
            schema: Arc::new(Schema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                false,
            )])),
            output_schema: DFSchemaRef::new(DFSchema::empty()),
        });

        // Structurally identical copy of `describe_table`.
        let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
            schema: Arc::new(Schema::new(vec![Field::new(
                "foo",
                DataType::Int32,
                false,
            )])),
            output_schema: DFSchemaRef::new(DFSchema::empty()),
        });

        assert_eq!(
            empty_relation.partial_cmp(&describe_table),
            Some(Ordering::Less)
        );
        assert_eq!(
            describe_table.partial_cmp(&empty_relation),
            Some(Ordering::Greater)
        );
        assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
    }
5181
    /// Round-tripping a `Limit` through `with_new_exprs` with its own
    /// expressions and inputs must reproduce an equal plan, for every
    /// combination of `skip`/`fetch` being present or absent.
    #[test]
    fn test_limit_with_new_children() {
        let input = Arc::new(LogicalPlan::Values(Values {
            schema: Arc::new(DFSchema::empty()),
            values: vec![vec![]],
        }));
        let cases = [
            // Neither skip nor fetch.
            LogicalPlan::Limit(Limit {
                skip: None,
                fetch: None,
                input: Arc::clone(&input),
            }),
            // Fetch only.
            LogicalPlan::Limit(Limit {
                skip: None,
                fetch: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                input: Arc::clone(&input),
            }),
            // Skip only.
            LogicalPlan::Limit(Limit {
                skip: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                fetch: None,
                input: Arc::clone(&input),
            }),
            // Both skip and fetch.
            LogicalPlan::Limit(Limit {
                skip: Some(Box::new(Expr::Literal(
                    ScalarValue::new_one(&DataType::UInt32).unwrap(),
                    None,
                ))),
                fetch: Some(Box::new(Expr::Literal(
                    ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                    None,
                ))),
                input,
            }),
        ];

        for limit in cases {
            let new_limit = limit
                .with_new_exprs(
                    limit.expressions(),
                    limit.inputs().into_iter().cloned().collect(),
                )
                .unwrap();
            assert_eq!(limit, new_limit);
        }
    }
5233
    /// `TreeNodeRecursion::Jump` issued at the Projection must skip the
    /// projection's children — including the scalar subquery's filter —
    /// across all four subquery-aware traversal APIs
    /// (apply / visit / transform / rewrite `_with_subqueries` variants).
    #[test]
    fn test_with_subqueries_jump() {
        // Subquery contains its own Filter that must never be visited.
        let subquery_schema =
            Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

        let subquery_plan =
            table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
                .unwrap()
                .filter(col("sub_id").eq(lit(0)))
                .unwrap()
                .build()
                .unwrap();

        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

        // Outer plan: Projection (with the scalar subquery) over a Filter.
        let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
            .unwrap()
            .filter(col("id").eq(lit(0)))
            .unwrap()
            .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
            .unwrap()
            .build()
            .unwrap();

        // 1) apply_with_subqueries: Jump at the Projection skips children.
        let mut filter_found = false;
        plan.apply_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();
        assert!(!filter_found);

        struct ProjectJumpVisitor {
            filter_found: bool,
        }

        impl ProjectJumpVisitor {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
                match node {
                    LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(TreeNodeRecursion::Continue)
            }
        }

        // 2) visit_with_subqueries with a visitor that jumps in f_down.
        let mut visitor = ProjectJumpVisitor::new();
        plan.visit_with_subqueries(&mut visitor).unwrap();
        assert!(!visitor.filter_found);

        // 3) transform_down_with_subqueries with a jumping closure.
        let mut filter_found = false;
        plan.clone()
            .transform_down_with_subqueries(|plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            plan,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            })
            .unwrap();
        assert!(!filter_found);

        // 3b) transform_down_up_with_subqueries: jump on the way down.
        let mut filter_found = false;
        plan.clone()
            .transform_down_up_with_subqueries(
                |plan| {
                    match plan {
                        LogicalPlan::Projection(..) => {
                            return Ok(Transformed::new(
                                plan,
                                false,
                                TreeNodeRecursion::Jump,
                            ));
                        }
                        LogicalPlan::Filter(..) => filter_found = true,
                        _ => {}
                    }
                    Ok(Transformed::no(plan))
                },
                |plan| Ok(Transformed::no(plan)),
            )
            .unwrap();
        assert!(!filter_found);

        struct ProjectJumpRewriter {
            filter_found: bool,
        }

        impl ProjectJumpRewriter {
            fn new() -> Self {
                Self {
                    filter_found: false,
                }
            }
        }

        impl TreeNodeRewriter for ProjectJumpRewriter {
            type Node = LogicalPlan;

            fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
                match node {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            node,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => self.filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(node))
            }
        }

        // 4) rewrite_with_subqueries with a rewriter that jumps in f_down.
        let mut rewriter = ProjectJumpRewriter::new();
        plan.rewrite_with_subqueries(&mut rewriter).unwrap();
        assert!(!rewriter.filter_found);
    }
5379
5380 #[test]
5381 fn test_with_unresolved_placeholders() {
5382 let field_name = "id";
5383 let placeholder_value = "$1";
5384 let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5385
5386 let plan = table_scan(TableReference::none(), &schema, None)
5387 .unwrap()
5388 .filter(col(field_name).eq(placeholder(placeholder_value)))
5389 .unwrap()
5390 .build()
5391 .unwrap();
5392
5393 let params = plan.get_parameter_fields().unwrap();
5395 assert_eq!(params.len(), 1);
5396
5397 let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5398 assert_eq!(parameter_type, None);
5399 }
5400
    /// `with_new_exprs` on a `Join` must rebuild `on` pairs and the
    /// optional `filter` from the flattened expression list, for each
    /// combination of equi-keys and non-equi filter.
    #[test]
    fn test_join_with_new_exprs() -> Result<()> {
        // Builds an inner join of t1/t2 with the given `on` keys and
        // optional filter expression.
        fn create_test_join(
            on: Vec<(Expr, Expr)>,
            filter: Option<Expr>,
        ) -> Result<LogicalPlan> {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::Int32, false),
            ]);

            let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
            let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;

            Ok(LogicalPlan::Join(Join {
                left: Arc::new(
                    table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
                ),
                right: Arc::new(
                    table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
                ),
                on,
                filter,
                join_type: JoinType::Inner,
                join_constraint: JoinConstraint::On,
                schema: Arc::new(left_schema.join(&right_schema)?),
                null_equality: NullEquality::NullEqualsNothing,
                null_aware: false,
            }))
        }

        // `on` keys only: round-trip preserves keys, filter stays None.
        {
            let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
            assert_eq!(join.filter, None);
        }

        // Filter only: round-trip preserves the filter, `on` stays empty.
        {
            let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![]);
            assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
        }

        // Both `on` keys and filter are preserved by the round-trip.
        {
            let join = create_test_join(
                vec![(col("t1.a"), (col("t2.a")))],
                Some(col("t1.b").gt(col("t2.b"))),
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                join.expressions(),
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        }

        // New expressions: the flat list [l1, r1, l2, r2, filter] is split
        // back into two `on` pairs plus the trailing filter.
        {
            let join = create_test_join(
                vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
                None,
            )?;
            let LogicalPlan::Join(join) = join.with_new_exprs(
                vec![
                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                    binary_expr(col("t2.a"), Operator::Plus, lit(2)),
                    col("t1.b"),
                    col("t2.b"),
                    lit(true),
                ],
                join.inputs().into_iter().cloned().collect(),
            )?
            else {
                unreachable!()
            };
            assert_eq!(
                join.on,
                vec![
                    (
                        binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                        binary_expr(col("t2.a"), Operator::Plus, lit(2))
                    ),
                    (col("t1.b"), (col("t2.b")))
                ]
            );
            assert_eq!(join.filter, Some(lit(true)));
        }

        Ok(())
    }
5507
    /// `Join::try_new` must derive the output schema per join type:
    /// semi/anti joins keep only one side, `LeftMark` adds a non-nullable
    /// "mark" column, and outer joins make the appropriate side nullable.
    #[test]
    fn test_join_try_new() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

        let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

        let join_types = vec![
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::LeftSemi,
            JoinType::LeftAnti,
            JoinType::RightSemi,
            JoinType::RightAnti,
            JoinType::LeftMark,
        ];

        for join_type in join_types {
            let join = Join::try_new(
                Arc::new(left_scan.clone()),
                Arc::new(right_scan.clone()),
                vec![(col("t1.a"), col("t2.a"))],
                Some(col("t1.b").gt(col("t2.b"))),
                join_type,
                JoinConstraint::On,
                NullEquality::NullEqualsNothing,
                false,
            )?;

            match join_type {
                // Left semi/anti joins output only the left side's columns.
                JoinType::LeftSemi | JoinType::LeftAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                }
                // Right semi/anti joins output only the right side's columns.
                JoinType::RightSemi | JoinType::RightAnti => {
                    assert_eq!(join.schema.fields().len(), 2);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from right table"
                    );
                }
                // Mark joins keep the left side and append a "mark" column.
                JoinType::LeftMark => {
                    assert_eq!(join.schema.fields().len(), 3);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "mark",
                        "Third field should be the mark column"
                    );

                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                }
                // Inner/Left/Right/Full joins output both sides' columns.
                _ => {
                    assert_eq!(join.schema.fields().len(), 4);

                    let fields = join.schema.fields();
                    assert_eq!(
                        fields[0].name(),
                        "a",
                        "First field should be 'a' from left table"
                    );
                    assert_eq!(
                        fields[1].name(),
                        "b",
                        "Second field should be 'b' from left table"
                    );
                    assert_eq!(
                        fields[2].name(),
                        "a",
                        "Third field should be 'a' from right table"
                    );
                    assert_eq!(
                        fields[3].name(),
                        "b",
                        "Fourth field should be 'b' from right table"
                    );

                    // Outer joins make the non-preserved side nullable.
                    if join_type == JoinType::Left {
                        assert!(!fields[0].is_nullable());
                        assert!(!fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    } else if join_type == JoinType::Right {
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(!fields[2].is_nullable());
                        assert!(!fields[3].is_nullable());
                    } else if join_type == JoinType::Full {
                        assert!(fields[0].is_nullable());
                        assert!(fields[1].is_nullable());
                        assert!(fields[2].is_nullable());
                        assert!(fields[3].is_nullable());
                    }
                }
            }

            // Non-schema properties must pass through unchanged.
            assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
            assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
            assert_eq!(join.join_type, join_type);
            assert_eq!(join.join_constraint, JoinConstraint::On);
            assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
        }

        Ok(())
    }
5655
5656 #[test]
5657 fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5658 let left_schema = Schema::new(vec![
5659 Field::new("id", DataType::Int32, false), Field::new("name", DataType::Utf8, false), Field::new("value", DataType::Int32, false), ]);
5663
5664 let right_schema = Schema::new(vec![
5665 Field::new("id", DataType::Int32, false), Field::new("category", DataType::Utf8, false), Field::new("value", DataType::Float64, true), ]);
5669
5670 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5671
5672 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5673
5674 {
5676 let join = Join::try_new(
5679 Arc::new(left_plan.clone()),
5680 Arc::new(right_plan.clone()),
5681 vec![(col("t1.id"), col("t2.id"))],
5682 None,
5683 JoinType::Inner,
5684 JoinConstraint::Using,
5685 NullEquality::NullEqualsNothing,
5686 false,
5687 )?;
5688
5689 let fields = join.schema.fields();
5690
5691 assert_eq!(fields.len(), 6);
5692
5693 assert_eq!(
5694 fields[0].name(),
5695 "id",
5696 "First field should be 'id' from left table"
5697 );
5698 assert_eq!(
5699 fields[1].name(),
5700 "name",
5701 "Second field should be 'name' from left table"
5702 );
5703 assert_eq!(
5704 fields[2].name(),
5705 "value",
5706 "Third field should be 'value' from left table"
5707 );
5708 assert_eq!(
5709 fields[3].name(),
5710 "id",
5711 "Fourth field should be 'id' from right table"
5712 );
5713 assert_eq!(
5714 fields[4].name(),
5715 "category",
5716 "Fifth field should be 'category' from right table"
5717 );
5718 assert_eq!(
5719 fields[5].name(),
5720 "value",
5721 "Sixth field should be 'value' from right table"
5722 );
5723
5724 assert_eq!(join.join_constraint, JoinConstraint::Using);
5725 }
5726
5727 {
5729 let join = Join::try_new(
5731 Arc::new(left_plan.clone()),
5732 Arc::new(right_plan.clone()),
5733 vec![(col("t1.id"), col("t2.id"))], Some(col("t1.value").lt(col("t2.value"))), JoinType::Inner,
5736 JoinConstraint::On,
5737 NullEquality::NullEqualsNothing,
5738 false,
5739 )?;
5740
5741 let fields = join.schema.fields();
5742 assert_eq!(fields.len(), 6);
5743
5744 assert_eq!(
5745 fields[0].name(),
5746 "id",
5747 "First field should be 'id' from left table"
5748 );
5749 assert_eq!(
5750 fields[1].name(),
5751 "name",
5752 "Second field should be 'name' from left table"
5753 );
5754 assert_eq!(
5755 fields[2].name(),
5756 "value",
5757 "Third field should be 'value' from left table"
5758 );
5759 assert_eq!(
5760 fields[3].name(),
5761 "id",
5762 "Fourth field should be 'id' from right table"
5763 );
5764 assert_eq!(
5765 fields[4].name(),
5766 "category",
5767 "Fifth field should be 'category' from right table"
5768 );
5769 assert_eq!(
5770 fields[5].name(),
5771 "value",
5772 "Sixth field should be 'value' from right table"
5773 );
5774
5775 assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5776 }
5777
5778 {
5780 let join = Join::try_new(
5781 Arc::new(left_plan.clone()),
5782 Arc::new(right_plan.clone()),
5783 vec![(col("t1.id"), col("t2.id"))],
5784 None,
5785 JoinType::Inner,
5786 JoinConstraint::On,
5787 NullEquality::NullEqualsNull,
5788 false,
5789 )?;
5790
5791 assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5792 }
5793
5794 Ok(())
5795 }
5796
5797 #[test]
5798 fn test_join_try_new_schema_validation() -> Result<()> {
5799 let left_schema = Schema::new(vec![
5800 Field::new("id", DataType::Int32, false),
5801 Field::new("name", DataType::Utf8, false),
5802 Field::new("value", DataType::Float64, true),
5803 ]);
5804
5805 let right_schema = Schema::new(vec![
5806 Field::new("id", DataType::Int32, false),
5807 Field::new("category", DataType::Utf8, true),
5808 Field::new("code", DataType::Int16, false),
5809 ]);
5810
5811 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5812
5813 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5814
5815 let join_types = vec![
5816 JoinType::Inner,
5817 JoinType::Left,
5818 JoinType::Right,
5819 JoinType::Full,
5820 ];
5821
5822 for join_type in join_types {
5823 let join = Join::try_new(
5824 Arc::new(left_plan.clone()),
5825 Arc::new(right_plan.clone()),
5826 vec![(col("t1.id"), col("t2.id"))],
5827 Some(col("t1.value").gt(lit(5.0))),
5828 join_type,
5829 JoinConstraint::On,
5830 NullEquality::NullEqualsNothing,
5831 false,
5832 )?;
5833
5834 let fields = join.schema.fields();
5835 assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");
5836
5837 for (i, field) in fields.iter().enumerate() {
5838 let expected_nullable = match (i, &join_type) {
5839 (0, JoinType::Right | JoinType::Full) => true, (1, JoinType::Right | JoinType::Full) => true, (2, _) => true, (3, JoinType::Left | JoinType::Full) => true, (4, _) => true, (5, JoinType::Left | JoinType::Full) => true, _ => false,
5850 };
5851
5852 assert_eq!(
5853 field.is_nullable(),
5854 expected_nullable,
5855 "Field {} ({}) nullability incorrect for {:?} join",
5856 i,
5857 field.name(),
5858 join_type
5859 );
5860 }
5861 }
5862
5863 let using_join = Join::try_new(
5864 Arc::new(left_plan.clone()),
5865 Arc::new(right_plan.clone()),
5866 vec![(col("t1.id"), col("t2.id"))],
5867 None,
5868 JoinType::Inner,
5869 JoinConstraint::Using,
5870 NullEquality::NullEqualsNothing,
5871 false,
5872 )?;
5873
5874 assert_eq!(
5875 using_join.schema.fields().len(),
5876 6,
5877 "USING join should have all fields"
5878 );
5879 assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5880
5881 Ok(())
5882 }
5883}