future_utils/
bi_channel.rs

1use futures::{Stream, Sink, Async, AsyncSink};
2use futures::sync::mpsc::SendError;
3use void::Void;
4use mpsc::{self, UnboundedSender, UnboundedReceiver};
5
6#[derive(Debug)]
7pub struct UnboundedBiChannel<T> {
8    tx: UnboundedSender<T>,
9    rx: UnboundedReceiver<T>,
10}
11
12impl<T> UnboundedBiChannel<T> {
13    pub fn unbounded_send(&self, val: T) -> Result<(), SendError<T>> {
14        self.tx.unbounded_send(val)
15    }
16}
17
18impl<T> Sink for UnboundedBiChannel<T> {
19    type SinkItem = T;
20    type SinkError = SendError<T>;
21
22    fn start_send(&mut self, item: T) -> Result<AsyncSink<T>, SendError<T>> {
23        self.tx.start_send(item)
24    }
25
26    fn poll_complete(&mut self) -> Result<Async<()>, SendError<T>> {
27        self.tx.poll_complete()
28    }
29}
30
31impl<T> Stream for UnboundedBiChannel<T> {
32    type Item = T;
33    type Error = Void;
34
35    fn poll(&mut self) -> Result<Async<Option<T>>, Void> {
36        self.rx.poll()
37    }
38}
39
40pub fn unbounded<T>() -> (UnboundedBiChannel<T>, UnboundedBiChannel<T>) {
41    let (tx0, rx0) = mpsc::unbounded();
42    let (tx1, rx1) = mpsc::unbounded();
43
44    (
45        UnboundedBiChannel {
46            tx: tx0,
47            rx: rx1,
48        },
49        UnboundedBiChannel {
50            tx: tx1,
51            rx: rx0,
52        },
53    )
54}
55
#[cfg(test)]
mod test {
    use super::*;
    use tokio;
    use futures::{Future, Stream};

    /// Sends a value from one endpoint to the other and echoes it back,
    /// checking that the value which returns equals the one sent.
    #[test]
    fn test() {
        let data = 123u32;
        let (ch0, ch1) = unbounded();

        // Step 1: ch0 sends `data`.
        let round_trip = ch0
            .send(data)
            .map_err(|_| panic!("oh no"))
            .and_then(move |near| {
                // Step 2: ch1 receives it.
                ch1
                    .into_future()
                    .map_err(|_| panic!("oh no"))
                    .and_then(move |(received, far)| {
                        let echoed = unwrap!(received);

                        // Step 3: ch1 echoes the value back.
                        far
                            .send(echoed)
                            .map_err(|_| panic!("oh no"))
                            .and_then(move |_far| {
                                // Step 4: ch0 receives the echo; `_far` stays
                                // bound so the peer is not dropped before
                                // this closure finishes.
                                near
                                    .into_future()
                                    .map_err(|_| panic!("oh no"))
                                    .map(move |(returned, _near)| {
                                        let msg = unwrap!(returned);
                                        assert_eq!(msg, data);
                                    })
                            })
                    })
            });

        let res = tokio::executor::current_thread::block_on_all(round_trip);

        unwrap!(res)
    }
}
98