collab-server 0.0.7

Nomad's collab server
Documentation
use core::ops::{Deref, DerefMut};
use core::time::Duration;
use std::sync::Arc;

pub use broadcast::error::{RecvError, TryRecvError};
use tokio::sync::{broadcast, Mutex};
use tokio::time::sleep;

/// Creates a broadcast channel holding up to `capacity` queued values and
/// returns its two halves wrapped in this module's [`Sender`] and
/// [`Receiver`] types.
///
/// The same `capacity` is handed both to `tokio::sync::broadcast::channel`
/// and to the returned [`Sender`], which stores it to decide when a send has
/// to wait for room in the queue.
///
/// NOTE(review): per tokio's documentation `broadcast::channel` panics on a
/// `capacity` of zero (or one above `usize::MAX / 2`) — callers are
/// presumably expected to pass a sensible positive value; confirm.
#[inline]
pub(crate) fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = broadcast::channel(capacity);
    (Sender::new(tx, capacity), Receiver::new(rx))
}

/// Receiving half of the channel created by [`channel`].
///
/// This is a thin wrapper around [`broadcast::Receiver`]; it implements
/// `Deref`/`DerefMut` to the wrapped receiver, so all of the receiver's
/// methods are reachable directly through this type.
pub(crate) struct Receiver<T> {
    /// The wrapped tokio broadcast receiver.
    inner: broadcast::Receiver<T>,
}

impl<T> Receiver<T> {
    /// TODO: docs
    #[inline]
    fn new(inner: broadcast::Receiver<T>) -> Self {
        Self { inner }
    }
}

impl<T> Deref for Receiver<T> {
    type Target = broadcast::Receiver<T>;

    /// Gives shared access to the wrapped broadcast receiver.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self { inner } = self;
        inner
    }
}

impl<T> DerefMut for Receiver<T> {
    /// Gives exclusive access to the wrapped broadcast receiver, which is
    /// what its `&mut self` methods (e.g. receiving a value) require.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner } = self;
        inner
    }
}

/// Sending half of the channel created by [`channel`].
///
/// Wraps a [`broadcast::Sender`] together with the capacity the channel was
/// created with, so that `send` can wait for room in the queue instead of
/// sending unconditionally.
///
/// Cloning a `Sender` clones the `Arc` holding the lock, so every clone
/// shares the same mutex.
#[derive(Clone)]
pub(crate) struct Sender<T> {
    /// The capacity that was requested when the channel was created.
    channel_capacity: usize,
    /// The wrapped tokio broadcast sender.
    inner: broadcast::Sender<T>,
    /// Async mutex taken by `send` while it re-checks the queue and sends on
    /// its slow (waiting) path. It guards no data; it only serializes
    /// waiting senders.
    lock: Arc<Mutex<()>>,
}

impl<T> Sender<T> {
    /// Builds a `Sender` around `inner`, remembering the capacity the channel
    /// was created with and starting out with a fresh, unlocked mutex.
    #[inline]
    fn new(inner: broadcast::Sender<T>, channel_capacity: usize) -> Self {
        Self { channel_capacity, inner, lock: Arc::default() }
    }

    /// Returns the number of values currently queued in the channel, as
    /// reported by the wrapped sender's `len()`.
    #[inline]
    fn queued(&self) -> usize {
        self.inner.len()
    }

    /// Returns the wrapped sender's `receiver_count()`.
    ///
    /// NOTE(review): despite its name this reports the number of
    /// *receivers*, not senders. It is presumably used as a stand-in for the
    /// number of peers that may send concurrently — confirm the intent.
    #[inline]
    fn num_senders(&self) -> usize {
        self.inner.receiver_count()
    }

    /// Sends `value` on the channel, waiting for room in the queue if needed.
    ///
    /// If the queued values plus the count returned by `num_senders` fit
    /// within the channel's capacity, the value is sent immediately without
    /// taking the lock. Otherwise this polls every 50 milliseconds: it takes
    /// the shared lock, sends if the queue is below capacity, and if not
    /// releases the lock and sleeps before trying again.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying broadcast send fails.
    #[inline]
    pub(crate) async fn send(&self, value: T) -> anyhow::Result<()> {
        /// Pause between two consecutive checks of the queue.
        const RETRY_INTERVAL: Duration = Duration::from_millis(50);

        // Fast path: enough headroom that the lock is not needed.
        let has_headroom =
            self.queued() + self.num_senders() <= self.channel_capacity;

        if has_headroom {
            return self.send_inner(value);
        }

        // Slow path: poll until the queue drops below capacity.
        loop {
            {
                // Held only for the check-and-send; released at the end of
                // this scope so it is never held while sleeping.
                let _guard = self.lock.lock().await;

                if self.queued() < self.channel_capacity {
                    return self.send_inner(value);
                }
            }

            sleep(RETRY_INTERVAL).await;
        }
    }

    /// Sends `value` on the wrapped sender right away, discarding the
    /// success value and replacing any send error with a generic
    /// `anyhow` error.
    #[inline]
    fn send_inner(&self, value: T) -> anyhow::Result<()> {
        match self.inner.send(value) {
            Ok(_) => Ok(()),
            Err(_) => Err(anyhow::anyhow!("couldn't send value")),
        }
    }

    /// Creates a new [`Receiver`] by subscribing to the wrapped sender.
    #[inline]
    pub(crate) fn subscribe(&self) -> Receiver<T> {
        let subscription = self.inner.subscribe();
        Receiver::new(subscription)
    }
}