1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
use futures::{Stream, Sink, Async, AsyncSink};
use futures::sync::mpsc::SendError;
use void::Void;
use mpsc::{self, UnboundedSender, UnboundedReceiver};

/// One endpoint of a two-way unbounded channel.
///
/// Each endpoint owns a sending half and a receiving half. Endpoints are
/// created in pairs by `unbounded`, which wires the sending half of each
/// endpoint to the receiving half of the other.
#[derive(Debug)]
pub struct UnboundedBiChannel<T> {
    /// Sending half: values written to this endpoint go out through here.
    tx: UnboundedSender<T>,
    /// Receiving half: values read from this endpoint come in through here.
    rx: UnboundedReceiver<T>,
}

impl<T> UnboundedBiChannel<T> {
    /// Sends `val` through this endpoint's sending half using only a shared
    /// reference, bypassing the `Sink` interface.
    ///
    /// The result of the underlying sender's `unbounded_send` is returned
    /// unchanged.
    pub fn unbounded_send(&self, val: T) -> Result<(), SendError<T>> {
        let sender = &self.tx;
        sender.unbounded_send(val)
    }
}

/// Writing side of the endpoint: every `Sink` operation is forwarded to the
/// sending half (`tx`).
impl<T> Sink for UnboundedBiChannel<T> {
    type SinkItem = T;
    type SinkError = SendError<T>;

    /// Passes `item` to the sending half and returns its answer unchanged.
    fn start_send(
        &mut self,
        item: Self::SinkItem,
    ) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
        let sender = &mut self.tx;
        sender.start_send(item)
    }

    /// Delegates flushing to the sending half.
    fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
        let sender = &mut self.tx;
        sender.poll_complete()
    }
}

impl<T> Stream for UnboundedBiChannel<T> {
    type Item = T;
    type Error = Void;

    fn poll(&mut self) -> Result<Async<Option<T>>, Void> {
        self.rx.poll()
    }
}

/// Creates a connected pair of `UnboundedBiChannel` endpoints.
///
/// Two one-way `mpsc` channels are created and cross-wired: whatever is sent
/// into the first returned endpoint is received from the second, and vice
/// versa.
pub fn unbounded<T>() -> (UnboundedBiChannel<T>, UnboundedBiChannel<T>) {
    // Channel carrying values from the first endpoint to the second.
    let (first_tx, second_rx) = mpsc::unbounded();
    // Channel carrying values from the second endpoint back to the first.
    let (second_tx, first_rx) = mpsc::unbounded();

    let first = UnboundedBiChannel {
        tx: first_tx,
        rx: first_rx,
    };
    let second = UnboundedBiChannel {
        tx: second_tx,
        rx: second_rx,
    };

    (first, second)
}

#[cfg(test)]
mod test {
    use super::*;
    use tokio_core::reactor::Core;
    use futures::{Future, Stream};

    /// Round-trips a value through both endpoints: `ch0` sends it, `ch1`
    /// receives it and sends it back, and `ch0` must read the same value.
    #[test]
    fn test() {
        let (ch0, ch1) = unbounded();

        let mut core = unwrap!(Core::new());
        let res = core.run({
            let data = 123u32;

            // Step 1: push the value into the first endpoint.
            ch0
            .send(data)
            .map_err(|_| panic!("oh no"))
            .and_then(move |ch0| {
                // Step 2: the second endpoint must yield that value.
                ch1
                .into_future()
                .map_err(|_| panic!("oh no"))
                .and_then(move |(msg_opt, ch1)| {
                    let msg = unwrap!(msg_opt);

                    // Step 3: echo the value back through the second endpoint.
                    ch1
                    .send(msg)
                    .map_err(|_| panic!("oh no"))
                    .and_then(move |_ch1| {
                        // Step 4: the first endpoint reads the echoed value.
                        ch0
                        .into_future()
                        .map_err(|_| panic!("oh no"))
                        .map(move |(msg_opt, _ch0)| {
                            let msg = unwrap!(msg_opt);
                            assert_eq!(msg, data);
                        })
                    })
                })
            })
        });

        unwrap!(res)
    }
}