lapin 4.7.0

AMQP client library
Documentation
use crate::{Error, ErrorKind, Result, types::ShortString, uri::AMQPUri};
use std::{
    fmt,
    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

/// Shared handle on the status of a connection.
///
/// The data lives behind an `Arc`, so cloning this handle is cheap and every
/// clone observes (and mutates) the same underlying status.
#[derive(Default, Clone)]
pub struct ConnectionStatus(Arc<Inner>);

impl ConnectionStatus {
    /// Builds the status for a connection described by `uri`.
    ///
    /// The vhost and username are copied out of the URI; the mutable part
    /// starts from `StateInner::default()`.
    pub(crate) fn new(uri: &AMQPUri) -> Self {
        Self(Arc::new(Inner {
            vhost: uri.vhost.as_str().into(),
            username: uri.authority.userinfo.username.clone(),
            state: RwLock::new(StateInner::default()),
        }))
    }

    /// Returns a copy of the current connection state.
    pub(crate) fn state(&self) -> ConnectionState {
        self.read().state
    }

    /// Replaces the connection state with `state` and returns the previous one.
    pub(crate) fn set_state(&self, state: ConnectionState) -> ConnectionState {
        let mut inner = self.write();
        std::mem::replace(&mut inner.state, state)
    }

    /// Switches the state to `Connecting`.
    ///
    /// # Errors
    ///
    /// If an error was previously stored through [`Self::poison`], it is taken
    /// out and returned. The state is set to `Connecting` in that case too.
    pub(crate) fn set_connecting(&self) -> Result<()> {
        self.write().set_connecting()
    }

    /// Switches the state to `Reconnecting`, discarding any stored poison
    /// error and clearing the blocked flag.
    pub(crate) fn set_reconnecting(&self) {
        self.write().set_reconnecting();
    }

    /// Returns a copy of the vhost this status was created with.
    pub fn vhost(&self) -> ShortString {
        self.0.vhost.clone()
    }

    /// Returns a copy of the username this status was created with.
    pub fn username(&self) -> String {
        self.0.username.clone()
    }

    /// Raises the blocked flag.
    pub(crate) fn block(&self) {
        self.write().blocked = true;
    }

    /// Clears the blocked flag.
    pub(crate) fn unblock(&self) {
        self.write().blocked = false;
    }

    /// Returns whether the blocked flag is currently raised.
    pub fn blocked(&self) -> bool {
        self.read().blocked
    }

    /// Returns `true` if the current state is `Connected`.
    pub fn connected(&self) -> bool {
        self.state() == ConnectionState::Connected
    }

    /// Succeeds only when the current state is `Connected`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidConnectionState` carrying the state that was
    /// observed when it is anything other than `Connected`.
    pub(crate) fn ensure_connected(&self) -> Result<()> {
        // Read the state exactly once: checking and then re-reading it for the
        // error would take the lock twice, and a concurrent transition in
        // between could make the error report `Connected` as the invalid state.
        match self.state() {
            ConnectionState::Connected => Ok(()),
            state => Err(ErrorKind::InvalidConnectionState(state).into()),
        }
    }

    /// Returns `true` if the current state is `Connecting`.
    pub fn connecting(&self) -> bool {
        self.state() == ConnectionState::Connecting
    }

    /// Returns `true` if the current state is `Reconnecting`.
    pub fn reconnecting(&self) -> bool {
        self.state() == ConnectionState::Reconnecting
    }

    /// Returns `true` if the current state is `Closing`.
    pub fn closing(&self) -> bool {
        self.state() == ConnectionState::Closing
    }

    /// Returns `true` if the current state is `Closed`.
    pub fn closed(&self) -> bool {
        self.state() == ConnectionState::Closed
    }

    /// Returns `true` if the current state is `Error`.
    pub fn errored(&self) -> bool {
        self.state() == ConnectionState::Error
    }

    /// Stores `err` so that the next call to [`Self::set_connecting`] returns it.
    ///
    /// Any previously stored error is overwritten.
    pub(crate) fn poison(&self, err: Error) {
        self.write().poison(err);
    }

    /// Returns `true` while the state is `Connecting` or `Connected`.
    pub(crate) fn auto_close(&self) -> bool {
        matches!(
            self.state(),
            ConnectionState::Connecting | ConnectionState::Connected
        )
    }

    /// Acquires the read lock on the mutable state.
    ///
    /// A poisoned lock is recovered rather than propagated as a panic.
    fn read(&self) -> RwLockReadGuard<'_, StateInner> {
        self.0.state.read().unwrap_or_else(|e| e.into_inner())
    }

    /// Acquires the write lock on the mutable state.
    ///
    /// A poisoned lock is recovered rather than propagated as a panic.
    fn write(&self) -> RwLockWriteGuard<'_, StateInner> {
        self.0.state.write().unwrap_or_else(|e| e.into_inner())
    }
}

/// Lifecycle state of a connection, as tracked by `ConnectionStatus`.
///
/// Equality between variants is total, so the type is `Eq` as well as
/// `PartialEq`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ConnectionState {
    /// State of a freshly created status; this is the `Default` value.
    #[default]
    Initial,
    /// Set by `ConnectionStatus::set_connecting`.
    Connecting,
    /// The only state accepted by `ConnectionStatus::ensure_connected`.
    Connected,
    /// Reported by `ConnectionStatus::closing`.
    Closing,
    /// Reported by `ConnectionStatus::closed`.
    Closed,
    /// Set by `ConnectionStatus::set_reconnecting`.
    Reconnecting,
    /// Reported by `ConnectionStatus::errored`.
    Error,
}

/// Debug output lists the vhost and username, plus the state and blocked flag
/// whenever the state lock can be read without blocking.
impl fmt::Debug for ConnectionStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug = f.debug_struct("ConnectionStatus");
        debug
            .field("vhost", &self.0.vhost)
            .field("username", &self.0.username);
        // Never block while formatting: only a lock that is currently held for
        // writing causes the fields to be skipped. A poisoned lock is recovered,
        // consistent with how `read()` and `write()` treat poisoning.
        let guard = match self.0.state.try_read() {
            Ok(guard) => Some(guard),
            Err(std::sync::TryLockError::Poisoned(poisoned)) => Some(poisoned.into_inner()),
            Err(std::sync::TryLockError::WouldBlock) => None,
        };
        if let Some(inner) = guard {
            debug
                .field("state", &inner.state)
                .field("blocked", &inner.blocked);
        }
        debug.finish()
    }
}

/// Data shared by every clone of a `ConnectionStatus`.
struct Inner {
    /// Virtual host; taken from the URI in `ConnectionStatus::new`, `"/"` by default.
    vhost: ShortString,
    /// Username; taken from the URI in `ConnectionStatus::new`, `"guest"` by default.
    username: String,
    /// Mutable part of the status, guarded by a read/write lock.
    state: RwLock<StateInner>,
}

impl Default for Inner {
    fn default() -> Self {
        Self {
            vhost: "/".into(),
            username: "guest".into(),
            state: RwLock::new(StateInner::default()),
        }
    }
}

/// Mutable part of the connection status, kept behind the lock in `Inner`.
#[derive(Default)]
struct StateInner {
    /// Current lifecycle state.
    state: ConnectionState,
    /// Blocked flag, toggled by `ConnectionStatus::block` / `unblock` and
    /// cleared by `set_reconnecting`.
    blocked: bool,
    /// Error stored by `poison`; returned (and cleared) by `set_connecting`,
    /// discarded by `set_reconnecting`.
    poison: Option<Error>,
}

impl StateInner {
    /// Moves to `Connecting`, then reports any stored poison error.
    ///
    /// The stored error, if any, is consumed; the state change happens
    /// regardless of whether an error is returned.
    fn set_connecting(&mut self) -> Result<()> {
        self.state = ConnectionState::Connecting;
        match self.poison.take() {
            Some(pending) => Err(pending),
            None => Ok(()),
        }
    }

    /// Moves to `Reconnecting`, dropping any stored poison error and clearing
    /// the blocked flag.
    fn set_reconnecting(&mut self) {
        self.poison = None;
        self.blocked = false;
        self.state = ConnectionState::Reconnecting;
    }

    /// Stores `err`, replacing any error stored earlier.
    fn poison(&mut self, err: Error) {
        let pending = Some(err);
        self.poison = pending;
    }
}