use std::{
mem::MaybeUninit,
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};
use ringbuf::{Consumer, HeapRb, Producer, SharedRb};
/// Outcome of a non-blocking send attempt.
///
/// The type is `#[must_use]` so that a rejected send is not silently ignored.
#[must_use]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SendResult<T> {
    /// The message was accepted.
    Ok,
    /// The queue was full; the rejected message is handed back to the caller.
    Full(T),
    /// The send was refused because the channel is disconnected.
    Disconnected,
}

impl<T> SendResult<T> {
    /// Returns `true` only for [`SendResult::Disconnected`].
    pub fn is_disconnected(&self) -> bool {
        match self {
            Self::Disconnected => true,
            Self::Ok | Self::Full(_) => false,
        }
    }

    /// Returns `true` only for [`SendResult::Ok`].
    pub fn is_ok(&self) -> bool {
        match self {
            Self::Ok => true,
            Self::Full(_) | Self::Disconnected => false,
        }
    }
}

impl<T: std::fmt::Debug> SendResult<T> {
    /// Asserts that the send succeeded.
    ///
    /// # Panics
    ///
    /// Panics on [`SendResult::Full`] (the rejected message is rendered with its
    /// `Debug` implementation in the panic text) and on
    /// [`SendResult::Disconnected`].
    pub fn unwrap(self) {
        // Only the `Full` variant falls through; the other arms diverge.
        let rejected = match self {
            Self::Ok => return,
            Self::Disconnected => panic!("client was disconnected when sending message"),
            Self::Full(message) => message,
        };
        panic!(
            "failed to send message - queue was full when sending {:?}",
            rejected
        );
    }
}
/// Sending half of the channel created by `nonblocking_channel`.
pub struct NonBlockingSender<T> {
    /// Producer side of the ring buffer that messages are pushed into.
    inner: Producer<T, Arc<SharedRb<T, Vec<MaybeUninit<T>>>>>,
    /// Flag shared with the receiving half; each half sets it to `true` when dropped.
    is_closed: Arc<AtomicBool>,
}
impl<T> NonBlockingSender<T> {
    /// Attempts to enqueue `message` without blocking.
    ///
    /// Returns `Disconnected` when the shared closed flag is already set (the
    /// message is dropped in that case), `Full(message)` when the ring buffer
    /// rejects the push, and `Ok` when the message was stored.
    pub fn try_send(&mut self, message: T) -> SendResult<T> {
        if self.is_closed.load(Ordering::SeqCst) {
            return SendResult::Disconnected;
        }

        match self.inner.push(message) {
            Err(rejected) => SendResult::Full(rejected),
            Ok(_) => SendResult::Ok,
        }
    }

    /// Wraps this sender in an `Arc<Mutex<_>>` so that it can be cloned and
    /// used through a shared reference.
    pub fn mpsc(self) -> MicroBlockingSender<T> {
        MicroBlockingSender {
            inner: Arc::new(Mutex::new(self)),
        }
    }
}
impl<T> Drop for NonBlockingSender<T> {
    /// Sets the shared closed flag when the sender goes away.
    fn drop(&mut self) {
        let Self { is_closed, .. } = self;
        is_closed.store(true, Ordering::SeqCst);
    }
}
/// Cloneable sender that serialises access to one `NonBlockingSender` through a mutex.
pub struct MicroBlockingSender<T> {
    /// The wrapped sender; every clone of this handle points at the same instance.
    inner: Arc<Mutex<NonBlockingSender<T>>>,
}
impl<T> MicroBlockingSender<T> {
    /// Locks the shared sender and forwards `message` to its `try_send`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn try_send(&self, message: T) -> SendResult<T> {
        let mut sender = self.inner.lock().unwrap();
        sender.try_send(message)
    }
}
impl<T> Clone for MicroBlockingSender<T> {
    /// Produces another handle to the same underlying sender; only the `Arc`
    /// reference count changes, so no `T: Clone` bound is needed.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}
/// Outcome of a non-blocking receive attempt.
///
/// The type is `#[must_use]` so that a received message is not silently discarded.
#[must_use]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RecvResult<T> {
    /// A message was received.
    Ok(T),
    /// No message was available.
    Empty,
    /// The channel was reported as disconnected.
    Disconnected,
}

impl<T> RecvResult<T> {
    /// Returns `true` only for [`RecvResult::Disconnected`].
    pub fn is_disconnected(&self) -> bool {
        match self {
            Self::Disconnected => true,
            Self::Ok(_) | Self::Empty => false,
        }
    }

    /// Returns `true` for both [`RecvResult::Ok`] and [`RecvResult::Empty`]:
    /// an empty queue is not treated as a failure.
    pub fn is_ok(&self) -> bool {
        match self {
            Self::Ok(_) | Self::Empty => true,
            Self::Disconnected => false,
        }
    }

    /// Converts the result into an `Option`: `Some(message)` for `Ok`, `None`
    /// for `Empty`.
    ///
    /// # Panics
    ///
    /// Panics on [`RecvResult::Disconnected`].
    pub fn unwrap(self) -> Option<T> {
        match self {
            Self::Disconnected => panic!("receiver was disconnected when receiving message"),
            Self::Empty => None,
            Self::Ok(message) => Some(message),
        }
    }
}
/// Receiving half of the channel created by `nonblocking_channel`.
pub struct NonBlockingReceiver<T> {
    /// Consumer side of the ring buffer that messages are popped from.
    inner: Consumer<T, Arc<SharedRb<T, Vec<MaybeUninit<T>>>>>,
    /// Flag shared with the sending half; each half sets it to `true` when dropped.
    is_closed: Arc<AtomicBool>,
}
impl<T> NonBlockingReceiver<T> {
    /// Attempts to dequeue one message without blocking.
    ///
    /// Returns `Ok(message)` whenever a message is buffered, even if the
    /// sending half has already been dropped, so messages sent just before the
    /// sender went away are not lost. Returns `Disconnected` only once the
    /// closed flag is set *and* the buffer is empty, and `Empty` when nothing
    /// is buffered but the channel is still open.
    pub fn try_recv(&mut self) -> RecvResult<T> {
        // Read the flag BEFORE popping. The sender sets it only when it is
        // dropped, i.e. after its last push, so "flag already set, then pop
        // found nothing" means no further message can arrive. Checking in the
        // opposite order could miss a message pushed between the pop and the
        // flag read.
        let closed = self.is_closed.load(Ordering::SeqCst);

        match self.inner.pop() {
            Some(message) => RecvResult::Ok(message),
            None if closed => RecvResult::Disconnected,
            None => RecvResult::Empty,
        }
    }
}
impl<T> Drop for NonBlockingReceiver<T> {
    /// Sets the shared closed flag when the receiver goes away.
    fn drop(&mut self) {
        let Self { is_closed, .. } = self;
        is_closed.store(true, Ordering::SeqCst);
    }
}
/// Creates a bounded, non-blocking channel backed by a heap-allocated ring buffer.
///
/// `capacity` is passed to `HeapRb::new` as the buffer size; taking a
/// `NonZeroUsize` rules out a zero-sized buffer at the type level. The two
/// returned halves share one closed flag, initially `false`.
pub fn nonblocking_channel<T>(
    capacity: NonZeroUsize,
) -> (NonBlockingSender<T>, NonBlockingReceiver<T>) {
    let ring = HeapRb::<T>::new(capacity.get());
    let (producer, consumer) = ring.split();
    let is_closed = Arc::new(AtomicBool::new(false));

    (
        NonBlockingSender {
            inner: producer,
            is_closed: Arc::clone(&is_closed),
        },
        NonBlockingReceiver {
            inner: consumer,
            is_closed,
        },
    )
}