futures_locks/
mutex.rs

1// vim: tw=80
2
3use futures_channel::oneshot;
4use futures_task::{Context, Poll};
5use std::{
6    cell::UnsafeCell,
7    clone::Clone,
8    collections::VecDeque,
9    future::Future,
10    ops::{Deref, DerefMut},
11    pin::Pin,
12    sync
13};
14use super::{FutState, TryLockError};
15#[cfg(feature = "tokio")] use tokio::task;
16
17/// An RAII mutex guard, much like `std::sync::MutexGuard`.  The wrapped data
18/// can be accessed via its `Deref` and `DerefMut` implementations.
19#[derive(Debug)]
20pub struct MutexGuard<T: ?Sized> {
21    mutex: Mutex<T>
22}
23
24impl<T: ?Sized> Drop for MutexGuard<T> {
25    fn drop(&mut self) {
26        self.mutex.unlock();
27    }
28}
29
30impl<T: ?Sized> Deref for MutexGuard<T> {
31    type Target = T;
32
33    fn deref(&self) -> &T {
34        unsafe {&*self.mutex.inner.data.get()}
35    }
36}
37
38impl<T: ?Sized> DerefMut for MutexGuard<T> {
39    fn deref_mut(&mut self) -> &mut T {
40        unsafe {&mut *self.mutex.inner.data.get()}
41    }
42}
43
44/// A `Future` representing a pending `Mutex` acquisition.
45pub struct MutexFut<T: ?Sized> {
46    state: FutState,
47    mutex: Mutex<T>,
48}
49
50impl<T: ?Sized> MutexFut<T> {
51    fn new(state: FutState, mutex: Mutex<T>) -> Self {
52        MutexFut{state, mutex}
53    }
54}
55
56impl<T: ?Sized> Drop for MutexFut<T> {
57    fn drop(&mut self) {
58        match self.state {
59            FutState::New => {
60                // Mutex hasn't yet been modified; nothing to do
61            },
62            FutState::Pending(ref mut rx) => {
63                rx.close();
64                match rx.try_recv() {
65                    Ok(Some(())) => {
66                        // This future received ownership of the mutex, but got
67                        // dropped before it was ever polled.  Release the
68                        // mutex.
69                        self.mutex.unlock()
70                    },
71                    Ok(None) => {
72                        // Dropping the Future before it acquires the Mutex is
73                        // equivalent to cancelling it.
74                    },
75                    Err(oneshot::Canceled) => {
76                        // Never received ownership of the mutex
77                    }
78                }
79            },
80            FutState::Acquired => {
81                // The MutexGuard will take care of releasing the Mutex
82            }
83        }
84    }
85}
86
impl<T: ?Sized> Future for MutexFut<T> {
    type Output = MutexGuard<T>;

    /// Drives the acquisition.
    ///
    /// On the first poll the mutex is either taken immediately (if nobody
    /// owns it) or this future is appended to the mutex's waiter queue.  On
    /// later polls the future waits for the previous owner to hand ownership
    /// over through the queued channel.
    ///
    /// # Panics
    ///
    /// Panics if polled again after returning `Poll::Ready`, or if the
    /// internal `sync::Mutex` is poisoned.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let (result, new_state) = match self.state {
            FutState::New => {
                // The bookkeeping lock is held for the whole arm, so checking
                // `owned` and enqueueing the waiter happen atomically with
                // respect to `unlock`.
                let mut mtx_data = self.mutex.inner.mutex.lock()
                    .expect("sync::Mutex::lock");
                if mtx_data.owned {
                    let (tx, mut rx) = oneshot::channel::<()>();
                    mtx_data.waiters.push_back(tx);
                    // Even though we know it isn't ready, we need to poll the
                    // receiver in order to register our task for notification.
                    assert!(Pin::new(&mut rx).poll(cx).is_pending());
                    (Poll::Pending, FutState::Pending(rx))
                } else {
                    // Uncontended: claim the mutex right away.
                    mtx_data.owned = true;
                    let guard = MutexGuard{mutex: self.mutex.clone()};
                    (Poll::Ready(guard), FutState::Acquired)
                }
            },
            FutState::Pending(ref mut rx) => {
                match Pin::new(rx).poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    // `owned` is left untouched here: `unlock` keeps it set
                    // when it hands the mutex to a waiter.
                    // NOTE(review): a cancelled channel (`Err`) is treated
                    // the same as a successful hand-off; presumably it cannot
                    // happen because this future keeps the `Mutex`, and thus
                    // the queued sender, alive -- confirm.
                    Poll::Ready(_) => {
                        let state = FutState::Acquired;
                        let result = Poll::Ready(
                            MutexGuard{mutex: self.mutex.clone()}
                        );
                        (result, state)
                    }  //LCOV_EXCL_LINE    kcov false negative
                }
            },
            FutState::Acquired => panic!("Double-poll of ready Future")
        };
        self.state = new_state;
        result
    }
}
126
/// Bookkeeping for a `Mutex`, protected by the `sync::Mutex` inside `Inner`.
#[derive(Debug, Default)]
struct MutexData {
    /// True while the mutex is owned, either by a live guard or by a waiter
    /// that has been handed ownership.
    owned: bool,
    // FIFO queue of waiting tasks.  Sending `()` on an entry hands the mutex
    // to that waiter.
    waiters: VecDeque<oneshot::Sender<()>>,
}
133
/// Shared state behind every `Mutex` handle: the lock bookkeeping plus the
/// protected value itself.
#[derive(Debug, Default)]
struct Inner<T: ?Sized> {
    /// Ownership flag and waiter queue.  Only held briefly; it never guards
    /// `data` directly.
    mutex: sync::Mutex<MutexData>,
    /// The protected value, accessed through `MutexGuard`, `get_mut` or
    /// `try_unwrap`.
    data: UnsafeCell<T>,
}
139
/// `MutexWeak` is a non-owning reference to a [`Mutex`].  `MutexWeak` is to
/// [`Mutex`] as [`std::sync::Weak`] is to [`std::sync::Arc`].
///
/// A `MutexWeak` does not keep the protected data alive; call
/// `upgrade` to obtain a usable [`Mutex`] if one still exists.
///
/// # Examples
/// ```
/// # use futures_locks::{Mutex,MutexGuard};
/// # fn main() {
/// let mutex = Mutex::<u32>::new(0);
/// let mutex_weak = Mutex::downgrade(&mutex);
/// let mutex_new = mutex_weak.upgrade().unwrap();
/// # }
/// ```
///
/// [`Mutex`]: struct.Mutex.html
/// [`std::sync::Weak`]: https://doc.rust-lang.org/std/sync/struct.Weak.html
/// [`std::sync::Arc`]: https://doc.rust-lang.org/std/sync/struct.Arc.html
#[derive(Debug)]
pub struct MutexWeak<T: ?Sized> {
    /// Weak pointer to the state shared by the `Mutex` handles.
    inner: sync::Weak<Inner<T>>,
}
160
161impl<T: ?Sized> MutexWeak<T> {
162    /// Tries to upgrade the `MutexWeak` to `Mutex`. If the `Mutex` was dropped
163    /// then the function return `None`.
164    pub fn upgrade(&self) -> Option<Mutex<T>> {
165        if let Some(inner) = self.inner.upgrade() {
166            return Some(Mutex{inner})
167        }
168        None
169    }
170}
171
172impl<T: ?Sized> Clone for MutexWeak<T> {
173    fn clone(&self) -> MutexWeak<T> {
174        MutexWeak {inner: self.inner.clone()}
175    }
176}
177
// Clippy doesn't like the `sync::Weak<Inner<T>>` field, because `Inner`
// contains an `UnsafeCell`.  But the access rules of the Mutex make it safe
// to send.  std::sync::Mutex has the same Send impl.
//
// SAFETY: a `MutexWeak` cannot reach the protected data by itself; it must
// first be upgraded to a `Mutex`, whose access rules then apply.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T: ?Sized + Send> Send for MutexWeak<T> {}
unsafe impl<T: ?Sized + Send> Sync for MutexWeak<T> {}
183
/// A Futures-aware Mutex.
///
/// `std::sync::Mutex` cannot be used in an asynchronous environment like Tokio,
/// because a mutex acquisition can block an entire reactor.  This type can be
/// used instead.  It functions much like `std::sync::Mutex`.  Unlike that
/// type, it also has a builtin `Arc`, making it accessible from multiple
/// threads.  It's also safe to `clone`.  Also unlike `std::sync::Mutex`, this
/// type does not detect lock poisoning.
///
/// # Examples
///
/// ```
/// # use futures_locks::*;
/// # use futures::executor::block_on;
/// # use futures::{Future, FutureExt};
/// # fn main() {
/// let mtx = Mutex::<u32>::new(0);
/// let fut = mtx.lock().map(|mut guard| { *guard += 5; });
/// block_on(fut);
/// assert_eq!(mtx.try_unwrap().unwrap(), 5);
/// # }
/// ```
#[derive(Debug, Default)]
pub struct Mutex<T: ?Sized> {
    /// State shared by every clone of this `Mutex`.
    inner: sync::Arc<Inner<T>>,
}
210
211impl<T: ?Sized> Clone for Mutex<T> {
212    fn clone(&self) -> Mutex<T> {
213        Mutex { inner: self.inner.clone()}
214    }
215}
216
217impl<T> Mutex<T> {
218    /// Create a new `Mutex` in the unlocked state.
219    pub fn new(t: T) -> Mutex<T> {
220        let mutex_data = MutexData {
221            owned: false,
222            waiters: VecDeque::new(),
223        };
224        let inner = Inner {
225            mutex: sync::Mutex::new(mutex_data),
226            data: UnsafeCell::new(t)
227        };  //LCOV_EXCL_LINE    kcov false negative
228        Mutex { inner: sync::Arc::new(inner)}
229    }
230
231    /// Consumes the `Mutex` and returns the wrapped data.  If the `Mutex` still
232    /// has multiple references (not necessarily locked), returns a copy of
233    /// `self` instead.
234    pub fn try_unwrap(self) -> Result<T, Mutex<T>> {
235        match sync::Arc::try_unwrap(self.inner) {
236            Ok(inner) => Ok({
237                // `unsafe` is no longer needed as of somewhere around 1.25.0.
238                // https://github.com/rust-lang/rust/issues/35067
239                #[allow(unused_unsafe)]
240                unsafe { inner.data.into_inner() }
241            }),
242            Err(arc) => Err(Mutex {inner: arc})
243        }
244    }
245}
246
247impl<T: ?Sized> Mutex<T> {
248    /// Create a [`MutexWeak`] reference to this `Mutex`.
249    ///
250    /// [`MutexWeak`]: struct.MutexWeak.html
251    pub fn downgrade(this: &Mutex<T>) -> MutexWeak<T> {
252        MutexWeak {inner: sync::Arc::<Inner<T>>::downgrade(&this.inner)}
253    }
254
255    /// Returns a reference to the underlying data, if there are no other
256    /// clones of the `Mutex`.
257    ///
258    /// Since this call borrows the `Mutex` mutably, no actual locking takes
259    /// place -- the mutable borrow statically guarantees no locks exist.
260    /// However, if the `Mutex` has already been cloned, then `None` will be
261    /// returned instead.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// # use futures_locks::*;
267    /// # fn main() {
268    /// let mut mtx = Mutex::<u32>::new(0);
269    /// *mtx.get_mut().unwrap() += 5;
270    /// assert_eq!(mtx.try_unwrap().unwrap(), 5);
271    /// # }
272    /// ```
273    pub fn get_mut(&mut self) -> Option<&mut T> {
274        if let Some(inner) = sync::Arc::get_mut(&mut self.inner) {
275            let lock_data = inner.mutex.get_mut().unwrap();
276            let data = unsafe { inner.data.get().as_mut() }.unwrap();
277            debug_assert!(!lock_data.owned);
278            Some(data)
279        } else {
280            None
281        }
282    }
283
284    /// Acquires a `Mutex`, blocking the task in the meantime.  When the
285    /// returned `Future` is ready, this task will have sole access to the
286    /// protected data.
287    pub fn lock(&self) -> MutexFut<T> {
288        MutexFut::new(FutState::New, self.clone())
289    }
290
291    /// Attempts to acquire the lock.
292    ///
293    /// If the operation would block, returns `Err` instead.  Otherwise, returns
294    /// a guard (not a `Future`).
295    ///
296    /// # Examples
297    /// ```
298    /// # use futures_locks::*;
299    /// # fn main() {
300    /// let mut mtx = Mutex::<u32>::new(0);
301    /// match mtx.try_lock() {
302    ///     Ok(mut guard) => *guard += 5,
303    ///     Err(_) => println!("Better luck next time!")
304    /// };
305    /// # }
306    /// ```
307    pub fn try_lock(&self) -> Result<MutexGuard<T>, TryLockError> {
308        let mut mtx_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
309        if mtx_data.owned {
310            Err(TryLockError)
311        } else {
312            mtx_data.owned = true;
313            Ok(MutexGuard{mutex: self.clone()})
314        }
315    }
316
317    /// Release the `Mutex`
318    fn unlock(&self) {
319        let mut mtx_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
320        assert!(mtx_data.owned);
321
322        while let Some(tx) = mtx_data.waiters.pop_front() {
323            if tx.send(()).is_ok() {
324                return;
325            }
326            // An error indicates that the waiter's future was dropped
327        }
328        // Relinquish ownership
329        mtx_data.owned = false;
330    }
331
332    /// Returns true if the two `Mutex` point to the same data else false.
333    pub fn ptr_eq(this: &Mutex<T>, other: &Mutex<T>) -> bool {
334        sync::Arc::ptr_eq(&this.inner, &other.inner)
335    }
336}
337
338impl<T: 'static + ?Sized> Mutex<T> {
339    /// Acquires a `Mutex` and performs a computation on its guarded value in a
340    /// separate task.  Returns a `Future` containing the result of the
341    /// computation.
342    ///
343    /// When using Tokio, this method will often hold the `Mutex` for less time
344    /// than chaining a computation to [`lock`](#method.lock).  The reason is
345    /// that Tokio polls all tasks promptly upon notification.  However, Tokio
346    /// does not guarantee that it will poll all futures promptly when their
347    /// owning task gets notified.  So it's best to hold `Mutex`es within their
348    /// own tasks, lest their continuations get blocked by slow stacked
349    /// combinators.
350    ///
351    /// # Examples
352    ///
353    /// ```
354    /// # use futures_locks::*;
355    /// # use futures::{Future, future::ready};
356    /// # use tokio::runtime::Runtime;
357    /// # fn main() {
358    /// let mtx = Mutex::<u32>::new(0);
359    /// let mut rt = Runtime::new().unwrap();
360    /// rt.block_on(async {
361    ///     mtx.with(|mut guard| {
362    ///         *guard += 5;
363    ///         ready::<()>(())
364    ///     }).await
365    /// });
366    /// assert_eq!(mtx.try_unwrap().unwrap(), 5);
367    /// # }
368    /// ```
369    #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
370    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
371    pub fn with<B, F, R>(&self, f: F)
372        -> impl Future<Output = R>
373        where F: FnOnce(MutexGuard<T>) -> B + Send + 'static,
374              B: Future<Output = R> + Send + 'static,
375              R: Send + 'static,
376              T: Send
377    {
378        let jh = tokio::spawn({
379            let fut = self.lock();
380            async move { f(fut.await).await }
381        });
382
383        async move { jh.await.unwrap() }
384    }
385
386    /// Like [`with`](#method.with) but for Futures that aren't `Send`.
387    /// Spawns a new task on a single-threaded Runtime to complete the Future.
388    ///
389    /// # Examples
390    ///
391    /// ```
392    /// # use futures_locks::*;
393    /// # use futures::{Future, future::ready};
394    /// # use std::rc::Rc;
395    /// # use tokio::runtime::Runtime;
396    /// # fn main() {
397    /// // Note: Rc is not `Send`
398    /// let mtx = Mutex::<Rc<u32>>::new(Rc::new(0));
399    /// let mut rt = Runtime::new().unwrap();
400    /// rt.block_on(async {
401    ///     mtx.with_local(|mut guard| {
402    ///         *Rc::get_mut(&mut *guard).unwrap() += 5;
403    ///         ready(())
404    ///     }).await
405    /// });
406    /// assert_eq!(*mtx.try_unwrap().unwrap(), 5);
407    /// # }
408    /// ```
409    #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
410    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
411    pub fn with_local<B, F, R>(&self, f: F)
412        -> impl Future<Output = R>
413        where F: FnOnce(MutexGuard<T>) -> B + 'static,
414              B: Future<Output = R> + 'static + Unpin,
415              R: 'static
416    {
417        let local = task::LocalSet::new();
418        let jh = local.spawn_local({
419            let fut = self.lock();
420            async move { f(fut.await).await }
421        });
422
423        async move {
424            local.await;
425            jh.await.unwrap()
426        }
427    }
428}
429
// Clippy doesn't like the `sync::Arc<Inner<T>>` field, because `Inner`
// contains an `UnsafeCell`.  But the access rules of the Mutex make it safe
// to send.  std::sync::Mutex has the same Send impl.
//
// SAFETY: the protected `T` is only reachable through a `MutexGuard`, which
// is handed to one owner at a time, or through `get_mut`/`try_unwrap`, which
// require this to be the only handle.  `std::sync::Mutex` is likewise `Sync`
// for `T: Send`.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
435
// LCOV_EXCL_START
#[cfg(test)]
mod t {
    use super::*;

    /// Pet Kcov: exercise the derived `Debug` implementation.
    #[test]
    fn debug() {
        let m = Mutex::<u32>::new(0);
        // Use the rendered string so the formatting result is not silently
        // discarded.
        let rendered = format!("{:?}", &m);
        assert!(rendered.starts_with("Mutex"));
    }

    /// A default-constructed `Mutex` wraps the default value of its payload.
    #[test]
    fn test_default() {
        let m = Mutex::default();
        let value: u32 = m.try_unwrap().unwrap();
        let expected = u32::default();

        assert_eq!(expected, value);
    }
}
// LCOV_EXCL_STOP