1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2
3use std::{
4 collections::HashMap,
5 hash::Hash,
6 sync::{
7 atomic::{AtomicUsize, Ordering},
8 Arc,
9 },
10};
11
12use tokio::sync::{Mutex, OwnedMutexGuard, TryLockError};
13
14#[derive(Debug)]
17pub struct KeyLock<K> {
18 locks: Mutex<HashMap<K, Arc<Mutex<()>>>>,
20 accesses: AtomicUsize,
22}
23
24impl<K> Default for KeyLock<K> {
25 fn default() -> Self {
26 Self { locks: Mutex::default(), accesses: AtomicUsize::default() }
27 }
28}
29
30impl<K> KeyLock<K>
31where
32 K: Eq + Hash + Send + Clone,
33{
34 #[must_use]
36 pub fn new() -> Self {
37 Self::default()
38 }
39
40 pub async fn lock(&self, key: K) -> OwnedMutexGuard<()> {
42 let mut locks = self.locks.lock().await;
43
44 if self.accesses.fetch_add(1, Ordering::Relaxed) % 1000 == 0 {
45 Self::clean_up(&mut locks);
46 }
47
48 let lock = locks.entry(key).or_insert_with(|| Arc::new(Mutex::new(()))).clone();
49 drop(locks);
50
51 lock.lock_owned().await
52 }
53
54 pub async fn try_lock(&self, key: K) -> Result<OwnedMutexGuard<()>, TryLockError> {
57 let mut locks = self.locks.lock().await;
58
59 if self.accesses.fetch_add(1, Ordering::Relaxed) % 1000 == 0 {
60 Self::clean_up(&mut locks);
61 }
62
63 let lock = locks.entry(key).or_insert_with(|| Arc::new(Mutex::new(()))).clone();
64 drop(locks);
65
66 lock.try_lock_owned()
67 }
68
69 pub fn blocking_lock(&self, key: K) -> OwnedMutexGuard<()> {
72 let mut locks = self.locks.blocking_lock();
73
74 if self.accesses.fetch_add(1, Ordering::Relaxed) % 1000 == 0 {
75 Self::clean_up(&mut locks);
76 }
77
78 let lock = locks.entry(key).or_insert_with(|| Arc::new(Mutex::new(()))).clone();
79 drop(locks);
80
81 lock.blocking_lock_owned()
82 }
83
84 pub fn blocking_try_lock(&self, key: K) -> Result<OwnedMutexGuard<()>, TryLockError> {
87 let mut locks = self.locks.blocking_lock();
88
89 if self.accesses.fetch_add(1, Ordering::Relaxed) % 1000 == 0 {
90 Self::clean_up(&mut locks);
91 }
92
93 let lock = locks.entry(key).or_insert_with(|| Arc::new(Mutex::new(()))).clone();
94 drop(locks);
95
96 lock.try_lock_owned()
97 }
98
99 pub async fn clean(&self) {
101 let mut locks = self.locks.lock().await;
102 Self::clean_up(&mut locks);
103 }
104
105 pub fn blocking_clean(&self) {
107 let mut locks = self.locks.blocking_lock();
108 Self::clean_up(&mut locks);
109 }
110
111 fn clean_up(locks: &mut HashMap<K, Arc<Mutex<()>>>) {
113 let mut to_remove = Vec::new();
114 for (key, lock) in locks.iter() {
115 if lock.try_lock().is_ok() {
116 to_remove.push(key.clone());
117 }
118 }
119 for key in to_remove {
120 locks.remove(&key);
121 }
122 }
123}
124
125#[cfg(test)]
126mod tests;