1use std::any::Any;
21use std::fmt::Debug;
22use std::hash::{Hash, Hasher};
23use std::sync::Arc;
24
25use crate::PhysicalExpr;
26use crate::physical_expr::physical_exprs_bag_equal;
27
28use arrow::array::*;
29use arrow::buffer::{BooleanBuffer, NullBuffer};
30use arrow::compute::kernels::boolean::{not, or_kleene};
31use arrow::compute::{SortOptions, take};
32use arrow::datatypes::*;
33use arrow::util::bit_iterator::BitIndexIterator;
34use datafusion_common::hash_utils::with_hashes;
35use datafusion_common::{
36 DFSchema, HashSet, Result, ScalarValue, assert_or_internal_err, exec_datafusion_err,
37 exec_err,
38};
39use datafusion_expr::{ColumnarValue, expr_vec_fmt};
40
41use ahash::RandomState;
42use datafusion_common::HashMap;
43use hashbrown::hash_map::RawEntryMut;
44
/// A pre-built lookup structure over the (constant) values of an `IN` list,
/// used to test many probe values against the list without re-evaluating it.
trait StaticFilter {
    /// Number of NULL entries in the haystack; `InListExpr::nullable` treats a
    /// non-zero count as "the result may be NULL".
    fn null_count(&self) -> usize;

    /// Tests every row of `v` against the haystack and returns one boolean per row.
    ///
    /// When `negated` is true the result is for `NOT IN`. Implementations in this
    /// file follow SQL three-valued logic: a NULL probe value yields NULL, and a
    /// probe value that is not found yields NULL when the haystack contains NULLs.
    fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
}
52
/// Physical expression for `expr [NOT] IN (list)`.
pub struct InListExpr {
    /// The expression whose value is searched for in `list`.
    expr: Arc<dyn PhysicalExpr>,
    /// The candidate expressions to compare against.
    list: Vec<Arc<dyn PhysicalExpr>>,
    /// `true` for `NOT IN`.
    negated: bool,
    /// Pre-built lookup set over the evaluated `list`, present when every list
    /// element could be evaluated as a constant (or when constructed from an
    /// array). When `None`, `list` is evaluated per batch instead.
    static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
}
60
61impl Debug for InListExpr {
62 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
63 f.debug_struct("InListExpr")
64 .field("expr", &self.expr)
65 .field("list", &self.list)
66 .field("negated", &self.negated)
67 .finish()
68 }
69}
70
/// Generic static filter usable for any data type that `with_hashes` can hash
/// and `make_comparator` can compare.
#[derive(Debug, Clone)]
struct ArrayStaticFilter {
    /// The evaluated IN-list values (the haystack).
    in_array: ArrayRef,
    /// Seeds the value hashes; the same state is used when building the map and
    /// when probing it, so hashes are comparable.
    state: RandomState,
    /// Set of distinct haystack entries, stored as indices into `in_array`.
    ///
    /// `usize::hash` is never used (the hasher is `()`); entries are inserted and
    /// looked up through the raw-entry API using the hash of the referenced
    /// value, with equality decided by comparing the referenced values.
    map: HashMap<usize, (), ()>,
}
82
83impl StaticFilter for ArrayStaticFilter {
84 fn null_count(&self) -> usize {
85 self.in_array.null_count()
86 }
87
88 fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
90 if v.data_type() == &DataType::Null
92 || self.in_array.data_type() == &DataType::Null
93 {
94 let nulls = NullBuffer::new_null(v.len());
95 return Ok(BooleanArray::new(
96 BooleanBuffer::new_unset(v.len()),
97 Some(nulls),
98 ));
99 }
100
101 downcast_dictionary_array! {
105 v => {
106 if v.values().data_type() == self.in_array.data_type() {
109 let values_contains = self.contains(v.values().as_ref(), negated)?;
110 let result = take(&values_contains, v.keys(), None)?;
111 return Ok(downcast_array(result.as_ref()));
112 }
113 }
114 _ => {}
115 }
116
117 let needle_nulls = v.logical_nulls();
118 let needle_nulls = needle_nulls.as_ref();
119 let haystack_has_nulls = self.in_array.null_count() != 0;
120
121 with_hashes([v], &self.state, |hashes| {
122 let cmp = make_comparator(v, &self.in_array, SortOptions::default())?;
123 Ok((0..v.len())
124 .map(|i| {
125 if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) {
127 return None;
128 }
129
130 let hash = hashes[i];
131 let contains = self
132 .map
133 .raw_entry()
134 .from_hash(hash, |idx| cmp(i, *idx).is_eq())
135 .is_some();
136
137 match contains {
138 true => Some(!negated),
139 false if haystack_has_nulls => None,
140 false => Some(negated),
141 }
142 })
143 .collect())
144 })
145 }
146}
147
148fn instantiate_static_filter(
149 in_array: ArrayRef,
150) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
151 match in_array.data_type() {
152 DataType::Int8 => Ok(Arc::new(Int8StaticFilter::try_new(&in_array)?)),
154 DataType::Int16 => Ok(Arc::new(Int16StaticFilter::try_new(&in_array)?)),
155 DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)),
156 DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)),
157 DataType::UInt8 => Ok(Arc::new(UInt8StaticFilter::try_new(&in_array)?)),
158 DataType::UInt16 => Ok(Arc::new(UInt16StaticFilter::try_new(&in_array)?)),
159 DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)),
160 DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)),
161 DataType::Float32 => Ok(Arc::new(Float32StaticFilter::try_new(&in_array)?)),
163 DataType::Float64 => Ok(Arc::new(Float64StaticFilter::try_new(&in_array)?)),
164 _ => {
165 Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?))
167 }
168 }
169}
170
impl ArrayStaticFilter {
    /// Builds a hash-based filter over every non-null entry of `in_array`.
    ///
    /// The map stores indices into `in_array`, not values. Each index is inserted
    /// under the hash of the value it refers to, which `with_hashes` computes with
    /// `state`. Collisions are resolved by comparing values with an Arrow
    /// comparator, so a value that appears several times is stored only once.
    ///
    /// # Errors
    /// Propagates errors from `with_hashes` and `make_comparator`.
    fn try_new(in_array: ArrayRef) -> Result<ArrayStaticFilter> {
        // Null-typed haystacks get an empty map. `contains` short-circuits to an
        // all-NULL result for them, so the map is never consulted.
        if in_array.data_type() == &DataType::Null {
            return Ok(ArrayStaticFilter {
                in_array,
                state: RandomState::new(),
                map: HashMap::with_hasher(()),
            });
        }

        let state = RandomState::new();
        // Hasher is `()`: every hash is supplied explicitly via the raw-entry API.
        let mut map: HashMap<usize, (), ()> = HashMap::with_hasher(());

        with_hashes([&in_array], &state, |hashes| -> Result<()> {
            let cmp = make_comparator(&in_array, &in_array, SortOptions::default())?;

            // Insert `idx` unless an equal value is already present.
            let insert_value = |idx| {
                let hash = hashes[idx];
                if let RawEntryMut::Vacant(v) = map
                    .raw_entry_mut()
                    .from_hash(hash, |x| cmp(*x, idx).is_eq())
                {
                    // On rehash, recompute an entry's hash from the value it indexes.
                    v.insert_with_hasher(hash, idx, (), |x| hashes[*x]);
                }
            };

            // Only valid (non-null) positions are inserted; NULLs are accounted for
            // separately through the haystack's null count.
            match in_array.nulls() {
                Some(nulls) => {
                    BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
                        .for_each(insert_value)
                }
                None => (0..in_array.len()).for_each(insert_value),
            }

            Ok(())
        })?;

        Ok(Self {
            in_array,
            state,
            map,
        })
    }
}
222
/// Newtype around `f32` that is hashable and uses bitwise equality.
///
/// Two values are equal exactly when their IEEE-754 bit patterns are identical.
/// So a NaN equals another NaN with the same bits, while `0.0 != -0.0` and
/// `NaN != -NaN`. Hashing uses the same bits, keeping `Hash` consistent with `Eq`.
#[derive(Clone, Copy)]
struct OrderedFloat32(f32);

impl Hash for OrderedFloat32 {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let bytes: [u8; 4] = self.0.to_ne_bytes();
        Hash::hash(&bytes, state);
    }
}

impl PartialEq for OrderedFloat32 {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.0.to_bits(), other.0.to_bits());
        lhs == rhs
    }
}

impl Eq for OrderedFloat32 {}

impl From<f32> for OrderedFloat32 {
    fn from(value: f32) -> Self {
        OrderedFloat32(value)
    }
}
247
/// Newtype around `f64` that is hashable and uses bitwise equality.
///
/// Two values are equal exactly when their IEEE-754 bit patterns are identical.
/// So a NaN equals another NaN with the same bits, while `0.0 != -0.0` and
/// `NaN != -NaN`. Hashing uses the same bits, keeping `Hash` consistent with `Eq`.
#[derive(Clone, Copy)]
struct OrderedFloat64(f64);

impl Hash for OrderedFloat64 {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let bytes: [u8; 8] = self.0.to_ne_bytes();
        Hash::hash(&bytes, state);
    }
}

impl PartialEq for OrderedFloat64 {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.0.to_bits(), other.0.to_bits());
        lhs == rhs
    }
}

impl Eq for OrderedFloat64 {}

impl From<f64> for OrderedFloat64 {
    fn from(value: f64) -> Self {
        OrderedFloat64(value)
    }
}
272
/// Generates a `StaticFilter` specialised for an Arrow primitive type whose native
/// type can be stored in a `HashSet`. It is instantiated for the integer types at
/// the end of this block.
///
/// The generated `$Name` keeps the non-null haystack values in a `HashSet` of the
/// native type and probes it directly. This avoids the generic hash-and-compare
/// path of `ArrayStaticFilter`.
macro_rules! primitive_static_filter {
    ($Name:ident, $ArrowType:ty) => {
        struct $Name {
            // Number of NULL entries in the haystack (they are not stored in `values`).
            null_count: usize,
            // Distinct non-null haystack values.
            values: HashSet<<$ArrowType as ArrowPrimitiveType>::Native>,
        }

        impl $Name {
            // Builds the set from `in_array`.
            // Errors if the array is not of `$ArrowType`.
            fn try_new(in_array: &ArrayRef) -> Result<Self> {
                let in_array = in_array
                    .as_primitive_opt::<$ArrowType>()
                    .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?;

                let mut values = HashSet::with_capacity(in_array.len());
                let null_count = in_array.null_count();

                // `flatten` skips NULL entries; they are tracked via `null_count`.
                for v in in_array.iter().flatten() {
                    values.insert(v);
                }

                Ok(Self { null_count, values })
            }
        }

        impl StaticFilter for $Name {
            fn null_count(&self) -> usize {
                self.null_count
            }

            fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
                // Dictionary probes: evaluate membership once per dictionary value,
                // then expand the result to every row by `take`-ing with the keys.
                downcast_dictionary_array! {
                    v => {
                        let values_contains = self.contains(v.values().as_ref(), negated)?;
                        let result = take(&values_contains, v.keys(), None)?;
                        return Ok(downcast_array(result.as_ref()))
                    }
                    _ => {}
                }

                let v = v
                    .as_primitive_opt::<$ArrowType>()
                    .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?;

                let haystack_has_nulls = self.null_count > 0;

                let needle_values = v.values();
                let needle_nulls = v.nulls();
                let needle_has_nulls = v.null_count() > 0;

                // Membership bits for every slot, ignoring NULLs (NULL slots hold
                // arbitrary values; the validity mask below hides them). For NOT IN
                // the bits are already negated.
                let contains_buffer = if negated {
                    BooleanBuffer::collect_bool(needle_values.len(), |i| {
                        !self.values.contains(&needle_values[i])
                    })
                } else {
                    BooleanBuffer::collect_bool(needle_values.len(), |i| {
                        self.values.contains(&needle_values[i])
                    })
                };

                // Result validity (SQL three-valued logic): a row is NULL when the
                // probe value is NULL, or when it is not in the set and the haystack
                // has NULLs. "In the set" is `contains_buffer` for IN and
                // `!contains_buffer` for NOT IN.
                let result_nulls = match (needle_has_nulls, haystack_has_nulls) {
                    (false, false) => {
                        // No NULLs anywhere: every row is valid.
                        None
                    }
                    (true, false) => {
                        // Only probe-side NULLs.
                        needle_nulls.cloned()
                    }
                    (false, true) => {
                        // Valid exactly where the value was found in the set.
                        let validity = if negated {
                            !&contains_buffer
                        } else {
                            contains_buffer.clone()
                        };
                        Some(NullBuffer::new(validity))
                    }
                    (true, true) => {
                        let needle_validity = needle_nulls.map(|n| n.inner().clone())
                            .unwrap_or_else(|| BooleanBuffer::new_set(needle_values.len()));

                        // Valid exactly where the value was found in the set.
                        let haystack_validity = if negated {
                            !&contains_buffer
                        } else {
                            contains_buffer.clone()
                        };

                        // Valid only where both masks are valid.
                        let combined_validity = &needle_validity & &haystack_validity;
                        Some(NullBuffer::new(combined_validity))
                    }
                };

                Ok(BooleanArray::new(contains_buffer, result_nulls))
            }
        }
    };
}

// Specialised filters for the signed and unsigned integer types.
primitive_static_filter!(Int8StaticFilter, Int8Type);
primitive_static_filter!(Int16StaticFilter, Int16Type);
primitive_static_filter!(Int32StaticFilter, Int32Type);
primitive_static_filter!(Int64StaticFilter, Int64Type);
primitive_static_filter!(UInt8StaticFilter, UInt8Type);
primitive_static_filter!(UInt16StaticFilter, UInt16Type);
primitive_static_filter!(UInt32StaticFilter, UInt32Type);
primitive_static_filter!(UInt64StaticFilter, UInt64Type);
409
/// Generates a `StaticFilter` specialised for an Arrow floating-point type.
///
/// It mirrors `primitive_static_filter!`, but stores values as `$OrderedType` (a
/// bitwise-equality wrapper) because `f32`/`f64` are not `Hash + Eq`. Membership
/// is therefore bitwise: a NaN matches a NaN with identical bits, and `-0.0` does
/// not match `0.0`.
macro_rules! float_static_filter {
    ($Name:ident, $ArrowType:ty, $OrderedType:ty) => {
        struct $Name {
            // Number of NULL entries in the haystack (they are not stored in `values`).
            null_count: usize,
            // Distinct non-null haystack values, wrapped for bitwise Hash/Eq.
            values: HashSet<$OrderedType>,
        }

        impl $Name {
            // Builds the set from `in_array`.
            // Errors if the array is not of `$ArrowType`.
            fn try_new(in_array: &ArrayRef) -> Result<Self> {
                let in_array = in_array
                    .as_primitive_opt::<$ArrowType>()
                    .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?;

                let mut values = HashSet::with_capacity(in_array.len());
                let null_count = in_array.null_count();

                // `flatten` skips NULL entries; they are tracked via `null_count`.
                for v in in_array.iter().flatten() {
                    values.insert(<$OrderedType>::from(v));
                }

                Ok(Self { null_count, values })
            }
        }

        impl StaticFilter for $Name {
            fn null_count(&self) -> usize {
                self.null_count
            }

            fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
                // Dictionary probes: evaluate membership once per dictionary value,
                // then expand the result to every row by `take`-ing with the keys.
                downcast_dictionary_array! {
                    v => {
                        let values_contains = self.contains(v.values().as_ref(), negated)?;
                        let result = take(&values_contains, v.keys(), None)?;
                        return Ok(downcast_array(result.as_ref()))
                    }
                    _ => {}
                }

                let v = v
                    .as_primitive_opt::<$ArrowType>()
                    .ok_or_else(|| exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)))?;

                let haystack_has_nulls = self.null_count > 0;

                let needle_values = v.values();
                let needle_nulls = v.nulls();
                let needle_has_nulls = v.null_count() > 0;

                // Membership bits for every slot, ignoring NULLs (NULL slots hold
                // arbitrary values; the validity mask below hides them). For NOT IN
                // the bits are already negated.
                let contains_buffer = if negated {
                    BooleanBuffer::collect_bool(needle_values.len(), |i| {
                        !self.values.contains(&<$OrderedType>::from(needle_values[i]))
                    })
                } else {
                    BooleanBuffer::collect_bool(needle_values.len(), |i| {
                        self.values.contains(&<$OrderedType>::from(needle_values[i]))
                    })
                };

                // Result validity (SQL three-valued logic): a row is NULL when the
                // probe value is NULL, or when it is not in the set and the haystack
                // has NULLs. "In the set" is `contains_buffer` for IN and
                // `!contains_buffer` for NOT IN.
                let result_nulls = match (needle_has_nulls, haystack_has_nulls) {
                    (false, false) => {
                        // No NULLs anywhere: every row is valid.
                        None
                    }
                    (true, false) => {
                        // Only probe-side NULLs.
                        needle_nulls.cloned()
                    }
                    (false, true) => {
                        // Valid exactly where the value was found in the set.
                        let validity = if negated {
                            !&contains_buffer
                        } else {
                            contains_buffer.clone()
                        };
                        Some(NullBuffer::new(validity))
                    }
                    (true, true) => {
                        let needle_validity = needle_nulls.map(|n| n.inner().clone())
                            .unwrap_or_else(|| BooleanBuffer::new_set(needle_values.len()));

                        // Valid exactly where the value was found in the set.
                        let haystack_validity = if negated {
                            !&contains_buffer
                        } else {
                            contains_buffer.clone()
                        };

                        // Valid only where both masks are valid.
                        let combined_validity = &needle_validity & &haystack_validity;
                        Some(NullBuffer::new(combined_validity))
                    }
                };

                Ok(BooleanArray::new(contains_buffer, result_nulls))
            }
        }
    };
}

// Specialised filters for the floating-point types.
float_static_filter!(Float32StaticFilter, Float32Type, OrderedFloat32);
float_static_filter!(Float64StaticFilter, Float64Type, OrderedFloat64);
541
542fn evaluate_list(
544 list: &[Arc<dyn PhysicalExpr>],
545 batch: &RecordBatch,
546) -> Result<ArrayRef> {
547 let scalars = list
548 .iter()
549 .map(|expr| {
550 expr.evaluate(batch).and_then(|r| match r {
551 ColumnarValue::Array(_) => {
552 exec_err!("InList expression must evaluate to a scalar")
553 }
554 ColumnarValue::Scalar(ScalarValue::Dictionary(_, v)) => Ok(*v),
556 ColumnarValue::Scalar(s) => Ok(s),
557 })
558 })
559 .collect::<Result<Vec<_>>>()?;
560
561 ScalarValue::iter_to_array(scalars)
562}
563
564fn try_evaluate_constant_list(
574 list: &[Arc<dyn PhysicalExpr>],
575 schema: &Schema,
576) -> Result<Option<ArrayRef>> {
577 let batch = RecordBatch::new_empty(Arc::new(schema.clone()));
578 match evaluate_list(list, &batch) {
579 Ok(array) => Ok(Some(array)),
580 Err(_) => {
581 Ok(None)
584 }
585 }
586}
587
588impl InListExpr {
589 fn new(
591 expr: Arc<dyn PhysicalExpr>,
592 list: Vec<Arc<dyn PhysicalExpr>>,
593 negated: bool,
594 static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
595 ) -> Self {
596 Self {
597 expr,
598 list,
599 negated,
600 static_filter,
601 }
602 }
603
604 pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
606 &self.expr
607 }
608
609 pub fn list(&self) -> &[Arc<dyn PhysicalExpr>] {
611 &self.list
612 }
613
614 pub fn is_empty(&self) -> bool {
615 self.list.is_empty()
616 }
617
618 pub fn len(&self) -> usize {
619 self.list.len()
620 }
621
622 pub fn negated(&self) -> bool {
624 self.negated
625 }
626
627 pub fn try_new_from_array(
639 expr: Arc<dyn PhysicalExpr>,
640 array: ArrayRef,
641 negated: bool,
642 ) -> Result<Self> {
643 let list = (0..array.len())
644 .map(|i| {
645 let scalar = ScalarValue::try_from_array(array.as_ref(), i)?;
646 Ok(crate::expressions::lit(scalar) as Arc<dyn PhysicalExpr>)
647 })
648 .collect::<Result<Vec<_>>>()?;
649 Ok(Self::new(
650 expr,
651 list,
652 negated,
653 Some(instantiate_static_filter(array)?),
654 ))
655 }
656
657 pub fn try_new(
666 expr: Arc<dyn PhysicalExpr>,
667 list: Vec<Arc<dyn PhysicalExpr>>,
668 negated: bool,
669 schema: &Schema,
670 ) -> Result<Self> {
671 let expr_data_type = expr.data_type(schema)?;
673 for list_expr in list.iter() {
674 let list_expr_data_type = list_expr.data_type(schema)?;
675 assert_or_internal_err!(
676 DFSchema::datatype_is_logically_equal(
677 &expr_data_type,
678 &list_expr_data_type
679 ),
680 "The data type inlist should be same, the value type is {expr_data_type}, one of list expr type is {list_expr_data_type}"
681 );
682 }
683
684 let static_filter = match try_evaluate_constant_list(&list, schema)? {
686 Some(in_array) => Some(instantiate_static_filter(in_array)?),
687 None => None, };
689
690 Ok(Self::new(expr, list, negated, static_filter))
691 }
692}
693impl std::fmt::Display for InListExpr {
694 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
695 let list = expr_vec_fmt!(self.list);
696
697 if self.negated {
698 if self.static_filter.is_some() {
699 write!(f, "{} NOT IN (SET) ([{list}])", self.expr)
700 } else {
701 write!(f, "{} NOT IN ([{list}])", self.expr)
702 }
703 } else if self.static_filter.is_some() {
704 write!(f, "{} IN (SET) ([{list}])", self.expr)
705 } else {
706 write!(f, "{} IN ([{list}])", self.expr)
707 }
708 }
709}
710
711impl PhysicalExpr for InListExpr {
712 fn as_any(&self) -> &dyn Any {
714 self
715 }
716
717 fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
718 Ok(DataType::Boolean)
719 }
720
721 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
722 if self.expr.nullable(input_schema)? {
723 return Ok(true);
724 }
725
726 if let Some(static_filter) = &self.static_filter {
727 Ok(static_filter.null_count() > 0)
728 } else {
729 for expr in &self.list {
730 if expr.nullable(input_schema)? {
731 return Ok(true);
732 }
733 }
734 Ok(false)
735 }
736 }
737
738 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
739 let num_rows = batch.num_rows();
740 let value = self.expr.evaluate(batch)?;
741 let r = match &self.static_filter {
742 Some(filter) => {
743 match value {
744 ColumnarValue::Array(array) => {
745 filter.contains(&array, self.negated)?
746 }
747 ColumnarValue::Scalar(scalar) => {
748 if scalar.is_null() {
749 let nulls = NullBuffer::new_null(num_rows);
752 return Ok(ColumnarValue::Array(Arc::new(
753 BooleanArray::new(
754 BooleanBuffer::new_unset(num_rows),
755 Some(nulls),
756 ),
757 )));
758 }
759 let array = scalar.to_array()?;
762 let result_array =
763 filter.contains(array.as_ref(), self.negated)?;
764 if result_array.is_null(0) {
767 let nulls = NullBuffer::new_null(num_rows);
768 BooleanArray::new(
769 BooleanBuffer::new_unset(num_rows),
770 Some(nulls),
771 )
772 } else if result_array.value(0) {
773 BooleanArray::new(BooleanBuffer::new_set(num_rows), None)
774 } else {
775 BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)
776 }
777 }
778 }
779 }
780 None => {
781 let value = value.into_array(num_rows)?;
783 let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold(
784 BooleanArray::new(BooleanBuffer::new_unset(num_rows), None),
785 |result, expr| -> Result<BooleanArray> {
786 let rhs = match expr? {
787 ColumnarValue::Array(array) => {
788 let cmp = make_comparator(
789 value.as_ref(),
790 array.as_ref(),
791 SortOptions::default(),
792 )?;
793 (0..num_rows)
794 .map(|i| {
795 if value.is_null(i) || array.is_null(i) {
796 return None;
797 }
798 Some(cmp(i, i).is_eq())
799 })
800 .collect::<BooleanArray>()
801 }
802 ColumnarValue::Scalar(scalar) => {
803 if scalar.is_null() {
805 BooleanArray::from(vec![None; num_rows])
807 } else {
808 let array = scalar.to_array()?;
810 let cmp = make_comparator(
811 value.as_ref(),
812 array.as_ref(),
813 SortOptions::default(),
814 )?;
815 (0..num_rows)
817 .map(|i| {
818 if value.is_null(i) {
819 None
820 } else {
821 Some(cmp(i, 0).is_eq())
822 }
823 })
824 .collect::<BooleanArray>()
825 }
826 }
827 };
828 Ok(or_kleene(&result, &rhs)?)
829 },
830 )?;
831
832 if self.negated { not(&found)? } else { found }
833 }
834 };
835 Ok(ColumnarValue::Array(Arc::new(r)))
836 }
837
838 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
839 let mut children = vec![&self.expr];
840 children.extend(&self.list);
841 children
842 }
843
844 fn with_new_children(
845 self: Arc<Self>,
846 children: Vec<Arc<dyn PhysicalExpr>>,
847 ) -> Result<Arc<dyn PhysicalExpr>> {
848 Ok(Arc::new(InListExpr::new(
850 Arc::clone(&children[0]),
851 children[1..].to_vec(),
852 self.negated,
853 self.static_filter.as_ref().map(Arc::clone),
854 )))
855 }
856
857 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
858 self.expr.fmt_sql(f)?;
859 if self.negated {
860 write!(f, " NOT")?;
861 }
862
863 write!(f, " IN (")?;
864 for (i, expr) in self.list.iter().enumerate() {
865 if i > 0 {
866 write!(f, ", ")?;
867 }
868 expr.fmt_sql(f)?;
869 }
870 write!(f, ")")
871 }
872}
873
874impl PartialEq for InListExpr {
875 fn eq(&self, other: &Self) -> bool {
876 self.expr.eq(&other.expr)
877 && physical_exprs_bag_equal(&self.list, &other.list)
878 && self.negated == other.negated
879 }
880}
881
882impl Eq for InListExpr {}
883
884impl Hash for InListExpr {
885 fn hash<H: Hasher>(&self, state: &mut H) {
886 self.expr.hash(state);
887 self.negated.hash(state);
888 self.list.hash(state);
890 }
891}
892
893pub fn in_list(
895 expr: Arc<dyn PhysicalExpr>,
896 list: Vec<Arc<dyn PhysicalExpr>>,
897 negated: &bool,
898 schema: &Schema,
899) -> Result<Arc<dyn PhysicalExpr>> {
900 Ok(Arc::new(InListExpr::try_new(expr, list, *negated, schema)?))
901}
902
903#[cfg(test)]
904mod tests {
905 use super::*;
906 use crate::expressions::{col, lit, try_cast};
907 use arrow::buffer::NullBuffer;
908 use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano, i256};
909 use datafusion_common::plan_err;
910 use datafusion_expr::type_coercion::binary::comparison_coercion;
911 use datafusion_physical_expr_common::physical_expr::fmt_sql;
912 use insta::assert_snapshot;
913 use itertools::Itertools;
914
915 type InListCastResult = (Arc<dyn PhysicalExpr>, Vec<Arc<dyn PhysicalExpr>>);
916
917 fn in_list_cast(
920 expr: Arc<dyn PhysicalExpr>,
921 list: Vec<Arc<dyn PhysicalExpr>>,
922 input_schema: &Schema,
923 ) -> Result<InListCastResult> {
924 let expr_type = &expr.data_type(input_schema)?;
925 let list_types: Vec<DataType> = list
926 .iter()
927 .map(|list_expr| list_expr.data_type(input_schema).unwrap())
928 .collect();
929 let result_type = get_coerce_type(expr_type, &list_types);
930 match result_type {
931 None => plan_err!(
932 "Can not find compatible types to compare {expr_type} with [{}]",
933 list_types.iter().join(", ")
934 ),
935 Some(data_type) => {
936 let cast_expr = try_cast(expr, input_schema, data_type.clone())?;
938 let cast_list_expr = list
939 .into_iter()
940 .map(|list_expr| {
941 try_cast(list_expr, input_schema, data_type.clone()).unwrap()
942 })
943 .collect();
944 Ok((cast_expr, cast_list_expr))
945 }
946 }
947 }
948
949 fn get_coerce_type(expr_type: &DataType, list_type: &[DataType]) -> Option<DataType> {
952 list_type
953 .iter()
954 .try_fold(expr_type.clone(), |left_type, right_type| {
955 comparison_coercion(&left_type, right_type)
956 })
957 }
958
959 macro_rules! in_list {
972 ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
973 let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
974 in_list_raw!(
975 $BATCH,
976 cast_list_exprs,
977 $NEGATED,
978 $EXPECTED,
979 cast_expr,
980 $SCHEMA
981 );
982 }};
983 }
984
985 macro_rules! in_list_raw {
999 ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
1000 let col_expr = $COL;
1001 let expr = in_list(Arc::clone(&col_expr), $LIST, $NEGATED, $SCHEMA).unwrap();
1002 let result = expr
1003 .evaluate(&$BATCH)?
1004 .into_array($BATCH.num_rows())
1005 .expect("Failed to convert to array");
1006 let result = as_boolean_array(&result);
1007 let expected = &BooleanArray::from($EXPECTED);
1008 assert_eq!(
1009 expected,
1010 result,
1011 "Failed for: {}\n{}: {:?}",
1012 fmt_sql(expr.as_ref()),
1013 fmt_sql(col_expr.as_ref()),
1014 col_expr
1015 .evaluate(&$BATCH)?
1016 .into_array($BATCH.num_rows())
1017 .unwrap()
1018 );
1019 }};
1020 }
1021
1022 struct InListPrimitiveTestCase {
1031 name: &'static str,
1032 value_in: ScalarValue,
1033 value_not_in: ScalarValue,
1034 other_list_values: Vec<ScalarValue>,
1035 null_value: Option<ScalarValue>,
1036 }
1037
1038 #[derive(Clone)]
1043 struct PrimitiveTestCaseData<T> {
1044 value_in: T,
1045 value_not_in: T,
1046 other_list_values: Vec<T>,
1047 }
1048
1049 fn primitive_test_case<T, D, F>(
1055 name: &'static str,
1056 constructor: F,
1057 data: PrimitiveTestCaseData<D>,
1058 ) -> InListPrimitiveTestCase
1059 where
1060 D: TryInto<T> + Clone,
1061 <D as TryInto<T>>::Error: Debug,
1062 F: Fn(Option<T>) -> ScalarValue,
1063 T: Clone,
1064 {
1065 InListPrimitiveTestCase {
1066 name,
1067 value_in: constructor(Some(data.value_in.try_into().unwrap())),
1068 value_not_in: constructor(Some(data.value_not_in.try_into().unwrap())),
1069 other_list_values: data
1070 .other_list_values
1071 .into_iter()
1072 .map(|v| constructor(Some(v.try_into().unwrap())))
1073 .collect(),
1074 null_value: Some(constructor(None)),
1075 }
1076 }
1077
1078 fn primitive_test_case_no_nulls<T, D, F>(
1081 name: &'static str,
1082 constructor: F,
1083 data: PrimitiveTestCaseData<D>,
1084 ) -> InListPrimitiveTestCase
1085 where
1086 D: TryInto<T> + Clone,
1087 <D as TryInto<T>>::Error: Debug,
1088 F: Fn(Option<T>) -> ScalarValue,
1089 T: Clone,
1090 {
1091 InListPrimitiveTestCase {
1092 name,
1093 value_in: constructor(Some(data.value_in.try_into().unwrap())),
1094 value_not_in: constructor(Some(data.value_not_in.try_into().unwrap())),
1095 other_list_values: data
1096 .other_list_values
1097 .into_iter()
1098 .map(|v| constructor(Some(v.try_into().unwrap())))
1099 .collect(),
1100 null_value: None,
1101 }
1102 }
1103
1104 fn run_test_cases(test_cases: Vec<InListPrimitiveTestCase>) -> Result<()> {
1110 for test_case in test_cases {
1111 let test_name = test_case.name;
1112
1113 let data_type = test_case.value_in.data_type();
1115
1116 let build_base_list = || -> Vec<Arc<dyn PhysicalExpr>> {
1118 let mut list = vec![lit(test_case.value_in.clone())];
1119 list.extend(test_case.other_list_values.iter().map(|v| lit(v.clone())));
1120 list
1121 };
1122
1123 match &test_case.null_value {
1124 Some(null_val) => {
1125 let schema =
1127 Schema::new(vec![Field::new("a", data_type.clone(), true)]);
1128
1129 let array = ScalarValue::iter_to_array(vec![
1131 test_case.value_in.clone(),
1132 test_case.value_not_in.clone(),
1133 null_val.clone(),
1134 ])?;
1135
1136 let col_a = col("a", &schema)?;
1137 let batch = RecordBatch::try_new(
1138 Arc::new(schema.clone()),
1139 vec![Arc::clone(&array)],
1140 )?;
1141
1142 let list = build_base_list();
1144 in_list!(
1145 batch,
1146 list,
1147 &false,
1148 vec![Some(true), Some(false), None],
1149 Arc::clone(&col_a),
1150 &schema
1151 );
1152
1153 let list = build_base_list();
1155 in_list!(
1156 batch,
1157 list,
1158 &true,
1159 vec![Some(false), Some(true), None],
1160 Arc::clone(&col_a),
1161 &schema
1162 );
1163
1164 let mut list = build_base_list();
1166 list.push(lit(null_val.clone()));
1167 in_list!(
1168 batch,
1169 list,
1170 &false,
1171 vec![Some(true), None, None],
1172 Arc::clone(&col_a),
1173 &schema
1174 );
1175
1176 let mut list = build_base_list();
1178 list.push(lit(null_val.clone()));
1179 in_list!(
1180 batch,
1181 list,
1182 &true,
1183 vec![Some(false), None, None],
1184 Arc::clone(&col_a),
1185 &schema
1186 );
1187 }
1188 None => {
1189 let schema =
1191 Schema::new(vec![Field::new("a", data_type.clone(), false)]);
1192
1193 let array = ScalarValue::iter_to_array(vec![
1195 test_case.value_in.clone(),
1196 test_case.value_not_in.clone(),
1197 ])?;
1198
1199 let col_a = col("a", &schema)?;
1200 let batch = RecordBatch::try_new(
1201 Arc::new(schema.clone()),
1202 vec![Arc::clone(&array)],
1203 )?;
1204
1205 let list = build_base_list();
1207 in_list!(
1208 batch,
1209 list,
1210 &false,
1211 vec![Some(true), Some(false)],
1212 Arc::clone(&col_a),
1213 &schema
1214 );
1215
1216 let list = build_base_list();
1218 in_list!(
1219 batch,
1220 list,
1221 &true,
1222 vec![Some(false), Some(true)],
1223 Arc::clone(&col_a),
1224 &schema
1225 );
1226
1227 eprintln!(
1228 "Test '{test_name}': exercised (false, true) branch (no nulls, negated)",
1229 );
1230 }
1231 }
1232 }
1233
1234 Ok(())
1235 }
1236
1237 #[test]
1241 fn in_list_int_types() -> Result<()> {
1242 let int_data = PrimitiveTestCaseData {
1243 value_in: 0,
1244 value_not_in: 2,
1245 other_list_values: vec![1, 3, 5],
1246 };
1247
1248 run_test_cases(vec![
1249 primitive_test_case("int8", ScalarValue::Int8, int_data.clone()),
1251 primitive_test_case("int16", ScalarValue::Int16, int_data.clone()),
1252 primitive_test_case("int32", ScalarValue::Int32, int_data.clone()),
1253 primitive_test_case("int64", ScalarValue::Int64, int_data.clone()),
1254 primitive_test_case("uint8", ScalarValue::UInt8, int_data.clone()),
1255 primitive_test_case("uint16", ScalarValue::UInt16, int_data.clone()),
1256 primitive_test_case("uint32", ScalarValue::UInt32, int_data.clone()),
1257 primitive_test_case("uint64", ScalarValue::UInt64, int_data.clone()),
1258 primitive_test_case_no_nulls("int32_no_nulls", ScalarValue::Int32, int_data),
1260 ])
1261 }
1262
1263 #[test]
1267 fn in_list_string_types() -> Result<()> {
1268 let string_data = PrimitiveTestCaseData {
1269 value_in: "a",
1270 value_not_in: "d",
1271 other_list_values: vec!["b", "c"],
1272 };
1273
1274 run_test_cases(vec![
1275 primitive_test_case("utf8", ScalarValue::Utf8, string_data.clone()),
1276 primitive_test_case(
1277 "large_utf8",
1278 ScalarValue::LargeUtf8,
1279 string_data.clone(),
1280 ),
1281 primitive_test_case("utf8_view", ScalarValue::Utf8View, string_data),
1282 ])
1283 }
1284
1285 #[test]
1289 fn in_list_binary_types() -> Result<()> {
1290 let binary_data = PrimitiveTestCaseData {
1291 value_in: vec![1_u8, 2, 3],
1292 value_not_in: vec![1_u8, 2, 2],
1293 other_list_values: vec![vec![4_u8, 5, 6], vec![7_u8, 8, 9]],
1294 };
1295
1296 run_test_cases(vec![
1297 primitive_test_case("binary", ScalarValue::Binary, binary_data.clone()),
1298 primitive_test_case(
1299 "large_binary",
1300 ScalarValue::LargeBinary,
1301 binary_data.clone(),
1302 ),
1303 primitive_test_case("binary_view", ScalarValue::BinaryView, binary_data),
1304 ])
1305 }
1306
1307 #[test]
1311 fn in_list_date_types() -> Result<()> {
1312 let date_data = PrimitiveTestCaseData {
1313 value_in: 0,
1314 value_not_in: 2,
1315 other_list_values: vec![1, 3],
1316 };
1317
1318 run_test_cases(vec![
1319 primitive_test_case("date32", ScalarValue::Date32, date_data.clone()),
1320 primitive_test_case("date64", ScalarValue::Date64, date_data),
1321 ])
1322 }
1323
1324 #[test]
1328 fn in_list_decimal() -> Result<()> {
1329 run_test_cases(vec![InListPrimitiveTestCase {
1330 name: "decimal128",
1331 value_in: ScalarValue::Decimal128(Some(0), 10, 2),
1332 value_not_in: ScalarValue::Decimal128(Some(200), 10, 2),
1333 other_list_values: vec![
1334 ScalarValue::Decimal128(Some(100), 10, 2),
1335 ScalarValue::Decimal128(Some(300), 10, 2),
1336 ],
1337 null_value: Some(ScalarValue::Decimal128(None, 10, 2)),
1338 }])
1339 }
1340
1341 #[test]
1345 fn in_list_timestamp_types() -> Result<()> {
1346 run_test_cases(vec![
1347 InListPrimitiveTestCase {
1348 name: "timestamp_nanosecond",
1349 value_in: ScalarValue::TimestampNanosecond(Some(0), None),
1350 value_not_in: ScalarValue::TimestampNanosecond(Some(2000), None),
1351 other_list_values: vec![
1352 ScalarValue::TimestampNanosecond(Some(1000), None),
1353 ScalarValue::TimestampNanosecond(Some(3000), None),
1354 ],
1355 null_value: Some(ScalarValue::TimestampNanosecond(None, None)),
1356 },
1357 InListPrimitiveTestCase {
1358 name: "timestamp_millisecond_with_tz",
1359 value_in: ScalarValue::TimestampMillisecond(
1360 Some(1500000),
1361 Some("+05:00".into()),
1362 ),
1363 value_not_in: ScalarValue::TimestampMillisecond(
1364 Some(2500000),
1365 Some("+05:00".into()),
1366 ),
1367 other_list_values: vec![ScalarValue::TimestampMillisecond(
1368 Some(3500000),
1369 Some("+05:00".into()),
1370 )],
1371 null_value: Some(ScalarValue::TimestampMillisecond(
1372 None,
1373 Some("+05:00".into()),
1374 )),
1375 },
1376 InListPrimitiveTestCase {
1377 name: "timestamp_millisecond_mixed_tz",
1378 value_in: ScalarValue::TimestampMillisecond(
1379 Some(1500000),
1380 Some("+05:00".into()),
1381 ),
1382 value_not_in: ScalarValue::TimestampMillisecond(
1383 Some(2500000),
1384 Some("+05:00".into()),
1385 ),
1386 other_list_values: vec![
1387 ScalarValue::TimestampMillisecond(
1388 Some(3500000),
1389 Some("+01:00".into()),
1390 ),
1391 ScalarValue::TimestampMillisecond(Some(4500000), Some("UTC".into())),
1392 ],
1393 null_value: Some(ScalarValue::TimestampMillisecond(
1394 None,
1395 Some("+05:00".into()),
1396 )),
1397 },
1398 ])
1399 }
1400
1401 #[test]
1402 fn in_list_float64() -> Result<()> {
1403 let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]);
1404 let a = Float64Array::from(vec![
1405 Some(0.0),
1406 Some(0.2),
1407 None,
1408 Some(f64::NAN),
1409 Some(-f64::NAN),
1410 ]);
1411 let col_a = col("a", &schema)?;
1412 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1413
1414 let list = vec![lit(0.0f64), lit(0.1f64)];
1416 in_list!(
1417 batch,
1418 list,
1419 &false,
1420 vec![Some(true), Some(false), None, Some(false), Some(false)],
1421 Arc::clone(&col_a),
1422 &schema
1423 );
1424
1425 let list = vec![lit(0.0f64), lit(0.1f64)];
1427 in_list!(
1428 batch,
1429 list,
1430 &true,
1431 vec![Some(false), Some(true), None, Some(true), Some(true)],
1432 Arc::clone(&col_a),
1433 &schema
1434 );
1435
1436 let list = vec![lit(0.0f64), lit(0.1f64), lit(ScalarValue::Null)];
1438 in_list!(
1439 batch,
1440 list,
1441 &false,
1442 vec![Some(true), None, None, None, None],
1443 Arc::clone(&col_a),
1444 &schema
1445 );
1446
1447 let list = vec![lit(0.0f64), lit(0.1f64), lit(ScalarValue::Null)];
1449 in_list!(
1450 batch,
1451 list,
1452 &true,
1453 vec![Some(false), None, None, None, None],
1454 Arc::clone(&col_a),
1455 &schema
1456 );
1457
1458 let list = vec![lit(0.0f64), lit(0.1f64), lit(f64::NAN)];
1460 in_list!(
1461 batch,
1462 list,
1463 &false,
1464 vec![Some(true), Some(false), None, Some(true), Some(false)],
1465 Arc::clone(&col_a),
1466 &schema
1467 );
1468
1469 let list = vec![lit(0.0f64), lit(0.1f64), lit(f64::NAN)];
1471 in_list!(
1472 batch,
1473 list,
1474 &true,
1475 vec![Some(false), Some(true), None, Some(false), Some(true)],
1476 Arc::clone(&col_a),
1477 &schema
1478 );
1479
1480 let list = vec![lit(0.0f64), lit(0.1f64), lit(-f64::NAN)];
1482 in_list!(
1483 batch,
1484 list,
1485 &false,
1486 vec![Some(true), Some(false), None, Some(false), Some(true)],
1487 Arc::clone(&col_a),
1488 &schema
1489 );
1490
1491 let list = vec![lit(0.0f64), lit(0.1f64), lit(-f64::NAN)];
1493 in_list!(
1494 batch,
1495 list,
1496 &true,
1497 vec![Some(false), Some(true), None, Some(true), Some(false)],
1498 Arc::clone(&col_a),
1499 &schema
1500 );
1501
1502 Ok(())
1503 }
1504
1505 #[test]
1506 fn in_list_bool() -> Result<()> {
1507 let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
1508 let a = BooleanArray::from(vec![Some(true), None]);
1509 let col_a = col("a", &schema)?;
1510 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1511
1512 let list = vec![lit(true)];
1514 in_list!(
1515 batch,
1516 list,
1517 &false,
1518 vec![Some(true), None],
1519 Arc::clone(&col_a),
1520 &schema
1521 );
1522
1523 let list = vec![lit(true)];
1525 in_list!(
1526 batch,
1527 list,
1528 &true,
1529 vec![Some(false), None],
1530 Arc::clone(&col_a),
1531 &schema
1532 );
1533
1534 let list = vec![lit(true), lit(ScalarValue::Null)];
1536 in_list!(
1537 batch,
1538 list,
1539 &false,
1540 vec![Some(true), None],
1541 Arc::clone(&col_a),
1542 &schema
1543 );
1544
1545 let list = vec![lit(true), lit(ScalarValue::Null)];
1547 in_list!(
1548 batch,
1549 list,
1550 &true,
1551 vec![Some(false), None],
1552 Arc::clone(&col_a),
1553 &schema
1554 );
1555
1556 Ok(())
1557 }
1558
1559 macro_rules! test_nullable {
1560 ($COL:expr, $LIST:expr, $SCHEMA:expr, $EXPECTED:expr) => {{
1561 let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
1562 let expr = in_list(cast_expr, cast_list_exprs, &false, $SCHEMA).unwrap();
1563 let result = expr.nullable($SCHEMA)?;
1564 assert_eq!($EXPECTED, result);
1565 }};
1566 }
1567
1568 #[test]
1569 fn in_list_nullable() -> Result<()> {
1570 let schema = Schema::new(vec![
1571 Field::new("c1_nullable", DataType::Int64, true),
1572 Field::new("c2_non_nullable", DataType::Int64, false),
1573 ]);
1574
1575 let c1_nullable = col("c1_nullable", &schema)?;
1576 let c2_non_nullable = col("c2_non_nullable", &schema)?;
1577
1578 let list = vec![lit(1_i64), lit(2_i64)];
1580 test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1581 test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, false);
1582
1583 let list = vec![lit(1_i64), lit(2_i64), lit(ScalarValue::Null)];
1585 test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1586 test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, true);
1587
1588 let list = vec![Arc::clone(&c1_nullable)];
1589 test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, true);
1590
1591 let list = vec![Arc::clone(&c2_non_nullable)];
1592 test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1593
1594 let list = vec![Arc::clone(&c2_non_nullable), Arc::clone(&c2_non_nullable)];
1595 test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, false);
1596
1597 Ok(())
1598 }
1599
1600 #[test]
1601 fn in_list_no_cols() -> Result<()> {
1602 let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1604 let a = Int32Array::from(vec![Some(1), Some(2), None]);
1605 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1606
1607 let list = vec![lit(ScalarValue::from(1i32)), lit(ScalarValue::from(6i32))];
1608
1609 let expr = lit(ScalarValue::Int32(Some(1)));
1611 in_list!(
1612 batch,
1613 list.clone(),
1614 &false,
1615 vec![Some(true), Some(true), Some(true)],
1617 expr,
1618 &schema
1619 );
1620
1621 let expr = lit(ScalarValue::Int32(Some(2)));
1623 in_list!(
1624 batch,
1625 list.clone(),
1626 &false,
1627 vec![Some(false), Some(false), Some(false)],
1629 expr,
1630 &schema
1631 );
1632
1633 let expr = lit(ScalarValue::Int32(None));
1635 in_list!(
1636 batch,
1637 list.clone(),
1638 &false,
1639 vec![None, None, None],
1641 expr,
1642 &schema
1643 );
1644
1645 Ok(())
1646 }
1647
1648 #[test]
1649 fn in_list_utf8_with_dict_types() -> Result<()> {
1650 fn dict_lit(key_type: DataType, value: &str) -> Arc<dyn PhysicalExpr> {
1651 lit(ScalarValue::Dictionary(
1652 Box::new(key_type),
1653 Box::new(ScalarValue::new_utf8(value.to_string())),
1654 ))
1655 }
1656
1657 fn null_dict_lit(key_type: DataType) -> Arc<dyn PhysicalExpr> {
1658 lit(ScalarValue::Dictionary(
1659 Box::new(key_type),
1660 Box::new(ScalarValue::Utf8(None)),
1661 ))
1662 }
1663
1664 let schema = Schema::new(vec![Field::new(
1665 "a",
1666 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
1667 true,
1668 )]);
1669 let a: UInt16DictionaryArray =
1670 vec![Some("a"), Some("d"), None].into_iter().collect();
1671 let col_a = col("a", &schema)?;
1672 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1673
1674 let lists = [
1676 vec![lit("a"), lit("b")],
1677 vec![
1678 dict_lit(DataType::Int8, "a"),
1679 dict_lit(DataType::UInt16, "b"),
1680 ],
1681 ];
1682 for list in lists.iter() {
1683 in_list_raw!(
1684 batch,
1685 list.clone(),
1686 &false,
1687 vec![Some(true), Some(false), None],
1688 Arc::clone(&col_a),
1689 &schema
1690 );
1691 }
1692
1693 for list in lists.iter() {
1695 in_list_raw!(
1696 batch,
1697 list.clone(),
1698 &true,
1699 vec![Some(false), Some(true), None],
1700 Arc::clone(&col_a),
1701 &schema
1702 );
1703 }
1704
1705 let lists = [
1707 vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
1708 vec![
1709 dict_lit(DataType::Int8, "a"),
1710 dict_lit(DataType::UInt16, "b"),
1711 null_dict_lit(DataType::UInt16),
1712 ],
1713 ];
1714 for list in lists.iter() {
1715 in_list_raw!(
1716 batch,
1717 list.clone(),
1718 &false,
1719 vec![Some(true), None, None],
1720 Arc::clone(&col_a),
1721 &schema
1722 );
1723 }
1724
1725 for list in lists.iter() {
1727 in_list_raw!(
1728 batch,
1729 list.clone(),
1730 &true,
1731 vec![Some(false), None, None],
1732 Arc::clone(&col_a),
1733 &schema
1734 );
1735 }
1736
1737 Ok(())
1738 }
1739
1740 #[test]
1741 fn test_fmt_sql_1() -> Result<()> {
1742 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1743 let col_a = col("a", &schema)?;
1744
1745 let list = vec![lit("a"), lit("b")];
1747 let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
1748 let sql_string = fmt_sql(expr.as_ref()).to_string();
1749 let display_string = expr.to_string();
1750 assert_snapshot!(sql_string, @"a IN (a, b)");
1751 assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b])");
1752 Ok(())
1753 }
1754
1755 #[test]
1756 fn test_fmt_sql_2() -> Result<()> {
1757 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1758 let col_a = col("a", &schema)?;
1759
1760 let list = vec![lit("a"), lit("b")];
1762 let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
1763 let sql_string = fmt_sql(expr.as_ref()).to_string();
1764 let display_string = expr.to_string();
1765
1766 assert_snapshot!(sql_string, @"a NOT IN (a, b)");
1767 assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b])");
1768 Ok(())
1769 }
1770
1771 #[test]
1772 fn test_fmt_sql_3() -> Result<()> {
1773 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1774 let col_a = col("a", &schema)?;
1775 let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
1777 let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
1778 let sql_string = fmt_sql(expr.as_ref()).to_string();
1779 let display_string = expr.to_string();
1780
1781 assert_snapshot!(sql_string, @"a IN (a, b, NULL)");
1782 assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b, NULL])");
1783 Ok(())
1784 }
1785
1786 #[test]
1787 fn test_fmt_sql_4() -> Result<()> {
1788 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1789 let col_a = col("a", &schema)?;
1790 let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
1792 let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
1793 let sql_string = fmt_sql(expr.as_ref()).to_string();
1794 let display_string = expr.to_string();
1795 assert_snapshot!(sql_string, @"a NOT IN (a, b, NULL)");
1796 assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b, NULL])");
1797 Ok(())
1798 }
1799
1800 #[test]
1801 fn in_list_struct() -> Result<()> {
1802 let struct_fields = Fields::from(vec![
1804 Field::new("x", DataType::Int32, false),
1805 Field::new("y", DataType::Utf8, false),
1806 ]);
1807 let schema = Schema::new(vec![Field::new(
1808 "a",
1809 DataType::Struct(struct_fields.clone()),
1810 true,
1811 )]);
1812
1813 let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
1815 let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
1816 let struct_array =
1817 StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);
1818
1819 let col_a = col("a", &schema)?;
1820 let batch =
1821 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;
1822
1823 let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
1826 struct_fields.clone(),
1827 vec![
1828 Arc::new(Int32Array::from(vec![1])),
1829 Arc::new(StringArray::from(vec!["a"])),
1830 ],
1831 None,
1832 )));
1833
1834 let struct3 = ScalarValue::Struct(Arc::new(StructArray::new(
1836 struct_fields.clone(),
1837 vec![
1838 Arc::new(Int32Array::from(vec![3])),
1839 Arc::new(StringArray::from(vec!["c"])),
1840 ],
1841 None,
1842 )));
1843
1844 let list = vec![lit(struct1.clone()), lit(struct3.clone())];
1846 in_list_raw!(
1847 batch,
1848 list.clone(),
1849 &false,
1850 vec![Some(true), Some(false), Some(true)],
1851 Arc::clone(&col_a),
1852 &schema
1853 );
1854
1855 in_list_raw!(
1857 batch,
1858 list,
1859 &true,
1860 vec![Some(false), Some(true), Some(false)],
1861 Arc::clone(&col_a),
1862 &schema
1863 );
1864
1865 Ok(())
1866 }
1867
1868 #[test]
1869 fn in_list_struct_with_nulls() -> Result<()> {
1870 let struct_fields = Fields::from(vec![
1872 Field::new("x", DataType::Int32, false),
1873 Field::new("y", DataType::Utf8, false),
1874 ]);
1875 let schema = Schema::new(vec![Field::new(
1876 "a",
1877 DataType::Struct(struct_fields.clone()),
1878 true,
1879 )]);
1880
1881 let x_array = Arc::new(Int32Array::from(vec![1, 2]));
1883 let y_array = Arc::new(StringArray::from(vec!["a", "b"]));
1884 let struct_array = StructArray::new(
1885 struct_fields.clone(),
1886 vec![x_array, y_array],
1887 Some(NullBuffer::from(vec![true, false])),
1888 );
1889
1890 let col_a = col("a", &schema)?;
1891 let batch =
1892 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;
1893
1894 let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
1896 struct_fields.clone(),
1897 vec![
1898 Arc::new(Int32Array::from(vec![1])),
1899 Arc::new(StringArray::from(vec!["a"])),
1900 ],
1901 None,
1902 )));
1903
1904 let list = vec![lit(struct1.clone())];
1906 in_list_raw!(
1907 batch,
1908 list.clone(),
1909 &false,
1910 vec![Some(true), None],
1911 Arc::clone(&col_a),
1912 &schema
1913 );
1914
1915 in_list_raw!(
1917 batch,
1918 list,
1919 &true,
1920 vec![Some(false), None],
1921 Arc::clone(&col_a),
1922 &schema
1923 );
1924
1925 Ok(())
1926 }
1927
1928 #[test]
1929 fn in_list_struct_with_null_in_list() -> Result<()> {
1930 let struct_fields = Fields::from(vec![
1932 Field::new("x", DataType::Int32, false),
1933 Field::new("y", DataType::Utf8, false),
1934 ]);
1935 let schema = Schema::new(vec![Field::new(
1936 "a",
1937 DataType::Struct(struct_fields.clone()),
1938 true,
1939 )]);
1940
1941 let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
1943 let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
1944 let struct_array =
1945 StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);
1946
1947 let col_a = col("a", &schema)?;
1948 let batch =
1949 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;
1950
1951 let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
1953 struct_fields.clone(),
1954 vec![
1955 Arc::new(Int32Array::from(vec![1])),
1956 Arc::new(StringArray::from(vec!["a"])),
1957 ],
1958 None,
1959 )));
1960
1961 let null_struct = ScalarValue::Struct(Arc::new(StructArray::new_null(
1962 struct_fields.clone(),
1963 1,
1964 )));
1965
1966 let list = vec![lit(struct1), lit(null_struct.clone())];
1968 in_list_raw!(
1969 batch,
1970 list.clone(),
1971 &false,
1972 vec![Some(true), None, None],
1973 Arc::clone(&col_a),
1974 &schema
1975 );
1976
1977 in_list_raw!(
1979 batch,
1980 list,
1981 &true,
1982 vec![Some(false), None, None],
1983 Arc::clone(&col_a),
1984 &schema
1985 );
1986
1987 Ok(())
1988 }
1989
1990 #[test]
1991 fn in_list_nested_struct() -> Result<()> {
1992 let inner_struct_fields = Fields::from(vec![
1994 Field::new("a", DataType::Int32, false),
1995 Field::new("b", DataType::Utf8, false),
1996 ]);
1997 let outer_struct_fields = Fields::from(vec![
1998 Field::new(
1999 "inner",
2000 DataType::Struct(inner_struct_fields.clone()),
2001 false,
2002 ),
2003 Field::new("c", DataType::Int32, false),
2004 ]);
2005 let schema = Schema::new(vec![Field::new(
2006 "x",
2007 DataType::Struct(outer_struct_fields.clone()),
2008 true,
2009 )]);
2010
2011 let inner1 = Arc::new(StructArray::new(
2013 inner_struct_fields.clone(),
2014 vec![
2015 Arc::new(Int32Array::from(vec![1, 2])),
2016 Arc::new(StringArray::from(vec!["x", "y"])),
2017 ],
2018 None,
2019 ));
2020 let c_array = Arc::new(Int32Array::from(vec![10, 20]));
2021 let outer_array =
2022 StructArray::new(outer_struct_fields.clone(), vec![inner1, c_array], None);
2023
2024 let col_x = col("x", &schema)?;
2025 let batch =
2026 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(outer_array)])?;
2027
2028 let inner_match = Arc::new(StructArray::new(
2030 inner_struct_fields.clone(),
2031 vec![
2032 Arc::new(Int32Array::from(vec![1])),
2033 Arc::new(StringArray::from(vec!["x"])),
2034 ],
2035 None,
2036 ));
2037 let outer_match = ScalarValue::Struct(Arc::new(StructArray::new(
2038 outer_struct_fields.clone(),
2039 vec![inner_match, Arc::new(Int32Array::from(vec![10]))],
2040 None,
2041 )));
2042
2043 let list = vec![lit(outer_match)];
2045 in_list_raw!(
2046 batch,
2047 list.clone(),
2048 &false,
2049 vec![Some(true), Some(false)],
2050 Arc::clone(&col_x),
2051 &schema
2052 );
2053
2054 in_list_raw!(
2056 batch,
2057 list,
2058 &true,
2059 vec![Some(false), Some(true)],
2060 Arc::clone(&col_x),
2061 &schema
2062 );
2063
2064 Ok(())
2065 }
2066
2067 #[test]
2068 fn in_list_struct_with_exprs_not_array() -> Result<()> {
2069 let struct_fields = Fields::from(vec![
2075 Field::new("x", DataType::Int32, false),
2076 Field::new("y", DataType::Utf8, false),
2077 ]);
2078 let schema = Schema::new(vec![Field::new(
2079 "a",
2080 DataType::Struct(struct_fields.clone()),
2081 true,
2082 )]);
2083
2084 let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
2086 let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
2087 let struct_array =
2088 StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);
2089
2090 let col_a = col("a", &schema)?;
2091 let batch =
2092 RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;
2093
2094 let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
2097 struct_fields.clone(),
2098 vec![
2099 Arc::new(Int32Array::from(vec![1])),
2100 Arc::new(StringArray::from(vec!["a"])),
2101 ],
2102 None,
2103 )));
2104
2105 let struct3 = ScalarValue::Struct(Arc::new(StructArray::new(
2107 struct_fields.clone(),
2108 vec![
2109 Arc::new(Int32Array::from(vec![3])),
2110 Arc::new(StringArray::from(vec!["c"])),
2111 ],
2112 None,
2113 )));
2114
2115 let list = vec![lit(struct1), lit(struct3)];
2117
2118 let expr = Arc::new(InListExpr::new(Arc::clone(&col_a), list, false, None));
2121
2122 let display_string = expr.to_string();
2125 assert!(
2126 !display_string.contains("(SET)"),
2127 "Expected display string to NOT contain '(SET)' (should use Exprs variant), but got: {display_string}",
2128 );
2129
2130 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2132 let result = as_boolean_array(&result);
2133
2134 let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
2138 assert_eq!(result, &expected);
2139
2140 let expr_not = Arc::new(InListExpr::new(
2142 Arc::clone(&col_a),
2143 vec![
2144 lit(ScalarValue::Struct(Arc::new(StructArray::new(
2145 struct_fields.clone(),
2146 vec![
2147 Arc::new(Int32Array::from(vec![1])),
2148 Arc::new(StringArray::from(vec!["a"])),
2149 ],
2150 None,
2151 )))),
2152 lit(ScalarValue::Struct(Arc::new(StructArray::new(
2153 struct_fields.clone(),
2154 vec![
2155 Arc::new(Int32Array::from(vec![3])),
2156 Arc::new(StringArray::from(vec!["c"])),
2157 ],
2158 None,
2159 )))),
2160 ],
2161 true,
2162 None,
2163 ));
2164
2165 let result_not = expr_not.evaluate(&batch)?.into_array(batch.num_rows())?;
2166 let result_not = as_boolean_array(&result_not);
2167
2168 let expected_not = BooleanArray::from(vec![Some(false), Some(true), Some(false)]);
2169 assert_eq!(result_not, &expected_not);
2170
2171 Ok(())
2172 }
2173
2174 #[test]
2175 fn test_in_list_null_handling_comprehensive() -> Result<()> {
2176 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2179
2180 let a = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
2186 let col_a = col("a", &schema)?;
2187 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2188
2189 let list = vec![lit(1i64), lit(4i64)];
2192 in_list!(
2193 batch,
2194 list,
2195 &false,
2196 vec![
2197 Some(true), Some(false), Some(false), None, ],
2202 Arc::clone(&col_a),
2203 &schema
2204 );
2205
2206 let list = vec![lit(1i64), lit(ScalarValue::Int64(None))];
2209 in_list!(
2210 batch,
2211 list,
2212 &false,
2213 vec![
2214 Some(true), None, None, None, ],
2219 Arc::clone(&col_a),
2220 &schema
2221 );
2222
2223 Ok(())
2224 }
2225
2226 #[test]
2227 fn test_in_list_with_only_nulls() -> Result<()> {
2228 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2230 let a = Int64Array::from(vec![Some(1), Some(2), None]);
2231 let col_a = col("a", &schema)?;
2232 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2233
2234 let list = vec![lit(ScalarValue::Int64(None)), lit(ScalarValue::Int64(None))];
2236
2237 in_list!(
2241 batch,
2242 list.clone(),
2243 &false,
2244 vec![None, None, None],
2245 Arc::clone(&col_a),
2246 &schema
2247 );
2248
2249 in_list!(
2252 batch,
2253 list,
2254 &true,
2255 vec![None, None, None],
2256 Arc::clone(&col_a),
2257 &schema
2258 );
2259
2260 Ok(())
2261 }
2262
2263 #[test]
2264 fn test_in_list_multiple_nulls_deduplication() -> Result<()> {
2265 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2268 let col_a = col("a", &schema)?;
2269
2270 let array = Arc::new(Int64Array::from(vec![
2272 Some(1),
2273 Some(2),
2274 None,
2275 None,
2276 Some(3),
2277 None,
2278 ])) as ArrayRef;
2279
2280 let expr = Arc::new(InListExpr::try_new_from_array(
2282 Arc::clone(&col_a),
2283 array,
2284 false,
2285 )?) as Arc<dyn PhysicalExpr>;
2286
2287 let a = Int64Array::from(vec![Some(1), Some(2), Some(3), Some(4), None]);
2289 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2290
2291 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2293 let result = as_boolean_array(&result);
2294
2295 let expected = BooleanArray::from(vec![
2300 Some(true), Some(true), Some(true), None, None, ]);
2306 assert_eq!(result, &expected);
2307
2308 Ok(())
2309 }
2310
2311 #[test]
2312 fn test_not_in_null_handling_comprehensive() -> Result<()> {
2313 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2316
2317 let a = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
2319 let col_a = col("a", &schema)?;
2320 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2321
2322 let list = vec![lit(1i64), lit(4i64)];
2325 in_list!(
2326 batch,
2327 list,
2328 &true,
2329 vec![
2330 Some(false), Some(true), Some(true), None, ],
2335 Arc::clone(&col_a),
2336 &schema
2337 );
2338
2339 let list = vec![lit(1i64), lit(ScalarValue::Int64(None))];
2342 in_list!(
2343 batch,
2344 list,
2345 &true,
2346 vec![
2347 Some(false), None, None, None, ],
2352 Arc::clone(&col_a),
2353 &schema
2354 );
2355
2356 Ok(())
2357 }
2358
2359 #[test]
2360 fn test_in_list_null_type_column() -> Result<()> {
2361 let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
2364 let a = NullArray::new(3);
2365 let col_a = col("a", &schema)?;
2366 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2367
2368 let list = vec![lit(1i64), lit(2i64)];
2371
2372 in_list!(
2376 batch,
2377 list.clone(),
2378 &false,
2379 vec![None, None, None],
2380 Arc::clone(&col_a),
2381 &schema
2382 );
2383
2384 in_list!(
2387 batch,
2388 list,
2389 &true,
2390 vec![None, None, None],
2391 Arc::clone(&col_a),
2392 &schema
2393 );
2394
2395 Ok(())
2396 }
2397
2398 #[test]
2399 fn test_in_list_null_type_list() -> Result<()> {
2400 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2402 let a = Int64Array::from(vec![Some(1), Some(2), None]);
2403 let col_a = col("a", &schema)?;
2404
2405 let null_array = Arc::new(NullArray::new(2)) as ArrayRef;
2407
2408 let expr = Arc::new(InListExpr::try_new_from_array(
2411 Arc::clone(&col_a),
2412 null_array,
2413 false,
2414 )?) as Arc<dyn PhysicalExpr>;
2415 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2416 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2417 let result = as_boolean_array(&result);
2418
2419 let expected = BooleanArray::from(vec![None, None, None]);
2422 assert_eq!(result, &expected);
2423
2424 Ok(())
2425 }
2426
2427 #[test]
2428 fn test_in_list_null_type_both() -> Result<()> {
2429 let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
2431 let a = NullArray::new(3);
2432 let col_a = col("a", &schema)?;
2433
2434 let null_array = Arc::new(NullArray::new(2)) as ArrayRef;
2436
2437 let expr = Arc::new(InListExpr::try_new_from_array(
2439 Arc::clone(&col_a),
2440 null_array,
2441 false,
2442 )?) as Arc<dyn PhysicalExpr>;
2443
2444 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
2445 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2446 let result = as_boolean_array(&result);
2447
2448 let expected = BooleanArray::from(vec![None, None, None]);
2451 assert_eq!(result, &expected);
2452
2453 Ok(())
2454 }
2455
2456 #[test]
2457 fn test_in_list_comprehensive_null_handling() -> Result<()> {
2458 let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
2466 let col_b = col("b", &schema)?;
2467 let null_i32 = ScalarValue::Int32(None);
2468
2469 let make_batch = |values: Vec<Option<i32>>| -> Result<RecordBatch> {
2471 let array = Arc::new(Int32Array::from(values));
2472 Ok(RecordBatch::try_new(Arc::clone(&schema), vec![array])?)
2473 };
2474
2475 let run_test = |batch: &RecordBatch,
2477 expr: Arc<dyn PhysicalExpr>,
2478 list: Vec<Arc<dyn PhysicalExpr>>,
2479 expected: Vec<Option<bool>>|
2480 -> Result<()> {
2481 let in_expr = in_list(expr, list, &false, schema.as_ref())?;
2482 let result = in_expr.evaluate(batch)?.into_array(batch.num_rows())?;
2483 let result = as_boolean_array(&result);
2484 assert_eq!(result, &BooleanArray::from(expected));
2485 Ok(())
2486 };
2487
2488 let batch = make_batch(vec![Some(1)])?;
2494 run_test(
2495 &batch,
2496 Arc::clone(&col_b),
2497 vec![lit(1i32), lit(2i32)],
2498 vec![Some(true)],
2499 )?;
2500
2501 let batch = make_batch(vec![Some(1), Some(2)])?;
2503 run_test(
2504 &batch,
2505 Arc::clone(&col_b),
2506 vec![lit(1i32), lit(2i32)],
2507 vec![Some(true), Some(true)],
2508 )?;
2509
2510 let batch = make_batch(vec![Some(3), Some(4)])?;
2512 run_test(
2513 &batch,
2514 Arc::clone(&col_b),
2515 vec![lit(1i32), lit(2i32)],
2516 vec![Some(false), Some(false)],
2517 )?;
2518
2519 let batch = make_batch(vec![Some(1), None])?;
2521 run_test(
2522 &batch,
2523 Arc::clone(&col_b),
2524 vec![lit(1i32), lit(2i32)],
2525 vec![Some(true), None],
2526 )?;
2527
2528 let batch = make_batch(vec![Some(3), None])?;
2530 run_test(
2531 &batch,
2532 Arc::clone(&col_b),
2533 vec![lit(1i32), lit(2i32)],
2534 vec![Some(false), None],
2535 )?;
2536
2537 let batch = make_batch(vec![Some(1)])?;
2543 run_test(
2544 &batch,
2545 Arc::clone(&col_b),
2546 vec![lit(null_i32.clone()), lit(1i32)],
2547 vec![Some(true)],
2548 )?;
2549
2550 let batch = make_batch(vec![Some(2)])?;
2552 run_test(
2553 &batch,
2554 Arc::clone(&col_b),
2555 vec![lit(null_i32.clone()), lit(1i32)],
2556 vec![None],
2557 )?;
2558
2559 let batch = make_batch(vec![None])?;
2561 run_test(
2562 &batch,
2563 Arc::clone(&col_b),
2564 vec![lit(null_i32.clone()), lit(1i32)],
2565 vec![None],
2566 )?;
2567
2568 let batch = make_batch(vec![Some(1)])?;
2574 run_test(
2575 &batch,
2576 Arc::clone(&col_b),
2577 vec![lit(null_i32.clone()), lit(null_i32.clone())],
2578 vec![None],
2579 )?;
2580
2581 let batch = make_batch(vec![None])?;
2583 run_test(
2584 &batch,
2585 Arc::clone(&col_b),
2586 vec![lit(null_i32.clone()), lit(null_i32.clone())],
2587 vec![None],
2588 )?;
2589
2590 let batch = make_batch(vec![Some(1)])?;
2596 run_test(
2597 &batch,
2598 lit(1i32),
2599 vec![lit(2i32), Arc::clone(&col_b)],
2600 vec![Some(true)],
2601 )?;
2602
2603 let batch = make_batch(vec![Some(3)])?;
2605 run_test(
2606 &batch,
2607 lit(1i32),
2608 vec![lit(2i32), Arc::clone(&col_b)],
2609 vec![Some(false)],
2610 )?;
2611
2612 let batch = make_batch(vec![None])?;
2614 run_test(
2615 &batch,
2616 lit(1i32),
2617 vec![lit(2i32), Arc::clone(&col_b)],
2618 vec![None],
2619 )?;
2620
2621 let batch = make_batch(vec![Some(1)])?;
2627 run_test(
2628 &batch,
2629 Arc::clone(&col_b),
2630 vec![lit(1i32), Arc::clone(&col_b)],
2631 vec![Some(true)],
2632 )?;
2633
2634 let batch = make_batch(vec![Some(2)])?;
2636 run_test(
2637 &batch,
2638 Arc::clone(&col_b),
2639 vec![lit(1i32), Arc::clone(&col_b)],
2640 vec![Some(true)],
2641 )?;
2642
2643 let batch = make_batch(vec![None])?;
2645 run_test(
2646 &batch,
2647 Arc::clone(&col_b),
2648 vec![lit(1i32), Arc::clone(&col_b)],
2649 vec![None],
2650 )?;
2651
2652 Ok(())
2653 }
2654
2655 #[test]
2656 fn test_in_list_scalar_literal_cases() -> Result<()> {
2657 let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
2662 let null_i32 = ScalarValue::Int32(None);
2663
2664 let make_batch = |values: Vec<Option<i32>>| -> Result<RecordBatch> {
2666 let array = Arc::new(Int32Array::from(values));
2667 Ok(RecordBatch::try_new(Arc::clone(&schema), vec![array])?)
2668 };
2669
2670 let run_test = |batch: &RecordBatch,
2672 expr: Arc<dyn PhysicalExpr>,
2673 list: Vec<Arc<dyn PhysicalExpr>>,
2674 negated: bool,
2675 expected: Vec<Option<bool>>|
2676 -> Result<()> {
2677 let in_expr = in_list(expr, list, &negated, schema.as_ref())?;
2678 let result = in_expr.evaluate(batch)?.into_array(batch.num_rows())?;
2679 let result = as_boolean_array(&result);
2680 let expected_array = BooleanArray::from(expected);
2681 assert_eq!(
2682 result,
2683 &expected_array,
2684 "Expected {:?}, got {:?}",
2685 expected_array,
2686 result.iter().collect::<Vec<_>>()
2687 );
2688 Ok(())
2689 };
2690
2691 let batch = make_batch(vec![Some(1)])?;
2692
2693 run_test(
2700 &batch,
2701 lit(null_i32.clone()),
2702 vec![lit(1i32), lit(1i32)],
2703 false,
2704 vec![None],
2705 )?;
2706
2707 run_test(
2709 &batch,
2710 lit(null_i32.clone()),
2711 vec![lit(null_i32.clone()), lit(1i32)],
2712 false,
2713 vec![None],
2714 )?;
2715
2716 run_test(
2718 &batch,
2719 lit(null_i32.clone()),
2720 vec![lit(null_i32.clone()), lit(null_i32.clone())],
2721 false,
2722 vec![None],
2723 )?;
2724
2725 run_test(
2733 &batch,
2734 lit(3i32),
2735 vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
2736 false,
2737 vec![None],
2738 )?;
2739
2740 run_test(
2742 &batch,
2743 lit(3i32),
2744 vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
2745 true,
2746 vec![None],
2747 )?;
2748
2749 run_test(
2751 &batch,
2752 lit(1i32),
2753 vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
2754 false,
2755 vec![Some(true)],
2756 )?;
2757
2758 run_test(
2760 &batch,
2761 lit(1i32),
2762 vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
2763 true,
2764 vec![Some(false)],
2765 )?;
2766
2767 let schema_str =
2773 Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
2774 let batch_str = RecordBatch::try_new(
2775 Arc::clone(&schema_str),
2776 vec![Arc::new(StringArray::from(vec![Some("dummy")]))],
2777 )?;
2778 let null_str = ScalarValue::Utf8(None);
2779
2780 let run_test_str = |expr: Arc<dyn PhysicalExpr>,
2781 list: Vec<Arc<dyn PhysicalExpr>>,
2782 negated: bool,
2783 expected: Vec<Option<bool>>|
2784 -> Result<()> {
2785 let in_expr = in_list(expr, list, &negated, schema_str.as_ref())?;
2786 let result = in_expr
2787 .evaluate(&batch_str)?
2788 .into_array(batch_str.num_rows())?;
2789 let result = as_boolean_array(&result);
2790 let expected_array = BooleanArray::from(expected);
2791 assert_eq!(
2792 result,
2793 &expected_array,
2794 "Expected {:?}, got {:?}",
2795 expected_array,
2796 result.iter().collect::<Vec<_>>()
2797 );
2798 Ok(())
2799 };
2800
2801 run_test_str(
2803 lit("c"),
2804 vec![lit("a"), lit("b"), lit(null_str.clone())],
2805 false,
2806 vec![None],
2807 )?;
2808
2809 run_test_str(
2811 lit("c"),
2812 vec![lit("a"), lit("b"), lit(null_str.clone())],
2813 true,
2814 vec![None],
2815 )?;
2816
2817 run_test_str(
2819 lit("a"),
2820 vec![lit("a"), lit("b"), lit(null_str.clone())],
2821 false,
2822 vec![Some(true)],
2823 )?;
2824
2825 run_test_str(
2827 lit("a"),
2828 vec![lit("a"), lit("b"), lit(null_str.clone())],
2829 true,
2830 vec![Some(false)],
2831 )?;
2832
2833 Ok(())
2834 }
2835
2836 #[test]
2837 fn test_in_list_tuple_cases() -> Result<()> {
2838 let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
2842
2843 let make_struct = |v1: Option<i32>, v2: Option<i32>| -> ScalarValue {
2845 let fields = Fields::from(vec![
2846 Field::new("field_0", DataType::Int32, true),
2847 Field::new("field_1", DataType::Int32, true),
2848 ]);
2849 ScalarValue::Struct(Arc::new(StructArray::new(
2850 fields,
2851 vec![
2852 Arc::new(Int32Array::from(vec![v1])),
2853 Arc::new(Int32Array::from(vec![v2])),
2854 ],
2855 None,
2856 )))
2857 };
2858
2859 let batch = RecordBatch::try_new(
2861 Arc::clone(&schema),
2862 vec![Arc::new(Int32Array::from(vec![Some(1)]))],
2863 )?;
2864
2865 let run_tuple_test = |lhs: ScalarValue,
2867 list: Vec<ScalarValue>,
2868 expected: Vec<Option<bool>>|
2869 -> Result<()> {
2870 let expr = in_list(
2871 lit(lhs),
2872 list.into_iter().map(lit).collect(),
2873 &false,
2874 schema.as_ref(),
2875 )?;
2876 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
2877 let result = as_boolean_array(&result);
2878 assert_eq!(result, &BooleanArray::from(expected));
2879 Ok(())
2880 };
2881
2882 run_tuple_test(
2884 make_struct(None, None),
2885 vec![make_struct(Some(1), Some(2))],
2886 vec![Some(false)],
2887 )?;
2888
2889 run_tuple_test(
2891 make_struct(None, None),
2892 vec![make_struct(None, Some(1))],
2893 vec![Some(false)],
2894 )?;
2895
2896 run_tuple_test(
2898 make_struct(None, None),
2899 vec![make_struct(None, None)],
2900 vec![Some(true)],
2901 )?;
2902
2903 run_tuple_test(
2905 make_struct(None, Some(1)),
2906 vec![make_struct(Some(1), Some(2))],
2907 vec![Some(false)],
2908 )?;
2909
2910 run_tuple_test(
2912 make_struct(None, Some(1)),
2913 vec![make_struct(None, Some(1))],
2914 vec![Some(true)],
2915 )?;
2916
2917 run_tuple_test(
2919 make_struct(None, Some(1)),
2920 vec![make_struct(None, None)],
2921 vec![Some(false)],
2922 )?;
2923
2924 run_tuple_test(
2926 make_struct(Some(1), Some(2)),
2927 vec![make_struct(Some(1), Some(2))],
2928 vec![Some(true)],
2929 )?;
2930
2931 run_tuple_test(
2933 make_struct(Some(1), Some(3)),
2934 vec![make_struct(Some(1), Some(2))],
2935 vec![Some(false)],
2936 )?;
2937
2938 run_tuple_test(
2940 make_struct(Some(4), Some(4)),
2941 vec![make_struct(Some(1), Some(2))],
2942 vec![Some(false)],
2943 )?;
2944
2945 run_tuple_test(
2947 make_struct(Some(1), Some(1)),
2948 vec![make_struct(None, Some(1))],
2949 vec![Some(false)],
2950 )?;
2951
2952 run_tuple_test(
2954 make_struct(Some(1), Some(1)),
2955 vec![make_struct(None, None)],
2956 vec![Some(false)],
2957 )?;
2958
2959 Ok(())
2960 }
2961
2962 #[test]
2963 fn test_in_list_dictionary_int32() -> Result<()> {
2964 let dict_type =
2966 DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
2967 let schema = Schema::new(vec![Field::new("a", dict_type.clone(), false)]);
2968 let col_a = col("a", &schema)?;
2969
2970 let list = vec![lit(100i32), lit(200i32), lit(300i32)];
2972
2973 let expr = in_list(col_a, list, &false, &schema)?;
2975
2976 let keys = Int8Array::from(vec![0, 1, 2]);
2980 let values = Int32Array::from(vec![100, 200, 500]);
2981 let dict_array: ArrayRef =
2982 Arc::new(DictionaryArray::try_new(keys, Arc::new(values))?);
2983 let batch = RecordBatch::try_new(Arc::new(schema), vec![dict_array])?;
2984
2985 let result = expr.evaluate(&batch)?.into_array(3)?;
2987 let result = as_boolean_array(&result);
2988 assert_eq!(result, &BooleanArray::from(vec![true, true, false]));
2989 Ok(())
2990 }
2991
    /// Exercises IN / NOT IN over dictionary-encoded columns (Utf8, Int64,
    /// Float64 values with Int8 keys), with and without NULL in the list, plus
    /// lists made of dictionary literals and a NaN-in-list case.
    #[test]
    fn test_in_list_dictionary_types() -> Result<()> {
        // Builds a dictionary-typed Int64 literal with the given key type, so the
        // IN list itself contains dictionary scalars.
        fn dict_lit_int64(key_type: DataType, value: i64) -> Arc<dyn PhysicalExpr> {
            lit(ScalarValue::Dictionary(
                Box::new(key_type),
                Box::new(ScalarValue::Int64(Some(value))),
            ))
        }

        // Float64 counterpart of `dict_lit_int64`.
        fn dict_lit_float64(key_type: DataType, value: f64) -> Arc<dyn PhysicalExpr> {
            lit(ScalarValue::Dictionary(
                Box::new(key_type),
                Box::new(ScalarValue::Float64(Some(value))),
            ))
        }

        // Optional extra check: an IN list of dictionary literals, evaluated with
        // `negated = false` only.
        struct DictNeedleTest {
            list_values: Vec<Arc<dyn PhysicalExpr>>,
            expected: Vec<Option<bool>>,
        }

        // One dictionary-column scenario. The expected vectors are, in order:
        //   expected_1: IN (list_values_no_null)
        //   expected_2: NOT IN (list_values_no_null)
        //   expected_3: IN (list_values_with_null)
        //   expected_4: NOT IN (list_values_with_null)
        // `name` is only used to label errors.
        struct DictionaryInListTestCase {
            name: &'static str,
            dict_type: DataType,
            dict_keys: Vec<Option<i8>>,
            dict_values: ArrayRef,
            list_values_no_null: Vec<Arc<dyn PhysicalExpr>>,
            list_values_with_null: Vec<Arc<dyn PhysicalExpr>>,
            expected_1: Vec<Option<bool>>,
            expected_2: Vec<Option<bool>>,
            expected_3: Vec<Option<bool>>,
            expected_4: Vec<Option<bool>>,
            dict_needle_test: Option<DictNeedleTest>,
        }

        // Builds a one-column batch from the case's dictionary and runs the four
        // IN / NOT IN combinations, then the optional dictionary-literal check.
        fn run_dictionary_in_list_test(
            test_case: DictionaryInListTestCase,
        ) -> Result<()> {
            // Nullable field: `None` keys produce null rows in the column.
            let schema =
                Schema::new(vec![Field::new("a", test_case.dict_type.clone(), true)]);
            let col_a = col("a", &schema)?;

            let keys = Int8Array::from(test_case.dict_keys.clone());
            let dict_array: ArrayRef =
                Arc::new(DictionaryArray::try_new(keys, test_case.dict_values)?);
            let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![dict_array])?;

            let exp1 = test_case.expected_1.clone();
            let exp2 = test_case.expected_2.clone();
            let exp3 = test_case.expected_3.clone();
            let exp4 = test_case.expected_4;

            // IN, list without NULL
            in_list!(
                batch,
                test_case.list_values_no_null.clone(),
                &false,
                exp1,
                Arc::clone(&col_a),
                &schema
            );

            // NOT IN, list without NULL
            in_list!(
                batch,
                test_case.list_values_no_null.clone(),
                &true,
                exp2,
                Arc::clone(&col_a),
                &schema
            );

            // IN, list containing NULL
            in_list!(
                batch,
                test_case.list_values_with_null.clone(),
                &false,
                exp3,
                Arc::clone(&col_a),
                &schema
            );

            // NOT IN, list containing NULL
            in_list!(
                batch,
                test_case.list_values_with_null,
                &true,
                exp4,
                Arc::clone(&col_a),
                &schema
            );

            // Dictionary-literal list. This goes through `in_list_raw!` rather
            // than `in_list!`. NOTE(review): both macros are defined elsewhere in
            // this module; presumably `in_list_raw!` skips the list type coercion
            // that `in_list!` applies — confirm against the macro definitions.
            if let Some(needle_test) = test_case.dict_needle_test {
                in_list_raw!(
                    batch,
                    needle_test.list_values,
                    &false,
                    needle_test.expected,
                    Arc::clone(&col_a),
                    &schema
                );
            }

            Ok(())
        }

        // Rows: "a", "d", NULL. With NULL in the list, a non-matching row yields
        // NULL instead of false.
        let utf8_case = DictionaryInListTestCase {
            name: "dictionary_utf8",
            dict_type: DataType::Dictionary(
                Box::new(DataType::Int8),
                Box::new(DataType::Utf8),
            ),
            dict_keys: vec![Some(0), Some(1), None],
            dict_values: Arc::new(StringArray::from(vec![Some("a"), Some("d")])),
            list_values_no_null: vec![lit("a"), lit("b")],
            list_values_with_null: vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
            expected_1: vec![Some(true), Some(false), None],
            expected_2: vec![Some(false), Some(true), None],
            expected_3: vec![Some(true), None, None],
            expected_4: vec![Some(false), None, None],
            dict_needle_test: None,
        };

        // Rows: 10, 20, NULL. Also checks a list of Int16-keyed dictionary literals.
        let int64_case = DictionaryInListTestCase {
            name: "dictionary_int64",
            dict_type: DataType::Dictionary(
                Box::new(DataType::Int8),
                Box::new(DataType::Int64),
            ),
            dict_keys: vec![Some(0), Some(1), None],
            dict_values: Arc::new(Int64Array::from(vec![Some(10), Some(20)])),
            list_values_no_null: vec![lit(10i64), lit(15i64)],
            list_values_with_null: vec![
                lit(10i64),
                lit(15i64),
                lit(ScalarValue::Int64(None)),
            ],
            expected_1: vec![Some(true), Some(false), None],
            expected_2: vec![Some(false), Some(true), None],
            expected_3: vec![Some(true), None, None],
            expected_4: vec![Some(false), None, None],
            dict_needle_test: Some(DictNeedleTest {
                list_values: vec![
                    dict_lit_int64(DataType::Int16, 10),
                    dict_lit_int64(DataType::Int16, 15),
                ],
                expected: vec![Some(true), Some(false), None],
            }),
        };

        // Rows: 1.5, 3.7, NULL, NaN. NaN is absent from these lists, so it
        // behaves like any other non-matching value.
        let float64_case = DictionaryInListTestCase {
            name: "dictionary_float64",
            dict_type: DataType::Dictionary(
                Box::new(DataType::Int8),
                Box::new(DataType::Float64),
            ),
            dict_keys: vec![Some(0), Some(1), None, Some(2)],
            dict_values: Arc::new(Float64Array::from(vec![
                Some(1.5),      // key 0
                Some(3.7),      // key 1
                Some(f64::NAN), // key 2
            ])),
            list_values_no_null: vec![lit(1.5f64), lit(2.0f64)],
            list_values_with_null: vec![
                lit(1.5f64),
                lit(2.0f64),
                lit(ScalarValue::Float64(None)),
            ],
            // 1.5 in list; 3.7 not; NULL row; NaN not in list
            expected_1: vec![Some(true), Some(false), None, Some(false)],
            // negation of expected_1; the NULL row stays NULL
            expected_2: vec![Some(false), Some(true), None, Some(true)],
            // NULL in list turns every non-match into NULL
            expected_3: vec![Some(true), None, None, None],
            // negation of expected_3
            expected_4: vec![Some(false), None, None, None],
            dict_needle_test: Some(DictNeedleTest {
                list_values: vec![
                    dict_lit_float64(DataType::UInt16, 1.5),
                    dict_lit_float64(DataType::UInt16, 2.0),
                ],
                expected: vec![Some(true), Some(false), None, Some(false)],
            }),
        };

        // Run each case, labelling any failure with the case name.
        let test_name = utf8_case.name;
        run_dictionary_in_list_test(utf8_case).map_err(|e| {
            datafusion_common::DataFusionError::Execution(format!(
                "Dictionary test '{test_name}' failed: {e}"
            ))
        })?;

        let test_name = int64_case.name;
        run_dictionary_in_list_test(int64_case).map_err(|e| {
            datafusion_common::DataFusionError::Execution(format!(
                "Dictionary test '{test_name}' failed: {e}"
            ))
        })?;

        let test_name = float64_case.name;
        run_dictionary_in_list_test(float64_case).map_err(|e| {
            datafusion_common::DataFusionError::Execution(format!(
                "Dictionary test '{test_name}' failed: {e}"
            ))
        })?;

        // Keys repeat (0, 1, 0, 1), so several rows share one dictionary value;
        // each row must still get its own result.
        let dedup_case = DictionaryInListTestCase {
            name: "dictionary_deduplication",
            dict_type: DataType::Dictionary(
                Box::new(DataType::Int8),
                Box::new(DataType::Utf8),
            ),
            // rows: "a", "d", "a", "d", NULL
            dict_keys: vec![Some(0), Some(1), Some(0), Some(1), None],
            dict_values: Arc::new(StringArray::from(vec![Some("a"), Some("d")])),
            list_values_no_null: vec![lit("a"), lit("b")],
            list_values_with_null: vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
            expected_1: vec![Some(true), Some(false), Some(true), Some(false), None],
            expected_2: vec![Some(false), Some(true), Some(false), Some(true), None],
            expected_3: vec![Some(true), None, Some(true), None, None],
            expected_4: vec![Some(false), None, Some(false), None, None],
            dict_needle_test: None,
        };

        let test_name = dedup_case.name;
        run_dictionary_in_list_test(dedup_case).map_err(|e| {
            datafusion_common::DataFusionError::Execution(format!(
                "Dictionary test '{test_name}' failed: {e}"
            ))
        })?;

        // NaN in the list: per the expected vector, the NaN row (key 2) matches
        // the NaN literal.
        let dict_type =
            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Float64));
        let schema = Schema::new(vec![Field::new("a", dict_type.clone(), true)]);
        let col_a = col("a", &schema)?;

        let keys = Int8Array::from(vec![Some(0), Some(1), None, Some(2)]);
        let values = Float64Array::from(vec![Some(1.5), Some(3.7), Some(f64::NAN)]);
        let dict_array: ArrayRef =
            Arc::new(DictionaryArray::try_new(keys, Arc::new(values))?);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![dict_array])?;

        let list_with_nan = vec![lit(1.5f64), lit(2.0f64), lit(f64::NAN)];
        in_list!(
            batch,
            list_with_nan,
            &false,
            vec![Some(true), Some(false), None, Some(true)],
            col_a,
            &schema
        );

        Ok(())
    }
3279
3280 #[test]
3281 fn test_in_list_esoteric_types() -> Result<()> {
3282 let test_type = |data_type: DataType,
3289 in_array: ArrayRef,
3290 list_values: Vec<ScalarValue>|
3291 -> Result<()> {
3292 let schema = Schema::new(vec![Field::new("a", data_type.clone(), false)]);
3293 let col_a = col("a", &schema)?;
3294 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![in_array])?;
3295
3296 let list = list_values.into_iter().map(lit).collect();
3297 in_list!(
3298 batch,
3299 list,
3300 &false,
3301 vec![Some(true), Some(false)],
3302 col_a,
3303 &schema
3304 );
3305 Ok(())
3306 };
3307
3308 test_type(
3310 DataType::Timestamp(TimeUnit::Second, None),
3311 Arc::new(TimestampSecondArray::from(vec![Some(1000), Some(2000)])),
3312 vec![
3313 ScalarValue::TimestampSecond(Some(1000), None),
3314 ScalarValue::TimestampSecond(Some(1500), None),
3315 ],
3316 )?;
3317
3318 test_type(
3319 DataType::Timestamp(TimeUnit::Millisecond, None),
3320 Arc::new(TimestampMillisecondArray::from(vec![
3321 Some(1000000),
3322 Some(2000000),
3323 ])),
3324 vec![
3325 ScalarValue::TimestampMillisecond(Some(1000000), None),
3326 ScalarValue::TimestampMillisecond(Some(1500000), None),
3327 ],
3328 )?;
3329
3330 test_type(
3331 DataType::Timestamp(TimeUnit::Microsecond, None),
3332 Arc::new(TimestampMicrosecondArray::from(vec![
3333 Some(1000000000),
3334 Some(2000000000),
3335 ])),
3336 vec![
3337 ScalarValue::TimestampMicrosecond(Some(1000000000), None),
3338 ScalarValue::TimestampMicrosecond(Some(1500000000), None),
3339 ],
3340 )?;
3341
3342 test_type(
3344 DataType::Time32(TimeUnit::Second),
3345 Arc::new(Time32SecondArray::from(vec![Some(3600), Some(7200)])),
3346 vec![
3347 ScalarValue::Time32Second(Some(3600)),
3348 ScalarValue::Time32Second(Some(5400)),
3349 ],
3350 )?;
3351
3352 test_type(
3353 DataType::Time32(TimeUnit::Millisecond),
3354 Arc::new(Time32MillisecondArray::from(vec![
3355 Some(3600000),
3356 Some(7200000),
3357 ])),
3358 vec![
3359 ScalarValue::Time32Millisecond(Some(3600000)),
3360 ScalarValue::Time32Millisecond(Some(5400000)),
3361 ],
3362 )?;
3363
3364 test_type(
3365 DataType::Time64(TimeUnit::Microsecond),
3366 Arc::new(Time64MicrosecondArray::from(vec![
3367 Some(3600000000),
3368 Some(7200000000),
3369 ])),
3370 vec![
3371 ScalarValue::Time64Microsecond(Some(3600000000)),
3372 ScalarValue::Time64Microsecond(Some(5400000000)),
3373 ],
3374 )?;
3375
3376 test_type(
3377 DataType::Time64(TimeUnit::Nanosecond),
3378 Arc::new(Time64NanosecondArray::from(vec![
3379 Some(3600000000000),
3380 Some(7200000000000),
3381 ])),
3382 vec![
3383 ScalarValue::Time64Nanosecond(Some(3600000000000)),
3384 ScalarValue::Time64Nanosecond(Some(5400000000000)),
3385 ],
3386 )?;
3387
3388 test_type(
3390 DataType::Duration(TimeUnit::Second),
3391 Arc::new(DurationSecondArray::from(vec![Some(86400), Some(172800)])),
3392 vec![
3393 ScalarValue::DurationSecond(Some(86400)),
3394 ScalarValue::DurationSecond(Some(129600)),
3395 ],
3396 )?;
3397
3398 test_type(
3399 DataType::Duration(TimeUnit::Millisecond),
3400 Arc::new(DurationMillisecondArray::from(vec![
3401 Some(86400000),
3402 Some(172800000),
3403 ])),
3404 vec![
3405 ScalarValue::DurationMillisecond(Some(86400000)),
3406 ScalarValue::DurationMillisecond(Some(129600000)),
3407 ],
3408 )?;
3409
3410 test_type(
3411 DataType::Duration(TimeUnit::Microsecond),
3412 Arc::new(DurationMicrosecondArray::from(vec![
3413 Some(86400000000),
3414 Some(172800000000),
3415 ])),
3416 vec![
3417 ScalarValue::DurationMicrosecond(Some(86400000000)),
3418 ScalarValue::DurationMicrosecond(Some(129600000000)),
3419 ],
3420 )?;
3421
3422 test_type(
3423 DataType::Duration(TimeUnit::Nanosecond),
3424 Arc::new(DurationNanosecondArray::from(vec![
3425 Some(86400000000000),
3426 Some(172800000000000),
3427 ])),
3428 vec![
3429 ScalarValue::DurationNanosecond(Some(86400000000000)),
3430 ScalarValue::DurationNanosecond(Some(129600000000000)),
3431 ],
3432 )?;
3433
3434 test_type(
3436 DataType::Interval(IntervalUnit::YearMonth),
3437 Arc::new(IntervalYearMonthArray::from(vec![Some(12), Some(24)])),
3438 vec![
3439 ScalarValue::IntervalYearMonth(Some(12)),
3440 ScalarValue::IntervalYearMonth(Some(18)),
3441 ],
3442 )?;
3443
3444 test_type(
3445 DataType::Interval(IntervalUnit::DayTime),
3446 Arc::new(IntervalDayTimeArray::from(vec![
3447 Some(IntervalDayTime {
3448 days: 1,
3449 milliseconds: 0,
3450 }),
3451 Some(IntervalDayTime {
3452 days: 2,
3453 milliseconds: 0,
3454 }),
3455 ])),
3456 vec![
3457 ScalarValue::IntervalDayTime(Some(IntervalDayTime {
3458 days: 1,
3459 milliseconds: 0,
3460 })),
3461 ScalarValue::IntervalDayTime(Some(IntervalDayTime {
3462 days: 1,
3463 milliseconds: 500,
3464 })),
3465 ],
3466 )?;
3467
3468 test_type(
3469 DataType::Interval(IntervalUnit::MonthDayNano),
3470 Arc::new(IntervalMonthDayNanoArray::from(vec![
3471 Some(IntervalMonthDayNano {
3472 months: 1,
3473 days: 0,
3474 nanoseconds: 0,
3475 }),
3476 Some(IntervalMonthDayNano {
3477 months: 2,
3478 days: 0,
3479 nanoseconds: 0,
3480 }),
3481 ])),
3482 vec![
3483 ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano {
3484 months: 1,
3485 days: 0,
3486 nanoseconds: 0,
3487 })),
3488 ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano {
3489 months: 1,
3490 days: 15,
3491 nanoseconds: 0,
3492 })),
3493 ],
3494 )?;
3495
3496 let precision = 38;
3499 let scale = 10;
3500 test_type(
3501 DataType::Decimal256(precision, scale),
3502 Arc::new(
3503 Decimal256Array::from(vec![
3504 Some(i256::from(12345)),
3505 Some(i256::from(67890)),
3506 ])
3507 .with_precision_and_scale(precision, scale)?,
3508 ),
3509 vec![
3510 ScalarValue::Decimal256(Some(i256::from(12345)), precision, scale),
3511 ScalarValue::Decimal256(Some(i256::from(54321)), precision, scale),
3512 ],
3513 )?;
3514
3515 Ok(())
3516 }
3517 fn make_in_list_with_columns(
3520 expr: Arc<dyn PhysicalExpr>,
3521 list: Vec<Arc<dyn PhysicalExpr>>,
3522 negated: bool,
3523 ) -> Arc<InListExpr> {
3524 Arc::new(InListExpr::new(expr, list, negated, None))
3525 }
3526
3527 #[test]
3528 fn test_in_list_with_columns_int32_scalars() -> Result<()> {
3529 let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
3531 let col_a = col("a", &schema)?;
3532 let batch = RecordBatch::try_new(
3533 Arc::new(schema),
3534 vec![Arc::new(Int32Array::from(vec![
3535 Some(1),
3536 Some(2),
3537 Some(3),
3538 None,
3539 ]))],
3540 )?;
3541
3542 let list = vec![
3543 lit(ScalarValue::Int32(Some(1))),
3544 lit(ScalarValue::Int32(Some(3))),
3545 ];
3546 let expr = make_in_list_with_columns(col_a, list, false);
3547
3548 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3549 let result = as_boolean_array(&result);
3550 assert_eq!(
3551 result,
3552 &BooleanArray::from(vec![Some(true), Some(false), Some(true), None,])
3553 );
3554 Ok(())
3555 }
3556
3557 #[test]
3558 fn test_in_list_with_columns_int32_column_refs() -> Result<()> {
3559 let schema = Schema::new(vec![
3561 Field::new("a", DataType::Int32, true),
3562 Field::new("b", DataType::Int32, true),
3563 Field::new("c", DataType::Int32, true),
3564 ]);
3565 let batch = RecordBatch::try_new(
3566 Arc::new(schema.clone()),
3567 vec![
3568 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None])),
3569 Arc::new(Int32Array::from(vec![
3570 Some(1),
3571 Some(99),
3572 Some(99),
3573 Some(99),
3574 ])),
3575 Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3), None])),
3576 ],
3577 )?;
3578
3579 let col_a = col("a", &schema)?;
3580 let list = vec![col("b", &schema)?, col("c", &schema)?];
3581 let expr = make_in_list_with_columns(col_a, list, false);
3582
3583 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3584 let result = as_boolean_array(&result);
3585 assert_eq!(
3590 result,
3591 &BooleanArray::from(vec![Some(true), Some(false), Some(true), None,])
3592 );
3593 Ok(())
3594 }
3595
3596 #[test]
3597 fn test_in_list_with_columns_utf8_column_refs() -> Result<()> {
3598 let schema = Schema::new(vec![
3600 Field::new("a", DataType::Utf8, false),
3601 Field::new("b", DataType::Utf8, false),
3602 ]);
3603 let batch = RecordBatch::try_new(
3604 Arc::new(schema.clone()),
3605 vec![
3606 Arc::new(StringArray::from(vec!["x", "y", "z"])),
3607 Arc::new(StringArray::from(vec!["x", "x", "z"])),
3608 ],
3609 )?;
3610
3611 let col_a = col("a", &schema)?;
3612 let list = vec![col("b", &schema)?];
3613 let expr = make_in_list_with_columns(col_a, list, false);
3614
3615 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3616 let result = as_boolean_array(&result);
3617 assert_eq!(result, &BooleanArray::from(vec![true, false, true]));
3621 Ok(())
3622 }
3623
3624 #[test]
3625 fn test_in_list_with_columns_negated() -> Result<()> {
3626 let schema = Schema::new(vec![
3628 Field::new("a", DataType::Int32, false),
3629 Field::new("b", DataType::Int32, false),
3630 ]);
3631 let batch = RecordBatch::try_new(
3632 Arc::new(schema.clone()),
3633 vec![
3634 Arc::new(Int32Array::from(vec![1, 2, 3])),
3635 Arc::new(Int32Array::from(vec![1, 99, 3])),
3636 ],
3637 )?;
3638
3639 let col_a = col("a", &schema)?;
3640 let list = vec![col("b", &schema)?];
3641 let expr = make_in_list_with_columns(col_a, list, true);
3642
3643 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3644 let result = as_boolean_array(&result);
3645 assert_eq!(result, &BooleanArray::from(vec![false, true, false]));
3649 Ok(())
3650 }
3651
3652 #[test]
3653 fn test_in_list_with_columns_null_in_list() -> Result<()> {
3654 let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3656 let col_a = col("a", &schema)?;
3657 let batch = RecordBatch::try_new(
3658 Arc::new(schema),
3659 vec![Arc::new(Int32Array::from(vec![1, 2]))],
3660 )?;
3661
3662 let list = vec![
3663 lit(ScalarValue::Int32(None)),
3664 lit(ScalarValue::Int32(Some(1))),
3665 ];
3666 let expr = make_in_list_with_columns(col_a, list, false);
3667
3668 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3669 let result = as_boolean_array(&result);
3670 assert_eq!(result, &BooleanArray::from(vec![Some(true), None]));
3673 Ok(())
3674 }
3675
3676 #[test]
3677 fn test_in_list_with_columns_float_nan() -> Result<()> {
3678 let schema = Schema::new(vec![
3681 Field::new("a", DataType::Float64, false),
3682 Field::new("b", DataType::Float64, false),
3683 ]);
3684 let batch = RecordBatch::try_new(
3685 Arc::new(schema.clone()),
3686 vec![
3687 Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN])),
3688 Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0])),
3689 ],
3690 )?;
3691
3692 let col_a = col("a", &schema)?;
3693 let list = vec![col("b", &schema)?];
3694 let expr = make_in_list_with_columns(col_a, list, false);
3695
3696 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3697 let result = as_boolean_array(&result);
3698 assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
3702 Ok(())
3703 }
3704
3705 #[test]
3709 fn test_in_list_with_columns_short_circuit() -> Result<()> {
3710 let schema = Schema::new(vec![
3713 Field::new("a", DataType::Int32, false),
3714 Field::new("b", DataType::Int32, false),
3715 Field::new("c", DataType::Int32, false),
3716 ]);
3717 let batch = RecordBatch::try_new(
3718 Arc::new(schema.clone()),
3719 vec![
3720 Arc::new(Int32Array::from(vec![1, 2, 3])),
3721 Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(Int32Array::from(vec![99, 99, 99])),
3723 ],
3724 )?;
3725
3726 let col_a = col("a", &schema)?;
3727 let list = vec![col("b", &schema)?, col("c", &schema)?];
3728 let expr = make_in_list_with_columns(col_a, list, false);
3729
3730 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3731 let result = as_boolean_array(&result);
3732 assert_eq!(result, &BooleanArray::from(vec![true, true, true]));
3733 Ok(())
3734 }
3735
3736 #[test]
3739 fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
3740 let schema = Schema::new(vec![
3743 Field::new("a", DataType::Int32, true),
3744 Field::new("b", DataType::Int32, false),
3745 Field::new("c", DataType::Int32, false),
3746 ]);
3747 let batch = RecordBatch::try_new(
3748 Arc::new(schema.clone()),
3749 vec![
3750 Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
3751 Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(Int32Array::from(vec![99, 99, 99])),
3753 ],
3754 )?;
3755
3756 let col_a = col("a", &schema)?;
3757 let list = vec![col("b", &schema)?, col("c", &schema)?];
3758 let expr = make_in_list_with_columns(col_a, list, false);
3759
3760 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3761 let result = as_boolean_array(&result);
3762 assert_eq!(
3766 result,
3767 &BooleanArray::from(vec![Some(true), None, Some(true)])
3768 );
3769 Ok(())
3770 }
3771
3772 #[test]
3775 fn test_in_list_with_columns_struct() -> Result<()> {
3776 let struct_fields = Fields::from(vec![
3777 Field::new("x", DataType::Int32, false),
3778 Field::new("y", DataType::Utf8, false),
3779 ]);
3780 let struct_dt = DataType::Struct(struct_fields.clone());
3781
3782 let schema = Schema::new(vec![
3783 Field::new("a", struct_dt.clone(), true),
3784 Field::new("b", struct_dt.clone(), false),
3785 Field::new("c", struct_dt.clone(), false),
3786 ]);
3787
3788 let a = Arc::new(StructArray::new(
3792 struct_fields.clone(),
3793 vec![
3794 Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
3795 Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
3796 ],
3797 Some(vec![true, true, false, true].into()),
3798 ));
3799 let b = Arc::new(StructArray::new(
3800 struct_fields.clone(),
3801 vec![
3802 Arc::new(Int32Array::from(vec![1, 9, 3, 4])),
3803 Arc::new(StringArray::from(vec!["a", "z", "c", "d"])),
3804 ],
3805 None,
3806 ));
3807 let c = Arc::new(StructArray::new(
3808 struct_fields.clone(),
3809 vec![
3810 Arc::new(Int32Array::from(vec![9, 2, 9, 9])),
3811 Arc::new(StringArray::from(vec!["z", "b", "z", "z"])),
3812 ],
3813 None,
3814 ));
3815
3816 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, c])?;
3817
3818 let col_a = col("a", &schema)?;
3819 let list = vec![col("b", &schema)?, col("c", &schema)?];
3820 let expr = make_in_list_with_columns(col_a, list, false);
3821
3822 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3823 let result = as_boolean_array(&result);
3824 assert_eq!(
3829 result,
3830 &BooleanArray::from(vec![Some(true), Some(true), None, Some(true)])
3831 );
3832
3833 let col_a = col("a", &schema)?;
3835 let list = vec![col("b", &schema)?, col("c", &schema)?];
3836 let expr = make_in_list_with_columns(col_a, list, true);
3837
3838 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3839 let result = as_boolean_array(&result);
3840 assert_eq!(
3845 result,
3846 &BooleanArray::from(vec![Some(false), Some(false), None, Some(false)])
3847 );
3848 Ok(())
3849 }
3850
3851 fn wrap_in_dict(array: ArrayRef) -> ArrayRef {
3863 let keys = Int32Array::from((0..array.len() as i32).collect::<Vec<_>>());
3864 Arc::new(DictionaryArray::new(keys, array))
3865 }
3866
3867 fn eval_in_list_from_array(
3871 needle: ArrayRef,
3872 in_array: ArrayRef,
3873 ) -> Result<BooleanArray> {
3874 let schema =
3875 Schema::new(vec![Field::new("a", needle.data_type().clone(), false)]);
3876 let col_a = col("a", &schema)?;
3877 let expr = Arc::new(InListExpr::try_new_from_array(col_a, in_array, false)?)
3878 as Arc<dyn PhysicalExpr>;
3879 let batch = RecordBatch::try_new(Arc::new(schema), vec![needle])?;
3880 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3881 Ok(as_boolean_array(&result).clone())
3882 }
3883
3884 #[test]
3885 fn test_in_list_from_array_type_combinations() -> Result<()> {
3886 use arrow::compute::cast;
3887
3888 let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
3890
3891 let base_in = Arc::new(Int64Array::from(vec![1i64, 2, 3])) as ArrayRef;
3893 let base_needle = Arc::new(Int64Array::from(vec![1i64, 4, 2])) as ArrayRef;
3894
3895 let primitive_types = vec![
3897 DataType::Int8,
3898 DataType::Int16,
3899 DataType::Int32,
3900 DataType::Int64,
3901 DataType::UInt8,
3902 DataType::UInt16,
3903 DataType::UInt32,
3904 DataType::UInt64,
3905 DataType::Float32,
3906 DataType::Float64,
3907 ];
3908
3909 for dt in &primitive_types {
3910 let in_array = cast(&base_in, dt)?;
3911 let needle = cast(&base_needle, dt)?;
3912
3913 assert_eq!(
3915 expected,
3916 eval_in_list_from_array(Arc::clone(&needle), Arc::clone(&in_array))?,
3917 "same-type failed for {dt:?}"
3918 );
3919
3920 assert_eq!(
3922 expected,
3923 eval_in_list_from_array(wrap_in_dict(needle), in_array)?,
3924 "dict-needle failed for {dt:?}"
3925 );
3926 }
3927
3928 let utf8_in = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
3930 let utf8_needle = Arc::new(StringArray::from(vec!["a", "d", "b"])) as ArrayRef;
3931
3932 assert_eq!(
3934 expected,
3935 eval_in_list_from_array(Arc::clone(&utf8_needle), Arc::clone(&utf8_in),)?
3936 );
3937
3938 assert_eq!(
3940 expected,
3941 eval_in_list_from_array(
3942 wrap_in_dict(Arc::clone(&utf8_needle)),
3943 Arc::clone(&utf8_in),
3944 )?
3945 );
3946
3947 assert_eq!(
3949 expected,
3950 eval_in_list_from_array(
3951 wrap_in_dict(Arc::clone(&utf8_needle)),
3952 wrap_in_dict(Arc::clone(&utf8_in)),
3953 )?
3954 );
3955
3956 let struct_fields = Fields::from(vec![
3958 Field::new("c0", DataType::Utf8, true),
3959 Field::new("c1", DataType::Int64, true),
3960 ]);
3961 let make_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
3962 let pairs: Vec<(FieldRef, ArrayRef)> =
3963 struct_fields.iter().cloned().zip([c0, c1]).collect();
3964 Arc::new(StructArray::from(pairs))
3965 };
3966 assert_eq!(
3967 expected,
3968 eval_in_list_from_array(
3969 make_struct(
3970 Arc::clone(&utf8_needle),
3971 Arc::new(Int64Array::from(vec![1, 4, 2])),
3972 ),
3973 make_struct(
3974 Arc::clone(&utf8_in),
3975 Arc::new(Int64Array::from(vec![1, 2, 3])),
3976 ),
3977 )?
3978 );
3979
3980 let dict_struct_fields = Fields::from(vec![
3982 Field::new(
3983 "c0",
3984 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
3985 true,
3986 ),
3987 Field::new("c1", DataType::Int64, true),
3988 ]);
3989 let make_dict_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
3990 let pairs: Vec<(FieldRef, ArrayRef)> =
3991 dict_struct_fields.iter().cloned().zip([c0, c1]).collect();
3992 Arc::new(StructArray::from(pairs))
3993 };
3994 assert_eq!(
3995 expected,
3996 eval_in_list_from_array(
3997 make_dict_struct(
3998 wrap_in_dict(Arc::clone(&utf8_needle)),
3999 Arc::new(Int64Array::from(vec![1, 4, 2])),
4000 ),
4001 make_dict_struct(
4002 wrap_in_dict(Arc::clone(&utf8_in)),
4003 Arc::new(Int64Array::from(vec![1, 2, 3])),
4004 ),
4005 )?
4006 );
4007
4008 Ok(())
4009 }
4010
4011 #[test]
4012 fn test_in_list_from_array_type_mismatch_errors() -> Result<()> {
4013 let err = eval_in_list_from_array(
4015 Arc::new(StringArray::from(vec!["a", "d", "b"])),
4016 wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
4017 )
4018 .unwrap_err()
4019 .to_string();
4020 assert!(
4021 err.contains("Can't compare arrays of different types"),
4022 "{err}"
4023 );
4024
4025 let err = eval_in_list_from_array(
4028 wrap_in_dict(Arc::new(StringArray::from(vec!["a", "d", "b"]))),
4029 Arc::new(Int64Array::from(vec![1, 2, 3])),
4030 )
4031 .unwrap_err()
4032 .to_string();
4033 assert!(err.contains("Failed to downcast"), "{err}");
4034
4035 let err = eval_in_list_from_array(
4038 wrap_in_dict(Arc::new(Int64Array::from(vec![1, 4, 2]))),
4039 wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
4040 )
4041 .unwrap_err()
4042 .to_string();
4043 assert!(
4044 err.contains("Can't compare arrays of different types"),
4045 "{err}"
4046 );
4047 Ok(())
4048 }
4049}