datafusion_physical_expr/expressions/
binary.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod kernels;
19
20use crate::expressions::binary::kernels::concat_elements_utf8view;
21use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
22use crate::PhysicalExpr;
23use std::hash::Hash;
24use std::{any::Any, sync::Arc};
25
26use arrow::array::*;
27use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
28use arrow::compute::kernels::cmp::*;
29use arrow::compute::kernels::comparison::{regexp_is_match, regexp_is_match_scalar};
30use arrow::compute::kernels::concat_elements::concat_elements_utf8;
31use arrow::compute::{
32    cast, filter_record_batch, ilike, like, nilike, nlike, SlicesIterator,
33};
34use arrow::datatypes::*;
35use arrow::error::ArrowError;
36use datafusion_common::cast::as_boolean_array;
37use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
38use datafusion_expr::binary::BinaryTypeCoercer;
39use datafusion_expr::interval_arithmetic::{apply_operator, Interval};
40use datafusion_expr::sort_properties::ExprProperties;
41use datafusion_expr::statistics::Distribution::{Bernoulli, Gaussian};
42use datafusion_expr::statistics::{
43    combine_bernoullis, combine_gaussians, create_bernoulli_from_comparison,
44    new_generic_from_binary_op, Distribution,
45};
46use datafusion_expr::{ColumnarValue, Operator};
47use datafusion_physical_expr_common::datum::{apply, apply_cmp, apply_cmp_for_nested};
48
49use kernels::{
50    bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
51    bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
52    bitwise_shift_right_dyn_scalar, bitwise_xor_dyn, bitwise_xor_dyn_scalar,
53};
54
55/// Binary expression
56#[derive(Debug, Clone, Eq)]
57pub struct BinaryExpr {
58    left: Arc<dyn PhysicalExpr>,
59    op: Operator,
60    right: Arc<dyn PhysicalExpr>,
61    /// Specifies whether an error is returned on overflow or not
62    fail_on_overflow: bool,
63}
64
65// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808
66impl PartialEq for BinaryExpr {
67    fn eq(&self, other: &Self) -> bool {
68        self.left.eq(&other.left)
69            && self.op.eq(&other.op)
70            && self.right.eq(&other.right)
71            && self.fail_on_overflow.eq(&other.fail_on_overflow)
72    }
73}
74impl Hash for BinaryExpr {
75    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
76        self.left.hash(state);
77        self.op.hash(state);
78        self.right.hash(state);
79        self.fail_on_overflow.hash(state);
80    }
81}
82
83impl BinaryExpr {
84    /// Create new binary expression
85    pub fn new(
86        left: Arc<dyn PhysicalExpr>,
87        op: Operator,
88        right: Arc<dyn PhysicalExpr>,
89    ) -> Self {
90        Self {
91            left,
92            op,
93            right,
94            fail_on_overflow: false,
95        }
96    }
97
98    /// Create new binary expression with explicit fail_on_overflow value
99    pub fn with_fail_on_overflow(self, fail_on_overflow: bool) -> Self {
100        Self {
101            left: self.left,
102            op: self.op,
103            right: self.right,
104            fail_on_overflow,
105        }
106    }
107
108    /// Get the left side of the binary expression
109    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
110        &self.left
111    }
112
113    /// Get the right side of the binary expression
114    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
115        &self.right
116    }
117
118    /// Get the operator for this binary expression
119    pub fn op(&self) -> &Operator {
120        &self.op
121    }
122}
123
124impl std::fmt::Display for BinaryExpr {
125    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
126        // Put parentheses around child binary expressions so that we can see the difference
127        // between `(a OR b) AND c` and `a OR (b AND c)`. We only insert parentheses when needed,
128        // based on operator precedence. For example, `(a AND b) OR c` and `a AND b OR c` are
129        // equivalent and the parentheses are not necessary.
130
131        fn write_child(
132            f: &mut std::fmt::Formatter,
133            expr: &dyn PhysicalExpr,
134            precedence: u8,
135        ) -> std::fmt::Result {
136            if let Some(child) = expr.as_any().downcast_ref::<BinaryExpr>() {
137                let p = child.op.precedence();
138                if p == 0 || p < precedence {
139                    write!(f, "({child})")?;
140                } else {
141                    write!(f, "{child}")?;
142                }
143            } else {
144                write!(f, "{expr}")?;
145            }
146
147            Ok(())
148        }
149
150        let precedence = self.op.precedence();
151        write_child(f, self.left.as_ref(), precedence)?;
152        write!(f, " {} ", self.op)?;
153        write_child(f, self.right.as_ref(), precedence)
154    }
155}
156
157/// Invoke a boolean kernel on a pair of arrays
158#[inline]
159fn boolean_op(
160    left: &dyn Array,
161    right: &dyn Array,
162    op: impl FnOnce(&BooleanArray, &BooleanArray) -> Result<BooleanArray, ArrowError>,
163) -> Result<Arc<(dyn Array + 'static)>, ArrowError> {
164    let ll = as_boolean_array(left).expect("boolean_op failed to downcast left array");
165    let rr = as_boolean_array(right).expect("boolean_op failed to downcast right array");
166    op(ll, rr).map(|t| Arc::new(t) as _)
167}
168
/// Dispatches the flag-aware string kernel `$OP` over a pair of string arrays,
/// selecting the concrete array type from the data type of `$LEFT`.
///
/// `$NOT` and `$FLAG` are forwarded unchanged to the per-type macros. Any data
/// type other than `Utf8`, `Utf8View` or `LargeUtf8` yields an internal error.
macro_rules! binary_string_array_flag_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $NOT:expr, $FLAG:expr) => {{
        match $LEFT.data_type() {
            DataType::Utf8 => compute_utf8_flag_op!(
                $LEFT,
                $RIGHT,
                $OP,
                StringArray,
                $NOT,
                $FLAG
            ),
            DataType::LargeUtf8 => compute_utf8_flag_op!(
                $LEFT,
                $RIGHT,
                $OP,
                LargeStringArray,
                $NOT,
                $FLAG
            ),
            DataType::Utf8View => compute_utf8view_flag_op!(
                $LEFT,
                $RIGHT,
                $OP,
                StringViewArray,
                $NOT,
                $FLAG
            ),
            unsupported => internal_err!(
                "Data type {:?} not supported for binary_string_array_flag_op operation '{}' on string array",
                unsupported, stringify!($OP)
            ),
        }
    }};
}
188
/// Invoke the flag-aware string kernel `$OP` on a pair of string arrays of
/// type `$ARRAYTYPE`.
///
/// * `$FLAG` - when `true`, a flags array holding `"i"` for every row of the
///   left array is passed to the kernel; otherwise no flags are passed.
/// * `$NOT` - when `true`, the boolean result of the kernel is negated.
///
/// Evaluates to `Ok(Arc<BooleanArray>)`. Errors from the kernel or from the
/// negation are propagated with `?`, so the macro must be expanded inside a
/// function whose error type can be built from the kernel's error.
///
/// # Panics
///
/// Panics if `$LEFT` or `$RIGHT` is not a `$ARRAYTYPE`.
macro_rules! compute_utf8_flag_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8_flag_op failed to downcast array");
        let rr = $RIGHT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8_flag_op failed to downcast array");

        let flag = if $FLAG {
            Some($ARRAYTYPE::from(vec!["i"; ll.len()]))
        } else {
            None
        };
        let matched = $OP(ll, rr, flag.as_ref())?;
        // Propagate a failing negation instead of panicking on it.
        let array = if $NOT { not(&matched)? } else { matched };
        Ok(Arc::new(array))
    }};
}
213
/// Invoke the flag-aware string kernel `$OP` on a pair of string view arrays
/// of type `$ARRAYTYPE`.
///
/// * `$FLAG` - when `true`, a flags array holding `"i"` for every row of the
///   left array is passed to the kernel; otherwise no flags are passed.
/// * `$NOT` - when `true`, the boolean result of the kernel is negated.
///
/// Evaluates to `Ok(Arc<BooleanArray>)`. Errors from the kernel or from the
/// negation are propagated with `?`, so the macro must be expanded inside a
/// function whose error type can be built from the kernel's error.
///
/// # Panics
///
/// Panics if `$LEFT` or `$RIGHT` is not a `$ARRAYTYPE`.
macro_rules! compute_utf8view_flag_op {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8view_flag_op failed to downcast array");
        let rr = $RIGHT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8view_flag_op failed to downcast array");

        let flag = if $FLAG {
            Some($ARRAYTYPE::from(vec!["i"; ll.len()]))
        } else {
            None
        };
        let matched = $OP(ll, rr, flag.as_ref())?;
        // Propagate a failing negation instead of panicking on it.
        let array = if $NOT { not(&matched)? } else { matched };
        Ok(Arc::new(array))
    }};
}
238
/// Dispatches the flag-aware string kernel `$OP` over a string array and a
/// scalar, selecting the concrete array type from the data type of `$LEFT`.
///
/// Evaluates to `Some(Result<Arc<dyn Array>>)`.
macro_rules! binary_string_array_flag_op_scalar {
    ($LEFT:ident, $RIGHT:expr, $OP:ident, $NOT:expr, $FLAG:expr) => {{
        // Unlike binary_string_array_flag_op, this macro also accepts dictionary arrays: when comparing
        // against a scalar value, the query can be optimized in such a way that the operands are dicts
        let result: Result<Arc<dyn Array>> = match $LEFT.data_type() {
            DataType::Utf8 => {
                compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $NOT, $FLAG)
            },
            DataType::Utf8View => {
                compute_utf8view_flag_op_scalar!($LEFT, $RIGHT, $OP, StringViewArray, $NOT, $FLAG)
            }
            DataType::LargeUtf8 => {
                compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG)
            },
            DataType::Dictionary(_, _) => {
                let values = $LEFT.as_any_dictionary().values();

                // Run the kernel once over the dictionary values ...
                let evaluated = match values.data_type() {
                    DataType::Utf8 => compute_utf8_flag_op_scalar!(values, $RIGHT, $OP, StringArray, $NOT, $FLAG),
                    DataType::Utf8View => compute_utf8view_flag_op_scalar!(values, $RIGHT, $OP, StringViewArray, $NOT, $FLAG),
                    DataType::LargeUtf8 => compute_utf8_flag_op_scalar!(values, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG),
                    other => internal_err!(
                        "Data type {:?} not supported as a dictionary value type for binary_string_array_flag_op_scalar operation '{}' on string array",
                        other, stringify!($OP)
                    ),
                };

                // ... then look the per-value results up through the keys.
                // downcast_dictionary_array duplicates code per possible key type, so all prep work is done before it
                evaluated.map(|value_matches| downcast_dictionary_array! {
                    $LEFT => {
                        let unpacked_dict = value_matches
                            .take_iter($LEFT.keys().iter().map(|key| key.map(|k| k as _)))
                            .collect::<BooleanArray>();
                        Arc::new(unpacked_dict) as _
                    },
                    _ => unreachable!(),
                })
            },
            other => internal_err!(
                "Data type {:?} not supported for binary_string_array_flag_op_scalar operation '{}' on string array",
                other, stringify!($OP)
            ),
        };
        Some(result)
    }};
}
283
/// Invoke the scalar variant of the flag-aware string kernel `$OP`
/// (`<$OP>_scalar`) on a string array of type `$ARRAYTYPE` and a scalar value.
///
/// * `$RIGHT` - scalar that must hold a non-null string; otherwise the
///   enclosing function returns an internal error.
/// * `$FLAG` - when `true`, the flag `"i"` is passed to the kernel.
/// * `$NOT` - when `true`, the boolean result of the kernel is negated.
///
/// Evaluates to `Ok(Arc<BooleanArray>)`. Errors from the kernel or from the
/// negation are propagated with `?` out of the enclosing function.
///
/// # Panics
///
/// Panics if `$LEFT` is not a `$ARRAYTYPE`.
macro_rules! compute_utf8_flag_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8_flag_op_scalar failed to downcast array");

        let string_value = match $RIGHT.try_as_str() {
            Some(Some(string_value)) => string_value,
            // null literal or non string
            _ => return internal_err!(
                        "compute_utf8_flag_op_scalar failed to cast literal value {} for operation '{}'",
                        $RIGHT, stringify!($OP)
                    )
        };

        let flag = $FLAG.then_some("i");
        let matched =
            paste::expr! {[<$OP _scalar>]}(ll, &string_value, flag)?;
        // Propagate a failing negation instead of panicking on it.
        let array = if $NOT { not(&matched)? } else { matched };

        Ok(Arc::new(array))
    }};
}
311
/// Invoke the scalar variant of the flag-aware string kernel `$OP`
/// (`<$OP>_scalar`) on a string view array of type `$ARRAYTYPE` and a scalar
/// value.
///
/// * `$RIGHT` - scalar that must hold a non-null string; otherwise the
///   enclosing function returns an internal error.
/// * `$FLAG` - when `true`, the flag `"i"` is passed to the kernel.
/// * `$NOT` - when `true`, the boolean result of the kernel is negated.
///
/// Evaluates to `Ok(Arc<BooleanArray>)`. Errors from the kernel or from the
/// negation are propagated with `?` out of the enclosing function.
///
/// # Panics
///
/// Panics if `$LEFT` is not a `$ARRAYTYPE`.
macro_rules! compute_utf8view_flag_op_scalar {
    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
        let ll = $LEFT
            .as_any()
            .downcast_ref::<$ARRAYTYPE>()
            .expect("compute_utf8view_flag_op_scalar failed to downcast array");

        let string_value = match $RIGHT.try_as_str() {
            Some(Some(string_value)) => string_value,
            // null literal or non string
            _ => return internal_err!(
                        "compute_utf8view_flag_op_scalar failed to cast literal value {} for operation '{}'",
                        $RIGHT, stringify!($OP)
                    )
        };

        let flag = $FLAG.then_some("i");
        let matched =
            paste::expr! {[<$OP _scalar>]}(ll, &string_value, flag)?;
        // Propagate a failing negation instead of panicking on it.
        let array = if $NOT { not(&matched)? } else { matched };

        Ok(Arc::new(array))
    }};
}
339
340impl PhysicalExpr for BinaryExpr {
341    /// Return a reference to Any that can be used for downcasting
342    fn as_any(&self) -> &dyn Any {
343        self
344    }
345
346    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
347        BinaryTypeCoercer::new(
348            &self.left.data_type(input_schema)?,
349            &self.op,
350            &self.right.data_type(input_schema)?,
351        )
352        .get_result_type()
353    }
354
355    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
356        Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
357    }
358
359    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
360        use arrow::compute::kernels::numeric::*;
361
362        // Evaluate left-hand side expression.
363        let lhs = self.left.evaluate(batch)?;
364
365        // Check if we can apply short-circuit evaluation.
366        match check_short_circuit(&lhs, &self.op) {
367            ShortCircuitStrategy::None => {}
368            ShortCircuitStrategy::ReturnLeft => return Ok(lhs),
369            ShortCircuitStrategy::ReturnRight => {
370                let rhs = self.right.evaluate(batch)?;
371                return Ok(rhs);
372            }
373            ShortCircuitStrategy::PreSelection(selection) => {
374                // The function `evaluate_selection` was not called for filtering and calculation,
375                // as it takes into account cases where the selection contains null values.
376                let batch = filter_record_batch(batch, selection)?;
377                let right_ret = self.right.evaluate(&batch)?;
378
379                match &right_ret {
380                    ColumnarValue::Array(array) => {
381                        // When the array on the right is all true or all false, skip the scatter process
382                        let boolean_array = array.as_boolean();
383                        let true_count = boolean_array.true_count();
384                        let length = boolean_array.len();
385                        if true_count == length {
386                            return Ok(lhs);
387                        } else if true_count == 0 && boolean_array.null_count() == 0 {
388                            // If the right-hand array is returned at this point,the lengths will be inconsistent;
389                            // returning a scalar can avoid this issue
390                            return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(
391                                Some(false),
392                            )));
393                        }
394
395                        return pre_selection_scatter(selection, Some(boolean_array));
396                    }
397                    ColumnarValue::Scalar(scalar) => {
398                        if let ScalarValue::Boolean(v) = scalar {
399                            // When the scalar is true or false, skip the scatter process
400                            if let Some(v) = v {
401                                if *v {
402                                    return Ok(lhs);
403                                } else {
404                                    return Ok(right_ret);
405                                }
406                            } else {
407                                return pre_selection_scatter(selection, None);
408                            }
409                        } else {
410                            return internal_err!(
411                                "Expected boolean scalar value, found: {right_ret:?}"
412                            );
413                        }
414                    }
415                }
416            }
417        }
418
419        let rhs = self.right.evaluate(batch)?;
420        let left_data_type = lhs.data_type();
421        let right_data_type = rhs.data_type();
422
423        let schema = batch.schema();
424        let input_schema = schema.as_ref();
425
426        if left_data_type.is_nested() {
427            if !left_data_type.equals_datatype(&right_data_type) {
428                return internal_err!("Cannot evaluate binary expression because of type mismatch: left {}, right {} ", left_data_type, right_data_type);
429            }
430            return apply_cmp_for_nested(self.op, &lhs, &rhs);
431        }
432
433        match self.op {
434            Operator::Plus if self.fail_on_overflow => return apply(&lhs, &rhs, add),
435            Operator::Plus => return apply(&lhs, &rhs, add_wrapping),
436            Operator::Minus if self.fail_on_overflow => return apply(&lhs, &rhs, sub),
437            Operator::Minus => return apply(&lhs, &rhs, sub_wrapping),
438            Operator::Multiply if self.fail_on_overflow => return apply(&lhs, &rhs, mul),
439            Operator::Multiply => return apply(&lhs, &rhs, mul_wrapping),
440            Operator::Divide => return apply(&lhs, &rhs, div),
441            Operator::Modulo => return apply(&lhs, &rhs, rem),
442            Operator::Eq => return apply_cmp(&lhs, &rhs, eq),
443            Operator::NotEq => return apply_cmp(&lhs, &rhs, neq),
444            Operator::Lt => return apply_cmp(&lhs, &rhs, lt),
445            Operator::Gt => return apply_cmp(&lhs, &rhs, gt),
446            Operator::LtEq => return apply_cmp(&lhs, &rhs, lt_eq),
447            Operator::GtEq => return apply_cmp(&lhs, &rhs, gt_eq),
448            Operator::IsDistinctFrom => return apply_cmp(&lhs, &rhs, distinct),
449            Operator::IsNotDistinctFrom => return apply_cmp(&lhs, &rhs, not_distinct),
450            Operator::LikeMatch => return apply_cmp(&lhs, &rhs, like),
451            Operator::ILikeMatch => return apply_cmp(&lhs, &rhs, ilike),
452            Operator::NotLikeMatch => return apply_cmp(&lhs, &rhs, nlike),
453            Operator::NotILikeMatch => return apply_cmp(&lhs, &rhs, nilike),
454            _ => {}
455        }
456
457        let result_type = self.data_type(input_schema)?;
458
459        // If the left-hand side is an array and the right-hand side is a non-null scalar, try the optimized kernel.
460        if let (ColumnarValue::Array(array), ColumnarValue::Scalar(ref scalar)) =
461            (&lhs, &rhs)
462        {
463            if !scalar.is_null() {
464                if let Some(result_array) =
465                    self.evaluate_array_scalar(array, scalar.clone())?
466                {
467                    let final_array = result_array
468                        .and_then(|a| to_result_type_array(&self.op, a, &result_type));
469                    return final_array.map(ColumnarValue::Array);
470                }
471            }
472        }
473
474        // if both arrays or both literals - extract arrays and continue execution
475        let (left, right) = (
476            lhs.into_array(batch.num_rows())?,
477            rhs.into_array(batch.num_rows())?,
478        );
479        self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
480            .map(ColumnarValue::Array)
481    }
482
483    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
484        vec![&self.left, &self.right]
485    }
486
487    fn with_new_children(
488        self: Arc<Self>,
489        children: Vec<Arc<dyn PhysicalExpr>>,
490    ) -> Result<Arc<dyn PhysicalExpr>> {
491        Ok(Arc::new(
492            BinaryExpr::new(Arc::clone(&children[0]), self.op, Arc::clone(&children[1]))
493                .with_fail_on_overflow(self.fail_on_overflow),
494        ))
495    }
496
497    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
498        // Get children intervals:
499        let left_interval = children[0];
500        let right_interval = children[1];
501        // Calculate current node's interval:
502        apply_operator(&self.op, left_interval, right_interval)
503    }
504
505    fn propagate_constraints(
506        &self,
507        interval: &Interval,
508        children: &[&Interval],
509    ) -> Result<Option<Vec<Interval>>> {
510        // Get children intervals.
511        let left_interval = children[0];
512        let right_interval = children[1];
513
514        if self.op.eq(&Operator::And) {
515            if interval.eq(&Interval::CERTAINLY_TRUE) {
516                // A certainly true logical conjunction can only derive from possibly
517                // true operands. Otherwise, we prove infeasibility.
518                Ok((!left_interval.eq(&Interval::CERTAINLY_FALSE)
519                    && !right_interval.eq(&Interval::CERTAINLY_FALSE))
520                .then(|| vec![Interval::CERTAINLY_TRUE, Interval::CERTAINLY_TRUE]))
521            } else if interval.eq(&Interval::CERTAINLY_FALSE) {
522                // If the logical conjunction is certainly false, one of the
523                // operands must be false. However, it's not always possible to
524                // determine which operand is false, leading to different scenarios.
525
526                // If one operand is certainly true and the other one is uncertain,
527                // then the latter must be certainly false.
528                if left_interval.eq(&Interval::CERTAINLY_TRUE)
529                    && right_interval.eq(&Interval::UNCERTAIN)
530                {
531                    Ok(Some(vec![
532                        Interval::CERTAINLY_TRUE,
533                        Interval::CERTAINLY_FALSE,
534                    ]))
535                } else if right_interval.eq(&Interval::CERTAINLY_TRUE)
536                    && left_interval.eq(&Interval::UNCERTAIN)
537                {
538                    Ok(Some(vec![
539                        Interval::CERTAINLY_FALSE,
540                        Interval::CERTAINLY_TRUE,
541                    ]))
542                }
543                // If both children are uncertain, or if one is certainly false,
544                // we cannot conclusively refine their intervals. In this case,
545                // propagation does not result in any interval changes.
546                else {
547                    Ok(Some(vec![]))
548                }
549            } else {
550                // An uncertain logical conjunction result can not shrink the
551                // end-points of its children.
552                Ok(Some(vec![]))
553            }
554        } else if self.op.eq(&Operator::Or) {
555            if interval.eq(&Interval::CERTAINLY_FALSE) {
556                // A certainly false logical disjunction can only derive from certainly
557                // false operands. Otherwise, we prove infeasibility.
558                Ok((!left_interval.eq(&Interval::CERTAINLY_TRUE)
559                    && !right_interval.eq(&Interval::CERTAINLY_TRUE))
560                .then(|| vec![Interval::CERTAINLY_FALSE, Interval::CERTAINLY_FALSE]))
561            } else if interval.eq(&Interval::CERTAINLY_TRUE) {
562                // If the logical disjunction is certainly true, one of the
563                // operands must be true. However, it's not always possible to
564                // determine which operand is true, leading to different scenarios.
565
566                // If one operand is certainly false and the other one is uncertain,
567                // then the latter must be certainly true.
568                if left_interval.eq(&Interval::CERTAINLY_FALSE)
569                    && right_interval.eq(&Interval::UNCERTAIN)
570                {
571                    Ok(Some(vec![
572                        Interval::CERTAINLY_FALSE,
573                        Interval::CERTAINLY_TRUE,
574                    ]))
575                } else if right_interval.eq(&Interval::CERTAINLY_FALSE)
576                    && left_interval.eq(&Interval::UNCERTAIN)
577                {
578                    Ok(Some(vec![
579                        Interval::CERTAINLY_TRUE,
580                        Interval::CERTAINLY_FALSE,
581                    ]))
582                }
583                // If both children are uncertain, or if one is certainly true,
584                // we cannot conclusively refine their intervals. In this case,
585                // propagation does not result in any interval changes.
586                else {
587                    Ok(Some(vec![]))
588                }
589            } else {
590                // An uncertain logical disjunction result can not shrink the
591                // end-points of its children.
592                Ok(Some(vec![]))
593            }
594        } else if self.op.supports_propagation() {
595            Ok(
596                propagate_comparison(&self.op, interval, left_interval, right_interval)?
597                    .map(|(left, right)| vec![left, right]),
598            )
599        } else {
600            Ok(
601                propagate_arithmetic(&self.op, interval, left_interval, right_interval)?
602                    .map(|(left, right)| vec![left, right]),
603            )
604        }
605    }
606
607    fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
608        let (left, right) = (children[0], children[1]);
609
610        if self.op.is_numerical_operators() {
611            // We might be able to construct the output statistics more accurately,
612            // without falling back to an unknown distribution, if we are dealing
613            // with Gaussian distributions and numerical operations.
614            if let (Gaussian(left), Gaussian(right)) = (left, right) {
615                if let Some(result) = combine_gaussians(&self.op, left, right)? {
616                    return Ok(Gaussian(result));
617                }
618            }
619        } else if self.op.is_logic_operator() {
620            // If we are dealing with logical operators, we expect (and can only
621            // operate on) Bernoulli distributions.
622            return if let (Bernoulli(left), Bernoulli(right)) = (left, right) {
623                combine_bernoullis(&self.op, left, right).map(Bernoulli)
624            } else {
625                internal_err!(
626                    "Logical operators are only compatible with `Bernoulli` distributions"
627                )
628            };
629        } else if self.op.supports_propagation() {
630            // If we are handling comparison operators, we expect (and can only
631            // operate on) numeric distributions.
632            return create_bernoulli_from_comparison(&self.op, left, right);
633        }
634        // Fall back to an unknown distribution with only summary statistics:
635        new_generic_from_binary_op(&self.op, left, right)
636    }
637
638    /// For each operator, [`BinaryExpr`] has distinct rules.
639    /// TODO: There may be rules specific to some data types and expression ranges.
640    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
641        let (l_order, l_range) = (children[0].sort_properties, &children[0].range);
642        let (r_order, r_range) = (children[1].sort_properties, &children[1].range);
643        match self.op() {
644            Operator::Plus => Ok(ExprProperties {
645                sort_properties: l_order.add(&r_order),
646                range: l_range.add(r_range)?,
647                preserves_lex_ordering: false,
648            }),
649            Operator::Minus => Ok(ExprProperties {
650                sort_properties: l_order.sub(&r_order),
651                range: l_range.sub(r_range)?,
652                preserves_lex_ordering: false,
653            }),
654            Operator::Gt => Ok(ExprProperties {
655                sort_properties: l_order.gt_or_gteq(&r_order),
656                range: l_range.gt(r_range)?,
657                preserves_lex_ordering: false,
658            }),
659            Operator::GtEq => Ok(ExprProperties {
660                sort_properties: l_order.gt_or_gteq(&r_order),
661                range: l_range.gt_eq(r_range)?,
662                preserves_lex_ordering: false,
663            }),
664            Operator::Lt => Ok(ExprProperties {
665                sort_properties: r_order.gt_or_gteq(&l_order),
666                range: l_range.lt(r_range)?,
667                preserves_lex_ordering: false,
668            }),
669            Operator::LtEq => Ok(ExprProperties {
670                sort_properties: r_order.gt_or_gteq(&l_order),
671                range: l_range.lt_eq(r_range)?,
672                preserves_lex_ordering: false,
673            }),
674            Operator::And => Ok(ExprProperties {
675                sort_properties: r_order.and_or(&l_order),
676                range: l_range.and(r_range)?,
677                preserves_lex_ordering: false,
678            }),
679            Operator::Or => Ok(ExprProperties {
680                sort_properties: r_order.and_or(&l_order),
681                range: l_range.or(r_range)?,
682                preserves_lex_ordering: false,
683            }),
684            _ => Ok(ExprProperties::new_unknown()),
685        }
686    }
687
688    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
689        fn write_child(
690            f: &mut std::fmt::Formatter,
691            expr: &dyn PhysicalExpr,
692            precedence: u8,
693        ) -> std::fmt::Result {
694            if let Some(child) = expr.as_any().downcast_ref::<BinaryExpr>() {
695                let p = child.op.precedence();
696                if p == 0 || p < precedence {
697                    write!(f, "(")?;
698                    child.fmt_sql(f)?;
699                    write!(f, ")")
700                } else {
701                    child.fmt_sql(f)
702                }
703            } else {
704                expr.fmt_sql(f)
705            }
706        }
707
708        let precedence = self.op.precedence();
709        write_child(f, self.left.as_ref(), precedence)?;
710        write!(f, " {} ", self.op)?;
711        write_child(f, self.right.as_ref(), precedence)
712    }
713}
714
715/// Casts dictionary array to result type for binary numerical operators. Such operators
716/// between array and scalar produce a dictionary array other than primitive array of the
717/// same operators between array and array. This leads to inconsistent result types causing
718/// errors in the following query execution. For such operators between array and scalar,
719/// we cast the dictionary array to primitive array.
720fn to_result_type_array(
721    op: &Operator,
722    array: ArrayRef,
723    result_type: &DataType,
724) -> Result<ArrayRef> {
725    if array.data_type() == result_type {
726        Ok(array)
727    } else if op.is_numerical_operators() {
728        match array.data_type() {
729            DataType::Dictionary(_, value_type) => {
730                if value_type.as_ref() == result_type {
731                    Ok(cast(&array, result_type)?)
732                } else {
733                    internal_err!(
734                            "Incompatible Dictionary value type {value_type:?} with result type {result_type:?} of Binary operator {op:?}"
735                        )
736                }
737            }
738            _ => Ok(array),
739        }
740    } else {
741        Ok(array)
742    }
743}
744
745impl BinaryExpr {
746    /// Evaluate the expression of the left input is an array and
747    /// right is literal - use scalar operations
748    fn evaluate_array_scalar(
749        &self,
750        array: &dyn Array,
751        scalar: ScalarValue,
752    ) -> Result<Option<Result<ArrayRef>>> {
753        use Operator::*;
754        let scalar_result = match &self.op {
755            RegexMatch => binary_string_array_flag_op_scalar!(
756                array,
757                scalar,
758                regexp_is_match,
759                false,
760                false
761            ),
762            RegexIMatch => binary_string_array_flag_op_scalar!(
763                array,
764                scalar,
765                regexp_is_match,
766                false,
767                true
768            ),
769            RegexNotMatch => binary_string_array_flag_op_scalar!(
770                array,
771                scalar,
772                regexp_is_match,
773                true,
774                false
775            ),
776            RegexNotIMatch => binary_string_array_flag_op_scalar!(
777                array,
778                scalar,
779                regexp_is_match,
780                true,
781                true
782            ),
783            BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
784            BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
785            BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
786            BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
787            BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
788            // if scalar operation is not supported - fallback to array implementation
789            _ => None,
790        };
791
792        Ok(scalar_result)
793    }
794
795    fn evaluate_with_resolved_args(
796        &self,
797        left: Arc<dyn Array>,
798        left_data_type: &DataType,
799        right: Arc<dyn Array>,
800        right_data_type: &DataType,
801    ) -> Result<ArrayRef> {
802        use Operator::*;
803        match &self.op {
804            IsDistinctFrom | IsNotDistinctFrom | Lt | LtEq | Gt | GtEq | Eq | NotEq
805            | Plus | Minus | Multiply | Divide | Modulo | LikeMatch | ILikeMatch
806            | NotLikeMatch | NotILikeMatch => unreachable!(),
807            And => {
808                if left_data_type == &DataType::Boolean {
809                    Ok(boolean_op(&left, &right, and_kleene)?)
810                } else {
811                    internal_err!(
812                        "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
813                        self.op,
814                        left.data_type(),
815                        right.data_type()
816                    )
817                }
818            }
819            Or => {
820                if left_data_type == &DataType::Boolean {
821                    Ok(boolean_op(&left, &right, or_kleene)?)
822                } else {
823                    internal_err!(
824                        "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
825                        self.op,
826                        left_data_type,
827                        right_data_type
828                    )
829                }
830            }
831            RegexMatch => {
832                binary_string_array_flag_op!(left, right, regexp_is_match, false, false)
833            }
834            RegexIMatch => {
835                binary_string_array_flag_op!(left, right, regexp_is_match, false, true)
836            }
837            RegexNotMatch => {
838                binary_string_array_flag_op!(left, right, regexp_is_match, true, false)
839            }
840            RegexNotIMatch => {
841                binary_string_array_flag_op!(left, right, regexp_is_match, true, true)
842            }
843            BitwiseAnd => bitwise_and_dyn(left, right),
844            BitwiseOr => bitwise_or_dyn(left, right),
845            BitwiseXor => bitwise_xor_dyn(left, right),
846            BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
847            BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
848            StringConcat => concat_elements(left, right),
849            AtArrow | ArrowAt | Arrow | LongArrow | HashArrow | HashLongArrow | AtAt
850            | HashMinus | AtQuestion | Question | QuestionAnd | QuestionPipe
851            | IntegerDivide => {
852                not_impl_err!(
853                    "Binary operator '{:?}' is not supported in the physical expr",
854                    self.op
855                )
856            }
857        }
858    }
859}
860
/// Outcome of inspecting the left-hand side of an `AND`/`OR` expression,
/// as decided by [`check_short_circuit`].
enum ShortCircuitStrategy<'a> {
    /// No shortcut applies; normal evaluation should continue.
    None,
    /// The expression result is the left-hand side result.
    ReturnLeft,
    /// The expression result is the right-hand side result.
    ReturnRight,
    /// Pre-selection applies; carries the left-hand side boolean array
    /// that serves as the selection mask.
    PreSelection(&'a BooleanArray),
}
867
/// Based on the results calculated from the left side of the short-circuit operation,
/// if the proportion of `true` values is at most 0.2 (the comparison is `<=`) and the
/// current operation is an `and`, the `RecordBatch` will be filtered in advance.
const PRE_SELECTION_THRESHOLD: f32 = 0.2;
872
873/// Checks if a logical operator (`AND`/`OR`) can short-circuit evaluation based on the left-hand side (lhs) result.
874///
875/// Short-circuiting occurs under these circumstances:
876/// - For `AND`:
877///    - if LHS is all false => short-circuit → return LHS
878///    - if LHS is all true  => short-circuit → return RHS
879///    - if LHS is mixed and true_count/sum_count <= [`PRE_SELECTION_THRESHOLD`] -> pre-selection
880/// - For `OR`:
881///    - if LHS is all true  => short-circuit → return LHS
882///    - if LHS is all false => short-circuit → return RHS
883/// # Arguments
884/// * `lhs` - The left-hand side (lhs) columnar value (array or scalar)
885/// * `lhs` - The left-hand side (lhs) columnar value (array or scalar)
886/// * `op` - The logical operator (`AND` or `OR`)
887///
888/// # Implementation Notes
889/// 1. Only works with Boolean-typed arguments (other types automatically return `false`)
890/// 2. Handles both scalar values and array values
891/// 3. For arrays, uses optimized bit counting techniques for boolean arrays
892fn check_short_circuit<'a>(
893    lhs: &'a ColumnarValue,
894    op: &Operator,
895) -> ShortCircuitStrategy<'a> {
896    // Quick reject for non-logical operators,and quick judgment when op is and
897    let is_and = match op {
898        Operator::And => true,
899        Operator::Or => false,
900        _ => return ShortCircuitStrategy::None,
901    };
902
903    // Non-boolean types can't be short-circuited
904    if lhs.data_type() != DataType::Boolean {
905        return ShortCircuitStrategy::None;
906    }
907
908    match lhs {
909        ColumnarValue::Array(array) => {
910            // Fast path for arrays - try to downcast to boolean array
911            if let Ok(bool_array) = as_boolean_array(array) {
912                // Arrays with nulls can't be short-circuited
913                if bool_array.null_count() > 0 {
914                    return ShortCircuitStrategy::None;
915                }
916
917                let len = bool_array.len();
918                if len == 0 {
919                    return ShortCircuitStrategy::None;
920                }
921
922                let true_count = bool_array.values().count_set_bits();
923                if is_and {
924                    // For AND, prioritize checking for all-false (short circuit case)
925                    // Uses optimized false_count() method provided by Arrow
926
927                    // Short circuit if all values are false
928                    if true_count == 0 {
929                        return ShortCircuitStrategy::ReturnLeft;
930                    }
931
932                    // If no false values, then all must be true
933                    if true_count == len {
934                        return ShortCircuitStrategy::ReturnRight;
935                    }
936
937                    // determine if we can pre-selection
938                    if true_count as f32 / len as f32 <= PRE_SELECTION_THRESHOLD {
939                        return ShortCircuitStrategy::PreSelection(bool_array);
940                    }
941                } else {
942                    // For OR, prioritize checking for all-true (short circuit case)
943                    // Uses optimized true_count() method provided by Arrow
944
945                    // Short circuit if all values are true
946                    if true_count == len {
947                        return ShortCircuitStrategy::ReturnLeft;
948                    }
949
950                    // If no true values, then all must be false
951                    if true_count == 0 {
952                        return ShortCircuitStrategy::ReturnRight;
953                    }
954                }
955            }
956        }
957        ColumnarValue::Scalar(scalar) => {
958            // Fast path for scalar values
959            if let ScalarValue::Boolean(Some(is_true)) = scalar {
960                // Return Left for:
961                // - AND with false value
962                // - OR with true value
963                if (is_and && !is_true) || (!is_and && *is_true) {
964                    return ShortCircuitStrategy::ReturnLeft;
965                } else {
966                    return ShortCircuitStrategy::ReturnRight;
967                }
968            }
969        }
970    }
971
972    // If we can't short-circuit, indicate that normal evaluation should continue
973    ShortCircuitStrategy::None
974}
975
976/// Creates a new boolean array based on the evaluation of the right expression,
977/// but only for positions where the left_result is true.
978///
979/// This function is used for short-circuit evaluation optimization of logical AND operations:
980/// - When left_result has few true values, we only evaluate the right expression for those positions
981/// - Values are copied from right_array where left_result is true
982/// - All other positions are filled with false values
983///
984/// # Parameters
985/// - `left_result` Boolean array with selection mask (typically from left side of AND)
986/// - `right_result` Result of evaluating right side of expression (only for selected positions)
987///
988/// # Returns
989/// A combined ColumnarValue with values from right_result where left_result is true
990///
991/// # Example
992///  Initial Data: { 1, 2, 3, 4, 5 }
993///  Left Evaluation
994///     (Condition: Equal to 2 or 3)
995///          ↓
996///  Filtered Data: {2, 3}
997///    Left Bitmap: { 0, 1, 1, 0, 0 }
998///          ↓
999///   Right Evaluation
1000///     (Condition: Even numbers)
1001///          ↓
1002///  Right Data: { 2 }
1003///    Right Bitmap: { 1, 0 }
1004///          ↓
1005///   Combine Results
1006///  Final Bitmap: { 0, 1, 0, 0, 0 }
1007///
1008/// # Note
1009/// Perhaps it would be better to modify `left_result` directly without creating a copy?
1010/// In practice, `left_result` should have only one owner, so making changes should be safe.
1011/// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`].
1012fn pre_selection_scatter(
1013    left_result: &BooleanArray,
1014    right_result: Option<&BooleanArray>,
1015) -> Result<ColumnarValue> {
1016    let result_len = left_result.len();
1017
1018    let mut result_array_builder = BooleanArray::builder(result_len);
1019
1020    // keep track of current position we have in right boolean array
1021    let mut right_array_pos = 0;
1022
1023    // keep track of how much is filled
1024    let mut last_end = 0;
1025    // reduce if condition in for_each
1026    match right_result {
1027        Some(right_result) => {
1028            SlicesIterator::new(left_result).for_each(|(start, end)| {
1029                // the gap needs to be filled with false
1030                if start > last_end {
1031                    result_array_builder.append_n(start - last_end, false);
1032                }
1033
1034                // copy values from right array for this slice
1035                let len = end - start;
1036                right_result
1037                    .slice(right_array_pos, len)
1038                    .iter()
1039                    .for_each(|v| result_array_builder.append_option(v));
1040
1041                right_array_pos += len;
1042                last_end = end;
1043            });
1044        }
1045        None => SlicesIterator::new(left_result).for_each(|(start, end)| {
1046            // the gap needs to be filled with false
1047            if start > last_end {
1048                result_array_builder.append_n(start - last_end, false);
1049            }
1050
1051            // append nulls for this slice derictly
1052            let len = end - start;
1053            result_array_builder.append_nulls(len);
1054
1055            last_end = end;
1056        }),
1057    }
1058
1059    // Fill any remaining positions with false
1060    if last_end < result_len {
1061        result_array_builder.append_n(result_len - last_end, false);
1062    }
1063    let boolean_result = result_array_builder.finish();
1064
1065    Ok(ColumnarValue::Array(Arc::new(boolean_result)))
1066}
1067
1068fn concat_elements(left: Arc<dyn Array>, right: Arc<dyn Array>) -> Result<ArrayRef> {
1069    Ok(match left.data_type() {
1070        DataType::Utf8 => Arc::new(concat_elements_utf8(
1071            left.as_string::<i32>(),
1072            right.as_string::<i32>(),
1073        )?),
1074        DataType::LargeUtf8 => Arc::new(concat_elements_utf8(
1075            left.as_string::<i64>(),
1076            right.as_string::<i64>(),
1077        )?),
1078        DataType::Utf8View => Arc::new(concat_elements_utf8view(
1079            left.as_string_view(),
1080            right.as_string_view(),
1081        )?),
1082        other => {
1083            return internal_err!(
1084                "Data type {other:?} not supported for binary operation 'concat_elements' on string arrays"
1085            );
1086        }
1087    })
1088}
1089
1090/// Create a binary expression whose arguments are correctly coerced.
1091/// This function errors if it is not possible to coerce the arguments
1092/// to computational types supported by the operator.
1093pub fn binary(
1094    lhs: Arc<dyn PhysicalExpr>,
1095    op: Operator,
1096    rhs: Arc<dyn PhysicalExpr>,
1097    _input_schema: &Schema,
1098) -> Result<Arc<dyn PhysicalExpr>> {
1099    Ok(Arc::new(BinaryExpr::new(lhs, op, rhs)))
1100}
1101
1102/// Create a similar to expression
1103pub fn similar_to(
1104    negated: bool,
1105    case_insensitive: bool,
1106    expr: Arc<dyn PhysicalExpr>,
1107    pattern: Arc<dyn PhysicalExpr>,
1108) -> Result<Arc<dyn PhysicalExpr>> {
1109    let binary_op = match (negated, case_insensitive) {
1110        (false, false) => Operator::RegexMatch,
1111        (false, true) => Operator::RegexIMatch,
1112        (true, false) => Operator::RegexNotMatch,
1113        (true, true) => Operator::RegexNotIMatch,
1114    };
1115    Ok(Arc::new(BinaryExpr::new(expr, binary_op, pattern)))
1116}
1117
1118#[cfg(test)]
1119mod tests {
1120    use super::*;
1121    use crate::expressions::{col, lit, try_cast, Column, Literal};
1122    use datafusion_expr::lit as expr_lit;
1123
1124    use datafusion_common::plan_datafusion_err;
1125    use datafusion_physical_expr_common::physical_expr::fmt_sql;
1126
1127    use crate::planner::logical2physical;
1128    use arrow::array::BooleanArray;
1129    use datafusion_expr::col as logical_col;
1130    /// Performs a binary operation, applying any type coercion necessary
1131    fn binary_op(
1132        left: Arc<dyn PhysicalExpr>,
1133        op: Operator,
1134        right: Arc<dyn PhysicalExpr>,
1135        schema: &Schema,
1136    ) -> Result<Arc<dyn PhysicalExpr>> {
1137        let left_type = left.data_type(schema)?;
1138        let right_type = right.data_type(schema)?;
1139        let (lhs, rhs) =
1140            BinaryTypeCoercer::new(&left_type, &op, &right_type).get_input_types()?;
1141
1142        let left_expr = try_cast(left, schema, lhs)?;
1143        let right_expr = try_cast(right, schema, rhs)?;
1144        binary(left_expr, op, right_expr, schema)
1145    }
1146
1147    #[test]
1148    fn binary_comparison() -> Result<()> {
1149        let schema = Schema::new(vec![
1150            Field::new("a", DataType::Int32, false),
1151            Field::new("b", DataType::Int32, false),
1152        ]);
1153        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1154        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1155
1156        // expression: "a < b"
1157        let lt = binary(
1158            col("a", &schema)?,
1159            Operator::Lt,
1160            col("b", &schema)?,
1161            &schema,
1162        )?;
1163        let batch =
1164            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
1165
1166        let result = lt
1167            .evaluate(&batch)?
1168            .into_array(batch.num_rows())
1169            .expect("Failed to convert to array");
1170        assert_eq!(result.len(), 5);
1171
1172        let expected = [false, false, true, true, true];
1173        let result =
1174            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
1175        for (i, &expected_item) in expected.iter().enumerate().take(5) {
1176            assert_eq!(result.value(i), expected_item);
1177        }
1178
1179        Ok(())
1180    }
1181
1182    #[test]
1183    fn binary_nested() -> Result<()> {
1184        let schema = Schema::new(vec![
1185            Field::new("a", DataType::Int32, false),
1186            Field::new("b", DataType::Int32, false),
1187        ]);
1188        let a = Int32Array::from(vec![2, 4, 6, 8, 10]);
1189        let b = Int32Array::from(vec![2, 5, 4, 8, 8]);
1190
1191        // expression: "a < b OR a == b"
1192        let expr = binary(
1193            binary(
1194                col("a", &schema)?,
1195                Operator::Lt,
1196                col("b", &schema)?,
1197                &schema,
1198            )?,
1199            Operator::Or,
1200            binary(
1201                col("a", &schema)?,
1202                Operator::Eq,
1203                col("b", &schema)?,
1204                &schema,
1205            )?,
1206            &schema,
1207        )?;
1208        let batch =
1209            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
1210
1211        assert_eq!("a@0 < b@1 OR a@0 = b@1", format!("{expr}"));
1212
1213        let result = expr
1214            .evaluate(&batch)?
1215            .into_array(batch.num_rows())
1216            .expect("Failed to convert to array");
1217        assert_eq!(result.len(), 5);
1218
1219        let expected = [true, true, false, true, false];
1220        let result =
1221            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
1222        for (i, &expected_item) in expected.iter().enumerate().take(5) {
1223            assert_eq!(result.value(i), expected_item);
1224        }
1225
1226        Ok(())
1227    }
1228
    // runs an end-to-end test of physical type coercion:
    // 1. construct a record batch with two non-nullable columns "a" and "b" of type A and B
    //  (*_ARRAY is the Rust Arrow array type, and *_TYPE is the DataType of the elements)
    // 2. construct a physical expression of A OP B, casting both sides to the
    //    input types chosen by `BinaryTypeCoercer`
    // 3. evaluate the expression
    // 4. verify that the resulting expression is of type C
    // 5. verify that the results of evaluation are $VEC
    //
    // The macro body uses `?`, so it must be invoked inside a function that
    // returns a `Result`. The argument list must end with a trailing comma.
    macro_rules! test_coercion {
        ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $B_ARRAY:ident, $B_TYPE:expr, $B_VEC:expr, $OP:expr, $C_ARRAY:ident, $C_TYPE:expr, $VEC:expr,) => {{
            let schema = Schema::new(vec![
                Field::new("a", $A_TYPE, false),
                Field::new("b", $B_TYPE, false),
            ]);
            let a = $A_ARRAY::from($A_VEC);
            let b = $B_ARRAY::from($B_VEC);
            // input types both operands must be cast to for this operator
            let (lhs, rhs) = BinaryTypeCoercer::new(&$A_TYPE, &$OP, &$B_TYPE).get_input_types()?;

            let left = try_cast(col("a", &schema)?, &schema, lhs)?;
            let right = try_cast(col("b", &schema)?, &schema, rhs)?;

            // verify that we can construct the expression
            let expression = binary(left, $OP, right, &schema)?;
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(a), Arc::new(b)],
            )?;

            // verify that the expression's type is correct
            assert_eq!(expression.data_type(&schema)?, $C_TYPE);

            // compute
            let result = expression.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array");

            // verify that the array's data_type is correct
            assert_eq!(*result.data_type(), $C_TYPE);

            // verify that the data itself is downcastable
            let result = result
                .as_any()
                .downcast_ref::<$C_ARRAY>()
                .expect("failed to downcast");
            // verify that the result itself is correct
            // (only the first `$VEC.len()` positions are compared)
            for (i, x) in $VEC.iter().enumerate() {
                let v = result.value(i);
                assert_eq!(
                    v,
                    *x,
                    "Unexpected output at position {i}:\n\nActual:\n{v}\n\nExpected:\n{x}"
                );
            }
        }};
    }
1281
1282    #[test]
1283    fn test_type_coercion() -> Result<()> {
1284        test_coercion!(
1285            Int32Array,
1286            DataType::Int32,
1287            vec![1i32, 2i32],
1288            UInt32Array,
1289            DataType::UInt32,
1290            vec![1u32, 2u32],
1291            Operator::Plus,
1292            Int64Array,
1293            DataType::Int64,
1294            [2i64, 4i64],
1295        );
1296        test_coercion!(
1297            Int32Array,
1298            DataType::Int32,
1299            vec![1i32],
1300            UInt16Array,
1301            DataType::UInt16,
1302            vec![1u16],
1303            Operator::Plus,
1304            Int32Array,
1305            DataType::Int32,
1306            [2i32],
1307        );
1308        test_coercion!(
1309            Float32Array,
1310            DataType::Float32,
1311            vec![1f32],
1312            UInt16Array,
1313            DataType::UInt16,
1314            vec![1u16],
1315            Operator::Plus,
1316            Float32Array,
1317            DataType::Float32,
1318            [2f32],
1319        );
1320        test_coercion!(
1321            Float32Array,
1322            DataType::Float32,
1323            vec![2f32],
1324            UInt16Array,
1325            DataType::UInt16,
1326            vec![1u16],
1327            Operator::Multiply,
1328            Float32Array,
1329            DataType::Float32,
1330            [2f32],
1331        );
1332        test_coercion!(
1333            StringArray,
1334            DataType::Utf8,
1335            vec!["1994-12-13", "1995-01-26"],
1336            Date32Array,
1337            DataType::Date32,
1338            vec![9112, 9156],
1339            Operator::Eq,
1340            BooleanArray,
1341            DataType::Boolean,
1342            [true, true],
1343        );
1344        test_coercion!(
1345            StringArray,
1346            DataType::Utf8,
1347            vec!["1994-12-13", "1995-01-26"],
1348            Date32Array,
1349            DataType::Date32,
1350            vec![9113, 9154],
1351            Operator::Lt,
1352            BooleanArray,
1353            DataType::Boolean,
1354            [true, false],
1355        );
1356        test_coercion!(
1357            StringArray,
1358            DataType::Utf8,
1359            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
1360            Date64Array,
1361            DataType::Date64,
1362            vec![787322096000, 791083425000],
1363            Operator::Eq,
1364            BooleanArray,
1365            DataType::Boolean,
1366            [true, true],
1367        );
1368        test_coercion!(
1369            StringArray,
1370            DataType::Utf8,
1371            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
1372            Date64Array,
1373            DataType::Date64,
1374            vec![787322096001, 791083424999],
1375            Operator::Lt,
1376            BooleanArray,
1377            DataType::Boolean,
1378            [true, false],
1379        );
1380        test_coercion!(
1381            StringViewArray,
1382            DataType::Utf8View,
1383            vec!["abc"; 5],
1384            StringArray,
1385            DataType::Utf8,
1386            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1387            Operator::RegexMatch,
1388            BooleanArray,
1389            DataType::Boolean,
1390            [true, false, true, false, false],
1391        );
1392        test_coercion!(
1393            StringViewArray,
1394            DataType::Utf8View,
1395            vec!["abc"; 5],
1396            StringArray,
1397            DataType::Utf8,
1398            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1399            Operator::RegexIMatch,
1400            BooleanArray,
1401            DataType::Boolean,
1402            [true, true, true, true, false],
1403        );
1404        test_coercion!(
1405            StringArray,
1406            DataType::Utf8,
1407            vec!["abc"; 5],
1408            StringViewArray,
1409            DataType::Utf8View,
1410            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1411            Operator::RegexNotMatch,
1412            BooleanArray,
1413            DataType::Boolean,
1414            [false, true, false, true, true],
1415        );
1416        test_coercion!(
1417            StringArray,
1418            DataType::Utf8,
1419            vec!["abc"; 5],
1420            StringViewArray,
1421            DataType::Utf8View,
1422            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1423            Operator::RegexNotIMatch,
1424            BooleanArray,
1425            DataType::Boolean,
1426            [false, false, false, false, true],
1427        );
1428        test_coercion!(
1429            StringArray,
1430            DataType::Utf8,
1431            vec!["abc"; 5],
1432            StringArray,
1433            DataType::Utf8,
1434            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1435            Operator::RegexMatch,
1436            BooleanArray,
1437            DataType::Boolean,
1438            [true, false, true, false, false],
1439        );
1440        test_coercion!(
1441            StringArray,
1442            DataType::Utf8,
1443            vec!["abc"; 5],
1444            StringArray,
1445            DataType::Utf8,
1446            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1447            Operator::RegexIMatch,
1448            BooleanArray,
1449            DataType::Boolean,
1450            [true, true, true, true, false],
1451        );
1452        test_coercion!(
1453            StringArray,
1454            DataType::Utf8,
1455            vec!["abc"; 5],
1456            StringArray,
1457            DataType::Utf8,
1458            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1459            Operator::RegexNotMatch,
1460            BooleanArray,
1461            DataType::Boolean,
1462            [false, true, false, true, true],
1463        );
1464        test_coercion!(
1465            StringArray,
1466            DataType::Utf8,
1467            vec!["abc"; 5],
1468            StringArray,
1469            DataType::Utf8,
1470            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1471            Operator::RegexNotIMatch,
1472            BooleanArray,
1473            DataType::Boolean,
1474            [false, false, false, false, true],
1475        );
1476        test_coercion!(
1477            LargeStringArray,
1478            DataType::LargeUtf8,
1479            vec!["abc"; 5],
1480            LargeStringArray,
1481            DataType::LargeUtf8,
1482            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1483            Operator::RegexMatch,
1484            BooleanArray,
1485            DataType::Boolean,
1486            [true, false, true, false, false],
1487        );
1488        test_coercion!(
1489            LargeStringArray,
1490            DataType::LargeUtf8,
1491            vec!["abc"; 5],
1492            LargeStringArray,
1493            DataType::LargeUtf8,
1494            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1495            Operator::RegexIMatch,
1496            BooleanArray,
1497            DataType::Boolean,
1498            [true, true, true, true, false],
1499        );
1500        test_coercion!(
1501            LargeStringArray,
1502            DataType::LargeUtf8,
1503            vec!["abc"; 5],
1504            LargeStringArray,
1505            DataType::LargeUtf8,
1506            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1507            Operator::RegexNotMatch,
1508            BooleanArray,
1509            DataType::Boolean,
1510            [false, true, false, true, true],
1511        );
1512        test_coercion!(
1513            LargeStringArray,
1514            DataType::LargeUtf8,
1515            vec!["abc"; 5],
1516            LargeStringArray,
1517            DataType::LargeUtf8,
1518            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1519            Operator::RegexNotIMatch,
1520            BooleanArray,
1521            DataType::Boolean,
1522            [false, false, false, false, true],
1523        );
1524        test_coercion!(
1525            StringArray,
1526            DataType::Utf8,
1527            vec!["abc"; 5],
1528            StringArray,
1529            DataType::Utf8,
1530            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1531            Operator::LikeMatch,
1532            BooleanArray,
1533            DataType::Boolean,
1534            [true, false, false, true, false],
1535        );
1536        test_coercion!(
1537            StringArray,
1538            DataType::Utf8,
1539            vec!["abc"; 5],
1540            StringArray,
1541            DataType::Utf8,
1542            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1543            Operator::ILikeMatch,
1544            BooleanArray,
1545            DataType::Boolean,
1546            [true, true, false, true, true],
1547        );
1548        test_coercion!(
1549            StringArray,
1550            DataType::Utf8,
1551            vec!["abc"; 5],
1552            StringArray,
1553            DataType::Utf8,
1554            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1555            Operator::NotLikeMatch,
1556            BooleanArray,
1557            DataType::Boolean,
1558            [false, true, true, false, true],
1559        );
1560        test_coercion!(
1561            StringArray,
1562            DataType::Utf8,
1563            vec!["abc"; 5],
1564            StringArray,
1565            DataType::Utf8,
1566            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1567            Operator::NotILikeMatch,
1568            BooleanArray,
1569            DataType::Boolean,
1570            [false, false, true, false, false],
1571        );
1572        test_coercion!(
1573            LargeStringArray,
1574            DataType::LargeUtf8,
1575            vec!["abc"; 5],
1576            LargeStringArray,
1577            DataType::LargeUtf8,
1578            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1579            Operator::LikeMatch,
1580            BooleanArray,
1581            DataType::Boolean,
1582            [true, false, false, true, false],
1583        );
1584        test_coercion!(
1585            LargeStringArray,
1586            DataType::LargeUtf8,
1587            vec!["abc"; 5],
1588            LargeStringArray,
1589            DataType::LargeUtf8,
1590            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1591            Operator::ILikeMatch,
1592            BooleanArray,
1593            DataType::Boolean,
1594            [true, true, false, true, true],
1595        );
1596        test_coercion!(
1597            LargeStringArray,
1598            DataType::LargeUtf8,
1599            vec!["abc"; 5],
1600            LargeStringArray,
1601            DataType::LargeUtf8,
1602            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1603            Operator::NotLikeMatch,
1604            BooleanArray,
1605            DataType::Boolean,
1606            [false, true, true, false, true],
1607        );
1608        test_coercion!(
1609            LargeStringArray,
1610            DataType::LargeUtf8,
1611            vec!["abc"; 5],
1612            LargeStringArray,
1613            DataType::LargeUtf8,
1614            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1615            Operator::NotILikeMatch,
1616            BooleanArray,
1617            DataType::Boolean,
1618            [false, false, true, false, false],
1619        );
1620        test_coercion!(
1621            Int16Array,
1622            DataType::Int16,
1623            vec![1i16, 2i16, 3i16],
1624            Int64Array,
1625            DataType::Int64,
1626            vec![10i64, 4i64, 5i64],
1627            Operator::BitwiseAnd,
1628            Int64Array,
1629            DataType::Int64,
1630            [0i64, 0i64, 1i64],
1631        );
1632        test_coercion!(
1633            UInt16Array,
1634            DataType::UInt16,
1635            vec![1u16, 2u16, 3u16],
1636            UInt64Array,
1637            DataType::UInt64,
1638            vec![10u64, 4u64, 5u64],
1639            Operator::BitwiseAnd,
1640            UInt64Array,
1641            DataType::UInt64,
1642            [0u64, 0u64, 1u64],
1643        );
1644        test_coercion!(
1645            Int16Array,
1646            DataType::Int16,
1647            vec![3i16, 2i16, 3i16],
1648            Int64Array,
1649            DataType::Int64,
1650            vec![10i64, 6i64, 5i64],
1651            Operator::BitwiseOr,
1652            Int64Array,
1653            DataType::Int64,
1654            [11i64, 6i64, 7i64],
1655        );
1656        test_coercion!(
1657            UInt16Array,
1658            DataType::UInt16,
1659            vec![1u16, 2u16, 3u16],
1660            UInt64Array,
1661            DataType::UInt64,
1662            vec![10u64, 4u64, 5u64],
1663            Operator::BitwiseOr,
1664            UInt64Array,
1665            DataType::UInt64,
1666            [11u64, 6u64, 7u64],
1667        );
1668        test_coercion!(
1669            Int16Array,
1670            DataType::Int16,
1671            vec![3i16, 2i16, 3i16],
1672            Int64Array,
1673            DataType::Int64,
1674            vec![10i64, 6i64, 5i64],
1675            Operator::BitwiseXor,
1676            Int64Array,
1677            DataType::Int64,
1678            [9i64, 4i64, 6i64],
1679        );
1680        test_coercion!(
1681            UInt16Array,
1682            DataType::UInt16,
1683            vec![3u16, 2u16, 3u16],
1684            UInt64Array,
1685            DataType::UInt64,
1686            vec![10u64, 6u64, 5u64],
1687            Operator::BitwiseXor,
1688            UInt64Array,
1689            DataType::UInt64,
1690            [9u64, 4u64, 6u64],
1691        );
1692        test_coercion!(
1693            Int16Array,
1694            DataType::Int16,
1695            vec![4i16, 27i16, 35i16],
1696            Int64Array,
1697            DataType::Int64,
1698            vec![2i64, 3i64, 4i64],
1699            Operator::BitwiseShiftRight,
1700            Int64Array,
1701            DataType::Int64,
1702            [1i64, 3i64, 2i64],
1703        );
1704        test_coercion!(
1705            UInt16Array,
1706            DataType::UInt16,
1707            vec![4u16, 27u16, 35u16],
1708            UInt64Array,
1709            DataType::UInt64,
1710            vec![2u64, 3u64, 4u64],
1711            Operator::BitwiseShiftRight,
1712            UInt64Array,
1713            DataType::UInt64,
1714            [1u64, 3u64, 2u64],
1715        );
1716        test_coercion!(
1717            Int16Array,
1718            DataType::Int16,
1719            vec![2i16, 3i16, 4i16],
1720            Int64Array,
1721            DataType::Int64,
1722            vec![4i64, 12i64, 7i64],
1723            Operator::BitwiseShiftLeft,
1724            Int64Array,
1725            DataType::Int64,
1726            [32i64, 12288i64, 512i64],
1727        );
1728        test_coercion!(
1729            UInt16Array,
1730            DataType::UInt16,
1731            vec![2u16, 3u16, 4u16],
1732            UInt64Array,
1733            DataType::UInt64,
1734            vec![4u64, 12u64, 7u64],
1735            Operator::BitwiseShiftLeft,
1736            UInt64Array,
1737            DataType::UInt64,
1738            [32u64, 12288u64, 512u64],
1739        );
1740        Ok(())
1741    }
1742
1743    // Note it would be nice to use the same test_coercion macro as
1744    // above, but sadly the type of the values of the dictionary are
1745    // not encoded in the rust type of the DictionaryArray. Thus there
1746    // is no way at the time of this writing to create a dictionary
1747    // array using the `From` trait
1748    #[test]
1749    fn test_dictionary_type_to_array_coercion() -> Result<()> {
1750        // Test string  a string dictionary
1751        let dict_type =
1752            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1753        let string_type = DataType::Utf8;
1754
1755        // build dictionary
1756        let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
1757
1758        dict_builder.append("one")?;
1759        dict_builder.append_null();
1760        dict_builder.append("three")?;
1761        dict_builder.append("four")?;
1762        let dict_array = Arc::new(dict_builder.finish()) as ArrayRef;
1763
1764        let str_array = Arc::new(StringArray::from(vec![
1765            Some("not one"),
1766            Some("two"),
1767            None,
1768            Some("four"),
1769        ])) as ArrayRef;
1770
1771        let schema = Arc::new(Schema::new(vec![
1772            Field::new("a", dict_type.clone(), true),
1773            Field::new("b", string_type.clone(), true),
1774        ]));
1775
1776        // Test 1: a = b
1777        let result = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
1778        apply_logic_op(&schema, &dict_array, &str_array, Operator::Eq, result)?;
1779
1780        // Test 2: now test the other direction
1781        // b = a
1782        let schema = Arc::new(Schema::new(vec![
1783            Field::new("a", string_type, true),
1784            Field::new("b", dict_type, true),
1785        ]));
1786        let result = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
1787        apply_logic_op(&schema, &str_array, &dict_array, Operator::Eq, result)?;
1788
1789        Ok(())
1790    }
1791
1792    #[test]
1793    fn plus_op() -> Result<()> {
1794        let schema = Schema::new(vec![
1795            Field::new("a", DataType::Int32, false),
1796            Field::new("b", DataType::Int32, false),
1797        ]);
1798        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1799        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1800
1801        apply_arithmetic::<Int32Type>(
1802            Arc::new(schema),
1803            vec![Arc::new(a), Arc::new(b)],
1804            Operator::Plus,
1805            Int32Array::from(vec![2, 4, 7, 12, 21]),
1806        )?;
1807
1808        Ok(())
1809    }
1810
1811    #[test]
1812    fn plus_op_dict() -> Result<()> {
1813        let schema = Schema::new(vec![
1814            Field::new(
1815                "a",
1816                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1817                true,
1818            ),
1819            Field::new(
1820                "b",
1821                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1822                true,
1823            ),
1824        ]);
1825
1826        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1827        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
1828        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
1829
1830        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1831        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
1832        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
1833
1834        apply_arithmetic::<Int32Type>(
1835            Arc::new(schema),
1836            vec![Arc::new(a), Arc::new(b)],
1837            Operator::Plus,
1838            Int32Array::from(vec![Some(2), None, Some(4), Some(8), None]),
1839        )?;
1840
1841        Ok(())
1842    }
1843
1844    #[test]
1845    fn plus_op_dict_decimal() -> Result<()> {
1846        let schema = Schema::new(vec![
1847            Field::new(
1848                "a",
1849                DataType::Dictionary(
1850                    Box::new(DataType::Int8),
1851                    Box::new(DataType::Decimal128(10, 0)),
1852                ),
1853                true,
1854            ),
1855            Field::new(
1856                "b",
1857                DataType::Dictionary(
1858                    Box::new(DataType::Int8),
1859                    Box::new(DataType::Decimal128(10, 0)),
1860                ),
1861                true,
1862            ),
1863        ]);
1864
1865        let value = 123;
1866        let decimal_array = Arc::new(create_decimal_array(
1867            &[
1868                Some(value),
1869                Some(value + 2),
1870                Some(value - 1),
1871                Some(value + 1),
1872            ],
1873            10,
1874            0,
1875        ));
1876
1877        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
1878        let a = DictionaryArray::try_new(keys, decimal_array)?;
1879
1880        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
1881        let decimal_array = Arc::new(create_decimal_array(
1882            &[
1883                Some(value + 1),
1884                Some(value + 3),
1885                Some(value),
1886                Some(value + 2),
1887            ],
1888            10,
1889            0,
1890        ));
1891        let b = DictionaryArray::try_new(keys, decimal_array)?;
1892
1893        apply_arithmetic(
1894            Arc::new(schema),
1895            vec![Arc::new(a), Arc::new(b)],
1896            Operator::Plus,
1897            create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 11, 0),
1898        )?;
1899
1900        Ok(())
1901    }
1902
1903    #[test]
1904    fn plus_op_scalar() -> Result<()> {
1905        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1906        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1907
1908        apply_arithmetic_scalar(
1909            Arc::new(schema),
1910            vec![Arc::new(a)],
1911            Operator::Plus,
1912            ScalarValue::Int32(Some(1)),
1913            Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
1914        )?;
1915
1916        Ok(())
1917    }
1918
1919    #[test]
1920    fn plus_op_dict_scalar() -> Result<()> {
1921        let schema = Schema::new(vec![Field::new(
1922            "a",
1923            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1924            true,
1925        )]);
1926
1927        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
1928
1929        dict_builder.append(1)?;
1930        dict_builder.append_null();
1931        dict_builder.append(2)?;
1932        dict_builder.append(5)?;
1933
1934        let a = dict_builder.finish();
1935
1936        let expected: PrimitiveArray<Int32Type> =
1937            PrimitiveArray::from(vec![Some(2), None, Some(3), Some(6)]);
1938
1939        apply_arithmetic_scalar(
1940            Arc::new(schema),
1941            vec![Arc::new(a)],
1942            Operator::Plus,
1943            ScalarValue::Dictionary(
1944                Box::new(DataType::Int8),
1945                Box::new(ScalarValue::Int32(Some(1))),
1946            ),
1947            Arc::new(expected),
1948        )?;
1949
1950        Ok(())
1951    }
1952
1953    #[test]
1954    fn plus_op_dict_scalar_decimal() -> Result<()> {
1955        let schema = Schema::new(vec![Field::new(
1956            "a",
1957            DataType::Dictionary(
1958                Box::new(DataType::Int8),
1959                Box::new(DataType::Decimal128(10, 0)),
1960            ),
1961            true,
1962        )]);
1963
1964        let value = 123;
1965        let decimal_array = Arc::new(create_decimal_array(
1966            &[Some(value), None, Some(value - 1), Some(value + 1)],
1967            10,
1968            0,
1969        ));
1970
1971        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
1972        let a = DictionaryArray::try_new(keys, decimal_array)?;
1973
1974        let decimal_array = Arc::new(create_decimal_array(
1975            &[
1976                Some(value + 1),
1977                Some(value),
1978                None,
1979                Some(value + 2),
1980                Some(value + 1),
1981            ],
1982            11,
1983            0,
1984        ));
1985
1986        apply_arithmetic_scalar(
1987            Arc::new(schema),
1988            vec![Arc::new(a)],
1989            Operator::Plus,
1990            ScalarValue::Dictionary(
1991                Box::new(DataType::Int8),
1992                Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
1993            ),
1994            decimal_array,
1995        )?;
1996
1997        Ok(())
1998    }
1999
2000    #[test]
2001    fn minus_op() -> Result<()> {
2002        let schema = Arc::new(Schema::new(vec![
2003            Field::new("a", DataType::Int32, false),
2004            Field::new("b", DataType::Int32, false),
2005        ]));
2006        let a = Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16]));
2007        let b = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
2008
2009        apply_arithmetic::<Int32Type>(
2010            Arc::clone(&schema),
2011            vec![
2012                Arc::clone(&a) as Arc<dyn Array>,
2013                Arc::clone(&b) as Arc<dyn Array>,
2014            ],
2015            Operator::Minus,
2016            Int32Array::from(vec![0, 0, 1, 4, 11]),
2017        )?;
2018
2019        // should handle have negative values in result (for signed)
2020        apply_arithmetic::<Int32Type>(
2021            schema,
2022            vec![b, a],
2023            Operator::Minus,
2024            Int32Array::from(vec![0, 0, -1, -4, -11]),
2025        )?;
2026
2027        Ok(())
2028    }
2029
2030    #[test]
2031    fn minus_op_dict() -> Result<()> {
2032        let schema = Schema::new(vec![
2033            Field::new(
2034                "a",
2035                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2036                true,
2037            ),
2038            Field::new(
2039                "b",
2040                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2041                true,
2042            ),
2043        ]);
2044
2045        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2046        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
2047        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
2048
2049        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2050        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2051        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2052
2053        apply_arithmetic::<Int32Type>(
2054            Arc::new(schema),
2055            vec![Arc::new(a), Arc::new(b)],
2056            Operator::Minus,
2057            Int32Array::from(vec![Some(0), None, Some(0), Some(0), None]),
2058        )?;
2059
2060        Ok(())
2061    }
2062
2063    #[test]
2064    fn minus_op_dict_decimal() -> Result<()> {
2065        let schema = Schema::new(vec![
2066            Field::new(
2067                "a",
2068                DataType::Dictionary(
2069                    Box::new(DataType::Int8),
2070                    Box::new(DataType::Decimal128(10, 0)),
2071                ),
2072                true,
2073            ),
2074            Field::new(
2075                "b",
2076                DataType::Dictionary(
2077                    Box::new(DataType::Int8),
2078                    Box::new(DataType::Decimal128(10, 0)),
2079                ),
2080                true,
2081            ),
2082        ]);
2083
2084        let value = 123;
2085        let decimal_array = Arc::new(create_decimal_array(
2086            &[
2087                Some(value),
2088                Some(value + 2),
2089                Some(value - 1),
2090                Some(value + 1),
2091            ],
2092            10,
2093            0,
2094        ));
2095
2096        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2097        let a = DictionaryArray::try_new(keys, decimal_array)?;
2098
2099        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2100        let decimal_array = Arc::new(create_decimal_array(
2101            &[
2102                Some(value + 1),
2103                Some(value + 3),
2104                Some(value),
2105                Some(value + 2),
2106            ],
2107            10,
2108            0,
2109        ));
2110        let b = DictionaryArray::try_new(keys, decimal_array)?;
2111
2112        apply_arithmetic(
2113            Arc::new(schema),
2114            vec![Arc::new(a), Arc::new(b)],
2115            Operator::Minus,
2116            create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 11, 0),
2117        )?;
2118
2119        Ok(())
2120    }
2121
2122    #[test]
2123    fn minus_op_scalar() -> Result<()> {
2124        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2125        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2126
2127        apply_arithmetic_scalar(
2128            Arc::new(schema),
2129            vec![Arc::new(a)],
2130            Operator::Minus,
2131            ScalarValue::Int32(Some(1)),
2132            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])),
2133        )?;
2134
2135        Ok(())
2136    }
2137
2138    #[test]
2139    fn minus_op_dict_scalar() -> Result<()> {
2140        let schema = Schema::new(vec![Field::new(
2141            "a",
2142            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2143            true,
2144        )]);
2145
2146        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2147
2148        dict_builder.append(1)?;
2149        dict_builder.append_null();
2150        dict_builder.append(2)?;
2151        dict_builder.append(5)?;
2152
2153        let a = dict_builder.finish();
2154
2155        let expected: PrimitiveArray<Int32Type> =
2156            PrimitiveArray::from(vec![Some(0), None, Some(1), Some(4)]);
2157
2158        apply_arithmetic_scalar(
2159            Arc::new(schema),
2160            vec![Arc::new(a)],
2161            Operator::Minus,
2162            ScalarValue::Dictionary(
2163                Box::new(DataType::Int8),
2164                Box::new(ScalarValue::Int32(Some(1))),
2165            ),
2166            Arc::new(expected),
2167        )?;
2168
2169        Ok(())
2170    }
2171
2172    #[test]
2173    fn minus_op_dict_scalar_decimal() -> Result<()> {
2174        let schema = Schema::new(vec![Field::new(
2175            "a",
2176            DataType::Dictionary(
2177                Box::new(DataType::Int8),
2178                Box::new(DataType::Decimal128(10, 0)),
2179            ),
2180            true,
2181        )]);
2182
2183        let value = 123;
2184        let decimal_array = Arc::new(create_decimal_array(
2185            &[Some(value), None, Some(value - 1), Some(value + 1)],
2186            10,
2187            0,
2188        ));
2189
2190        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2191        let a = DictionaryArray::try_new(keys, decimal_array)?;
2192
2193        let decimal_array = Arc::new(create_decimal_array(
2194            &[
2195                Some(value - 1),
2196                Some(value - 2),
2197                None,
2198                Some(value),
2199                Some(value - 1),
2200            ],
2201            11,
2202            0,
2203        ));
2204
2205        apply_arithmetic_scalar(
2206            Arc::new(schema),
2207            vec![Arc::new(a)],
2208            Operator::Minus,
2209            ScalarValue::Dictionary(
2210                Box::new(DataType::Int8),
2211                Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
2212            ),
2213            decimal_array,
2214        )?;
2215
2216        Ok(())
2217    }
2218
2219    #[test]
2220    fn multiply_op() -> Result<()> {
2221        let schema = Arc::new(Schema::new(vec![
2222            Field::new("a", DataType::Int32, false),
2223            Field::new("b", DataType::Int32, false),
2224        ]));
2225        let a = Arc::new(Int32Array::from(vec![4, 8, 16, 32, 64]));
2226        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
2227
2228        apply_arithmetic::<Int32Type>(
2229            schema,
2230            vec![a, b],
2231            Operator::Multiply,
2232            Int32Array::from(vec![8, 32, 128, 512, 2048]),
2233        )?;
2234
2235        Ok(())
2236    }
2237
2238    #[test]
2239    fn multiply_op_dict() -> Result<()> {
2240        let schema = Schema::new(vec![
2241            Field::new(
2242                "a",
2243                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2244                true,
2245            ),
2246            Field::new(
2247                "b",
2248                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2249                true,
2250            ),
2251        ]);
2252
2253        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2254        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
2255        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
2256
2257        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2258        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2259        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2260
2261        apply_arithmetic::<Int32Type>(
2262            Arc::new(schema),
2263            vec![Arc::new(a), Arc::new(b)],
2264            Operator::Multiply,
2265            Int32Array::from(vec![Some(1), None, Some(4), Some(16), None]),
2266        )?;
2267
2268        Ok(())
2269    }
2270
2271    #[test]
2272    fn multiply_op_dict_decimal() -> Result<()> {
2273        let schema = Schema::new(vec![
2274            Field::new(
2275                "a",
2276                DataType::Dictionary(
2277                    Box::new(DataType::Int8),
2278                    Box::new(DataType::Decimal128(10, 0)),
2279                ),
2280                true,
2281            ),
2282            Field::new(
2283                "b",
2284                DataType::Dictionary(
2285                    Box::new(DataType::Int8),
2286                    Box::new(DataType::Decimal128(10, 0)),
2287                ),
2288                true,
2289            ),
2290        ]);
2291
2292        let value = 123;
2293        let decimal_array = Arc::new(create_decimal_array(
2294            &[
2295                Some(value),
2296                Some(value + 2),
2297                Some(value - 1),
2298                Some(value + 1),
2299            ],
2300            10,
2301            0,
2302        )) as ArrayRef;
2303
2304        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2305        let a = DictionaryArray::try_new(keys, decimal_array)?;
2306
2307        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2308        let decimal_array = Arc::new(create_decimal_array(
2309            &[
2310                Some(value + 1),
2311                Some(value + 3),
2312                Some(value),
2313                Some(value + 2),
2314            ],
2315            10,
2316            0,
2317        ));
2318        let b = DictionaryArray::try_new(keys, decimal_array)?;
2319
2320        apply_arithmetic(
2321            Arc::new(schema),
2322            vec![Arc::new(a), Arc::new(b)],
2323            Operator::Multiply,
2324            create_decimal_array(
2325                &[Some(15252), None, None, Some(15252), Some(15129)],
2326                21,
2327                0,
2328            ),
2329        )?;
2330
2331        Ok(())
2332    }
2333
2334    #[test]
2335    fn multiply_op_scalar() -> Result<()> {
2336        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2337        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2338
2339        apply_arithmetic_scalar(
2340            Arc::new(schema),
2341            vec![Arc::new(a)],
2342            Operator::Multiply,
2343            ScalarValue::Int32(Some(2)),
2344            Arc::new(Int32Array::from(vec![2, 4, 6, 8, 10])),
2345        )?;
2346
2347        Ok(())
2348    }
2349
2350    #[test]
2351    fn multiply_op_dict_scalar() -> Result<()> {
2352        let schema = Schema::new(vec![Field::new(
2353            "a",
2354            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2355            true,
2356        )]);
2357
2358        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2359
2360        dict_builder.append(1)?;
2361        dict_builder.append_null();
2362        dict_builder.append(2)?;
2363        dict_builder.append(5)?;
2364
2365        let a = dict_builder.finish();
2366
2367        let expected: PrimitiveArray<Int32Type> =
2368            PrimitiveArray::from(vec![Some(2), None, Some(4), Some(10)]);
2369
2370        apply_arithmetic_scalar(
2371            Arc::new(schema),
2372            vec![Arc::new(a)],
2373            Operator::Multiply,
2374            ScalarValue::Dictionary(
2375                Box::new(DataType::Int8),
2376                Box::new(ScalarValue::Int32(Some(2))),
2377            ),
2378            Arc::new(expected),
2379        )?;
2380
2381        Ok(())
2382    }
2383
2384    #[test]
2385    fn multiply_op_dict_scalar_decimal() -> Result<()> {
2386        let schema = Schema::new(vec![Field::new(
2387            "a",
2388            DataType::Dictionary(
2389                Box::new(DataType::Int8),
2390                Box::new(DataType::Decimal128(10, 0)),
2391            ),
2392            true,
2393        )]);
2394
2395        let value = 123;
2396        let decimal_array = Arc::new(create_decimal_array(
2397            &[Some(value), None, Some(value - 1), Some(value + 1)],
2398            10,
2399            0,
2400        ));
2401
2402        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2403        let a = DictionaryArray::try_new(keys, decimal_array)?;
2404
2405        let decimal_array = Arc::new(create_decimal_array(
2406            &[Some(246), Some(244), None, Some(248), Some(246)],
2407            21,
2408            0,
2409        ));
2410
2411        apply_arithmetic_scalar(
2412            Arc::new(schema),
2413            vec![Arc::new(a)],
2414            Operator::Multiply,
2415            ScalarValue::Dictionary(
2416                Box::new(DataType::Int8),
2417                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2418            ),
2419            decimal_array,
2420        )?;
2421
2422        Ok(())
2423    }
2424
2425    #[test]
2426    fn divide_op() -> Result<()> {
2427        let schema = Arc::new(Schema::new(vec![
2428            Field::new("a", DataType::Int32, false),
2429            Field::new("b", DataType::Int32, false),
2430        ]));
2431        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
2432        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
2433
2434        apply_arithmetic::<Int32Type>(
2435            schema,
2436            vec![a, b],
2437            Operator::Divide,
2438            Int32Array::from(vec![4, 8, 16, 32, 64]),
2439        )?;
2440
2441        Ok(())
2442    }
2443
2444    #[test]
2445    fn divide_op_dict() -> Result<()> {
2446        let schema = Schema::new(vec![
2447            Field::new(
2448                "a",
2449                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2450                true,
2451            ),
2452            Field::new(
2453                "b",
2454                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2455                true,
2456            ),
2457        ]);
2458
2459        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2460
2461        dict_builder.append(1)?;
2462        dict_builder.append_null();
2463        dict_builder.append(2)?;
2464        dict_builder.append(5)?;
2465        dict_builder.append(0)?;
2466
2467        let a = dict_builder.finish();
2468
2469        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2470        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2471        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2472
2473        apply_arithmetic::<Int32Type>(
2474            Arc::new(schema),
2475            vec![Arc::new(a), Arc::new(b)],
2476            Operator::Divide,
2477            Int32Array::from(vec![Some(1), None, Some(1), Some(1), Some(0)]),
2478        )?;
2479
2480        Ok(())
2481    }
2482
2483    #[test]
2484    fn divide_op_dict_decimal() -> Result<()> {
2485        let schema = Schema::new(vec![
2486            Field::new(
2487                "a",
2488                DataType::Dictionary(
2489                    Box::new(DataType::Int8),
2490                    Box::new(DataType::Decimal128(10, 0)),
2491                ),
2492                true,
2493            ),
2494            Field::new(
2495                "b",
2496                DataType::Dictionary(
2497                    Box::new(DataType::Int8),
2498                    Box::new(DataType::Decimal128(10, 0)),
2499                ),
2500                true,
2501            ),
2502        ]);
2503
2504        let value = 123;
2505        let decimal_array = Arc::new(create_decimal_array(
2506            &[
2507                Some(value),
2508                Some(value + 2),
2509                Some(value - 1),
2510                Some(value + 1),
2511            ],
2512            10,
2513            0,
2514        ));
2515
2516        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2517        let a = DictionaryArray::try_new(keys, decimal_array)?;
2518
2519        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2520        let decimal_array = Arc::new(create_decimal_array(
2521            &[
2522                Some(value + 1),
2523                Some(value + 3),
2524                Some(value),
2525                Some(value + 2),
2526            ],
2527            10,
2528            0,
2529        ));
2530        let b = DictionaryArray::try_new(keys, decimal_array)?;
2531
2532        apply_arithmetic(
2533            Arc::new(schema),
2534            vec![Arc::new(a), Arc::new(b)],
2535            Operator::Divide,
2536            create_decimal_array(
2537                &[
2538                    Some(9919), // 0.9919
2539                    None,
2540                    None,
2541                    Some(10081), // 1.0081
2542                    Some(10000), // 1.0
2543                ],
2544                14,
2545                4,
2546            ),
2547        )?;
2548
2549        Ok(())
2550    }
2551
2552    #[test]
2553    fn divide_op_scalar() -> Result<()> {
2554        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2555        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2556
2557        apply_arithmetic_scalar(
2558            Arc::new(schema),
2559            vec![Arc::new(a)],
2560            Operator::Divide,
2561            ScalarValue::Int32(Some(2)),
2562            Arc::new(Int32Array::from(vec![0, 1, 1, 2, 2])),
2563        )?;
2564
2565        Ok(())
2566    }
2567
2568    #[test]
2569    fn divide_op_dict_scalar() -> Result<()> {
2570        let schema = Schema::new(vec![Field::new(
2571            "a",
2572            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2573            true,
2574        )]);
2575
2576        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2577
2578        dict_builder.append(1)?;
2579        dict_builder.append_null();
2580        dict_builder.append(2)?;
2581        dict_builder.append(5)?;
2582
2583        let a = dict_builder.finish();
2584
2585        let expected: PrimitiveArray<Int32Type> =
2586            PrimitiveArray::from(vec![Some(0), None, Some(1), Some(2)]);
2587
2588        apply_arithmetic_scalar(
2589            Arc::new(schema),
2590            vec![Arc::new(a)],
2591            Operator::Divide,
2592            ScalarValue::Dictionary(
2593                Box::new(DataType::Int8),
2594                Box::new(ScalarValue::Int32(Some(2))),
2595            ),
2596            Arc::new(expected),
2597        )?;
2598
2599        Ok(())
2600    }
2601
2602    #[test]
2603    fn divide_op_dict_scalar_decimal() -> Result<()> {
2604        let schema = Schema::new(vec![Field::new(
2605            "a",
2606            DataType::Dictionary(
2607                Box::new(DataType::Int8),
2608                Box::new(DataType::Decimal128(10, 0)),
2609            ),
2610            true,
2611        )]);
2612
2613        let value = 123;
2614        let decimal_array = Arc::new(create_decimal_array(
2615            &[Some(value), None, Some(value - 1), Some(value + 1)],
2616            10,
2617            0,
2618        ));
2619
2620        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2621        let a = DictionaryArray::try_new(keys, decimal_array)?;
2622
2623        let decimal_array = Arc::new(create_decimal_array(
2624            &[Some(615000), Some(610000), None, Some(620000), Some(615000)],
2625            14,
2626            4,
2627        ));
2628
2629        apply_arithmetic_scalar(
2630            Arc::new(schema),
2631            vec![Arc::new(a)],
2632            Operator::Divide,
2633            ScalarValue::Dictionary(
2634                Box::new(DataType::Int8),
2635                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2636            ),
2637            decimal_array,
2638        )?;
2639
2640        Ok(())
2641    }
2642
2643    #[test]
2644    fn modulus_op() -> Result<()> {
2645        let schema = Arc::new(Schema::new(vec![
2646            Field::new("a", DataType::Int32, false),
2647            Field::new("b", DataType::Int32, false),
2648        ]));
2649        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
2650        let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));
2651
2652        apply_arithmetic::<Int32Type>(
2653            schema,
2654            vec![a, b],
2655            Operator::Modulo,
2656            Int32Array::from(vec![0, 0, 2, 8, 0]),
2657        )?;
2658
2659        Ok(())
2660    }
2661
2662    #[test]
2663    fn modulus_op_dict() -> Result<()> {
2664        let schema = Schema::new(vec![
2665            Field::new(
2666                "a",
2667                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2668                true,
2669            ),
2670            Field::new(
2671                "b",
2672                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2673                true,
2674            ),
2675        ]);
2676
2677        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2678
2679        dict_builder.append(1)?;
2680        dict_builder.append_null();
2681        dict_builder.append(2)?;
2682        dict_builder.append(5)?;
2683        dict_builder.append(0)?;
2684
2685        let a = dict_builder.finish();
2686
2687        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2688        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2689        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2690
2691        apply_arithmetic::<Int32Type>(
2692            Arc::new(schema),
2693            vec![Arc::new(a), Arc::new(b)],
2694            Operator::Modulo,
2695            Int32Array::from(vec![Some(0), None, Some(0), Some(1), Some(0)]),
2696        )?;
2697
2698        Ok(())
2699    }
2700
2701    #[test]
2702    fn modulus_op_dict_decimal() -> Result<()> {
2703        let schema = Schema::new(vec![
2704            Field::new(
2705                "a",
2706                DataType::Dictionary(
2707                    Box::new(DataType::Int8),
2708                    Box::new(DataType::Decimal128(10, 0)),
2709                ),
2710                true,
2711            ),
2712            Field::new(
2713                "b",
2714                DataType::Dictionary(
2715                    Box::new(DataType::Int8),
2716                    Box::new(DataType::Decimal128(10, 0)),
2717                ),
2718                true,
2719            ),
2720        ]);
2721
2722        let value = 123;
2723        let decimal_array = Arc::new(create_decimal_array(
2724            &[
2725                Some(value),
2726                Some(value + 2),
2727                Some(value - 1),
2728                Some(value + 1),
2729            ],
2730            10,
2731            0,
2732        ));
2733
2734        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2735        let a = DictionaryArray::try_new(keys, decimal_array)?;
2736
2737        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2738        let decimal_array = Arc::new(create_decimal_array(
2739            &[
2740                Some(value + 1),
2741                Some(value + 3),
2742                Some(value),
2743                Some(value + 2),
2744            ],
2745            10,
2746            0,
2747        ));
2748        let b = DictionaryArray::try_new(keys, decimal_array)?;
2749
2750        apply_arithmetic(
2751            Arc::new(schema),
2752            vec![Arc::new(a), Arc::new(b)],
2753            Operator::Modulo,
2754            create_decimal_array(&[Some(123), None, None, Some(1), Some(0)], 10, 0),
2755        )?;
2756
2757        Ok(())
2758    }
2759
2760    #[test]
2761    fn modulus_op_scalar() -> Result<()> {
2762        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2763        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2764
2765        apply_arithmetic_scalar(
2766            Arc::new(schema),
2767            vec![Arc::new(a)],
2768            Operator::Modulo,
2769            ScalarValue::Int32(Some(2)),
2770            Arc::new(Int32Array::from(vec![1, 0, 1, 0, 1])),
2771        )?;
2772
2773        Ok(())
2774    }
2775
2776    #[test]
2777    fn modules_op_dict_scalar() -> Result<()> {
2778        let schema = Schema::new(vec![Field::new(
2779            "a",
2780            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2781            true,
2782        )]);
2783
2784        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2785
2786        dict_builder.append(1)?;
2787        dict_builder.append_null();
2788        dict_builder.append(2)?;
2789        dict_builder.append(5)?;
2790
2791        let a = dict_builder.finish();
2792
2793        let expected: PrimitiveArray<Int32Type> =
2794            PrimitiveArray::from(vec![Some(1), None, Some(0), Some(1)]);
2795
2796        apply_arithmetic_scalar(
2797            Arc::new(schema),
2798            vec![Arc::new(a)],
2799            Operator::Modulo,
2800            ScalarValue::Dictionary(
2801                Box::new(DataType::Int8),
2802                Box::new(ScalarValue::Int32(Some(2))),
2803            ),
2804            Arc::new(expected),
2805        )?;
2806
2807        Ok(())
2808    }
2809
2810    #[test]
2811    fn modulus_op_dict_scalar_decimal() -> Result<()> {
2812        let schema = Schema::new(vec![Field::new(
2813            "a",
2814            DataType::Dictionary(
2815                Box::new(DataType::Int8),
2816                Box::new(DataType::Decimal128(10, 0)),
2817            ),
2818            true,
2819        )]);
2820
2821        let value = 123;
2822        let decimal_array = Arc::new(create_decimal_array(
2823            &[Some(value), None, Some(value - 1), Some(value + 1)],
2824            10,
2825            0,
2826        ));
2827
2828        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2829        let a = DictionaryArray::try_new(keys, decimal_array)?;
2830
2831        let decimal_array = Arc::new(create_decimal_array(
2832            &[Some(1), Some(0), None, Some(0), Some(1)],
2833            10,
2834            0,
2835        ));
2836
2837        apply_arithmetic_scalar(
2838            Arc::new(schema),
2839            vec![Arc::new(a)],
2840            Operator::Modulo,
2841            ScalarValue::Dictionary(
2842                Box::new(DataType::Int8),
2843                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2844            ),
2845            decimal_array,
2846        )?;
2847
2848        Ok(())
2849    }
2850
2851    fn apply_arithmetic<T: ArrowNumericType>(
2852        schema: SchemaRef,
2853        data: Vec<ArrayRef>,
2854        op: Operator,
2855        expected: PrimitiveArray<T>,
2856    ) -> Result<()> {
2857        let arithmetic_op =
2858            binary_op(col("a", &schema)?, op, col("b", &schema)?, &schema)?;
2859        let batch = RecordBatch::try_new(schema, data)?;
2860        let result = arithmetic_op
2861            .evaluate(&batch)?
2862            .into_array(batch.num_rows())
2863            .expect("Failed to convert to array");
2864
2865        assert_eq!(result.as_ref(), &expected);
2866        Ok(())
2867    }
2868
2869    fn apply_arithmetic_scalar(
2870        schema: SchemaRef,
2871        data: Vec<ArrayRef>,
2872        op: Operator,
2873        literal: ScalarValue,
2874        expected: ArrayRef,
2875    ) -> Result<()> {
2876        let lit = Arc::new(Literal::new(literal));
2877        let arithmetic_op = binary_op(col("a", &schema)?, op, lit, &schema)?;
2878        let batch = RecordBatch::try_new(schema, data)?;
2879        let result = arithmetic_op
2880            .evaluate(&batch)?
2881            .into_array(batch.num_rows())
2882            .expect("Failed to convert to array");
2883
2884        assert_eq!(&result, &expected);
2885        Ok(())
2886    }
2887
2888    fn apply_logic_op(
2889        schema: &SchemaRef,
2890        left: &ArrayRef,
2891        right: &ArrayRef,
2892        op: Operator,
2893        expected: BooleanArray,
2894    ) -> Result<()> {
2895        let op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
2896        let data: Vec<ArrayRef> = vec![Arc::clone(left), Arc::clone(right)];
2897        let batch = RecordBatch::try_new(Arc::clone(schema), data)?;
2898        let result = op
2899            .evaluate(&batch)?
2900            .into_array(batch.num_rows())
2901            .expect("Failed to convert to array");
2902
2903        assert_eq!(result.as_ref(), &expected);
2904        Ok(())
2905    }
2906
2907    // Test `scalar <op> arr` produces expected
2908    fn apply_logic_op_scalar_arr(
2909        schema: &SchemaRef,
2910        scalar: &ScalarValue,
2911        arr: &ArrayRef,
2912        op: Operator,
2913        expected: &BooleanArray,
2914    ) -> Result<()> {
2915        let scalar = lit(scalar.clone());
2916        let op = binary_op(scalar, op, col("a", schema)?, schema)?;
2917        let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
2918        let result = op
2919            .evaluate(&batch)?
2920            .into_array(batch.num_rows())
2921            .expect("Failed to convert to array");
2922        assert_eq!(result.as_ref(), expected);
2923
2924        Ok(())
2925    }
2926
2927    // Test `arr <op> scalar` produces expected
2928    fn apply_logic_op_arr_scalar(
2929        schema: &SchemaRef,
2930        arr: &ArrayRef,
2931        scalar: &ScalarValue,
2932        op: Operator,
2933        expected: &BooleanArray,
2934    ) -> Result<()> {
2935        let scalar = lit(scalar.clone());
2936        let op = binary_op(col("a", schema)?, op, scalar, schema)?;
2937        let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
2938        let result = op
2939            .evaluate(&batch)?
2940            .into_array(batch.num_rows())
2941            .expect("Failed to convert to array");
2942        assert_eq!(result.as_ref(), expected);
2943
2944        Ok(())
2945    }
2946
2947    #[test]
2948    fn and_with_nulls_op() -> Result<()> {
2949        let schema = Schema::new(vec![
2950            Field::new("a", DataType::Boolean, true),
2951            Field::new("b", DataType::Boolean, true),
2952        ]);
2953        let a = Arc::new(BooleanArray::from(vec![
2954            Some(true),
2955            Some(false),
2956            None,
2957            Some(true),
2958            Some(false),
2959            None,
2960            Some(true),
2961            Some(false),
2962            None,
2963        ])) as ArrayRef;
2964        let b = Arc::new(BooleanArray::from(vec![
2965            Some(true),
2966            Some(true),
2967            Some(true),
2968            Some(false),
2969            Some(false),
2970            Some(false),
2971            None,
2972            None,
2973            None,
2974        ])) as ArrayRef;
2975
2976        let expected = BooleanArray::from(vec![
2977            Some(true),
2978            Some(false),
2979            None,
2980            Some(false),
2981            Some(false),
2982            Some(false),
2983            None,
2984            Some(false),
2985            None,
2986        ]);
2987        apply_logic_op(&Arc::new(schema), &a, &b, Operator::And, expected)?;
2988
2989        Ok(())
2990    }
2991
2992    #[test]
2993    fn regex_with_nulls() -> Result<()> {
2994        let schema = Schema::new(vec![
2995            Field::new("a", DataType::Utf8, true),
2996            Field::new("b", DataType::Utf8, true),
2997        ]);
2998        let a = Arc::new(StringArray::from(vec![
2999            Some("abc"),
3000            None,
3001            Some("abc"),
3002            None,
3003            Some("abc"),
3004        ])) as ArrayRef;
3005        let b = Arc::new(StringArray::from(vec![
3006            Some("^a"),
3007            Some("^A"),
3008            None,
3009            None,
3010            Some("^(b|c)"),
3011        ])) as ArrayRef;
3012
3013        let regex_expected =
3014            BooleanArray::from(vec![Some(true), None, None, None, Some(false)]);
3015        let regex_not_expected =
3016            BooleanArray::from(vec![Some(false), None, None, None, Some(true)]);
3017        apply_logic_op(
3018            &Arc::new(schema.clone()),
3019            &a,
3020            &b,
3021            Operator::RegexMatch,
3022            regex_expected.clone(),
3023        )?;
3024        apply_logic_op(
3025            &Arc::new(schema.clone()),
3026            &a,
3027            &b,
3028            Operator::RegexIMatch,
3029            regex_expected.clone(),
3030        )?;
3031        apply_logic_op(
3032            &Arc::new(schema.clone()),
3033            &a,
3034            &b,
3035            Operator::RegexNotMatch,
3036            regex_not_expected.clone(),
3037        )?;
3038        apply_logic_op(
3039            &Arc::new(schema),
3040            &a,
3041            &b,
3042            Operator::RegexNotIMatch,
3043            regex_not_expected.clone(),
3044        )?;
3045
3046        let schema = Schema::new(vec![
3047            Field::new("a", DataType::LargeUtf8, true),
3048            Field::new("b", DataType::LargeUtf8, true),
3049        ]);
3050        let a = Arc::new(LargeStringArray::from(vec![
3051            Some("abc"),
3052            None,
3053            Some("abc"),
3054            None,
3055            Some("abc"),
3056        ])) as ArrayRef;
3057        let b = Arc::new(LargeStringArray::from(vec![
3058            Some("^a"),
3059            Some("^A"),
3060            None,
3061            None,
3062            Some("^(b|c)"),
3063        ])) as ArrayRef;
3064
3065        apply_logic_op(
3066            &Arc::new(schema.clone()),
3067            &a,
3068            &b,
3069            Operator::RegexMatch,
3070            regex_expected.clone(),
3071        )?;
3072        apply_logic_op(
3073            &Arc::new(schema.clone()),
3074            &a,
3075            &b,
3076            Operator::RegexIMatch,
3077            regex_expected,
3078        )?;
3079        apply_logic_op(
3080            &Arc::new(schema.clone()),
3081            &a,
3082            &b,
3083            Operator::RegexNotMatch,
3084            regex_not_expected.clone(),
3085        )?;
3086        apply_logic_op(
3087            &Arc::new(schema),
3088            &a,
3089            &b,
3090            Operator::RegexNotIMatch,
3091            regex_not_expected,
3092        )?;
3093
3094        Ok(())
3095    }
3096
3097    #[test]
3098    fn or_with_nulls_op() -> Result<()> {
3099        let schema = Schema::new(vec![
3100            Field::new("a", DataType::Boolean, true),
3101            Field::new("b", DataType::Boolean, true),
3102        ]);
3103        let a = Arc::new(BooleanArray::from(vec![
3104            Some(true),
3105            Some(false),
3106            None,
3107            Some(true),
3108            Some(false),
3109            None,
3110            Some(true),
3111            Some(false),
3112            None,
3113        ])) as ArrayRef;
3114        let b = Arc::new(BooleanArray::from(vec![
3115            Some(true),
3116            Some(true),
3117            Some(true),
3118            Some(false),
3119            Some(false),
3120            Some(false),
3121            None,
3122            None,
3123            None,
3124        ])) as ArrayRef;
3125
3126        let expected = BooleanArray::from(vec![
3127            Some(true),
3128            Some(true),
3129            Some(true),
3130            Some(true),
3131            Some(false),
3132            None,
3133            Some(true),
3134            None,
3135            None,
3136        ]);
3137        apply_logic_op(&Arc::new(schema), &a, &b, Operator::Or, expected)?;
3138
3139        Ok(())
3140    }
3141
3142    /// Returns (schema, a: BooleanArray, b: BooleanArray) with all possible inputs
3143    ///
3144    /// a: [true, true, true,  NULL, NULL, NULL,  false, false, false]
3145    /// b: [true, NULL, false, true, NULL, false, true,  NULL,  false]
3146    fn bool_test_arrays() -> (SchemaRef, ArrayRef, ArrayRef) {
3147        let schema = Schema::new(vec![
3148            Field::new("a", DataType::Boolean, true),
3149            Field::new("b", DataType::Boolean, true),
3150        ]);
3151        let a: BooleanArray = [
3152            Some(true),
3153            Some(true),
3154            Some(true),
3155            None,
3156            None,
3157            None,
3158            Some(false),
3159            Some(false),
3160            Some(false),
3161        ]
3162        .iter()
3163        .collect();
3164        let b: BooleanArray = [
3165            Some(true),
3166            None,
3167            Some(false),
3168            Some(true),
3169            None,
3170            Some(false),
3171            Some(true),
3172            None,
3173            Some(false),
3174        ]
3175        .iter()
3176        .collect();
3177        (Arc::new(schema), Arc::new(a), Arc::new(b))
3178    }
3179
3180    /// Returns (schema, BooleanArray) with [true, NULL, false]
3181    fn scalar_bool_test_array() -> (SchemaRef, ArrayRef) {
3182        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
3183        let a: BooleanArray = [Some(true), None, Some(false)].iter().collect();
3184        (Arc::new(schema), Arc::new(a))
3185    }
3186
3187    #[test]
3188    fn eq_op_bool() {
3189        let (schema, a, b) = bool_test_arrays();
3190        let expected = [
3191            Some(true),
3192            None,
3193            Some(false),
3194            None,
3195            None,
3196            None,
3197            Some(false),
3198            None,
3199            Some(true),
3200        ]
3201        .iter()
3202        .collect();
3203        apply_logic_op(&schema, &a, &b, Operator::Eq, expected).unwrap();
3204    }
3205
3206    #[test]
3207    fn eq_op_bool_scalar() {
3208        let (schema, a) = scalar_bool_test_array();
3209        let expected = [Some(true), None, Some(false)].iter().collect();
3210        apply_logic_op_scalar_arr(
3211            &schema,
3212            &ScalarValue::from(true),
3213            &a,
3214            Operator::Eq,
3215            &expected,
3216        )
3217        .unwrap();
3218        apply_logic_op_arr_scalar(
3219            &schema,
3220            &a,
3221            &ScalarValue::from(true),
3222            Operator::Eq,
3223            &expected,
3224        )
3225        .unwrap();
3226
3227        let expected = [Some(false), None, Some(true)].iter().collect();
3228        apply_logic_op_scalar_arr(
3229            &schema,
3230            &ScalarValue::from(false),
3231            &a,
3232            Operator::Eq,
3233            &expected,
3234        )
3235        .unwrap();
3236        apply_logic_op_arr_scalar(
3237            &schema,
3238            &a,
3239            &ScalarValue::from(false),
3240            Operator::Eq,
3241            &expected,
3242        )
3243        .unwrap();
3244    }
3245
3246    #[test]
3247    fn neq_op_bool() {
3248        let (schema, a, b) = bool_test_arrays();
3249        let expected = [
3250            Some(false),
3251            None,
3252            Some(true),
3253            None,
3254            None,
3255            None,
3256            Some(true),
3257            None,
3258            Some(false),
3259        ]
3260        .iter()
3261        .collect();
3262        apply_logic_op(&schema, &a, &b, Operator::NotEq, expected).unwrap();
3263    }
3264
3265    #[test]
3266    fn neq_op_bool_scalar() {
3267        let (schema, a) = scalar_bool_test_array();
3268        let expected = [Some(false), None, Some(true)].iter().collect();
3269        apply_logic_op_scalar_arr(
3270            &schema,
3271            &ScalarValue::from(true),
3272            &a,
3273            Operator::NotEq,
3274            &expected,
3275        )
3276        .unwrap();
3277        apply_logic_op_arr_scalar(
3278            &schema,
3279            &a,
3280            &ScalarValue::from(true),
3281            Operator::NotEq,
3282            &expected,
3283        )
3284        .unwrap();
3285
3286        let expected = [Some(true), None, Some(false)].iter().collect();
3287        apply_logic_op_scalar_arr(
3288            &schema,
3289            &ScalarValue::from(false),
3290            &a,
3291            Operator::NotEq,
3292            &expected,
3293        )
3294        .unwrap();
3295        apply_logic_op_arr_scalar(
3296            &schema,
3297            &a,
3298            &ScalarValue::from(false),
3299            Operator::NotEq,
3300            &expected,
3301        )
3302        .unwrap();
3303    }
3304
3305    #[test]
3306    fn lt_op_bool() {
3307        let (schema, a, b) = bool_test_arrays();
3308        let expected = [
3309            Some(false),
3310            None,
3311            Some(false),
3312            None,
3313            None,
3314            None,
3315            Some(true),
3316            None,
3317            Some(false),
3318        ]
3319        .iter()
3320        .collect();
3321        apply_logic_op(&schema, &a, &b, Operator::Lt, expected).unwrap();
3322    }
3323
3324    #[test]
3325    fn lt_op_bool_scalar() {
3326        let (schema, a) = scalar_bool_test_array();
3327        let expected = [Some(false), None, Some(false)].iter().collect();
3328        apply_logic_op_scalar_arr(
3329            &schema,
3330            &ScalarValue::from(true),
3331            &a,
3332            Operator::Lt,
3333            &expected,
3334        )
3335        .unwrap();
3336
3337        let expected = [Some(false), None, Some(true)].iter().collect();
3338        apply_logic_op_arr_scalar(
3339            &schema,
3340            &a,
3341            &ScalarValue::from(true),
3342            Operator::Lt,
3343            &expected,
3344        )
3345        .unwrap();
3346
3347        let expected = [Some(true), None, Some(false)].iter().collect();
3348        apply_logic_op_scalar_arr(
3349            &schema,
3350            &ScalarValue::from(false),
3351            &a,
3352            Operator::Lt,
3353            &expected,
3354        )
3355        .unwrap();
3356
3357        let expected = [Some(false), None, Some(false)].iter().collect();
3358        apply_logic_op_arr_scalar(
3359            &schema,
3360            &a,
3361            &ScalarValue::from(false),
3362            Operator::Lt,
3363            &expected,
3364        )
3365        .unwrap();
3366    }
3367
3368    #[test]
3369    fn lt_eq_op_bool() {
3370        let (schema, a, b) = bool_test_arrays();
3371        let expected = [
3372            Some(true),
3373            None,
3374            Some(false),
3375            None,
3376            None,
3377            None,
3378            Some(true),
3379            None,
3380            Some(true),
3381        ]
3382        .iter()
3383        .collect();
3384        apply_logic_op(&schema, &a, &b, Operator::LtEq, expected).unwrap();
3385    }
3386
3387    #[test]
3388    fn lt_eq_op_bool_scalar() {
3389        let (schema, a) = scalar_bool_test_array();
3390        let expected = [Some(true), None, Some(false)].iter().collect();
3391        apply_logic_op_scalar_arr(
3392            &schema,
3393            &ScalarValue::from(true),
3394            &a,
3395            Operator::LtEq,
3396            &expected,
3397        )
3398        .unwrap();
3399
3400        let expected = [Some(true), None, Some(true)].iter().collect();
3401        apply_logic_op_arr_scalar(
3402            &schema,
3403            &a,
3404            &ScalarValue::from(true),
3405            Operator::LtEq,
3406            &expected,
3407        )
3408        .unwrap();
3409
3410        let expected = [Some(true), None, Some(true)].iter().collect();
3411        apply_logic_op_scalar_arr(
3412            &schema,
3413            &ScalarValue::from(false),
3414            &a,
3415            Operator::LtEq,
3416            &expected,
3417        )
3418        .unwrap();
3419
3420        let expected = [Some(false), None, Some(true)].iter().collect();
3421        apply_logic_op_arr_scalar(
3422            &schema,
3423            &a,
3424            &ScalarValue::from(false),
3425            Operator::LtEq,
3426            &expected,
3427        )
3428        .unwrap();
3429    }
3430
3431    #[test]
3432    fn gt_op_bool() {
3433        let (schema, a, b) = bool_test_arrays();
3434        let expected = [
3435            Some(false),
3436            None,
3437            Some(true),
3438            None,
3439            None,
3440            None,
3441            Some(false),
3442            None,
3443            Some(false),
3444        ]
3445        .iter()
3446        .collect();
3447        apply_logic_op(&schema, &a, &b, Operator::Gt, expected).unwrap();
3448    }
3449
3450    #[test]
3451    fn gt_op_bool_scalar() {
3452        let (schema, a) = scalar_bool_test_array();
3453        let expected = [Some(false), None, Some(true)].iter().collect();
3454        apply_logic_op_scalar_arr(
3455            &schema,
3456            &ScalarValue::from(true),
3457            &a,
3458            Operator::Gt,
3459            &expected,
3460        )
3461        .unwrap();
3462
3463        let expected = [Some(false), None, Some(false)].iter().collect();
3464        apply_logic_op_arr_scalar(
3465            &schema,
3466            &a,
3467            &ScalarValue::from(true),
3468            Operator::Gt,
3469            &expected,
3470        )
3471        .unwrap();
3472
3473        let expected = [Some(false), None, Some(false)].iter().collect();
3474        apply_logic_op_scalar_arr(
3475            &schema,
3476            &ScalarValue::from(false),
3477            &a,
3478            Operator::Gt,
3479            &expected,
3480        )
3481        .unwrap();
3482
3483        let expected = [Some(true), None, Some(false)].iter().collect();
3484        apply_logic_op_arr_scalar(
3485            &schema,
3486            &a,
3487            &ScalarValue::from(false),
3488            Operator::Gt,
3489            &expected,
3490        )
3491        .unwrap();
3492    }
3493
3494    #[test]
3495    fn gt_eq_op_bool() {
3496        let (schema, a, b) = bool_test_arrays();
3497        let expected = [
3498            Some(true),
3499            None,
3500            Some(true),
3501            None,
3502            None,
3503            None,
3504            Some(false),
3505            None,
3506            Some(true),
3507        ]
3508        .iter()
3509        .collect();
3510        apply_logic_op(&schema, &a, &b, Operator::GtEq, expected).unwrap();
3511    }
3512
3513    #[test]
3514    fn gt_eq_op_bool_scalar() {
3515        let (schema, a) = scalar_bool_test_array();
3516        let expected = [Some(true), None, Some(true)].iter().collect();
3517        apply_logic_op_scalar_arr(
3518            &schema,
3519            &ScalarValue::from(true),
3520            &a,
3521            Operator::GtEq,
3522            &expected,
3523        )
3524        .unwrap();
3525
3526        let expected = [Some(true), None, Some(false)].iter().collect();
3527        apply_logic_op_arr_scalar(
3528            &schema,
3529            &a,
3530            &ScalarValue::from(true),
3531            Operator::GtEq,
3532            &expected,
3533        )
3534        .unwrap();
3535
3536        let expected = [Some(false), None, Some(true)].iter().collect();
3537        apply_logic_op_scalar_arr(
3538            &schema,
3539            &ScalarValue::from(false),
3540            &a,
3541            Operator::GtEq,
3542            &expected,
3543        )
3544        .unwrap();
3545
3546        let expected = [Some(true), None, Some(true)].iter().collect();
3547        apply_logic_op_arr_scalar(
3548            &schema,
3549            &a,
3550            &ScalarValue::from(false),
3551            Operator::GtEq,
3552            &expected,
3553        )
3554        .unwrap();
3555    }
3556
3557    #[test]
3558    fn is_distinct_from_op_bool() {
3559        let (schema, a, b) = bool_test_arrays();
3560        let expected = [
3561            Some(false),
3562            Some(true),
3563            Some(true),
3564            Some(true),
3565            Some(false),
3566            Some(true),
3567            Some(true),
3568            Some(true),
3569            Some(false),
3570        ]
3571        .iter()
3572        .collect();
3573        apply_logic_op(&schema, &a, &b, Operator::IsDistinctFrom, expected).unwrap();
3574    }
3575
3576    #[test]
3577    fn is_not_distinct_from_op_bool() {
3578        let (schema, a, b) = bool_test_arrays();
3579        let expected = [
3580            Some(true),
3581            Some(false),
3582            Some(false),
3583            Some(false),
3584            Some(true),
3585            Some(false),
3586            Some(false),
3587            Some(false),
3588            Some(true),
3589        ]
3590        .iter()
3591        .collect();
3592        apply_logic_op(&schema, &a, &b, Operator::IsNotDistinctFrom, expected).unwrap();
3593    }
3594
3595    #[test]
3596    fn relatively_deeply_nested() {
3597        // Reproducer for https://github.com/apache/datafusion/issues/419
3598
3599        // where even relatively shallow binary expressions overflowed
3600        // the stack in debug builds
3601
3602        let input: Vec<_> = vec![1, 2, 3, 4, 5].into_iter().map(Some).collect();
3603        let a: Int32Array = input.iter().collect();
3604
3605        let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(a) as _)]).unwrap();
3606        let schema = batch.schema();
3607
3608        // build a left deep tree ((((a + a) + a) + a ....
3609        let tree_depth: i32 = 100;
3610        let expr = (0..tree_depth)
3611            .map(|_| col("a", schema.as_ref()).unwrap())
3612            .reduce(|l, r| binary(l, Operator::Plus, r, &schema).unwrap())
3613            .unwrap();
3614
3615        let result = expr
3616            .evaluate(&batch)
3617            .expect("evaluation")
3618            .into_array(batch.num_rows())
3619            .expect("Failed to convert to array");
3620
3621        let expected: Int32Array = input
3622            .into_iter()
3623            .map(|i| i.map(|i| i * tree_depth))
3624            .collect();
3625        assert_eq!(result.as_ref(), &expected);
3626    }
3627
3628    fn create_decimal_array(
3629        array: &[Option<i128>],
3630        precision: u8,
3631        scale: i8,
3632    ) -> Decimal128Array {
3633        let mut decimal_builder = Decimal128Builder::with_capacity(array.len());
3634        for value in array.iter().copied() {
3635            decimal_builder.append_option(value)
3636        }
3637        decimal_builder
3638            .finish()
3639            .with_precision_and_scale(precision, scale)
3640            .unwrap()
3641    }
3642
3643    #[test]
3644    fn comparison_dict_decimal_scalar_expr_test() -> Result<()> {
3645        // scalar of decimal compare with dictionary decimal array
3646        let value_i128 = 123;
3647        let decimal_scalar = ScalarValue::Dictionary(
3648            Box::new(DataType::Int8),
3649            Box::new(ScalarValue::Decimal128(Some(value_i128), 25, 3)),
3650        );
3651        let schema = Arc::new(Schema::new(vec![Field::new(
3652            "a",
3653            DataType::Dictionary(
3654                Box::new(DataType::Int8),
3655                Box::new(DataType::Decimal128(25, 3)),
3656            ),
3657            true,
3658        )]));
3659        let decimal_array = Arc::new(create_decimal_array(
3660            &[
3661                Some(value_i128),
3662                None,
3663                Some(value_i128 - 1),
3664                Some(value_i128 + 1),
3665            ],
3666            25,
3667            3,
3668        ));
3669
3670        let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
3671        let dictionary =
3672            Arc::new(DictionaryArray::try_new(keys, decimal_array)?) as ArrayRef;
3673
3674        // array = scalar
3675        apply_logic_op_arr_scalar(
3676            &schema,
3677            &dictionary,
3678            &decimal_scalar,
3679            Operator::Eq,
3680            &BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3681        )
3682        .unwrap();
3683        // array != scalar
3684        apply_logic_op_arr_scalar(
3685            &schema,
3686            &dictionary,
3687            &decimal_scalar,
3688            Operator::NotEq,
3689            &BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3690        )
3691        .unwrap();
3692        //  array < scalar
3693        apply_logic_op_arr_scalar(
3694            &schema,
3695            &dictionary,
3696            &decimal_scalar,
3697            Operator::Lt,
3698            &BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3699        )
3700        .unwrap();
3701
3702        //  array <= scalar
3703        apply_logic_op_arr_scalar(
3704            &schema,
3705            &dictionary,
3706            &decimal_scalar,
3707            Operator::LtEq,
3708            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3709        )
3710        .unwrap();
3711        // array > scalar
3712        apply_logic_op_arr_scalar(
3713            &schema,
3714            &dictionary,
3715            &decimal_scalar,
3716            Operator::Gt,
3717            &BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3718        )
3719        .unwrap();
3720
3721        // array >= scalar
3722        apply_logic_op_arr_scalar(
3723            &schema,
3724            &dictionary,
3725            &decimal_scalar,
3726            Operator::GtEq,
3727            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3728        )
3729        .unwrap();
3730
3731        Ok(())
3732    }
3733
3734    #[test]
3735    fn comparison_decimal_expr_test() -> Result<()> {
3736        // scalar of decimal compare with decimal array
3737        let value_i128 = 123;
3738        let decimal_scalar = ScalarValue::Decimal128(Some(value_i128), 25, 3);
3739        let schema = Arc::new(Schema::new(vec![Field::new(
3740            "a",
3741            DataType::Decimal128(25, 3),
3742            true,
3743        )]));
3744        let decimal_array = Arc::new(create_decimal_array(
3745            &[
3746                Some(value_i128),
3747                None,
3748                Some(value_i128 - 1),
3749                Some(value_i128 + 1),
3750            ],
3751            25,
3752            3,
3753        )) as ArrayRef;
3754        // array = scalar
3755        apply_logic_op_arr_scalar(
3756            &schema,
3757            &decimal_array,
3758            &decimal_scalar,
3759            Operator::Eq,
3760            &BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3761        )
3762        .unwrap();
3763        // array != scalar
3764        apply_logic_op_arr_scalar(
3765            &schema,
3766            &decimal_array,
3767            &decimal_scalar,
3768            Operator::NotEq,
3769            &BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3770        )
3771        .unwrap();
3772        //  array < scalar
3773        apply_logic_op_arr_scalar(
3774            &schema,
3775            &decimal_array,
3776            &decimal_scalar,
3777            Operator::Lt,
3778            &BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3779        )
3780        .unwrap();
3781
3782        //  array <= scalar
3783        apply_logic_op_arr_scalar(
3784            &schema,
3785            &decimal_array,
3786            &decimal_scalar,
3787            Operator::LtEq,
3788            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3789        )
3790        .unwrap();
3791        // array > scalar
3792        apply_logic_op_arr_scalar(
3793            &schema,
3794            &decimal_array,
3795            &decimal_scalar,
3796            Operator::Gt,
3797            &BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3798        )
3799        .unwrap();
3800
3801        // array >= scalar
3802        apply_logic_op_arr_scalar(
3803            &schema,
3804            &decimal_array,
3805            &decimal_scalar,
3806            Operator::GtEq,
3807            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3808        )
3809        .unwrap();
3810
3811        // scalar of different data type with decimal array
3812        let decimal_scalar = ScalarValue::Decimal128(Some(123_456), 10, 3);
3813        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
3814        // scalar == array
3815        apply_logic_op_scalar_arr(
3816            &schema,
3817            &decimal_scalar,
3818            &(Arc::new(Int64Array::from(vec![Some(124), None])) as ArrayRef),
3819            Operator::Eq,
3820            &BooleanArray::from(vec![Some(false), None]),
3821        )
3822        .unwrap();
3823
3824        // array != scalar
3825        apply_logic_op_arr_scalar(
3826            &schema,
3827            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(1)])) as ArrayRef),
3828            &decimal_scalar,
3829            Operator::NotEq,
3830            &BooleanArray::from(vec![Some(true), None, Some(true)]),
3831        )
3832        .unwrap();
3833
3834        // array < scalar
3835        apply_logic_op_arr_scalar(
3836            &schema,
3837            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(124)])) as ArrayRef),
3838            &decimal_scalar,
3839            Operator::Lt,
3840            &BooleanArray::from(vec![Some(true), None, Some(false)]),
3841        )
3842        .unwrap();
3843
3844        // array > scalar
3845        apply_logic_op_arr_scalar(
3846            &schema,
3847            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(124)])) as ArrayRef),
3848            &decimal_scalar,
3849            Operator::Gt,
3850            &BooleanArray::from(vec![Some(false), None, Some(true)]),
3851        )
3852        .unwrap();
3853
3854        let schema =
3855            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3856        // array == scalar
3857        apply_logic_op_arr_scalar(
3858            &schema,
3859            &(Arc::new(Float64Array::from(vec![Some(123.456), None, Some(123.457)]))
3860                as ArrayRef),
3861            &decimal_scalar,
3862            Operator::Eq,
3863            &BooleanArray::from(vec![Some(true), None, Some(false)]),
3864        )
3865        .unwrap();
3866
3867        // array <= scalar
3868        apply_logic_op_arr_scalar(
3869            &schema,
3870            &(Arc::new(Float64Array::from(vec![
3871                Some(123.456),
3872                None,
3873                Some(123.457),
3874                Some(123.45),
3875            ])) as ArrayRef),
3876            &decimal_scalar,
3877            Operator::LtEq,
3878            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3879        )
3880        .unwrap();
3881        // array >= scalar
3882        apply_logic_op_arr_scalar(
3883            &schema,
3884            &(Arc::new(Float64Array::from(vec![
3885                Some(123.456),
3886                None,
3887                Some(123.457),
3888                Some(123.45),
3889            ])) as ArrayRef),
3890            &decimal_scalar,
3891            Operator::GtEq,
3892            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3893        )
3894        .unwrap();
3895
3896        let value: i128 = 123;
3897        let decimal_array = Arc::new(create_decimal_array(
3898            &[Some(value), None, Some(value - 1), Some(value + 1)],
3899            10,
3900            0,
3901        )) as ArrayRef;
3902
3903        // comparison array op for decimal array
3904        let schema = Arc::new(Schema::new(vec![
3905            Field::new("a", DataType::Decimal128(10, 0), true),
3906            Field::new("b", DataType::Decimal128(10, 0), true),
3907        ]));
3908        let right_decimal_array = Arc::new(create_decimal_array(
3909            &[
3910                Some(value - 1),
3911                Some(value),
3912                Some(value + 1),
3913                Some(value + 1),
3914            ],
3915            10,
3916            0,
3917        )) as ArrayRef;
3918
3919        apply_logic_op(
3920            &schema,
3921            &decimal_array,
3922            &right_decimal_array,
3923            Operator::Eq,
3924            BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3925        )
3926        .unwrap();
3927
3928        apply_logic_op(
3929            &schema,
3930            &decimal_array,
3931            &right_decimal_array,
3932            Operator::NotEq,
3933            BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3934        )
3935        .unwrap();
3936
3937        apply_logic_op(
3938            &schema,
3939            &decimal_array,
3940            &right_decimal_array,
3941            Operator::Lt,
3942            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3943        )
3944        .unwrap();
3945
3946        apply_logic_op(
3947            &schema,
3948            &decimal_array,
3949            &right_decimal_array,
3950            Operator::LtEq,
3951            BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3952        )
3953        .unwrap();
3954
3955        apply_logic_op(
3956            &schema,
3957            &decimal_array,
3958            &right_decimal_array,
3959            Operator::Gt,
3960            BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3961        )
3962        .unwrap();
3963
3964        apply_logic_op(
3965            &schema,
3966            &decimal_array,
3967            &right_decimal_array,
3968            Operator::GtEq,
3969            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3970        )
3971        .unwrap();
3972
3973        // compare decimal array with other array type
3974        let value: i64 = 123;
3975        let schema = Arc::new(Schema::new(vec![
3976            Field::new("a", DataType::Int64, true),
3977            Field::new("b", DataType::Decimal128(10, 0), true),
3978        ]));
3979
3980        let int64_array = Arc::new(Int64Array::from(vec![
3981            Some(value),
3982            Some(value - 1),
3983            Some(value),
3984            Some(value + 1),
3985        ])) as ArrayRef;
3986
3987        // eq: int64array == decimal array
3988        apply_logic_op(
3989            &schema,
3990            &int64_array,
3991            &decimal_array,
3992            Operator::Eq,
3993            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3994        )
3995        .unwrap();
3996        // neq: int64array != decimal array
3997        apply_logic_op(
3998            &schema,
3999            &int64_array,
4000            &decimal_array,
4001            Operator::NotEq,
4002            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
4003        )
4004        .unwrap();
4005
4006        let schema = Arc::new(Schema::new(vec![
4007            Field::new("a", DataType::Float64, true),
4008            Field::new("b", DataType::Decimal128(10, 2), true),
4009        ]));
4010
4011        let value: i128 = 123;
4012        let decimal_array = Arc::new(create_decimal_array(
4013            &[
4014                Some(value), // 1.23
4015                None,
4016                Some(value - 1), // 1.22
4017                Some(value + 1), // 1.24
4018            ],
4019            10,
4020            2,
4021        )) as ArrayRef;
4022        let float64_array = Arc::new(Float64Array::from(vec![
4023            Some(1.23),
4024            Some(1.22),
4025            Some(1.23),
4026            Some(1.24),
4027        ])) as ArrayRef;
4028        // lt: float64array < decimal array
4029        apply_logic_op(
4030            &schema,
4031            &float64_array,
4032            &decimal_array,
4033            Operator::Lt,
4034            BooleanArray::from(vec![Some(false), None, Some(false), Some(false)]),
4035        )
4036        .unwrap();
4037        // lt_eq: float64array <= decimal array
4038        apply_logic_op(
4039            &schema,
4040            &float64_array,
4041            &decimal_array,
4042            Operator::LtEq,
4043            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
4044        )
4045        .unwrap();
4046        // gt: float64array > decimal array
4047        apply_logic_op(
4048            &schema,
4049            &float64_array,
4050            &decimal_array,
4051            Operator::Gt,
4052            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
4053        )
4054        .unwrap();
4055        apply_logic_op(
4056            &schema,
4057            &float64_array,
4058            &decimal_array,
4059            Operator::GtEq,
4060            BooleanArray::from(vec![Some(true), None, Some(true), Some(true)]),
4061        )
4062        .unwrap();
4063        // is distinct: float64array is distinct decimal array
4064        // TODO: now we do not refactor the `is distinct or is not distinct` rule of coercion.
4065        // traced by https://github.com/apache/datafusion/issues/1590
4066        // the decimal array will be casted to float64array
4067        apply_logic_op(
4068            &schema,
4069            &float64_array,
4070            &decimal_array,
4071            Operator::IsDistinctFrom,
4072            BooleanArray::from(vec![Some(false), Some(true), Some(true), Some(false)]),
4073        )
4074        .unwrap();
4075        // is not distinct
4076        apply_logic_op(
4077            &schema,
4078            &float64_array,
4079            &decimal_array,
4080            Operator::IsNotDistinctFrom,
4081            BooleanArray::from(vec![Some(true), Some(false), Some(false), Some(true)]),
4082        )
4083        .unwrap();
4084
4085        Ok(())
4086    }
4087
4088    fn apply_decimal_arithmetic_op(
4089        schema: &SchemaRef,
4090        left: &ArrayRef,
4091        right: &ArrayRef,
4092        op: Operator,
4093        expected: ArrayRef,
4094    ) -> Result<()> {
4095        let arithmetic_op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
4096        let data: Vec<ArrayRef> = vec![Arc::clone(left), Arc::clone(right)];
4097        let batch = RecordBatch::try_new(Arc::clone(schema), data)?;
4098        let result = arithmetic_op
4099            .evaluate(&batch)?
4100            .into_array(batch.num_rows())
4101            .expect("Failed to convert to array");
4102
4103        assert_eq!(result.as_ref(), expected.as_ref());
4104        Ok(())
4105    }
4106
4107    #[test]
4108    fn arithmetic_decimal_expr_test() -> Result<()> {
4109        let schema = Arc::new(Schema::new(vec![
4110            Field::new("a", DataType::Int32, true),
4111            Field::new("b", DataType::Decimal128(10, 2), true),
4112        ]));
4113        let value: i128 = 123;
4114        let decimal_array = Arc::new(create_decimal_array(
4115            &[
4116                Some(value), // 1.23
4117                None,
4118                Some(value - 1), // 1.22
4119                Some(value + 1), // 1.24
4120            ],
4121            10,
4122            2,
4123        )) as ArrayRef;
4124        let int32_array = Arc::new(Int32Array::from(vec![
4125            Some(123),
4126            Some(122),
4127            Some(123),
4128            Some(124),
4129        ])) as ArrayRef;
4130
4131        // add: Int32array add decimal array
4132        let expect = Arc::new(create_decimal_array(
4133            &[Some(12423), None, Some(12422), Some(12524)],
4134            13,
4135            2,
4136        )) as ArrayRef;
4137        apply_decimal_arithmetic_op(
4138            &schema,
4139            &int32_array,
4140            &decimal_array,
4141            Operator::Plus,
4142            expect,
4143        )
4144        .unwrap();
4145
4146        // subtract: decimal array subtract int32 array
4147        let schema = Arc::new(Schema::new(vec![
4148            Field::new("a", DataType::Decimal128(10, 2), true),
4149            Field::new("b", DataType::Int32, true),
4150        ]));
4151        let expect = Arc::new(create_decimal_array(
4152            &[Some(-12177), None, Some(-12178), Some(-12276)],
4153            13,
4154            2,
4155        )) as ArrayRef;
4156        apply_decimal_arithmetic_op(
4157            &schema,
4158            &decimal_array,
4159            &int32_array,
4160            Operator::Minus,
4161            expect,
4162        )
4163        .unwrap();
4164
4165        // multiply: decimal array multiply int32 array
4166        let expect = Arc::new(create_decimal_array(
4167            &[Some(15129), None, Some(15006), Some(15376)],
4168            21,
4169            2,
4170        )) as ArrayRef;
4171        apply_decimal_arithmetic_op(
4172            &schema,
4173            &decimal_array,
4174            &int32_array,
4175            Operator::Multiply,
4176            expect,
4177        )
4178        .unwrap();
4179
4180        // divide: int32 array divide decimal array
4181        let schema = Arc::new(Schema::new(vec![
4182            Field::new("a", DataType::Int32, true),
4183            Field::new("b", DataType::Decimal128(10, 2), true),
4184        ]));
4185        let expect = Arc::new(create_decimal_array(
4186            &[Some(1000000), None, Some(1008196), Some(1000000)],
4187            16,
4188            4,
4189        )) as ArrayRef;
4190        apply_decimal_arithmetic_op(
4191            &schema,
4192            &int32_array,
4193            &decimal_array,
4194            Operator::Divide,
4195            expect,
4196        )
4197        .unwrap();
4198
4199        // modulus: int32 array modulus decimal array
4200        let schema = Arc::new(Schema::new(vec![
4201            Field::new("a", DataType::Int32, true),
4202            Field::new("b", DataType::Decimal128(10, 2), true),
4203        ]));
4204        let expect = Arc::new(create_decimal_array(
4205            &[Some(000), None, Some(100), Some(000)],
4206            10,
4207            2,
4208        )) as ArrayRef;
4209        apply_decimal_arithmetic_op(
4210            &schema,
4211            &int32_array,
4212            &decimal_array,
4213            Operator::Modulo,
4214            expect,
4215        )
4216        .unwrap();
4217
4218        Ok(())
4219    }
4220
4221    #[test]
4222    fn arithmetic_decimal_float_expr_test() -> Result<()> {
4223        let schema = Arc::new(Schema::new(vec![
4224            Field::new("a", DataType::Float64, true),
4225            Field::new("b", DataType::Decimal128(10, 2), true),
4226        ]));
4227        let value: i128 = 123;
4228        let decimal_array = Arc::new(create_decimal_array(
4229            &[
4230                Some(value), // 1.23
4231                None,
4232                Some(value - 1), // 1.22
4233                Some(value + 1), // 1.24
4234            ],
4235            10,
4236            2,
4237        )) as ArrayRef;
4238        let float64_array = Arc::new(Float64Array::from(vec![
4239            Some(123.0),
4240            Some(122.0),
4241            Some(123.0),
4242            Some(124.0),
4243        ])) as ArrayRef;
4244
4245        // add: float64 array add decimal array
4246        let expect = Arc::new(Float64Array::from(vec![
4247            Some(124.23),
4248            None,
4249            Some(124.22),
4250            Some(125.24),
4251        ])) as ArrayRef;
4252        apply_decimal_arithmetic_op(
4253            &schema,
4254            &float64_array,
4255            &decimal_array,
4256            Operator::Plus,
4257            expect,
4258        )
4259        .unwrap();
4260
4261        // subtract: decimal array subtract float64 array
4262        let schema = Arc::new(Schema::new(vec![
4263            Field::new("a", DataType::Float64, true),
4264            Field::new("b", DataType::Decimal128(10, 2), true),
4265        ]));
4266        let expect = Arc::new(Float64Array::from(vec![
4267            Some(121.77),
4268            None,
4269            Some(121.78),
4270            Some(122.76),
4271        ])) as ArrayRef;
4272        apply_decimal_arithmetic_op(
4273            &schema,
4274            &float64_array,
4275            &decimal_array,
4276            Operator::Minus,
4277            expect,
4278        )
4279        .unwrap();
4280
4281        // multiply: decimal array multiply float64 array
4282        let expect = Arc::new(Float64Array::from(vec![
4283            Some(151.29),
4284            None,
4285            Some(150.06),
4286            Some(153.76),
4287        ])) as ArrayRef;
4288        apply_decimal_arithmetic_op(
4289            &schema,
4290            &float64_array,
4291            &decimal_array,
4292            Operator::Multiply,
4293            expect,
4294        )
4295        .unwrap();
4296
4297        // divide: float64 array divide decimal array
4298        let schema = Arc::new(Schema::new(vec![
4299            Field::new("a", DataType::Float64, true),
4300            Field::new("b", DataType::Decimal128(10, 2), true),
4301        ]));
4302        let expect = Arc::new(Float64Array::from(vec![
4303            Some(100.0),
4304            None,
4305            Some(100.81967213114754),
4306            Some(100.0),
4307        ])) as ArrayRef;
4308        apply_decimal_arithmetic_op(
4309            &schema,
4310            &float64_array,
4311            &decimal_array,
4312            Operator::Divide,
4313            expect,
4314        )
4315        .unwrap();
4316
4317        // modulus: float64 array modulus decimal array
4318        let schema = Arc::new(Schema::new(vec![
4319            Field::new("a", DataType::Float64, true),
4320            Field::new("b", DataType::Decimal128(10, 2), true),
4321        ]));
4322        let expect = Arc::new(Float64Array::from(vec![
4323            Some(1.7763568394002505e-15),
4324            None,
4325            Some(1.0000000000000027),
4326            Some(8.881784197001252e-16),
4327        ])) as ArrayRef;
4328        apply_decimal_arithmetic_op(
4329            &schema,
4330            &float64_array,
4331            &decimal_array,
4332            Operator::Modulo,
4333            expect,
4334        )
4335        .unwrap();
4336
4337        Ok(())
4338    }
4339
4340    #[test]
4341    fn arithmetic_divide_zero() -> Result<()> {
4342        // other data type
4343        let schema = Arc::new(Schema::new(vec![
4344            Field::new("a", DataType::Int32, true),
4345            Field::new("b", DataType::Int32, true),
4346        ]));
4347        let a = Arc::new(Int32Array::from(vec![100]));
4348        let b = Arc::new(Int32Array::from(vec![0]));
4349
4350        let err = apply_arithmetic::<Int32Type>(
4351            schema,
4352            vec![a, b],
4353            Operator::Divide,
4354            Int32Array::from(vec![Some(4), Some(8), Some(16), Some(32), Some(64)]),
4355        )
4356        .unwrap_err();
4357
4358        let _expected = plan_datafusion_err!("Divide by zero");
4359
4360        assert!(matches!(err, ref _expected), "{err}");
4361
4362        // decimal
4363        let schema = Arc::new(Schema::new(vec![
4364            Field::new("a", DataType::Decimal128(25, 3), true),
4365            Field::new("b", DataType::Decimal128(25, 3), true),
4366        ]));
4367        let left_decimal_array = Arc::new(create_decimal_array(&[Some(1234567)], 25, 3));
4368        let right_decimal_array = Arc::new(create_decimal_array(&[Some(0)], 25, 3));
4369
4370        let err = apply_arithmetic::<Decimal128Type>(
4371            schema,
4372            vec![left_decimal_array, right_decimal_array],
4373            Operator::Divide,
4374            create_decimal_array(
4375                &[Some(12345670000000000000000000000000000), None],
4376                38,
4377                29,
4378            ),
4379        )
4380        .unwrap_err();
4381
4382        assert!(matches!(err, ref _expected), "{err}");
4383
4384        Ok(())
4385    }
4386
4387    #[test]
4388    fn bitwise_array_test() -> Result<()> {
4389        let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4390        let right =
4391            Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;
4392        let mut result = bitwise_and_dyn(Arc::clone(&left), Arc::clone(&right))?;
4393        let expected = Int32Array::from(vec![Some(0), None, Some(3)]);
4394        assert_eq!(result.as_ref(), &expected);
4395
4396        result = bitwise_or_dyn(Arc::clone(&left), Arc::clone(&right))?;
4397        let expected = Int32Array::from(vec![Some(13), None, Some(15)]);
4398        assert_eq!(result.as_ref(), &expected);
4399
4400        result = bitwise_xor_dyn(Arc::clone(&left), Arc::clone(&right))?;
4401        let expected = Int32Array::from(vec![Some(13), None, Some(12)]);
4402        assert_eq!(result.as_ref(), &expected);
4403
4404        let left =
4405            Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4406        let right =
4407            Arc::new(UInt32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;
4408        let mut result = bitwise_and_dyn(Arc::clone(&left), Arc::clone(&right))?;
4409        let expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
4410        assert_eq!(result.as_ref(), &expected);
4411
4412        result = bitwise_or_dyn(Arc::clone(&left), Arc::clone(&right))?;
4413        let expected = UInt32Array::from(vec![Some(13), None, Some(15)]);
4414        assert_eq!(result.as_ref(), &expected);
4415
4416        result = bitwise_xor_dyn(Arc::clone(&left), Arc::clone(&right))?;
4417        let expected = UInt32Array::from(vec![Some(13), None, Some(12)]);
4418        assert_eq!(result.as_ref(), &expected);
4419
4420        Ok(())
4421    }
4422
4423    #[test]
4424    fn bitwise_shift_array_test() -> Result<()> {
4425        let input = Arc::new(Int32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
4426        let modules =
4427            Arc::new(Int32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
4428        let mut result =
4429            bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4430
4431        let expected = Int32Array::from(vec![Some(8), None, Some(2560)]);
4432        assert_eq!(result.as_ref(), &expected);
4433
4434        result = bitwise_shift_right_dyn(Arc::clone(&result), Arc::clone(&modules))?;
4435        assert_eq!(result.as_ref(), &input);
4436
4437        let input =
4438            Arc::new(UInt32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
4439        let modules =
4440            Arc::new(UInt32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
4441        let mut result =
4442            bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4443
4444        let expected = UInt32Array::from(vec![Some(8), None, Some(2560)]);
4445        assert_eq!(result.as_ref(), &expected);
4446
4447        result = bitwise_shift_right_dyn(Arc::clone(&result), Arc::clone(&modules))?;
4448        assert_eq!(result.as_ref(), &input);
4449        Ok(())
4450    }
4451
4452    #[test]
4453    fn bitwise_shift_array_overflow_test() -> Result<()> {
4454        let input = Arc::new(Int32Array::from(vec![Some(2)])) as ArrayRef;
4455        let modules = Arc::new(Int32Array::from(vec![Some(100)])) as ArrayRef;
4456        let result = bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4457
4458        let expected = Int32Array::from(vec![Some(32)]);
4459        assert_eq!(result.as_ref(), &expected);
4460
4461        let input = Arc::new(UInt32Array::from(vec![Some(2)])) as ArrayRef;
4462        let modules = Arc::new(UInt32Array::from(vec![Some(100)])) as ArrayRef;
4463        let result = bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4464
4465        let expected = UInt32Array::from(vec![Some(32)]);
4466        assert_eq!(result.as_ref(), &expected);
4467        Ok(())
4468    }
4469
4470    #[test]
4471    fn bitwise_scalar_test() -> Result<()> {
4472        let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4473        let right = ScalarValue::from(3i32);
4474        let mut result = bitwise_and_dyn_scalar(&left, right.clone()).unwrap()?;
4475        let expected = Int32Array::from(vec![Some(0), None, Some(3)]);
4476        assert_eq!(result.as_ref(), &expected);
4477
4478        result = bitwise_or_dyn_scalar(&left, right.clone()).unwrap()?;
4479        let expected = Int32Array::from(vec![Some(15), None, Some(11)]);
4480        assert_eq!(result.as_ref(), &expected);
4481
4482        result = bitwise_xor_dyn_scalar(&left, right).unwrap()?;
4483        let expected = Int32Array::from(vec![Some(15), None, Some(8)]);
4484        assert_eq!(result.as_ref(), &expected);
4485
4486        let left =
4487            Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4488        let right = ScalarValue::from(3u32);
4489        let mut result = bitwise_and_dyn_scalar(&left, right.clone()).unwrap()?;
4490        let expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
4491        assert_eq!(result.as_ref(), &expected);
4492
4493        result = bitwise_or_dyn_scalar(&left, right.clone()).unwrap()?;
4494        let expected = UInt32Array::from(vec![Some(15), None, Some(11)]);
4495        assert_eq!(result.as_ref(), &expected);
4496
4497        result = bitwise_xor_dyn_scalar(&left, right).unwrap()?;
4498        let expected = UInt32Array::from(vec![Some(15), None, Some(8)]);
4499        assert_eq!(result.as_ref(), &expected);
4500        Ok(())
4501    }
4502
4503    #[test]
4504    fn bitwise_shift_scalar_test() -> Result<()> {
4505        let input = Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
4506        let module = ScalarValue::from(10i32);
4507        let mut result =
4508            bitwise_shift_left_dyn_scalar(&input, module.clone()).unwrap()?;
4509
4510        let expected = Int32Array::from(vec![Some(2048), None, Some(4096)]);
4511        assert_eq!(result.as_ref(), &expected);
4512
4513        result = bitwise_shift_right_dyn_scalar(&result, module).unwrap()?;
4514        assert_eq!(result.as_ref(), &input);
4515
4516        let input = Arc::new(UInt32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
4517        let module = ScalarValue::from(10u32);
4518        let mut result =
4519            bitwise_shift_left_dyn_scalar(&input, module.clone()).unwrap()?;
4520
4521        let expected = UInt32Array::from(vec![Some(2048), None, Some(4096)]);
4522        assert_eq!(result.as_ref(), &expected);
4523
4524        result = bitwise_shift_right_dyn_scalar(&result, module).unwrap()?;
4525        assert_eq!(result.as_ref(), &input);
4526        Ok(())
4527    }
4528
4529    #[test]
4530    fn test_display_and_or_combo() {
4531        let expr = BinaryExpr::new(
4532            Arc::new(BinaryExpr::new(
4533                lit(ScalarValue::from(1)),
4534                Operator::And,
4535                lit(ScalarValue::from(2)),
4536            )),
4537            Operator::And,
4538            Arc::new(BinaryExpr::new(
4539                lit(ScalarValue::from(3)),
4540                Operator::And,
4541                lit(ScalarValue::from(4)),
4542            )),
4543        );
4544        assert_eq!(expr.to_string(), "1 AND 2 AND 3 AND 4");
4545
4546        let expr = BinaryExpr::new(
4547            Arc::new(BinaryExpr::new(
4548                lit(ScalarValue::from(1)),
4549                Operator::Or,
4550                lit(ScalarValue::from(2)),
4551            )),
4552            Operator::Or,
4553            Arc::new(BinaryExpr::new(
4554                lit(ScalarValue::from(3)),
4555                Operator::Or,
4556                lit(ScalarValue::from(4)),
4557            )),
4558        );
4559        assert_eq!(expr.to_string(), "1 OR 2 OR 3 OR 4");
4560
4561        let expr = BinaryExpr::new(
4562            Arc::new(BinaryExpr::new(
4563                lit(ScalarValue::from(1)),
4564                Operator::And,
4565                lit(ScalarValue::from(2)),
4566            )),
4567            Operator::Or,
4568            Arc::new(BinaryExpr::new(
4569                lit(ScalarValue::from(3)),
4570                Operator::And,
4571                lit(ScalarValue::from(4)),
4572            )),
4573        );
4574        assert_eq!(expr.to_string(), "1 AND 2 OR 3 AND 4");
4575
4576        let expr = BinaryExpr::new(
4577            Arc::new(BinaryExpr::new(
4578                lit(ScalarValue::from(1)),
4579                Operator::Or,
4580                lit(ScalarValue::from(2)),
4581            )),
4582            Operator::And,
4583            Arc::new(BinaryExpr::new(
4584                lit(ScalarValue::from(3)),
4585                Operator::Or,
4586                lit(ScalarValue::from(4)),
4587            )),
4588        );
4589        assert_eq!(expr.to_string(), "(1 OR 2) AND (3 OR 4)");
4590    }
4591
4592    #[test]
4593    fn test_to_result_type_array() {
4594        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
4595        let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
4596        let dictionary =
4597            Arc::new(DictionaryArray::try_new(keys, values).unwrap()) as ArrayRef;
4598
4599        // Casting Dictionary to Int32
4600        let casted = to_result_type_array(
4601            &Operator::Plus,
4602            Arc::clone(&dictionary),
4603            &DataType::Int32,
4604        )
4605        .unwrap();
4606        assert_eq!(
4607            &casted,
4608            &(Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]))
4609                as ArrayRef)
4610        );
4611
4612        // Array has same datatype as result type, no casting
4613        let casted = to_result_type_array(
4614            &Operator::Plus,
4615            Arc::clone(&dictionary),
4616            dictionary.data_type(),
4617        )
4618        .unwrap();
4619        assert_eq!(&casted, &dictionary);
4620
4621        // Not numerical operator, no casting
4622        let casted = to_result_type_array(
4623            &Operator::Eq,
4624            Arc::clone(&dictionary),
4625            &DataType::Int32,
4626        )
4627        .unwrap();
4628        assert_eq!(&casted, &dictionary);
4629    }
4630
4631    #[test]
4632    fn test_add_with_overflow() -> Result<()> {
4633        // create test data
4634        let l = Arc::new(Int32Array::from(vec![1, i32::MAX]));
4635        let r = Arc::new(Int32Array::from(vec![2, 1]));
4636        let schema = Arc::new(Schema::new(vec![
4637            Field::new("l", DataType::Int32, false),
4638            Field::new("r", DataType::Int32, false),
4639        ]));
4640        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4641
4642        // create expression
4643        let expr = BinaryExpr::new(
4644            Arc::new(Column::new("l", 0)),
4645            Operator::Plus,
4646            Arc::new(Column::new("r", 1)),
4647        )
4648        .with_fail_on_overflow(true);
4649
4650        // evaluate expression
4651        let result = expr.evaluate(&batch);
4652        assert!(result
4653            .err()
4654            .unwrap()
4655            .to_string()
4656            .contains("Overflow happened on: 2147483647 + 1"));
4657        Ok(())
4658    }
4659
4660    #[test]
4661    fn test_subtract_with_overflow() -> Result<()> {
4662        // create test data
4663        let l = Arc::new(Int32Array::from(vec![1, i32::MIN]));
4664        let r = Arc::new(Int32Array::from(vec![2, 1]));
4665        let schema = Arc::new(Schema::new(vec![
4666            Field::new("l", DataType::Int32, false),
4667            Field::new("r", DataType::Int32, false),
4668        ]));
4669        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4670
4671        // create expression
4672        let expr = BinaryExpr::new(
4673            Arc::new(Column::new("l", 0)),
4674            Operator::Minus,
4675            Arc::new(Column::new("r", 1)),
4676        )
4677        .with_fail_on_overflow(true);
4678
4679        // evaluate expression
4680        let result = expr.evaluate(&batch);
4681        assert!(result
4682            .err()
4683            .unwrap()
4684            .to_string()
4685            .contains("Overflow happened on: -2147483648 - 1"));
4686        Ok(())
4687    }
4688
4689    #[test]
4690    fn test_mul_with_overflow() -> Result<()> {
4691        // create test data
4692        let l = Arc::new(Int32Array::from(vec![1, i32::MAX]));
4693        let r = Arc::new(Int32Array::from(vec![2, 2]));
4694        let schema = Arc::new(Schema::new(vec![
4695            Field::new("l", DataType::Int32, false),
4696            Field::new("r", DataType::Int32, false),
4697        ]));
4698        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4699
4700        // create expression
4701        let expr = BinaryExpr::new(
4702            Arc::new(Column::new("l", 0)),
4703            Operator::Multiply,
4704            Arc::new(Column::new("r", 1)),
4705        )
4706        .with_fail_on_overflow(true);
4707
4708        // evaluate expression
4709        let result = expr.evaluate(&batch);
4710        assert!(result
4711            .err()
4712            .unwrap()
4713            .to_string()
4714            .contains("Overflow happened on: 2147483647 * 2"));
4715        Ok(())
4716    }
4717
4718    /// Test helper for SIMILAR TO binary operation
4719    fn apply_similar_to(
4720        schema: &SchemaRef,
4721        va: Vec<&str>,
4722        vb: Vec<&str>,
4723        negated: bool,
4724        case_insensitive: bool,
4725        expected: &BooleanArray,
4726    ) -> Result<()> {
4727        let a = StringArray::from(va);
4728        let b = StringArray::from(vb);
4729        let op = similar_to(
4730            negated,
4731            case_insensitive,
4732            col("a", schema)?,
4733            col("b", schema)?,
4734        )?;
4735        let batch =
4736            RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(a), Arc::new(b)])?;
4737        let result = op
4738            .evaluate(&batch)?
4739            .into_array(batch.num_rows())
4740            .expect("Failed to convert to array");
4741        assert_eq!(result.as_ref(), expected);
4742
4743        Ok(())
4744    }
4745
4746    #[test]
4747    fn test_similar_to() {
4748        let schema = Arc::new(Schema::new(vec![
4749            Field::new("a", DataType::Utf8, false),
4750            Field::new("b", DataType::Utf8, false),
4751        ]));
4752
4753        let expected = [Some(true), Some(false)].iter().collect();
4754        // case-sensitive
4755        apply_similar_to(
4756            &schema,
4757            vec!["hello world", "Hello World"],
4758            vec!["hello.*", "hello.*"],
4759            false,
4760            false,
4761            &expected,
4762        )
4763        .unwrap();
4764        // case-insensitive
4765        apply_similar_to(
4766            &schema,
4767            vec!["hello world", "bye"],
4768            vec!["hello.*", "hello.*"],
4769            false,
4770            true,
4771            &expected,
4772        )
4773        .unwrap();
4774    }
4775
4776    pub fn binary_expr(
4777        left: Arc<dyn PhysicalExpr>,
4778        op: Operator,
4779        right: Arc<dyn PhysicalExpr>,
4780        schema: &Schema,
4781    ) -> Result<BinaryExpr> {
4782        Ok(binary_op(left, op, right, schema)?
4783            .as_any()
4784            .downcast_ref::<BinaryExpr>()
4785            .unwrap()
4786            .clone())
4787    }
4788
4789    /// Test for Uniform-Uniform, Unknown-Uniform, Uniform-Unknown and Unknown-Unknown evaluation.
4790    #[test]
4791    fn test_evaluate_statistics_combination_of_range_holders() -> Result<()> {
4792        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4793        let a = Arc::new(Column::new("a", 0)) as _;
4794        let b = lit(ScalarValue::from(12.0));
4795
4796        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4797        let right_interval = Interval::make(Some(12.0), Some(36.0))?;
4798        let (left_mean, right_mean) = (ScalarValue::from(6.0), ScalarValue::from(24.0));
4799        let (left_med, right_med) = (ScalarValue::from(6.0), ScalarValue::from(24.0));
4800
4801        for children in [
4802            vec![
4803                &Distribution::new_uniform(left_interval.clone())?,
4804                &Distribution::new_uniform(right_interval.clone())?,
4805            ],
4806            vec![
4807                &Distribution::new_generic(
4808                    left_mean.clone(),
4809                    left_med.clone(),
4810                    ScalarValue::Float64(None),
4811                    left_interval.clone(),
4812                )?,
4813                &Distribution::new_uniform(right_interval.clone())?,
4814            ],
4815            vec![
4816                &Distribution::new_uniform(right_interval.clone())?,
4817                &Distribution::new_generic(
4818                    right_mean.clone(),
4819                    right_med.clone(),
4820                    ScalarValue::Float64(None),
4821                    right_interval.clone(),
4822                )?,
4823            ],
4824            vec![
4825                &Distribution::new_generic(
4826                    left_mean.clone(),
4827                    left_med.clone(),
4828                    ScalarValue::Float64(None),
4829                    left_interval.clone(),
4830                )?,
4831                &Distribution::new_generic(
4832                    right_mean.clone(),
4833                    right_med.clone(),
4834                    ScalarValue::Float64(None),
4835                    right_interval.clone(),
4836                )?,
4837            ],
4838        ] {
4839            let ops = vec![
4840                Operator::Plus,
4841                Operator::Minus,
4842                Operator::Multiply,
4843                Operator::Divide,
4844            ];
4845
4846            for op in ops {
4847                let expr = binary_expr(Arc::clone(&a), op, Arc::clone(&b), schema)?;
4848                assert_eq!(
4849                    expr.evaluate_statistics(&children)?,
4850                    new_generic_from_binary_op(&op, children[0], children[1])?
4851                );
4852            }
4853        }
4854        Ok(())
4855    }
4856
4857    #[test]
4858    fn test_evaluate_statistics_bernoulli() -> Result<()> {
4859        let schema = &Schema::new(vec![
4860            Field::new("a", DataType::Int64, false),
4861            Field::new("b", DataType::Int64, false),
4862        ]);
4863        let a = Arc::new(Column::new("a", 0)) as _;
4864        let b = Arc::new(Column::new("b", 1)) as _;
4865        let eq = Arc::new(binary_expr(
4866            Arc::clone(&a),
4867            Operator::Eq,
4868            Arc::clone(&b),
4869            schema,
4870        )?);
4871        let neq = Arc::new(binary_expr(a, Operator::NotEq, b, schema)?);
4872
4873        let left_stat = &Distribution::new_uniform(Interval::make(Some(0), Some(7))?)?;
4874        let right_stat = &Distribution::new_uniform(Interval::make(Some(4), Some(11))?)?;
4875
4876        // Intervals: [0, 7], [4, 11].
4877        // The intersection is [4, 7], so the probability of equality is 4 / 64 = 1 / 16.
4878        assert_eq!(
4879            eq.evaluate_statistics(&[left_stat, right_stat])?,
4880            Distribution::new_bernoulli(ScalarValue::from(1.0 / 16.0))?
4881        );
4882
4883        // The probability of being distinct is 1 - 1 / 16 = 15 / 16.
4884        assert_eq!(
4885            neq.evaluate_statistics(&[left_stat, right_stat])?,
4886            Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))?
4887        );
4888
4889        Ok(())
4890    }
4891
4892    #[test]
4893    fn test_propagate_statistics_combination_of_range_holders_arithmetic() -> Result<()> {
4894        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4895        let a = Arc::new(Column::new("a", 0)) as _;
4896        let b = lit(ScalarValue::from(12.0));
4897
4898        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4899        let right_interval = Interval::make(Some(12.0), Some(36.0))?;
4900
4901        let parent = Distribution::new_uniform(Interval::make(Some(-432.), Some(432.))?)?;
4902        let children = vec![
4903            vec![
4904                Distribution::new_uniform(left_interval.clone())?,
4905                Distribution::new_uniform(right_interval.clone())?,
4906            ],
4907            vec![
4908                Distribution::new_generic(
4909                    ScalarValue::from(6.),
4910                    ScalarValue::from(6.),
4911                    ScalarValue::Float64(None),
4912                    left_interval.clone(),
4913                )?,
4914                Distribution::new_uniform(right_interval.clone())?,
4915            ],
4916            vec![
4917                Distribution::new_uniform(left_interval.clone())?,
4918                Distribution::new_generic(
4919                    ScalarValue::from(12.),
4920                    ScalarValue::from(12.),
4921                    ScalarValue::Float64(None),
4922                    right_interval.clone(),
4923                )?,
4924            ],
4925            vec![
4926                Distribution::new_generic(
4927                    ScalarValue::from(6.),
4928                    ScalarValue::from(6.),
4929                    ScalarValue::Float64(None),
4930                    left_interval.clone(),
4931                )?,
4932                Distribution::new_generic(
4933                    ScalarValue::from(12.),
4934                    ScalarValue::from(12.),
4935                    ScalarValue::Float64(None),
4936                    right_interval.clone(),
4937                )?,
4938            ],
4939        ];
4940
4941        let ops = vec![
4942            Operator::Plus,
4943            Operator::Minus,
4944            Operator::Multiply,
4945            Operator::Divide,
4946        ];
4947
4948        for child_view in children {
4949            let child_refs = child_view.iter().collect::<Vec<_>>();
4950            for op in &ops {
4951                let expr = binary_expr(Arc::clone(&a), *op, Arc::clone(&b), schema)?;
4952                assert_eq!(
4953                    expr.propagate_statistics(&parent, child_refs.as_slice())?,
4954                    Some(child_view.clone())
4955                );
4956            }
4957        }
4958        Ok(())
4959    }
4960
4961    #[test]
4962    fn test_propagate_statistics_combination_of_range_holders_comparison() -> Result<()> {
4963        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4964        let a = Arc::new(Column::new("a", 0)) as _;
4965        let b = lit(ScalarValue::from(12.0));
4966
4967        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4968        let right_interval = Interval::make(Some(6.0), Some(18.0))?;
4969
4970        let one = ScalarValue::from(1.0);
4971        let parent = Distribution::new_bernoulli(one)?;
4972        let children = vec![
4973            vec![
4974                Distribution::new_uniform(left_interval.clone())?,
4975                Distribution::new_uniform(right_interval.clone())?,
4976            ],
4977            vec![
4978                Distribution::new_generic(
4979                    ScalarValue::from(6.),
4980                    ScalarValue::from(6.),
4981                    ScalarValue::Float64(None),
4982                    left_interval.clone(),
4983                )?,
4984                Distribution::new_uniform(right_interval.clone())?,
4985            ],
4986            vec![
4987                Distribution::new_uniform(left_interval.clone())?,
4988                Distribution::new_generic(
4989                    ScalarValue::from(12.),
4990                    ScalarValue::from(12.),
4991                    ScalarValue::Float64(None),
4992                    right_interval.clone(),
4993                )?,
4994            ],
4995            vec![
4996                Distribution::new_generic(
4997                    ScalarValue::from(6.),
4998                    ScalarValue::from(6.),
4999                    ScalarValue::Float64(None),
5000                    left_interval.clone(),
5001                )?,
5002                Distribution::new_generic(
5003                    ScalarValue::from(12.),
5004                    ScalarValue::from(12.),
5005                    ScalarValue::Float64(None),
5006                    right_interval.clone(),
5007                )?,
5008            ],
5009        ];
5010
5011        let ops = vec![
5012            Operator::Eq,
5013            Operator::Gt,
5014            Operator::GtEq,
5015            Operator::Lt,
5016            Operator::LtEq,
5017        ];
5018
5019        for child_view in children {
5020            let child_refs = child_view.iter().collect::<Vec<_>>();
5021            for op in &ops {
5022                let expr = binary_expr(Arc::clone(&a), *op, Arc::clone(&b), schema)?;
5023                assert!(expr
5024                    .propagate_statistics(&parent, child_refs.as_slice())?
5025                    .is_some());
5026            }
5027        }
5028
5029        Ok(())
5030    }
5031
5032    #[test]
5033    fn test_fmt_sql() -> Result<()> {
5034        let schema = Schema::new(vec![
5035            Field::new("a", DataType::Int32, false),
5036            Field::new("b", DataType::Int32, false),
5037        ]);
5038
5039        // Test basic binary expressions
5040        let simple_expr = binary_expr(
5041            col("a", &schema)?,
5042            Operator::Plus,
5043            col("b", &schema)?,
5044            &schema,
5045        )?;
5046        let display_string = simple_expr.to_string();
5047        assert_eq!(display_string, "a@0 + b@1");
5048        let sql_string = fmt_sql(&simple_expr).to_string();
5049        assert_eq!(sql_string, "a + b");
5050
5051        // Test nested expressions with different operator precedence
5052        let nested_expr = binary_expr(
5053            Arc::new(binary_expr(
5054                col("a", &schema)?,
5055                Operator::Plus,
5056                col("b", &schema)?,
5057                &schema,
5058            )?),
5059            Operator::Multiply,
5060            col("b", &schema)?,
5061            &schema,
5062        )?;
5063        let display_string = nested_expr.to_string();
5064        assert_eq!(display_string, "(a@0 + b@1) * b@1");
5065        let sql_string = fmt_sql(&nested_expr).to_string();
5066        assert_eq!(sql_string, "(a + b) * b");
5067
5068        // Test nested expressions with same operator precedence
5069        let nested_same_prec = binary_expr(
5070            Arc::new(binary_expr(
5071                col("a", &schema)?,
5072                Operator::Plus,
5073                col("b", &schema)?,
5074                &schema,
5075            )?),
5076            Operator::Plus,
5077            col("b", &schema)?,
5078            &schema,
5079        )?;
5080        let display_string = nested_same_prec.to_string();
5081        assert_eq!(display_string, "a@0 + b@1 + b@1");
5082        let sql_string = fmt_sql(&nested_same_prec).to_string();
5083        assert_eq!(sql_string, "a + b + b");
5084
5085        // Test with literals
5086        let lit_expr = binary_expr(
5087            col("a", &schema)?,
5088            Operator::Eq,
5089            lit(ScalarValue::Int32(Some(42))),
5090            &schema,
5091        )?;
5092        let display_string = lit_expr.to_string();
5093        assert_eq!(display_string, "a@0 = 42");
5094        let sql_string = fmt_sql(&lit_expr).to_string();
5095        assert_eq!(sql_string, "a = 42");
5096
5097        Ok(())
5098    }
5099
5100    #[test]
5101    fn test_check_short_circuit() {
5102        // Test with non-nullable arrays
5103        let schema = Arc::new(Schema::new(vec![
5104            Field::new("a", DataType::Int32, false),
5105            Field::new("b", DataType::Int32, false),
5106        ]));
5107        let a_array = Int32Array::from(vec![1, 3, 4, 5, 6]);
5108        let b_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
5109        let batch = RecordBatch::try_new(
5110            Arc::clone(&schema),
5111            vec![Arc::new(a_array), Arc::new(b_array)],
5112        )
5113        .unwrap();
5114
5115        // op: AND left: all false
5116        let left_expr = logical2physical(&logical_col("a").eq(expr_lit(2)), &schema);
5117        let left_value = left_expr.evaluate(&batch).unwrap();
5118        assert!(matches!(
5119            check_short_circuit(&left_value, &Operator::And),
5120            ShortCircuitStrategy::ReturnLeft
5121        ));
5122
5123        // op: AND left: not all false
5124        let left_expr = logical2physical(&logical_col("a").eq(expr_lit(3)), &schema);
5125        let left_value = left_expr.evaluate(&batch).unwrap();
5126        let ColumnarValue::Array(array) = &left_value else {
5127            panic!("Expected ColumnarValue::Array");
5128        };
5129        let ShortCircuitStrategy::PreSelection(value) =
5130            check_short_circuit(&left_value, &Operator::And)
5131        else {
5132            panic!("Expected ShortCircuitStrategy::PreSelection");
5133        };
5134        let expected_boolean_arr: Vec<_> =
5135            as_boolean_array(array).unwrap().iter().collect();
5136        let boolean_arr: Vec<_> = value.iter().collect();
5137        assert_eq!(expected_boolean_arr, boolean_arr);
5138
5139        // op: OR left: all true
5140        let left_expr = logical2physical(&logical_col("a").gt(expr_lit(0)), &schema);
5141        let left_value = left_expr.evaluate(&batch).unwrap();
5142        assert!(matches!(
5143            check_short_circuit(&left_value, &Operator::Or),
5144            ShortCircuitStrategy::ReturnLeft
5145        ));
5146
5147        // op: OR left: not all true
5148        let left_expr: Arc<dyn PhysicalExpr> =
5149            logical2physical(&logical_col("a").gt(expr_lit(2)), &schema);
5150        let left_value = left_expr.evaluate(&batch).unwrap();
5151        assert!(matches!(
5152            check_short_circuit(&left_value, &Operator::Or),
5153            ShortCircuitStrategy::None
5154        ));
5155
5156        // Test with nullable arrays and null values
5157        let schema_nullable = Arc::new(Schema::new(vec![
5158            Field::new("c", DataType::Boolean, true),
5159            Field::new("d", DataType::Boolean, true),
5160        ]));
5161
5162        // Create arrays with null values
5163        let c_array = Arc::new(BooleanArray::from(vec![
5164            Some(true),
5165            Some(false),
5166            None,
5167            Some(true),
5168            None,
5169        ])) as ArrayRef;
5170        let d_array = Arc::new(BooleanArray::from(vec![
5171            Some(false),
5172            Some(true),
5173            Some(false),
5174            None,
5175            Some(true),
5176        ])) as ArrayRef;
5177
5178        let batch_nullable = RecordBatch::try_new(
5179            Arc::clone(&schema_nullable),
5180            vec![Arc::clone(&c_array), Arc::clone(&d_array)],
5181        )
5182        .unwrap();
5183
5184        // Case: Mixed values with nulls - shouldn't short-circuit for AND
5185        let mixed_nulls = logical2physical(&logical_col("c"), &schema_nullable);
5186        let mixed_nulls_value = mixed_nulls.evaluate(&batch_nullable).unwrap();
5187        assert!(matches!(
5188            check_short_circuit(&mixed_nulls_value, &Operator::And),
5189            ShortCircuitStrategy::None
5190        ));
5191
5192        // Case: Mixed values with nulls - shouldn't short-circuit for OR
5193        assert!(matches!(
5194            check_short_circuit(&mixed_nulls_value, &Operator::Or),
5195            ShortCircuitStrategy::None
5196        ));
5197
5198        // Test with all nulls
5199        let all_nulls = Arc::new(BooleanArray::from(vec![None, None, None])) as ArrayRef;
5200        let null_batch = RecordBatch::try_new(
5201            Arc::new(Schema::new(vec![Field::new("e", DataType::Boolean, true)])),
5202            vec![all_nulls],
5203        )
5204        .unwrap();
5205
5206        let null_expr = logical2physical(&logical_col("e"), &null_batch.schema());
5207        let null_value = null_expr.evaluate(&null_batch).unwrap();
5208
5209        // All nulls shouldn't short-circuit for AND or OR
5210        assert!(matches!(
5211            check_short_circuit(&null_value, &Operator::And),
5212            ShortCircuitStrategy::None
5213        ));
5214        assert!(matches!(
5215            check_short_circuit(&null_value, &Operator::Or),
5216            ShortCircuitStrategy::None
5217        ));
5218
5219        // Test with scalar values
5220        // Scalar true
5221        let scalar_true = ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)));
5222        assert!(matches!(
5223            check_short_circuit(&scalar_true, &Operator::Or),
5224            ShortCircuitStrategy::ReturnLeft
5225        )); // Should short-circuit OR
5226        assert!(matches!(
5227            check_short_circuit(&scalar_true, &Operator::And),
5228            ShortCircuitStrategy::ReturnRight
5229        )); // Should return the RHS for AND
5230
5231        // Scalar false
5232        let scalar_false = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
5233        assert!(matches!(
5234            check_short_circuit(&scalar_false, &Operator::And),
5235            ShortCircuitStrategy::ReturnLeft
5236        )); // Should short-circuit AND
5237        assert!(matches!(
5238            check_short_circuit(&scalar_false, &Operator::Or),
5239            ShortCircuitStrategy::ReturnRight
5240        )); // Should return the RHS for OR
5241
5242        // Scalar null
5243        let scalar_null = ColumnarValue::Scalar(ScalarValue::Boolean(None));
5244        assert!(matches!(
5245            check_short_circuit(&scalar_null, &Operator::And),
5246            ShortCircuitStrategy::None
5247        ));
5248        assert!(matches!(
5249            check_short_circuit(&scalar_null, &Operator::Or),
5250            ShortCircuitStrategy::None
5251        ));
5252    }
5253
5254    /// Test for [pre_selection_scatter]
5255    /// Since [check_short_circuit] ensures that the left side does not contain null and is neither all_true nor all_false, as well as not being empty,
5256    /// the following tests have been designed:
5257    /// 1. Test sparse left with interleaved true/false
5258    /// 2. Test multiple consecutive true blocks
5259    /// 3. Test multiple consecutive true blocks
5260    /// 4. Test single true at first position
5261    /// 5. Test single true at last position
5262    /// 6. Test nulls in right array
5263    #[test]
5264    fn test_pre_selection_scatter() {
5265        fn create_bool_array(bools: Vec<bool>) -> BooleanArray {
5266            BooleanArray::from(bools.into_iter().map(Some).collect::<Vec<_>>())
5267        }
5268        // Test sparse left with interleaved true/false
5269        {
5270            // Left: [T, F, T, F, T]
5271            // Right: [F, T, F] (values for 3 true positions)
5272            let left = create_bool_array(vec![true, false, true, false, true]);
5273            let right = create_bool_array(vec![false, true, false]);
5274
5275            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5276            let result_arr = result.into_array(left.len()).unwrap();
5277
5278            let expected = create_bool_array(vec![false, false, true, false, false]);
5279            assert_eq!(&expected, result_arr.as_boolean());
5280        }
5281        // Test multiple consecutive true blocks
5282        {
5283            // Left: [F, T, T, F, T, T, T]
5284            // Right: [T, F, F, T, F]
5285            let left =
5286                create_bool_array(vec![false, true, true, false, true, true, true]);
5287            let right = create_bool_array(vec![true, false, false, true, false]);
5288
5289            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5290            let result_arr = result.into_array(left.len()).unwrap();
5291
5292            let expected =
5293                create_bool_array(vec![false, true, false, false, false, true, false]);
5294            assert_eq!(&expected, result_arr.as_boolean());
5295        }
5296        // Test single true at first position
5297        {
5298            // Left: [T, F, F]
5299            // Right: [F]
5300            let left = create_bool_array(vec![true, false, false]);
5301            let right = create_bool_array(vec![false]);
5302
5303            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5304            let result_arr = result.into_array(left.len()).unwrap();
5305
5306            let expected = create_bool_array(vec![false, false, false]);
5307            assert_eq!(&expected, result_arr.as_boolean());
5308        }
5309        // Test single true at last position
5310        {
5311            // Left: [F, F, T]
5312            // Right: [F]
5313            let left = create_bool_array(vec![false, false, true]);
5314            let right = create_bool_array(vec![false]);
5315
5316            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5317            let result_arr = result.into_array(left.len()).unwrap();
5318
5319            let expected = create_bool_array(vec![false, false, false]);
5320            assert_eq!(&expected, result_arr.as_boolean());
5321        }
5322        // Test nulls in right array
5323        {
5324            // Left: [F, T, F, T]
5325            // Right: [None, Some(false)] (with null at first position)
5326            let left = create_bool_array(vec![false, true, false, true]);
5327            let right = BooleanArray::from(vec![None, Some(false)]);
5328
5329            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5330            let result_arr = result.into_array(left.len()).unwrap();
5331
5332            let expected = BooleanArray::from(vec![
5333                Some(false),
5334                None, // null from right
5335                Some(false),
5336                Some(false),
5337            ]);
5338            assert_eq!(&expected, result_arr.as_boolean());
5339        }
5340    }
5341
5342    #[test]
5343    fn test_and_true_preselection_returns_lhs() {
5344        let schema =
5345            Arc::new(Schema::new(vec![Field::new("c", DataType::Boolean, false)]));
5346        let c_array = Arc::new(BooleanArray::from(vec![false, true, false, false, false]))
5347            as ArrayRef;
5348        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::clone(&c_array)])
5349            .unwrap();
5350
5351        let expr = logical2physical(&logical_col("c").and(expr_lit(true)), &schema);
5352
5353        let result = expr.evaluate(&batch).unwrap();
5354        let ColumnarValue::Array(result_arr) = result else {
5355            panic!("Expected ColumnarValue::Array");
5356        };
5357
5358        let expected: Vec<_> = c_array.as_boolean().iter().collect();
5359        let actual: Vec<_> = result_arr.as_boolean().iter().collect();
5360        assert_eq!(
5361            expected, actual,
5362            "AND with TRUE must equal LHS even with PreSelection"
5363        );
5364    }
5365
5366    #[test]
5367    fn test_evaluate_bounds_int32() {
5368        let schema = Schema::new(vec![
5369            Field::new("a", DataType::Int32, false),
5370            Field::new("b", DataType::Int32, false),
5371        ]);
5372
5373        let a = Arc::new(Column::new("a", 0)) as _;
5374        let b = Arc::new(Column::new("b", 1)) as _;
5375
5376        // Test addition bounds
5377        let add_expr =
5378            binary_expr(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema).unwrap();
5379        let add_bounds = add_expr
5380            .evaluate_bounds(&[
5381                &Interval::make(Some(1), Some(10)).unwrap(),
5382                &Interval::make(Some(5), Some(15)).unwrap(),
5383            ])
5384            .unwrap();
5385        assert_eq!(add_bounds, Interval::make(Some(6), Some(25)).unwrap());
5386
5387        // Test subtraction bounds
5388        let sub_expr =
5389            binary_expr(Arc::clone(&a), Operator::Minus, Arc::clone(&b), &schema)
5390                .unwrap();
5391        let sub_bounds = sub_expr
5392            .evaluate_bounds(&[
5393                &Interval::make(Some(1), Some(10)).unwrap(),
5394                &Interval::make(Some(5), Some(15)).unwrap(),
5395            ])
5396            .unwrap();
5397        assert_eq!(sub_bounds, Interval::make(Some(-14), Some(5)).unwrap());
5398
5399        // Test multiplication bounds
5400        let mul_expr =
5401            binary_expr(Arc::clone(&a), Operator::Multiply, Arc::clone(&b), &schema)
5402                .unwrap();
5403        let mul_bounds = mul_expr
5404            .evaluate_bounds(&[
5405                &Interval::make(Some(1), Some(10)).unwrap(),
5406                &Interval::make(Some(5), Some(15)).unwrap(),
5407            ])
5408            .unwrap();
5409        assert_eq!(mul_bounds, Interval::make(Some(5), Some(150)).unwrap());
5410
5411        // Test division bounds
5412        let div_expr =
5413            binary_expr(Arc::clone(&a), Operator::Divide, Arc::clone(&b), &schema)
5414                .unwrap();
5415        let div_bounds = div_expr
5416            .evaluate_bounds(&[
5417                &Interval::make(Some(10), Some(20)).unwrap(),
5418                &Interval::make(Some(2), Some(5)).unwrap(),
5419            ])
5420            .unwrap();
5421        assert_eq!(div_bounds, Interval::make(Some(2), Some(10)).unwrap());
5422    }
5423
5424    #[test]
5425    fn test_evaluate_bounds_bool() {
5426        let schema = Schema::new(vec![
5427            Field::new("a", DataType::Boolean, false),
5428            Field::new("b", DataType::Boolean, false),
5429        ]);
5430
5431        let a = Arc::new(Column::new("a", 0)) as _;
5432        let b = Arc::new(Column::new("b", 1)) as _;
5433
5434        // Test OR bounds
5435        let or_expr =
5436            binary_expr(Arc::clone(&a), Operator::Or, Arc::clone(&b), &schema).unwrap();
5437        let or_bounds = or_expr
5438            .evaluate_bounds(&[
5439                &Interval::make(Some(true), Some(true)).unwrap(),
5440                &Interval::make(Some(false), Some(false)).unwrap(),
5441            ])
5442            .unwrap();
5443        assert_eq!(or_bounds, Interval::make(Some(true), Some(true)).unwrap());
5444
5445        // Test AND bounds
5446        let and_expr =
5447            binary_expr(Arc::clone(&a), Operator::And, Arc::clone(&b), &schema).unwrap();
5448        let and_bounds = and_expr
5449            .evaluate_bounds(&[
5450                &Interval::make(Some(true), Some(true)).unwrap(),
5451                &Interval::make(Some(false), Some(false)).unwrap(),
5452            ])
5453            .unwrap();
5454        assert_eq!(
5455            and_bounds,
5456            Interval::make(Some(false), Some(false)).unwrap()
5457        );
5458    }
5459
5460    #[test]
5461    fn test_evaluate_nested_type() {
5462        let batch_schema = Arc::new(Schema::new(vec![
5463            Field::new(
5464                "a",
5465                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
5466                true,
5467            ),
5468            Field::new(
5469                "b",
5470                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
5471                true,
5472            ),
5473        ]));
5474
5475        let mut list_builder_a = ListBuilder::new(Int32Builder::new());
5476
5477        list_builder_a.append_value([Some(1)]);
5478        list_builder_a.append_value([Some(2)]);
5479        list_builder_a.append_value([]);
5480        list_builder_a.append_value([None]);
5481
5482        let list_array_a: ArrayRef = Arc::new(list_builder_a.finish());
5483
5484        let mut list_builder_b = ListBuilder::new(Int32Builder::new());
5485
5486        list_builder_b.append_value([Some(1)]);
5487        list_builder_b.append_value([Some(2)]);
5488        list_builder_b.append_value([]);
5489        list_builder_b.append_value([None]);
5490
5491        let list_array_b: ArrayRef = Arc::new(list_builder_b.finish());
5492
5493        let batch =
5494            RecordBatch::try_new(batch_schema, vec![list_array_a, list_array_b]).unwrap();
5495
5496        let schema = Arc::new(Schema::new(vec![
5497            Field::new(
5498                "a",
5499                DataType::List(Arc::new(Field::new("foo", DataType::Int32, true))),
5500                true,
5501            ),
5502            Field::new(
5503                "b",
5504                DataType::List(Arc::new(Field::new("bar", DataType::Int32, true))),
5505                true,
5506            ),
5507        ]));
5508
5509        let a = Arc::new(Column::new("a", 0)) as _;
5510        let b = Arc::new(Column::new("b", 1)) as _;
5511
5512        let eq_expr =
5513            binary_expr(Arc::clone(&a), Operator::Eq, Arc::clone(&b), &schema).unwrap();
5514
5515        let eq_result = eq_expr.evaluate(&batch).unwrap();
5516        let expected =
5517            BooleanArray::from_iter(vec![Some(true), Some(true), Some(true), Some(true)]);
5518        assert_eq!(eq_result.into_array(4).unwrap().as_boolean(), &expected);
5519    }
5520}