// lapin 4.6.0
//
// AMQP client library
// Documentation
use crate::{
    ConnectionProperties, Error,
    auth::{AuthProvider, DefaultAuthProvider},
    protocol,
    types::{ChannelId, FieldTable, FrameSize, Heartbeat, ShortString},
    uri::AMQPUri,
};
use backon::ExponentialBuilder;
use std::{
    fmt,
    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

/// Per-connection settings assembled from the connection URI and the
/// caller-supplied `ConnectionProperties`.
///
/// Cloning a `Configuration` yields a handle whose `negotiated_config` shares
/// its state with the original, because `NegotiatedConfig` keeps its values
/// behind an `Arc`.
pub struct Configuration {
    /// Client properties table, taken from `ConnectionProperties::client_properties`.
    pub(crate) amqp_client_properties: FieldTable,
    /// Locale string, taken from `ConnectionProperties::locale`.
    pub(crate) amqp_locale: ShortString,
    /// Provider supplied by the caller, or a `DefaultAuthProvider` built from
    /// the URI when none was given.
    pub(crate) auth_provider: Arc<dyn AuthProvider>,
    /// Exponential backoff settings, taken from `ConnectionProperties::backoff`.
    // NOTE(review): presumably drives reconnection retries — the consumer is
    // not visible in this file; confirm against the caller.
    pub(crate) backoff: ExponentialBuilder,
    /// Shared, mutable channel-max / frame-max / heartbeat values.
    pub(crate) negotiated_config: NegotiatedConfig,
    /// Flag wrapped into a `RecoveryConfig` by `recovery_config()`.
    pub(crate) auto_recover: bool,
}

impl Configuration {
    pub(crate) fn new(uri: &AMQPUri, options: ConnectionProperties) -> Self {
        let ConnectionProperties {
            locale,
            client_properties,
            auth_provider,
            backoff,
            auto_recover,
            ..
        } = options;
        Self {
            amqp_client_properties: client_properties,
            amqp_locale: locale,
            auth_provider: auth_provider.unwrap_or_else(|| Arc::new(DefaultAuthProvider::new(uri))),
            backoff,
            negotiated_config: NegotiatedConfig::new(uri),
            auto_recover,
        }
    }

    pub fn channel_max(&self) -> ChannelId {
        self.negotiated_config.channel_max()
    }

    pub fn frame_max(&self) -> FrameSize {
        self.negotiated_config.frame_max()
    }

    pub fn heartbeat(&self) -> Heartbeat {
        self.negotiated_config.heartbeat()
    }

    pub(crate) fn recovery_config(&self) -> RecoveryConfig {
        RecoveryConfig(self.auto_recover)
    }
}

impl Clone for Configuration {
    fn clone(&self) -> Self {
        Self {
            amqp_client_properties: self.amqp_client_properties.clone(),
            amqp_locale: self.amqp_locale.clone(),
            auth_provider: self.auth_provider.clone(),
            backoff: self.backoff,
            negotiated_config: self.negotiated_config.clone(),
            auto_recover: self.auto_recover,
        }
    }
}

impl fmt::Debug for Configuration {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.negotiated_config, f)
    }
}

/// Handle to the channel-max / frame-max / heartbeat values of a connection.
///
/// The values live behind an `Arc<RwLock<_>>`, so the derived `Clone` only
/// copies the handle: every clone reads and writes the same underlying state.
#[derive(Clone)]
pub(crate) struct NegotiatedConfig {
    /// Shared state guarded by a read/write lock.
    inner: Arc<RwLock<Inner>>,
}

/// Copyable wrapper around the auto-recover flag.
///
/// The wrapped `bool` gates `can_recover`: when it is `false`, no error is
/// reported as recoverable. The derived `Default` therefore disables recovery.
#[derive(Default, Clone, Copy)]
pub(crate) struct RecoveryConfig(pub(crate) bool);

/// Lock-protected values held by `NegotiatedConfig`.
///
/// Each field is initialised from the matching URI query parameter (or the
/// type's default when absent) and can be overwritten through the
/// `NegotiatedConfig` setters.
struct Inner {
    /// Current channel-max value.
    channel_max: ChannelId,
    /// Current frame-max value; the setter never stores less than
    /// `protocol::constants::FRAME_MIN_SIZE`.
    frame_max: FrameSize,
    /// Current heartbeat value.
    heartbeat: Heartbeat,
}

impl NegotiatedConfig {
    /// Creates the shared state, seeding each value from the query part of
    /// `uri` and falling back to the type's default when a parameter is absent.
    fn new(uri: &AMQPUri) -> Self {
        let query = &uri.query;
        let initial = Inner {
            frame_max: query.frame_max.unwrap_or_default(),
            channel_max: query.channel_max.unwrap_or_default(),
            heartbeat: query.heartbeat.unwrap_or_default(),
        };
        Self {
            inner: Arc::new(RwLock::new(initial)),
        }
    }

    /// Reads the current channel-max value.
    pub(crate) fn channel_max(&self) -> ChannelId {
        let state = self.read_inner();
        state.channel_max
    }

    /// Overwrites the channel-max value for every handle sharing this state.
    pub(crate) fn set_channel_max(&self, channel_max: ChannelId) {
        let mut state = self.write_inner();
        state.channel_max = channel_max;
    }

    /// Reads the current frame-max value.
    pub(crate) fn frame_max(&self) -> FrameSize {
        let state = self.read_inner();
        state.frame_max
    }

    /// Overwrites the frame-max value, raising anything below
    /// `protocol::constants::FRAME_MIN_SIZE` up to that minimum.
    pub(crate) fn set_frame_max(&self, frame_max: FrameSize) {
        // Clamp before taking the write lock.
        let clamped = frame_max.max(protocol::constants::FRAME_MIN_SIZE);
        let mut state = self.write_inner();
        state.frame_max = clamped;
    }

    /// Reads the current heartbeat value.
    pub(crate) fn heartbeat(&self) -> Heartbeat {
        let state = self.read_inner();
        state.heartbeat
    }

    /// Overwrites the heartbeat value for every handle sharing this state.
    pub(crate) fn set_heartbeat(&self, heartbeat: Heartbeat) {
        let mut state = self.write_inner();
        state.heartbeat = heartbeat;
    }

    /// Acquires the read lock, recovering the guard if the lock was poisoned
    /// so that a panic in another holder does not propagate here.
    fn read_inner(&self) -> RwLockReadGuard<'_, Inner> {
        match self.inner.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Acquires the write lock, recovering the guard if the lock was poisoned
    /// so that a panic in another holder does not propagate here.
    fn write_inner(&self) -> RwLockWriteGuard<'_, Inner> {
        match self.inner.write() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }
}

impl fmt::Debug for NegotiatedConfig {
    /// Prints the three stored values as a struct named `Configuration`,
    /// which is the name shown when `Configuration` delegates its own `Debug`
    /// output here.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Hold the read lock for the whole formatting pass so the three
        // fields come from one consistent snapshot.
        let snapshot = self.read_inner();

        let mut out = f.debug_struct("Configuration");
        out.field("channel_max", &snapshot.channel_max);
        out.field("frame_max", &snapshot.frame_max);
        out.field("heartbeat", &snapshot.heartbeat);
        out.finish()
    }
}

impl RecoveryConfig {
    pub(crate) fn can_recover(&self, error: &Error) -> bool {
        self.0 && error.can_be_recovered()
    }
}