Skip to main content

datafusion_physical_expr/expressions/
binary.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod kernels;
19
20use crate::PhysicalExpr;
21use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
22use std::hash::Hash;
23use std::sync::Arc;
24
25use arrow::array::*;
26use arrow::compute::kernels::boolean::{and_kleene, or_kleene};
27use arrow::compute::kernels::concat_elements::{
28    concat_element_binary, concat_elements_utf8,
29};
30use arrow::compute::{SlicesIterator, cast, filter_record_batch};
31use arrow::datatypes::*;
32use arrow::error::ArrowError;
33use datafusion_common::cast::as_boolean_array;
34use datafusion_common::{Result, ScalarValue, internal_err, not_impl_err};
35
36use datafusion_expr::binary::BinaryTypeCoercer;
37use datafusion_expr::interval_arithmetic::{Interval, apply_operator};
38use datafusion_expr::sort_properties::ExprProperties;
39#[expect(deprecated)]
40use datafusion_expr::statistics::Distribution::{Bernoulli, Gaussian};
41#[expect(deprecated)]
42use datafusion_expr::statistics::{
43    Distribution, combine_bernoullis, combine_gaussians,
44    create_bernoulli_from_comparison, new_generic_from_binary_op,
45};
46use datafusion_expr::{ColumnarValue, Operator};
47use datafusion_physical_expr_common::datum::{apply, apply_cmp};
48
49use kernels::{
50    bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
51    bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
52    bitwise_shift_right_dyn_scalar, bitwise_xor_dyn, bitwise_xor_dyn_scalar,
53    concat_elements_binary_view_array, concat_elements_utf8view, regex_match_dyn,
54    regex_match_dyn_scalar,
55};
56
57/// Binary expression
58#[derive(Debug, Clone, Eq)]
59pub struct BinaryExpr {
60    left: Arc<dyn PhysicalExpr>,
61    op: Operator,
62    right: Arc<dyn PhysicalExpr>,
63    /// Specifies whether an error is returned on overflow or not
64    fail_on_overflow: bool,
65}
66
67// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808
68impl PartialEq for BinaryExpr {
69    fn eq(&self, other: &Self) -> bool {
70        self.left.eq(&other.left)
71            && self.op.eq(&other.op)
72            && self.right.eq(&other.right)
73            && self.fail_on_overflow.eq(&other.fail_on_overflow)
74    }
75}
76impl Hash for BinaryExpr {
77    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
78        self.left.hash(state);
79        self.op.hash(state);
80        self.right.hash(state);
81        self.fail_on_overflow.hash(state);
82    }
83}
84
85impl BinaryExpr {
86    /// Create new binary expression
87    pub fn new(
88        left: Arc<dyn PhysicalExpr>,
89        op: Operator,
90        right: Arc<dyn PhysicalExpr>,
91    ) -> Self {
92        Self {
93            left,
94            op,
95            right,
96            fail_on_overflow: false,
97        }
98    }
99
100    /// Create new binary expression with explicit fail_on_overflow value
101    pub fn with_fail_on_overflow(self, fail_on_overflow: bool) -> Self {
102        Self {
103            left: self.left,
104            op: self.op,
105            right: self.right,
106            fail_on_overflow,
107        }
108    }
109
110    /// Get the left side of the binary expression
111    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
112        &self.left
113    }
114
115    /// Get the right side of the binary expression
116    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
117        &self.right
118    }
119
120    /// Get the operator for this binary expression
121    pub fn op(&self) -> &Operator {
122        &self.op
123    }
124}
125
126impl std::fmt::Display for BinaryExpr {
127    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
128        // Put parentheses around child binary expressions so that we can see the difference
129        // between `(a OR b) AND c` and `a OR (b AND c)`. We only insert parentheses when needed,
130        // based on operator precedence. For example, `(a AND b) OR c` and `a AND b OR c` are
131        // equivalent and the parentheses are not necessary.
132
133        fn write_child(
134            f: &mut std::fmt::Formatter,
135            expr: &dyn PhysicalExpr,
136            precedence: u8,
137        ) -> std::fmt::Result {
138            if let Some(child) = expr.downcast_ref::<BinaryExpr>() {
139                let p = child.op.precedence();
140                if p == 0 || p < precedence {
141                    write!(f, "({child})")?;
142                } else {
143                    write!(f, "{child}")?;
144                }
145            } else {
146                write!(f, "{expr}")?;
147            }
148
149            Ok(())
150        }
151
152        let precedence = self.op.precedence();
153        write_child(f, self.left.as_ref(), precedence)?;
154        write!(f, " {} ", self.op)?;
155        write_child(f, self.right.as_ref(), precedence)
156    }
157}
158
159/// Invoke a boolean kernel on a pair of arrays
160#[inline]
161fn boolean_op(
162    left: &dyn Array,
163    right: &dyn Array,
164    op: impl FnOnce(&BooleanArray, &BooleanArray) -> Result<BooleanArray, ArrowError>,
165) -> Result<Arc<dyn Array + 'static>, ArrowError> {
166    let ll = as_boolean_array(left).expect("boolean_op failed to downcast left array");
167    let rr = as_boolean_array(right).expect("boolean_op failed to downcast right array");
168    op(ll, rr).map(|t| Arc::new(t) as _)
169}
170
171/// Returns true if both operands are Date types (Date32 or Date64)
172/// Used to detect Date - Date operations which should return Int64 (days difference)
173fn is_date_minus_date(lhs: &DataType, rhs: &DataType) -> bool {
174    matches!(
175        (lhs, rhs),
176        (DataType::Date32, DataType::Date32) | (DataType::Date64, DataType::Date64)
177    )
178}
179
180/// Computes the difference between two dates and returns the result as Int64 (days)
181/// This aligns with PostgreSQL, DuckDB, and MySQL behavior where date - date returns an integer
182///
183/// Implementation: Uses Arrow's sub_wrapping to get Duration, then converts to Int64 days
184fn apply_date_subtraction(
185    lhs: &ColumnarValue,
186    rhs: &ColumnarValue,
187) -> Result<ColumnarValue> {
188    use arrow::compute::kernels::numeric::sub_wrapping;
189
190    // Use Arrow's sub_wrapping to compute the Duration result
191    let duration_result = apply(lhs, rhs, sub_wrapping)?;
192
193    // Convert Duration to Int64 (days)
194    match duration_result {
195        ColumnarValue::Array(array) => {
196            let int64_array = duration_to_days(&array)?;
197            Ok(ColumnarValue::Array(int64_array))
198        }
199        ColumnarValue::Scalar(scalar) => {
200            // Convert scalar Duration to Int64 days
201            let array = scalar.to_array_of_size(1)?;
202            let int64_array = duration_to_days(&array)?;
203            let int64_scalar = ScalarValue::try_from_array(int64_array.as_ref(), 0)?;
204            Ok(ColumnarValue::Scalar(int64_scalar))
205        }
206    }
207}
208
209/// Converts a Duration array to Int64 days
210/// Handles different Duration time units (Second, Millisecond, Microsecond, Nanosecond)
211fn duration_to_days(array: &ArrayRef) -> Result<ArrayRef> {
212    use datafusion_common::cast::{
213        as_duration_microsecond_array, as_duration_millisecond_array,
214        as_duration_nanosecond_array, as_duration_second_array,
215    };
216
217    const SECONDS_PER_DAY: i64 = 86_400;
218    const MILLIS_PER_DAY: i64 = 86_400_000;
219    const MICROS_PER_DAY: i64 = 86_400_000_000;
220    const NANOS_PER_DAY: i64 = 86_400_000_000_000;
221
222    match array.data_type() {
223        DataType::Duration(TimeUnit::Second) => {
224            let duration_array = as_duration_second_array(array)?;
225            let result: Int64Array = duration_array
226                .iter()
227                .map(|v| v.map(|val| val / SECONDS_PER_DAY))
228                .collect();
229            Ok(Arc::new(result))
230        }
231        DataType::Duration(TimeUnit::Millisecond) => {
232            let duration_array = as_duration_millisecond_array(array)?;
233            let result: Int64Array = duration_array
234                .iter()
235                .map(|v| v.map(|val| val / MILLIS_PER_DAY))
236                .collect();
237            Ok(Arc::new(result))
238        }
239        DataType::Duration(TimeUnit::Microsecond) => {
240            let duration_array = as_duration_microsecond_array(array)?;
241            let result: Int64Array = duration_array
242                .iter()
243                .map(|v| v.map(|val| val / MICROS_PER_DAY))
244                .collect();
245            Ok(Arc::new(result))
246        }
247        DataType::Duration(TimeUnit::Nanosecond) => {
248            let duration_array = as_duration_nanosecond_array(array)?;
249            let result: Int64Array = duration_array
250                .iter()
251                .map(|v| v.map(|val| val / NANOS_PER_DAY))
252                .collect();
253            Ok(Arc::new(result))
254        }
255        other => internal_err!("duration_to_days expected Duration type, got: {}", other),
256    }
257}
258
259impl PhysicalExpr for BinaryExpr {
260    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
261        BinaryTypeCoercer::new(
262            &self.left.data_type(input_schema)?,
263            &self.op,
264            &self.right.data_type(input_schema)?,
265        )
266        .get_result_type()
267    }
268
269    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
270        Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
271    }
272
273    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
274        use arrow::compute::kernels::numeric::*;
275
276        // Evaluate left-hand side expression.
277        let lhs = self.left.evaluate(batch)?;
278
279        // Check if we can apply short-circuit evaluation.
280        match check_short_circuit(&lhs, &self.op) {
281            ShortCircuitStrategy::None => {}
282            ShortCircuitStrategy::ReturnLeft => return Ok(lhs),
283            ShortCircuitStrategy::ReturnRight => {
284                let rhs = self.right.evaluate(batch)?;
285                return Ok(rhs);
286            }
287            ShortCircuitStrategy::PreSelection(selection) => {
288                // The function `evaluate_selection` was not called for filtering and calculation,
289                // as it takes into account cases where the selection contains null values.
290                let batch = filter_record_batch(batch, selection)?;
291                let right_ret = self.right.evaluate(&batch)?;
292
293                match &right_ret {
294                    ColumnarValue::Array(array) => {
295                        // When the array on the right is all true or all false, skip the scatter process
296                        let boolean_array = array.as_boolean();
297                        if boolean_array.null_count() == 0 && !boolean_array.has_false() {
298                            return Ok(lhs);
299                        } else if boolean_array.null_count() == 0
300                            && !boolean_array.has_true()
301                        {
302                            // If the right-hand array is returned at this point,the lengths will be inconsistent;
303                            // returning a scalar can avoid this issue
304                            return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(
305                                Some(false),
306                            )));
307                        }
308
309                        return pre_selection_scatter(selection, Some(boolean_array));
310                    }
311                    ColumnarValue::Scalar(scalar) => {
312                        if let ScalarValue::Boolean(v) = scalar {
313                            // When the scalar is true or false, skip the scatter process
314                            if let Some(v) = v {
315                                if *v {
316                                    return Ok(lhs);
317                                } else {
318                                    return Ok(right_ret);
319                                }
320                            } else {
321                                return pre_selection_scatter(selection, None);
322                            }
323                        } else {
324                            return internal_err!(
325                                "Expected boolean scalar value, found: {right_ret:?}"
326                            );
327                        }
328                    }
329                }
330            }
331        }
332
333        let rhs = self.right.evaluate(batch)?;
334        let left_data_type = lhs.data_type();
335        let right_data_type = rhs.data_type();
336
337        let schema = batch.schema();
338        let input_schema = schema.as_ref();
339
340        match self.op {
341            Operator::Plus if self.fail_on_overflow => return apply(&lhs, &rhs, add),
342            Operator::Plus => return apply(&lhs, &rhs, add_wrapping),
343            // Special case: Date - Date returns Int64 (days difference)
344            // This aligns with PostgreSQL, DuckDB, and MySQL behavior
345            Operator::Minus if is_date_minus_date(&left_data_type, &right_data_type) => {
346                return apply_date_subtraction(&lhs, &rhs);
347            }
348            Operator::Minus if self.fail_on_overflow => return apply(&lhs, &rhs, sub),
349            Operator::Minus => return apply(&lhs, &rhs, sub_wrapping),
350            Operator::Multiply if self.fail_on_overflow => return apply(&lhs, &rhs, mul),
351            Operator::Multiply => return apply(&lhs, &rhs, mul_wrapping),
352            Operator::Divide => return apply(&lhs, &rhs, div),
353            Operator::Modulo => return apply(&lhs, &rhs, rem),
354
355            Operator::Eq
356            | Operator::NotEq
357            | Operator::Lt
358            | Operator::Gt
359            | Operator::LtEq
360            | Operator::GtEq
361            | Operator::IsDistinctFrom
362            | Operator::IsNotDistinctFrom
363            | Operator::LikeMatch
364            | Operator::ILikeMatch
365            | Operator::NotLikeMatch
366            | Operator::NotILikeMatch => {
367                return apply_cmp(self.op, &lhs, &rhs);
368            }
369            _ => {}
370        }
371
372        let result_type = self.data_type(input_schema)?;
373
374        // If the left-hand side is an array and the right-hand side is a non-null scalar, try the optimized kernel.
375        if let (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) = (&lhs, &rhs)
376            && !scalar.is_null()
377            && let Some(result_array) =
378                self.evaluate_array_scalar(array, scalar.clone())?
379        {
380            let final_array = result_array
381                .and_then(|a| to_result_type_array(&self.op, a, &result_type));
382            return final_array.map(ColumnarValue::Array);
383        }
384
385        // if both arrays or both literals - extract arrays and continue execution
386        let (left, right) = (
387            lhs.into_array(batch.num_rows())?,
388            rhs.into_array(batch.num_rows())?,
389        );
390        self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
391            .map(ColumnarValue::Array)
392    }
393
394    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
395        vec![&self.left, &self.right]
396    }
397
398    fn with_new_children(
399        self: Arc<Self>,
400        children: Vec<Arc<dyn PhysicalExpr>>,
401    ) -> Result<Arc<dyn PhysicalExpr>> {
402        Ok(Arc::new(
403            BinaryExpr::new(Arc::clone(&children[0]), self.op, Arc::clone(&children[1]))
404                .with_fail_on_overflow(self.fail_on_overflow),
405        ))
406    }
407
408    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
409        // Get children intervals:
410        let left_interval = children[0];
411        let right_interval = children[1];
412        // Calculate current node's interval:
413        apply_operator(&self.op, left_interval, right_interval)
414    }
415
416    fn propagate_constraints(
417        &self,
418        interval: &Interval,
419        children: &[&Interval],
420    ) -> Result<Option<Vec<Interval>>> {
421        // Get children intervals.
422        let left_interval = children[0];
423        let right_interval = children[1];
424
425        if self.op.eq(&Operator::And) {
426            if interval.eq(&Interval::TRUE) {
427                // A certainly true logical conjunction can only derive from possibly
428                // true operands. Otherwise, we prove infeasibility.
429                Ok((!left_interval.eq(&Interval::FALSE)
430                    && !right_interval.eq(&Interval::FALSE))
431                .then(|| vec![Interval::TRUE, Interval::TRUE]))
432            } else if interval.eq(&Interval::FALSE) {
433                // If the logical conjunction is certainly false, one of the
434                // operands must be false. However, it's not always possible to
435                // determine which operand is false, leading to different scenarios.
436
437                // If one operand is certainly true and the other one is uncertain,
438                // then the latter must be certainly false.
439                if left_interval.eq(&Interval::TRUE)
440                    && right_interval.eq(&Interval::TRUE_OR_FALSE)
441                {
442                    Ok(Some(vec![Interval::TRUE, Interval::FALSE]))
443                } else if right_interval.eq(&Interval::TRUE)
444                    && left_interval.eq(&Interval::TRUE_OR_FALSE)
445                {
446                    Ok(Some(vec![Interval::FALSE, Interval::TRUE]))
447                }
448                // If both children are uncertain, or if one is certainly false,
449                // we cannot conclusively refine their intervals. In this case,
450                // propagation does not result in any interval changes.
451                else {
452                    Ok(Some(vec![]))
453                }
454            } else {
455                // An uncertain logical conjunction result can not shrink the
456                // end-points of its children.
457                Ok(Some(vec![]))
458            }
459        } else if self.op.eq(&Operator::Or) {
460            if interval.eq(&Interval::FALSE) {
461                // A certainly false logical disjunction can only derive from certainly
462                // false operands. Otherwise, we prove infeasibility.
463                Ok((!left_interval.eq(&Interval::TRUE)
464                    && !right_interval.eq(&Interval::TRUE))
465                .then(|| vec![Interval::FALSE, Interval::FALSE]))
466            } else if interval.eq(&Interval::TRUE) {
467                // If the logical disjunction is certainly true, one of the
468                // operands must be true. However, it's not always possible to
469                // determine which operand is true, leading to different scenarios.
470
471                // If one operand is certainly false and the other one is uncertain,
472                // then the latter must be certainly true.
473                if left_interval.eq(&Interval::FALSE)
474                    && right_interval.eq(&Interval::TRUE_OR_FALSE)
475                {
476                    Ok(Some(vec![Interval::FALSE, Interval::TRUE]))
477                } else if right_interval.eq(&Interval::FALSE)
478                    && left_interval.eq(&Interval::TRUE_OR_FALSE)
479                {
480                    Ok(Some(vec![Interval::TRUE, Interval::FALSE]))
481                }
482                // If both children are uncertain, or if one is certainly true,
483                // we cannot conclusively refine their intervals. In this case,
484                // propagation does not result in any interval changes.
485                else {
486                    Ok(Some(vec![]))
487                }
488            } else {
489                // An uncertain logical disjunction result can not shrink the
490                // end-points of its children.
491                Ok(Some(vec![]))
492            }
493        } else if self.op.supports_propagation() {
494            Ok(
495                propagate_comparison(&self.op, interval, left_interval, right_interval)?
496                    .map(|(left, right)| vec![left, right]),
497            )
498        } else {
499            Ok(
500                propagate_arithmetic(&self.op, interval, left_interval, right_interval)?
501                    .map(|(left, right)| vec![left, right]),
502            )
503        }
504    }
505
506    #[expect(deprecated)]
507    fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
508        let (left, right) = (children[0], children[1]);
509
510        if self.op.is_numerical_operators() {
511            // We might be able to construct the output statistics more accurately,
512            // without falling back to an unknown distribution, if we are dealing
513            // with Gaussian distributions and numerical operations.
514            if let (Gaussian(left), Gaussian(right)) = (left, right)
515                && let Some(result) = combine_gaussians(&self.op, left, right)?
516            {
517                return Ok(Gaussian(result));
518            }
519        } else if self.op.is_logic_operator() {
520            // If we are dealing with logical operators, we expect (and can only
521            // operate on) Bernoulli distributions.
522            return if let (Bernoulli(left), Bernoulli(right)) = (left, right) {
523                combine_bernoullis(&self.op, left, right).map(Bernoulli)
524            } else {
525                internal_err!(
526                    "Logical operators are only compatible with `Bernoulli` distributions"
527                )
528            };
529        } else if self.op.supports_propagation() {
530            // If we are handling comparison operators, we expect (and can only
531            // operate on) numeric distributions.
532            return create_bernoulli_from_comparison(&self.op, left, right);
533        }
534        // Fall back to an unknown distribution with only summary statistics:
535        new_generic_from_binary_op(&self.op, left, right)
536    }
537
538    /// For each operator, [`BinaryExpr`] has distinct rules.
539    /// TODO: There may be rules specific to some data types and expression ranges.
540    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
541        let (l_order, l_range) = (children[0].sort_properties, &children[0].range);
542        let (r_order, r_range) = (children[1].sort_properties, &children[1].range);
543        match self.op() {
544            Operator::Plus => Ok(ExprProperties {
545                sort_properties: l_order.add(&r_order),
546                range: l_range.add(r_range)?,
547                preserves_lex_ordering: false,
548            }),
549            Operator::Minus => Ok(ExprProperties {
550                sort_properties: l_order.sub(&r_order),
551                range: l_range.sub(r_range)?,
552                preserves_lex_ordering: false,
553            }),
554            Operator::Gt => Ok(ExprProperties {
555                sort_properties: l_order.gt_or_gteq(&r_order),
556                range: l_range.gt(r_range)?,
557                preserves_lex_ordering: false,
558            }),
559            Operator::GtEq => Ok(ExprProperties {
560                sort_properties: l_order.gt_or_gteq(&r_order),
561                range: l_range.gt_eq(r_range)?,
562                preserves_lex_ordering: false,
563            }),
564            Operator::Lt => Ok(ExprProperties {
565                sort_properties: r_order.gt_or_gteq(&l_order),
566                range: l_range.lt(r_range)?,
567                preserves_lex_ordering: false,
568            }),
569            Operator::LtEq => Ok(ExprProperties {
570                sort_properties: r_order.gt_or_gteq(&l_order),
571                range: l_range.lt_eq(r_range)?,
572                preserves_lex_ordering: false,
573            }),
574            Operator::And => Ok(ExprProperties {
575                sort_properties: r_order.and_or(&l_order),
576                range: l_range.and(r_range)?,
577                preserves_lex_ordering: false,
578            }),
579            Operator::Or => Ok(ExprProperties {
580                sort_properties: r_order.and_or(&l_order),
581                range: l_range.or(r_range)?,
582                preserves_lex_ordering: false,
583            }),
584            _ => Ok(ExprProperties::new_unknown()),
585        }
586    }
587
588    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
589        fn write_child(
590            f: &mut std::fmt::Formatter,
591            expr: &dyn PhysicalExpr,
592            precedence: u8,
593        ) -> std::fmt::Result {
594            if let Some(child) = expr.downcast_ref::<BinaryExpr>() {
595                let p = child.op.precedence();
596                if p == 0 || p < precedence {
597                    write!(f, "(")?;
598                    child.fmt_sql(f)?;
599                    write!(f, ")")
600                } else {
601                    child.fmt_sql(f)
602                }
603            } else {
604                expr.fmt_sql(f)
605            }
606        }
607
608        let precedence = self.op.precedence();
609        write_child(f, self.left.as_ref(), precedence)?;
610        write!(f, " {} ", self.op)?;
611        write_child(f, self.right.as_ref(), precedence)
612    }
613}
614
615/// Casts dictionary array to result type for binary numerical operators. Such operators
616/// between array and scalar produce a dictionary array other than primitive array of the
617/// same operators between array and array. This leads to inconsistent result types causing
618/// errors in the following query execution. For such operators between array and scalar,
619/// we cast the dictionary array to primitive array.
620fn to_result_type_array(
621    op: &Operator,
622    array: ArrayRef,
623    result_type: &DataType,
624) -> Result<ArrayRef> {
625    if array.data_type() == result_type {
626        Ok(array)
627    } else if op.is_numerical_operators() {
628        match array.data_type() {
629            DataType::Dictionary(_, value_type) => {
630                if value_type.as_ref() == result_type {
631                    Ok(cast(&array, result_type)?)
632                } else {
633                    internal_err!(
634                        "Incompatible Dictionary value type {value_type} with result type {result_type} of Binary operator {op:?}"
635                    )
636                }
637            }
638            _ => Ok(array),
639        }
640    } else {
641        Ok(array)
642    }
643}
644
645impl BinaryExpr {
646    /// Evaluate the expression of the left input is an array and
647    /// right is literal - use scalar operations
648    fn evaluate_array_scalar(
649        &self,
650        array: &dyn Array,
651        scalar: ScalarValue,
652    ) -> Result<Option<Result<ArrayRef>>> {
653        use Operator::*;
654        let scalar_result = match &self.op {
655            RegexMatch => regex_match_dyn_scalar(array, &scalar, false, false),
656            RegexIMatch => regex_match_dyn_scalar(array, &scalar, false, true),
657            RegexNotMatch => regex_match_dyn_scalar(array, &scalar, true, false),
658            RegexNotIMatch => regex_match_dyn_scalar(array, &scalar, true, true),
659            BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
660            BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
661            BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
662            BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
663            BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
664            // if scalar operation is not supported - fallback to array implementation
665            _ => None,
666        };
667
668        Ok(scalar_result)
669    }
670
671    fn evaluate_with_resolved_args(
672        &self,
673        left: Arc<dyn Array>,
674        left_data_type: &DataType,
675        right: Arc<dyn Array>,
676        right_data_type: &DataType,
677    ) -> Result<ArrayRef> {
678        use Operator::*;
679        match &self.op {
680            IsDistinctFrom | IsNotDistinctFrom | Lt | LtEq | Gt | GtEq | Eq | NotEq
681            | Plus | Minus | Multiply | Divide | Modulo | LikeMatch | ILikeMatch
682            | NotLikeMatch | NotILikeMatch => unreachable!(),
683            And => {
684                if left_data_type == &DataType::Boolean {
685                    Ok(boolean_op(&left, &right, and_kleene)?)
686                } else {
687                    internal_err!(
688                        "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
689                        self.op,
690                        left.data_type(),
691                        right.data_type()
692                    )
693                }
694            }
695            Or => {
696                if left_data_type == &DataType::Boolean {
697                    Ok(boolean_op(&left, &right, or_kleene)?)
698                } else {
699                    internal_err!(
700                        "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
701                        self.op,
702                        left_data_type,
703                        right_data_type
704                    )
705                }
706            }
707            RegexMatch => regex_match_dyn(&left, &right, false, false),
708            RegexIMatch => regex_match_dyn(&left, &right, false, true),
709            RegexNotMatch => regex_match_dyn(&left, &right, true, false),
710            RegexNotIMatch => regex_match_dyn(&left, &right, true, true),
711            BitwiseAnd => bitwise_and_dyn(left, right),
712            BitwiseOr => bitwise_or_dyn(left, right),
713            BitwiseXor => bitwise_xor_dyn(left, right),
714            BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
715            BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
716            StringConcat => concat_elements(&left, &right),
717            AtArrow | ArrowAt | Arrow | LongArrow | HashArrow | HashLongArrow | AtAt
718            | HashMinus | AtQuestion | Question | QuestionAnd | QuestionPipe
719            | IntegerDivide | Colon => {
720                not_impl_err!(
721                    "Binary operator '{:?}' is not supported in the physical expr",
722                    self.op
723                )
724            }
725        }
726    }
727}
728
729enum ShortCircuitStrategy<'a> {
730    None,
731    ReturnLeft,
732    ReturnRight,
733    PreSelection(&'a BooleanArray),
734}
735
736/// Based on the results calculated from the left side of the short-circuit operation,
737/// if the proportion of `true` is less than 0.2 and the current operation is an `and`,
738/// the `RecordBatch` will be filtered in advance.
739const PRE_SELECTION_THRESHOLD: f32 = 0.2;
740
741/// Checks if a logical operator (`AND`/`OR`) can short-circuit evaluation based on the left-hand side (lhs) result.
742///
743/// Short-circuiting occurs under these circumstances:
744/// - For `AND`:
745///    - if LHS is all false => short-circuit → return LHS
746///    - if LHS is all true  => short-circuit → return RHS
747///    - if LHS is mixed and true_count/sum_count <= [`PRE_SELECTION_THRESHOLD`] -> pre-selection
748/// - For `OR`:
749///    - if LHS is all true  => short-circuit → return LHS
750///    - if LHS is all false => short-circuit → return RHS
751/// # Arguments
752/// * `lhs` - The left-hand side (lhs) columnar value (array or scalar)
753/// * `lhs` - The left-hand side (lhs) columnar value (array or scalar)
754/// * `op` - The logical operator (`AND` or `OR`)
755///
756/// # Implementation Notes
757/// 1. Only works with Boolean-typed arguments (other types automatically return `false`)
758/// 2. Handles both scalar values and array values
759/// 3. For arrays, uses optimized bit counting techniques for boolean arrays
760fn check_short_circuit<'a>(
761    lhs: &'a ColumnarValue,
762    op: &Operator,
763) -> ShortCircuitStrategy<'a> {
764    // Quick reject for non-logical operators,and quick judgment when op is and
765    let is_and = match op {
766        Operator::And => true,
767        Operator::Or => false,
768        _ => return ShortCircuitStrategy::None,
769    };
770
771    // Non-boolean types can't be short-circuited
772    if lhs.data_type() != DataType::Boolean {
773        return ShortCircuitStrategy::None;
774    }
775
776    match lhs {
777        ColumnarValue::Array(array) => {
778            // Fast path for arrays - try to downcast to boolean array
779            if let Ok(bool_array) = as_boolean_array(array) {
780                // Arrays with nulls can't be short-circuited
781                if bool_array.null_count() > 0 {
782                    return ShortCircuitStrategy::None;
783                }
784
785                let len = bool_array.len();
786                if len == 0 {
787                    return ShortCircuitStrategy::None;
788                }
789
790                let true_count = bool_array.values().count_set_bits();
791                if is_and {
792                    // For AND, prioritize checking for all-false (short circuit case)
793                    // Uses optimized false_count() method provided by Arrow
794
795                    // Short circuit if all values are false
796                    if true_count == 0 {
797                        return ShortCircuitStrategy::ReturnLeft;
798                    }
799
800                    // If no false values, then all must be true
801                    if true_count == len {
802                        return ShortCircuitStrategy::ReturnRight;
803                    }
804
805                    // determine if we can pre-selection
806                    if true_count as f32 / len as f32 <= PRE_SELECTION_THRESHOLD {
807                        return ShortCircuitStrategy::PreSelection(bool_array);
808                    }
809                } else {
810                    // For OR, prioritize checking for all-true (short circuit case)
811                    // Uses optimized true_count() method provided by Arrow
812
813                    // Short circuit if all values are true
814                    if true_count == len {
815                        return ShortCircuitStrategy::ReturnLeft;
816                    }
817
818                    // If no true values, then all must be false
819                    if true_count == 0 {
820                        return ShortCircuitStrategy::ReturnRight;
821                    }
822                }
823            }
824        }
825        ColumnarValue::Scalar(scalar) => {
826            // Fast path for scalar values
827            if let ScalarValue::Boolean(Some(is_true)) = scalar {
828                // Return Left for:
829                // - AND with false value
830                // - OR with true value
831                if (is_and && !is_true) || (!is_and && *is_true) {
832                    return ShortCircuitStrategy::ReturnLeft;
833                } else {
834                    return ShortCircuitStrategy::ReturnRight;
835                }
836            }
837        }
838    }
839
840    // If we can't short-circuit, indicate that normal evaluation should continue
841    ShortCircuitStrategy::None
842}
843
844/// Creates a new boolean array based on the evaluation of the right expression,
845/// but only for positions where the left_result is true.
846///
847/// This function is used for short-circuit evaluation optimization of logical AND operations:
848/// - When left_result has few true values, we only evaluate the right expression for those positions
849/// - Values are copied from right_array where left_result is true
850/// - All other positions are filled with false values
851///
852/// # Parameters
853/// - `left_result` Boolean array with selection mask (typically from left side of AND)
854/// - `right_result` Result of evaluating right side of expression (only for selected positions)
855///
856/// # Returns
857/// A combined ColumnarValue with values from right_result where left_result is true
858///
859/// # Example
860///  Initial Data: { 1, 2, 3, 4, 5 }
861///  Left Evaluation
862///     (Condition: Equal to 2 or 3)
863///          ↓
864///  Filtered Data: {2, 3}
865///    Left Bitmap: { 0, 1, 1, 0, 0 }
866///          ↓
867///   Right Evaluation
868///     (Condition: Even numbers)
869///          ↓
870///  Right Data: { 2 }
871///    Right Bitmap: { 1, 0 }
872///          ↓
873///   Combine Results
874///  Final Bitmap: { 0, 1, 0, 0, 0 }
875///
876/// # Note
877/// Perhaps it would be better to modify `left_result` directly without creating a copy?
878/// In practice, `left_result` should have only one owner, so making changes should be safe.
879/// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`].
880fn pre_selection_scatter(
881    left_result: &BooleanArray,
882    right_result: Option<&BooleanArray>,
883) -> Result<ColumnarValue> {
884    let result_len = left_result.len();
885
886    let mut result_array_builder = BooleanArray::builder(result_len);
887
888    // keep track of current position we have in right boolean array
889    let mut right_array_pos = 0;
890
891    // keep track of how much is filled
892    let mut last_end = 0;
893    // reduce if condition in for_each
894    match right_result {
895        Some(right_result) => {
896            SlicesIterator::new(left_result).for_each(|(start, end)| {
897                // the gap needs to be filled with false
898                if start > last_end {
899                    result_array_builder.append_n(start - last_end, false);
900                }
901
902                // copy values from right array for this slice
903                let len = end - start;
904                right_result
905                    .slice(right_array_pos, len)
906                    .iter()
907                    .for_each(|v| result_array_builder.append_option(v));
908
909                right_array_pos += len;
910                last_end = end;
911            });
912        }
913        None => SlicesIterator::new(left_result).for_each(|(start, end)| {
914            // the gap needs to be filled with false
915            if start > last_end {
916                result_array_builder.append_n(start - last_end, false);
917            }
918
919            // append nulls for this slice derictly
920            let len = end - start;
921            result_array_builder.append_nulls(len);
922
923            last_end = end;
924        }),
925    }
926
927    // Fill any remaining positions with false
928    if last_end < result_len {
929        result_array_builder.append_n(result_len - last_end, false);
930    }
931    let boolean_result = result_array_builder.finish();
932
933    Ok(ColumnarValue::Array(Arc::new(boolean_result)))
934}
935
936fn concat_elements(left: &ArrayRef, right: &ArrayRef) -> Result<ArrayRef> {
937    Ok(match left.data_type() {
938        DataType::Utf8 => Arc::new(concat_elements_utf8(
939            left.as_string::<i32>(),
940            right.as_string::<i32>(),
941        )?),
942        DataType::LargeUtf8 => Arc::new(concat_elements_utf8(
943            left.as_string::<i64>(),
944            right.as_string::<i64>(),
945        )?),
946        DataType::Utf8View => Arc::new(concat_elements_utf8view(
947            left.as_string_view(),
948            right.as_string_view(),
949        )?),
950        DataType::Binary => Arc::new(concat_element_binary::<i32>(
951            left.as_binary(),
952            right.as_binary(),
953        )?),
954        DataType::LargeBinary => Arc::new(concat_element_binary::<i64>(
955            left.as_binary(),
956            right.as_binary(),
957        )?),
958        DataType::BinaryView => Arc::new(concat_elements_binary_view_array(
959            left.as_binary_view(),
960            right.as_binary_view(),
961        )?),
962        other => {
963            return internal_err!(
964                "Data type {other:?} not supported for binary operation 'concat_elements' on string arrays"
965            );
966        }
967    })
968}
969
970/// Create a binary expression whose arguments are correctly coerced.
971/// This function errors if it is not possible to coerce the arguments
972/// to computational types supported by the operator.
973pub fn binary(
974    lhs: Arc<dyn PhysicalExpr>,
975    op: Operator,
976    rhs: Arc<dyn PhysicalExpr>,
977    _input_schema: &Schema,
978) -> Result<Arc<dyn PhysicalExpr>> {
979    Ok(Arc::new(BinaryExpr::new(lhs, op, rhs)))
980}
981
982/// Create a similar to expression
983pub fn similar_to(
984    negated: bool,
985    case_insensitive: bool,
986    expr: Arc<dyn PhysicalExpr>,
987    pattern: Arc<dyn PhysicalExpr>,
988) -> Result<Arc<dyn PhysicalExpr>> {
989    let binary_op = match (negated, case_insensitive) {
990        (false, false) => Operator::RegexMatch,
991        (false, true) => Operator::RegexIMatch,
992        (true, false) => Operator::RegexNotMatch,
993        (true, true) => Operator::RegexNotIMatch,
994    };
995    Ok(Arc::new(BinaryExpr::new(expr, binary_op, pattern)))
996}
997
998#[cfg(test)]
999mod tests {
1000    use super::*;
1001    use crate::expressions::{Column, Literal, col, lit, try_cast};
1002    use datafusion_expr::lit as expr_lit;
1003
1004    use datafusion_common::plan_datafusion_err;
1005    use datafusion_physical_expr_common::physical_expr::fmt_sql;
1006
1007    use crate::planner::logical2physical;
1008    use arrow::array::BooleanArray;
1009    use datafusion_expr::col as logical_col;
1010    /// Performs a binary operation, applying any type coercion necessary
1011    fn binary_op(
1012        left: Arc<dyn PhysicalExpr>,
1013        op: Operator,
1014        right: Arc<dyn PhysicalExpr>,
1015        schema: &Schema,
1016    ) -> Result<Arc<dyn PhysicalExpr>> {
1017        let left_type = left.data_type(schema)?;
1018        let right_type = right.data_type(schema)?;
1019        let (lhs, rhs) =
1020            BinaryTypeCoercer::new(&left_type, &op, &right_type).get_input_types()?;
1021
1022        let left_expr = try_cast(left, schema, lhs)?;
1023        let right_expr = try_cast(right, schema, rhs)?;
1024        binary(left_expr, op, right_expr, schema)
1025    }
1026
1027    #[test]
1028    fn binary_comparison() -> Result<()> {
1029        let schema = Schema::new(vec![
1030            Field::new("a", DataType::Int32, false),
1031            Field::new("b", DataType::Int32, false),
1032        ]);
1033        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1034        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1035
1036        // expression: "a < b"
1037        let lt = binary(
1038            col("a", &schema)?,
1039            Operator::Lt,
1040            col("b", &schema)?,
1041            &schema,
1042        )?;
1043        let batch =
1044            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
1045
1046        let result = lt
1047            .evaluate(&batch)?
1048            .into_array(batch.num_rows())
1049            .expect("Failed to convert to array");
1050        assert_eq!(result.len(), 5);
1051
1052        let expected = [false, false, true, true, true];
1053        let result =
1054            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
1055        for (i, &expected_item) in expected.iter().enumerate().take(5) {
1056            assert_eq!(result.value(i), expected_item);
1057        }
1058
1059        Ok(())
1060    }
1061
1062    #[test]
1063    fn binary_nested() -> Result<()> {
1064        let schema = Schema::new(vec![
1065            Field::new("a", DataType::Int32, false),
1066            Field::new("b", DataType::Int32, false),
1067        ]);
1068        let a = Int32Array::from(vec![2, 4, 6, 8, 10]);
1069        let b = Int32Array::from(vec![2, 5, 4, 8, 8]);
1070
1071        // expression: "a < b OR a == b"
1072        let expr = binary(
1073            binary(
1074                col("a", &schema)?,
1075                Operator::Lt,
1076                col("b", &schema)?,
1077                &schema,
1078            )?,
1079            Operator::Or,
1080            binary(
1081                col("a", &schema)?,
1082                Operator::Eq,
1083                col("b", &schema)?,
1084                &schema,
1085            )?,
1086            &schema,
1087        )?;
1088        let batch =
1089            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
1090
1091        assert_eq!("a@0 < b@1 OR a@0 = b@1", format!("{expr}"));
1092
1093        let result = expr
1094            .evaluate(&batch)?
1095            .into_array(batch.num_rows())
1096            .expect("Failed to convert to array");
1097        assert_eq!(result.len(), 5);
1098
1099        let expected = [true, true, false, true, false];
1100        let result =
1101            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
1102        for (i, &expected_item) in expected.iter().enumerate().take(5) {
1103            assert_eq!(result.value(i), expected_item);
1104        }
1105
1106        Ok(())
1107    }
1108
1109    // runs an end-to-end test of physical type coercion:
1110    // 1. construct a record batch with two columns of type A and B
1111    //  (*_ARRAY is the Rust Arrow array type, and *_TYPE is the DataType of the elements)
1112    // 2. construct a physical expression of A OP B
1113    // 3. evaluate the expression
1114    // 4. verify that the resulting expression is of type C
1115    // 5. verify that the results of evaluation are $VEC
1116    macro_rules! test_coercion {
1117        ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $B_ARRAY:ident, $B_TYPE:expr, $B_VEC:expr, $OP:expr, $C_ARRAY:ident, $C_TYPE:expr, $VEC:expr,) => {{
1118            let schema = Schema::new(vec![
1119                Field::new("a", $A_TYPE, false),
1120                Field::new("b", $B_TYPE, false),
1121            ]);
1122            let a = $A_ARRAY::from($A_VEC);
1123            let b = $B_ARRAY::from($B_VEC);
1124            let (lhs, rhs) =
1125                BinaryTypeCoercer::new(&$A_TYPE, &$OP, &$B_TYPE).get_input_types()?;
1126
1127            let left = try_cast(col("a", &schema)?, &schema, lhs)?;
1128            let right = try_cast(col("b", &schema)?, &schema, rhs)?;
1129
1130            // verify that we can construct the expression
1131            let expression = binary(left, $OP, right, &schema)?;
1132            let batch = RecordBatch::try_new(
1133                Arc::new(schema.clone()),
1134                vec![Arc::new(a), Arc::new(b)],
1135            )?;
1136
1137            // verify that the expression's type is correct
1138            assert_eq!(expression.data_type(&schema)?, $C_TYPE);
1139
1140            // compute
1141            let result = expression
1142                .evaluate(&batch)?
1143                .into_array(batch.num_rows())
1144                .expect("Failed to convert to array");
1145
1146            // verify that the array's data_type is correct
1147            assert_eq!(*result.data_type(), $C_TYPE);
1148
1149            // verify that the data itself is downcastable
1150            let result = result
1151                .as_any()
1152                .downcast_ref::<$C_ARRAY>()
1153                .expect("failed to downcast");
1154            // verify that the result itself is correct
1155            for (i, x) in $VEC.iter().enumerate() {
1156                let v = result.value(i);
1157                assert_eq!(
1158                    v, *x,
1159                    "Unexpected output at position {i}:\n\nActual:\n{v}\n\nExpected:\n{x}"
1160                );
1161            }
1162        }};
1163    }
1164
1165    #[test]
1166    fn test_type_coercion() -> Result<()> {
1167        test_coercion!(
1168            Int32Array,
1169            DataType::Int32,
1170            vec![1i32, 2i32],
1171            UInt32Array,
1172            DataType::UInt32,
1173            vec![1u32, 2u32],
1174            Operator::Plus,
1175            Int64Array,
1176            DataType::Int64,
1177            [2i64, 4i64],
1178        );
1179        test_coercion!(
1180            Int32Array,
1181            DataType::Int32,
1182            vec![1i32],
1183            UInt16Array,
1184            DataType::UInt16,
1185            vec![1u16],
1186            Operator::Plus,
1187            Int32Array,
1188            DataType::Int32,
1189            [2i32],
1190        );
1191        test_coercion!(
1192            Float32Array,
1193            DataType::Float32,
1194            vec![1f32],
1195            UInt16Array,
1196            DataType::UInt16,
1197            vec![1u16],
1198            Operator::Plus,
1199            Float32Array,
1200            DataType::Float32,
1201            [2f32],
1202        );
1203        test_coercion!(
1204            Float32Array,
1205            DataType::Float32,
1206            vec![2f32],
1207            UInt16Array,
1208            DataType::UInt16,
1209            vec![1u16],
1210            Operator::Multiply,
1211            Float32Array,
1212            DataType::Float32,
1213            [2f32],
1214        );
1215        test_coercion!(
1216            StringArray,
1217            DataType::Utf8,
1218            vec!["1994-12-13", "1995-01-26"],
1219            Date32Array,
1220            DataType::Date32,
1221            vec![9112, 9156],
1222            Operator::Eq,
1223            BooleanArray,
1224            DataType::Boolean,
1225            [true, true],
1226        );
1227        test_coercion!(
1228            StringArray,
1229            DataType::Utf8,
1230            vec!["1994-12-13", "1995-01-26"],
1231            Date32Array,
1232            DataType::Date32,
1233            vec![9113, 9154],
1234            Operator::Lt,
1235            BooleanArray,
1236            DataType::Boolean,
1237            [true, false],
1238        );
1239        test_coercion!(
1240            StringArray,
1241            DataType::Utf8,
1242            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
1243            Date64Array,
1244            DataType::Date64,
1245            vec![787322096000, 791083425000],
1246            Operator::Eq,
1247            BooleanArray,
1248            DataType::Boolean,
1249            [true, true],
1250        );
1251        test_coercion!(
1252            StringArray,
1253            DataType::Utf8,
1254            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
1255            Date64Array,
1256            DataType::Date64,
1257            vec![787322096001, 791083424999],
1258            Operator::Lt,
1259            BooleanArray,
1260            DataType::Boolean,
1261            [true, false],
1262        );
1263        test_coercion!(
1264            StringViewArray,
1265            DataType::Utf8View,
1266            vec!["abc"; 5],
1267            StringArray,
1268            DataType::Utf8,
1269            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1270            Operator::RegexMatch,
1271            BooleanArray,
1272            DataType::Boolean,
1273            [true, false, true, false, false],
1274        );
1275        test_coercion!(
1276            StringViewArray,
1277            DataType::Utf8View,
1278            vec!["abc"; 5],
1279            StringArray,
1280            DataType::Utf8,
1281            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1282            Operator::RegexIMatch,
1283            BooleanArray,
1284            DataType::Boolean,
1285            [true, true, true, true, false],
1286        );
1287        test_coercion!(
1288            StringArray,
1289            DataType::Utf8,
1290            vec!["abc"; 5],
1291            StringViewArray,
1292            DataType::Utf8View,
1293            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1294            Operator::RegexNotMatch,
1295            BooleanArray,
1296            DataType::Boolean,
1297            [false, true, false, true, true],
1298        );
1299        test_coercion!(
1300            StringArray,
1301            DataType::Utf8,
1302            vec!["abc"; 5],
1303            StringViewArray,
1304            DataType::Utf8View,
1305            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1306            Operator::RegexNotIMatch,
1307            BooleanArray,
1308            DataType::Boolean,
1309            [false, false, false, false, true],
1310        );
1311        test_coercion!(
1312            StringArray,
1313            DataType::Utf8,
1314            vec!["abc"; 5],
1315            StringArray,
1316            DataType::Utf8,
1317            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1318            Operator::RegexMatch,
1319            BooleanArray,
1320            DataType::Boolean,
1321            [true, false, true, false, false],
1322        );
1323        test_coercion!(
1324            StringArray,
1325            DataType::Utf8,
1326            vec!["abc"; 5],
1327            StringArray,
1328            DataType::Utf8,
1329            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1330            Operator::RegexIMatch,
1331            BooleanArray,
1332            DataType::Boolean,
1333            [true, true, true, true, false],
1334        );
1335        test_coercion!(
1336            StringArray,
1337            DataType::Utf8,
1338            vec!["abc"; 5],
1339            StringArray,
1340            DataType::Utf8,
1341            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1342            Operator::RegexNotMatch,
1343            BooleanArray,
1344            DataType::Boolean,
1345            [false, true, false, true, true],
1346        );
1347        test_coercion!(
1348            StringArray,
1349            DataType::Utf8,
1350            vec!["abc"; 5],
1351            StringArray,
1352            DataType::Utf8,
1353            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1354            Operator::RegexNotIMatch,
1355            BooleanArray,
1356            DataType::Boolean,
1357            [false, false, false, false, true],
1358        );
1359        test_coercion!(
1360            LargeStringArray,
1361            DataType::LargeUtf8,
1362            vec!["abc"; 5],
1363            LargeStringArray,
1364            DataType::LargeUtf8,
1365            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1366            Operator::RegexMatch,
1367            BooleanArray,
1368            DataType::Boolean,
1369            [true, false, true, false, false],
1370        );
1371        test_coercion!(
1372            LargeStringArray,
1373            DataType::LargeUtf8,
1374            vec!["abc"; 5],
1375            LargeStringArray,
1376            DataType::LargeUtf8,
1377            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1378            Operator::RegexIMatch,
1379            BooleanArray,
1380            DataType::Boolean,
1381            [true, true, true, true, false],
1382        );
1383        test_coercion!(
1384            LargeStringArray,
1385            DataType::LargeUtf8,
1386            vec!["abc"; 5],
1387            LargeStringArray,
1388            DataType::LargeUtf8,
1389            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1390            Operator::RegexNotMatch,
1391            BooleanArray,
1392            DataType::Boolean,
1393            [false, true, false, true, true],
1394        );
1395        test_coercion!(
1396            LargeStringArray,
1397            DataType::LargeUtf8,
1398            vec!["abc"; 5],
1399            LargeStringArray,
1400            DataType::LargeUtf8,
1401            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1402            Operator::RegexNotIMatch,
1403            BooleanArray,
1404            DataType::Boolean,
1405            [false, false, false, false, true],
1406        );
1407        test_coercion!(
1408            StringArray,
1409            DataType::Utf8,
1410            vec!["abc"; 5],
1411            StringArray,
1412            DataType::Utf8,
1413            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1414            Operator::LikeMatch,
1415            BooleanArray,
1416            DataType::Boolean,
1417            [true, false, false, true, false],
1418        );
1419        test_coercion!(
1420            StringArray,
1421            DataType::Utf8,
1422            vec!["abc"; 5],
1423            StringArray,
1424            DataType::Utf8,
1425            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1426            Operator::ILikeMatch,
1427            BooleanArray,
1428            DataType::Boolean,
1429            [true, true, false, true, true],
1430        );
1431        test_coercion!(
1432            StringArray,
1433            DataType::Utf8,
1434            vec!["abc"; 5],
1435            StringArray,
1436            DataType::Utf8,
1437            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1438            Operator::NotLikeMatch,
1439            BooleanArray,
1440            DataType::Boolean,
1441            [false, true, true, false, true],
1442        );
1443        test_coercion!(
1444            StringArray,
1445            DataType::Utf8,
1446            vec!["abc"; 5],
1447            StringArray,
1448            DataType::Utf8,
1449            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1450            Operator::NotILikeMatch,
1451            BooleanArray,
1452            DataType::Boolean,
1453            [false, false, true, false, false],
1454        );
1455        test_coercion!(
1456            LargeStringArray,
1457            DataType::LargeUtf8,
1458            vec!["abc"; 5],
1459            LargeStringArray,
1460            DataType::LargeUtf8,
1461            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1462            Operator::LikeMatch,
1463            BooleanArray,
1464            DataType::Boolean,
1465            [true, false, false, true, false],
1466        );
1467        test_coercion!(
1468            LargeStringArray,
1469            DataType::LargeUtf8,
1470            vec!["abc"; 5],
1471            LargeStringArray,
1472            DataType::LargeUtf8,
1473            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1474            Operator::ILikeMatch,
1475            BooleanArray,
1476            DataType::Boolean,
1477            [true, true, false, true, true],
1478        );
1479        test_coercion!(
1480            LargeStringArray,
1481            DataType::LargeUtf8,
1482            vec!["abc"; 5],
1483            LargeStringArray,
1484            DataType::LargeUtf8,
1485            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1486            Operator::NotLikeMatch,
1487            BooleanArray,
1488            DataType::Boolean,
1489            [false, true, true, false, true],
1490        );
1491        test_coercion!(
1492            LargeStringArray,
1493            DataType::LargeUtf8,
1494            vec!["abc"; 5],
1495            LargeStringArray,
1496            DataType::LargeUtf8,
1497            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1498            Operator::NotILikeMatch,
1499            BooleanArray,
1500            DataType::Boolean,
1501            [false, false, true, false, false],
1502        );
1503        test_coercion!(
1504            Int16Array,
1505            DataType::Int16,
1506            vec![1i16, 2i16, 3i16],
1507            Int64Array,
1508            DataType::Int64,
1509            vec![10i64, 4i64, 5i64],
1510            Operator::BitwiseAnd,
1511            Int64Array,
1512            DataType::Int64,
1513            [0i64, 0i64, 1i64],
1514        );
1515        test_coercion!(
1516            UInt16Array,
1517            DataType::UInt16,
1518            vec![1u16, 2u16, 3u16],
1519            UInt64Array,
1520            DataType::UInt64,
1521            vec![10u64, 4u64, 5u64],
1522            Operator::BitwiseAnd,
1523            UInt64Array,
1524            DataType::UInt64,
1525            [0u64, 0u64, 1u64],
1526        );
1527        test_coercion!(
1528            Int16Array,
1529            DataType::Int16,
1530            vec![3i16, 2i16, 3i16],
1531            Int64Array,
1532            DataType::Int64,
1533            vec![10i64, 6i64, 5i64],
1534            Operator::BitwiseOr,
1535            Int64Array,
1536            DataType::Int64,
1537            [11i64, 6i64, 7i64],
1538        );
1539        test_coercion!(
1540            UInt16Array,
1541            DataType::UInt16,
1542            vec![1u16, 2u16, 3u16],
1543            UInt64Array,
1544            DataType::UInt64,
1545            vec![10u64, 4u64, 5u64],
1546            Operator::BitwiseOr,
1547            UInt64Array,
1548            DataType::UInt64,
1549            [11u64, 6u64, 7u64],
1550        );
1551        test_coercion!(
1552            Int16Array,
1553            DataType::Int16,
1554            vec![3i16, 2i16, 3i16],
1555            Int64Array,
1556            DataType::Int64,
1557            vec![10i64, 6i64, 5i64],
1558            Operator::BitwiseXor,
1559            Int64Array,
1560            DataType::Int64,
1561            [9i64, 4i64, 6i64],
1562        );
1563        test_coercion!(
1564            UInt16Array,
1565            DataType::UInt16,
1566            vec![3u16, 2u16, 3u16],
1567            UInt64Array,
1568            DataType::UInt64,
1569            vec![10u64, 6u64, 5u64],
1570            Operator::BitwiseXor,
1571            UInt64Array,
1572            DataType::UInt64,
1573            [9u64, 4u64, 6u64],
1574        );
1575        test_coercion!(
1576            Int16Array,
1577            DataType::Int16,
1578            vec![4i16, 27i16, 35i16],
1579            Int64Array,
1580            DataType::Int64,
1581            vec![2i64, 3i64, 4i64],
1582            Operator::BitwiseShiftRight,
1583            Int64Array,
1584            DataType::Int64,
1585            [1i64, 3i64, 2i64],
1586        );
1587        test_coercion!(
1588            UInt16Array,
1589            DataType::UInt16,
1590            vec![4u16, 27u16, 35u16],
1591            UInt64Array,
1592            DataType::UInt64,
1593            vec![2u64, 3u64, 4u64],
1594            Operator::BitwiseShiftRight,
1595            UInt64Array,
1596            DataType::UInt64,
1597            [1u64, 3u64, 2u64],
1598        );
1599        test_coercion!(
1600            Int16Array,
1601            DataType::Int16,
1602            vec![2i16, 3i16, 4i16],
1603            Int64Array,
1604            DataType::Int64,
1605            vec![4i64, 12i64, 7i64],
1606            Operator::BitwiseShiftLeft,
1607            Int64Array,
1608            DataType::Int64,
1609            [32i64, 12288i64, 512i64],
1610        );
1611        test_coercion!(
1612            UInt16Array,
1613            DataType::UInt16,
1614            vec![2u16, 3u16, 4u16],
1615            UInt64Array,
1616            DataType::UInt64,
1617            vec![4u64, 12u64, 7u64],
1618            Operator::BitwiseShiftLeft,
1619            UInt64Array,
1620            DataType::UInt64,
1621            [32u64, 12288u64, 512u64],
1622        );
1623        Ok(())
1624    }
1625
1626    // Note it would be nice to use the same test_coercion macro as
1627    // above, but sadly the type of the values of the dictionary are
1628    // not encoded in the rust type of the DictionaryArray. Thus there
1629    // is no way at the time of this writing to create a dictionary
1630    // array using the `From` trait
1631    #[test]
1632    fn test_dictionary_type_to_array_coercion() -> Result<()> {
1633        // Test string  a string dictionary
1634        let dict_type =
1635            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1636        let string_type = DataType::Utf8;
1637
1638        // build dictionary
1639        let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
1640
1641        dict_builder.append("one")?;
1642        dict_builder.append_null();
1643        dict_builder.append("three")?;
1644        dict_builder.append("four")?;
1645        let dict_array = Arc::new(dict_builder.finish()) as ArrayRef;
1646
1647        let str_array = Arc::new(StringArray::from(vec![
1648            Some("not one"),
1649            Some("two"),
1650            None,
1651            Some("four"),
1652        ])) as ArrayRef;
1653
1654        let schema = Arc::new(Schema::new(vec![
1655            Field::new("a", dict_type.clone(), true),
1656            Field::new("b", string_type.clone(), true),
1657        ]));
1658
1659        // Test 1: a = b
1660        let result = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
1661        apply_logic_op(&schema, &dict_array, &str_array, Operator::Eq, result)?;
1662
1663        // Test 2: now test the other direction
1664        // b = a
1665        let schema = Arc::new(Schema::new(vec![
1666            Field::new("a", string_type, true),
1667            Field::new("b", dict_type, true),
1668        ]));
1669        let result = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
1670        apply_logic_op(&schema, &str_array, &dict_array, Operator::Eq, result)?;
1671
1672        Ok(())
1673    }
1674
1675    #[test]
1676    fn plus_op() -> Result<()> {
1677        let schema = Schema::new(vec![
1678            Field::new("a", DataType::Int32, false),
1679            Field::new("b", DataType::Int32, false),
1680        ]);
1681        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1682        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1683
1684        apply_arithmetic::<Int32Type>(
1685            Arc::new(schema),
1686            vec![Arc::new(a), Arc::new(b)],
1687            Operator::Plus,
1688            Int32Array::from(vec![2, 4, 7, 12, 21]),
1689        )?;
1690
1691        Ok(())
1692    }
1693
1694    #[test]
1695    fn plus_op_dict() -> Result<()> {
1696        let schema = Schema::new(vec![
1697            Field::new(
1698                "a",
1699                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1700                true,
1701            ),
1702            Field::new(
1703                "b",
1704                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1705                true,
1706            ),
1707        ]);
1708
1709        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1710        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
1711        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
1712
1713        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1714        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
1715        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
1716
1717        apply_arithmetic::<Int32Type>(
1718            Arc::new(schema),
1719            vec![Arc::new(a), Arc::new(b)],
1720            Operator::Plus,
1721            Int32Array::from(vec![Some(2), None, Some(4), Some(8), None]),
1722        )?;
1723
1724        Ok(())
1725    }
1726
1727    #[test]
1728    fn plus_op_dict_decimal() -> Result<()> {
1729        let schema = Schema::new(vec![
1730            Field::new(
1731                "a",
1732                DataType::Dictionary(
1733                    Box::new(DataType::Int8),
1734                    Box::new(DataType::Decimal128(10, 0)),
1735                ),
1736                true,
1737            ),
1738            Field::new(
1739                "b",
1740                DataType::Dictionary(
1741                    Box::new(DataType::Int8),
1742                    Box::new(DataType::Decimal128(10, 0)),
1743                ),
1744                true,
1745            ),
1746        ]);
1747
1748        let value = 123;
1749        let decimal_array = Arc::new(create_decimal_array(
1750            &[
1751                Some(value),
1752                Some(value + 2),
1753                Some(value - 1),
1754                Some(value + 1),
1755            ],
1756            10,
1757            0,
1758        ));
1759
1760        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
1761        let a = DictionaryArray::try_new(keys, decimal_array)?;
1762
1763        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
1764        let decimal_array = Arc::new(create_decimal_array(
1765            &[
1766                Some(value + 1),
1767                Some(value + 3),
1768                Some(value),
1769                Some(value + 2),
1770            ],
1771            10,
1772            0,
1773        ));
1774        let b = DictionaryArray::try_new(keys, decimal_array)?;
1775
1776        apply_arithmetic(
1777            Arc::new(schema),
1778            vec![Arc::new(a), Arc::new(b)],
1779            Operator::Plus,
1780            create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 11, 0),
1781        )?;
1782
1783        Ok(())
1784    }
1785
1786    #[test]
1787    fn plus_op_scalar() -> Result<()> {
1788        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1789        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1790
1791        apply_arithmetic_scalar(
1792            Arc::new(schema),
1793            vec![Arc::new(a)],
1794            Operator::Plus,
1795            ScalarValue::Int32(Some(1)),
1796            Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
1797        )?;
1798
1799        Ok(())
1800    }
1801
1802    #[test]
1803    fn plus_op_dict_scalar() -> Result<()> {
1804        let schema = Schema::new(vec![Field::new(
1805            "a",
1806            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1807            true,
1808        )]);
1809
1810        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
1811
1812        dict_builder.append(1)?;
1813        dict_builder.append_null();
1814        dict_builder.append(2)?;
1815        dict_builder.append(5)?;
1816
1817        let a = dict_builder.finish();
1818
1819        let expected: PrimitiveArray<Int32Type> =
1820            PrimitiveArray::from(vec![Some(2), None, Some(3), Some(6)]);
1821
1822        apply_arithmetic_scalar(
1823            Arc::new(schema),
1824            vec![Arc::new(a)],
1825            Operator::Plus,
1826            ScalarValue::Dictionary(
1827                Box::new(DataType::Int8),
1828                Box::new(ScalarValue::Int32(Some(1))),
1829            ),
1830            Arc::new(expected),
1831        )?;
1832
1833        Ok(())
1834    }
1835
1836    #[test]
1837    fn plus_op_dict_scalar_decimal() -> Result<()> {
1838        let schema = Schema::new(vec![Field::new(
1839            "a",
1840            DataType::Dictionary(
1841                Box::new(DataType::Int8),
1842                Box::new(DataType::Decimal128(10, 0)),
1843            ),
1844            true,
1845        )]);
1846
1847        let value = 123;
1848        let decimal_array = Arc::new(create_decimal_array(
1849            &[Some(value), None, Some(value - 1), Some(value + 1)],
1850            10,
1851            0,
1852        ));
1853
1854        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
1855        let a = DictionaryArray::try_new(keys, decimal_array)?;
1856
1857        let decimal_array = Arc::new(create_decimal_array(
1858            &[
1859                Some(value + 1),
1860                Some(value),
1861                None,
1862                Some(value + 2),
1863                Some(value + 1),
1864            ],
1865            11,
1866            0,
1867        ));
1868
1869        apply_arithmetic_scalar(
1870            Arc::new(schema),
1871            vec![Arc::new(a)],
1872            Operator::Plus,
1873            ScalarValue::Dictionary(
1874                Box::new(DataType::Int8),
1875                Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
1876            ),
1877            decimal_array,
1878        )?;
1879
1880        Ok(())
1881    }
1882
1883    #[test]
1884    fn minus_op() -> Result<()> {
1885        let schema = Arc::new(Schema::new(vec![
1886            Field::new("a", DataType::Int32, false),
1887            Field::new("b", DataType::Int32, false),
1888        ]));
1889        let a = Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16]));
1890        let b = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1891
1892        apply_arithmetic::<Int32Type>(
1893            Arc::clone(&schema),
1894            vec![
1895                Arc::clone(&a) as Arc<dyn Array>,
1896                Arc::clone(&b) as Arc<dyn Array>,
1897            ],
1898            Operator::Minus,
1899            Int32Array::from(vec![0, 0, 1, 4, 11]),
1900        )?;
1901
1902        // should handle have negative values in result (for signed)
1903        apply_arithmetic::<Int32Type>(
1904            schema,
1905            vec![b, a],
1906            Operator::Minus,
1907            Int32Array::from(vec![0, 0, -1, -4, -11]),
1908        )?;
1909
1910        Ok(())
1911    }
1912
1913    #[test]
1914    fn minus_op_dict() -> Result<()> {
1915        let schema = Schema::new(vec![
1916            Field::new(
1917                "a",
1918                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1919                true,
1920            ),
1921            Field::new(
1922                "b",
1923                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1924                true,
1925            ),
1926        ]);
1927
1928        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1929        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
1930        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
1931
1932        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1933        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
1934        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
1935
1936        apply_arithmetic::<Int32Type>(
1937            Arc::new(schema),
1938            vec![Arc::new(a), Arc::new(b)],
1939            Operator::Minus,
1940            Int32Array::from(vec![Some(0), None, Some(0), Some(0), None]),
1941        )?;
1942
1943        Ok(())
1944    }
1945
1946    #[test]
1947    fn minus_op_dict_decimal() -> Result<()> {
1948        let schema = Schema::new(vec![
1949            Field::new(
1950                "a",
1951                DataType::Dictionary(
1952                    Box::new(DataType::Int8),
1953                    Box::new(DataType::Decimal128(10, 0)),
1954                ),
1955                true,
1956            ),
1957            Field::new(
1958                "b",
1959                DataType::Dictionary(
1960                    Box::new(DataType::Int8),
1961                    Box::new(DataType::Decimal128(10, 0)),
1962                ),
1963                true,
1964            ),
1965        ]);
1966
1967        let value = 123;
1968        let decimal_array = Arc::new(create_decimal_array(
1969            &[
1970                Some(value),
1971                Some(value + 2),
1972                Some(value - 1),
1973                Some(value + 1),
1974            ],
1975            10,
1976            0,
1977        ));
1978
1979        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
1980        let a = DictionaryArray::try_new(keys, decimal_array)?;
1981
1982        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
1983        let decimal_array = Arc::new(create_decimal_array(
1984            &[
1985                Some(value + 1),
1986                Some(value + 3),
1987                Some(value),
1988                Some(value + 2),
1989            ],
1990            10,
1991            0,
1992        ));
1993        let b = DictionaryArray::try_new(keys, decimal_array)?;
1994
1995        apply_arithmetic(
1996            Arc::new(schema),
1997            vec![Arc::new(a), Arc::new(b)],
1998            Operator::Minus,
1999            create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 11, 0),
2000        )?;
2001
2002        Ok(())
2003    }
2004
2005    #[test]
2006    fn minus_op_scalar() -> Result<()> {
2007        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2008        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2009
2010        apply_arithmetic_scalar(
2011            Arc::new(schema),
2012            vec![Arc::new(a)],
2013            Operator::Minus,
2014            ScalarValue::Int32(Some(1)),
2015            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])),
2016        )?;
2017
2018        Ok(())
2019    }
2020
2021    #[test]
2022    fn minus_op_dict_scalar() -> Result<()> {
2023        let schema = Schema::new(vec![Field::new(
2024            "a",
2025            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2026            true,
2027        )]);
2028
2029        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2030
2031        dict_builder.append(1)?;
2032        dict_builder.append_null();
2033        dict_builder.append(2)?;
2034        dict_builder.append(5)?;
2035
2036        let a = dict_builder.finish();
2037
2038        let expected: PrimitiveArray<Int32Type> =
2039            PrimitiveArray::from(vec![Some(0), None, Some(1), Some(4)]);
2040
2041        apply_arithmetic_scalar(
2042            Arc::new(schema),
2043            vec![Arc::new(a)],
2044            Operator::Minus,
2045            ScalarValue::Dictionary(
2046                Box::new(DataType::Int8),
2047                Box::new(ScalarValue::Int32(Some(1))),
2048            ),
2049            Arc::new(expected),
2050        )?;
2051
2052        Ok(())
2053    }
2054
2055    #[test]
2056    fn minus_op_dict_scalar_decimal() -> Result<()> {
2057        let schema = Schema::new(vec![Field::new(
2058            "a",
2059            DataType::Dictionary(
2060                Box::new(DataType::Int8),
2061                Box::new(DataType::Decimal128(10, 0)),
2062            ),
2063            true,
2064        )]);
2065
2066        let value = 123;
2067        let decimal_array = Arc::new(create_decimal_array(
2068            &[Some(value), None, Some(value - 1), Some(value + 1)],
2069            10,
2070            0,
2071        ));
2072
2073        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2074        let a = DictionaryArray::try_new(keys, decimal_array)?;
2075
2076        let decimal_array = Arc::new(create_decimal_array(
2077            &[
2078                Some(value - 1),
2079                Some(value - 2),
2080                None,
2081                Some(value),
2082                Some(value - 1),
2083            ],
2084            11,
2085            0,
2086        ));
2087
2088        apply_arithmetic_scalar(
2089            Arc::new(schema),
2090            vec![Arc::new(a)],
2091            Operator::Minus,
2092            ScalarValue::Dictionary(
2093                Box::new(DataType::Int8),
2094                Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
2095            ),
2096            decimal_array,
2097        )?;
2098
2099        Ok(())
2100    }
2101
2102    #[test]
2103    fn multiply_op() -> Result<()> {
2104        let schema = Arc::new(Schema::new(vec![
2105            Field::new("a", DataType::Int32, false),
2106            Field::new("b", DataType::Int32, false),
2107        ]));
2108        let a = Arc::new(Int32Array::from(vec![4, 8, 16, 32, 64]));
2109        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
2110
2111        apply_arithmetic::<Int32Type>(
2112            schema,
2113            vec![a, b],
2114            Operator::Multiply,
2115            Int32Array::from(vec![8, 32, 128, 512, 2048]),
2116        )?;
2117
2118        Ok(())
2119    }
2120
2121    #[test]
2122    fn multiply_op_dict() -> Result<()> {
2123        let schema = Schema::new(vec![
2124            Field::new(
2125                "a",
2126                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2127                true,
2128            ),
2129            Field::new(
2130                "b",
2131                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2132                true,
2133            ),
2134        ]);
2135
2136        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2137        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
2138        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
2139
2140        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2141        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2142        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2143
2144        apply_arithmetic::<Int32Type>(
2145            Arc::new(schema),
2146            vec![Arc::new(a), Arc::new(b)],
2147            Operator::Multiply,
2148            Int32Array::from(vec![Some(1), None, Some(4), Some(16), None]),
2149        )?;
2150
2151        Ok(())
2152    }
2153
2154    #[test]
2155    fn multiply_op_dict_decimal() -> Result<()> {
2156        let schema = Schema::new(vec![
2157            Field::new(
2158                "a",
2159                DataType::Dictionary(
2160                    Box::new(DataType::Int8),
2161                    Box::new(DataType::Decimal128(10, 0)),
2162                ),
2163                true,
2164            ),
2165            Field::new(
2166                "b",
2167                DataType::Dictionary(
2168                    Box::new(DataType::Int8),
2169                    Box::new(DataType::Decimal128(10, 0)),
2170                ),
2171                true,
2172            ),
2173        ]);
2174
2175        let value = 123;
2176        let decimal_array = Arc::new(create_decimal_array(
2177            &[
2178                Some(value),
2179                Some(value + 2),
2180                Some(value - 1),
2181                Some(value + 1),
2182            ],
2183            10,
2184            0,
2185        )) as ArrayRef;
2186
2187        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2188        let a = DictionaryArray::try_new(keys, decimal_array)?;
2189
2190        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2191        let decimal_array = Arc::new(create_decimal_array(
2192            &[
2193                Some(value + 1),
2194                Some(value + 3),
2195                Some(value),
2196                Some(value + 2),
2197            ],
2198            10,
2199            0,
2200        ));
2201        let b = DictionaryArray::try_new(keys, decimal_array)?;
2202
2203        apply_arithmetic(
2204            Arc::new(schema),
2205            vec![Arc::new(a), Arc::new(b)],
2206            Operator::Multiply,
2207            create_decimal_array(
2208                &[Some(15252), None, None, Some(15252), Some(15129)],
2209                21,
2210                0,
2211            ),
2212        )?;
2213
2214        Ok(())
2215    }
2216
2217    #[test]
2218    fn multiply_op_scalar() -> Result<()> {
2219        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2220        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2221
2222        apply_arithmetic_scalar(
2223            Arc::new(schema),
2224            vec![Arc::new(a)],
2225            Operator::Multiply,
2226            ScalarValue::Int32(Some(2)),
2227            Arc::new(Int32Array::from(vec![2, 4, 6, 8, 10])),
2228        )?;
2229
2230        Ok(())
2231    }
2232
2233    #[test]
2234    fn multiply_op_dict_scalar() -> Result<()> {
2235        let schema = Schema::new(vec![Field::new(
2236            "a",
2237            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2238            true,
2239        )]);
2240
2241        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2242
2243        dict_builder.append(1)?;
2244        dict_builder.append_null();
2245        dict_builder.append(2)?;
2246        dict_builder.append(5)?;
2247
2248        let a = dict_builder.finish();
2249
2250        let expected: PrimitiveArray<Int32Type> =
2251            PrimitiveArray::from(vec![Some(2), None, Some(4), Some(10)]);
2252
2253        apply_arithmetic_scalar(
2254            Arc::new(schema),
2255            vec![Arc::new(a)],
2256            Operator::Multiply,
2257            ScalarValue::Dictionary(
2258                Box::new(DataType::Int8),
2259                Box::new(ScalarValue::Int32(Some(2))),
2260            ),
2261            Arc::new(expected),
2262        )?;
2263
2264        Ok(())
2265    }
2266
2267    #[test]
2268    fn multiply_op_dict_scalar_decimal() -> Result<()> {
2269        let schema = Schema::new(vec![Field::new(
2270            "a",
2271            DataType::Dictionary(
2272                Box::new(DataType::Int8),
2273                Box::new(DataType::Decimal128(10, 0)),
2274            ),
2275            true,
2276        )]);
2277
2278        let value = 123;
2279        let decimal_array = Arc::new(create_decimal_array(
2280            &[Some(value), None, Some(value - 1), Some(value + 1)],
2281            10,
2282            0,
2283        ));
2284
2285        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2286        let a = DictionaryArray::try_new(keys, decimal_array)?;
2287
2288        let decimal_array = Arc::new(create_decimal_array(
2289            &[Some(246), Some(244), None, Some(248), Some(246)],
2290            21,
2291            0,
2292        ));
2293
2294        apply_arithmetic_scalar(
2295            Arc::new(schema),
2296            vec![Arc::new(a)],
2297            Operator::Multiply,
2298            ScalarValue::Dictionary(
2299                Box::new(DataType::Int8),
2300                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2301            ),
2302            decimal_array,
2303        )?;
2304
2305        Ok(())
2306    }
2307
2308    #[test]
2309    fn divide_op() -> Result<()> {
2310        let schema = Arc::new(Schema::new(vec![
2311            Field::new("a", DataType::Int32, false),
2312            Field::new("b", DataType::Int32, false),
2313        ]));
2314        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
2315        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
2316
2317        apply_arithmetic::<Int32Type>(
2318            schema,
2319            vec![a, b],
2320            Operator::Divide,
2321            Int32Array::from(vec![4, 8, 16, 32, 64]),
2322        )?;
2323
2324        Ok(())
2325    }
2326
2327    #[test]
2328    fn divide_op_dict() -> Result<()> {
2329        let schema = Schema::new(vec![
2330            Field::new(
2331                "a",
2332                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2333                true,
2334            ),
2335            Field::new(
2336                "b",
2337                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2338                true,
2339            ),
2340        ]);
2341
2342        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2343
2344        dict_builder.append(1)?;
2345        dict_builder.append_null();
2346        dict_builder.append(2)?;
2347        dict_builder.append(5)?;
2348        dict_builder.append(0)?;
2349
2350        let a = dict_builder.finish();
2351
2352        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2353        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2354        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2355
2356        apply_arithmetic::<Int32Type>(
2357            Arc::new(schema),
2358            vec![Arc::new(a), Arc::new(b)],
2359            Operator::Divide,
2360            Int32Array::from(vec![Some(1), None, Some(1), Some(1), Some(0)]),
2361        )?;
2362
2363        Ok(())
2364    }
2365
2366    #[test]
2367    fn divide_op_dict_decimal() -> Result<()> {
2368        let schema = Schema::new(vec![
2369            Field::new(
2370                "a",
2371                DataType::Dictionary(
2372                    Box::new(DataType::Int8),
2373                    Box::new(DataType::Decimal128(10, 0)),
2374                ),
2375                true,
2376            ),
2377            Field::new(
2378                "b",
2379                DataType::Dictionary(
2380                    Box::new(DataType::Int8),
2381                    Box::new(DataType::Decimal128(10, 0)),
2382                ),
2383                true,
2384            ),
2385        ]);
2386
2387        let value = 123;
2388        let decimal_array = Arc::new(create_decimal_array(
2389            &[
2390                Some(value),
2391                Some(value + 2),
2392                Some(value - 1),
2393                Some(value + 1),
2394            ],
2395            10,
2396            0,
2397        ));
2398
2399        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2400        let a = DictionaryArray::try_new(keys, decimal_array)?;
2401
2402        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2403        let decimal_array = Arc::new(create_decimal_array(
2404            &[
2405                Some(value + 1),
2406                Some(value + 3),
2407                Some(value),
2408                Some(value + 2),
2409            ],
2410            10,
2411            0,
2412        ));
2413        let b = DictionaryArray::try_new(keys, decimal_array)?;
2414
2415        apply_arithmetic(
2416            Arc::new(schema),
2417            vec![Arc::new(a), Arc::new(b)],
2418            Operator::Divide,
2419            create_decimal_array(
2420                &[
2421                    Some(9919), // 0.9919
2422                    None,
2423                    None,
2424                    Some(10081), // 1.0081
2425                    Some(10000), // 1.0
2426                ],
2427                14,
2428                4,
2429            ),
2430        )?;
2431
2432        Ok(())
2433    }
2434
2435    #[test]
2436    fn divide_op_scalar() -> Result<()> {
2437        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2438        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2439
2440        apply_arithmetic_scalar(
2441            Arc::new(schema),
2442            vec![Arc::new(a)],
2443            Operator::Divide,
2444            ScalarValue::Int32(Some(2)),
2445            Arc::new(Int32Array::from(vec![0, 1, 1, 2, 2])),
2446        )?;
2447
2448        Ok(())
2449    }
2450
2451    #[test]
2452    fn divide_op_dict_scalar() -> Result<()> {
2453        let schema = Schema::new(vec![Field::new(
2454            "a",
2455            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2456            true,
2457        )]);
2458
2459        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2460
2461        dict_builder.append(1)?;
2462        dict_builder.append_null();
2463        dict_builder.append(2)?;
2464        dict_builder.append(5)?;
2465
2466        let a = dict_builder.finish();
2467
2468        let expected: PrimitiveArray<Int32Type> =
2469            PrimitiveArray::from(vec![Some(0), None, Some(1), Some(2)]);
2470
2471        apply_arithmetic_scalar(
2472            Arc::new(schema),
2473            vec![Arc::new(a)],
2474            Operator::Divide,
2475            ScalarValue::Dictionary(
2476                Box::new(DataType::Int8),
2477                Box::new(ScalarValue::Int32(Some(2))),
2478            ),
2479            Arc::new(expected),
2480        )?;
2481
2482        Ok(())
2483    }
2484
2485    #[test]
2486    fn divide_op_dict_scalar_decimal() -> Result<()> {
2487        let schema = Schema::new(vec![Field::new(
2488            "a",
2489            DataType::Dictionary(
2490                Box::new(DataType::Int8),
2491                Box::new(DataType::Decimal128(10, 0)),
2492            ),
2493            true,
2494        )]);
2495
2496        let value = 123;
2497        let decimal_array = Arc::new(create_decimal_array(
2498            &[Some(value), None, Some(value - 1), Some(value + 1)],
2499            10,
2500            0,
2501        ));
2502
2503        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2504        let a = DictionaryArray::try_new(keys, decimal_array)?;
2505
2506        let decimal_array = Arc::new(create_decimal_array(
2507            &[Some(615000), Some(610000), None, Some(620000), Some(615000)],
2508            14,
2509            4,
2510        ));
2511
2512        apply_arithmetic_scalar(
2513            Arc::new(schema),
2514            vec![Arc::new(a)],
2515            Operator::Divide,
2516            ScalarValue::Dictionary(
2517                Box::new(DataType::Int8),
2518                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2519            ),
2520            decimal_array,
2521        )?;
2522
2523        Ok(())
2524    }
2525
2526    #[test]
2527    fn modulus_op() -> Result<()> {
2528        let schema = Arc::new(Schema::new(vec![
2529            Field::new("a", DataType::Int32, false),
2530            Field::new("b", DataType::Int32, false),
2531        ]));
2532        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
2533        let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));
2534
2535        apply_arithmetic::<Int32Type>(
2536            schema,
2537            vec![a, b],
2538            Operator::Modulo,
2539            Int32Array::from(vec![0, 0, 2, 8, 0]),
2540        )?;
2541
2542        Ok(())
2543    }
2544
2545    #[test]
2546    fn modulus_op_dict() -> Result<()> {
2547        let schema = Schema::new(vec![
2548            Field::new(
2549                "a",
2550                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2551                true,
2552            ),
2553            Field::new(
2554                "b",
2555                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2556                true,
2557            ),
2558        ]);
2559
2560        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2561
2562        dict_builder.append(1)?;
2563        dict_builder.append_null();
2564        dict_builder.append(2)?;
2565        dict_builder.append(5)?;
2566        dict_builder.append(0)?;
2567
2568        let a = dict_builder.finish();
2569
2570        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2571        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2572        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2573
2574        apply_arithmetic::<Int32Type>(
2575            Arc::new(schema),
2576            vec![Arc::new(a), Arc::new(b)],
2577            Operator::Modulo,
2578            Int32Array::from(vec![Some(0), None, Some(0), Some(1), Some(0)]),
2579        )?;
2580
2581        Ok(())
2582    }
2583
2584    #[test]
2585    fn modulus_op_dict_decimal() -> Result<()> {
2586        let schema = Schema::new(vec![
2587            Field::new(
2588                "a",
2589                DataType::Dictionary(
2590                    Box::new(DataType::Int8),
2591                    Box::new(DataType::Decimal128(10, 0)),
2592                ),
2593                true,
2594            ),
2595            Field::new(
2596                "b",
2597                DataType::Dictionary(
2598                    Box::new(DataType::Int8),
2599                    Box::new(DataType::Decimal128(10, 0)),
2600                ),
2601                true,
2602            ),
2603        ]);
2604
2605        let value = 123;
2606        let decimal_array = Arc::new(create_decimal_array(
2607            &[
2608                Some(value),
2609                Some(value + 2),
2610                Some(value - 1),
2611                Some(value + 1),
2612            ],
2613            10,
2614            0,
2615        ));
2616
2617        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2618        let a = DictionaryArray::try_new(keys, decimal_array)?;
2619
2620        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2621        let decimal_array = Arc::new(create_decimal_array(
2622            &[
2623                Some(value + 1),
2624                Some(value + 3),
2625                Some(value),
2626                Some(value + 2),
2627            ],
2628            10,
2629            0,
2630        ));
2631        let b = DictionaryArray::try_new(keys, decimal_array)?;
2632
2633        apply_arithmetic(
2634            Arc::new(schema),
2635            vec![Arc::new(a), Arc::new(b)],
2636            Operator::Modulo,
2637            create_decimal_array(&[Some(123), None, None, Some(1), Some(0)], 10, 0),
2638        )?;
2639
2640        Ok(())
2641    }
2642
2643    #[test]
2644    fn modulus_op_scalar() -> Result<()> {
2645        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2646        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2647
2648        apply_arithmetic_scalar(
2649            Arc::new(schema),
2650            vec![Arc::new(a)],
2651            Operator::Modulo,
2652            ScalarValue::Int32(Some(2)),
2653            Arc::new(Int32Array::from(vec![1, 0, 1, 0, 1])),
2654        )?;
2655
2656        Ok(())
2657    }
2658
2659    #[test]
2660    fn modules_op_dict_scalar() -> Result<()> {
2661        let schema = Schema::new(vec![Field::new(
2662            "a",
2663            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2664            true,
2665        )]);
2666
2667        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2668
2669        dict_builder.append(1)?;
2670        dict_builder.append_null();
2671        dict_builder.append(2)?;
2672        dict_builder.append(5)?;
2673
2674        let a = dict_builder.finish();
2675
2676        let expected: PrimitiveArray<Int32Type> =
2677            PrimitiveArray::from(vec![Some(1), None, Some(0), Some(1)]);
2678
2679        apply_arithmetic_scalar(
2680            Arc::new(schema),
2681            vec![Arc::new(a)],
2682            Operator::Modulo,
2683            ScalarValue::Dictionary(
2684                Box::new(DataType::Int8),
2685                Box::new(ScalarValue::Int32(Some(2))),
2686            ),
2687            Arc::new(expected),
2688        )?;
2689
2690        Ok(())
2691    }
2692
2693    #[test]
2694    fn modulus_op_dict_scalar_decimal() -> Result<()> {
2695        let schema = Schema::new(vec![Field::new(
2696            "a",
2697            DataType::Dictionary(
2698                Box::new(DataType::Int8),
2699                Box::new(DataType::Decimal128(10, 0)),
2700            ),
2701            true,
2702        )]);
2703
2704        let value = 123;
2705        let decimal_array = Arc::new(create_decimal_array(
2706            &[Some(value), None, Some(value - 1), Some(value + 1)],
2707            10,
2708            0,
2709        ));
2710
2711        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2712        let a = DictionaryArray::try_new(keys, decimal_array)?;
2713
2714        let decimal_array = Arc::new(create_decimal_array(
2715            &[Some(1), Some(0), None, Some(0), Some(1)],
2716            10,
2717            0,
2718        ));
2719
2720        apply_arithmetic_scalar(
2721            Arc::new(schema),
2722            vec![Arc::new(a)],
2723            Operator::Modulo,
2724            ScalarValue::Dictionary(
2725                Box::new(DataType::Int8),
2726                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2727            ),
2728            decimal_array,
2729        )?;
2730
2731        Ok(())
2732    }
2733
2734    fn apply_arithmetic<T: ArrowNumericType>(
2735        schema: SchemaRef,
2736        data: Vec<ArrayRef>,
2737        op: Operator,
2738        expected: PrimitiveArray<T>,
2739    ) -> Result<()> {
2740        let arithmetic_op =
2741            binary_op(col("a", &schema)?, op, col("b", &schema)?, &schema)?;
2742        let batch = RecordBatch::try_new(schema, data)?;
2743        let result = arithmetic_op
2744            .evaluate(&batch)?
2745            .into_array(batch.num_rows())
2746            .expect("Failed to convert to array");
2747
2748        assert_eq!(result.as_ref(), &expected);
2749        Ok(())
2750    }
2751
2752    fn apply_arithmetic_scalar(
2753        schema: SchemaRef,
2754        data: Vec<ArrayRef>,
2755        op: Operator,
2756        literal: ScalarValue,
2757        expected: ArrayRef,
2758    ) -> Result<()> {
2759        let lit = Arc::new(Literal::new(literal));
2760        let arithmetic_op = binary_op(col("a", &schema)?, op, lit, &schema)?;
2761        let batch = RecordBatch::try_new(schema, data)?;
2762        let result = arithmetic_op
2763            .evaluate(&batch)?
2764            .into_array(batch.num_rows())
2765            .expect("Failed to convert to array");
2766
2767        assert_eq!(&result, &expected);
2768        Ok(())
2769    }
2770
2771    fn apply_logic_op(
2772        schema: &SchemaRef,
2773        left: &ArrayRef,
2774        right: &ArrayRef,
2775        op: Operator,
2776        expected: BooleanArray,
2777    ) -> Result<()> {
2778        let op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
2779        let data: Vec<ArrayRef> = vec![Arc::clone(left), Arc::clone(right)];
2780        let batch = RecordBatch::try_new(Arc::clone(schema), data)?;
2781        let result = op
2782            .evaluate(&batch)?
2783            .into_array(batch.num_rows())
2784            .expect("Failed to convert to array");
2785
2786        assert_eq!(result.as_ref(), &expected);
2787        Ok(())
2788    }
2789
2790    // Test `scalar <op> arr` produces expected
2791    fn apply_logic_op_scalar_arr(
2792        schema: &SchemaRef,
2793        scalar: &ScalarValue,
2794        arr: &ArrayRef,
2795        op: Operator,
2796        expected: &BooleanArray,
2797    ) -> Result<()> {
2798        let scalar = lit(scalar.clone());
2799        let op = binary_op(scalar, op, col("a", schema)?, schema)?;
2800        let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
2801        let result = op
2802            .evaluate(&batch)?
2803            .into_array(batch.num_rows())
2804            .expect("Failed to convert to array");
2805        assert_eq!(result.as_ref(), expected);
2806
2807        Ok(())
2808    }
2809
2810    // Test `arr <op> scalar` produces expected
2811    fn apply_logic_op_arr_scalar(
2812        schema: &SchemaRef,
2813        arr: &ArrayRef,
2814        scalar: &ScalarValue,
2815        op: Operator,
2816        expected: &BooleanArray,
2817    ) -> Result<()> {
2818        let scalar = lit(scalar.clone());
2819        let op = binary_op(col("a", schema)?, op, scalar, schema)?;
2820        let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
2821        let result = op
2822            .evaluate(&batch)?
2823            .into_array(batch.num_rows())
2824            .expect("Failed to convert to array");
2825        assert_eq!(result.as_ref(), expected);
2826
2827        Ok(())
2828    }
2829
2830    #[test]
2831    fn and_with_nulls_op() -> Result<()> {
2832        let schema = Schema::new(vec![
2833            Field::new("a", DataType::Boolean, true),
2834            Field::new("b", DataType::Boolean, true),
2835        ]);
2836        let a = Arc::new(BooleanArray::from(vec![
2837            Some(true),
2838            Some(false),
2839            None,
2840            Some(true),
2841            Some(false),
2842            None,
2843            Some(true),
2844            Some(false),
2845            None,
2846        ])) as ArrayRef;
2847        let b = Arc::new(BooleanArray::from(vec![
2848            Some(true),
2849            Some(true),
2850            Some(true),
2851            Some(false),
2852            Some(false),
2853            Some(false),
2854            None,
2855            None,
2856            None,
2857        ])) as ArrayRef;
2858
2859        let expected = BooleanArray::from(vec![
2860            Some(true),
2861            Some(false),
2862            None,
2863            Some(false),
2864            Some(false),
2865            Some(false),
2866            None,
2867            Some(false),
2868            None,
2869        ]);
2870        apply_logic_op(&Arc::new(schema), &a, &b, Operator::And, expected)?;
2871
2872        Ok(())
2873    }
2874
2875    #[test]
2876    fn regex_with_nulls() -> Result<()> {
2877        let schema = Schema::new(vec![
2878            Field::new("a", DataType::Utf8, true),
2879            Field::new("b", DataType::Utf8, true),
2880        ]);
2881        let a = Arc::new(StringArray::from(vec![
2882            Some("abc"),
2883            None,
2884            Some("abc"),
2885            None,
2886            Some("abc"),
2887        ])) as ArrayRef;
2888        let b = Arc::new(StringArray::from(vec![
2889            Some("^a"),
2890            Some("^A"),
2891            None,
2892            None,
2893            Some("^(b|c)"),
2894        ])) as ArrayRef;
2895
2896        let regex_expected =
2897            BooleanArray::from(vec![Some(true), None, None, None, Some(false)]);
2898        let regex_not_expected =
2899            BooleanArray::from(vec![Some(false), None, None, None, Some(true)]);
2900        apply_logic_op(
2901            &Arc::new(schema.clone()),
2902            &a,
2903            &b,
2904            Operator::RegexMatch,
2905            regex_expected.clone(),
2906        )?;
2907        apply_logic_op(
2908            &Arc::new(schema.clone()),
2909            &a,
2910            &b,
2911            Operator::RegexIMatch,
2912            regex_expected.clone(),
2913        )?;
2914        apply_logic_op(
2915            &Arc::new(schema.clone()),
2916            &a,
2917            &b,
2918            Operator::RegexNotMatch,
2919            regex_not_expected.clone(),
2920        )?;
2921        apply_logic_op(
2922            &Arc::new(schema),
2923            &a,
2924            &b,
2925            Operator::RegexNotIMatch,
2926            regex_not_expected.clone(),
2927        )?;
2928
2929        let schema = Schema::new(vec![
2930            Field::new("a", DataType::LargeUtf8, true),
2931            Field::new("b", DataType::LargeUtf8, true),
2932        ]);
2933        let a = Arc::new(LargeStringArray::from(vec![
2934            Some("abc"),
2935            None,
2936            Some("abc"),
2937            None,
2938            Some("abc"),
2939        ])) as ArrayRef;
2940        let b = Arc::new(LargeStringArray::from(vec![
2941            Some("^a"),
2942            Some("^A"),
2943            None,
2944            None,
2945            Some("^(b|c)"),
2946        ])) as ArrayRef;
2947
2948        apply_logic_op(
2949            &Arc::new(schema.clone()),
2950            &a,
2951            &b,
2952            Operator::RegexMatch,
2953            regex_expected.clone(),
2954        )?;
2955        apply_logic_op(
2956            &Arc::new(schema.clone()),
2957            &a,
2958            &b,
2959            Operator::RegexIMatch,
2960            regex_expected,
2961        )?;
2962        apply_logic_op(
2963            &Arc::new(schema.clone()),
2964            &a,
2965            &b,
2966            Operator::RegexNotMatch,
2967            regex_not_expected.clone(),
2968        )?;
2969        apply_logic_op(
2970            &Arc::new(schema),
2971            &a,
2972            &b,
2973            Operator::RegexNotIMatch,
2974            regex_not_expected,
2975        )?;
2976
2977        Ok(())
2978    }
2979
2980    #[test]
2981    fn or_with_nulls_op() -> Result<()> {
2982        let schema = Schema::new(vec![
2983            Field::new("a", DataType::Boolean, true),
2984            Field::new("b", DataType::Boolean, true),
2985        ]);
2986        let a = Arc::new(BooleanArray::from(vec![
2987            Some(true),
2988            Some(false),
2989            None,
2990            Some(true),
2991            Some(false),
2992            None,
2993            Some(true),
2994            Some(false),
2995            None,
2996        ])) as ArrayRef;
2997        let b = Arc::new(BooleanArray::from(vec![
2998            Some(true),
2999            Some(true),
3000            Some(true),
3001            Some(false),
3002            Some(false),
3003            Some(false),
3004            None,
3005            None,
3006            None,
3007        ])) as ArrayRef;
3008
3009        let expected = BooleanArray::from(vec![
3010            Some(true),
3011            Some(true),
3012            Some(true),
3013            Some(true),
3014            Some(false),
3015            None,
3016            Some(true),
3017            None,
3018            None,
3019        ]);
3020        apply_logic_op(&Arc::new(schema), &a, &b, Operator::Or, expected)?;
3021
3022        Ok(())
3023    }
3024
3025    /// Returns (schema, a: BooleanArray, b: BooleanArray) with all possible inputs
3026    ///
3027    /// a: [true, true, true,  NULL, NULL, NULL,  false, false, false]
3028    /// b: [true, NULL, false, true, NULL, false, true,  NULL,  false]
3029    fn bool_test_arrays() -> (SchemaRef, ArrayRef, ArrayRef) {
3030        let schema = Schema::new(vec![
3031            Field::new("a", DataType::Boolean, true),
3032            Field::new("b", DataType::Boolean, true),
3033        ]);
3034        let a: BooleanArray = [
3035            Some(true),
3036            Some(true),
3037            Some(true),
3038            None,
3039            None,
3040            None,
3041            Some(false),
3042            Some(false),
3043            Some(false),
3044        ]
3045        .iter()
3046        .collect();
3047        let b: BooleanArray = [
3048            Some(true),
3049            None,
3050            Some(false),
3051            Some(true),
3052            None,
3053            Some(false),
3054            Some(true),
3055            None,
3056            Some(false),
3057        ]
3058        .iter()
3059        .collect();
3060        (Arc::new(schema), Arc::new(a), Arc::new(b))
3061    }
3062
3063    /// Returns (schema, BooleanArray) with [true, NULL, false]
3064    fn scalar_bool_test_array() -> (SchemaRef, ArrayRef) {
3065        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
3066        let a: BooleanArray = [Some(true), None, Some(false)].iter().collect();
3067        (Arc::new(schema), Arc::new(a))
3068    }
3069
3070    #[test]
3071    fn eq_op_bool() {
3072        let (schema, a, b) = bool_test_arrays();
3073        let expected = [
3074            Some(true),
3075            None,
3076            Some(false),
3077            None,
3078            None,
3079            None,
3080            Some(false),
3081            None,
3082            Some(true),
3083        ]
3084        .iter()
3085        .collect();
3086        apply_logic_op(&schema, &a, &b, Operator::Eq, expected).unwrap();
3087    }
3088
3089    #[test]
3090    fn eq_op_bool_scalar() {
3091        let (schema, a) = scalar_bool_test_array();
3092        let expected = [Some(true), None, Some(false)].iter().collect();
3093        apply_logic_op_scalar_arr(
3094            &schema,
3095            &ScalarValue::from(true),
3096            &a,
3097            Operator::Eq,
3098            &expected,
3099        )
3100        .unwrap();
3101        apply_logic_op_arr_scalar(
3102            &schema,
3103            &a,
3104            &ScalarValue::from(true),
3105            Operator::Eq,
3106            &expected,
3107        )
3108        .unwrap();
3109
3110        let expected = [Some(false), None, Some(true)].iter().collect();
3111        apply_logic_op_scalar_arr(
3112            &schema,
3113            &ScalarValue::from(false),
3114            &a,
3115            Operator::Eq,
3116            &expected,
3117        )
3118        .unwrap();
3119        apply_logic_op_arr_scalar(
3120            &schema,
3121            &a,
3122            &ScalarValue::from(false),
3123            Operator::Eq,
3124            &expected,
3125        )
3126        .unwrap();
3127    }
3128
3129    #[test]
3130    fn neq_op_bool() {
3131        let (schema, a, b) = bool_test_arrays();
3132        let expected = [
3133            Some(false),
3134            None,
3135            Some(true),
3136            None,
3137            None,
3138            None,
3139            Some(true),
3140            None,
3141            Some(false),
3142        ]
3143        .iter()
3144        .collect();
3145        apply_logic_op(&schema, &a, &b, Operator::NotEq, expected).unwrap();
3146    }
3147
3148    #[test]
3149    fn neq_op_bool_scalar() {
3150        let (schema, a) = scalar_bool_test_array();
3151        let expected = [Some(false), None, Some(true)].iter().collect();
3152        apply_logic_op_scalar_arr(
3153            &schema,
3154            &ScalarValue::from(true),
3155            &a,
3156            Operator::NotEq,
3157            &expected,
3158        )
3159        .unwrap();
3160        apply_logic_op_arr_scalar(
3161            &schema,
3162            &a,
3163            &ScalarValue::from(true),
3164            Operator::NotEq,
3165            &expected,
3166        )
3167        .unwrap();
3168
3169        let expected = [Some(true), None, Some(false)].iter().collect();
3170        apply_logic_op_scalar_arr(
3171            &schema,
3172            &ScalarValue::from(false),
3173            &a,
3174            Operator::NotEq,
3175            &expected,
3176        )
3177        .unwrap();
3178        apply_logic_op_arr_scalar(
3179            &schema,
3180            &a,
3181            &ScalarValue::from(false),
3182            Operator::NotEq,
3183            &expected,
3184        )
3185        .unwrap();
3186    }
3187
3188    #[test]
3189    fn lt_op_bool() {
3190        let (schema, a, b) = bool_test_arrays();
3191        let expected = [
3192            Some(false),
3193            None,
3194            Some(false),
3195            None,
3196            None,
3197            None,
3198            Some(true),
3199            None,
3200            Some(false),
3201        ]
3202        .iter()
3203        .collect();
3204        apply_logic_op(&schema, &a, &b, Operator::Lt, expected).unwrap();
3205    }
3206
3207    #[test]
3208    fn lt_op_bool_scalar() {
3209        let (schema, a) = scalar_bool_test_array();
3210        let expected = [Some(false), None, Some(false)].iter().collect();
3211        apply_logic_op_scalar_arr(
3212            &schema,
3213            &ScalarValue::from(true),
3214            &a,
3215            Operator::Lt,
3216            &expected,
3217        )
3218        .unwrap();
3219
3220        let expected = [Some(false), None, Some(true)].iter().collect();
3221        apply_logic_op_arr_scalar(
3222            &schema,
3223            &a,
3224            &ScalarValue::from(true),
3225            Operator::Lt,
3226            &expected,
3227        )
3228        .unwrap();
3229
3230        let expected = [Some(true), None, Some(false)].iter().collect();
3231        apply_logic_op_scalar_arr(
3232            &schema,
3233            &ScalarValue::from(false),
3234            &a,
3235            Operator::Lt,
3236            &expected,
3237        )
3238        .unwrap();
3239
3240        let expected = [Some(false), None, Some(false)].iter().collect();
3241        apply_logic_op_arr_scalar(
3242            &schema,
3243            &a,
3244            &ScalarValue::from(false),
3245            Operator::Lt,
3246            &expected,
3247        )
3248        .unwrap();
3249    }
3250
3251    #[test]
3252    fn lt_eq_op_bool() {
3253        let (schema, a, b) = bool_test_arrays();
3254        let expected = [
3255            Some(true),
3256            None,
3257            Some(false),
3258            None,
3259            None,
3260            None,
3261            Some(true),
3262            None,
3263            Some(true),
3264        ]
3265        .iter()
3266        .collect();
3267        apply_logic_op(&schema, &a, &b, Operator::LtEq, expected).unwrap();
3268    }
3269
3270    #[test]
3271    fn lt_eq_op_bool_scalar() {
3272        let (schema, a) = scalar_bool_test_array();
3273        let expected = [Some(true), None, Some(false)].iter().collect();
3274        apply_logic_op_scalar_arr(
3275            &schema,
3276            &ScalarValue::from(true),
3277            &a,
3278            Operator::LtEq,
3279            &expected,
3280        )
3281        .unwrap();
3282
3283        let expected = [Some(true), None, Some(true)].iter().collect();
3284        apply_logic_op_arr_scalar(
3285            &schema,
3286            &a,
3287            &ScalarValue::from(true),
3288            Operator::LtEq,
3289            &expected,
3290        )
3291        .unwrap();
3292
3293        let expected = [Some(true), None, Some(true)].iter().collect();
3294        apply_logic_op_scalar_arr(
3295            &schema,
3296            &ScalarValue::from(false),
3297            &a,
3298            Operator::LtEq,
3299            &expected,
3300        )
3301        .unwrap();
3302
3303        let expected = [Some(false), None, Some(true)].iter().collect();
3304        apply_logic_op_arr_scalar(
3305            &schema,
3306            &a,
3307            &ScalarValue::from(false),
3308            Operator::LtEq,
3309            &expected,
3310        )
3311        .unwrap();
3312    }
3313
3314    #[test]
3315    fn gt_op_bool() {
3316        let (schema, a, b) = bool_test_arrays();
3317        let expected = [
3318            Some(false),
3319            None,
3320            Some(true),
3321            None,
3322            None,
3323            None,
3324            Some(false),
3325            None,
3326            Some(false),
3327        ]
3328        .iter()
3329        .collect();
3330        apply_logic_op(&schema, &a, &b, Operator::Gt, expected).unwrap();
3331    }
3332
3333    #[test]
3334    fn gt_op_bool_scalar() {
3335        let (schema, a) = scalar_bool_test_array();
3336        let expected = [Some(false), None, Some(true)].iter().collect();
3337        apply_logic_op_scalar_arr(
3338            &schema,
3339            &ScalarValue::from(true),
3340            &a,
3341            Operator::Gt,
3342            &expected,
3343        )
3344        .unwrap();
3345
3346        let expected = [Some(false), None, Some(false)].iter().collect();
3347        apply_logic_op_arr_scalar(
3348            &schema,
3349            &a,
3350            &ScalarValue::from(true),
3351            Operator::Gt,
3352            &expected,
3353        )
3354        .unwrap();
3355
3356        let expected = [Some(false), None, Some(false)].iter().collect();
3357        apply_logic_op_scalar_arr(
3358            &schema,
3359            &ScalarValue::from(false),
3360            &a,
3361            Operator::Gt,
3362            &expected,
3363        )
3364        .unwrap();
3365
3366        let expected = [Some(true), None, Some(false)].iter().collect();
3367        apply_logic_op_arr_scalar(
3368            &schema,
3369            &a,
3370            &ScalarValue::from(false),
3371            Operator::Gt,
3372            &expected,
3373        )
3374        .unwrap();
3375    }
3376
3377    #[test]
3378    fn gt_eq_op_bool() {
3379        let (schema, a, b) = bool_test_arrays();
3380        let expected = [
3381            Some(true),
3382            None,
3383            Some(true),
3384            None,
3385            None,
3386            None,
3387            Some(false),
3388            None,
3389            Some(true),
3390        ]
3391        .iter()
3392        .collect();
3393        apply_logic_op(&schema, &a, &b, Operator::GtEq, expected).unwrap();
3394    }
3395
3396    #[test]
3397    fn gt_eq_op_bool_scalar() {
3398        let (schema, a) = scalar_bool_test_array();
3399        let expected = [Some(true), None, Some(true)].iter().collect();
3400        apply_logic_op_scalar_arr(
3401            &schema,
3402            &ScalarValue::from(true),
3403            &a,
3404            Operator::GtEq,
3405            &expected,
3406        )
3407        .unwrap();
3408
3409        let expected = [Some(true), None, Some(false)].iter().collect();
3410        apply_logic_op_arr_scalar(
3411            &schema,
3412            &a,
3413            &ScalarValue::from(true),
3414            Operator::GtEq,
3415            &expected,
3416        )
3417        .unwrap();
3418
3419        let expected = [Some(false), None, Some(true)].iter().collect();
3420        apply_logic_op_scalar_arr(
3421            &schema,
3422            &ScalarValue::from(false),
3423            &a,
3424            Operator::GtEq,
3425            &expected,
3426        )
3427        .unwrap();
3428
3429        let expected = [Some(true), None, Some(true)].iter().collect();
3430        apply_logic_op_arr_scalar(
3431            &schema,
3432            &a,
3433            &ScalarValue::from(false),
3434            Operator::GtEq,
3435            &expected,
3436        )
3437        .unwrap();
3438    }
3439
3440    #[test]
3441    fn is_distinct_from_op_bool() {
3442        let (schema, a, b) = bool_test_arrays();
3443        let expected = [
3444            Some(false),
3445            Some(true),
3446            Some(true),
3447            Some(true),
3448            Some(false),
3449            Some(true),
3450            Some(true),
3451            Some(true),
3452            Some(false),
3453        ]
3454        .iter()
3455        .collect();
3456        apply_logic_op(&schema, &a, &b, Operator::IsDistinctFrom, expected).unwrap();
3457    }
3458
3459    #[test]
3460    fn is_not_distinct_from_op_bool() {
3461        let (schema, a, b) = bool_test_arrays();
3462        let expected = [
3463            Some(true),
3464            Some(false),
3465            Some(false),
3466            Some(false),
3467            Some(true),
3468            Some(false),
3469            Some(false),
3470            Some(false),
3471            Some(true),
3472        ]
3473        .iter()
3474        .collect();
3475        apply_logic_op(&schema, &a, &b, Operator::IsNotDistinctFrom, expected).unwrap();
3476    }
3477
3478    #[test]
3479    fn relatively_deeply_nested() {
3480        // Reproducer for https://github.com/apache/datafusion/issues/419
3481
3482        // where even relatively shallow binary expressions overflowed
3483        // the stack in debug builds
3484
3485        let input: Vec<_> = vec![1, 2, 3, 4, 5].into_iter().map(Some).collect();
3486        let a: Int32Array = input.iter().collect();
3487
3488        let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(a) as _)]).unwrap();
3489        let schema = batch.schema();
3490
3491        // build a left deep tree ((((a + a) + a) + a ....
3492        let tree_depth: i32 = 100;
3493        let expr = (0..tree_depth)
3494            .map(|_| col("a", schema.as_ref()).unwrap())
3495            .reduce(|l, r| binary(l, Operator::Plus, r, &schema).unwrap())
3496            .unwrap();
3497
3498        let result = expr
3499            .evaluate(&batch)
3500            .expect("evaluation")
3501            .into_array(batch.num_rows())
3502            .expect("Failed to convert to array");
3503
3504        let expected: Int32Array = input
3505            .into_iter()
3506            .map(|i| i.map(|i| i * tree_depth))
3507            .collect();
3508        assert_eq!(result.as_ref(), &expected);
3509    }
3510
3511    fn create_decimal_array(
3512        array: &[Option<i128>],
3513        precision: u8,
3514        scale: i8,
3515    ) -> Decimal128Array {
3516        let mut decimal_builder = Decimal128Builder::with_capacity(array.len());
3517        for value in array.iter().copied() {
3518            decimal_builder.append_option(value)
3519        }
3520        decimal_builder
3521            .finish()
3522            .with_precision_and_scale(precision, scale)
3523            .unwrap()
3524    }
3525
3526    #[test]
3527    fn comparison_dict_decimal_scalar_expr_test() -> Result<()> {
3528        // scalar of decimal compare with dictionary decimal array
3529        let value_i128 = 123;
3530        let decimal_scalar = ScalarValue::Dictionary(
3531            Box::new(DataType::Int8),
3532            Box::new(ScalarValue::Decimal128(Some(value_i128), 25, 3)),
3533        );
3534        let schema = Arc::new(Schema::new(vec![Field::new(
3535            "a",
3536            DataType::Dictionary(
3537                Box::new(DataType::Int8),
3538                Box::new(DataType::Decimal128(25, 3)),
3539            ),
3540            true,
3541        )]));
3542        let decimal_array = Arc::new(create_decimal_array(
3543            &[
3544                Some(value_i128),
3545                None,
3546                Some(value_i128 - 1),
3547                Some(value_i128 + 1),
3548            ],
3549            25,
3550            3,
3551        ));
3552
3553        let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
3554        let dictionary =
3555            Arc::new(DictionaryArray::try_new(keys, decimal_array)?) as ArrayRef;
3556
3557        // array = scalar
3558        apply_logic_op_arr_scalar(
3559            &schema,
3560            &dictionary,
3561            &decimal_scalar,
3562            Operator::Eq,
3563            &BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3564        )
3565        .unwrap();
3566        // array != scalar
3567        apply_logic_op_arr_scalar(
3568            &schema,
3569            &dictionary,
3570            &decimal_scalar,
3571            Operator::NotEq,
3572            &BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3573        )
3574        .unwrap();
3575        //  array < scalar
3576        apply_logic_op_arr_scalar(
3577            &schema,
3578            &dictionary,
3579            &decimal_scalar,
3580            Operator::Lt,
3581            &BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3582        )
3583        .unwrap();
3584
3585        //  array <= scalar
3586        apply_logic_op_arr_scalar(
3587            &schema,
3588            &dictionary,
3589            &decimal_scalar,
3590            Operator::LtEq,
3591            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3592        )
3593        .unwrap();
3594        // array > scalar
3595        apply_logic_op_arr_scalar(
3596            &schema,
3597            &dictionary,
3598            &decimal_scalar,
3599            Operator::Gt,
3600            &BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3601        )
3602        .unwrap();
3603
3604        // array >= scalar
3605        apply_logic_op_arr_scalar(
3606            &schema,
3607            &dictionary,
3608            &decimal_scalar,
3609            Operator::GtEq,
3610            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3611        )
3612        .unwrap();
3613
3614        Ok(())
3615    }
3616
3617    #[test]
3618    fn comparison_decimal_expr_test() -> Result<()> {
3619        // scalar of decimal compare with decimal array
3620        let value_i128 = 123;
3621        let decimal_scalar = ScalarValue::Decimal128(Some(value_i128), 25, 3);
3622        let schema = Arc::new(Schema::new(vec![Field::new(
3623            "a",
3624            DataType::Decimal128(25, 3),
3625            true,
3626        )]));
3627        let decimal_array = Arc::new(create_decimal_array(
3628            &[
3629                Some(value_i128),
3630                None,
3631                Some(value_i128 - 1),
3632                Some(value_i128 + 1),
3633            ],
3634            25,
3635            3,
3636        )) as ArrayRef;
3637        // array = scalar
3638        apply_logic_op_arr_scalar(
3639            &schema,
3640            &decimal_array,
3641            &decimal_scalar,
3642            Operator::Eq,
3643            &BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3644        )
3645        .unwrap();
3646        // array != scalar
3647        apply_logic_op_arr_scalar(
3648            &schema,
3649            &decimal_array,
3650            &decimal_scalar,
3651            Operator::NotEq,
3652            &BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3653        )
3654        .unwrap();
3655        //  array < scalar
3656        apply_logic_op_arr_scalar(
3657            &schema,
3658            &decimal_array,
3659            &decimal_scalar,
3660            Operator::Lt,
3661            &BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3662        )
3663        .unwrap();
3664
3665        //  array <= scalar
3666        apply_logic_op_arr_scalar(
3667            &schema,
3668            &decimal_array,
3669            &decimal_scalar,
3670            Operator::LtEq,
3671            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3672        )
3673        .unwrap();
3674        // array > scalar
3675        apply_logic_op_arr_scalar(
3676            &schema,
3677            &decimal_array,
3678            &decimal_scalar,
3679            Operator::Gt,
3680            &BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3681        )
3682        .unwrap();
3683
3684        // array >= scalar
3685        apply_logic_op_arr_scalar(
3686            &schema,
3687            &decimal_array,
3688            &decimal_scalar,
3689            Operator::GtEq,
3690            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3691        )
3692        .unwrap();
3693
3694        // scalar of different data type with decimal array
3695        let decimal_scalar = ScalarValue::Decimal128(Some(123_456), 10, 3);
3696        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
3697        // scalar == array
3698        apply_logic_op_scalar_arr(
3699            &schema,
3700            &decimal_scalar,
3701            &(Arc::new(Int64Array::from(vec![Some(124), None])) as ArrayRef),
3702            Operator::Eq,
3703            &BooleanArray::from(vec![Some(false), None]),
3704        )
3705        .unwrap();
3706
3707        // array != scalar
3708        apply_logic_op_arr_scalar(
3709            &schema,
3710            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(1)])) as ArrayRef),
3711            &decimal_scalar,
3712            Operator::NotEq,
3713            &BooleanArray::from(vec![Some(true), None, Some(true)]),
3714        )
3715        .unwrap();
3716
3717        // array < scalar
3718        apply_logic_op_arr_scalar(
3719            &schema,
3720            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(124)])) as ArrayRef),
3721            &decimal_scalar,
3722            Operator::Lt,
3723            &BooleanArray::from(vec![Some(true), None, Some(false)]),
3724        )
3725        .unwrap();
3726
3727        // array > scalar
3728        apply_logic_op_arr_scalar(
3729            &schema,
3730            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(124)])) as ArrayRef),
3731            &decimal_scalar,
3732            Operator::Gt,
3733            &BooleanArray::from(vec![Some(false), None, Some(true)]),
3734        )
3735        .unwrap();
3736
3737        let schema =
3738            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3739        // array == scalar
3740        apply_logic_op_arr_scalar(
3741            &schema,
3742            &(Arc::new(Float64Array::from(vec![Some(123.456), None, Some(123.457)]))
3743                as ArrayRef),
3744            &decimal_scalar,
3745            Operator::Eq,
3746            &BooleanArray::from(vec![Some(true), None, Some(false)]),
3747        )
3748        .unwrap();
3749
3750        // array <= scalar
3751        apply_logic_op_arr_scalar(
3752            &schema,
3753            &(Arc::new(Float64Array::from(vec![
3754                Some(123.456),
3755                None,
3756                Some(123.457),
3757                Some(123.45),
3758            ])) as ArrayRef),
3759            &decimal_scalar,
3760            Operator::LtEq,
3761            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3762        )
3763        .unwrap();
3764        // array >= scalar
3765        apply_logic_op_arr_scalar(
3766            &schema,
3767            &(Arc::new(Float64Array::from(vec![
3768                Some(123.456),
3769                None,
3770                Some(123.457),
3771                Some(123.45),
3772            ])) as ArrayRef),
3773            &decimal_scalar,
3774            Operator::GtEq,
3775            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3776        )
3777        .unwrap();
3778
3779        let value: i128 = 123;
3780        let decimal_array = Arc::new(create_decimal_array(
3781            &[Some(value), None, Some(value - 1), Some(value + 1)],
3782            10,
3783            0,
3784        )) as ArrayRef;
3785
3786        // comparison array op for decimal array
3787        let schema = Arc::new(Schema::new(vec![
3788            Field::new("a", DataType::Decimal128(10, 0), true),
3789            Field::new("b", DataType::Decimal128(10, 0), true),
3790        ]));
3791        let right_decimal_array = Arc::new(create_decimal_array(
3792            &[
3793                Some(value - 1),
3794                Some(value),
3795                Some(value + 1),
3796                Some(value + 1),
3797            ],
3798            10,
3799            0,
3800        )) as ArrayRef;
3801
3802        apply_logic_op(
3803            &schema,
3804            &decimal_array,
3805            &right_decimal_array,
3806            Operator::Eq,
3807            BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3808        )
3809        .unwrap();
3810
3811        apply_logic_op(
3812            &schema,
3813            &decimal_array,
3814            &right_decimal_array,
3815            Operator::NotEq,
3816            BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3817        )
3818        .unwrap();
3819
3820        apply_logic_op(
3821            &schema,
3822            &decimal_array,
3823            &right_decimal_array,
3824            Operator::Lt,
3825            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3826        )
3827        .unwrap();
3828
3829        apply_logic_op(
3830            &schema,
3831            &decimal_array,
3832            &right_decimal_array,
3833            Operator::LtEq,
3834            BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3835        )
3836        .unwrap();
3837
3838        apply_logic_op(
3839            &schema,
3840            &decimal_array,
3841            &right_decimal_array,
3842            Operator::Gt,
3843            BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3844        )
3845        .unwrap();
3846
3847        apply_logic_op(
3848            &schema,
3849            &decimal_array,
3850            &right_decimal_array,
3851            Operator::GtEq,
3852            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3853        )
3854        .unwrap();
3855
3856        // compare decimal array with other array type
3857        let value: i64 = 123;
3858        let schema = Arc::new(Schema::new(vec![
3859            Field::new("a", DataType::Int64, true),
3860            Field::new("b", DataType::Decimal128(10, 0), true),
3861        ]));
3862
3863        let int64_array = Arc::new(Int64Array::from(vec![
3864            Some(value),
3865            Some(value - 1),
3866            Some(value),
3867            Some(value + 1),
3868        ])) as ArrayRef;
3869
3870        // eq: int64array == decimal array
3871        apply_logic_op(
3872            &schema,
3873            &int64_array,
3874            &decimal_array,
3875            Operator::Eq,
3876            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3877        )
3878        .unwrap();
3879        // neq: int64array != decimal array
3880        apply_logic_op(
3881            &schema,
3882            &int64_array,
3883            &decimal_array,
3884            Operator::NotEq,
3885            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3886        )
3887        .unwrap();
3888
3889        let schema = Arc::new(Schema::new(vec![
3890            Field::new("a", DataType::Float64, true),
3891            Field::new("b", DataType::Decimal128(10, 2), true),
3892        ]));
3893
3894        let value: i128 = 123;
3895        let decimal_array = Arc::new(create_decimal_array(
3896            &[
3897                Some(value), // 1.23
3898                None,
3899                Some(value - 1), // 1.22
3900                Some(value + 1), // 1.24
3901            ],
3902            10,
3903            2,
3904        )) as ArrayRef;
3905        let float64_array = Arc::new(Float64Array::from(vec![
3906            Some(1.23),
3907            Some(1.22),
3908            Some(1.23),
3909            Some(1.24),
3910        ])) as ArrayRef;
3911        // lt: float64array < decimal array
3912        apply_logic_op(
3913            &schema,
3914            &float64_array,
3915            &decimal_array,
3916            Operator::Lt,
3917            BooleanArray::from(vec![Some(false), None, Some(false), Some(false)]),
3918        )
3919        .unwrap();
3920        // lt_eq: float64array <= decimal array
3921        apply_logic_op(
3922            &schema,
3923            &float64_array,
3924            &decimal_array,
3925            Operator::LtEq,
3926            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3927        )
3928        .unwrap();
3929        // gt: float64array > decimal array
3930        apply_logic_op(
3931            &schema,
3932            &float64_array,
3933            &decimal_array,
3934            Operator::Gt,
3935            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3936        )
3937        .unwrap();
3938        apply_logic_op(
3939            &schema,
3940            &float64_array,
3941            &decimal_array,
3942            Operator::GtEq,
3943            BooleanArray::from(vec![Some(true), None, Some(true), Some(true)]),
3944        )
3945        .unwrap();
3946        // is distinct: float64array is distinct decimal array
3947        // TODO: now we do not refactor the `is distinct or is not distinct` rule of coercion.
3948        // traced by https://github.com/apache/datafusion/issues/1590
3949        // the decimal array will be casted to float64array
3950        apply_logic_op(
3951            &schema,
3952            &float64_array,
3953            &decimal_array,
3954            Operator::IsDistinctFrom,
3955            BooleanArray::from(vec![Some(false), Some(true), Some(true), Some(false)]),
3956        )
3957        .unwrap();
3958        // is not distinct
3959        apply_logic_op(
3960            &schema,
3961            &float64_array,
3962            &decimal_array,
3963            Operator::IsNotDistinctFrom,
3964            BooleanArray::from(vec![Some(true), Some(false), Some(false), Some(true)]),
3965        )
3966        .unwrap();
3967
3968        Ok(())
3969    }
3970
3971    fn apply_decimal_arithmetic_op(
3972        schema: &SchemaRef,
3973        left: &ArrayRef,
3974        right: &ArrayRef,
3975        op: Operator,
3976        expected: ArrayRef,
3977    ) -> Result<()> {
3978        let arithmetic_op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
3979        let data: Vec<ArrayRef> = vec![Arc::clone(left), Arc::clone(right)];
3980        let batch = RecordBatch::try_new(Arc::clone(schema), data)?;
3981        let result = arithmetic_op
3982            .evaluate(&batch)?
3983            .into_array(batch.num_rows())
3984            .expect("Failed to convert to array");
3985
3986        assert_eq!(result.as_ref(), expected.as_ref());
3987        Ok(())
3988    }
3989
3990    #[test]
3991    fn arithmetic_decimal_expr_test() -> Result<()> {
3992        let schema = Arc::new(Schema::new(vec![
3993            Field::new("a", DataType::Int32, true),
3994            Field::new("b", DataType::Decimal128(10, 2), true),
3995        ]));
3996        let value: i128 = 123;
3997        let decimal_array = Arc::new(create_decimal_array(
3998            &[
3999                Some(value), // 1.23
4000                None,
4001                Some(value - 1), // 1.22
4002                Some(value + 1), // 1.24
4003            ],
4004            10,
4005            2,
4006        )) as ArrayRef;
4007        let int32_array = Arc::new(Int32Array::from(vec![
4008            Some(123),
4009            Some(122),
4010            Some(123),
4011            Some(124),
4012        ])) as ArrayRef;
4013
4014        // add: Int32array add decimal array
4015        let expect = Arc::new(create_decimal_array(
4016            &[Some(12423), None, Some(12422), Some(12524)],
4017            13,
4018            2,
4019        )) as ArrayRef;
4020        apply_decimal_arithmetic_op(
4021            &schema,
4022            &int32_array,
4023            &decimal_array,
4024            Operator::Plus,
4025            expect,
4026        )
4027        .unwrap();
4028
4029        // subtract: decimal array subtract int32 array
4030        let schema = Arc::new(Schema::new(vec![
4031            Field::new("a", DataType::Decimal128(10, 2), true),
4032            Field::new("b", DataType::Int32, true),
4033        ]));
4034        let expect = Arc::new(create_decimal_array(
4035            &[Some(-12177), None, Some(-12178), Some(-12276)],
4036            13,
4037            2,
4038        )) as ArrayRef;
4039        apply_decimal_arithmetic_op(
4040            &schema,
4041            &decimal_array,
4042            &int32_array,
4043            Operator::Minus,
4044            expect,
4045        )
4046        .unwrap();
4047
4048        // multiply: decimal array multiply int32 array
4049        let expect = Arc::new(create_decimal_array(
4050            &[Some(15129), None, Some(15006), Some(15376)],
4051            21,
4052            2,
4053        )) as ArrayRef;
4054        apply_decimal_arithmetic_op(
4055            &schema,
4056            &decimal_array,
4057            &int32_array,
4058            Operator::Multiply,
4059            expect,
4060        )
4061        .unwrap();
4062
4063        // divide: int32 array divide decimal array
4064        let schema = Arc::new(Schema::new(vec![
4065            Field::new("a", DataType::Int32, true),
4066            Field::new("b", DataType::Decimal128(10, 2), true),
4067        ]));
4068        let expect = Arc::new(create_decimal_array(
4069            &[Some(1000000), None, Some(1008196), Some(1000000)],
4070            16,
4071            4,
4072        )) as ArrayRef;
4073        apply_decimal_arithmetic_op(
4074            &schema,
4075            &int32_array,
4076            &decimal_array,
4077            Operator::Divide,
4078            expect,
4079        )
4080        .unwrap();
4081
4082        // modulus: int32 array modulus decimal array
4083        let schema = Arc::new(Schema::new(vec![
4084            Field::new("a", DataType::Int32, true),
4085            Field::new("b", DataType::Decimal128(10, 2), true),
4086        ]));
4087        let expect = Arc::new(create_decimal_array(
4088            &[Some(000), None, Some(100), Some(000)],
4089            10,
4090            2,
4091        )) as ArrayRef;
4092        apply_decimal_arithmetic_op(
4093            &schema,
4094            &int32_array,
4095            &decimal_array,
4096            Operator::Modulo,
4097            expect,
4098        )
4099        .unwrap();
4100
4101        Ok(())
4102    }
4103
4104    #[test]
4105    fn arithmetic_decimal_float_expr_test() -> Result<()> {
4106        let schema = Arc::new(Schema::new(vec![
4107            Field::new("a", DataType::Float64, true),
4108            Field::new("b", DataType::Decimal128(10, 2), true),
4109        ]));
4110        let value: i128 = 123;
4111        let decimal_array = Arc::new(create_decimal_array(
4112            &[
4113                Some(value), // 1.23
4114                None,
4115                Some(value - 1), // 1.22
4116                Some(value + 1), // 1.24
4117            ],
4118            10,
4119            2,
4120        )) as ArrayRef;
4121        let float64_array = Arc::new(Float64Array::from(vec![
4122            Some(123.0),
4123            Some(122.0),
4124            Some(123.0),
4125            Some(124.0),
4126        ])) as ArrayRef;
4127
4128        // add: float64 array add decimal array
4129        let expect = Arc::new(Float64Array::from(vec![
4130            Some(124.23),
4131            None,
4132            Some(124.22),
4133            Some(125.24),
4134        ])) as ArrayRef;
4135        apply_decimal_arithmetic_op(
4136            &schema,
4137            &float64_array,
4138            &decimal_array,
4139            Operator::Plus,
4140            expect,
4141        )
4142        .unwrap();
4143
4144        // subtract: decimal array subtract float64 array
4145        let schema = Arc::new(Schema::new(vec![
4146            Field::new("a", DataType::Float64, true),
4147            Field::new("b", DataType::Decimal128(10, 2), true),
4148        ]));
4149        let expect = Arc::new(Float64Array::from(vec![
4150            Some(121.77),
4151            None,
4152            Some(121.78),
4153            Some(122.76),
4154        ])) as ArrayRef;
4155        apply_decimal_arithmetic_op(
4156            &schema,
4157            &float64_array,
4158            &decimal_array,
4159            Operator::Minus,
4160            expect,
4161        )
4162        .unwrap();
4163
4164        // multiply: decimal array multiply float64 array
4165        let expect = Arc::new(Float64Array::from(vec![
4166            Some(151.29),
4167            None,
4168            Some(150.06),
4169            Some(153.76),
4170        ])) as ArrayRef;
4171        apply_decimal_arithmetic_op(
4172            &schema,
4173            &float64_array,
4174            &decimal_array,
4175            Operator::Multiply,
4176            expect,
4177        )
4178        .unwrap();
4179
4180        // divide: float64 array divide decimal array
4181        let schema = Arc::new(Schema::new(vec![
4182            Field::new("a", DataType::Float64, true),
4183            Field::new("b", DataType::Decimal128(10, 2), true),
4184        ]));
4185        let expect = Arc::new(Float64Array::from(vec![
4186            Some(100.0),
4187            None,
4188            Some(100.81967213114754),
4189            Some(100.0),
4190        ])) as ArrayRef;
4191        apply_decimal_arithmetic_op(
4192            &schema,
4193            &float64_array,
4194            &decimal_array,
4195            Operator::Divide,
4196            expect,
4197        )
4198        .unwrap();
4199
4200        // modulus: float64 array modulus decimal array
4201        let schema = Arc::new(Schema::new(vec![
4202            Field::new("a", DataType::Float64, true),
4203            Field::new("b", DataType::Decimal128(10, 2), true),
4204        ]));
4205        let expect = Arc::new(Float64Array::from(vec![
4206            Some(1.7763568394002505e-15),
4207            None,
4208            Some(1.0000000000000027),
4209            Some(8.881784197001252e-16),
4210        ])) as ArrayRef;
4211        apply_decimal_arithmetic_op(
4212            &schema,
4213            &float64_array,
4214            &decimal_array,
4215            Operator::Modulo,
4216            expect,
4217        )
4218        .unwrap();
4219
4220        Ok(())
4221    }
4222
4223    #[test]
4224    fn arithmetic_divide_zero() -> Result<()> {
4225        // other data type
4226        let schema = Arc::new(Schema::new(vec![
4227            Field::new("a", DataType::Int32, true),
4228            Field::new("b", DataType::Int32, true),
4229        ]));
4230        let a = Arc::new(Int32Array::from(vec![100]));
4231        let b = Arc::new(Int32Array::from(vec![0]));
4232
4233        let err = apply_arithmetic::<Int32Type>(
4234            schema,
4235            vec![a, b],
4236            Operator::Divide,
4237            Int32Array::from(vec![Some(4), Some(8), Some(16), Some(32), Some(64)]),
4238        )
4239        .unwrap_err();
4240
4241        let _expected = plan_datafusion_err!("Divide by zero");
4242
4243        assert!(matches!(err, ref _expected), "{err}");
4244
4245        // decimal
4246        let schema = Arc::new(Schema::new(vec![
4247            Field::new("a", DataType::Decimal128(25, 3), true),
4248            Field::new("b", DataType::Decimal128(25, 3), true),
4249        ]));
4250        let left_decimal_array = Arc::new(create_decimal_array(&[Some(1234567)], 25, 3));
4251        let right_decimal_array = Arc::new(create_decimal_array(&[Some(0)], 25, 3));
4252
4253        let err = apply_arithmetic::<Decimal128Type>(
4254            schema,
4255            vec![left_decimal_array, right_decimal_array],
4256            Operator::Divide,
4257            create_decimal_array(
4258                &[Some(12345670000000000000000000000000000), None],
4259                38,
4260                29,
4261            ),
4262        )
4263        .unwrap_err();
4264
4265        assert!(matches!(err, ref _expected), "{err}");
4266
4267        Ok(())
4268    }
4269
4270    #[test]
4271    fn bitwise_array_test() -> Result<()> {
4272        let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4273        let right =
4274            Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;
4275        let mut result = bitwise_and_dyn(Arc::clone(&left), Arc::clone(&right))?;
4276        let expected = Int32Array::from(vec![Some(0), None, Some(3)]);
4277        assert_eq!(result.as_ref(), &expected);
4278
4279        result = bitwise_or_dyn(Arc::clone(&left), Arc::clone(&right))?;
4280        let expected = Int32Array::from(vec![Some(13), None, Some(15)]);
4281        assert_eq!(result.as_ref(), &expected);
4282
4283        result = bitwise_xor_dyn(Arc::clone(&left), Arc::clone(&right))?;
4284        let expected = Int32Array::from(vec![Some(13), None, Some(12)]);
4285        assert_eq!(result.as_ref(), &expected);
4286
4287        let left =
4288            Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4289        let right =
4290            Arc::new(UInt32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;
4291        let mut result = bitwise_and_dyn(Arc::clone(&left), Arc::clone(&right))?;
4292        let expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
4293        assert_eq!(result.as_ref(), &expected);
4294
4295        result = bitwise_or_dyn(Arc::clone(&left), Arc::clone(&right))?;
4296        let expected = UInt32Array::from(vec![Some(13), None, Some(15)]);
4297        assert_eq!(result.as_ref(), &expected);
4298
4299        result = bitwise_xor_dyn(Arc::clone(&left), Arc::clone(&right))?;
4300        let expected = UInt32Array::from(vec![Some(13), None, Some(12)]);
4301        assert_eq!(result.as_ref(), &expected);
4302
4303        Ok(())
4304    }
4305
4306    #[test]
4307    fn bitwise_shift_array_test() -> Result<()> {
4308        let input = Arc::new(Int32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
4309        let modules =
4310            Arc::new(Int32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
4311        let mut result =
4312            bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4313
4314        let expected = Int32Array::from(vec![Some(8), None, Some(2560)]);
4315        assert_eq!(result.as_ref(), &expected);
4316
4317        result = bitwise_shift_right_dyn(Arc::clone(&result), Arc::clone(&modules))?;
4318        assert_eq!(result.as_ref(), &input);
4319
4320        let input =
4321            Arc::new(UInt32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
4322        let modules =
4323            Arc::new(UInt32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
4324        let mut result =
4325            bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4326
4327        let expected = UInt32Array::from(vec![Some(8), None, Some(2560)]);
4328        assert_eq!(result.as_ref(), &expected);
4329
4330        result = bitwise_shift_right_dyn(Arc::clone(&result), Arc::clone(&modules))?;
4331        assert_eq!(result.as_ref(), &input);
4332        Ok(())
4333    }
4334
4335    #[test]
4336    fn bitwise_shift_array_overflow_test() -> Result<()> {
4337        let input = Arc::new(Int32Array::from(vec![Some(2)])) as ArrayRef;
4338        let modules = Arc::new(Int32Array::from(vec![Some(100)])) as ArrayRef;
4339        let result = bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4340
4341        let expected = Int32Array::from(vec![Some(32)]);
4342        assert_eq!(result.as_ref(), &expected);
4343
4344        let input = Arc::new(UInt32Array::from(vec![Some(2)])) as ArrayRef;
4345        let modules = Arc::new(UInt32Array::from(vec![Some(100)])) as ArrayRef;
4346        let result = bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4347
4348        let expected = UInt32Array::from(vec![Some(32)]);
4349        assert_eq!(result.as_ref(), &expected);
4350        Ok(())
4351    }
4352
4353    #[test]
4354    fn bitwise_scalar_test() -> Result<()> {
4355        let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4356        let right = ScalarValue::from(3i32);
4357        let mut result = bitwise_and_dyn_scalar(&left, right.clone()).unwrap()?;
4358        let expected = Int32Array::from(vec![Some(0), None, Some(3)]);
4359        assert_eq!(result.as_ref(), &expected);
4360
4361        result = bitwise_or_dyn_scalar(&left, right.clone()).unwrap()?;
4362        let expected = Int32Array::from(vec![Some(15), None, Some(11)]);
4363        assert_eq!(result.as_ref(), &expected);
4364
4365        result = bitwise_xor_dyn_scalar(&left, right).unwrap()?;
4366        let expected = Int32Array::from(vec![Some(15), None, Some(8)]);
4367        assert_eq!(result.as_ref(), &expected);
4368
4369        let left =
4370            Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4371        let right = ScalarValue::from(3u32);
4372        let mut result = bitwise_and_dyn_scalar(&left, right.clone()).unwrap()?;
4373        let expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
4374        assert_eq!(result.as_ref(), &expected);
4375
4376        result = bitwise_or_dyn_scalar(&left, right.clone()).unwrap()?;
4377        let expected = UInt32Array::from(vec![Some(15), None, Some(11)]);
4378        assert_eq!(result.as_ref(), &expected);
4379
4380        result = bitwise_xor_dyn_scalar(&left, right).unwrap()?;
4381        let expected = UInt32Array::from(vec![Some(15), None, Some(8)]);
4382        assert_eq!(result.as_ref(), &expected);
4383        Ok(())
4384    }
4385
4386    #[test]
4387    fn bitwise_shift_scalar_test() -> Result<()> {
4388        let input = Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
4389        let module = ScalarValue::from(10i32);
4390        let mut result =
4391            bitwise_shift_left_dyn_scalar(&input, module.clone()).unwrap()?;
4392
4393        let expected = Int32Array::from(vec![Some(2048), None, Some(4096)]);
4394        assert_eq!(result.as_ref(), &expected);
4395
4396        result = bitwise_shift_right_dyn_scalar(&result, module).unwrap()?;
4397        assert_eq!(result.as_ref(), &input);
4398
4399        let input = Arc::new(UInt32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
4400        let module = ScalarValue::from(10u32);
4401        let mut result =
4402            bitwise_shift_left_dyn_scalar(&input, module.clone()).unwrap()?;
4403
4404        let expected = UInt32Array::from(vec![Some(2048), None, Some(4096)]);
4405        assert_eq!(result.as_ref(), &expected);
4406
4407        result = bitwise_shift_right_dyn_scalar(&result, module).unwrap()?;
4408        assert_eq!(result.as_ref(), &input);
4409        Ok(())
4410    }
4411
4412    #[test]
4413    fn test_display_and_or_combo() {
4414        let expr = BinaryExpr::new(
4415            Arc::new(BinaryExpr::new(
4416                lit(ScalarValue::from(1)),
4417                Operator::And,
4418                lit(ScalarValue::from(2)),
4419            )),
4420            Operator::And,
4421            Arc::new(BinaryExpr::new(
4422                lit(ScalarValue::from(3)),
4423                Operator::And,
4424                lit(ScalarValue::from(4)),
4425            )),
4426        );
4427        assert_eq!(expr.to_string(), "1 AND 2 AND 3 AND 4");
4428
4429        let expr = BinaryExpr::new(
4430            Arc::new(BinaryExpr::new(
4431                lit(ScalarValue::from(1)),
4432                Operator::Or,
4433                lit(ScalarValue::from(2)),
4434            )),
4435            Operator::Or,
4436            Arc::new(BinaryExpr::new(
4437                lit(ScalarValue::from(3)),
4438                Operator::Or,
4439                lit(ScalarValue::from(4)),
4440            )),
4441        );
4442        assert_eq!(expr.to_string(), "1 OR 2 OR 3 OR 4");
4443
4444        let expr = BinaryExpr::new(
4445            Arc::new(BinaryExpr::new(
4446                lit(ScalarValue::from(1)),
4447                Operator::And,
4448                lit(ScalarValue::from(2)),
4449            )),
4450            Operator::Or,
4451            Arc::new(BinaryExpr::new(
4452                lit(ScalarValue::from(3)),
4453                Operator::And,
4454                lit(ScalarValue::from(4)),
4455            )),
4456        );
4457        assert_eq!(expr.to_string(), "1 AND 2 OR 3 AND 4");
4458
4459        let expr = BinaryExpr::new(
4460            Arc::new(BinaryExpr::new(
4461                lit(ScalarValue::from(1)),
4462                Operator::Or,
4463                lit(ScalarValue::from(2)),
4464            )),
4465            Operator::And,
4466            Arc::new(BinaryExpr::new(
4467                lit(ScalarValue::from(3)),
4468                Operator::Or,
4469                lit(ScalarValue::from(4)),
4470            )),
4471        );
4472        assert_eq!(expr.to_string(), "(1 OR 2) AND (3 OR 4)");
4473    }
4474
4475    #[test]
4476    fn test_to_result_type_array() {
4477        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
4478        let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
4479        let dictionary =
4480            Arc::new(DictionaryArray::try_new(keys, values).unwrap()) as ArrayRef;
4481
4482        // Casting Dictionary to Int32
4483        let casted = to_result_type_array(
4484            &Operator::Plus,
4485            Arc::clone(&dictionary),
4486            &DataType::Int32,
4487        )
4488        .unwrap();
4489        assert_eq!(
4490            &casted,
4491            &(Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]))
4492                as ArrayRef)
4493        );
4494
4495        // Array has same datatype as result type, no casting
4496        let casted = to_result_type_array(
4497            &Operator::Plus,
4498            Arc::clone(&dictionary),
4499            dictionary.data_type(),
4500        )
4501        .unwrap();
4502        assert_eq!(&casted, &dictionary);
4503
4504        // Not numerical operator, no casting
4505        let casted = to_result_type_array(
4506            &Operator::Eq,
4507            Arc::clone(&dictionary),
4508            &DataType::Int32,
4509        )
4510        .unwrap();
4511        assert_eq!(&casted, &dictionary);
4512    }
4513
4514    #[test]
4515    fn test_add_with_overflow() -> Result<()> {
4516        // create test data
4517        let l = Arc::new(Int32Array::from(vec![1, i32::MAX]));
4518        let r = Arc::new(Int32Array::from(vec![2, 1]));
4519        let schema = Arc::new(Schema::new(vec![
4520            Field::new("l", DataType::Int32, false),
4521            Field::new("r", DataType::Int32, false),
4522        ]));
4523        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4524
4525        // create expression
4526        let expr = BinaryExpr::new(
4527            Arc::new(Column::new("l", 0)),
4528            Operator::Plus,
4529            Arc::new(Column::new("r", 1)),
4530        )
4531        .with_fail_on_overflow(true);
4532
4533        // evaluate expression
4534        let result = expr.evaluate(&batch);
4535        assert!(
4536            result
4537                .err()
4538                .unwrap()
4539                .to_string()
4540                .contains("Overflow happened on: 2147483647 + 1")
4541        );
4542        Ok(())
4543    }
4544
4545    #[test]
4546    fn test_subtract_with_overflow() -> Result<()> {
4547        // create test data
4548        let l = Arc::new(Int32Array::from(vec![1, i32::MIN]));
4549        let r = Arc::new(Int32Array::from(vec![2, 1]));
4550        let schema = Arc::new(Schema::new(vec![
4551            Field::new("l", DataType::Int32, false),
4552            Field::new("r", DataType::Int32, false),
4553        ]));
4554        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4555
4556        // create expression
4557        let expr = BinaryExpr::new(
4558            Arc::new(Column::new("l", 0)),
4559            Operator::Minus,
4560            Arc::new(Column::new("r", 1)),
4561        )
4562        .with_fail_on_overflow(true);
4563
4564        // evaluate expression
4565        let result = expr.evaluate(&batch);
4566        assert!(
4567            result
4568                .err()
4569                .unwrap()
4570                .to_string()
4571                .contains("Overflow happened on: -2147483648 - 1")
4572        );
4573        Ok(())
4574    }
4575
4576    #[test]
4577    fn test_mul_with_overflow() -> Result<()> {
4578        // create test data
4579        let l = Arc::new(Int32Array::from(vec![1, i32::MAX]));
4580        let r = Arc::new(Int32Array::from(vec![2, 2]));
4581        let schema = Arc::new(Schema::new(vec![
4582            Field::new("l", DataType::Int32, false),
4583            Field::new("r", DataType::Int32, false),
4584        ]));
4585        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4586
4587        // create expression
4588        let expr = BinaryExpr::new(
4589            Arc::new(Column::new("l", 0)),
4590            Operator::Multiply,
4591            Arc::new(Column::new("r", 1)),
4592        )
4593        .with_fail_on_overflow(true);
4594
4595        // evaluate expression
4596        let result = expr.evaluate(&batch);
4597        assert!(
4598            result
4599                .err()
4600                .unwrap()
4601                .to_string()
4602                .contains("Overflow happened on: 2147483647 * 2")
4603        );
4604        Ok(())
4605    }
4606
4607    /// Test helper for SIMILAR TO binary operation
4608    fn apply_similar_to(
4609        schema: &SchemaRef,
4610        va: Vec<&str>,
4611        vb: Vec<&str>,
4612        negated: bool,
4613        case_insensitive: bool,
4614        expected: &BooleanArray,
4615    ) -> Result<()> {
4616        let a = StringArray::from(va);
4617        let b = StringArray::from(vb);
4618        let op = similar_to(
4619            negated,
4620            case_insensitive,
4621            col("a", schema)?,
4622            col("b", schema)?,
4623        )?;
4624        let batch =
4625            RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(a), Arc::new(b)])?;
4626        let result = op
4627            .evaluate(&batch)?
4628            .into_array(batch.num_rows())
4629            .expect("Failed to convert to array");
4630        assert_eq!(result.as_ref(), expected);
4631
4632        Ok(())
4633    }
4634
4635    #[test]
4636    fn test_similar_to() {
4637        let schema = Arc::new(Schema::new(vec![
4638            Field::new("a", DataType::Utf8, false),
4639            Field::new("b", DataType::Utf8, false),
4640        ]));
4641
4642        let expected = [Some(true), Some(false)].iter().collect();
4643        // case-sensitive
4644        apply_similar_to(
4645            &schema,
4646            vec!["hello world", "Hello World"],
4647            vec!["hello.*", "hello.*"],
4648            false,
4649            false,
4650            &expected,
4651        )
4652        .unwrap();
4653        // case-insensitive
4654        apply_similar_to(
4655            &schema,
4656            vec!["hello world", "bye"],
4657            vec!["hello.*", "hello.*"],
4658            false,
4659            true,
4660            &expected,
4661        )
4662        .unwrap();
4663    }
4664
4665    pub fn binary_expr(
4666        left: Arc<dyn PhysicalExpr>,
4667        op: Operator,
4668        right: Arc<dyn PhysicalExpr>,
4669        schema: &Schema,
4670    ) -> Result<BinaryExpr> {
4671        Ok(binary_op(left, op, right, schema)?
4672            .downcast_ref::<BinaryExpr>()
4673            .unwrap()
4674            .clone())
4675    }
4676
4677    /// Test for Uniform-Uniform, Unknown-Uniform, Uniform-Unknown and Unknown-Unknown evaluation.
4678    #[test]
4679    #[expect(deprecated)]
4680    fn test_evaluate_statistics_combination_of_range_holders() -> Result<()> {
4681        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4682        let a = Arc::new(Column::new("a", 0)) as _;
4683        let b = lit(ScalarValue::from(12.0));
4684
4685        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4686        let right_interval = Interval::make(Some(12.0), Some(36.0))?;
4687        let (left_mean, right_mean) = (ScalarValue::from(6.0), ScalarValue::from(24.0));
4688        let (left_med, right_med) = (ScalarValue::from(6.0), ScalarValue::from(24.0));
4689
4690        for children in [
4691            vec![
4692                &Distribution::new_uniform(left_interval.clone())?,
4693                &Distribution::new_uniform(right_interval.clone())?,
4694            ],
4695            vec![
4696                &Distribution::new_generic(
4697                    left_mean.clone(),
4698                    left_med.clone(),
4699                    ScalarValue::Float64(None),
4700                    left_interval.clone(),
4701                )?,
4702                &Distribution::new_uniform(right_interval.clone())?,
4703            ],
4704            vec![
4705                &Distribution::new_uniform(right_interval.clone())?,
4706                &Distribution::new_generic(
4707                    right_mean.clone(),
4708                    right_med.clone(),
4709                    ScalarValue::Float64(None),
4710                    right_interval.clone(),
4711                )?,
4712            ],
4713            vec![
4714                &Distribution::new_generic(
4715                    left_mean.clone(),
4716                    left_med.clone(),
4717                    ScalarValue::Float64(None),
4718                    left_interval.clone(),
4719                )?,
4720                &Distribution::new_generic(
4721                    right_mean.clone(),
4722                    right_med.clone(),
4723                    ScalarValue::Float64(None),
4724                    right_interval.clone(),
4725                )?,
4726            ],
4727        ] {
4728            let ops = vec![
4729                Operator::Plus,
4730                Operator::Minus,
4731                Operator::Multiply,
4732                Operator::Divide,
4733            ];
4734
4735            for op in ops {
4736                let expr = binary_expr(Arc::clone(&a), op, Arc::clone(&b), schema)?;
4737                assert_eq!(
4738                    expr.evaluate_statistics(&children)?,
4739                    new_generic_from_binary_op(&op, children[0], children[1])?
4740                );
4741            }
4742        }
4743        Ok(())
4744    }
4745
4746    #[test]
4747    #[expect(deprecated)]
4748    fn test_evaluate_statistics_bernoulli() -> Result<()> {
4749        let schema = &Schema::new(vec![
4750            Field::new("a", DataType::Int64, false),
4751            Field::new("b", DataType::Int64, false),
4752        ]);
4753        let a = Arc::new(Column::new("a", 0)) as _;
4754        let b = Arc::new(Column::new("b", 1)) as _;
4755        let eq = Arc::new(binary_expr(
4756            Arc::clone(&a),
4757            Operator::Eq,
4758            Arc::clone(&b),
4759            schema,
4760        )?);
4761        let neq = Arc::new(binary_expr(a, Operator::NotEq, b, schema)?);
4762
4763        let left_stat = &Distribution::new_uniform(Interval::make(Some(0), Some(7))?)?;
4764        let right_stat = &Distribution::new_uniform(Interval::make(Some(4), Some(11))?)?;
4765
4766        // Intervals: [0, 7], [4, 11].
4767        // The intersection is [4, 7], so the probability of equality is 4 / 64 = 1 / 16.
4768        assert_eq!(
4769            eq.evaluate_statistics(&[left_stat, right_stat])?,
4770            Distribution::new_bernoulli(ScalarValue::from(1.0 / 16.0))?
4771        );
4772
4773        // The probability of being distinct is 1 - 1 / 16 = 15 / 16.
4774        assert_eq!(
4775            neq.evaluate_statistics(&[left_stat, right_stat])?,
4776            Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))?
4777        );
4778
4779        Ok(())
4780    }
4781
4782    #[test]
4783    #[expect(deprecated)]
4784    fn test_propagate_statistics_combination_of_range_holders_arithmetic() -> Result<()> {
4785        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4786        let a = Arc::new(Column::new("a", 0)) as _;
4787        let b = lit(ScalarValue::from(12.0));
4788
4789        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4790        let right_interval = Interval::make(Some(12.0), Some(36.0))?;
4791
4792        let parent = Distribution::new_uniform(Interval::make(Some(-432.), Some(432.))?)?;
4793        let children = vec![
4794            vec![
4795                Distribution::new_uniform(left_interval.clone())?,
4796                Distribution::new_uniform(right_interval.clone())?,
4797            ],
4798            vec![
4799                Distribution::new_generic(
4800                    ScalarValue::from(6.),
4801                    ScalarValue::from(6.),
4802                    ScalarValue::Float64(None),
4803                    left_interval.clone(),
4804                )?,
4805                Distribution::new_uniform(right_interval.clone())?,
4806            ],
4807            vec![
4808                Distribution::new_uniform(left_interval.clone())?,
4809                Distribution::new_generic(
4810                    ScalarValue::from(12.),
4811                    ScalarValue::from(12.),
4812                    ScalarValue::Float64(None),
4813                    right_interval.clone(),
4814                )?,
4815            ],
4816            vec![
4817                Distribution::new_generic(
4818                    ScalarValue::from(6.),
4819                    ScalarValue::from(6.),
4820                    ScalarValue::Float64(None),
4821                    left_interval.clone(),
4822                )?,
4823                Distribution::new_generic(
4824                    ScalarValue::from(12.),
4825                    ScalarValue::from(12.),
4826                    ScalarValue::Float64(None),
4827                    right_interval.clone(),
4828                )?,
4829            ],
4830        ];
4831
4832        let ops = vec![
4833            Operator::Plus,
4834            Operator::Minus,
4835            Operator::Multiply,
4836            Operator::Divide,
4837        ];
4838
4839        for child_view in children {
4840            let child_refs = child_view.iter().collect::<Vec<_>>();
4841            for op in &ops {
4842                let expr = binary_expr(Arc::clone(&a), *op, Arc::clone(&b), schema)?;
4843                assert_eq!(
4844                    expr.propagate_statistics(&parent, child_refs.as_slice())?,
4845                    Some(child_view.clone())
4846                );
4847            }
4848        }
4849        Ok(())
4850    }
4851
4852    #[test]
4853    #[expect(deprecated)]
4854    fn test_propagate_statistics_combination_of_range_holders_comparison() -> Result<()> {
4855        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4856        let a = Arc::new(Column::new("a", 0)) as _;
4857        let b = lit(ScalarValue::from(12.0));
4858
4859        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4860        let right_interval = Interval::make(Some(6.0), Some(18.0))?;
4861
4862        let one = ScalarValue::from(1.0);
4863        let parent = Distribution::new_bernoulli(one)?;
4864        let children = vec![
4865            vec![
4866                Distribution::new_uniform(left_interval.clone())?,
4867                Distribution::new_uniform(right_interval.clone())?,
4868            ],
4869            vec![
4870                Distribution::new_generic(
4871                    ScalarValue::from(6.),
4872                    ScalarValue::from(6.),
4873                    ScalarValue::Float64(None),
4874                    left_interval.clone(),
4875                )?,
4876                Distribution::new_uniform(right_interval.clone())?,
4877            ],
4878            vec![
4879                Distribution::new_uniform(left_interval.clone())?,
4880                Distribution::new_generic(
4881                    ScalarValue::from(12.),
4882                    ScalarValue::from(12.),
4883                    ScalarValue::Float64(None),
4884                    right_interval.clone(),
4885                )?,
4886            ],
4887            vec![
4888                Distribution::new_generic(
4889                    ScalarValue::from(6.),
4890                    ScalarValue::from(6.),
4891                    ScalarValue::Float64(None),
4892                    left_interval.clone(),
4893                )?,
4894                Distribution::new_generic(
4895                    ScalarValue::from(12.),
4896                    ScalarValue::from(12.),
4897                    ScalarValue::Float64(None),
4898                    right_interval.clone(),
4899                )?,
4900            ],
4901        ];
4902
4903        let ops = vec![
4904            Operator::Eq,
4905            Operator::Gt,
4906            Operator::GtEq,
4907            Operator::Lt,
4908            Operator::LtEq,
4909        ];
4910
4911        for child_view in children {
4912            let child_refs = child_view.iter().collect::<Vec<_>>();
4913            for op in &ops {
4914                let expr = binary_expr(Arc::clone(&a), *op, Arc::clone(&b), schema)?;
4915                assert!(
4916                    expr.propagate_statistics(&parent, child_refs.as_slice())?
4917                        .is_some()
4918                );
4919            }
4920        }
4921
4922        Ok(())
4923    }
4924
4925    #[test]
4926    fn test_fmt_sql() -> Result<()> {
4927        let schema = Schema::new(vec![
4928            Field::new("a", DataType::Int32, false),
4929            Field::new("b", DataType::Int32, false),
4930        ]);
4931
4932        // Test basic binary expressions
4933        let simple_expr = binary_expr(
4934            col("a", &schema)?,
4935            Operator::Plus,
4936            col("b", &schema)?,
4937            &schema,
4938        )?;
4939        let display_string = simple_expr.to_string();
4940        assert_eq!(display_string, "a@0 + b@1");
4941        let sql_string = fmt_sql(&simple_expr).to_string();
4942        assert_eq!(sql_string, "a + b");
4943
4944        // Test nested expressions with different operator precedence
4945        let nested_expr = binary_expr(
4946            Arc::new(binary_expr(
4947                col("a", &schema)?,
4948                Operator::Plus,
4949                col("b", &schema)?,
4950                &schema,
4951            )?),
4952            Operator::Multiply,
4953            col("b", &schema)?,
4954            &schema,
4955        )?;
4956        let display_string = nested_expr.to_string();
4957        assert_eq!(display_string, "(a@0 + b@1) * b@1");
4958        let sql_string = fmt_sql(&nested_expr).to_string();
4959        assert_eq!(sql_string, "(a + b) * b");
4960
4961        // Test nested expressions with same operator precedence
4962        let nested_same_prec = binary_expr(
4963            Arc::new(binary_expr(
4964                col("a", &schema)?,
4965                Operator::Plus,
4966                col("b", &schema)?,
4967                &schema,
4968            )?),
4969            Operator::Plus,
4970            col("b", &schema)?,
4971            &schema,
4972        )?;
4973        let display_string = nested_same_prec.to_string();
4974        assert_eq!(display_string, "a@0 + b@1 + b@1");
4975        let sql_string = fmt_sql(&nested_same_prec).to_string();
4976        assert_eq!(sql_string, "a + b + b");
4977
4978        // Test with literals
4979        let lit_expr = binary_expr(
4980            col("a", &schema)?,
4981            Operator::Eq,
4982            lit(ScalarValue::Int32(Some(42))),
4983            &schema,
4984        )?;
4985        let display_string = lit_expr.to_string();
4986        assert_eq!(display_string, "a@0 = 42");
4987        let sql_string = fmt_sql(&lit_expr).to_string();
4988        assert_eq!(sql_string, "a = 42");
4989
4990        Ok(())
4991    }
4992
4993    #[test]
4994    fn test_check_short_circuit() {
4995        // Test with non-nullable arrays
4996        let schema = Arc::new(Schema::new(vec![
4997            Field::new("a", DataType::Int32, false),
4998            Field::new("b", DataType::Int32, false),
4999        ]));
5000        let a_array = Int32Array::from(vec![1, 3, 4, 5, 6]);
5001        let b_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
5002        let batch = RecordBatch::try_new(
5003            Arc::clone(&schema),
5004            vec![Arc::new(a_array), Arc::new(b_array)],
5005        )
5006        .unwrap();
5007
5008        // op: AND left: all false
5009        let left_expr = logical2physical(&logical_col("a").eq(expr_lit(2)), &schema);
5010        let left_value = left_expr.evaluate(&batch).unwrap();
5011        assert!(matches!(
5012            check_short_circuit(&left_value, &Operator::And),
5013            ShortCircuitStrategy::ReturnLeft
5014        ));
5015
5016        // op: AND left: not all false
5017        let left_expr = logical2physical(&logical_col("a").eq(expr_lit(3)), &schema);
5018        let left_value = left_expr.evaluate(&batch).unwrap();
5019        let ColumnarValue::Array(array) = &left_value else {
5020            panic!("Expected ColumnarValue::Array");
5021        };
5022        let ShortCircuitStrategy::PreSelection(value) =
5023            check_short_circuit(&left_value, &Operator::And)
5024        else {
5025            panic!("Expected ShortCircuitStrategy::PreSelection");
5026        };
5027        let expected_boolean_arr: Vec<_> =
5028            as_boolean_array(array).unwrap().iter().collect();
5029        let boolean_arr: Vec<_> = value.iter().collect();
5030        assert_eq!(expected_boolean_arr, boolean_arr);
5031
5032        // op: OR left: all true
5033        let left_expr = logical2physical(&logical_col("a").gt(expr_lit(0)), &schema);
5034        let left_value = left_expr.evaluate(&batch).unwrap();
5035        assert!(matches!(
5036            check_short_circuit(&left_value, &Operator::Or),
5037            ShortCircuitStrategy::ReturnLeft
5038        ));
5039
5040        // op: OR left: not all true
5041        let left_expr: Arc<dyn PhysicalExpr> =
5042            logical2physical(&logical_col("a").gt(expr_lit(2)), &schema);
5043        let left_value = left_expr.evaluate(&batch).unwrap();
5044        assert!(matches!(
5045            check_short_circuit(&left_value, &Operator::Or),
5046            ShortCircuitStrategy::None
5047        ));
5048
5049        // Test with nullable arrays and null values
5050        let schema_nullable = Arc::new(Schema::new(vec![
5051            Field::new("c", DataType::Boolean, true),
5052            Field::new("d", DataType::Boolean, true),
5053        ]));
5054
5055        // Create arrays with null values
5056        let c_array = Arc::new(BooleanArray::from(vec![
5057            Some(true),
5058            Some(false),
5059            None,
5060            Some(true),
5061            None,
5062        ])) as ArrayRef;
5063        let d_array = Arc::new(BooleanArray::from(vec![
5064            Some(false),
5065            Some(true),
5066            Some(false),
5067            None,
5068            Some(true),
5069        ])) as ArrayRef;
5070
5071        let batch_nullable = RecordBatch::try_new(
5072            Arc::clone(&schema_nullable),
5073            vec![Arc::clone(&c_array), Arc::clone(&d_array)],
5074        )
5075        .unwrap();
5076
5077        // Case: Mixed values with nulls - shouldn't short-circuit for AND
5078        let mixed_nulls = logical2physical(&logical_col("c"), &schema_nullable);
5079        let mixed_nulls_value = mixed_nulls.evaluate(&batch_nullable).unwrap();
5080        assert!(matches!(
5081            check_short_circuit(&mixed_nulls_value, &Operator::And),
5082            ShortCircuitStrategy::None
5083        ));
5084
5085        // Case: Mixed values with nulls - shouldn't short-circuit for OR
5086        assert!(matches!(
5087            check_short_circuit(&mixed_nulls_value, &Operator::Or),
5088            ShortCircuitStrategy::None
5089        ));
5090
5091        // Test with all nulls
5092        let all_nulls = Arc::new(BooleanArray::from(vec![None, None, None])) as ArrayRef;
5093        let null_batch = RecordBatch::try_new(
5094            Arc::new(Schema::new(vec![Field::new("e", DataType::Boolean, true)])),
5095            vec![all_nulls],
5096        )
5097        .unwrap();
5098
5099        let null_expr = logical2physical(&logical_col("e"), &null_batch.schema());
5100        let null_value = null_expr.evaluate(&null_batch).unwrap();
5101
5102        // All nulls shouldn't short-circuit for AND or OR
5103        assert!(matches!(
5104            check_short_circuit(&null_value, &Operator::And),
5105            ShortCircuitStrategy::None
5106        ));
5107        assert!(matches!(
5108            check_short_circuit(&null_value, &Operator::Or),
5109            ShortCircuitStrategy::None
5110        ));
5111
5112        // Test with scalar values
5113        // Scalar true
5114        let scalar_true = ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)));
5115        assert!(matches!(
5116            check_short_circuit(&scalar_true, &Operator::Or),
5117            ShortCircuitStrategy::ReturnLeft
5118        )); // Should short-circuit OR
5119        assert!(matches!(
5120            check_short_circuit(&scalar_true, &Operator::And),
5121            ShortCircuitStrategy::ReturnRight
5122        )); // Should return the RHS for AND
5123
5124        // Scalar false
5125        let scalar_false = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
5126        assert!(matches!(
5127            check_short_circuit(&scalar_false, &Operator::And),
5128            ShortCircuitStrategy::ReturnLeft
5129        )); // Should short-circuit AND
5130        assert!(matches!(
5131            check_short_circuit(&scalar_false, &Operator::Or),
5132            ShortCircuitStrategy::ReturnRight
5133        )); // Should return the RHS for OR
5134
5135        // Scalar null
5136        let scalar_null = ColumnarValue::Scalar(ScalarValue::Boolean(None));
5137        assert!(matches!(
5138            check_short_circuit(&scalar_null, &Operator::And),
5139            ShortCircuitStrategy::None
5140        ));
5141        assert!(matches!(
5142            check_short_circuit(&scalar_null, &Operator::Or),
5143            ShortCircuitStrategy::None
5144        ));
5145    }
5146
5147    /// Test for [pre_selection_scatter]
5148    /// Since [check_short_circuit] ensures that the left side does not contain null and is neither all_true nor all_false, as well as not being empty,
5149    /// the following tests have been designed:
5150    /// 1. Test sparse left with interleaved true/false
5151    /// 2. Test multiple consecutive true blocks
5152    /// 3. Test multiple consecutive true blocks
5153    /// 4. Test single true at first position
5154    /// 5. Test single true at last position
5155    /// 6. Test nulls in right array
5156    #[test]
5157    fn test_pre_selection_scatter() {
5158        fn create_bool_array(bools: Vec<bool>) -> BooleanArray {
5159            BooleanArray::from(bools.into_iter().map(Some).collect::<Vec<_>>())
5160        }
5161        // Test sparse left with interleaved true/false
5162        {
5163            // Left: [T, F, T, F, T]
5164            // Right: [F, T, F] (values for 3 true positions)
5165            let left = create_bool_array(vec![true, false, true, false, true]);
5166            let right = create_bool_array(vec![false, true, false]);
5167
5168            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5169            let result_arr = result.into_array(left.len()).unwrap();
5170
5171            let expected = create_bool_array(vec![false, false, true, false, false]);
5172            assert_eq!(&expected, result_arr.as_boolean());
5173        }
5174        // Test multiple consecutive true blocks
5175        {
5176            // Left: [F, T, T, F, T, T, T]
5177            // Right: [T, F, F, T, F]
5178            let left =
5179                create_bool_array(vec![false, true, true, false, true, true, true]);
5180            let right = create_bool_array(vec![true, false, false, true, false]);
5181
5182            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5183            let result_arr = result.into_array(left.len()).unwrap();
5184
5185            let expected =
5186                create_bool_array(vec![false, true, false, false, false, true, false]);
5187            assert_eq!(&expected, result_arr.as_boolean());
5188        }
5189        // Test single true at first position
5190        {
5191            // Left: [T, F, F]
5192            // Right: [F]
5193            let left = create_bool_array(vec![true, false, false]);
5194            let right = create_bool_array(vec![false]);
5195
5196            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5197            let result_arr = result.into_array(left.len()).unwrap();
5198
5199            let expected = create_bool_array(vec![false, false, false]);
5200            assert_eq!(&expected, result_arr.as_boolean());
5201        }
5202        // Test single true at last position
5203        {
5204            // Left: [F, F, T]
5205            // Right: [F]
5206            let left = create_bool_array(vec![false, false, true]);
5207            let right = create_bool_array(vec![false]);
5208
5209            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5210            let result_arr = result.into_array(left.len()).unwrap();
5211
5212            let expected = create_bool_array(vec![false, false, false]);
5213            assert_eq!(&expected, result_arr.as_boolean());
5214        }
5215        // Test nulls in right array
5216        {
5217            // Left: [F, T, F, T]
5218            // Right: [None, Some(false)] (with null at first position)
5219            let left = create_bool_array(vec![false, true, false, true]);
5220            let right = BooleanArray::from(vec![None, Some(false)]);
5221
5222            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5223            let result_arr = result.into_array(left.len()).unwrap();
5224
5225            let expected = BooleanArray::from(vec![
5226                Some(false),
5227                None, // null from right
5228                Some(false),
5229                Some(false),
5230            ]);
5231            assert_eq!(&expected, result_arr.as_boolean());
5232        }
5233    }
5234
5235    #[test]
5236    fn test_and_true_preselection_returns_lhs() {
5237        let schema =
5238            Arc::new(Schema::new(vec![Field::new("c", DataType::Boolean, false)]));
5239        let c_array = Arc::new(BooleanArray::from(vec![false, true, false, false, false]))
5240            as ArrayRef;
5241        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::clone(&c_array)])
5242            .unwrap();
5243
5244        let expr = logical2physical(&logical_col("c").and(expr_lit(true)), &schema);
5245
5246        let result = expr.evaluate(&batch).unwrap();
5247        let ColumnarValue::Array(result_arr) = result else {
5248            panic!("Expected ColumnarValue::Array");
5249        };
5250
5251        let expected: Vec<_> = c_array.as_boolean().iter().collect();
5252        let actual: Vec<_> = result_arr.as_boolean().iter().collect();
5253        assert_eq!(
5254            expected, actual,
5255            "AND with TRUE must equal LHS even with PreSelection"
5256        );
5257    }
5258
5259    #[test]
5260    fn test_evaluate_bounds_int32() {
5261        let schema = Schema::new(vec![
5262            Field::new("a", DataType::Int32, false),
5263            Field::new("b", DataType::Int32, false),
5264        ]);
5265
5266        let a = Arc::new(Column::new("a", 0)) as _;
5267        let b = Arc::new(Column::new("b", 1)) as _;
5268
5269        // Test addition bounds
5270        let add_expr =
5271            binary_expr(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema).unwrap();
5272        let add_bounds = add_expr
5273            .evaluate_bounds(&[
5274                &Interval::make(Some(1), Some(10)).unwrap(),
5275                &Interval::make(Some(5), Some(15)).unwrap(),
5276            ])
5277            .unwrap();
5278        assert_eq!(add_bounds, Interval::make(Some(6), Some(25)).unwrap());
5279
5280        // Test subtraction bounds
5281        let sub_expr =
5282            binary_expr(Arc::clone(&a), Operator::Minus, Arc::clone(&b), &schema)
5283                .unwrap();
5284        let sub_bounds = sub_expr
5285            .evaluate_bounds(&[
5286                &Interval::make(Some(1), Some(10)).unwrap(),
5287                &Interval::make(Some(5), Some(15)).unwrap(),
5288            ])
5289            .unwrap();
5290        assert_eq!(sub_bounds, Interval::make(Some(-14), Some(5)).unwrap());
5291
5292        // Test multiplication bounds
5293        let mul_expr =
5294            binary_expr(Arc::clone(&a), Operator::Multiply, Arc::clone(&b), &schema)
5295                .unwrap();
5296        let mul_bounds = mul_expr
5297            .evaluate_bounds(&[
5298                &Interval::make(Some(1), Some(10)).unwrap(),
5299                &Interval::make(Some(5), Some(15)).unwrap(),
5300            ])
5301            .unwrap();
5302        assert_eq!(mul_bounds, Interval::make(Some(5), Some(150)).unwrap());
5303
5304        // Test division bounds
5305        let div_expr =
5306            binary_expr(Arc::clone(&a), Operator::Divide, Arc::clone(&b), &schema)
5307                .unwrap();
5308        let div_bounds = div_expr
5309            .evaluate_bounds(&[
5310                &Interval::make(Some(10), Some(20)).unwrap(),
5311                &Interval::make(Some(2), Some(5)).unwrap(),
5312            ])
5313            .unwrap();
5314        assert_eq!(div_bounds, Interval::make(Some(2), Some(10)).unwrap());
5315    }
5316
5317    #[test]
5318    fn test_evaluate_bounds_bool() {
5319        let schema = Schema::new(vec![
5320            Field::new("a", DataType::Boolean, false),
5321            Field::new("b", DataType::Boolean, false),
5322        ]);
5323
5324        let a = Arc::new(Column::new("a", 0)) as _;
5325        let b = Arc::new(Column::new("b", 1)) as _;
5326
5327        // Test OR bounds
5328        let or_expr =
5329            binary_expr(Arc::clone(&a), Operator::Or, Arc::clone(&b), &schema).unwrap();
5330        let or_bounds = or_expr
5331            .evaluate_bounds(&[
5332                &Interval::make(Some(true), Some(true)).unwrap(),
5333                &Interval::make(Some(false), Some(false)).unwrap(),
5334            ])
5335            .unwrap();
5336        assert_eq!(or_bounds, Interval::make(Some(true), Some(true)).unwrap());
5337
5338        // Test AND bounds
5339        let and_expr =
5340            binary_expr(Arc::clone(&a), Operator::And, Arc::clone(&b), &schema).unwrap();
5341        let and_bounds = and_expr
5342            .evaluate_bounds(&[
5343                &Interval::make(Some(true), Some(true)).unwrap(),
5344                &Interval::make(Some(false), Some(false)).unwrap(),
5345            ])
5346            .unwrap();
5347        assert_eq!(
5348            and_bounds,
5349            Interval::make(Some(false), Some(false)).unwrap()
5350        );
5351    }
5352
5353    #[test]
5354    fn test_evaluate_nested_type() {
5355        let batch_schema = Arc::new(Schema::new(vec![
5356            Field::new(
5357                "a",
5358                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
5359                true,
5360            ),
5361            Field::new(
5362                "b",
5363                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
5364                true,
5365            ),
5366        ]));
5367
5368        let mut list_builder_a = ListBuilder::new(Int32Builder::new());
5369
5370        list_builder_a.append_value([Some(1)]);
5371        list_builder_a.append_value([Some(2)]);
5372        list_builder_a.append_value([]);
5373        list_builder_a.append_value([None]);
5374
5375        let list_array_a: ArrayRef = Arc::new(list_builder_a.finish());
5376
5377        let mut list_builder_b = ListBuilder::new(Int32Builder::new());
5378
5379        list_builder_b.append_value([Some(1)]);
5380        list_builder_b.append_value([Some(2)]);
5381        list_builder_b.append_value([]);
5382        list_builder_b.append_value([None]);
5383
5384        let list_array_b: ArrayRef = Arc::new(list_builder_b.finish());
5385
5386        let batch =
5387            RecordBatch::try_new(batch_schema, vec![list_array_a, list_array_b]).unwrap();
5388
5389        let schema = Arc::new(Schema::new(vec![
5390            Field::new(
5391                "a",
5392                DataType::List(Arc::new(Field::new("foo", DataType::Int32, true))),
5393                true,
5394            ),
5395            Field::new(
5396                "b",
5397                DataType::List(Arc::new(Field::new("bar", DataType::Int32, true))),
5398                true,
5399            ),
5400        ]));
5401
5402        let a = Arc::new(Column::new("a", 0)) as _;
5403        let b = Arc::new(Column::new("b", 1)) as _;
5404
5405        let eq_expr =
5406            binary_expr(Arc::clone(&a), Operator::Eq, Arc::clone(&b), &schema).unwrap();
5407
5408        let eq_result = eq_expr.evaluate(&batch).unwrap();
5409        let expected =
5410            BooleanArray::from_iter(vec![Some(true), Some(true), Some(true), Some(true)]);
5411        assert_eq!(eq_result.into_array(4).unwrap().as_boolean(), &expected);
5412    }
5413}