1mod required_indices;
21
22use crate::optimizer::ApplyOrder;
23use crate::{OptimizerConfig, OptimizerRule};
24use std::collections::HashSet;
25use std::sync::Arc;
26
27use datafusion_common::{
28 get_required_group_by_exprs_indices, internal_datafusion_err, internal_err, Column,
29 HashMap, JoinType, Result,
30};
31use datafusion_expr::expr::Alias;
32use datafusion_expr::Unnest;
33use datafusion_expr::{
34 logical_plan::LogicalPlan, projection_schema, Aggregate, Distinct, Expr, Projection,
35 TableScan, Window,
36};
37
38use crate::optimize_projections::required_indices::RequiredIndices;
39use crate::utils::NamePreserver;
40use datafusion_common::tree_node::{
41 Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
42};
43
/// Optimizer rule that prunes unnecessary columns from the intermediate
/// schemas inside a [`LogicalPlan`]. The rule:
/// - narrows table scans and other operators to the columns their parents
///   actually use,
/// - adds projections beneath operators that benefit from narrower inputs, and
/// - removes [`LogicalPlan::Projection`]s that turn out to be no-ops.
#[derive(Debug, Default)]
pub struct OptimizeProjections {}
61
62impl OptimizeProjections {
63 #[allow(missing_docs)]
64 pub fn new() -> Self {
65 Self {}
66 }
67}
68
69impl OptimizerRule for OptimizeProjections {
70 fn name(&self) -> &str {
71 "optimize_projections"
72 }
73
74 fn apply_order(&self) -> Option<ApplyOrder> {
75 None
76 }
77
78 fn supports_rewrite(&self) -> bool {
79 true
80 }
81
82 fn rewrite(
83 &self,
84 plan: LogicalPlan,
85 config: &dyn OptimizerConfig,
86 ) -> Result<Transformed<LogicalPlan>> {
87 let indices = RequiredIndices::new_for_all_exprs(&plan);
89 optimize_projections(plan, config, indices)
90 }
91}
92
93#[cfg_attr(feature = "recursive_protection", recursive::recursive)]
113fn optimize_projections(
114 plan: LogicalPlan,
115 config: &dyn OptimizerConfig,
116 indices: RequiredIndices,
117) -> Result<Transformed<LogicalPlan>> {
118 match plan {
121 LogicalPlan::Projection(proj) => {
122 return merge_consecutive_projections(proj)?.transform_data(|proj| {
123 rewrite_projection_given_requirements(proj, config, &indices)
124 })
125 }
126 LogicalPlan::Aggregate(aggregate) => {
127 let n_group_exprs = aggregate.group_expr_len()?;
129 let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);
132
133 let group_by_expr_existing = aggregate
135 .group_expr
136 .iter()
137 .map(|group_by_expr| group_by_expr.schema_name().to_string())
138 .collect::<Vec<_>>();
139
140 let new_group_bys = if let Some(simplest_groupby_indices) =
141 get_required_group_by_exprs_indices(
142 aggregate.input.schema(),
143 &group_by_expr_existing,
144 ) {
145 group_by_reqs
149 .append(&simplest_groupby_indices)
150 .get_at_indices(&aggregate.group_expr)
151 } else {
152 aggregate.group_expr
153 };
154
155 let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);
158
159 if new_aggr_expr.is_empty()
168 && new_group_bys.is_empty()
169 && !aggregate.aggr_expr.is_empty()
170 {
171 new_aggr_expr = aggregate.aggr_expr;
173 new_aggr_expr.resize_with(1, || unreachable!());
174 }
175
176 let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
177 let schema = aggregate.input.schema();
178 let necessary_indices =
179 RequiredIndices::new().with_exprs(schema, all_exprs_iter);
180 let necessary_exprs = necessary_indices.get_required_exprs(schema);
181
182 return optimize_projections(
183 Arc::unwrap_or_clone(aggregate.input),
184 config,
185 necessary_indices,
186 )?
187 .transform_data(|aggregate_input| {
188 add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
193 })?
194 .map_data(|aggregate_input| {
195 Aggregate::try_new(
198 Arc::new(aggregate_input),
199 new_group_bys,
200 new_aggr_expr,
201 )
202 .map(LogicalPlan::Aggregate)
203 });
204 }
205 LogicalPlan::Window(window) => {
206 let input_schema = Arc::clone(window.input.schema());
207 let n_input_fields = input_schema.fields().len();
209 let (child_reqs, window_reqs) = indices.split_off(n_input_fields);
212
213 let new_window_expr = window_reqs.get_at_indices(&window.window_expr);
216
217 let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);
220
221 return optimize_projections(
222 Arc::unwrap_or_clone(window.input),
223 config,
224 required_indices.clone(),
225 )?
226 .transform_data(|window_child| {
227 if new_window_expr.is_empty() {
228 Ok(Transformed::no(window_child))
230 } else {
231 let required_exprs =
235 required_indices.get_required_exprs(&input_schema);
236 let window_child =
237 add_projection_on_top_if_helpful(window_child, required_exprs)?
238 .data;
239 Window::try_new(new_window_expr, Arc::new(window_child))
240 .map(LogicalPlan::Window)
241 .map(Transformed::yes)
242 }
243 });
244 }
245 LogicalPlan::TableScan(table_scan) => {
246 let TableScan {
247 table_name,
248 source,
249 projection,
250 filters,
251 fetch,
252 projected_schema: _,
253 } = table_scan;
254
255 let projection = match &projection {
258 Some(projection) => indices.into_mapped_indices(|idx| projection[idx]),
259 None => indices.into_inner(),
260 };
261 return TableScan::try_new(
262 table_name,
263 source,
264 Some(projection),
265 filters,
266 fetch,
267 )
268 .map(LogicalPlan::TableScan)
269 .map(Transformed::yes);
270 }
271 _ => {}
273 };
274
275 let mut child_required_indices: Vec<RequiredIndices> = match &plan {
278 LogicalPlan::Sort(_)
279 | LogicalPlan::Filter(_)
280 | LogicalPlan::Repartition(_)
281 | LogicalPlan::Union(_)
282 | LogicalPlan::SubqueryAlias(_)
283 | LogicalPlan::Distinct(Distinct::On(_)) => {
284 plan.inputs()
289 .into_iter()
290 .map(|input| {
291 indices
292 .clone()
293 .with_projection_beneficial()
294 .with_plan_exprs(&plan, input.schema())
295 })
296 .collect::<Result<_>>()?
297 }
298 LogicalPlan::Limit(_) => {
299 plan.inputs()
304 .into_iter()
305 .map(|input| indices.clone().with_plan_exprs(&plan, input.schema()))
306 .collect::<Result<_>>()?
307 }
308 LogicalPlan::Copy(_)
309 | LogicalPlan::Ddl(_)
310 | LogicalPlan::Dml(_)
311 | LogicalPlan::Explain(_)
312 | LogicalPlan::Analyze(_)
313 | LogicalPlan::Subquery(_)
314 | LogicalPlan::Statement(_)
315 | LogicalPlan::Distinct(Distinct::All(_)) => {
316 plan.inputs()
322 .into_iter()
323 .map(RequiredIndices::new_for_all_exprs)
324 .collect()
325 }
326 LogicalPlan::Extension(extension) => {
327 let Some(necessary_children_indices) =
328 extension.node.necessary_children_exprs(indices.indices())
329 else {
330 return Ok(Transformed::no(plan));
332 };
333 let children = extension.node.inputs();
334 if children.len() != necessary_children_indices.len() {
335 return internal_err!("Inconsistent length between children and necessary children indices. \
336 Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \
337 consistent with actual children length for the node.");
338 }
339 children
340 .into_iter()
341 .zip(necessary_children_indices)
342 .map(|(child, necessary_indices)| {
343 RequiredIndices::new_from_indices(necessary_indices)
344 .with_plan_exprs(&plan, child.schema())
345 })
346 .collect::<Result<Vec<_>>>()?
347 }
348 LogicalPlan::EmptyRelation(_)
349 | LogicalPlan::RecursiveQuery(_)
350 | LogicalPlan::Values(_)
351 | LogicalPlan::DescribeTable(_) => {
352 return Ok(Transformed::no(plan));
354 }
355 LogicalPlan::Join(join) => {
356 let left_len = join.left.schema().fields().len();
357 let (left_req_indices, right_req_indices) =
358 split_join_requirements(left_len, indices, &join.join_type);
359 let left_indices =
360 left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
361 let right_indices =
362 right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
363 vec![
366 left_indices.with_projection_beneficial(),
367 right_indices.with_projection_beneficial(),
368 ]
369 }
370 LogicalPlan::Projection(_)
372 | LogicalPlan::Aggregate(_)
373 | LogicalPlan::Window(_)
374 | LogicalPlan::TableScan(_) => {
375 return internal_err!(
376 "OptimizeProjection: should have handled in the match statement above"
377 );
378 }
379 LogicalPlan::Unnest(Unnest {
380 dependency_indices, ..
381 }) => {
382 vec![RequiredIndices::new_from_indices(
383 dependency_indices.clone(),
384 )]
385 }
386 };
387
388 child_required_indices.reverse();
391 if child_required_indices.len() != plan.inputs().len() {
392 return internal_err!(
393 "OptimizeProjection: child_required_indices length mismatch with plan inputs"
394 );
395 }
396
397 let transformed_plan = plan.map_children(|child| {
399 let required_indices = child_required_indices.pop().ok_or_else(|| {
400 internal_datafusion_err!(
401 "Unexpected number of required_indices in OptimizeProjections rule"
402 )
403 })?;
404
405 let projection_beneficial = required_indices.projection_beneficial();
406 let project_exprs = required_indices.get_required_exprs(child.schema());
407
408 optimize_projections(child, config, required_indices)?.transform_data(
409 |new_input| {
410 if projection_beneficial {
411 add_projection_on_top_if_helpful(new_input, project_exprs)
412 } else {
413 Ok(Transformed::no(new_input))
414 }
415 },
416 )
417 })?;
418
419 if transformed_plan.transformed {
421 transformed_plan.map_data(|plan| plan.recompute_schema())
422 } else {
423 Ok(transformed_plan)
424 }
425}
426
427fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Projection>> {
448 let Projection {
449 expr,
450 input,
451 schema,
452 ..
453 } = proj;
454 let LogicalPlan::Projection(prev_projection) = input.as_ref() else {
455 return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
456 };
457
458 let mut column_referral_map = HashMap::<&Column, usize>::new();
460 expr.iter()
461 .for_each(|expr| expr.add_column_ref_counts(&mut column_referral_map));
462
463 if column_referral_map.into_iter().any(|(col, usage)| {
467 usage > 1
468 && !is_expr_trivial(
469 &prev_projection.expr
470 [prev_projection.schema.index_of_column(col).unwrap()],
471 )
472 }) {
473 return Projection::try_new_with_schema(expr, input, schema).map(Transformed::no);
475 }
476
477 let LogicalPlan::Projection(prev_projection) = Arc::unwrap_or_clone(input) else {
478 unreachable!();
480 };
481
482 let name_preserver = NamePreserver::new_for_projection();
485 let mut original_names = vec![];
486 let new_exprs = expr.map_elements(|expr| {
487 original_names.push(name_preserver.save(&expr));
488
489 match expr {
491 Expr::Alias(Alias {
492 expr,
493 relation,
494 name,
495 }) => rewrite_expr(*expr, &prev_projection).map(|result| {
496 result.update_data(|expr| Expr::Alias(Alias::new(expr, relation, name)))
497 }),
498 e => rewrite_expr(e, &prev_projection),
499 }
500 })?;
501
502 if new_exprs.transformed {
505 let new_exprs = new_exprs
507 .data
508 .into_iter()
509 .zip(original_names)
510 .map(|(expr, original_name)| original_name.restore(expr))
511 .collect::<Vec<_>>();
512 Projection::try_new(new_exprs, prev_projection.input).map(Transformed::yes)
513 } else {
514 let input = Arc::new(LogicalPlan::Projection(prev_projection));
516 Projection::try_new_with_schema(new_exprs.data, input, schema)
517 .map(Transformed::no)
518 }
519}
520
521fn is_expr_trivial(expr: &Expr) -> bool {
523 matches!(expr, Expr::Column(_) | Expr::Literal(_))
524}
525
526fn rewrite_expr(expr: Expr, input: &Projection) -> Result<Transformed<Expr>> {
571 expr.transform_up(|expr| {
572 match expr {
573 Expr::Alias(alias) => Ok(Transformed::yes(*alias.expr)),
575 Expr::Column(col) => {
576 let idx = input.schema.index_of_column(&col)?;
578 let input_expr = input.expr[idx].clone().unalias_nested().data;
586 Ok(Transformed::yes(input_expr))
587 }
588 _ => Ok(Transformed::no(expr)),
590 }
591 })
592}
593
594fn outer_columns<'a>(expr: &'a Expr, columns: &mut HashSet<&'a Column>) {
603 expr.apply(|expr| {
605 match expr {
606 Expr::OuterReferenceColumn(_, col) => {
607 columns.insert(col);
608 }
609 Expr::ScalarSubquery(subquery) => {
610 outer_columns_helper_multi(&subquery.outer_ref_columns, columns);
611 }
612 Expr::Exists(exists) => {
613 outer_columns_helper_multi(&exists.subquery.outer_ref_columns, columns);
614 }
615 Expr::InSubquery(insubquery) => {
616 outer_columns_helper_multi(
617 &insubquery.subquery.outer_ref_columns,
618 columns,
619 );
620 }
621 _ => {}
622 };
623 Ok(TreeNodeRecursion::Continue)
624 })
625 .unwrap();
627}
628
629fn outer_columns_helper_multi<'a, 'b>(
638 exprs: impl IntoIterator<Item = &'a Expr>,
639 columns: &'b mut HashSet<&'a Column>,
640) {
641 exprs.into_iter().for_each(|e| outer_columns(e, columns));
642}
643
644fn split_join_requirements(
674 left_len: usize,
675 indices: RequiredIndices,
676 join_type: &JoinType,
677) -> (RequiredIndices, RequiredIndices) {
678 match join_type {
679 JoinType::Inner
681 | JoinType::Left
682 | JoinType::Right
683 | JoinType::Full
684 | JoinType::LeftMark => {
685 indices.split_off(left_len)
688 }
689 JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()),
691 JoinType::RightSemi | JoinType::RightAnti => (RequiredIndices::new(), indices),
694 }
695}
696
697fn add_projection_on_top_if_helpful(
715 plan: LogicalPlan,
716 project_exprs: Vec<Expr>,
717) -> Result<Transformed<LogicalPlan>> {
718 if project_exprs.len() >= plan.schema().fields().len() {
720 Ok(Transformed::no(plan))
721 } else {
722 Projection::try_new(project_exprs, Arc::new(plan))
723 .map(LogicalPlan::Projection)
724 .map(Transformed::yes)
725 }
726}
727
728fn rewrite_projection_given_requirements(
746 proj: Projection,
747 config: &dyn OptimizerConfig,
748 indices: &RequiredIndices,
749) -> Result<Transformed<LogicalPlan>> {
750 let Projection { expr, input, .. } = proj;
751
752 let exprs_used = indices.get_at_indices(&expr);
753
754 let required_indices =
755 RequiredIndices::new().with_exprs(input.schema(), exprs_used.iter());
756
757 optimize_projections(Arc::unwrap_or_clone(input), config, required_indices)?
760 .transform_data(|input| {
761 if is_projection_unnecessary(&input, &exprs_used)? {
762 Ok(Transformed::yes(input))
763 } else {
764 Projection::try_new(exprs_used, Arc::new(input))
765 .map(LogicalPlan::Projection)
766 .map(Transformed::yes)
767 }
768 })
769}
770
771fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result<bool> {
775 let proj_schema = projection_schema(input, proj_exprs)?;
776 Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial))
777}
778
779#[cfg(test)]
780mod tests {
781 use std::cmp::Ordering;
782 use std::collections::HashMap;
783 use std::fmt::Formatter;
784 use std::ops::Add;
785 use std::sync::Arc;
786 use std::vec;
787
788 use crate::optimize_projections::OptimizeProjections;
789 use crate::optimizer::Optimizer;
790 use crate::test::{
791 assert_fields_eq, assert_optimized_plan_eq, scan_empty, test_table_scan,
792 test_table_scan_fields, test_table_scan_with_name,
793 };
794 use crate::{OptimizerContext, OptimizerRule};
795 use arrow::datatypes::{DataType, Field, Schema};
796 use datafusion_common::{
797 Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
798 };
799 use datafusion_expr::ExprFunctionExt;
800 use datafusion_expr::{
801 binary_expr, build_join_schema,
802 builder::table_scan_with_filters,
803 col,
804 expr::{self, Cast},
805 lit,
806 logical_plan::{builder::LogicalPlanBuilder, table_scan},
807 not, try_cast, when, BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator,
808 Projection, UserDefinedLogicalNodeCore, WindowFunctionDefinition,
809 };
810
811 use datafusion_functions_aggregate::count::count_udaf;
812 use datafusion_functions_aggregate::expr_fn::{count, max, min};
813 use datafusion_functions_aggregate::min_max::max_udaf;
814
815 fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
816 assert_optimized_plan_eq(Arc::new(OptimizeProjections::new()), plan, expected)
817 }
818
819 #[derive(Debug, Hash, PartialEq, Eq)]
820 struct NoOpUserDefined {
821 exprs: Vec<Expr>,
822 schema: DFSchemaRef,
823 input: Arc<LogicalPlan>,
824 }
825
826 impl NoOpUserDefined {
827 fn new(schema: DFSchemaRef, input: Arc<LogicalPlan>) -> Self {
828 Self {
829 exprs: vec![],
830 schema,
831 input,
832 }
833 }
834
835 fn with_exprs(mut self, exprs: Vec<Expr>) -> Self {
836 self.exprs = exprs;
837 self
838 }
839 }
840
841 impl PartialOrd for NoOpUserDefined {
843 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
844 match self.exprs.partial_cmp(&other.exprs) {
845 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
846 cmp => cmp,
847 }
848 }
849 }
850
851 impl UserDefinedLogicalNodeCore for NoOpUserDefined {
852 fn name(&self) -> &str {
853 "NoOpUserDefined"
854 }
855
856 fn inputs(&self) -> Vec<&LogicalPlan> {
857 vec![&self.input]
858 }
859
860 fn schema(&self) -> &DFSchemaRef {
861 &self.schema
862 }
863
864 fn expressions(&self) -> Vec<Expr> {
865 self.exprs.clone()
866 }
867
868 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
869 write!(f, "NoOpUserDefined")
870 }
871
872 fn with_exprs_and_inputs(
873 &self,
874 exprs: Vec<Expr>,
875 mut inputs: Vec<LogicalPlan>,
876 ) -> Result<Self> {
877 Ok(Self {
878 exprs,
879 input: Arc::new(inputs.swap_remove(0)),
880 schema: Arc::clone(&self.schema),
881 })
882 }
883
884 fn necessary_children_exprs(
885 &self,
886 output_columns: &[usize],
887 ) -> Option<Vec<Vec<usize>>> {
888 Some(vec![output_columns.to_vec()])
890 }
891
892 fn supports_limit_pushdown(&self) -> bool {
893 false }
895 }
896
897 #[derive(Debug, Hash, PartialEq, Eq)]
898 struct UserDefinedCrossJoin {
899 exprs: Vec<Expr>,
900 schema: DFSchemaRef,
901 left_child: Arc<LogicalPlan>,
902 right_child: Arc<LogicalPlan>,
903 }
904
905 impl UserDefinedCrossJoin {
906 fn new(left_child: Arc<LogicalPlan>, right_child: Arc<LogicalPlan>) -> Self {
907 let left_schema = left_child.schema();
908 let right_schema = right_child.schema();
909 let schema = Arc::new(
910 build_join_schema(left_schema, right_schema, &JoinType::Inner).unwrap(),
911 );
912 Self {
913 exprs: vec![],
914 schema,
915 left_child,
916 right_child,
917 }
918 }
919 }
920
921 impl PartialOrd for UserDefinedCrossJoin {
923 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
924 match self.exprs.partial_cmp(&other.exprs) {
925 Some(Ordering::Equal) => {
926 match self.left_child.partial_cmp(&other.left_child) {
927 Some(Ordering::Equal) => {
928 self.right_child.partial_cmp(&other.right_child)
929 }
930 cmp => cmp,
931 }
932 }
933 cmp => cmp,
934 }
935 }
936 }
937
938 impl UserDefinedLogicalNodeCore for UserDefinedCrossJoin {
939 fn name(&self) -> &str {
940 "UserDefinedCrossJoin"
941 }
942
943 fn inputs(&self) -> Vec<&LogicalPlan> {
944 vec![&self.left_child, &self.right_child]
945 }
946
947 fn schema(&self) -> &DFSchemaRef {
948 &self.schema
949 }
950
951 fn expressions(&self) -> Vec<Expr> {
952 self.exprs.clone()
953 }
954
955 fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
956 write!(f, "UserDefinedCrossJoin")
957 }
958
959 fn with_exprs_and_inputs(
960 &self,
961 exprs: Vec<Expr>,
962 mut inputs: Vec<LogicalPlan>,
963 ) -> Result<Self> {
964 assert_eq!(inputs.len(), 2);
965 Ok(Self {
966 exprs,
967 left_child: Arc::new(inputs.remove(0)),
968 right_child: Arc::new(inputs.remove(0)),
969 schema: Arc::clone(&self.schema),
970 })
971 }
972
973 fn necessary_children_exprs(
974 &self,
975 output_columns: &[usize],
976 ) -> Option<Vec<Vec<usize>>> {
977 let left_child_len = self.left_child.schema().fields().len();
978 let mut left_reqs = vec![];
979 let mut right_reqs = vec![];
980 for &out_idx in output_columns {
981 if out_idx < left_child_len {
982 left_reqs.push(out_idx);
983 } else {
984 right_reqs.push(out_idx - left_child_len)
987 }
988 }
989 Some(vec![left_reqs, right_reqs])
990 }
991
992 fn supports_limit_pushdown(&self) -> bool {
993 false }
995 }
996
997 #[test]
998 fn merge_two_projection() -> Result<()> {
999 let table_scan = test_table_scan()?;
1000 let plan = LogicalPlanBuilder::from(table_scan)
1001 .project(vec![col("a")])?
1002 .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1003 .build()?;
1004
1005 let expected = "Projection: Int32(1) + test.a\
1006 \n TableScan: test projection=[a]";
1007 assert_optimized_plan_equal(plan, expected)
1008 }
1009
1010 #[test]
1011 fn merge_three_projection() -> Result<()> {
1012 let table_scan = test_table_scan()?;
1013 let plan = LogicalPlanBuilder::from(table_scan)
1014 .project(vec![col("a"), col("b")])?
1015 .project(vec![col("a")])?
1016 .project(vec![binary_expr(lit(1), Operator::Plus, col("a"))])?
1017 .build()?;
1018
1019 let expected = "Projection: Int32(1) + test.a\
1020 \n TableScan: test projection=[a]";
1021 assert_optimized_plan_equal(plan, expected)
1022 }
1023
1024 #[test]
1025 fn merge_alias() -> Result<()> {
1026 let table_scan = test_table_scan()?;
1027 let plan = LogicalPlanBuilder::from(table_scan)
1028 .project(vec![col("a")])?
1029 .project(vec![col("a").alias("alias")])?
1030 .build()?;
1031
1032 let expected = "Projection: test.a AS alias\
1033 \n TableScan: test projection=[a]";
1034 assert_optimized_plan_equal(plan, expected)
1035 }
1036
1037 #[test]
1038 fn merge_nested_alias() -> Result<()> {
1039 let table_scan = test_table_scan()?;
1040 let plan = LogicalPlanBuilder::from(table_scan)
1041 .project(vec![col("a").alias("alias1").alias("alias2")])?
1042 .project(vec![col("alias2").alias("alias")])?
1043 .build()?;
1044
1045 let expected = "Projection: test.a AS alias\
1046 \n TableScan: test projection=[a]";
1047 assert_optimized_plan_equal(plan, expected)
1048 }
1049
1050 #[test]
1051 fn test_nested_count() -> Result<()> {
1052 let schema = Schema::new(vec![Field::new("foo", DataType::Int32, false)]);
1053
1054 let groups: Vec<Expr> = vec![];
1055
1056 let plan = table_scan(TableReference::none(), &schema, None)
1057 .unwrap()
1058 .aggregate(groups.clone(), vec![count(lit(1))])
1059 .unwrap()
1060 .aggregate(groups, vec![count(lit(1))])
1061 .unwrap()
1062 .build()
1063 .unwrap();
1064
1065 let expected = "Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]\
1066 \n Projection: \
1067 \n Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]\
1068 \n TableScan: ?table? projection=[]";
1069 assert_optimized_plan_equal(plan, expected)
1070 }
1071
1072 #[test]
1073 fn test_neg_push_down() -> Result<()> {
1074 let table_scan = test_table_scan()?;
1075 let plan = LogicalPlanBuilder::from(table_scan)
1076 .project(vec![-col("a")])?
1077 .build()?;
1078
1079 let expected = "Projection: (- test.a)\
1080 \n TableScan: test projection=[a]";
1081 assert_optimized_plan_equal(plan, expected)
1082 }
1083
1084 #[test]
1085 fn test_is_null() -> Result<()> {
1086 let table_scan = test_table_scan()?;
1087 let plan = LogicalPlanBuilder::from(table_scan)
1088 .project(vec![col("a").is_null()])?
1089 .build()?;
1090
1091 let expected = "Projection: test.a IS NULL\
1092 \n TableScan: test projection=[a]";
1093 assert_optimized_plan_equal(plan, expected)
1094 }
1095
1096 #[test]
1097 fn test_is_not_null() -> Result<()> {
1098 let table_scan = test_table_scan()?;
1099 let plan = LogicalPlanBuilder::from(table_scan)
1100 .project(vec![col("a").is_not_null()])?
1101 .build()?;
1102
1103 let expected = "Projection: test.a IS NOT NULL\
1104 \n TableScan: test projection=[a]";
1105 assert_optimized_plan_equal(plan, expected)
1106 }
1107
1108 #[test]
1109 fn test_is_true() -> Result<()> {
1110 let table_scan = test_table_scan()?;
1111 let plan = LogicalPlanBuilder::from(table_scan)
1112 .project(vec![col("a").is_true()])?
1113 .build()?;
1114
1115 let expected = "Projection: test.a IS TRUE\
1116 \n TableScan: test projection=[a]";
1117 assert_optimized_plan_equal(plan, expected)
1118 }
1119
1120 #[test]
1121 fn test_is_not_true() -> Result<()> {
1122 let table_scan = test_table_scan()?;
1123 let plan = LogicalPlanBuilder::from(table_scan)
1124 .project(vec![col("a").is_not_true()])?
1125 .build()?;
1126
1127 let expected = "Projection: test.a IS NOT TRUE\
1128 \n TableScan: test projection=[a]";
1129 assert_optimized_plan_equal(plan, expected)
1130 }
1131
1132 #[test]
1133 fn test_is_false() -> Result<()> {
1134 let table_scan = test_table_scan()?;
1135 let plan = LogicalPlanBuilder::from(table_scan)
1136 .project(vec![col("a").is_false()])?
1137 .build()?;
1138
1139 let expected = "Projection: test.a IS FALSE\
1140 \n TableScan: test projection=[a]";
1141 assert_optimized_plan_equal(plan, expected)
1142 }
1143
1144 #[test]
1145 fn test_is_not_false() -> Result<()> {
1146 let table_scan = test_table_scan()?;
1147 let plan = LogicalPlanBuilder::from(table_scan)
1148 .project(vec![col("a").is_not_false()])?
1149 .build()?;
1150
1151 let expected = "Projection: test.a IS NOT FALSE\
1152 \n TableScan: test projection=[a]";
1153 assert_optimized_plan_equal(plan, expected)
1154 }
1155
1156 #[test]
1157 fn test_is_unknown() -> Result<()> {
1158 let table_scan = test_table_scan()?;
1159 let plan = LogicalPlanBuilder::from(table_scan)
1160 .project(vec![col("a").is_unknown()])?
1161 .build()?;
1162
1163 let expected = "Projection: test.a IS UNKNOWN\
1164 \n TableScan: test projection=[a]";
1165 assert_optimized_plan_equal(plan, expected)
1166 }
1167
1168 #[test]
1169 fn test_is_not_unknown() -> Result<()> {
1170 let table_scan = test_table_scan()?;
1171 let plan = LogicalPlanBuilder::from(table_scan)
1172 .project(vec![col("a").is_not_unknown()])?
1173 .build()?;
1174
1175 let expected = "Projection: test.a IS NOT UNKNOWN\
1176 \n TableScan: test projection=[a]";
1177 assert_optimized_plan_equal(plan, expected)
1178 }
1179
1180 #[test]
1181 fn test_not() -> Result<()> {
1182 let table_scan = test_table_scan()?;
1183 let plan = LogicalPlanBuilder::from(table_scan)
1184 .project(vec![not(col("a"))])?
1185 .build()?;
1186
1187 let expected = "Projection: NOT test.a\
1188 \n TableScan: test projection=[a]";
1189 assert_optimized_plan_equal(plan, expected)
1190 }
1191
1192 #[test]
1193 fn test_try_cast() -> Result<()> {
1194 let table_scan = test_table_scan()?;
1195 let plan = LogicalPlanBuilder::from(table_scan)
1196 .project(vec![try_cast(col("a"), DataType::Float64)])?
1197 .build()?;
1198
1199 let expected = "Projection: TRY_CAST(test.a AS Float64)\
1200 \n TableScan: test projection=[a]";
1201 assert_optimized_plan_equal(plan, expected)
1202 }
1203
1204 #[test]
1205 fn test_similar_to() -> Result<()> {
1206 let table_scan = test_table_scan()?;
1207 let expr = Box::new(col("a"));
1208 let pattern = Box::new(lit("[0-9]"));
1209 let similar_to_expr =
1210 Expr::SimilarTo(Like::new(false, expr, pattern, None, false));
1211 let plan = LogicalPlanBuilder::from(table_scan)
1212 .project(vec![similar_to_expr])?
1213 .build()?;
1214
1215 let expected = "Projection: test.a SIMILAR TO Utf8(\"[0-9]\")\
1216 \n TableScan: test projection=[a]";
1217 assert_optimized_plan_equal(plan, expected)
1218 }
1219
1220 #[test]
1221 fn test_between() -> Result<()> {
1222 let table_scan = test_table_scan()?;
1223 let plan = LogicalPlanBuilder::from(table_scan)
1224 .project(vec![col("a").between(lit(1), lit(3))])?
1225 .build()?;
1226
1227 let expected = "Projection: test.a BETWEEN Int32(1) AND Int32(3)\
1228 \n TableScan: test projection=[a]";
1229 assert_optimized_plan_equal(plan, expected)
1230 }
1231
1232 #[test]
1234 fn test_case_merged() -> Result<()> {
1235 let table_scan = test_table_scan()?;
1236 let plan = LogicalPlanBuilder::from(table_scan)
1237 .project(vec![col("a"), lit(0).alias("d")])?
1238 .project(vec![
1239 col("a"),
1240 when(col("a").eq(lit(1)), lit(10))
1241 .otherwise(col("d"))?
1242 .alias("d"),
1243 ])?
1244 .build()?;
1245
1246 let expected = "Projection: test.a, CASE WHEN test.a = Int32(1) THEN Int32(10) ELSE Int32(0) END AS d\
1247 \n TableScan: test projection=[a]";
1248 assert_optimized_plan_equal(plan, expected)
1249 }
1250
1251 #[test]
1254 fn test_derived_column() -> Result<()> {
1255 let table_scan = test_table_scan()?;
1256 let plan = LogicalPlanBuilder::from(table_scan)
1257 .project(vec![col("a").add(lit(1)).alias("a"), lit(0).alias("d")])?
1258 .project(vec![
1259 col("a"),
1260 when(col("a").eq(lit(1)), lit(10))
1261 .otherwise(col("d"))?
1262 .alias("d"),
1263 ])?
1264 .build()?;
1265
1266 let expected =
1267 "Projection: a, CASE WHEN a = Int32(1) THEN Int32(10) ELSE d END AS d\
1268 \n Projection: test.a + Int32(1) AS a, Int32(0) AS d\
1269 \n TableScan: test projection=[a]";
1270 assert_optimized_plan_equal(plan, expected)
1271 }
1272
1273 #[test]
1276 fn test_user_defined_logical_plan_node() -> Result<()> {
1277 let table_scan = test_table_scan()?;
1278 let custom_plan = LogicalPlan::Extension(Extension {
1279 node: Arc::new(NoOpUserDefined::new(
1280 Arc::clone(table_scan.schema()),
1281 Arc::new(table_scan.clone()),
1282 )),
1283 });
1284 let plan = LogicalPlanBuilder::from(custom_plan)
1285 .project(vec![col("a"), lit(0).alias("d")])?
1286 .build()?;
1287
1288 let expected = "Projection: test.a, Int32(0) AS d\
1289 \n NoOpUserDefined\
1290 \n TableScan: test projection=[a]";
1291 assert_optimized_plan_equal(plan, expected)
1292 }
1293
1294 #[test]
1299 fn test_user_defined_logical_plan_node2() -> Result<()> {
1300 let table_scan = test_table_scan()?;
1301 let exprs = vec![Expr::Column(Column::from_qualified_name("b"))];
1302 let custom_plan = LogicalPlan::Extension(Extension {
1303 node: Arc::new(
1304 NoOpUserDefined::new(
1305 Arc::clone(table_scan.schema()),
1306 Arc::new(table_scan.clone()),
1307 )
1308 .with_exprs(exprs),
1309 ),
1310 });
1311 let plan = LogicalPlanBuilder::from(custom_plan)
1312 .project(vec![col("a"), lit(0).alias("d")])?
1313 .build()?;
1314
1315 let expected = "Projection: test.a, Int32(0) AS d\
1316 \n NoOpUserDefined\
1317 \n TableScan: test projection=[a, b]";
1318 assert_optimized_plan_equal(plan, expected)
1319 }
1320
1321 #[test]
1327 fn test_user_defined_logical_plan_node3() -> Result<()> {
1328 let table_scan = test_table_scan()?;
1329 let left_expr = Expr::Column(Column::from_qualified_name("b"));
1330 let right_expr = Expr::Column(Column::from_qualified_name("c"));
1331 let binary_expr = Expr::BinaryExpr(BinaryExpr::new(
1332 Box::new(left_expr),
1333 Operator::Plus,
1334 Box::new(right_expr),
1335 ));
1336 let exprs = vec![binary_expr];
1337 let custom_plan = LogicalPlan::Extension(Extension {
1338 node: Arc::new(
1339 NoOpUserDefined::new(
1340 Arc::clone(table_scan.schema()),
1341 Arc::new(table_scan.clone()),
1342 )
1343 .with_exprs(exprs),
1344 ),
1345 });
1346 let plan = LogicalPlanBuilder::from(custom_plan)
1347 .project(vec![col("a"), lit(0).alias("d")])?
1348 .build()?;
1349
1350 let expected = "Projection: test.a, Int32(0) AS d\
1351 \n NoOpUserDefined\
1352 \n TableScan: test projection=[a, b, c]";
1353 assert_optimized_plan_equal(plan, expected)
1354 }
1355
1356 #[test]
1361 fn test_user_defined_logical_plan_node4() -> Result<()> {
1362 let left_table = test_table_scan_with_name("l")?;
1363 let right_table = test_table_scan_with_name("r")?;
1364 let custom_plan = LogicalPlan::Extension(Extension {
1365 node: Arc::new(UserDefinedCrossJoin::new(
1366 Arc::new(left_table),
1367 Arc::new(right_table),
1368 )),
1369 });
1370 let plan = LogicalPlanBuilder::from(custom_plan)
1371 .project(vec![col("l.a"), col("l.c"), col("r.a"), lit(0).alias("d")])?
1372 .build()?;
1373
1374 let expected = "Projection: l.a, l.c, r.a, Int32(0) AS d\
1375 \n UserDefinedCrossJoin\
1376 \n TableScan: l projection=[a, c]\
1377 \n TableScan: r projection=[a]";
1378 assert_optimized_plan_equal(plan, expected)
1379 }
1380
1381 #[test]
1382 fn aggregate_no_group_by() -> Result<()> {
1383 let table_scan = test_table_scan()?;
1384
1385 let plan = LogicalPlanBuilder::from(table_scan)
1386 .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
1387 .build()?;
1388
1389 let expected = "Aggregate: groupBy=[[]], aggr=[[max(test.b)]]\
1390 \n TableScan: test projection=[b]";
1391
1392 assert_optimized_plan_equal(plan, expected)
1393 }
1394
1395 #[test]
1396 fn aggregate_group_by() -> Result<()> {
1397 let table_scan = test_table_scan()?;
1398
1399 let plan = LogicalPlanBuilder::from(table_scan)
1400 .aggregate(vec![col("c")], vec![max(col("b"))])?
1401 .build()?;
1402
1403 let expected = "Aggregate: groupBy=[[test.c]], aggr=[[max(test.b)]]\
1404 \n TableScan: test projection=[b, c]";
1405
1406 assert_optimized_plan_equal(plan, expected)
1407 }
1408
/// Pruning looks through a `SubqueryAlias`.
///
/// The aliased references `a.b` / `a.c` narrow the underlying scan to `b, c`.
#[test]
fn aggregate_group_by_with_table_alias() -> Result<()> {
    let aliased = LogicalPlanBuilder::from(test_table_scan()?).alias("a")?;
    let plan = aliased
        .aggregate(vec![col("c")], vec![max(col("b"))])?
        .build()?;

    let expected = "Aggregate: groupBy=[[a.c]], aggr=[[max(a.b)]]\
        \n SubqueryAlias: a\
        \n TableScan: test projection=[b, c]";
    assert_optimized_plan_equal(plan, expected)
}
1424
/// The filter column `c` is still scanned.
///
/// A projection is placed above the filter so the aggregate receives only `b`.
#[test]
fn aggregate_no_group_by_with_filter() -> Result<()> {
    let scan = test_table_scan()?;
    let predicate = col("c").gt(lit(1));
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
        .build()?;

    let expected = "Aggregate: groupBy=[[]], aggr=[[max(test.b)]]\
        \n Projection: test.b\
        \n Filter: test.c > Int32(1)\
        \n TableScan: test projection=[b, c]";
    assert_optimized_plan_equal(plan, expected)
}
1441
/// A column whose name contains a period (`tag.one`) is referenced as a single unqualified
/// identifier.
///
/// The outer projection of the aggregate alias is removed and the scan keeps `tag.one`.
#[test]
fn aggregate_with_periods() -> Result<()> {
    let schema = Schema::new(vec![Field::new("tag.one", DataType::Utf8, false)]);
    let max_tag_one = max(col(Column::new_unqualified("tag.one"))).alias("tag.one");

    let plan = table_scan(Some("m4"), &schema, None)?
        .aggregate(Vec::<Expr>::new(), vec![max_tag_one])?
        .project([col(Column::new_unqualified("tag.one"))])?
        .build()?;

    let expected = "Aggregate: groupBy=[[]], aggr=[[max(m4.tag.one) AS tag.one]]\
        \n TableScan: m4 projection=[tag.one]";
    assert_optimized_plan_equal(plan, expected)
}
1466
/// Two stacked projections over the same three columns collapse into the outer one.
#[test]
fn redundant_project() -> Result<()> {
    let inner = vec![col("a"), col("b"), col("c")];
    let outer = vec![col("a"), col("c"), col("b")];
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(inner)?
        .project(outer)?
        .build()?;

    let expected = "Projection: test.a, test.c, test.b\
        \n TableScan: test projection=[a, b, c]";
    assert_optimized_plan_equal(plan, expected)
}
1480
/// A scan whose projection reorders columns (`b, a, c`) keeps that order after optimization.
#[test]
fn reorder_scan() -> Result<()> {
    let schema = Schema::new(test_table_scan_fields());
    let scan_order = vec![1, 0, 2];
    let plan = table_scan(Some("test"), &schema, Some(scan_order))?.build()?;

    assert_optimized_plan_equal(plan, "TableScan: test projection=[b, a, c]")
}
1490
/// Projecting `a, b` over a reordered scan narrows the scan to those columns.
///
/// The scan keeps its own `b, a` order.
#[test]
fn reorder_scan_projection() -> Result<()> {
    let schema = Schema::new(test_table_scan_fields());
    let scan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?;
    let plan = scan.project(vec![col("a"), col("b")])?.build()?;

    let expected = "Projection: test.a, test.b\
        \n TableScan: test projection=[b, a]";
    assert_optimized_plan_equal(plan, expected)
}
1503
/// A projection that only permutes all scan columns is kept, and the scan reads every column.
#[test]
fn reorder_projection() -> Result<()> {
    let permuted = vec![col("c"), col("b"), col("a")];
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(permuted)?
        .build()?;

    let expected = "Projection: test.c, test.b, test.a\
        \n TableScan: test projection=[a, b, c]";
    assert_optimized_plan_equal(plan, expected)
}
1516
/// Permuting projections separated by filters are left in place.
///
/// Each projection reorders the same three columns, but no two are adjacent, so none are merged.
#[test]
fn noncontinuous_redundant_projection() -> Result<()> {
    let scan = test_table_scan()?;

    let first_stage = LogicalPlanBuilder::from(scan)
        .project(vec![col("c"), col("b"), col("a")])?
        .filter(col("c").gt(lit(1)))?;
    let second_stage = first_stage
        .project(vec![col("c"), col("a"), col("b")])?
        .filter(col("b").gt(lit(1)))?
        .filter(col("a").gt(lit(1)))?;
    let plan = second_stage
        .project(vec![col("a"), col("c"), col("b")])?
        .build()?;

    let expected = "Projection: test.a, test.c, test.b\
        \n Filter: test.a > Int32(1)\
        \n Filter: test.b > Int32(1)\
        \n Projection: test.c, test.a, test.b\
        \n Filter: test.c > Int32(1)\
        \n Projection: test.c, test.b, test.a\
        \n TableScan: test projection=[a, b, c]";
    assert_optimized_plan_equal(plan, expected)
}
1538
/// Projecting exactly the join's surviving columns over a left join removes the projection.
///
/// Each scan is narrowed, and the join schema becomes `test.a, test.b, test2.c1`, with `c1`
/// nullable.
#[test]
fn join_schema_trim_full_join_column_projection() -> Result<()> {
    let left = test_table_scan()?;
    let right_schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
    let right = scan_empty(Some("test2"), &right_schema, None)?.build()?;

    let plan = LogicalPlanBuilder::from(left)
        .join(right, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
        .project(vec![col("a"), col("b"), col("c1")])?
        .build()?;

    let expected = "Left Join: test.a = test2.c1\
        \n TableScan: test projection=[a, b]\
        \n TableScan: test2 projection=[c1]";
    let optimized = optimize(plan)?;
    assert_eq!(format!("{optimized}"), expected);

    // The optimized root *is* the join, so its schema is checked directly.
    let expected_schema = DFSchema::new_with_metadata(
        vec![
            (
                Some("test".into()),
                Arc::new(Field::new("a", DataType::UInt32, false)),
            ),
            (
                Some("test".into()),
                Arc::new(Field::new("b", DataType::UInt32, false)),
            ),
            (
                Some("test2".into()),
                Arc::new(Field::new("c1", DataType::UInt32, true)),
            ),
        ],
        HashMap::new(),
    )?;
    assert_eq!(**optimized.schema(), expected_schema);

    Ok(())
}
1585
/// Projecting only `a, b` over a left join on `a = c1` keeps the projection.
///
/// The join below it still carries `test2.c1` (nullable) in its schema, because the join
/// condition needs it.
#[test]
fn join_schema_trim_partial_join_column_projection() -> Result<()> {
    let left = test_table_scan()?;
    let right_schema = Schema::new(vec![Field::new("c1", DataType::UInt32, false)]);
    let right = scan_empty(Some("test2"), &right_schema, None)?.build()?;

    let plan = LogicalPlanBuilder::from(left)
        .join(right, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
        .project(vec![col("a"), col("b")])?
        .build()?;

    let expected = "Projection: test.a, test.b\
        \n Left Join: test.a = test2.c1\
        \n TableScan: test projection=[a, b]\
        \n TableScan: test2 projection=[c1]";
    let optimized = optimize(plan)?;
    assert_eq!(format!("{optimized}"), expected);

    // The join is the single input of the root projection.
    let join = optimized.inputs()[0];
    let expected_schema = DFSchema::new_with_metadata(
        vec![
            (
                Some("test".into()),
                Arc::new(Field::new("a", DataType::UInt32, false)),
            ),
            (
                Some("test".into()),
                Arc::new(Field::new("b", DataType::UInt32, false)),
            ),
            (
                Some("test2".into()),
                Arc::new(Field::new("c1", DataType::UInt32, true)),
            ),
        ],
        HashMap::new(),
    )?;
    assert_eq!(**join.schema(), expected_schema);

    Ok(())
}
1637
/// A `USING (a)` left join keeps both sides' `a` in the join schema.
///
/// The right side's `test2.a` is nullable. The projection of `a, b` above the join is kept.
#[test]
fn join_schema_trim_using_join() -> Result<()> {
    let left = test_table_scan()?;
    let right_schema = Schema::new(vec![Field::new("a", DataType::UInt32, false)]);
    let right = scan_empty(Some("test2"), &right_schema, None)?.build()?;

    let plan = LogicalPlanBuilder::from(left)
        .join_using(right, JoinType::Left, vec!["a"])?
        .project(vec![col("a"), col("b")])?
        .build()?;

    let expected = "Projection: test.a, test.b\
        \n Left Join: Using test.a = test2.a\
        \n TableScan: test projection=[a, b]\
        \n TableScan: test2 projection=[a]";
    let optimized = optimize(plan)?;
    assert_eq!(format!("{optimized}"), expected);

    // The join is the single input of the root projection.
    let join = optimized.inputs()[0];
    let expected_schema = DFSchema::new_with_metadata(
        vec![
            (
                Some("test".into()),
                Arc::new(Field::new("a", DataType::UInt32, false)),
            ),
            (
                Some("test".into()),
                Arc::new(Field::new("b", DataType::UInt32, false)),
            ),
            (
                Some("test2".into()),
                Arc::new(Field::new("a", DataType::UInt32, true)),
            ),
        ],
        HashMap::new(),
    )?;
    assert_eq!(**join.schema(), expected_schema);

    Ok(())
}
1687
/// Projecting `CAST(c AS Float64)` reads only `c` from the scan.
#[test]
fn cast() -> Result<()> {
    let c_as_f64 = Expr::Cast(Cast::new(Box::new(col("c")), DataType::Float64));
    let projection = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![c_as_f64])?
        .build()?;

    let expected = "Projection: CAST(test.c AS Float64)\
        \n TableScan: test projection=[c]";
    assert_optimized_plan_equal(projection, expected)
}
1704
/// A plain column projection of `a, b` directly over a scan is folded into the scan's
/// projection.
///
/// A separately built, unprojected scan still exposes all three columns.
#[test]
fn table_scan_projected_schema() -> Result<()> {
    let full_scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a"), col("b")])?
        .build()?;

    assert_eq!(3, full_scan.schema().fields().len());
    assert_fields_eq(&full_scan, vec!["a", "b", "c"]);
    assert_fields_eq(&plan, vec!["a", "b"]);

    assert_optimized_plan_equal(plan, "TableScan: test projection=[a, b]")
}
1720
/// A `Projection` built with `Projection::try_new` from qualified references (`test.a`,
/// `test.b`) is folded into the scan's projection.
#[test]
fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
    let scan = test_table_scan()?;
    assert_eq!(3, scan.schema().fields().len());
    assert_fields_eq(&scan, vec!["a", "b", "c"]);

    let exprs = vec![col("test.a"), col("test.b")];
    let projection = Projection::try_new(exprs, Arc::new(scan))?;
    let plan = LogicalPlan::Projection(projection);
    assert_fields_eq(&plan, vec!["a", "b"]);

    assert_optimized_plan_equal(plan, "TableScan: test projection=[a, b]")
}
1741
/// With a projection of `c, a` under a limit, the scan is narrowed to `a, c`.
#[test]
fn table_limit() -> Result<()> {
    let scan = test_table_scan()?;
    assert_eq!(3, scan.schema().fields().len());
    assert_fields_eq(&scan, vec!["a", "b", "c"]);

    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("c"), col("a")])?
        .limit(0, Some(5))?
        .build()?;
    assert_fields_eq(&plan, vec!["c", "a"]);

    let expected = "Limit: skip=0, fetch=5\
        \n Projection: test.c, test.a\
        \n TableScan: test projection=[a, c]";
    assert_optimized_plan_equal(plan, expected)
}
1761
/// A bare scan with nothing above it keeps all of its columns.
#[test]
fn table_scan_without_projection() -> Result<()> {
    let plan = LogicalPlanBuilder::from(test_table_scan()?).build()?;
    assert_optimized_plan_equal(plan, "TableScan: test projection=[a, b, c]")
}
1770
/// A projection of literals needs no input columns, so the scan's projection becomes empty.
#[test]
fn table_scan_with_literal_projection() -> Result<()> {
    let literals = vec![lit(1_i64), lit(2_i64)];
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(literals)?
        .build()?;

    let expected = "Projection: Int64(1), Int64(2)\
        \n TableScan: test projection=[]";
    assert_optimized_plan_equal(plan, expected)
}
1781
/// The aggregate never uses `b`, so `b` is dropped from the inner projection.
///
/// That projection is kept beneath the filter, even though it then has the same columns as
/// the scan.
#[test]
fn table_unused_column() -> Result<()> {
    let scan = test_table_scan()?;
    assert_eq!(3, scan.schema().fields().len());
    assert_fields_eq(&scan, vec!["a", "b", "c"]);

    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("c"), col("a"), col("b")])?
        .filter(col("c").gt(lit(1)))?
        .aggregate(vec![col("c")], vec![max(col("a"))])?
        .build()?;
    assert_fields_eq(&plan, vec!["c", "max(test.a)"]);

    // Optimize once up front.
    // NOTE(review): if `assert_optimized_plan_equal` re-runs the optimizer, this also checks
    // that a second pass leaves the plan unchanged — confirm.
    let once_optimized = optimize(plan).expect("failed to optimize plan");
    let expected = "Aggregate: groupBy=[[test.c]], aggr=[[max(test.a)]]\
        \n Filter: test.c > Int32(1)\
        \n Projection: test.c, test.a\
        \n TableScan: test projection=[a, c]";
    assert_optimized_plan_equal(once_optimized, expected)
}
1807
/// The outer projection only emits a literal.
///
/// The inner projection of `b` disappears and the scan reads no columns.
#[test]
fn table_unused_projection() -> Result<()> {
    let scan = test_table_scan()?;
    assert_eq!(3, scan.schema().fields().len());
    assert_fields_eq(&scan, vec!["a", "b", "c"]);

    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("b")])?
        .project(vec![lit(1).alias("a")])?
        .build()?;
    assert_fields_eq(&plan, vec!["a"]);

    let expected = "Projection: Int32(1) AS a\
        \n TableScan: test projection=[]";
    assert_optimized_plan_equal(plan, expected)
}
1829
/// A scan with a full filter on `b` keeps that filter.
///
/// Its column projection still becomes empty when nothing above it needs any column.
#[test]
fn table_full_filter_pushdown() -> Result<()> {
    let schema = Schema::new(test_table_scan_fields());
    let pushed_filters = vec![col("b").eq(lit(1))];
    let scan =
        table_scan_with_filters(Some("test"), &schema, None, pushed_filters)?.build()?;
    assert_eq!(3, scan.schema().fields().len());
    assert_fields_eq(&scan, vec!["a", "b", "c"]);

    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![col("b")])?
        .project(vec![lit(1).alias("a")])?
        .build()?;
    assert_fields_eq(&plan, vec!["a"]);

    let expected = "Projection: Int32(1) AS a\
        \n TableScan: test projection=[], full_filters=[b = Int32(1)]";
    assert_optimized_plan_equal(plan, expected)
}
1858
/// Optimizing an already-optimized plan yields the same plan.
///
/// The two plans are compared via their `Debug` output.
#[test]
fn test_double_optimization() -> Result<()> {
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("b")])?
        .project(vec![lit(1).alias("a")])?
        .build()?;

    let first_pass = optimize(plan).expect("failed to optimize plan");
    let second_pass = optimize(first_pass.clone()).expect("failed to optimize plan");

    assert_eq!(format!("{first_pass:?}"), format!("{second_pass:?}"));
    Ok(())
}
1878
/// The `min(test.b)` aggregate is never referenced above the filter, so it is removed.
///
/// `max(test.b)` and the group keys survive.
#[test]
fn table_unused_aggregate() -> Result<()> {
    let scan = test_table_scan()?;
    assert_eq!(3, scan.schema().fields().len());
    assert_fields_eq(&scan, vec!["a", "b", "c"]);

    let group_by = vec![col("a"), col("c")];
    let aggregates = vec![max(col("b")), min(col("b"))];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_by, aggregates)?
        .filter(col("c").gt(lit(1)))?
        .project(vec![col("c"), col("a"), col("max(test.b)")])?
        .build()?;
    assert_fields_eq(&plan, vec!["c", "a", "max(test.b)"]);

    let expected = "Projection: test.c, test.a, max(test.b)\
        \n Filter: test.c > Int32(1)\
        \n Aggregate: groupBy=[[test.a, test.c]], aggr=[[max(test.b)]]\
        \n TableScan: test projection=[a, b, c]";
    assert_optimized_plan_equal(plan, expected)
}
1902
/// An aggregate `FILTER (WHERE c > 42)` clause keeps `c` in the scan.
///
/// This holds even though `c` is neither grouped on nor an aggregate argument.
#[test]
fn aggregate_filter_pushdown() -> Result<()> {
    let scan = test_table_scan()?;
    let filtered_count = count_udaf()
        .call(vec![col("b")])
        .filter(col("c").gt(lit(42)))
        .build()?;
    let aggregates = vec![count(col("b")), filtered_count.alias("count2")];

    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(vec![col("a")], aggregates)?
        .build()?;

    let expected = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
        \n TableScan: test projection=[a, b, c]";
    assert_optimized_plan_equal(plan, expected)
}
1922
/// `Distinct` depends on every input column, so `b` is still scanned.
///
/// Only `a` is projected above the `Distinct`; the projection below it is folded into the
/// scan.
#[test]
fn pushdown_through_distinct() -> Result<()> {
    let distinct_pairs = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a"), col("b")])?
        .distinct()?;
    let plan = distinct_pairs.project(vec![col("a")])?.build()?;

    let expected = "Projection: test.a\
        \n Distinct:\
        \n TableScan: test projection=[a, b]";
    assert_optimized_plan_equal(plan, expected)
}
1939
/// Two stacked window aggregates, followed by a projection of both window results.
///
/// A projection appears between the two `WindowAggr` nodes. It keeps only `test.b`, which the
/// upper window still needs, and the lower window's output.
#[test]
fn test_window() -> Result<()> {
    let scan = test_table_scan()?;

    // `build()` is fallible; propagate its error through the test's `Result` instead of
    // panicking with `unwrap()`, matching every other fallible call in this module.
    let max_a_by_b = Expr::WindowFunction(expr::WindowFunction::new(
        WindowFunctionDefinition::AggregateUDF(max_udaf()),
        vec![col("test.a")],
    ))
    .partition_by(vec![col("test.b")])
    .build()?;

    let max_b = Expr::WindowFunction(expr::WindowFunction::new(
        WindowFunctionDefinition::AggregateUDF(max_udaf()),
        vec![col("test.b")],
    ));

    // Column references to the window outputs, named after their schema names.
    let max_a_by_b_col = col(max_a_by_b.schema_name().to_string());
    let max_b_col = col(max_b.schema_name().to_string());

    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![max_a_by_b])?
        .window(vec![max_b])?
        .project(vec![max_a_by_b_col, max_b_col])?
        .build()?;

    let expected = "Projection: max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
        \n WindowAggr: windowExpr=[[max(test.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
        \n Projection: test.b, max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
        \n WindowAggr: windowExpr=[[max(test.a) PARTITION BY [test.b] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
        \n TableScan: test projection=[a, b]";
    assert_optimized_plan_equal(plan, expected)
}
1973
1974 fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
1975
1976 fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
1977 let optimizer = Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]);
1978 let optimized_plan =
1979 optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
1980 Ok(optimized_plan)
1981 }
1982}