1use indexmap::{IndexMap, IndexSet};
24use std::collections::HashMap;
25use std::sync::Arc;
26
27use datafusion_common::alias::AliasGenerator;
28use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
29use datafusion_common::{Column, DFSchema, Result, qualified_name};
30use datafusion_expr::logical_plan::LogicalPlan;
31use datafusion_expr::{Expr, ExpressionPlacement, Projection};
32
33use crate::optimizer::ApplyOrder;
34use crate::push_down_filter::replace_cols_by_name;
35use crate::utils::{ColumnReference, has_all_column_refs, schema_columns};
36use crate::{OptimizerConfig, OptimizerRule};
37
/// Name prefix shared by every alias this module assigns to an extracted
/// expression. Aliases of the form `<prefix>_<id>` that are already present in
/// a plan are recognised by `advance_generator_past_existing`, which feeds the
/// parsed `<id>` to the alias generator.
const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";
45
46fn has_extractable_expr(exprs: &[Expr]) -> bool {
54 exprs.iter().any(|expr| {
55 expr.exists(|e| Ok(e.placement() == ExpressionPlacement::MoveTowardsLeafNodes))
56 .unwrap_or(false)
57 })
58}
59
/// Optimizer rule, registered under the name `extract_leaf_expressions`, that
/// rewrites `Aggregate`, `Filter`, `Sort`, `Limit` and `Join` nodes so that
/// sub-expressions placed as `MoveTowardsLeafNodes` are computed in a
/// projection on the node's input and referenced by alias.
///
/// The rule carries no state.
#[derive(Default, Debug)]
pub struct ExtractLeafExpressions {}

impl ExtractLeafExpressions {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self::default()
    }
}
111
112impl OptimizerRule for ExtractLeafExpressions {
113 fn name(&self) -> &str {
114 "extract_leaf_expressions"
115 }
116
117 fn rewrite(
118 &self,
119 plan: LogicalPlan,
120 config: &dyn OptimizerConfig,
121 ) -> Result<Transformed<LogicalPlan>> {
122 if !config.options().optimizer.enable_leaf_expression_pushdown {
123 return Ok(Transformed::no(plan));
124 }
125 let alias_generator = config.alias_generator();
126
127 advance_generator_past_existing(&plan, alias_generator)?;
130
131 plan.transform_down_with_subqueries(|plan| {
132 extract_from_plan(plan, alias_generator)
133 })
134 }
135}
136
137fn advance_generator_past_existing(
141 plan: &LogicalPlan,
142 alias_generator: &AliasGenerator,
143) -> Result<()> {
144 plan.apply(|plan| {
145 plan.expressions().iter().try_for_each(|expr| {
146 expr.apply(|e| {
147 if let Expr::Alias(alias) = e
148 && let Some(id) = alias
149 .name
150 .strip_prefix(EXTRACTED_EXPR_PREFIX)
151 .and_then(|s| s.strip_prefix('_'))
152 .and_then(|s| s.parse().ok())
153 {
154 alias_generator.update_min_id(id);
155 }
156 Ok(TreeNodeRecursion::Continue)
157 })?;
158 Ok::<(), datafusion_common::error::DataFusionError>(())
159 })?;
160 Ok(TreeNodeRecursion::Continue)
161 })
162 .map(|_| ())
163}
164
165fn extract_from_plan(
172 plan: LogicalPlan,
173 alias_generator: &Arc<AliasGenerator>,
174) -> Result<Transformed<LogicalPlan>> {
175 if !matches!(
180 &plan,
181 LogicalPlan::Aggregate(_)
182 | LogicalPlan::Filter(_)
183 | LogicalPlan::Sort(_)
184 | LogicalPlan::Limit(_)
185 | LogicalPlan::Join(_)
186 ) {
187 return Ok(Transformed::no(plan));
188 }
189
190 let inputs = plan.inputs();
191 if inputs.is_empty() {
192 return Ok(Transformed::no(plan));
193 }
194
195 if !has_extractable_expr(&plan.expressions()) {
197 return Ok(Transformed::no(plan));
198 }
199
200 let original_schema = Arc::clone(plan.schema());
202
203 let input_schemas: Vec<Arc<DFSchema>> =
207 inputs.iter().map(|i| Arc::clone(i.schema())).collect();
208
209 let mut extractors: Vec<LeafExpressionExtractor> = input_schemas
211 .iter()
212 .map(|schema| LeafExpressionExtractor::new(schema.as_ref(), alias_generator))
213 .collect();
214
215 let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
217 input_schemas
218 .iter()
219 .map(|schema| schema_columns(schema.as_ref()))
220 .collect();
221
222 let transformed = plan.map_expressions(|expr| {
224 routing_extract(expr, &mut extractors, &input_column_sets)
225 })?;
226
227 if !transformed.transformed {
229 return Ok(transformed);
230 }
231
232 let owned_inputs: Vec<Arc<LogicalPlan>> = transformed
235 .data
236 .inputs()
237 .into_iter()
238 .map(|i| Arc::new(i.clone()))
239 .collect();
240
241 let new_inputs: Vec<LogicalPlan> = owned_inputs
243 .into_iter()
244 .zip(extractors.iter())
245 .map(|(input_arc, extractor)| {
246 match extractor.build_extraction_projection(&input_arc)? {
247 Some(plan) => Ok(plan),
248 None => Ok(Arc::unwrap_or_clone(input_arc)),
251 }
252 })
253 .collect::<Result<Vec<_>>>()?;
254
255 let new_plan = transformed
258 .data
259 .with_new_exprs(transformed.data.expressions(), new_inputs)?;
260
261 let recovered = build_recovery_projection(original_schema.as_ref(), new_plan)?;
263
264 Ok(Transformed::yes(recovered))
265}
266
267fn find_owning_input(
273 expr: &Expr,
274 input_column_sets: &[std::collections::HashSet<ColumnReference>],
275) -> Option<usize> {
276 let mut found = None;
277 for (idx, cols) in input_column_sets.iter().enumerate() {
278 if has_all_column_refs(expr, cols) {
279 if found.is_some() {
280 return None;
282 }
283 found = Some(idx);
284 }
285 }
286 found
287}
288
289fn routing_extract(
292 expr: Expr,
293 extractors: &mut [LeafExpressionExtractor],
294 input_column_sets: &[std::collections::HashSet<ColumnReference>],
295) -> Result<Transformed<Expr>> {
296 expr.transform_down(|e| {
297 if let Expr::Alias(alias) = &e
299 && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
300 {
301 return Ok(Transformed {
302 data: e,
303 transformed: false,
304 tnr: TreeNodeRecursion::Jump,
305 });
306 }
307
308 if matches!(&e, Expr::Alias(_)) {
311 return Ok(Transformed::no(e));
312 }
313
314 match e.placement() {
315 ExpressionPlacement::MoveTowardsLeafNodes => {
316 if let Some(idx) = find_owning_input(&e, input_column_sets) {
317 let col_ref = extractors[idx].add_extracted(e)?;
318 Ok(Transformed::yes(col_ref))
319 } else {
320 Ok(Transformed::no(e))
322 }
323 }
324 ExpressionPlacement::Column => {
325 if let Expr::Column(col) = &e
331 && let Some(idx) = find_owning_input(&e, input_column_sets)
332 {
333 extractors[idx].columns_needed.insert(col.clone());
334 }
335 Ok(Transformed::no(e))
336 }
337 _ => Ok(Transformed::no(e)),
338 }
339 })
340}
341
342fn remap_pairs_and_columns(
352 pairs: &[(Expr, String)],
353 columns: &IndexSet<Column>,
354 from_schema: &DFSchema,
355 to_schema: &DFSchema,
356) -> Result<ExtractionTarget> {
357 let mut replace_map = HashMap::new();
358 for ((from_q, from_f), (to_q, to_f)) in from_schema.iter().zip(to_schema.iter()) {
359 replace_map.insert(
360 qualified_name(from_q, from_f.name()),
361 Expr::Column(Column::new(to_q.cloned(), to_f.name())),
362 );
363 }
364 let remapped_pairs: Vec<(Expr, String)> = pairs
365 .iter()
366 .map(|(expr, alias)| {
367 Ok((
368 replace_cols_by_name(expr.clone(), &replace_map)?,
369 alias.clone(),
370 ))
371 })
372 .collect::<Result<_>>()?;
373 let remapped_columns: IndexSet<Column> = columns
374 .iter()
375 .filter_map(|col| {
376 let rewritten =
377 replace_cols_by_name(Expr::Column(col.clone()), &replace_map).ok()?;
378 if let Expr::Column(c) = rewritten {
379 Some(c)
380 } else {
381 Some(col.clone())
382 }
383 })
384 .collect();
385 Ok(ExtractionTarget {
386 pairs: remapped_pairs,
387 columns: remapped_columns,
388 })
389}
390
/// Extraction work destined for one plan input: the expressions to compute
/// there and the columns that have to be available there.
struct ExtractionTarget {
    /// `(expression, alias)` pairs; each expression is projected under its alias.
    pairs: Vec<(Expr, String)>,
    /// Columns requested alongside the extracted expressions.
    columns: IndexSet<Column>,
}
403
404fn build_projection_replace_map(projection: &Projection) -> HashMap<String, Expr> {
409 projection
410 .schema
411 .iter()
412 .zip(projection.expr.iter())
413 .map(|((qualifier, field), expr)| {
414 let key = Column::from((qualifier, field)).flat_name();
415 (key, expr.clone().unalias())
416 })
417 .collect()
418}
419
420fn build_recovery_projection(
444 original_schema: &DFSchema,
445 input: LogicalPlan,
446) -> Result<LogicalPlan> {
447 let new_schema = input.schema();
448 let orig_len = original_schema.fields().len();
449 let new_len = new_schema.fields().len();
450
451 if orig_len == new_len {
452 let schemas_match = original_schema.iter().zip(new_schema.iter()).all(
454 |((orig_q, orig_f), (new_q, new_f))| {
455 orig_f.name() == new_f.name() && orig_q == new_q
456 },
457 );
458 if schemas_match {
459 return Ok(input);
460 }
461
462 debug_assert!(
470 orig_len == new_len,
471 "build_recovery_projection: positional mapping requires same field count, \
472 got original={orig_len} vs new={new_len}"
473 );
474 let mut proj_exprs = Vec::with_capacity(orig_len);
475 for (i, (orig_qualifier, orig_field)) in original_schema.iter().enumerate() {
476 let (new_qualifier, new_field) = new_schema.qualified_field(i);
477 if orig_field.name() == new_field.name() && orig_qualifier == new_qualifier {
478 proj_exprs.push(Expr::from((orig_qualifier, orig_field)));
479 } else {
480 let new_col = Expr::Column(Column::from((new_qualifier, new_field)));
481 proj_exprs.push(
482 new_col.alias_qualified(orig_qualifier.cloned(), orig_field.name()),
483 );
484 }
485 }
486 let projection = Projection::try_new(proj_exprs, Arc::new(input))?;
487 Ok(LogicalPlan::Projection(projection))
488 } else {
489 let col_exprs: Vec<Expr> = original_schema.iter().map(Expr::from).collect();
492 let projection = Projection::try_new(col_exprs, Arc::new(input))?;
493 Ok(LogicalPlan::Projection(projection))
494 }
495}
496
497struct LeafExpressionExtractor<'a> {
508 extracted: IndexMap<Expr, String>,
510 columns_needed: IndexSet<Column>,
513 input_schema: &'a DFSchema,
515 alias_generator: &'a Arc<AliasGenerator>,
517}
518
519impl<'a> LeafExpressionExtractor<'a> {
520 fn new(input_schema: &'a DFSchema, alias_generator: &'a Arc<AliasGenerator>) -> Self {
521 Self {
522 extracted: IndexMap::new(),
523 columns_needed: IndexSet::new(),
524 input_schema,
525 alias_generator,
526 }
527 }
528
529 fn add_extracted(&mut self, expr: Expr) -> Result<Expr> {
531 if let Some(alias) = self.extracted.get(&expr) {
533 return Ok(Expr::Column(Column::new_unqualified(alias)));
534 }
535
536 for col in expr.column_refs() {
538 self.columns_needed.insert(col.clone());
539 }
540
541 let alias = self.alias_generator.next(EXTRACTED_EXPR_PREFIX);
543 self.extracted.insert(expr, alias.clone());
544
545 Ok(Expr::Column(Column::new_unqualified(&alias)))
546 }
547
548 fn build_extraction_projection(
554 &self,
555 input: &Arc<LogicalPlan>,
556 ) -> Result<Option<LogicalPlan>> {
557 if self.extracted.is_empty() {
558 return Ok(None);
559 }
560 let pairs: Vec<(Expr, String)> = self
561 .extracted
562 .iter()
563 .map(|(e, a)| (e.clone(), a.clone()))
564 .collect();
565 let proj = build_extraction_projection_impl(
566 &pairs,
567 &self.columns_needed,
568 input,
569 self.input_schema,
570 )?;
571 Ok(Some(LogicalPlan::Projection(proj)))
572 }
573}
574
575fn build_extraction_projection_impl(
587 extracted_exprs: &[(Expr, String)],
588 columns_needed: &IndexSet<Column>,
589 target: &Arc<LogicalPlan>,
590 target_schema: &DFSchema,
591) -> Result<Projection> {
592 if let LogicalPlan::Projection(existing) = target.as_ref() {
593 let mut proj_exprs = existing.expr.clone();
595
596 let existing_extractions: IndexMap<Expr, String> = existing
598 .expr
599 .iter()
600 .filter_map(|e| {
601 if let Expr::Alias(alias) = e
602 && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
603 {
604 return Some((*alias.expr.clone(), alias.name.clone()));
605 }
606 None
607 })
608 .collect();
609
610 let replace_map = build_projection_replace_map(existing);
612
613 for (expr, alias) in extracted_exprs {
615 let resolved = replace_cols_by_name(expr.clone().alias(alias), &replace_map)?;
616 let resolved_inner = if let Expr::Alias(a) = &resolved {
617 a.expr.as_ref()
618 } else {
619 &resolved
620 };
621 if let Some(existing_alias) = existing_extractions.get(resolved_inner) {
622 if existing_alias != alias {
628 proj_exprs.push(resolved);
629 }
630 } else {
631 proj_exprs.push(resolved);
632 }
633 }
634
635 let existing_cols: IndexSet<Column> = existing
641 .expr
642 .iter()
643 .filter_map(|e| {
644 if let Expr::Column(c) = e {
645 Some(c.clone())
646 } else {
647 None
648 }
649 })
650 .collect();
651
652 let input_schema = existing.input.schema();
653 for col in columns_needed {
654 let col_expr = Expr::Column(col.clone());
655 let resolved = replace_cols_by_name(col_expr, &replace_map)?;
656 if let Expr::Column(resolved_col) = &resolved
657 && !existing_cols.contains(resolved_col)
658 && input_schema.has_column(resolved_col)
659 {
660 proj_exprs.push(Expr::Column(resolved_col.clone()));
661 }
662 }
664
665 Projection::try_new(proj_exprs, Arc::clone(&existing.input))
666 } else {
667 let mut proj_exprs = Vec::new();
669 for (expr, alias) in extracted_exprs {
670 proj_exprs.push(expr.clone().alias(alias));
671 }
672 for (qualifier, field) in target_schema.iter() {
673 proj_exprs.push(Expr::from((qualifier, field)));
674 }
675 Projection::try_new(proj_exprs, Arc::clone(target))
676 }
677}
678
/// Optimizer rule, registered under the name `push_down_leaf_projections`,
/// that takes projections containing extracted or extractable expressions and
/// tries to push the extraction part further down into the plan.
///
/// The rule carries no state.
#[derive(Default, Debug)]
pub struct PushDownLeafProjections {}

impl PushDownLeafProjections {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self::default()
    }
}
720
721impl OptimizerRule for PushDownLeafProjections {
722 fn name(&self) -> &str {
723 "push_down_leaf_projections"
724 }
725
726 fn apply_order(&self) -> Option<ApplyOrder> {
727 Some(ApplyOrder::TopDown)
728 }
729
730 fn rewrite(
731 &self,
732 plan: LogicalPlan,
733 config: &dyn OptimizerConfig,
734 ) -> Result<Transformed<LogicalPlan>> {
735 if !config.options().optimizer.enable_leaf_expression_pushdown {
736 return Ok(Transformed::no(plan));
737 }
738 let alias_generator = config.alias_generator();
739 match try_push_input(&plan, alias_generator)? {
740 Some(new_plan) => Ok(Transformed::yes(new_plan)),
741 None => Ok(Transformed::no(plan)),
742 }
743 }
744}
745
746fn try_push_input(
751 input: &LogicalPlan,
752 alias_generator: &Arc<AliasGenerator>,
753) -> Result<Option<LogicalPlan>> {
754 let LogicalPlan::Projection(proj) = input else {
755 return Ok(None);
756 };
757 split_and_push_projection(proj, alias_generator)
758}
759
760fn split_and_push_projection(
792 proj: &Projection,
793 alias_generator: &Arc<AliasGenerator>,
794) -> Result<Option<LogicalPlan>> {
795 let has_existing_extracted = proj.expr.iter().any(|e| {
798 matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX))
799 });
800 if !has_existing_extracted && !has_extractable_expr(&proj.expr) {
801 return Ok(None);
802 }
803
804 let input = &proj.input;
805 let input_schema = input.schema();
806
807 let mut extractors = vec![LeafExpressionExtractor::new(
823 input_schema.as_ref(),
824 alias_generator,
825 )];
826 let input_column_sets = vec![schema_columns(input_schema.as_ref())];
827
828 let original_schema = proj.schema.as_ref();
829 let mut recovery_exprs: Vec<Expr> = Vec::with_capacity(proj.expr.len());
830 let mut needs_recovery = false;
831 let mut has_new_extractions = false;
832 let mut proj_exprs_captured: usize = 0;
833 let mut standalone_columns: IndexSet<Column> = IndexSet::new();
836
837 for (expr, (qualifier, field)) in proj.expr.iter().zip(original_schema.iter()) {
838 if let Expr::Alias(alias) = expr
839 && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
840 {
841 let alias_name = alias.name.clone();
844
845 for col_ref in alias.expr.column_refs() {
846 extractors[0].columns_needed.insert(col_ref.clone());
847 }
848
849 extractors[0]
850 .extracted
851 .insert(expr.clone(), alias_name.clone());
852 recovery_exprs.push(Expr::Column(Column::new_unqualified(&alias_name)));
853 proj_exprs_captured += 1;
854 } else if let Expr::Column(col) = expr {
855 extractors[0].columns_needed.insert(col.clone());
857 standalone_columns.insert(col.clone());
858 recovery_exprs.push(expr.clone());
859 proj_exprs_captured += 1;
860 } else {
861 let transformed =
863 routing_extract(expr.clone(), &mut extractors, &input_column_sets)?;
864 if transformed.transformed {
865 has_new_extractions = true;
866 }
867 let transformed_expr = transformed.data;
868
869 let original_name = field.name();
871 let needs_alias = if let Expr::Column(col) = &transformed_expr {
872 col.name.as_str() != original_name
873 } else {
874 let expr_name = transformed_expr.schema_name().to_string();
875 original_name != &expr_name
876 };
877 let recovery_expr = if needs_alias {
878 needs_recovery = true;
879 transformed_expr
880 .clone()
881 .alias_qualified(qualifier.cloned(), original_name)
882 } else {
883 transformed_expr.clone()
884 };
885
886 if transformed.transformed || !matches!(expr, Expr::Column(_)) {
891 needs_recovery = true;
892 }
893
894 recovery_exprs.push(recovery_expr);
895 }
896 }
897
898 let extractor = &extractors[0];
901 let extraction_pairs: Vec<(Expr, String)> = extractor
902 .extracted
903 .iter()
904 .map(|(e, a)| match e {
905 Expr::Alias(alias) => (*alias.expr.clone(), a.clone()),
906 _ => (e.clone(), a.clone()),
907 })
908 .collect();
909 let columns_needed = &extractor.columns_needed;
910
911 if extraction_pairs.is_empty() {
913 return Ok(None);
914 }
915
916 if columns_needed
921 .iter()
922 .any(|c| !standalone_columns.contains(c))
923 {
924 needs_recovery = true;
925 }
926
927 let proj_input = Arc::clone(&proj.input);
929 let pushed = push_extraction_pairs(
930 &extraction_pairs,
931 columns_needed,
932 proj,
933 &proj_input,
934 alias_generator,
935 proj_exprs_captured,
936 )?;
937
938 let base_plan = match pushed {
941 Some(plan) => plan,
942 None => {
943 if !has_new_extractions {
944 return Ok(None);
949 }
950 let input_arc = Arc::clone(input);
952 let extraction = build_extraction_projection_impl(
953 &extraction_pairs,
954 columns_needed,
955 &input_arc,
956 input_schema.as_ref(),
957 )?;
958 LogicalPlan::Projection(extraction)
959 }
960 };
961
962 if needs_recovery {
964 let recovery = LogicalPlan::Projection(Projection::try_new(
965 recovery_exprs,
966 Arc::new(base_plan),
967 )?);
968 Ok(Some(recovery))
969 } else {
970 Ok(Some(base_plan))
971 }
972}
973
974fn is_pure_extraction_projection(plan: &LogicalPlan) -> bool {
978 let LogicalPlan::Projection(proj) = plan else {
979 return false;
980 };
981 let mut has_extraction = false;
982 for expr in &proj.expr {
983 match expr {
984 Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX) => {
985 has_extraction = true;
986 }
987 Expr::Column(_) => {}
988 _ => return false,
989 }
990 }
991 has_extraction
992}
993
994fn push_extraction_pairs(
997 pairs: &[(Expr, String)],
998 columns_needed: &IndexSet<Column>,
999 proj: &Projection,
1000 proj_input: &Arc<LogicalPlan>,
1001 alias_generator: &Arc<AliasGenerator>,
1002 proj_exprs_captured: usize,
1003) -> Result<Option<LogicalPlan>> {
1004 match proj_input.as_ref() {
1005 LogicalPlan::Projection(_) if proj_exprs_captured == proj.expr.len() => {
1012 let target_schema = Arc::clone(proj_input.schema());
1013 let merged = build_extraction_projection_impl(
1014 pairs,
1015 columns_needed,
1016 proj_input,
1017 target_schema.as_ref(),
1018 )?;
1019 let merged_plan = LogicalPlan::Projection(merged);
1020
1021 if is_pure_extraction_projection(&merged_plan)
1030 && let Some(pushed) = try_push_input(&merged_plan, alias_generator)?
1031 {
1032 return Ok(Some(pushed));
1033 }
1034 Ok(Some(merged_plan))
1035 }
1036 _ => try_push_into_inputs(
1042 pairs,
1043 columns_needed,
1044 proj_input.as_ref(),
1045 alias_generator,
1046 ),
1047 }
1048}
1049
1050fn route_to_inputs(
1058 pairs: &[(Expr, String)],
1059 columns: &IndexSet<Column>,
1060 node: &LogicalPlan,
1061 input_column_sets: &[std::collections::HashSet<ColumnReference>],
1062 input_schemas: &[Arc<DFSchema>],
1063) -> Result<Option<Vec<ExtractionTarget>>> {
1064 let num_inputs = input_schemas.len();
1065 let mut per_input: Vec<ExtractionTarget> = (0..num_inputs)
1066 .map(|_| ExtractionTarget {
1067 pairs: vec![],
1068 columns: IndexSet::new(),
1069 })
1070 .collect();
1071
1072 if matches!(node, LogicalPlan::Union(_)) {
1073 let union_schema = node.schema();
1077 for (idx, input_schema) in input_schemas.iter().enumerate() {
1078 per_input[idx] =
1079 remap_pairs_and_columns(pairs, columns, union_schema, input_schema)?;
1080 }
1081 } else {
1082 for (expr, alias) in pairs {
1083 match find_owning_input(expr, input_column_sets) {
1084 Some(idx) => per_input[idx].pairs.push((expr.clone(), alias.clone())),
1085 None => return Ok(None), }
1087 }
1088 for col in columns {
1089 let col_expr = Expr::Column(col.clone());
1090 match find_owning_input(&col_expr, input_column_sets) {
1091 Some(idx) => {
1092 per_input[idx].columns.insert(col.clone());
1093 }
1094 None => return Ok(None), }
1096 }
1097 }
1098
1099 if per_input.iter().all(|t| t.pairs.is_empty()) {
1101 return Ok(None);
1102 }
1103
1104 Ok(Some(per_input))
1105}
1106
1107fn try_push_into_inputs(
1136 pairs: &[(Expr, String)],
1137 columns_needed: &IndexSet<Column>,
1138 node: &LogicalPlan,
1139 alias_generator: &Arc<AliasGenerator>,
1140) -> Result<Option<LogicalPlan>> {
1141 let inputs = node.inputs();
1142 if inputs.is_empty() {
1143 return Ok(None);
1144 }
1145
1146 let remapped = if let LogicalPlan::SubqueryAlias(sa) = node {
1149 remap_pairs_and_columns(pairs, columns_needed, &sa.schema, sa.input.schema())?
1150 } else {
1151 ExtractionTarget {
1152 pairs: pairs.to_vec(),
1153 columns: columns_needed.clone(),
1154 }
1155 };
1156 let pairs = &remapped.pairs[..];
1157 let columns_needed = &remapped.columns;
1158
1159 let input_schemas: Vec<Arc<DFSchema>> =
1161 inputs.iter().map(|i| Arc::clone(i.schema())).collect();
1162 let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
1163 input_schemas.iter().map(|s| schema_columns(s)).collect();
1164
1165 let per_input = match route_to_inputs(
1167 pairs,
1168 columns_needed,
1169 node,
1170 &input_column_sets,
1171 &input_schemas,
1172 )? {
1173 Some(routed) => routed,
1174 None => return Ok(None),
1175 };
1176
1177 let num_inputs = inputs.len();
1178
1179 let mut new_inputs: Vec<LogicalPlan> = Vec::with_capacity(num_inputs);
1184 for (idx, input) in inputs.into_iter().enumerate() {
1185 if per_input[idx].pairs.is_empty() {
1186 new_inputs.push(input.clone());
1187 } else {
1188 let input_arc = Arc::new(input.clone());
1189 let target_schema = Arc::clone(input.schema());
1190 let proj = build_extraction_projection_impl(
1191 &per_input[idx].pairs,
1192 &per_input[idx].columns,
1193 &input_arc,
1194 target_schema.as_ref(),
1195 )?;
1196 let proj_schema = proj.schema.as_ref();
1200 for (_expr, alias) in &per_input[idx].pairs {
1201 if !proj_schema.fields().iter().any(|f| f.name() == alias) {
1202 return Ok(None);
1203 }
1204 }
1205 let proj_plan = LogicalPlan::Projection(proj);
1206 match try_push_input(&proj_plan, alias_generator)? {
1211 Some(pushed) => new_inputs.push(pushed),
1212 None => new_inputs.push(proj_plan),
1213 }
1214 }
1215 }
1216
1217 let new_node = node.with_new_exprs(node.expressions(), new_inputs)?;
1219
1220 let output_schema = new_node.schema();
1224 for (_expr, alias) in pairs {
1225 if !output_schema.fields().iter().any(|f| f.name() == alias) {
1226 return Ok(None);
1227 }
1228 }
1229
1230 Ok(Some(new_node))
1231}
1232
1233#[cfg(test)]
1234mod tests {
1235
1236 use super::*;
1237 use crate::optimize_projections::OptimizeProjections;
1238 use crate::test::udfs::PlacementTestUDF;
1239 use crate::test::*;
1240 use crate::{Optimizer, OptimizerContext};
1241 use datafusion_expr::expr::ScalarFunction;
1242 use datafusion_expr::{
1243 ScalarUDF, col, lit, logical_plan::builder::LogicalPlanBuilder,
1244 };
1245
1246 fn leaf_udf(expr: Expr, name: &str) -> Expr {
1247 Expr::ScalarFunction(ScalarFunction::new_udf(
1248 Arc::new(ScalarUDF::new_from_impl(
1249 PlacementTestUDF::new()
1250 .with_placement(ExpressionPlacement::MoveTowardsLeafNodes),
1251 )),
1252 vec![expr, lit(name)],
1253 ))
1254 }
1255
1256 fn format_optimization_stages(plan: &LogicalPlan) -> Result<String> {
1270 let run = |rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>| -> Result<String> {
1271 let ctx = OptimizerContext::new().with_max_passes(1);
1272 let optimizer = Optimizer::with_rules(rules);
1273 let optimized = optimizer.optimize(plan.clone(), &ctx, |_, _| {})?;
1274 Ok(format!("{optimized}"))
1275 };
1276
1277 let original = run(vec![Arc::new(OptimizeProjections::new())])?;
1278
1279 let after_extract = run(vec![
1280 Arc::new(OptimizeProjections::new()),
1281 Arc::new(ExtractLeafExpressions::new()),
1282 ])?;
1283
1284 let after_pushdown = run(vec![
1285 Arc::new(OptimizeProjections::new()),
1286 Arc::new(ExtractLeafExpressions::new()),
1287 Arc::new(PushDownLeafProjections::new()),
1288 ])?;
1289
1290 let optimized = run(vec![
1291 Arc::new(OptimizeProjections::new()),
1292 Arc::new(ExtractLeafExpressions::new()),
1293 Arc::new(PushDownLeafProjections::new()),
1294 Arc::new(OptimizeProjections::new()),
1295 ])?;
1296
1297 let mut out = format!("## Original Plan\n{original}");
1298
1299 out.push_str("\n\n## After Extraction\n");
1300 if after_extract == original {
1301 out.push_str("(same as original)");
1302 } else {
1303 out.push_str(&after_extract);
1304 }
1305
1306 out.push_str("\n\n## After Pushdown\n");
1307 if after_pushdown == after_extract {
1308 out.push_str("(same as after extraction)");
1309 } else {
1310 out.push_str(&after_pushdown);
1311 }
1312
1313 out.push_str("\n\n## Optimized\n");
1314 if optimized == after_pushdown {
1315 out.push_str("(same as after pushdown)");
1316 } else {
1317 out.push_str(&optimized);
1318 }
1319
1320 Ok(out)
1321 }
1322
    // Renders `$plan` with `format_optimization_stages` and compares the
    // report against the inline insta snapshot `$expected`.
    //
    // The expansion uses `?`, so it has to be invoked inside a function that
    // returns a `Result`; the macro itself evaluates to `Ok(())`.
    macro_rules! assert_stages {
        ($plan:expr, @ $expected:literal $(,)?) => {{
            let result = format_optimization_stages(&$plan)?;
            insta::assert_snapshot!(result, @ $expected);
            Ok::<(), datafusion_common::DataFusionError>(())
        }};
    }
1331
1332 #[test]
1333 fn test_extract_from_filter() -> Result<()> {
1334 let table_scan = test_table_scan_with_struct()?;
1335 let plan = LogicalPlanBuilder::from(table_scan.clone())
1336 .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1337 .select(vec![
1338 table_scan
1339 .schema()
1340 .index_of_column_by_name(None, "id")
1341 .unwrap(),
1342 ])?
1343 .build()?;
1344
1345 assert_stages!(plan, @r#"
1346 ## Original Plan
1347 Projection: test.id
1348 Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1349 TableScan: test projection=[id, user]
1350
1351 ## After Extraction
1352 Projection: test.id
1353 Projection: test.id, test.user
1354 Filter: __datafusion_extracted_1 = Utf8("active")
1355 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
1356 TableScan: test projection=[id, user]
1357
1358 ## After Pushdown
1359 (same as after extraction)
1360
1361 ## Optimized
1362 Projection: test.id
1363 Filter: __datafusion_extracted_1 = Utf8("active")
1364 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
1365 TableScan: test projection=[id, user]
1366 "#)
1367 }
1368
1369 #[test]
1370 fn test_no_extraction_for_column() -> Result<()> {
1371 let table_scan = test_table_scan()?;
1372 let plan = LogicalPlanBuilder::from(table_scan)
1373 .filter(col("a").eq(lit(1)))?
1374 .build()?;
1375
1376 assert_stages!(plan, @"
1377 ## Original Plan
1378 Filter: test.a = Int32(1)
1379 TableScan: test projection=[a, b, c]
1380
1381 ## After Extraction
1382 (same as original)
1383
1384 ## After Pushdown
1385 (same as after extraction)
1386
1387 ## Optimized
1388 (same as after pushdown)
1389 ")
1390 }
1391
1392 #[test]
1393 fn test_extract_from_projection() -> Result<()> {
1394 let table_scan = test_table_scan_with_struct()?;
1395 let plan = LogicalPlanBuilder::from(table_scan)
1396 .project(vec![leaf_udf(col("user"), "name")])?
1397 .build()?;
1398
1399 assert_stages!(plan, @r#"
1400 ## Original Plan
1401 Projection: leaf_udf(test.user, Utf8("name"))
1402 TableScan: test projection=[user]
1403
1404 ## After Extraction
1405 (same as original)
1406
1407 ## After Pushdown
1408 Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1409 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1410 TableScan: test projection=[user]
1411
1412 ## Optimized
1413 Projection: leaf_udf(test.user, Utf8("name"))
1414 TableScan: test projection=[user]
1415 "#)
1416 }
1417
1418 #[test]
1419 fn test_extract_from_projection_with_subexpression() -> Result<()> {
1420 let table_scan = test_table_scan_with_struct()?;
1421 let plan = LogicalPlanBuilder::from(table_scan)
1422 .project(vec![
1423 leaf_udf(col("user"), "name")
1424 .is_not_null()
1425 .alias("has_name"),
1426 ])?
1427 .build()?;
1428
1429 assert_stages!(plan, @r#"
1430 ## Original Plan
1431 Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
1432 TableScan: test projection=[user]
1433
1434 ## After Extraction
1435 (same as original)
1436
1437 ## After Pushdown
1438 Projection: __datafusion_extracted_1 IS NOT NULL AS has_name
1439 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1440 TableScan: test projection=[user]
1441
1442 ## Optimized
1443 Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
1444 TableScan: test projection=[user]
1445 "#)
1446 }
1447
1448 #[test]
1449 fn test_projection_no_extraction_for_column() -> Result<()> {
1450 let table_scan = test_table_scan()?;
1451 let plan = LogicalPlanBuilder::from(table_scan)
1452 .project(vec![col("a"), col("b")])?
1453 .build()?;
1454
1455 assert_stages!(plan, @"
1456 ## Original Plan
1457 TableScan: test projection=[a, b]
1458
1459 ## After Extraction
1460 (same as original)
1461
1462 ## After Pushdown
1463 (same as after extraction)
1464
1465 ## Optimized
1466 (same as after pushdown)
1467 ")
1468 }
1469
1470 #[test]
1471 fn test_filter_with_deduplication() -> Result<()> {
1472 let table_scan = test_table_scan_with_struct()?;
1473 let field_access = leaf_udf(col("user"), "name");
1474 let plan = LogicalPlanBuilder::from(table_scan)
1476 .filter(
1477 field_access
1478 .clone()
1479 .is_not_null()
1480 .and(field_access.is_null()),
1481 )?
1482 .build()?;
1483
1484 assert_stages!(plan, @r#"
1485 ## Original Plan
1486 Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL AND leaf_udf(test.user, Utf8("name")) IS NULL
1487 TableScan: test projection=[id, user]
1488
1489 ## After Extraction
1490 Projection: test.id, test.user
1491 Filter: __datafusion_extracted_1 IS NOT NULL AND __datafusion_extracted_1 IS NULL
1492 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1493 TableScan: test projection=[id, user]
1494
1495 ## After Pushdown
1496 (same as after extraction)
1497
1498 ## Optimized
1499 (same as after pushdown)
1500 "#)
1501 }
1502
1503 #[test]
1504 fn test_already_leaf_expression_in_filter() -> Result<()> {
1505 let table_scan = test_table_scan_with_struct()?;
1506 let plan = LogicalPlanBuilder::from(table_scan)
1507 .filter(leaf_udf(col("user"), "name").eq(lit("test")))?
1508 .build()?;
1509
1510 assert_stages!(plan, @r#"
1511 ## Original Plan
1512 Filter: leaf_udf(test.user, Utf8("name")) = Utf8("test")
1513 TableScan: test projection=[id, user]
1514
1515 ## After Extraction
1516 Projection: test.id, test.user
1517 Filter: __datafusion_extracted_1 = Utf8("test")
1518 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1519 TableScan: test projection=[id, user]
1520
1521 ## After Pushdown
1522 (same as after extraction)
1523
1524 ## Optimized
1525 (same as after pushdown)
1526 "#)
1527 }
1528
/// A leaf expression used as a GROUP BY key is extracted below the aggregate.
/// A projection on top re-aliases the extracted column to the original
/// expression name, and the optimized plan drops the unused `test.user`
/// passthrough column.
#[test]
fn test_extract_from_aggregate_group_by() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let group_keys = vec![leaf_udf(col("user"), "status")];
    let aggregates = vec![count(lit(1))];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Aggregate: groupBy=[[leaf_udf(test.user, Utf8("status"))]], aggr=[[COUNT(Int32(1))]]
      TableScan: test projection=[user]

    ## After Extraction
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
          TableScan: test projection=[user]
    "#)
}
1559
/// A leaf expression appearing as the argument of an aggregate function is
/// extracted below the aggregate; the projection on top restores the original
/// aggregate output name.
#[test]
fn test_extract_from_aggregate_args() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let group_keys = vec![col("user")];
    let aggregates = vec![count(leaf_udf(col("user"), "value"))];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value")))]]
      TableScan: test projection=[user]

    ## After Extraction
    Projection: test.user, COUNT(__datafusion_extracted_1) AS COUNT(leaf_udf(test.user,Utf8("value")))
      Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1)]]
        Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
1590
/// A projection and a filter each use a different leaf expression. Extraction
/// handles the filter; pushdown then merges the projection's expression into
/// the same extraction projection above the scan.
#[test]
fn test_projection_with_filter_combined() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "status").eq(lit("active"));
    let output = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .project(output)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name"))
      Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
        TableScan: test projection=[user]

    ## After Extraction
    Projection: leaf_udf(test.user, Utf8("name"))
      Projection: test.user
        Filter: __datafusion_extracted_1 = Utf8("active")
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
            TableScan: test projection=[user]

    ## After Pushdown
    Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
          TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
          TableScan: test projection=[user]
    "#)
}
1625
/// A user-supplied alias on a projected leaf expression survives every stage:
/// pushdown keeps `username` on the outer projection, and the optimized plan
/// is identical to the original.
#[test]
fn test_projection_preserves_alias() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let aliased = leaf_udf(col("user"), "name").alias("username");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![aliased])?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name")) AS username
      TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS username
      Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
        TableScan: test projection=[user]

    ## Optimized
    Projection: leaf_udf(test.user, Utf8("name")) AS username
      TableScan: test projection=[user]
    "#)
}
1651
/// The projection reads a different field (`label`) than the filter (`value`)
/// and also selects the whole `user` column. After pushdown both extracted
/// expressions live in one projection above the scan, and `test.user` is kept
/// because the outer projection still needs it.
#[test]
fn test_projection_different_field_from_filter() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "value").gt(lit(150));
    let output = vec![col("user"), leaf_udf(col("user"), "label")];
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .project(output)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: test.user, leaf_udf(test.user, Utf8("label"))
      Filter: leaf_udf(test.user, Utf8("value")) > Int32(150)
        TableScan: test projection=[user]

    ## After Extraction
    Projection: test.user, leaf_udf(test.user, Utf8("label"))
      Projection: test.user
        Filter: __datafusion_extracted_1 > Int32(150)
          Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
            TableScan: test projection=[user]

    ## After Pushdown
    Projection: test.user, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("label"))
      Filter: __datafusion_extracted_1 > Int32(150)
        Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("label")) AS __datafusion_extracted_2
          TableScan: test projection=[user]

    ## Optimized
    (same as after pushdown)
    "#)
}
1686
/// The same leaf expression projected twice (once bare, once aliased) is
/// extracted only once: after pushdown both outputs reference the single
/// `__datafusion_extracted_1` column.
#[test]
fn test_projection_deduplication() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let name_field = leaf_udf(col("user"), "name");
    let output = vec![name_field.clone(), name_field.alias("name2")];
    let plan = LogicalPlanBuilder::from(scan).project(output)?.build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
      TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_1 AS name2
      Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
        TableScan: test projection=[user]

    ## Optimized
    Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
      TableScan: test projection=[user]
    "#)
}
1713
/// A leaf expression in a projection above a `Sort` is pushed through the
/// sort, so the extraction projection ends up directly above the scan.
#[test]
fn test_extract_through_sort() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let ordering = vec![col("user").sort(true, true)];
    let output = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .sort(ordering)?
        .project(output)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name"))
      Sort: test.user ASC NULLS FIRST
        TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
      Sort: test.user ASC NULLS FIRST
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## Optimized
    (same as after pushdown)
    "#)
}
1746
/// A leaf expression in a projection above a `Limit` is pushed through the
/// limit; the optimized plan additionally drops the unused `test.user`
/// passthrough column.
#[test]
fn test_extract_through_limit() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let output = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .limit(0, Some(10))?
        .project(output)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name"))
      Limit: skip=0, fetch=10
        TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
      Limit: skip=0, fetch=10
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
      Limit: skip=0, fetch=10
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
          TableScan: test projection=[user]
    "#)
}
1778
/// When the aggregate expression carries its own alias (`cnt`), the leaf
/// expression is still extracted below the aggregate, and the expected plan
/// has no extra projection on top of the aggregate.
#[test]
fn test_extract_from_aliased_aggregate() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let group_keys = vec![col("user")];
    let aggregates = vec![count(leaf_udf(col("user"), "value")).alias("cnt")];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value"))) AS cnt]]
      TableScan: test projection=[user]

    ## After Extraction
    Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1) AS cnt]]
      Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
        TableScan: test projection=[user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
1809
/// An aggregate over plain columns contains nothing to extract, so the plan
/// is unchanged at every stage.
#[test]
fn test_aggregate_no_extraction() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan()?;
    let group_keys = vec![col("a")];
    let aggregates = vec![count(col("b"))];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @"
    ## Original Plan
    Aggregate: groupBy=[[test.a]], aggr=[[COUNT(test.b)]]
      TableScan: test projection=[a, b]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
1835
/// A projection whose alias already starts with the `__datafusion_extracted`
/// prefix is left untouched at every stage.
#[test]
fn test_skip_extracted_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let pre_extracted =
        leaf_udf(col("user"), "name").alias("__datafusion_extracted_manual");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![pre_extracted, col("user")])?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_manual, test.user
      TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
1862
/// Two stacked filters each use a leaf expression on the same column.
/// Extraction creates one extraction projection per filter; pushdown merges
/// the upper one into the projection that already sits above the scan.
#[test]
fn test_merge_into_existing_extracted_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let status_is_active = leaf_udf(col("user"), "status").eq(lit("active"));
    let name_present = leaf_udf(col("user"), "name").is_not_null();
    let plan = LogicalPlanBuilder::from(scan)
        .filter(status_is_active)?
        .filter(name_present)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL
      Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
        TableScan: test projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user
      Filter: __datafusion_extracted_1 IS NOT NULL
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
          Projection: test.id, test.user
            Filter: __datafusion_extracted_2 = Utf8("active")
              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user
                TableScan: test projection=[id, user]

    ## After Pushdown
    Projection: test.id, test.user
      Filter: __datafusion_extracted_1 IS NOT NULL
        Filter: __datafusion_extracted_2 = Utf8("active")
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
            TableScan: test projection=[id, user]

    ## Optimized
    Projection: test.id, test.user
      Filter: __datafusion_extracted_1 IS NOT NULL
        Projection: test.id, test.user, __datafusion_extracted_1
          Filter: __datafusion_extracted_2 = Utf8("active")
            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
              TableScan: test projection=[id, user]
    "#)
}
1903
/// A leaf-expression projection stacked on a column-only passthrough
/// projection. Pushdown splits out an extraction projection above the scan,
/// and the optimized plan is the same as the original.
#[test]
fn test_extract_through_passthrough_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let passthrough = vec![col("user")];
    let output = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .project(passthrough)?
        .project(output)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name"))
      TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
      Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
        TableScan: test projection=[user]

    ## Optimized
    Projection: leaf_udf(test.user, Utf8("name"))
      TableScan: test projection=[user]
    "#)
}
1931
/// A projection made only of column references (one aliased) has nothing to
/// extract, so the plan is unchanged at every stage.
#[test]
fn test_projection_early_return_no_extraction() -> Result<()> {
    let scan = test_table_scan()?;
    let output = vec![col("a").alias("x"), col("b")];
    let plan = LogicalPlanBuilder::from(scan).project(output)?.build()?;

    assert_stages!(plan, @"
    ## Original Plan
    Projection: test.a AS x, test.b
      TableScan: test projection=[a, b]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
1955
/// Plain arithmetic over columns is not treated as a leaf expression, so the
/// projection is unchanged at every stage.
#[test]
fn test_projection_with_arithmetic_no_extraction() -> Result<()> {
    let scan = test_table_scan()?;
    let total = (col("a") + col("b")).alias("sum");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![total])?
        .build()?;

    assert_stages!(plan, @"
    ## Original Plan
    Projection: test.a + test.b AS sum
      TableScan: test projection=[a, b]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
1979
/// An aggregate grouping on a leaf expression sits above a filter that uses
/// another leaf expression. Pushdown merges the aggregate's extraction into
/// the projection the filter's extraction already created above the scan.
#[test]
fn test_aggregate_merge_into_extracted_projection() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "status").eq(lit("active"));
    let group_keys = vec![leaf_udf(col("user"), "name")];
    let aggregates = vec![count(lit(1))];
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Aggregate: groupBy=[[leaf_udf(test.user, Utf8("name"))]], aggr=[[COUNT(Int32(1))]]
      Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
        TableScan: test projection=[user]

    ## After Extraction
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
          Projection: test.user
            Filter: __datafusion_extracted_2 = Utf8("active")
              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user
                TableScan: test projection=[user]

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Filter: __datafusion_extracted_2 = Utf8("active")
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
            TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Projection: __datafusion_extracted_1
          Filter: __datafusion_extracted_2 = Utf8("active")
            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
              TableScan: test projection=[user]
    "#)
}
2022
/// A projection above an aggregate wraps a leaf expression in `IS NOT NULL`.
/// Pushdown places the extraction projection directly above the aggregate
/// (it does not move below it), and the optimized plan is the same as the
/// original.
#[test]
fn test_projection_with_leaf_expr_above_aggregate() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let has_name = leaf_udf(col("user"), "name")
        .is_not_null()
        .alias("has_name");
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(vec![col("user")], vec![count(lit(1))])?
        .project(vec![has_name, col("COUNT(Int32(1))")])?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
      Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
        TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 IS NOT NULL AS has_name, COUNT(Int32(1))
      Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user, COUNT(Int32(1))
        Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
          TableScan: test projection=[user]

    ## Optimized
    Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
      Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
        TableScan: test projection=[user]
    "#)
}
2062
/// Two stacked filters use leaf expressions over different base columns
/// (`a` and `b`). Pushdown merges the upper extraction into the existing
/// extraction projection above the scan.
#[test]
fn test_merge_with_new_columns() -> Result<()> {
    let scan = test_table_scan()?;
    let lower_predicate = leaf_udf(col("a"), "x").eq(lit(1));
    let upper_predicate = leaf_udf(col("b"), "y").eq(lit(2));
    let plan = LogicalPlanBuilder::from(scan)
        .filter(lower_predicate)?
        .filter(upper_predicate)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(test.b, Utf8("y")) = Int32(2)
      Filter: leaf_udf(test.a, Utf8("x")) = Int32(1)
        TableScan: test projection=[a, b, c]

    ## After Extraction
    Projection: test.a, test.b, test.c
      Filter: __datafusion_extracted_1 = Int32(2)
        Projection: leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1, test.a, test.b, test.c
          Projection: test.a, test.b, test.c
            Filter: __datafusion_extracted_2 = Int32(1)
              Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c
                TableScan: test projection=[a, b, c]

    ## After Pushdown
    Projection: test.a, test.b, test.c
      Filter: __datafusion_extracted_1 = Int32(2)
        Filter: __datafusion_extracted_2 = Int32(1)
          Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
            TableScan: test projection=[a, b, c]

    ## Optimized
    Projection: test.a, test.b, test.c
      Filter: __datafusion_extracted_1 = Int32(2)
        Projection: test.a, test.b, test.c, __datafusion_extracted_1
          Filter: __datafusion_extracted_2 = Int32(1)
            Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
              TableScan: test projection=[a, b, c]
    "#)
}
2103
2104 fn test_table_scan_with_struct_named(name: &str) -> Result<LogicalPlan> {
2110 use arrow::datatypes::Schema;
2111 let schema = Schema::new(test_table_scan_with_struct_fields());
2112 datafusion_expr::logical_plan::table_scan(Some(name), &schema, None)?.build()
2113 }
2114
/// Leaf expressions used as equi-join keys are extracted into a projection on
/// each join input, and a projection above the join restores the original
/// output columns.
#[test]
fn test_extract_from_join_on() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan_with_struct()?;
    let right_scan = test_table_scan_with_struct_named("right")?;

    let left_keys = vec![leaf_udf(col("user"), "id")];
    let right_keys = vec![leaf_udf(col("user"), "id")];
    let plan = LogicalPlanBuilder::from(left_scan)
        .join_with_expr_keys(
            right_scan,
            JoinType::Inner,
            (left_keys, right_keys),
            None,
        )?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
      TableScan: test projection=[id, user]
      TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Inner Join: __datafusion_extracted_1 = __datafusion_extracted_2
        Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_1, test.id, test.user
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_2, right.id, right.user
          TableScan: right projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
2156
/// A leaf expression inside a join filter that only references the left input
/// is extracted into a projection on the left side; the right input is left
/// untouched.
#[test]
fn test_extract_from_join_filter() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan_with_struct()?;
    let right_scan = test_table_scan_with_struct_named("right")?;

    let conditions = vec![
        col("test.user").eq(col("right.user")),
        leaf_udf(col("test.user"), "status").eq(lit("active")),
    ];
    let plan = LogicalPlanBuilder::from(left_scan)
        .join_on(right_scan, JoinType::Inner, conditions)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Inner Join:  Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active")
      TableScan: test projection=[id, user]
      TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Inner Join:  Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
          TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
2196
/// A join filter containing one leaf expression per input: each is extracted
/// into a projection on the side that owns its column.
#[test]
fn test_extract_from_join_both_sides() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan_with_struct()?;
    let right_scan = test_table_scan_with_struct_named("right")?;

    let conditions = vec![
        col("test.user").eq(col("right.user")),
        leaf_udf(col("test.user"), "status").eq(lit("active")),
        leaf_udf(col("right.user"), "role").eq(lit("admin")),
    ];
    let plan = LogicalPlanBuilder::from(left_scan)
        .join_on(right_scan, JoinType::Inner, conditions)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Inner Join:  Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active") AND leaf_udf(right.user, Utf8("role")) = Utf8("admin")
      TableScan: test projection=[id, user]
      TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Inner Join:  Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active") AND __datafusion_extracted_2 = Utf8("admin")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
          TableScan: right projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
2238
/// A join keyed on plain columns has nothing to extract, so the plan is
/// unchanged at every stage.
#[test]
fn test_extract_from_join_no_extraction() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan()?;
    let right_scan = test_table_scan_with_name("right")?;

    let join_keys = (vec!["a"], vec!["a"]);
    let plan = LogicalPlanBuilder::from(left_scan)
        .join(right_scan, JoinType::Inner, join_keys, None)?
        .build()?;

    assert_stages!(plan, @"
    ## Original Plan
    Inner Join: test.a = right.a
      TableScan: test projection=[a, b, c]
      TableScan: right projection=[a, b, c]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
2267
/// A filter with a leaf expression sits above a join whose keys are also leaf
/// expressions. Pushdown moves the filter's extraction through the join and
/// merges it into the left input's existing extraction projection.
#[test]
fn test_extract_from_filter_above_join() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan_with_struct()?;
    let right_scan = test_table_scan_with_struct_named("right")?;

    let left_keys = vec![leaf_udf(col("user"), "id")];
    let right_keys = vec![leaf_udf(col("user"), "id")];
    let predicate = leaf_udf(col("test.user"), "status").eq(lit("active"));
    let plan = LogicalPlanBuilder::from(left_scan)
        .join_with_expr_keys(
            right_scan,
            JoinType::Inner,
            (left_keys, right_keys),
            None,
        )?
        .filter(predicate)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
      Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
        TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, right.id, right.user
          Projection: test.id, test.user, right.id, right.user
            Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
              Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user
                TableScan: test projection=[id, user]
              Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
                TableScan: right projection=[id, user]

    ## After Pushdown
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = Utf8("active")
        Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
          Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
            TableScan: test projection=[id, user]
          Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
            TableScan: right projection=[id, user]

    ## Optimized
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: test.id, test.user, __datafusion_extracted_1, right.id, right.user
          Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
            Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
              TableScan: test projection=[id, user]
            Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
              TableScan: right projection=[id, user]
    "#)
}
2327
/// A projection above a join selects one leaf expression from each input.
/// Pushdown routes each expression to the join input that owns its column,
/// and the optimized plan drops the now-unused `user` passthrough columns.
#[test]
fn test_extract_projection_above_join() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan_with_struct()?;
    let right_scan = test_table_scan_with_struct_named("right")?;

    let join_keys = (vec!["id"], vec!["id"]);
    let output = vec![
        leaf_udf(col("test.user"), "status"),
        leaf_udf(col("right.user"), "role"),
    ];
    let plan = LogicalPlanBuilder::from(left_scan)
        .join(right_scan, JoinType::Inner, join_keys, None)?
        .project(output)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("status")), leaf_udf(right.user, Utf8("role"))
      Inner Join: test.id = right.id
        TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
      Inner Join: test.id = right.id
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
          TableScan: right projection=[id, user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
      Inner Join: test.id = right.id
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id
          TableScan: right projection=[id, user]
    "#)
}
2372
/// A join-filter leaf expression that references a column qualified with the
/// right table is extracted onto the right input only, even though both
/// inputs expose a column named `user`.
#[test]
fn test_extract_from_join_qualified_right_side() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan_with_struct()?;
    let right_scan = test_table_scan_with_struct_named("right")?;

    let conditions = vec![
        col("test.id").eq(col("right.id")),
        leaf_udf(col("right.user"), "status").eq(lit("active")),
    ];
    let plan = LogicalPlanBuilder::from(left_scan)
        .join_on(right_scan, JoinType::Inner, conditions)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Inner Join:  Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) = Utf8("active")
      TableScan: test projection=[id, user]
      TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Inner Join:  Filter: test.id = right.id AND __datafusion_extracted_1 = Utf8("active")
        TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
          TableScan: right projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
2414
/// Unit test for `find_owning_input` when both inputs expose a `user` column
/// in qualified and unqualified form: an unqualified reference resolves to no
/// owner, while qualified references resolve to the matching input index.
#[test]
fn test_find_owning_input_ambiguous_unqualified_column() {
    use std::collections::HashSet;

    // Left input: `test.user` plus the bare `user` name.
    let test_relation = "test".into();
    let left_cols: HashSet<ColumnReference> = HashSet::from([
        ColumnReference::new(Some(&test_relation), "user"),
        ColumnReference::new_unqualified("user"),
    ]);

    // Right input: `right.user` plus the same bare `user` name.
    let right_relation = "right".into();
    let right_cols: HashSet<ColumnReference> = HashSet::from([
        ColumnReference::new(Some(&right_relation), "user"),
        ColumnReference::new_unqualified("user"),
    ]);

    let input_column_sets = vec![left_cols, right_cols];

    // The bare name matches both inputs, so no single owner is reported.
    let bare_user = Expr::Column(Column::new_unqualified("user"));
    assert_eq!(find_owning_input(&bare_user, &input_column_sets), None);

    // Qualification picks out the right input (index 1)...
    let right_user = Expr::Column(Column::new(Some("right"), "user"));
    assert_eq!(
        find_owning_input(&right_user, &input_column_sets),
        Some(1)
    );

    // ...and the left input (index 0).
    let left_user = Expr::Column(Column::new(Some("test"), "user"));
    assert_eq!(find_owning_input(&left_user, &input_column_sets), Some(0));
}
2461
/// A filter above a join compares a leaf expression from the left input with
/// one from the right input. Extraction first places both in a projection
/// above the join; pushdown then splits them so each lands on its own join
/// input.
#[test]
fn test_extract_from_join_cross_input_expression() -> Result<()> {
    use datafusion_expr::JoinType;

    let left_scan = test_table_scan_with_struct()?;
    let right_scan = test_table_scan_with_struct_named("right")?;

    let join_condition = vec![col("test.id").eq(col("right.id"))];
    let status_matches = leaf_udf(col("test.user"), "status")
        .eq(leaf_udf(col("right.user"), "status"));
    let plan = LogicalPlanBuilder::from(left_scan)
        .join_on(right_scan, JoinType::Inner, join_condition)?
        .filter(status_matches)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(test.user, Utf8("status")) = leaf_udf(right.user, Utf8("status"))
      Inner Join:  Filter: test.id = right.id
        TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = __datafusion_extracted_2
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, right.id, right.user
          Inner Join:  Filter: test.id = right.id
            TableScan: test projection=[id, user]
            TableScan: right projection=[id, user]

    ## After Pushdown
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = __datafusion_extracted_2
        Inner Join:  Filter: test.id = right.id
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
            TableScan: test projection=[id, user]
          Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, right.id, right.user
            TableScan: right projection=[id, user]

    ## Optimized
    (same as after pushdown)
    "#)
}
2509
/// A projection renames `user` to `x`, a filter tests `x IS NOT NULL`, and an
/// outer projection applies `leaf_udf(x, "a")`.
///
/// The snapshot expects the extraction stage to leave the plan untouched and
/// the pushdown stage to move the `leaf_udf` call below the filter into the
/// renaming projection, where it is expressed over `test.user` instead of the
/// alias `x`. The final stage no longer carries the extra `test.user`
/// passthrough column.
2510 #[test]
2516 fn test_extract_through_filter_with_column_rename() -> Result<()> {
2517 let table_scan = test_table_scan_with_struct()?;
// rename -> filter on the renamed column -> leaf_udf over the renamed column
2518 let plan = LogicalPlanBuilder::from(table_scan)
2519 .project(vec![col("user").alias("x")])?
2520 .filter(col("x").is_not_null())?
2521 .project(vec![leaf_udf(col("x"), "a")])?
2522 .build()?;
2523
2524 assert_stages!(plan, @r#"
2525 ## Original Plan
2526 Projection: leaf_udf(x, Utf8("a"))
2527 Filter: x IS NOT NULL
2528 Projection: test.user AS x
2529 TableScan: test projection=[user]
2530
2531 ## After Extraction
2532 (same as original)
2533
2534 ## After Pushdown
2535 Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
2536 Filter: x IS NOT NULL
2537 Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2538 TableScan: test projection=[user]
2539
2540 ## Optimized
2541 Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
2542 Filter: x IS NOT NULL
2543 Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2544 TableScan: test projection=[user]
2545 "#)
2546 }
2547
/// Same plan shape as the column-rename test, but the outer projection wraps
/// the call as `leaf_udf(x, "a") IS NOT NULL`.
///
/// The snapshot expects only the `leaf_udf` sub-expression to be aliased and
/// moved into the renaming projection (rewritten over `test.user`); the
/// `IS NOT NULL` wrapper stays in the top projection and is applied to the
/// extracted alias.
2548 #[test]
2550 fn test_extract_partial_through_filter_with_column_rename() -> Result<()> {
2551 let table_scan = test_table_scan_with_struct()?;
2552 let plan = LogicalPlanBuilder::from(table_scan)
2553 .project(vec![col("user").alias("x")])?
2554 .filter(col("x").is_not_null())?
// The leaf_udf call is nested inside IS NOT NULL rather than being the
// whole projected expression.
2555 .project(vec![leaf_udf(col("x"), "a").is_not_null()])?
2556 .build()?;
2557
2558 assert_stages!(plan, @r#"
2559 ## Original Plan
2560 Projection: leaf_udf(x, Utf8("a")) IS NOT NULL
2561 Filter: x IS NOT NULL
2562 Projection: test.user AS x
2563 TableScan: test projection=[user]
2564
2565 ## After Extraction
2566 (same as original)
2567
2568 ## After Pushdown
2569 Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
2570 Filter: x IS NOT NULL
2571 Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2572 TableScan: test projection=[user]
2573
2574 ## Optimized
2575 Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
2576 Filter: x IS NOT NULL
2577 Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2578 TableScan: test projection=[user]
2579 "#)
2580 }
2581
/// A filter whose predicate contains `leaf_udf(x, "a")` sits directly above a
/// projection that renames `user` to `x`.
///
/// The snapshot expects the extraction stage itself to place the aliased call
/// inside the renaming projection (expressed over `test.user`) and to add a
/// `Projection: x` on top that restores the original output columns. The
/// pushdown stage changes nothing further; the final stage drops the extra
/// `test.user` passthrough column.
2582 #[test]
2584 fn test_extract_from_filter_above_renaming_projection() -> Result<()> {
2585 let table_scan = test_table_scan_with_struct()?;
2586 let plan = LogicalPlanBuilder::from(table_scan)
2587 .project(vec![col("user").alias("x")])?
2588 .filter(leaf_udf(col("x"), "a").eq(lit("active")))?
2589 .build()?;
2590
2591 assert_stages!(plan, @r#"
2592 ## Original Plan
2593 Filter: leaf_udf(x, Utf8("a")) = Utf8("active")
2594 Projection: test.user AS x
2595 TableScan: test projection=[user]
2596
2597 ## After Extraction
2598 Projection: x
2599 Filter: __datafusion_extracted_1 = Utf8("active")
2600 Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
2601 TableScan: test projection=[user]
2602
2603 ## After Pushdown
2604 (same as after extraction)
2605
2606 ## Optimized
2607 Projection: x
2608 Filter: __datafusion_extracted_1 = Utf8("active")
2609 Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
2610 TableScan: test projection=[user]
2611 "#)
2612 }
2613
/// A projection applies `leaf_udf` to `sub.user`, where `sub` is a
/// `SubqueryAlias` over the `test` scan.
///
/// The snapshot expects the pushdown stage to move the call beneath the
/// `SubqueryAlias`, re-qualifying its column argument from `sub.user` to
/// `test.user`, while the top projection keeps the original output name via
/// an alias.
2614 #[test]
2620 fn test_extract_through_subquery_alias() -> Result<()> {
2621 let table_scan = test_table_scan_with_struct()?;
2622 let plan = LogicalPlanBuilder::from(table_scan)
2623 .alias("sub")?
2624 .project(vec![leaf_udf(col("sub.user"), "name")])?
2625 .build()?;
2626
2627 assert_stages!(plan, @r#"
2628 ## Original Plan
2629 Projection: leaf_udf(sub.user, Utf8("name"))
2630 SubqueryAlias: sub
2631 TableScan: test projection=[user]
2632
2633 ## After Extraction
2634 (same as original)
2635
2636 ## After Pushdown
2637 Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
2638 SubqueryAlias: sub
2639 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
2640 TableScan: test projection=[user]
2641
2642 ## Optimized
2643 Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
2644 SubqueryAlias: sub
2645 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2646 TableScan: test projection=[user]
2647 "#)
2648 }
2649
/// Both a filter and a projection above a `SubqueryAlias` use `leaf_udf` on
/// `sub.user` (with different second arguments).
///
/// The snapshot expects extraction to handle the filter first, placing its
/// alias in a projection above the `SubqueryAlias`. After pushdown, both the
/// filter's and the projection's aliases sit together in one projection
/// beneath the `SubqueryAlias`, rewritten over `test.user`.
2650 #[test]
2652 fn test_extract_through_subquery_alias_with_filter() -> Result<()> {
2653 let table_scan = test_table_scan_with_struct()?;
2654 let plan = LogicalPlanBuilder::from(table_scan)
2655 .alias("sub")?
2656 .filter(leaf_udf(col("sub.user"), "status").eq(lit("active")))?
2657 .project(vec![leaf_udf(col("sub.user"), "name")])?
2658 .build()?;
2659
2660 assert_stages!(plan, @r#"
2661 ## Original Plan
2662 Projection: leaf_udf(sub.user, Utf8("name"))
2663 Filter: leaf_udf(sub.user, Utf8("status")) = Utf8("active")
2664 SubqueryAlias: sub
2665 TableScan: test projection=[user]
2666
2667 ## After Extraction
2668 Projection: leaf_udf(sub.user, Utf8("name"))
2669 Projection: sub.user
2670 Filter: __datafusion_extracted_1 = Utf8("active")
2671 Projection: leaf_udf(sub.user, Utf8("status")) AS __datafusion_extracted_1, sub.user
2672 SubqueryAlias: sub
2673 TableScan: test projection=[user]
2674
2675 ## After Pushdown
2676 Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
2677 Filter: __datafusion_extracted_1 = Utf8("active")
2678 SubqueryAlias: sub
2679 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.user
2680 TableScan: test projection=[user]
2681
2682 ## Optimized
2683 Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
2684 Filter: __datafusion_extracted_1 = Utf8("active")
2685 SubqueryAlias: sub
2686 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2687 TableScan: test projection=[user]
2688 "#)
2689 }
2690
/// A projection applies `leaf_udf` to `outer_sub.user`, where `outer_sub`
/// aliases `inner_sub`, which in turn aliases the `test` scan.
///
/// The snapshot expects the pushdown stage to carry the call beneath both
/// `SubqueryAlias` nodes, ending up directly above the scan with its column
/// argument re-qualified to `test.user`.
2691 #[test]
2693 fn test_extract_through_nested_subquery_alias() -> Result<()> {
2694 let table_scan = test_table_scan_with_struct()?;
// Two stacked aliases: the projection can only see the outer qualifier.
2695 let plan = LogicalPlanBuilder::from(table_scan)
2696 .alias("inner_sub")?
2697 .alias("outer_sub")?
2698 .project(vec![leaf_udf(col("outer_sub.user"), "name")])?
2699 .build()?;
2700
2701 assert_stages!(plan, @r#"
2702 ## Original Plan
2703 Projection: leaf_udf(outer_sub.user, Utf8("name"))
2704 SubqueryAlias: outer_sub
2705 SubqueryAlias: inner_sub
2706 TableScan: test projection=[user]
2707
2708 ## After Extraction
2709 (same as original)
2710
2711 ## After Pushdown
2712 Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
2713 SubqueryAlias: outer_sub
2714 SubqueryAlias: inner_sub
2715 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
2716 TableScan: test projection=[user]
2717
2718 ## Optimized
2719 Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
2720 SubqueryAlias: outer_sub
2721 SubqueryAlias: inner_sub
2722 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
2723 TableScan: test projection=[user]
2724 "#)
2725 }
2726
/// Negative case: the projection above the `SubqueryAlias` selects only plain
/// columns (`sub.a`, `sub.b`), with no `leaf_udf` call anywhere.
///
/// The snapshot expects every stage to report the plan as unchanged from the
/// original.
2727 #[test]
2729 fn test_subquery_alias_no_extraction() -> Result<()> {
2730 let table_scan = test_table_scan()?;
2731 let plan = LogicalPlanBuilder::from(table_scan)
2732 .alias("sub")?
2733 .project(vec![col("sub.a"), col("sub.b")])?
2734 .build()?;
2735
2736 assert_stages!(plan, @"
2737 ## Original Plan
2738 SubqueryAlias: sub
2739 TableScan: test projection=[a, b]
2740
2741 ## After Extraction
2742 (same as original)
2743
2744 ## After Pushdown
2745 (same as after extraction)
2746
2747 ## Optimized
2748 (same as after pushdown)
2749 ")
2750 }
2751
/// Two `PlacementTestUDF` instances that differ only in their id
/// (`with_id(1)` vs `with_id(2)`) are called with identical arguments and
/// combined with `AND` in one filter.
///
/// The test first asserts its own precondition: both expressions render the
/// same `schema_name()` yet compare unequal as `Expr` values. The snapshot
/// then expects each call to receive its own alias
/// (`__datafusion_extracted_1` and `__datafusion_extracted_2`) rather than
/// being collapsed into one because the display names match.
2752 #[test]
2756 fn test_different_udfs_same_schema_name_not_deduplicated() -> Result<()> {
// Same placement, different ids: distinct UDF instances.
2757 let udf_a = Arc::new(ScalarUDF::new_from_impl(
2758 PlacementTestUDF::new()
2759 .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
2760 .with_id(1),
2761 ));
2762 let udf_b = Arc::new(ScalarUDF::new_from_impl(
2763 PlacementTestUDF::new()
2764 .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
2765 .with_id(2),
2766 ));
2767
// Identical argument lists, so only the UDF instance distinguishes them.
2768 let expr_a = Expr::ScalarFunction(ScalarFunction::new_udf(
2769 udf_a,
2770 vec![col("user"), lit("field")],
2771 ));
2772 let expr_b = Expr::ScalarFunction(ScalarFunction::new_udf(
2773 udf_b,
2774 vec![col("user"), lit("field")],
2775 ));
2776
// Precondition: the names collide while the expressions themselves differ.
2777 assert_eq!(
2779 expr_a.schema_name().to_string(),
2780 expr_b.schema_name().to_string(),
2781 "Both expressions should have the same schema_name"
2782 );
2783 assert_ne!(
2784 expr_a, expr_b,
2785 "Expressions should NOT be equal (different UDF instances)"
2786 );
2787
2788 let table_scan = test_table_scan_with_struct()?;
// Filter on both calls, then select only the `id` column by its schema index.
2789 let plan = LogicalPlanBuilder::from(table_scan.clone())
2790 .filter(expr_a.clone().eq(lit("a")).and(expr_b.clone().eq(lit("b"))))?
2791 .select(vec![
2792 table_scan
2793 .schema()
2794 .index_of_column_by_name(None, "id")
2795 .unwrap(),
2796 ])?
2797 .build()?;
2798
2799 assert_stages!(plan, @r#"
2800 ## Original Plan
2801 Projection: test.id
2802 Filter: leaf_udf(test.user, Utf8("field")) = Utf8("a") AND leaf_udf(test.user, Utf8("field")) = Utf8("b")
2803 TableScan: test projection=[id, user]
2804
2805 ## After Extraction
2806 Projection: test.id
2807 Projection: test.id, test.user
2808 Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
2809 Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id, test.user
2810 TableScan: test projection=[id, user]
2811
2812 ## After Pushdown
2813 (same as after extraction)
2814
2815 ## Optimized
2816 Projection: test.id
2817 Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
2818 Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id
2819 TableScan: test projection=[id, user]
2820 "#)
2821 }
2822
/// A projection containing `leaf_udf(user, "name")` sits above a filter whose
/// predicate contains a different call, `leaf_udf(user, "status")`.
///
/// The snapshot expects extraction to alias the filter's call in a projection
/// below the filter. Pushdown then moves the projection's call through the
/// filter and appends it to that same lower projection as
/// `__datafusion_extracted_2`, so only one projection remains beneath the
/// filter.
2823 #[test]
2830 fn test_extraction_pushdown_through_filter_with_extracted_predicate() -> Result<()> {
2831 let table_scan = test_table_scan_with_struct()?;
2832 let plan = LogicalPlanBuilder::from(table_scan)
2833 .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
2834 .project(vec![col("id"), leaf_udf(col("user"), "name")])?
2835 .build()?;
2836
2837 assert_stages!(plan, @r#"
2838 ## Original Plan
2839 Projection: test.id, leaf_udf(test.user, Utf8("name"))
2840 Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
2841 TableScan: test projection=[id, user]
2842
2843 ## After Extraction
2844 Projection: test.id, leaf_udf(test.user, Utf8("name"))
2845 Projection: test.id, test.user
2846 Filter: __datafusion_extracted_1 = Utf8("active")
2847 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2848 TableScan: test projection=[id, user]
2849
2850 ## After Pushdown
2851 Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
2852 Filter: __datafusion_extracted_1 = Utf8("active")
2853 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2854 TableScan: test projection=[id, user]
2855
2856 ## Optimized
2857 Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
2858 Filter: __datafusion_extracted_1 = Utf8("active")
2859 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
2860 TableScan: test projection=[id, user]
2861 "#)
2862 }
2863
/// The very same `leaf_udf(user, "status")` expression is used both in a
/// filter predicate and in the projection above it.
///
/// The snapshot expects the two uses to be aliased separately: the filter's
/// copy becomes `__datafusion_extracted_1` and the projection's copy becomes
/// `__datafusion_extracted_2`, both living in the single projection beneath
/// the filter after pushdown.
2864 #[test]
2866 fn test_extraction_pushdown_same_expr_in_filter_and_projection() -> Result<()> {
2867 let table_scan = test_table_scan_with_struct()?;
// One expression value, cloned into the filter and moved into the projection.
2868 let field_expr = leaf_udf(col("user"), "status");
2869 let plan = LogicalPlanBuilder::from(table_scan)
2870 .filter(field_expr.clone().gt(lit(5)))?
2871 .project(vec![col("id"), field_expr])?
2872 .build()?;
2873
2874 assert_stages!(plan, @r#"
2875 ## Original Plan
2876 Projection: test.id, leaf_udf(test.user, Utf8("status"))
2877 Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
2878 TableScan: test projection=[id, user]
2879
2880 ## After Extraction
2881 Projection: test.id, leaf_udf(test.user, Utf8("status"))
2882 Projection: test.id, test.user
2883 Filter: __datafusion_extracted_1 > Int32(5)
2884 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2885 TableScan: test projection=[id, user]
2886
2887 ## After Pushdown
2888 Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
2889 Filter: __datafusion_extracted_1 > Int32(5)
2890 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
2891 TableScan: test projection=[id, user]
2892
2893 ## Optimized
2894 Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
2895 Filter: __datafusion_extracted_1 > Int32(5)
2896 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
2897 TableScan: test projection=[id, user]
2898 "#)
2899 }
2900
/// A left join whose join filter contains a `leaf_udf` call on the right
/// input, with a projection above it that calls `leaf_udf` on both inputs.
///
/// The snapshot expects extraction to alias the join filter's call in a
/// projection on the right input. Pushdown then routes each call from the top
/// projection to the input that owns its column: the `test.user` call gets a
/// new projection on the left input, and the `right.user` call is appended to
/// the existing right-side projection under its own alias
/// (`__datafusion_extracted_3`), even though that projection already holds
/// the same expression as `__datafusion_extracted_1`.
2901 #[test]
2904 fn test_left_join_with_filter_and_projection_extraction() -> Result<()> {
2905 use datafusion_expr::JoinType;
2906
2907 let left = test_table_scan_with_struct()?;
2908 let right = test_table_scan_with_struct_named("right")?;
2909
2910 let plan = LogicalPlanBuilder::from(left)
2911 .join_on(
2912 right,
2913 JoinType::Left,
// Second predicate references only the right input.
2914 vec![
2915 col("test.id").eq(col("right.id")),
2916 leaf_udf(col("right.user"), "status").gt(lit(5)),
2917 ],
2918 )?
2919 .project(vec![
2920 col("test.id"),
2921 leaf_udf(col("test.user"), "name"),
2922 leaf_udf(col("right.user"), "status"),
2923 ])?
2924 .build()?;
2925
2926 assert_stages!(plan, @r#"
2927 ## Original Plan
2928 Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
2929 Left Join: Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) > Int32(5)
2930 TableScan: test projection=[id, user]
2931 TableScan: right projection=[id, user]
2932
2933 ## After Extraction
2934 Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
2935 Projection: test.id, test.user, right.id, right.user
2936 Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2937 TableScan: test projection=[id, user]
2938 Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
2939 TableScan: right projection=[id, user]
2940
2941 ## After Pushdown
2942 Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
2943 Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2944 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id, test.user
2945 TableScan: test projection=[id, user]
2946 Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
2947 TableScan: right projection=[id, user]
2948
2949 ## Optimized
2950 Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
2951 Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
2952 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id
2953 TableScan: test projection=[id, user]
2954 Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
2955 TableScan: right projection=[id, user]
2956 "#)
2957 }
2958
/// A projection with two `leaf_udf` calls (`"name"` and `"status"`) sits
/// above a filter whose predicate also calls `leaf_udf(user, "status")`.
///
/// The snapshot expects extraction to alias only the filter's call. Pushdown
/// then moves both projection calls through the filter and appends them to
/// the projection already beneath it, giving three aliases in a single
/// projection above the scan.
2959 #[test]
2962 fn test_pure_extraction_proj_push_through_filter() -> Result<()> {
2963 let table_scan = test_table_scan_with_struct()?;
2964 let plan = LogicalPlanBuilder::from(table_scan)
2965 .filter(leaf_udf(col("user"), "status").gt(lit(5)))?
2966 .project(vec![
2967 col("id"),
2968 leaf_udf(col("user"), "name"),
2969 leaf_udf(col("user"), "status"),
2970 ])?
2971 .build()?;
2972
2973 assert_stages!(plan, @r#"
2974 ## Original Plan
2975 Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
2976 Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
2977 TableScan: test projection=[id, user]
2978
2979 ## After Extraction
2980 Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
2981 Projection: test.id, test.user
2982 Filter: __datafusion_extracted_1 > Int32(5)
2983 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
2984 TableScan: test projection=[id, user]
2985
2986 ## After Pushdown
2987 Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
2988 Filter: __datafusion_extracted_1 > Int32(5)
2989 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
2990 TableScan: test projection=[id, user]
2991
2992 ## Optimized
2993 Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
2994 Filter: __datafusion_extracted_1 > Int32(5)
2995 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
2996 TableScan: test projection=[id, user]
2997 "#)
2998 }
2999
/// Runs `PushDownLeafProjections` on its own against a hand-built plan that
/// already contains an extraction-style projection.
///
/// The outer projection aliases a `leaf_udf` call with the
/// `EXTRACTED_EXPR_PREFIX` naming scheme and sits directly above a plain
/// column projection (`user`, `id`). The snapshot expects the aliased call to
/// be merged into the inner projection, after its existing columns, leaving
/// the outer projection with column references only.
///
/// Unlike the neighbouring tests, this one snapshots a single optimizer
/// result with `insta::assert_snapshot!` rather than `assert_stages!`.
// NOTE(review): "column ref inflation" in the name presumably refers to the
// inner projection gaining a column during the merge — confirm against the
// rule's implementation.
3000 #[test]
3004 fn test_merge_extraction_into_projection_with_column_ref_inflation() -> Result<()> {
3005 let table_scan = test_table_scan_with_struct()?;
3006
// Inner projection: plain column references only.
3007 let inner = LogicalPlanBuilder::from(table_scan)
3009 .project(vec![col("user"), col("id")])?
3010 .build()?;
3011
// Outer projection: built by hand to carry an extraction-prefixed alias.
3012 let plan = LogicalPlanBuilder::from(inner)
3015 .project(vec![
3016 leaf_udf(col("user"), "status")
3017 .alias(format!("{EXTRACTED_EXPR_PREFIX}_1")),
3018 col("id"),
3019 ])?
3020 .build()?;
3021
// Only the pushdown rule is registered, and the context is built with
// `with_max_passes(1)`.
3022 let ctx = OptimizerContext::new().with_max_passes(1);
3024 let optimizer =
3025 Optimizer::with_rules(vec![Arc::new(PushDownLeafProjections::new())]);
3026 let result = optimizer.optimize(plan, &ctx, |_, _| {})?;
3027
3028 insta::assert_snapshot!(format!("{result}"), @r#"
3031 Projection: __datafusion_extracted_1, test.id
3032 Projection: test.user, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
3033 TableScan: test
3034 "#);
3035
3036 Ok(())
3037 }
3038}