use std::{
sync::Arc,
cell::UnsafeCell,
};
use crate::{
queue::{
waker::Checker,
unbounded::SpscUnbounded,
bounded::{Bounded, SpscBounded},
},
error::{SendError, RecvError, TrySendError, TryRecvError},
};
/// Builds a bounded channel and returns its `(sender, receiver)` pair.
///
/// One `SpscBounded` queue is constructed from `size` and placed behind an
/// `Arc<UnsafeCell<_>>`; both halves hold a handle to that same allocation.
/// `size` is forwarded verbatim to `SpscBounded::new` (presumably the queue
/// capacity — confirm against the queue implementation).
#[inline(always)]
pub fn bounded<T: Send>(size: u32) -> (BSender<T>, BReceiver<T>) {
    let shared = Arc::new(UnsafeCell::new(SpscBounded::new(size)));
    let sender = BSender::new(Arc::clone(&shared));
    let receiver = BReceiver::new(shared);
    (sender, receiver)
}
/// Sending half of the bounded channel returned by `bounded`.
///
/// Holds a reference-counted handle to the queue; the `UnsafeCell` lets the
/// methods reach the queue through `&self`.
pub struct BSender<T> {
// Shared handle to the underlying bounded queue.
core: Arc<UnsafeCell<SpscBounded<T>>>,
}
// SAFETY (NOTE(review)): `UnsafeCell` is not `Sync`, so without these impls the
// `Arc<UnsafeCell<_>>` field would make this type neither `Send` nor `Sync`.
// Soundness relies on the queue synchronizing concurrent access internally,
// which is not visible in this file — TODO confirm.
unsafe impl<T: Send> Send for BSender<T> {}
unsafe impl<T: Send> Sync for BSender<T> {}
impl<T: Send> Clone for BSender<T> {
#[inline(always)]
fn clone(&self) -> Self {
Self { core: self.core.clone() }
}
}
// Every method below dereferences the raw pointer obtained from the shared
// `UnsafeCell`.
//
// SAFETY (applies to each `unsafe` block in this impl): the pointer comes from
// the `UnsafeCell` kept alive by `self.core`, so it is non-null and valid for
// the duration of the call. Absence of data races is assumed to be guaranteed
// by the queue implementation itself — TODO confirm.
impl<T: Send> BSender<T> {
    /// Wraps an already-shared queue handle in a sender.
    #[inline(always)]
    fn new(inner: Arc<UnsafeCell<SpscBounded<T>>>) -> Self {
        Self { core: inner }
    }

    /// Hands `value` to the queue's `try_send` and returns its result unchanged.
    #[inline(always)]
    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).try_send(value) }
    }

    /// Hands `value` to the queue's `send`, passing the result of the queue's
    /// own `cast()` as the second argument, and returns the result unchanged.
    #[inline(always)]
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).send(value, (*queue).cast()) }
    }

    /// Returns whatever the queue's `length` reports.
    #[inline(always)]
    pub fn length(&self) -> u32 {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).length() }
    }

    /// Invokes the queue's `close`.
    #[inline(always)]
    pub fn close(&self) {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).close() }
    }

    /// Returns whatever the queue's `is_close` reports.
    #[inline(always)]
    pub fn is_close(&self) -> bool {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).is_close() }
    }
}
/// Receiving half of the bounded channel returned by `bounded`.
///
/// Holds a reference-counted handle to the queue; the `UnsafeCell` lets the
/// methods reach the queue through `&self`.
pub struct BReceiver<T> {
// Shared handle to the underlying bounded queue.
core: Arc<UnsafeCell<SpscBounded<T>>>,
}
// SAFETY (NOTE(review)): `UnsafeCell` is not `Sync`, so without these impls the
// `Arc<UnsafeCell<_>>` field would make this type neither `Send` nor `Sync`.
// Soundness relies on the queue synchronizing concurrent access internally,
// which is not visible in this file — TODO confirm.
unsafe impl<T: Send> Send for BReceiver<T> {}
unsafe impl<T: Send> Sync for BReceiver<T> {}
impl<T: Send> Clone for BReceiver<T> {
#[inline(always)]
fn clone(&self) -> Self {
Self { core: self.core.clone() }
}
}
// Every method below dereferences the raw pointer obtained from the shared
// `UnsafeCell`.
//
// SAFETY (applies to each `unsafe` block in this impl): the pointer comes from
// the `UnsafeCell` kept alive by `self.core`, so it is non-null and valid for
// the duration of the call. Absence of data races is assumed to be guaranteed
// by the queue implementation itself — TODO confirm.
impl<T: Send> BReceiver<T> {
    /// Wraps an already-shared queue handle in a receiver.
    #[inline(always)]
    fn new(inner: Arc<UnsafeCell<SpscBounded<T>>>) -> Self {
        Self { core: inner }
    }

    /// Calls the queue's `try_recv` and returns its result unchanged.
    #[inline(always)]
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).try_recv() }
    }

    /// Calls the queue's `recv`, passing the result of the queue's own
    /// `cast()` as its argument, and returns the result unchanged.
    #[inline(always)]
    pub fn recv(&self) -> Result<T, RecvError> {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).recv((*queue).cast()) }
    }

    /// Returns whatever the queue's `length` reports.
    #[inline(always)]
    pub fn length(&self) -> u32 {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).length() }
    }

    /// Invokes the queue's `close`.
    #[inline(always)]
    pub fn close(&self) {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).close() }
    }

    /// Returns whatever the queue's `is_close` reports.
    #[inline(always)]
    pub fn is_close(&self) -> bool {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).is_close() }
    }
}
/// Builds an unbounded channel and returns its `(sender, receiver)` pair.
///
/// One `SpscUnbounded` queue is created via `Default` and placed behind an
/// `Arc<UnsafeCell<_>>`; both halves hold a handle to that same allocation.
#[inline(always)]
pub fn unbounded<T: Send>() -> (USender<T>, UReceiver<T>) {
    let shared = Arc::new(UnsafeCell::new(SpscUnbounded::default()));
    let sender = USender::new(Arc::clone(&shared));
    let receiver = UReceiver::new(shared);
    (sender, receiver)
}
/// Sending half of the unbounded channel returned by `unbounded`.
///
/// Holds a reference-counted handle to the queue; the `UnsafeCell` lets the
/// methods reach the queue through `&self`.
pub struct USender<T> {
// Shared handle to the underlying unbounded queue.
core: Arc<UnsafeCell<SpscUnbounded<T>>>,
}
// SAFETY (NOTE(review)): `UnsafeCell` is not `Sync`, so without these impls the
// `Arc<UnsafeCell<_>>` field would make this type neither `Send` nor `Sync`.
// Soundness relies on the queue synchronizing concurrent access internally,
// which is not visible in this file — TODO confirm.
unsafe impl<T: Send> Send for USender<T> {}
unsafe impl<T: Send> Sync for USender<T> {}
impl<T: Send> Clone for USender<T> {
#[inline(always)]
fn clone(&self) -> Self {
Self { core: self.core.clone() }
}
}
// Every method below dereferences the raw pointer obtained from the shared
// `UnsafeCell`.
//
// SAFETY (applies to each `unsafe` block in this impl): the pointer comes from
// the `UnsafeCell` kept alive by `self.core`, so it is non-null and valid for
// the duration of the call. Absence of data races is assumed to be guaranteed
// by the queue implementation itself — TODO confirm.
impl<T: Send> USender<T> {
    /// Wraps an already-shared queue handle in a sender.
    #[inline(always)]
    fn new(inner: Arc<UnsafeCell<SpscUnbounded<T>>>) -> Self {
        Self { core: inner }
    }

    /// Hands `value` to the queue's `send` and returns its result unchanged.
    #[inline(always)]
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).send(value) }
    }

    /// Invokes the queue's `close`.
    #[inline(always)]
    pub fn close(&self) {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).close() }
    }
}
/// Receiving half of the unbounded channel returned by `unbounded`.
///
/// Holds a reference-counted handle to the queue; the `UnsafeCell` lets the
/// methods reach the queue through `&self`.
pub struct UReceiver<T> {
// Shared handle to the underlying unbounded queue.
core: Arc<UnsafeCell<SpscUnbounded<T>>>,
}
// SAFETY (NOTE(review)): `UnsafeCell` is not `Sync`, so without these impls the
// `Arc<UnsafeCell<_>>` field would make this type neither `Send` nor `Sync`.
// Soundness relies on the queue synchronizing concurrent access internally,
// which is not visible in this file — TODO confirm.
unsafe impl<T: Send> Send for UReceiver<T> {}
unsafe impl<T: Send> Sync for UReceiver<T> {}
impl<T: Send> Clone for UReceiver<T> {
#[inline(always)]
fn clone(&self) -> Self {
Self { core: self.core.clone() }
}
}
// Every method below dereferences the raw pointer obtained from the shared
// `UnsafeCell`.
//
// SAFETY (applies to each `unsafe` block in this impl): the pointer comes from
// the `UnsafeCell` kept alive by `self.core`, so it is non-null and valid for
// the duration of the call. Absence of data races is assumed to be guaranteed
// by the queue implementation itself — TODO confirm.
impl<T: Send> UReceiver<T> {
    /// Wraps an already-shared queue handle in a receiver.
    #[inline(always)]
    fn new(inner: Arc<UnsafeCell<SpscUnbounded<T>>>) -> Self {
        Self { core: inner }
    }

    /// Calls the queue's `recv` and returns its result unchanged.
    #[inline(always)]
    pub fn recv(&self) -> Result<T, RecvError> {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).recv() }
    }

    /// Invokes the queue's `close`.
    #[inline(always)]
    pub fn close(&self) {
        let queue = self.core.get();
        // SAFETY: see the note at the top of this impl block.
        unsafe { (*queue).close() }
    }
}
// Unit tests for the bounded and unbounded channel wrappers.
//
// Gated on `cfg(test)` so the module is only compiled for test builds.
#[cfg(test)]
mod test {
    /// Both bounded halves must be `Send`.
    #[test]
    fn bounds() {
        fn is_send<T: Send>() {}
        is_send::<crate::queue::spsc::BSender<i32>>();
        is_send::<crate::queue::spsc::BReceiver<i32>>();
    }

    /// Both unbounded halves must be `Send`.
    #[test]
    fn unbound() {
        fn is_send<T: Send>() {}
        is_send::<crate::queue::spsc::USender<i32>>();
        is_send::<crate::queue::spsc::UReceiver<i32>>();
    }

    /// A value sent on one half is received on the other (single thread).
    #[test]
    fn send_recv() {
        let (tx_b, rx_b) = crate::queue::spsc::bounded(3);
        tx_b.try_send(1).unwrap();
        assert_eq!(rx_b.try_recv().unwrap(), 1);

        let (tx_u, rx_u) = crate::queue::spsc::unbounded();
        tx_u.send(1).unwrap();
        assert_eq!(rx_u.recv().unwrap(), 1);
    }

    /// Cloned senders feed the same queue (used sequentially here).
    #[test]
    fn send_shared_recv() {
        let (tx_b1, rx_b) = crate::queue::spsc::bounded(4);
        let tx_b2 = tx_b1.clone();
        tx_b1.send(1).unwrap();
        assert_eq!(rx_b.recv().unwrap(), 1);
        tx_b2.send(2).unwrap();
        assert_eq!(rx_b.recv().unwrap(), 2);

        let (tx_u1, rx_u) = crate::queue::spsc::unbounded();
        let tx_u2 = tx_u1.clone();
        tx_u1.send(1).unwrap();
        assert_eq!(rx_u.recv().unwrap(), 1);
        tx_u2.send(2).unwrap();
        assert_eq!(rx_u.recv().unwrap(), 2);
    }

    /// A value sent from another thread is received on the current one.
    #[test]
    fn send_recv_threads() {
        let (tx_b, rx_b) = crate::queue::spsc::bounded(4);
        let thread = std::thread::spawn(move || {
            tx_b.send(1).unwrap();
        });
        assert_eq!(rx_b.recv().unwrap(), 1);
        thread.join().unwrap();

        let (tx_u, rx_u) = crate::queue::spsc::unbounded();
        let thread = std::thread::spawn(move || {
            tx_u.send(1).unwrap();
        });
        assert_eq!(rx_u.recv().unwrap(), 1);
        thread.join().unwrap();
    }

    /// A channel created with size 0 still delivers values in order.
    #[test]
    fn send_recv_threads_no_capacity() {
        let (tx, rx) = crate::queue::spsc::bounded(0);
        let thread = std::thread::spawn(move || {
            tx.send(1).unwrap();
            tx.send(2).unwrap();
        });
        // The sleeps give the sender time to reach each `send` before the
        // matching `recv` is issued.
        std::thread::sleep(std::time::Duration::from_millis(100));
        assert_eq!(rx.recv().unwrap(), 1);
        std::thread::sleep(std::time::Duration::from_millis(100));
        assert_eq!(rx.recv().unwrap(), 2);
        thread.join().unwrap();
    }

    /// Closing from the sender side makes a pending/later `recv` return an error.
    #[test]
    fn send_close_gets_none() {
        let (tx_b, rx_b) = crate::queue::spsc::bounded::<i32>(1);
        let thread = std::thread::spawn(move || {
            assert!(rx_b.recv().is_err());
        });
        tx_b.close();
        thread.join().unwrap();

        let (tx_u, rx_u) = crate::queue::spsc::unbounded::<i32>();
        let thread = std::thread::spawn(move || {
            assert!(rx_u.recv().is_err());
        });
        // Give the receiver thread time to start waiting before closing.
        std::thread::sleep(std::time::Duration::from_millis(1000));
        tx_u.close();
        thread.join().unwrap();
    }

    /// Many values pass through a size-0 channel between two threads.
    #[test]
    fn spsc_no_capacity() {
        let amt = 30000;
        let (tx, rx) = crate::queue::spsc::bounded(0);
        let txc = tx.clone();
        let producer = std::thread::spawn(move || {
            for _ in 0..amt {
                assert_eq!(txc.send(1), Ok(()));
            }
        });
        for _ in 0..amt {
            assert_eq!(rx.recv(), Ok(1));
        }
        // Join so that a failed assertion inside the producer fails this test
        // instead of being silently dropped with the detached thread.
        producer.join().unwrap();
    }
}