async_mutex/
lib.rs

//! DO NOT USE!
//!
//! This crate was merged into [async-lock], which now provides the API that this crate used
//! to provide.
//!
//! [async-lock]: https://crates.io/crates/async-lock
6
7#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
8
9use std::cell::UnsafeCell;
10use std::fmt;
11use std::ops::{Deref, DerefMut};
12use std::process;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use std::usize;
17
18use event_listener::Event;
19
20/// An async mutex.
21pub struct Mutex<T: ?Sized> {
22    /// Current state of the mutex.
23    ///
24    /// The least significant bit is set to 1 if the mutex is locked.
25    /// The other bits hold the number of starved lock operations.
26    state: AtomicUsize,
27
28    /// Lock operations waiting for the mutex to be released.
29    lock_ops: Event,
30
31    /// The value inside the mutex.
32    data: UnsafeCell<T>,
33}
34
35unsafe impl<T: Send + ?Sized> Send for Mutex<T> {}
36unsafe impl<T: Send + ?Sized> Sync for Mutex<T> {}
37
38impl<T> Mutex<T> {
39    /// Creates a new async mutex.
40    ///
41    /// # Examples
42    ///
43    /// ```
44    /// use async_mutex::Mutex;
45    ///
46    /// let mutex = Mutex::new(0);
47    /// ```
48    pub const fn new(data: T) -> Mutex<T> {
49        Mutex {
50            state: AtomicUsize::new(0),
51            lock_ops: Event::new(),
52            data: UnsafeCell::new(data),
53        }
54    }
55
56    /// Consumes the mutex, returning the underlying data.
57    ///
58    /// # Examples
59    ///
60    /// ```
61    /// use async_mutex::Mutex;
62    ///
63    /// let mutex = Mutex::new(10);
64    /// assert_eq!(mutex.into_inner(), 10);
65    /// ```
66    pub fn into_inner(self) -> T {
67        self.data.into_inner()
68    }
69}
70
71impl<T: ?Sized> Mutex<T> {
72    /// Acquires the mutex.
73    ///
74    /// Returns a guard that releases the mutex when dropped.
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// # futures_lite::future::block_on(async {
80    /// use async_mutex::Mutex;
81    ///
82    /// let mutex = Mutex::new(10);
83    /// let guard = mutex.lock().await;
84    /// assert_eq!(*guard, 10);
85    /// # })
86    /// ```
87    #[inline]
88    pub async fn lock(&self) -> MutexGuard<'_, T> {
89        if let Some(guard) = self.try_lock() {
90            return guard;
91        }
92        self.acquire_slow().await;
93        MutexGuard(self)
94    }
95
96    /// Slow path for acquiring the mutex.
97    #[cold]
98    async fn acquire_slow(&self) {
99        // Get the current time.
100        let start = Instant::now();
101
102        loop {
103            // Start listening for events.
104            let listener = self.lock_ops.listen();
105
106            // Try locking if nobody is being starved.
107            match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
108                // Lock acquired!
109                0 => return,
110
111                // Lock is held and nobody is starved.
112                1 => {}
113
114                // Somebody is starved.
115                _ => break,
116            }
117
118            // Wait for a notification.
119            listener.await;
120
121            // Try locking if nobody is being starved.
122            match self.state.compare_and_swap(0, 1, Ordering::Acquire) {
123                // Lock acquired!
124                0 => return,
125
126                // Lock is held and nobody is starved.
127                1 => {}
128
129                // Somebody is starved.
130                _ => {
131                    // Notify the first listener in line because we probably received a
132                    // notification that was meant for a starved task.
133                    self.lock_ops.notify(1);
134                    break;
135                }
136            }
137
138            // If waiting for too long, fall back to a fairer locking strategy that will prevent
139            // newer lock operations from starving us forever.
140            if start.elapsed() > Duration::from_micros(500) {
141                break;
142            }
143        }
144
145        // Increment the number of starved lock operations.
146        if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
147            // In case of potential overflow, abort.
148            process::abort();
149        }
150
151        // Decrement the counter when exiting this function.
152        let _call = CallOnDrop(|| {
153            self.state.fetch_sub(2, Ordering::Release);
154        });
155
156        loop {
157            // Start listening for events.
158            let listener = self.lock_ops.listen();
159
160            // Try locking if nobody else is being starved.
161            match self.state.compare_and_swap(2, 2 | 1, Ordering::Acquire) {
162                // Lock acquired!
163                2 => return,
164
165                // Lock is held by someone.
166                s if s % 2 == 1 => {}
167
168                // Lock is available.
169                _ => {
170                    // Be fair: notify the first listener and then go wait in line.
171                    self.lock_ops.notify(1);
172                }
173            }
174
175            // Wait for a notification.
176            listener.await;
177
178            // Try acquiring the lock without waiting for others.
179            if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
180                return;
181            }
182        }
183    }
184
185    /// Attempts to acquire the mutex.
186    ///
187    /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
188    /// guard is returned that releases the mutex when dropped.
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// use async_mutex::Mutex;
194    ///
195    /// let mutex = Mutex::new(10);
196    /// if let Some(guard) = mutex.try_lock() {
197    ///     assert_eq!(*guard, 10);
198    /// }
199    /// # ;
200    /// ```
201    #[inline]
202    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
203        if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
204            Some(MutexGuard(self))
205        } else {
206            None
207        }
208    }
209
210    /// Returns a mutable reference to the underlying data.
211    ///
212    /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
213    /// borrow statically guarantees the mutex is not already acquired.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// # futures_lite::future::block_on(async {
219    /// use async_mutex::Mutex;
220    ///
221    /// let mut mutex = Mutex::new(0);
222    /// *mutex.get_mut() = 10;
223    /// assert_eq!(*mutex.lock().await, 10);
224    /// # })
225    /// ```
226    pub fn get_mut(&mut self) -> &mut T {
227        unsafe { &mut *self.data.get() }
228    }
229}
230
231impl<T: ?Sized> Mutex<T> {
232    /// Acquires the mutex and clones a reference to it.
233    ///
234    /// Returns an owned guard that releases the mutex when dropped.
235    ///
236    /// # Examples
237    ///
238    /// ```
239    /// # futures_lite::future::block_on(async {
240    /// use async_mutex::Mutex;
241    /// use std::sync::Arc;
242    ///
243    /// let mutex = Arc::new(Mutex::new(10));
244    /// let guard = mutex.lock_arc().await;
245    /// assert_eq!(*guard, 10);
246    /// # })
247    /// ```
248    #[inline]
249    pub async fn lock_arc(self: &Arc<Self>) -> MutexGuardArc<T> {
250        if let Some(guard) = self.try_lock_arc() {
251            return guard;
252        }
253        self.acquire_slow().await;
254        MutexGuardArc(self.clone())
255    }
256
257    /// Attempts to acquire the mutex and clone a reference to it.
258    ///
259    /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
260    /// owned guard is returned that releases the mutex when dropped.
261    ///
262    /// # Examples
263    ///
264    /// ```
265    /// use async_mutex::Mutex;
266    /// use std::sync::Arc;
267    ///
268    /// let mutex = Arc::new(Mutex::new(10));
269    /// if let Some(guard) = mutex.try_lock() {
270    ///     assert_eq!(*guard, 10);
271    /// }
272    /// # ;
273    /// ```
274    #[inline]
275    pub fn try_lock_arc(self: &Arc<Self>) -> Option<MutexGuardArc<T>> {
276        if self.state.compare_and_swap(0, 1, Ordering::Acquire) == 0 {
277            Some(MutexGuardArc(self.clone()))
278        } else {
279            None
280        }
281    }
282}
283
284impl<T: fmt::Debug + ?Sized> fmt::Debug for Mutex<T> {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        struct Locked;
287        impl fmt::Debug for Locked {
288            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289                f.write_str("<locked>")
290            }
291        }
292
293        match self.try_lock() {
294            None => f.debug_struct("Mutex").field("data", &Locked).finish(),
295            Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
296        }
297    }
298}
299
300impl<T> From<T> for Mutex<T> {
301    fn from(val: T) -> Mutex<T> {
302        Mutex::new(val)
303    }
304}
305
306impl<T: Default + ?Sized> Default for Mutex<T> {
307    fn default() -> Mutex<T> {
308        Mutex::new(Default::default())
309    }
310}
311
312/// A guard that releases the mutex when dropped.
313pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
314
315unsafe impl<T: Send + ?Sized> Send for MutexGuard<'_, T> {}
316unsafe impl<T: Sync + ?Sized> Sync for MutexGuard<'_, T> {}
317
318impl<'a, T: ?Sized> MutexGuard<'a, T> {
319    /// Returns a reference to the mutex a guard came from.
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// # futures_lite::future::block_on(async {
325    /// use async_mutex::{Mutex, MutexGuard};
326    ///
327    /// let mutex = Mutex::new(10i32);
328    /// let guard = mutex.lock().await;
329    /// dbg!(MutexGuard::source(&guard));
330    /// # })
331    /// ```
332    pub fn source(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
333        guard.0
334    }
335}
336
337impl<T: ?Sized> Drop for MutexGuard<'_, T> {
338    fn drop(&mut self) {
339        // Remove the last bit and notify a waiting lock operation.
340        self.0.state.fetch_sub(1, Ordering::Release);
341        self.0.lock_ops.notify(1);
342    }
343}
344
345impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuard<'_, T> {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        fmt::Debug::fmt(&**self, f)
348    }
349}
350
351impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuard<'_, T> {
352    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353        (**self).fmt(f)
354    }
355}
356
357impl<T: ?Sized> Deref for MutexGuard<'_, T> {
358    type Target = T;
359
360    fn deref(&self) -> &T {
361        unsafe { &*self.0.data.get() }
362    }
363}
364
365impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
366    fn deref_mut(&mut self) -> &mut T {
367        unsafe { &mut *self.0.data.get() }
368    }
369}
370
371/// An owned guard that releases the mutex when dropped.
372pub struct MutexGuardArc<T: ?Sized>(Arc<Mutex<T>>);
373
374unsafe impl<T: Send + ?Sized> Send for MutexGuardArc<T> {}
375unsafe impl<T: Sync + ?Sized> Sync for MutexGuardArc<T> {}
376
377impl<T: ?Sized> MutexGuardArc<T> {
378    /// Returns a reference to the mutex a guard came from.
379    ///
380    /// # Examples
381    ///
382    /// ```
383    /// # futures_lite::future::block_on(async {
384    /// use async_mutex::{Mutex, MutexGuardArc};
385    /// use std::sync::Arc;
386    ///
387    /// let mutex = Arc::new(Mutex::new(10i32));
388    /// let guard = mutex.lock_arc().await;
389    /// dbg!(MutexGuardArc::source(&guard));
390    /// # })
391    /// ```
392    pub fn source(guard: &MutexGuardArc<T>) -> &Arc<Mutex<T>> {
393        &guard.0
394    }
395}
396
397impl<T: ?Sized> Drop for MutexGuardArc<T> {
398    fn drop(&mut self) {
399        // Remove the last bit and notify a waiting lock operation.
400        self.0.state.fetch_sub(1, Ordering::Release);
401        self.0.lock_ops.notify(1);
402    }
403}
404
405impl<T: fmt::Debug + ?Sized> fmt::Debug for MutexGuardArc<T> {
406    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
407        fmt::Debug::fmt(&**self, f)
408    }
409}
410
411impl<T: fmt::Display + ?Sized> fmt::Display for MutexGuardArc<T> {
412    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413        (**self).fmt(f)
414    }
415}
416
417impl<T: ?Sized> Deref for MutexGuardArc<T> {
418    type Target = T;
419
420    fn deref(&self) -> &T {
421        unsafe { &*self.0.data.get() }
422    }
423}
424
425impl<T: ?Sized> DerefMut for MutexGuardArc<T> {
426    fn deref_mut(&mut self) -> &mut T {
427        unsafe { &mut *self.0.data.get() }
428    }
429}
430
/// Calls a function when dropped.
///
/// Binding a value of this type to a local makes the wrapped closure run when that local goes
/// out of scope, whichever way the scope is left.
struct CallOnDrop<F>(F)
where
    F: Fn();

impl<F> Drop for CallOnDrop<F>
where
    F: Fn(),
{
    fn drop(&mut self) {
        let Self(callback) = self;
        callback();
    }
}