1use std::sync::Arc;
21
22use datafusion_common::tree_node::Transformed;
23use datafusion_common::JoinType;
24use datafusion_common::{plan_err, Result};
25use datafusion_expr::logical_plan::LogicalPlan;
26use datafusion_expr::{EmptyRelation, Projection, Union};
27
28use crate::optimizer::ApplyOrder;
29use crate::{OptimizerConfig, OptimizerRule};
30
31#[derive(Default, Debug)]
33pub struct PropagateEmptyRelation;
34
35impl PropagateEmptyRelation {
36 #[allow(missing_docs)]
37 pub fn new() -> Self {
38 Self {}
39 }
40}
41
42impl OptimizerRule for PropagateEmptyRelation {
43 fn name(&self) -> &str {
44 "propagate_empty_relation"
45 }
46
47 fn apply_order(&self) -> Option<ApplyOrder> {
48 Some(ApplyOrder::BottomUp)
49 }
50
51 fn supports_rewrite(&self) -> bool {
52 true
53 }
54
55 fn rewrite(
56 &self,
57 plan: LogicalPlan,
58 _config: &dyn OptimizerConfig,
59 ) -> Result<Transformed<LogicalPlan>> {
60 match plan {
61 LogicalPlan::EmptyRelation(_) => Ok(Transformed::no(plan)),
62 LogicalPlan::Projection(_)
63 | LogicalPlan::Filter(_)
64 | LogicalPlan::Window(_)
65 | LogicalPlan::Sort(_)
66 | LogicalPlan::SubqueryAlias(_)
67 | LogicalPlan::Repartition(_)
68 | LogicalPlan::Limit(_) => {
69 let empty = empty_child(&plan)?;
70 if let Some(empty_plan) = empty {
71 return Ok(Transformed::yes(empty_plan));
72 }
73 Ok(Transformed::no(plan))
74 }
75 LogicalPlan::Join(ref join) => {
76 let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
82
83 match join.join_type {
84 JoinType::Full if left_empty && right_empty => Ok(Transformed::yes(
86 LogicalPlan::EmptyRelation(EmptyRelation {
87 produce_one_row: false,
88 schema: Arc::clone(&join.schema),
89 }),
90 )),
91 JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
92 LogicalPlan::EmptyRelation(EmptyRelation {
93 produce_one_row: false,
94 schema: Arc::clone(&join.schema),
95 }),
96 )),
97 JoinType::Left if left_empty => Ok(Transformed::yes(
98 LogicalPlan::EmptyRelation(EmptyRelation {
99 produce_one_row: false,
100 schema: Arc::clone(&join.schema),
101 }),
102 )),
103 JoinType::Right if right_empty => Ok(Transformed::yes(
104 LogicalPlan::EmptyRelation(EmptyRelation {
105 produce_one_row: false,
106 schema: Arc::clone(&join.schema),
107 }),
108 )),
109 JoinType::LeftSemi if left_empty || right_empty => Ok(
110 Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
111 produce_one_row: false,
112 schema: Arc::clone(&join.schema),
113 })),
114 ),
115 JoinType::RightSemi if left_empty || right_empty => Ok(
116 Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
117 produce_one_row: false,
118 schema: Arc::clone(&join.schema),
119 })),
120 ),
121 JoinType::LeftAnti if left_empty => Ok(Transformed::yes(
122 LogicalPlan::EmptyRelation(EmptyRelation {
123 produce_one_row: false,
124 schema: Arc::clone(&join.schema),
125 }),
126 )),
127 JoinType::LeftAnti if right_empty => {
128 Ok(Transformed::yes((*join.left).clone()))
129 }
130 JoinType::RightAnti if left_empty => {
131 Ok(Transformed::yes((*join.right).clone()))
132 }
133 JoinType::RightAnti if right_empty => Ok(Transformed::yes(
134 LogicalPlan::EmptyRelation(EmptyRelation {
135 produce_one_row: false,
136 schema: Arc::clone(&join.schema),
137 }),
138 )),
139 _ => Ok(Transformed::no(plan)),
140 }
141 }
142 LogicalPlan::Aggregate(ref agg) => {
143 if !agg.group_expr.is_empty() {
144 if let Some(empty_plan) = empty_child(&plan)? {
145 return Ok(Transformed::yes(empty_plan));
146 }
147 }
148 Ok(Transformed::no(LogicalPlan::Aggregate(agg.clone())))
149 }
150 LogicalPlan::Union(ref union) => {
151 let new_inputs = union
152 .inputs
153 .iter()
154 .filter(|input| match &***input {
155 LogicalPlan::EmptyRelation(empty) => empty.produce_one_row,
156 _ => true,
157 })
158 .cloned()
159 .collect::<Vec<_>>();
160
161 if new_inputs.len() == union.inputs.len() {
162 Ok(Transformed::no(plan))
163 } else if new_inputs.is_empty() {
164 Ok(Transformed::yes(LogicalPlan::EmptyRelation(
165 EmptyRelation {
166 produce_one_row: false,
167 schema: Arc::clone(plan.schema()),
168 },
169 )))
170 } else if new_inputs.len() == 1 {
171 let mut new_inputs = new_inputs;
172 let input_plan = new_inputs.pop().unwrap(); let child = Arc::unwrap_or_clone(input_plan);
174 if child.schema().eq(plan.schema()) {
175 Ok(Transformed::yes(child))
176 } else {
177 Ok(Transformed::yes(LogicalPlan::Projection(
178 Projection::new_from_schema(
179 Arc::new(child),
180 Arc::clone(plan.schema()),
181 ),
182 )))
183 }
184 } else {
185 Ok(Transformed::yes(LogicalPlan::Union(Union {
186 inputs: new_inputs,
187 schema: Arc::clone(&union.schema),
188 })))
189 }
190 }
191
192 _ => Ok(Transformed::no(plan)),
193 }
194 }
195}
196
197fn binary_plan_children_is_empty(plan: &LogicalPlan) -> Result<(bool, bool)> {
198 match plan.inputs()[..] {
199 [left, right] => {
200 let left_empty = match left {
201 LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row,
202 _ => false,
203 };
204 let right_empty = match right {
205 LogicalPlan::EmptyRelation(empty) => !empty.produce_one_row,
206 _ => false,
207 };
208 Ok((left_empty, right_empty))
209 }
210 _ => plan_err!("plan just can have two child"),
211 }
212}
213
214fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
215 match plan.inputs()[..] {
216 [child] => match child {
217 LogicalPlan::EmptyRelation(empty) => {
218 if !empty.produce_one_row {
219 Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
220 produce_one_row: false,
221 schema: Arc::clone(plan.schema()),
222 })))
223 } else {
224 Ok(None)
225 }
226 }
227 _ => Ok(None),
228 },
229 _ => plan_err!("plan just can have one child"),
230 }
231}
232
233#[cfg(test)]
234mod tests {
235 use std::sync::Arc;
236
237 use arrow::datatypes::{DataType, Field, Schema};
238
239 use datafusion_common::{Column, DFSchema, JoinType};
240 use datafusion_expr::logical_plan::table_scan;
241 use datafusion_expr::{
242 binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Operator,
243 };
244
245 use crate::assert_optimized_plan_eq_snapshot;
246 use crate::eliminate_filter::EliminateFilter;
247 use crate::eliminate_nested_union::EliminateNestedUnion;
248 use crate::test::{
249 assert_optimized_plan_with_rules, test_table_scan, test_table_scan_fields,
250 test_table_scan_with_name,
251 };
252 use crate::OptimizerContext;
253
254 use super::*;
255
256 macro_rules! assert_optimized_plan_equal {
257 (
258 $plan:expr,
259 @ $expected:literal $(,)?
260 ) => {{
261 let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
262 let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(PropagateEmptyRelation::new())];
263 assert_optimized_plan_eq_snapshot!(
264 optimizer_ctx,
265 rules,
266 $plan,
267 @ $expected,
268 )
269 }};
270 }
271
272 fn assert_together_optimized_plan(
273 plan: LogicalPlan,
274 expected: &str,
275 eq: bool,
276 ) -> Result<()> {
277 assert_optimized_plan_with_rules(
278 vec![
279 Arc::new(EliminateFilter::new()),
280 Arc::new(EliminateNestedUnion::new()),
281 Arc::new(PropagateEmptyRelation::new()),
282 ],
283 plan,
284 expected,
285 eq,
286 )
287 }
288
289 #[test]
290 fn propagate_empty() -> Result<()> {
291 let plan = LogicalPlanBuilder::empty(false)
292 .filter(lit(true))?
293 .limit(10, None)?
294 .project(vec![binary_expr(lit(1), Operator::Plus, lit(1))])?
295 .build()?;
296
297 assert_optimized_plan_equal!(plan, @"EmptyRelation")
298 }
299
300 #[test]
301 fn cooperate_with_eliminate_filter() -> Result<()> {
302 let table_scan = test_table_scan()?;
303 let left = LogicalPlanBuilder::from(table_scan).build()?;
304 let right_table_scan = test_table_scan_with_name("test2")?;
305 let right = LogicalPlanBuilder::from(right_table_scan)
306 .project(vec![col("a")])?
307 .filter(lit(false))?
308 .build()?;
309
310 let plan = LogicalPlanBuilder::from(left)
311 .join_using(
312 right,
313 JoinType::Inner,
314 vec![Column::from_name("a".to_string())],
315 )?
316 .filter(col("a").lt_eq(lit(1i64)))?
317 .build()?;
318
319 let expected = "EmptyRelation";
320 assert_together_optimized_plan(plan, expected, true)
321 }
322
323 #[test]
324 fn propagate_union_empty() -> Result<()> {
325 let left = LogicalPlanBuilder::from(test_table_scan()?).build()?;
326 let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
327 .filter(lit(false))?
328 .build()?;
329
330 let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
331
332 let expected = "Projection: a, b, c\n TableScan: test";
333 assert_together_optimized_plan(plan, expected, true)
334 }
335
336 #[test]
337 fn propagate_union_multi_empty() -> Result<()> {
338 let one =
339 LogicalPlanBuilder::from(test_table_scan_with_name("test1")?).build()?;
340 let two = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
341 .filter(lit(false))?
342 .build()?;
343 let three = LogicalPlanBuilder::from(test_table_scan_with_name("test3")?)
344 .filter(lit(false))?
345 .build()?;
346 let four =
347 LogicalPlanBuilder::from(test_table_scan_with_name("test4")?).build()?;
348
349 let plan = LogicalPlanBuilder::from(one)
350 .union(two)?
351 .union(three)?
352 .union(four)?
353 .build()?;
354
355 let expected = "Union\
356 \n TableScan: test1\
357 \n TableScan: test4";
358 assert_together_optimized_plan(plan, expected, true)
359 }
360
361 #[test]
362 fn propagate_union_all_empty() -> Result<()> {
363 let one = LogicalPlanBuilder::from(test_table_scan_with_name("test1")?)
364 .filter(lit(false))?
365 .build()?;
366 let two = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
367 .filter(lit(false))?
368 .build()?;
369 let three = LogicalPlanBuilder::from(test_table_scan_with_name("test3")?)
370 .filter(lit(false))?
371 .build()?;
372 let four = LogicalPlanBuilder::from(test_table_scan_with_name("test4")?)
373 .filter(lit(false))?
374 .build()?;
375
376 let plan = LogicalPlanBuilder::from(one)
377 .union(two)?
378 .union(three)?
379 .union(four)?
380 .build()?;
381
382 let expected = "EmptyRelation";
383 assert_together_optimized_plan(plan, expected, true)
384 }
385
386 #[test]
387 fn propagate_union_children_different_schema() -> Result<()> {
388 let one_schema = Schema::new(vec![Field::new("t1a", DataType::UInt32, false)]);
389 let t1_scan = table_scan(Some("test1"), &one_schema, None)?.build()?;
390 let one = LogicalPlanBuilder::from(t1_scan)
391 .filter(lit(false))?
392 .build()?;
393
394 let two_schema = Schema::new(vec![Field::new("t2a", DataType::UInt32, false)]);
395 let t2_scan = table_scan(Some("test2"), &two_schema, None)?.build()?;
396 let two = LogicalPlanBuilder::from(t2_scan).build()?;
397
398 let three_schema = Schema::new(vec![Field::new("t3a", DataType::UInt32, false)]);
399 let t3_scan = table_scan(Some("test3"), &three_schema, None)?.build()?;
400 let three = LogicalPlanBuilder::from(t3_scan).build()?;
401
402 let plan = LogicalPlanBuilder::from(one)
403 .union(two)?
404 .union(three)?
405 .build()?;
406
407 let expected = "Union\
408 \n TableScan: test2\
409 \n TableScan: test3";
410 assert_together_optimized_plan(plan, expected, true)
411 }
412
413 #[test]
414 fn propagate_union_alias() -> Result<()> {
415 let left = LogicalPlanBuilder::from(test_table_scan()?).build()?;
416 let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
417 .filter(lit(false))?
418 .build()?;
419
420 let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
421
422 let expected = "Projection: a, b, c\n TableScan: test";
423 assert_together_optimized_plan(plan, expected, true)
424 }
425
426 #[test]
427 fn cross_join_empty() -> Result<()> {
428 let table_scan = test_table_scan()?;
429 let left = LogicalPlanBuilder::from(table_scan).build()?;
430 let right = LogicalPlanBuilder::empty(false).build()?;
431
432 let plan = LogicalPlanBuilder::from(left)
433 .cross_join(right)?
434 .filter(col("a").lt_eq(lit(1i64)))?
435 .build()?;
436
437 let expected = "EmptyRelation";
438 assert_together_optimized_plan(plan, expected, true)
439 }
440
441 fn assert_empty_left_empty_right_lp(
442 left_empty: bool,
443 right_empty: bool,
444 join_type: JoinType,
445 eq: bool,
446 ) -> Result<()> {
447 let left_lp = if left_empty {
448 let left_table_scan = test_table_scan()?;
449
450 LogicalPlanBuilder::from(left_table_scan)
451 .filter(lit(false))?
452 .build()
453 } else {
454 let scan = test_table_scan_with_name("left").unwrap();
455 LogicalPlanBuilder::from(scan).build()
456 }?;
457
458 let right_lp = if right_empty {
459 let right_table_scan = test_table_scan_with_name("right")?;
460
461 LogicalPlanBuilder::from(right_table_scan)
462 .filter(lit(false))?
463 .build()
464 } else {
465 let scan = test_table_scan_with_name("right").unwrap();
466 LogicalPlanBuilder::from(scan).build()
467 }?;
468
469 let plan = LogicalPlanBuilder::from(left_lp)
470 .join_using(
471 right_lp,
472 join_type,
473 vec![Column::from_name("a".to_string())],
474 )?
475 .build()?;
476
477 let expected = "EmptyRelation";
478 assert_together_optimized_plan(plan, expected, eq)
479 }
480
481 fn assert_anti_join_empty_join_table_is_base_table(
483 anti_left_join: bool,
484 ) -> Result<()> {
485 let (left, right, join_type, expected) = if anti_left_join {
487 let left = test_table_scan()?;
488 let right = LogicalPlanBuilder::from(test_table_scan()?)
489 .filter(lit(false))?
490 .build()?;
491 let expected = left.display_indent().to_string();
492 (left, right, JoinType::LeftAnti, expected)
493 } else {
494 let right = test_table_scan()?;
495 let left = LogicalPlanBuilder::from(test_table_scan()?)
496 .filter(lit(false))?
497 .build()?;
498 let expected = right.display_indent().to_string();
499 (left, right, JoinType::RightAnti, expected)
500 };
501
502 let plan = LogicalPlanBuilder::from(left)
503 .join_using(right, join_type, vec![Column::from_name("a".to_string())])?
504 .build()?;
505
506 assert_together_optimized_plan(plan, &expected, true)
507 }
508
509 #[test]
510 fn test_join_empty_propagation_rules() -> Result<()> {
511 assert_empty_left_empty_right_lp(true, true, JoinType::Full, true)?;
513
514 assert_empty_left_empty_right_lp(true, false, JoinType::Left, true)?;
516
517 assert_empty_left_empty_right_lp(false, true, JoinType::Right, true)?;
519
520 assert_empty_left_empty_right_lp(true, false, JoinType::LeftSemi, true)?;
522
523 assert_empty_left_empty_right_lp(false, true, JoinType::LeftSemi, true)?;
525
526 assert_empty_left_empty_right_lp(true, false, JoinType::RightSemi, true)?;
528
529 assert_empty_left_empty_right_lp(false, true, JoinType::RightSemi, true)?;
531
532 assert_empty_left_empty_right_lp(true, false, JoinType::LeftAnti, true)?;
534
535 assert_empty_left_empty_right_lp(false, true, JoinType::RightAnti, true)?;
537
538 assert_anti_join_empty_join_table_is_base_table(true)?;
540
541 assert_anti_join_empty_join_table_is_base_table(false)
543 }
544
545 #[test]
546 fn test_join_empty_propagation_rules_noop() -> Result<()> {
547 assert_empty_left_empty_right_lp(false, true, JoinType::Left, false)?;
551
552 assert_empty_left_empty_right_lp(true, false, JoinType::Right, false)?;
554
555 assert_empty_left_empty_right_lp(false, false, JoinType::LeftSemi, false)?;
557
558 assert_empty_left_empty_right_lp(false, false, JoinType::RightSemi, false)?;
560
561 assert_empty_left_empty_right_lp(false, false, JoinType::LeftAnti, false)?;
563
564 assert_empty_left_empty_right_lp(false, true, JoinType::LeftAnti, false)?;
566
567 assert_empty_left_empty_right_lp(false, false, JoinType::RightAnti, false)?;
569
570 assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false)
572 }
573
574 #[test]
575 fn test_empty_with_non_empty() -> Result<()> {
576 let table_scan = test_table_scan()?;
577
578 let fields = test_table_scan_fields();
579
580 let empty = LogicalPlan::EmptyRelation(EmptyRelation {
581 produce_one_row: false,
582 schema: Arc::new(DFSchema::from_unqualified_fields(
583 fields.into(),
584 Default::default(),
585 )?),
586 });
587
588 let one = LogicalPlanBuilder::from(empty.clone()).build()?;
589 let two = LogicalPlanBuilder::from(table_scan).build()?;
590 let three = LogicalPlanBuilder::from(empty).build()?;
591
592 let plan = LogicalPlanBuilder::from(one)
597 .union(two)?
598 .union(three)?
599 .build()?;
600
601 let expected = "Projection: a, b, c\
602 \n TableScan: test";
603
604 assert_together_optimized_plan(plan, expected, true)
605 }
606}