lapin 4.9.1

AMQP client library
Documentation
use crate::Error;
use flume::{Receiver, Sender};
use futures_core::Stream;
use std::sync::Arc;

/// Crate-internal handle to the connection's event channel.
///
/// Owns both ends of the channel (see `Inner`) behind a shared `Arc`, so the
/// handle can be cloned freely.
#[derive(Clone, Debug)]
// Wrap in an arc not to tamper with receiver count: cloning `Events` only clones the `Arc`,
// never the inner `Receiver`, so the channel keeps seeing exactly one internal receiver.
// `EventsSender::send` relies on that baseline to detect whether any listener exists.
pub(crate) struct Events(Arc<Inner>);

/// Both halves of the event channel, stored together so that `Events` can
/// share them through a single `Arc`.
#[derive(Debug)]
struct Inner {
    /// Sending half; cloned into every `EventsSender` handed out by `Events::sender`.
    sender: Sender<Event>,
    /// Receiving half; cloned for every stream returned by `Events::listener`.
    receiver: Receiver<Event>,
}

impl Events {
    pub(crate) fn new() -> Self {
        let (sender, receiver) = flume::unbounded();
        Self(Arc::new(Inner { sender, receiver }))
    }

    pub(crate) fn sender(&self) -> EventsSender {
        EventsSender(self.0.sender.clone())
    }

    pub(crate) fn listener(&self) -> impl Stream<Item = Event> + Send + 'static {
        self.0.receiver.clone().into_stream()
    }
}

/// Cloneable sending handle used inside the crate to publish [`Event`]s on
/// the event channel.
#[derive(Clone, Debug)]
pub(crate) struct EventsSender(Sender<Event>);

impl EventsSender {
    fn send(&self, event: Event) {
        // Do nothing if we don't have at least one external receiver
        if self.0.receiver_count() > 1 {
            // The only possibility of error is if we have several external receivers and the
            // connection was already dropped, so we can safely ignore this.
            let _ = self.0.send(event);
        }
    }

    pub(crate) fn connected(&self) {
        self.send(Event::Connected);
    }

    pub(crate) fn connection_blocked(&self, reason: String) {
        self.send(Event::ConnectionBlocked(reason));
    }

    pub(crate) fn connection_unblocked(&self) {
        self.send(Event::ConnectionUnblocked);
    }

    pub(crate) fn send_flow(&self, active: bool) {
        self.send(Event::SendFlow(active));
    }

    pub(crate) fn error(&self, error: Error) {
        self.send(Event::Error(error));
    }
}

/// A connection-level event delivered via [`Connection::events_listener`].
///
/// The stream produced by [`Connection::events_listener`] emits these values
/// as the connection progresses through its lifecycle.
///
/// This enum is marked `#[non_exhaustive]`: variants may be added in future
/// releases, so a `match` on it from outside this crate needs a wildcard arm.
///
/// [`Connection::events_listener`]: crate::Connection::events_listener
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Event {
    /// The connection has been established (or re-established after recovery).
    Connected,
    /// The broker has blocked the connection due to resource constraints.
    /// The inner string is the human-readable reason supplied by the broker.
    ConnectionBlocked(String),
    /// The broker has unblocked the connection.
    ConnectionUnblocked,
    /// The broker has changed the allowed flow direction.
    /// `true` means publishing is permitted; `false` means it is paused.
    SendFlow(bool),
    /// An error occurred on the connection.
    /// The inner value is the crate's [`Error`] describing the failure.
    Error(Error),
}