datafusion_physical_expr/expressions/
binary.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18mod kernels;
19
20use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
21use crate::PhysicalExpr;
22use std::hash::Hash;
23use std::{any::Any, sync::Arc};
24
25use arrow::array::*;
26use arrow::compute::kernels::boolean::{and_kleene, or_kleene};
27use arrow::compute::kernels::cmp::*;
28use arrow::compute::kernels::concat_elements::concat_elements_utf8;
29use arrow::compute::{
30    cast, filter_record_batch, ilike, like, nilike, nlike, SlicesIterator,
31};
32use arrow::datatypes::*;
33use arrow::error::ArrowError;
34use datafusion_common::cast::as_boolean_array;
35use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
36use datafusion_expr::binary::BinaryTypeCoercer;
37use datafusion_expr::interval_arithmetic::{apply_operator, Interval};
38use datafusion_expr::sort_properties::ExprProperties;
39use datafusion_expr::statistics::Distribution::{Bernoulli, Gaussian};
40use datafusion_expr::statistics::{
41    combine_bernoullis, combine_gaussians, create_bernoulli_from_comparison,
42    new_generic_from_binary_op, Distribution,
43};
44use datafusion_expr::{ColumnarValue, Operator};
45use datafusion_physical_expr_common::datum::{apply, apply_cmp, apply_cmp_for_nested};
46
47use kernels::{
48    bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
49    bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
50    bitwise_shift_right_dyn_scalar, bitwise_xor_dyn, bitwise_xor_dyn_scalar,
51    concat_elements_utf8view, regex_match_dyn, regex_match_dyn_scalar,
52};
53
54/// Binary expression
55#[derive(Debug, Clone, Eq)]
56pub struct BinaryExpr {
57    left: Arc<dyn PhysicalExpr>,
58    op: Operator,
59    right: Arc<dyn PhysicalExpr>,
60    /// Specifies whether an error is returned on overflow or not
61    fail_on_overflow: bool,
62}
63
64// Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808
65impl PartialEq for BinaryExpr {
66    fn eq(&self, other: &Self) -> bool {
67        self.left.eq(&other.left)
68            && self.op.eq(&other.op)
69            && self.right.eq(&other.right)
70            && self.fail_on_overflow.eq(&other.fail_on_overflow)
71    }
72}
73impl Hash for BinaryExpr {
74    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
75        self.left.hash(state);
76        self.op.hash(state);
77        self.right.hash(state);
78        self.fail_on_overflow.hash(state);
79    }
80}
81
82impl BinaryExpr {
83    /// Create new binary expression
84    pub fn new(
85        left: Arc<dyn PhysicalExpr>,
86        op: Operator,
87        right: Arc<dyn PhysicalExpr>,
88    ) -> Self {
89        Self {
90            left,
91            op,
92            right,
93            fail_on_overflow: false,
94        }
95    }
96
97    /// Create new binary expression with explicit fail_on_overflow value
98    pub fn with_fail_on_overflow(self, fail_on_overflow: bool) -> Self {
99        Self {
100            left: self.left,
101            op: self.op,
102            right: self.right,
103            fail_on_overflow,
104        }
105    }
106
107    /// Get the left side of the binary expression
108    pub fn left(&self) -> &Arc<dyn PhysicalExpr> {
109        &self.left
110    }
111
112    /// Get the right side of the binary expression
113    pub fn right(&self) -> &Arc<dyn PhysicalExpr> {
114        &self.right
115    }
116
117    /// Get the operator for this binary expression
118    pub fn op(&self) -> &Operator {
119        &self.op
120    }
121}
122
123impl std::fmt::Display for BinaryExpr {
124    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
125        // Put parentheses around child binary expressions so that we can see the difference
126        // between `(a OR b) AND c` and `a OR (b AND c)`. We only insert parentheses when needed,
127        // based on operator precedence. For example, `(a AND b) OR c` and `a AND b OR c` are
128        // equivalent and the parentheses are not necessary.
129
130        fn write_child(
131            f: &mut std::fmt::Formatter,
132            expr: &dyn PhysicalExpr,
133            precedence: u8,
134        ) -> std::fmt::Result {
135            if let Some(child) = expr.as_any().downcast_ref::<BinaryExpr>() {
136                let p = child.op.precedence();
137                if p == 0 || p < precedence {
138                    write!(f, "({child})")?;
139                } else {
140                    write!(f, "{child}")?;
141                }
142            } else {
143                write!(f, "{expr}")?;
144            }
145
146            Ok(())
147        }
148
149        let precedence = self.op.precedence();
150        write_child(f, self.left.as_ref(), precedence)?;
151        write!(f, " {} ", self.op)?;
152        write_child(f, self.right.as_ref(), precedence)
153    }
154}
155
156/// Invoke a boolean kernel on a pair of arrays
157#[inline]
158fn boolean_op(
159    left: &dyn Array,
160    right: &dyn Array,
161    op: impl FnOnce(&BooleanArray, &BooleanArray) -> Result<BooleanArray, ArrowError>,
162) -> Result<Arc<dyn Array + 'static>, ArrowError> {
163    let ll = as_boolean_array(left).expect("boolean_op failed to downcast left array");
164    let rr = as_boolean_array(right).expect("boolean_op failed to downcast right array");
165    op(ll, rr).map(|t| Arc::new(t) as _)
166}
167
168impl PhysicalExpr for BinaryExpr {
169    /// Return a reference to Any that can be used for downcasting
170    fn as_any(&self) -> &dyn Any {
171        self
172    }
173
174    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
175        BinaryTypeCoercer::new(
176            &self.left.data_type(input_schema)?,
177            &self.op,
178            &self.right.data_type(input_schema)?,
179        )
180        .get_result_type()
181    }
182
183    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
184        Ok(self.left.nullable(input_schema)? || self.right.nullable(input_schema)?)
185    }
186
187    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
188        use arrow::compute::kernels::numeric::*;
189
190        // Evaluate left-hand side expression.
191        let lhs = self.left.evaluate(batch)?;
192
193        // Check if we can apply short-circuit evaluation.
194        match check_short_circuit(&lhs, &self.op) {
195            ShortCircuitStrategy::None => {}
196            ShortCircuitStrategy::ReturnLeft => return Ok(lhs),
197            ShortCircuitStrategy::ReturnRight => {
198                let rhs = self.right.evaluate(batch)?;
199                return Ok(rhs);
200            }
201            ShortCircuitStrategy::PreSelection(selection) => {
202                // The function `evaluate_selection` was not called for filtering and calculation,
203                // as it takes into account cases where the selection contains null values.
204                let batch = filter_record_batch(batch, selection)?;
205                let right_ret = self.right.evaluate(&batch)?;
206
207                match &right_ret {
208                    ColumnarValue::Array(array) => {
209                        // When the array on the right is all true or all false, skip the scatter process
210                        let boolean_array = array.as_boolean();
211                        let true_count = boolean_array.true_count();
212                        let length = boolean_array.len();
213                        if true_count == length {
214                            return Ok(lhs);
215                        } else if true_count == 0 && boolean_array.null_count() == 0 {
216                            // If the right-hand array is returned at this point,the lengths will be inconsistent;
217                            // returning a scalar can avoid this issue
218                            return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(
219                                Some(false),
220                            )));
221                        }
222
223                        return pre_selection_scatter(selection, Some(boolean_array));
224                    }
225                    ColumnarValue::Scalar(scalar) => {
226                        if let ScalarValue::Boolean(v) = scalar {
227                            // When the scalar is true or false, skip the scatter process
228                            if let Some(v) = v {
229                                if *v {
230                                    return Ok(lhs);
231                                } else {
232                                    return Ok(right_ret);
233                                }
234                            } else {
235                                return pre_selection_scatter(selection, None);
236                            }
237                        } else {
238                            return internal_err!(
239                                "Expected boolean scalar value, found: {right_ret:?}"
240                            );
241                        }
242                    }
243                }
244            }
245        }
246
247        let rhs = self.right.evaluate(batch)?;
248        let left_data_type = lhs.data_type();
249        let right_data_type = rhs.data_type();
250
251        let schema = batch.schema();
252        let input_schema = schema.as_ref();
253
254        if left_data_type.is_nested() {
255            if !left_data_type.equals_datatype(&right_data_type) {
256                return internal_err!("Cannot evaluate binary expression because of type mismatch: left {}, right {} ", left_data_type, right_data_type);
257            }
258            return apply_cmp_for_nested(self.op, &lhs, &rhs);
259        }
260
261        match self.op {
262            Operator::Plus if self.fail_on_overflow => return apply(&lhs, &rhs, add),
263            Operator::Plus => return apply(&lhs, &rhs, add_wrapping),
264            Operator::Minus if self.fail_on_overflow => return apply(&lhs, &rhs, sub),
265            Operator::Minus => return apply(&lhs, &rhs, sub_wrapping),
266            Operator::Multiply if self.fail_on_overflow => return apply(&lhs, &rhs, mul),
267            Operator::Multiply => return apply(&lhs, &rhs, mul_wrapping),
268            Operator::Divide => return apply(&lhs, &rhs, div),
269            Operator::Modulo => return apply(&lhs, &rhs, rem),
270            Operator::Eq => return apply_cmp(&lhs, &rhs, eq),
271            Operator::NotEq => return apply_cmp(&lhs, &rhs, neq),
272            Operator::Lt => return apply_cmp(&lhs, &rhs, lt),
273            Operator::Gt => return apply_cmp(&lhs, &rhs, gt),
274            Operator::LtEq => return apply_cmp(&lhs, &rhs, lt_eq),
275            Operator::GtEq => return apply_cmp(&lhs, &rhs, gt_eq),
276            Operator::IsDistinctFrom => return apply_cmp(&lhs, &rhs, distinct),
277            Operator::IsNotDistinctFrom => return apply_cmp(&lhs, &rhs, not_distinct),
278            Operator::LikeMatch => return apply_cmp(&lhs, &rhs, like),
279            Operator::ILikeMatch => return apply_cmp(&lhs, &rhs, ilike),
280            Operator::NotLikeMatch => return apply_cmp(&lhs, &rhs, nlike),
281            Operator::NotILikeMatch => return apply_cmp(&lhs, &rhs, nilike),
282            _ => {}
283        }
284
285        let result_type = self.data_type(input_schema)?;
286
287        // If the left-hand side is an array and the right-hand side is a non-null scalar, try the optimized kernel.
288        if let (ColumnarValue::Array(array), ColumnarValue::Scalar(ref scalar)) =
289            (&lhs, &rhs)
290        {
291            if !scalar.is_null() {
292                if let Some(result_array) =
293                    self.evaluate_array_scalar(array, scalar.clone())?
294                {
295                    let final_array = result_array
296                        .and_then(|a| to_result_type_array(&self.op, a, &result_type));
297                    return final_array.map(ColumnarValue::Array);
298                }
299            }
300        }
301
302        // if both arrays or both literals - extract arrays and continue execution
303        let (left, right) = (
304            lhs.into_array(batch.num_rows())?,
305            rhs.into_array(batch.num_rows())?,
306        );
307        self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type)
308            .map(ColumnarValue::Array)
309    }
310
311    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
312        vec![&self.left, &self.right]
313    }
314
315    fn with_new_children(
316        self: Arc<Self>,
317        children: Vec<Arc<dyn PhysicalExpr>>,
318    ) -> Result<Arc<dyn PhysicalExpr>> {
319        Ok(Arc::new(
320            BinaryExpr::new(Arc::clone(&children[0]), self.op, Arc::clone(&children[1]))
321                .with_fail_on_overflow(self.fail_on_overflow),
322        ))
323    }
324
325    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
326        // Get children intervals:
327        let left_interval = children[0];
328        let right_interval = children[1];
329        // Calculate current node's interval:
330        apply_operator(&self.op, left_interval, right_interval)
331    }
332
333    fn propagate_constraints(
334        &self,
335        interval: &Interval,
336        children: &[&Interval],
337    ) -> Result<Option<Vec<Interval>>> {
338        // Get children intervals.
339        let left_interval = children[0];
340        let right_interval = children[1];
341
342        if self.op.eq(&Operator::And) {
343            if interval.eq(&Interval::CERTAINLY_TRUE) {
344                // A certainly true logical conjunction can only derive from possibly
345                // true operands. Otherwise, we prove infeasibility.
346                Ok((!left_interval.eq(&Interval::CERTAINLY_FALSE)
347                    && !right_interval.eq(&Interval::CERTAINLY_FALSE))
348                .then(|| vec![Interval::CERTAINLY_TRUE, Interval::CERTAINLY_TRUE]))
349            } else if interval.eq(&Interval::CERTAINLY_FALSE) {
350                // If the logical conjunction is certainly false, one of the
351                // operands must be false. However, it's not always possible to
352                // determine which operand is false, leading to different scenarios.
353
354                // If one operand is certainly true and the other one is uncertain,
355                // then the latter must be certainly false.
356                if left_interval.eq(&Interval::CERTAINLY_TRUE)
357                    && right_interval.eq(&Interval::UNCERTAIN)
358                {
359                    Ok(Some(vec![
360                        Interval::CERTAINLY_TRUE,
361                        Interval::CERTAINLY_FALSE,
362                    ]))
363                } else if right_interval.eq(&Interval::CERTAINLY_TRUE)
364                    && left_interval.eq(&Interval::UNCERTAIN)
365                {
366                    Ok(Some(vec![
367                        Interval::CERTAINLY_FALSE,
368                        Interval::CERTAINLY_TRUE,
369                    ]))
370                }
371                // If both children are uncertain, or if one is certainly false,
372                // we cannot conclusively refine their intervals. In this case,
373                // propagation does not result in any interval changes.
374                else {
375                    Ok(Some(vec![]))
376                }
377            } else {
378                // An uncertain logical conjunction result can not shrink the
379                // end-points of its children.
380                Ok(Some(vec![]))
381            }
382        } else if self.op.eq(&Operator::Or) {
383            if interval.eq(&Interval::CERTAINLY_FALSE) {
384                // A certainly false logical disjunction can only derive from certainly
385                // false operands. Otherwise, we prove infeasibility.
386                Ok((!left_interval.eq(&Interval::CERTAINLY_TRUE)
387                    && !right_interval.eq(&Interval::CERTAINLY_TRUE))
388                .then(|| vec![Interval::CERTAINLY_FALSE, Interval::CERTAINLY_FALSE]))
389            } else if interval.eq(&Interval::CERTAINLY_TRUE) {
390                // If the logical disjunction is certainly true, one of the
391                // operands must be true. However, it's not always possible to
392                // determine which operand is true, leading to different scenarios.
393
394                // If one operand is certainly false and the other one is uncertain,
395                // then the latter must be certainly true.
396                if left_interval.eq(&Interval::CERTAINLY_FALSE)
397                    && right_interval.eq(&Interval::UNCERTAIN)
398                {
399                    Ok(Some(vec![
400                        Interval::CERTAINLY_FALSE,
401                        Interval::CERTAINLY_TRUE,
402                    ]))
403                } else if right_interval.eq(&Interval::CERTAINLY_FALSE)
404                    && left_interval.eq(&Interval::UNCERTAIN)
405                {
406                    Ok(Some(vec![
407                        Interval::CERTAINLY_TRUE,
408                        Interval::CERTAINLY_FALSE,
409                    ]))
410                }
411                // If both children are uncertain, or if one is certainly true,
412                // we cannot conclusively refine their intervals. In this case,
413                // propagation does not result in any interval changes.
414                else {
415                    Ok(Some(vec![]))
416                }
417            } else {
418                // An uncertain logical disjunction result can not shrink the
419                // end-points of its children.
420                Ok(Some(vec![]))
421            }
422        } else if self.op.supports_propagation() {
423            Ok(
424                propagate_comparison(&self.op, interval, left_interval, right_interval)?
425                    .map(|(left, right)| vec![left, right]),
426            )
427        } else {
428            Ok(
429                propagate_arithmetic(&self.op, interval, left_interval, right_interval)?
430                    .map(|(left, right)| vec![left, right]),
431            )
432        }
433    }
434
435    fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
436        let (left, right) = (children[0], children[1]);
437
438        if self.op.is_numerical_operators() {
439            // We might be able to construct the output statistics more accurately,
440            // without falling back to an unknown distribution, if we are dealing
441            // with Gaussian distributions and numerical operations.
442            if let (Gaussian(left), Gaussian(right)) = (left, right) {
443                if let Some(result) = combine_gaussians(&self.op, left, right)? {
444                    return Ok(Gaussian(result));
445                }
446            }
447        } else if self.op.is_logic_operator() {
448            // If we are dealing with logical operators, we expect (and can only
449            // operate on) Bernoulli distributions.
450            return if let (Bernoulli(left), Bernoulli(right)) = (left, right) {
451                combine_bernoullis(&self.op, left, right).map(Bernoulli)
452            } else {
453                internal_err!(
454                    "Logical operators are only compatible with `Bernoulli` distributions"
455                )
456            };
457        } else if self.op.supports_propagation() {
458            // If we are handling comparison operators, we expect (and can only
459            // operate on) numeric distributions.
460            return create_bernoulli_from_comparison(&self.op, left, right);
461        }
462        // Fall back to an unknown distribution with only summary statistics:
463        new_generic_from_binary_op(&self.op, left, right)
464    }
465
466    /// For each operator, [`BinaryExpr`] has distinct rules.
467    /// TODO: There may be rules specific to some data types and expression ranges.
468    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
469        let (l_order, l_range) = (children[0].sort_properties, &children[0].range);
470        let (r_order, r_range) = (children[1].sort_properties, &children[1].range);
471        match self.op() {
472            Operator::Plus => Ok(ExprProperties {
473                sort_properties: l_order.add(&r_order),
474                range: l_range.add(r_range)?,
475                preserves_lex_ordering: false,
476            }),
477            Operator::Minus => Ok(ExprProperties {
478                sort_properties: l_order.sub(&r_order),
479                range: l_range.sub(r_range)?,
480                preserves_lex_ordering: false,
481            }),
482            Operator::Gt => Ok(ExprProperties {
483                sort_properties: l_order.gt_or_gteq(&r_order),
484                range: l_range.gt(r_range)?,
485                preserves_lex_ordering: false,
486            }),
487            Operator::GtEq => Ok(ExprProperties {
488                sort_properties: l_order.gt_or_gteq(&r_order),
489                range: l_range.gt_eq(r_range)?,
490                preserves_lex_ordering: false,
491            }),
492            Operator::Lt => Ok(ExprProperties {
493                sort_properties: r_order.gt_or_gteq(&l_order),
494                range: l_range.lt(r_range)?,
495                preserves_lex_ordering: false,
496            }),
497            Operator::LtEq => Ok(ExprProperties {
498                sort_properties: r_order.gt_or_gteq(&l_order),
499                range: l_range.lt_eq(r_range)?,
500                preserves_lex_ordering: false,
501            }),
502            Operator::And => Ok(ExprProperties {
503                sort_properties: r_order.and_or(&l_order),
504                range: l_range.and(r_range)?,
505                preserves_lex_ordering: false,
506            }),
507            Operator::Or => Ok(ExprProperties {
508                sort_properties: r_order.and_or(&l_order),
509                range: l_range.or(r_range)?,
510                preserves_lex_ordering: false,
511            }),
512            _ => Ok(ExprProperties::new_unknown()),
513        }
514    }
515
516    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
517        fn write_child(
518            f: &mut std::fmt::Formatter,
519            expr: &dyn PhysicalExpr,
520            precedence: u8,
521        ) -> std::fmt::Result {
522            if let Some(child) = expr.as_any().downcast_ref::<BinaryExpr>() {
523                let p = child.op.precedence();
524                if p == 0 || p < precedence {
525                    write!(f, "(")?;
526                    child.fmt_sql(f)?;
527                    write!(f, ")")
528                } else {
529                    child.fmt_sql(f)
530                }
531            } else {
532                expr.fmt_sql(f)
533            }
534        }
535
536        let precedence = self.op.precedence();
537        write_child(f, self.left.as_ref(), precedence)?;
538        write!(f, " {} ", self.op)?;
539        write_child(f, self.right.as_ref(), precedence)
540    }
541}
542
543/// Casts dictionary array to result type for binary numerical operators. Such operators
544/// between array and scalar produce a dictionary array other than primitive array of the
545/// same operators between array and array. This leads to inconsistent result types causing
546/// errors in the following query execution. For such operators between array and scalar,
547/// we cast the dictionary array to primitive array.
548fn to_result_type_array(
549    op: &Operator,
550    array: ArrayRef,
551    result_type: &DataType,
552) -> Result<ArrayRef> {
553    if array.data_type() == result_type {
554        Ok(array)
555    } else if op.is_numerical_operators() {
556        match array.data_type() {
557            DataType::Dictionary(_, value_type) => {
558                if value_type.as_ref() == result_type {
559                    Ok(cast(&array, result_type)?)
560                } else {
561                    internal_err!(
562                            "Incompatible Dictionary value type {value_type} with result type {result_type} of Binary operator {op:?}"
563                        )
564                }
565            }
566            _ => Ok(array),
567        }
568    } else {
569        Ok(array)
570    }
571}
572
573impl BinaryExpr {
574    /// Evaluate the expression of the left input is an array and
575    /// right is literal - use scalar operations
576    fn evaluate_array_scalar(
577        &self,
578        array: &dyn Array,
579        scalar: ScalarValue,
580    ) -> Result<Option<Result<ArrayRef>>> {
581        use Operator::*;
582        let scalar_result = match &self.op {
583            RegexMatch => regex_match_dyn_scalar(array, scalar, false, false),
584            RegexIMatch => regex_match_dyn_scalar(array, scalar, false, true),
585            RegexNotMatch => regex_match_dyn_scalar(array, scalar, true, false),
586            RegexNotIMatch => regex_match_dyn_scalar(array, scalar, true, true),
587            BitwiseAnd => bitwise_and_dyn_scalar(array, scalar),
588            BitwiseOr => bitwise_or_dyn_scalar(array, scalar),
589            BitwiseXor => bitwise_xor_dyn_scalar(array, scalar),
590            BitwiseShiftRight => bitwise_shift_right_dyn_scalar(array, scalar),
591            BitwiseShiftLeft => bitwise_shift_left_dyn_scalar(array, scalar),
592            // if scalar operation is not supported - fallback to array implementation
593            _ => None,
594        };
595
596        Ok(scalar_result)
597    }
598
599    fn evaluate_with_resolved_args(
600        &self,
601        left: Arc<dyn Array>,
602        left_data_type: &DataType,
603        right: Arc<dyn Array>,
604        right_data_type: &DataType,
605    ) -> Result<ArrayRef> {
606        use Operator::*;
607        match &self.op {
608            IsDistinctFrom | IsNotDistinctFrom | Lt | LtEq | Gt | GtEq | Eq | NotEq
609            | Plus | Minus | Multiply | Divide | Modulo | LikeMatch | ILikeMatch
610            | NotLikeMatch | NotILikeMatch => unreachable!(),
611            And => {
612                if left_data_type == &DataType::Boolean {
613                    Ok(boolean_op(&left, &right, and_kleene)?)
614                } else {
615                    internal_err!(
616                        "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
617                        self.op,
618                        left.data_type(),
619                        right.data_type()
620                    )
621                }
622            }
623            Or => {
624                if left_data_type == &DataType::Boolean {
625                    Ok(boolean_op(&left, &right, or_kleene)?)
626                } else {
627                    internal_err!(
628                        "Cannot evaluate binary expression {:?} with types {:?} and {:?}",
629                        self.op,
630                        left_data_type,
631                        right_data_type
632                    )
633                }
634            }
635            RegexMatch => regex_match_dyn(left, right, false, false),
636            RegexIMatch => regex_match_dyn(left, right, false, true),
637            RegexNotMatch => regex_match_dyn(left, right, true, false),
638            RegexNotIMatch => regex_match_dyn(left, right, true, true),
639            BitwiseAnd => bitwise_and_dyn(left, right),
640            BitwiseOr => bitwise_or_dyn(left, right),
641            BitwiseXor => bitwise_xor_dyn(left, right),
642            BitwiseShiftRight => bitwise_shift_right_dyn(left, right),
643            BitwiseShiftLeft => bitwise_shift_left_dyn(left, right),
644            StringConcat => concat_elements(left, right),
645            AtArrow | ArrowAt | Arrow | LongArrow | HashArrow | HashLongArrow | AtAt
646            | HashMinus | AtQuestion | Question | QuestionAnd | QuestionPipe
647            | IntegerDivide => {
648                not_impl_err!(
649                    "Binary operator '{:?}' is not supported in the physical expr",
650                    self.op
651                )
652            }
653        }
654    }
655}
656
657enum ShortCircuitStrategy<'a> {
658    None,
659    ReturnLeft,
660    ReturnRight,
661    PreSelection(&'a BooleanArray),
662}
663
664/// Based on the results calculated from the left side of the short-circuit operation,
665/// if the proportion of `true` is less than 0.2 and the current operation is an `and`,
666/// the `RecordBatch` will be filtered in advance.
667const PRE_SELECTION_THRESHOLD: f32 = 0.2;
668
669/// Checks if a logical operator (`AND`/`OR`) can short-circuit evaluation based on the left-hand side (lhs) result.
670///
671/// Short-circuiting occurs under these circumstances:
672/// - For `AND`:
673///    - if LHS is all false => short-circuit → return LHS
674///    - if LHS is all true  => short-circuit → return RHS
675///    - if LHS is mixed and true_count/sum_count <= [`PRE_SELECTION_THRESHOLD`] -> pre-selection
676/// - For `OR`:
677///    - if LHS is all true  => short-circuit → return LHS
678///    - if LHS is all false => short-circuit → return RHS
679/// # Arguments
680/// * `lhs` - The left-hand side (lhs) columnar value (array or scalar)
681/// * `lhs` - The left-hand side (lhs) columnar value (array or scalar)
682/// * `op` - The logical operator (`AND` or `OR`)
683///
684/// # Implementation Notes
685/// 1. Only works with Boolean-typed arguments (other types automatically return `false`)
686/// 2. Handles both scalar values and array values
687/// 3. For arrays, uses optimized bit counting techniques for boolean arrays
688fn check_short_circuit<'a>(
689    lhs: &'a ColumnarValue,
690    op: &Operator,
691) -> ShortCircuitStrategy<'a> {
692    // Quick reject for non-logical operators,and quick judgment when op is and
693    let is_and = match op {
694        Operator::And => true,
695        Operator::Or => false,
696        _ => return ShortCircuitStrategy::None,
697    };
698
699    // Non-boolean types can't be short-circuited
700    if lhs.data_type() != DataType::Boolean {
701        return ShortCircuitStrategy::None;
702    }
703
704    match lhs {
705        ColumnarValue::Array(array) => {
706            // Fast path for arrays - try to downcast to boolean array
707            if let Ok(bool_array) = as_boolean_array(array) {
708                // Arrays with nulls can't be short-circuited
709                if bool_array.null_count() > 0 {
710                    return ShortCircuitStrategy::None;
711                }
712
713                let len = bool_array.len();
714                if len == 0 {
715                    return ShortCircuitStrategy::None;
716                }
717
718                let true_count = bool_array.values().count_set_bits();
719                if is_and {
720                    // For AND, prioritize checking for all-false (short circuit case)
721                    // Uses optimized false_count() method provided by Arrow
722
723                    // Short circuit if all values are false
724                    if true_count == 0 {
725                        return ShortCircuitStrategy::ReturnLeft;
726                    }
727
728                    // If no false values, then all must be true
729                    if true_count == len {
730                        return ShortCircuitStrategy::ReturnRight;
731                    }
732
733                    // determine if we can pre-selection
734                    if true_count as f32 / len as f32 <= PRE_SELECTION_THRESHOLD {
735                        return ShortCircuitStrategy::PreSelection(bool_array);
736                    }
737                } else {
738                    // For OR, prioritize checking for all-true (short circuit case)
739                    // Uses optimized true_count() method provided by Arrow
740
741                    // Short circuit if all values are true
742                    if true_count == len {
743                        return ShortCircuitStrategy::ReturnLeft;
744                    }
745
746                    // If no true values, then all must be false
747                    if true_count == 0 {
748                        return ShortCircuitStrategy::ReturnRight;
749                    }
750                }
751            }
752        }
753        ColumnarValue::Scalar(scalar) => {
754            // Fast path for scalar values
755            if let ScalarValue::Boolean(Some(is_true)) = scalar {
756                // Return Left for:
757                // - AND with false value
758                // - OR with true value
759                if (is_and && !is_true) || (!is_and && *is_true) {
760                    return ShortCircuitStrategy::ReturnLeft;
761                } else {
762                    return ShortCircuitStrategy::ReturnRight;
763                }
764            }
765        }
766    }
767
768    // If we can't short-circuit, indicate that normal evaluation should continue
769    ShortCircuitStrategy::None
770}
771
772/// Creates a new boolean array based on the evaluation of the right expression,
773/// but only for positions where the left_result is true.
774///
775/// This function is used for short-circuit evaluation optimization of logical AND operations:
776/// - When left_result has few true values, we only evaluate the right expression for those positions
777/// - Values are copied from right_array where left_result is true
778/// - All other positions are filled with false values
779///
780/// # Parameters
781/// - `left_result` Boolean array with selection mask (typically from left side of AND)
782/// - `right_result` Result of evaluating right side of expression (only for selected positions)
783///
784/// # Returns
785/// A combined ColumnarValue with values from right_result where left_result is true
786///
787/// # Example
788///  Initial Data: { 1, 2, 3, 4, 5 }
789///  Left Evaluation
790///     (Condition: Equal to 2 or 3)
791///          ↓
792///  Filtered Data: {2, 3}
793///    Left Bitmap: { 0, 1, 1, 0, 0 }
794///          ↓
795///   Right Evaluation
796///     (Condition: Even numbers)
797///          ↓
798///  Right Data: { 2 }
799///    Right Bitmap: { 1, 0 }
800///          ↓
801///   Combine Results
802///  Final Bitmap: { 0, 1, 0, 0, 0 }
803///
804/// # Note
805/// Perhaps it would be better to modify `left_result` directly without creating a copy?
806/// In practice, `left_result` should have only one owner, so making changes should be safe.
807/// However, this is difficult to achieve under the immutable constraints of [`Arc`] and [`BooleanArray`].
808fn pre_selection_scatter(
809    left_result: &BooleanArray,
810    right_result: Option<&BooleanArray>,
811) -> Result<ColumnarValue> {
812    let result_len = left_result.len();
813
814    let mut result_array_builder = BooleanArray::builder(result_len);
815
816    // keep track of current position we have in right boolean array
817    let mut right_array_pos = 0;
818
819    // keep track of how much is filled
820    let mut last_end = 0;
821    // reduce if condition in for_each
822    match right_result {
823        Some(right_result) => {
824            SlicesIterator::new(left_result).for_each(|(start, end)| {
825                // the gap needs to be filled with false
826                if start > last_end {
827                    result_array_builder.append_n(start - last_end, false);
828                }
829
830                // copy values from right array for this slice
831                let len = end - start;
832                right_result
833                    .slice(right_array_pos, len)
834                    .iter()
835                    .for_each(|v| result_array_builder.append_option(v));
836
837                right_array_pos += len;
838                last_end = end;
839            });
840        }
841        None => SlicesIterator::new(left_result).for_each(|(start, end)| {
842            // the gap needs to be filled with false
843            if start > last_end {
844                result_array_builder.append_n(start - last_end, false);
845            }
846
847            // append nulls for this slice derictly
848            let len = end - start;
849            result_array_builder.append_nulls(len);
850
851            last_end = end;
852        }),
853    }
854
855    // Fill any remaining positions with false
856    if last_end < result_len {
857        result_array_builder.append_n(result_len - last_end, false);
858    }
859    let boolean_result = result_array_builder.finish();
860
861    Ok(ColumnarValue::Array(Arc::new(boolean_result)))
862}
863
864fn concat_elements(left: Arc<dyn Array>, right: Arc<dyn Array>) -> Result<ArrayRef> {
865    Ok(match left.data_type() {
866        DataType::Utf8 => Arc::new(concat_elements_utf8(
867            left.as_string::<i32>(),
868            right.as_string::<i32>(),
869        )?),
870        DataType::LargeUtf8 => Arc::new(concat_elements_utf8(
871            left.as_string::<i64>(),
872            right.as_string::<i64>(),
873        )?),
874        DataType::Utf8View => Arc::new(concat_elements_utf8view(
875            left.as_string_view(),
876            right.as_string_view(),
877        )?),
878        other => {
879            return internal_err!(
880                "Data type {other:?} not supported for binary operation 'concat_elements' on string arrays"
881            );
882        }
883    })
884}
885
886/// Create a binary expression whose arguments are correctly coerced.
887/// This function errors if it is not possible to coerce the arguments
888/// to computational types supported by the operator.
889pub fn binary(
890    lhs: Arc<dyn PhysicalExpr>,
891    op: Operator,
892    rhs: Arc<dyn PhysicalExpr>,
893    _input_schema: &Schema,
894) -> Result<Arc<dyn PhysicalExpr>> {
895    Ok(Arc::new(BinaryExpr::new(lhs, op, rhs)))
896}
897
898/// Create a similar to expression
899pub fn similar_to(
900    negated: bool,
901    case_insensitive: bool,
902    expr: Arc<dyn PhysicalExpr>,
903    pattern: Arc<dyn PhysicalExpr>,
904) -> Result<Arc<dyn PhysicalExpr>> {
905    let binary_op = match (negated, case_insensitive) {
906        (false, false) => Operator::RegexMatch,
907        (false, true) => Operator::RegexIMatch,
908        (true, false) => Operator::RegexNotMatch,
909        (true, true) => Operator::RegexNotIMatch,
910    };
911    Ok(Arc::new(BinaryExpr::new(expr, binary_op, pattern)))
912}
913
914#[cfg(test)]
915mod tests {
916    use super::*;
917    use crate::expressions::{col, lit, try_cast, Column, Literal};
918    use datafusion_expr::lit as expr_lit;
919
920    use datafusion_common::plan_datafusion_err;
921    use datafusion_physical_expr_common::physical_expr::fmt_sql;
922
923    use crate::planner::logical2physical;
924    use arrow::array::BooleanArray;
925    use datafusion_expr::col as logical_col;
926    /// Performs a binary operation, applying any type coercion necessary
927    fn binary_op(
928        left: Arc<dyn PhysicalExpr>,
929        op: Operator,
930        right: Arc<dyn PhysicalExpr>,
931        schema: &Schema,
932    ) -> Result<Arc<dyn PhysicalExpr>> {
933        let left_type = left.data_type(schema)?;
934        let right_type = right.data_type(schema)?;
935        let (lhs, rhs) =
936            BinaryTypeCoercer::new(&left_type, &op, &right_type).get_input_types()?;
937
938        let left_expr = try_cast(left, schema, lhs)?;
939        let right_expr = try_cast(right, schema, rhs)?;
940        binary(left_expr, op, right_expr, schema)
941    }
942
943    #[test]
944    fn binary_comparison() -> Result<()> {
945        let schema = Schema::new(vec![
946            Field::new("a", DataType::Int32, false),
947            Field::new("b", DataType::Int32, false),
948        ]);
949        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
950        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
951
952        // expression: "a < b"
953        let lt = binary(
954            col("a", &schema)?,
955            Operator::Lt,
956            col("b", &schema)?,
957            &schema,
958        )?;
959        let batch =
960            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
961
962        let result = lt
963            .evaluate(&batch)?
964            .into_array(batch.num_rows())
965            .expect("Failed to convert to array");
966        assert_eq!(result.len(), 5);
967
968        let expected = [false, false, true, true, true];
969        let result =
970            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
971        for (i, &expected_item) in expected.iter().enumerate().take(5) {
972            assert_eq!(result.value(i), expected_item);
973        }
974
975        Ok(())
976    }
977
978    #[test]
979    fn binary_nested() -> Result<()> {
980        let schema = Schema::new(vec![
981            Field::new("a", DataType::Int32, false),
982            Field::new("b", DataType::Int32, false),
983        ]);
984        let a = Int32Array::from(vec![2, 4, 6, 8, 10]);
985        let b = Int32Array::from(vec![2, 5, 4, 8, 8]);
986
987        // expression: "a < b OR a == b"
988        let expr = binary(
989            binary(
990                col("a", &schema)?,
991                Operator::Lt,
992                col("b", &schema)?,
993                &schema,
994            )?,
995            Operator::Or,
996            binary(
997                col("a", &schema)?,
998                Operator::Eq,
999                col("b", &schema)?,
1000                &schema,
1001            )?,
1002            &schema,
1003        )?;
1004        let batch =
1005            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)])?;
1006
1007        assert_eq!("a@0 < b@1 OR a@0 = b@1", format!("{expr}"));
1008
1009        let result = expr
1010            .evaluate(&batch)?
1011            .into_array(batch.num_rows())
1012            .expect("Failed to convert to array");
1013        assert_eq!(result.len(), 5);
1014
1015        let expected = [true, true, false, true, false];
1016        let result =
1017            as_boolean_array(&result).expect("failed to downcast to BooleanArray");
1018        for (i, &expected_item) in expected.iter().enumerate().take(5) {
1019            assert_eq!(result.value(i), expected_item);
1020        }
1021
1022        Ok(())
1023    }
1024
1025    // runs an end-to-end test of physical type coercion:
1026    // 1. construct a record batch with two columns of type A and B
1027    //  (*_ARRAY is the Rust Arrow array type, and *_TYPE is the DataType of the elements)
1028    // 2. construct a physical expression of A OP B
1029    // 3. evaluate the expression
1030    // 4. verify that the resulting expression is of type C
1031    // 5. verify that the results of evaluation are $VEC
1032    macro_rules! test_coercion {
1033        ($A_ARRAY:ident, $A_TYPE:expr, $A_VEC:expr, $B_ARRAY:ident, $B_TYPE:expr, $B_VEC:expr, $OP:expr, $C_ARRAY:ident, $C_TYPE:expr, $VEC:expr,) => {{
1034            let schema = Schema::new(vec![
1035                Field::new("a", $A_TYPE, false),
1036                Field::new("b", $B_TYPE, false),
1037            ]);
1038            let a = $A_ARRAY::from($A_VEC);
1039            let b = $B_ARRAY::from($B_VEC);
1040            let (lhs, rhs) = BinaryTypeCoercer::new(&$A_TYPE, &$OP, &$B_TYPE).get_input_types()?;
1041
1042            let left = try_cast(col("a", &schema)?, &schema, lhs)?;
1043            let right = try_cast(col("b", &schema)?, &schema, rhs)?;
1044
1045            // verify that we can construct the expression
1046            let expression = binary(left, $OP, right, &schema)?;
1047            let batch = RecordBatch::try_new(
1048                Arc::new(schema.clone()),
1049                vec![Arc::new(a), Arc::new(b)],
1050            )?;
1051
1052            // verify that the expression's type is correct
1053            assert_eq!(expression.data_type(&schema)?, $C_TYPE);
1054
1055            // compute
1056            let result = expression.evaluate(&batch)?.into_array(batch.num_rows()).expect("Failed to convert to array");
1057
1058            // verify that the array's data_type is correct
1059            assert_eq!(*result.data_type(), $C_TYPE);
1060
1061            // verify that the data itself is downcastable
1062            let result = result
1063                .as_any()
1064                .downcast_ref::<$C_ARRAY>()
1065                .expect("failed to downcast");
1066            // verify that the result itself is correct
1067            for (i, x) in $VEC.iter().enumerate() {
1068                let v = result.value(i);
1069                assert_eq!(
1070                    v,
1071                    *x,
1072                    "Unexpected output at position {i}:\n\nActual:\n{v}\n\nExpected:\n{x}"
1073                );
1074            }
1075        }};
1076    }
1077
1078    #[test]
1079    fn test_type_coercion() -> Result<()> {
1080        test_coercion!(
1081            Int32Array,
1082            DataType::Int32,
1083            vec![1i32, 2i32],
1084            UInt32Array,
1085            DataType::UInt32,
1086            vec![1u32, 2u32],
1087            Operator::Plus,
1088            Int64Array,
1089            DataType::Int64,
1090            [2i64, 4i64],
1091        );
1092        test_coercion!(
1093            Int32Array,
1094            DataType::Int32,
1095            vec![1i32],
1096            UInt16Array,
1097            DataType::UInt16,
1098            vec![1u16],
1099            Operator::Plus,
1100            Int32Array,
1101            DataType::Int32,
1102            [2i32],
1103        );
1104        test_coercion!(
1105            Float32Array,
1106            DataType::Float32,
1107            vec![1f32],
1108            UInt16Array,
1109            DataType::UInt16,
1110            vec![1u16],
1111            Operator::Plus,
1112            Float32Array,
1113            DataType::Float32,
1114            [2f32],
1115        );
1116        test_coercion!(
1117            Float32Array,
1118            DataType::Float32,
1119            vec![2f32],
1120            UInt16Array,
1121            DataType::UInt16,
1122            vec![1u16],
1123            Operator::Multiply,
1124            Float32Array,
1125            DataType::Float32,
1126            [2f32],
1127        );
1128        test_coercion!(
1129            StringArray,
1130            DataType::Utf8,
1131            vec!["1994-12-13", "1995-01-26"],
1132            Date32Array,
1133            DataType::Date32,
1134            vec![9112, 9156],
1135            Operator::Eq,
1136            BooleanArray,
1137            DataType::Boolean,
1138            [true, true],
1139        );
1140        test_coercion!(
1141            StringArray,
1142            DataType::Utf8,
1143            vec!["1994-12-13", "1995-01-26"],
1144            Date32Array,
1145            DataType::Date32,
1146            vec![9113, 9154],
1147            Operator::Lt,
1148            BooleanArray,
1149            DataType::Boolean,
1150            [true, false],
1151        );
1152        test_coercion!(
1153            StringArray,
1154            DataType::Utf8,
1155            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
1156            Date64Array,
1157            DataType::Date64,
1158            vec![787322096000, 791083425000],
1159            Operator::Eq,
1160            BooleanArray,
1161            DataType::Boolean,
1162            [true, true],
1163        );
1164        test_coercion!(
1165            StringArray,
1166            DataType::Utf8,
1167            vec!["1994-12-13T12:34:56", "1995-01-26T01:23:45"],
1168            Date64Array,
1169            DataType::Date64,
1170            vec![787322096001, 791083424999],
1171            Operator::Lt,
1172            BooleanArray,
1173            DataType::Boolean,
1174            [true, false],
1175        );
1176        test_coercion!(
1177            StringViewArray,
1178            DataType::Utf8View,
1179            vec!["abc"; 5],
1180            StringArray,
1181            DataType::Utf8,
1182            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1183            Operator::RegexMatch,
1184            BooleanArray,
1185            DataType::Boolean,
1186            [true, false, true, false, false],
1187        );
1188        test_coercion!(
1189            StringViewArray,
1190            DataType::Utf8View,
1191            vec!["abc"; 5],
1192            StringArray,
1193            DataType::Utf8,
1194            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1195            Operator::RegexIMatch,
1196            BooleanArray,
1197            DataType::Boolean,
1198            [true, true, true, true, false],
1199        );
1200        test_coercion!(
1201            StringArray,
1202            DataType::Utf8,
1203            vec!["abc"; 5],
1204            StringViewArray,
1205            DataType::Utf8View,
1206            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1207            Operator::RegexNotMatch,
1208            BooleanArray,
1209            DataType::Boolean,
1210            [false, true, false, true, true],
1211        );
1212        test_coercion!(
1213            StringArray,
1214            DataType::Utf8,
1215            vec!["abc"; 5],
1216            StringViewArray,
1217            DataType::Utf8View,
1218            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1219            Operator::RegexNotIMatch,
1220            BooleanArray,
1221            DataType::Boolean,
1222            [false, false, false, false, true],
1223        );
1224        test_coercion!(
1225            StringArray,
1226            DataType::Utf8,
1227            vec!["abc"; 5],
1228            StringArray,
1229            DataType::Utf8,
1230            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1231            Operator::RegexMatch,
1232            BooleanArray,
1233            DataType::Boolean,
1234            [true, false, true, false, false],
1235        );
1236        test_coercion!(
1237            StringArray,
1238            DataType::Utf8,
1239            vec!["abc"; 5],
1240            StringArray,
1241            DataType::Utf8,
1242            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1243            Operator::RegexIMatch,
1244            BooleanArray,
1245            DataType::Boolean,
1246            [true, true, true, true, false],
1247        );
1248        test_coercion!(
1249            StringArray,
1250            DataType::Utf8,
1251            vec!["abc"; 5],
1252            StringArray,
1253            DataType::Utf8,
1254            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1255            Operator::RegexNotMatch,
1256            BooleanArray,
1257            DataType::Boolean,
1258            [false, true, false, true, true],
1259        );
1260        test_coercion!(
1261            StringArray,
1262            DataType::Utf8,
1263            vec!["abc"; 5],
1264            StringArray,
1265            DataType::Utf8,
1266            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1267            Operator::RegexNotIMatch,
1268            BooleanArray,
1269            DataType::Boolean,
1270            [false, false, false, false, true],
1271        );
1272        test_coercion!(
1273            LargeStringArray,
1274            DataType::LargeUtf8,
1275            vec!["abc"; 5],
1276            LargeStringArray,
1277            DataType::LargeUtf8,
1278            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1279            Operator::RegexMatch,
1280            BooleanArray,
1281            DataType::Boolean,
1282            [true, false, true, false, false],
1283        );
1284        test_coercion!(
1285            LargeStringArray,
1286            DataType::LargeUtf8,
1287            vec!["abc"; 5],
1288            LargeStringArray,
1289            DataType::LargeUtf8,
1290            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1291            Operator::RegexIMatch,
1292            BooleanArray,
1293            DataType::Boolean,
1294            [true, true, true, true, false],
1295        );
1296        test_coercion!(
1297            LargeStringArray,
1298            DataType::LargeUtf8,
1299            vec!["abc"; 5],
1300            LargeStringArray,
1301            DataType::LargeUtf8,
1302            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1303            Operator::RegexNotMatch,
1304            BooleanArray,
1305            DataType::Boolean,
1306            [false, true, false, true, true],
1307        );
1308        test_coercion!(
1309            LargeStringArray,
1310            DataType::LargeUtf8,
1311            vec!["abc"; 5],
1312            LargeStringArray,
1313            DataType::LargeUtf8,
1314            vec!["^a", "^A", "(b|d)", "(B|D)", "^(b|c)"],
1315            Operator::RegexNotIMatch,
1316            BooleanArray,
1317            DataType::Boolean,
1318            [false, false, false, false, true],
1319        );
1320        test_coercion!(
1321            StringArray,
1322            DataType::Utf8,
1323            vec!["abc"; 5],
1324            StringArray,
1325            DataType::Utf8,
1326            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1327            Operator::LikeMatch,
1328            BooleanArray,
1329            DataType::Boolean,
1330            [true, false, false, true, false],
1331        );
1332        test_coercion!(
1333            StringArray,
1334            DataType::Utf8,
1335            vec!["abc"; 5],
1336            StringArray,
1337            DataType::Utf8,
1338            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1339            Operator::ILikeMatch,
1340            BooleanArray,
1341            DataType::Boolean,
1342            [true, true, false, true, true],
1343        );
1344        test_coercion!(
1345            StringArray,
1346            DataType::Utf8,
1347            vec!["abc"; 5],
1348            StringArray,
1349            DataType::Utf8,
1350            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1351            Operator::NotLikeMatch,
1352            BooleanArray,
1353            DataType::Boolean,
1354            [false, true, true, false, true],
1355        );
1356        test_coercion!(
1357            StringArray,
1358            DataType::Utf8,
1359            vec!["abc"; 5],
1360            StringArray,
1361            DataType::Utf8,
1362            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1363            Operator::NotILikeMatch,
1364            BooleanArray,
1365            DataType::Boolean,
1366            [false, false, true, false, false],
1367        );
1368        test_coercion!(
1369            LargeStringArray,
1370            DataType::LargeUtf8,
1371            vec!["abc"; 5],
1372            LargeStringArray,
1373            DataType::LargeUtf8,
1374            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1375            Operator::LikeMatch,
1376            BooleanArray,
1377            DataType::Boolean,
1378            [true, false, false, true, false],
1379        );
1380        test_coercion!(
1381            LargeStringArray,
1382            DataType::LargeUtf8,
1383            vec!["abc"; 5],
1384            LargeStringArray,
1385            DataType::LargeUtf8,
1386            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1387            Operator::ILikeMatch,
1388            BooleanArray,
1389            DataType::Boolean,
1390            [true, true, false, true, true],
1391        );
1392        test_coercion!(
1393            LargeStringArray,
1394            DataType::LargeUtf8,
1395            vec!["abc"; 5],
1396            LargeStringArray,
1397            DataType::LargeUtf8,
1398            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1399            Operator::NotLikeMatch,
1400            BooleanArray,
1401            DataType::Boolean,
1402            [false, true, true, false, true],
1403        );
1404        test_coercion!(
1405            LargeStringArray,
1406            DataType::LargeUtf8,
1407            vec!["abc"; 5],
1408            LargeStringArray,
1409            DataType::LargeUtf8,
1410            vec!["a__", "A%BC", "A_BC", "abc", "a%C"],
1411            Operator::NotILikeMatch,
1412            BooleanArray,
1413            DataType::Boolean,
1414            [false, false, true, false, false],
1415        );
1416        test_coercion!(
1417            Int16Array,
1418            DataType::Int16,
1419            vec![1i16, 2i16, 3i16],
1420            Int64Array,
1421            DataType::Int64,
1422            vec![10i64, 4i64, 5i64],
1423            Operator::BitwiseAnd,
1424            Int64Array,
1425            DataType::Int64,
1426            [0i64, 0i64, 1i64],
1427        );
1428        test_coercion!(
1429            UInt16Array,
1430            DataType::UInt16,
1431            vec![1u16, 2u16, 3u16],
1432            UInt64Array,
1433            DataType::UInt64,
1434            vec![10u64, 4u64, 5u64],
1435            Operator::BitwiseAnd,
1436            UInt64Array,
1437            DataType::UInt64,
1438            [0u64, 0u64, 1u64],
1439        );
1440        test_coercion!(
1441            Int16Array,
1442            DataType::Int16,
1443            vec![3i16, 2i16, 3i16],
1444            Int64Array,
1445            DataType::Int64,
1446            vec![10i64, 6i64, 5i64],
1447            Operator::BitwiseOr,
1448            Int64Array,
1449            DataType::Int64,
1450            [11i64, 6i64, 7i64],
1451        );
1452        test_coercion!(
1453            UInt16Array,
1454            DataType::UInt16,
1455            vec![1u16, 2u16, 3u16],
1456            UInt64Array,
1457            DataType::UInt64,
1458            vec![10u64, 4u64, 5u64],
1459            Operator::BitwiseOr,
1460            UInt64Array,
1461            DataType::UInt64,
1462            [11u64, 6u64, 7u64],
1463        );
1464        test_coercion!(
1465            Int16Array,
1466            DataType::Int16,
1467            vec![3i16, 2i16, 3i16],
1468            Int64Array,
1469            DataType::Int64,
1470            vec![10i64, 6i64, 5i64],
1471            Operator::BitwiseXor,
1472            Int64Array,
1473            DataType::Int64,
1474            [9i64, 4i64, 6i64],
1475        );
1476        test_coercion!(
1477            UInt16Array,
1478            DataType::UInt16,
1479            vec![3u16, 2u16, 3u16],
1480            UInt64Array,
1481            DataType::UInt64,
1482            vec![10u64, 6u64, 5u64],
1483            Operator::BitwiseXor,
1484            UInt64Array,
1485            DataType::UInt64,
1486            [9u64, 4u64, 6u64],
1487        );
1488        test_coercion!(
1489            Int16Array,
1490            DataType::Int16,
1491            vec![4i16, 27i16, 35i16],
1492            Int64Array,
1493            DataType::Int64,
1494            vec![2i64, 3i64, 4i64],
1495            Operator::BitwiseShiftRight,
1496            Int64Array,
1497            DataType::Int64,
1498            [1i64, 3i64, 2i64],
1499        );
1500        test_coercion!(
1501            UInt16Array,
1502            DataType::UInt16,
1503            vec![4u16, 27u16, 35u16],
1504            UInt64Array,
1505            DataType::UInt64,
1506            vec![2u64, 3u64, 4u64],
1507            Operator::BitwiseShiftRight,
1508            UInt64Array,
1509            DataType::UInt64,
1510            [1u64, 3u64, 2u64],
1511        );
1512        test_coercion!(
1513            Int16Array,
1514            DataType::Int16,
1515            vec![2i16, 3i16, 4i16],
1516            Int64Array,
1517            DataType::Int64,
1518            vec![4i64, 12i64, 7i64],
1519            Operator::BitwiseShiftLeft,
1520            Int64Array,
1521            DataType::Int64,
1522            [32i64, 12288i64, 512i64],
1523        );
1524        test_coercion!(
1525            UInt16Array,
1526            DataType::UInt16,
1527            vec![2u16, 3u16, 4u16],
1528            UInt64Array,
1529            DataType::UInt64,
1530            vec![4u64, 12u64, 7u64],
1531            Operator::BitwiseShiftLeft,
1532            UInt64Array,
1533            DataType::UInt64,
1534            [32u64, 12288u64, 512u64],
1535        );
1536        Ok(())
1537    }
1538
1539    // Note it would be nice to use the same test_coercion macro as
1540    // above, but sadly the type of the values of the dictionary are
1541    // not encoded in the rust type of the DictionaryArray. Thus there
1542    // is no way at the time of this writing to create a dictionary
1543    // array using the `From` trait
1544    #[test]
1545    fn test_dictionary_type_to_array_coercion() -> Result<()> {
1546        // Test string  a string dictionary
1547        let dict_type =
1548            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
1549        let string_type = DataType::Utf8;
1550
1551        // build dictionary
1552        let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
1553
1554        dict_builder.append("one")?;
1555        dict_builder.append_null();
1556        dict_builder.append("three")?;
1557        dict_builder.append("four")?;
1558        let dict_array = Arc::new(dict_builder.finish()) as ArrayRef;
1559
1560        let str_array = Arc::new(StringArray::from(vec![
1561            Some("not one"),
1562            Some("two"),
1563            None,
1564            Some("four"),
1565        ])) as ArrayRef;
1566
1567        let schema = Arc::new(Schema::new(vec![
1568            Field::new("a", dict_type.clone(), true),
1569            Field::new("b", string_type.clone(), true),
1570        ]));
1571
1572        // Test 1: a = b
1573        let result = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
1574        apply_logic_op(&schema, &dict_array, &str_array, Operator::Eq, result)?;
1575
1576        // Test 2: now test the other direction
1577        // b = a
1578        let schema = Arc::new(Schema::new(vec![
1579            Field::new("a", string_type, true),
1580            Field::new("b", dict_type, true),
1581        ]));
1582        let result = BooleanArray::from(vec![Some(false), None, None, Some(true)]);
1583        apply_logic_op(&schema, &str_array, &dict_array, Operator::Eq, result)?;
1584
1585        Ok(())
1586    }
1587
1588    #[test]
1589    fn plus_op() -> Result<()> {
1590        let schema = Schema::new(vec![
1591            Field::new("a", DataType::Int32, false),
1592            Field::new("b", DataType::Int32, false),
1593        ]);
1594        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1595        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1596
1597        apply_arithmetic::<Int32Type>(
1598            Arc::new(schema),
1599            vec![Arc::new(a), Arc::new(b)],
1600            Operator::Plus,
1601            Int32Array::from(vec![2, 4, 7, 12, 21]),
1602        )?;
1603
1604        Ok(())
1605    }
1606
1607    #[test]
1608    fn plus_op_dict() -> Result<()> {
1609        let schema = Schema::new(vec![
1610            Field::new(
1611                "a",
1612                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1613                true,
1614            ),
1615            Field::new(
1616                "b",
1617                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1618                true,
1619            ),
1620        ]);
1621
1622        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1623        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
1624        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
1625
1626        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1627        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
1628        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
1629
1630        apply_arithmetic::<Int32Type>(
1631            Arc::new(schema),
1632            vec![Arc::new(a), Arc::new(b)],
1633            Operator::Plus,
1634            Int32Array::from(vec![Some(2), None, Some(4), Some(8), None]),
1635        )?;
1636
1637        Ok(())
1638    }
1639
1640    #[test]
1641    fn plus_op_dict_decimal() -> Result<()> {
1642        let schema = Schema::new(vec![
1643            Field::new(
1644                "a",
1645                DataType::Dictionary(
1646                    Box::new(DataType::Int8),
1647                    Box::new(DataType::Decimal128(10, 0)),
1648                ),
1649                true,
1650            ),
1651            Field::new(
1652                "b",
1653                DataType::Dictionary(
1654                    Box::new(DataType::Int8),
1655                    Box::new(DataType::Decimal128(10, 0)),
1656                ),
1657                true,
1658            ),
1659        ]);
1660
1661        let value = 123;
1662        let decimal_array = Arc::new(create_decimal_array(
1663            &[
1664                Some(value),
1665                Some(value + 2),
1666                Some(value - 1),
1667                Some(value + 1),
1668            ],
1669            10,
1670            0,
1671        ));
1672
1673        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
1674        let a = DictionaryArray::try_new(keys, decimal_array)?;
1675
1676        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
1677        let decimal_array = Arc::new(create_decimal_array(
1678            &[
1679                Some(value + 1),
1680                Some(value + 3),
1681                Some(value),
1682                Some(value + 2),
1683            ],
1684            10,
1685            0,
1686        ));
1687        let b = DictionaryArray::try_new(keys, decimal_array)?;
1688
1689        apply_arithmetic(
1690            Arc::new(schema),
1691            vec![Arc::new(a), Arc::new(b)],
1692            Operator::Plus,
1693            create_decimal_array(&[Some(247), None, None, Some(247), Some(246)], 11, 0),
1694        )?;
1695
1696        Ok(())
1697    }
1698
1699    #[test]
1700    fn plus_op_scalar() -> Result<()> {
1701        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1702        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1703
1704        apply_arithmetic_scalar(
1705            Arc::new(schema),
1706            vec![Arc::new(a)],
1707            Operator::Plus,
1708            ScalarValue::Int32(Some(1)),
1709            Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
1710        )?;
1711
1712        Ok(())
1713    }
1714
1715    #[test]
1716    fn plus_op_dict_scalar() -> Result<()> {
1717        let schema = Schema::new(vec![Field::new(
1718            "a",
1719            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1720            true,
1721        )]);
1722
1723        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
1724
1725        dict_builder.append(1)?;
1726        dict_builder.append_null();
1727        dict_builder.append(2)?;
1728        dict_builder.append(5)?;
1729
1730        let a = dict_builder.finish();
1731
1732        let expected: PrimitiveArray<Int32Type> =
1733            PrimitiveArray::from(vec![Some(2), None, Some(3), Some(6)]);
1734
1735        apply_arithmetic_scalar(
1736            Arc::new(schema),
1737            vec![Arc::new(a)],
1738            Operator::Plus,
1739            ScalarValue::Dictionary(
1740                Box::new(DataType::Int8),
1741                Box::new(ScalarValue::Int32(Some(1))),
1742            ),
1743            Arc::new(expected),
1744        )?;
1745
1746        Ok(())
1747    }
1748
1749    #[test]
1750    fn plus_op_dict_scalar_decimal() -> Result<()> {
1751        let schema = Schema::new(vec![Field::new(
1752            "a",
1753            DataType::Dictionary(
1754                Box::new(DataType::Int8),
1755                Box::new(DataType::Decimal128(10, 0)),
1756            ),
1757            true,
1758        )]);
1759
1760        let value = 123;
1761        let decimal_array = Arc::new(create_decimal_array(
1762            &[Some(value), None, Some(value - 1), Some(value + 1)],
1763            10,
1764            0,
1765        ));
1766
1767        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
1768        let a = DictionaryArray::try_new(keys, decimal_array)?;
1769
1770        let decimal_array = Arc::new(create_decimal_array(
1771            &[
1772                Some(value + 1),
1773                Some(value),
1774                None,
1775                Some(value + 2),
1776                Some(value + 1),
1777            ],
1778            11,
1779            0,
1780        ));
1781
1782        apply_arithmetic_scalar(
1783            Arc::new(schema),
1784            vec![Arc::new(a)],
1785            Operator::Plus,
1786            ScalarValue::Dictionary(
1787                Box::new(DataType::Int8),
1788                Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
1789            ),
1790            decimal_array,
1791        )?;
1792
1793        Ok(())
1794    }
1795
1796    #[test]
1797    fn minus_op() -> Result<()> {
1798        let schema = Arc::new(Schema::new(vec![
1799            Field::new("a", DataType::Int32, false),
1800            Field::new("b", DataType::Int32, false),
1801        ]));
1802        let a = Arc::new(Int32Array::from(vec![1, 2, 4, 8, 16]));
1803        let b = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
1804
1805        apply_arithmetic::<Int32Type>(
1806            Arc::clone(&schema),
1807            vec![
1808                Arc::clone(&a) as Arc<dyn Array>,
1809                Arc::clone(&b) as Arc<dyn Array>,
1810            ],
1811            Operator::Minus,
1812            Int32Array::from(vec![0, 0, 1, 4, 11]),
1813        )?;
1814
1815        // should handle have negative values in result (for signed)
1816        apply_arithmetic::<Int32Type>(
1817            schema,
1818            vec![b, a],
1819            Operator::Minus,
1820            Int32Array::from(vec![0, 0, -1, -4, -11]),
1821        )?;
1822
1823        Ok(())
1824    }
1825
1826    #[test]
1827    fn minus_op_dict() -> Result<()> {
1828        let schema = Schema::new(vec![
1829            Field::new(
1830                "a",
1831                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1832                true,
1833            ),
1834            Field::new(
1835                "b",
1836                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1837                true,
1838            ),
1839        ]);
1840
1841        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1842        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
1843        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
1844
1845        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
1846        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
1847        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
1848
1849        apply_arithmetic::<Int32Type>(
1850            Arc::new(schema),
1851            vec![Arc::new(a), Arc::new(b)],
1852            Operator::Minus,
1853            Int32Array::from(vec![Some(0), None, Some(0), Some(0), None]),
1854        )?;
1855
1856        Ok(())
1857    }
1858
1859    #[test]
1860    fn minus_op_dict_decimal() -> Result<()> {
1861        let schema = Schema::new(vec![
1862            Field::new(
1863                "a",
1864                DataType::Dictionary(
1865                    Box::new(DataType::Int8),
1866                    Box::new(DataType::Decimal128(10, 0)),
1867                ),
1868                true,
1869            ),
1870            Field::new(
1871                "b",
1872                DataType::Dictionary(
1873                    Box::new(DataType::Int8),
1874                    Box::new(DataType::Decimal128(10, 0)),
1875                ),
1876                true,
1877            ),
1878        ]);
1879
1880        let value = 123;
1881        let decimal_array = Arc::new(create_decimal_array(
1882            &[
1883                Some(value),
1884                Some(value + 2),
1885                Some(value - 1),
1886                Some(value + 1),
1887            ],
1888            10,
1889            0,
1890        ));
1891
1892        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
1893        let a = DictionaryArray::try_new(keys, decimal_array)?;
1894
1895        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
1896        let decimal_array = Arc::new(create_decimal_array(
1897            &[
1898                Some(value + 1),
1899                Some(value + 3),
1900                Some(value),
1901                Some(value + 2),
1902            ],
1903            10,
1904            0,
1905        ));
1906        let b = DictionaryArray::try_new(keys, decimal_array)?;
1907
1908        apply_arithmetic(
1909            Arc::new(schema),
1910            vec![Arc::new(a), Arc::new(b)],
1911            Operator::Minus,
1912            create_decimal_array(&[Some(-1), None, None, Some(1), Some(0)], 11, 0),
1913        )?;
1914
1915        Ok(())
1916    }
1917
1918    #[test]
1919    fn minus_op_scalar() -> Result<()> {
1920        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1921        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
1922
1923        apply_arithmetic_scalar(
1924            Arc::new(schema),
1925            vec![Arc::new(a)],
1926            Operator::Minus,
1927            ScalarValue::Int32(Some(1)),
1928            Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4])),
1929        )?;
1930
1931        Ok(())
1932    }
1933
1934    #[test]
1935    fn minus_op_dict_scalar() -> Result<()> {
1936        let schema = Schema::new(vec![Field::new(
1937            "a",
1938            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
1939            true,
1940        )]);
1941
1942        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
1943
1944        dict_builder.append(1)?;
1945        dict_builder.append_null();
1946        dict_builder.append(2)?;
1947        dict_builder.append(5)?;
1948
1949        let a = dict_builder.finish();
1950
1951        let expected: PrimitiveArray<Int32Type> =
1952            PrimitiveArray::from(vec![Some(0), None, Some(1), Some(4)]);
1953
1954        apply_arithmetic_scalar(
1955            Arc::new(schema),
1956            vec![Arc::new(a)],
1957            Operator::Minus,
1958            ScalarValue::Dictionary(
1959                Box::new(DataType::Int8),
1960                Box::new(ScalarValue::Int32(Some(1))),
1961            ),
1962            Arc::new(expected),
1963        )?;
1964
1965        Ok(())
1966    }
1967
1968    #[test]
1969    fn minus_op_dict_scalar_decimal() -> Result<()> {
1970        let schema = Schema::new(vec![Field::new(
1971            "a",
1972            DataType::Dictionary(
1973                Box::new(DataType::Int8),
1974                Box::new(DataType::Decimal128(10, 0)),
1975            ),
1976            true,
1977        )]);
1978
1979        let value = 123;
1980        let decimal_array = Arc::new(create_decimal_array(
1981            &[Some(value), None, Some(value - 1), Some(value + 1)],
1982            10,
1983            0,
1984        ));
1985
1986        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
1987        let a = DictionaryArray::try_new(keys, decimal_array)?;
1988
1989        let decimal_array = Arc::new(create_decimal_array(
1990            &[
1991                Some(value - 1),
1992                Some(value - 2),
1993                None,
1994                Some(value),
1995                Some(value - 1),
1996            ],
1997            11,
1998            0,
1999        ));
2000
2001        apply_arithmetic_scalar(
2002            Arc::new(schema),
2003            vec![Arc::new(a)],
2004            Operator::Minus,
2005            ScalarValue::Dictionary(
2006                Box::new(DataType::Int8),
2007                Box::new(ScalarValue::Decimal128(Some(1), 10, 0)),
2008            ),
2009            decimal_array,
2010        )?;
2011
2012        Ok(())
2013    }
2014
2015    #[test]
2016    fn multiply_op() -> Result<()> {
2017        let schema = Arc::new(Schema::new(vec![
2018            Field::new("a", DataType::Int32, false),
2019            Field::new("b", DataType::Int32, false),
2020        ]));
2021        let a = Arc::new(Int32Array::from(vec![4, 8, 16, 32, 64]));
2022        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
2023
2024        apply_arithmetic::<Int32Type>(
2025            schema,
2026            vec![a, b],
2027            Operator::Multiply,
2028            Int32Array::from(vec![8, 32, 128, 512, 2048]),
2029        )?;
2030
2031        Ok(())
2032    }
2033
2034    #[test]
2035    fn multiply_op_dict() -> Result<()> {
2036        let schema = Schema::new(vec![
2037            Field::new(
2038                "a",
2039                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2040                true,
2041            ),
2042            Field::new(
2043                "b",
2044                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2045                true,
2046            ),
2047        ]);
2048
2049        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2050        let keys = Int8Array::from(vec![Some(0), None, Some(1), Some(3), None]);
2051        let a = DictionaryArray::try_new(keys, Arc::new(a))?;
2052
2053        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2054        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2055        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2056
2057        apply_arithmetic::<Int32Type>(
2058            Arc::new(schema),
2059            vec![Arc::new(a), Arc::new(b)],
2060            Operator::Multiply,
2061            Int32Array::from(vec![Some(1), None, Some(4), Some(16), None]),
2062        )?;
2063
2064        Ok(())
2065    }
2066
2067    #[test]
2068    fn multiply_op_dict_decimal() -> Result<()> {
2069        let schema = Schema::new(vec![
2070            Field::new(
2071                "a",
2072                DataType::Dictionary(
2073                    Box::new(DataType::Int8),
2074                    Box::new(DataType::Decimal128(10, 0)),
2075                ),
2076                true,
2077            ),
2078            Field::new(
2079                "b",
2080                DataType::Dictionary(
2081                    Box::new(DataType::Int8),
2082                    Box::new(DataType::Decimal128(10, 0)),
2083                ),
2084                true,
2085            ),
2086        ]);
2087
2088        let value = 123;
2089        let decimal_array = Arc::new(create_decimal_array(
2090            &[
2091                Some(value),
2092                Some(value + 2),
2093                Some(value - 1),
2094                Some(value + 1),
2095            ],
2096            10,
2097            0,
2098        )) as ArrayRef;
2099
2100        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2101        let a = DictionaryArray::try_new(keys, decimal_array)?;
2102
2103        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2104        let decimal_array = Arc::new(create_decimal_array(
2105            &[
2106                Some(value + 1),
2107                Some(value + 3),
2108                Some(value),
2109                Some(value + 2),
2110            ],
2111            10,
2112            0,
2113        ));
2114        let b = DictionaryArray::try_new(keys, decimal_array)?;
2115
2116        apply_arithmetic(
2117            Arc::new(schema),
2118            vec![Arc::new(a), Arc::new(b)],
2119            Operator::Multiply,
2120            create_decimal_array(
2121                &[Some(15252), None, None, Some(15252), Some(15129)],
2122                21,
2123                0,
2124            ),
2125        )?;
2126
2127        Ok(())
2128    }
2129
2130    #[test]
2131    fn multiply_op_scalar() -> Result<()> {
2132        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2133        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2134
2135        apply_arithmetic_scalar(
2136            Arc::new(schema),
2137            vec![Arc::new(a)],
2138            Operator::Multiply,
2139            ScalarValue::Int32(Some(2)),
2140            Arc::new(Int32Array::from(vec![2, 4, 6, 8, 10])),
2141        )?;
2142
2143        Ok(())
2144    }
2145
2146    #[test]
2147    fn multiply_op_dict_scalar() -> Result<()> {
2148        let schema = Schema::new(vec![Field::new(
2149            "a",
2150            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2151            true,
2152        )]);
2153
2154        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2155
2156        dict_builder.append(1)?;
2157        dict_builder.append_null();
2158        dict_builder.append(2)?;
2159        dict_builder.append(5)?;
2160
2161        let a = dict_builder.finish();
2162
2163        let expected: PrimitiveArray<Int32Type> =
2164            PrimitiveArray::from(vec![Some(2), None, Some(4), Some(10)]);
2165
2166        apply_arithmetic_scalar(
2167            Arc::new(schema),
2168            vec![Arc::new(a)],
2169            Operator::Multiply,
2170            ScalarValue::Dictionary(
2171                Box::new(DataType::Int8),
2172                Box::new(ScalarValue::Int32(Some(2))),
2173            ),
2174            Arc::new(expected),
2175        )?;
2176
2177        Ok(())
2178    }
2179
2180    #[test]
2181    fn multiply_op_dict_scalar_decimal() -> Result<()> {
2182        let schema = Schema::new(vec![Field::new(
2183            "a",
2184            DataType::Dictionary(
2185                Box::new(DataType::Int8),
2186                Box::new(DataType::Decimal128(10, 0)),
2187            ),
2188            true,
2189        )]);
2190
2191        let value = 123;
2192        let decimal_array = Arc::new(create_decimal_array(
2193            &[Some(value), None, Some(value - 1), Some(value + 1)],
2194            10,
2195            0,
2196        ));
2197
2198        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2199        let a = DictionaryArray::try_new(keys, decimal_array)?;
2200
2201        let decimal_array = Arc::new(create_decimal_array(
2202            &[Some(246), Some(244), None, Some(248), Some(246)],
2203            21,
2204            0,
2205        ));
2206
2207        apply_arithmetic_scalar(
2208            Arc::new(schema),
2209            vec![Arc::new(a)],
2210            Operator::Multiply,
2211            ScalarValue::Dictionary(
2212                Box::new(DataType::Int8),
2213                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2214            ),
2215            decimal_array,
2216        )?;
2217
2218        Ok(())
2219    }
2220
2221    #[test]
2222    fn divide_op() -> Result<()> {
2223        let schema = Arc::new(Schema::new(vec![
2224            Field::new("a", DataType::Int32, false),
2225            Field::new("b", DataType::Int32, false),
2226        ]));
2227        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
2228        let b = Arc::new(Int32Array::from(vec![2, 4, 8, 16, 32]));
2229
2230        apply_arithmetic::<Int32Type>(
2231            schema,
2232            vec![a, b],
2233            Operator::Divide,
2234            Int32Array::from(vec![4, 8, 16, 32, 64]),
2235        )?;
2236
2237        Ok(())
2238    }
2239
2240    #[test]
2241    fn divide_op_dict() -> Result<()> {
2242        let schema = Schema::new(vec![
2243            Field::new(
2244                "a",
2245                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2246                true,
2247            ),
2248            Field::new(
2249                "b",
2250                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2251                true,
2252            ),
2253        ]);
2254
2255        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2256
2257        dict_builder.append(1)?;
2258        dict_builder.append_null();
2259        dict_builder.append(2)?;
2260        dict_builder.append(5)?;
2261        dict_builder.append(0)?;
2262
2263        let a = dict_builder.finish();
2264
2265        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2266        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2267        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2268
2269        apply_arithmetic::<Int32Type>(
2270            Arc::new(schema),
2271            vec![Arc::new(a), Arc::new(b)],
2272            Operator::Divide,
2273            Int32Array::from(vec![Some(1), None, Some(1), Some(1), Some(0)]),
2274        )?;
2275
2276        Ok(())
2277    }
2278
2279    #[test]
2280    fn divide_op_dict_decimal() -> Result<()> {
2281        let schema = Schema::new(vec![
2282            Field::new(
2283                "a",
2284                DataType::Dictionary(
2285                    Box::new(DataType::Int8),
2286                    Box::new(DataType::Decimal128(10, 0)),
2287                ),
2288                true,
2289            ),
2290            Field::new(
2291                "b",
2292                DataType::Dictionary(
2293                    Box::new(DataType::Int8),
2294                    Box::new(DataType::Decimal128(10, 0)),
2295                ),
2296                true,
2297            ),
2298        ]);
2299
2300        let value = 123;
2301        let decimal_array = Arc::new(create_decimal_array(
2302            &[
2303                Some(value),
2304                Some(value + 2),
2305                Some(value - 1),
2306                Some(value + 1),
2307            ],
2308            10,
2309            0,
2310        ));
2311
2312        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2313        let a = DictionaryArray::try_new(keys, decimal_array)?;
2314
2315        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2316        let decimal_array = Arc::new(create_decimal_array(
2317            &[
2318                Some(value + 1),
2319                Some(value + 3),
2320                Some(value),
2321                Some(value + 2),
2322            ],
2323            10,
2324            0,
2325        ));
2326        let b = DictionaryArray::try_new(keys, decimal_array)?;
2327
2328        apply_arithmetic(
2329            Arc::new(schema),
2330            vec![Arc::new(a), Arc::new(b)],
2331            Operator::Divide,
2332            create_decimal_array(
2333                &[
2334                    Some(9919), // 0.9919
2335                    None,
2336                    None,
2337                    Some(10081), // 1.0081
2338                    Some(10000), // 1.0
2339                ],
2340                14,
2341                4,
2342            ),
2343        )?;
2344
2345        Ok(())
2346    }
2347
2348    #[test]
2349    fn divide_op_scalar() -> Result<()> {
2350        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2351        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2352
2353        apply_arithmetic_scalar(
2354            Arc::new(schema),
2355            vec![Arc::new(a)],
2356            Operator::Divide,
2357            ScalarValue::Int32(Some(2)),
2358            Arc::new(Int32Array::from(vec![0, 1, 1, 2, 2])),
2359        )?;
2360
2361        Ok(())
2362    }
2363
2364    #[test]
2365    fn divide_op_dict_scalar() -> Result<()> {
2366        let schema = Schema::new(vec![Field::new(
2367            "a",
2368            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2369            true,
2370        )]);
2371
2372        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2373
2374        dict_builder.append(1)?;
2375        dict_builder.append_null();
2376        dict_builder.append(2)?;
2377        dict_builder.append(5)?;
2378
2379        let a = dict_builder.finish();
2380
2381        let expected: PrimitiveArray<Int32Type> =
2382            PrimitiveArray::from(vec![Some(0), None, Some(1), Some(2)]);
2383
2384        apply_arithmetic_scalar(
2385            Arc::new(schema),
2386            vec![Arc::new(a)],
2387            Operator::Divide,
2388            ScalarValue::Dictionary(
2389                Box::new(DataType::Int8),
2390                Box::new(ScalarValue::Int32(Some(2))),
2391            ),
2392            Arc::new(expected),
2393        )?;
2394
2395        Ok(())
2396    }
2397
2398    #[test]
2399    fn divide_op_dict_scalar_decimal() -> Result<()> {
2400        let schema = Schema::new(vec![Field::new(
2401            "a",
2402            DataType::Dictionary(
2403                Box::new(DataType::Int8),
2404                Box::new(DataType::Decimal128(10, 0)),
2405            ),
2406            true,
2407        )]);
2408
2409        let value = 123;
2410        let decimal_array = Arc::new(create_decimal_array(
2411            &[Some(value), None, Some(value - 1), Some(value + 1)],
2412            10,
2413            0,
2414        ));
2415
2416        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2417        let a = DictionaryArray::try_new(keys, decimal_array)?;
2418
2419        let decimal_array = Arc::new(create_decimal_array(
2420            &[Some(615000), Some(610000), None, Some(620000), Some(615000)],
2421            14,
2422            4,
2423        ));
2424
2425        apply_arithmetic_scalar(
2426            Arc::new(schema),
2427            vec![Arc::new(a)],
2428            Operator::Divide,
2429            ScalarValue::Dictionary(
2430                Box::new(DataType::Int8),
2431                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2432            ),
2433            decimal_array,
2434        )?;
2435
2436        Ok(())
2437    }
2438
2439    #[test]
2440    fn modulus_op() -> Result<()> {
2441        let schema = Arc::new(Schema::new(vec![
2442            Field::new("a", DataType::Int32, false),
2443            Field::new("b", DataType::Int32, false),
2444        ]));
2445        let a = Arc::new(Int32Array::from(vec![8, 32, 128, 512, 2048]));
2446        let b = Arc::new(Int32Array::from(vec![2, 4, 7, 14, 32]));
2447
2448        apply_arithmetic::<Int32Type>(
2449            schema,
2450            vec![a, b],
2451            Operator::Modulo,
2452            Int32Array::from(vec![0, 0, 2, 8, 0]),
2453        )?;
2454
2455        Ok(())
2456    }
2457
2458    #[test]
2459    fn modulus_op_dict() -> Result<()> {
2460        let schema = Schema::new(vec![
2461            Field::new(
2462                "a",
2463                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2464                true,
2465            ),
2466            Field::new(
2467                "b",
2468                DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2469                true,
2470            ),
2471        ]);
2472
2473        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2474
2475        dict_builder.append(1)?;
2476        dict_builder.append_null();
2477        dict_builder.append(2)?;
2478        dict_builder.append(5)?;
2479        dict_builder.append(0)?;
2480
2481        let a = dict_builder.finish();
2482
2483        let b = Int32Array::from(vec![1, 2, 4, 8, 16]);
2484        let keys = Int8Array::from(vec![0, 1, 1, 2, 1]);
2485        let b = DictionaryArray::try_new(keys, Arc::new(b))?;
2486
2487        apply_arithmetic::<Int32Type>(
2488            Arc::new(schema),
2489            vec![Arc::new(a), Arc::new(b)],
2490            Operator::Modulo,
2491            Int32Array::from(vec![Some(0), None, Some(0), Some(1), Some(0)]),
2492        )?;
2493
2494        Ok(())
2495    }
2496
2497    #[test]
2498    fn modulus_op_dict_decimal() -> Result<()> {
2499        let schema = Schema::new(vec![
2500            Field::new(
2501                "a",
2502                DataType::Dictionary(
2503                    Box::new(DataType::Int8),
2504                    Box::new(DataType::Decimal128(10, 0)),
2505                ),
2506                true,
2507            ),
2508            Field::new(
2509                "b",
2510                DataType::Dictionary(
2511                    Box::new(DataType::Int8),
2512                    Box::new(DataType::Decimal128(10, 0)),
2513                ),
2514                true,
2515            ),
2516        ]);
2517
2518        let value = 123;
2519        let decimal_array = Arc::new(create_decimal_array(
2520            &[
2521                Some(value),
2522                Some(value + 2),
2523                Some(value - 1),
2524                Some(value + 1),
2525            ],
2526            10,
2527            0,
2528        ));
2529
2530        let keys = Int8Array::from(vec![Some(0), Some(2), None, Some(3), Some(0)]);
2531        let a = DictionaryArray::try_new(keys, decimal_array)?;
2532
2533        let keys = Int8Array::from(vec![Some(0), None, Some(3), Some(2), Some(2)]);
2534        let decimal_array = Arc::new(create_decimal_array(
2535            &[
2536                Some(value + 1),
2537                Some(value + 3),
2538                Some(value),
2539                Some(value + 2),
2540            ],
2541            10,
2542            0,
2543        ));
2544        let b = DictionaryArray::try_new(keys, decimal_array)?;
2545
2546        apply_arithmetic(
2547            Arc::new(schema),
2548            vec![Arc::new(a), Arc::new(b)],
2549            Operator::Modulo,
2550            create_decimal_array(&[Some(123), None, None, Some(1), Some(0)], 10, 0),
2551        )?;
2552
2553        Ok(())
2554    }
2555
2556    #[test]
2557    fn modulus_op_scalar() -> Result<()> {
2558        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2559        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
2560
2561        apply_arithmetic_scalar(
2562            Arc::new(schema),
2563            vec![Arc::new(a)],
2564            Operator::Modulo,
2565            ScalarValue::Int32(Some(2)),
2566            Arc::new(Int32Array::from(vec![1, 0, 1, 0, 1])),
2567        )?;
2568
2569        Ok(())
2570    }
2571
2572    #[test]
2573    fn modules_op_dict_scalar() -> Result<()> {
2574        let schema = Schema::new(vec![Field::new(
2575            "a",
2576            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
2577            true,
2578        )]);
2579
2580        let mut dict_builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
2581
2582        dict_builder.append(1)?;
2583        dict_builder.append_null();
2584        dict_builder.append(2)?;
2585        dict_builder.append(5)?;
2586
2587        let a = dict_builder.finish();
2588
2589        let expected: PrimitiveArray<Int32Type> =
2590            PrimitiveArray::from(vec![Some(1), None, Some(0), Some(1)]);
2591
2592        apply_arithmetic_scalar(
2593            Arc::new(schema),
2594            vec![Arc::new(a)],
2595            Operator::Modulo,
2596            ScalarValue::Dictionary(
2597                Box::new(DataType::Int8),
2598                Box::new(ScalarValue::Int32(Some(2))),
2599            ),
2600            Arc::new(expected),
2601        )?;
2602
2603        Ok(())
2604    }
2605
2606    #[test]
2607    fn modulus_op_dict_scalar_decimal() -> Result<()> {
2608        let schema = Schema::new(vec![Field::new(
2609            "a",
2610            DataType::Dictionary(
2611                Box::new(DataType::Int8),
2612                Box::new(DataType::Decimal128(10, 0)),
2613            ),
2614            true,
2615        )]);
2616
2617        let value = 123;
2618        let decimal_array = Arc::new(create_decimal_array(
2619            &[Some(value), None, Some(value - 1), Some(value + 1)],
2620            10,
2621            0,
2622        ));
2623
2624        let keys = Int8Array::from(vec![0, 2, 1, 3, 0]);
2625        let a = DictionaryArray::try_new(keys, decimal_array)?;
2626
2627        let decimal_array = Arc::new(create_decimal_array(
2628            &[Some(1), Some(0), None, Some(0), Some(1)],
2629            10,
2630            0,
2631        ));
2632
2633        apply_arithmetic_scalar(
2634            Arc::new(schema),
2635            vec![Arc::new(a)],
2636            Operator::Modulo,
2637            ScalarValue::Dictionary(
2638                Box::new(DataType::Int8),
2639                Box::new(ScalarValue::Decimal128(Some(2), 10, 0)),
2640            ),
2641            decimal_array,
2642        )?;
2643
2644        Ok(())
2645    }
2646
2647    fn apply_arithmetic<T: ArrowNumericType>(
2648        schema: SchemaRef,
2649        data: Vec<ArrayRef>,
2650        op: Operator,
2651        expected: PrimitiveArray<T>,
2652    ) -> Result<()> {
2653        let arithmetic_op =
2654            binary_op(col("a", &schema)?, op, col("b", &schema)?, &schema)?;
2655        let batch = RecordBatch::try_new(schema, data)?;
2656        let result = arithmetic_op
2657            .evaluate(&batch)?
2658            .into_array(batch.num_rows())
2659            .expect("Failed to convert to array");
2660
2661        assert_eq!(result.as_ref(), &expected);
2662        Ok(())
2663    }
2664
2665    fn apply_arithmetic_scalar(
2666        schema: SchemaRef,
2667        data: Vec<ArrayRef>,
2668        op: Operator,
2669        literal: ScalarValue,
2670        expected: ArrayRef,
2671    ) -> Result<()> {
2672        let lit = Arc::new(Literal::new(literal));
2673        let arithmetic_op = binary_op(col("a", &schema)?, op, lit, &schema)?;
2674        let batch = RecordBatch::try_new(schema, data)?;
2675        let result = arithmetic_op
2676            .evaluate(&batch)?
2677            .into_array(batch.num_rows())
2678            .expect("Failed to convert to array");
2679
2680        assert_eq!(&result, &expected);
2681        Ok(())
2682    }
2683
2684    fn apply_logic_op(
2685        schema: &SchemaRef,
2686        left: &ArrayRef,
2687        right: &ArrayRef,
2688        op: Operator,
2689        expected: BooleanArray,
2690    ) -> Result<()> {
2691        let op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
2692        let data: Vec<ArrayRef> = vec![Arc::clone(left), Arc::clone(right)];
2693        let batch = RecordBatch::try_new(Arc::clone(schema), data)?;
2694        let result = op
2695            .evaluate(&batch)?
2696            .into_array(batch.num_rows())
2697            .expect("Failed to convert to array");
2698
2699        assert_eq!(result.as_ref(), &expected);
2700        Ok(())
2701    }
2702
2703    // Test `scalar <op> arr` produces expected
2704    fn apply_logic_op_scalar_arr(
2705        schema: &SchemaRef,
2706        scalar: &ScalarValue,
2707        arr: &ArrayRef,
2708        op: Operator,
2709        expected: &BooleanArray,
2710    ) -> Result<()> {
2711        let scalar = lit(scalar.clone());
2712        let op = binary_op(scalar, op, col("a", schema)?, schema)?;
2713        let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
2714        let result = op
2715            .evaluate(&batch)?
2716            .into_array(batch.num_rows())
2717            .expect("Failed to convert to array");
2718        assert_eq!(result.as_ref(), expected);
2719
2720        Ok(())
2721    }
2722
2723    // Test `arr <op> scalar` produces expected
2724    fn apply_logic_op_arr_scalar(
2725        schema: &SchemaRef,
2726        arr: &ArrayRef,
2727        scalar: &ScalarValue,
2728        op: Operator,
2729        expected: &BooleanArray,
2730    ) -> Result<()> {
2731        let scalar = lit(scalar.clone());
2732        let op = binary_op(col("a", schema)?, op, scalar, schema)?;
2733        let batch = RecordBatch::try_new(Arc::clone(schema), vec![Arc::clone(arr)])?;
2734        let result = op
2735            .evaluate(&batch)?
2736            .into_array(batch.num_rows())
2737            .expect("Failed to convert to array");
2738        assert_eq!(result.as_ref(), expected);
2739
2740        Ok(())
2741    }
2742
2743    #[test]
2744    fn and_with_nulls_op() -> Result<()> {
2745        let schema = Schema::new(vec![
2746            Field::new("a", DataType::Boolean, true),
2747            Field::new("b", DataType::Boolean, true),
2748        ]);
2749        let a = Arc::new(BooleanArray::from(vec![
2750            Some(true),
2751            Some(false),
2752            None,
2753            Some(true),
2754            Some(false),
2755            None,
2756            Some(true),
2757            Some(false),
2758            None,
2759        ])) as ArrayRef;
2760        let b = Arc::new(BooleanArray::from(vec![
2761            Some(true),
2762            Some(true),
2763            Some(true),
2764            Some(false),
2765            Some(false),
2766            Some(false),
2767            None,
2768            None,
2769            None,
2770        ])) as ArrayRef;
2771
2772        let expected = BooleanArray::from(vec![
2773            Some(true),
2774            Some(false),
2775            None,
2776            Some(false),
2777            Some(false),
2778            Some(false),
2779            None,
2780            Some(false),
2781            None,
2782        ]);
2783        apply_logic_op(&Arc::new(schema), &a, &b, Operator::And, expected)?;
2784
2785        Ok(())
2786    }
2787
2788    #[test]
2789    fn regex_with_nulls() -> Result<()> {
2790        let schema = Schema::new(vec![
2791            Field::new("a", DataType::Utf8, true),
2792            Field::new("b", DataType::Utf8, true),
2793        ]);
2794        let a = Arc::new(StringArray::from(vec![
2795            Some("abc"),
2796            None,
2797            Some("abc"),
2798            None,
2799            Some("abc"),
2800        ])) as ArrayRef;
2801        let b = Arc::new(StringArray::from(vec![
2802            Some("^a"),
2803            Some("^A"),
2804            None,
2805            None,
2806            Some("^(b|c)"),
2807        ])) as ArrayRef;
2808
2809        let regex_expected =
2810            BooleanArray::from(vec![Some(true), None, None, None, Some(false)]);
2811        let regex_not_expected =
2812            BooleanArray::from(vec![Some(false), None, None, None, Some(true)]);
2813        apply_logic_op(
2814            &Arc::new(schema.clone()),
2815            &a,
2816            &b,
2817            Operator::RegexMatch,
2818            regex_expected.clone(),
2819        )?;
2820        apply_logic_op(
2821            &Arc::new(schema.clone()),
2822            &a,
2823            &b,
2824            Operator::RegexIMatch,
2825            regex_expected.clone(),
2826        )?;
2827        apply_logic_op(
2828            &Arc::new(schema.clone()),
2829            &a,
2830            &b,
2831            Operator::RegexNotMatch,
2832            regex_not_expected.clone(),
2833        )?;
2834        apply_logic_op(
2835            &Arc::new(schema),
2836            &a,
2837            &b,
2838            Operator::RegexNotIMatch,
2839            regex_not_expected.clone(),
2840        )?;
2841
2842        let schema = Schema::new(vec![
2843            Field::new("a", DataType::LargeUtf8, true),
2844            Field::new("b", DataType::LargeUtf8, true),
2845        ]);
2846        let a = Arc::new(LargeStringArray::from(vec![
2847            Some("abc"),
2848            None,
2849            Some("abc"),
2850            None,
2851            Some("abc"),
2852        ])) as ArrayRef;
2853        let b = Arc::new(LargeStringArray::from(vec![
2854            Some("^a"),
2855            Some("^A"),
2856            None,
2857            None,
2858            Some("^(b|c)"),
2859        ])) as ArrayRef;
2860
2861        apply_logic_op(
2862            &Arc::new(schema.clone()),
2863            &a,
2864            &b,
2865            Operator::RegexMatch,
2866            regex_expected.clone(),
2867        )?;
2868        apply_logic_op(
2869            &Arc::new(schema.clone()),
2870            &a,
2871            &b,
2872            Operator::RegexIMatch,
2873            regex_expected,
2874        )?;
2875        apply_logic_op(
2876            &Arc::new(schema.clone()),
2877            &a,
2878            &b,
2879            Operator::RegexNotMatch,
2880            regex_not_expected.clone(),
2881        )?;
2882        apply_logic_op(
2883            &Arc::new(schema),
2884            &a,
2885            &b,
2886            Operator::RegexNotIMatch,
2887            regex_not_expected,
2888        )?;
2889
2890        Ok(())
2891    }
2892
2893    #[test]
2894    fn or_with_nulls_op() -> Result<()> {
2895        let schema = Schema::new(vec![
2896            Field::new("a", DataType::Boolean, true),
2897            Field::new("b", DataType::Boolean, true),
2898        ]);
2899        let a = Arc::new(BooleanArray::from(vec![
2900            Some(true),
2901            Some(false),
2902            None,
2903            Some(true),
2904            Some(false),
2905            None,
2906            Some(true),
2907            Some(false),
2908            None,
2909        ])) as ArrayRef;
2910        let b = Arc::new(BooleanArray::from(vec![
2911            Some(true),
2912            Some(true),
2913            Some(true),
2914            Some(false),
2915            Some(false),
2916            Some(false),
2917            None,
2918            None,
2919            None,
2920        ])) as ArrayRef;
2921
2922        let expected = BooleanArray::from(vec![
2923            Some(true),
2924            Some(true),
2925            Some(true),
2926            Some(true),
2927            Some(false),
2928            None,
2929            Some(true),
2930            None,
2931            None,
2932        ]);
2933        apply_logic_op(&Arc::new(schema), &a, &b, Operator::Or, expected)?;
2934
2935        Ok(())
2936    }
2937
2938    /// Returns (schema, a: BooleanArray, b: BooleanArray) with all possible inputs
2939    ///
2940    /// a: [true, true, true,  NULL, NULL, NULL,  false, false, false]
2941    /// b: [true, NULL, false, true, NULL, false, true,  NULL,  false]
2942    fn bool_test_arrays() -> (SchemaRef, ArrayRef, ArrayRef) {
2943        let schema = Schema::new(vec![
2944            Field::new("a", DataType::Boolean, true),
2945            Field::new("b", DataType::Boolean, true),
2946        ]);
2947        let a: BooleanArray = [
2948            Some(true),
2949            Some(true),
2950            Some(true),
2951            None,
2952            None,
2953            None,
2954            Some(false),
2955            Some(false),
2956            Some(false),
2957        ]
2958        .iter()
2959        .collect();
2960        let b: BooleanArray = [
2961            Some(true),
2962            None,
2963            Some(false),
2964            Some(true),
2965            None,
2966            Some(false),
2967            Some(true),
2968            None,
2969            Some(false),
2970        ]
2971        .iter()
2972        .collect();
2973        (Arc::new(schema), Arc::new(a), Arc::new(b))
2974    }
2975
2976    /// Returns (schema, BooleanArray) with [true, NULL, false]
2977    fn scalar_bool_test_array() -> (SchemaRef, ArrayRef) {
2978        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
2979        let a: BooleanArray = [Some(true), None, Some(false)].iter().collect();
2980        (Arc::new(schema), Arc::new(a))
2981    }
2982
2983    #[test]
2984    fn eq_op_bool() {
2985        let (schema, a, b) = bool_test_arrays();
2986        let expected = [
2987            Some(true),
2988            None,
2989            Some(false),
2990            None,
2991            None,
2992            None,
2993            Some(false),
2994            None,
2995            Some(true),
2996        ]
2997        .iter()
2998        .collect();
2999        apply_logic_op(&schema, &a, &b, Operator::Eq, expected).unwrap();
3000    }
3001
3002    #[test]
3003    fn eq_op_bool_scalar() {
3004        let (schema, a) = scalar_bool_test_array();
3005        let expected = [Some(true), None, Some(false)].iter().collect();
3006        apply_logic_op_scalar_arr(
3007            &schema,
3008            &ScalarValue::from(true),
3009            &a,
3010            Operator::Eq,
3011            &expected,
3012        )
3013        .unwrap();
3014        apply_logic_op_arr_scalar(
3015            &schema,
3016            &a,
3017            &ScalarValue::from(true),
3018            Operator::Eq,
3019            &expected,
3020        )
3021        .unwrap();
3022
3023        let expected = [Some(false), None, Some(true)].iter().collect();
3024        apply_logic_op_scalar_arr(
3025            &schema,
3026            &ScalarValue::from(false),
3027            &a,
3028            Operator::Eq,
3029            &expected,
3030        )
3031        .unwrap();
3032        apply_logic_op_arr_scalar(
3033            &schema,
3034            &a,
3035            &ScalarValue::from(false),
3036            Operator::Eq,
3037            &expected,
3038        )
3039        .unwrap();
3040    }
3041
3042    #[test]
3043    fn neq_op_bool() {
3044        let (schema, a, b) = bool_test_arrays();
3045        let expected = [
3046            Some(false),
3047            None,
3048            Some(true),
3049            None,
3050            None,
3051            None,
3052            Some(true),
3053            None,
3054            Some(false),
3055        ]
3056        .iter()
3057        .collect();
3058        apply_logic_op(&schema, &a, &b, Operator::NotEq, expected).unwrap();
3059    }
3060
3061    #[test]
3062    fn neq_op_bool_scalar() {
3063        let (schema, a) = scalar_bool_test_array();
3064        let expected = [Some(false), None, Some(true)].iter().collect();
3065        apply_logic_op_scalar_arr(
3066            &schema,
3067            &ScalarValue::from(true),
3068            &a,
3069            Operator::NotEq,
3070            &expected,
3071        )
3072        .unwrap();
3073        apply_logic_op_arr_scalar(
3074            &schema,
3075            &a,
3076            &ScalarValue::from(true),
3077            Operator::NotEq,
3078            &expected,
3079        )
3080        .unwrap();
3081
3082        let expected = [Some(true), None, Some(false)].iter().collect();
3083        apply_logic_op_scalar_arr(
3084            &schema,
3085            &ScalarValue::from(false),
3086            &a,
3087            Operator::NotEq,
3088            &expected,
3089        )
3090        .unwrap();
3091        apply_logic_op_arr_scalar(
3092            &schema,
3093            &a,
3094            &ScalarValue::from(false),
3095            Operator::NotEq,
3096            &expected,
3097        )
3098        .unwrap();
3099    }
3100
3101    #[test]
3102    fn lt_op_bool() {
3103        let (schema, a, b) = bool_test_arrays();
3104        let expected = [
3105            Some(false),
3106            None,
3107            Some(false),
3108            None,
3109            None,
3110            None,
3111            Some(true),
3112            None,
3113            Some(false),
3114        ]
3115        .iter()
3116        .collect();
3117        apply_logic_op(&schema, &a, &b, Operator::Lt, expected).unwrap();
3118    }
3119
3120    #[test]
3121    fn lt_op_bool_scalar() {
3122        let (schema, a) = scalar_bool_test_array();
3123        let expected = [Some(false), None, Some(false)].iter().collect();
3124        apply_logic_op_scalar_arr(
3125            &schema,
3126            &ScalarValue::from(true),
3127            &a,
3128            Operator::Lt,
3129            &expected,
3130        )
3131        .unwrap();
3132
3133        let expected = [Some(false), None, Some(true)].iter().collect();
3134        apply_logic_op_arr_scalar(
3135            &schema,
3136            &a,
3137            &ScalarValue::from(true),
3138            Operator::Lt,
3139            &expected,
3140        )
3141        .unwrap();
3142
3143        let expected = [Some(true), None, Some(false)].iter().collect();
3144        apply_logic_op_scalar_arr(
3145            &schema,
3146            &ScalarValue::from(false),
3147            &a,
3148            Operator::Lt,
3149            &expected,
3150        )
3151        .unwrap();
3152
3153        let expected = [Some(false), None, Some(false)].iter().collect();
3154        apply_logic_op_arr_scalar(
3155            &schema,
3156            &a,
3157            &ScalarValue::from(false),
3158            Operator::Lt,
3159            &expected,
3160        )
3161        .unwrap();
3162    }
3163
3164    #[test]
3165    fn lt_eq_op_bool() {
3166        let (schema, a, b) = bool_test_arrays();
3167        let expected = [
3168            Some(true),
3169            None,
3170            Some(false),
3171            None,
3172            None,
3173            None,
3174            Some(true),
3175            None,
3176            Some(true),
3177        ]
3178        .iter()
3179        .collect();
3180        apply_logic_op(&schema, &a, &b, Operator::LtEq, expected).unwrap();
3181    }
3182
3183    #[test]
3184    fn lt_eq_op_bool_scalar() {
3185        let (schema, a) = scalar_bool_test_array();
3186        let expected = [Some(true), None, Some(false)].iter().collect();
3187        apply_logic_op_scalar_arr(
3188            &schema,
3189            &ScalarValue::from(true),
3190            &a,
3191            Operator::LtEq,
3192            &expected,
3193        )
3194        .unwrap();
3195
3196        let expected = [Some(true), None, Some(true)].iter().collect();
3197        apply_logic_op_arr_scalar(
3198            &schema,
3199            &a,
3200            &ScalarValue::from(true),
3201            Operator::LtEq,
3202            &expected,
3203        )
3204        .unwrap();
3205
3206        let expected = [Some(true), None, Some(true)].iter().collect();
3207        apply_logic_op_scalar_arr(
3208            &schema,
3209            &ScalarValue::from(false),
3210            &a,
3211            Operator::LtEq,
3212            &expected,
3213        )
3214        .unwrap();
3215
3216        let expected = [Some(false), None, Some(true)].iter().collect();
3217        apply_logic_op_arr_scalar(
3218            &schema,
3219            &a,
3220            &ScalarValue::from(false),
3221            Operator::LtEq,
3222            &expected,
3223        )
3224        .unwrap();
3225    }
3226
3227    #[test]
3228    fn gt_op_bool() {
3229        let (schema, a, b) = bool_test_arrays();
3230        let expected = [
3231            Some(false),
3232            None,
3233            Some(true),
3234            None,
3235            None,
3236            None,
3237            Some(false),
3238            None,
3239            Some(false),
3240        ]
3241        .iter()
3242        .collect();
3243        apply_logic_op(&schema, &a, &b, Operator::Gt, expected).unwrap();
3244    }
3245
3246    #[test]
3247    fn gt_op_bool_scalar() {
3248        let (schema, a) = scalar_bool_test_array();
3249        let expected = [Some(false), None, Some(true)].iter().collect();
3250        apply_logic_op_scalar_arr(
3251            &schema,
3252            &ScalarValue::from(true),
3253            &a,
3254            Operator::Gt,
3255            &expected,
3256        )
3257        .unwrap();
3258
3259        let expected = [Some(false), None, Some(false)].iter().collect();
3260        apply_logic_op_arr_scalar(
3261            &schema,
3262            &a,
3263            &ScalarValue::from(true),
3264            Operator::Gt,
3265            &expected,
3266        )
3267        .unwrap();
3268
3269        let expected = [Some(false), None, Some(false)].iter().collect();
3270        apply_logic_op_scalar_arr(
3271            &schema,
3272            &ScalarValue::from(false),
3273            &a,
3274            Operator::Gt,
3275            &expected,
3276        )
3277        .unwrap();
3278
3279        let expected = [Some(true), None, Some(false)].iter().collect();
3280        apply_logic_op_arr_scalar(
3281            &schema,
3282            &a,
3283            &ScalarValue::from(false),
3284            Operator::Gt,
3285            &expected,
3286        )
3287        .unwrap();
3288    }
3289
3290    #[test]
3291    fn gt_eq_op_bool() {
3292        let (schema, a, b) = bool_test_arrays();
3293        let expected = [
3294            Some(true),
3295            None,
3296            Some(true),
3297            None,
3298            None,
3299            None,
3300            Some(false),
3301            None,
3302            Some(true),
3303        ]
3304        .iter()
3305        .collect();
3306        apply_logic_op(&schema, &a, &b, Operator::GtEq, expected).unwrap();
3307    }
3308
3309    #[test]
3310    fn gt_eq_op_bool_scalar() {
3311        let (schema, a) = scalar_bool_test_array();
3312        let expected = [Some(true), None, Some(true)].iter().collect();
3313        apply_logic_op_scalar_arr(
3314            &schema,
3315            &ScalarValue::from(true),
3316            &a,
3317            Operator::GtEq,
3318            &expected,
3319        )
3320        .unwrap();
3321
3322        let expected = [Some(true), None, Some(false)].iter().collect();
3323        apply_logic_op_arr_scalar(
3324            &schema,
3325            &a,
3326            &ScalarValue::from(true),
3327            Operator::GtEq,
3328            &expected,
3329        )
3330        .unwrap();
3331
3332        let expected = [Some(false), None, Some(true)].iter().collect();
3333        apply_logic_op_scalar_arr(
3334            &schema,
3335            &ScalarValue::from(false),
3336            &a,
3337            Operator::GtEq,
3338            &expected,
3339        )
3340        .unwrap();
3341
3342        let expected = [Some(true), None, Some(true)].iter().collect();
3343        apply_logic_op_arr_scalar(
3344            &schema,
3345            &a,
3346            &ScalarValue::from(false),
3347            Operator::GtEq,
3348            &expected,
3349        )
3350        .unwrap();
3351    }
3352
3353    #[test]
3354    fn is_distinct_from_op_bool() {
3355        let (schema, a, b) = bool_test_arrays();
3356        let expected = [
3357            Some(false),
3358            Some(true),
3359            Some(true),
3360            Some(true),
3361            Some(false),
3362            Some(true),
3363            Some(true),
3364            Some(true),
3365            Some(false),
3366        ]
3367        .iter()
3368        .collect();
3369        apply_logic_op(&schema, &a, &b, Operator::IsDistinctFrom, expected).unwrap();
3370    }
3371
3372    #[test]
3373    fn is_not_distinct_from_op_bool() {
3374        let (schema, a, b) = bool_test_arrays();
3375        let expected = [
3376            Some(true),
3377            Some(false),
3378            Some(false),
3379            Some(false),
3380            Some(true),
3381            Some(false),
3382            Some(false),
3383            Some(false),
3384            Some(true),
3385        ]
3386        .iter()
3387        .collect();
3388        apply_logic_op(&schema, &a, &b, Operator::IsNotDistinctFrom, expected).unwrap();
3389    }
3390
3391    #[test]
3392    fn relatively_deeply_nested() {
3393        // Reproducer for https://github.com/apache/datafusion/issues/419
3394
3395        // where even relatively shallow binary expressions overflowed
3396        // the stack in debug builds
3397
3398        let input: Vec<_> = vec![1, 2, 3, 4, 5].into_iter().map(Some).collect();
3399        let a: Int32Array = input.iter().collect();
3400
3401        let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(a) as _)]).unwrap();
3402        let schema = batch.schema();
3403
3404        // build a left deep tree ((((a + a) + a) + a ....
3405        let tree_depth: i32 = 100;
3406        let expr = (0..tree_depth)
3407            .map(|_| col("a", schema.as_ref()).unwrap())
3408            .reduce(|l, r| binary(l, Operator::Plus, r, &schema).unwrap())
3409            .unwrap();
3410
3411        let result = expr
3412            .evaluate(&batch)
3413            .expect("evaluation")
3414            .into_array(batch.num_rows())
3415            .expect("Failed to convert to array");
3416
3417        let expected: Int32Array = input
3418            .into_iter()
3419            .map(|i| i.map(|i| i * tree_depth))
3420            .collect();
3421        assert_eq!(result.as_ref(), &expected);
3422    }
3423
3424    fn create_decimal_array(
3425        array: &[Option<i128>],
3426        precision: u8,
3427        scale: i8,
3428    ) -> Decimal128Array {
3429        let mut decimal_builder = Decimal128Builder::with_capacity(array.len());
3430        for value in array.iter().copied() {
3431            decimal_builder.append_option(value)
3432        }
3433        decimal_builder
3434            .finish()
3435            .with_precision_and_scale(precision, scale)
3436            .unwrap()
3437    }
3438
3439    #[test]
3440    fn comparison_dict_decimal_scalar_expr_test() -> Result<()> {
3441        // scalar of decimal compare with dictionary decimal array
3442        let value_i128 = 123;
3443        let decimal_scalar = ScalarValue::Dictionary(
3444            Box::new(DataType::Int8),
3445            Box::new(ScalarValue::Decimal128(Some(value_i128), 25, 3)),
3446        );
3447        let schema = Arc::new(Schema::new(vec![Field::new(
3448            "a",
3449            DataType::Dictionary(
3450                Box::new(DataType::Int8),
3451                Box::new(DataType::Decimal128(25, 3)),
3452            ),
3453            true,
3454        )]));
3455        let decimal_array = Arc::new(create_decimal_array(
3456            &[
3457                Some(value_i128),
3458                None,
3459                Some(value_i128 - 1),
3460                Some(value_i128 + 1),
3461            ],
3462            25,
3463            3,
3464        ));
3465
3466        let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
3467        let dictionary =
3468            Arc::new(DictionaryArray::try_new(keys, decimal_array)?) as ArrayRef;
3469
3470        // array = scalar
3471        apply_logic_op_arr_scalar(
3472            &schema,
3473            &dictionary,
3474            &decimal_scalar,
3475            Operator::Eq,
3476            &BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3477        )
3478        .unwrap();
3479        // array != scalar
3480        apply_logic_op_arr_scalar(
3481            &schema,
3482            &dictionary,
3483            &decimal_scalar,
3484            Operator::NotEq,
3485            &BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3486        )
3487        .unwrap();
3488        //  array < scalar
3489        apply_logic_op_arr_scalar(
3490            &schema,
3491            &dictionary,
3492            &decimal_scalar,
3493            Operator::Lt,
3494            &BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3495        )
3496        .unwrap();
3497
3498        //  array <= scalar
3499        apply_logic_op_arr_scalar(
3500            &schema,
3501            &dictionary,
3502            &decimal_scalar,
3503            Operator::LtEq,
3504            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3505        )
3506        .unwrap();
3507        // array > scalar
3508        apply_logic_op_arr_scalar(
3509            &schema,
3510            &dictionary,
3511            &decimal_scalar,
3512            Operator::Gt,
3513            &BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3514        )
3515        .unwrap();
3516
3517        // array >= scalar
3518        apply_logic_op_arr_scalar(
3519            &schema,
3520            &dictionary,
3521            &decimal_scalar,
3522            Operator::GtEq,
3523            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3524        )
3525        .unwrap();
3526
3527        Ok(())
3528    }
3529
3530    #[test]
3531    fn comparison_decimal_expr_test() -> Result<()> {
3532        // scalar of decimal compare with decimal array
3533        let value_i128 = 123;
3534        let decimal_scalar = ScalarValue::Decimal128(Some(value_i128), 25, 3);
3535        let schema = Arc::new(Schema::new(vec![Field::new(
3536            "a",
3537            DataType::Decimal128(25, 3),
3538            true,
3539        )]));
3540        let decimal_array = Arc::new(create_decimal_array(
3541            &[
3542                Some(value_i128),
3543                None,
3544                Some(value_i128 - 1),
3545                Some(value_i128 + 1),
3546            ],
3547            25,
3548            3,
3549        )) as ArrayRef;
3550        // array = scalar
3551        apply_logic_op_arr_scalar(
3552            &schema,
3553            &decimal_array,
3554            &decimal_scalar,
3555            Operator::Eq,
3556            &BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3557        )
3558        .unwrap();
3559        // array != scalar
3560        apply_logic_op_arr_scalar(
3561            &schema,
3562            &decimal_array,
3563            &decimal_scalar,
3564            Operator::NotEq,
3565            &BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3566        )
3567        .unwrap();
3568        //  array < scalar
3569        apply_logic_op_arr_scalar(
3570            &schema,
3571            &decimal_array,
3572            &decimal_scalar,
3573            Operator::Lt,
3574            &BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3575        )
3576        .unwrap();
3577
3578        //  array <= scalar
3579        apply_logic_op_arr_scalar(
3580            &schema,
3581            &decimal_array,
3582            &decimal_scalar,
3583            Operator::LtEq,
3584            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3585        )
3586        .unwrap();
3587        // array > scalar
3588        apply_logic_op_arr_scalar(
3589            &schema,
3590            &decimal_array,
3591            &decimal_scalar,
3592            Operator::Gt,
3593            &BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3594        )
3595        .unwrap();
3596
3597        // array >= scalar
3598        apply_logic_op_arr_scalar(
3599            &schema,
3600            &decimal_array,
3601            &decimal_scalar,
3602            Operator::GtEq,
3603            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3604        )
3605        .unwrap();
3606
3607        // scalar of different data type with decimal array
3608        let decimal_scalar = ScalarValue::Decimal128(Some(123_456), 10, 3);
3609        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
3610        // scalar == array
3611        apply_logic_op_scalar_arr(
3612            &schema,
3613            &decimal_scalar,
3614            &(Arc::new(Int64Array::from(vec![Some(124), None])) as ArrayRef),
3615            Operator::Eq,
3616            &BooleanArray::from(vec![Some(false), None]),
3617        )
3618        .unwrap();
3619
3620        // array != scalar
3621        apply_logic_op_arr_scalar(
3622            &schema,
3623            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(1)])) as ArrayRef),
3624            &decimal_scalar,
3625            Operator::NotEq,
3626            &BooleanArray::from(vec![Some(true), None, Some(true)]),
3627        )
3628        .unwrap();
3629
3630        // array < scalar
3631        apply_logic_op_arr_scalar(
3632            &schema,
3633            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(124)])) as ArrayRef),
3634            &decimal_scalar,
3635            Operator::Lt,
3636            &BooleanArray::from(vec![Some(true), None, Some(false)]),
3637        )
3638        .unwrap();
3639
3640        // array > scalar
3641        apply_logic_op_arr_scalar(
3642            &schema,
3643            &(Arc::new(Int64Array::from(vec![Some(123), None, Some(124)])) as ArrayRef),
3644            &decimal_scalar,
3645            Operator::Gt,
3646            &BooleanArray::from(vec![Some(false), None, Some(true)]),
3647        )
3648        .unwrap();
3649
3650        let schema =
3651            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3652        // array == scalar
3653        apply_logic_op_arr_scalar(
3654            &schema,
3655            &(Arc::new(Float64Array::from(vec![Some(123.456), None, Some(123.457)]))
3656                as ArrayRef),
3657            &decimal_scalar,
3658            Operator::Eq,
3659            &BooleanArray::from(vec![Some(true), None, Some(false)]),
3660        )
3661        .unwrap();
3662
3663        // array <= scalar
3664        apply_logic_op_arr_scalar(
3665            &schema,
3666            &(Arc::new(Float64Array::from(vec![
3667                Some(123.456),
3668                None,
3669                Some(123.457),
3670                Some(123.45),
3671            ])) as ArrayRef),
3672            &decimal_scalar,
3673            Operator::LtEq,
3674            &BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3675        )
3676        .unwrap();
3677        // array >= scalar
3678        apply_logic_op_arr_scalar(
3679            &schema,
3680            &(Arc::new(Float64Array::from(vec![
3681                Some(123.456),
3682                None,
3683                Some(123.457),
3684                Some(123.45),
3685            ])) as ArrayRef),
3686            &decimal_scalar,
3687            Operator::GtEq,
3688            &BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3689        )
3690        .unwrap();
3691
3692        let value: i128 = 123;
3693        let decimal_array = Arc::new(create_decimal_array(
3694            &[Some(value), None, Some(value - 1), Some(value + 1)],
3695            10,
3696            0,
3697        )) as ArrayRef;
3698
3699        // comparison array op for decimal array
3700        let schema = Arc::new(Schema::new(vec![
3701            Field::new("a", DataType::Decimal128(10, 0), true),
3702            Field::new("b", DataType::Decimal128(10, 0), true),
3703        ]));
3704        let right_decimal_array = Arc::new(create_decimal_array(
3705            &[
3706                Some(value - 1),
3707                Some(value),
3708                Some(value + 1),
3709                Some(value + 1),
3710            ],
3711            10,
3712            0,
3713        )) as ArrayRef;
3714
3715        apply_logic_op(
3716            &schema,
3717            &decimal_array,
3718            &right_decimal_array,
3719            Operator::Eq,
3720            BooleanArray::from(vec![Some(false), None, Some(false), Some(true)]),
3721        )
3722        .unwrap();
3723
3724        apply_logic_op(
3725            &schema,
3726            &decimal_array,
3727            &right_decimal_array,
3728            Operator::NotEq,
3729            BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]),
3730        )
3731        .unwrap();
3732
3733        apply_logic_op(
3734            &schema,
3735            &decimal_array,
3736            &right_decimal_array,
3737            Operator::Lt,
3738            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3739        )
3740        .unwrap();
3741
3742        apply_logic_op(
3743            &schema,
3744            &decimal_array,
3745            &right_decimal_array,
3746            Operator::LtEq,
3747            BooleanArray::from(vec![Some(false), None, Some(true), Some(true)]),
3748        )
3749        .unwrap();
3750
3751        apply_logic_op(
3752            &schema,
3753            &decimal_array,
3754            &right_decimal_array,
3755            Operator::Gt,
3756            BooleanArray::from(vec![Some(true), None, Some(false), Some(false)]),
3757        )
3758        .unwrap();
3759
3760        apply_logic_op(
3761            &schema,
3762            &decimal_array,
3763            &right_decimal_array,
3764            Operator::GtEq,
3765            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3766        )
3767        .unwrap();
3768
3769        // compare decimal array with other array type
3770        let value: i64 = 123;
3771        let schema = Arc::new(Schema::new(vec![
3772            Field::new("a", DataType::Int64, true),
3773            Field::new("b", DataType::Decimal128(10, 0), true),
3774        ]));
3775
3776        let int64_array = Arc::new(Int64Array::from(vec![
3777            Some(value),
3778            Some(value - 1),
3779            Some(value),
3780            Some(value + 1),
3781        ])) as ArrayRef;
3782
3783        // eq: int64array == decimal array
3784        apply_logic_op(
3785            &schema,
3786            &int64_array,
3787            &decimal_array,
3788            Operator::Eq,
3789            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3790        )
3791        .unwrap();
3792        // neq: int64array != decimal array
3793        apply_logic_op(
3794            &schema,
3795            &int64_array,
3796            &decimal_array,
3797            Operator::NotEq,
3798            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3799        )
3800        .unwrap();
3801
3802        let schema = Arc::new(Schema::new(vec![
3803            Field::new("a", DataType::Float64, true),
3804            Field::new("b", DataType::Decimal128(10, 2), true),
3805        ]));
3806
3807        let value: i128 = 123;
3808        let decimal_array = Arc::new(create_decimal_array(
3809            &[
3810                Some(value), // 1.23
3811                None,
3812                Some(value - 1), // 1.22
3813                Some(value + 1), // 1.24
3814            ],
3815            10,
3816            2,
3817        )) as ArrayRef;
3818        let float64_array = Arc::new(Float64Array::from(vec![
3819            Some(1.23),
3820            Some(1.22),
3821            Some(1.23),
3822            Some(1.24),
3823        ])) as ArrayRef;
3824        // lt: float64array < decimal array
3825        apply_logic_op(
3826            &schema,
3827            &float64_array,
3828            &decimal_array,
3829            Operator::Lt,
3830            BooleanArray::from(vec![Some(false), None, Some(false), Some(false)]),
3831        )
3832        .unwrap();
3833        // lt_eq: float64array <= decimal array
3834        apply_logic_op(
3835            &schema,
3836            &float64_array,
3837            &decimal_array,
3838            Operator::LtEq,
3839            BooleanArray::from(vec![Some(true), None, Some(false), Some(true)]),
3840        )
3841        .unwrap();
3842        // gt: float64array > decimal array
3843        apply_logic_op(
3844            &schema,
3845            &float64_array,
3846            &decimal_array,
3847            Operator::Gt,
3848            BooleanArray::from(vec![Some(false), None, Some(true), Some(false)]),
3849        )
3850        .unwrap();
3851        apply_logic_op(
3852            &schema,
3853            &float64_array,
3854            &decimal_array,
3855            Operator::GtEq,
3856            BooleanArray::from(vec![Some(true), None, Some(true), Some(true)]),
3857        )
3858        .unwrap();
3859        // is distinct: float64array is distinct decimal array
3860        // TODO: now we do not refactor the `is distinct or is not distinct` rule of coercion.
3861        // traced by https://github.com/apache/datafusion/issues/1590
3862        // the decimal array will be casted to float64array
3863        apply_logic_op(
3864            &schema,
3865            &float64_array,
3866            &decimal_array,
3867            Operator::IsDistinctFrom,
3868            BooleanArray::from(vec![Some(false), Some(true), Some(true), Some(false)]),
3869        )
3870        .unwrap();
3871        // is not distinct
3872        apply_logic_op(
3873            &schema,
3874            &float64_array,
3875            &decimal_array,
3876            Operator::IsNotDistinctFrom,
3877            BooleanArray::from(vec![Some(true), Some(false), Some(false), Some(true)]),
3878        )
3879        .unwrap();
3880
3881        Ok(())
3882    }
3883
3884    fn apply_decimal_arithmetic_op(
3885        schema: &SchemaRef,
3886        left: &ArrayRef,
3887        right: &ArrayRef,
3888        op: Operator,
3889        expected: ArrayRef,
3890    ) -> Result<()> {
3891        let arithmetic_op = binary_op(col("a", schema)?, op, col("b", schema)?, schema)?;
3892        let data: Vec<ArrayRef> = vec![Arc::clone(left), Arc::clone(right)];
3893        let batch = RecordBatch::try_new(Arc::clone(schema), data)?;
3894        let result = arithmetic_op
3895            .evaluate(&batch)?
3896            .into_array(batch.num_rows())
3897            .expect("Failed to convert to array");
3898
3899        assert_eq!(result.as_ref(), expected.as_ref());
3900        Ok(())
3901    }
3902
3903    #[test]
3904    fn arithmetic_decimal_expr_test() -> Result<()> {
3905        let schema = Arc::new(Schema::new(vec![
3906            Field::new("a", DataType::Int32, true),
3907            Field::new("b", DataType::Decimal128(10, 2), true),
3908        ]));
3909        let value: i128 = 123;
3910        let decimal_array = Arc::new(create_decimal_array(
3911            &[
3912                Some(value), // 1.23
3913                None,
3914                Some(value - 1), // 1.22
3915                Some(value + 1), // 1.24
3916            ],
3917            10,
3918            2,
3919        )) as ArrayRef;
3920        let int32_array = Arc::new(Int32Array::from(vec![
3921            Some(123),
3922            Some(122),
3923            Some(123),
3924            Some(124),
3925        ])) as ArrayRef;
3926
3927        // add: Int32array add decimal array
3928        let expect = Arc::new(create_decimal_array(
3929            &[Some(12423), None, Some(12422), Some(12524)],
3930            13,
3931            2,
3932        )) as ArrayRef;
3933        apply_decimal_arithmetic_op(
3934            &schema,
3935            &int32_array,
3936            &decimal_array,
3937            Operator::Plus,
3938            expect,
3939        )
3940        .unwrap();
3941
3942        // subtract: decimal array subtract int32 array
3943        let schema = Arc::new(Schema::new(vec![
3944            Field::new("a", DataType::Decimal128(10, 2), true),
3945            Field::new("b", DataType::Int32, true),
3946        ]));
3947        let expect = Arc::new(create_decimal_array(
3948            &[Some(-12177), None, Some(-12178), Some(-12276)],
3949            13,
3950            2,
3951        )) as ArrayRef;
3952        apply_decimal_arithmetic_op(
3953            &schema,
3954            &decimal_array,
3955            &int32_array,
3956            Operator::Minus,
3957            expect,
3958        )
3959        .unwrap();
3960
3961        // multiply: decimal array multiply int32 array
3962        let expect = Arc::new(create_decimal_array(
3963            &[Some(15129), None, Some(15006), Some(15376)],
3964            21,
3965            2,
3966        )) as ArrayRef;
3967        apply_decimal_arithmetic_op(
3968            &schema,
3969            &decimal_array,
3970            &int32_array,
3971            Operator::Multiply,
3972            expect,
3973        )
3974        .unwrap();
3975
3976        // divide: int32 array divide decimal array
3977        let schema = Arc::new(Schema::new(vec![
3978            Field::new("a", DataType::Int32, true),
3979            Field::new("b", DataType::Decimal128(10, 2), true),
3980        ]));
3981        let expect = Arc::new(create_decimal_array(
3982            &[Some(1000000), None, Some(1008196), Some(1000000)],
3983            16,
3984            4,
3985        )) as ArrayRef;
3986        apply_decimal_arithmetic_op(
3987            &schema,
3988            &int32_array,
3989            &decimal_array,
3990            Operator::Divide,
3991            expect,
3992        )
3993        .unwrap();
3994
3995        // modulus: int32 array modulus decimal array
3996        let schema = Arc::new(Schema::new(vec![
3997            Field::new("a", DataType::Int32, true),
3998            Field::new("b", DataType::Decimal128(10, 2), true),
3999        ]));
4000        let expect = Arc::new(create_decimal_array(
4001            &[Some(000), None, Some(100), Some(000)],
4002            10,
4003            2,
4004        )) as ArrayRef;
4005        apply_decimal_arithmetic_op(
4006            &schema,
4007            &int32_array,
4008            &decimal_array,
4009            Operator::Modulo,
4010            expect,
4011        )
4012        .unwrap();
4013
4014        Ok(())
4015    }
4016
4017    #[test]
4018    fn arithmetic_decimal_float_expr_test() -> Result<()> {
4019        let schema = Arc::new(Schema::new(vec![
4020            Field::new("a", DataType::Float64, true),
4021            Field::new("b", DataType::Decimal128(10, 2), true),
4022        ]));
4023        let value: i128 = 123;
4024        let decimal_array = Arc::new(create_decimal_array(
4025            &[
4026                Some(value), // 1.23
4027                None,
4028                Some(value - 1), // 1.22
4029                Some(value + 1), // 1.24
4030            ],
4031            10,
4032            2,
4033        )) as ArrayRef;
4034        let float64_array = Arc::new(Float64Array::from(vec![
4035            Some(123.0),
4036            Some(122.0),
4037            Some(123.0),
4038            Some(124.0),
4039        ])) as ArrayRef;
4040
4041        // add: float64 array add decimal array
4042        let expect = Arc::new(Float64Array::from(vec![
4043            Some(124.23),
4044            None,
4045            Some(124.22),
4046            Some(125.24),
4047        ])) as ArrayRef;
4048        apply_decimal_arithmetic_op(
4049            &schema,
4050            &float64_array,
4051            &decimal_array,
4052            Operator::Plus,
4053            expect,
4054        )
4055        .unwrap();
4056
4057        // subtract: decimal array subtract float64 array
4058        let schema = Arc::new(Schema::new(vec![
4059            Field::new("a", DataType::Float64, true),
4060            Field::new("b", DataType::Decimal128(10, 2), true),
4061        ]));
4062        let expect = Arc::new(Float64Array::from(vec![
4063            Some(121.77),
4064            None,
4065            Some(121.78),
4066            Some(122.76),
4067        ])) as ArrayRef;
4068        apply_decimal_arithmetic_op(
4069            &schema,
4070            &float64_array,
4071            &decimal_array,
4072            Operator::Minus,
4073            expect,
4074        )
4075        .unwrap();
4076
4077        // multiply: decimal array multiply float64 array
4078        let expect = Arc::new(Float64Array::from(vec![
4079            Some(151.29),
4080            None,
4081            Some(150.06),
4082            Some(153.76),
4083        ])) as ArrayRef;
4084        apply_decimal_arithmetic_op(
4085            &schema,
4086            &float64_array,
4087            &decimal_array,
4088            Operator::Multiply,
4089            expect,
4090        )
4091        .unwrap();
4092
4093        // divide: float64 array divide decimal array
4094        let schema = Arc::new(Schema::new(vec![
4095            Field::new("a", DataType::Float64, true),
4096            Field::new("b", DataType::Decimal128(10, 2), true),
4097        ]));
4098        let expect = Arc::new(Float64Array::from(vec![
4099            Some(100.0),
4100            None,
4101            Some(100.81967213114754),
4102            Some(100.0),
4103        ])) as ArrayRef;
4104        apply_decimal_arithmetic_op(
4105            &schema,
4106            &float64_array,
4107            &decimal_array,
4108            Operator::Divide,
4109            expect,
4110        )
4111        .unwrap();
4112
4113        // modulus: float64 array modulus decimal array
4114        let schema = Arc::new(Schema::new(vec![
4115            Field::new("a", DataType::Float64, true),
4116            Field::new("b", DataType::Decimal128(10, 2), true),
4117        ]));
4118        let expect = Arc::new(Float64Array::from(vec![
4119            Some(1.7763568394002505e-15),
4120            None,
4121            Some(1.0000000000000027),
4122            Some(8.881784197001252e-16),
4123        ])) as ArrayRef;
4124        apply_decimal_arithmetic_op(
4125            &schema,
4126            &float64_array,
4127            &decimal_array,
4128            Operator::Modulo,
4129            expect,
4130        )
4131        .unwrap();
4132
4133        Ok(())
4134    }
4135
4136    #[test]
4137    fn arithmetic_divide_zero() -> Result<()> {
4138        // other data type
4139        let schema = Arc::new(Schema::new(vec![
4140            Field::new("a", DataType::Int32, true),
4141            Field::new("b", DataType::Int32, true),
4142        ]));
4143        let a = Arc::new(Int32Array::from(vec![100]));
4144        let b = Arc::new(Int32Array::from(vec![0]));
4145
4146        let err = apply_arithmetic::<Int32Type>(
4147            schema,
4148            vec![a, b],
4149            Operator::Divide,
4150            Int32Array::from(vec![Some(4), Some(8), Some(16), Some(32), Some(64)]),
4151        )
4152        .unwrap_err();
4153
4154        let _expected = plan_datafusion_err!("Divide by zero");
4155
4156        assert!(matches!(err, ref _expected), "{err}");
4157
4158        // decimal
4159        let schema = Arc::new(Schema::new(vec![
4160            Field::new("a", DataType::Decimal128(25, 3), true),
4161            Field::new("b", DataType::Decimal128(25, 3), true),
4162        ]));
4163        let left_decimal_array = Arc::new(create_decimal_array(&[Some(1234567)], 25, 3));
4164        let right_decimal_array = Arc::new(create_decimal_array(&[Some(0)], 25, 3));
4165
4166        let err = apply_arithmetic::<Decimal128Type>(
4167            schema,
4168            vec![left_decimal_array, right_decimal_array],
4169            Operator::Divide,
4170            create_decimal_array(
4171                &[Some(12345670000000000000000000000000000), None],
4172                38,
4173                29,
4174            ),
4175        )
4176        .unwrap_err();
4177
4178        assert!(matches!(err, ref _expected), "{err}");
4179
4180        Ok(())
4181    }
4182
4183    #[test]
4184    fn bitwise_array_test() -> Result<()> {
4185        let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4186        let right =
4187            Arc::new(Int32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;
4188        let mut result = bitwise_and_dyn(Arc::clone(&left), Arc::clone(&right))?;
4189        let expected = Int32Array::from(vec![Some(0), None, Some(3)]);
4190        assert_eq!(result.as_ref(), &expected);
4191
4192        result = bitwise_or_dyn(Arc::clone(&left), Arc::clone(&right))?;
4193        let expected = Int32Array::from(vec![Some(13), None, Some(15)]);
4194        assert_eq!(result.as_ref(), &expected);
4195
4196        result = bitwise_xor_dyn(Arc::clone(&left), Arc::clone(&right))?;
4197        let expected = Int32Array::from(vec![Some(13), None, Some(12)]);
4198        assert_eq!(result.as_ref(), &expected);
4199
4200        let left =
4201            Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4202        let right =
4203            Arc::new(UInt32Array::from(vec![Some(1), Some(3), Some(7)])) as ArrayRef;
4204        let mut result = bitwise_and_dyn(Arc::clone(&left), Arc::clone(&right))?;
4205        let expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
4206        assert_eq!(result.as_ref(), &expected);
4207
4208        result = bitwise_or_dyn(Arc::clone(&left), Arc::clone(&right))?;
4209        let expected = UInt32Array::from(vec![Some(13), None, Some(15)]);
4210        assert_eq!(result.as_ref(), &expected);
4211
4212        result = bitwise_xor_dyn(Arc::clone(&left), Arc::clone(&right))?;
4213        let expected = UInt32Array::from(vec![Some(13), None, Some(12)]);
4214        assert_eq!(result.as_ref(), &expected);
4215
4216        Ok(())
4217    }
4218
4219    #[test]
4220    fn bitwise_shift_array_test() -> Result<()> {
4221        let input = Arc::new(Int32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
4222        let modules =
4223            Arc::new(Int32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
4224        let mut result =
4225            bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4226
4227        let expected = Int32Array::from(vec![Some(8), None, Some(2560)]);
4228        assert_eq!(result.as_ref(), &expected);
4229
4230        result = bitwise_shift_right_dyn(Arc::clone(&result), Arc::clone(&modules))?;
4231        assert_eq!(result.as_ref(), &input);
4232
4233        let input =
4234            Arc::new(UInt32Array::from(vec![Some(2), None, Some(10)])) as ArrayRef;
4235        let modules =
4236            Arc::new(UInt32Array::from(vec![Some(2), Some(4), Some(8)])) as ArrayRef;
4237        let mut result =
4238            bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4239
4240        let expected = UInt32Array::from(vec![Some(8), None, Some(2560)]);
4241        assert_eq!(result.as_ref(), &expected);
4242
4243        result = bitwise_shift_right_dyn(Arc::clone(&result), Arc::clone(&modules))?;
4244        assert_eq!(result.as_ref(), &input);
4245        Ok(())
4246    }
4247
4248    #[test]
4249    fn bitwise_shift_array_overflow_test() -> Result<()> {
4250        let input = Arc::new(Int32Array::from(vec![Some(2)])) as ArrayRef;
4251        let modules = Arc::new(Int32Array::from(vec![Some(100)])) as ArrayRef;
4252        let result = bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4253
4254        let expected = Int32Array::from(vec![Some(32)]);
4255        assert_eq!(result.as_ref(), &expected);
4256
4257        let input = Arc::new(UInt32Array::from(vec![Some(2)])) as ArrayRef;
4258        let modules = Arc::new(UInt32Array::from(vec![Some(100)])) as ArrayRef;
4259        let result = bitwise_shift_left_dyn(Arc::clone(&input), Arc::clone(&modules))?;
4260
4261        let expected = UInt32Array::from(vec![Some(32)]);
4262        assert_eq!(result.as_ref(), &expected);
4263        Ok(())
4264    }
4265
4266    #[test]
4267    fn bitwise_scalar_test() -> Result<()> {
4268        let left = Arc::new(Int32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4269        let right = ScalarValue::from(3i32);
4270        let mut result = bitwise_and_dyn_scalar(&left, right.clone()).unwrap()?;
4271        let expected = Int32Array::from(vec![Some(0), None, Some(3)]);
4272        assert_eq!(result.as_ref(), &expected);
4273
4274        result = bitwise_or_dyn_scalar(&left, right.clone()).unwrap()?;
4275        let expected = Int32Array::from(vec![Some(15), None, Some(11)]);
4276        assert_eq!(result.as_ref(), &expected);
4277
4278        result = bitwise_xor_dyn_scalar(&left, right).unwrap()?;
4279        let expected = Int32Array::from(vec![Some(15), None, Some(8)]);
4280        assert_eq!(result.as_ref(), &expected);
4281
4282        let left =
4283            Arc::new(UInt32Array::from(vec![Some(12), None, Some(11)])) as ArrayRef;
4284        let right = ScalarValue::from(3u32);
4285        let mut result = bitwise_and_dyn_scalar(&left, right.clone()).unwrap()?;
4286        let expected = UInt32Array::from(vec![Some(0), None, Some(3)]);
4287        assert_eq!(result.as_ref(), &expected);
4288
4289        result = bitwise_or_dyn_scalar(&left, right.clone()).unwrap()?;
4290        let expected = UInt32Array::from(vec![Some(15), None, Some(11)]);
4291        assert_eq!(result.as_ref(), &expected);
4292
4293        result = bitwise_xor_dyn_scalar(&left, right).unwrap()?;
4294        let expected = UInt32Array::from(vec![Some(15), None, Some(8)]);
4295        assert_eq!(result.as_ref(), &expected);
4296        Ok(())
4297    }
4298
4299    #[test]
4300    fn bitwise_shift_scalar_test() -> Result<()> {
4301        let input = Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
4302        let module = ScalarValue::from(10i32);
4303        let mut result =
4304            bitwise_shift_left_dyn_scalar(&input, module.clone()).unwrap()?;
4305
4306        let expected = Int32Array::from(vec![Some(2048), None, Some(4096)]);
4307        assert_eq!(result.as_ref(), &expected);
4308
4309        result = bitwise_shift_right_dyn_scalar(&result, module).unwrap()?;
4310        assert_eq!(result.as_ref(), &input);
4311
4312        let input = Arc::new(UInt32Array::from(vec![Some(2), None, Some(4)])) as ArrayRef;
4313        let module = ScalarValue::from(10u32);
4314        let mut result =
4315            bitwise_shift_left_dyn_scalar(&input, module.clone()).unwrap()?;
4316
4317        let expected = UInt32Array::from(vec![Some(2048), None, Some(4096)]);
4318        assert_eq!(result.as_ref(), &expected);
4319
4320        result = bitwise_shift_right_dyn_scalar(&result, module).unwrap()?;
4321        assert_eq!(result.as_ref(), &input);
4322        Ok(())
4323    }
4324
4325    #[test]
4326    fn test_display_and_or_combo() {
4327        let expr = BinaryExpr::new(
4328            Arc::new(BinaryExpr::new(
4329                lit(ScalarValue::from(1)),
4330                Operator::And,
4331                lit(ScalarValue::from(2)),
4332            )),
4333            Operator::And,
4334            Arc::new(BinaryExpr::new(
4335                lit(ScalarValue::from(3)),
4336                Operator::And,
4337                lit(ScalarValue::from(4)),
4338            )),
4339        );
4340        assert_eq!(expr.to_string(), "1 AND 2 AND 3 AND 4");
4341
4342        let expr = BinaryExpr::new(
4343            Arc::new(BinaryExpr::new(
4344                lit(ScalarValue::from(1)),
4345                Operator::Or,
4346                lit(ScalarValue::from(2)),
4347            )),
4348            Operator::Or,
4349            Arc::new(BinaryExpr::new(
4350                lit(ScalarValue::from(3)),
4351                Operator::Or,
4352                lit(ScalarValue::from(4)),
4353            )),
4354        );
4355        assert_eq!(expr.to_string(), "1 OR 2 OR 3 OR 4");
4356
4357        let expr = BinaryExpr::new(
4358            Arc::new(BinaryExpr::new(
4359                lit(ScalarValue::from(1)),
4360                Operator::And,
4361                lit(ScalarValue::from(2)),
4362            )),
4363            Operator::Or,
4364            Arc::new(BinaryExpr::new(
4365                lit(ScalarValue::from(3)),
4366                Operator::And,
4367                lit(ScalarValue::from(4)),
4368            )),
4369        );
4370        assert_eq!(expr.to_string(), "1 AND 2 OR 3 AND 4");
4371
4372        let expr = BinaryExpr::new(
4373            Arc::new(BinaryExpr::new(
4374                lit(ScalarValue::from(1)),
4375                Operator::Or,
4376                lit(ScalarValue::from(2)),
4377            )),
4378            Operator::And,
4379            Arc::new(BinaryExpr::new(
4380                lit(ScalarValue::from(3)),
4381                Operator::Or,
4382                lit(ScalarValue::from(4)),
4383            )),
4384        );
4385        assert_eq!(expr.to_string(), "(1 OR 2) AND (3 OR 4)");
4386    }
4387
4388    #[test]
4389    fn test_to_result_type_array() {
4390        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
4391        let keys = Int8Array::from(vec![Some(0), None, Some(2), Some(3)]);
4392        let dictionary =
4393            Arc::new(DictionaryArray::try_new(keys, values).unwrap()) as ArrayRef;
4394
4395        // Casting Dictionary to Int32
4396        let casted = to_result_type_array(
4397            &Operator::Plus,
4398            Arc::clone(&dictionary),
4399            &DataType::Int32,
4400        )
4401        .unwrap();
4402        assert_eq!(
4403            &casted,
4404            &(Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]))
4405                as ArrayRef)
4406        );
4407
4408        // Array has same datatype as result type, no casting
4409        let casted = to_result_type_array(
4410            &Operator::Plus,
4411            Arc::clone(&dictionary),
4412            dictionary.data_type(),
4413        )
4414        .unwrap();
4415        assert_eq!(&casted, &dictionary);
4416
4417        // Not numerical operator, no casting
4418        let casted = to_result_type_array(
4419            &Operator::Eq,
4420            Arc::clone(&dictionary),
4421            &DataType::Int32,
4422        )
4423        .unwrap();
4424        assert_eq!(&casted, &dictionary);
4425    }
4426
4427    #[test]
4428    fn test_add_with_overflow() -> Result<()> {
4429        // create test data
4430        let l = Arc::new(Int32Array::from(vec![1, i32::MAX]));
4431        let r = Arc::new(Int32Array::from(vec![2, 1]));
4432        let schema = Arc::new(Schema::new(vec![
4433            Field::new("l", DataType::Int32, false),
4434            Field::new("r", DataType::Int32, false),
4435        ]));
4436        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4437
4438        // create expression
4439        let expr = BinaryExpr::new(
4440            Arc::new(Column::new("l", 0)),
4441            Operator::Plus,
4442            Arc::new(Column::new("r", 1)),
4443        )
4444        .with_fail_on_overflow(true);
4445
4446        // evaluate expression
4447        let result = expr.evaluate(&batch);
4448        assert!(result
4449            .err()
4450            .unwrap()
4451            .to_string()
4452            .contains("Overflow happened on: 2147483647 + 1"));
4453        Ok(())
4454    }
4455
4456    #[test]
4457    fn test_subtract_with_overflow() -> Result<()> {
4458        // create test data
4459        let l = Arc::new(Int32Array::from(vec![1, i32::MIN]));
4460        let r = Arc::new(Int32Array::from(vec![2, 1]));
4461        let schema = Arc::new(Schema::new(vec![
4462            Field::new("l", DataType::Int32, false),
4463            Field::new("r", DataType::Int32, false),
4464        ]));
4465        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4466
4467        // create expression
4468        let expr = BinaryExpr::new(
4469            Arc::new(Column::new("l", 0)),
4470            Operator::Minus,
4471            Arc::new(Column::new("r", 1)),
4472        )
4473        .with_fail_on_overflow(true);
4474
4475        // evaluate expression
4476        let result = expr.evaluate(&batch);
4477        assert!(result
4478            .err()
4479            .unwrap()
4480            .to_string()
4481            .contains("Overflow happened on: -2147483648 - 1"));
4482        Ok(())
4483    }
4484
4485    #[test]
4486    fn test_mul_with_overflow() -> Result<()> {
4487        // create test data
4488        let l = Arc::new(Int32Array::from(vec![1, i32::MAX]));
4489        let r = Arc::new(Int32Array::from(vec![2, 2]));
4490        let schema = Arc::new(Schema::new(vec![
4491            Field::new("l", DataType::Int32, false),
4492            Field::new("r", DataType::Int32, false),
4493        ]));
4494        let batch = RecordBatch::try_new(schema, vec![l, r])?;
4495
4496        // create expression
4497        let expr = BinaryExpr::new(
4498            Arc::new(Column::new("l", 0)),
4499            Operator::Multiply,
4500            Arc::new(Column::new("r", 1)),
4501        )
4502        .with_fail_on_overflow(true);
4503
4504        // evaluate expression
4505        let result = expr.evaluate(&batch);
4506        assert!(result
4507            .err()
4508            .unwrap()
4509            .to_string()
4510            .contains("Overflow happened on: 2147483647 * 2"));
4511        Ok(())
4512    }
4513
4514    /// Test helper for SIMILAR TO binary operation
4515    fn apply_similar_to(
4516        schema: &SchemaRef,
4517        va: Vec<&str>,
4518        vb: Vec<&str>,
4519        negated: bool,
4520        case_insensitive: bool,
4521        expected: &BooleanArray,
4522    ) -> Result<()> {
4523        let a = StringArray::from(va);
4524        let b = StringArray::from(vb);
4525        let op = similar_to(
4526            negated,
4527            case_insensitive,
4528            col("a", schema)?,
4529            col("b", schema)?,
4530        )?;
4531        let batch =
4532            RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(a), Arc::new(b)])?;
4533        let result = op
4534            .evaluate(&batch)?
4535            .into_array(batch.num_rows())
4536            .expect("Failed to convert to array");
4537        assert_eq!(result.as_ref(), expected);
4538
4539        Ok(())
4540    }
4541
4542    #[test]
4543    fn test_similar_to() {
4544        let schema = Arc::new(Schema::new(vec![
4545            Field::new("a", DataType::Utf8, false),
4546            Field::new("b", DataType::Utf8, false),
4547        ]));
4548
4549        let expected = [Some(true), Some(false)].iter().collect();
4550        // case-sensitive
4551        apply_similar_to(
4552            &schema,
4553            vec!["hello world", "Hello World"],
4554            vec!["hello.*", "hello.*"],
4555            false,
4556            false,
4557            &expected,
4558        )
4559        .unwrap();
4560        // case-insensitive
4561        apply_similar_to(
4562            &schema,
4563            vec!["hello world", "bye"],
4564            vec!["hello.*", "hello.*"],
4565            false,
4566            true,
4567            &expected,
4568        )
4569        .unwrap();
4570    }
4571
4572    pub fn binary_expr(
4573        left: Arc<dyn PhysicalExpr>,
4574        op: Operator,
4575        right: Arc<dyn PhysicalExpr>,
4576        schema: &Schema,
4577    ) -> Result<BinaryExpr> {
4578        Ok(binary_op(left, op, right, schema)?
4579            .as_any()
4580            .downcast_ref::<BinaryExpr>()
4581            .unwrap()
4582            .clone())
4583    }
4584
4585    /// Test for Uniform-Uniform, Unknown-Uniform, Uniform-Unknown and Unknown-Unknown evaluation.
4586    #[test]
4587    fn test_evaluate_statistics_combination_of_range_holders() -> Result<()> {
4588        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4589        let a = Arc::new(Column::new("a", 0)) as _;
4590        let b = lit(ScalarValue::from(12.0));
4591
4592        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4593        let right_interval = Interval::make(Some(12.0), Some(36.0))?;
4594        let (left_mean, right_mean) = (ScalarValue::from(6.0), ScalarValue::from(24.0));
4595        let (left_med, right_med) = (ScalarValue::from(6.0), ScalarValue::from(24.0));
4596
4597        for children in [
4598            vec![
4599                &Distribution::new_uniform(left_interval.clone())?,
4600                &Distribution::new_uniform(right_interval.clone())?,
4601            ],
4602            vec![
4603                &Distribution::new_generic(
4604                    left_mean.clone(),
4605                    left_med.clone(),
4606                    ScalarValue::Float64(None),
4607                    left_interval.clone(),
4608                )?,
4609                &Distribution::new_uniform(right_interval.clone())?,
4610            ],
4611            vec![
4612                &Distribution::new_uniform(right_interval.clone())?,
4613                &Distribution::new_generic(
4614                    right_mean.clone(),
4615                    right_med.clone(),
4616                    ScalarValue::Float64(None),
4617                    right_interval.clone(),
4618                )?,
4619            ],
4620            vec![
4621                &Distribution::new_generic(
4622                    left_mean.clone(),
4623                    left_med.clone(),
4624                    ScalarValue::Float64(None),
4625                    left_interval.clone(),
4626                )?,
4627                &Distribution::new_generic(
4628                    right_mean.clone(),
4629                    right_med.clone(),
4630                    ScalarValue::Float64(None),
4631                    right_interval.clone(),
4632                )?,
4633            ],
4634        ] {
4635            let ops = vec![
4636                Operator::Plus,
4637                Operator::Minus,
4638                Operator::Multiply,
4639                Operator::Divide,
4640            ];
4641
4642            for op in ops {
4643                let expr = binary_expr(Arc::clone(&a), op, Arc::clone(&b), schema)?;
4644                assert_eq!(
4645                    expr.evaluate_statistics(&children)?,
4646                    new_generic_from_binary_op(&op, children[0], children[1])?
4647                );
4648            }
4649        }
4650        Ok(())
4651    }
4652
4653    #[test]
4654    fn test_evaluate_statistics_bernoulli() -> Result<()> {
4655        let schema = &Schema::new(vec![
4656            Field::new("a", DataType::Int64, false),
4657            Field::new("b", DataType::Int64, false),
4658        ]);
4659        let a = Arc::new(Column::new("a", 0)) as _;
4660        let b = Arc::new(Column::new("b", 1)) as _;
4661        let eq = Arc::new(binary_expr(
4662            Arc::clone(&a),
4663            Operator::Eq,
4664            Arc::clone(&b),
4665            schema,
4666        )?);
4667        let neq = Arc::new(binary_expr(a, Operator::NotEq, b, schema)?);
4668
4669        let left_stat = &Distribution::new_uniform(Interval::make(Some(0), Some(7))?)?;
4670        let right_stat = &Distribution::new_uniform(Interval::make(Some(4), Some(11))?)?;
4671
4672        // Intervals: [0, 7], [4, 11].
4673        // The intersection is [4, 7], so the probability of equality is 4 / 64 = 1 / 16.
4674        assert_eq!(
4675            eq.evaluate_statistics(&[left_stat, right_stat])?,
4676            Distribution::new_bernoulli(ScalarValue::from(1.0 / 16.0))?
4677        );
4678
4679        // The probability of being distinct is 1 - 1 / 16 = 15 / 16.
4680        assert_eq!(
4681            neq.evaluate_statistics(&[left_stat, right_stat])?,
4682            Distribution::new_bernoulli(ScalarValue::from(15.0 / 16.0))?
4683        );
4684
4685        Ok(())
4686    }
4687
4688    #[test]
4689    fn test_propagate_statistics_combination_of_range_holders_arithmetic() -> Result<()> {
4690        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4691        let a = Arc::new(Column::new("a", 0)) as _;
4692        let b = lit(ScalarValue::from(12.0));
4693
4694        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4695        let right_interval = Interval::make(Some(12.0), Some(36.0))?;
4696
4697        let parent = Distribution::new_uniform(Interval::make(Some(-432.), Some(432.))?)?;
4698        let children = vec![
4699            vec![
4700                Distribution::new_uniform(left_interval.clone())?,
4701                Distribution::new_uniform(right_interval.clone())?,
4702            ],
4703            vec![
4704                Distribution::new_generic(
4705                    ScalarValue::from(6.),
4706                    ScalarValue::from(6.),
4707                    ScalarValue::Float64(None),
4708                    left_interval.clone(),
4709                )?,
4710                Distribution::new_uniform(right_interval.clone())?,
4711            ],
4712            vec![
4713                Distribution::new_uniform(left_interval.clone())?,
4714                Distribution::new_generic(
4715                    ScalarValue::from(12.),
4716                    ScalarValue::from(12.),
4717                    ScalarValue::Float64(None),
4718                    right_interval.clone(),
4719                )?,
4720            ],
4721            vec![
4722                Distribution::new_generic(
4723                    ScalarValue::from(6.),
4724                    ScalarValue::from(6.),
4725                    ScalarValue::Float64(None),
4726                    left_interval.clone(),
4727                )?,
4728                Distribution::new_generic(
4729                    ScalarValue::from(12.),
4730                    ScalarValue::from(12.),
4731                    ScalarValue::Float64(None),
4732                    right_interval.clone(),
4733                )?,
4734            ],
4735        ];
4736
4737        let ops = vec![
4738            Operator::Plus,
4739            Operator::Minus,
4740            Operator::Multiply,
4741            Operator::Divide,
4742        ];
4743
4744        for child_view in children {
4745            let child_refs = child_view.iter().collect::<Vec<_>>();
4746            for op in &ops {
4747                let expr = binary_expr(Arc::clone(&a), *op, Arc::clone(&b), schema)?;
4748                assert_eq!(
4749                    expr.propagate_statistics(&parent, child_refs.as_slice())?,
4750                    Some(child_view.clone())
4751                );
4752            }
4753        }
4754        Ok(())
4755    }
4756
4757    #[test]
4758    fn test_propagate_statistics_combination_of_range_holders_comparison() -> Result<()> {
4759        let schema = &Schema::new(vec![Field::new("a", DataType::Float64, false)]);
4760        let a = Arc::new(Column::new("a", 0)) as _;
4761        let b = lit(ScalarValue::from(12.0));
4762
4763        let left_interval = Interval::make(Some(0.0), Some(12.0))?;
4764        let right_interval = Interval::make(Some(6.0), Some(18.0))?;
4765
4766        let one = ScalarValue::from(1.0);
4767        let parent = Distribution::new_bernoulli(one)?;
4768        let children = vec![
4769            vec![
4770                Distribution::new_uniform(left_interval.clone())?,
4771                Distribution::new_uniform(right_interval.clone())?,
4772            ],
4773            vec![
4774                Distribution::new_generic(
4775                    ScalarValue::from(6.),
4776                    ScalarValue::from(6.),
4777                    ScalarValue::Float64(None),
4778                    left_interval.clone(),
4779                )?,
4780                Distribution::new_uniform(right_interval.clone())?,
4781            ],
4782            vec![
4783                Distribution::new_uniform(left_interval.clone())?,
4784                Distribution::new_generic(
4785                    ScalarValue::from(12.),
4786                    ScalarValue::from(12.),
4787                    ScalarValue::Float64(None),
4788                    right_interval.clone(),
4789                )?,
4790            ],
4791            vec![
4792                Distribution::new_generic(
4793                    ScalarValue::from(6.),
4794                    ScalarValue::from(6.),
4795                    ScalarValue::Float64(None),
4796                    left_interval.clone(),
4797                )?,
4798                Distribution::new_generic(
4799                    ScalarValue::from(12.),
4800                    ScalarValue::from(12.),
4801                    ScalarValue::Float64(None),
4802                    right_interval.clone(),
4803                )?,
4804            ],
4805        ];
4806
4807        let ops = vec![
4808            Operator::Eq,
4809            Operator::Gt,
4810            Operator::GtEq,
4811            Operator::Lt,
4812            Operator::LtEq,
4813        ];
4814
4815        for child_view in children {
4816            let child_refs = child_view.iter().collect::<Vec<_>>();
4817            for op in &ops {
4818                let expr = binary_expr(Arc::clone(&a), *op, Arc::clone(&b), schema)?;
4819                assert!(expr
4820                    .propagate_statistics(&parent, child_refs.as_slice())?
4821                    .is_some());
4822            }
4823        }
4824
4825        Ok(())
4826    }
4827
4828    #[test]
4829    fn test_fmt_sql() -> Result<()> {
4830        let schema = Schema::new(vec![
4831            Field::new("a", DataType::Int32, false),
4832            Field::new("b", DataType::Int32, false),
4833        ]);
4834
4835        // Test basic binary expressions
4836        let simple_expr = binary_expr(
4837            col("a", &schema)?,
4838            Operator::Plus,
4839            col("b", &schema)?,
4840            &schema,
4841        )?;
4842        let display_string = simple_expr.to_string();
4843        assert_eq!(display_string, "a@0 + b@1");
4844        let sql_string = fmt_sql(&simple_expr).to_string();
4845        assert_eq!(sql_string, "a + b");
4846
4847        // Test nested expressions with different operator precedence
4848        let nested_expr = binary_expr(
4849            Arc::new(binary_expr(
4850                col("a", &schema)?,
4851                Operator::Plus,
4852                col("b", &schema)?,
4853                &schema,
4854            )?),
4855            Operator::Multiply,
4856            col("b", &schema)?,
4857            &schema,
4858        )?;
4859        let display_string = nested_expr.to_string();
4860        assert_eq!(display_string, "(a@0 + b@1) * b@1");
4861        let sql_string = fmt_sql(&nested_expr).to_string();
4862        assert_eq!(sql_string, "(a + b) * b");
4863
4864        // Test nested expressions with same operator precedence
4865        let nested_same_prec = binary_expr(
4866            Arc::new(binary_expr(
4867                col("a", &schema)?,
4868                Operator::Plus,
4869                col("b", &schema)?,
4870                &schema,
4871            )?),
4872            Operator::Plus,
4873            col("b", &schema)?,
4874            &schema,
4875        )?;
4876        let display_string = nested_same_prec.to_string();
4877        assert_eq!(display_string, "a@0 + b@1 + b@1");
4878        let sql_string = fmt_sql(&nested_same_prec).to_string();
4879        assert_eq!(sql_string, "a + b + b");
4880
4881        // Test with literals
4882        let lit_expr = binary_expr(
4883            col("a", &schema)?,
4884            Operator::Eq,
4885            lit(ScalarValue::Int32(Some(42))),
4886            &schema,
4887        )?;
4888        let display_string = lit_expr.to_string();
4889        assert_eq!(display_string, "a@0 = 42");
4890        let sql_string = fmt_sql(&lit_expr).to_string();
4891        assert_eq!(sql_string, "a = 42");
4892
4893        Ok(())
4894    }
4895
4896    #[test]
4897    fn test_check_short_circuit() {
4898        // Test with non-nullable arrays
4899        let schema = Arc::new(Schema::new(vec![
4900            Field::new("a", DataType::Int32, false),
4901            Field::new("b", DataType::Int32, false),
4902        ]));
4903        let a_array = Int32Array::from(vec![1, 3, 4, 5, 6]);
4904        let b_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
4905        let batch = RecordBatch::try_new(
4906            Arc::clone(&schema),
4907            vec![Arc::new(a_array), Arc::new(b_array)],
4908        )
4909        .unwrap();
4910
4911        // op: AND left: all false
4912        let left_expr = logical2physical(&logical_col("a").eq(expr_lit(2)), &schema);
4913        let left_value = left_expr.evaluate(&batch).unwrap();
4914        assert!(matches!(
4915            check_short_circuit(&left_value, &Operator::And),
4916            ShortCircuitStrategy::ReturnLeft
4917        ));
4918
4919        // op: AND left: not all false
4920        let left_expr = logical2physical(&logical_col("a").eq(expr_lit(3)), &schema);
4921        let left_value = left_expr.evaluate(&batch).unwrap();
4922        let ColumnarValue::Array(array) = &left_value else {
4923            panic!("Expected ColumnarValue::Array");
4924        };
4925        let ShortCircuitStrategy::PreSelection(value) =
4926            check_short_circuit(&left_value, &Operator::And)
4927        else {
4928            panic!("Expected ShortCircuitStrategy::PreSelection");
4929        };
4930        let expected_boolean_arr: Vec<_> =
4931            as_boolean_array(array).unwrap().iter().collect();
4932        let boolean_arr: Vec<_> = value.iter().collect();
4933        assert_eq!(expected_boolean_arr, boolean_arr);
4934
4935        // op: OR left: all true
4936        let left_expr = logical2physical(&logical_col("a").gt(expr_lit(0)), &schema);
4937        let left_value = left_expr.evaluate(&batch).unwrap();
4938        assert!(matches!(
4939            check_short_circuit(&left_value, &Operator::Or),
4940            ShortCircuitStrategy::ReturnLeft
4941        ));
4942
4943        // op: OR left: not all true
4944        let left_expr: Arc<dyn PhysicalExpr> =
4945            logical2physical(&logical_col("a").gt(expr_lit(2)), &schema);
4946        let left_value = left_expr.evaluate(&batch).unwrap();
4947        assert!(matches!(
4948            check_short_circuit(&left_value, &Operator::Or),
4949            ShortCircuitStrategy::None
4950        ));
4951
4952        // Test with nullable arrays and null values
4953        let schema_nullable = Arc::new(Schema::new(vec![
4954            Field::new("c", DataType::Boolean, true),
4955            Field::new("d", DataType::Boolean, true),
4956        ]));
4957
4958        // Create arrays with null values
4959        let c_array = Arc::new(BooleanArray::from(vec![
4960            Some(true),
4961            Some(false),
4962            None,
4963            Some(true),
4964            None,
4965        ])) as ArrayRef;
4966        let d_array = Arc::new(BooleanArray::from(vec![
4967            Some(false),
4968            Some(true),
4969            Some(false),
4970            None,
4971            Some(true),
4972        ])) as ArrayRef;
4973
4974        let batch_nullable = RecordBatch::try_new(
4975            Arc::clone(&schema_nullable),
4976            vec![Arc::clone(&c_array), Arc::clone(&d_array)],
4977        )
4978        .unwrap();
4979
4980        // Case: Mixed values with nulls - shouldn't short-circuit for AND
4981        let mixed_nulls = logical2physical(&logical_col("c"), &schema_nullable);
4982        let mixed_nulls_value = mixed_nulls.evaluate(&batch_nullable).unwrap();
4983        assert!(matches!(
4984            check_short_circuit(&mixed_nulls_value, &Operator::And),
4985            ShortCircuitStrategy::None
4986        ));
4987
4988        // Case: Mixed values with nulls - shouldn't short-circuit for OR
4989        assert!(matches!(
4990            check_short_circuit(&mixed_nulls_value, &Operator::Or),
4991            ShortCircuitStrategy::None
4992        ));
4993
4994        // Test with all nulls
4995        let all_nulls = Arc::new(BooleanArray::from(vec![None, None, None])) as ArrayRef;
4996        let null_batch = RecordBatch::try_new(
4997            Arc::new(Schema::new(vec![Field::new("e", DataType::Boolean, true)])),
4998            vec![all_nulls],
4999        )
5000        .unwrap();
5001
5002        let null_expr = logical2physical(&logical_col("e"), &null_batch.schema());
5003        let null_value = null_expr.evaluate(&null_batch).unwrap();
5004
5005        // All nulls shouldn't short-circuit for AND or OR
5006        assert!(matches!(
5007            check_short_circuit(&null_value, &Operator::And),
5008            ShortCircuitStrategy::None
5009        ));
5010        assert!(matches!(
5011            check_short_circuit(&null_value, &Operator::Or),
5012            ShortCircuitStrategy::None
5013        ));
5014
5015        // Test with scalar values
5016        // Scalar true
5017        let scalar_true = ColumnarValue::Scalar(ScalarValue::Boolean(Some(true)));
5018        assert!(matches!(
5019            check_short_circuit(&scalar_true, &Operator::Or),
5020            ShortCircuitStrategy::ReturnLeft
5021        )); // Should short-circuit OR
5022        assert!(matches!(
5023            check_short_circuit(&scalar_true, &Operator::And),
5024            ShortCircuitStrategy::ReturnRight
5025        )); // Should return the RHS for AND
5026
5027        // Scalar false
5028        let scalar_false = ColumnarValue::Scalar(ScalarValue::Boolean(Some(false)));
5029        assert!(matches!(
5030            check_short_circuit(&scalar_false, &Operator::And),
5031            ShortCircuitStrategy::ReturnLeft
5032        )); // Should short-circuit AND
5033        assert!(matches!(
5034            check_short_circuit(&scalar_false, &Operator::Or),
5035            ShortCircuitStrategy::ReturnRight
5036        )); // Should return the RHS for OR
5037
5038        // Scalar null
5039        let scalar_null = ColumnarValue::Scalar(ScalarValue::Boolean(None));
5040        assert!(matches!(
5041            check_short_circuit(&scalar_null, &Operator::And),
5042            ShortCircuitStrategy::None
5043        ));
5044        assert!(matches!(
5045            check_short_circuit(&scalar_null, &Operator::Or),
5046            ShortCircuitStrategy::None
5047        ));
5048    }
5049
5050    /// Test for [pre_selection_scatter]
5051    /// Since [check_short_circuit] ensures that the left side does not contain null and is neither all_true nor all_false, as well as not being empty,
5052    /// the following tests have been designed:
5053    /// 1. Test sparse left with interleaved true/false
5054    /// 2. Test multiple consecutive true blocks
5055    /// 3. Test multiple consecutive true blocks
5056    /// 4. Test single true at first position
5057    /// 5. Test single true at last position
5058    /// 6. Test nulls in right array
5059    #[test]
5060    fn test_pre_selection_scatter() {
5061        fn create_bool_array(bools: Vec<bool>) -> BooleanArray {
5062            BooleanArray::from(bools.into_iter().map(Some).collect::<Vec<_>>())
5063        }
5064        // Test sparse left with interleaved true/false
5065        {
5066            // Left: [T, F, T, F, T]
5067            // Right: [F, T, F] (values for 3 true positions)
5068            let left = create_bool_array(vec![true, false, true, false, true]);
5069            let right = create_bool_array(vec![false, true, false]);
5070
5071            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5072            let result_arr = result.into_array(left.len()).unwrap();
5073
5074            let expected = create_bool_array(vec![false, false, true, false, false]);
5075            assert_eq!(&expected, result_arr.as_boolean());
5076        }
5077        // Test multiple consecutive true blocks
5078        {
5079            // Left: [F, T, T, F, T, T, T]
5080            // Right: [T, F, F, T, F]
5081            let left =
5082                create_bool_array(vec![false, true, true, false, true, true, true]);
5083            let right = create_bool_array(vec![true, false, false, true, false]);
5084
5085            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5086            let result_arr = result.into_array(left.len()).unwrap();
5087
5088            let expected =
5089                create_bool_array(vec![false, true, false, false, false, true, false]);
5090            assert_eq!(&expected, result_arr.as_boolean());
5091        }
5092        // Test single true at first position
5093        {
5094            // Left: [T, F, F]
5095            // Right: [F]
5096            let left = create_bool_array(vec![true, false, false]);
5097            let right = create_bool_array(vec![false]);
5098
5099            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5100            let result_arr = result.into_array(left.len()).unwrap();
5101
5102            let expected = create_bool_array(vec![false, false, false]);
5103            assert_eq!(&expected, result_arr.as_boolean());
5104        }
5105        // Test single true at last position
5106        {
5107            // Left: [F, F, T]
5108            // Right: [F]
5109            let left = create_bool_array(vec![false, false, true]);
5110            let right = create_bool_array(vec![false]);
5111
5112            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5113            let result_arr = result.into_array(left.len()).unwrap();
5114
5115            let expected = create_bool_array(vec![false, false, false]);
5116            assert_eq!(&expected, result_arr.as_boolean());
5117        }
5118        // Test nulls in right array
5119        {
5120            // Left: [F, T, F, T]
5121            // Right: [None, Some(false)] (with null at first position)
5122            let left = create_bool_array(vec![false, true, false, true]);
5123            let right = BooleanArray::from(vec![None, Some(false)]);
5124
5125            let result = pre_selection_scatter(&left, Some(&right)).unwrap();
5126            let result_arr = result.into_array(left.len()).unwrap();
5127
5128            let expected = BooleanArray::from(vec![
5129                Some(false),
5130                None, // null from right
5131                Some(false),
5132                Some(false),
5133            ]);
5134            assert_eq!(&expected, result_arr.as_boolean());
5135        }
5136    }
5137
5138    #[test]
5139    fn test_and_true_preselection_returns_lhs() {
5140        let schema =
5141            Arc::new(Schema::new(vec![Field::new("c", DataType::Boolean, false)]));
5142        let c_array = Arc::new(BooleanArray::from(vec![false, true, false, false, false]))
5143            as ArrayRef;
5144        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::clone(&c_array)])
5145            .unwrap();
5146
5147        let expr = logical2physical(&logical_col("c").and(expr_lit(true)), &schema);
5148
5149        let result = expr.evaluate(&batch).unwrap();
5150        let ColumnarValue::Array(result_arr) = result else {
5151            panic!("Expected ColumnarValue::Array");
5152        };
5153
5154        let expected: Vec<_> = c_array.as_boolean().iter().collect();
5155        let actual: Vec<_> = result_arr.as_boolean().iter().collect();
5156        assert_eq!(
5157            expected, actual,
5158            "AND with TRUE must equal LHS even with PreSelection"
5159        );
5160    }
5161
5162    #[test]
5163    fn test_evaluate_bounds_int32() {
5164        let schema = Schema::new(vec![
5165            Field::new("a", DataType::Int32, false),
5166            Field::new("b", DataType::Int32, false),
5167        ]);
5168
5169        let a = Arc::new(Column::new("a", 0)) as _;
5170        let b = Arc::new(Column::new("b", 1)) as _;
5171
5172        // Test addition bounds
5173        let add_expr =
5174            binary_expr(Arc::clone(&a), Operator::Plus, Arc::clone(&b), &schema).unwrap();
5175        let add_bounds = add_expr
5176            .evaluate_bounds(&[
5177                &Interval::make(Some(1), Some(10)).unwrap(),
5178                &Interval::make(Some(5), Some(15)).unwrap(),
5179            ])
5180            .unwrap();
5181        assert_eq!(add_bounds, Interval::make(Some(6), Some(25)).unwrap());
5182
5183        // Test subtraction bounds
5184        let sub_expr =
5185            binary_expr(Arc::clone(&a), Operator::Minus, Arc::clone(&b), &schema)
5186                .unwrap();
5187        let sub_bounds = sub_expr
5188            .evaluate_bounds(&[
5189                &Interval::make(Some(1), Some(10)).unwrap(),
5190                &Interval::make(Some(5), Some(15)).unwrap(),
5191            ])
5192            .unwrap();
5193        assert_eq!(sub_bounds, Interval::make(Some(-14), Some(5)).unwrap());
5194
5195        // Test multiplication bounds
5196        let mul_expr =
5197            binary_expr(Arc::clone(&a), Operator::Multiply, Arc::clone(&b), &schema)
5198                .unwrap();
5199        let mul_bounds = mul_expr
5200            .evaluate_bounds(&[
5201                &Interval::make(Some(1), Some(10)).unwrap(),
5202                &Interval::make(Some(5), Some(15)).unwrap(),
5203            ])
5204            .unwrap();
5205        assert_eq!(mul_bounds, Interval::make(Some(5), Some(150)).unwrap());
5206
5207        // Test division bounds
5208        let div_expr =
5209            binary_expr(Arc::clone(&a), Operator::Divide, Arc::clone(&b), &schema)
5210                .unwrap();
5211        let div_bounds = div_expr
5212            .evaluate_bounds(&[
5213                &Interval::make(Some(10), Some(20)).unwrap(),
5214                &Interval::make(Some(2), Some(5)).unwrap(),
5215            ])
5216            .unwrap();
5217        assert_eq!(div_bounds, Interval::make(Some(2), Some(10)).unwrap());
5218    }
5219
5220    #[test]
5221    fn test_evaluate_bounds_bool() {
5222        let schema = Schema::new(vec![
5223            Field::new("a", DataType::Boolean, false),
5224            Field::new("b", DataType::Boolean, false),
5225        ]);
5226
5227        let a = Arc::new(Column::new("a", 0)) as _;
5228        let b = Arc::new(Column::new("b", 1)) as _;
5229
5230        // Test OR bounds
5231        let or_expr =
5232            binary_expr(Arc::clone(&a), Operator::Or, Arc::clone(&b), &schema).unwrap();
5233        let or_bounds = or_expr
5234            .evaluate_bounds(&[
5235                &Interval::make(Some(true), Some(true)).unwrap(),
5236                &Interval::make(Some(false), Some(false)).unwrap(),
5237            ])
5238            .unwrap();
5239        assert_eq!(or_bounds, Interval::make(Some(true), Some(true)).unwrap());
5240
5241        // Test AND bounds
5242        let and_expr =
5243            binary_expr(Arc::clone(&a), Operator::And, Arc::clone(&b), &schema).unwrap();
5244        let and_bounds = and_expr
5245            .evaluate_bounds(&[
5246                &Interval::make(Some(true), Some(true)).unwrap(),
5247                &Interval::make(Some(false), Some(false)).unwrap(),
5248            ])
5249            .unwrap();
5250        assert_eq!(
5251            and_bounds,
5252            Interval::make(Some(false), Some(false)).unwrap()
5253        );
5254    }
5255
5256    #[test]
5257    fn test_evaluate_nested_type() {
5258        let batch_schema = Arc::new(Schema::new(vec![
5259            Field::new(
5260                "a",
5261                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
5262                true,
5263            ),
5264            Field::new(
5265                "b",
5266                DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true))),
5267                true,
5268            ),
5269        ]));
5270
5271        let mut list_builder_a = ListBuilder::new(Int32Builder::new());
5272
5273        list_builder_a.append_value([Some(1)]);
5274        list_builder_a.append_value([Some(2)]);
5275        list_builder_a.append_value([]);
5276        list_builder_a.append_value([None]);
5277
5278        let list_array_a: ArrayRef = Arc::new(list_builder_a.finish());
5279
5280        let mut list_builder_b = ListBuilder::new(Int32Builder::new());
5281
5282        list_builder_b.append_value([Some(1)]);
5283        list_builder_b.append_value([Some(2)]);
5284        list_builder_b.append_value([]);
5285        list_builder_b.append_value([None]);
5286
5287        let list_array_b: ArrayRef = Arc::new(list_builder_b.finish());
5288
5289        let batch =
5290            RecordBatch::try_new(batch_schema, vec![list_array_a, list_array_b]).unwrap();
5291
5292        let schema = Arc::new(Schema::new(vec![
5293            Field::new(
5294                "a",
5295                DataType::List(Arc::new(Field::new("foo", DataType::Int32, true))),
5296                true,
5297            ),
5298            Field::new(
5299                "b",
5300                DataType::List(Arc::new(Field::new("bar", DataType::Int32, true))),
5301                true,
5302            ),
5303        ]));
5304
5305        let a = Arc::new(Column::new("a", 0)) as _;
5306        let b = Arc::new(Column::new("b", 1)) as _;
5307
5308        let eq_expr =
5309            binary_expr(Arc::clone(&a), Operator::Eq, Arc::clone(&b), &schema).unwrap();
5310
5311        let eq_result = eq_expr.evaluate(&batch).unwrap();
5312        let expected =
5313            BooleanArray::from_iter(vec![Some(true), Some(true), Some(true), Some(true)]);
5314        assert_eq!(eq_result.into_array(4).unwrap().as_boolean(), &expected);
5315    }
5316}