async_named_locker/
object_locker.rs

1use std::sync::{Mutex};
2use std::collections::HashMap;
3use std::ops::{Deref, DerefMut};
4use notify_future::NotifyFuture;
5
6struct LockerState {
7    pub is_locked: bool,
8    pub pending_list: Vec<NotifyFuture<()>>
9}
10
11struct LockerManager {
12    locker_map: Mutex<HashMap<String, LockerState>>
13}
14
15lazy_static::lazy_static! {
16    static ref LOCK_MANAGER: LockerManager = LockerManager::new();
17}
18
19impl LockerManager {
20    pub fn new() -> LockerManager {
21        Self {
22            locker_map: Mutex::new(HashMap::new())
23        }
24    }
25
26    pub async fn lock(&self, locker_id: String) {
27        let future = {
28            let mut locker_map = self.locker_map.lock().unwrap();
29            let locker_info = locker_map.get_mut(&locker_id);
30            if locker_info.is_none() {
31                locker_map.insert(locker_id.clone(), LockerState {
32                    is_locked: true,
33                    pending_list: Vec::new()
34                });
35                log::debug!("LockerManager:get locker {}", locker_id);
36                return;
37            } else {
38                let state = locker_info.unwrap();
39                if state.is_locked {
40                    let future = NotifyFuture::new();
41                    state.pending_list.push(future.clone());
42                    future
43                } else {
44                    state.is_locked = true;
45                    log::debug!("LockerManager:get locker {}", locker_id);
46                    return;
47                }
48            }
49        };
50        log::debug!("LockerManager:waiting locker {}", locker_id);
51        future.await;
52        log::debug!("LockerManager:get locker {}", locker_id);
53    }
54
55    pub fn unlock(&self, locker_id: &str) {
56        let mut locker_map = self.locker_map.lock().unwrap();
57        let locker_info = locker_map.get_mut(locker_id);
58        if locker_info.is_some() {
59            let state = locker_info.unwrap();
60            if state.pending_list.len() > 0 {
61                let future = state.pending_list.remove(0);
62                future.set_complete(());
63            } else {
64                state.is_locked = false;
65            }
66        } else {
67            assert!(false);
68        }
69        log::debug!("LockerManager:free locker {}", locker_id);
70    }
71}
72
73pub struct Locker {
74    locker_id: String,
75}
76
77impl Locker {
78    pub async fn get_locker(locker_id: impl Into<String>) -> Self {
79        let id = locker_id.into();
80        LOCK_MANAGER.lock(id.clone()).await;
81        Self {
82            locker_id: id
83        }
84    }
85}
86
87impl Drop for Locker {
88    fn drop(&mut self) {
89        LOCK_MANAGER.unlock(self.locker_id.as_str());
90    }
91}
92
93pub struct GuardObject<T> {
94    _locker: Locker,
95    obj: T
96}
97
98impl <T> GuardObject<T> {
99    pub fn new(locker: Locker, obj: T) -> Self {
100        Self {
101            _locker: locker,
102            obj
103        }
104    }
105
106    pub fn release_locker(self) -> T {
107        self.obj
108    }
109}
110
111impl <T> Deref for GuardObject<T> {
112    type Target = T;
113
114    fn deref(&self) -> &Self::Target {
115        &self.obj
116    }
117}
118
119impl <T> DerefMut for GuardObject<T> {
120    fn deref_mut(&mut self) -> &mut Self::Target {
121        &mut self.obj
122    }
123}
124
125#[cfg(test)]
126mod test {
127    use std::sync::{Arc, Mutex};
128    use std::time::Duration;
129    use crate::Locker;
130
131    #[tokio::test]
132    async fn test() {
133        let _locker = Locker::get_locker("test".to_string()).await;
134        let i = Arc::new(Mutex::new(0));
135        let i_copy = i.clone();
136        tokio::spawn(async move {
137            let _locker = Locker::get_locker("test").await;
138            assert_eq!(*i_copy.lock().unwrap(), 1);
139        });
140        tokio::time::sleep(Duration::from_secs(5)).await;
141        *i.lock().unwrap() = 1;
142    }
143}