1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::sync::{Arc, LazyLock};
25
26use super::DdlStatement;
27use super::dml::CopyTo;
28use super::invariants::{
29 InvariantLevel, assert_always_invariants_at_current_node,
30 assert_executable_invariants,
31};
32use crate::builder::{unique_field_aliases, unnest_with_options};
33use crate::expr::{
34 Alias, Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams,
35 intersect_metadata_for_union,
36};
37use crate::expr_rewriter::{
38 NamePreserver, create_col_from_scalar_expr, normalize_cols, normalize_sorts,
39};
40use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
41use crate::logical_plan::extension::UserDefinedLogicalNode;
42use crate::logical_plan::{DmlStatement, Statement};
43use crate::utils::{
44 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
45 grouping_set_expr_count, grouping_set_to_exprlist, merge_schema, split_conjunction,
46};
47use crate::{
48 BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, GroupingSet,
49 LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
50 WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
51};
52
53use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
54use datafusion_common::cse::{NormalizeEq, Normalizeable};
55use datafusion_common::format::ExplainFormat;
56use datafusion_common::metadata::check_metadata_with_storage_equal;
57use datafusion_common::tree_node::{
58 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
59};
60use datafusion_common::{
61 Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Dependency,
62 FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues, Result,
63 ScalarValue, Spans, TableReference, UnnestOptions, aggregate_functional_dependencies,
64 assert_eq_or_internal_err, assert_or_internal_err, internal_err, plan_err,
65};
66use indexmap::IndexSet;
67
68use crate::display::PgJsonVisitor;
70pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
71pub use datafusion_common::{JoinConstraint, JoinType};
72
/// A node in a logical query plan tree.
///
/// Each variant wraps a struct describing one relational operator, or a
/// statement / DDL / DML node. Child plans are reachable through
/// [`LogicalPlan::inputs`] and the node's output schema through
/// [`LogicalPlan::schema`].
///
/// NOTE(review): the derived `PartialOrd` compares enum values by variant
/// declaration order first, so reordering the variants below changes ordering
/// behavior.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// Evaluates a list of expressions over its input; owns its output schema.
    Projection(Projection),
    /// Applies a `predicate` to its input; output schema is the input's schema.
    Filter(Filter),
    /// Computes window expressions over its input; owns its output schema.
    Window(Window),
    /// Computes `aggr_expr` grouped by `group_expr` over its input.
    Aggregate(Aggregate),
    /// Sorts its input, with an optional `fetch` row limit; output schema is
    /// the input's schema.
    Sort(Sort),
    /// Joins a `left` and a `right` input.
    Join(Join),
    /// Changes the partitioning of its input according to a partitioning scheme.
    Repartition(Repartition),
    /// Combines the rows of several inputs.
    Union(Union),
    /// Reads from a table source; its schema is the projected schema.
    TableScan(TableScan),
    /// A relation with no input. NOTE(review): the `produce_one_row` flag
    /// presumably selects between zero rows and a single row — confirm.
    EmptyRelation(EmptyRelation),
    /// Wraps a subquery plan; its schema is the subquery's schema.
    Subquery(Subquery),
    /// Re-qualifies its input under an `alias`.
    SubqueryAlias(SubqueryAlias),
    /// Applies optional `skip` / `fetch` expressions to its input.
    Limit(Limit),
    /// A statement node (for example `Prepare` or `Execute`).
    Statement(Statement),
    /// Literal rows given by `values`.
    Values(Values),
    /// Wraps a plan to be explained.
    Explain(Explain),
    /// Wraps a plan to be analyzed.
    Analyze(Analyze),
    /// A user-defined node (see `UserDefinedLogicalNode`).
    Extension(Extension),
    /// `DISTINCT` (`Distinct::All`) or `DISTINCT ON` (`Distinct::On`).
    Distinct(Distinct),
    /// A DML statement writing its input to a target table.
    Dml(DmlStatement),
    /// A DDL statement.
    Ddl(DdlStatement),
    /// Writes its input to an output URL.
    Copy(CopyTo),
    /// Describes a table; carries a fixed output schema.
    DescribeTable(DescribeTable),
    /// Unnests columns of its input.
    Unnest(Unnest),
    /// A recursive query built from a static term and a recursive term.
    RecursiveQuery(RecursiveQuery),
}
294
295impl Default for LogicalPlan {
296 fn default() -> Self {
297 LogicalPlan::EmptyRelation(EmptyRelation {
301 produce_one_row: false,
302 schema: Arc::clone(DFSchema::empty_ref()),
303 })
304 }
305}
306
307impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
308 fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
309 &'a self,
310 mut f: F,
311 ) -> Result<TreeNodeRecursion> {
312 f(self)
313 }
314
315 fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
316 self,
317 mut f: F,
318 ) -> Result<Transformed<Self>> {
319 f(self)
320 }
321}
322
323impl LogicalPlan {
324 pub fn schema(&self) -> &DFSchemaRef {
326 match self {
327 LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
328 LogicalPlan::Values(Values { schema, .. }) => schema,
329 LogicalPlan::TableScan(TableScan {
330 projected_schema, ..
331 }) => projected_schema,
332 LogicalPlan::Projection(Projection { schema, .. }) => schema,
333 LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
334 LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
335 LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
336 LogicalPlan::Window(Window { schema, .. }) => schema,
337 LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
338 LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
339 LogicalPlan::Join(Join { schema, .. }) => schema,
340 LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
341 LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
342 LogicalPlan::Statement(statement) => statement.schema(),
343 LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
344 LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
345 LogicalPlan::Explain(explain) => &explain.schema,
346 LogicalPlan::Analyze(analyze) => &analyze.schema,
347 LogicalPlan::Extension(extension) => extension.node.schema(),
348 LogicalPlan::Union(Union { schema, .. }) => schema,
349 LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
350 output_schema
351 }
352 LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
353 LogicalPlan::Copy(CopyTo { output_schema, .. }) => output_schema,
354 LogicalPlan::Ddl(ddl) => ddl.schema(),
355 LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
356 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
357 static_term.schema()
359 }
360 }
361 }
362
363 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
366 match self {
367 LogicalPlan::Window(_)
368 | LogicalPlan::Projection(_)
369 | LogicalPlan::Aggregate(_)
370 | LogicalPlan::Unnest(_)
371 | LogicalPlan::Join(_) => self
372 .inputs()
373 .iter()
374 .map(|input| input.schema().as_ref())
375 .collect(),
376 _ => vec![],
377 }
378 }
379
380 pub fn explain_schema() -> SchemaRef {
382 SchemaRef::new(Schema::new(vec![
383 Field::new("plan_type", DataType::Utf8, false),
384 Field::new("plan", DataType::Utf8, false),
385 ]))
386 }
387
388 pub fn describe_schema() -> Schema {
390 Schema::new(vec![
391 Field::new("column_name", DataType::Utf8, false),
392 Field::new("data_type", DataType::Utf8, false),
393 Field::new("is_nullable", DataType::Utf8, false),
394 ])
395 }
396
397 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
414 let mut exprs = vec![];
415 self.apply_expressions(|e| {
416 exprs.push(e.clone());
417 Ok(TreeNodeRecursion::Continue)
418 })
419 .unwrap();
421 exprs
422 }
423
424 pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
427 let mut exprs = vec![];
428 self.apply_expressions(|e| {
429 find_out_reference_exprs(e).into_iter().for_each(|e| {
430 if !exprs.contains(&e) {
431 exprs.push(e)
432 }
433 });
434 Ok(TreeNodeRecursion::Continue)
435 })
436 .unwrap();
438 self.inputs()
439 .into_iter()
440 .flat_map(|child| child.all_out_ref_exprs())
441 .for_each(|e| {
442 if !exprs.contains(&e) {
443 exprs.push(e)
444 }
445 });
446 exprs
447 }
448
449 pub fn inputs(&self) -> Vec<&LogicalPlan> {
453 match self {
454 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
455 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
456 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
457 LogicalPlan::Window(Window { input, .. }) => vec![input],
458 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
459 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
460 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
461 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
462 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
463 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
464 LogicalPlan::Extension(extension) => extension.node.inputs(),
465 LogicalPlan::Union(Union { inputs, .. }) => {
466 inputs.iter().map(|arc| arc.as_ref()).collect()
467 }
468 LogicalPlan::Distinct(
469 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
470 ) => vec![input],
471 LogicalPlan::Explain(explain) => vec![&explain.plan],
472 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
473 LogicalPlan::Dml(write) => vec![&write.input],
474 LogicalPlan::Copy(copy) => vec![©.input],
475 LogicalPlan::Ddl(ddl) => ddl.inputs(),
476 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
477 LogicalPlan::RecursiveQuery(RecursiveQuery {
478 static_term,
479 recursive_term,
480 ..
481 }) => vec![static_term, recursive_term],
482 LogicalPlan::Statement(stmt) => stmt.inputs(),
483 LogicalPlan::TableScan { .. }
485 | LogicalPlan::EmptyRelation { .. }
486 | LogicalPlan::Values { .. }
487 | LogicalPlan::DescribeTable(_) => vec![],
488 }
489 }
490
491 pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
493 let mut using_columns: Vec<HashSet<Column>> = vec![];
494
495 self.apply_with_subqueries(|plan| {
496 if let LogicalPlan::Join(Join {
497 join_constraint: JoinConstraint::Using,
498 on,
499 ..
500 }) = plan
501 {
502 let columns =
504 on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
505 let Some(l) = l.get_as_join_column() else {
506 return internal_err!(
507 "Invalid join key. Expected column, found {l:?}"
508 );
509 };
510 let Some(r) = r.get_as_join_column() else {
511 return internal_err!(
512 "Invalid join key. Expected column, found {r:?}"
513 );
514 };
515 accumu.insert(l.to_owned());
516 accumu.insert(r.to_owned());
517 Result::<_, DataFusionError>::Ok(accumu)
518 })?;
519 using_columns.push(columns);
520 }
521 Ok(TreeNodeRecursion::Continue)
522 })?;
523
524 Ok(using_columns)
525 }
526
527 pub fn head_output_expr(&self) -> Result<Option<Expr>> {
529 match self {
530 LogicalPlan::Projection(projection) => {
531 Ok(Some(projection.expr.as_slice()[0].clone()))
532 }
533 LogicalPlan::Aggregate(agg) => {
534 if agg.group_expr.is_empty() {
535 Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
536 } else {
537 Ok(Some(agg.group_expr.as_slice()[0].clone()))
538 }
539 }
540 LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
541 Ok(Some(select_expr[0].clone()))
542 }
543 LogicalPlan::Filter(Filter { input, .. })
544 | LogicalPlan::Distinct(Distinct::All(input))
545 | LogicalPlan::Sort(Sort { input, .. })
546 | LogicalPlan::Limit(Limit { input, .. })
547 | LogicalPlan::Repartition(Repartition { input, .. })
548 | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
549 LogicalPlan::Join(Join {
550 left,
551 right,
552 join_type,
553 ..
554 }) => match join_type {
555 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
556 if left.schema().fields().is_empty() {
557 right.head_output_expr()
558 } else {
559 left.head_output_expr()
560 }
561 }
562 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
563 left.head_output_expr()
564 }
565 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
566 right.head_output_expr()
567 }
568 },
569 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
570 static_term.head_output_expr()
571 }
572 LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
573 union.schema.qualified_field(0),
574 )))),
575 LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
576 table.projected_schema.qualified_field(0),
577 )))),
578 LogicalPlan::SubqueryAlias(subquery_alias) => {
579 let expr_opt = subquery_alias.input.head_output_expr()?;
580 expr_opt
581 .map(|expr| {
582 Ok(Expr::Column(create_col_from_scalar_expr(
583 &expr,
584 subquery_alias.alias.to_string(),
585 )?))
586 })
587 .map_or(Ok(None), |v| v.map(Some))
588 }
589 LogicalPlan::Subquery(_) => Ok(None),
590 LogicalPlan::EmptyRelation(_)
591 | LogicalPlan::Statement(_)
592 | LogicalPlan::Values(_)
593 | LogicalPlan::Explain(_)
594 | LogicalPlan::Analyze(_)
595 | LogicalPlan::Extension(_)
596 | LogicalPlan::Dml(_)
597 | LogicalPlan::Copy(_)
598 | LogicalPlan::Ddl(_)
599 | LogicalPlan::DescribeTable(_)
600 | LogicalPlan::Unnest(_) => Ok(None),
601 }
602 }
603
    /// Rebuilds this node's output schema from its current expressions and
    /// inputs, consuming `self`.
    ///
    /// Only the current node is recomputed. The arms call the node's own
    /// constructor (`try_new`, `build_join_schema`, `unnest_with_options`, ...)
    /// on its existing fields and do not recurse into the inputs.
    ///
    /// These variants are returned unchanged: `Dml`, `Copy`, `Repartition`,
    /// `Sort`, `Subquery`, `Limit`, `Ddl`, `RecursiveQuery`, `Analyze`,
    /// `Explain`, `TableScan`, `EmptyRelation`, `Statement` and
    /// `DescribeTable`. `Values` is rebuilt with its existing schema.
    ///
    /// # Errors
    /// Propagates errors from the constructors used to rebuild the node.
    ///
    /// # Panics
    /// The `Union` arm indexes `inputs[0]` and panics if the union has no
    /// inputs.
    pub fn recompute_schema(self) -> Result<Self> {
        match self {
            // `expr` may differ from when the schema was built, so derive a
            // fresh schema via `try_new` rather than reusing the old one.
            LogicalPlan::Projection(Projection {
                expr,
                input,
                schema: _,
            }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
            LogicalPlan::Dml(_) => Ok(self),
            LogicalPlan::Copy(_) => Ok(self),
            LogicalPlan::Values(Values { schema, values }) => {
                // The existing schema is kept as-is.
                Ok(LogicalPlan::Values(Values { schema, values }))
            }
            LogicalPlan::Filter(Filter { predicate, input }) => {
                Filter::try_new(predicate, input).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(_) => Ok(self),
            LogicalPlan::Window(Window {
                input,
                window_expr,
                schema: _,
            }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
            LogicalPlan::Aggregate(Aggregate {
                input,
                group_expr,
                aggr_expr,
                schema: _,
            }) => Aggregate::try_new(input, group_expr, aggr_expr)
                .map(LogicalPlan::Aggregate),
            LogicalPlan::Sort(_) => Ok(self),
            LogicalPlan::Join(Join {
                left,
                right,
                filter,
                join_type,
                join_constraint,
                on,
                schema: _,
                null_equality,
                null_aware,
            }) => {
                let schema =
                    build_join_schema(left.schema(), right.schema(), &join_type)?;

                // Strip any aliases from the equi-join key expressions.
                let new_on: Vec<_> = on
                    .into_iter()
                    .map(|equi_expr| {
                        (equi_expr.0.unalias(), equi_expr.1.unalias())
                    })
                    .collect();

                Ok(LogicalPlan::Join(Join {
                    left,
                    right,
                    join_type,
                    join_constraint,
                    on: new_on,
                    filter,
                    schema: DFSchemaRef::new(schema),
                    null_equality,
                    null_aware,
                }))
            }
            LogicalPlan::Subquery(_) => Ok(self),
            LogicalPlan::SubqueryAlias(SubqueryAlias {
                input,
                alias,
                schema: _,
            }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
            LogicalPlan::Limit(_) => Ok(self),
            LogicalPlan::Ddl(_) => Ok(self),
            LogicalPlan::Extension(Extension { node }) => {
                // Rebuild the user-defined node from clones of its own
                // expressions and inputs.
                let expr = node.expressions();
                let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
                Ok(LogicalPlan::Extension(Extension {
                    node: node.with_exprs_and_inputs(expr, inputs)?,
                }))
            }
            LogicalPlan::Union(Union { inputs, schema }) => {
                let first_input_schema = inputs[0].schema();
                if schema.fields().len() == first_input_schema.fields().len() {
                    // Same column count as the first input: keep the existing
                    // schema unchanged.
                    Ok(LogicalPlan::Union(Union { inputs, schema }))
                } else {
                    // Column count changed (e.g. inputs were pruned): derive a
                    // new schema from the inputs.
                    Ok(LogicalPlan::Union(Union::try_new(inputs)?))
                }
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(input) => Distinct::All(input),
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                        schema: _,
                    }) => Distinct::On(DistinctOn::try_new(
                        on_expr,
                        select_expr,
                        sort_expr,
                        input,
                    )?),
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(_) => Ok(self),
            LogicalPlan::Analyze(_) => Ok(self),
            LogicalPlan::Explain(_) => Ok(self),
            LogicalPlan::TableScan(_) => Ok(self),
            LogicalPlan::EmptyRelation(_) => Ok(self),
            LogicalPlan::Statement(_) => Ok(self),
            LogicalPlan::DescribeTable(_) => Ok(self),
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                options,
                ..
            }) => {
                // Rebuild the whole unnest node (and its schema) from the input.
                unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
            }
        }
    }
761
    /// Returns a new plan of the same kind as `self`, built from the supplied
    /// `expr` and `inputs` in place of `self`'s own expressions and children.
    ///
    /// Non-expression, non-input fields (names, options, flags, ...) are copied
    /// from `self`. For most variants the output schema is rebuilt through the
    /// node's constructor (`try_new`, `build_join_schema`,
    /// `unnest_with_options`, ...), so it reflects the new expressions and
    /// inputs.
    ///
    /// The arms assume a specific ordering of `expr`:
    /// - `Aggregate`: group expressions first, then aggregate expressions.
    /// - `Join`: equi-join pairs `[l0, r0, l1, r1, ...]`, optionally followed
    ///   by one trailing filter expression.
    /// - `Limit`: `[skip, fetch]`, with absent entries omitted.
    /// - `Distinct::On`: `on` expressions, then `select` expressions.
    ///
    /// NOTE(review): this ordering is presumably the one produced by
    /// `apply_expressions` (not visible here) — confirm.
    ///
    /// # Errors
    /// Returns an internal error when the number of expressions or inputs
    /// does not match what the variant requires, and propagates constructor
    /// errors.
    ///
    /// # Panics
    /// The `Window`, `Join` and `Distinct::On` arms use `assert!`/`assert_eq!`
    /// on expression counts and panic (rather than error) on mismatch.
    /// `Values` calls `chunks_exact(n)` with `n` = schema field count, which
    /// panics when `n == 0`.
    pub fn with_new_exprs(
        &self,
        mut expr: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> Result<LogicalPlan> {
        match self {
            LogicalPlan::Projection(Projection { .. }) => {
                // The schema is re-derived because `expr` may have changed.
                let input = self.only_input(inputs)?;
                Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
            }
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Dml(DmlStatement::new(
                    table_name.clone(),
                    Arc::clone(target),
                    op.clone(),
                    Arc::new(input),
                )))
            }
            LogicalPlan::Copy(CopyTo {
                input: _,
                output_url,
                file_type,
                options,
                partition_by,
                output_schema: _,
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Copy(CopyTo::new(
                    Arc::new(input),
                    output_url.clone(),
                    partition_by.clone(),
                    Arc::clone(file_type),
                    options.clone(),
                )))
            }
            LogicalPlan::Values(Values { schema, .. }) => {
                self.assert_no_inputs(inputs)?;
                // Regroup the flat expression list into rows of
                // `schema.fields().len()` values. Any trailing expressions
                // that do not fill a complete row are dropped by
                // `chunks_exact`.
                Ok(LogicalPlan::Values(Values {
                    schema: Arc::clone(schema),
                    values: expr
                        .chunks_exact(schema.fields().len())
                        .map(|s| s.to_vec())
                        .collect(),
                }))
            }
            LogicalPlan::Filter { .. } => {
                let predicate = self.only_expr(expr)?;
                let input = self.only_input(inputs)?;

                Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
            }
            LogicalPlan::Repartition(Repartition {
                partitioning_scheme,
                ..
            }) => match partitioning_scheme {
                Partitioning::RoundRobinBatch(n) => {
                    self.assert_no_expressions(expr)?;
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::RoundRobinBatch(*n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::Hash(_, n) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::Hash(expr, *n),
                        input: Arc::new(input),
                    }))
                }
                Partitioning::DistributeBy(_) => {
                    let input = self.only_input(inputs)?;
                    Ok(LogicalPlan::Repartition(Repartition {
                        partitioning_scheme: Partitioning::DistributeBy(expr),
                        input: Arc::new(input),
                    }))
                }
            },
            LogicalPlan::Window(Window { window_expr, .. }) => {
                assert_eq!(window_expr.len(), expr.len());
                let input = self.only_input(inputs)?;
                Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
            }
            LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
                let input = self.only_input(inputs)?;
                // The first `group_expr.len()` expressions are group exprs;
                // the remainder are aggregate exprs.
                let agg_expr = expr.split_off(group_expr.len());

                Aggregate::try_new(Arc::new(input), expr, agg_expr)
                    .map(LogicalPlan::Aggregate)
            }
            LogicalPlan::Sort(Sort {
                expr: sort_expr,
                fetch,
                ..
            }) => {
                let input = self.only_input(inputs)?;
                // Each new expression replaces the expression of the
                // corresponding sort key (keeping its sort options); `zip`
                // drops any surplus on either side.
                Ok(LogicalPlan::Sort(Sort {
                    expr: expr
                        .into_iter()
                        .zip(sort_expr.iter())
                        .map(|(expr, sort)| sort.with_expr(expr))
                        .collect(),
                    input: Arc::new(input),
                    fetch: *fetch,
                }))
            }
            LogicalPlan::Join(Join {
                join_type,
                join_constraint,
                on,
                null_equality,
                null_aware,
                ..
            }) => {
                let (left, right) = self.only_two_inputs(inputs)?;
                let schema = build_join_schema(left.schema(), right.schema(), join_type)?;

                let equi_expr_count = on.len() * 2;
                assert!(expr.len() >= equi_expr_count);

                // Any expression beyond the equi-join pairs is treated as the
                // (single, trailing) join filter.
                let filter_expr = if expr.len() > equi_expr_count {
                    expr.pop()
                } else {
                    None
                };

                // What remains must be exactly the `(left, right)` pairs,
                // flattened as `[l0, r0, l1, r1, ...]`.
                assert_eq!(expr.len(), equi_expr_count);
                let mut new_on = Vec::with_capacity(on.len());
                let mut iter = expr.into_iter();
                while let Some(left) = iter.next() {
                    let Some(right) = iter.next() else {
                        internal_err!(
                            "Expected a pair of expressions to construct the join on expression"
                        )?
                    };

                    // Strip any aliases from the equi-join key expressions.
                    new_on.push((left.unalias(), right.unalias()));
                }

                Ok(LogicalPlan::Join(Join {
                    left: Arc::new(left),
                    right: Arc::new(right),
                    join_type: *join_type,
                    join_constraint: *join_constraint,
                    on: new_on,
                    filter: filter_expr,
                    schema: DFSchemaRef::new(schema),
                    null_equality: *null_equality,
                    null_aware: *null_aware,
                }))
            }
            LogicalPlan::Subquery(Subquery {
                outer_ref_columns,
                spans,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                let subquery = LogicalPlanBuilder::from(input).build()?;
                Ok(LogicalPlan::Subquery(Subquery {
                    subquery: Arc::new(subquery),
                    outer_ref_columns: outer_ref_columns.clone(),
                    spans: spans.clone(),
                }))
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                SubqueryAlias::try_new(Arc::new(input), alias.clone())
                    .map(LogicalPlan::SubqueryAlias)
            }
            LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
                let old_expr_len = skip.iter().chain(fetch.iter()).count();
                assert_eq_or_internal_err!(
                    old_expr_len,
                    expr.len(),
                    "Invalid number of new Limit expressions: expected {}, got {}",
                    old_expr_len,
                    expr.len()
                );
                // Popping `fetch` first and then `skip` assumes `expr` is
                // ordered `[skip, fetch]` (absent entries omitted).
                let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
                let new_skip = skip.as_ref().and_then(|_| expr.pop());
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Limit(Limit {
                    skip: new_skip.map(Box::new),
                    fetch: new_fetch.map(Box::new),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
                name,
                if_not_exists,
                or_replace,
                column_defaults,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // NOTE(review): `constraints` is not copied from `self`; it is
                // reset to `Constraints::default()`. Confirm this is intended.
                Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
                    CreateMemoryTable {
                        input: Arc::new(input),
                        constraints: Constraints::default(),
                        name: name.clone(),
                        if_not_exists: *if_not_exists,
                        or_replace: *or_replace,
                        column_defaults: column_defaults.clone(),
                        temporary: *temporary,
                    },
                )))
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                or_replace,
                definition,
                temporary,
                ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    input: Arc::new(input),
                    name: name.clone(),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone(),
                })))
            }
            LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
                node: e.node.with_exprs_and_inputs(expr, inputs)?,
            })),
            LogicalPlan::Union(Union { schema, .. }) => {
                self.assert_no_expressions(expr)?;
                // Panics if `inputs` is empty.
                let input_schema = inputs[0].schema();
                // Keep the existing schema when the column count is unchanged;
                // otherwise adopt the first input's schema.
                let schema = if schema.fields().len() == input_schema.fields().len() {
                    Arc::clone(schema)
                } else {
                    Arc::clone(input_schema)
                };
                Ok(LogicalPlan::Union(Union {
                    inputs: inputs.into_iter().map(Arc::new).collect(),
                    schema,
                }))
            }
            LogicalPlan::Distinct(distinct) => {
                let distinct = match distinct {
                    Distinct::All(_) => {
                        self.assert_no_expressions(expr)?;
                        let input = self.only_input(inputs)?;
                        Distinct::All(Arc::new(input))
                    }
                    Distinct::On(DistinctOn {
                        on_expr,
                        select_expr,
                        ..
                    }) => {
                        let input = self.only_input(inputs)?;
                        // Layout: [on_expr..., select_expr..., sort_expr...].
                        let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
                        let select_expr = expr.split_off(on_expr.len());
                        assert!(
                            sort_expr.is_empty(),
                            "with_new_exprs for Distinct does not support sort expressions"
                        );
                        Distinct::On(DistinctOn::try_new(
                            expr,
                            select_expr,
                            None, // sort expressions are not accepted here
                            Arc::new(input),
                        )?)
                    }
                };
                Ok(LogicalPlan::Distinct(distinct))
            }
            LogicalPlan::RecursiveQuery(RecursiveQuery {
                name, is_distinct, ..
            }) => {
                self.assert_no_expressions(expr)?;
                let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: *is_distinct,
                }))
            }
            LogicalPlan::Analyze(a) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Analyze(Analyze {
                    verbose: a.verbose,
                    schema: Arc::clone(&a.schema),
                    input: Arc::new(input),
                }))
            }
            LogicalPlan::Explain(e) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Explain(Explain {
                    verbose: e.verbose,
                    plan: Arc::new(input),
                    explain_format: e.explain_format.clone(),
                    stringified_plans: e.stringified_plans.clone(),
                    schema: Arc::clone(&e.schema),
                    logical_optimization_succeeded: e.logical_optimization_succeeded,
                }))
            }
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name, fields, ..
            })) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
                    name: name.clone(),
                    fields: fields.clone(),
                    input: Arc::new(input),
                })))
            }
            LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
                // The new expressions become the EXECUTE parameters.
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::Statement(Statement::Execute(Execute {
                    name: name.clone(),
                    parameters: expr,
                })))
            }
            LogicalPlan::TableScan(ts) => {
                // The new expressions replace the scan's filters.
                self.assert_no_inputs(inputs)?;
                Ok(LogicalPlan::TableScan(TableScan {
                    filters: expr,
                    ..ts.clone()
                }))
            }
            LogicalPlan::EmptyRelation(_)
            | LogicalPlan::Ddl(_)
            | LogicalPlan::Statement(_)
            | LogicalPlan::DescribeTable(_) => {
                // Remaining variants take neither expressions nor inputs here.
                self.assert_no_expressions(expr)?;
                self.assert_no_inputs(inputs)?;
                Ok(self.clone())
            }
            LogicalPlan::Unnest(Unnest {
                exec_columns: columns,
                options,
                ..
            }) => {
                self.assert_no_expressions(expr)?;
                let input = self.only_input(inputs)?;
                // Rebuild the whole unnest node (and its schema) from the input.
                let new_plan =
                    unnest_with_options(input, columns.clone(), options.clone())?;
                Ok(new_plan)
            }
        }
    }
1159
1160 pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1162 match check {
1163 InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1164 InvariantLevel::Executable => assert_executable_invariants(self),
1165 }
1166 }
1167
    /// Returns an internal error (mentioning `self` and the unexpected
    /// expressions) unless `expr` is empty.
    ///
    /// `expr` is taken by value so call sites can hand over ownership of the
    /// vector directly; the lint expectation acknowledges it is only borrowed.
    #[inline]
    #[expect(clippy::needless_pass_by_value)]
    fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
        assert_or_internal_err!(
            expr.is_empty(),
            "{self:?} should have no exprs, got {:?}",
            expr
        );
        Ok(())
    }
1179
    /// Returns an internal error (mentioning `self` and the unexpected inputs)
    /// unless `inputs` is empty.
    ///
    /// `inputs` is taken by value so call sites can hand over ownership of the
    /// vector directly; the lint expectation acknowledges it is only borrowed.
    #[inline]
    #[expect(clippy::needless_pass_by_value)]
    fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
        assert_or_internal_err!(
            inputs.is_empty(),
            "{self:?} should have no inputs, got: {:?}",
            inputs
        );
        Ok(())
    }
1191
1192 #[inline]
1194 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1195 assert_eq_or_internal_err!(
1196 expr.len(),
1197 1,
1198 "{self:?} should have exactly one expr, got {:?}",
1199 &expr
1200 );
1201 Ok(expr.remove(0))
1202 }
1203
1204 #[inline]
1206 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1207 assert_eq_or_internal_err!(
1208 inputs.len(),
1209 1,
1210 "{self:?} should have exactly one input, got {:?}",
1211 &inputs
1212 );
1213 Ok(inputs.remove(0))
1214 }
1215
1216 #[inline]
1218 fn only_two_inputs(
1219 &self,
1220 mut inputs: Vec<LogicalPlan>,
1221 ) -> Result<(LogicalPlan, LogicalPlan)> {
1222 assert_eq_or_internal_err!(
1223 inputs.len(),
1224 2,
1225 "{self:?} should have exactly two inputs, got {:?}",
1226 &inputs
1227 );
1228 let right = inputs.remove(1);
1229 let left = inputs.remove(0);
1230 Ok((left, right))
1231 }
1232
1233 pub fn with_param_values(
1286 self,
1287 param_values: impl Into<ParamValues>,
1288 ) -> Result<LogicalPlan> {
1289 let param_values = param_values.into();
1290 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1291
1292 Ok(
1294 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1295 plan_with_values
1296 {
1297 param_values.verify_fields(&prepare_lp.fields)?;
1298 Arc::unwrap_or_clone(prepare_lp.input)
1300 } else {
1301 plan_with_values
1302 },
1303 )
1304 }
1305
1306 pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1311 match self {
1312 LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1313 LogicalPlan::Filter(filter) => {
1314 if filter.is_scalar() {
1315 Some(1)
1316 } else {
1317 filter.input.max_rows()
1318 }
1319 }
1320 LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1321 LogicalPlan::Aggregate(Aggregate {
1322 input, group_expr, ..
1323 }) => {
1324 if group_expr
1326 .iter()
1327 .all(|expr| matches!(expr, Expr::Literal(_, _)))
1328 {
1329 Some(1)
1330 } else {
1331 input.max_rows()
1332 }
1333 }
1334 LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1335 match (fetch, input.max_rows()) {
1336 (Some(fetch_limit), Some(input_max)) => {
1337 Some(input_max.min(*fetch_limit))
1338 }
1339 (Some(fetch_limit), None) => Some(*fetch_limit),
1340 (None, Some(input_max)) => Some(input_max),
1341 (None, None) => None,
1342 }
1343 }
1344 LogicalPlan::Join(Join {
1345 left,
1346 right,
1347 join_type,
1348 ..
1349 }) => match join_type {
1350 JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1351 JoinType::Left | JoinType::Right | JoinType::Full => {
1352 match (left.max_rows()?, right.max_rows()?, join_type) {
1353 (0, 0, _) => Some(0),
1354 (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1355 (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1356 (left_max, right_max, _) => Some(left_max * right_max),
1357 }
1358 }
1359 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1360 left.max_rows()
1361 }
1362 JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
1363 right.max_rows()
1364 }
1365 },
1366 LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1367 LogicalPlan::Union(Union { inputs, .. }) => {
1368 inputs.iter().try_fold(0usize, |mut acc, plan| {
1369 acc += plan.max_rows()?;
1370 Some(acc)
1371 })
1372 }
1373 LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1374 LogicalPlan::EmptyRelation(_) => Some(0),
1375 LogicalPlan::RecursiveQuery(_) => None,
1376 LogicalPlan::Subquery(_) => None,
1377 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1378 LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1379 Ok(FetchType::Literal(s)) => s,
1380 _ => None,
1381 },
1382 LogicalPlan::Distinct(
1383 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1384 ) => input.max_rows(),
1385 LogicalPlan::Values(v) => Some(v.values.len()),
1386 LogicalPlan::Unnest(_) => None,
1387 LogicalPlan::Ddl(_)
1388 | LogicalPlan::Explain(_)
1389 | LogicalPlan::Analyze(_)
1390 | LogicalPlan::Dml(_)
1391 | LogicalPlan::Copy(_)
1392 | LogicalPlan::DescribeTable(_)
1393 | LogicalPlan::Statement(_)
1394 | LogicalPlan::Extension(_) => None,
1395 }
1396 }
1397
1398 pub fn skip(&self) -> Result<Option<usize>> {
1403 match self {
1404 LogicalPlan::Limit(limit) => match limit.get_skip_type()? {
1405 SkipType::Literal(0) => Ok(None),
1406 SkipType::Literal(n) => Ok(Some(n)),
1407 SkipType::UnsupportedExpr => Ok(None),
1408 },
1409 LogicalPlan::Sort(_) => Ok(None),
1410 LogicalPlan::TableScan(_) => Ok(None),
1411 LogicalPlan::Projection(_) => Ok(None),
1412 LogicalPlan::Filter(_) => Ok(None),
1413 LogicalPlan::Window(_) => Ok(None),
1414 LogicalPlan::Aggregate(_) => Ok(None),
1415 LogicalPlan::Join(_) => Ok(None),
1416 LogicalPlan::Repartition(_) => Ok(None),
1417 LogicalPlan::Union(_) => Ok(None),
1418 LogicalPlan::EmptyRelation(_) => Ok(None),
1419 LogicalPlan::Subquery(_) => Ok(None),
1420 LogicalPlan::SubqueryAlias(_) => Ok(None),
1421 LogicalPlan::Statement(_) => Ok(None),
1422 LogicalPlan::Values(_) => Ok(None),
1423 LogicalPlan::Explain(_) => Ok(None),
1424 LogicalPlan::Analyze(_) => Ok(None),
1425 LogicalPlan::Extension(_) => Ok(None),
1426 LogicalPlan::Distinct(_) => Ok(None),
1427 LogicalPlan::Dml(_) => Ok(None),
1428 LogicalPlan::Ddl(_) => Ok(None),
1429 LogicalPlan::Copy(_) => Ok(None),
1430 LogicalPlan::DescribeTable(_) => Ok(None),
1431 LogicalPlan::Unnest(_) => Ok(None),
1432 LogicalPlan::RecursiveQuery(_) => Ok(None),
1433 }
1434 }
1435
1436 pub fn fetch(&self) -> Result<Option<usize>> {
1442 match self {
1443 LogicalPlan::Sort(Sort { fetch, .. }) => Ok(*fetch),
1444 LogicalPlan::TableScan(TableScan { fetch, .. }) => Ok(*fetch),
1445 LogicalPlan::Limit(limit) => match limit.get_fetch_type()? {
1446 FetchType::Literal(s) => Ok(s),
1447 FetchType::UnsupportedExpr => Ok(None),
1448 },
1449 LogicalPlan::Projection(_) => Ok(None),
1450 LogicalPlan::Filter(_) => Ok(None),
1451 LogicalPlan::Window(_) => Ok(None),
1452 LogicalPlan::Aggregate(_) => Ok(None),
1453 LogicalPlan::Join(_) => Ok(None),
1454 LogicalPlan::Repartition(_) => Ok(None),
1455 LogicalPlan::Union(_) => Ok(None),
1456 LogicalPlan::EmptyRelation(_) => Ok(None),
1457 LogicalPlan::Subquery(_) => Ok(None),
1458 LogicalPlan::SubqueryAlias(_) => Ok(None),
1459 LogicalPlan::Statement(_) => Ok(None),
1460 LogicalPlan::Values(_) => Ok(None),
1461 LogicalPlan::Explain(_) => Ok(None),
1462 LogicalPlan::Analyze(_) => Ok(None),
1463 LogicalPlan::Extension(_) => Ok(None),
1464 LogicalPlan::Distinct(_) => Ok(None),
1465 LogicalPlan::Dml(_) => Ok(None),
1466 LogicalPlan::Ddl(_) => Ok(None),
1467 LogicalPlan::Copy(_) => Ok(None),
1468 LogicalPlan::DescribeTable(_) => Ok(None),
1469 LogicalPlan::Unnest(_) => Ok(None),
1470 LogicalPlan::RecursiveQuery(_) => Ok(None),
1471 }
1472 }
1473
1474 pub fn contains_outer_reference(&self) -> bool {
1476 let mut contains = false;
1477 self.apply_expressions(|expr| {
1478 Ok(if expr.contains_outer() {
1479 contains = true;
1480 TreeNodeRecursion::Stop
1481 } else {
1482 TreeNodeRecursion::Continue
1483 })
1484 })
1485 .unwrap();
1486 contains
1487 }
1488
1489 pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1497 match self {
1498 LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1499 .output_expressions()?
1500 .into_iter()
1501 .zip(self.schema().columns())
1502 .collect()),
1503 LogicalPlan::Window(Window {
1504 window_expr,
1505 input,
1506 schema,
1507 }) => {
1508 let mut output_exprs = input.columnized_output_exprs()?;
1516 let input_len = input.schema().fields().len();
1517 output_exprs.extend(
1518 window_expr
1519 .iter()
1520 .zip(schema.columns().into_iter().skip(input_len)),
1521 );
1522 Ok(output_exprs)
1523 }
1524 _ => Ok(vec![]),
1525 }
1526 }
1527}
1528
1529impl LogicalPlan {
1530 pub fn replace_params_with_values(
1537 self,
1538 param_values: &ParamValues,
1539 ) -> Result<LogicalPlan> {
1540 self.transform_up_with_subqueries(|plan| {
1541 let schema = Arc::clone(plan.schema());
1542 let name_preserver = NamePreserver::new(&plan);
1543 plan.map_expressions(|e| {
1544 let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1545 if !has_placeholder {
1546 Ok(Transformed::no(e))
1550 } else {
1551 let original_name = name_preserver.save(&e);
1552 let transformed_expr = e.transform_up(|e| {
1553 if let Expr::Placeholder(Placeholder { id, .. }) = e {
1554 let (value, metadata) = param_values
1555 .get_placeholders_with_values(&id)?
1556 .into_inner();
1557 Ok(Transformed::yes(Expr::Literal(value, metadata)))
1558 } else {
1559 Ok(Transformed::no(e))
1560 }
1561 })?;
1562 Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1564 }
1565 })?
1566 .map_data(|plan| plan.update_schema_data_type())
1567 })
1568 .map(|res| res.data)
1569 }
1570
1571 fn update_schema_data_type(self) -> Result<LogicalPlan> {
1577 match self {
1578 LogicalPlan::Values(Values { values, schema: _ }) => {
1582 LogicalPlanBuilder::values(values)?.build()
1583 }
1584 plan => plan.recompute_schema(),
1586 }
1587 }
1588
1589 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1591 let mut param_names = HashSet::new();
1592 self.apply_with_subqueries(|plan| {
1593 plan.apply_expressions(|expr| {
1594 expr.apply(|expr| {
1595 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1596 param_names.insert(id.clone());
1597 }
1598 Ok(TreeNodeRecursion::Continue)
1599 })
1600 })
1601 })
1602 .map(|_| param_names)
1603 }
1604
1605 pub fn get_parameter_types(
1610 &self,
1611 ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1612 let mut parameter_fields = self.get_parameter_fields()?;
1613 Ok(parameter_fields
1614 .drain()
1615 .map(|(name, maybe_field)| {
1616 (name, maybe_field.map(|field| field.data_type().clone()))
1617 })
1618 .collect())
1619 }
1620
1621 pub fn get_parameter_fields(
1623 &self,
1624 ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
1625 let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
1626
1627 self.apply_with_subqueries(|plan| {
1628 plan.apply_expressions(|expr| {
1629 expr.apply(|expr| {
1630 if let Expr::Placeholder(Placeholder { id, field }) = expr {
1631 let prev = param_types.get(id);
1632 match (prev, field) {
1633 (Some(Some(prev)), Some(field)) => {
1634 check_metadata_with_storage_equal(
1635 (field.data_type(), Some(field.metadata())),
1636 (prev.data_type(), Some(prev.metadata())),
1637 "parameter",
1638 &format!(": Conflicting types for id {id}"),
1639 )?;
1640 }
1641 (_, Some(field)) => {
1642 param_types.insert(id.clone(), Some(Arc::clone(field)));
1643 }
1644 _ => {
1645 param_types.insert(id.clone(), None);
1646 }
1647 }
1648 }
1649 Ok(TreeNodeRecursion::Continue)
1650 })
1651 })
1652 })
1653 .map(|_| param_types)
1654 }
1655
1656 pub fn display_indent(&self) -> impl Display + '_ {
1688 struct Wrapper<'a>(&'a LogicalPlan);
1691 impl Display for Wrapper<'_> {
1692 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1693 let with_schema = false;
1694 let mut visitor = IndentVisitor::new(f, with_schema);
1695 match self.0.visit_with_subqueries(&mut visitor) {
1696 Ok(_) => Ok(()),
1697 Err(_) => Err(fmt::Error),
1698 }
1699 }
1700 }
1701 Wrapper(self)
1702 }
1703
1704 pub fn display_indent_schema(&self) -> impl Display + '_ {
1734 struct Wrapper<'a>(&'a LogicalPlan);
1737 impl Display for Wrapper<'_> {
1738 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1739 let with_schema = true;
1740 let mut visitor = IndentVisitor::new(f, with_schema);
1741 match self.0.visit_with_subqueries(&mut visitor) {
1742 Ok(_) => Ok(()),
1743 Err(_) => Err(fmt::Error),
1744 }
1745 }
1746 }
1747 Wrapper(self)
1748 }
1749
1750 pub fn display_pg_json(&self) -> impl Display + '_ {
1754 struct Wrapper<'a>(&'a LogicalPlan);
1757 impl Display for Wrapper<'_> {
1758 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1759 let mut visitor = PgJsonVisitor::new(f);
1760 visitor.with_schema(true);
1761 match self.0.visit_with_subqueries(&mut visitor) {
1762 Ok(_) => Ok(()),
1763 Err(_) => Err(fmt::Error),
1764 }
1765 }
1766 }
1767 Wrapper(self)
1768 }
1769
1770 pub fn display_graphviz(&self) -> impl Display + '_ {
1800 struct Wrapper<'a>(&'a LogicalPlan);
1803 impl Display for Wrapper<'_> {
1804 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1805 let mut visitor = GraphvizVisitor::new(f);
1806
1807 visitor.start_graph()?;
1808
1809 visitor.pre_visit_plan("LogicalPlan")?;
1810 self.0
1811 .visit_with_subqueries(&mut visitor)
1812 .map_err(|_| fmt::Error)?;
1813 visitor.post_visit_plan()?;
1814
1815 visitor.set_with_schema(true);
1816 visitor.pre_visit_plan("Detailed LogicalPlan")?;
1817 self.0
1818 .visit_with_subqueries(&mut visitor)
1819 .map_err(|_| fmt::Error)?;
1820 visitor.post_visit_plan()?;
1821
1822 visitor.end_graph()?;
1823 Ok(())
1824 }
1825 }
1826 Wrapper(self)
1827 }
1828
1829 pub fn display(&self) -> impl Display + '_ {
1851 struct Wrapper<'a>(&'a LogicalPlan);
1854 impl Display for Wrapper<'_> {
1855 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1856 match self.0 {
1857 LogicalPlan::EmptyRelation(EmptyRelation {
1858 produce_one_row,
1859 schema: _,
1860 }) => {
1861 let rows = if *produce_one_row { 1 } else { 0 };
1862 write!(f, "EmptyRelation: rows={rows}")
1863 }
1864 LogicalPlan::RecursiveQuery(RecursiveQuery {
1865 is_distinct, ..
1866 }) => {
1867 write!(f, "RecursiveQuery: is_distinct={is_distinct}")
1868 }
1869 LogicalPlan::Values(Values { values, .. }) => {
1870 let str_values: Vec<_> = values
1871 .iter()
1872 .take(5)
1874 .map(|row| {
1875 let item = row
1876 .iter()
1877 .map(|expr| expr.to_string())
1878 .collect::<Vec<_>>()
1879 .join(", ");
1880 format!("({item})")
1881 })
1882 .collect();
1883
1884 let eclipse = if values.len() > 5 { "..." } else { "" };
1885 write!(f, "Values: {}{}", str_values.join(", "), eclipse)
1886 }
1887
1888 LogicalPlan::TableScan(TableScan {
1889 source,
1890 table_name,
1891 projection,
1892 filters,
1893 fetch,
1894 ..
1895 }) => {
1896 let projected_fields = match projection {
1897 Some(indices) => {
1898 let schema = source.schema();
1899 let names: Vec<&str> = indices
1900 .iter()
1901 .map(|i| schema.field(*i).name().as_str())
1902 .collect();
1903 format!(" projection=[{}]", names.join(", "))
1904 }
1905 _ => "".to_string(),
1906 };
1907
1908 write!(f, "TableScan: {table_name}{projected_fields}")?;
1909
1910 if !filters.is_empty() {
1911 let mut full_filter = vec![];
1912 let mut partial_filter = vec![];
1913 let mut unsupported_filters = vec![];
1914 let filters: Vec<&Expr> = filters.iter().collect();
1915
1916 if let Ok(results) =
1917 source.supports_filters_pushdown(&filters)
1918 {
1919 filters.iter().zip(results.iter()).for_each(
1920 |(x, res)| match res {
1921 TableProviderFilterPushDown::Exact => {
1922 full_filter.push(x)
1923 }
1924 TableProviderFilterPushDown::Inexact => {
1925 partial_filter.push(x)
1926 }
1927 TableProviderFilterPushDown::Unsupported => {
1928 unsupported_filters.push(x)
1929 }
1930 },
1931 );
1932 }
1933
1934 if !full_filter.is_empty() {
1935 write!(
1936 f,
1937 ", full_filters=[{}]",
1938 expr_vec_fmt!(full_filter)
1939 )?;
1940 };
1941 if !partial_filter.is_empty() {
1942 write!(
1943 f,
1944 ", partial_filters=[{}]",
1945 expr_vec_fmt!(partial_filter)
1946 )?;
1947 }
1948 if !unsupported_filters.is_empty() {
1949 write!(
1950 f,
1951 ", unsupported_filters=[{}]",
1952 expr_vec_fmt!(unsupported_filters)
1953 )?;
1954 }
1955 }
1956
1957 if let Some(n) = fetch {
1958 write!(f, ", fetch={n}")?;
1959 }
1960
1961 Ok(())
1962 }
1963 LogicalPlan::Projection(Projection { expr, .. }) => {
1964 write!(f, "Projection:")?;
1965 for (i, expr_item) in expr.iter().enumerate() {
1966 if i > 0 {
1967 write!(f, ",")?;
1968 }
1969 write!(f, " {expr_item}")?;
1970 }
1971 Ok(())
1972 }
1973 LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
1974 write!(f, "Dml: op=[{op}] table=[{table_name}]")
1975 }
1976 LogicalPlan::Copy(CopyTo {
1977 input: _,
1978 output_url,
1979 file_type,
1980 options,
1981 ..
1982 }) => {
1983 let op_str = options
1984 .iter()
1985 .map(|(k, v)| format!("{k} {v}"))
1986 .collect::<Vec<String>>()
1987 .join(", ");
1988
1989 write!(
1990 f,
1991 "CopyTo: format={} output_url={output_url} options: ({op_str})",
1992 file_type.get_ext()
1993 )
1994 }
1995 LogicalPlan::Ddl(ddl) => {
1996 write!(f, "{}", ddl.display())
1997 }
1998 LogicalPlan::Filter(Filter {
1999 predicate: expr, ..
2000 }) => write!(f, "Filter: {expr}"),
2001 LogicalPlan::Window(Window { window_expr, .. }) => {
2002 write!(
2003 f,
2004 "WindowAggr: windowExpr=[[{}]]",
2005 expr_vec_fmt!(window_expr)
2006 )
2007 }
2008 LogicalPlan::Aggregate(Aggregate {
2009 group_expr,
2010 aggr_expr,
2011 ..
2012 }) => write!(
2013 f,
2014 "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
2015 expr_vec_fmt!(group_expr),
2016 expr_vec_fmt!(aggr_expr)
2017 ),
2018 LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
2019 write!(f, "Sort: ")?;
2020 for (i, expr_item) in expr.iter().enumerate() {
2021 if i > 0 {
2022 write!(f, ", ")?;
2023 }
2024 write!(f, "{expr_item}")?;
2025 }
2026 if let Some(a) = fetch {
2027 write!(f, ", fetch={a}")?;
2028 }
2029
2030 Ok(())
2031 }
2032 LogicalPlan::Join(Join {
2033 on: keys,
2034 filter,
2035 join_constraint,
2036 join_type,
2037 ..
2038 }) => {
2039 let join_expr: Vec<String> =
2040 keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
2041 let filter_expr = filter
2042 .as_ref()
2043 .map(|expr| format!(" Filter: {expr}"))
2044 .unwrap_or_else(|| "".to_string());
2045 let join_type = if filter.is_none()
2046 && keys.is_empty()
2047 && *join_type == JoinType::Inner
2048 {
2049 "Cross".to_string()
2050 } else {
2051 join_type.to_string()
2052 };
2053 match join_constraint {
2054 JoinConstraint::On => {
2055 write!(f, "{join_type} Join:",)?;
2056 if !join_expr.is_empty() || !filter_expr.is_empty() {
2057 write!(
2058 f,
2059 " {}{}",
2060 join_expr.join(", "),
2061 filter_expr
2062 )?;
2063 }
2064 Ok(())
2065 }
2066 JoinConstraint::Using => {
2067 write!(
2068 f,
2069 "{} Join: Using {}{}",
2070 join_type,
2071 join_expr.join(", "),
2072 filter_expr,
2073 )
2074 }
2075 }
2076 }
2077 LogicalPlan::Repartition(Repartition {
2078 partitioning_scheme,
2079 ..
2080 }) => match partitioning_scheme {
2081 Partitioning::RoundRobinBatch(n) => {
2082 write!(f, "Repartition: RoundRobinBatch partition_count={n}")
2083 }
2084 Partitioning::Hash(expr, n) => {
2085 let hash_expr: Vec<String> =
2086 expr.iter().map(|e| format!("{e}")).collect();
2087 write!(
2088 f,
2089 "Repartition: Hash({}) partition_count={}",
2090 hash_expr.join(", "),
2091 n
2092 )
2093 }
2094 Partitioning::DistributeBy(expr) => {
2095 let dist_by_expr: Vec<String> =
2096 expr.iter().map(|e| format!("{e}")).collect();
2097 write!(
2098 f,
2099 "Repartition: DistributeBy({})",
2100 dist_by_expr.join(", "),
2101 )
2102 }
2103 },
2104 LogicalPlan::Limit(limit) => {
2105 let skip_str = match limit.get_skip_type() {
2107 Ok(SkipType::Literal(n)) => n.to_string(),
2108 _ => limit
2109 .skip
2110 .as_ref()
2111 .map_or_else(|| "None".to_string(), |x| x.to_string()),
2112 };
2113 let fetch_str = match limit.get_fetch_type() {
2114 Ok(FetchType::Literal(Some(n))) => n.to_string(),
2115 Ok(FetchType::Literal(None)) => "None".to_string(),
2116 _ => limit
2117 .fetch
2118 .as_ref()
2119 .map_or_else(|| "None".to_string(), |x| x.to_string()),
2120 };
2121 write!(f, "Limit: skip={skip_str}, fetch={fetch_str}",)
2122 }
2123 LogicalPlan::Subquery(Subquery { .. }) => {
2124 write!(f, "Subquery:")
2125 }
2126 LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
2127 write!(f, "SubqueryAlias: {alias}")
2128 }
2129 LogicalPlan::Statement(statement) => {
2130 write!(f, "{}", statement.display())
2131 }
2132 LogicalPlan::Distinct(distinct) => match distinct {
2133 Distinct::All(_) => write!(f, "Distinct:"),
2134 Distinct::On(DistinctOn {
2135 on_expr,
2136 select_expr,
2137 sort_expr,
2138 ..
2139 }) => write!(
2140 f,
2141 "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
2142 expr_vec_fmt!(on_expr),
2143 expr_vec_fmt!(select_expr),
2144 if let Some(sort_expr) = sort_expr {
2145 expr_vec_fmt!(sort_expr)
2146 } else {
2147 "".to_string()
2148 },
2149 ),
2150 },
2151 LogicalPlan::Explain { .. } => write!(f, "Explain"),
2152 LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
2153 LogicalPlan::Union(_) => write!(f, "Union"),
2154 LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
2155 LogicalPlan::DescribeTable(DescribeTable { .. }) => {
2156 write!(f, "DescribeTable")
2157 }
2158 LogicalPlan::Unnest(Unnest {
2159 input: plan,
2160 list_type_columns: list_col_indices,
2161 struct_type_columns: struct_col_indices,
2162 ..
2163 }) => {
2164 let input_columns = plan.schema().columns();
2165 let list_type_columns = list_col_indices
2166 .iter()
2167 .map(|(i, unnest_info)| {
2168 format!(
2169 "{}|depth={}",
2170 &input_columns[*i].to_string(),
2171 unnest_info.depth
2172 )
2173 })
2174 .collect::<Vec<String>>();
2175 let struct_type_columns = struct_col_indices
2176 .iter()
2177 .map(|i| &input_columns[*i])
2178 .collect::<Vec<&Column>>();
2179 write!(
2181 f,
2182 "Unnest: lists[{}] structs[{}]",
2183 expr_vec_fmt!(list_type_columns),
2184 expr_vec_fmt!(struct_type_columns)
2185 )
2186 }
2187 }
2188 }
2189 }
2190 Wrapper(self)
2191 }
2192
2193 pub fn resolve_lambda_variables(self) -> Result<Transformed<LogicalPlan>> {
2197 self.transform_with_subqueries(|plan| {
2198 let schema = merge_schema(&plan.inputs());
2199
2200 plan.map_expressions(|expr| expr.resolve_lambda_variables(&schema))
2201 })
2202 }
2203}
2204
2205impl Display for LogicalPlan {
2206 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2207 self.display_indent().fmt(f)
2208 }
2209}
2210
2211impl ToStringifiedPlan for LogicalPlan {
2212 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2213 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2214 }
2215}
2216
/// A relation without an input that yields either zero rows or exactly one
/// row.
///
/// It renders as `EmptyRelation: rows=1` or `EmptyRelation: rows=0`
/// (see `LogicalPlan::display`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// `true` to produce a single row, `false` to produce none.
    pub produce_one_row: bool,
    /// The output schema of this relation.
    pub schema: DFSchemaRef,
}
2227
2228impl PartialOrd for EmptyRelation {
2230 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2231 self.produce_one_row
2232 .partial_cmp(&other.produce_one_row)
2233 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2235 }
2236}
2237
/// A recursive query node.
///
/// NOTE(review): presumably produced for `WITH RECURSIVE` CTEs; confirm with
/// the SQL planner.
///
/// Field order matters: the derived `PartialOrd` compares fields
/// lexicographically in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name of the recursive query (presumably the CTE name).
    pub name: String,
    /// The term evaluated without recursion (presumably the anchor/base case).
    pub static_term: Arc<LogicalPlan>,
    /// The term evaluated repeatedly (presumably referencing `name`).
    pub recursive_term: Arc<LogicalPlan>,
    /// Shown as `is_distinct=` in `LogicalPlan::display`.
    /// Presumably `true` for `UNION` (deduplicating) and `false` for
    /// `UNION ALL`; confirm.
    pub is_distinct: bool,
}
2273
/// A literal table of rows, as in a SQL `VALUES` clause.
///
/// `LogicalPlan::max_rows` reports `values.len()` for this node, so each
/// inner `Vec<Expr>` is one row.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// The output schema of the rows.
    pub schema: DFSchemaRef,
    /// The rows; each inner vector holds one expression per output column
    /// (presumably; confirm against `LogicalPlanBuilder::values`).
    pub values: Vec<Vec<Expr>>,
}
2284
2285impl PartialOrd for Values {
2287 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2288 self.values
2289 .partial_cmp(&other.values)
2290 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2292 }
2293}
2294
/// Evaluates a list of expressions against each row of its input.
///
/// It is marked `#[non_exhaustive]`, so outside this crate it must be built
/// with a constructor:
/// - `Projection::try_new`: the schema is derived from the expressions.
/// - `Projection::try_new_with_schema`: the schema is supplied by the caller.
/// - `Projection::new_from_schema`: the schema is supplied by the caller.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
#[non_exhaustive]
pub struct Projection {
    /// The expressions to evaluate, one per output field (unless wildcards
    /// are present; see `try_new_with_schema`).
    pub expr: Vec<Expr>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// The output schema.
    pub schema: DFSchemaRef,
}
2308
2309impl PartialOrd for Projection {
2311 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2312 match self.expr.partial_cmp(&other.expr) {
2313 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2314 cmp => cmp,
2315 }
2316 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2318 }
2319}
2320
2321impl Projection {
2322 pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2324 let projection_schema = projection_schema(&input, &expr)?;
2325 Self::try_new_with_schema(expr, input, projection_schema)
2326 }
2327
2328 pub fn try_new_with_schema(
2330 expr: Vec<Expr>,
2331 input: Arc<LogicalPlan>,
2332 schema: DFSchemaRef,
2333 ) -> Result<Self> {
2334 #[expect(deprecated)]
2335 if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2336 && expr.len() != schema.fields().len()
2337 {
2338 return plan_err!(
2339 "Projection has mismatch between number of expressions ({}) and number of fields in schema ({})",
2340 expr.len(),
2341 schema.fields().len()
2342 );
2343 }
2344 Ok(Self {
2345 expr,
2346 input,
2347 schema,
2348 })
2349 }
2350
2351 pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2353 let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2354 Self {
2355 expr,
2356 input,
2357 schema,
2358 }
2359 }
2360}
2361
2362pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2382 let metadata = input.schema().metadata().clone();
2384
2385 let schema =
2387 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2388 .with_functional_dependencies(calc_func_dependencies_for_project(
2389 exprs, input,
2390 )?)?;
2391
2392 Ok(Arc::new(schema))
2393}
2394
/// Gives its input a new relation name (e.g. `(SELECT ...) AS alias`).
///
/// Built with `SubqueryAlias::try_new`, which may insert a projection so
/// that field names are unique, and requalifies every output field with
/// `alias`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The aliased input (possibly wrapped in a renaming projection).
    pub input: Arc<LogicalPlan>,
    /// The new relation name.
    pub alias: TableReference,
    /// The input's fields, qualified by `alias`.
    pub schema: DFSchemaRef,
}
2407
2408impl SubqueryAlias {
2409 pub fn try_new(
2410 plan: Arc<LogicalPlan>,
2411 alias: impl Into<TableReference>,
2412 ) -> Result<Self> {
2413 let alias = alias.into();
2414
2415 let aliases = unique_field_aliases(plan.schema().fields());
2421 let is_projection_needed = aliases.iter().any(Option::is_some);
2422
2423 let plan = if is_projection_needed {
2425 let projection_expressions = aliases
2426 .iter()
2427 .zip(plan.schema().iter())
2428 .map(|(alias, (qualifier, field))| {
2429 let column =
2430 Expr::Column(Column::new(qualifier.cloned(), field.name()));
2431 match alias {
2432 None => column,
2433 Some(alias) => {
2434 Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2435 }
2436 }
2437 })
2438 .collect();
2439 let projection = Projection::try_new(projection_expressions, plan)?;
2440 Arc::new(LogicalPlan::Projection(projection))
2441 } else {
2442 plan
2443 };
2444
2445 let fields = plan.schema().fields().clone();
2447 let meta_data = plan.schema().metadata().clone();
2448 let func_dependencies = plan.schema().functional_dependencies().clone();
2449
2450 let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2451 let schema = schema.as_arrow();
2452
2453 let schema = DFSchemaRef::new(
2454 DFSchema::try_from_qualified_schema(alias.clone(), schema)?
2455 .with_functional_dependencies(func_dependencies)?,
2456 );
2457 Ok(SubqueryAlias {
2458 input: plan,
2459 alias,
2460 schema,
2461 })
2462 }
2463}
2464
2465impl PartialOrd for SubqueryAlias {
2467 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2468 match self.input.partial_cmp(&other.input) {
2469 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2470 cmp => cmp,
2471 }
2472 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2474 }
2475}
2476
/// Keeps only the input rows for which `predicate` holds.
///
/// When built with `Filter::try_new`:
/// - A predicate whose type is known must be Boolean or Null (possibly
///   dictionary-encoded).
/// - Aliases inside the predicate are removed.
///
/// Field order matters: the derived `PartialOrd` compares `predicate` first,
/// then `input`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The filter condition.
    pub predicate: Expr,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
}
2496
2497impl Filter {
2498 pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2503 Self::try_new_internal(predicate, input)
2504 }
2505
2506 #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2509 pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2510 Self::try_new_internal(predicate, input)
2511 }
2512
2513 fn is_allowed_filter_type(data_type: &DataType) -> bool {
2514 match data_type {
2515 DataType::Boolean | DataType::Null => true,
2517 DataType::Dictionary(_, value_type) => {
2518 Filter::is_allowed_filter_type(value_type.as_ref())
2519 }
2520 _ => false,
2521 }
2522 }
2523
2524 fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2525 if let Ok(predicate_type) = predicate.get_type(input.schema())
2530 && !Filter::is_allowed_filter_type(&predicate_type)
2531 {
2532 return plan_err!(
2533 "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2534 );
2535 }
2536
2537 Ok(Self {
2538 predicate: predicate.unalias_nested().data,
2539 input,
2540 })
2541 }
2542
2543 fn is_scalar(&self) -> bool {
2559 let schema = self.input.schema();
2560
2561 let functional_dependencies = self.input.schema().functional_dependencies();
2562 let unique_keys = functional_dependencies.iter().filter(|dep| {
2563 let nullable = dep.nullable
2564 && dep
2565 .source_indices
2566 .iter()
2567 .any(|&source| schema.field(source).is_nullable());
2568 !nullable
2569 && dep.mode == Dependency::Single
2570 && dep.target_indices.len() == schema.fields().len()
2571 });
2572
2573 let exprs = split_conjunction(&self.predicate);
2574 let eq_pred_cols: HashSet<_> = exprs
2575 .iter()
2576 .filter_map(|expr| {
2577 let Expr::BinaryExpr(BinaryExpr {
2578 left,
2579 op: Operator::Eq,
2580 right,
2581 }) = expr
2582 else {
2583 return None;
2584 };
2585 if left == right {
2587 return None;
2588 }
2589
2590 match (left.as_ref(), right.as_ref()) {
2591 (Expr::Column(_), Expr::Column(_)) => None,
2592 (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2593 Some(schema.index_of_column(c).unwrap())
2594 }
2595 _ => None,
2596 }
2597 })
2598 .collect();
2599
2600 for key in unique_keys {
2603 if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2604 return true;
2605 }
2606 }
2607 false
2608 }
2609}
2610
/// Evaluates window expressions over its input.
///
/// Each output row contains the input columns followed by one column per
/// window expression. `Window::try_new_with_schema` enforces that
/// `schema.fields().len() == input fields + window_expr.len()`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// The window expressions, in output-column order.
    pub window_expr: Vec<Expr>,
    /// The output schema: input fields, then one field per window expression.
    pub schema: DFSchemaRef,
}
2634
2635impl Window {
2636 pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2638 let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2639 .schema()
2640 .iter()
2641 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2642 .collect();
2643 let input_len = fields.len();
2644 let mut window_fields = fields;
2645 let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2646 window_fields.extend_from_slice(expr_fields.as_slice());
2647 let metadata = input.schema().metadata().clone();
2648
2649 let mut window_func_dependencies =
2651 input.schema().functional_dependencies().clone();
2652 window_func_dependencies.extend_target_indices(window_fields.len());
2653
2654 let mut new_dependencies = window_expr
2658 .iter()
2659 .enumerate()
2660 .filter_map(|(idx, expr)| {
2661 let Expr::WindowFunction(window_fun) = expr else {
2662 return None;
2663 };
2664 let WindowFunction {
2665 fun: WindowFunctionDefinition::WindowUDF(udwf),
2666 params: WindowFunctionParams { partition_by, .. },
2667 } = window_fun.as_ref()
2668 else {
2669 return None;
2670 };
2671 if udwf.name() == "row_number" && partition_by.is_empty() {
2674 Some(idx + input_len)
2675 } else {
2676 None
2677 }
2678 })
2679 .map(|idx| {
2680 FunctionalDependence::new(vec![idx], vec![], false)
2681 .with_mode(Dependency::Single)
2682 })
2683 .collect::<Vec<_>>();
2684
2685 if !new_dependencies.is_empty() {
2686 for dependence in new_dependencies.iter_mut() {
2687 dependence.target_indices = (0..window_fields.len()).collect();
2688 }
2689 let new_deps = FunctionalDependencies::new(new_dependencies);
2691 window_func_dependencies.extend(new_deps);
2692 }
2693
2694 if let Some(e) = window_expr.iter().find(|e| {
2696 matches!(
2697 e,
2698 Expr::WindowFunction(wf)
2699 if !matches!(wf.fun, WindowFunctionDefinition::AggregateUDF(_))
2700 && wf.params.filter.is_some()
2701 )
2702 }) {
2703 return plan_err!(
2704 "FILTER clause can only be used with aggregate window functions. Found in '{e}'"
2705 );
2706 }
2707
2708 Self::try_new_with_schema(
2709 window_expr,
2710 input,
2711 Arc::new(
2712 DFSchema::new_with_metadata(window_fields, metadata)?
2713 .with_functional_dependencies(window_func_dependencies)?,
2714 ),
2715 )
2716 }
2717
2718 pub fn try_new_with_schema(
2724 window_expr: Vec<Expr>,
2725 input: Arc<LogicalPlan>,
2726 schema: DFSchemaRef,
2727 ) -> Result<Self> {
2728 let input_fields_count = input.schema().fields().len();
2729 if schema.fields().len() != input_fields_count + window_expr.len() {
2730 return plan_err!(
2731 "Window schema has wrong number of fields. Expected {} got {}",
2732 input_fields_count + window_expr.len(),
2733 schema.fields().len()
2734 );
2735 }
2736
2737 Ok(Window {
2738 input,
2739 window_expr,
2740 schema,
2741 })
2742 }
2743}
2744
2745impl PartialOrd for Window {
2747 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2748 match self.input.partial_cmp(&other.input)? {
2749 Ordering::Equal => {} not_equal => return Some(not_equal),
2751 }
2752
2753 match self.window_expr.partial_cmp(&other.window_expr)? {
2754 Ordering::Equal => {} not_equal => return Some(not_equal),
2756 }
2757
2758 if self == other {
2761 Some(Ordering::Equal)
2762 } else {
2763 None
2764 }
2765 }
2766}
2767
/// Reads rows from a table source.
///
/// Only `Clone` is derived. `Debug`, `PartialEq`, `PartialOrd` and `Hash` are
/// implemented by hand so that `source` is left out of them.
#[derive(Clone)]
pub struct TableScan {
    /// The name under which the table is scanned; qualifies the output fields.
    pub table_name: TableReference,
    /// The table being read.
    pub source: Arc<dyn TableSource>,
    /// Optional indices into `source.schema()` selecting the output columns,
    /// in output order.
    pub projection: Option<Vec<usize>>,
    /// The output schema after applying `projection`.
    pub projected_schema: DFSchemaRef,
    /// Filters offered to the source. `LogicalPlan::display` groups them by
    /// the source's `supports_filters_pushdown` answer.
    pub filters: Vec<Expr>,
    /// Optional maximum number of rows to read (reported by `LogicalPlan::fetch`).
    pub fetch: Option<usize>,
}
2784
2785impl Debug for TableScan {
2786 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2787 f.debug_struct("TableScan")
2788 .field("table_name", &self.table_name)
2789 .field("source", &"...")
2790 .field("projection", &self.projection)
2791 .field("projected_schema", &self.projected_schema)
2792 .field("filters", &self.filters)
2793 .field("fetch", &self.fetch)
2794 .finish_non_exhaustive()
2795 }
2796}
2797
2798impl PartialEq for TableScan {
2799 fn eq(&self, other: &Self) -> bool {
2800 self.table_name == other.table_name
2801 && self.projection == other.projection
2802 && self.projected_schema == other.projected_schema
2803 && self.filters == other.filters
2804 && self.fetch == other.fetch
2805 }
2806}
2807
2808impl Eq for TableScan {}
2809
2810impl PartialOrd for TableScan {
2813 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2814 #[derive(PartialEq, PartialOrd)]
2815 struct ComparableTableScan<'a> {
2816 pub table_name: &'a TableReference,
2818 pub projection: &'a Option<Vec<usize>>,
2820 pub filters: &'a Vec<Expr>,
2822 pub fetch: &'a Option<usize>,
2824 }
2825 let comparable_self = ComparableTableScan {
2826 table_name: &self.table_name,
2827 projection: &self.projection,
2828 filters: &self.filters,
2829 fetch: &self.fetch,
2830 };
2831 let comparable_other = ComparableTableScan {
2832 table_name: &other.table_name,
2833 projection: &other.projection,
2834 filters: &other.filters,
2835 fetch: &other.fetch,
2836 };
2837 comparable_self
2838 .partial_cmp(&comparable_other)
2839 .filter(|cmp| *cmp != Ordering::Equal || self == other)
2841 }
2842}
2843
2844impl Hash for TableScan {
2845 fn hash<H: Hasher>(&self, state: &mut H) {
2846 self.table_name.hash(state);
2847 self.projection.hash(state);
2848 self.projected_schema.hash(state);
2849 self.filters.hash(state);
2850 self.fetch.hash(state);
2851 }
2852}
2853
2854impl TableScan {
2855 pub fn try_new(
2858 table_name: impl Into<TableReference>,
2859 table_source: Arc<dyn TableSource>,
2860 projection: Option<Vec<usize>>,
2861 filters: Vec<Expr>,
2862 fetch: Option<usize>,
2863 ) -> Result<Self> {
2864 let table_name = table_name.into();
2865
2866 if table_name.table().is_empty() {
2867 return plan_err!("table_name cannot be empty");
2868 }
2869 let schema = table_source.schema();
2870 let func_dependencies = FunctionalDependencies::new_from_constraints(
2871 table_source.constraints(),
2872 schema.fields.len(),
2873 );
2874 let projected_schema = projection
2875 .as_ref()
2876 .map(|p| {
2877 let projected_func_dependencies =
2878 func_dependencies.project_functional_dependencies(p, p.len());
2879
2880 let df_schema = DFSchema::new_with_metadata(
2881 p.iter()
2882 .map(|i| {
2883 (Some(table_name.clone()), Arc::clone(&schema.fields()[*i]))
2884 })
2885 .collect(),
2886 schema.metadata.clone(),
2887 )?;
2888 df_schema.with_functional_dependencies(projected_func_dependencies)
2889 })
2890 .unwrap_or_else(|| {
2891 let df_schema =
2892 DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2893 df_schema.with_functional_dependencies(func_dependencies)
2894 })?;
2895 let projected_schema = Arc::new(projected_schema);
2896
2897 Ok(Self {
2898 table_name,
2899 source: table_source,
2900 projection,
2901 projected_schema,
2902 filters,
2903 fetch,
2904 })
2905 }
2906}
2907
/// Redistributes the input's rows according to `partitioning_scheme`.
///
/// `LogicalPlan::display` renders the `RoundRobinBatch`, `Hash` and
/// `DistributeBy` schemes.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// How rows are assigned to output partitions.
    pub partitioning_scheme: Partitioning,
}
2916
/// Concatenates the rows of two or more inputs.
///
/// The constructors reject fewer than two inputs. They derive `schema` either
/// by column position or, for `Union::try_new_by_name`, by column name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// The inputs; at least two when built via a constructor.
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// The unified output schema.
    pub schema: DFSchemaRef,
}
2925
2926impl Union {
2927 pub fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2930 let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2931 Ok(Union { inputs, schema })
2932 }
2933
2934 pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2939 let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2940 Ok(Union { inputs, schema })
2941 }
2942
2943 pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2947 let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2948 let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2949
2950 Ok(Union { inputs, schema })
2951 }
2952
2953 fn rewrite_inputs_from_schema(
2957 schema: &Arc<DFSchema>,
2958 inputs: Vec<Arc<LogicalPlan>>,
2959 ) -> Result<Vec<Arc<LogicalPlan>>> {
2960 let schema_width = schema.iter().count();
2961 let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2962 for input in inputs {
2963 let mut expr = Vec::with_capacity(schema_width);
2967 for column in schema.columns() {
2968 if input
2969 .schema()
2970 .has_column_with_unqualified_name(column.name())
2971 {
2972 expr.push(Expr::Column(column));
2973 } else {
2974 expr.push(
2975 Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2976 );
2977 }
2978 }
2979 wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2980 Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2981 )));
2982 }
2983
2984 Ok(wrapped_inputs)
2985 }
2986
2987 fn derive_schema_from_inputs(
2996 inputs: &[Arc<LogicalPlan>],
2997 loose_types: bool,
2998 by_name: bool,
2999 ) -> Result<DFSchemaRef> {
3000 if inputs.len() < 2 {
3001 return plan_err!("UNION requires at least two inputs");
3002 }
3003
3004 if by_name {
3005 Self::derive_schema_from_inputs_by_name(inputs, loose_types)
3006 } else {
3007 Self::derive_schema_from_inputs_by_position(inputs, loose_types)
3008 }
3009 }
3010
3011 fn derive_schema_from_inputs_by_name(
3012 inputs: &[Arc<LogicalPlan>],
3013 loose_types: bool,
3014 ) -> Result<DFSchemaRef> {
3015 type FieldData<'a> =
3016 (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
3017 let mut cols: Vec<(&str, FieldData)> = Vec::new();
3018 for input in inputs.iter() {
3019 for field in input.schema().fields() {
3020 if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
3021 cols.iter_mut().find(|(name, _)| name == field.name())
3022 {
3023 if !loose_types && *data_type != field.data_type() {
3024 return plan_err!(
3025 "Found different types for field {}",
3026 field.name()
3027 );
3028 }
3029
3030 metadata.push(field.metadata());
3031 *is_nullable |= field.is_nullable();
3034 *occurrences += 1;
3035 } else {
3036 cols.push((
3037 field.name(),
3038 (
3039 field.data_type(),
3040 field.is_nullable(),
3041 vec![field.metadata()],
3042 1,
3043 ),
3044 ));
3045 }
3046 }
3047 }
3048
3049 let union_fields = cols
3050 .into_iter()
3051 .map(
3052 |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
3053 let final_is_nullable = if occurrences == inputs.len() {
3057 is_nullable
3058 } else {
3059 true
3060 };
3061
3062 let mut field =
3063 Field::new(name, data_type.clone(), final_is_nullable);
3064 field.set_metadata(intersect_metadata_for_union(unmerged_metadata));
3065
3066 (None, Arc::new(field))
3067 },
3068 )
3069 .collect::<Vec<(Option<TableReference>, _)>>();
3070
3071 let union_schema_metadata = intersect_metadata_for_union(
3072 inputs.iter().map(|input| input.schema().metadata()),
3073 );
3074
3075 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
3077 let schema = Arc::new(schema);
3078
3079 Ok(schema)
3080 }
3081
3082 fn derive_schema_from_inputs_by_position(
3083 inputs: &[Arc<LogicalPlan>],
3084 loose_types: bool,
3085 ) -> Result<DFSchemaRef> {
3086 let first_schema = inputs[0].schema();
3087 let fields_count = first_schema.fields().len();
3088 for input in inputs.iter().skip(1) {
3089 if fields_count != input.schema().fields().len() {
3090 return plan_err!(
3091 "UNION queries have different number of columns: \
3092 left has {} columns whereas right has {} columns",
3093 fields_count,
3094 input.schema().fields().len()
3095 );
3096 }
3097 }
3098
3099 let mut name_counts: HashMap<String, usize> = HashMap::new();
3100 let union_fields = (0..fields_count)
3101 .map(|i| {
3102 let fields = inputs
3103 .iter()
3104 .map(|input| input.schema().field(i))
3105 .collect::<Vec<_>>();
3106 let first_field = fields[0];
3107 let base_name = first_field.name().to_string();
3108
3109 let data_type = if loose_types {
3110 first_field.data_type()
3114 } else {
3115 fields.iter().skip(1).try_fold(
3116 first_field.data_type(),
3117 |acc, field| {
3118 if acc != field.data_type() {
3119 return plan_err!(
3120 "UNION field {i} have different type in inputs: \
3121 left has {} whereas right has {}",
3122 first_field.data_type(),
3123 field.data_type()
3124 );
3125 }
3126 Ok(acc)
3127 },
3128 )?
3129 };
3130 let nullable = fields.iter().any(|field| field.is_nullable());
3131
3132 let name = if let Some(count) = name_counts.get_mut(&base_name) {
3134 *count += 1;
3135 format!("{base_name}_{count}")
3136 } else {
3137 name_counts.insert(base_name.clone(), 0);
3138 base_name
3139 };
3140
3141 let mut field = Field::new(&name, data_type.clone(), nullable);
3142 let field_metadata = intersect_metadata_for_union(
3143 fields.iter().map(|field| field.metadata()),
3144 );
3145 field.set_metadata(field_metadata);
3146 Ok((None, Arc::new(field)))
3147 })
3148 .collect::<Result<_>>()?;
3149 let union_schema_metadata = intersect_metadata_for_union(
3150 inputs.iter().map(|input| input.schema().metadata()),
3151 );
3152
3153 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
3155 let schema = Arc::new(schema);
3156
3157 Ok(schema)
3158 }
3159}
3160
3161impl PartialOrd for Union {
3163 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3164 self.inputs
3165 .partial_cmp(&other.inputs)
3166 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3168 }
3169}
3170
3171#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3194pub struct DescribeTable {
3195 pub schema: Arc<Schema>,
3197 pub output_schema: DFSchemaRef,
3199}
3200
3201impl PartialOrd for DescribeTable {
3204 fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
3205 None
3207 }
3208}
3209
3210#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3212pub struct ExplainOption {
3213 pub verbose: bool,
3215 pub analyze: bool,
3217 pub format: ExplainFormat,
3219}
3220
3221impl Default for ExplainOption {
3222 fn default() -> Self {
3223 ExplainOption {
3224 verbose: false,
3225 analyze: false,
3226 format: ExplainFormat::Indent,
3227 }
3228 }
3229}
3230
3231impl ExplainOption {
3232 pub fn with_verbose(mut self, verbose: bool) -> Self {
3234 self.verbose = verbose;
3235 self
3236 }
3237
3238 pub fn with_analyze(mut self, analyze: bool) -> Self {
3240 self.analyze = analyze;
3241 self
3242 }
3243
3244 pub fn with_format(mut self, format: ExplainFormat) -> Self {
3246 self.format = format;
3247 self
3248 }
3249}
3250
3251#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3258pub struct Explain {
3259 pub verbose: bool,
3261 pub explain_format: ExplainFormat,
3264 pub plan: Arc<LogicalPlan>,
3266 pub stringified_plans: Vec<StringifiedPlan>,
3268 pub schema: DFSchemaRef,
3270 pub logical_optimization_succeeded: bool,
3272}
3273
3274impl PartialOrd for Explain {
3276 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3277 #[derive(PartialEq, PartialOrd)]
3278 struct ComparableExplain<'a> {
3279 pub verbose: &'a bool,
3281 pub plan: &'a Arc<LogicalPlan>,
3283 pub stringified_plans: &'a Vec<StringifiedPlan>,
3285 pub logical_optimization_succeeded: &'a bool,
3287 }
3288 let comparable_self = ComparableExplain {
3289 verbose: &self.verbose,
3290 plan: &self.plan,
3291 stringified_plans: &self.stringified_plans,
3292 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3293 };
3294 let comparable_other = ComparableExplain {
3295 verbose: &other.verbose,
3296 plan: &other.plan,
3297 stringified_plans: &other.stringified_plans,
3298 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3299 };
3300 comparable_self
3301 .partial_cmp(&comparable_other)
3302 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3304 }
3305}
3306
3307#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3310pub struct Analyze {
3311 pub verbose: bool,
3313 pub input: Arc<LogicalPlan>,
3315 pub schema: DFSchemaRef,
3317}
3318
3319impl PartialOrd for Analyze {
3321 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3322 match self.verbose.partial_cmp(&other.verbose) {
3323 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3324 cmp => cmp,
3325 }
3326 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3328 }
3329}
3330
3331#[allow(clippy::allow_attributes)]
3336#[allow(clippy::derived_hash_with_manual_eq)]
3337#[derive(Debug, Clone, Eq, Hash)]
3338pub struct Extension {
3339 pub node: Arc<dyn UserDefinedLogicalNode>,
3341}
3342
3343impl PartialEq for Extension {
3347 fn eq(&self, other: &Self) -> bool {
3348 self.node.eq(&other.node)
3349 }
3350}
3351
3352impl PartialOrd for Extension {
3353 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3354 self.node.partial_cmp(&other.node)
3355 }
3356}
3357
3358#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3360pub struct Limit {
3361 pub skip: Option<Box<Expr>>,
3363 pub fetch: Option<Box<Expr>>,
3366 pub input: Arc<LogicalPlan>,
3368}
3369
/// Classification of a limit's skip (OFFSET) expression.
pub enum SkipType {
    /// A known, non-negative number of rows to skip.
    Literal(usize),
    /// The skip is given by an expression that is not an `Int64` literal.
    UnsupportedExpr,
}
3377
/// Classification of a limit's fetch (LIMIT) expression.
pub enum FetchType {
    /// A known row count; `None` means no limit.
    Literal(Option<usize>),
    /// The fetch is given by an expression that is not an `Int64` literal.
    UnsupportedExpr,
}
3386
3387impl Limit {
3388 pub fn get_skip_type(&self) -> Result<SkipType> {
3390 match self.skip.as_deref() {
3391 Some(expr) => match *expr {
3392 Expr::Literal(ScalarValue::Int64(s), _) => {
3393 let s = s.unwrap_or(0);
3395 if s >= 0 {
3396 Ok(SkipType::Literal(s as usize))
3397 } else {
3398 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3399 }
3400 }
3401 _ => Ok(SkipType::UnsupportedExpr),
3402 },
3403 None => Ok(SkipType::Literal(0)),
3405 }
3406 }
3407
3408 pub fn get_fetch_type(&self) -> Result<FetchType> {
3410 match self.fetch.as_deref() {
3411 Some(expr) => match *expr {
3412 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3413 if s >= 0 {
3414 Ok(FetchType::Literal(Some(s as usize)))
3415 } else {
3416 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3417 }
3418 }
3419 Expr::Literal(ScalarValue::Int64(None), _) => {
3420 Ok(FetchType::Literal(None))
3421 }
3422 _ => Ok(FetchType::UnsupportedExpr),
3423 },
3424 None => Ok(FetchType::Literal(None)),
3425 }
3426 }
3427}
3428
3429#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3431pub enum Distinct {
3432 All(Arc<LogicalPlan>),
3434 On(DistinctOn),
3436}
3437
3438impl Distinct {
3439 pub fn input(&self) -> &Arc<LogicalPlan> {
3441 match self {
3442 Distinct::All(input) => input,
3443 Distinct::On(DistinctOn { input, .. }) => input,
3444 }
3445 }
3446}
3447
3448#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3450pub struct DistinctOn {
3451 pub on_expr: Vec<Expr>,
3453 pub select_expr: Vec<Expr>,
3455 pub sort_expr: Option<Vec<SortExpr>>,
3459 pub input: Arc<LogicalPlan>,
3461 pub schema: DFSchemaRef,
3463}
3464
3465impl DistinctOn {
3466 pub fn try_new(
3468 on_expr: Vec<Expr>,
3469 select_expr: Vec<Expr>,
3470 sort_expr: Option<Vec<SortExpr>>,
3471 input: Arc<LogicalPlan>,
3472 ) -> Result<Self> {
3473 if on_expr.is_empty() {
3474 return plan_err!("No `ON` expressions provided");
3475 }
3476
3477 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3478 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3479 .into_iter()
3480 .collect();
3481
3482 let dfschema = DFSchema::new_with_metadata(
3483 qualified_fields,
3484 input.schema().metadata().clone(),
3485 )?;
3486
3487 let mut distinct_on = DistinctOn {
3488 on_expr,
3489 select_expr,
3490 sort_expr: None,
3491 input,
3492 schema: Arc::new(dfschema),
3493 };
3494
3495 if let Some(sort_expr) = sort_expr {
3496 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3497 }
3498
3499 Ok(distinct_on)
3500 }
3501
3502 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3506 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3507
3508 let mut matched = true;
3510 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3511 if on != &sort.expr {
3512 matched = false;
3513 break;
3514 }
3515 }
3516
3517 if self.on_expr.len() > sort_expr.len() || !matched {
3518 return plan_err!(
3519 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3520 );
3521 }
3522
3523 self.sort_expr = Some(sort_expr);
3524 Ok(self)
3525 }
3526}
3527
3528impl PartialOrd for DistinctOn {
3530 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3531 #[derive(PartialEq, PartialOrd)]
3532 struct ComparableDistinctOn<'a> {
3533 pub on_expr: &'a Vec<Expr>,
3535 pub select_expr: &'a Vec<Expr>,
3537 pub sort_expr: &'a Option<Vec<SortExpr>>,
3541 pub input: &'a Arc<LogicalPlan>,
3543 }
3544 let comparable_self = ComparableDistinctOn {
3545 on_expr: &self.on_expr,
3546 select_expr: &self.select_expr,
3547 sort_expr: &self.sort_expr,
3548 input: &self.input,
3549 };
3550 let comparable_other = ComparableDistinctOn {
3551 on_expr: &other.on_expr,
3552 select_expr: &other.select_expr,
3553 sort_expr: &other.sort_expr,
3554 input: &other.input,
3555 };
3556 comparable_self
3557 .partial_cmp(&comparable_other)
3558 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3560 }
3561}
3562
3563#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3576#[non_exhaustive]
3578pub struct Aggregate {
3579 pub input: Arc<LogicalPlan>,
3581 pub group_expr: Vec<Expr>,
3583 pub aggr_expr: Vec<Expr>,
3587 pub schema: DFSchemaRef,
3589}
3590
3591impl Aggregate {
3592 pub fn try_new(
3594 input: Arc<LogicalPlan>,
3595 group_expr: Vec<Expr>,
3596 aggr_expr: Vec<Expr>,
3597 ) -> Result<Self> {
3598 let group_expr = enumerate_grouping_sets(group_expr)?;
3599
3600 let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3601
3602 let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3603
3604 let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3605
3606 if is_grouping_set {
3608 qualified_fields = qualified_fields
3609 .into_iter()
3610 .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3611 .collect::<Vec<_>>();
3612 let max_ordinal = max_grouping_set_duplicate_ordinal(&group_expr);
3613 qualified_fields.push((
3614 None,
3615 Field::new(
3616 Self::INTERNAL_GROUPING_ID,
3617 Self::grouping_id_type(qualified_fields.len(), max_ordinal),
3618 false,
3619 )
3620 .into(),
3621 ));
3622 }
3623
3624 qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3625
3626 let schema = DFSchema::new_with_metadata(
3627 qualified_fields,
3628 input.schema().metadata().clone(),
3629 )?;
3630
3631 Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3632 }
3633
3634 pub fn try_new_with_schema(
3640 input: Arc<LogicalPlan>,
3641 group_expr: Vec<Expr>,
3642 aggr_expr: Vec<Expr>,
3643 schema: DFSchemaRef,
3644 ) -> Result<Self> {
3645 if group_expr.is_empty() && aggr_expr.is_empty() {
3646 return plan_err!(
3647 "Aggregate requires at least one grouping or aggregate expression. \
3648 Aggregate without grouping expressions nor aggregate expressions is \
3649 logically equivalent to, but less efficient than, VALUES producing \
3650 single row. Please use VALUES instead."
3651 );
3652 }
3653 let group_expr_count = grouping_set_expr_count(&group_expr)?;
3654 if schema.fields().len() != group_expr_count + aggr_expr.len() {
3655 return plan_err!(
3656 "Aggregate schema has wrong number of fields. Expected {} got {}",
3657 group_expr_count + aggr_expr.len(),
3658 schema.fields().len()
3659 );
3660 }
3661
3662 let aggregate_func_dependencies =
3663 calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3664 let new_schema = Arc::unwrap_or_clone(schema);
3665 let schema = Arc::new(
3666 new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3667 );
3668 Ok(Self {
3669 input,
3670 group_expr,
3671 aggr_expr,
3672 schema,
3673 })
3674 }
3675
3676 fn is_grouping_set(&self) -> bool {
3677 matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3678 }
3679
3680 fn output_expressions(&self) -> Result<Vec<&Expr>> {
3682 static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3683 Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3684 });
3685 let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3686 if self.is_grouping_set() {
3687 exprs.push(&INTERNAL_ID_EXPR);
3688 }
3689 exprs.extend(self.aggr_expr.iter());
3690 debug_assert!(exprs.len() == self.schema.fields().len());
3691 Ok(exprs)
3692 }
3693
3694 pub fn group_expr_len(&self) -> Result<usize> {
3698 grouping_set_expr_count(&self.group_expr)
3699 }
3700
3701 pub fn grouping_id_type(group_exprs: usize, max_ordinal: usize) -> DataType {
3713 let ordinal_bits = usize::BITS as usize - max_ordinal.leading_zeros() as usize;
3714 let total_bits = group_exprs + ordinal_bits;
3715 if total_bits <= 8 {
3716 DataType::UInt8
3717 } else if total_bits <= 16 {
3718 DataType::UInt16
3719 } else if total_bits <= 32 {
3720 DataType::UInt32
3721 } else {
3722 DataType::UInt64
3723 }
3724 }
3725
3726 pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3759}
3760
3761impl PartialOrd for Aggregate {
3763 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3764 match self.input.partial_cmp(&other.input) {
3765 Some(Ordering::Equal) => {
3766 match self.group_expr.partial_cmp(&other.group_expr) {
3767 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3768 cmp => cmp,
3769 }
3770 }
3771 cmp => cmp,
3772 }
3773 .filter(|cmp| *cmp != Ordering::Equal || self == other)
3775 }
3776}
3777
3778#[allow(clippy::allow_attributes, clippy::mutable_key_type)] fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
3786 if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) = group_expr.first() {
3787 let mut counts: HashMap<&[Expr], usize> = HashMap::new();
3788 for set in sets {
3789 *counts.entry(set).or_insert(0) += 1;
3790 }
3791 counts.into_values().max().unwrap_or(0).saturating_sub(1)
3792 } else {
3793 0
3794 }
3795}
3796
3797fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3799 group_expr
3800 .iter()
3801 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3802}
3803
3804fn calc_func_dependencies_for_aggregate(
3806 group_expr: &[Expr],
3808 input: &LogicalPlan,
3810 aggr_schema: &DFSchema,
3812) -> Result<FunctionalDependencies> {
3813 if !contains_grouping_set(group_expr) {
3819 let group_by_expr_names = group_expr
3820 .iter()
3821 .map(|item| item.schema_name().to_string())
3822 .collect::<IndexSet<_>>()
3823 .into_iter()
3824 .collect::<Vec<_>>();
3825 let aggregate_func_dependencies = aggregate_functional_dependencies(
3826 input.schema(),
3827 &group_by_expr_names,
3828 aggr_schema,
3829 );
3830 Ok(aggregate_func_dependencies)
3831 } else {
3832 Ok(FunctionalDependencies::empty())
3833 }
3834}
3835
3836fn calc_func_dependencies_for_project(
3839 exprs: &[Expr],
3840 input: &LogicalPlan,
3841) -> Result<FunctionalDependencies> {
3842 let input_fields = input.schema().field_names();
3843 let proj_indices = exprs
3845 .iter()
3846 .map(|expr| match expr {
3847 #[expect(deprecated)]
3848 Expr::Wildcard { qualifier, options } => {
3849 let wildcard_fields = exprlist_to_fields(
3850 vec![&Expr::Wildcard {
3851 qualifier: qualifier.clone(),
3852 options: options.clone(),
3853 }],
3854 input,
3855 )?;
3856 Ok::<_, DataFusionError>(
3857 wildcard_fields
3858 .into_iter()
3859 .filter_map(|(qualifier, f)| {
3860 let flat_name = qualifier
3861 .map(|t| format!("{}.{}", t, f.name()))
3862 .unwrap_or_else(|| f.name().clone());
3863 input_fields.iter().position(|item| *item == flat_name)
3864 })
3865 .collect::<Vec<_>>(),
3866 )
3867 }
3868 Expr::Alias(alias) => {
3869 let name = format!("{}", alias.expr);
3870 Ok(input_fields
3871 .iter()
3872 .position(|item| *item == name)
3873 .map(|i| vec![i])
3874 .unwrap_or(vec![]))
3875 }
3876 _ => {
3877 let name = format!("{expr}");
3878 Ok(input_fields
3879 .iter()
3880 .position(|item| *item == name)
3881 .map(|i| vec![i])
3882 .unwrap_or(vec![]))
3883 }
3884 })
3885 .collect::<Result<Vec<_>>>()?
3886 .into_iter()
3887 .flatten()
3888 .collect::<Vec<_>>();
3889
3890 Ok(input
3891 .schema()
3892 .functional_dependencies()
3893 .project_functional_dependencies(&proj_indices, exprs.len()))
3894}
3895
3896#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
3898pub struct Sort {
3899 pub expr: Vec<SortExpr>,
3901 pub input: Arc<LogicalPlan>,
3903 pub fetch: Option<usize>,
3905}
3906
3907#[derive(Debug, Clone, PartialEq, Eq, Hash)]
3909pub struct Join {
3910 pub left: Arc<LogicalPlan>,
3912 pub right: Arc<LogicalPlan>,
3914 pub on: Vec<(Expr, Expr)>,
3916 pub filter: Option<Expr>,
3918 pub join_type: JoinType,
3920 pub join_constraint: JoinConstraint,
3922 pub schema: DFSchemaRef,
3924 pub null_equality: NullEquality,
3926 pub null_aware: bool,
3934}
3935
3936impl Join {
3937 #[expect(clippy::too_many_arguments)]
3957 pub fn try_new(
3958 left: Arc<LogicalPlan>,
3959 right: Arc<LogicalPlan>,
3960 on: Vec<(Expr, Expr)>,
3961 filter: Option<Expr>,
3962 join_type: JoinType,
3963 join_constraint: JoinConstraint,
3964 null_equality: NullEquality,
3965 null_aware: bool,
3966 ) -> Result<Self> {
3967 let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3968
3969 Ok(Join {
3970 left,
3971 right,
3972 on,
3973 filter,
3974 join_type,
3975 join_constraint,
3976 schema: Arc::new(join_schema),
3977 null_equality,
3978 null_aware,
3979 })
3980 }
3981
3982 pub fn try_new_with_project_input(
3985 original: &LogicalPlan,
3986 left: Arc<LogicalPlan>,
3987 right: Arc<LogicalPlan>,
3988 column_on: (Vec<Column>, Vec<Column>),
3989 ) -> Result<(Self, bool)> {
3990 let original_join = match original {
3991 LogicalPlan::Join(join) => join,
3992 _ => return plan_err!("Could not create join with project input"),
3993 };
3994
3995 let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3996 let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3997
3998 let mut requalified = false;
3999
4000 if original_join.join_type == JoinType::Inner
4003 || original_join.join_type == JoinType::Left
4004 || original_join.join_type == JoinType::Right
4005 || original_join.join_type == JoinType::Full
4006 {
4007 (left_sch, right_sch, requalified) =
4008 requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
4009 }
4010
4011 let on: Vec<(Expr, Expr)> = column_on
4012 .0
4013 .into_iter()
4014 .zip(column_on.1)
4015 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
4016 .collect();
4017
4018 let join_schema = build_join_schema(
4019 left_sch.schema(),
4020 right_sch.schema(),
4021 &original_join.join_type,
4022 )?;
4023
4024 Ok((
4025 Join {
4026 left,
4027 right,
4028 on,
4029 filter: original_join.filter.clone(),
4030 join_type: original_join.join_type,
4031 join_constraint: original_join.join_constraint,
4032 schema: Arc::new(join_schema),
4033 null_equality: original_join.null_equality,
4034 null_aware: original_join.null_aware,
4035 },
4036 requalified,
4037 ))
4038 }
4039}
4040
4041impl PartialOrd for Join {
4043 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4044 #[derive(PartialEq, PartialOrd)]
4045 struct ComparableJoin<'a> {
4046 pub left: &'a Arc<LogicalPlan>,
4048 pub right: &'a Arc<LogicalPlan>,
4050 pub on: &'a Vec<(Expr, Expr)>,
4052 pub filter: &'a Option<Expr>,
4054 pub join_type: &'a JoinType,
4056 pub join_constraint: &'a JoinConstraint,
4058 pub null_equality: &'a NullEquality,
4060 }
4061 let comparable_self = ComparableJoin {
4062 left: &self.left,
4063 right: &self.right,
4064 on: &self.on,
4065 filter: &self.filter,
4066 join_type: &self.join_type,
4067 join_constraint: &self.join_constraint,
4068 null_equality: &self.null_equality,
4069 };
4070 let comparable_other = ComparableJoin {
4071 left: &other.left,
4072 right: &other.right,
4073 on: &other.on,
4074 filter: &other.filter,
4075 join_type: &other.join_type,
4076 join_constraint: &other.join_constraint,
4077 null_equality: &other.null_equality,
4078 };
4079 comparable_self
4080 .partial_cmp(&comparable_other)
4081 .filter(|cmp| *cmp != Ordering::Equal || self == other)
4083 }
4084}
4085
4086#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
4088pub struct Subquery {
4089 pub subquery: Arc<LogicalPlan>,
4091 pub outer_ref_columns: Vec<Expr>,
4093 pub spans: Spans,
4095}
4096
4097impl Normalizeable for Subquery {
4098 fn can_normalize(&self) -> bool {
4099 false
4100 }
4101}
4102
4103impl NormalizeEq for Subquery {
4104 fn normalize_eq(&self, other: &Self) -> bool {
4105 *self.subquery == *other.subquery
4107 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
4108 && self
4109 .outer_ref_columns
4110 .iter()
4111 .zip(other.outer_ref_columns.iter())
4112 .all(|(a, b)| a.normalize_eq(b))
4113 }
4114}
4115
4116impl Subquery {
4117 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
4118 match plan {
4119 Expr::ScalarSubquery(it) => Ok(it),
4120 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
4121 _ => plan_err!("Could not coerce into ScalarSubquery!"),
4122 }
4123 }
4124
4125 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
4126 Subquery {
4127 subquery: plan,
4128 outer_ref_columns: self.outer_ref_columns.clone(),
4129 spans: Spans::new(),
4130 }
4131 }
4132}
4133
4134impl Debug for Subquery {
4135 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4136 write!(f, "<subquery>")
4137 }
4138}
4139
4140#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
4146pub enum Partitioning {
4147 RoundRobinBatch(usize),
4149 Hash(Vec<Expr>, usize),
4152 DistributeBy(Vec<Expr>),
4154}
4155
4156#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
4176pub struct ColumnUnnestList {
4177 pub output_column: Column,
4178 pub depth: usize,
4179}
4180
4181impl Display for ColumnUnnestList {
4182 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
4183 write!(f, "{}|depth={}", self.output_column, self.depth)
4184 }
4185}
4186
4187#[derive(Debug, Clone, PartialEq, Eq, Hash)]
4190pub struct Unnest {
4191 pub input: Arc<LogicalPlan>,
4193 pub exec_columns: Vec<Column>,
4195 pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
4198 pub struct_type_columns: Vec<usize>,
4201 pub dependency_indices: Vec<usize>,
4204 pub schema: DFSchemaRef,
4206 pub options: UnnestOptions,
4208}
4209
4210impl PartialOrd for Unnest {
4212 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
4213 #[derive(PartialEq, PartialOrd)]
4214 struct ComparableUnnest<'a> {
4215 pub input: &'a Arc<LogicalPlan>,
4217 pub exec_columns: &'a Vec<Column>,
4219 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
4222 pub struct_type_columns: &'a Vec<usize>,
4225 pub dependency_indices: &'a Vec<usize>,
4228 pub options: &'a UnnestOptions,
4230 }
4231 let comparable_self = ComparableUnnest {
4232 input: &self.input,
4233 exec_columns: &self.exec_columns,
4234 list_type_columns: &self.list_type_columns,
4235 struct_type_columns: &self.struct_type_columns,
4236 dependency_indices: &self.dependency_indices,
4237 options: &self.options,
4238 };
4239 let comparable_other = ComparableUnnest {
4240 input: &other.input,
4241 exec_columns: &other.exec_columns,
4242 list_type_columns: &other.list_type_columns,
4243 struct_type_columns: &other.struct_type_columns,
4244 dependency_indices: &other.dependency_indices,
4245 options: &other.options,
4246 };
4247 comparable_self
4248 .partial_cmp(&comparable_other)
4249 .filter(|cmp| *cmp != Ordering::Equal || self == other)
4251 }
4252}
4253
4254impl Unnest {
4255 pub fn try_new(
4256 input: Arc<LogicalPlan>,
4257 exec_columns: Vec<Column>,
4258 options: UnnestOptions,
4259 ) -> Result<Self> {
4260 if exec_columns.is_empty() {
4261 return plan_err!("unnest plan requires at least 1 column to unnest");
4262 }
4263
4264 let mut list_columns: Vec<(usize, ColumnUnnestList)> = vec![];
4265 let mut struct_columns = vec![];
4266 let indices_to_unnest = exec_columns
4267 .iter()
4268 .map(|c| Ok((input.schema().index_of_column(c)?, c)))
4269 .collect::<Result<HashMap<usize, &Column>>>()?;
4270
4271 let input_schema = input.schema();
4272
4273 let mut dependency_indices = vec![];
4274 let fields = input_schema
4290 .iter()
4291 .enumerate()
4292 .map(|(index, (original_qualifier, original_field))| {
4293 match indices_to_unnest.get(&index) {
4294 Some(column_to_unnest) => {
4295 let recursions_on_column = options
4296 .recursions
4297 .iter()
4298 .filter(|p| -> bool { &p.input_column == *column_to_unnest })
4299 .collect::<Vec<_>>();
4300 let mut transformed_columns = recursions_on_column
4301 .iter()
4302 .map(|r| {
4303 list_columns.push((
4304 index,
4305 ColumnUnnestList {
4306 output_column: r.output_column.clone(),
4307 depth: r.depth,
4308 },
4309 ));
4310 Ok(get_unnested_columns(
4311 &r.output_column.name,
4312 original_field.data_type(),
4313 r.depth,
4314 )?
4315 .into_iter()
4316 .next()
4317 .unwrap()) })
4319 .collect::<Result<Vec<(Column, Arc<Field>)>>>()?;
4320 if transformed_columns.is_empty() {
4321 transformed_columns = get_unnested_columns(
4322 &column_to_unnest.name,
4323 original_field.data_type(),
4324 1,
4325 )?;
4326 match original_field.data_type() {
4327 DataType::Struct(_) => {
4328 struct_columns.push(index);
4329 }
4330 DataType::List(_)
4331 | DataType::FixedSizeList(_, _)
4332 | DataType::LargeList(_)
4333 | DataType::ListView(_)
4334 | DataType::LargeListView(_) => {
4335 list_columns.push((
4336 index,
4337 ColumnUnnestList {
4338 output_column: Column::from_name(
4339 &column_to_unnest.name,
4340 ),
4341 depth: 1,
4342 },
4343 ));
4344 }
4345 _ => {}
4346 };
4347 }
4348
4349 dependency_indices.extend(std::iter::repeat_n(
4351 index,
4352 transformed_columns.len(),
4353 ));
4354 Ok(transformed_columns
4355 .iter()
4356 .map(|(col, field)| {
4357 (col.relation.to_owned(), field.to_owned())
4358 })
4359 .collect())
4360 }
4361 None => {
4362 dependency_indices.push(index);
4363 Ok(vec![(
4364 original_qualifier.cloned(),
4365 Arc::clone(original_field),
4366 )])
4367 }
4368 }
4369 })
4370 .collect::<Result<Vec<_>>>()?
4371 .into_iter()
4372 .flatten()
4373 .collect::<Vec<_>>();
4374
4375 let metadata = input_schema.metadata().clone();
4376 let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
4377 let deps = input_schema.functional_dependencies().clone();
4379 let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
4380
4381 Ok(Unnest {
4382 input,
4383 exec_columns,
4384 list_type_columns: list_columns,
4385 struct_type_columns: struct_columns,
4386 dependency_indices,
4387 schema,
4388 options,
4389 })
4390 }
4391}
4392
4393fn get_unnested_columns(
4402 col_name: &String,
4403 data_type: &DataType,
4404 depth: usize,
4405) -> Result<Vec<(Column, Arc<Field>)>> {
4406 let mut qualified_columns = Vec::with_capacity(1);
4407
4408 match data_type {
4409 DataType::List(_)
4410 | DataType::FixedSizeList(_, _)
4411 | DataType::LargeList(_)
4412 | DataType::ListView(_)
4413 | DataType::LargeListView(_) => {
4414 let data_type = get_unnested_list_datatype_recursive(data_type, depth)?;
4415 let new_field = Arc::new(Field::new(
4416 col_name, data_type,
4417 true,
4420 ));
4421 let column = Column::from_name(col_name);
4422 qualified_columns.push((column, new_field));
4424 }
4425 DataType::Struct(fields) => {
4426 qualified_columns.extend(fields.iter().map(|f| {
4427 let new_name = format!("{}.{}", col_name, f.name());
4428 let column = Column::from_name(&new_name);
4429 let new_field = f.as_ref().clone().with_name(new_name);
4430 (column, Arc::new(new_field))
4432 }))
4433 }
4434 _ => {
4435 return internal_err!("trying to unnest on invalid data type {data_type}");
4436 }
4437 };
4438 Ok(qualified_columns)
4439}
4440
4441fn get_unnested_list_datatype_recursive(
4444 data_type: &DataType,
4445 depth: usize,
4446) -> Result<DataType> {
4447 match data_type {
4448 DataType::List(field)
4449 | DataType::FixedSizeList(field, _)
4450 | DataType::LargeList(field)
4451 | DataType::ListView(field)
4452 | DataType::LargeListView(field) => {
4453 if depth == 1 {
4454 return Ok(field.data_type().clone());
4455 }
4456 return get_unnested_list_datatype_recursive(field.data_type(), depth - 1);
4457 }
4458 _ => {}
4459 };
4460
4461 internal_err!("trying to unnest on invalid data type {data_type}")
4462}
4463
4464#[cfg(test)]
4465mod tests {
4466 use super::*;
4467 use crate::builder::LogicalTableSource;
4468 use crate::logical_plan::table_scan;
4469 use crate::select_expr::SelectExpr;
4470 use crate::test::function_stub::{count, count_udaf};
4471 use crate::{
4472 GroupingSet, binary_expr, col, exists, in_subquery, lit, placeholder,
4473 scalar_subquery,
4474 };
4475 use datafusion_common::metadata::ScalarAndMetadata;
4476 use datafusion_common::tree_node::{
4477 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4478 };
4479 use datafusion_common::{Constraint, not_impl_err};
4480 use insta::{assert_debug_snapshot, assert_snapshot};
4481 use std::hash::DefaultHasher;
4482
4483 fn employee_schema() -> Schema {
4484 Schema::new(vec![
4485 Field::new("id", DataType::Int32, false),
4486 Field::new("first_name", DataType::Utf8, false),
4487 Field::new("last_name", DataType::Utf8, false),
4488 Field::new("state", DataType::Utf8, false),
4489 Field::new("salary", DataType::Int32, false),
4490 ])
4491 }
4492
4493 fn display_plan() -> Result<LogicalPlan> {
4494 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4495 .build()?;
4496
4497 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4498 .filter(in_subquery(col("state"), Arc::new(plan1)))?
4499 .project(vec![col("id")])?
4500 .build()
4501 }
4502
4503 #[test]
4504 fn test_display_indent() -> Result<()> {
4505 let plan = display_plan()?;
4506
4507 assert_snapshot!(plan.display_indent(), @r"
4508 Projection: employee_csv.id
4509 Filter: employee_csv.state IN (<subquery>)
4510 Subquery:
4511 TableScan: employee_csv projection=[state]
4512 TableScan: employee_csv projection=[id, state]
4513 ");
4514 Ok(())
4515 }
4516
4517 #[test]
4518 fn test_display_indent_schema() -> Result<()> {
4519 let plan = display_plan()?;
4520
4521 assert_snapshot!(plan.display_indent_schema(), @r"
4522 Projection: employee_csv.id [id:Int32]
4523 Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
4524 Subquery: [state:Utf8]
4525 TableScan: employee_csv projection=[state] [state:Utf8]
4526 TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
4527 ");
4528 Ok(())
4529 }
4530
4531 #[test]
4532 fn test_display_subquery_alias() -> Result<()> {
4533 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4534 .build()?;
4535 let plan1 = Arc::new(plan1);
4536
4537 let plan =
4538 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4539 .project(vec![col("id"), exists(plan1).alias("exists")])?
4540 .build();
4541
4542 assert_snapshot!(plan?.display_indent(), @r"
4543 Projection: employee_csv.id, EXISTS (<subquery>) AS exists
4544 Subquery:
4545 TableScan: employee_csv projection=[state]
4546 TableScan: employee_csv projection=[id, state]
4547 ");
4548 Ok(())
4549 }
4550
4551 #[test]
4552 fn test_display_graphviz() -> Result<()> {
4553 let plan = display_plan()?;
4554
4555 assert_snapshot!(plan.display_graphviz(), @r#"
4558 // Begin DataFusion GraphViz Plan,
4559 // display it online here: https://dreampuf.github.io/GraphvizOnline
4560
4561 digraph {
4562 subgraph cluster_1
4563 {
4564 graph[label="LogicalPlan"]
4565 2[shape=box label="Projection: employee_csv.id"]
4566 3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
4567 2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
4568 4[shape=box label="Subquery:"]
4569 3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
4570 5[shape=box label="TableScan: employee_csv projection=[state]"]
4571 4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
4572 6[shape=box label="TableScan: employee_csv projection=[id, state]"]
4573 3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
4574 }
4575 subgraph cluster_7
4576 {
4577 graph[label="Detailed LogicalPlan"]
4578 8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
4579 9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
4580 8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
4581 10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
4582 9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
4583 11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
4584 10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
4585 12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
4586 9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
4587 }
4588 }
4589 // End DataFusion GraphViz Plan
4590 "#);
4591 Ok(())
4592 }
4593
4594 #[test]
4595 fn test_display_pg_json() -> Result<()> {
4596 let plan = display_plan()?;
4597
4598 assert_snapshot!(plan.display_pg_json(), @r#"
4599 [
4600 {
4601 "Plan": {
4602 "Node Type": "Projection",
4603 "Expressions": [
4604 "employee_csv.id"
4605 ],
4606 "Plans": [
4607 {
4608 "Node Type": "Filter",
4609 "Condition": "employee_csv.state IN (<subquery>)",
4610 "Plans": [
4611 {
4612 "Node Type": "Subquery",
4613 "Plans": [
4614 {
4615 "Node Type": "TableScan",
4616 "Relation Name": "employee_csv",
4617 "Plans": [],
4618 "Output": [
4619 "state"
4620 ]
4621 }
4622 ],
4623 "Output": [
4624 "state"
4625 ]
4626 },
4627 {
4628 "Node Type": "TableScan",
4629 "Relation Name": "employee_csv",
4630 "Plans": [],
4631 "Output": [
4632 "id",
4633 "state"
4634 ]
4635 }
4636 ],
4637 "Output": [
4638 "id",
4639 "state"
4640 ]
4641 }
4642 ],
4643 "Output": [
4644 "id"
4645 ]
4646 }
4647 }
4648 ]
4649 "#);
4650 Ok(())
4651 }
4652
4653 #[derive(Debug, Default)]
4655 struct OkVisitor {
4656 strings: Vec<String>,
4657 }
4658
4659 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4660 type Node = LogicalPlan;
4661
4662 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4663 let s = match plan {
4664 LogicalPlan::Projection { .. } => "pre_visit Projection",
4665 LogicalPlan::Filter { .. } => "pre_visit Filter",
4666 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4667 _ => {
4668 return not_impl_err!("unknown plan type");
4669 }
4670 };
4671
4672 self.strings.push(s.into());
4673 Ok(TreeNodeRecursion::Continue)
4674 }
4675
4676 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4677 let s = match plan {
4678 LogicalPlan::Projection { .. } => "post_visit Projection",
4679 LogicalPlan::Filter { .. } => "post_visit Filter",
4680 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4681 _ => {
4682 return not_impl_err!("unknown plan type");
4683 }
4684 };
4685
4686 self.strings.push(s.into());
4687 Ok(TreeNodeRecursion::Continue)
4688 }
4689 }
4690
4691 #[test]
4692 fn visit_order() {
4693 let mut visitor = OkVisitor::default();
4694 let plan = test_plan();
4695 let res = plan.visit_with_subqueries(&mut visitor);
4696 assert!(res.is_ok());
4697
4698 assert_debug_snapshot!(visitor.strings, @r#"
4699 [
4700 "pre_visit Projection",
4701 "pre_visit Filter",
4702 "pre_visit TableScan",
4703 "post_visit TableScan",
4704 "post_visit Filter",
4705 "post_visit Projection",
4706 ]
4707 "#);
4708 }
4709
4710 #[derive(Debug, Default)]
4711 struct OptionalCounter {
4713 val: Option<usize>,
4714 }
4715
4716 impl OptionalCounter {
4717 fn new(val: usize) -> Self {
4718 Self { val: Some(val) }
4719 }
4720 fn dec(&mut self) -> bool {
4722 if Some(0) == self.val {
4723 true
4724 } else {
4725 self.val = self.val.take().map(|i| i - 1);
4726 false
4727 }
4728 }
4729 }
4730
4731 #[derive(Debug, Default)]
4732 struct StoppingVisitor {
4734 inner: OkVisitor,
4735 return_false_from_pre_in: OptionalCounter,
4737 return_false_from_post_in: OptionalCounter,
4739 }
4740
4741 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4742 type Node = LogicalPlan;
4743
4744 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4745 if self.return_false_from_pre_in.dec() {
4746 return Ok(TreeNodeRecursion::Stop);
4747 }
4748 self.inner.f_down(plan)?;
4749
4750 Ok(TreeNodeRecursion::Continue)
4751 }
4752
4753 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4754 if self.return_false_from_post_in.dec() {
4755 return Ok(TreeNodeRecursion::Stop);
4756 }
4757
4758 self.inner.f_up(plan)
4759 }
4760 }
4761
4762 #[test]
4764 fn early_stopping_pre_visit() {
4765 let mut visitor = StoppingVisitor {
4766 return_false_from_pre_in: OptionalCounter::new(2),
4767 ..Default::default()
4768 };
4769 let plan = test_plan();
4770 let res = plan.visit_with_subqueries(&mut visitor);
4771 assert!(res.is_ok());
4772
4773 assert_debug_snapshot!(
4774 visitor.inner.strings,
4775 @r#"
4776 [
4777 "pre_visit Projection",
4778 "pre_visit Filter",
4779 ]
4780 "#
4781 );
4782 }
4783
4784 #[test]
4785 fn early_stopping_post_visit() {
4786 let mut visitor = StoppingVisitor {
4787 return_false_from_post_in: OptionalCounter::new(1),
4788 ..Default::default()
4789 };
4790 let plan = test_plan();
4791 let res = plan.visit_with_subqueries(&mut visitor);
4792 assert!(res.is_ok());
4793
4794 assert_debug_snapshot!(
4795 visitor.inner.strings,
4796 @r#"
4797 [
4798 "pre_visit Projection",
4799 "pre_visit Filter",
4800 "pre_visit TableScan",
4801 "post_visit TableScan",
4802 ]
4803 "#
4804 );
4805 }
4806
4807 #[derive(Debug, Default)]
4808 struct ErrorVisitor {
4810 inner: OkVisitor,
4811 return_error_from_pre_in: OptionalCounter,
4813 return_error_from_post_in: OptionalCounter,
4815 }
4816
4817 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4818 type Node = LogicalPlan;
4819
4820 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4821 if self.return_error_from_pre_in.dec() {
4822 return not_impl_err!("Error in pre_visit");
4823 }
4824
4825 self.inner.f_down(plan)
4826 }
4827
4828 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4829 if self.return_error_from_post_in.dec() {
4830 return not_impl_err!("Error in post_visit");
4831 }
4832
4833 self.inner.f_up(plan)
4834 }
4835 }
4836
4837 #[test]
4838 fn error_pre_visit() {
4839 let mut visitor = ErrorVisitor {
4840 return_error_from_pre_in: OptionalCounter::new(2),
4841 ..Default::default()
4842 };
4843 let plan = test_plan();
4844 let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4845 assert_snapshot!(
4846 res.strip_backtrace(),
4847 @"This feature is not implemented: Error in pre_visit"
4848 );
4849 assert_debug_snapshot!(
4850 visitor.inner.strings,
4851 @r#"
4852 [
4853 "pre_visit Projection",
4854 "pre_visit Filter",
4855 ]
4856 "#
4857 );
4858 }
4859
4860 #[test]
4861 fn error_post_visit() {
4862 let mut visitor = ErrorVisitor {
4863 return_error_from_post_in: OptionalCounter::new(1),
4864 ..Default::default()
4865 };
4866 let plan = test_plan();
4867 let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
4868 assert_snapshot!(
4869 res.strip_backtrace(),
4870 @"This feature is not implemented: Error in post_visit"
4871 );
4872 assert_debug_snapshot!(
4873 visitor.inner.strings,
4874 @r#"
4875 [
4876 "pre_visit Projection",
4877 "pre_visit Filter",
4878 "pre_visit TableScan",
4879 "post_visit TableScan",
4880 ]
4881 "#
4882 );
4883 }
4884
4885 #[test]
4886 fn test_partial_eq_hash_and_partial_ord() {
4887 let empty_values = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4888 produce_one_row: true,
4889 schema: Arc::new(DFSchema::empty()),
4890 }));
4891
4892 let count_window_function = |schema| {
4893 Window::try_new_with_schema(
4894 vec![Expr::WindowFunction(Box::new(WindowFunction::new(
4895 WindowFunctionDefinition::AggregateUDF(count_udaf()),
4896 vec![],
4897 )))],
4898 Arc::clone(&empty_values),
4899 Arc::new(schema),
4900 )
4901 .unwrap()
4902 };
4903
4904 let schema_without_metadata = || {
4905 DFSchema::from_unqualified_fields(
4906 vec![Field::new("count", DataType::Int64, false)].into(),
4907 HashMap::new(),
4908 )
4909 .unwrap()
4910 };
4911
4912 let schema_with_metadata = || {
4913 DFSchema::from_unqualified_fields(
4914 vec![Field::new("count", DataType::Int64, false)].into(),
4915 [("key".to_string(), "value".to_string())].into(),
4916 )
4917 .unwrap()
4918 };
4919
4920 let f = count_window_function(schema_without_metadata());
4922
4923 let f2 = count_window_function(schema_without_metadata());
4925 assert_eq!(f, f2);
4926 assert_eq!(hash(&f), hash(&f2));
4927 assert_eq!(f.partial_cmp(&f2), Some(Ordering::Equal));
4928
4929 let o = count_window_function(schema_with_metadata());
4931 assert_ne!(f, o);
4932 assert_ne!(hash(&f), hash(&o)); assert_eq!(f.partial_cmp(&o), None);
4934 }
4935
4936 fn hash<T: Hash>(value: &T) -> u64 {
4937 let hasher = &mut DefaultHasher::new();
4938 value.hash(hasher);
4939 hasher.finish()
4940 }
4941
4942 #[test]
4943 fn projection_expr_schema_mismatch() -> Result<()> {
4944 let empty_schema = Arc::new(DFSchema::empty());
4945 let p = Projection::try_new_with_schema(
4946 vec![col("a")],
4947 Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4948 produce_one_row: false,
4949 schema: Arc::clone(&empty_schema),
4950 })),
4951 empty_schema,
4952 );
4953 assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4954 Ok(())
4955 }
4956
4957 fn test_plan() -> LogicalPlan {
4958 let schema = Schema::new(vec![
4959 Field::new("id", DataType::Int32, false),
4960 Field::new("state", DataType::Utf8, false),
4961 ]);
4962
4963 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4964 .unwrap()
4965 .filter(col("state").eq(lit("CO")))
4966 .unwrap()
4967 .project(vec![col("id")])
4968 .unwrap()
4969 .build()
4970 .unwrap()
4971 }
4972
4973 #[test]
4974 fn test_replace_invalid_placeholder() {
4975 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4977
4978 let plan = table_scan(TableReference::none(), &schema, None)
4979 .unwrap()
4980 .filter(col("id").eq(placeholder("")))
4981 .unwrap()
4982 .build()
4983 .unwrap();
4984
4985 let param_values = vec![ScalarValue::Int32(Some(42))];
4986 plan.replace_params_with_values(¶m_values.clone().into())
4987 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
4988
4989 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
4991
4992 let plan = table_scan(TableReference::none(), &schema, None)
4993 .unwrap()
4994 .filter(col("id").eq(placeholder("$0")))
4995 .unwrap()
4996 .build()
4997 .unwrap();
4998
4999 plan.replace_params_with_values(¶m_values.clone().into())
5000 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
5001
5002 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5004
5005 let plan = table_scan(TableReference::none(), &schema, None)
5006 .unwrap()
5007 .filter(col("id").eq(placeholder("$00")))
5008 .unwrap()
5009 .build()
5010 .unwrap();
5011
5012 plan.replace_params_with_values(¶m_values.into())
5013 .expect_err("unexpectedly succeeded to replace an invalid placeholder");
5014 }
5015
5016 #[test]
5017 fn test_replace_placeholder_mismatched_metadata() {
5018 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5019
5020 let plan = table_scan(TableReference::none(), &schema, None)
5022 .unwrap()
5023 .filter(col("id").eq(placeholder("$1")))
5024 .unwrap()
5025 .build()
5026 .unwrap();
5027 let prepared_builder = LogicalPlanBuilder::new(plan)
5028 .prepare(
5029 "".to_string(),
5030 vec![Field::new("", DataType::Int32, true).into()],
5031 )
5032 .unwrap();
5033
5034 let mut scalar_meta = HashMap::new();
5036 scalar_meta.insert("some_key".to_string(), "some_value".to_string());
5037 let param_values = ParamValues::List(vec![ScalarAndMetadata::new(
5038 ScalarValue::Int32(Some(42)),
5039 Some(scalar_meta.into()),
5040 )]);
5041 prepared_builder
5042 .plan()
5043 .clone()
5044 .with_param_values(param_values)
5045 .expect_err("prepared field metadata mismatch unexpectedly succeeded");
5046 }
5047
5048 #[test]
5049 fn test_replace_placeholder_empty_relation_valid_schema() {
5050 let plan = LogicalPlanBuilder::empty(false)
5052 .project(vec![
5053 SelectExpr::from(placeholder("$1")),
5054 SelectExpr::from(placeholder("$2")),
5055 ])
5056 .unwrap()
5057 .build()
5058 .unwrap();
5059
5060 assert_snapshot!(plan.display_indent_schema(), @r"
5062 Projection: $1, $2 [$1:Null;N, $2:Null;N]
5063 EmptyRelation: rows=0 []
5064 ");
5065
5066 let plan = plan
5067 .with_param_values(vec![ScalarValue::from(1i32), ScalarValue::from("s")])
5068 .unwrap();
5069
5070 assert_snapshot!(plan.display_indent_schema(), @r#"
5072 Projection: Int32(1) AS $1, Utf8("s") AS $2 [$1:Int32, $2:Utf8]
5073 EmptyRelation: rows=0 []
5074 "#);
5075 }
5076
5077 #[test]
5078 fn test_nullable_schema_after_grouping_set() {
5079 let schema = Schema::new(vec![
5080 Field::new("foo", DataType::Int32, false),
5081 Field::new("bar", DataType::Int32, false),
5082 ]);
5083
5084 let plan = table_scan(TableReference::none(), &schema, None)
5085 .unwrap()
5086 .aggregate(
5087 vec![Expr::GroupingSet(GroupingSet::GroupingSets(vec![
5088 vec![col("foo")],
5089 vec![col("bar")],
5090 ]))],
5091 vec![count(lit(true))],
5092 )
5093 .unwrap()
5094 .build()
5095 .unwrap();
5096
5097 let output_schema = plan.schema();
5098
5099 assert!(
5100 output_schema
5101 .field_with_name(None, "foo")
5102 .unwrap()
5103 .is_nullable(),
5104 );
5105 assert!(
5106 output_schema
5107 .field_with_name(None, "bar")
5108 .unwrap()
5109 .is_nullable()
5110 );
5111 }
5112
5113 #[test]
5114 fn grouping_id_type_accounts_for_duplicate_ordinal_bits() {
5115 assert_eq!(Aggregate::grouping_id_type(8, 0), DataType::UInt8);
5118 assert_eq!(Aggregate::grouping_id_type(8, 1), DataType::UInt16);
5119 }
5120
5121 #[test]
5122 fn test_filter_is_scalar() {
5123 let schema =
5125 Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
5126
5127 let source = Arc::new(LogicalTableSource::new(schema));
5128 let schema = Arc::new(
5129 DFSchema::try_from_qualified_schema(
5130 TableReference::bare("tab"),
5131 &source.schema(),
5132 )
5133 .unwrap(),
5134 );
5135 let scan = Arc::new(LogicalPlan::TableScan(TableScan {
5136 table_name: TableReference::bare("tab"),
5137 source: Arc::clone(&source) as Arc<dyn TableSource>,
5138 projection: None,
5139 projected_schema: Arc::clone(&schema),
5140 filters: vec![],
5141 fetch: None,
5142 }));
5143 let col = schema.field_names()[0].clone();
5144
5145 let filter = Filter::try_new(
5146 Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
5147 scan,
5148 )
5149 .unwrap();
5150 assert!(!filter.is_scalar());
5151 let unique_schema = Arc::new(
5152 schema
5153 .as_ref()
5154 .clone()
5155 .with_functional_dependencies(
5156 FunctionalDependencies::new_from_constraints(
5157 Some(&Constraints::new_unverified(vec![Constraint::Unique(
5158 vec![0],
5159 )])),
5160 1,
5161 ),
5162 )
5163 .unwrap(),
5164 );
5165 let scan = Arc::new(LogicalPlan::TableScan(TableScan {
5166 table_name: TableReference::bare("tab"),
5167 source,
5168 projection: None,
5169 projected_schema: Arc::clone(&unique_schema),
5170 filters: vec![],
5171 fetch: None,
5172 }));
5173 let col = schema.field_names()[0].clone();
5174
5175 let filter =
5176 Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
5177 assert!(filter.is_scalar());
5178 }
5179
5180 #[test]
5181 fn test_transform_explain() {
5182 let schema = Schema::new(vec![
5183 Field::new("foo", DataType::Int32, false),
5184 Field::new("bar", DataType::Int32, false),
5185 ]);
5186
5187 let plan = table_scan(TableReference::none(), &schema, None)
5188 .unwrap()
5189 .explain(false, false)
5190 .unwrap()
5191 .build()
5192 .unwrap();
5193
5194 let external_filter = col("foo").eq(lit(true));
5195
5196 let plan = plan
5199 .transform(|plan| match plan {
5200 LogicalPlan::TableScan(table) => {
5201 let filter = Filter::try_new(
5202 external_filter.clone(),
5203 Arc::new(LogicalPlan::TableScan(table)),
5204 )
5205 .unwrap();
5206 Ok(Transformed::yes(LogicalPlan::Filter(filter)))
5207 }
5208 x => Ok(Transformed::no(x)),
5209 })
5210 .data()
5211 .unwrap();
5212
5213 let actual = format!("{}", plan.display_indent());
5214 assert_snapshot!(actual, @r"
5215 Explain
5216 Filter: foo = Boolean(true)
5217 TableScan: ?table?
5218 ")
5219 }
5220
5221 #[test]
5222 fn test_plan_partial_ord() {
5223 let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
5224 produce_one_row: false,
5225 schema: Arc::new(DFSchema::empty()),
5226 });
5227
5228 let describe_table = LogicalPlan::DescribeTable(DescribeTable {
5229 schema: Arc::new(Schema::new(vec![Field::new(
5230 "foo",
5231 DataType::Int32,
5232 false,
5233 )])),
5234 output_schema: DFSchemaRef::new(DFSchema::empty()),
5235 });
5236
5237 let describe_table_clone = LogicalPlan::DescribeTable(DescribeTable {
5238 schema: Arc::new(Schema::new(vec![Field::new(
5239 "foo",
5240 DataType::Int32,
5241 false,
5242 )])),
5243 output_schema: DFSchemaRef::new(DFSchema::empty()),
5244 });
5245
5246 assert_eq!(
5247 empty_relation.partial_cmp(&describe_table),
5248 Some(Ordering::Less)
5249 );
5250 assert_eq!(
5251 describe_table.partial_cmp(&empty_relation),
5252 Some(Ordering::Greater)
5253 );
5254 assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
5255 }
5256
5257 #[test]
5258 fn test_limit_with_new_children() {
5259 let input = Arc::new(LogicalPlan::Values(Values {
5260 schema: Arc::new(DFSchema::empty()),
5261 values: vec![vec![]],
5262 }));
5263 let cases = [
5264 LogicalPlan::Limit(Limit {
5265 skip: None,
5266 fetch: None,
5267 input: Arc::clone(&input),
5268 }),
5269 LogicalPlan::Limit(Limit {
5270 skip: None,
5271 fetch: Some(Box::new(Expr::Literal(
5272 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5273 None,
5274 ))),
5275 input: Arc::clone(&input),
5276 }),
5277 LogicalPlan::Limit(Limit {
5278 skip: Some(Box::new(Expr::Literal(
5279 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5280 None,
5281 ))),
5282 fetch: None,
5283 input: Arc::clone(&input),
5284 }),
5285 LogicalPlan::Limit(Limit {
5286 skip: Some(Box::new(Expr::Literal(
5287 ScalarValue::new_one(&DataType::UInt32).unwrap(),
5288 None,
5289 ))),
5290 fetch: Some(Box::new(Expr::Literal(
5291 ScalarValue::new_ten(&DataType::UInt32).unwrap(),
5292 None,
5293 ))),
5294 input,
5295 }),
5296 ];
5297
5298 for limit in cases {
5299 let new_limit = limit
5300 .with_new_exprs(
5301 limit.expressions(),
5302 limit.inputs().into_iter().cloned().collect(),
5303 )
5304 .unwrap();
5305 assert_eq!(limit, new_limit);
5306 }
5307 }
5308
5309 #[test]
5310 fn test_with_subqueries_jump() {
5311 let subquery_schema =
5316 Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
5317
5318 let subquery_plan =
5319 table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
5320 .unwrap()
5321 .filter(col("sub_id").eq(lit(0)))
5322 .unwrap()
5323 .build()
5324 .unwrap();
5325
5326 let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
5327
5328 let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
5329 .unwrap()
5330 .filter(col("id").eq(lit(0)))
5331 .unwrap()
5332 .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
5333 .unwrap()
5334 .build()
5335 .unwrap();
5336
5337 let mut filter_found = false;
5338 plan.apply_with_subqueries(|plan| {
5339 match plan {
5340 LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5341 LogicalPlan::Filter(..) => filter_found = true,
5342 _ => {}
5343 }
5344 Ok(TreeNodeRecursion::Continue)
5345 })
5346 .unwrap();
5347 assert!(!filter_found);
5348
5349 struct ProjectJumpVisitor {
5350 filter_found: bool,
5351 }
5352
5353 impl ProjectJumpVisitor {
5354 fn new() -> Self {
5355 Self {
5356 filter_found: false,
5357 }
5358 }
5359 }
5360
5361 impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
5362 type Node = LogicalPlan;
5363
5364 fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
5365 match node {
5366 LogicalPlan::Projection(..) => return Ok(TreeNodeRecursion::Jump),
5367 LogicalPlan::Filter(..) => self.filter_found = true,
5368 _ => {}
5369 }
5370 Ok(TreeNodeRecursion::Continue)
5371 }
5372 }
5373
5374 let mut visitor = ProjectJumpVisitor::new();
5375 plan.visit_with_subqueries(&mut visitor).unwrap();
5376 assert!(!visitor.filter_found);
5377
5378 let mut filter_found = false;
5379 plan.clone()
5380 .transform_down_with_subqueries(|plan| {
5381 match plan {
5382 LogicalPlan::Projection(..) => {
5383 return Ok(Transformed::new(
5384 plan,
5385 false,
5386 TreeNodeRecursion::Jump,
5387 ));
5388 }
5389 LogicalPlan::Filter(..) => filter_found = true,
5390 _ => {}
5391 }
5392 Ok(Transformed::no(plan))
5393 })
5394 .unwrap();
5395 assert!(!filter_found);
5396
5397 let mut filter_found = false;
5398 plan.clone()
5399 .transform_down_up_with_subqueries(
5400 |plan| {
5401 match plan {
5402 LogicalPlan::Projection(..) => {
5403 return Ok(Transformed::new(
5404 plan,
5405 false,
5406 TreeNodeRecursion::Jump,
5407 ));
5408 }
5409 LogicalPlan::Filter(..) => filter_found = true,
5410 _ => {}
5411 }
5412 Ok(Transformed::no(plan))
5413 },
5414 |plan| Ok(Transformed::no(plan)),
5415 )
5416 .unwrap();
5417 assert!(!filter_found);
5418
5419 struct ProjectJumpRewriter {
5420 filter_found: bool,
5421 }
5422
5423 impl ProjectJumpRewriter {
5424 fn new() -> Self {
5425 Self {
5426 filter_found: false,
5427 }
5428 }
5429 }
5430
5431 impl TreeNodeRewriter for ProjectJumpRewriter {
5432 type Node = LogicalPlan;
5433
5434 fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
5435 match node {
5436 LogicalPlan::Projection(..) => {
5437 return Ok(Transformed::new(
5438 node,
5439 false,
5440 TreeNodeRecursion::Jump,
5441 ));
5442 }
5443 LogicalPlan::Filter(..) => self.filter_found = true,
5444 _ => {}
5445 }
5446 Ok(Transformed::no(node))
5447 }
5448 }
5449
5450 let mut rewriter = ProjectJumpRewriter::new();
5451 plan.rewrite_with_subqueries(&mut rewriter).unwrap();
5452 assert!(!rewriter.filter_found);
5453 }
5454
5455 #[test]
5456 fn test_with_unresolved_placeholders() {
5457 let field_name = "id";
5458 let placeholder_value = "$1";
5459 let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);
5460
5461 let plan = table_scan(TableReference::none(), &schema, None)
5462 .unwrap()
5463 .filter(col(field_name).eq(placeholder(placeholder_value)))
5464 .unwrap()
5465 .build()
5466 .unwrap();
5467
5468 let params = plan.get_parameter_fields().unwrap();
5470 assert_eq!(params.len(), 1);
5471
5472 let parameter_type = params.clone().get(placeholder_value).unwrap().clone();
5473 assert_eq!(parameter_type, None);
5474 }
5475
5476 #[test]
5477 fn test_join_with_new_exprs() -> Result<()> {
5478 fn create_test_join(
5479 on: Vec<(Expr, Expr)>,
5480 filter: Option<Expr>,
5481 ) -> Result<LogicalPlan> {
5482 let schema = Schema::new(vec![
5483 Field::new("a", DataType::Int32, false),
5484 Field::new("b", DataType::Int32, false),
5485 ]);
5486
5487 let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
5488 let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;
5489
5490 Ok(LogicalPlan::Join(Join {
5491 left: Arc::new(
5492 table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?,
5493 ),
5494 right: Arc::new(
5495 table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?,
5496 ),
5497 on,
5498 filter,
5499 join_type: JoinType::Inner,
5500 join_constraint: JoinConstraint::On,
5501 schema: Arc::new(left_schema.join(&right_schema)?),
5502 null_equality: NullEquality::NullEqualsNothing,
5503 null_aware: false,
5504 }))
5505 }
5506
5507 {
5508 let join = create_test_join(vec![(col("t1.a"), (col("t2.a")))], None)?;
5509 let LogicalPlan::Join(join) = join.with_new_exprs(
5510 join.expressions(),
5511 join.inputs().into_iter().cloned().collect(),
5512 )?
5513 else {
5514 unreachable!()
5515 };
5516 assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5517 assert_eq!(join.filter, None);
5518 }
5519
5520 {
5521 let join = create_test_join(vec![], Some(col("t1.a").gt(col("t2.a"))))?;
5522 let LogicalPlan::Join(join) = join.with_new_exprs(
5523 join.expressions(),
5524 join.inputs().into_iter().cloned().collect(),
5525 )?
5526 else {
5527 unreachable!()
5528 };
5529 assert_eq!(join.on, vec![]);
5530 assert_eq!(join.filter, Some(col("t1.a").gt(col("t2.a"))));
5531 }
5532
5533 {
5534 let join = create_test_join(
5535 vec![(col("t1.a"), (col("t2.a")))],
5536 Some(col("t1.b").gt(col("t2.b"))),
5537 )?;
5538 let LogicalPlan::Join(join) = join.with_new_exprs(
5539 join.expressions(),
5540 join.inputs().into_iter().cloned().collect(),
5541 )?
5542 else {
5543 unreachable!()
5544 };
5545 assert_eq!(join.on, vec![(col("t1.a"), (col("t2.a")))]);
5546 assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5547 }
5548
5549 {
5550 let join = create_test_join(
5551 vec![(col("t1.a"), (col("t2.a"))), (col("t1.b"), (col("t2.b")))],
5552 None,
5553 )?;
5554 let LogicalPlan::Join(join) = join.with_new_exprs(
5555 vec![
5556 binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5557 binary_expr(col("t2.a"), Operator::Plus, lit(2)),
5558 col("t1.b"),
5559 col("t2.b"),
5560 lit(true),
5561 ],
5562 join.inputs().into_iter().cloned().collect(),
5563 )?
5564 else {
5565 unreachable!()
5566 };
5567 assert_eq!(
5568 join.on,
5569 vec![
5570 (
5571 binary_expr(col("t1.a"), Operator::Plus, lit(1)),
5572 binary_expr(col("t2.a"), Operator::Plus, lit(2))
5573 ),
5574 (col("t1.b"), (col("t2.b")))
5575 ]
5576 );
5577 assert_eq!(join.filter, Some(lit(true)));
5578 }
5579
5580 Ok(())
5581 }
5582
5583 #[test]
5584 fn test_join_try_new() -> Result<()> {
5585 let schema = Schema::new(vec![
5586 Field::new("a", DataType::Int32, false),
5587 Field::new("b", DataType::Int32, false),
5588 ]);
5589
5590 let left_scan = table_scan(Some("t1"), &schema, None)?.build()?;
5591
5592 let right_scan = table_scan(Some("t2"), &schema, None)?.build()?;
5593
5594 let join_types = vec![
5595 JoinType::Inner,
5596 JoinType::Left,
5597 JoinType::Right,
5598 JoinType::Full,
5599 JoinType::LeftSemi,
5600 JoinType::LeftAnti,
5601 JoinType::RightSemi,
5602 JoinType::RightAnti,
5603 JoinType::LeftMark,
5604 ];
5605
5606 for join_type in join_types {
5607 let join = Join::try_new(
5608 Arc::new(left_scan.clone()),
5609 Arc::new(right_scan.clone()),
5610 vec![(col("t1.a"), col("t2.a"))],
5611 Some(col("t1.b").gt(col("t2.b"))),
5612 join_type,
5613 JoinConstraint::On,
5614 NullEquality::NullEqualsNothing,
5615 false,
5616 )?;
5617
5618 match join_type {
5619 JoinType::LeftSemi | JoinType::LeftAnti => {
5620 assert_eq!(join.schema.fields().len(), 2);
5621
5622 let fields = join.schema.fields();
5623 assert_eq!(
5624 fields[0].name(),
5625 "a",
5626 "First field should be 'a' from left table"
5627 );
5628 assert_eq!(
5629 fields[1].name(),
5630 "b",
5631 "Second field should be 'b' from left table"
5632 );
5633 }
5634 JoinType::RightSemi | JoinType::RightAnti => {
5635 assert_eq!(join.schema.fields().len(), 2);
5636
5637 let fields = join.schema.fields();
5638 assert_eq!(
5639 fields[0].name(),
5640 "a",
5641 "First field should be 'a' from right table"
5642 );
5643 assert_eq!(
5644 fields[1].name(),
5645 "b",
5646 "Second field should be 'b' from right table"
5647 );
5648 }
5649 JoinType::LeftMark => {
5650 assert_eq!(join.schema.fields().len(), 3);
5651
5652 let fields = join.schema.fields();
5653 assert_eq!(
5654 fields[0].name(),
5655 "a",
5656 "First field should be 'a' from left table"
5657 );
5658 assert_eq!(
5659 fields[1].name(),
5660 "b",
5661 "Second field should be 'b' from left table"
5662 );
5663 assert_eq!(
5664 fields[2].name(),
5665 "mark",
5666 "Third field should be the mark column"
5667 );
5668
5669 assert!(!fields[0].is_nullable());
5670 assert!(!fields[1].is_nullable());
5671 assert!(!fields[2].is_nullable());
5672 }
5673 _ => {
5674 assert_eq!(join.schema.fields().len(), 4);
5675
5676 let fields = join.schema.fields();
5677 assert_eq!(
5678 fields[0].name(),
5679 "a",
5680 "First field should be 'a' from left table"
5681 );
5682 assert_eq!(
5683 fields[1].name(),
5684 "b",
5685 "Second field should be 'b' from left table"
5686 );
5687 assert_eq!(
5688 fields[2].name(),
5689 "a",
5690 "Third field should be 'a' from right table"
5691 );
5692 assert_eq!(
5693 fields[3].name(),
5694 "b",
5695 "Fourth field should be 'b' from right table"
5696 );
5697
5698 if join_type == JoinType::Left {
5699 assert!(!fields[0].is_nullable());
5701 assert!(!fields[1].is_nullable());
5702 assert!(fields[2].is_nullable());
5704 assert!(fields[3].is_nullable());
5705 } else if join_type == JoinType::Right {
5706 assert!(fields[0].is_nullable());
5708 assert!(fields[1].is_nullable());
5709 assert!(!fields[2].is_nullable());
5711 assert!(!fields[3].is_nullable());
5712 } else if join_type == JoinType::Full {
5713 assert!(fields[0].is_nullable());
5714 assert!(fields[1].is_nullable());
5715 assert!(fields[2].is_nullable());
5716 assert!(fields[3].is_nullable());
5717 }
5718 }
5719 }
5720
5721 assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
5722 assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
5723 assert_eq!(join.join_type, join_type);
5724 assert_eq!(join.join_constraint, JoinConstraint::On);
5725 assert_eq!(join.null_equality, NullEquality::NullEqualsNothing);
5726 }
5727
5728 Ok(())
5729 }
5730
5731 #[test]
5732 fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
5733 let left_schema = Schema::new(vec![
5734 Field::new("id", DataType::Int32, false), Field::new("name", DataType::Utf8, false), Field::new("value", DataType::Int32, false), ]);
5738
5739 let right_schema = Schema::new(vec![
5740 Field::new("id", DataType::Int32, false), Field::new("category", DataType::Utf8, false), Field::new("value", DataType::Float64, true), ]);
5744
5745 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5746
5747 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5748
5749 {
5751 let join = Join::try_new(
5754 Arc::new(left_plan.clone()),
5755 Arc::new(right_plan.clone()),
5756 vec![(col("t1.id"), col("t2.id"))],
5757 None,
5758 JoinType::Inner,
5759 JoinConstraint::Using,
5760 NullEquality::NullEqualsNothing,
5761 false,
5762 )?;
5763
5764 let fields = join.schema.fields();
5765
5766 assert_eq!(fields.len(), 6);
5767
5768 assert_eq!(
5769 fields[0].name(),
5770 "id",
5771 "First field should be 'id' from left table"
5772 );
5773 assert_eq!(
5774 fields[1].name(),
5775 "name",
5776 "Second field should be 'name' from left table"
5777 );
5778 assert_eq!(
5779 fields[2].name(),
5780 "value",
5781 "Third field should be 'value' from left table"
5782 );
5783 assert_eq!(
5784 fields[3].name(),
5785 "id",
5786 "Fourth field should be 'id' from right table"
5787 );
5788 assert_eq!(
5789 fields[4].name(),
5790 "category",
5791 "Fifth field should be 'category' from right table"
5792 );
5793 assert_eq!(
5794 fields[5].name(),
5795 "value",
5796 "Sixth field should be 'value' from right table"
5797 );
5798
5799 assert_eq!(join.join_constraint, JoinConstraint::Using);
5800 }
5801
5802 {
5804 let join = Join::try_new(
5806 Arc::new(left_plan.clone()),
5807 Arc::new(right_plan.clone()),
5808 vec![(col("t1.id"), col("t2.id"))], Some(col("t1.value").lt(col("t2.value"))), JoinType::Inner,
5811 JoinConstraint::On,
5812 NullEquality::NullEqualsNothing,
5813 false,
5814 )?;
5815
5816 let fields = join.schema.fields();
5817 assert_eq!(fields.len(), 6);
5818
5819 assert_eq!(
5820 fields[0].name(),
5821 "id",
5822 "First field should be 'id' from left table"
5823 );
5824 assert_eq!(
5825 fields[1].name(),
5826 "name",
5827 "Second field should be 'name' from left table"
5828 );
5829 assert_eq!(
5830 fields[2].name(),
5831 "value",
5832 "Third field should be 'value' from left table"
5833 );
5834 assert_eq!(
5835 fields[3].name(),
5836 "id",
5837 "Fourth field should be 'id' from right table"
5838 );
5839 assert_eq!(
5840 fields[4].name(),
5841 "category",
5842 "Fifth field should be 'category' from right table"
5843 );
5844 assert_eq!(
5845 fields[5].name(),
5846 "value",
5847 "Sixth field should be 'value' from right table"
5848 );
5849
5850 assert_eq!(join.filter, Some(col("t1.value").lt(col("t2.value"))));
5851 }
5852
5853 {
5855 let join = Join::try_new(
5856 Arc::new(left_plan.clone()),
5857 Arc::new(right_plan.clone()),
5858 vec![(col("t1.id"), col("t2.id"))],
5859 None,
5860 JoinType::Inner,
5861 JoinConstraint::On,
5862 NullEquality::NullEqualsNull,
5863 false,
5864 )?;
5865
5866 assert_eq!(join.null_equality, NullEquality::NullEqualsNull);
5867 }
5868
5869 Ok(())
5870 }
5871
5872 #[test]
5873 fn test_join_try_new_schema_validation() -> Result<()> {
5874 let left_schema = Schema::new(vec![
5875 Field::new("id", DataType::Int32, false),
5876 Field::new("name", DataType::Utf8, false),
5877 Field::new("value", DataType::Float64, true),
5878 ]);
5879
5880 let right_schema = Schema::new(vec![
5881 Field::new("id", DataType::Int32, false),
5882 Field::new("category", DataType::Utf8, true),
5883 Field::new("code", DataType::Int16, false),
5884 ]);
5885
5886 let left_plan = table_scan(Some("t1"), &left_schema, None)?.build()?;
5887
5888 let right_plan = table_scan(Some("t2"), &right_schema, None)?.build()?;
5889
5890 let join_types = vec![
5891 JoinType::Inner,
5892 JoinType::Left,
5893 JoinType::Right,
5894 JoinType::Full,
5895 ];
5896
5897 for join_type in join_types {
5898 let join = Join::try_new(
5899 Arc::new(left_plan.clone()),
5900 Arc::new(right_plan.clone()),
5901 vec![(col("t1.id"), col("t2.id"))],
5902 Some(col("t1.value").gt(lit(5.0))),
5903 join_type,
5904 JoinConstraint::On,
5905 NullEquality::NullEqualsNothing,
5906 false,
5907 )?;
5908
5909 let fields = join.schema.fields();
5910 assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type} join");
5911
5912 for (i, field) in fields.iter().enumerate() {
5913 let expected_nullable = match (i, &join_type) {
5914 (0, JoinType::Right | JoinType::Full) => true, (1, JoinType::Right | JoinType::Full) => true, (2, _) => true, (3, JoinType::Left | JoinType::Full) => true, (4, _) => true, (5, JoinType::Left | JoinType::Full) => true, _ => false,
5925 };
5926
5927 assert_eq!(
5928 field.is_nullable(),
5929 expected_nullable,
5930 "Field {} ({}) nullability incorrect for {:?} join",
5931 i,
5932 field.name(),
5933 join_type
5934 );
5935 }
5936 }
5937
5938 let using_join = Join::try_new(
5939 Arc::new(left_plan.clone()),
5940 Arc::new(right_plan.clone()),
5941 vec![(col("t1.id"), col("t2.id"))],
5942 None,
5943 JoinType::Inner,
5944 JoinConstraint::Using,
5945 NullEquality::NullEqualsNothing,
5946 false,
5947 )?;
5948
5949 assert_eq!(
5950 using_join.schema.fields().len(),
5951 6,
5952 "USING join should have all fields"
5953 );
5954 assert_eq!(using_join.join_constraint, JoinConstraint::Using);
5955
5956 Ok(())
5957 }
5958}