async_mutex/lib.rs
1//! DO NOT USE!
2//!
3//! This crate was merged into [async-lock], which provides the API this crate used to.
4//!
5//! [async-lock]: https://crates.io/crates/async-lock
6
7#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
8
9use std::cell::UnsafeCell;
10use std::fmt;
11use std::ops::{Deref, DerefMut};
12use std::process;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use std::usize;
17
18use event_listener::Event;
19
20/// An async mutex.
21pub struct Mutex<T: ?Sized> {
22 /// Current state of the mutex.
23 ///
24 /// The least significant bit is set to 1 if the mutex is locked.
25 /// The other bits hold the number of starved lock operations.
26 state: AtomicUsize,
27
28 /// Lock operations waiting for the mutex to be released.
29 lock_ops: Event,
30
31 /// The value inside the mutex.
32 data: UnsafeCell<T>,
33}
34
35unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
36unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
37
38impl<T> Mutex<T> {
39 /// Creates a new async mutex.
40 ///
41 /// # Examples
42 ///
43 /// ```
44 /// use async_mutex::Mutex;
45 ///
46 /// let mutex = Mutex::new(0);
47 /// ```
48 pub const fn new(data: T) -> Mutex<T> {
49 Mutex {
50 state: AtomicUsize::new(0),
51 lock_ops: Event::new(),
52 data: UnsafeCell::new(data),
53 }
54 }
55
56 /// Consumes the mutex, returning the underlying data.
57 ///
58 /// # Examples
59 ///
60 /// ```
61 /// use async_mutex::Mutex;
62 ///
63 /// let mutex = Mutex::new(10);
64 /// assert_eq!(mutex.into_inner(), 10);
65 /// ```
66 pub fn into_inner(self) -> T {
67 self.data.into_inner()
68 }
69}
70
71impl<T: ?Sized> Mutex<T> {
72 /// Acquires the mutex.
73 ///
74 /// Returns a guard that releases the mutex when dropped.
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// # futures_lite::future::block_on(async {
80 /// use async_mutex::Mutex;
81 ///
82 /// let mutex = Mutex::new(10);
83 /// let guard = mutex.lock().await;
84 /// assert_eq!(*guard, 10);
85 /// # })
86 /// ```
87 #[inline]
88 pub async fn lock(&self) -> MutexGuard<'_, T> {
89 if let Some(guard) = self.try_lock() {
90 return guard;
91 }
92 self.acquire_slow().await;
93 MutexGuard(self)
94 }
95
96 /// Slow path for acquiring the mutex.
97 #[cold]
98 async fn acquire_slow(&self) {
99 // Get the current time.
100 let start = Instant::now();
101
102 loop {
103 // Start listening for events.
104 let listener = self.lock_ops.listen();
105
106 // Try locking if nobody is being starved.
107 match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
108 // Lock acquired!
109 0 => return,
110
111 // Lock is held and nobody is starved.
112 1 => {}
113
114 // Somebody is starved.
115 _ => break,
116 }
117
118 // Wait for a notification.
119 listener.await;
120
121 // Try locking if nobody is being starved.
122 match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
123 // Lock acquired!
124 0 => return,
125
126 // Lock is held and nobody is starved.
127 1 => {}
128
129 // Somebody is starved.
130 _ => {
131 // Notify the first listener in line because we probably received a
132 // notification that was meant for a starved task.
133 self.lock_ops.notify(1);
134 break;
135 }
136 }
137
138 // If waiting for too long, fall back to a fairer locking strategy that will prevent
139 // newer lock operations from starving us forever.
140 if start.elapsed() > Duration::from_micros(500) {
141 break;
142 }
143 }
144
145 // Increment the number of starved lock operations.
146 if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
147 // In case of potential overflow, abort.
148 process::abort();
149 }
150
151 // Decrement the counter when exiting this function.
152 let _call = CallOnDrop(|| {
153 self.state.fetch_sub(2, Ordering::Release);
154 });
155
156 loop {
157 // Start listening for events.
158 let listener = self.lock_ops.listen();
159
160 // Try locking if nobody else is being starved.
161 match self.state.compare_and_swap(2, 2 | 1, Ordering::Acquire) {
162 // Lock acquired!
163 2 => return,
164
165 // Lock is held by someone.
166 s if s % 2 == 1 => {}
167
168 // Lock is available.
169 _ => {
170 // Be fair: notify the first listener and then go wait in line.
171 self.lock_ops.notify(1);
172 }
173 }
174
175 // Wait for a notification.
176 listener.await;
177
178 // Try acquiring the lock without waiting for others.
179 if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
180 return;
181 }
182 }
183 }
184
185 /// Attempts to acquire the mutex.
186 ///
187 /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
188 /// guard is returned that releases the mutex when dropped.
189 ///
190 /// # Examples
191 ///
192 /// ```
193 /// use async_mutex::Mutex;
194 ///
195 /// let mutex = Mutex::new(10);
196 /// if let Some(guard) = mutex.try_lock() {
197 /// assert_eq!(*guard, 10);
198 /// }
199 /// # ;
200 /// ```
201 #[inline]
202 pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
203 if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
204 Some(MutexGuard(self))
205 } else {
206 None
207 }
208 }
209
210 /// Returns a mutable reference to the underlying data.
211 ///
212 /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
213 /// borrow statically guarantees the mutex is not already acquired.
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// # futures_lite::future::block_on(async {
219 /// use async_mutex::Mutex;
220 ///
221 /// let mut mutex = Mutex::new(0);
222 /// *mutex.get_mut() = 10;
223 /// assert_eq!(*mutex.lock().await, 10);
224 /// # })
225 /// ```
226 pub fn get_mut(&mut self) -> &mut T {
227 unsafe { &mut *self.data.get() }
228 }
229}
230
231impl<T: ?Sized> Mutex<T> {
232 /// Acquires the mutex and clones a reference to it.
233 ///
234 /// Returns an owned guard that releases the mutex when dropped.
235 ///
236 /// # Examples
237 ///
238 /// ```
239 /// # futures_lite::future::block_on(async {
240 /// use async_mutex::Mutex;
241 /// use std::sync::Arc;
242 ///
243 /// let mutex = Arc::new(Mutex::new(10));
244 /// let guard = mutex.lock_arc().await;
245 /// assert_eq!(*guard, 10);
246 /// # })
247 /// ```
248 #[inline]
249 pub async fn lock_arc(self: &Arc<Self>) -> MutexGuardArc<T> {
250 if let Some(guard) = self.try_lock_arc() {
251 return guard;
252 }
253 self.acquire_slow().await;
254 MutexGuardArc(self.clone())
255 }
256
257 /// Attempts to acquire the mutex and clone a reference to it.
258 ///
259 /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
260 /// owned guard is returned that releases the mutex when dropped.
261 ///
262 /// # Examples
263 ///
264 /// ```
265 /// use async_mutex::Mutex;
266 /// use std::sync::Arc;
267 ///
268 /// let mutex = Arc::new(Mutex::new(10));
269 /// if let Some(guard) = mutex.try_lock() {
270 /// assert_eq!(*guard, 10);
271 /// }
272 /// # ;
273 /// ```
274 #[inline]
275 pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
276 if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
277 Some(MutexGuardArc(self.clone()))
278 } else {
279 None
280 }
281 }
282}
283
284impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 struct Locked;
287 impl fmt::Debug for Locked {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 f.write_str("<locked>")
290 }
291 }
292
293 match self.try_lock() {
294 None => f.debug_struct("Mutex").field("data", &Locked).finish(),
295 Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
296 }
297 }
298}
299
300impl<T> From<T> for Mutex<T> {
301 fn from(val: T) -> Mutex<T> {
302 Mutex::new(val)
303 }
304}
305
306impl<T: Default + ?Sized> Default for Mutex<T> {
307 fn default() -> Mutex<T> {
308 Mutex::new(Default::default())
309 }
310}
311
312/// A guard that releases the mutex when dropped.
313pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
314
315unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
316unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
317
318impl<'a, T: ?Sized> MutexGuard<'a, T> {
319 /// Returns a reference to the mutex a guard came from.
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// # futures_lite::future::block_on(async {
325 /// use async_mutex::{Mutex, MutexGuard};
326 ///
327 /// let mutex = Mutex::new(10i32);
328 /// let guard = mutex.lock().await;
329 /// dbg!(MutexGuard::source(&guard));
330 /// # })
331 /// ```
332 pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
333 guard.0
334 }
335}
336
337impl<T: ?Sized> Drop for MutexGuard<'_, T> {
338 fn drop(&mut self) {
339 // Remove the last bit and notify a waiting lock operation.
340 self.0.state.fetch_sub(1, Ordering::Release);
341 self.0.lock_ops.notify(1);
342 }
343}
344
345impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 fmt::Debug::fmt(&**self, f)
348 }
349}
350
351impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353 (**self).fmt(f)
354 }
355}
356
357impl<T: ?Sized> Deref for MutexGuard<'_, T> {
358 type Target = T;
359
360 fn deref(&self) -> &T {
361 unsafe { &*self.0.data.get() }
362 }
363}
364
365impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
366 fn deref_mut(&mut self) -> &mut T {
367 unsafe { &mut *self.0.data.get() }
368 }
369}
370
371/// An owned guard that releases the mutex when dropped.
372pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);
373
374unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
375unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
376
377impl<T: ?Sized> MutexGuardArc<T> {
378 /// Returns a reference to the mutex a guard came from.
379 ///
380 /// # Examples
381 ///
382 /// ```
383 /// # futures_lite::future::block_on(async {
384 /// use async_mutex::{Mutex, MutexGuardArc};
385 /// use std::sync::Arc;
386 ///
387 /// let mutex = Arc::new(Mutex::new(10i32));
388 /// let guard = mutex.lock_arc().await;
389 /// dbg!(MutexGuardArc::source(&guard));
390 /// # })
391 /// ```
392 pub fn source(guard: &MutexGuardArc<T>) -> &Arc<Mutex<T>> {
393 &guard.0
394 }
395}
396
397impl<T: ?Sized> Drop for MutexGuardArc<T> {
398 fn drop(&mut self) {
399 // Remove the last bit and notify a waiting lock operation.
400 self.0.state.fetch_sub(1, Ordering::Release);
401 self.0.lock_ops.notify(1);
402 }
403}
404
405impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
406 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
407 fmt::Debug::fmt(&**self, f)
408 }
409}
410
411impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
412 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413 (**self).fmt(f)
414 }
415}
416
417impl<T: ?Sized> Deref for MutexGuardArc<T> {
418 type Target = T;
419
420 fn deref(&self) -> &T {
421 unsafe { &*self.0.data.get() }
422 }
423}
424
425impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
426 fn deref_mut(&mut self) -> &mut T {
427 unsafe { &mut *self.0.data.get() }
428 }
429}
430
431/// Calls a function when dropped.
432struct CallOnDrop<F: Fn()>(F);
433
434impl<F: Fn()> Drop for CallOnDrop<F> {
435 fn drop(&mut self) {
436 (self.0)();
437 }
438}