// try_drop/drop_strategies/broadcast.rs

/// Private support module for the sealed-trait pattern.
///
/// The module itself is not `pub`, so although `Sealed` is declared `pub`, its
/// path is only nameable from inside this file's module tree.
mod private {
    /// Supertrait required by `Mode`.
    ///
    /// A type can only implement `Mode` if it also implements `Sealed`, and
    /// `Sealed` can only be implemented where this private module is visible.
    pub trait Sealed {}
}
7
8use crate::{FallibleTryDropStrategy, TryDropStrategy};
9
10use std::marker::PhantomData;
11
12use crate::adapters::ArcError;
13pub use tokio::runtime::Handle;
14use tokio::sync::broadcast;
15use tokio::sync::broadcast::error::SendError;
16use tokio::sync::broadcast::error::{RecvError, TryRecvError};
17pub use tokio::sync::broadcast::Receiver as AsyncReceiver;
18use tokio::sync::broadcast::{Receiver, Sender};
19
20#[cfg_attr(feature = "derives", derive(Debug))]
22pub struct BlockingReceiver<T> {
23 receiver: Receiver<T>,
24 handle: Handle,
25}
26
27impl<T: Clone> BlockingReceiver<T> {
28 pub(crate) fn new(receiver: Receiver<T>, handle: Handle) -> Self {
29 Self { receiver, handle }
30 }
31
32 pub fn recv(&mut self) -> Result<T, RecvError> {
34 self.handle.block_on(self.receiver.recv())
35 }
36
37 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
39 self.receiver.try_recv()
40 }
41}
42
43pub trait Mode: private::Sealed {}
45
46#[cfg_attr(
48 feature = "derives",
49 derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)
50)]
51pub enum OkIfAlone {}
52
53impl Mode for OkIfAlone {}
54
55impl private::Sealed for OkIfAlone {}
56
57#[cfg_attr(
59 feature = "derives",
60 derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)
61)]
62pub enum NeedsReceivers {}
63
64impl Mode for NeedsReceivers {}
65
66impl private::Sealed for NeedsReceivers {}
67
68#[cfg_attr(feature = "derives", derive(Debug, Clone))]
70pub struct BroadcastDropStrategy<M: Mode> {
71 sender: Sender<ArcError>,
72 handle: Handle,
73 _mode: PhantomData<M>,
74}
75
76impl<M: Mode> BroadcastDropStrategy<M> {
77 pub fn new(capacity: usize) -> (Self, BlockingReceiver<ArcError>) {
79 Self::new_with(capacity, Handle::current())
80 }
81
82 pub fn new_with(capacity: usize, handle: Handle) -> (Self, BlockingReceiver<ArcError>) {
84 let (sender, receiver) = broadcast::channel(capacity);
85 let receiver = BlockingReceiver::new(receiver, handle.clone());
86
87 (
88 Self {
89 sender,
90 handle,
91 _mode: PhantomData,
92 },
93 receiver,
94 )
95 }
96
97 pub fn subscribe(&self) -> BlockingReceiver<ArcError> {
99 BlockingReceiver::new(self.sender.subscribe(), self.handle.clone())
100 }
101}
102
103impl TryDropStrategy for BroadcastDropStrategy<OkIfAlone> {
104 fn handle_error(&self, error: crate::Error) {
105 let _ = self.sender.send(ArcError::new(error));
106 }
107}
108
109impl FallibleTryDropStrategy for BroadcastDropStrategy<NeedsReceivers> {
110 type Error = SendError<ArcError>;
111
112 fn try_handle_error(&self, error: crate::Error) -> Result<(), Self::Error> {
113 self.sender.send(ArcError::new(error)).map(|_| ())
114 }
115}