lockable/lockable_map_impl.rs
use futures::stream::{FuturesUnordered, Stream, StreamExt};
use itertools::Itertools;
use std::borrow::Borrow;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};

use super::guard::Guard;
use super::limit::{AsyncLimit, SyncLimit};
use super::map_like::{GetOrInsertNoneResult, MapLike};
use super::utils::primary_arc::PrimaryArc;
use crate::utils::primary_arc::{ReplicaArc, ReplicaOwnedMutexGuard};

// TODO Does it make sense to make the inner Mutex (i.e. tokio::sync::Mutex) a template parameter
//      with a Mutex trait? It could allow user code to select std::sync::Mutex if they don't need
//      the async_lock functions, or maybe even use their own mutex type if we make the trait public.

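/// Configuration for a [LockableMapImpl]. It selects the underlying map implementation,
/// defines how values are wrapped before they are stored (e.g. to keep cache metadata
/// next to each value), and provides a hook that runs on every unlock.
///
/// A minimal sketch of an implementor that stores values unwrapped. This is illustrative
/// only: it assumes that [MapLike] is implemented for `std::collections::HashMap`, and
/// `PlainConfig` is a hypothetical name, not part of this crate:
/// ```ignore
/// struct PlainConfig;
///
/// impl LockableMapConfig for PlainConfig {
///     type MapImpl<K, V> = std::collections::HashMap<K, Entry<V>>
///     where
///         K: Eq + PartialEq + Hash + Clone;
///     type WrappedV<V> = V;
///
///     fn borrow_value<V>(v: &V) -> &V {
///         v
///     }
///     fn borrow_value_mut<V>(v: &mut V) -> &mut V {
///         v
///     }
///     fn wrap_value<V>(&self, v: V) -> V {
///         v
///     }
///     fn unwrap_value<V>(v: V) -> V {
///         v
///     }
///     fn on_unlock<V>(&self, _v: Option<&mut V>) {
///         // Nothing to do when a plain value is unlocked.
///     }
/// }
/// ```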
pub trait LockableMapConfig {
    type MapImpl<K, V>: MapLike<K, Entry<Self::WrappedV<V>>>
    where
        K: Eq + PartialEq + Hash + Clone;
    type WrappedV<V>;

    fn borrow_value<V>(v: &Self::WrappedV<V>) -> &V;
    fn borrow_value_mut<V>(v: &mut Self::WrappedV<V>) -> &mut V;
    fn wrap_value<V>(&self, v: V) -> Self::WrappedV<V>;
    fn unwrap_value<V>(v: Self::WrappedV<V>) -> V;

    /// This gets executed every time a value is unlocked.
    /// The `v` parameter is the value that is being unlocked.
    /// It is `None` if we locked and then unlocked a key that
    /// actually doesn't have an entry in the map.
    fn on_unlock<V>(&self, v: Option<&mut Self::WrappedV<V>>);
}

#[derive(Debug)]
pub struct EntryValue<V> {
    // While unlocked, an entry is always Some. While locked, it can be temporarily None
    // since we enter None values into the map to lock keys that actually don't exist in the map.
    pub(super) value: Option<V>,
}

pub(super) type Entry<V> = PrimaryArc<tokio::sync::Mutex<EntryValue<V>>>;

pub struct LockableMapImpl<K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    // We always use std::sync::Mutex for protecting the whole map since its guards
    // never have to be kept across await boundaries, and std::sync::Mutex is faster
    // than tokio::sync::Mutex. But the inner per-key locks use tokio::sync::Mutex
    // because they need to be kept across await boundaries.
    //
    // We never hand the inner Arc around a map entry out of the encapsulation of this class,
    // except through non-cloneable Guard objects encapsulating those Arcs.
    // This allows us to reason about which threads can or cannot increase the refcounts.
    //
    // Invariants:
    // 1) Every key can only be locked once at the same time, even if the key has no value in the map.
    //    -> This is fulfilled by inserting `None` values into the map for locking keys that don't have a value.
    // 2) An entry can only be `None` if there is a [ReplicaArc]/[ReplicaOwnedMutexGuard] (e.g. [Guard]) for it, or if `entries` is locked (e.g. we're currently running some code that will reinstate the invariant before unlocking `entries`).
    //    This invariant ensures we don't accidentally leave `None` entries behind when we don't need them anymore.
    //    It is fulfilled through the following rules:
    //    2A) Any of the [PrimaryArc] entries in the map can only be cloned (i.e. have the refcount increased) while `entries` is locked. Cloning them will create a [ReplicaArc] that cannot be cloned further.
    //        However, while holding a [ReplicaArc], other threads can come in, lock `entries`, and create their own [ReplicaArc] to the same entry.
    //    2B) Code creating a new `None` [PrimaryArc] in the map must always create a [ReplicaArc]/[ReplicaOwnedMutexGuard] for it.
    //    2C) Anybody dropping a [ReplicaArc]/[ReplicaOwnedMutexGuard] must first get a lock on `entries`, then drop the ReplicaArc, and then, if it is None,
    //        call [Self::_delete_if_unlocked_and_nobody_waiting_for_lock] or [Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock] to clean up the `None` entry.
    //        The easiest way to do this is to put the [ReplicaArc] into a [Guard] object that will do this correctly in its [Drop].
    //        Exception: If the same lock on `entries` that was present when the [ReplicaArc] was created is still held, i.e. no other thread could have dropped their instance of the [ReplicaArc] in between, then it's ok to just drop it without any `None` checks.
    //    (2C) means that it isn't necessarily the [ReplicaArc] created in (2B) that will clean up the `None` entry, since there could still be other [ReplicaArc]s, but the last [ReplicaArc] will clean it up.
    // TODO Can 2C be enforced with some kind of Guard that isn't about locking but just about cleaning up the `None` entry? The actual [Guard] could then maybe hold this inner guard.
    entries: std::sync::Mutex<C::MapImpl<K, V>>,

    config: C,

    _k: PhantomData<K>,
    _v: PhantomData<V>,
}

enum LoadOrInsertMutexResult<V> {
    Existing {
        mutex: ReplicaArc<tokio::sync::Mutex<EntryValue<V>>>,
    },
    Inserted {
        guard: ReplicaOwnedMutexGuard<EntryValue<V>>,
    },
}

impl<K, V, C> LockableMapImpl<K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    #[inline]
    pub fn new(config: C) -> Self {
        Self {
            entries: std::sync::Mutex::new(C::MapImpl::<K, V>::new()),
            config,
            _k: PhantomData,
            _v: PhantomData,
        }
    }

    #[inline]
    pub fn config(&self) -> &C {
        &self.config
    }

    #[cfg(test)]
    #[inline]
    pub fn config_mut(&mut self) -> &mut C {
        &mut self.config
    }

    #[inline]
    pub fn num_entries_or_locked(&self) -> usize {
        self._entries().len()
    }

    fn _entries(&self) -> EntriesGuard<'_, K, V, C> {
        EntriesGuard::new(self.entries
            .lock()
            .expect("The global mutex protecting the LockableMapImpl is poisoned. This shouldn't happen since no user code runs while this lock is held, so no thread should ever panic while holding it"))
    }

    // WARNING: Call site must be very careful to always fulfill invariant 2C with the returned [ReplicaArc].
    async fn _load_or_insert_mutex_for_key_async<S, E, F, OnEvictFn>(
        this: &S,
        key: &K,
        limit: AsyncLimit<K, V, C, S, E, F, OnEvictFn>,
    ) -> Result<LoadOrInsertMutexResult<C::WrappedV<V>>, E>
    where
        S: Borrow<Self> + Clone,
        F: Future<Output = Result<(), E>>,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> F,
    {
        // Note: this logic is duplicated in _load_or_insert_mutex_for_key_sync without the .await calls
        let mut entries = match limit {
            AsyncLimit::NoLimit { .. } => {
                // do nothing
                this.borrow()._entries()
            }
            AsyncLimit::SoftLimit {
                max_entries,
                mut on_evict,
            } => {
                // free up space for the new entry if necessary
                loop {
                    let locked = {
                        let mut entries = this.borrow()._entries();
                        let num_overlimit_entries =
                            entries.len().saturating_sub(max_entries.get() - 1);
                        if num_overlimit_entries == 0 {
                            // There is enough space, no need to free up space
                            break entries;
                        }
                        // There is not enough space, free up some.
                        let locked = Self::_lock_up_to_n_first_unlocked_entries(
                            this,
                            &mut entries,
                            num_overlimit_entries,
                        );

                        // If we couldn't lock any entries to free up their space, then
                        // all cache entries are currently locked. If we just waited
                        // until we could lock one, there would be a potential deadlock
                        // if multiple threads hold locks and try to get more locks.
                        // Let's avoid that deadlock and allow the current locking
                        // request, even though it goes above the limit.
                        // This is why we call [AsyncLimit::SoftLimit] a "soft" limit.
                        if locked.is_empty() {
                            // TODO Test that this works, i.e. that the map still correctly works when it's full and doesn't deadlock (and same for the _load_or_insert_mutex_for_key_sync version)
                            break entries;
                        }

                        // We now have some entries locked that may free up enough space.
                        // Let's evict them. We have to release the `entries` lock for that
                        // so that the on_evict user code can call back into Self::_unlock()
                        // for those entries. Additionally, we don't want `entries` to stay locked
                        // since user eviction code may write the entry back to an underlying
                        // storage layer and take some time.
                        //
                        // We do want to keep the entry itself locked and pass the guard to user code,
                        // because if user code does do writebacks, it probably needs the entry to be
                        // locked until the writeback is complete.
                        //
                        // However, unlocking `entries` means other user code may also run and cause
                        // race conditions, e.g. add new entries into the space we just created.
                        // Because of that, once on_evict returns, we'll take the lock again
                        // in the next loop iteration and check whether we now have enough space.
                        std::mem::drop(entries);
                        locked
                    };
                    on_evict(locked).await?;
                }
            }
        };
        let result = match entries.get_or_insert_none(key) {
            GetOrInsertNoneResult::Existing(mutex) => LoadOrInsertMutexResult::Existing {
                // The call site needs to make sure it fulfills invariant 2C when dropping this [ReplicaArc].
                mutex: PrimaryArc::clone(mutex),
            },
            GetOrInsertNoneResult::Inserted(mutex) => {
                // If we just inserted the new entry, it'll have a `None` value. To fulfill invariant 2B, we need to put it in a [ReplicaOwnedMutexGuard].
                // The call site needs to make sure it fulfills invariant 2C when dropping that [ReplicaOwnedMutexGuard].
                let Ok(guard) = PrimaryArc::clone(mutex).try_lock_owned() else {
                    panic!("We're the only one who has seen this mutex so far. Locking can't fail.");
                };
                LoadOrInsertMutexResult::Inserted { guard }
            }
        };
        Ok(result)
    }

    // WARNING: Call site must be very careful to always fulfill invariant 2C with the returned [ReplicaArc].
    fn _load_or_insert_mutex_for_key_sync<S, E, OnEvictFn>(
        this: &S,
        key: &K,
        limit: SyncLimit<K, V, C, S, E, OnEvictFn>,
    ) -> Result<LoadOrInsertMutexResult<C::WrappedV<V>>, E>
    where
        S: Borrow<Self> + Clone,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> Result<(), E>,
    {
        // Note: this logic is duplicated in _load_or_insert_mutex_for_key_async, which adds some .await calls
        let mut entries = match limit {
            SyncLimit::NoLimit { .. } => {
                // do nothing
                this.borrow()._entries()
            }
            SyncLimit::SoftLimit {
                max_entries,
                mut on_evict,
            } => {
                // free up space for the new entry if necessary
                loop {
                    let locked = {
                        let mut entries = this.borrow()._entries();
                        let num_overlimit_entries =
                            entries.len().saturating_sub(max_entries.get() - 1);
                        if num_overlimit_entries == 0 {
                            // There is enough space, no need to free up space
                            break entries;
                        }
                        // There is not enough space, free up some.
                        let locked = Self::_lock_up_to_n_first_unlocked_entries(
                            this,
                            &mut entries,
                            num_overlimit_entries,
                        );

                        // If we couldn't lock any entries to free up their space, then
                        // all cache entries are currently locked. If we just waited
                        // until we could lock one, there would be a potential deadlock
                        // if multiple threads hold locks and try to get more locks.
                        // Let's avoid that deadlock and allow the current locking
                        // request, even though it goes above the limit.
                        // This is why we call [SyncLimit::SoftLimit] a "soft" limit.
                        if locked.is_empty() {
                            break entries;
                        }

                        // We now have some entries locked that may free up enough space.
                        // Let's evict them. We have to release the `entries` lock for that
                        // so that the on_evict user code can call back into Self::_unlock()
                        // for those entries. Additionally, we don't want `entries` to stay locked
                        // since user eviction code may write the entry back to an underlying
                        // storage layer and take some time.
                        //
                        // We do want to keep the entry itself locked and pass the guard to user code,
                        // because if user code does do writebacks, it probably needs the entry to be
                        // locked until the writeback is complete.
                        //
                        // However, unlocking `entries` means other user code may also run and cause
                        // race conditions, e.g. add new entries into the space we just created.
                        // Because of that, once on_evict returns, we'll take the lock again
                        // in the next loop iteration and check whether we now have enough space.
                        std::mem::drop(entries);
                        locked
                    };
                    on_evict(locked)?;
                }
            }
        };
        let result = match entries.get_or_insert_none(key) {
            GetOrInsertNoneResult::Existing(mutex) => LoadOrInsertMutexResult::Existing {
                // The call site needs to make sure it fulfills invariant 2C when dropping this [ReplicaArc].
                mutex: PrimaryArc::clone(mutex),
            },
            GetOrInsertNoneResult::Inserted(mutex) => {
                // If we just inserted the new entry, it'll have a `None` value. To fulfill invariant 2B, we need to put it in a [ReplicaOwnedMutexGuard].
                // The call site needs to make sure it fulfills invariant 2C when dropping that [ReplicaOwnedMutexGuard].
                let Ok(guard) = PrimaryArc::clone(mutex).try_lock_owned() else {
                    panic!("We're the only one who has seen this mutex so far. Locking can't fail.");
                };
                LoadOrInsertMutexResult::Inserted { guard }
            }
        };
        Ok(result)
    }

    fn _make_guard<S: Borrow<Self>>(
        this: S,
        key: K,
        guard: ReplicaOwnedMutexGuard<EntryValue<C::WrappedV<V>>>,
    ) -> Guard<K, V, C, S> {
        Guard::new(this, key, guard)
    }

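    /// Blocks the current thread until the per-key lock for `key` is acquired, then
    /// returns a [Guard] for it. The [SyncLimit] parameter controls what happens when
    /// the map is at capacity.
    ///
    /// A minimal usage sketch. It assumes a `SyncLimit::no_limit()` constructor,
    /// which is not defined in this file:
    /// ```ignore
    /// let map = LockableMapImpl::<i64, String, LockableHashMapConfig>::new(LockableHashMapConfig);
    /// let guard = LockableMapImpl::blocking_lock(&map, 4, SyncLimit::no_limit())?;
    /// // The key stays locked until `guard` is dropped. If no value was inserted,
    /// // the temporary `None` entry is cleaned up on unlock (invariant 2C).
    /// drop(guard);
    /// ```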
    #[inline]
    pub fn blocking_lock<S, E, OnEvictFn>(
        this: S,
        key: K,
        limit: SyncLimit<K, V, C, S, E, OnEvictFn>,
    ) -> Result<Guard<K, V, C, S>, E>
    where
        S: Borrow<Self> + Clone,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> Result<(), E>,
    {
        let mutex = Self::_load_or_insert_mutex_for_key_sync(&this, &key, limit)?;
        // Now we have an Arc::clone of the mutex for this key, and the global mutex is already unlocked so other threads can access the cache.
        // The following blocks the thread until the mutex for this key is acquired.

        let guard = match mutex {
            LoadOrInsertMutexResult::Existing { mutex } => mutex.blocking_lock_owned(),
            LoadOrInsertMutexResult::Inserted { guard } => guard,
        };

        // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
        Ok(Self::_make_guard(this, key, guard))
    }

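    /// Like [Self::blocking_lock], but suspends the current task instead of blocking
    /// the thread while waiting for the per-key lock, and allows the eviction callback
    /// to be async.
    ///
    /// A minimal usage sketch, assuming an `AsyncLimit::no_limit()` constructor
    /// (not defined in this file):
    /// ```ignore
    /// let guard = LockableMapImpl::async_lock(&map, 4, AsyncLimit::no_limit()).await?;
    /// ```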
    #[inline]
    pub async fn async_lock<S, E, F, OnEvictFn>(
        this: S,
        key: K,
        limit: AsyncLimit<K, V, C, S, E, F, OnEvictFn>,
    ) -> Result<Guard<K, V, C, S>, E>
    where
        S: Borrow<Self> + Clone,
        F: Future<Output = Result<(), E>>,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> F,
    {
        let mutex = Self::_load_or_insert_mutex_for_key_async(&this, &key, limit).await?;
        // Now we have an Arc::clone of the mutex for this key, and the global mutex is already unlocked so other threads can access the cache.
        // The following blocks the task until the mutex for this key is acquired.

        let guard = match mutex {
            LoadOrInsertMutexResult::Existing { mutex } => mutex.lock_owned().await,
            LoadOrInsertMutexResult::Inserted { guard } => guard,
        };

        // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
        Ok(Self::_make_guard(this, key, guard))
    }

    #[inline]
    pub fn try_lock<S, E, OnEvictFn>(
        this: S,
        key: K,
        limit: SyncLimit<K, V, C, S, E, OnEvictFn>,
    ) -> Result<Option<Guard<K, V, C, S>>, E>
    where
        S: Borrow<Self> + Clone,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> Result<(), E>,
    {
        let mutex = Self::_load_or_insert_mutex_for_key_sync(&this, &key, limit)?;
        // Now we have an Arc::clone of the mutex for this key, and the global mutex is already unlocked so other threads can access the cache.
        // The following tries to lock the mutex.

        match mutex {
            LoadOrInsertMutexResult::Existing { mutex } => match mutex.try_lock_owned() {
                Ok(guard) => {
                    // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
                    Ok(Some(Self::_make_guard(this, key, guard)))
                }
                Err(replica_arc) => {
                    // To fulfill invariant 2C, we need to call [Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock] here.
                    let mut entries = this.borrow()._entries();
                    Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock(
                        &mut entries,
                        &key,
                        replica_arc,
                    );
                    Ok(None)
                }
            },
            LoadOrInsertMutexResult::Inserted { guard } => {
                // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
                Ok(Some(Self::_make_guard(this, key, guard)))
            }
        }
    }

    #[inline]
    pub async fn try_lock_async<S, E, F, OnEvictFn>(
        this: S,
        key: K,
        limit: AsyncLimit<K, V, C, S, E, F, OnEvictFn>,
    ) -> Result<Option<Guard<K, V, C, S>>, E>
    where
        S: Borrow<Self> + Clone,
        F: Future<Output = Result<(), E>>,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> F,
    {
        let mutex = Self::_load_or_insert_mutex_for_key_async(&this, &key, limit).await?;
        // Now we have an Arc::clone of the mutex for this key, and the global mutex is already unlocked so other threads can access the cache.
        // The following tries to lock the mutex.

        match mutex {
            LoadOrInsertMutexResult::Existing { mutex } => match mutex.try_lock_owned() {
                Ok(guard) => {
                    // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
                    Ok(Some(Self::_make_guard(this, key, guard)))
                }
                Err(replica_arc) => {
                    // TODO Deduplicate this code with the try_lock method
                    // To fulfill invariant 2C, we need to call [Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock] here.
                    let mut entries = this.borrow()._entries();
                    Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock(
                        &mut entries,
                        &key,
                        replica_arc,
                    );
                    Ok(None)
                }
            },
            LoadOrInsertMutexResult::Inserted { guard } => {
                // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
                Ok(Some(Self::_make_guard(this, key, guard)))
            }
        }
    }

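    /// Locks all map entries that are currently unlocked and returns their guards,
    /// stopping after the first guard for which `take_while_condition` returns false.
    /// Entries that are locked by somebody else are skipped, not waited for.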
    pub fn lock_all_unlocked<S: Borrow<Self> + Clone>(
        this: S,
        take_while_condition: &impl Fn(&Guard<K, V, C, S>) -> bool,
    ) -> Vec<Guard<K, V, C, S>> {
        let entries = this.borrow()._entries();
        let mut previously_unlocked_entries = entries
            .iter()
            .filter_map(
                |(key, mutex)| match PrimaryArc::clone(mutex).try_lock_owned() {
                    Ok(guard) => Some(Self::_make_guard(this.clone(), key.clone(), guard)),
                    Err(_) => {
                        // Just dropping the [ReplicaArc] here without calling [Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock] is fine
                        // despite invariant 2C, because we hold a lock on `entries` and have held that lock since the call to [PrimaryArc::clone].
                        None
                    }
                },
            )
            .take_while_inclusive(take_while_condition)
            // Collecting into a Vec so that we don't have to keep `entries` locked
            // while the returned iterator is alive.
            .collect::<Vec<_>>();

        // We now have all entries fulfilling the `take_while_condition`, plus one entry that probably does not
        // (however, it might fulfill the condition if all entries fulfill it).
        // We need to remove that last entry and drop it, but before we can do that, we need to drop
        // `entries` because otherwise we'd have a deadlock when the entry tries to unlock itself.
        // This whole issue is actually the reason why we used `take_while_inclusive` instead of just
        // `take_while` above. `take_while` would drop this entry while the iterator is being processed
        // and cause this very deadlock.

        std::mem::drop(entries);
        if let Some(last_entry) = previously_unlocked_entries.pop() {
            if take_while_condition(&last_entry) {
                // It actually fulfilled the take_while_condition.
                // This can happen if all entries in the map fulfill the condition and this was the overall last map element.
                // We actually want to return it, so add it back to the return value.
                previously_unlocked_entries.push(last_entry);
            }
        }

        previously_unlocked_entries
    }

    /// Locks all entries in the cache and returns their guards as a stream.
    /// For entries that are locked by other threads or tasks, the stream will wait until they are unlocked.
    /// If the other thread or task holding the lock for an entry
    /// - creates the entry => the stream will return it
    /// - removes the entry => the stream will not return it
    ///
    /// Entries that were locked by another thread or task but never given a value will not be returned either.
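    ///
    /// A minimal usage sketch (illustrative; it assumes an async context and uses
    /// `futures::StreamExt::collect` to drain the stream):
    /// ```ignore
    /// let stream = LockableMapImpl::lock_all_entries(&map).await;
    /// let guards: Vec<_> = stream.collect().await;
    /// ```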
    pub async fn lock_all_entries<S: Borrow<Self> + Clone>(
        this: S,
    ) -> impl Stream<Item = Guard<K, V, C, S>> {
        let entries = this.borrow()._entries();
        let stream = entries
            .iter()
            .map(|(key, mutex)| {
                let this = this.clone();
                let key = key.clone();
                // Concurrency: PrimaryArc::clone must happen before we go async, while we still have the lock on `entries`,
                // so that invariant 2A is fulfilled (the refcount must only be increased while `entries` is locked).
                // The refcount will only be decreased through the Guard, which means it will also only happen
                // while `entries` is locked and invariant 2C is fulfilled.
                let mutex = PrimaryArc::clone(mutex);
                async move {
                    let guard = mutex.lock_owned().await;
                    let guard = Self::_make_guard(this, key, guard);
                    if guard.value().is_some() {
                        Some(guard)
                    } else {
                        // Dropping the guard fulfills invariant 2C.
                        None
                    }
                }
            })
            .collect::<FuturesUnordered<_>>()
            // Filter out entries that were removed, or that didn't pre-exist and weren't created while locked
            .filter_map(futures::future::ready);
        // Drop to ensure that the stream doesn't accidentally capture a lock on `entries`,
        // so that other threads can keep locking/unlocking while the stream is being processed.
        std::mem::drop(entries);
        stream
    }

    pub(super) fn _unlock(
        &self,
        key: &K,
        mut guard: ReplicaOwnedMutexGuard<EntryValue<C::WrappedV<V>>>,
    ) {
        self.config.on_unlock(guard.value.as_mut());
        let entry_carries_a_value = guard.value.is_some();

        // We need to get the `entries` lock before we drop the guard, see invariant 2C.
        let mut entries = self._entries();
        std::mem::drop(guard);

        // Now the guard is dropped and the lock for this key is unlocked.
        // If there are any other Self::blocking_lock/async_lock/try_lock()
        // calls for this key already running and waiting for the mutex,
        // they will be unblocked now and their guard will be created.

        // If the guard we dropped carried a value, keep the entry in the map.
        // But if it doesn't carry a value, clean up, since the entry semantically
        // doesn't exist in the map and was only created to have a place to put
        // the mutex. This fulfills invariant 2C.
        if !entry_carries_a_value {
            Self::_delete_if_unlocked_and_nobody_waiting_for_lock(&mut entries, key);
        }
    }

    fn _delete_if_unlocked_and_nobody_waiting_for_lock(
        entries: &mut EntriesGuard<'_, K, V, C>,
        key: &K,
    ) {
        let mutex: &Entry<C::WrappedV<V>> = entries
            .get(key)
            .expect("This entry must exist or this function shouldn't have been called");
        // We have a lock on `entries` and invariant 2A ensures that no other threads or tasks can currently
        // increase num_replicas (i.e. `Arc::strong_count`) or create clones of this Arc. This means that if num_replicas == 0,
        // we know that we are the only ones with a handle to this `Arc` and we can clean it up without race conditions.
        if mutex.num_replicas() == 0 {
            // TODO Combine the `get` above and `remove` here into a single hashing operation, using the hash map's entry API
            let remove_result = entries.remove(key);
            assert!(
                remove_result.is_some(),
                "We just got this entry above from the hash map, it cannot have vanished since then"
            );
        } else {
            // Another task or thread currently has a [ReplicaArc] for this entry; it may or may not be locked. We cannot clean up yet.
            // With invariant 2C, we know that thread or task hasn't cleaned up yet but will wait for us to release the `entries`
            // lock and then eventually call [Self::_delete_if_unlocked_and_nobody_waiting_for_lock] again.
            // We can just exit and let them deal with it.
        }
    }

    fn _delete_if_unlocked_none_and_nobody_waiting_for_lock(
        entries: &mut EntriesGuard<'_, K, V, C>,
        key: &K,
        entry: ReplicaArc<tokio::sync::Mutex<EntryValue<C::WrappedV<V>>>>,
    ) {
        // We have a lock on `entries` and invariant 2A ensures that no other threads or tasks can currently
        // increase num_replicas (i.e. `Arc::strong_count`) or create clones of this Arc. This means that if num_replicas == 1,
        // we know that we are the only ones with a handle to this `Arc` and we can clean it up without race conditions.
        if entry.num_replicas() == 1 {
            let locked = entry
                .try_lock()
                .expect("We're the only one who has access to this mutex. Locking can't fail.");
            if locked.value.is_none() {
                // TODO Combine the lookup and `remove` into a single hashing operation, using the hash map's entry API
                let remove_result = entries.remove(key);
                assert!(
                    remove_result.is_some(),
                    "We just got this entry above from the hash map, it cannot have vanished since then"
                );
            }
        } else {
            // Another task or thread currently has a [ReplicaArc] for this entry; it may or may not be locked. We cannot clean up yet.
            // With invariant 2C, we know that thread or task hasn't cleaned up yet but will wait for us to release the `entries`
            // lock and then eventually call [Self::_delete_if_unlocked_and_nobody_waiting_for_lock] again.
            // We can just exit and let them deal with it.
        }
    }

    pub fn into_entries_unordered(self) -> impl Iterator<Item = (K, C::WrappedV<V>)> {
        let entries: C::MapImpl<K, V> = self.entries.into_inner().expect("Lock poisoned");

        #[cfg(any(test, feature = "slow_assertions"))]
        EntriesGuard::<K, V, C>::assert_invariant(&entries);

        // We now have exclusive access to the LockableMapImpl object. Rust lifetime rules ensure that no other thread or task can have any
        // Guard for an entry, since both owned and non-owned guards are bound to the lifetime of the LockableMapImpl (owned guards
        // indirectly through the Arc; if user code calls this function, it means they had to call Arc::try_unwrap or something similar,
        // which ensures that there are no other threads with access to it).

        entries
            .into_iter()
            .map(|(key, value)| {
                let value = PrimaryArc::try_unwrap(value)
                    .unwrap_or_else(|_| panic!("We're the only one with access, there shouldn't be any other threads or tasks that have a copy of this Arc."));
                let value = value.into_inner().value.expect("Invariant 2 violated. There shouldn't be any `None` entries since there aren't any ReplicaArcs.");
                (key, value)
            })
    }

    // Caveat: Locked keys are listed even if they don't carry a value
    #[inline]
    pub fn keys_with_entries_or_locked(&self) -> Vec<K> {
        let entries = self._entries();
        entries.iter().map(|(key, _value)| key).cloned().collect()
    }

    fn _lock_up_to_n_first_unlocked_entries<S: Borrow<Self> + Clone>(
        this: &S,
        entries: &mut EntriesGuard<'_, K, V, C>,
        num_entries: usize,
    ) -> Vec<Guard<K, V, C, S>> {
        let mut result = Vec::with_capacity(num_entries);
        for (key, mutex) in entries.iter() {
            match PrimaryArc::clone(mutex).try_lock_owned() {
                Ok(guard) => {
                    if guard.value.is_some() {
                        result.push(Self::_make_guard(this.clone(), key.clone(), guard))
                    } else {
                        // We have not created this `None` entry, and according to invariant 2, we know that there is
                        // a [ReplicaArc] for it somewhere. Even though it seems to be unlocked, we know it exists.
                        // Because we have a lock on `entries`, invariant 2C tells us that this [ReplicaArc]'s destruction
                        // will wait for us before cleaning up the `None`.
                        // We don't need to worry about cleaning up the `None` ourselves.
                        assert!(mutex.num_replicas() > 1, "Invariant violated");
                    }
                }
                Err(_replica_arc) => {
                    // Invariant 2C:
                    // A failed try_lock means someone else currently holds the lock, and we can rely on them
                    // calling _unlock and potentially deleting the item if it is None. So we don't need
                    // to create a guard object here. Because we have a lock on `entries`, there are no
                    // race conditions with that _unlock.
                    // It's ok to just drop the [ReplicaArc] without calling _delete_if_unlocked_none_and_nobody_waiting_for_lock
                    // because we have a lock on `entries` and have held that lock since the call to [PrimaryArc::clone].
                    assert!(mutex.num_replicas() > 1, "Invariant violated");
                }
            }
            if result.len() >= num_entries {
                break;
            }
        }
        result
    }
}

impl<K, V, C> Debug for LockableMapImpl<K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt.debug_struct("LockableMapImpl").finish()
    }
}

/// Simple wrapper around `MutexGuard<C::MapImpl<K, V>>`.
/// Usually, this doesn't do anything else.
/// In tests, or with the `slow_assertions` feature enabled, it checks our invariants.
struct EntriesGuard<'a, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    entries: std::sync::MutexGuard<'a, C::MapImpl<K, V>>,
    _k: PhantomData<K>,
    _v: PhantomData<V>,
    _c: PhantomData<C>,
}

impl<'a, K, V, C> EntriesGuard<'a, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    #[track_caller]
    fn new(entries: std::sync::MutexGuard<'a, C::MapImpl<K, V>>) -> Self {
        #[cfg(any(test, feature = "slow_assertions"))]
        Self::assert_invariant(&entries);

        Self {
            entries,
            _k: PhantomData,
            _v: PhantomData,
            _c: PhantomData,
        }
    }

    #[cfg(any(test, feature = "slow_assertions"))]
    #[track_caller]
    fn assert_invariant(entries: &C::MapImpl<K, V>) {
        for (_key, entry) in entries.iter() {
            if entry.num_replicas() == 0 {
                let Ok(guard) = PrimaryArc::clone(entry).try_lock_owned() else {
                    panic!("We're the only one with access, locking can't fail");
                };
                assert!(
                    guard.value.is_some(),
                    "Invariant 2 violated. Found an entry without ReplicaArcs that is None."
                );
            }
        }
    }
}

impl<K, V, C> Deref for EntriesGuard<'_, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    type Target = C::MapImpl<K, V>;

    fn deref(&self) -> &Self::Target {
        &self.entries
    }
}

impl<K, V, C> DerefMut for EntriesGuard<'_, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.entries
    }
}

#[cfg(any(test, feature = "slow_assertions"))]
impl<K, V, C> Drop for EntriesGuard<'_, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    fn drop(&mut self) {
        Self::assert_invariant(&self.entries);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::lockable_hash_map::LockableHashMapConfig;

    #[test]
    fn test_debug() {
        let map = LockableMapImpl::<i64, String, LockableHashMapConfig>::new(LockableHashMapConfig);
        assert_eq!("LockableMapImpl", format!("{:?}", map));
    }
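
    // A hedged sketch of an additional test: it exercises the cleanup of `None`
    // entries described in invariant 2C. It assumes a `SyncLimit::no_limit()`
    // constructor (not defined in this file) and that `Guard::value()` returns
    // `None` for a key that was locked but never given a value.
    #[test]
    fn test_locking_nonexistent_key_leaves_no_entry_behind() {
        let map = LockableMapImpl::<i64, String, LockableHashMapConfig>::new(LockableHashMapConfig);
        {
            let guard = LockableMapImpl::blocking_lock(&map, 4, SyncLimit::no_limit())
                .expect("Locking with SyncLimit::no_limit() can't fail");
            // While locked, the key occupies a temporary `None` entry in the map (invariant 1).
            assert!(guard.value().is_none());
            assert_eq!(1, map.num_entries_or_locked());
        }
        // Dropping the guard cleans the `None` entry up again (invariant 2C).
        assert_eq!(0, map.num_entries_or_locked());
    }
}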
771}