flatk/chunked/
par_iter.rs

1use super::*;
2use rayon::iter::plumbing::*;
3use rayon::prelude::*;
4
/// Parallel iterator over the chunks of a chunked collection.
///
/// Bundles the chunked data with an iterator of `(usize, usize)` entries
/// (offset value and chunk size, judging by the field name). The number of
/// items produced equals the length of that entry iterator.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ChunkedParIter<I, S> {
    /// Offset value corresponding to the start of `data`.
    first_offset_value: usize,
    /// Per-chunk `(usize, usize)` entries driving the iteration.
    offset_values_and_sizes: I,
    /// The collection that gets cut into chunks.
    data: S,
}
12
13impl<I, S> ParallelIterator for ChunkedParIter<I, S>
14where
15    S: Send + SplitAt + Dummy + Set,
16    I: Send + GetOffset + IndexedParallelIterator + Producer<Item = (usize, usize)>,
17    I::IntoIter: ExactSizeIterator<Item = (usize, usize)> + GetOffset,
18{
19    type Item = S;
20
21    #[inline]
22    fn drive_unindexed<C>(self, consumer: C) -> C::Result
23    where
24        C: UnindexedConsumer<Self::Item>,
25    {
26        bridge(self, consumer)
27    }
28
29    #[inline]
30    fn opt_len(&self) -> Option<usize> {
31        Some(self.len())
32    }
33}
34
35impl<I, S> IndexedParallelIterator for ChunkedParIter<I, S>
36where
37    S: Send + SplitAt + Dummy + Set,
38    I: Send + GetOffset + IndexedParallelIterator + Producer<Item = (usize, usize)>,
39    I::IntoIter: ExactSizeIterator<Item = (usize, usize)> + GetOffset,
40{
41    #[inline]
42    fn drive<C>(self, consumer: C) -> C::Result
43    where
44        C: Consumer<Self::Item>,
45    {
46        bridge(self, consumer)
47    }
48
49    #[inline]
50    fn len(&self) -> usize {
51        self.offset_values_and_sizes.len()
52    }
53
54    #[inline]
55    fn with_producer<CB>(self, callback: CB) -> CB::Output
56    where
57        CB: ProducerCallback<Self::Item>,
58    {
59        callback.callback(ChunkedProducer {
60            first_offset_value: self.first_offset_value,
61            offset_values_and_sizes_producer: self.offset_values_and_sizes,
62            data: self.data,
63        })
64    }
65}
66
/// Splittable state behind `ChunkedParIter`.
///
/// Holds the same three pieces as the parallel iterator; its `Producer`
/// implementation splits the offsets and the data together.
struct ChunkedProducer<I, S> {
    /// Offset value corresponding to the start of `data`.
    first_offset_value: usize,
    /// Producer of per-chunk `(usize, usize)` entries.
    offset_values_and_sizes_producer: I,
    /// The portion of the collection covered by this producer.
    data: S,
}
72
73impl<I, S> Producer for ChunkedProducer<I, S>
74where
75    S: Send + SplitAt + Dummy + Set,
76    I: Send + GetOffset + Producer<Item = (usize, usize)>,
77    I::IntoIter: ExactSizeIterator<Item = (usize, usize)> + GetOffset,
78{
79    type Item = S;
80    type IntoIter = ChunkedIter<I::IntoIter, S>;
81
82    #[inline]
83    fn into_iter(self) -> Self::IntoIter {
84        ChunkedIter {
85            first_offset_value: self.first_offset_value,
86            offset_values_and_sizes: self.offset_values_and_sizes_producer.into_iter(),
87            data: self.data,
88        }
89    }
90
91    #[inline]
92    fn split_at(self, index: usize) -> (Self, Self) {
93        let off = self.offset_values_and_sizes_producer.offset_value(index);
94        let (ls, rs) = self.offset_values_and_sizes_producer.split_at(index);
95        let (l, r) = self.data.split_at(off - self.first_offset_value);
96        (
97            ChunkedProducer {
98                first_offset_value: self.first_offset_value,
99                offset_values_and_sizes_producer: ls,
100                data: l,
101            },
102            ChunkedProducer {
103                first_offset_value: off,
104                offset_values_and_sizes_producer: rs,
105                data: r,
106            },
107        )
108    }
109}
110
111impl<'a, S, O> Chunked<S, O>
112where
113    S: View<'a>,
114    O: View<'a>,
115    O::Type: IntoParOffsetValuesAndSizes + GetOffset,
116{
117    /// Produce a parallel iterator over elements (borrowed slices) of a `Chunked`.
118    #[inline]
119    pub fn par_iter(
120        &'a self,
121    ) -> ChunkedParIter<
122        <<O as View<'a>>::Type as IntoParOffsetValuesAndSizes>::ParIter,
123        <S as View<'a>>::Type,
124    > {
125        ChunkedParIter {
126            first_offset_value: self.chunks.view().first_offset_value(),
127            offset_values_and_sizes: self.chunks.view().into_par_offset_values_and_sizes(),
128            data: self.data.view(),
129        }
130    }
131}
132
133impl<'a, S, O> Chunked<S, O>
134where
135    S: ViewMut<'a>,
136    O: View<'a>,
137    O::Type: IntoParOffsetValuesAndSizes + GetOffset,
138{
139    /// Produce a parallel iterator over elements (borrowed slices) of a `Chunked`.
140    #[inline]
141    pub fn par_iter_mut(
142        &'a mut self,
143    ) -> ChunkedParIter<
144        <<O as View<'a>>::Type as IntoParOffsetValuesAndSizes>::ParIter,
145        <S as ViewMut<'a>>::Type,
146    > {
147        ChunkedParIter {
148            first_offset_value: self.chunks.view().first_offset_value(),
149            offset_values_and_sizes: self.chunks.view().into_par_offset_values_and_sizes(),
150            data: self.data.view_mut(),
151        }
152    }
153}
154
155impl<S, O> IntoParallelIterator for Chunked<S, O>
156where
157    O: IntoParOffsetValuesAndSizes + GetOffset,
158    S: Send + SplitAt + Set + Dummy,
159    O::ParIter: Producer<Item = (usize, usize)> + GetOffset,
160    <O::ParIter as Producer>::IntoIter: GetOffset,
161{
162    type Item = S;
163    type Iter = ChunkedParIter<O::ParIter, S>;
164
165    #[inline]
166    fn into_par_iter(self) -> Self::Iter {
167        ChunkedParIter {
168            first_offset_value: self.chunks.first_offset_value(),
169            offset_values_and_sizes: self.chunks.into_par_offset_values_and_sizes(),
170            data: self.data,
171        }
172    }
173}
174
175impl<S: IntoParChunkIterator, N: Dimension> IntoParallelIterator for UniChunked<S, N> {
176    type Item = <S as IntoParChunkIterator>::Item;
177    type Iter = <S as IntoParChunkIterator>::IterType;
178
179    /// Convert a `UniChunked` collection into a parallel iterator over grouped elements.
180    #[inline]
181    fn into_par_iter(self) -> Self::Iter {
182        self.data.into_par_chunk_iter(self.chunk_size.value())
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Increments every element through `par_iter_mut`, then checks through
    /// `par_iter` that each chunk equals the matching input vector plus one.
    #[test]
    fn chunked_par() {
        let vecs = vec![vec![0, 1, 2, 3, 4], vec![5, 6], vec![7, 8]];
        let mut chunked = Chunked::<Vec<_>>::from_nested_vec(vecs.clone());
        let mut view_mut = chunked.view_mut();
        view_mut.par_iter_mut().for_each(|a| {
            for x in a {
                *x += 1;
            }
        });

        // `zip_eq` panics when the two sides differ in length, so a missing or
        // extra chunk fails the test instead of being silently truncated.
        chunked.view().par_iter().zip_eq(vecs.par_iter()).for_each(
            |(a, b): (&[usize], &Vec<usize>)| {
                // Compare whole slices so a chunk of the wrong length is
                // caught as well as a wrong element.
                let expected: Vec<usize> = b.iter().map(|y| y + 1).collect();
                assert_eq!(a, expected.as_slice());
            },
        );
    }
}