futures_locks/
rwlock.rs

1// vim: tw=80
2
3use futures_channel::oneshot;
4use futures_task::{Context, Poll};
5use std::{
6    cell::UnsafeCell,
7    clone::Clone,
8    collections::VecDeque,
9    future::Future,
10    ops::{Deref, DerefMut},
11    pin::Pin,
12    sync,
13};
14use super::{FutState, TryLockError};
15#[cfg(feature = "tokio")] use tokio::task;
16
17/// An RAII guard, much like `std::sync::RwLockReadGuard`.  The wrapped data can
18/// be accessed via its `Deref` implementation.
19#[derive(Debug)]
20pub struct RwLockReadGuard<T: ?Sized> {
21    rwlock: RwLock<T>
22}
23
24impl<T: ?Sized> Deref for RwLockReadGuard<T> {
25    type Target = T;
26
27    fn deref(&self) -> &T {
28        unsafe {&*self.rwlock.inner.data.get()}
29    }
30}
31
32impl<T: ?Sized> Drop for RwLockReadGuard<T> {
33    fn drop(&mut self) {
34        self.rwlock.unlock_reader();
35    }
36}
37
38/// An RAII guard, much like `std::sync::RwLockWriteGuard`.  The wrapped data
39/// can be accessed via its `Deref`  and `DerefMut` implementations.
40#[derive(Debug)]
41pub struct RwLockWriteGuard<T: ?Sized> {
42    rwlock: RwLock<T>
43}
44
45impl<T: ?Sized> Deref for RwLockWriteGuard<T> {
46    type Target = T;
47
48    fn deref(&self) -> &T {
49        unsafe {&*self.rwlock.inner.data.get()}
50    }
51}
52
53impl<T: ?Sized> DerefMut for RwLockWriteGuard<T> {
54    fn deref_mut(&mut self) -> &mut T {
55        unsafe {&mut *self.rwlock.inner.data.get()}
56    }
57}
58
59impl<T: ?Sized> Drop for RwLockWriteGuard<T> {
60    fn drop(&mut self) {
61        self.rwlock.unlock_writer();
62    }
63}
64
65/// A `Future` representing a pending `RwLock` shared acquisition.
66pub struct RwLockReadFut<T: ?Sized> {
67    state: FutState,
68    rwlock: RwLock<T>,
69}
70
71impl<T: ?Sized> RwLockReadFut<T> {
72    fn new(state: FutState, rwlock: RwLock<T>) -> Self {
73        RwLockReadFut{state, rwlock}
74    }
75}
76
77impl<T: ?Sized> Drop for RwLockReadFut<T> {
78    fn drop(&mut self) {
79        match self.state {
80            FutState::New => {
81                // RwLock hasn't yet been modified; nothing to do
82            },
83            FutState::Pending(ref mut rx) => {
84                rx.close();
85                match rx.try_recv() {
86                    Ok(Some(())) => {
87                        // This future received ownership of the lock, but got
88                        // dropped before it was ever polled.  Release the
89                        // lock.
90                        self.rwlock.unlock_reader()
91                    },
92                    Ok(None) => {
93                        // Dropping the Future before it acquires the lock is
94                        // equivalent to cancelling it.
95                    },
96                    Err(oneshot::Canceled) => {
97                        // Never received ownership of the lock
98                    }
99                }
100            },
101            FutState::Acquired => {
102                // The RwLockReadGuard will take care of releasing the RwLock
103            }
104        }
105    }
106}
107
108impl<T: ?Sized> Future for RwLockReadFut<T> {
109    type Output = RwLockReadGuard<T>;
110
111    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
112        let (result, new_state) = match self.state {
113            FutState::New => {
114                let mut lock_data = self.rwlock.inner.mutex.lock()
115                    .expect("sync::Mutex::lock");
116                if lock_data.exclusive {
117                    let (tx, mut rx) = oneshot::channel::<()>();
118                    lock_data.read_waiters.push_back(tx);
119                    // Even though we know it isn't ready, we need to poll the
120                    // receiver in order to register our task for notification.
121                    assert!(Pin::new(&mut rx).poll(cx).is_pending());
122                    (Poll::Pending, FutState::Pending(rx))
123                } else {
124                    lock_data.num_readers += 1;
125                    let guard = RwLockReadGuard{rwlock: self.rwlock.clone()};
126                    (Poll::Ready(guard), FutState::Acquired)
127                }
128            },
129            FutState::Pending(ref mut rx) => {
130                match Pin::new(rx).poll(cx) {
131                    Poll::Pending => return Poll::Pending,
132                    Poll::Ready(_) => {
133                        let state = FutState::Acquired;
134                        let result = Poll::Ready(
135                            RwLockReadGuard{rwlock: self.rwlock.clone()}
136                        );
137                        (result, state)
138                    }  // LCOV_EXCL_LINE   kcov false negative
139                }
140            },
141            FutState::Acquired => panic!("Double-poll of ready Future")
142        };
143        self.state = new_state;
144        result
145    }
146}
147
148/// A `Future` representing a pending `RwLock` exclusive acquisition.
149pub struct RwLockWriteFut<T: ?Sized> {
150    state: FutState,
151    rwlock: RwLock<T>,
152}
153
154impl<T: ?Sized> RwLockWriteFut<T> {
155    fn new(state: FutState, rwlock: RwLock<T>) -> Self {
156        RwLockWriteFut{state, rwlock}
157    }
158}
159
160impl<T: ?Sized> Drop for RwLockWriteFut<T> {
161    fn drop(&mut self) {
162        match self.state {
163            FutState::New => {
164                // RwLock hasn't yet been modified; nothing to do
165            },
166            FutState::Pending(ref mut rx) => {
167                rx.close();
168                match rx.try_recv() {
169                    Ok(Some(())) => {
170                        // This future received ownership of the lock, but got
171                        // dropped before it was ever polled.  Release the
172                        // lock.
173                        self.rwlock.unlock_writer()
174                    },
175                    Ok(None) => {
176                        // Dropping the Future before it acquires the lock is
177                        // equivalent to cancelling it.
178                    },
179                    Err(oneshot::Canceled) => {
180                        // Never received ownership of the lock
181                    }
182                }
183            },
184            FutState::Acquired => {
185                // The RwLockWriteGuard will take care of releasing the RwLock
186            }
187        }
188    }
189}
190
191impl<T: ?Sized> Future for RwLockWriteFut<T> {
192    type Output = RwLockWriteGuard<T>;
193
194    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
195        let (result, new_state) = match self.state {
196            FutState::New => {
197                let mut lock_data = self.rwlock.inner.mutex.lock()
198                    .expect("sync::Mutex::lock");
199                if lock_data.exclusive || lock_data.num_readers > 0 {
200                    let (tx, mut rx) = oneshot::channel::<()>();
201                    lock_data.write_waiters.push_back(tx);
202                    // Even though we know it isn't ready, we need to poll the
203                    // receiver in order to register our task for notification.
204                    assert!(Pin::new(&mut rx).poll(cx).is_pending());
205                    (Poll::Pending, FutState::Pending(rx))
206                } else {
207                    lock_data.exclusive = true;
208                    let guard = RwLockWriteGuard{rwlock: self.rwlock.clone()};
209                    (Poll::Ready(guard), FutState::Acquired)
210                }
211            },
212            FutState::Pending(ref mut rx) => {
213                match Pin::new(rx).poll(cx) {
214                    Poll::Pending => return Poll::Pending,
215                    Poll::Ready(_) => {
216                        let state = FutState::Acquired;
217                        let result = Poll::Ready(
218                            RwLockWriteGuard{rwlock: self.rwlock.clone()}
219                        );
220                        (result, state)
221                    }  // LCOV_EXCL_LINE   kcov false negative
222                }
223            },
224            FutState::Acquired => panic!("Double-poll of ready Future")
225        };
226        self.state = new_state;
227        result
228    }
229}
230
231#[derive(Debug, Default)]
232struct RwLockData {
233    /// True iff the `RwLock` is currently exclusively owned
234    exclusive: bool,
235
236    /// The number of tasks that currently have shared ownership of the RwLock
237    num_readers: u32,
238
239    // FIFO queue of waiting readers
240    read_waiters: VecDeque<oneshot::Sender<()>>,
241
242    // FIFO queue of waiting writers
243    write_waiters: VecDeque<oneshot::Sender<()>>,
244}
245
246#[derive(Debug, Default)]
247struct Inner<T: ?Sized> {
248    mutex: sync::Mutex<RwLockData>,
249    data: UnsafeCell<T>,
250}
251
252/// A Futures-aware RwLock.
253///
254/// `std::sync::RwLock` cannot be used in an asynchronous environment like
255/// Tokio, because an acquisition can block an entire reactor.  This class can
256/// be used instead.  It functions much like `std::sync::RwLock`.  Unlike that
257/// class, it also has a builtin `Arc`, making it accessible from multiple
258/// threads.  It's also safe to `clone`.  Also unlike `std::sync::RwLock`, this
259/// class does not detect lock poisoning.
260#[derive(Debug, Default)]
261pub struct RwLock<T: ?Sized> {
262    inner: sync::Arc<Inner<T>>,
263}
264
265impl<T: ?Sized> Clone for RwLock<T> {
266    fn clone(&self) -> RwLock<T> {
267        RwLock { inner: self.inner.clone()}
268    }
269}
270
271impl<T> RwLock<T> {
272    /// Create a new `RwLock` in the unlocked state.
273    pub fn new(t: T) -> RwLock<T> {
274        let lock_data = RwLockData {
275            exclusive: false,
276            num_readers: 0,
277            read_waiters: VecDeque::new(),
278            write_waiters: VecDeque::new(),
279        };  // LCOV_EXCL_LINE   kcov false negative
280        let inner = Inner {
281            mutex: sync::Mutex::new(lock_data),
282            data: UnsafeCell::new(t)
283        };  // LCOV_EXCL_LINE   kcov false negative
284        RwLock { inner: sync::Arc::new(inner)}
285    }
286
287    /// Consumes the `RwLock` and returns the wrapped data.  If the `RwLock`
288    /// still has multiple references (not necessarily locked), returns a copy
289    /// of `self` instead.
290    pub fn try_unwrap(self) -> Result<T, RwLock<T>> {
291        match sync::Arc::try_unwrap(self.inner) {
292            Ok(inner) => Ok({
293                // `unsafe` is no longer needed as of somewhere around 1.25.0.
294                // https://github.com/rust-lang/rust/issues/35067
295                #[allow(unused_unsafe)]
296                unsafe { inner.data.into_inner() }
297            }),
298            Err(arc) => Err(RwLock {inner: arc})
299        }
300    }
301}
302
303impl<T: ?Sized> RwLock<T> {
304    /// Returns a reference to the underlying data, if there are no other
305    /// clones of the `RwLock`.
306    ///
307    /// Since this call borrows the `RwLock` mutably, no actual locking takes
308    /// place -- the mutable borrow statically guarantees no locks exist.
309    /// However, if the `RwLock` has already been cloned, then `None` will be
310    /// returned instead.
311    ///
312    /// # Examples
313    ///
314    /// ```
315    /// # use futures_locks::*;
316    /// # fn main() {
317    /// let mut lock = RwLock::<u32>::new(0);
318    /// *lock.get_mut().unwrap() += 5;
319    /// assert_eq!(lock.try_unwrap().unwrap(), 5);
320    /// # }
321    /// ```
322    pub fn get_mut(&mut self) -> Option<&mut T> {
323        if let Some(inner) = sync::Arc::get_mut(&mut self.inner) {
324            let lock_data = inner.mutex.get_mut().unwrap();
325            let data = unsafe { inner.data.get().as_mut() }.unwrap();
326            debug_assert!(!lock_data.exclusive);
327            debug_assert_eq!(lock_data.num_readers, 0);
328            Some(data)
329        } else {
330            None
331        }
332    }
333
334    /// Acquire the `RwLock` nonexclusively, read-only, blocking the task in the
335    /// meantime.
336    ///
337    /// When the returned `Future` is ready, then this task will have read-only
338    /// access to the protected data.
339    ///
340    /// # Examples
341    /// ```
342    /// # use futures_locks::*;
343    /// # use futures::executor::block_on;
344    /// # use futures::{Future, FutureExt};
345    /// # fn main() {
346    /// let rwlock = RwLock::<u32>::new(42);
347    /// let fut = rwlock.read().map(|mut guard| { *guard });
348    /// assert_eq!(block_on(fut), 42);
349    /// # }
350    ///
351    /// ```
352    pub fn read(&self) -> RwLockReadFut<T> {
353        RwLockReadFut::new(FutState::New, self.clone())
354    }
355
356    /// Acquire the `RwLock` exclusively, read-write, blocking the task in the
357    /// meantime.
358    ///
359    /// When the returned `Future` is ready, then this task will have read-write
360    /// access to the protected data.
361    ///
362    /// # Examples
363    /// ```
364    /// # use futures_locks::*;
365    /// # use futures::executor::block_on;
366    /// # use futures::{Future, FutureExt};
367    /// # fn main() {
368    /// let rwlock = RwLock::<u32>::new(42);
369    /// let fut = rwlock.write().map(|mut guard| { *guard = 5;});
370    /// block_on(fut);
371    /// assert_eq!(rwlock.try_unwrap().unwrap(), 5);
372    /// # }
373    ///
374    /// ```
375    pub fn write(&self) -> RwLockWriteFut<T> {
376        RwLockWriteFut::new(FutState::New, self.clone())
377    }
378
379    /// Attempts to acquire the `RwLock` nonexclusively.
380    ///
381    /// If the operation would block, returns `Err` instead.  Otherwise, returns
382    /// a guard (not a `Future`).
383    ///
384    /// # Examples
385    /// ```
386    /// # use futures_locks::*;
387    /// # fn main() {
388    /// let mut lock = RwLock::<u32>::new(5);
389    /// let r = match lock.try_read() {
390    ///     Ok(guard) => *guard,
391    ///     Err(_) => panic!("Better luck next time!")
392    /// };
393    /// assert_eq!(5, r);
394    /// # }
395    /// ```
396    pub fn try_read(&self) -> Result<RwLockReadGuard<T>, TryLockError> {
397        let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
398        if lock_data.exclusive {
399            Err(TryLockError)
400        } else {
401            lock_data.num_readers += 1;
402            Ok(RwLockReadGuard{rwlock: self.clone()})
403        }
404    }
405
406    /// Attempts to acquire the `RwLock` exclusively.
407    ///
408    /// If the operation would block, returns `Err` instead.  Otherwise, returns
409    /// a guard (not a `Future`).
410    ///
411    /// # Examples
412    /// ```
413    /// # use futures_locks::*;
414    /// # fn main() {
415    /// let mut lock = RwLock::<u32>::new(5);
416    /// match lock.try_write() {
417    ///     Ok(mut guard) => *guard += 5,
418    ///     Err(_) => panic!("Better luck next time!")
419    /// }
420    /// assert_eq!(10, lock.try_unwrap().unwrap());
421    /// # }
422    /// ```
423    pub fn try_write(&self) -> Result<RwLockWriteGuard<T>, TryLockError> {
424        let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
425        if lock_data.exclusive || lock_data.num_readers > 0 {
426            Err(TryLockError)
427        } else {
428            lock_data.exclusive = true;
429            Ok(RwLockWriteGuard{rwlock: self.clone()})
430        }
431    }
432
433    /// Release a shared lock of an `RwLock`.
434    fn unlock_reader(&self) {
435        let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
436        assert!(lock_data.num_readers > 0);
437        assert!(!lock_data.exclusive);
438        assert_eq!(lock_data.read_waiters.len(), 0);
439        lock_data.num_readers -= 1;
440        if lock_data.num_readers == 0 {
441            while let Some(tx) = lock_data.write_waiters.pop_front() {
442                if tx.send(()).is_ok() {
443                    lock_data.exclusive = true;
444                    return
445                }
446            }
447        }
448    }
449
450    /// Release an exclusive lock of an `RwLock`.
451    fn unlock_writer(&self) {
452        let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
453        assert!(lock_data.num_readers == 0);
454        assert!(lock_data.exclusive);
455
456        // First try to wake up any writers
457        while let Some(tx) = lock_data.write_waiters.pop_front() {
458            if tx.send(()).is_ok() {
459                return;
460            }
461        }
462
463        // If there are no writers, try to wake up readers
464        lock_data.exclusive = false;
465        lock_data.num_readers += lock_data.read_waiters.len() as u32;
466        for tx in lock_data.read_waiters.drain(..) {
467            // Ignore errors, which are due to a reader's future getting
468            // dropped before it was ready
469            let _ = tx.send(());
470        }
471    }
472}
473
474impl<T: 'static + ?Sized> RwLock<T> {
475    /// Acquires a `RwLock` nonexclusively and performs a computation on its
476    /// guarded value in a separate task.  Returns a `Future` containing the
477    /// result of the computation.
478    ///
479    /// When using Tokio, this method will often hold the `RwLock` for less time
480    /// than chaining a computation to [`read`](#method.read).  The reason is
481    /// that Tokio polls all tasks promptly upon notification.  However, Tokio
482    /// does not guarantee that it will poll all futures promptly when their
483    /// owning task gets notified.  So it's best to hold `RwLock`s within their
484    /// own tasks, lest their continuations get blocked by slow stacked
485    /// combinators.
486    ///
487    /// # Examples
488    ///
489    /// ```
490    /// # use futures_locks::*;
491    /// # use futures::{Future, future::ready};
492    /// # use tokio::runtime::Runtime;
493    /// # fn main() {
494    /// let rwlock = RwLock::<u32>::new(5);
495    /// let mut rt = Runtime::new().unwrap();
496    /// let r = rt.block_on(async {
497    ///     rwlock.with_read(|mut guard| {
498    ///         ready(*guard)
499    ///     }).await
500    /// });
501    /// assert_eq!(r, 5);
502    /// # }
503    /// ```
504    #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
505    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
506    pub fn with_read<B, F, R>(&self, f: F)
507        -> impl Future<Output = R>
508        where F: FnOnce(RwLockReadGuard<T>) -> B + Send + 'static,
509              B: Future<Output = R> + Send + 'static,
510              R: Send + 'static,
511              T: Send
512    {
513        let jh = tokio::spawn({
514            let fut = self.read();
515            async move { f(fut.await).await }
516        });
517
518        async move { jh.await.unwrap() }
519    }
520
521    /// Like [`with_read`](#method.with_read) but for Futures that aren't
522    /// `Send`.  Spawns a new task on a single-threaded Runtime to complete the
523    /// Future.
524    ///
525    /// # Examples
526    ///
527    /// ```
528    /// # use futures_locks::*;
529    /// # use futures::{Future, future::ready};
530    /// # use std::rc::Rc;
531    /// # use tokio::runtime::Runtime;
532    /// # fn main() {
533    /// // Note: Rc is not `Send`
534    /// let rwlock = RwLock::<Rc<u32>>::new(Rc::new(5));
535    /// let mut rt = Runtime::new().unwrap();
536    /// let r = rt.block_on(async {
537    ///     rwlock.with_read_local(|mut guard| {
538    ///         ready(**guard)
539    ///     }).await
540    /// });
541    /// assert_eq!(r, 5);
542    /// # }
543    /// ```
544    #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
545    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
546    pub fn with_read_local<B, F, R>(&self, f: F)
547        -> impl Future<Output = R>
548        where F: FnOnce(RwLockReadGuard<T>) -> B + 'static + Unpin,
549              B: Future<Output = R> + 'static,
550              R: 'static
551    {
552        let local = task::LocalSet::new();
553        let jh = local.spawn_local({
554            let fut = self.read();
555            async move { f(fut.await).await }
556        });
557
558        async move {
559            local.await;
560            jh.await.unwrap()
561        }
562    }
563
564    /// Acquires a `RwLock` exclusively and performs a computation on its
565    /// guarded value in a separate task.  Returns a `Future` containing the
566    /// result of the computation.
567    ///
568    /// When using Tokio, this method will often hold the `RwLock` for less time
569    /// than chaining a computation to [`write`](#method.write).  The reason is
570    /// that Tokio polls all tasks promptly upon notification.  However, Tokio
571    /// does not guarantee that it will poll all futures promptly when their
572    /// owning task gets notified.  So it's best to hold `RwLock`s within their
573    /// own tasks, lest their continuations get blocked by slow stacked
574    /// combinators.
575    ///
576    /// # Examples
577    ///
578    /// ```
579    /// # use futures::{Future, future::ready};
580    /// # use futures_locks::*;
581    /// # use tokio::runtime::Runtime;
582    /// # fn main() {
583    /// let rwlock = RwLock::<u32>::new(0);
584    /// let mut rt = Runtime::new().unwrap();
585    /// let r = rt.block_on(async {
586    ///     rwlock.with_write(|mut guard| {
587    ///         *guard += 5;
588    ///         ready(())
589    ///     }).await
590    /// });
591    /// assert_eq!(rwlock.try_unwrap().unwrap(), 5);
592    /// # }
593    /// ```
594    #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
595    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
596    pub fn with_write<B, F, R>(&self, f: F)
597        -> impl Future<Output = R>
598        where F: FnOnce(RwLockWriteGuard<T>) -> B + Send + 'static,
599              B: Future<Output = R> + Send + 'static,
600              R: Send + 'static,
601              T: Send
602    {
603        let jh = tokio::spawn({
604            let fut = self.write();
605            async move { f(fut.await).await }
606        });
607
608        async move { jh.await.unwrap() }
609    }
610
611    /// Like [`with_write`](#method.with_write) but for Futures that aren't
612    /// `Send`.  Spawns a new task on a single-threaded Runtime to complete the
613    /// Future.
614    ///
615    /// # Examples
616    ///
617    /// ```
618    /// # use futures::{Future, future::ready};
619    /// # use futures_locks::*;
620    /// # use std::rc::Rc;
621    /// # use tokio::runtime::Runtime;
622    /// # fn main() {
623    /// // Note: Rc is not `Send`
624    /// let rwlock = RwLock::<Rc<u32>>::new(Rc::new(0));
625    /// let mut rt = Runtime::new().unwrap();
626    /// let r = rt.block_on(async {
627    ///     rwlock.with_write_local(|mut guard| {
628    ///         *Rc::get_mut(&mut *guard).unwrap() += 5;
629    ///         ready(())
630    ///     }).await
631    /// });
632    /// assert_eq!(*rwlock.try_unwrap().unwrap(), 5);
633    /// # }
634    /// ```
635    #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
636    #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
637    pub fn with_write_local<B, F, R>(&self, f: F)
638        -> impl Future<Output = R>
639        where F: FnOnce(RwLockWriteGuard<T>) -> B + 'static + Unpin,
640              B: Future<Output = R> + 'static,
641              R: 'static
642    {
643        let local = task::LocalSet::new();
644        let jh = local.spawn_local({
645            let fut = self.write();
646            async move { f(fut.await).await }
647        });
648
649        async move {
650            local.await;
651            jh.await.unwrap()
652        }
653    }
654}
655
656// Clippy doesn't like the Arc within Inner.  But the access rules of the RwLock
657// make it safe to send.  std::sync::RwLock has the same Send impl
658#[allow(clippy::non_send_fields_in_send_ty)]
659unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
660unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
661
662// LCOV_EXCL_START
663#[cfg(test)]
664mod t {
665    use super::*;
666
667    /// Pet Kcov
668    #[test]
669    fn debug() {
670        let m = RwLock::<u32>::new(0);
671        format!("{:?}", &m);
672    }
673
674    #[test]
675    fn test_default() {
676        let lock = RwLock::default();
677        let value: u32 = lock.try_unwrap().unwrap();
678        let expected = u32::default();
679
680        assert_eq!(expected, value);
681    }
682}
683// LCOV_EXCL_STOP