chashmap_async/lib.rs
//! Concurrent hash maps.
//!
//! This crate implements concurrent hash maps, based on bucket-level multi-reader locks. It has
//! excellent performance characteristics¹ and supports resizing, in-place mutation and more.
//!
//! The API derives directly from `std::collections::HashMap`, giving it a familiar feel.
//!
//! ¹Note that it heavily depends on the behavior of your program, but in most cases, it's really
//! good. In some (rare) cases you might want atomic hash maps instead.
//!
//! # How it works
//!
//! `chashmap` is not lockless, but it distributes locks across the map such that lock contention
//! (which is what could make accesses expensive) is very rare.
//!
//! A hash map consists of so-called "buckets", each of which defines a potential entry in the
//! table. The bucket of some key-value pair is determined by the hash of the key. By holding a
//! read-write lock for each bucket, we ensure that you will generally be able to insert, read,
//! modify, etc. with only one or two locking subroutines.
//!
//! There is a special case: reallocation. When the table is filled up such that very few buckets
//! are free (note that this is "very few" and not "no", since the load factor shouldn't get too
//! high as it hurts performance), a global lock is obtained while rehashing the table. This is
//! pretty inefficient, but it rarely happens, and due to the adaptive nature of the capacity, it
//! will only happen a few times when the map has just been initialized.
//!
//! ## Collision resolution
//!
//! When two hashes collide, they cannot share the same bucket, so there must be an algorithm
//! which can resolve collisions. In our case, we use linear probing, which means that we take the
//! following bucket, and repeat until we find a free bucket.
//!
//! This method is far from ideal, but superior methods like Robin-Hood hashing work poorly (if at
//! all) in a concurrent structure.
//!
//! # The API
//!
//! The API should feel very familiar if you are used to the libstd hash map implementation. They
//! share many of the methods, and I've carefully made sure that all items which have similarly
//! named counterparts in libstd match them in semantics and behavior.
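//!
//! # Example
//!
//! A minimal usage sketch (assuming the crate is named `chashmap_async`; the methods are `async`,
//! so they need an executor to drive them — `futures::executor::block_on` is just one option,
//! which is why the block is marked `ignore`):
//!
//! ```ignore
//! use chashmap_async::CHashMap;
//!
//! futures::executor::block_on(async {
//!     let map = CHashMap::new();
//!     // `insert` returns the previous value, if any.
//!     assert_eq!(map.insert("answer", 42).await, None);
//!     // `get` takes a bucket-level read lock for the lifetime of the guard.
//!     if let Some(guard) = map.get("answer").await {
//!         assert_eq!(*guard, 42);
//!     }
//!     // `get_mut` allows in-place mutation through a write guard.
//!     if let Some(mut guard) = map.get_mut("answer").await {
//!         *guard += 1;
//!     }
//!     assert_eq!(map.remove("answer").await, Some(43));
//! });
//! ```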

#[cfg(test)]
mod tests;

use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
use futures::{stream, stream::StreamExt};
use owning_ref::{OwningHandle, OwningRef};
use stable_deref_trait::StableDeref;
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::future::Future;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::{self, AtomicUsize};
use std::{cmp, fmt, iter, mem, ops};

/// The atomic ordering used throughout the code.
const ORDERING: atomic::Ordering = atomic::Ordering::Relaxed;
/// The length-to-capacity factor.
const LENGTH_MULTIPLIER: usize = 4;
/// The maximal load factor's numerator.
const MAX_LOAD_FACTOR_NUM: usize = 100 - 15;
/// The maximal load factor's denominator.
const MAX_LOAD_FACTOR_DENOM: usize = 100;
/// The default initial capacity.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
/// The lowest capacity a table can have.
const MINIMUM_CAPACITY: usize = 8;

/// A bucket state.
///
/// Buckets are the bricks of hash tables. They represent a single entry into the table.
#[derive(Clone)]
enum Bucket<K, V> {
    /// The bucket contains a key-value pair.
    Contains(K, V),
    /// The bucket is empty and has never been used.
    ///
    /// Since hash collisions are resolved by jumping to the next bucket, some buckets can cluster
    /// together, meaning that they are potential candidates for lookups. Empty buckets can be seen
    /// as the delimiters of such clusters.
    Empty,
    /// The bucket was removed.
    ///
    /// The technique of distinguishing between "empty" and "removed" was first described by
    /// Knuth. The idea is that when you search for a key, you will probe over these buckets,
    /// since the key could have been pushed behind the removed element:
    /// ```notest
    /// Contains(k1, v1) // hash = h
    /// Removed
    /// Contains(k2, v2) // hash = h
    /// ```
    /// If we stopped at `Removed`, we wouldn't be able to find the second KV pair. So `Removed`
    /// is semantically different from `Empty`, as the search won't stop there.
    ///
    /// However, we are still able to insert new pairs into the removed buckets.
    Removed,
}

impl<K, V> Bucket<K, V> {
    /// Is this bucket 'empty'?
    fn is_empty(&self) -> bool {
        matches!(*self, Bucket::Empty)
    }

    /// Is this bucket 'removed'?
    fn is_removed(&self) -> bool {
        matches!(*self, Bucket::Removed)
    }

    /// Is this bucket free?
    ///
    /// "Free" means that it can safely be replaced by another bucket — namely that the bucket is
    /// not occupied.
    fn is_free(&self) -> bool {
        match *self {
            // The two replaceable bucket types are removed buckets and empty buckets.
            Bucket::Removed | Bucket::Empty => true,
            // KV pairs can't be replaced as they contain data.
            Bucket::Contains(..) => false,
        }
    }

    /// Get the value (if any) of this bucket.
    ///
    /// This gets the value of the KV pair, if any. If the bucket is not a KV pair, `None` is
    /// returned.
    fn value(self) -> Option<V> {
        if let Bucket::Contains(_, val) = self {
            Some(val)
        } else {
            None
        }
    }

    /// Get a reference to the value of the bucket (if any).
    ///
    /// This returns a reference to the value of the bucket, if it is a KV pair. If not, it will
    /// return `Err(())`.
    ///
    /// Rather than `Option`, it returns a `Result`, in order to make it easier to work with the
    /// `owning_ref` crate (`try_new` and `try_map` of `OwningHandle` and `OwningRef`
    /// respectively).
    fn value_ref(&self) -> Result<&V, ()> {
        if let Bucket::Contains(_, ref val) = *self {
            Ok(val)
        } else {
            Err(())
        }
    }

    /// Does the bucket match a given key?
    ///
    /// This returns `true` if the bucket is a KV pair with key `key`. If not, `false` is returned.
    fn key_matches(&self, key: &K) -> bool
    where
        K: PartialEq,
    {
        if let Bucket::Contains(ref candidate_key, _) = *self {
            // Check if the keys match.
            candidate_key == key
        } else {
            // The bucket isn't a KV pair, so we'll return false, since there is no key to test
            // against.
            false
        }
    }
}

/// The low-level representation of the hash table.
///
/// This is different from `CHashMap` in two ways:
///
/// 1. It is not wrapped in a lock, meaning that resizing and reallocation is not possible.
/// 2. It does not track the number of occupied buckets, making it expensive to obtain the load
///    factor.
struct Table<K, V, S> {
    /// The hash function builder.
    ///
    /// When a `Table` uses the default hash builder, it randomly picks a hash function from
    /// some family of functions in libstd. This effectively eliminates the issue of hash flooding.
    hash_builder: S,
    /// The bucket array.
    ///
    /// This vector stores the buckets. The order in which they're stored is far from arbitrary: a
    /// KV pair `(key, val)`'s first priority location is at `self.hash(&key) % len`. If not
    /// possible, the next bucket is used, and this process repeats until the bucket is free (or
    /// the end is reached, in which case we simply wrap around).
    buckets: Vec<RwLock<Bucket<K, V>>>,
}

impl<K, V> Table<K, V, RandomState> {
    /// Create a table with a certain number of buckets.
    fn new(buckets: usize) -> Self {
        // Fill a vector with `buckets` of `Empty` buckets.
        let mut vec = Vec::with_capacity(buckets);
        for _ in 0..buckets {
            vec.push(RwLock::new(Bucket::Empty));
        }

        Table {
            // Generate a hash function.
            hash_builder: RandomState::new(),
            buckets: vec,
        }
    }

    /// Create a table with at least some capacity.
    fn with_capacity(cap: usize) -> Self {
        // The + 1 is needed to avoid losing a fractional bucket to integer division.
        Table::new(cmp::max(
            MINIMUM_CAPACITY,
            cap * MAX_LOAD_FACTOR_DENOM / MAX_LOAD_FACTOR_NUM + 1,
        ))
    }
}

impl<K, V, S: BuildHasher> Table<K, V, S> {
    /// Create a `Table` with the given `BuildHasher`.
    fn with_hasher(buckets: usize, hash_builder: S) -> Self {
        // Fill a vector with `buckets` of `Empty` buckets.
        let mut vec = Vec::with_capacity(buckets);
        vec.resize_with(buckets, || RwLock::new(Bucket::Empty));

        Table {
            hash_builder,
            buckets: vec,
        }
    }

    /// Create a `Table` with a specific capacity and the given `BuildHasher`.
    fn with_capacity_and_hasher(cap: usize, hash_builder: S) -> Self {
        // The + 1 is needed to avoid losing a fractional bucket to integer division.
        Table::with_hasher(
            cmp::max(
                MINIMUM_CAPACITY,
                cap * MAX_LOAD_FACTOR_DENOM / MAX_LOAD_FACTOR_NUM + 1,
            ),
            hash_builder,
        )
    }
}

impl<K: PartialEq + Hash, V, S: BuildHasher> Table<K, V, S> {
    /// Hash some key through the internal hash function.
    fn hash<T: ?Sized>(&self, key: &T) -> usize
    where
        T: Hash,
    {
        // Build the initial hash function state.
        let mut hasher = self.hash_builder.build_hasher();
        // Hash the key.
        key.hash(&mut hasher);
        // Cast to `usize`. Since the hash function returns `u64`, this cast cannot reduce the
        // entropy below that of the index space.
        hasher.finish() as usize
    }

    /// Scan from the first priority of a key until a match is found.
    ///
    /// This scans from the first priority of `key` (as defined by its hash) until a match is
    /// found (wrapping at the end), i.e. until `matches` returns `true` with the bucket as
    /// argument.
    ///
    /// The read guard from the RW-lock of the bucket is returned.
    async fn scan<F, Q: ?Sized>(&self, key: &Q, matches: F) -> RwLockReadGuard<'_, Bucket<K, V>>
    where
        F: Fn(&Bucket<K, V>) -> bool,
        K: Borrow<Q>,
        Q: Hash,
    {
        // Hash the key.
        let hash = self.hash(key);

        // Start at the first priority bucket, and then move upwards, searching for the matching
        // bucket.
        for i in 0..self.buckets.len() {
            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
            let lock = self.buckets[(hash + i) % self.buckets.len()].read().await;

            // Check if it is a match.
            if matches(&lock) {
                // Yup. Return.
                return lock;
            }
        }
        panic!("`CHashMap` scan failed! No entry found.");
    }

    /// Scan from the first priority of a key until a match is found (mutable guard).
    ///
    /// This is similar to `scan`, but instead of an immutable lock guard, a mutable lock guard is
    /// returned.
    async fn scan_mut<F, Q: ?Sized>(
        &self,
        key: &Q,
        matches: F,
    ) -> RwLockWriteGuard<'_, Bucket<K, V>>
    where
        F: Fn(&Bucket<K, V>) -> bool,
        K: Borrow<Q>,
        Q: Hash,
    {
        // Hash the key.
        let hash = self.hash(key);

        // Start at the first priority bucket, and then move upwards, searching for the matching
        // bucket.
        for i in 0..self.buckets.len() {
            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
            let lock = self.buckets[(hash + i) % self.buckets.len()].write().await;

            // Check if it is a match.
            if matches(&lock) {
                // Yup. Return.
                return lock;
            }
        }
        panic!("`CHashMap` scan_mut failed! No entry found.");
    }

    /// Scan from the first priority of a key until a match is found (bypass locks).
    ///
    /// This is similar to `scan_mut`, but it safely bypasses the locks by making use of the
    /// aliasing invariants of `&mut`.
    fn scan_mut_no_lock<F>(&mut self, key: &K, matches: F) -> &mut Bucket<K, V>
    where
        F: Fn(&Bucket<K, V>) -> bool,
    {
        // Hash the key.
        let hash = self.hash(key);
        // TODO: To tame the borrow checker, we fetch this in advance.
        let len = self.buckets.len();

        // Start at the first priority bucket, and then move upwards, searching for the matching
        // bucket.
        for i in 0..self.buckets.len() {
            // TODO: hacky hacky
            let idx = (hash + i) % len;

            // Get the `i`'th bucket after the first priority bucket (wrap on end), bypassing its
            // lock, and check if it is a match.
            if matches(self.buckets[idx].get_mut()) {
                // Yup. Return.
                return self.buckets[idx].get_mut();
            }
        }
        panic!("`CHashMap` scan_mut_no_lock failed! No entry found.");
    }

    /// Find a bucket with some key, or a free bucket in the same cluster.
    ///
    /// This scans for buckets with key `key`. If one is found, it will be returned. If none are
    /// found, it will return a free bucket in the same cluster.
    async fn lookup_or_free(&self, key: &K) -> RwLockWriteGuard<'_, Bucket<K, V>> {
        // Hash the key.
        let hash = self.hash(key);
        // The encountered free bucket.
        let mut free = None;

        // Start at the first priority bucket, and then move upwards, searching for the matching
        // bucket.
        for i in 0..self.buckets.len() {
            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
            let lock = self.buckets[(hash + i) % self.buckets.len()].write().await;

            if lock.key_matches(key) {
                // We found a match.
                return lock;
            } else if lock.is_empty() {
                // The cluster is over. Use the encountered free bucket, if any.
                return free.unwrap_or(lock);
            } else if lock.is_removed() && free.is_none() {
                // We found a free bucket, so we can store it for later (if we don't already have
                // one).
                free = Some(lock)
            }
        }

        free.expect("No free buckets found")
    }

    /// Lookup some key.
    ///
    /// This searches for some key `key`, and returns an immutable lock guard to its bucket. If
    /// the key couldn't be found, the returned guard will point to an `Empty` bucket.
    async fn lookup<Q: ?Sized>(&self, key: &Q) -> RwLockReadGuard<'_, Bucket<K, V>>
    where
        K: Borrow<Q>,
        Q: PartialEq + Hash,
    {
        self.scan(key, |x| match *x {
            // We'll check that the keys do indeed match, as hash collisions are always possible.
            Bucket::Contains(ref candidate_key, _) if key.eq(candidate_key.borrow()) => true,
            // We reached an empty bucket, meaning that there are no more buckets, not even removed
            // ones, to search.
            Bucket::Empty => true,
            _ => false,
        })
        .await
    }

    /// Lookup some key, mutably.
    ///
    /// This is similar to `lookup`, but it returns a mutable guard.
    ///
    /// Replacing at this bucket is safe, as the bucket will be in the same cluster of buckets as
    /// the first priority cluster.
    async fn lookup_mut<Q: ?Sized>(&self, key: &Q) -> RwLockWriteGuard<'_, Bucket<K, V>>
    where
        K: Borrow<Q>,
        Q: PartialEq + Hash,
    {
        self.scan_mut(key, |x| match *x {
            // We'll check that the keys do indeed match, as hash collisions are always possible.
            Bucket::Contains(ref candidate_key, _) if key.eq(candidate_key.borrow()) => true,
            // We reached an empty bucket, meaning that there are no more buckets, not even removed
            // ones, to search.
            Bucket::Empty => true,
            _ => false,
        })
        .await
    }

    /// Find a free bucket in the same cluster as some key.
    ///
    /// This means that the returned lock guard defines a valid, free bucket, where `key` can be
    /// inserted.
    async fn find_free(&self, key: &K) -> RwLockWriteGuard<'_, Bucket<K, V>> {
        self.scan_mut(key, |x| x.is_free()).await
    }

    /// Find a free bucket in the same cluster as some key (bypassing locks).
    ///
    /// This is similar to `find_free`, except that it safely bypasses locks through the aliasing
    /// guarantees of `&mut`.
    fn find_free_no_lock(&mut self, key: &K) -> &mut Bucket<K, V> {
        self.scan_mut_no_lock(key, |x| x.is_free())
    }

    /// Fill the table with data from another table.
    ///
    /// This is used to efficiently copy the data of `table` into `self`.
    ///
    /// # Important
    ///
    /// The table should be empty for this to work correctly/logically.
    fn fill(&mut self, table: Self) {
        // Run over all the buckets.
        for i in table.buckets {
            // We'll only transfer the bucket if it is a KV pair.
            if let Bucket::Contains(key, val) = i.into_inner() {
                // Find a bucket where the KV pair can be inserted.
                // Shush, clippy — the comments are important.
                #[allow(clippy::match_like_matches_macro)]
                let bucket = self.scan_mut_no_lock(&key, |x| match *x {
                    // Halt on an empty bucket.
                    Bucket::Empty => true,
                    // We'll assume that the rest of the buckets contain other KV pairs (in
                    // particular, no buckets have been removed in the newly constructed table).
                    _ => false,
                });

                // Set the bucket to the KV pair.
                *bucket = Bucket::Contains(key, val);
            }
        }
    }
}

impl<K: Clone, V: Clone, S: Clone> Table<K, V, S> {
    /// Clone the struct, bypassing the locks via the statically guaranteed exclusive `&mut`
    /// access. Note that the map is not actually mutated.
    pub fn clone_mut(&mut self) -> Self {
        Table {
            // Since we copy plainly without rehashing etc., it is important that we keep the same
            // hash function.
            hash_builder: self.hash_builder.clone(),
            // Clone every bucket individually, bypassing the locks.
            buckets: self
                .buckets
                .iter_mut()
                .map(|x| RwLock::new(x.get_mut().clone()))
                .collect(),
        }
    }

    /// Clone by accessing keys and values via the locks. This requires the method to be `async`.
    pub async fn clone_locking(&self) -> Self {
        Table {
            // Since we copy plainly without rehashing etc., it is important that we keep the same
            // hash function.
            hash_builder: self.hash_builder.clone(),
            // Lock and clone every bucket individually.
            buckets: stream::iter(self.buckets.iter())
                .then(|x| async move { RwLock::new(x.read().await.clone()) })
                .collect()
                .await,
        }
    }
}

impl<K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for Table<K, V, S> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Create a debug map and fill it with entries.
        let mut map = f.debug_map();
        // We'll just run over all buckets and output them one after another.
        for i in &self.buckets {
            // Try to acquire the lock.
            let lock = i.try_read();
            // Check if the bucket actually contains anything.
            if let Some(Bucket::Contains(ref key, ref val)) = lock.as_deref() {
                // Add this entry to the map.
                map.entry(key, val);
            } else {
                map.entry(&"<locked>", &"<locked>");
            }
        }
        map.finish()
    }
}

/// An iterator over the entries of some table.
pub struct IntoIter<K, V, S> {
    /// The inner table.
    table: Table<K, V, S>,
}

impl<K, V, S> Iterator for IntoIter<K, V, S> {
    type Item = (K, V);

    fn next(&mut self) -> Option<(K, V)> {
        // We own the table, and can thus do what we want with it. We'll simply pop from the
        // buckets until we find a bucket containing data.
        while let Some(bucket) = self.table.buckets.pop() {
            // We can bypass dem ebil locks.
            if let Bucket::Contains(key, val) = bucket.into_inner() {
                // The bucket contained data, so we'll return the pair.
                return Some((key, val));
            }
        }

        // We've exhausted all the buckets, and no more data could be found.
        None
    }
}

impl<K, V, S> IntoIterator for Table<K, V, S> {
    type Item = (K, V);
    type IntoIter = IntoIter<K, V, S>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter { table: self }
    }
}

/// An RAII guard for reading an entry of a hash map.
///
/// This is an access type dereferencing to the inner value of the entry. It will handle unlocking
/// on drop.
pub struct ReadGuard<'a, K: 'a, V: 'a, S> {
    /// The inner hecking long type.
    #[allow(clippy::type_complexity)]
    inner: OwningRef<
        OwningHandle<StableReadGuard<'a, Table<K, V, S>>, StableReadGuard<'a, Bucket<K, V>>>,
        V,
    >,
}

impl<'a, K, V, S> ops::Deref for ReadGuard<'a, K, V, S> {
    type Target = V;

    fn deref(&self) -> &V {
        &self.inner
    }
}

impl<'a, K, V: PartialEq, S> cmp::PartialEq for ReadGuard<'a, K, V, S> {
    fn eq(&self, other: &ReadGuard<'a, K, V, S>) -> bool {
        // Compare the inner values rather than the guards themselves (writing `self == other`
        // here would recurse infinitely).
        **self == **other
    }
}
impl<'a, K, V: Eq, S> cmp::Eq for ReadGuard<'a, K, V, S> {}

impl<'a, K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for ReadGuard<'a, K, V, S> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ReadGuard({:?})", &**self)
    }
}

/// Wrapper type for [`RwLockReadGuard`] that implements [`StableDeref`].
#[repr(transparent)]
struct StableReadGuard<'a, T: ?Sized>(RwLockReadGuard<'a, T>);

impl<T: ?Sized> std::ops::Deref for StableReadGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

unsafe impl<T: ?Sized> StableDeref for StableReadGuard<'_, T> {}

impl<T: std::fmt::Debug + ?Sized> std::fmt::Debug for StableReadGuard<'_, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

impl<T: std::fmt::Display + ?Sized> std::fmt::Display for StableReadGuard<'_, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

/// An RAII guard for reading and writing an entry of a hash map.
///
/// This is an access type dereferencing to the inner value of the entry. It will handle unlocking
/// on drop.
pub struct WriteGuard<'a, K: 'a, V: 'a, S> {
    /// The inner hecking long type.
    #[allow(clippy::type_complexity)]
    inner: OwningHandle<
        OwningHandle<StableReadGuard<'a, Table<K, V, S>>, StableWriteGuard<'a, Bucket<K, V>>>,
        &'a mut V,
    >,
}

impl<'a, K, V, S> ops::Deref for WriteGuard<'a, K, V, S> {
    type Target = V;

    fn deref(&self) -> &V {
        &self.inner
    }
}

impl<'a, K, V, S> ops::DerefMut for WriteGuard<'a, K, V, S> {
    fn deref_mut(&mut self) -> &mut V {
        &mut self.inner
    }
}

impl<'a, K, V: PartialEq, S> cmp::PartialEq for WriteGuard<'a, K, V, S> {
    fn eq(&self, other: &Self) -> bool {
        // Compare the inner values rather than the guards themselves (writing `self == other`
        // here would recurse infinitely).
        **self == **other
    }
}
impl<'a, K, V: Eq, S> cmp::Eq for WriteGuard<'a, K, V, S> {}

impl<'a, K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for WriteGuard<'a, K, V, S> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "WriteGuard({:?})", &**self)
    }
}

/// Wrapper type for [`RwLockWriteGuard`] that implements [`StableDeref`].
#[repr(transparent)]
struct StableWriteGuard<'a, T: ?Sized>(RwLockWriteGuard<'a, T>);

impl<T: ?Sized> std::ops::Deref for StableWriteGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}

impl<T: ?Sized> std::ops::DerefMut for StableWriteGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}

unsafe impl<T: ?Sized> StableDeref for StableWriteGuard<'_, T> {}

impl<T: std::fmt::Debug + ?Sized> std::fmt::Debug for StableWriteGuard<'_, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

impl<T: std::fmt::Display + ?Sized> std::fmt::Display for StableWriteGuard<'_, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}

/// A concurrent hash map.
///
/// This type defines a concurrent associative array, based on hash tables with linear probing and
/// dynamic resizing.
///
/// The idea is to let each entry hold a multi-reader lock, effectively limiting lock contention
/// to simultaneous writes to the same entry, and to resizing the table.
///
/// It is not an atomic or lockless hash table, since such a construction is only useful in very
/// few cases, due to limitations on in-place operations on values.
pub struct CHashMap<K, V, S = RandomState> {
    /// The inner table.
    table: RwLock<Table<K, V, S>>,
    /// The total number of KV pairs in the table.
    ///
    /// This is used to calculate the load factor.
    len: AtomicUsize,
}

impl<K, V> CHashMap<K, V, RandomState> {
    /// Create a new hash map with a certain capacity.
    ///
    /// "Capacity" means the amount of entries the hash map can hold before reallocating. This
    /// function allocates a hash map with at least the capacity of `cap`.
    pub fn with_capacity(cap: usize) -> Self {
        CHashMap {
            // Start at 0 KV pairs.
            len: AtomicUsize::new(0),
            // Make a new empty table. We make sure that its size is at least the minimum.
            table: RwLock::new(Table::with_capacity(cap)),
        }
    }

    /// Create a new hash map.
    ///
    /// This creates a new hash map with some fixed initial capacity.
    pub fn new() -> Self {
        CHashMap::with_capacity(DEFAULT_INITIAL_CAPACITY)
    }
}

impl<K, V, S> CHashMap<K, V, S> {
    /// Get the number of entries in the hash table.
    ///
    /// This is entirely atomic, and will not acquire any locks.
    ///
    /// This is guaranteed to reflect the number of entries _at this particular moment_.
    pub fn len(&self) -> usize {
        self.len.load(ORDERING)
    }

    /// Get the capacity of the hash table.
    ///
    /// The capacity is equal to the number of entries the table can hold before reallocating.
    pub async fn capacity(&self) -> usize {
        cmp::max(MINIMUM_CAPACITY, self.buckets().await) * MAX_LOAD_FACTOR_NUM
            / MAX_LOAD_FACTOR_DENOM
    }

    /// Get the number of buckets of the hash table.
    ///
    /// "Buckets" refers to the amount of potential entries in the inner table. It is different
    /// from capacity, in the sense that the map cannot hold this number of entries, since it needs
    /// to keep the load factor low.
    pub async fn buckets(&self) -> usize {
        self.table.read().await.buckets.len()
    }

    /// Is the hash table empty?
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Filter the map based on some predicate.
    ///
    /// This tests every entry in the hash map with the closure `predicate`. If it returns `true`,
    /// the map will retain the entry. If not, the entry will be removed.
    ///
    /// This won't lock the table as a whole. This can be a major performance trade-off, as it
    /// means that it must lock every bucket individually. However, it won't block other
    /// operations on the table while filtering.
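    ///
    /// A minimal sketch of how this might be used (hypothetical keys and values; needs an async
    /// context to be awaited, hence the `ignore` block):
    ///
    /// ```ignore
    /// let map = CHashMap::new();
    /// map.insert("keep", 1).await;
    /// map.insert("drop", -1).await;
    /// // Retain only the entries with non-negative values.
    /// map.retain(|_key, &val| val >= 0).await;
    /// assert_eq!(map.len(), 1);
    /// ```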
    pub async fn retain<F>(&self, predicate: F)
    where
        F: Fn(&K, &V) -> bool,
    {
        // Acquire the read lock to the table.
        let table = self.table.read().await;
        // Run over every bucket and apply the filter.
        for bucket in &table.buckets {
            // Acquire the read lock, which we will upgrade if necessary.
            let lock = bucket.upgradable_read().await;
            // Skip the free buckets.
            match *lock {
                Bucket::Contains(ref key, ref val) if !predicate(key, val) => {
                    // The predicate didn't match. Set the bucket to removed.
                    let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await;
                    *lock = Bucket::Removed;
                    // Decrement the length to account for the removed bucket.
                    // TODO: Can we somehow bundle these up to reduce the overhead of atomic
                    // operations? Storing in a local variable and then subtracting causes
                    // issues with consistency.
                    self.len.fetch_sub(1, ORDERING);
                }
                _ => (),
            }
        }
    }
}

impl<K, V, S: Default + BuildHasher> CHashMap<K, V, S> {
    /// Creates an empty `CHashMap` with the specified capacity, using `hash_builder`
    /// to hash the keys.
    ///
    /// The hash map will be able to hold at least `capacity` elements without reallocating. Even
    /// if `capacity` is 0, the map is created with the minimum number of buckets.
    ///
    /// Warning: `hash_builder` is normally randomly generated, and
    /// is designed to allow HashMaps to be resistant to attacks that
    /// cause many collisions and very poor performance. Setting it
    /// manually using this function can expose a DoS attack vector.
    pub fn with_hasher_and_capacity(cap: usize, hash_builder: S) -> Self {
        CHashMap {
            // Start at 0 KV pairs.
            len: AtomicUsize::new(0),
            // Make a new empty table. We make sure that its size is at least the minimum.
            table: RwLock::new(Table::with_capacity_and_hasher(cap, hash_builder)),
        }
    }

    /// Creates an empty `CHashMap` which will use the given hash builder to hash
    /// keys.
    ///
    /// The created map has the default initial capacity.
    ///
    /// Warning: `hash_builder` is normally randomly generated, and
    /// is designed to allow HashMaps to be resistant to attacks that
    /// cause many collisions and very poor performance. Setting it
    /// manually using this function can expose a DoS attack vector.
    pub fn with_hasher(hash_builder: S) -> Self {
        CHashMap::with_hasher_and_capacity(DEFAULT_INITIAL_CAPACITY, hash_builder)
    }
}

impl<K, V, S> CHashMap<K, V, S>
where
    S: Default + BuildHasher,
{
    /// Clear the map.
    ///
    /// This clears the hash map and returns the previous version of the map.
    ///
    /// It is relatively efficient, although it needs to write-lock a RW lock.
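    ///
    /// A minimal sketch of how this might be used (hypothetical keys and values; needs an async
    /// context, hence the `ignore` block):
    ///
    /// ```ignore
    /// let map = CHashMap::new();
    /// map.insert(1, "one").await;
    /// let old = map.clear().await;
    /// // The map itself is now empty, while the old entries live on in `old`.
    /// assert!(map.is_empty());
    /// assert_eq!(old.remove(&1).await, Some("one"));
    /// ```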
    pub async fn clear(&self) -> CHashMap<K, V, S> {
        // Acquire a writable lock.
        let mut lock = self.table.write().await;
        CHashMap {
            // Replace the old table with an empty initial table.
            table: RwLock::new(mem::replace(
                &mut *lock,
                Table::with_hasher(DEFAULT_INITIAL_CAPACITY, S::default()),
            )),
            // Replace the length with 0 and use the old length.
            len: AtomicUsize::new(self.len.swap(0, ORDERING)),
        }
    }
}

/// Wrapper future that unconditionally marks the wrapped future as `Send`.
///
/// This is used as a workaround for the compiler conservatively marking futures that hold raw
/// pointers in their body as `!Send` (see `get_inner` and `get_mut_inner` below).
#[repr(transparent)]
struct UnsafeSendFuture<F>(F);

impl<F: std::future::Future> std::future::Future for UnsafeSendFuture<F> {
    type Output = F::Output;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // Pin projection: the inner future is structurally pinned and never moved out.
        unsafe { self.map_unchecked_mut(|f| &mut f.0).poll(cx) }
    }
}

unsafe impl<F> Send for UnsafeSendFuture<F> {}

impl<K: PartialEq + Hash, V, S: BuildHasher> CHashMap<K, V, S> {
    /// Get the value of some key.
    ///
    /// This will lookup the entry of some key `key`, and acquire the read-only lock. This means
    /// that all other parties are blocked from _writing_ (not reading) this value while the guard
    /// is held.
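    ///
    /// A minimal sketch of how this might be used (hypothetical key and value; needs an async
    /// context, hence the `ignore` block):
    ///
    /// ```ignore
    /// let map = CHashMap::new();
    /// map.insert("key", 10).await;
    /// if let Some(guard) = map.get("key").await {
    ///     assert_eq!(*guard, 10);
    /// } // The bucket's read lock is released here, when the guard is dropped.
    /// ```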
    pub async fn get<Q: ?Sized>(&self, key: &Q) -> Option<ReadGuard<'_, K, V, S>>
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
    {
        self.get_inner(key).await
    }

    /// Ugly workaround because the compiler is overzealous about making futures with raw pointers
    /// in their body `!Send`.
    fn get_inner<'this, 'a, 'b, Q: ?Sized>(
        &'a self,
        key: &'b Q,
    ) -> impl Future<Output = Option<ReadGuard<'a, K, V, S>>> + 'this + Send
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
        Self: 'this,
        'a: 'this,
        'b: 'this,
    {
        UnsafeSendFuture(async move {
            // Acquire the read lock and lookup in the table.
            if let Ok(inner) = OwningRef::new(
                OwningHandle::new_with_async_fn(
                    StableReadGuard(self.table.read().await),
                    |x| async move { StableReadGuard(unsafe { &*x }.lookup(key).await) },
                )
                .await,
            )
            .try_map(|x| x.value_ref())
            {
                // The bucket contains data.
                Some(ReadGuard { inner })
            } else {
                // The bucket is empty/removed.
                None
            }
        })
    }

    /// Get the (mutable) value of some key.
    ///
    /// This will lookup the entry of some key `key`, and acquire the writable lock. This means
    /// that all other parties are blocked from both reading and writing this value while the guard
    /// is held.
    pub async fn get_mut<Q: ?Sized>(&self, key: &Q) -> Option<WriteGuard<'_, K, V, S>>
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
    {
        self.get_mut_inner(key).await
    }

    /// Ugly workaround because the compiler is overzealous about making futures with raw pointers
    /// in their body `!Send`.
    fn get_mut_inner<'this, 'a, 'b, Q: ?Sized>(
        &'a self,
        key: &'b Q,
    ) -> impl Future<Output = Option<WriteGuard<'a, K, V, S>>> + 'this + Send
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
        Self: 'this,
        'a: 'this,
        'b: 'this,
    {
        UnsafeSendFuture(async move {
            // Acquire the write lock and lookup in the table.
            let handle = OwningHandle::try_new(
                OwningHandle::new_with_async_fn(
                    StableReadGuard(self.table.read().await),
                    |x| async move {
                        StableWriteGuard(UnsafeSendFuture(unsafe { &*x }.lookup_mut(key)).await)
                    },
                )
                .await,
                |x| match unsafe { &mut *(x as *mut Bucket<K, V>) } {
                    // The bucket contains data.
                    Bucket::Contains(_, ref mut val) => Ok(val),
                    // The bucket is empty/removed.
                    _ => Err(()),
                },
            );
            match handle {
                Ok(inner) => Some(WriteGuard { inner }),
                Err(_) => None,
            }
        })
    }

    /// Does the hash map contain this key?
    pub async fn contains_key<Q: ?Sized>(&self, key: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
    {
        // Acquire the lock.
        let lock = self.table.read().await;
        // Look the key up in the table.
        let bucket = lock.lookup(key).await;
        // Test if it is free or not.
        !bucket.is_free()
    }
}

impl<K, V, S> CHashMap<K, V, S>
where
    K: PartialEq + Hash,
    S: BuildHasher + Default,
{
    /// Insert a **new** entry.
    ///
    /// This inserts an entry, which the map does not already contain, into the table. If the
    /// entry already exists, the old entry won't be replaced, nor will an error be returned. It
    /// will possibly introduce silent bugs.
    ///
    /// To be more specific, it assumes that the entry does not already exist, and will simply skip
    /// to the end of the cluster, even if it does exist.
    ///
    /// This is faster than e.g. `insert`, but should only be used if you know that the entry
    /// doesn't already exist.
    ///
    /// # Warning
    ///
    /// Only use this if you know what you're doing. This can easily introduce very complex logic
    /// errors.
    ///
    /// For most other purposes, use `insert`.
    ///
    /// # Panics
    ///
    /// In debug mode, this might check whether the key already exists and panic if it does.
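    ///
    /// A minimal sketch of a correct use (hypothetical key and value; the map was just created,
    /// so the key cannot already be present; needs an async context, hence the `ignore` block):
    ///
    /// ```ignore
    /// let map = CHashMap::new();
    /// map.insert_new("fresh", 1).await;
    /// assert_eq!(map.len(), 1);
    /// ```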
    pub async fn insert_new(&self, key: K, val: V) {
        debug_assert!(
            !self.contains_key(&key).await,
            "Hash table already contains key, contrary to \
             the assumptions about `insert_new`'s arguments."
        );

        // Lock the table. We will expand it afterwards to keep the bounds on the load factor.
        let lock = self.table.read().await;
        {
            // Find the free bucket.
            let mut bucket = lock.find_free(&key).await;

            // Set the bucket to the new KV pair.
            *bucket = Bucket::Contains(key, val);
        }
        // Expand the table (we know beforehand that the entry didn't already exist).
        self.expand(lock).await;
    }

    /// Replace an existing entry, or insert a new one.
    ///
    /// This will replace an existing entry and return the old entry, if any. If no entry exists,
    /// it will simply insert the new entry and return `None`.
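    ///
    /// A minimal sketch of how this might be used (hypothetical key and values; needs an async
    /// context, hence the `ignore` block):
    ///
    /// ```ignore
    /// let map = CHashMap::new();
    /// assert_eq!(map.insert("key", 1).await, None);
    /// assert_eq!(map.insert("key", 2).await, Some(1));
    /// ```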
    pub async fn insert(&self, key: K, val: V) -> Option<V> {
        // Lock the table. We will expand it afterwards, if needed, to keep the bounds on the load
        // factor.
        let lock = self.table.read().await;
        let ret = {
            // Lookup the key or a free bucket in the inner table.
            let mut bucket = lock.lookup_or_free(&key).await;

            // Replace the bucket.
            mem::replace(&mut *bucket, Bucket::Contains(key, val)).value()
        };

        // Expand the table if no bucket was overwritten (i.e. the entry is fresh).
        if ret.is_none() {
            self.expand(lock).await;
        }

        ret
    }

    /// Insert or update.
    ///
    /// This looks up `key`. If it exists, a mutable reference to its value is passed through the
    /// closure `update`. If it doesn't exist, the result of the closure `insert` is inserted.
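    ///
    /// A minimal sketch of how this might be used as a counter (hypothetical key and values;
    /// needs an async context, hence the `ignore` block):
    ///
    /// ```ignore
    /// let map = CHashMap::new();
    /// // The key is absent, so the `insert` closure supplies the initial value.
    /// map.upsert("counter", || 1, |v| *v += 1).await;
    /// // The key is now present, so the `update` closure mutates the existing value.
    /// map.upsert("counter", || 1, |v| *v += 1).await;
    /// assert_eq!(map.remove("counter").await, Some(2));
    /// ```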
    pub async fn upsert<F, G>(&self, key: K, insert: F, update: G)
    where
        F: FnOnce() -> V,
        G: FnOnce(&mut V),
    {
        // Lock the table. We will expand it afterwards, if needed, to keep the bounds on the load
        // factor.
        let lock = self.table.read().await;
        {
            // Lookup the key or a free bucket in the inner table.
            let mut bucket = lock.lookup_or_free(&key).await;

            match *bucket {
                // The bucket had a KV pair!
                Bucket::Contains(_, ref mut val) => {
                    // Run it through the closure.
                    update(val);
                    // TODO: We return early to keep the borrow checker happy. It also prevents
                    // the control flow from reaching the expansion after the match, which must
                    // not run for this arm (no new entry was added).
                    return;
                }
                // The bucket was empty, simply insert.
                ref mut x => *x = Bucket::Contains(key, insert()),
            }
        }

        // Expand the table (this will only happen if the function hasn't returned yet).
        self.expand(lock).await;
    }

    /// Map or insert an entry.
    ///
    /// This sets the value associated with key `key` to `f(Some(old_val))` (if it returns `None`,
    /// the entry is removed) if it exists. If it does not exist, it inserts it with value
    /// `f(None)`, unless the closure returns `None`.
    ///
    /// Note that if `f` returns `None`, the entry of key `key` is removed unconditionally.
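    ///
    /// A minimal sketch of how this might be used (hypothetical key and values; needs an async
    /// context, hence the `ignore` block):
    ///
    /// ```ignore
    /// let map = CHashMap::new();
    /// map.insert("hits", 1).await;
    /// // Increment the value if the key is present; leave it absent otherwise.
    /// map.alter("hits", |old| async move { old.map(|v| v + 1) }).await;
    /// assert_eq!(map.remove("hits").await, Some(2));
    /// ```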
    pub async fn alter<F, Fut>(&self, key: K, f: F)
    where
        F: FnOnce(Option<V>) -> Fut,
        Fut: std::future::Future<Output = Option<V>>,
    {
        // Lock the table. We will expand it afterwards, if needed, to keep the bounds on the load
        // factor.
        let lock = self.table.read().await;
        {
            // Lookup the key or a free bucket in the inner table.
            let mut bucket = lock.lookup_or_free(&key).await;

            match mem::replace(&mut *bucket, Bucket::Removed) {
                Bucket::Contains(_, val) => {
                    if let Some(new_val) = f(Some(val)).await {
                        // Set the bucket to a KV pair with the new value.
                        *bucket = Bucket::Contains(key, new_val);
                        // No expansion required, as the bucket already had a KV pair previously.
                        return;
                    } else {
                        // The old entry was removed, so we decrement the length of the map.
                        self.len.fetch_sub(1, ORDERING);
                    }
                    // TODO: We return as a hack to stop the borrow checker from thinking we moved
                    // a referenced object. Namely, in this match arm the expansion after the
                    // match statement must never be reached.
                    return;
                }
                _ => {
                    if let Some(new_val) = f(None).await {
                        // The previously free bucket will get a KV pair with the new value.
                        *bucket = Bucket::Contains(key, new_val);
                    } else {
                        return;
                    }
                }
            }
        }

        // A new entry was inserted, so naturally, we expand the table.
        self.expand(lock).await;
    }

    /// Remove an entry.
    ///
    /// This removes and returns the entry with key `key`. If no entry with said key exists, it
    /// will simply return `None`.
    pub async fn remove<Q: ?Sized>(&self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: PartialEq + Hash,
    {
        // Acquire the read lock of the table.
        let lock = self.table.read().await;

        // Lookup the table, mutably.
        let mut bucket = lock.lookup_mut(key).await;
        // Remove the bucket.
        match &mut *bucket {
            // There was nothing to remove.
            &mut Bucket::Removed | &mut Bucket::Empty => None,
            // TODO: We know that this is a `Bucket::Contains` variant, but to bypass borrowck
            // madness, we do weird weird stuff.
            bucket => {
                // Decrement the length of the map.
                self.len.fetch_sub(1, ORDERING);

                // Set the bucket to "removed" and return its value.
                mem::replace(bucket, Bucket::Removed).value()
            }
        }
    }

    /// Reserve additional space.
    ///
    /// This reserves `additional` additional buckets in the table. Note that it might reserve
    /// more in order to make reallocation less common.
    pub async fn reserve(&self, additional: usize) {
        // Get the new length.
        let len = (self.len() + additional) * LENGTH_MULTIPLIER;
        // Acquire the write lock (needed because we'll mess with the table).
        let mut lock = self.table.write().await;
        // Handle the case where another thread has resized the table while we were acquiring the
        // lock.
        if lock.buckets.len() < len {
            // Swap the table out with a new table of the desired size (multiplied by some factor).
            let table = mem::replace(
                &mut *lock,
                Table::with_capacity_and_hasher(len, S::default()),
            );
            // Fill the new table with the data from the old table.
            lock.fill(table);
        }
    }

    /// Shrink the capacity of the map to reduce space usage.
    ///
    /// This will shrink the capacity of the map to the needed amount (plus some additional space
    /// to avoid reallocations), effectively reducing memory usage in cases where there is
    /// excessive space.
    ///
    /// It is healthy to run this once in a while, if the size of your hash map changes a lot (e.g.
    /// it has a high peak size).
    pub async fn shrink_to_fit(&self) {
        // Acquire the write lock (needed because we'll mess with the table).
        let mut lock = self.table.write().await;
        // Swap the table out with a new table of the desired size (multiplied by some factor).
        let table = mem::replace(
            &mut *lock,
            Table::with_capacity_and_hasher(self.len(), S::default()),
        );
        // Fill the new table with the data from the old table.
        lock.fill(table);
    }

    /// Increment the size of the hash map and expand it so one more entry can fit in.
    ///
    /// This takes the caller's read lock (so the caller doesn't have to acquire it twice) and
    /// drops it before reallocating, to avoid deadlocking against the write lock taken in
    /// `reserve`.
    async fn expand(&self, lock: RwLockReadGuard<'_, Table<K, V, S>>) {
        // Increment the length to take the new element into account.
        let len = self.len.fetch_add(1, ORDERING) + 1;

        // Extend if necessary. We multiply by some constant to adjust our load factor.
        if len * MAX_LOAD_FACTOR_DENOM > lock.buckets.len() * MAX_LOAD_FACTOR_NUM {
            // Drop the read lock to avoid deadlocks when acquiring the write lock.
            drop(lock);
            // Reserve 1 entry in space (the function will handle the excessive space logic).
            self.reserve(1).await;
        }
    }
}

impl<K, V, S: Default + BuildHasher> Default for CHashMap<K, V, S> {
    fn default() -> Self {
        // Forward the call to `with_hasher`.
        CHashMap::with_hasher(S::default())
    }
}

impl<K: Clone, V: Clone, S: Clone> CHashMap<K, V, S> {
    /// Clone the struct, bypassing the locks via the statically guaranteed exclusive `&mut`
    /// access. Note that the map is not actually mutated.
    pub fn clone_mut(&mut self) -> Self {
        CHashMap {
            table: RwLock::new(self.table.get_mut().clone_mut()),
            len: AtomicUsize::new(self.len.load(ORDERING)),
        }
    }

    /// Clone by accessing keys and values via the locks. This requires the method to be `async`.
    pub async fn clone_locking(&self) -> Self {
        CHashMap {
            table: RwLock::new(self.table.read().await.clone_locking().await),
            len: AtomicUsize::new(self.len.load(ORDERING)),
        }
    }
}

impl<K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for CHashMap<K, V, S> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "CHashMap {{<{} entries>}}", self.len())
    }
}

impl<K, V, S> IntoIterator for CHashMap<K, V, S> {
    type Item = (K, V);
    type IntoIter = IntoIter<K, V, S>;

    fn into_iter(self) -> IntoIter<K, V, S> {
        self.table.into_inner().into_iter()
    }
}

impl<K: PartialEq + Hash, V, S: Default + BuildHasher> iter::FromIterator<(K, V)>
    for CHashMap<K, V, S>
{
    fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
        // TODO: This step is required to obtain the length of the iterator. Eliminate it.
        let vec: Vec<_> = iter.into_iter().collect();
        let len = vec.len();

        // Start with an empty table.
        let mut table = Table::with_capacity_and_hasher(len, S::default());
        // Fill the table with the pairs from the iterator.
        for (key, val) in vec {
            // Insert the KV pair. This is fine, as we assume that there are no duplicates in
            // the iterator.
            let bucket = table.find_free_no_lock(&key);
            *bucket = Bucket::Contains(key, val);
        }

        CHashMap {
            table: RwLock::new(table),
            len: AtomicUsize::new(len),
        }
    }
}