1use std::fmt::Debug;
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use crate::PhysicalExpr;
25use crate::physical_expr::physical_exprs_bag_equal;
26
27use arrow::array::*;
28use arrow::buffer::{BooleanBuffer, NullBuffer};
29use arrow::compute::SortOptions;
30use arrow::compute::kernels::boolean::{not, or_kleene};
31use arrow::compute::kernels::cmp::eq as arrow_eq;
32use arrow::datatypes::*;
33
34use datafusion_common::{
35 DFSchema, Result, ScalarValue, assert_or_internal_err, exec_err,
36};
37use datafusion_expr::{ColumnarValue, expr_vec_fmt};
38
39mod array_static_filter;
40mod primitive_filter;
41mod static_filter;
42mod strategy;
43
44use static_filter::StaticFilter;
45use strategy::instantiate_static_filter;
46
/// Physical expression for `expr [NOT] IN (list...)`.
pub struct InListExpr {
    /// The probe expression whose value is looked up in `list`.
    expr: Arc<dyn PhysicalExpr>,
    /// The candidate expressions the probe is compared against.
    list: Vec<Arc<dyn PhysicalExpr>>,
    /// `true` for `NOT IN`, `false` for `IN`.
    negated: bool,
    /// Pre-built membership filter. The constructors in this file set it when
    /// the list is supplied as an array or can be evaluated without input
    /// rows; when it is `None`, `evaluate` compares list items one by one.
    static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
}
54
55impl Debug for InListExpr {
56 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
57 f.debug_struct("InListExpr")
58 .field("expr", &self.expr)
59 .field("list", &self.list)
60 .field("negated", &self.negated)
61 .finish()
62 }
63}
64
65fn supports_arrow_eq(dt: &DataType) -> bool {
72 use DataType::*;
73 match dt {
74 Boolean | Binary | LargeBinary | BinaryView | FixedSizeBinary(_) => true,
75 Dictionary(_, v) => supports_arrow_eq(v.as_ref()),
76 _ => dt.is_primitive() || dt.is_null() || dt.is_string(),
77 }
78}
79
80fn evaluate_list(
82 list: &[Arc<dyn PhysicalExpr>],
83 batch: &RecordBatch,
84) -> Result<ArrayRef> {
85 let scalars = list
86 .iter()
87 .map(|expr| {
88 expr.evaluate(batch).and_then(|r| match r {
89 ColumnarValue::Array(_) => {
90 exec_err!("InList expression must evaluate to a scalar")
91 }
92 ColumnarValue::Scalar(ScalarValue::Dictionary(_, v)) => Ok(*v),
94 ColumnarValue::Scalar(s) => Ok(s),
95 })
96 })
97 .collect::<Result<Vec<_>>>()?;
98
99 ScalarValue::iter_to_array(scalars)
100}
101
102fn try_evaluate_constant_list(
112 list: &[Arc<dyn PhysicalExpr>],
113 schema: &Schema,
114) -> Result<Option<ArrayRef>> {
115 let batch = RecordBatch::new_empty(Arc::new(schema.clone()));
116 match evaluate_list(list, &batch) {
117 Ok(array) => Ok(Some(array)),
118 Err(_) => {
119 Ok(None)
122 }
123 }
124}
125
126fn assert_inlist_data_types_match(
131 expr_data_type: &DataType,
132 list_data_type: &DataType,
133) -> Result<()> {
134 if *list_data_type != DataType::Null {
135 assert_or_internal_err!(
136 DFSchema::datatype_is_logically_equal(expr_data_type, list_data_type),
137 "The data type inlist should be same, the value type is {expr_data_type}, one of list expr type is {list_data_type}"
138 );
139 }
140 Ok(())
141}
142
143impl InListExpr {
144 fn new(
146 expr: Arc<dyn PhysicalExpr>,
147 list: Vec<Arc<dyn PhysicalExpr>>,
148 negated: bool,
149 static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
150 ) -> Self {
151 Self {
152 expr,
153 list,
154 negated,
155 static_filter,
156 }
157 }
158
159 pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
161 &self.expr
162 }
163
164 pub fn list(&self) -> &[Arc<dyn PhysicalExpr>] {
166 &self.list
167 }
168
169 pub fn is_empty(&self) -> bool {
170 self.list.is_empty()
171 }
172
173 pub fn len(&self) -> usize {
174 self.list.len()
175 }
176
177 pub fn negated(&self) -> bool {
179 self.negated
180 }
181
182 pub fn try_new_from_array(
198 expr: Arc<dyn PhysicalExpr>,
199 array: ArrayRef,
200 negated: bool,
201 schema: &Schema,
202 ) -> Result<Self> {
203 let expr_data_type = expr.data_type(schema)?;
204 assert_inlist_data_types_match(&expr_data_type, array.data_type())?;
205
206 let list = (0..array.len())
207 .map(|i| {
208 let scalar = ScalarValue::try_from_array(array.as_ref(), i)?;
209 Ok(crate::expressions::lit(scalar) as Arc<dyn PhysicalExpr>)
210 })
211 .collect::<Result<Vec<_>>>()?;
212 Ok(Self::new(
213 expr,
214 list,
215 negated,
216 Some(instantiate_static_filter(array)?),
217 ))
218 }
219
220 pub fn try_new(
229 expr: Arc<dyn PhysicalExpr>,
230 list: Vec<Arc<dyn PhysicalExpr>>,
231 negated: bool,
232 schema: &Schema,
233 ) -> Result<Self> {
234 let expr_data_type = expr.data_type(schema)?;
236 for list_expr in list.iter() {
237 let list_expr_data_type = list_expr.data_type(schema)?;
238 assert_inlist_data_types_match(&expr_data_type, &list_expr_data_type)?;
239 }
240
241 let static_filter = match try_evaluate_constant_list(&list, schema)? {
243 Some(in_array) => Some(instantiate_static_filter(in_array)?),
244 None => None, };
246
247 Ok(Self::new(expr, list, negated, static_filter))
248 }
249}
250impl std::fmt::Display for InListExpr {
251 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
252 let list = expr_vec_fmt!(self.list);
253
254 if self.negated {
255 if self.static_filter.is_some() {
256 write!(f, "{} NOT IN (SET) ([{list}])", self.expr)
257 } else {
258 write!(f, "{} NOT IN ([{list}])", self.expr)
259 }
260 } else if self.static_filter.is_some() {
261 write!(f, "{} IN (SET) ([{list}])", self.expr)
262 } else {
263 write!(f, "{} IN ([{list}])", self.expr)
264 }
265 }
266}
267
268impl PhysicalExpr for InListExpr {
269 fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
270 Ok(DataType::Boolean)
271 }
272
273 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
274 if self.expr.nullable(input_schema)? {
275 return Ok(true);
276 }
277
278 if let Some(static_filter) = &self.static_filter {
279 Ok(static_filter.null_count() > 0)
280 } else {
281 for expr in &self.list {
282 if expr.nullable(input_schema)? {
283 return Ok(true);
284 }
285 }
286 Ok(false)
287 }
288 }
289
290 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
291 let num_rows = batch.num_rows();
292 let value = self.expr.evaluate(batch)?;
293 let r = match &self.static_filter {
294 Some(filter) => {
295 match value {
296 ColumnarValue::Array(array) => {
297 filter.contains(&array, self.negated)?
298 }
299 ColumnarValue::Scalar(scalar) => {
300 if scalar.is_null() {
301 let nulls = NullBuffer::new_null(num_rows);
304 return Ok(ColumnarValue::Array(Arc::new(
305 BooleanArray::new(
306 BooleanBuffer::new_unset(num_rows),
307 Some(nulls),
308 ),
309 )));
310 }
311 let array = scalar.to_array()?;
314 let result_array =
315 filter.contains(array.as_ref(), self.negated)?;
316 if result_array.is_null(0) {
319 let nulls = NullBuffer::new_null(num_rows);
320 BooleanArray::new(
321 BooleanBuffer::new_unset(num_rows),
322 Some(nulls),
323 )
324 } else if result_array.value(0) {
325 BooleanArray::new(BooleanBuffer::new_set(num_rows), None)
326 } else {
327 BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)
328 }
329 }
330 }
331 }
332 None => {
333 let value = value.into_array(num_rows)?;
338 let lhs_supports_arrow_eq = supports_arrow_eq(value.data_type());
339
340 let compare_one = |expr: &Arc<dyn PhysicalExpr>| -> Result<BooleanArray> {
342 match expr.evaluate(batch)? {
343 ColumnarValue::Array(array) => {
344 if lhs_supports_arrow_eq
345 && supports_arrow_eq(array.data_type())
346 {
347 Ok(arrow_eq(&value, &array)?)
348 } else {
349 let cmp = make_comparator(
350 value.as_ref(),
351 array.as_ref(),
352 SortOptions::default(),
353 )?;
354 let buffer = BooleanBuffer::collect_bool(num_rows, |i| {
355 cmp(i, i).is_eq()
356 });
357 let nulls =
358 NullBuffer::union(value.nulls(), array.nulls());
359 Ok(BooleanArray::new(buffer, nulls))
360 }
361 }
362 ColumnarValue::Scalar(scalar) => {
363 if scalar.is_null() {
365 Ok(BooleanArray::from(vec![None; num_rows]))
367 } else if lhs_supports_arrow_eq {
368 let scalar_datum = scalar.to_scalar()?;
369 Ok(arrow_eq(&value, &scalar_datum)?)
370 } else {
371 let array = scalar.to_array()?;
373 let cmp = make_comparator(
374 value.as_ref(),
375 array.as_ref(),
376 SortOptions::default(),
377 )?;
378 let buffer = BooleanBuffer::collect_bool(num_rows, |i| {
380 cmp(i, 0).is_eq()
381 });
382 Ok(BooleanArray::new(buffer, value.nulls().cloned()))
383 }
384 }
385 }
386 };
387
388 let mut found = if let Some(first) = self.list.first() {
391 compare_one(first)?
392 } else {
393 BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)
394 };
395
396 for expr in self.list.iter().skip(1) {
397 if found.null_count() == 0 && !found.has_false() {
400 break;
401 }
402 found = or_kleene(&found, &compare_one(expr)?)?;
403 }
404
405 if self.negated { not(&found)? } else { found }
406 }
407 };
408 Ok(ColumnarValue::Array(Arc::new(r)))
409 }
410
411 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
412 let mut children = vec![&self.expr];
413 children.extend(&self.list);
414 children
415 }
416
417 fn with_new_children(
418 self: Arc<Self>,
419 children: Vec<Arc<dyn PhysicalExpr>>,
420 ) -> Result<Arc<dyn PhysicalExpr>> {
421 Ok(Arc::new(InListExpr::new(
423 Arc::clone(&children[0]),
424 children[1..].to_vec(),
425 self.negated,
426 self.static_filter.as_ref().map(Arc::clone),
427 )))
428 }
429
430 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431 self.expr.fmt_sql(f)?;
432 if self.negated {
433 write!(f, " NOT")?;
434 }
435
436 write!(f, " IN (")?;
437 for (i, expr) in self.list.iter().enumerate() {
438 if i > 0 {
439 write!(f, ", ")?;
440 }
441 expr.fmt_sql(f)?;
442 }
443 write!(f, ")")
444 }
445}
446
447impl PartialEq for InListExpr {
448 fn eq(&self, other: &Self) -> bool {
449 self.expr.eq(&other.expr)
450 && physical_exprs_bag_equal(&self.list, &other.list)
451 && self.negated == other.negated
452 }
453}
454
455impl Eq for InListExpr {}
456
457impl Hash for InListExpr {
458 fn hash<H: Hasher>(&self, state: &mut H) {
459 self.expr.hash(state);
460 self.negated.hash(state);
461 self.list.hash(state);
463 }
464}
465
466pub fn in_list(
468 expr: Arc<dyn PhysicalExpr>,
469 list: Vec<Arc<dyn PhysicalExpr>>,
470 negated: &bool,
471 schema: &Schema,
472) -> Result<Arc<dyn PhysicalExpr>> {
473 Ok(Arc::new(InListExpr::try_new(expr, list, *negated, schema)?))
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479 use crate::expressions::{col, lit, try_cast};
480 use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano, i256};
481 use datafusion_common::plan_err;
482 use datafusion_expr::type_coercion::binary::comparison_coercion;
483 use datafusion_physical_expr_common::physical_expr::fmt_sql;
484 use insta::assert_snapshot;
485 use itertools::Itertools;
486
    /// Pair returned by `in_list_cast`: the cast probe expression and the
    /// cast list expressions.
    type InListCastResult = (Arc<dyn PhysicalExpr>, Vec<Arc<dyn PhysicalExpr>>);
488
489 fn in_list_cast(
492 expr: Arc<dyn PhysicalExpr>,
493 list: Vec<Arc<dyn PhysicalExpr>>,
494 input_schema: &Schema,
495 ) -> Result<InListCastResult> {
496 let expr_type = &expr.data_type(input_schema)?;
497 let list_types: Vec<DataType> = list
498 .iter()
499 .map(|list_expr| list_expr.data_type(input_schema).unwrap())
500 .collect();
501 let result_type = get_coerce_type(expr_type, &list_types);
502 match result_type {
503 None => plan_err!(
504 "Can not find compatible types to compare {expr_type} with [{}]",
505 list_types.iter().join(", ")
506 ),
507 Some(data_type) => {
508 let cast_expr = try_cast(expr, input_schema, data_type.clone())?;
510 let cast_list_expr = list
511 .into_iter()
512 .map(|list_expr| {
513 try_cast(list_expr, input_schema, data_type.clone()).unwrap()
514 })
515 .collect();
516 Ok((cast_expr, cast_list_expr))
517 }
518 }
519 }
520
521 fn get_coerce_type(expr_type: &DataType, list_type: &[DataType]) -> Option<DataType> {
524 list_type
525 .iter()
526 .try_fold(expr_type.clone(), |left_type, right_type| {
527 comparison_coercion(&left_type, right_type)
528 })
529 }
530
    /// Test helper: casts `$COL` and `$LIST` to a common type with
    /// `in_list_cast`, then hands off to `in_list_raw!` to evaluate the
    /// expression and compare against `$EXPECTED`.
    ///
    /// Uses `?`, so it must be invoked inside a function returning `Result`.
    macro_rules! in_list {
        ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
            let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
            in_list_raw!(
                $BATCH,
                cast_list_exprs,
                $NEGATED,
                $EXPECTED,
                cast_expr,
                $SCHEMA
            );
        }};
    }
556
    /// Test helper: builds the `IN` expression from `$COL` and `$LIST` as
    /// given (no casting), evaluates it on `$BATCH` and asserts the boolean
    /// result equals `$EXPECTED`.
    ///
    /// On failure the message shows the SQL form of the expression and the
    /// evaluated probe column. Uses `?`, so it must be invoked inside a
    /// function returning `Result`.
    macro_rules! in_list_raw {
        ($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr, $SCHEMA:expr) => {{
            let col_expr = $COL;
            let expr = in_list(Arc::clone(&col_expr), $LIST, $NEGATED, $SCHEMA).unwrap();
            let result = expr
                .evaluate(&$BATCH)?
                .into_array($BATCH.num_rows())
                .expect("Failed to convert to array");
            let result = as_boolean_array(&result);
            let expected = &BooleanArray::from($EXPECTED);
            assert_eq!(
                expected,
                result,
                "Failed for: {}\n{}: {:?}",
                fmt_sql(expr.as_ref()),
                fmt_sql(col_expr.as_ref()),
                col_expr
                    .evaluate(&$BATCH)?
                    .into_array($BATCH.num_rows())
                    .unwrap()
            );
        }};
    }
593
    /// One scenario consumed by `run_test_cases`.
    struct InListPrimitiveTestCase {
        /// Label for the scenario.
        name: &'static str,
        /// A value present both in the column and in the list.
        value_in: ScalarValue,
        /// A value present in the column but absent from the list.
        value_not_in: ScalarValue,
        /// Additional list values besides `value_in`.
        other_list_values: Vec<ScalarValue>,
        /// Typed null used for the null scenarios; `None` runs only the
        /// non-nullable scenarios.
        null_value: Option<ScalarValue>,
    }
609
    /// Raw values for a test case, before they are converted into
    /// `ScalarValue`s by `primitive_test_case` /
    /// `primitive_test_case_no_nulls`.
    #[derive(Clone)]
    struct PrimitiveTestCaseData<T> {
        /// Value that will appear in the list.
        value_in: T,
        /// Value that will not appear in the list.
        value_not_in: T,
        /// Remaining list values.
        other_list_values: Vec<T>,
    }
620
621 fn primitive_test_case<T, D, F>(
627 name: &'static str,
628 constructor: F,
629 data: PrimitiveTestCaseData<D>,
630 ) -> InListPrimitiveTestCase
631 where
632 D: TryInto<T> + Clone,
633 <D as TryInto<T>>::Error: Debug,
634 F: Fn(Option<T>) -> ScalarValue,
635 T: Clone,
636 {
637 InListPrimitiveTestCase {
638 name,
639 value_in: constructor(Some(data.value_in.try_into().unwrap())),
640 value_not_in: constructor(Some(data.value_not_in.try_into().unwrap())),
641 other_list_values: data
642 .other_list_values
643 .into_iter()
644 .map(|v| constructor(Some(v.try_into().unwrap())))
645 .collect(),
646 null_value: Some(constructor(None)),
647 }
648 }
649
650 fn primitive_test_case_no_nulls<T, D, F>(
653 name: &'static str,
654 constructor: F,
655 data: PrimitiveTestCaseData<D>,
656 ) -> InListPrimitiveTestCase
657 where
658 D: TryInto<T> + Clone,
659 <D as TryInto<T>>::Error: Debug,
660 F: Fn(Option<T>) -> ScalarValue,
661 T: Clone,
662 {
663 InListPrimitiveTestCase {
664 name,
665 value_in: constructor(Some(data.value_in.try_into().unwrap())),
666 value_not_in: constructor(Some(data.value_not_in.try_into().unwrap())),
667 other_list_values: data
668 .other_list_values
669 .into_iter()
670 .map(|v| constructor(Some(v.try_into().unwrap())))
671 .collect(),
672 null_value: None,
673 }
674 }
675
676 fn run_test_cases(test_cases: Vec<InListPrimitiveTestCase>) -> Result<()> {
682 for test_case in test_cases {
683 let test_name = test_case.name;
684
685 let data_type = test_case.value_in.data_type();
687
688 let build_base_list = || -> Vec<Arc<dyn PhysicalExpr>> {
690 let mut list = vec![lit(test_case.value_in.clone())];
691 list.extend(test_case.other_list_values.iter().map(|v| lit(v.clone())));
692 list
693 };
694
695 match &test_case.null_value {
696 Some(null_val) => {
697 let schema =
699 Schema::new(vec![Field::new("a", data_type.clone(), true)]);
700
701 let array = ScalarValue::iter_to_array(vec![
703 test_case.value_in.clone(),
704 test_case.value_not_in.clone(),
705 null_val.clone(),
706 ])?;
707
708 let col_a = col("a", &schema)?;
709 let batch = RecordBatch::try_new(
710 Arc::new(schema.clone()),
711 vec![Arc::clone(&array)],
712 )?;
713
714 let list = build_base_list();
716 in_list!(
717 batch,
718 list,
719 &false,
720 vec![Some(true), Some(false), None],
721 Arc::clone(&col_a),
722 &schema
723 );
724
725 let list = build_base_list();
727 in_list!(
728 batch,
729 list,
730 &true,
731 vec![Some(false), Some(true), None],
732 Arc::clone(&col_a),
733 &schema
734 );
735
736 let mut list = build_base_list();
738 list.push(lit(null_val.clone()));
739 in_list!(
740 batch,
741 list,
742 &false,
743 vec![Some(true), None, None],
744 Arc::clone(&col_a),
745 &schema
746 );
747
748 let mut list = build_base_list();
750 list.push(lit(null_val.clone()));
751 in_list!(
752 batch,
753 list,
754 &true,
755 vec![Some(false), None, None],
756 Arc::clone(&col_a),
757 &schema
758 );
759 }
760 None => {
761 let schema =
763 Schema::new(vec![Field::new("a", data_type.clone(), false)]);
764
765 let array = ScalarValue::iter_to_array(vec![
767 test_case.value_in.clone(),
768 test_case.value_not_in.clone(),
769 ])?;
770
771 let col_a = col("a", &schema)?;
772 let batch = RecordBatch::try_new(
773 Arc::new(schema.clone()),
774 vec![Arc::clone(&array)],
775 )?;
776
777 let list = build_base_list();
779 in_list!(
780 batch,
781 list,
782 &false,
783 vec![Some(true), Some(false)],
784 Arc::clone(&col_a),
785 &schema
786 );
787
788 let list = build_base_list();
790 in_list!(
791 batch,
792 list,
793 &true,
794 vec![Some(false), Some(true)],
795 Arc::clone(&col_a),
796 &schema
797 );
798
799 eprintln!(
800 "Test '{test_name}': exercised (false, true) branch (no nulls, negated)",
801 );
802 }
803 }
804 }
805
806 Ok(())
807 }
808
809 #[test]
813 fn in_list_int_types() -> Result<()> {
814 let int_data = PrimitiveTestCaseData {
815 value_in: 0,
816 value_not_in: 2,
817 other_list_values: vec![1, 3, 5],
818 };
819
820 run_test_cases(vec![
821 primitive_test_case("int8", ScalarValue::Int8, int_data.clone()),
823 primitive_test_case("int16", ScalarValue::Int16, int_data.clone()),
824 primitive_test_case("int32", ScalarValue::Int32, int_data.clone()),
825 primitive_test_case("int64", ScalarValue::Int64, int_data.clone()),
826 primitive_test_case("uint8", ScalarValue::UInt8, int_data.clone()),
827 primitive_test_case("uint16", ScalarValue::UInt16, int_data.clone()),
828 primitive_test_case("uint32", ScalarValue::UInt32, int_data.clone()),
829 primitive_test_case("uint64", ScalarValue::UInt64, int_data.clone()),
830 primitive_test_case_no_nulls("int32_no_nulls", ScalarValue::Int32, int_data),
832 ])
833 }
834
835 #[test]
839 fn in_list_string_types() -> Result<()> {
840 let string_data = PrimitiveTestCaseData {
841 value_in: "a",
842 value_not_in: "d",
843 other_list_values: vec!["b", "c"],
844 };
845
846 run_test_cases(vec![
847 primitive_test_case("utf8", ScalarValue::Utf8, string_data.clone()),
848 primitive_test_case(
849 "large_utf8",
850 ScalarValue::LargeUtf8,
851 string_data.clone(),
852 ),
853 primitive_test_case("utf8_view", ScalarValue::Utf8View, string_data),
854 ])
855 }
856
857 #[test]
861 fn in_list_binary_types() -> Result<()> {
862 let binary_data = PrimitiveTestCaseData {
863 value_in: vec![1_u8, 2, 3],
864 value_not_in: vec![1_u8, 2, 2],
865 other_list_values: vec![vec![4_u8, 5, 6], vec![7_u8, 8, 9]],
866 };
867
868 run_test_cases(vec![
869 primitive_test_case("binary", ScalarValue::Binary, binary_data.clone()),
870 primitive_test_case(
871 "large_binary",
872 ScalarValue::LargeBinary,
873 binary_data.clone(),
874 ),
875 primitive_test_case("binary_view", ScalarValue::BinaryView, binary_data),
876 ])
877 }
878
879 #[test]
883 fn in_list_date_types() -> Result<()> {
884 let date_data = PrimitiveTestCaseData {
885 value_in: 0,
886 value_not_in: 2,
887 other_list_values: vec![1, 3],
888 };
889
890 run_test_cases(vec![
891 primitive_test_case("date32", ScalarValue::Date32, date_data.clone()),
892 primitive_test_case("date64", ScalarValue::Date64, date_data),
893 ])
894 }
895
896 #[test]
900 fn in_list_decimal() -> Result<()> {
901 run_test_cases(vec![InListPrimitiveTestCase {
902 name: "decimal128",
903 value_in: ScalarValue::Decimal128(Some(0), 10, 2),
904 value_not_in: ScalarValue::Decimal128(Some(200), 10, 2),
905 other_list_values: vec![
906 ScalarValue::Decimal128(Some(100), 10, 2),
907 ScalarValue::Decimal128(Some(300), 10, 2),
908 ],
909 null_value: Some(ScalarValue::Decimal128(None, 10, 2)),
910 }])
911 }
912
913 #[test]
917 fn in_list_timestamp_types() -> Result<()> {
918 run_test_cases(vec![
919 InListPrimitiveTestCase {
920 name: "timestamp_nanosecond",
921 value_in: ScalarValue::TimestampNanosecond(Some(0), None),
922 value_not_in: ScalarValue::TimestampNanosecond(Some(2000), None),
923 other_list_values: vec![
924 ScalarValue::TimestampNanosecond(Some(1000), None),
925 ScalarValue::TimestampNanosecond(Some(3000), None),
926 ],
927 null_value: Some(ScalarValue::TimestampNanosecond(None, None)),
928 },
929 InListPrimitiveTestCase {
930 name: "timestamp_millisecond_with_tz",
931 value_in: ScalarValue::TimestampMillisecond(
932 Some(1500000),
933 Some("+05:00".into()),
934 ),
935 value_not_in: ScalarValue::TimestampMillisecond(
936 Some(2500000),
937 Some("+05:00".into()),
938 ),
939 other_list_values: vec![ScalarValue::TimestampMillisecond(
940 Some(3500000),
941 Some("+05:00".into()),
942 )],
943 null_value: Some(ScalarValue::TimestampMillisecond(
944 None,
945 Some("+05:00".into()),
946 )),
947 },
948 InListPrimitiveTestCase {
949 name: "timestamp_millisecond_mixed_tz",
950 value_in: ScalarValue::TimestampMillisecond(
951 Some(1500000),
952 Some("+05:00".into()),
953 ),
954 value_not_in: ScalarValue::TimestampMillisecond(
955 Some(2500000),
956 Some("+05:00".into()),
957 ),
958 other_list_values: vec![
959 ScalarValue::TimestampMillisecond(
960 Some(3500000),
961 Some("+01:00".into()),
962 ),
963 ScalarValue::TimestampMillisecond(Some(4500000), Some("UTC".into())),
964 ],
965 null_value: Some(ScalarValue::TimestampMillisecond(
966 None,
967 Some("+05:00".into()),
968 )),
969 },
970 ])
971 }
972
973 #[test]
974 fn in_list_float64() -> Result<()> {
975 let schema = Schema::new(vec![Field::new("a", DataType::Float64, true)]);
976 let a = Float64Array::from(vec![
977 Some(0.0),
978 Some(0.2),
979 None,
980 Some(f64::NAN),
981 Some(-f64::NAN),
982 ]);
983 let col_a = col("a", &schema)?;
984 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
985
986 let list = vec![lit(0.0f64), lit(0.1f64)];
988 in_list!(
989 batch,
990 list,
991 &false,
992 vec![Some(true), Some(false), None, Some(false), Some(false)],
993 Arc::clone(&col_a),
994 &schema
995 );
996
997 let list = vec![lit(0.0f64), lit(0.1f64)];
999 in_list!(
1000 batch,
1001 list,
1002 &true,
1003 vec![Some(false), Some(true), None, Some(true), Some(true)],
1004 Arc::clone(&col_a),
1005 &schema
1006 );
1007
1008 let list = vec![lit(0.0f64), lit(0.1f64), lit(ScalarValue::Null)];
1010 in_list!(
1011 batch,
1012 list,
1013 &false,
1014 vec![Some(true), None, None, None, None],
1015 Arc::clone(&col_a),
1016 &schema
1017 );
1018
1019 let list = vec![lit(0.0f64), lit(0.1f64), lit(ScalarValue::Null)];
1021 in_list!(
1022 batch,
1023 list,
1024 &true,
1025 vec![Some(false), None, None, None, None],
1026 Arc::clone(&col_a),
1027 &schema
1028 );
1029
1030 let list = vec![lit(0.0f64), lit(0.1f64), lit(f64::NAN)];
1032 in_list!(
1033 batch,
1034 list,
1035 &false,
1036 vec![Some(true), Some(false), None, Some(true), Some(false)],
1037 Arc::clone(&col_a),
1038 &schema
1039 );
1040
1041 let list = vec![lit(0.0f64), lit(0.1f64), lit(f64::NAN)];
1043 in_list!(
1044 batch,
1045 list,
1046 &true,
1047 vec![Some(false), Some(true), None, Some(false), Some(true)],
1048 Arc::clone(&col_a),
1049 &schema
1050 );
1051
1052 let list = vec![lit(0.0f64), lit(0.1f64), lit(-f64::NAN)];
1054 in_list!(
1055 batch,
1056 list,
1057 &false,
1058 vec![Some(true), Some(false), None, Some(false), Some(true)],
1059 Arc::clone(&col_a),
1060 &schema
1061 );
1062
1063 let list = vec![lit(0.0f64), lit(0.1f64), lit(-f64::NAN)];
1065 in_list!(
1066 batch,
1067 list,
1068 &true,
1069 vec![Some(false), Some(true), None, Some(true), Some(false)],
1070 Arc::clone(&col_a),
1071 &schema
1072 );
1073
1074 Ok(())
1075 }
1076
1077 #[test]
1078 fn in_list_bool() -> Result<()> {
1079 let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
1080 let a = BooleanArray::from(vec![Some(true), None]);
1081 let col_a = col("a", &schema)?;
1082 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1083
1084 let list = vec![lit(true)];
1086 in_list!(
1087 batch,
1088 list,
1089 &false,
1090 vec![Some(true), None],
1091 Arc::clone(&col_a),
1092 &schema
1093 );
1094
1095 let list = vec![lit(true)];
1097 in_list!(
1098 batch,
1099 list,
1100 &true,
1101 vec![Some(false), None],
1102 Arc::clone(&col_a),
1103 &schema
1104 );
1105
1106 let list = vec![lit(true), lit(ScalarValue::Null)];
1108 in_list!(
1109 batch,
1110 list,
1111 &false,
1112 vec![Some(true), None],
1113 Arc::clone(&col_a),
1114 &schema
1115 );
1116
1117 let list = vec![lit(true), lit(ScalarValue::Null)];
1119 in_list!(
1120 batch,
1121 list,
1122 &true,
1123 vec![Some(false), None],
1124 Arc::clone(&col_a),
1125 &schema
1126 );
1127
1128 Ok(())
1129 }
1130
    /// Test helper: casts `$COL` / `$LIST` with `in_list_cast`, builds a
    /// non-negated `IN` expression and asserts that its `nullable()` result
    /// for `$SCHEMA` equals `$EXPECTED`.
    ///
    /// Uses `?`, so it must be invoked inside a function returning `Result`.
    macro_rules! test_nullable {
        ($COL:expr, $LIST:expr, $SCHEMA:expr, $EXPECTED:expr) => {{
            let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST, $SCHEMA)?;
            let expr = in_list(cast_expr, cast_list_exprs, &false, $SCHEMA).unwrap();
            let result = expr.nullable($SCHEMA)?;
            assert_eq!($EXPECTED, result);
        }};
    }
1139
1140 #[test]
1141 fn in_list_nullable() -> Result<()> {
1142 let schema = Schema::new(vec![
1143 Field::new("c1_nullable", DataType::Int64, true),
1144 Field::new("c2_non_nullable", DataType::Int64, false),
1145 ]);
1146
1147 let c1_nullable = col("c1_nullable", &schema)?;
1148 let c2_non_nullable = col("c2_non_nullable", &schema)?;
1149
1150 let list = vec![lit(1_i64), lit(2_i64)];
1152 test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1153 test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, false);
1154
1155 let list = vec![lit(1_i64), lit(2_i64), lit(ScalarValue::Null)];
1157 test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1158 test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, true);
1159
1160 let list = vec![Arc::clone(&c1_nullable)];
1161 test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, true);
1162
1163 let list = vec![Arc::clone(&c2_non_nullable)];
1164 test_nullable!(Arc::clone(&c1_nullable), list.clone(), &schema, true);
1165
1166 let list = vec![Arc::clone(&c2_non_nullable), Arc::clone(&c2_non_nullable)];
1167 test_nullable!(Arc::clone(&c2_non_nullable), list.clone(), &schema, false);
1168
1169 Ok(())
1170 }
1171
1172 #[test]
1173 fn in_list_no_cols() -> Result<()> {
1174 let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1176 let a = Int32Array::from(vec![Some(1), Some(2), None]);
1177 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1178
1179 let list = vec![lit(ScalarValue::from(1i32)), lit(ScalarValue::from(6i32))];
1180
1181 let expr = lit(ScalarValue::Int32(Some(1)));
1183 in_list!(
1184 batch,
1185 list.clone(),
1186 &false,
1187 vec![Some(true), Some(true), Some(true)],
1189 expr,
1190 &schema
1191 );
1192
1193 let expr = lit(ScalarValue::Int32(Some(2)));
1195 in_list!(
1196 batch,
1197 list.clone(),
1198 &false,
1199 vec![Some(false), Some(false), Some(false)],
1201 expr,
1202 &schema
1203 );
1204
1205 let expr = lit(ScalarValue::Int32(None));
1207 in_list!(
1208 batch,
1209 list.clone(),
1210 &false,
1211 vec![None, None, None],
1213 expr,
1214 &schema
1215 );
1216
1217 Ok(())
1218 }
1219
1220 #[test]
1221 fn in_list_utf8_with_dict_types() -> Result<()> {
1222 fn dict_lit(key_type: DataType, value: &str) -> Arc<dyn PhysicalExpr> {
1223 lit(ScalarValue::Dictionary(
1224 Box::new(key_type),
1225 Box::new(ScalarValue::new_utf8(value.to_string())),
1226 ))
1227 }
1228
1229 fn null_dict_lit(key_type: DataType) -> Arc<dyn PhysicalExpr> {
1230 lit(ScalarValue::Dictionary(
1231 Box::new(key_type),
1232 Box::new(ScalarValue::Utf8(None)),
1233 ))
1234 }
1235
1236 let schema = Schema::new(vec![Field::new(
1237 "a",
1238 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
1239 true,
1240 )]);
1241 let a: UInt16DictionaryArray =
1242 vec![Some("a"), Some("d"), None].into_iter().collect();
1243 let col_a = col("a", &schema)?;
1244 let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)])?;
1245
1246 let lists = [
1248 vec![lit("a"), lit("b")],
1249 vec![
1250 dict_lit(DataType::Int8, "a"),
1251 dict_lit(DataType::UInt16, "b"),
1252 ],
1253 ];
1254 for list in lists.iter() {
1255 in_list_raw!(
1256 batch,
1257 list.clone(),
1258 &false,
1259 vec![Some(true), Some(false), None],
1260 Arc::clone(&col_a),
1261 &schema
1262 );
1263 }
1264
1265 for list in lists.iter() {
1267 in_list_raw!(
1268 batch,
1269 list.clone(),
1270 &true,
1271 vec![Some(false), Some(true), None],
1272 Arc::clone(&col_a),
1273 &schema
1274 );
1275 }
1276
1277 let lists = [
1279 vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))],
1280 vec![
1281 dict_lit(DataType::Int8, "a"),
1282 dict_lit(DataType::UInt16, "b"),
1283 null_dict_lit(DataType::UInt16),
1284 ],
1285 ];
1286 for list in lists.iter() {
1287 in_list_raw!(
1288 batch,
1289 list.clone(),
1290 &false,
1291 vec![Some(true), None, None],
1292 Arc::clone(&col_a),
1293 &schema
1294 );
1295 }
1296
1297 for list in lists.iter() {
1299 in_list_raw!(
1300 batch,
1301 list.clone(),
1302 &true,
1303 vec![Some(false), None, None],
1304 Arc::clone(&col_a),
1305 &schema
1306 );
1307 }
1308
1309 Ok(())
1310 }
1311
1312 #[test]
1313 fn test_fmt_sql_1() -> Result<()> {
1314 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1315 let col_a = col("a", &schema)?;
1316
1317 let list = vec![lit("a"), lit("b")];
1319 let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
1320 let sql_string = fmt_sql(expr.as_ref()).to_string();
1321 let display_string = expr.to_string();
1322 assert_snapshot!(sql_string, @"a IN (a, b)");
1323 assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b])");
1324 Ok(())
1325 }
1326
1327 #[test]
1328 fn test_fmt_sql_2() -> Result<()> {
1329 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1330 let col_a = col("a", &schema)?;
1331
1332 let list = vec![lit("a"), lit("b")];
1334 let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
1335 let sql_string = fmt_sql(expr.as_ref()).to_string();
1336 let display_string = expr.to_string();
1337
1338 assert_snapshot!(sql_string, @"a NOT IN (a, b)");
1339 assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b])");
1340 Ok(())
1341 }
1342
1343 #[test]
1344 fn test_fmt_sql_3() -> Result<()> {
1345 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1346 let col_a = col("a", &schema)?;
1347 let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
1349 let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
1350 let sql_string = fmt_sql(expr.as_ref()).to_string();
1351 let display_string = expr.to_string();
1352
1353 assert_snapshot!(sql_string, @"a IN (a, b, NULL)");
1354 assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b, NULL])");
1355 Ok(())
1356 }
1357
1358 #[test]
1359 fn test_fmt_sql_4() -> Result<()> {
1360 let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
1361 let col_a = col("a", &schema)?;
1362 let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
1364 let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
1365 let sql_string = fmt_sql(expr.as_ref()).to_string();
1366 let display_string = expr.to_string();
1367 assert_snapshot!(sql_string, @"a NOT IN (a, b, NULL)");
1368 assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b, NULL])");
1369 Ok(())
1370 }
1371
/// `IN` / `NOT IN` on a struct column `a {x: Int32, y: Utf8}`.
///
/// The column holds the rows `(1, "a")`, `(2, "b")`, `(3, "c")` and the list
/// holds the struct literals `(1, "a")` and `(3, "c")`, so rows 0 and 2 match
/// and row 1 does not.
#[test]
fn in_list_struct() -> Result<()> {
    let fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let schema =
        Schema::new(vec![Field::new("a", DataType::Struct(fields.clone()), true)]);

    // Three-row struct column without a validity buffer.
    let column_data = StructArray::new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
        None,
    );

    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column_data)])?;

    // Builds a single-row struct scalar `(x, y)` with the column's field layout.
    let single_row = |x: i32, y: &str| {
        ScalarValue::Struct(Arc::new(StructArray::new(
            fields.clone(),
            vec![
                Arc::new(Int32Array::from(vec![x])),
                Arc::new(StringArray::from(vec![y])),
            ],
            None,
        )))
    };

    let list = vec![lit(single_row(1, "a")), lit(single_row(3, "c"))];

    // First pass is IN, second pass is NOT IN with the inverted expectation.
    for (negated, expected) in [
        (false, vec![Some(true), Some(false), Some(true)]),
        (true, vec![Some(false), Some(true), Some(false)]),
    ] {
        in_list_raw!(
            batch,
            list.clone(),
            &negated,
            expected,
            Arc::clone(&col_a),
            &schema
        );
    }

    Ok(())
}
1439
/// `IN` / `NOT IN` on a struct column whose second row is null.
///
/// The column holds `(1, "a")` and a null row (validity `[true, false]`); the
/// list holds only `(1, "a")`. The null row is expected to yield `None` for
/// both the plain and the negated expression.
#[test]
fn in_list_struct_with_nulls() -> Result<()> {
    let fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let schema =
        Schema::new(vec![Field::new("a", DataType::Struct(fields.clone()), true)]);

    // Two rows; the validity buffer marks the second one as null.
    let validity = NullBuffer::from(vec![true, false]);
    let column_data = StructArray::new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["a", "b"])),
        ],
        Some(validity),
    );

    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column_data)])?;

    // The only list entry: the single-row struct scalar `(1, "a")`.
    let needle = ScalarValue::Struct(Arc::new(StructArray::new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["a"])),
        ],
        None,
    )));
    let list = vec![lit(needle)];

    for (negated, expected) in [
        (false, vec![Some(true), None]),
        (true, vec![Some(false), None]),
    ] {
        in_list_raw!(
            batch,
            list.clone(),
            &negated,
            expected,
            Arc::clone(&col_a),
            &schema
        );
    }

    Ok(())
}
1499
/// `IN` / `NOT IN` on a struct column when the list itself contains a null
/// struct.
///
/// The column holds `(1, "a")`, `(2, "b")`, `(3, "c")`; the list holds
/// `(1, "a")` plus a null struct. The matching row gives a definite answer,
/// the non-matching rows are expected to be `None`.
#[test]
fn in_list_struct_with_null_in_list() -> Result<()> {
    let fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let schema =
        Schema::new(vec![Field::new("a", DataType::Struct(fields.clone()), true)]);

    let column_data = StructArray::new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
        None,
    );

    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column_data)])?;

    // Non-null list entry `(1, "a")`.
    let matching = ScalarValue::Struct(Arc::new(StructArray::new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1])),
            Arc::new(StringArray::from(vec!["a"])),
        ],
        None,
    )));
    // Null list entry: a one-row, all-null struct array.
    let null_entry =
        ScalarValue::Struct(Arc::new(StructArray::new_null(fields.clone(), 1)));

    let list = vec![lit(matching), lit(null_entry)];

    for (negated, expected) in [
        (false, vec![Some(true), None, None]),
        (true, vec![Some(false), None, None]),
    ] {
        in_list_raw!(
            batch,
            list.clone(),
            &negated,
            expected,
            Arc::clone(&col_a),
            &schema
        );
    }

    Ok(())
}
1561
/// `IN` / `NOT IN` on a struct column that itself contains a struct field.
///
/// Column `x` has the layout `{inner: {a: Int32, b: Utf8}, c: Int32}` and the
/// rows `((1, "x"), 10)` and `((2, "y"), 20)`. The list holds the single
/// literal `((1, "x"), 10)`, so only the first row matches.
#[test]
fn in_list_nested_struct() -> Result<()> {
    let inner_struct_fields = Fields::from(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    let outer_struct_fields = Fields::from(vec![
        Field::new(
            "inner",
            DataType::Struct(inner_struct_fields.clone()),
            false,
        ),
        Field::new("c", DataType::Int32, false),
    ]);
    let schema = Schema::new(vec![Field::new(
        "x",
        DataType::Struct(outer_struct_fields.clone()),
        true,
    )]);

    // Assembles an outer struct array from the three leaf columns; used for
    // both the batch column and the one-row literal.
    let build_outer = |a: Vec<i32>, b: Vec<&str>, c: Vec<i32>| {
        let inner = Arc::new(StructArray::new(
            inner_struct_fields.clone(),
            vec![
                Arc::new(Int32Array::from(a)),
                Arc::new(StringArray::from(b)),
            ],
            None,
        ));
        StructArray::new(
            outer_struct_fields.clone(),
            vec![inner, Arc::new(Int32Array::from(c))],
            None,
        )
    };

    let column_data = build_outer(vec![1, 2], vec!["x", "y"], vec![10, 20]);

    let col_x = col("x", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column_data)])?;

    let needle =
        ScalarValue::Struct(Arc::new(build_outer(vec![1], vec!["x"], vec![10])));
    let list = vec![lit(needle)];

    for (negated, expected) in [
        (false, vec![Some(true), Some(false)]),
        (true, vec![Some(false), Some(true)]),
    ] {
        in_list_raw!(
            batch,
            list.clone(),
            &negated,
            expected,
            Arc::clone(&col_x),
            &schema
        );
    }

    Ok(())
}
1638
/// Struct `IN` / `NOT IN` through `InListExpr::new` with no static filter
/// supplied (the last constructor argument is `None`).
///
/// Besides the evaluation results, the test asserts that the `Display`
/// rendering of the non-negated expression does not contain `"(SET)"`.
#[test]
fn in_list_struct_with_exprs_not_array() -> Result<()> {
    let fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let schema =
        Schema::new(vec![Field::new("a", DataType::Struct(fields.clone()), true)]);

    // Column rows: (1, "a"), (2, "b"), (3, "c").
    let column_data = StructArray::new(
        fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
        None,
    );

    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(column_data)])?;

    // Builds a single-row struct scalar `(x, y)`.
    let single_row = |x: i32, y: &str| {
        ScalarValue::Struct(Arc::new(StructArray::new(
            fields.clone(),
            vec![
                Arc::new(Int32Array::from(vec![x])),
                Arc::new(StringArray::from(vec![y])),
            ],
            None,
        )))
    };
    // Fresh copy of the literal list `[(1, "a"), (3, "c")]` for each expression.
    let struct_literals = || -> Vec<Arc<dyn PhysicalExpr>> {
        vec![lit(single_row(1, "a")), lit(single_row(3, "c"))]
    };

    // --- IN ---
    let in_expr = Arc::new(InListExpr::new(
        Arc::clone(&col_a),
        struct_literals(),
        false,
        None,
    ));

    let display_string = in_expr.to_string();
    assert!(
        !display_string.contains("(SET)"),
        "Expected display string to NOT contain '(SET)' (should use Exprs variant), but got: {display_string}",
    );

    let in_result = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let in_result = as_boolean_array(&in_result);
    let in_expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
    assert_eq!(in_result, &in_expected);

    // --- NOT IN ---
    let not_in_expr = Arc::new(InListExpr::new(
        Arc::clone(&col_a),
        struct_literals(),
        true,
        None,
    ));

    let not_in_result = not_in_expr
        .evaluate(&batch)?
        .into_array(batch.num_rows())?;
    let not_in_result = as_boolean_array(&not_in_result);
    let not_in_expected =
        BooleanArray::from(vec![Some(false), Some(true), Some(false)]);
    assert_eq!(not_in_result, &not_in_expected);

    Ok(())
}
1745
/// `IN` over a nullable Int64 column `[1, 2, 3, NULL]`, with and without a
/// NULL literal in the list.
///
/// * `a IN (1, 4)`    -> `[true, false, false, NULL]`
/// * `a IN (1, NULL)` -> `[true, NULL, NULL, NULL]`
#[test]
fn test_in_list_null_handling_comprehensive() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    let values = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)])?;

    // (list, expected result per row), evaluated in order.
    let cases: Vec<(Vec<Arc<dyn PhysicalExpr>>, Vec<Option<bool>>)> = vec![
        (
            vec![lit(1i64), lit(4i64)],
            vec![Some(true), Some(false), Some(false), None],
        ),
        (
            vec![lit(1i64), lit(ScalarValue::Int64(None))],
            vec![Some(true), None, None, None],
        ),
    ];

    for (list, expected) in cases {
        in_list!(batch, list, &false, expected, Arc::clone(&col_a), &schema);
    }

    Ok(())
}
1797
/// A list made up solely of NULL literals: both `a IN (NULL, NULL)` and
/// `a NOT IN (NULL, NULL)` are expected to be NULL for every row of the
/// column `[1, 2, NULL]`.
#[test]
fn test_in_list_with_only_nulls() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let values = Int64Array::from(vec![Some(1), Some(2), None]);
    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)])?;

    let null_literal = || lit(ScalarValue::Int64(None));
    let list = vec![null_literal(), null_literal()];

    // Same all-NULL expectation for the plain and the negated form.
    for negated in [false, true] {
        in_list!(
            batch,
            list.clone(),
            &negated,
            vec![None, None, None],
            Arc::clone(&col_a),
            &schema
        );
    }

    Ok(())
}
1834
/// Builds the expression from an array that contains several NULL entries
/// (`[1, 2, NULL, NULL, 3, NULL]`) via `InListExpr::try_new_from_array` and
/// evaluates it against the column `[1, 2, 3, 4, NULL]`.
///
/// Values present in the list are `true`; the non-matching `4` and the NULL
/// row are both expected to be NULL.
#[test]
fn test_in_list_multiple_nulls_deduplication() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let col_a = col("a", &schema)?;

    // List values, including repeated NULLs.
    let list_values: ArrayRef = Arc::new(Int64Array::from(vec![
        Some(1),
        Some(2),
        None,
        None,
        Some(3),
        None,
    ]));

    let in_expr: Arc<dyn PhysicalExpr> = Arc::new(InListExpr::try_new_from_array(
        Arc::clone(&col_a),
        list_values,
        false,
        &schema,
    )?);

    let column_values =
        Int64Array::from(vec![Some(1), Some(2), Some(3), Some(4), None]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(column_values)],
    )?;

    let evaluated = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);

    let expected =
        BooleanArray::from(vec![Some(true), Some(true), Some(true), None, None]);
    assert_eq!(actual, &expected);

    Ok(())
}
1883
/// `NOT IN` over a nullable Int64 column `[1, 2, 3, NULL]`, with and without
/// a NULL literal in the list.
///
/// * `a NOT IN (1, 4)`    -> `[false, true, true, NULL]`
/// * `a NOT IN (1, NULL)` -> `[false, NULL, NULL, NULL]`
#[test]
fn test_not_in_null_handling_comprehensive() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);

    let values = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
    let col_a = col("a", &schema)?;
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)])?;

    // (list, expected result per row), evaluated in order.
    let cases: Vec<(Vec<Arc<dyn PhysicalExpr>>, Vec<Option<bool>>)> = vec![
        (
            vec![lit(1i64), lit(4i64)],
            vec![Some(false), Some(true), Some(true), None],
        ),
        (
            vec![lit(1i64), lit(ScalarValue::Int64(None))],
            vec![Some(false), None, None, None],
        ),
    ];

    for (list, expected) in cases {
        in_list!(batch, list, &true, expected, Arc::clone(&col_a), &schema);
    }

    Ok(())
}
1931
/// Column of `DataType::Null` (three rows) tested against the Int64 list
/// `(1, 2)`: both `IN` and `NOT IN` are expected to be NULL for every row.
#[test]
fn test_in_list_null_type_column() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
    let null_column = NullArray::new(3);
    let col_a = col("a", &schema)?;
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(null_column)],
    )?;

    let list = vec![lit(1i64), lit(2i64)];

    // Same all-NULL expectation for the plain and the negated form.
    for negated in [false, true] {
        in_list!(
            batch,
            list.clone(),
            &negated,
            vec![None, None, None],
            Arc::clone(&col_a),
            &schema
        );
    }

    Ok(())
}
1970
/// Int64 column `[1, 2, NULL]` tested against a list supplied as a
/// two-element `NullArray` through `InListExpr::try_new_from_array`: every
/// row is expected to evaluate to NULL.
#[test]
fn test_in_list_null_type_list() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let column_values = Int64Array::from(vec![Some(1), Some(2), None]);
    let col_a = col("a", &schema)?;

    // The list is an array of type Null rather than Int64.
    let null_list: ArrayRef = Arc::new(NullArray::new(2));

    let in_expr: Arc<dyn PhysicalExpr> = Arc::new(InListExpr::try_new_from_array(
        Arc::clone(&col_a),
        null_list,
        false,
        &schema,
    )?);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(column_values)],
    )?;
    let evaluated = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);

    let all_null = BooleanArray::from(vec![None, None, None]);
    assert_eq!(actual, &all_null);

    Ok(())
}
2000
/// Both sides are of type Null: a three-row `NullArray` column tested against
/// a two-element `NullArray` list through `InListExpr::try_new_from_array`.
/// Every row is expected to evaluate to NULL.
#[test]
fn test_in_list_null_type_both() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
    let null_column = NullArray::new(3);
    let col_a = col("a", &schema)?;

    let null_list: ArrayRef = Arc::new(NullArray::new(2));

    let in_expr: Arc<dyn PhysicalExpr> = Arc::new(InListExpr::try_new_from_array(
        Arc::clone(&col_a),
        null_list,
        false,
        &schema,
    )?);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(null_column)],
    )?;
    let evaluated = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);

    let all_null = BooleanArray::from(vec![None, None, None]);
    assert_eq!(actual, &all_null);

    Ok(())
}
2030
/// Table of `IN` evaluations over a nullable Int32 column `b`, grouped by the
/// shape of the list:
///
/// 1. `b IN (1, 2)`       - literal-only list
/// 2. `b IN (NULL, 1)`    - list with one NULL literal
/// 3. `b IN (NULL, NULL)` - list of NULL literals only
/// 4. `1 IN (2, b)`       - literal needle, column inside the list
/// 5. `b IN (1, b)`       - column needle, column inside the list
///
/// Every case builds its own batch from the given column values and compares
/// the evaluated boolean array with the expected one.
#[test]
fn test_in_list_comprehensive_null_handling() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
    let col_b = col("b", &schema)?;
    let null_i32 = ScalarValue::Int32(None);

    // Builds the batch, evaluates `needle IN (list)` (never negated) and
    // asserts the result.
    let check = |values: Vec<Option<i32>>,
                 needle: Arc<dyn PhysicalExpr>,
                 list: Vec<Arc<dyn PhysicalExpr>>,
                 expected: Vec<Option<bool>>|
     -> Result<()> {
        let array = Arc::new(Int32Array::from(values));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;
        let in_expr = in_list(needle, list, &false, schema.as_ref())?;
        let evaluated = in_expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        let actual = as_boolean_array(&evaluated);
        assert_eq!(actual, &BooleanArray::from(expected));
        Ok(())
    };

    // Each row of a table is (column values, expected result).
    type Rows = Vec<(Vec<Option<i32>>, Vec<Option<bool>>)>;

    // 1. b IN (1, 2)
    let literal_only: Rows = vec![
        (vec![Some(1)], vec![Some(true)]),
        (vec![Some(1), Some(2)], vec![Some(true), Some(true)]),
        (vec![Some(3), Some(4)], vec![Some(false), Some(false)]),
        (vec![Some(1), None], vec![Some(true), None]),
        (vec![Some(3), None], vec![Some(false), None]),
    ];
    for (values, expected) in literal_only {
        check(
            values,
            Arc::clone(&col_b),
            vec![lit(1i32), lit(2i32)],
            expected,
        )?;
    }

    // 2. b IN (NULL, 1)
    let one_null: Rows = vec![
        (vec![Some(1)], vec![Some(true)]),
        (vec![Some(2)], vec![None]),
        (vec![None], vec![None]),
    ];
    for (values, expected) in one_null {
        check(
            values,
            Arc::clone(&col_b),
            vec![lit(null_i32.clone()), lit(1i32)],
            expected,
        )?;
    }

    // 3. b IN (NULL, NULL)
    let only_nulls: Rows = vec![
        (vec![Some(1)], vec![None]),
        (vec![None], vec![None]),
    ];
    for (values, expected) in only_nulls {
        check(
            values,
            Arc::clone(&col_b),
            vec![lit(null_i32.clone()), lit(null_i32.clone())],
            expected,
        )?;
    }

    // 4. 1 IN (2, b)
    let literal_needle: Rows = vec![
        (vec![Some(1)], vec![Some(true)]),
        (vec![Some(3)], vec![Some(false)]),
        (vec![None], vec![None]),
    ];
    for (values, expected) in literal_needle {
        check(
            values,
            lit(1i32),
            vec![lit(2i32), Arc::clone(&col_b)],
            expected,
        )?;
    }

    // 5. b IN (1, b)
    let column_in_list: Rows = vec![
        (vec![Some(1)], vec![Some(true)]),
        (vec![Some(2)], vec![Some(true)]),
        (vec![None], vec![None]),
    ];
    for (values, expected) in column_in_list {
        check(
            values,
            Arc::clone(&col_b),
            vec![lit(1i32), Arc::clone(&col_b)],
            expected,
        )?;
    }

    Ok(())
}
2229
/// `IN` / `NOT IN` where the needle is a scalar literal rather than a column.
///
/// The Int32 cases run against a one-row batch `[1]`, the Utf8 cases against
/// a one-row batch `["dummy"]`; every expression involves literals only, and
/// each is expected to produce a single-row result.
#[test]
fn test_in_list_scalar_literal_cases() -> Result<()> {
    // Evaluates `needle [NOT] IN (list)` against `batch` and compares with
    // `expected`.
    fn check(
        schema: &Schema,
        batch: &RecordBatch,
        needle: Arc<dyn PhysicalExpr>,
        list: Vec<Arc<dyn PhysicalExpr>>,
        negated: bool,
        expected: Vec<Option<bool>>,
    ) -> Result<()> {
        let in_expr = in_list(needle, list, &negated, schema)?;
        let evaluated = in_expr.evaluate(batch)?.into_array(batch.num_rows())?;
        let actual = as_boolean_array(&evaluated);
        let expected_array = BooleanArray::from(expected);
        assert_eq!(
            actual,
            &expected_array,
            "Expected {:?}, got {:?}",
            expected_array,
            actual.iter().collect::<Vec<_>>()
        );
        Ok(())
    }

    // (needle, list, negated, expected)
    type Case = (
        Arc<dyn PhysicalExpr>,
        Vec<Arc<dyn PhysicalExpr>>,
        bool,
        Vec<Option<bool>>,
    );

    // ---- Int32 literals ----
    let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));
    let null_i32 = ScalarValue::Int32(None);

    let array = Arc::new(Int32Array::from(vec![Some(1)]));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;

    // The list (0, 1, 2, NULL) shared by the last four Int32 cases.
    let ints_with_null = || -> Vec<Arc<dyn PhysicalExpr>> {
        vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())]
    };

    let int_cases: Vec<Case> = vec![
        // NULL needle against various lists.
        (
            lit(null_i32.clone()),
            vec![lit(1i32), lit(1i32)],
            false,
            vec![None],
        ),
        (
            lit(null_i32.clone()),
            vec![lit(null_i32.clone()), lit(1i32)],
            false,
            vec![None],
        ),
        (
            lit(null_i32.clone()),
            vec![lit(null_i32.clone()), lit(null_i32.clone())],
            false,
            vec![None],
        ),
        // Needle 3 is absent from (0, 1, 2, NULL).
        (lit(3i32), ints_with_null(), false, vec![None]),
        (lit(3i32), ints_with_null(), true, vec![None]),
        // Needle 1 is present in (0, 1, 2, NULL).
        (lit(1i32), ints_with_null(), false, vec![Some(true)]),
        (lit(1i32), ints_with_null(), true, vec![Some(false)]),
    ];
    for (needle, list, negated, expected) in int_cases {
        check(schema.as_ref(), &batch, needle, list, negated, expected)?;
    }

    // ---- Utf8 literals ----
    let schema_str =
        Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    let batch_str = RecordBatch::try_new(
        Arc::clone(&schema_str),
        vec![Arc::new(StringArray::from(vec![Some("dummy")]))],
    )?;
    let null_str = ScalarValue::Utf8(None);

    // The list ('a', 'b', NULL) shared by all Utf8 cases.
    let strs_with_null = || -> Vec<Arc<dyn PhysicalExpr>> {
        vec![lit("a"), lit("b"), lit(null_str.clone())]
    };

    let str_cases: Vec<Case> = vec![
        // 'c' is absent from the list.
        (lit("c"), strs_with_null(), false, vec![None]),
        (lit("c"), strs_with_null(), true, vec![None]),
        // 'a' is present in the list.
        (lit("a"), strs_with_null(), false, vec![Some(true)]),
        (lit("a"), strs_with_null(), true, vec![Some(false)]),
    ];
    for (needle, list, negated, expected) in str_cases {
        check(
            schema_str.as_ref(),
            &batch_str,
            needle,
            list,
            negated,
            expected,
        )?;
    }

    Ok(())
}
2410
/// `IN` with two-field struct ("tuple") literals on both sides.
///
/// Each case compares one struct literal `(field_0, field_1)` against a
/// single-element list. The struct itself is never null (no validity buffer),
/// only its fields may be. The expectations show that a tuple is `true` only
/// against a field-wise identical tuple (NULL fields included), and `false`
/// otherwise - never NULL.
#[test]
fn test_in_list_tuple_cases() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Int32, true)]));

    // Builds a one-row struct scalar with two nullable Int32 fields.
    let make_struct = |v1: Option<i32>, v2: Option<i32>| -> ScalarValue {
        let fields = Fields::from(vec![
            Field::new("field_0", DataType::Int32, true),
            Field::new("field_1", DataType::Int32, true),
        ]);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![v1])),
            Arc::new(Int32Array::from(vec![v2])),
        ];
        ScalarValue::Struct(Arc::new(StructArray::new(fields, columns, None)))
    };

    // One-row batch; it only determines the number of output rows.
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![Some(1)]))],
    )?;

    type Tuple = (Option<i32>, Option<i32>);

    // (needle tuple, single list tuple, expected result)
    let cases: Vec<(Tuple, Tuple, bool)> = vec![
        // Needle (NULL, NULL)
        ((None, None), (Some(1), Some(2)), false),
        ((None, None), (None, Some(1)), false),
        ((None, None), (None, None), true),
        // Needle (NULL, 1)
        ((None, Some(1)), (Some(1), Some(2)), false),
        ((None, Some(1)), (None, Some(1)), true),
        ((None, Some(1)), (None, None), false),
        // Fully non-null needles against (1, 2)
        ((Some(1), Some(2)), (Some(1), Some(2)), true),
        ((Some(1), Some(3)), (Some(1), Some(2)), false),
        ((Some(4), Some(4)), (Some(1), Some(2)), false),
        // Needle (1, 1) against tuples containing NULL fields
        ((Some(1), Some(1)), (None, Some(1)), false),
        ((Some(1), Some(1)), (None, None), false),
    ];

    for ((lhs_0, lhs_1), (rhs_0, rhs_1), expected) in cases {
        let needle = lit(make_struct(lhs_0, lhs_1));
        let list = vec![lit(make_struct(rhs_0, rhs_1))];

        let expr = in_list(needle, list, &false, schema.as_ref())?;
        let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        let actual = as_boolean_array(&evaluated);
        assert_eq!(actual, &BooleanArray::from(vec![Some(expected)]));
    }

    Ok(())
}
2536
/// `IN` on a non-nullable `Dictionary(Int8, Int32)` column against plain
/// Int32 literals.
///
/// Dictionary values are `[100, 200, 500]` with keys `[0, 1, 2]`; the list is
/// `(100, 200, 300)`, so the first two rows match and the third does not.
#[test]
fn test_in_list_dictionary_int32() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![Field::new("a", dict_type.clone(), false)]);
    let column = col("a", &schema)?;

    // The expression is built before any data exists.
    let candidates = vec![lit(100i32), lit(200i32), lit(300i32)];
    let in_expr = in_list(column, candidates, &false, &schema)?;

    let dict_keys = Int8Array::from(vec![0, 1, 2]);
    let dict_values = Int32Array::from(vec![100, 200, 500]);
    let dict_column: ArrayRef =
        Arc::new(DictionaryArray::try_new(dict_keys, Arc::new(dict_values))?);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![dict_column])?;

    let evaluated = in_expr.evaluate(&batch)?.into_array(3)?;
    let actual = as_boolean_array(&evaluated);
    assert_eq!(actual, &BooleanArray::from(vec![true, true, false]));
    Ok(())
}
2566
/// Table-driven `IN` / `NOT IN` checks on dictionary-encoded columns with
/// Int8 keys and Utf8, Int64 and Float64 values.
///
/// Every case is run four ways (list with/without a NULL literal, plain and
/// negated). Cases with a `dict_needle_test` are additionally run with
/// dictionary-typed literals in the list. A final check puts a NaN literal in
/// the list and expects it to match the NaN stored in the dictionary values.
#[test]
fn test_in_list_dictionary_types() -> Result<()> {
    // Wraps `value` in a dictionary scalar literal with the given key type.
    fn dict_lit(key_type: DataType, value: ScalarValue) -> Arc<dyn PhysicalExpr> {
        lit(ScalarValue::Dictionary(Box::new(key_type), Box::new(value)))
    }

    // Extra check where the list entries are themselves dictionary scalars.
    struct DictNeedleTest {
        list_values: Vec<Arc<dyn PhysicalExpr>>,
        expected: Vec<Option<bool>>,
    }

    // One dictionary column plus the lists and expectations to test it with.
    struct DictionaryInListTestCase {
        name: &'static str,
        dict_type: DataType,
        dict_keys: Vec<Option<i8>>,
        dict_values: ArrayRef,
        list_values_no_null: Vec<Arc<dyn PhysicalExpr>>,
        list_values_with_null: Vec<Arc<dyn PhysicalExpr>>,
        // IN, list without NULL
        expected_1: Vec<Option<bool>>,
        // NOT IN, list without NULL
        expected_2: Vec<Option<bool>>,
        // IN, list with NULL
        expected_3: Vec<Option<bool>>,
        // NOT IN, list with NULL
        expected_4: Vec<Option<bool>>,
        dict_needle_test: Option<DictNeedleTest>,
    }

    // Builds the dictionary batch for `test_case` and runs all its checks.
    fn run_dictionary_in_list_test(
        test_case: DictionaryInListTestCase,
    ) -> Result<()> {
        let DictionaryInListTestCase {
            name: _,
            dict_type,
            dict_keys,
            dict_values,
            list_values_no_null,
            list_values_with_null,
            expected_1,
            expected_2,
            expected_3,
            expected_4,
            dict_needle_test,
        } = test_case;

        let schema = Schema::new(vec![Field::new("a", dict_type, true)]);
        let col_a = col("a", &schema)?;

        let keys = Int8Array::from(dict_keys);
        let dict_array: ArrayRef =
            Arc::new(DictionaryArray::try_new(keys, dict_values)?);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![dict_array])?;

        // (list, negated, expected), in the order the checks are performed.
        let scenarios = vec![
            (list_values_no_null.clone(), false, expected_1),
            (list_values_no_null, true, expected_2),
            (list_values_with_null.clone(), false, expected_3),
            (list_values_with_null, true, expected_4),
        ];
        for (list, negated, expected) in scenarios {
            in_list!(
                batch,
                list,
                &negated,
                expected,
                Arc::clone(&col_a),
                &schema
            );
        }

        // Optional: dictionary-typed literals in the list, passed to
        // `in_list_raw!` as-is.
        if let Some(DictNeedleTest {
            list_values,
            expected,
        }) = dict_needle_test
        {
            in_list_raw!(
                batch,
                list_values,
                &false,
                expected,
                Arc::clone(&col_a),
                &schema
            );
        }

        Ok(())
    }

    let test_cases = vec![
        // Utf8 values ["a", "d"], keys [0, 1, NULL].
        DictionaryInListTestCase {
            name: "dictionary_utf8",
            dict_type: DataType::Dictionary(
                Box::new(DataType::Int8),
                Box::new(DataType::Utf8),
            ),
            dict_keys: vec![Some(0), Some(1), None],
            dict_values: Arc::new(StringArray::from(vec![Some("a"), Some("d")])),
            list_values_no_null: vec![lit("a"), lit("b")],
            list_values_with_null: vec![
                lit("a"),
                lit("b"),
                lit(ScalarValue::Utf8(None)),
            ],
            expected_1: vec![Some(true), Some(false), None],
            expected_2: vec![Some(false), Some(true), None],
            expected_3: vec![Some(true), None, None],
            expected_4: vec![Some(false), None, None],
            dict_needle_test: None,
        },
        // Int64 values [10, 20], keys [0, 1, NULL]; the needle test uses
        // Int16-keyed dictionary literals.
        DictionaryInListTestCase {
            name: "dictionary_int64",
            dict_type: DataType::Dictionary(
                Box::new(DataType::Int8),
                Box::new(DataType::Int64),
            ),
            dict_keys: vec![Some(0), Some(1), None],
            dict_values: Arc::new(Int64Array::from(vec![Some(10), Some(20)])),
            list_values_no_null: vec![lit(10i64), lit(15i64)],
            list_values_with_null: vec![
                lit(10i64),
                lit(15i64),
                lit(ScalarValue::Int64(None)),
            ],
            expected_1: vec![Some(true), Some(false), None],
            expected_2: vec![Some(false), Some(true), None],
            expected_3: vec![Some(true), None, None],
            expected_4: vec![Some(false), None, None],
            dict_needle_test: Some(DictNeedleTest {
                list_values: vec![
                    dict_lit(DataType::Int16, ScalarValue::Int64(Some(10))),
                    dict_lit(DataType::Int16, ScalarValue::Int64(Some(15))),
                ],
                expected: vec![Some(true), Some(false), None],
            }),
        },
        // Float64 values [1.5, 3.7, NaN], keys [0, 1, NULL, 2]; the needle
        // test uses UInt16-keyed dictionary literals.
        DictionaryInListTestCase {
            name: "dictionary_float64",
            dict_type: DataType::Dictionary(
                Box::new(DataType::Int8),
                Box::new(DataType::Float64),
            ),
            dict_keys: vec![Some(0), Some(1), None, Some(2)],
            dict_values: Arc::new(Float64Array::from(vec![
                Some(1.5),
                Some(3.7),
                Some(f64::NAN),
            ])),
            list_values_no_null: vec![lit(1.5f64), lit(2.0f64)],
            list_values_with_null: vec![
                lit(1.5f64),
                lit(2.0f64),
                lit(ScalarValue::Float64(None)),
            ],
            expected_1: vec![Some(true), Some(false), None, Some(false)],
            expected_2: vec![Some(false), Some(true), None, Some(true)],
            expected_3: vec![Some(true), None, None, None],
            expected_4: vec![Some(false), None, None, None],
            dict_needle_test: Some(DictNeedleTest {
                list_values: vec![
                    dict_lit(DataType::UInt16, ScalarValue::Float64(Some(1.5))),
                    dict_lit(DataType::UInt16, ScalarValue::Float64(Some(2.0))),
                ],
                expected: vec![Some(true), Some(false), None, Some(false)],
            }),
        },
        // Same Utf8 dictionary as the first case, with each key repeated.
        DictionaryInListTestCase {
            name: "dictionary_deduplication",
            dict_type: DataType::Dictionary(
                Box::new(DataType::Int8),
                Box::new(DataType::Utf8),
            ),
            dict_keys: vec![Some(0), Some(1), Some(0), Some(1), None],
            dict_values: Arc::new(StringArray::from(vec![Some("a"), Some("d")])),
            list_values_no_null: vec![lit("a"), lit("b")],
            list_values_with_null: vec![
                lit("a"),
                lit("b"),
                lit(ScalarValue::Utf8(None)),
            ],
            expected_1: vec![Some(true), Some(false), Some(true), Some(false), None],
            expected_2: vec![Some(false), Some(true), Some(false), Some(true), None],
            expected_3: vec![Some(true), None, Some(true), None, None],
            expected_4: vec![Some(false), None, Some(false), None, None],
            dict_needle_test: None,
        },
    ];

    // Run every case; an `Err` is re-wrapped so it names the failing case.
    for test_case in test_cases {
        let test_name = test_case.name;
        run_dictionary_in_list_test(test_case).map_err(|e| {
            datafusion_common::DataFusionError::Execution(format!(
                "Dictionary test '{test_name}' failed: {e}"
            ))
        })?;
    }

    // NaN in the list: the row whose dictionary value is NaN is expected to
    // match the NaN literal.
    let nan_dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Float64));
    let schema = Schema::new(vec![Field::new("a", nan_dict_type.clone(), true)]);
    let col_a = col("a", &schema)?;

    let nan_keys = Int8Array::from(vec![Some(0), Some(1), None, Some(2)]);
    let nan_values = Float64Array::from(vec![Some(1.5), Some(3.7), Some(f64::NAN)]);
    let nan_dict_array: ArrayRef =
        Arc::new(DictionaryArray::try_new(nan_keys, Arc::new(nan_values))?);
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![nan_dict_array])?;

    let list_with_nan = vec![lit(1.5f64), lit(2.0f64), lit(f64::NAN)];
    in_list!(
        batch,
        list_with_nan,
        &false,
        vec![Some(true), Some(false), None, Some(true)],
        col_a,
        &schema
    );

    Ok(())
}
2854
/// Checks `IN` list evaluation for timestamp, time, duration, interval and
/// `Decimal256` columns.
///
/// Every case uses a two-row, non-nullable column whose first value is present
/// in the list and whose second value is not, so the expected (non-negated)
/// output is always `[true, false]`.
#[test]
fn test_in_list_esoteric_types() -> Result<()> {
    // Builds a one-column batch from `in_array`, converts `list_values` into
    // literal expressions and asserts the result through `in_list!`.
    fn check_type(
        data_type: DataType,
        in_array: ArrayRef,
        list_values: Vec<ScalarValue>,
    ) -> Result<()> {
        let schema = Schema::new(vec![Field::new("a", data_type, false)]);
        let needle = col("a", &schema)?;
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![in_array])?;

        let list_exprs = list_values.into_iter().map(lit).collect();
        in_list!(
            batch,
            list_exprs,
            &false,
            vec![Some(true), Some(false)],
            needle,
            &schema
        );
        Ok(())
    }

    // --- Timestamps -------------------------------------------------------
    check_type(
        DataType::Timestamp(TimeUnit::Second, None),
        Arc::new(TimestampSecondArray::from(vec![Some(1_000), Some(2_000)])),
        vec![
            ScalarValue::TimestampSecond(Some(1_000), None),
            ScalarValue::TimestampSecond(Some(1_500), None),
        ],
    )?;

    check_type(
        DataType::Timestamp(TimeUnit::Millisecond, None),
        Arc::new(TimestampMillisecondArray::from(vec![
            Some(1_000_000),
            Some(2_000_000),
        ])),
        vec![
            ScalarValue::TimestampMillisecond(Some(1_000_000), None),
            ScalarValue::TimestampMillisecond(Some(1_500_000), None),
        ],
    )?;

    check_type(
        DataType::Timestamp(TimeUnit::Microsecond, None),
        Arc::new(TimestampMicrosecondArray::from(vec![
            Some(1_000_000_000),
            Some(2_000_000_000),
        ])),
        vec![
            ScalarValue::TimestampMicrosecond(Some(1_000_000_000), None),
            ScalarValue::TimestampMicrosecond(Some(1_500_000_000), None),
        ],
    )?;

    // --- Time32 / Time64 --------------------------------------------------
    check_type(
        DataType::Time32(TimeUnit::Second),
        Arc::new(Time32SecondArray::from(vec![Some(3_600), Some(7_200)])),
        vec![
            ScalarValue::Time32Second(Some(3_600)),
            ScalarValue::Time32Second(Some(5_400)),
        ],
    )?;

    check_type(
        DataType::Time32(TimeUnit::Millisecond),
        Arc::new(Time32MillisecondArray::from(vec![
            Some(3_600_000),
            Some(7_200_000),
        ])),
        vec![
            ScalarValue::Time32Millisecond(Some(3_600_000)),
            ScalarValue::Time32Millisecond(Some(5_400_000)),
        ],
    )?;

    check_type(
        DataType::Time64(TimeUnit::Microsecond),
        Arc::new(Time64MicrosecondArray::from(vec![
            Some(3_600_000_000),
            Some(7_200_000_000),
        ])),
        vec![
            ScalarValue::Time64Microsecond(Some(3_600_000_000)),
            ScalarValue::Time64Microsecond(Some(5_400_000_000)),
        ],
    )?;

    check_type(
        DataType::Time64(TimeUnit::Nanosecond),
        Arc::new(Time64NanosecondArray::from(vec![
            Some(3_600_000_000_000),
            Some(7_200_000_000_000),
        ])),
        vec![
            ScalarValue::Time64Nanosecond(Some(3_600_000_000_000)),
            ScalarValue::Time64Nanosecond(Some(5_400_000_000_000)),
        ],
    )?;

    // --- Durations --------------------------------------------------------
    check_type(
        DataType::Duration(TimeUnit::Second),
        Arc::new(DurationSecondArray::from(vec![Some(86_400), Some(172_800)])),
        vec![
            ScalarValue::DurationSecond(Some(86_400)),
            ScalarValue::DurationSecond(Some(129_600)),
        ],
    )?;

    check_type(
        DataType::Duration(TimeUnit::Millisecond),
        Arc::new(DurationMillisecondArray::from(vec![
            Some(86_400_000),
            Some(172_800_000),
        ])),
        vec![
            ScalarValue::DurationMillisecond(Some(86_400_000)),
            ScalarValue::DurationMillisecond(Some(129_600_000)),
        ],
    )?;

    check_type(
        DataType::Duration(TimeUnit::Microsecond),
        Arc::new(DurationMicrosecondArray::from(vec![
            Some(86_400_000_000),
            Some(172_800_000_000),
        ])),
        vec![
            ScalarValue::DurationMicrosecond(Some(86_400_000_000)),
            ScalarValue::DurationMicrosecond(Some(129_600_000_000)),
        ],
    )?;

    check_type(
        DataType::Duration(TimeUnit::Nanosecond),
        Arc::new(DurationNanosecondArray::from(vec![
            Some(86_400_000_000_000),
            Some(172_800_000_000_000),
        ])),
        vec![
            ScalarValue::DurationNanosecond(Some(86_400_000_000_000)),
            ScalarValue::DurationNanosecond(Some(129_600_000_000_000)),
        ],
    )?;

    // --- Intervals --------------------------------------------------------
    check_type(
        DataType::Interval(IntervalUnit::YearMonth),
        Arc::new(IntervalYearMonthArray::from(vec![Some(12), Some(24)])),
        vec![
            ScalarValue::IntervalYearMonth(Some(12)),
            ScalarValue::IntervalYearMonth(Some(18)),
        ],
    )?;

    let one_day = IntervalDayTime {
        days: 1,
        milliseconds: 0,
    };
    let two_days = IntervalDayTime {
        days: 2,
        milliseconds: 0,
    };
    let one_day_and_half_second = IntervalDayTime {
        days: 1,
        milliseconds: 500,
    };
    check_type(
        DataType::Interval(IntervalUnit::DayTime),
        Arc::new(IntervalDayTimeArray::from(vec![
            Some(one_day),
            Some(two_days),
        ])),
        vec![
            ScalarValue::IntervalDayTime(Some(one_day)),
            ScalarValue::IntervalDayTime(Some(one_day_and_half_second)),
        ],
    )?;

    let one_month = IntervalMonthDayNano {
        months: 1,
        days: 0,
        nanoseconds: 0,
    };
    let two_months = IntervalMonthDayNano {
        months: 2,
        days: 0,
        nanoseconds: 0,
    };
    let one_month_fifteen_days = IntervalMonthDayNano {
        months: 1,
        days: 15,
        nanoseconds: 0,
    };
    check_type(
        DataType::Interval(IntervalUnit::MonthDayNano),
        Arc::new(IntervalMonthDayNanoArray::from(vec![
            Some(one_month),
            Some(two_months),
        ])),
        vec![
            ScalarValue::IntervalMonthDayNano(Some(one_month)),
            ScalarValue::IntervalMonthDayNano(Some(one_month_fifteen_days)),
        ],
    )?;

    // --- Decimal256 -------------------------------------------------------
    let precision = 38;
    let scale = 10;
    let decimals = Decimal256Array::from(vec![
        Some(i256::from(12345)),
        Some(i256::from(67890)),
    ])
    .with_precision_and_scale(precision, scale)?;
    check_type(
        DataType::Decimal256(precision, scale),
        Arc::new(decimals),
        vec![
            ScalarValue::Decimal256(Some(i256::from(12345)), precision, scale),
            ScalarValue::Decimal256(Some(i256::from(54321)), precision, scale),
        ],
    )?;

    Ok(())
}
3092
3093 fn make_in_list_with_columns(
3096 expr: Arc<dyn PhysicalExpr>,
3097 list: Vec<Arc<dyn PhysicalExpr>>,
3098 negated: bool,
3099 ) -> Arc<InListExpr> {
3100 Arc::new(InListExpr::new(expr, list, negated, None))
3101 }
3102
/// `a IN (1, 3)` over a nullable Int32 column `[1, 2, 3, NULL]`, built through
/// `make_in_list_with_columns` with literal list items.
///
/// Expected: `[true, false, true, NULL]`.
#[test]
fn test_in_list_with_columns_int32_scalars() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    let needle = col("a", &schema)?;

    let a_values: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None]));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![a_values])?;

    let literals = vec![
        lit(ScalarValue::Int32(Some(1))),
        lit(ScalarValue::Int32(Some(3))),
    ];
    let expr = make_in_list_with_columns(needle, literals, false);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true), None]);
    assert_eq!(actual, &expected);
    Ok(())
}
3132
/// `a IN (b, c)` where the list items are column references rather than
/// literals.
///
/// Row by row: `1 IN (1, 99)`, `2 IN (99, 99)`, `3 IN (99, 3)` and
/// `NULL IN (99, NULL)`; expected `[true, false, true, NULL]`.
#[test]
fn test_in_list_with_columns_int32_column_refs() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]);

    let a_values: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), None]));
    let b_values: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(1), Some(99), Some(99), Some(99)]));
    let c_values: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3), None]));
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![a_values, b_values, c_values],
    )?;

    let needle = col("a", &schema)?;
    let candidates = vec![col("b", &schema)?, col("c", &schema)?];
    let expr = make_in_list_with_columns(needle, candidates, false);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true), None]);
    assert_eq!(actual, &expected);
    Ok(())
}
3171
/// `a IN (b)` for non-nullable Utf8 columns, with `a = ["x", "y", "z"]` and
/// `b = ["x", "x", "z"]`.
///
/// Expected: `[true, false, true]`.
#[test]
fn test_in_list_with_columns_utf8_column_refs() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Utf8, false),
    ]);

    let a_values: ArrayRef = Arc::new(StringArray::from(vec!["x", "y", "z"]));
    let b_values: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "z"]));
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![a_values, b_values])?;

    let needle = col("a", &schema)?;
    let candidates = vec![col("b", &schema)?];
    let expr = make_in_list_with_columns(needle, candidates, false);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![true, false, true]);
    assert_eq!(actual, &expected);
    Ok(())
}
3199
/// `a NOT IN (b)` for non-nullable Int32 columns, with `a = [1, 2, 3]` and
/// `b = [1, 99, 3]`.
///
/// Expected: `[false, true, false]`.
#[test]
fn test_in_list_with_columns_negated() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]);

    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let b_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 99, 3]));
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![a_values, b_values])?;

    let needle = col("a", &schema)?;
    let candidates = vec![col("b", &schema)?];
    // `true` requests the negated (NOT IN) form.
    let expr = make_in_list_with_columns(needle, candidates, true);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![false, true, false]);
    assert_eq!(actual, &expected);
    Ok(())
}
3227
/// `a IN (NULL, 1)` over a non-nullable Int32 column `[1, 2]`.
///
/// Expected: `[true, NULL]` -- the matching row is `true`, while the row with
/// no match yields NULL instead of `false` because the list contains a NULL.
#[test]
fn test_in_list_with_columns_null_in_list() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let needle = col("a", &schema)?;

    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
    let batch = RecordBatch::try_new(Arc::new(schema), vec![a_values])?;

    let literals = vec![
        lit(ScalarValue::Int32(None)),
        lit(ScalarValue::Int32(Some(1))),
    ];
    let expr = make_in_list_with_columns(needle, literals, false);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![Some(true), None]);
    assert_eq!(actual, &expected);
    Ok(())
}
3251
/// `a IN (b)` for Float64 columns containing NaN, with `a = [NaN, 1.0, NaN]`
/// and `b = [NaN, 2.0, 0.0]`.
///
/// Expected: `[true, false, false]`, i.e. the test expects NaN to match NaN.
#[test]
fn test_in_list_with_columns_float_nan() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Float64, false),
        Field::new("b", DataType::Float64, false),
    ]);

    let a_values: ArrayRef =
        Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN]));
    let b_values: ArrayRef = Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0]));
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![a_values, b_values])?;

    let needle = col("a", &schema)?;
    let candidates = vec![col("b", &schema)?];
    let expr = make_in_list_with_columns(needle, candidates, false);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![true, false, false]);
    assert_eq!(actual, &expected);
    Ok(())
}
3280
/// `a IN (b, c)` where the first list column `b` already equals `a` on every
/// row and `c` never matches.
///
/// Expected: `[true, true, true]`. The test name suggests this targets an
/// early-exit path once every row has matched; only the result is asserted
/// here.
#[test]
fn test_in_list_with_columns_short_circuit() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
    ]);

    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let b_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let c_values: ArrayRef = Arc::new(Int32Array::from(vec![99, 99, 99]));
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![a_values, b_values, c_values],
    )?;

    let needle = col("a", &schema)?;
    let candidates = vec![col("b", &schema)?, col("c", &schema)?];
    let expr = make_in_list_with_columns(needle, candidates, false);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![true, true, true]);
    assert_eq!(actual, &expected);
    Ok(())
}
3311
/// Same layout as the short-circuit test, but the needle column `a` is
/// nullable and holds `[1, NULL, 3]`.
///
/// Expected: `[true, NULL, true]` -- the NULL needle row stays NULL even
/// though every non-null row matches the first list column.
#[test]
fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, false),
        Field::new("c", DataType::Int32, false),
    ]);

    let a_values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let b_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let c_values: ArrayRef = Arc::new(Int32Array::from(vec![99, 99, 99]));
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![a_values, b_values, c_values],
    )?;

    let needle = col("a", &schema)?;
    let candidates = vec![col("b", &schema)?, col("c", &schema)?];
    let expr = make_in_list_with_columns(needle, candidates, false);

    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![Some(true), None, Some(true)]);
    assert_eq!(actual, &expected);
    Ok(())
}
3347
/// `a IN (b, c)` and `a NOT IN (b, c)` for struct-typed columns.
///
/// `a` holds `(1,"a"), (2,"b"), NULL, (4,"d")` (row 2 is masked out by the
/// validity buffer). Rows 0 and 3 match `b`, row 1 matches `c`. Expected:
/// `[true, true, NULL, true]` for IN and `[false, false, NULL, false]` for
/// NOT IN.
#[test]
fn test_in_list_with_columns_struct() -> Result<()> {
    let struct_fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let struct_dt = DataType::Struct(struct_fields.clone());

    let schema = Schema::new(vec![
        Field::new("a", struct_dt.clone(), true),
        Field::new("b", struct_dt.clone(), false),
        Field::new("c", struct_dt.clone(), false),
    ]);

    // Needle column: row 2 is NULL at the struct level.
    let a_values: ArrayRef = Arc::new(StructArray::new(
        struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
        ],
        Some(vec![true, true, false, true].into()),
    ));
    // First list column: equals `a` on rows 0, 2 and 3.
    let b_values: ArrayRef = Arc::new(StructArray::new(
        struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 9, 3, 4])),
            Arc::new(StringArray::from(vec!["a", "z", "c", "d"])),
        ],
        None,
    ));
    // Second list column: equals `a` only on row 1.
    let c_values: ArrayRef = Arc::new(StructArray::new(
        struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![9, 2, 9, 9])),
            Arc::new(StringArray::from(vec!["z", "b", "z", "z"])),
        ],
        None,
    ));

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![a_values, b_values, c_values],
    )?;

    // Evaluate the plain form first, then the negated one.
    for (negated, expected) in [
        (false, vec![Some(true), Some(true), None, Some(true)]),
        (true, vec![Some(false), Some(false), None, Some(false)]),
    ] {
        let needle = col("a", &schema)?;
        let candidates = vec![col("b", &schema)?, col("c", &schema)?];
        let expr = make_in_list_with_columns(needle, candidates, negated);

        let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        let actual = as_boolean_array(&evaluated);
        assert_eq!(actual, &BooleanArray::from(expected));
    }
    Ok(())
}
3426
3427 fn wrap_in_dict(array: ArrayRef) -> ArrayRef {
3439 let keys = Int32Array::from((0..array.len() as i32).collect::<Vec<_>>());
3440 Arc::new(DictionaryArray::new(keys, array))
3441 }
3442
3443 fn eval_in_list_from_array(
3447 needle: ArrayRef,
3448 in_array: ArrayRef,
3449 ) -> Result<BooleanArray> {
3450 let schema =
3451 Schema::new(vec![Field::new("a", needle.data_type().clone(), false)]);
3452 let col_a = col("a", &schema)?;
3453 let expr = Arc::new(InListExpr::try_new_from_array(
3454 col_a, in_array, false, &schema,
3455 )?) as Arc<dyn PhysicalExpr>;
3456 let batch = RecordBatch::try_new(Arc::new(schema), vec![needle])?;
3457 let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
3458 Ok(as_boolean_array(&result).clone())
3459 }
3460
/// Runs `eval_in_list_from_array` over several needle / in-list type
/// combinations: primitive numerics (plain and dictionary-wrapped needle),
/// Utf8 (plain, dictionary needle, dictionary on both sides), structs, and
/// structs with a dictionary-encoded field.
///
/// Every case is arranged so that rows 0 and 2 match and row 1 does not.
#[test]
fn test_in_list_from_array_type_combinations() -> Result<()> {
    use arrow::compute::cast;

    let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);

    // Int64 templates that are cast to each primitive type under test.
    let base_in: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2, 3]));
    let base_needle: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 4, 2]));

    let primitive_types = [
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float32,
        DataType::Float64,
    ];

    for dt in &primitive_types {
        let in_array = cast(&base_in, dt)?;
        let needle = cast(&base_needle, dt)?;

        // Needle and in-list share the same plain type.
        let same_type =
            eval_in_list_from_array(Arc::clone(&needle), Arc::clone(&in_array))?;
        assert_eq!(expected, same_type, "same-type failed for {dt:?}");

        // Dictionary-encoded needle against a plain in-list.
        let dict_needle = eval_in_list_from_array(wrap_in_dict(needle), in_array)?;
        assert_eq!(expected, dict_needle, "dict-needle failed for {dt:?}");
    }

    let utf8_in: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let utf8_needle: ArrayRef = Arc::new(StringArray::from(vec!["a", "d", "b"]));

    // Utf8 needle, Utf8 in-list.
    let plain_utf8 =
        eval_in_list_from_array(Arc::clone(&utf8_needle), Arc::clone(&utf8_in))?;
    assert_eq!(expected, plain_utf8);

    // Dictionary(Utf8) needle, Utf8 in-list.
    let dict_needle_utf8 = eval_in_list_from_array(
        wrap_in_dict(Arc::clone(&utf8_needle)),
        Arc::clone(&utf8_in),
    )?;
    assert_eq!(expected, dict_needle_utf8);

    // Dictionary(Utf8) on both sides.
    let dict_both_utf8 = eval_in_list_from_array(
        wrap_in_dict(Arc::clone(&utf8_needle)),
        wrap_in_dict(Arc::clone(&utf8_in)),
    )?;
    assert_eq!(expected, dict_both_utf8);

    // Struct { c0: Utf8, c1: Int64 } on both sides.
    let struct_fields = Fields::from(vec![
        Field::new("c0", DataType::Utf8, true),
        Field::new("c1", DataType::Int64, true),
    ]);
    let make_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
        let columns: Vec<(FieldRef, ArrayRef)> =
            struct_fields.iter().cloned().zip([c0, c1]).collect();
        Arc::new(StructArray::from(columns))
    };
    let struct_result = eval_in_list_from_array(
        make_struct(
            Arc::clone(&utf8_needle),
            Arc::new(Int64Array::from(vec![1, 4, 2])),
        ),
        make_struct(
            Arc::clone(&utf8_in),
            Arc::new(Int64Array::from(vec![1, 2, 3])),
        ),
    )?;
    assert_eq!(expected, struct_result);

    // Struct whose `c0` field is Dictionary(Int32, Utf8).
    let dict_struct_fields = Fields::from(vec![
        Field::new(
            "c0",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            true,
        ),
        Field::new("c1", DataType::Int64, true),
    ]);
    let make_dict_struct = |c0: ArrayRef, c1: ArrayRef| -> ArrayRef {
        let columns: Vec<(FieldRef, ArrayRef)> =
            dict_struct_fields.iter().cloned().zip([c0, c1]).collect();
        Arc::new(StructArray::from(columns))
    };
    let dict_struct_result = eval_in_list_from_array(
        make_dict_struct(
            wrap_in_dict(Arc::clone(&utf8_needle)),
            Arc::new(Int64Array::from(vec![1, 4, 2])),
        ),
        make_dict_struct(
            wrap_in_dict(Arc::clone(&utf8_in)),
            Arc::new(Int64Array::from(vec![1, 2, 3])),
        ),
    )?;
    assert_eq!(expected, dict_struct_result);

    Ok(())
}
3587
3588 fn make_int32_dict_array(values: Vec<Option<i32>>) -> ArrayRef {
3589 let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Int32Type>::new();
3590 for v in values {
3591 match v {
3592 Some(val) => builder.append_value(val),
3593 None => builder.append_null(),
3594 }
3595 }
3596 Arc::new(builder.finish())
3597 }
3598
3599 fn make_f64_dict_array(values: Vec<Option<f64>>) -> ArrayRef {
3600 let mut builder = PrimitiveDictionaryBuilder::<Int8Type, Float64Type>::new();
3601 for v in values {
3602 match v {
3603 Some(val) => builder.append_value(val),
3604 None => builder.append_null(),
3605 }
3606 }
3607 Arc::new(builder.finish())
3608 }
3609
/// Plain Int32 needle `[1, 2, 3, 4]` against a dictionary-encoded Int32
/// haystack `[1, NULL, 3]`.
///
/// Expected: `[true, NULL, true, NULL]` -- the haystack contains a null, and
/// the non-matching rows are expected to be NULL rather than `false`.
#[test]
fn test_try_new_from_array_dict_haystack_int32() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let needle_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![needle_values])?;

    let haystack = make_int32_dict_array(vec![Some(1), None, Some(3)]);

    let needle = col("a", &schema)?;
    let expr = InListExpr::try_new_from_array(needle, haystack, false, &schema)?;
    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![Some(true), None, Some(true), None]);
    assert_eq!(actual, &expected);

    Ok(())
}
3630
/// Checks which needle / in-list type combinations are accepted and which
/// are rejected by `eval_in_list_from_array`.
///
/// * Utf8 needle vs Dictionary(Utf8) in-list: accepted, `[true, false, true]`.
/// * Dictionary(Utf8) needle vs Int64 in-list: error.
/// * Dictionary(Int64) needle vs Dictionary(Utf8) in-list: error.
///
/// Both error cases must mention "The data type inlist should be same".
#[test]
fn test_in_list_from_array_type_mismatch_errors() -> Result<()> {
    let mismatch_fragment = "The data type inlist should be same";

    // Plain needle with a dictionary in-list of the same value type works.
    let accepted = eval_in_list_from_array(
        Arc::new(StringArray::from(vec!["a", "d", "b"])),
        wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
    )?;
    assert_eq!(
        BooleanArray::from(vec![Some(true), Some(false), Some(true)]),
        accepted
    );

    // Dictionary(Utf8) needle against a plain Int64 in-list.
    let utf8_vs_int64 = eval_in_list_from_array(
        wrap_in_dict(Arc::new(StringArray::from(vec!["a", "d", "b"]))),
        Arc::new(Int64Array::from(vec![1, 2, 3])),
    );
    let err = utf8_vs_int64.unwrap_err().to_string();
    assert!(err.contains(mismatch_fragment), "{err}");

    // Dictionaries on both sides but with different value types.
    let int64_vs_utf8 = eval_in_list_from_array(
        wrap_in_dict(Arc::new(Int64Array::from(vec![1, 4, 2]))),
        wrap_in_dict(Arc::new(StringArray::from(vec!["a", "b", "c"]))),
    );
    let err = int64_vs_utf8.unwrap_err().to_string();
    assert!(err.contains(mismatch_fragment), "{err}");

    Ok(())
}
3663
/// Negated (`NOT IN`) variant of the dictionary-haystack Int32 test: needle
/// `[1, 2, 3, 4]` against a dictionary haystack `[1, NULL, 3]`.
///
/// Expected: `[false, NULL, false, NULL]`.
#[test]
fn test_try_new_from_array_dict_haystack_negated() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let needle_values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![needle_values])?;

    let haystack = make_int32_dict_array(vec![Some(1), None, Some(3)]);

    let needle = col("a", &schema)?;
    // `true` requests the negated form.
    let expr = InListExpr::try_new_from_array(needle, haystack, true, &schema)?;
    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![Some(false), None, Some(false), None]);
    assert_eq!(actual, &expected);

    Ok(())
}
3684
/// Plain Utf8 needle `["a", "b", "c"]` against a dictionary-encoded Utf8
/// haystack `["a", "c"]` (Int8 keys, no nulls).
///
/// Expected: `[true, false, true]`.
#[test]
fn test_try_new_from_array_dict_haystack_utf8() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
    let needle_values: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![needle_values])?;

    let mut haystack_builder = StringDictionaryBuilder::<Int8Type>::new();
    haystack_builder.append_value("a");
    haystack_builder.append_value("c");
    let haystack: ArrayRef = Arc::new(haystack_builder.finish());

    let needle = col("a", &schema)?;
    let expr = InListExpr::try_new_from_array(needle, haystack, false, &schema)?;
    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
    assert_eq!(actual, &expected);

    Ok(())
}
3709
/// Dictionary(Int8, Int32) needle `[1, 2, 3, 4]` against a plain Int32
/// haystack `[1, 3]`.
///
/// Expected: `[true, false, true, false]`.
#[test]
fn test_try_new_from_array_dict_needle_and_plain_haystack() -> Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Int32));
    let schema = Schema::new(vec![Field::new("a", dict_type, false)]);

    let needle_values =
        make_int32_dict_array(vec![Some(1), Some(2), Some(3), Some(4)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::clone(&needle_values)],
    )?;

    let haystack: ArrayRef = Arc::new(Int32Array::from(vec![1, 3]));
    let needle = col("a", &schema)?;
    let expr = InListExpr::try_new_from_array(needle, haystack, false, &schema)?;
    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected =
        BooleanArray::from(vec![Some(true), Some(false), Some(true), Some(false)]);
    assert_eq!(actual, &expected);

    Ok(())
}
3734
/// Plain Float64 needle `[1.0, 2.0, 3.0]` against a dictionary-encoded
/// Float64 haystack `[1.0, 3.0]`.
///
/// Expected: `[true, false, true]`.
#[test]
fn test_try_new_from_array_dict_haystack_float64() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]);
    let needle_values: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![needle_values])?;

    let haystack = make_f64_dict_array(vec![Some(1.0), Some(3.0)]);

    let needle = col("a", &schema)?;
    let expr = InListExpr::try_new_from_array(needle, haystack, false, &schema)?;
    let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
    let actual = as_boolean_array(&evaluated);
    let expected = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
    assert_eq!(actual, &expected);

    Ok(())
}
3755
/// `InListExpr::try_new_from_array` must return an error when an Int32
/// needle column is paired with a Float64 haystack.
#[test]
fn test_try_new_from_array_type_mismatch_rejects() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let needle = col("a", &schema)?;
    let haystack: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0]));

    let outcome = InListExpr::try_new_from_array(needle, haystack, false, &schema);
    assert!(outcome.is_err());
    Ok(())
}
3766
/// Struct needle against a struct haystack via
/// `InListExpr::try_new_from_array`, in both plain and negated form.
///
/// Needle rows: `(1,"a"), (2,"b"), NULL, (4,"d")`; haystack:
/// `(1,"a"), (4,"d")`. Expected `[true, false, NULL, true]` for IN and
/// `[false, true, NULL, false]` for NOT IN.
#[test]
fn test_try_new_from_array_struct_haystack() -> Result<()> {
    let struct_fields = Fields::from(vec![
        Field::new("x", DataType::Int32, false),
        Field::new("y", DataType::Utf8, false),
    ]);
    let struct_dt = DataType::Struct(struct_fields.clone());
    let schema = Schema::new(vec![Field::new("a", struct_dt, true)]);

    // Row 2 of the needle is NULL at the struct level.
    let needle_values: ArrayRef = Arc::new(StructArray::new(
        struct_fields.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
        ],
        Some(vec![true, true, false, true].into()),
    ));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![needle_values])?;

    let haystack: ArrayRef = Arc::new(StructArray::new(
        struct_fields,
        vec![
            Arc::new(Int32Array::from(vec![1, 4])),
            Arc::new(StringArray::from(vec!["a", "d"])),
        ],
        None,
    ));

    let col_a = col("a", &schema)?;

    // Plain IN first, then NOT IN, against the same batch and haystack.
    for (negated, expected) in [
        (false, vec![Some(true), Some(false), None, Some(true)]),
        (true, vec![Some(false), Some(true), None, Some(false)]),
    ] {
        let expr = InListExpr::try_new_from_array(
            Arc::clone(&col_a),
            Arc::clone(&haystack),
            negated,
            &schema,
        )?;
        let evaluated = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
        let actual = as_boolean_array(&evaluated);
        assert_eq!(actual, &BooleanArray::from(expected));
    }

    Ok(())
}
3823}