1use crate::{
2 ConnectionProperties, Error,
3 auth::{AuthProvider, DefaultAuthProvider},
4 protocol,
5 types::{ChannelId, FieldTable, FrameSize, Heartbeat, ShortString},
6 uri::AMQPUri,
7};
8use backon::ExponentialBuilder;
9use std::{
10 fmt,
11 sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
12};
13
14pub struct Configuration {
15 pub(crate) amqp_client_properties: FieldTable,
16 pub(crate) amqp_locale: ShortString,
17 pub(crate) auth_provider: Arc<dyn AuthProvider>,
18 pub(crate) backoff: ExponentialBuilder,
19 pub(crate) negotiated_config: NegotiatedConfig,
20 pub(crate) auto_recover: bool,
21}
22
23impl Configuration {
24 pub(crate) fn new(uri: &AMQPUri, options: ConnectionProperties) -> Self {
25 let ConnectionProperties {
26 locale,
27 client_properties,
28 auth_provider,
29 backoff,
30 auto_recover,
31 ..
32 } = options;
33 Self {
34 amqp_client_properties: client_properties,
35 amqp_locale: locale,
36 auth_provider: auth_provider.unwrap_or_else(|| Arc::new(DefaultAuthProvider::new(uri))),
37 backoff,
38 negotiated_config: NegotiatedConfig::new(uri),
39 auto_recover,
40 }
41 }
42
43 pub fn channel_max(&self) -> ChannelId {
44 self.negotiated_config.channel_max()
45 }
46
47 pub fn frame_max(&self) -> FrameSize {
48 self.negotiated_config.frame_max()
49 }
50
51 pub fn heartbeat(&self) -> Heartbeat {
52 self.negotiated_config.heartbeat()
53 }
54
55 pub(crate) fn recovery_config(&self) -> RecoveryConfig {
56 RecoveryConfig(self.auto_recover)
57 }
58}
59
60impl Clone for Configuration {
61 fn clone(&self) -> Self {
62 Self {
63 amqp_client_properties: self.amqp_client_properties.clone(),
64 amqp_locale: self.amqp_locale.clone(),
65 auth_provider: self.auth_provider.clone(),
66 backoff: self.backoff,
67 negotiated_config: self.negotiated_config.clone(),
68 auto_recover: self.auto_recover,
69 }
70 }
71}
72
73impl fmt::Debug for Configuration {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 fmt::Debug::fmt(&self.negotiated_config, f)
76 }
77}
78
79#[derive(Clone)]
80pub(crate) struct NegotiatedConfig {
81 inner: Arc<RwLock<Inner>>,
82}
83
84#[derive(Default, Clone, Copy)]
85pub(crate) struct RecoveryConfig(pub(crate) bool);
86
87struct Inner {
88 channel_max: ChannelId,
89 frame_max: FrameSize,
90 heartbeat: Heartbeat,
91}
92
93impl NegotiatedConfig {
94 fn new(uri: &AMQPUri) -> Self {
95 Self {
96 inner: Arc::new(RwLock::new(Inner {
97 frame_max: uri.query.frame_max.unwrap_or_default(),
98 channel_max: uri.query.channel_max.unwrap_or_default(),
99 heartbeat: uri.query.heartbeat.unwrap_or_default(),
100 })),
101 }
102 }
103
104 pub(crate) fn channel_max(&self) -> ChannelId {
105 self.read_inner().channel_max
106 }
107
108 pub(crate) fn set_channel_max(&self, channel_max: ChannelId) {
109 self.write_inner().channel_max = channel_max;
110 }
111
112 pub(crate) fn frame_max(&self) -> FrameSize {
113 self.read_inner().frame_max
114 }
115
116 pub(crate) fn set_frame_max(&self, frame_max: FrameSize) {
117 let frame_max = std::cmp::max(frame_max, protocol::constants::FRAME_MIN_SIZE);
118 self.write_inner().frame_max = frame_max;
119 }
120
121 pub(crate) fn heartbeat(&self) -> Heartbeat {
122 self.read_inner().heartbeat
123 }
124
125 pub(crate) fn set_heartbeat(&self, heartbeat: Heartbeat) {
126 self.write_inner().heartbeat = heartbeat;
127 }
128
129 fn read_inner(&self) -> RwLockReadGuard<'_, Inner> {
130 self.inner.read().unwrap_or_else(|e| e.into_inner())
131 }
132
133 fn write_inner(&self) -> RwLockWriteGuard<'_, Inner> {
134 self.inner.write().unwrap_or_else(|e| e.into_inner())
135 }
136}
137
138impl fmt::Debug for NegotiatedConfig {
139 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140 let inner = self.read_inner();
141 f.debug_struct("Configuration")
142 .field("channel_max", &inner.channel_max)
143 .field("frame_max", &inner.frame_max)
144 .field("heartbeat", &inner.heartbeat)
145 .finish()
146 }
147}
148
149impl RecoveryConfig {
150 pub(crate) fn can_recover(&self, error: &Error) -> bool {
151 self.0 && error.can_be_recovered()
152 }
153}