1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29 InvariantLevel, assert_always_invariants_at_current_node,
30 assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34 Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35 intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38 NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48 BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
49 LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50 WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61 Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62 FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63 ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64 assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
/// A node in a tree of relational operators that together describe a query.
///
/// Each variant wraps a struct of the same (or closely related) name that
/// holds that operator's state. See [`LogicalPlan::schema`] for how each
/// variant derives its output schema and [`LogicalPlan::inputs`] for its
/// child plans.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates a list of expressions against its input.
    Projection(Projection),
    /// Filters rows from its input using a boolean predicate.
    Filter(Filter),
    /// Evaluates window functions over its input.
    Window(Window),
    /// Groups rows and evaluates aggregate expressions.
    Aggregate(Aggregate),
    /// Sorts its input by a list of sort expressions (optionally with a fetch limit).
    Sort(Sort),
    /// Joins two inputs on equi-join keys and/or a filter expression.
    Join(Join),
    /// Repartitions its input according to a [`Partitioning`] scheme.
    Repartition(Repartition),
    /// Concatenates the rows of multiple inputs with compatible schemas.
    Union(Union),
    /// Scans rows from a [`TableSource`], with optional projection/filters/fetch.
    TableScan(TableScan),
    /// A relation with no inputs that produces zero or one empty row.
    EmptyRelation(EmptyRelation),
    /// A subquery referenced from an expression, possibly with outer references.
    Subquery(Subquery),
    /// Aliases its input under a new relation name.
    SubqueryAlias(SubqueryAlias),
    /// Skips and/or limits the number of rows from its input.
    Limit(Limit),
    /// A SQL statement such as `PREPARE` or `EXECUTE`.
    Statement(Statement),
    /// A literal set of rows (`VALUES` clause).
    Values(Values),
    /// Produces the textual plan(s) for `EXPLAIN`.
    Explain(Explain),
    /// Runs the input and reports execution metrics (`EXPLAIN ANALYZE`).
    Analyze(Analyze),
    /// A user-defined logical operator (extension point).
    Extension(Extension),
    /// Removes duplicate rows, either fully or on specific expressions.
    Distinct(Distinct),
    /// A data-modification statement (INSERT/UPDATE/DELETE).
    Dml(DmlStatement),
    /// A data-definition statement (CREATE TABLE/VIEW, etc.).
    Ddl(DdlStatement),
    /// Copies query results to an output location (`COPY TO`).
    Copy(CopyTo),
    /// Describes the schema of a table (`DESCRIBE`).
    DescribeTable(DescribeTable),
    /// Unnests list/struct columns into multiple rows/columns.
    Unnest(Unnest),
    /// A recursive CTE: a static term plus a recursive term.
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296 fn default() -> Self {
297 LogicalPlan::EmptyRelation(EmptyRelation {
298 produce_one_row: false,
299 schema: Arc::new(DFSchema::empty()),
300 })
301 }
302}
303
/// A `LogicalPlan` is its own single-element tree-node container: applying or
/// mapping over its "elements" simply visits the plan itself once.
impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
    fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
        &'a self,
        mut f: F,
    ) -> Result<TreeNodeRecursion> {
        f(self)
    }

    fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {
        f(self)
    }
}
319
320impl LogicalPlan {
    /// Returns the output schema of this plan node.
    ///
    /// Variants that do not change their input's schema (`Filter`, `Sort`,
    /// `Repartition`, `Limit`, `Distinct::All`) delegate to their input;
    /// the remaining variants store a precomputed schema.
    pub fn schema(&self) -> &DFSchemaRef {
        match self {
            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
            LogicalPlan::Values(Values { schema, .. }) => schema,
            LogicalPlan::TableScan(TableScan {
                projected_schema, ..
            }) => projected_schema,
            LogicalPlan::Projection(Projection { schema, .. }) => schema,
            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
            LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
            LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
            LogicalPlan::Window(Window { schema, .. }) => schema,
            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
            LogicalPlan::Join(Join { schema, .. }) => schema,
            LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
            LogicalPlan::Statement(statement) => statement.schema(),
            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
            LogicalPlan::Explain(explain) => &explain.schema,
            LogicalPlan::Analyze(analyze) => &analyze.schema,
            LogicalPlan::Extension(extension) => extension.node.schema(),
            LogicalPlan::Union(Union { schema, .. }) => schema,
            LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
                output_schema
            }
            LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
            LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
            LogicalPlan::Ddl(ddl) => ddl.schema(),
            LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
            // A recursive query's schema is defined by its static (non-recursive) term.
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.schema()
            }
        }
    }
359
360 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
363 match self {
364 LogicalPlan::Window(_)
365 | LogicalPlan::Projection(_)
366 | LogicalPlan::Aggregate(_)
367 | LogicalPlan::Unnest(_)
368 | LogicalPlan::Join(_) => self
369 .inputs()
370 .iter()
371 .map(|input| input.schema().as_ref())
372 .collect(),
373 _ => vec![],
374 }
375 }
376
377 pub fn explain_schema() -> SchemaRef {
379 SchemaRef::new(Schema::new(vec![
380 Field::new("plan_type", DataType::Utf8, false),
381 Field::new("plan", DataType::Utf8, false),
382 ]))
383 }
384
385 pub fn describe_schema() -> Schema {
387 Schema::new(vec![
388 Field::new("column_name", DataType::Utf8, false),
389 Field::new("data_type", DataType::Utf8, false),
390 Field::new("is_nullable", DataType::Utf8, false),
391 ])
392 }
393
    /// Returns clones of all expressions associated with this plan node
    /// (not including those of its inputs).
    pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        self.apply_expressions(|e| {
            exprs.push(e.clone());
            Ok(TreeNodeRecursion::Continue)
        })
        // The closure above is infallible, so `apply_expressions` cannot error.
        .unwrap();
        exprs
    }
420
    /// Returns all outer-reference expressions (correlated columns) used in
    /// this plan and, recursively, in all of its inputs, de-duplicated while
    /// preserving first-seen order.
    pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
        let mut exprs = vec![];
        self.apply_expressions(|e| {
            find_out_reference_exprs(e).into_iter().for_each(|e| {
                // Linear `contains` keeps insertion order; expression lists
                // here are expected to be small.
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
            Ok(TreeNodeRecursion::Continue)
        })
        // The closure above is infallible, so `apply_expressions` cannot error.
        .unwrap();
        // Also collect outer references from child plans.
        self.inputs()
            .into_iter()
            .flat_map(|child| child.all_out_ref_exprs())
            .for_each(|e| {
                if !exprs.contains(&e) {
                    exprs.push(e)
                }
            });
        exprs
    }
445
    /// Returns references to this node's direct child plans (empty for leaf
    /// nodes such as `TableScan`, `EmptyRelation`, `Values`, `DescribeTable`).
    pub fn inputs(&self) -> Vec<&LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { input, .. }) => vec![input],
            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
            LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
            LogicalPlan::Window(Window { input, .. }) => vec![input],
            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
            LogicalPlan::Extension(extension) => extension.node.inputs(),
            LogicalPlan::Union(Union { inputs, .. }) => {
                inputs.iter().map(|arc| arc.as_ref()).collect()
            }
            LogicalPlan::Distinct(
                Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
            ) => vec![input],
            LogicalPlan::Explain(explain) => vec![&explain.plan],
            LogicalPlan::Analyze(analyze) => vec![&analyze.input],
            LogicalPlan::Dml(write) => vec![&write.input],
            LogicalPlan::Copy(copy) => vec![&copy.input],
            LogicalPlan::Ddl(ddl) => ddl.inputs(),
            LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                static_term,
                recursive_term,
                ..
            }) => vec![static_term, recursive_term],
            LogicalPlan::Statement(stmt) => stmt.inputs(),
            // Leaf nodes: no children.
            LogicalPlan::TableScan { .. }
            | LogicalPlan::EmptyRelation { .. }
            | LogicalPlan::Values { .. }
            | LogicalPlan::DescribeTable(_) => vec![],
        }
    }
487
    /// Returns, for every `USING`-constrained join in this plan (including
    /// subqueries), the set of columns named in its join keys.
    ///
    /// # Errors
    /// Returns an internal error if a `USING` join key is not a plain column
    /// expression.
    pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
        let mut using_columns: Vec<HashSet<Column>> = vec![];

        self.apply_with_subqueries(|plan| {
            if let LogicalPlan::Join(Join {
                join_constraint: JoinConstraint::Using,
                on,
                ..
            }) = plan
            {
                // Collect both sides of every equi-pair into one set.
                let columns =
                    on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
                        let Some(l) = l.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {l:?}"
                            );
                        };
                        let Some(r) = r.get_as_join_column() else {
                            return internal_err!(
                                "Invalid join key. Expected column, found {r:?}"
                            );
                        };
                        accumu.insert(l.to_owned());
                        accumu.insert(r.to_owned());
                        Result::<_, DataFusionError>::Ok(accumu)
                    })?;
                using_columns.push(columns);
            }
            Ok(TreeNodeRecursion::Continue)
        })?;

        Ok(using_columns)
    }
523
    /// Returns the first output expression of this plan, or `Ok(None)` for
    /// node kinds that have no meaningful "first expression" (statements,
    /// DDL, subqueries, etc.).
    ///
    /// NOTE(review): the `[0]` indexing below assumes `Projection`,
    /// `Aggregate`, and `DistinctOn` always carry at least one expression —
    /// an empty list would panic here; confirm that invariant holds upstream.
    pub fn head_output_expr(&self) -> Result<Option<Expr>> {
        match self {
            LogicalPlan::Projection(projection) => {
                Ok(Some(projection.expr.as_slice()[0].clone()))
            }
            LogicalPlan::Aggregate(agg) => {
                // Group expressions come first in an aggregate's output, so
                // prefer them when present.
                if agg.group_expr.is_empty() {
                    Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
                } else {
                    Ok(Some(agg.group_expr.as_slice()[0].clone()))
                }
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
                Ok(Some(select_expr[0].clone()))
            }
            // Pass-through nodes: first output expression comes from the input.
            LogicalPlan::Filter(Filter { input, .. })
            | LogicalPlan::Distinct(Distinct::All(input))
            | LogicalPlan::Sort(Sort { input, .. })
            | LogicalPlan::Limit(Limit { input, .. })
            | LogicalPlan::Repartition(Repartition { input, .. })
            | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
            LogicalPlan::Join(Join {
                left,
                right,
                join_type,
                ..
            }) => match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                    // If the left side has no columns the join output starts
                    // with the right side's columns.
                    if left.schema().fields().is_empty() {
                        right.head_output_expr()
                    } else {
                        left.head_output_expr()
                    }
                }
                JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
                    left.head_output_expr()
                }
                JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                    right.head_output_expr()
                }
            },
            LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
                static_term.head_output_expr()
            }
            LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
                union.schema.qualified_field(0),
            )))),
            LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
                table.projected_schema.qualified_field(0),
            )))),
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Re-qualify the input's first expression under the alias name.
                let expr_opt = subquery_alias.input.head_output_expr()?;
                expr_opt
                    .map(|expr| {
                        Ok(Expr::Column(create_col_from_scalar_expr(
                            &expr,
                            subquery_alias.alias.to_string(),
                        )?))
                    })
                    .map_or(Ok(None), |v| v.map(Some))
            }
            LogicalPlan::Subquery(_) => Ok(None),
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::Values(_)
            | LogicalPlan::Explain(_)
            | LogicalPlan::Analyze(_)
            | LogicalPlan::Extension(_)
            | LogicalPlan::Dml(_)
            | LogicalPlan::Copy(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::DescribeTable(_)
            | LogicalPlan::Unnest(_) => Ok(None),
        }
    }
600
    /// Recomputes this node's cached schema from its current expressions and
    /// inputs, returning an otherwise-equivalent plan.
    ///
    /// Nodes whose schema is purely derived from their input (e.g. `Sort`,
    /// `Limit`, `Repartition`) or that are returned unchanged do not need
    /// recomputation and are passed through as-is.
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            // Values keeps its stored schema unchanged here; see
            // `update_schema_data_type` for a full rebuild.
            LogicalPlan::Values(Values { schema, values }) => {
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter { predicate, input }) => {
                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equality,
                null_aware,
            }) => {
                // Rebuild the join schema from the (possibly changed) inputs.
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                // Equi-join keys must be bare expressions; strip any aliases.
                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equality,
                    null_aware,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            LogicalPlan::Extension(Extension { node }) => {
                // Rebuilding an extension node via its own constructor lets it
                // recompute whatever schema it derives internally.
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                let first_input_schema = inputs[0].schema();
                if schema.fields().len() == first_input_schema.fields().len() {
                    // Field count matches the inputs: keep the existing schema.
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    // Field count diverged (e.g. after input changes): derive a
                    // fresh union schema from the inputs.
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                // Rebuild the Unnest (and its schema) via the builder helper.
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }
758
    /// Returns a new plan of the same kind as `self`, but with the given
    /// expressions and inputs.
    ///
    /// The number and order of `expr` and `inputs` must match what
    /// [`Self::expressions`] / [`Self::inputs`] would return for this node;
    /// variant-specific helpers (`assert_no_expressions`, `only_input`, …)
    /// enforce the counts and return internal errors on mismatch.
    ///
    /// NOTE(review): a few arms use panicking `assert!`/`assert_eq!` instead
    /// of the internal-error macros (Window, Join, Distinct::On) — confirm
    /// whether those should also be recoverable errors.
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { .. }) => {
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    // The flat expression list is re-chunked into rows of
                    // `schema.fields().len()` values each.
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                // RoundRobin carries no expressions; Hash/DistributeBy replace
                // their partitioning expressions with `expr`.
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // `expr` is [group exprs..., aggr exprs...]; split at the
                // original group-expression count.
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Sort(Sort {
                    // Keep each original sort's direction/null ordering, only
                    // swapping in the new expression.
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                null_aware,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                // Expressions are [l1, r1, l2, r2, ..., optional filter]:
                // each equi-join pair contributes two, plus at most one filter.
                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                // If present, the filter is the trailing expression.
                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!(
                            "Expected a pair of expressions to construct the join on expression"
                        )?
                    };

                    // Equi-join keys must be bare expressions; strip aliases.
                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                    null_aware: *null_aware,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                // One expression per present skip/fetch, in that order.
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                assert_eq_or_internal_err!(
                    old_expr_len,
                    expr.len(),
                    "Invalid number of new Limit expressions: expected {}, got {}",
                    old_expr_len,
                    expr.len()
                );
                // Pop from the back: fetch was pushed last, then skip.
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        // NOTE(review): original constraints are dropped and
                        // reset to the default here — confirm this is intended.
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                let input_schema = inputs[0].schema();
                // Keep the stored schema only while its field count still
                // matches the inputs; otherwise adopt the first input's schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        // `expr` is [on..., select..., sort...]; split from
                        // the back so `expr` is left holding the on-exprs.
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(
                            sort_expr.is_empty(),
                            "with_new_exprs for Distinct does not support sort expressions"
                        );
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, // no sort expressions accepted (asserted above)
                            Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name, fields, ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    fields: fields.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            // Expression-free, input-free nodes (note: the specific Ddl and
            // Statement variants above match first; the rest land here).
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // Rebuild the Unnest (and its schema) via the builder helper.
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1156
    /// Checks the plan invariants appropriate to the given [`InvariantLevel`]:
    /// `Always` checks only this node's structural invariants; `Executable`
    /// runs the stricter pre-execution checks.
    pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        match check {
            InvariantLevel::Always => assert_always_invariants_at_current_node(self),
            InvariantLevel::Executable => assert_executable_invariants(self),
        }
    }
1164
    /// Returns an internal error unless `expr` is empty; used by
    /// `with_new_exprs` for plan variants that carry no expressions.
    #[inline]
    // Takes `Vec<Expr>` by value so every `with_new_exprs` arm consumes its
    // expressions uniformly.
    #[expect(clippy::needless_pass_by_value)]
    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
        assert_or_internal_err!(
            expr.is_empty(),
            "{self:?} should have no exprs, got {:?}",
            expr
        );
        Ok(())
    }
1176
    /// Returns an internal error unless `inputs` is empty; used by
    /// `with_new_exprs` for leaf plan variants.
    #[inline]
    // Takes `Vec<LogicalPlan>` by value so every `with_new_exprs` arm
    // consumes its inputs uniformly.
    #[expect(clippy::needless_pass_by_value)]
    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
        assert_or_internal_err!(
            inputs.is_empty(),
            "{self:?} should have no inputs, got: {:?}",
            inputs
        );
        Ok(())
    }
1188
    /// Returns the single expression in `expr`, or an internal error if there
    /// is not exactly one.
    #[inline]
    fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
        assert_eq_or_internal_err!(
            expr.len(),
            1,
            "{self:?} should have exactly one expr, got {:?}",
            &expr
        );
        Ok(expr.remove(0))
    }
1200
    /// Returns the single input plan in `inputs`, or an internal error if
    /// there is not exactly one.
    #[inline]
    fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
        assert_eq_or_internal_err!(
            inputs.len(),
            1,
            "{self:?} should have exactly one input, got {:?}",
            &inputs
        );
        Ok(inputs.remove(0))
    }
1212
1213 #[inline]
1215 fn only_two_inputs(
1216 &self,
1217 mut inputs: Vec<LogicalPlan>,
1218 ) -> Result<(LogicalPlan, LogicalPlan)> {
1219 assert_eq_or_internal_err!(
1220 inputs.len(),
1221 2,
1222 "{self:?} should have exactly two inputs, got {:?}",
1223 &inputs
1224 );
1225 let right = inputs.remove(1);
1226 let left = inputs.remove(0);
1227 Ok((left, right))
1228 }
1229
    /// Replaces placeholder parameters (e.g. `$1`) throughout the plan with
    /// the given `param_values`.
    ///
    /// If the result is a `PREPARE` statement, the values are verified
    /// against its declared parameter fields and the prepared inner plan is
    /// returned; otherwise the substituted plan itself is returned.
    pub fn with_param_values(
        self,
        param_values: impl Into<ParamValues>,
    ) -> Result<LogicalPlan> {
        let param_values = param_values.into();
        let plan_with_values = self.replace_params_with_values(&param_values)?;

        Ok(
            if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
                plan_with_values
            {
                param_values.verify_fields(&prepare_lp.fields)?;
                // Unwrap the Prepare wrapper: callers execute the inner plan.
                Arc::unwrap_or_clone(prepare_lp.input)
            } else {
                plan_with_values
            },
        )
    }
1302
1303 pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1308 match self {
1309 LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1310 LogicalPlan::Filter(filter) => {
1311 if filter.is_scalar() {
1312 Some(1)
1313 } else {
1314 filter.input.max_rows()
1315 }
1316 }
1317 LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1318 LogicalPlan::Aggregate(Aggregate {
1319 input, group_expr, ..
1320 }) => {
1321 if group_expr
1323 .iter()
1324 .all(|expr| matches!(expr, Expr::Literal(_, _)))
1325 {
1326 Some(1)
1327 } else {
1328 input.max_rows()
1329 }
1330 }
1331 LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1332 match (fetch, input.max_rows()) {
1333 (Some(fetch_limit), Some(input_max)) => {
1334 Some(input_max.min(*fetch_limit))
1335 }
1336 (Some(fetch_limit), None) => Some(*fetch_limit),
1337 (None, Some(input_max)) => Some(input_max),
1338 (None, None) => None,
1339 }
1340 }
1341 LogicalPlan::Join(Join {
1342 left,
1343 right,
1344 join_type,
1345 ..
1346 }) => match join_type {
1347 JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1348 JoinType::Left | JoinType::Right | JoinType::Full => {
1349 match (left.max_rows()?, right.max_rows()?, join_type) {
1350 (0, 0, _) => Some(0),
1351 (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1352 (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1353 (left_max, right_max, _) => Some(left_max * right_max),
1354 }
1355 }
1356 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1357 left.max_rows()
1358 }
1359 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1360 right.max_rows()
1361 }
1362 },
1363 LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1364 LogicalPlan::Union(Union { inputs, .. }) => {
1365 inputs.iter().try_fold(0usize, |mut acc, plan| {
1366 acc += plan.max_rows()?;
1367 Some(acc)
1368 })
1369 }
1370 LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1371 LogicalPlan::EmptyRelation(_) => Some(0),
1372 LogicalPlan::RecursiveQuery(_) => None,
1373 LogicalPlan::Subquery(_) => None,
1374 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1375 LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1376 Ok(FetchType::Literal(s)) => s,
1377 _ => None,
1378 },
1379 LogicalPlan::Distinct(
1380 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1381 ) => input.max_rows(),
1382 LogicalPlan::Values(v) => Some(v.values.len()),
1383 LogicalPlan::Unnest(_) => None,
1384 LogicalPlan::Ddl(_)
1385 | LogicalPlan::Explain(_)
1386 | LogicalPlan::Analyze(_)
1387 | LogicalPlan::Dml(_)
1388 | LogicalPlan::Copy(_)
1389 | LogicalPlan::DescribeTable(_)
1390 | LogicalPlan::Statement(_)
1391 | LogicalPlan::Extension(_) => None,
1392 }
1393 }
1394
1395 pub fn contains_outer_reference(&self) -> bool {
1397 let mut contains = false;
1398 self.apply_expressions(|expr| {
1399 Ok(if expr.contains_outer() {
1400 contains = true;
1401 TreeNodeRecursion::Stop
1402 } else {
1403 TreeNodeRecursion::Continue
1404 })
1405 })
1406 .unwrap();
1407 contains
1408 }
1409
    /// Returns this node's output expressions paired with the output
    /// [`Column`] each one produces.
    ///
    /// Only `Aggregate` and `Window` nodes produce such pairs; all other
    /// node kinds return an empty list.
    pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
        match self {
            LogicalPlan::Aggregate(aggregate) => Ok(aggregate
                .output_expressions()?
                .into_iter()
                .zip(self.schema().columns())
                .collect()),
            LogicalPlan::Window(Window {
                window_expr,
                input,
                schema,
            }) => {
                // Window output is the input's columns followed by one column
                // per window expression; pair the window exprs with the
                // columns after the input's fields.
                let mut output_exprs = input.columnized_output_exprs()?;
                let input_len = input.schema().fields().len();
                output_exprs.extend(
                    window_expr
                        .iter()
                        .zip(schema.columns().into_iter().skip(input_len)),
                );
                Ok(output_exprs)
            }
            _ => Ok(vec![]),
        }
    }
1448}
1449
1450impl LogicalPlan {
    /// Walks the plan (including subqueries) bottom-up, replacing every
    /// `Expr::Placeholder` with the corresponding literal from `param_values`,
    /// and recomputes schemas where substitution may change data types.
    pub fn replace_params_with_values(
        self,
        param_values: &ParamValues,
    ) -> Result<LogicalPlan> {
        self.transform_up_with_subqueries(|plan| {
            let schema = Arc::clone(plan.schema());
            let name_preserver = NamePreserver::new(&plan);
            plan.map_expressions(|e| {
                // Infer placeholder types from the node's schema; skip
                // expressions with no placeholders entirely.
                let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
                if !has_placeholder {
                    Ok(Transformed::no(e))
                } else {
                    // Preserve the expression's display name so output column
                    // names are stable across substitution.
                    let original_name = name_preserver.save(&e);
                    let transformed_expr = e.transform_up(|e| {
                        if let Expr::Placeholder(Placeholder { id, .. }) = e {
                            let (value, metadata) = param_values
                                .get_placeholders_with_values(&id)?
                                .into_inner();
                            Ok(Transformed::yes(Expr::Literal(value, metadata)))
                        } else {
                            Ok(Transformed::no(e))
                        }
                    })?;
                    Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
                }
            })?
            // Substituted literals can change types, so refresh the schema.
            .map_data(|plan| plan.update_schema_data_type())
        })
        .map(|res| res.data)
    }
1491
1492 fn update_schema_data_type(self) -> Result<LogicalPlan> {
1498 match self {
1499 LogicalPlan::Values(Values { values, schema: _ }) => {
1503 LogicalPlanBuilder::values(values)?.build()
1504 }
1505 plan => plan.recompute_schema(),
1507 }
1508 }
1509
1510 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1512 let mut param_names = HashSet::new();
1513 self.apply_with_subqueries(|plan| {
1514 plan.apply_expressions(|expr| {
1515 expr.apply(|expr| {
1516 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1517 param_names.insert(id.clone());
1518 }
1519 Ok(TreeNodeRecursion::Continue)
1520 })
1521 })
1522 })
1523 .map(|_| param_names)
1524 }
1525
1526 pub fn get_parameter_types(
1531 &self,
1532 ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1533 let mut parameter_fields = self.get_parameter_fields()?;
1534 Ok(parameter_fields
1535 .drain()
1536 .map(|(name, maybe_field)| {
1537 (name, maybe_field.map(|field| field.data_type().clone()))
1538 })
1539 .collect())
1540 }
1541
1542 pub fn get_parameter_fields(
1544 &self,
1545 ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
1546 let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
1547
1548 self.apply_with_subqueries(|plan| {
1549 plan.apply_expressions(|expr| {
1550 expr.apply(|expr| {
1551 if let Expr::Placeholder(Placeholder { id, field }) = expr {
1552 let prev = param_types.get(id);
1553 match (prev, field) {
1554 (Some(Some(prev)), Some(field)) => {
1555 check_metadata_with_storage_equal(
1556 (field.data_type(), Some(field.metadata())),
1557 (prev.data_type(), Some(prev.metadata())),
1558 "parameter",
1559 &format!(": Conflicting types for id {id}"),
1560 )?;
1561 }
1562 (_, Some(field)) => {
1563 param_types.insert(id.clone(), Some(Arc::clone(field)));
1564 }
1565 _ => {
1566 param_types.insert(id.clone(), None);
1567 }
1568 }
1569 }
1570 Ok(TreeNodeRecursion::Continue)
1571 })
1572 })
1573 })
1574 .map(|_| param_types)
1575 }
1576
1577 pub fn display_indent(&self) -> impl Display + '_ {
1609 struct Wrapper<'a>(&'a LogicalPlan);
1612 impl Display for Wrapper<'_> {
1613 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1614 let with_schema = false;
1615 let mut visitor = IndentVisitor::new(f, with_schema);
1616 match self.0.visit_with_subqueries(&mut visitor) {
1617 Ok(_) => Ok(()),
1618 Err(_) => Err(fmt::Error),
1619 }
1620 }
1621 }
1622 Wrapper(self)
1623 }
1624
1625 pub fn display_indent_schema(&self) -> impl Display + '_ {
1655 struct Wrapper<'a>(&'a LogicalPlan);
1658 impl Display for Wrapper<'_> {
1659 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1660 let with_schema = true;
1661 let mut visitor = IndentVisitor::new(f, with_schema);
1662 match self.0.visit_with_subqueries(&mut visitor) {
1663 Ok(_) => Ok(()),
1664 Err(_) => Err(fmt::Error),
1665 }
1666 }
1667 }
1668 Wrapper(self)
1669 }
1670
1671 pub fn display_pg_json(&self) -> impl Display + '_ {
1675 struct Wrapper<'a>(&'a LogicalPlan);
1678 impl Display for Wrapper<'_> {
1679 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1680 let mut visitor = PgJsonVisitor::new(f);
1681 visitor.with_schema(true);
1682 match self.0.visit_with_subqueries(&mut visitor) {
1683 Ok(_) => Ok(()),
1684 Err(_) => Err(fmt::Error),
1685 }
1686 }
1687 }
1688 Wrapper(self)
1689 }
1690
1691 pub fn display_graphviz(&self) -> impl Display + '_ {
1721 struct Wrapper<'a>(&'a LogicalPlan);
1724 impl Display for Wrapper<'_> {
1725 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1726 let mut visitor = GraphvizVisitor::new(f);
1727
1728 visitor.start_graph()?;
1729
1730 visitor.pre_visit_plan("LogicalPlan")?;
1731 self.0
1732 .visit_with_subqueries(&mut visitor)
1733 .map_err(|_| fmt::Error)?;
1734 visitor.post_visit_plan()?;
1735
1736 visitor.set_with_schema(true);
1737 visitor.pre_visit_plan("Detailed LogicalPlan")?;
1738 self.0
1739 .visit_with_subqueries(&mut visitor)
1740 .map_err(|_| fmt::Error)?;
1741 visitor.post_visit_plan()?;
1742
1743 visitor.end_graph()?;
1744 Ok(())
1745 }
1746 }
1747 Wrapper(self)
1748 }
1749
1750 pub fn display(&self) -> impl Display + '_ {
1772 struct Wrapper<'a>(&'a LogicalPlan);
1775 impl Display for Wrapper<'_> {
1776 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1777 match self.0 {
1778 LogicalPlan::EmptyRelation(EmptyRelation {
1779 produce_one_row,
1780 schema: _,
1781 }) => {
1782 let rows = if *produce_one_row { 1 } else { 0 };
1783 write!(f, "EmptyRelation: rows={rows}")
1784 }
1785 LogicalPlan::RecursiveQuery(RecursiveQuery {
1786 is_distinct, ..
1787 }) => {
1788 write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1789 }
1790 LogicalPlan::Values(Values { values, .. }) => {
1791 let str_values: Vec<_> = values
1792 .iter()
1793 .take(5)
1795 .map(|row| {
1796 let item = row
1797 .iter()
1798 .map(|expr| expr.to_string())
1799 .collect::<Vec<_>>()
1800 .join(", ");
1801 format!("({item})")
1802 })
1803 .collect();
1804
1805 let eclipse = if values.len() > 5 { "..." } else { "" };
1806 write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1807 }
1808
1809 LogicalPlan::TableScan(TableScan {
1810 source,
1811 table_name,
1812 projection,
1813 filters,
1814 fetch,
1815 ..
1816 }) => {
1817 let projected_fields = match projection {
1818 Some(indices) => {
1819 let schema = source.schema();
1820 let names: Vec<&str> = indices
1821 .iter()
1822 .map(|i| schema.field(*i).name().as_str())
1823 .collect();
1824 format!(" projection=[{}]", names.join(", "))
1825 }
1826 _ => "".to_string(),
1827 };
1828
1829 write!(f, "TableScan: {table_name}{projected_fields}")?;
1830
1831 if !filters.is_empty() {
1832 let mut full_filter = vec![];
1833 let mut partial_filter = vec![];
1834 let mut unsupported_filters = vec![];
1835 let filters: Vec<&Expr> = filters.iter().collect();
1836
1837 if let Ok(results) =
1838 source.supports_filters_pushdown(&filters)
1839 {
1840 filters.iter().zip(results.iter()).for_each(
1841 |(x, res)| match res {
1842 TableProviderFilterPushDown::Exact => {
1843 full_filter.push(x)
1844 }
1845 TableProviderFilterPushDown::Inexact => {
1846 partial_filter.push(x)
1847 }
1848 TableProviderFilterPushDown::Unsupported => {
1849 unsupported_filters.push(x)
1850 }
1851 },
1852 );
1853 }
1854
1855 if !full_filter.is_empty() {
1856 write!(
1857 f,
1858 ", full_filters=[{}]",
1859 expr_vec_fmt!(full_filter)
1860 )?;
1861 };
1862 if !partial_filter.is_empty() {
1863 write!(
1864 f,
1865 ", partial_filters=[{}]",
1866 expr_vec_fmt!(partial_filter)
1867 )?;
1868 }
1869 if !unsupported_filters.is_empty() {
1870 write!(
1871 f,
1872 ", unsupported_filters=[{}]",
1873 expr_vec_fmt!(unsupported_filters)
1874 )?;
1875 }
1876 }
1877
1878 if let Some(n) = fetch {
1879 write!(f, ", fetch={n}")?;
1880 }
1881
1882 Ok(())
1883 }
1884 LogicalPlan::Projection(Projection { expr, .. }) => {
1885 write!(f, "Projection:")?;
1886 for (i, expr_item) in expr.iter().enumerate() {
1887 if i > 0 {
1888 write!(f, ",")?;
1889 }
1890 write!(f, " {expr_item}")?;
1891 }
1892 Ok(())
1893 }
1894 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1895 write!(f, "Dml: op=[{op}] table=[{table_name}]")
1896 }
1897 LogicalPlan::Copy(CopyTo {
1898 input: _,
1899 output_url,
1900 file_type,
1901 options,
1902 ..
1903 }) => {
1904 let op_str = options
1905 .iter()
1906 .map(|(k, v)| format!("{k} {v}"))
1907 .collect::<Vec<String>>()
1908 .join(", ");
1909
1910 write!(
1911 f,
1912 "CopyTo: format={} output_url={output_url} options: ({op_str})",
1913 file_type.get_ext()
1914 )
1915 }
1916 LogicalPlan::Ddl(ddl) => {
1917 write!(f, "{}", ddl.display())
1918 }
1919 LogicalPlan::Filter(Filter {
1920 predicate: expr, ..
1921 }) => write!(f, "Filter: {expr}"),
1922 LogicalPlan::Window(Window { window_expr, .. }) => {
1923 write!(
1924 f,
1925 "WindowAggr: windowExpr=[[{}]]",
1926 expr_vec_fmt!(window_expr)
1927 )
1928 }
1929 LogicalPlan::Aggregate(Aggregate {
1930 group_expr,
1931 aggr_expr,
1932 ..
1933 }) => write!(
1934 f,
1935 "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
1936 expr_vec_fmt!(group_expr),
1937 expr_vec_fmt!(aggr_expr)
1938 ),
1939 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
1940 write!(f, "Sort: ")?;
1941 for (i, expr_item) in expr.iter().enumerate() {
1942 if i > 0 {
1943 write!(f, ", ")?;
1944 }
1945 write!(f, "{expr_item}")?;
1946 }
1947 if let Some(a) = fetch {
1948 write!(f, ", fetch={a}")?;
1949 }
1950
1951 Ok(())
1952 }
1953 LogicalPlan::Join(Join {
1954 on: keys,
1955 filter,
1956 join_constraint,
1957 join_type,
1958 ..
1959 }) => {
1960 let join_expr: Vec<String> =
1961 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
1962 let filter_expr = filter
1963 .as_ref()
1964 .map(|expr| format!(" Filter: {expr}"))
1965 .unwrap_or_else(|| "".to_string());
1966 let join_type = if filter.is_none()
1967 && keys.is_empty()
1968 && *join_type == JoinType::Inner
1969 {
1970 "Cross".to_string()
1971 } else {
1972 join_type.to_string()
1973 };
1974 match join_constraint {
1975 JoinConstraint::On => {
1976 write!(f, "{join_type} Join:",)?;
1977 if !join_expr.is_empty() || !filter_expr.is_empty() {
1978 write!(
1979 f,
1980 " {}{}",
1981 join_expr.join(", "),
1982 filter_expr
1983 )?;
1984 }
1985 Ok(())
1986 }
1987 JoinConstraint::Using => {
1988 write!(
1989 f,
1990 "{} Join: Using {}{}",
1991 join_type,
1992 join_expr.join(", "),
1993 filter_expr,
1994 )
1995 }
1996 }
1997 }
1998 LogicalPlan::Repartition(Repartition {
1999 partitioning_scheme,
2000 ..
2001 }) => match partitioning_scheme {
2002 Partitioning::RoundRobinBatch(n) => {
2003 write!(f, "Repartition: RoundRobinBatch partition_count={n}")
2004 }
2005 Partitioning::Hash(expr, n) => {
2006 let hash_expr: Vec<String> =
2007 expr.iter().map(|e| format!("{e}")).collect();
2008 write!(
2009 f,
2010 "Repartition: Hash({}) partition_count={}",
2011 hash_expr.join(", "),
2012 n
2013 )
2014 }
2015 Partitioning::DistributeBy(expr) => {
2016 let dist_by_expr: Vec<String> =
2017 expr.iter().map(|e| format!("{e}")).collect();
2018 write!(
2019 f,
2020 "Repartition: DistributeBy({})",
2021 dist_by_expr.join(", "),
2022 )
2023 }
2024 },
2025 LogicalPlan::Limit(limit) => {
2026 let skip_str = match limit.get_skip_type() {
2028 Ok(SkipType::Literal(n)) => n.to_string(),
2029 _ => limit
2030 .skip
2031 .as_ref()
2032 .map_or_else(|| "None".to_string(), |x| x.to_string()),
2033 };
2034 let fetch_str = match limit.get_fetch_type() {
2035 Ok(FetchType::Literal(Some(n))) => n.to_string(),
2036 Ok(FetchType::Literal(None)) => "None".to_string(),
2037 _ => limit
2038 .fetch
2039 .as_ref()
2040 .map_or_else(|| "None".to_string(), |x| x.to_string()),
2041 };
2042 write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
2043 }
2044 LogicalPlan::Subquery(Subquery { .. }) => {
2045 write!(f, "Subquery:")
2046 }
2047 LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
2048 write!(f, "SubqueryAlias: {alias}")
2049 }
2050 LogicalPlan::Statement(statement) => {
2051 write!(f, "{}", statement.display())
2052 }
2053 LogicalPlan::Distinct(distinct) => match distinct {
2054 Distinct::All(_) => write!(f, "Distinct:"),
2055 Distinct::On(DistinctOn {
2056 on_expr,
2057 select_expr,
2058 sort_expr,
2059 ..
2060 }) => write!(
2061 f,
2062 "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
2063 expr_vec_fmt!(on_expr),
2064 expr_vec_fmt!(select_expr),
2065 if let Some(sort_expr) = sort_expr {
2066 expr_vec_fmt!(sort_expr)
2067 } else {
2068 "".to_string()
2069 },
2070 ),
2071 },
2072 LogicalPlan::Explain { .. } => write!(f, "Explain"),
2073 LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2074 LogicalPlan::Union(_) => write!(f, "Union"),
2075 LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2076 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2077 write!(f, "DescribeTable")
2078 }
2079 LogicalPlan::Unnest(Unnest {
2080 input: plan,
2081 list_type_columns: list_col_indices,
2082 struct_type_columns: struct_col_indices,
2083 ..
2084 }) => {
2085 let input_columns = plan.schema().columns();
2086 let list_type_columns = list_col_indices
2087 .iter()
2088 .map(|(i, unnest_info)| {
2089 format!(
2090 "{}|depth={}",
2091 &input_columns[*i].to_string(),
2092 unnest_info.depth
2093 )
2094 })
2095 .collect::<Vec<String>>();
2096 let struct_type_columns = struct_col_indices
2097 .iter()
2098 .map(|i| &input_columns[*i])
2099 .collect::<Vec<&Column>>();
2100 write!(
2102 f,
2103 "Unnest: lists[{}] structs[{}]",
2104 expr_vec_fmt!(list_type_columns),
2105 expr_vec_fmt!(struct_type_columns)
2106 )
2107 }
2108 }
2109 }
2110 }
2111 Wrapper(self)
2112 }
2113}
2114
2115impl Display for LogicalPlan {
2116 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2117 self.display_indent().fmt(f)
2118 }
2119}
2120
2121impl ToStringifiedPlan for LogicalPlan {
2122 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2123 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2124 }
2125}
2126
/// A relation with a known schema that produces no data: either zero rows,
/// or exactly one (empty) row when `produce_one_row` is set — e.g. the
/// plan for `SELECT 1`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Whether to emit a single placeholder row (true) or no rows (false)
    pub produce_one_row: bool,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2137
2138impl PartialOrd for EmptyRelation {
2140 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2141 self.produce_one_row
2142 .partial_cmp(&other.produce_one_row)
2143 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2145 }
2146}
2147
/// A recursive query (recursive CTE): a static (non-recursive) seed term
/// plus a recursive term evaluated repeatedly against previous results.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the recursive relation (the CTE name)
    pub name: String,
    /// The base/seed term evaluated once
    pub static_term: Arc<LogicalPlan>,
    /// The term re-evaluated against prior iterations' output
    pub recursive_term: Arc<LogicalPlan>,
    /// Whether duplicate rows are eliminated between iterations
    /// (presumably UNION vs UNION ALL semantics — confirm at call sites)
    pub is_distinct: bool,
}
2183
/// An inline table of literal rows, e.g. `VALUES (1, 'a'), (2, 'b')`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The schema describing the produced rows
    pub schema: DFSchemaRef,
    /// One inner `Vec<Expr>` per output row
    pub values: Vec<Vec<Expr>>,
}
2194
2195impl PartialOrd for Values {
2197 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2198 self.values
2199 .partial_cmp(&other.values)
2200 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2202 }
2203}
2204
/// Evaluates a set of expressions over its input, producing one output
/// column per expression.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
// `non_exhaustive` forces construction through `try_new*` so invariants hold
#[non_exhaustive]
pub struct Projection {
    /// The expressions, one per output column
    pub expr: Vec<Expr>,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2218
2219impl PartialOrd for Projection {
2221 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2222 match self.expr.partial_cmp(&other.expr) {
2223 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2224 cmp => cmp,
2225 }
2226 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2228 }
2229}
2230
impl Projection {
    /// Creates a projection over `input`, deriving the output schema from
    /// `expr` (including functional dependencies).
    pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        let projection_schema = projection_schema(&input, &expr)?;
        Self::try_new_with_schema(expr, input, projection_schema)
    }

    /// Creates a projection with a caller-supplied schema, validating that
    /// the number of expressions matches the number of schema fields.
    /// Wildcard expressions are exempt since one wildcard expands to many
    /// fields.
    pub fn try_new_with_schema(
        expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        // `Expr::Wildcard` is deprecated but may still reach this code.
        #[expect(deprecated)]
        if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
            && expr.len() != schema.fields().len()
        {
            return plan_err!(
                "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
                expr.len(),
                schema.fields().len()
            );
        }
        Ok(Self {
            expr,
            input,
            schema,
        })
    }

    /// Creates a projection that selects every column of `schema` from
    /// `input` as-is; no validation is performed.
    pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
        let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
        Self {
            expr,
            input,
            schema,
        }
    }
}
2271
2272pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2292 let metadata = input.schema().metadata().clone();
2294
2295 let schema =
2297 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2298 .with_functional_dependencies(calc_func_dependencies_for_project(
2299 exprs, input,
2300 )?)?;
2301
2302 Ok(Arc::new(schema))
2303}
2304
/// Assigns an alias (new table qualifier) to the output of its input plan,
/// e.g. `SELECT ... FROM (subquery) AS alias`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
// `non_exhaustive` forces construction through `try_new` so the schema
// invariants (requalification, uniqueness) hold
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The alias applied as the qualifier of every output column
    pub alias: TableReference,
    /// The schema description of the output, qualified by `alias`
    pub schema: DFSchemaRef,
}
2317
impl SubqueryAlias {
    /// Creates an alias node over `plan`. If the input has duplicate
    /// unqualified field names, a projection is first inserted that
    /// re-aliases them to unique names, so the aliased schema has unique
    /// columns. The resulting schema requalifies every field with `alias`
    /// while preserving the input's functional dependencies.
    pub fn try_new(
        plan: Arc<LogicalPlan>,
        alias: impl Into<TableReference>,
    ) -> Result<Self> {
        let alias = alias.into();

        // `unique_field_aliases` returns Some(new_name) for fields that
        // would collide once qualifiers are dropped.
        let aliases = unique_field_aliases(plan.schema().fields());
        let is_projection_needed = aliases.iter().any(Option::is_some);

        let plan = if is_projection_needed {
            let projection_expressions = aliases
                .iter()
                .zip(plan.schema().iter())
                .map(|(alias, (qualifier, field))| {
                    let column =
                        Expr::Column(Column::new(qualifier.cloned(), field.name()));
                    match alias {
                        None => column,
                        Some(alias) => {
                            Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
                        }
                    }
                })
                .collect();
            let projection = Projection::try_new(projection_expressions, plan)?;
            Arc::new(LogicalPlan::Projection(projection))
        } else {
            plan
        };

        let fields = plan.schema().fields().clone();
        let meta_data = plan.schema().metadata().clone();
        let func_dependencies = plan.schema().functional_dependencies().clone();

        // Strip existing qualifiers, then requalify everything under
        // `alias` (round-trip via the arrow schema).
        let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
        let schema = schema.as_arrow();

        let schema = DFSchemaRef::new(
            DFSchema::try_from_qualified_schema(alias.clone(), schema)?
                .with_functional_dependencies(func_dependencies)?,
        );
        Ok(SubqueryAlias {
            input: plan,
            alias,
            schema,
        })
    }
}
2374
2375impl PartialOrd for SubqueryAlias {
2377 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2378 match self.input.partial_cmp(&other.input) {
2379 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2380 cmp => cmp,
2381 }
2382 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2384 }
2385}
2386
/// Filters rows from its input, keeping only those for which `predicate`
/// evaluates to true.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
// `non_exhaustive` forces construction through `try_new` so the
// boolean-predicate invariant holds
#[non_exhaustive]
pub struct Filter {
    /// The boolean expression deciding which rows to keep
    pub predicate: Expr,
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
}
2406
impl Filter {
    /// Creates a filter, validating that `predicate` has a boolean-like
    /// type against the input schema.
    pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Kept only for API compatibility; identical to [`Self::try_new`].
    #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
    pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        Self::try_new_internal(predicate, input)
    }

    /// Returns true for types a filter predicate may produce: Boolean,
    /// Null, or a Dictionary whose value type is (recursively) allowed.
    fn is_allowed_filter_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::Boolean | DataType::Null => true,
            DataType::Dictionary(_, value_type) => {
                Filter::is_allowed_filter_type(value_type.as_ref())
            }
            _ => false,
        }
    }

    /// Shared constructor: type-checks the predicate (only when its type
    /// can be resolved) and strips nested aliases from it.
    fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
        // If `get_type` fails (e.g. unresolved columns) the check is
        // skipped rather than erroring here.
        if let Ok(predicate_type) = predicate.get_type(input.schema())
            && !Filter::is_allowed_filter_type(&predicate_type)
        {
            return plan_err!(
                "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
            );
        }

        Ok(Self {
            // Aliases inside a predicate carry no meaning; remove them.
            predicate: predicate.unalias_nested().data,
            input,
        })
    }

    /// Returns true if this filter is guaranteed to produce at most one
    /// row: some unique key of the input has every one of its columns
    /// constrained by an equality predicate against a non-column value.
    fn is_scalar(&self) -> bool {
        let schema = self.input.schema();

        // Candidate keys: single-valued, non-nullable dependencies that
        // determine the entire schema.
        let functional_dependencies = self.input.schema().functional_dependencies();
        let unique_keys = functional_dependencies.iter().filter(|dep| {
            let nullable = dep.nullable
                && dep
                    .source_indices
                    .iter()
                    .any(|&source| schema.field(source).is_nullable());
            !nullable
                && dep.mode == Dependency::Single
                && dep.target_indices.len() == schema.fields().len()
        });

        // Collect the schema indices of columns pinned by `col = <expr>`
        // conjuncts (column-to-column equalities do not pin a value).
        let exprs = split_conjunction(&self.predicate);
        let eq_pred_cols: HashSet<_> = exprs
            .iter()
            .filter_map(|expr| {
                let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: Operator::Eq,
                    right,
                }) = expr
                else {
                    return None;
                };
                // `col = col` (same expression) pins nothing.
                if left == right {
                    return None;
                }

                match (left.as_ref(), right.as_ref()) {
                    (Expr::Column(_), Expr::Column(_)) => None,
                    (Expr::Column(c), _) | (_, Expr::Column(c)) => {
                        Some(schema.index_of_column(c).unwrap())
                    }
                    _ => None,
                }
            })
            .collect();

        // A fully-pinned unique key implies at most one matching row.
        for key in unique_keys {
            if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
                return true;
            }
        }
        false
    }
}
2520
/// Evaluates window functions over its input; output columns are the
/// input's columns followed by one column per window expression.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The window function expressions, one per appended output column
    pub window_expr: Vec<Expr>,
    /// The schema description of the output
    pub schema: DFSchemaRef,
}
2544
impl Window {
    /// Creates a window node, deriving the output schema (input fields
    /// plus one unqualified field per window expression), extending the
    /// input's functional dependencies, and rejecting FILTER clauses on
    /// non-aggregate window functions.
    pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
        // Output fields: all input fields, then the window expressions'.
        let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
            .schema()
            .iter()
            .map(|(q, f)| (q.cloned(), Arc::clone(f)))
            .collect();
        let input_len = fields.len();
        let mut window_fields = fields;
        let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
        window_fields.extend_from_slice(expr_fields.as_slice());
        let metadata = input.schema().metadata().clone();

        // Existing dependencies still hold; widen their target range to
        // cover the appended window columns.
        let mut window_func_dependencies =
            input.schema().functional_dependencies().clone();
        window_func_dependencies.extend_target_indices(window_fields.len());

        // An unpartitioned `row_number()` output is unique, so it forms a
        // new single-valued dependency.
        let mut new_dependencies = window_expr
            .iter()
            .enumerate()
            .filter_map(|(idx, expr)| {
                let Expr::WindowFunction(window_fun) = expr else {
                    return None;
                };
                let WindowFunction {
                    fun: WindowFunctionDefinition::WindowUDF(udwf),
                    params: WindowFunctionParams { partition_by, .. },
                } = window_fun.as_ref()
                else {
                    return None;
                };
                if udwf.name() == "row_number" && partition_by.is_empty() {
                    Some(idx + input_len)
                } else {
                    None
                }
            })
            .map(|idx| {
                FunctionalDependence::new(vec![idx], vec![], false)
                    .with_mode(Dependency::Single)
            })
            .collect::<Vec<_>>();

        if !new_dependencies.is_empty() {
            // Each new key determines every output column.
            for dependence in new_dependencies.iter_mut() {
                dependence.target_indices = (0..window_fields.len()).collect();
            }
            let new_deps = FunctionalDependencies::new(new_dependencies);
            window_func_dependencies.extend(new_deps);
        }

        // FILTER (WHERE ...) is only defined for aggregate functions used
        // as window functions.
        if let Some(e) = window_expr.iter().find(|e| {
            matches!(
                e,
                Expr::WindowFunction(wf)
                    if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
                    && wf.params.filter.is_some()
            )
        }) {
            return plan_err!(
                "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
            );
        }

        Self::try_new_with_schema(
            window_expr,
            input,
            Arc::new(
                DFSchema::new_with_metadata(window_fields, metadata)?
                    .with_functional_dependencies(window_func_dependencies)?,
            ),
        )
    }

    /// Creates a window node with a caller-supplied schema, validating
    /// only that the field count equals input fields + window expressions.
    pub fn try_new_with_schema(
        window_expr: Vec<Expr>,
        input: Arc<LogicalPlan>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        let input_fields_count = input.schema().fields().len();
        if schema.fields().len() != input_fields_count + window_expr.len() {
            return plan_err!(
                "Window schema has wrong number of fields. Expected {} got {}",
                input_fields_count + window_expr.len(),
                schema.fields().len()
            );
        }

        Ok(Window {
            input,
            window_expr,
            schema,
        })
    }
}
2654
2655impl PartialOrd for Window {
2657 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2658 match self.input.partial_cmp(&other.input)? {
2659 Ordering::Equal => {} not_equal => return Some(not_equal),
2661 }
2662
2663 match self.window_expr.partial_cmp(&other.window_expr)? {
2664 Ordering::Equal => {} not_equal => return Some(not_equal),
2666 }
2667
2668 if self == other {
2671 Some(Ordering::Equal)
2672 } else {
2673 None
2674 }
2675 }
2676}
2677
/// Produces rows from a table source, optionally restricted by a column
/// projection, pushed-down filter expressions, and a row limit.
#[derive(Clone)]
pub struct TableScan {
    /// The name of the table
    pub table_name: TableReference,
    /// The provider the rows come from (not comparable/hashable, see the
    /// manual trait impls below)
    pub source: Arc<dyn TableSource>,
    /// Optional indices of the source columns to produce
    pub projection: Option<Vec<usize>>,
    /// The schema description of the (projected) output
    pub projected_schema: DFSchemaRef,
    /// Filter expressions offered to the provider for pushdown
    pub filters: Vec<Expr>,
    /// Optional maximum number of rows to read
    pub fetch: Option<usize>,
}
2694
impl Debug for TableScan {
    /// Manual impl because `source` is a trait object without `Debug`; it
    /// is rendered as the placeholder `"..."`.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        f.debug_struct("TableScan")
            .field("table_name", &self.table_name)
            .field("source", &"...")
            .field("projection", &self.projection)
            .field("projected_schema", &self.projected_schema)
            .field("filters", &self.filters)
            .field("fetch", &self.fetch)
            .finish_non_exhaustive()
    }
}
2707
2708impl PartialEq for TableScan {
2709 fn eq(&self, other: &Self) -> bool {
2710 self.table_name == other.table_name
2711 && self.projection == other.projection
2712 && self.projected_schema == other.projected_schema
2713 && self.filters == other.filters
2714 && self.fetch == other.fetch
2715 }
2716}
2717
2718impl Eq for TableScan {}
2719
2720impl PartialOrd for TableScan {
2723 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2724 #[derive(PartialEq, PartialOrd)]
2725 struct ComparableTableScan<'a> {
2726 pub table_name: &'a TableReference,
2728 pub projection: &'a Option<Vec<usize>>,
2730 pub filters: &'a Vec<Expr>,
2732 pub fetch: &'a Option<usize>,
2734 }
2735 let comparable_self = ComparableTableScan {
2736 table_name: &self.table_name,
2737 projection: &self.projection,
2738 filters: &self.filters,
2739 fetch: &self.fetch,
2740 };
2741 let comparable_other = ComparableTableScan {
2742 table_name: &other.table_name,
2743 projection: &other.projection,
2744 filters: &other.filters,
2745 fetch: &other.fetch,
2746 };
2747 comparable_self
2748 .partial_cmp(&comparable_other)
2749 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2751 }
2752}
2753
2754impl Hash for TableScan {
2755 fn hash<H: Hasher>(&self, state: &mut H) {
2756 self.table_name.hash(state);
2757 self.projection.hash(state);
2758 self.projected_schema.hash(state);
2759 self.filters.hash(state);
2760 self.fetch.hash(state);
2761 }
2762}
2763
impl TableScan {
    /// Creates a scan of `table_source` named `table_name`, deriving the
    /// projected output schema (qualified by the table name) and the
    /// functional dependencies implied by the source's constraints.
    pub fn try_new(
        table_name: impl Into<TableReference>,
        table_source: Arc<dyn TableSource>,
        projection: Option<Vec<usize>>,
        filters: Vec<Expr>,
        fetch: Option<usize>,
    ) -> Result<Self> {
        let table_name = table_name.into();

        if table_name.table().is_empty() {
            return plan_err!("table_name cannot be empty");
        }
        let schema = table_source.schema();
        // Constraints (e.g. primary/unique keys) become functional
        // dependencies over the full source schema.
        let func_dependencies = FunctionalDependencies::new_from_constraints(
            table_source.constraints(),
            schema.fields.len(),
        );
        let projected_schema = projection
            .as_ref()
            .map(|p| {
                // Restrict both the fields and the dependencies to the
                // projected column indices.
                let projected_func_dependencies =
                    func_dependencies.project_functional_dependencies(p, p.len());

                let df_schema = DFSchema::new_with_metadata(
                    p.iter()
                        .map(|i| {
                            (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
                        })
                        .collect(),
                    schema.metadata.clone(),
                )?;
                df_schema.with_functional_dependencies(projected_func_dependencies)
            })
            .unwrap_or_else(|| {
                // No projection: qualify the full source schema.
                let df_schema =
                    DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                df_schema.with_functional_dependencies(func_dependencies)
            })?;
        let projected_schema = Arc::new(projected_schema);

        Ok(Self {
            table_name,
            source: table_source,
            projection,
            projected_schema,
            filters,
            fetch,
        })
    }
}
2817
/// Redistributes its input's rows across partitions according to
/// `partitioning_scheme`, without changing row content.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The incoming logical plan
    pub input: Arc<LogicalPlan>,
    /// The partitioning scheme (round-robin, hash, or distribute-by)
    pub partitioning_scheme: Partitioning,
}
2826
/// Concatenates the rows of two or more inputs that share a compatible
/// schema (UNION ALL semantics; distinctness is applied elsewhere).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// The input plans whose rows are concatenated
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// The schema description of the combined output
    pub schema: DFSchemaRef,
}
2835
2836impl Union {
2837 pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2840 let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2841 Ok(Union { inputs, schema })
2842 }
2843
2844 pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2849 let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2850 Ok(Union { inputs, schema })
2851 }
2852
2853 pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2857 let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2858 let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2859
2860 Ok(Union { inputs, schema })
2861 }
2862
2863 fn rewrite_inputs_from_schema(
2867 schema: &Arc<DFSchema>,
2868 inputs: Vec<Arc<LogicalPlan>>,
2869 ) -> Result<Vec<Arc<LogicalPlan>>> {
2870 let schema_width = schema.iter().count();
2871 let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2872 for input in inputs {
2873 let mut expr = Vec::with_capacity(schema_width);
2877 for column in schema.columns() {
2878 if input
2879 .schema()
2880 .has_column_with_unqualified_name(column.name())
2881 {
2882 expr.push(Expr::Column(column));
2883 } else {
2884 expr.push(
2885 Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2886 );
2887 }
2888 }
2889 wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2890 Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2891 )));
2892 }
2893
2894 Ok(wrapped_inputs)
2895 }
2896
2897 fn derive_schema_from_inputs(
2906 inputs: &[Arc<LogicalPlan>],
2907 loose_types: bool,
2908 by_name: bool,
2909 ) -> Result<DFSchemaRef> {
2910 if inputs.len() < 2 {
2911 return plan_err!("UNION requires at least two inputs");
2912 }
2913
2914 if by_name {
2915 Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2916 } else {
2917 Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2918 }
2919 }
2920
    /// Derives a by-name union schema: output columns appear in first-seen
    /// order; a column absent from some input is forced nullable; field
    /// and schema metadata are intersected across inputs. With
    /// `loose_types` false, a name seen with two different types is an
    /// error; otherwise the first-seen type wins.
    fn derive_schema_from_inputs_by_name(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        // Per column name: (type, nullable-so-far, metadata from each
        // occurrence, number of inputs containing it).
        type FieldData<'a> =
            (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
        // Vec (not a map) preserves first-seen column order.
        let mut cols: Vec<(&str, FieldData)> = Vec::new();
        for input in inputs.iter() {
            for field in input.schema().fields() {
                if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
                    cols.iter_mut().find(|(name, _)| name == field.name())
                {
                    if !loose_types && *data_type != field.data_type() {
                        return plan_err!(
                            "Found different types for field {}",
                            field.name()
                        );
                    }

                    metadata.push(field.metadata());
                    // Nullable if any occurrence is nullable.
                    *is_nullable |= field.is_nullable();
                    *occurrences += 1;
                } else {
                    cols.push((
                        field.name(),
                        (
                            field.data_type(),
                            field.is_nullable(),
                            vec![field.metadata()],
                            1,
                        ),
                    ));
                }
            }
        }

        let union_fields = cols
            .into_iter()
            .map(
                |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
                    // A column missing from any input yields NULLs there,
                    // so it must be nullable in the union schema.
                    let final_is_nullable = if occurrences == inputs.len() {
                        is_nullable
                    } else {
                        true
                    };

                    let mut field =
                        Field::new(name, data_type.clone(), final_is_nullable);
                    field.set_metadata(intersect_metadata_for_union(unmerged_metadata));

                    // Union output columns carry no table qualifier.
                    (None, Arc::new(field))
                },
            )
            .collect::<Vec<(Option<TableReference>, _)>>();

        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
2991
    /// Derive the output schema for a positional UNION: all inputs must have
    /// the same column count, and output column `i` merges column `i` of
    /// every input.
    ///
    /// Output names come from the first input; duplicate names within it are
    /// disambiguated with a `_<n>` suffix. A column is nullable if it is
    /// nullable in any input; field and schema metadata are intersected.
    /// With `loose_types == false` all inputs must agree on each column's
    /// data type; with `true` the first input's type is used as-is.
    fn derive_schema_from_inputs_by_position(
        inputs: &[Arc<LogicalPlan>],
        loose_types: bool,
    ) -> Result<DFSchemaRef> {
        let first_schema = inputs[0].schema();
        let fields_count = first_schema.fields().len();
        for input in inputs.iter().skip(1) {
            if fields_count != input.schema().fields().len() {
                return plan_err!(
                    "UNION queries have different number of columns: \
                    left has {} columns whereas right has {} columns",
                    fields_count,
                    input.schema().fields().len()
                );
            }
        }

        // Times each base name has already been used, so repeats become
        // `name`, `name_1`, `name_2`, ...
        let mut name_counts: HashMap<String, usize> = HashMap::new();
        let union_fields = (0..fields_count)
            .map(|i| {
                // Column `i` from every input.
                let fields = inputs
                    .iter()
                    .map(|input| input.schema().field(i))
                    .collect::<Vec<_>>();
                let first_field = fields[0];
                let base_name = first_field.name().to_string();

                let data_type = if loose_types {
                    // Loose mode: take the first input's type without
                    // checking the others agree.
                    first_field.data_type()
                } else {
                    // Strict mode: every other input must match the first.
                    fields.iter().skip(1).try_fold(
                        first_field.data_type(),
                        |acc, field| {
                            if acc != field.data_type() {
                                return plan_err!(
                                    "UNION field {i} have different type in inputs: \
                                    left has {} whereas right has {}",
                                    first_field.data_type(),
                                    field.data_type()
                                );
                            }
                            Ok(acc)
                        },
                    )?
                };
                let nullable = fields.iter().any(|field| field.is_nullable());

                // Disambiguate duplicate column names from the first input.
                let name = if let Some(count) = name_counts.get_mut(&base_name) {
                    *count += 1;
                    format!("{base_name}_{count}")
                } else {
                    name_counts.insert(base_name.clone(), 0);
                    base_name
                };

                let mut field = Field::new(&name, data_type.clone(), nullable);
                let field_metadata = intersect_metadata_for_union(
                    fields.iter().map(|field| field.metadata()),
                );
                field.set_metadata(field_metadata);
                // Union output columns carry no table qualifier.
                Ok((None, Arc::new(field)))
            })
            .collect::<Result<_>>()?;
        let union_schema_metadata = intersect_metadata_for_union(
            inputs.iter().map(|input| input.schema().metadata()),
        );

        let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
        let schema = Arc::new(schema);

        Ok(schema)
    }
3069}
3070
3071impl PartialOrd for Union {
3073 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3074 self.inputs
3075 .partial_cmp(&other.inputs)
3076 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3078 }
3079}
3080
/// Describes the schema of a table (the `DESCRIBE <table>` statement),
/// producing the description as rows.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// The schema of the table being described.
    pub schema: Arc<Schema>,
    /// The schema of this node's own output rows.
    pub output_schema: DFSchemaRef,
}
3110
impl PartialOrd for DescribeTable {
    /// `DescribeTable` carries only schemas, which have no defined
    /// ordering, so two nodes are never comparable.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
3119
/// Options controlling the output of an `EXPLAIN` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Whether to print additional detail.
    pub verbose: bool,
    /// Whether to actually run the plan and report metrics
    /// (`EXPLAIN ANALYZE`).
    pub analyze: bool,
    /// Output format for the explain text.
    pub format: ExplainFormat,
}
3130
3131impl Default for ExplainOption {
3132 fn default() -> Self {
3133 ExplainOption {
3134 verbose: false,
3135 analyze: false,
3136 format: ExplainFormat::Indent,
3137 }
3138 }
3139}
3140
3141impl ExplainOption {
3142 pub fn with_verbose(mut self, verbose: bool) -> Self {
3144 self.verbose = verbose;
3145 self
3146 }
3147
3148 pub fn with_analyze(mut self, analyze: bool) -> Self {
3150 self.analyze = analyze;
3151 self
3152 }
3153
3154 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3156 self.format = format;
3157 self
3158 }
3159}
3160
/// Produces a relation with string representations of various parts of the
/// plan (the `EXPLAIN` statement).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Whether to print detailed output.
    pub verbose: bool,
    /// Output format for the explain text.
    pub explain_format: ExplainFormat,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// String renderings of the plan captured at different planning stages.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Schema of this node's own output rows.
    pub schema: DFSchemaRef,
    /// Whether logical optimization completed without error.
    pub logical_optimization_succeeded: bool,
}
3183
3184impl PartialOrd for Explain {
3186 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3187 #[derive(PartialEq, PartialOrd)]
3188 struct ComparableExplain<'a> {
3189 pub verbose: &'a bool,
3191 pub plan: &'a Arc<LogicalPlan>,
3193 pub stringified_plans: &'a Vec<StringifiedPlan>,
3195 pub logical_optimization_succeeded: &'a bool,
3197 }
3198 let comparable_self = ComparableExplain {
3199 verbose: &self.verbose,
3200 plan: &self.plan,
3201 stringified_plans: &self.stringified_plans,
3202 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3203 };
3204 let comparable_other = ComparableExplain {
3205 verbose: &other.verbose,
3206 plan: &other.plan,
3207 stringified_plans: &other.stringified_plans,
3208 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3209 };
3210 comparable_self
3211 .partial_cmp(&comparable_other)
3212 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3214 }
3215}
3216
/// Runs the input plan and reports execution metrics (`EXPLAIN ANALYZE`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Whether to print detailed metrics.
    pub verbose: bool,
    /// The plan to execute and measure.
    pub input: Arc<LogicalPlan>,
    /// Schema of this node's own output rows.
    pub schema: DFSchemaRef,
}
3228
3229impl PartialOrd for Analyze {
3231 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3232 match self.verbose.partial_cmp(&other.verbose) {
3233 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3234 cmp => cmp,
3235 }
3236 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3238 }
3239}
3240
/// A user-defined logical plan node that extends DataFusion's built-in
/// plan operators.
// `Hash` is derived while `PartialEq` is hand-written below (it must
// delegate to the trait object), hence the lint suppression.
#[allow(clippy::allow_attributes)]
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-defined node implementation.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3252
impl PartialEq for Extension {
    /// Delegates equality to the user-defined node's own `eq`; `PartialEq`
    /// cannot be derived for a trait-object field.
    fn eq(&self, other: &Self) -> bool {
        self.node.eq(&other.node)
    }
}
3261
impl PartialOrd for Extension {
    /// Delegates ordering to the user-defined node.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3267
/// Skips and/or limits rows from its input (SQL `OFFSET` / `LIMIT`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Number of rows to skip before emitting output, as an expression;
    /// `None` means skip nothing.
    pub skip: Option<Box<Expr>>,
    /// Maximum number of rows to emit, as an expression; `None` means no
    /// limit.
    pub fetch: Option<Box<Expr>>,
    /// The plan being limited.
    pub input: Arc<LogicalPlan>,
}
3279
/// Resolved value of a [`Limit`]'s skip (OFFSET) expression; see
/// [`Limit::get_skip_type`].
pub enum SkipType {
    /// A concrete, non-negative row count.
    Literal(usize),
    /// The expression could not be resolved to a literal count.
    UnsupportedExpr,
}
3287
/// Resolved value of a [`Limit`]'s fetch (LIMIT) expression; see
/// [`Limit::get_fetch_type`].
pub enum FetchType {
    /// A concrete row count, or `None` when no limit applies.
    Literal(Option<usize>),
    /// The expression could not be resolved to a literal count.
    UnsupportedExpr,
}
3296
3297impl Limit {
3298 pub fn get_skip_type(&self) -> Result<SkipType> {
3300 match self.skip.as_deref() {
3301 Some(expr) => match *expr {
3302 Expr::Literal(ScalarValue::Int64(s), _) => {
3303 let s = s.unwrap_or(0);
3305 if s >= 0 {
3306 Ok(SkipType::Literal(s as usize))
3307 } else {
3308 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3309 }
3310 }
3311 _ => Ok(SkipType::UnsupportedExpr),
3312 },
3313 None => Ok(SkipType::Literal(0)),
3315 }
3316 }
3317
3318 pub fn get_fetch_type(&self) -> Result<FetchType> {
3320 match self.fetch.as_deref() {
3321 Some(expr) => match *expr {
3322 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3323 if s >= 0 {
3324 Ok(FetchType::Literal(Some(s as usize)))
3325 } else {
3326 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3327 }
3328 }
3329 Expr::Literal(ScalarValue::Int64(None), _) => {
3330 Ok(FetchType::Literal(None))
3331 }
3332 _ => Ok(FetchType::UnsupportedExpr),
3333 },
3334 None => Ok(FetchType::Literal(None)),
3335 }
3336 }
3337}
3338
/// Removes duplicate rows from the input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain `SELECT DISTINCT`: deduplicate over all output columns.
    All(Arc<LogicalPlan>),
    /// `SELECT DISTINCT ON (..)`: deduplicate over specific expressions.
    On(DistinctOn),
}
3347
3348impl Distinct {
3349 pub fn input(&self) -> &Arc<LogicalPlan> {
3351 match self {
3352 Distinct::All(input) => input,
3353 Distinct::On(DistinctOn { input, .. }) => input,
3354 }
3355 }
3356}
3357
/// The `SELECT DISTINCT ON (..)` operator.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// Expressions defining what counts as a duplicate (the `ON` list).
    pub on_expr: Vec<Expr>,
    /// Expressions produced for each retained row.
    pub select_expr: Vec<Expr>,
    /// Optional ordering; `on_expr` must be a prefix of it, and it decides
    /// which row of each duplicate group survives.
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Output schema, derived from `select_expr`.
    pub schema: DFSchemaRef,
}
3374
3375impl DistinctOn {
3376 pub fn try_new(
3378 on_expr: Vec<Expr>,
3379 select_expr: Vec<Expr>,
3380 sort_expr: Option<Vec<SortExpr>>,
3381 input: Arc<LogicalPlan>,
3382 ) -> Result<Self> {
3383 if on_expr.is_empty() {
3384 return plan_err!("No `ON` expressions provided");
3385 }
3386
3387 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3388 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3389 .into_iter()
3390 .collect();
3391
3392 let dfschema = DFSchema::new_with_metadata(
3393 qualified_fields,
3394 input.schema().metadata().clone(),
3395 )?;
3396
3397 let mut distinct_on = DistinctOn {
3398 on_expr,
3399 select_expr,
3400 sort_expr: None,
3401 input,
3402 schema: Arc::new(dfschema),
3403 };
3404
3405 if let Some(sort_expr) = sort_expr {
3406 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3407 }
3408
3409 Ok(distinct_on)
3410 }
3411
3412 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3416 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3417
3418 let mut matched = true;
3420 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3421 if on != &sort.expr {
3422 matched = false;
3423 break;
3424 }
3425 }
3426
3427 if self.on_expr.len() > sort_expr.len() || !matched {
3428 return plan_err!(
3429 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3430 );
3431 }
3432
3433 self.sort_expr = Some(sort_expr);
3434 Ok(self)
3435 }
3436}
3437
3438impl PartialOrd for DistinctOn {
3440 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3441 #[derive(PartialEq, PartialOrd)]
3442 struct ComparableDistinctOn<'a> {
3443 pub on_expr: &'a Vec<Expr>,
3445 pub select_expr: &'a Vec<Expr>,
3447 pub sort_expr: &'a Option<Vec<SortExpr>>,
3451 pub input: &'a Arc<LogicalPlan>,
3453 }
3454 let comparable_self = ComparableDistinctOn {
3455 on_expr: &self.on_expr,
3456 select_expr: &self.select_expr,
3457 sort_expr: &self.sort_expr,
3458 input: &self.input,
3459 };
3460 let comparable_other = ComparableDistinctOn {
3461 on_expr: &other.on_expr,
3462 select_expr: &other.select_expr,
3463 sort_expr: &other.sort_expr,
3464 input: &other.input,
3465 };
3466 comparable_self
3467 .partial_cmp(&comparable_other)
3468 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3470 }
3471}
3472
/// Aggregates its input based on grouping and aggregate expressions
/// (e.g. SUM). Marked `#[non_exhaustive]`: construct via
/// [`Aggregate::try_new`] / [`Aggregate::try_new_with_schema`] so the
/// schema invariants hold.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct Aggregate {
    /// The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions.
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions.
    pub aggr_expr: Vec<Expr>,
    /// Output schema: grouping columns, then (for grouping sets) the
    /// internal grouping-id column, then aggregate columns.
    pub schema: DFSchemaRef,
}
3498
impl Aggregate {
    /// Create a new aggregate operator, deriving the output schema from the
    /// grouping and aggregate expressions.
    ///
    /// # Errors
    /// Fails when expressions cannot be resolved against the input schema
    /// or the invariants checked by [`Self::try_new_with_schema`] fail.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
    ) -> Result<Self> {
        // Expand GROUPING SETS / CUBE / ROLLUP shorthand into canonical form.
        let group_expr = enumerate_grouping_sets(group_expr)?;

        let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);

        let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;

        let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;

        if is_grouping_set {
            // Grouping-set columns become nullable: a column absent from a
            // particular set is emitted as NULL for that set's rows.
            qualified_fields = qualified_fields
                .into_iter()
                .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
                .collect::<Vec<_>>();
            // Append the hidden grouping-id column; its integer width is
            // chosen from the grouping column count (len() BEFORE this push).
            qualified_fields.push((
                None,
                Field::new(
                    Self::INTERNAL_GROUPING_ID,
                    Self::grouping_id_type(qualified_fields.len()),
                    false,
                )
                .into(),
            ));
        }

        qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);

        let schema = DFSchema::new_with_metadata(
            qualified_fields,
            input.schema().metadata().clone(),
        )?;

        Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
    }

    /// Create a new aggregate operator using a pre-computed schema, after
    /// validating it and recomputing its functional dependencies.
    ///
    /// # Errors
    /// Fails when no expressions are given at all, or when the schema's
    /// field count does not match `group_expr` plus `aggr_expr`.
    #[expect(clippy::needless_pass_by_value)]
    pub fn try_new_with_schema(
        input: Arc<LogicalPlan>,
        group_expr: Vec<Expr>,
        aggr_expr: Vec<Expr>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        if group_expr.is_empty() && aggr_expr.is_empty() {
            return plan_err!(
                "Aggregate requires at least one grouping or aggregate expression. \
                Aggregate without grouping expressions nor aggregate expressions is \
                logically equivalent to, but less efficient than, VALUES producing \
                single row. Please use VALUES instead."
            );
        }
        let group_expr_count = grouping_set_expr_count(&group_expr)?;
        if schema.fields().len() != group_expr_count + aggr_expr.len() {
            return plan_err!(
                "Aggregate schema has wrong number of fields. Expected {} got {}",
                group_expr_count + aggr_expr.len(),
                schema.fields().len()
            );
        }

        // Dependencies are recomputed from the grouping expressions rather
        // than inherited from the caller-provided schema.
        let aggregate_func_dependencies =
            calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
        let new_schema = schema.as_ref().clone();
        let schema = Arc::new(
            new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
        );
        Ok(Self {
            input,
            group_expr,
            aggr_expr,
            schema,
        })
    }

    /// True when the grouping is a single `Expr::GroupingSet`
    /// (GROUPING SETS / CUBE / ROLLUP).
    fn is_grouping_set(&self) -> bool {
        matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
    }

    /// Expressions corresponding 1:1 with the output schema fields:
    /// grouping exprs, then (for grouping sets) the internal grouping-id
    /// column, then aggregate exprs.
    fn output_expressions(&self) -> Result<Vec<&Expr>> {
        // Static placeholder expr for the internal grouping-id column so a
        // `&Expr` with 'static lifetime can sit alongside borrowed exprs.
        static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
            Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
        });
        let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
        if self.is_grouping_set() {
            exprs.push(&INTERNAL_ID_EXPR);
        }
        exprs.extend(self.aggr_expr.iter());
        debug_assert!(exprs.len() == self.schema.fields().len());
        Ok(exprs)
    }

    /// Number of output columns contributed by the grouping expressions
    /// (delegates to `grouping_set_expr_count`, which accounts for
    /// grouping-set expansion).
    pub fn group_expr_len(&self) -> Result<usize> {
        grouping_set_expr_count(&self.group_expr)
    }

    /// Smallest unsigned integer type with at least `group_exprs` bits
    /// (capped at `UInt64`), used for the internal grouping-id column.
    pub fn grouping_id_type(group_exprs: usize) -> DataType {
        if group_exprs <= 8 {
            DataType::UInt8
        } else if group_exprs <= 16 {
            DataType::UInt16
        } else if group_exprs <= 32 {
            DataType::UInt32
        } else {
            DataType::UInt64
        }
    }

    /// Name of the hidden column that carries the grouping-set id.
    pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
}
3644
3645impl PartialOrd for Aggregate {
3647 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3648 match self.input.partial_cmp(&other.input) {
3649 Some(Ordering::Equal) => {
3650 match self.group_expr.partial_cmp(&other.group_expr) {
3651 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3652 cmp => cmp,
3653 }
3654 }
3655 cmp => cmp,
3656 }
3657 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3659 }
3660}
3661
/// True if any expression in `group_expr` is an `Expr::GroupingSet`
/// (GROUPING SETS / CUBE / ROLLUP).
fn contains_grouping_set(group_expr: &[Expr]) -> bool {
    group_expr
        .iter()
        .any(|expr| matches!(expr, Expr::GroupingSet(_)))
}
3668
3669fn calc_func_dependencies_for_aggregate(
3671 group_expr: &[Expr],
3673 input: &LogicalPlan,
3675 aggr_schema: &DFSchema,
3677) -> Result<FunctionalDependencies> {
3678 if !contains_grouping_set(group_expr) {
3684 let group_by_expr_names = group_expr
3685 .iter()
3686 .map(|item| item.schema_name().to_string())
3687 .collect::<IndexSet<_>>()
3688 .into_iter()
3689 .collect::<Vec<_>>();
3690 let aggregate_func_dependencies = aggregate_functional_dependencies(
3691 input.schema(),
3692 &group_by_expr_names,
3693 aggr_schema,
3694 );
3695 Ok(aggregate_func_dependencies)
3696 } else {
3697 Ok(FunctionalDependencies::empty())
3698 }
3699}
3700
/// Compute functional dependencies for a projection's output schema.
///
/// Each projected expression is mapped back (by display name) to the index
/// of the input field it passes through; the input's dependencies are then
/// projected onto those indices. Computed expressions that match no input
/// field contribute no index.
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_fields = input.schema().field_names();
    // Indices (in the input schema) of the fields each expr resolves to.
    let proj_indices = exprs
        .iter()
        .map(|expr| match expr {
            #[expect(deprecated)]
            Expr::Wildcard { qualifier, options } => {
                // A wildcard expands to many fields; resolve each expanded
                // field back to its input index by flat name.
                let wildcard_fields = exprlist_to_fields(
                    vec![&Expr::Wildcard {
                        qualifier: qualifier.clone(),
                        options: options.clone(),
                    }],
                    input,
                )?;
                Ok::<_, DataFusionError>(
                    wildcard_fields
                        .into_iter()
                        .filter_map(|(qualifier, f)| {
                            let flat_name = qualifier
                                .map(|t| format!("{}.{}", t, f.name()))
                                .unwrap_or_else(|| f.name().clone());
                            input_fields.iter().position(|item| *item == flat_name)
                        })
                        .collect::<Vec<_>>(),
                )
            }
            Expr::Alias(alias) => {
                // Look up by the aliased (inner) expression's name, since
                // that is what appears in the input schema.
                let name = format!("{}", alias.expr);
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
            _ => {
                let name = format!("{expr}");
                Ok(input_fields
                    .iter()
                    .position(|item| *item == name)
                    .map(|i| vec![i])
                    .unwrap_or(vec![]))
            }
        })
        .collect::<Result<Vec<_>>>()?
        .into_iter()
        .flatten()
        .collect::<Vec<_>>();

    Ok(input
        .schema()
        .functional_dependencies()
        .project_functional_dependencies(&proj_indices, exprs.len()))
}
3760
/// Sorts its input according to a list of sort expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions.
    pub expr: Vec<SortExpr>,
    /// The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    /// Optional cap on the number of sorted rows to produce (top-k).
    pub fetch: Option<usize>,
}
3771
/// Joins two logical plans on one or more equijoin key pairs and/or a
/// filter expression.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input.
    pub left: Arc<LogicalPlan>,
    /// Right input.
    pub right: Arc<LogicalPlan>,
    /// Equijoin key pairs: (left expression, right expression).
    pub on: Vec<(Expr, Expr)>,
    /// Additional non-equijoin filter applied to joined rows.
    pub filter: Option<Expr>,
    /// The kind of join (inner, left, right, full, semi, anti, ...).
    pub join_type: JoinType,
    /// How the join was written in SQL (`ON` vs `USING`).
    pub join_constraint: JoinConstraint,
    /// Output schema, built from both inputs and the join type.
    pub schema: DFSchemaRef,
    /// How NULLs compare in the join keys (whether NULL equals NULL).
    pub null_equality: NullEquality,
    /// NOTE(review): presumably flags null-aware join semantics (e.g. for
    /// `NOT IN` decorrelation) — confirm against planner usage.
    pub null_aware: bool,
}
3800
impl Join {
    /// Create a new join, computing the output schema from the two inputs
    /// and the join type.
    #[expect(clippy::too_many_arguments)]
    pub fn try_new(
        left: Arc<LogicalPlan>,
        right: Arc<LogicalPlan>,
        on: Vec<(Expr, Expr)>,
        filter: Option<Expr>,
        join_type: JoinType,
        join_constraint: JoinConstraint,
        null_equality: NullEquality,
        null_aware: bool,
    ) -> Result<Self> {
        let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;

        Ok(Join {
            left,
            right,
            on,
            filter,
            join_type,
            join_constraint,
            schema: Arc::new(join_schema),
            null_equality,
            null_aware,
        })
    }

    /// Create a join shaped like `original` but over new inputs and
    /// explicit key columns.
    ///
    /// For inner/left/right/full joins, the sides are requalified if needed
    /// (see `requalify_sides_if_needed`); the returned `bool` reports
    /// whether that happened so callers can adjust column references.
    ///
    /// # Errors
    /// Returns a plan error when `original` is not a `LogicalPlan::Join`.
    pub fn try_new_with_project_input(
        original: &LogicalPlan,
        left: Arc<LogicalPlan>,
        right: Arc<LogicalPlan>,
        column_on: (Vec<Column>, Vec<Column>),
    ) -> Result<(Self, bool)> {
        let original_join = match original {
            LogicalPlan::Join(join) => join,
            _ => return plan_err!("Could not create join with project input"),
        };

        let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
        let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));

        let mut requalified = false;

        // Only these join types go through requalification here.
        if original_join.join_type == JoinType::Inner
            || original_join.join_type == JoinType::Left
            || original_join.join_type == JoinType::Right
            || original_join.join_type == JoinType::Full
        {
            (left_sch, right_sch, requalified) =
                requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
        }

        // Turn the key column pairs into equijoin expressions.
        let on: Vec<(Expr, Expr)> = column_on
            .0
            .into_iter()
            .zip(column_on.1)
            .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
            .collect();

        // Schema comes from the (possibly requalified) builder sides; the
        // remaining attributes are inherited from `original`.
        let join_schema = build_join_schema(
            left_sch.schema(),
            right_sch.schema(),
            &original_join.join_type,
        )?;

        Ok((
            Join {
                left,
                right,
                on,
                filter: original_join.filter.clone(),
                join_type: original_join.join_type,
                join_constraint: original_join.join_constraint,
                schema: Arc::new(join_schema),
                null_equality: original_join.null_equality,
                null_aware: original_join.null_aware,
            },
            requalified,
        ))
    }
}
3905
3906impl PartialOrd for Join {
3908 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3909 #[derive(PartialEq, PartialOrd)]
3910 struct ComparableJoin<'a> {
3911 pub left: &'a Arc<LogicalPlan>,
3913 pub right: &'a Arc<LogicalPlan>,
3915 pub on: &'a Vec<(Expr, Expr)>,
3917 pub filter: &'a Option<Expr>,
3919 pub join_type: &'a JoinType,
3921 pub join_constraint: &'a JoinConstraint,
3923 pub null_equality: &'a NullEquality,
3925 }
3926 let comparable_self = ComparableJoin {
3927 left: &self.left,
3928 right: &self.right,
3929 on: &self.on,
3930 filter: &self.filter,
3931 join_type: &self.join_type,
3932 join_constraint: &self.join_constraint,
3933 null_equality: &self.null_equality,
3934 };
3935 let comparable_other = ComparableJoin {
3936 left: &other.left,
3937 right: &other.right,
3938 on: &other.on,
3939 filter: &other.filter,
3940 join_type: &other.join_type,
3941 join_constraint: &other.join_constraint,
3942 null_equality: &other.null_equality,
3943 };
3944 comparable_self
3945 .partial_cmp(&comparable_other)
3946 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3948 }
3949}
3950
/// A subquery referenced from an expression, together with the outer-query
/// columns it is correlated with.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery's plan.
    pub subquery: Arc<LogicalPlan>,
    /// Expressions referencing columns of the outer query (correlation).
    pub outer_ref_columns: Vec<Expr>,
    /// Source locations of this subquery in the original statement text.
    pub spans: Spans,
}
3961
impl Normalizeable for Subquery {
    /// Subqueries opt out of normalization for common-subexpression
    /// elimination.
    fn can_normalize(&self) -> bool {
        false
    }
}
3967
3968impl NormalizeEq for Subquery {
3969 fn normalize_eq(&self, other: &Self) -> bool {
3970 *self.subquery == *other.subquery
3972 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3973 && self
3974 .outer_ref_columns
3975 .iter()
3976 .zip(other.outer_ref_columns.iter())
3977 .all(|(a, b)| a.normalize_eq(b))
3978 }
3979}
3980
impl Subquery {
    /// Extract the `Subquery` from a scalar-subquery expression, looking
    /// through any casts wrapped around it.
    ///
    /// # Errors
    /// Returns a plan error when `plan` is neither a scalar subquery nor a
    /// cast of one.
    pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
        match plan {
            Expr::ScalarSubquery(it) => Ok(it),
            Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
            _ => plan_err!("Could not coerce into ScalarSubquery!"),
        }
    }

    /// Return a copy of this subquery with a replacement plan; the spans
    /// are reset rather than copied.
    pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
        Subquery {
            subquery: plan,
            outer_ref_columns: self.outer_ref_columns.clone(),
            spans: Spans::new(),
        }
    }
}
3998
3999impl Debug for Subquery {
4000 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4001 write!(f, "<subquery>")
4002 }
4003}
4004
/// How rows are repartitioned across output partitions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Spread rows round-robin across the given number of partitions.
    RoundRobinBatch(usize),
    /// Partition by hashing the expressions into the given partition count.
    Hash(Vec<Expr>, usize),
    /// SQL `DISTRIBUTE BY`: partition by the given expressions.
    DistributeBy(Vec<Expr>),
}
4020
/// One list-typed unnest output: the column it produces and how many
/// levels of list nesting are unwrapped.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The column produced by this unnest.
    pub output_column: Column,
    /// Number of nested list levels flattened.
    pub depth: usize,
}
4045
4046impl Display for ColumnUnnestList {
4047 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4048 write!(f, "{}|depth={}", self.output_column, self.depth)
4049 }
4050}
4051
/// Unnests columns containing list or struct values, producing one output
/// row per nested element.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The incoming logical plan.
    pub input: Arc<LogicalPlan>,
    /// The columns the user asked to unnest.
    pub exec_columns: Vec<Column>,
    /// For each list-typed unnest: the input-schema field index plus the
    /// output column and depth it produces.
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Input-schema indices of struct-typed columns being unnested.
    pub struct_type_columns: Vec<usize>,
    /// For each output column, the index of the input column it derives
    /// from; unnested inputs repeat their index once per produced column.
    pub dependency_indices: Vec<usize>,
    /// Output schema.
    pub schema: DFSchemaRef,
    /// Options, including per-column recursion (depth) specifications.
    pub options: UnnestOptions,
}
4074
4075impl PartialOrd for Unnest {
4077 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4078 #[derive(PartialEq, PartialOrd)]
4079 struct ComparableUnnest<'a> {
4080 pub input: &'a Arc<LogicalPlan>,
4082 pub exec_columns: &'a Vec<Column>,
4084 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4087 pub struct_type_columns: &'a Vec<usize>,
4090 pub dependency_indices: &'a Vec<usize>,
4093 pub options: &'a UnnestOptions,
4095 }
4096 let comparable_self = ComparableUnnest {
4097 input: &self.input,
4098 exec_columns: &self.exec_columns,
4099 list_type_columns: &self.list_type_columns,
4100 struct_type_columns: &self.struct_type_columns,
4101 dependency_indices: &self.dependency_indices,
4102 options: &self.options,
4103 };
4104 let comparable_other = ComparableUnnest {
4105 input: &other.input,
4106 exec_columns: &other.exec_columns,
4107 list_type_columns: &other.list_type_columns,
4108 struct_type_columns: &other.struct_type_columns,
4109 dependency_indices: &other.dependency_indices,
4110 options: &other.options,
4111 };
4112 comparable_self
4113 .partial_cmp(&comparable_other)
4114 .filter(|cmp| *cmp != Ordering::Equal || self == other)
4116 }
4117}
4118
impl Unnest {
    /// Create an `Unnest`, deriving the output schema by replacing each
    /// unnested column with its flattened column(s) and passing every other
    /// column through unchanged.
    ///
    /// Also records which input index each output column derives from
    /// (`dependency_indices`) and which unnests are list- vs struct-typed.
    ///
    /// # Errors
    /// Fails when `exec_columns` is empty, a column is missing from the
    /// input schema, or a column's type cannot be unnested.
    pub fn try_new(
        input: Arc<LogicalPlan>,
        exec_columns: Vec<Column>,
        options: UnnestOptions,
    ) -> Result<Self> {
        if exec_columns.is_empty() {
            return plan_err!("unnest plan requires at least 1 column to unnest");
        }

        let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
        let mut struct_columns = vec![];
        // Map: input-schema field index -> requested unnest column.
        let indices_to_unnest = exec_columns
            .iter()
            .map(|c| Ok((input.schema().index_of_column(c)?, c)))
            .collect::<Result<HashMap<usize, &Column>>>()?;

        let input_schema = input.schema();

        let mut dependency_indices = vec![];
        let fields = input_schema
            .iter()
            .enumerate()
            .map(|(index, (original_qualifier, original_field))| {
                match indices_to_unnest.get(&index) {
                    Some(column_to_unnest) => {
                        // Explicit recursion specs (output column + depth)
                        // from the options override the default unnest.
                        let recursions_on_column = options
                            .recursions
                            .iter()
                            .filter(|p| -> bool { &p.input_column == *column_to_unnest })
                            .collect::<Vec<_>>();
                        let mut transformed_columns = recursions_on_column
                            .iter()
                            .map(|r| {
                                list_columns.push((
                                    index,
                                    ColumnUnnestList {
                                        output_column: r.output_column.clone(),
                                        depth: r.depth,
                                    },
                                ));
                                // Take the single column produced for this
                                // (list) unnest.
                                Ok(get_unnested_columns(
                                    &r.output_column.name,
                                    original_field.data_type(),
                                    r.depth,
                                )?
                                .into_iter()
                                .next()
                                .unwrap()) })
                            .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
                        if transformed_columns.is_empty() {
                            // No explicit recursion: default depth-1 unnest,
                            // classified as struct or list by data type.
                            transformed_columns = get_unnested_columns(
                                &column_to_unnest.name,
                                original_field.data_type(),
                                1,
                            )?;
                            match original_field.data_type() {
                                DataType::Struct(_) => {
                                    struct_columns.push(index);
                                }
                                DataType::List(_)
                                | DataType::FixedSizeList(_, _)
                                | DataType::LargeList(_) => {
                                    list_columns.push((
                                        index,
                                        ColumnUnnestList {
                                            output_column: Column::from_name(
                                                &column_to_unnest.name,
                                            ),
                                            depth: 1,
                                        },
                                    ));
                                }
                                _ => {}
                            };
                        }

                        // Every produced output column depends on this
                        // input column.
                        dependency_indices.extend(std::iter::repeat_n(
                            index,
                            transformed_columns.len(),
                        ));
                        Ok(transformed_columns
                            .iter()
                            .map(|(col, field)| {
                                (col.relation.to_owned(), field.to_owned())
                            })
                            .collect())
                    }
                    None => {
                        // Untouched column: passed through as-is.
                        dependency_indices.push(index);
                        Ok(vec![(
                            original_qualifier.cloned(),
                            Arc::clone(original_field),
                        )])
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();

        let metadata = input_schema.metadata().clone();
        let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
        // The input's functional dependencies are carried over unchanged.
        let deps = input_schema.functional_dependencies().clone();
        let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);

        Ok(Unnest {
            input,
            exec_columns,
            list_type_columns: list_columns,
            struct_type_columns: struct_columns,
            dependency_indices,
            schema,
            options,
        })
    }
}
4255
4256fn get_unnested_columns(
4265 col_name: &String,
4266 data_type: &DataType,
4267 depth: usize,
4268) -> Result<Vec<(Column, Arc<Field>)>> {
4269 let mut qualified_columns = Vec::with_capacity(1);
4270
4271 match data_type {
4272 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4273 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4274 let new_field = Arc::new(Field::new(
4275 col_name, data_type,
4276 true,
4279 ));
4280 let column = Column::from_name(col_name);
4281 qualified_columns.push((column, new_field));
4283 }
4284 DataType::Struct(fields) => {
4285 qualified_columns.extend(fields.iter().map(|f| {
4286 let new_name = format!("{}.{}", col_name, f.name());
4287 let column = Column::from_name(&new_name);
4288 let new_field = f.as_ref().clone().with_name(new_name);
4289 (column, Arc::new(new_field))
4291 }))
4292 }
4293 _ => {
4294 return internal_err!("trying to unnest on invalid data type {data_type}");
4295 }
4296 };
4297 Ok(qualified_columns)
4298}
4299
4300fn get_unnested_list_datatype_recursive(
4303 data_type: &DataType,
4304 depth: usize,
4305) -> Result<DataType> {
4306 match data_type {
4307 DataType::List(field)
4308 | DataType::FixedSizeList(field, _)
4309 | DataType::LargeList(field) => {
4310 if depth == 1 {
4311 return Ok(field.data_type().clone());
4312 }
4313 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4314 }
4315 _ => {}
4316 };
4317
4318 internal_err!("trying to unnest on invalid data type {data_type}")
4319}
4320
4321#[cfg(test)]
4322mod tests {
4323 use super::*;
4324 use crate::builder::LogicalTableSource;
4325 use crate::logical_plan::table_scan;
4326 use crate::select_expr::SelectExpr;
4327 use crate::test::function_stub::{count, count_udaf};
4328 use crate::{
4329 GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4330 scalar_subquery,
4331 };
4332 use datafusion_common::metadata::ScalarAndMetadata;
4333 use datafusion_common::tree_node::{
4334 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4335 };
4336 use datafusion_common::{Constraint, ScalarValue, not_impl_err};
4337 use insta::{assert_debug_snapshot, assert_snapshot};
4338 use std::hash::DefaultHasher;
4339
    /// Five-column non-nullable test schema used throughout these tests.
    fn employee_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("first_name", DataType::Utf8, false),
            Field::new("last_name", DataType::Utf8, false),
            Field::new("state", DataType::Utf8, false),
            Field::new("salary", DataType::Int32, false),
        ])
    }
4349
    /// Plan shared by the display tests: a projection over a filter whose
    /// predicate contains an IN-subquery on the same table.
    fn display_plan() -> Result<LogicalPlan> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;

        table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
            .filter(in_subquery(col("state"), Arc::new(plan1)))?
            .project(vec![col("id")])?
            .build()
    }
4359
    /// Indent rendering: children are nested two spaces per level.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent(), @r"
        Projection: employee_csv.id
          Filter: employee_csv.state IN (<subquery>)
            Subquery:
              TableScan: employee_csv projection=[state]
            TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4373
    /// Indent rendering with each node's output schema appended.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: employee_csv.id [id:Int32]
          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
            Subquery: [state:Utf8]
              TableScan: employee_csv projection=[state] [state:Utf8]
            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
        ");
        Ok(())
    }
4387
    /// An aliased EXISTS(<subquery>) projection renders both the alias and
    /// the subquery's own plan tree.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        assert_snapshot!(plan?.display_indent(), @r"
        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
          Subquery:
            TableScan: employee_csv projection=[state]
          TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4407
    /// GraphViz rendering: two subgraphs, the plain plan and a copy with
    /// schemas attached to each node label.
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_graphviz(), @r#"
        // Begin DataFusion GraphViz Plan,
        // display it online here: https://dreampuf.github.io/GraphvizOnline

        digraph {
          subgraph cluster_1
          {
            graph[label="LogicalPlan"]
            2[shape=box label="Projection: employee_csv.id"]
            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
            4[shape=box label="Subquery:"]
            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
            5[shape=box label="TableScan: employee_csv projection=[state]"]
            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
          }
          subgraph cluster_7
          {
            graph[label="Detailed LogicalPlan"]
            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
          }
        }
        // End DataFusion GraphViz Plan
        "#);
        Ok(())
    }
4450
/// Verifies `display_pg_json` output: a Postgres-EXPLAIN-style JSON tree
/// with "Node Type", "Output" columns, and nested child "Plans".
#[test]
fn test_display_pg_json() -> Result<()> {
    let plan = display_plan()?;

    assert_snapshot!(plan.display_pg_json(), @r#"
    [
      {
        "Plan": {
          "Expressions": [
            "employee_csv.id"
          ],
          "Node Type": "Projection",
          "Output": [
            "id"
          ],
          "Plans": [
            {
              "Condition": "employee_csv.state IN (<subquery>)",
              "Node Type": "Filter",
              "Output": [
                "id",
                "state"
              ],
              "Plans": [
                {
                  "Node Type": "Subquery",
                  "Output": [
                    "state"
                  ],
                  "Plans": [
                    {
                      "Node Type": "TableScan",
                      "Output": [
                        "state"
                      ],
                      "Plans": [],
                      "Relation Name": "employee_csv"
                    }
                  ]
                },
                {
                  "Node Type": "TableScan",
                  "Output": [
                    "id",
                    "state"
                  ],
                  "Plans": [],
                  "Relation Name": "employee_csv"
                }
              ]
            }
          ]
        }
      }
    ]
    "#);
    Ok(())
}
4509
/// Visitor that records, in order, a label for every plan node visited.
#[derive(Debug, Default)]
struct OkVisitor {
    /// Human-readable trace of pre-/post-visit events, in visit order.
    strings: Vec<String>,
}
4515
4516 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4517 type Node = LogicalPlan;
4518
4519 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4520 let s = match plan {
4521 LogicalPlan::Projection { .. } => "pre_visit Projection",
4522 LogicalPlan::Filter { .. } => "pre_visit Filter",
4523 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4524 _ => {
4525 return not_impl_err!("unknown plan type");
4526 }
4527 };
4528
4529 self.strings.push(s.into());
4530 Ok(TreeNodeRecursion::Continue)
4531 }
4532
4533 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4534 let s = match plan {
4535 LogicalPlan::Projection { .. } => "post_visit Projection",
4536 LogicalPlan::Filter { .. } => "post_visit Filter",
4537 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4538 _ => {
4539 return not_impl_err!("unknown plan type");
4540 }
4541 };
4542
4543 self.strings.push(s.into());
4544 Ok(TreeNodeRecursion::Continue)
4545 }
4546 }
4547
/// Verifies depth-first visit order: pre-visits top-down, post-visits
/// bottom-up, mirrored around the leaf `TableScan`.
#[test]
fn visit_order() {
    let mut visitor = OkVisitor::default();
    let plan = test_plan();
    let res = plan.visit_with_subqueries(&mut visitor);
    assert!(res.is_ok());

    assert_debug_snapshot!(visitor.strings, @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
        "post_visit Filter",
        "post_visit Projection",
    ]
    "#);
}
4566
/// Countdown helper: after the configured number of [`Self::dec`] calls it
/// reports `true` on every subsequent call. The default (`val == None`)
/// never trips and always reports `false`.
#[derive(Debug, Default)]
struct OptionalCounter {
    val: Option<usize>,
}

impl OptionalCounter {
    /// Creates a counter that trips after `val` decrements.
    fn new(val: usize) -> Self {
        Self { val: Some(val) }
    }

    /// Decrements the counter; returns `true` once it has reached zero.
    fn dec(&mut self) -> bool {
        match self.val {
            Some(0) => true,
            _ => {
                self.val = self.val.take().map(|i| i - 1);
                false
            }
        }
    }
}
4587
/// Visitor that halts traversal (`TreeNodeRecursion::Stop`) after a
/// configured number of pre- or post-visit calls, while recording visits
/// through the inner [`OkVisitor`].
#[derive(Debug, Default)]
struct StoppingVisitor {
    inner: OkVisitor,
    /// Once this counter trips, `f_down` returns `Stop`.
    return_false_from_pre_in: OptionalCounter,
    /// Once this counter trips, `f_up` returns `Stop`.
    return_false_from_post_in: OptionalCounter,
}
4597
4598 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4599 type Node = LogicalPlan;
4600
4601 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4602 if self.return_false_from_pre_in.dec() {
4603 return Ok(TreeNodeRecursion::Stop);
4604 }
4605 self.inner.f_down(plan)?;
4606
4607 Ok(TreeNodeRecursion::Continue)
4608 }
4609
4610 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4611 if self.return_false_from_post_in.dec() {
4612 return Ok(TreeNodeRecursion::Stop);
4613 }
4614
4615 self.inner.f_up(plan)
4616 }
4617 }
4618
/// `Stop` returned from the third pre-visit call (counter trips after two
/// decrements) must halt traversal before the `TableScan` is reached.
#[test]
fn early_stopping_pre_visit() {
    let mut visitor = StoppingVisitor {
        return_false_from_pre_in: OptionalCounter::new(2),
        ..Default::default()
    };
    let plan = test_plan();
    let res = plan.visit_with_subqueries(&mut visitor);
    assert!(res.is_ok());

    // Only the first two nodes were recorded; no post-visits happened.
    assert_debug_snapshot!(
        visitor.inner.strings,
        @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
    ]
    "#
    );
}
4640
/// `Stop` returned from the second post-visit call must halt traversal
/// after the leaf's post-visit, before the `Filter`'s post-visit.
#[test]
fn early_stopping_post_visit() {
    let mut visitor = StoppingVisitor {
        return_false_from_post_in: OptionalCounter::new(1),
        ..Default::default()
    };
    let plan = test_plan();
    let res = plan.visit_with_subqueries(&mut visitor);
    assert!(res.is_ok());

    // All three pre-visits ran; only the TableScan post-visit was recorded.
    assert_debug_snapshot!(
        visitor.inner.strings,
        @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
    ]
    "#
    );
}
4663
/// Visitor that returns an error after a configured number of pre- or
/// post-visit calls, while recording visits through the inner [`OkVisitor`].
#[derive(Debug, Default)]
struct ErrorVisitor {
    inner: OkVisitor,
    /// Once this counter trips, `f_down` returns a NotImplemented error.
    return_error_from_pre_in: OptionalCounter,
    /// Once this counter trips, `f_up` returns a NotImplemented error.
    return_error_from_post_in: OptionalCounter,
}
4673
4674 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4675 type Node = LogicalPlan;
4676
4677 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4678 if self.return_error_from_pre_in.dec() {
4679 return not_impl_err!("Error in pre_visit");
4680 }
4681
4682 self.inner.f_down(plan)
4683 }
4684
4685 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4686 if self.return_error_from_post_in.dec() {
4687 return not_impl_err!("Error in post_visit");
4688 }
4689
4690 self.inner.f_up(plan)
4691 }
4692 }
4693
/// An error raised in the third pre-visit call must abort traversal and be
/// surfaced to the caller, leaving only the first two visits recorded.
#[test]
fn error_pre_visit() {
    let mut visitor = ErrorVisitor {
        return_error_from_pre_in: OptionalCounter::new(2),
        ..Default::default()
    };
    let plan = test_plan();
    let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
    assert_snapshot!(
        res.strip_backtrace(),
        @"This feature is not implemented: Error in pre_visit"
    );
    assert_debug_snapshot!(
        visitor.inner.strings,
        @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
    ]
    "#
    );
}
4716
/// An error raised in the second post-visit call must abort traversal and
/// be surfaced, leaving the three pre-visits and the leaf's post-visit
/// recorded.
#[test]
fn error_post_visit() {
    let mut visitor = ErrorVisitor {
        return_error_from_post_in: OptionalCounter::new(1),
        ..Default::default()
    };
    let plan = test_plan();
    let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
    assert_snapshot!(
        res.strip_backtrace(),
        @"This feature is not implemented: Error in post_visit"
    );
    assert_debug_snapshot!(
        visitor.inner.strings,
        @r#"
    [
        "pre_visit Projection",
        "pre_visit Filter",
        "pre_visit TableScan",
        "post_visit TableScan",
    ]
    "#
    );
}
4741
/// Two `Window` plans that differ only in schema metadata must compare
/// unequal, hash differently, and be unordered (`partial_cmp == None`);
/// identical plans must be equal on all three.
#[test]
fn test_partial_eq_hash_and_partial_ord() {
    let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: true,
        schema: Arc::new(DFSchema::empty()),
    }));

    // Builds a Window node computing COUNT() over the empty input with the
    // given output schema.
    let count_window_function = |schema| {
        Window::try_new_with_schema(
            vec![Expr::WindowFunction(Box::new(WindowFunction::new(
                WindowFunctionDefinition::AggregateUDF(count_udaf()),
                vec![],
            )))],
            Arc::clone(&empty_values),
            Arc::new(schema),
        )
        .unwrap()
    };

    let schema_without_metadata = || {
        DFSchema::from_unqualified_fields(
            vec![Field::new("count", DataType::Int64, false)].into(),
            HashMap::new(),
        )
        .unwrap()
    };

    let schema_with_metadata = || {
        DFSchema::from_unqualified_fields(
            vec![Field::new("count", DataType::Int64, false)].into(),
            [("key".to_string(), "value".to_string())].into(),
        )
        .unwrap()
    };

    // Identical schemas: equal, same hash, ordered as Equal.
    let plan_a = count_window_function(schema_without_metadata());
    let plan_b = count_window_function(schema_without_metadata());
    assert_eq!(plan_a, plan_b);
    assert_eq!(hash(&plan_a), hash(&plan_b));
    assert_eq!(plan_a.partial_cmp(&plan_b), Some(Ordering::Equal));

    // Schema with extra metadata: unequal, different hash, unordered.
    let plan_with_meta = count_window_function(schema_with_metadata());
    assert_ne!(plan_a, plan_with_meta);
    assert_ne!(hash(&plan_a), hash(&plan_with_meta));
    assert_eq!(plan_a.partial_cmp(&plan_with_meta), None);
}
4792
/// Computes the `DefaultHasher` digest of `value` (test helper for
/// comparing plan hashes).
fn hash<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}
4798
/// `Projection::try_new_with_schema` must reject a schema whose field count
/// differs from the number of projection expressions.
#[test]
fn projection_expr_schema_mismatch() -> Result<()> {
    let empty_schema = Arc::new(DFSchema::empty());
    // One expression paired with a zero-field schema.
    let p = Projection::try_new_with_schema(
        vec![col("a")],
        Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::clone(&empty_schema),
        })),
        empty_schema,
    );
    assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
    Ok(())
}
4813
4814 fn test_plan() -> LogicalPlan {
4815 let schema = Schema::new(vec![
4816 Field::new("id", DataType::Int32, false),
4817 Field::new("state", DataType::Utf8, false),
4818 ]);
4819
4820 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4821 .unwrap()
4822 .filter(col("state").eq(lit("CO")))
4823 .unwrap()
4824 .project(vec![col("id")])
4825 .unwrap()
4826 .build()
4827 .unwrap()
4828 }
4829
/// Placeholder identifiers must be of the form `$1`, `$2`, ...: empty
/// (`""`), zero (`$0`) and zero-padded (`$00`) ids must be rejected when
/// substituting parameter values.
///
/// Fixes a mojibake artifact (`¶m_values` -> `&param_values`) and builds
/// the identical one-column schema once instead of three times.
#[test]
fn test_replace_invalid_placeholder() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let param_values = vec![ScalarValue::Int32(Some(42))];

    // Placeholder with an empty id.
    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col("id").eq(placeholder("")))
        .unwrap()
        .build()
        .unwrap();
    plan.replace_params_with_values(&param_values.clone().into())
        .expect_err("unexpectedly succeeded to replace an invalid placeholder");

    // Placeholder ids are 1-based, so `$0` is invalid.
    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col("id").eq(placeholder("$0")))
        .unwrap()
        .build()
        .unwrap();
    plan.replace_params_with_values(&param_values.clone().into())
        .expect_err("unexpectedly succeeded to replace an invalid placeholder");

    // Zero-padded `$00` is likewise invalid.
    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col("id").eq(placeholder("$00")))
        .unwrap()
        .build()
        .unwrap();
    plan.replace_params_with_values(&param_values.into())
        .expect_err("unexpectedly succeeded to replace an invalid placeholder");
}
4872
/// Binding a parameter whose metadata differs from the prepared statement's
/// declared field metadata must fail.
#[test]
fn test_replace_placeholder_mismatched_metadata() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col("id").eq(placeholder("$1")))
        .unwrap()
        .build()
        .unwrap();
    // The prepared statement declares `$1` as a plain Int32 field carrying
    // no metadata.
    let prepared_builder = LogicalPlanBuilder::new(plan)
        .prepare(
            "".to_string(),
            vec![Field::new("", DataType::Int32, true).into()],
        )
        .unwrap();

    // The supplied value carries extra key/value metadata, so binding it
    // must be rejected.
    let mut scalar_meta = HashMap::new();
    scalar_meta.insert("some_key".to_string(), "some_value".to_string());
    let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
        ScalarValue::Int32(Some(42)),
        Some(scalar_meta.into()),
    )]);
    prepared_builder
        .plan()
        .clone()
        .with_param_values(param_values)
        .expect_err("prepared field metadata mismatch unexpectedly succeeded");
}
4904
/// Placeholders projected from an empty relation start out typed `Null`,
/// then adopt the concrete parameter types once values are bound.
#[test]
fn test_replace_placeholder_empty_relation_valid_schema() {
    let plan = LogicalPlanBuilder::empty(false)
        .project(vec![
            SelectExpr::from(placeholder("$1")),
            SelectExpr::from(placeholder("$2")),
        ])
        .unwrap()
        .build()
        .unwrap();

    // Before binding: both columns are nullable Null.
    assert_snapshot!(plan.display_indent_schema(), @r"
    Projection: $1, $2 [$1:Null;N, $2:Null;N]
      EmptyRelation: rows=0 []
    ");

    let plan = plan
        .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
        .unwrap();

    // After binding: literals aliased to the placeholder names, typed
    // Int32 and Utf8.
    assert_snapshot!(plan.display_indent_schema(), @r#"
    Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
      EmptyRelation: rows=0 []
    "#);
}
4933
/// Columns grouped via GROUPING SETS must become nullable in the aggregate
/// output schema, even when declared non-nullable in the input: each column
/// is absent from at least one grouping set.
#[test]
fn test_nullable_schema_after_grouping_set() {
    let schema = Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]);

    // GROUPING SETS ((foo), (bar)) with a COUNT aggregate.
    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .aggregate(
            vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
                vec![col("foo")],
                vec![col("bar")],
            ]))],
            vec![count(lit(true))],
        )
        .unwrap()
        .build()
        .unwrap();

    let output_schema = plan.schema();

    // Both grouping columns must now be nullable.
    for name in ["foo", "bar"] {
        assert!(
            output_schema
                .field_with_name(None, name)
                .unwrap()
                .is_nullable()
        );
    }
}
4969
/// `Filter::is_scalar` must be false for an equality predicate on an
/// unconstrained column, and true once that column carries a unique
/// constraint (the predicate then selects at most one row).
#[test]
fn test_filter_is_scalar() {
    // Plain scan with no functional dependencies.
    let schema =
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    let source = Arc::new(LogicalTableSource::new(schema));
    let schema = Arc::new(
        DFSchema::try_from_qualified_schema(
            TableReference::bare("tab"),
            &source.schema(),
        )
        .unwrap(),
    );
    let scan = Arc::new(LogicalPlan::TableScan(TableScan {
        table_name: TableReference::bare("tab"),
        source: Arc::clone(&source) as Arc<dyn TableSource>,
        projection: None,
        projected_schema: Arc::clone(&schema),
        filters: vec![],
        fetch: None,
    }));
    // NOTE: shadows the `col` helper function with the qualified field name.
    let col = schema.field_names()[0].clone();

    let filter = Filter::try_new(
        Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
        scan,
    )
    .unwrap();
    assert!(!filter.is_scalar());
    // Same schema, but with a unique constraint on column 0 expressed as a
    // functional dependency.
    let unique_schema = Arc::new(
        schema
            .as_ref()
            .clone()
            .with_functional_dependencies(
                FunctionalDependencies::new_from_constraints(
                    Some(&Constraints::new_unverified(vec![Constraint::Unique(
                        vec![0],
                    )])),
                    1,
                ),
            )
            .unwrap(),
    );
    let scan = Arc::new(LogicalPlan::TableScan(TableScan {
        table_name: TableReference::bare("tab"),
        source,
        projection: None,
        projected_schema: Arc::clone(&unique_schema),
        filters: vec![],
        fetch: None,
    }));
    let col = schema.field_names()[0].clone();

    // Equality on a unique column -> at most one row -> scalar.
    let filter =
        Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
    assert!(filter.is_scalar());
}
5028
/// `transform` must reach and rewrite nodes *inside* an `Explain` wrapper:
/// here the scan under the Explain is wrapped in a new Filter.
#[test]
fn test_transform_explain() {
    let schema = Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]);

    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .explain(false, false)
        .unwrap()
        .build()
        .unwrap();

    let external_filter = col("foo").eq(lit(true));

    // Replace every TableScan with Filter(external_filter, scan); all other
    // nodes pass through unchanged.
    let plan = plan
        .transform(|plan| match plan {
            LogicalPlan::TableScan(table) => {
                let filter = Filter::try_new(
                    external_filter.clone(),
                    Arc::new(LogicalPlan::TableScan(table)),
                )
                .unwrap();
                Ok(Transformed::yes(LogicalPlan::Filter(filter)))
            }
            x => Ok(Transformed::no(x)),
        })
        .data()
        .unwrap();

    // The new Filter shows up underneath the Explain node.
    let actual = format!("{}", plan.display_indent());
    assert_snapshot!(actual, @r"
    Explain
      Filter: foo = Boolean(true)
        TableScan: ?table?
    ")
}
5069
/// Checks `LogicalPlan`'s partial ordering: an `EmptyRelation` compares
/// Less than a `DescribeTable` (and Greater the other way around), while
/// two structurally-identical `DescribeTable` plans are unordered
/// (`partial_cmp` returns `None`).
#[test]
fn test_plan_partial_ord() {
    let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: Arc::new(DFSchema::empty()),
    });

    let describe_table = LogicalPlan::DescribeTable(DescribeTable {
        schema: Arc::new(Schema::new(vec![Field::new(
            "foo",
            DataType::Int32,
            false,
        )])),
        output_schema: DFSchemaRef::new(DFSchema::empty()),
    });

    // Field-for-field copy of `describe_table` above.
    let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
        schema: Arc::new(Schema::new(vec![Field::new(
            "foo",
            DataType::Int32,
            false,
        )])),
        output_schema: DFSchemaRef::new(DFSchema::empty()),
    });

    assert_eq!(
        empty_relation.partial_cmp(&describe_table),
        Some(Ordering::Less)
    );
    assert_eq!(
        describe_table.partial_cmp(&empty_relation),
        Some(Ordering::Greater)
    );
    // Even identical DescribeTable plans do not order against each other.
    assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
}
5105
/// Rebuilding a `Limit` via `with_new_exprs` from its own expressions and
/// inputs must round-trip to an equal plan for every combination of
/// present/absent `skip` and `fetch`.
#[test]
fn test_limit_with_new_children() {
    let input = Arc::new(LogicalPlan::Values(Values {
        schema: Arc::new(DFSchema::empty()),
        values: vec![vec![]],
    }));
    // All four skip/fetch presence combinations.
    let cases = [
        LogicalPlan::Limit(Limit {
            skip: None,
            fetch: None,
            input: Arc::clone(&input),
        }),
        LogicalPlan::Limit(Limit {
            skip: None,
            fetch: Some(Box::new(Expr::Literal(
                ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                None,
            ))),
            input: Arc::clone(&input),
        }),
        LogicalPlan::Limit(Limit {
            skip: Some(Box::new(Expr::Literal(
                ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                None,
            ))),
            fetch: None,
            input: Arc::clone(&input),
        }),
        LogicalPlan::Limit(Limit {
            skip: Some(Box::new(Expr::Literal(
                ScalarValue::new_one(&DataType::UInt32).unwrap(),
                None,
            ))),
            fetch: Some(Box::new(Expr::Literal(
                ScalarValue::new_ten(&DataType::UInt32).unwrap(),
                None,
            ))),
            input,
        }),
    ];

    for limit in cases {
        // Same expressions + same inputs -> identical plan.
        let new_limit = limit
            .with_new_exprs(
                limit.expressions(),
                limit.inputs().into_iter().cloned().collect(),
            )
            .unwrap();
        assert_eq!(limit, new_limit);
    }
}
5157
/// `TreeNodeRecursion::Jump` returned at the `Projection` must skip that
/// node's subtree — so neither the outer `Filter` below it nor the filter
/// inside the scalar subquery is ever observed — across all four
/// subquery-aware traversal APIs: closure apply, visitor, transform(s),
/// and rewriter.
#[test]
fn test_with_subqueries_jump() {
    // Scalar subquery: Filter over a single-column scan.
    let subquery_schema =
        Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);

    let subquery_plan =
        table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
            .unwrap()
            .filter(col("sub_id").eq(lit(0)))
            .unwrap()
            .build()
            .unwrap();

    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

    // Outer plan: Projection (containing the scalar subquery) over Filter
    // over scan. Every Filter is below the Projection, so a Jump issued at
    // the Projection hides all filters from the traversal.
    let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
        .unwrap()
        .filter(col("id").eq(lit(0)))
        .unwrap()
        .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
        .unwrap()
        .build()
        .unwrap();

    // 1) Closure-based apply_with_subqueries.
    let mut filter_found = false;
    plan.apply_with_subqueries(|plan| {
        match plan {
            LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
            LogicalPlan::Filter(..) => filter_found = true,
            _ => {}
        }
        Ok(TreeNodeRecursion::Continue)
    })
    .unwrap();
    assert!(!filter_found);

    // 2) Visitor-based traversal.
    struct ProjectJumpVisitor {
        filter_found: bool,
    }

    impl ProjectJumpVisitor {
        fn new() -> Self {
            Self {
                filter_found: false,
            }
        }
    }

    impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
            match node {
                LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
                LogicalPlan::Filter(..) => self.filter_found = true,
                _ => {}
            }
            Ok(TreeNodeRecursion::Continue)
        }
    }

    let mut visitor = ProjectJumpVisitor::new();
    plan.visit_with_subqueries(&mut visitor).unwrap();
    assert!(!visitor.filter_found);

    // 3a) transform_down_with_subqueries with a jumping Transformed result.
    let mut filter_found = false;
    plan.clone()
        .transform_down_with_subqueries(|plan| {
            match plan {
                LogicalPlan::Projection(..) => {
                    return Ok(Transformed::new(
                        plan,
                        false,
                        TreeNodeRecursion::Jump,
                    ));
                }
                LogicalPlan::Filter(..) => filter_found = true,
                _ => {}
            }
            Ok(Transformed::no(plan))
        })
        .unwrap();
    assert!(!filter_found);

    // 3b) transform_down_up_with_subqueries: jump in the down-closure.
    let mut filter_found = false;
    plan.clone()
        .transform_down_up_with_subqueries(
            |plan| {
                match plan {
                    LogicalPlan::Projection(..) => {
                        return Ok(Transformed::new(
                            plan,
                            false,
                            TreeNodeRecursion::Jump,
                        ));
                    }
                    LogicalPlan::Filter(..) => filter_found = true,
                    _ => {}
                }
                Ok(Transformed::no(plan))
            },
            |plan| Ok(Transformed::no(plan)),
        )
        .unwrap();
    assert!(!filter_found);

    // 4) Rewriter-based traversal.
    struct ProjectJumpRewriter {
        filter_found: bool,
    }

    impl ProjectJumpRewriter {
        fn new() -> Self {
            Self {
                filter_found: false,
            }
        }
    }

    impl TreeNodeRewriter for ProjectJumpRewriter {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
            match node {
                LogicalPlan::Projection(..) => {
                    return Ok(Transformed::new(
                        node,
                        false,
                        TreeNodeRecursion::Jump,
                    ));
                }
                LogicalPlan::Filter(..) => self.filter_found = true,
                _ => {}
            }
            Ok(Transformed::no(node))
        }
    }

    let mut rewriter = ProjectJumpRewriter::new();
    plan.rewrite_with_subqueries(&mut rewriter).unwrap();
    assert!(!rewriter.filter_found);
}
5303
/// An unresolved placeholder must be reported by `get_parameter_fields`
/// with no inferred field information (the entry maps to `None`).
///
/// Also removes a needless `params.clone()` that copied the whole map just
/// to perform a single keyed lookup.
#[test]
fn test_with_unresolved_placeholders() {
    let field_name = "id";
    let placeholder_value = "$1";
    let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);

    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col(field_name).eq(placeholder(placeholder_value)))
        .unwrap()
        .build()
        .unwrap();

    // Exactly one placeholder is discovered in the plan.
    let params = plan.get_parameter_fields().unwrap();
    assert_eq!(params.len(), 1);

    // No field information could be inferred for `$1`.
    let parameter_type = params.get(placeholder_value).unwrap().clone();
    assert_eq!(parameter_type, None);
}
5324
/// `with_new_exprs` on a `Join` must rebuild the equi-join `on` pairs and
/// the optional filter from the flat expression list (key pairs first,
/// filter expression last).
#[test]
fn test_join_with_new_exprs() -> Result<()> {
    // Builds t1 INNER JOIN t2 (both two Int32 columns a, b) with the given
    // equi-join keys and optional filter.
    fn create_test_join(
        on: Vec<(Expr, Expr)>,
        filter: Option<Expr>,
    ) -> Result<LogicalPlan> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
        let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;

        Ok(LogicalPlan::Join(Join {
            left: Arc::new(
                table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
            ),
            right: Arc::new(
                table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
            ),
            on,
            filter,
            join_type: JoinType::Inner,
            join_constraint: JoinConstraint::On,
            schema: Arc::new(left_schema.join(&right_schema)?),
            null_equality: NullEquality::NullEqualsNothing,
            null_aware: false,
        }))
    }

    // Round-trip: `on` keys only, no filter.
    {
        let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
        let LogicalPlan::Join(join) = join.with_new_exprs(
            join.expressions(),
            join.inputs().into_iter().cloned().collect(),
        )?
        else {
            unreachable!()
        };
        assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
        assert_eq!(join.filter, None);
    }

    // Round-trip: filter only, no `on` keys.
    {
        let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
        let LogicalPlan::Join(join) = join.with_new_exprs(
            join.expressions(),
            join.inputs().into_iter().cloned().collect(),
        )?
        else {
            unreachable!()
        };
        assert_eq!(join.on, vec![]);
        assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
    }

    // Round-trip: both `on` keys and filter.
    {
        let join = create_test_join(
            vec![(col("t1.a"), (col("t2.a")))],
            Some(col("t1.b").gt(col("t2.b"))),
        )?;
        let LogicalPlan::Join(join) = join.with_new_exprs(
            join.expressions(),
            join.inputs().into_iter().cloned().collect(),
        )?
        else {
            unreachable!()
        };
        assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
        assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
    }

    // Replacement: five flat expressions become two `on` pairs (taken in
    // order) plus the trailing filter.
    {
        let join = create_test_join(
            vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
            None,
        )?;
        let LogicalPlan::Join(join) = join.with_new_exprs(
            vec![
                binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                binary_expr(col("t2.a"), Operator::Plus, lit(2)),
                col("t1.b"),
                col("t2.b"),
                lit(true),
            ],
            join.inputs().into_iter().cloned().collect(),
        )?
        else {
            unreachable!()
        };
        assert_eq!(
            join.on,
            vec![
                (
                    binary_expr(col("t1.a"), Operator::Plus, lit(1)),
                    binary_expr(col("t2.a"), Operator::Plus, lit(2))
                ),
                (col("t1.b"), (col("t2.b")))
            ]
        );
        assert_eq!(join.filter, Some(lit(true)));
    }

    Ok(())
}
5431
/// `Join::try_new` must compute the output schema per join type: semi/anti
/// joins keep only one side's columns, `LeftMark` adds a non-nullable
/// `mark` column, and outer joins mark the appropriate side(s) nullable.
/// Join metadata (`on`, `filter`, constraint, null-equality) is preserved.
#[test]
fn test_join_try_new() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);

    let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;

    let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;

    let join_types = vec![
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftMark,
    ];

    for join_type in join_types {
        let join = Join::try_new(
            Arc::new(left_scan.clone()),
            Arc::new(right_scan.clone()),
            vec![(col("t1.a"), col("t2.a"))],
            Some(col("t1.b").gt(col("t2.b"))),
            join_type,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
            false,
        )?;

        match join_type {
            // Left semi/anti joins emit only the left side's columns.
            JoinType::LeftSemi | JoinType::LeftAnti => {
                assert_eq!(join.schema.fields().len(), 2);

                let fields = join.schema.fields();
                assert_eq!(
                    fields[0].name(),
                    "a",
                    "First field should be 'a' from left table"
                );
                assert_eq!(
                    fields[1].name(),
                    "b",
                    "Second field should be 'b' from left table"
                );
            }
            // Right semi/anti joins emit only the right side's columns.
            JoinType::RightSemi | JoinType::RightAnti => {
                assert_eq!(join.schema.fields().len(), 2);

                let fields = join.schema.fields();
                assert_eq!(
                    fields[0].name(),
                    "a",
                    "First field should be 'a' from right table"
                );
                assert_eq!(
                    fields[1].name(),
                    "b",
                    "Second field should be 'b' from right table"
                );
            }
            // Mark join: left columns plus an extra non-nullable `mark`.
            JoinType::LeftMark => {
                assert_eq!(join.schema.fields().len(), 3);

                let fields = join.schema.fields();
                assert_eq!(
                    fields[0].name(),
                    "a",
                    "First field should be 'a' from left table"
                );
                assert_eq!(
                    fields[1].name(),
                    "b",
                    "Second field should be 'b' from left table"
                );
                assert_eq!(
                    fields[2].name(),
                    "mark",
                    "Third field should be the mark column"
                );

                assert!(!fields[0].is_nullable());
                assert!(!fields[1].is_nullable());
                assert!(!fields[2].is_nullable());
            }
            // Inner/Left/Right/Full: left columns followed by right columns.
            _ => {
                assert_eq!(join.schema.fields().len(), 4);

                let fields = join.schema.fields();
                assert_eq!(
                    fields[0].name(),
                    "a",
                    "First field should be 'a' from left table"
                );
                assert_eq!(
                    fields[1].name(),
                    "b",
                    "Second field should be 'b' from left table"
                );
                assert_eq!(
                    fields[2].name(),
                    "a",
                    "Third field should be 'a' from right table"
                );
                assert_eq!(
                    fields[3].name(),
                    "b",
                    "Fourth field should be 'b' from right table"
                );

                // Outer joins make the non-preserved side nullable.
                if join_type == JoinType::Left {
                    assert!(!fields[0].is_nullable());
                    assert!(!fields[1].is_nullable());
                    assert!(fields[2].is_nullable());
                    assert!(fields[3].is_nullable());
                } else if join_type == JoinType::Right {
                    assert!(fields[0].is_nullable());
                    assert!(fields[1].is_nullable());
                    assert!(!fields[2].is_nullable());
                    assert!(!fields[3].is_nullable());
                } else if join_type == JoinType::Full {
                    assert!(fields[0].is_nullable());
                    assert!(fields[1].is_nullable());
                    assert!(fields[2].is_nullable());
                    assert!(fields[3].is_nullable());
                }
            }
        }

        // Join metadata is carried through unchanged.
        assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
        assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        assert_eq!(join.join_type, join_type);
        assert_eq!(join.join_constraint, JoinConstraint::On);
        assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
    }

    Ok(())
}
5579
/// Joins over tables with overlapping column names ("id" and "value" exist
/// on both sides) must keep all columns from both sides, in left-then-right
/// order, for both USING and ON constraints; `try_new` also preserves the
/// requested constraint, filter, and null-equality setting.
#[test]
fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
    let left_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Int32, false),
    ]);

    let right_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);

    let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
    let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;

    // Shared check: all six columns present, left side first.
    let assert_join_fields = |join: &Join| {
        let fields = join.schema.fields();
        assert_eq!(fields.len(), 6);
        let expected = [
            ("id", "First field should be 'id' from left table"),
            ("name", "Second field should be 'name' from left table"),
            ("value", "Third field should be 'value' from left table"),
            ("id", "Fourth field should be 'id' from right table"),
            ("category", "Fifth field should be 'category' from right table"),
            ("value", "Sixth field should be 'value' from right table"),
        ];
        for (field, (name, message)) in fields.iter().zip(expected) {
            assert_eq!(field.name(), name, "{message}");
        }
    };

    // USING constraint on the overlapping "id" key.
    {
        let join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::Using,
            NullEquality::NullEqualsNothing,
            false,
        )?;

        assert_join_fields(&join);
        assert_eq!(join.join_constraint, JoinConstraint::Using);
    }

    // ON constraint plus a filter over the overlapping "value" columns.
    {
        let join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            Some(col("t1.value").lt(col("t2.value"))),
            JoinType::Inner,
            JoinConstraint::On,
            NullEquality::NullEqualsNothing,
            false,
        )?;

        assert_join_fields(&join);
        assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
    }

    // NullEqualsNull semantics are carried through unchanged.
    {
        let join = Join::try_new(
            Arc::new(left_plan.clone()),
            Arc::new(right_plan.clone()),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::On,
            NullEquality::NullEqualsNull,
            false,
        )?;

        assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
    }

    Ok(())
}
5720
5721 #[test]
5722 fn test_join_try_new_schema_validation() -> Result<()> {
5723 let left_schema = Schema::new(vec![
5724 Field::new("id", DataType::Int32, false),
5725 Field::new("name", DataType::Utf8, false),
5726 Field::new("value", DataType::Float64, true),
5727 ]);
5728
5729 let right_schema = Schema::new(vec![
5730 Field::new("id", DataType::Int32, false),
5731 Field::new("category", DataType::Utf8, true),
5732 Field::new("code", DataType::Int16, false),
5733 ]);
5734
5735 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5736
5737 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5738
5739 let join_types = vec![
5740 JoinType::Inner,
5741 JoinType::Left,
5742 JoinType::Right,
5743 JoinType::Full,
5744 ];
5745
5746 for join_type in join_types {
5747 let join = Join::try_new(
5748 Arc::new(left_plan.clone()),
5749 Arc::new(right_plan.clone()),
5750 vec![(col("t1.id"), col("t2.id"))],
5751 Some(col("t1.value").gt(lit(5.0))),
5752 join_type,
5753 JoinConstraint::On,
5754 NullEquality::NullEqualsNothing,
5755 false,
5756 )?;
5757
5758 let fields = join.schema.fields();
5759 assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");
5760
5761 for (i, field) in fields.iter().enumerate() {
5762 let expected_nullable = match (i, &join_type) {
5763 (0, JoinType::Right | JoinType::Full) => true, (1, JoinType::Right | JoinType::Full) => true, (2, _) => true, (3, JoinType::Left | JoinType::Full) => true, (4, _) => true, (5, JoinType::Left | JoinType::Full) => true, _ => false,
5774 };
5775
5776 assert_eq!(
5777 field.is_nullable(),
5778 expected_nullable,
5779 "Field {} ({}) nullability incorrect for {:?} join",
5780 i,
5781 field.name(),
5782 join_type
5783 );
5784 }
5785 }
5786
5787 let using_join = Join::try_new(
5788 Arc::new(left_plan.clone()),
5789 Arc::new(right_plan.clone()),
5790 vec![(col("t1.id"), col("t2.id"))],
5791 None,
5792 JoinType::Inner,
5793 JoinConstraint::Using,
5794 NullEquality::NullEqualsNothing,
5795 false,
5796 )?;
5797
5798 assert_eq!(
5799 using_join.schema.fields().len(),
5800 6,
5801 "USING join should have all fields"
5802 );
5803 assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5804
5805 Ok(())
5806 }
5807}