1use indexmap::{IndexMap, IndexSet};
24use std::collections::HashMap;
25use std::sync::Arc;
26
27use datafusion_common::alias::AliasGenerator;
28use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
29use datafusion_common::{Column, DFSchema, Result, qualified_name};
30use datafusion_expr::logical_plan::LogicalPlan;
31use datafusion_expr::{Expr, ExpressionPlacement, Projection};
32
33use crate::optimizer::ApplyOrder;
34use crate::push_down_filter::replace_cols_by_name;
35use crate::utils::has_all_column_refs;
36use crate::{OptimizerConfig, OptimizerRule};
37
/// Prefix shared by every alias generated for an extracted expression.
///
/// The prefix is handed to the alias generator when an expression is
/// extracted, and aliases starting with it are recognised throughout this
/// module as "already extracted". Existing aliases of the form
/// `<prefix>_<id>` are parsed to keep freshly generated ids unique.
const EXTRACTED_EXPR_PREFIX: &str = "__datafusion_extracted";
45
46fn has_extractable_expr(exprs: &[Expr]) -> bool {
54 exprs.iter().any(|expr| {
55 expr.exists(|e| Ok(e.placement() == ExpressionPlacement::MoveTowardsLeafNodes))
56 .unwrap_or(false)
57 })
58}
59
/// Optimizer rule that replaces `MoveTowardsLeafNodes` sub-expressions found
/// in `Aggregate`, `Filter`, `Sort`, `Limit` and `Join` nodes with references
/// to aliased columns computed by a projection placed on the node's input.
///
/// The rule is stateless; all bookkeeping lives in the alias generator
/// supplied by the optimizer configuration.
#[derive(Default, Debug)]
pub struct ExtractLeafExpressions {}

impl ExtractLeafExpressions {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self::default()
    }
}
111
112impl OptimizerRule for ExtractLeafExpressions {
113 fn name(&self) -> &str {
114 "extract_leaf_expressions"
115 }
116
117 fn rewrite(
118 &self,
119 plan: LogicalPlan,
120 config: &dyn OptimizerConfig,
121 ) -> Result<Transformed<LogicalPlan>> {
122 if !config.options().optimizer.enable_leaf_expression_pushdown {
123 return Ok(Transformed::no(plan));
124 }
125 let alias_generator = config.alias_generator();
126
127 advance_generator_past_existing(&plan, alias_generator)?;
130
131 plan.transform_down_with_subqueries(|plan| {
132 extract_from_plan(plan, alias_generator)
133 })
134 }
135}
136
137fn advance_generator_past_existing(
141 plan: &LogicalPlan,
142 alias_generator: &AliasGenerator,
143) -> Result<()> {
144 plan.apply(|plan| {
145 plan.expressions().iter().try_for_each(|expr| {
146 expr.apply(|e| {
147 if let Expr::Alias(alias) = e
148 && let Some(id) = alias
149 .name
150 .strip_prefix(EXTRACTED_EXPR_PREFIX)
151 .and_then(|s| s.strip_prefix('_'))
152 .and_then(|s| s.parse().ok())
153 {
154 alias_generator.update_min_id(id);
155 }
156 Ok(TreeNodeRecursion::Continue)
157 })?;
158 Ok::<(), datafusion_common::error::DataFusionError>(())
159 })?;
160 Ok(TreeNodeRecursion::Continue)
161 })
162 .map(|_| ())
163}
164
165fn extract_from_plan(
172 plan: LogicalPlan,
173 alias_generator: &Arc<AliasGenerator>,
174) -> Result<Transformed<LogicalPlan>> {
175 if !matches!(
180 &plan,
181 LogicalPlan::Aggregate(_)
182 | LogicalPlan::Filter(_)
183 | LogicalPlan::Sort(_)
184 | LogicalPlan::Limit(_)
185 | LogicalPlan::Join(_)
186 ) {
187 return Ok(Transformed::no(plan));
188 }
189
190 let inputs = plan.inputs();
191 if inputs.is_empty() {
192 return Ok(Transformed::no(plan));
193 }
194
195 if !has_extractable_expr(&plan.expressions()) {
197 return Ok(Transformed::no(plan));
198 }
199
200 let original_schema = Arc::clone(plan.schema());
202
203 let input_schemas: Vec<Arc<DFSchema>> =
207 inputs.iter().map(|i| Arc::clone(i.schema())).collect();
208
209 let mut extractors: Vec<LeafExpressionExtractor> = input_schemas
211 .iter()
212 .map(|schema| LeafExpressionExtractor::new(schema.as_ref(), alias_generator))
213 .collect();
214
215 let input_column_sets: Vec<std::collections::HashSet<Column>> = input_schemas
217 .iter()
218 .map(|schema| schema_columns(schema.as_ref()))
219 .collect();
220
221 let transformed = plan.map_expressions(|expr| {
223 routing_extract(expr, &mut extractors, &input_column_sets)
224 })?;
225
226 if !transformed.transformed {
228 return Ok(transformed);
229 }
230
231 let owned_inputs: Vec<Arc<LogicalPlan>> = transformed
234 .data
235 .inputs()
236 .into_iter()
237 .map(|i| Arc::new(i.clone()))
238 .collect();
239
240 let new_inputs: Vec<LogicalPlan> = owned_inputs
242 .into_iter()
243 .zip(extractors.iter())
244 .map(|(input_arc, extractor)| {
245 match extractor.build_extraction_projection(&input_arc)? {
246 Some(plan) => Ok(plan),
247 None => {
250 Ok(Arc::try_unwrap(input_arc).unwrap_or_else(|arc| (*arc).clone()))
251 }
252 }
253 })
254 .collect::<Result<Vec<_>>>()?;
255
256 let new_plan = transformed
259 .data
260 .with_new_exprs(transformed.data.expressions(), new_inputs)?;
261
262 let recovered = build_recovery_projection(original_schema.as_ref(), new_plan)?;
264
265 Ok(Transformed::yes(recovered))
266}
267
268fn find_owning_input(
274 expr: &Expr,
275 input_column_sets: &[std::collections::HashSet<Column>],
276) -> Option<usize> {
277 let mut found = None;
278 for (idx, cols) in input_column_sets.iter().enumerate() {
279 if has_all_column_refs(expr, cols) {
280 if found.is_some() {
281 return None;
283 }
284 found = Some(idx);
285 }
286 }
287 found
288}
289
290fn routing_extract(
293 expr: Expr,
294 extractors: &mut [LeafExpressionExtractor],
295 input_column_sets: &[std::collections::HashSet<Column>],
296) -> Result<Transformed<Expr>> {
297 expr.transform_down(|e| {
298 if let Expr::Alias(alias) = &e
300 && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
301 {
302 return Ok(Transformed {
303 data: e,
304 transformed: false,
305 tnr: TreeNodeRecursion::Jump,
306 });
307 }
308
309 if matches!(&e, Expr::Alias(_)) {
312 return Ok(Transformed::no(e));
313 }
314
315 match e.placement() {
316 ExpressionPlacement::MoveTowardsLeafNodes => {
317 if let Some(idx) = find_owning_input(&e, input_column_sets) {
318 let col_ref = extractors[idx].add_extracted(e)?;
319 Ok(Transformed::yes(col_ref))
320 } else {
321 Ok(Transformed::no(e))
323 }
324 }
325 ExpressionPlacement::Column => {
326 if let Expr::Column(col) = &e
332 && let Some(idx) = find_owning_input(&e, input_column_sets)
333 {
334 extractors[idx].columns_needed.insert(col.clone());
335 }
336 Ok(Transformed::no(e))
337 }
338 _ => Ok(Transformed::no(e)),
339 }
340 })
341}
342
343fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> {
345 schema
346 .iter()
347 .flat_map(|(qualifier, field)| {
348 [
349 Column::new(qualifier.cloned(), field.name()),
350 Column::new_unqualified(field.name()),
351 ]
352 })
353 .collect()
354}
355
356fn remap_pairs_and_columns(
366 pairs: &[(Expr, String)],
367 columns: &IndexSet<Column>,
368 from_schema: &DFSchema,
369 to_schema: &DFSchema,
370) -> Result<ExtractionTarget> {
371 let mut replace_map = HashMap::new();
372 for ((from_q, from_f), (to_q, to_f)) in from_schema.iter().zip(to_schema.iter()) {
373 replace_map.insert(
374 qualified_name(from_q, from_f.name()),
375 Expr::Column(Column::new(to_q.cloned(), to_f.name())),
376 );
377 }
378 let remapped_pairs: Vec<(Expr, String)> = pairs
379 .iter()
380 .map(|(expr, alias)| {
381 Ok((
382 replace_cols_by_name(expr.clone(), &replace_map)?,
383 alias.clone(),
384 ))
385 })
386 .collect::<Result<_>>()?;
387 let remapped_columns: IndexSet<Column> = columns
388 .iter()
389 .filter_map(|col| {
390 let rewritten =
391 replace_cols_by_name(Expr::Column(col.clone()), &replace_map).ok()?;
392 if let Expr::Column(c) = rewritten {
393 Some(c)
394 } else {
395 Some(col.clone())
396 }
397 })
398 .collect();
399 Ok(ExtractionTarget {
400 pairs: remapped_pairs,
401 columns: remapped_columns,
402 })
403}
404
/// Extraction work destined for one particular plan input.
struct ExtractionTarget {
    /// `(expression, alias)` pairs: each expression is to be computed below
    /// the input and exposed under the given alias.
    pairs: Vec<(Expr, String)>,
    /// Columns recorded alongside the pairs; they are handed to
    /// `build_extraction_projection_impl` as its `columns_needed` argument.
    columns: IndexSet<Column>,
}
417
418fn build_projection_replace_map(projection: &Projection) -> HashMap<String, Expr> {
423 projection
424 .schema
425 .iter()
426 .zip(projection.expr.iter())
427 .map(|((qualifier, field), expr)| {
428 let key = Column::from((qualifier, field)).flat_name();
429 (key, expr.clone().unalias())
430 })
431 .collect()
432}
433
434fn build_recovery_projection(
458 original_schema: &DFSchema,
459 input: LogicalPlan,
460) -> Result<LogicalPlan> {
461 let new_schema = input.schema();
462 let orig_len = original_schema.fields().len();
463 let new_len = new_schema.fields().len();
464
465 if orig_len == new_len {
466 let schemas_match = original_schema.iter().zip(new_schema.iter()).all(
468 |((orig_q, orig_f), (new_q, new_f))| {
469 orig_f.name() == new_f.name() && orig_q == new_q
470 },
471 );
472 if schemas_match {
473 return Ok(input);
474 }
475
476 debug_assert!(
484 orig_len == new_len,
485 "build_recovery_projection: positional mapping requires same field count, \
486 got original={orig_len} vs new={new_len}"
487 );
488 let mut proj_exprs = Vec::with_capacity(orig_len);
489 for (i, (orig_qualifier, orig_field)) in original_schema.iter().enumerate() {
490 let (new_qualifier, new_field) = new_schema.qualified_field(i);
491 if orig_field.name() == new_field.name() && orig_qualifier == new_qualifier {
492 proj_exprs.push(Expr::from((orig_qualifier, orig_field)));
493 } else {
494 let new_col = Expr::Column(Column::from((new_qualifier, new_field)));
495 proj_exprs.push(
496 new_col.alias_qualified(orig_qualifier.cloned(), orig_field.name()),
497 );
498 }
499 }
500 let projection = Projection::try_new(proj_exprs, Arc::new(input))?;
501 Ok(LogicalPlan::Projection(projection))
502 } else {
503 let col_exprs: Vec<Expr> = original_schema.iter().map(Expr::from).collect();
506 let projection = Projection::try_new(col_exprs, Arc::new(input))?;
507 Ok(LogicalPlan::Projection(projection))
508 }
509}
510
511struct LeafExpressionExtractor<'a> {
522 extracted: IndexMap<Expr, String>,
524 columns_needed: IndexSet<Column>,
527 input_schema: &'a DFSchema,
529 alias_generator: &'a Arc<AliasGenerator>,
531}
532
533impl<'a> LeafExpressionExtractor<'a> {
534 fn new(input_schema: &'a DFSchema, alias_generator: &'a Arc<AliasGenerator>) -> Self {
535 Self {
536 extracted: IndexMap::new(),
537 columns_needed: IndexSet::new(),
538 input_schema,
539 alias_generator,
540 }
541 }
542
543 fn add_extracted(&mut self, expr: Expr) -> Result<Expr> {
545 if let Some(alias) = self.extracted.get(&expr) {
547 return Ok(Expr::Column(Column::new_unqualified(alias)));
548 }
549
550 for col in expr.column_refs() {
552 self.columns_needed.insert(col.clone());
553 }
554
555 let alias = self.alias_generator.next(EXTRACTED_EXPR_PREFIX);
557 self.extracted.insert(expr, alias.clone());
558
559 Ok(Expr::Column(Column::new_unqualified(&alias)))
560 }
561
562 fn build_extraction_projection(
568 &self,
569 input: &Arc<LogicalPlan>,
570 ) -> Result<Option<LogicalPlan>> {
571 if self.extracted.is_empty() {
572 return Ok(None);
573 }
574 let pairs: Vec<(Expr, String)> = self
575 .extracted
576 .iter()
577 .map(|(e, a)| (e.clone(), a.clone()))
578 .collect();
579 let proj = build_extraction_projection_impl(
580 &pairs,
581 &self.columns_needed,
582 input,
583 self.input_schema,
584 )?;
585 Ok(Some(LogicalPlan::Projection(proj)))
586 }
587}
588
589fn build_extraction_projection_impl(
601 extracted_exprs: &[(Expr, String)],
602 columns_needed: &IndexSet<Column>,
603 target: &Arc<LogicalPlan>,
604 target_schema: &DFSchema,
605) -> Result<Projection> {
606 if let LogicalPlan::Projection(existing) = target.as_ref() {
607 let mut proj_exprs = existing.expr.clone();
609
610 let existing_extractions: IndexMap<Expr, String> = existing
612 .expr
613 .iter()
614 .filter_map(|e| {
615 if let Expr::Alias(alias) = e
616 && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
617 {
618 return Some((*alias.expr.clone(), alias.name.clone()));
619 }
620 None
621 })
622 .collect();
623
624 let replace_map = build_projection_replace_map(existing);
626
627 for (expr, alias) in extracted_exprs {
629 let resolved = replace_cols_by_name(expr.clone().alias(alias), &replace_map)?;
630 let resolved_inner = if let Expr::Alias(a) = &resolved {
631 a.expr.as_ref()
632 } else {
633 &resolved
634 };
635 if let Some(existing_alias) = existing_extractions.get(resolved_inner) {
636 if existing_alias != alias {
642 proj_exprs.push(resolved);
643 }
644 } else {
645 proj_exprs.push(resolved);
646 }
647 }
648
649 let existing_cols: IndexSet<Column> = existing
655 .expr
656 .iter()
657 .filter_map(|e| {
658 if let Expr::Column(c) = e {
659 Some(c.clone())
660 } else {
661 None
662 }
663 })
664 .collect();
665
666 let input_schema = existing.input.schema();
667 for col in columns_needed {
668 let col_expr = Expr::Column(col.clone());
669 let resolved = replace_cols_by_name(col_expr, &replace_map)?;
670 if let Expr::Column(resolved_col) = &resolved
671 && !existing_cols.contains(resolved_col)
672 && input_schema.has_column(resolved_col)
673 {
674 proj_exprs.push(Expr::Column(resolved_col.clone()));
675 }
676 }
678
679 Projection::try_new(proj_exprs, Arc::clone(&existing.input))
680 } else {
681 let mut proj_exprs = Vec::new();
683 for (expr, alias) in extracted_exprs {
684 proj_exprs.push(expr.clone().alias(alias));
685 }
686 for (qualifier, field) in target_schema.iter() {
687 proj_exprs.push(Expr::from((qualifier, field)));
688 }
689 Projection::try_new(proj_exprs, Arc::clone(target))
690 }
691}
692
/// Optimizer rule that rewrites `Projection` nodes so that extracted (or
/// extractable) leaf expressions are computed as far down the plan as they
/// can be pushed.
///
/// The rule itself is stateless.
#[derive(Default, Debug)]
pub struct PushDownLeafProjections {}

impl PushDownLeafProjections {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self::default()
    }
}
734
735impl OptimizerRule for PushDownLeafProjections {
736 fn name(&self) -> &str {
737 "push_down_leaf_projections"
738 }
739
740 fn apply_order(&self) -> Option<ApplyOrder> {
741 Some(ApplyOrder::TopDown)
742 }
743
744 fn rewrite(
745 &self,
746 plan: LogicalPlan,
747 config: &dyn OptimizerConfig,
748 ) -> Result<Transformed<LogicalPlan>> {
749 if !config.options().optimizer.enable_leaf_expression_pushdown {
750 return Ok(Transformed::no(plan));
751 }
752 let alias_generator = config.alias_generator();
753 match try_push_input(&plan, alias_generator)? {
754 Some(new_plan) => Ok(Transformed::yes(new_plan)),
755 None => Ok(Transformed::no(plan)),
756 }
757 }
758}
759
760fn try_push_input(
765 input: &LogicalPlan,
766 alias_generator: &Arc<AliasGenerator>,
767) -> Result<Option<LogicalPlan>> {
768 let LogicalPlan::Projection(proj) = input else {
769 return Ok(None);
770 };
771 split_and_push_projection(proj, alias_generator)
772}
773
774fn split_and_push_projection(
806 proj: &Projection,
807 alias_generator: &Arc<AliasGenerator>,
808) -> Result<Option<LogicalPlan>> {
809 let has_existing_extracted = proj.expr.iter().any(|e| {
812 matches!(e, Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX))
813 });
814 if !has_existing_extracted && !has_extractable_expr(&proj.expr) {
815 return Ok(None);
816 }
817
818 let input = &proj.input;
819 let input_schema = input.schema();
820
821 let mut extractors = vec![LeafExpressionExtractor::new(
837 input_schema.as_ref(),
838 alias_generator,
839 )];
840 let input_column_sets = vec![schema_columns(input_schema.as_ref())];
841
842 let original_schema = proj.schema.as_ref();
843 let mut recovery_exprs: Vec<Expr> = Vec::with_capacity(proj.expr.len());
844 let mut needs_recovery = false;
845 let mut has_new_extractions = false;
846 let mut proj_exprs_captured: usize = 0;
847 let mut standalone_columns: IndexSet<Column> = IndexSet::new();
850
851 for (expr, (qualifier, field)) in proj.expr.iter().zip(original_schema.iter()) {
852 if let Expr::Alias(alias) = expr
853 && alias.name.starts_with(EXTRACTED_EXPR_PREFIX)
854 {
855 let alias_name = alias.name.clone();
858
859 for col_ref in alias.expr.column_refs() {
860 extractors[0].columns_needed.insert(col_ref.clone());
861 }
862
863 extractors[0]
864 .extracted
865 .insert(expr.clone(), alias_name.clone());
866 recovery_exprs.push(Expr::Column(Column::new_unqualified(&alias_name)));
867 proj_exprs_captured += 1;
868 } else if let Expr::Column(col) = expr {
869 extractors[0].columns_needed.insert(col.clone());
871 standalone_columns.insert(col.clone());
872 recovery_exprs.push(expr.clone());
873 proj_exprs_captured += 1;
874 } else {
875 let transformed =
877 routing_extract(expr.clone(), &mut extractors, &input_column_sets)?;
878 if transformed.transformed {
879 has_new_extractions = true;
880 }
881 let transformed_expr = transformed.data;
882
883 let original_name = field.name();
885 let needs_alias = if let Expr::Column(col) = &transformed_expr {
886 col.name.as_str() != original_name
887 } else {
888 let expr_name = transformed_expr.schema_name().to_string();
889 original_name != &expr_name
890 };
891 let recovery_expr = if needs_alias {
892 needs_recovery = true;
893 transformed_expr
894 .clone()
895 .alias_qualified(qualifier.cloned(), original_name)
896 } else {
897 transformed_expr.clone()
898 };
899
900 if transformed.transformed || !matches!(expr, Expr::Column(_)) {
905 needs_recovery = true;
906 }
907
908 recovery_exprs.push(recovery_expr);
909 }
910 }
911
912 let extractor = &extractors[0];
915 let extraction_pairs: Vec<(Expr, String)> = extractor
916 .extracted
917 .iter()
918 .map(|(e, a)| match e {
919 Expr::Alias(alias) => (*alias.expr.clone(), a.clone()),
920 _ => (e.clone(), a.clone()),
921 })
922 .collect();
923 let columns_needed = &extractor.columns_needed;
924
925 if extraction_pairs.is_empty() {
927 return Ok(None);
928 }
929
930 if columns_needed
935 .iter()
936 .any(|c| !standalone_columns.contains(c))
937 {
938 needs_recovery = true;
939 }
940
941 let proj_input = Arc::clone(&proj.input);
943 let pushed = push_extraction_pairs(
944 &extraction_pairs,
945 columns_needed,
946 proj,
947 &proj_input,
948 alias_generator,
949 proj_exprs_captured,
950 )?;
951
952 let base_plan = match pushed {
955 Some(plan) => plan,
956 None => {
957 if !has_new_extractions {
958 return Ok(None);
963 }
964 let input_arc = Arc::clone(input);
966 let extraction = build_extraction_projection_impl(
967 &extraction_pairs,
968 columns_needed,
969 &input_arc,
970 input_schema.as_ref(),
971 )?;
972 LogicalPlan::Projection(extraction)
973 }
974 };
975
976 if needs_recovery {
978 let recovery = LogicalPlan::Projection(Projection::try_new(
979 recovery_exprs,
980 Arc::new(base_plan),
981 )?);
982 Ok(Some(recovery))
983 } else {
984 Ok(Some(base_plan))
985 }
986}
987
988fn is_pure_extraction_projection(plan: &LogicalPlan) -> bool {
992 let LogicalPlan::Projection(proj) = plan else {
993 return false;
994 };
995 let mut has_extraction = false;
996 for expr in &proj.expr {
997 match expr {
998 Expr::Alias(alias) if alias.name.starts_with(EXTRACTED_EXPR_PREFIX) => {
999 has_extraction = true;
1000 }
1001 Expr::Column(_) => {}
1002 _ => return false,
1003 }
1004 }
1005 has_extraction
1006}
1007
1008fn push_extraction_pairs(
1011 pairs: &[(Expr, String)],
1012 columns_needed: &IndexSet<Column>,
1013 proj: &Projection,
1014 proj_input: &Arc<LogicalPlan>,
1015 alias_generator: &Arc<AliasGenerator>,
1016 proj_exprs_captured: usize,
1017) -> Result<Option<LogicalPlan>> {
1018 match proj_input.as_ref() {
1019 LogicalPlan::Projection(_) if proj_exprs_captured == proj.expr.len() => {
1026 let target_schema = Arc::clone(proj_input.schema());
1027 let merged = build_extraction_projection_impl(
1028 pairs,
1029 columns_needed,
1030 proj_input,
1031 target_schema.as_ref(),
1032 )?;
1033 let merged_plan = LogicalPlan::Projection(merged);
1034
1035 if is_pure_extraction_projection(&merged_plan)
1044 && let Some(pushed) = try_push_input(&merged_plan, alias_generator)?
1045 {
1046 return Ok(Some(pushed));
1047 }
1048 Ok(Some(merged_plan))
1049 }
1050 _ => try_push_into_inputs(
1056 pairs,
1057 columns_needed,
1058 proj_input.as_ref(),
1059 alias_generator,
1060 ),
1061 }
1062}
1063
1064fn route_to_inputs(
1072 pairs: &[(Expr, String)],
1073 columns: &IndexSet<Column>,
1074 node: &LogicalPlan,
1075 input_column_sets: &[std::collections::HashSet<Column>],
1076 input_schemas: &[Arc<DFSchema>],
1077) -> Result<Option<Vec<ExtractionTarget>>> {
1078 let num_inputs = input_schemas.len();
1079 let mut per_input: Vec<ExtractionTarget> = (0..num_inputs)
1080 .map(|_| ExtractionTarget {
1081 pairs: vec![],
1082 columns: IndexSet::new(),
1083 })
1084 .collect();
1085
1086 if matches!(node, LogicalPlan::Union(_)) {
1087 let union_schema = node.schema();
1091 for (idx, input_schema) in input_schemas.iter().enumerate() {
1092 per_input[idx] =
1093 remap_pairs_and_columns(pairs, columns, union_schema, input_schema)?;
1094 }
1095 } else {
1096 for (expr, alias) in pairs {
1097 match find_owning_input(expr, input_column_sets) {
1098 Some(idx) => per_input[idx].pairs.push((expr.clone(), alias.clone())),
1099 None => return Ok(None), }
1101 }
1102 for col in columns {
1103 let col_expr = Expr::Column(col.clone());
1104 match find_owning_input(&col_expr, input_column_sets) {
1105 Some(idx) => {
1106 per_input[idx].columns.insert(col.clone());
1107 }
1108 None => return Ok(None), }
1110 }
1111 }
1112
1113 if per_input.iter().all(|t| t.pairs.is_empty()) {
1115 return Ok(None);
1116 }
1117
1118 Ok(Some(per_input))
1119}
1120
1121fn try_push_into_inputs(
1150 pairs: &[(Expr, String)],
1151 columns_needed: &IndexSet<Column>,
1152 node: &LogicalPlan,
1153 alias_generator: &Arc<AliasGenerator>,
1154) -> Result<Option<LogicalPlan>> {
1155 let inputs = node.inputs();
1156 if inputs.is_empty() {
1157 return Ok(None);
1158 }
1159
1160 let remapped = if let LogicalPlan::SubqueryAlias(sa) = node {
1163 remap_pairs_and_columns(pairs, columns_needed, &sa.schema, sa.input.schema())?
1164 } else {
1165 ExtractionTarget {
1166 pairs: pairs.to_vec(),
1167 columns: columns_needed.clone(),
1168 }
1169 };
1170 let pairs = &remapped.pairs[..];
1171 let columns_needed = &remapped.columns;
1172
1173 let input_schemas: Vec<Arc<DFSchema>> =
1175 inputs.iter().map(|i| Arc::clone(i.schema())).collect();
1176 let input_column_sets: Vec<std::collections::HashSet<Column>> =
1177 input_schemas.iter().map(|s| schema_columns(s)).collect();
1178
1179 let per_input = match route_to_inputs(
1181 pairs,
1182 columns_needed,
1183 node,
1184 &input_column_sets,
1185 &input_schemas,
1186 )? {
1187 Some(routed) => routed,
1188 None => return Ok(None),
1189 };
1190
1191 let num_inputs = inputs.len();
1192
1193 let mut new_inputs: Vec<LogicalPlan> = Vec::with_capacity(num_inputs);
1198 for (idx, input) in inputs.into_iter().enumerate() {
1199 if per_input[idx].pairs.is_empty() {
1200 new_inputs.push(input.clone());
1201 } else {
1202 let input_arc = Arc::new(input.clone());
1203 let target_schema = Arc::clone(input.schema());
1204 let proj = build_extraction_projection_impl(
1205 &per_input[idx].pairs,
1206 &per_input[idx].columns,
1207 &input_arc,
1208 target_schema.as_ref(),
1209 )?;
1210 let proj_schema = proj.schema.as_ref();
1214 for (_expr, alias) in &per_input[idx].pairs {
1215 if !proj_schema.fields().iter().any(|f| f.name() == alias) {
1216 return Ok(None);
1217 }
1218 }
1219 let proj_plan = LogicalPlan::Projection(proj);
1220 match try_push_input(&proj_plan, alias_generator)? {
1225 Some(pushed) => new_inputs.push(pushed),
1226 None => new_inputs.push(proj_plan),
1227 }
1228 }
1229 }
1230
1231 let new_node = node.with_new_exprs(node.expressions(), new_inputs)?;
1233
1234 let output_schema = new_node.schema();
1238 for (_expr, alias) in pairs {
1239 if !output_schema.fields().iter().any(|f| f.name() == alias) {
1240 return Ok(None);
1241 }
1242 }
1243
1244 Ok(Some(new_node))
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249 use std::sync::Arc;
1250
1251 use super::*;
1252 use crate::optimize_projections::OptimizeProjections;
1253 use crate::test::udfs::PlacementTestUDF;
1254 use crate::test::*;
1255 use crate::{Optimizer, OptimizerContext};
1256 use datafusion_common::Result;
1257 use datafusion_expr::expr::ScalarFunction;
1258 use datafusion_expr::{Expr, ExpressionPlacement};
1259 use datafusion_expr::{
1260 ScalarUDF, col, lit, logical_plan::builder::LogicalPlanBuilder,
1261 };
1262
1263 fn leaf_udf(expr: Expr, name: &str) -> Expr {
1264 Expr::ScalarFunction(ScalarFunction::new_udf(
1265 Arc::new(ScalarUDF::new_from_impl(
1266 PlacementTestUDF::new()
1267 .with_placement(ExpressionPlacement::MoveTowardsLeafNodes),
1268 )),
1269 vec![expr, lit(name)],
1270 ))
1271 }
1272
1273 fn format_optimization_stages(plan: &LogicalPlan) -> Result<String> {
1287 let run = |rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>| -> Result<String> {
1288 let ctx = OptimizerContext::new().with_max_passes(1);
1289 let optimizer = Optimizer::with_rules(rules);
1290 let optimized = optimizer.optimize(plan.clone(), &ctx, |_, _| {})?;
1291 Ok(format!("{optimized}"))
1292 };
1293
1294 let original = run(vec![Arc::new(OptimizeProjections::new())])?;
1295
1296 let after_extract = run(vec![
1297 Arc::new(OptimizeProjections::new()),
1298 Arc::new(ExtractLeafExpressions::new()),
1299 ])?;
1300
1301 let after_pushdown = run(vec![
1302 Arc::new(OptimizeProjections::new()),
1303 Arc::new(ExtractLeafExpressions::new()),
1304 Arc::new(PushDownLeafProjections::new()),
1305 ])?;
1306
1307 let optimized = run(vec![
1308 Arc::new(OptimizeProjections::new()),
1309 Arc::new(ExtractLeafExpressions::new()),
1310 Arc::new(PushDownLeafProjections::new()),
1311 Arc::new(OptimizeProjections::new()),
1312 ])?;
1313
1314 let mut out = format!("## Original Plan\n{original}");
1315
1316 out.push_str("\n\n## After Extraction\n");
1317 if after_extract == original {
1318 out.push_str("(same as original)");
1319 } else {
1320 out.push_str(&after_extract);
1321 }
1322
1323 out.push_str("\n\n## After Pushdown\n");
1324 if after_pushdown == after_extract {
1325 out.push_str("(same as after extraction)");
1326 } else {
1327 out.push_str(&after_pushdown);
1328 }
1329
1330 out.push_str("\n\n## Optimized\n");
1331 if optimized == after_pushdown {
1332 out.push_str("(same as after pushdown)");
1333 } else {
1334 out.push_str(&optimized);
1335 }
1336
1337 Ok(out)
1338 }
1339
    /// Formats `$plan` with `format_optimization_stages` and compares the
    /// result against the inline `insta` snapshot `$expected`.
    ///
    /// The expansion uses `?` and evaluates to `Ok(())`, so it is meant to be
    /// used inside a function returning a DataFusion `Result`, typically as
    /// the tail expression of a test.
    macro_rules! assert_stages {
        ($plan:expr, @ $expected:literal $(,)?) => {{
            let result = format_optimization_stages(&$plan)?;
            insta::assert_snapshot!(result, @ $expected);
            Ok::<(), datafusion_common::DataFusionError>(())
        }};
    }
1348
1349 #[test]
1350 fn test_extract_from_filter() -> Result<()> {
1351 let table_scan = test_table_scan_with_struct()?;
1352 let plan = LogicalPlanBuilder::from(table_scan.clone())
1353 .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
1354 .select(vec![
1355 table_scan
1356 .schema()
1357 .index_of_column_by_name(None, "id")
1358 .unwrap(),
1359 ])?
1360 .build()?;
1361
1362 assert_stages!(plan, @r#"
1363 ## Original Plan
1364 Projection: test.id
1365 Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
1366 TableScan: test projection=[id, user]
1367
1368 ## After Extraction
1369 Projection: test.id
1370 Projection: test.id, test.user
1371 Filter: __datafusion_extracted_1 = Utf8("active")
1372 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
1373 TableScan: test projection=[id, user]
1374
1375 ## After Pushdown
1376 (same as after extraction)
1377
1378 ## Optimized
1379 Projection: test.id
1380 Filter: __datafusion_extracted_1 = Utf8("active")
1381 Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
1382 TableScan: test projection=[id, user]
1383 "#)
1384 }
1385
1386 #[test]
1387 fn test_no_extraction_for_column() -> Result<()> {
1388 let table_scan = test_table_scan()?;
1389 let plan = LogicalPlanBuilder::from(table_scan)
1390 .filter(col("a").eq(lit(1)))?
1391 .build()?;
1392
1393 assert_stages!(plan, @"
1394 ## Original Plan
1395 Filter: test.a = Int32(1)
1396 TableScan: test projection=[a, b, c]
1397
1398 ## After Extraction
1399 (same as original)
1400
1401 ## After Pushdown
1402 (same as after extraction)
1403
1404 ## Optimized
1405 (same as after pushdown)
1406 ")
1407 }
1408
1409 #[test]
1410 fn test_extract_from_projection() -> Result<()> {
1411 let table_scan = test_table_scan_with_struct()?;
1412 let plan = LogicalPlanBuilder::from(table_scan)
1413 .project(vec![leaf_udf(col("user"), "name")])?
1414 .build()?;
1415
1416 assert_stages!(plan, @r#"
1417 ## Original Plan
1418 Projection: leaf_udf(test.user, Utf8("name"))
1419 TableScan: test projection=[user]
1420
1421 ## After Extraction
1422 (same as original)
1423
1424 ## After Pushdown
1425 Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
1426 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1427 TableScan: test projection=[user]
1428
1429 ## Optimized
1430 Projection: leaf_udf(test.user, Utf8("name"))
1431 TableScan: test projection=[user]
1432 "#)
1433 }
1434
1435 #[test]
1436 fn test_extract_from_projection_with_subexpression() -> Result<()> {
1437 let table_scan = test_table_scan_with_struct()?;
1438 let plan = LogicalPlanBuilder::from(table_scan)
1439 .project(vec![
1440 leaf_udf(col("user"), "name")
1441 .is_not_null()
1442 .alias("has_name"),
1443 ])?
1444 .build()?;
1445
1446 assert_stages!(plan, @r#"
1447 ## Original Plan
1448 Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
1449 TableScan: test projection=[user]
1450
1451 ## After Extraction
1452 (same as original)
1453
1454 ## After Pushdown
1455 Projection: __datafusion_extracted_1 IS NOT NULL AS has_name
1456 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
1457 TableScan: test projection=[user]
1458
1459 ## Optimized
1460 Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name
1461 TableScan: test projection=[user]
1462 "#)
1463 }
1464
1465 #[test]
1466 fn test_projection_no_extraction_for_column() -> Result<()> {
1467 let table_scan = test_table_scan()?;
1468 let plan = LogicalPlanBuilder::from(table_scan)
1469 .project(vec![col("a"), col("b")])?
1470 .build()?;
1471
1472 assert_stages!(plan, @"
1473 ## Original Plan
1474 TableScan: test projection=[a, b]
1475
1476 ## After Extraction
1477 (same as original)
1478
1479 ## After Pushdown
1480 (same as after extraction)
1481
1482 ## Optimized
1483 (same as after pushdown)
1484 ")
1485 }
1486
1487 #[test]
1488 fn test_filter_with_deduplication() -> Result<()> {
1489 let table_scan = test_table_scan_with_struct()?;
1490 let field_access = leaf_udf(col("user"), "name");
1491 let plan = LogicalPlanBuilder::from(table_scan)
1493 .filter(
1494 field_access
1495 .clone()
1496 .is_not_null()
1497 .and(field_access.is_null()),
1498 )?
1499 .build()?;
1500
1501 assert_stages!(plan, @r#"
1502 ## Original Plan
1503 Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL AND leaf_udf(test.user, Utf8("name")) IS NULL
1504 TableScan: test projection=[id, user]
1505
1506 ## After Extraction
1507 Projection: test.id, test.user
1508 Filter: __datafusion_extracted_1 IS NOT NULL AND __datafusion_extracted_1 IS NULL
1509 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1510 TableScan: test projection=[id, user]
1511
1512 ## After Pushdown
1513 (same as after extraction)
1514
1515 ## Optimized
1516 (same as after pushdown)
1517 "#)
1518 }
1519
1520 #[test]
1521 fn test_already_leaf_expression_in_filter() -> Result<()> {
1522 let table_scan = test_table_scan_with_struct()?;
1523 let plan = LogicalPlanBuilder::from(table_scan)
1524 .filter(leaf_udf(col("user"), "name").eq(lit("test")))?
1525 .build()?;
1526
1527 assert_stages!(plan, @r#"
1528 ## Original Plan
1529 Filter: leaf_udf(test.user, Utf8("name")) = Utf8("test")
1530 TableScan: test projection=[id, user]
1531
1532 ## After Extraction
1533 Projection: test.id, test.user
1534 Filter: __datafusion_extracted_1 = Utf8("test")
1535 Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
1536 TableScan: test projection=[id, user]
1537
1538 ## After Pushdown
1539 (same as after extraction)
1540
1541 ## Optimized
1542 (same as after pushdown)
1543 "#)
1544 }
1545
/// A leaf expression used as an aggregate GROUP BY key.
///
/// The snapshot expects the key to be computed in a projection below the
/// aggregate, and a projection above the aggregate to alias the extracted
/// column back to the original output name. The optimized plan drops the
/// `test.user` passthrough column from the lower projection.
#[test]
fn test_extract_from_aggregate_group_by() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let group_keys = vec![leaf_udf(col("user"), "status")];
    let aggregates = vec![count(lit(1))];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Aggregate: groupBy=[[leaf_udf(test.user, Utf8("status"))]], aggr=[[COUNT(Int32(1))]]
      TableScan: test projection=[user]

    ## After Extraction
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
          TableScan: test projection=[user]
    "#)
}
1576
/// A leaf expression used as the argument of an aggregate function.
///
/// The snapshot expects the argument to be replaced by an extracted column
/// computed below the aggregate, and the aggregate output to be aliased back
/// to its original name by a projection on top.
#[test]
fn test_extract_from_aggregate_args() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let group_keys = vec![col("user")];
    let aggregates = vec![count(leaf_udf(col("user"), "value"))];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value")))]]
      TableScan: test projection=[user]

    ## After Extraction
    Projection: test.user, COUNT(__datafusion_extracted_1) AS COUNT(leaf_udf(test.user,Utf8("value")))
      Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1)]]
        Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
1607
/// A projection of one leaf expression stacked on a filter over another.
///
/// The snapshot expects extraction to rewrite only the filter. Pushdown then
/// merges the projection's leaf expression into the same extraction
/// projection beneath the filter, and the optimized plan drops the unused
/// `test.user` passthrough column.
#[test]
fn test_projection_with_filter_combined() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "status").eq(lit("active"));
    let projected = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .project(projected)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name"))
      Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
        TableScan: test projection=[user]

    ## After Extraction
    Projection: leaf_udf(test.user, Utf8("name"))
      Projection: test.user
        Filter: __datafusion_extracted_1 = Utf8("active")
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user
            TableScan: test projection=[user]

    ## After Pushdown
    Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
          TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
          TableScan: test projection=[user]
    "#)
}
1642
/// A user-supplied alias on a projected leaf expression must survive.
///
/// The snapshot expects the pushdown stage to split the projection in two
/// while keeping `username` as the output name, and the optimized plan to
/// equal the original single projection.
#[test]
fn test_projection_preserves_alias() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let aliased = leaf_udf(col("user"), "name").alias("username");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![aliased])?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name")) AS username
      TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS username
      Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
        TableScan: test projection=[user]

    ## Optimized
    Projection: leaf_udf(test.user, Utf8("name")) AS username
      TableScan: test projection=[user]
    "#)
}
1668
/// The projection reads a different field than the filter, and also keeps
/// the whole `user` column.
///
/// The snapshot expects both leaf expressions to end up in one extraction
/// projection below the filter after pushdown, with `test.user` retained
/// because the top projection still outputs it.
#[test]
fn test_projection_different_field_from_filter() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "value").gt(lit(150));
    let projected = vec![col("user"), leaf_udf(col("user"), "label")];
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .project(projected)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: test.user, leaf_udf(test.user, Utf8("label"))
      Filter: leaf_udf(test.user, Utf8("value")) > Int32(150)
        TableScan: test projection=[user]

    ## After Extraction
    Projection: test.user, leaf_udf(test.user, Utf8("label"))
      Projection: test.user
        Filter: __datafusion_extracted_1 > Int32(150)
          Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
            TableScan: test projection=[user]

    ## After Pushdown
    Projection: test.user, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("label"))
      Filter: __datafusion_extracted_1 > Int32(150)
        Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user, leaf_udf(test.user, Utf8("label")) AS __datafusion_extracted_2
          TableScan: test projection=[user]

    ## Optimized
    (same as after pushdown)
    "#)
}
1703
/// The same leaf expression projected twice (once bare, once aliased).
///
/// The snapshot expects pushdown to extract it only once: both outputs refer
/// to the single `__datafusion_extracted_1` column. The optimized plan equals
/// the original.
#[test]
fn test_projection_deduplication() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let field = leaf_udf(col("user"), "name");
    let renamed = field.clone().alias("name2");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![field, renamed])?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
      TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_1 AS name2
      Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
        TableScan: test projection=[user]

    ## Optimized
    Projection: leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("name")) AS name2
      TableScan: test projection=[user]
    "#)
}
1730
/// A projected leaf expression sitting above a Sort.
///
/// The snapshot expects pushdown to place the extraction projection below
/// the Sort, keeping `test.user` available because the sort key needs it.
#[test]
fn test_extract_through_sort() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let sort_keys = vec![col("user").sort(true, true)];
    let projected = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .sort(sort_keys)?
        .project(projected)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name"))
      Sort: test.user ASC NULLS FIRST
        TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
      Sort: test.user ASC NULLS FIRST
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## Optimized
    (same as after pushdown)
    "#)
}
1763
/// A projected leaf expression sitting above a Limit.
///
/// The snapshot expects pushdown to place the extraction projection below
/// the Limit, and the optimized plan to drop the `test.user` passthrough
/// column that nothing above needs.
#[test]
fn test_extract_through_limit() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let projected = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .limit(0, Some(10))?
        .project(projected)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name"))
      Limit: skip=0, fetch=10
        TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
      Limit: skip=0, fetch=10
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
      Limit: skip=0, fetch=10
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
          TableScan: test projection=[user]
    "#)
}
1795
/// An aliased aggregate whose argument is a leaf expression.
///
/// The snapshot expects the argument to be extracted below the aggregate
/// while the `cnt` alias stays on the aggregate expression, so no extra
/// projection appears on top.
#[test]
fn test_extract_from_aliased_aggregate() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let group_keys = vec![col("user")];
    let aggregates = vec![count(leaf_udf(col("user"), "value")).alias("cnt")];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Aggregate: groupBy=[[test.user]], aggr=[[COUNT(leaf_udf(test.user, Utf8("value"))) AS cnt]]
      TableScan: test projection=[user]

    ## After Extraction
    Aggregate: groupBy=[[test.user]], aggr=[[COUNT(__datafusion_extracted_1) AS cnt]]
      Projection: leaf_udf(test.user, Utf8("value")) AS __datafusion_extracted_1, test.user
        TableScan: test projection=[user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
1826
/// An aggregate over plain columns only.
///
/// The snapshot expects every stage to leave the plan untouched.
#[test]
fn test_aggregate_no_extraction() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan()?;
    let group_keys = vec![col("a")];
    let aggregates = vec![count(col("b"))];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @"
    ## Original Plan
    Aggregate: groupBy=[[test.a]], aggr=[[COUNT(test.b)]]
      TableScan: test projection=[a, b]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
1852
/// A projection whose leaf expression already carries an alias starting with
/// the `__datafusion_extracted` prefix.
///
/// The snapshot expects every stage to leave this plan unchanged.
#[test]
fn test_skip_extracted_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let pre_extracted =
        leaf_udf(col("user"), "name").alias("__datafusion_extracted_manual");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![pre_extracted, col("user")])?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_manual, test.user
      TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
1879
/// Two stacked filters, each on a different leaf expression of `user`.
///
/// The snapshot expects extraction to give each filter its own extraction
/// projection, and pushdown to merge the upper one into the lower one so a
/// single projection above the scan computes both extracted columns.
#[test]
fn test_merge_into_existing_extracted_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let inner_predicate = leaf_udf(col("user"), "status").eq(lit("active"));
    let outer_predicate = leaf_udf(col("user"), "name").is_not_null();
    let plan = LogicalPlanBuilder::from(scan)
        .filter(inner_predicate)?
        .filter(outer_predicate)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(test.user, Utf8("name")) IS NOT NULL
      Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
        TableScan: test projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user
      Filter: __datafusion_extracted_1 IS NOT NULL
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.id, test.user
          Projection: test.id, test.user
            Filter: __datafusion_extracted_2 = Utf8("active")
              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user
                TableScan: test projection=[id, user]

    ## After Pushdown
    Projection: test.id, test.user
      Filter: __datafusion_extracted_1 IS NOT NULL
        Filter: __datafusion_extracted_2 = Utf8("active")
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
            TableScan: test projection=[id, user]

    ## Optimized
    Projection: test.id, test.user
      Filter: __datafusion_extracted_1 IS NOT NULL
        Projection: test.id, test.user, __datafusion_extracted_1
          Filter: __datafusion_extracted_2 = Utf8("active")
            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
              TableScan: test projection=[id, user]
    "#)
}
1920
/// A leaf-expression projection built on top of a plain column projection.
///
/// The original plan in the snapshot already shows a single projection over
/// the scan. Pushdown splits it into an outer alias and an inner extraction
/// projection, and the optimized plan equals the original again.
#[test]
fn test_extract_through_passthrough_projection() -> Result<()> {
    let scan = test_table_scan_with_struct()?;
    let passthrough = vec![col("user")];
    let projected = vec![leaf_udf(col("user"), "name")];
    let plan = LogicalPlanBuilder::from(scan)
        .project(passthrough)?
        .project(projected)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name"))
      TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name"))
      Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
        TableScan: test projection=[user]

    ## Optimized
    Projection: leaf_udf(test.user, Utf8("name"))
      TableScan: test projection=[user]
    "#)
}
1948
/// A projection made only of a column alias and a plain column.
///
/// The snapshot expects every stage to leave the plan untouched.
#[test]
fn test_projection_early_return_no_extraction() -> Result<()> {
    let scan = test_table_scan()?;
    let projected = vec![col("a").alias("x"), col("b")];
    let plan = LogicalPlanBuilder::from(scan).project(projected)?.build()?;

    assert_stages!(plan, @"
    ## Original Plan
    Projection: test.a AS x, test.b
      TableScan: test projection=[a, b]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
1972
/// A projection containing only column arithmetic under an alias.
///
/// The snapshot expects every stage to leave the plan untouched.
#[test]
fn test_projection_with_arithmetic_no_extraction() -> Result<()> {
    let scan = test_table_scan()?;
    let total = (col("a") + col("b")).alias("sum");
    let plan = LogicalPlanBuilder::from(scan)
        .project(vec![total])?
        .build()?;

    assert_stages!(plan, @"
    ## Original Plan
    Projection: test.a + test.b AS sum
      TableScan: test projection=[a, b]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
1996
/// An aggregate grouped by a leaf expression, over a filter on another leaf
/// expression.
///
/// The snapshot expects extraction to create one extraction projection per
/// node, and pushdown to merge the aggregate's extraction into the
/// projection beneath the filter.
#[test]
fn test_aggregate_merge_into_extracted_projection() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let predicate = leaf_udf(col("user"), "status").eq(lit("active"));
    let group_keys = vec![leaf_udf(col("user"), "name")];
    let aggregates = vec![count(lit(1))];
    let plan = LogicalPlanBuilder::from(scan)
        .filter(predicate)?
        .aggregate(group_keys, aggregates)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Aggregate: groupBy=[[leaf_udf(test.user, Utf8("name"))]], aggr=[[COUNT(Int32(1))]]
      Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
        TableScan: test projection=[user]

    ## After Extraction
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
          Projection: test.user
            Filter: __datafusion_extracted_2 = Utf8("active")
              Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user
                TableScan: test projection=[user]

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Filter: __datafusion_extracted_2 = Utf8("active")
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
            TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("name")), COUNT(Int32(1))
      Aggregate: groupBy=[[__datafusion_extracted_1]], aggr=[[COUNT(Int32(1))]]
        Projection: __datafusion_extracted_1
          Filter: __datafusion_extracted_2 = Utf8("active")
            Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
              TableScan: test projection=[user]
    "#)
}
2039
/// A projection using a leaf expression on a group-by column, placed above
/// an aggregate.
///
/// The snapshot expects pushdown to stop directly above the aggregate (the
/// extraction projection is not moved below it), and the optimized plan to
/// equal the original.
#[test]
fn test_projection_with_leaf_expr_above_aggregate() -> Result<()> {
    use datafusion_expr::test::function_stub::count;

    let scan = test_table_scan_with_struct()?;
    let has_name = leaf_udf(col("user"), "name")
        .is_not_null()
        .alias("has_name");
    let projected = vec![has_name, col("COUNT(Int32(1))")];
    let plan = LogicalPlanBuilder::from(scan)
        .aggregate(vec![col("user")], vec![count(lit(1))])?
        .project(projected)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
      Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
        TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 IS NOT NULL AS has_name, COUNT(Int32(1))
      Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user, COUNT(Int32(1))
        Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
          TableScan: test projection=[user]

    ## Optimized
    Projection: leaf_udf(test.user, Utf8("name")) IS NOT NULL AS has_name, COUNT(Int32(1))
      Aggregate: groupBy=[[test.user]], aggr=[[COUNT(Int32(1))]]
        TableScan: test projection=[user]
    "#)
}
2079
/// Two stacked filters whose leaf expressions read different base columns.
///
/// The snapshot expects pushdown to merge both extractions into the one
/// projection above the scan, which keeps all of `a`, `b` and `c`.
#[test]
fn test_merge_with_new_columns() -> Result<()> {
    let scan = test_table_scan()?;
    let inner_predicate = leaf_udf(col("a"), "x").eq(lit(1));
    let outer_predicate = leaf_udf(col("b"), "y").eq(lit(2));
    let plan = LogicalPlanBuilder::from(scan)
        .filter(inner_predicate)?
        .filter(outer_predicate)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(test.b, Utf8("y")) = Int32(2)
      Filter: leaf_udf(test.a, Utf8("x")) = Int32(1)
        TableScan: test projection=[a, b, c]

    ## After Extraction
    Projection: test.a, test.b, test.c
      Filter: __datafusion_extracted_1 = Int32(2)
        Projection: leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1, test.a, test.b, test.c
          Projection: test.a, test.b, test.c
            Filter: __datafusion_extracted_2 = Int32(1)
              Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c
                TableScan: test projection=[a, b, c]

    ## After Pushdown
    Projection: test.a, test.b, test.c
      Filter: __datafusion_extracted_1 = Int32(2)
        Filter: __datafusion_extracted_2 = Int32(1)
          Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
            TableScan: test projection=[a, b, c]

    ## Optimized
    Projection: test.a, test.b, test.c
      Filter: __datafusion_extracted_1 = Int32(2)
        Projection: test.a, test.b, test.c, __datafusion_extracted_1
          Filter: __datafusion_extracted_2 = Int32(1)
            Projection: leaf_udf(test.a, Utf8("x")) AS __datafusion_extracted_2, test.a, test.b, test.c, leaf_udf(test.b, Utf8("y")) AS __datafusion_extracted_1
              TableScan: test projection=[a, b, c]
    "#)
}
2120
2121 fn test_table_scan_with_struct_named(name: &str) -> Result<LogicalPlan> {
2127 use arrow::datatypes::Schema;
2128 let schema = Schema::new(test_table_scan_with_struct_fields());
2129 datafusion_expr::logical_plan::table_scan(Some(name), &schema, None)?.build()
2130 }
2131
/// An equi-join whose keys on both sides are leaf expressions.
///
/// The snapshot expects each key to be extracted into a projection on its
/// own join input, and a projection above the join to restore the original
/// output columns.
#[test]
fn test_extract_from_join_on() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;

    let left_keys = vec![leaf_udf(col("user"), "id")];
    let right_keys = vec![leaf_udf(col("user"), "id")];
    let plan = LogicalPlanBuilder::from(left)
        .join_with_expr_keys(right, JoinType::Inner, (left_keys, right_keys), None)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
      TableScan: test projection=[id, user]
      TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Inner Join: __datafusion_extracted_1 = __datafusion_extracted_2
        Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_1, test.id, test.user
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_2, right.id, right.user
          TableScan: right projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
2173
/// A join whose filter contains a leaf expression on the left input only.
///
/// The snapshot expects an extraction projection on the left input alone;
/// the right input stays a bare table scan.
#[test]
fn test_extract_from_join_filter() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;

    let conditions = vec![
        col("test.user").eq(col("right.user")),
        leaf_udf(col("test.user"), "status").eq(lit("active")),
    ];
    let plan = LogicalPlanBuilder::from(left)
        .join_on(right, JoinType::Inner, conditions)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Inner Join:  Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active")
      TableScan: test projection=[id, user]
      TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Inner Join:  Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
          TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
2213
/// A join filter with one leaf expression per input.
///
/// The snapshot expects each leaf expression to be extracted into a
/// projection on the input that owns its column.
#[test]
fn test_extract_from_join_both_sides() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;

    let conditions = vec![
        col("test.user").eq(col("right.user")),
        leaf_udf(col("test.user"), "status").eq(lit("active")),
        leaf_udf(col("right.user"), "role").eq(lit("admin")),
    ];
    let plan = LogicalPlanBuilder::from(left)
        .join_on(right, JoinType::Inner, conditions)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Inner Join:  Filter: test.user = right.user AND leaf_udf(test.user, Utf8("status")) = Utf8("active") AND leaf_udf(right.user, Utf8("role")) = Utf8("admin")
      TableScan: test projection=[id, user]
      TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Inner Join:  Filter: test.user = right.user AND __datafusion_extracted_1 = Utf8("active") AND __datafusion_extracted_2 = Utf8("admin")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
          TableScan: right projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
2255
/// A join on plain columns only.
///
/// The snapshot expects every stage to leave the plan untouched.
#[test]
fn test_extract_from_join_no_extraction() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan()?;
    let right = test_table_scan_with_name("right")?;

    let join_keys = (vec!["a"], vec!["a"]);
    let plan = LogicalPlanBuilder::from(left)
        .join(right, JoinType::Inner, join_keys, None)?
        .build()?;

    assert_stages!(plan, @"
    ## Original Plan
    Inner Join: test.a = right.a
      TableScan: test projection=[a, b, c]
      TableScan: right projection=[a, b, c]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
2284
/// A filter on a leaf expression placed above a join whose keys are also
/// leaf expressions.
///
/// The snapshot expects pushdown to move the filter's extraction through the
/// join and merge it into the existing extraction projection on the left
/// input.
#[test]
fn test_extract_from_filter_above_join() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;

    let left_keys = vec![leaf_udf(col("user"), "id")];
    let right_keys = vec![leaf_udf(col("user"), "id")];
    let predicate = leaf_udf(col("test.user"), "status").eq(lit("active"));
    let plan = LogicalPlanBuilder::from(left)
        .join_with_expr_keys(right, JoinType::Inner, (left_keys, right_keys), None)?
        .filter(predicate)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
      Inner Join: leaf_udf(test.user, Utf8("id")) = leaf_udf(right.user, Utf8("id"))
        TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, right.id, right.user
          Projection: test.id, test.user, right.id, right.user
            Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
              Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user
                TableScan: test projection=[id, user]
              Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
                TableScan: right projection=[id, user]

    ## After Pushdown
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = Utf8("active")
        Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
          Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
            TableScan: test projection=[id, user]
          Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
            TableScan: right projection=[id, user]

    ## Optimized
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: test.id, test.user, __datafusion_extracted_1, right.id, right.user
          Inner Join: __datafusion_extracted_2 = __datafusion_extracted_3
            Projection: leaf_udf(test.user, Utf8("id")) AS __datafusion_extracted_2, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
              TableScan: test projection=[id, user]
            Projection: leaf_udf(right.user, Utf8("id")) AS __datafusion_extracted_3, right.id, right.user
              TableScan: right projection=[id, user]
    "#)
}
2344
/// A projection above a join that reads one leaf expression from each side.
///
/// The snapshot expects pushdown to route each extraction to the join input
/// owning its column, and the optimized plan to keep only the join key
/// (`id`) next to the extracted column on each side.
#[test]
fn test_extract_projection_above_join() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;

    let join_keys = (vec!["id"], vec!["id"]);
    let projected = vec![
        leaf_udf(col("test.user"), "status"),
        leaf_udf(col("right.user"), "role"),
    ];
    let plan = LogicalPlanBuilder::from(left)
        .join(right, JoinType::Inner, join_keys, None)?
        .project(projected)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(test.user, Utf8("status")), leaf_udf(right.user, Utf8("role"))
      Inner Join: test.id = right.id
        TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
      Inner Join: test.id = right.id
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id, right.user
          TableScan: right projection=[id, user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(test.user,Utf8("status")), __datafusion_extracted_2 AS leaf_udf(right.user,Utf8("role"))
      Inner Join: test.id = right.id
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("role")) AS __datafusion_extracted_2, right.id
          TableScan: right projection=[id, user]
    "#)
}
2389
/// A join filter whose only leaf expression references the right input
/// through a qualified column.
///
/// The snapshot expects the extraction projection to appear on the right
/// input while the left input stays a bare table scan.
#[test]
fn test_extract_from_join_qualified_right_side() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;

    let conditions = vec![
        col("test.id").eq(col("right.id")),
        leaf_udf(col("right.user"), "status").eq(lit("active")),
    ];
    let plan = LogicalPlanBuilder::from(left)
        .join_on(right, JoinType::Inner, conditions)?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Inner Join:  Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) = Utf8("active")
      TableScan: test projection=[id, user]
      TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Inner Join:  Filter: test.id = right.id AND __datafusion_extracted_1 = Utf8("active")
        TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
          TableScan: right projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    "#)
}
2431
/// `find_owning_input` must not pick an input for a column that is present
/// in more than one input's column set.
///
/// Both sets contain the unqualified `user` column, so the unqualified
/// lookup is expected to yield `None`. The qualified lookups are expected to
/// yield the index of the one set that contains them.
#[test]
fn test_find_owning_input_ambiguous_unqualified_column() {
    use std::collections::HashSet;

    // Index 0 is the left input, index 1 the right input.
    let input_column_sets: Vec<HashSet<Column>> = vec![
        HashSet::from([
            Column::new(Some("test"), "user"),
            Column::new_unqualified("user"),
        ]),
        HashSet::from([
            Column::new(Some("right"), "user"),
            Column::new_unqualified("user"),
        ]),
    ];

    // Present in both sets: no single owner.
    assert_eq!(
        find_owning_input(
            &Expr::Column(Column::new_unqualified("user")),
            &input_column_sets
        ),
        None
    );

    // Only the right set holds `right.user`.
    assert_eq!(
        find_owning_input(
            &Expr::Column(Column::new(Some("right"), "user")),
            &input_column_sets
        ),
        Some(1)
    );

    // Only the left set holds `test.user`.
    assert_eq!(
        find_owning_input(
            &Expr::Column(Column::new(Some("test"), "user")),
            &input_column_sets
        ),
        Some(0)
    );
}
2476
/// A filter above a join comparing a leaf expression from the left input
/// with one from the right input.
///
/// The snapshot expects extraction to put both extracted columns in one
/// projection above the join, and pushdown to split that projection so each
/// extracted column is computed on the input owning its source column.
#[test]
fn test_extract_from_join_cross_input_expression() -> Result<()> {
    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;

    let join_conditions = vec![col("test.id").eq(col("right.id"))];
    let left_status = leaf_udf(col("test.user"), "status");
    let right_status = leaf_udf(col("right.user"), "status");
    let plan = LogicalPlanBuilder::from(left)
        .join_on(right, datafusion_expr::JoinType::Inner, join_conditions)?
        .filter(left_status.eq(right_status))?
        .build()?;

    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(test.user, Utf8("status")) = leaf_udf(right.user, Utf8("status"))
      Inner Join:  Filter: test.id = right.id
        TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = __datafusion_extracted_2
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, test.id, test.user, right.id, right.user
          Inner Join:  Filter: test.id = right.id
            TableScan: test projection=[id, user]
            TableScan: right projection=[id, user]

    ## After Pushdown
    Projection: test.id, test.user, right.id, right.user
      Filter: __datafusion_extracted_1 = __datafusion_extracted_2
        Inner Join:  Filter: test.id = right.id
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
            TableScan: test projection=[id, user]
          Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_2, right.id, right.user
            TableScan: right projection=[id, user]

    ## Optimized
    (same as after pushdown)
    "#)
}
2524
/// A top projection applies a leaf expression to `x`, where `x` is an alias
/// for `test.user` introduced by a projection below an intervening filter.
///
/// The snapshot expects the extraction stage to leave the plan unchanged, and
/// the pushdown stage to move the expression below the filter into the
/// renaming projection, rewritten in terms of `test.user`. The final optimized
/// plan no longer carries the pass-through `test.user` column.
#[test]
fn test_extract_through_filter_with_column_rename() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .project(vec![col("user").alias("x")])?
        .filter(col("x").is_not_null())?
        .project(vec![leaf_udf(col("x"), "a")])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(x, Utf8("a"))
      Filter: x IS NOT NULL
        Projection: test.user AS x
          TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
      Filter: x IS NOT NULL
        Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(x,Utf8("a"))
      Filter: x IS NOT NULL
        Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
          TableScan: test projection=[user]
    "#)
}
2562
/// Same shape as the column-rename case, but the leaf expression is only a
/// sub-expression of the projected expression (`leaf_udf(x, "a") IS NOT NULL`).
///
/// The snapshot expects only the `leaf_udf(...)` part to be pushed below the
/// filter into the renaming projection; the `IS NOT NULL` wrapper stays in the
/// top projection and is applied to the extracted alias.
#[test]
fn test_extract_partial_through_filter_with_column_rename() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .project(vec![col("user").alias("x")])?
        .filter(col("x").is_not_null())?
        .project(vec![leaf_udf(col("x"), "a").is_not_null()])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(x, Utf8("a")) IS NOT NULL
      Filter: x IS NOT NULL
        Projection: test.user AS x
          TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
      Filter: x IS NOT NULL
        Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_1 IS NOT NULL AS leaf_udf(x,Utf8("a")) IS NOT NULL
      Filter: x IS NOT NULL
        Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
          TableScan: test projection=[user]
    "#)
}
2596
/// A filter predicate applies a leaf expression to `x`, an alias for
/// `test.user` produced by the projection directly below the filter.
///
/// The snapshot expects the extraction stage to place the extracted expression
/// in that renaming projection (expressed over `test.user`), rewrite the filter
/// to use the extracted alias, and add a `Projection: x` on top so the output
/// columns are unchanged. Pushdown makes no further change.
#[test]
fn test_extract_from_filter_above_renaming_projection() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .project(vec![col("user").alias("x")])?
        .filter(leaf_udf(col("x"), "a").eq(lit("active")))?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Filter: leaf_udf(x, Utf8("a")) = Utf8("active")
      Projection: test.user AS x
        TableScan: test projection=[user]

    ## After Extraction
    Projection: x
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    Projection: x
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: test.user AS x, leaf_udf(test.user, Utf8("a")) AS __datafusion_extracted_1
          TableScan: test projection=[user]
    "#)
}
2628
/// A projection applies a leaf expression to `sub.user`, where `sub` is a
/// `SubqueryAlias` over the `test` table scan.
///
/// The snapshot expects the pushdown stage to move the expression below the
/// alias, with the column reference rewritten from `sub.user` to `test.user`,
/// while the top projection keeps the original output name.
#[test]
fn test_extract_through_subquery_alias() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .alias("sub")?
        .project(vec![leaf_udf(col("sub.user"), "name")])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(sub.user, Utf8("name"))
      SubqueryAlias: sub
        TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
      SubqueryAlias: sub
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
          TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(sub.user,Utf8("name"))
      SubqueryAlias: sub
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
          TableScan: test projection=[user]
    "#)
}
2664
/// Both a filter and a projection above a `SubqueryAlias` apply leaf
/// expressions to `sub.user`.
///
/// The snapshot expects the filter's expression to be extracted first (above
/// the alias), and after pushdown both extracted expressions to sit in a
/// single projection below the alias, expressed over `test.user`.
#[test]
fn test_extract_through_subquery_alias_with_filter() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .alias("sub")?
        .filter(leaf_udf(col("sub.user"), "status").eq(lit("active")))?
        .project(vec![leaf_udf(col("sub.user"), "name")])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(sub.user, Utf8("name"))
      Filter: leaf_udf(sub.user, Utf8("status")) = Utf8("active")
        SubqueryAlias: sub
          TableScan: test projection=[user]

    ## After Extraction
    Projection: leaf_udf(sub.user, Utf8("name"))
      Projection: sub.user
        Filter: __datafusion_extracted_1 = Utf8("active")
          Projection: leaf_udf(sub.user, Utf8("status")) AS __datafusion_extracted_1, sub.user
            SubqueryAlias: sub
              TableScan: test projection=[user]

    ## After Pushdown
    Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
      Filter: __datafusion_extracted_1 = Utf8("active")
        SubqueryAlias: sub
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.user
            TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_2 AS leaf_udf(sub.user,Utf8("name"))
      Filter: __datafusion_extracted_1 = Utf8("active")
        SubqueryAlias: sub
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
            TableScan: test projection=[user]
    "#)
}
2705
/// A projection applies a leaf expression to `outer_sub.user`, where the table
/// scan is wrapped in two nested `SubqueryAlias` nodes.
///
/// The snapshot expects the pushdown stage to move the expression below both
/// aliases, with the column reference rewritten to `test.user`.
#[test]
fn test_extract_through_nested_subquery_alias() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .alias("inner_sub")?
        .alias("outer_sub")?
        .project(vec![leaf_udf(col("outer_sub.user"), "name")])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: leaf_udf(outer_sub.user, Utf8("name"))
      SubqueryAlias: outer_sub
        SubqueryAlias: inner_sub
          TableScan: test projection=[user]

    ## After Extraction
    (same as original)

    ## After Pushdown
    Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
      SubqueryAlias: outer_sub
        SubqueryAlias: inner_sub
          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1, test.user
            TableScan: test projection=[user]

    ## Optimized
    Projection: __datafusion_extracted_1 AS leaf_udf(outer_sub.user,Utf8("name"))
      SubqueryAlias: outer_sub
        SubqueryAlias: inner_sub
          Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_1
            TableScan: test projection=[user]
    "#)
}
2741
/// A projection of plain columns over a `SubqueryAlias` contains no leaf
/// expressions to extract.
///
/// The snapshot expects every stage to report the same plan as the one before
/// it.
#[test]
fn test_subquery_alias_no_extraction() -> Result<()> {
    let table_scan = test_table_scan()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .alias("sub")?
        .project(vec![col("sub.a"), col("sub.b")])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @"
    ## Original Plan
    SubqueryAlias: sub
      TableScan: test projection=[a, b]

    ## After Extraction
    (same as original)

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    (same as after pushdown)
    ")
}
2766
/// Two `PlacementTestUDF` instances that differ only in their id produce
/// expressions with identical `schema_name()` output but which compare
/// unequal.
///
/// The snapshot expects each expression to receive its own extracted alias
/// (`__datafusion_extracted_1` and `__datafusion_extracted_2`) rather than
/// being merged into one because their display names collide.
#[test]
fn test_different_udfs_same_schema_name_not_deduplicated() -> Result<()> {
    let udf_a = Arc::new(ScalarUDF::new_from_impl(
        PlacementTestUDF::new()
            .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
            .with_id(1),
    ));
    let udf_b = Arc::new(ScalarUDF::new_from_impl(
        PlacementTestUDF::new()
            .with_placement(ExpressionPlacement::MoveTowardsLeafNodes)
            .with_id(2),
    ));

    // Identical arguments, so only the UDF instance distinguishes the two.
    let expr_a = Expr::ScalarFunction(ScalarFunction::new_udf(
        udf_a,
        vec![col("user"), lit("field")],
    ));
    let expr_b = Expr::ScalarFunction(ScalarFunction::new_udf(
        udf_b,
        vec![col("user"), lit("field")],
    ));

    // Preconditions for the scenario: same rendered name, different identity.
    assert_eq!(
        expr_a.schema_name().to_string(),
        expr_b.schema_name().to_string(),
        "Both expressions should have the same schema_name"
    );
    assert_ne!(
        expr_a, expr_b,
        "Expressions should NOT be equal (different UDF instances)"
    );

    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan.clone())
        .filter(expr_a.clone().eq(lit("a")).and(expr_b.clone().eq(lit("b"))))?
        // Select only `id` so neither UDF expression is part of the output.
        .select(vec![
            table_scan
                .schema()
                .index_of_column_by_name(None, "id")
                .unwrap(),
        ])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: test.id
      Filter: leaf_udf(test.user, Utf8("field")) = Utf8("a") AND leaf_udf(test.user, Utf8("field")) = Utf8("b")
        TableScan: test projection=[id, user]

    ## After Extraction
    Projection: test.id
      Projection: test.id, test.user
        Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
          Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id, test.user
            TableScan: test projection=[id, user]

    ## After Pushdown
    (same as after extraction)

    ## Optimized
    Projection: test.id
      Filter: __datafusion_extracted_1 = Utf8("a") AND __datafusion_extracted_2 = Utf8("b")
        Projection: leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_1, leaf_udf(test.user, Utf8("field")) AS __datafusion_extracted_2, test.id
          TableScan: test projection=[id, user]
    "#)
}
2837
/// A projection with a leaf expression sits above a filter whose predicate
/// also contains a (different) leaf expression over the same column.
///
/// The snapshot expects the filter's expression to be extracted first, and the
/// projection's expression to then be pushed through the rewritten filter and
/// merged into the same extraction projection as a second alias.
#[test]
fn test_extraction_pushdown_through_filter_with_extracted_predicate() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .filter(leaf_udf(col("user"), "status").eq(lit("active")))?
        .project(vec![col("id"), leaf_udf(col("user"), "name")])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: test.id, leaf_udf(test.user, Utf8("name"))
      Filter: leaf_udf(test.user, Utf8("status")) = Utf8("active")
        TableScan: test projection=[id, user]

    ## After Extraction
    Projection: test.id, leaf_udf(test.user, Utf8("name"))
      Projection: test.id, test.user
        Filter: __datafusion_extracted_1 = Utf8("active")
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
            TableScan: test projection=[id, user]

    ## After Pushdown
    Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
          TableScan: test projection=[id, user]

    ## Optimized
    Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name"))
      Filter: __datafusion_extracted_1 = Utf8("active")
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2
          TableScan: test projection=[id, user]
    "#)
}
2878
/// The same leaf expression appears both in a filter predicate and in the
/// projection above it.
///
/// The snapshot expects the two occurrences to receive separate aliases
/// (`__datafusion_extracted_1` for the filter, `__datafusion_extracted_2` for
/// the projection), both computed in the single projection below the filter.
#[test]
fn test_extraction_pushdown_same_expr_in_filter_and_projection() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    // Shared between the filter predicate and the projection list.
    let field_expr = leaf_udf(col("user"), "status");
    let plan = LogicalPlanBuilder::from(table_scan)
        .filter(field_expr.clone().gt(lit(5)))?
        .project(vec![col("id"), field_expr])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: test.id, leaf_udf(test.user, Utf8("status"))
      Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
        TableScan: test projection=[id, user]

    ## After Extraction
    Projection: test.id, leaf_udf(test.user, Utf8("status"))
      Projection: test.id, test.user
        Filter: __datafusion_extracted_1 > Int32(5)
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
            TableScan: test projection=[id, user]

    ## After Pushdown
    Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
      Filter: __datafusion_extracted_1 > Int32(5)
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
          TableScan: test projection=[id, user]

    ## Optimized
    Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("status"))
      Filter: __datafusion_extracted_1 > Int32(5)
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_2
          TableScan: test projection=[id, user]
    "#)
}
2915
/// A left join whose ON clause contains a leaf expression over the right
/// input, topped by a projection with leaf expressions over both inputs.
///
/// The snapshot expects the join filter's expression to be extracted into a
/// projection over the right input, and the projection's expressions to then
/// be pushed to the join input that owns each referenced column: the
/// `test.user` expression to the left side and the `right.user` expression to
/// the right side, alongside the alias already extracted for the join filter.
#[test]
fn test_left_join_with_filter_and_projection_extraction() -> Result<()> {
    use datafusion_expr::JoinType;

    let left = test_table_scan_with_struct()?;
    let right = test_table_scan_with_struct_named("right")?;

    let plan = LogicalPlanBuilder::from(left)
        .join_on(
            right,
            JoinType::Left,
            vec![
                col("test.id").eq(col("right.id")),
                // Leaf expression inside the join condition (right side only).
                leaf_udf(col("right.user"), "status").gt(lit(5)),
            ],
        )?
        .project(vec![
            col("test.id"),
            leaf_udf(col("test.user"), "name"),
            leaf_udf(col("right.user"), "status"),
        ])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level; the
    // two join inputs are siblings and share the same indentation.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
      Left Join: Filter: test.id = right.id AND leaf_udf(right.user, Utf8("status")) > Int32(5)
        TableScan: test projection=[id, user]
        TableScan: right projection=[id, user]

    ## After Extraction
    Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(right.user, Utf8("status"))
      Projection: test.id, test.user, right.id, right.user
        Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
          TableScan: test projection=[id, user]
          Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user
            TableScan: right projection=[id, user]

    ## After Pushdown
    Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
      Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id, test.user
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, right.user, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
          TableScan: right projection=[id, user]

    ## Optimized
    Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(right.user,Utf8("status"))
      Left Join: Filter: test.id = right.id AND __datafusion_extracted_1 > Int32(5)
        Projection: leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, test.id
          TableScan: test projection=[id, user]
        Projection: leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_1, right.id, leaf_udf(right.user, Utf8("status")) AS __datafusion_extracted_3
          TableScan: right projection=[id, user]
    "#)
}
2973
/// A projection with two leaf expressions sits above a filter whose predicate
/// uses one of those same expressions.
///
/// The snapshot expects the filter's expression to be extracted first, and
/// both projection expressions to then be pushed through the filter into the
/// same extraction projection, each under its own alias.
#[test]
fn test_pure_extraction_proj_push_through_filter() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;
    let plan = LogicalPlanBuilder::from(table_scan)
        .filter(leaf_udf(col("user"), "status").gt(lit(5)))?
        .project(vec![
            col("id"),
            leaf_udf(col("user"), "name"),
            leaf_udf(col("user"), "status"),
        ])?
        .build()?;

    // Plan trees are rendered with two spaces of indentation per level.
    assert_stages!(plan, @r#"
    ## Original Plan
    Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
      Filter: leaf_udf(test.user, Utf8("status")) > Int32(5)
        TableScan: test projection=[id, user]

    ## After Extraction
    Projection: test.id, leaf_udf(test.user, Utf8("name")), leaf_udf(test.user, Utf8("status"))
      Projection: test.id, test.user
        Filter: __datafusion_extracted_1 > Int32(5)
          Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user
            TableScan: test projection=[id, user]

    ## After Pushdown
    Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
      Filter: __datafusion_extracted_1 > Int32(5)
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, test.user, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
          TableScan: test projection=[id, user]

    ## Optimized
    Projection: test.id, __datafusion_extracted_2 AS leaf_udf(test.user,Utf8("name")), __datafusion_extracted_3 AS leaf_udf(test.user,Utf8("status"))
      Filter: __datafusion_extracted_1 > Int32(5)
        Projection: leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1, test.id, leaf_udf(test.user, Utf8("name")) AS __datafusion_extracted_2, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_3
          TableScan: test projection=[id, user]
    "#)
}
3014
/// Runs only `PushDownLeafProjections` (single pass) on a hand-built plan in
/// which an extraction-style projection sits directly on top of another
/// projection.
///
/// The outer projection outputs `[__datafusion_extracted_1, id]`. The snapshot
/// expects the extracted expression to be merged into the inner projection
/// (which keeps `test.user` and `test.id`), with a projection on top that
/// still outputs exactly `[__datafusion_extracted_1, test.id]`, so merging
/// does not widen the plan's output columns.
#[test]
fn test_merge_extraction_into_projection_with_column_ref_inflation() -> Result<()> {
    let table_scan = test_table_scan_with_struct()?;

    // Inner projection: plain columns, listed as [user, id].
    let inner = LogicalPlanBuilder::from(table_scan)
        .project(vec![col("user"), col("id")])?
        .build()?;

    // Outer projection: built by hand to look like an extraction projection,
    // i.e. a leaf expression aliased with the extracted-expression prefix plus
    // a pass-through column.
    let plan = LogicalPlanBuilder::from(inner)
        .project(vec![
            leaf_udf(col("user"), "status")
                .alias(format!("{EXTRACTED_EXPR_PREFIX}_1")),
            col("id"),
        ])?
        .build()?;

    let ctx = OptimizerContext::new().with_max_passes(1);
    // Only the pushdown rule is registered, so no other rule alters the plan.
    let optimizer =
        Optimizer::with_rules(vec![Arc::new(PushDownLeafProjections::new())]);
    let result = optimizer.optimize(plan, &ctx, |_, _| {})?;

    // The plan's `Display` output nests each child two spaces deeper than its
    // parent.
    insta::assert_snapshot!(format!("{result}"), @r#"
    Projection: __datafusion_extracted_1, test.id
      Projection: test.user, test.id, leaf_udf(test.user, Utf8("status")) AS __datafusion_extracted_1
        TableScan: test
    "#);

    Ok(())
}
3053}