async_named_locker/
object_locker.rs1use std::sync::{Mutex};
2use std::collections::HashMap;
3use std::ops::{Deref, DerefMut};
4use notify_future::NotifyFuture;
5
6struct LockerState {
7 pub is_locked: bool,
8 pub pending_list: Vec<NotifyFuture<()>>
9}
10
11struct LockerManager {
12 locker_map: Mutex<HashMap<String, LockerState>>
13}
14
15lazy_static::lazy_static! {
16 static ref LOCK_MANAGER: LockerManager = LockerManager::new();
17}
18
19impl LockerManager {
20 pub fn new() -> LockerManager {
21 Self {
22 locker_map: Mutex::new(HashMap::new())
23 }
24 }
25
26 pub async fn lock(&self, locker_id: String) {
27 let future = {
28 let mut locker_map = self.locker_map.lock().unwrap();
29 let locker_info = locker_map.get_mut(&locker_id);
30 if locker_info.is_none() {
31 locker_map.insert(locker_id.clone(), LockerState {
32 is_locked: true,
33 pending_list: Vec::new()
34 });
35 log::debug!("LockerManager:get locker {}", locker_id);
36 return;
37 } else {
38 let state = locker_info.unwrap();
39 if state.is_locked {
40 let future = NotifyFuture::new();
41 state.pending_list.push(future.clone());
42 future
43 } else {
44 state.is_locked = true;
45 log::debug!("LockerManager:get locker {}", locker_id);
46 return;
47 }
48 }
49 };
50 log::debug!("LockerManager:waiting locker {}", locker_id);
51 future.await;
52 log::debug!("LockerManager:get locker {}", locker_id);
53 }
54
55 pub fn unlock(&self, locker_id: &str) {
56 let mut locker_map = self.locker_map.lock().unwrap();
57 let locker_info = locker_map.get_mut(locker_id);
58 if locker_info.is_some() {
59 let state = locker_info.unwrap();
60 if state.pending_list.len() > 0 {
61 let future = state.pending_list.remove(0);
62 future.set_complete(());
63 } else {
64 state.is_locked = false;
65 }
66 } else {
67 assert!(false);
68 }
69 log::debug!("LockerManager:free locker {}", locker_id);
70 }
71}
72
73pub struct Locker {
74 locker_id: String,
75}
76
77impl Locker {
78 pub async fn get_locker(locker_id: impl Into<String>) -> Self {
79 let id = locker_id.into();
80 LOCK_MANAGER.lock(id.clone()).await;
81 Self {
82 locker_id: id
83 }
84 }
85}
86
87impl Drop for Locker {
88 fn drop(&mut self) {
89 LOCK_MANAGER.unlock(self.locker_id.as_str());
90 }
91}
92
93pub struct GuardObject<T> {
94 _locker: Locker,
95 obj: T
96}
97
98impl <T> GuardObject<T> {
99 pub fn new(locker: Locker, obj: T) -> Self {
100 Self {
101 _locker: locker,
102 obj
103 }
104 }
105
106 pub fn release_locker(self) -> T {
107 self.obj
108 }
109}
110
111impl <T> Deref for GuardObject<T> {
112 type Target = T;
113
114 fn deref(&self) -> &Self::Target {
115 &self.obj
116 }
117}
118
119impl <T> DerefMut for GuardObject<T> {
120 fn deref_mut(&mut self) -> &mut Self::Target {
121 &mut self.obj
122 }
123}
124
125#[cfg(test)]
126mod test {
127 use std::sync::{Arc, Mutex};
128 use std::time::Duration;
129 use crate::Locker;
130
131 #[tokio::test]
132 async fn test() {
133 let _locker = Locker::get_locker("test".to_string()).await;
134 let i = Arc::new(Mutex::new(0));
135 let i_copy = i.clone();
136 tokio::spawn(async move {
137 let _locker = Locker::get_locker("test").await;
138 assert_eq!(*i_copy.lock().unwrap(), 1);
139 });
140 tokio::time::sleep(Duration::from_secs(5)).await;
141 *i.lock().unwrap() = 1;
142 }
143}