use core::ops::{Deref, DerefMut};
use core::time::Duration;
use std::sync::Arc;
pub use broadcast::error::{RecvError, TryRecvError};
use tokio::sync::{broadcast, Mutex};
use tokio::time::sleep;
/// Creates a broadcast channel with room for `capacity` values and returns
/// its wrapped sending and receiving halves.
///
/// The same `capacity` is handed to the [`Sender`] so it can later compare the
/// queue length against it before sending.
#[inline]
pub(crate) fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = broadcast::channel(capacity);
    (Sender::new(tx, capacity), Receiver::new(rx))
}
/// Receiving half of the channel, handed out by [`channel`] and
/// [`Sender::subscribe`].
///
/// It is a thin wrapper around a `broadcast::Receiver`; the `Deref` and
/// `DerefMut` impls expose the wrapped receiver's API directly.
pub(crate) struct Receiver<T> {
// The wrapped broadcast receiver that all calls are forwarded to.
inner: broadcast::Receiver<T>,
}
impl<T> Receiver<T> {
#[inline]
fn new(inner: broadcast::Receiver<T>) -> Self {
Self { inner }
}
}
/// Gives shared access to the wrapped broadcast receiver.
impl<T> Deref for Receiver<T> {
    type Target = broadcast::Receiver<T>;

    /// Borrows the wrapped receiver.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self { inner } = self;
        inner
    }
}
/// Gives exclusive access to the wrapped broadcast receiver, which is what
/// its `&mut self` methods require.
impl<T> DerefMut for Receiver<T> {
    /// Mutably borrows the wrapped receiver.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner } = self;
        inner
    }
}
/// Sending half of the channel.
///
/// Wraps a `broadcast::Sender` together with the capacity the channel was
/// created with, so that `send` can compare the current queue length against
/// that capacity before pushing a value.
#[derive(Clone)]
pub(crate) struct Sender<T> {
// Capacity that was passed to `channel` when the channel was created.
channel_capacity: usize,
// The wrapped broadcast sender that actually delivers values.
inner: broadcast::Sender<T>,
// Mutex taken by `send` on its waiting path. It lives behind an `Arc`, so
// clones of this sender share the same mutex.
lock: Arc<Mutex<()>>,
}
impl<T> Sender<T> {
    /// Wraps `inner`, remembering the `channel_capacity` it was created with
    /// and starting with a fresh mutex.
    #[inline]
    fn new(inner: broadcast::Sender<T>, channel_capacity: usize) -> Self {
        let lock = Arc::default();
        Self { channel_capacity, inner, lock }
    }

    /// Number of values currently queued, as reported by the wrapped
    /// sender's `len()`.
    #[inline]
    fn queued(&self) -> usize {
        self.inner.len()
    }

    /// Returns the wrapped sender's `receiver_count()`.
    ///
    /// NOTE(review): despite the name, this counts *receivers*, not senders.
    /// The name is kept so existing callers keep working — confirm whether a
    /// sender count was intended.
    #[inline]
    fn num_senders(&self) -> usize {
        self.inner.receiver_count()
    }

    /// Sends `value`, waiting for room in the queue when it looks full.
    ///
    /// * Fast path: if the queued count plus [`Self::num_senders`] does not
    ///   exceed the channel capacity, the value is sent immediately without
    ///   taking the lock.
    /// * Waiting path: otherwise the shared lock is taken, and the value is
    ///   sent (still under the lock) as soon as the queued count is below the
    ///   capacity. If it is not, the lock is released and the check is
    ///   retried after 50 ms.
    ///
    /// # Errors
    ///
    /// Returns an error when the wrapped sender rejects the value (tokio
    /// documents this as happening when there are no active receivers).
    #[inline]
    pub(crate) async fn send(&self, value: T) -> anyhow::Result<()> {
        // How long to wait between two checks of the queue length.
        const POLL_INTERVAL: Duration = Duration::from_millis(50);

        let backlog = self.queued();
        let headroom_needed = self.num_senders();
        if backlog + headroom_needed <= self.channel_capacity {
            return self.send_inner(value);
        }

        loop {
            {
                // The guard lives for this scope only: it covers the check and
                // the send, and is released before sleeping.
                let _guard = self.lock.lock().await;
                if self.queued() < self.channel_capacity {
                    return self.send_inner(value);
                }
            }
            sleep(POLL_INTERVAL).await;
        }
    }

    /// Pushes `value` into the wrapped sender, discarding the success payload
    /// and replacing any failure with a generic `anyhow` error.
    #[inline]
    fn send_inner(&self, value: T) -> anyhow::Result<()> {
        match self.inner.send(value) {
            Ok(_) => Ok(()),
            Err(_) => Err(anyhow::anyhow!("couldn't send value")),
        }
    }

    /// Creates a new [`Receiver`] by subscribing to the wrapped sender.
    #[inline]
    pub(crate) fn subscribe(&self) -> Receiver<T> {
        let inner = self.inner.subscribe();
        Receiver::new(inner)
    }
}