futures_locks/mutex.rs
1// vim: tw=80
2
3use futures_channel::oneshot;
4use futures_task::{Context, Poll};
5use std::{
6 cell::UnsafeCell,
7 clone::Clone,
8 collections::VecDeque,
9 future::Future,
10 ops::{Deref, DerefMut},
11 pin::Pin,
12 sync
13};
14use super::{FutState, TryLockError};
15#[cfg(feature = "tokio")] use tokio::task;
16
17/// An RAII mutex guard, much like `std::sync::MutexGuard`. The wrapped data
18/// can be accessed via its `Deref` and `DerefMut` implementations.
19#[derive(Debug)]
20pub struct MutexGuard<T: ?Sized> {
21 mutex: Mutex<T>
22}
23
24impl<T: ?Sized> Drop for MutexGuard<T> {
25 fn drop(&mut self) {
26 self.mutex.unlock();
27 }
28}
29
30impl<T: ?Sized> Deref for MutexGuard<T> {
31 type Target = T;
32
33 fn deref(&self) -> &T {
34 unsafe {&*self.mutex.inner.data.get()}
35 }
36}
37
38impl<T: ?Sized> DerefMut for MutexGuard<T> {
39 fn deref_mut(&mut self) -> &mut T {
40 unsafe {&mut *self.mutex.inner.data.get()}
41 }
42}
43
44/// A `Future` representing a pending `Mutex` acquisition.
45pub struct MutexFut<T: ?Sized> {
46 state: FutState,
47 mutex: Mutex<T>,
48}
49
50impl<T: ?Sized> MutexFut<T> {
51 fn new(state: FutState, mutex: Mutex<T>) -> Self {
52 MutexFut{state, mutex}
53 }
54}
55
56impl<T: ?Sized> Drop for MutexFut<T> {
57 fn drop(&mut self) {
58 match self.state {
59 FutState::New => {
60 // Mutex hasn't yet been modified; nothing to do
61 },
62 FutState::Pending(ref mut rx) => {
63 rx.close();
64 match rx.try_recv() {
65 Ok(Some(())) => {
66 // This future received ownership of the mutex, but got
67 // dropped before it was ever polled. Release the
68 // mutex.
69 self.mutex.unlock()
70 },
71 Ok(None) => {
72 // Dropping the Future before it acquires the Mutex is
73 // equivalent to cancelling it.
74 },
75 Err(oneshot::Canceled) => {
76 // Never received ownership of the mutex
77 }
78 }
79 },
80 FutState::Acquired => {
81 // The MutexGuard will take care of releasing the Mutex
82 }
83 }
84 }
85}
86
87impl<T: ?Sized> Future for MutexFut<T> {
88 type Output = MutexGuard<T>;
89
90 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
91 let (result, new_state) = match self.state {
92 FutState::New => {
93 let mut mtx_data = self.mutex.inner.mutex.lock()
94 .expect("sync::Mutex::lock");
95 if mtx_data.owned {
96 let (tx, mut rx) = oneshot::channel::<()>();
97 mtx_data.waiters.push_back(tx);
98 // Even though we know it isn't ready, we need to poll the
99 // receiver in order to register our task for notification.
100 assert!(Pin::new(&mut rx).poll(cx).is_pending());
101 (Poll::Pending, FutState::Pending(rx))
102 } else {
103 mtx_data.owned = true;
104 let guard = MutexGuard{mutex: self.mutex.clone()};
105 (Poll::Ready(guard), FutState::Acquired)
106 }
107 },
108 FutState::Pending(ref mut rx) => {
109 match Pin::new(rx).poll(cx) {
110 Poll::Pending => return Poll::Pending,
111 Poll::Ready(_) => {
112 let state = FutState::Acquired;
113 let result = Poll::Ready(
114 MutexGuard{mutex: self.mutex.clone()}
115 );
116 (result, state)
117 } //LCOV_EXCL_LINE kcov false negative
118 }
119 },
120 FutState::Acquired => panic!("Double-poll of ready Future")
121 };
122 self.state = new_state;
123 result
124 }
125}
126
127#[derive(Debug, Default)]
128struct MutexData {
129 owned: bool,
130 // FIFO queue of waiting tasks.
131 waiters: VecDeque<oneshot::Sender<()>>,
132}
133
134#[derive(Debug, Default)]
135struct Inner<T: ?Sized> {
136 mutex: sync::Mutex<MutexData>,
137 data: UnsafeCell<T>,
138}
139
140/// `MutexWeak` is a non-owning reference to a [`Mutex`]. `MutexWeak` is to
141/// [`Mutex`] as [`std::sync::Weak`] is to [`std::sync::Arc`].
142///
143/// # Examples
144/// ```
145/// # use futures_locks::{Mutex,MutexGuard};
146/// # fn main() {
147/// let mutex = Mutex::<u32>::new(0);
148/// let mutex_weak = Mutex::downgrade(&mutex);
149/// let mutex_new = mutex_weak.upgrade().unwrap();
150/// # }
151/// ```
152///
153/// [`Mutex`]: struct.Mutex.html
154/// [`std::sync::Weak`]: https://doc.rust-lang.org/std/sync/struct.Weak.html
155/// [`std::sync::Arc`]: https://doc.rust-lang.org/std/sync/struct.Arc.html
156#[derive(Debug)]
157pub struct MutexWeak<T: ?Sized> {
158 inner: sync::Weak<Inner<T>>,
159}
160
161impl<T: ?Sized> MutexWeak<T> {
162 /// Tries to upgrade the `MutexWeak` to `Mutex`. If the `Mutex` was dropped
163 /// then the function return `None`.
164 pub fn upgrade(&self) -> Option<Mutex<T>> {
165 if let Some(inner) = self.inner.upgrade() {
166 return Some(Mutex{inner})
167 }
168 None
169 }
170}
171
172impl<T: ?Sized> Clone for MutexWeak<T> {
173 fn clone(&self) -> MutexWeak<T> {
174 MutexWeak {inner: self.inner.clone()}
175 }
176}
177
178// Clippy doesn't like the Arc within Inner. But the access rules of the Mutex
179// make it safe to send. std::sync::Mutex has the same Send impl
180#[allow(clippy::non_send_fields_in_send_ty)]
181unsafe impl<T: ?Sized + Send> Send for MutexWeak<T> {}
182unsafe impl<T: ?Sized + Send> Sync for MutexWeak<T> {}
183
184/// A Futures-aware Mutex.
185///
186/// `std::sync::Mutex` cannot be used in an asynchronous environment like Tokio,
187/// because a mutex acquisition can block an entire reactor. This class can be
188/// used instead. It functions much like `std::sync::Mutex`. Unlike that
189/// class, it also has a builtin `Arc`, making it accessible from multiple
190/// threads. It's also safe to `clone`. Also unlike `std::sync::Mutex`, this
191/// class does not detect lock poisoning.
192///
193/// # Examples
194///
195/// ```
196/// # use futures_locks::*;
197/// # use futures::executor::block_on;
198/// # use futures::{Future, FutureExt};
199/// # fn main() {
200/// let mtx = Mutex::<u32>::new(0);
201/// let fut = mtx.lock().map(|mut guard| { *guard += 5; });
202/// block_on(fut);
203/// assert_eq!(mtx.try_unwrap().unwrap(), 5);
204/// # }
205/// ```
206#[derive(Debug, Default)]
207pub struct Mutex<T: ?Sized> {
208 inner: sync::Arc<Inner<T>>,
209}
210
211impl<T: ?Sized> Clone for Mutex<T> {
212 fn clone(&self) -> Mutex<T> {
213 Mutex { inner: self.inner.clone()}
214 }
215}
216
217impl<T> Mutex<T> {
218 /// Create a new `Mutex` in the unlocked state.
219 pub fn new(t: T) -> Mutex<T> {
220 let mutex_data = MutexData {
221 owned: false,
222 waiters: VecDeque::new(),
223 };
224 let inner = Inner {
225 mutex: sync::Mutex::new(mutex_data),
226 data: UnsafeCell::new(t)
227 }; //LCOV_EXCL_LINE kcov false negative
228 Mutex { inner: sync::Arc::new(inner)}
229 }
230
231 /// Consumes the `Mutex` and returns the wrapped data. If the `Mutex` still
232 /// has multiple references (not necessarily locked), returns a copy of
233 /// `self` instead.
234 pub fn try_unwrap(self) -> Result<T, Mutex<T>> {
235 match sync::Arc::try_unwrap(self.inner) {
236 Ok(inner) => Ok({
237 // `unsafe` is no longer needed as of somewhere around 1.25.0.
238 // https://github.com/rust-lang/rust/issues/35067
239 #[allow(unused_unsafe)]
240 unsafe { inner.data.into_inner() }
241 }),
242 Err(arc) => Err(Mutex {inner: arc})
243 }
244 }
245}
246
247impl<T: ?Sized> Mutex<T> {
248 /// Create a [`MutexWeak`] reference to this `Mutex`.
249 ///
250 /// [`MutexWeak`]: struct.MutexWeak.html
251 pub fn downgrade(this: &Mutex<T>) -> MutexWeak<T> {
252 MutexWeak {inner: sync::Arc::<Inner<T>>::downgrade(&this.inner)}
253 }
254
255 /// Returns a reference to the underlying data, if there are no other
256 /// clones of the `Mutex`.
257 ///
258 /// Since this call borrows the `Mutex` mutably, no actual locking takes
259 /// place -- the mutable borrow statically guarantees no locks exist.
260 /// However, if the `Mutex` has already been cloned, then `None` will be
261 /// returned instead.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// # use futures_locks::*;
267 /// # fn main() {
268 /// let mut mtx = Mutex::<u32>::new(0);
269 /// *mtx.get_mut().unwrap() += 5;
270 /// assert_eq!(mtx.try_unwrap().unwrap(), 5);
271 /// # }
272 /// ```
273 pub fn get_mut(&mut self) -> Option<&mut T> {
274 if let Some(inner) = sync::Arc::get_mut(&mut self.inner) {
275 let lock_data = inner.mutex.get_mut().unwrap();
276 let data = unsafe { inner.data.get().as_mut() }.unwrap();
277 debug_assert!(!lock_data.owned);
278 Some(data)
279 } else {
280 None
281 }
282 }
283
284 /// Acquires a `Mutex`, blocking the task in the meantime. When the
285 /// returned `Future` is ready, this task will have sole access to the
286 /// protected data.
287 pub fn lock(&self) -> MutexFut<T> {
288 MutexFut::new(FutState::New, self.clone())
289 }
290
291 /// Attempts to acquire the lock.
292 ///
293 /// If the operation would block, returns `Err` instead. Otherwise, returns
294 /// a guard (not a `Future`).
295 ///
296 /// # Examples
297 /// ```
298 /// # use futures_locks::*;
299 /// # fn main() {
300 /// let mut mtx = Mutex::<u32>::new(0);
301 /// match mtx.try_lock() {
302 /// Ok(mut guard) => *guard += 5,
303 /// Err(_) => println!("Better luck next time!")
304 /// };
305 /// # }
306 /// ```
307 pub fn try_lock(&self) -> Result<MutexGuard<T>, TryLockError> {
308 let mut mtx_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
309 if mtx_data.owned {
310 Err(TryLockError)
311 } else {
312 mtx_data.owned = true;
313 Ok(MutexGuard{mutex: self.clone()})
314 }
315 }
316
317 /// Release the `Mutex`
318 fn unlock(&self) {
319 let mut mtx_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
320 assert!(mtx_data.owned);
321
322 while let Some(tx) = mtx_data.waiters.pop_front() {
323 if tx.send(()).is_ok() {
324 return;
325 }
326 // An error indicates that the waiter's future was dropped
327 }
328 // Relinquish ownership
329 mtx_data.owned = false;
330 }
331
332 /// Returns true if the two `Mutex` point to the same data else false.
333 pub fn ptr_eq(this: &Mutex<T>, other: &Mutex<T>) -> bool {
334 sync::Arc::ptr_eq(&this.inner, &other.inner)
335 }
336}
337
338impl<T: 'static + ?Sized> Mutex<T> {
339 /// Acquires a `Mutex` and performs a computation on its guarded value in a
340 /// separate task. Returns a `Future` containing the result of the
341 /// computation.
342 ///
343 /// When using Tokio, this method will often hold the `Mutex` for less time
344 /// than chaining a computation to [`lock`](#method.lock). The reason is
345 /// that Tokio polls all tasks promptly upon notification. However, Tokio
346 /// does not guarantee that it will poll all futures promptly when their
347 /// owning task gets notified. So it's best to hold `Mutex`es within their
348 /// own tasks, lest their continuations get blocked by slow stacked
349 /// combinators.
350 ///
351 /// # Examples
352 ///
353 /// ```
354 /// # use futures_locks::*;
355 /// # use futures::{Future, future::ready};
356 /// # use tokio::runtime::Runtime;
357 /// # fn main() {
358 /// let mtx = Mutex::<u32>::new(0);
359 /// let mut rt = Runtime::new().unwrap();
360 /// rt.block_on(async {
361 /// mtx.with(|mut guard| {
362 /// *guard += 5;
363 /// ready::<()>(())
364 /// }).await
365 /// });
366 /// assert_eq!(mtx.try_unwrap().unwrap(), 5);
367 /// # }
368 /// ```
369 #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
370 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
371 pub fn with<B, F, R>(&self, f: F)
372 -> impl Future<Output = R>
373 where F: FnOnce(MutexGuard<T>) -> B + Send + 'static,
374 B: Future<Output = R> + Send + 'static,
375 R: Send + 'static,
376 T: Send
377 {
378 let jh = tokio::spawn({
379 let fut = self.lock();
380 async move { f(fut.await).await }
381 });
382
383 async move { jh.await.unwrap() }
384 }
385
386 /// Like [`with`](#method.with) but for Futures that aren't `Send`.
387 /// Spawns a new task on a single-threaded Runtime to complete the Future.
388 ///
389 /// # Examples
390 ///
391 /// ```
392 /// # use futures_locks::*;
393 /// # use futures::{Future, future::ready};
394 /// # use std::rc::Rc;
395 /// # use tokio::runtime::Runtime;
396 /// # fn main() {
397 /// // Note: Rc is not `Send`
398 /// let mtx = Mutex::<Rc<u32>>::new(Rc::new(0));
399 /// let mut rt = Runtime::new().unwrap();
400 /// rt.block_on(async {
401 /// mtx.with_local(|mut guard| {
402 /// *Rc::get_mut(&mut *guard).unwrap() += 5;
403 /// ready(())
404 /// }).await
405 /// });
406 /// assert_eq!(*mtx.try_unwrap().unwrap(), 5);
407 /// # }
408 /// ```
409 #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
410 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
411 pub fn with_local<B, F, R>(&self, f: F)
412 -> impl Future<Output = R>
413 where F: FnOnce(MutexGuard<T>) -> B + 'static,
414 B: Future<Output = R> + 'static + Unpin,
415 R: 'static
416 {
417 let local = task::LocalSet::new();
418 let jh = local.spawn_local({
419 let fut = self.lock();
420 async move { f(fut.await).await }
421 });
422
423 async move {
424 local.await;
425 jh.await.unwrap()
426 }
427 }
428}
429
430// Clippy doesn't like the Arc within Inner. But the access rules of the Mutex
431// make it safe to send. std::sync::Mutex has the same Send impl
432#[allow(clippy::non_send_fields_in_send_ty)]
433unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
434unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
435
436// LCOV_EXCL_START
437#[cfg(test)]
438mod t {
439 use super::*;
440
441 /// Pet Kcov
442 #[test]
443 fn debug() {
444 let m = Mutex::<u32>::new(0);
445 format!("{:?}", &m);
446 }
447
448 #[test]
449 fn test_default() {
450 let m = Mutex::default();
451 let value: u32 = m.try_unwrap().unwrap();
452 let expected = u32::default();
453
454 assert_eq!(expected, value);
455 }
456}
457// LCOV_EXCL_STOP