1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
3
4#[macro_use]
5extern crate derive_more;
6
7use ntex::{io::DispatcherConfig, time::Seconds, util::ByteString};
8use ntex_amqp_codec::protocol::{Handle, Milliseconds, Open, OpenInner, Symbols};
9use ntex_amqp_codec::types::Symbol;
10use uuid::Uuid;
11
12mod cell;
13pub mod client;
14mod connection;
15mod control;
16mod default;
17mod delivery;
18mod dispatcher;
19pub mod error;
20pub mod error_code;
21mod rcvlink;
22mod router;
23pub mod server;
24mod session;
25mod sndlink;
26mod state;
27pub mod types;
28
29pub use self::connection::{Connection, ConnectionRef, OpenSession};
30pub use self::control::{ControlFrame, ControlFrameKind};
31pub use self::delivery::{Delivery, TransferBuilder};
32pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
33pub use self::session::Session;
34pub use self::sndlink::{SenderLink, SenderLinkBuilder};
35pub use self::state::State;
36
/// Re-exports every public item of the `ntex_amqp_codec` crate, so the codec
/// is reachable through this crate as `codec::*`.
pub mod codec {
    pub use ntex_amqp_codec::*;
}
40
/// Connection configuration.
///
/// Holds the values placed into the `Open` frame by [`Configuration::to_open`]
/// together with crate-internal dispatcher and timeout settings.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Value sent as `max_frame_size` in the `Open` frame.
    pub max_frame_size: u32,
    /// Value sent as `channel_max` in the `Open` frame.
    pub channel_max: u16,
    /// Idle time-out in milliseconds; `0` means no idle time-out is put into
    /// the `Open` frame.
    pub idle_time_out: Milliseconds,
    /// Optional hostname sent in the `Open` frame.
    pub hostname: Option<ByteString>,
    /// Capabilities offered to the peer, if any.
    pub offered_capabilities: Option<Symbols>,
    /// Capabilities desired from the peer, if any.
    pub desired_capabilities: Option<Symbols>,
    /// Size limit configured through `max_size()`.
    // NOTE(review): presumably the maximum inbound frame size with `0`
    // meaning "unlimited" - confirm where this field is consumed.
    pub(crate) max_size: usize,
    /// Dispatcher settings (keep-alive, disconnect timeout, frame read rate).
    pub(crate) disp_config: DispatcherConfig,
    /// Handshake timeout configured through `handshake_timeout()`.
    pub(crate) handshake_timeout: Seconds,
}
54
55impl Default for Configuration {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61impl Configuration {
62 pub fn new() -> Self {
64 let disp_config = DispatcherConfig::default();
65 disp_config
66 .set_disconnect_timeout(Seconds(3))
67 .set_keepalive_timeout(Seconds(0));
68
69 Configuration {
70 disp_config,
71 max_size: 0,
72 max_frame_size: u16::MAX as u32,
73 channel_max: 1024,
74 idle_time_out: 120_000,
75 hostname: None,
76 handshake_timeout: Seconds(5),
77 offered_capabilities: None,
78 desired_capabilities: None,
79 }
80 }
81
82 pub fn channel_max(&mut self, num: u16) -> &mut Self {
88 self.channel_max = num;
89 self
90 }
91
92 pub fn max_frame_size(&mut self, size: u32) -> &mut Self {
96 self.max_frame_size = size;
97 self
98 }
99
100 pub fn get_max_frame_size(&self) -> u32 {
102 self.max_frame_size
103 }
104
105 pub fn idle_timeout(&mut self, timeout: u16) -> &mut Self {
109 self.idle_time_out = (timeout as Milliseconds) * 1000;
110 self.disp_config.set_keepalive_timeout(Seconds(timeout));
111 self
112 }
113
114 pub fn hostname(&mut self, hostname: &str) -> &mut Self {
118 self.hostname = Some(ByteString::from(hostname));
119 self
120 }
121
122 pub fn offered_capabilities(&mut self, caps: Symbols) -> &mut Self {
124 self.offered_capabilities = Some(caps);
125 self
126 }
127
128 pub fn desired_capabilities(&mut self, caps: Symbols) -> &mut Self {
130 self.desired_capabilities = Some(caps);
131 self
132 }
133
134 pub fn max_size(&mut self, size: usize) -> &mut Self {
139 self.max_size = size;
140 self
141 }
142
143 pub fn handshake_timeout(&mut self, timeout: Seconds) -> &mut Self {
147 self.handshake_timeout = timeout;
148 self
149 }
150
151 pub fn keepalive_timeout(&mut self, val: Seconds) -> &mut Self {
157 self.disp_config.set_keepalive_timeout(val);
158 self
159 }
160
161 pub fn disconnect_timeout(&mut self, val: Seconds) -> &mut Self {
170 self.disp_config.set_disconnect_timeout(val);
171 self
172 }
173
174 pub fn frame_read_rate(
182 &mut self,
183 timeout: Seconds,
184 max_timeout: Seconds,
185 rate: u16,
186 ) -> &mut Self {
187 self.disp_config
188 .set_frame_read_rate(timeout, max_timeout, rate);
189 self
190 }
191
192 pub fn get_offered_capabilities(&self) -> &[Symbol] {
194 if let Some(caps) = &self.offered_capabilities {
195 &caps.0
196 } else {
197 &[]
198 }
199 }
200
201 pub fn get_desired_capabilities(&self) -> &[Symbol] {
203 if let Some(caps) = &self.desired_capabilities {
204 &caps.0
205 } else {
206 &[]
207 }
208 }
209
210 pub fn to_open(&self) -> Open {
212 Open(Box::new(OpenInner {
213 container_id: ByteString::from(Uuid::new_v4().simple().to_string()),
214 hostname: self.hostname.clone(),
215 max_frame_size: self.max_frame_size,
216 channel_max: self.channel_max,
217 idle_time_out: if self.idle_time_out > 0 {
218 Some(self.idle_time_out)
219 } else {
220 None
221 },
222 outgoing_locales: None,
223 incoming_locales: None,
224 offered_capabilities: self.offered_capabilities.clone(),
225 desired_capabilities: self.desired_capabilities.clone(),
226 properties: None,
227 }))
228 }
229
230 pub(crate) fn timeout_remote_secs(&self) -> Seconds {
231 if self.idle_time_out > 0 {
232 Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize)
233 } else {
234 Seconds::ZERO
235 }
236 }
237
238 pub fn from_remote(&self, open: &Open) -> Configuration {
239 Configuration {
240 max_frame_size: open.max_frame_size(),
241 channel_max: open.channel_max(),
242 idle_time_out: open.idle_time_out().unwrap_or(0),
243 hostname: open.hostname().cloned(),
244 max_size: self.max_size,
245 disp_config: self.disp_config.clone(),
246 handshake_timeout: self.handshake_timeout,
247 offered_capabilities: open.0.offered_capabilities.clone(),
248 desired_capabilities: open.0.desired_capabilities.clone(),
249 }
250 }
251}