axum-cometd 0.5.0

Framework for CometD server creation
Documentation
use crate::messages::SubscriptionMessage;
use async_broadcast::{Receiver, RecvError};
use std::{fmt::Debug, sync::Arc, time::Duration};
use tokio::{sync::Notify, time};

#[derive(Debug)]
pub(crate) struct ClientReceiver {
    start_timeout: Arc<Notify>,
    rx: Receiver<SubscriptionMessage>,
}

impl ClientReceiver {
    #[inline(always)]
    pub(crate) fn new(start_timeout: Arc<Notify>, rx: Receiver<SubscriptionMessage>) -> Self {
        Self { start_timeout, rx }
    }

    #[inline]
    pub(crate) async fn recv_timeout(
        &mut self,
        duration: Duration,
    ) -> Result<Option<SubscriptionMessage>, time::error::Elapsed> {
        time::timeout(duration, async {
            match self.rx.recv().await {
                Ok(data) => Some(data),
                Err(RecvError::Closed) => None,
                Err(RecvError::Overflowed(_)) => {
                    unreachable!("broadcast overflow mode was enabled")
                }
            }
        })
        .await
    }
}

impl Drop for ClientReceiver {
    fn drop(&mut self) {
        self.start_timeout.notify_waiters();
    }
}