safina_sync/
mutex.rs

1#![forbid(unsafe_code)]
2
3use core::future::Future;
4use core::ops::{Deref, DerefMut};
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use std::collections::VecDeque;
8use std::sync::TryLockError;
9use std::task::Waker;
10
11/// An [RAII](https://doc.rust-lang.org/rust-by-example/scope/raii.html)
12/// scoped lock of a [`Mutex`](struct.Mutex.html).
13/// It automatically unlocks the mutex when dropped (falls out of scope).
14///
15/// You can access the data in the mutex through this guard's
16/// [`Deref`](https://doc.rust-lang.org/stable/std/ops/trait.Deref.html)
17/// and
18/// [`DerefMut`](https://doc.rust-lang.org/stable/std/ops/trait.DerefMut.html)
19/// implementations.
20///
21/// The struct is not
22/// [`Send`](https://doc.rust-lang.org/stable/std/marker/trait.Send.html)
23/// so you cannot await while holding it.
24///
25/// If a task panics while holding the struct, the underlying mutex becomes
26/// ["poisoned"](https://doc.rust-lang.org/stable/std/sync/struct.Mutex.html#poisoning)
27/// and subsequent calls to `lock` will panic.
28#[allow(clippy::module_name_repetitions)]
29pub struct MutexGuard<'a, T> {
30    mutex: &'a Mutex<T>,
31    value_guard: Option<std::sync::MutexGuard<'a, T>>,
32}
33impl<'a, T> MutexGuard<'a, T> {
34    fn new(mutex: &'a Mutex<T>, value_guard: std::sync::MutexGuard<'a, T>) -> MutexGuard<'a, T> {
35        let mut inner_guard = mutex.inner.lock().unwrap();
36        assert!(!inner_guard.locked);
37        inner_guard.locked = true;
38        MutexGuard {
39            mutex,
40            value_guard: Some(value_guard),
41        }
42    }
43}
44impl<'a, T> Drop for MutexGuard<'a, T> {
45    fn drop(&mut self) {
46        let mut wakers = VecDeque::new();
47        {
48            let mut inner_guard = self.mutex.inner.lock().unwrap();
49            assert!(inner_guard.locked);
50            inner_guard.locked = false;
51            std::mem::swap(&mut inner_guard.wakers, &mut wakers);
52        }
53        self.value_guard.take();
54        for waker in wakers {
55            waker.wake();
56        }
57    }
58}
59impl<'a, T> Deref for MutexGuard<'a, T> {
60    type Target = T;
61    fn deref(&self) -> &Self::Target {
62        &*self.value_guard.as_ref().unwrap()
63    }
64}
65impl<'a, T> DerefMut for MutexGuard<'a, T> {
66    fn deref_mut(&mut self) -> &mut Self::Target {
67        &mut *self.value_guard.as_mut().unwrap()
68    }
69}
70
71#[doc(hidden)]
72pub struct LockFuture<'a, T> {
73    mutex: &'a Mutex<T>,
74}
75impl<'a, T> Future for LockFuture<'a, T> {
76    type Output = MutexGuard<'a, T>;
77
78    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        loop {
80            match self.mutex.value.try_lock() {
81                Ok(guard) => return Poll::Ready(MutexGuard::new(self.mutex, guard)),
82                Err(TryLockError::Poisoned(e)) => panic!("{}", e),
83                Err(TryLockError::WouldBlock) => {}
84            }
85            let mut guard = self.mutex.inner.lock().unwrap();
86            if guard.locked {
87                // Mutex is locked.  Add our waker and sleep.
88                guard.wakers.push_back(cx.waker().clone());
89                return Poll::Pending;
90            }
91            // Mutex is now unlocked.  Try to acquire it again.
92        }
93    }
94}
95
96struct Inner {
97    wakers: VecDeque<Waker>,
98    locked: bool,
99}
100
101/// A wrapper around
102/// [`std::sync::Mutex`](https://doc.rust-lang.org/stable/std/sync/struct.Mutex.html)
103/// with an async [`lock`](#method.lock) method.
104pub struct Mutex<T> {
105    inner: std::sync::Mutex<Inner>,
106    value: std::sync::Mutex<T>,
107}
108
109impl<T> Mutex<T> {
110    pub fn new(value: T) -> Mutex<T> {
111        Self {
112            inner: std::sync::Mutex::new(Inner {
113                wakers: VecDeque::new(),
114                locked: false,
115            }),
116            value: std::sync::Mutex::new(value),
117        }
118    }
119
120    /// Acquires the mutex, sleeping the current task until it is able to do so.
121    ///
122    /// After this function returns, is the only task holding the lock.
123    ///
124    /// The returned guard is an
125    /// [RAII](https://doc.rust-lang.org/rust-by-example/scope/raii.html)
126    /// scoped lock.
127    /// It automatically unlocks the mutex when dropped (falls out of scope).
128    ///
129    /// The returned guard is not
130    /// [`Send`](https://doc.rust-lang.org/stable/std/marker/trait.Send.html)
131    /// so you cannot `await` while holding it.
132    ///
133    /// If a task panics while holding the guard, the underlying mutex becomes
134    /// ["poisoned"](https://doc.rust-lang.org/stable/std/sync/struct.Mutex.html#poisoning)
135    /// and subsequent calls to `lock` will panic.
136    pub async fn lock(&self) -> MutexGuard<'_, T> {
137        LockFuture { mutex: self }.await
138    }
139}