1mod required_indices;
21
22use crate::optimizer::ApplyOrder;
23use crate::{OptimizerConfig, OptimizerRule};
24use std::sync::Arc;
25
26use datafusion_common::{
27 Column, DFSchema, HashMap, JoinType, Result, assert_eq_or_internal_err,
28 get_required_group_by_exprs_indices, internal_datafusion_err, internal_err,
29};
30use datafusion_expr::expr::Alias;
31use datafusion_expr::{
32 Aggregate, Distinct, EmptyRelation, Expr, Projection, TableScan, Unnest, Window,
33 logical_plan::LogicalPlan,
34};
35
36use crate::optimize_projections::required_indices::RequiredIndices;
37use crate::utils::NamePreserver;
38use datafusion_common::tree_node::{
39 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
40};
41
/// Optimizer rule that removes columns and expressions which no ancestor
/// operator needs, pushing the remaining column requirements down the plan
/// (down to the projection of `TableScan` nodes).
#[derive(Default, Debug)]
pub struct OptimizeProjections {}

impl OptimizeProjections {
    /// Creates a new [`OptimizeProjections`] rule.
    pub fn new() -> Self {
        Self::default()
    }
}
84
85impl OptimizerRule for OptimizeProjections {
86 fn name(&self) -> &str {
87 "optimize_projections"
88 }
89
90 fn apply_order(&self) -> Option<ApplyOrder> {
91 None
92 }
93
94 fn supports_rewrite(&self) -> bool {
95 true
96 }
97
98 fn rewrite(
99 &self,
100 plan: LogicalPlan,
101 config: &dyn OptimizerConfig,
102 ) -> Result<Transformed<LogicalPlan>> {
103 let indices = RequiredIndices::new_for_all_exprs(&plan);
105 optimize_projections(plan, config, indices)
106 }
107}
108
109#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
129fn optimize_projections(
130 plan: LogicalPlan,
131 config: &dyn OptimizerConfig,
132 indices: RequiredIndices,
133) -> Result<Transformed<LogicalPlan>> {
134 match plan {
137 LogicalPlan::Projection(proj) => {
138 return merge_consecutive_projections(proj)?
139 .transform_data(|proj| {
140 rewrite_projection_given_requirements(proj, config, &indices)
141 })?
142 .transform_data(|plan| optimize_subqueries(plan, config));
143 }
144 LogicalPlan::Aggregate(aggregate) => {
145 let n_group_exprs = aggregate.group_expr_len()?;
147 let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);
150
151 let new_group_bys = if aggregate
156 .input
157 .schema()
158 .functional_dependencies()
159 .is_empty()
160 {
161 aggregate.group_expr
162 } else {
163 let group_by_expr_existing = aggregate
164 .group_expr
165 .iter()
166 .map(|group_by_expr| group_by_expr.schema_name().to_string())
167 .collect::<Vec<_>>();
168
169 if let Some(simplest_groupby_indices) =
170 get_required_group_by_exprs_indices(
171 aggregate.input.schema(),
172 &group_by_expr_existing,
173 )
174 {
175 group_by_reqs
179 .append(&simplest_groupby_indices)
180 .get_at_indices(&aggregate.group_expr)
181 } else {
182 aggregate.group_expr
183 }
184 };
185
186 let new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
189
190 if new_group_bys.is_empty() && new_aggr_expr.is_empty() {
191 return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
193 EmptyRelation {
194 produce_one_row: true,
195 schema: Arc::new(DFSchema::empty()),
196 },
197 )));
198 }
199
200 let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
201 let schema = aggregate.input.schema();
202 let necessary_indices =
203 RequiredIndices::new().with_exprs(schema, all_exprs_iter);
204 let necessary_exprs = necessary_indices.get_required_exprs(schema);
205
206 return optimize_projections(
207 Arc::unwrap_or_clone(aggregate.input),
208 config,
209 necessary_indices,
210 )?
211 .transform_data(|aggregate_input| {
212 add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
217 })?
218 .map_data(|aggregate_input| {
219 Aggregate::try_new(
222 Arc::new(aggregate_input),
223 new_group_bys,
224 new_aggr_expr,
225 )
226 .map(LogicalPlan::Aggregate)
227 })?
228 .transform_data(|plan| optimize_subqueries(plan, config));
229 }
230 LogicalPlan::Window(window) => {
231 let input_schema = Arc::clone(window.input.schema());
232 let n_input_fields = input_schema.fields().len();
234 let (child_reqs, window_reqs) = indices.split_off(n_input_fields);
237
238 let new_window_expr = window_reqs.get_at_indices(&window.window_expr);
241
242 let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);
245
246 return optimize_projections(
247 Arc::unwrap_or_clone(window.input),
248 config,
249 required_indices.clone(),
250 )?
251 .transform_data(|window_child| {
252 if new_window_expr.is_empty() {
253 Ok(Transformed::no(window_child))
255 } else {
256 let required_exprs =
260 required_indices.get_required_exprs(&input_schema);
261 let window_child =
262 add_projection_on_top_if_helpful(window_child, required_exprs)?
263 .data;
264 Window::try_new(new_window_expr, Arc::new(window_child))
265 .map(LogicalPlan::Window)
266 .map(Transformed::yes)
267 }
268 })?
269 .transform_data(|plan| optimize_subqueries(plan, config));
270 }
271 LogicalPlan::TableScan(table_scan) => {
272 let TableScan {
273 table_name,
274 source,
275 projection,
276 filters,
277 fetch,
278 projected_schema: _,
279 } = table_scan;
280
281 let projection = match &projection {
284 Some(projection) => indices.into_mapped_indices(|idx| projection[idx]),
285 None => indices.into_inner(),
286 };
287 let new_scan =
288 TableScan::try_new(table_name, source, Some(projection), filters, fetch)?;
289
290 return Transformed::yes(LogicalPlan::TableScan(new_scan))
291 .transform_data(|plan| optimize_subqueries(plan, config));
292 }
293 _ => {}
295 };
296
297 let mut child_required_indices: Vec<RequiredIndices> = match &plan {
300 LogicalPlan::Sort(_)
301 | LogicalPlan::Filter(_)
302 | LogicalPlan::Repartition(_)
303 | LogicalPlan::Union(_)
304 | LogicalPlan::SubqueryAlias(_)
305 | LogicalPlan::Distinct(Distinct::On(_)) => {
306 plan.inputs()
311 .into_iter()
312 .map(|input| {
313 indices
314 .clone()
315 .with_projection_beneficial()
316 .with_plan_exprs(&plan, input.schema())
317 })
318 .collect::<Result<_>>()?
319 }
320 LogicalPlan::Limit(_) => {
321 plan.inputs()
326 .into_iter()
327 .map(|input| indices.clone().with_plan_exprs(&plan, input.schema()))
328 .collect::<Result<_>>()?
329 }
330 LogicalPlan::Copy(_)
331 | LogicalPlan::Ddl(_)
332 | LogicalPlan::Dml(_)
333 | LogicalPlan::Explain(_)
334 | LogicalPlan::Analyze(_)
335 | LogicalPlan::Subquery(_)
336 | LogicalPlan::Statement(_)
337 | LogicalPlan::Distinct(Distinct::All(_)) => {
338 plan.inputs()
344 .into_iter()
345 .map(RequiredIndices::new_for_all_exprs)
346 .collect()
347 }
348 LogicalPlan::Extension(extension) => {
349 if let Some(necessary_children_indices) =
350 extension.node.necessary_children_exprs(indices.indices())
351 {
352 let children = extension.node.inputs();
353 assert_eq_or_internal_err!(
354 children.len(),
355 necessary_children_indices.len(),
356 "Inconsistent length between children and necessary children indices. \
357 Make sure `.necessary_children_exprs` implementation of the \
358 `UserDefinedLogicalNode` is consistent with actual children length \
359 for the node."
360 );
361 children
362 .into_iter()
363 .zip(necessary_children_indices)
364 .map(|(child, necessary_indices)| {
365 RequiredIndices::new_from_indices(necessary_indices)
366 .with_plan_exprs(&plan, child.schema())
367 })
368 .collect::<Result<Vec<_>>>()?
369 } else {
370 plan.inputs()
373 .into_iter()
374 .map(RequiredIndices::new_for_all_exprs)
375 .collect()
376 }
377 }
378 LogicalPlan::EmptyRelation(_)
379 | LogicalPlan::Values(_)
380 | LogicalPlan::DescribeTable(_) => {
381 return Ok(Transformed::no(plan));
383 }
384 LogicalPlan::RecursiveQuery(recursive) => {
385 if plan_contains_other_subqueries(
389 recursive.static_term.as_ref(),
390 &recursive.name,
391 ) || plan_contains_other_subqueries(
392 recursive.recursive_term.as_ref(),
393 &recursive.name,
394 ) {
395 return Ok(Transformed::no(plan));
396 }
397
398 plan.inputs()
399 .into_iter()
400 .map(|input| {
401 indices
402 .clone()
403 .with_projection_beneficial()
404 .with_plan_exprs(&plan, input.schema())
405 })
406 .collect::<Result<Vec<_>>>()?
407 }
408 LogicalPlan::Join(join) => {
409 let left_len = join.left.schema().fields().len();
410 let right_len = join.right.schema().fields().len();
411 let (left_req_indices, right_req_indices) =
412 split_join_requirements(left_len, right_len, indices, &join.join_type);
413 let left_indices =
414 left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
415 let right_indices =
416 right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
417 vec![
420 left_indices.with_projection_beneficial(),
421 right_indices.with_projection_beneficial(),
422 ]
423 }
424 LogicalPlan::Projection(_)
426 | LogicalPlan::Aggregate(_)
427 | LogicalPlan::Window(_)
428 | LogicalPlan::TableScan(_) => {
429 return internal_err!(
430 "OptimizeProjection: should have handled in the match statement above"
431 );
432 }
433 LogicalPlan::Unnest(Unnest {
434 input,
435 dependency_indices,
436 ..
437 }) => {
438 let required_indices =
440 RequiredIndices::new().with_plan_exprs(&plan, input.schema())?;
441
442 let mut additional_necessary_child_indices = Vec::new();
444 indices.indices().iter().for_each(|idx| {
445 if let Some(index) = dependency_indices.get(*idx) {
446 additional_necessary_child_indices.push(*index);
447 }
448 });
449 vec![required_indices.append(&additional_necessary_child_indices)]
450 }
451 };
452
453 child_required_indices.reverse();
456 assert_eq_or_internal_err!(
457 child_required_indices.len(),
458 plan.inputs().len(),
459 "OptimizeProjection: child_required_indices length mismatch with plan inputs"
460 );
461
462 let transformed_plan = plan.map_children(|child| {
464 let required_indices = child_required_indices.pop().ok_or_else(|| {
465 internal_datafusion_err!(
466 "Unexpected number of required_indices in OptimizeProjections rule"
467 )
468 })?;
469
470 let projection_beneficial = required_indices.projection_beneficial();
471 let project_exprs = required_indices.get_required_exprs(child.schema());
472
473 optimize_projections(child, config, required_indices)?.transform_data(
474 |new_input| {
475 if projection_beneficial {
476 add_projection_on_top_if_helpful(new_input, project_exprs)
477 } else {
478 Ok(Transformed::no(new_input))
479 }
480 },
481 )
482 })?;
483
484 let transformed_plan =
485 transformed_plan.transform_data(|plan| optimize_subqueries(plan, config))?;
486
487 if transformed_plan.transformed {
489 transformed_plan.map_data(|plan| plan.recompute_schema())
490 } else {
491 Ok(transformed_plan)
492 }
493}
494
495fn optimize_subqueries(
499 plan: LogicalPlan,
500 config: &dyn OptimizerConfig,
501) -> Result<Transformed<LogicalPlan>> {
502 plan.map_uncorrelated_subqueries(|subquery_plan| {
503 let indices = RequiredIndices::new_for_all_exprs(&subquery_plan);
504 optimize_projections(subquery_plan, config, indices)
505 })
506}
507
508fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
539 let Projection {
540 expr,
541 input,
542 schema,
543 ..
544 } = proj;
545 let LogicalPlan::Projection(prev_projection) = input.as_ref() else {
546 return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
547 };
548
549 if prev_projection.expr == expr {
552 return Projection::try_new_with_schema(
553 expr,
554 Arc::clone(&prev_projection.input),
555 schema,
556 )
557 .map(Transformed::yes);
558 }
559
560 let mut column_referral_map = HashMap::<&Column, usize>::new();
562 expr.iter()
563 .for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
564
565 if column_referral_map.into_iter().any(|(col, usage)| {
569 usage > 1
570 && !prev_projection.expr[prev_projection.schema.index_of_column(col).unwrap()]
571 .placement()
572 .should_push_to_leaves()
573 }) {
574 return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
576 }
577
578 let LogicalPlan::Projection(prev_projection) = Arc::unwrap_or_clone(input) else {
579 unreachable!();
581 };
582
583 let name_preserver = NamePreserver::new_for_projection();
586 let mut original_names = vec![];
587 let new_exprs = expr.map_elements(|expr| {
588 original_names.push(name_preserver.save(&expr));
589
590 match expr {
592 Expr::Alias(Alias {
593 expr,
594 relation,
595 name,
596 metadata,
597 }) => rewrite_expr(*expr, &prev_projection).map(|result| {
598 result.update_data(|expr| {
599 if metadata.is_none() && expr.schema_name().to_string() == name {
606 expr
607 } else {
608 Expr::Alias(Alias {
609 expr: Box::new(expr),
610 relation,
611 name,
612 metadata,
613 })
614 }
615 })
616 }),
617 e => rewrite_expr(e, &prev_projection),
618 }
619 })?;
620
621 if new_exprs.transformed {
624 let new_exprs = new_exprs
626 .data
627 .into_iter()
628 .zip(original_names)
629 .map(|(expr, original_name)| original_name.restore(expr))
630 .collect::<Vec<_>>();
631 Projection::try_new(new_exprs, prev_projection.input).map(Transformed::yes)
632 } else {
633 let input = Arc::new(LogicalPlan::Projection(prev_projection));
635 Projection::try_new_with_schema(new_exprs.data, input, schema)
636 .map(Transformed::no)
637 }
638}
639
640fn rewrite_expr(expr: Expr, input: &Projection) -> Result<Transformed<Expr>> {
686 expr.transform_up(|expr| {
687 match expr {
688 Expr::Alias(alias) => {
690 match alias
691 .metadata
692 .as_ref()
693 .map(|h| h.is_empty())
694 .unwrap_or(true)
695 {
696 true => Ok(Transformed::yes(*alias.expr)),
697 false => Ok(Transformed::no(Expr::Alias(alias))),
698 }
699 }
700 Expr::Column(col) => {
701 let idx = input.schema.index_of_column(&col)?;
703 let input_expr = input.expr[idx].clone().unalias_nested().data;
711 Ok(Transformed::yes(input_expr))
712 }
713 _ => Ok(Transformed::no(expr)),
715 }
716 })
717}
718
719fn split_join_requirements(
750 left_len: usize,
751 right_len: usize,
752 indices: RequiredIndices,
753 join_type: &JoinType,
754) -> (RequiredIndices, RequiredIndices) {
755 match join_type {
756 JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
758 indices.split_off(left_len)
761 }
762 JoinType::LeftMark => {
763 let (left_indices, _mark) = indices.split_off(left_len);
767 (left_indices, RequiredIndices::new())
768 }
769 JoinType::RightMark => {
770 let (right_indices, _mark) = indices.split_off(right_len);
772 (RequiredIndices::new(), right_indices)
773 }
774 JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()),
776 JoinType::RightSemi | JoinType::RightAnti => (RequiredIndices::new(), indices),
779 }
780}
781
782fn add_projection_on_top_if_helpful(
800 plan: LogicalPlan,
801 project_exprs: Vec<Expr>,
802) -> Result<Transformed<LogicalPlan>> {
803 if project_exprs.len() >= plan.schema().fields().len() {
805 Ok(Transformed::no(plan))
806 } else {
807 Projection::try_new(project_exprs, Arc::new(plan))
808 .map(LogicalPlan::Projection)
809 .map(Transformed::yes)
810 }
811}
812
813fn rewrite_projection_given_requirements(
831 proj: Projection,
832 config: &dyn OptimizerConfig,
833 indices: &RequiredIndices,
834) -> Result<Transformed<LogicalPlan>> {
835 let Projection { expr, input, .. } = proj;
836
837 let exprs_used = indices.get_at_indices(&expr);
838
839 let required_indices =
840 RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter());
841
842 optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)?
845 .transform_data(|input| {
846 if is_projection_unnecessary(&input, &exprs_used)? {
847 Ok(Transformed::yes(input))
848 } else {
849 Projection::try_new(exprs_used, Arc::new(input))
850 .map(LogicalPlan::Projection)
851 .map(Transformed::yes)
852 }
853 })
854}
855
856pub fn is_projection_unnecessary(
860 input: &LogicalPlan,
861 proj_exprs: &[Expr],
862) -> Result<bool> {
863 if proj_exprs.len() != input.schema().fields().len() {
865 return Ok(false);
866 }
867 Ok(input.schema().iter().zip(proj_exprs.iter()).all(
868 |((field_relation, field_name), expr)| {
869 if let Expr::Column(col) = expr {
871 col.relation.as_ref() == field_relation && col.name.eq(field_name.name())
872 } else {
873 false
874 }
875 },
876 ))
877}
878
879fn plan_contains_other_subqueries(plan: &LogicalPlan, cte_name: &str) -> bool {
885 if let LogicalPlan::SubqueryAlias(alias) = plan
886 && alias.alias.table() != cte_name
887 && !subquery_alias_targets_recursive_cte(alias.input.as_ref(), cte_name)
888 {
889 return true;
890 }
891
892 let mut found = false;
893 plan.apply_expressions(|expr| {
894 if expr_contains_subquery(expr) {
895 found = true;
896 Ok(TreeNodeRecursion::Stop)
897 } else {
898 Ok(TreeNodeRecursion::Continue)
899 }
900 })
901 .expect("expression traversal never fails");
902 if found {
903 return true;
904 }
905
906 plan.inputs()
907 .into_iter()
908 .any(|child| plan_contains_other_subqueries(child, cte_name))
909}
910
911fn expr_contains_subquery(expr: &Expr) -> bool {
912 expr.exists(|e| match e {
913 Expr::ScalarSubquery(_) | Expr::Exists(_) | Expr::InSubquery(_) => Ok(true),
914 _ => Ok(false),
915 })
916 .unwrap()
918}
919
920fn subquery_alias_targets_recursive_cte(plan: &LogicalPlan, cte_name: &str) -> bool {
921 match plan {
922 LogicalPlan::TableScan(scan) => scan.table_name.table() == cte_name,
923 LogicalPlan::SubqueryAlias(alias) => {
924 subquery_alias_targets_recursive_cte(alias.input.as_ref(), cte_name)
925 }
926 _ => {
927 let inputs = plan.inputs();
928 if inputs.len() == 1 {
929 subquery_alias_targets_recursive_cte(inputs[0], cte_name)
930 } else {
931 false
932 }
933 }
934 }
935}
936
937#[cfg(test)]
938mod tests {
939 use std::cmp::Ordering;
940 use std::collections::HashMap;
941 use std::fmt::Formatter;
942 use std::ops::Add;
943 use std::sync::Arc;
944 use std::vec;
945
946 use crate::optimize_projections::OptimizeProjections;
947 use crate::optimizer::Optimizer;
948 use crate::test::{
949 assert_fields_eq, scan_empty, test_table_scan, test_table_scan_fields,
950 test_table_scan_with_name,
951 };
952 use crate::{OptimizerContext, OptimizerRule};
953 use arrow::datatypes::{DataType, Field, Schema};
954 use datafusion_common::{
955 Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
956 };
957 use datafusion_expr::ExprFunctionExt;
958 use datafusion_expr::{
959 BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator, Projection,
960 UserDefinedLogicalNodeCore, WindowFunctionDefinition, binary_expr,
961 build_join_schema,
962 builder::table_scan_with_filters,
963 col,
964 expr::{self, Cast},
965 lit,
966 logical_plan::{builder::LogicalPlanBuilder, table_scan},
967 not, try_cast, when,
968 };
969 use insta::assert_snapshot;
970
971 use crate::assert_optimized_plan_eq_snapshot;
972 use datafusion_functions_aggregate::count::count_udaf;
973 use datafusion_functions_aggregate::expr_fn::{count, max, min};
974 use datafusion_functions_aggregate::min_max::max_udaf;
975
976 macro_rules! assert_optimized_plan_equal {
977 (
978 $plan:expr,
979 @ $expected:literal $(,)?
980 ) => {{
981 let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
982 let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(OptimizeProjections::new())];
983 assert_optimized_plan_eq_snapshot!(
984 optimizer_ctx,
985 rules,
986 $plan,
987 @ $expected,
988 )
989 }};
990 }
991
992 #[derive(Debug, Hash, PartialEq, Eq)]
993 struct NoOpUserDefined {
994 exprs: Vec<Expr>,
995 schema: DFSchemaRef,
996 input: Arc<LogicalPlan>,
997 }
998
999 impl NoOpUserDefined {
1000 fn new(schema: DFSchemaRef, input: Arc<LogicalPlan>) -> Self {
1001 Self {
1002 exprs: vec![],
1003 schema,
1004 input,
1005 }
1006 }
1007
1008 fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
1009 self.exprs = exprs;
1010 self
1011 }
1012 }
1013
1014 impl PartialOrd for NoOpUserDefined {
1016 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1017 match self.exprs.partial_cmp(&other.exprs) {
1018 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
1019 cmp => cmp,
1020 }
1021 .filter(|cmp| *cmp != Ordering::Equal || self == other)
1023 }
1024 }
1025
1026 impl UserDefinedLogicalNodeCore for NoOpUserDefined {
1027 fn name(&self) -> &str {
1028 "NoOpUserDefined"
1029 }
1030
1031 fn inputs(&self) -> Vec<&LogicalPlan> {
1032 vec![&self.input]
1033 }
1034
1035 fn schema(&self) -> &DFSchemaRef {
1036 &self.schema
1037 }
1038
1039 fn expressions(&self) -> Vec<Expr> {
1040 self.exprs.clone()
1041 }
1042
1043 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
1044 write!(f, "NoOpUserDefined")
1045 }
1046
1047 fn with_exprs_and_inputs(
1048 &self,
1049 exprs: Vec<Expr>,
1050 mut inputs: Vec<LogicalPlan>,
1051 ) -> Result<Self> {
1052 Ok(Self {
1053 exprs,
1054 input: Arc::new(inputs.swap_remove(0)),
1055 schema: Arc::clone(&self.schema),
1056 })
1057 }
1058
1059 fn necessary_children_exprs(
1060 &self,
1061 output_columns: &[usize],
1062 ) -> Option<Vec<Vec<usize>>> {
1063 Some(vec![output_columns.to_vec()])
1065 }
1066
1067 fn supports_limit_pushdown(&self) -> bool {
1068 false }
1070 }
1071
1072 #[derive(Debug, Hash, PartialEq, Eq)]
1073 struct UserDefinedCrossJoin {
1074 exprs: Vec<Expr>,
1075 schema: DFSchemaRef,
1076 left_child: Arc<LogicalPlan>,
1077 right_child: Arc<LogicalPlan>,
1078 }
1079
1080 impl UserDefinedCrossJoin {
1081 fn new(left_child: Arc<LogicalPlan>, right_child: Arc<LogicalPlan>) -> Self {
1082 let left_schema = left_child.schema();
1083 let right_schema = right_child.schema();
1084 let schema = Arc::new(
1085 build_join_schema(left_schema, right_schema, &JoinType::Inner).unwrap(),
1086 );
1087 Self {
1088 exprs: vec![],
1089 schema,
1090 left_child,
1091 right_child,
1092 }
1093 }
1094 }
1095
1096 impl PartialOrd for UserDefinedCrossJoin {
1098 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1099 match self.exprs.partial_cmp(&other.exprs) {
1100 Some(Ordering::Equal) => {
1101 match self.left_child.partial_cmp(&other.left_child) {
1102 Some(Ordering::Equal) => {
1103 self.right_child.partial_cmp(&other.right_child)
1104 }
1105 cmp => cmp,
1106 }
1107 }
1108 cmp => cmp,
1109 }
1110 .filter(|cmp| *cmp != Ordering::Equal || self == other)
1112 }
1113 }
1114
1115 impl UserDefinedLogicalNodeCore for UserDefinedCrossJoin {
1116 fn name(&self) -> &str {
1117 "UserDefinedCrossJoin"
1118 }
1119
1120 fn inputs(&self) -> Vec<&LogicalPlan> {
1121 vec![&self.left_child, &self.right_child]
1122 }
1123
1124 fn schema(&self) -> &DFSchemaRef {
1125 &self.schema
1126 }
1127
1128 fn expressions(&self) -> Vec<Expr> {
1129 self.exprs.clone()
1130 }
1131
1132 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
1133 write!(f, "UserDefinedCrossJoin")
1134 }
1135
1136 fn with_exprs_and_inputs(
1137 &self,
1138 exprs: Vec<Expr>,
1139 mut inputs: Vec<LogicalPlan>,
1140 ) -> Result<Self> {
1141 assert_eq!(inputs.len(), 2);
1142 Ok(Self {
1143 exprs,
1144 left_child: Arc::new(inputs.remove(0)),
1145 right_child: Arc::new(inputs.remove(0)),
1146 schema: Arc::clone(&self.schema),
1147 })
1148 }
1149
1150 fn necessary_children_exprs(
1151 &self,
1152 output_columns: &[usize],
1153 ) -> Option<Vec<Vec<usize>>> {
1154 let left_child_len = self.left_child.schema().fields().len();
1155 let mut left_reqs = vec![];
1156 let mut right_reqs = vec![];
1157 for &out_idx in output_columns {
1158 if out_idx < left_child_len {
1159 left_reqs.push(out_idx);
1160 } else {
1161 right_reqs.push(out_idx - left_child_len)
1164 }
1165 }
1166 Some(vec![left_reqs, right_reqs])
1167 }
1168
1169 fn supports_limit_pushdown(&self) -> bool {
1170 false }
1172 }
1173
1174 #[derive(Debug, Hash, PartialEq, Eq)]
1178 struct OpaqueRequirementsUserDefined {
1179 input: Arc<LogicalPlan>,
1180 schema: DFSchemaRef,
1181 }
1182
1183 impl PartialOrd for OpaqueRequirementsUserDefined {
1185 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1186 self.input
1187 .partial_cmp(&other.input)
1188 .filter(|cmp| *cmp != Ordering::Equal || self == other)
1189 }
1190 }
1191
1192 impl UserDefinedLogicalNodeCore for OpaqueRequirementsUserDefined {
1193 fn name(&self) -> &str {
1194 "OpaqueRequirementsUserDefined"
1195 }
1196
1197 fn inputs(&self) -> Vec<&LogicalPlan> {
1198 vec![&self.input]
1199 }
1200
1201 fn schema(&self) -> &DFSchemaRef {
1202 &self.schema
1203 }
1204
1205 fn expressions(&self) -> Vec<Expr> {
1206 vec![]
1207 }
1208
1209 fn with_exprs_and_inputs(
1210 &self,
1211 _exprs: Vec<Expr>,
1212 mut inputs: Vec<LogicalPlan>,
1213 ) -> Result<Self> {
1214 Ok(Self {
1215 input: Arc::new(inputs.swap_remove(0)),
1216 schema: Arc::clone(&self.schema),
1217 })
1218 }
1219
1220 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
1221 write!(f, "OpaqueRequirementsUserDefined")
1222 }
1223 }
1224
1225 #[test]
1226 fn merge_two_projection() -> Result<()> {
1227 let table_scan = test_table_scan()?;
1228 let plan = LogicalPlanBuilder::from(table_scan)
1229 .project(vec![col("a")])?
1230 .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1231 .build()?;
1232
1233 assert_optimized_plan_equal!(
1234 plan,
1235 @r"
1236 Projection: Int32(1) + test.a
1237 TableScan: test projection=[a]
1238 "
1239 )
1240 }
1241
1242 #[test]
1243 fn merge_three_projection() -> Result<()> {
1244 let table_scan = test_table_scan()?;
1245 let plan = LogicalPlanBuilder::from(table_scan)
1246 .project(vec![col("a"), col("b")])?
1247 .project(vec![col("a")])?
1248 .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1249 .build()?;
1250
1251 assert_optimized_plan_equal!(
1252 plan,
1253 @r"
1254 Projection: Int32(1) + test.a
1255 TableScan: test projection=[a]
1256 "
1257 )
1258 }
1259
1260 #[test]
1261 fn merge_alias() -> Result<()> {
1262 let table_scan = test_table_scan()?;
1263 let plan = LogicalPlanBuilder::from(table_scan)
1264 .project(vec![col("a")])?
1265 .project(vec![col("a").alias("alias")])?
1266 .build()?;
1267
1268 assert_optimized_plan_equal!(
1269 plan,
1270 @r"
1271 Projection: test.a AS alias
1272 TableScan: test projection=[a]
1273 "
1274 )
1275 }
1276
1277 #[test]
1278 fn merge_nested_alias() -> Result<()> {
1279 let table_scan = test_table_scan()?;
1280 let plan = LogicalPlanBuilder::from(table_scan)
1281 .project(vec![col("a").alias("alias1").alias("alias2")])?
1282 .project(vec![col("alias2").alias("alias")])?
1283 .build()?;
1284
1285 assert_optimized_plan_equal!(
1286 plan,
1287 @r"
1288 Projection: test.a AS alias
1289 TableScan: test projection=[a]
1290 "
1291 )
1292 }
1293
1294 #[test]
1295 fn test_nested_count() -> Result<()> {
1296 let schema = Schema::new(vec![Field::new("foo", DataType::Int32, false)]);
1297
1298 let groups: Vec<Expr> = vec![];
1299
1300 let plan = table_scan(TableReference::none(), &schema, None)
1301 .unwrap()
1302 .aggregate(groups.clone(), vec![count(lit(1))])
1303 .unwrap()
1304 .aggregate(groups, vec![count(lit(1))])
1305 .unwrap()
1306 .build()
1307 .unwrap();
1308
1309 assert_optimized_plan_equal!(
1310 plan,
1311 @r"
1312 Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]
1313 EmptyRelation: rows=1
1314 "
1315 )
1316 }
1317
1318 #[test]
1319 fn test_neg_push_down() -> Result<()> {
1320 let table_scan = test_table_scan()?;
1321 let plan = LogicalPlanBuilder::from(table_scan)
1322 .project(vec![-col("a")])?
1323 .build()?;
1324
1325 assert_optimized_plan_equal!(
1326 plan,
1327 @r"
1328 Projection: (- test.a)
1329 TableScan: test projection=[a]
1330 "
1331 )
1332 }
1333
1334 #[test]
1335 fn test_is_null() -> Result<()> {
1336 let table_scan = test_table_scan()?;
1337 let plan = LogicalPlanBuilder::from(table_scan)
1338 .project(vec![col("a").is_null()])?
1339 .build()?;
1340
1341 assert_optimized_plan_equal!(
1342 plan,
1343 @r"
1344 Projection: test.a IS NULL
1345 TableScan: test projection=[a]
1346 "
1347 )
1348 }
1349
1350 #[test]
1351 fn test_is_not_null() -> Result<()> {
1352 let table_scan = test_table_scan()?;
1353 let plan = LogicalPlanBuilder::from(table_scan)
1354 .project(vec![col("a").is_not_null()])?
1355 .build()?;
1356
1357 assert_optimized_plan_equal!(
1358 plan,
1359 @r"
1360 Projection: test.a IS NOT NULL
1361 TableScan: test projection=[a]
1362 "
1363 )
1364 }
1365
1366 #[test]
1367 fn test_is_true() -> Result<()> {
1368 let table_scan = test_table_scan()?;
1369 let plan = LogicalPlanBuilder::from(table_scan)
1370 .project(vec![col("a").is_true()])?
1371 .build()?;
1372
1373 assert_optimized_plan_equal!(
1374 plan,
1375 @r"
1376 Projection: test.a IS TRUE
1377 TableScan: test projection=[a]
1378 "
1379 )
1380 }
1381
1382 #[test]
1383 fn test_is_not_true() -> Result<()> {
1384 let table_scan = test_table_scan()?;
1385 let plan = LogicalPlanBuilder::from(table_scan)
1386 .project(vec![col("a").is_not_true()])?
1387 .build()?;
1388
1389 assert_optimized_plan_equal!(
1390 plan,
1391 @r"
1392 Projection: test.a IS NOT TRUE
1393 TableScan: test projection=[a]
1394 "
1395 )
1396 }
1397
1398 #[test]
1399 fn test_is_false() -> Result<()> {
1400 let table_scan = test_table_scan()?;
1401 let plan = LogicalPlanBuilder::from(table_scan)
1402 .project(vec![col("a").is_false()])?
1403 .build()?;
1404
1405 assert_optimized_plan_equal!(
1406 plan,
1407 @r"
1408 Projection: test.a IS FALSE
1409 TableScan: test projection=[a]
1410 "
1411 )
1412 }
1413
1414 #[test]
1415 fn test_is_not_false() -> Result<()> {
1416 let table_scan = test_table_scan()?;
1417 let plan = LogicalPlanBuilder::from(table_scan)
1418 .project(vec![col("a").is_not_false()])?
1419 .build()?;
1420
1421 assert_optimized_plan_equal!(
1422 plan,
1423 @r"
1424 Projection: test.a IS NOT FALSE
1425 TableScan: test projection=[a]
1426 "
1427 )
1428 }
1429
1430 #[test]
1431 fn test_is_unknown() -> Result<()> {
1432 let table_scan = test_table_scan()?;
1433 let plan = LogicalPlanBuilder::from(table_scan)
1434 .project(vec![col("a").is_unknown()])?
1435 .build()?;
1436
1437 assert_optimized_plan_equal!(
1438 plan,
1439 @r"
1440 Projection: test.a IS UNKNOWN
1441 TableScan: test projection=[a]
1442 "
1443 )
1444 }
1445
1446 #[test]
1447 fn test_is_not_unknown() -> Result<()> {
1448 let table_scan = test_table_scan()?;
1449 let plan = LogicalPlanBuilder::from(table_scan)
1450 .project(vec![col("a").is_not_unknown()])?
1451 .build()?;
1452
1453 assert_optimized_plan_equal!(
1454 plan,
1455 @r"
1456 Projection: test.a IS NOT UNKNOWN
1457 TableScan: test projection=[a]
1458 "
1459 )
1460 }
1461
1462 #[test]
1463 fn test_not() -> Result<()> {
1464 let table_scan = test_table_scan()?;
1465 let plan = LogicalPlanBuilder::from(table_scan)
1466 .project(vec![not(col("a"))])?
1467 .build()?;
1468
1469 assert_optimized_plan_equal!(
1470 plan,
1471 @r"
1472 Projection: NOT test.a
1473 TableScan: test projection=[a]
1474 "
1475 )
1476 }
1477
1478 #[test]
1479 fn test_try_cast() -> Result<()> {
1480 let table_scan = test_table_scan()?;
1481 let plan = LogicalPlanBuilder::from(table_scan)
1482 .project(vec![try_cast(col("a"), DataType::Float64)])?
1483 .build()?;
1484
1485 assert_optimized_plan_equal!(
1486 plan,
1487 @r"
1488 Projection: TRY_CAST(test.a AS Float64)
1489 TableScan: test projection=[a]
1490 "
1491 )
1492 }
1493
1494 #[test]
1495 fn test_similar_to() -> Result<()> {
1496 let table_scan = test_table_scan()?;
1497 let expr = Box::new(col("a"));
1498 let pattern = Box::new(lit("[0-9]"));
1499 let similar_to_expr =
1500 Expr::SimilarTo(Like::new(false, expr, pattern, None, false));
1501 let plan = LogicalPlanBuilder::from(table_scan)
1502 .project(vec![similar_to_expr])?
1503 .build()?;
1504
1505 assert_optimized_plan_equal!(
1506 plan,
1507 @r#"
1508 Projection: test.a SIMILAR TO Utf8("[0-9]")
1509 TableScan: test projection=[a]
1510 "#
1511 )
1512 }
1513
1514 #[test]
1515 fn test_between() -> Result<()> {
1516 let table_scan = test_table_scan()?;
1517 let plan = LogicalPlanBuilder::from(table_scan)
1518 .project(vec![col("a").between(lit(1), lit(3))])?
1519 .build()?;
1520
1521 assert_optimized_plan_equal!(
1522 plan,
1523 @r"
1524 Projection: test.a BETWEEN Int32(1) AND Int32(3)
1525 TableScan: test projection=[a]
1526 "
1527 )
1528 }
1529
/// Two stacked projections are merged into one. The outer `CASE` falls back
/// to `d`, which the inner projection defines as the literal `0`. The merged
/// projection therefore inlines `Int32(0)`, and the scan only needs `a`.
#[test]
fn test_case_merged() -> Result<()> {
    let scan = test_table_scan()?;
    let case_d = when(col("a").eq(lit(1)), lit(10))
        .otherwise(col("d"))?
        .alias("d");

    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("a"), lit(0).alias("d")])?
        .project(vec![col("a"), case_d])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.a, CASE WHEN test.a = Int32(1) THEN Int32(10) ELSE Int32(0) END AS d
      TableScan: test projection=[a]
    "
    )
}
1552
/// The outer projection has the same output columns (`a`, `d`) as the inner
/// one, but it is not a no-op. It recomputes `d` from the derived column `a`.
/// Both projections must survive optimization.
#[test]
fn test_derived_column() -> Result<()> {
    let scan = test_table_scan()?;
    let derived_a = col("a").add(lit(1)).alias("a");
    let case_d = when(col("a").eq(lit(1)), lit(10))
        .otherwise(col("d"))?
        .alias("d");

    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![derived_a, lit(0).alias("d")])?
        .project(vec![col("a"), case_d])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: a, CASE WHEN a = Int32(1) THEN Int32(10) ELSE d END AS d
      Projection: test.a + Int32(1) AS a, Int32(0) AS d
        TableScan: test projection=[a]
    "
    )
}
1577
/// A projection sits above a user-defined node that has no expressions of its
/// own. Only `a` is required by the projection, and that requirement is pushed
/// through the extension down to the scan.
#[test]
fn test_user_defined_logical_plan_node() -> Result<()> {
    let scan = test_table_scan()?;
    let no_op = NoOpUserDefined::new(Arc::clone(scan.schema()), Arc::new(scan.clone()));
    let custom_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(no_op),
    });

    let plan = LogicalPlanBuilder::from(custom_plan)
        .project(vec![col("a"), lit(0).alias("d")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.a, Int32(0) AS d
      NoOpUserDefined
        TableScan: test projection=[a]
    "
    )
}
1602
/// The user-defined node carries its own expression referencing `b`. That
/// column must be kept in the scan together with `a`, which the projection
/// needs.
#[test]
fn test_user_defined_logical_plan_node2() -> Result<()> {
    let scan = test_table_scan()?;
    let node_exprs = vec![Expr::Column(Column::from_qualified_name("b"))];
    let no_op = NoOpUserDefined::new(Arc::clone(scan.schema()), Arc::new(scan.clone()))
        .with_exprs(node_exprs);
    let custom_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(no_op),
    });

    let plan = LogicalPlanBuilder::from(custom_plan)
        .project(vec![col("a"), lit(0).alias("d")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.a, Int32(0) AS d
      NoOpUserDefined
        TableScan: test projection=[a, b]
    "
    )
}
1633
/// The user-defined node carries the expression `b + c`. Both operands must be
/// kept in the scan, in addition to `a`, which the projection needs.
#[test]
fn test_user_defined_logical_plan_node3() -> Result<()> {
    let scan = test_table_scan()?;
    let b_plus_c = Expr::BinaryExpr(BinaryExpr::new(
        Box::new(Expr::Column(Column::from_qualified_name("b"))),
        Operator::Plus,
        Box::new(Expr::Column(Column::from_qualified_name("c"))),
    ));
    let no_op = NoOpUserDefined::new(Arc::clone(scan.schema()), Arc::new(scan.clone()))
        .with_exprs(vec![b_plus_c]);
    let custom_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(no_op),
    });

    let plan = LogicalPlanBuilder::from(custom_plan)
        .project(vec![col("a"), lit(0).alias("d")])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.a, Int32(0) AS d
      NoOpUserDefined
        TableScan: test projection=[a, b, c]
    "
    )
}
1672
/// A user-defined node with two inputs (`l` and `r`). The columns required by
/// the projection are routed to the matching input. `l` keeps `[a, c]` and `r`
/// keeps `[a]`.
#[test]
fn test_user_defined_logical_plan_node4() -> Result<()> {
    let cross_join = UserDefinedCrossJoin::new(
        Arc::new(test_table_scan_with_name("l")?),
        Arc::new(test_table_scan_with_name("r")?),
    );
    let custom_plan = LogicalPlan::Extension(Extension {
        node: Arc::new(cross_join),
    });

    let wanted = vec![col("l.a"), col("l.c"), col("r.a"), lit(0).alias("d")];
    let plan = LogicalPlanBuilder::from(custom_plan)
        .project(wanted)?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: l.a, l.c, r.a, Int32(0) AS d
      UserDefinedCrossJoin
        TableScan: l projection=[a, c]
        TableScan: r projection=[a]
    "
    )
}
1701
/// A global aggregate (`max(b)`, no grouping) only needs column `b` from the scan.
#[test]
fn aggregate_no_group_by() -> Result<()> {
    let no_grouping = Vec::<Expr>::new();
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .aggregate(no_grouping, vec![max(col("b"))])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Aggregate: groupBy=[[]], aggr=[[max(test.b)]]
      TableScan: test projection=[b]
    "
    )
}
1718
/// Grouping by `c` and aggregating `b` requires exactly `[b, c]` from the scan.
#[test]
fn aggregate_group_by() -> Result<()> {
    let (group_by, aggregates) = (vec![col("c")], vec![max(col("b"))]);
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .aggregate(group_by, aggregates)?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Aggregate: groupBy=[[test.c]], aggr=[[max(test.b)]]
      TableScan: test projection=[b, c]
    "
    )
}
1735
/// Same as `aggregate_group_by`, but through a `SubqueryAlias`. Column
/// requirements must be pushed through the alias to the underlying scan.
#[test]
fn aggregate_group_by_with_table_alias() -> Result<()> {
    let aliased = LogicalPlanBuilder::from(test_table_scan()?).alias("a")?;
    let plan = aliased
        .aggregate(vec![col("c")], vec![max(col("b"))])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Aggregate: groupBy=[[a.c]], aggr=[[max(a.b)]]
      SubqueryAlias: a
        TableScan: test projection=[b, c]
    "
    )
}
1754
/// The filter needs `c` but the aggregate only needs `b`. The scan reads
/// `[b, c]`, and a projection above the filter trims `c` before the aggregate.
#[test]
fn aggregate_no_group_by_with_filter() -> Result<()> {
    let filtered = LogicalPlanBuilder::from(test_table_scan()?).filter(col("c").gt(lit(1)))?;
    let plan = filtered
        .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Aggregate: groupBy=[[]], aggr=[[max(test.b)]]
      Projection: test.b
        Filter: test.c > Int32(1)
          TableScan: test projection=[b, c]
    "
    )
}
1774
/// A column whose name itself contains a period (`"tag.one"`). It is always
/// referenced via `Column::new_unqualified`. The input plan is:
///
/// ```text
/// Projection: "tag.one"
///   Aggregate: groupBy=[], aggr=[max("tag.one") AS "tag.one"]
///     TableScan: m4
/// ```
///
/// The identity projection on top is removed by the optimizer.
#[test]
fn aggregate_with_periods() -> Result<()> {
    let schema = Schema::new(vec![Field::new("tag.one", DataType::Utf8, false)]);
    let tag_one = || col(Column::new_unqualified("tag.one"));

    let plan = table_scan(Some("m4"), &schema, None)?
        .aggregate(Vec::<Expr>::new(), vec![max(tag_one()).alias("tag.one")])?
        .project([tag_one()])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Aggregate: groupBy=[[]], aggr=[[max(m4.tag.one) AS tag.one]]
      TableScan: m4 projection=[tag.one]
    "
    )
}
1801
/// Two back-to-back column-only projections collapse into a single projection
/// that keeps the outer ordering `(a, c, b)`.
#[test]
fn redundant_project() -> Result<()> {
    let inner = vec![col("a"), col("b"), col("c")];
    let outer = vec![col("a"), col("c"), col("b")];
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(inner)?
        .project(outer)?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.a, test.c, test.b
      TableScan: test projection=[a, b, c]
    "
    )
}
1818
/// A scan created with the non-sorted projection `[1, 0, 2]` keeps that column
/// order when every column is required.
#[test]
fn reorder_scan() -> Result<()> {
    let schema = Schema::new(test_table_scan_fields());
    let scan_projection = Some(vec![1, 0, 2]);
    let plan = table_scan(Some("test"), &schema, scan_projection)?.build()?;

    assert_optimized_plan_equal!(plan, @"TableScan: test projection=[b, a, c]")
}
1829
/// A reordered scan `[b, a, c]` under a projection of `(a, b)`. Column `c` is
/// dropped from the scan, and the scan's own `[b, a]` order is kept.
#[test]
fn reorder_scan_projection() -> Result<()> {
    let schema = Schema::new(test_table_scan_fields());
    let scan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?;
    let plan = scan.project(vec![col("a"), col("b")])?.build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.a, test.b
      TableScan: test projection=[b, a]
    "
    )
}
1845
/// A projection that merely permutes all scan columns is kept, because it
/// changes the output order. The scan itself reads `[a, b, c]`.
#[test]
fn reorder_projection() -> Result<()> {
    let reversed = vec![col("c"), col("b"), col("a")];
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(reversed)?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.c, test.b, test.a
      TableScan: test projection=[a, b, c]
    "
    )
}
1861
/// Column-permuting projections separated by filters all remain in the plan.
/// Every projection uses all three columns, so nothing is pruned from the scan.
#[test]
fn noncontinuous_redundant_projection() -> Result<()> {
    let builder = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("c"), col("b"), col("a")])?
        .filter(col("c").gt(lit(1)))?;
    let builder = builder
        .project(vec![col("c"), col("a"), col("b")])?
        .filter(col("b").gt(lit(1)))?
        .filter(col("a").gt(lit(1)))?;
    let plan = builder.project(vec![col("a"), col("c"), col("b")])?.build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.a, test.c, test.b
      Filter: test.a > Int32(1)
        Filter: test.b > Int32(1)
          Projection: test.c, test.a, test.b
            Filter: test.c > Int32(1)
              Projection: test.c, test.b, test.a
                TableScan: test projection=[a, b, c]
    "
    )
}
1887
/// A LEFT join followed by a projection of `a`, `b` and the right-side key
/// `c1`. The projection disappears, the unused `test.c` is pruned from the
/// left scan, and the join's output schema is checked field by field.
#[test]
fn join_schema_trim_full_join_column_projection() -> Result<()> {
    let left = test_table_scan()?;

    let right_schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
    let right = scan_empty(Some("test2"), &right_schema, None)?.build()?;

    let plan = LogicalPlanBuilder::from(left)
        .join(right, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
        .project(vec![col("a"), col("b"), col("c1")])?
        .build()?;

    let optimized = optimize(plan)?;

    // The join itself is the root of the optimized plan.
    assert_snapshot!(
        optimized.clone(),
        @r"
    Left Join: test.a = test2.c1
      TableScan: test projection=[a, b]
      TableScan: test2 projection=[c1]
    "
    );

    // `c1` is declared non-nullable in `test2`, but becomes nullable in the
    // output of the LEFT join.
    let expected_schema = DFSchema::new_with_metadata(
        vec![
            (
                Some("test".into()),
                Arc::new(Field::new("a", DataType::UInt32, false)),
            ),
            (
                Some("test".into()),
                Arc::new(Field::new("b", DataType::UInt32, false)),
            ),
            (
                Some("test2".into()),
                Arc::new(Field::new("c1", DataType::UInt32, true)),
            ),
        ],
        HashMap::new(),
    )?;
    assert_eq!(**optimized.schema(), expected_schema);

    Ok(())
}
1937
/// A LEFT join followed by a projection of only `a` and `b`. The projection is
/// kept, and the join underneath still outputs its key `c1`. The join's output
/// schema is checked field by field.
#[test]
fn join_schema_trim_partial_join_column_projection() -> Result<()> {
    let left = test_table_scan()?;

    let right_schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
    let right = scan_empty(Some("test2"), &right_schema, None)?.build()?;

    let plan = LogicalPlanBuilder::from(left)
        .join(right, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
        .project(vec![col("a"), col("b")])?
        .build()?;

    let optimized = optimize(plan)?;

    assert_snapshot!(
        optimized.clone(),
        @r"
    Projection: test.a, test.b
      Left Join: test.a = test2.c1
        TableScan: test projection=[a, b]
        TableScan: test2 projection=[c1]
    "
    );

    // The join is the single input of the top-level projection.
    let join = optimized.inputs()[0];
    let expected_schema = DFSchema::new_with_metadata(
        vec![
            (
                Some("test".into()),
                Arc::new(Field::new("a", DataType::UInt32, false)),
            ),
            (
                Some("test".into()),
                Arc::new(Field::new("b", DataType::UInt32, false)),
            ),
            (
                Some("test2".into()),
                Arc::new(Field::new("c1", DataType::UInt32, true)),
            ),
        ],
        HashMap::new(),
    )?;
    assert_eq!(**join.schema(), expected_schema);

    Ok(())
}
1992
/// Same shape as the partial-projection case, but using `JOIN ... USING (a)`.
/// Both sides' `a` columns stay in the join's output schema, and the
/// right-side `a` is nullable.
#[test]
fn join_schema_trim_using_join() -> Result<()> {
    let left = test_table_scan()?;

    let right_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
    let right = scan_empty(Some("test2"), &right_schema, None)?.build()?;

    let plan = LogicalPlanBuilder::from(left)
        .join_using(right, JoinType::Left, vec!["a".into()])?
        .project(vec![col("a"), col("b")])?
        .build()?;

    let optimized = optimize(plan)?;

    assert_snapshot!(
        optimized.clone(),
        @r"
    Projection: test.a, test.b
      Left Join: Using test.a = test2.a
        TableScan: test projection=[a, b]
        TableScan: test2 projection=[a]
    "
    );

    let join = optimized.inputs()[0];
    let expected_schema = DFSchema::new_with_metadata(
        vec![
            (
                Some("test".into()),
                Arc::new(Field::new("a", DataType::UInt32, false)),
            ),
            (
                Some("test".into()),
                Arc::new(Field::new("b", DataType::UInt32, false)),
            ),
            (
                Some("test2".into()),
                Arc::new(Field::new("a", DataType::UInt32, true)),
            ),
        ],
        HashMap::new(),
    )?;
    assert_eq!(**join.schema(), expected_schema);

    Ok(())
}
2045
/// An explicit `CAST(c AS Float64)` only reads column `c`.
#[test]
fn cast() -> Result<()> {
    let c_as_float = Expr::Cast(Cast::new(Box::new(col("c")), DataType::Float64));
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![c_as_float])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: CAST(test.c AS Float64)
      TableScan: test projection=[c]
    "
    )
}
2065
/// Projecting two of the three scan columns folds the projection into the
/// `TableScan` itself (`projection=[a, b]`) and removes the `Projection` node.
#[test]
fn table_scan_projected_schema() -> Result<()> {
    let table_scan = test_table_scan()?;
    // Precondition: the unprojected scan exposes all three columns.
    assert_eq!(3, table_scan.schema().fields().len());
    assert_fields_eq(&table_scan, vec!["a", "b", "c"]);

    // Build the plan from the scan checked above, not a second, separately
    // constructed scan, so the precondition describes the actual optimizer input.
    let plan = LogicalPlanBuilder::from(table_scan)
        .project(vec![col("a"), col("b")])?
        .build()?;
    assert_fields_eq(&plan, vec!["a", "b"]);

    assert_optimized_plan_equal!(
        plan,
        @"TableScan: test projection=[a, b]"
    )
}
2082
/// Like `table_scan_projected_schema`, but the `Projection` node is built
/// directly with `Projection::try_new` instead of through `LogicalPlanBuilder`.
/// The projection is still folded into the scan.
#[test]
fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
    let table_scan = test_table_scan()?;
    let input_field_count = table_scan.schema().fields().len();
    assert_eq!(3, input_field_count);
    assert_fields_eq(&table_scan, vec!["a", "b", "c"]);

    let projection = Projection::try_new(
        vec![col("test.a"), col("test.b")],
        Arc::new(table_scan),
    )?;
    let plan = LogicalPlan::Projection(projection);

    assert_fields_eq(&plan, vec!["a", "b"]);

    assert_optimized_plan_equal!(
        plan,
        @"TableScan: test projection=[a, b]"
    )
}
2104
/// Column requirements flow through a `Limit`. The scan drops `b` and keeps
/// `[a, c]` for the projection `(c, a)` under the limit.
#[test]
fn table_limit() -> Result<()> {
    let table_scan = test_table_scan()?;
    assert_eq!(3, table_scan.schema().fields().len());
    assert_fields_eq(&table_scan, vec!["a", "b", "c"]);

    let projected = LogicalPlanBuilder::from(table_scan).project(vec![col("c"), col("a")])?;
    let plan = projected.limit(0, Some(5))?.build()?;

    assert_fields_eq(&plan, vec!["c", "a"]);

    assert_optimized_plan_equal!(
        plan,
        @r"
    Limit: skip=0, fetch=5
      Projection: test.c, test.a
        TableScan: test projection=[a, c]
    "
    )
}
2127
/// A bare scan with nothing above it: the optimized scan is displayed with all
/// of its columns, `projection=[a, b, c]`.
#[test]
fn table_scan_without_projection() -> Result<()> {
    let plan = LogicalPlanBuilder::from(test_table_scan()?).build()?;

    assert_optimized_plan_equal!(
        plan,
        @"TableScan: test projection=[a, b, c]"
    )
}
2138
/// A projection made only of literals needs no input columns, so the scan
/// projects the empty column list.
#[test]
fn table_scan_with_literal_projection() -> Result<()> {
    let literals = vec![lit(1_i64), lit(2_i64)];
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(literals)?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: Int64(1), Int64(2)
      TableScan: test projection=[]
    "
    )
}
2153
/// Column `b` is selected by the first projection but never used by the
/// filter or the aggregate above it. It is removed from that projection and
/// from the scan.
#[test]
fn table_unused_column() -> Result<()> {
    let table_scan = test_table_scan()?;
    assert_eq!(3, table_scan.schema().fields().len());
    assert_fields_eq(&table_scan, vec!["a", "b", "c"]);

    let plan = LogicalPlanBuilder::from(table_scan)
        // `b` is never used downstream of this projection.
        .project(vec![col("c"), col("a"), col("b")])?
        .filter(col("c").gt(lit(1)))?
        .aggregate(vec![col("c")], vec![max(col("a"))])?
        .build()?;

    assert_fields_eq(&plan, vec!["c", "max(test.a)"]);

    // The test already returns `Result`, so propagate an optimizer error
    // instead of panicking.
    let plan = optimize(plan)?;
    assert_optimized_plan_equal!(
        plan,
        @r"
    Aggregate: groupBy=[[test.c]], aggr=[[max(test.a)]]
      Filter: test.c > Int32(1)
        Projection: test.c, test.a
          TableScan: test projection=[a, c]
    "
    )
}
2181
/// The outer projection only emits a literal, so the inner projection of `b`
/// is dropped entirely and the scan reads no columns.
#[test]
fn table_unused_projection() -> Result<()> {
    let table_scan = test_table_scan()?;
    assert_eq!(3, table_scan.schema().fields().len());
    assert_fields_eq(&table_scan, vec!["a", "b", "c"]);

    let inner = LogicalPlanBuilder::from(table_scan).project(vec![col("b")])?;
    let plan = inner.project(vec![lit(1).alias("a")])?.build()?;

    assert_fields_eq(&plan, vec!["a"]);

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: Int32(1) AS a
      TableScan: test projection=[]
    "
    )
}
2205
/// Like `table_unused_projection`, but the scan carries a pushed-down filter on
/// `b`. The scan projection still becomes empty, and the filter is kept in
/// `full_filters`.
#[test]
fn table_full_filter_pushdown() -> Result<()> {
    let schema = Schema::new(test_table_scan_fields());
    let scan_filters = vec![col("b").eq(lit(1))];
    let table_scan =
        table_scan_with_filters(Some("test"), &schema, None, scan_filters)?.build()?;
    assert_eq!(3, table_scan.schema().fields().len());
    assert_fields_eq(&table_scan, vec!["a", "b", "c"]);

    let inner = LogicalPlanBuilder::from(table_scan).project(vec![col("b")])?;
    let plan = inner.project(vec![lit(1).alias("a")])?.build()?;

    assert_fields_eq(&plan, vec!["a"]);

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: Int32(1) AS a
      TableScan: test projection=[], full_filters=[b = Int32(1)]
    "
    )
}
2236
/// Running the rule a second time over an already-optimized plan must not
/// change it. The two results are compared through their `Debug` rendering.
#[test]
fn test_double_optimization() -> Result<()> {
    let table_scan = test_table_scan()?;

    let plan = LogicalPlanBuilder::from(table_scan)
        .project(vec![col("b")])?
        .project(vec![lit(1).alias("a")])?
        .build()?;

    // The test returns `Result`, so optimizer errors are propagated with `?`
    // rather than turned into panics.
    let optimized_plan1 = optimize(plan)?;
    let optimized_plan2 = optimize(optimized_plan1.clone())?;

    let formatted_plan1 = format!("{optimized_plan1:?}");
    let formatted_plan2 = format!("{optimized_plan2:?}");
    assert_eq!(formatted_plan1, formatted_plan2);
    Ok(())
}
2256
/// The optimizer keeps rewriting below an extension node whose requirements
/// are opaque. The two identical `a` projections underneath are folded into
/// the scan.
#[test]
fn test_continue_processing_through_extension() -> Result<()> {
    let table_scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(table_scan.clone())
        .project(vec![col("a")])?
        .project(vec![col("a")])?
        .build()?;
    let plan = LogicalPlan::Extension(Extension {
        node: Arc::new(OpaqueRequirementsUserDefined {
            input: Arc::new(plan),
            schema: Arc::clone(table_scan.schema()),
        }),
    });
    // Propagate optimizer errors through the test's `Result` instead of panicking.
    let plan = optimize(plan)?;
    assert_optimized_plan_equal!(
        plan,
        @r"
    OpaqueRequirementsUserDefined
      TableScan: test projection=[a]
    "
    )
}
2279
/// `min(b)` is computed by the aggregate but never referenced above it, so it
/// is removed from the aggregate's expression list.
#[test]
fn table_unused_aggregate() -> Result<()> {
    let table_scan = test_table_scan()?;
    assert_eq!(3, table_scan.schema().fields().len());
    assert_fields_eq(&table_scan, vec!["a", "b", "c"]);

    let grouped = LogicalPlanBuilder::from(table_scan)
        .aggregate(vec![col("a"), col("c")], vec![max(col("b")), min(col("b"))])?;
    let plan = grouped
        .filter(col("c").gt(lit(1)))?
        .project(vec![col("c"), col("a"), col("max(test.b)")])?
        .build()?;

    assert_fields_eq(&plan, vec!["c", "a", "max(test.b)"]);

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.c, test.a, max(test.b)
      Filter: test.c > Int32(1)
        Aggregate: groupBy=[[test.a, test.c]], aggr=[[max(test.b)]]
          TableScan: test projection=[a, b, c]
    "
    )
}
2306
/// Column `c` is referenced only inside an aggregate `FILTER (WHERE ...)`
/// clause. It must still be read from the scan.
#[test]
fn aggregate_filter_pushdown() -> Result<()> {
    let scan = test_table_scan()?;
    let filtered_count = count_udaf()
        .call(vec![col("b")])
        .filter(col("c").gt(lit(42)))
        .build()?
        .alias("count2");

    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(vec![col("a")], vec![count(col("b")), filtered_count])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]
      TableScan: test projection=[a, b, c]
    "
    )
}
2329
/// The projection below `Distinct` is folded into the scan (`[a, b]`). The
/// projection of `a` above `Distinct` remains, because pruning `b` below the
/// `Distinct` would change which rows are considered duplicates.
#[test]
fn pushdown_through_distinct() -> Result<()> {
    let distinct_ab = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a"), col("b")])?
        .distinct()?;
    let plan = distinct_ab.project(vec![col("a")])?.build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: test.a
      Distinct:
        TableScan: test projection=[a, b]
    "
    )
}
2349
/// Two stacked window aggregates with a projection of both window outputs on
/// top. Column `a` is only needed by the first window. The optimizer inserts a
/// projection between the windows that keeps `b` (needed by the second
/// window) and the first window's output.
#[test]
fn test_window() -> Result<()> {
    let table_scan = test_table_scan()?;

    // max(a) partitioned by b. `build()` is fallible; propagate its error
    // through the test's `Result` instead of unwrapping.
    let max1 = Expr::from(expr::WindowFunction::new(
        WindowFunctionDefinition::AggregateUDF(max_udaf()),
        vec![col("test.a")],
    ))
    .partition_by(vec![col("test.b")])
    .build()?;

    // max(b) over the whole input, without partitioning.
    let max2 = Expr::from(expr::WindowFunction::new(
        WindowFunctionDefinition::AggregateUDF(max_udaf()),
        vec![col("test.b")],
    ));

    // Capture the output column names before the expressions are moved into
    // the plan.
    let col1 = col(max1.schema_name().to_string());
    let col2 = col(max2.schema_name().to_string());

    let plan = LogicalPlanBuilder::from(table_scan)
        .window(vec![max1])?
        .window(vec![max2])?
        .project(vec![col1, col2])?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Projection: max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
      WindowAggr: windowExpr=[[max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
        Projection: test.b, max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
          WindowAggr: windowExpr=[[max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
            TableScan: test projection=[a, b]
    "
    )
}
2386
/// Runs `Optimizer::new()` (presumably the default rule set; not visible here)
/// over a plan that filters with `EXISTS (...) OR EXISTS (...)` and then LEFT
/// joins. Only successful completion is asserted; the resulting plan is not
/// inspected.
#[test]
fn optimize_projections_exists_or_exists_with_outer_join() -> Result<()> {
    use datafusion_expr::utils::disjunction;
    use datafusion_expr::{exists, out_ref_col};

    let table_a = test_table_scan_with_name("a")?;
    let table_b = test_table_scan_with_name("b")?;

    // Correlated subquery over `sq_a`, referencing the outer column `a.a`.
    let sq_a = Arc::new(
        LogicalPlanBuilder::from(test_table_scan_with_name("sq_a")?)
            .filter(col("sq_a.a").eq(out_ref_col(DataType::UInt32, "a.a")))?
            .project(vec![lit(1)])?
            .build()?,
    );

    // Correlated subquery over `sq_b`, referencing the outer column `a.b`.
    let sq_b = Arc::new(
        LogicalPlanBuilder::from(test_table_scan_with_name("sq_b")?)
            .filter(col("sq_b.b").eq(out_ref_col(DataType::UInt32, "a.b")))?
            .project(vec![lit(1)])?
            .build()?,
    );

    // `disjunction` yields an `Option`. Two predicates are supplied, so it is
    // expected to be `Some`; state that invariant rather than a bare unwrap.
    let either_exists = disjunction(vec![exists(sq_a), exists(sq_b)])
        .expect("disjunction of two EXISTS predicates should be Some");

    let plan = LogicalPlanBuilder::from(table_a)
        .filter(either_exists)?
        .join(table_b, JoinType::Left, (vec!["a"], vec!["a"]), None)?
        .build()?;

    let optimizer = Optimizer::new();
    let config = OptimizerContext::new();
    optimizer.optimize(plan, &config, observe)?;

    Ok(())
}
2423
/// A projection drops the mark column produced by a `LeftMark` join, and the
/// result is then LEFT joined with `c`. The mark join's right side only keeps
/// its key `a`; the final join's output needs every column of `c`.
#[test]
fn optimize_projections_left_mark_join_with_projection() -> Result<()> {
    let (table_a, table_b, table_c) = (
        test_table_scan_with_name("a")?,
        test_table_scan_with_name("b")?,
        test_table_scan_with_name("c")?,
    );

    let marked = LogicalPlanBuilder::from(table_a)
        .join(table_b, JoinType::LeftMark, (vec!["a"], vec!["a"]), None)?
        .project(vec![col("a.a"), col("a.b"), col("a.c")])?;
    let plan = marked
        .join(table_c, JoinType::Left, (vec!["a"], vec!["a"]), None)?
        .build()?;

    assert_optimized_plan_equal!(
        plan,
        @r"
    Left Join: a.a = c.a
      Projection: a.a, a.b, a.c
        LeftMark Join: a.a = b.a
          TableScan: a projection=[a, b, c]
          TableScan: b projection=[a]
      TableScan: c projection=[a, b, c]
    "
    )
}
2448
2449 fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
2450
2451 fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
2452 let optimizer = Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]);
2453 let optimized_plan =
2454 optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
2455 Ok(optimized_plan)
2456 }
2457}