#![allow(clippy::must_use_candidate, clippy::module_name_repetitions)]
use crate::art::async_block_on;
use crate::channel::{SendError, UnboundedReceiver, UnboundedRecvError, UnboundedSender};
use async_lock::RwLock;
use std::{
collections::HashMap,
fmt::Debug,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
struct BroadcastSenderInner<T> {
count: AtomicUsize,
outputs: RwLock<HashMap<usize, UnboundedSender<T>>>,
}
#[derive(Clone)]
pub struct BroadcastSender<T> {
inner: Arc<BroadcastSenderInner<T>>,
}
impl<T> Debug for BroadcastSender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BroadcastSender")
.field("inner", &"inner")
.finish()
}
}
impl<T> BroadcastSender<T>
where
T: Clone,
{
pub async fn send_async(&self, item: T) -> Result<(), SendError<T>> {
let map = self.inner.outputs.read().await;
for sender in map.values() {
sender.send(item.clone()).await?;
}
Ok(())
}
pub async fn handle_async(&self) -> BroadcastReceiver<T> {
let id = self.inner.count.fetch_add(1, Ordering::SeqCst);
let (send, recv) = crate::channel::unbounded();
let mut map = self.inner.outputs.write().await;
map.insert(id, send);
BroadcastReceiver {
id,
output: recv,
handle: self.clone(),
}
}
pub fn handle_sync(&self) -> BroadcastReceiver<T> {
async_block_on(self.handle_async())
}
}
pub struct BroadcastReceiver<T> {
id: usize,
output: UnboundedReceiver<T>,
handle: BroadcastSender<T>,
}
impl<T> BroadcastReceiver<T>
where
T: Clone,
{
pub async fn recv_async(&mut self) -> Result<T, UnboundedRecvError> {
self.output.recv().await
}
pub fn try_recv(&mut self) -> Option<T> {
self.output.try_recv().ok()
}
pub async fn clone_async(&self) -> Self {
self.handle.handle_async().await
}
}
impl<T> Drop for BroadcastReceiver<T> {
fn drop(&mut self) {
let mut map = async_block_on(self.handle.inner.outputs.write());
map.remove(&self.id);
}
}
impl<T> Clone for BroadcastReceiver<T>
where
T: Clone,
{
fn clone(&self) -> Self {
async_block_on(self.clone_async())
}
}
pub fn channel<T: Clone>() -> (BroadcastSender<T>, BroadcastReceiver<T>) {
let inner = BroadcastSenderInner {
count: AtomicUsize::from(0),
outputs: RwLock::new(HashMap::new()),
};
let input = BroadcastSender {
inner: Arc::new(inner),
};
let output = input.handle_sync();
(input, output)
}