1use std::any::Any;
21use std::fmt::Debug;
22use std::hash::{Hash, Hasher};
23use std::sync::Arc;
24
25use crate::PhysicalExpr;
26use crate::physical_expr::physical_exprs_bag_equal;
27
28use arrow::array::*;
29use arrow::buffer::{BooleanBuffer, NullBuffer};
30use arrow::compute::kernels::boolean::{not, or_kleene};
31use arrow::compute::kernels::cmp::eq as arrow_eq;
32use arrow::compute::{SortOptions, take};
33use arrow::datatypes::*;
34use arrow::util::bit_iterator::BitIndexIterator;
35use datafusion_common::hash_utils::with_hashes;
36use datafusion_common::{
37 DFSchema, HashSet, Result, ScalarValue, assert_or_internal_err, exec_datafusion_err,
38 exec_err,
39};
40use datafusion_expr::{ColumnarValue, expr_vec_fmt};
41
42use ahash::RandomState;
43use datafusion_common::HashMap;
44use hashbrown::hash_map::RawEntryMut;
45
46trait StaticFilter {
48 fn null_count(&self) -> usize;
49
50 fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
52}
53
/// Physical expression for `expr [NOT] IN (list...)`.
pub struct InListExpr {
    /// The value tested for membership.
    expr: Arc<dyn PhysicalExpr>,
    /// The candidate expressions on the right-hand side of `IN`.
    list: Vec<Arc<dyn PhysicalExpr>>,
    /// `true` for `NOT IN`, `false` for `IN`.
    negated: bool,
    /// Membership filter pre-built from constant list values. `None` means the
    /// list could not be evaluated up front and is evaluated per batch.
    static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
}
61
62impl Debug for InListExpr {
63 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64 f.debug_struct("InListExpr")
65 .field("expr", &self.expr)
66 .field("list", &self.list)
67 .field("negated", &self.negated)
68 .finish()
69 }
70}
71
/// Generic [`StaticFilter`] used for every haystack type that has no
/// specialised primitive/float filter.
///
/// Values are compared with `make_comparator` and hashed with `with_hashes`.
#[derive(Debug, Clone)]
struct ArrayStaticFilter {
    /// The haystack (the evaluated IN-list values).
    in_array: ArrayRef,
    /// Hash seed shared by haystack and needle hashing so their hashes agree.
    state: RandomState,
    /// Indices into `in_array`, one per distinct valid value. The map's own
    /// hasher is `()` because hashes are precomputed and always supplied
    /// explicitly through the raw-entry API.
    map: HashMap<usize, (), ()>,
}
83
84impl StaticFilter for ArrayStaticFilter {
85 fn null_count(&self) -> usize {
86 self.in_array.null_count()
87 }
88
89 fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
91 if v.data_type() == &DataType::Null
93 || self.in_array.data_type() == &DataType::Null
94 {
95 let nulls = NullBuffer::new_null(v.len());
96 return Ok(BooleanArray::new(
97 BooleanBuffer::new_unset(v.len()),
98 Some(nulls),
99 ));
100 }
101
102 downcast_dictionary_array! {
106 v => {
107 if v.values().data_type() == self.in_array.data_type() {
110 let values_contains = self.contains(v.values().as_ref(), negated)?;
111 let result = take(&values_contains, v.keys(), None)?;
112 return Ok(downcast_array(result.as_ref()));
113 }
114 }
115 _ => {}
116 }
117
118 let needle_nulls = v.logical_nulls();
119 let needle_nulls = needle_nulls.as_ref();
120 let haystack_has_nulls = self.in_array.null_count() != 0;
121
122 with_hashes([v], &self.state, |hashes| {
123 let cmp = make_comparator(v, &self.in_array, SortOptions::default())?;
124 Ok((0..v.len())
125 .map(|i| {
126 if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) {
128 return None;
129 }
130
131 let hash = hashes[i];
132 let contains = self
133 .map
134 .raw_entry()
135 .from_hash(hash, |idx| cmp(i, *idx).is_eq())
136 .is_some();
137
138 match contains {
139 true => Some(!negated),
140 false if haystack_has_nulls => None,
141 false => Some(negated),
142 }
143 })
144 .collect())
145 })
146 }
147}
148
149fn supports_arrow_eq(dt: &DataType) -> bool {
156 use DataType::*;
157 match dt {
158 Boolean | Binary | LargeBinary | BinaryView | FixedSizeBinary(_) => true,
159 Dictionary(_, v) => supports_arrow_eq(v.as_ref()),
160 _ => dt.is_primitive() || dt.is_null() || dt.is_string(),
161 }
162}
163
164fn instantiate_static_filter(
165 in_array: ArrayRef,
166) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
167 match in_array.data_type() {
168 DataType::Int8 => Ok(Arc::new(Int8StaticFilter::try_new(&in_array)?)),
170 DataType::Int16 => Ok(Arc::new(Int16StaticFilter::try_new(&in_array)?)),
171 DataType::Int32 => Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)),
172 DataType::Int64 => Ok(Arc::new(Int64StaticFilter::try_new(&in_array)?)),
173 DataType::UInt8 => Ok(Arc::new(UInt8StaticFilter::try_new(&in_array)?)),
174 DataType::UInt16 => Ok(Arc::new(UInt16StaticFilter::try_new(&in_array)?)),
175 DataType::UInt32 => Ok(Arc::new(UInt32StaticFilter::try_new(&in_array)?)),
176 DataType::UInt64 => Ok(Arc::new(UInt64StaticFilter::try_new(&in_array)?)),
177 DataType::Float32 => Ok(Arc::new(Float32StaticFilter::try_new(&in_array)?)),
179 DataType::Float64 => Ok(Arc::new(Float64StaticFilter::try_new(&in_array)?)),
180 _ => {
181 Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?))
183 }
184 }
185}
186
impl ArrayStaticFilter {
    /// Builds the hash index over the distinct valid values of `in_array`.
    ///
    /// Each valid row is hashed with a fresh `RandomState`. Its index is
    /// inserted only if no equal value (per `make_comparator`) is already
    /// present, so the map holds one index per distinct value.
    ///
    /// # Errors
    /// Propagates errors from `with_hashes` and `make_comparator`.
    fn try_new(in_array: ArrayRef) -> Result<ArrayStaticFilter> {
        // Null-typed haystacks get an empty map: `contains` answers NULL for
        // them without consulting the map.
        if in_array.data_type() == &DataType::Null {
            return Ok(ArrayStaticFilter {
                in_array,
                state: RandomState::new(),
                map: HashMap::with_hasher(()),
            });
        }

        let state = RandomState::new();
        // Keys are row indices into `in_array`. Hashing is done manually, hence
        // the `()` hasher.
        let mut map: HashMap<usize, (), ()> = HashMap::with_hasher(());

        with_hashes([&in_array], &state, |hashes| -> Result<()> {
            let cmp = make_comparator(&in_array, &in_array, SortOptions::default())?;

            // Insert `idx` unless a row with an equal value is already stored.
            let insert_value = |idx| {
                let hash = hashes[idx];
                if let RawEntryMut::Vacant(v) = map
                    .raw_entry_mut()
                    .from_hash(hash, |x| cmp(*x, idx).is_eq())
                {
                    // Rehashing on resize reuses the precomputed row hashes.
                    v.insert_with_hasher(hash, idx, (), |x| hashes[*x]);
                }
            };

            // Only rows marked valid in the validity bitmap are indexed.
            match in_array.nulls() {
                Some(nulls) => {
                    BitIndexIterator::new(nulls.validity(), nulls.offset(), nulls.len())
                        .for_each(insert_value)
                }
                None => (0..in_array.len()).for_each(insert_value),
            }

            Ok(())
        })?;

        Ok(Self {
            in_array,
            state,
            map,
        })
    }
}
238
/// `f32` wrapper with bit-pattern equality and hashing, so that floats can be
/// stored in a `HashSet`.
///
/// Two values are equal exactly when their bits are identical. A NaN equals a
/// NaN with the same bits, and `0.0` and `-0.0` are distinct.
#[derive(Clone, Copy)]
struct OrderedFloat32(f32);

impl Hash for OrderedFloat32 {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let bytes = self.0.to_ne_bytes();
        bytes.hash(state);
    }
}

impl PartialEq for OrderedFloat32 {
    fn eq(&self, other: &Self) -> bool {
        let (Self(lhs), Self(rhs)) = (self, other);
        lhs.to_bits() == rhs.to_bits()
    }
}

impl Eq for OrderedFloat32 {}

impl From<f32> for OrderedFloat32 {
    fn from(value: f32) -> Self {
        OrderedFloat32(value)
    }
}
263
/// `f64` wrapper with bit-pattern equality and hashing, so that floats can be
/// stored in a `HashSet`.
///
/// Two values are equal exactly when their bits are identical. A NaN equals a
/// NaN with the same bits, and `0.0` and `-0.0` are distinct.
#[derive(Clone, Copy)]
struct OrderedFloat64(f64);

impl Hash for OrderedFloat64 {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let bytes = self.0.to_ne_bytes();
        bytes.hash(state);
    }
}

impl PartialEq for OrderedFloat64 {
    fn eq(&self, other: &Self) -> bool {
        let (Self(lhs), Self(rhs)) = (self, other);
        lhs.to_bits() == rhs.to_bits()
    }
}

impl Eq for OrderedFloat64 {}

impl From<f64> for OrderedFloat64 {
    fn from(value: f64) -> Self {
        OrderedFloat64(value)
    }
}
288
/// Defines a [`StaticFilter`] specialised for one primitive arrow type.
///
/// `$Name` is the generated struct. `$ArrowType` is the arrow primitive type
/// whose native values are stored directly in a `HashSet`.
macro_rules! primitive_static_filter {
    ($Name:ident, $ArrowType:ty) => {
        struct $Name {
            null_count: usize,
            values: HashSet<<$ArrowType as ArrowPrimitiveType>::Native>,
        }

        impl $Name {
            /// Collects the non-null haystack values and remembers how many
            /// nulls were present.
            fn try_new(in_array: &ArrayRef) -> Result<Self> {
                let Some(haystack) = in_array.as_primitive_opt::<$ArrowType>() else {
                    return Err(exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)));
                };

                let mut values = HashSet::with_capacity(haystack.len());
                values.extend(haystack.iter().flatten());

                Ok(Self {
                    null_count: haystack.null_count(),
                    values,
                })
            }
        }

        impl StaticFilter for $Name {
            fn null_count(&self) -> usize {
                self.null_count
            }

            fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
                // Dictionary needles: answer for the dictionary values once,
                // then expand the answers through the keys.
                downcast_dictionary_array! {
                    v => {
                        let per_value = self.contains(v.values().as_ref(), negated)?;
                        let expanded = take(&per_value, v.keys(), None)?;
                        return Ok(downcast_array(expanded.as_ref()))
                    }
                    _ => {}
                }

                let Some(needle) = v.as_primitive_opt::<$ArrowType>() else {
                    return Err(exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)));
                };

                let needle_values = needle.values();
                // `found[i]` is set iff slot `i`'s value is in the set. Null
                // slots are resolved by the null mask below.
                let found = BooleanBuffer::collect_bool(needle_values.len(), |i| {
                    self.values.contains(&needle_values[i])
                });
                let contains_buffer = if negated { !&found } else { found.clone() };

                // A slot is NULL when the needle is NULL, or when its value was
                // not found and the haystack contains a NULL. Only found values
                // stay valid in the latter case.
                let needle_nulls = needle.nulls().filter(|_| needle.null_count() > 0);
                let result_nulls = match (needle_nulls, self.null_count > 0) {
                    (None, false) => None,
                    (Some(nulls), false) => Some(nulls.clone()),
                    (None, true) => Some(NullBuffer::new(found)),
                    (Some(nulls), true) => Some(NullBuffer::new(nulls.inner() & &found)),
                };

                Ok(BooleanArray::new(contains_buffer, result_nulls))
            }
        }
    };
}
415
// Integer `StaticFilter` implementations backed by a `HashSet` of native values.
primitive_static_filter!(Int8StaticFilter, Int8Type);
primitive_static_filter!(Int16StaticFilter, Int16Type);
primitive_static_filter!(Int32StaticFilter, Int32Type);
primitive_static_filter!(Int64StaticFilter, Int64Type);
primitive_static_filter!(UInt8StaticFilter, UInt8Type);
primitive_static_filter!(UInt16StaticFilter, UInt16Type);
primitive_static_filter!(UInt32StaticFilter, UInt32Type);
primitive_static_filter!(UInt64StaticFilter, UInt64Type);
425
/// Defines a [`StaticFilter`] specialised for one floating-point arrow type.
///
/// Floats implement neither `Hash` nor `Eq`, so the set stores
/// `$OrderedType`, a wrapper expected to provide them. With the bit-pattern
/// wrappers used in this file, a NaN needle matches only a NaN with identical
/// bits.
macro_rules! float_static_filter {
    ($Name:ident, $ArrowType:ty, $OrderedType:ty) => {
        struct $Name {
            null_count: usize,
            values: HashSet<$OrderedType>,
        }

        impl $Name {
            /// Collects the non-null haystack values (wrapped) and remembers
            /// how many nulls were present.
            fn try_new(in_array: &ArrayRef) -> Result<Self> {
                let Some(haystack) = in_array.as_primitive_opt::<$ArrowType>() else {
                    return Err(exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)));
                };

                let mut values = HashSet::with_capacity(haystack.len());
                values.extend(haystack.iter().flatten().map(|x| <$OrderedType>::from(x)));

                Ok(Self {
                    null_count: haystack.null_count(),
                    values,
                })
            }
        }

        impl StaticFilter for $Name {
            fn null_count(&self) -> usize {
                self.null_count
            }

            fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
                // Dictionary needles: answer for the dictionary values once,
                // then expand the answers through the keys.
                downcast_dictionary_array! {
                    v => {
                        let per_value = self.contains(v.values().as_ref(), negated)?;
                        let expanded = take(&per_value, v.keys(), None)?;
                        return Ok(downcast_array(expanded.as_ref()))
                    }
                    _ => {}
                }

                let Some(needle) = v.as_primitive_opt::<$ArrowType>() else {
                    return Err(exec_datafusion_err!("Failed to downcast an array to a '{}' array", stringify!($ArrowType)));
                };

                let needle_values = needle.values();
                // `found[i]` is set iff slot `i`'s value is in the set. Null
                // slots are resolved by the null mask below.
                let found = BooleanBuffer::collect_bool(needle_values.len(), |i| {
                    self.values.contains(&<$OrderedType>::from(needle_values[i]))
                });
                let contains_buffer = if negated { !&found } else { found.clone() };

                // A slot is NULL when the needle is NULL, or when its value was
                // not found and the haystack contains a NULL.
                let needle_nulls = needle.nulls().filter(|_| needle.null_count() > 0);
                let result_nulls = match (needle_nulls, self.null_count > 0) {
                    (None, false) => None,
                    (Some(nulls), false) => Some(nulls.clone()),
                    (None, true) => Some(NullBuffer::new(found)),
                    (Some(nulls), true) => Some(NullBuffer::new(nulls.inner() & &found)),
                };

                Ok(BooleanArray::new(contains_buffer, result_nulls))
            }
        }
    };
}
553
// Float `StaticFilter` implementations; values are wrapped in the
// bit-comparing `OrderedFloat*` types so they can live in a `HashSet`.
float_static_filter!(Float32StaticFilter, Float32Type, OrderedFloat32);
float_static_filter!(Float64StaticFilter, Float64Type, OrderedFloat64);
557
558fn evaluate_list(
560 list: &[Arc<dyn PhysicalExpr>],
561 batch: &RecordBatch,
562) -> Result<ArrayRef> {
563 let scalars = list
564 .iter()
565 .map(|expr| {
566 expr.evaluate(batch).and_then(|r| match r {
567 ColumnarValue::Array(_) => {
568 exec_err!("InList expression must evaluate to a scalar")
569 }
570 ColumnarValue::Scalar(ScalarValue::Dictionary(_, v)) => Ok(*v),
572 ColumnarValue::Scalar(s) => Ok(s),
573 })
574 })
575 .collect::<Result<Vec<_>>>()?;
576
577 ScalarValue::iter_to_array(scalars)
578}
579
580fn try_evaluate_constant_list(
590 list: &[Arc<dyn PhysicalExpr>],
591 schema: &Schema,
592) -> Result<Option<ArrayRef>> {
593 let batch = RecordBatch::new_empty(Arc::new(schema.clone()));
594 match evaluate_list(list, &batch) {
595 Ok(array) => Ok(Some(array)),
596 Err(_) => {
597 Ok(None)
600 }
601 }
602}
603
604impl InListExpr {
605 fn new(
607 expr: Arc<dyn PhysicalExpr>,
608 list: Vec<Arc<dyn PhysicalExpr>>,
609 negated: bool,
610 static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
611 ) -> Self {
612 Self {
613 expr,
614 list,
615 negated,
616 static_filter,
617 }
618 }
619
620 pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
622 &self.expr
623 }
624
625 pub fn list(&self) -> &[Arc<dyn PhysicalExpr>] {
627 &self.list
628 }
629
630 pub fn is_empty(&self) -> bool {
631 self.list.is_empty()
632 }
633
634 pub fn len(&self) -> usize {
635 self.list.len()
636 }
637
638 pub fn negated(&self) -> bool {
640 self.negated
641 }
642
643 pub fn try_new_from_array(
655 expr: Arc<dyn PhysicalExpr>,
656 array: ArrayRef,
657 negated: bool,
658 ) -> Result<Self> {
659 let list = (0..array.len())
660 .map(|i| {
661 let scalar = ScalarValue::try_from_array(array.as_ref(), i)?;
662 Ok(crate::expressions::lit(scalar) as Arc<dyn PhysicalExpr>)
663 })
664 .collect::<Result<Vec<_>>>()?;
665 Ok(Self::new(
666 expr,
667 list,
668 negated,
669 Some(instantiate_static_filter(array)?),
670 ))
671 }
672
673 pub fn try_new(
682 expr: Arc<dyn PhysicalExpr>,
683 list: Vec<Arc<dyn PhysicalExpr>>,
684 negated: bool,
685 schema: &Schema,
686 ) -> Result<Self> {
687 let expr_data_type = expr.data_type(schema)?;
689 for list_expr in list.iter() {
690 let list_expr_data_type = list_expr.data_type(schema)?;
691 assert_or_internal_err!(
692 DFSchema::datatype_is_logically_equal(
693 &expr_data_type,
694 &list_expr_data_type
695 ),
696 "The data type inlist should be same, the value type is {expr_data_type}, one of list expr type is {list_expr_data_type}"
697 );
698 }
699
700 let static_filter = match try_evaluate_constant_list(&list, schema)? {
702 Some(in_array) => Some(instantiate_static_filter(in_array)?),
703 None => None, };
705
706 Ok(Self::new(expr, list, negated, static_filter))
707 }
708}
709impl std::fmt::Display for InListExpr {
710 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
711 let list = expr_vec_fmt!(self.list);
712
713 if self.negated {
714 if self.static_filter.is_some() {
715 write!(f, "{} NOT IN (SET) ([{list}])", self.expr)
716 } else {
717 write!(f, "{} NOT IN ([{list}])", self.expr)
718 }
719 } else if self.static_filter.is_some() {
720 write!(f, "{} IN (SET) ([{list}])", self.expr)
721 } else {
722 write!(f, "{} IN ([{list}])", self.expr)
723 }
724 }
725}
726
impl PhysicalExpr for InListExpr {
    /// Returns `self` as [`Any`] so callers can downcast to `InListExpr`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// `[NOT] IN` always evaluates to a boolean.
    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
        Ok(DataType::Boolean)
    }

    /// Reports whether the result may contain NULLs.
    ///
    /// The result is nullable if the probe expression is nullable. Otherwise:
    /// - with a static filter, it is nullable iff the filter reports nulls in
    ///   the haystack;
    /// - without one, it is nullable iff any list expression is nullable.
    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        if self.expr.nullable(input_schema)? {
            return Ok(true);
        }

        if let Some(static_filter) = &self.static_filter {
            Ok(static_filter.null_count() > 0)
        } else {
            for expr in &self.list {
                if expr.nullable(input_schema)? {
                    return Ok(true);
                }
            }
            Ok(false)
        }
    }

    /// Evaluates `expr [NOT] IN (list)` for every row of `batch`.
    ///
    /// With a static filter, the probe values are looked up in the pre-built
    /// set. Without one, each list expression is evaluated against `batch`
    /// and compared to the probe row by row. The per-element results are
    /// OR-ed with Kleene (three-valued) logic, and `NOT IN` negates the final
    /// result.
    ///
    /// Always returns a `ColumnarValue::Array`.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        let num_rows = batch.num_rows();
        let value = self.expr.evaluate(batch)?;
        let r = match &self.static_filter {
            Some(filter) => {
                match value {
                    ColumnarValue::Array(array) => {
                        filter.contains(&array, self.negated)?
                    }
                    ColumnarValue::Scalar(scalar) => {
                        // NULL [NOT] IN (...) is NULL for every row; short-circuit.
                        if scalar.is_null() {
                            let nulls = NullBuffer::new_null(num_rows);
                            return Ok(ColumnarValue::Array(Arc::new(
                                BooleanArray::new(
                                    BooleanBuffer::new_unset(num_rows),
                                    Some(nulls),
                                ),
                            )));
                        }
                        // Probe the filter with a one-row array, then broadcast
                        // that single answer (NULL / true / false) to all rows.
                        let array = scalar.to_array()?;
                        let result_array =
                            filter.contains(array.as_ref(), self.negated)?;
                        // Check NULL first so a NULL answer is not broadcast as false.
                        if result_array.is_null(0) {
                            let nulls = NullBuffer::new_null(num_rows);
                            BooleanArray::new(
                                BooleanBuffer::new_unset(num_rows),
                                Some(nulls),
                            )
                        } else if result_array.value(0) {
                            BooleanArray::new(BooleanBuffer::new_set(num_rows), None)
                        } else {
                            BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)
                        }
                    }
                }
            }
            None => {
                // No static filter: evaluate and compare each list element.
                let value = value.into_array(num_rows)?;
                let lhs_supports_arrow_eq = supports_arrow_eq(value.data_type());
                // Start from all-false and OR in each element's equality result.
                let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold(
                    BooleanArray::new(BooleanBuffer::new_unset(num_rows), None),
                    |result, expr| -> Result<BooleanArray> {
                        let rhs = match expr? {
                            ColumnarValue::Array(array) => {
                                if lhs_supports_arrow_eq
                                    && supports_arrow_eq(array.data_type())
                                {
                                    arrow_eq(&value, &array)?
                                } else {
                                    // Row-wise fallback for types the `eq`
                                    // kernel does not handle; NULL on either
                                    // side yields NULL.
                                    let cmp = make_comparator(
                                        value.as_ref(),
                                        array.as_ref(),
                                        SortOptions::default(),
                                    )?;
                                    (0..num_rows)
                                        .map(|i| {
                                            if value.is_null(i) || array.is_null(i) {
                                                return None;
                                            }
                                            Some(cmp(i, i).is_eq())
                                        })
                                        .collect::<BooleanArray>()
                                }
                            }
                            ColumnarValue::Scalar(scalar) => {
                                // Comparing with a NULL scalar yields NULL for every row.
                                if scalar.is_null() {
                                    BooleanArray::from(vec![None; num_rows])
                                } else if lhs_supports_arrow_eq {
                                    let scalar_datum = scalar.to_scalar()?;
                                    arrow_eq(&value, &scalar_datum)?
                                } else {
                                    // Compare every row against element 0 of a
                                    // one-row array built from the scalar.
                                    let array = scalar.to_array()?;
                                    let cmp = make_comparator(
                                        value.as_ref(),
                                        array.as_ref(),
                                        SortOptions::default(),
                                    )?;
                                    (0..num_rows)
                                        .map(|i| {
                                            if value.is_null(i) {
                                                None
                                            } else {
                                                Some(cmp(i, 0).is_eq())
                                            }
                                        })
                                        .collect::<BooleanArray>()
                                }
                            }
                        };
                        // Kleene OR: true wins over NULL, NULL wins over false.
                        Ok(or_kleene(&result, &rhs)?)
                    },
                )?;

                if self.negated { not(&found)? } else { found }
            }
        };
        Ok(ColumnarValue::Array(Arc::new(r)))
    }

    /// Children are the probe expression followed by the list elements.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        let mut children = vec![&self.expr];
        children.extend(&self.list);
        children
    }

    /// Rebuilds the expression from `children` (probe first, then list).
    ///
    /// NOTE(review): the existing static filter is reused as-is. This assumes
    /// the rewritten list elements still evaluate to the same constants; that
    /// assumption is not checked here. Indexing `children[0]` panics if
    /// `children` is empty.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        Ok(Arc::new(InListExpr::new(
            Arc::clone(&children[0]),
            children[1..].to_vec(),
            self.negated,
            self.static_filter.as_ref().map(Arc::clone),
        )))
    }

    /// Writes SQL of the form `expr [NOT] IN (a, b, ...)`.
    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.expr.fmt_sql(f)?;
        if self.negated {
            write!(f, " NOT")?;
        }

        write!(f, " IN (")?;
        for (i, expr) in self.list.iter().enumerate() {
            if i > 0 {
                write!(f, ", ")?;
            }
            expr.fmt_sql(f)?;
        }
        write!(f, ")")
    }
}
902
903impl PartialEq for InListExpr {
904 fn eq(&self, other: &Self) -> bool {
905 self.expr.eq(&other.expr)
906 && physical_exprs_bag_equal(&self.list, &other.list)
907 && self.negated == other.negated
908 }
909}
910
911impl Eq for InListExpr {}
912
913impl Hash for InListExpr {
914 fn hash<H: Hasher>(&self, state: &mut H) {
915 self.expr.hash(state);
916 self.negated.hash(state);
917 self.list.hash(state);
919 }
920}
921
922pub fn in_list(
924 expr: Arc<dyn PhysicalExpr>,
925 list: Vec<Arc<dyn PhysicalExpr>>,
926 negated: &bool,
927 schema: &Schema,
928) -> Result<Arc<dyn PhysicalExpr>> {
929 Ok(Arc::new(InListExpr::try_new(expr, list, *negated, schema)?))
930}
931
932#[cfg(test)]
933mod tests {
934 use super::*;
935 use crate::expressions::{col, lit, try_cast};
936 use arrow::buffer::NullBuffer;
937 use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano, i256};
938 use datafusion_common::plan_err;
939 use datafusion_expr::type_coercion::binary::comparison_coercion;
940 use datafusion_physical_expr_common::physical_expr::fmt_sql;
941 use insta::assert_snapshot;
942 use itertools::Itertools;
943
944 type InListCastResult = (Arc<dyn PhysicalExpr>, Vec<Arc<dyn PhysicalExpr>>);
945
946 fn in_list_cast(
949 expr: Arc<dyn PhysicalExpr>,
950 list: Vec<Arc<dyn PhysicalExpr>>,
951 input_schema: &Schema,
952 ) -> Result<InListCastResult> {
953 let expr_type = &expr.data_type(input_schema)?;
954 let list_types: Vec<DataType> = list
955 .iter()
956 .map(|list_expr| list_expr.data_type(input_schema).unwrap())
957 .collect();
958 let result_type = get_coerce_type(expr_type, &list_types);
959 match result_type {
960 None => plan_err!(
961 "Can not find compatible types to compare {expr_type} with [{}]",
962 list_types.iter().join(", ")
963 ),
964 Some(data_type) => {
965 let cast_expr = try_cast(expr, input_schema, data_type.clone())?;
967 let cast_list_expr = list
968 .into_iter()
969 .map(|list_expr| {
970 try_cast(list_expr, input_schema, data_type.clone()).unwrap()
971 })
972 .collect();
973 Ok((cast_expr, cast_list_expr))
974 }
975 }
976 }
977
978 fn get_coerce_type(expr_type: &DataType, list_type: &[DataType]) -> Option<DataType> {
981 list_type
982 .iter()
983 .try_fold(expr_type.clone(), |left_type, right_type| {
984 comparison_coercion(&left_type, right_type)
985 })
986 }
987
    /// Coerces `$COL` and `$LIST` to a common type via `in_list_cast`, then
    /// asserts the `[NOT] IN` result with `in_list_raw!`.
    ///
    /// Must be used in a function returning `Result`, because it uses `?`.
    macro_rules! in_list {
        ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
            let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
            in_list_raw!(
                $BATCH,
                cast_list_exprs,
                $NEGATED,
                $EXPECTED,
                cast_expr,
                $SCHEMA
            );
        }};
    }
1013
    /// Builds `$COL [NOT] IN ($LIST)` without any casting, evaluates it on
    /// `$BATCH` and asserts the boolean result equals `$EXPECTED`.
    ///
    /// On failure, the message includes the SQL form of the expression and
    /// the evaluated probe column.
    macro_rules! in_list_raw {
        ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
            let col_expr = $COL;
            let expr = in_list(Arc::clone(&col_expr), $LIST, $NEGATED, $SCHEMA).unwrap();
            let result = expr
                .evaluate(&$BATCH)?
                .into_array($BATCH.num_rows())
                .expect("Failed to convert to array");
            let result = as_boolean_array(&result);
            let expected = &BooleanArray::from($EXPECTED);
            assert_eq!(
                expected,
                result,
                "Failed for: {}\n{}: {:?}",
                fmt_sql(expr.as_ref()),
                fmt_sql(col_expr.as_ref()),
                col_expr
                    .evaluate(&$BATCH)?
                    .into_array($BATCH.num_rows())
                    .unwrap()
            );
        }};
    }
1050
    /// Inputs for one scalar type, consumed by `run_test_cases`.
    struct InListPrimitiveTestCase {
        /// Label for the case (printed in diagnostics).
        name: &'static str,
        /// A value placed in the list, so `IN` is true for it.
        value_in: ScalarValue,
        /// A value expected to be absent from the list.
        value_not_in: ScalarValue,
        /// Other values added to the list alongside `value_in`.
        other_list_values: Vec<ScalarValue>,
        /// A NULL of the same type. `None` runs only the non-nullable scenarios.
        null_value: Option<ScalarValue>,
    }

    /// Raw test data that `primitive_test_case` /
    /// `primitive_test_case_no_nulls` convert into scalars.
    #[derive(Clone)]
    struct PrimitiveTestCaseData<T> {
        value_in: T,
        value_not_in: T,
        other_list_values: Vec<T>,
    }
1077
1078 fn primitive_test_case<T, D, F>(
1084 name: &'static str,
1085 constructor: F,
1086 data: PrimitiveTestCaseData<D>,
1087 ) -> InListPrimitiveTestCase
1088 where
1089 D: TryInto<T> + Clone,
1090 <D as TryInto<T>>::Error: Debug,
1091 F: Fn(Option<T>) -> ScalarValue,
1092 T: Clone,
1093 {
1094 InListPrimitiveTestCase {
1095 name,
1096 value_in: constructor(Some(data.value_in.try_into().unwrap())),
1097 value_not_in: constructor(Some(data.value_not_in.try_into().unwrap())),
1098 other_list_values: data
1099 .other_list_values
1100 .into_iter()
1101 .map(|v| constructor(Some(v.try_into().unwrap())))
1102 .collect(),
1103 null_value: Some(constructor(None)),
1104 }
1105 }
1106
1107 fn primitive_test_case_no_nulls<T, D, F>(
1110 name: &'static str,
1111 constructor: F,
1112 data: PrimitiveTestCaseData<D>,
1113 ) -> InListPrimitiveTestCase
1114 where
1115 D: TryInto<T> + Clone,
1116 <D as TryInto<T>>::Error: Debug,
1117 F: Fn(Option<T>) -> ScalarValue,
1118 T: Clone,
1119 {
1120 InListPrimitiveTestCase {
1121 name,
1122 value_in: constructor(Some(data.value_in.try_into().unwrap())),
1123 value_not_in: constructor(Some(data.value_not_in.try_into().unwrap())),
1124 other_list_values: data
1125 .other_list_values
1126 .into_iter()
1127 .map(|v| constructor(Some(v.try_into().unwrap())))
1128 .collect(),
1129 null_value: None,
1130 }
1131 }
1132
1133 fn run_test_cases(test_cases: Vec<InListPrimitiveTestCase>) -> Result<()> {
1139 for test_case in test_cases {
1140 let test_name = test_case.name;
1141
1142 let data_type = test_case.value_in.data_type();
1144
1145 let build_base_list = || -> Vec<Arc<dyn PhysicalExpr>> {
1147 let mut list = vec![lit(test_case.value_in.clone())];
1148 list.extend(test_case.other_list_values.iter().map(|v| lit(v.clone())));
1149 list
1150 };
1151
1152 match &test_case.null_value {
1153 Some(null_val) => {
1154 let schema =
1156 Schema::new(vec![Field::new("a", data_type.clone(), true)]);
1157
1158 let array = ScalarValue::iter_to_array(vec![
1160 test_case.value_in.clone(),
1161 test_case.value_not_in.clone(),
1162 null_val.clone(),
1163 ])?;
1164
1165 let col_a = col("a", &schema)?;
1166 let batch = RecordBatch::try_new(
1167 Arc::new(schema.clone()),
1168 vec![Arc::clone(&array)],
1169 )?;
1170
1171 let list = build_base_list();
1173 in_list!(
1174 batch,
1175 list,
1176 &false,
1177 vec![Some(true), Some(false), None],
1178 Arc::clone(&col_a),
1179 &schema
1180 );
1181
1182 let list = build_base_list();
1184 in_list!(
1185 batch,
1186 list,
1187 &true,
1188 vec![Some(false), Some(true), None],
1189 Arc::clone(&col_a),
1190 &schema
1191 );
1192
1193 let mut list = build_base_list();
1195 list.push(lit(null_val.clone()));
1196 in_list!(
1197 batch,
1198 list,
1199 &false,
1200 vec![Some(true), None, None],
1201 Arc::clone(&col_a),
1202 &schema
1203 );
1204
1205 let mut list = build_base_list();
1207 list.push(lit(null_val.clone()));
1208 in_list!(
1209 batch,
1210 list,
1211 &true,
1212 vec![Some(false), None, None],
1213 Arc::clone(&col_a),
1214 &schema
1215 );
1216 }
1217 None => {
1218 let schema =
1220 Schema::new(vec![Field::new("a", data_type.clone(), false)]);
1221
1222 let array = ScalarValue::iter_to_array(vec![
1224 test_case.value_in.clone(),
1225 test_case.value_not_in.clone(),
1226 ])?;
1227
1228 let col_a = col("a", &schema)?;
1229 let batch = RecordBatch::try_new(
1230 Arc::new(schema.clone()),
1231 vec![Arc::clone(&array)],
1232 )?;
1233
1234 let list = build_base_list();
1236 in_list!(
1237 batch,
1238 list,
1239 &false,
1240 vec![Some(true), Some(false)],
1241 Arc::clone(&col_a),
1242 &schema
1243 );
1244
1245 let list = build_base_list();
1247 in_list!(
1248 batch,
1249 list,
1250 &true,
1251 vec![Some(false), Some(true)],
1252 Arc::clone(&col_a),
1253 &schema
1254 );
1255
1256 eprintln!(
1257 "Test '{test_name}': exercised (false, true) branch (no nulls, negated)",
1258 );
1259 }
1260 }
1261 }
1262
1263 Ok(())
1264 }
1265
1266 #[test]
1270 fn in_list_int_types() -> Result<()> {
1271 let int_data = PrimitiveTestCaseData {
1272 value_in: 0,
1273 value_not_in: 2,
1274 other_list_values: vec![1, 3, 5],
1275 };
1276
1277 run_test_cases(vec![
1278 primitive_test_case("int8", ScalarValue::Int8, int_data.clone()),
1280 primitive_test_case("int16", ScalarValue::Int16, int_data.clone()),
1281 primitive_test_case("int32", ScalarValue::Int32, int_data.clone()),
1282 primitive_test_case("int64", ScalarValue::Int64, int_data.clone()),
1283 primitive_test_case("uint8", ScalarValue::UInt8, int_data.clone()),
1284 primitive_test_case("uint16", ScalarValue::UInt16, int_data.clone()),
1285 primitive_test_case("uint32", ScalarValue::UInt32, int_data.clone()),
1286 primitive_test_case("uint64", ScalarValue::UInt64, int_data.clone()),
1287 primitive_test_case_no_nulls("int32_no_nulls", ScalarValue::Int32, int_data),
1289 ])
1290 }
1291
1292 #[test]
1296 fn in_list_string_types() -> Result<()> {
1297 let string_data = PrimitiveTestCaseData {
1298 value_in: "a",
1299 value_not_in: "d",
1300 other_list_values: vec!["b", "c"],
1301 };
1302
1303 run_test_cases(vec![
1304 primitive_test_case("utf8", ScalarValue::Utf8, string_data.clone()),
1305 primitive_test_case(
1306 "large_utf8",
1307 ScalarValue::LargeUtf8,
1308 string_data.clone(),
1309 ),
1310 primitive_test_case("utf8_view", ScalarValue::Utf8View, string_data),
1311 ])
1312 }
1313
1314 #[test]
1318 fn in_list_binary_types() -> Result<()> {
1319 let binary_data = PrimitiveTestCaseData {
1320 value_in: vec![1_u8, 2, 3],
1321 value_not_in: vec![1_u8, 2, 2],
1322 other_list_values: vec![vec![4_u8, 5, 6], vec![7_u8, 8, 9]],
1323 };
1324
1325 run_test_cases(vec![
1326 primitive_test_case("binary", ScalarValue::Binary, binary_data.clone()),
1327 primitive_test_case(
1328 "large_binary",
1329 ScalarValue::LargeBinary,
1330 binary_data.clone(),
1331 ),
1332 primitive_test_case("binary_view", ScalarValue::BinaryView, binary_data),
1333 ])
1334 }
1335
1336 #[test]
1340 fn in_list_date_types() -> Result<()> {
1341 let date_data = PrimitiveTestCaseData {
1342 value_in: 0,
1343 value_not_in: 2,
1344 other_list_values: vec![1, 3],
1345 };
1346
1347 run_test_cases(vec![
1348 primitive_test_case("date32", ScalarValue::Date32, date_data.clone()),
1349 primitive_test_case("date64", ScalarValue::Date64, date_data),
1350 ])
1351 }
1352
1353 #[test]
1357 fn in_list_decimal() -> Result<()> {
1358 run_test_cases(vec![InListPrimitiveTestCase {
1359 name: "decimal128",
1360 value_in: ScalarValue::Decimal128(Some(0), 10, 2),
1361 value_not_in: ScalarValue::Decimal128(Some(200), 10, 2),
1362 other_list_values: vec![
1363 ScalarValue::Decimal128(Some(100), 10, 2),
1364 ScalarValue::Decimal128(Some(300), 10, 2),
1365 ],
1366 null_value: Some(ScalarValue::Decimal128(None, 10, 2)),
1367 }])
1368 }
1369
1370 #[test]
1374 fn in_list_timestamp_types() -> Result<()> {
1375 run_test_cases(vec![
1376 InListPrimitiveTestCase {
1377 name: "timestamp_nanosecond",
1378 value_in: ScalarValue::TimestampNanosecond(Some(0), None),
1379 value_not_in: ScalarValue::TimestampNanosecond(Some(2000), None),
1380 other_list_values: vec![
1381 ScalarValue::TimestampNanosecond(Some(1000), None),
1382 ScalarValue::TimestampNanosecond(Some(3000), None),
1383 ],
1384 null_value: Some(ScalarValue::TimestampNanosecond(None, None)),
1385 },
1386 InListPrimitiveTestCase {
1387 name: "timestamp_millisecond_with_tz",
1388 value_in: ScalarValue::TimestampMillisecond(
1389 Some(1500000),
1390 Some("+05:00".into()),
1391 ),
1392 value_not_in: ScalarValue::TimestampMillisecond(
1393 Some(2500000),
1394 Some("+05:00".into()),
1395 ),
1396 other_list_values: vec![ScalarValue::TimestampMillisecond(
1397 Some(3500000),
1398 Some("+05:00".into()),
1399 )],
1400 null_value: Some(ScalarValue::TimestampMillisecond(
1401 None,
1402 Some("+05:00".into()),
1403 )),
1404 },
1405 InListPrimitiveTestCase {
1406 name: "timestamp_millisecond_mixed_tz",
1407 value_in: ScalarValue::TimestampMillisecond(
1408 Some(1500000),
1409 Some("+05:00".into()),
1410 ),
1411 value_not_in: ScalarValue::TimestampMillisecond(
1412 Some(2500000),
1413 Some("+05:00".into()),
1414 ),
1415 other_list_values: vec![
1416 ScalarValue::TimestampMillisecond(
1417 Some(3500000),
1418 Some("+01:00".into()),
1419 ),
1420 ScalarValue::TimestampMillisecond(Some(4500000), Some("UTC".into())),
1421 ],
1422 null_value: Some(ScalarValue::TimestampMillisecond(
1423 None,
1424 Some("+05:00".into()),
1425 )),
1426 },
1427 ])
1428 }
1429
#[test]
fn in_list_float64() -> Result<()> {
    // Column `a` covers zero, a non-list value, NULL, NaN and negative NaN.
    let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]);
    let a = Float64Array::from(vec![
        Some(0.0),
        Some(0.2),
        None,
        Some(f64::NAN),
        Some(-f64::NAN),
    ]);
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

    // a IN (0.0, 0.1): only 0.0 matches; the NULL row stays NULL
    let list = vec![lit(0.0f64), lit(0.1f64)];
    in_list!(
        batch,
        list,
        &false,
        vec![Some(true), Some(false), None, Some(false), Some(false)],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (0.0, 0.1)
    let list = vec![lit(0.0f64), lit(0.1f64)];
    in_list!(
        batch,
        list,
        &true,
        vec![Some(false), Some(true), None, Some(true), Some(true)],
        Arc::clone(&col_a),
        &schema
    );

    // a IN (0.0, 0.1, NULL): a NULL in the list turns every non-match into NULL
    let list = vec![lit(0.0f64), lit(0.1f64), lit(ScalarValue::Null)];
    in_list!(
        batch,
        list,
        &false,
        vec![Some(true), None, None, None, None],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (0.0, 0.1, NULL): only the definite match yields a non-NULL (false)
    let list = vec![lit(0.0f64), lit(0.1f64), lit(ScalarValue::Null)];
    in_list!(
        batch,
        list,
        &true,
        vec![Some(false), None, None, None, None],
        Arc::clone(&col_a),
        &schema
    );

    // a IN (0.0, 0.1, NaN): NaN matches the NaN row but not the -NaN row
    let list = vec![lit(0.0f64), lit(0.1f64), lit(f64::NAN)];
    in_list!(
        batch,
        list,
        &false,
        vec![Some(true), Some(false), None, Some(true), Some(false)],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (0.0, 0.1, NaN)
    let list = vec![lit(0.0f64), lit(0.1f64), lit(f64::NAN)];
    in_list!(
        batch,
        list,
        &true,
        vec![Some(false), Some(true), None, Some(false), Some(true)],
        Arc::clone(&col_a),
        &schema
    );

    // a IN (0.0, 0.1, -NaN): -NaN matches only the -NaN row, not the NaN row
    let list = vec![lit(0.0f64), lit(0.1f64), lit(-f64::NAN)];
    in_list!(
        batch,
        list,
        &false,
        vec![Some(true), Some(false), None, Some(false), Some(true)],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (0.0, 0.1, -NaN)
    let list = vec![lit(0.0f64), lit(0.1f64), lit(-f64::NAN)];
    in_list!(
        batch,
        list,
        &true,
        vec![Some(false), Some(true), None, Some(true), Some(false)],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
1533
#[test]
fn in_list_bool() -> Result<()> {
    // Column `a` has one `true` row and one NULL row.
    let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
    let a = BooleanArray::from(vec![Some(true), None]);
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

    // a IN (true)
    let list = vec![lit(true)];
    in_list!(
        batch,
        list,
        &false,
        vec![Some(true), None],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (true)
    let list = vec![lit(true)];
    in_list!(
        batch,
        list,
        &true,
        vec![Some(false), None],
        Arc::clone(&col_a),
        &schema
    );

    // a IN (true, NULL): the match still yields true despite the NULL in the list
    let list = vec![lit(true), lit(ScalarValue::Null)];
    in_list!(
        batch,
        list,
        &false,
        vec![Some(true), None],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (true, NULL)
    let list = vec![lit(true), lit(ScalarValue::Null)];
    in_list!(
        batch,
        list,
        &true,
        vec![Some(false), None],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
1587
// Builds `$COL IN ($LIST)` (after coercing both sides with `in_list_cast`)
// and asserts that its nullability against `$SCHEMA` equals `$EXPECTED`.
//
// Every fallible step is propagated with `?`, so this must be invoked from a
// function returning `Result`. A construction failure is then reported as an
// error instead of a bare panic.
macro_rules! test_nullable {
    ($COL:expr, $LIST:expr, $SCHEMA:expr, $EXPECTED:expr) => {{
        let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
        let expr = in_list(cast_expr, cast_list_exprs, &false, $SCHEMA)?;
        let result = expr.nullable($SCHEMA)?;
        assert_eq!($EXPECTED, result);
    }};
}
1596
#[test]
fn in_list_nullable() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("c1_nullable", DataType::Int64, true),
        Field::new("c2_non_nullable", DataType::Int64, false),
    ]);

    let nullable_col = col("c1_nullable", &schema)?;
    let non_null_col = col("c2_non_nullable", &schema)?;

    // Fresh copies of the two literal lists used by several cases.
    let plain_literals = || vec![lit(1_i64), lit(2_i64)];
    let literals_with_null = || vec![lit(1_i64), lit(2_i64), lit(ScalarValue::Null)];

    // (probe expression, list, expected nullability of `probe IN (list)`)
    let cases: Vec<(Arc<dyn PhysicalExpr>, Vec<Arc<dyn PhysicalExpr>>, bool)> = vec![
        // Non-null literals: nullability follows the probe column.
        (Arc::clone(&nullable_col), plain_literals(), true),
        (Arc::clone(&non_null_col), plain_literals(), false),
        // A NULL literal in the list makes the result nullable.
        (Arc::clone(&nullable_col), literals_with_null(), true),
        (Arc::clone(&non_null_col), literals_with_null(), true),
        // A nullable column inside the list makes the result nullable.
        (
            Arc::clone(&non_null_col),
            vec![Arc::clone(&nullable_col)],
            true,
        ),
        // A nullable probe column makes the result nullable.
        (
            Arc::clone(&nullable_col),
            vec![Arc::clone(&non_null_col)],
            true,
        ),
        // Non-nullable columns on both sides keep the result non-nullable.
        (
            Arc::clone(&non_null_col),
            vec![Arc::clone(&non_null_col), Arc::clone(&non_null_col)],
            false,
        ),
    ];

    for (probe, list, expected) in cases {
        test_nullable!(probe, list, &schema, expected);
    }

    Ok(())
}
1628
#[test]
fn in_list_no_cols() -> Result<()> {
    // Both the probe and the list are literals, so column `a` only determines
    // the number of output rows (3); the result is the same for every row.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let a = Int32Array::from(vec![Some(1), Some(2), None]);
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

    let list = vec![lit(ScalarValue::from(1i32)), lit(ScalarValue::from(6i32))];

    // 1 IN (1, 6) -> true for every row
    let expr = lit(ScalarValue::Int32(Some(1)));
    in_list!(
        batch,
        list.clone(),
        &false,
        vec![Some(true), Some(true), Some(true)],
        expr,
        &schema
    );

    // 2 IN (1, 6) -> false for every row
    let expr = lit(ScalarValue::Int32(Some(2)));
    in_list!(
        batch,
        list.clone(),
        &false,
        vec![Some(false), Some(false), Some(false)],
        expr,
        &schema
    );

    // NULL IN (1, 6) -> NULL for every row
    let expr = lit(ScalarValue::Int32(None));
    in_list!(
        batch,
        list.clone(),
        &false,
        vec![None, None, None],
        expr,
        &schema
    );

    Ok(())
}
1676
#[test]
fn in_list_utf8_with_dict_types() -> Result<()> {
    // Dictionary-encoded Utf8 literal with the given key type.
    fn dict_lit(key_type: DataType, value: &str) -> Arc<dyn PhysicalExpr> {
        lit(ScalarValue::Dictionary(
            Box::new(key_type),
            Box::new(ScalarValue::new_utf8(value.to_string())),
        ))
    }

    // Dictionary-encoded NULL Utf8 literal with the given key type.
    fn null_dict_lit(key_type: DataType) -> Arc<dyn PhysicalExpr> {
        lit(ScalarValue::Dictionary(
            Box::new(key_type),
            Box::new(ScalarValue::Utf8(None)),
        ))
    }

    // Column `a` is Dictionary(UInt16, Utf8) with rows "a", "d", NULL.
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
        true,
    )]);
    let a: UInt16DictionaryArray =
        vec![Some("a"), Some("d"), None].into_iter().collect();
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

    // The same list ("a", "b") written two ways: as plain Utf8 literals, and as
    // dictionary literals whose key types (Int8, UInt16) differ from each other.
    let lists = [
        vec![lit("a"), lit("b")],
        vec![
            dict_lit(DataType::Int8, "a"),
            dict_lit(DataType::UInt16, "b"),
        ],
    ];
    // a IN ("a", "b")
    for list in lists.iter() {
        in_list_raw!(
            batch,
            list.clone(),
            &false,
            vec![Some(true), Some(false), None],
            Arc::clone(&col_a),
            &schema
        );
    }

    // a NOT IN ("a", "b")
    for list in lists.iter() {
        in_list_raw!(
            batch,
            list.clone(),
            &true,
            vec![Some(false), Some(true), None],
            Arc::clone(&col_a),
            &schema
        );
    }

    // The same two encodings, each with a trailing NULL element.
    let lists = [
        vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
        vec![
            dict_lit(DataType::Int8, "a"),
            dict_lit(DataType::UInt16, "b"),
            null_dict_lit(DataType::UInt16),
        ],
    ];
    // a IN ("a", "b", NULL): the non-matching "d" row becomes NULL
    for list in lists.iter() {
        in_list_raw!(
            batch,
            list.clone(),
            &false,
            vec![Some(true), None, None],
            Arc::clone(&col_a),
            &schema
        );
    }

    // a NOT IN ("a", "b", NULL)
    for list in lists.iter() {
        in_list_raw!(
            batch,
            list.clone(),
            &true,
            vec![Some(false), None, None],
            Arc::clone(&col_a),
            &schema
        );
    }

    Ok(())
}
1768
#[test]
fn test_fmt_sql_1() -> Result<()> {
    // `a IN ('a', 'b')` rendered as SQL and through `Display`.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let in_expr = in_list(col("a", &schema)?, vec![lit("a"), lit("b")], &false, &schema)?;

    let rendered_sql = fmt_sql(in_expr.as_ref()).to_string();
    let rendered_display = in_expr.to_string();

    assert_snapshot!(rendered_sql, @"a IN (a, b)");
    assert_snapshot!(rendered_display, @"a@0 IN (SET) ([a, b])");
    Ok(())
}
1783
#[test]
fn test_fmt_sql_2() -> Result<()> {
    // `a NOT IN ('a', 'b')` rendered as SQL and through `Display`.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let in_expr = in_list(col("a", &schema)?, vec![lit("a"), lit("b")], &true, &schema)?;

    let rendered_sql = fmt_sql(in_expr.as_ref()).to_string();
    let rendered_display = in_expr.to_string();

    assert_snapshot!(rendered_sql, @"a NOT IN (a, b)");
    assert_snapshot!(rendered_display, @"a@0 NOT IN (SET) ([a, b])");
    Ok(())
}
1799
#[test]
fn test_fmt_sql_3() -> Result<()> {
    // `a IN ('a', 'b', NULL)` rendered as SQL and through `Display`.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let items = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
    let in_expr = in_list(col("a", &schema)?, items, &false, &schema)?;

    let rendered_sql = fmt_sql(in_expr.as_ref()).to_string();
    let rendered_display = in_expr.to_string();

    assert_snapshot!(rendered_sql, @"a IN (a, b, NULL)");
    assert_snapshot!(rendered_display, @"a@0 IN (SET) ([a, b, NULL])");
    Ok(())
}
1814
#[test]
fn test_fmt_sql_4() -> Result<()> {
    // `a NOT IN ('a', 'b', NULL)` rendered as SQL and through `Display`.
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
    let items = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
    let in_expr = in_list(col("a", &schema)?, items, &true, &schema)?;

    let rendered_sql = fmt_sql(in_expr.as_ref()).to_string();
    let rendered_display = in_expr.to_string();

    assert_snapshot!(rendered_sql, @"a NOT IN (a, b, NULL)");
    assert_snapshot!(rendered_display, @"a@0 NOT IN (SET) ([a, b, NULL])");
    Ok(())
}
1828
#[test]
fn in_list_struct() -> Result<()> {
    // Column `a` is a struct {x: Int32, y: Utf8} with rows (1, "a"), (2, "b"), (3, "c").
    let struct_fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Struct(struct_fields.clone()),
        true,
    )]);

    let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let struct_array =
        StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);

    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;

    // Single-row, non-null struct literal {x, y} using the column's fields.
    let struct_lit = |x: i32, y: &str| {
        lit(ScalarValue::Struct(Arc::new(StructArray::new(
            struct_fields.clone(),
            vec![
                Arc::new(Int32Array::from(vec![x])),
                Arc::new(StringArray::from(vec![y])),
            ],
            None,
        ))))
    };

    let list = vec![struct_lit(1, "a"), struct_lit(3, "c")];

    // a IN ({1, "a"}, {3, "c"})
    in_list_raw!(
        batch,
        list.clone(),
        &false,
        vec![Some(true), Some(false), Some(true)],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN ({1, "a"}, {3, "c"})
    in_list_raw!(
        batch,
        list,
        &true,
        vec![Some(false), Some(true), Some(false)],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
1896
#[test]
fn in_list_struct_with_nulls() -> Result<()> {
    // Struct {x: Int32, y: Utf8}; the column has two rows and the second row
    // is a NULL struct (validity bitmap [true, false]).
    let struct_fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Struct(struct_fields.clone()),
        true,
    )]);

    let x_array = Arc::new(Int32Array::from(vec![1, 2]));
    let y_array = Arc::new(StringArray::from(vec!["a", "b"]));
    let struct_array = StructArray::new(
        struct_fields.clone(),
        vec![x_array, y_array],
        Some(NullBuffer::from(vec![true, false])),
    );

    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;

    // List literal {1, "a"}, matching the first (non-null) row.
    let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
        struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["a"])),
        ],
        None,
    )));

    let list = vec![lit(struct1.clone())];
    // a IN ({1, "a"}): the NULL struct row yields NULL
    in_list_raw!(
        batch,
        list.clone(),
        &false,
        vec![Some(true), None],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN ({1, "a"}): the NULL struct row still yields NULL
    in_list_raw!(
        batch,
        list,
        &true,
        vec![Some(false), None],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
1956
#[test]
fn in_list_struct_with_null_in_list() -> Result<()> {
    // Column `a` is a non-null struct {x: Int32, y: Utf8} with rows
    // (1, "a"), (2, "b"), (3, "c").
    let struct_fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Struct(struct_fields.clone()),
        true,
    )]);

    let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let struct_array =
        StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);

    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;

    // List literal {1, "a"}.
    let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
        struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["a"])),
        ],
        None,
    )));

    // A length-1 struct array whose single row is NULL, used as a NULL list element.
    let null_struct = ScalarValue::Struct(Arc::new(StructArray::new_null(
        struct_fields.clone(),
        1,
    )));

    let list = vec![lit(struct1), lit(null_struct.clone())];
    // a IN ({1, "a"}, NULL): rows that do not match become NULL
    in_list_raw!(
        batch,
        list.clone(),
        &false,
        vec![Some(true), None, None],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN ({1, "a"}, NULL): only the definite match yields a non-NULL (false)
    in_list_raw!(
        batch,
        list,
        &true,
        vec![Some(false), None, None],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
2018
#[test]
fn in_list_nested_struct() -> Result<()> {
    // Column `x` is struct {inner: {a: Int32, b: Utf8}, c: Int32} with rows
    // ({1, "x"}, 10) and ({2, "y"}, 20).
    let inner_struct_fields = Fields::from(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    let outer_struct_fields = Fields::from(vec![
        Field::new(
            "inner",
            DataType::Struct(inner_struct_fields.clone()),
            false,
        ),
        Field::new("c", DataType::Int32, false),
    ]);
    let schema = Schema::new(vec![Field::new(
        "x",
        DataType::Struct(outer_struct_fields.clone()),
        true,
    )]);

    let inner1 = Arc::new(StructArray::new(
        inner_struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["x", "y"])),
        ],
        None,
    ));
    let c_array = Arc::new(Int32Array::from(vec![10, 20]));
    let outer_array =
        StructArray::new(outer_struct_fields.clone(), vec![inner1, c_array], None);

    let col_x = col("x", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(outer_array)])?;

    // List literal ({1, "x"}, 10), equal to the first row.
    let inner_match = Arc::new(StructArray::new(
        inner_struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["x"])),
        ],
        None,
    ));
    let outer_match = ScalarValue::Struct(Arc::new(StructArray::new(
        outer_struct_fields.clone(),
        vec![inner_match, Arc::new(Int32Array::from(vec![10]))],
        None,
    )));

    let list = vec![lit(outer_match)];
    // x IN (({1, "x"}, 10))
    in_list_raw!(
        batch,
        list.clone(),
        &false,
        vec![Some(true), Some(false)],
        Arc::clone(&col_x),
        &schema
    );

    // x NOT IN (({1, "x"}, 10))
    in_list_raw!(
        batch,
        list,
        &true,
        vec![Some(false), Some(true)],
        Arc::clone(&col_x),
        &schema
    );

    Ok(())
}
2095
#[test]
fn in_list_struct_with_exprs_not_array() -> Result<()> {
    // Column `a` is a non-null struct {x: Int32, y: Utf8} with rows
    // (1, "a"), (2, "b"), (3, "c").
    let struct_fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Struct(struct_fields.clone()),
        true,
    )]);

    let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let struct_array =
        StructArray::new(struct_fields.clone(), vec![x_array, y_array], None);

    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(struct_array)])?;

    // List literals {1, "a"} and {3, "c"}.
    let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
        struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["a"])),
        ],
        None,
    )));

    let struct3 = ScalarValue::Struct(Arc::new(StructArray::new(
        struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![3])),
            Arc::new(StringArray::from(vec!["c"])),
        ],
        None,
    )));

    // Shared by the IN and NOT IN expressions below, so both are guaranteed
    // to be checked against exactly the same list.
    let list = vec![lit(struct1), lit(struct3)];

    // Built directly with no static filter (`None`), so the display string
    // must not carry the "(SET)" marker.
    let expr = Arc::new(InListExpr::new(
        Arc::clone(&col_a),
        list.clone(),
        false,
        None,
    ));

    let display_string = expr.to_string();
    assert!(
        !display_string.contains("(SET)"),
        "Expected display string to NOT contain '(SET)' (should use Exprs variant), but got: {display_string}",
    );

    // a IN ({1, "a"}, {3, "c"})
    let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let result = as_boolean_array(&result);

    let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
    assert_eq!(result, &expected);

    // a NOT IN ({1, "a"}, {3, "c"}), reusing the same list expressions
    // instead of rebuilding the struct literals by hand.
    let expr_not = Arc::new(InListExpr::new(Arc::clone(&col_a), list, true, None));

    let result_not = expr_not.evaluate(&batch)?.into_array(batch.num_rows())?;
    let result_not = as_boolean_array(&result_not);

    let expected_not = BooleanArray::from(vec![Some(false), Some(true), Some(false)]);
    assert_eq!(result_not, &expected_not);

    Ok(())
}
2202
#[test]
fn test_in_list_null_handling_comprehensive() -> Result<()> {
    // Three-valued logic for IN with and without a NULL in the list.
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    let a = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

    // a IN (1, 4)
    let list = vec![lit(1i64), lit(4i64)];
    in_list!(
        batch,
        list,
        &false,
        vec![
            Some(true),  // 1 is in the list
            Some(false), // 2 is not
            Some(false), // 3 is not
            None,        // NULL probe yields NULL
        ],
        Arc::clone(&col_a),
        &schema
    );

    // a IN (1, NULL)
    let list = vec![lit(1i64), lit(ScalarValue::Int64(None))];
    in_list!(
        batch,
        list,
        &false,
        vec![
            Some(true), // 1 matches
            None,       // 2 does not match, but the list contains NULL
            None,       // 3: same as above
            None,       // NULL probe yields NULL
        ],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
2254
#[test]
fn test_in_list_with_only_nulls() -> Result<()> {
    // A list consisting only of NULLs yields NULL for every row, for both IN
    // and NOT IN, regardless of whether the probe value is NULL.
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let a = Int64Array::from(vec![Some(1), Some(2), None]);
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

    let list = vec![lit(ScalarValue::Int64(None)), lit(ScalarValue::Int64(None))];

    // a IN (NULL, NULL)
    in_list!(
        batch,
        list.clone(),
        &false,
        vec![None, None, None],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (NULL, NULL)
    in_list!(
        batch,
        list,
        &true,
        vec![None, None, None],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
2291
#[test]
fn test_in_list_multiple_nulls_deduplication() -> Result<()> {
    // The IN-list array repeats NULL three times alongside 1, 2 and 3. The
    // duplicates must not change the three-valued result: matches are true and
    // non-matches (including a NULL probe) are NULL.
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let col_a = col("a", &schema)?;

    let haystack: ArrayRef = Arc::new(Int64Array::from(vec![
        Some(1),
        Some(2),
        None,
        None,
        Some(3),
        None,
    ]));
    let in_expr: Arc<dyn PhysicalExpr> = Arc::new(InListExpr::try_new_from_array(
        Arc::clone(&col_a),
        haystack,
        false,
    )?);

    let probe = Int64Array::from(vec![Some(1), Some(2), Some(3), Some(4), None]);
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(probe)])?;

    let evaluated = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let want = BooleanArray::from(vec![Some(true), Some(true), Some(true), None, None]);
    assert_eq!(as_boolean_array(&evaluated), &want);

    Ok(())
}
2339
#[test]
fn test_not_in_null_handling_comprehensive() -> Result<()> {
    // Three-valued logic for NOT IN with and without a NULL in the list.
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    let a = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

    // a NOT IN (1, 4)
    let list = vec![lit(1i64), lit(4i64)];
    in_list!(
        batch,
        list,
        &true,
        vec![
            Some(false), // 1 is in the list
            Some(true),  // 2 is not
            Some(true),  // 3 is not
            None,        // NULL probe yields NULL
        ],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (1, NULL)
    let list = vec![lit(1i64), lit(ScalarValue::Int64(None))];
    in_list!(
        batch,
        list,
        &true,
        vec![
            Some(false), // 1 matches, so NOT IN is definitely false
            None,        // 2 does not match, but the list contains NULL
            None,        // 3: same as above
            None,        // NULL probe yields NULL
        ],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
2387
#[test]
fn test_in_list_null_type_column() -> Result<()> {
    // Probe column of `DataType::Null` (every row NULL) against Int64 literals:
    // every row yields NULL for both IN and NOT IN.
    let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
    let a = NullArray::new(3);
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;

    let list = vec![lit(1i64), lit(2i64)];

    // a IN (1, 2)
    in_list!(
        batch,
        list.clone(),
        &false,
        vec![None, None, None],
        Arc::clone(&col_a),
        &schema
    );

    // a NOT IN (1, 2)
    in_list!(
        batch,
        list,
        &true,
        vec![None, None, None],
        Arc::clone(&col_a),
        &schema
    );

    Ok(())
}
2426
#[test]
fn test_in_list_null_type_list() -> Result<()> {
    // Int64 column probed against an IN-list array of `DataType::Null`: every
    // row, including the non-null ones, evaluates to NULL.
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let values = Int64Array::from(vec![Some(1), Some(2), None]);
    let col_a = col("a", &schema)?;

    let in_expr: Arc<dyn PhysicalExpr> = Arc::new(InListExpr::try_new_from_array(
        Arc::clone(&col_a),
        Arc::new(NullArray::new(2)) as ArrayRef,
        false,
    )?);
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)])?;

    let evaluated = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![None, None, None])
    );

    Ok(())
}
2455
#[test]
fn test_in_list_null_type_both() -> Result<()> {
    // Both the probe column and the IN-list array are `DataType::Null`:
    // every row evaluates to NULL.
    let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
    let values = NullArray::new(3);
    let col_a = col("a", &schema)?;

    let in_expr: Arc<dyn PhysicalExpr> = Arc::new(InListExpr::try_new_from_array(
        Arc::clone(&col_a),
        Arc::new(NullArray::new(2)) as ArrayRef,
        false,
    )?);

    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)])?;
    let evaluated = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![None, None, None])
    );

    Ok(())
}
2484
#[test]
fn test_in_list_comprehensive_null_handling() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
    let col_b = col("b", &schema)?;
    let null_i32 = ScalarValue::Int32(None);

    // Each case: (values of column `b`, probe expression, list, expected output
    // of `probe IN (list)`). Cases run in order, each on its own batch.
    let cases: Vec<(
        Vec<Option<i32>>,
        Arc<dyn PhysicalExpr>,
        Vec<Arc<dyn PhysicalExpr>>,
        Vec<Option<bool>>,
    )> = vec![
        // b IN (1, 2) with no NULLs in the list
        (
            vec![Some(1)],
            Arc::clone(&col_b),
            vec![lit(1i32), lit(2i32)],
            vec![Some(true)],
        ),
        (
            vec![Some(1), Some(2)],
            Arc::clone(&col_b),
            vec![lit(1i32), lit(2i32)],
            vec![Some(true), Some(true)],
        ),
        (
            vec![Some(3), Some(4)],
            Arc::clone(&col_b),
            vec![lit(1i32), lit(2i32)],
            vec![Some(false), Some(false)],
        ),
        (
            vec![Some(1), None],
            Arc::clone(&col_b),
            vec![lit(1i32), lit(2i32)],
            vec![Some(true), None],
        ),
        (
            vec![Some(3), None],
            Arc::clone(&col_b),
            vec![lit(1i32), lit(2i32)],
            vec![Some(false), None],
        ),
        // b IN (NULL, 1): a match is true, anything else is NULL
        (
            vec![Some(1)],
            Arc::clone(&col_b),
            vec![lit(null_i32.clone()), lit(1i32)],
            vec![Some(true)],
        ),
        (
            vec![Some(2)],
            Arc::clone(&col_b),
            vec![lit(null_i32.clone()), lit(1i32)],
            vec![None],
        ),
        (
            vec![None],
            Arc::clone(&col_b),
            vec![lit(null_i32.clone()), lit(1i32)],
            vec![None],
        ),
        // b IN (NULL, NULL): always NULL
        (
            vec![Some(1)],
            Arc::clone(&col_b),
            vec![lit(null_i32.clone()), lit(null_i32.clone())],
            vec![None],
        ),
        (
            vec![None],
            Arc::clone(&col_b),
            vec![lit(null_i32.clone()), lit(null_i32.clone())],
            vec![None],
        ),
        // 1 IN (2, b): the column appears inside the list
        (
            vec![Some(1)],
            lit(1i32),
            vec![lit(2i32), Arc::clone(&col_b)],
            vec![Some(true)],
        ),
        (
            vec![Some(3)],
            lit(1i32),
            vec![lit(2i32), Arc::clone(&col_b)],
            vec![Some(false)],
        ),
        (
            vec![None],
            lit(1i32),
            vec![lit(2i32), Arc::clone(&col_b)],
            vec![None],
        ),
        // b IN (1, b): the column appears on both sides
        (
            vec![Some(1)],
            Arc::clone(&col_b),
            vec![lit(1i32), Arc::clone(&col_b)],
            vec![Some(true)],
        ),
        (
            vec![Some(2)],
            Arc::clone(&col_b),
            vec![lit(1i32), Arc::clone(&col_b)],
            vec![Some(true)],
        ),
        (
            vec![None],
            Arc::clone(&col_b),
            vec![lit(1i32), Arc::clone(&col_b)],
            vec![None],
        ),
    ];

    for (values, probe, list, expected) in cases {
        let column = Arc::new(Int32Array::from(values));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![column])?;

        let in_expr = in_list(probe, list, &false, schema.as_ref())?;
        let evaluated = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        assert_eq!(as_boolean_array(&evaluated), &BooleanArray::from(expected));
    }

    Ok(())
}
2683
#[test]
fn test_in_list_scalar_literal_cases() -> Result<()> {
    // Probe and list are all literals; the batch supplies only the row count.
    let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
    let null_i32 = ScalarValue::Int32(None);

    // Builds a single-column Int32 batch for `schema`.
    let make_batch = |values: Vec<Option<i32>>| -> Result<RecordBatch> {
        let array = Arc::new(Int32Array::from(values));
        Ok(RecordBatch::try_new(Arc::clone(&schema), vec![array])?)
    };

    // Evaluates `expr [NOT] IN (list)` on `batch` and compares to `expected`;
    // the failure message shows the actual values as a Vec for readability.
    let run_test = |batch: &RecordBatch,
                    expr: Arc<dyn PhysicalExpr>,
                    list: Vec<Arc<dyn PhysicalExpr>>,
                    negated: bool,
                    expected: Vec<Option<bool>>|
     -> Result<()> {
        let in_expr = in_list(expr, list, &negated, schema.as_ref())?;
        let result = in_expr.evaluate(batch)?.into_array(batch.num_rows())?;
        let result = as_boolean_array(&result);
        let expected_array = BooleanArray::from(expected);
        assert_eq!(
            result,
            &expected_array,
            "Expected {:?}, got {:?}",
            expected_array,
            result.iter().collect::<Vec<_>>()
        );
        Ok(())
    };

    let batch = make_batch(vec![Some(1)])?;

    // NULL IN (1, 1) -> NULL
    run_test(
        &batch,
        lit(null_i32.clone()),
        vec![lit(1i32), lit(1i32)],
        false,
        vec![None],
    )?;

    // NULL IN (NULL, 1) -> NULL
    run_test(
        &batch,
        lit(null_i32.clone()),
        vec![lit(null_i32.clone()), lit(1i32)],
        false,
        vec![None],
    )?;

    // NULL IN (NULL, NULL) -> NULL
    run_test(
        &batch,
        lit(null_i32.clone()),
        vec![lit(null_i32.clone()), lit(null_i32.clone())],
        false,
        vec![None],
    )?;

    // 3 IN (0, 1, 2, NULL) -> NULL (no match, but the list contains NULL)
    run_test(
        &batch,
        lit(3i32),
        vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
        false,
        vec![None],
    )?;

    // 3 NOT IN (0, 1, 2, NULL) -> NULL
    run_test(
        &batch,
        lit(3i32),
        vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
        true,
        vec![None],
    )?;

    // 1 IN (0, 1, 2, NULL) -> true (a match wins over the NULL)
    run_test(
        &batch,
        lit(1i32),
        vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
        false,
        vec![Some(true)],
    )?;

    // 1 NOT IN (0, 1, 2, NULL) -> false
    run_test(
        &batch,
        lit(1i32),
        vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
        true,
        vec![Some(false)],
    )?;

    // Same checks for Utf8 literals; the one-row batch's value is never read.
    let schema_str =
        Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    let batch_str = RecordBatch::try_new(
        Arc::clone(&schema_str),
        vec![Arc::new(StringArray::from(vec![Some("dummy")]))],
    )?;
    let null_str = ScalarValue::Utf8(None);

    // Evaluates `expr [NOT] IN (list)` on `batch_str` and compares to `expected`.
    let run_test_str = |expr: Arc<dyn PhysicalExpr>,
                        list: Vec<Arc<dyn PhysicalExpr>>,
                        negated: bool,
                        expected: Vec<Option<bool>>|
     -> Result<()> {
        let in_expr = in_list(expr, list, &negated, schema_str.as_ref())?;
        let result = in_expr
            .evaluate(&batch_str)?
            .into_array(batch_str.num_rows())?;
        let result = as_boolean_array(&result);
        let expected_array = BooleanArray::from(expected);
        assert_eq!(
            result,
            &expected_array,
            "Expected {:?}, got {:?}",
            expected_array,
            result.iter().collect::<Vec<_>>()
        );
        Ok(())
    };

    // 'c' IN ('a', 'b', NULL) -> NULL
    run_test_str(
        lit("c"),
        vec![lit("a"), lit("b"), lit(null_str.clone())],
        false,
        vec![None],
    )?;

    // 'c' NOT IN ('a', 'b', NULL) -> NULL
    run_test_str(
        lit("c"),
        vec![lit("a"), lit("b"), lit(null_str.clone())],
        true,
        vec![None],
    )?;

    // 'a' IN ('a', 'b', NULL) -> true
    run_test_str(
        lit("a"),
        vec![lit("a"), lit("b"), lit(null_str.clone())],
        false,
        vec![Some(true)],
    )?;

    // 'a' NOT IN ('a', 'b', NULL) -> false
    run_test_str(
        lit("a"),
        vec![lit("a"), lit("b"), lit(null_str.clone())],
        true,
        vec![Some(false)],
    )?;

    Ok(())
}
2864
#[test]
fn test_in_list_tuple_cases() -> Result<()> {
    // Tuple-style IN over struct literals. The struct values themselves are
    // never NULL, only their fields. The expectations below treat NULL fields
    // as equal to each other when comparing whole structs, so no case yields NULL.
    let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));

    // Single-row, non-null struct {field_0, field_1} with nullable Int32 fields.
    let make_struct = |v1: Option<i32>, v2: Option<i32>| -> ScalarValue {
        let fields = Fields::from(vec![
            Field::new("field_0", DataType::Int32, true),
            Field::new("field_1", DataType::Int32, true),
        ]);
        ScalarValue::Struct(Arc::new(StructArray::new(
            fields,
            vec![
                Arc::new(Int32Array::from(vec![v1])),
                Arc::new(Int32Array::from(vec![v2])),
            ],
            None,
        )))
    };

    // One-row batch; the column value is not referenced by the expressions.
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![Some(1)]))],
    )?;

    // Evaluates `lhs IN (list...)` with all sides as literals and checks `expected`.
    let run_tuple_test = |lhs: ScalarValue,
                          list: Vec<ScalarValue>,
                          expected: Vec<Option<bool>>|
     -> Result<()> {
        let expr = in_list(
            lit(lhs),
            list.into_iter().map(lit).collect(),
            &false,
            schema.as_ref(),
        )?;
        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        let result = as_boolean_array(&result);
        assert_eq!(result, &BooleanArray::from(expected));
        Ok(())
    };

    // (NULL, NULL) IN ((1, 2)) -> false
    run_tuple_test(
        make_struct(None, None),
        vec![make_struct(Some(1), Some(2))],
        vec![Some(false)],
    )?;

    // (NULL, NULL) IN ((NULL, 1)) -> false
    run_tuple_test(
        make_struct(None, None),
        vec![make_struct(None, Some(1))],
        vec![Some(false)],
    )?;

    // (NULL, NULL) IN ((NULL, NULL)) -> true
    run_tuple_test(
        make_struct(None, None),
        vec![make_struct(None, None)],
        vec![Some(true)],
    )?;

    // (NULL, 1) IN ((1, 2)) -> false
    run_tuple_test(
        make_struct(None, Some(1)),
        vec![make_struct(Some(1), Some(2))],
        vec![Some(false)],
    )?;

    // (NULL, 1) IN ((NULL, 1)) -> true
    run_tuple_test(
        make_struct(None, Some(1)),
        vec![make_struct(None, Some(1))],
        vec![Some(true)],
    )?;

    // (NULL, 1) IN ((NULL, NULL)) -> false
    run_tuple_test(
        make_struct(None, Some(1)),
        vec![make_struct(None, None)],
        vec![Some(false)],
    )?;

    // (1, 2) IN ((1, 2)) -> true
    run_tuple_test(
        make_struct(Some(1), Some(2)),
        vec![make_struct(Some(1), Some(2))],
        vec![Some(true)],
    )?;

    // (1, 3) IN ((1, 2)) -> false
    run_tuple_test(
        make_struct(Some(1), Some(3)),
        vec![make_struct(Some(1), Some(2))],
        vec![Some(false)],
    )?;

    // (4, 4) IN ((1, 2)) -> false
    run_tuple_test(
        make_struct(Some(4), Some(4)),
        vec![make_struct(Some(1), Some(2))],
        vec![Some(false)],
    )?;

    // (1, 1) IN ((NULL, 1)) -> false
    run_tuple_test(
        make_struct(Some(1), Some(1)),
        vec![make_struct(None, Some(1))],
        vec![Some(false)],
    )?;

    // (1, 1) IN ((NULL, NULL)) -> false
    run_tuple_test(
        make_struct(Some(1), Some(1)),
        vec![make_struct(None, None)],
        vec![Some(false)],
    )?;

    Ok(())
}
2990
#[test]
fn test_in_list_dictionary_int32() -> Result<()> {
    // Non-nullable Dictionary<Int8, Int32> column probed with plain Int32 literals.
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32)),
        false,
    )]);
    let expr = in_list(
        col("a", &schema)?,
        vec![lit(100i32), lit(200i32), lit(300i32)],
        &false,
        &schema,
    )?;

    // Rows decode to values[keys[i]] = [100, 200, 500]; only the first two are listed.
    let dictionary = DictionaryArray::try_new(
        Int8Array::from(vec![0, 1, 2]),
        Arc::new(Int32Array::from(vec![100, 200, 500])),
    )?;
    let column: ArrayRef = Arc::new(dictionary);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![column])?;

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![true, true, false])
    );
    Ok(())
}
3020
#[test]
fn test_in_list_dictionary_types() -> Result<()> {
    /// Dictionary-typed Int64 literal using `key_type` as the dictionary key type.
    fn dict_lit_int64(key_type: DataType, value: i64) -> Arc<dyn PhysicalExpr> {
        lit(ScalarValue::Dictionary(
            Box::new(key_type),
            Box::new(ScalarValue::Int64(Some(value))),
        ))
    }

    /// Dictionary-typed Float64 literal using `key_type` as the dictionary key type.
    fn dict_lit_float64(key_type: DataType, value: f64) -> Arc<dyn PhysicalExpr> {
        lit(ScalarValue::Dictionary(
            Box::new(key_type),
            Box::new(ScalarValue::Float64(Some(value))),
        ))
    }

    /// Optional extra check whose list literals are themselves dictionary-typed;
    /// it is run through `in_list_raw!` (non-negated only).
    struct DictNeedleTest {
        list_values: Vec<Arc<dyn PhysicalExpr>>,
        expected: Vec<Option<bool>>,
    }

    /// A nullable Dictionary<Int8, _> column probed four ways:
    /// {list without NULL, list with NULL} x {IN, NOT IN}.
    struct DictionaryInListTestCase {
        /// Used only to label errors returned by the runner.
        name: &'static str,
        dict_type: DataType,
        dict_keys: Vec<Option<i8>>,
        dict_values: ArrayRef,
        list_values_no_null: Vec<Arc<dyn PhysicalExpr>>,
        list_values_with_null: Vec<Arc<dyn PhysicalExpr>>,
        /// Expected result of `a IN (list_values_no_null)`.
        expected_1: Vec<Option<bool>>,
        /// Expected result of `a NOT IN (list_values_no_null)`.
        expected_2: Vec<Option<bool>>,
        /// Expected result of `a IN (list_values_with_null)`.
        expected_3: Vec<Option<bool>>,
        /// Expected result of `a NOT IN (list_values_with_null)`.
        expected_4: Vec<Option<bool>>,
        dict_needle_test: Option<DictNeedleTest>,
    }

    /// Builds the dictionary column described by `test_case` and runs all of its checks.
    /// The case is consumed, so its vectors are moved into the macros rather than cloned.
    fn run_dictionary_in_list_test(test_case: DictionaryInListTestCase) -> Result<()> {
        let DictionaryInListTestCase {
            name: _,
            dict_type,
            dict_keys,
            dict_values,
            list_values_no_null,
            list_values_with_null,
            expected_1,
            expected_2,
            expected_3,
            expected_4,
            dict_needle_test,
        } = test_case;

        let schema = Schema::new(vec![Field::new("a", dict_type, true)]);
        let col_a = col("a", &schema)?;

        let keys = Int8Array::from(dict_keys);
        let dict_array: ArrayRef = Arc::new(DictionaryArray::try_new(keys, dict_values)?);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![dict_array])?;

        in_list!(
            batch,
            list_values_no_null.clone(),
            &false,
            expected_1,
            Arc::clone(&col_a),
            &schema
        );

        in_list!(
            batch,
            list_values_no_null,
            &true,
            expected_2,
            Arc::clone(&col_a),
            &schema
        );

        in_list!(
            batch,
            list_values_with_null.clone(),
            &false,
            expected_3,
            Arc::clone(&col_a),
            &schema
        );

        in_list!(
            batch,
            list_values_with_null,
            &true,
            expected_4,
            Arc::clone(&col_a),
            &schema
        );

        if let Some(DictNeedleTest {
            list_values,
            expected,
        }) = dict_needle_test
        {
            in_list_raw!(
                batch,
                list_values,
                &false,
                expected,
                Arc::clone(&col_a),
                &schema
            );
        }

        Ok(())
    }

    let utf8_case = DictionaryInListTestCase {
        name: "dictionary_utf8",
        dict_type: DataType::Dictionary(
            Box::new(DataType::Int8),
            Box::new(DataType::Utf8),
        ),
        dict_keys: vec![Some(0), Some(1), None],
        dict_values: Arc::new(StringArray::from(vec![Some("a"), Some("d")])),
        list_values_no_null: vec![lit("a"), lit("b")],
        list_values_with_null: vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
        expected_1: vec![Some(true), Some(false), None],
        expected_2: vec![Some(false), Some(true), None],
        expected_3: vec![Some(true), None, None],
        expected_4: vec![Some(false), None, None],
        dict_needle_test: None,
    };

    // The needle test uses Int16-keyed list literals against the Int8-keyed column.
    let int64_case = DictionaryInListTestCase {
        name: "dictionary_int64",
        dict_type: DataType::Dictionary(
            Box::new(DataType::Int8),
            Box::new(DataType::Int64),
        ),
        dict_keys: vec![Some(0), Some(1), None],
        dict_values: Arc::new(Int64Array::from(vec![Some(10), Some(20)])),
        list_values_no_null: vec![lit(10i64), lit(15i64)],
        list_values_with_null: vec![
            lit(10i64),
            lit(15i64),
            lit(ScalarValue::Int64(None)),
        ],
        expected_1: vec![Some(true), Some(false), None],
        expected_2: vec![Some(false), Some(true), None],
        expected_3: vec![Some(true), None, None],
        expected_4: vec![Some(false), None, None],
        dict_needle_test: Some(DictNeedleTest {
            list_values: vec![
                dict_lit_int64(DataType::Int16, 10),
                dict_lit_int64(DataType::Int16, 15),
            ],
            expected: vec![Some(true), Some(false), None],
        }),
    };

    // Row 3 decodes to NaN, which is absent from both lists.
    let float64_case = DictionaryInListTestCase {
        name: "dictionary_float64",
        dict_type: DataType::Dictionary(
            Box::new(DataType::Int8),
            Box::new(DataType::Float64),
        ),
        dict_keys: vec![Some(0), Some(1), None, Some(2)],
        dict_values: Arc::new(Float64Array::from(vec![
            Some(1.5),
            Some(3.7),
            Some(f64::NAN),
        ])),
        list_values_no_null: vec![lit(1.5f64), lit(2.0f64)],
        list_values_with_null: vec![
            lit(1.5f64),
            lit(2.0f64),
            lit(ScalarValue::Float64(None)),
        ],
        expected_1: vec![Some(true), Some(false), None, Some(false)],
        expected_2: vec![Some(false), Some(true), None, Some(true)],
        expected_3: vec![Some(true), None, None, None],
        expected_4: vec![Some(false), None, None, None],
        dict_needle_test: Some(DictNeedleTest {
            list_values: vec![
                dict_lit_float64(DataType::UInt16, 1.5),
                dict_lit_float64(DataType::UInt16, 2.0),
            ],
            expected: vec![Some(true), Some(false), None, Some(false)],
        }),
    };

    // Keys reference the same dictionary values more than once.
    let dedup_case = DictionaryInListTestCase {
        name: "dictionary_deduplication",
        dict_type: DataType::Dictionary(
            Box::new(DataType::Int8),
            Box::new(DataType::Utf8),
        ),
        dict_keys: vec![Some(0), Some(1), Some(0), Some(1), None],
        dict_values: Arc::new(StringArray::from(vec![Some("a"), Some("d")])),
        list_values_no_null: vec![lit("a"), lit("b")],
        list_values_with_null: vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
        expected_1: vec![Some(true), Some(false), Some(true), Some(false), None],
        expected_2: vec![Some(false), Some(true), Some(false), Some(true), None],
        expected_3: vec![Some(true), None, Some(true), None, None],
        expected_4: vec![Some(false), None, Some(false), None, None],
        dict_needle_test: None,
    };

    // Run every case in order; errors are tagged with the failing case's name.
    for test_case in [utf8_case, int64_case, float64_case, dedup_case] {
        let test_name = test_case.name;
        run_dictionary_in_list_test(test_case).map_err(|e| {
            datafusion_common::DataFusionError::Execution(format!(
                "Dictionary test '{test_name}' failed: {e}"
            ))
        })?;
    }

    // A NaN literal in the list matches the NaN-valued dictionary row (row 3).
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Float64));
    let schema = Schema::new(vec![Field::new("a", dict_type, true)]);
    let col_a = col("a", &schema)?;

    let keys = Int8Array::from(vec![Some(0), Some(1), None, Some(2)]);
    let values = Float64Array::from(vec![Some(1.5), Some(3.7), Some(f64::NAN)]);
    let dict_array: ArrayRef =
        Arc::new(DictionaryArray::try_new(keys, Arc::new(values))?);
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![dict_array])?;

    let list_with_nan = vec![lit(1.5f64), lit(2.0f64), lit(f64::NAN)];
    in_list!(
        batch,
        list_with_nan,
        &false,
        vec![Some(true), Some(false), None, Some(true)],
        col_a,
        &schema
    );

    Ok(())
}
3308
#[test]
fn test_in_list_esoteric_types() -> Result<()> {
    let precision = 38;
    let scale = 10;

    // Each case: (column type, two-row non-null column, IN-list literals).
    // The first literal equals row 0 and nothing equals row 1, so every case
    // expects [true, false].
    let cases: Vec<(DataType, ArrayRef, Vec<ScalarValue>)> = vec![
        (
            DataType::Timestamp(TimeUnit::Second, None),
            Arc::new(TimestampSecondArray::from(vec![Some(1000), Some(2000)])) as ArrayRef,
            vec![
                ScalarValue::TimestampSecond(Some(1000), None),
                ScalarValue::TimestampSecond(Some(1500), None),
            ],
        ),
        (
            DataType::Timestamp(TimeUnit::Millisecond, None),
            Arc::new(TimestampMillisecondArray::from(vec![
                Some(1000000),
                Some(2000000),
            ])) as ArrayRef,
            vec![
                ScalarValue::TimestampMillisecond(Some(1000000), None),
                ScalarValue::TimestampMillisecond(Some(1500000), None),
            ],
        ),
        (
            DataType::Timestamp(TimeUnit::Microsecond, None),
            Arc::new(TimestampMicrosecondArray::from(vec![
                Some(1000000000),
                Some(2000000000),
            ])) as ArrayRef,
            vec![
                ScalarValue::TimestampMicrosecond(Some(1000000000), None),
                ScalarValue::TimestampMicrosecond(Some(1500000000), None),
            ],
        ),
        (
            DataType::Time32(TimeUnit::Second),
            Arc::new(Time32SecondArray::from(vec![Some(3600), Some(7200)])) as ArrayRef,
            vec![
                ScalarValue::Time32Second(Some(3600)),
                ScalarValue::Time32Second(Some(5400)),
            ],
        ),
        (
            DataType::Time32(TimeUnit::Millisecond),
            Arc::new(Time32MillisecondArray::from(vec![
                Some(3600000),
                Some(7200000),
            ])) as ArrayRef,
            vec![
                ScalarValue::Time32Millisecond(Some(3600000)),
                ScalarValue::Time32Millisecond(Some(5400000)),
            ],
        ),
        (
            DataType::Time64(TimeUnit::Microsecond),
            Arc::new(Time64MicrosecondArray::from(vec![
                Some(3600000000),
                Some(7200000000),
            ])) as ArrayRef,
            vec![
                ScalarValue::Time64Microsecond(Some(3600000000)),
                ScalarValue::Time64Microsecond(Some(5400000000)),
            ],
        ),
        (
            DataType::Time64(TimeUnit::Nanosecond),
            Arc::new(Time64NanosecondArray::from(vec![
                Some(3600000000000),
                Some(7200000000000),
            ])) as ArrayRef,
            vec![
                ScalarValue::Time64Nanosecond(Some(3600000000000)),
                ScalarValue::Time64Nanosecond(Some(5400000000000)),
            ],
        ),
        (
            DataType::Duration(TimeUnit::Second),
            Arc::new(DurationSecondArray::from(vec![Some(86400), Some(172800)])) as ArrayRef,
            vec![
                ScalarValue::DurationSecond(Some(86400)),
                ScalarValue::DurationSecond(Some(129600)),
            ],
        ),
        (
            DataType::Duration(TimeUnit::Millisecond),
            Arc::new(DurationMillisecondArray::from(vec![
                Some(86400000),
                Some(172800000),
            ])) as ArrayRef,
            vec![
                ScalarValue::DurationMillisecond(Some(86400000)),
                ScalarValue::DurationMillisecond(Some(129600000)),
            ],
        ),
        (
            DataType::Duration(TimeUnit::Microsecond),
            Arc::new(DurationMicrosecondArray::from(vec![
                Some(86400000000),
                Some(172800000000),
            ])) as ArrayRef,
            vec![
                ScalarValue::DurationMicrosecond(Some(86400000000)),
                ScalarValue::DurationMicrosecond(Some(129600000000)),
            ],
        ),
        (
            DataType::Duration(TimeUnit::Nanosecond),
            Arc::new(DurationNanosecondArray::from(vec![
                Some(86400000000000),
                Some(172800000000000),
            ])) as ArrayRef,
            vec![
                ScalarValue::DurationNanosecond(Some(86400000000000)),
                ScalarValue::DurationNanosecond(Some(129600000000000)),
            ],
        ),
        (
            DataType::Interval(IntervalUnit::YearMonth),
            Arc::new(IntervalYearMonthArray::from(vec![Some(12), Some(24)])) as ArrayRef,
            vec![
                ScalarValue::IntervalYearMonth(Some(12)),
                ScalarValue::IntervalYearMonth(Some(18)),
            ],
        ),
        (
            DataType::Interval(IntervalUnit::DayTime),
            Arc::new(IntervalDayTimeArray::from(vec![
                Some(IntervalDayTime {
                    days: 1,
                    milliseconds: 0,
                }),
                Some(IntervalDayTime {
                    days: 2,
                    milliseconds: 0,
                }),
            ])) as ArrayRef,
            vec![
                ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                    days: 1,
                    milliseconds: 0,
                })),
                ScalarValue::IntervalDayTime(Some(IntervalDayTime {
                    days: 1,
                    milliseconds: 500,
                })),
            ],
        ),
        (
            DataType::Interval(IntervalUnit::MonthDayNano),
            Arc::new(IntervalMonthDayNanoArray::from(vec![
                Some(IntervalMonthDayNano {
                    months: 1,
                    days: 0,
                    nanoseconds: 0,
                }),
                Some(IntervalMonthDayNano {
                    months: 2,
                    days: 0,
                    nanoseconds: 0,
                }),
            ])) as ArrayRef,
            vec![
                ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano {
                    months: 1,
                    days: 0,
                    nanoseconds: 0,
                })),
                ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano {
                    months: 1,
                    days: 15,
                    nanoseconds: 0,
                })),
            ],
        ),
        (
            DataType::Decimal256(precision, scale),
            Arc::new(
                Decimal256Array::from(vec![
                    Some(i256::from(12345)),
                    Some(i256::from(67890)),
                ])
                .with_precision_and_scale(precision, scale)?,
            ) as ArrayRef,
            vec![
                ScalarValue::Decimal256(Some(i256::from(12345)), precision, scale),
                ScalarValue::Decimal256(Some(i256::from(54321)), precision, scale),
            ],
        ),
    ];

    for (data_type, in_array, list_values) in cases {
        let schema = Schema::new(vec![Field::new("a", data_type, false)]);
        let col_a = col("a", &schema)?;
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![in_array])?;

        let list = list_values.into_iter().map(lit).collect();
        in_list!(
            batch,
            list,
            &false,
            vec![Some(true), Some(false)],
            col_a,
            &schema
        );
    }

    Ok(())
}
3546
3547 fn make_in_list_with_columns(
3550 expr: Arc<dyn PhysicalExpr>,
3551 list: Vec<Arc<dyn PhysicalExpr>>,
3552 negated: bool,
3553 ) -> Arc<InListExpr> {
3554 Arc::new(InListExpr::new(expr, list, negated, None))
3555 }
3556
#[test]
fn test_in_list_with_columns_int32_scalars() -> Result<()> {
    // `a IN (1, 3)` over a nullable Int32 column; the NULL needle row yields NULL.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let needle = col("a", &schema)?;
    let column: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None]));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![column])?;

    let candidates = [1, 3]
        .into_iter()
        .map(|value| lit(ScalarValue::Int32(Some(value))))
        .collect();
    let expr = make_in_list_with_columns(needle, candidates, false);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![Some(true), Some(false), Some(true), None])
    );
    Ok(())
}
3586
#[test]
fn test_in_list_with_columns_int32_column_refs() -> Result<()> {
    // `a IN (b, c)` where the list items are sibling columns of the same batch:
    // row 0 matches b, row 2 matches c, row 1 matches neither, row 3 has a NULL needle.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]);
    let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None]));
    let b: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(1), Some(99), Some(99), Some(99)]));
    let c: ArrayRef = Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3), None]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, c])?;

    let expr = make_in_list_with_columns(
        col("a", &schema)?,
        vec![col("b", &schema)?, col("c", &schema)?],
        false,
    );

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![Some(true), Some(false), Some(true), None])
    );
    Ok(())
}
3625
#[test]
fn test_in_list_with_columns_utf8_column_refs() -> Result<()> {
    // `a IN (b)` on Utf8 columns: rows 0 and 2 are equal, row 1 differs.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    let lhs: ArrayRef = Arc::new(StringArray::from(vec!["x", "y", "z"]));
    let rhs: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "z"]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![lhs, rhs])?;

    let expr = make_in_list_with_columns(
        col("a", &schema)?,
        vec![col("b", &schema)?],
        false,
    );

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![true, false, true])
    );
    Ok(())
}
3653
#[test]
fn test_in_list_with_columns_negated() -> Result<()> {
    // `a NOT IN (b)`: only row 1 (2 vs 99) is outside the list.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);
    let lhs: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let rhs: ArrayRef = Arc::new(Int32Array::from(vec![1, 99, 3]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![lhs, rhs])?;

    let expr = make_in_list_with_columns(
        col("a", &schema)?,
        vec![col("b", &schema)?],
        true,
    );

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![false, true, false])
    );
    Ok(())
}
3681
#[test]
fn test_in_list_with_columns_null_in_list() -> Result<()> {
    // A NULL element in the list follows SQL three-valued logic: a real match
    // still yields TRUE for IN (FALSE for NOT IN), while a miss yields NULL for
    // both IN and NOT IN, because `x = NULL` is unknown.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(Int32Array::from(vec![1, 2]))],
    )?;

    for (negated, expected) in [
        (false, vec![Some(true), None]),
        (true, vec![Some(false), None]),
    ] {
        let list = vec![
            lit(ScalarValue::Int32(None)),
            lit(ScalarValue::Int32(Some(1))),
        ];
        let expr = make_in_list_with_columns(Arc::clone(&col_a), list, negated);

        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        let result = as_boolean_array(&result);
        assert_eq!(
            result,
            &BooleanArray::from(expected),
            "negated={negated}"
        );
    }
    Ok(())
}
3705
#[test]
fn test_in_list_with_columns_float_nan() -> Result<()> {
    // Row 0 pairs NaN with NaN and is expected to match; rows 1 and 2 differ.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Float64, false),
        Field::new("b", DataType::Float64, false),
    ]);
    let lhs: ArrayRef = Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN]));
    let rhs: ArrayRef = Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![lhs, rhs])?;

    let expr = make_in_list_with_columns(
        col("a", &schema)?,
        vec![col("b", &schema)?],
        false,
    );

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![true, false, false])
    );
    Ok(())
}
#[test]
fn test_in_list_with_columns_short_circuit() -> Result<()> {
    // Every row of `a` already equals the first list column `b`; `c` never matches.
    // Judging by the name, this targets a short-circuit path, but only output
    // values are asserted here.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
    ]);
    let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let c: ArrayRef = Arc::new(Int32Array::from(vec![99, 99, 99]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, c])?;

    let expr = make_in_list_with_columns(
        col("a", &schema)?,
        vec![col("b", &schema)?, col("c", &schema)?],
        false,
    );

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![true, true, true])
    );
    Ok(())
}
3764
#[test]
fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
    // Same shape as the short-circuit test, but row 1 of `a` is NULL and must
    // stay NULL even though `b` holds a value there.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
    ]);
    let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let c: ArrayRef = Arc::new(Int32Array::from(vec![99, 99, 99]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, c])?;

    let expr = make_in_list_with_columns(
        col("a", &schema)?,
        vec![col("b", &schema)?, col("c", &schema)?],
        false,
    );

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    assert_eq!(
        as_boolean_array(&evaluated),
        &BooleanArray::from(vec![Some(true), None, Some(true)])
    );
    Ok(())
}
3800
#[test]
fn test_in_list_with_columns_struct() -> Result<()> {
    let struct_fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let struct_dt = DataType::Struct(struct_fields.clone());

    let schema = Schema::new(vec![
        Field::new("a", struct_dt.clone(), true),
        Field::new("b", struct_dt.clone(), false),
        Field::new("c", struct_dt.clone(), false),
    ]);

    // Builds a struct column from parallel `x` / `y` child values plus an
    // optional validity mask for the struct itself.
    let struct_column =
        |xs: Vec<i32>, ys: Vec<&str>, validity: Option<NullBuffer>| -> ArrayRef {
            Arc::new(StructArray::new(
                struct_fields.clone(),
                vec![
                    Arc::new(Int32Array::from(xs)),
                    Arc::new(StringArray::from(ys)),
                ],
                validity,
            ))
        };

    // a: row 2 is a NULL struct; rows 0 and 3 match b, row 1 matches c.
    let a = struct_column(
        vec![1, 2, 3, 4],
        vec!["a", "b", "c", "d"],
        Some(vec![true, true, false, true].into()),
    );
    let b = struct_column(vec![1, 9, 3, 4], vec!["a", "z", "c", "d"], None);
    let c = struct_column(vec![9, 2, 9, 9], vec!["z", "b", "z", "z"], None);

    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, c])?;

    let needle = col("a", &schema)?;
    for (negated, expected) in [
        (false, vec![Some(true), Some(true), None, Some(true)]),
        (true, vec![Some(false), Some(false), None, Some(false)]),
    ] {
        let list = vec![col("b", &schema)?, col("c", &schema)?];
        let expr = make_in_list_with_columns(Arc::clone(&needle), list, negated);

        let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        assert_eq!(
            as_boolean_array(&evaluated),
            &BooleanArray::from(expected)
        );
    }
    Ok(())
}
3879
3880 fn wrap_in_dict(array: ArrayRef) -> ArrayRef {
3892 let keys = Int32Array::from((0..array.len() as i32).collect::<Vec<_>>());
3893 Arc::new(DictionaryArray::new(keys, array))
3894 }
3895
3896 fn eval_in_list_from_array(
3900 needle: ArrayRef,
3901 in_array: ArrayRef,
3902 ) -> Result<BooleanArray> {
3903 let schema =
3904 Schema::new(vec![Field::new("a", needle.data_type().clone(), false)]);
3905 let col_a = col("a", &schema)?;
3906 let expr = Arc::new(InListExpr::try_new_from_array(col_a, in_array, false)?)
3907 as Arc<dyn PhysicalExpr>;
3908 let batch = RecordBatch::try_new(Arc::new(schema), vec![needle])?;
3909 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3910 Ok(as_boolean_array(&result).clone())
3911 }
3912
#[test]
fn test_in_list_from_array_type_combinations() -> Result<()> {
    use arrow::compute::cast;

    // Every combination probes a needle shaped like [1, 4, 2] against a list
    // shaped like [1, 2, 3]: rows 0 and 2 are members, row 1 is not.
    let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);

    let base_in: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2, 3]));
    let base_needle: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 4, 2]));

    // Primitive types, with both a plain and a dictionary-encoded needle.
    let primitive_types = [
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float32,
        DataType::Float64,
    ];
    for dt in &primitive_types {
        let in_array = cast(&base_in, dt)?;
        let needle = cast(&base_needle, dt)?;

        let plain = eval_in_list_from_array(Arc::clone(&needle), Arc::clone(&in_array))?;
        assert_eq!(expected, plain, "same-type failed for {dt:?}");

        let dict_needle = eval_in_list_from_array(wrap_in_dict(needle), in_array)?;
        assert_eq!(expected, dict_needle, "dict-needle failed for {dt:?}");
    }

    // Utf8: plain/plain, dictionary needle/plain list, dictionary/dictionary.
    let utf8_in: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let utf8_needle: ArrayRef = Arc::new(StringArray::from(vec!["a", "d", "b"]));
    let utf8_variants: [(ArrayRef, ArrayRef); 3] = [
        (Arc::clone(&utf8_needle), Arc::clone(&utf8_in)),
        (wrap_in_dict(Arc::clone(&utf8_needle)), Arc::clone(&utf8_in)),
        (
            wrap_in_dict(Arc::clone(&utf8_needle)),
            wrap_in_dict(Arc::clone(&utf8_in)),
        ),
    ];
    for (needle, haystack) in utf8_variants {
        assert_eq!(expected, eval_in_list_from_array(needle, haystack)?);
    }

    // Two-field structs; builds a struct column from `fields` and two children.
    let struct_of = |fields: &Fields, c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
        let pairs: Vec<(FieldRef, ArrayRef)> =
            fields.iter().cloned().zip([c0, c1]).collect();
        Arc::new(StructArray::from(pairs))
    };

    let struct_fields = Fields::from(vec![
        Field::new("c0", DataType::Utf8, true),
        Field::new("c1", DataType::Int64, true),
    ]);
    assert_eq!(
        expected,
        eval_in_list_from_array(
            struct_of(
                &struct_fields,
                Arc::clone(&utf8_needle),
                Arc::new(Int64Array::from(vec![1, 4, 2])),
            ),
            struct_of(
                &struct_fields,
                Arc::clone(&utf8_in),
                Arc::new(Int64Array::from(vec![1, 2, 3])),
            ),
        )?
    );

    // Same structs, but the Utf8 child is dictionary-encoded on both sides.
    let dict_struct_fields = Fields::from(vec![
        Field::new(
            "c0",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
        ),
        Field::new("c1", DataType::Int64, true),
    ]);
    assert_eq!(
        expected,
        eval_in_list_from_array(
            struct_of(
                &dict_struct_fields,
                wrap_in_dict(Arc::clone(&utf8_needle)),
                Arc::new(Int64Array::from(vec![1, 4, 2])),
            ),
            struct_of(
                &dict_struct_fields,
                wrap_in_dict(Arc::clone(&utf8_in)),
                Arc::new(Int64Array::from(vec![1, 2, 3])),
            ),
        )?
    );

    Ok(())
}
4039
#[test]
fn test_in_list_from_array_type_mismatch_errors() -> Result<()> {
    // (needle, in_array, fragment the resulting error message must contain)
    let cases = [
        // Plain Utf8 needle vs dictionary-encoded Utf8 list.
        (
            Arc::new(StringArray::from(vec!["a", "d", "b"])) as ArrayRef,
            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
            "Can't compare arrays of different types",
        ),
        // Dictionary-encoded Utf8 needle vs plain Int64 list.
        (
            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "d", "b"]))),
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
            "Failed to downcast",
        ),
        // Dictionary-encoded Int64 needle vs dictionary-encoded Utf8 list.
        (
            wrap_in_dict(Arc::new(Int64Array::from(vec![1, 4, 2]))),
            wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
            "Can't compare arrays of different types",
        ),
    ];

    for (needle, in_array, fragment) in cases {
        let err = eval_in_list_from_array(needle, in_array)
            .unwrap_err()
            .to_string();
        assert!(err.contains(fragment), "{err}");
    }
    Ok(())
}
4078}