ntex_amqp_codec/protocol/
mod.rs

1#![allow(clippy::derivable_impls)]
2use std::fmt;
3
4use chrono::{DateTime, Utc};
5use derive_more::From;
6use ntex_bytes::{Buf, BufMut, ByteString, Bytes, BytesMut};
7use uuid::Uuid;
8
9use crate::codec::{self, Decode, DecodeFormatted, Encode};
10use crate::types::{
11    Descriptor, List, Multiple, StaticSymbol, Str, Symbol, Variant, VecStringMap, VecSymbolMap,
12};
13use crate::{error::AmqpParseError, message::Message, HashMap};
14
15impl fmt::Display for Error {
16    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
17        write!(f, "{self:?}")
18    }
19}
20
21#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
22pub enum ProtocolId {
23    Amqp = 0,
24    AmqpTls = 2,
25    AmqpSasl = 3,
26}
27
28pub type Map = HashMap<Variant, Variant>;
29pub type StringVariantMap = HashMap<Str, Variant>;
30pub type Fields = HashMap<Symbol, Variant>;
31pub type FilterSet = HashMap<Symbol, Option<ByteString>>;
32pub type FieldsVec = VecSymbolMap;
33pub type Timestamp = DateTime<Utc>;
34pub type Symbols = Multiple<Symbol>;
35pub type IetfLanguageTags = Multiple<IetfLanguageTag>;
36pub type Annotations = HashMap<Symbol, Variant>;
37
38#[allow(
39    clippy::unreadable_literal,
40    clippy::match_bool,
41    clippy::large_enum_variant
42)]
43mod definitions;
44pub use self::definitions::*;
45
46#[derive(Debug, Eq, PartialEq, Clone, From)]
47pub enum MessageId {
48    Ulong(u64),
49    Uuid(Uuid),
50    Binary(Bytes),
51    String(ByteString),
52}
53
54impl From<usize> for MessageId {
55    fn from(id: usize) -> MessageId {
56        MessageId::Ulong(id as u64)
57    }
58}
59
60impl From<i32> for MessageId {
61    fn from(id: i32) -> MessageId {
62        MessageId::Ulong(id as u64)
63    }
64}
65
66impl DecodeFormatted for MessageId {
67    fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
68        match fmt {
69            codec::FORMATCODE_SMALLULONG | codec::FORMATCODE_ULONG | codec::FORMATCODE_ULONG_0 => {
70                u64::decode_with_format(input, fmt).map(MessageId::Ulong)
71            }
72            codec::FORMATCODE_UUID => Uuid::decode_with_format(input, fmt).map(MessageId::Uuid),
73            codec::FORMATCODE_BINARY8 | codec::FORMATCODE_BINARY32 => {
74                Bytes::decode_with_format(input, fmt).map(MessageId::Binary)
75            }
76            codec::FORMATCODE_STRING8 | codec::FORMATCODE_STRING32 => {
77                ByteString::decode_with_format(input, fmt).map(MessageId::String)
78            }
79            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
80        }
81    }
82}
83
84impl Encode for MessageId {
85    fn encoded_size(&self) -> usize {
86        match *self {
87            MessageId::Ulong(v) => v.encoded_size(),
88            MessageId::Uuid(ref v) => v.encoded_size(),
89            MessageId::Binary(ref v) => v.encoded_size(),
90            MessageId::String(ref v) => v.encoded_size(),
91        }
92    }
93
94    fn encode(&self, buf: &mut BytesMut) {
95        match *self {
96            MessageId::Ulong(v) => v.encode(buf),
97            MessageId::Uuid(ref v) => v.encode(buf),
98            MessageId::Binary(ref v) => v.encode(buf),
99            MessageId::String(ref v) => v.encode(buf),
100        }
101    }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, From)]
105pub enum ErrorCondition {
106    AmqpError(AmqpError),
107    ConnectionError(ConnectionError),
108    SessionError(SessionError),
109    LinkError(LinkError),
110    Custom(Symbol),
111}
112
113impl Default for ErrorCondition {
114    fn default() -> ErrorCondition {
115        ErrorCondition::Custom(Symbol(Str::from("Unknown")))
116    }
117}
118
119impl DecodeFormatted for ErrorCondition {
120    #[inline]
121    fn decode_with_format(input: &mut Bytes, format: u8) -> Result<Self, AmqpParseError> {
122        let result = Symbol::decode_with_format(input, format)?;
123        if let Ok(r) = AmqpError::try_from(&result) {
124            return Ok(ErrorCondition::AmqpError(r));
125        }
126        if let Ok(r) = ConnectionError::try_from(&result) {
127            return Ok(ErrorCondition::ConnectionError(r));
128        }
129        if let Ok(r) = SessionError::try_from(&result) {
130            return Ok(ErrorCondition::SessionError(r));
131        }
132        if let Ok(r) = LinkError::try_from(&result) {
133            return Ok(ErrorCondition::LinkError(r));
134        }
135        Ok(ErrorCondition::Custom(result))
136    }
137}
138
139impl Encode for ErrorCondition {
140    fn encoded_size(&self) -> usize {
141        match *self {
142            ErrorCondition::AmqpError(ref v) => v.encoded_size(),
143            ErrorCondition::ConnectionError(ref v) => v.encoded_size(),
144            ErrorCondition::SessionError(ref v) => v.encoded_size(),
145            ErrorCondition::LinkError(ref v) => v.encoded_size(),
146            ErrorCondition::Custom(ref v) => v.encoded_size(),
147        }
148    }
149
150    fn encode(&self, buf: &mut BytesMut) {
151        match *self {
152            ErrorCondition::AmqpError(ref v) => v.encode(buf),
153            ErrorCondition::ConnectionError(ref v) => v.encode(buf),
154            ErrorCondition::SessionError(ref v) => v.encode(buf),
155            ErrorCondition::LinkError(ref v) => v.encode(buf),
156            ErrorCondition::Custom(ref v) => v.encode(buf),
157        }
158    }
159}
160
161#[derive(Clone, Debug, PartialEq, Eq)]
162pub enum DistributionMode {
163    Move,
164    Copy,
165    Custom(Symbol),
166}
167
168impl DecodeFormatted for DistributionMode {
169    fn decode_with_format(input: &mut Bytes, format: u8) -> Result<Self, AmqpParseError> {
170        let result = Symbol::decode_with_format(input, format)?;
171        let result = match result.as_str() {
172            "move" => DistributionMode::Move,
173            "copy" => DistributionMode::Copy,
174            _ => DistributionMode::Custom(result),
175        };
176        Ok(result)
177    }
178}
179
180impl Encode for DistributionMode {
181    fn encoded_size(&self) -> usize {
182        match *self {
183            DistributionMode::Move => 6,
184            DistributionMode::Copy => 6,
185            DistributionMode::Custom(ref v) => v.encoded_size(),
186        }
187    }
188
189    fn encode(&self, buf: &mut BytesMut) {
190        match *self {
191            DistributionMode::Move => Symbol::from("move").encode(buf),
192            DistributionMode::Copy => Symbol::from("copy").encode(buf),
193            DistributionMode::Custom(ref v) => v.encode(buf),
194        }
195    }
196}
197
198impl SaslInit {
199    pub fn prepare_response(authz_id: &str, authn_id: &str, password: &str) -> Bytes {
200        Bytes::from(format!("{authz_id}\x00{authn_id}\x00{password}"))
201    }
202}
203
204#[derive(Debug, Clone, From, PartialEq, Eq)]
205pub enum TransferBody {
206    Data(Bytes),
207    Message(Message),
208}
209
210impl TransferBody {
211    #[inline]
212    pub fn len(&self) -> usize {
213        self.encoded_size()
214    }
215
216    #[inline]
217    pub fn message_format(&self) -> Option<MessageFormat> {
218        match self {
219            TransferBody::Data(_) => None,
220            TransferBody::Message(ref data) => data.0.message_format,
221        }
222    }
223}
224
225impl Encode for TransferBody {
226    #[inline]
227    fn encoded_size(&self) -> usize {
228        match self {
229            TransferBody::Data(ref data) => data.len(),
230            TransferBody::Message(ref data) => data.encoded_size(),
231        }
232    }
233    #[inline]
234    fn encode(&self, dst: &mut BytesMut) {
235        match *self {
236            TransferBody::Data(ref data) => dst.put_slice(data),
237            TransferBody::Message(ref data) => data.encode(dst),
238        }
239    }
240}
241
242impl Transfer {
243    #[inline]
244    pub fn get_body(&self) -> Option<&Bytes> {
245        match self.body() {
246            Some(TransferBody::Data(ref b)) => Some(b),
247            _ => None,
248        }
249    }
250
251    #[inline]
252    pub fn load_message<T: Decode>(&self) -> Result<T, AmqpParseError> {
253        if let Some(TransferBody::Data(ref b)) = self.body() {
254            Ok(T::decode(&mut b.clone())?)
255        } else {
256            Err(AmqpParseError::UnexpectedType("body"))
257        }
258    }
259}
260
261impl Default for Role {
262    fn default() -> Role {
263        Role::Sender
264    }
265}
266
267impl Default for SenderSettleMode {
268    fn default() -> SenderSettleMode {
269        SenderSettleMode::Mixed
270    }
271}
272
273impl Default for ReceiverSettleMode {
274    fn default() -> ReceiverSettleMode {
275        ReceiverSettleMode::First
276    }
277}
278
279impl Default for TerminusDurability {
280    fn default() -> TerminusDurability {
281        TerminusDurability::None
282    }
283}
284
285impl Default for TerminusExpiryPolicy {
286    fn default() -> TerminusExpiryPolicy {
287        TerminusExpiryPolicy::LinkDetach
288    }
289}
290
291impl Default for SaslCode {
292    fn default() -> SaslCode {
293        SaslCode::Ok
294    }
295}
296
297#[cfg(test)]
298mod tests {
299    use ntex_bytes::BytesMut;
300    use uuid::Uuid;
301
302    use super::*;
303    use crate::codec::{Decode, Encode};
304    use crate::error::AmqpCodecError;
305
306    #[test]
307    fn test_message_id() -> Result<(), AmqpCodecError> {
308        let id = MessageId::Uuid(Uuid::new_v4());
309
310        let mut buf = BytesMut::new();
311        buf.reserve(id.encoded_size());
312        id.encode(&mut buf);
313
314        let new_id = MessageId::decode(&mut buf.freeze())?;
315        assert_eq!(id, new_id);
316        Ok(())
317    }
318
319    #[test]
320    fn test_properties() -> Result<(), AmqpCodecError> {
321        let id = Uuid::new_v4();
322        let props = Properties {
323            correlation_id: Some(id.into()),
324            ..Default::default()
325        };
326
327        let mut buf = BytesMut::new();
328        buf.reserve(id.encoded_size());
329        props.encode(&mut buf);
330
331        let props2 = Properties::decode(&mut buf.freeze())?;
332        assert_eq!(props, props2);
333        Ok(())
334    }
335}