tokio01_test/
task.rs

1//! Futures task based helpers
2//!
3//! # Example
4//!
5//! This example will use the `MockTask` to set the current task on
6//! poll.
7//!
8//! ```
9//! # #[macro_use] extern crate tokio01_test;
10//! # extern crate futures;
11//! # use tokio01_test::task::MockTask;
12//! # use futures::{sync::mpsc, Stream, Sink, Future, Async};
13//! let mut task = MockTask::new();
14//! let (tx, mut rx) = mpsc::channel(5);
15//!
16//! tx.send(()).wait();
17//!
18//! assert_ready_eq!(task.enter(|| rx.poll()), Some(()));
19//! ```
20
21use futures::executor::{spawn, Notify};
22use futures::{future, Async};
23
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::sync::{Arc, Condvar, Mutex};
26
27/// Mock task
28///
29/// A mock task is able to intercept and track notifications.
30#[derive(Debug)]
31pub struct MockTask {
32    notify: Arc<ThreadNotify>,
33}
34
35#[derive(Debug)]
36struct ThreadNotify {
37    state: AtomicUsize,
38    mutex: Mutex<()>,
39    condvar: Condvar,
40}
41
42const IDLE: usize = 0;
43const NOTIFY: usize = 1;
44const SLEEP: usize = 2;
45
46impl MockTask {
47    /// Create a new mock task
48    pub fn new() -> Self {
49        MockTask {
50            notify: Arc::new(ThreadNotify::new()),
51        }
52    }
53
54    /// Run a closure from the context of the task.
55    ///
56    /// Any notifications resulting from the execution of the closure are
57    /// tracked.
58    pub fn enter<F, R>(&mut self, f: F) -> R
59    where
60        F: FnOnce() -> R,
61    {
62        self.notify.clear();
63
64        let res = spawn(future::lazy(|| Ok::<_, ()>(f()))).poll_future_notify(&self.notify, 0);
65
66        match res.unwrap() {
67            Async::Ready(v) => v,
68            _ => unreachable!(),
69        }
70    }
71
72    /// Returns `true` if the inner future has received a readiness notification
73    /// since the last call to `enter`.
74    pub fn is_notified(&self) -> bool {
75        self.notify.is_notified()
76    }
77
78    /// Returns the number of references to the task notifier
79    ///
80    /// The task itself holds a reference. The return value will never be zero.
81    pub fn notifier_ref_count(&self) -> usize {
82        Arc::strong_count(&self.notify)
83    }
84}
85
86impl ThreadNotify {
87    fn new() -> Self {
88        ThreadNotify {
89            state: AtomicUsize::new(IDLE),
90            mutex: Mutex::new(()),
91            condvar: Condvar::new(),
92        }
93    }
94
95    /// Clears any previously received notify, avoiding potential spurrious
96    /// notifications. This should only be called immediately before running the
97    /// task.
98    fn clear(&self) {
99        self.state.store(IDLE, Ordering::SeqCst);
100    }
101
102    fn is_notified(&self) -> bool {
103        match self.state.load(Ordering::SeqCst) {
104            IDLE => false,
105            NOTIFY => true,
106            _ => unreachable!(),
107        }
108    }
109}
110
111impl Notify for ThreadNotify {
112    fn notify(&self, _unpark_id: usize) {
113        // First, try transitioning from IDLE -> NOTIFY, this does not require a
114        // lock.
115        match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
116            IDLE | NOTIFY => return,
117            SLEEP => {}
118            _ => unreachable!(),
119        }
120
121        // The other half is sleeping, this requires a lock
122        let _m = self.mutex.lock().unwrap();
123
124        // Transition from SLEEP -> NOTIFY
125        match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
126            SLEEP => {}
127            _ => return,
128        }
129
130        // Wakeup the sleeper
131        self.condvar.notify_one();
132    }
133}