Skip to main content

veilid_tools/async_locks/
async_mutex.rs

1//! AsyncMutex
2use super::*;
3
4cfg_if! {
5    if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
6        use async_lock::Mutex as InnerAsyncMutex;
7        use async_lock::MutexGuard as InnerAsyncMutexGuard;
8        use async_lock::MutexGuardArc as InnerAsyncMutexGuardArc;
9    } else {
10        cfg_if! {
11            if #[cfg(feature="rt-async-std")] {
12                use async_std::sync::Mutex as InnerAsyncMutex;
13                use async_std::sync::MutexGuard as InnerAsyncMutexGuard;
14                use async_std::sync::MutexGuardArc as InnerAsyncMutexGuardArc;
15            } else if #[cfg(feature="rt-tokio")] {
16                use tokio::sync::Mutex as InnerAsyncMutex;
17                use tokio::sync::MutexGuard as InnerAsyncMutexGuard;
18                use tokio::sync::OwnedMutexGuard as InnerAsyncMutexGuardArc;
19            } else {
20                compile_error!("needs executor implementation");
21            }
22        }
23    }
24}
25
26cfg_if::cfg_if! {
27    if #[cfg(feature="rt-tokio")] {
28        macro_rules! asyncmutex_try_lock {
29            ($x:expr) => {
30                $x.try_lock().ok()
31            };
32        }
33
34        macro_rules! asyncmutex_lock_arc {
35            ($x:expr) => {
36                $x.clone().lock_owned()
37            };
38        }
39
40        macro_rules! asyncmutex_try_lock_arc {
41            ($x:expr) => {
42                $x.clone().try_lock_owned().ok()
43            };
44        }
45
46    } else {
47        macro_rules! asyncmutex_try_lock {
48            ($x:expr) => {
49                $x.try_lock()
50            };
51        }
52        macro_rules! asyncmutex_lock_arc {
53            ($x:expr) => {
54                $x.lock_arc()
55            };
56        }
57        macro_rules! asyncmutex_try_lock_arc {
58            ($x:expr) => {
59                $x.try_lock_arc()
60            };
61        }
62    }
63}
64
65#[derive(Debug)]
66pub struct AsyncMutex<T: ?Sized> {
67    inner: Arc<InnerAsyncMutex<T>>,
68    #[cfg(feature = "debug-locks")]
69    lock_id_container: LockIdContainer,
70}
71
72impl<T: ?Sized> AsyncMutex<T> {
73    #[inline]
74    pub fn new(t: T) -> Self
75    where
76        T: Sized,
77    {
78        Self {
79            inner: Arc::new(InnerAsyncMutex::new(t)),
80            #[cfg(feature = "debug-locks")]
81            lock_id_container: LockIdContainer::next(),
82        }
83    }
84
85    #[inline]
86    pub async fn lock(&self) -> AsyncMutexGuard<'_, T> {
87        cfg_if! {
88            if #[cfg(feature = "debug-locks")] {
89                let inner = match timeout(DEBUG_LOCKS_DURATION_MS, self.inner.lock()).await {
90                    Ok(v) => v,
91                    Err(_) => {
92                        self.lock_id_container.report_deadlock("AsyncMutex::lock deadlock");
93                    }
94                };
95            } else {
96                let inner = self.inner.lock().await;
97            }
98        }
99
100        AsyncMutexGuard {
101            inner,
102            #[cfg(feature = "debug-locks")]
103            _guard_id_container: GuardIdContainer::next(
104                self.lock_id_container.clone(),
105                #[cfg(feature = "debug-locks-detect")]
106                LockSense::Write,
107            ),
108        }
109    }
110
111    #[inline]
112    pub async fn lock_arc(self: &Arc<Self>) -> AsyncMutexGuardArc<T> {
113        cfg_if! {
114            if #[cfg(feature = "debug-locks")] {
115                let inner = match timeout(DEBUG_LOCKS_DURATION_MS, asyncmutex_lock_arc!(self.inner)).await {
116                    Ok(v) => v,
117                    Err(_) => {
118                        self.lock_id_container.report_deadlock("AsyncMutex::lock_arc deadlock");
119                    }
120                };
121            } else {
122                let inner = asyncmutex_lock_arc!(self.inner).await;
123            }
124        }
125
126        AsyncMutexGuardArc {
127            inner,
128            #[cfg(feature = "debug-locks")]
129            _guard_id_container: GuardIdContainer::next(
130                self.lock_id_container.clone(),
131                #[cfg(feature = "debug-locks-detect")]
132                LockSense::Write,
133            ),
134        }
135    }
136
137    #[inline]
138    #[must_use]
139    pub fn try_lock(&self) -> Option<AsyncMutexGuard<'_, T>> {
140        let inner = asyncmutex_try_lock!(self.inner)?;
141
142        let out = AsyncMutexGuard {
143            inner,
144            #[cfg(feature = "debug-locks")]
145            _guard_id_container: GuardIdContainer::next(
146                self.lock_id_container.clone(),
147                #[cfg(feature = "debug-locks-detect")]
148                LockSense::TryWrite,
149            ),
150        };
151
152        Some(out)
153    }
154
155    #[inline]
156    #[must_use]
157    pub fn try_lock_arc(self: &Arc<Self>) -> Option<AsyncMutexGuardArc<T>> {
158        let inner = asyncmutex_try_lock_arc!(self.inner)?;
159
160        let out = AsyncMutexGuardArc {
161            inner,
162            #[cfg(feature = "debug-locks")]
163            _guard_id_container: GuardIdContainer::next(
164                self.lock_id_container.clone(),
165                #[cfg(feature = "debug-locks-detect")]
166                LockSense::TryWrite,
167            ),
168        };
169
170        Some(out)
171    }
172}
173
174impl<T> From<T> for AsyncMutex<T> {
175    fn from(s: T) -> Self {
176        Self::new(s)
177    }
178}
179
180#[clippy::has_significant_drop]
181#[must_use = "if unused the Mutex will immediately unlock"]
182#[derive(Debug)]
183pub struct AsyncMutexGuard<'a, T: ?Sized> {
184    inner: InnerAsyncMutexGuard<'a, T>,
185    #[cfg(feature = "debug-locks")]
186    _guard_id_container: GuardIdContainer,
187}
188
189impl<T: fmt::Display + ?Sized> fmt::Display for AsyncMutexGuard<'_, T> {
190    #[inline]
191    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192        self.inner.fmt(f)
193    }
194}
195
196impl<T: ?Sized> std::ops::Deref for AsyncMutexGuard<'_, T> {
197    type Target = T;
198
199    #[inline]
200    fn deref(&self) -> &T {
201        &self.inner
202    }
203}
204
205impl<T: ?Sized> std::ops::DerefMut for AsyncMutexGuard<'_, T> {
206    #[inline]
207    fn deref_mut(&mut self) -> &mut T {
208        &mut self.inner
209    }
210}
211
212#[clippy::has_significant_drop]
213#[derive(Debug)]
214pub struct AsyncMutexGuardArc<T: ?Sized> {
215    inner: InnerAsyncMutexGuardArc<T>,
216    #[cfg(feature = "debug-locks")]
217    _guard_id_container: GuardIdContainer,
218}
219
220impl<T: fmt::Display + ?Sized> fmt::Display for AsyncMutexGuardArc<T> {
221    #[inline]
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        self.inner.fmt(f)
224    }
225}
226
227impl<T: ?Sized> std::ops::Deref for AsyncMutexGuardArc<T> {
228    type Target = T;
229
230    #[inline]
231    fn deref(&self) -> &T {
232        &self.inner
233    }
234}
235
236impl<T: ?Sized> std::ops::DerefMut for AsyncMutexGuardArc<T> {
237    #[inline]
238    fn deref_mut(&mut self) -> &mut T {
239        &mut self.inner
240    }
241}