// light_cache/map.rs
1pub mod builder;
2
3use parking_lot::{Mutex, RwLock, RwLockWriteGuard};
4use std::hash::{BuildHasher, Hasher};
5
6use hashbrown::hash_map::{DefaultHashBuilder, HashMap};
7use hashbrown::raw::RawTable;
8
9mod waker_node;
10pub(crate) use waker_node::Wakers;
11
/// A concurrent hashmap implementation that's always non-blocking.
///
/// Calls to `get` and `insert` are not async, and since values are `Clone`
/// they will never block another thread: readers receive clones rather than
/// borrows into the table.
pub struct LightMap<K, V, S = DefaultHashBuilder> {
    // Shared by all shards so every component hashes a key identically
    // (see `hash_key` / `shard`).
    pub(crate) build_hasher: S,
    // Fixed-size set of lock-striped shards; a key's shard is chosen by
    // `hash % shards.len()`.
    shards: Box<[Shard<K, V>]>,
}
19
/// One lock-striped segment of a [`LightMap`].
pub(crate) struct Shard<K, V> {
    // NOTE(review): presumably wakers for tasks awaiting a key that is not
    // yet present in this shard — confirm against `waker_node` / builder.
    pub(crate) waiters: Mutex<HashMap<K, Wakers>>,
    // The shard's entries; read lock for `get`, write lock for
    // `insert`/`remove`/`get_mut`.
    table: RwLock<RawTable<Entry<K, V>>>,
}
24
/// A write-locked view into one shard, scoped to a single key.
///
/// While this guard is alive it holds the shard's write lock, blocking *all*
/// other operations on that shard — keep it short-lived (see
/// [`LightMap::get_mut`]).
pub struct GetMut<'a, K, V> {
    key: &'a K,
    // Exclusive guard over the whole shard table, not just this key's entry.
    table: RwLockWriteGuard<'a, RawTable<Entry<K, V>>>,
    // Precomputed hash of `key`, used to locate the entry in the raw table.
    hash: u64,
}
30
31impl<'a, K, V> GetMut<'a, K, V>
32where
33    K: Eq + std::hash::Hash,
34{
35    pub fn apply<F>(mut self, f: F) -> bool
36    where
37        F: FnOnce(&mut V),
38    {
39        if let Some(entry) = self.table.get_mut(self.hash, |e| eq_key(self.key, &e.key)) {
40            f(&mut entry.value);
41            true
42        } else {
43            false
44        }
45    }
46}
47
/// A single key/value pair as stored in a shard's raw table.
struct Entry<K, V> {
    key: K,
    value: V,
}
52
53impl<K, V> LightMap<K, V> {
54    pub fn new() -> Self {
55        builder::MapBuilder::new().build(Default::default())
56    }
57
58    pub fn with_capacity(capacity: usize) -> Self {
59        builder::MapBuilder::new()
60            .estimated_size(capacity)
61            .build(Default::default())
62    }
63}
64
65impl<K, V, S: BuildHasher> LightMap<K, V, S> {
66    pub fn with_hasher(build_hasher: S) -> Self {
67        builder::MapBuilder::new().build(build_hasher)
68    }
69
70    pub fn with_capacity_and_hasher(capacity: usize, build_hasher: S) -> Self {
71        builder::MapBuilder::new()
72            .estimated_size(capacity)
73            .build(build_hasher)
74    }
75}
76
77impl<K, V, S> LightMap<K, V, S> {
78    pub fn len(&self) -> usize {
79        self.shards.iter().map(|s| s.table.read().len()).sum()
80    }
81}
82
83impl<K, V, S> LightMap<K, V, S>
84where
85    K: Eq + std::hash::Hash,
86    S: std::hash::BuildHasher,
87    V: Clone,
88{
89    pub fn insert(&self, key: K, value: V) -> Option<V> {
90        let (hash, shard) = self.shard(&key).unwrap();
91
92        shard.insert(key, value, hash, &self.build_hasher)
93    }
94
95    pub fn get(&self, key: &K) -> Option<V> {
96        let (hash, shard) = self.shard(key).unwrap();
97
98        shard.get(key, hash)
99    }
100
101    /// Note: Holding this refrence can block *ALL* operations on this shard
102    /// 
103    /// Its important to never hold this refrence for long periods of time
104    /// espically past an await point
105    pub fn get_mut<'a>(&'a self, key: &'a K) -> GetMut<'a, K, V> {
106        let (hash, shard) = self.shard(key).unwrap();
107
108        GetMut {
109            key,
110            table: shard.table.write(),
111            hash,
112        }
113    }
114
115    pub fn remove(&self, key: &K) -> Option<V> {
116        let (hash, shard) = self.shard(key).unwrap();
117
118        shard.remove(key, hash)
119    }
120
121    pub(crate) fn shard(&self, key: &K) -> Option<(u64, &Shard<K, V>)> {
122        let hash = hash_key(&self.build_hasher, key);
123
124        let idx = hash as usize % self.shards.len();
125        self.shards.get(idx).map(|s| (hash, s))
126    }
127}
128
impl<K, V> Shard<K, V>
where
    K: Eq + std::hash::Hash,
    V: Clone,
{
    /// Inserts `key -> value` under an exclusive table lock, returning the
    /// previous value if the key was already present.
    ///
    /// `hash` must be the hash of `key` under `build_hasher`; the hasher is
    /// also used to rehash existing entries if the raw table has to grow.
    pub(crate) fn insert<S: BuildHasher>(
        &self,
        key: K,
        value: V,
        hash: u64,
        build_hasher: &S,
    ) -> Option<V> {
        let mut table = self.table.write();

        match table.find_or_find_insert_slot(
            hash,
            |e| eq_key(&key, &e.key),
            |e| hash_key(build_hasher, &e.key),
        ) {
            // SAFETY: we hold an exclusive lock on the table, and `entry` was
            // just produced by `find_or_find_insert_slot` on it, so the bucket
            // pointer is valid and uniquely borrowed.
            Ok(entry) => unsafe { Some(std::mem::replace(&mut entry.as_mut().value, value)) },
            Err(slot) => {
                let entry = Entry { key, value };
                // SAFETY: `slot` came from `find_or_find_insert_slot` above on
                // this same table, with no intervening mutation, and `hash` is
                // the hash the slot was found for.
                unsafe {
                    table.insert_in_slot(hash, slot, entry);
                }

                None
            }
        }
    }

    /// Returns a clone of the value for `key` under a shared table lock.
    pub(crate) fn get(&self, key: &K, hash: u64) -> Option<V> {
        let table = self.table.read();

        table
            .get(hash, |e| eq_key(key, &e.key))
            .map(|e| e.value.clone())
    }

    /// Removes and returns the value for `key` under an exclusive table lock.
    pub(crate) fn remove(&self, key: &K, hash: u64) -> Option<V> {
        let mut table = self.table.write();

        table
            .remove_entry(hash, |e| eq_key(key, &e.key))
            .map(|e| e.value)
    }
}
177
/// Key-equality predicate shared by the raw-table lookup closures.
pub(crate) fn eq_key<K: Eq>(a: &K, b: &K) -> bool {
    a == b
}
181
/// Hashes `key` with a fresh hasher from `build_hasher`.
///
/// Centralized so the map, its shards, and the raw-table rehash closures all
/// hash keys identically.
pub(crate) fn hash_key<K, S>(build_hasher: &S, key: &K) -> u64
where
    K: std::hash::Hash,
    S: std::hash::BuildHasher,
{
    // `BuildHasher::hash_one` (stable since Rust 1.71) is the idiomatic form
    // of the manual build_hasher / hash / finish sequence and is defined to
    // produce the same result.
    build_hasher.hash_one(key)
}
191
/// Returns the machine's available parallelism, cached after the first call.
///
/// Falls back to 1 when the OS cannot report a value.
fn max_parrellism() -> usize {
    use std::sync::atomic::{AtomicUsize, Ordering};
    // 0 means "not computed yet". Racing threads may each compute and store
    // the value, which is harmless: they all store the same result.
    static AVAILABLE_PARALLELISM: AtomicUsize = AtomicUsize::new(0);
    match AVAILABLE_PARALLELISM.load(Ordering::Relaxed) {
        0 => {
            let computed = std::thread::available_parallelism().map_or(1, |n| n.get());
            AVAILABLE_PARALLELISM.store(computed, Ordering::Relaxed);
            computed
        }
        cached => cached,
    }
}
205
#[cfg(test)]
mod test {
    use super::*;

    /// Round-trips a few entries through insert/get on the default map.
    #[test]
    fn test_insert_and_get_basic() {
        let map = LightMap::new();
        let pairs = [(1, 2), (2, 3), (3, 4)];

        for &(k, v) in pairs.iter() {
            map.insert(k, v);
        }

        for &(k, v) in pairs.iter() {
            assert_eq!(map.get(&k), Some(v));
        }
    }
}
222}