1use std::cmp::Ordering;
21use std::collections::{HashMap, HashSet};
22use std::fmt::{self, Debug, Display, Formatter};
23use std::hash::{Hash, Hasher};
24use std::str::FromStr;
25use std::sync::{Arc, LazyLock};
26
27use super::dml::CopyTo;
28use super::invariants::{
29 assert_always_invariants_at_current_node, assert_executable_invariants,
30 InvariantLevel,
31};
32use super::DdlStatement;
33use crate::builder::{change_redundant_column, unnest_with_options};
34use crate::expr::{Placeholder, Sort as SortExpr, WindowFunction, WindowFunctionParams};
35use crate::expr_rewriter::{
36 create_col_from_scalar_expr, normalize_cols, normalize_sorts, NamePreserver,
37};
38use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
39use crate::logical_plan::extension::UserDefinedLogicalNode;
40use crate::logical_plan::{DmlStatement, Statement};
41use crate::utils::{
42 enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs,
43 grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
44};
45use crate::{
46 build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
47 Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
48 TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
49};
50
51use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
52use datafusion_common::cse::{NormalizeEq, Normalizeable};
53use datafusion_common::tree_node::{
54 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
55};
56use datafusion_common::{
57 aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
58 DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
59 FunctionalDependencies, ParamValues, Result, ScalarValue, Spans, TableReference,
60 UnnestOptions,
61};
62use indexmap::IndexSet;
63
64use crate::display::PgJsonVisitor;
66pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
67pub use datafusion_common::{JoinConstraint, JoinType};
68
/// A node in a logical query plan tree.
///
/// Each variant wraps the payload type that holds the data for one kind of
/// plan node. Child plans, where a node has any, are reachable through
/// [`LogicalPlan::inputs`], and the node's output schema through
/// [`LogicalPlan::schema`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum LogicalPlan {
    /// A [`Projection`] node.
    Projection(Projection),
    /// A [`Filter`] node.
    Filter(Filter),
    /// A [`Window`] node.
    Window(Window),
    /// An [`Aggregate`] node.
    Aggregate(Aggregate),
    /// A [`Sort`] node.
    Sort(Sort),
    /// A [`Join`] node with a left and a right input.
    Join(Join),
    /// A [`Repartition`] node.
    Repartition(Repartition),
    /// A [`Union`] node over any number of inputs.
    Union(Union),
    /// A [`TableScan`] leaf node.
    TableScan(TableScan),
    /// An [`EmptyRelation`] leaf node.
    EmptyRelation(EmptyRelation),
    /// A [`Subquery`] node.
    Subquery(Subquery),
    /// A [`SubqueryAlias`] node.
    SubqueryAlias(SubqueryAlias),
    /// A [`Limit`] node.
    Limit(Limit),
    /// A [`Statement`] node.
    Statement(Statement),
    /// A [`Values`] leaf node.
    Values(Values),
    /// An [`Explain`] node.
    Explain(Explain),
    /// An [`Analyze`] node.
    Analyze(Analyze),
    /// An [`Extension`] node wrapping a user-defined logical node.
    Extension(Extension),
    /// A [`Distinct`] node.
    Distinct(Distinct),
    /// A [`DmlStatement`] node.
    Dml(DmlStatement),
    /// A [`DdlStatement`] node.
    Ddl(DdlStatement),
    /// A [`CopyTo`] node.
    Copy(CopyTo),
    /// A [`DescribeTable`] leaf node.
    DescribeTable(DescribeTable),
    /// An [`Unnest`] node.
    Unnest(Unnest),
    /// A [`RecursiveQuery`] node with a static and a recursive term.
    RecursiveQuery(RecursiveQuery),
}
291
292impl Default for LogicalPlan {
293 fn default() -> Self {
294 LogicalPlan::EmptyRelation(EmptyRelation {
295 produce_one_row: false,
296 schema: Arc::new(DFSchema::empty()),
297 })
298 }
299}
300
301impl<'a> TreeNodeContainer<'a, Self> for LogicalPlan {
302 fn apply_elements<F: FnMut(&'a Self) -> Result<TreeNodeRecursion>>(
303 &'a self,
304 mut f: F,
305 ) -> Result<TreeNodeRecursion> {
306 f(self)
307 }
308
309 fn map_elements<F: FnMut(Self) -> Result<Transformed<Self>>>(
310 self,
311 mut f: F,
312 ) -> Result<Transformed<Self>> {
313 f(self)
314 }
315}
316
317impl LogicalPlan {
318 pub fn schema(&self) -> &DFSchemaRef {
320 match self {
321 LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
322 LogicalPlan::Values(Values { schema, .. }) => schema,
323 LogicalPlan::TableScan(TableScan {
324 projected_schema, ..
325 }) => projected_schema,
326 LogicalPlan::Projection(Projection { schema, .. }) => schema,
327 LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
328 LogicalPlan::Distinct(Distinct::All(input)) => input.schema(),
329 LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema,
330 LogicalPlan::Window(Window { schema, .. }) => schema,
331 LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
332 LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
333 LogicalPlan::Join(Join { schema, .. }) => schema,
334 LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
335 LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
336 LogicalPlan::Statement(statement) => statement.schema(),
337 LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
338 LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
339 LogicalPlan::Explain(explain) => &explain.schema,
340 LogicalPlan::Analyze(analyze) => &analyze.schema,
341 LogicalPlan::Extension(extension) => extension.node.schema(),
342 LogicalPlan::Union(Union { schema, .. }) => schema,
343 LogicalPlan::DescribeTable(DescribeTable { output_schema, .. }) => {
344 output_schema
345 }
346 LogicalPlan::Dml(DmlStatement { output_schema, .. }) => output_schema,
347 LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
348 LogicalPlan::Ddl(ddl) => ddl.schema(),
349 LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
350 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
351 static_term.schema()
353 }
354 }
355 }
356
357 pub fn fallback_normalize_schemas(&self) -> Vec<&DFSchema> {
360 match self {
361 LogicalPlan::Window(_)
362 | LogicalPlan::Projection(_)
363 | LogicalPlan::Aggregate(_)
364 | LogicalPlan::Unnest(_)
365 | LogicalPlan::Join(_) => self
366 .inputs()
367 .iter()
368 .map(|input| input.schema().as_ref())
369 .collect(),
370 _ => vec![],
371 }
372 }
373
374 pub fn explain_schema() -> SchemaRef {
376 SchemaRef::new(Schema::new(vec![
377 Field::new("plan_type", DataType::Utf8, false),
378 Field::new("plan", DataType::Utf8, false),
379 ]))
380 }
381
382 pub fn describe_schema() -> Schema {
384 Schema::new(vec![
385 Field::new("column_name", DataType::Utf8, false),
386 Field::new("data_type", DataType::Utf8, false),
387 Field::new("is_nullable", DataType::Utf8, false),
388 ])
389 }
390
391 pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
408 let mut exprs = vec![];
409 self.apply_expressions(|e| {
410 exprs.push(e.clone());
411 Ok(TreeNodeRecursion::Continue)
412 })
413 .unwrap();
415 exprs
416 }
417
418 pub fn all_out_ref_exprs(self: &LogicalPlan) -> Vec<Expr> {
421 let mut exprs = vec![];
422 self.apply_expressions(|e| {
423 find_out_reference_exprs(e).into_iter().for_each(|e| {
424 if !exprs.contains(&e) {
425 exprs.push(e)
426 }
427 });
428 Ok(TreeNodeRecursion::Continue)
429 })
430 .unwrap();
432 self.inputs()
433 .into_iter()
434 .flat_map(|child| child.all_out_ref_exprs())
435 .for_each(|e| {
436 if !exprs.contains(&e) {
437 exprs.push(e)
438 }
439 });
440 exprs
441 }
442
443 pub fn inputs(&self) -> Vec<&LogicalPlan> {
447 match self {
448 LogicalPlan::Projection(Projection { input, .. }) => vec![input],
449 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
450 LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
451 LogicalPlan::Window(Window { input, .. }) => vec![input],
452 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
453 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
454 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
455 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
456 LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
457 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
458 LogicalPlan::Extension(extension) => extension.node.inputs(),
459 LogicalPlan::Union(Union { inputs, .. }) => {
460 inputs.iter().map(|arc| arc.as_ref()).collect()
461 }
462 LogicalPlan::Distinct(
463 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
464 ) => vec![input],
465 LogicalPlan::Explain(explain) => vec![&explain.plan],
466 LogicalPlan::Analyze(analyze) => vec![&analyze.input],
467 LogicalPlan::Dml(write) => vec![&write.input],
468 LogicalPlan::Copy(copy) => vec![©.input],
469 LogicalPlan::Ddl(ddl) => ddl.inputs(),
470 LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
471 LogicalPlan::RecursiveQuery(RecursiveQuery {
472 static_term,
473 recursive_term,
474 ..
475 }) => vec![static_term, recursive_term],
476 LogicalPlan::Statement(stmt) => stmt.inputs(),
477 LogicalPlan::TableScan { .. }
479 | LogicalPlan::EmptyRelation { .. }
480 | LogicalPlan::Values { .. }
481 | LogicalPlan::DescribeTable(_) => vec![],
482 }
483 }
484
485 pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
487 let mut using_columns: Vec<HashSet<Column>> = vec![];
488
489 self.apply_with_subqueries(|plan| {
490 if let LogicalPlan::Join(Join {
491 join_constraint: JoinConstraint::Using,
492 on,
493 ..
494 }) = plan
495 {
496 let columns =
498 on.iter().try_fold(HashSet::new(), |mut accumu, (l, r)| {
499 let Some(l) = l.get_as_join_column() else {
500 return internal_err!(
501 "Invalid join key. Expected column, found {l:?}"
502 );
503 };
504 let Some(r) = r.get_as_join_column() else {
505 return internal_err!(
506 "Invalid join key. Expected column, found {r:?}"
507 );
508 };
509 accumu.insert(l.to_owned());
510 accumu.insert(r.to_owned());
511 Result::<_, DataFusionError>::Ok(accumu)
512 })?;
513 using_columns.push(columns);
514 }
515 Ok(TreeNodeRecursion::Continue)
516 })?;
517
518 Ok(using_columns)
519 }
520
521 pub fn head_output_expr(&self) -> Result<Option<Expr>> {
523 match self {
524 LogicalPlan::Projection(projection) => {
525 Ok(Some(projection.expr.as_slice()[0].clone()))
526 }
527 LogicalPlan::Aggregate(agg) => {
528 if agg.group_expr.is_empty() {
529 Ok(Some(agg.aggr_expr.as_slice()[0].clone()))
530 } else {
531 Ok(Some(agg.group_expr.as_slice()[0].clone()))
532 }
533 }
534 LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => {
535 Ok(Some(select_expr[0].clone()))
536 }
537 LogicalPlan::Filter(Filter { input, .. })
538 | LogicalPlan::Distinct(Distinct::All(input))
539 | LogicalPlan::Sort(Sort { input, .. })
540 | LogicalPlan::Limit(Limit { input, .. })
541 | LogicalPlan::Repartition(Repartition { input, .. })
542 | LogicalPlan::Window(Window { input, .. }) => input.head_output_expr(),
543 LogicalPlan::Join(Join {
544 left,
545 right,
546 join_type,
547 ..
548 }) => match join_type {
549 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
550 if left.schema().fields().is_empty() {
551 right.head_output_expr()
552 } else {
553 left.head_output_expr()
554 }
555 }
556 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
557 left.head_output_expr()
558 }
559 JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
560 },
561 LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
562 static_term.head_output_expr()
563 }
564 LogicalPlan::Union(union) => Ok(Some(Expr::Column(Column::from(
565 union.schema.qualified_field(0),
566 )))),
567 LogicalPlan::TableScan(table) => Ok(Some(Expr::Column(Column::from(
568 table.projected_schema.qualified_field(0),
569 )))),
570 LogicalPlan::SubqueryAlias(subquery_alias) => {
571 let expr_opt = subquery_alias.input.head_output_expr()?;
572 expr_opt
573 .map(|expr| {
574 Ok(Expr::Column(create_col_from_scalar_expr(
575 &expr,
576 subquery_alias.alias.to_string(),
577 )?))
578 })
579 .map_or(Ok(None), |v| v.map(Some))
580 }
581 LogicalPlan::Subquery(_) => Ok(None),
582 LogicalPlan::EmptyRelation(_)
583 | LogicalPlan::Statement(_)
584 | LogicalPlan::Values(_)
585 | LogicalPlan::Explain(_)
586 | LogicalPlan::Analyze(_)
587 | LogicalPlan::Extension(_)
588 | LogicalPlan::Dml(_)
589 | LogicalPlan::Copy(_)
590 | LogicalPlan::Ddl(_)
591 | LogicalPlan::DescribeTable(_)
592 | LogicalPlan::Unnest(_) => Ok(None),
593 }
594 }
595
596 pub fn recompute_schema(self) -> Result<Self> {
619 match self {
620 LogicalPlan::Projection(Projection {
623 expr,
624 input,
625 schema: _,
626 }) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
627 LogicalPlan::Dml(_) => Ok(self),
628 LogicalPlan::Copy(_) => Ok(self),
629 LogicalPlan::Values(Values { schema, values }) => {
630 Ok(LogicalPlan::Values(Values { schema, values }))
632 }
633 LogicalPlan::Filter(Filter { predicate, input }) => {
634 Filter::try_new(predicate, input).map(LogicalPlan::Filter)
635 }
636 LogicalPlan::Repartition(_) => Ok(self),
637 LogicalPlan::Window(Window {
638 input,
639 window_expr,
640 schema: _,
641 }) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
642 LogicalPlan::Aggregate(Aggregate {
643 input,
644 group_expr,
645 aggr_expr,
646 schema: _,
647 }) => Aggregate::try_new(input, group_expr, aggr_expr)
648 .map(LogicalPlan::Aggregate),
649 LogicalPlan::Sort(_) => Ok(self),
650 LogicalPlan::Join(Join {
651 left,
652 right,
653 filter,
654 join_type,
655 join_constraint,
656 on,
657 schema: _,
658 null_equals_null,
659 }) => {
660 let schema =
661 build_join_schema(left.schema(), right.schema(), &join_type)?;
662
663 let new_on: Vec<_> = on
664 .into_iter()
665 .map(|equi_expr| {
666 (equi_expr.0.unalias(), equi_expr.1.unalias())
668 })
669 .collect();
670
671 Ok(LogicalPlan::Join(Join {
672 left,
673 right,
674 join_type,
675 join_constraint,
676 on: new_on,
677 filter,
678 schema: DFSchemaRef::new(schema),
679 null_equals_null,
680 }))
681 }
682 LogicalPlan::Subquery(_) => Ok(self),
683 LogicalPlan::SubqueryAlias(SubqueryAlias {
684 input,
685 alias,
686 schema: _,
687 }) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
688 LogicalPlan::Limit(_) => Ok(self),
689 LogicalPlan::Ddl(_) => Ok(self),
690 LogicalPlan::Extension(Extension { node }) => {
691 let expr = node.expressions();
694 let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
695 Ok(LogicalPlan::Extension(Extension {
696 node: node.with_exprs_and_inputs(expr, inputs)?,
697 }))
698 }
699 LogicalPlan::Union(Union { inputs, schema }) => {
700 let first_input_schema = inputs[0].schema();
701 if schema.fields().len() == first_input_schema.fields().len() {
702 Ok(LogicalPlan::Union(Union { inputs, schema }))
704 } else {
705 Ok(LogicalPlan::Union(Union::try_new(inputs)?))
713 }
714 }
715 LogicalPlan::Distinct(distinct) => {
716 let distinct = match distinct {
717 Distinct::All(input) => Distinct::All(input),
718 Distinct::On(DistinctOn {
719 on_expr,
720 select_expr,
721 sort_expr,
722 input,
723 schema: _,
724 }) => Distinct::On(DistinctOn::try_new(
725 on_expr,
726 select_expr,
727 sort_expr,
728 input,
729 )?),
730 };
731 Ok(LogicalPlan::Distinct(distinct))
732 }
733 LogicalPlan::RecursiveQuery(_) => Ok(self),
734 LogicalPlan::Analyze(_) => Ok(self),
735 LogicalPlan::Explain(_) => Ok(self),
736 LogicalPlan::TableScan(_) => Ok(self),
737 LogicalPlan::EmptyRelation(_) => Ok(self),
738 LogicalPlan::Statement(_) => Ok(self),
739 LogicalPlan::DescribeTable(_) => Ok(self),
740 LogicalPlan::Unnest(Unnest {
741 input,
742 exec_columns,
743 options,
744 ..
745 }) => {
746 unnest_with_options(Arc::unwrap_or_clone(input), exec_columns, options)
748 }
749 }
750 }
751
752 pub fn with_new_exprs(
778 &self,
779 mut expr: Vec<Expr>,
780 inputs: Vec<LogicalPlan>,
781 ) -> Result<LogicalPlan> {
782 match self {
783 LogicalPlan::Projection(Projection { .. }) => {
786 let input = self.only_input(inputs)?;
787 Projection::try_new(expr, Arc::new(input)).map(LogicalPlan::Projection)
788 }
789 LogicalPlan::Dml(DmlStatement {
790 table_name,
791 target,
792 op,
793 ..
794 }) => {
795 self.assert_no_expressions(expr)?;
796 let input = self.only_input(inputs)?;
797 Ok(LogicalPlan::Dml(DmlStatement::new(
798 table_name.clone(),
799 Arc::clone(target),
800 op.clone(),
801 Arc::new(input),
802 )))
803 }
804 LogicalPlan::Copy(CopyTo {
805 input: _,
806 output_url,
807 file_type,
808 options,
809 partition_by,
810 }) => {
811 self.assert_no_expressions(expr)?;
812 let input = self.only_input(inputs)?;
813 Ok(LogicalPlan::Copy(CopyTo {
814 input: Arc::new(input),
815 output_url: output_url.clone(),
816 file_type: Arc::clone(file_type),
817 options: options.clone(),
818 partition_by: partition_by.clone(),
819 }))
820 }
821 LogicalPlan::Values(Values { schema, .. }) => {
822 self.assert_no_inputs(inputs)?;
823 Ok(LogicalPlan::Values(Values {
824 schema: Arc::clone(schema),
825 values: expr
826 .chunks_exact(schema.fields().len())
827 .map(|s| s.to_vec())
828 .collect(),
829 }))
830 }
831 LogicalPlan::Filter { .. } => {
832 let predicate = self.only_expr(expr)?;
833 let input = self.only_input(inputs)?;
834
835 Filter::try_new(predicate, Arc::new(input)).map(LogicalPlan::Filter)
836 }
837 LogicalPlan::Repartition(Repartition {
838 partitioning_scheme,
839 ..
840 }) => match partitioning_scheme {
841 Partitioning::RoundRobinBatch(n) => {
842 self.assert_no_expressions(expr)?;
843 let input = self.only_input(inputs)?;
844 Ok(LogicalPlan::Repartition(Repartition {
845 partitioning_scheme: Partitioning::RoundRobinBatch(*n),
846 input: Arc::new(input),
847 }))
848 }
849 Partitioning::Hash(_, n) => {
850 let input = self.only_input(inputs)?;
851 Ok(LogicalPlan::Repartition(Repartition {
852 partitioning_scheme: Partitioning::Hash(expr, *n),
853 input: Arc::new(input),
854 }))
855 }
856 Partitioning::DistributeBy(_) => {
857 let input = self.only_input(inputs)?;
858 Ok(LogicalPlan::Repartition(Repartition {
859 partitioning_scheme: Partitioning::DistributeBy(expr),
860 input: Arc::new(input),
861 }))
862 }
863 },
864 LogicalPlan::Window(Window { window_expr, .. }) => {
865 assert_eq!(window_expr.len(), expr.len());
866 let input = self.only_input(inputs)?;
867 Window::try_new(expr, Arc::new(input)).map(LogicalPlan::Window)
868 }
869 LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
870 let input = self.only_input(inputs)?;
871 let agg_expr = expr.split_off(group_expr.len());
873
874 Aggregate::try_new(Arc::new(input), expr, agg_expr)
875 .map(LogicalPlan::Aggregate)
876 }
877 LogicalPlan::Sort(Sort {
878 expr: sort_expr,
879 fetch,
880 ..
881 }) => {
882 let input = self.only_input(inputs)?;
883 Ok(LogicalPlan::Sort(Sort {
884 expr: expr
885 .into_iter()
886 .zip(sort_expr.iter())
887 .map(|(expr, sort)| sort.with_expr(expr))
888 .collect(),
889 input: Arc::new(input),
890 fetch: *fetch,
891 }))
892 }
893 LogicalPlan::Join(Join {
894 join_type,
895 join_constraint,
896 on,
897 null_equals_null,
898 ..
899 }) => {
900 let (left, right) = self.only_two_inputs(inputs)?;
901 let schema = build_join_schema(left.schema(), right.schema(), join_type)?;
902
903 let equi_expr_count = on.len() * 2;
904 assert!(expr.len() >= equi_expr_count);
905
906 let filter_expr = if expr.len() > equi_expr_count {
909 expr.pop()
910 } else {
911 None
912 };
913
914 assert_eq!(expr.len(), equi_expr_count);
917 let mut new_on = Vec::with_capacity(on.len());
918 let mut iter = expr.into_iter();
919 while let Some(left) = iter.next() {
920 let Some(right) = iter.next() else {
921 internal_err!("Expected a pair of expressions to construct the join on expression")?
922 };
923
924 new_on.push((left.unalias(), right.unalias()));
926 }
927
928 Ok(LogicalPlan::Join(Join {
929 left: Arc::new(left),
930 right: Arc::new(right),
931 join_type: *join_type,
932 join_constraint: *join_constraint,
933 on: new_on,
934 filter: filter_expr,
935 schema: DFSchemaRef::new(schema),
936 null_equals_null: *null_equals_null,
937 }))
938 }
939 LogicalPlan::Subquery(Subquery {
940 outer_ref_columns,
941 spans,
942 ..
943 }) => {
944 self.assert_no_expressions(expr)?;
945 let input = self.only_input(inputs)?;
946 let subquery = LogicalPlanBuilder::from(input).build()?;
947 Ok(LogicalPlan::Subquery(Subquery {
948 subquery: Arc::new(subquery),
949 outer_ref_columns: outer_ref_columns.clone(),
950 spans: spans.clone(),
951 }))
952 }
953 LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
954 self.assert_no_expressions(expr)?;
955 let input = self.only_input(inputs)?;
956 SubqueryAlias::try_new(Arc::new(input), alias.clone())
957 .map(LogicalPlan::SubqueryAlias)
958 }
959 LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
960 let old_expr_len = skip.iter().chain(fetch.iter()).count();
961 if old_expr_len != expr.len() {
962 return internal_err!(
963 "Invalid number of new Limit expressions: expected {}, got {}",
964 old_expr_len,
965 expr.len()
966 );
967 }
968 let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
970 let new_skip = skip.as_ref().and_then(|_| expr.pop());
971 let input = self.only_input(inputs)?;
972 Ok(LogicalPlan::Limit(Limit {
973 skip: new_skip.map(Box::new),
974 fetch: new_fetch.map(Box::new),
975 input: Arc::new(input),
976 }))
977 }
978 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(CreateMemoryTable {
979 name,
980 if_not_exists,
981 or_replace,
982 column_defaults,
983 temporary,
984 ..
985 })) => {
986 self.assert_no_expressions(expr)?;
987 let input = self.only_input(inputs)?;
988 Ok(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(
989 CreateMemoryTable {
990 input: Arc::new(input),
991 constraints: Constraints::empty(),
992 name: name.clone(),
993 if_not_exists: *if_not_exists,
994 or_replace: *or_replace,
995 column_defaults: column_defaults.clone(),
996 temporary: *temporary,
997 },
998 )))
999 }
1000 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1001 name,
1002 or_replace,
1003 definition,
1004 temporary,
1005 ..
1006 })) => {
1007 self.assert_no_expressions(expr)?;
1008 let input = self.only_input(inputs)?;
1009 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1010 input: Arc::new(input),
1011 name: name.clone(),
1012 or_replace: *or_replace,
1013 temporary: *temporary,
1014 definition: definition.clone(),
1015 })))
1016 }
1017 LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
1018 node: e.node.with_exprs_and_inputs(expr, inputs)?,
1019 })),
1020 LogicalPlan::Union(Union { schema, .. }) => {
1021 self.assert_no_expressions(expr)?;
1022 let input_schema = inputs[0].schema();
1023 let schema = if schema.fields().len() == input_schema.fields().len() {
1025 Arc::clone(schema)
1026 } else {
1027 Arc::clone(input_schema)
1028 };
1029 Ok(LogicalPlan::Union(Union {
1030 inputs: inputs.into_iter().map(Arc::new).collect(),
1031 schema,
1032 }))
1033 }
1034 LogicalPlan::Distinct(distinct) => {
1035 let distinct = match distinct {
1036 Distinct::All(_) => {
1037 self.assert_no_expressions(expr)?;
1038 let input = self.only_input(inputs)?;
1039 Distinct::All(Arc::new(input))
1040 }
1041 Distinct::On(DistinctOn {
1042 on_expr,
1043 select_expr,
1044 ..
1045 }) => {
1046 let input = self.only_input(inputs)?;
1047 let sort_expr = expr.split_off(on_expr.len() + select_expr.len());
1048 let select_expr = expr.split_off(on_expr.len());
1049 assert!(sort_expr.is_empty(), "with_new_exprs for Distinct does not support sort expressions");
1050 Distinct::On(DistinctOn::try_new(
1051 expr,
1052 select_expr,
1053 None, Arc::new(input),
1055 )?)
1056 }
1057 };
1058 Ok(LogicalPlan::Distinct(distinct))
1059 }
1060 LogicalPlan::RecursiveQuery(RecursiveQuery {
1061 name, is_distinct, ..
1062 }) => {
1063 self.assert_no_expressions(expr)?;
1064 let (static_term, recursive_term) = self.only_two_inputs(inputs)?;
1065 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1066 name: name.clone(),
1067 static_term: Arc::new(static_term),
1068 recursive_term: Arc::new(recursive_term),
1069 is_distinct: *is_distinct,
1070 }))
1071 }
1072 LogicalPlan::Analyze(a) => {
1073 self.assert_no_expressions(expr)?;
1074 let input = self.only_input(inputs)?;
1075 Ok(LogicalPlan::Analyze(Analyze {
1076 verbose: a.verbose,
1077 schema: Arc::clone(&a.schema),
1078 input: Arc::new(input),
1079 }))
1080 }
1081 LogicalPlan::Explain(e) => {
1082 self.assert_no_expressions(expr)?;
1083 let input = self.only_input(inputs)?;
1084 Ok(LogicalPlan::Explain(Explain {
1085 verbose: e.verbose,
1086 plan: Arc::new(input),
1087 explain_format: e.explain_format.clone(),
1088 stringified_plans: e.stringified_plans.clone(),
1089 schema: Arc::clone(&e.schema),
1090 logical_optimization_succeeded: e.logical_optimization_succeeded,
1091 }))
1092 }
1093 LogicalPlan::Statement(Statement::Prepare(Prepare {
1094 name,
1095 data_types,
1096 ..
1097 })) => {
1098 self.assert_no_expressions(expr)?;
1099 let input = self.only_input(inputs)?;
1100 Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
1101 name: name.clone(),
1102 data_types: data_types.clone(),
1103 input: Arc::new(input),
1104 })))
1105 }
1106 LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
1107 self.assert_no_inputs(inputs)?;
1108 Ok(LogicalPlan::Statement(Statement::Execute(Execute {
1109 name: name.clone(),
1110 parameters: expr,
1111 })))
1112 }
1113 LogicalPlan::TableScan(ts) => {
1114 self.assert_no_inputs(inputs)?;
1115 Ok(LogicalPlan::TableScan(TableScan {
1116 filters: expr,
1117 ..ts.clone()
1118 }))
1119 }
1120 LogicalPlan::EmptyRelation(_)
1121 | LogicalPlan::Ddl(_)
1122 | LogicalPlan::Statement(_)
1123 | LogicalPlan::DescribeTable(_) => {
1124 self.assert_no_expressions(expr)?;
1126 self.assert_no_inputs(inputs)?;
1127 Ok(self.clone())
1128 }
1129 LogicalPlan::Unnest(Unnest {
1130 exec_columns: columns,
1131 options,
1132 ..
1133 }) => {
1134 self.assert_no_expressions(expr)?;
1135 let input = self.only_input(inputs)?;
1136 let new_plan =
1138 unnest_with_options(input, columns.clone(), options.clone())?;
1139 Ok(new_plan)
1140 }
1141 }
1142 }
1143
1144 pub fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
1146 match check {
1147 InvariantLevel::Always => assert_always_invariants_at_current_node(self),
1148 InvariantLevel::Executable => assert_executable_invariants(self),
1149 }
1150 }
1151
1152 #[inline]
1154 #[allow(clippy::needless_pass_by_value)] fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
1156 if !expr.is_empty() {
1157 return internal_err!("{self:?} should have no exprs, got {:?}", expr);
1158 }
1159 Ok(())
1160 }
1161
1162 #[inline]
1164 #[allow(clippy::needless_pass_by_value)] fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
1166 if !inputs.is_empty() {
1167 return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
1168 }
1169 Ok(())
1170 }
1171
1172 #[inline]
1174 fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
1175 if expr.len() != 1 {
1176 return internal_err!(
1177 "{self:?} should have exactly one expr, got {:?}",
1178 expr
1179 );
1180 }
1181 Ok(expr.remove(0))
1182 }
1183
1184 #[inline]
1186 fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
1187 if inputs.len() != 1 {
1188 return internal_err!(
1189 "{self:?} should have exactly one input, got {:?}",
1190 inputs
1191 );
1192 }
1193 Ok(inputs.remove(0))
1194 }
1195
1196 #[inline]
1198 fn only_two_inputs(
1199 &self,
1200 mut inputs: Vec<LogicalPlan>,
1201 ) -> Result<(LogicalPlan, LogicalPlan)> {
1202 if inputs.len() != 2 {
1203 return internal_err!(
1204 "{self:?} should have exactly two inputs, got {:?}",
1205 inputs
1206 );
1207 }
1208 let right = inputs.remove(1);
1209 let left = inputs.remove(0);
1210 Ok((left, right))
1211 }
1212
1213 pub fn with_param_values(
1267 self,
1268 param_values: impl Into<ParamValues>,
1269 ) -> Result<LogicalPlan> {
1270 let param_values = param_values.into();
1271 let plan_with_values = self.replace_params_with_values(¶m_values)?;
1272
1273 Ok(
1275 if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1276 plan_with_values
1277 {
1278 param_values.verify(&prepare_lp.data_types)?;
1279 Arc::unwrap_or_clone(prepare_lp.input)
1281 } else {
1282 plan_with_values
1283 },
1284 )
1285 }
1286
1287 pub fn max_rows(self: &LogicalPlan) -> Option<usize> {
1292 match self {
1293 LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(),
1294 LogicalPlan::Filter(filter) => {
1295 if filter.is_scalar() {
1296 Some(1)
1297 } else {
1298 filter.input.max_rows()
1299 }
1300 }
1301 LogicalPlan::Window(Window { input, .. }) => input.max_rows(),
1302 LogicalPlan::Aggregate(Aggregate {
1303 input, group_expr, ..
1304 }) => {
1305 if group_expr
1307 .iter()
1308 .all(|expr| matches!(expr, Expr::Literal(_, _)))
1309 {
1310 Some(1)
1311 } else {
1312 input.max_rows()
1313 }
1314 }
1315 LogicalPlan::Sort(Sort { input, fetch, .. }) => {
1316 match (fetch, input.max_rows()) {
1317 (Some(fetch_limit), Some(input_max)) => {
1318 Some(input_max.min(*fetch_limit))
1319 }
1320 (Some(fetch_limit), None) => Some(*fetch_limit),
1321 (None, Some(input_max)) => Some(input_max),
1322 (None, None) => None,
1323 }
1324 }
1325 LogicalPlan::Join(Join {
1326 left,
1327 right,
1328 join_type,
1329 ..
1330 }) => match join_type {
1331 JoinType::Inner => Some(left.max_rows()? * right.max_rows()?),
1332 JoinType::Left | JoinType::Right | JoinType::Full => {
1333 match (left.max_rows()?, right.max_rows()?, join_type) {
1334 (0, 0, _) => Some(0),
1335 (max_rows, 0, JoinType::Left | JoinType::Full) => Some(max_rows),
1336 (0, max_rows, JoinType::Right | JoinType::Full) => Some(max_rows),
1337 (left_max, right_max, _) => Some(left_max * right_max),
1338 }
1339 }
1340 JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
1341 left.max_rows()
1342 }
1343 JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
1344 },
1345 LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
1346 LogicalPlan::Union(Union { inputs, .. }) => {
1347 inputs.iter().try_fold(0usize, |mut acc, plan| {
1348 acc += plan.max_rows()?;
1349 Some(acc)
1350 })
1351 }
1352 LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
1353 LogicalPlan::EmptyRelation(_) => Some(0),
1354 LogicalPlan::RecursiveQuery(_) => None,
1355 LogicalPlan::Subquery(_) => None,
1356 LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
1357 LogicalPlan::Limit(limit) => match limit.get_fetch_type() {
1358 Ok(FetchType::Literal(s)) => s,
1359 _ => None,
1360 },
1361 LogicalPlan::Distinct(
1362 Distinct::All(input) | Distinct::On(DistinctOn { input, .. }),
1363 ) => input.max_rows(),
1364 LogicalPlan::Values(v) => Some(v.values.len()),
1365 LogicalPlan::Unnest(_) => None,
1366 LogicalPlan::Ddl(_)
1367 | LogicalPlan::Explain(_)
1368 | LogicalPlan::Analyze(_)
1369 | LogicalPlan::Dml(_)
1370 | LogicalPlan::Copy(_)
1371 | LogicalPlan::DescribeTable(_)
1372 | LogicalPlan::Statement(_)
1373 | LogicalPlan::Extension(_) => None,
1374 }
1375 }
1376
1377 pub fn contains_outer_reference(&self) -> bool {
1379 let mut contains = false;
1380 self.apply_expressions(|expr| {
1381 Ok(if expr.contains_outer() {
1382 contains = true;
1383 TreeNodeRecursion::Stop
1384 } else {
1385 TreeNodeRecursion::Continue
1386 })
1387 })
1388 .unwrap();
1389 contains
1390 }
1391
1392 pub fn columnized_output_exprs(&self) -> Result<Vec<(&Expr, Column)>> {
1400 match self {
1401 LogicalPlan::Aggregate(aggregate) => Ok(aggregate
1402 .output_expressions()?
1403 .into_iter()
1404 .zip(self.schema().columns())
1405 .collect()),
1406 LogicalPlan::Window(Window {
1407 window_expr,
1408 input,
1409 schema,
1410 }) => {
1411 let mut output_exprs = input.columnized_output_exprs()?;
1419 let input_len = input.schema().fields().len();
1420 output_exprs.extend(
1421 window_expr
1422 .iter()
1423 .zip(schema.columns().into_iter().skip(input_len)),
1424 );
1425 Ok(output_exprs)
1426 }
1427 _ => Ok(vec![]),
1428 }
1429 }
1430}
1431
1432impl LogicalPlan {
1433 pub fn replace_params_with_values(
1440 self,
1441 param_values: &ParamValues,
1442 ) -> Result<LogicalPlan> {
1443 self.transform_up_with_subqueries(|plan| {
1444 let schema = Arc::clone(plan.schema());
1445 let name_preserver = NamePreserver::new(&plan);
1446 plan.map_expressions(|e| {
1447 let (e, has_placeholder) = e.infer_placeholder_types(&schema)?;
1448 if !has_placeholder {
1449 Ok(Transformed::no(e))
1453 } else {
1454 let original_name = name_preserver.save(&e);
1455 let transformed_expr = e.transform_up(|e| {
1456 if let Expr::Placeholder(Placeholder { id, .. }) = e {
1457 let value = param_values.get_placeholders_with_values(&id)?;
1458 Ok(Transformed::yes(Expr::Literal(value, None)))
1459 } else {
1460 Ok(Transformed::no(e))
1461 }
1462 })?;
1463 Ok(transformed_expr.update_data(|expr| original_name.restore(expr)))
1465 }
1466 })
1467 })
1468 .map(|res| res.data)
1469 }
1470
1471 pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
1473 let mut param_names = HashSet::new();
1474 self.apply_with_subqueries(|plan| {
1475 plan.apply_expressions(|expr| {
1476 expr.apply(|expr| {
1477 if let Expr::Placeholder(Placeholder { id, .. }) = expr {
1478 param_names.insert(id.clone());
1479 }
1480 Ok(TreeNodeRecursion::Continue)
1481 })
1482 })
1483 })
1484 .map(|_| param_names)
1485 }
1486
1487 pub fn get_parameter_types(
1489 &self,
1490 ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
1491 let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();
1492
1493 self.apply_with_subqueries(|plan| {
1494 plan.apply_expressions(|expr| {
1495 expr.apply(|expr| {
1496 if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
1497 let prev = param_types.get(id);
1498 match (prev, data_type) {
1499 (Some(Some(prev)), Some(dt)) => {
1500 if prev != dt {
1501 plan_err!("Conflicting types for {id}")?;
1502 }
1503 }
1504 (_, Some(dt)) => {
1505 param_types.insert(id.clone(), Some(dt.clone()));
1506 }
1507 _ => {
1508 param_types.insert(id.clone(), None);
1509 }
1510 }
1511 }
1512 Ok(TreeNodeRecursion::Continue)
1513 })
1514 })
1515 })
1516 .map(|_| param_types)
1517 }
1518
1519 pub fn display_indent(&self) -> impl Display + '_ {
1551 struct Wrapper<'a>(&'a LogicalPlan);
1554 impl Display for Wrapper<'_> {
1555 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1556 let with_schema = false;
1557 let mut visitor = IndentVisitor::new(f, with_schema);
1558 match self.0.visit_with_subqueries(&mut visitor) {
1559 Ok(_) => Ok(()),
1560 Err(_) => Err(fmt::Error),
1561 }
1562 }
1563 }
1564 Wrapper(self)
1565 }
1566
1567 pub fn display_indent_schema(&self) -> impl Display + '_ {
1594 struct Wrapper<'a>(&'a LogicalPlan);
1597 impl Display for Wrapper<'_> {
1598 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1599 let with_schema = true;
1600 let mut visitor = IndentVisitor::new(f, with_schema);
1601 match self.0.visit_with_subqueries(&mut visitor) {
1602 Ok(_) => Ok(()),
1603 Err(_) => Err(fmt::Error),
1604 }
1605 }
1606 }
1607 Wrapper(self)
1608 }
1609
1610 pub fn display_pg_json(&self) -> impl Display + '_ {
1614 struct Wrapper<'a>(&'a LogicalPlan);
1617 impl Display for Wrapper<'_> {
1618 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1619 let mut visitor = PgJsonVisitor::new(f);
1620 visitor.with_schema(true);
1621 match self.0.visit_with_subqueries(&mut visitor) {
1622 Ok(_) => Ok(()),
1623 Err(_) => Err(fmt::Error),
1624 }
1625 }
1626 }
1627 Wrapper(self)
1628 }
1629
1630 pub fn display_graphviz(&self) -> impl Display + '_ {
1660 struct Wrapper<'a>(&'a LogicalPlan);
1663 impl Display for Wrapper<'_> {
1664 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
1665 let mut visitor = GraphvizVisitor::new(f);
1666
1667 visitor.start_graph()?;
1668
1669 visitor.pre_visit_plan("LogicalPlan")?;
1670 self.0
1671 .visit_with_subqueries(&mut visitor)
1672 .map_err(|_| fmt::Error)?;
1673 visitor.post_visit_plan()?;
1674
1675 visitor.set_with_schema(true);
1676 visitor.pre_visit_plan("Detailed LogicalPlan")?;
1677 self.0
1678 .visit_with_subqueries(&mut visitor)
1679 .map_err(|_| fmt::Error)?;
1680 visitor.post_visit_plan()?;
1681
1682 visitor.end_graph()?;
1683 Ok(())
1684 }
1685 }
1686 Wrapper(self)
1687 }
1688
    /// Returns a `Display` adapter that writes a one-node description of
    /// this plan: the node kind followed by its most relevant attributes.
    ///
    /// Only the node itself is described; inputs are not visited (the
    /// `Unnest` arm reads its input's schema solely to resolve column
    /// names).
    pub fn display(&self) -> impl Display + '_ {
        // Local adapter so the formatting logic can live in a `Display` impl.
        struct Wrapper<'a>(&'a LogicalPlan);
        impl Display for Wrapper<'_> {
            fn fmt(&self, f: &mut Formatter) -> fmt::Result {
                match self.0 {
                    LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
                    LogicalPlan::RecursiveQuery(RecursiveQuery {
                        is_distinct, ..
                    }) => {
                        write!(f, "RecursiveQuery: is_distinct={is_distinct}")
                    }
                    LogicalPlan::Values(Values { ref values, .. }) => {
                        // Render at most the first five rows, each as `(e1, e2, ...)`.
                        let str_values: Vec<_> = values
                            .iter()
                            .take(5)
                            .map(|row| {
                                let item = row
                                    .iter()
                                    .map(|expr| expr.to_string())
                                    .collect::<Vec<_>>()
                                    .join(", ");
                                format!("({item})")
                            })
                            .collect();

                        // Trailing "..." marks that rows beyond the fifth were omitted.
                        let eclipse = if values.len() > 5 { "..." } else { "" };
                        write!(f, "Values: {}{}", str_values.join(", "), eclipse)
                    }

                    LogicalPlan::TableScan(TableScan {
                        ref source,
                        ref table_name,
                        ref projection,
                        ref filters,
                        ref fetch,
                        ..
                    }) => {
                        // Projection indices are resolved to field names using
                        // the source's schema.
                        let projected_fields = match projection {
                            Some(indices) => {
                                let schema = source.schema();
                                let names: Vec<&str> = indices
                                    .iter()
                                    .map(|i| schema.field(*i).name().as_str())
                                    .collect();
                                format!(" projection=[{}]", names.join(", "))
                            }
                            _ => "".to_string(),
                        };

                        write!(f, "TableScan: {table_name}{projected_fields}")?;

                        if !filters.is_empty() {
                            let mut full_filter = vec![];
                            let mut partial_filter = vec![];
                            let mut unsupported_filters = vec![];
                            let filters: Vec<&Expr> = filters.iter().collect();

                            // Group the filters by the pushdown support the
                            // source reports for each. If the source returns an
                            // error, no filter is printed at all.
                            if let Ok(results) =
                                source.supports_filters_pushdown(&filters)
                            {
                                filters.iter().zip(results.iter()).for_each(
                                    |(x, res)| match res {
                                        TableProviderFilterPushDown::Exact => {
                                            full_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Inexact => {
                                            partial_filter.push(x)
                                        }
                                        TableProviderFilterPushDown::Unsupported => {
                                            unsupported_filters.push(x)
                                        }
                                    },
                                );
                            }

                            if !full_filter.is_empty() {
                                write!(
                                    f,
                                    ", full_filters=[{}]",
                                    expr_vec_fmt!(full_filter)
                                )?;
                            };
                            if !partial_filter.is_empty() {
                                write!(
                                    f,
                                    ", partial_filters=[{}]",
                                    expr_vec_fmt!(partial_filter)
                                )?;
                            }
                            if !unsupported_filters.is_empty() {
                                write!(
                                    f,
                                    ", unsupported_filters=[{}]",
                                    expr_vec_fmt!(unsupported_filters)
                                )?;
                            }
                        }

                        if let Some(n) = fetch {
                            write!(f, ", fetch={n}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Projection(Projection { ref expr, .. }) => {
                        // Written piecewise: "Projection:" then " e1, e2, ...".
                        write!(f, "Projection:")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ",")?;
                            }
                            write!(f, " {expr_item}")?;
                        }
                        Ok(())
                    }
                    LogicalPlan::Dml(DmlStatement { table_name, op, .. }) => {
                        write!(f, "Dml: op=[{op}] table=[{table_name}]")
                    }
                    LogicalPlan::Copy(CopyTo {
                        input: _,
                        output_url,
                        file_type,
                        options,
                        ..
                    }) => {
                        // Options are printed as "key value" pairs in the
                        // iteration order of `options`.
                        let op_str = options
                            .iter()
                            .map(|(k, v)| format!("{k} {v}"))
                            .collect::<Vec<String>>()
                            .join(", ");

                        write!(f, "CopyTo: format={} output_url={output_url} options: ({op_str})", file_type.get_ext())
                    }
                    LogicalPlan::Ddl(ddl) => {
                        write!(f, "{}", ddl.display())
                    }
                    LogicalPlan::Filter(Filter {
                        predicate: ref expr,
                        ..
                    }) => write!(f, "Filter: {expr}"),
                    LogicalPlan::Window(Window {
                        ref window_expr, ..
                    }) => {
                        write!(
                            f,
                            "WindowAggr: windowExpr=[[{}]]",
                            expr_vec_fmt!(window_expr)
                        )
                    }
                    LogicalPlan::Aggregate(Aggregate {
                        ref group_expr,
                        ref aggr_expr,
                        ..
                    }) => write!(
                        f,
                        "Aggregate: groupBy=[[{}]], aggr=[[{}]]",
                        expr_vec_fmt!(group_expr),
                        expr_vec_fmt!(aggr_expr)
                    ),
                    LogicalPlan::Sort(Sort { expr, fetch, .. }) => {
                        write!(f, "Sort: ")?;
                        for (i, expr_item) in expr.iter().enumerate() {
                            if i > 0 {
                                write!(f, ", ")?;
                            }
                            write!(f, "{expr_item}")?;
                        }
                        if let Some(a) = fetch {
                            write!(f, ", fetch={a}")?;
                        }

                        Ok(())
                    }
                    LogicalPlan::Join(Join {
                        on: ref keys,
                        filter,
                        join_constraint,
                        join_type,
                        ..
                    }) => {
                        let join_expr: Vec<String> =
                            keys.iter().map(|(l, r)| format!("{l} = {r}")).collect();
                        let filter_expr = filter
                            .as_ref()
                            .map(|expr| format!(" Filter: {expr}"))
                            .unwrap_or_else(|| "".to_string());
                        // An inner join with neither keys nor a filter is
                        // labelled "Cross".
                        let join_type = if filter.is_none() && keys.is_empty() && matches!(join_type, JoinType::Inner) {
                            "Cross".to_string()
                        } else {
                            join_type.to_string()
                        };
                        match join_constraint {
                            JoinConstraint::On => {
                                write!(
                                    f,
                                    "{} Join: {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr
                                )
                            }
                            JoinConstraint::Using => {
                                write!(
                                    f,
                                    "{} Join: Using {}{}",
                                    join_type,
                                    join_expr.join(", "),
                                    filter_expr,
                                )
                            }
                        }
                    }
                    LogicalPlan::Repartition(Repartition {
                        partitioning_scheme,
                        ..
                    }) => match partitioning_scheme {
                        Partitioning::RoundRobinBatch(n) => {
                            write!(f, "Repartition: RoundRobinBatch partition_count={n}")
                        }
                        Partitioning::Hash(expr, n) => {
                            let hash_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: Hash({}) partition_count={}",
                                hash_expr.join(", "),
                                n
                            )
                        }
                        Partitioning::DistributeBy(expr) => {
                            let dist_by_expr: Vec<String> =
                                expr.iter().map(|e| format!("{e}")).collect();
                            write!(
                                f,
                                "Repartition: DistributeBy({})",
                                dist_by_expr.join(", "),
                            )
                        }
                    },
                    LogicalPlan::Limit(limit) => {
                        // Prefer the literal value when skip/fetch resolve to
                        // one; otherwise print the raw expression, or "None"
                        // when it is absent.
                        let skip_str = match limit.get_skip_type() {
                            Ok(SkipType::Literal(n)) => n.to_string(),
                            _ => limit.skip.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string()),
                        };
                        let fetch_str = match limit.get_fetch_type() {
                            Ok(FetchType::Literal(Some(n))) => n.to_string(),
                            Ok(FetchType::Literal(None)) => "None".to_string(),
                            _ => limit.fetch.as_ref().map_or_else(|| "None".to_string(), |x| x.to_string())
                        };
                        write!(
                            f,
                            "Limit: skip={skip_str}, fetch={fetch_str}",
                        )
                    }
                    LogicalPlan::Subquery(Subquery { .. }) => {
                        write!(f, "Subquery:")
                    }
                    LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
                        write!(f, "SubqueryAlias: {alias}")
                    }
                    LogicalPlan::Statement(statement) => {
                        write!(f, "{}", statement.display())
                    }
                    LogicalPlan::Distinct(distinct) => match distinct {
                        Distinct::All(_) => write!(f, "Distinct:"),
                        Distinct::On(DistinctOn {
                            on_expr,
                            select_expr,
                            sort_expr,
                            ..
                        }) => write!(
                            f,
                            "DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]",
                            expr_vec_fmt!(on_expr),
                            expr_vec_fmt!(select_expr),
                            if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() },
                        ),
                    },
                    LogicalPlan::Explain { .. } => write!(f, "Explain"),
                    LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
                    LogicalPlan::Union(_) => write!(f, "Union"),
                    // Extension nodes provide their own one-line description.
                    LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
                    LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                        write!(f, "DescribeTable")
                    }
                    LogicalPlan::Unnest(Unnest {
                        input: plan,
                        list_type_columns: list_col_indices,
                        struct_type_columns: struct_col_indices, .. }) => {
                        // The stored indices refer to columns of the input
                        // schema; resolve them to columns for printing.
                        let input_columns = plan.schema().columns();
                        let list_type_columns = list_col_indices
                            .iter()
                            .map(|(i,unnest_info)|
                                format!("{}|depth={}", &input_columns[*i].to_string(),
                                unnest_info.depth))
                            .collect::<Vec<String>>();
                        let struct_type_columns = struct_col_indices
                            .iter()
                            .map(|i| &input_columns[*i])
                            .collect::<Vec<&Column>>();
                        write!(f, "Unnest: lists[{}] structs[{}]",
                        expr_vec_fmt!(list_type_columns),
                        expr_vec_fmt!(struct_type_columns))
                    }
                }
            }
        }
        Wrapper(self)
    }
2023}
2024
2025impl Display for LogicalPlan {
2026 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2027 self.display_indent().fmt(f)
2028 }
2029}
2030
2031impl ToStringifiedPlan for LogicalPlan {
2032 fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
2033 StringifiedPlan::new(plan_type, self.display_indent().to_string())
2034 }
2035}
2036
/// Payload of the `LogicalPlan::EmptyRelation` node, rendered as
/// `EmptyRelation` by `LogicalPlan::display`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EmptyRelation {
    /// Flag carried by the node; presumably chooses between producing no
    /// rows and a single placeholder row (TODO confirm with the planner).
    /// This is the only field compared by the manual `PartialOrd` impl.
    pub produce_one_row: bool,
    /// Schema attached to this node.
    pub schema: DFSchemaRef,
}
2045
2046impl PartialOrd for EmptyRelation {
2048 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2049 self.produce_one_row.partial_cmp(&other.produce_one_row)
2050 }
2051}
2052
/// Payload of the `LogicalPlan::RecursiveQuery` node, rendered as
/// `RecursiveQuery: is_distinct=<bool>` by `LogicalPlan::display`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct RecursiveQuery {
    /// Name associated with the recursive query.
    pub name: String,
    /// Plan of the static term (presumably the non-recursive seed of the
    /// query; confirm against the planner).
    pub static_term: Arc<LogicalPlan>,
    /// Plan of the recursive term (presumably evaluated repeatedly against
    /// earlier results; confirm against the planner).
    pub recursive_term: Arc<LogicalPlan>,
    /// Distinctness flag, shown in the node's display string.
    pub is_distinct: bool,
}
2088
/// Payload of the `LogicalPlan::Values` node: rows of expressions.
///
/// `LogicalPlan::display` prints at most the first five rows.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Values {
    /// Schema attached to this node; ignored by the manual `PartialOrd` impl.
    pub schema: DFSchemaRef,
    /// The rows, each a vector of expressions.
    pub values: Vec<Vec<Expr>>,
}
2099
2100impl PartialOrd for Values {
2102 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2103 self.values.partial_cmp(&other.values)
2104 }
2105}
2106
/// Payload of the `LogicalPlan::Projection` node: a list of expressions
/// evaluated over an input plan.
///
/// Marked `#[non_exhaustive]`; the `try_new*` / `new_from_schema`
/// constructors are the visible ways to build one.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
#[non_exhaustive]
pub struct Projection {
    /// The projected expressions.
    pub expr: Vec<Expr>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Schema of the projection; ignored by the manual `PartialOrd` impl.
    pub schema: DFSchemaRef,
}
2120
2121impl PartialOrd for Projection {
2123 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2124 match self.expr.partial_cmp(&other.expr) {
2125 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
2126 cmp => cmp,
2127 }
2128 }
2129}
2130
2131impl Projection {
2132 pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2134 let projection_schema = projection_schema(&input, &expr)?;
2135 Self::try_new_with_schema(expr, input, projection_schema)
2136 }
2137
2138 pub fn try_new_with_schema(
2140 expr: Vec<Expr>,
2141 input: Arc<LogicalPlan>,
2142 schema: DFSchemaRef,
2143 ) -> Result<Self> {
2144 #[expect(deprecated)]
2145 if !expr.iter().any(|e| matches!(e, Expr::Wildcard { .. }))
2146 && expr.len() != schema.fields().len()
2147 {
2148 return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
2149 }
2150 Ok(Self {
2151 expr,
2152 input,
2153 schema,
2154 })
2155 }
2156
2157 pub fn new_from_schema(input: Arc<LogicalPlan>, schema: DFSchemaRef) -> Self {
2159 let expr: Vec<Expr> = schema.columns().into_iter().map(Expr::Column).collect();
2160 Self {
2161 expr,
2162 input,
2163 schema,
2164 }
2165 }
2166}
2167
2168pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
2182 let metadata = input.schema().metadata().clone();
2183
2184 let schema =
2185 DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)?
2186 .with_functional_dependencies(calc_func_dependencies_for_project(
2187 exprs, input,
2188 )?)?;
2189
2190 Ok(Arc::new(schema))
2191}
2192
/// Payload of the `LogicalPlan::SubqueryAlias` node, rendered as
/// `SubqueryAlias: <alias>` by `LogicalPlan::display`.
///
/// Marked `#[non_exhaustive]`; build it with `SubqueryAlias::try_new`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct SubqueryAlias {
    /// The plan being aliased.
    pub input: Arc<LogicalPlan>,
    /// The alias given to the input.
    pub alias: TableReference,
    /// Schema of the node; `try_new` builds it by qualifying the input's
    /// fields with `alias`. Ignored by the manual `PartialOrd` impl.
    pub schema: DFSchemaRef,
}
2205
2206impl SubqueryAlias {
2207 pub fn try_new(
2208 plan: Arc<LogicalPlan>,
2209 alias: impl Into<TableReference>,
2210 ) -> Result<Self> {
2211 let alias = alias.into();
2212 let fields = change_redundant_column(plan.schema().fields());
2213 let meta_data = plan.schema().as_ref().metadata().clone();
2214 let schema: Schema =
2215 DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
2216 let func_dependencies = plan.schema().functional_dependencies().clone();
2219 let schema = DFSchemaRef::new(
2220 DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
2221 .with_functional_dependencies(func_dependencies)?,
2222 );
2223 Ok(SubqueryAlias {
2224 input: plan,
2225 alias,
2226 schema,
2227 })
2228 }
2229}
2230
2231impl PartialOrd for SubqueryAlias {
2233 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2234 match self.input.partial_cmp(&other.input) {
2235 Some(Ordering::Equal) => self.alias.partial_cmp(&other.alias),
2236 cmp => cmp,
2237 }
2238 }
2239}
2240
/// Payload of the `LogicalPlan::Filter` node, rendered as
/// `Filter: <predicate>` by `LogicalPlan::display`.
///
/// Marked `#[non_exhaustive]`; build it with `Filter::try_new`, which
/// validates the predicate type when it can be determined.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
#[non_exhaustive]
pub struct Filter {
    /// The filter predicate; the constructor stores it with nested aliases
    /// removed.
    pub predicate: Expr,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
}
2260
2261impl Filter {
2262 pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2267 Self::try_new_internal(predicate, input)
2268 }
2269
2270 #[deprecated(since = "48.0.0", note = "Use `try_new` instead")]
2273 pub fn try_new_with_having(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2274 Self::try_new_internal(predicate, input)
2275 }
2276
2277 fn is_allowed_filter_type(data_type: &DataType) -> bool {
2278 match data_type {
2279 DataType::Boolean | DataType::Null => true,
2281 DataType::Dictionary(_, value_type) => {
2282 Filter::is_allowed_filter_type(value_type.as_ref())
2283 }
2284 _ => false,
2285 }
2286 }
2287
2288 fn try_new_internal(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
2289 if let Ok(predicate_type) = predicate.get_type(input.schema()) {
2294 if !Filter::is_allowed_filter_type(&predicate_type) {
2295 return plan_err!(
2296 "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}"
2297 );
2298 }
2299 }
2300
2301 Ok(Self {
2302 predicate: predicate.unalias_nested().data,
2303 input,
2304 })
2305 }
2306
2307 fn is_scalar(&self) -> bool {
2323 let schema = self.input.schema();
2324
2325 let functional_dependencies = self.input.schema().functional_dependencies();
2326 let unique_keys = functional_dependencies.iter().filter(|dep| {
2327 let nullable = dep.nullable
2328 && dep
2329 .source_indices
2330 .iter()
2331 .any(|&source| schema.field(source).is_nullable());
2332 !nullable
2333 && dep.mode == Dependency::Single
2334 && dep.target_indices.len() == schema.fields().len()
2335 });
2336
2337 let exprs = split_conjunction(&self.predicate);
2338 let eq_pred_cols: HashSet<_> = exprs
2339 .iter()
2340 .filter_map(|expr| {
2341 let Expr::BinaryExpr(BinaryExpr {
2342 left,
2343 op: Operator::Eq,
2344 right,
2345 }) = expr
2346 else {
2347 return None;
2348 };
2349 if left == right {
2351 return None;
2352 }
2353
2354 match (left.as_ref(), right.as_ref()) {
2355 (Expr::Column(_), Expr::Column(_)) => None,
2356 (Expr::Column(c), _) | (_, Expr::Column(c)) => {
2357 Some(schema.index_of_column(c).unwrap())
2358 }
2359 _ => None,
2360 }
2361 })
2362 .collect();
2363
2364 for key in unique_keys {
2367 if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) {
2368 return true;
2369 }
2370 }
2371 false
2372 }
2373}
2374
/// Payload of the `LogicalPlan::Window` node, rendered as
/// `WindowAggr: windowExpr=[[...]]` by `LogicalPlan::display`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Window {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// The window expressions.
    pub window_expr: Vec<Expr>,
    /// Schema of the node; `Window::try_new` builds it as the input fields
    /// followed by one field per window expression. Ignored by the manual
    /// `PartialOrd` impl.
    pub schema: DFSchemaRef,
}
2398
2399impl Window {
2400 pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
2402 let fields: Vec<(Option<TableReference>, Arc<Field>)> = input
2403 .schema()
2404 .iter()
2405 .map(|(q, f)| (q.cloned(), Arc::clone(f)))
2406 .collect();
2407 let input_len = fields.len();
2408 let mut window_fields = fields;
2409 let expr_fields = exprlist_to_fields(window_expr.as_slice(), &input)?;
2410 window_fields.extend_from_slice(expr_fields.as_slice());
2411 let metadata = input.schema().metadata().clone();
2412
2413 let mut window_func_dependencies =
2415 input.schema().functional_dependencies().clone();
2416 window_func_dependencies.extend_target_indices(window_fields.len());
2417
2418 let mut new_dependencies = window_expr
2422 .iter()
2423 .enumerate()
2424 .filter_map(|(idx, expr)| {
2425 let Expr::WindowFunction(window_fun) = expr else {
2426 return None;
2427 };
2428 let WindowFunction {
2429 fun: WindowFunctionDefinition::WindowUDF(udwf),
2430 params: WindowFunctionParams { partition_by, .. },
2431 } = window_fun.as_ref()
2432 else {
2433 return None;
2434 };
2435 if udwf.name() == "row_number" && partition_by.is_empty() {
2438 Some(idx + input_len)
2439 } else {
2440 None
2441 }
2442 })
2443 .map(|idx| {
2444 FunctionalDependence::new(vec![idx], vec![], false)
2445 .with_mode(Dependency::Single)
2446 })
2447 .collect::<Vec<_>>();
2448
2449 if !new_dependencies.is_empty() {
2450 for dependence in new_dependencies.iter_mut() {
2451 dependence.target_indices = (0..window_fields.len()).collect();
2452 }
2453 let new_deps = FunctionalDependencies::new(new_dependencies);
2455 window_func_dependencies.extend(new_deps);
2456 }
2457
2458 Self::try_new_with_schema(
2459 window_expr,
2460 input,
2461 Arc::new(
2462 DFSchema::new_with_metadata(window_fields, metadata)?
2463 .with_functional_dependencies(window_func_dependencies)?,
2464 ),
2465 )
2466 }
2467
2468 pub fn try_new_with_schema(
2469 window_expr: Vec<Expr>,
2470 input: Arc<LogicalPlan>,
2471 schema: DFSchemaRef,
2472 ) -> Result<Self> {
2473 if window_expr.len() != schema.fields().len() - input.schema().fields().len() {
2474 return plan_err!(
2475 "Window has mismatch between number of expressions ({}) and number of fields in schema ({})",
2476 window_expr.len(),
2477 schema.fields().len() - input.schema().fields().len()
2478 );
2479 }
2480
2481 Ok(Window {
2482 input,
2483 window_expr,
2484 schema,
2485 })
2486 }
2487}
2488
2489impl PartialOrd for Window {
2491 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2492 match self.input.partial_cmp(&other.input) {
2493 Some(Ordering::Equal) => self.window_expr.partial_cmp(&other.window_expr),
2494 cmp => cmp,
2495 }
2496 }
2497}
2498
/// Payload of the `LogicalPlan::TableScan` node.
///
/// `Debug`, `PartialEq`, `PartialOrd` and `Hash` are implemented by hand
/// and none of them looks at `source`.
#[derive(Clone)]
pub struct TableScan {
    /// Name of the table; `TableScan::try_new` rejects an empty table name.
    pub table_name: TableReference,
    /// The source the scan reads from.
    pub source: Arc<dyn TableSource>,
    /// Optional indices into the source schema's fields selecting the
    /// columns to scan.
    pub projection: Option<Vec<usize>>,
    /// Schema of the scan output; `try_new` qualifies its fields with
    /// `table_name`.
    pub projected_schema: DFSchemaRef,
    /// Filter expressions attached to the scan.
    pub filters: Vec<Expr>,
    /// Optional fetch value, printed as `fetch=<n>` by `LogicalPlan::display`
    /// (presumably a row limit; confirm with the execution side).
    pub fetch: Option<usize>,
}
2515
2516impl Debug for TableScan {
2517 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
2518 f.debug_struct("TableScan")
2519 .field("table_name", &self.table_name)
2520 .field("source", &"...")
2521 .field("projection", &self.projection)
2522 .field("projected_schema", &self.projected_schema)
2523 .field("filters", &self.filters)
2524 .field("fetch", &self.fetch)
2525 .finish_non_exhaustive()
2526 }
2527}
2528
2529impl PartialEq for TableScan {
2530 fn eq(&self, other: &Self) -> bool {
2531 self.table_name == other.table_name
2532 && self.projection == other.projection
2533 && self.projected_schema == other.projected_schema
2534 && self.filters == other.filters
2535 && self.fetch == other.fetch
2536 }
2537}
2538
// Marker impl: the manual `PartialEq` above is declared a full equivalence
// relation.
impl Eq for TableScan {}
2540
2541impl PartialOrd for TableScan {
2544 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2545 #[derive(PartialEq, PartialOrd)]
2546 struct ComparableTableScan<'a> {
2547 pub table_name: &'a TableReference,
2549 pub projection: &'a Option<Vec<usize>>,
2551 pub filters: &'a Vec<Expr>,
2553 pub fetch: &'a Option<usize>,
2555 }
2556 let comparable_self = ComparableTableScan {
2557 table_name: &self.table_name,
2558 projection: &self.projection,
2559 filters: &self.filters,
2560 fetch: &self.fetch,
2561 };
2562 let comparable_other = ComparableTableScan {
2563 table_name: &other.table_name,
2564 projection: &other.projection,
2565 filters: &other.filters,
2566 fetch: &other.fetch,
2567 };
2568 comparable_self.partial_cmp(&comparable_other)
2569 }
2570}
2571
2572impl Hash for TableScan {
2573 fn hash<H: Hasher>(&self, state: &mut H) {
2574 self.table_name.hash(state);
2575 self.projection.hash(state);
2576 self.projected_schema.hash(state);
2577 self.filters.hash(state);
2578 self.fetch.hash(state);
2579 }
2580}
2581
2582impl TableScan {
2583 pub fn try_new(
2586 table_name: impl Into<TableReference>,
2587 table_source: Arc<dyn TableSource>,
2588 projection: Option<Vec<usize>>,
2589 filters: Vec<Expr>,
2590 fetch: Option<usize>,
2591 ) -> Result<Self> {
2592 let table_name = table_name.into();
2593
2594 if table_name.table().is_empty() {
2595 return plan_err!("table_name cannot be empty");
2596 }
2597 let schema = table_source.schema();
2598 let func_dependencies = FunctionalDependencies::new_from_constraints(
2599 table_source.constraints(),
2600 schema.fields.len(),
2601 );
2602 let projected_schema = projection
2603 .as_ref()
2604 .map(|p| {
2605 let projected_func_dependencies =
2606 func_dependencies.project_functional_dependencies(p, p.len());
2607
2608 let df_schema = DFSchema::new_with_metadata(
2609 p.iter()
2610 .map(|i| {
2611 (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
2612 })
2613 .collect(),
2614 schema.metadata.clone(),
2615 )?;
2616 df_schema.with_functional_dependencies(projected_func_dependencies)
2617 })
2618 .unwrap_or_else(|| {
2619 let df_schema =
2620 DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
2621 df_schema.with_functional_dependencies(func_dependencies)
2622 })?;
2623 let projected_schema = Arc::new(projected_schema);
2624
2625 Ok(Self {
2626 table_name,
2627 source: table_source,
2628 projection,
2629 projected_schema,
2630 filters,
2631 fetch,
2632 })
2633 }
2634}
2635
/// Payload of the `LogicalPlan::Repartition` node.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// The partitioning to apply; `LogicalPlan::display` handles the
    /// `RoundRobinBatch`, `Hash` and `DistributeBy` variants.
    pub partitioning_scheme: Partitioning,
}
2644
/// Payload of the `LogicalPlan::Union` node, rendered as `Union` by
/// `LogicalPlan::display`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Union {
    /// The input plans; the constructors require at least two.
    pub inputs: Vec<Arc<LogicalPlan>>,
    /// Schema of the union, derived from the inputs by the constructors.
    /// Ignored by the manual `PartialOrd` impl.
    pub schema: DFSchemaRef,
}
2653
2654impl Union {
2655 fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2657 let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
2658 Ok(Union { inputs, schema })
2659 }
2660
2661 pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2666 let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
2667 Ok(Union { inputs, schema })
2668 }
2669
2670 pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2674 let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
2675 let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;
2676
2677 Ok(Union { inputs, schema })
2678 }
2679
2680 fn rewrite_inputs_from_schema(
2684 schema: &Arc<DFSchema>,
2685 inputs: Vec<Arc<LogicalPlan>>,
2686 ) -> Result<Vec<Arc<LogicalPlan>>> {
2687 let schema_width = schema.iter().count();
2688 let mut wrapped_inputs = Vec::with_capacity(inputs.len());
2689 for input in inputs {
2690 let mut expr = Vec::with_capacity(schema_width);
2694 for column in schema.columns() {
2695 if input
2696 .schema()
2697 .has_column_with_unqualified_name(column.name())
2698 {
2699 expr.push(Expr::Column(column));
2700 } else {
2701 expr.push(
2702 Expr::Literal(ScalarValue::Null, None).alias(column.name()),
2703 );
2704 }
2705 }
2706 wrapped_inputs.push(Arc::new(LogicalPlan::Projection(
2707 Projection::try_new_with_schema(expr, input, Arc::clone(schema))?,
2708 )));
2709 }
2710
2711 Ok(wrapped_inputs)
2712 }
2713
2714 fn derive_schema_from_inputs(
2723 inputs: &[Arc<LogicalPlan>],
2724 loose_types: bool,
2725 by_name: bool,
2726 ) -> Result<DFSchemaRef> {
2727 if inputs.len() < 2 {
2728 return plan_err!("UNION requires at least two inputs");
2729 }
2730
2731 if by_name {
2732 Self::derive_schema_from_inputs_by_name(inputs, loose_types)
2733 } else {
2734 Self::derive_schema_from_inputs_by_position(inputs, loose_types)
2735 }
2736 }
2737
2738 fn derive_schema_from_inputs_by_name(
2739 inputs: &[Arc<LogicalPlan>],
2740 loose_types: bool,
2741 ) -> Result<DFSchemaRef> {
2742 type FieldData<'a> =
2743 (&'a DataType, bool, Vec<&'a HashMap<String, String>>, usize);
2744 let mut cols: Vec<(&str, FieldData)> = Vec::new();
2745 for input in inputs.iter() {
2746 for field in input.schema().fields() {
2747 if let Some((_, (data_type, is_nullable, metadata, occurrences))) =
2748 cols.iter_mut().find(|(name, _)| name == field.name())
2749 {
2750 if !loose_types && *data_type != field.data_type() {
2751 return plan_err!(
2752 "Found different types for field {}",
2753 field.name()
2754 );
2755 }
2756
2757 metadata.push(field.metadata());
2758 *is_nullable |= field.is_nullable();
2761 *occurrences += 1;
2762 } else {
2763 cols.push((
2764 field.name(),
2765 (
2766 field.data_type(),
2767 field.is_nullable(),
2768 vec![field.metadata()],
2769 1,
2770 ),
2771 ));
2772 }
2773 }
2774 }
2775
2776 let union_fields = cols
2777 .into_iter()
2778 .map(
2779 |(name, (data_type, is_nullable, unmerged_metadata, occurrences))| {
2780 let final_is_nullable = if occurrences == inputs.len() {
2784 is_nullable
2785 } else {
2786 true
2787 };
2788
2789 let mut field =
2790 Field::new(name, data_type.clone(), final_is_nullable);
2791 field.set_metadata(intersect_maps(unmerged_metadata));
2792
2793 (None, Arc::new(field))
2794 },
2795 )
2796 .collect::<Vec<(Option<TableReference>, _)>>();
2797
2798 let union_schema_metadata =
2799 intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2800
2801 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2803 let schema = Arc::new(schema);
2804
2805 Ok(schema)
2806 }
2807
2808 fn derive_schema_from_inputs_by_position(
2809 inputs: &[Arc<LogicalPlan>],
2810 loose_types: bool,
2811 ) -> Result<DFSchemaRef> {
2812 let first_schema = inputs[0].schema();
2813 let fields_count = first_schema.fields().len();
2814 for input in inputs.iter().skip(1) {
2815 if fields_count != input.schema().fields().len() {
2816 return plan_err!(
2817 "UNION queries have different number of columns: \
2818 left has {} columns whereas right has {} columns",
2819 fields_count,
2820 input.schema().fields().len()
2821 );
2822 }
2823 }
2824
2825 let mut name_counts: HashMap<String, usize> = HashMap::new();
2826 let union_fields = (0..fields_count)
2827 .map(|i| {
2828 let fields = inputs
2829 .iter()
2830 .map(|input| input.schema().field(i))
2831 .collect::<Vec<_>>();
2832 let first_field = fields[0];
2833 let base_name = first_field.name().to_string();
2834
2835 let data_type = if loose_types {
2836 first_field.data_type()
2840 } else {
2841 fields.iter().skip(1).try_fold(
2842 first_field.data_type(),
2843 |acc, field| {
2844 if acc != field.data_type() {
2845 return plan_err!(
2846 "UNION field {i} have different type in inputs: \
2847 left has {} whereas right has {}",
2848 first_field.data_type(),
2849 field.data_type()
2850 );
2851 }
2852 Ok(acc)
2853 },
2854 )?
2855 };
2856 let nullable = fields.iter().any(|field| field.is_nullable());
2857
2858 let name = if let Some(count) = name_counts.get_mut(&base_name) {
2860 *count += 1;
2861 format!("{base_name}_{count}")
2862 } else {
2863 name_counts.insert(base_name.clone(), 0);
2864 base_name
2865 };
2866
2867 let mut field = Field::new(&name, data_type.clone(), nullable);
2868 let field_metadata =
2869 intersect_maps(fields.iter().map(|field| field.metadata()));
2870 field.set_metadata(field_metadata);
2871 Ok((None, Arc::new(field)))
2872 })
2873 .collect::<Result<_>>()?;
2874 let union_schema_metadata =
2875 intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2876
2877 let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2879 let schema = Arc::new(schema);
2880
2881 Ok(schema)
2882 }
2883}
2884
/// Returns the entries shared by all `inputs`: a key is kept only if every
/// map contains it with the same value.
///
/// An empty iterator yields an empty map; a single map yields a copy of
/// itself.
fn intersect_maps<'a>(
    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut maps = inputs.into_iter();
    let Some(first) = maps.next() else {
        return HashMap::new();
    };
    maps.fold(first.clone(), |mut common, map| {
        common.retain(|key, value| map.get(key).is_some_and(|other| *other == *value));
        common
    })
}
2899
2900impl PartialOrd for Union {
2902 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2903 self.inputs.partial_cmp(&other.inputs)
2904 }
2905}
2906
/// Payload of the `LogicalPlan::DescribeTable` node, rendered as
/// `DescribeTable` by `LogicalPlan::display`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DescribeTable {
    /// An Arrow schema (presumably that of the table being described;
    /// confirm with the code that builds this node).
    pub schema: Arc<Schema>,
    /// A `DFSchema` (presumably the schema of the rows this node outputs;
    /// confirm with the code that builds this node).
    pub output_schema: DFSchemaRef,
}
2936
/// `DescribeTable` values are never ordered relative to each other: the
/// comparison always returns `None`, even for equal values.
impl PartialOrd for DescribeTable {
    fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
        // No field is compared.
        None
    }
}
2945
/// Output formats that can be requested for an explain.
///
/// Parsed case-insensitively from a string by the `FromStr` impl.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExplainFormat {
    /// Parsed from `indent`.
    Indent,
    /// Parsed from `tree`.
    Tree,
    /// Parsed from `pgjson`.
    PostgresJSON,
    /// Parsed from `graphviz`.
    Graphviz,
}
3076
3077impl FromStr for ExplainFormat {
3079 type Err = DataFusionError;
3080
3081 fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
3082 match format.to_lowercase().as_str() {
3083 "indent" => Ok(ExplainFormat::Indent),
3084 "tree" => Ok(ExplainFormat::Tree),
3085 "pgjson" => Ok(ExplainFormat::PostgresJSON),
3086 "graphviz" => Ok(ExplainFormat::Graphviz),
3087 _ => {
3088 plan_err!("Invalid explain format. Expected 'indent', 'tree', 'pgjson' or 'graphviz'. Got '{format}'")
3089 }
3090 }
3091 }
3092}
3093
/// Logical plan node for `EXPLAIN`: wraps a plan together with its
/// stringified renderings.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Explain {
    /// Verbosity flag (presumably requests more detailed output — confirm
    /// with the code that consumes it).
    pub verbose: bool,
    /// Requested output format. Not considered by the `PartialOrd` impl.
    pub explain_format: ExplainFormat,
    /// The plan being explained.
    pub plan: Arc<LogicalPlan>,
    /// Stringified versions of the plan.
    pub stringified_plans: Vec<StringifiedPlan>,
    /// Schema of this node's output. Not considered by the `PartialOrd` impl.
    pub schema: DFSchemaRef,
    /// Whether logical optimization succeeded (as recorded by the creator of
    /// this node).
    pub logical_optimization_succeeded: bool,
}
3116
3117impl PartialOrd for Explain {
3119 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3120 #[derive(PartialEq, PartialOrd)]
3121 struct ComparableExplain<'a> {
3122 pub verbose: &'a bool,
3124 pub plan: &'a Arc<LogicalPlan>,
3126 pub stringified_plans: &'a Vec<StringifiedPlan>,
3128 pub logical_optimization_succeeded: &'a bool,
3130 }
3131 let comparable_self = ComparableExplain {
3132 verbose: &self.verbose,
3133 plan: &self.plan,
3134 stringified_plans: &self.stringified_plans,
3135 logical_optimization_succeeded: &self.logical_optimization_succeeded,
3136 };
3137 let comparable_other = ComparableExplain {
3138 verbose: &other.verbose,
3139 plan: &other.plan,
3140 stringified_plans: &other.stringified_plans,
3141 logical_optimization_succeeded: &other.logical_optimization_succeeded,
3142 };
3143 comparable_self.partial_cmp(&comparable_other)
3144 }
3145}
3146
/// Logical plan node that wraps an input plan for analysis.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Analyze {
    /// Verbosity flag (presumably requests more detailed output — confirm
    /// with the code that consumes it).
    pub verbose: bool,
    /// The plan being analyzed.
    pub input: Arc<LogicalPlan>,
    /// Schema of this node's output. Not considered by the `PartialOrd` impl.
    pub schema: DFSchemaRef,
}
3158
3159impl PartialOrd for Analyze {
3161 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3162 match self.verbose.partial_cmp(&other.verbose) {
3163 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
3164 cmp => cmp,
3165 }
3166 }
3167}
3168
/// Logical plan node that wraps a user-defined logical node.
// `Hash` is derived while `PartialEq` is written by hand further down in
// this file, which is what the clippy lint silenced here warns about.
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Eq, Hash)]
pub struct Extension {
    /// The user-defined node itself.
    pub node: Arc<dyn UserDefinedLogicalNode>,
}
3179
3180impl PartialEq for Extension {
3184 fn eq(&self, other: &Self) -> bool {
3185 self.node.eq(&other.node)
3186 }
3187}
3188
3189impl PartialOrd for Extension {
3190 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3191 self.node.partial_cmp(&other.node)
3192 }
3193}
3194
/// Logical plan node that skips and/or caps the rows of its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Limit {
    /// `OFFSET` expression; `None` is treated as an offset of zero by
    /// `get_skip_type`.
    pub skip: Option<Box<Expr>>,
    /// `LIMIT` expression; `None` is reported as `FetchType::Literal(None)`
    /// by `get_fetch_type`.
    pub fetch: Option<Box<Expr>>,
    /// The plan whose rows are limited.
    pub input: Arc<LogicalPlan>,
}
3206
/// Classification of a `Limit`'s `skip` expression, as returned by
/// `Limit::get_skip_type`.
pub enum SkipType {
    /// The skip is a known, non-negative row count.
    Literal(usize),
    /// The skip expression is not an `Int64` literal.
    UnsupportedExpr,
}
3214
/// Classification of a `Limit`'s `fetch` expression, as returned by
/// `Limit::get_fetch_type`.
pub enum FetchType {
    /// The fetch is a known, non-negative row count, or `None` when the
    /// fetch is absent or a NULL `Int64` literal.
    Literal(Option<usize>),
    /// The fetch expression is not an `Int64` literal.
    UnsupportedExpr,
}
3223
3224impl Limit {
3225 pub fn get_skip_type(&self) -> Result<SkipType> {
3227 match self.skip.as_deref() {
3228 Some(expr) => match *expr {
3229 Expr::Literal(ScalarValue::Int64(s), _) => {
3230 let s = s.unwrap_or(0);
3232 if s >= 0 {
3233 Ok(SkipType::Literal(s as usize))
3234 } else {
3235 plan_err!("OFFSET must be >=0, '{}' was provided", s)
3236 }
3237 }
3238 _ => Ok(SkipType::UnsupportedExpr),
3239 },
3240 None => Ok(SkipType::Literal(0)),
3242 }
3243 }
3244
3245 pub fn get_fetch_type(&self) -> Result<FetchType> {
3247 match self.fetch.as_deref() {
3248 Some(expr) => match *expr {
3249 Expr::Literal(ScalarValue::Int64(Some(s)), _) => {
3250 if s >= 0 {
3251 Ok(FetchType::Literal(Some(s as usize)))
3252 } else {
3253 plan_err!("LIMIT must be >= 0, '{}' was provided", s)
3254 }
3255 }
3256 Expr::Literal(ScalarValue::Int64(None), _) => {
3257 Ok(FetchType::Literal(None))
3258 }
3259 _ => Ok(FetchType::UnsupportedExpr),
3260 },
3261 None => Ok(FetchType::Literal(None)),
3262 }
3263 }
3264}
3265
/// Logical plan node for duplicate removal.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Distinct {
    /// Plain distinct over the wrapped input plan.
    All(Arc<LogicalPlan>),
    /// `DISTINCT ON` variant; see `DistinctOn`.
    On(DistinctOn),
}
3274
3275impl Distinct {
3276 pub fn input(&self) -> &Arc<LogicalPlan> {
3278 match self {
3279 Distinct::All(input) => input,
3280 Distinct::On(DistinctOn { input, .. }) => input,
3281 }
3282 }
3283}
3284
/// Logical plan node for `SELECT DISTINCT ON`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DistinctOn {
    /// The `ON` expressions; `try_new` rejects an empty list.
    pub on_expr: Vec<Expr>,
    /// The selected expressions; the output schema is derived from these.
    pub select_expr: Vec<Expr>,
    /// Optional `ORDER BY` expressions. When set through `with_sort_expr`,
    /// its leading entries match `on_expr`.
    pub sort_expr: Option<Vec<SortExpr>>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Schema of this node's output. Not considered by the `PartialOrd` impl.
    pub schema: DFSchemaRef,
}
3301
3302impl DistinctOn {
3303 pub fn try_new(
3305 on_expr: Vec<Expr>,
3306 select_expr: Vec<Expr>,
3307 sort_expr: Option<Vec<SortExpr>>,
3308 input: Arc<LogicalPlan>,
3309 ) -> Result<Self> {
3310 if on_expr.is_empty() {
3311 return plan_err!("No `ON` expressions provided");
3312 }
3313
3314 let on_expr = normalize_cols(on_expr, input.as_ref())?;
3315 let qualified_fields = exprlist_to_fields(select_expr.as_slice(), &input)?
3316 .into_iter()
3317 .collect();
3318
3319 let dfschema = DFSchema::new_with_metadata(
3320 qualified_fields,
3321 input.schema().metadata().clone(),
3322 )?;
3323
3324 let mut distinct_on = DistinctOn {
3325 on_expr,
3326 select_expr,
3327 sort_expr: None,
3328 input,
3329 schema: Arc::new(dfschema),
3330 };
3331
3332 if let Some(sort_expr) = sort_expr {
3333 distinct_on = distinct_on.with_sort_expr(sort_expr)?;
3334 }
3335
3336 Ok(distinct_on)
3337 }
3338
3339 pub fn with_sort_expr(mut self, sort_expr: Vec<SortExpr>) -> Result<Self> {
3343 let sort_expr = normalize_sorts(sort_expr, self.input.as_ref())?;
3344
3345 let mut matched = true;
3347 for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) {
3348 if on != &sort.expr {
3349 matched = false;
3350 break;
3351 }
3352 }
3353
3354 if self.on_expr.len() > sort_expr.len() || !matched {
3355 return plan_err!(
3356 "SELECT DISTINCT ON expressions must match initial ORDER BY expressions"
3357 );
3358 }
3359
3360 self.sort_expr = Some(sort_expr);
3361 Ok(self)
3362 }
3363}
3364
3365impl PartialOrd for DistinctOn {
3367 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3368 #[derive(PartialEq, PartialOrd)]
3369 struct ComparableDistinctOn<'a> {
3370 pub on_expr: &'a Vec<Expr>,
3372 pub select_expr: &'a Vec<Expr>,
3374 pub sort_expr: &'a Option<Vec<SortExpr>>,
3378 pub input: &'a Arc<LogicalPlan>,
3380 }
3381 let comparable_self = ComparableDistinctOn {
3382 on_expr: &self.on_expr,
3383 select_expr: &self.select_expr,
3384 sort_expr: &self.sort_expr,
3385 input: &self.input,
3386 };
3387 let comparable_other = ComparableDistinctOn {
3388 on_expr: &other.on_expr,
3389 select_expr: &other.select_expr,
3390 sort_expr: &other.sort_expr,
3391 input: &other.input,
3392 };
3393 comparable_self.partial_cmp(&comparable_other)
3394 }
3395}
3396
/// Logical plan node that aggregates its input by a list of grouping
/// expressions and computes a list of aggregate expressions.
///
/// Marked `#[non_exhaustive]`; see `Aggregate::try_new` and
/// `Aggregate::try_new_with_schema` for construction.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct Aggregate {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Grouping expressions.
    pub group_expr: Vec<Expr>,
    /// Aggregate expressions.
    pub aggr_expr: Vec<Expr>,
    /// Schema of this node's output. Not considered by the `PartialOrd` impl.
    pub schema: DFSchemaRef,
}
3422
3423impl Aggregate {
3424 pub fn try_new(
3426 input: Arc<LogicalPlan>,
3427 group_expr: Vec<Expr>,
3428 aggr_expr: Vec<Expr>,
3429 ) -> Result<Self> {
3430 let group_expr = enumerate_grouping_sets(group_expr)?;
3431
3432 let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
3433
3434 let grouping_expr: Vec<&Expr> = grouping_set_to_exprlist(group_expr.as_slice())?;
3435
3436 let mut qualified_fields = exprlist_to_fields(grouping_expr, &input)?;
3437
3438 if is_grouping_set {
3440 qualified_fields = qualified_fields
3441 .into_iter()
3442 .map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
3443 .collect::<Vec<_>>();
3444 qualified_fields.push((
3445 None,
3446 Field::new(
3447 Self::INTERNAL_GROUPING_ID,
3448 Self::grouping_id_type(qualified_fields.len()),
3449 false,
3450 )
3451 .into(),
3452 ));
3453 }
3454
3455 qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), &input)?);
3456
3457 let schema = DFSchema::new_with_metadata(
3458 qualified_fields,
3459 input.schema().metadata().clone(),
3460 )?;
3461
3462 Self::try_new_with_schema(input, group_expr, aggr_expr, Arc::new(schema))
3463 }
3464
3465 pub fn try_new_with_schema(
3471 input: Arc<LogicalPlan>,
3472 group_expr: Vec<Expr>,
3473 aggr_expr: Vec<Expr>,
3474 schema: DFSchemaRef,
3475 ) -> Result<Self> {
3476 if group_expr.is_empty() && aggr_expr.is_empty() {
3477 return plan_err!(
3478 "Aggregate requires at least one grouping or aggregate expression"
3479 );
3480 }
3481 let group_expr_count = grouping_set_expr_count(&group_expr)?;
3482 if schema.fields().len() != group_expr_count + aggr_expr.len() {
3483 return plan_err!(
3484 "Aggregate schema has wrong number of fields. Expected {} got {}",
3485 group_expr_count + aggr_expr.len(),
3486 schema.fields().len()
3487 );
3488 }
3489
3490 let aggregate_func_dependencies =
3491 calc_func_dependencies_for_aggregate(&group_expr, &input, &schema)?;
3492 let new_schema = schema.as_ref().clone();
3493 let schema = Arc::new(
3494 new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
3495 );
3496 Ok(Self {
3497 input,
3498 group_expr,
3499 aggr_expr,
3500 schema,
3501 })
3502 }
3503
3504 fn is_grouping_set(&self) -> bool {
3505 matches!(self.group_expr.as_slice(), [Expr::GroupingSet(_)])
3506 }
3507
3508 fn output_expressions(&self) -> Result<Vec<&Expr>> {
3510 static INTERNAL_ID_EXPR: LazyLock<Expr> = LazyLock::new(|| {
3511 Expr::Column(Column::from_name(Aggregate::INTERNAL_GROUPING_ID))
3512 });
3513 let mut exprs = grouping_set_to_exprlist(self.group_expr.as_slice())?;
3514 if self.is_grouping_set() {
3515 exprs.push(&INTERNAL_ID_EXPR);
3516 }
3517 exprs.extend(self.aggr_expr.iter());
3518 debug_assert!(exprs.len() == self.schema.fields().len());
3519 Ok(exprs)
3520 }
3521
3522 pub fn group_expr_len(&self) -> Result<usize> {
3526 grouping_set_expr_count(&self.group_expr)
3527 }
3528
3529 pub fn grouping_id_type(group_exprs: usize) -> DataType {
3534 if group_exprs <= 8 {
3535 DataType::UInt8
3536 } else if group_exprs <= 16 {
3537 DataType::UInt16
3538 } else if group_exprs <= 32 {
3539 DataType::UInt32
3540 } else {
3541 DataType::UInt64
3542 }
3543 }
3544
3545 pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
3563}
3564
3565impl PartialOrd for Aggregate {
3567 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3568 match self.input.partial_cmp(&other.input) {
3569 Some(Ordering::Equal) => {
3570 match self.group_expr.partial_cmp(&other.group_expr) {
3571 Some(Ordering::Equal) => self.aggr_expr.partial_cmp(&other.aggr_expr),
3572 cmp => cmp,
3573 }
3574 }
3575 cmp => cmp,
3576 }
3577 }
3578}
3579
3580fn contains_grouping_set(group_expr: &[Expr]) -> bool {
3582 group_expr
3583 .iter()
3584 .any(|expr| matches!(expr, Expr::GroupingSet(_)))
3585}
3586
3587fn calc_func_dependencies_for_aggregate(
3589 group_expr: &[Expr],
3591 input: &LogicalPlan,
3593 aggr_schema: &DFSchema,
3595) -> Result<FunctionalDependencies> {
3596 if !contains_grouping_set(group_expr) {
3602 let group_by_expr_names = group_expr
3603 .iter()
3604 .map(|item| item.schema_name().to_string())
3605 .collect::<IndexSet<_>>()
3606 .into_iter()
3607 .collect::<Vec<_>>();
3608 let aggregate_func_dependencies = aggregate_functional_dependencies(
3609 input.schema(),
3610 &group_by_expr_names,
3611 aggr_schema,
3612 );
3613 Ok(aggregate_func_dependencies)
3614 } else {
3615 Ok(FunctionalDependencies::empty())
3616 }
3617}
3618
3619fn calc_func_dependencies_for_project(
3622 exprs: &[Expr],
3623 input: &LogicalPlan,
3624) -> Result<FunctionalDependencies> {
3625 let input_fields = input.schema().field_names();
3626 let proj_indices = exprs
3628 .iter()
3629 .map(|expr| match expr {
3630 #[expect(deprecated)]
3631 Expr::Wildcard { qualifier, options } => {
3632 let wildcard_fields = exprlist_to_fields(
3633 vec![&Expr::Wildcard {
3634 qualifier: qualifier.clone(),
3635 options: options.clone(),
3636 }],
3637 input,
3638 )?;
3639 Ok::<_, DataFusionError>(
3640 wildcard_fields
3641 .into_iter()
3642 .filter_map(|(qualifier, f)| {
3643 let flat_name = qualifier
3644 .map(|t| format!("{}.{}", t, f.name()))
3645 .unwrap_or_else(|| f.name().clone());
3646 input_fields.iter().position(|item| *item == flat_name)
3647 })
3648 .collect::<Vec<_>>(),
3649 )
3650 }
3651 Expr::Alias(alias) => {
3652 let name = format!("{}", alias.expr);
3653 Ok(input_fields
3654 .iter()
3655 .position(|item| *item == name)
3656 .map(|i| vec![i])
3657 .unwrap_or(vec![]))
3658 }
3659 _ => {
3660 let name = format!("{expr}");
3661 Ok(input_fields
3662 .iter()
3663 .position(|item| *item == name)
3664 .map(|i| vec![i])
3665 .unwrap_or(vec![]))
3666 }
3667 })
3668 .collect::<Result<Vec<_>>>()?
3669 .into_iter()
3670 .flatten()
3671 .collect::<Vec<_>>();
3672
3673 Ok(input
3674 .schema()
3675 .functional_dependencies()
3676 .project_functional_dependencies(&proj_indices, exprs.len()))
3677}
3678
/// Logical plan node that sorts its input.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Sort {
    /// The sort expressions.
    pub expr: Vec<SortExpr>,
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Optional row count (presumably a limit applied with the sort —
    /// confirm with the code that consumes it).
    pub fetch: Option<usize>,
}
3689
/// Logical plan node that joins two inputs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Join {
    /// Left input plan.
    pub left: Arc<LogicalPlan>,
    /// Right input plan.
    pub right: Arc<LogicalPlan>,
    /// Pairs of (left, right) join key expressions.
    pub on: Vec<(Expr, Expr)>,
    /// Optional additional filter expression.
    pub filter: Option<Expr>,
    /// The join type; together with the input schemas it determines the
    /// output schema.
    pub join_type: JoinType,
    /// The join constraint.
    pub join_constraint: JoinConstraint,
    /// Schema of this node's output. Not considered by the `PartialOrd` impl.
    pub schema: DFSchemaRef,
    /// NOTE(review): presumably controls whether NULL keys compare equal to
    /// each other — confirm with the join implementation.
    pub null_equals_null: bool,
}
3710
3711impl Join {
3712 pub fn try_new(
3731 left: Arc<LogicalPlan>,
3732 right: Arc<LogicalPlan>,
3733 on: Vec<(Expr, Expr)>,
3734 filter: Option<Expr>,
3735 join_type: JoinType,
3736 join_constraint: JoinConstraint,
3737 null_equals_null: bool,
3738 ) -> Result<Self> {
3739 let join_schema = build_join_schema(left.schema(), right.schema(), &join_type)?;
3740
3741 Ok(Join {
3742 left,
3743 right,
3744 on,
3745 filter,
3746 join_type,
3747 join_constraint,
3748 schema: Arc::new(join_schema),
3749 null_equals_null,
3750 })
3751 }
3752
3753 pub fn try_new_with_project_input(
3755 original: &LogicalPlan,
3756 left: Arc<LogicalPlan>,
3757 right: Arc<LogicalPlan>,
3758 column_on: (Vec<Column>, Vec<Column>),
3759 ) -> Result<Self> {
3760 let original_join = match original {
3761 LogicalPlan::Join(join) => join,
3762 _ => return plan_err!("Could not create join with project input"),
3763 };
3764
3765 let on: Vec<(Expr, Expr)> = column_on
3766 .0
3767 .into_iter()
3768 .zip(column_on.1)
3769 .map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
3770 .collect();
3771 let join_schema =
3772 build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;
3773
3774 Ok(Join {
3775 left,
3776 right,
3777 on,
3778 filter: original_join.filter.clone(),
3779 join_type: original_join.join_type,
3780 join_constraint: original_join.join_constraint,
3781 schema: Arc::new(join_schema),
3782 null_equals_null: original_join.null_equals_null,
3783 })
3784 }
3785}
3786
3787impl PartialOrd for Join {
3789 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3790 #[derive(PartialEq, PartialOrd)]
3791 struct ComparableJoin<'a> {
3792 pub left: &'a Arc<LogicalPlan>,
3794 pub right: &'a Arc<LogicalPlan>,
3796 pub on: &'a Vec<(Expr, Expr)>,
3798 pub filter: &'a Option<Expr>,
3800 pub join_type: &'a JoinType,
3802 pub join_constraint: &'a JoinConstraint,
3804 pub null_equals_null: &'a bool,
3806 }
3807 let comparable_self = ComparableJoin {
3808 left: &self.left,
3809 right: &self.right,
3810 on: &self.on,
3811 filter: &self.filter,
3812 join_type: &self.join_type,
3813 join_constraint: &self.join_constraint,
3814 null_equals_null: &self.null_equals_null,
3815 };
3816 let comparable_other = ComparableJoin {
3817 left: &other.left,
3818 right: &other.right,
3819 on: &other.on,
3820 filter: &other.filter,
3821 join_type: &other.join_type,
3822 join_constraint: &other.join_constraint,
3823 null_equals_null: &other.null_equals_null,
3824 };
3825 comparable_self.partial_cmp(&comparable_other)
3826 }
3827}
3828
/// A subquery: a logical plan plus the outer-reference expressions it uses.
///
/// `Debug` is implemented by hand elsewhere in this file rather than derived.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Subquery {
    /// The plan of the subquery.
    pub subquery: Arc<LogicalPlan>,
    /// Expressions referring to columns of the outer query (presumably —
    /// inferred from the field name; confirm with the planner).
    pub outer_ref_columns: Vec<Expr>,
    /// Source spans attached to this subquery.
    pub spans: Spans,
}
3839
impl Normalizeable for Subquery {
    /// Always reports `false` for subqueries.
    fn can_normalize(&self) -> bool {
        false
    }
}
3845
3846impl NormalizeEq for Subquery {
3847 fn normalize_eq(&self, other: &Self) -> bool {
3848 *self.subquery == *other.subquery
3850 && self.outer_ref_columns.len() == other.outer_ref_columns.len()
3851 && self
3852 .outer_ref_columns
3853 .iter()
3854 .zip(other.outer_ref_columns.iter())
3855 .all(|(a, b)| a.normalize_eq(b))
3856 }
3857}
3858
3859impl Subquery {
3860 pub fn try_from_expr(plan: &Expr) -> Result<&Subquery> {
3861 match plan {
3862 Expr::ScalarSubquery(it) => Ok(it),
3863 Expr::Cast(cast) => Subquery::try_from_expr(cast.expr.as_ref()),
3864 _ => plan_err!("Could not coerce into ScalarSubquery!"),
3865 }
3866 }
3867
3868 pub fn with_plan(&self, plan: Arc<LogicalPlan>) -> Subquery {
3869 Subquery {
3870 subquery: plan,
3871 outer_ref_columns: self.outer_ref_columns.clone(),
3872 spans: Spans::new(),
3873 }
3874 }
3875}
3876
3877impl Debug for Subquery {
3878 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3879 write!(f, "<subquery>")
3880 }
3881}
3882
/// Logical partitioning schemes.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum Partitioning {
    /// Round-robin distribution; the `usize` is presumably the number of
    /// partitions — confirm with the consumer.
    RoundRobinBatch(usize),
    /// Hash partitioning on the given expressions; the `usize` is presumably
    /// the number of partitions — confirm with the consumer.
    Hash(Vec<Expr>, usize),
    /// Distribution by the given expressions.
    DistributeBy(Vec<Expr>),
}
3898
/// An output column of a list unnest together with its depth.
///
/// Displayed as `<output_column>|depth=<depth>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
pub struct ColumnUnnestList {
    /// The column produced by the unnest.
    pub output_column: Column,
    /// Unnest depth (presumably the number of list levels unnested —
    /// confirm with the unnest planner).
    pub depth: usize,
}
3923
3924impl Display for ColumnUnnestList {
3925 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3926 write!(f, "{}|depth={}", self.output_column, self.depth)
3927 }
3928}
3929
/// Logical plan node that unnests columns of its input.
///
/// NOTE(review): the field descriptions below are inferred from names and
/// types only; confirm against the code that builds this node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Unnest {
    /// The input plan.
    pub input: Arc<LogicalPlan>,
    /// Columns handed to the unnest execution (presumably).
    pub exec_columns: Vec<Column>,
    /// Pairs of an index and the list-unnest description for that index
    /// (presumably an index into the input columns).
    pub list_type_columns: Vec<(usize, ColumnUnnestList)>,
    /// Indices of struct-typed columns to unnest (presumably).
    pub struct_type_columns: Vec<usize>,
    /// Dependency indices (presumably mapping output columns back to input
    /// columns).
    pub dependency_indices: Vec<usize>,
    /// Schema of this node's output. Not considered by the `PartialOrd` impl.
    pub schema: DFSchemaRef,
    /// Options controlling the unnest.
    pub options: UnnestOptions,
}
3952
3953impl PartialOrd for Unnest {
3955 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
3956 #[derive(PartialEq, PartialOrd)]
3957 struct ComparableUnnest<'a> {
3958 pub input: &'a Arc<LogicalPlan>,
3960 pub exec_columns: &'a Vec<Column>,
3962 pub list_type_columns: &'a Vec<(usize, ColumnUnnestList)>,
3965 pub struct_type_columns: &'a Vec<usize>,
3968 pub dependency_indices: &'a Vec<usize>,
3971 pub options: &'a UnnestOptions,
3973 }
3974 let comparable_self = ComparableUnnest {
3975 input: &self.input,
3976 exec_columns: &self.exec_columns,
3977 list_type_columns: &self.list_type_columns,
3978 struct_type_columns: &self.struct_type_columns,
3979 dependency_indices: &self.dependency_indices,
3980 options: &self.options,
3981 };
3982 let comparable_other = ComparableUnnest {
3983 input: &other.input,
3984 exec_columns: &other.exec_columns,
3985 list_type_columns: &other.list_type_columns,
3986 struct_type_columns: &other.struct_type_columns,
3987 dependency_indices: &other.dependency_indices,
3988 options: &other.options,
3989 };
3990 comparable_self.partial_cmp(&comparable_other)
3991 }
3992}
3993
3994#[cfg(test)]
3995mod tests {
3996
3997 use super::*;
3998 use crate::builder::LogicalTableSource;
3999 use crate::logical_plan::table_scan;
4000 use crate::{
4001 binary_expr, col, exists, in_subquery, lit, placeholder, scalar_subquery,
4002 GroupingSet,
4003 };
4004
4005 use datafusion_common::tree_node::{
4006 TransformedResult, TreeNodeRewriter, TreeNodeVisitor,
4007 };
4008 use datafusion_common::{not_impl_err, Constraint, ScalarValue};
4009 use insta::{assert_debug_snapshot, assert_snapshot};
4010
4011 use crate::test::function_stub::count;
4012
4013 fn employee_schema() -> Schema {
4014 Schema::new(vec![
4015 Field::new("id", DataType::Int32, false),
4016 Field::new("first_name", DataType::Utf8, false),
4017 Field::new("last_name", DataType::Utf8, false),
4018 Field::new("state", DataType::Utf8, false),
4019 Field::new("salary", DataType::Int32, false),
4020 ])
4021 }
4022
4023 fn display_plan() -> Result<LogicalPlan> {
4024 let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
4025 .build()?;
4026
4027 table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
4028 .filter(in_subquery(col("state"), Arc::new(plan1)))?
4029 .project(vec![col("id")])?
4030 .build()
4031 }
4032
    /// Checks the indented rendering of the shared display plan, including
    /// the nested subquery.
    #[test]
    fn test_display_indent() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent(), @r"
        Projection: employee_csv.id
          Filter: employee_csv.state IN (<subquery>)
            Subquery:
              TableScan: employee_csv projection=[state]
            TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4046
    /// Checks the indented rendering with each node's schema appended.
    #[test]
    fn test_display_indent_schema() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_indent_schema(), @r"
        Projection: employee_csv.id [id:Int32]
          Filter: employee_csv.state IN (<subquery>) [id:Int32, state:Utf8]
            Subquery: [state:Utf8]
              TableScan: employee_csv projection=[state] [state:Utf8]
            TableScan: employee_csv projection=[id, state] [id:Int32, state:Utf8]
        ");
        Ok(())
    }
4060
    /// Checks the indented rendering of a projection that contains an
    /// aliased `EXISTS` subquery expression.
    #[test]
    fn test_display_subquery_alias() -> Result<()> {
        let plan1 = table_scan(Some("employee_csv"), &employee_schema(), Some(vec![3]))?
            .build()?;
        let plan1 = Arc::new(plan1);

        let plan =
            table_scan(Some("employee_csv"), &employee_schema(), Some(vec![0, 3]))?
                .project(vec![col("id"), exists(plan1).alias("exists")])?
                .build();

        assert_snapshot!(plan?.display_indent(), @r"
        Projection: employee_csv.id, EXISTS (<subquery>) AS exists
          Subquery:
            TableScan: employee_csv projection=[state]
          TableScan: employee_csv projection=[id, state]
        ");
        Ok(())
    }
4080
    /// Checks the GraphViz rendering of the shared display plan: one cluster
    /// for the plain plan and one for the plan with schemas.
    #[test]
    fn test_display_graphviz() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_graphviz(), @r#"
        // Begin DataFusion GraphViz Plan,
        // display it online here: https://dreampuf.github.io/GraphvizOnline

        digraph {
          subgraph cluster_1
          {
            graph[label="LogicalPlan"]
            2[shape=box label="Projection: employee_csv.id"]
            3[shape=box label="Filter: employee_csv.state IN (<subquery>)"]
            2 -> 3 [arrowhead=none, arrowtail=normal, dir=back]
            4[shape=box label="Subquery:"]
            3 -> 4 [arrowhead=none, arrowtail=normal, dir=back]
            5[shape=box label="TableScan: employee_csv projection=[state]"]
            4 -> 5 [arrowhead=none, arrowtail=normal, dir=back]
            6[shape=box label="TableScan: employee_csv projection=[id, state]"]
            3 -> 6 [arrowhead=none, arrowtail=normal, dir=back]
          }
          subgraph cluster_7
          {
            graph[label="Detailed LogicalPlan"]
            8[shape=box label="Projection: employee_csv.id\nSchema: [id:Int32]"]
            9[shape=box label="Filter: employee_csv.state IN (<subquery>)\nSchema: [id:Int32, state:Utf8]"]
            8 -> 9 [arrowhead=none, arrowtail=normal, dir=back]
            10[shape=box label="Subquery:\nSchema: [state:Utf8]"]
            9 -> 10 [arrowhead=none, arrowtail=normal, dir=back]
            11[shape=box label="TableScan: employee_csv projection=[state]\nSchema: [state:Utf8]"]
            10 -> 11 [arrowhead=none, arrowtail=normal, dir=back]
            12[shape=box label="TableScan: employee_csv projection=[id, state]\nSchema: [id:Int32, state:Utf8]"]
            9 -> 12 [arrowhead=none, arrowtail=normal, dir=back]
          }
        }
        // End DataFusion GraphViz Plan
        "#);
        Ok(())
    }
4123
    /// Checks the Postgres-style JSON rendering of the shared display plan.
    #[test]
    fn test_display_pg_json() -> Result<()> {
        let plan = display_plan()?;

        assert_snapshot!(plan.display_pg_json(), @r#"
        [
          {
            "Plan": {
              "Expressions": [
                "employee_csv.id"
              ],
              "Node Type": "Projection",
              "Output": [
                "id"
              ],
              "Plans": [
                {
                  "Condition": "employee_csv.state IN (<subquery>)",
                  "Node Type": "Filter",
                  "Output": [
                    "id",
                    "state"
                  ],
                  "Plans": [
                    {
                      "Node Type": "Subquery",
                      "Output": [
                        "state"
                      ],
                      "Plans": [
                        {
                          "Node Type": "TableScan",
                          "Output": [
                            "state"
                          ],
                          "Plans": [],
                          "Relation Name": "employee_csv"
                        }
                      ]
                    },
                    {
                      "Node Type": "TableScan",
                      "Output": [
                        "id",
                        "state"
                      ],
                      "Plans": [],
                      "Relation Name": "employee_csv"
                    }
                  ]
                }
              ]
            }
          }
        ]
        "#);
        Ok(())
    }
4182
    /// Test visitor that records a label for every node it enters and
    /// leaves, in visiting order.
    #[derive(Debug, Default)]
    struct OkVisitor {
        /// The recorded `pre_visit …` / `post_visit …` labels.
        strings: Vec<String>,
    }
4188
4189 impl<'n> TreeNodeVisitor<'n> for OkVisitor {
4190 type Node = LogicalPlan;
4191
4192 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4193 let s = match plan {
4194 LogicalPlan::Projection { .. } => "pre_visit Projection",
4195 LogicalPlan::Filter { .. } => "pre_visit Filter",
4196 LogicalPlan::TableScan { .. } => "pre_visit TableScan",
4197 _ => {
4198 return not_impl_err!("unknown plan type");
4199 }
4200 };
4201
4202 self.strings.push(s.into());
4203 Ok(TreeNodeRecursion::Continue)
4204 }
4205
4206 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4207 let s = match plan {
4208 LogicalPlan::Projection { .. } => "post_visit Projection",
4209 LogicalPlan::Filter { .. } => "post_visit Filter",
4210 LogicalPlan::TableScan { .. } => "post_visit TableScan",
4211 _ => {
4212 return not_impl_err!("unknown plan type");
4213 }
4214 };
4215
4216 self.strings.push(s.into());
4217 Ok(TreeNodeRecursion::Continue)
4218 }
4219 }
4220
    /// A full traversal enters nodes top-down and leaves them bottom-up.
    #[test]
    fn visit_order() {
        let mut visitor = OkVisitor::default();
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(visitor.strings, @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
            "post_visit Filter",
            "post_visit Projection",
        ]
        "#);
    }
4239
4240 #[derive(Debug, Default)]
4241 struct OptionalCounter {
4243 val: Option<usize>,
4244 }
4245
4246 impl OptionalCounter {
4247 fn new(val: usize) -> Self {
4248 Self { val: Some(val) }
4249 }
4250 fn dec(&mut self) -> bool {
4252 if Some(0) == self.val {
4253 true
4254 } else {
4255 self.val = self.val.take().map(|i| i - 1);
4256 false
4257 }
4258 }
4259 }
4260
    /// Test visitor that can stop the traversal early from either `f_down`
    /// or `f_up`, and otherwise records labels through an inner `OkVisitor`.
    #[derive(Debug, Default)]
    struct StoppingVisitor {
        /// Records the labels of the nodes that were actually visited.
        inner: OkVisitor,
        /// When this counter fires, `f_down` returns `Stop`.
        return_false_from_pre_in: OptionalCounter,
        /// When this counter fires, `f_up` returns `Stop`.
        return_false_from_post_in: OptionalCounter,
    }
4270
4271 impl<'n> TreeNodeVisitor<'n> for StoppingVisitor {
4272 type Node = LogicalPlan;
4273
4274 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4275 if self.return_false_from_pre_in.dec() {
4276 return Ok(TreeNodeRecursion::Stop);
4277 }
4278 self.inner.f_down(plan)?;
4279
4280 Ok(TreeNodeRecursion::Continue)
4281 }
4282
4283 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4284 if self.return_false_from_post_in.dec() {
4285 return Ok(TreeNodeRecursion::Stop);
4286 }
4287
4288 self.inner.f_up(plan)
4289 }
4290 }
4291
    /// Stopping from `f_down` on the third node leaves only the first two
    /// pre-visit labels recorded.
    #[test]
    fn early_stopping_pre_visit() {
        let mut visitor = StoppingVisitor {
            // The counter fires on the third `f_down` call.
            return_false_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4313
    /// Stopping from `f_up` on the second node leaves all pre-visit labels
    /// and only the first post-visit label recorded.
    #[test]
    fn early_stopping_post_visit() {
        let mut visitor = StoppingVisitor {
            // The counter fires on the second `f_up` call.
            return_false_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor);
        assert!(res.is_ok());

        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4336
    /// Test visitor that can fail from either `f_down` or `f_up`, and
    /// otherwise records labels through an inner `OkVisitor`.
    #[derive(Debug, Default)]
    struct ErrorVisitor {
        /// Records the labels of the nodes that were actually visited.
        inner: OkVisitor,
        /// When this counter fires, `f_down` returns an error.
        return_error_from_pre_in: OptionalCounter,
        /// When this counter fires, `f_up` returns an error.
        return_error_from_post_in: OptionalCounter,
    }
4346
4347 impl<'n> TreeNodeVisitor<'n> for ErrorVisitor {
4348 type Node = LogicalPlan;
4349
4350 fn f_down(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4351 if self.return_error_from_pre_in.dec() {
4352 return not_impl_err!("Error in pre_visit");
4353 }
4354
4355 self.inner.f_down(plan)
4356 }
4357
4358 fn f_up(&mut self, plan: &'n LogicalPlan) -> Result<TreeNodeRecursion> {
4359 if self.return_error_from_post_in.dec() {
4360 return not_impl_err!("Error in post_visit");
4361 }
4362
4363 self.inner.f_up(plan)
4364 }
4365 }
4366
    /// An error raised from `f_down` on the third node is returned to the
    /// caller, with only the first two pre-visit labels recorded.
    #[test]
    fn error_pre_visit() {
        let mut visitor = ErrorVisitor {
            // The counter fires on the third `f_down` call.
            return_error_from_pre_in: OptionalCounter::new(2),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in pre_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
        ]
        "#
        );
    }
4389
    /// An error raised from `f_up` on the second node is returned to the
    /// caller, with all pre-visit labels and only the first post-visit label
    /// recorded.
    #[test]
    fn error_post_visit() {
        let mut visitor = ErrorVisitor {
            // The counter fires on the second `f_up` call.
            return_error_from_post_in: OptionalCounter::new(1),
            ..Default::default()
        };
        let plan = test_plan();
        let res = plan.visit_with_subqueries(&mut visitor).unwrap_err();
        assert_snapshot!(
            res.strip_backtrace(),
            @"This feature is not implemented: Error in post_visit"
        );
        assert_debug_snapshot!(
            visitor.inner.strings,
            @r#"
        [
            "pre_visit Projection",
            "pre_visit Filter",
            "pre_visit TableScan",
            "post_visit TableScan",
        ]
        "#
        );
    }
4414
4415 #[test]
4416 fn projection_expr_schema_mismatch() -> Result<()> {
4417 let empty_schema = Arc::new(DFSchema::empty());
4418 let p = Projection::try_new_with_schema(
4419 vec![col("a")],
4420 Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
4421 produce_one_row: false,
4422 schema: Arc::clone(&empty_schema),
4423 })),
4424 empty_schema,
4425 );
4426 assert_snapshot!(p.unwrap_err().strip_backtrace(), @"Error during planning: Projection has mismatch between number of expressions (1) and number of fields in schema (0)");
4427 Ok(())
4428 }
4429
4430 fn test_plan() -> LogicalPlan {
4431 let schema = Schema::new(vec![
4432 Field::new("id", DataType::Int32, false),
4433 Field::new("state", DataType::Utf8, false),
4434 ]);
4435
4436 table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
4437 .unwrap()
4438 .filter(col("state").eq(lit("CO")))
4439 .unwrap()
4440 .project(vec![col("id")])
4441 .unwrap()
4442 .build()
4443 .unwrap()
4444 }
4445
/// Substituting parameter values must fail when the plan contains a
/// placeholder with one of the identifiers exercised here: the empty string,
/// `$0`, and `$00`.
#[test]
fn test_replace_invalid_placeholder() {
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let param_values = vec![ScalarValue::Int32(Some(42))];

    // Every case builds the same `id = <placeholder>` filter; only the
    // placeholder identifier varies.
    for invalid_id in ["", "$0", "$00"] {
        let plan = table_scan(TableReference::none(), &schema, None)
            .unwrap()
            .filter(col("id").eq(placeholder(invalid_id)))
            .unwrap()
            .build()
            .unwrap();

        plan.replace_params_with_values(&param_values.clone().into())
            .expect_err("unexpectedly succeeded to replace an invalid placeholder");
    }
}
4488
/// Aggregating with `GROUPING SETS ((foo), (bar))` over non-nullable input
/// columns must report both grouping columns as nullable in the output
/// schema.
#[test]
fn test_nullable_schema_after_grouping_set() {
    let input_schema = Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]);
    let grouping_sets =
        GroupingSet::GroupingSets(vec![vec![col("foo")], vec![col("bar")]]);

    let scan = table_scan(TableReference::none(), &input_schema, None).unwrap();
    let plan = scan
        .aggregate(
            vec![Expr::GroupingSet(grouping_sets)],
            vec![count(lit(true))],
        )
        .unwrap()
        .build()
        .unwrap();

    let output_schema = plan.schema();
    for column in ["foo", "bar"] {
        let field = output_schema.field_with_name(None, column).unwrap();
        assert!(field.is_nullable());
    }
}
4520
/// `Filter::is_scalar` must be false for an equality filter on a column with
/// no uniqueness information, and true once the scanned schema carries a
/// unique constraint on that column.
#[test]
fn test_filter_is_scalar() {
    let arrow_schema =
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let source = Arc::new(LogicalTableSource::new(arrow_schema));
    let schema = Arc::new(
        DFSchema::try_from_qualified_schema(
            TableReference::bare("tab"),
            &source.schema(),
        )
        .unwrap(),
    );
    // The single column of the scan; both filters below compare it to 1.
    let column_name = schema.field_names()[0].clone();

    // Case 1: plain schema, no functional dependencies.
    let plain_scan = LogicalPlan::TableScan(TableScan {
        table_name: TableReference::bare("tab"),
        source: Arc::clone(&source) as Arc<dyn TableSource>,
        projection: None,
        projected_schema: Arc::clone(&schema),
        filters: vec![],
        fetch: None,
    });
    let plain_predicate = Expr::Column(column_name.clone().into())
        .eq(Expr::Literal(ScalarValue::Int32(Some(1)), None));
    let plain_filter = Filter::try_new(plain_predicate, Arc::new(plain_scan)).unwrap();
    assert!(!plain_filter.is_scalar());

    // Case 2: same schema, but column 0 is declared unique.
    let constraints = Constraints::new_unverified(vec![Constraint::Unique(vec![0])]);
    let dependencies =
        FunctionalDependencies::new_from_constraints(Some(&constraints), 1);
    let unique_schema = Arc::new(
        schema
            .as_ref()
            .clone()
            .with_functional_dependencies(dependencies)
            .unwrap(),
    );
    let unique_scan = LogicalPlan::TableScan(TableScan {
        table_name: TableReference::bare("tab"),
        source,
        projection: None,
        projected_schema: Arc::clone(&unique_schema),
        filters: vec![],
        fetch: None,
    });
    let unique_predicate = Expr::Column(column_name.into()).eq(lit(1i32));
    let unique_filter =
        Filter::try_new(unique_predicate, Arc::new(unique_scan)).unwrap();
    assert!(unique_filter.is_scalar());
}
4579
/// Transforming a plan rooted at `Explain` must reach the plan being
/// explained: the table scan underneath ends up wrapped in a filter.
#[test]
fn test_transform_explain() {
    let schema = Schema::new(vec![
        Field::new("foo", DataType::Int32, false),
        Field::new("bar", DataType::Int32, false),
    ]);

    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .explain(false, false)
        .unwrap()
        .build()
        .unwrap();

    let external_filter = col("foo").eq(lit(true));

    // Wrap every table scan in `external_filter`; leave other nodes untouched.
    let plan = plan
        .transform(|node| match node {
            LogicalPlan::TableScan(scan) => {
                let filter = Filter::try_new(
                    external_filter.clone(),
                    Arc::new(LogicalPlan::TableScan(scan)),
                )
                .unwrap();
                Ok(Transformed::yes(LogicalPlan::Filter(filter)))
            }
            other => Ok(Transformed::no(other)),
        })
        .data()
        .unwrap();

    // The indented display nests each child two spaces deeper than its
    // parent, so the snapshot must keep that relative indentation.
    let actual = format!("{}", plan.display_indent());
    assert_snapshot!(actual, @r"
    Explain
      Filter: foo = Boolean(true)
        TableScan: ?table?
    ")
}
4620
/// Checks `partial_cmp` on plans: an `EmptyRelation` orders before a
/// `DescribeTable`, and two identically constructed `DescribeTable` plans
/// are not comparable (`None`).
#[test]
fn test_plan_partial_ord() {
    let empty_relation = LogicalPlan::EmptyRelation(EmptyRelation {
        produce_one_row: false,
        schema: Arc::new(DFSchema::empty()),
    });

    // Both DescribeTable plans are built from the same recipe.
    let new_describe_table = || {
        let table_schema =
            Schema::new(vec![Field::new("foo", DataType::Int32, false)]);
        LogicalPlan::DescribeTable(DescribeTable {
            schema: Arc::new(table_schema),
            output_schema: DFSchemaRef::new(DFSchema::empty()),
        })
    };
    let describe_table = new_describe_table();
    let describe_table_clone = new_describe_table();

    let forward = empty_relation.partial_cmp(&describe_table);
    assert_eq!(forward, Some(Ordering::Less));

    let backward = describe_table.partial_cmp(&empty_relation);
    assert_eq!(backward, Some(Ordering::Greater));

    assert_eq!(describe_table.partial_cmp(&describe_table_clone), None);
}
4656
/// Rebuilding a `Limit` from its own expressions and inputs must yield an
/// equal plan for every combination of present/absent `skip` and `fetch`.
#[test]
fn test_limit_with_new_children() {
    let input = Arc::new(LogicalPlan::Values(Values {
        schema: Arc::new(DFSchema::empty()),
        values: vec![vec![]],
    }));

    // Helpers producing the boxed UInt32 literals used for skip/fetch.
    let literal = |value: ScalarValue| Some(Box::new(Expr::Literal(value, None)));
    let one = || literal(ScalarValue::new_one(&DataType::UInt32).unwrap());
    let ten = || literal(ScalarValue::new_ten(&DataType::UInt32).unwrap());

    // (skip, fetch) pairs: neither, fetch only, skip only, both.
    let skip_fetch_cases = [
        (None, None),
        (None, ten()),
        (ten(), None),
        (one(), ten()),
    ];

    for (skip, fetch) in skip_fetch_cases {
        let limit = LogicalPlan::Limit(Limit {
            skip,
            fetch,
            input: Arc::clone(&input),
        });
        let rebuilt = limit
            .with_new_exprs(
                limit.expressions(),
                limit.inputs().into_iter().cloned().collect(),
            )
            .unwrap();
        assert_eq!(limit, rebuilt);
    }
}
4708
/// Returning `TreeNodeRecursion::Jump` at the root `Projection` must keep
/// every `*_with_subqueries` traversal from reaching a `Filter`, whether it
/// sits in the projection's input or inside its scalar subquery.
#[test]
fn test_with_subqueries_jump() {
    /// Visitor that jumps at projections and records any filter it sees.
    struct ProjectJumpVisitor {
        filter_found: bool,
    }

    impl<'n> TreeNodeVisitor<'n> for ProjectJumpVisitor {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
            if matches!(node, LogicalPlan::Projection(..)) {
                return Ok(TreeNodeRecursion::Jump);
            }
            self.filter_found |= matches!(node, LogicalPlan::Filter(..));
            Ok(TreeNodeRecursion::Continue)
        }
    }

    /// Rewriter that jumps at projections and records any filter it sees.
    struct ProjectJumpRewriter {
        filter_found: bool,
    }

    impl TreeNodeRewriter for ProjectJumpRewriter {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
            if matches!(node, LogicalPlan::Projection(..)) {
                return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
            }
            self.filter_found |= matches!(node, LogicalPlan::Filter(..));
            Ok(Transformed::no(node))
        }
    }

    // Subquery: scan(sub_id) -> filter(sub_id = 0).
    let subquery_schema =
        Schema::new(vec![Field::new("sub_id", DataType::Int32, false)]);
    let subquery_plan =
        table_scan(TableReference::none(), &subquery_schema, Some(vec![0]))
            .unwrap()
            .filter(col("sub_id").eq(lit(0)))
            .unwrap()
            .build()
            .unwrap();

    // Outer plan: scan(id) -> filter(id = 0) -> project(id, <subquery>).
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let plan = table_scan(TableReference::none(), &schema, Some(vec![0]))
        .unwrap()
        .filter(col("id").eq(lit(0)))
        .unwrap()
        .project(vec![col("id"), scalar_subquery(Arc::new(subquery_plan))])
        .unwrap()
        .build()
        .unwrap();

    // apply_with_subqueries
    let mut filter_found = false;
    plan.apply_with_subqueries(|node| {
        if matches!(node, LogicalPlan::Projection(..)) {
            return Ok(TreeNodeRecursion::Jump);
        }
        filter_found |= matches!(node, LogicalPlan::Filter(..));
        Ok(TreeNodeRecursion::Continue)
    })
    .unwrap();
    assert!(!filter_found);

    // visit_with_subqueries
    let mut visitor = ProjectJumpVisitor {
        filter_found: false,
    };
    plan.visit_with_subqueries(&mut visitor).unwrap();
    assert!(!visitor.filter_found);

    // transform_down_with_subqueries
    let mut filter_found = false;
    plan.clone()
        .transform_down_with_subqueries(|node| {
            if matches!(node, LogicalPlan::Projection(..)) {
                return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump));
            }
            filter_found |= matches!(node, LogicalPlan::Filter(..));
            Ok(Transformed::no(node))
        })
        .unwrap();
    assert!(!filter_found);

    // transform_down_up_with_subqueries
    let mut filter_found = false;
    plan.clone()
        .transform_down_up_with_subqueries(
            |node| {
                if matches!(node, LogicalPlan::Projection(..)) {
                    return Ok(Transformed::new(
                        node,
                        false,
                        TreeNodeRecursion::Jump,
                    ));
                }
                filter_found |= matches!(node, LogicalPlan::Filter(..));
                Ok(Transformed::no(node))
            },
            |node| Ok(Transformed::no(node)),
        )
        .unwrap();
    assert!(!filter_found);

    // rewrite_with_subqueries
    let mut rewriter = ProjectJumpRewriter {
        filter_found: false,
    };
    plan.rewrite_with_subqueries(&mut rewriter).unwrap();
    assert!(!rewriter.filter_found);
}
4846
/// A placeholder (`$1`) compared against a column in a filter must be
/// reported by `get_parameter_types` with no resolved type (`None`).
#[test]
fn test_with_unresolved_placeholders() {
    let field_name = "id";
    let placeholder_value = "$1";
    let schema = Schema::new(vec![Field::new(field_name, DataType::Int32, false)]);

    let plan = table_scan(TableReference::none(), &schema, None)
        .unwrap()
        .filter(col(field_name).eq(placeholder(placeholder_value)))
        .unwrap()
        .build()
        .unwrap();

    let params = plan.get_parameter_types().unwrap();
    assert_eq!(params.len(), 1);

    // Borrow the entry; there is no need to clone the whole map (or the
    // value) just to compare it.
    let parameter_type = params.get(placeholder_value).unwrap();
    assert_eq!(*parameter_type, None);
}
4867
/// `with_new_exprs` on a join must split the flat expression list back into
/// the equi-join pairs (`on`) and the optional trailing filter.
#[test]
fn test_join_with_new_exprs() -> Result<()> {
    /// Builds an inner `ON` join between scans of `t1` and `t2`, each with
    /// non-nullable Int32 columns `a` and `b`.
    fn create_test_join(
        on: Vec<(Expr, Expr)>,
        filter: Option<Expr>,
    ) -> Result<LogicalPlan> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);

        let left_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;
        let right_schema = DFSchema::try_from_qualified_schema("t2", &schema)?;

        let left = table_scan(Some("t1"), left_schema.as_arrow(), None)?.build()?;
        let right = table_scan(Some("t2"), right_schema.as_arrow(), None)?.build()?;
        let joined_schema = left_schema.join(&right_schema)?;

        Ok(LogicalPlan::Join(Join {
            left: Arc::new(left),
            right: Arc::new(right),
            on,
            filter,
            join_type: JoinType::Inner,
            join_constraint: JoinConstraint::On,
            schema: Arc::new(joined_schema),
            null_equals_null: false,
        }))
    }

    /// Rebuilds `plan` with `exprs` and its current inputs, returning the
    /// resulting join node.
    fn rebuild_join(plan: &LogicalPlan, exprs: Vec<Expr>) -> Result<Join> {
        let inputs = plan.inputs().into_iter().cloned().collect();
        let LogicalPlan::Join(join) = plan.with_new_exprs(exprs, inputs)? else {
            unreachable!()
        };
        Ok(join)
    }

    let a_pair = || (col("t1.a"), col("t2.a"));
    let b_pair = || (col("t1.b"), col("t2.b"));

    // Round trips: feeding a join its own expressions must reproduce the
    // same `on` pairs and filter.
    let round_trip_cases = vec![
        (vec![a_pair()], None),
        (vec![], Some(col("t1.a").gt(col("t2.a")))),
        (vec![a_pair()], Some(col("t1.b").gt(col("t2.b")))),
    ];
    for (on, filter) in round_trip_cases {
        let plan = create_test_join(on.clone(), filter.clone())?;
        let join = rebuild_join(&plan, plan.expressions())?;
        assert_eq!(join.on, on);
        assert_eq!(join.filter, filter);
    }

    // Replacement: two pairs plus one extra expression, which becomes the
    // filter.
    {
        let plan = create_test_join(vec![a_pair(), b_pair()], None)?;
        let left_key = binary_expr(col("t1.a"), Operator::Plus, lit(1));
        let right_key = binary_expr(col("t2.a"), Operator::Plus, lit(2));
        let replacement = vec![
            left_key.clone(),
            right_key.clone(),
            col("t1.b"),
            col("t2.b"),
            lit(true),
        ];

        let join = rebuild_join(&plan, replacement)?;
        assert_eq!(join.on, vec![(left_key, right_key), b_pair()]);
        assert_eq!(join.filter, Some(lit(true)));
    }

    Ok(())
}
4973
/// `Join::try_new` must compute the expected output schema (field names and,
/// where checked, nullability) for each join type, and preserve the join
/// keys, filter, type, constraint and null-equality flag it was given.
#[test]
fn test_join_try_new() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let left_scan = Arc::new(table_scan(Some("t1"), &schema, None)?.build()?);
    let right_scan = Arc::new(table_scan(Some("t2"), &schema, None)?.build()?);

    let all_join_types = [
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
        JoinType::LeftSemi,
        JoinType::LeftAnti,
        JoinType::RightSemi,
        JoinType::RightAnti,
        JoinType::LeftMark,
    ];

    for join_type in all_join_types {
        let join = Join::try_new(
            Arc::clone(&left_scan),
            Arc::clone(&right_scan),
            vec![(col("t1.a"), col("t2.a"))],
            Some(col("t1.b").gt(col("t2.b"))),
            join_type,
            JoinConstraint::On,
            false,
        )?;

        // Expected output columns, each paired with its failure message.
        let expected_names = match join_type {
            JoinType::LeftSemi | JoinType::LeftAnti => vec![
                ("a", "First field should be 'a' from left table"),
                ("b", "Second field should be 'b' from left table"),
            ],
            JoinType::RightSemi | JoinType::RightAnti => vec![
                ("a", "First field should be 'a' from right table"),
                ("b", "Second field should be 'b' from right table"),
            ],
            JoinType::LeftMark => vec![
                ("a", "First field should be 'a' from left table"),
                ("b", "Second field should be 'b' from left table"),
                ("mark", "Third field should be the mark column"),
            ],
            _ => vec![
                ("a", "First field should be 'a' from left table"),
                ("b", "Second field should be 'b' from left table"),
                ("a", "Third field should be 'a' from right table"),
                ("b", "Fourth field should be 'b' from right table"),
            ],
        };

        // Expected per-column nullability, for the join types where it is
        // checked at all.
        let expected_nullability = match join_type {
            JoinType::LeftMark => Some(vec![false, false, false]),
            JoinType::Left => Some(vec![false, false, true, true]),
            JoinType::Right => Some(vec![true, true, false, false]),
            JoinType::Full => Some(vec![true, true, true, true]),
            _ => None,
        };

        let fields = join.schema.fields();
        assert_eq!(fields.len(), expected_names.len());
        for (idx, &(name, message)) in expected_names.iter().enumerate() {
            assert_eq!(fields[idx].name(), name, "{message}");
        }
        if let Some(nullability) = expected_nullability {
            for (idx, &nullable) in nullability.iter().enumerate() {
                assert_eq!(fields[idx].is_nullable(), nullable);
            }
        }

        assert_eq!(join.on, vec![(col("t1.a"), col("t2.a"))]);
        assert_eq!(join.filter, Some(col("t1.b").gt(col("t2.b"))));
        assert_eq!(join.join_type, join_type);
        assert_eq!(join.join_constraint, JoinConstraint::On);
        assert!(!join.null_equals_null);
    }

    Ok(())
}
5120
/// `Join::try_new` over inputs that share column names (`id`, `value`) must
/// keep all six columns in order for both `USING` and `ON` constraints, and
/// must carry through the constraint, filter and null-equality flag.
#[test]
fn test_join_try_new_with_using_constraint_and_overlapping_columns() -> Result<()> {
    let left_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Int32, false),
    ]);
    let right_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);

    let left_plan = Arc::new(table_scan(Some("t1"), &left_schema, None)?.build()?);
    let right_plan = Arc::new(table_scan(Some("t2"), &right_schema, None)?.build()?);

    // Expected output columns, each paired with its failure message.
    let expected_fields = [
        ("id", "First field should be 'id' from left table"),
        ("name", "Second field should be 'name' from left table"),
        ("value", "Third field should be 'value' from left table"),
        ("id", "Fourth field should be 'id' from right table"),
        ("category", "Fifth field should be 'category' from right table"),
        ("value", "Sixth field should be 'value' from right table"),
    ];
    let assert_all_fields_present = |join: &Join| {
        let fields = join.schema.fields();
        assert_eq!(fields.len(), 6);
        for (idx, &(name, message)) in expected_fields.iter().enumerate() {
            assert_eq!(fields[idx].name(), name, "{message}");
        }
    };

    // USING constraint on the shared `id` column.
    {
        let join = Join::try_new(
            Arc::clone(&left_plan),
            Arc::clone(&right_plan),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::Using,
            false,
        )?;

        assert_all_fields_present(&join);
        assert_eq!(join.join_constraint, JoinConstraint::Using);
    }

    // ON constraint with an extra filter over the overlapping `value` columns.
    {
        let value_filter = col("t1.value").lt(col("t2.value"));
        let join = Join::try_new(
            Arc::clone(&left_plan),
            Arc::clone(&right_plan),
            vec![(col("t1.id"), col("t2.id"))],
            Some(value_filter.clone()),
            JoinType::Inner,
            JoinConstraint::On,
            false,
        )?;

        assert_all_fields_present(&join);
        assert_eq!(join.filter, Some(value_filter));
    }

    // The null-equality flag is stored as given.
    {
        let join = Join::try_new(
            Arc::clone(&left_plan),
            Arc::clone(&right_plan),
            vec![(col("t1.id"), col("t2.id"))],
            None,
            JoinType::Inner,
            JoinConstraint::On,
            true,
        )?;

        assert!(join.null_equals_null);
    }

    Ok(())
}
5258
/// For inner/left/right/full joins, `Join::try_new` must produce six output
/// fields whose nullability reflects both the input declarations and the
/// join type; a `USING` join must likewise keep all six fields.
#[test]
fn test_join_try_new_schema_validation() -> Result<()> {
    let left_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]);
    let right_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, true),
        Field::new("code", DataType::Int16, false),
    ]);

    let left_plan = Arc::new(table_scan(Some("t1"), &left_schema, None)?.build()?);
    let right_plan = Arc::new(table_scan(Some("t2"), &right_schema, None)?.build()?);

    for join_type in [
        JoinType::Inner,
        JoinType::Left,
        JoinType::Right,
        JoinType::Full,
    ] {
        let join = Join::try_new(
            Arc::clone(&left_plan),
            Arc::clone(&right_plan),
            vec![(col("t1.id"), col("t2.id"))],
            Some(col("t1.value").gt(lit(5.0))),
            join_type,
            JoinConstraint::On,
            false,
        )?;

        let fields = join.schema.fields();
        assert_eq!(fields.len(), 6, "Expected 6 fields for {join_type:?} join");

        // Columns declared non-nullable are expected to turn nullable only
        // for the join types listed; columns 2 and 4 are declared nullable
        // in the inputs and are expected nullable for every join type.
        let left_side_nullable = matches!(join_type, JoinType::Right | JoinType::Full);
        let right_side_nullable = matches!(join_type, JoinType::Left | JoinType::Full);
        let expected = [
            left_side_nullable,
            left_side_nullable,
            true,
            right_side_nullable,
            true,
            right_side_nullable,
        ];

        for (i, field) in fields.iter().enumerate() {
            let expected_nullable = expected[i];
            assert_eq!(
                field.is_nullable(),
                expected_nullable,
                "Field {} ({}) nullability incorrect for {:?} join",
                i,
                field.name(),
                join_type
            );
        }
    }

    let using_join = Join::try_new(
        Arc::clone(&left_plan),
        Arc::clone(&right_plan),
        vec![(col("t1.id"), col("t2.id"))],
        None,
        JoinType::Inner,
        JoinConstraint::Using,
        false,
    )?;

    let using_field_count = using_join.schema.fields().len();
    assert_eq!(using_field_count, 6, "USING join should have all fields");
    assert_eq!(using_join.join_constraint, JoinConstraint::Using);

    Ok(())
}
5343}