async_named_locker/
object_holder.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::{Arc, Mutex};
3use notify_future::NotifyFuture;
4
5pub struct ObjectGuard<T> {
6    obj: Option<T>,
7    holder: ObjectHolder<T>
8}
9
10impl<T> ObjectGuard<T> {
11    pub fn new(obj: T, holder: ObjectHolder<T>) -> Self {
12        ObjectGuard {
13            obj: Some(obj),
14            holder
15        }
16    }
17}
18
19impl<T> Drop for ObjectGuard<T> {
20    fn drop(&mut self) {
21        if let Some(obj) = self.obj.take() {
22            self.holder.release(obj);
23        }
24    }
25}
26
27impl<T> Deref for ObjectGuard<T> {
28    type Target = T;
29
30    fn deref(&self) -> &Self::Target {
31        self.obj.as_ref().unwrap()
32    }
33}
34
35impl<T> DerefMut for ObjectGuard<T> {
36    fn deref_mut(&mut self) -> &mut Self::Target {
37        self.obj.as_mut().unwrap()
38    }
39}
40
41struct ObjectHolderState<T> {
42    obj: Option<T>,
43    waiter_list: Vec<NotifyFuture<T>>
44}
45
46pub struct ObjectHolder<T> {
47    state: Arc<Mutex<ObjectHolderState<T>>>
48}
49
50impl<T> Clone for ObjectHolder<T> {
51    fn clone(&self) -> Self {
52        ObjectHolder {
53            state: self.state.clone()
54        }
55    }
56}
57impl <T> ObjectHolder<T> {
58    pub fn new(obj: T) -> Self {
59        ObjectHolder {
60            state: Arc::new(Mutex::new(ObjectHolderState {
61                obj: Some(obj),
62                waiter_list: vec![]
63            }))
64        }
65    }
66
67    pub async fn get(&self) -> ObjectGuard<T> {
68        let waiter = {
69            let mut state = self.state.lock().unwrap();
70            if let Some(obj) = state.obj.take() {
71                return ObjectGuard::new(obj, self.clone());
72            }
73            let waiter = NotifyFuture::new();
74            state.waiter_list.push(waiter.clone());
75            waiter
76        };
77
78        let obj = waiter.await;
79        ObjectGuard::new(obj, self.clone())
80    }
81
82    fn release(&self, obj: T) {
83        let mut state = self.state.lock().unwrap();
84        if let Some(waiter) = state.waiter_list.pop() {
85            waiter.set_complete(obj);
86        } else {
87            state.obj = Some(obj);
88        }
89    }
90}
91
92#[cfg(test)]
93mod test {
94    use super::*;
95    use tokio::time::{sleep, Duration};
96
97    #[tokio::test]
98    async fn test_object_holder() {
99        let holder = ObjectHolder::new(1);
100        let holder1 = holder.clone();
101        let guard1 = tokio::spawn(async move {
102            let guard = holder1.get().await;
103            sleep(Duration::from_secs(1)).await;
104        });
105
106        let holder2 = holder.clone();
107        let guard2 = tokio::spawn(async move {
108            let guard = holder2.get().await;
109        });
110
111        guard1.await.unwrap();
112        guard2.await.unwrap();
113    }
114}