1use futures::{
14 future::{
15 Future,
16 Shared,
17 },
18 FutureExt,
19 lock,
20};
21use std::{
22 fmt,
23 ops::{
24 Deref,
25 DerefMut,
26 },
27 pin::Pin,
28 sync::{
29 Arc,
30 atomic::{
31 AtomicUsize,
32 Ordering,
33 },
34 RwLock,
35 RwLockReadGuard,
36 RwLockWriteGuard,
37 Weak,
38 },
39 task::{
40 Context,
41 Poll,
42 Waker,
43 },
44};
45
46pub struct FutRwLock<T: ?Sized> {
48 inner: Arc<RwLock<T>>,
49 reader_locks: Arc<AtomicUsize>,
50 waker: Arc<RwLock<Option<Waker>>>,
51 writer_awaiting_reader_locks_future: Arc<
52 lock::Mutex<Option<Shared<WriterAwaitingReaderLocksFuture>>>
53 >,
54 writer_lock: Arc<lock::Mutex<()>>,
55}
56
57impl<T> FutRwLock<T> {
58 pub fn new(t: T) -> FutRwLock<T> {
60 FutRwLock{
61 inner: Arc::new(RwLock::new(t)),
62 reader_locks: Arc::new(AtomicUsize::new(0usize)),
63 waker: Arc::new(RwLock::new(None)),
64 writer_awaiting_reader_locks_future: Arc::new(lock::Mutex::new(None)),
65 writer_lock: Arc::new(lock::Mutex::new(())),
66 }
67 }
68}
69
70impl <T> From<T> for FutRwLock<T> {
71 fn from(t: T) -> FutRwLock<T> {
72 FutRwLock::new(t)
73 }
74}
75
76impl <T: Default> Default for FutRwLock<T> {
77 fn default() -> FutRwLock<T> {
78 FutRwLock::new(Default::default())
79 }
80}
81
82impl <T: ?Sized + fmt::Debug> fmt::Debug for FutRwLock<T> {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f
85 .debug_struct("FutRwLock")
86 .field("inner", &self.inner)
87 .finish()
88 }
89}
90
91impl <T: ?Sized> FutRwLock<T> {
92 pub fn is_poisoned(&self) -> bool {
94 self.inner.is_poisoned()
95 }
96
97 pub async fn read(&self) -> FutRwLockReadGuard<'_, T> {
102 let _writer_lock = self.writer_lock.lock().await;
104 self.read_lock_increment();
106 FutRwLockReadGuard::new(
108 self,
109 self.inner
110 .read()
111 .unwrap_or_else(|poisoned| poisoned.into_inner()),
112 )
113 }
114
115 fn read_lock_decrement(&self) {
117 let _post_op_reader_lock_count = self
119 .reader_locks
120 .fetch_sub(1usize, Ordering::SeqCst);
121 let _ = self
126 .waker
127 .read()
128 .map(|waker_unlock_result|
129 waker_unlock_result
130 .as_ref()
131 .map(|waker|{
132 waker.wake_by_ref()
134 })
135 );
136 }
137
138 fn read_lock_increment(&self) {
140 let _ = self
141 .reader_locks
142 .fetch_add(1usize, Ordering::SeqCst);
143 }
144
145 pub fn try_read_now(&self) -> Option<FutRwLockReadGuard<'_, T>> {
147 self
148 .writer_lock
149 .try_lock()
150 .map(|_writer_lock_guard| {
151 self.read_lock_increment();
153 FutRwLockReadGuard::new(
155 self,
156 self.inner
157 .read()
158 .unwrap_or_else(|poisoned| poisoned.into_inner()),
159 )
160 })
161 }
162
163 pub fn try_write_now(&self) -> Option<FutRwLockWriteGuard<'_, T>> {
165 if let Some (writer_lock) = self
166 .writer_lock
167 .try_lock()
168 {
169 if self.reader_locks.load(Ordering::SeqCst) == 0 {
171 Some(FutRwLockWriteGuard::new(
173 self,
174 self.inner
175 .write()
176 .unwrap_or_else(|poisoned| poisoned.into_inner()),
177 writer_lock ))
179 } else {
180 None
182 }
183 } else {
184 None
186 }
187 }
188
189 pub async fn write(&self) -> FutRwLockWriteGuard<'_, T> {
195 let writer_lock = self.writer_lock.lock().await;
197 if self.reader_locks.load(Ordering::SeqCst) > 0 {
199 let new_writer_awaiting_reader_locks_future = WriterAwaitingReaderLocksFuture{
201 reader_locks: Arc::downgrade(&self.reader_locks),
202 waker: Arc::downgrade(&self.waker),
203 };
204 let shared_future = new_writer_awaiting_reader_locks_future.shared();
205 self.writer_awaiting_reader_locks_future
208 .lock()
209 .await
210 .replace(
211 shared_future.clone()
212 );
213 let _ = shared_future.await;
215 *self.writer_awaiting_reader_locks_future.lock().await = None;
217 }
218 FutRwLockWriteGuard::new(
220 self,
221 self.inner
222 .write()
223 .unwrap_or_else(|poisoned| poisoned.into_inner()),
224 writer_lock )
226 }
227}
228
229pub struct FutRwLockReadGuard<'a, T: ?Sized + 'a> {
231 async_rwlock: &'a FutRwLock<T>,
232 inner_read_guard: RwLockReadGuard<'a, T>,
233 }
235
236impl <T: ?Sized> Deref for FutRwLockReadGuard <'_, T> {
237 type Target = T;
238 fn deref(&self) -> &Self::Target {
239 self.inner_read_guard.deref()
241 }
242}
243
244impl <'a, T: ?Sized + 'a> Drop for FutRwLockReadGuard <'a, T>{
245 fn drop(&mut self) {
246 self
248 .async_rwlock
249 .read_lock_decrement();
250 }
251}
252
253impl <'a, T: 'a + ?Sized > FutRwLockReadGuard <'a, T> {
254 fn new(
256 async_rwlock: &'a FutRwLock<T>,
257 inner_read_guard: RwLockReadGuard<'a, T>,
258 ) -> FutRwLockReadGuard<'a, T> {
260 FutRwLockReadGuard {
261 async_rwlock,
262 inner_read_guard,
263 }
265 }
266}
267
268#[allow(dead_code)]
270pub struct FutRwLockWriteGuard<'a, T: ?Sized + 'a> {
271 async_rwlock: &'a FutRwLock<T>,
272 inner_write_guard: RwLockWriteGuard<'a, T>,
273 writer_lock: lock::MutexGuard<'a, ()>,
274}
275
276impl <'a, T: 'a + ?Sized > FutRwLockWriteGuard <'a, T> {
277 fn new(
278 async_rwlock: &'a FutRwLock<T>,
279 inner_write_guard: RwLockWriteGuard<'a, T>,
280 writer_lock: lock::MutexGuard<'a, ()>,
281 ) -> FutRwLockWriteGuard<'a, T> {
282 FutRwLockWriteGuard {
283 async_rwlock,
284 inner_write_guard,
285 writer_lock,
286 }
287 }
288}
289
290impl <'a, T:'a + ?Sized> Deref for FutRwLockWriteGuard <'a, T> {
291 type Target = T;
292 fn deref(&self) -> &Self::Target {
293 self.inner_write_guard.deref()
295 }
296}
297
298impl <'a, T:'a + ?Sized> DerefMut for FutRwLockWriteGuard <'a, T> {
299 fn deref_mut(&mut self) -> &mut Self::Target {
300 self.inner_write_guard.deref_mut()
302 }
303}
304
305impl <T: fmt::Debug> fmt::Debug for FutRwLockWriteGuard<'_, T> {
306 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307 f
308 .debug_struct("FutRwLockWriteGuard")
309 .field("async_rwlock", &self.async_rwlock)
310 .field("inner_write_guard", &self.inner_write_guard)
311 .finish()
312 }
313}
314
315impl <T: ?Sized + fmt::Display> fmt::Display for FutRwLockWriteGuard<'_, T> {
316 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
317 (*self.inner_write_guard).fmt(f)
318 }
319}
320
321struct WriterAwaitingReaderLocksFuture {
323 reader_locks: Weak<AtomicUsize>,
324 waker: Weak<RwLock<Option<Waker>>>,
325}
326
327impl Future for WriterAwaitingReaderLocksFuture {
328 type Output = Result<(), ()>;
329 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
330 if let Some(reader_locks_atomicusize) = self.reader_locks.upgrade() {
332 if let Some(waker_rwlock) = self.waker.upgrade() {
333 if reader_locks_atomicusize.load(Ordering::SeqCst) > 0 {
335 waker_rwlock
337 .write()
338 .unwrap_or_else(|poisoned| poisoned.into_inner())
339 .replace(cx.waker().clone());
340 Poll::Pending
342 } else {
343 let _ = waker_rwlock
346 .write()
347 .unwrap_or_else(|poisoned| poisoned.into_inner())
348 .take();
349 Poll::Ready(Ok(()))
351 }
352 } else {
353 Poll::Ready(Err(()))
356 }
358 } else {
359 Poll::Ready(Err(()))
362 }
363 }
364}
365
366
367
368
369
370#[cfg(test)]
371mod tests {
372 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
373
374 use super::*;
375 use futures::{
377 join,
378 future::{
379 join_all,
380 },
381 };
382
383 use rand::{RngCore,SeedableRng};
384 use rand_chacha::ChaChaRng;
385
386 use wasm_bindgen_test::*;
387 use instant::{Instant, Duration};
390
391
392 async fn get_some_reads_then_wait(
395 rwlock: &FutRwLock<()>,
396 wait_ns: u64
397 ) -> Instant {
398 let _read_1 = rwlock.read().await;
399 let _read_2 = rwlock.read().await;
400 let reads_acquired_instant = Instant::now();
401 sleep(wait_ns).await;
402 reads_acquired_instant
403 }
404
405 async fn get_write_then_wait (
408 rwlock: &FutRwLock<()>,
409 wait_ns: u64,
410 ) -> Instant {
411 let _write_0 = rwlock.write().await;
412 let write_acquired_instant = Instant::now();
413 sleep(wait_ns).await;
414 write_acquired_instant
415 }
416
417 async fn get_write (
420 rwlock: &FutRwLock<()>,
421 ) -> Instant {
422 let _write_0 = rwlock.write().await;
423 let write_acquired_instant = Instant::now();
424 write_acquired_instant
425 }
426
427 async fn get_some_reads(
430 rwlock: &FutRwLock<()>,
431 ) -> Instant {
432 let _read_1 = rwlock.read().await;
433 let _read_2 = rwlock.read().await;
434 let reads_acquired_instant = Instant::now();
435
436 reads_acquired_instant
437 }
438
439 async fn sleep(t: u64) {
441 let msg: String = format!("{}", t as i32);
442 if cfg!(target_arch = "wasm32") {
443 let fuff = js_function_promisify::Callback::new(move || {
444 web_sys::console::warn_1(js_sys::JsString::from(msg.clone()).as_ref());
445 Ok("".into())
446 });
447 let window = web_sys::window().expect("Must get window to sleep");
448 let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(
449 fuff.as_function().as_ref(), t as i32,
450 ).unwrap();
451 let _ = fuff.await;
452 } else {
453 #[cfg(not(target_arch = "wasm32"))]
454 let _ = tokio::time::sleep(Duration::from_millis(t)).await;
455 std::println!("{}", msg );
456 }
457 }
458
459 #[cfg(not(target_arch = "wasm32"))]
461 macro_rules! run_test {
462 ($f:ident) => {
463 {
464 tokio_test::block_on($f())
465 }
466 };
467 }
468 #[cfg(target_arch = "wasm32")]
469 macro_rules! run_test {
470 ($f:ident) => {
471 {
472 wasm_bindgen_futures::spawn_local($f())
473 }
474 };
475 }
476
477 #[test]
478 #[wasm_bindgen_test]
479 fn write_mutates_inner_value() {
480 let async_test = move || async {
481 let p: FutRwLock<Option<u8>> = FutRwLock::new(None);
482 let mut a = p.write().await;
483 *a = Some(16);
484 assert_eq!(*a, Some(16));
485 };
486 run_test!(async_test)
487 }
488
489 #[test]
490 #[wasm_bindgen_test]
491 fn write_awaits_reads() {
492 let async_test = move || async {
493 let p: FutRwLock<()> = FutRwLock::new(());
494 let target_wait_ns = 10u64;
495 let (reads_acquired_instant, write_acquired_instant) = join!(
496 get_some_reads_then_wait(&p, target_wait_ns),
497 get_write(&p),
498 );
499 assert!(
500 write_acquired_instant >= reads_acquired_instant,
501 "Writes acquired after reads done"
502 );
503 let read_to_write_duration = write_acquired_instant
504 .duration_since(reads_acquired_instant);
505
506 assert!(
507 read_to_write_duration >= Duration::from_nanos(target_wait_ns),
508 "Writes acquired after reads done by duration magnitude"
509 );
510 };
511 run_test!(async_test)
512 }
513
514 #[test]
515 #[wasm_bindgen_test]
516 fn write_order () {
517 let async_test = move || async {
518 type TimeTy = u64;
519 type ValTy = i8;
520 type TestTy = Vec<ValTy>;
521 type FutrwTy = FutRwLock<TestTy>;
522
523 const TEST_DEPTH:usize = 32;
524 const TEST_NS_STEP:TimeTy = 100;
525
526 let p: FutrwTy = FutRwLock::new(Vec::with_capacity(TEST_DEPTH));
527 {
528 let mut rng = ChaChaRng::from_entropy();
529 type Spec = (
530 TimeTy, usize, ValTy );
534 async fn sleep_then_write_push (
537 rwlock: &FutrwTy,
538 ns: TimeTy,
539 val: ValTy,
540 ) -> Instant {
541 sleep(ns).await;
542 let write_acquire_attempted_instant = Instant::now();
543 let mut w = rwlock.write().await;
544 (*w).push(val.clone());
545 write_acquire_attempted_instant
546 }
547
548 let mut specs: Vec<Spec> = Vec::with_capacity(TEST_DEPTH);
549 for i in 0..TEST_DEPTH {
550 specs.push((
551 (TEST_DEPTH - i) as TimeTy * TEST_NS_STEP, i, rng.next_u32() as ValTy, ));
555 }
556
557 let (futs, target) = specs.iter().fold(
558 (vec![], vec![]),
559 |(mut fs, mut ts), (ns, ind, val)| {
560 fs.push(sleep_then_write_push(&p, *ns, *val));
561 ts.push((ns, ind, val));
562 (fs, ts)
563 }
564 );
565 let target_vals : Vec<ValTy> = target.iter().map(|(_,_,v)| **v).collect();
567 let write_acquired_attempt_instants: Vec<Instant> = join_all(futs).await;
568 let mut targets_by_acquisition_attempt_instant: Vec<(Instant, ValTy)> = (0..TEST_DEPTH)
569 .map(|i| (write_acquired_attempt_instants[i], target_vals[i]) ).collect();
570 targets_by_acquisition_attempt_instant.sort_by(|(i,_), (j,_)| i.partial_cmp(j).unwrap());
571 let target_vals : Vec<ValTy> = targets_by_acquisition_attempt_instant.iter().map(|(_,v)| *v).collect();
572 let result = p.read().await;
573 assert_eq!(
574 *result,
575 target_vals,
576 "Write results in a buffer must match acquire attempt order: \n{:#?}",
577 specs
578 )
579 }
580 };
581 run_test!(async_test)
582 }
583
584 #[test]
585 #[wasm_bindgen_test]
586 fn write_then_drop_then_read() {
587 let async_test = move || async {
588 let p: FutRwLock<Option<u8>> = FutRwLock::new(None);
589 {
590 let mut a = p.write().await;
591 *a = Some(16);
592 drop(a);
593 let b = *p.read().await;
594 assert_eq!(b, Some(16), "read following write 1 must see new value");
595 }
596 let mut a = p.write().await;
597 *a = Some(144);
598 drop(a);
599 let b = *p.read().await;
600 assert_eq!(b, Some(144), "read following write 2 must see new value");
601 };
602 run_test!(async_test)
603 }
604
605 #[test]
606 #[wasm_bindgen_test]
607 fn write_prevents_try_write_now_and_try_read_now() {
608 let async_test = move || async {
609 let p: FutRwLock<Option<u8>> = FutRwLock::new(None);
610 let _a = p.write().await;
611 assert!(p.try_write_now().is_none(), "try_write_now returns None when write lock active");
612 assert!(p.try_read_now().is_none(), "try_read_now returns None when write lock active");
613 assert!(p.try_write_now().is_none(), "try_write_now returns None when write lock active");
614 assert!(p.try_read_now().is_none(), "try_read_now returns None when write lock active");
615 drop(_a);
616 assert!(p.try_write_now().is_some(), "try_write_now returns Some when write lock active");
617 assert!(p.try_read_now().is_some(), "try_read_now returns Some when write lock active");
618 };
619 run_test!(async_test)
620 }
621
622 #[test]
623 #[wasm_bindgen_test]
624 fn reads_await_write() {
625 let async_test = move || async {
626 let p: FutRwLock<()> = FutRwLock::new(());
627 let target_wait_ns = 10u64;
628 let (write_acquired_instant, reads_acquired_instant) = join!(
629 get_write_then_wait(&p, target_wait_ns),
630 get_some_reads(&p),
631 );
632 assert!(
633 reads_acquired_instant >= write_acquired_instant,
634 "Read acquired after write done"
635 );
636 let write_to_read_duration = reads_acquired_instant
637 .duration_since(write_acquired_instant);
638
639 assert!(
640 write_to_read_duration >= Duration::from_nanos(target_wait_ns),
641 "Reads acquired after write done by duration magnitude"
642 );
643 };
644 run_test!(async_test)
645 }
646
647
648 #[test]
649 #[wasm_bindgen_test]
650 fn read_one() {
651 let async_test = move || async {
652 let p: FutRwLock<Option<u8>> = FutRwLock::new(Some(22));
653 let a = p.read().await;
654 assert_eq!(*a, Some(22));
655 };
656 run_test!(async_test)
657 }
658
659 #[test]
660 #[wasm_bindgen_test]
661 fn read_multiple() {
662 let async_test = move || async {
663 let p: FutRwLock<Option<u8>> = FutRwLock::new(Some(22));
664 let a = p.read().await;
665 let a_0 = p.read().await;
666 assert_eq!(*a, Some(22));
667 assert_eq!(*a_0, Some(22));
668 };
669 run_test!(async_test)
670 }
671
672 #[test]
673 #[wasm_bindgen_test]
674 fn read_prevents_try_write_now_and_allows_try_read_now() {
675 let async_test = move || async {
676 let p: FutRwLock<u16> = FutRwLock::new(1622);
677 let read_0 = p.read().await;
678 let read_1_opt = p.try_read_now();
679 assert!(p.try_write_now().is_none(), "try_write_now returns None when read lock active");
680 assert!(read_1_opt.is_some(), "try_read_now returns Some when read lock active");
681 assert_eq!(*read_0, *read_1_opt.unwrap(), "Read and try read now point to same");
682 };
683 run_test!(async_test)
684 }
685
686 }