1use super::*;
2use rayon::iter::plumbing::*;
3use rayon::prelude::*;
4
/// A parallel iterator that yields pieces of `data` of type `S`.
///
/// The pieces are delimited by the `(offset value, size)` pairs produced by
/// `offset_values_and_sizes`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ChunkedParIter<I, S> {
    /// Offset value at which `data` begins; forwarded unchanged to the producer.
    first_offset_value: usize,
    /// Source of `(usize, usize)` pairs, one per item; its length is the
    /// length of this iterator.
    offset_values_and_sizes: I,
    /// The underlying collection that gets split into items.
    data: S,
}
12
13impl<I, S> ParallelIterator for ChunkedParIter<I, S>
14where
15 S: Send + SplitAt + Dummy + Set,
16 I: Send + GetOffset + IndexedParallelIterator + Producer<Item = (usize, usize)>,
17 I::IntoIter: ExactSizeIterator<Item = (usize, usize)> + GetOffset,
18{
19 type Item = S;
20
21 #[inline]
22 fn drive_unindexed<C>(self, consumer: C) -> C::Result
23 where
24 C: UnindexedConsumer<Self::Item>,
25 {
26 bridge(self, consumer)
27 }
28
29 #[inline]
30 fn opt_len(&self) -> Option<usize> {
31 Some(self.len())
32 }
33}
34
35impl<I, S> IndexedParallelIterator for ChunkedParIter<I, S>
36where
37 S: Send + SplitAt + Dummy + Set,
38 I: Send + GetOffset + IndexedParallelIterator + Producer<Item = (usize, usize)>,
39 I::IntoIter: ExactSizeIterator<Item = (usize, usize)> + GetOffset,
40{
41 #[inline]
42 fn drive<C>(self, consumer: C) -> C::Result
43 where
44 C: Consumer<Self::Item>,
45 {
46 bridge(self, consumer)
47 }
48
49 #[inline]
50 fn len(&self) -> usize {
51 self.offset_values_and_sizes.len()
52 }
53
54 #[inline]
55 fn with_producer<CB>(self, callback: CB) -> CB::Output
56 where
57 CB: ProducerCallback<Self::Item>,
58 {
59 callback.callback(ChunkedProducer {
60 first_offset_value: self.first_offset_value,
61 offset_values_and_sizes_producer: self.offset_values_and_sizes,
62 data: self.data,
63 })
64 }
65}
66
67struct ChunkedProducer<I, S> {
68 first_offset_value: usize,
69 offset_values_and_sizes_producer: I,
70 data: S,
71}
72
73impl<I, S> Producer for ChunkedProducer<I, S>
74where
75 S: Send + SplitAt + Dummy + Set,
76 I: Send + GetOffset + Producer<Item = (usize, usize)>,
77 I::IntoIter: ExactSizeIterator<Item = (usize, usize)> + GetOffset,
78{
79 type Item = S;
80 type IntoIter = ChunkedIter<I::IntoIter, S>;
81
82 #[inline]
83 fn into_iter(self) -> Self::IntoIter {
84 ChunkedIter {
85 first_offset_value: self.first_offset_value,
86 offset_values_and_sizes: self.offset_values_and_sizes_producer.into_iter(),
87 data: self.data,
88 }
89 }
90
91 #[inline]
92 fn split_at(self, index: usize) -> (Self, Self) {
93 let off = self.offset_values_and_sizes_producer.offset_value(index);
94 let (ls, rs) = self.offset_values_and_sizes_producer.split_at(index);
95 let (l, r) = self.data.split_at(off - self.first_offset_value);
96 (
97 ChunkedProducer {
98 first_offset_value: self.first_offset_value,
99 offset_values_and_sizes_producer: ls,
100 data: l,
101 },
102 ChunkedProducer {
103 first_offset_value: off,
104 offset_values_and_sizes_producer: rs,
105 data: r,
106 },
107 )
108 }
109}
110
111impl<'a, S, O> Chunked<S, O>
112where
113 S: View<'a>,
114 O: View<'a>,
115 O::Type: IntoParOffsetValuesAndSizes + GetOffset,
116{
117 #[inline]
119 pub fn par_iter(
120 &'a self,
121 ) -> ChunkedParIter<
122 <<O as View<'a>>::Type as IntoParOffsetValuesAndSizes>::ParIter,
123 <S as View<'a>>::Type,
124 > {
125 ChunkedParIter {
126 first_offset_value: self.chunks.view().first_offset_value(),
127 offset_values_and_sizes: self.chunks.view().into_par_offset_values_and_sizes(),
128 data: self.data.view(),
129 }
130 }
131}
132
133impl<'a, S, O> Chunked<S, O>
134where
135 S: ViewMut<'a>,
136 O: View<'a>,
137 O::Type: IntoParOffsetValuesAndSizes + GetOffset,
138{
139 #[inline]
141 pub fn par_iter_mut(
142 &'a mut self,
143 ) -> ChunkedParIter<
144 <<O as View<'a>>::Type as IntoParOffsetValuesAndSizes>::ParIter,
145 <S as ViewMut<'a>>::Type,
146 > {
147 ChunkedParIter {
148 first_offset_value: self.chunks.view().first_offset_value(),
149 offset_values_and_sizes: self.chunks.view().into_par_offset_values_and_sizes(),
150 data: self.data.view_mut(),
151 }
152 }
153}
154
155impl<S, O> IntoParallelIterator for Chunked<S, O>
156where
157 O: IntoParOffsetValuesAndSizes + GetOffset,
158 S: Send + SplitAt + Set + Dummy,
159 O::ParIter: Producer<Item = (usize, usize)> + GetOffset,
160 <O::ParIter as Producer>::IntoIter: GetOffset,
161{
162 type Item = S;
163 type Iter = ChunkedParIter<O::ParIter, S>;
164
165 #[inline]
166 fn into_par_iter(self) -> Self::Iter {
167 ChunkedParIter {
168 first_offset_value: self.chunks.first_offset_value(),
169 offset_values_and_sizes: self.chunks.into_par_offset_values_and_sizes(),
170 data: self.data,
171 }
172 }
173}
174
175impl<S: IntoParChunkIterator, N: Dimension> IntoParallelIterator for UniChunked<S, N> {
176 type Item = <S as IntoParChunkIterator>::Item;
177 type Iter = <S as IntoParChunkIterator>::IterType;
178
179 #[inline]
181 fn into_par_iter(self) -> Self::Iter {
182 self.data.into_par_chunk_iter(self.chunk_size.value())
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Increments every element through `par_iter_mut`, then checks through
    /// `par_iter` that each chunk matches the corresponding input vector
    /// with every element increased by one.
    #[test]
    fn chunked_par() {
        let vecs = vec![vec![0, 1, 2, 3, 4], vec![5, 6], vec![7, 8]];
        let mut chunked = Chunked::<Vec<_>>::from_nested_vec(vecs.clone());
        let mut view_mut = chunked.view_mut();
        view_mut.par_iter_mut().for_each(|a| {
            for x in a {
                *x += 1;
            }
        });

        // `zip` stops at the shorter side, so a parallel iterator that yields
        // too few chunks would go unnoticed below. Pin the chunk count first.
        assert_eq!(chunked.view().par_iter().count(), vecs.len());

        chunked.view().par_iter().zip(vecs.par_iter()).for_each(
            |(a, b): (&[usize], &Vec<usize>)| {
                // The inner `zip` truncates as well, so compare lengths
                // explicitly before comparing elements.
                assert_eq!(a.len(), b.len());
                for (&x, y) in a.iter().zip(b.iter()) {
                    assert_eq!(x, y + 1);
                }
            },
        );
    }
}