use std::{future::Future, pin::Pin};
use async_broadcast::TrySendError;
use derivative::Derivative;
/// Event channel handle built on `async_broadcast`.
///
/// `Clone` is derived through `derivative` with an empty bound, so cloning a
/// handle does not require `E: Clone`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
pub struct Broadcast<E>(
    /// Sending half; every publish and subscribe operation goes through it.
    async_broadcast::Sender<E>,
    /// Deactivated receiver stored alongside the sender. It is not read by any
    /// code visible here.
    /// NOTE(review): presumably held so the channel stays open while there are
    /// no subscribers — confirm against `async_broadcast`'s close semantics.
    async_broadcast::InactiveReceiver<E>,
);
/// Event channel handle built on `async_broadcast` whose constructor turns on
/// the channel's overflow mode (see [`Ring::new`]).
///
/// `Clone` is derived through `derivative` with an empty bound, so cloning a
/// handle does not require `E: Clone`.
#[derive(Debug, Derivative)]
#[derivative(Clone(bound = ""))]
pub struct Ring<E>(
    /// Sending half; every publish and subscribe operation goes through it.
    async_broadcast::Sender<E>,
    /// Deactivated receiver stored alongside the sender. It is not read by any
    /// code visible here.
    /// NOTE(review): presumably held so the channel stays open while there are
    /// no subscribers — confirm against `async_broadcast`'s close semantics.
    async_broadcast::InactiveReceiver<E>,
);
impl<E> Default for Broadcast<E> {
    /// Builds a channel with a capacity of 16 messages.
    ///
    /// `await_active` is switched off on the sender, and the receiver created
    /// together with the channel is deactivated and stored in the handle.
    fn default() -> Self {
        const DEFAULT_CAPACITY: usize = 16;

        let (mut sender, receiver) = async_broadcast::broadcast(DEFAULT_CAPACITY);
        sender.set_await_active(false);

        let inactive = receiver.deactivate();
        Self(sender, inactive)
    }
}
impl<E> Ring<E> {
pub fn new(capacity: usize) -> Self {
let (mut tx, rx) = async_broadcast::broadcast(capacity);
tx.set_await_active(false);
tx.set_overflow(true);
Self(tx, rx.deactivate())
}
pub fn set_capacity(&mut self, capacity: usize) {
self.0.set_capacity(capacity);
}
pub fn sender_count(&self) -> usize {
self.0.sender_count()
}
}
impl<E: Clone> Ring<E> {
    /// Publishes `msg` without waiting.
    ///
    /// Any outcome of `try_broadcast` other than `TrySendError::Full` is
    /// discarded, including the other error variants.
    ///
    /// # Panics
    ///
    /// Panics if the channel is closed, or if the non-blocking send reports
    /// `TrySendError::Full`.
    pub fn broadcast(&self, msg: E) {
        // Precondition: publishing on a closed channel is treated as a bug.
        assert!(!self.0.is_closed());

        let result = self.0.try_broadcast(msg);

        // `Ring::new` enables overflow mode, so a `Full` rejection is treated
        // as an invariant violation rather than a recoverable error.
        assert!(!matches!(result, Err(TrySendError::Full(_))));
    }
}
/// Opaque `'static` future type used as the return type of
/// `Broadcast::broadcast_owned`; it resolves to `()`.
///
/// This is an `impl Trait` type alias, which gives that otherwise unnameable
/// future a name.
/// NOTE(review): this relies on the unstable `type_alias_impl_trait` feature,
/// so the crate presumably builds on nightly — confirm.
pub type BroadcastOwned<E: Clone + 'static> = impl Future<Output = ()> + 'static;
impl<E: Clone> Broadcast<E> {
    /// Returns a future that publishes `msg` and borrows `self` while it runs.
    ///
    /// The closed-channel check happens eagerly, when this method is called;
    /// the send itself only happens once the returned future is polled. The
    /// result of the send is discarded.
    ///
    /// # Panics
    ///
    /// Panics (at call time, not at poll time) if the channel is closed.
    pub fn broadcast(&self, msg: E) -> impl Future<Output = ()> + '_ {
        assert!(!self.0.is_closed());

        async move {
            let send = self.0.broadcast(msg);
            // Outcome intentionally ignored.
            let _ = send.await;
        }
    }

    /// Like [`Broadcast::broadcast`], but the returned future owns a clone of
    /// the sender and therefore does not borrow `self`.
    ///
    /// # Panics
    ///
    /// Panics (at call time, not at poll time) if the channel is closed.
    pub fn broadcast_owned(&self, msg: E) -> BroadcastOwned<E>
    where
        E: 'static,
    {
        assert!(!self.0.is_closed());

        // Clone the sender so the future can be `'static`.
        let tx = self.0.clone();
        async move {
            let send = tx.broadcast(msg);
            // Outcome intentionally ignored.
            let _ = send.await;
        }
    }

    /// Publishes `msg` without waiting, doubling the channel capacity first
    /// when the channel reports itself full.
    ///
    /// Any outcome of `try_broadcast` other than `TrySendError::Full` is
    /// discarded.
    ///
    /// # Panics
    ///
    /// Panics if the channel is closed, or if the send still reports
    /// `TrySendError::Full` after the capacity check.
    pub fn broadcast_reserve(&mut self, msg: E) {
        assert!(!self.0.is_closed());

        if self.0.is_full() {
            let doubled = self.0.capacity() * 2;
            self.0.set_capacity(doubled);
        }

        let result = self.0.try_broadcast(msg);
        assert!(!matches!(result, Err(TrySendError::Full(_))));
    }
}
/// Subscription handle wrapping an active `async_broadcast::Receiver`.
///
/// The receiver sits in an `Option` so that `Drop` can move it out of
/// `&mut self` with `take()`; it is `Some` for the whole usable lifetime of
/// the value.
#[derive(Debug)]
pub struct Receiver<E>(
    /// The wrapped receiver; `None` only once `drop` has taken it.
    Option<async_broadcast::Receiver<E>>,
);
impl<E> Drop for Receiver<E> {
    /// Deactivates the wrapped receiver and then releases the resulting
    /// inactive handle, instead of dropping the active receiver directly.
    ///
    /// NOTE(review): the reason for going through `deactivate()` is not
    /// visible here — presumably it works around how `async_broadcast`
    /// handles the last active receiver going away; confirm.
    fn drop(&mut self) {
        if let Some(active) = self.0.take() {
            // `deactivate` consumes the receiver; the inactive handle it
            // returns is discarded immediately.
            drop(active.deactivate());
        }
    }
}
impl<E: Clone + 'static> futures_core::Stream for Receiver<E> {
    type Item = E;

    /// Delegates polling to the wrapped `async_broadcast::Receiver`.
    ///
    /// # Panics
    ///
    /// Panics if the inner receiver is `None`. In the code visible here that
    /// only happens after `drop` has taken it.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let inner = self.0.as_mut().unwrap();
        // Fully-qualified call: polls the wrapped receiver's own `Stream`
        // implementation, not this wrapper's.
        futures_core::Stream::poll_next(Pin::new(inner), cx)
    }
}
impl<E: Clone + 'static> super::EventSource<E> for Broadcast<E> {
    type Source = Receiver<E>;

    /// Creates a new receiver from the sender and wraps it in [`Receiver`].
    fn subscribe(&self) -> Self::Source {
        let inner = self.0.new_receiver();
        Receiver(Some(inner))
    }
}
impl<E: Clone + 'static> super::EventSource<E> for Ring<E> {
    type Source = Receiver<E>;

    /// Creates a new receiver from the sender and wraps it in [`Receiver`].
    fn subscribe(&self) -> Self::Source {
        let inner = self.0.new_receiver();
        Receiver(Some(inner))
    }
}