1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29 InvariantLevel, assert_always_invariants_at_current_node,
30 assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34 Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35 intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38 NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
46};
47use crate::{
48 BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
49 LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50 WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61 Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62 FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63 ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64 assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
73#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
206pub enum LogicalPlan {
207 Projection(Projection),
210 Filter(Filter),
219 Window(Window),
225 Aggregate(Aggregate),
231 Sort(Sort),
234 Join(Join),
237 Repartition(Repartition),
241 Union(Union),
245 TableScan(TableScan),
248 EmptyRelation(EmptyRelation),
252 Subquery(Subquery),
255 SubqueryAlias(SubqueryAlias),
257 Limit(Limit),
259 Statement(Statement),
261 Values(Values),
266 Explain(Explain),
269 Analyze(Analyze),
273 Extension(Extension),
276 Distinct(Distinct),
279 Dml(DmlStatement),
281 Ddl(DdlStatement),
283 Copy(CopyTo),
285 DescribeTable(DescribeTable),
288 Unnest(Unnest),
291 RecursiveQuery(RecursiveQuery),
293}
294
295impl Default for LogicalPlan {
296 fn default() -> Self {
297 LogicalPlan::EmptyRelation(EmptyRelation {
298 produce_one_row: false,
299 schema: Arc::new(DFSchema::empty()),
300 })
301 }
302}
303
304impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
305 fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
306 &'a self,
307 mut f: F,
308 ) -> Result<TreeNodeRecursion> {
309 f(self)
310 }
311
312 fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
313 self,
314 mut f: F,
315 ) -> Result<Transformed<Self>> {
316 f(self)
317 }
318}
319
320impl LogicalPlan {
321 pub fn schema(&self) -> &DFSchemaRef {
323 match self {
324 LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
325 LogicalPlan::Values(Values { schema, .. }) => schema,
326 LogicalPlan::TableScan(TableScan {
327 projected_schema, ..
328 }) => projected_schema,
329 LogicalPlan::Projection(Projection { schema, .. }) => schema,
330 LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
331 LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
332 LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
333 LogicalPlan::Window(Window { schema, .. }) => schema,
334 LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
335 LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
336 LogicalPlan::Join(Join { schema, .. }) => schema,
337 LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
338 LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
339 LogicalPlan::Statement(statement) => statement.schema(),
340 LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
341 LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
342 LogicalPlan::Explain(explain) => &explain.schema,
343 LogicalPlan::Analyze(analyze) => &analyze.schema,
344 LogicalPlan::Extension(extension) => extension.node.schema(),
345 LogicalPlan::Union(Union { schema, .. }) => schema,
346 LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
347 output_schema
348 }
349 LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
350 LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
351 LogicalPlan::Ddl(ddl) => ddl.schema(),
352 LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
353 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
354 static_term.schema()
356 }
357 }
358 }
359
360 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
363 match self {
364 LogicalPlan::Window(_)
365 | LogicalPlan::Projection(_)
366 | LogicalPlan::Aggregate(_)
367 | LogicalPlan::Unnest(_)
368 | LogicalPlan::Join(_) => self
369 .inputs()
370 .iter()
371 .map(|input| input.schema().as_ref())
372 .collect(),
373 _ => vec![],
374 }
375 }
376
377 pub fn explain_schema() -> SchemaRef {
379 SchemaRef::new(Schema::new(vec![
380 Field::new("plan_type", DataType::Utf8, false),
381 Field::new("plan", DataType::Utf8, false),
382 ]))
383 }
384
385 pub fn describe_schema() -> Schema {
387 Schema::new(vec![
388 Field::new("column_name", DataType::Utf8, false),
389 Field::new("data_type", DataType::Utf8, false),
390 Field::new("is_nullable", DataType::Utf8, false),
391 ])
392 }
393
394 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
411 let mut exprs = vec![];
412 self.apply_expressions(|e| {
413 exprs.push(e.clone());
414 Ok(TreeNodeRecursion::Continue)
415 })
416 .unwrap();
418 exprs
419 }
420
421 pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
424 let mut exprs = vec![];
425 self.apply_expressions(|e| {
426 find_out_reference_exprs(e).into_iter().for_each(|e| {
427 if !exprs.contains(&e) {
428 exprs.push(e)
429 }
430 });
431 Ok(TreeNodeRecursion::Continue)
432 })
433 .unwrap();
435 self.inputs()
436 .into_iter()
437 .flat_map(|child| child.all_out_ref_exprs())
438 .for_each(|e| {
439 if !exprs.contains(&e) {
440 exprs.push(e)
441 }
442 });
443 exprs
444 }
445
446 pub fn inputs(&self) -> Vec<&LogicalPlan> {
450 match self {
451 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
452 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
453 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
454 LogicalPlan::Window(Window { input, .. }) => vec![input],
455 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
456 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
457 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
458 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
459 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
460 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
461 LogicalPlan::Extension(extension) => extension.node.inputs(),
462 LogicalPlan::Union(Union { inputs, .. }) => {
463 inputs.iter().map(|arc| arc.as_ref()).collect()
464 }
465 LogicalPlan::Distinct(
466 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
467 ) => vec![input],
468 LogicalPlan::Explain(explain) => vec![&explain.plan],
469 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
470 LogicalPlan::Dml(write) => vec![&write.input],
471 LogicalPlan::Copy(copy) => vec![©.input],
472 LogicalPlan::Ddl(ddl) => ddl.inputs(),
473 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
474 LogicalPlan::RecursiveQuery(RecursiveQuery {
475 static_term,
476 recursive_term,
477 ..
478 }) => vec![static_term, recursive_term],
479 LogicalPlan::Statement(stmt) => stmt.inputs(),
480 LogicalPlan::TableScan { .. }
482 | LogicalPlan::EmptyRelation { .. }
483 | LogicalPlan::Values { .. }
484 | LogicalPlan::DescribeTable(_) => vec![],
485 }
486 }
487
488 pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
490 let mut using_columns: Vec<HashSet<Column>> = vec![];
491
492 self.apply_with_subqueries(|plan| {
493 if let LogicalPlan::Join(Join {
494 join_constraint: JoinConstraint::Using,
495 on,
496 ..
497 }) = plan
498 {
499 let columns =
501 on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
502 let Some(l) = l.get_as_join_column() else {
503 return internal_err!(
504 "Invalid join key. Expected column, found {l:?}"
505 );
506 };
507 let Some(r) = r.get_as_join_column() else {
508 return internal_err!(
509 "Invalid join key. Expected column, found {r:?}"
510 );
511 };
512 accumu.insert(l.to_owned());
513 accumu.insert(r.to_owned());
514 Result::<_, DataFusionError>::Ok(accumu)
515 })?;
516 using_columns.push(columns);
517 }
518 Ok(TreeNodeRecursion::Continue)
519 })?;
520
521 Ok(using_columns)
522 }
523
524 pub fn head_output_expr(&self) -> Result<Option<Expr>> {
526 match self {
527 LogicalPlan::Projection(projection) => {
528 Ok(Some(projection.expr.as_slice()[0].clone()))
529 }
530 LogicalPlan::Aggregate(agg) => {
531 if agg.group_expr.is_empty() {
532 Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
533 } else {
534 Ok(Some(agg.group_expr.as_slice()[0].clone()))
535 }
536 }
537 LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
538 Ok(Some(select_expr[0].clone()))
539 }
540 LogicalPlan::Filter(Filter { input, .. })
541 | LogicalPlan::Distinct(Distinct::All(input))
542 | LogicalPlan::Sort(Sort { input, .. })
543 | LogicalPlan::Limit(Limit { input, .. })
544 | LogicalPlan::Repartition(Repartition { input, .. })
545 | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
546 LogicalPlan::Join(Join {
547 left,
548 right,
549 join_type,
550 ..
551 }) => match join_type {
552 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
553 if left.schema().fields().is_empty() {
554 right.head_output_expr()
555 } else {
556 left.head_output_expr()
557 }
558 }
559 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
560 left.head_output_expr()
561 }
562 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
563 right.head_output_expr()
564 }
565 },
566 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
567 static_term.head_output_expr()
568 }
569 LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
570 union.schema.qualified_field(0),
571 )))),
572 LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
573 table.projected_schema.qualified_field(0),
574 )))),
575 LogicalPlan::SubqueryAlias(subquery_alias) => {
576 let expr_opt = subquery_alias.input.head_output_expr()?;
577 expr_opt
578 .map(|expr| {
579 Ok(Expr::Column(create_col_from_scalar_expr(
580 &expr,
581 subquery_alias.alias.to_string(),
582 )?))
583 })
584 .map_or(Ok(None), |v| v.map(Some))
585 }
586 LogicalPlan::Subquery(_) => Ok(None),
587 LogicalPlan::EmptyRelation(_)
588 | LogicalPlan::Statement(_)
589 | LogicalPlan::Values(_)
590 | LogicalPlan::Explain(_)
591 | LogicalPlan::Analyze(_)
592 | LogicalPlan::Extension(_)
593 | LogicalPlan::Dml(_)
594 | LogicalPlan::Copy(_)
595 | LogicalPlan::Ddl(_)
596 | LogicalPlan::DescribeTable(_)
597 | LogicalPlan::Unnest(_) => Ok(None),
598 }
599 }
600
601 pub fn recompute_schema(self) -> Result<Self> {
624 match self {
625 LogicalPlan::Projection(Projection {
628 expr,
629 input,
630 schema: _,
631 }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
632 LogicalPlan::Dml(_) => Ok(self),
633 LogicalPlan::Copy(_) => Ok(self),
634 LogicalPlan::Values(Values { schema, values }) => {
635 Ok(LogicalPlan::Values(Values { schema, values }))
637 }
638 LogicalPlan::Filter(Filter { predicate, input }) => {
639 Filter::try_new(predicate, input).map(LogicalPlan::Filter)
640 }
641 LogicalPlan::Repartition(_) => Ok(self),
642 LogicalPlan::Window(Window {
643 input,
644 window_expr,
645 schema: _,
646 }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
647 LogicalPlan::Aggregate(Aggregate {
648 input,
649 group_expr,
650 aggr_expr,
651 schema: _,
652 }) => Aggregate::try_new(input, group_expr, aggr_expr)
653 .map(LogicalPlan::Aggregate),
654 LogicalPlan::Sort(_) => Ok(self),
655 LogicalPlan::Join(Join {
656 left,
657 right,
658 filter,
659 join_type,
660 join_constraint,
661 on,
662 schema: _,
663 null_equality,
664 }) => {
665 let schema =
666 build_join_schema(left.schema(), right.schema(), &join_type)?;
667
668 let new_on: Vec<_> = on
669 .into_iter()
670 .map(|equi_expr| {
671 (equi_expr.0.unalias(), equi_expr.1.unalias())
673 })
674 .collect();
675
676 Ok(LogicalPlan::Join(Join {
677 left,
678 right,
679 join_type,
680 join_constraint,
681 on: new_on,
682 filter,
683 schema: DFSchemaRef::new(schema),
684 null_equality,
685 }))
686 }
687 LogicalPlan::Subquery(_) => Ok(self),
688 LogicalPlan::SubqueryAlias(SubqueryAlias {
689 input,
690 alias,
691 schema: _,
692 }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
693 LogicalPlan::Limit(_) => Ok(self),
694 LogicalPlan::Ddl(_) => Ok(self),
695 LogicalPlan::Extension(Extension { node }) => {
696 let expr = node.expressions();
699 let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
700 Ok(LogicalPlan::Extension(Extension {
701 node: node.with_exprs_and_inputs(expr, inputs)?,
702 }))
703 }
704 LogicalPlan::Union(Union { inputs, schema }) => {
705 let first_input_schema = inputs[0].schema();
706 if schema.fields().len() == first_input_schema.fields().len() {
707 Ok(LogicalPlan::Union(Union { inputs, schema }))
709 } else {
710 Ok(LogicalPlan::Union(Union::try_new(inputs)?))
718 }
719 }
720 LogicalPlan::Distinct(distinct) => {
721 let distinct = match distinct {
722 Distinct::All(input) => Distinct::All(input),
723 Distinct::On(DistinctOn {
724 on_expr,
725 select_expr,
726 sort_expr,
727 input,
728 schema: _,
729 }) => Distinct::On(DistinctOn::try_new(
730 on_expr,
731 select_expr,
732 sort_expr,
733 input,
734 )?),
735 };
736 Ok(LogicalPlan::Distinct(distinct))
737 }
738 LogicalPlan::RecursiveQuery(_) => Ok(self),
739 LogicalPlan::Analyze(_) => Ok(self),
740 LogicalPlan::Explain(_) => Ok(self),
741 LogicalPlan::TableScan(_) => Ok(self),
742 LogicalPlan::EmptyRelation(_) => Ok(self),
743 LogicalPlan::Statement(_) => Ok(self),
744 LogicalPlan::DescribeTable(_) => Ok(self),
745 LogicalPlan::Unnest(Unnest {
746 input,
747 exec_columns,
748 options,
749 ..
750 }) => {
751 unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
753 }
754 }
755 }
756
757 pub fn with_new_exprs(
783 &self,
784 mut expr: Vec<Expr>,
785 inputs: Vec<LogicalPlan>,
786 ) -> Result<LogicalPlan> {
787 match self {
788 LogicalPlan::Projection(Projection { .. }) => {
791 let input = self.only_input(inputs)?;
792 Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
793 }
794 LogicalPlan::Dml(DmlStatement {
795 table_name,
796 target,
797 op,
798 ..
799 }) => {
800 self.assert_no_expressions(expr)?;
801 let input = self.only_input(inputs)?;
802 Ok(LogicalPlan::Dml(DmlStatement::new(
803 table_name.clone(),
804 Arc::clone(target),
805 op.clone(),
806 Arc::new(input),
807 )))
808 }
809 LogicalPlan::Copy(CopyTo {
810 input: _,
811 output_url,
812 file_type,
813 options,
814 partition_by,
815 output_schema: _,
816 }) => {
817 self.assert_no_expressions(expr)?;
818 let input = self.only_input(inputs)?;
819 Ok(LogicalPlan::Copy(CopyTo::new(
820 Arc::new(input),
821 output_url.clone(),
822 partition_by.clone(),
823 Arc::clone(file_type),
824 options.clone(),
825 )))
826 }
827 LogicalPlan::Values(Values { schema, .. }) => {
828 self.assert_no_inputs(inputs)?;
829 Ok(LogicalPlan::Values(Values {
830 schema: Arc::clone(schema),
831 values: expr
832 .chunks_exact(schema.fields().len())
833 .map(|s| s.to_vec())
834 .collect(),
835 }))
836 }
837 LogicalPlan::Filter { .. } => {
838 let predicate = self.only_expr(expr)?;
839 let input = self.only_input(inputs)?;
840
841 Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
842 }
843 LogicalPlan::Repartition(Repartition {
844 partitioning_scheme,
845 ..
846 }) => match partitioning_scheme {
847 Partitioning::RoundRobinBatch(n) => {
848 self.assert_no_expressions(expr)?;
849 let input = self.only_input(inputs)?;
850 Ok(LogicalPlan::Repartition(Repartition {
851 partitioning_scheme: Partitioning::RoundRobinBatch(*n),
852 input: Arc::new(input),
853 }))
854 }
855 Partitioning::Hash(_, n) => {
856 let input = self.only_input(inputs)?;
857 Ok(LogicalPlan::Repartition(Repartition {
858 partitioning_scheme: Partitioning::Hash(expr, *n),
859 input: Arc::new(input),
860 }))
861 }
862 Partitioning::DistributeBy(_) => {
863 let input = self.only_input(inputs)?;
864 Ok(LogicalPlan::Repartition(Repartition {
865 partitioning_scheme: Partitioning::DistributeBy(expr),
866 input: Arc::new(input),
867 }))
868 }
869 },
870 LogicalPlan::Window(Window { window_expr, .. }) => {
871 assert_eq!(window_expr.len(), expr.len());
872 let input = self.only_input(inputs)?;
873 Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
874 }
875 LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
876 let input = self.only_input(inputs)?;
877 let agg_expr = expr.split_off(group_expr.len());
879
880 Aggregate::try_new(Arc::new(input), expr, agg_expr)
881 .map(LogicalPlan::Aggregate)
882 }
883 LogicalPlan::Sort(Sort {
884 expr: sort_expr,
885 fetch,
886 ..
887 }) => {
888 let input = self.only_input(inputs)?;
889 Ok(LogicalPlan::Sort(Sort {
890 expr: expr
891 .into_iter()
892 .zip(sort_expr.iter())
893 .map(|(expr, sort)| sort.with_expr(expr))
894 .collect(),
895 input: Arc::new(input),
896 fetch: *fetch,
897 }))
898 }
899 LogicalPlan::Join(Join {
900 join_type,
901 join_constraint,
902 on,
903 null_equality,
904 ..
905 }) => {
906 let (left, right) = self.only_two_inputs(inputs)?;
907 let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
908
909 let equi_expr_count = on.len() * 2;
910 assert!(expr.len() >= equi_expr_count);
911
912 let filter_expr = if expr.len() > equi_expr_count {
915 expr.pop()
916 } else {
917 None
918 };
919
920 assert_eq!(expr.len(), equi_expr_count);
923 let mut new_on = Vec::with_capacity(on.len());
924 let mut iter = expr.into_iter();
925 while let Some(left) = iter.next() {
926 let Some(right) = iter.next() else {
927 internal_err!(
928 "Expected a pair of expressions to construct the join on expression"
929 )?
930 };
931
932 new_on.push((left.unalias(), right.unalias()));
934 }
935
936 Ok(LogicalPlan::Join(Join {
937 left: Arc::new(left),
938 right: Arc::new(right),
939 join_type: *join_type,
940 join_constraint: *join_constraint,
941 on: new_on,
942 filter: filter_expr,
943 schema: DFSchemaRef::new(schema),
944 null_equality: *null_equality,
945 }))
946 }
947 LogicalPlan::Subquery(Subquery {
948 outer_ref_columns,
949 spans,
950 ..
951 }) => {
952 self.assert_no_expressions(expr)?;
953 let input = self.only_input(inputs)?;
954 let subquery = LogicalPlanBuilder::from(input).build()?;
955 Ok(LogicalPlan::Subquery(Subquery {
956 subquery: Arc::new(subquery),
957 outer_ref_columns: outer_ref_columns.clone(),
958 spans: spans.clone(),
959 }))
960 }
961 LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
962 self.assert_no_expressions(expr)?;
963 let input = self.only_input(inputs)?;
964 SubqueryAlias::try_new(Arc::new(input), alias.clone())
965 .map(LogicalPlan::SubqueryAlias)
966 }
967 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
968 let old_expr_len = skip.iter().chain(fetch.iter()).count();
969 assert_eq_or_internal_err!(
970 old_expr_len,
971 expr.len(),
972 "Invalid number of new Limit expressions: expected {}, got {}",
973 old_expr_len,
974 expr.len()
975 );
976 let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
978 let new_skip = skip.as_ref().and_then(|_| expr.pop());
979 let input = self.only_input(inputs)?;
980 Ok(LogicalPlan::Limit(Limit {
981 skip: new_skip.map(Box::new),
982 fetch: new_fetch.map(Box::new),
983 input: Arc::new(input),
984 }))
985 }
986 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
987 name,
988 if_not_exists,
989 or_replace,
990 column_defaults,
991 temporary,
992 ..
993 })) => {
994 self.assert_no_expressions(expr)?;
995 let input = self.only_input(inputs)?;
996 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
997 CreateMemoryTable {
998 input: Arc::new(input),
999 constraints: Constraints::default(),
1000 name: name.clone(),
1001 if_not_exists: *if_not_exists,
1002 or_replace: *or_replace,
1003 column_defaults: column_defaults.clone(),
1004 temporary: *temporary,
1005 },
1006 )))
1007 }
1008 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1009 name,
1010 or_replace,
1011 definition,
1012 temporary,
1013 ..
1014 })) => {
1015 self.assert_no_expressions(expr)?;
1016 let input = self.only_input(inputs)?;
1017 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1018 input: Arc::new(input),
1019 name: name.clone(),
1020 or_replace: *or_replace,
1021 temporary: *temporary,
1022 definition: definition.clone(),
1023 })))
1024 }
1025 LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1026 node: e.node.with_exprs_and_inputs(expr, inputs)?,
1027 })),
1028 LogicalPlan::Union(Union { schema, .. }) => {
1029 self.assert_no_expressions(expr)?;
1030 let input_schema = inputs[0].schema();
1031 let schema = if schema.fields().len() == input_schema.fields().len() {
1033 Arc::clone(schema)
1034 } else {
1035 Arc::clone(input_schema)
1036 };
1037 Ok(LogicalPlan::Union(Union {
1038 inputs: inputs.into_iter().map(Arc::new).collect(),
1039 schema,
1040 }))
1041 }
1042 LogicalPlan::Distinct(distinct) => {
1043 let distinct = match distinct {
1044 Distinct::All(_) => {
1045 self.assert_no_expressions(expr)?;
1046 let input = self.only_input(inputs)?;
1047 Distinct::All(Arc::new(input))
1048 }
1049 Distinct::On(DistinctOn {
1050 on_expr,
1051 select_expr,
1052 ..
1053 }) => {
1054 let input = self.only_input(inputs)?;
1055 let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1056 let select_expr = expr.split_off(on_expr.len());
1057 assert!(
1058 sort_expr.is_empty(),
1059 "with_new_exprs for Distinct does not support sort expressions"
1060 );
1061 Distinct::On(DistinctOn::try_new(
1062 expr,
1063 select_expr,
1064 None, Arc::new(input),
1066 )?)
1067 }
1068 };
1069 Ok(LogicalPlan::Distinct(distinct))
1070 }
1071 LogicalPlan::RecursiveQuery(RecursiveQuery {
1072 name, is_distinct, ..
1073 }) => {
1074 self.assert_no_expressions(expr)?;
1075 let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1076 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1077 name: name.clone(),
1078 static_term: Arc::new(static_term),
1079 recursive_term: Arc::new(recursive_term),
1080 is_distinct: *is_distinct,
1081 }))
1082 }
1083 LogicalPlan::Analyze(a) => {
1084 self.assert_no_expressions(expr)?;
1085 let input = self.only_input(inputs)?;
1086 Ok(LogicalPlan::Analyze(Analyze {
1087 verbose: a.verbose,
1088 schema: Arc::clone(&a.schema),
1089 input: Arc::new(input),
1090 }))
1091 }
1092 LogicalPlan::Explain(e) => {
1093 self.assert_no_expressions(expr)?;
1094 let input = self.only_input(inputs)?;
1095 Ok(LogicalPlan::Explain(Explain {
1096 verbose: e.verbose,
1097 plan: Arc::new(input),
1098 explain_format: e.explain_format.clone(),
1099 stringified_plans: e.stringified_plans.clone(),
1100 schema: Arc::clone(&e.schema),
1101 logical_optimization_succeeded: e.logical_optimization_succeeded,
1102 }))
1103 }
1104 LogicalPlan::Statement(Statement::Prepare(Prepare {
1105 name, fields, ..
1106 })) => {
1107 self.assert_no_expressions(expr)?;
1108 let input = self.only_input(inputs)?;
1109 Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1110 name: name.clone(),
1111 fields: fields.clone(),
1112 input: Arc::new(input),
1113 })))
1114 }
1115 LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1116 self.assert_no_inputs(inputs)?;
1117 Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1118 name: name.clone(),
1119 parameters: expr,
1120 })))
1121 }
1122 LogicalPlan::TableScan(ts) => {
1123 self.assert_no_inputs(inputs)?;
1124 Ok(LogicalPlan::TableScan(TableScan {
1125 filters: expr,
1126 ..ts.clone()
1127 }))
1128 }
1129 LogicalPlan::EmptyRelation(_)
1130 | LogicalPlan::Ddl(_)
1131 | LogicalPlan::Statement(_)
1132 | LogicalPlan::DescribeTable(_) => {
1133 self.assert_no_expressions(expr)?;
1135 self.assert_no_inputs(inputs)?;
1136 Ok(self.clone())
1137 }
1138 LogicalPlan::Unnest(Unnest {
1139 exec_columns: columns,
1140 options,
1141 ..
1142 }) => {
1143 self.assert_no_expressions(expr)?;
1144 let input = self.only_input(inputs)?;
1145 let new_plan =
1147 unnest_with_options(input, columns.clone(), options.clone())?;
1148 Ok(new_plan)
1149 }
1150 }
1151 }
1152
1153 pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1155 match check {
1156 InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1157 InvariantLevel::Executable => assert_executable_invariants(self),
1158 }
1159 }
1160
1161 #[inline]
1163 #[expect(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1165 assert_or_internal_err!(
1166 expr.is_empty(),
1167 "{self:?} should have no exprs, got {:?}",
1168 expr
1169 );
1170 Ok(())
1171 }
1172
1173 #[inline]
1175 #[expect(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1177 assert_or_internal_err!(
1178 inputs.is_empty(),
1179 "{self:?} should have no inputs, got: {:?}",
1180 inputs
1181 );
1182 Ok(())
1183 }
1184
1185 #[inline]
1187 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1188 assert_eq_or_internal_err!(
1189 expr.len(),
1190 1,
1191 "{self:?} should have exactly one expr, got {:?}",
1192 &expr
1193 );
1194 Ok(expr.remove(0))
1195 }
1196
1197 #[inline]
1199 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1200 assert_eq_or_internal_err!(
1201 inputs.len(),
1202 1,
1203 "{self:?} should have exactly one input, got {:?}",
1204 &inputs
1205 );
1206 Ok(inputs.remove(0))
1207 }
1208
1209 #[inline]
1211 fn only_two_inputs(
1212 &self,
1213 mut inputs: Vec<LogicalPlan>,
1214 ) -> Result<(LogicalPlan, LogicalPlan)> {
1215 assert_eq_or_internal_err!(
1216 inputs.len(),
1217 2,
1218 "{self:?} should have exactly two inputs, got {:?}",
1219 &inputs
1220 );
1221 let right = inputs.remove(1);
1222 let left = inputs.remove(0);
1223 Ok((left, right))
1224 }
1225
1226 pub fn with_param_values(
1279 self,
1280 param_values: impl Into<ParamValues>,
1281 ) -> Result<LogicalPlan> {
1282 let param_values = param_values.into();
1283 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1284
1285 Ok(
1287 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1288 plan_with_values
1289 {
1290 param_values.verify_fields(&prepare_lp.fields)?;
1291 Arc::unwrap_or_clone(prepare_lp.input)
1293 } else {
1294 plan_with_values
1295 },
1296 )
1297 }
1298
1299 pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1304 match self {
1305 LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1306 LogicalPlan::Filter(filter) => {
1307 if filter.is_scalar() {
1308 Some(1)
1309 } else {
1310 filter.input.max_rows()
1311 }
1312 }
1313 LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1314 LogicalPlan::Aggregate(Aggregate {
1315 input, group_expr, ..
1316 }) => {
1317 if group_expr
1319 .iter()
1320 .all(|expr| matches!(expr, Expr::Literal(_, _)))
1321 {
1322 Some(1)
1323 } else {
1324 input.max_rows()
1325 }
1326 }
1327 LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1328 match (fetch, input.max_rows()) {
1329 (Some(fetch_limit), Some(input_max)) => {
1330 Some(input_max.min(*fetch_limit))
1331 }
1332 (Some(fetch_limit), None) => Some(*fetch_limit),
1333 (None, Some(input_max)) => Some(input_max),
1334 (None, None) => None,
1335 }
1336 }
1337 LogicalPlan::Join(Join {
1338 left,
1339 right,
1340 join_type,
1341 ..
1342 }) => match join_type {
1343 JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1344 JoinType::Left | JoinType::Right | JoinType::Full => {
1345 match (left.max_rows()?, right.max_rows()?, join_type) {
1346 (0, 0, _) => Some(0),
1347 (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1348 (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1349 (left_max, right_max, _) => Some(left_max * right_max),
1350 }
1351 }
1352 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1353 left.max_rows()
1354 }
1355 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1356 right.max_rows()
1357 }
1358 },
1359 LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1360 LogicalPlan::Union(Union { inputs, .. }) => {
1361 inputs.iter().try_fold(0usize, |mut acc, plan| {
1362 acc += plan.max_rows()?;
1363 Some(acc)
1364 })
1365 }
1366 LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1367 LogicalPlan::EmptyRelation(_) => Some(0),
1368 LogicalPlan::RecursiveQuery(_) => None,
1369 LogicalPlan::Subquery(_) => None,
1370 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1371 LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1372 Ok(FetchType::Literal(s)) => s,
1373 _ => None,
1374 },
1375 LogicalPlan::Distinct(
1376 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1377 ) => input.max_rows(),
1378 LogicalPlan::Values(v) => Some(v.values.len()),
1379 LogicalPlan::Unnest(_) => None,
1380 LogicalPlan::Ddl(_)
1381 | LogicalPlan::Explain(_)
1382 | LogicalPlan::Analyze(_)
1383 | LogicalPlan::Dml(_)
1384 | LogicalPlan::Copy(_)
1385 | LogicalPlan::DescribeTable(_)
1386 | LogicalPlan::Statement(_)
1387 | LogicalPlan::Extension(_) => None,
1388 }
1389 }
1390
1391 pub fn contains_outer_reference(&self) -> bool {
1393 let mut contains = false;
1394 self.apply_expressions(|expr| {
1395 Ok(if expr.contains_outer() {
1396 contains = true;
1397 TreeNodeRecursion::Stop
1398 } else {
1399 TreeNodeRecursion::Continue
1400 })
1401 })
1402 .unwrap();
1403 contains
1404 }
1405
1406 pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1414 match self {
1415 LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1416 .output_expressions()?
1417 .into_iter()
1418 .zip(self.schema().columns())
1419 .collect()),
1420 LogicalPlan::Window(Window {
1421 window_expr,
1422 input,
1423 schema,
1424 }) => {
1425 let mut output_exprs = input.columnized_output_exprs()?;
1433 let input_len = input.schema().fields().len();
1434 output_exprs.extend(
1435 window_expr
1436 .iter()
1437 .zip(schema.columns().into_iter().skip(input_len)),
1438 );
1439 Ok(output_exprs)
1440 }
1441 _ => Ok(vec![]),
1442 }
1443 }
1444}
1445
1446impl LogicalPlan {
1447 pub fn replace_params_with_values(
1454 self,
1455 param_values: &ParamValues,
1456 ) -> Result<LogicalPlan> {
1457 self.transform_up_with_subqueries(|plan| {
1458 let schema = Arc::clone(plan.schema());
1459 let name_preserver = NamePreserver::new(&plan);
1460 plan.map_expressions(|e| {
1461 let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1462 if !has_placeholder {
1463 Ok(Transformed::no(e))
1467 } else {
1468 let original_name = name_preserver.save(&e);
1469 let transformed_expr = e.transform_up(|e| {
1470 if let Expr::Placeholder(Placeholder { id, .. }) = e {
1471 let (value, metadata) = param_values
1472 .get_placeholders_with_values(&id)?
1473 .into_inner();
1474 Ok(Transformed::yes(Expr::Literal(value, metadata)))
1475 } else {
1476 Ok(Transformed::no(e))
1477 }
1478 })?;
1479 Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1481 }
1482 })?
1483 .map_data(|plan| plan.update_schema_data_type())
1484 })
1485 .map(|res| res.data)
1486 }
1487
1488 fn update_schema_data_type(self) -> Result<LogicalPlan> {
1494 match self {
1495 LogicalPlan::Values(Values { values, schema: _ }) => {
1499 LogicalPlanBuilder::values(values)?.build()
1500 }
1501 plan => plan.recompute_schema(),
1503 }
1504 }
1505
1506 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1508 let mut param_names = HashSet::new();
1509 self.apply_with_subqueries(|plan| {
1510 plan.apply_expressions(|expr| {
1511 expr.apply(|expr| {
1512 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1513 param_names.insert(id.clone());
1514 }
1515 Ok(TreeNodeRecursion::Continue)
1516 })
1517 })
1518 })
1519 .map(|_| param_names)
1520 }
1521
1522 pub fn get_parameter_types(
1527 &self,
1528 ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1529 let mut parameter_fields = self.get_parameter_fields()?;
1530 Ok(parameter_fields
1531 .drain()
1532 .map(|(name, maybe_field)| {
1533 (name, maybe_field.map(|field| field.data_type().clone()))
1534 })
1535 .collect())
1536 }
1537
1538 pub fn get_parameter_fields(
1540 &self,
1541 ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
1542 let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
1543
1544 self.apply_with_subqueries(|plan| {
1545 plan.apply_expressions(|expr| {
1546 expr.apply(|expr| {
1547 if let Expr::Placeholder(Placeholder { id, field }) = expr {
1548 let prev = param_types.get(id);
1549 match (prev, field) {
1550 (Some(Some(prev)), Some(field)) => {
1551 check_metadata_with_storage_equal(
1552 (field.data_type(), Some(field.metadata())),
1553 (prev.data_type(), Some(prev.metadata())),
1554 "parameter",
1555 &format!(": Conflicting types for id {id}"),
1556 )?;
1557 }
1558 (_, Some(field)) => {
1559 param_types.insert(id.clone(), Some(Arc::clone(field)));
1560 }
1561 _ => {
1562 param_types.insert(id.clone(), None);
1563 }
1564 }
1565 }
1566 Ok(TreeNodeRecursion::Continue)
1567 })
1568 })
1569 })
1570 .map(|_| param_types)
1571 }
1572
1573 pub fn display_indent(&self) -> impl Display + '_ {
1605 struct Wrapper<'a>(&'a LogicalPlan);
1608 impl Display for Wrapper<'_> {
1609 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1610 let with_schema = false;
1611 let mut visitor = IndentVisitor::new(f, with_schema);
1612 match self.0.visit_with_subqueries(&mut visitor) {
1613 Ok(_) => Ok(()),
1614 Err(_) => Err(fmt::Error),
1615 }
1616 }
1617 }
1618 Wrapper(self)
1619 }
1620
1621 pub fn display_indent_schema(&self) -> impl Display + '_ {
1651 struct Wrapper<'a>(&'a LogicalPlan);
1654 impl Display for Wrapper<'_> {
1655 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1656 let with_schema = true;
1657 let mut visitor = IndentVisitor::new(f, with_schema);
1658 match self.0.visit_with_subqueries(&mut visitor) {
1659 Ok(_) => Ok(()),
1660 Err(_) => Err(fmt::Error),
1661 }
1662 }
1663 }
1664 Wrapper(self)
1665 }
1666
1667 pub fn display_pg_json(&self) -> impl Display + '_ {
1671 struct Wrapper<'a>(&'a LogicalPlan);
1674 impl Display for Wrapper<'_> {
1675 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1676 let mut visitor = PgJsonVisitor::new(f);
1677 visitor.with_schema(true);
1678 match self.0.visit_with_subqueries(&mut visitor) {
1679 Ok(_) => Ok(()),
1680 Err(_) => Err(fmt::Error),
1681 }
1682 }
1683 }
1684 Wrapper(self)
1685 }
1686
1687 pub fn display_graphviz(&self) -> impl Display + '_ {
1717 struct Wrapper<'a>(&'a LogicalPlan);
1720 impl Display for Wrapper<'_> {
1721 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1722 let mut visitor = GraphvizVisitor::new(f);
1723
1724 visitor.start_graph()?;
1725
1726 visitor.pre_visit_plan("LogicalPlan")?;
1727 self.0
1728 .visit_with_subqueries(&mut visitor)
1729 .map_err(|_| fmt::Error)?;
1730 visitor.post_visit_plan()?;
1731
1732 visitor.set_with_schema(true);
1733 visitor.pre_visit_plan("Detailed LogicalPlan")?;
1734 self.0
1735 .visit_with_subqueries(&mut visitor)
1736 .map_err(|_| fmt::Error)?;
1737 visitor.post_visit_plan()?;
1738
1739 visitor.end_graph()?;
1740 Ok(())
1741 }
1742 }
1743 Wrapper(self)
1744 }
1745
    /// Returns a `Display` adapter that writes a one-line description of
    /// this node only; inputs/children are not visited here.
    ///
    /// The text written depends on the node variant; every variant of
    /// `LogicalPlan` is handled by the `match` below.
    pub fn display(&self) -> impl Display + '_ {
        // Borrowing adapter so that formatting happens lazily.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(EmptyRelation {
                        produce_one_row,
                        schema: _,
                    }) => {
                        // Shown as a row count: 1 when `produce_one_row` is set, otherwise 0.
                        let rows = if *produce_one_row { 1 } else { 0 };
                        write!(f, "EmptyRelation: rows={rows}")
                    }
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
                    }
                    LogicalPlan::Values(Values { values, .. }) => {
                        // Render at most the first five rows, each as `(e1, e2, ...)`.
                        let str_values: Vec<_> = values
                            .iter()
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        // Trailing "..." marks that rows beyond the fifth were omitted.
                        let eclipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
                    }

                    LogicalPlan::TableScan(TableScan {
                        source,
                        table_name,
                        projection,
                        filters,
                        fetch,
                        ..
                    }) => {
                        // Projection indices are translated to field names of the source schema.
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            // Filters are grouped by the push-down support the source reports.
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            // If the source returns an error here, no filters are printed.
                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            // Only non-empty groups are written.
                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { expr, .. }) => {
                        // Written incrementally as `Projection: e1, e2, ...`.
                        write!(f, "Projection:")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ",")?;
                            }
                            write!(f, " {expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        // Options are listed as `key value` pairs separated by commas.
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(
                            f,
                            "CopyTo: format={} output_url={output_url} options: ({op_str})",
                            file_type.get_ext()
                        )
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: expr, ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window { window_expr, .. }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        group_expr,
                        aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        // Written incrementally as `Sort: e1, e2, ...[, fetch=n]`.
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        // Equi-join keys are rendered as `left = right`.
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        // An inner join with neither keys nor a filter is labelled "Cross".
                        let join_type = if filter.is_none()
                            && keys.is_empty()
                            && matches!(join_type, JoinType::Inner)
                        {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(
                                    f,
                                    "{} Join: {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr
                                )
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        // Prefer the literal form of `skip`/`fetch`; otherwise fall back to
                        // printing the raw expression, or "None" when it is absent.
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit
                                .skip
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit
                                .fetch
                                .as_ref()
                                .map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            // A missing sort expression list is printed as an empty list.
                            if let Some(sort_expr) = sort_expr {
                                expr_vec_fmt!(sort_expr)
                            } else {
                                "".to_string()
                            },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    // Extension nodes supply their own text.
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices,
                        ..
                    }) => {
                        // Column indices are resolved against the input's schema columns.
                        let input_columns = plan.schema().columns();
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i, unnest_info)| {
                                format!(
                                    "{}|depth={}",
                                    &input_columns[*i].to_string(),
                                    unnest_info.depth
                                )
                            })
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        write!(
                            f,
                            "Unnest: lists[{}] structs[{}]",
                            expr_vec_fmt!(list_type_columns),
                            expr_vec_fmt!(struct_type_columns)
                        )
                    }
                }
            }
        }
        Wrapper(self)
    }
2106}
2107
2108impl Display for LogicalPlan {
2109 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2110 self.display_indent().fmt(f)
2111 }
2112}
2113
2114impl ToStringifiedPlan for LogicalPlan {
2115 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2116 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2117 }
2118}
2119
/// Payload of `LogicalPlan::EmptyRelation`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// `LogicalPlan::display` prints this as `rows=1` when `true` and
    /// `rows=0` when `false`.
    pub produce_one_row: bool,
    /// Schema attached to this node.
    pub schema: DFSchemaRef,
}
2130
2131impl PartialOrd for EmptyRelation {
2133 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2134 self.produce_one_row
2135 .partial_cmp(&other.produce_one_row)
2136 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2138 }
2139}
2140
/// Payload of `LogicalPlan::RecursiveQuery`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name associated with the query.
    pub name: String,
    /// NOTE(review): presumably the non-recursive (anchor) part of the
    /// query — confirm against the planner.
    pub static_term: Arc<LogicalPlan>,
    /// NOTE(review): presumably the part that refers back to the query
    /// itself — confirm against the planner.
    pub recursive_term: Arc<LogicalPlan>,
    /// Printed by `LogicalPlan::display` as `is_distinct=<value>`.
    pub is_distinct: bool,
}
2176
/// Payload of `LogicalPlan::Values`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// Schema attached to this node.
    pub schema: DFSchemaRef,
    /// Rows of expressions; `LogicalPlan::display` renders each inner
    /// vector as one parenthesised row.
    pub values: Vec<Vec<Expr>>,
}
2187
2188impl PartialOrd for Values {
2190 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2191 self.values
2192 .partial_cmp(&other.values)
2193 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2195 }
2196}
2197
/// Payload of `LogicalPlan::Projection`.
///
/// Marked `#[non_exhaustive]`; the constructors in `impl Projection` are
/// the way to build one from outside the crate.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
#[non_exhaustive]
pub struct Projection {
    /// Expressions to evaluate against `input`.
    pub expr: Vec<Expr>,
    /// Plan that feeds this projection.
    pub input: Arc<LogicalPlan>,
    /// Output schema of the projection.
    pub schema: DFSchemaRef,
}
2211
2212impl PartialOrd for Projection {
2214 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2215 match self.expr.partial_cmp(&other.expr) {
2216 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2217 cmp => cmp,
2218 }
2219 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2221 }
2222}
2223
2224impl Projection {
2225 pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2227 let projection_schema = projection_schema(&input, &expr)?;
2228 Self::try_new_with_schema(expr, input, projection_schema)
2229 }
2230
2231 pub fn try_new_with_schema(
2233 expr: Vec<Expr>,
2234 input: Arc<LogicalPlan>,
2235 schema: DFSchemaRef,
2236 ) -> Result<Self> {
2237 #[expect(deprecated)]
2238 if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2239 && expr.len() != schema.fields().len()
2240 {
2241 return plan_err!(
2242 "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
2243 expr.len(),
2244 schema.fields().len()
2245 );
2246 }
2247 Ok(Self {
2248 expr,
2249 input,
2250 schema,
2251 })
2252 }
2253
2254 pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2256 let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2257 Self {
2258 expr,
2259 input,
2260 schema,
2261 }
2262 }
2263}
2264
2265pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2285 let metadata = input.schema().metadata().clone();
2287
2288 let schema =
2290 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2291 .with_functional_dependencies(calc_func_dependencies_for_project(
2292 exprs, input,
2293 )?)?;
2294
2295 Ok(Arc::new(schema))
2296}
2297
/// Payload of `LogicalPlan::SubqueryAlias`.
///
/// Marked `#[non_exhaustive]`; build it with `SubqueryAlias::try_new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct SubqueryAlias {
    /// Aliased plan. `try_new` may wrap the caller's plan in a `Projection`
    /// first (see `SubqueryAlias::try_new`).
    pub input: Arc<LogicalPlan>,
    /// Alias given to the plan.
    pub alias: TableReference,
    /// Output schema; `try_new` builds it by qualifying the input's fields
    /// with `alias`.
    pub schema: DFSchemaRef,
}
2310
2311impl SubqueryAlias {
2312 pub fn try_new(
2313 plan: Arc<LogicalPlan>,
2314 alias: impl Into<TableReference>,
2315 ) -> Result<Self> {
2316 let alias = alias.into();
2317
2318 let aliases = unique_field_aliases(plan.schema().fields());
2324 let is_projection_needed = aliases.iter().any(Option::is_some);
2325
2326 let plan = if is_projection_needed {
2328 let projection_expressions = aliases
2329 .iter()
2330 .zip(plan.schema().iter())
2331 .map(|(alias, (qualifier, field))| {
2332 let column =
2333 Expr::Column(Column::new(qualifier.cloned(), field.name()));
2334 match alias {
2335 None => column,
2336 Some(alias) => {
2337 Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2338 }
2339 }
2340 })
2341 .collect();
2342 let projection = Projection::try_new(projection_expressions, plan)?;
2343 Arc::new(LogicalPlan::Projection(projection))
2344 } else {
2345 plan
2346 };
2347
2348 let fields = plan.schema().fields().clone();
2350 let meta_data = plan.schema().metadata().clone();
2351 let func_dependencies = plan.schema().functional_dependencies().clone();
2352
2353 let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2354 let schema = schema.as_arrow();
2355
2356 let schema = DFSchemaRef::new(
2357 DFSchema::try_from_qualified_schema(alias.clone(), schema)?
2358 .with_functional_dependencies(func_dependencies)?,
2359 );
2360 Ok(SubqueryAlias {
2361 input: plan,
2362 alias,
2363 schema,
2364 })
2365 }
2366}
2367
2368impl PartialOrd for SubqueryAlias {
2370 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2371 match self.input.partial_cmp(&other.input) {
2372 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2373 cmp => cmp,
2374 }
2375 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2377 }
2378}
2379
/// Payload of `LogicalPlan::Filter`.
///
/// Marked `#[non_exhaustive]`; build it with `Filter::try_new`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// Filter predicate. The constructors store it after
    /// `Expr::unalias_nested`.
    pub predicate: Expr,
    /// Plan whose rows are filtered.
    pub input: Arc<LogicalPlan>,
}
2399
2400impl Filter {
2401 pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2406 Self::try_new_internal(predicate, input)
2407 }
2408
2409 #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2412 pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2413 Self::try_new_internal(predicate, input)
2414 }
2415
2416 fn is_allowed_filter_type(data_type: &DataType) -> bool {
2417 match data_type {
2418 DataType::Boolean | DataType::Null => true,
2420 DataType::Dictionary(_, value_type) => {
2421 Filter::is_allowed_filter_type(value_type.as_ref())
2422 }
2423 _ => false,
2424 }
2425 }
2426
2427 fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2428 if let Ok(predicate_type) = predicate.get_type(input.schema())
2433 && !Filter::is_allowed_filter_type(&predicate_type)
2434 {
2435 return plan_err!(
2436 "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2437 );
2438 }
2439
2440 Ok(Self {
2441 predicate: predicate.unalias_nested().data,
2442 input,
2443 })
2444 }
2445
2446 fn is_scalar(&self) -> bool {
2462 let schema = self.input.schema();
2463
2464 let functional_dependencies = self.input.schema().functional_dependencies();
2465 let unique_keys = functional_dependencies.iter().filter(|dep| {
2466 let nullable = dep.nullable
2467 && dep
2468 .source_indices
2469 .iter()
2470 .any(|&source| schema.field(source).is_nullable());
2471 !nullable
2472 && dep.mode == Dependency::Single
2473 && dep.target_indices.len() == schema.fields().len()
2474 });
2475
2476 let exprs = split_conjunction(&self.predicate);
2477 let eq_pred_cols: HashSet<_> = exprs
2478 .iter()
2479 .filter_map(|expr| {
2480 let Expr::BinaryExpr(BinaryExpr {
2481 left,
2482 op: Operator::Eq,
2483 right,
2484 }) = expr
2485 else {
2486 return None;
2487 };
2488 if left == right {
2490 return None;
2491 }
2492
2493 match (left.as_ref(), right.as_ref()) {
2494 (Expr::Column(_), Expr::Column(_)) => None,
2495 (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2496 Some(schema.index_of_column(c).unwrap())
2497 }
2498 _ => None,
2499 }
2500 })
2501 .collect();
2502
2503 for key in unique_keys {
2506 if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2507 return true;
2508 }
2509 }
2510 false
2511 }
2512}
2513
/// Payload of `LogicalPlan::Window`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// Plan that feeds the window computation.
    pub input: Arc<LogicalPlan>,
    /// Window expressions evaluated by this node.
    pub window_expr: Vec<Expr>,
    /// Output schema. The constructors require it to contain the input's
    /// fields plus one field per window expression.
    pub schema: DFSchemaRef,
}
2537
2538impl Window {
2539 pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2541 let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2542 .schema()
2543 .iter()
2544 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2545 .collect();
2546 let input_len = fields.len();
2547 let mut window_fields = fields;
2548 let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2549 window_fields.extend_from_slice(expr_fields.as_slice());
2550 let metadata = input.schema().metadata().clone();
2551
2552 let mut window_func_dependencies =
2554 input.schema().functional_dependencies().clone();
2555 window_func_dependencies.extend_target_indices(window_fields.len());
2556
2557 let mut new_dependencies = window_expr
2561 .iter()
2562 .enumerate()
2563 .filter_map(|(idx, expr)| {
2564 let Expr::WindowFunction(window_fun) = expr else {
2565 return None;
2566 };
2567 let WindowFunction {
2568 fun: WindowFunctionDefinition::WindowUDF(udwf),
2569 params: WindowFunctionParams { partition_by, .. },
2570 } = window_fun.as_ref()
2571 else {
2572 return None;
2573 };
2574 if udwf.name() == "row_number" && partition_by.is_empty() {
2577 Some(idx + input_len)
2578 } else {
2579 None
2580 }
2581 })
2582 .map(|idx| {
2583 FunctionalDependence::new(vec![idx], vec![], false)
2584 .with_mode(Dependency::Single)
2585 })
2586 .collect::<Vec<_>>();
2587
2588 if !new_dependencies.is_empty() {
2589 for dependence in new_dependencies.iter_mut() {
2590 dependence.target_indices = (0..window_fields.len()).collect();
2591 }
2592 let new_deps = FunctionalDependencies::new(new_dependencies);
2594 window_func_dependencies.extend(new_deps);
2595 }
2596
2597 if let Some(e) = window_expr.iter().find(|e| {
2599 matches!(
2600 e,
2601 Expr::WindowFunction(wf)
2602 if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
2603 && wf.params.filter.is_some()
2604 )
2605 }) {
2606 return plan_err!(
2607 "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
2608 );
2609 }
2610
2611 Self::try_new_with_schema(
2612 window_expr,
2613 input,
2614 Arc::new(
2615 DFSchema::new_with_metadata(window_fields, metadata)?
2616 .with_functional_dependencies(window_func_dependencies)?,
2617 ),
2618 )
2619 }
2620
2621 pub fn try_new_with_schema(
2627 window_expr: Vec<Expr>,
2628 input: Arc<LogicalPlan>,
2629 schema: DFSchemaRef,
2630 ) -> Result<Self> {
2631 let input_fields_count = input.schema().fields().len();
2632 if schema.fields().len() != input_fields_count + window_expr.len() {
2633 return plan_err!(
2634 "Window schema has wrong number of fields. Expected {} got {}",
2635 input_fields_count + window_expr.len(),
2636 schema.fields().len()
2637 );
2638 }
2639
2640 Ok(Window {
2641 input,
2642 window_expr,
2643 schema,
2644 })
2645 }
2646}
2647
2648impl PartialOrd for Window {
2650 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2651 match self.input.partial_cmp(&other.input)? {
2652 Ordering::Equal => {} not_equal => return Some(not_equal),
2654 }
2655
2656 match self.window_expr.partial_cmp(&other.window_expr)? {
2657 Ordering::Equal => {} not_equal => return Some(not_equal),
2659 }
2660
2661 if self == other {
2664 Some(Ordering::Equal)
2665 } else {
2666 None
2667 }
2668 }
2669}
2670
/// Payload of `LogicalPlan::TableScan`.
#[derive(Clone)]
pub struct TableScan {
    /// Name of the scanned table.
    pub table_name: TableReference,
    /// Source that is scanned.
    pub source: Arc<dyn TableSource>,
    /// Optional indices into the source schema's fields that select the
    /// scanned columns.
    pub projection: Option<Vec<usize>>,
    /// Schema of the scan output; `TableScan::try_new` qualifies its fields
    /// with `table_name`.
    pub projected_schema: DFSchemaRef,
    /// Filter expressions attached to the scan.
    pub filters: Vec<Expr>,
    /// Optional fetch count; printed as `fetch=<n>` by `LogicalPlan::display`.
    pub fetch: Option<usize>,
}
2687
2688impl Debug for TableScan {
2689 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2690 f.debug_struct("TableScan")
2691 .field("table_name", &self.table_name)
2692 .field("source", &"...")
2693 .field("projection", &self.projection)
2694 .field("projected_schema", &self.projected_schema)
2695 .field("filters", &self.filters)
2696 .field("fetch", &self.fetch)
2697 .finish_non_exhaustive()
2698 }
2699}
2700
2701impl PartialEq for TableScan {
2702 fn eq(&self, other: &Self) -> bool {
2703 self.table_name == other.table_name
2704 && self.projection == other.projection
2705 && self.projected_schema == other.projected_schema
2706 && self.filters == other.filters
2707 && self.fetch == other.fetch
2708 }
2709}
2710
// Marker impl: `TableScan` equality is the hand-written `PartialEq::eq`.
impl Eq for TableScan {}
2712
2713impl PartialOrd for TableScan {
2716 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2717 #[derive(PartialEq, PartialOrd)]
2718 struct ComparableTableScan<'a> {
2719 pub table_name: &'a TableReference,
2721 pub projection: &'a Option<Vec<usize>>,
2723 pub filters: &'a Vec<Expr>,
2725 pub fetch: &'a Option<usize>,
2727 }
2728 let comparable_self = ComparableTableScan {
2729 table_name: &self.table_name,
2730 projection: &self.projection,
2731 filters: &self.filters,
2732 fetch: &self.fetch,
2733 };
2734 let comparable_other = ComparableTableScan {
2735 table_name: &other.table_name,
2736 projection: &other.projection,
2737 filters: &other.filters,
2738 fetch: &other.fetch,
2739 };
2740 comparable_self
2741 .partial_cmp(&comparable_other)
2742 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2744 }
2745}
2746
2747impl Hash for TableScan {
2748 fn hash<H: Hasher>(&self, state: &mut H) {
2749 self.table_name.hash(state);
2750 self.projection.hash(state);
2751 self.projected_schema.hash(state);
2752 self.filters.hash(state);
2753 self.fetch.hash(state);
2754 }
2755}
2756
2757impl TableScan {
2758 pub fn try_new(
2761 table_name: impl Into<TableReference>,
2762 table_source: Arc<dyn TableSource>,
2763 projection: Option<Vec<usize>>,
2764 filters: Vec<Expr>,
2765 fetch: Option<usize>,
2766 ) -> Result<Self> {
2767 let table_name = table_name.into();
2768
2769 if table_name.table().is_empty() {
2770 return plan_err!("table_name cannot be empty");
2771 }
2772 let schema = table_source.schema();
2773 let func_dependencies = FunctionalDependencies::new_from_constraints(
2774 table_source.constraints(),
2775 schema.fields.len(),
2776 );
2777 let projected_schema = projection
2778 .as_ref()
2779 .map(|p| {
2780 let projected_func_dependencies =
2781 func_dependencies.project_functional_dependencies(p, p.len());
2782
2783 let df_schema = DFSchema::new_with_metadata(
2784 p.iter()
2785 .map(|i| {
2786 (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
2787 })
2788 .collect(),
2789 schema.metadata.clone(),
2790 )?;
2791 df_schema.with_functional_dependencies(projected_func_dependencies)
2792 })
2793 .unwrap_or_else(|| {
2794 let df_schema =
2795 DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2796 df_schema.with_functional_dependencies(func_dependencies)
2797 })?;
2798 let projected_schema = Arc::new(projected_schema);
2799
2800 Ok(Self {
2801 table_name,
2802 source: table_source,
2803 projection,
2804 projected_schema,
2805 filters,
2806 fetch,
2807 })
2808 }
2809}
2810
/// Payload of `LogicalPlan::Repartition`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// Plan whose output is repartitioned.
    pub input: Arc<LogicalPlan>,
    /// How the output is partitioned (a `Partitioning` value).
    pub partitioning_scheme: Partitioning,
}
2819
/// Payload of `LogicalPlan::Union`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// Plans combined by the union; the constructors in `impl Union`
    /// require at least two.
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Output schema of the union.
    pub schema: DFSchemaRef,
}
2828
2829impl Union {
2830 pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2833 let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2834 Ok(Union { inputs, schema })
2835 }
2836
2837 pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2842 let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2843 Ok(Union { inputs, schema })
2844 }
2845
2846 pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2850 let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2851 let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2852
2853 Ok(Union { inputs, schema })
2854 }
2855
2856 fn rewrite_inputs_from_schema(
2860 schema: &Arc<DFSchema>,
2861 inputs: Vec<Arc<LogicalPlan>>,
2862 ) -> Result<Vec<Arc<LogicalPlan>>> {
2863 let schema_width = schema.iter().count();
2864 let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2865 for input in inputs {
2866 let mut expr = Vec::with_capacity(schema_width);
2870 for column in schema.columns() {
2871 if input
2872 .schema()
2873 .has_column_with_unqualified_name(column.name())
2874 {
2875 expr.push(Expr::Column(column));
2876 } else {
2877 expr.push(
2878 Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2879 );
2880 }
2881 }
2882 wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2883 Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2884 )));
2885 }
2886
2887 Ok(wrapped_inputs)
2888 }
2889
2890 fn derive_schema_from_inputs(
2899 inputs: &[Arc<LogicalPlan>],
2900 loose_types: bool,
2901 by_name: bool,
2902 ) -> Result<DFSchemaRef> {
2903 if inputs.len() < 2 {
2904 return plan_err!("UNION requires at least two inputs");
2905 }
2906
2907 if by_name {
2908 Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2909 } else {
2910 Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2911 }
2912 }
2913
2914 fn derive_schema_from_inputs_by_name(
2915 inputs: &[Arc<LogicalPlan>],
2916 loose_types: bool,
2917 ) -> Result<DFSchemaRef> {
2918 type FieldData<'a> =
2919 (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2920 let mut cols: Vec<(&str, FieldData)> = Vec::new();
2921 for input in inputs.iter() {
2922 for field in input.schema().fields() {
2923 if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2924 cols.iter_mut().find(|(name, _)| name == field.name())
2925 {
2926 if !loose_types && *data_type != field.data_type() {
2927 return plan_err!(
2928 "Found different types for field {}",
2929 field.name()
2930 );
2931 }
2932
2933 metadata.push(field.metadata());
2934 *is_nullable |= field.is_nullable();
2937 *occurrences += 1;
2938 } else {
2939 cols.push((
2940 field.name(),
2941 (
2942 field.data_type(),
2943 field.is_nullable(),
2944 vec![field.metadata()],
2945 1,
2946 ),
2947 ));
2948 }
2949 }
2950 }
2951
2952 let union_fields = cols
2953 .into_iter()
2954 .map(
2955 |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2956 let final_is_nullable = if occurrences == inputs.len() {
2960 is_nullable
2961 } else {
2962 true
2963 };
2964
2965 let mut field =
2966 Field::new(name, data_type.clone(), final_is_nullable);
2967 field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
2968
2969 (None, Arc::new(field))
2970 },
2971 )
2972 .collect::<Vec<(Option<TableReference>, _)>>();
2973
2974 let union_schema_metadata = intersect_metadata_for_union(
2975 inputs.iter().map(|input| input.schema().metadata()),
2976 );
2977
2978 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2980 let schema = Arc::new(schema);
2981
2982 Ok(schema)
2983 }
2984
2985 fn derive_schema_from_inputs_by_position(
2986 inputs: &[Arc<LogicalPlan>],
2987 loose_types: bool,
2988 ) -> Result<DFSchemaRef> {
2989 let first_schema = inputs[0].schema();
2990 let fields_count = first_schema.fields().len();
2991 for input in inputs.iter().skip(1) {
2992 if fields_count != input.schema().fields().len() {
2993 return plan_err!(
2994 "UNION queries have different number of columns: \
2995 left has {} columns whereas right has {} columns",
2996 fields_count,
2997 input.schema().fields().len()
2998 );
2999 }
3000 }
3001
3002 let mut name_counts: HashMap<String, usize> = HashMap::new();
3003 let union_fields = (0..fields_count)
3004 .map(|i| {
3005 let fields = inputs
3006 .iter()
3007 .map(|input| input.schema().field(i))
3008 .collect::<Vec<_>>();
3009 let first_field = fields[0];
3010 let base_name = first_field.name().to_string();
3011
3012 let data_type = if loose_types {
3013 first_field.data_type()
3017 } else {
3018 fields.iter().skip(1).try_fold(
3019 first_field.data_type(),
3020 |acc, field| {
3021 if acc != field.data_type() {
3022 return plan_err!(
3023 "UNION field {i} have different type in inputs: \
3024 left has {} whereas right has {}",
3025 first_field.data_type(),
3026 field.data_type()
3027 );
3028 }
3029 Ok(acc)
3030 },
3031 )?
3032 };
3033 let nullable = fields.iter().any(|field| field.is_nullable());
3034
3035 let name = if let Some(count) = name_counts.get_mut(&base_name) {
3037 *count += 1;
3038 format!("{base_name}_{count}")
3039 } else {
3040 name_counts.insert(base_name.clone(), 0);
3041 base_name
3042 };
3043
3044 let mut field = Field::new(&name, data_type.clone(), nullable);
3045 let field_metadata = intersect_metadata_for_union(
3046 fields.iter().map(|field| field.metadata()),
3047 );
3048 field.set_metadata(field_metadata);
3049 Ok((None, Arc::new(field)))
3050 })
3051 .collect::<Result<_>>()?;
3052 let union_schema_metadata = intersect_metadata_for_union(
3053 inputs.iter().map(|input| input.schema().metadata()),
3054 );
3055
3056 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
3058 let schema = Arc::new(schema);
3059
3060 Ok(schema)
3061 }
3062}
3063
3064impl PartialOrd for Union {
3066 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3067 self.inputs
3068 .partial_cmp(&other.inputs)
3069 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3071 }
3072}
3073
/// Logical plan node holding the data needed to describe a table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// Arrow schema associated with this node
    /// (presumably the schema of the table being described — confirm against callers).
    pub schema: Arc<Schema>,
    /// Schema of the rows this node outputs.
    pub output_schema: DFSchemaRef,
}
3103
impl PartialOrd for DescribeTable {
    /// `DescribeTable` nodes have no defined ordering: this always returns `None`,
    /// regardless of the values being compared.
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        None
    }
}
3112
/// Options controlling an explain request.
///
/// Values can be built fluently with `with_verbose`, `with_analyze` and
/// `with_format`, starting from `ExplainOption::default()`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExplainOption {
    /// Verbose flag (defaults to `false`).
    pub verbose: bool,
    /// Analyze flag (defaults to `false`).
    pub analyze: bool,
    /// Output format (defaults to `ExplainFormat::Indent`).
    pub format: ExplainFormat,
}
3123
3124impl Default for ExplainOption {
3125 fn default() -> Self {
3126 ExplainOption {
3127 verbose: false,
3128 analyze: false,
3129 format: ExplainFormat::Indent,
3130 }
3131 }
3132}
3133
3134impl ExplainOption {
3135 pub fn with_verbose(mut self, verbose: bool) -> Self {
3137 self.verbose = verbose;
3138 self
3139 }
3140
3141 pub fn with_analyze(mut self, analyze: bool) -> Self {
3143 self.analyze = analyze;
3144 self
3145 }
3146
3147 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3149 self.format = format;
3150 self
3151 }
3152}
3153
/// Logical plan node that explains another plan.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Verbose flag.
    pub verbose: bool,
    /// Format used for the explain output. Not part of the `PartialOrd` ordering.
    pub explain_format: ExplainFormat,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// String representations of the plan.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Schema of this node's output. Not part of the `PartialOrd` ordering.
    pub schema: DFSchemaRef,
    /// Flag recording whether logical optimization succeeded.
    pub logical_optimization_succeeded: bool,
}
3176
3177impl PartialOrd for Explain {
3179 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3180 #[derive(PartialEq, PartialOrd)]
3181 struct ComparableExplain<'a> {
3182 pub verbose: &'a bool,
3184 pub plan: &'a Arc<LogicalPlan>,
3186 pub stringified_plans: &'a Vec<StringifiedPlan>,
3188 pub logical_optimization_succeeded: &'a bool,
3190 }
3191 let comparable_self = ComparableExplain {
3192 verbose: &self.verbose,
3193 plan: &self.plan,
3194 stringified_plans: &self.stringified_plans,
3195 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3196 };
3197 let comparable_other = ComparableExplain {
3198 verbose: &other.verbose,
3199 plan: &other.plan,
3200 stringified_plans: &other.stringified_plans,
3201 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3202 };
3203 comparable_self
3204 .partial_cmp(&comparable_other)
3205 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3207 }
3208}
3209
/// Logical plan node wrapping an input plan to be analyzed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Verbose flag.
    pub verbose: bool,
    /// The wrapped input plan.
    pub input: Arc<LogicalPlan>,
    /// Schema of this node's output. Not part of the `PartialOrd` ordering.
    pub schema: DFSchemaRef,
}
3221
3222impl PartialOrd for Analyze {
3224 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3225 match self.verbose.partial_cmp(&other.verbose) {
3226 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3227 cmp => cmp,
3228 }
3229 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3231 }
3232}
3233
/// Logical plan node wrapping a user-defined logical node.
// `Hash` is derived while `PartialEq` is implemented by hand for this type,
// which is what the `derived_hash_with_manual_eq` allowance below is for.
#[allow(clippy::allow_attributes)]
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-defined node, stored as a shared trait object.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3245
impl PartialEq for Extension {
    /// Two `Extension`s are equal when their wrapped nodes compare equal;
    /// the comparison is delegated entirely to `node`.
    fn eq(&self, other: &Self) -> bool {
        self.node.eq(&other.node)
    }
}
3254
impl PartialOrd for Extension {
    /// Ordering is delegated entirely to the wrapped `node`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.node.partial_cmp(&other.node)
    }
}
3260
/// Logical plan node carrying optional skip and fetch expressions for its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// Optional skip (offset) expression; see [`Limit::get_skip_type`], which
    /// treats a missing expression as a skip of `0`.
    pub skip: Option<Box<Expr>>,
    /// Optional fetch (limit) expression; see [`Limit::get_fetch_type`], which
    /// treats a missing expression as "no fetch literal" (`None`).
    pub fetch: Option<Box<Expr>>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
}
3272
/// Classification of a [`Limit`]'s skip expression, as produced by
/// [`Limit::get_skip_type`].
pub enum SkipType {
    /// The skip resolved to a concrete, non-negative row count.
    Literal(usize),
    /// The skip expression is not an `Int64` literal.
    UnsupportedExpr,
}
3280
/// Classification of a [`Limit`]'s fetch expression, as produced by
/// [`Limit::get_fetch_type`].
pub enum FetchType {
    /// The fetch resolved to a concrete, non-negative row count, or to `None`
    /// when the expression is absent or is a null `Int64` literal.
    Literal(Option<usize>),
    /// The fetch expression is not an `Int64` literal.
    UnsupportedExpr,
}
3289
3290impl Limit {
3291 pub fn get_skip_type(&self) -> Result<SkipType> {
3293 match self.skip.as_deref() {
3294 Some(expr) => match *expr {
3295 Expr::Literal(ScalarValue::Int64(s), _) => {
3296 let s = s.unwrap_or(0);
3298 if s >= 0 {
3299 Ok(SkipType::Literal(s as usize))
3300 } else {
3301 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3302 }
3303 }
3304 _ => Ok(SkipType::UnsupportedExpr),
3305 },
3306 None => Ok(SkipType::Literal(0)),
3308 }
3309 }
3310
3311 pub fn get_fetch_type(&self) -> Result<FetchType> {
3313 match self.fetch.as_deref() {
3314 Some(expr) => match *expr {
3315 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3316 if s >= 0 {
3317 Ok(FetchType::Literal(Some(s as usize)))
3318 } else {
3319 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3320 }
3321 }
3322 Expr::Literal(ScalarValue::Int64(None), _) => {
3323 Ok(FetchType::Literal(None))
3324 }
3325 _ => Ok(FetchType::UnsupportedExpr),
3326 },
3327 None => Ok(FetchType::Literal(None)),
3328 }
3329 }
3330}
3331
/// Logical plan node for distinct operations.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Distinct over the wrapped input plan as a whole.
    All(Arc<LogicalPlan>),
    /// Distinct driven by a list of `ON` expressions; see [`DistinctOn`].
    On(DistinctOn),
}
3340
3341impl Distinct {
3342 pub fn input(&self) -> &Arc<LogicalPlan> {
3344 match self {
3345 Distinct::All(input) => input,
3346 Distinct::On(DistinctOn { input, .. }) => input,
3347 }
3348 }
3349}
3350
/// Payload of [`Distinct::On`]: a distinct operation keyed on `ON` expressions.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `ON` expressions; `try_new` rejects an empty list.
    pub on_expr: Vec<Expr>,
    /// The selected expressions; the output `schema` is derived from these.
    pub select_expr: Vec<Expr>,
    /// Optional sort expressions. When present, they must start with the
    /// `on_expr` expressions, in order (enforced by `with_sort_expr`).
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Output schema. Not part of the `PartialOrd` ordering.
    pub schema: DFSchemaRef,
}
3367
3368impl DistinctOn {
3369 pub fn try_new(
3371 on_expr: Vec<Expr>,
3372 select_expr: Vec<Expr>,
3373 sort_expr: Option<Vec<SortExpr>>,
3374 input: Arc<LogicalPlan>,
3375 ) -> Result<Self> {
3376 if on_expr.is_empty() {
3377 return plan_err!("No `ON` expressions provided");
3378 }
3379
3380 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3381 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3382 .into_iter()
3383 .collect();
3384
3385 let dfschema = DFSchema::new_with_metadata(
3386 qualified_fields,
3387 input.schema().metadata().clone(),
3388 )?;
3389
3390 let mut distinct_on = DistinctOn {
3391 on_expr,
3392 select_expr,
3393 sort_expr: None,
3394 input,
3395 schema: Arc::new(dfschema),
3396 };
3397
3398 if let Some(sort_expr) = sort_expr {
3399 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3400 }
3401
3402 Ok(distinct_on)
3403 }
3404
3405 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3409 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3410
3411 let mut matched = true;
3413 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3414 if on != &sort.expr {
3415 matched = false;
3416 break;
3417 }
3418 }
3419
3420 if self.on_expr.len() > sort_expr.len() || !matched {
3421 return plan_err!(
3422 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3423 );
3424 }
3425
3426 self.sort_expr = Some(sort_expr);
3427 Ok(self)
3428 }
3429}
3430
3431impl PartialOrd for DistinctOn {
3433 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3434 #[derive(PartialEq, PartialOrd)]
3435 struct ComparableDistinctOn<'a> {
3436 pub on_expr: &'a Vec<Expr>,
3438 pub select_expr: &'a Vec<Expr>,
3440 pub sort_expr: &'a Option<Vec<SortExpr>>,
3444 pub input: &'a Arc<LogicalPlan>,
3446 }
3447 let comparable_self = ComparableDistinctOn {
3448 on_expr: &self.on_expr,
3449 select_expr: &self.select_expr,
3450 sort_expr: &self.sort_expr,
3451 input: &self.input,
3452 };
3453 let comparable_other = ComparableDistinctOn {
3454 on_expr: &other.on_expr,
3455 select_expr: &other.select_expr,
3456 sort_expr: &other.sort_expr,
3457 input: &other.input,
3458 };
3459 comparable_self
3460 .partial_cmp(&comparable_other)
3461 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3463 }
3464}
3465
/// Logical plan node that aggregates its input using grouping expressions and
/// aggregate expressions.
///
/// Construct it with `Aggregate::try_new` or `Aggregate::try_new_with_schema`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct Aggregate {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions.
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions.
    pub aggr_expr: Vec<Expr>,
    /// Output schema. Not part of the `PartialOrd` ordering.
    pub schema: DFSchemaRef,
}
3491
3492impl Aggregate {
3493 pub fn try_new(
3495 input: Arc<LogicalPlan>,
3496 group_expr: Vec<Expr>,
3497 aggr_expr: Vec<Expr>,
3498 ) -> Result<Self> {
3499 let group_expr = enumerate_grouping_sets(group_expr)?;
3500
3501 let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3502
3503 let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3504
3505 let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3506
3507 if is_grouping_set {
3509 qualified_fields = qualified_fields
3510 .into_iter()
3511 .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3512 .collect::<Vec<_>>();
3513 qualified_fields.push((
3514 None,
3515 Field::new(
3516 Self::INTERNAL_GROUPING_ID,
3517 Self::grouping_id_type(qualified_fields.len()),
3518 false,
3519 )
3520 .into(),
3521 ));
3522 }
3523
3524 qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3525
3526 let schema = DFSchema::new_with_metadata(
3527 qualified_fields,
3528 input.schema().metadata().clone(),
3529 )?;
3530
3531 Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3532 }
3533
3534 #[expect(clippy::needless_pass_by_value)]
3540 pub fn try_new_with_schema(
3541 input: Arc<LogicalPlan>,
3542 group_expr: Vec<Expr>,
3543 aggr_expr: Vec<Expr>,
3544 schema: DFSchemaRef,
3545 ) -> Result<Self> {
3546 if group_expr.is_empty() && aggr_expr.is_empty() {
3547 return plan_err!(
3548 "Aggregate requires at least one grouping or aggregate expression. \
3549 Aggregate without grouping expressions nor aggregate expressions is \
3550 logically equivalent to, but less efficient than, VALUES producing \
3551 single row. Please use VALUES instead."
3552 );
3553 }
3554 let group_expr_count = grouping_set_expr_count(&group_expr)?;
3555 if schema.fields().len() != group_expr_count + aggr_expr.len() {
3556 return plan_err!(
3557 "Aggregate schema has wrong number of fields. Expected {} got {}",
3558 group_expr_count + aggr_expr.len(),
3559 schema.fields().len()
3560 );
3561 }
3562
3563 let aggregate_func_dependencies =
3564 calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3565 let new_schema = schema.as_ref().clone();
3566 let schema = Arc::new(
3567 new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3568 );
3569 Ok(Self {
3570 input,
3571 group_expr,
3572 aggr_expr,
3573 schema,
3574 })
3575 }
3576
3577 fn is_grouping_set(&self) -> bool {
3578 matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3579 }
3580
3581 fn output_expressions(&self) -> Result<Vec<&Expr>> {
3583 static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3584 Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3585 });
3586 let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3587 if self.is_grouping_set() {
3588 exprs.push(&INTERNAL_ID_EXPR);
3589 }
3590 exprs.extend(self.aggr_expr.iter());
3591 debug_assert!(exprs.len() == self.schema.fields().len());
3592 Ok(exprs)
3593 }
3594
3595 pub fn group_expr_len(&self) -> Result<usize> {
3599 grouping_set_expr_count(&self.group_expr)
3600 }
3601
3602 pub fn grouping_id_type(group_exprs: usize) -> DataType {
3607 if group_exprs <= 8 {
3608 DataType::UInt8
3609 } else if group_exprs <= 16 {
3610 DataType::UInt16
3611 } else if group_exprs <= 32 {
3612 DataType::UInt32
3613 } else {
3614 DataType::UInt64
3615 }
3616 }
3617
3618 pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3636}
3637
3638impl PartialOrd for Aggregate {
3640 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3641 match self.input.partial_cmp(&other.input) {
3642 Some(Ordering::Equal) => {
3643 match self.group_expr.partial_cmp(&other.group_expr) {
3644 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3645 cmp => cmp,
3646 }
3647 }
3648 cmp => cmp,
3649 }
3650 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3652 }
3653}
3654
3655fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3657 group_expr
3658 .iter()
3659 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3660}
3661
3662fn calc_func_dependencies_for_aggregate(
3664 group_expr: &[Expr],
3666 input: &LogicalPlan,
3668 aggr_schema: &DFSchema,
3670) -> Result<FunctionalDependencies> {
3671 if !contains_grouping_set(group_expr) {
3677 let group_by_expr_names = group_expr
3678 .iter()
3679 .map(|item| item.schema_name().to_string())
3680 .collect::<IndexSet<_>>()
3681 .into_iter()
3682 .collect::<Vec<_>>();
3683 let aggregate_func_dependencies = aggregate_functional_dependencies(
3684 input.schema(),
3685 &group_by_expr_names,
3686 aggr_schema,
3687 );
3688 Ok(aggregate_func_dependencies)
3689 } else {
3690 Ok(FunctionalDependencies::empty())
3691 }
3692}
3693
3694fn calc_func_dependencies_for_project(
3697 exprs: &[Expr],
3698 input: &LogicalPlan,
3699) -> Result<FunctionalDependencies> {
3700 let input_fields = input.schema().field_names();
3701 let proj_indices = exprs
3703 .iter()
3704 .map(|expr| match expr {
3705 #[expect(deprecated)]
3706 Expr::Wildcard { qualifier, options } => {
3707 let wildcard_fields = exprlist_to_fields(
3708 vec![&Expr::Wildcard {
3709 qualifier: qualifier.clone(),
3710 options: options.clone(),
3711 }],
3712 input,
3713 )?;
3714 Ok::<_, DataFusionError>(
3715 wildcard_fields
3716 .into_iter()
3717 .filter_map(|(qualifier, f)| {
3718 let flat_name = qualifier
3719 .map(|t| format!("{}.{}", t, f.name()))
3720 .unwrap_or_else(|| f.name().clone());
3721 input_fields.iter().position(|item| *item == flat_name)
3722 })
3723 .collect::<Vec<_>>(),
3724 )
3725 }
3726 Expr::Alias(alias) => {
3727 let name = format!("{}", alias.expr);
3728 Ok(input_fields
3729 .iter()
3730 .position(|item| *item == name)
3731 .map(|i| vec![i])
3732 .unwrap_or(vec![]))
3733 }
3734 _ => {
3735 let name = format!("{expr}");
3736 Ok(input_fields
3737 .iter()
3738 .position(|item| *item == name)
3739 .map(|i| vec![i])
3740 .unwrap_or(vec![]))
3741 }
3742 })
3743 .collect::<Result<Vec<_>>>()?
3744 .into_iter()
3745 .flatten()
3746 .collect::<Vec<_>>();
3747
3748 Ok(input
3749 .schema()
3750 .functional_dependencies()
3751 .project_functional_dependencies(&proj_indices, exprs.len()))
3752}
3753
/// Logical plan node that sorts its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions.
    pub expr: Vec<SortExpr>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Optional fetch count (presumably the maximum number of rows to emit — confirm).
    pub fetch: Option<usize>,
}
3764
/// Logical plan node that joins two input plans.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input plan.
    pub left: Arc<LogicalPlan>,
    /// Right input plan.
    pub right: Arc<LogicalPlan>,
    /// Pairs of (left, right) join key expressions.
    pub on: Vec<(Expr, Expr)>,
    /// Optional additional filter expression.
    pub filter: Option<Expr>,
    /// Join type; also used to build the output `schema`.
    pub join_type: JoinType,
    /// Join constraint.
    pub join_constraint: JoinConstraint,
    /// Output schema. Not part of the `PartialOrd` ordering.
    pub schema: DFSchemaRef,
    /// Null-equality setting of the join.
    pub null_equality: NullEquality,
}
3785
3786impl Join {
3787 pub fn try_new(
3806 left: Arc<LogicalPlan>,
3807 right: Arc<LogicalPlan>,
3808 on: Vec<(Expr, Expr)>,
3809 filter: Option<Expr>,
3810 join_type: JoinType,
3811 join_constraint: JoinConstraint,
3812 null_equality: NullEquality,
3813 ) -> Result<Self> {
3814 let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3815
3816 Ok(Join {
3817 left,
3818 right,
3819 on,
3820 filter,
3821 join_type,
3822 join_constraint,
3823 schema: Arc::new(join_schema),
3824 null_equality,
3825 })
3826 }
3827
3828 pub fn try_new_with_project_input(
3831 original: &LogicalPlan,
3832 left: Arc<LogicalPlan>,
3833 right: Arc<LogicalPlan>,
3834 column_on: (Vec<Column>, Vec<Column>),
3835 ) -> Result<(Self, bool)> {
3836 let original_join = match original {
3837 LogicalPlan::Join(join) => join,
3838 _ => return plan_err!("Could not create join with project input"),
3839 };
3840
3841 let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3842 let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3843
3844 let mut requalified = false;
3845
3846 if original_join.join_type == JoinType::Inner
3849 || original_join.join_type == JoinType::Left
3850 || original_join.join_type == JoinType::Right
3851 || original_join.join_type == JoinType::Full
3852 {
3853 (left_sch, right_sch, requalified) =
3854 requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3855 }
3856
3857 let on: Vec<(Expr, Expr)> = column_on
3858 .0
3859 .into_iter()
3860 .zip(column_on.1)
3861 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3862 .collect();
3863
3864 let join_schema = build_join_schema(
3865 left_sch.schema(),
3866 right_sch.schema(),
3867 &original_join.join_type,
3868 )?;
3869
3870 Ok((
3871 Join {
3872 left,
3873 right,
3874 on,
3875 filter: original_join.filter.clone(),
3876 join_type: original_join.join_type,
3877 join_constraint: original_join.join_constraint,
3878 schema: Arc::new(join_schema),
3879 null_equality: original_join.null_equality,
3880 },
3881 requalified,
3882 ))
3883 }
3884}
3885
3886impl PartialOrd for Join {
3888 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3889 #[derive(PartialEq, PartialOrd)]
3890 struct ComparableJoin<'a> {
3891 pub left: &'a Arc<LogicalPlan>,
3893 pub right: &'a Arc<LogicalPlan>,
3895 pub on: &'a Vec<(Expr, Expr)>,
3897 pub filter: &'a Option<Expr>,
3899 pub join_type: &'a JoinType,
3901 pub join_constraint: &'a JoinConstraint,
3903 pub null_equality: &'a NullEquality,
3905 }
3906 let comparable_self = ComparableJoin {
3907 left: &self.left,
3908 right: &self.right,
3909 on: &self.on,
3910 filter: &self.filter,
3911 join_type: &self.join_type,
3912 join_constraint: &self.join_constraint,
3913 null_equality: &self.null_equality,
3914 };
3915 let comparable_other = ComparableJoin {
3916 left: &other.left,
3917 right: &other.right,
3918 on: &other.on,
3919 filter: &other.filter,
3920 join_type: &other.join_type,
3921 join_constraint: &other.join_constraint,
3922 null_equality: &other.null_equality,
3923 };
3924 comparable_self
3925 .partial_cmp(&comparable_other)
3926 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3928 }
3929}
3930
/// A subquery: a logical plan plus the outer-reference columns it uses.
///
/// `Debug` is implemented by hand and prints only the `<subquery>` placeholder.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The subquery's logical plan.
    pub subquery: Arc<LogicalPlan>,
    /// Expressions referencing columns of the outer query.
    pub outer_ref_columns: Vec<Expr>,
    /// Spans attached to this subquery (presumably source locations — confirm).
    pub spans: Spans,
}
3941
impl Normalizeable for Subquery {
    /// Subqueries always report that they cannot be normalized.
    fn can_normalize(&self) -> bool {
        false
    }
}
3947
3948impl NormalizeEq for Subquery {
3949 fn normalize_eq(&self, other: &Self) -> bool {
3950 *self.subquery == *other.subquery
3952 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3953 && self
3954 .outer_ref_columns
3955 .iter()
3956 .zip(other.outer_ref_columns.iter())
3957 .all(|(a, b)| a.normalize_eq(b))
3958 }
3959}
3960
3961impl Subquery {
3962 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3963 match plan {
3964 Expr::ScalarSubquery(it) => Ok(it),
3965 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3966 _ => plan_err!("Could not coerce into ScalarSubquery!"),
3967 }
3968 }
3969
3970 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3971 Subquery {
3972 subquery: plan,
3973 outer_ref_columns: self.outer_ref_columns.clone(),
3974 spans: Spans::new(),
3975 }
3976 }
3977}
3978
3979impl Debug for Subquery {
3980 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3981 write!(f, "<subquery>")
3982 }
3983}
3984
/// Partitioning schemes available at the logical level.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Round-robin batch partitioning
    /// (the `usize` is presumably the number of partitions — confirm).
    RoundRobinBatch(usize),
    /// Hash partitioning on the given expressions
    /// (the `usize` is presumably the number of partitions — confirm).
    Hash(Vec<Expr>, usize),
    /// Distribution by the given expressions.
    DistributeBy(Vec<Expr>),
}
4000
/// Describes how one list column is unnested: the output column it produces
/// and the unnest depth.
///
/// Displayed as `<output_column>|depth=<depth>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The column produced by the unnest operation.
    pub output_column: Column,
    /// Unnest depth recorded for this column.
    pub depth: usize,
}
4025
4026impl Display for ColumnUnnestList {
4027 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4028 write!(f, "{}|depth={}", self.output_column, self.depth)
4029 }
4030}
4031
/// Logical plan node that unnests list and/or struct columns of its input.
///
/// Construct it with `Unnest::try_new`, which derives every field other than
/// `input`, `exec_columns` and `options`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// The columns to unnest, as requested by the caller.
    pub exec_columns: Vec<Column>,
    /// For each list-typed unnest: the index of the source column in the input
    /// schema and the description of the unnest applied to it.
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Input schema indices of the struct-typed columns being unnested.
    pub struct_type_columns: Vec<usize>,
    /// For each output field, the index of the input field it originates from.
    pub dependency_indices: Vec<usize>,
    /// Output schema. Not part of the `PartialOrd` ordering.
    pub schema: DFSchemaRef,
    /// Unnest options.
    pub options: UnnestOptions,
}
4054
4055impl PartialOrd for Unnest {
4057 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4058 #[derive(PartialEq, PartialOrd)]
4059 struct ComparableUnnest<'a> {
4060 pub input: &'a Arc<LogicalPlan>,
4062 pub exec_columns: &'a Vec<Column>,
4064 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4067 pub struct_type_columns: &'a Vec<usize>,
4070 pub dependency_indices: &'a Vec<usize>,
4073 pub options: &'a UnnestOptions,
4075 }
4076 let comparable_self = ComparableUnnest {
4077 input: &self.input,
4078 exec_columns: &self.exec_columns,
4079 list_type_columns: &self.list_type_columns,
4080 struct_type_columns: &self.struct_type_columns,
4081 dependency_indices: &self.dependency_indices,
4082 options: &self.options,
4083 };
4084 let comparable_other = ComparableUnnest {
4085 input: &other.input,
4086 exec_columns: &other.exec_columns,
4087 list_type_columns: &other.list_type_columns,
4088 struct_type_columns: &other.struct_type_columns,
4089 dependency_indices: &other.dependency_indices,
4090 options: &other.options,
4091 };
4092 comparable_self
4093 .partial_cmp(&comparable_other)
4094 .filter(|cmp| *cmp != Ordering::Equal || self == other)
4096 }
4097}
4098
4099impl Unnest {
4100 pub fn try_new(
4101 input: Arc<LogicalPlan>,
4102 exec_columns: Vec<Column>,
4103 options: UnnestOptions,
4104 ) -> Result<Self> {
4105 if exec_columns.is_empty() {
4106 return plan_err!("unnest plan requires at least 1 column to unnest");
4107 }
4108
4109 let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
4110 let mut struct_columns = vec![];
4111 let indices_to_unnest = exec_columns
4112 .iter()
4113 .map(|c| Ok((input.schema().index_of_column(c)?, c)))
4114 .collect::<Result<HashMap<usize, &Column>>>()?;
4115
4116 let input_schema = input.schema();
4117
4118 let mut dependency_indices = vec![];
4119 let fields = input_schema
4135 .iter()
4136 .enumerate()
4137 .map(|(index, (original_qualifier, original_field))| {
4138 match indices_to_unnest.get(&index) {
4139 Some(column_to_unnest) => {
4140 let recursions_on_column = options
4141 .recursions
4142 .iter()
4143 .filter(|p| -> bool { &p.input_column == *column_to_unnest })
4144 .collect::<Vec<_>>();
4145 let mut transformed_columns = recursions_on_column
4146 .iter()
4147 .map(|r| {
4148 list_columns.push((
4149 index,
4150 ColumnUnnestList {
4151 output_column: r.output_column.clone(),
4152 depth: r.depth,
4153 },
4154 ));
4155 Ok(get_unnested_columns(
4156 &r.output_column.name,
4157 original_field.data_type(),
4158 r.depth,
4159 )?
4160 .into_iter()
4161 .next()
4162 .unwrap()) })
4164 .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
4165 if transformed_columns.is_empty() {
4166 transformed_columns = get_unnested_columns(
4167 &column_to_unnest.name,
4168 original_field.data_type(),
4169 1,
4170 )?;
4171 match original_field.data_type() {
4172 DataType::Struct(_) => {
4173 struct_columns.push(index);
4174 }
4175 DataType::List(_)
4176 | DataType::FixedSizeList(_, _)
4177 | DataType::LargeList(_) => {
4178 list_columns.push((
4179 index,
4180 ColumnUnnestList {
4181 output_column: Column::from_name(
4182 &column_to_unnest.name,
4183 ),
4184 depth: 1,
4185 },
4186 ));
4187 }
4188 _ => {}
4189 };
4190 }
4191
4192 dependency_indices.extend(std::iter::repeat_n(
4194 index,
4195 transformed_columns.len(),
4196 ));
4197 Ok(transformed_columns
4198 .iter()
4199 .map(|(col, field)| {
4200 (col.relation.to_owned(), field.to_owned())
4201 })
4202 .collect())
4203 }
4204 None => {
4205 dependency_indices.push(index);
4206 Ok(vec![(
4207 original_qualifier.cloned(),
4208 Arc::clone(original_field),
4209 )])
4210 }
4211 }
4212 })
4213 .collect::<Result<Vec<_>>>()?
4214 .into_iter()
4215 .flatten()
4216 .collect::<Vec<_>>();
4217
4218 let metadata = input_schema.metadata().clone();
4219 let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
4220 let deps = input_schema.functional_dependencies().clone();
4222 let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
4223
4224 Ok(Unnest {
4225 input,
4226 exec_columns,
4227 list_type_columns: list_columns,
4228 struct_type_columns: struct_columns,
4229 dependency_indices,
4230 schema,
4231 options,
4232 })
4233 }
4234}
4235
4236fn get_unnested_columns(
4245 col_name: &String,
4246 data_type: &DataType,
4247 depth: usize,
4248) -> Result<Vec<(Column, Arc<Field>)>> {
4249 let mut qualified_columns = Vec::with_capacity(1);
4250
4251 match data_type {
4252 DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
4253 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4254 let new_field = Arc::new(Field::new(
4255 col_name, data_type,
4256 true,
4259 ));
4260 let column = Column::from_name(col_name);
4261 qualified_columns.push((column, new_field));
4263 }
4264 DataType::Struct(fields) => {
4265 qualified_columns.extend(fields.iter().map(|f| {
4266 let new_name = format!("{}.{}", col_name, f.name());
4267 let column = Column::from_name(&new_name);
4268 let new_field = f.as_ref().clone().with_name(new_name);
4269 (column, Arc::new(new_field))
4271 }))
4272 }
4273 _ => {
4274 return internal_err!("trying to unnest on invalid data type {data_type}");
4275 }
4276 };
4277 Ok(qualified_columns)
4278}
4279
4280fn get_unnested_list_datatype_recursive(
4283 data_type: &DataType,
4284 depth: usize,
4285) -> Result<DataType> {
4286 match data_type {
4287 DataType::List(field)
4288 | DataType::FixedSizeList(field, _)
4289 | DataType::LargeList(field) => {
4290 if depth == 1 {
4291 return Ok(field.data_type().clone());
4292 }
4293 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4294 }
4295 _ => {}
4296 };
4297
4298 internal_err!("trying to unnest on invalid data type {data_type}")
4299}
4300
4301#[cfg(test)]
4302mod tests {
4303 use super::*;
4304 use crate::builder::LogicalTableSource;
4305 use crate::logical_plan::table_scan;
4306 use crate::select_expr::SelectExpr;
4307 use crate::test::function_stub::{count, count_udaf};
4308 use crate::{
4309 GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4310 scalar_subquery,
4311 };
4312 use datafusion_common::metadata::ScalarAndMetadata;
4313 use datafusion_common::tree_node::{
4314 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4315 };
4316 use datafusion_common::{Constraint, ScalarValue, not_impl_err};
4317 use insta::{assert_debug_snapshot, assert_snapshot};
4318 use std::hash::DefaultHasher;
4319
/// Test fixture: schema of the employee table with five non-nullable columns
/// (`id`, `first_name`, `last_name`, `state`, `salary`).
fn employee_schema() -> Schema {
    Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("first_name", DataType::Utf8, false),
        Field::new("last_name", DataType::Utf8, false),
        Field::new("state", DataType::Utf8, false),
        Field::new("salary", DataType::Int32, false),
    ])
}
4329
4330 fn display_plan() -> Result<LogicalPlan> {
4331 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4332 .build()?;
4333
4334 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4335 .filter(in_subquery(col("state"), Arc::new(plan1)))?
4336 .project(vec![col("id")])?
4337 .build()
4338 }
4339
4340 #[test]
4341 fn test_display_indent() -> Result<()> {
4342 let plan = display_plan()?;
4343
4344 assert_snapshot!(plan.display_indent(), @r"
4345 Projection: employee_csv.id
4346 Filter: employee_csv.state IN (<subquery>)
4347 Subquery:
4348 TableScan: employee_csv projection=[state]
4349 TableScan: employee_csv projection=[id, state]
4350 ");
4351 Ok(())
4352 }
4353
4354 #[test]
4355 fn test_display_indent_schema() -> Result<()> {
4356 let plan = display_plan()?;
4357
4358 assert_snapshot!(plan.display_indent_schema(), @r"
4359 Projection: employee_csv.id [id:Int32]
4360 Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4361 Subquery: [state:Utf8]
4362 TableScan: employee_csv projection=[state] [state:Utf8]
4363 TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4364 ");
4365 Ok(())
4366 }
4367
4368 #[test]
4369 fn test_display_subquery_alias() -> Result<()> {
4370 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4371 .build()?;
4372 let plan1 = Arc::new(plan1);
4373
4374 let plan =
4375 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4376 .project(vec![col("id"), exists(plan1).alias("exists")])?
4377 .build();
4378
4379 assert_snapshot!(plan?.display_indent(), @r"
4380 Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4381 Subquery:
4382 TableScan: employee_csv projection=[state]
4383 TableScan: employee_csv projection=[id, state]
4384 ");
4385 Ok(())
4386 }
4387
4388 #[test]
4389 fn test_display_graphviz() -> Result<()> {
4390 let plan = display_plan()?;
4391
4392 assert_snapshot!(plan.display_graphviz(), @r#"
4395 // Begin DataFusion GraphViz Plan,
4396 // display it online here: https://dreampuf.github.io/GraphvizOnline
4397
4398 digraph {
4399 subgraph cluster_1
4400 {
4401 graph[label="LogicalPlan"]
4402 2[shape=box label="Projection: employee_csv.id"]
4403 3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4404 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4405 4[shape=box label="Subquery:"]
4406 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4407 5[shape=box label="TableScan: employee_csv projection=[state]"]
4408 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4409 6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4410 3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4411 }
4412 subgraph cluster_7
4413 {
4414 graph[label="Detailed LogicalPlan"]
4415 8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4416 9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4417 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4418 10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4419 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4420 11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4421 10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4422 12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4423 9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4424 }
4425 }
4426 // End DataFusion GraphViz Plan
4427 "#);
4428 Ok(())
4429 }
4430
4431 #[test]
4432 fn test_display_pg_json() -> Result<()> {
4433 let plan = display_plan()?;
4434
4435 assert_snapshot!(plan.display_pg_json(), @r#"
4436 [
4437 {
4438 "Plan": {
4439 "Expressions": [
4440 "employee_csv.id"
4441 ],
4442 "Node Type": "Projection",
4443 "Output": [
4444 "id"
4445 ],
4446 "Plans": [
4447 {
4448 "Condition": "employee_csv.state IN (<subquery>)",
4449 "Node Type": "Filter",
4450 "Output": [
4451 "id",
4452 "state"
4453 ],
4454 "Plans": [
4455 {
4456 "Node Type": "Subquery",
4457 "Output": [
4458 "state"
4459 ],
4460 "Plans": [
4461 {
4462 "Node Type": "TableScan",
4463 "Output": [
4464 "state"
4465 ],
4466 "Plans": [],
4467 "Relation Name": "employee_csv"
4468 }
4469 ]
4470 },
4471 {
4472 "Node Type": "TableScan",
4473 "Output": [
4474 "id",
4475 "state"
4476 ],
4477 "Plans": [],
4478 "Relation Name": "employee_csv"
4479 }
4480 ]
4481 }
4482 ]
4483 }
4484 }
4485 ]
4486 "#);
4487 Ok(())
4488 }
4489
4490 #[derive(Debug, Default)]
4492 struct OkVisitor {
4493 strings: Vec<String>,
4494 }
4495
4496 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4497 type Node = LogicalPlan;
4498
4499 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4500 let s = match plan {
4501 LogicalPlan::Projection { .. } => "pre_visit Projection",
4502 LogicalPlan::Filter { .. } => "pre_visit Filter",
4503 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4504 _ => {
4505 return not_impl_err!("unknown plan type");
4506 }
4507 };
4508
4509 self.strings.push(s.into());
4510 Ok(TreeNodeRecursion::Continue)
4511 }
4512
4513 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4514 let s = match plan {
4515 LogicalPlan::Projection { .. } => "post_visit Projection",
4516 LogicalPlan::Filter { .. } => "post_visit Filter",
4517 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4518 _ => {
4519 return not_impl_err!("unknown plan type");
4520 }
4521 };
4522
4523 self.strings.push(s.into());
4524 Ok(TreeNodeRecursion::Continue)
4525 }
4526 }
4527
4528 #[test]
4529 fn visit_order() {
4530 let mut visitor = OkVisitor::default();
4531 let plan = test_plan();
4532 let res = plan.visit_with_subqueries(&mut visitor);
4533 assert!(res.is_ok());
4534
4535 assert_debug_snapshot!(visitor.strings, @r#"
4536 [
4537 "pre_visit Projection",
4538 "pre_visit Filter",
4539 "pre_visit TableScan",
4540 "post_visit TableScan",
4541 "post_visit Filter",
4542 "post_visit Projection",
4543 ]
4544 "#);
4545 }
4546
4547 #[derive(Debug, Default)]
4548 struct OptionalCounter {
4550 val: Option<usize>,
4551 }
4552
4553 impl OptionalCounter {
4554 fn new(val: usize) -> Self {
4555 Self { val: Some(val) }
4556 }
4557 fn dec(&mut self) -> bool {
4559 if Some(0) == self.val {
4560 true
4561 } else {
4562 self.val = self.val.take().map(|i| i - 1);
4563 false
4564 }
4565 }
4566 }
4567
4568 #[derive(Debug, Default)]
4569 struct StoppingVisitor {
4571 inner: OkVisitor,
4572 return_false_from_pre_in: OptionalCounter,
4574 return_false_from_post_in: OptionalCounter,
4576 }
4577
4578 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4579 type Node = LogicalPlan;
4580
4581 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4582 if self.return_false_from_pre_in.dec() {
4583 return Ok(TreeNodeRecursion::Stop);
4584 }
4585 self.inner.f_down(plan)?;
4586
4587 Ok(TreeNodeRecursion::Continue)
4588 }
4589
4590 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4591 if self.return_false_from_post_in.dec() {
4592 return Ok(TreeNodeRecursion::Stop);
4593 }
4594
4595 self.inner.f_up(plan)
4596 }
4597 }
4598
4599 #[test]
4601 fn early_stopping_pre_visit() {
4602 let mut visitor = StoppingVisitor {
4603 return_false_from_pre_in: OptionalCounter::new(2),
4604 ..Default::default()
4605 };
4606 let plan = test_plan();
4607 let res = plan.visit_with_subqueries(&mut visitor);
4608 assert!(res.is_ok());
4609
4610 assert_debug_snapshot!(
4611 visitor.inner.strings,
4612 @r#"
4613 [
4614 "pre_visit Projection",
4615 "pre_visit Filter",
4616 ]
4617 "#
4618 );
4619 }
4620
4621 #[test]
4622 fn early_stopping_post_visit() {
4623 let mut visitor = StoppingVisitor {
4624 return_false_from_post_in: OptionalCounter::new(1),
4625 ..Default::default()
4626 };
4627 let plan = test_plan();
4628 let res = plan.visit_with_subqueries(&mut visitor);
4629 assert!(res.is_ok());
4630
4631 assert_debug_snapshot!(
4632 visitor.inner.strings,
4633 @r#"
4634 [
4635 "pre_visit Projection",
4636 "pre_visit Filter",
4637 "pre_visit TableScan",
4638 "post_visit TableScan",
4639 ]
4640 "#
4641 );
4642 }
4643
4644 #[derive(Debug, Default)]
4645 struct ErrorVisitor {
4647 inner: OkVisitor,
4648 return_error_from_pre_in: OptionalCounter,
4650 return_error_from_post_in: OptionalCounter,
4652 }
4653
4654 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4655 type Node = LogicalPlan;
4656
4657 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4658 if self.return_error_from_pre_in.dec() {
4659 return not_impl_err!("Error in pre_visit");
4660 }
4661
4662 self.inner.f_down(plan)
4663 }
4664
4665 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4666 if self.return_error_from_post_in.dec() {
4667 return not_impl_err!("Error in post_visit");
4668 }
4669
4670 self.inner.f_up(plan)
4671 }
4672 }
4673
4674 #[test]
4675 fn error_pre_visit() {
4676 let mut visitor = ErrorVisitor {
4677 return_error_from_pre_in: OptionalCounter::new(2),
4678 ..Default::default()
4679 };
4680 let plan = test_plan();
4681 let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4682 assert_snapshot!(
4683 res.strip_backtrace(),
4684 @"This feature is not implemented: Error in pre_visit"
4685 );
4686 assert_debug_snapshot!(
4687 visitor.inner.strings,
4688 @r#"
4689 [
4690 "pre_visit Projection",
4691 "pre_visit Filter",
4692 ]
4693 "#
4694 );
4695 }
4696
4697 #[test]
4698 fn error_post_visit() {
4699 let mut visitor = ErrorVisitor {
4700 return_error_from_post_in: OptionalCounter::new(1),
4701 ..Default::default()
4702 };
4703 let plan = test_plan();
4704 let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4705 assert_snapshot!(
4706 res.strip_backtrace(),
4707 @"This feature is not implemented: Error in post_visit"
4708 );
4709 assert_debug_snapshot!(
4710 visitor.inner.strings,
4711 @r#"
4712 [
4713 "pre_visit Projection",
4714 "pre_visit Filter",
4715 "pre_visit TableScan",
4716 "post_visit TableScan",
4717 ]
4718 "#
4719 );
4720 }
4721
4722 #[test]
4723 fn test_partial_eq_hash_and_partial_ord() {
4724 let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4725 produce_one_row: true,
4726 schema: Arc::new(DFSchema::empty()),
4727 }));
4728
4729 let count_window_function = |schema| {
4730 Window::try_new_with_schema(
4731 vec![Expr::WindowFunction(Box::new(WindowFunction::new(
4732 WindowFunctionDefinition::AggregateUDF(count_udaf()),
4733 vec![],
4734 )))],
4735 Arc::clone(&empty_values),
4736 Arc::new(schema),
4737 )
4738 .unwrap()
4739 };
4740
4741 let schema_without_metadata = || {
4742 DFSchema::from_unqualified_fields(
4743 vec![Field::new("count", DataType::Int64, false)].into(),
4744 HashMap::new(),
4745 )
4746 .unwrap()
4747 };
4748
4749 let schema_with_metadata = || {
4750 DFSchema::from_unqualified_fields(
4751 vec![Field::new("count", DataType::Int64, false)].into(),
4752 [("key".to_string(), "value".to_string())].into(),
4753 )
4754 .unwrap()
4755 };
4756
4757 let f = count_window_function(schema_without_metadata());
4759
4760 let f2 = count_window_function(schema_without_metadata());
4762 assert_eq!(f, f2);
4763 assert_eq!(hash(&f), hash(&f2));
4764 assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));
4765
4766 let o = count_window_function(schema_with_metadata());
4768 assert_ne!(f, o);
4769 assert_ne!(hash(&f), hash(&o)); assert_eq!(f.partial_cmp(&o), None);
4771 }
4772
4773 fn hash<T: Hash>(value: &T) -> u64 {
4774 let hasher = &mut DefaultHasher::new();
4775 value.hash(hasher);
4776 hasher.finish()
4777 }
4778
4779 #[test]
4780 fn projection_expr_schema_mismatch() -> Result<()> {
4781 let empty_schema = Arc::new(DFSchema::empty());
4782 let p = Projection::try_new_with_schema(
4783 vec![col("a")],
4784 Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4785 produce_one_row: false,
4786 schema: Arc::clone(&empty_schema),
4787 })),
4788 empty_schema,
4789 );
4790 assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4791 Ok(())
4792 }
4793
4794 fn test_plan() -> LogicalPlan {
4795 let schema = Schema::new(vec![
4796 Field::new("id", DataType::Int32, false),
4797 Field::new("state", DataType::Utf8, false),
4798 ]);
4799
4800 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4801 .unwrap()
4802 .filter(col("state").eq(lit("CO")))
4803 .unwrap()
4804 .project(vec![col("id")])
4805 .unwrap()
4806 .build()
4807 .unwrap()
4808 }
4809
4810 #[test]
4811 fn test_replace_invalid_placeholder() {
4812 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4814
4815 let plan = table_scan(TableReference::none(), &schema, None)
4816 .unwrap()
4817 .filter(col("id").eq(placeholder("")))
4818 .unwrap()
4819 .build()
4820 .unwrap();
4821
4822 let param_values = vec![ScalarValue::Int32(Some(42))];
4823 plan.replace_params_with_values(¶m_values.clone().into())
4824 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4825
4826 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4828
4829 let plan = table_scan(TableReference::none(), &schema, None)
4830 .unwrap()
4831 .filter(col("id").eq(placeholder("$0")))
4832 .unwrap()
4833 .build()
4834 .unwrap();
4835
4836 plan.replace_params_with_values(¶m_values.clone().into())
4837 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4838
4839 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4841
4842 let plan = table_scan(TableReference::none(), &schema, None)
4843 .unwrap()
4844 .filter(col("id").eq(placeholder("$00")))
4845 .unwrap()
4846 .build()
4847 .unwrap();
4848
4849 plan.replace_params_with_values(¶m_values.into())
4850 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4851 }
4852
4853 #[test]
4854 fn test_replace_placeholder_mismatched_metadata() {
4855 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4856
4857 let plan = table_scan(TableReference::none(), &schema, None)
4859 .unwrap()
4860 .filter(col("id").eq(placeholder("$1")))
4861 .unwrap()
4862 .build()
4863 .unwrap();
4864 let prepared_builder = LogicalPlanBuilder::new(plan)
4865 .prepare(
4866 "".to_string(),
4867 vec![Field::new("", DataType::Int32, true).into()],
4868 )
4869 .unwrap();
4870
4871 let mut scalar_meta = HashMap::new();
4873 scalar_meta.insert("some_key".to_string(), "some_value".to_string());
4874 let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
4875 ScalarValue::Int32(Some(42)),
4876 Some(scalar_meta.into()),
4877 )]);
4878 prepared_builder
4879 .plan()
4880 .clone()
4881 .with_param_values(param_values)
4882 .expect_err("prepared field metadata mismatch unexpectedly succeeded");
4883 }
4884
4885 #[test]
4886 fn test_replace_placeholder_empty_relation_valid_schema() {
4887 let plan = LogicalPlanBuilder::empty(false)
4889 .project(vec![
4890 SelectExpr::from(placeholder("$1")),
4891 SelectExpr::from(placeholder("$2")),
4892 ])
4893 .unwrap()
4894 .build()
4895 .unwrap();
4896
4897 assert_snapshot!(plan.display_indent_schema(), @r"
4899 Projection: $1, $2 [$1:Null;N, $2:Null;N]
4900 EmptyRelation: rows=0 []
4901 ");
4902
4903 let plan = plan
4904 .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
4905 .unwrap();
4906
4907 assert_snapshot!(plan.display_indent_schema(), @r#"
4909 Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
4910 EmptyRelation: rows=0 []
4911 "#);
4912 }
4913
4914 #[test]
4915 fn test_nullable_schema_after_grouping_set() {
4916 let schema = Schema::new(vec![
4917 Field::new("foo", DataType::Int32, false),
4918 Field::new("bar", DataType::Int32, false),
4919 ]);
4920
4921 let plan = table_scan(TableReference::none(), &schema, None)
4922 .unwrap()
4923 .aggregate(
4924 vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
4925 vec![col("foo")],
4926 vec![col("bar")],
4927 ]))],
4928 vec![count(lit(true))],
4929 )
4930 .unwrap()
4931 .build()
4932 .unwrap();
4933
4934 let output_schema = plan.schema();
4935
4936 assert!(
4937 output_schema
4938 .field_with_name(None, "foo")
4939 .unwrap()
4940 .is_nullable(),
4941 );
4942 assert!(
4943 output_schema
4944 .field_with_name(None, "bar")
4945 .unwrap()
4946 .is_nullable()
4947 );
4948 }
4949
4950 #[test]
4951 fn test_filter_is_scalar() {
4952 let schema =
4954 Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
4955
4956 let source = Arc::new(LogicalTableSource::new(schema));
4957 let schema = Arc::new(
4958 DFSchema::try_from_qualified_schema(
4959 TableReference::bare("tab"),
4960 &source.schema(),
4961 )
4962 .unwrap(),
4963 );
4964 let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4965 table_name: TableReference::bare("tab"),
4966 source: Arc::clone(&source) as Arc<dyn TableSource>,
4967 projection: None,
4968 projected_schema: Arc::clone(&schema),
4969 filters: vec![],
4970 fetch: None,
4971 }));
4972 let col = schema.field_names()[0].clone();
4973
4974 let filter = Filter::try_new(
4975 Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
4976 scan,
4977 )
4978 .unwrap();
4979 assert!(!filter.is_scalar());
4980 let unique_schema = Arc::new(
4981 schema
4982 .as_ref()
4983 .clone()
4984 .with_functional_dependencies(
4985 FunctionalDependencies::new_from_constraints(
4986 Some(&Constraints::new_unverified(vec![Constraint::Unique(
4987 vec![0],
4988 )])),
4989 1,
4990 ),
4991 )
4992 .unwrap(),
4993 );
4994 let scan = Arc::new(LogicalPlan::TableScan(TableScan {
4995 table_name: TableReference::bare("tab"),
4996 source,
4997 projection: None,
4998 projected_schema: Arc::clone(&unique_schema),
4999 filters: vec![],
5000 fetch: None,
5001 }));
5002 let col = schema.field_names()[0].clone();
5003
5004 let filter =
5005 Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
5006 assert!(filter.is_scalar());
5007 }
5008
5009 #[test]
5010 fn test_transform_explain() {
5011 let schema = Schema::new(vec![
5012 Field::new("foo", DataType::Int32, false),
5013 Field::new("bar", DataType::Int32, false),
5014 ]);
5015
5016 let plan = table_scan(TableReference::none(), &schema, None)
5017 .unwrap()
5018 .explain(false, false)
5019 .unwrap()
5020 .build()
5021 .unwrap();
5022
5023 let external_filter = col("foo").eq(lit(true));
5024
5025 let plan = plan
5028 .transform(|plan| match plan {
5029 LogicalPlan::TableScan(table) => {
5030 let filter = Filter::try_new(
5031 external_filter.clone(),
5032 Arc::new(LogicalPlan::TableScan(table)),
5033 )
5034 .unwrap();
5035 Ok(Transformed::yes(LogicalPlan::Filter(filter)))
5036 }
5037 x => Ok(Transformed::no(x)),
5038 })
5039 .data()
5040 .unwrap();
5041
5042 let actual = format!("{}", plan.display_indent());
5043 assert_snapshot!(actual, @r"
5044 Explain
5045 Filter: foo = Boolean(true)
5046 TableScan: ?table?
5047 ")
5048 }
5049
5050 #[test]
5051 fn test_plan_partial_ord() {
5052 let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
5053 produce_one_row: false,
5054 schema: Arc::new(DFSchema::empty()),
5055 });
5056
5057 let describe_table = LogicalPlan::DescribeTable(DescribeTable {
5058 schema: Arc::new(Schema::new(vec![Field::new(
5059 "foo",
5060 DataType::Int32,
5061 false,
5062 )])),
5063 output_schema: DFSchemaRef::new(DFSchema::empty()),
5064 });
5065
5066 let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
5067 schema: Arc::new(Schema::new(vec![Field::new(
5068 "foo",
5069 DataType::Int32,
5070 false,
5071 )])),
5072 output_schema: DFSchemaRef::new(DFSchema::empty()),
5073 });
5074
5075 assert_eq!(
5076 empty_relation.partial_cmp(&describe_table),
5077 Some(Ordering::Less)
5078 );
5079 assert_eq!(
5080 describe_table.partial_cmp(&empty_relation),
5081 Some(Ordering::Greater)
5082 );
5083 assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5084 }
5085
5086 #[test]
5087 fn test_limit_with_new_children() {
5088 let input = Arc::new(LogicalPlan::Values(Values {
5089 schema: Arc::new(DFSchema::empty()),
5090 values: vec![vec![]],
5091 }));
5092 let cases = [
5093 LogicalPlan::Limit(Limit {
5094 skip: None,
5095 fetch: None,
5096 input: Arc::clone(&input),
5097 }),
5098 LogicalPlan::Limit(Limit {
5099 skip: None,
5100 fetch: Some(Box::new(Expr::Literal(
5101 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5102 None,
5103 ))),
5104 input: Arc::clone(&input),
5105 }),
5106 LogicalPlan::Limit(Limit {
5107 skip: Some(Box::new(Expr::Literal(
5108 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5109 None,
5110 ))),
5111 fetch: None,
5112 input: Arc::clone(&input),
5113 }),
5114 LogicalPlan::Limit(Limit {
5115 skip: Some(Box::new(Expr::Literal(
5116 ScalarValue::new_one(&DataType::UInt32).unwrap(),
5117 None,
5118 ))),
5119 fetch: Some(Box::new(Expr::Literal(
5120 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5121 None,
5122 ))),
5123 input,
5124 }),
5125 ];
5126
5127 for limit in cases {
5128 let new_limit = limit
5129 .with_new_exprs(
5130 limit.expressions(),
5131 limit.inputs().into_iter().cloned().collect(),
5132 )
5133 .unwrap();
5134 assert_eq!(limit, new_limit);
5135 }
5136 }
5137
5138 #[test]
5139 fn test_with_subqueries_jump() {
5140 let subquery_schema =
5145 Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
5146
5147 let subquery_plan =
5148 table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
5149 .unwrap()
5150 .filter(col("sub_id").eq(lit(0)))
5151 .unwrap()
5152 .build()
5153 .unwrap();
5154
5155 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5156
5157 let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
5158 .unwrap()
5159 .filter(col("id").eq(lit(0)))
5160 .unwrap()
5161 .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
5162 .unwrap()
5163 .build()
5164 .unwrap();
5165
5166 let mut filter_found = false;
5167 plan.apply_with_subqueries(|plan| {
5168 match plan {
5169 LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5170 LogicalPlan::Filter(..) => filter_found = true,
5171 _ => {}
5172 }
5173 Ok(TreeNodeRecursion::Continue)
5174 })
5175 .unwrap();
5176 assert!(!filter_found);
5177
5178 struct ProjectJumpVisitor {
5179 filter_found: bool,
5180 }
5181
5182 impl ProjectJumpVisitor {
5183 fn new() -> Self {
5184 Self {
5185 filter_found: false,
5186 }
5187 }
5188 }
5189
5190 impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
5191 type Node = LogicalPlan;
5192
5193 fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
5194 match node {
5195 LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5196 LogicalPlan::Filter(..) => self.filter_found = true,
5197 _ => {}
5198 }
5199 Ok(TreeNodeRecursion::Continue)
5200 }
5201 }
5202
5203 let mut visitor = ProjectJumpVisitor::new();
5204 plan.visit_with_subqueries(&mut visitor).unwrap();
5205 assert!(!visitor.filter_found);
5206
5207 let mut filter_found = false;
5208 plan.clone()
5209 .transform_down_with_subqueries(|plan| {
5210 match plan {
5211 LogicalPlan::Projection(..) => {
5212 return Ok(Transformed::new(
5213 plan,
5214 false,
5215 TreeNodeRecursion::Jump,
5216 ));
5217 }
5218 LogicalPlan::Filter(..) => filter_found = true,
5219 _ => {}
5220 }
5221 Ok(Transformed::no(plan))
5222 })
5223 .unwrap();
5224 assert!(!filter_found);
5225
5226 let mut filter_found = false;
5227 plan.clone()
5228 .transform_down_up_with_subqueries(
5229 |plan| {
5230 match plan {
5231 LogicalPlan::Projection(..) => {
5232 return Ok(Transformed::new(
5233 plan,
5234 false,
5235 TreeNodeRecursion::Jump,
5236 ));
5237 }
5238 LogicalPlan::Filter(..) => filter_found = true,
5239 _ => {}
5240 }
5241 Ok(Transformed::no(plan))
5242 },
5243 |plan| Ok(Transformed::no(plan)),
5244 )
5245 .unwrap();
5246 assert!(!filter_found);
5247
5248 struct ProjectJumpRewriter {
5249 filter_found: bool,
5250 }
5251
5252 impl ProjectJumpRewriter {
5253 fn new() -> Self {
5254 Self {
5255 filter_found: false,
5256 }
5257 }
5258 }
5259
5260 impl TreeNodeRewriter for ProjectJumpRewriter {
5261 type Node = LogicalPlan;
5262
5263 fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
5264 match node {
5265 LogicalPlan::Projection(..) => {
5266 return Ok(Transformed::new(
5267 node,
5268 false,
5269 TreeNodeRecursion::Jump,
5270 ));
5271 }
5272 LogicalPlan::Filter(..) => self.filter_found = true,
5273 _ => {}
5274 }
5275 Ok(Transformed::no(node))
5276 }
5277 }
5278
5279 let mut rewriter = ProjectJumpRewriter::new();
5280 plan.rewrite_with_subqueries(&mut rewriter).unwrap();
5281 assert!(!rewriter.filter_found);
5282 }
5283
5284 #[test]
5285 fn test_with_unresolved_placeholders() {
5286 let field_name = "id";
5287 let placeholder_value = "$1";
5288 let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5289
5290 let plan = table_scan(TableReference::none(), &schema, None)
5291 .unwrap()
5292 .filter(col(field_name).eq(placeholder(placeholder_value)))
5293 .unwrap()
5294 .build()
5295 .unwrap();
5296
5297 let params = plan.get_parameter_fields().unwrap();
5299 assert_eq!(params.len(), 1);
5300
5301 let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5302 assert_eq!(parameter_type, None);
5303 }
5304
5305 #[test]
5306 fn test_join_with_new_exprs() -> Result<()> {
5307 fn create_test_join(
5308 on: Vec<(Expr, Expr)>,
5309 filter: Option<Expr>,
5310 ) -> Result<LogicalPlan> {
5311 let schema = Schema::new(vec![
5312 Field::new("a", DataType::Int32, false),
5313 Field::new("b", DataType::Int32, false),
5314 ]);
5315
5316 let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5317 let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5318
5319 Ok(LogicalPlan::Join(Join {
5320 left: Arc::new(
5321 table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5322 ),
5323 right: Arc::new(
5324 table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5325 ),
5326 on,
5327 filter,
5328 join_type: JoinType::Inner,
5329 join_constraint: JoinConstraint::On,
5330 schema: Arc::new(left_schema.join(&right_schema)?),
5331 null_equality: NullEquality::NullEqualsNothing,
5332 }))
5333 }
5334
5335 {
5336 let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5337 let LogicalPlan::Join(join) = join.with_new_exprs(
5338 join.expressions(),
5339 join.inputs().into_iter().cloned().collect(),
5340 )?
5341 else {
5342 unreachable!()
5343 };
5344 assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5345 assert_eq!(join.filter, None);
5346 }
5347
5348 {
5349 let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5350 let LogicalPlan::Join(join) = join.with_new_exprs(
5351 join.expressions(),
5352 join.inputs().into_iter().cloned().collect(),
5353 )?
5354 else {
5355 unreachable!()
5356 };
5357 assert_eq!(join.on, vec![]);
5358 assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5359 }
5360
5361 {
5362 let join = create_test_join(
5363 vec![(col("t1.a"), (col("t2.a")))],
5364 Some(col("t1.b").gt(col("t2.b"))),
5365 )?;
5366 let LogicalPlan::Join(join) = join.with_new_exprs(
5367 join.expressions(),
5368 join.inputs().into_iter().cloned().collect(),
5369 )?
5370 else {
5371 unreachable!()
5372 };
5373 assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5374 assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5375 }
5376
5377 {
5378 let join = create_test_join(
5379 vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5380 None,
5381 )?;
5382 let LogicalPlan::Join(join) = join.with_new_exprs(
5383 vec![
5384 binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5385 binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5386 col("t1.b"),
5387 col("t2.b"),
5388 lit(true),
5389 ],
5390 join.inputs().into_iter().cloned().collect(),
5391 )?
5392 else {
5393 unreachable!()
5394 };
5395 assert_eq!(
5396 join.on,
5397 vec![
5398 (
5399 binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5400 binary_expr(col("t2.a"), Operator::Plus, lit(2))
5401 ),
5402 (col("t1.b"), (col("t2.b")))
5403 ]
5404 );
5405 assert_eq!(join.filter, Some(lit(true)));
5406 }
5407
5408 Ok(())
5409 }
5410
5411 #[test]
5412 fn test_join_try_new() -> Result<()> {
5413 let schema = Schema::new(vec![
5414 Field::new("a", DataType::Int32, false),
5415 Field::new("b", DataType::Int32, false),
5416 ]);
5417
5418 let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
5419
5420 let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;
5421
5422 let join_types = vec![
5423 JoinType::Inner,
5424 JoinType::Left,
5425 JoinType::Right,
5426 JoinType::Full,
5427 JoinType::LeftSemi,
5428 JoinType::LeftAnti,
5429 JoinType::RightSemi,
5430 JoinType::RightAnti,
5431 JoinType::LeftMark,
5432 ];
5433
5434 for join_type in join_types {
5435 let join = Join::try_new(
5436 Arc::new(left_scan.clone()),
5437 Arc::new(right_scan.clone()),
5438 vec![(col("t1.a"), col("t2.a"))],
5439 Some(col("t1.b").gt(col("t2.b"))),
5440 join_type,
5441 JoinConstraint::On,
5442 NullEquality::NullEqualsNothing,
5443 )?;
5444
5445 match join_type {
5446 JoinType::LeftSemi | JoinType::LeftAnti => {
5447 assert_eq!(join.schema.fields().len(), 2);
5448
5449 let fields = join.schema.fields();
5450 assert_eq!(
5451 fields[0].name(),
5452 "a",
5453 "First field should be 'a' from left table"
5454 );
5455 assert_eq!(
5456 fields[1].name(),
5457 "b",
5458 "Second field should be 'b' from left table"
5459 );
5460 }
5461 JoinType::RightSemi | JoinType::RightAnti => {
5462 assert_eq!(join.schema.fields().len(), 2);
5463
5464 let fields = join.schema.fields();
5465 assert_eq!(
5466 fields[0].name(),
5467 "a",
5468 "First field should be 'a' from right table"
5469 );
5470 assert_eq!(
5471 fields[1].name(),
5472 "b",
5473 "Second field should be 'b' from right table"
5474 );
5475 }
5476 JoinType::LeftMark => {
5477 assert_eq!(join.schema.fields().len(), 3);
5478
5479 let fields = join.schema.fields();
5480 assert_eq!(
5481 fields[0].name(),
5482 "a",
5483 "First field should be 'a' from left table"
5484 );
5485 assert_eq!(
5486 fields[1].name(),
5487 "b",
5488 "Second field should be 'b' from left table"
5489 );
5490 assert_eq!(
5491 fields[2].name(),
5492 "mark",
5493 "Third field should be the mark column"
5494 );
5495
5496 assert!(!fields[0].is_nullable());
5497 assert!(!fields[1].is_nullable());
5498 assert!(!fields[2].is_nullable());
5499 }
5500 _ => {
5501 assert_eq!(join.schema.fields().len(), 4);
5502
5503 let fields = join.schema.fields();
5504 assert_eq!(
5505 fields[0].name(),
5506 "a",
5507 "First field should be 'a' from left table"
5508 );
5509 assert_eq!(
5510 fields[1].name(),
5511 "b",
5512 "Second field should be 'b' from left table"
5513 );
5514 assert_eq!(
5515 fields[2].name(),
5516 "a",
5517 "Third field should be 'a' from right table"
5518 );
5519 assert_eq!(
5520 fields[3].name(),
5521 "b",
5522 "Fourth field should be 'b' from right table"
5523 );
5524
5525 if join_type == JoinType::Left {
5526 assert!(!fields[0].is_nullable());
5528 assert!(!fields[1].is_nullable());
5529 assert!(fields[2].is_nullable());
5531 assert!(fields[3].is_nullable());
5532 } else if join_type == JoinType::Right {
5533 assert!(fields[0].is_nullable());
5535 assert!(fields[1].is_nullable());
5536 assert!(!fields[2].is_nullable());
5538 assert!(!fields[3].is_nullable());
5539 } else if join_type == JoinType::Full {
5540 assert!(fields[0].is_nullable());
5541 assert!(fields[1].is_nullable());
5542 assert!(fields[2].is_nullable());
5543 assert!(fields[3].is_nullable());
5544 }
5545 }
5546 }
5547
5548 assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
5549 assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5550 assert_eq!(join.join_type, join_type);
5551 assert_eq!(join.join_constraint, JoinConstraint::On);
5552 assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
5553 }
5554
5555 Ok(())
5556 }
5557
5558 #[test]
5559 fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5560 let left_schema = Schema::new(vec![
5561 Field::new("id", DataType::Int32, false), Field::new("name", DataType::Utf8, false), Field::new("value", DataType::Int32, false), ]);
5565
5566 let right_schema = Schema::new(vec![
5567 Field::new("id", DataType::Int32, false), Field::new("category", DataType::Utf8, false), Field::new("value", DataType::Float64, true), ]);
5571
5572 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5573
5574 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5575
5576 {
5578 let join = Join::try_new(
5581 Arc::new(left_plan.clone()),
5582 Arc::new(right_plan.clone()),
5583 vec![(col("t1.id"), col("t2.id"))],
5584 None,
5585 JoinType::Inner,
5586 JoinConstraint::Using,
5587 NullEquality::NullEqualsNothing,
5588 )?;
5589
5590 let fields = join.schema.fields();
5591
5592 assert_eq!(fields.len(), 6);
5593
5594 assert_eq!(
5595 fields[0].name(),
5596 "id",
5597 "First field should be 'id' from left table"
5598 );
5599 assert_eq!(
5600 fields[1].name(),
5601 "name",
5602 "Second field should be 'name' from left table"
5603 );
5604 assert_eq!(
5605 fields[2].name(),
5606 "value",
5607 "Third field should be 'value' from left table"
5608 );
5609 assert_eq!(
5610 fields[3].name(),
5611 "id",
5612 "Fourth field should be 'id' from right table"
5613 );
5614 assert_eq!(
5615 fields[4].name(),
5616 "category",
5617 "Fifth field should be 'category' from right table"
5618 );
5619 assert_eq!(
5620 fields[5].name(),
5621 "value",
5622 "Sixth field should be 'value' from right table"
5623 );
5624
5625 assert_eq!(join.join_constraint, JoinConstraint::Using);
5626 }
5627
5628 {
5630 let join = Join::try_new(
5632 Arc::new(left_plan.clone()),
5633 Arc::new(right_plan.clone()),
5634 vec![(col("t1.id"), col("t2.id"))], Some(col("t1.value").lt(col("t2.value"))), JoinType::Inner,
5637 JoinConstraint::On,
5638 NullEquality::NullEqualsNothing,
5639 )?;
5640
5641 let fields = join.schema.fields();
5642 assert_eq!(fields.len(), 6);
5643
5644 assert_eq!(
5645 fields[0].name(),
5646 "id",
5647 "First field should be 'id' from left table"
5648 );
5649 assert_eq!(
5650 fields[1].name(),
5651 "name",
5652 "Second field should be 'name' from left table"
5653 );
5654 assert_eq!(
5655 fields[2].name(),
5656 "value",
5657 "Third field should be 'value' from left table"
5658 );
5659 assert_eq!(
5660 fields[3].name(),
5661 "id",
5662 "Fourth field should be 'id' from right table"
5663 );
5664 assert_eq!(
5665 fields[4].name(),
5666 "category",
5667 "Fifth field should be 'category' from right table"
5668 );
5669 assert_eq!(
5670 fields[5].name(),
5671 "value",
5672 "Sixth field should be 'value' from right table"
5673 );
5674
5675 assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5676 }
5677
5678 {
5680 let join = Join::try_new(
5681 Arc::new(left_plan.clone()),
5682 Arc::new(right_plan.clone()),
5683 vec![(col("t1.id"), col("t2.id"))],
5684 None,
5685 JoinType::Inner,
5686 JoinConstraint::On,
5687 NullEquality::NullEqualsNull,
5688 )?;
5689
5690 assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5691 }
5692
5693 Ok(())
5694 }
5695
5696 #[test]
5697 fn test_join_try_new_schema_validation() -> Result<()> {
5698 let left_schema = Schema::new(vec![
5699 Field::new("id", DataType::Int32, false),
5700 Field::new("name", DataType::Utf8, false),
5701 Field::new("value", DataType::Float64, true),
5702 ]);
5703
5704 let right_schema = Schema::new(vec![
5705 Field::new("id", DataType::Int32, false),
5706 Field::new("category", DataType::Utf8, true),
5707 Field::new("code", DataType::Int16, false),
5708 ]);
5709
5710 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5711
5712 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5713
5714 let join_types = vec![
5715 JoinType::Inner,
5716 JoinType::Left,
5717 JoinType::Right,
5718 JoinType::Full,
5719 ];
5720
5721 for join_type in join_types {
5722 let join = Join::try_new(
5723 Arc::new(left_plan.clone()),
5724 Arc::new(right_plan.clone()),
5725 vec![(col("t1.id"), col("t2.id"))],
5726 Some(col("t1.value").gt(lit(5.0))),
5727 join_type,
5728 JoinConstraint::On,
5729 NullEquality::NullEqualsNothing,
5730 )?;
5731
5732 let fields = join.schema.fields();
5733 assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");
5734
5735 for (i, field) in fields.iter().enumerate() {
5736 let expected_nullable = match (i, &join_type) {
5737 (0, JoinType::Right | JoinType::Full) => true, (1, JoinType::Right | JoinType::Full) => true, (2, _) => true, (3, JoinType::Left | JoinType::Full) => true, (4, _) => true, (5, JoinType::Left | JoinType::Full) => true, _ => false,
5748 };
5749
5750 assert_eq!(
5751 field.is_nullable(),
5752 expected_nullable,
5753 "Field {} ({}) nullability incorrect for {:?} join",
5754 i,
5755 field.name(),
5756 join_type
5757 );
5758 }
5759 }
5760
5761 let using_join = Join::try_new(
5762 Arc::new(left_plan.clone()),
5763 Arc::new(right_plan.clone()),
5764 vec![(col("t1.id"), col("t2.id"))],
5765 None,
5766 JoinType::Inner,
5767 JoinConstraint::Using,
5768 NullEquality::NullEqualsNothing,
5769 )?;
5770
5771 assert_eq!(
5772 using_join.schema.fields().len(),
5773 6,
5774 "USING join should have all fields"
5775 );
5776 assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5777
5778 Ok(())
5779 }
5780}