futures_locks/rwlock.rs
1// vim: tw=80
2
3use futures_channel::oneshot;
4use futures_task::{Context, Poll};
5use std::{
6 cell::UnsafeCell,
7 clone::Clone,
8 collections::VecDeque,
9 future::Future,
10 ops::{Deref, DerefMut},
11 pin::Pin,
12 sync,
13};
14use super::{FutState, TryLockError};
15#[cfg(feature = "tokio")] use tokio::task;
16
17/// An RAII guard, much like `std::sync::RwLockReadGuard`. The wrapped data can
18/// be accessed via its `Deref` implementation.
19#[derive(Debug)]
20pub struct RwLockReadGuard<T: ?Sized> {
21 rwlock: RwLock<T>
22}
23
24impl<T: ?Sized> Deref for RwLockReadGuard<T> {
25 type Target = T;
26
27 fn deref(&self) -> &T {
28 unsafe {&*self.rwlock.inner.data.get()}
29 }
30}
31
32impl<T: ?Sized> Drop for RwLockReadGuard<T> {
33 fn drop(&mut self) {
34 self.rwlock.unlock_reader();
35 }
36}
37
38/// An RAII guard, much like `std::sync::RwLockWriteGuard`. The wrapped data
39/// can be accessed via its `Deref` and `DerefMut` implementations.
40#[derive(Debug)]
41pub struct RwLockWriteGuard<T: ?Sized> {
42 rwlock: RwLock<T>
43}
44
45impl<T: ?Sized> Deref for RwLockWriteGuard<T> {
46 type Target = T;
47
48 fn deref(&self) -> &T {
49 unsafe {&*self.rwlock.inner.data.get()}
50 }
51}
52
53impl<T: ?Sized> DerefMut for RwLockWriteGuard<T> {
54 fn deref_mut(&mut self) -> &mut T {
55 unsafe {&mut *self.rwlock.inner.data.get()}
56 }
57}
58
59impl<T: ?Sized> Drop for RwLockWriteGuard<T> {
60 fn drop(&mut self) {
61 self.rwlock.unlock_writer();
62 }
63}
64
65/// A `Future` representing a pending `RwLock` shared acquisition.
66pub struct RwLockReadFut<T: ?Sized> {
67 state: FutState,
68 rwlock: RwLock<T>,
69}
70
71impl<T: ?Sized> RwLockReadFut<T> {
72 fn new(state: FutState, rwlock: RwLock<T>) -> Self {
73 RwLockReadFut{state, rwlock}
74 }
75}
76
77impl<T: ?Sized> Drop for RwLockReadFut<T> {
78 fn drop(&mut self) {
79 match self.state {
80 FutState::New => {
81 // RwLock hasn't yet been modified; nothing to do
82 },
83 FutState::Pending(ref mut rx) => {
84 rx.close();
85 match rx.try_recv() {
86 Ok(Some(())) => {
87 // This future received ownership of the lock, but got
88 // dropped before it was ever polled. Release the
89 // lock.
90 self.rwlock.unlock_reader()
91 },
92 Ok(None) => {
93 // Dropping the Future before it acquires the lock is
94 // equivalent to cancelling it.
95 },
96 Err(oneshot::Canceled) => {
97 // Never received ownership of the lock
98 }
99 }
100 },
101 FutState::Acquired => {
102 // The RwLockReadGuard will take care of releasing the RwLock
103 }
104 }
105 }
106}
107
108impl<T: ?Sized> Future for RwLockReadFut<T> {
109 type Output = RwLockReadGuard<T>;
110
111 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
112 let (result, new_state) = match self.state {
113 FutState::New => {
114 let mut lock_data = self.rwlock.inner.mutex.lock()
115 .expect("sync::Mutex::lock");
116 if lock_data.exclusive {
117 let (tx, mut rx) = oneshot::channel::<()>();
118 lock_data.read_waiters.push_back(tx);
119 // Even though we know it isn't ready, we need to poll the
120 // receiver in order to register our task for notification.
121 assert!(Pin::new(&mut rx).poll(cx).is_pending());
122 (Poll::Pending, FutState::Pending(rx))
123 } else {
124 lock_data.num_readers += 1;
125 let guard = RwLockReadGuard{rwlock: self.rwlock.clone()};
126 (Poll::Ready(guard), FutState::Acquired)
127 }
128 },
129 FutState::Pending(ref mut rx) => {
130 match Pin::new(rx).poll(cx) {
131 Poll::Pending => return Poll::Pending,
132 Poll::Ready(_) => {
133 let state = FutState::Acquired;
134 let result = Poll::Ready(
135 RwLockReadGuard{rwlock: self.rwlock.clone()}
136 );
137 (result, state)
138 } // LCOV_EXCL_LINE kcov false negative
139 }
140 },
141 FutState::Acquired => panic!("Double-poll of ready Future")
142 };
143 self.state = new_state;
144 result
145 }
146}
147
148/// A `Future` representing a pending `RwLock` exclusive acquisition.
149pub struct RwLockWriteFut<T: ?Sized> {
150 state: FutState,
151 rwlock: RwLock<T>,
152}
153
154impl<T: ?Sized> RwLockWriteFut<T> {
155 fn new(state: FutState, rwlock: RwLock<T>) -> Self {
156 RwLockWriteFut{state, rwlock}
157 }
158}
159
160impl<T: ?Sized> Drop for RwLockWriteFut<T> {
161 fn drop(&mut self) {
162 match self.state {
163 FutState::New => {
164 // RwLock hasn't yet been modified; nothing to do
165 },
166 FutState::Pending(ref mut rx) => {
167 rx.close();
168 match rx.try_recv() {
169 Ok(Some(())) => {
170 // This future received ownership of the lock, but got
171 // dropped before it was ever polled. Release the
172 // lock.
173 self.rwlock.unlock_writer()
174 },
175 Ok(None) => {
176 // Dropping the Future before it acquires the lock is
177 // equivalent to cancelling it.
178 },
179 Err(oneshot::Canceled) => {
180 // Never received ownership of the lock
181 }
182 }
183 },
184 FutState::Acquired => {
185 // The RwLockWriteGuard will take care of releasing the RwLock
186 }
187 }
188 }
189}
190
191impl<T: ?Sized> Future for RwLockWriteFut<T> {
192 type Output = RwLockWriteGuard<T>;
193
194 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
195 let (result, new_state) = match self.state {
196 FutState::New => {
197 let mut lock_data = self.rwlock.inner.mutex.lock()
198 .expect("sync::Mutex::lock");
199 if lock_data.exclusive || lock_data.num_readers > 0 {
200 let (tx, mut rx) = oneshot::channel::<()>();
201 lock_data.write_waiters.push_back(tx);
202 // Even though we know it isn't ready, we need to poll the
203 // receiver in order to register our task for notification.
204 assert!(Pin::new(&mut rx).poll(cx).is_pending());
205 (Poll::Pending, FutState::Pending(rx))
206 } else {
207 lock_data.exclusive = true;
208 let guard = RwLockWriteGuard{rwlock: self.rwlock.clone()};
209 (Poll::Ready(guard), FutState::Acquired)
210 }
211 },
212 FutState::Pending(ref mut rx) => {
213 match Pin::new(rx).poll(cx) {
214 Poll::Pending => return Poll::Pending,
215 Poll::Ready(_) => {
216 let state = FutState::Acquired;
217 let result = Poll::Ready(
218 RwLockWriteGuard{rwlock: self.rwlock.clone()}
219 );
220 (result, state)
221 } // LCOV_EXCL_LINE kcov false negative
222 }
223 },
224 FutState::Acquired => panic!("Double-poll of ready Future")
225 };
226 self.state = new_state;
227 result
228 }
229}
230
231#[derive(Debug, Default)]
232struct RwLockData {
233 /// True iff the `RwLock` is currently exclusively owned
234 exclusive: bool,
235
236 /// The number of tasks that currently have shared ownership of the RwLock
237 num_readers: u32,
238
239 // FIFO queue of waiting readers
240 read_waiters: VecDeque<oneshot::Sender<()>>,
241
242 // FIFO queue of waiting writers
243 write_waiters: VecDeque<oneshot::Sender<()>>,
244}
245
246#[derive(Debug, Default)]
247struct Inner<T: ?Sized> {
248 mutex: sync::Mutex<RwLockData>,
249 data: UnsafeCell<T>,
250}
251
252/// A Futures-aware RwLock.
253///
254/// `std::sync::RwLock` cannot be used in an asynchronous environment like
255/// Tokio, because an acquisition can block an entire reactor. This class can
256/// be used instead. It functions much like `std::sync::RwLock`. Unlike that
257/// class, it also has a builtin `Arc`, making it accessible from multiple
258/// threads. It's also safe to `clone`. Also unlike `std::sync::RwLock`, this
259/// class does not detect lock poisoning.
260#[derive(Debug, Default)]
261pub struct RwLock<T: ?Sized> {
262 inner: sync::Arc<Inner<T>>,
263}
264
265impl<T: ?Sized> Clone for RwLock<T> {
266 fn clone(&self) -> RwLock<T> {
267 RwLock { inner: self.inner.clone()}
268 }
269}
270
271impl<T> RwLock<T> {
272 /// Create a new `RwLock` in the unlocked state.
273 pub fn new(t: T) -> RwLock<T> {
274 let lock_data = RwLockData {
275 exclusive: false,
276 num_readers: 0,
277 read_waiters: VecDeque::new(),
278 write_waiters: VecDeque::new(),
279 }; // LCOV_EXCL_LINE kcov false negative
280 let inner = Inner {
281 mutex: sync::Mutex::new(lock_data),
282 data: UnsafeCell::new(t)
283 }; // LCOV_EXCL_LINE kcov false negative
284 RwLock { inner: sync::Arc::new(inner)}
285 }
286
287 /// Consumes the `RwLock` and returns the wrapped data. If the `RwLock`
288 /// still has multiple references (not necessarily locked), returns a copy
289 /// of `self` instead.
290 pub fn try_unwrap(self) -> Result<T, RwLock<T>> {
291 match sync::Arc::try_unwrap(self.inner) {
292 Ok(inner) => Ok({
293 // `unsafe` is no longer needed as of somewhere around 1.25.0.
294 // https://github.com/rust-lang/rust/issues/35067
295 #[allow(unused_unsafe)]
296 unsafe { inner.data.into_inner() }
297 }),
298 Err(arc) => Err(RwLock {inner: arc})
299 }
300 }
301}
302
303impl<T: ?Sized> RwLock<T> {
304 /// Returns a reference to the underlying data, if there are no other
305 /// clones of the `RwLock`.
306 ///
307 /// Since this call borrows the `RwLock` mutably, no actual locking takes
308 /// place -- the mutable borrow statically guarantees no locks exist.
309 /// However, if the `RwLock` has already been cloned, then `None` will be
310 /// returned instead.
311 ///
312 /// # Examples
313 ///
314 /// ```
315 /// # use futures_locks::*;
316 /// # fn main() {
317 /// let mut lock = RwLock::<u32>::new(0);
318 /// *lock.get_mut().unwrap() += 5;
319 /// assert_eq!(lock.try_unwrap().unwrap(), 5);
320 /// # }
321 /// ```
322 pub fn get_mut(&mut self) -> Option<&mut T> {
323 if let Some(inner) = sync::Arc::get_mut(&mut self.inner) {
324 let lock_data = inner.mutex.get_mut().unwrap();
325 let data = unsafe { inner.data.get().as_mut() }.unwrap();
326 debug_assert!(!lock_data.exclusive);
327 debug_assert_eq!(lock_data.num_readers, 0);
328 Some(data)
329 } else {
330 None
331 }
332 }
333
334 /// Acquire the `RwLock` nonexclusively, read-only, blocking the task in the
335 /// meantime.
336 ///
337 /// When the returned `Future` is ready, then this task will have read-only
338 /// access to the protected data.
339 ///
340 /// # Examples
341 /// ```
342 /// # use futures_locks::*;
343 /// # use futures::executor::block_on;
344 /// # use futures::{Future, FutureExt};
345 /// # fn main() {
346 /// let rwlock = RwLock::<u32>::new(42);
347 /// let fut = rwlock.read().map(|mut guard| { *guard });
348 /// assert_eq!(block_on(fut), 42);
349 /// # }
350 ///
351 /// ```
352 pub fn read(&self) -> RwLockReadFut<T> {
353 RwLockReadFut::new(FutState::New, self.clone())
354 }
355
356 /// Acquire the `RwLock` exclusively, read-write, blocking the task in the
357 /// meantime.
358 ///
359 /// When the returned `Future` is ready, then this task will have read-write
360 /// access to the protected data.
361 ///
362 /// # Examples
363 /// ```
364 /// # use futures_locks::*;
365 /// # use futures::executor::block_on;
366 /// # use futures::{Future, FutureExt};
367 /// # fn main() {
368 /// let rwlock = RwLock::<u32>::new(42);
369 /// let fut = rwlock.write().map(|mut guard| { *guard = 5;});
370 /// block_on(fut);
371 /// assert_eq!(rwlock.try_unwrap().unwrap(), 5);
372 /// # }
373 ///
374 /// ```
375 pub fn write(&self) -> RwLockWriteFut<T> {
376 RwLockWriteFut::new(FutState::New, self.clone())
377 }
378
379 /// Attempts to acquire the `RwLock` nonexclusively.
380 ///
381 /// If the operation would block, returns `Err` instead. Otherwise, returns
382 /// a guard (not a `Future`).
383 ///
384 /// # Examples
385 /// ```
386 /// # use futures_locks::*;
387 /// # fn main() {
388 /// let mut lock = RwLock::<u32>::new(5);
389 /// let r = match lock.try_read() {
390 /// Ok(guard) => *guard,
391 /// Err(_) => panic!("Better luck next time!")
392 /// };
393 /// assert_eq!(5, r);
394 /// # }
395 /// ```
396 pub fn try_read(&self) -> Result<RwLockReadGuard<T>, TryLockError> {
397 let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
398 if lock_data.exclusive {
399 Err(TryLockError)
400 } else {
401 lock_data.num_readers += 1;
402 Ok(RwLockReadGuard{rwlock: self.clone()})
403 }
404 }
405
406 /// Attempts to acquire the `RwLock` exclusively.
407 ///
408 /// If the operation would block, returns `Err` instead. Otherwise, returns
409 /// a guard (not a `Future`).
410 ///
411 /// # Examples
412 /// ```
413 /// # use futures_locks::*;
414 /// # fn main() {
415 /// let mut lock = RwLock::<u32>::new(5);
416 /// match lock.try_write() {
417 /// Ok(mut guard) => *guard += 5,
418 /// Err(_) => panic!("Better luck next time!")
419 /// }
420 /// assert_eq!(10, lock.try_unwrap().unwrap());
421 /// # }
422 /// ```
423 pub fn try_write(&self) -> Result<RwLockWriteGuard<T>, TryLockError> {
424 let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
425 if lock_data.exclusive || lock_data.num_readers > 0 {
426 Err(TryLockError)
427 } else {
428 lock_data.exclusive = true;
429 Ok(RwLockWriteGuard{rwlock: self.clone()})
430 }
431 }
432
433 /// Release a shared lock of an `RwLock`.
434 fn unlock_reader(&self) {
435 let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
436 assert!(lock_data.num_readers > 0);
437 assert!(!lock_data.exclusive);
438 assert_eq!(lock_data.read_waiters.len(), 0);
439 lock_data.num_readers -= 1;
440 if lock_data.num_readers == 0 {
441 while let Some(tx) = lock_data.write_waiters.pop_front() {
442 if tx.send(()).is_ok() {
443 lock_data.exclusive = true;
444 return
445 }
446 }
447 }
448 }
449
450 /// Release an exclusive lock of an `RwLock`.
451 fn unlock_writer(&self) {
452 let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
453 assert!(lock_data.num_readers == 0);
454 assert!(lock_data.exclusive);
455
456 // First try to wake up any writers
457 while let Some(tx) = lock_data.write_waiters.pop_front() {
458 if tx.send(()).is_ok() {
459 return;
460 }
461 }
462
463 // If there are no writers, try to wake up readers
464 lock_data.exclusive = false;
465 lock_data.num_readers += lock_data.read_waiters.len() as u32;
466 for tx in lock_data.read_waiters.drain(..) {
467 // Ignore errors, which are due to a reader's future getting
468 // dropped before it was ready
469 let _ = tx.send(());
470 }
471 }
472}
473
474impl<T: 'static + ?Sized> RwLock<T> {
475 /// Acquires a `RwLock` nonexclusively and performs a computation on its
476 /// guarded value in a separate task. Returns a `Future` containing the
477 /// result of the computation.
478 ///
479 /// When using Tokio, this method will often hold the `RwLock` for less time
480 /// than chaining a computation to [`read`](#method.read). The reason is
481 /// that Tokio polls all tasks promptly upon notification. However, Tokio
482 /// does not guarantee that it will poll all futures promptly when their
483 /// owning task gets notified. So it's best to hold `RwLock`s within their
484 /// own tasks, lest their continuations get blocked by slow stacked
485 /// combinators.
486 ///
487 /// # Examples
488 ///
489 /// ```
490 /// # use futures_locks::*;
491 /// # use futures::{Future, future::ready};
492 /// # use tokio::runtime::Runtime;
493 /// # fn main() {
494 /// let rwlock = RwLock::<u32>::new(5);
495 /// let mut rt = Runtime::new().unwrap();
496 /// let r = rt.block_on(async {
497 /// rwlock.with_read(|mut guard| {
498 /// ready(*guard)
499 /// }).await
500 /// });
501 /// assert_eq!(r, 5);
502 /// # }
503 /// ```
504 #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
505 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
506 pub fn with_read<B, F, R>(&self, f: F)
507 -> impl Future<Output = R>
508 where F: FnOnce(RwLockReadGuard<T>) -> B + Send + 'static,
509 B: Future<Output = R> + Send + 'static,
510 R: Send + 'static,
511 T: Send
512 {
513 let jh = tokio::spawn({
514 let fut = self.read();
515 async move { f(fut.await).await }
516 });
517
518 async move { jh.await.unwrap() }
519 }
520
521 /// Like [`with_read`](#method.with_read) but for Futures that aren't
522 /// `Send`. Spawns a new task on a single-threaded Runtime to complete the
523 /// Future.
524 ///
525 /// # Examples
526 ///
527 /// ```
528 /// # use futures_locks::*;
529 /// # use futures::{Future, future::ready};
530 /// # use std::rc::Rc;
531 /// # use tokio::runtime::Runtime;
532 /// # fn main() {
533 /// // Note: Rc is not `Send`
534 /// let rwlock = RwLock::<Rc<u32>>::new(Rc::new(5));
535 /// let mut rt = Runtime::new().unwrap();
536 /// let r = rt.block_on(async {
537 /// rwlock.with_read_local(|mut guard| {
538 /// ready(**guard)
539 /// }).await
540 /// });
541 /// assert_eq!(r, 5);
542 /// # }
543 /// ```
544 #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
545 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
546 pub fn with_read_local<B, F, R>(&self, f: F)
547 -> impl Future<Output = R>
548 where F: FnOnce(RwLockReadGuard<T>) -> B + 'static + Unpin,
549 B: Future<Output = R> + 'static,
550 R: 'static
551 {
552 let local = task::LocalSet::new();
553 let jh = local.spawn_local({
554 let fut = self.read();
555 async move { f(fut.await).await }
556 });
557
558 async move {
559 local.await;
560 jh.await.unwrap()
561 }
562 }
563
564 /// Acquires a `RwLock` exclusively and performs a computation on its
565 /// guarded value in a separate task. Returns a `Future` containing the
566 /// result of the computation.
567 ///
568 /// When using Tokio, this method will often hold the `RwLock` for less time
569 /// than chaining a computation to [`write`](#method.write). The reason is
570 /// that Tokio polls all tasks promptly upon notification. However, Tokio
571 /// does not guarantee that it will poll all futures promptly when their
572 /// owning task gets notified. So it's best to hold `RwLock`s within their
573 /// own tasks, lest their continuations get blocked by slow stacked
574 /// combinators.
575 ///
576 /// # Examples
577 ///
578 /// ```
579 /// # use futures::{Future, future::ready};
580 /// # use futures_locks::*;
581 /// # use tokio::runtime::Runtime;
582 /// # fn main() {
583 /// let rwlock = RwLock::<u32>::new(0);
584 /// let mut rt = Runtime::new().unwrap();
585 /// let r = rt.block_on(async {
586 /// rwlock.with_write(|mut guard| {
587 /// *guard += 5;
588 /// ready(())
589 /// }).await
590 /// });
591 /// assert_eq!(rwlock.try_unwrap().unwrap(), 5);
592 /// # }
593 /// ```
594 #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
595 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
596 pub fn with_write<B, F, R>(&self, f: F)
597 -> impl Future<Output = R>
598 where F: FnOnce(RwLockWriteGuard<T>) -> B + Send + 'static,
599 B: Future<Output = R> + Send + 'static,
600 R: Send + 'static,
601 T: Send
602 {
603 let jh = tokio::spawn({
604 let fut = self.write();
605 async move { f(fut.await).await }
606 });
607
608 async move { jh.await.unwrap() }
609 }
610
611 /// Like [`with_write`](#method.with_write) but for Futures that aren't
612 /// `Send`. Spawns a new task on a single-threaded Runtime to complete the
613 /// Future.
614 ///
615 /// # Examples
616 ///
617 /// ```
618 /// # use futures::{Future, future::ready};
619 /// # use futures_locks::*;
620 /// # use std::rc::Rc;
621 /// # use tokio::runtime::Runtime;
622 /// # fn main() {
623 /// // Note: Rc is not `Send`
624 /// let rwlock = RwLock::<Rc<u32>>::new(Rc::new(0));
625 /// let mut rt = Runtime::new().unwrap();
626 /// let r = rt.block_on(async {
627 /// rwlock.with_write_local(|mut guard| {
628 /// *Rc::get_mut(&mut *guard).unwrap() += 5;
629 /// ready(())
630 /// }).await
631 /// });
632 /// assert_eq!(*rwlock.try_unwrap().unwrap(), 5);
633 /// # }
634 /// ```
635 #[cfg(any(feature = "tokio", all(docsrs, rustdoc)))]
636 #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
637 pub fn with_write_local<B, F, R>(&self, f: F)
638 -> impl Future<Output = R>
639 where F: FnOnce(RwLockWriteGuard<T>) -> B + 'static + Unpin,
640 B: Future<Output = R> + 'static,
641 R: 'static
642 {
643 let local = task::LocalSet::new();
644 let jh = local.spawn_local({
645 let fut = self.write();
646 async move { f(fut.await).await }
647 });
648
649 async move {
650 local.await;
651 jh.await.unwrap()
652 }
653 }
654}
655
656// Clippy doesn't like the Arc within Inner. But the access rules of the RwLock
657// make it safe to send. std::sync::RwLock has the same Send impl
658#[allow(clippy::non_send_fields_in_send_ty)]
659unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
660unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
661
662// LCOV_EXCL_START
#[cfg(test)]
mod t {
    use super::*;

    /// Pet Kcov: exercise the derived `Debug` implementation.
    #[test]
    fn debug() {
        let m = RwLock::<u32>::new(0);
        let _rendered = format!("{:?}", &m);
    }

    /// A default-constructed lock wraps the default value of its payload.
    #[test]
    fn test_default() {
        let lock: RwLock<u32> = RwLock::default();
        let value = lock.try_unwrap().unwrap();

        assert_eq!(u32::default(), value);
    }
}
682}
683// LCOV_EXCL_STOP