Skip to main content

lapin/
configuration.rs

1use crate::{
2    ConnectionProperties, Error,
3    auth::{AuthProvider, DefaultAuthProvider},
4    protocol,
5    types::{ChannelId, FieldTable, FrameSize, Heartbeat, ShortString},
6    uri::AMQPUri,
7};
8use backon::ExponentialBuilder;
9use std::{
10    fmt,
11    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
12};
13
14pub struct Configuration {
15    pub(crate) amqp_client_properties: FieldTable,
16    pub(crate) amqp_locale: ShortString,
17    pub(crate) auth_provider: Arc<dyn AuthProvider>,
18    pub(crate) backoff: ExponentialBuilder,
19    pub(crate) negotiated_config: NegotiatedConfig,
20    pub(crate) auto_recover: bool,
21}
22
23impl Configuration {
24    pub(crate) fn new(uri: &AMQPUri, options: ConnectionProperties) -> Self {
25        let ConnectionProperties {
26            locale,
27            client_properties,
28            auth_provider,
29            backoff,
30            auto_recover,
31            ..
32        } = options;
33        Self {
34            amqp_client_properties: client_properties,
35            amqp_locale: locale,
36            auth_provider: auth_provider.unwrap_or_else(|| Arc::new(DefaultAuthProvider::new(uri))),
37            backoff,
38            negotiated_config: NegotiatedConfig::new(uri),
39            auto_recover,
40        }
41    }
42
43    pub fn channel_max(&self) -> ChannelId {
44        self.negotiated_config.channel_max()
45    }
46
47    pub fn frame_max(&self) -> FrameSize {
48        self.negotiated_config.frame_max()
49    }
50
51    pub fn heartbeat(&self) -> Heartbeat {
52        self.negotiated_config.heartbeat()
53    }
54
55    pub(crate) fn recovery_config(&self) -> RecoveryConfig {
56        RecoveryConfig(self.auto_recover)
57    }
58}
59
60impl Clone for Configuration {
61    fn clone(&self) -> Self {
62        Self {
63            amqp_client_properties: self.amqp_client_properties.clone(),
64            amqp_locale: self.amqp_locale.clone(),
65            auth_provider: self.auth_provider.clone(),
66            backoff: self.backoff,
67            negotiated_config: self.negotiated_config.clone(),
68            auto_recover: self.auto_recover,
69        }
70    }
71}
72
73impl fmt::Debug for Configuration {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        fmt::Debug::fmt(&self.negotiated_config, f)
76    }
77}
78
79#[derive(Clone)]
80pub(crate) struct NegotiatedConfig {
81    inner: Arc<RwLock<Inner>>,
82}
83
/// Copyable wrapper around the auto-recovery flag.
///
/// The default value wraps `false`.
#[derive(Clone, Copy, Default)]
pub(crate) struct RecoveryConfig(
    /// `true` when automatic recovery is enabled.
    pub(crate) bool,
);
86
87struct Inner {
88    channel_max: ChannelId,
89    frame_max: FrameSize,
90    heartbeat: Heartbeat,
91}
92
93impl NegotiatedConfig {
94    fn new(uri: &AMQPUri) -> Self {
95        Self {
96            inner: Arc::new(RwLock::new(Inner {
97                frame_max: uri.query.frame_max.unwrap_or_default(),
98                channel_max: uri.query.channel_max.unwrap_or_default(),
99                heartbeat: uri.query.heartbeat.unwrap_or_default(),
100            })),
101        }
102    }
103
104    pub(crate) fn channel_max(&self) -> ChannelId {
105        self.read_inner().channel_max
106    }
107
108    pub(crate) fn set_channel_max(&self, channel_max: ChannelId) {
109        self.write_inner().channel_max = channel_max;
110    }
111
112    pub(crate) fn frame_max(&self) -> FrameSize {
113        self.read_inner().frame_max
114    }
115
116    pub(crate) fn set_frame_max(&self, frame_max: FrameSize) {
117        let frame_max = std::cmp::max(frame_max, protocol::constants::FRAME_MIN_SIZE);
118        self.write_inner().frame_max = frame_max;
119    }
120
121    pub(crate) fn heartbeat(&self) -> Heartbeat {
122        self.read_inner().heartbeat
123    }
124
125    pub(crate) fn set_heartbeat(&self, heartbeat: Heartbeat) {
126        self.write_inner().heartbeat = heartbeat;
127    }
128
129    fn read_inner(&self) -> RwLockReadGuard<'_, Inner> {
130        self.inner.read().unwrap_or_else(|e| e.into_inner())
131    }
132
133    fn write_inner(&self) -> RwLockWriteGuard<'_, Inner> {
134        self.inner.write().unwrap_or_else(|e| e.into_inner())
135    }
136}
137
138impl fmt::Debug for NegotiatedConfig {
139    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140        let inner = self.read_inner();
141        f.debug_struct("Configuration")
142            .field("channel_max", &inner.channel_max)
143            .field("frame_max", &inner.frame_max)
144            .field("heartbeat", &inner.heartbeat)
145            .finish()
146    }
147}
148
149impl RecoveryConfig {
150    pub(crate) fn can_recover(&self, error: &Error) -> bool {
151        self.0 && error.can_be_recovered()
152    }
153}