Skip to main content

futures_util/lock/
mutex.rs

1use std::cell::UnsafeCell;
2use std::marker::PhantomData;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::{Arc, Mutex as StdMutex};
7use std::{fmt, mem};
8
9use slab::Slab;
10
11use futures_core::future::{FusedFuture, Future};
12use futures_core::task::{Context, Poll, Waker};
13
14/// A futures-aware mutex.
15///
16/// # Fairness
17///
18/// This mutex provides no fairness guarantees. Tasks may not acquire the mutex
19/// in the order that they requested the lock, and it's possible for a single task
20/// which repeatedly takes the lock to starve other tasks, which may be left waiting
21/// indefinitely.
22pub struct Mutex<T: ?Sized> {
23    state: AtomicUsize,
24    waiters: StdMutex<Slab<Waiter>>,
25    value: UnsafeCell<T>,
26}
27
28impl<T: ?Sized> fmt::Debug for Mutex<T> {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        let state = self.state.load(Ordering::SeqCst);
31        f.debug_struct("Mutex")
32            .field("is_locked", &((state & IS_LOCKED) != 0))
33            .field("has_waiters", &((state & HAS_WAITERS) != 0))
34            .finish()
35    }
36}
37
38impl<T> From<T> for Mutex<T> {
39    fn from(t: T) -> Self {
40        Self::new(t)
41    }
42}
43
44impl<T: Default> Default for Mutex<T> {
45    fn default() -> Self {
46        Self::new(Default::default())
47    }
48}
49
50enum Waiter {
51    Waiting(Waker),
52    Woken,
53}
54
55impl Waiter {
56    fn register(&mut self, waker: &Waker) {
57        match self {
58            Self::Waiting(w) if waker.will_wake(w) => {}
59            _ => *self = Self::Waiting(waker.clone()),
60        }
61    }
62
63    fn wake(&mut self) {
64        match mem::replace(self, Self::Woken) {
65            Self::Waiting(waker) => waker.wake(),
66            Self::Woken => {}
67        }
68    }
69}
70
71const IS_LOCKED: usize = 1 << 0;
72const HAS_WAITERS: usize = 1 << 1;
73
74impl<T> Mutex<T> {
75    /// Creates a new futures-aware mutex.
76    pub const fn new(t: T) -> Self {
77        Self {
78            state: AtomicUsize::new(0),
79            waiters: StdMutex::new(Slab::new()),
80            value: UnsafeCell::new(t),
81        }
82    }
83
84    /// Consumes this mutex, returning the underlying data.
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// use futures::lock::Mutex;
90    ///
91    /// let mutex = Mutex::new(0);
92    /// assert_eq!(mutex.into_inner(), 0);
93    /// ```
94    pub fn into_inner(self) -> T {
95        self.value.into_inner()
96    }
97}
98
99impl<T: ?Sized> Mutex<T> {
100    /// Attempt to acquire the lock immediately.
101    ///
102    /// If the lock is currently held, this will return `None`.
103    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
104        let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
105        if (old_state & IS_LOCKED) == 0 {
106            Some(MutexGuard { mutex: self })
107        } else {
108            None
109        }
110    }
111
112    /// Attempt to acquire the lock immediately.
113    ///
114    /// If the lock is currently held, this will return `None`.
115    pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
116        let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
117        if (old_state & IS_LOCKED) == 0 {
118            Some(OwnedMutexGuard { mutex: self.clone() })
119        } else {
120            None
121        }
122    }
123
124    /// Acquire the lock asynchronously.
125    ///
126    /// This method returns a future that will resolve once the lock has been
127    /// successfully acquired.
128    pub fn lock(&self) -> MutexLockFuture<'_, T> {
129        MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
130    }
131
132    /// Acquire the lock asynchronously.
133    ///
134    /// This method returns a future that will resolve once the lock has been
135    /// successfully acquired.
136    pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> {
137        OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
138    }
139
140    /// Returns a mutable reference to the underlying data.
141    ///
142    /// Since this call borrows the `Mutex` mutably, no actual locking needs to
143    /// take place -- the mutable borrow statically guarantees no locks exist.
144    ///
145    /// # Examples
146    ///
147    /// ```
148    /// # futures::executor::block_on(async {
149    /// use futures::lock::Mutex;
150    ///
151    /// let mut mutex = Mutex::new(0);
152    /// *mutex.get_mut() = 10;
153    /// assert_eq!(*mutex.lock().await, 10);
154    /// # });
155    /// ```
156    pub fn get_mut(&mut self) -> &mut T {
157        // We know statically that there are no other references to `self`, so
158        // there's no need to lock the inner mutex.
159        unsafe { &mut *self.value.get() }
160    }
161
162    fn remove_waker(&self, wait_key: usize, wake_another: bool) {
163        if wait_key != WAIT_KEY_NONE {
164            let mut waiters = self.waiters.lock().unwrap();
165            match waiters.remove(wait_key) {
166                Waiter::Waiting(_) => {}
167                Waiter::Woken => {
168                    // We were awoken, but then dropped before we could
169                    // wake up to acquire the lock. Wake up another
170                    // waiter.
171                    if wake_another {
172                        if let Some((_i, waiter)) = waiters.iter_mut().next() {
173                            waiter.wake();
174                        }
175                    }
176                }
177            }
178            if waiters.is_empty() {
179                self.state.fetch_and(!HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
180            }
181        }
182    }
183
184    // Unlocks the mutex. Called by MutexGuard and MappedMutexGuard when they are
185    // dropped.
186    fn unlock(&self) {
187        let old_state = self.state.fetch_and(!IS_LOCKED, Ordering::AcqRel);
188        if (old_state & HAS_WAITERS) != 0 {
189            let mut waiters = self.waiters.lock().unwrap();
190            if let Some((_i, waiter)) = waiters.iter_mut().next() {
191                waiter.wake();
192            }
193        }
194    }
195}
196
197// Sentinel for when no slot in the `Slab` has been dedicated to this object.
198const WAIT_KEY_NONE: usize = usize::MAX;
199
200/// A future which resolves when the target mutex has been successfully acquired, owned version.
201pub struct OwnedMutexLockFuture<T: ?Sized> {
202    // `None` indicates that the mutex was successfully acquired.
203    mutex: Option<Arc<Mutex<T>>>,
204    wait_key: usize,
205}
206
207impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
209        f.debug_struct("OwnedMutexLockFuture")
210            .field("was_acquired", &self.mutex.is_none())
211            .field("mutex", &self.mutex)
212            .field(
213                "wait_key",
214                &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
215            )
216            .finish()
217    }
218}
219
220impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
221    fn is_terminated(&self) -> bool {
222        self.mutex.is_none()
223    }
224}
225
226impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
227    type Output = OwnedMutexGuard<T>;
228
229    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
230        let this = self.get_mut();
231
232        let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");
233
234        if let Some(lock) = mutex.try_lock_owned() {
235            mutex.remove_waker(this.wait_key, false);
236            this.mutex = None;
237            return Poll::Ready(lock);
238        }
239
240        {
241            let mut waiters = mutex.waiters.lock().unwrap();
242            if this.wait_key == WAIT_KEY_NONE {
243                this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
244                if waiters.len() == 1 {
245                    mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
246                }
247            } else {
248                waiters[this.wait_key].register(cx.waker());
249            }
250        }
251
252        // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
253        // attempting to acquire the lock again.
254        if let Some(lock) = mutex.try_lock_owned() {
255            mutex.remove_waker(this.wait_key, false);
256            this.mutex = None;
257            return Poll::Ready(lock);
258        }
259
260        Poll::Pending
261    }
262}
263
264impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
265    fn drop(&mut self) {
266        if let Some(mutex) = self.mutex.as_ref() {
267            // This future was dropped before it acquired the mutex.
268            //
269            // Remove ourselves from the map, waking up another waiter if we
270            // had been awoken to acquire the lock.
271            mutex.remove_waker(self.wait_key, true);
272        }
273    }
274}
275
276/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
277/// When this structure is dropped (falls out of scope), the lock will be
278/// unlocked.
279#[clippy::has_significant_drop]
280pub struct OwnedMutexGuard<T: ?Sized> {
281    mutex: Arc<Mutex<T>>,
282}
283
284impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        f.debug_struct("OwnedMutexGuard")
287            .field("value", &&**self)
288            .field("mutex", &self.mutex)
289            .finish()
290    }
291}
292
293impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
294    fn drop(&mut self) {
295        self.mutex.unlock()
296    }
297}
298
299impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
300    type Target = T;
301    fn deref(&self) -> &T {
302        unsafe { &*self.mutex.value.get() }
303    }
304}
305
306impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
307    fn deref_mut(&mut self) -> &mut T {
308        unsafe { &mut *self.mutex.value.get() }
309    }
310}
311
312/// A future which resolves when the target mutex has been successfully acquired.
313pub struct MutexLockFuture<'a, T: ?Sized> {
314    // `None` indicates that the mutex was successfully acquired.
315    mutex: Option<&'a Mutex<T>>,
316    wait_key: usize,
317}
318
319impl<T: ?Sized> fmt::Debug for MutexLockFuture<'_, T> {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        f.debug_struct("MutexLockFuture")
322            .field("was_acquired", &self.mutex.is_none())
323            .field("mutex", &self.mutex)
324            .field(
325                "wait_key",
326                &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
327            )
328            .finish()
329    }
330}
331
332impl<T: ?Sized> FusedFuture for MutexLockFuture<'_, T> {
333    fn is_terminated(&self) -> bool {
334        self.mutex.is_none()
335    }
336}
337
338impl<'a, T: ?Sized> Future for MutexLockFuture<'a, T> {
339    type Output = MutexGuard<'a, T>;
340
341    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
342        let mutex = self.mutex.expect("polled MutexLockFuture after completion");
343
344        if let Some(lock) = mutex.try_lock() {
345            mutex.remove_waker(self.wait_key, false);
346            self.mutex = None;
347            return Poll::Ready(lock);
348        }
349
350        {
351            let mut waiters = mutex.waiters.lock().unwrap();
352            if self.wait_key == WAIT_KEY_NONE {
353                self.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
354                if waiters.len() == 1 {
355                    mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
356                }
357            } else {
358                waiters[self.wait_key].register(cx.waker());
359            }
360        }
361
362        // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
363        // attempting to acquire the lock again.
364        if let Some(lock) = mutex.try_lock() {
365            mutex.remove_waker(self.wait_key, false);
366            self.mutex = None;
367            return Poll::Ready(lock);
368        }
369
370        Poll::Pending
371    }
372}
373
374impl<T: ?Sized> Drop for MutexLockFuture<'_, T> {
375    fn drop(&mut self) {
376        if let Some(mutex) = self.mutex {
377            // This future was dropped before it acquired the mutex.
378            //
379            // Remove ourselves from the map, waking up another waiter if we
380            // had been awoken to acquire the lock.
381            mutex.remove_waker(self.wait_key, true);
382        }
383    }
384}
385
386/// An RAII guard returned by the `lock` and `try_lock` methods.
387/// When this structure is dropped (falls out of scope), the lock will be
388/// unlocked.
389#[clippy::has_significant_drop]
390pub struct MutexGuard<'a, T: ?Sized> {
391    mutex: &'a Mutex<T>,
392}
393
394impl<'a, T: ?Sized> MutexGuard<'a, T> {
395    /// Returns a locked view over a portion of the locked data.
396    ///
397    /// # Example
398    ///
399    /// ```
400    /// # futures::executor::block_on(async {
401    /// use futures::lock::{Mutex, MutexGuard};
402    ///
403    /// let data = Mutex::new(Some("value".to_string()));
404    /// {
405    ///     let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap());
406    ///     assert_eq!(&*locked_str, "value");
407    /// }
408    /// # });
409    /// ```
410    #[inline]
411    pub fn map<U: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, U>
412    where
413        F: FnOnce(&mut T) -> &mut U,
414    {
415        let mutex = this.mutex;
416        let value = f(unsafe { &mut *this.mutex.value.get() });
417        // Don't run the `drop` method for MutexGuard. The ownership of the underlying
418        // locked state is being moved to the returned MappedMutexGuard.
419        mem::forget(this);
420        MappedMutexGuard { mutex, value, _marker: PhantomData }
421    }
422}
423
424impl<T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
425    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
426        f.debug_struct("MutexGuard").field("value", &&**self).field("mutex", &self.mutex).finish()
427    }
428}
429
430impl<T: ?Sized> Drop for MutexGuard<'_, T> {
431    fn drop(&mut self) {
432        self.mutex.unlock()
433    }
434}
435
436impl<T: ?Sized> Deref for MutexGuard<'_, T> {
437    type Target = T;
438    fn deref(&self) -> &T {
439        unsafe { &*self.mutex.value.get() }
440    }
441}
442
443impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
444    fn deref_mut(&mut self) -> &mut T {
445        unsafe { &mut *self.mutex.value.get() }
446    }
447}
448
449/// An RAII guard returned by the `MutexGuard::map` and `MappedMutexGuard::map` methods.
450/// When this structure is dropped (falls out of scope), the lock will be unlocked.
451#[clippy::has_significant_drop]
452pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> {
453    mutex: &'a Mutex<T>,
454    value: *mut U,
455    _marker: PhantomData<&'a mut U>,
456}
457
458impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> {
459    /// Returns a locked view over a portion of the locked data.
460    ///
461    /// # Example
462    ///
463    /// ```
464    /// # futures::executor::block_on(async {
465    /// use futures::lock::{MappedMutexGuard, Mutex, MutexGuard};
466    ///
467    /// let data = Mutex::new(Some("value".to_string()));
468    /// {
469    ///     let locked_str = MutexGuard::map(data.lock().await, |opt| opt.as_mut().unwrap());
470    ///     let locked_char = MappedMutexGuard::map(locked_str, |s| s.get_mut(0..1).unwrap());
471    ///     assert_eq!(&*locked_char, "v");
472    /// }
473    /// # });
474    /// ```
475    #[inline]
476    pub fn map<V: ?Sized, F>(this: Self, f: F) -> MappedMutexGuard<'a, T, V>
477    where
478        F: FnOnce(&mut U) -> &mut V,
479    {
480        let mutex = this.mutex;
481        let value = f(unsafe { &mut *this.value });
482        // Don't run the `drop` method for MappedMutexGuard. The ownership of the underlying
483        // locked state is being moved to the returned MappedMutexGuard.
484        mem::forget(this);
485        MappedMutexGuard { mutex, value, _marker: PhantomData }
486    }
487}
488
489impl<T: ?Sized, U: ?Sized + fmt::Debug> fmt::Debug for MappedMutexGuard<'_, T, U> {
490    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
491        f.debug_struct("MappedMutexGuard")
492            .field("value", &&**self)
493            .field("mutex", &self.mutex)
494            .finish()
495    }
496}
497
498impl<T: ?Sized, U: ?Sized> Drop for MappedMutexGuard<'_, T, U> {
499    fn drop(&mut self) {
500        self.mutex.unlock()
501    }
502}
503
504impl<T: ?Sized, U: ?Sized> Deref for MappedMutexGuard<'_, T, U> {
505    type Target = U;
506    fn deref(&self) -> &U {
507        unsafe { &*self.value }
508    }
509}
510
511impl<T: ?Sized, U: ?Sized> DerefMut for MappedMutexGuard<'_, T, U> {
512    fn deref_mut(&mut self) -> &mut U {
513        unsafe { &mut *self.value }
514    }
515}
516
517// Mutexes can be moved freely between threads and acquired on any thread so long
518// as the inner value can be safely sent between threads.
519unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
520unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
521
522// It's safe to switch which thread the acquire is being attempted on so long as
523// `T` can be accessed on that thread.
524unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
525
526// doesn't have any interesting `&self` methods (only Debug)
527unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
528
529// It's safe to switch which thread the acquire is being attempted on so long as
530// `T` can be accessed on that thread.
531unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}
532
533// doesn't have any interesting `&self` methods (only Debug)
534unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}
535
536// Safe to send since we don't track any thread-specific details-- the inner
537// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
538unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
539unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
540
541unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
542unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}
543
544unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
545unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
546
547#[cfg(test)]
548mod tests {
549    use super::*;
550    use std::format;
551
552    #[test]
553    fn test_mutex_guard_debug_not_recurse() {
554        let mutex = Mutex::new(42);
555        let guard = mutex.try_lock().unwrap();
556        let _ = format!("{guard:?}");
557        let guard = MutexGuard::map(guard, |n| n);
558        let _ = format!("{guard:?}");
559    }
560}