Skip to main content

ntex_amqp/
lib.rs

1#![deny(
2    rust_2018_idioms,
3    warnings,
4    unreachable_pub,
5    // missing_debug_implementations,
6    clippy::pedantic
7)]
8#![allow(
9    clippy::clone_on_copy,
10    clippy::cast_possible_truncation,
11    clippy::let_underscore_future,
12    clippy::missing_fields_in_debug,
13    clippy::must_use_candidate,
14    clippy::missing_errors_doc,
15    clippy::similar_names,
16    clippy::struct_field_names,
17    clippy::too_many_lines,
18    clippy::type_complexity
19)]
20
21#[macro_use]
22extern crate derive_more;
23
24use ntex_amqp_codec::protocol::{Handle, Milliseconds, Open, OpenInner, Symbols};
25use ntex_amqp_codec::types::Symbol;
26use ntex_bytes::ByteString;
27use ntex_service::cfg::{CfgContext, Configuration as SvcConfiguration};
28use ntex_util::time::Seconds;
29use uuid::Uuid;
30
31mod cell;
32pub mod client;
33mod connection;
34mod control;
35mod default;
36mod delivery;
37mod dispatcher;
38pub mod error;
39pub mod error_code;
40mod rcvlink;
41mod router;
42pub mod server;
43mod session;
44mod sndlink;
45mod state;
46pub mod types;
47
48pub use self::connection::{Connection, ConnectionRef, OpenSession};
49pub use self::control::{ControlFrame, ControlFrameKind};
50pub use self::delivery::{Delivery, TransferBuilder};
51pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
52pub use self::session::Session;
53pub use self::sndlink::{SenderLink, SenderLinkBuilder};
54pub use self::state::State;
55
56pub mod codec {
57    pub use ntex_amqp_codec::*;
58}
59
60/// Amqp1 transport configuration.
61#[derive(Debug)]
62pub struct AmqpServiceConfig {
63    pub max_frame_size: u32,
64    pub channel_max: u16,
65    pub idle_time_out: Milliseconds,
66    pub hostname: Option<ByteString>,
67    pub offered_capabilities: Option<Symbols>,
68    pub desired_capabilities: Option<Symbols>,
69    pub(crate) max_size: usize,
70    pub(crate) handshake_timeout: Seconds,
71    config: CfgContext,
72}
73
74/// Amqp1 transport configuration.
75#[derive(Debug)]
76pub struct RemoteServiceConfig {
77    pub max_frame_size: u32,
78    pub channel_max: u16,
79    pub idle_time_out: Milliseconds,
80    pub hostname: Option<ByteString>,
81    pub offered_capabilities: Option<Symbols>,
82    pub desired_capabilities: Option<Symbols>,
83}
84
85impl Default for AmqpServiceConfig {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91impl SvcConfiguration for AmqpServiceConfig {
92    const NAME: &str = "AMQP Configuration";
93
94    fn ctx(&self) -> &CfgContext {
95        &self.config
96    }
97
98    fn set_ctx(&mut self, ctx: CfgContext) {
99        self.config = ctx;
100    }
101}
102
103impl AmqpServiceConfig {
104    /// Create connection configuration.
105    pub fn new() -> Self {
106        AmqpServiceConfig {
107            max_size: 0,
108            max_frame_size: u32::from(u16::MAX),
109            channel_max: 1024,
110            idle_time_out: 120_000,
111            hostname: None,
112            handshake_timeout: Seconds(5),
113            offered_capabilities: None,
114            desired_capabilities: None,
115            config: CfgContext::default(),
116        }
117    }
118
119    #[must_use]
120    /// The channel-max value is the highest channel number that
121    /// may be used on the Connection. This value plus one is the maximum
122    /// number of Sessions that can be simultaneously active on the Connection.
123    ///
124    /// By default channel max value is set to 1024
125    pub fn set_channel_max(mut self, num: u16) -> Self {
126        self.channel_max = num;
127        self
128    }
129
130    #[must_use]
131    /// Set max frame size for the connection.
132    ///
133    /// By default max size is set to 64kb
134    pub fn set_max_frame_size(mut self, size: u32) -> Self {
135        self.max_frame_size = size;
136        self
137    }
138
139    /// Get max frame size for the connection.
140    pub fn get_max_frame_size(&self) -> u32 {
141        self.max_frame_size
142    }
143
144    #[must_use]
145    /// Set idle time-out for the connection in seconds.
146    ///
147    /// By default idle time-out is set to 120 seconds
148    pub fn set_idle_timeout(mut self, timeout: u16) -> Self {
149        self.idle_time_out = Milliseconds::from(timeout) * 1000;
150        self
151    }
152
153    #[must_use]
154    /// Set connection hostname
155    ///
156    /// Hostname is not set by default
157    pub fn set_hostname(mut self, hostname: &str) -> Self {
158        self.hostname = Some(ByteString::from(hostname));
159        self
160    }
161
162    #[must_use]
163    /// Set offered capabilities
164    pub fn set_offered_capabilities(mut self, caps: Symbols) -> Self {
165        self.offered_capabilities = Some(caps);
166        self
167    }
168
169    #[must_use]
170    /// Set desired capabilities
171    pub fn set_desired_capabilities(mut self, caps: Symbols) -> Self {
172        self.desired_capabilities = Some(caps);
173        self
174    }
175
176    #[must_use]
177    /// Set max inbound frame size.
178    ///
179    /// If max size is set to `0`, size is unlimited.
180    /// By default max size is set to `0`
181    pub fn set_max_size(mut self, size: usize) -> Self {
182        self.max_size = size;
183        self
184    }
185
186    #[must_use]
187    /// Set handshake timeout.
188    ///
189    /// By default handshake timeout is 5 seconds.
190    pub fn set_handshake_timeout(mut self, timeout: Seconds) -> Self {
191        self.handshake_timeout = timeout;
192        self
193    }
194
195    /// Get offered capabilities
196    pub fn get_offered_capabilities(&self) -> &[Symbol] {
197        if let Some(caps) = &self.offered_capabilities {
198            &caps.0
199        } else {
200            &[]
201        }
202    }
203
204    /// Get desired capabilities
205    pub fn get_desired_capabilities(&self) -> &[Symbol] {
206        if let Some(caps) = &self.desired_capabilities {
207            &caps.0
208        } else {
209            &[]
210        }
211    }
212
213    #[must_use]
214    /// Create `Open` performative for this configuration.
215    pub fn to_open(&self) -> Open {
216        Open(Box::new(OpenInner {
217            container_id: ByteString::from(Uuid::new_v4().simple().to_string()),
218            hostname: self.hostname.clone(),
219            max_frame_size: self.max_frame_size,
220            channel_max: self.channel_max,
221            idle_time_out: if self.idle_time_out > 0 {
222                Some(self.idle_time_out)
223            } else {
224                None
225            },
226            outgoing_locales: None,
227            incoming_locales: None,
228            offered_capabilities: self.offered_capabilities.clone(),
229            desired_capabilities: self.desired_capabilities.clone(),
230            properties: None,
231        }))
232    }
233}
234
235impl RemoteServiceConfig {
236    #[must_use]
237    pub fn new(open: &Open) -> RemoteServiceConfig {
238        RemoteServiceConfig {
239            max_frame_size: open.max_frame_size(),
240            channel_max: open.channel_max(),
241            idle_time_out: open.idle_time_out().unwrap_or(0),
242            hostname: open.hostname().cloned(),
243            offered_capabilities: open.0.offered_capabilities.clone(),
244            desired_capabilities: open.0.desired_capabilities.clone(),
245        }
246    }
247
248    #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
249    pub(crate) fn timeout_remote_secs(&self) -> Seconds {
250        if self.idle_time_out > 0 {
251            Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize)
252        } else {
253            Seconds::ZERO
254        }
255    }
256}