hydra_dashmap/rayon/
map.rs

1use crate::lock::RwLock;
2use crate::mapref::multiple::{RefMulti, RefMutMulti};
3use crate::util;
4use crate::{DashMap, HashMap};
5use core::hash::{BuildHasher, Hash};
6use rayon::iter::plumbing::UnindexedConsumer;
7use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
8use std::collections::hash_map::RandomState;
9use std::sync::Arc;
10
11impl<K, V, S> ParallelExtend<(K, V)> for DashMap<K, V, S>
12where
13    K: Send + Sync + Eq + Hash,
14    V: Send + Sync,
15    S: Send + Sync + Clone + BuildHasher,
16{
17    fn par_extend<I>(&mut self, par_iter: I)
18    where
19        I: IntoParallelIterator<Item = (K, V)>,
20    {
21        (&*self).par_extend(par_iter);
22    }
23}
24
25// Since we don't actually need mutability, we can implement this on a
26// reference, similar to `io::Write for &File`.
27impl<K, V, S> ParallelExtend<(K, V)> for &'_ DashMap<K, V, S>
28where
29    K: Send + Sync + Eq + Hash,
30    V: Send + Sync,
31    S: Send + Sync + Clone + BuildHasher,
32{
33    fn par_extend<I>(&mut self, par_iter: I)
34    where
35        I: IntoParallelIterator<Item = (K, V)>,
36    {
37        let &mut map = self;
38        par_iter.into_par_iter().for_each(move |(key, value)| {
39            map.insert(key, value);
40        });
41    }
42}
43
44impl<K, V, S> FromParallelIterator<(K, V)> for DashMap<K, V, S>
45where
46    K: Send + Sync + Eq + Hash,
47    V: Send + Sync,
48    S: Send + Sync + Clone + Default + BuildHasher,
49{
50    fn from_par_iter<I>(par_iter: I) -> Self
51    where
52        I: IntoParallelIterator<Item = (K, V)>,
53    {
54        let map = Self::default();
55        (&map).par_extend(par_iter);
56        map
57    }
58}
59
60// Implementation note: while the shards will iterate in parallel, we flatten
61// sequentially within each shard (`flat_map_iter`), because the standard
62// `HashMap` only implements `ParallelIterator` by collecting to a `Vec` first.
63// There is real parallel support in the `hashbrown/rayon` feature, but we don't
64// always use that map.
65
66impl<K, V, S> IntoParallelIterator for DashMap<K, V, S>
67where
68    K: Send + Eq + Hash,
69    V: Send,
70    S: Send + Clone + BuildHasher,
71{
72    type Iter = OwningIter<K, V, S>;
73    type Item = (K, V);
74
75    fn into_par_iter(self) -> Self::Iter {
76        OwningIter {
77            shards: self.shards,
78        }
79    }
80}
81
82pub struct OwningIter<K, V, S = RandomState> {
83    pub(super) shards: Box<[RwLock<HashMap<K, V, S>>]>,
84}
85
86impl<K, V, S> ParallelIterator for OwningIter<K, V, S>
87where
88    K: Send + Eq + Hash,
89    V: Send,
90    S: Send + Clone + BuildHasher,
91{
92    type Item = (K, V);
93
94    fn drive_unindexed<C>(self, consumer: C) -> C::Result
95    where
96        C: UnindexedConsumer<Self::Item>,
97    {
98        Vec::from(self.shards)
99            .into_par_iter()
100            .flat_map_iter(|shard| {
101                shard
102                    .into_inner()
103                    .into_iter()
104                    .map(|(k, v)| (k, v.into_inner()))
105            })
106            .drive_unindexed(consumer)
107    }
108}
109
110// This impl also enables `IntoParallelRefIterator::par_iter`
111impl<'a, K, V, S> IntoParallelIterator for &'a DashMap<K, V, S>
112where
113    K: Send + Sync + Eq + Hash,
114    V: Send + Sync,
115    S: Send + Sync + Clone + BuildHasher,
116{
117    type Iter = Iter<'a, K, V, S>;
118    type Item = RefMulti<'a, K, V, S>;
119
120    fn into_par_iter(self) -> Self::Iter {
121        Iter {
122            shards: &self.shards,
123        }
124    }
125}
126
127pub struct Iter<'a, K, V, S = RandomState> {
128    pub(super) shards: &'a [RwLock<HashMap<K, V, S>>],
129}
130
131impl<'a, K, V, S> ParallelIterator for Iter<'a, K, V, S>
132where
133    K: Send + Sync + Eq + Hash,
134    V: Send + Sync,
135    S: Send + Sync + Clone + BuildHasher,
136{
137    type Item = RefMulti<'a, K, V, S>;
138
139    fn drive_unindexed<C>(self, consumer: C) -> C::Result
140    where
141        C: UnindexedConsumer<Self::Item>,
142    {
143        self.shards
144            .into_par_iter()
145            .flat_map_iter(|shard| {
146                let guard = shard.read();
147                let sref: &'a HashMap<K, V, S> = unsafe { util::change_lifetime_const(&*guard) };
148
149                let guard = Arc::new(guard);
150                sref.iter().map(move |(k, v)| {
151                    let guard = Arc::clone(&guard);
152                    unsafe { RefMulti::new(guard, k, v.get()) }
153                })
154            })
155            .drive_unindexed(consumer)
156    }
157}
158
159// This impl also enables `IntoParallelRefMutIterator::par_iter_mut`
160impl<'a, K, V, S> IntoParallelIterator for &'a mut DashMap<K, V, S>
161where
162    K: Send + Sync + Eq + Hash,
163    V: Send + Sync,
164    S: Send + Sync + Clone + BuildHasher,
165{
166    type Iter = IterMut<'a, K, V, S>;
167    type Item = RefMutMulti<'a, K, V, S>;
168
169    fn into_par_iter(self) -> Self::Iter {
170        IterMut {
171            shards: &self.shards,
172        }
173    }
174}
175
176impl<K, V, S> DashMap<K, V, S>
177where
178    K: Send + Sync + Eq + Hash,
179    V: Send + Sync,
180    S: Send + Sync + Clone + BuildHasher,
181{
182    // Unlike `IntoParallelRefMutIterator::par_iter_mut`, we only _need_ `&self`.
183    pub fn par_iter_mut(&self) -> IterMut<'_, K, V, S> {
184        IterMut {
185            shards: &self.shards,
186        }
187    }
188}
189
190pub struct IterMut<'a, K, V, S = RandomState> {
191    shards: &'a [RwLock<HashMap<K, V, S>>],
192}
193
194impl<'a, K, V, S> ParallelIterator for IterMut<'a, K, V, S>
195where
196    K: Send + Sync + Eq + Hash,
197    V: Send + Sync,
198    S: Send + Sync + Clone + BuildHasher,
199{
200    type Item = RefMutMulti<'a, K, V, S>;
201
202    fn drive_unindexed<C>(self, consumer: C) -> C::Result
203    where
204        C: UnindexedConsumer<Self::Item>,
205    {
206        self.shards
207            .into_par_iter()
208            .flat_map_iter(|shard| {
209                let mut guard = shard.write();
210                let sref: &'a mut HashMap<K, V, S> =
211                    unsafe { util::change_lifetime_mut(&mut *guard) };
212
213                let guard = Arc::new(guard);
214                sref.iter_mut().map(move |(k, v)| {
215                    let guard = Arc::clone(&guard);
216                    unsafe { RefMutMulti::new(guard, k, v.get_mut()) }
217                })
218            })
219            .drive_unindexed(consumer)
220    }
221}