async_std/sync/
mutex.rs

1use std::cell::UnsafeCell;
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::future::Future;
7
8use crate::sync::WakerSet;
9use crate::task::{Context, Poll};
10
11/// A mutual exclusion primitive for protecting shared data.
12///
13/// This type is an async version of [`std::sync::Mutex`].
14///
15/// [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
16///
17/// # Examples
18///
19/// ```
20/// # async_std::task::block_on(async {
21/// #
22/// use async_std::sync::{Arc, Mutex};
23/// use async_std::task;
24///
25/// let m = Arc::new(Mutex::new(0));
26/// let mut tasks = vec![];
27///
28/// for _ in 0..10 {
29///     let m = m.clone();
30///     tasks.push(task::spawn(async move {
31///         *m.lock().await += 1;
32///     }));
33/// }
34///
35/// for t in tasks {
36///     t.await.unwrap();
37/// }
38/// assert_eq!(*m.lock().await, 10);
39/// #
40/// # })
41/// ```
42pub struct Mutex<T: ?Sized> {
43    locked: AtomicBool,
44    wakers: WakerSet,
45    value: UnsafeCell<T>,
46}
47
48unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
49unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
50
51impl<T> Mutex<T> {
52    /// Creates a new mutex.
53    ///
54    /// # Examples
55    ///
56    /// ```
57    /// use async_std::sync::Mutex;
58    ///
59    /// let mutex = Mutex::new(0);
60    /// ```
61    pub fn new(t: T) -> Mutex<T> {
62        Mutex {
63            locked: AtomicBool::new(false),
64            wakers: WakerSet::new(),
65            value: UnsafeCell::new(t),
66        }
67    }
68}
69
70impl<T: ?Sized> Mutex<T> {
71    /// Acquires the lock.
72    ///
73    /// Returns a guard that releases the lock when dropped.
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// # async_std::task::block_on(async {
79    /// #
80    /// use async_std::sync::{Arc, Mutex};
81    /// use async_std::task;
82    ///
83    /// let m1 = Arc::new(Mutex::new(10));
84    /// let m2 = m1.clone();
85    ///
86    /// task::spawn(async move {
87    ///     *m1.lock().await = 20;
88    /// })
89    /// .await.unwrap();
90    ///
91    /// assert_eq!(*m2.lock().await, 20);
92    /// #
93    /// # })
94    /// ```
95    pub async fn lock(&self) -> MutexGuard<'_, T> {
96        pub struct LockFuture<'a, T: ?Sized> {
97            mutex: &'a Mutex<T>,
98            opt_key: Option<usize>,
99        }
100
101        impl<'a, T: ?Sized> Future for LockFuture<'a, T> {
102            type Output = MutexGuard<'a, T>;
103
104            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105                loop {
106                    // If the current task is in the set, remove it.
107                    if let Some(key) = self.opt_key.take() {
108                        self.mutex.wakers.remove(key);
109                    }
110
111                    // Try acquiring the lock.
112                    match self.mutex.try_lock() {
113                        Some(guard) => return Poll::Ready(guard),
114                        None => {
115                            // Insert this lock operation.
116                            self.opt_key = Some(self.mutex.wakers.insert(cx));
117
118                            // If the mutex is still locked, return.
119                            if self.mutex.locked.load(Ordering::SeqCst) {
120                                return Poll::Pending;
121                            }
122                        }
123                    }
124                }
125            }
126        }
127
128        impl<T: ?Sized> Drop for LockFuture<'_, T> {
129            fn drop(&mut self) {
130                // If the current task is still in the set, that means it is being cancelled now.
131                if let Some(key) = self.opt_key {
132                    self.mutex.wakers.cancel(key);
133                }
134            }
135        }
136
137        LockFuture {
138            mutex: self,
139            opt_key: None,
140        }
141        .await
142    }
143
144    /// Attempts to acquire the lock.
145    ///
146    /// If the lock could not be acquired at this time, then [`None`] is returned. Otherwise, a
147    /// guard is returned that releases the lock when dropped.
148    ///
149    /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// # async_std::task::block_on(async {
155    /// #
156    /// use async_std::sync::{Arc, Mutex};
157    /// use async_std::task;
158    ///
159    /// let m1 = Arc::new(Mutex::new(10));
160    /// let m2 = m1.clone();
161    ///
162    /// task::spawn(async move {
163    ///     if let Some(mut guard) = m1.try_lock() {
164    ///         *guard = 20;
165    ///     } else {
166    ///         println!("try_lock failed");
167    ///     }
168    /// })
169    /// .await.unwrap();
170    ///
171    /// assert_eq!(*m2.lock().await, 20);
172    /// #
173    /// # })
174    /// ```
175    #[inline]
176    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
177        if !self.locked.swap(true, Ordering::SeqCst) {
178            Some(MutexGuard(self))
179        } else {
180            None
181        }
182    }
183
184    /// Consumes the mutex, returning the underlying data.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// use async_std::sync::Mutex;
190    ///
191    /// let mutex = Mutex::new(10);
192    /// assert_eq!(mutex.into_inner(), 10);
193    /// ```
194    pub fn into_inner(self) -> T where T: Sized {
195        self.value.into_inner()
196    }
197
198    /// Returns a mutable reference to the underlying data.
199    ///
200    /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
201    /// borrow statically guarantees no locks exist.
202    ///
203    /// # Examples
204    ///
205    /// ```
206    /// # async_std::task::block_on(async {
207    /// #
208    /// use async_std::sync::Mutex;
209    ///
210    /// let mut mutex = Mutex::new(0);
211    /// *mutex.get_mut() = 10;
212    /// assert_eq!(*mutex.lock().await, 10);
213    /// #
214    /// # })
215    /// ```
216    pub fn get_mut(&mut self) -> &mut T {
217        unsafe { &mut *self.value.get() }
218    }
219}
220
221impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        struct Locked;
224        impl fmt::Debug for Locked {
225            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226                f.write_str("<locked>")
227            }
228        }
229
230        match self.try_lock() {
231            None => f.debug_struct("Mutex").field("data", &Locked).finish(),
232            Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
233        }
234    }
235}
236
237impl<T> From<T> for Mutex<T> {
238    fn from(val: T) -> Mutex<T> {
239        Mutex::new(val)
240    }
241}
242
243impl<T: ?Sized + Default> Default for Mutex<T> {
244    fn default() -> Mutex<T> {
245        Mutex::new(Default::default())
246    }
247}
248
249/// A guard that releases the lock when dropped.
250pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
251
252unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
253unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
254
255impl<T: ?Sized> Drop for MutexGuard<'_, T> {
256    fn drop(&mut self) {
257        // Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`.
258        self.0.locked.store(false, Ordering::SeqCst);
259
260        // Notify a blocked `lock()` operation if none were notified already.
261        self.0.wakers.notify_any();
262    }
263}
264
265impl<T: ?Sized +fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        fmt::Debug::fmt(&**self, f)
268    }
269}
270
271impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        (**self).fmt(f)
274    }
275}
276
277impl<T: ?Sized> Deref for MutexGuard<'_, T> {
278    type Target = T;
279
280    fn deref(&self) -> &T {
281        unsafe { &*self.0.value.get() }
282    }
283}
284
285impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
286    fn deref_mut(&mut self) -> &mut T {
287        unsafe { &mut *self.0.value.get() }
288    }
289}