// async_compatibility_layer/async_primitives/broadcast.rs
#![allow(clippy::must_use_candidate, clippy::module_name_repetitions)]
2use crate::art::async_block_on;
3use crate::channel::{SendError, UnboundedReceiver, UnboundedRecvError, UnboundedSender};
4use async_lock::RwLock;
5
6use std::{
7 collections::HashMap,
8 fmt::Debug,
9 sync::{
10 atomic::{AtomicUsize, Ordering},
11 Arc,
12 },
13};
14
/// State shared by every clone of a `BroadcastSender`.
struct BroadcastSenderInner<T> {
    /// Source of receiver ids; `handle_async` advances it with `fetch_add` for each new receiver.
    count: AtomicUsize,
    /// Sending half of each receiver's queue, keyed by receiver id and guarded by an async `RwLock`.
    outputs: RwLock<HashMap<usize, UnboundedSender<T>>>,
}
22
23#[derive(Clone)]
25pub struct BroadcastSender<T> {
26 inner: Arc<BroadcastSenderInner<T>>,
28}
29
30impl<T> Debug for BroadcastSender<T> {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 f.debug_struct("BroadcastSender")
33 .field("inner", &"inner")
34 .finish()
35 }
36}
37
38impl<T> BroadcastSender<T>
39where
40 T: Clone,
41{
42 pub async fn send_async(&self, item: T) -> Result<(), SendError<T>> {
49 let map = self.inner.outputs.read().await;
50 for sender in map.values() {
51 sender.send(item.clone()).await?;
52 }
53 Ok(())
54 }
55
56 pub async fn handle_async(&self) -> BroadcastReceiver<T> {
58 let id = self.inner.count.fetch_add(1, Ordering::SeqCst);
59 let (send, recv) = crate::channel::unbounded();
60 let mut map = self.inner.outputs.write().await;
61 map.insert(id, send);
62 BroadcastReceiver {
63 id,
64 output: recv,
65 handle: self.clone(),
66 }
67 }
68
69 pub fn handle_sync(&self) -> BroadcastReceiver<T> {
71 async_block_on(self.handle_async())
72 }
73}
74
/// Receiving half of the broadcast channel.
///
/// Each receiver reads from its own unbounded queue. When dropped, it removes its entry from the
/// shared output map.
pub struct BroadcastReceiver<T> {
    /// Key under which this receiver's sending half is stored in the shared output map.
    id: usize,
    /// Queue end from which broadcast values are read.
    output: UnboundedReceiver<T>,
    /// Sender handle, used to reach the shared map on drop and to create further receivers.
    handle: BroadcastSender<T>,
}
84
85impl<T> BroadcastReceiver<T>
86where
87 T: Clone,
88{
89 pub async fn recv_async(&mut self) -> Result<T, UnboundedRecvError> {
95 self.output.recv().await
96 }
97
98 pub fn try_recv(&mut self) -> Option<T> {
100 self.output.try_recv().ok()
101 }
102
103 pub async fn clone_async(&self) -> Self {
105 self.handle.handle_async().await
106 }
107}
108
109impl<T> Drop for BroadcastReceiver<T> {
110 fn drop(&mut self) {
112 let mut map = async_block_on(self.handle.inner.outputs.write());
113 map.remove(&self.id);
114 }
115}
116
117impl<T> Clone for BroadcastReceiver<T>
118where
119 T: Clone,
120{
121 fn clone(&self) -> Self {
122 async_block_on(self.clone_async())
123 }
124}
125
126pub fn channel<T: Clone>() -> (BroadcastSender<T>, BroadcastReceiver<T>) {
128 let inner = BroadcastSenderInner {
129 count: AtomicUsize::from(0),
130 outputs: RwLock::new(HashMap::new()),
131 };
132 let input = BroadcastSender {
133 inner: Arc::new(inner),
134 };
135 let output = input.handle_sync();
136 (input, output)
137}