try_drop/drop_strategies/
broadcast.rs

1//! Types and traits for the broadcast drop strategy. This is built on top of the tokio broadcast
2//! channel.
3
4mod private {
5    pub trait Sealed {}
6}
7
8use crate::{FallibleTryDropStrategy, TryDropStrategy};
9
10use std::marker::PhantomData;
11
12use crate::adapters::ArcError;
13pub use tokio::runtime::Handle;
14use tokio::sync::broadcast;
15use tokio::sync::broadcast::error::SendError;
16use tokio::sync::broadcast::error::{RecvError, TryRecvError};
17pub use tokio::sync::broadcast::Receiver as AsyncReceiver;
18use tokio::sync::broadcast::{Receiver, Sender};
19
20/// An async receiver, which is made sync via blocking on a handle to the tokio runtime.
21#[cfg_attr(feature = "derives", derive(Debug))]
22pub struct BlockingReceiver<T> {
23    receiver: Receiver<T>,
24    handle: Handle,
25}
26
27impl<T: Clone> BlockingReceiver<T> {
28    pub(crate) fn new(receiver: Receiver<T>, handle: Handle) -> Self {
29        Self { receiver, handle }
30    }
31
32    /// Receive a message from the channel, blocking until one is available.
33    pub fn recv(&mut self) -> Result<T, RecvError> {
34        self.handle.block_on(self.receiver.recv())
35    }
36
37    /// Try to receive a message from the channel, without blocking.
38    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
39        self.receiver.try_recv()
40    }
41}
42
43/// How to handle errors when sending a message to all receivers.
44pub trait Mode: private::Sealed {}
45
46/// Continue on sending errors to nobody if no receivers are available.
47#[cfg_attr(
48    feature = "derives",
49    derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)
50)]
51pub enum OkIfAlone {}
52
53impl Mode for OkIfAlone {}
54
55impl private::Sealed for OkIfAlone {}
56
57/// Return an error if there are no receivers to send errors to.
58#[cfg_attr(
59    feature = "derives",
60    derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)
61)]
62pub enum NeedsReceivers {}
63
64impl Mode for NeedsReceivers {}
65
66impl private::Sealed for NeedsReceivers {}
67
68/// A drop strategy which broadcasts a drop error to all receivers.
69#[cfg_attr(feature = "derives", derive(Debug, Clone))]
70pub struct BroadcastDropStrategy<M: Mode> {
71    sender: Sender<ArcError>,
72    handle: Handle,
73    _mode: PhantomData<M>,
74}
75
76impl<M: Mode> BroadcastDropStrategy<M> {
77    /// Create a new broadcast drop strategy from a handle to the current tokio runtime.
78    pub fn new(capacity: usize) -> (Self, BlockingReceiver<ArcError>) {
79        Self::new_with(capacity, Handle::current())
80    }
81
82    /// Create a new broadcast drop strategy, with a handle to a tokio runtime.
83    pub fn new_with(capacity: usize, handle: Handle) -> (Self, BlockingReceiver<ArcError>) {
84        let (sender, receiver) = broadcast::channel(capacity);
85        let receiver = BlockingReceiver::new(receiver, handle.clone());
86
87        (
88            Self {
89                sender,
90                handle,
91                _mode: PhantomData,
92            },
93            receiver,
94        )
95    }
96
97    /// Subscribe to this drop strategy, receiving new errors.
98    pub fn subscribe(&self) -> BlockingReceiver<ArcError> {
99        BlockingReceiver::new(self.sender.subscribe(), self.handle.clone())
100    }
101}
102
103impl TryDropStrategy for BroadcastDropStrategy<OkIfAlone> {
104    fn handle_error(&self, error: crate::Error) {
105        let _ = self.sender.send(ArcError::new(error));
106    }
107}
108
109impl FallibleTryDropStrategy for BroadcastDropStrategy<NeedsReceivers> {
110    type Error = SendError<ArcError>;
111
112    fn try_handle_error(&self, error: crate::Error) -> Result<(), Self::Error> {
113        self.sender.send(ArcError::new(error)).map(|_| ())
114    }
115}