ntex_amqp/
lib.rs

1//#![deny(rust_2018_idioms, warnings, unreachable_pub)]
2#![allow(clippy::type_complexity, clippy::let_underscore_future)]
3
4#[macro_use]
5extern crate derive_more;
6
7use ntex::{io::DispatcherConfig, time::Seconds, util::ByteString};
8use ntex_amqp_codec::protocol::{Handle, Milliseconds, Open, OpenInner, Symbols};
9use ntex_amqp_codec::types::Symbol;
10use uuid::Uuid;
11
12mod cell;
13pub mod client;
14mod connection;
15mod control;
16mod default;
17mod delivery;
18mod dispatcher;
19pub mod error;
20pub mod error_code;
21mod rcvlink;
22mod router;
23pub mod server;
24mod session;
25mod sndlink;
26mod state;
27pub mod types;
28
29pub use self::connection::{Connection, ConnectionRef, OpenSession};
30pub use self::control::{ControlFrame, ControlFrameKind};
31pub use self::delivery::{Delivery, TransferBuilder};
32pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
33pub use self::session::Session;
34pub use self::sndlink::{SenderLink, SenderLinkBuilder};
35pub use self::state::State;
36
37pub mod codec {
38    pub use ntex_amqp_codec::*;
39}
40
41/// Amqp1 transport configuration.
42#[derive(Debug, Clone)]
43pub struct Configuration {
44    pub max_frame_size: u32,
45    pub channel_max: u16,
46    pub idle_time_out: Milliseconds,
47    pub hostname: Option<ByteString>,
48    pub offered_capabilities: Option<Symbols>,
49    pub desired_capabilities: Option<Symbols>,
50    pub(crate) max_size: usize,
51    pub(crate) disp_config: DispatcherConfig,
52    pub(crate) handshake_timeout: Seconds,
53}
54
55impl Default for Configuration {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61impl Configuration {
62    /// Create connection configuration.
63    pub fn new() -> Self {
64        let disp_config = DispatcherConfig::default();
65        disp_config
66            .set_disconnect_timeout(Seconds(3))
67            .set_keepalive_timeout(Seconds(0));
68
69        Configuration {
70            disp_config,
71            max_size: 0,
72            max_frame_size: u16::MAX as u32,
73            channel_max: 1024,
74            idle_time_out: 120_000,
75            hostname: None,
76            handshake_timeout: Seconds(5),
77            offered_capabilities: None,
78            desired_capabilities: None,
79        }
80    }
81
82    /// The channel-max value is the highest channel number that
83    /// may be used on the Connection. This value plus one is the maximum
84    /// number of Sessions that can be simultaneously active on the Connection.
85    ///
86    /// By default channel max value is set to 1024
87    pub fn channel_max(&mut self, num: u16) -> &mut Self {
88        self.channel_max = num;
89        self
90    }
91
92    /// Set max frame size for the connection.
93    ///
94    /// By default max size is set to 64kb
95    pub fn max_frame_size(&mut self, size: u32) -> &mut Self {
96        self.max_frame_size = size;
97        self
98    }
99
100    /// Get max frame size for the connection.
101    pub fn get_max_frame_size(&self) -> u32 {
102        self.max_frame_size
103    }
104
105    /// Set idle time-out for the connection in seconds.
106    ///
107    /// By default idle time-out is set to 120 seconds
108    pub fn idle_timeout(&mut self, timeout: u16) -> &mut Self {
109        self.idle_time_out = (timeout as Milliseconds) * 1000;
110        self.disp_config.set_keepalive_timeout(Seconds(timeout));
111        self
112    }
113
114    /// Set connection hostname
115    ///
116    /// Hostname is not set by default
117    pub fn hostname(&mut self, hostname: &str) -> &mut Self {
118        self.hostname = Some(ByteString::from(hostname));
119        self
120    }
121
122    /// Set offered capabilities
123    pub fn offered_capabilities(&mut self, caps: Symbols) -> &mut Self {
124        self.offered_capabilities = Some(caps);
125        self
126    }
127
128    /// Set desired capabilities
129    pub fn desired_capabilities(&mut self, caps: Symbols) -> &mut Self {
130        self.desired_capabilities = Some(caps);
131        self
132    }
133
134    /// Set max inbound frame size.
135    ///
136    /// If max size is set to `0`, size is unlimited.
137    /// By default max size is set to `0`
138    pub fn max_size(&mut self, size: usize) -> &mut Self {
139        self.max_size = size;
140        self
141    }
142
143    /// Set handshake timeout.
144    ///
145    /// By default handshake timeout is 5 seconds.
146    pub fn handshake_timeout(&mut self, timeout: Seconds) -> &mut Self {
147        self.handshake_timeout = timeout;
148        self
149    }
150
151    /// Set server connection keep-alive timeout.
152    ///
153    /// To disable timeout set value to 0.
154    ///
155    /// By default keep-alive timeout is disabled.
156    pub fn keepalive_timeout(&mut self, val: Seconds) -> &mut Self {
157        self.disp_config.set_keepalive_timeout(val);
158        self
159    }
160
161    /// Set server connection disconnect timeout.
162    ///
163    /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
164    /// within this time, the connection get dropped.
165    ///
166    /// To disable timeout set value to 0.
167    ///
168    /// By default disconnect timeout is set to 3 seconds.
169    pub fn disconnect_timeout(&mut self, val: Seconds) -> &mut Self {
170        self.disp_config.set_disconnect_timeout(val);
171        self
172    }
173
174    /// Set read rate parameters for single frame.
175    ///
176    /// Set read timeout, max timeout and rate for reading payload. If the client
177    /// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
178    /// But no more than `max_timeout` timeout.
179    ///
180    /// By default frame read rate is disabled.
181    pub fn frame_read_rate(
182        &mut self,
183        timeout: Seconds,
184        max_timeout: Seconds,
185        rate: u16,
186    ) -> &mut Self {
187        self.disp_config
188            .set_frame_read_rate(timeout, max_timeout, rate);
189        self
190    }
191
192    /// Get offered capabilities
193    pub fn get_offered_capabilities(&self) -> &[Symbol] {
194        if let Some(caps) = &self.offered_capabilities {
195            &caps.0
196        } else {
197            &[]
198        }
199    }
200
201    /// Get desired capabilities
202    pub fn get_desired_capabilities(&self) -> &[Symbol] {
203        if let Some(caps) = &self.desired_capabilities {
204            &caps.0
205        } else {
206            &[]
207        }
208    }
209
210    /// Create `Open` performative for this configuration.
211    pub fn to_open(&self) -> Open {
212        Open(Box::new(OpenInner {
213            container_id: ByteString::from(Uuid::new_v4().simple().to_string()),
214            hostname: self.hostname.clone(),
215            max_frame_size: self.max_frame_size,
216            channel_max: self.channel_max,
217            idle_time_out: if self.idle_time_out > 0 {
218                Some(self.idle_time_out)
219            } else {
220                None
221            },
222            outgoing_locales: None,
223            incoming_locales: None,
224            offered_capabilities: self.offered_capabilities.clone(),
225            desired_capabilities: self.desired_capabilities.clone(),
226            properties: None,
227        }))
228    }
229
230    pub(crate) fn timeout_remote_secs(&self) -> Seconds {
231        if self.idle_time_out > 0 {
232            Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize)
233        } else {
234            Seconds::ZERO
235        }
236    }
237
238    pub fn from_remote(&self, open: &Open) -> Configuration {
239        Configuration {
240            max_frame_size: open.max_frame_size(),
241            channel_max: open.channel_max(),
242            idle_time_out: open.idle_time_out().unwrap_or(0),
243            hostname: open.hostname().cloned(),
244            max_size: self.max_size,
245            disp_config: self.disp_config.clone(),
246            handshake_timeout: self.handshake_timeout,
247            offered_capabilities: open.0.offered_capabilities.clone(),
248            desired_capabilities: open.0.desired_capabilities.clone(),
249        }
250    }
251}