light_cache/
cache.rs

1pub(crate) mod get_or_insert;
2pub(crate) use get_or_insert::{GetOrInsertFuture, GetOrTryInsertFuture};
3
4pub(crate) mod get_or_insert_race;
5pub(crate) use get_or_insert_race::{GetOrInsertRace, GetOrTryInsertRace};
6
7mod no_policy;
8pub use no_policy::NoPolicy;
9
10use crate::map::LightMap;
11use crate::policy::{NoopPolicy, Policy};
12pub use hashbrown::hash_map::DefaultHashBuilder;
13
14use std::future::Future;
15use std::hash::{BuildHasher, Hash};
16use std::sync::Arc;
17
/// A concurrent hashmap that allows for efficient async insertion of values.
///
/// `LightCache` is cloneable and can be shared across threads, so there's no need to explicitly wrap it in an `Arc`:
/// the handle itself only stores an `Arc` to the shared inner state.
///
/// ## LightCache offers two modes for async insertion
///
/// ### Cooperative:
/// [`Self::get_or_insert`] and [`Self::get_or_try_insert`] only allow one worker task at a time to poll their futures
/// and will wake up the other tasks when the value is inserted.
///
/// ### Race:
/// [`Self::get_or_insert_race`] and [`Self::get_or_try_insert_race`] allow all worker tasks to poll their futures at the same time.
pub struct LightCache<K, V, S = DefaultHashBuilder, P = NoopPolicy> {
    // Shared state (policy + map) behind an `Arc`.
    pub(crate) inner: Arc<LightCacheInner<K, V, S, P>>,
}
33
34impl<K, V, S, P> Clone for LightCache<K, V, S, P> {
35    fn clone(&self) -> Self {
36        LightCache {
37            inner: self.inner.clone(),
38        }
39    }
40}
41
/// The state held behind the `Arc` in [`LightCache`]: the policy and the backing map.
pub(crate) struct LightCacheInner<K, V, S = DefaultHashBuilder, P = NoopPolicy> {
    /// The policy that `insert`, `get`, `remove` and `prune` on [`LightCache`] delegate to.
    pub(crate) policy: P,
    /// The backing map; used directly by `len`, the `*_no_policy` helpers and the shard lookups.
    map: LightMap<K, V, S>,
}
46
47impl<K, V> LightCache<K, V> {
48    pub fn new() -> Self {
49        LightCache {
50            inner: Arc::new(LightCacheInner {
51                map: LightMap::new(),
52                policy: NoopPolicy::new(),
53            }),
54        }
55    }
56
57    pub fn with_capacity(capacity: usize) -> Self {
58        LightCache {
59            inner: Arc::new(LightCacheInner {
60                map: LightMap::with_capacity(capacity),
61                policy: NoopPolicy::new(),
62            }),
63        }
64    }
65}
66
67impl<K, V, S: BuildHasher, P> LightCache<K, V, S, P> {
68    pub fn from_parts(policy: P, hasher: S) -> Self {
69        LightCache {
70            inner: Arc::new(LightCacheInner {
71                map: LightMap::with_hasher(hasher),
72                policy,
73            }),
74        }
75    }
76
77    pub fn from_parts_with_capacity(policy: P, hasher: S, capacity: usize) -> Self {
78        LightCache {
79            inner: Arc::new(LightCacheInner {
80                map: LightMap::with_capacity_and_hasher(capacity, hasher),
81                policy,
82            }),
83        }
84    }
85}
86
87impl<K, V, S, P> LightCache<K, V, S, P> {
88    pub fn len(&self) -> usize {
89        self.inner.map.len()
90    }
91}
92
93impl<K, V, S, P> LightCache<K, V, S, P>
94where
95    K: Eq + Hash + Copy,
96    V: Clone + Sync,
97    S: BuildHasher,
98    P: Policy<K, V>,
99{
100    /// Get or insert the value for the given key
101    ///
102    /// In the case that a key is being inserted by another thread using this method or [`Self::get_or_try_insert`]
103    /// tasks will cooperativley compute the value and notify the other task when the value is inserted.
104    /// If a task fails to insert the value, (via panic or error) another task will take over until theyve all tried.
105    ///
106    /// ## Note
107    /// If a call to remove is issued between the time of inserting, and waking up tasks, the other tasks will simply see the empty slot and try again
108    pub async fn get_or_insert<F, Fut>(&self, key: K, init: F) -> V
109    where
110        F: FnOnce() -> Fut,
111        Fut: Future<Output = V>,
112    {
113        if let Some(value) = self.get(&key) {
114            return value;
115        }
116
117        self.get_or_insert_inner(key, init).await
118    }
119
120    /// Get or insert the value for the given key, returning an error if the value could not be inserted
121    /// 
122    /// See [`Self::get_or_insert`] for more information
123    pub async fn get_or_try_insert<F, Fut, Err>(
124        &self,
125        key: K,
126        init: F,
127    ) -> Result<V, Err>
128    where
129        F: FnOnce() -> Fut,
130        Fut: std::future::Future<Output = Result<V, Err>>,
131    {
132        if let Some(value) = self.get(&key) {
133            return Ok(value);
134        }
135
136        self.get_or_try_insert_inner(key, init).await
137    }
138
139    /// Get or insert a value into the cache, but instead of waiting for a single caller to finish the insertion (coopertively),
140    /// any callers of this method will always attempt to poll their own future
141    /// 
142    /// This is safe to use with any other get_or_* method
143    pub async fn get_or_insert_race<F, Fut>(&self, key: K, init: F) -> V
144    where
145        F: FnOnce() -> Fut,
146        Fut: Future<Output = V>,
147    {
148        if let Some(val) = self.get(&key) {
149            return val;
150        }
151
152        self.get_or_insert_race_inner(key, init).await
153    }
154
155    /// See [`Self::get_or_insert_race`] for more information
156    pub async fn get_or_try_insert_race<F, Fut, E>(&self, key: K, init: F) -> Result<V, E>
157    where
158        F: FnOnce() -> Fut,
159        Fut: Future<Output = Result<V, E>>,
160    {
161        if let Some(val) = self.get(&key) {
162            return Ok(val);
163        }
164
165        self.get_or_try_insert_race_inner(key, init).await
166    }
167
168    /// Insert a value directly into the cache
169    /// 
170    /// ## Warning: 
171    /// Doing a [`Self::insert`] while another task is doing any type of async write ([Self::get_or_insert], [Self::get_or_try_insert], etc)
172    /// will "leak" a wakers entry, which will never be removed unless another `get_or_*` method is fully completed without being
173    /// interrupted by a call to this method.
174    /// 
175    /// This is mostly not a big deal as Wakers is small, and this pattern really should be avoided in practice.
176    #[inline]
177    pub fn insert(&self, key: K, value: V) -> Option<V> {
178        self.policy().insert(key, value, self)
179    }
180
181    /// Try to get a value from the cache
182    #[inline]
183    pub fn get(&self, key: &K) -> Option<V> {
184        self.policy().get(key, self)
185    }
186
187    /// Remove a value from the cache, returning the value if it was present
188    ///
189    /// ## Note: 
190    /// If this is called while another task is trying to [`Self::get_or_insert`] or [`Self::get_or_try_insert`],
191    /// it will force them to recompute the value and insert it again.
192    #[inline]
193    pub fn remove(&self, key: &K) -> Option<V> {
194        self.policy().remove(key, self)
195    }
196
197    /// Prune the cache of any expired keys
198    #[inline]
199    pub fn prune(&self) {
200        // todo: can we disblae must use warning? compiler should ignore the assignment though
201        let _lock = self.policy().lock_and_prune(self);
202    }
203}
204
205impl<K, V, S, P> LightCache<K, V, S, P>
206where
207    K: Eq + Hash + Copy,
208    V: Clone + Sync,
209    S: BuildHasher,
210    P: Policy<K, V>,
211{
212    #[inline]
213    fn get_or_insert_inner<F, Fut>(&self, key: K, init: F) -> GetOrInsertFuture<K, V, S, P, F, Fut>
214    where
215        F: FnOnce() -> Fut,
216        Fut: Future<Output = V>,
217    {
218        let (hash, shard) = self.inner.map.shard(&key).unwrap();
219
220        GetOrInsertFuture::new(self, shard, hash, key, init)
221    }
222
223    #[inline]
224    fn get_or_try_insert_inner<F, Fut, E>(
225        &self,
226        key: K,
227        init: F,
228    ) -> GetOrTryInsertFuture<K, V, S, P, F, Fut>
229    where
230        F: FnOnce() -> Fut,
231        Fut: Future<Output = Result<V, E>>,
232    {
233        let (hash, shard) = self.inner.map.shard(&key).unwrap();
234
235        GetOrTryInsertFuture::new(self, shard, hash, key, init)
236    }
237
238    #[inline]
239    fn get_or_insert_race_inner<F, Fut>(&self, key: K, init: F) -> GetOrInsertRace<K, V, S, P, Fut>
240    where
241        F: FnOnce() -> Fut,
242        Fut: Future<Output = V>,
243    {
244        let (hash, shard) = self.inner.map.shard(&key).unwrap();
245
246        GetOrInsertRace::new(self, shard, hash, key, init())
247    }
248
249    #[inline]
250    fn get_or_try_insert_race_inner<F, Fut, E>(
251        &self,
252        key: K,
253        init: F,
254    ) -> GetOrTryInsertRace<K, V, S, P, Fut>
255    where
256        F: FnOnce() -> Fut,
257        Fut: Future<Output = Result<V, E>>,
258    {
259        let (hash, shard) = self.inner.map.shard(&key).unwrap();
260
261        GetOrTryInsertRace::new(self, shard, hash, key, init())
262    }
263}
264
265impl<K, V, S, P> LightCache<K, V, S, P>
266where
267    K: Eq + Hash + Copy,
268    V: Clone + Sync,
269    S: BuildHasher,
270{
271    #[inline]
272    pub(crate) fn get_no_policy(&self, key: &K) -> Option<V> {
273        self.inner.map.get(key)
274    }
275
276    #[inline]
277    pub(crate) fn insert_no_policy(&self, key: K, value: V) -> Option<V> {
278        self.inner.map.insert(key, value)
279    }
280
281    #[inline]
282    pub(crate) fn remove_no_policy(&self, key: &K) -> Option<V> {
283        self.inner.map.remove(key)
284    }
285
286    #[inline]
287    pub(crate) fn policy(&self) -> &P {
288        &self.inner.policy
289    }
290}