arrow_rayon/parallel_array/
parallel_int_array.rs

1use arrow_array::types::{Int16Type, Int32Type, Int64Type, Int8Type};
2use arrow_array::{Int16Array, Int32Array, Int64Array, Int8Array};
3use rayon::iter::ParallelIterator;
4
5use crate::parallel_primitive_array::{ParallelPrimitiveArray, ParallelPrimitiveArrayRef};
6
7pub type ParallelInt8Array = ParallelPrimitiveArray<Int8Type>;
8pub type ParallelInt8ArrayRef<'data> = ParallelPrimitiveArrayRef<'data, Int8Type>;
9pub type ParallelInt16Array = ParallelPrimitiveArray<Int16Type>;
10pub type ParallelInt16ArrayRef<'data> = ParallelPrimitiveArrayRef<'data, Int16Type>;
11pub type ParallelInt32Array = ParallelPrimitiveArray<Int32Type>;
12pub type ParallelInt32ArrayRef<'data> = ParallelPrimitiveArrayRef<'data, Int32Type>;
13pub type ParallelInt64Array = ParallelPrimitiveArray<Int64Type>;
14pub type ParallelInt64ArrayRef<'data> = ParallelPrimitiveArrayRef<'data, Int64Type>;
15
16pub trait Int8ArrayRefParallelIterator<'data> {
17    type Iter: ParallelIterator<Item = Option<i8>>;
18
19    fn par_iter(&'data self) -> Self::Iter;
20}
21
22impl<'data> Int8ArrayRefParallelIterator<'data> for Int8Array {
23    type Iter = ParallelInt8ArrayRef<'data>;
24
25    fn par_iter(&'data self) -> Self::Iter {
26        ParallelInt8ArrayRef::new(self)
27    }
28}
29
30pub trait Int16ArrayRefParallelIterator<'data> {
31    type Iter: ParallelIterator<Item = Option<i16>>;
32
33    fn par_iter(&'data self) -> Self::Iter;
34}
35
36impl<'data> Int16ArrayRefParallelIterator<'data> for Int16Array {
37    type Iter = ParallelInt16ArrayRef<'data>;
38
39    fn par_iter(&'data self) -> Self::Iter {
40        ParallelInt16ArrayRef::new(self)
41    }
42}
43
44pub trait Int32ArrayRefParallelIterator<'data> {
45    type Iter: ParallelIterator<Item = Option<i32>>;
46
47    fn par_iter(&'data self) -> Self::Iter;
48}
49
50impl<'data> Int32ArrayRefParallelIterator<'data> for Int32Array {
51    type Iter = ParallelInt32ArrayRef<'data>;
52
53    fn par_iter(&'data self) -> Self::Iter {
54        ParallelInt32ArrayRef::new(self)
55    }
56}
57
58pub trait Int64ArrayRefParallelIterator<'data> {
59    type Iter: ParallelIterator<Item = Option<i64>>;
60
61    fn par_iter(&'data self) -> Self::Iter;
62}
63
64impl<'data> Int64ArrayRefParallelIterator<'data> for Int64Array {
65    type Iter = ParallelInt64ArrayRef<'data>;
66
67    fn par_iter(&'data self) -> Self::Iter {
68        ParallelInt64ArrayRef::new(self)
69    }
70}
71
72#[cfg(test)]
73mod tests {
74    use rayon::iter::IndexedParallelIterator;
75
76    use super::*;
77
78    #[test]
79    fn test_par_iter() {
80        let array1 = Int8Array::from(vec![Some(1), None, Some(2), Some(3)]);
81        let array2 = Int8Array::from(vec![Some(2), Some(4), None, Some(5)]);
82        let items: Vec<i8> = array1
83            .par_iter()
84            .zip(array2.par_iter())
85            .map(|opt| {
86                match opt {
87                    (Some(item1), Some(item2)) => item1 + item2,
88                    (Some(item1), None) => item1,
89                    (None, Some(item2)) => item2,
90                    (None, None) => 0,
91                }
92            })
93            .collect();
94        assert_eq!(items, vec![3, 4, 2, 8]);
95    }
96
97    #[test]
98    fn test_collect_array() {
99        let array = Int8Array::from(vec![Some(1), None, Some(2), Some(3)]);
100        let collected_array: ParallelInt8Array = array
101            .par_iter()
102            .map(|item| item.map_or(0, |item| item * 2))
103            .collect();
104        let int8_array = collected_array.into_inner();
105        assert_eq!(int8_array.value(0), 2);
106        assert_eq!(int8_array.value(1), 0);
107        assert_eq!(int8_array.value(2), 4);
108        assert_eq!(int8_array.value(3), 6);
109    }
110}