datafusion_physical_expr/expressions/
binary.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod kernels;
19
20use std::hash::Hash;
21use std::{any::Any, sync::Arc};
22
23use crate::expressions::binary::kernels::concat_elements_utf8view;
24use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
25use crate::PhysicalExpr;
26
27use arrow::array::*;
28use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
29use arrow::compute::kernels::cmp::*;
30use arrow::compute::kernels::comparison::{regexp_is_match, regexp_is_match_scalar};
31use arrow::compute::kernels::concat_elements::concat_elements_utf8;
32use arrow::compute::{cast, ilike, like, nilike, nlike};
33use arrow::datatypes::*;
34use arrow::error::ArrowError;
35use datafusion_common::cast::as_boolean_array;
36use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
37use datafusion_expr::binary::BinaryTypeCoercer;
38use datafusion_expr::interval_arithmetic::{apply_operator, Interval};
39use datafusion_expr::sort_properties::ExprProperties;
40use datafusion_expr::statistics::Distribution::{Bernoulli, Gaussian};
41use datafusion_expr::statistics::{
42    combine_bernoullis, combine_gaussians, create_bernoulli_from_comparison,
43    new_generic_from_binary_op, Distribution,
44};
45use datafusion_expr::{ColumnarValue, Operator};
46use datafusion_physical_expr_common::datum::{apply, apply_cmp, apply_cmp_for_nested};
47
48use kernels::{
49    bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
50    bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
51    bitwise_shift_right_dyn_scalar, bitwise_xor_dyn, bitwise_xor_dyn_scalar,
52};
53
54/// Binary expression
55#[derive(Debug, Clone, Eq)]
56pub struct BinaryExpr {
57    left: Arc<dyn PhysicalExpr>,
58    op: Operator,
59    right: Arc<dyn PhysicalExpr>,
60    /// Specifies whether an error is returned on overflow or not
61    fail_on_overflow: bool,
62}
63
64// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808
65impl PartialEq for BinaryExpr {
66    fn eq(&self, other: &Self) -> bool {
67        self.left.eq(&other.left)
68            && self.op.eq(&other.op)
69            && self.right.eq(&other.right)
70            && self.fail_on_overflow.eq(&other.fail_on_overflow)
71    }
72}
73impl Hash for BinaryExpr {
74    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
75        self.left.hash(state);
76        self.op.hash(state);
77        self.right.hash(state);
78        self.fail_on_overflow.hash(state);
79    }
80}
81
82impl BinaryExpr {
83    /// Create new binary expression
84    pub fn new(
85        left: Arc<dyn PhysicalExpr>,
86        op: Operator,
87        right: Arc<dyn PhysicalExpr>,
88    ) -> Self {
89        Self {
90            left,
91            op,
92            right,
93            fail_on_overflow: false,
94        }
95    }
96
97    /// Create new binary expression with explicit fail_on_overflow value
98    pub fn with_fail_on_overflow(self, fail_on_overflow: bool) -> Self {
99        Self {
100            left: self.left,
101            op: self.op,
102            right: self.right,
103            fail_on_overflow,
104        }
105    }
106
107    /// Get the left side of the binary expression
108    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
109        &self.left
110    }
111
112    /// Get the right side of the binary expression
113    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
114        &self.right
115    }
116
117    /// Get the operator for this binary expression
118    pub fn op(&self) -> &Operator {
119        &self.op
120    }
121}
122
123impl std::fmt::Display for BinaryExpr {
124    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
125        // Put parentheses around child binary expressions so that we can see the difference
126        // between `(a OR b) AND c` and `a OR (b AND c)`. We only insert parentheses when needed,
127        // based on operator precedence. For example, `(a AND b) OR c` and `a AND b OR c` are
128        // equivalent and the parentheses are not necessary.
129
130        fn write_child(
131            f: &mut std::fmt::Formatter,
132            expr: &dyn PhysicalExpr,
133            precedence: u8,
134        ) -> std::fmt::Result {
135            if let Some(child) = expr.as_any().downcast_ref::<BinaryExpr>() {
136                let p = child.op.precedence();
137                if p == 0 || p < precedence {
138                    write!(f, "({child})")?;
139                } else {
140                    write!(f, "{child}")?;
141                }
142            } else {
143                write!(f, "{expr}")?;
144            }
145
146            Ok(())
147        }
148
149        let precedence = self.op.precedence();
150        write_child(f, self.left.as_ref(), precedence)?;
151        write!(f, " {} ", self.op)?;
152        write_child(f, self.right.as_ref(), precedence)
153    }
154}
155
156/// Invoke a boolean kernel on a pair of arrays
157#[inline]
158fn boolean_op(
159    left: &dyn Array,
160    right: &dyn Array,
161    op: impl FnOnce(&BooleanArray, &BooleanArray) -> Result<BooleanArray, ArrowError>,
162) -> Result<Arc<(dyn Array + 'static)>, ArrowError> {
163    let ll = as_boolean_array(left).expect("boolean_op failed to downcast left array");
164    let rr = as_boolean_array(right).expect("boolean_op failed to downcast right array");
165    op(ll, rr).map(|t| Arc::new(t) as _)
166}
167
168macro_rules! binary_string_array_flag_op {
169    ($LEFT:expr, $RIGHT:expr, $OP:ident, $NOT:expr, $FLAG:expr) => {{
170        match $LEFT.data_type() {
171            DataType::Utf8 => {
172                compute_utf8_flag_op!($LEFT, $RIGHT, $OP, StringArray, $NOT, $FLAG)
173            },
174            DataType::Utf8View => {
175                compute_utf8view_flag_op!($LEFT, $RIGHT, $OP, StringViewArray, $NOT, $FLAG)
176            }
177            DataType::LargeUtf8 => {
178                compute_utf8_flag_op!($LEFT, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG)
179            },
180            other => internal_err!(
181                "Data type {:?} not supported for binary_string_array_flag_op operation '{}' on string array",
182                other, stringify!($OP)
183            ),
184        }
185    }};
186}
187
188/// Invoke a compute kernel on a pair of binary data arrays with flags
189macro_rules! compute_utf8_flag_op {
190    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
191        let ll = $LEFT
192            .as_any()
193            .downcast_ref::<$ARRAYTYPE>()
194            .expect("compute_utf8_flag_op failed to downcast array");
195        let rr = $RIGHT
196            .as_any()
197            .downcast_ref::<$ARRAYTYPE>()
198            .expect("compute_utf8_flag_op failed to downcast array");
199
200        let flag = if $FLAG {
201            Some($ARRAYTYPE::from(vec!["i"; ll.len()]))
202        } else {
203            None
204        };
205        let mut array = $OP(ll, rr, flag.as_ref())?;
206        if $NOT {
207            array = not(&array).unwrap();
208        }
209        Ok(Arc::new(array))
210    }};
211}
212
213/// Invoke a compute kernel on a pair of binary data arrays with flags
214macro_rules! compute_utf8view_flag_op {
215    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
216        let ll = $LEFT
217            .as_any()
218            .downcast_ref::<$ARRAYTYPE>()
219            .expect("compute_utf8view_flag_op failed to downcast array");
220        let rr = $RIGHT
221            .as_any()
222            .downcast_ref::<$ARRAYTYPE>()
223            .expect("compute_utf8view_flag_op failed to downcast array");
224
225        let flag = if $FLAG {
226            Some($ARRAYTYPE::from(vec!["i"; ll.len()]))
227        } else {
228            None
229        };
230        let mut array = $OP(ll, rr, flag.as_ref())?;
231        if $NOT {
232            array = not(&array).unwrap();
233        }
234        Ok(Arc::new(array))
235    }};
236}
237
238macro_rules! binary_string_array_flag_op_scalar {
239    ($LEFT:ident, $RIGHT:expr, $OP:ident, $NOT:expr, $FLAG:expr) => {{
240        // This macro is slightly different from binary_string_array_flag_op because, when comparing with a scalar value,
241        // the query can be optimized in such a way that operands will be dicts, so we need to support it here
242        let result: Result<Arc<dyn Array>> = match $LEFT.data_type() {
243            DataType::Utf8 => {
244                compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $NOT, $FLAG)
245            },
246            DataType::Utf8View => {
247                compute_utf8view_flag_op_scalar!($LEFT, $RIGHT, $OP, StringViewArray, $NOT, $FLAG)
248            }
249            DataType::LargeUtf8 => {
250                compute_utf8_flag_op_scalar!($LEFT, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG)
251            },
252            DataType::Dictionary(_, _) => {
253                let values = $LEFT.as_any_dictionary().values();
254
255                match values.data_type() {
256                    DataType::Utf8 => compute_utf8_flag_op_scalar!(values, $RIGHT, $OP, StringArray, $NOT, $FLAG),
257                    DataType::Utf8View => compute_utf8view_flag_op_scalar!(values, $RIGHT, $OP, StringViewArray, $NOT, $FLAG),
258                    DataType::LargeUtf8 => compute_utf8_flag_op_scalar!(values, $RIGHT, $OP, LargeStringArray, $NOT, $FLAG),
259                    other => internal_err!(
260                        "Data type {:?} not supported as a dictionary value type for binary_string_array_flag_op_scalar operation '{}' on string array",
261                        other, stringify!($OP)
262                    ),
263                }.map(
264                    // downcast_dictionary_array duplicates code per possible key type, so we aim to do all prep work before
265                    |evaluated_values| downcast_dictionary_array! {
266                        $LEFT => {
267                            let unpacked_dict = evaluated_values.take_iter($LEFT.keys().iter().map(|opt| opt.map(|v| v as _))).collect::<BooleanArray>();
268                            Arc::new(unpacked_dict) as _
269                        },
270                        _ => unreachable!(),
271                    }
272                )
273            },
274            other => internal_err!(
275                "Data type {:?} not supported for binary_string_array_flag_op_scalar operation '{}' on string array",
276                other, stringify!($OP)
277            ),
278        };
279        Some(result)
280    }};
281}
282
283/// Invoke a compute kernel on a data array and a scalar value with flag
284macro_rules! compute_utf8_flag_op_scalar {
285    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
286        let ll = $LEFT
287            .as_any()
288            .downcast_ref::<$ARRAYTYPE>()
289            .expect("compute_utf8_flag_op_scalar failed to downcast array");
290
291        let string_value = match $RIGHT.try_as_str() {
292            Some(Some(string_value)) => string_value,
293            // null literal or non string
294            _ => return internal_err!(
295                        "compute_utf8_flag_op_scalar failed to cast literal value {} for operation '{}'",
296                        $RIGHT, stringify!($OP)
297                    )
298        };
299
300        let flag = $FLAG.then_some("i");
301        let mut array =
302            paste::expr! {[<$OP _scalar>]}(ll, &string_value, flag)?;
303        if $NOT {
304            array = not(&array).unwrap();
305        }
306
307        Ok(Arc::new(array))
308    }};
309}
310
311/// Invoke a compute kernel on a data array and a scalar value with flag
312macro_rules! compute_utf8view_flag_op_scalar {
313    ($LEFT:expr, $RIGHT:expr, $OP:ident, $ARRAYTYPE:ident, $NOT:expr, $FLAG:expr) => {{
314        let ll = $LEFT
315            .as_any()
316            .downcast_ref::<$ARRAYTYPE>()
317            .expect("compute_utf8view_flag_op_scalar failed to downcast array");
318
319        let string_value = match $RIGHT.try_as_str() {
320            Some(Some(string_value)) => string_value,
321            // null literal or non string
322            _ => return internal_err!(
323                        "compute_utf8view_flag_op_scalar failed to cast literal value {} for operation '{}'",
324                        $RIGHT, stringify!($OP)
325                    )
326        };
327
328        let flag = $FLAG.then_some("i");
329        let mut array =
330            paste::expr! {[<$OP _scalar>]}(ll, &string_value, flag)?;
331        if $NOT {
332            array = not(&array).unwrap();
333        }
334
335        Ok(Arc::new(array))
336    }};
337}
338
339impl PhysicalExpr for BinaryExpr {
340    /// Return a reference to Any that can be used for downcasting
341    fn as_any(&self) -> &dyn Any {
342        self
343    }
344
345    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
346        BinaryTypeCoercer::new(
347            &self.left.data_type(input_schema)?,
348            &self.op,
349            &self.right.data_type(input_schema)?,
350        )
351        .get_result_type()
352    }
353
354    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
355        Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
356    }
357
358    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
359        use arrow::compute::kernels::numeric::*;
360
361        let lhs = self.left.evaluate(batch)?;
362
363        // Optimize for short-circuiting `Operator::And` or `Operator::Or` operations and return early.
364        if check_short_circuit(&lhs, &self.op) {
365            return Ok(lhs);
366        }
367
368        let rhs = self.right.evaluate(batch)?;
369        let left_data_type = lhs.data_type();
370        let right_data_type = rhs.data_type();
371
372        let schema = batch.schema();
373        let input_schema = schema.as_ref();
374
375        if left_data_type.is_nested() {
376            if right_data_type != left_data_type {
377                return internal_err!("type mismatch");
378            }
379            return apply_cmp_for_nested(self.op, &lhs, &rhs);
380        }
381
382        match self.op {
383            Operator::Plus if self.fail_on_overflow => return apply(&lhs, &rhs, add),
384            Operator::Plus => return apply(&lhs, &rhs, add_wrapping),
385            Operator::Minus if self.fail_on_overflow => return apply(&lhs, &rhs, sub),
386            Operator::Minus => return apply(&lhs, &rhs, sub_wrapping),
387            Operator::Multiply if self.fail_on_overflow => return apply(&lhs, &rhs, mul),
388            Operator::Multiply => return apply(&lhs, &rhs, mul_wrapping),
389            Operator::Divide => return apply(&lhs, &rhs, div),
390            Operator::Modulo => return apply(&lhs, &rhs, rem),
391            Operator::Eq => return apply_cmp(&lhs, &rhs, eq),
392            Operator::NotEq => return apply_cmp(&lhs, &rhs, neq),
393            Operator::Lt => return apply_cmp(&lhs, &rhs, lt),
394            Operator::Gt => return apply_cmp(&lhs, &rhs, gt),
395            Operator::LtEq => return apply_cmp(&lhs, &rhs, lt_eq),
396            Operator::GtEq => return apply_cmp(&lhs, &rhs, gt_eq),
397            Operator::IsDistinctFrom => return apply_cmp(&lhs, &rhs, distinct),
398            Operator::IsNotDistinctFrom => return apply_cmp(&lhs, &rhs, not_distinct),
399            Operator::LikeMatch => return apply_cmp(&lhs, &rhs, like),
400            Operator::ILikeMatch => return apply_cmp(&lhs, &rhs, ilike),
401            Operator::NotLikeMatch => return apply_cmp(&lhs, &rhs, nlike),
402            Operator::NotILikeMatch => return apply_cmp(&lhs, &rhs, nilike),
403            _ => {}
404        }
405
406        let result_type = self.data_type(input_schema)?;
407
408        // Attempt to use special kernels if one input is scalar and the other is an array
409        let scalar_result = match (&lhs, &rhs) {
410            (ColumnarValue::Array(array), ColumnarValue::Scalar(scalar)) => {
411                // if left is array and right is literal(not NULL) - use scalar operations
412                if scalar.is_null() {
413                    None
414                } else {
415                    self.evaluate_array_scalar(array, scalar.clone())?.map(|r| {
416                        r.and_then(|a| to_result_type_array(&self.op, a, &result_type))
417                    })
418                }
419            }
420            (_, _) => None, // default to array implementation
421        };
422
423        if let Some(result) = scalar_result {
424            return result.map(ColumnarValue::Array);
425        }
426
427        // if both arrays or both literals - extract arrays and continue execution
428        let (left, right) = (
429            lhs.into_array(batch.num_rows())?,
430            rhs.into_array(batch.num_rows())?,
431        );
432        self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
433            .map(ColumnarValue::Array)
434    }
435
436    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
437        vec![&self.left, &self.right]
438    }
439
440    fn with_new_children(
441        self: Arc<Self>,
442        children: Vec<Arc<dyn PhysicalExpr>>,
443    ) -> Result<Arc<dyn PhysicalExpr>> {
444        Ok(Arc::new(
445            BinaryExpr::new(Arc::clone(&children[0]), self.op, Arc::clone(&children[1]))
446                .with_fail_on_overflow(self.fail_on_overflow),
447        ))
448    }
449
450    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
451        // Get children intervals:
452        let left_interval = children[0];
453        let right_interval = children[1];
454        // Calculate current node's interval:
455        apply_operator(&self.op, left_interval, right_interval)
456    }
457
458    fn propagate_constraints(
459        &self,
460        interval: &Interval,
461        children: &[&Interval],
462    ) -> Result<Option<Vec<Interval>>> {
463        // Get children intervals.
464        let left_interval = children[0];
465        let right_interval = children[1];
466
467        if self.op.eq(&Operator::And) {
468            if interval.eq(&Interval::CERTAINLY_TRUE) {
469                // A certainly true logical conjunction can only derive from possibly
470                // true operands. Otherwise, we prove infeasibility.
471                Ok((!left_interval.eq(&Interval::CERTAINLY_FALSE)
472                    && !right_interval.eq(&Interval::CERTAINLY_FALSE))
473                .then(|| vec![Interval::CERTAINLY_TRUE, Interval::CERTAINLY_TRUE]))
474            } else if interval.eq(&Interval::CERTAINLY_FALSE) {
475                // If the logical conjunction is certainly false, one of the
476                // operands must be false. However, it's not always possible to
477                // determine which operand is false, leading to different scenarios.
478
479                // If one operand is certainly true and the other one is uncertain,
480                // then the latter must be certainly false.
481                if left_interval.eq(&Interval::CERTAINLY_TRUE)
482                    && right_interval.eq(&Interval::UNCERTAIN)
483                {
484                    Ok(Some(vec![
485                        Interval::CERTAINLY_TRUE,
486                        Interval::CERTAINLY_FALSE,
487                    ]))
488                } else if right_interval.eq(&Interval::CERTAINLY_TRUE)
489                    && left_interval.eq(&Interval::UNCERTAIN)
490                {
491                    Ok(Some(vec![
492                        Interval::CERTAINLY_FALSE,
493                        Interval::CERTAINLY_TRUE,
494                    ]))
495                }
496                // If both children are uncertain, or if one is certainly false,
497                // we cannot conclusively refine their intervals. In this case,
498                // propagation does not result in any interval changes.
499                else {
500                    Ok(Some(vec![]))
501                }
502            } else {
503                // An uncertain logical conjunction result can not shrink the
504                // end-points of its children.
505                Ok(Some(vec![]))
506            }
507        } else if self.op.eq(&Operator::Or) {
508            if interval.eq(&Interval::CERTAINLY_FALSE) {
509                // A certainly false logical conjunction can only derive from certainly
510                // false operands. Otherwise, we prove infeasibility.
511                Ok((!left_interval.eq(&Interval::CERTAINLY_TRUE)
512                    && !right_interval.eq(&Interval::CERTAINLY_TRUE))
513                .then(|| vec![Interval::CERTAINLY_FALSE, Interval::CERTAINLY_FALSE]))
514            } else if interval.eq(&Interval::CERTAINLY_TRUE) {
515                // If the logical disjunction is certainly true, one of the
516                // operands must be true. However, it's not always possible to
517                // determine which operand is true, leading to different scenarios.
518
519                // If one operand is certainly false and the other one is uncertain,
520                // then the latter must be certainly true.
521                if left_interval.eq(&Interval::CERTAINLY_FALSE)
522                    && right_interval.eq(&Interval::UNCERTAIN)
523                {
524                    Ok(Some(vec![
525                        Interval::CERTAINLY_FALSE,
526                        Interval::CERTAINLY_TRUE,
527                    ]))
528                } else if right_interval.eq(&Interval::CERTAINLY_FALSE)
529                    && left_interval.eq(&Interval::UNCERTAIN)
530                {
531                    Ok(Some(vec![
532                        Interval::CERTAINLY_TRUE,
533                        Interval::CERTAINLY_FALSE,
534                    ]))
535                }
536                // If both children are uncertain, or if one is certainly true,
537                // we cannot conclusively refine their intervals. In this case,
538                // propagation does not result in any interval changes.
539                else {
540                    Ok(Some(vec![]))
541                }
542            } else {
543                // An uncertain logical disjunction result can not shrink the
544                // end-points of its children.
545                Ok(Some(vec![]))
546            }
547        } else if self.op.supports_propagation() {
548            Ok(
549                propagate_comparison(&self.op, interval, left_interval, right_interval)?
550                    .map(|(left, right)| vec![left, right]),
551            )
552        } else {
553            Ok(
554                propagate_arithmetic(&self.op, interval, left_interval, right_interval)?
555                    .map(|(left, right)| vec![left, right]),
556            )
557        }
558    }
559
560    fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
561        let (left, right) = (children[0], children[1]);
562
563        if self.op.is_numerical_operators() {
564            // We might be able to construct the output statistics more accurately,
565            // without falling back to an unknown distribution, if we are dealing
566            // with Gaussian distributions and numerical operations.
567            if let (Gaussian(left), Gaussian(right)) = (left, right) {
568                if let Some(result) = combine_gaussians(&self.op, left, right)? {
569                    return Ok(Gaussian(result));
570                }
571            }
572        } else if self.op.is_logic_operator() {
573            // If we are dealing with logical operators, we expect (and can only
574            // operate on) Bernoulli distributions.
575            return if let (Bernoulli(left), Bernoulli(right)) = (left, right) {
576                combine_bernoullis(&self.op, left, right).map(Bernoulli)
577            } else {
578                internal_err!(
579                    "Logical operators are only compatible with `Bernoulli` distributions"
580                )
581            };
582        } else if self.op.supports_propagation() {
583            // If we are handling comparison operators, we expect (and can only
584            // operate on) numeric distributions.
585            return create_bernoulli_from_comparison(&self.op, left, right);
586        }
587        // Fall back to an unknown distribution with only summary statistics:
588        new_generic_from_binary_op(&self.op, left, right)
589    }
590
591    /// For each operator, [`BinaryExpr`] has distinct rules.
592    /// TODO: There may be rules specific to some data types and expression ranges.
593    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
594        let (l_order, l_range) = (children[0].sort_properties, &children[0].range);
595        let (r_order, r_range) = (children[1].sort_properties, &children[1].range);
596        match self.op() {
597            Operator::Plus => Ok(ExprProperties {
598                sort_properties: l_order.add(&r_order),
599                range: l_range.add(r_range)?,
600                preserves_lex_ordering: false,
601            }),
602            Operator::Minus => Ok(ExprProperties {
603                sort_properties: l_order.sub(&r_order),
604                range: l_range.sub(r_range)?,
605                preserves_lex_ordering: false,
606            }),
607            Operator::Gt => Ok(ExprProperties {
608                sort_properties: l_order.gt_or_gteq(&r_order),
609                range: l_range.gt(r_range)?,
610                preserves_lex_ordering: false,
611            }),
612            Operator::GtEq => Ok(ExprProperties {
613                sort_properties: l_order.gt_or_gteq(&r_order),
614                range: l_range.gt_eq(r_range)?,
615                preserves_lex_ordering: false,
616            }),
617            Operator::Lt => Ok(ExprProperties {
618                sort_properties: r_order.gt_or_gteq(&l_order),
619                range: l_range.lt(r_range)?,
620                preserves_lex_ordering: false,
621            }),
622            Operator::LtEq => Ok(ExprProperties {
623                sort_properties: r_order.gt_or_gteq(&l_order),
624                range: l_range.lt_eq(r_range)?,
625                preserves_lex_ordering: false,
626            }),
627            Operator::And => Ok(ExprProperties {
628                sort_properties: r_order.and_or(&l_order),
629                range: l_range.and(r_range)?,
630                preserves_lex_ordering: false,
631            }),
632            Operator::Or => Ok(ExprProperties {
633                sort_properties: r_order.and_or(&l_order),
634                range: l_range.or(r_range)?,
635                preserves_lex_ordering: false,
636            }),
637            _ => Ok(ExprProperties::new_unknown()),
638        }
639    }
640
641    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
642        fn write_child(
643            f: &mut std::fmt::Formatter,
644            expr: &dyn PhysicalExpr,
645            precedence: u8,
646        ) -> std::fmt::Result {
647            if let Some(child) = expr.as_any().downcast_ref::<BinaryExpr>() {
648                let p = child.op.precedence();
649                if p == 0 || p < precedence {
650                    write!(f, "(")?;
651                    child.fmt_sql(f)?;
652                    write!(f, ")")
653                } else {
654                    child.fmt_sql(f)
655                }
656            } else {
657                expr.fmt_sql(f)
658            }
659        }
660
661        let precedence = self.op.precedence();
662        write_child(f, self.left.as_ref(), precedence)?;
663        write!(f, " {} ", self.op)?;
664        write_child(f, self.right.as_ref(), precedence)
665    }
666}
667
668/// Casts dictionary array to result type for binary numerical operators. Such operators
669/// between array and scalar produce a dictionary array other than primitive array of the
670/// same operators between array and array. This leads to inconsistent result types causing
671/// errors in the following query execution. For such operators between array and scalar,
672/// we cast the dictionary array to primitive array.
673fn to_result_type_array(
674    op: &Operator,
675    array: ArrayRef,
676    result_type: &DataType,
677) -> Result<ArrayRef> {
678    if array.data_type() == result_type {
679        Ok(array)
680    } else if op.is_numerical_operators() {
681        match array.data_type() {
682            DataType::Dictionary(_, value_type) => {
683                if value_type.as_ref() == result_type {
684                    Ok(cast(&array, result_type)?)
685                } else {
686                    internal_err!(
687                            "Incompatible Dictionary value type {value_type:?} with result type {result_type:?} of Binary operator {op:?}"
688                        )
689                }
690            }
691            _ => Ok(array),
692        }
693    } else {
694        Ok(array)
695    }
696}
697
698impl BinaryExpr {
699    /// Evaluate the expression of the left input is an array and
700    /// right is literal - use scalar operations
701    fn evaluate_array_scalar(
702        &self,
703        array: &dyn Array,
704        scalar: ScalarValue,
705    ) -> Result<Option<Result<ArrayRef>>> {
706        use Operator::*;
707        let scalar_result = match &self.op {
708            RegexMatch => binary_string_array_flag_op_scalar!(
709                array,
710                scalar,
711                regexp_is_match,
712                false,
713                false
714            ),
715            RegexIMatch => binary_string_array_flag_op_scalar!(
716                array,
717                scalar,
718                regexp_is_match,
719                false,
720                true
721            ),
722            RegexNotMatch => binary_string_array_flag_op_scalar!(
723                array,
724                scalar,
725                regexp_is_match,
726                true,
727                false
728            ),
729            RegexNotIMatch => binary_string_array_flag_op_scalar!(
730                array,
731                scalar,
732                regexp_is_match,
733                true,
734                true
735            ),
736            BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
737            BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
738            BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
739            BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
740            BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
741            // if scalar operation is not supported - fallback to array implementation
742            _ => None,
743        };
744
745        Ok(scalar_result)
746    }
747
748    fn evaluate_with_resolved_args(
749        &self,
750        left: Arc<dyn Array>,
751        left_data_type: &DataType,
752        right: Arc<dyn Array>,
753        right_data_type: &DataType,
754    ) -> Result<ArrayRef> {
755        use Operator::*;
756        match &self.op {
757            IsDistinctFrom | IsNotDistinctFrom | Lt | LtEq | Gt | GtEq | Eq | NotEq
758            | Plus | Minus | Multiply | Divide | Modulo | LikeMatch | ILikeMatch
759            | NotLikeMatch | NotILikeMatch => unreachable!(),
760            And => {
761                if left_data_type == &DataType::Boolean {
762                    Ok(boolean_op(&left, &right, and_kleene)?)
763                } else {
764                    internal_err!(
765                        "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
766                        self.op,
767                        left.data_type(),
768                        right.data_type()
769                    )
770                }
771            }
772            Or => {
773                if left_data_type == &DataType::Boolean {
774                    Ok(boolean_op(&left, &right, or_kleene)?)
775                } else {
776                    internal_err!(
777                        "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
778                        self.op,
779                        left_data_type,
780                        right_data_type
781                    )
782                }
783            }
784            RegexMatch => {
785                binary_string_array_flag_op!(left, right, regexp_is_match, false, false)
786            }
787            RegexIMatch => {
788                binary_string_array_flag_op!(left, right, regexp_is_match, false, true)
789            }
790            RegexNotMatch => {
791                binary_string_array_flag_op!(left, right, regexp_is_match, true, false)
792            }
793            RegexNotIMatch => {
794                binary_string_array_flag_op!(left, right, regexp_is_match, true, true)
795            }
796            BitwiseAnd => bitwise_and_dyn(left, right),
797            BitwiseOr => bitwise_or_dyn(left, right),
798            BitwiseXor => bitwise_xor_dyn(left, right),
799            BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
800            BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
801            StringConcat => concat_elements(left, right),
802            AtArrow | ArrowAt | Arrow | LongArrow | HashArrow | HashLongArrow | AtAt
803            | HashMinus | AtQuestion | Question | QuestionAnd | QuestionPipe
804            | IntegerDivide => {
805                not_impl_err!(
806                    "Binary operator '{:?}' is not supported in the physical expr",
807                    self.op
808                )
809            }
810        }
811    }
812}
813
814/// Checks if a logical operator (`AND`/`OR`) can short-circuit evaluation based on the left-hand side (lhs) result.
815///
816/// Short-circuiting occurs when evaluating the right-hand side (rhs) becomes unnecessary:
817/// - For `AND`: if ALL values in `lhs` are `false`, the expression must be `false` regardless of rhs.
818/// - For `OR`: if ALL values in `lhs` are `true`, the expression must be `true` regardless of rhs.
819///
820/// Returns `true` if short-circuiting is possible, `false` otherwise.
821///
822/// # Arguments
823/// * `arg` - The left-hand side (lhs) columnar value (array or scalar)
824/// * `op` - The logical operator (`AND` or `OR`)
825///
826/// # Implementation Notes
827/// 1. Only works with Boolean-typed arguments (other types automatically return `false`)
828/// 2. Handles both scalar values and array values
829/// 3. For arrays, uses optimized `true_count()`/`false_count()` methods from arrow-rs.
830///    `bool_or`/`bool_and` maybe a better choice too,for detailed discussion,see:[link](https://github.com/apache/datafusion/pull/15462#discussion_r2020558418)
831fn check_short_circuit(arg: &ColumnarValue, op: &Operator) -> bool {
832    let data_type = arg.data_type();
833    match (data_type, op) {
834        (DataType::Boolean, Operator::And) => {
835            match arg {
836                ColumnarValue::Array(array) => {
837                    if let Ok(array) = as_boolean_array(&array) {
838                        return array.false_count() == array.len();
839                    }
840                }
841                ColumnarValue::Scalar(scalar) => {
842                    if let ScalarValue::Boolean(Some(value)) = scalar {
843                        return !value;
844                    }
845                }
846            }
847            false
848        }
849        (DataType::Boolean, Operator::Or) => {
850            match arg {
851                ColumnarValue::Array(array) => {
852                    if let Ok(array) = as_boolean_array(&array) {
853                        return array.true_count() == array.len();
854                    }
855                }
856                ColumnarValue::Scalar(scalar) => {
857                    if let ScalarValue::Boolean(Some(value)) = scalar {
858                        return *value;
859                    }
860                }
861            }
862            false
863        }
864        _ => false,
865    }
866}
867
868fn concat_elements(left: Arc<dyn Array>, right: Arc<dyn Array>) -> Result<ArrayRef> {
869    Ok(match left.data_type() {
870        DataType::Utf8 => Arc::new(concat_elements_utf8(
871            left.as_string::<i32>(),
872            right.as_string::<i32>(),
873        )?),
874        DataType::LargeUtf8 => Arc::new(concat_elements_utf8(
875            left.as_string::<i64>(),
876            right.as_string::<i64>(),
877        )?),
878        DataType::Utf8View => Arc::new(concat_elements_utf8view(
879            left.as_string_view(),
880            right.as_string_view(),
881        )?),
882        other => {
883            return internal_err!(
884                "Data type {other:?} not supported for binary operation 'concat_elements' on string arrays"
885            );
886        }
887    })
888}
889
890/// Create a binary expression whose arguments are correctly coerced.
891/// This function errors if it is not possible to coerce the arguments
892/// to computational types supported by the operator.
893pub fn binary(
894    lhs: Arc<dyn PhysicalExpr>,
895    op: Operator,
896    rhs: Arc<dyn PhysicalExpr>,
897    _input_schema: &Schema,
898) -> Result<Arc<dyn PhysicalExpr>> {
899    Ok(Arc::new(BinaryExpr::new(lhs, op, rhs)))
900}
901
902/// Create a similar to expression
903pub fn similar_to(
904    negated: bool,
905    case_insensitive: bool,
906    expr: Arc<dyn PhysicalExpr>,
907    pattern: Arc<dyn PhysicalExpr>,
908) -> Result<Arc<dyn PhysicalExpr>> {
909    let binary_op = match (negated, case_insensitive) {
910        (false, false) => Operator::RegexMatch,
911        (false, true) => Operator::RegexIMatch,
912        (true, false) => Operator::RegexNotMatch,
913        (true, true) => Operator::RegexNotIMatch,
914    };
915    Ok(Arc::new(BinaryExpr::new(expr, binary_op, pattern)))
916}
917
918#[cfg(test)]
919mod tests {
920    use super::*;
921    use crate::expressions::{col, lit, try_cast, Column, Literal};
922
923    use datafusion_common::plan_datafusion_err;
924    use datafusion_physical_expr_common::physical_expr::fmt_sql;
925
926    /// Performs a binary operation, applying any type coercion necessary
927    fn binary_op(
928        left: Arc<dyn PhysicalExpr>,
929        op: Operator,
930        right: Arc<dyn PhysicalExpr>,
931        schema: &Schema,
932    ) -> Result<Arc<dyn PhysicalExpr>> {
933        let left_type = left.data_type(schema)?;
934        let right_type = right.data_type(schema)?;
935        let (lhs, rhs) =
936            BinaryTypeCoercer::new(&left_type, &op, &right_type).get_input_types()?;
937
938        let left_expr = try_cast(left, schema, lhs)?;
939        let right_expr = try_cast(right, schema, rhs)?;
940        binary(left_expr, op, right_expr, schema)
941    }
942
943    #[test]
944    fn binary_comparison() -> Result<()> {
945        let schema = Schema::new(vec![
946            Field::new("a", DataType::Int32, false),
947            Field::new("b", DataType::Int32, false),
948        ]);
949        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
950        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
951
952        // expression: "a < b"
953        let lt = binary(
954            col("a", &schema)?,
955            Operator::Lt,
956            col("b", &schema)?,
957            &schema,
958        )?;
959        let batch =
960            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
961
962        let result = lt
963            .evaluate(&batch)?
964            .into_array(batch.num_rows())
965            .expect("Failed to convert to array");
966        assert_eq!(result.len(), 5);
967
968        let expected = [false, false, true, true, true];
969        let result =
970            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
971        for (i, &expected_item) in expected.iter().enumerate().take(5) {
972            assert_eq!(result.value(i), expected_item);
973        }
974
975        Ok(())
976    }
977
978    #[test]
979    fn binary_nested() -> Result<()> {
980        let schema = Schema::new(vec![
981            Field::new("a", DataType::Int32, false),
982            Field::new("b", DataType::Int32, false),
983        ]);
984        let a = Int32Array::from(vec![2, 4, 6, 8, 10]);
985        let b = Int32Array::from(vec![2, 5, 4, 8, 8]);
986
987        // expression: "a < b OR a == b"
988        let expr = binary(
989            binary(
990                col("a", &schema)?,
991                Operator::Lt,
992                col("b", &schema)?,
993                &schema,
994            )?,
995            Operator::Or,
996            binary(
997                col("a", &schema)?,
998                Operator::Eq,
999                col("b", &schema)?,
1000                &schema,
1001            )?,
1002            &schema,
1003        )?;
1004        let batch =
1005            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
1006
1007        assert_eq!("a@0 < b@1 OR a@0 = b@1", format!("{expr}"));
1008
1009        let result = expr
1010            .evaluate(&batch)?
1011            .into_array(batch.num_rows())
1012            .expect("Failed to convert to array");
1013        assert_eq!(result.len(), 5);
1014
1015        let expected = [true, true, false, true, false];
1016        let result =
1017            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
1018        for (i, &expected_item) in expected.iter().enumerate().take(5) {
1019            assert_eq!(result.value(i), expected_item);
1020        }
1021
1022        Ok(())
1023    }
1024
1025    // runs an end-to-end test of physical type coercion:
1026    // 1. construct a record batch with two columns of type A and B
1027    //  (*_ARRAY is the Rust Arrow array type, and *_TYPE is the DataType of the elements)
1028    // 2. construct a physical expression of A OP B
1029    // 3. evaluate the expression
1030    // 4. verify that the resulting expression is of type C
1031    // 5. verify that the results of evaluation are $VEC
1032    macro_rules! test_coercion {
1033        ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $B_ARRAY:ident, $B_TYPE:expr, $B_VEC:expr, $OP:expr, $C_ARRAY:ident, $C_TYPE:expr, $VEC:expr,) => {{
1034            let schema = Schema::new(vec![
1035                Field::new("a", $A_TYPE, false),
1036                Field::new("b", $B_TYPE, false),
1037            ]);
1038            let a = $A_ARRAY::from($A_VEC);
1039            let b = $B_ARRAY::from($B_VEC);
1040            let (lhs, rhs) = BinaryTypeCoercer::new(&$A_TYPE, &$OP, &$B_TYPE).get_input_types()?;
1041
1042            let left = try_cast(col("a", &schema)?, &schema, lhs)?;
1043            let right = try_cast(col("b", &schema)?, &schema, rhs)?;
1044
1045            // verify that we can construct the expression
1046            let expression = binary(left, $OP, right, &schema)?;
1047            let batch = RecordBatch::try_new(
1048                Arc::new(schema.clone()),
1049                vec![Arc::new(a), Arc::new(b)],
1050            )?;
1051
1052            // verify that the expression's type is correct
1053            assert_eq!(expression.data_type(&schema)?, $C_TYPE);
1054
1055            // compute
1056            let result = expression.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array");
1057
1058            // verify that the array's data_type is correct
1059            assert_eq!(*result.data_type(), $C_TYPE);
1060
1061            // verify that the data itself is downcastable
1062            let result = result
1063                .as_any()
1064                .downcast_ref::<$C_ARRAY>()
1065                .expect("failed to downcast");
1066            // verify that the result itself is correct
1067            for (i, x) in $VEC.iter().enumerate() {
1068                let v = result.value(i);
1069                assert_eq!(
1070                    v,
1071                    *x,
1072                    "Unexpected output at position {i}:\n\nActual:\n{v}\n\nExpected:\n{x}"
1073                );
1074            }
1075        }};
1076    }
1077
1078    #[test]
1079    fn test_type_coercion() -> Result<()> {
1080        test_coercion!(
1081            Int32Array,
1082            DataType::Int32,
1083            vec![1i32, 2i32],
1084            UInt32Array,
1085            DataType::UInt32,
1086            vec![1u32, 2u32],
1087            Operator::Plus,
1088            Int64Array,
1089            DataType::Int64,
1090            [2i64, 4i64],
1091        );
1092        test_coercion!(
1093            Int32Array,
1094            DataType::Int32,
1095            vec![1i32],
1096            UInt16Array,
1097            DataType::UInt16,
1098            vec![1u16],
1099            Operator::Plus,
1100            Int32Array,
1101            DataType::Int32,
1102            [2i32],
1103        );
1104        test_coercion!(
1105            Float32Array,
1106            DataType::Float32,
1107            vec![1f32],
1108            UInt16Array,
1109            DataType::UInt16,
1110            vec![1u16],
1111            Operator::Plus,
1112            Float32Array,
1113            DataType::Float32,
1114            [2f32],
1115        );
1116        test_coercion!(
1117            Float32Array,
1118            DataType::Float32,
1119            vec![2f32],
1120            UInt16Array,
1121            DataType::UInt16,
1122            vec![1u16],
1123            Operator::Multiply,
1124            Float32Array,
1125            DataType::Float32,
1126            [2f32],
1127        );
1128        test_coercion!(
1129            StringArray,
1130            DataType::Utf8,
1131            vec!["1994-12-13", "1995-01-26"],
1132            Date32Array,
1133            DataType::Date32,
1134            vec![9112, 9156],
1135            Operator::Eq,
1136            BooleanArray,
1137            DataType::Boolean,
1138            [true, true],
1139        );
1140        test_coercion!(
1141            StringArray,
1142            DataType::Utf8,
1143            vec!["1994-12-13", "1995-01-26"],
1144            Date32Array,
1145            DataType::Date32,
1146            vec![9113, 9154],
1147            Operator::Lt,
1148            BooleanArray,
1149            DataType::Boolean,
1150            [true, false],
1151        );
1152        test_coercion!(
1153            StringArray,
1154            DataType::Utf8,
1155            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
1156            Date64Array,
1157            DataType::Date64,
1158            vec![787322096000, 791083425000],
1159            Operator::Eq,
1160            BooleanArray,
1161            DataType::Boolean,
1162            [true, true],
1163        );
1164        test_coercion!(
1165            StringArray,
1166            DataType::Utf8,
1167            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
1168            Date64Array,
1169            DataType::Date64,
1170            vec![787322096001, 791083424999],
1171            Operator::Lt,
1172            BooleanArray,
1173            DataType::Boolean,
1174            [true, false],
1175        );
1176        test_coercion!(
1177            StringViewArray,
1178            DataType::Utf8View,
1179            vec!["abc"; 5],
1180            StringArray,
1181            DataType::Utf8,
1182            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1183            Operator::RegexMatch,
1184            BooleanArray,
1185            DataType::Boolean,
1186            [true, false, true, false, false],
1187        );
1188        test_coercion!(
1189            StringViewArray,
1190            DataType::Utf8View,
1191            vec!["abc"; 5],
1192            StringArray,
1193            DataType::Utf8,
1194            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1195            Operator::RegexIMatch,
1196            BooleanArray,
1197            DataType::Boolean,
1198            [true, true, true, true, false],
1199        );
1200        test_coercion!(
1201            StringArray,
1202            DataType::Utf8,
1203            vec!["abc"; 5],
1204            StringViewArray,
1205            DataType::Utf8View,
1206            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1207            Operator::RegexNotMatch,
1208            BooleanArray,
1209            DataType::Boolean,
1210            [false, true, false, true, true],
1211        );
1212        test_coercion!(
1213            StringArray,
1214            DataType::Utf8,
1215            vec!["abc"; 5],
1216            StringViewArray,
1217            DataType::Utf8View,
1218            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1219            Operator::RegexNotIMatch,
1220            BooleanArray,
1221            DataType::Boolean,
1222            [false, false, false, false, true],
1223        );
1224        test_coercion!(
1225            StringArray,
1226            DataType::Utf8,
1227            vec!["abc"; 5],
1228            StringArray,
1229            DataType::Utf8,
1230            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1231            Operator::RegexMatch,
1232            BooleanArray,
1233            DataType::Boolean,
1234            [true, false, true, false, false],
1235        );
1236        test_coercion!(
1237            StringArray,
1238            DataType::Utf8,
1239            vec!["abc"; 5],
1240            StringArray,
1241            DataType::Utf8,
1242            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1243            Operator::RegexIMatch,
1244            BooleanArray,
1245            DataType::Boolean,
1246            [true, true, true, true, false],
1247        );
1248        test_coercion!(
1249            StringArray,
1250            DataType::Utf8,
1251            vec!["abc"; 5],
1252            StringArray,
1253            DataType::Utf8,
1254            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1255            Operator::RegexNotMatch,
1256            BooleanArray,
1257            DataType::Boolean,
1258            [false, true, false, true, true],
1259        );
1260        test_coercion!(
1261            StringArray,
1262            DataType::Utf8,
1263            vec!["abc"; 5],
1264            StringArray,
1265            DataType::Utf8,
1266            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1267            Operator::RegexNotIMatch,
1268            BooleanArray,
1269            DataType::Boolean,
1270            [false, false, false, false, true],
1271        );
1272        test_coercion!(
1273            LargeStringArray,
1274            DataType::LargeUtf8,
1275            vec!["abc"; 5],
1276            LargeStringArray,
1277            DataType::LargeUtf8,
1278            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1279            Operator::RegexMatch,
1280            BooleanArray,
1281            DataType::Boolean,
1282            [true, false, true, false, false],
1283        );
1284        test_coercion!(
1285            LargeStringArray,
1286            DataType::LargeUtf8,
1287            vec!["abc"; 5],
1288            LargeStringArray,
1289            DataType::LargeUtf8,
1290            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1291            Operator::RegexIMatch,
1292            BooleanArray,
1293            DataType::Boolean,
1294            [true, true, true, true, false],
1295        );
1296        test_coercion!(
1297            LargeStringArray,
1298            DataType::LargeUtf8,
1299            vec!["abc"; 5],
1300            LargeStringArray,
1301            DataType::LargeUtf8,
1302            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1303            Operator::RegexNotMatch,
1304            BooleanArray,
1305            DataType::Boolean,
1306            [false, true, false, true, true],
1307        );
1308        test_coercion!(
1309            LargeStringArray,
1310            DataType::LargeUtf8,
1311            vec!["abc"; 5],
1312            LargeStringArray,
1313            DataType::LargeUtf8,
1314            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1315            Operator::RegexNotIMatch,
1316            BooleanArray,
1317            DataType::Boolean,
1318            [false, false, false, false, true],
1319        );
1320        test_coercion!(
1321            StringArray,
1322            DataType::Utf8,
1323            vec!["abc"; 5],
1324            StringArray,
1325            DataType::Utf8,
1326            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1327            Operator::LikeMatch,
1328            BooleanArray,
1329            DataType::Boolean,
1330            [true, false, false, true, false],
1331        );
1332        test_coercion!(
1333            StringArray,
1334            DataType::Utf8,
1335            vec!["abc"; 5],
1336            StringArray,
1337            DataType::Utf8,
1338            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1339            Operator::ILikeMatch,
1340            BooleanArray,
1341            DataType::Boolean,
1342            [true, true, false, true, true],
1343        );
1344        test_coercion!(
1345            StringArray,
1346            DataType::Utf8,
1347            vec!["abc"; 5],
1348            StringArray,
1349            DataType::Utf8,
1350            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1351            Operator::NotLikeMatch,
1352            BooleanArray,
1353            DataType::Boolean,
1354            [false, true, true, false, true],
1355        );
1356        test_coercion!(
1357            StringArray,
1358            DataType::Utf8,
1359            vec!["abc"; 5],
1360            StringArray,
1361            DataType::Utf8,
1362            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1363            Operator::NotILikeMatch,
1364            BooleanArray,
1365            DataType::Boolean,
1366            [false, false, true, false, false],
1367        );
1368        test_coercion!(
1369            LargeStringArray,
1370            DataType::LargeUtf8,
1371            vec!["abc"; 5],
1372            LargeStringArray,
1373            DataType::LargeUtf8,
1374            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1375            Operator::LikeMatch,
1376            BooleanArray,
1377            DataType::Boolean,
1378            [true, false, false, true, false],
1379        );
1380        test_coercion!(
1381            LargeStringArray,
1382            DataType::LargeUtf8,
1383            vec!["abc"; 5],
1384            LargeStringArray,
1385            DataType::LargeUtf8,
1386            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1387            Operator::ILikeMatch,
1388            BooleanArray,
1389            DataType::Boolean,
1390            [true, true, false, true, true],
1391        );
1392        test_coercion!(
1393            LargeStringArray,
1394            DataType::LargeUtf8,
1395            vec!["abc"; 5],
1396            LargeStringArray,
1397            DataType::LargeUtf8,
1398            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1399            Operator::NotLikeMatch,
1400            BooleanArray,
1401            DataType::Boolean,
1402            [false, true, true, false, true],
1403        );
1404        test_coercion!(
1405            LargeStringArray,
1406            DataType::LargeUtf8,
1407            vec!["abc"; 5],
1408            LargeStringArray,
1409            DataType::LargeUtf8,
1410            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1411            Operator::NotILikeMatch,
1412            BooleanArray,
1413            DataType::Boolean,
1414            [false, false, true, false, false],
1415        );
1416        test_coercion!(
1417            Int16Array,
1418            DataType::Int16,
1419            vec![1i16, 2i16, 3i16],
1420            Int64Array,
1421            DataType::Int64,
1422            vec![10i64, 4i64, 5i64],
1423            Operator::BitwiseAnd,
1424            Int64Array,
1425            DataType::Int64,
1426            [0i64, 0i64, 1i64],
1427        );
1428        test_coercion!(
1429            UInt16Array,
1430            DataType::UInt16,
1431            vec![1u16, 2u16, 3u16],
1432            UInt64Array,
1433            DataType::UInt64,
1434            vec![10u64, 4u64, 5u64],
1435            Operator::BitwiseAnd,
1436            UInt64Array,
1437            DataType::UInt64,
1438            [0u64, 0u64, 1u64],
1439        );
1440        test_coercion!(
1441            Int16Array,
1442            DataType::Int16,
1443            vec![3i16, 2i16, 3i16],
1444            Int64Array,
1445            DataType::Int64,
1446            vec![10i64, 6i64, 5i64],
1447            Operator::BitwiseOr,
1448            Int64Array,
1449            DataType::Int64,
1450            [11i64, 6i64, 7i64],
1451        );
1452        test_coercion!(
1453            UInt16Array,
1454            DataType::UInt16,
1455            vec![1u16, 2u16, 3u16],
1456            UInt64Array,
1457            DataType::UInt64,
1458            vec![10u64, 4u64, 5u64],
1459            Operator::BitwiseOr,
1460            UInt64Array,
1461            DataType::UInt64,
1462            [11u64, 6u64, 7u64],
1463        );
1464        test_coercion!(
1465            Int16Array,
1466            DataType::Int16,
1467            vec![3i16, 2i16, 3i16],
1468            Int64Array,
1469            DataType::Int64,
1470            vec![10i64, 6i64, 5i64],
1471            Operator::BitwiseXor,
1472            Int64Array,
1473            DataType::Int64,
1474            [9i64, 4i64, 6i64],
1475        );
1476        test_coercion!(
1477            UInt16Array,
1478            DataType::UInt16,
1479            vec![3u16, 2u16, 3u16],
1480            UInt64Array,
1481            DataType::UInt64,
1482            vec![10u64, 6u64, 5u64],
1483            Operator::BitwiseXor,
1484            UInt64Array,
1485            DataType::UInt64,
1486            [9u64, 4u64, 6u64],
1487        );
1488        test_coercion!(
1489            Int16Array,
1490            DataType::Int16,
1491            vec![4i16, 27i16, 35i16],
1492            Int64Array,
1493            DataType::Int64,
1494            vec![2i64, 3i64, 4i64],
1495            Operator::BitwiseShiftRight,
1496            Int64Array,
1497            DataType::Int64,
1498            [1i64, 3i64, 2i64],
1499        );
1500        test_coercion!(
1501            UInt16Array,
1502            DataType::UInt16,
1503            vec![4u16, 27u16, 35u16],
1504            UInt64Array,
1505            DataType::UInt64,
1506            vec![2u64, 3u64, 4u64],
1507            Operator::BitwiseShiftRight,
1508            UInt64Array,
1509            DataType::UInt64,
1510            [1u64, 3u64, 2u64],
1511        );
1512        test_coercion!(
1513            Int16Array,
1514            DataType::Int16,
1515            vec![2i16, 3i16, 4i16],
1516            Int64Array,
1517            DataType::Int64,
1518            vec![4i64, 12i64, 7i64],
1519            Operator::BitwiseShiftLeft,
1520            Int64Array,
1521            DataType::Int64,
1522            [32i64, 12288i64, 512i64],
1523        );
1524        test_coercion!(
1525            UInt16Array,
1526            DataType::UInt16,
1527            vec![2u16, 3u16, 4u16],
1528            UInt64Array,
1529            DataType::UInt64,
1530            vec![4u64, 12u64, 7u64],
1531            Operator::BitwiseShiftLeft,
1532            UInt64Array,
1533            DataType::UInt64,
1534            [32u64, 12288u64, 512u64],
1535        );
1536        Ok(())
1537    }
1538
1539    // Note it would be nice to use the same test_coercion macro as
1540    // above, but sadly the type of the values of the dictionary are
1541    // not encoded in the rust type of the DictionaryArray. Thus there
1542    // is no way at the time of this writing to create a dictionary
1543    // array using the `From` trait
1544    #[test]
1545    fn test_dictionary_type_to_array_coercion() -> Result<()> {
1546        // Test string  a string dictionary
1547        let dict_type =
1548            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1549        let string_type = DataType::Utf8;
1550
1551        // build dictionary
1552        let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
1553
1554        dict_builder.append("one")?;
1555        dict_builder.append_null();
1556        dict_builder.append("three")?;
1557        dict_builder.append("four")?;
1558        let dict_array = Arc::new(dict_builder.finish()) as ArrayRef;
1559
1560        let str_array = Arc::new(StringArray::from(vec![
1561            Some("not one"),
1562            Some("two"),
1563            None,
1564            Some("four"),
1565        ])) as ArrayRef;
1566
1567        let schema = Arc::new(Schema::new(vec![
1568            Field::new("a", dict_type.clone(), true),
1569            Field::new("b", string_type.clone(), true),
1570        ]));
1571
1572        // Test 1: a = b
1573        let result = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
1574        apply_logic_op(&schema, &dict_array, &str_array, Operator::Eq, result)?;
1575
1576        // Test 2: now test the other direction
1577        // b = a
1578        let schema = Arc::new(Schema::new(vec![
1579            Field::new("a", string_type, true),
1580            Field::new("b", dict_type, true),
1581        ]));
1582        let result = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
1583        apply_logic_op(&schema, &str_array, &dict_array, Operator::Eq, result)?;
1584
1585        Ok(())
1586    }
1587
1588    #[test]
1589    fn plus_op() -> Result<()> {
1590        let schema = Schema::new(vec![
1591            Field::new("a", DataType::Int32, false),
1592            Field::new("b", DataType::Int32, false),
1593        ]);
1594        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1595        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1596
1597        apply_arithmetic::<Int32Type>(
1598            Arc::new(schema),
1599            vec![Arc::new(a), Arc::new(b)],
1600            Operator::Plus,
1601            Int32Array::from(vec![2, 4, 7, 12, 21]),
1602        )?;
1603
1604        Ok(())
1605    }
1606
1607    #[test]
1608    fn plus_op_dict() -> Result<()> {
1609        let schema = Schema::new(vec![
1610            Field::new(
1611                "a",
1612                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1613                true,
1614            ),
1615            Field::new(
1616                "b",
1617                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1618                true,
1619            ),
1620        ]);
1621
1622        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1623        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
1624        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
1625
1626        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1627        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
1628        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
1629
1630        apply_arithmetic::<Int32Type>(
1631            Arc::new(schema),
1632            vec![Arc::new(a), Arc::new(b)],
1633            Operator::Plus,
1634            Int32Array::from(vec![Some(2), None, Some(4), Some(8), None]),
1635        )?;
1636
1637        Ok(())
1638    }
1639
1640    #[test]
1641    fn plus_op_dict_decimal() -> Result<()> {
1642        let schema = Schema::new(vec![
1643            Field::new(
1644                "a",
1645                DataType::Dictionary(
1646                    Box::new(DataType::Int8),
1647                    Box::new(DataType::Decimal128(10, 0)),
1648                ),
1649                true,
1650            ),
1651            Field::new(
1652                "b",
1653                DataType::Dictionary(
1654                    Box::new(DataType::Int8),
1655                    Box::new(DataType::Decimal128(10, 0)),
1656                ),
1657                true,
1658            ),
1659        ]);
1660
1661        let value = 123;
1662        let decimal_array = Arc::new(create_decimal_array(
1663            &[
1664                Some(value),
1665                Some(value + 2),
1666                Some(value - 1),
1667                Some(value + 1),
1668            ],
1669            10,
1670            0,
1671        ));
1672
1673        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
1674        let a = DictionaryArray::try_new(keys, decimal_array)?;
1675
1676        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
1677        let decimal_array = Arc::new(create_decimal_array(
1678            &[
1679                Some(value + 1),
1680                Some(value + 3),
1681                Some(value),
1682                Some(value + 2),
1683            ],
1684            10,
1685            0,
1686        ));
1687        let b = DictionaryArray::try_new(keys, decimal_array)?;
1688
1689        apply_arithmetic(
1690            Arc::new(schema),
1691            vec![Arc::new(a), Arc::new(b)],
1692            Operator::Plus,
1693            create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 11, 0),
1694        )?;
1695
1696        Ok(())
1697    }
1698
1699    #[test]
1700    fn plus_op_scalar() -> Result<()> {
1701        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1702        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1703
1704        apply_arithmetic_scalar(
1705            Arc::new(schema),
1706            vec![Arc::new(a)],
1707            Operator::Plus,
1708            ScalarValue::Int32(Some(1)),
1709            Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
1710        )?;
1711
1712        Ok(())
1713    }
1714
1715    #[test]
1716    fn plus_op_dict_scalar() -> Result<()> {
1717        let schema = Schema::new(vec![Field::new(
1718            "a",
1719            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1720            true,
1721        )]);
1722
1723        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
1724
1725        dict_builder.append(1)?;
1726        dict_builder.append_null();
1727        dict_builder.append(2)?;
1728        dict_builder.append(5)?;
1729
1730        let a = dict_builder.finish();
1731
1732        let expected: PrimitiveArray<Int32Type> =
1733            PrimitiveArray::from(vec![Some(2), None, Some(3), Some(6)]);
1734
1735        apply_arithmetic_scalar(
1736            Arc::new(schema),
1737            vec![Arc::new(a)],
1738            Operator::Plus,
1739            ScalarValue::Dictionary(
1740                Box::new(DataType::Int8),
1741                Box::new(ScalarValue::Int32(Some(1))),
1742            ),
1743            Arc::new(expected),
1744        )?;
1745
1746        Ok(())
1747    }
1748
1749    #[test]
1750    fn plus_op_dict_scalar_decimal() -> Result<()> {
1751        let schema = Schema::new(vec![Field::new(
1752            "a",
1753            DataType::Dictionary(
1754                Box::new(DataType::Int8),
1755                Box::new(DataType::Decimal128(10, 0)),
1756            ),
1757            true,
1758        )]);
1759
1760        let value = 123;
1761        let decimal_array = Arc::new(create_decimal_array(
1762            &[Some(value), None, Some(value - 1), Some(value + 1)],
1763            10,
1764            0,
1765        ));
1766
1767        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
1768        let a = DictionaryArray::try_new(keys, decimal_array)?;
1769
1770        let decimal_array = Arc::new(create_decimal_array(
1771            &[
1772                Some(value + 1),
1773                Some(value),
1774                None,
1775                Some(value + 2),
1776                Some(value + 1),
1777            ],
1778            11,
1779            0,
1780        ));
1781
1782        apply_arithmetic_scalar(
1783            Arc::new(schema),
1784            vec![Arc::new(a)],
1785            Operator::Plus,
1786            ScalarValue::Dictionary(
1787                Box::new(DataType::Int8),
1788                Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
1789            ),
1790            decimal_array,
1791        )?;
1792
1793        Ok(())
1794    }
1795
1796    #[test]
1797    fn minus_op() -> Result<()> {
1798        let schema = Arc::new(Schema::new(vec![
1799            Field::new("a", DataType::Int32, false),
1800            Field::new("b", DataType::Int32, false),
1801        ]));
1802        let a = Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16]));
1803        let b = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1804
1805        apply_arithmetic::<Int32Type>(
1806            Arc::clone(&schema),
1807            vec![
1808                Arc::clone(&a) as Arc<dyn Array>,
1809                Arc::clone(&b) as Arc<dyn Array>,
1810            ],
1811            Operator::Minus,
1812            Int32Array::from(vec![0, 0, 1, 4, 11]),
1813        )?;
1814
1815        // should handle have negative values in result (for signed)
1816        apply_arithmetic::<Int32Type>(
1817            schema,
1818            vec![b, a],
1819            Operator::Minus,
1820            Int32Array::from(vec![0, 0, -1, -4, -11]),
1821        )?;
1822
1823        Ok(())
1824    }
1825
1826    #[test]
1827    fn minus_op_dict() -> Result<()> {
1828        let schema = Schema::new(vec![
1829            Field::new(
1830                "a",
1831                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1832                true,
1833            ),
1834            Field::new(
1835                "b",
1836                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1837                true,
1838            ),
1839        ]);
1840
1841        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1842        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
1843        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
1844
1845        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1846        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
1847        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
1848
1849        apply_arithmetic::<Int32Type>(
1850            Arc::new(schema),
1851            vec![Arc::new(a), Arc::new(b)],
1852            Operator::Minus,
1853            Int32Array::from(vec![Some(0), None, Some(0), Some(0), None]),
1854        )?;
1855
1856        Ok(())
1857    }
1858
1859    #[test]
1860    fn minus_op_dict_decimal() -> Result<()> {
1861        let schema = Schema::new(vec![
1862            Field::new(
1863                "a",
1864                DataType::Dictionary(
1865                    Box::new(DataType::Int8),
1866                    Box::new(DataType::Decimal128(10, 0)),
1867                ),
1868                true,
1869            ),
1870            Field::new(
1871                "b",
1872                DataType::Dictionary(
1873                    Box::new(DataType::Int8),
1874                    Box::new(DataType::Decimal128(10, 0)),
1875                ),
1876                true,
1877            ),
1878        ]);
1879
1880        let value = 123;
1881        let decimal_array = Arc::new(create_decimal_array(
1882            &[
1883                Some(value),
1884                Some(value + 2),
1885                Some(value - 1),
1886                Some(value + 1),
1887            ],
1888            10,
1889            0,
1890        ));
1891
1892        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
1893        let a = DictionaryArray::try_new(keys, decimal_array)?;
1894
1895        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
1896        let decimal_array = Arc::new(create_decimal_array(
1897            &[
1898                Some(value + 1),
1899                Some(value + 3),
1900                Some(value),
1901                Some(value + 2),
1902            ],
1903            10,
1904            0,
1905        ));
1906        let b = DictionaryArray::try_new(keys, decimal_array)?;
1907
1908        apply_arithmetic(
1909            Arc::new(schema),
1910            vec![Arc::new(a), Arc::new(b)],
1911            Operator::Minus,
1912            create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 11, 0),
1913        )?;
1914
1915        Ok(())
1916    }
1917
1918    #[test]
1919    fn minus_op_scalar() -> Result<()> {
1920        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1921        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1922
1923        apply_arithmetic_scalar(
1924            Arc::new(schema),
1925            vec![Arc::new(a)],
1926            Operator::Minus,
1927            ScalarValue::Int32(Some(1)),
1928            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])),
1929        )?;
1930
1931        Ok(())
1932    }
1933
1934    #[test]
1935    fn minus_op_dict_scalar() -> Result<()> {
1936        let schema = Schema::new(vec![Field::new(
1937            "a",
1938            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1939            true,
1940        )]);
1941
1942        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
1943
1944        dict_builder.append(1)?;
1945        dict_builder.append_null();
1946        dict_builder.append(2)?;
1947        dict_builder.append(5)?;
1948
1949        let a = dict_builder.finish();
1950
1951        let expected: PrimitiveArray<Int32Type> =
1952            PrimitiveArray::from(vec![Some(0), None, Some(1), Some(4)]);
1953
1954        apply_arithmetic_scalar(
1955            Arc::new(schema),
1956            vec![Arc::new(a)],
1957            Operator::Minus,
1958            ScalarValue::Dictionary(
1959                Box::new(DataType::Int8),
1960                Box::new(ScalarValue::Int32(Some(1))),
1961            ),
1962            Arc::new(expected),
1963        )?;
1964
1965        Ok(())
1966    }
1967
1968    #[test]
1969    fn minus_op_dict_scalar_decimal() -> Result<()> {
1970        let schema = Schema::new(vec![Field::new(
1971            "a",
1972            DataType::Dictionary(
1973                Box::new(DataType::Int8),
1974                Box::new(DataType::Decimal128(10, 0)),
1975            ),
1976            true,
1977        )]);
1978
1979        let value = 123;
1980        let decimal_array = Arc::new(create_decimal_array(
1981            &[Some(value), None, Some(value - 1), Some(value + 1)],
1982            10,
1983            0,
1984        ));
1985
1986        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
1987        let a = DictionaryArray::try_new(keys, decimal_array)?;
1988
1989        let decimal_array = Arc::new(create_decimal_array(
1990            &[
1991                Some(value - 1),
1992                Some(value - 2),
1993                None,
1994                Some(value),
1995                Some(value - 1),
1996            ],
1997            11,
1998            0,
1999        ));
2000
2001        apply_arithmetic_scalar(
2002            Arc::new(schema),
2003            vec![Arc::new(a)],
2004            Operator::Minus,
2005            ScalarValue::Dictionary(
2006                Box::new(DataType::Int8),
2007                Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
2008            ),
2009            decimal_array,
2010        )?;
2011
2012        Ok(())
2013    }
2014
2015    #[test]
2016    fn multiply_op() -> Result<()> {
2017        let schema = Arc::new(Schema::new(vec![
2018            Field::new("a", DataType::Int32, false),
2019            Field::new("b", DataType::Int32, false),
2020        ]));
2021        let a = Arc::new(Int32Array::from(vec![4, 8, 16, 32, 64]));
2022        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
2023
2024        apply_arithmetic::<Int32Type>(
2025            schema,
2026            vec![a, b],
2027            Operator::Multiply,
2028            Int32Array::from(vec![8, 32, 128, 512, 2048]),
2029        )?;
2030
2031        Ok(())
2032    }
2033
2034    #[test]
2035    fn multiply_op_dict() -> Result<()> {
2036        let schema = Schema::new(vec![
2037            Field::new(
2038                "a",
2039                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2040                true,
2041            ),
2042            Field::new(
2043                "b",
2044                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2045                true,
2046            ),
2047        ]);
2048
2049        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2050        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
2051        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
2052
2053        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2054        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2055        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2056
2057        apply_arithmetic::<Int32Type>(
2058            Arc::new(schema),
2059            vec![Arc::new(a), Arc::new(b)],
2060            Operator::Multiply,
2061            Int32Array::from(vec![Some(1), None, Some(4), Some(16), None]),
2062        )?;
2063
2064        Ok(())
2065    }
2066
2067    #[test]
2068    fn multiply_op_dict_decimal() -> Result<()> {
2069        let schema = Schema::new(vec![
2070            Field::new(
2071                "a",
2072                DataType::Dictionary(
2073                    Box::new(DataType::Int8),
2074                    Box::new(DataType::Decimal128(10, 0)),
2075                ),
2076                true,
2077            ),
2078            Field::new(
2079                "b",
2080                DataType::Dictionary(
2081                    Box::new(DataType::Int8),
2082                    Box::new(DataType::Decimal128(10, 0)),
2083                ),
2084                true,
2085            ),
2086        ]);
2087
2088        let value = 123;
2089        let decimal_array = Arc::new(create_decimal_array(
2090            &[
2091                Some(value),
2092                Some(value + 2),
2093                Some(value - 1),
2094                Some(value + 1),
2095            ],
2096            10,
2097            0,
2098        )) as ArrayRef;
2099
2100        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2101        let a = DictionaryArray::try_new(keys, decimal_array)?;
2102
2103        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2104        let decimal_array = Arc::new(create_decimal_array(
2105            &[
2106                Some(value + 1),
2107                Some(value + 3),
2108                Some(value),
2109                Some(value + 2),
2110            ],
2111            10,
2112            0,
2113        ));
2114        let b = DictionaryArray::try_new(keys, decimal_array)?;
2115
2116        apply_arithmetic(
2117            Arc::new(schema),
2118            vec![Arc::new(a), Arc::new(b)],
2119            Operator::Multiply,
2120            create_decimal_array(
2121                &[Some(15252), None, None, Some(15252), Some(15129)],
2122                21,
2123                0,
2124            ),
2125        )?;
2126
2127        Ok(())
2128    }
2129
2130    #[test]
2131    fn multiply_op_scalar() -> Result<()> {
2132        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2133        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2134
2135        apply_arithmetic_scalar(
2136            Arc::new(schema),
2137            vec![Arc::new(a)],
2138            Operator::Multiply,
2139            ScalarValue::Int32(Some(2)),
2140            Arc::new(Int32Array::from(vec![2, 4, 6, 8, 10])),
2141        )?;
2142
2143        Ok(())
2144    }
2145
2146    #[test]
2147    fn multiply_op_dict_scalar() -> Result<()> {
2148        let schema = Schema::new(vec![Field::new(
2149            "a",
2150            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2151            true,
2152        )]);
2153
2154        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2155
2156        dict_builder.append(1)?;
2157        dict_builder.append_null();
2158        dict_builder.append(2)?;
2159        dict_builder.append(5)?;
2160
2161        let a = dict_builder.finish();
2162
2163        let expected: PrimitiveArray<Int32Type> =
2164            PrimitiveArray::from(vec![Some(2), None, Some(4), Some(10)]);
2165
2166        apply_arithmetic_scalar(
2167            Arc::new(schema),
2168            vec![Arc::new(a)],
2169            Operator::Multiply,
2170            ScalarValue::Dictionary(
2171                Box::new(DataType::Int8),
2172                Box::new(ScalarValue::Int32(Some(2))),
2173            ),
2174            Arc::new(expected),
2175        )?;
2176
2177        Ok(())
2178    }
2179
2180    #[test]
2181    fn multiply_op_dict_scalar_decimal() -> Result<()> {
2182        let schema = Schema::new(vec![Field::new(
2183            "a",
2184            DataType::Dictionary(
2185                Box::new(DataType::Int8),
2186                Box::new(DataType::Decimal128(10, 0)),
2187            ),
2188            true,
2189        )]);
2190
2191        let value = 123;
2192        let decimal_array = Arc::new(create_decimal_array(
2193            &[Some(value), None, Some(value - 1), Some(value + 1)],
2194            10,
2195            0,
2196        ));
2197
2198        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2199        let a = DictionaryArray::try_new(keys, decimal_array)?;
2200
2201        let decimal_array = Arc::new(create_decimal_array(
2202            &[Some(246), Some(244), None, Some(248), Some(246)],
2203            21,
2204            0,
2205        ));
2206
2207        apply_arithmetic_scalar(
2208            Arc::new(schema),
2209            vec![Arc::new(a)],
2210            Operator::Multiply,
2211            ScalarValue::Dictionary(
2212                Box::new(DataType::Int8),
2213                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2214            ),
2215            decimal_array,
2216        )?;
2217
2218        Ok(())
2219    }
2220
2221    #[test]
2222    fn divide_op() -> Result<()> {
2223        let schema = Arc::new(Schema::new(vec![
2224            Field::new("a", DataType::Int32, false),
2225            Field::new("b", DataType::Int32, false),
2226        ]));
2227        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
2228        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
2229
2230        apply_arithmetic::<Int32Type>(
2231            schema,
2232            vec![a, b],
2233            Operator::Divide,
2234            Int32Array::from(vec![4, 8, 16, 32, 64]),
2235        )?;
2236
2237        Ok(())
2238    }
2239
2240    #[test]
2241    fn divide_op_dict() -> Result<()> {
2242        let schema = Schema::new(vec![
2243            Field::new(
2244                "a",
2245                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2246                true,
2247            ),
2248            Field::new(
2249                "b",
2250                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2251                true,
2252            ),
2253        ]);
2254
2255        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2256
2257        dict_builder.append(1)?;
2258        dict_builder.append_null();
2259        dict_builder.append(2)?;
2260        dict_builder.append(5)?;
2261        dict_builder.append(0)?;
2262
2263        let a = dict_builder.finish();
2264
2265        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2266        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2267        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2268
2269        apply_arithmetic::<Int32Type>(
2270            Arc::new(schema),
2271            vec![Arc::new(a), Arc::new(b)],
2272            Operator::Divide,
2273            Int32Array::from(vec![Some(1), None, Some(1), Some(1), Some(0)]),
2274        )?;
2275
2276        Ok(())
2277    }
2278
2279    #[test]
2280    fn divide_op_dict_decimal() -> Result<()> {
2281        let schema = Schema::new(vec![
2282            Field::new(
2283                "a",
2284                DataType::Dictionary(
2285                    Box::new(DataType::Int8),
2286                    Box::new(DataType::Decimal128(10, 0)),
2287                ),
2288                true,
2289            ),
2290            Field::new(
2291                "b",
2292                DataType::Dictionary(
2293                    Box::new(DataType::Int8),
2294                    Box::new(DataType::Decimal128(10, 0)),
2295                ),
2296                true,
2297            ),
2298        ]);
2299
2300        let value = 123;
2301        let decimal_array = Arc::new(create_decimal_array(
2302            &[
2303                Some(value),
2304                Some(value + 2),
2305                Some(value - 1),
2306                Some(value + 1),
2307            ],
2308            10,
2309            0,
2310        ));
2311
2312        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2313        let a = DictionaryArray::try_new(keys, decimal_array)?;
2314
2315        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2316        let decimal_array = Arc::new(create_decimal_array(
2317            &[
2318                Some(value + 1),
2319                Some(value + 3),
2320                Some(value),
2321                Some(value + 2),
2322            ],
2323            10,
2324            0,
2325        ));
2326        let b = DictionaryArray::try_new(keys, decimal_array)?;
2327
2328        apply_arithmetic(
2329            Arc::new(schema),
2330            vec![Arc::new(a), Arc::new(b)],
2331            Operator::Divide,
2332            create_decimal_array(
2333                &[
2334                    Some(9919), // 0.9919
2335                    None,
2336                    None,
2337                    Some(10081), // 1.0081
2338                    Some(10000), // 1.0
2339                ],
2340                14,
2341                4,
2342            ),
2343        )?;
2344
2345        Ok(())
2346    }
2347
2348    #[test]
2349    fn divide_op_scalar() -> Result<()> {
2350        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2351        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2352
2353        apply_arithmetic_scalar(
2354            Arc::new(schema),
2355            vec![Arc::new(a)],
2356            Operator::Divide,
2357            ScalarValue::Int32(Some(2)),
2358            Arc::new(Int32Array::from(vec![0, 1, 1, 2, 2])),
2359        )?;
2360
2361        Ok(())
2362    }
2363
2364    #[test]
2365    fn divide_op_dict_scalar() -> Result<()> {
2366        let schema = Schema::new(vec![Field::new(
2367            "a",
2368            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2369            true,
2370        )]);
2371
2372        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2373
2374        dict_builder.append(1)?;
2375        dict_builder.append_null();
2376        dict_builder.append(2)?;
2377        dict_builder.append(5)?;
2378
2379        let a = dict_builder.finish();
2380
2381        let expected: PrimitiveArray<Int32Type> =
2382            PrimitiveArray::from(vec![Some(0), None, Some(1), Some(2)]);
2383
2384        apply_arithmetic_scalar(
2385            Arc::new(schema),
2386            vec![Arc::new(a)],
2387            Operator::Divide,
2388            ScalarValue::Dictionary(
2389                Box::new(DataType::Int8),
2390                Box::new(ScalarValue::Int32(Some(2))),
2391            ),
2392            Arc::new(expected),
2393        )?;
2394
2395        Ok(())
2396    }
2397
2398    #[test]
2399    fn divide_op_dict_scalar_decimal() -> Result<()> {
2400        let schema = Schema::new(vec![Field::new(
2401            "a",
2402            DataType::Dictionary(
2403                Box::new(DataType::Int8),
2404                Box::new(DataType::Decimal128(10, 0)),
2405            ),
2406            true,
2407        )]);
2408
2409        let value = 123;
2410        let decimal_array = Arc::new(create_decimal_array(
2411            &[Some(value), None, Some(value - 1), Some(value + 1)],
2412            10,
2413            0,
2414        ));
2415
2416        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2417        let a = DictionaryArray::try_new(keys, decimal_array)?;
2418
2419        let decimal_array = Arc::new(create_decimal_array(
2420            &[Some(615000), Some(610000), None, Some(620000), Some(615000)],
2421            14,
2422            4,
2423        ));
2424
2425        apply_arithmetic_scalar(
2426            Arc::new(schema),
2427            vec![Arc::new(a)],
2428            Operator::Divide,
2429            ScalarValue::Dictionary(
2430                Box::new(DataType::Int8),
2431                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2432            ),
2433            decimal_array,
2434        )?;
2435
2436        Ok(())
2437    }
2438
2439    #[test]
2440    fn modulus_op() -> Result<()> {
2441        let schema = Arc::new(Schema::new(vec![
2442            Field::new("a", DataType::Int32, false),
2443            Field::new("b", DataType::Int32, false),
2444        ]));
2445        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
2446        let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));
2447
2448        apply_arithmetic::<Int32Type>(
2449            schema,
2450            vec![a, b],
2451            Operator::Modulo,
2452            Int32Array::from(vec![0, 0, 2, 8, 0]),
2453        )?;
2454
2455        Ok(())
2456    }
2457
2458    #[test]
2459    fn modulus_op_dict() -> Result<()> {
2460        let schema = Schema::new(vec![
2461            Field::new(
2462                "a",
2463                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2464                true,
2465            ),
2466            Field::new(
2467                "b",
2468                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2469                true,
2470            ),
2471        ]);
2472
2473        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2474
2475        dict_builder.append(1)?;
2476        dict_builder.append_null();
2477        dict_builder.append(2)?;
2478        dict_builder.append(5)?;
2479        dict_builder.append(0)?;
2480
2481        let a = dict_builder.finish();
2482
2483        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2484        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2485        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2486
2487        apply_arithmetic::<Int32Type>(
2488            Arc::new(schema),
2489            vec![Arc::new(a), Arc::new(b)],
2490            Operator::Modulo,
2491            Int32Array::from(vec![Some(0), None, Some(0), Some(1), Some(0)]),
2492        )?;
2493
2494        Ok(())
2495    }
2496
2497    #[test]
2498    fn modulus_op_dict_decimal() -> Result<()> {
2499        let schema = Schema::new(vec![
2500            Field::new(
2501                "a",
2502                DataType::Dictionary(
2503                    Box::new(DataType::Int8),
2504                    Box::new(DataType::Decimal128(10, 0)),
2505                ),
2506                true,
2507            ),
2508            Field::new(
2509                "b",
2510                DataType::Dictionary(
2511                    Box::new(DataType::Int8),
2512                    Box::new(DataType::Decimal128(10, 0)),
2513                ),
2514                true,
2515            ),
2516        ]);
2517
2518        let value = 123;
2519        let decimal_array = Arc::new(create_decimal_array(
2520            &[
2521                Some(value),
2522                Some(value + 2),
2523                Some(value - 1),
2524                Some(value + 1),
2525            ],
2526            10,
2527            0,
2528        ));
2529
2530        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2531        let a = DictionaryArray::try_new(keys, decimal_array)?;
2532
2533        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2534        let decimal_array = Arc::new(create_decimal_array(
2535            &[
2536                Some(value + 1),
2537                Some(value + 3),
2538                Some(value),
2539                Some(value + 2),
2540            ],
2541            10,
2542            0,
2543        ));
2544        let b = DictionaryArray::try_new(keys, decimal_array)?;
2545
2546        apply_arithmetic(
2547            Arc::new(schema),
2548            vec![Arc::new(a), Arc::new(b)],
2549            Operator::Modulo,
2550            create_decimal_array(&[Some(123), None, None, Some(1), Some(0)], 10, 0),
2551        )?;
2552
2553        Ok(())
2554    }
2555
2556    #[test]
2557    fn modulus_op_scalar() -> Result<()> {
2558        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2559        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2560
2561        apply_arithmetic_scalar(
2562            Arc::new(schema),
2563            vec![Arc::new(a)],
2564            Operator::Modulo,
2565            ScalarValue::Int32(Some(2)),
2566            Arc::new(Int32Array::from(vec![1, 0, 1, 0, 1])),
2567        )?;
2568
2569        Ok(())
2570    }
2571
2572    #[test]
2573    fn modules_op_dict_scalar() -> Result<()> {
2574        let schema = Schema::new(vec![Field::new(
2575            "a",
2576            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2577            true,
2578        )]);
2579
2580        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2581
2582        dict_builder.append(1)?;
2583        dict_builder.append_null();
2584        dict_builder.append(2)?;
2585        dict_builder.append(5)?;
2586
2587        let a = dict_builder.finish();
2588
2589        let expected: PrimitiveArray<Int32Type> =
2590            PrimitiveArray::from(vec![Some(1), None, Some(0), Some(1)]);
2591
2592        apply_arithmetic_scalar(
2593            Arc::new(schema),
2594            vec![Arc::new(a)],
2595            Operator::Modulo,
2596            ScalarValue::Dictionary(
2597                Box::new(DataType::Int8),
2598                Box::new(ScalarValue::Int32(Some(2))),
2599            ),
2600            Arc::new(expected),
2601        )?;
2602
2603        Ok(())
2604    }
2605
2606    #[test]
2607    fn modulus_op_dict_scalar_decimal() -> Result<()> {
2608        let schema = Schema::new(vec![Field::new(
2609            "a",
2610            DataType::Dictionary(
2611                Box::new(DataType::Int8),
2612                Box::new(DataType::Decimal128(10, 0)),
2613            ),
2614            true,
2615        )]);
2616
2617        let value = 123;
2618        let decimal_array = Arc::new(create_decimal_array(
2619            &[Some(value), None, Some(value - 1), Some(value + 1)],
2620            10,
2621            0,
2622        ));
2623
2624        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2625        let a = DictionaryArray::try_new(keys, decimal_array)?;
2626
2627        let decimal_array = Arc::new(create_decimal_array(
2628            &[Some(1), Some(0), None, Some(0), Some(1)],
2629            10,
2630            0,
2631        ));
2632
2633        apply_arithmetic_scalar(
2634            Arc::new(schema),
2635            vec![Arc::new(a)],
2636            Operator::Modulo,
2637            ScalarValue::Dictionary(
2638                Box::new(DataType::Int8),
2639                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2640            ),
2641            decimal_array,
2642        )?;
2643
2644        Ok(())
2645    }
2646
2647    fn apply_arithmetic<T: ArrowNumericType>(
2648        schema: SchemaRef,
2649        data: Vec<ArrayRef>,
2650        op: Operator,
2651        expected: PrimitiveArray<T>,
2652    ) -> Result<()> {
2653        let arithmetic_op =
2654            binary_op(col("a", &schema)?, op, col("b", &schema)?, &schema)?;
2655        let batch = RecordBatch::try_new(schema, data)?;
2656        let result = arithmetic_op
2657            .evaluate(&batch)?
2658            .into_array(batch.num_rows())
2659            .expect("Failed to convert to array");
2660
2661        assert_eq!(result.as_ref(), &expected);
2662        Ok(())
2663    }
2664
2665    fn apply_arithmetic_scalar(
2666        schema: SchemaRef,
2667        data: Vec<ArrayRef>,
2668        op: Operator,
2669        literal: ScalarValue,
2670        expected: ArrayRef,
2671    ) -> Result<()> {
2672        let lit = Arc::new(Literal::new(literal));
2673        let arithmetic_op = binary_op(col("a", &schema)?, op, lit, &schema)?;
2674        let batch = RecordBatch::try_new(schema, data)?;
2675        let result = arithmetic_op
2676            .evaluate(&batch)?
2677            .into_array(batch.num_rows())
2678            .expect("Failed to convert to array");
2679
2680        assert_eq!(&result, &expected);
2681        Ok(())
2682    }
2683
2684    fn apply_logic_op(
2685        schema: &SchemaRef,
2686        left: &ArrayRef,
2687        right: &ArrayRef,
2688        op: Operator,
2689        expected: BooleanArray,
2690    ) -> Result<()> {
2691        let op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
2692        let data: Vec<ArrayRef> = vec![Arc::clone(left), Arc::clone(right)];
2693        let batch = RecordBatch::try_new(Arc::clone(schema), data)?;
2694        let result = op
2695            .evaluate(&batch)?
2696            .into_array(batch.num_rows())
2697            .expect("Failed to convert to array");
2698
2699        assert_eq!(result.as_ref(), &expected);
2700        Ok(())
2701    }
2702
2703    // Test `scalar <op> arr` produces expected
2704    fn apply_logic_op_scalar_arr(
2705        schema: &SchemaRef,
2706        scalar: &ScalarValue,
2707        arr: &ArrayRef,
2708        op: Operator,
2709        expected: &BooleanArray,
2710    ) -> Result<()> {
2711        let scalar = lit(scalar.clone());
2712        let op = binary_op(scalar, op, col("a", schema)?, schema)?;
2713        let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
2714        let result = op
2715            .evaluate(&batch)?
2716            .into_array(batch.num_rows())
2717            .expect("Failed to convert to array");
2718        assert_eq!(result.as_ref(), expected);
2719
2720        Ok(())
2721    }
2722
2723    // Test `arr <op> scalar` produces expected
2724    fn apply_logic_op_arr_scalar(
2725        schema: &SchemaRef,
2726        arr: &ArrayRef,
2727        scalar: &ScalarValue,
2728        op: Operator,
2729        expected: &BooleanArray,
2730    ) -> Result<()> {
2731        let scalar = lit(scalar.clone());
2732        let op = binary_op(col("a", schema)?, op, scalar, schema)?;
2733        let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
2734        let result = op
2735            .evaluate(&batch)?
2736            .into_array(batch.num_rows())
2737            .expect("Failed to convert to array");
2738        assert_eq!(result.as_ref(), expected);
2739
2740        Ok(())
2741    }
2742
2743    #[test]
2744    fn and_with_nulls_op() -> Result<()> {
2745        let schema = Schema::new(vec![
2746            Field::new("a", DataType::Boolean, true),
2747            Field::new("b", DataType::Boolean, true),
2748        ]);
2749        let a = Arc::new(BooleanArray::from(vec![
2750            Some(true),
2751            Some(false),
2752            None,
2753            Some(true),
2754            Some(false),
2755            None,
2756            Some(true),
2757            Some(false),
2758            None,
2759        ])) as ArrayRef;
2760        let b = Arc::new(BooleanArray::from(vec![
2761            Some(true),
2762            Some(true),
2763            Some(true),
2764            Some(false),
2765            Some(false),
2766            Some(false),
2767            None,
2768            None,
2769            None,
2770        ])) as ArrayRef;
2771
2772        let expected = BooleanArray::from(vec![
2773            Some(true),
2774            Some(false),
2775            None,
2776            Some(false),
2777            Some(false),
2778            Some(false),
2779            None,
2780            Some(false),
2781            None,
2782        ]);
2783        apply_logic_op(&Arc::new(schema), &a, &b, Operator::And, expected)?;
2784
2785        Ok(())
2786    }
2787
2788    #[test]
2789    fn regex_with_nulls() -> Result<()> {
2790        let schema = Schema::new(vec![
2791            Field::new("a", DataType::Utf8, true),
2792            Field::new("b", DataType::Utf8, true),
2793        ]);
2794        let a = Arc::new(StringArray::from(vec![
2795            Some("abc"),
2796            None,
2797            Some("abc"),
2798            None,
2799            Some("abc"),
2800        ])) as ArrayRef;
2801        let b = Arc::new(StringArray::from(vec![
2802            Some("^a"),
2803            Some("^A"),
2804            None,
2805            None,
2806            Some("^(b|c)"),
2807        ])) as ArrayRef;
2808
2809        let regex_expected =
2810            BooleanArray::from(vec![Some(true), None, None, None, Some(false)]);
2811        let regex_not_expected =
2812            BooleanArray::from(vec![Some(false), None, None, None, Some(true)]);
2813        apply_logic_op(
2814            &Arc::new(schema.clone()),
2815            &a,
2816            &b,
2817            Operator::RegexMatch,
2818            regex_expected.clone(),
2819        )?;
2820        apply_logic_op(
2821            &Arc::new(schema.clone()),
2822            &a,
2823            &b,
2824            Operator::RegexIMatch,
2825            regex_expected.clone(),
2826        )?;
2827        apply_logic_op(
2828            &Arc::new(schema.clone()),
2829            &a,
2830            &b,
2831            Operator::RegexNotMatch,
2832            regex_not_expected.clone(),
2833        )?;
2834        apply_logic_op(
2835            &Arc::new(schema),
2836            &a,
2837            &b,
2838            Operator::RegexNotIMatch,
2839            regex_not_expected.clone(),
2840        )?;
2841
2842        let schema = Schema::new(vec![
2843            Field::new("a", DataType::LargeUtf8, true),
2844            Field::new("b", DataType::LargeUtf8, true),
2845        ]);
2846        let a = Arc::new(LargeStringArray::from(vec![
2847            Some("abc"),
2848            None,
2849            Some("abc"),
2850            None,
2851            Some("abc"),
2852        ])) as ArrayRef;
2853        let b = Arc::new(LargeStringArray::from(vec![
2854            Some("^a"),
2855            Some("^A"),
2856            None,
2857            None,
2858            Some("^(b|c)"),
2859        ])) as ArrayRef;
2860
2861        apply_logic_op(
2862            &Arc::new(schema.clone()),
2863            &a,
2864            &b,
2865            Operator::RegexMatch,
2866            regex_expected.clone(),
2867        )?;
2868        apply_logic_op(
2869            &Arc::new(schema.clone()),
2870            &a,
2871            &b,
2872            Operator::RegexIMatch,
2873            regex_expected,
2874        )?;
2875        apply_logic_op(
2876            &Arc::new(schema.clone()),
2877            &a,
2878            &b,
2879            Operator::RegexNotMatch,
2880            regex_not_expected.clone(),
2881        )?;
2882        apply_logic_op(
2883            &Arc::new(schema),
2884            &a,
2885            &b,
2886            Operator::RegexNotIMatch,
2887            regex_not_expected,
2888        )?;
2889
2890        Ok(())
2891    }
2892
2893    #[test]
2894    fn or_with_nulls_op() -> Result<()> {
2895        let schema = Schema::new(vec![
2896            Field::new("a", DataType::Boolean, true),
2897            Field::new("b", DataType::Boolean, true),
2898        ]);
2899        let a = Arc::new(BooleanArray::from(vec![
2900            Some(true),
2901            Some(false),
2902            None,
2903            Some(true),
2904            Some(false),
2905            None,
2906            Some(true),
2907            Some(false),
2908            None,
2909        ])) as ArrayRef;
2910        let b = Arc::new(BooleanArray::from(vec![
2911            Some(true),
2912            Some(true),
2913            Some(true),
2914            Some(false),
2915            Some(false),
2916            Some(false),
2917            None,
2918            None,
2919            None,
2920        ])) as ArrayRef;
2921
2922        let expected = BooleanArray::from(vec![
2923            Some(true),
2924            Some(true),
2925            Some(true),
2926            Some(true),
2927            Some(false),
2928            None,
2929            Some(true),
2930            None,
2931            None,
2932        ]);
2933        apply_logic_op(&Arc::new(schema), &a, &b, Operator::Or, expected)?;
2934
2935        Ok(())
2936    }
2937
2938    /// Returns (schema, a: BooleanArray, b: BooleanArray) with all possible inputs
2939    ///
2940    /// a: [true, true, true,  NULL, NULL, NULL,  false, false, false]
2941    /// b: [true, NULL, false, true, NULL, false, true,  NULL,  false]
2942    fn bool_test_arrays() -> (SchemaRef, ArrayRef, ArrayRef) {
2943        let schema = Schema::new(vec![
2944            Field::new("a", DataType::Boolean, true),
2945            Field::new("b", DataType::Boolean, true),
2946        ]);
2947        let a: BooleanArray = [
2948            Some(true),
2949            Some(true),
2950            Some(true),
2951            None,
2952            None,
2953            None,
2954            Some(false),
2955            Some(false),
2956            Some(false),
2957        ]
2958        .iter()
2959        .collect();
2960        let b: BooleanArray = [
2961            Some(true),
2962            None,
2963            Some(false),
2964            Some(true),
2965            None,
2966            Some(false),
2967            Some(true),
2968            None,
2969            Some(false),
2970        ]
2971        .iter()
2972        .collect();
2973        (Arc::new(schema), Arc::new(a), Arc::new(b))
2974    }
2975
2976    /// Returns (schema, BooleanArray) with [true, NULL, false]
2977    fn scalar_bool_test_array() -> (SchemaRef, ArrayRef) {
2978        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
2979        let a: BooleanArray = [Some(true), None, Some(false)].iter().collect();
2980        (Arc::new(schema), Arc::new(a))
2981    }
2982
2983    #[test]
2984    fn eq_op_bool() {
2985        let (schema, a, b) = bool_test_arrays();
2986        let expected = [
2987            Some(true),
2988            None,
2989            Some(false),
2990            None,
2991            None,
2992            None,
2993            Some(false),
2994            None,
2995            Some(true),
2996        ]
2997        .iter()
2998        .collect();
2999        apply_logic_op(&schema, &a, &b, Operator::Eq, expected).unwrap();
3000    }
3001
3002    #[test]
3003    fn eq_op_bool_scalar() {
3004        let (schema, a) = scalar_bool_test_array();
3005        let expected = [Some(true), None, Some(false)].iter().collect();
3006        apply_logic_op_scalar_arr(
3007            &schema,
3008            &ScalarValue::from(true),
3009            &a,
3010            Operator::Eq,
3011            &expected,
3012        )
3013        .unwrap();
3014        apply_logic_op_arr_scalar(
3015            &schema,
3016            &a,
3017            &ScalarValue::from(true),
3018            Operator::Eq,
3019            &expected,
3020        )
3021        .unwrap();
3022
3023        let expected = [Some(false), None, Some(true)].iter().collect();
3024        apply_logic_op_scalar_arr(
3025            &schema,
3026            &ScalarValue::from(false),
3027            &a,
3028            Operator::Eq,
3029            &expected,
3030        )
3031        .unwrap();
3032        apply_logic_op_arr_scalar(
3033            &schema,
3034            &a,
3035            &ScalarValue::from(false),
3036            Operator::Eq,
3037            &expected,
3038        )
3039        .unwrap();
3040    }
3041
3042    #[test]
3043    fn neq_op_bool() {
3044        let (schema, a, b) = bool_test_arrays();
3045        let expected = [
3046            Some(false),
3047            None,
3048            Some(true),
3049            None,
3050            None,
3051            None,
3052            Some(true),
3053            None,
3054            Some(false),
3055        ]
3056        .iter()
3057        .collect();
3058        apply_logic_op(&schema, &a, &b, Operator::NotEq, expected).unwrap();
3059    }
3060
3061    #[test]
3062    fn neq_op_bool_scalar() {
3063        let (schema, a) = scalar_bool_test_array();
3064        let expected = [Some(false), None, Some(true)].iter().collect();
3065        apply_logic_op_scalar_arr(
3066            &schema,
3067            &ScalarValue::from(true),
3068            &a,
3069            Operator::NotEq,
3070            &expected,
3071        )
3072        .unwrap();
3073        apply_logic_op_arr_scalar(
3074            &schema,
3075            &a,
3076            &ScalarValue::from(true),
3077            Operator::NotEq,
3078            &expected,
3079        )
3080        .unwrap();
3081
3082        let expected = [Some(true), None, Some(false)].iter().collect();
3083        apply_logic_op_scalar_arr(
3084            &schema,
3085            &ScalarValue::from(false),
3086            &a,
3087            Operator::NotEq,
3088            &expected,
3089        )
3090        .unwrap();
3091        apply_logic_op_arr_scalar(
3092            &schema,
3093            &a,
3094            &ScalarValue::from(false),
3095            Operator::NotEq,
3096            &expected,
3097        )
3098        .unwrap();
3099    }
3100
3101    #[test]
3102    fn lt_op_bool() {
3103        let (schema, a, b) = bool_test_arrays();
3104        let expected = [
3105            Some(false),
3106            None,
3107            Some(false),
3108            None,
3109            None,
3110            None,
3111            Some(true),
3112            None,
3113            Some(false),
3114        ]
3115        .iter()
3116        .collect();
3117        apply_logic_op(&schema, &a, &b, Operator::Lt, expected).unwrap();
3118    }
3119
3120    #[test]
3121    fn lt_op_bool_scalar() {
3122        let (schema, a) = scalar_bool_test_array();
3123        let expected = [Some(false), None, Some(false)].iter().collect();
3124        apply_logic_op_scalar_arr(
3125            &schema,
3126            &ScalarValue::from(true),
3127            &a,
3128            Operator::Lt,
3129            &expected,
3130        )
3131        .unwrap();
3132
3133        let expected = [Some(false), None, Some(true)].iter().collect();
3134        apply_logic_op_arr_scalar(
3135            &schema,
3136            &a,
3137            &ScalarValue::from(true),
3138            Operator::Lt,
3139            &expected,
3140        )
3141        .unwrap();
3142
3143        let expected = [Some(true), None, Some(false)].iter().collect();
3144        apply_logic_op_scalar_arr(
3145            &schema,
3146            &ScalarValue::from(false),
3147            &a,
3148            Operator::Lt,
3149            &expected,
3150        )
3151        .unwrap();
3152
3153        let expected = [Some(false), None, Some(false)].iter().collect();
3154        apply_logic_op_arr_scalar(
3155            &schema,
3156            &a,
3157            &ScalarValue::from(false),
3158            Operator::Lt,
3159            &expected,
3160        )
3161        .unwrap();
3162    }
3163
3164    #[test]
3165    fn lt_eq_op_bool() {
3166        let (schema, a, b) = bool_test_arrays();
3167        let expected = [
3168            Some(true),
3169            None,
3170            Some(false),
3171            None,
3172            None,
3173            None,
3174            Some(true),
3175            None,
3176            Some(true),
3177        ]
3178        .iter()
3179        .collect();
3180        apply_logic_op(&schema, &a, &b, Operator::LtEq, expected).unwrap();
3181    }
3182
3183    #[test]
3184    fn lt_eq_op_bool_scalar() {
3185        let (schema, a) = scalar_bool_test_array();
3186        let expected = [Some(true), None, Some(false)].iter().collect();
3187        apply_logic_op_scalar_arr(
3188            &schema,
3189            &ScalarValue::from(true),
3190            &a,
3191            Operator::LtEq,
3192            &expected,
3193        )
3194        .unwrap();
3195
3196        let expected = [Some(true), None, Some(true)].iter().collect();
3197        apply_logic_op_arr_scalar(
3198            &schema,
3199            &a,
3200            &ScalarValue::from(true),
3201            Operator::LtEq,
3202            &expected,
3203        )
3204        .unwrap();
3205
3206        let expected = [Some(true), None, Some(true)].iter().collect();
3207        apply_logic_op_scalar_arr(
3208            &schema,
3209            &ScalarValue::from(false),
3210            &a,
3211            Operator::LtEq,
3212            &expected,
3213        )
3214        .unwrap();
3215
3216        let expected = [Some(false), None, Some(true)].iter().collect();
3217        apply_logic_op_arr_scalar(
3218            &schema,
3219            &a,
3220            &ScalarValue::from(false),
3221            Operator::LtEq,
3222            &expected,
3223        )
3224        .unwrap();
3225    }
3226
3227    #[test]
3228    fn gt_op_bool() {
3229        let (schema, a, b) = bool_test_arrays();
3230        let expected = [
3231            Some(false),
3232            None,
3233            Some(true),
3234            None,
3235            None,
3236            None,
3237            Some(false),
3238            None,
3239            Some(false),
3240        ]
3241        .iter()
3242        .collect();
3243        apply_logic_op(&schema, &a, &b, Operator::Gt, expected).unwrap();
3244    }
3245
3246    #[test]
3247    fn gt_op_bool_scalar() {
3248        let (schema, a) = scalar_bool_test_array();
3249        let expected = [Some(false), None, Some(true)].iter().collect();
3250        apply_logic_op_scalar_arr(
3251            &schema,
3252            &ScalarValue::from(true),
3253            &a,
3254            Operator::Gt,
3255            &expected,
3256        )
3257        .unwrap();
3258
3259        let expected = [Some(false), None, Some(false)].iter().collect();
3260        apply_logic_op_arr_scalar(
3261            &schema,
3262            &a,
3263            &ScalarValue::from(true),
3264            Operator::Gt,
3265            &expected,
3266        )
3267        .unwrap();
3268
3269        let expected = [Some(false), None, Some(false)].iter().collect();
3270        apply_logic_op_scalar_arr(
3271            &schema,
3272            &ScalarValue::from(false),
3273            &a,
3274            Operator::Gt,
3275            &expected,
3276        )
3277        .unwrap();
3278
3279        let expected = [Some(true), None, Some(false)].iter().collect();
3280        apply_logic_op_arr_scalar(
3281            &schema,
3282            &a,
3283            &ScalarValue::from(false),
3284            Operator::Gt,
3285            &expected,
3286        )
3287        .unwrap();
3288    }
3289
3290    #[test]
3291    fn gt_eq_op_bool() {
3292        let (schema, a, b) = bool_test_arrays();
3293        let expected = [
3294            Some(true),
3295            None,
3296            Some(true),
3297            None,
3298            None,
3299            None,
3300            Some(false),
3301            None,
3302            Some(true),
3303        ]
3304        .iter()
3305        .collect();
3306        apply_logic_op(&schema, &a, &b, Operator::GtEq, expected).unwrap();
3307    }
3308
3309    #[test]
3310    fn gt_eq_op_bool_scalar() {
3311        let (schema, a) = scalar_bool_test_array();
3312        let expected = [Some(true), None, Some(true)].iter().collect();
3313        apply_logic_op_scalar_arr(
3314            &schema,
3315            &ScalarValue::from(true),
3316            &a,
3317            Operator::GtEq,
3318            &expected,
3319        )
3320        .unwrap();
3321
3322        let expected = [Some(true), None, Some(false)].iter().collect();
3323        apply_logic_op_arr_scalar(
3324            &schema,
3325            &a,
3326            &ScalarValue::from(true),
3327            Operator::GtEq,
3328            &expected,
3329        )
3330        .unwrap();
3331
3332        let expected = [Some(false), None, Some(true)].iter().collect();
3333        apply_logic_op_scalar_arr(
3334            &schema,
3335            &ScalarValue::from(false),
3336            &a,
3337            Operator::GtEq,
3338            &expected,
3339        )
3340        .unwrap();
3341
3342        let expected = [Some(true), None, Some(true)].iter().collect();
3343        apply_logic_op_arr_scalar(
3344            &schema,
3345            &a,
3346            &ScalarValue::from(false),
3347            Operator::GtEq,
3348            &expected,
3349        )
3350        .unwrap();
3351    }
3352
3353    #[test]
3354    fn is_distinct_from_op_bool() {
3355        let (schema, a, b) = bool_test_arrays();
3356        let expected = [
3357            Some(false),
3358            Some(true),
3359            Some(true),
3360            Some(true),
3361            Some(false),
3362            Some(true),
3363            Some(true),
3364            Some(true),
3365            Some(false),
3366        ]
3367        .iter()
3368        .collect();
3369        apply_logic_op(&schema, &a, &b, Operator::IsDistinctFrom, expected).unwrap();
3370    }
3371
3372    #[test]
3373    fn is_not_distinct_from_op_bool() {
3374        let (schema, a, b) = bool_test_arrays();
3375        let expected = [
3376            Some(true),
3377            Some(false),
3378            Some(false),
3379            Some(false),
3380            Some(true),
3381            Some(false),
3382            Some(false),
3383            Some(false),
3384            Some(true),
3385        ]
3386        .iter()
3387        .collect();
3388        apply_logic_op(&schema, &a, &b, Operator::IsNotDistinctFrom, expected).unwrap();
3389    }
3390
3391    #[test]
3392    fn relatively_deeply_nested() {
3393        // Reproducer for https://github.com/apache/datafusion/issues/419
3394
3395        // where even relatively shallow binary expressions overflowed
3396        // the stack in debug builds
3397
3398        let input: Vec<_> = vec![1, 2, 3, 4, 5].into_iter().map(Some).collect();
3399        let a: Int32Array = input.iter().collect();
3400
3401        let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(a) as _)]).unwrap();
3402        let schema = batch.schema();
3403
3404        // build a left deep tree ((((a + a) + a) + a ....
3405        let tree_depth: i32 = 100;
3406        let expr = (0..tree_depth)
3407            .map(|_| col("a", schema.as_ref()).unwrap())
3408            .reduce(|l, r| binary(l, Operator::Plus, r, &schema).unwrap())
3409            .unwrap();
3410
3411        let result = expr
3412            .evaluate(&batch)
3413            .expect("evaluation")
3414            .into_array(batch.num_rows())
3415            .expect("Failed to convert to array");
3416
3417        let expected: Int32Array = input
3418            .into_iter()
3419            .map(|i| i.map(|i| i * tree_depth))
3420            .collect();
3421        assert_eq!(result.as_ref(), &expected);
3422    }
3423
3424    fn create_decimal_array(
3425        array: &[Option<i128>],
3426        precision: u8,
3427        scale: i8,
3428    ) -> Decimal128Array {
3429        let mut decimal_builder = Decimal128Builder::with_capacity(array.len());
3430        for value in array.iter().copied() {
3431            decimal_builder.append_option(value)
3432        }
3433        decimal_builder
3434            .finish()
3435            .with_precision_and_scale(precision, scale)
3436            .unwrap()
3437    }
3438
3439    #[test]
3440    fn comparison_dict_decimal_scalar_expr_test() -> Result<()> {
3441        // scalar of decimal compare with dictionary decimal array
3442        let value_i128 = 123;
3443        let decimal_scalar = ScalarValue::Dictionary(
3444            Box::new(DataType::Int8),
3445            Box::new(ScalarValue::Decimal128(Some(value_i128), 25, 3)),
3446        );
3447        let schema = Arc::new(Schema::new(vec![Field::new(
3448            "a",
3449            DataType::Dictionary(
3450                Box::new(DataType::Int8),
3451                Box::new(DataType::Decimal128(25, 3)),
3452            ),
3453            true,
3454        )]));
3455        let decimal_array = Arc::new(create_decimal_array(
3456            &[
3457                Some(value_i128),
3458                None,
3459                Some(value_i128 - 1),
3460                Some(value_i128 + 1),
3461            ],
3462            25,
3463            3,
3464        ));
3465
3466        let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
3467        let dictionary =
3468            Arc::new(DictionaryArray::try_new(keys, decimal_array)?) as ArrayRef;
3469
3470        // array = scalar
3471        apply_logic_op_arr_scalar(
3472            &schema,
3473            &dictionary,
3474            &decimal_scalar,
3475            Operator::Eq,
3476            &BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3477        )
3478        .unwrap();
3479        // array != scalar
3480        apply_logic_op_arr_scalar(
3481            &schema,
3482            &dictionary,
3483            &decimal_scalar,
3484            Operator::NotEq,
3485            &BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3486        )
3487        .unwrap();
3488        //  array < scalar
3489        apply_logic_op_arr_scalar(
3490            &schema,
3491            &dictionary,
3492            &decimal_scalar,
3493            Operator::Lt,
3494            &BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3495        )
3496        .unwrap();
3497
3498        //  array <= scalar
3499        apply_logic_op_arr_scalar(
3500            &schema,
3501            &dictionary,
3502            &decimal_scalar,
3503            Operator::LtEq,
3504            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3505        )
3506        .unwrap();
3507        // array > scalar
3508        apply_logic_op_arr_scalar(
3509            &schema,
3510            &dictionary,
3511            &decimal_scalar,
3512            Operator::Gt,
3513            &BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3514        )
3515        .unwrap();
3516
3517        // array >= scalar
3518        apply_logic_op_arr_scalar(
3519            &schema,
3520            &dictionary,
3521            &decimal_scalar,
3522            Operator::GtEq,
3523            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3524        )
3525        .unwrap();
3526
3527        Ok(())
3528    }
3529
3530    #[test]
3531    fn comparison_decimal_expr_test() -> Result<()> {
3532        // scalar of decimal compare with decimal array
3533        let value_i128 = 123;
3534        let decimal_scalar = ScalarValue::Decimal128(Some(value_i128), 25, 3);
3535        let schema = Arc::new(Schema::new(vec![Field::new(
3536            "a",
3537            DataType::Decimal128(25, 3),
3538            true,
3539        )]));
3540        let decimal_array = Arc::new(create_decimal_array(
3541            &[
3542                Some(value_i128),
3543                None,
3544                Some(value_i128 - 1),
3545                Some(value_i128 + 1),
3546            ],
3547            25,
3548            3,
3549        )) as ArrayRef;
3550        // array = scalar
3551        apply_logic_op_arr_scalar(
3552            &schema,
3553            &decimal_array,
3554            &decimal_scalar,
3555            Operator::Eq,
3556            &BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3557        )
3558        .unwrap();
3559        // array != scalar
3560        apply_logic_op_arr_scalar(
3561            &schema,
3562            &decimal_array,
3563            &decimal_scalar,
3564            Operator::NotEq,
3565            &BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3566        )
3567        .unwrap();
3568        //  array < scalar
3569        apply_logic_op_arr_scalar(
3570            &schema,
3571            &decimal_array,
3572            &decimal_scalar,
3573            Operator::Lt,
3574            &BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3575        )
3576        .unwrap();
3577
3578        //  array <= scalar
3579        apply_logic_op_arr_scalar(
3580            &schema,
3581            &decimal_array,
3582            &decimal_scalar,
3583            Operator::LtEq,
3584            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3585        )
3586        .unwrap();
3587        // array > scalar
3588        apply_logic_op_arr_scalar(
3589            &schema,
3590            &decimal_array,
3591            &decimal_scalar,
3592            Operator::Gt,
3593            &BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3594        )
3595        .unwrap();
3596
3597        // array >= scalar
3598        apply_logic_op_arr_scalar(
3599            &schema,
3600            &decimal_array,
3601            &decimal_scalar,
3602            Operator::GtEq,
3603            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3604        )
3605        .unwrap();
3606
3607        // scalar of different data type with decimal array
3608        let decimal_scalar = ScalarValue::Decimal128(Some(123_456), 10, 3);
3609        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
3610        // scalar == array
3611        apply_logic_op_scalar_arr(
3612            &schema,
3613            &decimal_scalar,
3614            &(Arc::new(Int64Array::from(vec![Some(124), None])) as ArrayRef),
3615            Operator::Eq,
3616            &BooleanArray::from(vec![Some(false), None]),
3617        )
3618        .unwrap();
3619
3620        // array != scalar
3621        apply_logic_op_arr_scalar(
3622            &schema,
3623            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(1)])) as ArrayRef),
3624            &decimal_scalar,
3625            Operator::NotEq,
3626            &BooleanArray::from(vec![Some(true), None, Some(true)]),
3627        )
3628        .unwrap();
3629
3630        // array < scalar
3631        apply_logic_op_arr_scalar(
3632            &schema,
3633            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(124)])) as ArrayRef),
3634            &decimal_scalar,
3635            Operator::Lt,
3636            &BooleanArray::from(vec![Some(true), None, Some(false)]),
3637        )
3638        .unwrap();
3639
3640        // array > scalar
3641        apply_logic_op_arr_scalar(
3642            &schema,
3643            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(124)])) as ArrayRef),
3644            &decimal_scalar,
3645            Operator::Gt,
3646            &BooleanArray::from(vec![Some(false), None, Some(true)]),
3647        )
3648        .unwrap();
3649
3650        let schema =
3651            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3652        // array == scalar
3653        apply_logic_op_arr_scalar(
3654            &schema,
3655            &(Arc::new(Float64Array::from(vec![Some(123.456), None, Some(123.457)]))
3656                as ArrayRef),
3657            &decimal_scalar,
3658            Operator::Eq,
3659            &BooleanArray::from(vec![Some(true), None, Some(false)]),
3660        )
3661        .unwrap();
3662
3663        // array <= scalar
3664        apply_logic_op_arr_scalar(
3665            &schema,
3666            &(Arc::new(Float64Array::from(vec![
3667                Some(123.456),
3668                None,
3669                Some(123.457),
3670                Some(123.45),
3671            ])) as ArrayRef),
3672            &decimal_scalar,
3673            Operator::LtEq,
3674            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3675        )
3676        .unwrap();
3677        // array >= scalar
3678        apply_logic_op_arr_scalar(
3679            &schema,
3680            &(Arc::new(Float64Array::from(vec![
3681                Some(123.456),
3682                None,
3683                Some(123.457),
3684                Some(123.45),
3685            ])) as ArrayRef),
3686            &decimal_scalar,
3687            Operator::GtEq,
3688            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3689        )
3690        .unwrap();
3691
3692        let value: i128 = 123;
3693        let decimal_array = Arc::new(create_decimal_array(
3694            &[Some(value), None, Some(value - 1), Some(value + 1)],
3695            10,
3696            0,
3697        )) as ArrayRef;
3698
3699        // comparison array op for decimal array
3700        let schema = Arc::new(Schema::new(vec![
3701            Field::new("a", DataType::Decimal128(10, 0), true),
3702            Field::new("b", DataType::Decimal128(10, 0), true),
3703        ]));
3704        let right_decimal_array = Arc::new(create_decimal_array(
3705            &[
3706                Some(value - 1),
3707                Some(value),
3708                Some(value + 1),
3709                Some(value + 1),
3710            ],
3711            10,
3712            0,
3713        )) as ArrayRef;
3714
3715        apply_logic_op(
3716            &schema,
3717            &decimal_array,
3718            &right_decimal_array,
3719            Operator::Eq,
3720            BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3721        )
3722        .unwrap();
3723
3724        apply_logic_op(
3725            &schema,
3726            &decimal_array,
3727            &right_decimal_array,
3728            Operator::NotEq,
3729            BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3730        )
3731        .unwrap();
3732
3733        apply_logic_op(
3734            &schema,
3735            &decimal_array,
3736            &right_decimal_array,
3737            Operator::Lt,
3738            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3739        )
3740        .unwrap();
3741
3742        apply_logic_op(
3743            &schema,
3744            &decimal_array,
3745            &right_decimal_array,
3746            Operator::LtEq,
3747            BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3748        )
3749        .unwrap();
3750
3751        apply_logic_op(
3752            &schema,
3753            &decimal_array,
3754            &right_decimal_array,
3755            Operator::Gt,
3756            BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3757        )
3758        .unwrap();
3759
3760        apply_logic_op(
3761            &schema,
3762            &decimal_array,
3763            &right_decimal_array,
3764            Operator::GtEq,
3765            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3766        )
3767        .unwrap();
3768
3769        // compare decimal array with other array type
3770        let value: i64 = 123;
3771        let schema = Arc::new(Schema::new(vec![
3772            Field::new("a", DataType::Int64, true),
3773            Field::new("b", DataType::Decimal128(10, 0), true),
3774        ]));
3775
3776        let int64_array = Arc::new(Int64Array::from(vec![
3777            Some(value),
3778            Some(value - 1),
3779            Some(value),
3780            Some(value + 1),
3781        ])) as ArrayRef;
3782
3783        // eq: int64array == decimal array
3784        apply_logic_op(
3785            &schema,
3786            &int64_array,
3787            &decimal_array,
3788            Operator::Eq,
3789            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3790        )
3791        .unwrap();
3792        // neq: int64array != decimal array
3793        apply_logic_op(
3794            &schema,
3795            &int64_array,
3796            &decimal_array,
3797            Operator::NotEq,
3798            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3799        )
3800        .unwrap();
3801
3802        let schema = Arc::new(Schema::new(vec![
3803            Field::new("a", DataType::Float64, true),
3804            Field::new("b", DataType::Decimal128(10, 2), true),
3805        ]));
3806
3807        let value: i128 = 123;
3808        let decimal_array = Arc::new(create_decimal_array(
3809            &[
3810                Some(value), // 1.23
3811                None,
3812                Some(value - 1), // 1.22
3813                Some(value + 1), // 1.24
3814            ],
3815            10,
3816            2,
3817        )) as ArrayRef;
3818        let float64_array = Arc::new(Float64Array::from(vec![
3819            Some(1.23),
3820            Some(1.22),
3821            Some(1.23),
3822            Some(1.24),
3823        ])) as ArrayRef;
3824        // lt: float64array < decimal array
3825        apply_logic_op(
3826            &schema,
3827            &float64_array,
3828            &decimal_array,
3829            Operator::Lt,
3830            BooleanArray::from(vec![Some(false), None, Some(false), Some(false)]),
3831        )
3832        .unwrap();
3833        // lt_eq: float64array <= decimal array
3834        apply_logic_op(
3835            &schema,
3836            &float64_array,
3837            &decimal_array,
3838            Operator::LtEq,
3839            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3840        )
3841        .unwrap();
3842        // gt: float64array > decimal array
3843        apply_logic_op(
3844            &schema,
3845            &float64_array,
3846            &decimal_array,
3847            Operator::Gt,
3848            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3849        )
3850        .unwrap();
3851        apply_logic_op(
3852            &schema,
3853            &float64_array,
3854            &decimal_array,
3855            Operator::GtEq,
3856            BooleanArray::from(vec![Some(true), None, Some(true), Some(true)]),
3857        )
3858        .unwrap();
3859        // is distinct: float64array is distinct decimal array
3860        // TODO: now we do not refactor the `is distinct or is not distinct` rule of coercion.
3861        // traced by https://github.com/apache/datafusion/issues/1590
3862        // the decimal array will be casted to float64array
3863        apply_logic_op(
3864            &schema,
3865            &float64_array,
3866            &decimal_array,
3867            Operator::IsDistinctFrom,
3868            BooleanArray::from(vec![Some(false), Some(true), Some(true), Some(false)]),
3869        )
3870        .unwrap();
3871        // is not distinct
3872        apply_logic_op(
3873            &schema,
3874            &float64_array,
3875            &decimal_array,
3876            Operator::IsNotDistinctFrom,
3877            BooleanArray::from(vec![Some(true), Some(false), Some(false), Some(true)]),
3878        )
3879        .unwrap();
3880
3881        Ok(())
3882    }
3883
3884    fn apply_decimal_arithmetic_op(
3885        schema: &SchemaRef,
3886        left: &ArrayRef,
3887        right: &ArrayRef,
3888        op: Operator,
3889        expected: ArrayRef,
3890    ) -> Result<()> {
3891        let arithmetic_op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
3892        let data: Vec<ArrayRef> = vec![Arc::clone(left), Arc::clone(right)];
3893        let batch = RecordBatch::try_new(Arc::clone(schema), data)?;
3894        let result = arithmetic_op
3895            .evaluate(&batch)?
3896            .into_array(batch.num_rows())
3897            .expect("Failed to convert to array");
3898
3899        assert_eq!(result.as_ref(), expected.as_ref());
3900        Ok(())
3901    }
3902
3903    #[test]
3904    fn arithmetic_decimal_expr_test() -> Result<()> {
3905        let schema = Arc::new(Schema::new(vec![
3906            Field::new("a", DataType::Int32, true),
3907            Field::new("b", DataType::Decimal128(10, 2), true),
3908        ]));
3909        let value: i128 = 123;
3910        let decimal_array = Arc::new(create_decimal_array(
3911            &[
3912                Some(value), // 1.23
3913                None,
3914                Some(value - 1), // 1.22
3915                Some(value + 1), // 1.24
3916            ],
3917            10,
3918            2,
3919        )) as ArrayRef;
3920        let int32_array = Arc::new(Int32Array::from(vec![
3921            Some(123),
3922            Some(122),
3923            Some(123),
3924            Some(124),
3925        ])) as ArrayRef;
3926
3927        // add: Int32array add decimal array
3928        let expect = Arc::new(create_decimal_array(
3929            &[Some(12423), None, Some(12422), Some(12524)],
3930            13,
3931            2,
3932        )) as ArrayRef;
3933        apply_decimal_arithmetic_op(
3934            &schema,
3935            &int32_array,
3936            &decimal_array,
3937            Operator::Plus,
3938            expect,
3939        )
3940        .unwrap();
3941
3942        // subtract: decimal array subtract int32 array
3943        let schema = Arc::new(Schema::new(vec![
3944            Field::new("a", DataType::Decimal128(10, 2), true),
3945            Field::new("b", DataType::Int32, true),
3946        ]));
3947        let expect = Arc::new(create_decimal_array(
3948            &[Some(-12177), None, Some(-12178), Some(-12276)],
3949            13,
3950            2,
3951        )) as ArrayRef;
3952        apply_decimal_arithmetic_op(
3953            &schema,
3954            &decimal_array,
3955            &int32_array,
3956            Operator::Minus,
3957            expect,
3958        )
3959        .unwrap();
3960
3961        // multiply: decimal array multiply int32 array
3962        let expect = Arc::new(create_decimal_array(
3963            &[Some(15129), None, Some(15006), Some(15376)],
3964            21,
3965            2,
3966        )) as ArrayRef;
3967        apply_decimal_arithmetic_op(
3968            &schema,
3969            &decimal_array,
3970            &int32_array,
3971            Operator::Multiply,
3972            expect,
3973        )
3974        .unwrap();
3975
3976        // divide: int32 array divide decimal array
3977        let schema = Arc::new(Schema::new(vec![
3978            Field::new("a", DataType::Int32, true),
3979            Field::new("b", DataType::Decimal128(10, 2), true),
3980        ]));
3981        let expect = Arc::new(create_decimal_array(
3982            &[Some(1000000), None, Some(1008196), Some(1000000)],
3983            16,
3984            4,
3985        )) as ArrayRef;
3986        apply_decimal_arithmetic_op(
3987            &schema,
3988            &int32_array,
3989            &decimal_array,
3990            Operator::Divide,
3991            expect,
3992        )
3993        .unwrap();
3994
3995        // modulus: int32 array modulus decimal array
3996        let schema = Arc::new(Schema::new(vec![
3997            Field::new("a", DataType::Int32, true),
3998            Field::new("b", DataType::Decimal128(10, 2), true),
3999        ]));
4000        let expect = Arc::new(create_decimal_array(
4001            &[Some(000), None, Some(100), Some(000)],
4002            10,
4003            2,
4004        )) as ArrayRef;
4005        apply_decimal_arithmetic_op(
4006            &schema,
4007            &int32_array,
4008            &decimal_array,
4009            Operator::Modulo,
4010            expect,
4011        )
4012        .unwrap();
4013
4014        Ok(())
4015    }
4016
4017    #[test]
4018    fn arithmetic_decimal_float_expr_test() -> Result<()> {
4019        let schema = Arc::new(Schema::new(vec![
4020            Field::new("a", DataType::Float64, true),
4021            Field::new("b", DataType::Decimal128(10, 2), true),
4022        ]));
4023        let value: i128 = 123;
4024        let decimal_array = Arc::new(create_decimal_array(
4025            &[
4026                Some(value), // 1.23
4027                None,
4028                Some(value - 1), // 1.22
4029                Some(value + 1), // 1.24
4030            ],
4031            10,
4032            2,
4033        )) as ArrayRef;
4034        let float64_array = Arc::new(Float64Array::from(vec![
4035            Some(123.0),
4036            Some(122.0),
4037            Some(123.0),
4038            Some(124.0),
4039        ])) as ArrayRef;
4040
4041        // add: float64 array add decimal array
4042        let expect = Arc::new(Float64Array::from(vec![
4043            Some(124.23),
4044            None,
4045            Some(124.22),
4046            Some(125.24),
4047        ])) as ArrayRef;
4048        apply_decimal_arithmetic_op(
4049            &schema,
4050            &float64_array,
4051            &decimal_array,
4052            Operator::Plus,
4053            expect,
4054        )
4055        .unwrap();
4056
4057        // subtract: decimal array subtract float64 array
4058        let schema = Arc::new(Schema::new(vec![
4059            Field::new("a", DataType::Float64, true),
4060            Field::new("b", DataType::Decimal128(10, 2), true),
4061        ]));
4062        let expect = Arc::new(Float64Array::from(vec![
4063            Some(121.77),
4064            None,
4065            Some(121.78),
4066            Some(122.76),
4067        ])) as ArrayRef;
4068        apply_decimal_arithmetic_op(
4069            &schema,
4070            &float64_array,
4071            &decimal_array,
4072            Operator::Minus,
4073            expect,
4074        )
4075        .unwrap();
4076
4077        // multiply: decimal array multiply float64 array
4078        let expect = Arc::new(Float64Array::from(vec![
4079            Some(151.29),
4080            None,
4081            Some(150.06),
4082            Some(153.76),
4083        ])) as ArrayRef;
4084        apply_decimal_arithmetic_op(
4085            &schema,
4086            &float64_array,
4087            &decimal_array,
4088            Operator::Multiply,
4089            expect,
4090        )
4091        .unwrap();
4092
4093        // divide: float64 array divide decimal array
4094        let schema = Arc::new(Schema::new(vec![
4095            Field::new("a", DataType::Float64, true),
4096            Field::new("b", DataType::Decimal128(10, 2), true),
4097        ]));
4098        let expect = Arc::new(Float64Array::from(vec![
4099            Some(100.0),
4100            None,
4101            Some(100.81967213114754),
4102            Some(100.0),
4103        ])) as ArrayRef;
4104        apply_decimal_arithmetic_op(
4105            &schema,
4106            &float64_array,
4107            &decimal_array,
4108            Operator::Divide,
4109            expect,
4110        )
4111        .unwrap();
4112
4113        // modulus: float64 array modulus decimal array
4114        let schema = Arc::new(Schema::new(vec![
4115            Field::new("a", DataType::Float64, true),
4116            Field::new("b", DataType::Decimal128(10, 2), true),
4117        ]));
4118        let expect = Arc::new(Float64Array::from(vec![
4119            Some(1.7763568394002505e-15),
4120            None,
4121            Some(1.0000000000000027),
4122            Some(8.881784197001252e-16),
4123        ])) as ArrayRef;
4124        apply_decimal_arithmetic_op(
4125            &schema,
4126            &float64_array,
4127            &decimal_array,
4128            Operator::Modulo,
4129            expect,
4130        )
4131        .unwrap();
4132
4133        Ok(())
4134    }
4135
4136    #[test]
4137    fn arithmetic_divide_zero() -> Result<()> {
4138        // other data type
4139        let schema = Arc::new(Schema::new(vec![
4140            Field::new("a", DataType::Int32, true),
4141            Field::new("b", DataType::Int32, true),
4142        ]));
4143        let a = Arc::new(Int32Array::from(vec![100]));
4144        let b = Arc::new(Int32Array::from(vec![0]));
4145
4146        let err = apply_arithmetic::<Int32Type>(
4147            schema,
4148            vec![a, b],
4149            Operator::Divide,
4150            Int32Array::from(vec![Some(4), Some(8), Some(16), Some(32), Some(64)]),
4151        )
4152        .unwrap_err();
4153
4154        let _expected = plan_datafusion_err!("Divide by zero");
4155
4156        assert!(matches!(err, ref _expected), "{err}");
4157
4158        // decimal
4159        let schema = Arc::new(Schema::new(vec![
4160            Field::new("a", DataType::Decimal128(25, 3), true),
4161            Field::new("b", DataType::Decimal128(25, 3), true),
4162        ]));
4163        let left_decimal_array = Arc::new(create_decimal_array(&[Some(1234567)], 25, 3));
4164        let right_decimal_array = Arc::new(create_decimal_array(&[Some(0)], 25, 3));
4165
4166        let err = apply_arithmetic::<Decimal128Type>(
4167            schema,
4168            vec![left_decimal_array, right_decimal_array],
4169            Operator::Divide,
4170            create_decimal_array(
4171                &[Some(12345670000000000000000000000000000), None],
4172                38,
4173                29,
4174            ),
4175        )
4176        .unwrap_err();
4177
4178        assert!(matches!(err, ref _expected), "{err}");
4179
4180        Ok(())
4181    }
4182
4183    #[test]
4184    fn bitwise_array_test() -> Result<()> {
4185        let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4186        let right =
4187            Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;
4188        let mut result = bitwise_and_dyn(Arc::clone(&left), Arc::clone(&right))?;
4189        let expected = Int32Array::from(vec![Some(0), None, Some(3)]);
4190        assert_eq!(result.as_ref(), &expected);
4191
4192        result = bitwise_or_dyn(Arc::clone(&left), Arc::clone(&right))?;
4193        let expected = Int32Array::from(vec![Some(13), None, Some(15)]);
4194        assert_eq!(result.as_ref(), &expected);
4195
4196        result = bitwise_xor_dyn(Arc::clone(&left), Arc::clone(&right))?;
4197        let expected = Int32Array::from(vec![Some(13), None, Some(12)]);
4198        assert_eq!(result.as_ref(), &expected);
4199
4200        let left =
4201            Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4202        let right =
4203            Arc::new(UInt32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;
4204        let mut result = bitwise_and_dyn(Arc::clone(&left), Arc::clone(&right))?;
4205        let expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
4206        assert_eq!(result.as_ref(), &expected);
4207
4208        result = bitwise_or_dyn(Arc::clone(&left), Arc::clone(&right))?;
4209        let expected = UInt32Array::from(vec![Some(13), None, Some(15)]);
4210        assert_eq!(result.as_ref(), &expected);
4211
4212        result = bitwise_xor_dyn(Arc::clone(&left), Arc::clone(&right))?;
4213        let expected = UInt32Array::from(vec![Some(13), None, Some(12)]);
4214        assert_eq!(result.as_ref(), &expected);
4215
4216        Ok(())
4217    }
4218
4219    #[test]
4220    fn bitwise_shift_array_test() -> Result<()> {
4221        let input = Arc::new(Int32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
4222        let modules =
4223            Arc::new(Int32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
4224        let mut result =
4225            bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4226
4227        let expected = Int32Array::from(vec![Some(8), None, Some(2560)]);
4228        assert_eq!(result.as_ref(), &expected);
4229
4230        result = bitwise_shift_right_dyn(Arc::clone(&result), Arc::clone(&modules))?;
4231        assert_eq!(result.as_ref(), &input);
4232
4233        let input =
4234            Arc::new(UInt32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
4235        let modules =
4236            Arc::new(UInt32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
4237        let mut result =
4238            bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4239
4240        let expected = UInt32Array::from(vec![Some(8), None, Some(2560)]);
4241        assert_eq!(result.as_ref(), &expected);
4242
4243        result = bitwise_shift_right_dyn(Arc::clone(&result), Arc::clone(&modules))?;
4244        assert_eq!(result.as_ref(), &input);
4245        Ok(())
4246    }
4247
4248    #[test]
4249    fn bitwise_shift_array_overflow_test() -> Result<()> {
4250        let input = Arc::new(Int32Array::from(vec![Some(2)])) as ArrayRef;
4251        let modules = Arc::new(Int32Array::from(vec![Some(100)])) as ArrayRef;
4252        let result = bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4253
4254        let expected = Int32Array::from(vec![Some(32)]);
4255        assert_eq!(result.as_ref(), &expected);
4256
4257        let input = Arc::new(UInt32Array::from(vec![Some(2)])) as ArrayRef;
4258        let modules = Arc::new(UInt32Array::from(vec![Some(100)])) as ArrayRef;
4259        let result = bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4260
4261        let expected = UInt32Array::from(vec![Some(32)]);
4262        assert_eq!(result.as_ref(), &expected);
4263        Ok(())
4264    }
4265
4266    #[test]
4267    fn bitwise_scalar_test() -> Result<()> {
4268        let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4269        let right = ScalarValue::from(3i32);
4270        let mut result = bitwise_and_dyn_scalar(&left, right.clone()).unwrap()?;
4271        let expected = Int32Array::from(vec![Some(0), None, Some(3)]);
4272        assert_eq!(result.as_ref(), &expected);
4273
4274        result = bitwise_or_dyn_scalar(&left, right.clone()).unwrap()?;
4275        let expected = Int32Array::from(vec![Some(15), None, Some(11)]);
4276        assert_eq!(result.as_ref(), &expected);
4277
4278        result = bitwise_xor_dyn_scalar(&left, right).unwrap()?;
4279        let expected = Int32Array::from(vec![Some(15), None, Some(8)]);
4280        assert_eq!(result.as_ref(), &expected);
4281
4282        let left =
4283            Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4284        let right = ScalarValue::from(3u32);
4285        let mut result = bitwise_and_dyn_scalar(&left, right.clone()).unwrap()?;
4286        let expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
4287        assert_eq!(result.as_ref(), &expected);
4288
4289        result = bitwise_or_dyn_scalar(&left, right.clone()).unwrap()?;
4290        let expected = UInt32Array::from(vec![Some(15), None, Some(11)]);
4291        assert_eq!(result.as_ref(), &expected);
4292
4293        result = bitwise_xor_dyn_scalar(&left, right).unwrap()?;
4294        let expected = UInt32Array::from(vec![Some(15), None, Some(8)]);
4295        assert_eq!(result.as_ref(), &expected);
4296        Ok(())
4297    }
4298
4299    #[test]
4300    fn bitwise_shift_scalar_test() -> Result<()> {
4301        let input = Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
4302        let module = ScalarValue::from(10i32);
4303        let mut result =
4304            bitwise_shift_left_dyn_scalar(&input, module.clone()).unwrap()?;
4305
4306        let expected = Int32Array::from(vec![Some(2048), None, Some(4096)]);
4307        assert_eq!(result.as_ref(), &expected);
4308
4309        result = bitwise_shift_right_dyn_scalar(&result, module).unwrap()?;
4310        assert_eq!(result.as_ref(), &input);
4311
4312        let input = Arc::new(UInt32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
4313        let module = ScalarValue::from(10u32);
4314        let mut result =
4315            bitwise_shift_left_dyn_scalar(&input, module.clone()).unwrap()?;
4316
4317        let expected = UInt32Array::from(vec![Some(2048), None, Some(4096)]);
4318        assert_eq!(result.as_ref(), &expected);
4319
4320        result = bitwise_shift_right_dyn_scalar(&result, module).unwrap()?;
4321        assert_eq!(result.as_ref(), &input);
4322        Ok(())
4323    }
4324
4325    #[test]
4326    fn test_display_and_or_combo() {
4327        let expr = BinaryExpr::new(
4328            Arc::new(BinaryExpr::new(
4329                lit(ScalarValue::from(1)),
4330                Operator::And,
4331                lit(ScalarValue::from(2)),
4332            )),
4333            Operator::And,
4334            Arc::new(BinaryExpr::new(
4335                lit(ScalarValue::from(3)),
4336                Operator::And,
4337                lit(ScalarValue::from(4)),
4338            )),
4339        );
4340        assert_eq!(expr.to_string(), "1 AND 2 AND 3 AND 4");
4341
4342        let expr = BinaryExpr::new(
4343            Arc::new(BinaryExpr::new(
4344                lit(ScalarValue::from(1)),
4345                Operator::Or,
4346                lit(ScalarValue::from(2)),
4347            )),
4348            Operator::Or,
4349            Arc::new(BinaryExpr::new(
4350                lit(ScalarValue::from(3)),
4351                Operator::Or,
4352                lit(ScalarValue::from(4)),
4353            )),
4354        );
4355        assert_eq!(expr.to_string(), "1 OR 2 OR 3 OR 4");
4356
4357        let expr = BinaryExpr::new(
4358            Arc::new(BinaryExpr::new(
4359                lit(ScalarValue::from(1)),
4360                Operator::And,
4361                lit(ScalarValue::from(2)),
4362            )),
4363            Operator::Or,
4364            Arc::new(BinaryExpr::new(
4365                lit(ScalarValue::from(3)),
4366                Operator::And,
4367                lit(ScalarValue::from(4)),
4368            )),
4369        );
4370        assert_eq!(expr.to_string(), "1 AND 2 OR 3 AND 4");
4371
4372        let expr = BinaryExpr::new(
4373            Arc::new(BinaryExpr::new(
4374                lit(ScalarValue::from(1)),
4375                Operator::Or,
4376                lit(ScalarValue::from(2)),
4377            )),
4378            Operator::And,
4379            Arc::new(BinaryExpr::new(
4380                lit(ScalarValue::from(3)),
4381                Operator::Or,
4382                lit(ScalarValue::from(4)),
4383            )),
4384        );
4385        assert_eq!(expr.to_string(), "(1 OR 2) AND (3 OR 4)");
4386    }
4387
4388    #[test]
4389    fn test_to_result_type_array() {
4390        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
4391        let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
4392        let dictionary =
4393            Arc::new(DictionaryArray::try_new(keys, values).unwrap()) as ArrayRef;
4394
4395        // Casting Dictionary to Int32
4396        let casted = to_result_type_array(
4397            &Operator::Plus,
4398            Arc::clone(&dictionary),
4399            &DataType::Int32,
4400        )
4401        .unwrap();
4402        assert_eq!(
4403            &casted,
4404            &(Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]))
4405                as ArrayRef)
4406        );
4407
4408        // Array has same datatype as result type, no casting
4409        let casted = to_result_type_array(
4410            &Operator::Plus,
4411            Arc::clone(&dictionary),
4412            dictionary.data_type(),
4413        )
4414        .unwrap();
4415        assert_eq!(&casted, &dictionary);
4416
4417        // Not numerical operator, no casting
4418        let casted = to_result_type_array(
4419            &Operator::Eq,
4420            Arc::clone(&dictionary),
4421            &DataType::Int32,
4422        )
4423        .unwrap();
4424        assert_eq!(&casted, &dictionary);
4425    }
4426
4427    #[test]
4428    fn test_add_with_overflow() -> Result<()> {
4429        // create test data
4430        let l = Arc::new(Int32Array::from(vec![1, i32::MAX]));
4431        let r = Arc::new(Int32Array::from(vec![2, 1]));
4432        let schema = Arc::new(Schema::new(vec![
4433            Field::new("l", DataType::Int32, false),
4434            Field::new("r", DataType::Int32, false),
4435        ]));
4436        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4437
4438        // create expression
4439        let expr = BinaryExpr::new(
4440            Arc::new(Column::new("l", 0)),
4441            Operator::Plus,
4442            Arc::new(Column::new("r", 1)),
4443        )
4444        .with_fail_on_overflow(true);
4445
4446        // evaluate expression
4447        let result = expr.evaluate(&batch);
4448        assert!(result
4449            .err()
4450            .unwrap()
4451            .to_string()
4452            .contains("Overflow happened on: 2147483647 + 1"));
4453        Ok(())
4454    }
4455
4456    #[test]
4457    fn test_subtract_with_overflow() -> Result<()> {
4458        // create test data
4459        let l = Arc::new(Int32Array::from(vec![1, i32::MIN]));
4460        let r = Arc::new(Int32Array::from(vec![2, 1]));
4461        let schema = Arc::new(Schema::new(vec![
4462            Field::new("l", DataType::Int32, false),
4463            Field::new("r", DataType::Int32, false),
4464        ]));
4465        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4466
4467        // create expression
4468        let expr = BinaryExpr::new(
4469            Arc::new(Column::new("l", 0)),
4470            Operator::Minus,
4471            Arc::new(Column::new("r", 1)),
4472        )
4473        .with_fail_on_overflow(true);
4474
4475        // evaluate expression
4476        let result = expr.evaluate(&batch);
4477        assert!(result
4478            .err()
4479            .unwrap()
4480            .to_string()
4481            .contains("Overflow happened on: -2147483648 - 1"));
4482        Ok(())
4483    }
4484
4485    #[test]
4486    fn test_mul_with_overflow() -> Result<()> {
4487        // create test data
4488        let l = Arc::new(Int32Array::from(vec![1, i32::MAX]));
4489        let r = Arc::new(Int32Array::from(vec![2, 2]));
4490        let schema = Arc::new(Schema::new(vec![
4491            Field::new("l", DataType::Int32, false),
4492            Field::new("r", DataType::Int32, false),
4493        ]));
4494        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4495
4496        // create expression
4497        let expr = BinaryExpr::new(
4498            Arc::new(Column::new("l", 0)),
4499            Operator::Multiply,
4500            Arc::new(Column::new("r", 1)),
4501        )
4502        .with_fail_on_overflow(true);
4503
4504        // evaluate expression
4505        let result = expr.evaluate(&batch);
4506        assert!(result
4507            .err()
4508            .unwrap()
4509            .to_string()
4510            .contains("Overflow happened on: 2147483647 * 2"));
4511        Ok(())
4512    }
4513
4514    /// Test helper for SIMILAR TO binary operation
4515    fn apply_similar_to(
4516        schema: &SchemaRef,
4517        va: Vec<&str>,
4518        vb: Vec<&str>,
4519        negated: bool,
4520        case_insensitive: bool,
4521        expected: &BooleanArray,
4522    ) -> Result<()> {
4523        let a = StringArray::from(va);
4524        let b = StringArray::from(vb);
4525        let op = similar_to(
4526            negated,
4527            case_insensitive,
4528            col("a", schema)?,
4529            col("b", schema)?,
4530        )?;
4531        let batch =
4532            RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(a), Arc::new(b)])?;
4533        let result = op
4534            .evaluate(&batch)?
4535            .into_array(batch.num_rows())
4536            .expect("Failed to convert to array");
4537        assert_eq!(result.as_ref(), expected);
4538
4539        Ok(())
4540    }
4541
4542    #[test]
4543    fn test_similar_to() {
4544        let schema = Arc::new(Schema::new(vec![
4545            Field::new("a", DataType::Utf8, false),
4546            Field::new("b", DataType::Utf8, false),
4547        ]));
4548
4549        let expected = [Some(true), Some(false)].iter().collect();
4550        // case-sensitive
4551        apply_similar_to(
4552            &schema,
4553            vec!["hello world", "Hello World"],
4554            vec!["hello.*", "hello.*"],
4555            false,
4556            false,
4557            &expected,
4558        )
4559        .unwrap();
4560        // case-insensitive
4561        apply_similar_to(
4562            &schema,
4563            vec!["hello world", "bye"],
4564            vec!["hello.*", "hello.*"],
4565            false,
4566            true,
4567            &expected,
4568        )
4569        .unwrap();
4570    }
4571
4572    pub fn binary_expr(
4573        left: Arc<dyn PhysicalExpr>,
4574        op: Operator,
4575        right: Arc<dyn PhysicalExpr>,
4576        schema: &Schema,
4577    ) -> Result<BinaryExpr> {
4578        Ok(binary_op(left, op, right, schema)?
4579            .as_any()
4580            .downcast_ref::<BinaryExpr>()
4581            .unwrap()
4582            .clone())
4583    }
4584
4585    /// Test for Uniform-Uniform, Unknown-Uniform, Uniform-Unknown and Unknown-Unknown evaluation.
4586    #[test]
4587    fn test_evaluate_statistics_combination_of_range_holders() -> Result<()> {
4588        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4589        let a = Arc::new(Column::new("a", 0)) as _;
4590        let b = lit(ScalarValue::from(12.0));
4591
4592        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4593        let right_interval = Interval::make(Some(12.0), Some(36.0))?;
4594        let (left_mean, right_mean) = (ScalarValue::from(6.0), ScalarValue::from(24.0));
4595        let (left_med, right_med) = (ScalarValue::from(6.0), ScalarValue::from(24.0));
4596
4597        for children in [
4598            vec![
4599                &Distribution::new_uniform(left_interval.clone())?,
4600                &Distribution::new_uniform(right_interval.clone())?,
4601            ],
4602            vec![
4603                &Distribution::new_generic(
4604                    left_mean.clone(),
4605                    left_med.clone(),
4606                    ScalarValue::Float64(None),
4607                    left_interval.clone(),
4608                )?,
4609                &Distribution::new_uniform(right_interval.clone())?,
4610            ],
4611            vec![
4612                &Distribution::new_uniform(right_interval.clone())?,
4613                &Distribution::new_generic(
4614                    right_mean.clone(),
4615                    right_med.clone(),
4616                    ScalarValue::Float64(None),
4617                    right_interval.clone(),
4618                )?,
4619            ],
4620            vec![
4621                &Distribution::new_generic(
4622                    left_mean.clone(),
4623                    left_med.clone(),
4624                    ScalarValue::Float64(None),
4625                    left_interval.clone(),
4626                )?,
4627                &Distribution::new_generic(
4628                    right_mean.clone(),
4629                    right_med.clone(),
4630                    ScalarValue::Float64(None),
4631                    right_interval.clone(),
4632                )?,
4633            ],
4634        ] {
4635            let ops = vec![
4636                Operator::Plus,
4637                Operator::Minus,
4638                Operator::Multiply,
4639                Operator::Divide,
4640            ];
4641
4642            for op in ops {
4643                let expr = binary_expr(Arc::clone(&a), op, Arc::clone(&b), schema)?;
4644                assert_eq!(
4645                    expr.evaluate_statistics(&children)?,
4646                    new_generic_from_binary_op(&op, children[0], children[1])?
4647                );
4648            }
4649        }
4650        Ok(())
4651    }
4652
4653    #[test]
4654    fn test_evaluate_statistics_bernoulli() -> Result<()> {
4655        let schema = &Schema::new(vec![
4656            Field::new("a", DataType::Int64, false),
4657            Field::new("b", DataType::Int64, false),
4658        ]);
4659        let a = Arc::new(Column::new("a", 0)) as _;
4660        let b = Arc::new(Column::new("b", 1)) as _;
4661        let eq = Arc::new(binary_expr(
4662            Arc::clone(&a),
4663            Operator::Eq,
4664            Arc::clone(&b),
4665            schema,
4666        )?);
4667        let neq = Arc::new(binary_expr(a, Operator::NotEq, b, schema)?);
4668
4669        let left_stat = &Distribution::new_uniform(Interval::make(Some(0), Some(7))?)?;
4670        let right_stat = &Distribution::new_uniform(Interval::make(Some(4), Some(11))?)?;
4671
4672        // Intervals: [0, 7], [4, 11].
4673        // The intersection is [4, 7], so the probability of equality is 4 / 64 = 1 / 16.
4674        assert_eq!(
4675            eq.evaluate_statistics(&[left_stat, right_stat])?,
4676            Distribution::new_bernoulli(ScalarValue::from(1.0 / 16.0))?
4677        );
4678
4679        // The probability of being distinct is 1 - 1 / 16 = 15 / 16.
4680        assert_eq!(
4681            neq.evaluate_statistics(&[left_stat, right_stat])?,
4682            Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))?
4683        );
4684
4685        Ok(())
4686    }
4687
4688    #[test]
4689    fn test_propagate_statistics_combination_of_range_holders_arithmetic() -> Result<()> {
4690        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4691        let a = Arc::new(Column::new("a", 0)) as _;
4692        let b = lit(ScalarValue::from(12.0));
4693
4694        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4695        let right_interval = Interval::make(Some(12.0), Some(36.0))?;
4696
4697        let parent = Distribution::new_uniform(Interval::make(Some(-432.), Some(432.))?)?;
4698        let children = vec![
4699            vec![
4700                Distribution::new_uniform(left_interval.clone())?,
4701                Distribution::new_uniform(right_interval.clone())?,
4702            ],
4703            vec![
4704                Distribution::new_generic(
4705                    ScalarValue::from(6.),
4706                    ScalarValue::from(6.),
4707                    ScalarValue::Float64(None),
4708                    left_interval.clone(),
4709                )?,
4710                Distribution::new_uniform(right_interval.clone())?,
4711            ],
4712            vec![
4713                Distribution::new_uniform(left_interval.clone())?,
4714                Distribution::new_generic(
4715                    ScalarValue::from(12.),
4716                    ScalarValue::from(12.),
4717                    ScalarValue::Float64(None),
4718                    right_interval.clone(),
4719                )?,
4720            ],
4721            vec![
4722                Distribution::new_generic(
4723                    ScalarValue::from(6.),
4724                    ScalarValue::from(6.),
4725                    ScalarValue::Float64(None),
4726                    left_interval.clone(),
4727                )?,
4728                Distribution::new_generic(
4729                    ScalarValue::from(12.),
4730                    ScalarValue::from(12.),
4731                    ScalarValue::Float64(None),
4732                    right_interval.clone(),
4733                )?,
4734            ],
4735        ];
4736
4737        let ops = vec![
4738            Operator::Plus,
4739            Operator::Minus,
4740            Operator::Multiply,
4741            Operator::Divide,
4742        ];
4743
4744        for child_view in children {
4745            let child_refs = child_view.iter().collect::<Vec<_>>();
4746            for op in &ops {
4747                let expr = binary_expr(Arc::clone(&a), *op, Arc::clone(&b), schema)?;
4748                assert_eq!(
4749                    expr.propagate_statistics(&parent, child_refs.as_slice())?,
4750                    Some(child_view.clone())
4751                );
4752            }
4753        }
4754        Ok(())
4755    }
4756
4757    #[test]
4758    fn test_propagate_statistics_combination_of_range_holders_comparison() -> Result<()> {
4759        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4760        let a = Arc::new(Column::new("a", 0)) as _;
4761        let b = lit(ScalarValue::from(12.0));
4762
4763        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4764        let right_interval = Interval::make(Some(6.0), Some(18.0))?;
4765
4766        let one = ScalarValue::from(1.0);
4767        let parent = Distribution::new_bernoulli(one)?;
4768        let children = vec![
4769            vec![
4770                Distribution::new_uniform(left_interval.clone())?,
4771                Distribution::new_uniform(right_interval.clone())?,
4772            ],
4773            vec![
4774                Distribution::new_generic(
4775                    ScalarValue::from(6.),
4776                    ScalarValue::from(6.),
4777                    ScalarValue::Float64(None),
4778                    left_interval.clone(),
4779                )?,
4780                Distribution::new_uniform(right_interval.clone())?,
4781            ],
4782            vec![
4783                Distribution::new_uniform(left_interval.clone())?,
4784                Distribution::new_generic(
4785                    ScalarValue::from(12.),
4786                    ScalarValue::from(12.),
4787                    ScalarValue::Float64(None),
4788                    right_interval.clone(),
4789                )?,
4790            ],
4791            vec![
4792                Distribution::new_generic(
4793                    ScalarValue::from(6.),
4794                    ScalarValue::from(6.),
4795                    ScalarValue::Float64(None),
4796                    left_interval.clone(),
4797                )?,
4798                Distribution::new_generic(
4799                    ScalarValue::from(12.),
4800                    ScalarValue::from(12.),
4801                    ScalarValue::Float64(None),
4802                    right_interval.clone(),
4803                )?,
4804            ],
4805        ];
4806
4807        let ops = vec![
4808            Operator::Eq,
4809            Operator::Gt,
4810            Operator::GtEq,
4811            Operator::Lt,
4812            Operator::LtEq,
4813        ];
4814
4815        for child_view in children {
4816            let child_refs = child_view.iter().collect::<Vec<_>>();
4817            for op in &ops {
4818                let expr = binary_expr(Arc::clone(&a), *op, Arc::clone(&b), schema)?;
4819                assert!(expr
4820                    .propagate_statistics(&parent, child_refs.as_slice())?
4821                    .is_some());
4822            }
4823        }
4824
4825        Ok(())
4826    }
4827
4828    #[test]
4829    fn test_fmt_sql() -> Result<()> {
4830        let schema = Schema::new(vec![
4831            Field::new("a", DataType::Int32, false),
4832            Field::new("b", DataType::Int32, false),
4833        ]);
4834
4835        // Test basic binary expressions
4836        let simple_expr = binary_expr(
4837            col("a", &schema)?,
4838            Operator::Plus,
4839            col("b", &schema)?,
4840            &schema,
4841        )?;
4842        let display_string = simple_expr.to_string();
4843        assert_eq!(display_string, "a@0 + b@1");
4844        let sql_string = fmt_sql(&simple_expr).to_string();
4845        assert_eq!(sql_string, "a + b");
4846
4847        // Test nested expressions with different operator precedence
4848        let nested_expr = binary_expr(
4849            Arc::new(binary_expr(
4850                col("a", &schema)?,
4851                Operator::Plus,
4852                col("b", &schema)?,
4853                &schema,
4854            )?),
4855            Operator::Multiply,
4856            col("b", &schema)?,
4857            &schema,
4858        )?;
4859        let display_string = nested_expr.to_string();
4860        assert_eq!(display_string, "(a@0 + b@1) * b@1");
4861        let sql_string = fmt_sql(&nested_expr).to_string();
4862        assert_eq!(sql_string, "(a + b) * b");
4863
4864        // Test nested expressions with same operator precedence
4865        let nested_same_prec = binary_expr(
4866            Arc::new(binary_expr(
4867                col("a", &schema)?,
4868                Operator::Plus,
4869                col("b", &schema)?,
4870                &schema,
4871            )?),
4872            Operator::Plus,
4873            col("b", &schema)?,
4874            &schema,
4875        )?;
4876        let display_string = nested_same_prec.to_string();
4877        assert_eq!(display_string, "a@0 + b@1 + b@1");
4878        let sql_string = fmt_sql(&nested_same_prec).to_string();
4879        assert_eq!(sql_string, "a + b + b");
4880
4881        // Test with literals
4882        let lit_expr = binary_expr(
4883            col("a", &schema)?,
4884            Operator::Eq,
4885            lit(ScalarValue::Int32(Some(42))),
4886            &schema,
4887        )?;
4888        let display_string = lit_expr.to_string();
4889        assert_eq!(display_string, "a@0 = 42");
4890        let sql_string = fmt_sql(&lit_expr).to_string();
4891        assert_eq!(sql_string, "a = 42");
4892
4893        Ok(())
4894    }
4895
4896    #[test]
4897    fn test_check_short_circuit() {
4898        use crate::planner::logical2physical;
4899        use datafusion_expr::col as logical_col;
4900        use datafusion_expr::lit;
4901        let schema = Arc::new(Schema::new(vec![
4902            Field::new("a", DataType::Int32, false),
4903            Field::new("b", DataType::Int32, false),
4904        ]));
4905        let a_array = Int32Array::from(vec![1, 3, 4, 5, 6]);
4906        let b_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
4907        let batch = RecordBatch::try_new(
4908            Arc::clone(&schema),
4909            vec![Arc::new(a_array), Arc::new(b_array)],
4910        )
4911        .unwrap();
4912
4913        // op: AND left: all false
4914        let left_expr = logical2physical(&logical_col("a").eq(lit(2)), &schema);
4915        let left_value = left_expr.evaluate(&batch).unwrap();
4916        assert!(check_short_circuit(&left_value, &Operator::And));
4917        // op: AND left: not all false
4918        let left_expr = logical2physical(&logical_col("a").eq(lit(3)), &schema);
4919        let left_value = left_expr.evaluate(&batch).unwrap();
4920        assert!(!check_short_circuit(&left_value, &Operator::And));
4921        // op: OR left: all true
4922        let left_expr = logical2physical(&logical_col("a").gt(lit(0)), &schema);
4923        let left_value = left_expr.evaluate(&batch).unwrap();
4924        assert!(check_short_circuit(&left_value, &Operator::Or));
4925        // op: OR left: not all true
4926        let left_expr = logical2physical(&logical_col("a").gt(lit(2)), &schema);
4927        let left_value = left_expr.evaluate(&batch).unwrap();
4928        assert!(!check_short_circuit(&left_value, &Operator::Or));
4929    }
4930}