chashmap_async/
lib.rs

1//! Concurrent hash maps.
2//!
3//! This crate implements concurrent hash maps, based on bucket-level multi-reader locks. It has
4//! excellent performance characteristics¹ and supports resizing, in-place mutation and more.
5//!
6//! The API derives directly from `std::collections::HashMap`, giving it a familiar feel.
7//!
8//! ¹Note that it heavily depends on the behavior of your program, but in most cases, it's really
9//!  good. In some (rare) cases you might want atomic hash maps instead.
10//!
11//! # How it works
12//!
13//! `chashmap` is not lockless, but it distributes locks across the map such that lock contentions
14//! (which is what could make accesses expensive) are very rare.
15//!
16//! Hash maps consists of so called "buckets", which each defines a potential entry in the table.
17//! The bucket of some key-value pair is determined by the hash of the key. By holding a read-write
18//! lock for each bucket, we ensure that you will generally be able to insert, read, modify, etc.
19//! with only one or two locking subroutines.
20//!
21//! There is a special-case: reallocation. When the table is filled up such that very few buckets
22//! are free (note that this is "very few" and not "no", since the load factor shouldn't get too
23//! high as it hurts performance), a global lock is obtained while rehashing the table. This is
24//! pretty inefficient, but it rarely happens, and due to the adaptive nature of the capacity, it
25//! will only happen a few times when the map has just been initialized.
26//!
27//! ## Collision resolution
28//!
29//! When two hashes collide, they cannot share the same bucket, so there must be an algorithm which
30//! can resolve collisions. In our case, we use linear probing, which means that we take the bucket
31//! following it, and repeat until we find a free bucket.
32//!
33//! This method is far from ideal, but superior methods like Robin-Hood hashing works poorly (if at
34//! all) in a concurrent structure.
35//!
36//! # The API
37//!
38//! The API should feel very familiar, if you are used to the libstd hash map implementation. They
39//! share many of the methods, and I've carefully made sure that all the items, which have similarly
40//! named items in libstd, matches in semantics and behavior.
41
42#[cfg(test)]
43mod tests;
44
45use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
46use futures::{stream, stream::StreamExt};
47use owning_ref::{OwningHandle, OwningRef};
48use stable_deref_trait::StableDeref;
49use std::borrow::Borrow;
50use std::collections::hash_map::RandomState;
51use std::future::Future;
52use std::hash::{BuildHasher, Hash, Hasher};
53use std::sync::atomic::{self, AtomicUsize};
54use std::{cmp, fmt, iter, mem, ops};
55
56/// The atomic ordering used throughout the code.
57const ORDERING: atomic::Ordering = atomic::Ordering::Relaxed;
58/// The length-to-capacity factor.
59const LENGTH_MULTIPLIER: usize = 4;
60/// The maximal load factor's numerator.
61const MAX_LOAD_FACTOR_NUM: usize = 100 - 15;
62/// The maximal load factor's denominator.
63const MAX_LOAD_FACTOR_DENOM: usize = 100;
64/// The default initial capacity.
65const DEFAULT_INITIAL_CAPACITY: usize = 64;
66/// The lowest capacity a table can have.
67const MINIMUM_CAPACITY: usize = 8;
68
69/// A bucket state.
70///
71/// Buckets are the bricks of hash tables. They represent a single entry into the table.
72#[derive(Clone)]
73enum Bucket<K, V> {
74    /// The bucket contains a key-value pair.
75    Contains(K, V),
76    /// The bucket is empty and has never been used.
77    ///
78    /// Since hash collisions are resolved by jumping to the next bucket, some buckets can cluster
79    /// together, meaning that they are potential candidates for lookups. Empty buckets can be seen
80    /// as the delimiter of such cluters.
81    Empty,
82    /// The bucket was removed.
83    ///
84    /// The technique of distincting between "empty" and "removed" was first described by Knuth.
85    /// The idea is that when you search for a key, you will probe over these buckets, since the
86    /// key could have been pushed behind the removed element:
87    ///```notest
88    ///     Contains(k1, v1) // hash = h
89    ///     Removed
90    ///     Contains(k2, v2) // hash = h
91    ///```
92    /// If we stopped at `Removed`, we won't be able to find the second KV pair. So `Removed` is
93    /// semantically different from `Empty`, as the search won't stop.
94    ///
95    /// However, we are still able to insert new pairs at the removed buckets.
96    Removed,
97}
98
99impl<K, V> Bucket<K, V> {
100    /// Is this bucket 'empty'?
101    fn is_empty(&self) -> bool {
102        matches!(*self, Bucket::Empty)
103    }
104
105    /// Is this bucket 'removed'?
106    fn is_removed(&self) -> bool {
107        matches!(*self, Bucket::Removed)
108    }
109
110    /// Is this bucket free?
111    ///
112    /// "Free" means that it can safely be replace by another bucket — namely that the bucket is
113    /// not occupied.
114    fn is_free(&self) -> bool {
115        match *self {
116            // The two replacable bucket types are removed buckets and empty buckets.
117            Bucket::Removed | Bucket::Empty => true,
118            // KV pairs can't be replaced as they contain data.
119            Bucket::Contains(..) => false,
120        }
121    }
122
123    /// Get the value (if any) of this bucket.
124    ///
125    /// This gets the value of the KV pair, if any. If the bucket is not a KV pair, `None` is
126    /// returned.
127    fn value(self) -> Option<V> {
128        if let Bucket::Contains(_, val) = self {
129            Some(val)
130        } else {
131            None
132        }
133    }
134
135    /// Get a reference to the value of the bucket (if any).
136    ///
137    /// This returns a reference to the value of the bucket, if it is a KV pair. If not, it will
138    /// return `None`.
139    ///
140    /// Rather than `Option`, it returns a `Result`, in order to make it easier to work with the
141    /// `owning_ref` crate (`try_new` and `try_map` of `OwningHandle` and `OwningRef`
142    /// respectively).
143    fn value_ref(&self) -> Result<&V, ()> {
144        if let Bucket::Contains(_, ref val) = *self {
145            Ok(val)
146        } else {
147            Err(())
148        }
149    }
150
151    /// Does the bucket match a given key?
152    ///
153    /// This returns `true` if the bucket is a KV pair with key `key`. If not, `false` is returned.
154    fn key_matches(&self, key: &K) -> bool
155    where
156        K: PartialEq,
157    {
158        if let Bucket::Contains(ref candidate_key, _) = *self {
159            // Check if the keys matches.
160            candidate_key == key
161        } else {
162            // The bucket isn't a KV pair, so we'll return false, since there is no key to test
163            // against.
164            false
165        }
166    }
167}
168
169/// The low-level representation of the hash table.
170///
171/// This is different from `CHashMap` in two ways:
172///
173/// 1. It is not wrapped in a lock, meaning that resizing and reallocation is not possible.
174/// 2. It does not track the number of occupied buckets, making it expensive to obtain the load
175///    factor.
176struct Table<K, V, S> {
177    /// The hash function builder.
178    ///
179    /// When a `Table` use the default hash builder, it randomly picks a hash function from
180    /// some family of functions in libstd. This effectively eliminates the issue of hash flooding.
181    hash_builder: S,
182    /// The bucket array.
183    ///
184    /// This vector stores the buckets. The order in which they're stored is far from arbitrary: A
185    /// KV pair `(key, val)`'s first priority location is at `self.hash(&key) % len`. If not
186    /// possible, the next bucket is used, and this process repeats until the bucket is free (or
187    /// the end is reached, in which we simply wrap around).
188    buckets: Vec<RwLock<Bucket<K, V>>>,
189}
190
191impl<K, V> Table<K, V, RandomState> {
192    /// Create a table with a certain number of buckets.
193    fn new(buckets: usize) -> Self {
194        // Fill a vector with `buckets` of `Empty` buckets.
195        let mut vec = Vec::with_capacity(buckets);
196        for _ in 0..buckets {
197            vec.push(RwLock::new(Bucket::Empty));
198        }
199
200        Table {
201            // Generate a hash function.
202            hash_builder: RandomState::new(),
203            buckets: vec,
204        }
205    }
206
207    /// Create a table with at least some capacity.
208    fn with_capacity(cap: usize) -> Self {
209        // The + 1 is needed to avoid losing fractional bucket to integer division.
210        Table::new(cmp::max(
211            MINIMUM_CAPACITY,
212            cap * MAX_LOAD_FACTOR_DENOM / MAX_LOAD_FACTOR_NUM + 1,
213        ))
214    }
215}
216
217impl<K, V, S: BuildHasher> Table<K, V, S> {
218    /// Create a `Table` with the `BuildHasher`
219    fn with_hasher(buckets: usize, hash_builder: S) -> Self {
220        // Fill a vector with `buckets` of `Empty` buckets.
221        let mut vec = Vec::with_capacity(buckets);
222        vec.resize_with(buckets, || RwLock::new(Bucket::Empty));
223
224        Table {
225            hash_builder,
226            buckets: vec,
227        }
228    }
229
230    /// Create a `Table` with a specific capacity and the `BuildHasher`
231    fn with_capacity_and_hasher(cap: usize, hash_builder: S) -> Self {
232        // The + 1 is needed to avoid losing fractional bucket to integer division.
233        Table::with_hasher(
234            cmp::max(
235                MINIMUM_CAPACITY,
236                cap * MAX_LOAD_FACTOR_DENOM / MAX_LOAD_FACTOR_NUM + 1,
237            ),
238            hash_builder,
239        )
240    }
241}
242
243impl<K: PartialEq + Hash, V, S: BuildHasher> Table<K, V, S> {
244    /// Hash some key through the internal hash function.
245    fn hash<T: ?Sized>(&self, key: &T) -> usize
246    where
247        T: Hash,
248    {
249        // Build the initial hash function state.
250        let mut hasher = self.hash_builder.build_hasher();
251        // Hash the key.
252        key.hash(&mut hasher);
253        // Cast to `usize`. Since the hash function returns `u64`, this cast won't ever cause
254        // entropy less than the output space.
255        hasher.finish() as usize
256    }
257
258    /// Scan from the first priority of a key until a match is found.
259    ///
260    /// This scans from the first priority of `key` (as defined by its hash), until a match is
261    /// found (will wrap on end), i.e. `matches` returns `true` with the bucket as argument.
262    ///
263    /// The read guard from the RW-lock of the bucket is returned.
264    async fn scan<F, Q: ?Sized>(&self, key: &Q, matches: F) -> RwLockReadGuard<'_, Bucket<K, V>>
265    where
266        F: Fn(&Bucket<K, V>) -> bool,
267        K: Borrow<Q>,
268        Q: Hash,
269    {
270        // Hash the key.
271        let hash = self.hash(key);
272
273        // Start at the first priority bucket, and then move upwards, searching for the matching
274        // bucket.
275        for i in 0..self.buckets.len() {
276            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
277            let lock = self.buckets[(hash + i) % self.buckets.len()].read().await;
278
279            // Check if it is a match.
280            if matches(&lock) {
281                // Yup. Return.
282                return lock;
283            }
284        }
285        panic!("`CHashMap` scan failed! No entry found.");
286    }
287
288    /// Scan from the first priority of a key until a match is found (mutable guard).
289    ///
290    /// This is similar to `scan`, but instead of an immutable lock guard, a mutable lock guard is
291    /// returned.
292    async fn scan_mut<F, Q: ?Sized>(
293        &self,
294        key: &Q,
295        matches: F,
296    ) -> RwLockWriteGuard<'_, Bucket<K, V>>
297    where
298        F: Fn(&Bucket<K, V>) -> bool,
299        K: Borrow<Q>,
300        Q: Hash,
301    {
302        // Hash the key.
303        let hash = self.hash(key);
304
305        // Start at the first priority bucket, and then move upwards, searching for the matching
306        // bucket.
307        for i in 0..self.buckets.len() {
308            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
309            let lock = self.buckets[(hash + i) % self.buckets.len()].write().await;
310
311            // Check if it is a match.
312            if matches(&lock) {
313                // Yup. Return.
314                return lock;
315            }
316        }
317        panic!("`CHashMap` scan_mut failed! No entry found.");
318    }
319
320    /// Scan from the first priority of a key until a match is found (bypass locks).
321    ///
322    /// This is similar to `scan_mut`, but it safely bypasses the locks by making use of the
323    /// aliasing invariants of `&mut`.
324    fn scan_mut_no_lock<F>(&mut self, key: &K, matches: F) -> &mut Bucket<K, V>
325    where
326        F: Fn(&Bucket<K, V>) -> bool,
327    {
328        // Hash the key.
329        let hash = self.hash(key);
330        // TODO: To tame the borrowchecker, we fetch this in advance.
331        let len = self.buckets.len();
332
333        // Start at the first priority bucket, and then move upwards, searching for the matching
334        // bucket.
335        for i in 0..self.buckets.len() {
336            // TODO: hacky hacky
337            let idx = (hash + i) % len;
338
339            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
340
341            // Check if it is a match.
342            if matches(self.buckets[idx].get_mut()) {
343                // Yup. Return.
344                return self.buckets[idx].get_mut();
345            }
346        }
347        panic!("`CHashMap` scan_mut_no_lock failed! No entry found.");
348    }
349
350    /// Find a bucket with some key, or a free bucket in same cluster.
351    ///
352    /// This scans for buckets with key `key`. If one is found, it will be returned. If none are
353    /// found, it will return a free bucket in the same cluster.
354    async fn lookup_or_free(&self, key: &K) -> RwLockWriteGuard<'_, Bucket<K, V>> {
355        // Hash the key.
356        let hash = self.hash(key);
357        // The encountered free bucket.
358        let mut free = None;
359
360        // Start at the first priority bucket, and then move upwards, searching for the matching
361        // bucket.
362        for i in 0..self.buckets.len() {
363            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
364            let lock = self.buckets[(hash + i) % self.buckets.len()].write().await;
365
366            if lock.key_matches(key) {
367                // We found a match.
368                return lock;
369            } else if lock.is_empty() {
370                // The cluster is over. Use the encountered free bucket, if any.
371                return free.unwrap_or(lock);
372            } else if lock.is_removed() && free.is_none() {
373                // We found a free bucket, so we can store it to later (if we don't already have
374                // one).
375                free = Some(lock)
376            }
377        }
378
379        free.expect("No free buckets found")
380    }
381
382    /// Lookup some key.
383    ///
384    /// This searches some key `key`, and returns a immutable lock guard to its bucket. If the key
385    /// couldn't be found, the returned value will be an `Empty` cluster.
386    async fn lookup<Q: ?Sized>(&self, key: &Q) -> RwLockReadGuard<'_, Bucket<K, V>>
387    where
388        K: Borrow<Q>,
389        Q: PartialEq + Hash,
390    {
391        self.scan(key, |x| match *x {
392            // We'll check that the keys does indeed match, as the chance of hash collisions
393            // happening is inevitable
394            Bucket::Contains(ref candidate_key, _) if key.eq(candidate_key.borrow()) => true,
395            // We reached an empty bucket, meaning that there are no more buckets, not even removed
396            // ones, to search.
397            Bucket::Empty => true,
398            _ => false,
399        })
400        .await
401    }
402
403    /// Lookup some key, mutably.
404    ///
405    /// This is similar to `lookup`, but it returns a mutable guard.
406    ///
407    /// Replacing at this bucket is safe as the bucket will be in the same cluster of buckets as
408    /// the first priority cluster.
409    async fn lookup_mut<Q: ?Sized>(&self, key: &Q) -> RwLockWriteGuard<'_, Bucket<K, V>>
410    where
411        K: Borrow<Q>,
412        Q: PartialEq + Hash,
413    {
414        self.scan_mut(key, |x| match *x {
415            // We'll check that the keys does indeed match, as the chance of hash collisions
416            // happening is inevitable
417            Bucket::Contains(ref candidate_key, _) if key.eq(candidate_key.borrow()) => true,
418            // We reached an empty bucket, meaning that there are no more buckets, not even removed
419            // ones, to search.
420            Bucket::Empty => true,
421            _ => false,
422        })
423        .await
424    }
425
426    /// Find a free bucket in the same cluster as some key.
427    ///
428    /// This means that the returned lock guard defines a valid, free bucket, where `key` can be
429    /// inserted.
430    async fn find_free(&self, key: &K) -> RwLockWriteGuard<'_, Bucket<K, V>> {
431        self.scan_mut(key, |x| x.is_free()).await
432    }
433
434    /// Find a free bucket in the same cluster as some key (bypassing locks).
435    ///
436    /// This is similar to `find_free`, except that it safely bypasses locks through the aliasing
437    /// guarantees of `&mut`.
438    fn find_free_no_lock(&mut self, key: &K) -> &mut Bucket<K, V> {
439        self.scan_mut_no_lock(key, |x| x.is_free())
440    }
441
442    /// Fill the table with data from another table.
443    ///
444    /// This is used to efficiently copy the data of `table` into `self`.
445    ///
446    /// # Important
447    ///
448    /// The table should be empty for this to work correctly/logically.
449    fn fill(&mut self, table: Self) {
450        // Run over all the buckets.
451        for i in table.buckets {
452            // We'll only transfer the bucket if it is a KV pair.
453            if let Bucket::Contains(key, val) = i.into_inner() {
454                // Find a bucket where the KV pair can be inserted.
455                // shush clippy, the comments are important
456                #[allow(clippy::match_like_matches_macro)]
457                let bucket = self.scan_mut_no_lock(&key, |x| match *x {
458                    // Halt on an empty bucket.
459                    Bucket::Empty => true,
460                    // We'll assume that the rest of the buckets either contains other KV pairs (in
461                    // particular, no buckets have been removed in the newly construct table).
462                    _ => false,
463                });
464
465                // Set the bucket to the KV pair.
466                *bucket = Bucket::Contains(key, val);
467            }
468        }
469    }
470}
471
472impl<K: Clone, V: Clone, S: Clone> Table<K, V, S> {
473    /// Clone the struct, bypassing the locks by the statically guaranteed exclusive `&mut` access.
474    /// Not that the map is not actually mutated.
475    pub fn clone_mut(&mut self) -> Self {
476        Table {
477            // Since we copy plainly without rehashing etc., it is important that we keep the same
478            // hash function.
479            hash_builder: self.hash_builder.clone(),
480            // Lock and clone every bucket individually.
481            buckets: self
482                .buckets
483                .iter_mut()
484                .map(|x| RwLock::new(x.get_mut().clone()))
485                .collect(),
486        }
487    }
488
489    /// Clone by accessing keys and values via the locks. That requires this method to be `async`.
490    pub async fn clone_locking(&self) -> Self {
491        Table {
492            // Since we copy plainly without rehashing etc., it is important that we keep the same
493            // hash function.
494            hash_builder: self.hash_builder.clone(),
495            // Lock and clone every bucket individually.
496            buckets: stream::iter(self.buckets.iter())
497                .then(|x| async move { RwLock::new(x.read().await.clone()) })
498                .collect()
499                .await,
500        }
501    }
502}
503
504impl<K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for Table<K, V, S> {
505    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
506        // create a debug map and fill with entries
507        let mut map = f.debug_map();
508        // We'll just run over all buckets and output one after one.
509        for i in &self.buckets {
510            // Acquire the lock.
511            let lock = i.try_read();
512            // Check if the bucket actually contains anything.
513            if let Some(Bucket::Contains(ref key, ref val)) = lock.as_deref() {
514                // add this entry to the map
515                map.entry(key, val);
516            } else {
517                map.entry(&"<locked>", &"<locked>");
518            }
519        }
520        map.finish()
521    }
522}
523
524/// An iterator over the entries of some table.
525pub struct IntoIter<K, V, S> {
526    /// The inner table.
527    table: Table<K, V, S>,
528}
529
530impl<K, V, S> Iterator for IntoIter<K, V, S> {
531    type Item = (K, V);
532
533    fn next(&mut self) -> Option<(K, V)> {
534        // We own the table, and can thus do what we want with it. We'll simply pop from the
535        // buckets until we find a bucket containing data.
536        while let Some(bucket) = self.table.buckets.pop() {
537            // We can bypass dem ebil locks.
538            if let Bucket::Contains(key, val) = bucket.into_inner() {
539                // The bucket contained data, so we'll return the pair.
540                return Some((key, val));
541            }
542        }
543
544        // We've exhausted all the buckets, and no more data could be found.
545        None
546    }
547}
548
549impl<K, V, S> IntoIterator for Table<K, V, S> {
550    type Item = (K, V);
551    type IntoIter = IntoIter<K, V, S>;
552
553    fn into_iter(self) -> Self::IntoIter {
554        IntoIter { table: self }
555    }
556}
557
558/// A RAII guard for reading an entry of a hash map.
559///
560/// This is an access type dereferencing to the inner value of the entry. It will handle unlocking
561/// on drop.
562pub struct ReadGuard<'a, K: 'a, V: 'a, S> {
563    /// The inner hecking long type.
564    #[allow(clippy::type_complexity)]
565    inner: OwningRef<
566        OwningHandle<StableReadGuard<'a, Table<K, V, S>>, StableReadGuard<'a, Bucket<K, V>>>,
567        V,
568    >,
569}
570
571impl<'a, K, V, S> ops::Deref for ReadGuard<'a, K, V, S> {
572    type Target = V;
573
574    fn deref(&self) -> &V {
575        &self.inner
576    }
577}
578
579impl<'a, K, V: PartialEq, S> cmp::PartialEq for ReadGuard<'a, K, V, S> {
580    fn eq(&self, other: &ReadGuard<'a, K, V, S>) -> bool {
581        self == other
582    }
583}
584impl<'a, K, V: Eq, S> cmp::Eq for ReadGuard<'a, K, V, S> {}
585
586impl<'a, K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for ReadGuard<'a, K, V, S> {
587    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
588        write!(f, "ReadGuard({:?})", &**self)
589    }
590}
591
592/// Wrapper type for [`RwLockReadGuard`] that implements [`StableDeref`].
593#[repr(transparent)]
594struct StableReadGuard<'a, T: ?Sized>(RwLockReadGuard<'a, T>);
595
596impl<T: ?Sized> std::ops::Deref for StableReadGuard<'_, T> {
597    type Target = T;
598
599    fn deref(&self) -> &Self::Target {
600        &*self.0
601    }
602}
603
604unsafe impl<T: ?Sized> StableDeref for StableReadGuard<'_, T> {}
605
606impl<T: std::fmt::Debug + ?Sized> std::fmt::Debug for StableReadGuard<'_, T> {
607    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
608        self.0.fmt(f)
609    }
610}
611
612impl<T: std::fmt::Display + ?Sized> std::fmt::Display for StableReadGuard<'_, T> {
613    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
614        self.0.fmt(f)
615    }
616}
617
618/// A mutable RAII guard for reading an entry of a hash map.
619///
620/// This is an access type dereferencing to the inner value of the entry. It will handle unlocking
621/// on drop.
622pub struct WriteGuard<'a, K: 'a, V: 'a, S> {
623    /// The inner hecking long type.
624    #[allow(clippy::type_complexity)]
625    inner: OwningHandle<
626        OwningHandle<StableReadGuard<'a, Table<K, V, S>>, StableWriteGuard<'a, Bucket<K, V>>>,
627        &'a mut V,
628    >,
629}
630
631impl<'a, K, V, S> ops::Deref for WriteGuard<'a, K, V, S> {
632    type Target = V;
633
634    fn deref(&self) -> &V {
635        &self.inner
636    }
637}
638
639impl<'a, K, V, S> ops::DerefMut for WriteGuard<'a, K, V, S> {
640    fn deref_mut(&mut self) -> &mut V {
641        &mut self.inner
642    }
643}
644
645impl<'a, K, V: PartialEq, S> cmp::PartialEq for WriteGuard<'a, K, V, S> {
646    fn eq(&self, other: &Self) -> bool {
647        self == other
648    }
649}
650impl<'a, K, V: Eq, S> cmp::Eq for WriteGuard<'a, K, V, S> {}
651
652impl<'a, K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for WriteGuard<'a, K, V, S> {
653    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
654        write!(f, "WriteGuard({:?})", &**self)
655    }
656}
657
658#[repr(transparent)]
659struct StableWriteGuard<'a, T: ?Sized>(RwLockWriteGuard<'a, T>);
660
661impl<T: ?Sized> std::ops::Deref for StableWriteGuard<'_, T> {
662    type Target = T;
663
664    fn deref(&self) -> &Self::Target {
665        &*self.0
666    }
667}
668
669impl<T: ?Sized> std::ops::DerefMut for StableWriteGuard<'_, T> {
670    fn deref_mut(&mut self) -> &mut Self::Target {
671        &mut *self.0
672    }
673}
674
675unsafe impl<T: ?Sized> StableDeref for StableWriteGuard<'_, T> {}
676
677impl<T: std::fmt::Debug + ?Sized> std::fmt::Debug for StableWriteGuard<'_, T> {
678    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
679        self.0.fmt(f)
680    }
681}
682
683impl<T: std::fmt::Display + ?Sized> std::fmt::Display for StableWriteGuard<'_, T> {
684    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
685        self.0.fmt(f)
686    }
687}
688
689/// A concurrent hash map.
690///
691/// This type defines a concurrent associative array, based on hash tables with linear probing and
692/// dynamic resizing.
693///
694/// The idea is to let each entry hold a multi-reader lock, effectively limiting lock contentions
695/// to writing simultaneously on the same entry, and resizing the table.
696///
697/// It is not an atomic or lockless hash table, since such construction is only useful in very few
698/// cases, due to limitations on in-place operations on values.
699pub struct CHashMap<K, V, S = RandomState> {
700    /// The inner table.
701    table: RwLock<Table<K, V, S>>,
702    /// The total number of KV pairs in the table.
703    ///
704    /// This is used to calculate the load factor.
705    len: AtomicUsize,
706}
707
708impl<K, V> CHashMap<K, V, RandomState> {
709    /// Create a new hash map with a certain capacity.
710    ///
711    /// "Capacity" means the amount of entries the hash map can hold before reallocating. This
712    /// function allocates a hash map with at least the capacity of `cap`.
713    pub fn with_capacity(cap: usize) -> Self {
714        CHashMap {
715            // Start at 0 KV pairs.
716            len: AtomicUsize::new(0),
717            // Make a new empty table. We will make sure that it is at least one.
718            table: RwLock::new(Table::with_capacity(cap)),
719        }
720    }
721
722    /// Create a new hash map.
723    ///
724    /// This creates a new hash map with some fixed initial capacity.
725    pub fn new() -> Self {
726        CHashMap::with_capacity(DEFAULT_INITIAL_CAPACITY)
727    }
728}
729
730impl<K, V, S> CHashMap<K, V, S> {
731    /// Get the number of entries in the hash table.
732    ///
733    /// This is entirely atomic, and will not acquire any locks.
734    ///
735    /// This is guaranteed to reflect the number of entries _at this particular moment.
736    pub fn len(&self) -> usize {
737        self.len.load(ORDERING)
738    }
739
740    /// Get the capacity of the hash table.
741    ///
742    /// The capacity is equal to the number of entries the table can hold before reallocating.
743    pub async fn capacity(&self) -> usize {
744        cmp::max(MINIMUM_CAPACITY, self.buckets().await) * MAX_LOAD_FACTOR_NUM
745            / MAX_LOAD_FACTOR_DENOM
746    }
747
748    /// Get the number of buckets of the hash table.
749    ///
750    /// "Buckets" refers to the amount of potential entries in the inner table. It is different
751    /// from capacity, in the sense that the map cannot hold this number of entries, since it needs
752    /// to keep the load factor low.
753    pub async fn buckets(&self) -> usize {
754        self.table.read().await.buckets.len()
755    }
756
757    /// Is the hash table empty?
758    pub fn is_empty(&self) -> bool {
759        self.len() == 0
760    }
761
762    /// Filter the map based on some predicate.
763    ///
764    /// This tests every entry in the hash map by closure `predicate`. If it returns `true`, the
765    /// map will retain the entry. If not, the entry will be removed.
766    ///
767    /// This won't lock the table. This can be a major performance trade-off, as it means that it
768    /// must lock on every table entry. However, it won't block other operations of the table,
769    /// while filtering.
770    pub async fn retain<F>(&self, predicate: F)
771    where
772        F: Fn(&K, &V) -> bool,
773    {
774        // Acquire the read lock to the table.
775        let table = self.table.read().await;
776        // Run over every bucket and apply the filter.
777        for bucket in &table.buckets {
778            // Acquire the read lock, which we will upgrade if necessary.
779            let lock = bucket.upgradable_read().await;
780            // Skip the free buckets.
781            match *lock {
782                Bucket::Contains(ref key, ref val) if !predicate(key, val) => {
783                    // Predicate didn't match. Set the bucket to removed.
784                    let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await;
785                    *lock = Bucket::Removed;
786                    // Decrement the length to account for the removed bucket.
787                    // TODO: Can we somehow bundle these up to reduce the overhead of atomic
788                    //       operations? Storing in a local variable and then subtracting causes
789                    //       issues with consistency.
790                    self.len.fetch_sub(1, ORDERING);
791                }
792                _ => (),
793            }
794        }
795    }
796}
797
798impl<K, V, S: Default + BuildHasher> CHashMap<K, V, S> {
799    /// Creates an empty `CHashMap` with the specified capacity, using `hash_builder`
800    /// to hash the keys.
801    ///
802    /// The hash map will be able to hold at least `capacity` elements without
803    /// reallocating. If `capacity` is 0, the hash map will not allocate.
804    ///
805    /// Warning: `hash_builder` is normally randomly generated, and
806    /// is designed to allow HashMaps to be resistant to attacks that
807    /// cause many collisions and very poor performance. Setting it
808    /// manually using this function can expose a DoS attack vector.
809    pub fn with_hasher_and_capacity(cap: usize, hash_builder: S) -> Self {
810        CHashMap {
811            // Start at 0 KV pairs.
812            len: AtomicUsize::new(0),
813            // Make a new empty table. We will make sure that it is at least one.
814            table: RwLock::new(Table::with_capacity_and_hasher(cap, hash_builder)),
815        }
816    }
817
818    /// Creates an empty `CHashMap` which will use the given hash builder to hash
819    /// keys.
820    ///
821    /// The created map has the default initial capacity.
822    ///
823    /// Warning: `hash_builder` is normally randomly generated, and
824    /// is designed to allow HashMaps to be resistant to attacks that
825    /// cause many collisions and very poor performance. Setting it
826    /// manually using this function can expose a DoS attack vector.
827    pub fn with_hasher(hash_builder: S) -> Self {
828        CHashMap::with_hasher_and_capacity(DEFAULT_INITIAL_CAPACITY, hash_builder)
829    }
830}
831
832impl<K, V, S> CHashMap<K, V, S>
833where
834    S: Default + BuildHasher,
835{
836    /// Clear the map.
837    ///
838    /// This clears the hash map and returns the previous version of the map.
839    ///
840    /// It is relatively efficient, although it needs to write lock a RW lock.
841    pub async fn clear(&self) -> CHashMap<K, V, S> {
842        // Acquire a writable lock.
843        let mut lock = self.table.write().await;
844        CHashMap {
845            // Replace the old table with an empty initial table.
846            table: RwLock::new(mem::replace(
847                &mut *lock,
848                Table::with_hasher(DEFAULT_INITIAL_CAPACITY, S::default()),
849            )),
850            // Replace the length with 0 and use the old length.
851            len: AtomicUsize::new(self.len.swap(0, ORDERING)),
852        }
853    }
854}
855
856#[repr(transparent)]
857struct UnsafeSendFuture<F>(F);
858
859impl<F: std::future::Future> std::future::Future for UnsafeSendFuture<F> {
860    type Output = F::Output;
861
862    fn poll(
863        self: std::pin::Pin<&mut Self>,
864        cx: &mut std::task::Context<'_>,
865    ) -> std::task::Poll<Self::Output> {
866        unsafe { self.map_unchecked_mut(|f| &mut f.0).poll(cx) }
867    }
868}
869
870unsafe impl<F> Send for UnsafeSendFuture<F> {}
871
872impl<K: PartialEq + Hash, V, S: BuildHasher> CHashMap<K, V, S> {
873    /// Get the value of some key.
874    ///
875    /// This will lookup the entry of some key `key`, and acquire the read-only lock. This means
876    /// that all other parties are blocked from _writing_ (not reading) this value while the guard
877    /// is held.
878    pub async fn get<Q: ?Sized>(&self, key: &Q) -> Option<ReadGuard<'_, K, V, S>>
879    where
880        K: Borrow<Q>,
881        Q: Hash + PartialEq,
882    {
883        self.get_inner(key).await
884    }
885
886    /// Ugly workaround because the compiler is overzealous about making futures with raw pointers
887    /// in their body !Send
888    fn get_inner<'this, 'a, 'b, Q: ?Sized>(
889        &'a self,
890        key: &'b Q,
891    ) -> impl Future<Output = Option<ReadGuard<'a, K, V, S>>> + 'this + Send
892    where
893        K: Borrow<Q>,
894        Q: Hash + PartialEq,
895        Self: 'this,
896        'a: 'this,
897        'b: 'this,
898    {
899        UnsafeSendFuture(async move {
900            // Acquire the read lock and lookup in the table.
901            if let Ok(inner) = OwningRef::new(
902                OwningHandle::new_with_async_fn(
903                    StableReadGuard(self.table.read().await),
904                    |x| async move { StableReadGuard(unsafe { &*x }.lookup(key).await) },
905                )
906                .await,
907            )
908            .try_map(|x| x.value_ref())
909            {
910                // The bucket contains data.
911                Some(ReadGuard { inner })
912            } else {
913                // The bucket is empty/removed.
914                None
915            }
916        })
917    }
918
919    /// Get the (mutable) value of some key.
920    ///
921    /// This will lookup the entry of some key `key`, and acquire the writable lock. This means
922    /// that all other parties are blocked from both reading and writing this value while the guard
923    /// is held.
924    pub async fn get_mut<Q: ?Sized>(&self, key: &Q) -> Option<WriteGuard<'_, K, V, S>>
925    where
926        K: Borrow<Q>,
927        Q: Hash + PartialEq,
928    {
929        self.get_mut_inner(key).await
930    }
931
932    /// Ugly workaround because the compiler is overzealous about making futures with raw pointers
933    /// in their body !Send
934    fn get_mut_inner<'this, 'a, 'b, Q: ?Sized>(
935        &'a self,
936        key: &'b Q,
937    ) -> impl Future<Output = Option<WriteGuard<'a, K, V, S>>> + 'this + Send
938    where
939        K: Borrow<Q>,
940        Q: Hash + PartialEq,
941        Self: 'this,
942        'a: 'this,
943        'b: 'this,
944    {
945        UnsafeSendFuture(async move {
946            // Acquire the write lock and lookup in the table.
947            let handle = OwningHandle::try_new(
948                OwningHandle::new_with_async_fn(
949                    StableReadGuard(self.table.read().await),
950                    |x| async move {
951                        StableWriteGuard(UnsafeSendFuture(unsafe { &*x }.lookup_mut(key)).await)
952                    },
953                )
954                .await,
955                |x| match unsafe { &mut *(x as *mut Bucket<K, V>) } {
956                    Bucket::Contains(_, ref mut val) => Ok(val),
957                    // The bucket contains data.
958                    _ => Err(()),
959                },
960            );
961            match handle {
962                Ok(inner) => Some(WriteGuard { inner }),
963                Err(_) => None,
964            }
965        })
966    }
967
968    /// Does the hash map contain this key?
969    pub async fn contains_key<Q: ?Sized>(&self, key: &Q) -> bool
970    where
971        K: Borrow<Q>,
972        Q: Hash + PartialEq,
973    {
974        // Acquire the lock.
975        let lock = self.table.read().await;
976        // Look the key up in the table
977        let bucket = lock.lookup(key).await;
978        // Test if it is free or not.
979        !bucket.is_free()
980
981        // fuck im sleepy rn
982        // me too buddy
983    }
984}
985
986impl<K, V, S> CHashMap<K, V, S>
987where
988    K: PartialEq + Hash,
989    S: BuildHasher + Default,
990{
991    /// Insert a **new** entry.
992    ///
993    /// This inserts an entry, which the map does not already contain, into the table. If the entry
994    /// exists, the old entry won't be replaced, nor will an error be returned. It will possibly
995    /// introduce silent bugs.
996    ///
997    /// To be more specific, it assumes that the entry does not already exist, and will simply skip
998    /// to the end of the cluster, even if it does exist.
999    ///
1000    /// This is faster than e.g. `insert`, but should only be used, if you know that the entry
1001    /// doesn't already exist.
1002    ///
1003    /// # Warning
1004    ///
1005    /// Only use this, if you know what you're doing. This can easily introduce very complex logic
1006    /// errors.
1007    ///
1008    /// For most other purposes, use `insert`.
1009    ///
1010    /// # Panics
1011    ///
1012    /// This might perform checks in debug mode testing if the key exists already.
1013    pub async fn insert_new(&self, key: K, val: V) {
1014        debug_assert!(
1015            !self.contains_key(&key).await,
1016            "Hash table contains already key, contrary to \
1017            the assumptions about `insert_new`'s arguments."
1018        );
1019
1020        // Expand and lock the table. We need to expand to ensure the bounds on the load factor.
1021        let lock = self.table.read().await;
1022        {
1023            // Find the free bucket.
1024            let mut bucket = lock.find_free(&key).await;
1025
1026            // Set the bucket to the new KV pair.
1027            *bucket = Bucket::Contains(key, val);
1028        }
1029        // Expand the table (we know beforehand that the entry didn't already exist).
1030        self.expand(lock).await;
1031    }
1032
1033    /// Replace an existing entry, or insert a new one.
1034    ///
1035    /// This will replace an existing entry and return the old entry, if any. If no entry exists,
1036    /// it will simply insert the new entry and return `None`.
1037    pub async fn insert(&self, key: K, val: V) -> Option<V> {
1038        // Expand and lock the table. We need to expand to ensure the bounds on the load factor.
1039        let lock = self.table.read().await;
1040        let ret = {
1041            // Lookup the key or a free bucket in the inner table.
1042            let mut bucket = lock.lookup_or_free(&key).await;
1043
1044            // Replace the bucket.
1045            mem::replace(&mut *bucket, Bucket::Contains(key, val)).value()
1046        };
1047
1048        // Expand the table if no bucket was overwritten (i.e. the entry is fresh).
1049        if ret.is_none() {
1050            self.expand(lock).await;
1051        }
1052
1053        ret
1054    }
1055
1056    /// Insert or update.
1057    ///
1058    /// This looks up `key`. If it exists, the reference to its value is passed through closure
1059    /// `update`.  If it doesn't exist, the result of closure `insert` is inserted.
1060    pub async fn upsert<F, G>(&self, key: K, insert: F, update: G)
1061    where
1062        F: FnOnce() -> V,
1063        G: FnOnce(&mut V),
1064    {
1065        // Expand and lock the table. We need to expand to ensure the bounds on the load factor.
1066        let lock = self.table.read().await;
1067        {
1068            // Lookup the key or a free bucket in the inner table.
1069            let mut bucket = lock.lookup_or_free(&key).await;
1070
1071            match *bucket {
1072                // The bucket had KV pair!
1073                Bucket::Contains(_, ref mut val) => {
1074                    // Run it through the closure.
1075                    update(val);
1076                    // TODO: We return to stop the borrowck to yell at us. This prevents the control flow
1077                    //       from reaching the expansion after the match if it has been right here.
1078                    return;
1079                }
1080                // The bucket was empty, simply insert.
1081                ref mut x => *x = Bucket::Contains(key, insert()),
1082            }
1083        }
1084
1085        // Expand the table (this will only happen if the function hasn't returned yet).
1086        self.expand(lock).await;
1087    }
1088
1089    /// Map or insert an entry.
1090    ///
1091    /// This sets the value associated with key `key` to `f(Some(old_val))` (if it returns `None`,
1092    /// the entry is removed) if it exists. If it does not exist, it inserts it with value
1093    /// `f(None)`, unless the closure returns `None`.
1094    ///
1095    /// Note that if `f` returns `None`, the entry of key `key` is removed unconditionally.
1096    pub async fn alter<F, Fut>(&self, key: K, f: F)
1097    where
1098        F: FnOnce(Option<V>) -> Fut,
1099        Fut: std::future::Future<Output = Option<V>>,
1100    {
1101        // Expand and lock the table. We need to expand to ensure the bounds on the load factor.
1102        let lock = self.table.read().await;
1103        {
1104            // Lookup the key or a free bucket in the inner table.
1105            let mut bucket = lock.lookup_or_free(&key).await;
1106
1107            match mem::replace(&mut *bucket, Bucket::Removed) {
1108                Bucket::Contains(_, val) => {
1109                    if let Some(new_val) = f(Some(val)).await {
1110                        // Set the bucket to a KV pair with the new value.
1111                        *bucket = Bucket::Contains(key, new_val);
1112                        // No extension required, as the bucket already had a KV pair previously.
1113                        return;
1114                    } else {
1115                        // The old entry was removed, so we decrement the length of the map.
1116                        self.len.fetch_sub(1, ORDERING);
1117                    }
1118                    // TODO: We return as a hack to avoid the borrowchecker from thinking we moved a
1119                    //       referenced object. Namely, under this match arm the expansion after the match
1120                    //       statement won't ever be reached.
1121                    return;
1122                }
1123                _ => {
1124                    if let Some(new_val) = f(None).await {
1125                        // The previously free cluster will get a KV pair with the new value.
1126                        *bucket = Bucket::Contains(key, new_val);
1127                    } else {
1128                        return;
1129                    }
1130                }
1131            }
1132        }
1133
1134        // A new entry was inserted, so naturally, we expand the table.
1135        self.expand(lock).await;
1136    }
1137
1138    /// Remove an entry.
1139    ///
1140    /// This removes and returns the entry with key `key`. If no entry with said key exists, it
1141    /// will simply return `None`.
1142    pub async fn remove<Q: ?Sized>(&self, key: &Q) -> Option<V>
1143    where
1144        K: Borrow<Q>,
1145        Q: PartialEq + Hash,
1146    {
1147        // Acquire the read lock of the table.
1148        let lock = self.table.read().await;
1149
1150        // Lookup the table, mutably.
1151        let mut bucket = lock.lookup_mut(key).await;
1152        // Remove the bucket.
1153        match &mut *bucket {
1154            // There was nothing to remove.
1155            &mut Bucket::Removed | &mut Bucket::Empty => None,
1156            // TODO: We know that this is a `Bucket::Contains` variant, but to bypass borrowck
1157            //       madness, we do weird weird stuff.
1158            bucket => {
1159                // Decrement the length of the map.
1160                self.len.fetch_sub(1, ORDERING);
1161
1162                // Set the bucket to "removed" and return its value.
1163                mem::replace(bucket, Bucket::Removed).value()
1164            }
1165        }
1166    }
1167
1168    /// Reserve additional space.
1169    ///
1170    /// This reserves additional `additional` buckets to the table. Note that it might reserve more
1171    /// in order make reallocation less common.
1172    pub async fn reserve(&self, additional: usize) {
1173        // Get the new length.
1174        let len = (self.len() + additional) * LENGTH_MULTIPLIER;
1175        // Acquire the write lock (needed because we'll mess with the table).
1176        let mut lock = self.table.write().await;
1177        // Handle the case where another thread has resized the table while we were acquiring the
1178        // lock.
1179        if lock.buckets.len() < len {
1180            // Swap the table out with a new table of desired size (multiplied by some factor).
1181            let table = mem::replace(
1182                &mut *lock,
1183                Table::with_capacity_and_hasher(len, S::default()),
1184            );
1185            // Fill the new table with the data from the old table.
1186            lock.fill(table);
1187        }
1188    }
1189
1190    /// Shrink the capacity of the map to reduce space usage.
1191    ///
1192    /// This will shrink the capacity of the map to the needed amount (plus some additional space
1193    /// to avoid reallocations), effectively reducing memory usage in cases where there is
1194    /// excessive space.
1195    ///
1196    /// It is healthy to run this once in a while, if the size of your hash map changes a lot (e.g.
1197    /// has a high maximum case).
1198    pub async fn shrink_to_fit(&self) {
1199        // Acquire the write lock (needed because we'll mess with the table).
1200        let mut lock = self.table.write().await;
1201        // Swap the table out with a new table of desired size (multiplied by some factor).
1202        let table = mem::replace(
1203            &mut *lock,
1204            Table::with_capacity_and_hasher(self.len(), S::default()),
1205        );
1206        // Fill the new table with the data from the old table.
1207        lock.fill(table);
1208    }
1209
1210    /// Increment the size of the hash map and expand it so one more entry can fit in.
1211    ///
1212    /// This returns the read lock, such that the caller won't have to acquire it twice.
1213    async fn expand(&self, lock: RwLockReadGuard<'_, Table<K, V, S>>) {
1214        // Increment the length to take the new element into account.
1215        let len = self.len.fetch_add(1, ORDERING) + 1;
1216
1217        // Extend if necessary. We multiply by some constant to adjust our load factor.
1218        if len * MAX_LOAD_FACTOR_DENOM > lock.buckets.len() * MAX_LOAD_FACTOR_NUM {
1219            // Drop the read lock to avoid deadlocks when acquiring the write lock.
1220            drop(lock);
1221            // Reserve 1 entry in space (the function will handle the excessive space logic).
1222            self.reserve(1).await;
1223        }
1224    }
1225}
1226
1227impl<K, V, S: Default + BuildHasher> Default for CHashMap<K, V, S> {
1228    fn default() -> Self {
1229        // Forward the call to `new`.
1230        CHashMap::with_hasher(S::default())
1231    }
1232}
1233
1234impl<K: Clone, V: Clone, S: Clone> CHashMap<K, V, S> {
1235    /// Clone the struct, bypassing the locks by the statically guaranteed exclusive `&mut` access.
1236    /// Not that the map is not actually mutated.
1237    pub fn clone_mut(&mut self) -> Self {
1238        CHashMap {
1239            table: RwLock::new(self.table.get_mut().clone_mut()),
1240            len: AtomicUsize::new(self.len.load(ORDERING)),
1241        }
1242    }
1243
1244    /// Clone by accessing keys and values via the locks. That requires this method to be `async`.
1245    pub async fn clone_locking(&self) -> Self {
1246        CHashMap {
1247            table: RwLock::new(self.table.read().await.clone_locking().await),
1248            len: AtomicUsize::new(self.len.load(ORDERING)),
1249        }
1250    }
1251}
1252
1253impl<K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for CHashMap<K, V, S> {
1254    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1255        write!(f, "CHashMap {{<{} entries>}}", self.len())
1256    }
1257}
1258
1259impl<K, V, S> IntoIterator for CHashMap<K, V, S> {
1260    type Item = (K, V);
1261    type IntoIter = IntoIter<K, V, S>;
1262
1263    fn into_iter(self) -> IntoIter<K, V, S> {
1264        self.table.into_inner().into_iter()
1265    }
1266}
1267
1268impl<K: PartialEq + Hash, V, S: Default + BuildHasher> iter::FromIterator<(K, V)>
1269    for CHashMap<K, V, S>
1270{
1271    fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
1272        // TODO: This step is required to obtain the length of the iterator. Eliminate it.
1273        let vec: Vec<_> = iter.into_iter().collect();
1274        let len = vec.len();
1275
1276        // Start with an empty table.
1277        let mut table = Table::with_capacity_and_hasher(len, S::default());
1278        // Fill the table with the pairs from the iterator.
1279        for (key, val) in vec {
1280            // Insert the KV pair. This is fine, as we are ensured that there are no duplicates in
1281            // the iterator.
1282            let bucket = table.find_free_no_lock(&key);
1283            *bucket = Bucket::Contains(key, val);
1284        }
1285
1286        CHashMap {
1287            table: RwLock::new(table),
1288            len: AtomicUsize::new(len),
1289        }
1290    }
1291}