lapin 4.6.0

AMQP client library
Documentation
use crate::{Error, ErrorKind, Result, types::ShortString, uri::AMQPUri};
use std::{
    fmt,
    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

#[derive(Clone, Default)]
pub struct ConnectionStatus(Arc<RwLock<Inner>>);

impl ConnectionStatus {
    pub(crate) fn new(uri: &AMQPUri) -> Self {
        let status = Self::default();
        status.set_vhost(&uri.vhost);
        status.set_username(&uri.authority.userinfo.username);
        status
    }

    pub(crate) fn state(&self) -> ConnectionState {
        self.read().state
    }

    pub(crate) fn set_state(&self, state: ConnectionState) -> ConnectionState {
        let mut inner = self.write();
        std::mem::replace(&mut inner.state, state)
    }

    pub(crate) fn set_connecting(&self) -> Result<()> {
        self.write().set_connecting()
    }

    pub(crate) fn set_reconnecting(&self) {
        self.write().set_reconnecting();
    }

    pub fn vhost(&self) -> ShortString {
        self.read().vhost.clone()
    }

    pub(crate) fn set_vhost(&self, vhost: &str) {
        self.write().vhost = vhost.into();
    }

    pub fn username(&self) -> String {
        self.read().username.clone()
    }

    pub(crate) fn set_username(&self, username: &str) {
        self.write().username = username.into();
    }

    pub(crate) fn block(&self) {
        self.write().blocked = true;
    }

    pub(crate) fn unblock(&self) {
        self.write().blocked = false;
    }

    pub fn blocked(&self) -> bool {
        self.read().blocked
    }

    pub fn connected(&self) -> bool {
        self.state() == ConnectionState::Connected
    }

    pub(crate) fn ensure_connected(&self) -> Result<()> {
        if !self.connected() {
            return Err(ErrorKind::InvalidConnectionState(self.state()).into());
        }
        Ok(())
    }

    pub fn connecting(&self) -> bool {
        self.state() == ConnectionState::Connecting
    }

    pub fn reconnecting(&self) -> bool {
        self.state() == ConnectionState::Reconnecting
    }

    pub fn closing(&self) -> bool {
        self.state() == ConnectionState::Closing
    }

    pub fn closed(&self) -> bool {
        self.state() == ConnectionState::Closed
    }

    pub fn errored(&self) -> bool {
        self.state() == ConnectionState::Error
    }

    pub(crate) fn poison(&self, err: Error) {
        self.write().poison(err);
    }

    pub(crate) fn auto_close(&self) -> bool {
        [ConnectionState::Connecting, ConnectionState::Connected].contains(&self.state())
    }

    fn read(&self) -> RwLockReadGuard<'_, Inner> {
        self.0.read().unwrap_or_else(|e| e.into_inner())
    }

    fn write(&self) -> RwLockWriteGuard<'_, Inner> {
        self.0.write().unwrap_or_else(|e| e.into_inner())
    }
}

#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum ConnectionState {
    #[default]
    Initial,
    Connecting,
    Connected,
    Closing,
    Closed,
    Reconnecting,
    Error,
}

impl fmt::Debug for ConnectionStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug = f.debug_struct("ConnectionStatus");
        if let Ok(inner) = self.0.try_read() {
            debug
                .field("state", &inner.state)
                .field("vhost", &inner.vhost)
                .field("username", &inner.username)
                .field("blocked", &inner.blocked);
        }
        debug.finish()
    }
}

struct Inner {
    state: ConnectionState,
    vhost: ShortString,
    username: String,
    blocked: bool,
    poison: Option<Error>,
}

impl Default for Inner {
    fn default() -> Self {
        Self {
            state: ConnectionState::default(),
            vhost: "/".into(),
            username: "guest".into(),
            blocked: false,
            poison: None,
        }
    }
}

impl Inner {
    fn set_connecting(&mut self) -> Result<()> {
        self.state = ConnectionState::Connecting;
        self.poison.take().map(Err).unwrap_or(Ok(()))
    }

    fn set_reconnecting(&mut self) {
        let _ = self.poison.take();
        self.state = ConnectionState::Reconnecting;
        self.blocked = false;
    }

    fn poison(&mut self, err: Error) {
        self.poison = Some(err);
    }
}