async_std/sync/mutex.rs
1use std::cell::UnsafeCell;
2use std::fmt;
3use std::ops::{Deref, DerefMut};
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::future::Future;
7
8use crate::sync::WakerSet;
9use crate::task::{Context, Poll};
10
11/// A mutual exclusion primitive for protecting shared data.
12///
13/// This type is an async version of [`std::sync::Mutex`].
14///
15/// [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
16///
17/// # Examples
18///
19/// ```
20/// # async_std::task::block_on(async {
21/// #
22/// use async_std::sync::{Arc, Mutex};
23/// use async_std::task;
24///
25/// let m = Arc::new(Mutex::new(0));
26/// let mut tasks = vec![];
27///
28/// for _ in 0..10 {
29/// let m = m.clone();
30/// tasks.push(task::spawn(async move {
31/// *m.lock().await += 1;
32/// }));
33/// }
34///
35/// for t in tasks {
36/// t.await.unwrap();
37/// }
38/// assert_eq!(*m.lock().await, 10);
39/// #
40/// # })
41/// ```
42pub struct Mutex<T: ?Sized> {
43 locked: AtomicBool,
44 wakers: WakerSet,
45 value: UnsafeCell<T>,
46}
47
48unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
49unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
50
51impl<T> Mutex<T> {
52 /// Creates a new mutex.
53 ///
54 /// # Examples
55 ///
56 /// ```
57 /// use async_std::sync::Mutex;
58 ///
59 /// let mutex = Mutex::new(0);
60 /// ```
61 pub fn new(t: T) -> Mutex<T> {
62 Mutex {
63 locked: AtomicBool::new(false),
64 wakers: WakerSet::new(),
65 value: UnsafeCell::new(t),
66 }
67 }
68}
69
70impl<T: ?Sized> Mutex<T> {
71 /// Acquires the lock.
72 ///
73 /// Returns a guard that releases the lock when dropped.
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// # async_std::task::block_on(async {
79 /// #
80 /// use async_std::sync::{Arc, Mutex};
81 /// use async_std::task;
82 ///
83 /// let m1 = Arc::new(Mutex::new(10));
84 /// let m2 = m1.clone();
85 ///
86 /// task::spawn(async move {
87 /// *m1.lock().await = 20;
88 /// })
89 /// .await.unwrap();
90 ///
91 /// assert_eq!(*m2.lock().await, 20);
92 /// #
93 /// # })
94 /// ```
95 pub async fn lock(&self) -> MutexGuard<'_, T> {
96 pub struct LockFuture<'a, T: ?Sized> {
97 mutex: &'a Mutex<T>,
98 opt_key: Option<usize>,
99 }
100
101 impl<'a, T: ?Sized> Future for LockFuture<'a, T> {
102 type Output = MutexGuard<'a, T>;
103
104 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105 loop {
106 // If the current task is in the set, remove it.
107 if let Some(key) = self.opt_key.take() {
108 self.mutex.wakers.remove(key);
109 }
110
111 // Try acquiring the lock.
112 match self.mutex.try_lock() {
113 Some(guard) => return Poll::Ready(guard),
114 None => {
115 // Insert this lock operation.
116 self.opt_key = Some(self.mutex.wakers.insert(cx));
117
118 // If the mutex is still locked, return.
119 if self.mutex.locked.load(Ordering::SeqCst) {
120 return Poll::Pending;
121 }
122 }
123 }
124 }
125 }
126 }
127
128 impl<T: ?Sized> Drop for LockFuture<'_, T> {
129 fn drop(&mut self) {
130 // If the current task is still in the set, that means it is being cancelled now.
131 if let Some(key) = self.opt_key {
132 self.mutex.wakers.cancel(key);
133 }
134 }
135 }
136
137 LockFuture {
138 mutex: self,
139 opt_key: None,
140 }
141 .await
142 }
143
144 /// Attempts to acquire the lock.
145 ///
146 /// If the lock could not be acquired at this time, then [`None`] is returned. Otherwise, a
147 /// guard is returned that releases the lock when dropped.
148 ///
149 /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
150 ///
151 /// # Examples
152 ///
153 /// ```
154 /// # async_std::task::block_on(async {
155 /// #
156 /// use async_std::sync::{Arc, Mutex};
157 /// use async_std::task;
158 ///
159 /// let m1 = Arc::new(Mutex::new(10));
160 /// let m2 = m1.clone();
161 ///
162 /// task::spawn(async move {
163 /// if let Some(mut guard) = m1.try_lock() {
164 /// *guard = 20;
165 /// } else {
166 /// println!("try_lock failed");
167 /// }
168 /// })
169 /// .await.unwrap();
170 ///
171 /// assert_eq!(*m2.lock().await, 20);
172 /// #
173 /// # })
174 /// ```
175 #[inline]
176 pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
177 if !self.locked.swap(true, Ordering::SeqCst) {
178 Some(MutexGuard(self))
179 } else {
180 None
181 }
182 }
183
184 /// Consumes the mutex, returning the underlying data.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// use async_std::sync::Mutex;
190 ///
191 /// let mutex = Mutex::new(10);
192 /// assert_eq!(mutex.into_inner(), 10);
193 /// ```
194 pub fn into_inner(self) -> T where T: Sized {
195 self.value.into_inner()
196 }
197
198 /// Returns a mutable reference to the underlying data.
199 ///
200 /// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
201 /// borrow statically guarantees no locks exist.
202 ///
203 /// # Examples
204 ///
205 /// ```
206 /// # async_std::task::block_on(async {
207 /// #
208 /// use async_std::sync::Mutex;
209 ///
210 /// let mut mutex = Mutex::new(0);
211 /// *mutex.get_mut() = 10;
212 /// assert_eq!(*mutex.lock().await, 10);
213 /// #
214 /// # })
215 /// ```
216 pub fn get_mut(&mut self) -> &mut T {
217 unsafe { &mut *self.value.get() }
218 }
219}
220
221impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 struct Locked;
224 impl fmt::Debug for Locked {
225 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226 f.write_str("<locked>")
227 }
228 }
229
230 match self.try_lock() {
231 None => f.debug_struct("Mutex").field("data", &Locked).finish(),
232 Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
233 }
234 }
235}
236
237impl<T> From<T> for Mutex<T> {
238 fn from(val: T) -> Mutex<T> {
239 Mutex::new(val)
240 }
241}
242
243impl<T: ?Sized + Default> Default for Mutex<T> {
244 fn default() -> Mutex<T> {
245 Mutex::new(Default::default())
246 }
247}
248
249/// A guard that releases the lock when dropped.
250pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
251
252unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
253unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
254
255impl<T: ?Sized> Drop for MutexGuard<'_, T> {
256 fn drop(&mut self) {
257 // Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`.
258 self.0.locked.store(false, Ordering::SeqCst);
259
260 // Notify a blocked `lock()` operation if none were notified already.
261 self.0.wakers.notify_any();
262 }
263}
264
265impl<T: ?Sized +fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
266 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267 fmt::Debug::fmt(&**self, f)
268 }
269}
270
271impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 (**self).fmt(f)
274 }
275}
276
277impl<T: ?Sized> Deref for MutexGuard<'_, T> {
278 type Target = T;
279
280 fn deref(&self) -> &T {
281 unsafe { &*self.0.value.get() }
282 }
283}
284
285impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
286 fn deref_mut(&mut self) -> &mut T {
287 unsafe { &mut *self.0.value.get() }
288 }
289}