1use std::fmt;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use bytestring::ByteString;
5use chrono::{DateTime, Utc};
6use derive_more::From;
7use fxhash::FxHashMap;
8use uuid::Uuid;
9
10use super::codec::{self, DecodeFormatted, Encode};
11use super::errors::AmqpParseError;
12use super::message::{InMessage, OutMessage};
13use super::types::*;
14
15impl fmt::Display for Error {
16 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
17 write!(f, "{:?}", self)
18 }
19}
20
21#[derive(Debug)]
22pub(crate) struct CompoundHeader {
23 pub size: u32,
24 pub count: u32,
25}
26
27impl CompoundHeader {
28 pub fn empty() -> CompoundHeader {
29 CompoundHeader { size: 0, count: 0 }
30 }
31}
32
33#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
34pub enum ProtocolId {
35 Amqp = 0,
36 AmqpTls = 2,
37 AmqpSasl = 3,
38}
39
40pub type Map = FxHashMap<Variant, Variant>;
41pub type StringVariantMap = FxHashMap<Str, Variant>;
42pub type Fields = FxHashMap<Symbol, Variant>;
43pub type FilterSet = FxHashMap<Symbol, Option<ByteString>>;
44pub type Timestamp = DateTime<Utc>;
45pub type Symbols = Multiple<Symbol>;
46pub type IetfLanguageTags = Multiple<IetfLanguageTag>;
47pub type Annotations = FxHashMap<Symbol, Variant>;
48
49mod definitions;
50pub use self::definitions::*;
51
52#[derive(Debug, Eq, PartialEq, Clone, From, Display)]
53pub enum MessageId {
54 #[display(fmt = "{}", _0)]
55 Ulong(u64),
56 #[display(fmt = "{}", _0)]
57 Uuid(Uuid),
58 #[display(fmt = "{:?}", _0)]
59 Binary(Bytes),
60 #[display(fmt = "{}", _0)]
61 String(ByteString),
62}
63
64impl From<usize> for MessageId {
65 fn from(id: usize) -> MessageId {
66 MessageId::Ulong(id as u64)
67 }
68}
69
70impl From<i32> for MessageId {
71 fn from(id: i32) -> MessageId {
72 MessageId::Ulong(id as u64)
73 }
74}
75
76impl DecodeFormatted for MessageId {
77 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
78 match fmt {
79 codec::FORMATCODE_SMALLULONG | codec::FORMATCODE_ULONG | codec::FORMATCODE_ULONG_0 => {
80 u64::decode_with_format(input, fmt).map(|(i, o)| (i, MessageId::Ulong(o)))
81 }
82 codec::FORMATCODE_UUID => {
83 Uuid::decode_with_format(input, fmt).map(|(i, o)| (i, MessageId::Uuid(o)))
84 }
85 codec::FORMATCODE_BINARY8 | codec::FORMATCODE_BINARY32 => {
86 Bytes::decode_with_format(input, fmt).map(|(i, o)| (i, MessageId::Binary(o)))
87 }
88 codec::FORMATCODE_STRING8 | codec::FORMATCODE_STRING32 => {
89 ByteString::decode_with_format(input, fmt).map(|(i, o)| (i, MessageId::String(o)))
90 }
91 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
92 }
93 }
94}
95
96impl Encode for MessageId {
97 fn encoded_size(&self) -> usize {
98 match *self {
99 MessageId::Ulong(v) => v.encoded_size(),
100 MessageId::Uuid(ref v) => v.encoded_size(),
101 MessageId::Binary(ref v) => v.encoded_size(),
102 MessageId::String(ref v) => v.encoded_size(),
103 }
104 }
105
106 fn encode(&self, buf: &mut BytesMut) {
107 match *self {
108 MessageId::Ulong(v) => v.encode(buf),
109 MessageId::Uuid(ref v) => v.encode(buf),
110 MessageId::Binary(ref v) => v.encode(buf),
111 MessageId::String(ref v) => v.encode(buf),
112 }
113 }
114}
115
116#[derive(Clone, Debug, PartialEq, From)]
117pub enum ErrorCondition {
118 AmqpError(AmqpError),
119 ConnectionError(ConnectionError),
120 SessionError(SessionError),
121 LinkError(LinkError),
122 Custom(Symbol),
123}
124
125impl DecodeFormatted for ErrorCondition {
126 #[inline]
127 fn decode_with_format(input: &[u8], format: u8) -> Result<(&[u8], Self), AmqpParseError> {
128 let (input, result) = Symbol::decode_with_format(input, format)?;
129 if let Ok(r) = AmqpError::try_from(&result) {
130 return Ok((input, ErrorCondition::AmqpError(r)));
131 }
132 if let Ok(r) = ConnectionError::try_from(&result) {
133 return Ok((input, ErrorCondition::ConnectionError(r)));
134 }
135 if let Ok(r) = SessionError::try_from(&result) {
136 return Ok((input, ErrorCondition::SessionError(r)));
137 }
138 if let Ok(r) = LinkError::try_from(&result) {
139 return Ok((input, ErrorCondition::LinkError(r)));
140 }
141 Ok((input, ErrorCondition::Custom(result)))
142 }
143}
144
145impl Encode for ErrorCondition {
146 fn encoded_size(&self) -> usize {
147 match *self {
148 ErrorCondition::AmqpError(ref v) => v.encoded_size(),
149 ErrorCondition::ConnectionError(ref v) => v.encoded_size(),
150 ErrorCondition::SessionError(ref v) => v.encoded_size(),
151 ErrorCondition::LinkError(ref v) => v.encoded_size(),
152 ErrorCondition::Custom(ref v) => v.encoded_size(),
153 }
154 }
155
156 fn encode(&self, buf: &mut BytesMut) {
157 match *self {
158 ErrorCondition::AmqpError(ref v) => v.encode(buf),
159 ErrorCondition::ConnectionError(ref v) => v.encode(buf),
160 ErrorCondition::SessionError(ref v) => v.encode(buf),
161 ErrorCondition::LinkError(ref v) => v.encode(buf),
162 ErrorCondition::Custom(ref v) => v.encode(buf),
163 }
164 }
165}
166
167#[derive(Clone, Debug, PartialEq)]
168pub enum DistributionMode {
169 Move,
170 Copy,
171 Custom(Symbol),
172}
173
174impl DecodeFormatted for DistributionMode {
175 fn decode_with_format(input: &[u8], format: u8) -> Result<(&[u8], Self), AmqpParseError> {
176 let (input, result) = Symbol::decode_with_format(input, format)?;
177 let result = match result.as_str() {
178 "move" => DistributionMode::Move,
179 "copy" => DistributionMode::Copy,
180 _ => DistributionMode::Custom(result),
181 };
182 Ok((input, result))
183 }
184}
185
186impl Encode for DistributionMode {
187 fn encoded_size(&self) -> usize {
188 match *self {
189 DistributionMode::Move => 6,
190 DistributionMode::Copy => 6,
191 DistributionMode::Custom(ref v) => v.encoded_size(),
192 }
193 }
194
195 fn encode(&self, buf: &mut BytesMut) {
196 match *self {
197 DistributionMode::Move => Symbol::from("move").encode(buf),
198 DistributionMode::Copy => Symbol::from("copy").encode(buf),
199 DistributionMode::Custom(ref v) => v.encode(buf),
200 }
201 }
202}
203
204impl SaslInit {
205 pub fn prepare_response(authz_id: &str, authn_id: &str, password: &str) -> Bytes {
206 Bytes::from(format!("{}\x00{}\x00{}", authz_id, authn_id, password))
207 }
208}
209
210impl Default for Properties {
211 fn default() -> Properties {
212 Properties {
213 message_id: None,
214 user_id: None,
215 to: None,
216 subject: None,
217 reply_to: None,
218 correlation_id: None,
219 content_type: None,
220 content_encoding: None,
221 absolute_expiry_time: None,
222 creation_time: None,
223 group_id: None,
224 group_sequence: None,
225 reply_to_group_id: None,
226 }
227 }
228}
229
230#[derive(Debug, Clone, From, PartialEq)]
231pub enum TransferBody {
232 Data(Bytes),
233 MessageIn(InMessage),
234 MessageOut(OutMessage),
235}
236
237impl TransferBody {
238 #[inline]
239 pub fn len(&self) -> usize {
240 self.encoded_size()
241 }
242
243 #[inline]
244 pub fn message_format(&self) -> Option<MessageFormat> {
245 match self {
246 TransferBody::Data(_) => None,
247 TransferBody::MessageIn(ref data) => data.message_format,
248 TransferBody::MessageOut(ref data) => data.message_format,
249 }
250 }
251}
252
253impl Encode for TransferBody {
254 fn encoded_size(&self) -> usize {
255 match self {
256 TransferBody::Data(ref data) => data.len(),
257 TransferBody::MessageIn(ref data) => data.encoded_size(),
258 TransferBody::MessageOut(ref data) => data.encoded_size(),
259 }
260 }
261 fn encode(&self, dst: &mut BytesMut) {
262 match *self {
263 TransferBody::Data(ref data) => dst.put_slice(&data),
264 TransferBody::MessageIn(ref data) => data.encode(dst),
265 TransferBody::MessageOut(ref data) => data.encode(dst),
266 }
267 }
268}