relay/
lib.rs

1//! # relay
2//!
3//! A light-weight channel using `Future`. A relay channel does not implement
4//! `Send`, and so is not meant for synchronizing between threads. Instead,
5//! its used to send message between tasks that live in the same thread.
6//!
7//! It is similar to the `oneshot` channel in the `futures` crate, but since
8//! it is not meant for sending across threads, it performs about twice as
9//! fast.
10//!
11//! ## Example
12//!
13//! ```rust
14//! # extern crate futures;
15//! # extern crate relay;
16//! # use futures::Future;
17//! # fn main() {
18//! let (tx, rx) = relay::channel();
19//! tx.complete("foo");
20//! assert_eq!(rx.wait().unwrap(), "foo");
21//! # }
22//! ```
23#![deny(warnings)]
24#![deny(missing_docs)]
25#![deny(missing_debug_implementations)]
26extern crate futures;
27
28use std::cell::RefCell;
29use std::fmt;
30use std::rc::Rc;
31
32use futures::{Future, Poll, Async};
33use futures::task::{self, Task};
34
35/// Create a new channel to send a message.
36pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
37    let inner = Rc::new(RefCell::new(Inner {
38        value: None,
39        complete: false,
40        tx_task: None,
41        rx_task: None,
42    }));
43    let tx = Sender {
44        inner: inner.clone(),
45    };
46    let rx = Receiver {
47        inner: inner,
48    };
49    (tx, rx)
50}
51
52/// The Sender portion of a channel.
53pub struct Sender<T> {
54    inner: Rc<RefCell<Inner<T>>>,
55}
56
57impl<T> Sender<T> {
58    /// Sends the message to the `Receiver`.
59    pub fn complete(self, val: T) {
60        let mut borrow = self.inner.borrow_mut();
61        borrow.value = Some(val);
62    }
63
64    /// Returns true if the `Receiver` has been dropped.
65    pub fn is_canceled(&self) -> bool {
66        self.inner.borrow().complete
67    }
68
69    /// Creates a `Future` that waits until someone is waiting on the `Receiver`.
70    pub fn waiting(self) -> Waiting<T> {
71        Waiting {
72            tx: Some(self),
73        }
74    }
75}
76
77impl<T> Drop for Sender<T> {
78    fn drop(&mut self) {
79        let rx_task = {
80            let mut borrow = self.inner.borrow_mut();
81            borrow.complete = true;
82            borrow.tx_task.take();
83            borrow.rx_task.take()
84        };
85        if let Some(task) = rx_task {
86            task.notify();
87        }
88    }
89}
90
91impl<T> fmt::Debug for Sender<T> {
92    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93        f.pad("Sender")
94    }
95}
96
97/// The receiver end of the channel.
98///
99/// The Receiver is a `Future` that resolves to the sent message.
100pub struct Receiver<T> {
101    inner: Rc<RefCell<Inner<T>>>,
102}
103
104impl<T> Receiver<T> {
105    /// Returns true if the `Sender` was dropped without sending a message.
106    pub fn is_canceled(&self) -> bool {
107        let borrow = self.inner.borrow();
108        borrow.complete && borrow.value.is_none()
109    }
110
111    /// Tries to receive the value if completed.
112    ///
113    /// - Ok(Some(T)): Success.
114    /// - Ok(None): Not ready yet.
115    /// - Err(Canceled): The sender dropped without sending a value.
116    ///
117    /// It is safe to call this when not in a futures task context.
118    pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
119        self.recv_inner(false)
120    }
121
122    fn recv_inner(&mut self, should_park: bool) -> Result<Option<T>, Canceled> {
123        let mut borrow = self.inner.borrow_mut();
124        if let Some(val) = borrow.value.take() {
125            Ok(Some(val))
126        } else if borrow.complete {
127            Err(Canceled)
128        } else {
129            if should_park {
130                borrow.rx_task = Some(task::current());
131            }
132            if let Some(task) = borrow.tx_task.take() {
133                task.notify();
134            }
135            Ok(None)
136        }
137    }
138}
139
140impl<T> Future for Receiver<T> {
141    type Item = T;
142    type Error = Canceled;
143
144    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
145        self.recv_inner(true).map(|opt| match opt {
146            Some(t) => Async::Ready(t),
147            None => Async::NotReady,
148        })
149    }
150}
151
152impl<T> Drop for Receiver<T> {
153    fn drop(&mut self) {
154        let tx_task = {
155            let mut borrow = self.inner.borrow_mut();
156            borrow.complete = true;
157            borrow.rx_task.take();
158            borrow.tx_task.take()
159        };
160        if let Some(task) = tx_task {
161            task.notify();
162        }
163    }
164}
165
166impl<T> fmt::Debug for Receiver<T> {
167    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
168        f.pad("Receiver")
169    }
170}
171
172/// A `Future` waiting for interest to be registered on the `Receiver`.
173pub struct Waiting<T> {
174    tx: Option<Sender<T>>,
175}
176
177impl<T> Future for Waiting<T> {
178    type Item = Sender<T>;
179    type Error = Canceled;
180
181    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
182        if self.tx.as_ref().unwrap().is_canceled() {
183            Err(Canceled)
184        } else if self.tx.as_ref().unwrap().inner.borrow().rx_task.is_some() {
185            Ok(Async::Ready(self.tx.take().unwrap()))
186        } else {
187            self.tx.as_ref().unwrap().inner.borrow_mut().tx_task = Some(task::current());
188            Ok(Async::NotReady)
189        }
190    }
191}
192
193impl<T> fmt::Debug for Waiting<T> {
194    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
195        f.pad("Waiting")
196    }
197}
198
199/// Represents that the `Sender` dropped before sending a message.
200#[derive(Debug, Clone, Copy, PartialEq)]
201pub struct Canceled;
202
203struct Inner<T> {
204    value: Option<T>,
205    complete: bool,
206    tx_task: Option<Task>,
207    rx_task: Option<Task>,
208}
209
210#[cfg(test)]
211mod tests {
212    use futures::Future;
213    use super::channel;
214
215    #[test]
216    fn test_smoke() {
217        let (tx, rx) = channel();
218        tx.complete(33);
219        assert_eq!(rx.wait().unwrap(), 33);
220    }
221
222    #[test]
223    fn test_canceled() {
224        let (_, rx) = channel::<()>();
225        assert_eq!(rx.wait().unwrap_err(), super::Canceled);
226    }
227
228    #[test]
229    fn test_is_canceled() {
230        let (tx, _) = channel::<()>();
231        assert!(tx.is_canceled());
232
233        let (_, rx) = channel::<()>();
234        assert!(rx.is_canceled());
235
236        let (tx, rx) = channel::<()>();
237        assert!(!tx.is_canceled());
238        assert!(!rx.is_canceled());
239
240        tx.complete(());
241        assert!(!rx.is_canceled());
242    }
243
244    #[test]
245    fn test_tx_complete_rx_unparked() {
246        let (tx, rx) = channel();
247
248        let res = rx.join(::futures::lazy(move || {
249            tx.complete(55);
250            Ok(11)
251        }));
252        assert_eq!(res.wait().unwrap(), (55, 11));
253    }
254
255    #[test]
256    fn test_tx_dropped_rx_unparked() {
257        let (tx, rx) = channel::<i32>();
258
259        let res = rx.join(::futures::lazy(move || {
260            let _tx = tx;
261            Ok(11)
262        }));
263        assert_eq!(res.wait().unwrap_err(), super::Canceled);
264    }
265
266    #[test]
267    fn test_waiting_unparked() {
268        let (tx, rx) = channel::<i32>();
269
270        let res = tx.waiting().join(::futures::lazy(move || {
271            let mut rx = rx;
272            let _ = rx.poll(); // unpark
273            Ok(rx)
274        })).and_then(|(tx, rx)| {
275            tx.complete(5);
276            rx
277        });
278        assert_eq!(res.wait().unwrap(), 5);
279    }
280
281    #[test]
282    fn test_waiting_canceled() {
283        let (tx, rx) = channel::<i32>();
284
285        let res = tx.waiting().join(::futures::lazy(move || {
286            let _rx = rx;
287            Ok(())
288        }));
289        assert_eq!(res.wait().unwrap_err(), super::Canceled);
290    }
291
292    #[test]
293    fn try_recv() {
294        let (tx, mut rx) = channel::<i32>();
295
296        assert!(rx.try_recv().unwrap().is_none());
297
298        ::futures::lazy(move || {
299            tx.complete(5);
300            Ok::<(), ()>(())
301        }).wait().unwrap();
302
303        assert_eq!(rx.try_recv().unwrap(), Some(5));
304    }
305
306    #[test]
307    fn try_recv_canceled() {
308        let (tx, mut rx) = channel::<i32>();
309
310        assert!(rx.try_recv().unwrap().is_none());
311
312        ::futures::lazy(move || {
313            let _tx = tx;
314            Ok::<(), ()>(())
315        }).wait().unwrap();
316
317        assert!(rx.try_recv().is_err());
318    }
319}