1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
//! Zero-dependency, `std::sync`-based bidirectional channels. Each side can send to and
//! receive from its counterpart.
//!
//! Note that the default `Channel` inherits `!Sync` from `std::sync::mpsc::Receiver`. If you
//! would prefer, a `crossbeam` implementation is available by enabling the `crossbeam` feature.
//! In addition to its desirable performance characteristics, it also drops this `!Sync`
//! constraint.
//!
//! ## Getting Started
//!
//! ```toml
//! bichannel = "1"
//! ```
//!
//! **Example Usage**
//!
//! ```
//! let (left, right) = bichannel::channel();
//!
//! // Send from the left to the right
//! left.send(1).unwrap();
//! assert_eq!(Ok(1), right.recv());
//!
//! // Send from the right to the left
//! right.send(2).unwrap();
//! assert_eq!(Ok(2), left.recv());
//! ```
//!
//! ## License
//! TODO MIT/APACHE
//!
//! ## Contributing
//!  
//! Bug reports, feature requests, and contributions are warmly welcomed.
//!
//! NOTE: This README uses [cargo-readme](https://github.com/livioribeiro/cargo-readme). To
//! update the README, use `cargo readme > README.md`

#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::{channel as create_channel, Receiver, Sender};

#[cfg(not(feature = "crossbeam"))]
pub use std::sync::mpsc::RecvError;
#[cfg(not(feature = "crossbeam"))]
pub use std::sync::mpsc::SendError;
#[cfg(not(feature = "crossbeam"))]
pub use std::sync::mpsc::TryRecvError;
#[cfg(not(feature = "crossbeam"))]
pub use std::sync::mpsc::TrySendError;

#[cfg(feature = "crossbeam")]
use crossbeam_channel::{unbounded as create_channel, Receiver, Sender};

#[cfg(feature = "crossbeam")]
pub use crossbeam_channel::RecvError;
#[cfg(feature = "crossbeam")]
pub use crossbeam_channel::SendError;
#[cfg(feature = "crossbeam")]
pub use crossbeam_channel::TryRecvError;
#[cfg(feature = "crossbeam")]
pub use crossbeam_channel::TrySendError;

/// One side of a bidirectional channel. This channel can send to and receive from its
/// counterpart.
///
/// `S` is the type of the messages this side sends and `R` is the type of the messages it
/// receives. A connected pair is created with [`channel`], which returns a `Channel<T, U>`
/// together with its mirror image, a `Channel<U, T>`.
///
/// # Examples
///
/// ```
/// let (l, r) = bichannel::channel();
///
/// l.send(1).unwrap();
/// assert_eq!(Ok(1), r.recv());
///
/// r.send(1).unwrap();
/// assert_eq!(Ok(1), l.recv());
/// ```
#[derive(Debug)]
pub struct Channel<S, R> {
    /// Outgoing half: `send` pushes values of type `S` through this handle.
    sender: Sender<S>,
    /// Incoming half: `recv` and `try_recv` read values of type `R` from this handle.
    receiver: Receiver<R>,
}

#[cfg(feature = "crossbeam")]
// Written by hand instead of `#[derive(Clone)]`: the derive would add `S: Clone` and `R: Clone`
// bounds, while this impl only clones the two channel handles and places no bound on the
// message types. It is gated on the `crossbeam` feature, the same feature that selects the
// `crossbeam_channel` sender and receiver types.
impl<S, R> Clone for Channel<S, R> {
    /// Returns a new `Channel` built from clones of this side's sender and receiver handles.
    fn clone(&self) -> Self {
        let Self { sender, receiver } = self;

        Self {
            sender: sender.clone(),
            receiver: receiver.clone(),
        }
    }
}

impl<S, R> Channel<S, R> {
    /// See mpsc::Sender::send
    ///
    /// Attempts to send a value to the other side of this channel, returning it back if it could
    /// not be sent.
    ///
    /// A successful send occurs when it is determined that the other end of
    /// the channel has not hung up already. An unsuccessful send would be one
    /// where the corresponding receiver has already been deallocated. Note
    /// that a return value of [`Err`] means that the data will never be
    /// received, but a return value of [`Ok`] does *not* mean that the data
    /// will be received. It is possible for the corresponding receiver to
    /// hang up immediately after this function returns [`Ok`].
    ///
    /// This method will never block the current thread.
    ///
    /// # Examples
    ///
    /// ```
    ///
    /// let (l, r) = bichannel::channel();
    ///
    /// // This send is always successful
    /// l.send(1).unwrap();
    ///
    /// // This send will fail because the receiver is gone
    /// drop(l);
    /// assert_eq!(r.send(1).unwrap_err().0, 1);
    /// ```
    pub fn send(&self, s: S) -> Result<(), SendError<S>> {
        self.sender.send(s)
    }

    /// See mpsc::Receiver::recv
    ///
    /// Attempts to wait for a value from the other side, returning an error if the
    /// other side has hung up.
    ///
    /// This function will always block the current thread if there is no data
    /// available and it's possible for more data to be sent. Once a message is
    /// sent from the other side then this will wake up and return that message.
    ///
    /// If the corresponding channel has disconnected, or it disconnects while
    /// this call is blocking, this call will wake up and return [`Err`] to
    /// indicate that no more messages can ever be received on this channel.
    /// However, since channels are buffered, messages sent before the disconnect
    /// will still be properly received.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// let (left, right) = bichannel::channel::<u8, u8>();
    ///
    /// let _result = thread::spawn(move || {
    ///     right.send(1u8).unwrap();
    /// }).join().unwrap();
    ///
    /// assert_eq!(Ok(1), left.recv());
    /// ```
    ///
    /// Buffering behavior:
    ///
    /// ```
    /// use std::sync::mpsc;
    /// use std::thread;
    /// use std::sync::mpsc::RecvError;
    ///
    /// let (send, recv) = mpsc::channel();
    /// let handle = thread::spawn(move || {
    ///     send.send(1u8).unwrap();
    ///     send.send(2).unwrap();
    ///     send.send(3).unwrap();
    ///     drop(send);
    /// });
    ///
    /// // wait for the thread to join so we ensure the sender is dropped
    /// handle.join().unwrap();
    ///
    /// assert_eq!(Ok(1), recv.recv());
    /// assert_eq!(Ok(2), recv.recv());
    /// assert_eq!(Ok(3), recv.recv());
    /// assert_eq!(Err(RecvError), recv.recv());
    /// ```
    pub fn recv(&self) -> Result<R, RecvError> {
        self.receiver.recv()
    }

    /// See mpsc::Receiver::try_recv.
    ///
    /// Attempts to return a pending value from the other side without blocking.
    ///
    /// This method will never block the caller in order to wait for data to
    /// become available. Instead, this will always return immediately with a
    /// possible option of pending data on the channel.
    ///
    /// This is useful for a flavor of "optimistic check" before deciding to
    /// block on a receiver.
    ///
    /// Compared with [`recv`], this function has two failure cases instead of one
    /// (one for disconnection, one for an empty buffer).
    ///
    /// [`recv`]: Self::recv
    ///
    /// # Examples
    ///
    /// ```rust
    ///
    /// let (_, right) = bichannel::channel::<(), ()>();
    ///
    /// assert!(right.try_recv().is_err());
    /// ```
    pub fn try_recv(&self) -> Result<R, TryRecvError> {
        self.receiver.try_recv()
    }
}

/// Creates a bidirectional channel, returning its left and right sides. Each side can send to
/// and receive from its counterpart.
///
/// Values of type `T` travel from the left side to the right side, and values of type `U`
/// travel from the right side back to the left side.
///
/// # Examples
///
/// ```
/// let (left, right) = bichannel::channel::<&'static str, &'static str>();
///
/// left.send("ping").unwrap();
///
/// assert_eq!(right.recv().unwrap(), "ping");
/// ```
pub fn channel<T, U>() -> (Channel<T, U>, Channel<U, T>) {
    // Two one-way channels, one per direction. Each side keeps the sender of one of them and
    // the receiver of the other.
    let (left_tx, right_rx) = create_channel::<T>();
    let (right_tx, left_rx) = create_channel::<U>();

    let left = Channel {
        sender: left_tx,
        receiver: left_rx,
    };
    let right = Channel {
        sender: right_tx,
        receiver: right_rx,
    };

    (left, right)
}

#[cfg(test)]
mod examples {

    /// Drives a worker thread over a channel pair: `"stop"` ends the worker's loop, and any
    /// other message is answered with `"cant stop"`.
    #[test]
    fn test_threaded_scenario() {
        let (controller, worker) = crate::channel();

        let worker_thread = std::thread::spawn(move || loop {
            let polled = worker.try_recv();

            if let Ok("stop") = polled {
                break "stopped";
            }

            // Nothing queued yet: poll again.
            if let Err(crate::TryRecvError::Empty) = polled {
                continue;
            }

            // Every remaining outcome (any other message, or a disconnect) gets this reply.
            worker.send("cant stop").unwrap();
        });

        controller.send("slow down").unwrap();
        assert_eq!(controller.recv().unwrap(), "cant stop");

        controller.send("stop").unwrap();
        assert_eq!(worker_thread.join().unwrap(), "stopped");
    }

    // NOTE(review): the scenario below is disabled. Presumably this is because the default
    // `Channel` is `!Sync`, so it cannot be shared between threads through an `Arc` - confirm
    // before re-enabling it (possibly only under the `crossbeam` feature).
    //
    //    #[test]
    //    fn test_arc_scenario() {
    //        let (l, r) = crate::channel::<i8, i8>();
    //
    //        let wrapped = std::sync::Arc::new(l);
    //
    //        {
    //            let wrapped = wrapped.clone();
    //            std::thread::spawn(move || {
    //                wrapped.recv().unwrap();
    //            });
    //        }
    //
    //        wrapped.recv().unwrap();
    //    }
    //

    // TODO: fn test_fut_scenario()
}