amqp_codec/protocol/
mod.rs

1use std::fmt;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use chrono::{DateTime, Utc};
6use derive_more::From;
7use fxhash::FxHashMap;
8use uuid::Uuid;
9
10use super::codec::{self, DecodeFormatted, Encode};
11use super::errors::AmqpParseError;
12use super::message::{InMessage, OutMessage};
13use super::types::*;
14
15impl fmt::Display for Error {
16    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
17        write!(f, "{:?}", self)
18    }
19}
20
/// Size/count pair describing a compound value.
///
/// NOTE(review): presumably `size` is a byte length and `count` an element
/// count — confirm against the codec that fills these in.
#[derive(Debug)]
pub(crate) struct CompoundHeader {
    pub size: u32,
    pub count: u32,
}

impl CompoundHeader {
    /// Returns a header whose `size` and `count` are both zero.
    pub fn empty() -> Self {
        Self { size: 0, count: 0 }
    }
}
32
/// Identifier of the protocol being negotiated.
///
/// Each variant has an explicit discriminant (note that `1` is unused).
/// NOTE(review): these look like the protocol-id byte of the AMQP 1.0
/// protocol header — confirm against the header codec.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ProtocolId {
    /// Discriminant `0`.
    Amqp = 0,
    /// Discriminant `2`.
    AmqpTls = 2,
    /// Discriminant `3`.
    AmqpSasl = 3,
}
39
40pub type Map = FxHashMap<Variant, Variant>;
41pub type StringVariantMap = FxHashMap<Str, Variant>;
42pub type Fields = FxHashMap<Symbol, Variant>;
43pub type FilterSet = FxHashMap<Symbol, Option<ByteString>>;
44pub type Timestamp = DateTime<Utc>;
45pub type Symbols = Multiple<Symbol>;
46pub type IetfLanguageTags = Multiple<IetfLanguageTag>;
47pub type Annotations = FxHashMap<Symbol, Variant>;
48
49mod definitions;
50pub use self::definitions::*;
51
52#[derive(Debug, Eq, PartialEq, Clone, From, Display)]
53pub enum MessageId {
54    #[display(fmt = "{}", _0)]
55    Ulong(u64),
56    #[display(fmt = "{}", _0)]
57    Uuid(Uuid),
58    #[display(fmt = "{:?}", _0)]
59    Binary(Bytes),
60    #[display(fmt = "{}", _0)]
61    String(ByteString),
62}
63
64impl From<usize> for MessageId {
65    fn from(id: usize) -> MessageId {
66        MessageId::Ulong(id as u64)
67    }
68}
69
70impl From<i32> for MessageId {
71    fn from(id: i32) -> MessageId {
72        MessageId::Ulong(id as u64)
73    }
74}
75
76impl DecodeFormatted for MessageId {
77    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
78        match fmt {
79            codec::FORMATCODE_SMALLULONG | codec::FORMATCODE_ULONG | codec::FORMATCODE_ULONG_0 => {
80                u64::decode_with_format(input, fmt).map(|(i, o)| (i, MessageId::Ulong(o)))
81            }
82            codec::FORMATCODE_UUID => {
83                Uuid::decode_with_format(input, fmt).map(|(i, o)| (i, MessageId::Uuid(o)))
84            }
85            codec::FORMATCODE_BINARY8 | codec::FORMATCODE_BINARY32 => {
86                Bytes::decode_with_format(input, fmt).map(|(i, o)| (i, MessageId::Binary(o)))
87            }
88            codec::FORMATCODE_STRING8 | codec::FORMATCODE_STRING32 => {
89                ByteString::decode_with_format(input, fmt).map(|(i, o)| (i, MessageId::String(o)))
90            }
91            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
92        }
93    }
94}
95
96impl Encode for MessageId {
97    fn encoded_size(&self) -> usize {
98        match *self {
99            MessageId::Ulong(v) => v.encoded_size(),
100            MessageId::Uuid(ref v) => v.encoded_size(),
101            MessageId::Binary(ref v) => v.encoded_size(),
102            MessageId::String(ref v) => v.encoded_size(),
103        }
104    }
105
106    fn encode(&self, buf: &mut BytesMut) {
107        match *self {
108            MessageId::Ulong(v) => v.encode(buf),
109            MessageId::Uuid(ref v) => v.encode(buf),
110            MessageId::Binary(ref v) => v.encode(buf),
111            MessageId::String(ref v) => v.encode(buf),
112        }
113    }
114}
115
116#[derive(Clone, Debug, PartialEq, From)]
117pub enum ErrorCondition {
118    AmqpError(AmqpError),
119    ConnectionError(ConnectionError),
120    SessionError(SessionError),
121    LinkError(LinkError),
122    Custom(Symbol),
123}
124
125impl DecodeFormatted for ErrorCondition {
126    #[inline]
127    fn decode_with_format(input: &[u8], format: u8) -> Result<(&[u8], Self), AmqpParseError> {
128        let (input, result) = Symbol::decode_with_format(input, format)?;
129        if let Ok(r) = AmqpError::try_from(&result) {
130            return Ok((input, ErrorCondition::AmqpError(r)));
131        }
132        if let Ok(r) = ConnectionError::try_from(&result) {
133            return Ok((input, ErrorCondition::ConnectionError(r)));
134        }
135        if let Ok(r) = SessionError::try_from(&result) {
136            return Ok((input, ErrorCondition::SessionError(r)));
137        }
138        if let Ok(r) = LinkError::try_from(&result) {
139            return Ok((input, ErrorCondition::LinkError(r)));
140        }
141        Ok((input, ErrorCondition::Custom(result)))
142    }
143}
144
145impl Encode for ErrorCondition {
146    fn encoded_size(&self) -> usize {
147        match *self {
148            ErrorCondition::AmqpError(ref v) => v.encoded_size(),
149            ErrorCondition::ConnectionError(ref v) => v.encoded_size(),
150            ErrorCondition::SessionError(ref v) => v.encoded_size(),
151            ErrorCondition::LinkError(ref v) => v.encoded_size(),
152            ErrorCondition::Custom(ref v) => v.encoded_size(),
153        }
154    }
155
156    fn encode(&self, buf: &mut BytesMut) {
157        match *self {
158            ErrorCondition::AmqpError(ref v) => v.encode(buf),
159            ErrorCondition::ConnectionError(ref v) => v.encode(buf),
160            ErrorCondition::SessionError(ref v) => v.encode(buf),
161            ErrorCondition::LinkError(ref v) => v.encode(buf),
162            ErrorCondition::Custom(ref v) => v.encode(buf),
163        }
164    }
165}
166
167#[derive(Clone, Debug, PartialEq)]
168pub enum DistributionMode {
169    Move,
170    Copy,
171    Custom(Symbol),
172}
173
174impl DecodeFormatted for DistributionMode {
175    fn decode_with_format(input: &[u8], format: u8) -> Result<(&[u8], Self), AmqpParseError> {
176        let (input, result) = Symbol::decode_with_format(input, format)?;
177        let result = match result.as_str() {
178            "move" => DistributionMode::Move,
179            "copy" => DistributionMode::Copy,
180            _ => DistributionMode::Custom(result),
181        };
182        Ok((input, result))
183    }
184}
185
186impl Encode for DistributionMode {
187    fn encoded_size(&self) -> usize {
188        match *self {
189            DistributionMode::Move => 6,
190            DistributionMode::Copy => 6,
191            DistributionMode::Custom(ref v) => v.encoded_size(),
192        }
193    }
194
195    fn encode(&self, buf: &mut BytesMut) {
196        match *self {
197            DistributionMode::Move => Symbol::from("move").encode(buf),
198            DistributionMode::Copy => Symbol::from("copy").encode(buf),
199            DistributionMode::Custom(ref v) => v.encode(buf),
200        }
201    }
202}
203
204impl SaslInit {
205    pub fn prepare_response(authz_id: &str, authn_id: &str, password: &str) -> Bytes {
206        Bytes::from(format!("{}\x00{}\x00{}", authz_id, authn_id, password))
207    }
208}
209
210impl Default for Properties {
211    fn default() -> Properties {
212        Properties {
213            message_id: None,
214            user_id: None,
215            to: None,
216            subject: None,
217            reply_to: None,
218            correlation_id: None,
219            content_type: None,
220            content_encoding: None,
221            absolute_expiry_time: None,
222            creation_time: None,
223            group_id: None,
224            group_sequence: None,
225            reply_to_group_id: None,
226        }
227    }
228}
229
230#[derive(Debug, Clone, From, PartialEq)]
231pub enum TransferBody {
232    Data(Bytes),
233    MessageIn(InMessage),
234    MessageOut(OutMessage),
235}
236
237impl TransferBody {
238    #[inline]
239    pub fn len(&self) -> usize {
240        self.encoded_size()
241    }
242
243    #[inline]
244    pub fn message_format(&self) -> Option<MessageFormat> {
245        match self {
246            TransferBody::Data(_) => None,
247            TransferBody::MessageIn(ref data) => data.message_format,
248            TransferBody::MessageOut(ref data) => data.message_format,
249        }
250    }
251}
252
253impl Encode for TransferBody {
254    fn encoded_size(&self) -> usize {
255        match self {
256            TransferBody::Data(ref data) => data.len(),
257            TransferBody::MessageIn(ref data) => data.encoded_size(),
258            TransferBody::MessageOut(ref data) => data.encoded_size(),
259        }
260    }
261    fn encode(&self, dst: &mut BytesMut) {
262        match *self {
263            TransferBody::Data(ref data) => dst.put_slice(&data),
264            TransferBody::MessageIn(ref data) => data.encode(dst),
265            TransferBody::MessageOut(ref data) => data.encode(dst),
266        }
267    }
268}