future_utils/
bi_channel.rs1use futures::{Stream, Sink, Async, AsyncSink};
2use futures::sync::mpsc::SendError;
3use void::Void;
4use mpsc::{self, UnboundedSender, UnboundedReceiver};
5
6#[derive(Debug)]
7pub struct UnboundedBiChannel<T> {
8 tx: UnboundedSender<T>,
9 rx: UnboundedReceiver<T>,
10}
11
12impl<T> UnboundedBiChannel<T> {
13 pub fn unbounded_send(&self, val: T) -> Result<(), SendError<T>> {
14 self.tx.unbounded_send(val)
15 }
16}
17
18impl<T> Sink for UnboundedBiChannel<T> {
19 type SinkItem = T;
20 type SinkError = SendError<T>;
21
22 fn start_send(&mut self, item: T) -> Result<AsyncSink<T>, SendError<T>> {
23 self.tx.start_send(item)
24 }
25
26 fn poll_complete(&mut self) -> Result<Async<()>, SendError<T>> {
27 self.tx.poll_complete()
28 }
29}
30
31impl<T> Stream for UnboundedBiChannel<T> {
32 type Item = T;
33 type Error = Void;
34
35 fn poll(&mut self) -> Result<Async<Option<T>>, Void> {
36 self.rx.poll()
37 }
38}
39
40pub fn unbounded<T>() -> (UnboundedBiChannel<T>, UnboundedBiChannel<T>) {
41 let (tx0, rx0) = mpsc::unbounded();
42 let (tx1, rx1) = mpsc::unbounded();
43
44 (
45 UnboundedBiChannel {
46 tx: tx0,
47 rx: rx1,
48 },
49 UnboundedBiChannel {
50 tx: tx1,
51 rx: rx0,
52 },
53 )
54}
55
#[cfg(test)]
mod test {
    use super::*;
    use tokio;
    use futures::{Future, Stream};

    /// Sends a value from one endpoint to the other and back again, then
    /// checks that the value that returns equals the value that was sent.
    #[test]
    fn test() {
        let (left, right) = unbounded();
        let payload = 123u32;

        // Build the whole round trip (left -> right -> left) as one future
        // before handing it to the executor.
        let round_trip = left
            .send(payload)
            .map_err(|_| panic!("oh no"))
            .and_then(move |left| {
                right
                    .into_future()
                    .map_err(|_| panic!("oh no"))
                    .and_then(move |(received, right)| {
                        let echoed = unwrap!(received);

                        right
                            .send(echoed)
                            .map_err(|_| panic!("oh no"))
                            .and_then(move |_right| {
                                left
                                    .into_future()
                                    .map_err(|_| panic!("oh no"))
                                    .map(move |(received, _left)| {
                                        let returned = unwrap!(received);
                                        assert_eq!(returned, payload);
                                    })
                            })
                    })
            });

        let outcome = tokio::executor::current_thread::block_on_all(round_trip);

        unwrap!(outcome)
    }
}
98