use crate::RuntimeError;
use std::fmt;
use tokio::sync::mpsc as tokio_mpsc;
/// Creates a bounded multi-producer, single-consumer channel that buffers at
/// most `cap` messages, returning the sending and receiving halves.
///
/// # Errors
///
/// Returns [`RuntimeError::InvalidArgument`] when `cap` is zero or when it
/// exceeds the largest capacity tokio accepts. The underlying
/// `tokio::sync::mpsc::channel` panics in both situations, so they are
/// rejected up front instead of being allowed to abort the caller.
pub fn mpsc<T>(cap: usize) -> Result<(MpscSender<T>, MpscReceiver<T>), RuntimeError> {
    // Mirrors `tokio::sync::Semaphore::MAX_PERMITS`, which tokio documents as
    // `usize::MAX >> 3`; a bounded channel cannot hold more permits than that.
    const MAX_CAPACITY: usize = usize::MAX >> 3;

    if cap == 0 {
        return Err(RuntimeError::InvalidArgument(
            "mpsc channel capacity must be greater than zero".into(),
        ));
    }
    if cap > MAX_CAPACITY {
        return Err(RuntimeError::InvalidArgument(
            "mpsc channel capacity exceeds the maximum supported by tokio".into(),
        ));
    }

    let (tx, rx) = tokio_mpsc::channel(cap);
    Ok((MpscSender { inner: tx }, MpscReceiver { inner: rx }))
}
/// Sending half of a bounded channel created by [`mpsc`].
///
/// Wraps a `tokio::sync::mpsc::Sender` and reports failures as
/// [`RuntimeError`] values.
pub struct MpscSender<T> {
// Underlying tokio sender that every method delegates to.
inner: tokio_mpsc::Sender<T>,
}
impl<T> MpscSender<T> {
pub fn send(&self, value: T) -> Result<(), RuntimeError> {
self.inner
.blocking_send(value)
.map_err(|_| RuntimeError::ChannelClosed)
}
pub fn try_send(&self, value: T) -> Result<(), RuntimeError> {
self.inner.try_send(value).map_err(|e| match e {
tokio_mpsc::error::TrySendError::Full(_) => RuntimeError::ChannelFull,
tokio_mpsc::error::TrySendError::Closed(_) => RuntimeError::ChannelClosed,
})
}
}
impl<T> Clone for MpscSender<T> {
    /// Produces another sender by cloning the wrapped handle; no `T: Clone`
    /// bound is needed because only the handle is cloned.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        Self { inner }
    }
}
impl<T> fmt::Debug for MpscSender<T> {
    /// Writes only the type name; no field is printed, so `T` does not have
    /// to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("MpscSender")
    }
}
/// Receiving half of a bounded channel created by [`mpsc`].
///
/// Wraps a `tokio::sync::mpsc::Receiver`.
pub struct MpscReceiver<T> {
// Underlying tokio receiver that every method delegates to.
inner: tokio_mpsc::Receiver<T>,
}
impl<T> MpscReceiver<T> {
pub async fn recv(&mut self) -> Option<T> {
self.inner.recv().await
}
pub fn close(&mut self) {
self.inner.close();
}
}
impl<T> fmt::Debug for MpscReceiver<T> {
    /// Writes only the type name; no field is printed, so `T` does not have
    /// to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("MpscReceiver")
    }
}