1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
//! Bi-directional channels built on [crossbeam_channel](https://docs.rs/crossbeam-channel/0.2.6/crossbeam_channel/)
//!
//! # Examples
//!
//! ```rust
//! # use doublecross::unbounded;
//! # use std::thread;
//! // doublecross supports different types for each
//! // direction in the channel: the type arguments are
//! // the receive/send types for the left channel, and 
//! // the send/receive types for the right channel.
//! let (left, right) = unbounded::<bool, i32>();
//! thread::spawn(move || {
//!     loop {
//!         let val = right.recv().unwrap();
//!         right.send(val % 2 == 0);
//!     }
//! });
//!
//! for i in 0..10 {
//!     left.send(i);
//!     if left.recv().unwrap() {
//!         println!("{} is even", i);
//!     }
//! }
//! ```
#[macro_use]
extern crate crossbeam_channel as channel;
use channel::internal::select::{RecvArgument, SendArgument};
use channel::{Receiver, Sender};
use std::option;

/// Creates a bi-directional channel of bounded capacity.
///
/// This type of channel has an internal buffer of length `cap` in which
/// messages get queued; the buffer for each side of the channel is distinct.
///
/// A rather special case is a zero-capacity channel, also known as a
/// rendezvous channel. Such a channel cannot hold any messages since
/// its buffer is of length zero. Instead, send and receive operations
/// must be executing at the same time in order to pair up and pass
/// the message over.
/// 
/// # Type Arguments
///
/// Unforunately it is often necessary to annotate types to help the compiler;
/// the type `T` refers to the type the left channel receives and the right
/// channel sends, and the type `U` inversely refers to the type the right
/// channel receives and the left channel sends.
///
/// # Warning
///
/// No effort is made to prevent a deadlock (ie both sides waiting
/// for a buffer space on the other side) and it is important to
/// use this channel in such a way as to avoid deadlocks, or to
/// recognize their occurence and handle them.
///
/// # Examples
///
/// ```rust
/// # use doublecross::bounded;
/// # use std::thread;
/// let (left, right) = bounded::<(), ()>(0);
///
/// thread::spawn(move || {
///     // ...
///     left.send(());
/// });
///
/// println!("waiting for rendezvous");
/// right.recv().unwrap();
/// println!("rendezvous complete");
/// ```
pub fn bounded<T, U>(cap: usize) -> (BiChannel<T, U>, BiChannel<U, T>) {
    let (tx1, rx1) = channel::bounded(cap);
    let (tx2, rx2) = channel::bounded(cap);
    (BiChannel::new(tx1, rx2), BiChannel::new(tx2, rx1))
}

/// Creates a bi-directional channel of unbounded capacity.
///
/// This type of channel can hold any number of messages in
/// either direction (ie: it has infinite capacity on both sides)
///
/// # Type Arguments
///
/// Unforunately it is often necessary to annotate types to help the compiler;
/// the type `T` refers to the type the left channel receives and the right
/// channel sends, and the type `U` inversely refers to the type the right
/// channel receives and the left channel sends.
///
/// # Examples
///
/// ```rust
/// # use doublecross::unbounded;
/// let (left, right) = unbounded::<i32, i32>();
/// left.send(10);
/// assert_eq!(right.recv(), Some(10));
/// ```
pub fn unbounded<T, U>() -> (BiChannel<T, U>, BiChannel<U, T>) {
    let (tx1, rx1) = channel::unbounded();
    let (tx2, rx2) = channel::unbounded();
    (BiChannel::new(tx1, rx2), BiChannel::new(tx2, rx1))
}

/// Bi-directional communication build on, and
/// usable with, crossbeam-channel channels.
pub struct BiChannel<T, U> {
    pub rx: Receiver<T>,
    pub tx: Sender<U>,
}

impl<T, U> BiChannel<T, U> {
    pub fn new(tx: Sender<U>, rx: Receiver<T>) -> Self {
        BiChannel { rx, tx }
    }

    pub fn send(&self, msg: U) {
        self.tx.send(msg)
    }

    pub fn recv(&self) -> Option<T> {
        self.rx.recv()
    }
}

impl<'a, T, U> RecvArgument<'a, T> for &'a BiChannel<T, U> {
    type Iter = option::IntoIter<&'a Receiver<T>>;

    fn _as_recv_argument(&'a self) -> Self::Iter {
        Some(&self.rx).into_iter()
    }
}

impl<'a, T, U> SendArgument<'a, T> for &'a BiChannel<U, T> {
    type Iter = option::IntoIter<&'a Sender<T>>;

    fn _as_send_argument(&'a self) -> Self::Iter {
        Some(&self.tx).into_iter()
    }
}

#[cfg(test)]
mod tests {
    use channel;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn simultaneous_handover() {
        let (left, right) = super::bounded(1);
        left.send(10);
        right.send(20);
        assert_eq!(left.recv(), Some(20));
        assert_eq!(right.recv(), Some(10));
    }

    #[test]
    fn rendezvous_recv_select() {
        let (left, right) = super::bounded::<(), ()>(0);
        let timeout = Duration::from_millis(10);

        thread::spawn(move || {
            left.send(());
        });

        select! {
            recv(right, _msg) => {},
            recv(channel::after(timeout)) => {
                panic!("timeout waiting for rendezvous");
            },
        }
    }

    #[test]
    fn rendezvous_send_select() {
        let (left, right) = super::bounded::<(), ()>(0);
        let timeout = Duration::from_millis(10);

        thread::spawn(move || {
            left.recv();
        });

        select! {
            send(right, ()) => {},
            recv(channel::after(timeout)) => {
                panic!("timeout waiting for rendezvous");
            },
        }
    }

    #[test]
    fn asymmetric_message_types() {
        let (left, right) = super::unbounded::<u8, i16>();
        left.send(0i16);
        assert_eq!(right.recv().unwrap(), 0i16);
        right.send(0u8);
        assert_eq!(left.recv().unwrap(), 0u8);
    }
}