mod private {
pub trait Sealed {}
}
use crate::{FallibleTryDropStrategy, TryDropStrategy};
use std::marker::PhantomData;
use crate::adapters::ArcError;
pub use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::error::{RecvError, TryRecvError};
pub use tokio::sync::broadcast::Receiver as AsyncReceiver;
use tokio::sync::broadcast::{Receiver, Sender};
#[cfg_attr(feature = "derives", derive(Debug))]
pub struct BlockingReceiver<T> {
receiver: Receiver<T>,
handle: Handle,
}
impl<T: Clone> BlockingReceiver<T> {
pub(crate) fn new(receiver: Receiver<T>, handle: Handle) -> Self {
Self { receiver, handle }
}
pub fn recv(&mut self) -> Result<T, RecvError> {
self.handle.block_on(self.receiver.recv())
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.receiver.try_recv()
}
}
pub trait Mode: private::Sealed {}
#[cfg_attr(
feature = "derives",
derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)
)]
pub enum OkIfAlone {}
impl Mode for OkIfAlone {}
impl private::Sealed for OkIfAlone {}
#[cfg_attr(
feature = "derives",
derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)
)]
pub enum NeedsReceivers {}
impl Mode for NeedsReceivers {}
impl private::Sealed for NeedsReceivers {}
#[cfg_attr(feature = "derives", derive(Debug, Clone))]
pub struct BroadcastDropStrategy<M: Mode> {
sender: Sender<ArcError>,
handle: Handle,
_mode: PhantomData<M>,
}
impl<M: Mode> BroadcastDropStrategy<M> {
pub fn new(capacity: usize) -> (Self, BlockingReceiver<ArcError>) {
Self::new_with(capacity, Handle::current())
}
pub fn new_with(capacity: usize, handle: Handle) -> (Self, BlockingReceiver<ArcError>) {
let (sender, receiver) = broadcast::channel(capacity);
let receiver = BlockingReceiver::new(receiver, handle.clone());
(
Self {
sender,
handle,
_mode: PhantomData,
},
receiver,
)
}
pub fn subscribe(&self) -> BlockingReceiver<ArcError> {
BlockingReceiver::new(self.sender.subscribe(), self.handle.clone())
}
}
impl TryDropStrategy for BroadcastDropStrategy<OkIfAlone> {
fn handle_error(&self, error: crate::Error) {
let _ = self.sender.send(ArcError::new(error));
}
}
impl FallibleTryDropStrategy for BroadcastDropStrategy<NeedsReceivers> {
type Error = SendError<ArcError>;
fn try_handle_error(&self, error: crate::Error) -> Result<(), Self::Error> {
self.sender.send(ArcError::new(error)).map(|_| ())
}
}