use std::fmt::{Debug, Formatter};
use std::time::Duration;
use tokio::sync::broadcast;
use crate::error::{RecvError, RecvTimeoutError, SendError, TryRecvError};
/// Sending half of the broadcast channel created by [`channel`].
///
/// Thin wrapper around tokio's `broadcast::Sender`. Because `Clone` is derived,
/// `Sender<T>` is `Clone` only when `T: Clone`.
#[derive(Clone)]
pub struct Sender<T> {
/// Wrapped tokio broadcast sender that every method delegates to.
inner: broadcast::Sender<T>,
}
/// Receiving half of the broadcast channel created by [`channel`].
///
/// Thin wrapper around tokio's `broadcast::Receiver`.
pub struct Receiver<T> {
/// Wrapped tokio broadcast receiver that every method delegates to.
inner: broadcast::Receiver<T>,
}
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
self.inner.send(value).map_err(SendError::from).map(|_| ())
}
#[allow(dead_code)]
pub fn into_inner(self) -> broadcast::Sender<T> {
self.inner
}
}
impl<T: Clone> Receiver<T> {
pub async fn recv(&mut self) -> Result<T, RecvError> {
self.inner.recv().await.map_err(RecvError::from)
}
pub async fn recv_timeout(&mut self, timeout: Duration) -> Result<T, RecvTimeoutError> {
match tokio::time::timeout(timeout, self.inner.recv()).await {
Ok(result) => match result {
Ok(value) => Ok(value),
Err(err) => Err(match err {
broadcast::error::RecvError::Lagged(n) => RecvTimeoutError::Lagged(n),
broadcast::error::RecvError::Closed => RecvTimeoutError::Disconnected,
}),
},
Err(_) => Err(RecvTimeoutError::Timeout),
}
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.inner.try_recv().map_err(TryRecvError::from)
}
pub fn resubscribe(&self) -> Receiver<T> {
Self {
inner: self.inner.resubscribe(),
}
}
#[allow(dead_code)]
pub fn into_inner(self) -> broadcast::Receiver<T> {
self.inner
}
}
// `Sender<T>` and `Receiver<T>` each consist of a single tokio broadcast handle, so the
// compiler derives `Send`/`Sync` for them from that field. Tokio implements both traits
// for its broadcast handles whenever `T: Send`, which gives the wrappers exactly the
// bounds the former hand-written `unsafe impl`s declared, without any `unsafe` and
// without asserting thread-safety for fields that might be added later.
//
// Compile-time guard: the build fails if either wrapper ever stops being `Send + Sync`
// for an arbitrary `T: Send`.
const _: fn() = || {
    fn assert_send_sync<S: Send + Sync>() {}
    fn check_wrappers<T: Send>() {
        assert_send_sync::<Sender<T>>();
        assert_send_sync::<Receiver<T>>();
    }
    // Reference the generic check so it is type-checked and not reported as dead code.
    let _ = check_wrappers::<()>;
};
/// Opaque `Debug` output: only the type name is shown, so `T` does not have to
/// implement `Debug` and the wrapped handle's internals are not printed.
impl<T> Debug for Sender<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("Sender");
        opaque.finish_non_exhaustive()
    }
}
/// Opaque `Debug` output: only the type name is shown, so `T` does not have to
/// implement `Debug` and the wrapped handle's internals are not printed.
///
/// No bound is placed on `T`: the formatter never touches a value of type `T`,
/// which keeps this impl consistent with the one for `Sender`.
impl<T> Debug for Receiver<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Receiver").finish_non_exhaustive()
    }
}
impl<T: Clone> Clone for Receiver<T> {
fn clone(&self) -> Self {
self.resubscribe()
}
}
/// Creates a broadcast channel and returns its wrapped sending and receiving halves.
///
/// `capacity` is forwarded unchanged to tokio's `broadcast::channel`.
///
/// # Panics
///
/// Inherits the panics of tokio's `broadcast::channel`; per tokio's documentation
/// that is a `capacity` of `0` or one greater than `usize::MAX / 2`.
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let (inner_tx, inner_rx) = broadcast::channel(capacity);
    (Sender { inner: inner_tx }, Receiver { inner: inner_rx })
}