use std::marker::PhantomData;
use std::mem;
mod ring_buffer;
pub use crate::error::{RecvError, SendError, TryRecvError, TrySendError}; pub use ring_buffer::{AsyncReceiver, AsyncSender, Receiver, RecvFuture, SendFuture, Sender};
/// Creates a bounded channel and returns its synchronous `(Sender, Receiver)` pair.
///
/// `capacity` is forwarded unchanged to `ring_buffer::new_channel`, which constructs
/// both endpoints.
pub fn bounded<T: Send + Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    ring_buffer::new_channel(capacity)
}
/// Creates a bounded channel and returns its `(AsyncSender, AsyncReceiver)` pair.
///
/// The endpoints are built by `ring_buffer::new_channel` with the given `capacity`
/// and then each one is converted with its `to_async` method.
pub fn bounded_async<T: Send + Clone>(capacity: usize) -> (AsyncSender<T>, AsyncReceiver<T>) {
    let (sync_tx, sync_rx) = ring_buffer::new_channel(capacity);
    let async_tx = sync_tx.to_async();
    let async_rx = sync_rx.to_async();
    (async_tx, async_rx)
}
impl<T: Send + Clone> Sender<T> {
    /// Converts this sender into an [`AsyncSender`] that owns the same `shared` value.
    ///
    /// The sender is consumed and its destructor is intentionally not run; ownership
    /// of `shared` is handed to the returned handle instead of being released.
    pub fn to_async(self) -> AsyncSender<T> {
        // Disarm the destructor *before* touching the field. This is the pattern the
        // standard library recommends over `ptr::read` followed by `mem::forget`,
        // because the original value is never moved after its field was duplicated.
        let this = mem::ManuallyDrop::new(self);
        // SAFETY: `this.shared` is a valid, initialized field behind a live reference.
        // `this` is wrapped in `ManuallyDrop` and is not used again, so the copy read
        // here is the only one that will ever be used or dropped.
        let shared = unsafe { std::ptr::read(&this.shared) };
        AsyncSender {
            shared,
            _phantom: PhantomData,
        }
    }
}
impl<T: Send + Clone> AsyncSender<T> {
    /// Converts this async sender into a [`Sender`] that owns the same `shared` value.
    ///
    /// The async sender is consumed and its destructor is intentionally not run;
    /// ownership of `shared` is handed to the returned handle instead of being released.
    pub fn to_sync(self) -> Sender<T> {
        // Disarm the destructor *before* touching the field. This is the pattern the
        // standard library recommends over `ptr::read` followed by `mem::forget`,
        // because the original value is never moved after its field was duplicated.
        let this = mem::ManuallyDrop::new(self);
        // SAFETY: `this.shared` is a valid, initialized field behind a live reference.
        // `this` is wrapped in `ManuallyDrop` and is not used again, so the copy read
        // here is the only one that will ever be used or dropped.
        let shared = unsafe { std::ptr::read(&this.shared) };
        Sender {
            shared,
            _phantom: PhantomData,
        }
    }
}
impl<T: Send + Clone> Receiver<T> {
    /// Converts this receiver into an [`AsyncReceiver`] that owns the same `shared`
    /// and `tail` values.
    ///
    /// The receiver is consumed and its destructor is intentionally not run; both
    /// fields are moved into the returned handle instead of being released.
    pub fn to_async(self) -> AsyncReceiver<T> {
        // Disarm the destructor *before* touching the fields. This is the pattern the
        // standard library recommends over `ptr::read` followed by `mem::forget`,
        // because the original value is never moved after its fields were duplicated.
        let this = mem::ManuallyDrop::new(self);
        // SAFETY: `this.shared` is a valid, initialized field behind a live reference.
        // `this` is wrapped in `ManuallyDrop`, so the original is never dropped and
        // the copy read here is the only one that will be used from now on.
        let shared = unsafe { std::ptr::read(&this.shared) };
        // SAFETY: same reasoning as above, applied to `this.tail`.
        // NOTE(review): `tail` is presumably this receiver's read position — confirm
        // against `ring_buffer`.
        let tail = unsafe { std::ptr::read(&this.tail) };
        AsyncReceiver { shared, tail }
    }
}
impl<T: Send + Clone> AsyncReceiver<T> {
    /// Converts this async receiver into a [`Receiver`] that owns the same `shared`
    /// and `tail` values.
    ///
    /// The async receiver is consumed and its destructor is intentionally not run;
    /// both fields are moved into the returned handle instead of being released.
    pub fn to_sync(self) -> Receiver<T> {
        // Disarm the destructor *before* touching the fields. This is the pattern the
        // standard library recommends over `ptr::read` followed by `mem::forget`,
        // because the original value is never moved after its fields were duplicated.
        let this = mem::ManuallyDrop::new(self);
        // SAFETY: `this.shared` is a valid, initialized field behind a live reference.
        // `this` is wrapped in `ManuallyDrop`, so the original is never dropped and
        // the copy read here is the only one that will be used from now on.
        let shared = unsafe { std::ptr::read(&this.shared) };
        // SAFETY: same reasoning as above, applied to `this.tail`.
        // NOTE(review): `tail` is presumably this receiver's read position — confirm
        // against `ring_buffer`.
        let tail = unsafe { std::ptr::read(&this.tail) };
        Receiver { shared, tail }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Pause used before checking that a spawned sender is still blocked.
    pub const SHORT_TIMEOUT: Duration = Duration::from_millis(500);
    /// Three-second duration; not referenced by the tests in this module.
    pub const LONG_TIMEOUT: Duration = Duration::from_secs(3);
    /// Fifteen-second duration; not referenced by the tests in this module.
    pub const STRESS_TIMEOUT: Duration = Duration::from_secs(15);
    /// Item count (and channel capacity) used by the multi-receiver threaded test.
    pub const ITEMS_LOW: usize = 50;
    /// Medium item count; not referenced by the tests in this module.
    pub const ITEMS_MEDIUM: usize = 200;
    /// High item count; not referenced by the tests in this module.
    pub const ITEMS_HIGH: usize = 1000;

    /// A single item round-trips through one sender and one receiver, with both
    /// handles agreeing on length and emptiness at every step.
    #[test]
    fn spmc_single_recv() {
        let (mut producer, mut consumer) = bounded(2);

        // Fresh channel: empty and not full, seen from either end.
        assert_eq!((producer.len(), producer.is_empty()), (0, true));
        assert!(!producer.is_full());
        assert_eq!((consumer.len(), consumer.is_empty()), (0, true));
        assert!(!consumer.is_full());

        // One item in flight.
        producer.send(10).unwrap();
        assert_eq!((producer.len(), producer.is_empty()), (1, false));
        assert_eq!((consumer.len(), consumer.is_empty()), (1, false));

        // Item consumed: back to empty on both sides.
        assert_eq!(consumer.recv().unwrap(), 10);
        assert_eq!((producer.len(), producer.is_empty()), (0, true));
        assert_eq!((consumer.len(), consumer.is_empty()), (0, true));
    }

    /// With two receivers, the sender's length only shrinks once both of them have
    /// consumed an item, while each receiver's length tracks its own progress.
    #[test]
    fn spmc_multiple_receivers_len_checks() {
        let (mut producer, mut first) = bounded(4);
        let mut second = first.clone();

        assert_eq!((producer.len(), producer.is_empty()), (0, true));
        assert_eq!((first.len(), first.is_empty()), (0, true));
        assert_eq!((second.len(), second.is_empty()), (0, true));

        // Each tuple below is (sender len, first receiver len, second receiver len).
        producer.send(1).unwrap();
        assert_eq!((producer.len(), first.len(), second.len()), (1, 1, 1));

        producer.send(2).unwrap();
        assert_eq!((producer.len(), first.len(), second.len()), (2, 2, 2));

        assert_eq!(first.recv().unwrap(), 1);
        assert_eq!((producer.len(), first.len(), second.len()), (2, 1, 2));

        assert_eq!(second.recv().unwrap(), 1);
        assert_eq!((producer.len(), first.len(), second.len()), (1, 1, 1));

        assert_eq!(first.recv().unwrap(), 2);
        assert_eq!((producer.len(), first.len(), second.len()), (1, 0, 1));

        assert_eq!(second.recv().unwrap(), 2);
        assert_eq!((producer.len(), producer.is_empty()), (0, true));
        assert_eq!((first.len(), first.is_empty()), (0, true));
        assert_eq!((second.len(), second.is_empty()), (0, true));
    }

    /// `try_send` on a full capacity-1 channel hands the value back, and the slot
    /// only frees up after every receiver has taken the pending item.
    #[test]
    fn spmc_sync_try_send() {
        let (mut producer, mut first) = bounded(1);
        let mut second = first.clone();

        assert!(producer.try_send(10).is_ok());
        assert_eq!((producer.len(), producer.is_full()), (1, true));

        match producer.try_send(20) {
            Err(TrySendError::Full(rejected)) => assert_eq!(rejected, 20),
            unexpected => panic!("Expected TrySendError::Full, got {:?}", unexpected),
        }

        // Only one of the two receivers has read: the slot is still occupied.
        assert_eq!(first.recv().unwrap(), 10);
        assert_eq!((producer.len(), producer.is_full()), (1, true));

        // Both have read: the slot is free again.
        assert_eq!(second.recv().unwrap(), 10);
        assert_eq!(
            (producer.len(), producer.is_full(), producer.is_empty()),
            (0, false, true)
        );

        assert!(producer.try_send(30).is_ok());
        assert_eq!((producer.len(), producer.is_full()), (1, true));
    }

    /// Three receivers on separate threads each observe the full sequence in order.
    #[test]
    fn spmc_multiple_receivers() {
        let (mut producer, rx_a) = bounded(ITEMS_LOW);
        let rx_b = rx_a.clone();
        let rx_c = rx_a.clone();

        for item in 0..ITEMS_LOW {
            producer.send(item).unwrap();
        }
        assert_eq!((producer.len(), producer.is_full()), (ITEMS_LOW, true));

        // Spawn every consumer before joining any of them.
        let workers: Vec<_> = vec![rx_a, rx_b, rx_c]
            .into_iter()
            .map(|mut rx| {
                thread::spawn(move || {
                    for expected in 0..ITEMS_LOW {
                        assert_eq!(rx.recv().unwrap(), expected);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
    }

    /// A receiver that lags behind keeps the single slot occupied, so a blocking
    /// `send` stays parked until that receiver catches up.
    #[test]
    fn spmc_sync_slow_consumer_blocks_producer() {
        let (mut producer, mut fast) = bounded(1);
        let mut slow = fast.clone();

        producer.send(1).unwrap();
        assert_eq!((producer.len(), producer.is_full()), (1, true));

        // The fast receiver reads, but the slow one has not: still full.
        assert_eq!(fast.recv().unwrap(), 1);
        assert_eq!((producer.len(), producer.is_full()), (1, true));

        let blocked_send = thread::spawn(move || {
            producer.send(2).unwrap();
        });

        thread::sleep(SHORT_TIMEOUT);
        assert!(!blocked_send.is_finished(), "Sender should have blocked");

        // Letting the slow receiver catch up releases the sender.
        assert_eq!(slow.recv().unwrap(), 1);
        blocked_send
            .join()
            .expect("Sender panicked or was not unblocked");

        assert_eq!((fast.len(), slow.len()), (1, 1));
        assert_eq!(fast.recv().unwrap(), 2);
        assert_eq!(slow.recv().unwrap(), 2);
        assert!(fast.is_empty());
        assert!(slow.is_empty());
    }

    /// Once every receiver is dropped, `send` reports `SendError::Closed`.
    #[test]
    fn spmc_sync_all_receivers_drop_closes_channel() {
        let (mut producer, first) = bounded(2);
        let second = first.clone();
        producer.send(1).unwrap();

        drop(first);
        drop(second);

        let outcome = producer.send(2);
        assert_eq!(outcome, Err(SendError::Closed));
    }
}