rayon_hash/par/
table.rs

//! Rayon extensions to `RawTable`.
2use std::marker;
3use std::ptr;
4
5use rayon::iter::plumbing::*;
6use rayon::prelude::*;
7
8use crate::std_hash::table::{RawBucket, RawTable};
9
10struct SplitBuckets<'a, K, V> {
11    bucket: RawBucket<K, V>,
12    end: usize,
13    marker: marker::PhantomData<&'a ()>,
14}
15
16impl<'a, K, V> SplitBuckets<'a, K, V> {
17    fn new(table: &'a RawTable<K, V>) -> Self {
18        SplitBuckets {
19            bucket: table.raw_bucket_at(0),
20            end: table.capacity(),
21            marker: marker::PhantomData,
22        }
23    }
24
25    fn split<P: From<Self>>(&self) -> (P, Option<P>) {
26        let mut left = SplitBuckets { ..*self };
27        let len = left.end - left.bucket.index();
28        if len > 1 {
29            let mut right = SplitBuckets { ..left };
30            right.bucket.index_add(len / 2);
31            left.end = right.bucket.index();
32            (P::from(left), Some(P::from(right)))
33        } else {
34            (P::from(left), None)
35        }
36    }
37}
38
39impl<'a, K, V> Iterator for SplitBuckets<'a, K, V> {
40    type Item = RawBucket<K, V>;
41
42    fn next(&mut self) -> Option<Self::Item> {
43        while self.bucket.index() < self.end {
44            let item = self.bucket;
45            self.bucket.index_add(1);
46
47            unsafe {
48                if !item.is_empty() {
49                    return Some(item);
50                }
51            }
52        }
53        None
54    }
55}
56
57/// Parallel iterator over shared references to entries in a map.
58pub struct ParIter<'a, K: 'a, V: 'a> {
59    table: &'a RawTable<K, V>,
60}
61
62impl<'a, K: Sync, V: Sync> IntoParallelIterator for &'a RawTable<K, V> {
63    type Item = (&'a K, &'a V);
64    type Iter = ParIter<'a, K, V>;
65
66    fn into_par_iter(self) -> Self::Iter {
67        ParIter { table: self }
68    }
69}
70
71impl<'a, K: Sync, V: Sync> ParallelIterator for ParIter<'a, K, V> {
72    type Item = (&'a K, &'a V);
73
74    fn drive_unindexed<C>(self, consumer: C) -> C::Result
75    where
76        C: UnindexedConsumer<Self::Item>,
77    {
78        let buckets = SplitBuckets::new(self.table);
79        let producer = ParIterProducer::from(buckets);
80        bridge_unindexed(producer, consumer)
81    }
82}
83
84struct ParIterProducer<'a, K: 'a, V: 'a> {
85    iter: SplitBuckets<'a, K, V>,
86}
87
88impl<'a, K, V> From<SplitBuckets<'a, K, V>> for ParIterProducer<'a, K, V> {
89    fn from(iter: SplitBuckets<'a, K, V>) -> Self {
90        Self { iter }
91    }
92}
93
94unsafe impl<'a, K: Sync, V: Sync> Send for ParIterProducer<'a, K, V> {}
95
96impl<'a, K: Sync, V: Sync> UnindexedProducer for ParIterProducer<'a, K, V> {
97    type Item = (&'a K, &'a V);
98
99    fn split(self) -> (Self, Option<Self>) {
100        self.iter.split()
101    }
102
103    fn fold_with<F>(self, folder: F) -> F
104    where
105        F: Folder<Self::Item>,
106    {
107        let iter = self.iter.map(|bucket| unsafe {
108            let pair_ptr = bucket.pair();
109            (&(*pair_ptr).0, &(*pair_ptr).1)
110        });
111        folder.consume_iter(iter)
112    }
113}
114
115/// Parallel iterator over shared references to keys in a map.
116pub struct ParKeys<'a, K: 'a, V: 'a> {
117    table: &'a RawTable<K, V>,
118}
119
120unsafe impl<'a, K: Sync, V> Send for ParKeys<'a, K, V> {}
121
122impl<K: Sync, V> RawTable<K, V> {
123    pub fn par_keys(&self) -> ParKeys<K, V> {
124        ParKeys { table: self }
125    }
126}
127
128impl<'a, K: Sync, V> ParallelIterator for ParKeys<'a, K, V> {
129    type Item = &'a K;
130
131    fn drive_unindexed<C>(self, consumer: C) -> C::Result
132    where
133        C: UnindexedConsumer<Self::Item>,
134    {
135        let buckets = SplitBuckets::new(self.table);
136        let producer = ParKeysProducer::from(buckets);
137        bridge_unindexed(producer, consumer)
138    }
139}
140
141struct ParKeysProducer<'a, K: 'a, V: 'a> {
142    iter: SplitBuckets<'a, K, V>,
143}
144
145impl<'a, K, V> From<SplitBuckets<'a, K, V>> for ParKeysProducer<'a, K, V> {
146    fn from(iter: SplitBuckets<'a, K, V>) -> Self {
147        Self { iter }
148    }
149}
150
151unsafe impl<'a, K: Sync, V> Send for ParKeysProducer<'a, K, V> {}
152
153impl<'a, K: Sync, V> UnindexedProducer for ParKeysProducer<'a, K, V> {
154    type Item = &'a K;
155
156    fn split(self) -> (Self, Option<Self>) {
157        self.iter.split()
158    }
159
160    fn fold_with<F>(self, folder: F) -> F
161    where
162        F: Folder<Self::Item>,
163    {
164        let iter = self.iter.map(|bucket| unsafe {
165            let pair_ptr = bucket.pair();
166            &(*pair_ptr).0
167        });
168        folder.consume_iter(iter)
169    }
170}
171
172/// Parallel iterator over shared references to values in a map.
173pub struct ParValues<'a, K: 'a, V: 'a> {
174    table: &'a RawTable<K, V>,
175}
176
177unsafe impl<'a, K, V: Sync> Send for ParValues<'a, K, V> {}
178
179impl<K, V: Sync> RawTable<K, V> {
180    pub fn par_values(&self) -> ParValues<K, V> {
181        ParValues { table: self }
182    }
183}
184
185impl<'a, K, V: Sync> ParallelIterator for ParValues<'a, K, V> {
186    type Item = &'a V;
187
188    fn drive_unindexed<C>(self, consumer: C) -> C::Result
189    where
190        C: UnindexedConsumer<Self::Item>,
191    {
192        let buckets = SplitBuckets::new(self.table);
193        let producer = ParValuesProducer::from(buckets);
194        bridge_unindexed(producer, consumer)
195    }
196}
197
198struct ParValuesProducer<'a, K: 'a, V: 'a> {
199    iter: SplitBuckets<'a, K, V>,
200}
201
202impl<'a, K, V> From<SplitBuckets<'a, K, V>> for ParValuesProducer<'a, K, V> {
203    fn from(iter: SplitBuckets<'a, K, V>) -> Self {
204        Self { iter }
205    }
206}
207
208unsafe impl<'a, K, V: Sync> Send for ParValuesProducer<'a, K, V> {}
209
210impl<'a, K, V: Sync> UnindexedProducer for ParValuesProducer<'a, K, V> {
211    type Item = &'a V;
212
213    fn split(self) -> (Self, Option<Self>) {
214        self.iter.split()
215    }
216
217    fn fold_with<F>(self, folder: F) -> F
218    where
219        F: Folder<Self::Item>,
220    {
221        let iter = self.iter.map(|bucket| unsafe {
222            let pair_ptr = bucket.pair();
223            &(*pair_ptr).1
224        });
225        folder.consume_iter(iter)
226    }
227}
228
229/// Parallel iterator over mutable references to entries in a map.
230pub struct ParIterMut<'a, K: 'a, V: 'a> {
231    table: &'a mut RawTable<K, V>,
232}
233
234unsafe impl<'a, K: Sync, V: Send> Send for ParIterMut<'a, K, V> {}
235
236impl<'a, K: Sync, V: Send> IntoParallelIterator for &'a mut RawTable<K, V> {
237    type Item = (&'a K, &'a mut V);
238    type Iter = ParIterMut<'a, K, V>;
239
240    fn into_par_iter(self) -> Self::Iter {
241        ParIterMut { table: self }
242    }
243}
244
245impl<'a, K: Sync, V: Send> ParallelIterator for ParIterMut<'a, K, V> {
246    type Item = (&'a K, &'a mut V);
247
248    fn drive_unindexed<C>(self, consumer: C) -> C::Result
249    where
250        C: UnindexedConsumer<Self::Item>,
251    {
252        let buckets = SplitBuckets::new(self.table);
253        let producer = ParIterMutProducer::from(buckets);
254        bridge_unindexed(producer, consumer)
255    }
256}
257
258struct ParIterMutProducer<'a, K: 'a, V: 'a> {
259    iter: SplitBuckets<'a, K, V>,
260    // To ensure invariance with respect to V
261    marker: marker::PhantomData<&'a mut V>,
262}
263
264impl<'a, K, V> From<SplitBuckets<'a, K, V>> for ParIterMutProducer<'a, K, V> {
265    fn from(iter: SplitBuckets<'a, K, V>) -> Self {
266        Self {
267            iter,
268            marker: marker::PhantomData,
269        }
270    }
271}
272
273unsafe impl<'a, K: Sync, V: Send> Send for ParIterMutProducer<'a, K, V> {}
274
275impl<'a, K: Sync, V: Send> UnindexedProducer for ParIterMutProducer<'a, K, V> {
276    type Item = (&'a K, &'a mut V);
277
278    fn split(self) -> (Self, Option<Self>) {
279        self.iter.split()
280    }
281
282    fn fold_with<F>(self, folder: F) -> F
283    where
284        F: Folder<Self::Item>,
285    {
286        let iter = self.iter.map(|bucket| unsafe {
287            let pair_ptr = bucket.pair();
288            (&(*pair_ptr).0, &mut (*pair_ptr).1)
289        });
290        folder.consume_iter(iter)
291    }
292}
293
294/// Parallel iterator over mutable references to values in a map.
295pub struct ParValuesMut<'a, K: 'a, V: 'a> {
296    table: &'a mut RawTable<K, V>,
297}
298
299unsafe impl<'a, K, V: Send> Send for ParValuesMut<'a, K, V> {}
300
301impl<K, V: Send> RawTable<K, V> {
302    pub fn par_values_mut(&mut self) -> ParValuesMut<K, V> {
303        ParValuesMut { table: self }
304    }
305}
306
307impl<'a, K, V: Send> ParallelIterator for ParValuesMut<'a, K, V> {
308    type Item = &'a mut V;
309
310    fn drive_unindexed<C>(self, consumer: C) -> C::Result
311    where
312        C: UnindexedConsumer<Self::Item>,
313    {
314        let buckets = SplitBuckets::new(self.table);
315        let producer = ParValuesMutProducer::from(buckets);
316        bridge_unindexed(producer, consumer)
317    }
318}
319
320struct ParValuesMutProducer<'a, K: 'a, V: 'a> {
321    iter: SplitBuckets<'a, K, V>,
322    // To ensure invariance with respect to V
323    marker: marker::PhantomData<&'a mut V>,
324}
325
326impl<'a, K, V> From<SplitBuckets<'a, K, V>> for ParValuesMutProducer<'a, K, V> {
327    fn from(iter: SplitBuckets<'a, K, V>) -> Self {
328        Self {
329            iter,
330            marker: marker::PhantomData,
331        }
332    }
333}
334
335unsafe impl<'a, K, V: Send> Send for ParValuesMutProducer<'a, K, V> {}
336
337impl<'a, K, V: Send> UnindexedProducer for ParValuesMutProducer<'a, K, V> {
338    type Item = &'a mut V;
339
340    fn split(self) -> (Self, Option<Self>) {
341        self.iter.split()
342    }
343
344    fn fold_with<F>(self, folder: F) -> F
345    where
346        F: Folder<Self::Item>,
347    {
348        let iter = self.iter.map(|bucket| unsafe {
349            let pair_ptr = bucket.pair();
350            &mut (*pair_ptr).1
351        });
352        folder.consume_iter(iter)
353    }
354}
355
356/// Parallel iterator over the entries in a map, consuming it.
357pub struct ParIntoIter<K, V> {
358    table: RawTable<K, V>,
359}
360
361impl<K: Send, V: Send> IntoParallelIterator for RawTable<K, V> {
362    type Item = (K, V);
363    type Iter = ParIntoIter<K, V>;
364
365    fn into_par_iter(self) -> Self::Iter {
366        ParIntoIter { table: self }
367    }
368}
369
370impl<K: Send, V: Send> ParallelIterator for ParIntoIter<K, V> {
371    type Item = (K, V);
372
373    fn drive_unindexed<C>(self, consumer: C) -> C::Result
374    where
375        C: UnindexedConsumer<Self::Item>,
376    {
377        // Pre-set the map size to zero, indicating all items drained.
378        let mut table = self.table;
379        unsafe {
380            table.set_size(0);
381        }
382
383        let buckets = SplitBuckets::new(&table);
384        let producer = ParIntoIterProducer::from(buckets);
385        bridge_unindexed(producer, consumer)
386    }
387}
388
389struct ParIntoIterProducer<'a, K: 'a, V: 'a> {
390    iter: SplitBuckets<'a, K, V>,
391}
392
393impl<'a, K, V> From<SplitBuckets<'a, K, V>> for ParIntoIterProducer<'a, K, V> {
394    fn from(iter: SplitBuckets<'a, K, V>) -> Self {
395        Self { iter }
396    }
397}
398
399unsafe impl<'a, K: Send, V: Send> Send for ParIntoIterProducer<'a, K, V> {}
400
401impl<'a, K: Send, V: Send> UnindexedProducer for ParIntoIterProducer<'a, K, V> {
402    type Item = (K, V);
403
404    fn split(mut self) -> (Self, Option<Self>) {
405        // We must not drop self yet!
406        let (left, right) = self.iter.split();
407        self.iter = left;
408        (self, right.map(Self::from))
409    }
410
411    fn fold_with<F>(mut self, folder: F) -> F
412    where
413        F: Folder<Self::Item>,
414    {
415        let iter = self.iter.by_ref().map(|bucket| unsafe {
416            bucket.set_empty();
417            ptr::read(bucket.pair())
418        });
419        folder.consume_iter(iter)
420    }
421}
422
423impl<'a, K: 'a, V: 'a> Drop for ParIntoIterProducer<'a, K, V> {
424    fn drop(&mut self) {
425        while let Some(bucket) = self.iter.next() {
426            unsafe {
427                bucket.set_empty();
428                ptr::drop_in_place(bucket.pair());
429            }
430        }
431    }
432}