tokio01_test/task.rs
//! Futures task based helpers
//!
//! # Example
//!
//! This example will use the `MockTask` to set the current task on
//! poll.
//!
//! ```
//! # #[macro_use] extern crate tokio01_test;
//! # extern crate futures;
//! # use tokio01_test::task::MockTask;
//! # use futures::{sync::mpsc, Stream, Sink, Future, Async};
//! let mut task = MockTask::new();
//! let (tx, mut rx) = mpsc::channel(5);
//!
//! tx.send(()).wait();
//!
//! assert_ready_eq!(task.enter(|| rx.poll()), Some(()));
//! ```
20
21use futures::executor::{spawn, Notify};
22use futures::{future, Async};
23
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::sync::{Arc, Condvar, Mutex};
26
27/// Mock task
28///
29/// A mock task is able to intercept and track notifications.
30#[derive(Debug)]
31pub struct MockTask {
32 notify: Arc<ThreadNotify>,
33}
34
/// Notification state backing a `MockTask`.
#[derive(Debug)]
struct ThreadNotify {
    /// Current state: one of the `IDLE`, `NOTIFY` or `SLEEP` constants.
    state: AtomicUsize,
    /// Locked by `notify` before it attempts the `SLEEP -> NOTIFY` transition.
    mutex: Mutex<()>,
    /// Signalled by `notify` after a successful `SLEEP -> NOTIFY` transition.
    condvar: Condvar,
}
41
/// No notification has been recorded since the state was last cleared.
const IDLE: usize = 0;
/// A notification has been recorded.
const NOTIFY: usize = 1;
/// Marks a sleeping waiter that `notify` must wake through the condvar.
///
/// NOTE(review): nothing in this file ever stores `SLEEP`; it is only
/// matched on inside `notify`. Presumably kept from a blocking-park
/// implementation -- confirm before removing.
const SLEEP: usize = 2;
45
46impl MockTask {
47 /// Create a new mock task
48 pub fn new() -> Self {
49 MockTask {
50 notify: Arc::new(ThreadNotify::new()),
51 }
52 }
53
54 /// Run a closure from the context of the task.
55 ///
56 /// Any notifications resulting from the execution of the closure are
57 /// tracked.
58 pub fn enter<F, R>(&mut self, f: F) -> R
59 where
60 F: FnOnce() -> R,
61 {
62 self.notify.clear();
63
64 let res = spawn(future::lazy(|| Ok::<_, ()>(f()))).poll_future_notify(&self.notify, 0);
65
66 match res.unwrap() {
67 Async::Ready(v) => v,
68 _ => unreachable!(),
69 }
70 }
71
72 /// Returns `true` if the inner future has received a readiness notification
73 /// since the last call to `enter`.
74 pub fn is_notified(&self) -> bool {
75 self.notify.is_notified()
76 }
77
78 /// Returns the number of references to the task notifier
79 ///
80 /// The task itself holds a reference. The return value will never be zero.
81 pub fn notifier_ref_count(&self) -> usize {
82 Arc::strong_count(&self.notify)
83 }
84}
85
86impl ThreadNotify {
87 fn new() -> Self {
88 ThreadNotify {
89 state: AtomicUsize::new(IDLE),
90 mutex: Mutex::new(()),
91 condvar: Condvar::new(),
92 }
93 }
94
95 /// Clears any previously received notify, avoiding potential spurrious
96 /// notifications. This should only be called immediately before running the
97 /// task.
98 fn clear(&self) {
99 self.state.store(IDLE, Ordering::SeqCst);
100 }
101
102 fn is_notified(&self) -> bool {
103 match self.state.load(Ordering::SeqCst) {
104 IDLE => false,
105 NOTIFY => true,
106 _ => unreachable!(),
107 }
108 }
109}
110
111impl Notify for ThreadNotify {
112 fn notify(&self, _unpark_id: usize) {
113 // First, try transitioning from IDLE -> NOTIFY, this does not require a
114 // lock.
115 match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
116 IDLE | NOTIFY => return,
117 SLEEP => {}
118 _ => unreachable!(),
119 }
120
121 // The other half is sleeping, this requires a lock
122 let _m = self.mutex.lock().unwrap();
123
124 // Transition from SLEEP -> NOTIFY
125 match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
126 SLEEP => {}
127 _ => return,
128 }
129
130 // Wakeup the sleeper
131 self.condvar.notify_one();
132 }
133}