datafusion_optimizer/
propagate_empty_relation.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PropagateEmptyRelation`] eliminates nodes fed by `EmptyRelation`
19
20use std::sync::Arc;
21
22use datafusion_common::tree_node::Transformed;
23use datafusion_common::JoinType;
24use datafusion_common::{plan_err, Result};
25use datafusion_expr::logical_plan::LogicalPlan;
26use datafusion_expr::{EmptyRelation, Projection, Union};
27
28use crate::optimizer::ApplyOrder;
29use crate::{OptimizerConfig, OptimizerRule};
30
31/// Optimization rule that bottom-up to eliminate plan by propagating empty_relation.
32#[derive(Default, Debug)]
33pub struct PropagateEmptyRelation;
34
35impl PropagateEmptyRelation {
36    #[allow(missing_docs)]
37    pub fn new() -> Self {
38        Self {}
39    }
40}
41
42impl OptimizerRule for PropagateEmptyRelation {
43    fn name(&self) -> &str {
44        "propagate_empty_relation"
45    }
46
47    fn apply_order(&self) -> Option<ApplyOrder> {
48        Some(ApplyOrder::BottomUp)
49    }
50
51    fn supports_rewrite(&self) -> bool {
52        true
53    }
54
55    fn rewrite(
56        &self,
57        plan: LogicalPlan,
58        _config: &dyn OptimizerConfig,
59    ) -> Result<Transformed<LogicalPlan>> {
60        match plan {
61            LogicalPlan::EmptyRelation(_) => Ok(Transformed::no(plan)),
62            LogicalPlan::Projection(_)
63            | LogicalPlan::Filter(_)
64            | LogicalPlan::Window(_)
65            | LogicalPlan::Sort(_)
66            | LogicalPlan::SubqueryAlias(_)
67            | LogicalPlan::Repartition(_)
68            | LogicalPlan::Limit(_) => {
69                let empty = empty_child(&plan)?;
70                if let Some(empty_plan) = empty {
71                    return Ok(Transformed::yes(empty_plan));
72                }
73                Ok(Transformed::no(plan))
74            }
75            LogicalPlan::Join(ref join) => {
76                // TODO: For Join, more join type need to be careful:
77                // For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
78                // columns + right side columns replaced with null values.
79                // For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
80                // columns + left side columns replaced with null values.
81                let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
82
83                match join.join_type {
84                    // For Full Join, only both sides are empty, the Join result is empty.
85                    JoinType::Full if left_empty && right_empty => Ok(Transformed::yes(
86                        LogicalPlan::EmptyRelation(EmptyRelation {
87                            produce_one_row: false,
88                            schema: Arc::clone(&join.schema),
89                        }),
90                    )),
91                    JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
92                        LogicalPlan::EmptyRelation(EmptyRelation {
93                            produce_one_row: false,
94                            schema: Arc::clone(&join.schema),
95                        }),
96                    )),
97                    JoinType::Left if left_empty => Ok(Transformed::yes(
98                        LogicalPlan::EmptyRelation(EmptyRelation {
99                            produce_one_row: false,
100                            schema: Arc::clone(&join.schema),
101                        }),
102                    )),
103                    JoinType::Right if right_empty => Ok(Transformed::yes(
104                        LogicalPlan::EmptyRelation(EmptyRelation {
105                            produce_one_row: false,
106                            schema: Arc::clone(&join.schema),
107                        }),
108                    )),
109                    JoinType::LeftSemi if left_empty || right_empty => Ok(
110                        Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
111                            produce_one_row: false,
112                            schema: Arc::clone(&join.schema),
113                        })),
114                    ),
115                    JoinType::RightSemi if left_empty || right_empty => Ok(
116                        Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
117                            produce_one_row: false,
118                            schema: Arc::clone(&join.schema),
119                        })),
120                    ),
121                    JoinType::LeftAnti if left_empty => Ok(Transformed::yes(
122                        LogicalPlan::EmptyRelation(EmptyRelation {
123                            produce_one_row: false,
124                            schema: Arc::clone(&join.schema),
125                        }),
126                    )),
127                    JoinType::LeftAnti if right_empty => {
128                        Ok(Transformed::yes((*join.left).clone()))
129                    }
130                    JoinType::RightAnti if left_empty => {
131                        Ok(Transformed::yes((*join.right).clone()))
132                    }
133                    JoinType::RightAnti if right_empty => Ok(Transformed::yes(
134                        LogicalPlan::EmptyRelation(EmptyRelation {
135                            produce_one_row: false,
136                            schema: Arc::clone(&join.schema),
137                        }),
138                    )),
139                    _ => Ok(Transformed::no(plan)),
140                }
141            }
142            LogicalPlan::Aggregate(ref agg) => {
143                if !agg.group_expr.is_empty() {
144                    if let Some(empty_plan) = empty_child(&plan)? {
145                        return Ok(Transformed::yes(empty_plan));
146                    }
147                }
148                Ok(Transformed::no(LogicalPlan::Aggregate(agg.clone())))
149            }
150            LogicalPlan::Union(ref union) => {
151                let new_inputs = union
152                    .inputs
153                    .iter()
154                    .filter(|input| match &***input {
155                        LogicalPlan::EmptyRelation(empty) => empty.produce_one_row,
156                        _ => true,
157                    })
158                    .cloned()
159                    .collect::<Vec<_>>();
160
161                if new_inputs.len() == union.inputs.len() {
162                    Ok(Transformed::no(plan))
163                } else if new_inputs.is_empty() {
164                    Ok(Transformed::yes(LogicalPlan::EmptyRelation(
165                        EmptyRelation {
166                            produce_one_row: false,
167                            schema: Arc::clone(plan.schema()),
168                        },
169                    )))
170                } else if new_inputs.len() == 1 {
171                    let mut new_inputs = new_inputs;
172                    let input_plan = new_inputs.pop().unwrap(); // length checked
173                    let child = Arc::unwrap_or_clone(input_plan);
174                    if child.schema().eq(plan.schema()) {
175                        Ok(Transformed::yes(child))
176                    } else {
177                        Ok(Transformed::yes(LogicalPlan::Projection(
178                            Projection::new_from_schema(
179                                Arc::new(child),
180                                Arc::clone(plan.schema()),
181                            ),
182                        )))
183                    }
184                } else {
185                    Ok(Transformed::yes(LogicalPlan::Union(Union {
186                        inputs: new_inputs,
187                        schema: Arc::clone(&union.schema),
188                    })))
189                }
190            }
191
192            _ => Ok(Transformed::no(plan)),
193        }
194    }
195}
196
197fn binary_plan_children_is_empty(plan: &LogicalPlan) -> Result<(bool, bool)> {
198    match plan.inputs()[..] {
199        [left, right] => {
200            let left_empty = match left {
201                LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row,
202                _ => false,
203            };
204            let right_empty = match right {
205                LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row,
206                _ => false,
207            };
208            Ok((left_empty, right_empty))
209        }
210        _ => plan_err!("plan just can have two child"),
211    }
212}
213
214fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
215    match plan.inputs()[..] {
216        [child] => match child {
217            LogicalPlan::EmptyRelation(empty) => {
218                if !empty.produce_one_row {
219                    Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
220                        produce_one_row: false,
221                        schema: Arc::clone(plan.schema()),
222                    })))
223                } else {
224                    Ok(None)
225                }
226            }
227            _ => Ok(None),
228        },
229        _ => plan_err!("plan just can have one child"),
230    }
231}
232
233#[cfg(test)]
234mod tests {
235    use std::sync::Arc;
236
237    use arrow::datatypes::{DataType, Field, Schema};
238
239    use datafusion_common::{Column, DFSchema, JoinType};
240    use datafusion_expr::logical_plan::table_scan;
241    use datafusion_expr::{
242        binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Operator,
243    };
244
245    use crate::assert_optimized_plan_eq_snapshot;
246    use crate::eliminate_filter::EliminateFilter;
247    use crate::eliminate_nested_union::EliminateNestedUnion;
248    use crate::test::{
249        assert_optimized_plan_with_rules, test_table_scan, test_table_scan_fields,
250        test_table_scan_with_name,
251    };
252    use crate::OptimizerContext;
253
254    use super::*;
255
256    macro_rules! assert_optimized_plan_equal {
257        (
258            $plan:expr,
259            @ $expected:literal $(,)?
260        ) => {{
261            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
262            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(PropagateEmptyRelation::new())];
263            assert_optimized_plan_eq_snapshot!(
264                optimizer_ctx,
265                rules,
266                $plan,
267                @ $expected,
268            )
269        }};
270    }
271
272    fn assert_together_optimized_plan(
273        plan: LogicalPlan,
274        expected: &str,
275        eq: bool,
276    ) -> Result<()> {
277        assert_optimized_plan_with_rules(
278            vec![
279                Arc::new(EliminateFilter::new()),
280                Arc::new(EliminateNestedUnion::new()),
281                Arc::new(PropagateEmptyRelation::new()),
282            ],
283            plan,
284            expected,
285            eq,
286        )
287    }
288
289    #[test]
290    fn propagate_empty() -> Result<()> {
291        let plan = LogicalPlanBuilder::empty(false)
292            .filter(lit(true))?
293            .limit(10, None)?
294            .project(vec![binary_expr(lit(1), Operator::Plus, lit(1))])?
295            .build()?;
296
297        assert_optimized_plan_equal!(plan, @"EmptyRelation")
298    }
299
300    #[test]
301    fn cooperate_with_eliminate_filter() -> Result<()> {
302        let table_scan = test_table_scan()?;
303        let left = LogicalPlanBuilder::from(table_scan).build()?;
304        let right_table_scan = test_table_scan_with_name("test2")?;
305        let right = LogicalPlanBuilder::from(right_table_scan)
306            .project(vec![col("a")])?
307            .filter(lit(false))?
308            .build()?;
309
310        let plan = LogicalPlanBuilder::from(left)
311            .join_using(
312                right,
313                JoinType::Inner,
314                vec![Column::from_name("a".to_string())],
315            )?
316            .filter(col("a").lt_eq(lit(1i64)))?
317            .build()?;
318
319        let expected = "EmptyRelation";
320        assert_together_optimized_plan(plan, expected, true)
321    }
322
323    #[test]
324    fn propagate_union_empty() -> Result<()> {
325        let left = LogicalPlanBuilder::from(test_table_scan()?).build()?;
326        let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
327            .filter(lit(false))?
328            .build()?;
329
330        let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
331
332        let expected = "Projection: a, b, c\n  TableScan: test";
333        assert_together_optimized_plan(plan, expected, true)
334    }
335
336    #[test]
337    fn propagate_union_multi_empty() -> Result<()> {
338        let one =
339            LogicalPlanBuilder::from(test_table_scan_with_name("test1")?).build()?;
340        let two = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
341            .filter(lit(false))?
342            .build()?;
343        let three = LogicalPlanBuilder::from(test_table_scan_with_name("test3")?)
344            .filter(lit(false))?
345            .build()?;
346        let four =
347            LogicalPlanBuilder::from(test_table_scan_with_name("test4")?).build()?;
348
349        let plan = LogicalPlanBuilder::from(one)
350            .union(two)?
351            .union(three)?
352            .union(four)?
353            .build()?;
354
355        let expected = "Union\
356            \n  TableScan: test1\
357            \n  TableScan: test4";
358        assert_together_optimized_plan(plan, expected, true)
359    }
360
361    #[test]
362    fn propagate_union_all_empty() -> Result<()> {
363        let one = LogicalPlanBuilder::from(test_table_scan_with_name("test1")?)
364            .filter(lit(false))?
365            .build()?;
366        let two = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
367            .filter(lit(false))?
368            .build()?;
369        let three = LogicalPlanBuilder::from(test_table_scan_with_name("test3")?)
370            .filter(lit(false))?
371            .build()?;
372        let four = LogicalPlanBuilder::from(test_table_scan_with_name("test4")?)
373            .filter(lit(false))?
374            .build()?;
375
376        let plan = LogicalPlanBuilder::from(one)
377            .union(two)?
378            .union(three)?
379            .union(four)?
380            .build()?;
381
382        let expected = "EmptyRelation";
383        assert_together_optimized_plan(plan, expected, true)
384    }
385
386    #[test]
387    fn propagate_union_children_different_schema() -> Result<()> {
388        let one_schema = Schema::new(vec![Field::new("t1a", DataType::UInt32, false)]);
389        let t1_scan = table_scan(Some("test1"), &one_schema, None)?.build()?;
390        let one = LogicalPlanBuilder::from(t1_scan)
391            .filter(lit(false))?
392            .build()?;
393
394        let two_schema = Schema::new(vec![Field::new("t2a", DataType::UInt32, false)]);
395        let t2_scan = table_scan(Some("test2"), &two_schema, None)?.build()?;
396        let two = LogicalPlanBuilder::from(t2_scan).build()?;
397
398        let three_schema = Schema::new(vec![Field::new("t3a", DataType::UInt32, false)]);
399        let t3_scan = table_scan(Some("test3"), &three_schema, None)?.build()?;
400        let three = LogicalPlanBuilder::from(t3_scan).build()?;
401
402        let plan = LogicalPlanBuilder::from(one)
403            .union(two)?
404            .union(three)?
405            .build()?;
406
407        let expected = "Union\
408            \n  TableScan: test2\
409            \n  TableScan: test3";
410        assert_together_optimized_plan(plan, expected, true)
411    }
412
413    #[test]
414    fn propagate_union_alias() -> Result<()> {
415        let left = LogicalPlanBuilder::from(test_table_scan()?).build()?;
416        let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
417            .filter(lit(false))?
418            .build()?;
419
420        let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
421
422        let expected = "Projection: a, b, c\n  TableScan: test";
423        assert_together_optimized_plan(plan, expected, true)
424    }
425
426    #[test]
427    fn cross_join_empty() -> Result<()> {
428        let table_scan = test_table_scan()?;
429        let left = LogicalPlanBuilder::from(table_scan).build()?;
430        let right = LogicalPlanBuilder::empty(false).build()?;
431
432        let plan = LogicalPlanBuilder::from(left)
433            .cross_join(right)?
434            .filter(col("a").lt_eq(lit(1i64)))?
435            .build()?;
436
437        let expected = "EmptyRelation";
438        assert_together_optimized_plan(plan, expected, true)
439    }
440
441    fn assert_empty_left_empty_right_lp(
442        left_empty: bool,
443        right_empty: bool,
444        join_type: JoinType,
445        eq: bool,
446    ) -> Result<()> {
447        let left_lp = if left_empty {
448            let left_table_scan = test_table_scan()?;
449
450            LogicalPlanBuilder::from(left_table_scan)
451                .filter(lit(false))?
452                .build()
453        } else {
454            let scan = test_table_scan_with_name("left").unwrap();
455            LogicalPlanBuilder::from(scan).build()
456        }?;
457
458        let right_lp = if right_empty {
459            let right_table_scan = test_table_scan_with_name("right")?;
460
461            LogicalPlanBuilder::from(right_table_scan)
462                .filter(lit(false))?
463                .build()
464        } else {
465            let scan = test_table_scan_with_name("right").unwrap();
466            LogicalPlanBuilder::from(scan).build()
467        }?;
468
469        let plan = LogicalPlanBuilder::from(left_lp)
470            .join_using(
471                right_lp,
472                join_type,
473                vec![Column::from_name("a".to_string())],
474            )?
475            .build()?;
476
477        let expected = "EmptyRelation";
478        assert_together_optimized_plan(plan, expected, eq)
479    }
480
481    // TODO: fix this long name
482    fn assert_anti_join_empty_join_table_is_base_table(
483        anti_left_join: bool,
484    ) -> Result<()> {
485        // if we have an anti join with an empty join table, then the result is the base_table
486        let (left, right, join_type, expected) = if anti_left_join {
487            let left = test_table_scan()?;
488            let right = LogicalPlanBuilder::from(test_table_scan()?)
489                .filter(lit(false))?
490                .build()?;
491            let expected = left.display_indent().to_string();
492            (left, right, JoinType::LeftAnti, expected)
493        } else {
494            let right = test_table_scan()?;
495            let left = LogicalPlanBuilder::from(test_table_scan()?)
496                .filter(lit(false))?
497                .build()?;
498            let expected = right.display_indent().to_string();
499            (left, right, JoinType::RightAnti, expected)
500        };
501
502        let plan = LogicalPlanBuilder::from(left)
503            .join_using(right, join_type, vec![Column::from_name("a".to_string())])?
504            .build()?;
505
506        assert_together_optimized_plan(plan, &expected, true)
507    }
508
509    #[test]
510    fn test_join_empty_propagation_rules() -> Result<()> {
511        // test full join with empty left and empty right
512        assert_empty_left_empty_right_lp(true, true, JoinType::Full, true)?;
513
514        // test left join with empty left
515        assert_empty_left_empty_right_lp(true, false, JoinType::Left, true)?;
516
517        // test right join with empty right
518        assert_empty_left_empty_right_lp(false, true, JoinType::Right, true)?;
519
520        // test left semi join with empty left
521        assert_empty_left_empty_right_lp(true, false, JoinType::LeftSemi, true)?;
522
523        // test left semi join with empty right
524        assert_empty_left_empty_right_lp(false, true, JoinType::LeftSemi, true)?;
525
526        // test right semi join with empty left
527        assert_empty_left_empty_right_lp(true, false, JoinType::RightSemi, true)?;
528
529        // test right semi join with empty right
530        assert_empty_left_empty_right_lp(false, true, JoinType::RightSemi, true)?;
531
532        // test left anti join empty left
533        assert_empty_left_empty_right_lp(true, false, JoinType::LeftAnti, true)?;
534
535        // test right anti join empty right
536        assert_empty_left_empty_right_lp(false, true, JoinType::RightAnti, true)?;
537
538        // test left anti join empty right
539        assert_anti_join_empty_join_table_is_base_table(true)?;
540
541        // test right anti join empty left
542        assert_anti_join_empty_join_table_is_base_table(false)
543    }
544
545    #[test]
546    fn test_join_empty_propagation_rules_noop() -> Result<()> {
547        // these cases should not result in an empty relation
548
549        // test left join with empty right
550        assert_empty_left_empty_right_lp(false, true, JoinType::Left, false)?;
551
552        // test right join with empty left
553        assert_empty_left_empty_right_lp(true, false, JoinType::Right, false)?;
554
555        // test left semi with non-empty left and right
556        assert_empty_left_empty_right_lp(false, false, JoinType::LeftSemi, false)?;
557
558        // test right semi with non-empty left and right
559        assert_empty_left_empty_right_lp(false, false, JoinType::RightSemi, false)?;
560
561        // test left anti join with non-empty left and right
562        assert_empty_left_empty_right_lp(false, false, JoinType::LeftAnti, false)?;
563
564        // test left anti with non-empty left and empty right
565        assert_empty_left_empty_right_lp(false, true, JoinType::LeftAnti, false)?;
566
567        // test right anti join with non-empty left and right
568        assert_empty_left_empty_right_lp(false, false, JoinType::RightAnti, false)?;
569
570        // test right anti with empty left and non-empty right
571        assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false)
572    }
573
574    #[test]
575    fn test_empty_with_non_empty() -> Result<()> {
576        let table_scan = test_table_scan()?;
577
578        let fields = test_table_scan_fields();
579
580        let empty = LogicalPlan::EmptyRelation(EmptyRelation {
581            produce_one_row: false,
582            schema: Arc::new(DFSchema::from_unqualified_fields(
583                fields.into(),
584                Default::default(),
585            )?),
586        });
587
588        let one = LogicalPlanBuilder::from(empty.clone()).build()?;
589        let two = LogicalPlanBuilder::from(table_scan).build()?;
590        let three = LogicalPlanBuilder::from(empty).build()?;
591
592        // Union
593        //  EmptyRelation
594        //  TableScan: test
595        //  EmptyRelation
596        let plan = LogicalPlanBuilder::from(one)
597            .union(two)?
598            .union(three)?
599            .build()?;
600
601        let expected = "Projection: a, b, c\
602        \n  TableScan: test";
603
604        assert_together_optimized_plan(plan, expected, true)
605    }
606}