// lockable/lockable_map_impl.rs

use futures::stream::{FuturesUnordered, Stream, StreamExt};
use itertools::Itertools;
use std::borrow::Borrow;
use std::fmt::Debug;
use std::future::Future;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};

use super::guard::Guard;
use super::limit::{AsyncLimit, SyncLimit};
use super::map_like::{GetOrInsertNoneResult, MapLike};
use super::utils::primary_arc::PrimaryArc;
use crate::utils::primary_arc::{ReplicaArc, ReplicaOwnedMutexGuard};
// TODO Does it make sense to make the inner Mutex (i.e. tokio::sync::Mutex) a template parameter
// with a Mutex trait? It could allow user code to select std::sync::Mutex if they don't need the
// async_lock functions, or maybe even use their own mutex type if we make the trait public.

pub trait LockableMapConfig {
    type MapImpl<K, V>: MapLike<K, Entry<Self::WrappedV<V>>>
    where
        K: Eq + PartialEq + Hash + Clone;
    type WrappedV<V>;

    fn borrow_value<V>(v: &Self::WrappedV<V>) -> &V;
    fn borrow_value_mut<V>(v: &mut Self::WrappedV<V>) -> &mut V;
    fn wrap_value<V>(&self, v: V) -> Self::WrappedV<V>;
    fn unwrap_value<V>(v: Self::WrappedV<V>) -> V;

    /// This gets executed every time a value is unlocked.
    /// The `v` parameter is the value that is being unlocked.
    /// It is `None` if we locked and then unlocked a key that
    /// actually doesn't have an entry in the map.
    fn on_unlock<V>(&self, v: Option<&mut Self::WrappedV<V>>);
}
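
// Example (hypothetical, for illustration only): a config could use `WrappedV` to attach
// metadata to each stored value, e.g. a last-access timestamp that `on_unlock` refreshes.
// `AccessTrackingConfig`, `WithLastAccess` and `SomeMapLikeImpl` are made-up names, not
// part of this crate:
//
//     #[derive(Clone)]
//     struct AccessTrackingConfig;
//
//     struct WithLastAccess<V> {
//         value: V,
//         last_access: std::time::Instant,
//     }
//
//     impl LockableMapConfig for AccessTrackingConfig {
//         type MapImpl<K, V>
//             = SomeMapLikeImpl<K, Entry<WithLastAccess<V>>>
//         where
//             K: Eq + PartialEq + Hash + Clone;
//         type WrappedV<V> = WithLastAccess<V>;
//
//         fn borrow_value<V>(v: &WithLastAccess<V>) -> &V {
//             &v.value
//         }
//         fn borrow_value_mut<V>(v: &mut WithLastAccess<V>) -> &mut V {
//             &mut v.value
//         }
//         fn wrap_value<V>(&self, v: V) -> WithLastAccess<V> {
//             WithLastAccess { value: v, last_access: std::time::Instant::now() }
//         }
//         fn unwrap_value<V>(v: WithLastAccess<V>) -> V {
//             v.value
//         }
//         fn on_unlock<V>(&self, v: Option<&mut WithLastAccess<V>>) {
//             // `v` is None if the locked key had no value in the map.
//             if let Some(v) = v {
//                 v.last_access = std::time::Instant::now();
//             }
//         }
//     }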

#[derive(Debug)]
pub struct EntryValue<V> {
    // While unlocked, an entry is always Some. While locked, it can be temporarily None
    // since we enter None values into the map to lock keys that actually don't exist in the map.
    pub(super) value: Option<V>,
}

pub(super) type Entry<V> = PrimaryArc<tokio::sync::Mutex<EntryValue<V>>>;

pub struct LockableMapImpl<K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    // We always use std::sync::Mutex for protecting the whole map since its guards
    // never have to be kept across await boundaries, and std::sync::Mutex is faster
    // than tokio::sync::Mutex. But the inner per-key locks use tokio::sync::Mutex
    // because they need to be kept across await boundaries.
    //
    // We never hand the inner Arc around a map entry out of the encapsulation of this class,
    // except through non-cloneable Guard objects encapsulating those Arcs.
    // This allows us to reason about which threads can or cannot increase the refcounts.
    //
    // Invariants:
    //   1) Every key can only be locked once at the same time, even if the key has no value in the map.
    //      -> This is fulfilled by inserting `None` values into the map for locking keys that don't have a value.
    //   2) An entry can only be `None` if there is a [ReplicaArc]/[ReplicaOwnedMutexGuard] (e.g. [Guard]) for it, or if `entries` is locked (e.g. we're currently running some code that will reinstate the invariant before unlocking `entries`).
    //      This invariant ensures we don't accidentally leave `None` entries behind when we don't need them anymore.
    //      It is fulfilled through the following rules:
    //      2A) Any of the [PrimaryArc] entries in the map can only be cloned (i.e. have the refcount increased) while `entries` is locked. Cloning them will create a [ReplicaArc] that cannot be cloned further.
    //          However, while holding a [ReplicaArc], other threads can come in, lock `entries`, and create their own [ReplicaArc] to the same entry.
    //      2B) Code creating a new `None` [PrimaryArc] in the map must always create a [ReplicaArc]/[ReplicaOwnedMutexGuard] for it.
    //      2C) Anybody dropping a [ReplicaArc]/[ReplicaOwnedMutexGuard] must first get a lock on `entries`, then drop the ReplicaArc, then, if it is None,
    //          call [Self::_delete_if_unlocked_and_nobody_waiting_for_lock] or [Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock] to clean up the `None` entry.
    //          The easiest way to do this is to put the [ReplicaArc] into a [Guard] object that will do this correctly in its [Drop].
    //          Exception: If the same lock on `entries` that was present when the [ReplicaArc] was created is still held, i.e. no other thread could have dropped their instance of the [ReplicaArc] in between, then it's ok to just drop it without any `None` checks.
    //      (2C) means that it isn't necessarily the [ReplicaArc] created in (2B) that will clean up the `None` entry, since there could still be other [ReplicaArc]s, but the last [ReplicaArc] will clean it up.
    //  TODO Can 2C be enforced with some kind of Guard that isn't about locking but just about cleaning up the `None` entry? The actual [Guard] could then maybe hold this inner guard.
    entries: std::sync::Mutex<C::MapImpl<K, V>>,

    config: C,

    _k: PhantomData<K>,
    _v: PhantomData<V>,
}

enum LoadOrInsertMutexResult<V> {
    Existing {
        mutex: ReplicaArc<tokio::sync::Mutex<EntryValue<V>>>,
    },
    Inserted {
        guard: ReplicaOwnedMutexGuard<EntryValue<V>>,
    },
}

impl<K, V, C> LockableMapImpl<K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    #[inline]
    pub fn new(config: C) -> Self {
        Self {
            entries: std::sync::Mutex::new(C::MapImpl::<K, V>::new()),
            config,
            _k: PhantomData,
            _v: PhantomData,
        }
    }

    #[inline]
    pub fn config(&self) -> &C {
        &self.config
    }

    #[cfg(test)]
    #[inline]
    pub fn config_mut(&mut self) -> &mut C {
        &mut self.config
    }

    #[inline]
    pub fn num_entries_or_locked(&self) -> usize {
        self._entries().len()
    }

    fn _entries(&self) -> EntriesGuard<'_, K, V, C> {
        EntriesGuard::new(self.entries
            .lock()
            .expect("The global mutex protecting the LockableMapImpl is poisoned. This shouldn't happen since no user code runs while this lock is held, so no thread should ever panic while holding it"))
    }

    // WARNING: Call site must be very careful to always fulfill invariant 2C with the returned [ReplicaArc].
    async fn _load_or_insert_mutex_for_key_async<S, E, F, OnEvictFn>(
        this: &S,
        key: &K,
        limit: AsyncLimit<K, V, C, S, E, F, OnEvictFn>,
    ) -> Result<LoadOrInsertMutexResult<C::WrappedV<V>>, E>
    where
        S: Borrow<Self> + Clone,
        F: Future<Output = Result<(), E>>,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> F,
    {
        // Note: this logic is duplicated in _load_or_insert_mutex_for_key_sync without the .await calls
        let mut entries = match limit {
            AsyncLimit::NoLimit { .. } => {
                // do nothing
                this.borrow()._entries()
            }
            AsyncLimit::SoftLimit {
                max_entries,
                mut on_evict,
            } => {
                // free up space for the new entry if necessary
                loop {
                    let locked = {
                        let mut entries = this.borrow()._entries();
                        let num_overlimit_entries =
                            entries.len().saturating_sub(max_entries.get() - 1);
                        if num_overlimit_entries == 0 {
                            // There is enough space, no need to free up space
                            break entries;
                        }
                        // There is not enough space, free up some.
                        let locked = Self::_lock_up_to_n_first_unlocked_entries(
                            this,
                            &mut entries,
                            num_overlimit_entries,
                        );

                        // If we couldn't lock any entries to free their space up, then
                        // all cache entries are currently locked. If we just waited
                        // until we could lock one, there would be a potential deadlock
                        // if multiple threads hold locks and try to get more locks.
                        // Let's avoid that deadlock and allow the current locking
                        // request, even though it goes above the limit.
                        // This is why we call [AsyncLimit::SoftLimit] a "soft" limit.
                        if locked.is_empty() {
                            // TODO Test that this works, i.e. that the map still correctly works when it's full and doesn't deadlock (and same for the _load_or_insert_mutex_for_key_sync version)
                            break entries;
                        }

                        // We now have some entries locked that may free up enough space.
                        // Let's evict them. We have to release the `entries` lock for that
                        // so that the on_evict user code can call back into Self::_unlock()
                        // for those entries. Additionally, we don't want `entries` to stay locked
                        // since user eviction code may write the entry back to an underlying
                        // storage layer and take some time.
                        //
                        // We do want to keep the entry itself locked and pass the guard to user code,
                        // because if user code does do writebacks, it probably needs the entry to be
                        // locked until the writeback is complete.
                        //
                        // However, unlocking `entries` means other user code may also run and cause
                        // race conditions, e.g. add new entries into the space we just created.
                        // Because of that, once on_evict returns, we'll take the lock again
                        // in the next loop iteration and re-check whether we now have enough space.
                        std::mem::drop(entries);
                        locked
                    };
                    on_evict(locked).await?;
                }
            }
        };
        let result = match entries.get_or_insert_none(key) {
            GetOrInsertNoneResult::Existing(mutex) => LoadOrInsertMutexResult::Existing {
                // The call site needs to make sure it fulfills invariant 2C when dropping this [ReplicaArc].
                mutex: PrimaryArc::clone(mutex),
            },
            GetOrInsertNoneResult::Inserted(mutex) => {
                // If we just inserted the new entry, it'll have a `None` value. To fulfill invariant 2B, we need to put it in a [ReplicaOwnedMutexGuard].
                // The call site needs to make sure it fulfills invariant 2C when dropping that [ReplicaOwnedMutexGuard].
                let Ok(guard) = PrimaryArc::clone(mutex).try_lock_owned() else {
                    panic!(
                        "We're the only one who has seen this mutex so far. Locking can't fail."
                    );
                };
                LoadOrInsertMutexResult::Inserted { guard }
            }
        };
        Ok(result)
    }

    // WARNING: Call site must be very careful to always fulfill invariant 2C with the returned [ReplicaArc].
    fn _load_or_insert_mutex_for_key_sync<S, E, OnEvictFn>(
        this: &S,
        key: &K,
        limit: SyncLimit<K, V, C, S, E, OnEvictFn>,
    ) -> Result<LoadOrInsertMutexResult<C::WrappedV<V>>, E>
    where
        S: Borrow<Self> + Clone,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> Result<(), E>,
    {
        // Note: this logic is duplicated in _load_or_insert_mutex_for_key_async with some additional .await calls
        let mut entries = match limit {
            SyncLimit::NoLimit { .. } => {
                // do nothing
                this.borrow()._entries()
            }
            SyncLimit::SoftLimit {
                max_entries,
                mut on_evict,
            } => {
                // free up space for the new entry if necessary
                loop {
                    let locked = {
                        let mut entries = this.borrow()._entries();
                        let num_overlimit_entries =
                            entries.len().saturating_sub(max_entries.get() - 1);
                        if num_overlimit_entries == 0 {
                            // There is enough space, no need to free up space
                            break entries;
                        }
                        // There is not enough space, free up some.
                        let locked = Self::_lock_up_to_n_first_unlocked_entries(
                            this,
                            &mut entries,
                            num_overlimit_entries,
                        );

                        // If we couldn't lock any entries to free their space up, then
                        // all cache entries are currently locked. If we just waited
                        // until we could lock one, there would be a potential deadlock
                        // if multiple threads hold locks and try to get more locks.
                        // Let's avoid that deadlock and allow the current locking
                        // request, even though it goes above the limit.
                        // This is why we call [SyncLimit::SoftLimit] a "soft" limit.
                        if locked.is_empty() {
                            break entries;
                        }

                        // We now have some entries locked that may free up enough space.
                        // Let's evict them. We have to release the `entries` lock for that
                        // so that the on_evict user code can call back into Self::_unlock()
                        // for those entries. Additionally, we don't want `entries` to stay locked
                        // since user eviction code may write the entry back to an underlying
                        // storage layer and take some time.
                        //
                        // We do want to keep the entry itself locked and pass the guard to user code,
                        // because if user code does do writebacks, it probably needs the entry to be
                        // locked until the writeback is complete.
                        //
                        // However, unlocking `entries` means other user code may also run and cause
                        // race conditions, e.g. add new entries into the space we just created.
                        // Because of that, once on_evict returns, we'll take the lock again
                        // in the next loop iteration and re-check whether we now have enough space.
                        std::mem::drop(entries);
                        locked
                    };
                    on_evict(locked)?;
                }
            }
        };
        let result = match entries.get_or_insert_none(key) {
            GetOrInsertNoneResult::Existing(mutex) => LoadOrInsertMutexResult::Existing {
                // The call site needs to make sure it fulfills invariant 2C when dropping this [ReplicaArc].
                mutex: PrimaryArc::clone(mutex),
            },
            GetOrInsertNoneResult::Inserted(mutex) => {
                // If we just inserted the new entry, it'll have a `None` value. To fulfill invariant 2B, we need to put it in a [ReplicaOwnedMutexGuard].
                // The call site needs to make sure it fulfills invariant 2C when dropping that [ReplicaOwnedMutexGuard].
                let Ok(guard) = PrimaryArc::clone(mutex).try_lock_owned() else {
                    panic!(
                        "We're the only one who has seen this mutex so far. Locking can't fail."
                    );
                };
                LoadOrInsertMutexResult::Inserted { guard }
            }
        };
        Ok(result)
    }

    fn _make_guard<S: Borrow<Self>>(
        this: S,
        key: K,
        guard: ReplicaOwnedMutexGuard<EntryValue<C::WrappedV<V>>>,
    ) -> Guard<K, V, C, S> {
        Guard::new(this, key, guard)
    }

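    /// Locks the given key, blocking the current thread until the lock is acquired.
    ///
    /// Hypothetical usage sketch (assumes a `SyncLimit::no_limit()` constructor and the
    /// `LockableHashMapConfig` used in the tests at the bottom of this file; not verified
    /// against the public API):
    ///
    /// ```ignore
    /// let map = LockableMapImpl::<i64, String, LockableHashMapConfig>::new(LockableHashMapConfig);
    /// let guard = LockableMapImpl::blocking_lock(&map, 4, SyncLimit::no_limit())?;
    /// // The key `4` stays locked (and, if it has no value, present as a `None`
    /// // entry in the map) until `guard` is dropped.
    /// ```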
    #[inline]
    pub fn blocking_lock<S, E, OnEvictFn>(
        this: S,
        key: K,
        limit: SyncLimit<K, V, C, S, E, OnEvictFn>,
    ) -> Result<Guard<K, V, C, S>, E>
    where
        S: Borrow<Self> + Clone,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> Result<(), E>,
    {
        let mutex = Self::_load_or_insert_mutex_for_key_sync(&this, &key, limit)?;
        // Now we have an Arc::clone of the mutex for this key, and the global mutex is already unlocked so other threads can access the cache.
        // The following blocks the thread until the mutex for this key is acquired.

        let guard = match mutex {
            LoadOrInsertMutexResult::Existing { mutex } => mutex.blocking_lock_owned(),
            LoadOrInsertMutexResult::Inserted { guard } => guard,
        };

        // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
        Ok(Self::_make_guard(this, key, guard))
    }

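    /// Locks the given key, asynchronously waiting until the lock is acquired.
    ///
    /// Hypothetical usage sketch (assumes an `AsyncLimit::no_limit()` constructor;
    /// not verified against the public API):
    ///
    /// ```ignore
    /// let guard = LockableMapImpl::async_lock(&map, 4, AsyncLimit::no_limit()).await?;
    /// ```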
    #[inline]
    pub async fn async_lock<S, E, F, OnEvictFn>(
        this: S,
        key: K,
        limit: AsyncLimit<K, V, C, S, E, F, OnEvictFn>,
    ) -> Result<Guard<K, V, C, S>, E>
    where
        S: Borrow<Self> + Clone,
        F: Future<Output = Result<(), E>>,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> F,
    {
        let mutex = Self::_load_or_insert_mutex_for_key_async(&this, &key, limit).await?;
        // Now we have an Arc::clone of the mutex for this key, and the global mutex is already unlocked so other threads can access the cache.
        // The following blocks the task until the mutex for this key is acquired.

        let guard = match mutex {
            LoadOrInsertMutexResult::Existing { mutex } => mutex.lock_owned().await,
            LoadOrInsertMutexResult::Inserted { guard } => guard,
        };

        // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
        Ok(Self::_make_guard(this, key, guard))
    }

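    /// Tries to lock the given key without blocking. Returns `Ok(None)` if the key is
    /// currently locked by another thread or task.
    ///
    /// Hypothetical usage sketch (assumes `SyncLimit::no_limit()`; not verified against
    /// the public API):
    ///
    /// ```ignore
    /// if let Some(guard) = LockableMapImpl::try_lock(&map, 4, SyncLimit::no_limit())? {
    ///     // We hold the lock on key `4` until `guard` is dropped.
    /// }
    /// ```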
    #[inline]
    pub fn try_lock<S, E, OnEvictFn>(
        this: S,
        key: K,
        limit: SyncLimit<K, V, C, S, E, OnEvictFn>,
    ) -> Result<Option<Guard<K, V, C, S>>, E>
    where
        S: Borrow<Self> + Clone,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> Result<(), E>,
    {
        let mutex = Self::_load_or_insert_mutex_for_key_sync(&this, &key, limit)?;
        // Now we have an Arc::clone of the mutex for this key, and the global mutex is already unlocked so other threads can access the cache.
        // The following tries to lock the mutex.

        match mutex {
            LoadOrInsertMutexResult::Existing { mutex } => match mutex.try_lock_owned() {
                Ok(guard) => {
                    // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
                    Ok(Some(Self::_make_guard(this, key, guard)))
                }
                Err(replica_arc) => {
                    // To fulfill invariant 2C, we need to call [Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock] here.
                    let mut entries = this.borrow()._entries();
                    Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock(
                        &mut entries,
                        &key,
                        replica_arc,
                    );
                    Ok(None)
                }
            },
            LoadOrInsertMutexResult::Inserted { guard } => {
                // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
                Ok(Some(Self::_make_guard(this, key, guard)))
            }
        }
    }

    #[inline]
    pub async fn try_lock_async<S, E, F, OnEvictFn>(
        this: S,
        key: K,
        limit: AsyncLimit<K, V, C, S, E, F, OnEvictFn>,
    ) -> Result<Option<Guard<K, V, C, S>>, E>
    where
        S: Borrow<Self> + Clone,
        F: Future<Output = Result<(), E>>,
        OnEvictFn: FnMut(Vec<Guard<K, V, C, S>>) -> F,
    {
        let mutex = Self::_load_or_insert_mutex_for_key_async(&this, &key, limit).await?;
        // Now we have an Arc::clone of the mutex for this key, and the global mutex is already unlocked so other threads can access the cache.
        // The following tries to lock the mutex.

        match mutex {
            LoadOrInsertMutexResult::Existing { mutex } => match mutex.try_lock_owned() {
                Ok(guard) => {
                    // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
                    Ok(Some(Self::_make_guard(this, key, guard)))
                }
                Err(replica_arc) => {
                    // TODO Deduplicate this code with the try_lock method
                    // To fulfill invariant 2C, we need to call [Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock] here.
                    let mut entries = this.borrow()._entries();
                    Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock(
                        &mut entries,
                        &key,
                        replica_arc,
                    );
                    Ok(None)
                }
            },
            LoadOrInsertMutexResult::Inserted { guard } => {
                // To fulfill invariant 2C, we immediately put the [ReplicaOwnedMutexGuard] into a [Guard] object.
                Ok(Some(Self::_make_guard(this, key, guard)))
            }
        }
    }

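    /// Locks all entries that are currently unlocked, in iteration order of the underlying
    /// map, and stops after the first newly locked entry for which `take_while_condition`
    /// returns `false`.
    ///
    /// Hypothetical usage sketch (not verified against the public API):
    ///
    /// ```ignore
    /// // Lock every currently unlocked entry, with no early stopping.
    /// let guards = LockableMapImpl::lock_all_unlocked(&map, &|_| true);
    /// ```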
    pub fn lock_all_unlocked<S: Borrow<Self> + Clone>(
        this: S,
        take_while_condition: &impl Fn(&Guard<K, V, C, S>) -> bool,
    ) -> Vec<Guard<K, V, C, S>> {
        let entries = this.borrow()._entries();
        let mut previously_unlocked_entries = entries
            .iter()
            .filter_map(
                |(key, mutex)| match PrimaryArc::clone(mutex).try_lock_owned() {
                    Ok(guard) => Some(Self::_make_guard(this.clone(), key.clone(), guard)),
                    Err(_) => {
                        // Just dropping the [ReplicaArc] here without calling [Self::_delete_if_unlocked_none_and_nobody_waiting_for_lock] is fine
                        // despite invariant 2C, because we hold a lock on `entries` and have held that lock since the call to [PrimaryArc::clone].
                        None
                    }
                },
            )
            .take_while_inclusive(take_while_condition)
            // Collecting into a Vec so that we don't have to keep `entries` locked
            // while the returned iterator is alive.
            .collect::<Vec<_>>();

        // We now have all entries fulfilling the `take_while_condition`, plus one entry that probably does not
        // (though it might, if every entry in the map fulfills the condition).
        // We need to remove that last entry and drop it, but before we can do that, we need to drop
        // `entries` because otherwise we'd have a deadlock when the entry tries to unlock itself.
        // This whole issue is actually the reason why we used `take_while_inclusive` instead of just
        // `take_while` above. `take_while` would drop this entry while the iterator is being processed
        // and cause this very deadlock.

        std::mem::drop(entries);
        if let Some(last_entry) = previously_unlocked_entries.pop() {
            if take_while_condition(&last_entry) {
                // It actually fulfilled the take_while_condition.
                // This can happen if all entries in the map fulfill the condition and this was the overall last map element.
                // We actually want to return it, so add it back to the return value.
                previously_unlocked_entries.push(last_entry);
            }
        }

        previously_unlocked_entries
    }

    /// Locks all entries in the cache and returns their guards as a stream.
    /// For entries that are locked by other threads or tasks, the stream will wait until they are unlocked.
    /// If the other thread or task holding the lock for an entry
    /// - creates the entry => the stream will return it
    /// - removes the entry => the stream will not return it
    /// - leaves the entry without a value => the stream will not return it
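    ///
    /// Hypothetical usage sketch (`collect` here is `StreamExt::collect` from the
    /// `futures` crate; not verified against the public API):
    ///
    /// ```ignore
    /// use futures::stream::StreamExt;
    /// let guards: Vec<_> = LockableMapImpl::lock_all_entries(&map).await.collect().await;
    /// ```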
    pub async fn lock_all_entries<S: Borrow<Self> + Clone>(
        this: S,
    ) -> impl Stream<Item = Guard<K, V, C, S>> {
        let entries = this.borrow()._entries();
        let stream = entries
            .iter()
            .map(|(key, mutex)| {
                let this = this.clone();
                let key = key.clone();
                // Concurrency: PrimaryArc::clone must happen before we go async, while we still have the lock on `entries`,
                //              so that invariant 2A is fulfilled (refcount must only be increased while `entries` is locked).
                //              The refcount will only be decreased through the Guard, which means it will also only happen
                //              while `entries` is locked and invariant 2C is fulfilled.
                let mutex = PrimaryArc::clone(mutex);
                async move {
                    let guard = mutex.lock_owned().await;
                    let guard = Self::_make_guard(this, key, guard);
                    if guard.value().is_some() {
                        Some(guard)
                    } else {
                        // Dropping the guard fulfills invariant 2C.
                        None
                    }
                }
            })
            .collect::<FuturesUnordered<_>>()
            // Filter out entries that were removed, or that had no value and weren't given one while locked
            .filter_map(futures::future::ready);
        // Drop `entries` to ensure that the stream doesn't accidentally capture a lock on it,
        // so that other threads can keep locking/unlocking while the stream is being processed.
        std::mem::drop(entries);
        stream
    }

    pub(super) fn _unlock(
        &self,
        key: &K,
        mut guard: ReplicaOwnedMutexGuard<EntryValue<C::WrappedV<V>>>,
    ) {
        self.config.on_unlock(guard.value.as_mut());
        let entry_carries_a_value = guard.value.is_some();

        // We need to get the `entries` lock before we drop the guard, see invariant 2C.
        let mut entries = self._entries();
        std::mem::drop(guard);

        // Now the guard is dropped and the lock for this key is unlocked.
        // If there are any other Self::blocking_lock/async_lock/try_lock()
        // calls for this key already running and waiting for the mutex,
        // they will be unblocked now and their guard will be created.

        // If the guard we dropped carried a value, keep the entry in the map.
        // But if it doesn't carry a value, clean up since the entry semantically
        // doesn't exist in the map and was only created to have a place to put
        // the mutex. This fulfills invariant 2C.
        if !entry_carries_a_value {
            Self::_delete_if_unlocked_and_nobody_waiting_for_lock(&mut entries, key);
        }
    }

    fn _delete_if_unlocked_and_nobody_waiting_for_lock(
        entries: &mut EntriesGuard<'_, K, V, C>,
        key: &K,
    ) {
        let mutex: &Entry<C::WrappedV<V>> = entries
            .get(key)
            .expect("This entry must exist or this function shouldn't have been called");
        // We have a lock on `entries` and invariant 2A ensures that no other threads or tasks can currently
        // increase num_replicas (i.e. `Arc::strong_count`) or create clones of this Arc. This means that if num_replicas == 0,
        // we know that we are the only ones with a handle to this `Arc` and we can clean it up without race conditions.
        if mutex.num_replicas() == 0 {
            // TODO Combine the `get` above and `remove` here into a single hashing operation, using the hash map's entry API
            let remove_result = entries.remove(key);
            assert!(
                remove_result.is_some(),
                "We just got this entry above from the hash map, it cannot have vanished since then"
            );
        } else {
            // Another task or thread currently has a [ReplicaArc] for this entry; it may or may not be locked. We cannot clean up yet.
            // With invariant 2C, we know that thread or task hasn't cleaned up yet but will wait for us to release the `entries`
            // lock and then eventually call [Self::_delete_if_unlocked_and_nobody_waiting_for_lock] again.
            // We can just exit and let them deal with it.
        }
    }

    fn _delete_if_unlocked_none_and_nobody_waiting_for_lock(
        entries: &mut EntriesGuard<'_, K, V, C>,
        key: &K,
        entry: ReplicaArc<tokio::sync::Mutex<EntryValue<C::WrappedV<V>>>>,
    ) {
        // We have a lock on `entries` and invariant 2A ensures that no other threads or tasks can currently
        // increase num_replicas (i.e. `Arc::strong_count`) or create clones of this Arc. This means that if num_replicas == 1,
        // we know that we are the only ones with a handle to this `Arc` and we can clean it up without race conditions.
        if entry.num_replicas() == 1 {
            let locked = entry
                .try_lock()
                .expect("We're the only one who has access to this mutex. Locking can't fail.");
            if locked.value.is_none() {
                // TODO Combine this check and the `remove` into a single hashing operation, using the hash map's entry API
                let remove_result = entries.remove(key);
                assert!(
                    remove_result.is_some(),
                    "There must be an entry for this key; it cannot have vanished while we held the `entries` lock"
                );
            }
        } else {
            // Another task or thread currently has a [ReplicaArc] for this entry; it may or may not be locked. We cannot clean up yet.
            // With invariant 2C, we know that thread or task hasn't cleaned up yet but will wait for us to release the `entries`
            // lock and then eventually call [Self::_delete_if_unlocked_and_nobody_waiting_for_lock] again.
            // We can just exit and let them deal with it.
        }
    }

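    /// Consumes the map and returns an iterator over all of its (key, value) entries,
    /// in no particular order.
    ///
    /// Hypothetical usage sketch (`process` is a made-up consumer; not verified against
    /// the public API):
    ///
    /// ```ignore
    /// for (key, value) in map.into_entries_unordered() {
    ///     // `value` has the wrapped value type, i.e. `C::WrappedV<V>`.
    ///     process(key, value);
    /// }
    /// ```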
    pub fn into_entries_unordered(self) -> impl Iterator<Item = (K, C::WrappedV<V>)> {
        let entries: C::MapImpl<K, V> = self.entries.into_inner().expect("Lock poisoned");

        #[cfg(any(test, feature = "slow_assertions"))]
        EntriesGuard::<K, V, C>::assert_invariant(&entries);

        // We now have exclusive access to the LockableMapImpl object. Rust lifetime rules ensure that no other thread or task can have any
        // Guard for an entry, since both owned and non-owned guards are bound to the lifetime of the LockableMapImpl (owned guards
        // indirectly through the Arc). If user code calls this function, it means they had to call Arc::try_unwrap or something similar first,
        // which ensures that there are no other threads with access to it.

        entries
            .into_iter()
            .map(|(key, value)| {
                let value = PrimaryArc::try_unwrap(value)
                    .unwrap_or_else(|_| panic!("We're the only one with access, there shouldn't be any other threads or tasks that have a copy of this Arc."));
                let value = value.into_inner().value.expect("Invariant 2 violated. There shouldn't be any `None` entries since there aren't any ReplicaArcs.");
                (key, value)
            })
    }

    // Caveat: Locked keys are listed even if they don't carry a value
    #[inline]
    pub fn keys_with_entries_or_locked(&self) -> Vec<K> {
        let entries = self._entries();
        entries.iter().map(|(key, _value)| key).cloned().collect()
    }

    fn _lock_up_to_n_first_unlocked_entries<S: Borrow<Self> + Clone>(
        this: &S,
        entries: &mut EntriesGuard<'_, K, V, C>,
        num_entries: usize,
    ) -> Vec<Guard<K, V, C, S>> {
        let mut result = Vec::with_capacity(num_entries);
        for (key, mutex) in entries.iter() {
            match PrimaryArc::clone(mutex).try_lock_owned() {
                Ok(guard) => {
                    if guard.value.is_some() {
                        result.push(Self::_make_guard(this.clone(), key.clone(), guard))
                    } else {
                        // We have not created this `None` entry, and according to invariant 2, we know that there is
                        // a [ReplicaArc] for it somewhere. Even though it seems to be unlocked, we know it exists.
                        // Because we have a lock on `entries`, invariant 2C tells us that this [ReplicaArc]'s destruction
                        // will wait for us before cleaning up the `None`.
                        // We don't need to worry about cleaning up the `None` ourselves.
                        assert!(mutex.num_replicas() > 1, "Invariant violated");
                    }
                }
                Err(_replica_arc) => {
                    // Invariant 2C:
                    // A failed try_lock means someone else currently holds the lock, and we can rely on them
                    // calling _unlock and potentially deleting the entry if it is None. So we don't need
                    // to create a guard object here. Because we have a lock on `entries`, there are no
                    // race conditions with that _unlock.
                    // It's ok to just drop the [ReplicaArc] without calling _delete_if_unlocked_none_and_nobody_waiting_for_lock
                    // because we have a lock on `entries` and have held that lock since the call to [PrimaryArc::clone].
                    assert!(mutex.num_replicas() > 1, "Invariant violated");
                }
            }
            if result.len() >= num_entries {
                break;
            }
        }
        result
    }
}

impl<K, V, C> Debug for LockableMapImpl<K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt.debug_struct("LockableMapImpl").finish()
    }
}

/// Simple wrapper around `MutexGuard<C::MapImpl<K, V>>`.
/// In release builds, this doesn't do anything else.
/// In tests, or with the `slow_assertions` feature enabled, it checks our invariants.
struct EntriesGuard<'a, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    entries: std::sync::MutexGuard<'a, C::MapImpl<K, V>>,
    _k: PhantomData<K>,
    _v: PhantomData<V>,
    _c: PhantomData<C>,
}

impl<'a, K, V, C> EntriesGuard<'a, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    #[track_caller]
    fn new(entries: std::sync::MutexGuard<'a, C::MapImpl<K, V>>) -> Self {
        #[cfg(any(test, feature = "slow_assertions"))]
        Self::assert_invariant(&entries);

        Self {
            entries,
            _k: PhantomData,
            _v: PhantomData,
            _c: PhantomData,
        }
    }

    #[cfg(any(test, feature = "slow_assertions"))]
    #[track_caller]
    fn assert_invariant(entries: &C::MapImpl<K, V>) {
        for (_key, entry) in entries.iter() {
            if entry.num_replicas() == 0 {
                let Ok(guard) = PrimaryArc::clone(entry).try_lock_owned() else {
                    panic!("We're the only one with access, locking can't fail");
                };
                assert!(
                    guard.value.is_some(),
                    "Invariant 2 violated. Found an entry without ReplicaArcs that is None."
                );
            }
        }
    }
}

impl<K, V, C> Deref for EntriesGuard<'_, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    type Target = C::MapImpl<K, V>;

    fn deref(&self) -> &Self::Target {
        &self.entries
    }
}

impl<K, V, C> DerefMut for EntriesGuard<'_, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.entries
    }
}

#[cfg(any(test, feature = "slow_assertions"))]
impl<K, V, C> Drop for EntriesGuard<'_, K, V, C>
where
    K: Eq + PartialEq + Hash + Clone,
    C: LockableMapConfig + Clone,
{
    fn drop(&mut self) {
        Self::assert_invariant(&self.entries);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::lockable_hash_map::LockableHashMapConfig;

    #[test]
    fn test_debug() {
        let map = LockableMapImpl::<i64, String, LockableHashMapConfig>::new(LockableHashMapConfig);
        assert_eq!("LockableMapImpl", format!("{:?}", map));
    }
}