1#![deny(
2 rust_2018_idioms,
3 warnings,
4 unreachable_pub,
5 clippy::pedantic
7)]
8#![allow(
9 clippy::clone_on_copy,
10 clippy::cast_possible_truncation,
11 clippy::let_underscore_future,
12 clippy::missing_fields_in_debug,
13 clippy::must_use_candidate,
14 clippy::missing_errors_doc,
15 clippy::similar_names,
16 clippy::struct_field_names,
17 clippy::too_many_lines,
18 clippy::type_complexity
19)]
20
21#[macro_use]
22extern crate derive_more;
23
24use ntex_amqp_codec::protocol::{Handle, Milliseconds, Open, OpenInner, Symbols};
25use ntex_amqp_codec::types::Symbol;
26use ntex_bytes::ByteString;
27use ntex_service::cfg::{CfgContext, Configuration as SvcConfiguration};
28use ntex_util::time::Seconds;
29use uuid::Uuid;
30
31mod cell;
32pub mod client;
33mod connection;
34mod control;
35mod default;
36mod delivery;
37mod dispatcher;
38pub mod error;
39pub mod error_code;
40mod rcvlink;
41mod router;
42pub mod server;
43mod session;
44mod sndlink;
45mod state;
46pub mod types;
47
48pub use self::connection::{Connection, ConnectionRef, OpenSession};
49pub use self::control::{ControlFrame, ControlFrameKind};
50pub use self::delivery::{Delivery, TransferBuilder};
51pub use self::rcvlink::{ReceiverLink, ReceiverLinkBuilder};
52pub use self::session::Session;
53pub use self::sndlink::{SenderLink, SenderLinkBuilder};
54pub use self::state::State;
55
56pub mod codec {
57 pub use ntex_amqp_codec::*;
58}
59
60#[derive(Debug)]
62pub struct AmqpServiceConfig {
63 pub max_frame_size: u32,
64 pub channel_max: u16,
65 pub idle_time_out: Milliseconds,
66 pub hostname: Option<ByteString>,
67 pub offered_capabilities: Option<Symbols>,
68 pub desired_capabilities: Option<Symbols>,
69 pub(crate) max_size: usize,
70 pub(crate) handshake_timeout: Seconds,
71 config: CfgContext,
72}
73
74#[derive(Debug)]
76pub struct RemoteServiceConfig {
77 pub max_frame_size: u32,
78 pub channel_max: u16,
79 pub idle_time_out: Milliseconds,
80 pub hostname: Option<ByteString>,
81 pub offered_capabilities: Option<Symbols>,
82 pub desired_capabilities: Option<Symbols>,
83}
84
85impl Default for AmqpServiceConfig {
86 fn default() -> Self {
87 Self::new()
88 }
89}
90
91impl SvcConfiguration for AmqpServiceConfig {
92 const NAME: &str = "AMQP Configuration";
93
94 fn ctx(&self) -> &CfgContext {
95 &self.config
96 }
97
98 fn set_ctx(&mut self, ctx: CfgContext) {
99 self.config = ctx;
100 }
101}
102
103impl AmqpServiceConfig {
104 pub fn new() -> Self {
106 AmqpServiceConfig {
107 max_size: 0,
108 max_frame_size: u32::from(u16::MAX),
109 channel_max: 1024,
110 idle_time_out: 120_000,
111 hostname: None,
112 handshake_timeout: Seconds(5),
113 offered_capabilities: None,
114 desired_capabilities: None,
115 config: CfgContext::default(),
116 }
117 }
118
119 #[must_use]
120 pub fn set_channel_max(mut self, num: u16) -> Self {
126 self.channel_max = num;
127 self
128 }
129
130 #[must_use]
131 pub fn set_max_frame_size(mut self, size: u32) -> Self {
135 self.max_frame_size = size;
136 self
137 }
138
139 pub fn get_max_frame_size(&self) -> u32 {
141 self.max_frame_size
142 }
143
144 #[must_use]
145 pub fn set_idle_timeout(mut self, timeout: u16) -> Self {
149 self.idle_time_out = Milliseconds::from(timeout) * 1000;
150 self
151 }
152
153 #[must_use]
154 pub fn set_hostname(mut self, hostname: &str) -> Self {
158 self.hostname = Some(ByteString::from(hostname));
159 self
160 }
161
162 #[must_use]
163 pub fn set_offered_capabilities(mut self, caps: Symbols) -> Self {
165 self.offered_capabilities = Some(caps);
166 self
167 }
168
169 #[must_use]
170 pub fn set_desired_capabilities(mut self, caps: Symbols) -> Self {
172 self.desired_capabilities = Some(caps);
173 self
174 }
175
176 #[must_use]
177 pub fn set_max_size(mut self, size: usize) -> Self {
182 self.max_size = size;
183 self
184 }
185
186 #[must_use]
187 pub fn set_handshake_timeout(mut self, timeout: Seconds) -> Self {
191 self.handshake_timeout = timeout;
192 self
193 }
194
195 pub fn get_offered_capabilities(&self) -> &[Symbol] {
197 if let Some(caps) = &self.offered_capabilities {
198 &caps.0
199 } else {
200 &[]
201 }
202 }
203
204 pub fn get_desired_capabilities(&self) -> &[Symbol] {
206 if let Some(caps) = &self.desired_capabilities {
207 &caps.0
208 } else {
209 &[]
210 }
211 }
212
213 #[must_use]
214 pub fn to_open(&self) -> Open {
216 Open(Box::new(OpenInner {
217 container_id: ByteString::from(Uuid::new_v4().simple().to_string()),
218 hostname: self.hostname.clone(),
219 max_frame_size: self.max_frame_size,
220 channel_max: self.channel_max,
221 idle_time_out: if self.idle_time_out > 0 {
222 Some(self.idle_time_out)
223 } else {
224 None
225 },
226 outgoing_locales: None,
227 incoming_locales: None,
228 offered_capabilities: self.offered_capabilities.clone(),
229 desired_capabilities: self.desired_capabilities.clone(),
230 properties: None,
231 }))
232 }
233}
234
235impl RemoteServiceConfig {
236 #[must_use]
237 pub fn new(open: &Open) -> RemoteServiceConfig {
238 RemoteServiceConfig {
239 max_frame_size: open.max_frame_size(),
240 channel_max: open.channel_max(),
241 idle_time_out: open.idle_time_out().unwrap_or(0),
242 hostname: open.hostname().cloned(),
243 offered_capabilities: open.0.offered_capabilities.clone(),
244 desired_capabilities: open.0.desired_capabilities.clone(),
245 }
246 }
247
248 #[allow(clippy::cast_sign_loss, clippy::cast_precision_loss)]
249 pub(crate) fn timeout_remote_secs(&self) -> Seconds {
250 if self.idle_time_out > 0 {
251 Seconds::checked_new(((self.idle_time_out as f32) * 0.75 / 1000.0) as usize)
252 } else {
253 Seconds::ZERO
254 }
255 }
256}