Skip to main content

datafusion_optimizer/simplify_expressions/
simplify_exprs.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Simplify expressions optimizer rule and implementation
19
20use std::sync::Arc;
21
22use datafusion_common::tree_node::{Transformed, TreeNode};
23use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result};
24use datafusion_expr::Expr;
25use datafusion_expr::logical_plan::LogicalPlan;
26use datafusion_expr::simplify::SimplifyContext;
27use datafusion_expr::utils::merge_schema;
28
29use crate::optimizer::ApplyOrder;
30use crate::utils::NamePreserver;
31use crate::{OptimizerConfig, OptimizerRule};
32
33use super::ExprSimplifier;
34
/// Optimizer pass that simplifies [`LogicalPlan`]s by rewriting
/// [`Expr`]s: evaluating constants and applying algebraic
/// simplifications.
///
/// # Introduction
/// It uses boolean algebra laws to simplify or reduce the number of terms in expressions.
///
/// # Example
/// `Filter: b > 2 AND b > 2`
/// is optimized to
/// `Filter: b > 2`
///
/// [`Expr`]: datafusion_expr::Expr
#[derive(Default, Debug)]
pub struct SimplifyExpressions {}
50
51impl OptimizerRule for SimplifyExpressions {
52    fn name(&self) -> &str {
53        "simplify_expressions"
54    }
55
56    fn apply_order(&self) -> Option<ApplyOrder> {
57        Some(ApplyOrder::BottomUp)
58    }
59
60    fn supports_rewrite(&self) -> bool {
61        true
62    }
63
64    fn rewrite(
65        &self,
66        plan: LogicalPlan,
67        config: &dyn OptimizerConfig,
68    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
69        Self::optimize_internal(plan, config)
70    }
71}
72
73impl SimplifyExpressions {
74    fn optimize_internal(
75        plan: LogicalPlan,
76        config: &dyn OptimizerConfig,
77    ) -> Result<Transformed<LogicalPlan>> {
78        let schema = if !plan.inputs().is_empty() {
79            DFSchemaRef::new(merge_schema(&plan.inputs()))
80        } else if let LogicalPlan::TableScan(scan) = &plan {
81            // When predicates are pushed into a table scan, there is no input
82            // schema to resolve predicates against, so it must be handled specially
83            //
84            // Note that this is not `plan.schema()` which is the *output*
85            // schema, and reflects any pushed down projection. The output schema
86            // will not contain columns that *only* appear in pushed down predicates
87            // (and no where else) in the plan.
88            //
89            // Thus, use the full schema of the inner provider without any
90            // projection applied for simplification
91            Arc::new(DFSchema::try_from_qualified_schema(
92                scan.table_name.clone(),
93                &scan.source.schema(),
94            )?)
95        } else {
96            Arc::new(DFSchema::empty())
97        };
98
99        let info = SimplifyContext::default()
100            .with_schema(schema)
101            .with_config_options(config.options())
102            .with_query_execution_start_time(config.query_execution_start_time());
103
104        // Inputs have already been rewritten (due to bottom-up traversal handled by Optimizer)
105        // Just need to rewrite our own expressions
106
107        let simplifier = ExprSimplifier::new(info);
108
109        // The left and right expressions in a Join on clause are not
110        // commutative, for reasons that are not entirely clear. Thus, do not
111        // reorder expressions in Join while simplifying.
112        //
113        // This is likely related to the fact that order of the columns must
114        // match the order of the children. see
115        // https://github.com/apache/datafusion/pull/8780 for more details
116        let simplifier = if let LogicalPlan::Join(_) = plan {
117            simplifier.with_canonicalize(false)
118        } else {
119            simplifier
120        };
121
122        // Preserve expression names to avoid changing the schema of the plan.
123        let name_preserver = NamePreserver::new(&plan);
124        let mut rewrite_expr = |expr: Expr| {
125            let name = name_preserver.save(&expr);
126            let expr = simplifier.simplify_with_cycle_count_transformed(expr)?.0;
127            Ok(Transformed::new_transformed(
128                name.restore(expr.data),
129                expr.transformed,
130            ))
131        };
132
133        plan.map_expressions(|expr| {
134            // Preserve the aliasing of grouping sets.
135            if let Expr::GroupingSet(_) = &expr {
136                expr.map_children(&mut rewrite_expr)
137            } else {
138                rewrite_expr(expr)
139            }
140        })
141    }
142}
143
144impl SimplifyExpressions {
145    #[expect(missing_docs)]
146    pub fn new() -> Self {
147        Self {}
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use std::ops::Not;
154
155    use arrow::datatypes::{DataType, Field, Schema};
156    use chrono::{DateTime, Utc};
157
158    use datafusion_expr::logical_plan::builder::table_scan_with_filters;
159    use datafusion_expr::logical_plan::table_scan;
160    use datafusion_expr::*;
161    use datafusion_functions_aggregate::expr_fn::{max, min};
162
163    use crate::OptimizerContext;
164    use crate::assert_optimized_plan_eq_snapshot;
165    use crate::test::{assert_fields_eq, test_table_scan_with_name};
166
167    use super::*;
168
169    fn test_table_scan() -> LogicalPlan {
170        let schema = Schema::new(vec![
171            Field::new("a", DataType::Boolean, false),
172            Field::new("b", DataType::Boolean, false),
173            Field::new("c", DataType::Boolean, false),
174            Field::new("d", DataType::UInt32, false),
175            Field::new("e", DataType::UInt32, true),
176        ]);
177        table_scan(Some("test"), &schema, None)
178            .expect("creating scan")
179            .build()
180            .expect("building plan")
181    }
182
183    macro_rules! assert_optimized_plan_equal {
184        (
185            $plan:expr,
186            @ $expected:literal $(,)?
187        ) => {{
188            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(SimplifyExpressions::new())];
189            let optimizer_ctx = OptimizerContext::new();
190            assert_optimized_plan_eq_snapshot!(
191                optimizer_ctx,
192                rules,
193                $plan,
194                @ $expected,
195            )
196        }};
197    }
198
199    #[test]
200    fn test_simplify_table_full_filter_in_scan() -> Result<()> {
201        let fields = vec![
202            Field::new("a", DataType::UInt32, false),
203            Field::new("b", DataType::UInt32, false),
204            Field::new("c", DataType::UInt32, false),
205        ];
206
207        let schema = Schema::new(fields);
208
209        let table_scan = table_scan_with_filters(
210            Some("test"),
211            &schema,
212            Some(vec![0]),
213            vec![col("b").is_not_null()],
214        )?
215        .build()?;
216        assert_eq!(1, table_scan.schema().fields().len());
217        assert_fields_eq(&table_scan, vec!["a"]);
218
219        assert_optimized_plan_equal!(
220            table_scan,
221            @ "TableScan: test projection=[a], full_filters=[Boolean(true)]"
222        )
223    }
224
225    #[test]
226    fn test_simplify_filter_pushdown() -> Result<()> {
227        let table_scan = test_table_scan();
228        let plan = LogicalPlanBuilder::from(table_scan)
229            .project(vec![col("a")])?
230            .filter(and(col("b").gt(lit(1)), col("b").gt(lit(1))))?
231            .build()?;
232
233        assert_optimized_plan_equal!(
234            plan,
235            @ r"
236        Filter: test.b > Int32(1)
237          Projection: test.a
238            TableScan: test
239        "
240        )
241    }
242
243    #[test]
244    fn test_simplify_optimized_plan() -> Result<()> {
245        let table_scan = test_table_scan();
246        let plan = LogicalPlanBuilder::from(table_scan)
247            .project(vec![col("a")])?
248            .filter(and(col("b").gt(lit(1)), col("b").gt(lit(1))))?
249            .build()?;
250
251        assert_optimized_plan_equal!(
252            plan,
253            @ r"
254        Filter: test.b > Int32(1)
255          Projection: test.a
256            TableScan: test
257        "
258        )
259    }
260
261    #[test]
262    fn test_simplify_optimized_plan_with_or() -> Result<()> {
263        let table_scan = test_table_scan();
264        let plan = LogicalPlanBuilder::from(table_scan)
265            .project(vec![col("a")])?
266            .filter(or(col("b").gt(lit(1)), col("b").gt(lit(1))))?
267            .build()?;
268
269        assert_optimized_plan_equal!(
270            plan,
271            @ r"
272        Filter: test.b > Int32(1)
273          Projection: test.a
274            TableScan: test
275        "
276        )
277    }
278
279    #[test]
280    fn test_simplify_optimized_plan_with_composed_and() -> Result<()> {
281        let table_scan = test_table_scan();
282        // ((c > 5) AND (d < 6)) AND (c > 5) --> (c > 5) AND (d < 6)
283        let plan = LogicalPlanBuilder::from(table_scan)
284            .project(vec![col("a"), col("b")])?
285            .filter(and(
286                and(col("a").gt(lit(5)), col("b").lt(lit(6))),
287                col("a").gt(lit(5)),
288            ))?
289            .build()?;
290
291        assert_optimized_plan_equal!(
292            plan,
293            @ r"
294        Filter: test.a > Int32(5) AND test.b < Int32(6)
295          Projection: test.a, test.b
296            TableScan: test
297        "
298        )
299    }
300
301    #[test]
302    fn test_simplify_optimized_plan_eq_expr() -> Result<()> {
303        let table_scan = test_table_scan();
304        let plan = LogicalPlanBuilder::from(table_scan)
305            .filter(col("b").eq(lit(true)))?
306            .filter(col("c").eq(lit(false)))?
307            .project(vec![col("a")])?
308            .build()?;
309
310        assert_optimized_plan_equal!(
311            plan,
312            @ r"
313        Projection: test.a
314          Filter: NOT test.c
315            Filter: test.b
316              TableScan: test
317        "
318        )
319    }
320
321    #[test]
322    fn test_simplify_optimized_plan_not_eq_expr() -> Result<()> {
323        let table_scan = test_table_scan();
324        let plan = LogicalPlanBuilder::from(table_scan)
325            .filter(col("b").not_eq(lit(true)))?
326            .filter(col("c").not_eq(lit(false)))?
327            .limit(0, Some(1))?
328            .project(vec![col("a")])?
329            .build()?;
330
331        assert_optimized_plan_equal!(
332            plan,
333            @ r"
334        Projection: test.a
335          Limit: skip=0, fetch=1
336            Filter: test.c
337              Filter: NOT test.b
338                TableScan: test
339        "
340        )
341    }
342
343    #[test]
344    fn test_simplify_optimized_plan_and_expr() -> Result<()> {
345        let table_scan = test_table_scan();
346        let plan = LogicalPlanBuilder::from(table_scan)
347            .filter(col("b").not_eq(lit(true)).and(col("c").eq(lit(true))))?
348            .project(vec![col("a")])?
349            .build()?;
350
351        assert_optimized_plan_equal!(
352            plan,
353            @ r"
354        Projection: test.a
355          Filter: NOT test.b AND test.c
356            TableScan: test
357        "
358        )
359    }
360
361    #[test]
362    fn test_simplify_optimized_plan_or_expr() -> Result<()> {
363        let table_scan = test_table_scan();
364        let plan = LogicalPlanBuilder::from(table_scan)
365            .filter(col("b").not_eq(lit(true)).or(col("c").eq(lit(false))))?
366            .project(vec![col("a")])?
367            .build()?;
368
369        assert_optimized_plan_equal!(
370            plan,
371            @ r"
372        Projection: test.a
373          Filter: NOT test.b OR NOT test.c
374            TableScan: test
375        "
376        )
377    }
378
379    #[test]
380    fn test_simplify_optimized_plan_not_expr() -> Result<()> {
381        let table_scan = test_table_scan();
382        let plan = LogicalPlanBuilder::from(table_scan)
383            .filter(col("b").eq(lit(false)).not())?
384            .project(vec![col("a")])?
385            .build()?;
386
387        assert_optimized_plan_equal!(
388            plan,
389            @ r"
390        Projection: test.a
391          Filter: test.b
392            TableScan: test
393        "
394        )
395    }
396
397    #[test]
398    fn test_simplify_optimized_plan_support_projection() -> Result<()> {
399        let table_scan = test_table_scan();
400        let plan = LogicalPlanBuilder::from(table_scan)
401            .project(vec![col("a"), col("d"), col("b").eq(lit(false))])?
402            .build()?;
403
404        assert_optimized_plan_equal!(
405            plan,
406            @ r"
407        Projection: test.a, test.d, NOT test.b AS test.b = Boolean(false)
408          TableScan: test
409        "
410        )
411    }
412
413    #[test]
414    fn test_simplify_optimized_plan_support_aggregate() -> Result<()> {
415        let table_scan = test_table_scan();
416        let plan = LogicalPlanBuilder::from(table_scan)
417            .project(vec![col("a"), col("c"), col("b")])?
418            .aggregate(
419                vec![col("a"), col("c")],
420                vec![max(col("b").eq(lit(true))), min(col("b"))],
421            )?
422            .build()?;
423
424        assert_optimized_plan_equal!(
425            plan,
426            @ r"
427        Aggregate: groupBy=[[test.a, test.c]], aggr=[[max(test.b) AS max(test.b = Boolean(true)), min(test.b)]]
428          Projection: test.a, test.c, test.b
429            TableScan: test
430        "
431        )
432    }
433
434    #[test]
435    fn test_simplify_optimized_plan_support_values() -> Result<()> {
436        let expr1 = Expr::BinaryExpr(BinaryExpr::new(
437            Box::new(lit(1)),
438            Operator::Plus,
439            Box::new(lit(2)),
440        ));
441        let expr2 = Expr::BinaryExpr(BinaryExpr::new(
442            Box::new(lit(2)),
443            Operator::Minus,
444            Box::new(lit(1)),
445        ));
446        let values = vec![vec![expr1, expr2]];
447        let plan = LogicalPlanBuilder::values(values)?.build()?;
448
449        assert_optimized_plan_equal!(
450            plan,
451            @ "Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))"
452        )
453    }
454
455    fn get_optimized_plan_formatted(
456        plan: LogicalPlan,
457        date_time: &DateTime<Utc>,
458    ) -> String {
459        let config = OptimizerContext::new().with_query_execution_start_time(*date_time);
460        let rule = SimplifyExpressions::new();
461
462        let optimized_plan = rule.rewrite(plan, &config).unwrap().data;
463        format!("{optimized_plan}")
464    }
465
466    #[test]
467    fn cast_expr() -> Result<()> {
468        let table_scan = test_table_scan();
469        let proj = vec![Expr::Cast(Cast::new(Box::new(lit("0")), DataType::Int32))];
470        let plan = LogicalPlanBuilder::from(table_scan)
471            .project(proj)?
472            .build()?;
473
474        let expected = "Projection: Int32(0) AS Utf8(\"0\")\
475            \n  TableScan: test";
476        let actual = get_optimized_plan_formatted(plan, &Utc::now());
477        assert_eq!(expected, actual);
478        Ok(())
479    }
480
481    #[test]
482    fn simplify_and_eval() -> Result<()> {
483        // demonstrate a case where the evaluation needs to run prior
484        // to the simplifier for it to work
485        let table_scan = test_table_scan();
486        let time = Utc::now();
487        // (true or false) != col --> !col
488        let proj = vec![lit(true).or(lit(false)).not_eq(col("a"))];
489        let plan = LogicalPlanBuilder::from(table_scan)
490            .project(proj)?
491            .build()?;
492
493        let actual = get_optimized_plan_formatted(plan, &time);
494        let expected = "Projection: NOT test.a AS Boolean(true) OR Boolean(false) != test.a\
495                        \n  TableScan: test";
496
497        assert_eq!(expected, actual);
498        Ok(())
499    }
500
501    #[test]
502    fn simplify_not_binary() -> Result<()> {
503        let table_scan = test_table_scan();
504
505        let plan = LogicalPlanBuilder::from(table_scan)
506            .filter(col("d").gt(lit(10)).not())?
507            .build()?;
508
509        assert_optimized_plan_equal!(
510            plan,
511            @ r"
512        Filter: test.d <= Int32(10)
513          TableScan: test
514        "
515        )
516    }
517
518    #[test]
519    fn simplify_not_bool_and() -> Result<()> {
520        let table_scan = test_table_scan();
521
522        let plan = LogicalPlanBuilder::from(table_scan)
523            .filter(col("d").gt(lit(10)).and(col("d").lt(lit(100))).not())?
524            .build()?;
525
526        assert_optimized_plan_equal!(
527            plan,
528            @ r"
529        Filter: test.d <= Int32(10) OR test.d >= Int32(100)
530          TableScan: test
531        "
532        )
533    }
534
535    #[test]
536    fn simplify_not_bool_or() -> Result<()> {
537        let table_scan = test_table_scan();
538
539        let plan = LogicalPlanBuilder::from(table_scan)
540            .filter(col("d").gt(lit(10)).or(col("d").lt(lit(100))).not())?
541            .build()?;
542
543        assert_optimized_plan_equal!(
544            plan,
545            @ r"
546        Filter: test.d <= Int32(10) AND test.d >= Int32(100)
547          TableScan: test
548        "
549        )
550    }
551
552    #[test]
553    fn simplify_not_not() -> Result<()> {
554        let table_scan = test_table_scan();
555
556        let plan = LogicalPlanBuilder::from(table_scan)
557            .filter(col("d").gt(lit(10)).not().not())?
558            .build()?;
559
560        assert_optimized_plan_equal!(
561            plan,
562            @ r"
563        Filter: test.d > Int32(10)
564          TableScan: test
565        "
566        )
567    }
568
569    #[test]
570    fn simplify_not_null() -> Result<()> {
571        let table_scan = test_table_scan();
572
573        let plan = LogicalPlanBuilder::from(table_scan)
574            .filter(col("e").is_null().not())?
575            .build()?;
576
577        assert_optimized_plan_equal!(
578            plan,
579            @ r"
580        Filter: test.e IS NOT NULL
581          TableScan: test
582        "
583        )
584    }
585
586    #[test]
587    fn simplify_not_not_null() -> Result<()> {
588        let table_scan = test_table_scan();
589
590        let plan = LogicalPlanBuilder::from(table_scan)
591            .filter(col("e").is_not_null().not())?
592            .build()?;
593
594        assert_optimized_plan_equal!(
595            plan,
596            @ r"
597        Filter: test.e IS NULL
598          TableScan: test
599        "
600        )
601    }
602
603    #[test]
604    fn simplify_not_in() -> Result<()> {
605        let table_scan = test_table_scan();
606
607        let plan = LogicalPlanBuilder::from(table_scan)
608            .filter(col("d").in_list(vec![lit(1), lit(2), lit(3)], false).not())?
609            .build()?;
610
611        assert_optimized_plan_equal!(
612            plan,
613            @ r"
614        Filter: test.d != Int32(1) AND test.d != Int32(2) AND test.d != Int32(3)
615          TableScan: test
616        "
617        )
618    }
619
620    #[test]
621    fn simplify_not_not_in() -> Result<()> {
622        let table_scan = test_table_scan();
623
624        let plan = LogicalPlanBuilder::from(table_scan)
625            .filter(col("d").in_list(vec![lit(1), lit(2), lit(3)], true).not())?
626            .build()?;
627
628        assert_optimized_plan_equal!(
629            plan,
630            @ r"
631        Filter: test.d = Int32(1) OR test.d = Int32(2) OR test.d = Int32(3)
632          TableScan: test
633        "
634        )
635    }
636
637    #[test]
638    fn simplify_not_between() -> Result<()> {
639        let table_scan = test_table_scan();
640        let qual = col("d").between(lit(1), lit(10));
641
642        let plan = LogicalPlanBuilder::from(table_scan)
643            .filter(qual.not())?
644            .build()?;
645
646        assert_optimized_plan_equal!(
647            plan,
648            @ r"
649        Filter: test.d < Int32(1) OR test.d > Int32(10)
650          TableScan: test
651        "
652        )
653    }
654
655    #[test]
656    fn simplify_not_not_between() -> Result<()> {
657        let table_scan = test_table_scan();
658        let qual = col("d").not_between(lit(1), lit(10));
659
660        let plan = LogicalPlanBuilder::from(table_scan)
661            .filter(qual.not())?
662            .build()?;
663
664        assert_optimized_plan_equal!(
665            plan,
666            @ r"
667        Filter: test.d >= Int32(1) AND test.d <= Int32(10)
668          TableScan: test
669        "
670        )
671    }
672
673    #[test]
674    fn simplify_not_like() -> Result<()> {
675        let schema = Schema::new(vec![
676            Field::new("a", DataType::Utf8, false),
677            Field::new("b", DataType::Utf8, false),
678        ]);
679        let table_scan = table_scan(Some("test"), &schema, None)
680            .expect("creating scan")
681            .build()
682            .expect("building plan");
683
684        let plan = LogicalPlanBuilder::from(table_scan)
685            .filter(col("a").like(col("b")).not())?
686            .build()?;
687
688        assert_optimized_plan_equal!(
689            plan,
690            @ r"
691        Filter: test.a NOT LIKE test.b
692          TableScan: test
693        "
694        )
695    }
696
697    #[test]
698    fn simplify_not_not_like() -> Result<()> {
699        let schema = Schema::new(vec![
700            Field::new("a", DataType::Utf8, false),
701            Field::new("b", DataType::Utf8, false),
702        ]);
703        let table_scan = table_scan(Some("test"), &schema, None)
704            .expect("creating scan")
705            .build()
706            .expect("building plan");
707
708        let plan = LogicalPlanBuilder::from(table_scan)
709            .filter(col("a").not_like(col("b")).not())?
710            .build()?;
711
712        assert_optimized_plan_equal!(
713            plan,
714            @ r"
715        Filter: test.a LIKE test.b
716          TableScan: test
717        "
718        )
719    }
720
721    #[test]
722    fn simplify_not_ilike() -> Result<()> {
723        let schema = Schema::new(vec![
724            Field::new("a", DataType::Utf8, false),
725            Field::new("b", DataType::Utf8, false),
726        ]);
727        let table_scan = table_scan(Some("test"), &schema, None)
728            .expect("creating scan")
729            .build()
730            .expect("building plan");
731
732        let plan = LogicalPlanBuilder::from(table_scan)
733            .filter(col("a").ilike(col("b")).not())?
734            .build()?;
735
736        assert_optimized_plan_equal!(
737            plan,
738            @ r"
739        Filter: test.a NOT ILIKE test.b
740          TableScan: test
741        "
742        )
743    }
744
745    #[test]
746    fn simplify_not_distinct_from() -> Result<()> {
747        let table_scan = test_table_scan();
748
749        let plan = LogicalPlanBuilder::from(table_scan)
750            .filter(binary_expr(col("d"), Operator::IsDistinctFrom, lit(10)).not())?
751            .build()?;
752
753        assert_optimized_plan_equal!(
754            plan,
755            @ r"
756        Filter: test.d IS NOT DISTINCT FROM Int32(10)
757          TableScan: test
758        "
759        )
760    }
761
762    #[test]
763    fn simplify_not_not_distinct_from() -> Result<()> {
764        let table_scan = test_table_scan();
765
766        let plan = LogicalPlanBuilder::from(table_scan)
767            .filter(binary_expr(col("d"), Operator::IsNotDistinctFrom, lit(10)).not())?
768            .build()?;
769
770        assert_optimized_plan_equal!(
771            plan,
772            @ r"
773        Filter: test.d IS DISTINCT FROM Int32(10)
774          TableScan: test
775        "
776        )
777    }
778
779    #[test]
780    fn simplify_equijoin_predicate() -> Result<()> {
781        let t1 = test_table_scan_with_name("t1")?;
782        let t2 = test_table_scan_with_name("t2")?;
783
784        let left_key = col("t1.a") + lit(1i64).cast_to(&DataType::UInt32, t1.schema())?;
785        let right_key =
786            col("t2.a") + lit(2i64).cast_to(&DataType::UInt32, t2.schema())?;
787        let plan = LogicalPlanBuilder::from(t1)
788            .join_with_expr_keys(
789                t2,
790                JoinType::Inner,
791                (vec![left_key], vec![right_key]),
792                None,
793            )?
794            .build()?;
795
796        // before simplify: t1.a + CAST(Int64(1), UInt32) = t2.a + CAST(Int64(2), UInt32)
797        // after simplify: t1.a + UInt32(1) = t2.a + UInt32(2) AS t1.a + Int64(1) = t2.a + Int64(2)
798        assert_optimized_plan_equal!(
799            plan,
800            @ r"
801        Inner Join: t1.a + UInt32(1) = t2.a + UInt32(2)
802          TableScan: t1
803          TableScan: t2
804        "
805        )
806    }
807
808    #[test]
809    fn simplify_is_not_null() -> Result<()> {
810        let table_scan = test_table_scan();
811
812        let plan = LogicalPlanBuilder::from(table_scan)
813            .filter(col("d").is_not_null())?
814            .build()?;
815
816        assert_optimized_plan_equal!(
817            plan,
818            @ r"
819        Filter: Boolean(true)
820          TableScan: test
821        "
822        )
823    }
824
825    #[test]
826    fn simplify_is_null() -> Result<()> {
827        let table_scan = test_table_scan();
828
829        let plan = LogicalPlanBuilder::from(table_scan)
830            .filter(col("d").is_null())?
831            .build()?;
832
833        assert_optimized_plan_equal!(
834            plan,
835            @ r"
836        Filter: Boolean(false)
837          TableScan: test
838        "
839        )
840    }
841
842    #[test]
843    fn simplify_grouping_sets() -> Result<()> {
844        let table_scan = test_table_scan();
845        let plan = LogicalPlanBuilder::from(table_scan)
846            .aggregate(
847                [grouping_set(vec![
848                    vec![(lit(42).alias("prev") + lit(1)).alias("age"), col("a")],
849                    vec![col("a").or(col("b")).and(lit(1).lt(lit(0))).alias("cond")],
850                    vec![col("d").alias("e"), (lit(1) + lit(2))],
851                ])],
852                [] as [Expr; 0],
853            )?
854            .build()?;
855
856        assert_optimized_plan_equal!(
857            plan,
858            @ r"
859        Aggregate: groupBy=[[GROUPING SETS ((Int32(43) AS age, test.a), (Boolean(false) AS cond), (test.d AS e, Int32(3) AS Int32(1) + Int32(2)))]], aggr=[[]]
860          TableScan: test
861        "
862        )
863    }
864
865    #[test]
866    fn test_simplify_regex_special_cases() -> Result<()> {
867        let schema = Schema::new(vec![
868            Field::new("a", DataType::Utf8, true),
869            Field::new("b", DataType::Utf8, false),
870        ]);
871        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
872
873        // Test `= ".*"` transforms to true (except for empty strings)
874        let plan = LogicalPlanBuilder::from(table_scan.clone())
875            .filter(binary_expr(col("a"), Operator::RegexMatch, lit(".*")))?
876            .build()?;
877
878        assert_optimized_plan_equal!(
879            plan,
880            @ r"
881        Filter: test.a IS NOT NULL
882          TableScan: test
883        "
884        )?;
885
886        // Test `!= ".*"` transforms to checking if the column is empty
887        let plan = LogicalPlanBuilder::from(table_scan.clone())
888            .filter(binary_expr(col("a"), Operator::RegexNotMatch, lit(".*")))?
889            .build()?;
890
891        assert_optimized_plan_equal!(
892            plan,
893            @ r#"
894        Filter: test.a = Utf8("")
895          TableScan: test
896        "#
897        )?;
898
899        // Test case-insensitive versions
900
901        // Test `=~ ".*"` (case-insensitive) transforms to true (except for empty strings)
902        let plan = LogicalPlanBuilder::from(table_scan.clone())
903            .filter(binary_expr(col("b"), Operator::RegexIMatch, lit(".*")))?
904            .build()?;
905
906        assert_optimized_plan_equal!(
907            plan,
908            @ r"
909        Filter: Boolean(true)
910          TableScan: test
911        "
912        )?;
913
914        // Test `!~ ".*"` (case-insensitive) transforms to checking if the column is empty
915        let plan = LogicalPlanBuilder::from(table_scan.clone())
916            .filter(binary_expr(col("a"), Operator::RegexNotIMatch, lit(".*")))?
917            .build()?;
918
919        assert_optimized_plan_equal!(
920            plan,
921            @ r#"
922        Filter: test.a = Utf8("")
923          TableScan: test
924        "#
925        )
926    }
927
928    #[test]
929    fn simplify_not_in_list() -> Result<()> {
930        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
931        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
932
933        let plan = LogicalPlanBuilder::from(table_scan)
934            .filter(col("a").in_list(vec![lit("a"), lit("b")], false).not())?
935            .build()?;
936
937        assert_optimized_plan_equal!(
938            plan,
939            @ r#"
940        Filter: test.a != Utf8("a") AND test.a != Utf8("b")
941          TableScan: test
942        "#
943        )
944    }
945
946    #[test]
947    fn simplify_not_not_in_list() -> Result<()> {
948        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
949        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
950
951        let plan = LogicalPlanBuilder::from(table_scan)
952            .filter(
953                col("a")
954                    .in_list(vec![lit("a"), lit("b")], false)
955                    .not()
956                    .not(),
957            )?
958            .build()?;
959
960        assert_optimized_plan_equal!(
961            plan,
962            @ r#"
963        Filter: test.a = Utf8("a") OR test.a = Utf8("b")
964          TableScan: test
965        "#
966        )
967    }
968
969    #[test]
970    fn simplify_not_exists() -> Result<()> {
971        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
972        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
973        let table_scan2 =
974            datafusion_expr::table_scan(Some("test2"), &schema, None)?.build()?;
975
976        let plan = LogicalPlanBuilder::from(table_scan)
977            .filter(
978                exists(Arc::new(LogicalPlanBuilder::from(table_scan2).build()?)).not(),
979            )?
980            .build()?;
981
982        assert_optimized_plan_equal!(
983            plan,
984            @ r"
985        Filter: NOT EXISTS (<subquery>)
986          Subquery:
987            TableScan: test2
988          TableScan: test
989        "
990        )
991    }
992
993    #[test]
994    fn simplify_not_not_exists() -> Result<()> {
995        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
996        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
997        let table_scan2 =
998            datafusion_expr::table_scan(Some("test2"), &schema, None)?.build()?;
999
1000        let plan = LogicalPlanBuilder::from(table_scan)
1001            .filter(
1002                exists(Arc::new(LogicalPlanBuilder::from(table_scan2).build()?))
1003                    .not()
1004                    .not(),
1005            )?
1006            .build()?;
1007
1008        assert_optimized_plan_equal!(
1009            plan,
1010            @ r"
1011        Filter: EXISTS (<subquery>)
1012          Subquery:
1013            TableScan: test2
1014          TableScan: test
1015        "
1016        )
1017    }
1018
1019    #[test]
1020    fn simplify_not_in_subquery() -> Result<()> {
1021        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
1022        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
1023        let table_scan2 =
1024            datafusion_expr::table_scan(Some("test2"), &schema, None)?.build()?;
1025
1026        let plan = LogicalPlanBuilder::from(table_scan)
1027            .filter(
1028                in_subquery(
1029                    col("a"),
1030                    Arc::new(LogicalPlanBuilder::from(table_scan2).build()?),
1031                )
1032                .not(),
1033            )?
1034            .build()?;
1035
1036        assert_optimized_plan_equal!(
1037            plan,
1038            @ r"
1039        Filter: test.a NOT IN (<subquery>)
1040          Subquery:
1041            TableScan: test2
1042          TableScan: test
1043        "
1044        )
1045    }
1046
1047    #[test]
1048    fn simplify_not_not_in_subquery() -> Result<()> {
1049        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
1050        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
1051        let table_scan2 =
1052            datafusion_expr::table_scan(Some("test2"), &schema, None)?.build()?;
1053
1054        let plan = LogicalPlanBuilder::from(table_scan)
1055            .filter(
1056                in_subquery(
1057                    col("a"),
1058                    Arc::new(LogicalPlanBuilder::from(table_scan2).build()?),
1059                )
1060                .not()
1061                .not(),
1062            )?
1063            .build()?;
1064
1065        assert_optimized_plan_equal!(
1066            plan,
1067            @ r"
1068        Filter: test.a IN (<subquery>)
1069          Subquery:
1070            TableScan: test2
1071          TableScan: test
1072        "
1073        )
1074    }
1075}