Skip to main content

datafusion_physical_expr/expressions/
in_list.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Implementation of `InList` expressions: [`InListExpr`]
19
20use std::any::Any;
21use std::fmt::Debug;
22use std::hash::{Hash, Hasher};
23use std::sync::Arc;
24
25use crate::PhysicalExpr;
26use crate::physical_expr::physical_exprs_bag_equal;
27
28use arrow::array::*;
29use arrow::buffer::{BooleanBuffer, NullBuffer};
30use arrow::compute::kernels::boolean::{not, or_kleene};
31use arrow::compute::kernels::cmp::eq as arrow_eq;
32use arrow::compute::{SortOptions, take};
33use arrow::datatypes::*;
34use arrow::util::bit_iterator::BitIndexIterator;
35use datafusion_common::hash_utils::with_hashes;
36use datafusion_common::{
37    DFSchema, HashSet, Result, ScalarValue, assert_or_internal_err, exec_datafusion_err,
38    exec_err,
39};
40use datafusion_expr::{ColumnarValue, expr_vec_fmt};
41
42use ahash::RandomState;
43use datafusion_common::HashMap;
44use hashbrown::hash_map::RawEntryMut;
45
46/// Trait for InList static filters
47trait StaticFilter {
48    fn null_count(&self) -> usize;
49
50    /// Checks if values in `v` are contained in the filter
51    fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
52}
53
54/// InList
55pub struct InListExpr {
56    expr: Arc<dyn PhysicalExpr>,
57    list: Vec<Arc<dyn PhysicalExpr>>,
58    negated: bool,
59    static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
60}
61
62impl Debug for InListExpr {
63    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64        f.debug_struct("InListExpr")
65            .field("expr", &self.expr)
66            .field("list", &self.list)
67            .field("negated", &self.negated)
68            .finish()
69    }
70}
71
72/// Static filter for InList that stores the array and hash set for O(1) lookups
73#[derive(Debug, Clone)]
74struct ArrayStaticFilter {
75    in_array: ArrayRef,
76    state: RandomState,
77    /// Used to provide a lookup from value to in list index
78    ///
79    /// Note: usize::hash is not used, instead the raw entry
80    /// API is used to store entries w.r.t their value
81    map: HashMap<usize, (), ()>,
82}
83
84impl StaticFilter for ArrayStaticFilter {
85    fn null_count(&self) -> usize {
86        self.in_array.null_count()
87    }
88
89    /// Checks if values in `v` are contained in the `in_array` using this hash set for lookup.
90    fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
91        // Null type comparisons always return null (SQL three-valued logic)
92        if v.data_type() == &DataType::Null
93            || self.in_array.data_type() == &DataType::Null
94        {
95            let nulls = NullBuffer::new_null(v.len());
96            return Ok(BooleanArray::new(
97                BooleanBuffer::new_unset(v.len()),
98                Some(nulls),
99            ));
100        }
101
102        // Unwrap dictionary-encoded needles when the value type matches
103        // in_array, evaluating against the dictionary values and mapping
104        // back via keys.
105        downcast_dictionary_array! {
106            v => {
107                // Only unwrap when the haystack (in_array) type matches
108                // the dictionary value type
109                if v.values().data_type() == self.in_array.data_type() {
110                    let values_contains = self.contains(v.values().as_ref(), negated)?;
111                    let result = take(&values_contains, v.keys(), None)?;
112                    return Ok(downcast_array(result.as_ref()));
113                }
114            }
115            _ => {}
116        }
117
118        let needle_nulls = v.logical_nulls();
119        let needle_nulls = needle_nulls.as_ref();
120        let haystack_has_nulls = self.in_array.null_count() != 0;
121
122        with_hashes([v], &self.state, |hashes| {
123            let cmp = make_comparator(v, &self.in_array, SortOptions::default())?;
124            Ok((0..v.len())
125                .map(|i| {
126                    // SQL three-valued logic: null IN (...) is always null
127                    if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) {
128                        return None;
129                    }
130
131                    let hash = hashes[i];
132                    let contains = self
133                        .map
134                        .raw_entry()
135                        .from_hash(hash, |idx| cmp(i, *idx).is_eq())
136                        .is_some();
137
138                    match contains {
139                        true => Some(!negated),
140                        false if haystack_has_nulls => None,
141                        false => Some(negated),
142                    }
143                })
144                .collect())
145        })
146    }
147}
148
149/// Returns true if Arrow's vectorized `eq` kernel supports this data type.
150///
151/// Supported: primitives, boolean, strings (Utf8/LargeUtf8/Utf8View),
152/// binary (Binary/LargeBinary/BinaryView/FixedSizeBinary), Null, and
153/// Dictionary-encoded variants of the above.
154/// Unsupported: nested types (Struct, List, Map, Union) and RunEndEncoded.
155fn supports_arrow_eq(dt: &DataType) -> bool {
156    use DataType::*;
157    match dt {
158        Boolean | Binary | LargeBinary | BinaryView | FixedSizeBinary(_) => true,
159        Dictionary(_, v) => supports_arrow_eq(v.as_ref()),
160        _ => dt.is_primitive() || dt.is_null() || dt.is_string(),
161    }
162}
163
164fn instantiate_static_filter(
165    in_array: ArrayRef,
166) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
167    match in_array.data_type() {
168        // Integer primitive types
169        DataType::Int8 => Ok(Arc::new(Int8StaticFilter::try_new(&in_array)?)),
170        DataType::Int16 => Ok(Arc::new(Int16StaticFilter::try_new(&in_array)?)),
171        DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)),
172        DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)),
173        DataType::UInt8 => Ok(Arc::new(UInt8StaticFilter::try_new(&in_array)?)),
174        DataType::UInt16 => Ok(Arc::new(UInt16StaticFilter::try_new(&in_array)?)),
175        DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)),
176        DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)),
177        // Float primitive types (use ordered wrappers for Hash/Eq)
178        DataType::Float32 => Ok(Arc::new(Float32StaticFilter::try_new(&in_array)?)),
179        DataType::Float64 => Ok(Arc::new(Float64StaticFilter::try_new(&in_array)?)),
180        _ => {
181            /* fall through to generic implementation for unsupported types (Struct, etc.) */
182            Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?))
183        }
184    }
185}
186
187impl ArrayStaticFilter {
188    /// Computes a [`StaticFilter`] for the provided [`Array`] if there
189    /// are nulls present or there are more than the configured number of
190    /// elements.
191    ///
192    /// Note: This is split into a separate function as higher-rank trait bounds currently
193    /// cause type inference to misbehave
194    fn try_new(in_array: ArrayRef) -> Result<ArrayStaticFilter> {
195        // Null type has no natural order - return empty hash set
196        if in_array.data_type() == &DataType::Null {
197            return Ok(ArrayStaticFilter {
198                in_array,
199                state: RandomState::new(),
200                map: HashMap::with_hasher(()),
201            });
202        }
203
204        let state = RandomState::new();
205        let mut map: HashMap<usize, (), ()> = HashMap::with_hasher(());
206
207        with_hashes([&in_array], &state, |hashes| -> Result<()> {
208            let cmp = make_comparator(&in_array, &in_array, SortOptions::default())?;
209
210            let insert_value = |idx| {
211                let hash = hashes[idx];
212                if let RawEntryMut::Vacant(v) = map
213                    .raw_entry_mut()
214                    .from_hash(hash, |x| cmp(*x, idx).is_eq())
215                {
216                    v.insert_with_hasher(hash, idx, (), |x| hashes[*x]);
217                }
218            };
219
220            match in_array.nulls() {
221                Some(nulls) => {
222                    BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
223                        .for_each(insert_value)
224                }
225                None => (0..in_array.len()).for_each(insert_value),
226            }
227
228            Ok(())
229        })?;
230
231        Ok(Self {
232            in_array,
233            state,
234            map,
235        })
236    }
237}
238
239/// Wrapper for f32 that implements Hash and Eq using bit comparison.
240/// This treats NaN values as equal to each other when they have the same bit pattern.
241#[derive(Clone, Copy)]
242struct OrderedFloat32(f32);
243
244impl Hash for OrderedFloat32 {
245    fn hash<H: Hasher>(&self, state: &mut H) {
246        self.0.to_ne_bytes().hash(state);
247    }
248}
249
250impl PartialEq for OrderedFloat32 {
251    fn eq(&self, other: &Self) -> bool {
252        self.0.to_bits() == other.0.to_bits()
253    }
254}
255
256impl Eq for OrderedFloat32 {}
257
258impl From<f32> for OrderedFloat32 {
259    fn from(v: f32) -> Self {
260        Self(v)
261    }
262}
263
264/// Wrapper for f64 that implements Hash and Eq using bit comparison.
265/// This treats NaN values as equal to each other when they have the same bit pattern.
266#[derive(Clone, Copy)]
267struct OrderedFloat64(f64);
268
269impl Hash for OrderedFloat64 {
270    fn hash<H: Hasher>(&self, state: &mut H) {
271        self.0.to_ne_bytes().hash(state);
272    }
273}
274
275impl PartialEq for OrderedFloat64 {
276    fn eq(&self, other: &Self) -> bool {
277        self.0.to_bits() == other.0.to_bits()
278    }
279}
280
281impl Eq for OrderedFloat64 {}
282
283impl From<f64> for OrderedFloat64 {
284    fn from(v: f64) -> Self {
285        Self(v)
286    }
287}
288
289// Macro to generate specialized StaticFilter implementations for primitive types
290macro_rules! primitive_static_filter {
291    ($Name:ident, $ArrowType:ty) => {
292        struct $Name {
293            null_count: usize,
294            values: HashSet<<$ArrowType as ArrowPrimitiveType>::Native>,
295        }
296
297        impl $Name {
298            fn try_new(in_array: &ArrayRef) -> Result<Self> {
299                let in_array = in_array
300                    .as_primitive_opt::<$ArrowType>()
301                    .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?;
302
303                let mut values = HashSet::with_capacity(in_array.len());
304                let null_count = in_array.null_count();
305
306                for v in in_array.iter().flatten() {
307                    values.insert(v);
308                }
309
310                Ok(Self { null_count, values })
311            }
312        }
313
314        impl StaticFilter for $Name {
315            fn null_count(&self) -> usize {
316                self.null_count
317            }
318
319            fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
320                // Handle dictionary arrays by recursing on the values
321                downcast_dictionary_array! {
322                    v => {
323                        let values_contains = self.contains(v.values().as_ref(), negated)?;
324                        let result = take(&values_contains, v.keys(), None)?;
325                        return Ok(downcast_array(result.as_ref()))
326                    }
327                    _ => {}
328                }
329
330                let v = v
331                    .as_primitive_opt::<$ArrowType>()
332                    .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?;
333
334                let haystack_has_nulls = self.null_count > 0;
335
336                let needle_values = v.values();
337                let needle_nulls = v.nulls();
338                let needle_has_nulls = v.null_count() > 0;
339
340                // Truth table for `value [NOT] IN (set)` with SQL three-valued logic:
341                // ("-" means the value doesn't affect the result)
342                //
343                // | needle_null | haystack_null | negated | in set? | result |
344                // |-------------|---------------|---------|---------|--------|
345                // | true        | -             | false   | -       | null   |
346                // | true        | -             | true    | -       | null   |
347                // | false       | true          | false   | yes     | true   |
348                // | false       | true          | false   | no      | null   |
349                // | false       | true          | true    | yes     | false  |
350                // | false       | true          | true    | no      | null   |
351                // | false       | false         | false   | yes     | true   |
352                // | false       | false         | false   | no      | false  |
353                // | false       | false         | true    | yes     | false  |
354                // | false       | false         | true    | no      | true   |
355
356                // Compute the "contains" result using collect_bool (fast batched approach)
357                // This ignores nulls - we handle them separately
358                let contains_buffer = if negated {
359                    BooleanBuffer::collect_bool(needle_values.len(), |i| {
360                        !self.values.contains(&needle_values[i])
361                    })
362                } else {
363                    BooleanBuffer::collect_bool(needle_values.len(), |i| {
364                        self.values.contains(&needle_values[i])
365                    })
366                };
367
368                // Compute the null mask
369                // Output is null when:
370                // 1. needle value is null, OR
371                // 2. needle value is not in set AND haystack has nulls
372                let result_nulls = match (needle_has_nulls, haystack_has_nulls) {
373                    (false, false) => {
374                        // No nulls anywhere
375                        None
376                    }
377                    (true, false) => {
378                        // Only needle has nulls - just use needle's null mask
379                        needle_nulls.cloned()
380                    }
381                    (false, true) => {
382                        // Only haystack has nulls - result is null when value not in set
383                        // Valid (not null) when original "in set" is true
384                        // For NOT IN: contains_buffer = !original, so validity = !contains_buffer
385                        let validity = if negated {
386                            !&contains_buffer
387                        } else {
388                            contains_buffer.clone()
389                        };
390                        Some(NullBuffer::new(validity))
391                    }
392                    (true, true) => {
393                        // Both have nulls - combine needle nulls with haystack-induced nulls
394                        let needle_validity = needle_nulls.map(|n| n.inner().clone())
395                            .unwrap_or_else(|| BooleanBuffer::new_set(needle_values.len()));
396
397                        // Valid when original "in set" is true (see above)
398                        let haystack_validity = if negated {
399                            !&contains_buffer
400                        } else {
401                            contains_buffer.clone()
402                        };
403
404                        // Combined validity: valid only where both are valid
405                        let combined_validity = &needle_validity & &haystack_validity;
406                        Some(NullBuffer::new(combined_validity))
407                    }
408                };
409
410                Ok(BooleanArray::new(contains_buffer, result_nulls))
411            }
412        }
413    };
414}
415
416// Generate specialized filters for all integer primitive types
417primitive_static_filter!(Int8StaticFilter, Int8Type);
418primitive_static_filter!(Int16StaticFilter, Int16Type);
419primitive_static_filter!(Int32StaticFilter, Int32Type);
420primitive_static_filter!(Int64StaticFilter, Int64Type);
421primitive_static_filter!(UInt8StaticFilter, UInt8Type);
422primitive_static_filter!(UInt16StaticFilter, UInt16Type);
423primitive_static_filter!(UInt32StaticFilter, UInt32Type);
424primitive_static_filter!(UInt64StaticFilter, UInt64Type);
425
426// Macro to generate specialized StaticFilter implementations for float types
427// Floats require a wrapper type (OrderedFloat*) to implement Hash/Eq due to NaN semantics
428macro_rules! float_static_filter {
429    ($Name:ident, $ArrowType:ty, $OrderedType:ty) => {
430        struct $Name {
431            null_count: usize,
432            values: HashSet<$OrderedType>,
433        }
434
435        impl $Name {
436            fn try_new(in_array: &ArrayRef) -> Result<Self> {
437                let in_array = in_array
438                    .as_primitive_opt::<$ArrowType>()
439                    .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?;
440
441                let mut values = HashSet::with_capacity(in_array.len());
442                let null_count = in_array.null_count();
443
444                for v in in_array.iter().flatten() {
445                    values.insert(<$OrderedType>::from(v));
446                }
447
448                Ok(Self { null_count, values })
449            }
450        }
451
452        impl StaticFilter for $Name {
453            fn null_count(&self) -> usize {
454                self.null_count
455            }
456
457            fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
458                // Handle dictionary arrays by recursing on the values
459                downcast_dictionary_array! {
460                    v => {
461                        let values_contains = self.contains(v.values().as_ref(), negated)?;
462                        let result = take(&values_contains, v.keys(), None)?;
463                        return Ok(downcast_array(result.as_ref()))
464                    }
465                    _ => {}
466                }
467
468                let v = v
469                    .as_primitive_opt::<$ArrowType>()
470                    .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?;
471
472                let haystack_has_nulls = self.null_count > 0;
473
474                let needle_values = v.values();
475                let needle_nulls = v.nulls();
476                let needle_has_nulls = v.null_count() > 0;
477
478                // Truth table for `value [NOT] IN (set)` with SQL three-valued logic:
479                // ("-" means the value doesn't affect the result)
480                //
481                // | needle_null | haystack_null | negated | in set? | result |
482                // |-------------|---------------|---------|---------|--------|
483                // | true        | -             | false   | -       | null   |
484                // | true        | -             | true    | -       | null   |
485                // | false       | true          | false   | yes     | true   |
486                // | false       | true          | false   | no      | null   |
487                // | false       | true          | true    | yes     | false  |
488                // | false       | true          | true    | no      | null   |
489                // | false       | false         | false   | yes     | true   |
490                // | false       | false         | false   | no      | false  |
491                // | false       | false         | true    | yes     | false  |
492                // | false       | false         | true    | no      | true   |
493
494                // Compute the "contains" result using collect_bool (fast batched approach)
495                // This ignores nulls - we handle them separately
496                let contains_buffer = if negated {
497                    BooleanBuffer::collect_bool(needle_values.len(), |i| {
498                        !self.values.contains(&<$OrderedType>::from(needle_values[i]))
499                    })
500                } else {
501                    BooleanBuffer::collect_bool(needle_values.len(), |i| {
502                        self.values.contains(&<$OrderedType>::from(needle_values[i]))
503                    })
504                };
505
506                // Compute the null mask
507                // Output is null when:
508                // 1. needle value is null, OR
509                // 2. needle value is not in set AND haystack has nulls
510                let result_nulls = match (needle_has_nulls, haystack_has_nulls) {
511                    (false, false) => {
512                        // No nulls anywhere
513                        None
514                    }
515                    (true, false) => {
516                        // Only needle has nulls - just use needle's null mask
517                        needle_nulls.cloned()
518                    }
519                    (false, true) => {
520                        // Only haystack has nulls - result is null when value not in set
521                        // Valid (not null) when original "in set" is true
522                        // For NOT IN: contains_buffer = !original, so validity = !contains_buffer
523                        let validity = if negated {
524                            !&contains_buffer
525                        } else {
526                            contains_buffer.clone()
527                        };
528                        Some(NullBuffer::new(validity))
529                    }
530                    (true, true) => {
531                        // Both have nulls - combine needle nulls with haystack-induced nulls
532                        let needle_validity = needle_nulls.map(|n| n.inner().clone())
533                            .unwrap_or_else(|| BooleanBuffer::new_set(needle_values.len()));
534
535                        // Valid when original "in set" is true (see above)
536                        let haystack_validity = if negated {
537                            !&contains_buffer
538                        } else {
539                            contains_buffer.clone()
540                        };
541
542                        // Combined validity: valid only where both are valid
543                        let combined_validity = &needle_validity & &haystack_validity;
544                        Some(NullBuffer::new(combined_validity))
545                    }
546                };
547
548                Ok(BooleanArray::new(contains_buffer, result_nulls))
549            }
550        }
551    };
552}
553
554// Generate specialized filters for float types using ordered wrappers
555float_static_filter!(Float32StaticFilter, Float32Type, OrderedFloat32);
556float_static_filter!(Float64StaticFilter, Float64Type, OrderedFloat64);
557
558/// Evaluates the list of expressions into an array, flattening any dictionaries
559fn evaluate_list(
560    list: &[Arc<dyn PhysicalExpr>],
561    batch: &RecordBatch,
562) -> Result<ArrayRef> {
563    let scalars = list
564        .iter()
565        .map(|expr| {
566            expr.evaluate(batch).and_then(|r| match r {
567                ColumnarValue::Array(_) => {
568                    exec_err!("InList expression must evaluate to a scalar")
569                }
570                // Flatten dictionary values
571                ColumnarValue::Scalar(ScalarValue::Dictionary(_, v)) => Ok(*v),
572                ColumnarValue::Scalar(s) => Ok(s),
573            })
574        })
575        .collect::<Result<Vec<_>>>()?;
576
577    ScalarValue::iter_to_array(scalars)
578}
579
580/// Try to evaluate a list of expressions as constants.
581///
582/// Returns:
583/// - `Ok(Some(ArrayRef))` if all expressions are constants (can be evaluated on an empty RecordBatch)
584/// - `Ok(None)` if the list contains non-constant expressions
585/// - `Err(...)` only for actual errors (not for non-constant expressions)
586///
587/// This is used to detect when a list contains only literals, casts of literals,
588/// or other constant expressions.
589fn try_evaluate_constant_list(
590    list: &[Arc<dyn PhysicalExpr>],
591    schema: &Schema,
592) -> Result<Option<ArrayRef>> {
593    let batch = RecordBatch::new_empty(Arc::new(schema.clone()));
594    match evaluate_list(list, &batch) {
595        Ok(array) => Ok(Some(array)),
596        Err(_) => {
597            // Non-constant expressions can't be evaluated on an empty batch
598            // This is not an error, just means we can't use a static filter
599            Ok(None)
600        }
601    }
602}
603
604impl InListExpr {
605    /// Create a new InList expression
606    fn new(
607        expr: Arc<dyn PhysicalExpr>,
608        list: Vec<Arc<dyn PhysicalExpr>>,
609        negated: bool,
610        static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
611    ) -> Self {
612        Self {
613            expr,
614            list,
615            negated,
616            static_filter,
617        }
618    }
619
620    /// Input expression
621    pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
622        &self.expr
623    }
624
625    /// List to search in
626    pub fn list(&self) -> &[Arc<dyn PhysicalExpr>] {
627        &self.list
628    }
629
630    pub fn is_empty(&self) -> bool {
631        self.list.is_empty()
632    }
633
634    pub fn len(&self) -> usize {
635        self.list.len()
636    }
637
638    /// Is this negated e.g. NOT IN LIST
639    pub fn negated(&self) -> bool {
640        self.negated
641    }
642
643    /// Create a new InList expression directly from an array, bypassing expression evaluation.
644    ///
645    /// This is more efficient than `in_list()` when you already have the list as an array,
646    /// as it avoids the conversion: `ArrayRef -> Vec<PhysicalExpr> -> ArrayRef -> StaticFilter`.
647    /// Instead it goes directly: `ArrayRef -> StaticFilter`.
648    ///
649    /// The `list` field will be empty when using this constructor, as the array is stored
650    /// directly in the static filter.
651    ///
652    /// This does not make the expression any more performant at runtime, but it does make it slightly
653    /// cheaper to build.
654    pub fn try_new_from_array(
655        expr: Arc<dyn PhysicalExpr>,
656        array: ArrayRef,
657        negated: bool,
658    ) -> Result<Self> {
659        let list = (0..array.len())
660            .map(|i| {
661                let scalar = ScalarValue::try_from_array(array.as_ref(), i)?;
662                Ok(crate::expressions::lit(scalar) as Arc<dyn PhysicalExpr>)
663            })
664            .collect::<Result<Vec<_>>>()?;
665        Ok(Self::new(
666            expr,
667            list,
668            negated,
669            Some(instantiate_static_filter(array)?),
670        ))
671    }
672
673    /// Create a new InList expression, using a static filter when possible.
674    ///
675    /// This validates data types and attempts to create a static filter for constant
676    /// list expressions. Uses specialized StaticFilter implementations for better
677    /// performance (e.g., Int32StaticFilter for Int32).
678    ///
679    /// Returns an error if data types don't match. If the list contains non-constant
680    /// expressions, falls back to dynamic evaluation at runtime.
681    pub fn try_new(
682        expr: Arc<dyn PhysicalExpr>,
683        list: Vec<Arc<dyn PhysicalExpr>>,
684        negated: bool,
685        schema: &Schema,
686    ) -> Result<Self> {
687        // Check the data types match
688        let expr_data_type = expr.data_type(schema)?;
689        for list_expr in list.iter() {
690            let list_expr_data_type = list_expr.data_type(schema)?;
691            assert_or_internal_err!(
692                DFSchema::datatype_is_logically_equal(
693                    &expr_data_type,
694                    &list_expr_data_type
695                ),
696                "The data type inlist should be same, the value type is {expr_data_type}, one of list expr type is {list_expr_data_type}"
697            );
698        }
699
700        // Try to create a static filter if all list expressions are constants
701        let static_filter = match try_evaluate_constant_list(&list, schema)? {
702            Some(in_array) => Some(instantiate_static_filter(in_array)?),
703            None => None, // Non-constant expressions, fall back to dynamic evaluation
704        };
705
706        Ok(Self::new(expr, list, negated, static_filter))
707    }
708}
709impl std::fmt::Display for InListExpr {
710    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
711        let list = expr_vec_fmt!(self.list);
712
713        if self.negated {
714            if self.static_filter.is_some() {
715                write!(f, "{} NOT IN (SET) ([{list}])", self.expr)
716            } else {
717                write!(f, "{} NOT IN ([{list}])", self.expr)
718            }
719        } else if self.static_filter.is_some() {
720            write!(f, "{} IN (SET) ([{list}])", self.expr)
721        } else {
722            write!(f, "{} IN ([{list}])", self.expr)
723        }
724    }
725}
726
727impl PhysicalExpr for InListExpr {
728    /// Return a reference to Any that can be used for downcasting
729    fn as_any(&self) -> &dyn Any {
730        self
731    }
732
733    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
734        Ok(DataType::Boolean)
735    }
736
737    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
738        if self.expr.nullable(input_schema)? {
739            return Ok(true);
740        }
741
742        if let Some(static_filter) = &self.static_filter {
743            Ok(static_filter.null_count() > 0)
744        } else {
745            for expr in &self.list {
746                if expr.nullable(input_schema)? {
747                    return Ok(true);
748                }
749            }
750            Ok(false)
751        }
752    }
753
754    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
755        let num_rows = batch.num_rows();
756        let value = self.expr.evaluate(batch)?;
757        let r = match &self.static_filter {
758            Some(filter) => {
759                match value {
760                    ColumnarValue::Array(array) => {
761                        filter.contains(&array, self.negated)?
762                    }
763                    ColumnarValue::Scalar(scalar) => {
764                        if scalar.is_null() {
765                            // SQL three-valued logic: null IN (...) is always null
766                            // The code below would handle this correctly but this is a faster path
767                            let nulls = NullBuffer::new_null(num_rows);
768                            return Ok(ColumnarValue::Array(Arc::new(
769                                BooleanArray::new(
770                                    BooleanBuffer::new_unset(num_rows),
771                                    Some(nulls),
772                                ),
773                            )));
774                        }
775                        // Use a 1 row array to avoid code duplication/branching
776                        // Since all we do is compute hash and lookup this should be efficient enough
777                        let array = scalar.to_array()?;
778                        let result_array =
779                            filter.contains(array.as_ref(), self.negated)?;
780                        // Broadcast the single result to all rows
781                        // Must check is_null() to preserve NULL values (SQL three-valued logic)
782                        if result_array.is_null(0) {
783                            let nulls = NullBuffer::new_null(num_rows);
784                            BooleanArray::new(
785                                BooleanBuffer::new_unset(num_rows),
786                                Some(nulls),
787                            )
788                        } else if result_array.value(0) {
789                            BooleanArray::new(BooleanBuffer::new_set(num_rows), None)
790                        } else {
791                            BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)
792                        }
793                    }
794                }
795            }
796            None => {
797                // No static filter: iterate through each expression, compare, and OR results.
798                // Use Arrow's vectorized eq kernel for types it supports (primitive,
799                // boolean, string, binary, dictionary), falling back to row-by-row
800                // comparator for unsupported types (nested, RunEndEncoded, etc.).
801                let value = value.into_array(num_rows)?;
802                let lhs_supports_arrow_eq = supports_arrow_eq(value.data_type());
803                let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold(
804                    BooleanArray::new(BooleanBuffer::new_unset(num_rows), None),
805                    |result, expr| -> Result<BooleanArray> {
806                        let rhs = match expr? {
807                            ColumnarValue::Array(array) => {
808                                if lhs_supports_arrow_eq
809                                    && supports_arrow_eq(array.data_type())
810                                {
811                                    arrow_eq(&value, &array)?
812                                } else {
813                                    let cmp = make_comparator(
814                                        value.as_ref(),
815                                        array.as_ref(),
816                                        SortOptions::default(),
817                                    )?;
818                                    (0..num_rows)
819                                        .map(|i| {
820                                            if value.is_null(i) || array.is_null(i) {
821                                                return None;
822                                            }
823                                            Some(cmp(i, i).is_eq())
824                                        })
825                                        .collect::<BooleanArray>()
826                                }
827                            }
828                            ColumnarValue::Scalar(scalar) => {
829                                // Check if scalar is null once, before the loop
830                                if scalar.is_null() {
831                                    // If scalar is null, all comparisons return null
832                                    BooleanArray::from(vec![None; num_rows])
833                                } else if lhs_supports_arrow_eq {
834                                    let scalar_datum = scalar.to_scalar()?;
835                                    arrow_eq(&value, &scalar_datum)?
836                                } else {
837                                    // Convert scalar to 1-element array
838                                    let array = scalar.to_array()?;
839                                    let cmp = make_comparator(
840                                        value.as_ref(),
841                                        array.as_ref(),
842                                        SortOptions::default(),
843                                    )?;
844                                    // Compare each row of value with the single scalar element
845                                    (0..num_rows)
846                                        .map(|i| {
847                                            if value.is_null(i) {
848                                                None
849                                            } else {
850                                                Some(cmp(i, 0).is_eq())
851                                            }
852                                        })
853                                        .collect::<BooleanArray>()
854                                }
855                            }
856                        };
857                        Ok(or_kleene(&result, &rhs)?)
858                    },
859                )?;
860
861                if self.negated { not(&found)? } else { found }
862            }
863        };
864        Ok(ColumnarValue::Array(Arc::new(r)))
865    }
866
867    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
868        let mut children = vec![&self.expr];
869        children.extend(&self.list);
870        children
871    }
872
873    fn with_new_children(
874        self: Arc<Self>,
875        children: Vec<Arc<dyn PhysicalExpr>>,
876    ) -> Result<Arc<dyn PhysicalExpr>> {
877        // assume the static_filter will not change during the rewrite process
878        Ok(Arc::new(InListExpr::new(
879            Arc::clone(&children[0]),
880            children[1..].to_vec(),
881            self.negated,
882            self.static_filter.as_ref().map(Arc::clone),
883        )))
884    }
885
886    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887        self.expr.fmt_sql(f)?;
888        if self.negated {
889            write!(f, " NOT")?;
890        }
891
892        write!(f, " IN (")?;
893        for (i, expr) in self.list.iter().enumerate() {
894            if i > 0 {
895                write!(f, ", ")?;
896            }
897            expr.fmt_sql(f)?;
898        }
899        write!(f, ")")
900    }
901}
902
903impl PartialEq for InListExpr {
904    fn eq(&self, other: &Self) -> bool {
905        self.expr.eq(&other.expr)
906            && physical_exprs_bag_equal(&self.list, &other.list)
907            && self.negated == other.negated
908    }
909}
910
911impl Eq for InListExpr {}
912
913impl Hash for InListExpr {
914    fn hash<H: Hasher>(&self, state: &mut H) {
915        self.expr.hash(state);
916        self.negated.hash(state);
917        // Add `self.static_filter` when hash is available
918        self.list.hash(state);
919    }
920}
921
922/// Creates a unary expression InList
923pub fn in_list(
924    expr: Arc<dyn PhysicalExpr>,
925    list: Vec<Arc<dyn PhysicalExpr>>,
926    negated: &bool,
927    schema: &Schema,
928) -> Result<Arc<dyn PhysicalExpr>> {
929    Ok(Arc::new(InListExpr::try_new(expr, list, *negated, schema)?))
930}
931
932#[cfg(test)]
933mod tests {
934    use super::*;
935    use crate::expressions::{col, lit, try_cast};
936    use arrow::buffer::NullBuffer;
937    use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano, i256};
938    use datafusion_common::plan_err;
939    use datafusion_expr::type_coercion::binary::comparison_coercion;
940    use datafusion_physical_expr_common::physical_expr::fmt_sql;
941    use insta::assert_snapshot;
942    use itertools::Itertools;
943
944    type InListCastResult = (Arc<dyn PhysicalExpr>, Vec<Arc<dyn PhysicalExpr>>);
945
946    // Try to do the type coercion for list physical expr.
947    // It's just used in the test
948    fn in_list_cast(
949        expr: Arc<dyn PhysicalExpr>,
950        list: Vec<Arc<dyn PhysicalExpr>>,
951        input_schema: &Schema,
952    ) -> Result<InListCastResult> {
953        let expr_type = &expr.data_type(input_schema)?;
954        let list_types: Vec<DataType> = list
955            .iter()
956            .map(|list_expr| list_expr.data_type(input_schema).unwrap())
957            .collect();
958        let result_type = get_coerce_type(expr_type, &list_types);
959        match result_type {
960            None => plan_err!(
961                "Can not find compatible types to compare {expr_type} with [{}]",
962                list_types.iter().join(", ")
963            ),
964            Some(data_type) => {
965                // find the coerced type
966                let cast_expr = try_cast(expr, input_schema, data_type.clone())?;
967                let cast_list_expr = list
968                    .into_iter()
969                    .map(|list_expr| {
970                        try_cast(list_expr, input_schema, data_type.clone()).unwrap()
971                    })
972                    .collect();
973                Ok((cast_expr, cast_list_expr))
974            }
975        }
976    }
977
978    // Attempts to coerce the types of `list_type` to be comparable with the
979    // `expr_type`
980    fn get_coerce_type(expr_type: &DataType, list_type: &[DataType]) -> Option<DataType> {
981        list_type
982            .iter()
983            .try_fold(expr_type.clone(), |left_type, right_type| {
984                comparison_coercion(&left_type, right_type)
985            })
986    }
987
988    /// Test helper macro that evaluates an IN LIST expression with automatic type casting.
989    ///
990    /// # Parameters
991    /// - `$BATCH`: The `RecordBatch` containing the input data to evaluate against
992    /// - `$LIST`: A `Vec<Arc<dyn PhysicalExpr>>` of literal expressions representing the IN list values
993    /// - `$NEGATED`: A `&bool` indicating whether this is a NOT IN operation (true) or IN operation (false)
994    /// - `$EXPECTED`: A `Vec<Option<bool>>` representing the expected boolean results for each row
995    /// - `$COL`: An `Arc<dyn PhysicalExpr>` representing the column expression to evaluate
996    /// - `$SCHEMA`: A `&Schema` reference for the input batch
997    ///
998    /// This macro first applies type casting to the column and list expressions to ensure
999    /// type compatibility, then delegates to `in_list_raw!` to perform the evaluation and assertion.
1000    macro_rules! in_list {
1001        ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
1002            let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
1003            in_list_raw!(
1004                $BATCH,
1005                cast_list_exprs,
1006                $NEGATED,
1007                $EXPECTED,
1008                cast_expr,
1009                $SCHEMA
1010            );
1011        }};
1012    }
1013
1014    /// Test helper macro that evaluates an IN LIST expression without automatic type casting.
1015    ///
1016    /// # Parameters
1017    /// - `$BATCH`: The `RecordBatch` containing the input data to evaluate against
1018    /// - `$LIST`: A `Vec<Arc<dyn PhysicalExpr>>` of literal expressions representing the IN list values
1019    /// - `$NEGATED`: A `&bool` indicating whether this is a NOT IN operation (true) or IN operation (false)
1020    /// - `$EXPECTED`: A `Vec<Option<bool>>` representing the expected boolean results for each row
1021    /// - `$COL`: An `Arc<dyn PhysicalExpr>` representing the column expression to evaluate
1022    /// - `$SCHEMA`: A `&Schema` reference for the input batch
1023    ///
1024    /// This macro creates an IN LIST expression, evaluates it against the batch, converts the result
1025    /// to a `BooleanArray`, and asserts that it matches the expected output. Use this when the column
1026    /// and list expressions are already the correct types and don't require casting.
1027    macro_rules! in_list_raw {
1028        ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
1029            let col_expr = $COL;
1030            let expr = in_list(Arc::clone(&col_expr), $LIST, $NEGATED, $SCHEMA).unwrap();
1031            let result = expr
1032                .evaluate(&$BATCH)?
1033                .into_array($BATCH.num_rows())
1034                .expect("Failed to convert to array");
1035            let result = as_boolean_array(&result);
1036            let expected = &BooleanArray::from($EXPECTED);
1037            assert_eq!(
1038                expected,
1039                result,
1040                "Failed for: {}\n{}: {:?}",
1041                fmt_sql(expr.as_ref()),
1042                fmt_sql(col_expr.as_ref()),
1043                col_expr
1044                    .evaluate(&$BATCH)?
1045                    .into_array($BATCH.num_rows())
1046                    .unwrap()
1047            );
1048        }};
1049    }
1050
1051    /// Test case for primitive types following the standard IN LIST pattern.
1052    ///
1053    /// Each test case represents a data type with:
1054    /// - `value_in`: A value that appears in both the test array and the IN list (matches → true)
1055    /// - `value_not_in`: A value that appears in the test array but NOT in the IN list (doesn't match → false)
1056    /// - `other_list_values`: Additional values in the IN list besides `value_in`
1057    /// - `null_value`: Optional null scalar value for NULL handling tests. When None, tests
1058    ///   without nulls are run, exercising the `(false, false)` and `(false, true)` branches.
1059    struct InListPrimitiveTestCase {
1060        name: &'static str,
1061        value_in: ScalarValue,
1062        value_not_in: ScalarValue,
1063        other_list_values: Vec<ScalarValue>,
1064        null_value: Option<ScalarValue>,
1065    }
1066
1067    /// Generic test data struct for primitive types.
1068    ///
1069    /// Holds test values needed for IN LIST tests, allowing the data
1070    /// to be declared explicitly and reused across multiple types.
1071    #[derive(Clone)]
1072    struct PrimitiveTestCaseData<T> {
1073        value_in: T,
1074        value_not_in: T,
1075        other_list_values: Vec<T>,
1076    }
1077
1078    /// Helper to create test cases for any primitive type using generic data.
1079    ///
1080    /// Uses TryInto for flexible type conversion, allowing test data to be
1081    /// declared in any convertible type (e.g., i32 for all integer types).
1082    /// Creates a test case WITH null support (for null handling tests).
1083    fn primitive_test_case<T, D, F>(
1084        name: &'static str,
1085        constructor: F,
1086        data: PrimitiveTestCaseData<D>,
1087    ) -> InListPrimitiveTestCase
1088    where
1089        D: TryInto<T> + Clone,
1090        <D as TryInto<T>>::Error: Debug,
1091        F: Fn(Option<T>) -> ScalarValue,
1092        T: Clone,
1093    {
1094        InListPrimitiveTestCase {
1095            name,
1096            value_in: constructor(Some(data.value_in.try_into().unwrap())),
1097            value_not_in: constructor(Some(data.value_not_in.try_into().unwrap())),
1098            other_list_values: data
1099                .other_list_values
1100                .into_iter()
1101                .map(|v| constructor(Some(v.try_into().unwrap())))
1102                .collect(),
1103            null_value: Some(constructor(None)),
1104        }
1105    }
1106
1107    /// Helper to create test cases WITHOUT null support.
1108    /// These test cases exercise the `(false, true)` branch (no nulls, negated).
1109    fn primitive_test_case_no_nulls<T, D, F>(
1110        name: &'static str,
1111        constructor: F,
1112        data: PrimitiveTestCaseData<D>,
1113    ) -> InListPrimitiveTestCase
1114    where
1115        D: TryInto<T> + Clone,
1116        <D as TryInto<T>>::Error: Debug,
1117        F: Fn(Option<T>) -> ScalarValue,
1118        T: Clone,
1119    {
1120        InListPrimitiveTestCase {
1121            name,
1122            value_in: constructor(Some(data.value_in.try_into().unwrap())),
1123            value_not_in: constructor(Some(data.value_not_in.try_into().unwrap())),
1124            other_list_values: data
1125                .other_list_values
1126                .into_iter()
1127                .map(|v| constructor(Some(v.try_into().unwrap())))
1128                .collect(),
1129            null_value: None,
1130        }
1131    }
1132
1133    /// Runs test cases for multiple types, providing detailed SQL error messages on failure.
1134    ///
1135    /// For each test case, runs IN LIST scenarios based on whether null_value is Some or None:
1136    /// - With null_value (Some): 4 tests including null handling
1137    /// - Without null_value (None): 2 tests exercising the no-nulls paths
1138    fn run_test_cases(test_cases: Vec<InListPrimitiveTestCase>) -> Result<()> {
1139        for test_case in test_cases {
1140            let test_name = test_case.name;
1141
1142            // Get the data type from the scalar value
1143            let data_type = test_case.value_in.data_type();
1144
1145            // Build the base list: [value_in, ...other_list_values]
1146            let build_base_list = || -> Vec<Arc<dyn PhysicalExpr>> {
1147                let mut list = vec![lit(test_case.value_in.clone())];
1148                list.extend(test_case.other_list_values.iter().map(|v| lit(v.clone())));
1149                list
1150            };
1151
1152            match &test_case.null_value {
1153                Some(null_val) => {
1154                    // Tests WITH nulls in the needle array
1155                    let schema =
1156                        Schema::new(vec![Field::new("a", data_type.clone(), true)]);
1157
1158                    // Create array from scalar values: [value_in, value_not_in, None]
1159                    let array = ScalarValue::iter_to_array(vec![
1160                        test_case.value_in.clone(),
1161                        test_case.value_not_in.clone(),
1162                        null_val.clone(),
1163                    ])?;
1164
1165                    let col_a = col("a", &schema)?;
1166                    let batch = RecordBatch::try_new(
1167                        Arc::new(schema.clone()),
1168                        vec![Arc::clone(&array)],
1169                    )?;
1170
1171                    // Test 1: a IN (list) → [true, false, null]
1172                    let list = build_base_list();
1173                    in_list!(
1174                        batch,
1175                        list,
1176                        &false,
1177                        vec![Some(true), Some(false), None],
1178                        Arc::clone(&col_a),
1179                        &schema
1180                    );
1181
1182                    // Test 2: a NOT IN (list) → [false, true, null]
1183                    let list = build_base_list();
1184                    in_list!(
1185                        batch,
1186                        list,
1187                        &true,
1188                        vec![Some(false), Some(true), None],
1189                        Arc::clone(&col_a),
1190                        &schema
1191                    );
1192
1193                    // Test 3: a IN (list, NULL) → [true, null, null]
1194                    let mut list = build_base_list();
1195                    list.push(lit(null_val.clone()));
1196                    in_list!(
1197                        batch,
1198                        list,
1199                        &false,
1200                        vec![Some(true), None, None],
1201                        Arc::clone(&col_a),
1202                        &schema
1203                    );
1204
1205                    // Test 4: a NOT IN (list, NULL) → [false, null, null]
1206                    let mut list = build_base_list();
1207                    list.push(lit(null_val.clone()));
1208                    in_list!(
1209                        batch,
1210                        list,
1211                        &true,
1212                        vec![Some(false), None, None],
1213                        Arc::clone(&col_a),
1214                        &schema
1215                    );
1216                }
1217                None => {
1218                    // Tests WITHOUT nulls - exercises the (false, false) and (false, true) branches
1219                    let schema =
1220                        Schema::new(vec![Field::new("a", data_type.clone(), false)]);
1221
1222                    // Create array from scalar values: [value_in, value_not_in] (no NULL)
1223                    let array = ScalarValue::iter_to_array(vec![
1224                        test_case.value_in.clone(),
1225                        test_case.value_not_in.clone(),
1226                    ])?;
1227
1228                    let col_a = col("a", &schema)?;
1229                    let batch = RecordBatch::try_new(
1230                        Arc::new(schema.clone()),
1231                        vec![Arc::clone(&array)],
1232                    )?;
1233
1234                    // Test 1: a IN (list) → [true, false] - exercises (false, false) branch
1235                    let list = build_base_list();
1236                    in_list!(
1237                        batch,
1238                        list,
1239                        &false,
1240                        vec![Some(true), Some(false)],
1241                        Arc::clone(&col_a),
1242                        &schema
1243                    );
1244
1245                    // Test 2: a NOT IN (list) → [false, true] - exercises (false, true) branch
1246                    let list = build_base_list();
1247                    in_list!(
1248                        batch,
1249                        list,
1250                        &true,
1251                        vec![Some(false), Some(true)],
1252                        Arc::clone(&col_a),
1253                        &schema
1254                    );
1255
1256                    eprintln!(
1257                        "Test '{test_name}': exercised (false, true) branch (no nulls, negated)",
1258                    );
1259                }
1260            }
1261        }
1262
1263        Ok(())
1264    }
1265
1266    /// Test IN LIST for all integer types (Int8/16/32/64, UInt8/16/32/64).
1267    ///
1268    /// Test data: 0 (in list), 2 (not in list), [1, 3, 5] (other list values)
1269    #[test]
1270    fn in_list_int_types() -> Result<()> {
1271        let int_data = PrimitiveTestCaseData {
1272            value_in: 0,
1273            value_not_in: 2,
1274            other_list_values: vec![1, 3, 5],
1275        };
1276
1277        run_test_cases(vec![
1278            // Tests WITH nulls
1279            primitive_test_case("int8", ScalarValue::Int8, int_data.clone()),
1280            primitive_test_case("int16", ScalarValue::Int16, int_data.clone()),
1281            primitive_test_case("int32", ScalarValue::Int32, int_data.clone()),
1282            primitive_test_case("int64", ScalarValue::Int64, int_data.clone()),
1283            primitive_test_case("uint8", ScalarValue::UInt8, int_data.clone()),
1284            primitive_test_case("uint16", ScalarValue::UInt16, int_data.clone()),
1285            primitive_test_case("uint32", ScalarValue::UInt32, int_data.clone()),
1286            primitive_test_case("uint64", ScalarValue::UInt64, int_data.clone()),
1287            // Tests WITHOUT nulls - exercises (false, true) branch
1288            primitive_test_case_no_nulls("int32_no_nulls", ScalarValue::Int32, int_data),
1289        ])
1290    }
1291
1292    /// Test IN LIST for all string types (Utf8, LargeUtf8, Utf8View).
1293    ///
1294    /// Test data: "a" (in list), "d" (not in list), ["b", "c"] (other list values)
1295    #[test]
1296    fn in_list_string_types() -> Result<()> {
1297        let string_data = PrimitiveTestCaseData {
1298            value_in: "a",
1299            value_not_in: "d",
1300            other_list_values: vec!["b", "c"],
1301        };
1302
1303        run_test_cases(vec![
1304            primitive_test_case("utf8", ScalarValue::Utf8, string_data.clone()),
1305            primitive_test_case(
1306                "large_utf8",
1307                ScalarValue::LargeUtf8,
1308                string_data.clone(),
1309            ),
1310            primitive_test_case("utf8_view", ScalarValue::Utf8View, string_data),
1311        ])
1312    }
1313
1314    /// Test IN LIST for all binary types (Binary, LargeBinary, BinaryView).
1315    ///
1316    /// Test data: [1,2,3] (in list), [1,2,2] (not in list), [[4,5,6], [7,8,9]] (other list values)
1317    #[test]
1318    fn in_list_binary_types() -> Result<()> {
1319        let binary_data = PrimitiveTestCaseData {
1320            value_in: vec![1_u8, 2, 3],
1321            value_not_in: vec![1_u8, 2, 2],
1322            other_list_values: vec![vec![4_u8, 5, 6], vec![7_u8, 8, 9]],
1323        };
1324
1325        run_test_cases(vec![
1326            primitive_test_case("binary", ScalarValue::Binary, binary_data.clone()),
1327            primitive_test_case(
1328                "large_binary",
1329                ScalarValue::LargeBinary,
1330                binary_data.clone(),
1331            ),
1332            primitive_test_case("binary_view", ScalarValue::BinaryView, binary_data),
1333        ])
1334    }
1335
1336    /// Test IN LIST for date types (Date32, Date64).
1337    ///
1338    /// Test data: 0 (in list), 2 (not in list), [1, 3] (other list values)
1339    #[test]
1340    fn in_list_date_types() -> Result<()> {
1341        let date_data = PrimitiveTestCaseData {
1342            value_in: 0,
1343            value_not_in: 2,
1344            other_list_values: vec![1, 3],
1345        };
1346
1347        run_test_cases(vec![
1348            primitive_test_case("date32", ScalarValue::Date32, date_data.clone()),
1349            primitive_test_case("date64", ScalarValue::Date64, date_data),
1350        ])
1351    }
1352
1353    /// Test IN LIST for Decimal128 type.
1354    ///
1355    /// Test data: 0 (in list), 200 (not in list), [100, 300] (other list values) with precision=10, scale=2
1356    #[test]
1357    fn in_list_decimal() -> Result<()> {
1358        run_test_cases(vec![InListPrimitiveTestCase {
1359            name: "decimal128",
1360            value_in: ScalarValue::Decimal128(Some(0), 10, 2),
1361            value_not_in: ScalarValue::Decimal128(Some(200), 10, 2),
1362            other_list_values: vec![
1363                ScalarValue::Decimal128(Some(100), 10, 2),
1364                ScalarValue::Decimal128(Some(300), 10, 2),
1365            ],
1366            null_value: Some(ScalarValue::Decimal128(None, 10, 2)),
1367        }])
1368    }
1369
1370    /// Test IN LIST for timestamp types.
1371    ///
1372    /// Test data: 0 (in list), 2000 (not in list), [1000, 3000] (other list values)
1373    #[test]
1374    fn in_list_timestamp_types() -> Result<()> {
1375        run_test_cases(vec![
1376            InListPrimitiveTestCase {
1377                name: "timestamp_nanosecond",
1378                value_in: ScalarValue::TimestampNanosecond(Some(0), None),
1379                value_not_in: ScalarValue::TimestampNanosecond(Some(2000), None),
1380                other_list_values: vec![
1381                    ScalarValue::TimestampNanosecond(Some(1000), None),
1382                    ScalarValue::TimestampNanosecond(Some(3000), None),
1383                ],
1384                null_value: Some(ScalarValue::TimestampNanosecond(None, None)),
1385            },
1386            InListPrimitiveTestCase {
1387                name: "timestamp_millisecond_with_tz",
1388                value_in: ScalarValue::TimestampMillisecond(
1389                    Some(1500000),
1390                    Some("+05:00".into()),
1391                ),
1392                value_not_in: ScalarValue::TimestampMillisecond(
1393                    Some(2500000),
1394                    Some("+05:00".into()),
1395                ),
1396                other_list_values: vec![ScalarValue::TimestampMillisecond(
1397                    Some(3500000),
1398                    Some("+05:00".into()),
1399                )],
1400                null_value: Some(ScalarValue::TimestampMillisecond(
1401                    None,
1402                    Some("+05:00".into()),
1403                )),
1404            },
1405            InListPrimitiveTestCase {
1406                name: "timestamp_millisecond_mixed_tz",
1407                value_in: ScalarValue::TimestampMillisecond(
1408                    Some(1500000),
1409                    Some("+05:00".into()),
1410                ),
1411                value_not_in: ScalarValue::TimestampMillisecond(
1412                    Some(2500000),
1413                    Some("+05:00".into()),
1414                ),
1415                other_list_values: vec![
1416                    ScalarValue::TimestampMillisecond(
1417                        Some(3500000),
1418                        Some("+01:00".into()),
1419                    ),
1420                    ScalarValue::TimestampMillisecond(Some(4500000), Some("UTC".into())),
1421                ],
1422                null_value: Some(ScalarValue::TimestampMillisecond(
1423                    None,
1424                    Some("+05:00".into()),
1425                )),
1426            },
1427        ])
1428    }
1429
1430    #[test]
1431    fn in_list_float64() -> Result<()> {
1432        let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]);
1433        let a = Float64Array::from(vec![
1434            Some(0.0),
1435            Some(0.2),
1436            None,
1437            Some(f64::NAN),
1438            Some(-f64::NAN),
1439        ]);
1440        let col_a = col("a", &schema)?;
1441        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1442
1443        // expression: "a in (0.0, 0.1)"
1444        let list = vec![lit(0.0f64), lit(0.1f64)];
1445        in_list!(
1446            batch,
1447            list,
1448            &false,
1449            vec![Some(true), Some(false), None, Some(false), Some(false)],
1450            Arc::clone(&col_a),
1451            &schema
1452        );
1453
1454        // expression: "a not in (0.0, 0.1)"
1455        let list = vec![lit(0.0f64), lit(0.1f64)];
1456        in_list!(
1457            batch,
1458            list,
1459            &true,
1460            vec![Some(false), Some(true), None, Some(true), Some(true)],
1461            Arc::clone(&col_a),
1462            &schema
1463        );
1464
1465        // expression: "a in (0.0, 0.1, NULL)"
1466        let list = vec![lit(0.0f64), lit(0.1f64), lit(ScalarValue::Null)];
1467        in_list!(
1468            batch,
1469            list,
1470            &false,
1471            vec![Some(true), None, None, None, None],
1472            Arc::clone(&col_a),
1473            &schema
1474        );
1475
1476        // expression: "a not in (0.0, 0.1, NULL)"
1477        let list = vec![lit(0.0f64), lit(0.1f64), lit(ScalarValue::Null)];
1478        in_list!(
1479            batch,
1480            list,
1481            &true,
1482            vec![Some(false), None, None, None, None],
1483            Arc::clone(&col_a),
1484            &schema
1485        );
1486
1487        // expression: "a in (0.0, 0.1, NaN)"
1488        let list = vec![lit(0.0f64), lit(0.1f64), lit(f64::NAN)];
1489        in_list!(
1490            batch,
1491            list,
1492            &false,
1493            vec![Some(true), Some(false), None, Some(true), Some(false)],
1494            Arc::clone(&col_a),
1495            &schema
1496        );
1497
1498        // expression: "a not in (0.0, 0.1, NaN)"
1499        let list = vec![lit(0.0f64), lit(0.1f64), lit(f64::NAN)];
1500        in_list!(
1501            batch,
1502            list,
1503            &true,
1504            vec![Some(false), Some(true), None, Some(false), Some(true)],
1505            Arc::clone(&col_a),
1506            &schema
1507        );
1508
1509        // expression: "a in (0.0, 0.1, -NaN)"
1510        let list = vec![lit(0.0f64), lit(0.1f64), lit(-f64::NAN)];
1511        in_list!(
1512            batch,
1513            list,
1514            &false,
1515            vec![Some(true), Some(false), None, Some(false), Some(true)],
1516            Arc::clone(&col_a),
1517            &schema
1518        );
1519
1520        // expression: "a not in (0.0, 0.1, -NaN)"
1521        let list = vec![lit(0.0f64), lit(0.1f64), lit(-f64::NAN)];
1522        in_list!(
1523            batch,
1524            list,
1525            &true,
1526            vec![Some(false), Some(true), None, Some(true), Some(false)],
1527            Arc::clone(&col_a),
1528            &schema
1529        );
1530
1531        Ok(())
1532    }
1533
1534    #[test]
1535    fn in_list_bool() -> Result<()> {
1536        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
1537        let a = BooleanArray::from(vec![Some(true), None]);
1538        let col_a = col("a", &schema)?;
1539        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1540
1541        // expression: "a in (true)"
1542        let list = vec![lit(true)];
1543        in_list!(
1544            batch,
1545            list,
1546            &false,
1547            vec![Some(true), None],
1548            Arc::clone(&col_a),
1549            &schema
1550        );
1551
1552        // expression: "a not in (true)"
1553        let list = vec![lit(true)];
1554        in_list!(
1555            batch,
1556            list,
1557            &true,
1558            vec![Some(false), None],
1559            Arc::clone(&col_a),
1560            &schema
1561        );
1562
1563        // expression: "a in (true, NULL)"
1564        let list = vec![lit(true), lit(ScalarValue::Null)];
1565        in_list!(
1566            batch,
1567            list,
1568            &false,
1569            vec![Some(true), None],
1570            Arc::clone(&col_a),
1571            &schema
1572        );
1573
1574        // expression: "a not in (true, NULL)"
1575        let list = vec![lit(true), lit(ScalarValue::Null)];
1576        in_list!(
1577            batch,
1578            list,
1579            &true,
1580            vec![Some(false), None],
1581            Arc::clone(&col_a),
1582            &schema
1583        );
1584
1585        Ok(())
1586    }
1587
1588    macro_rules! test_nullable {
1589        ($COL:expr, $LIST:expr, $SCHEMA:expr, $EXPECTED:expr) => {{
1590            let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
1591            let expr = in_list(cast_expr, cast_list_exprs, &false, $SCHEMA).unwrap();
1592            let result = expr.nullable($SCHEMA)?;
1593            assert_eq!($EXPECTED, result);
1594        }};
1595    }
1596
1597    #[test]
1598    fn in_list_nullable() -> Result<()> {
1599        let schema = Schema::new(vec![
1600            Field::new("c1_nullable", DataType::Int64, true),
1601            Field::new("c2_non_nullable", DataType::Int64, false),
1602        ]);
1603
1604        let c1_nullable = col("c1_nullable", &schema)?;
1605        let c2_non_nullable = col("c2_non_nullable", &schema)?;
1606
1607        // static_filter has no nulls
1608        let list = vec![lit(1_i64), lit(2_i64)];
1609        test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1610        test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, false);
1611
1612        // static_filter has nulls
1613        let list = vec![lit(1_i64), lit(2_i64), lit(ScalarValue::Null)];
1614        test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1615        test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, true);
1616
1617        let list = vec![Arc::clone(&c1_nullable)];
1618        test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, true);
1619
1620        let list = vec![Arc::clone(&c2_non_nullable)];
1621        test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1622
1623        let list = vec![Arc::clone(&c2_non_nullable), Arc::clone(&c2_non_nullable)];
1624        test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, false);
1625
1626        Ok(())
1627    }
1628
1629    #[test]
1630    fn in_list_no_cols() -> Result<()> {
1631        // test logic when the in_list expression doesn't have any columns
1632        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1633        let a = Int32Array::from(vec![Some(1), Some(2), None]);
1634        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1635
1636        let list = vec![lit(ScalarValue::from(1i32)), lit(ScalarValue::from(6i32))];
1637
1638        // 1 IN (1, 6)
1639        let expr = lit(ScalarValue::Int32(Some(1)));
1640        in_list!(
1641            batch,
1642            list.clone(),
1643            &false,
1644            // should have three outputs, as the input batch has three rows
1645            vec![Some(true), Some(true), Some(true)],
1646            expr,
1647            &schema
1648        );
1649
1650        // 2 IN (1, 6)
1651        let expr = lit(ScalarValue::Int32(Some(2)));
1652        in_list!(
1653            batch,
1654            list.clone(),
1655            &false,
1656            // should have three outputs, as the input batch has three rows
1657            vec![Some(false), Some(false), Some(false)],
1658            expr,
1659            &schema
1660        );
1661
1662        // NULL IN (1, 6)
1663        let expr = lit(ScalarValue::Int32(None));
1664        in_list!(
1665            batch,
1666            list.clone(),
1667            &false,
1668            // should have three outputs, as the input batch has three rows
1669            vec![None, None, None],
1670            expr,
1671            &schema
1672        );
1673
1674        Ok(())
1675    }
1676
1677    #[test]
1678    fn in_list_utf8_with_dict_types() -> Result<()> {
1679        fn dict_lit(key_type: DataType, value: &str) -> Arc<dyn PhysicalExpr> {
1680            lit(ScalarValue::Dictionary(
1681                Box::new(key_type),
1682                Box::new(ScalarValue::new_utf8(value.to_string())),
1683            ))
1684        }
1685
1686        fn null_dict_lit(key_type: DataType) -> Arc<dyn PhysicalExpr> {
1687            lit(ScalarValue::Dictionary(
1688                Box::new(key_type),
1689                Box::new(ScalarValue::Utf8(None)),
1690            ))
1691        }
1692
1693        let schema = Schema::new(vec![Field::new(
1694            "a",
1695            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
1696            true,
1697        )]);
1698        let a: UInt16DictionaryArray =
1699            vec![Some("a"), Some("d"), None].into_iter().collect();
1700        let col_a = col("a", &schema)?;
1701        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1702
1703        // expression: "a in ("a", "b")"
1704        let lists = [
1705            vec![lit("a"), lit("b")],
1706            vec![
1707                dict_lit(DataType::Int8, "a"),
1708                dict_lit(DataType::UInt16, "b"),
1709            ],
1710        ];
1711        for list in lists.iter() {
1712            in_list_raw!(
1713                batch,
1714                list.clone(),
1715                &false,
1716                vec![Some(true), Some(false), None],
1717                Arc::clone(&col_a),
1718                &schema
1719            );
1720        }
1721
1722        // expression: "a not in ("a", "b")"
1723        for list in lists.iter() {
1724            in_list_raw!(
1725                batch,
1726                list.clone(),
1727                &true,
1728                vec![Some(false), Some(true), None],
1729                Arc::clone(&col_a),
1730                &schema
1731            );
1732        }
1733
1734        // expression: "a in ("a", "b", null)"
1735        let lists = [
1736            vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
1737            vec![
1738                dict_lit(DataType::Int8, "a"),
1739                dict_lit(DataType::UInt16, "b"),
1740                null_dict_lit(DataType::UInt16),
1741            ],
1742        ];
1743        for list in lists.iter() {
1744            in_list_raw!(
1745                batch,
1746                list.clone(),
1747                &false,
1748                vec![Some(true), None, None],
1749                Arc::clone(&col_a),
1750                &schema
1751            );
1752        }
1753
1754        // expression: "a not in ("a", "b", null)"
1755        for list in lists.iter() {
1756            in_list_raw!(
1757                batch,
1758                list.clone(),
1759                &true,
1760                vec![Some(false), None, None],
1761                Arc::clone(&col_a),
1762                &schema
1763            );
1764        }
1765
1766        Ok(())
1767    }
1768
1769    #[test]
1770    fn test_fmt_sql_1() -> Result<()> {
1771        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1772        let col_a = col("a", &schema)?;
1773
1774        // Test: a IN ('a', 'b')
1775        let list = vec![lit("a"), lit("b")];
1776        let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
1777        let sql_string = fmt_sql(expr.as_ref()).to_string();
1778        let display_string = expr.to_string();
1779        assert_snapshot!(sql_string, @"a IN (a, b)");
1780        assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b])");
1781        Ok(())
1782    }
1783
1784    #[test]
1785    fn test_fmt_sql_2() -> Result<()> {
1786        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1787        let col_a = col("a", &schema)?;
1788
1789        // Test: a NOT IN ('a', 'b')
1790        let list = vec![lit("a"), lit("b")];
1791        let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
1792        let sql_string = fmt_sql(expr.as_ref()).to_string();
1793        let display_string = expr.to_string();
1794
1795        assert_snapshot!(sql_string, @"a NOT IN (a, b)");
1796        assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b])");
1797        Ok(())
1798    }
1799
1800    #[test]
1801    fn test_fmt_sql_3() -> Result<()> {
1802        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1803        let col_a = col("a", &schema)?;
1804        // Test: a IN ('a', 'b', NULL)
1805        let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
1806        let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
1807        let sql_string = fmt_sql(expr.as_ref()).to_string();
1808        let display_string = expr.to_string();
1809
1810        assert_snapshot!(sql_string, @"a IN (a, b, NULL)");
1811        assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b, NULL])");
1812        Ok(())
1813    }
1814
1815    #[test]
1816    fn test_fmt_sql_4() -> Result<()> {
1817        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1818        let col_a = col("a", &schema)?;
1819        // Test: a NOT IN ('a', 'b', NULL)
1820        let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
1821        let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
1822        let sql_string = fmt_sql(expr.as_ref()).to_string();
1823        let display_string = expr.to_string();
1824        assert_snapshot!(sql_string, @"a NOT IN (a, b, NULL)");
1825        assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b, NULL])");
1826        Ok(())
1827    }
1828
1829    #[test]
1830    fn in_list_struct() -> Result<()> {
1831        // Create schema with a struct column
1832        let struct_fields = Fields::from(vec![
1833            Field::new("x", DataType::Int32, false),
1834            Field::new("y", DataType::Utf8, false),
1835        ]);
1836        let schema = Schema::new(vec![Field::new(
1837            "a",
1838            DataType::Struct(struct_fields.clone()),
1839            true,
1840        )]);
1841
1842        // Create test data: array of structs
1843        let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
1844        let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
1845        let struct_array =
1846            StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);
1847
1848        let col_a = col("a", &schema)?;
1849        let batch =
1850            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;
1851
1852        // Create literal structs for the IN list
1853        // Struct {x: 1, y: "a"}
1854        let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
1855            struct_fields.clone(),
1856            vec![
1857                Arc::new(Int32Array::from(vec![1])),
1858                Arc::new(StringArray::from(vec!["a"])),
1859            ],
1860            None,
1861        )));
1862
1863        // Struct {x: 3, y: "c"}
1864        let struct3 = ScalarValue::Struct(Arc::new(StructArray::new(
1865            struct_fields.clone(),
1866            vec![
1867                Arc::new(Int32Array::from(vec![3])),
1868                Arc::new(StringArray::from(vec!["c"])),
1869            ],
1870            None,
1871        )));
1872
1873        // Test: a IN ({1, "a"}, {3, "c"})
1874        let list = vec![lit(struct1.clone()), lit(struct3.clone())];
1875        in_list_raw!(
1876            batch,
1877            list.clone(),
1878            &false,
1879            vec![Some(true), Some(false), Some(true)],
1880            Arc::clone(&col_a),
1881            &schema
1882        );
1883
1884        // Test: a NOT IN ({1, "a"}, {3, "c"})
1885        in_list_raw!(
1886            batch,
1887            list,
1888            &true,
1889            vec![Some(false), Some(true), Some(false)],
1890            Arc::clone(&col_a),
1891            &schema
1892        );
1893
1894        Ok(())
1895    }
1896
1897    #[test]
1898    fn in_list_struct_with_nulls() -> Result<()> {
1899        // Create schema with a struct column
1900        let struct_fields = Fields::from(vec![
1901            Field::new("x", DataType::Int32, false),
1902            Field::new("y", DataType::Utf8, false),
1903        ]);
1904        let schema = Schema::new(vec![Field::new(
1905            "a",
1906            DataType::Struct(struct_fields.clone()),
1907            true,
1908        )]);
1909
1910        // Create test data with a null struct
1911        let x_array = Arc::new(Int32Array::from(vec![1, 2]));
1912        let y_array = Arc::new(StringArray::from(vec!["a", "b"]));
1913        let struct_array = StructArray::new(
1914            struct_fields.clone(),
1915            vec![x_array, y_array],
1916            Some(NullBuffer::from(vec![true, false])),
1917        );
1918
1919        let col_a = col("a", &schema)?;
1920        let batch =
1921            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;
1922
1923        // Create literal struct for the IN list
1924        let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
1925            struct_fields.clone(),
1926            vec![
1927                Arc::new(Int32Array::from(vec![1])),
1928                Arc::new(StringArray::from(vec!["a"])),
1929            ],
1930            None,
1931        )));
1932
1933        // Test: a IN ({1, "a"})
1934        let list = vec![lit(struct1.clone())];
1935        in_list_raw!(
1936            batch,
1937            list.clone(),
1938            &false,
1939            vec![Some(true), None],
1940            Arc::clone(&col_a),
1941            &schema
1942        );
1943
1944        // Test: a NOT IN ({1, "a"})
1945        in_list_raw!(
1946            batch,
1947            list,
1948            &true,
1949            vec![Some(false), None],
1950            Arc::clone(&col_a),
1951            &schema
1952        );
1953
1954        Ok(())
1955    }
1956
1957    #[test]
1958    fn in_list_struct_with_null_in_list() -> Result<()> {
1959        // Create schema with a struct column
1960        let struct_fields = Fields::from(vec![
1961            Field::new("x", DataType::Int32, false),
1962            Field::new("y", DataType::Utf8, false),
1963        ]);
1964        let schema = Schema::new(vec![Field::new(
1965            "a",
1966            DataType::Struct(struct_fields.clone()),
1967            true,
1968        )]);
1969
1970        // Create test data
1971        let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
1972        let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
1973        let struct_array =
1974            StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);
1975
1976        let col_a = col("a", &schema)?;
1977        let batch =
1978            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;
1979
1980        // Create literal structs including a NULL
1981        let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
1982            struct_fields.clone(),
1983            vec![
1984                Arc::new(Int32Array::from(vec![1])),
1985                Arc::new(StringArray::from(vec!["a"])),
1986            ],
1987            None,
1988        )));
1989
1990        let null_struct = ScalarValue::Struct(Arc::new(StructArray::new_null(
1991            struct_fields.clone(),
1992            1,
1993        )));
1994
1995        // Test: a IN ({1, "a"}, NULL)
1996        let list = vec![lit(struct1), lit(null_struct.clone())];
1997        in_list_raw!(
1998            batch,
1999            list.clone(),
2000            &false,
2001            vec![Some(true), None, None],
2002            Arc::clone(&col_a),
2003            &schema
2004        );
2005
2006        // Test: a NOT IN ({1, "a"}, NULL)
2007        in_list_raw!(
2008            batch,
2009            list,
2010            &true,
2011            vec![Some(false), None, None],
2012            Arc::clone(&col_a),
2013            &schema
2014        );
2015
2016        Ok(())
2017    }
2018
2019    #[test]
2020    fn in_list_nested_struct() -> Result<()> {
2021        // Create nested struct schema
2022        let inner_struct_fields = Fields::from(vec![
2023            Field::new("a", DataType::Int32, false),
2024            Field::new("b", DataType::Utf8, false),
2025        ]);
2026        let outer_struct_fields = Fields::from(vec![
2027            Field::new(
2028                "inner",
2029                DataType::Struct(inner_struct_fields.clone()),
2030                false,
2031            ),
2032            Field::new("c", DataType::Int32, false),
2033        ]);
2034        let schema = Schema::new(vec![Field::new(
2035            "x",
2036            DataType::Struct(outer_struct_fields.clone()),
2037            true,
2038        )]);
2039
2040        // Create test data with nested structs
2041        let inner1 = Arc::new(StructArray::new(
2042            inner_struct_fields.clone(),
2043            vec![
2044                Arc::new(Int32Array::from(vec![1, 2])),
2045                Arc::new(StringArray::from(vec!["x", "y"])),
2046            ],
2047            None,
2048        ));
2049        let c_array = Arc::new(Int32Array::from(vec![10, 20]));
2050        let outer_array =
2051            StructArray::new(outer_struct_fields.clone(), vec![inner1, c_array], None);
2052
2053        let col_x = col("x", &schema)?;
2054        let batch =
2055            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(outer_array)])?;
2056
2057        // Create a nested struct literal matching the first row
2058        let inner_match = Arc::new(StructArray::new(
2059            inner_struct_fields.clone(),
2060            vec![
2061                Arc::new(Int32Array::from(vec![1])),
2062                Arc::new(StringArray::from(vec!["x"])),
2063            ],
2064            None,
2065        ));
2066        let outer_match = ScalarValue::Struct(Arc::new(StructArray::new(
2067            outer_struct_fields.clone(),
2068            vec![inner_match, Arc::new(Int32Array::from(vec![10]))],
2069            None,
2070        )));
2071
2072        // Test: x IN ({{1, "x"}, 10})
2073        let list = vec![lit(outer_match)];
2074        in_list_raw!(
2075            batch,
2076            list.clone(),
2077            &false,
2078            vec![Some(true), Some(false)],
2079            Arc::clone(&col_x),
2080            &schema
2081        );
2082
2083        // Test: x NOT IN ({{1, "x"}, 10})
2084        in_list_raw!(
2085            batch,
2086            list,
2087            &true,
2088            vec![Some(false), Some(true)],
2089            Arc::clone(&col_x),
2090            &schema
2091        );
2092
2093        Ok(())
2094    }
2095
2096    #[test]
2097    fn in_list_struct_with_exprs_not_array() -> Result<()> {
2098        // Test InList using expressions (not the array constructor) with structs
2099        // By using InListExpr::new directly, we bypass the array optimization
2100        // and use the Exprs variant, testing the expression evaluation path
2101
2102        // Create schema with a struct column {x: Int32, y: Utf8}
2103        let struct_fields = Fields::from(vec![
2104            Field::new("x", DataType::Int32, false),
2105            Field::new("y", DataType::Utf8, false),
2106        ]);
2107        let schema = Schema::new(vec![Field::new(
2108            "a",
2109            DataType::Struct(struct_fields.clone()),
2110            true,
2111        )]);
2112
2113        // Create test data: array of structs [{1, "a"}, {2, "b"}, {3, "c"}]
2114        let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
2115        let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
2116        let struct_array =
2117            StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);
2118
2119        let col_a = col("a", &schema)?;
2120        let batch =
2121            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;
2122
2123        // Create struct literals with the SAME shape (so types are compatible)
2124        // Struct {x: 1, y: "a"}
2125        let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
2126            struct_fields.clone(),
2127            vec![
2128                Arc::new(Int32Array::from(vec![1])),
2129                Arc::new(StringArray::from(vec!["a"])),
2130            ],
2131            None,
2132        )));
2133
2134        // Struct {x: 3, y: "c"}
2135        let struct3 = ScalarValue::Struct(Arc::new(StructArray::new(
2136            struct_fields.clone(),
2137            vec![
2138                Arc::new(Int32Array::from(vec![3])),
2139                Arc::new(StringArray::from(vec!["c"])),
2140            ],
2141            None,
2142        )));
2143
2144        // Create list of struct expressions
2145        let list = vec![lit(struct1), lit(struct3)];
2146
2147        // Use InListExpr::new directly (not in_list()) to bypass array optimization
2148        // This creates an InList without a static filter
2149        let expr = Arc::new(InListExpr::new(Arc::clone(&col_a), list, false, None));
2150
2151        // Verify that the expression doesn't have a static filter
2152        // by checking the display string does NOT contain "(SET)"
2153        let display_string = expr.to_string();
2154        assert!(
2155            !display_string.contains("(SET)"),
2156            "Expected display string to NOT contain '(SET)' (should use Exprs variant), but got: {display_string}",
2157        );
2158
2159        // Evaluate the expression
2160        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2161        let result = as_boolean_array(&result);
2162
2163        // Expected: first row {1, "a"} matches struct1,
2164        //           second row {2, "b"} doesn't match,
2165        //           third row {3, "c"} matches struct3
2166        let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
2167        assert_eq!(result, &expected);
2168
2169        // Test NOT IN as well
2170        let expr_not = Arc::new(InListExpr::new(
2171            Arc::clone(&col_a),
2172            vec![
2173                lit(ScalarValue::Struct(Arc::new(StructArray::new(
2174                    struct_fields.clone(),
2175                    vec![
2176                        Arc::new(Int32Array::from(vec![1])),
2177                        Arc::new(StringArray::from(vec!["a"])),
2178                    ],
2179                    None,
2180                )))),
2181                lit(ScalarValue::Struct(Arc::new(StructArray::new(
2182                    struct_fields.clone(),
2183                    vec![
2184                        Arc::new(Int32Array::from(vec![3])),
2185                        Arc::new(StringArray::from(vec!["c"])),
2186                    ],
2187                    None,
2188                )))),
2189            ],
2190            true,
2191            None,
2192        ));
2193
2194        let result_not = expr_not.evaluate(&batch)?.into_array(batch.num_rows())?;
2195        let result_not = as_boolean_array(&result_not);
2196
2197        let expected_not = BooleanArray::from(vec![Some(false), Some(true), Some(false)]);
2198        assert_eq!(result_not, &expected_not);
2199
2200        Ok(())
2201    }
2202
2203    #[test]
2204    fn test_in_list_null_handling_comprehensive() -> Result<()> {
2205        // Comprehensive test demonstrating SQL three-valued logic for IN expressions
2206        // This test explicitly shows all possible outcomes: true, false, and null
2207        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2208
2209        // Test data: [1, 2, 3, null]
2210        // - 1 will match in both lists
2211        // - 2 will not match in either list
2212        // - 3 will not match in either list
2213        // - null is always null
2214        let a = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
2215        let col_a = col("a", &schema)?;
2216        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2217
2218        // Case 1: List WITHOUT null - demonstrates true/false/null outcomes
2219        // "a IN (1, 4)" - 1 matches, 2 and 3 don't match, null is null
2220        let list = vec![lit(1i64), lit(4i64)];
2221        in_list!(
2222            batch,
2223            list,
2224            &false,
2225            vec![
2226                Some(true),  // 1 is in the list → true
2227                Some(false), // 2 is not in the list → false
2228                Some(false), // 3 is not in the list → false
2229                None,        // null IN (...) → null (SQL three-valued logic)
2230            ],
2231            Arc::clone(&col_a),
2232            &schema
2233        );
2234
2235        // Case 2: List WITH null - demonstrates null propagation for non-matches
2236        // "a IN (1, NULL)" - 1 matches (true), 2/3 don't match but list has null (null), null is null
2237        let list = vec![lit(1i64), lit(ScalarValue::Int64(None))];
2238        in_list!(
2239            batch,
2240            list,
2241            &false,
2242            vec![
2243                Some(true), // 1 is in the list → true (found match)
2244                None, // 2 is not in list, but list has NULL → null (might match NULL)
2245                None, // 3 is not in list, but list has NULL → null (might match NULL)
2246                None, // null IN (...) → null (SQL three-valued logic)
2247            ],
2248            Arc::clone(&col_a),
2249            &schema
2250        );
2251
2252        Ok(())
2253    }
2254
2255    #[test]
2256    fn test_in_list_with_only_nulls() -> Result<()> {
2257        // Edge case: IN list contains ONLY null values
2258        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2259        let a = Int64Array::from(vec![Some(1), Some(2), None]);
2260        let col_a = col("a", &schema)?;
2261        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2262
2263        // "a IN (NULL, NULL)" - list has only nulls
2264        let list = vec![lit(ScalarValue::Int64(None)), lit(ScalarValue::Int64(None))];
2265
2266        // All results should be NULL because:
2267        // - Non-null values (1, 2) can't match anything concrete, but list might contain matching value
2268        // - NULL value is always NULL in IN expressions
2269        in_list!(
2270            batch,
2271            list.clone(),
2272            &false,
2273            vec![None, None, None],
2274            Arc::clone(&col_a),
2275            &schema
2276        );
2277
2278        // "a NOT IN (NULL, NULL)" - list has only nulls
2279        // All results should still be NULL due to three-valued logic
2280        in_list!(
2281            batch,
2282            list,
2283            &true,
2284            vec![None, None, None],
2285            Arc::clone(&col_a),
2286            &schema
2287        );
2288
2289        Ok(())
2290    }
2291
2292    #[test]
2293    fn test_in_list_multiple_nulls_deduplication() -> Result<()> {
2294        // Test that multiple NULLs in the list are handled correctly
2295        // This verifies deduplication doesn't break null handling
2296        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2297        let col_a = col("a", &schema)?;
2298
2299        // Create array with multiple nulls: [1, 2, NULL, NULL, 3, NULL]
2300        let array = Arc::new(Int64Array::from(vec![
2301            Some(1),
2302            Some(2),
2303            None,
2304            None,
2305            Some(3),
2306            None,
2307        ])) as ArrayRef;
2308
2309        // Create InListExpr from array
2310        let expr = Arc::new(InListExpr::try_new_from_array(
2311            Arc::clone(&col_a),
2312            array,
2313            false,
2314        )?) as Arc<dyn PhysicalExpr>;
2315
2316        // Create test data: [1, 2, 3, 4, null]
2317        let a = Int64Array::from(vec![Some(1), Some(2), Some(3), Some(4), None]);
2318        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2319
2320        // Evaluate the expression
2321        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2322        let result = as_boolean_array(&result);
2323
2324        // Expected behavior with multiple NULLs in list:
2325        // - Values in the list (1,2,3) → true
2326        // - Values not in the list (4) → NULL (because list contains NULL)
2327        // - NULL input → NULL
2328        let expected = BooleanArray::from(vec![
2329            Some(true), // 1 is in list
2330            Some(true), // 2 is in list
2331            Some(true), // 3 is in list
2332            None,       // 4 not in list, but list has NULLs
2333            None,       // NULL input
2334        ]);
2335        assert_eq!(result, &expected);
2336
2337        Ok(())
2338    }
2339
2340    #[test]
2341    fn test_not_in_null_handling_comprehensive() -> Result<()> {
2342        // Comprehensive test demonstrating SQL three-valued logic for NOT IN expressions
2343        // This test explicitly shows all possible outcomes for NOT IN: true, false, and null
2344        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2345
2346        // Test data: [1, 2, 3, null]
2347        let a = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
2348        let col_a = col("a", &schema)?;
2349        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2350
2351        // Case 1: List WITHOUT null - demonstrates true/false/null outcomes for NOT IN
2352        // "a NOT IN (1, 4)" - 1 matches (false), 2 and 3 don't match (true), null is null
2353        let list = vec![lit(1i64), lit(4i64)];
2354        in_list!(
2355            batch,
2356            list,
2357            &true,
2358            vec![
2359                Some(false), // 1 is in the list → NOT IN returns false
2360                Some(true),  // 2 is not in the list → NOT IN returns true
2361                Some(true),  // 3 is not in the list → NOT IN returns true
2362                None,        // null NOT IN (...) → null (SQL three-valued logic)
2363            ],
2364            Arc::clone(&col_a),
2365            &schema
2366        );
2367
2368        // Case 2: List WITH null - demonstrates null propagation for NOT IN
2369        // "a NOT IN (1, NULL)" - 1 matches (false), 2/3 don't match but list has null (null), null is null
2370        let list = vec![lit(1i64), lit(ScalarValue::Int64(None))];
2371        in_list!(
2372            batch,
2373            list,
2374            &true,
2375            vec![
2376                Some(false), // 1 is in the list → NOT IN returns false
2377                None, // 2 is not in known values, but list has NULL → null (can't prove it's not in list)
2378                None, // 3 is not in known values, but list has NULL → null (can't prove it's not in list)
2379                None, // null NOT IN (...) → null (SQL three-valued logic)
2380            ],
2381            Arc::clone(&col_a),
2382            &schema
2383        );
2384
2385        Ok(())
2386    }
2387
2388    #[test]
2389    fn test_in_list_null_type_column() -> Result<()> {
2390        // Test with a column that has DataType::Null (not just nullable values)
2391        // All values in a NullArray are null by definition
2392        let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
2393        let a = NullArray::new(3);
2394        let col_a = col("a", &schema)?;
2395        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2396
2397        // "null_column IN (1, 2)" - comparing Null type against Int64 list
2398        // Note: This tests type coercion behavior between Null and Int64
2399        let list = vec![lit(1i64), lit(2i64)];
2400
2401        // All results should be NULL because:
2402        // - Every value in the column is null (DataType::Null)
2403        // - null IN (anything) always returns null per SQL three-valued logic
2404        in_list!(
2405            batch,
2406            list.clone(),
2407            &false,
2408            vec![None, None, None],
2409            Arc::clone(&col_a),
2410            &schema
2411        );
2412
2413        // "null_column NOT IN (1, 2)"
2414        // Same behavior for NOT IN - null NOT IN (anything) is still null
2415        in_list!(
2416            batch,
2417            list,
2418            &true,
2419            vec![None, None, None],
2420            Arc::clone(&col_a),
2421            &schema
2422        );
2423
2424        Ok(())
2425    }
2426
2427    #[test]
2428    fn test_in_list_null_type_list() -> Result<()> {
2429        // Test with a list that has DataType::Null
2430        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2431        let a = Int64Array::from(vec![Some(1), Some(2), None]);
2432        let col_a = col("a", &schema)?;
2433
2434        // Create a NullArray as the list
2435        let null_array = Arc::new(NullArray::new(2)) as ArrayRef;
2436
2437        // Try to create InListExpr with a NullArray list
2438        // This tests whether try_new_from_array can handle Null type arrays
2439        let expr = Arc::new(InListExpr::try_new_from_array(
2440            Arc::clone(&col_a),
2441            null_array,
2442            false,
2443        )?) as Arc<dyn PhysicalExpr>;
2444        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2445        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2446        let result = as_boolean_array(&result);
2447
2448        // If it succeeds, all results should be NULL
2449        // because the list contains only null type values
2450        let expected = BooleanArray::from(vec![None, None, None]);
2451        assert_eq!(result, &expected);
2452
2453        Ok(())
2454    }
2455
2456    #[test]
2457    fn test_in_list_null_type_both() -> Result<()> {
2458        // Test when both column and list are DataType::Null
2459        let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
2460        let a = NullArray::new(3);
2461        let col_a = col("a", &schema)?;
2462
2463        // Create a NullArray as the list
2464        let null_array = Arc::new(NullArray::new(2)) as ArrayRef;
2465
2466        // Try to create InListExpr with both Null types
2467        let expr = Arc::new(InListExpr::try_new_from_array(
2468            Arc::clone(&col_a),
2469            null_array,
2470            false,
2471        )?) as Arc<dyn PhysicalExpr>;
2472
2473        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2474        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2475        let result = as_boolean_array(&result);
2476
2477        // If successful, all results should be NULL
2478        // null IN [null, null] -> null
2479        let expected = BooleanArray::from(vec![None, None, None]);
2480        assert_eq!(result, &expected);
2481
2482        Ok(())
2483    }
2484
2485    #[test]
2486    fn test_in_list_comprehensive_null_handling() -> Result<()> {
2487        // Comprehensive test for IN LIST operations with various NULL handling scenarios.
2488        // This test covers the key cases validated against DuckDB as the source of truth.
2489        //
2490        // Note: Some scalar literal tests (like NULL IN (1, 2)) are omitted as they
2491        // appear to expose an issue with static filter optimization. These are covered
2492        // by existing tests like in_list_no_cols().
2493
2494        let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
2495        let col_b = col("b", &schema)?;
2496        let null_i32 = ScalarValue::Int32(None);
2497
2498        // Helper to create a batch
2499        let make_batch = |values: Vec<Option<i32>>| -> Result<RecordBatch> {
2500            let array = Arc::new(Int32Array::from(values));
2501            Ok(RecordBatch::try_new(Arc::clone(&schema), vec![array])?)
2502        };
2503
2504        // Helper to run a test
2505        let run_test = |batch: &RecordBatch,
2506                        expr: Arc<dyn PhysicalExpr>,
2507                        list: Vec<Arc<dyn PhysicalExpr>>,
2508                        expected: Vec<Option<bool>>|
2509         -> Result<()> {
2510            let in_expr = in_list(expr, list, &false, schema.as_ref())?;
2511            let result = in_expr.evaluate(batch)?.into_array(batch.num_rows())?;
2512            let result = as_boolean_array(&result);
2513            assert_eq!(result, &BooleanArray::from(expected));
2514            Ok(())
2515        };
2516
2517        // ========================================================================
2518        // COLUMN TESTS - col(b) IN [1, 2]
2519        // ========================================================================
2520
2521        // [1] IN (1, 2) => [TRUE]
2522        let batch = make_batch(vec![Some(1)])?;
2523        run_test(
2524            &batch,
2525            Arc::clone(&col_b),
2526            vec![lit(1i32), lit(2i32)],
2527            vec![Some(true)],
2528        )?;
2529
2530        // [1, 2] IN (1, 2) => [TRUE, TRUE]
2531        let batch = make_batch(vec![Some(1), Some(2)])?;
2532        run_test(
2533            &batch,
2534            Arc::clone(&col_b),
2535            vec![lit(1i32), lit(2i32)],
2536            vec![Some(true), Some(true)],
2537        )?;
2538
2539        // [3, 4] IN (1, 2) => [FALSE, FALSE]
2540        let batch = make_batch(vec![Some(3), Some(4)])?;
2541        run_test(
2542            &batch,
2543            Arc::clone(&col_b),
2544            vec![lit(1i32), lit(2i32)],
2545            vec![Some(false), Some(false)],
2546        )?;
2547
2548        // [1, NULL] IN (1, 2) => [TRUE, NULL]
2549        let batch = make_batch(vec![Some(1), None])?;
2550        run_test(
2551            &batch,
2552            Arc::clone(&col_b),
2553            vec![lit(1i32), lit(2i32)],
2554            vec![Some(true), None],
2555        )?;
2556
2557        // [3, NULL] IN (1, 2) => [FALSE, NULL] (no match, NULL is NULL)
2558        let batch = make_batch(vec![Some(3), None])?;
2559        run_test(
2560            &batch,
2561            Arc::clone(&col_b),
2562            vec![lit(1i32), lit(2i32)],
2563            vec![Some(false), None],
2564        )?;
2565
2566        // ========================================================================
2567        // COLUMN WITH NULL IN LIST - col(b) IN [NULL, 1]
2568        // ========================================================================
2569
2570        // [1] IN (NULL, 1) => [TRUE] (found match)
2571        let batch = make_batch(vec![Some(1)])?;
2572        run_test(
2573            &batch,
2574            Arc::clone(&col_b),
2575            vec![lit(null_i32.clone()), lit(1i32)],
2576            vec![Some(true)],
2577        )?;
2578
2579        // [2] IN (NULL, 1) => [NULL] (no match, but list has NULL)
2580        let batch = make_batch(vec![Some(2)])?;
2581        run_test(
2582            &batch,
2583            Arc::clone(&col_b),
2584            vec![lit(null_i32.clone()), lit(1i32)],
2585            vec![None],
2586        )?;
2587
2588        // [NULL] IN (NULL, 1) => [NULL]
2589        let batch = make_batch(vec![None])?;
2590        run_test(
2591            &batch,
2592            Arc::clone(&col_b),
2593            vec![lit(null_i32.clone()), lit(1i32)],
2594            vec![None],
2595        )?;
2596
2597        // ========================================================================
2598        // COLUMN WITH ALL NULLS IN LIST - col(b) IN [NULL, NULL]
2599        // ========================================================================
2600
2601        // [1] IN (NULL, NULL) => [NULL]
2602        let batch = make_batch(vec![Some(1)])?;
2603        run_test(
2604            &batch,
2605            Arc::clone(&col_b),
2606            vec![lit(null_i32.clone()), lit(null_i32.clone())],
2607            vec![None],
2608        )?;
2609
2610        // [NULL] IN (NULL, NULL) => [NULL]
2611        let batch = make_batch(vec![None])?;
2612        run_test(
2613            &batch,
2614            Arc::clone(&col_b),
2615            vec![lit(null_i32.clone()), lit(null_i32.clone())],
2616            vec![None],
2617        )?;
2618
2619        // ========================================================================
2620        // LITERAL IN LIST WITH COLUMN - lit(1) IN [2, col(b)]
2621        // ========================================================================
2622
2623        // 1 IN (2, [1]) => [TRUE] (matches column value)
2624        let batch = make_batch(vec![Some(1)])?;
2625        run_test(
2626            &batch,
2627            lit(1i32),
2628            vec![lit(2i32), Arc::clone(&col_b)],
2629            vec![Some(true)],
2630        )?;
2631
2632        // 1 IN (2, [3]) => [FALSE] (no match)
2633        let batch = make_batch(vec![Some(3)])?;
2634        run_test(
2635            &batch,
2636            lit(1i32),
2637            vec![lit(2i32), Arc::clone(&col_b)],
2638            vec![Some(false)],
2639        )?;
2640
2641        // 1 IN (2, [NULL]) => [NULL] (no match, column is NULL)
2642        let batch = make_batch(vec![None])?;
2643        run_test(
2644            &batch,
2645            lit(1i32),
2646            vec![lit(2i32), Arc::clone(&col_b)],
2647            vec![None],
2648        )?;
2649
2650        // ========================================================================
2651        // COLUMN IN LIST CONTAINING ITSELF - col(b) IN [1, col(b)]
2652        // ========================================================================
2653
2654        // [1] IN (1, [1]) => [TRUE] (always matches - either list literal or itself)
2655        let batch = make_batch(vec![Some(1)])?;
2656        run_test(
2657            &batch,
2658            Arc::clone(&col_b),
2659            vec![lit(1i32), Arc::clone(&col_b)],
2660            vec![Some(true)],
2661        )?;
2662
2663        // [2] IN (1, [2]) => [TRUE] (matches itself)
2664        let batch = make_batch(vec![Some(2)])?;
2665        run_test(
2666            &batch,
2667            Arc::clone(&col_b),
2668            vec![lit(1i32), Arc::clone(&col_b)],
2669            vec![Some(true)],
2670        )?;
2671
2672        // [NULL] IN (1, [NULL]) => [NULL] (NULL is never equal to anything)
2673        let batch = make_batch(vec![None])?;
2674        run_test(
2675            &batch,
2676            Arc::clone(&col_b),
2677            vec![lit(1i32), Arc::clone(&col_b)],
2678            vec![None],
2679        )?;
2680
2681        Ok(())
2682    }
2683
2684    #[test]
2685    fn test_in_list_scalar_literal_cases() -> Result<()> {
2686        // Test scalar literal cases (both NULL and non-NULL) to ensure SQL three-valued
2687        // logic is correctly implemented. This covers the important case where a scalar
2688        // value is tested against a list containing NULL.
2689
2690        let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
2691        let null_i32 = ScalarValue::Int32(None);
2692
2693        // Helper to create a batch
2694        let make_batch = |values: Vec<Option<i32>>| -> Result<RecordBatch> {
2695            let array = Arc::new(Int32Array::from(values));
2696            Ok(RecordBatch::try_new(Arc::clone(&schema), vec![array])?)
2697        };
2698
2699        // Helper to run a test
2700        let run_test = |batch: &RecordBatch,
2701                        expr: Arc<dyn PhysicalExpr>,
2702                        list: Vec<Arc<dyn PhysicalExpr>>,
2703                        negated: bool,
2704                        expected: Vec<Option<bool>>|
2705         -> Result<()> {
2706            let in_expr = in_list(expr, list, &negated, schema.as_ref())?;
2707            let result = in_expr.evaluate(batch)?.into_array(batch.num_rows())?;
2708            let result = as_boolean_array(&result);
2709            let expected_array = BooleanArray::from(expected);
2710            assert_eq!(
2711                result,
2712                &expected_array,
2713                "Expected {:?}, got {:?}",
2714                expected_array,
2715                result.iter().collect::<Vec<_>>()
2716            );
2717            Ok(())
2718        };
2719
2720        let batch = make_batch(vec![Some(1)])?;
2721
2722        // ========================================================================
2723        // NULL LITERAL TESTS
2724        // According to SQL semantics, NULL IN (any_list) should always return NULL
2725        // ========================================================================
2726
2727        // NULL IN (1, 1) => NULL
2728        run_test(
2729            &batch,
2730            lit(null_i32.clone()),
2731            vec![lit(1i32), lit(1i32)],
2732            false,
2733            vec![None],
2734        )?;
2735
2736        // NULL IN (NULL, 1) => NULL
2737        run_test(
2738            &batch,
2739            lit(null_i32.clone()),
2740            vec![lit(null_i32.clone()), lit(1i32)],
2741            false,
2742            vec![None],
2743        )?;
2744
2745        // NULL IN (NULL, NULL) => NULL
2746        run_test(
2747            &batch,
2748            lit(null_i32.clone()),
2749            vec![lit(null_i32.clone()), lit(null_i32.clone())],
2750            false,
2751            vec![None],
2752        )?;
2753
2754        // ========================================================================
2755        // NON-NULL SCALAR LITERALS WITH NULL IN LIST - Int32
2756        // When a scalar value is NOT in a list containing NULL, the result is NULL
2757        // When a scalar value IS in the list, the result is TRUE (NULL doesn't matter)
2758        // ========================================================================
2759
2760        // 3 IN (0, 1, 2, NULL) => NULL (not in list, but list has NULL)
2761        run_test(
2762            &batch,
2763            lit(3i32),
2764            vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
2765            false,
2766            vec![None],
2767        )?;
2768
2769        // 3 NOT IN (0, 1, 2, NULL) => NULL (not in list, but list has NULL)
2770        run_test(
2771            &batch,
2772            lit(3i32),
2773            vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
2774            true,
2775            vec![None],
2776        )?;
2777
2778        // 1 IN (0, 1, 2, NULL) => TRUE (found match, NULL doesn't matter)
2779        run_test(
2780            &batch,
2781            lit(1i32),
2782            vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
2783            false,
2784            vec![Some(true)],
2785        )?;
2786
2787        // 1 NOT IN (0, 1, 2, NULL) => FALSE (found match, NULL doesn't matter)
2788        run_test(
2789            &batch,
2790            lit(1i32),
2791            vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
2792            true,
2793            vec![Some(false)],
2794        )?;
2795
2796        // ========================================================================
2797        // NON-NULL SCALAR LITERALS WITH NULL IN LIST - String
2798        // Same semantics as Int32 but with string type
2799        // ========================================================================
2800
2801        let schema_str =
2802            Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
2803        let batch_str = RecordBatch::try_new(
2804            Arc::clone(&schema_str),
2805            vec![Arc::new(StringArray::from(vec![Some("dummy")]))],
2806        )?;
2807        let null_str = ScalarValue::Utf8(None);
2808
2809        let run_test_str = |expr: Arc<dyn PhysicalExpr>,
2810                            list: Vec<Arc<dyn PhysicalExpr>>,
2811                            negated: bool,
2812                            expected: Vec<Option<bool>>|
2813         -> Result<()> {
2814            let in_expr = in_list(expr, list, &negated, schema_str.as_ref())?;
2815            let result = in_expr
2816                .evaluate(&batch_str)?
2817                .into_array(batch_str.num_rows())?;
2818            let result = as_boolean_array(&result);
2819            let expected_array = BooleanArray::from(expected);
2820            assert_eq!(
2821                result,
2822                &expected_array,
2823                "Expected {:?}, got {:?}",
2824                expected_array,
2825                result.iter().collect::<Vec<_>>()
2826            );
2827            Ok(())
2828        };
2829
2830        // 'c' IN ('a', 'b', NULL) => NULL (not in list, but list has NULL)
2831        run_test_str(
2832            lit("c"),
2833            vec![lit("a"), lit("b"), lit(null_str.clone())],
2834            false,
2835            vec![None],
2836        )?;
2837
2838        // 'c' NOT IN ('a', 'b', NULL) => NULL (not in list, but list has NULL)
2839        run_test_str(
2840            lit("c"),
2841            vec![lit("a"), lit("b"), lit(null_str.clone())],
2842            true,
2843            vec![None],
2844        )?;
2845
2846        // 'a' IN ('a', 'b', NULL) => TRUE (found match, NULL doesn't matter)
2847        run_test_str(
2848            lit("a"),
2849            vec![lit("a"), lit("b"), lit(null_str.clone())],
2850            false,
2851            vec![Some(true)],
2852        )?;
2853
2854        // 'a' NOT IN ('a', 'b', NULL) => FALSE (found match, NULL doesn't matter)
2855        run_test_str(
2856            lit("a"),
2857            vec![lit("a"), lit("b"), lit(null_str.clone())],
2858            true,
2859            vec![Some(false)],
2860        )?;
2861
2862        Ok(())
2863    }
2864
2865    #[test]
2866    fn test_in_list_tuple_cases() -> Result<()> {
2867        // Test tuple/struct cases from the original request: (lit, lit) IN (lit, lit)
2868        // These test row-wise comparisons like (1, 2) IN ((1, 2), (3, 4))
2869
2870        let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
2871
2872        // Helper to create struct scalars for tuple comparisons
2873        let make_struct = |v1: Option<i32>, v2: Option<i32>| -> ScalarValue {
2874            let fields = Fields::from(vec![
2875                Field::new("field_0", DataType::Int32, true),
2876                Field::new("field_1", DataType::Int32, true),
2877            ]);
2878            ScalarValue::Struct(Arc::new(StructArray::new(
2879                fields,
2880                vec![
2881                    Arc::new(Int32Array::from(vec![v1])),
2882                    Arc::new(Int32Array::from(vec![v2])),
2883                ],
2884                None,
2885            )))
2886        };
2887
2888        // Need a single row batch for scalar tests
2889        let batch = RecordBatch::try_new(
2890            Arc::clone(&schema),
2891            vec![Arc::new(Int32Array::from(vec![Some(1)]))],
2892        )?;
2893
2894        // Helper to run tuple tests
2895        let run_tuple_test = |lhs: ScalarValue,
2896                              list: Vec<ScalarValue>,
2897                              expected: Vec<Option<bool>>|
2898         -> Result<()> {
2899            let expr = in_list(
2900                lit(lhs),
2901                list.into_iter().map(lit).collect(),
2902                &false,
2903                schema.as_ref(),
2904            )?;
2905            let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2906            let result = as_boolean_array(&result);
2907            assert_eq!(result, &BooleanArray::from(expected));
2908            Ok(())
2909        };
2910
2911        // (NULL, NULL) IN ((1, 2)) => FALSE (tuples don't match)
2912        run_tuple_test(
2913            make_struct(None, None),
2914            vec![make_struct(Some(1), Some(2))],
2915            vec![Some(false)],
2916        )?;
2917
2918        // (NULL, NULL) IN ((NULL, 1)) => FALSE
2919        run_tuple_test(
2920            make_struct(None, None),
2921            vec![make_struct(None, Some(1))],
2922            vec![Some(false)],
2923        )?;
2924
2925        // (NULL, NULL) IN ((NULL, NULL)) => TRUE (exact match including nulls)
2926        run_tuple_test(
2927            make_struct(None, None),
2928            vec![make_struct(None, None)],
2929            vec![Some(true)],
2930        )?;
2931
2932        // (NULL, 1) IN ((1, 2)) => FALSE
2933        run_tuple_test(
2934            make_struct(None, Some(1)),
2935            vec![make_struct(Some(1), Some(2))],
2936            vec![Some(false)],
2937        )?;
2938
2939        // (NULL, 1) IN ((NULL, 1)) => TRUE (exact match)
2940        run_tuple_test(
2941            make_struct(None, Some(1)),
2942            vec![make_struct(None, Some(1))],
2943            vec![Some(true)],
2944        )?;
2945
2946        // (NULL, 1) IN ((NULL, NULL)) => FALSE
2947        run_tuple_test(
2948            make_struct(None, Some(1)),
2949            vec![make_struct(None, None)],
2950            vec![Some(false)],
2951        )?;
2952
2953        // (1, 2) IN ((1, 2)) => TRUE
2954        run_tuple_test(
2955            make_struct(Some(1), Some(2)),
2956            vec![make_struct(Some(1), Some(2))],
2957            vec![Some(true)],
2958        )?;
2959
2960        // (1, 3) IN ((1, 2)) => FALSE
2961        run_tuple_test(
2962            make_struct(Some(1), Some(3)),
2963            vec![make_struct(Some(1), Some(2))],
2964            vec![Some(false)],
2965        )?;
2966
2967        // (4, 4) IN ((1, 2)) => FALSE
2968        run_tuple_test(
2969            make_struct(Some(4), Some(4)),
2970            vec![make_struct(Some(1), Some(2))],
2971            vec![Some(false)],
2972        )?;
2973
2974        // (1, 1) IN ((NULL, 1)) => FALSE
2975        run_tuple_test(
2976            make_struct(Some(1), Some(1)),
2977            vec![make_struct(None, Some(1))],
2978            vec![Some(false)],
2979        )?;
2980
2981        // (1, 1) IN ((NULL, NULL)) => FALSE
2982        run_tuple_test(
2983            make_struct(Some(1), Some(1)),
2984            vec![make_struct(None, None)],
2985            vec![Some(false)],
2986        )?;
2987
2988        Ok(())
2989    }
2990
2991    #[test]
2992    fn test_in_list_dictionary_int32() -> Result<()> {
2993        // Create schema with dictionary-encoded Int32 column
2994        let dict_type =
2995            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
2996        let schema = Schema::new(vec![Field::new("a", dict_type.clone(), false)]);
2997        let col_a = col("a", &schema)?;
2998
2999        // Create IN list with Int32 literals: (100, 200, 300)
3000        let list = vec![lit(100i32), lit(200i32), lit(300i32)];
3001
3002        // Create InListExpr via in_list() - this uses Int32StaticFilter for Int32 lists
3003        let expr = in_list(col_a, list, &false, &schema)?;
3004
3005        // Create dictionary-encoded batch with values [100, 200, 500]
3006        // Dictionary: keys [0, 1, 2] -> values [100, 200, 500]
3007        // Using values clearly distinct from keys to avoid confusion
3008        let keys = Int8Array::from(vec![0, 1, 2]);
3009        let values = Int32Array::from(vec![100, 200, 500]);
3010        let dict_array: ArrayRef =
3011            Arc::new(DictionaryArray::try_new(keys, Arc::new(values))?);
3012        let batch = RecordBatch::try_new(Arc::new(schema), vec![dict_array])?;
3013
3014        // Expected: [100 IN (100,200,300), 200 IN (100,200,300), 500 IN (100,200,300)] = [true, true, false]
3015        let result = expr.evaluate(&batch)?.into_array(3)?;
3016        let result = as_boolean_array(&result);
3017        assert_eq!(result, &BooleanArray::from(vec![true, true, false]));
3018        Ok(())
3019    }
3020
3021    #[test]
3022    fn test_in_list_dictionary_types() -> Result<()> {
3023        // Helper functions for creating dictionary literals
3024        fn dict_lit_int64(key_type: DataType, value: i64) -> Arc<dyn PhysicalExpr> {
3025            lit(ScalarValue::Dictionary(
3026                Box::new(key_type),
3027                Box::new(ScalarValue::Int64(Some(value))),
3028            ))
3029        }
3030
3031        fn dict_lit_float64(key_type: DataType, value: f64) -> Arc<dyn PhysicalExpr> {
3032            lit(ScalarValue::Dictionary(
3033                Box::new(key_type),
3034                Box::new(ScalarValue::Float64(Some(value))),
3035            ))
3036        }
3037
3038        // Test case structures
3039        struct DictNeedleTest {
3040            list_values: Vec<Arc<dyn PhysicalExpr>>,
3041            expected: Vec<Option<bool>>,
3042        }
3043
3044        struct DictionaryInListTestCase {
3045            name: &'static str,
3046            dict_type: DataType,
3047            dict_keys: Vec<Option<i8>>,
3048            dict_values: ArrayRef,
3049            list_values_no_null: Vec<Arc<dyn PhysicalExpr>>,
3050            list_values_with_null: Vec<Arc<dyn PhysicalExpr>>,
3051            expected_1: Vec<Option<bool>>,
3052            expected_2: Vec<Option<bool>>,
3053            expected_3: Vec<Option<bool>>,
3054            expected_4: Vec<Option<bool>>,
3055            dict_needle_test: Option<DictNeedleTest>,
3056        }
3057
3058        // Test harness function
3059        fn run_dictionary_in_list_test(
3060            test_case: DictionaryInListTestCase,
3061        ) -> Result<()> {
3062            // Create schema with dictionary type
3063            let schema =
3064                Schema::new(vec![Field::new("a", test_case.dict_type.clone(), true)]);
3065            let col_a = col("a", &schema)?;
3066
3067            // Create dictionary array from keys and values
3068            let keys = Int8Array::from(test_case.dict_keys.clone());
3069            let dict_array: ArrayRef =
3070                Arc::new(DictionaryArray::try_new(keys, test_case.dict_values)?);
3071            let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![dict_array])?;
3072
3073            let exp1 = test_case.expected_1.clone();
3074            let exp2 = test_case.expected_2.clone();
3075            let exp3 = test_case.expected_3.clone();
3076            let exp4 = test_case.expected_4;
3077
3078            // Test 1: a IN (values_no_null)
3079            in_list!(
3080                batch,
3081                test_case.list_values_no_null.clone(),
3082                &false,
3083                exp1,
3084                Arc::clone(&col_a),
3085                &schema
3086            );
3087
3088            // Test 2: a NOT IN (values_no_null)
3089            in_list!(
3090                batch,
3091                test_case.list_values_no_null.clone(),
3092                &true,
3093                exp2,
3094                Arc::clone(&col_a),
3095                &schema
3096            );
3097
3098            // Test 3: a IN (values_with_null)
3099            in_list!(
3100                batch,
3101                test_case.list_values_with_null.clone(),
3102                &false,
3103                exp3,
3104                Arc::clone(&col_a),
3105                &schema
3106            );
3107
3108            // Test 4: a NOT IN (values_with_null)
3109            in_list!(
3110                batch,
3111                test_case.list_values_with_null,
3112                &true,
3113                exp4,
3114                Arc::clone(&col_a),
3115                &schema
3116            );
3117
3118            // Optional: Dictionary needle test (if provided)
3119            if let Some(needle_test) = test_case.dict_needle_test {
3120                in_list_raw!(
3121                    batch,
3122                    needle_test.list_values,
3123                    &false,
3124                    needle_test.expected,
3125                    Arc::clone(&col_a),
3126                    &schema
3127                );
3128            }
3129
3130            Ok(())
3131        }
3132
3133        // Test case 1: UTF8
3134        // Dictionary: keys [0, 1, null] → values ["a", "d", -]
3135        // Rows: ["a", "d", null]
3136        let utf8_case = DictionaryInListTestCase {
3137            name: "dictionary_utf8",
3138            dict_type: DataType::Dictionary(
3139                Box::new(DataType::Int8),
3140                Box::new(DataType::Utf8),
3141            ),
3142            dict_keys: vec![Some(0), Some(1), None],
3143            dict_values: Arc::new(StringArray::from(vec![Some("a"), Some("d")])),
3144            list_values_no_null: vec![lit("a"), lit("b")],
3145            list_values_with_null: vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
3146            expected_1: vec![Some(true), Some(false), None],
3147            expected_2: vec![Some(false), Some(true), None],
3148            expected_3: vec![Some(true), None, None],
3149            expected_4: vec![Some(false), None, None],
3150            dict_needle_test: None,
3151        };
3152
3153        // Test case 2: Int64 with dictionary needles
3154        // Dictionary: keys [0, 1, null] → values [10, 20, -]
3155        // Rows: [10, 20, null]
3156        let int64_case = DictionaryInListTestCase {
3157            name: "dictionary_int64",
3158            dict_type: DataType::Dictionary(
3159                Box::new(DataType::Int8),
3160                Box::new(DataType::Int64),
3161            ),
3162            dict_keys: vec![Some(0), Some(1), None],
3163            dict_values: Arc::new(Int64Array::from(vec![Some(10), Some(20)])),
3164            list_values_no_null: vec![lit(10i64), lit(15i64)],
3165            list_values_with_null: vec![
3166                lit(10i64),
3167                lit(15i64),
3168                lit(ScalarValue::Int64(None)),
3169            ],
3170            expected_1: vec![Some(true), Some(false), None],
3171            expected_2: vec![Some(false), Some(true), None],
3172            expected_3: vec![Some(true), None, None],
3173            expected_4: vec![Some(false), None, None],
3174            dict_needle_test: Some(DictNeedleTest {
3175                list_values: vec![
3176                    dict_lit_int64(DataType::Int16, 10),
3177                    dict_lit_int64(DataType::Int16, 15),
3178                ],
3179                expected: vec![Some(true), Some(false), None],
3180            }),
3181        };
3182
3183        // Test case 3: Float64 with NaN and dictionary needles
3184        // Dictionary: keys [0, 1, null, 2] → values [1.5, 3.7, NaN, -]
3185        // Rows: [1.5, 3.7, null, NaN]
3186        // Note: NaN is a value (not null), so it goes in the values array
3187        let float64_case = DictionaryInListTestCase {
3188            name: "dictionary_float64",
3189            dict_type: DataType::Dictionary(
3190                Box::new(DataType::Int8),
3191                Box::new(DataType::Float64),
3192            ),
3193            dict_keys: vec![Some(0), Some(1), None, Some(2)],
3194            dict_values: Arc::new(Float64Array::from(vec![
3195                Some(1.5),      // index 0
3196                Some(3.7),      // index 1
3197                Some(f64::NAN), // index 2
3198            ])),
3199            list_values_no_null: vec![lit(1.5f64), lit(2.0f64)],
3200            list_values_with_null: vec![
3201                lit(1.5f64),
3202                lit(2.0f64),
3203                lit(ScalarValue::Float64(None)),
3204            ],
3205            // Test 1: a IN (1.5, 2.0) → [true, false, null, false]
3206            // NaN is false because NaN not in list and no NULL in list
3207            expected_1: vec![Some(true), Some(false), None, Some(false)],
3208            // Test 2: a NOT IN (1.5, 2.0) → [false, true, null, true]
3209            // NaN is true because NaN not in list
3210            expected_2: vec![Some(false), Some(true), None, Some(true)],
3211            // Test 3: a IN (1.5, 2.0, NULL) → [true, null, null, null]
3212            // 3.7 and NaN become null due to NULL in list (three-valued logic)
3213            expected_3: vec![Some(true), None, None, None],
3214            // Test 4: a NOT IN (1.5, 2.0, NULL) → [false, null, null, null]
3215            // 3.7 and NaN become null due to NULL in list
3216            expected_4: vec![Some(false), None, None, None],
3217            dict_needle_test: Some(DictNeedleTest {
3218                list_values: vec![
3219                    dict_lit_float64(DataType::UInt16, 1.5),
3220                    dict_lit_float64(DataType::UInt16, 2.0),
3221                ],
3222                expected: vec![Some(true), Some(false), None, Some(false)],
3223            }),
3224        };
3225
3226        // Execute all test cases
3227        let test_name = utf8_case.name;
3228        run_dictionary_in_list_test(utf8_case).map_err(|e| {
3229            datafusion_common::DataFusionError::Execution(format!(
3230                "Dictionary test '{test_name}' failed: {e}"
3231            ))
3232        })?;
3233
3234        let test_name = int64_case.name;
3235        run_dictionary_in_list_test(int64_case).map_err(|e| {
3236            datafusion_common::DataFusionError::Execution(format!(
3237                "Dictionary test '{test_name}' failed: {e}"
3238            ))
3239        })?;
3240
3241        let test_name = float64_case.name;
3242        run_dictionary_in_list_test(float64_case).map_err(|e| {
3243            datafusion_common::DataFusionError::Execution(format!(
3244                "Dictionary test '{test_name}' failed: {e}"
3245            ))
3246        })?;
3247
3248        // Additional test: Dictionary deduplication with repeated keys
3249        // This tests that multiple rows with the same key (pointing to the same value)
3250        // are evaluated correctly
3251        let dedup_case = DictionaryInListTestCase {
3252            name: "dictionary_deduplication",
3253            dict_type: DataType::Dictionary(
3254                Box::new(DataType::Int8),
3255                Box::new(DataType::Utf8),
3256            ),
3257            // Keys: [0, 1, 0, 1, null] - keys 0 and 1 are repeated
3258            // This creates data: ["a", "d", "a", "d", null]
3259            dict_keys: vec![Some(0), Some(1), Some(0), Some(1), None],
3260            dict_values: Arc::new(StringArray::from(vec![Some("a"), Some("d")])),
3261            list_values_no_null: vec![lit("a"), lit("b")],
3262            list_values_with_null: vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
3263            // Test 1: a IN ("a", "b") → [true, false, true, false, null]
3264            // Rows 0 and 2 both have key 0 → "a", so both are true
3265            expected_1: vec![Some(true), Some(false), Some(true), Some(false), None],
3266            // Test 2: a NOT IN ("a", "b") → [false, true, false, true, null]
3267            expected_2: vec![Some(false), Some(true), Some(false), Some(true), None],
3268            // Test 3: a IN ("a", "b", NULL) → [true, null, true, null, null]
3269            // "d" becomes null due to NULL in list
3270            expected_3: vec![Some(true), None, Some(true), None, None],
3271            // Test 4: a NOT IN ("a", "b", NULL) → [false, null, false, null, null]
3272            expected_4: vec![Some(false), None, Some(false), None, None],
3273            dict_needle_test: None,
3274        };
3275
3276        let test_name = dedup_case.name;
3277        run_dictionary_in_list_test(dedup_case).map_err(|e| {
3278            datafusion_common::DataFusionError::Execution(format!(
3279                "Dictionary test '{test_name}' failed: {e}"
3280            ))
3281        })?;
3282
3283        // Additional test for Float64 NaN in IN list
3284        let dict_type =
3285            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Float64));
3286        let schema = Schema::new(vec![Field::new("a", dict_type.clone(), true)]);
3287        let col_a = col("a", &schema)?;
3288
3289        let keys = Int8Array::from(vec![Some(0), Some(1), None, Some(2)]);
3290        let values = Float64Array::from(vec![Some(1.5), Some(3.7), Some(f64::NAN)]);
3291        let dict_array: ArrayRef =
3292            Arc::new(DictionaryArray::try_new(keys, Arc::new(values))?);
3293        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![dict_array])?;
3294
3295        // Test: a IN (1.5, 2.0, NaN)
3296        let list_with_nan = vec![lit(1.5f64), lit(2.0f64), lit(f64::NAN)];
3297        in_list!(
3298            batch,
3299            list_with_nan,
3300            &false,
3301            vec![Some(true), Some(false), None, Some(true)],
3302            col_a,
3303            &schema
3304        );
3305
3306        Ok(())
3307    }
3308
3309    #[test]
3310    fn test_in_list_esoteric_types() -> Result<()> {
3311        // Test esoteric/less common types to validate the transform and mapping flow.
3312        // These types are reinterpreted to base primitive types (e.g., Timestamp -> UInt64,
3313        // Interval -> Decimal128, Float16 -> UInt16). We just need to verify basic
3314        // functionality works - no need for comprehensive null handling tests.
3315
3316        // Helper: simple IN test that expects [Some(true), Some(false)]
3317        let test_type = |data_type: DataType,
3318                         in_array: ArrayRef,
3319                         list_values: Vec<ScalarValue>|
3320         -> Result<()> {
3321            let schema = Schema::new(vec![Field::new("a", data_type.clone(), false)]);
3322            let col_a = col("a", &schema)?;
3323            let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![in_array])?;
3324
3325            let list = list_values.into_iter().map(lit).collect();
3326            in_list!(
3327                batch,
3328                list,
3329                &false,
3330                vec![Some(true), Some(false)],
3331                col_a,
3332                &schema
3333            );
3334            Ok(())
3335        };
3336
3337        // Timestamp types (all units map to Int64 -> UInt64)
3338        test_type(
3339            DataType::Timestamp(TimeUnit::Second, None),
3340            Arc::new(TimestampSecondArray::from(vec![Some(1000), Some(2000)])),
3341            vec![
3342                ScalarValue::TimestampSecond(Some(1000), None),
3343                ScalarValue::TimestampSecond(Some(1500), None),
3344            ],
3345        )?;
3346
3347        test_type(
3348            DataType::Timestamp(TimeUnit::Millisecond, None),
3349            Arc::new(TimestampMillisecondArray::from(vec![
3350                Some(1000000),
3351                Some(2000000),
3352            ])),
3353            vec![
3354                ScalarValue::TimestampMillisecond(Some(1000000), None),
3355                ScalarValue::TimestampMillisecond(Some(1500000), None),
3356            ],
3357        )?;
3358
3359        test_type(
3360            DataType::Timestamp(TimeUnit::Microsecond, None),
3361            Arc::new(TimestampMicrosecondArray::from(vec![
3362                Some(1000000000),
3363                Some(2000000000),
3364            ])),
3365            vec![
3366                ScalarValue::TimestampMicrosecond(Some(1000000000), None),
3367                ScalarValue::TimestampMicrosecond(Some(1500000000), None),
3368            ],
3369        )?;
3370
3371        // Time32 and Time64 (map to Int32 -> UInt32 and Int64 -> UInt64 respectively)
3372        test_type(
3373            DataType::Time32(TimeUnit::Second),
3374            Arc::new(Time32SecondArray::from(vec![Some(3600), Some(7200)])),
3375            vec![
3376                ScalarValue::Time32Second(Some(3600)),
3377                ScalarValue::Time32Second(Some(5400)),
3378            ],
3379        )?;
3380
3381        test_type(
3382            DataType::Time32(TimeUnit::Millisecond),
3383            Arc::new(Time32MillisecondArray::from(vec![
3384                Some(3600000),
3385                Some(7200000),
3386            ])),
3387            vec![
3388                ScalarValue::Time32Millisecond(Some(3600000)),
3389                ScalarValue::Time32Millisecond(Some(5400000)),
3390            ],
3391        )?;
3392
3393        test_type(
3394            DataType::Time64(TimeUnit::Microsecond),
3395            Arc::new(Time64MicrosecondArray::from(vec![
3396                Some(3600000000),
3397                Some(7200000000),
3398            ])),
3399            vec![
3400                ScalarValue::Time64Microsecond(Some(3600000000)),
3401                ScalarValue::Time64Microsecond(Some(5400000000)),
3402            ],
3403        )?;
3404
3405        test_type(
3406            DataType::Time64(TimeUnit::Nanosecond),
3407            Arc::new(Time64NanosecondArray::from(vec![
3408                Some(3600000000000),
3409                Some(7200000000000),
3410            ])),
3411            vec![
3412                ScalarValue::Time64Nanosecond(Some(3600000000000)),
3413                ScalarValue::Time64Nanosecond(Some(5400000000000)),
3414            ],
3415        )?;
3416
3417        // Duration types (map to Int64 -> UInt64)
3418        test_type(
3419            DataType::Duration(TimeUnit::Second),
3420            Arc::new(DurationSecondArray::from(vec![Some(86400), Some(172800)])),
3421            vec![
3422                ScalarValue::DurationSecond(Some(86400)),
3423                ScalarValue::DurationSecond(Some(129600)),
3424            ],
3425        )?;
3426
3427        test_type(
3428            DataType::Duration(TimeUnit::Millisecond),
3429            Arc::new(DurationMillisecondArray::from(vec![
3430                Some(86400000),
3431                Some(172800000),
3432            ])),
3433            vec![
3434                ScalarValue::DurationMillisecond(Some(86400000)),
3435                ScalarValue::DurationMillisecond(Some(129600000)),
3436            ],
3437        )?;
3438
3439        test_type(
3440            DataType::Duration(TimeUnit::Microsecond),
3441            Arc::new(DurationMicrosecondArray::from(vec![
3442                Some(86400000000),
3443                Some(172800000000),
3444            ])),
3445            vec![
3446                ScalarValue::DurationMicrosecond(Some(86400000000)),
3447                ScalarValue::DurationMicrosecond(Some(129600000000)),
3448            ],
3449        )?;
3450
3451        test_type(
3452            DataType::Duration(TimeUnit::Nanosecond),
3453            Arc::new(DurationNanosecondArray::from(vec![
3454                Some(86400000000000),
3455                Some(172800000000000),
3456            ])),
3457            vec![
3458                ScalarValue::DurationNanosecond(Some(86400000000000)),
3459                ScalarValue::DurationNanosecond(Some(129600000000000)),
3460            ],
3461        )?;
3462
3463        // Interval types (map to 16-byte Decimal128Type)
3464        test_type(
3465            DataType::Interval(IntervalUnit::YearMonth),
3466            Arc::new(IntervalYearMonthArray::from(vec![Some(12), Some(24)])),
3467            vec![
3468                ScalarValue::IntervalYearMonth(Some(12)),
3469                ScalarValue::IntervalYearMonth(Some(18)),
3470            ],
3471        )?;
3472
3473        test_type(
3474            DataType::Interval(IntervalUnit::DayTime),
3475            Arc::new(IntervalDayTimeArray::from(vec![
3476                Some(IntervalDayTime {
3477                    days: 1,
3478                    milliseconds: 0,
3479                }),
3480                Some(IntervalDayTime {
3481                    days: 2,
3482                    milliseconds: 0,
3483                }),
3484            ])),
3485            vec![
3486                ScalarValue::IntervalDayTime(Some(IntervalDayTime {
3487                    days: 1,
3488                    milliseconds: 0,
3489                })),
3490                ScalarValue::IntervalDayTime(Some(IntervalDayTime {
3491                    days: 1,
3492                    milliseconds: 500,
3493                })),
3494            ],
3495        )?;
3496
3497        test_type(
3498            DataType::Interval(IntervalUnit::MonthDayNano),
3499            Arc::new(IntervalMonthDayNanoArray::from(vec![
3500                Some(IntervalMonthDayNano {
3501                    months: 1,
3502                    days: 0,
3503                    nanoseconds: 0,
3504                }),
3505                Some(IntervalMonthDayNano {
3506                    months: 2,
3507                    days: 0,
3508                    nanoseconds: 0,
3509                }),
3510            ])),
3511            vec![
3512                ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano {
3513                    months: 1,
3514                    days: 0,
3515                    nanoseconds: 0,
3516                })),
3517                ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano {
3518                    months: 1,
3519                    days: 15,
3520                    nanoseconds: 0,
3521                })),
3522            ],
3523        )?;
3524
3525        // Decimal256 (maps to Decimal128Type for 16-byte width)
3526        // Need to use with_precision_and_scale() to set the metadata
3527        let precision = 38;
3528        let scale = 10;
3529        test_type(
3530            DataType::Decimal256(precision, scale),
3531            Arc::new(
3532                Decimal256Array::from(vec![
3533                    Some(i256::from(12345)),
3534                    Some(i256::from(67890)),
3535                ])
3536                .with_precision_and_scale(precision, scale)?,
3537            ),
3538            vec![
3539                ScalarValue::Decimal256(Some(i256::from(12345)), precision, scale),
3540                ScalarValue::Decimal256(Some(i256::from(54321)), precision, scale),
3541            ],
3542        )?;
3543
3544        Ok(())
3545    }
3546
3547    /// Helper: creates an InListExpr with `static_filter = None`
3548    /// to force the column-reference evaluation path.
3549    fn make_in_list_with_columns(
3550        expr: Arc<dyn PhysicalExpr>,
3551        list: Vec<Arc<dyn PhysicalExpr>>,
3552        negated: bool,
3553    ) -> Arc<InListExpr> {
3554        Arc::new(InListExpr::new(expr, list, negated, None))
3555    }
3556
3557    #[test]
3558    fn test_in_list_with_columns_int32_scalars() -> Result<()> {
3559        // Column-reference path with scalar literals (bypassing static filter)
3560        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
3561        let col_a = col("a", &schema)?;
3562        let batch = RecordBatch::try_new(
3563            Arc::new(schema),
3564            vec![Arc::new(Int32Array::from(vec![
3565                Some(1),
3566                Some(2),
3567                Some(3),
3568                None,
3569            ]))],
3570        )?;
3571
3572        let list = vec![
3573            lit(ScalarValue::Int32(Some(1))),
3574            lit(ScalarValue::Int32(Some(3))),
3575        ];
3576        let expr = make_in_list_with_columns(col_a, list, false);
3577
3578        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3579        let result = as_boolean_array(&result);
3580        assert_eq!(
3581            result,
3582            &BooleanArray::from(vec![Some(true), Some(false), Some(true), None,])
3583        );
3584        Ok(())
3585    }
3586
3587    #[test]
3588    fn test_in_list_with_columns_int32_column_refs() -> Result<()> {
3589        // IN list with column references
3590        let schema = Schema::new(vec![
3591            Field::new("a", DataType::Int32, true),
3592            Field::new("b", DataType::Int32, true),
3593            Field::new("c", DataType::Int32, true),
3594        ]);
3595        let batch = RecordBatch::try_new(
3596            Arc::new(schema.clone()),
3597            vec![
3598                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None])),
3599                Arc::new(Int32Array::from(vec![
3600                    Some(1),
3601                    Some(99),
3602                    Some(99),
3603                    Some(99),
3604                ])),
3605                Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3), None])),
3606            ],
3607        )?;
3608
3609        let col_a = col("a", &schema)?;
3610        let list = vec![col("b", &schema)?, col("c", &schema)?];
3611        let expr = make_in_list_with_columns(col_a, list, false);
3612
3613        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3614        let result = as_boolean_array(&result);
3615        // row 0: 1 IN (1, 99) → true
3616        // row 1: 2 IN (99, 99) → false
3617        // row 2: 3 IN (99, 3) → true
3618        // row 3: NULL IN (99, NULL) → NULL
3619        assert_eq!(
3620            result,
3621            &BooleanArray::from(vec![Some(true), Some(false), Some(true), None,])
3622        );
3623        Ok(())
3624    }
3625
3626    #[test]
3627    fn test_in_list_with_columns_utf8_column_refs() -> Result<()> {
3628        // IN list with Utf8 column references
3629        let schema = Schema::new(vec![
3630            Field::new("a", DataType::Utf8, false),
3631            Field::new("b", DataType::Utf8, false),
3632        ]);
3633        let batch = RecordBatch::try_new(
3634            Arc::new(schema.clone()),
3635            vec![
3636                Arc::new(StringArray::from(vec!["x", "y", "z"])),
3637                Arc::new(StringArray::from(vec!["x", "x", "z"])),
3638            ],
3639        )?;
3640
3641        let col_a = col("a", &schema)?;
3642        let list = vec![col("b", &schema)?];
3643        let expr = make_in_list_with_columns(col_a, list, false);
3644
3645        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3646        let result = as_boolean_array(&result);
3647        // row 0: "x" IN ("x") → true
3648        // row 1: "y" IN ("x") → false
3649        // row 2: "z" IN ("z") → true
3650        assert_eq!(result, &BooleanArray::from(vec![true, false, true]));
3651        Ok(())
3652    }
3653
3654    #[test]
3655    fn test_in_list_with_columns_negated() -> Result<()> {
3656        // NOT IN with column references
3657        let schema = Schema::new(vec![
3658            Field::new("a", DataType::Int32, false),
3659            Field::new("b", DataType::Int32, false),
3660        ]);
3661        let batch = RecordBatch::try_new(
3662            Arc::new(schema.clone()),
3663            vec![
3664                Arc::new(Int32Array::from(vec![1, 2, 3])),
3665                Arc::new(Int32Array::from(vec![1, 99, 3])),
3666            ],
3667        )?;
3668
3669        let col_a = col("a", &schema)?;
3670        let list = vec![col("b", &schema)?];
3671        let expr = make_in_list_with_columns(col_a, list, true);
3672
3673        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3674        let result = as_boolean_array(&result);
3675        // row 0: 1 NOT IN (1) → false
3676        // row 1: 2 NOT IN (99) → true
3677        // row 2: 3 NOT IN (3) → false
3678        assert_eq!(result, &BooleanArray::from(vec![false, true, false]));
3679        Ok(())
3680    }
3681
3682    #[test]
3683    fn test_in_list_with_columns_null_in_list() -> Result<()> {
3684        // IN list with NULL scalar (column-reference path)
3685        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3686        let col_a = col("a", &schema)?;
3687        let batch = RecordBatch::try_new(
3688            Arc::new(schema),
3689            vec![Arc::new(Int32Array::from(vec![1, 2]))],
3690        )?;
3691
3692        let list = vec![
3693            lit(ScalarValue::Int32(None)),
3694            lit(ScalarValue::Int32(Some(1))),
3695        ];
3696        let expr = make_in_list_with_columns(col_a, list, false);
3697
3698        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3699        let result = as_boolean_array(&result);
3700        // row 0: 1 IN (NULL, 1) → true (true OR null = true)
3701        // row 1: 2 IN (NULL, 1) → NULL (false OR null = null)
3702        assert_eq!(result, &BooleanArray::from(vec![Some(true), None]));
3703        Ok(())
3704    }
3705
3706    #[test]
3707    fn test_in_list_with_columns_float_nan() -> Result<()> {
3708        // Verify NaN == NaN is true in the column-reference path
3709        // (consistent with Arrow's totalOrder semantics)
3710        let schema = Schema::new(vec![
3711            Field::new("a", DataType::Float64, false),
3712            Field::new("b", DataType::Float64, false),
3713        ]);
3714        let batch = RecordBatch::try_new(
3715            Arc::new(schema.clone()),
3716            vec![
3717                Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN])),
3718                Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0])),
3719            ],
3720        )?;
3721
3722        let col_a = col("a", &schema)?;
3723        let list = vec![col("b", &schema)?];
3724        let expr = make_in_list_with_columns(col_a, list, false);
3725
3726        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3727        let result = as_boolean_array(&result);
3728        // row 0: NaN IN (NaN) → true
3729        // row 1: 1.0 IN (2.0) → false
3730        // row 2: NaN IN (0.0) → false
3731        assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
3732        Ok(())
3733    }
3734    /// Tests that short-circuit evaluation produces correct results.
3735    /// When all rows match after the first list item, remaining items
3736    /// should be skipped without affecting correctness.
3737    #[test]
3738    fn test_in_list_with_columns_short_circuit() -> Result<()> {
3739        // a IN (b, c) where b already matches every row of a
3740        // The short-circuit should skip evaluating c
3741        let schema = Schema::new(vec![
3742            Field::new("a", DataType::Int32, false),
3743            Field::new("b", DataType::Int32, false),
3744            Field::new("c", DataType::Int32, false),
3745        ]);
3746        let batch = RecordBatch::try_new(
3747            Arc::new(schema.clone()),
3748            vec![
3749                Arc::new(Int32Array::from(vec![1, 2, 3])),
3750                Arc::new(Int32Array::from(vec![1, 2, 3])), // b == a for all rows
3751                Arc::new(Int32Array::from(vec![99, 99, 99])),
3752            ],
3753        )?;
3754
3755        let col_a = col("a", &schema)?;
3756        let list = vec![col("b", &schema)?, col("c", &schema)?];
3757        let expr = make_in_list_with_columns(col_a, list, false);
3758
3759        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3760        let result = as_boolean_array(&result);
3761        assert_eq!(result, &BooleanArray::from(vec![true, true, true]));
3762        Ok(())
3763    }
3764
3765    /// Short-circuit must NOT skip when nulls are present (three-valued logic).
3766    /// Even if all non-null values are true, null rows keep the result as null.
3767    #[test]
3768    fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
3769        // a IN (b, c) where a has nulls
3770        // Even if b matches all non-null rows, result should preserve nulls
3771        let schema = Schema::new(vec![
3772            Field::new("a", DataType::Int32, true),
3773            Field::new("b", DataType::Int32, false),
3774            Field::new("c", DataType::Int32, false),
3775        ]);
3776        let batch = RecordBatch::try_new(
3777            Arc::new(schema.clone()),
3778            vec![
3779                Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
3780                Arc::new(Int32Array::from(vec![1, 2, 3])), // matches non-null rows
3781                Arc::new(Int32Array::from(vec![99, 99, 99])),
3782            ],
3783        )?;
3784
3785        let col_a = col("a", &schema)?;
3786        let list = vec![col("b", &schema)?, col("c", &schema)?];
3787        let expr = make_in_list_with_columns(col_a, list, false);
3788
3789        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3790        let result = as_boolean_array(&result);
3791        // row 0: 1 IN (1, 99) → true
3792        // row 1: NULL IN (2, 99) → NULL
3793        // row 2: 3 IN (3, 99) → true
3794        assert_eq!(
3795            result,
3796            &BooleanArray::from(vec![Some(true), None, Some(true)])
3797        );
3798        Ok(())
3799    }
3800
3801    /// Tests the make_comparator + collect_bool fallback path using
3802    /// struct column references (nested types don't support arrow_eq).
3803    #[test]
3804    fn test_in_list_with_columns_struct() -> Result<()> {
3805        let struct_fields = Fields::from(vec![
3806            Field::new("x", DataType::Int32, false),
3807            Field::new("y", DataType::Utf8, false),
3808        ]);
3809        let struct_dt = DataType::Struct(struct_fields.clone());
3810
3811        let schema = Schema::new(vec![
3812            Field::new("a", struct_dt.clone(), true),
3813            Field::new("b", struct_dt.clone(), false),
3814            Field::new("c", struct_dt.clone(), false),
3815        ]);
3816
3817        // a: [{1,"a"}, {2,"b"}, NULL,    {4,"d"}]
3818        // b: [{1,"a"}, {9,"z"}, {3,"c"}, {4,"d"}]
3819        // c: [{9,"z"}, {2,"b"}, {9,"z"}, {9,"z"}]
3820        let a = Arc::new(StructArray::new(
3821            struct_fields.clone(),
3822            vec![
3823                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
3824                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
3825            ],
3826            Some(vec![true, true, false, true].into()),
3827        ));
3828        let b = Arc::new(StructArray::new(
3829            struct_fields.clone(),
3830            vec![
3831                Arc::new(Int32Array::from(vec![1, 9, 3, 4])),
3832                Arc::new(StringArray::from(vec!["a", "z", "c", "d"])),
3833            ],
3834            None,
3835        ));
3836        let c = Arc::new(StructArray::new(
3837            struct_fields.clone(),
3838            vec![
3839                Arc::new(Int32Array::from(vec![9, 2, 9, 9])),
3840                Arc::new(StringArray::from(vec!["z", "b", "z", "z"])),
3841            ],
3842            None,
3843        ));
3844
3845        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, c])?;
3846
3847        let col_a = col("a", &schema)?;
3848        let list = vec![col("b", &schema)?, col("c", &schema)?];
3849        let expr = make_in_list_with_columns(col_a, list, false);
3850
3851        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3852        let result = as_boolean_array(&result);
3853        // row 0: {1,"a"} IN ({1,"a"}, {9,"z"}) → true  (matches b)
3854        // row 1: {2,"b"} IN ({9,"z"}, {2,"b"}) → true  (matches c)
3855        // row 2: NULL    IN ({3,"c"}, {9,"z"}) → NULL
3856        // row 3: {4,"d"} IN ({4,"d"}, {9,"z"}) → true  (matches b)
3857        assert_eq!(
3858            result,
3859            &BooleanArray::from(vec![Some(true), Some(true), None, Some(true)])
3860        );
3861
3862        // Also test NOT IN
3863        let col_a = col("a", &schema)?;
3864        let list = vec![col("b", &schema)?, col("c", &schema)?];
3865        let expr = make_in_list_with_columns(col_a, list, true);
3866
3867        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3868        let result = as_boolean_array(&result);
3869        // row 0: {1,"a"} NOT IN ({1,"a"}, {9,"z"}) → false
3870        // row 1: {2,"b"} NOT IN ({9,"z"}, {2,"b"}) → false
3871        // row 2: NULL    NOT IN ({3,"c"}, {9,"z"}) → NULL
3872        // row 3: {4,"d"} NOT IN ({4,"d"}, {9,"z"}) → false
3873        assert_eq!(
3874            result,
3875            &BooleanArray::from(vec![Some(false), Some(false), None, Some(false)])
3876        );
3877        Ok(())
3878    }
3879
3880    // -----------------------------------------------------------------------
3881    // Tests for try_new_from_array: evaluates `needle IN in_array`.
3882    //
3883    // This exercises the code path used by HashJoin dynamic filter pushdown,
3884    // where in_array is built directly from the join's build-side arrays.
3885    // Unlike try_new (used by SQL IN expressions), which always produces a
3886    // non-Dictionary in_array because evaluate_list() flattens Dictionary
3887    // scalars, try_new_from_array passes the array directly and can produce
3888    // a Dictionary in_array.
3889    // -----------------------------------------------------------------------
3890
3891    fn wrap_in_dict(array: ArrayRef) -> ArrayRef {
3892        let keys = Int32Array::from((0..array.len() as i32).collect::<Vec<_>>());
3893        Arc::new(DictionaryArray::new(keys, array))
3894    }
3895
3896    /// Evaluates `needle IN in_array` via try_new_from_array, the same
3897    /// path used by HashJoin dynamic filter pushdown (not the SQL literal
3898    /// IN path which goes through try_new).
3899    fn eval_in_list_from_array(
3900        needle: ArrayRef,
3901        in_array: ArrayRef,
3902    ) -> Result<BooleanArray> {
3903        let schema =
3904            Schema::new(vec![Field::new("a", needle.data_type().clone(), false)]);
3905        let col_a = col("a", &schema)?;
3906        let expr = Arc::new(InListExpr::try_new_from_array(col_a, in_array, false)?)
3907            as Arc<dyn PhysicalExpr>;
3908        let batch = RecordBatch::try_new(Arc::new(schema), vec![needle])?;
3909        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3910        Ok(as_boolean_array(&result).clone())
3911    }
3912
3913    #[test]
3914    fn test_in_list_from_array_type_combinations() -> Result<()> {
3915        use arrow::compute::cast;
3916
3917        // All cases: needle[0] and needle[2] match, needle[1] does not.
3918        let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
3919
3920        // Base arrays cast to each target type
3921        let base_in = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef;
3922        let base_needle = Arc::new(Int64Array::from(vec![1i64, 4, 2])) as ArrayRef;
3923
3924        // Test all specializations in instantiate_static_filter
3925        let primitive_types = vec![
3926            DataType::Int8,
3927            DataType::Int16,
3928            DataType::Int32,
3929            DataType::Int64,
3930            DataType::UInt8,
3931            DataType::UInt16,
3932            DataType::UInt32,
3933            DataType::UInt64,
3934            DataType::Float32,
3935            DataType::Float64,
3936        ];
3937
3938        for dt in &primitive_types {
3939            let in_array = cast(&base_in, dt)?;
3940            let needle = cast(&base_needle, dt)?;
3941
3942            // T in_array, T needle
3943            assert_eq!(
3944                expected,
3945                eval_in_list_from_array(Arc::clone(&needle), Arc::clone(&in_array))?,
3946                "same-type failed for {dt:?}"
3947            );
3948
3949            // T in_array, Dict(Int32, T) needle
3950            assert_eq!(
3951                expected,
3952                eval_in_list_from_array(wrap_in_dict(needle), in_array)?,
3953                "dict-needle failed for {dt:?}"
3954            );
3955        }
3956
3957        // Utf8 (falls through to ArrayStaticFilter)
3958        let utf8_in = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3959        let utf8_needle = Arc::new(StringArray::from(vec!["a", "d", "b"])) as ArrayRef;
3960
3961        // Utf8 in_array, Utf8 needle
3962        assert_eq!(
3963            expected,
3964            eval_in_list_from_array(Arc::clone(&utf8_needle), Arc::clone(&utf8_in),)?
3965        );
3966
3967        // Utf8 in_array, Dict(Utf8) needle
3968        assert_eq!(
3969            expected,
3970            eval_in_list_from_array(
3971                wrap_in_dict(Arc::clone(&utf8_needle)),
3972                Arc::clone(&utf8_in),
3973            )?
3974        );
3975
3976        // Dict(Utf8) in_array, Dict(Utf8) needle: the #20937 bug
3977        assert_eq!(
3978            expected,
3979            eval_in_list_from_array(
3980                wrap_in_dict(Arc::clone(&utf8_needle)),
3981                wrap_in_dict(Arc::clone(&utf8_in)),
3982            )?
3983        );
3984
3985        // Struct in_array, Struct needle: multi-column join
3986        let struct_fields = Fields::from(vec![
3987            Field::new("c0", DataType::Utf8, true),
3988            Field::new("c1", DataType::Int64, true),
3989        ]);
3990        let make_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
3991            let pairs: Vec<(FieldRef, ArrayRef)> =
3992                struct_fields.iter().cloned().zip([c0, c1]).collect();
3993            Arc::new(StructArray::from(pairs))
3994        };
3995        assert_eq!(
3996            expected,
3997            eval_in_list_from_array(
3998                make_struct(
3999                    Arc::clone(&utf8_needle),
4000                    Arc::new(Int64Array::from(vec![1, 4, 2])),
4001                ),
4002                make_struct(
4003                    Arc::clone(&utf8_in),
4004                    Arc::new(Int64Array::from(vec![1, 2, 3])),
4005                ),
4006            )?
4007        );
4008
4009        // Struct with Dict fields: multi-column Dict join
4010        let dict_struct_fields = Fields::from(vec![
4011            Field::new(
4012                "c0",
4013                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
4014                true,
4015            ),
4016            Field::new("c1", DataType::Int64, true),
4017        ]);
4018        let make_dict_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
4019            let pairs: Vec<(FieldRef, ArrayRef)> =
4020                dict_struct_fields.iter().cloned().zip([c0, c1]).collect();
4021            Arc::new(StructArray::from(pairs))
4022        };
4023        assert_eq!(
4024            expected,
4025            eval_in_list_from_array(
4026                make_dict_struct(
4027                    wrap_in_dict(Arc::clone(&utf8_needle)),
4028                    Arc::new(Int64Array::from(vec![1, 4, 2])),
4029                ),
4030                make_dict_struct(
4031                    wrap_in_dict(Arc::clone(&utf8_in)),
4032                    Arc::new(Int64Array::from(vec![1, 2, 3])),
4033                ),
4034            )?
4035        );
4036
4037        Ok(())
4038    }
4039
4040    #[test]
4041    fn test_in_list_from_array_type_mismatch_errors() -> Result<()> {
4042        // Utf8 needle, Dict(Utf8) in_array
4043        let err = eval_in_list_from_array(
4044            Arc::new(StringArray::from(vec!["a", "d", "b"])),
4045            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
4046        )
4047        .unwrap_err()
4048        .to_string();
4049        assert!(
4050            err.contains("Can't compare arrays of different types"),
4051            "{err}"
4052        );
4053
4054        // Dict(Utf8) needle, Int64 in_array: specialized Int64StaticFilter
4055        // rejects the Utf8 dictionary values at construction time
4056        let err = eval_in_list_from_array(
4057            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "d", "b"]))),
4058            Arc::new(Int64Array::from(vec![1, 2, 3])),
4059        )
4060        .unwrap_err()
4061        .to_string();
4062        assert!(err.contains("Failed to downcast"), "{err}");
4063
4064        // Dict(Int64) needle, Dict(Utf8) in_array: both Dict but different
4065        // value types, make_comparator rejects the comparison
4066        let err = eval_in_list_from_array(
4067            wrap_in_dict(Arc::new(Int64Array::from(vec![1, 4, 2]))),
4068            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
4069        )
4070        .unwrap_err()
4071        .to_string();
4072        assert!(
4073            err.contains("Can't compare arrays of different types"),
4074            "{err}"
4075        );
4076        Ok(())
4077    }
4078}