1use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
23use arrow::buffer::{BooleanBuffer, NullBuffer};
24use arrow::datatypes::ArrowPrimitiveType;
25
26use datafusion_expr_common::groups_accumulator::EmitTo;
/// Tracks, per group index, whether at least one value has been accumulated
/// for that group.
///
/// The flags are set by [`NullState::accumulate`] and
/// [`NullState::accumulate_boolean`] and turned into a [`NullBuffer`] by
/// [`NullState::build`].
#[derive(Debug)]
pub struct NullState {
    /// One bit per group index. A bit is set to `true` the first time a value
    /// for that group is handed to the caller's value callback; bits added
    /// when the bitmap grows start out `false`.
    seen_values: BooleanBufferBuilder,
}
63
64impl Default for NullState {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl NullState {
71 pub fn new() -> Self {
72 Self {
73 seen_values: BooleanBufferBuilder::new(0),
74 }
75 }
76
77 pub fn size(&self) -> usize {
79 self.seen_values.capacity() / 8
81 }
82
83 pub fn accumulate<T, F>(
100 &mut self,
101 group_indices: &[usize],
102 values: &PrimitiveArray<T>,
103 opt_filter: Option<&BooleanArray>,
104 total_num_groups: usize,
105 mut value_fn: F,
106 ) where
107 T: ArrowPrimitiveType + Send,
108 F: FnMut(usize, T::Native) + Send,
109 {
110 let seen_values =
113 initialize_builder(&mut self.seen_values, total_num_groups, false);
114 accumulate(group_indices, values, opt_filter, |group_index, value| {
115 seen_values.set_bit(group_index, true);
116 value_fn(group_index, value);
117 });
118 }
119
120 pub fn accumulate_boolean<F>(
131 &mut self,
132 group_indices: &[usize],
133 values: &BooleanArray,
134 opt_filter: Option<&BooleanArray>,
135 total_num_groups: usize,
136 mut value_fn: F,
137 ) where
138 F: FnMut(usize, bool) + Send,
139 {
140 let data = values.values();
141 assert_eq!(data.len(), group_indices.len());
142
143 let seen_values =
146 initialize_builder(&mut self.seen_values, total_num_groups, false);
147
148 match (values.null_count() > 0, opt_filter) {
150 (false, None) => {
152 group_indices.iter().zip(data.iter()).for_each(
155 |(&group_index, new_value)| {
156 seen_values.set_bit(group_index, true);
157 value_fn(group_index, new_value)
158 },
159 )
160 }
161 (true, None) => {
163 let nulls = values.nulls().unwrap();
164 group_indices
165 .iter()
166 .zip(data.iter())
167 .zip(nulls.iter())
168 .for_each(|((&group_index, new_value), is_valid)| {
169 if is_valid {
170 seen_values.set_bit(group_index, true);
171 value_fn(group_index, new_value);
172 }
173 })
174 }
175 (false, Some(filter)) => {
177 assert_eq!(filter.len(), group_indices.len());
178
179 group_indices
180 .iter()
181 .zip(data.iter())
182 .zip(filter.iter())
183 .for_each(|((&group_index, new_value), filter_value)| {
184 if let Some(true) = filter_value {
185 seen_values.set_bit(group_index, true);
186 value_fn(group_index, new_value);
187 }
188 })
189 }
190 (true, Some(filter)) => {
192 assert_eq!(filter.len(), group_indices.len());
193 filter
194 .iter()
195 .zip(group_indices.iter())
196 .zip(values.iter())
197 .for_each(|((filter_value, &group_index), new_value)| {
198 if let Some(true) = filter_value {
199 if let Some(new_value) = new_value {
200 seen_values.set_bit(group_index, true);
201 value_fn(group_index, new_value)
202 }
203 }
204 })
205 }
206 }
207 }
208
209 pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
215 let nulls: BooleanBuffer = self.seen_values.finish();
216
217 let nulls = match emit_to {
218 EmitTo::All => nulls,
219 EmitTo::First(n) => {
220 let first_n_null: BooleanBuffer = nulls.iter().take(n).collect();
225 for seen in nulls.iter().skip(n) {
227 self.seen_values.append(seen);
228 }
229 first_n_null
230 }
231 };
232 NullBuffer::new(nulls)
233 }
234}
235
236pub fn accumulate<T, F>(
275 group_indices: &[usize],
276 values: &PrimitiveArray<T>,
277 opt_filter: Option<&BooleanArray>,
278 mut value_fn: F,
279) where
280 T: ArrowPrimitiveType + Send,
281 F: FnMut(usize, T::Native) + Send,
282{
283 let data: &[T::Native] = values.values();
284 assert_eq!(data.len(), group_indices.len());
285
286 match (values.null_count() > 0, opt_filter) {
287 (false, None) => {
289 let iter = group_indices.iter().zip(data.iter());
290 for (&group_index, &new_value) in iter {
291 value_fn(group_index, new_value);
292 }
293 }
294 (true, None) => {
296 let nulls = values.nulls().unwrap();
297 let group_indices_chunks = group_indices.chunks_exact(64);
300 let data_chunks = data.chunks_exact(64);
301 let bit_chunks = nulls.inner().bit_chunks();
302
303 let group_indices_remainder = group_indices_chunks.remainder();
304 let data_remainder = data_chunks.remainder();
305
306 group_indices_chunks
307 .zip(data_chunks)
308 .zip(bit_chunks.iter())
309 .for_each(|((group_index_chunk, data_chunk), mask)| {
310 let mut index_mask = 1;
312 group_index_chunk.iter().zip(data_chunk.iter()).for_each(
313 |(&group_index, &new_value)| {
314 let is_valid = (mask & index_mask) != 0;
316 if is_valid {
317 value_fn(group_index, new_value);
318 }
319 index_mask <<= 1;
320 },
321 )
322 });
323
324 let remainder_bits = bit_chunks.remainder_bits();
326 group_indices_remainder
327 .iter()
328 .zip(data_remainder.iter())
329 .enumerate()
330 .for_each(|(i, (&group_index, &new_value))| {
331 let is_valid = remainder_bits & (1 << i) != 0;
332 if is_valid {
333 value_fn(group_index, new_value);
334 }
335 });
336 }
337 (false, Some(filter)) => {
339 assert_eq!(filter.len(), group_indices.len());
340 group_indices
344 .iter()
345 .zip(data.iter())
346 .zip(filter.iter())
347 .for_each(|((&group_index, &new_value), filter_value)| {
348 if let Some(true) = filter_value {
349 value_fn(group_index, new_value);
350 }
351 })
352 }
353 (true, Some(filter)) => {
355 assert_eq!(filter.len(), group_indices.len());
356 filter
360 .iter()
361 .zip(group_indices.iter())
362 .zip(values.iter())
363 .for_each(|((filter_value, &group_index), new_value)| {
364 if let Some(true) = filter_value {
365 if let Some(new_value) = new_value {
366 value_fn(group_index, new_value)
367 }
368 }
369 })
370 }
371 }
372}
373
374pub fn accumulate_multiple<T, F>(
390 group_indices: &[usize],
391 value_columns: &[&PrimitiveArray<T>],
392 opt_filter: Option<&BooleanArray>,
393 mut value_fn: F,
394) where
395 T: ArrowPrimitiveType + Send,
396 F: FnMut(usize, usize, &[&PrimitiveArray<T>]) + Send,
397{
398 let combined_nulls = value_columns
406 .iter()
407 .map(|arr| arr.logical_nulls())
408 .fold(None, |acc, nulls| {
409 NullBuffer::union(acc.as_ref(), nulls.as_ref())
410 });
411
412 let valid_indices = match (combined_nulls, opt_filter) {
414 (None, None) => None,
415 (None, Some(filter)) => Some(filter.clone()),
416 (Some(nulls), None) => Some(BooleanArray::new(nulls.inner().clone(), None)),
417 (Some(nulls), Some(filter)) => {
418 let combined = nulls.inner() & filter.values();
419 Some(BooleanArray::new(combined, None))
420 }
421 };
422
423 for col in value_columns.iter() {
424 debug_assert_eq!(col.len(), group_indices.len());
425 }
426
427 match valid_indices {
428 None => {
429 for (batch_idx, &group_idx) in group_indices.iter().enumerate() {
430 value_fn(group_idx, batch_idx, value_columns);
431 }
432 }
433 Some(valid_indices) => {
434 for (batch_idx, &group_idx) in group_indices.iter().enumerate() {
435 if valid_indices.value(batch_idx) {
436 value_fn(group_idx, batch_idx, value_columns);
437 }
438 }
439 }
440 }
441}
442
443pub fn accumulate_indices<F>(
453 group_indices: &[usize],
454 nulls: Option<&NullBuffer>,
455 opt_filter: Option<&BooleanArray>,
456 mut index_fn: F,
457) where
458 F: FnMut(usize) + Send,
459{
460 match (nulls, opt_filter) {
461 (None, None) => {
462 for &group_index in group_indices.iter() {
463 index_fn(group_index)
464 }
465 }
466 (None, Some(filter)) => {
467 debug_assert_eq!(filter.len(), group_indices.len());
468 let group_indices_chunks = group_indices.chunks_exact(64);
469 let bit_chunks = filter.values().bit_chunks();
470
471 let group_indices_remainder = group_indices_chunks.remainder();
472
473 group_indices_chunks.zip(bit_chunks.iter()).for_each(
474 |(group_index_chunk, mask)| {
475 let mut index_mask = 1;
477 group_index_chunk.iter().for_each(|&group_index| {
478 let is_valid = (mask & index_mask) != 0;
480 if is_valid {
481 index_fn(group_index);
482 }
483 index_mask <<= 1;
484 })
485 },
486 );
487
488 let remainder_bits = bit_chunks.remainder_bits();
490 group_indices_remainder
491 .iter()
492 .enumerate()
493 .for_each(|(i, &group_index)| {
494 let is_valid = remainder_bits & (1 << i) != 0;
495 if is_valid {
496 index_fn(group_index)
497 }
498 });
499 }
500 (Some(valids), None) => {
501 debug_assert_eq!(valids.len(), group_indices.len());
502 let group_indices_chunks = group_indices.chunks_exact(64);
505 let bit_chunks = valids.inner().bit_chunks();
506
507 let group_indices_remainder = group_indices_chunks.remainder();
508
509 group_indices_chunks.zip(bit_chunks.iter()).for_each(
510 |(group_index_chunk, mask)| {
511 let mut index_mask = 1;
513 group_index_chunk.iter().for_each(|&group_index| {
514 let is_valid = (mask & index_mask) != 0;
516 if is_valid {
517 index_fn(group_index);
518 }
519 index_mask <<= 1;
520 })
521 },
522 );
523
524 let remainder_bits = bit_chunks.remainder_bits();
526 group_indices_remainder
527 .iter()
528 .enumerate()
529 .for_each(|(i, &group_index)| {
530 let is_valid = remainder_bits & (1 << i) != 0;
531 if is_valid {
532 index_fn(group_index)
533 }
534 });
535 }
536
537 (Some(valids), Some(filter)) => {
538 debug_assert_eq!(filter.len(), group_indices.len());
539 debug_assert_eq!(valids.len(), group_indices.len());
540
541 let group_indices_chunks = group_indices.chunks_exact(64);
542 let valid_bit_chunks = valids.inner().bit_chunks();
543 let filter_bit_chunks = filter.values().bit_chunks();
544
545 let group_indices_remainder = group_indices_chunks.remainder();
546
547 group_indices_chunks
548 .zip(valid_bit_chunks.iter())
549 .zip(filter_bit_chunks.iter())
550 .for_each(|((group_index_chunk, valid_mask), filter_mask)| {
551 let mut index_mask = 1;
553 group_index_chunk.iter().for_each(|&group_index| {
554 let is_valid = (valid_mask & filter_mask & index_mask) != 0;
556 if is_valid {
557 index_fn(group_index);
558 }
559 index_mask <<= 1;
560 })
561 });
562
563 let remainder_valid_bits = valid_bit_chunks.remainder_bits();
565 let remainder_filter_bits = filter_bit_chunks.remainder_bits();
566 group_indices_remainder
567 .iter()
568 .enumerate()
569 .for_each(|(i, &group_index)| {
570 let is_valid =
571 remainder_valid_bits & remainder_filter_bits & (1 << i) != 0;
572 if is_valid {
573 index_fn(group_index)
574 }
575 });
576 }
577 }
578}
579
580fn initialize_builder(
585 builder: &mut BooleanBufferBuilder,
586 total_num_groups: usize,
587 default_value: bool,
588) -> &mut BooleanBufferBuilder {
589 if builder.len() < total_num_groups {
590 let new_groups = total_num_groups - builder.len();
591 builder.append_n(new_groups, default_value);
592 }
593 builder
594}
595
596#[cfg(test)]
597mod test {
598 use super::*;
599
600 use arrow::array::{Int32Array, UInt32Array};
601 use rand::{rngs::ThreadRng, Rng};
602 use std::collections::HashSet;
603
/// Runs the fixture on a fixed 100-row input: one group per row, values
/// `10, 20, ..`, a null in every third row of the nullable variant, and a
/// filter mixing null, `false` and `true` entries.
#[test]
fn accumulate() {
    // Even rows are null, odd multiples of five are `false`, all other rows
    // are `true`.
    let filter: BooleanArray = (0..100)
        .map(|i| match (i % 2 == 0, i % 5 == 0) {
            (true, _) => None,
            (false, true) => Some(false),
            (false, false) => Some(true),
        })
        .collect();

    let fixture = Fixture {
        group_indices: (0..100).collect(),
        values: (0..100).map(|i| (i + 1) * 10).collect(),
        values_with_nulls: (0..100)
            .map(|i| if i % 3 == 0 { None } else { Some((i + 1) * 10) })
            .collect(),
        filter,
    };
    fixture.run()
}
636
/// Runs the fixture on 100 randomly generated inputs.
#[test]
fn accumulate_fuzz() {
    let mut rng = rand::rng();
    (0..100).for_each(|_| Fixture::new_random(&mut rng).run());
}
644
/// Input data shared by the accumulate tests.
struct Fixture {
    /// Group index of each input row.
    group_indices: Vec<usize>,

    /// Input values without any nulls.
    values: Vec<u32>,

    /// Input values where some rows are null (`None`).
    values_with_nulls: Vec<Option<u32>>,

    /// Filter used by the test variants that pass a filter.
    filter: BooleanArray,
}
660
661 impl Fixture {
662 fn new_random(rng: &mut ThreadRng) -> Self {
663 let num_values: usize = rng.random_range(1..200);
665 let num_groups: usize = rng.random_range(2..1000);
667 let max_group = num_groups - 1;
668
669 let group_indices: Vec<usize> = (0..num_values)
670 .map(|_| rng.random_range(0..max_group))
671 .collect();
672
673 let values: Vec<u32> = (0..num_values).map(|_| rng.random()).collect();
674
675 let filter: BooleanArray = (0..num_values)
679 .map(|_| {
680 let filter_value = rng.random_range(0.0..1.0);
681 if filter_value < 0.1 {
682 Some(false)
683 } else if filter_value < 0.2 {
684 None
685 } else {
686 Some(true)
687 }
688 })
689 .collect();
690
691 let null_pct: f32 = rng.random_range(0.0..1.0);
694 let values_with_nulls: Vec<Option<u32>> = (0..num_values)
695 .map(|_| {
696 let is_null = null_pct < rng.random_range(0.0..1.0);
697 if is_null {
698 None
699 } else {
700 Some(rng.random())
701 }
702 })
703 .collect();
704
705 Self {
706 group_indices,
707 values,
708 values_with_nulls,
709 filter,
710 }
711 }
712
713 fn values_array(&self) -> UInt32Array {
715 UInt32Array::from(self.values.clone())
716 }
717
718 fn values_with_nulls_array(&self) -> UInt32Array {
720 UInt32Array::from(self.values_with_nulls.clone())
721 }
722
723 fn run(&self) {
726 let total_num_groups = *self.group_indices.iter().max().unwrap() + 1;
727
728 let group_indices = &self.group_indices;
729 let values_array = self.values_array();
730 let values_with_nulls_array = self.values_with_nulls_array();
731 let filter = &self.filter;
732
733 Self::accumulate_test(group_indices, &values_array, None, total_num_groups);
735
736 Self::accumulate_test(
738 group_indices,
739 &values_with_nulls_array,
740 None,
741 total_num_groups,
742 );
743
744 Self::accumulate_test(
746 group_indices,
747 &values_array,
748 Some(filter),
749 total_num_groups,
750 );
751
752 Self::accumulate_test(
754 group_indices,
755 &values_with_nulls_array,
756 Some(filter),
757 total_num_groups,
758 );
759 }
760
761 fn accumulate_test(
764 group_indices: &[usize],
765 values: &UInt32Array,
766 opt_filter: Option<&BooleanArray>,
767 total_num_groups: usize,
768 ) {
769 Self::accumulate_values_test(
770 group_indices,
771 values,
772 opt_filter,
773 total_num_groups,
774 );
775 Self::accumulate_indices_test(group_indices, values.nulls(), opt_filter);
776
777 let avg: usize = values.iter().filter_map(|v| v.map(|v| v as usize)).sum();
780 let boolean_values: BooleanArray =
781 values.iter().map(|v| v.map(|v| v as usize > avg)).collect();
782 Self::accumulate_boolean_test(
783 group_indices,
784 &boolean_values,
785 opt_filter,
786 total_num_groups,
787 );
788 }
789
790 fn accumulate_values_test(
793 group_indices: &[usize],
794 values: &UInt32Array,
795 opt_filter: Option<&BooleanArray>,
796 total_num_groups: usize,
797 ) {
798 let mut accumulated_values = vec![];
799 let mut null_state = NullState::new();
800
801 null_state.accumulate(
802 group_indices,
803 values,
804 opt_filter,
805 total_num_groups,
806 |group_index, value| {
807 accumulated_values.push((group_index, value));
808 },
809 );
810
811 let mut expected_values = vec![];
813 let mut mock = MockNullState::new();
814
815 match opt_filter {
816 None => group_indices.iter().zip(values.iter()).for_each(
817 |(&group_index, value)| {
818 if let Some(value) = value {
819 mock.saw_value(group_index);
820 expected_values.push((group_index, value));
821 }
822 },
823 ),
824 Some(filter) => {
825 group_indices
826 .iter()
827 .zip(values.iter())
828 .zip(filter.iter())
829 .for_each(|((&group_index, value), is_included)| {
830 if let Some(true) = is_included {
832 if let Some(value) = value {
833 mock.saw_value(group_index);
834 expected_values.push((group_index, value));
835 }
836 }
837 });
838 }
839 }
840
841 assert_eq!(accumulated_values, expected_values,
842 "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}");
843 let seen_values = null_state.seen_values.finish_cloned();
844 mock.validate_seen_values(&seen_values);
845
846 let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
848
849 let null_buffer = null_state.build(EmitTo::All);
850
851 assert_eq!(null_buffer, expected_null_buffer);
852 }
853
854 fn accumulate_indices_test(
857 group_indices: &[usize],
858 nulls: Option<&NullBuffer>,
859 opt_filter: Option<&BooleanArray>,
860 ) {
861 let mut accumulated_values = vec![];
862
863 accumulate_indices(group_indices, nulls, opt_filter, |group_index| {
864 accumulated_values.push(group_index);
865 });
866
867 let mut expected_values = vec![];
869
870 match (nulls, opt_filter) {
871 (None, None) => group_indices.iter().for_each(|&group_index| {
872 expected_values.push(group_index);
873 }),
874 (Some(nulls), None) => group_indices.iter().zip(nulls.iter()).for_each(
875 |(&group_index, is_valid)| {
876 if is_valid {
877 expected_values.push(group_index);
878 }
879 },
880 ),
881 (None, Some(filter)) => group_indices.iter().zip(filter.iter()).for_each(
882 |(&group_index, is_included)| {
883 if let Some(true) = is_included {
884 expected_values.push(group_index);
885 }
886 },
887 ),
888 (Some(nulls), Some(filter)) => {
889 group_indices
890 .iter()
891 .zip(nulls.iter())
892 .zip(filter.iter())
893 .for_each(|((&group_index, is_valid), is_included)| {
894 if let (true, Some(true)) = (is_valid, is_included) {
896 expected_values.push(group_index);
897 }
898 });
899 }
900 }
901
902 assert_eq!(accumulated_values, expected_values,
903 "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}");
904 }
905
906 fn accumulate_boolean_test(
909 group_indices: &[usize],
910 values: &BooleanArray,
911 opt_filter: Option<&BooleanArray>,
912 total_num_groups: usize,
913 ) {
914 let mut accumulated_values = vec![];
915 let mut null_state = NullState::new();
916
917 null_state.accumulate_boolean(
918 group_indices,
919 values,
920 opt_filter,
921 total_num_groups,
922 |group_index, value| {
923 accumulated_values.push((group_index, value));
924 },
925 );
926
927 let mut expected_values = vec![];
929 let mut mock = MockNullState::new();
930
931 match opt_filter {
932 None => group_indices.iter().zip(values.iter()).for_each(
933 |(&group_index, value)| {
934 if let Some(value) = value {
935 mock.saw_value(group_index);
936 expected_values.push((group_index, value));
937 }
938 },
939 ),
940 Some(filter) => {
941 group_indices
942 .iter()
943 .zip(values.iter())
944 .zip(filter.iter())
945 .for_each(|((&group_index, value), is_included)| {
946 if let Some(true) = is_included {
948 if let Some(value) = value {
949 mock.saw_value(group_index);
950 expected_values.push((group_index, value));
951 }
952 }
953 });
954 }
955 }
956
957 assert_eq!(accumulated_values, expected_values,
958 "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}");
959
960 let seen_values = null_state.seen_values.finish_cloned();
961 mock.validate_seen_values(&seen_values);
962
963 let expected_null_buffer = mock.expected_null_buffer(total_num_groups);
965
966 let null_buffer = null_state.build(EmitTo::All);
967
968 assert_eq!(null_buffer, expected_null_buffer);
969 }
970 }
971
/// Simple reference model used by the tests to check `NullState`: it records
/// the group indices for which a value was reported.
#[derive(Debug, Default)]
struct MockNullState {
    /// Group indices that received at least one value.
    seen_values: HashSet<usize>,
}
978
979 impl MockNullState {
980 fn new() -> Self {
981 Default::default()
982 }
983
984 fn saw_value(&mut self, group_index: usize) {
985 self.seen_values.insert(group_index);
986 }
987
988 fn expected_seen(&self, group_index: usize) -> bool {
990 self.seen_values.contains(&group_index)
991 }
992
993 fn validate_seen_values(&self, seen_values: &BooleanBuffer) {
995 for (group_index, is_seen) in seen_values.iter().enumerate() {
996 let expected_seen = self.expected_seen(group_index);
997 assert_eq!(
998 expected_seen, is_seen,
999 "mismatch at for group {group_index}"
1000 );
1001 }
1002 }
1003
1004 fn expected_null_buffer(&self, total_num_groups: usize) -> NullBuffer {
1006 (0..total_num_groups)
1007 .map(|group_index| self.expected_seen(group_index))
1008 .collect()
1009 }
1010 }
1011
/// Without nulls or a filter, every row is reported in order.
#[test]
fn test_accumulate_multiple_no_nulls_no_filter() {
    let group_indices = vec![0, 1, 0, 1];
    let value_columns = [
        Int32Array::from(vec![1, 2, 3, 4]),
        Int32Array::from(vec![10, 20, 30, 40]),
    ];
    let column_refs: Vec<&Int32Array> = value_columns.iter().collect();

    let mut accumulated: Vec<(usize, Vec<i32>)> = Vec::new();
    accumulate_multiple(
        &group_indices,
        &column_refs,
        None,
        |group_idx, batch_idx, columns| {
            let row = columns.iter().map(|col| col.value(batch_idx)).collect();
            accumulated.push((group_idx, row));
        },
    );

    let expected = vec![
        (0, vec![1, 10]),
        (1, vec![2, 20]),
        (0, vec![3, 30]),
        (1, vec![4, 40]),
    ];
    assert_eq!(accumulated, expected);
}
1038
/// A row is skipped as soon as any column is null at that row.
#[test]
fn test_accumulate_multiple_with_nulls() {
    let group_indices = vec![0, 1, 0, 1];
    let value_columns = [
        Int32Array::from(vec![Some(1), None, Some(3), Some(4)]),
        Int32Array::from(vec![Some(10), Some(20), None, Some(40)]),
    ];
    let column_refs: Vec<&Int32Array> = value_columns.iter().collect();

    let mut accumulated: Vec<(usize, Vec<i32>)> = Vec::new();
    accumulate_multiple(
        &group_indices,
        &column_refs,
        None,
        |group_idx, batch_idx, columns| {
            let row = columns.iter().map(|col| col.value(batch_idx)).collect();
            accumulated.push((group_idx, row));
        },
    );

    // Rows 1 and 2 each contain a null, so only rows 0 and 3 remain.
    let expected = vec![(0, vec![1, 10]), (1, vec![4, 40])];
    assert_eq!(accumulated, expected);
}
1061
/// Only rows whose filter entry is `true` are reported.
#[test]
fn test_accumulate_multiple_with_filter() {
    let group_indices = vec![0, 1, 0, 1];
    let value_columns = [
        Int32Array::from(vec![1, 2, 3, 4]),
        Int32Array::from(vec![10, 20, 30, 40]),
    ];
    let column_refs: Vec<&Int32Array> = value_columns.iter().collect();

    let filter = BooleanArray::from(vec![true, false, true, false]);

    let mut accumulated: Vec<(usize, Vec<i32>)> = Vec::new();
    accumulate_multiple(
        &group_indices,
        &column_refs,
        Some(&filter),
        |group_idx, batch_idx, columns| {
            let row = columns.iter().map(|col| col.value(batch_idx)).collect();
            accumulated.push((group_idx, row));
        },
    );

    // The filter keeps rows 0 and 2 only.
    let expected = vec![(0, vec![1, 10]), (0, vec![3, 30])];
    assert_eq!(accumulated, expected);
}
1086
/// Nulls and the filter are applied together.
#[test]
fn test_accumulate_multiple_with_nulls_and_filter() {
    let group_indices = vec![0, 1, 0, 1];
    let value_columns = [
        Int32Array::from(vec![Some(1), None, Some(3), Some(4)]),
        Int32Array::from(vec![Some(10), Some(20), None, Some(40)]),
    ];
    let column_refs: Vec<&Int32Array> = value_columns.iter().collect();

    let filter = BooleanArray::from(vec![true, true, true, false]);

    let mut accumulated: Vec<(usize, Vec<i32>)> = Vec::new();
    accumulate_multiple(
        &group_indices,
        &column_refs,
        Some(&filter),
        |group_idx, batch_idx, columns| {
            let row = columns.iter().map(|col| col.value(batch_idx)).collect();
            accumulated.push((group_idx, row));
        },
    );

    // Row 1 is null in the first column, row 2 is null in the second column
    // and row 3 is rejected by the filter, leaving only row 0.
    let expected = [(0, vec![1, 10])];
    assert_eq!(accumulated, expected);
}
1114}