1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use arrow_array::types::{Int16Type, Int32Type, Int64Type, Int8Type};
use arrow_array::{Int16Array, Int32Array, Int64Array, Int8Array};
use rayon::iter::ParallelIterator;

use crate::parallel_primitive_array::{ParallelPrimitiveArray, ParallelPrimitiveArrayRef};

/// [`ParallelPrimitiveArray`] specialised to the Arrow [`Int8Type`].
pub type ParallelInt8Array = ParallelPrimitiveArray<Int8Type>;
/// [`ParallelPrimitiveArrayRef`] specialised to the Arrow [`Int8Type`];
/// this is the type returned by `par_iter` on an [`Int8Array`].
pub type ParallelInt8ArrayRef<'data> = ParallelPrimitiveArrayRef<'data, Int8Type>;
/// [`ParallelPrimitiveArray`] specialised to the Arrow [`Int16Type`].
pub type ParallelInt16Array = ParallelPrimitiveArray<Int16Type>;
/// [`ParallelPrimitiveArrayRef`] specialised to the Arrow [`Int16Type`];
/// this is the type returned by `par_iter` on an [`Int16Array`].
pub type ParallelInt16ArrayRef<'data> = ParallelPrimitiveArrayRef<'data, Int16Type>;
/// [`ParallelPrimitiveArray`] specialised to the Arrow [`Int32Type`].
pub type ParallelInt32Array = ParallelPrimitiveArray<Int32Type>;
/// [`ParallelPrimitiveArrayRef`] specialised to the Arrow [`Int32Type`];
/// this is the type returned by `par_iter` on an [`Int32Array`].
pub type ParallelInt32ArrayRef<'data> = ParallelPrimitiveArrayRef<'data, Int32Type>;
/// [`ParallelPrimitiveArray`] specialised to the Arrow [`Int64Type`].
pub type ParallelInt64Array = ParallelPrimitiveArray<Int64Type>;
/// [`ParallelPrimitiveArrayRef`] specialised to the Arrow [`Int64Type`];
/// this is the type returned by `par_iter` on an [`Int64Array`].
pub type ParallelInt64ArrayRef<'data> = ParallelPrimitiveArrayRef<'data, Int64Type>;

/// Extension trait that adds a borrowing `par_iter` method producing a rayon
/// [`ParallelIterator`] over `Option<i8>` items.
///
/// The `'data` lifetime is the lifetime of the borrow of `self` taken by
/// [`par_iter`](Self::par_iter).
pub trait Int8ArrayRefParallelIterator<'data> {
    /// Concrete parallel iterator type returned by [`par_iter`](Self::par_iter).
    type Iter: ParallelIterator<Item = Option<i8>>;

    /// Borrows `self` for `'data` and returns a parallel iterator over it.
    // NOTE(review): `None` items presumably correspond to null slots — confirm
    // against `ParallelPrimitiveArrayRef`.
    fn par_iter(&'data self) -> Self::Iter;
}

/// Gives [`Int8Array`] a `par_iter` method backed by [`ParallelInt8ArrayRef`].
impl<'data> Int8ArrayRefParallelIterator<'data> for Int8Array {
    type Iter = ParallelInt8ArrayRef<'data>;

    /// Wraps a borrow of this array in its parallel-iterator adaptor.
    fn par_iter(&'data self) -> Self::Iter {
        // Construct through the associated type so the body always agrees
        // with the `Iter` declared above.
        <Self::Iter>::new(self)
    }
}

/// Extension trait that adds a borrowing `par_iter` method producing a rayon
/// [`ParallelIterator`] over `Option<i16>` items.
///
/// The `'data` lifetime is the lifetime of the borrow of `self` taken by
/// [`par_iter`](Self::par_iter).
pub trait Int16ArrayRefParallelIterator<'data> {
    /// Concrete parallel iterator type returned by [`par_iter`](Self::par_iter).
    type Iter: ParallelIterator<Item = Option<i16>>;

    /// Borrows `self` for `'data` and returns a parallel iterator over it.
    // NOTE(review): `None` items presumably correspond to null slots — confirm
    // against `ParallelPrimitiveArrayRef`.
    fn par_iter(&'data self) -> Self::Iter;
}

/// Gives [`Int16Array`] a `par_iter` method backed by [`ParallelInt16ArrayRef`].
impl<'data> Int16ArrayRefParallelIterator<'data> for Int16Array {
    type Iter = ParallelInt16ArrayRef<'data>;

    /// Wraps a borrow of this array in its parallel-iterator adaptor.
    fn par_iter(&'data self) -> Self::Iter {
        // Construct through the associated type so the body always agrees
        // with the `Iter` declared above.
        <Self::Iter>::new(self)
    }
}

/// Extension trait that adds a borrowing `par_iter` method producing a rayon
/// [`ParallelIterator`] over `Option<i32>` items.
///
/// The `'data` lifetime is the lifetime of the borrow of `self` taken by
/// [`par_iter`](Self::par_iter).
pub trait Int32ArrayRefParallelIterator<'data> {
    /// Concrete parallel iterator type returned by [`par_iter`](Self::par_iter).
    type Iter: ParallelIterator<Item = Option<i32>>;

    /// Borrows `self` for `'data` and returns a parallel iterator over it.
    // NOTE(review): `None` items presumably correspond to null slots — confirm
    // against `ParallelPrimitiveArrayRef`.
    fn par_iter(&'data self) -> Self::Iter;
}

/// Gives [`Int32Array`] a `par_iter` method backed by [`ParallelInt32ArrayRef`].
impl<'data> Int32ArrayRefParallelIterator<'data> for Int32Array {
    type Iter = ParallelInt32ArrayRef<'data>;

    /// Wraps a borrow of this array in its parallel-iterator adaptor.
    fn par_iter(&'data self) -> Self::Iter {
        // Construct through the associated type so the body always agrees
        // with the `Iter` declared above.
        <Self::Iter>::new(self)
    }
}

/// Extension trait that adds a borrowing `par_iter` method producing a rayon
/// [`ParallelIterator`] over `Option<i64>` items.
///
/// The `'data` lifetime is the lifetime of the borrow of `self` taken by
/// [`par_iter`](Self::par_iter).
pub trait Int64ArrayRefParallelIterator<'data> {
    /// Concrete parallel iterator type returned by [`par_iter`](Self::par_iter).
    type Iter: ParallelIterator<Item = Option<i64>>;

    /// Borrows `self` for `'data` and returns a parallel iterator over it.
    // NOTE(review): `None` items presumably correspond to null slots — confirm
    // against `ParallelPrimitiveArrayRef`.
    fn par_iter(&'data self) -> Self::Iter;
}

/// Gives [`Int64Array`] a `par_iter` method backed by [`ParallelInt64ArrayRef`].
impl<'data> Int64ArrayRefParallelIterator<'data> for Int64Array {
    type Iter = ParallelInt64ArrayRef<'data>;

    /// Wraps a borrow of this array in its parallel-iterator adaptor.
    fn par_iter(&'data self) -> Self::Iter {
        // Construct through the associated type so the body always agrees
        // with the `Iter` declared above.
        <Self::Iter>::new(self)
    }
}

#[cfg(test)]
mod tests {
    use rayon::iter::IndexedParallelIterator;

    use super::*;

    /// Zips two `Int8Array`s in parallel and adds them element-wise, treating
    /// a `None` operand as zero.
    #[test]
    fn test_par_iter() {
        let left = Int8Array::from(vec![Some(1), None, Some(2), Some(3)]);
        let right = Int8Array::from(vec![Some(2), Some(4), None, Some(5)]);

        let sums: Vec<i8> = left
            .par_iter()
            .zip(right.par_iter())
            // A `None` side contributes nothing to the sum.
            .map(|(lhs, rhs)| lhs.unwrap_or(0) + rhs.unwrap_or(0))
            .collect();

        assert_eq!(sums, vec![3, 4, 2, 8]);
    }

    /// Doubles every present value (mapping `None` to zero) and collects the
    /// plain `i8` results into a `ParallelInt8Array`.
    #[test]
    fn test_collect_array() {
        let source = Int8Array::from(vec![Some(1), None, Some(2), Some(3)]);

        let doubled: ParallelInt8Array = source
            .par_iter()
            .map(|slot| match slot {
                Some(value) => value * 2,
                None => 0,
            })
            .collect();

        let doubled = doubled.into_inner();
        let expected = [2, 0, 4, 6];
        for (index, want) in expected.iter().enumerate() {
            assert_eq!(doubled.value(index), *want);
        }
    }
}