datafusion_optimizer/simplify_expressions/
simplify_exprs.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Simplify expressions optimizer rule and implementation
19
20use std::sync::Arc;
21
22use datafusion_common::tree_node::{Transformed, TreeNode};
23use datafusion_common::{DFSchema, DFSchemaRef, DataFusionError, Result};
24use datafusion_expr::Expr;
25use datafusion_expr::execution_props::ExecutionProps;
26use datafusion_expr::logical_plan::LogicalPlan;
27use datafusion_expr::simplify::SimplifyContext;
28use datafusion_expr::utils::merge_schema;
29
30use crate::optimizer::ApplyOrder;
31use crate::utils::NamePreserver;
32use crate::{OptimizerConfig, OptimizerRule};
33
34use super::ExprSimplifier;
35
/// Optimizer pass that simplifies [`LogicalPlan`]s by rewriting their
/// [`Expr`]s: evaluating constants and applying algebraic
/// simplifications.
///
/// # Introduction
/// It uses boolean algebra laws to simplify or reduce the number of terms in expressions.
///
/// # Example:
/// `Filter: b > 2 AND b > 2`
/// is optimized to
/// `Filter: b > 2`
///
/// [`Expr`]: datafusion_expr::Expr
#[derive(Default, Debug)]
pub struct SimplifyExpressions {}
51
52impl OptimizerRule for SimplifyExpressions {
53    fn name(&self) -> &str {
54        "simplify_expressions"
55    }
56
57    fn apply_order(&self) -> Option<ApplyOrder> {
58        Some(ApplyOrder::BottomUp)
59    }
60
61    fn supports_rewrite(&self) -> bool {
62        true
63    }
64
65    fn rewrite(
66        &self,
67        plan: LogicalPlan,
68        config: &dyn OptimizerConfig,
69    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
70        let mut execution_props = ExecutionProps::new();
71        execution_props.query_execution_start_time = config.query_execution_start_time();
72        execution_props.config_options = Some(config.options());
73        Self::optimize_internal(plan, &execution_props)
74    }
75}
76
77impl SimplifyExpressions {
78    fn optimize_internal(
79        plan: LogicalPlan,
80        execution_props: &ExecutionProps,
81    ) -> Result<Transformed<LogicalPlan>> {
82        let schema = if !plan.inputs().is_empty() {
83            DFSchemaRef::new(merge_schema(&plan.inputs()))
84        } else if let LogicalPlan::TableScan(scan) = &plan {
85            // When predicates are pushed into a table scan, there is no input
86            // schema to resolve predicates against, so it must be handled specially
87            //
88            // Note that this is not `plan.schema()` which is the *output*
89            // schema, and reflects any pushed down projection. The output schema
90            // will not contain columns that *only* appear in pushed down predicates
91            // (and no where else) in the plan.
92            //
93            // Thus, use the full schema of the inner provider without any
94            // projection applied for simplification
95            Arc::new(DFSchema::try_from_qualified_schema(
96                scan.table_name.clone(),
97                &scan.source.schema(),
98            )?)
99        } else {
100            Arc::new(DFSchema::empty())
101        };
102
103        let info = SimplifyContext::new(execution_props).with_schema(schema);
104
105        // Inputs have already been rewritten (due to bottom-up traversal handled by Optimizer)
106        // Just need to rewrite our own expressions
107
108        let simplifier = ExprSimplifier::new(info);
109
110        // The left and right expressions in a Join on clause are not
111        // commutative, for reasons that are not entirely clear. Thus, do not
112        // reorder expressions in Join while simplifying.
113        //
114        // This is likely related to the fact that order of the columns must
115        // match the order of the children. see
116        // https://github.com/apache/datafusion/pull/8780 for more details
117        let simplifier = if let LogicalPlan::Join(_) = plan {
118            simplifier.with_canonicalize(false)
119        } else {
120            simplifier
121        };
122
123        // Preserve expression names to avoid changing the schema of the plan.
124        let name_preserver = NamePreserver::new(&plan);
125        let mut rewrite_expr = |expr: Expr| {
126            let name = name_preserver.save(&expr);
127            let expr = simplifier.simplify_with_cycle_count_transformed(expr)?.0;
128            Ok(Transformed::new_transformed(
129                name.restore(expr.data),
130                expr.transformed,
131            ))
132        };
133
134        plan.map_expressions(|expr| {
135            // Preserve the aliasing of grouping sets.
136            if let Expr::GroupingSet(_) = &expr {
137                expr.map_children(&mut rewrite_expr)
138            } else {
139                rewrite_expr(expr)
140            }
141        })
142    }
143}
144
145impl SimplifyExpressions {
146    #[expect(missing_docs)]
147    pub fn new() -> Self {
148        Self {}
149    }
150}
151
152#[cfg(test)]
153mod tests {
154    use std::ops::Not;
155
156    use arrow::datatypes::{DataType, Field, Schema};
157    use chrono::{DateTime, Utc};
158
159    use datafusion_expr::logical_plan::builder::table_scan_with_filters;
160    use datafusion_expr::logical_plan::table_scan;
161    use datafusion_expr::*;
162    use datafusion_functions_aggregate::expr_fn::{max, min};
163
164    use crate::OptimizerContext;
165    use crate::assert_optimized_plan_eq_snapshot;
166    use crate::test::{assert_fields_eq, test_table_scan_with_name};
167
168    use super::*;
169
170    fn test_table_scan() -> LogicalPlan {
171        let schema = Schema::new(vec![
172            Field::new("a", DataType::Boolean, false),
173            Field::new("b", DataType::Boolean, false),
174            Field::new("c", DataType::Boolean, false),
175            Field::new("d", DataType::UInt32, false),
176            Field::new("e", DataType::UInt32, true),
177        ]);
178        table_scan(Some("test"), &schema, None)
179            .expect("creating scan")
180            .build()
181            .expect("building plan")
182    }
183
    // Test helper: checks `$plan` against the inline snapshot `$expected`
    // using a rule list that contains only `SimplifyExpressions` and a
    // freshly created `OptimizerContext`.
    //
    // The actual optimisation and comparison are delegated to
    // `assert_optimized_plan_eq_snapshot!`; this macro evaluates to whatever
    // that macro evaluates to, which the tests below return directly or
    // propagate with `?`.
    macro_rules! assert_optimized_plan_equal {
        (
            $plan:expr,
            @ $expected:literal $(,)?
        ) => {{
            // Only the rule under test is registered.
            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(SimplifyExpressions::new())];
            let optimizer_ctx = OptimizerContext::new();
            assert_optimized_plan_eq_snapshot!(
                optimizer_ctx,
                rules,
                $plan,
                @ $expected,
            )
        }};
    }
199
200    #[test]
201    fn test_simplify_table_full_filter_in_scan() -> Result<()> {
202        let fields = vec![
203            Field::new("a", DataType::UInt32, false),
204            Field::new("b", DataType::UInt32, false),
205            Field::new("c", DataType::UInt32, false),
206        ];
207
208        let schema = Schema::new(fields);
209
210        let table_scan = table_scan_with_filters(
211            Some("test"),
212            &schema,
213            Some(vec![0]),
214            vec![col("b").is_not_null()],
215        )?
216        .build()?;
217        assert_eq!(1, table_scan.schema().fields().len());
218        assert_fields_eq(&table_scan, vec!["a"]);
219
220        assert_optimized_plan_equal!(
221            table_scan,
222            @ "TableScan: test projection=[a], full_filters=[Boolean(true)]"
223        )
224    }
225
226    #[test]
227    fn test_simplify_filter_pushdown() -> Result<()> {
228        let table_scan = test_table_scan();
229        let plan = LogicalPlanBuilder::from(table_scan)
230            .project(vec![col("a")])?
231            .filter(and(col("b").gt(lit(1)), col("b").gt(lit(1))))?
232            .build()?;
233
234        assert_optimized_plan_equal!(
235            plan,
236            @ r"
237        Filter: test.b > Int32(1)
238          Projection: test.a
239            TableScan: test
240        "
241        )
242    }
243
244    #[test]
245    fn test_simplify_optimized_plan() -> Result<()> {
246        let table_scan = test_table_scan();
247        let plan = LogicalPlanBuilder::from(table_scan)
248            .project(vec![col("a")])?
249            .filter(and(col("b").gt(lit(1)), col("b").gt(lit(1))))?
250            .build()?;
251
252        assert_optimized_plan_equal!(
253            plan,
254            @ r"
255        Filter: test.b > Int32(1)
256          Projection: test.a
257            TableScan: test
258        "
259        )
260    }
261
262    #[test]
263    fn test_simplify_optimized_plan_with_or() -> Result<()> {
264        let table_scan = test_table_scan();
265        let plan = LogicalPlanBuilder::from(table_scan)
266            .project(vec![col("a")])?
267            .filter(or(col("b").gt(lit(1)), col("b").gt(lit(1))))?
268            .build()?;
269
270        assert_optimized_plan_equal!(
271            plan,
272            @ r"
273        Filter: test.b > Int32(1)
274          Projection: test.a
275            TableScan: test
276        "
277        )
278    }
279
280    #[test]
281    fn test_simplify_optimized_plan_with_composed_and() -> Result<()> {
282        let table_scan = test_table_scan();
283        // ((c > 5) AND (d < 6)) AND (c > 5) --> (c > 5) AND (d < 6)
284        let plan = LogicalPlanBuilder::from(table_scan)
285            .project(vec![col("a"), col("b")])?
286            .filter(and(
287                and(col("a").gt(lit(5)), col("b").lt(lit(6))),
288                col("a").gt(lit(5)),
289            ))?
290            .build()?;
291
292        assert_optimized_plan_equal!(
293            plan,
294            @ r"
295        Filter: test.a > Int32(5) AND test.b < Int32(6)
296          Projection: test.a, test.b
297            TableScan: test
298        "
299        )
300    }
301
302    #[test]
303    fn test_simplify_optimized_plan_eq_expr() -> Result<()> {
304        let table_scan = test_table_scan();
305        let plan = LogicalPlanBuilder::from(table_scan)
306            .filter(col("b").eq(lit(true)))?
307            .filter(col("c").eq(lit(false)))?
308            .project(vec![col("a")])?
309            .build()?;
310
311        assert_optimized_plan_equal!(
312            plan,
313            @ r"
314        Projection: test.a
315          Filter: NOT test.c
316            Filter: test.b
317              TableScan: test
318        "
319        )
320    }
321
322    #[test]
323    fn test_simplify_optimized_plan_not_eq_expr() -> Result<()> {
324        let table_scan = test_table_scan();
325        let plan = LogicalPlanBuilder::from(table_scan)
326            .filter(col("b").not_eq(lit(true)))?
327            .filter(col("c").not_eq(lit(false)))?
328            .limit(0, Some(1))?
329            .project(vec![col("a")])?
330            .build()?;
331
332        assert_optimized_plan_equal!(
333            plan,
334            @ r"
335        Projection: test.a
336          Limit: skip=0, fetch=1
337            Filter: test.c
338              Filter: NOT test.b
339                TableScan: test
340        "
341        )
342    }
343
344    #[test]
345    fn test_simplify_optimized_plan_and_expr() -> Result<()> {
346        let table_scan = test_table_scan();
347        let plan = LogicalPlanBuilder::from(table_scan)
348            .filter(col("b").not_eq(lit(true)).and(col("c").eq(lit(true))))?
349            .project(vec![col("a")])?
350            .build()?;
351
352        assert_optimized_plan_equal!(
353            plan,
354            @ r"
355        Projection: test.a
356          Filter: NOT test.b AND test.c
357            TableScan: test
358        "
359        )
360    }
361
362    #[test]
363    fn test_simplify_optimized_plan_or_expr() -> Result<()> {
364        let table_scan = test_table_scan();
365        let plan = LogicalPlanBuilder::from(table_scan)
366            .filter(col("b").not_eq(lit(true)).or(col("c").eq(lit(false))))?
367            .project(vec![col("a")])?
368            .build()?;
369
370        assert_optimized_plan_equal!(
371            plan,
372            @ r"
373        Projection: test.a
374          Filter: NOT test.b OR NOT test.c
375            TableScan: test
376        "
377        )
378    }
379
380    #[test]
381    fn test_simplify_optimized_plan_not_expr() -> Result<()> {
382        let table_scan = test_table_scan();
383        let plan = LogicalPlanBuilder::from(table_scan)
384            .filter(col("b").eq(lit(false)).not())?
385            .project(vec![col("a")])?
386            .build()?;
387
388        assert_optimized_plan_equal!(
389            plan,
390            @ r"
391        Projection: test.a
392          Filter: test.b
393            TableScan: test
394        "
395        )
396    }
397
398    #[test]
399    fn test_simplify_optimized_plan_support_projection() -> Result<()> {
400        let table_scan = test_table_scan();
401        let plan = LogicalPlanBuilder::from(table_scan)
402            .project(vec![col("a"), col("d"), col("b").eq(lit(false))])?
403            .build()?;
404
405        assert_optimized_plan_equal!(
406            plan,
407            @ r"
408        Projection: test.a, test.d, NOT test.b AS test.b = Boolean(false)
409          TableScan: test
410        "
411        )
412    }
413
414    #[test]
415    fn test_simplify_optimized_plan_support_aggregate() -> Result<()> {
416        let table_scan = test_table_scan();
417        let plan = LogicalPlanBuilder::from(table_scan)
418            .project(vec![col("a"), col("c"), col("b")])?
419            .aggregate(
420                vec![col("a"), col("c")],
421                vec![max(col("b").eq(lit(true))), min(col("b"))],
422            )?
423            .build()?;
424
425        assert_optimized_plan_equal!(
426            plan,
427            @ r"
428        Aggregate: groupBy=[[test.a, test.c]], aggr=[[max(test.b) AS max(test.b = Boolean(true)), min(test.b)]]
429          Projection: test.a, test.c, test.b
430            TableScan: test
431        "
432        )
433    }
434
435    #[test]
436    fn test_simplify_optimized_plan_support_values() -> Result<()> {
437        let expr1 = Expr::BinaryExpr(BinaryExpr::new(
438            Box::new(lit(1)),
439            Operator::Plus,
440            Box::new(lit(2)),
441        ));
442        let expr2 = Expr::BinaryExpr(BinaryExpr::new(
443            Box::new(lit(2)),
444            Operator::Minus,
445            Box::new(lit(1)),
446        ));
447        let values = vec![vec![expr1, expr2]];
448        let plan = LogicalPlanBuilder::values(values)?.build()?;
449
450        assert_optimized_plan_equal!(
451            plan,
452            @ "Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))"
453        )
454    }
455
456    fn get_optimized_plan_formatted(
457        plan: LogicalPlan,
458        date_time: &DateTime<Utc>,
459    ) -> String {
460        let config = OptimizerContext::new().with_query_execution_start_time(*date_time);
461        let rule = SimplifyExpressions::new();
462
463        let optimized_plan = rule.rewrite(plan, &config).unwrap().data;
464        format!("{optimized_plan}")
465    }
466
467    #[test]
468    fn cast_expr() -> Result<()> {
469        let table_scan = test_table_scan();
470        let proj = vec![Expr::Cast(Cast::new(Box::new(lit("0")), DataType::Int32))];
471        let plan = LogicalPlanBuilder::from(table_scan)
472            .project(proj)?
473            .build()?;
474
475        let expected = "Projection: Int32(0) AS Utf8(\"0\")\
476            \n  TableScan: test";
477        let actual = get_optimized_plan_formatted(plan, &Utc::now());
478        assert_eq!(expected, actual);
479        Ok(())
480    }
481
482    #[test]
483    fn simplify_and_eval() -> Result<()> {
484        // demonstrate a case where the evaluation needs to run prior
485        // to the simplifier for it to work
486        let table_scan = test_table_scan();
487        let time = Utc::now();
488        // (true or false) != col --> !col
489        let proj = vec![lit(true).or(lit(false)).not_eq(col("a"))];
490        let plan = LogicalPlanBuilder::from(table_scan)
491            .project(proj)?
492            .build()?;
493
494        let actual = get_optimized_plan_formatted(plan, &time);
495        let expected = "Projection: NOT test.a AS Boolean(true) OR Boolean(false) != test.a\
496                        \n  TableScan: test";
497
498        assert_eq!(expected, actual);
499        Ok(())
500    }
501
502    #[test]
503    fn simplify_not_binary() -> Result<()> {
504        let table_scan = test_table_scan();
505
506        let plan = LogicalPlanBuilder::from(table_scan)
507            .filter(col("d").gt(lit(10)).not())?
508            .build()?;
509
510        assert_optimized_plan_equal!(
511            plan,
512            @ r"
513        Filter: test.d <= Int32(10)
514          TableScan: test
515        "
516        )
517    }
518
519    #[test]
520    fn simplify_not_bool_and() -> Result<()> {
521        let table_scan = test_table_scan();
522
523        let plan = LogicalPlanBuilder::from(table_scan)
524            .filter(col("d").gt(lit(10)).and(col("d").lt(lit(100))).not())?
525            .build()?;
526
527        assert_optimized_plan_equal!(
528            plan,
529            @ r"
530        Filter: test.d <= Int32(10) OR test.d >= Int32(100)
531          TableScan: test
532        "
533        )
534    }
535
536    #[test]
537    fn simplify_not_bool_or() -> Result<()> {
538        let table_scan = test_table_scan();
539
540        let plan = LogicalPlanBuilder::from(table_scan)
541            .filter(col("d").gt(lit(10)).or(col("d").lt(lit(100))).not())?
542            .build()?;
543
544        assert_optimized_plan_equal!(
545            plan,
546            @ r"
547        Filter: test.d <= Int32(10) AND test.d >= Int32(100)
548          TableScan: test
549        "
550        )
551    }
552
553    #[test]
554    fn simplify_not_not() -> Result<()> {
555        let table_scan = test_table_scan();
556
557        let plan = LogicalPlanBuilder::from(table_scan)
558            .filter(col("d").gt(lit(10)).not().not())?
559            .build()?;
560
561        assert_optimized_plan_equal!(
562            plan,
563            @ r"
564        Filter: test.d > Int32(10)
565          TableScan: test
566        "
567        )
568    }
569
570    #[test]
571    fn simplify_not_null() -> Result<()> {
572        let table_scan = test_table_scan();
573
574        let plan = LogicalPlanBuilder::from(table_scan)
575            .filter(col("e").is_null().not())?
576            .build()?;
577
578        assert_optimized_plan_equal!(
579            plan,
580            @ r"
581        Filter: test.e IS NOT NULL
582          TableScan: test
583        "
584        )
585    }
586
587    #[test]
588    fn simplify_not_not_null() -> Result<()> {
589        let table_scan = test_table_scan();
590
591        let plan = LogicalPlanBuilder::from(table_scan)
592            .filter(col("e").is_not_null().not())?
593            .build()?;
594
595        assert_optimized_plan_equal!(
596            plan,
597            @ r"
598        Filter: test.e IS NULL
599          TableScan: test
600        "
601        )
602    }
603
604    #[test]
605    fn simplify_not_in() -> Result<()> {
606        let table_scan = test_table_scan();
607
608        let plan = LogicalPlanBuilder::from(table_scan)
609            .filter(col("d").in_list(vec![lit(1), lit(2), lit(3)], false).not())?
610            .build()?;
611
612        assert_optimized_plan_equal!(
613            plan,
614            @ r"
615        Filter: test.d != Int32(1) AND test.d != Int32(2) AND test.d != Int32(3)
616          TableScan: test
617        "
618        )
619    }
620
621    #[test]
622    fn simplify_not_not_in() -> Result<()> {
623        let table_scan = test_table_scan();
624
625        let plan = LogicalPlanBuilder::from(table_scan)
626            .filter(col("d").in_list(vec![lit(1), lit(2), lit(3)], true).not())?
627            .build()?;
628
629        assert_optimized_plan_equal!(
630            plan,
631            @ r"
632        Filter: test.d = Int32(1) OR test.d = Int32(2) OR test.d = Int32(3)
633          TableScan: test
634        "
635        )
636    }
637
638    #[test]
639    fn simplify_not_between() -> Result<()> {
640        let table_scan = test_table_scan();
641        let qual = col("d").between(lit(1), lit(10));
642
643        let plan = LogicalPlanBuilder::from(table_scan)
644            .filter(qual.not())?
645            .build()?;
646
647        assert_optimized_plan_equal!(
648            plan,
649            @ r"
650        Filter: test.d < Int32(1) OR test.d > Int32(10)
651          TableScan: test
652        "
653        )
654    }
655
656    #[test]
657    fn simplify_not_not_between() -> Result<()> {
658        let table_scan = test_table_scan();
659        let qual = col("d").not_between(lit(1), lit(10));
660
661        let plan = LogicalPlanBuilder::from(table_scan)
662            .filter(qual.not())?
663            .build()?;
664
665        assert_optimized_plan_equal!(
666            plan,
667            @ r"
668        Filter: test.d >= Int32(1) AND test.d <= Int32(10)
669          TableScan: test
670        "
671        )
672    }
673
674    #[test]
675    fn simplify_not_like() -> Result<()> {
676        let schema = Schema::new(vec![
677            Field::new("a", DataType::Utf8, false),
678            Field::new("b", DataType::Utf8, false),
679        ]);
680        let table_scan = table_scan(Some("test"), &schema, None)
681            .expect("creating scan")
682            .build()
683            .expect("building plan");
684
685        let plan = LogicalPlanBuilder::from(table_scan)
686            .filter(col("a").like(col("b")).not())?
687            .build()?;
688
689        assert_optimized_plan_equal!(
690            plan,
691            @ r"
692        Filter: test.a NOT LIKE test.b
693          TableScan: test
694        "
695        )
696    }
697
698    #[test]
699    fn simplify_not_not_like() -> Result<()> {
700        let schema = Schema::new(vec![
701            Field::new("a", DataType::Utf8, false),
702            Field::new("b", DataType::Utf8, false),
703        ]);
704        let table_scan = table_scan(Some("test"), &schema, None)
705            .expect("creating scan")
706            .build()
707            .expect("building plan");
708
709        let plan = LogicalPlanBuilder::from(table_scan)
710            .filter(col("a").not_like(col("b")).not())?
711            .build()?;
712
713        assert_optimized_plan_equal!(
714            plan,
715            @ r"
716        Filter: test.a LIKE test.b
717          TableScan: test
718        "
719        )
720    }
721
722    #[test]
723    fn simplify_not_ilike() -> Result<()> {
724        let schema = Schema::new(vec![
725            Field::new("a", DataType::Utf8, false),
726            Field::new("b", DataType::Utf8, false),
727        ]);
728        let table_scan = table_scan(Some("test"), &schema, None)
729            .expect("creating scan")
730            .build()
731            .expect("building plan");
732
733        let plan = LogicalPlanBuilder::from(table_scan)
734            .filter(col("a").ilike(col("b")).not())?
735            .build()?;
736
737        assert_optimized_plan_equal!(
738            plan,
739            @ r"
740        Filter: test.a NOT ILIKE test.b
741          TableScan: test
742        "
743        )
744    }
745
746    #[test]
747    fn simplify_not_distinct_from() -> Result<()> {
748        let table_scan = test_table_scan();
749
750        let plan = LogicalPlanBuilder::from(table_scan)
751            .filter(binary_expr(col("d"), Operator::IsDistinctFrom, lit(10)).not())?
752            .build()?;
753
754        assert_optimized_plan_equal!(
755            plan,
756            @ r"
757        Filter: test.d IS NOT DISTINCT FROM Int32(10)
758          TableScan: test
759        "
760        )
761    }
762
763    #[test]
764    fn simplify_not_not_distinct_from() -> Result<()> {
765        let table_scan = test_table_scan();
766
767        let plan = LogicalPlanBuilder::from(table_scan)
768            .filter(binary_expr(col("d"), Operator::IsNotDistinctFrom, lit(10)).not())?
769            .build()?;
770
771        assert_optimized_plan_equal!(
772            plan,
773            @ r"
774        Filter: test.d IS DISTINCT FROM Int32(10)
775          TableScan: test
776        "
777        )
778    }
779
780    #[test]
781    fn simplify_equijoin_predicate() -> Result<()> {
782        let t1 = test_table_scan_with_name("t1")?;
783        let t2 = test_table_scan_with_name("t2")?;
784
785        let left_key = col("t1.a") + lit(1i64).cast_to(&DataType::UInt32, t1.schema())?;
786        let right_key =
787            col("t2.a") + lit(2i64).cast_to(&DataType::UInt32, t2.schema())?;
788        let plan = LogicalPlanBuilder::from(t1)
789            .join_with_expr_keys(
790                t2,
791                JoinType::Inner,
792                (vec![left_key], vec![right_key]),
793                None,
794            )?
795            .build()?;
796
797        // before simplify: t1.a + CAST(Int64(1), UInt32) = t2.a + CAST(Int64(2), UInt32)
798        // after simplify: t1.a + UInt32(1) = t2.a + UInt32(2) AS t1.a + Int64(1) = t2.a + Int64(2)
799        assert_optimized_plan_equal!(
800            plan,
801            @ r"
802        Inner Join: t1.a + UInt32(1) = t2.a + UInt32(2)
803          TableScan: t1
804          TableScan: t2
805        "
806        )
807    }
808
809    #[test]
810    fn simplify_is_not_null() -> Result<()> {
811        let table_scan = test_table_scan();
812
813        let plan = LogicalPlanBuilder::from(table_scan)
814            .filter(col("d").is_not_null())?
815            .build()?;
816
817        assert_optimized_plan_equal!(
818            plan,
819            @ r"
820        Filter: Boolean(true)
821          TableScan: test
822        "
823        )
824    }
825
826    #[test]
827    fn simplify_is_null() -> Result<()> {
828        let table_scan = test_table_scan();
829
830        let plan = LogicalPlanBuilder::from(table_scan)
831            .filter(col("d").is_null())?
832            .build()?;
833
834        assert_optimized_plan_equal!(
835            plan,
836            @ r"
837        Filter: Boolean(false)
838          TableScan: test
839        "
840        )
841    }
842
843    #[test]
844    fn simplify_grouping_sets() -> Result<()> {
845        let table_scan = test_table_scan();
846        let plan = LogicalPlanBuilder::from(table_scan)
847            .aggregate(
848                [grouping_set(vec![
849                    vec![(lit(42).alias("prev") + lit(1)).alias("age"), col("a")],
850                    vec![col("a").or(col("b")).and(lit(1).lt(lit(0))).alias("cond")],
851                    vec![col("d").alias("e"), (lit(1) + lit(2))],
852                ])],
853                [] as [Expr; 0],
854            )?
855            .build()?;
856
857        assert_optimized_plan_equal!(
858            plan,
859            @ r"
860        Aggregate: groupBy=[[GROUPING SETS ((Int32(43) AS age, test.a), (Boolean(false) AS cond), (test.d AS e, Int32(3) AS Int32(1) + Int32(2)))]], aggr=[[]]
861          TableScan: test
862        "
863        )
864    }
865
866    #[test]
867    fn test_simplify_regex_special_cases() -> Result<()> {
868        let schema = Schema::new(vec![
869            Field::new("a", DataType::Utf8, true),
870            Field::new("b", DataType::Utf8, false),
871        ]);
872        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
873
874        // Test `= ".*"` transforms to true (except for empty strings)
875        let plan = LogicalPlanBuilder::from(table_scan.clone())
876            .filter(binary_expr(col("a"), Operator::RegexMatch, lit(".*")))?
877            .build()?;
878
879        assert_optimized_plan_equal!(
880            plan,
881            @ r"
882        Filter: test.a IS NOT NULL
883          TableScan: test
884        "
885        )?;
886
887        // Test `!= ".*"` transforms to checking if the column is empty
888        let plan = LogicalPlanBuilder::from(table_scan.clone())
889            .filter(binary_expr(col("a"), Operator::RegexNotMatch, lit(".*")))?
890            .build()?;
891
892        assert_optimized_plan_equal!(
893            plan,
894            @ r#"
895        Filter: test.a = Utf8("")
896          TableScan: test
897        "#
898        )?;
899
900        // Test case-insensitive versions
901
902        // Test `=~ ".*"` (case-insensitive) transforms to true (except for empty strings)
903        let plan = LogicalPlanBuilder::from(table_scan.clone())
904            .filter(binary_expr(col("b"), Operator::RegexIMatch, lit(".*")))?
905            .build()?;
906
907        assert_optimized_plan_equal!(
908            plan,
909            @ r"
910        Filter: Boolean(true)
911          TableScan: test
912        "
913        )?;
914
915        // Test `!~ ".*"` (case-insensitive) transforms to checking if the column is empty
916        let plan = LogicalPlanBuilder::from(table_scan.clone())
917            .filter(binary_expr(col("a"), Operator::RegexNotIMatch, lit(".*")))?
918            .build()?;
919
920        assert_optimized_plan_equal!(
921            plan,
922            @ r#"
923        Filter: test.a = Utf8("")
924          TableScan: test
925        "#
926        )
927    }
928
929    #[test]
930    fn simplify_not_in_list() -> Result<()> {
931        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
932        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
933
934        let plan = LogicalPlanBuilder::from(table_scan)
935            .filter(col("a").in_list(vec![lit("a"), lit("b")], false).not())?
936            .build()?;
937
938        assert_optimized_plan_equal!(
939            plan,
940            @ r#"
941        Filter: test.a != Utf8("a") AND test.a != Utf8("b")
942          TableScan: test
943        "#
944        )
945    }
946
947    #[test]
948    fn simplify_not_not_in_list() -> Result<()> {
949        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
950        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
951
952        let plan = LogicalPlanBuilder::from(table_scan)
953            .filter(
954                col("a")
955                    .in_list(vec![lit("a"), lit("b")], false)
956                    .not()
957                    .not(),
958            )?
959            .build()?;
960
961        assert_optimized_plan_equal!(
962            plan,
963            @ r#"
964        Filter: test.a = Utf8("a") OR test.a = Utf8("b")
965          TableScan: test
966        "#
967        )
968    }
969
970    #[test]
971    fn simplify_not_exists() -> Result<()> {
972        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
973        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
974        let table_scan2 =
975            datafusion_expr::table_scan(Some("test2"), &schema, None)?.build()?;
976
977        let plan = LogicalPlanBuilder::from(table_scan)
978            .filter(
979                exists(Arc::new(LogicalPlanBuilder::from(table_scan2).build()?)).not(),
980            )?
981            .build()?;
982
983        assert_optimized_plan_equal!(
984            plan,
985            @ r"
986        Filter: NOT EXISTS (<subquery>)
987          Subquery:
988            TableScan: test2
989          TableScan: test
990        "
991        )
992    }
993
994    #[test]
995    fn simplify_not_not_exists() -> Result<()> {
996        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
997        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
998        let table_scan2 =
999            datafusion_expr::table_scan(Some("test2"), &schema, None)?.build()?;
1000
1001        let plan = LogicalPlanBuilder::from(table_scan)
1002            .filter(
1003                exists(Arc::new(LogicalPlanBuilder::from(table_scan2).build()?))
1004                    .not()
1005                    .not(),
1006            )?
1007            .build()?;
1008
1009        assert_optimized_plan_equal!(
1010            plan,
1011            @ r"
1012        Filter: EXISTS (<subquery>)
1013          Subquery:
1014            TableScan: test2
1015          TableScan: test
1016        "
1017        )
1018    }
1019
1020    #[test]
1021    fn simplify_not_in_subquery() -> Result<()> {
1022        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
1023        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
1024        let table_scan2 =
1025            datafusion_expr::table_scan(Some("test2"), &schema, None)?.build()?;
1026
1027        let plan = LogicalPlanBuilder::from(table_scan)
1028            .filter(
1029                in_subquery(
1030                    col("a"),
1031                    Arc::new(LogicalPlanBuilder::from(table_scan2).build()?),
1032                )
1033                .not(),
1034            )?
1035            .build()?;
1036
1037        assert_optimized_plan_equal!(
1038            plan,
1039            @ r"
1040        Filter: test.a NOT IN (<subquery>)
1041          Subquery:
1042            TableScan: test2
1043          TableScan: test
1044        "
1045        )
1046    }
1047
1048    #[test]
1049    fn simplify_not_not_in_subquery() -> Result<()> {
1050        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
1051        let table_scan = table_scan(Some("test"), &schema, None)?.build()?;
1052        let table_scan2 =
1053            datafusion_expr::table_scan(Some("test2"), &schema, None)?.build()?;
1054
1055        let plan = LogicalPlanBuilder::from(table_scan)
1056            .filter(
1057                in_subquery(
1058                    col("a"),
1059                    Arc::new(LogicalPlanBuilder::from(table_scan2).build()?),
1060                )
1061                .not()
1062                .not(),
1063            )?
1064            .build()?;
1065
1066        assert_optimized_plan_equal!(
1067            plan,
1068            @ r"
1069        Filter: test.a IN (<subquery>)
1070          Subquery:
1071            TableScan: test2
1072          TableScan: test
1073        "
1074        )
1075    }
1076}