ntex_amqp_codec/protocol/
mod.rs1#![allow(clippy::derivable_impls)]
2use std::fmt;
3
4use chrono::{DateTime, Utc};
5use derive_more::From;
6use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
7use uuid::Uuid;
8
9use crate::codec::{self, Decode, DecodeFormatted, Encode};
10use crate::types::{
11 Descriptor, List, Multiple, StaticSymbol, Str, Symbol, Variant, VecStringMap, VecSymbolMap,
12};
13use crate::{error::AmqpParseError, message::Message, HashMap};
14
15impl fmt::Display for Error {
16 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
17 write!(f, "{self:?}")
18 }
19}
20
/// Protocol identifier with explicit numeric discriminants.
///
/// The values (0, 2, 3) presumably correspond to the protocol-id byte of the
/// AMQP protocol header — confirm against the header encoder/decoder.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ProtocolId {
    /// Plain AMQP.
    Amqp = 0,
    /// AMQP over TLS.
    AmqpTls = 2,
    /// AMQP with a SASL security layer.
    AmqpSasl = 3,
}
27
28pub type Map = HashMap<Variant, Variant>;
29pub type StringVariantMap = HashMap<Str, Variant>;
30pub type Fields = HashMap<Symbol, Variant>;
31pub type FilterSet = HashMap<Symbol, Option<ByteString>>;
32pub type FieldsVec = VecSymbolMap;
33pub type Timestamp = DateTime<Utc>;
34pub type Symbols = Multiple<Symbol>;
35pub type IetfLanguageTags = Multiple<IetfLanguageTag>;
36pub type Annotations = HashMap<Symbol, Variant>;
37
38#[allow(
39 clippy::unreadable_literal,
40 clippy::match_bool,
41 clippy::large_enum_variant
42)]
43mod definitions;
44pub use self::definitions::*;
45
46#[derive(Debug, Eq, PartialEq, Clone, From)]
47pub enum MessageId {
48 Ulong(u64),
49 Uuid(Uuid),
50 Binary(Bytes),
51 String(ByteString),
52}
53
54impl From<usize> for MessageId {
55 fn from(id: usize) -> MessageId {
56 MessageId::Ulong(id as u64)
57 }
58}
59
60impl From<i32> for MessageId {
61 fn from(id: i32) -> MessageId {
62 MessageId::Ulong(id as u64)
63 }
64}
65
66impl DecodeFormatted for MessageId {
67 fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
68 match fmt {
69 codec::FORMATCODE_SMALLULONG | codec::FORMATCODE_ULONG | codec::FORMATCODE_ULONG_0 => {
70 u64::decode_with_format(input, fmt).map(MessageId::Ulong)
71 }
72 codec::FORMATCODE_UUID => Uuid::decode_with_format(input, fmt).map(MessageId::Uuid),
73 codec::FORMATCODE_BINARY8 | codec::FORMATCODE_BINARY32 => {
74 Bytes::decode_with_format(input, fmt).map(MessageId::Binary)
75 }
76 codec::FORMATCODE_STRING8 | codec::FORMATCODE_STRING32 => {
77 ByteString::decode_with_format(input, fmt).map(MessageId::String)
78 }
79 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
80 }
81 }
82}
83
84impl Encode for MessageId {
85 fn encoded_size(&self) -> usize {
86 match *self {
87 MessageId::Ulong(v) => v.encoded_size(),
88 MessageId::Uuid(ref v) => v.encoded_size(),
89 MessageId::Binary(ref v) => v.encoded_size(),
90 MessageId::String(ref v) => v.encoded_size(),
91 }
92 }
93
94 fn encode(&self, buf: &mut BytesMut) {
95 match *self {
96 MessageId::Ulong(v) => v.encode(buf),
97 MessageId::Uuid(ref v) => v.encode(buf),
98 MessageId::Binary(ref v) => v.encode(buf),
99 MessageId::String(ref v) => v.encode(buf),
100 }
101 }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, From)]
105pub enum ErrorCondition {
106 AmqpError(AmqpError),
107 ConnectionError(ConnectionError),
108 SessionError(SessionError),
109 LinkError(LinkError),
110 Custom(Symbol),
111}
112
113impl Default for ErrorCondition {
114 fn default() -> ErrorCondition {
115 ErrorCondition::Custom(Symbol(Str::from("Unknown")))
116 }
117}
118
119impl DecodeFormatted for ErrorCondition {
120 #[inline]
121 fn decode_with_format(input: &mut Bytes, format: u8) -> Result<Self, AmqpParseError> {
122 let result = Symbol::decode_with_format(input, format)?;
123 if let Ok(r) = AmqpError::try_from(&result) {
124 return Ok(ErrorCondition::AmqpError(r));
125 }
126 if let Ok(r) = ConnectionError::try_from(&result) {
127 return Ok(ErrorCondition::ConnectionError(r));
128 }
129 if let Ok(r) = SessionError::try_from(&result) {
130 return Ok(ErrorCondition::SessionError(r));
131 }
132 if let Ok(r) = LinkError::try_from(&result) {
133 return Ok(ErrorCondition::LinkError(r));
134 }
135 Ok(ErrorCondition::Custom(result))
136 }
137}
138
139impl Encode for ErrorCondition {
140 fn encoded_size(&self) -> usize {
141 match *self {
142 ErrorCondition::AmqpError(ref v) => v.encoded_size(),
143 ErrorCondition::ConnectionError(ref v) => v.encoded_size(),
144 ErrorCondition::SessionError(ref v) => v.encoded_size(),
145 ErrorCondition::LinkError(ref v) => v.encoded_size(),
146 ErrorCondition::Custom(ref v) => v.encoded_size(),
147 }
148 }
149
150 fn encode(&self, buf: &mut BytesMut) {
151 match *self {
152 ErrorCondition::AmqpError(ref v) => v.encode(buf),
153 ErrorCondition::ConnectionError(ref v) => v.encode(buf),
154 ErrorCondition::SessionError(ref v) => v.encode(buf),
155 ErrorCondition::LinkError(ref v) => v.encode(buf),
156 ErrorCondition::Custom(ref v) => v.encode(buf),
157 }
158 }
159}
160
161#[derive(Clone, Debug, PartialEq, Eq)]
162pub enum DistributionMode {
163 Move,
164 Copy,
165 Custom(Symbol),
166}
167
168impl DecodeFormatted for DistributionMode {
169 fn decode_with_format(input: &mut Bytes, format: u8) -> Result<Self, AmqpParseError> {
170 let result = Symbol::decode_with_format(input, format)?;
171 let result = match result.as_str() {
172 "move" => DistributionMode::Move,
173 "copy" => DistributionMode::Copy,
174 _ => DistributionMode::Custom(result),
175 };
176 Ok(result)
177 }
178}
179
180impl Encode for DistributionMode {
181 fn encoded_size(&self) -> usize {
182 match *self {
183 DistributionMode::Move => 6,
184 DistributionMode::Copy => 6,
185 DistributionMode::Custom(ref v) => v.encoded_size(),
186 }
187 }
188
189 fn encode(&self, buf: &mut BytesMut) {
190 match *self {
191 DistributionMode::Move => Symbol::from("move").encode(buf),
192 DistributionMode::Copy => Symbol::from("copy").encode(buf),
193 DistributionMode::Custom(ref v) => v.encode(buf),
194 }
195 }
196}
197
198impl SaslInit {
199 pub fn prepare_response(authz_id: &str, authn_id: &str, password: &str) -> Bytes {
200 Bytes::from(format!("{authz_id}\x00{authn_id}\x00{password}"))
201 }
202}
203
204#[derive(Debug, Clone, From, PartialEq, Eq)]
205pub enum TransferBody {
206 Data(Bytes),
207 Message(Message),
208}
209
210impl TransferBody {
211 #[inline]
212 pub fn len(&self) -> usize {
213 self.encoded_size()
214 }
215
216 #[inline]
217 pub fn message_format(&self) -> Option<MessageFormat> {
218 match self {
219 TransferBody::Data(_) => None,
220 TransferBody::Message(ref data) => data.0.message_format,
221 }
222 }
223}
224
225impl Encode for TransferBody {
226 #[inline]
227 fn encoded_size(&self) -> usize {
228 match self {
229 TransferBody::Data(ref data) => data.len(),
230 TransferBody::Message(ref data) => data.encoded_size(),
231 }
232 }
233 #[inline]
234 fn encode(&self, dst: &mut BytesMut) {
235 match *self {
236 TransferBody::Data(ref data) => dst.put_slice(data),
237 TransferBody::Message(ref data) => data.encode(dst),
238 }
239 }
240}
241
242impl Transfer {
243 #[inline]
244 pub fn get_body(&self) -> Option<&Bytes> {
245 match self.body() {
246 Some(TransferBody::Data(ref b)) => Some(b),
247 _ => None,
248 }
249 }
250
251 #[inline]
252 pub fn load_message<T: Decode>(&self) -> Result<T, AmqpParseError> {
253 if let Some(TransferBody::Data(ref b)) = self.body() {
254 Ok(T::decode(&mut b.clone())?)
255 } else {
256 Err(AmqpParseError::UnexpectedType("body"))
257 }
258 }
259}
260
261impl Default for Role {
262 fn default() -> Role {
263 Role::Sender
264 }
265}
266
267impl Default for SenderSettleMode {
268 fn default() -> SenderSettleMode {
269 SenderSettleMode::Mixed
270 }
271}
272
273impl Default for ReceiverSettleMode {
274 fn default() -> ReceiverSettleMode {
275 ReceiverSettleMode::First
276 }
277}
278
279impl Default for TerminusDurability {
280 fn default() -> TerminusDurability {
281 TerminusDurability::None
282 }
283}
284
285impl Default for TerminusExpiryPolicy {
286 fn default() -> TerminusExpiryPolicy {
287 TerminusExpiryPolicy::LinkDetach
288 }
289}
290
291impl Default for SaslCode {
292 fn default() -> SaslCode {
293 SaslCode::Ok
294 }
295}
296
#[cfg(test)]
mod tests {
    use ntex_bytes::BytesMut;
    use uuid::Uuid;

    use super::*;
    use crate::codec::{Decode, Encode};
    use crate::error::AmqpCodecError;

    /// A UUID message id must survive an encode/decode round trip.
    #[test]
    fn test_message_id() -> Result<(), AmqpCodecError> {
        let id = MessageId::Uuid(Uuid::new_v4());

        let mut buf = BytesMut::new();
        buf.reserve(id.encoded_size());
        id.encode(&mut buf);

        let new_id = MessageId::decode(&mut buf.freeze())?;
        assert_eq!(id, new_id);
        Ok(())
    }

    /// `Properties` carrying a correlation id must survive an encode/decode
    /// round trip.
    #[test]
    fn test_properties() -> Result<(), AmqpCodecError> {
        let id = Uuid::new_v4();
        let props = Properties {
            correlation_id: Some(id.into()),
            ..Default::default()
        };

        let mut buf = BytesMut::new();
        // Reserve for the value actually being encoded (the whole `props`),
        // not just for the bare UUID it contains.
        buf.reserve(props.encoded_size());
        props.encode(&mut buf);

        let props2 = Properties::decode(&mut buf.freeze())?;
        assert_eq!(props, props2);
        Ok(())
    }
}
335}