veilid_tools/async_locks/
async_mutex.rs1use super::*;
3
4cfg_if! {
5 if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
6 use async_lock::Mutex as InnerAsyncMutex;
7 use async_lock::MutexGuard as InnerAsyncMutexGuard;
8 use async_lock::MutexGuardArc as InnerAsyncMutexGuardArc;
9 } else {
10 cfg_if! {
11 if #[cfg(feature="rt-async-std")] {
12 use async_std::sync::Mutex as InnerAsyncMutex;
13 use async_std::sync::MutexGuard as InnerAsyncMutexGuard;
14 use async_std::sync::MutexGuardArc as InnerAsyncMutexGuardArc;
15 } else if #[cfg(feature="rt-tokio")] {
16 use tokio::sync::Mutex as InnerAsyncMutex;
17 use tokio::sync::MutexGuard as InnerAsyncMutexGuard;
18 use tokio::sync::OwnedMutexGuard as InnerAsyncMutexGuardArc;
19 } else {
20 compile_error!("needs executor implementation");
21 }
22 }
23 }
24}
25
26cfg_if::cfg_if! {
27 if #[cfg(feature="rt-tokio")] {
28 macro_rules! asyncmutex_try_lock {
29 ($x:expr) => {
30 $x.try_lock().ok()
31 };
32 }
33
34 macro_rules! asyncmutex_lock_arc {
35 ($x:expr) => {
36 $x.clone().lock_owned()
37 };
38 }
39
40 macro_rules! asyncmutex_try_lock_arc {
41 ($x:expr) => {
42 $x.clone().try_lock_owned().ok()
43 };
44 }
45
46 } else {
47 macro_rules! asyncmutex_try_lock {
48 ($x:expr) => {
49 $x.try_lock()
50 };
51 }
52 macro_rules! asyncmutex_lock_arc {
53 ($x:expr) => {
54 $x.lock_arc()
55 };
56 }
57 macro_rules! asyncmutex_try_lock_arc {
58 ($x:expr) => {
59 $x.try_lock_arc()
60 };
61 }
62 }
63}
64
65#[derive(Debug)]
66pub struct AsyncMutex<T: ?Sized> {
67 inner: Arc<InnerAsyncMutex<T>>,
68 #[cfg(feature = "debug-locks")]
69 lock_id_container: LockIdContainer,
70}
71
72impl<T: ?Sized> AsyncMutex<T> {
73 #[inline]
74 pub fn new(t: T) -> Self
75 where
76 T: Sized,
77 {
78 Self {
79 inner: Arc::new(InnerAsyncMutex::new(t)),
80 #[cfg(feature = "debug-locks")]
81 lock_id_container: LockIdContainer::next(),
82 }
83 }
84
85 #[inline]
86 pub async fn lock(&self) -> AsyncMutexGuard<'_, T> {
87 cfg_if! {
88 if #[cfg(feature = "debug-locks")] {
89 let inner = match timeout(DEBUG_LOCKS_DURATION_MS, self.inner.lock()).await {
90 Ok(v) => v,
91 Err(_) => {
92 self.lock_id_container.report_deadlock("AsyncMutex::lock deadlock");
93 }
94 };
95 } else {
96 let inner = self.inner.lock().await;
97 }
98 }
99
100 AsyncMutexGuard {
101 inner,
102 #[cfg(feature = "debug-locks")]
103 _guard_id_container: GuardIdContainer::next(
104 self.lock_id_container.clone(),
105 #[cfg(feature = "debug-locks-detect")]
106 LockSense::Write,
107 ),
108 }
109 }
110
111 #[inline]
112 pub async fn lock_arc(self: &Arc<Self>) -> AsyncMutexGuardArc<T> {
113 cfg_if! {
114 if #[cfg(feature = "debug-locks")] {
115 let inner = match timeout(DEBUG_LOCKS_DURATION_MS, asyncmutex_lock_arc!(self.inner)).await {
116 Ok(v) => v,
117 Err(_) => {
118 self.lock_id_container.report_deadlock("AsyncMutex::lock_arc deadlock");
119 }
120 };
121 } else {
122 let inner = asyncmutex_lock_arc!(self.inner).await;
123 }
124 }
125
126 AsyncMutexGuardArc {
127 inner,
128 #[cfg(feature = "debug-locks")]
129 _guard_id_container: GuardIdContainer::next(
130 self.lock_id_container.clone(),
131 #[cfg(feature = "debug-locks-detect")]
132 LockSense::Write,
133 ),
134 }
135 }
136
137 #[inline]
138 #[must_use]
139 pub fn try_lock(&self) -> Option<AsyncMutexGuard<'_, T>> {
140 let inner = asyncmutex_try_lock!(self.inner)?;
141
142 let out = AsyncMutexGuard {
143 inner,
144 #[cfg(feature = "debug-locks")]
145 _guard_id_container: GuardIdContainer::next(
146 self.lock_id_container.clone(),
147 #[cfg(feature = "debug-locks-detect")]
148 LockSense::TryWrite,
149 ),
150 };
151
152 Some(out)
153 }
154
155 #[inline]
156 #[must_use]
157 pub fn try_lock_arc(self: &Arc<Self>) -> Option<AsyncMutexGuardArc<T>> {
158 let inner = asyncmutex_try_lock_arc!(self.inner)?;
159
160 let out = AsyncMutexGuardArc {
161 inner,
162 #[cfg(feature = "debug-locks")]
163 _guard_id_container: GuardIdContainer::next(
164 self.lock_id_container.clone(),
165 #[cfg(feature = "debug-locks-detect")]
166 LockSense::TryWrite,
167 ),
168 };
169
170 Some(out)
171 }
172}
173
174impl<T> From<T> for AsyncMutex<T> {
175 fn from(s: T) -> Self {
176 Self::new(s)
177 }
178}
179
180#[clippy::has_significant_drop]
181#[must_use = "if unused the Mutex will immediately unlock"]
182#[derive(Debug)]
183pub struct AsyncMutexGuard<'a, T: ?Sized> {
184 inner: InnerAsyncMutexGuard<'a, T>,
185 #[cfg(feature = "debug-locks")]
186 _guard_id_container: GuardIdContainer,
187}
188
189impl<T: fmt::Display + ?Sized> fmt::Display for AsyncMutexGuard<'_, T> {
190 #[inline]
191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192 self.inner.fmt(f)
193 }
194}
195
196impl<T: ?Sized> std::ops::Deref for AsyncMutexGuard<'_, T> {
197 type Target = T;
198
199 #[inline]
200 fn deref(&self) -> &T {
201 &self.inner
202 }
203}
204
205impl<T: ?Sized> std::ops::DerefMut for AsyncMutexGuard<'_, T> {
206 #[inline]
207 fn deref_mut(&mut self) -> &mut T {
208 &mut self.inner
209 }
210}
211
212#[clippy::has_significant_drop]
213#[derive(Debug)]
214pub struct AsyncMutexGuardArc<T: ?Sized> {
215 inner: InnerAsyncMutexGuardArc<T>,
216 #[cfg(feature = "debug-locks")]
217 _guard_id_container: GuardIdContainer,
218}
219
220impl<T: fmt::Display + ?Sized> fmt::Display for AsyncMutexGuardArc<T> {
221 #[inline]
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 self.inner.fmt(f)
224 }
225}
226
227impl<T: ?Sized> std::ops::Deref for AsyncMutexGuardArc<T> {
228 type Target = T;
229
230 #[inline]
231 fn deref(&self) -> &T {
232 &self.inner
233 }
234}
235
236impl<T: ?Sized> std::ops::DerefMut for AsyncMutexGuardArc<T> {
237 #[inline]
238 fn deref_mut(&mut self) -> &mut T {
239 &mut self.inner
240 }
241}