axum-cometd 0.5.0

Framework for CometD server creation
Documentation
mod client_timeout;

use crate::{
    messages::SubscriptionMessage,
    types::{ClientId, ClientReceiver},
    LongPollingServiceContext,
};
use async_broadcast::{InactiveReceiver, SendError, Sender, TrySendError};
use std::{fmt::Debug, sync::Arc, time::Duration};
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

/// Sending side of one client's message channel, bundled with the handles
/// shared with the task started by `client_timeout::spawn`.
#[derive(Debug)]
pub(crate) struct ClientSender {
    /// Cancelled when this sender is dropped; a clone is handed to the
    /// timeout task at creation.
    stop_signal: CancellationToken,
    /// Shared with the timeout task and with every `ClientReceiver` produced
    /// by `subscribe`; notified once during `create`.
    start_timeout: Arc<Notify>,
    /// Shared with the timeout task; notified on every `subscribe` call.
    cancel_timeout: Arc<Notify>,
    /// Broadcast sender used by `send` to publish messages.
    tx: Sender<SubscriptionMessage>,
    /// Inactive receiver from which `subscribe` activates a fresh receiver clone.
    inactive_rx: InactiveReceiver<SubscriptionMessage>,
}

impl ClientSender {
    /// Builds a sender for `client_id` and starts its companion timeout task.
    ///
    /// A fresh cancellation token and two `Notify` handles are created, cloned
    /// into `client_timeout::spawn` together with `context`, `client_id` and
    /// `timeout`, and then `start_timeout` is notified once before returning.
    ///
    /// `tx` and `inactive_rx` are stored as-is and used by [`Self::send`] and
    /// [`Self::subscribe`] respectively.
    #[inline]
    pub(crate) fn create(
        context: Arc<LongPollingServiceContext>,
        client_id: ClientId,
        timeout: Duration,
        tx: Sender<SubscriptionMessage>,
        inactive_rx: InactiveReceiver<SubscriptionMessage>,
    ) -> Self {
        let stop_signal = CancellationToken::new();
        let start_timeout = Arc::new(Notify::new());
        let cancel_timeout = Arc::new(Notify::new());

        client_timeout::spawn(
            context,
            client_id,
            timeout,
            stop_signal.clone(),
            Arc::clone(&start_timeout),
            Arc::clone(&cancel_timeout),
        );

        // NOTE(review): `notify_waiters` stores no permit, so this only has an
        // effect if the spawned task has already registered its waiter —
        // confirm against `client_timeout::spawn`.
        start_timeout.notify_waiters();

        Self {
            stop_signal,
            start_timeout,
            cancel_timeout,
            tx,
            inactive_rx,
        }
    }

    /// Creates a new active receiver for this client.
    ///
    /// Notifies `cancel_timeout` first, then activates a clone of the stored
    /// inactive receiver and wraps it, together with a handle to
    /// `start_timeout`, in a `ClientReceiver`.
    #[inline]
    pub(crate) fn subscribe(&self) -> ClientReceiver {
        self.cancel_timeout.notify_waiters();

        let start_timeout = Arc::clone(&self.start_timeout);
        let rx = self.inactive_rx.activate_cloned();

        ClientReceiver::new(start_timeout, rx)
    }

    /// Publishes `msg` to the client's channel.
    ///
    /// A non-blocking broadcast is attempted first; if the channel is full the
    /// call falls back to the awaiting `broadcast`, which waits for capacity.
    ///
    /// Returns `Ok(())` when the message was queued, and also when there are
    /// no active receivers (the message is dropped in that case).
    ///
    /// # Errors
    ///
    /// Returns `SendError` carrying the unsent message when the channel is
    /// closed.
    #[inline]
    pub(crate) async fn send(
        &self,
        msg: SubscriptionMessage,
    ) -> Result<(), SendError<SubscriptionMessage>> {
        match self.tx.try_broadcast(msg) {
            // `Ok(Some(_))` carries the *oldest* message evicted by overflow
            // mode, not the one just sent: the new message is already queued,
            // so it must not be re-broadcast.
            Ok(_) | Err(TrySendError::Inactive(_)) => Ok(()),
            // Channel is at capacity: wait for room. An evicted message
            // returned here likewise means the send itself succeeded.
            Err(TrySendError::Full(msg)) => self.tx.broadcast(msg).await.map(|_evicted| ()),
            Err(TrySendError::Closed(msg)) => Err(SendError(msg)),
        }
    }
}

impl Drop for ClientSender {
    /// Cancels the stop token, so every holder of a clone of it (one is given
    /// to `client_timeout::spawn` in `create`) can observe that this sender
    /// has been dropped.
    fn drop(&mut self) {
        let Self { stop_signal, .. } = self;
        stop_signal.cancel();
    }
}