1use crate::filter::{SlicesIterator, prep_null_mask_filter};
21use arrow_array::cast::AsArray;
22use arrow_array::types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, Utf8Type};
23use arrow_array::*;
24use arrow_buffer::{
25 BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, OffsetBufferBuilder,
26 ScalarBuffer,
27};
28use arrow_data::ArrayData;
29use arrow_data::transform::MutableArrayData;
30use arrow_schema::{ArrowError, DataType};
31use std::fmt::{Debug, Formatter};
32use std::hash::Hash;
33use std::marker::PhantomData;
34use std::ops::Not;
35use std::sync::Arc;
36
37pub fn zip(
97 mask: &BooleanArray,
98 truthy: &dyn Datum,
99 falsy: &dyn Datum,
100) -> Result<ArrayRef, ArrowError> {
101 let (truthy_array, truthy_is_scalar) = truthy.get();
102 let (falsy_array, falsy_is_scalar) = falsy.get();
103
104 if falsy_is_scalar && truthy_is_scalar {
105 let zipper = ScalarZipper::try_new(truthy, falsy)?;
106 return zipper.zip_impl.create_output(mask);
107 }
108
109 let truthy = truthy_array;
110 let falsy = falsy_array;
111
112 if truthy.data_type() != falsy.data_type() {
113 return Err(ArrowError::InvalidArgumentError(
114 "arguments need to have the same data type".into(),
115 ));
116 }
117
118 if truthy_is_scalar && truthy.len() != 1 {
119 return Err(ArrowError::InvalidArgumentError(
120 "scalar arrays must have 1 element".into(),
121 ));
122 }
123 if !truthy_is_scalar && truthy.len() != mask.len() {
124 return Err(ArrowError::InvalidArgumentError(
125 "all arrays should have the same length".into(),
126 ));
127 }
128 if falsy_is_scalar && falsy.len() != 1 {
129 return Err(ArrowError::InvalidArgumentError(
130 "scalar arrays must have 1 element".into(),
131 ));
132 }
133 if !falsy_is_scalar && falsy.len() != mask.len() {
134 return Err(ArrowError::InvalidArgumentError(
135 "all arrays should have the same length".into(),
136 ));
137 }
138
139 let falsy = falsy.to_data();
140 let truthy = truthy.to_data();
141
142 zip_impl(mask, &truthy, truthy_is_scalar, &falsy, falsy_is_scalar)
143}
144
145fn zip_impl(
146 mask: &BooleanArray,
147 truthy: &ArrayData,
148 truthy_is_scalar: bool,
149 falsy: &ArrayData,
150 falsy_is_scalar: bool,
151) -> Result<ArrayRef, ArrowError> {
152 let mut mutable = MutableArrayData::new(vec![truthy, falsy], false, truthy.len());
153
154 let mut filled = 0;
159
160 let mask_buffer = maybe_prep_null_mask_filter(mask);
161 SlicesIterator::from(&mask_buffer).for_each(|(start, end)| {
162 if start > filled {
164 if falsy_is_scalar {
165 for _ in filled..start {
166 mutable.extend(1, 0, 1);
168 }
169 } else {
170 mutable.extend(1, filled, start);
171 }
172 }
173 if truthy_is_scalar {
175 for _ in start..end {
176 mutable.extend(0, 0, 1);
178 }
179 } else {
180 mutable.extend(0, start, end);
181 }
182 filled = end;
183 });
184 if filled < mask.len() {
186 if falsy_is_scalar {
187 for _ in filled..mask.len() {
188 mutable.extend(1, 0, 1);
190 }
191 } else {
192 mutable.extend(1, filled, mask.len());
193 }
194 }
195
196 let data = mutable.freeze();
197 Ok(make_array(data))
198}
199
/// Zipper for the case where both the truthy and the falsy input are scalars.
///
/// The scalar values are captured once in [`ScalarZipper::try_new`]; the
/// resulting zipper can then be applied to any number of masks through
/// [`ScalarZipper::zip`].
#[derive(Debug, Clone)]
pub struct ScalarZipper {
    /// Type-specific implementation selected in `try_new`.
    zip_impl: Arc<dyn ZipImpl>,
}
225
226impl ScalarZipper {
227 pub fn try_new(truthy: &dyn Datum, falsy: &dyn Datum) -> Result<Self, ArrowError> {
235 let (truthy, truthy_is_scalar) = truthy.get();
236 let (falsy, falsy_is_scalar) = falsy.get();
237
238 if truthy.data_type() != falsy.data_type() {
239 return Err(ArrowError::InvalidArgumentError(
240 "arguments need to have the same data type".into(),
241 ));
242 }
243
244 if !truthy_is_scalar {
245 return Err(ArrowError::InvalidArgumentError(
246 "only scalar arrays are supported".into(),
247 ));
248 }
249
250 if !falsy_is_scalar {
251 return Err(ArrowError::InvalidArgumentError(
252 "only scalar arrays are supported".into(),
253 ));
254 }
255
256 if truthy.len() != 1 {
257 return Err(ArrowError::InvalidArgumentError(
258 "scalar arrays must have 1 element".into(),
259 ));
260 }
261 if falsy.len() != 1 {
262 return Err(ArrowError::InvalidArgumentError(
263 "scalar arrays must have 1 element".into(),
264 ));
265 }
266
267 macro_rules! primitive_size_helper {
268 ($t:ty) => {
269 Arc::new(PrimitiveScalarImpl::<$t>::new(truthy, falsy)) as Arc<dyn ZipImpl>
270 };
271 }
272
273 let zip_impl = downcast_primitive! {
274 truthy.data_type() => (primitive_size_helper),
275 DataType::Utf8 => {
276 Arc::new(BytesScalarImpl::<Utf8Type>::new(truthy, falsy)) as Arc<dyn ZipImpl>
277 },
278 DataType::LargeUtf8 => {
279 Arc::new(BytesScalarImpl::<LargeUtf8Type>::new(truthy, falsy)) as Arc<dyn ZipImpl>
280 },
281 DataType::Binary => {
282 Arc::new(BytesScalarImpl::<BinaryType>::new(truthy, falsy)) as Arc<dyn ZipImpl>
283 },
284 DataType::LargeBinary => {
285 Arc::new(BytesScalarImpl::<LargeBinaryType>::new(truthy, falsy)) as Arc<dyn ZipImpl>
286 },
287 _ => {
289 Arc::new(FallbackImpl::new(truthy, falsy)) as Arc<dyn ZipImpl>
290 },
291 };
292
293 Ok(Self { zip_impl })
294 }
295
296 pub fn zip(&self, mask: &BooleanArray) -> Result<ArrayRef, ArrowError> {
299 self.zip_impl.create_output(mask)
300 }
301}
302
/// Type-specific strategy used by [`ScalarZipper`] to zip two scalars.
///
/// Implementations hold the two scalar values and produce the output for a
/// given mask.
trait ZipImpl: Debug + Send + Sync {
    /// Creates the zipped output array for the mask `input`.
    fn create_output(&self, input: &BooleanArray) -> Result<ArrayRef, ArrowError>;
}
308
/// Scalar zip implementation for data types without a specialised
/// implementation; it defers to the generic `zip_impl`.
#[derive(Debug, PartialEq)]
struct FallbackImpl {
    /// Data of the single-element truthy scalar.
    truthy: ArrayData,
    /// Data of the single-element falsy scalar.
    falsy: ArrayData,
}
314
315impl FallbackImpl {
316 fn new(left: &dyn Array, right: &dyn Array) -> Self {
317 Self {
318 truthy: left.to_data(),
319 falsy: right.to_data(),
320 }
321 }
322}
323
324impl ZipImpl for FallbackImpl {
325 fn create_output(&self, predicate: &BooleanArray) -> Result<ArrayRef, ArrowError> {
326 zip_impl(predicate, &self.truthy, true, &self.falsy, true)
327 }
328}
329
/// Scalar zip implementation for primitive types.
struct PrimitiveScalarImpl<T: ArrowPrimitiveType> {
    /// Data type of the input scalars; re-applied to the output so that type
    /// parameters are carried over (the tests cover decimal precision/scale
    /// and timestamp timezone).
    data_type: DataType,
    /// Truthy value, or `None` when the truthy scalar is null.
    truthy: Option<T::Native>,
    /// Falsy value, or `None` when the falsy scalar is null.
    falsy: Option<T::Native>,
}
335
336impl<T: ArrowPrimitiveType> Debug for PrimitiveScalarImpl<T> {
337 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
338 f.debug_struct("PrimitiveScalarImpl")
339 .field("data_type", &self.data_type)
340 .field("truthy", &self.truthy)
341 .field("falsy", &self.falsy)
342 .finish()
343 }
344}
345
346impl<T: ArrowPrimitiveType> PrimitiveScalarImpl<T> {
347 fn new(truthy: &dyn Array, falsy: &dyn Array) -> Self {
348 Self {
349 data_type: truthy.data_type().clone(),
350 truthy: Self::get_value_from_scalar(truthy),
351 falsy: Self::get_value_from_scalar(falsy),
352 }
353 }
354
355 fn get_value_from_scalar(scalar: &dyn Array) -> Option<T::Native> {
356 if scalar.is_null(0) {
357 None
358 } else {
359 let value = scalar.as_primitive::<T>().value(0);
360
361 Some(value)
362 }
363 }
364
365 fn get_scalar_and_null_buffer_for_single_non_nullable(
369 predicate: BooleanBuffer,
370 value: T::Native,
371 ) -> (Vec<T::Native>, Option<NullBuffer>) {
372 let result_len = predicate.len();
373 let nulls = NullBuffer::new(predicate);
374 let scalars = vec![value; result_len];
375
376 (scalars, Some(nulls))
377 }
378}
379
380impl<T: ArrowPrimitiveType> ZipImpl for PrimitiveScalarImpl<T> {
381 fn create_output(&self, predicate: &BooleanArray) -> Result<ArrayRef, ArrowError> {
382 let result_len = predicate.len();
383 let predicate = maybe_prep_null_mask_filter(predicate);
385
386 let (scalars, nulls): (Vec<T::Native>, Option<NullBuffer>) = match (self.truthy, self.falsy)
387 {
388 (Some(truthy_val), Some(falsy_val)) => {
389 let scalars: Vec<T::Native> = predicate
390 .iter()
391 .map(|b| if b { truthy_val } else { falsy_val })
392 .collect();
393
394 (scalars, None)
395 }
396 (Some(truthy_val), None) => {
397 Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, truthy_val)
401 }
402 (None, Some(falsy_val)) => {
403 let predicate = predicate.not();
408
409 Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, falsy_val)
410 }
411 (None, None) => {
412 let nulls = NullBuffer::new_null(result_len);
414 let scalars = vec![T::default_value(); result_len];
415
416 (scalars, Some(nulls))
417 }
418 };
419
420 let scalars = ScalarBuffer::<T::Native>::from(scalars);
421 let output = PrimitiveArray::<T>::try_new(scalars, nulls)?;
422
423 let output = output.with_data_type(self.data_type.clone());
425
426 Ok(Arc::new(output))
427 }
428}
429
/// Scalar zip implementation for byte-array types (`Utf8`, `LargeUtf8`,
/// `Binary`, `LargeBinary`).
#[derive(PartialEq, Hash)]
struct BytesScalarImpl<T: ByteArrayType> {
    /// Bytes of the truthy value, or `None` when the truthy scalar is null.
    truthy: Option<Vec<u8>>,
    /// Bytes of the falsy value, or `None` when the falsy scalar is null.
    falsy: Option<Vec<u8>>,
    /// Ties the implementation to the byte-array type without storing one.
    phantom: PhantomData<T>,
}
436
437impl<T: ByteArrayType> Debug for BytesScalarImpl<T> {
438 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
439 f.debug_struct("BytesScalarImpl")
440 .field("truthy", &self.truthy)
441 .field("falsy", &self.falsy)
442 .finish()
443 }
444}
445
446impl<T: ByteArrayType> BytesScalarImpl<T> {
447 fn new(truthy_value: &dyn Array, falsy_value: &dyn Array) -> Self {
448 Self {
449 truthy: Self::get_value_from_scalar(truthy_value),
450 falsy: Self::get_value_from_scalar(falsy_value),
451 phantom: PhantomData,
452 }
453 }
454
455 fn get_value_from_scalar(scalar: &dyn Array) -> Option<Vec<u8>> {
456 if scalar.is_null(0) {
457 None
458 } else {
459 let bytes: &[u8] = scalar.as_bytes::<T>().value(0).as_ref();
460
461 Some(bytes.to_vec())
462 }
463 }
464
465 fn get_scalar_and_null_buffer_for_single_non_nullable(
469 predicate: BooleanBuffer,
470 value: &[u8],
471 ) -> (Buffer, OffsetBuffer<T::Offset>, Option<NullBuffer>) {
472 let value_length = value.len();
473
474 let number_of_true = predicate.count_set_bits();
475
476 if number_of_true == 0 {
478 let nulls = NullBuffer::new_null(predicate.len());
480
481 return (
482 Buffer::from(&[]),
484 OffsetBuffer::<T::Offset>::new_zeroed(predicate.len()),
486 Some(nulls),
487 );
488 }
489
490 let offsets = OffsetBuffer::<T::Offset>::from_lengths(
491 predicate.iter().map(|b| if b { value_length } else { 0 }),
492 );
493
494 let mut bytes = MutableBuffer::with_capacity(0);
495 bytes.repeat_slice_n_times(value, number_of_true);
496
497 let bytes = Buffer::from(bytes);
498
499 let nulls = NullBuffer::new(predicate);
502
503 (bytes, offsets, Some(nulls))
504 }
505
506 fn get_bytes_and_offset_for_all_same_value(
509 number_of_values: usize,
510 value: &[u8],
511 ) -> (Buffer, OffsetBuffer<T::Offset>) {
512 let value_length = value.len();
513
514 let offsets =
515 OffsetBuffer::<T::Offset>::from_repeated_length(value_length, number_of_values);
516
517 let mut bytes = MutableBuffer::with_capacity(0);
518 bytes.repeat_slice_n_times(value, number_of_values);
519 let bytes = Buffer::from(bytes);
520
521 (bytes, offsets)
522 }
523
524 fn create_output_on_non_nulls(
525 predicate: &BooleanBuffer,
526 truthy_val: &[u8],
527 falsy_val: &[u8],
528 ) -> (Buffer, OffsetBuffer<<T as ByteArrayType>::Offset>) {
529 let true_count = predicate.count_set_bits();
530
531 match true_count {
532 0 => {
533 let (bytes, offsets) =
536 Self::get_bytes_and_offset_for_all_same_value(predicate.len(), falsy_val);
537
538 return (bytes, offsets);
539 }
540 n if n == predicate.len() => {
541 let (bytes, offsets) =
543 Self::get_bytes_and_offset_for_all_same_value(predicate.len(), truthy_val);
544
545 return (bytes, offsets);
546 }
547
548 _ => {
549 }
551 }
552
553 let total_number_of_bytes =
554 true_count * truthy_val.len() + (predicate.len() - true_count) * falsy_val.len();
555 let mut mutable = MutableBuffer::with_capacity(total_number_of_bytes);
556 let mut offset_buffer_builder = OffsetBufferBuilder::<T::Offset>::new(predicate.len());
557
558 let mut filled = 0;
560
561 let truthy_len = truthy_val.len();
562 let falsy_len = falsy_val.len();
563
564 SlicesIterator::from(predicate).for_each(|(start, end)| {
565 if start > filled {
567 let false_repeat_count = start - filled;
568 mutable.repeat_slice_n_times(falsy_val, false_repeat_count);
570
571 for _ in 0..false_repeat_count {
572 offset_buffer_builder.push_length(falsy_len)
573 }
574 }
575
576 let true_repeat_count = end - start;
577 mutable.repeat_slice_n_times(truthy_val, true_repeat_count);
579
580 for _ in 0..true_repeat_count {
581 offset_buffer_builder.push_length(truthy_len)
582 }
583 filled = end;
584 });
585 if filled < predicate.len() {
587 let false_repeat_count = predicate.len() - filled;
588 mutable.repeat_slice_n_times(falsy_val, false_repeat_count);
590
591 for _ in 0..false_repeat_count {
592 offset_buffer_builder.push_length(falsy_len)
593 }
594 }
595
596 (mutable.into(), offset_buffer_builder.finish())
597 }
598}
599
600impl<T: ByteArrayType> ZipImpl for BytesScalarImpl<T> {
601 fn create_output(&self, predicate: &BooleanArray) -> Result<ArrayRef, ArrowError> {
602 let result_len = predicate.len();
603 let predicate = maybe_prep_null_mask_filter(predicate);
605
606 let (bytes, offsets, nulls): (Buffer, OffsetBuffer<T::Offset>, Option<NullBuffer>) =
607 match (self.truthy.as_deref(), self.falsy.as_deref()) {
608 (Some(truthy_val), Some(falsy_val)) => {
609 let (bytes, offsets) =
610 Self::create_output_on_non_nulls(&predicate, truthy_val, falsy_val);
611
612 (bytes, offsets, None)
613 }
614 (Some(truthy_val), None) => {
615 Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, truthy_val)
616 }
617 (None, Some(falsy_val)) => {
618 let predicate = predicate.not();
623 Self::get_scalar_and_null_buffer_for_single_non_nullable(predicate, falsy_val)
624 }
625 (None, None) => {
626 let nulls = NullBuffer::new_null(result_len);
628
629 (
630 Buffer::from(&[]),
632 OffsetBuffer::<T::Offset>::new_zeroed(predicate.len()),
634 Some(nulls),
635 )
636 }
637 };
638
639 let output = unsafe {
640 GenericByteArray::<T>::new_unchecked(offsets, bytes, nulls)
643 };
644
645 Ok(Arc::new(output))
646 }
647}
648
649fn maybe_prep_null_mask_filter(predicate: &BooleanArray) -> BooleanBuffer {
650 if predicate.null_count() == 0 {
652 predicate.values().clone()
653 } else {
654 let cleaned = prep_null_mask_filter(predicate);
655 let (boolean_buffer, _) = cleaned.into_parts();
656 boolean_buffer
657 }
658}
659
#[cfg(test)]
mod test {
    use super::*;
    use arrow_array::types::Int32Type;

    /// Two arrays, mask ending with `true`.
    #[test]
    fn test_zip_kernel_one() {
        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);
        let b = Int32Array::from(vec![None, Some(3), Some(6), Some(7), Some(3)]);
        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let out = zip(&mask, &a, &b).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![Some(5), None, Some(6), Some(7), Some(1)]);
        assert_eq!(actual, &expected);
    }

    /// Two arrays, mask ending with `false`.
    #[test]
    fn test_zip_kernel_two() {
        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);
        let b = Int32Array::from(vec![None, Some(3), Some(6), Some(7), Some(3)]);
        let mask = BooleanArray::from(vec![false, false, true, true, false]);
        let out = zip(&mask, &a, &b).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![None, Some(3), Some(7), None, Some(3)]);
        assert_eq!(actual, &expected);
    }

    /// Array truthy, scalar falsy, mask ending with `true`.
    #[test]
    fn test_zip_kernel_scalar_falsy_1() {
        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);

        let fallback = Scalar::new(Int32Array::from_value(42, 1));

        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let out = zip(&mask, &a, &fallback).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![Some(5), None, Some(42), Some(42), Some(1)]);
        assert_eq!(actual, &expected);
    }

    /// Array truthy, scalar falsy, mask ending with `false`.
    #[test]
    fn test_zip_kernel_scalar_falsy_2() {
        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);

        let fallback = Scalar::new(Int32Array::from_value(42, 1));

        let mask = BooleanArray::from(vec![false, false, true, true, false]);
        let out = zip(&mask, &a, &fallback).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![Some(42), Some(42), Some(7), None, Some(42)]);
        assert_eq!(actual, &expected);
    }

    /// Scalar truthy, array falsy, mask ending with `true`.
    #[test]
    fn test_zip_kernel_scalar_truthy_1() {
        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);

        let fallback = Scalar::new(Int32Array::from_value(42, 1));

        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let out = zip(&mask, &fallback, &a).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![Some(42), Some(42), Some(7), None, Some(42)]);
        assert_eq!(actual, &expected);
    }

    /// Scalar truthy, array falsy, mask ending with `false`.
    #[test]
    fn test_zip_kernel_scalar_truthy_2() {
        let a = Int32Array::from(vec![Some(5), None, Some(7), None, Some(1)]);

        let fallback = Scalar::new(Int32Array::from_value(42, 1));

        let mask = BooleanArray::from(vec![false, false, true, true, false]);
        let out = zip(&mask, &fallback, &a).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![Some(5), None, Some(42), Some(42), Some(1)]);
        assert_eq!(actual, &expected);
    }

    /// Both inputs scalar and non-null, mask ending with `true`.
    #[test]
    fn test_zip_kernel_scalar_both_mask_ends_with_true() {
        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
        let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));

        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![Some(42), Some(42), Some(123), Some(123), Some(42)]);
        assert_eq!(actual, &expected);
    }

    /// Both inputs scalar and non-null, mask ending with `false`.
    #[test]
    fn test_zip_kernel_scalar_both_mask_ends_with_false() {
        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
        let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));

        let mask = BooleanArray::from(vec![true, true, false, true, false, false]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![
            Some(42),
            Some(42),
            Some(123),
            Some(42),
            Some(123),
            Some(123),
        ]);
        assert_eq!(actual, &expected);
    }

    /// Non-null truthy scalar, null falsy scalar, mask ending with `true`.
    #[test]
    fn test_zip_kernel_primitive_scalar_none_1() {
        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
        let scalar_falsy = Scalar::new(Int32Array::new_null(1));

        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![Some(42), Some(42), None, None, Some(42)]);
        assert_eq!(actual, &expected);
    }

    /// Non-null truthy scalar, null falsy scalar, mask ending with `false`.
    #[test]
    fn test_zip_kernel_primitive_scalar_none_2() {
        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
        let scalar_falsy = Scalar::new(Int32Array::new_null(1));

        let mask = BooleanArray::from(vec![false, false, true, true, false]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![None, None, Some(42), Some(42), None]);
        assert_eq!(actual, &expected);
    }

    /// Both scalars null: the output is all null whatever the mask says.
    #[test]
    fn test_zip_kernel_primitive_scalar_both_null() {
        let scalar_truthy = Scalar::new(Int32Array::new_null(1));
        let scalar_falsy = Scalar::new(Int32Array::new_null(1));

        let mask = BooleanArray::from(vec![false, false, true, true, false]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![None, None, None, None, None]);
        assert_eq!(actual, &expected);
    }

    /// Primitive arrays: a null mask entry selects the falsy side.
    #[test]
    fn test_zip_primitive_array_with_nulls_is_mask_should_be_treated_as_false() {
        let truthy = Int32Array::from_iter_values(vec![1, 2, 3, 4, 5, 6]);
        let falsy = Int32Array::from_iter_values(vec![7, 8, 9, 10, 11, 12]);

        let mask = {
            let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]);
            let nulls = NullBuffer::from(vec![
                true, true, true,
                false, // null entry: must be treated as false although its value is true
                true, true,
            ]);
            BooleanArray::new(booleans, Some(nulls))
        };
        let out = zip(&mask, &truthy, &falsy).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(9),
            Some(10), // mask value is true but the entry is null
            Some(11),
            Some(12),
        ]);
        assert_eq!(actual, &expected);
    }

    /// Primitive scalars: a null mask entry selects the falsy side.
    #[test]
    fn test_zip_kernel_primitive_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false()
    {
        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
        let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));

        let mask = {
            let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]);
            let nulls = NullBuffer::from(vec![
                true, true, true,
                false, // null entry: must be treated as false although its value is true
                true, true,
            ]);
            BooleanArray::new(booleans, Some(nulls))
        };
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<Int32Array>().unwrap();
        let expected = Int32Array::from(vec![
            Some(42),
            Some(42),
            Some(123),
            Some(123), // mask value is true but the entry is null
            Some(123),
            Some(123),
        ]);
        assert_eq!(actual, &expected);
    }

    /// String arrays: a null mask entry selects the falsy side.
    #[test]
    fn test_zip_string_array_with_nulls_is_mask_should_be_treated_as_false() {
        let truthy = StringArray::from_iter_values(vec!["1", "2", "3", "4", "5", "6"]);
        let falsy = StringArray::from_iter_values(vec!["7", "8", "9", "10", "11", "12"]);

        let mask = {
            let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]);
            let nulls = NullBuffer::from(vec![
                true, true, true,
                false, // null entry: must be treated as false although its value is true
                true, true,
            ]);
            BooleanArray::new(booleans, Some(nulls))
        };
        let out = zip(&mask, &truthy, &falsy).unwrap();
        let actual = out.as_string::<i32>();
        let expected = StringArray::from_iter_values(vec![
            "1", "2", "9", "10", // the fourth mask value is true but the entry is null
            "11", "12",
        ]);
        assert_eq!(actual, &expected);
    }

    /// Large-string scalars: a null mask entry selects the falsy side.
    #[test]
    fn test_zip_kernel_large_string_scalar_with_boolean_array_mask_with_nulls_should_be_treated_as_false()
    {
        let scalar_truthy = Scalar::new(LargeStringArray::from_iter_values(["test"]));
        let scalar_falsy = Scalar::new(LargeStringArray::from_iter_values(["something else"]));

        let mask = {
            let booleans = BooleanBuffer::from(vec![true, true, false, true, false, false]);
            let nulls = NullBuffer::from(vec![
                true, true, true,
                false, // null entry: must be treated as false although its value is true
                true, true,
            ]);
            BooleanArray::new(booleans, Some(nulls))
        };
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<LargeStringArray>().unwrap();
        let expected = LargeStringArray::from_iter(vec![
            Some("test"),
            Some("test"),
            Some("something else"),
            Some("something else"), // mask value is true but the entry is null
            Some("something else"),
            Some("something else"),
        ]);
        assert_eq!(actual, &expected);
    }

    /// Non-null truthy string scalar, null falsy string scalar.
    #[test]
    fn test_zip_kernel_bytes_scalar_none_1() {
        let scalar_truthy = Scalar::new(StringArray::from_iter_values(["hello"]));
        let scalar_falsy = Scalar::new(StringArray::new_null(1));

        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<StringArray>().unwrap();
        let expected = StringArray::from_iter(vec![
            Some("hello"),
            Some("hello"),
            None,
            None,
            Some("hello"),
        ]);
        assert_eq!(actual, &expected);
    }

    /// Null truthy string scalar, non-null falsy string scalar.
    #[test]
    fn test_zip_kernel_bytes_scalar_none_2() {
        let scalar_truthy = Scalar::new(StringArray::new_null(1));
        let scalar_falsy = Scalar::new(StringArray::from_iter_values(["hello"]));

        let mask = BooleanArray::from(vec![true, true, false, false, true]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<StringArray>().unwrap();
        let expected = StringArray::from_iter(vec![None, None, Some("hello"), Some("hello"), None]);
        assert_eq!(actual, &expected);
    }

    /// Both string scalars non-null, values of different lengths.
    #[test]
    fn test_zip_kernel_bytes_scalar_both() {
        let scalar_truthy = Scalar::new(StringArray::from_iter_values(["test"]));
        let scalar_falsy = Scalar::new(StringArray::from_iter_values(["something else"]));

        // Mixed mask that ends with a run of `false` values.
        let mask = BooleanArray::from(vec![true, true, false, true, false, false]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_any().downcast_ref::<StringArray>().unwrap();
        let expected = StringArray::from_iter(vec![
            Some("test"),
            Some("test"),
            Some("something else"),
            Some("test"),
            Some("something else"),
            Some("something else"),
        ]);
        assert_eq!(actual, &expected);
    }

    /// Masks that take every row from a single side, for each combination of
    /// null and non-null string scalars.
    #[test]
    fn test_zip_scalar_bytes_only_taking_one_side() {
        let mask_len = 5;
        let all_true_mask = BooleanArray::from(vec![true; mask_len]);
        let all_false_mask = BooleanArray::from(vec![false; mask_len]);

        let null_scalar = Scalar::new(StringArray::new_null(1));
        let non_null_scalar_1 = Scalar::new(StringArray::from_iter_values(["test"]));
        let non_null_scalar_2 = Scalar::new(StringArray::from_iter_values(["something else"]));

        {
            // Null truthy, non-null falsy, all-true mask: every row is null.
            let out = zip(&all_true_mask, &null_scalar, &non_null_scalar_1).unwrap();
            let actual = out.as_string::<i32>();
            let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len));
            assert_eq!(actual, &expected);
        }

        {
            // Null truthy, non-null falsy, all-false mask: every row is falsy.
            let out = zip(&all_false_mask, &null_scalar, &non_null_scalar_1).unwrap();
            let actual = out.as_string::<i32>();
            let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len));
            assert_eq!(actual, &expected);
        }

        {
            // Non-null truthy, null falsy, all-true mask: every row is truthy.
            let out = zip(&all_true_mask, &non_null_scalar_1, &null_scalar).unwrap();
            let actual = out.as_string::<i32>();
            let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len));
            assert_eq!(actual, &expected);
        }

        {
            // Non-null truthy, null falsy, all-false mask: every row is null.
            let out = zip(&all_false_mask, &non_null_scalar_1, &null_scalar).unwrap();
            let actual = out.as_string::<i32>();
            let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len));
            assert_eq!(actual, &expected);
        }

        {
            // Both non-null, all-true mask: every row is truthy.
            let out = zip(&all_true_mask, &non_null_scalar_1, &non_null_scalar_2).unwrap();
            let actual = out.as_string::<i32>();
            let expected = StringArray::from_iter(std::iter::repeat_n(Some("test"), mask_len));
            assert_eq!(actual, &expected);
        }

        {
            // Both non-null, all-false mask: every row is falsy.
            let out = zip(&all_false_mask, &non_null_scalar_1, &non_null_scalar_2).unwrap();
            let actual = out.as_string::<i32>();
            let expected =
                StringArray::from_iter(std::iter::repeat_n(Some("something else"), mask_len));
            assert_eq!(actual, &expected);
        }

        {
            // Both null, mixed mask: every row is null.
            let mask = BooleanArray::from(vec![true, false, true, false, true]);
            let out = zip(&mask, &null_scalar, &null_scalar).unwrap();
            let actual = out.as_string::<i32>();
            let expected = StringArray::from_iter(std::iter::repeat_n(None::<&str>, mask_len));
            assert_eq!(actual, &expected);
        }
    }

    /// A `ScalarZipper` can be built once and applied to several masks.
    #[test]
    fn test_scalar_zipper() {
        let scalar_truthy = Scalar::new(Int32Array::from_value(42, 1));
        let scalar_falsy = Scalar::new(Int32Array::from_value(123, 1));

        let mask = BooleanArray::from(vec![false, false, true, true, false]);

        let scalar_zipper = ScalarZipper::try_new(&scalar_truthy, &scalar_falsy).unwrap();
        let out = scalar_zipper.zip(&mask).unwrap();
        let actual = out.as_primitive::<Int32Type>();
        let expected = Int32Array::from(vec![Some(123), Some(123), Some(42), Some(42), Some(123)]);
        assert_eq!(actual, &expected);

        // Reuse the same zipper with a mask of a different length.
        let mask = BooleanArray::from(vec![true, false, true]);
        let out = scalar_zipper.zip(&mask).unwrap();
        let actual = out.as_primitive::<Int32Type>();
        let expected = Int32Array::from(vec![Some(42), Some(123), Some(42)]);
        assert_eq!(actual, &expected);
    }

    /// Two non-null string scalars of equal length.
    #[test]
    fn test_zip_kernel_scalar_strings() {
        let scalar_truthy = Scalar::new(StringArray::from(vec!["hello"]));
        let scalar_falsy = Scalar::new(StringArray::from(vec!["world"]));

        let mask = BooleanArray::from(vec![true, false, true, false, true]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_string::<i32>();
        let expected = StringArray::from(vec![
            Some("hello"),
            Some("world"),
            Some("hello"),
            Some("world"),
            Some("hello"),
        ]);
        assert_eq!(actual, &expected);
    }

    /// Two binary scalars; the truthy bytes are not valid UTF-8.
    #[test]
    fn test_zip_kernel_scalar_binary() {
        let truthy_bytes: &[u8] = b"\xFF\xFE\xFD";
        let falsy_bytes: &[u8] = b"world";
        let scalar_truthy = Scalar::new(BinaryArray::from_iter_values(
            vec![truthy_bytes],
        ));
        let scalar_falsy = Scalar::new(BinaryArray::from_iter_values(vec![falsy_bytes]));

        let mask = BooleanArray::from(vec![true, false, true, false, true]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_binary::<i32>();
        let expected = BinaryArray::from(vec![
            Some(truthy_bytes),
            Some(falsy_bytes),
            Some(truthy_bytes),
            Some(falsy_bytes),
            Some(truthy_bytes),
        ]);
        assert_eq!(actual, &expected);
    }

    /// Two large-binary scalars (64-bit offsets).
    #[test]
    fn test_zip_kernel_scalar_large_binary() {
        let truthy_bytes: &[u8] = b"hey";
        let falsy_bytes: &[u8] = b"world";
        let scalar_truthy = Scalar::new(LargeBinaryArray::from_iter_values(vec![truthy_bytes]));
        let scalar_falsy = Scalar::new(LargeBinaryArray::from_iter_values(vec![falsy_bytes]));

        let mask = BooleanArray::from(vec![true, false, true, false, true]);
        let out = zip(&mask, &scalar_truthy, &scalar_falsy).unwrap();
        let actual = out.as_binary::<i64>();
        let expected = LargeBinaryArray::from(vec![
            Some(truthy_bytes),
            Some(falsy_bytes),
            Some(truthy_bytes),
            Some(falsy_bytes),
            Some(truthy_bytes),
        ]);
        assert_eq!(actual, &expected);
    }

    /// The output keeps a decimal data type with custom precision and scale.
    #[test]
    fn test_zip_decimal_with_custom_precision_and_scale() {
        let arr = Decimal128Array::from_iter_values([12345, 456, 7890, -123223423432432])
            .with_precision_and_scale(20, 2)
            .unwrap();

        let arr: ArrayRef = Arc::new(arr);

        let scalar_1 = Scalar::new(arr.slice(0, 1));
        let scalar_2 = Scalar::new(arr.slice(1, 1));
        let null_scalar = Scalar::new(new_null_array(arr.data_type(), 1));
        let array_1: ArrayRef = arr.slice(0, 2);
        let array_2: ArrayRef = arr.slice(2, 2);

        test_zip_output_data_types_for_input(scalar_1, scalar_2, null_scalar, array_1, array_2);
    }

    /// The output keeps a timestamp data type that carries a timezone.
    #[test]
    fn test_zip_timestamp_with_timezone() {
        let arr = TimestampSecondArray::from(vec![0, 1000, 2000, 4000])
            .with_timezone("+01:00".to_string());

        let arr: ArrayRef = Arc::new(arr);

        let scalar_1 = Scalar::new(arr.slice(0, 1));
        let scalar_2 = Scalar::new(arr.slice(1, 1));
        let null_scalar = Scalar::new(new_null_array(arr.data_type(), 1));
        let array_1: ArrayRef = arr.slice(0, 2);
        let array_2: ArrayRef = arr.slice(2, 2);

        test_zip_output_data_types_for_input(scalar_1, scalar_2, null_scalar, array_1, array_2);
    }

    /// Runs the data-type check over combinations of scalar, null scalar and
    /// array inputs.
    fn test_zip_output_data_types_for_input(
        scalar_1: Scalar<ArrayRef>,
        scalar_2: Scalar<ArrayRef>,
        null_scalar: Scalar<ArrayRef>,
        array_1: ArrayRef,
        array_2: ArrayRef,
    ) {
        // non-null scalar vs non-null scalar
        test_zip_output_data_type(&scalar_1, &scalar_2, 10);

        // null scalar vs non-null scalar, in both positions
        test_zip_output_data_type(&null_scalar, &scalar_1, 10);
        test_zip_output_data_type(&scalar_1, &null_scalar, 10);

        // array vs non-null scalar, in both positions
        test_zip_output_data_type(&array_1.as_ref(), &scalar_1, array_1.len());
        test_zip_output_data_type(&scalar_1, &array_1.as_ref(), array_1.len());

        // array vs null scalar
        test_zip_output_data_type(&array_1.as_ref(), &null_scalar, array_1.len());

        // null scalar vs array
        test_zip_output_data_type(&null_scalar, &array_1.as_ref(), array_1.len());

        // array vs array
        test_zip_output_data_type(&array_1.as_ref(), &array_2.as_ref(), array_1.len());
    }

    /// Asserts that zipping `truthy` and `falsy` yields their shared data type
    /// for an all-true, an all-false and an alternating mask.
    fn test_zip_output_data_type(truthy: &dyn Datum, falsy: &dyn Datum, mask_length: usize) {
        let expected_data_type = truthy.get().0.data_type().clone();
        assert_eq!(&expected_data_type, falsy.get().0.data_type());

        // Cover masks that take only one side as well as a mixed one.
        let mask_all_true = BooleanArray::from(vec![true; mask_length]);
        let mask_all_false = BooleanArray::from(vec![false; mask_length]);
        let mask_some_true_and_false =
            BooleanArray::from((0..mask_length).map(|i| i % 2 == 0).collect::<Vec<bool>>());

        for mask in [&mask_all_true, &mask_all_false, &mask_some_true_and_false] {
            let out = zip(mask, truthy, falsy).unwrap();
            assert_eq!(out.data_type(), &expected_data_type);
        }
    }

    /// List scalars have no specialised implementation and go through the
    /// fallback.
    #[test]
    fn zip_scalar_fallback_impl() {
        let truthy_list_item_scalar = Some(vec![Some(1), None, Some(3)]);
        let truthy_list_array_scalar =
            Scalar::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                truthy_list_item_scalar.clone(),
            ]));
        let falsy_list_item_scalar = Some(vec![None, Some(2), Some(4)]);
        let falsy_list_array_scalar =
            Scalar::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
                falsy_list_item_scalar.clone(),
            ]));
        let mask = BooleanArray::from(vec![true, false, true, false, false, true, false]);
        let out = zip(&mask, &truthy_list_array_scalar, &falsy_list_array_scalar).unwrap();
        let actual = out.as_list::<i32>();

        let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            truthy_list_item_scalar.clone(),
            falsy_list_item_scalar.clone(),
            truthy_list_item_scalar.clone(),
            falsy_list_item_scalar.clone(),
            falsy_list_item_scalar.clone(),
            truthy_list_item_scalar.clone(),
            falsy_list_item_scalar.clone(),
        ]);
        assert_eq!(actual, &expected);
    }
}
1225}