use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
mod ring_buffer;
pub use crate::error::{RecvError, SendError, TryRecvError};
pub use ring_buffer::{AsyncProducer, AsyncReceiver, Producer, Receiver, RecvFuture, SendFuture};
/// Creates a channel and returns its `(Producer, Receiver)` handle pair.
///
/// This is a thin wrapper: `capacity` is passed unchanged to
/// `ring_buffer::new_channel`, and the pair it produces is returned as-is.
/// NOTE(review): `capacity` is presumably the number of buffered slots —
/// confirm against `ring_buffer::new_channel`.
pub fn channel<T: Send + Clone>(capacity: usize) -> (Producer<T>, Receiver<T>) {
    ring_buffer::new_channel(capacity)
}
/// Creates a channel and returns its async handle pair,
/// `(AsyncProducer, AsyncReceiver)`.
///
/// The channel is built by `ring_buffer::new_channel(capacity)` and each
/// resulting handle is then converted with its `to_async` method.
pub fn channel_async<T: Send + Clone>(capacity: usize) -> (AsyncProducer<T>, AsyncReceiver<T>) {
    let (sync_producer, sync_receiver) = ring_buffer::new_channel(capacity);
    // Convert the producer first, then the receiver.
    let async_producer = sync_producer.to_async();
    let async_receiver = sync_receiver.to_async();
    (async_producer, async_receiver)
}
impl<T: Send + Clone> Producer<T> {
    /// Converts this producer into an [`AsyncProducer`] that reuses the same
    /// `shared` state.
    ///
    /// The producer is consumed and its destructor is deliberately not run:
    /// the `shared` field is moved bitwise into the returned handle, so the
    /// channel state changes owner instead of being dropped and re-created.
    /// NOTE(review): the bitwise move is presumably needed because `Producer`
    /// implements `Drop` (which forbids moving fields out normally) — confirm
    /// in `ring_buffer`.
    pub fn to_async(self) -> AsyncProducer<T> {
        // Disarm the destructor *before* duplicating any field, so there is no
        // window in which both `self` and the moved-out field could be dropped.
        let this = mem::ManuallyDrop::new(self);
        // SAFETY: `this.shared` is a valid, initialized field of a live value.
        // `this` is wrapped in `ManuallyDrop` and never touched again, so the
        // value read here has exactly one owner. The only other field,
        // `_phantom`, is `PhantomData` and owns nothing that could leak.
        let shared = unsafe { std::ptr::read(&this.shared) };
        AsyncProducer {
            shared,
            _phantom: PhantomData,
        }
    }
}
impl<T: Send + Clone> AsyncProducer<T> {
    /// Converts this async producer back into a [`Producer`] that reuses the
    /// same `shared` state.
    ///
    /// The async handle is consumed and its destructor is deliberately not
    /// run: the `shared` field is moved bitwise into the returned handle, so
    /// the channel state changes owner instead of being dropped.
    /// NOTE(review): the bitwise move is presumably needed because
    /// `AsyncProducer` implements `Drop` — confirm in `ring_buffer`.
    pub fn to_sync(self) -> Producer<T> {
        // Disarm the destructor *before* duplicating any field, so there is no
        // window in which both `self` and the moved-out field could be dropped.
        let this = mem::ManuallyDrop::new(self);
        // SAFETY: `this.shared` is a valid, initialized field of a live value.
        // `this` is wrapped in `ManuallyDrop` and never touched again, so the
        // value read here has exactly one owner. The only other field,
        // `_phantom`, is `PhantomData` and owns nothing that could leak.
        let shared = unsafe { std::ptr::read(&this.shared) };
        Producer {
            shared,
            _phantom: PhantomData,
        }
    }
}
impl<T: Send + Clone> Receiver<T> {
    /// Converts this receiver into an [`AsyncReceiver`] that reuses the same
    /// `shared` state and `tail` value.
    ///
    /// The receiver is consumed and its destructor is deliberately not run:
    /// both fields are moved bitwise into the returned handle, so the
    /// receiver's state changes owner instead of being dropped.
    /// NOTE(review): the bitwise move is presumably needed because `Receiver`
    /// implements `Drop` — confirm in `ring_buffer`.
    pub fn to_async(self) -> AsyncReceiver<T> {
        // Disarm the destructor *before* duplicating any field, so there is no
        // window in which both `self` and a moved-out field could be dropped.
        let this = mem::ManuallyDrop::new(self);
        // SAFETY: `this.shared` and `this.tail` are valid, initialized fields
        // of a live value, and each is read exactly once. `this` is wrapped in
        // `ManuallyDrop` and never touched again, so every value read here has
        // exactly one owner. These two are the only fields, so nothing leaks.
        let (shared, tail) =
            unsafe { (std::ptr::read(&this.shared), std::ptr::read(&this.tail)) };
        AsyncReceiver { shared, tail }
    }
}
impl<T: Send + Clone> AsyncReceiver<T> {
    /// Converts this async receiver back into a [`Receiver`] that reuses the
    /// same `shared` state and `tail` value.
    ///
    /// The async handle is consumed and its destructor is deliberately not
    /// run: both fields are moved bitwise into the returned handle, so the
    /// receiver's state changes owner instead of being dropped.
    /// NOTE(review): the bitwise move is presumably needed because
    /// `AsyncReceiver` implements `Drop` — confirm in `ring_buffer`.
    pub fn to_sync(self) -> Receiver<T> {
        // Disarm the destructor *before* duplicating any field, so there is no
        // window in which both `self` and a moved-out field could be dropped.
        let this = mem::ManuallyDrop::new(self);
        // SAFETY: `this.shared` and `this.tail` are valid, initialized fields
        // of a live value, and each is read exactly once. `this` is wrapped in
        // `ManuallyDrop` and never touched again, so every value read here has
        // exactly one owner. These two are the only fields, so nothing leaks.
        let (shared, tail) =
            unsafe { (std::ptr::read(&this.shared), std::ptr::read(&this.tail)) };
        Receiver { shared, tail }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// One value sent through the channel is received unchanged.
    #[test]
    fn spmc_single_recv() {
        let (mut producer, mut receiver) = channel(2);
        producer.send(10).unwrap();
        let received = receiver.recv().unwrap();
        assert_eq!(received, 10);
    }

    /// A cloned receiver is expected to observe every value the original
    /// receiver observes, in the same order.
    #[test]
    fn spmc_multiple_receivers() {
        let (mut producer, mut first) = channel(4);
        let mut second = first.clone();
        producer.send(1).unwrap();
        producer.send(2).unwrap();
        // Interleave the receivers: each one must yield the same next value.
        for expected in [1, 2] {
            assert_eq!(first.recv().unwrap(), expected);
            assert_eq!(second.recv().unwrap(), expected);
        }
    }

    /// With a capacity of 1 and one receiver that never reads, the test
    /// expects the second `send` to stay pending until that receiver is
    /// dropped.
    #[test]
    fn spmc_drop_receiver_unblocks_producer() {
        let (mut producer, mut fast) = channel(1);
        let slow = fast.clone();
        producer.send(1).unwrap();
        assert_eq!(fast.recv().unwrap(), 1);

        // `slow` has not consumed the first value, so this send should block.
        let pending_send = thread::spawn(move || {
            producer.send(2).unwrap();
        });

        // Give the spawned thread time to reach the blocking send.
        let grace_period = Duration::from_millis(100);
        thread::sleep(grace_period);
        assert!(!pending_send.is_finished());

        // Dropping the lagging receiver should let the producer proceed.
        drop(slow);
        pending_send
            .join()
            .expect("Producer should be unblocked and finish");
        assert_eq!(fast.recv().unwrap(), 2);
    }
}