1use std::collections::HashMap;
2use std::convert::TryFrom;
3use std::hash::{BuildHasher, Hash};
4use std::{char, str, u8};
5
6use byteorder::{BigEndian, ByteOrder};
7use bytes::Bytes;
8use bytestring::ByteString;
9use chrono::{DateTime, TimeZone, Utc};
10use fxhash::FxHashMap;
11use ordered_float::OrderedFloat;
12use uuid::Uuid;
13
14use crate::codec::{self, ArrayDecode, Decode, DecodeFormatted};
15use crate::errors::AmqpParseError;
16use crate::framing::{self, AmqpFrame, SaslFrame, HEADER_LEN};
17use crate::protocol::{self, CompoundHeader};
18use crate::types::{
19 Descriptor, List, Multiple, Str, Symbol, Variant, VariantMap, VecStringMap, VecSymbolMap,
20};
21
/// Reads one fixed-width big-endian value from the front of `$input`.
///
/// `$fn` names the `BigEndian` reader to call and `$size` is the number of
/// bytes it consumes. The length guard (`decode_check_len!`, defined
/// elsewhere) runs first, so the split below cannot go out of bounds.
/// Evaluates to `Ok((remaining_input, value))`.
macro_rules! be_read {
    ($input:ident, $fn:ident, $size:expr) => {{
        decode_check_len!($input, $size);
        let (head, tail) = $input.split_at($size);
        Ok((tail, BigEndian::$fn(head)))
    }};
}
29
30fn read_u8(input: &[u8]) -> Result<(&[u8], u8), AmqpParseError> {
31 decode_check_len!(input, 1);
32 Ok((&input[1..], input[0]))
33}
34
35fn read_i8(input: &[u8]) -> Result<(&[u8], i8), AmqpParseError> {
36 decode_check_len!(input, 1);
37 Ok((&input[1..], input[0] as i8))
38}
39
40fn read_bytes_u8(input: &[u8]) -> Result<(&[u8], &[u8]), AmqpParseError> {
41 let (input, len) = read_u8(input)?;
42 let len = len as usize;
43 decode_check_len!(input, len);
44 let (bytes, input) = input.split_at(len);
45 Ok((input, bytes))
46}
47
48fn read_bytes_u32(input: &[u8]) -> Result<(&[u8], &[u8]), AmqpParseError> {
49 let result: Result<(&[u8], u32), AmqpParseError> = be_read!(input, read_u32, 4);
50 let (input, len) = result?;
51 let len = len as usize;
52 decode_check_len!(input, len);
53 let (bytes, input) = input.split_at(len);
54 Ok((input, bytes))
55}
56
/// Early-returns `Err(AmqpParseError::InvalidFormatCode($fmt))` from the
/// enclosing function unless `$fmt` equals `$code`.
///
/// The expansion names `AmqpParseError` without a path, so that type has to be
/// in scope wherever the macro is invoked.
#[macro_export]
macro_rules! validate_code {
    ($fmt:ident, $code:expr) => {
        if $fmt != $code {
            return Err(AmqpParseError::InvalidFormatCode($fmt));
        }
    };
}
65
66impl DecodeFormatted for bool {
67 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
68 match fmt {
69 codec::FORMATCODE_BOOLEAN => read_u8(input).map(|(i, o)| (i, o != 0)),
70 codec::FORMATCODE_BOOLEAN_TRUE => Ok((input, true)),
71 codec::FORMATCODE_BOOLEAN_FALSE => Ok((input, false)),
72 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
73 }
74 }
75}
76
77impl DecodeFormatted for u8 {
78 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
79 validate_code!(fmt, codec::FORMATCODE_UBYTE);
80 read_u8(input)
81 }
82}
83
84impl DecodeFormatted for u16 {
85 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
86 validate_code!(fmt, codec::FORMATCODE_USHORT);
87 be_read!(input, read_u16, 2)
88 }
89}
90
91impl DecodeFormatted for u32 {
92 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
93 match fmt {
94 codec::FORMATCODE_UINT => be_read!(input, read_u32, 4),
95 codec::FORMATCODE_SMALLUINT => read_u8(input).map(|(i, o)| (i, u32::from(o))),
96 codec::FORMATCODE_UINT_0 => Ok((input, 0)),
97 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
98 }
99 }
100}
101
102impl DecodeFormatted for u64 {
103 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
104 match fmt {
105 codec::FORMATCODE_ULONG => be_read!(input, read_u64, 8),
106 codec::FORMATCODE_SMALLULONG => read_u8(input).map(|(i, o)| (i, u64::from(o))),
107 codec::FORMATCODE_ULONG_0 => Ok((input, 0)),
108 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
109 }
110 }
111}
112
113impl DecodeFormatted for i8 {
114 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
115 validate_code!(fmt, codec::FORMATCODE_BYTE);
116 read_i8(input)
117 }
118}
119
120impl DecodeFormatted for i16 {
121 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
122 validate_code!(fmt, codec::FORMATCODE_SHORT);
123 be_read!(input, read_i16, 2)
124 }
125}
126
127impl DecodeFormatted for i32 {
128 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
129 match fmt {
130 codec::FORMATCODE_INT => be_read!(input, read_i32, 4),
131 codec::FORMATCODE_SMALLINT => read_i8(input).map(|(i, o)| (i, i32::from(o))),
132 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
133 }
134 }
135}
136
137impl DecodeFormatted for i64 {
138 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
139 match fmt {
140 codec::FORMATCODE_LONG => be_read!(input, read_i64, 8),
141 codec::FORMATCODE_SMALLLONG => read_i8(input).map(|(i, o)| (i, i64::from(o))),
142 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
143 }
144 }
145}
146
147impl DecodeFormatted for f32 {
148 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
149 validate_code!(fmt, codec::FORMATCODE_FLOAT);
150 be_read!(input, read_f32, 4)
151 }
152}
153
154impl DecodeFormatted for f64 {
155 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
156 validate_code!(fmt, codec::FORMATCODE_DOUBLE);
157 be_read!(input, read_f64, 8)
158 }
159}
160
161impl DecodeFormatted for char {
162 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
163 validate_code!(fmt, codec::FORMATCODE_CHAR);
164 let result: Result<(&[u8], u32), AmqpParseError> = be_read!(input, read_u32, 4);
165 let (i, o) = result?;
166 if let Some(c) = char::from_u32(o) {
167 Ok((i, c))
168 } else {
169 Err(AmqpParseError::InvalidChar(o))
170 } }
172}
173
174impl DecodeFormatted for DateTime<Utc> {
175 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
176 validate_code!(fmt, codec::FORMATCODE_TIMESTAMP);
177 be_read!(input, read_i64, 8).map(|(i, o)| (i, datetime_from_millis(o)))
178 }
179}
180
181impl DecodeFormatted for Uuid {
182 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
183 validate_code!(fmt, codec::FORMATCODE_UUID);
184 decode_check_len!(input, 16);
185 let uuid = Uuid::from_slice(&input[..16])?;
186 Ok((&input[16..], uuid))
187 }
188}
189
190impl DecodeFormatted for Bytes {
191 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
192 match fmt {
193 codec::FORMATCODE_BINARY8 => {
194 read_bytes_u8(input).map(|(i, o)| (i, Bytes::copy_from_slice(o)))
195 }
196 codec::FORMATCODE_BINARY32 => {
197 read_bytes_u32(input).map(|(i, o)| (i, Bytes::copy_from_slice(o)))
198 }
199 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
200 }
201 }
202}
203
204impl DecodeFormatted for ByteString {
205 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
206 match fmt {
207 codec::FORMATCODE_STRING8 => {
208 let (input, bytes) = read_bytes_u8(input)?;
209 Ok((input, ByteString::try_from(bytes)?))
210 }
211 codec::FORMATCODE_STRING32 => {
212 let (input, bytes) = read_bytes_u32(input)?;
213 Ok((input, ByteString::try_from(bytes)?))
214 }
215 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
216 }
217 }
218}
219
220impl DecodeFormatted for Str {
221 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
222 match fmt {
223 codec::FORMATCODE_STRING8 => {
224 let (input, bytes) = read_bytes_u8(input)?;
225 Ok((input, Str::from_str(str::from_utf8(bytes)?)))
226 }
227 codec::FORMATCODE_STRING32 => {
228 let (input, bytes) = read_bytes_u32(input)?;
229 Ok((input, Str::from_str(str::from_utf8(bytes)?)))
230 }
231 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
232 }
233 }
234}
235
236impl DecodeFormatted for Symbol {
237 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
238 match fmt {
239 codec::FORMATCODE_SYMBOL8 => {
240 let (input, bytes) = read_bytes_u8(input)?;
241 Ok((input, Symbol::from_slice(str::from_utf8(bytes)?)))
242 }
243 codec::FORMATCODE_SYMBOL32 => {
244 let (input, bytes) = read_bytes_u32(input)?;
245 Ok((input, Symbol::from_slice(str::from_utf8(bytes)?)))
246 }
247 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
248 }
249 }
250}
251
252impl ArrayDecode for Symbol {
253 fn array_decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError> {
254 let (input, bytes) = read_bytes_u32(input)?;
255 Ok((input, Symbol::from_slice(str::from_utf8(bytes)?)))
256 }
257}
258
259impl<K: Decode + Eq + Hash, V: Decode, S: BuildHasher + Default> DecodeFormatted
260 for HashMap<K, V, S>
261{
262 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
263 let (input, header) = decode_map_header(input, fmt)?;
264 let mut map_input = &input[..header.size as usize];
265 let count = header.count / 2;
266 let mut map: HashMap<K, V, S> =
267 HashMap::with_capacity_and_hasher(count as usize, Default::default());
268 for _ in 0..count {
269 let (input1, key) = K::decode(map_input)?;
270 let (input2, value) = V::decode(input1)?;
271 map_input = input2;
272 map.insert(key, value); }
274 Ok((&input[header.size as usize..], map))
276 }
277}
278
279impl<T: DecodeFormatted> DecodeFormatted for Vec<T> {
280 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
281 let (input, header) = decode_array_header(input, fmt)?;
282 let item_fmt = input[0]; let mut input = &input[1..];
284 let mut result: Vec<T> = Vec::with_capacity(header.count as usize);
285 for _ in 0..header.count {
286 let (new_input, decoded) = T::decode_with_format(input, item_fmt)?;
287 result.push(decoded);
288 input = new_input;
289 }
290 Ok((input, result))
291 }
292}
293
294impl DecodeFormatted for VecSymbolMap {
295 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
296 let (input, header) = decode_map_header(input, fmt)?;
297 let mut map_input = &input[..header.size as usize];
298 let count = header.count / 2;
299 let mut map = Vec::with_capacity(count as usize);
300 for _ in 0..count {
301 let (input1, key) = Symbol::decode(map_input)?;
302 let (input2, value) = Variant::decode(input1)?;
303 map_input = input2;
304 map.push((key, value)); }
306 Ok((&input[header.size as usize..], VecSymbolMap(map)))
308 }
309}
310
311impl DecodeFormatted for VecStringMap {
312 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
313 let (input, header) = decode_map_header(input, fmt)?;
314 let mut map_input = &input[..header.size as usize];
315 let count = header.count / 2;
316 let mut map = Vec::with_capacity(count as usize);
317 for _ in 0..count {
318 let (input1, key) = Str::decode(map_input)?;
319 let (input2, value) = Variant::decode(input1)?;
320 map_input = input2;
321 map.push((key, value)); }
323 Ok((&input[header.size as usize..], VecStringMap(map)))
325 }
326}
327
328impl<T: ArrayDecode + DecodeFormatted> DecodeFormatted for Multiple<T> {
329 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
330 match fmt {
331 codec::FORMATCODE_ARRAY8 | codec::FORMATCODE_ARRAY32 => {
332 let (input, items) = Vec::<T>::decode_with_format(input, fmt)?;
333 Ok((input, Multiple(items)))
334 }
335 _ => {
336 let (input, item) = T::decode_with_format(input, fmt)?;
337 Ok((input, Multiple(vec![item])))
338 }
339 }
340 }
341}
342
343impl DecodeFormatted for List {
344 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
345 let (mut input, header) = decode_list_header(input, fmt)?;
346 let mut result: Vec<Variant> = Vec::with_capacity(header.count as usize);
347 for _ in 0..header.count {
348 let (new_input, decoded) = Variant::decode(input)?;
349 result.push(decoded);
350 input = new_input;
351 }
352 Ok((input, List(result)))
353 }
354}
355
356impl DecodeFormatted for Variant {
357 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
358 match fmt {
359 codec::FORMATCODE_NULL => Ok((input, Variant::Null)),
360 codec::FORMATCODE_BOOLEAN => {
361 bool::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Boolean(o)))
362 }
363 codec::FORMATCODE_BOOLEAN_FALSE => Ok((input, Variant::Boolean(false))),
364 codec::FORMATCODE_BOOLEAN_TRUE => Ok((input, Variant::Boolean(true))),
365 codec::FORMATCODE_UINT_0 => Ok((input, Variant::Uint(0))),
366 codec::FORMATCODE_ULONG_0 => Ok((input, Variant::Ulong(0))),
367 codec::FORMATCODE_UBYTE => {
368 u8::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Ubyte(o)))
369 }
370 codec::FORMATCODE_USHORT => {
371 u16::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Ushort(o)))
372 }
373 codec::FORMATCODE_UINT => {
374 u32::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Uint(o)))
375 }
376 codec::FORMATCODE_ULONG => {
377 u64::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Ulong(o)))
378 }
379 codec::FORMATCODE_BYTE => {
380 i8::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Byte(o)))
381 }
382 codec::FORMATCODE_SHORT => {
383 i16::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Short(o)))
384 }
385 codec::FORMATCODE_INT => {
386 i32::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Int(o)))
387 }
388 codec::FORMATCODE_LONG => {
389 i64::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Long(o)))
390 }
391 codec::FORMATCODE_SMALLUINT => {
392 u32::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Uint(o)))
393 }
394 codec::FORMATCODE_SMALLULONG => {
395 u64::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Ulong(o)))
396 }
397 codec::FORMATCODE_SMALLINT => {
398 i32::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Int(o)))
399 }
400 codec::FORMATCODE_SMALLLONG => {
401 i64::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Long(o)))
402 }
403 codec::FORMATCODE_FLOAT => f32::decode_with_format(input, fmt)
404 .map(|(i, o)| (i, Variant::Float(OrderedFloat(o)))),
405 codec::FORMATCODE_DOUBLE => f64::decode_with_format(input, fmt)
406 .map(|(i, o)| (i, Variant::Double(OrderedFloat(o)))),
407 codec::FORMATCODE_CHAR => {
411 char::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Char(o)))
412 }
413 codec::FORMATCODE_TIMESTAMP => DateTime::<Utc>::decode_with_format(input, fmt)
414 .map(|(i, o)| (i, Variant::Timestamp(o))),
415 codec::FORMATCODE_UUID => {
416 Uuid::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Uuid(o)))
417 }
418 codec::FORMATCODE_BINARY8 => {
419 Bytes::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Binary(o)))
420 }
421 codec::FORMATCODE_BINARY32 => {
422 Bytes::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Binary(o)))
423 }
424 codec::FORMATCODE_STRING8 => ByteString::decode_with_format(input, fmt)
425 .map(|(i, o)| (i, Variant::String(o.into()))),
426 codec::FORMATCODE_STRING32 => ByteString::decode_with_format(input, fmt)
427 .map(|(i, o)| (i, Variant::String(o.into()))),
428 codec::FORMATCODE_SYMBOL8 => {
429 Symbol::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Symbol(o)))
430 }
431 codec::FORMATCODE_SYMBOL32 => {
432 Symbol::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Symbol(o)))
433 }
434 codec::FORMATCODE_LIST0 => Ok((input, Variant::List(List(vec![])))),
435 codec::FORMATCODE_LIST8 => {
436 List::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::List(o)))
437 }
438 codec::FORMATCODE_LIST32 => {
439 List::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::List(o)))
440 }
441 codec::FORMATCODE_MAP8 => FxHashMap::<Variant, Variant>::decode_with_format(input, fmt)
442 .map(|(i, o)| (i, Variant::Map(VariantMap::new(o)))),
443 codec::FORMATCODE_MAP32 => {
444 FxHashMap::<Variant, Variant>::decode_with_format(input, fmt)
445 .map(|(i, o)| (i, Variant::Map(VariantMap::new(o))))
446 }
447 codec::FORMATCODE_DESCRIBED => {
450 let (input, descriptor) = Descriptor::decode(input)?;
451 let (input, value) = Variant::decode(input)?;
452 Ok((input, Variant::Described((descriptor, Box::new(value)))))
453 }
454 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
455 }
456 }
457}
458
459impl<T: DecodeFormatted> DecodeFormatted for Option<T> {
460 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
461 match fmt {
462 codec::FORMATCODE_NULL => Ok((input, None)),
463 _ => T::decode_with_format(input, fmt).map(|(i, o)| (i, Some(o))),
464 }
465 }
466}
467
468impl DecodeFormatted for Descriptor {
469 fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
470 match fmt {
471 codec::FORMATCODE_SMALLULONG => {
472 u64::decode_with_format(input, fmt).map(|(i, o)| (i, Descriptor::Ulong(o)))
473 }
474 codec::FORMATCODE_ULONG => {
475 u64::decode_with_format(input, fmt).map(|(i, o)| (i, Descriptor::Ulong(o)))
476 }
477 codec::FORMATCODE_SYMBOL8 => {
478 Symbol::decode_with_format(input, fmt).map(|(i, o)| (i, Descriptor::Symbol(o)))
479 }
480 codec::FORMATCODE_SYMBOL32 => {
481 Symbol::decode_with_format(input, fmt).map(|(i, o)| (i, Descriptor::Symbol(o)))
482 }
483 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
484 }
485 }
486}
487
488impl Decode for AmqpFrame {
489 fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError> {
490 let (input, channel_id) = decode_frame_header(input, framing::FRAME_TYPE_AMQP)?;
491 let (input, performative) = protocol::Frame::decode(input)?;
492 Ok((input, AmqpFrame::new(channel_id, performative)))
493 }
494}
495
496impl Decode for SaslFrame {
497 fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError> {
498 let (input, _) = decode_frame_header(input, framing::FRAME_TYPE_SASL)?;
499 let (input, frame) = protocol::SaslFrameBody::decode(input)?;
500 Ok((input, SaslFrame { body: frame }))
501 }
502}
503
504fn decode_frame_header(
505 input: &[u8],
506 expected_frame_type: u8,
507) -> Result<(&[u8], u16), AmqpParseError> {
508 decode_check_len!(input, 4);
509 let doff = input[0];
510 let frame_type = input[1];
511 if frame_type != expected_frame_type {
512 return Err(AmqpParseError::UnexpectedFrameType(frame_type));
513 }
514
515 let channel_id = BigEndian::read_u16(&input[2..]);
516 let doff = doff as usize * 4;
517 if doff < HEADER_LEN {
518 return Err(AmqpParseError::InvalidSize);
519 }
520 let ext_header_len = doff - HEADER_LEN;
521 decode_check_len!(input, ext_header_len + 4);
522 let input = &input[ext_header_len + 4..]; Ok((input, channel_id))
524}
525
526fn decode_array_header(input: &[u8], fmt: u8) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
527 match fmt {
528 codec::FORMATCODE_ARRAY8 => decode_compound8(input),
529 codec::FORMATCODE_ARRAY32 => decode_compound32(input),
530 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
531 }
532}
533
534pub(crate) fn decode_list_header(
535 input: &[u8],
536 fmt: u8,
537) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
538 match fmt {
539 codec::FORMATCODE_LIST0 => Ok((input, CompoundHeader::empty())),
540 codec::FORMATCODE_LIST8 => decode_compound8(input),
541 codec::FORMATCODE_LIST32 => decode_compound32(input),
542 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
543 }
544}
545
546pub(crate) fn decode_map_header(
547 input: &[u8],
548 fmt: u8,
549) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
550 match fmt {
551 codec::FORMATCODE_MAP8 => decode_compound8(input),
552 codec::FORMATCODE_MAP32 => decode_compound32(input),
553 _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
554 }
555}
556
557fn decode_compound8(input: &[u8]) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
558 decode_check_len!(input, 2);
559 let size = input[0] - 1; let count = input[1];
561 Ok((
562 &input[2..],
563 CompoundHeader {
564 size: u32::from(size),
565 count: u32::from(count),
566 },
567 ))
568}
569
570fn decode_compound32(input: &[u8]) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
571 decode_check_len!(input, 8);
572 let size = BigEndian::read_u32(input) - 4; let count = BigEndian::read_u32(&input[4..]);
574 Ok((&input[8..], CompoundHeader { size, count }))
575}
576
577fn datetime_from_millis(millis: i64) -> DateTime<Utc> {
578 let seconds = millis / 1000;
579 if seconds < 0 {
580 let nanoseconds = ((1000 + (millis - (seconds * 1000))) * 1_000_000).abs() as u32;
584 Utc.timestamp(seconds - 1, nanoseconds)
585 } else {
586 let nanoseconds = ((millis - (seconds * 1000)) * 1_000_000).abs() as u32;
587 Utc.timestamp(seconds, nanoseconds)
588 }
589}
590
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::Encode;
    use bytes::{BufMut, BytesMut};

    const LOREM: &str = include_str!("lorem.txt");

    /// Asserts that a decode succeeded and returns the decoded value,
    /// discarding the remaining input.
    fn unwrap_value<T>(res: Result<(&[u8], T), AmqpParseError>) -> T {
        let r = res.map(|pair| pair.1);
        assert!(r.is_ok());
        r.unwrap()
    }

    // Generates one round-trip test per table row: the `$test` value is
    // encoded, decoded again as `$kind`, and compared against `$expected`.
    macro_rules! decode_tests {
        ($($name:ident: $kind:ident, $test:expr, $expected:expr,)*) => {
            $(
                #[test]
                fn $name() {
                    let mut encoded = BytesMut::with_capacity(($test).encoded_size());
                    ($test).encode(&mut encoded);
                    let decoded = unwrap_value($kind::decode(&encoded));
                    assert_eq!($expected, decoded);
                }
            )*
        }
    }

    decode_tests! {
        ubyte: u8, 255_u8, 255_u8,
        ushort: u16, 350_u16, 350_u16,

        uint_zero: u32, 0_u32, 0_u32,
        uint_small: u32, 128_u32, 128_u32,
        uint_big: u32, 2147483647_u32, 2147483647_u32,

        ulong_zero: u64, 0_u64, 0_u64,
        ulong_small: u64, 128_u64, 128_u64,
        uulong_big: u64, 2147483649_u64, 2147483649_u64,

        byte: i8, -128_i8, -128_i8,
        short: i16, -255_i16, -255_i16,

        int_zero: i32, 0_i32, 0_i32,
        int_small: i32, -50000_i32, -50000_i32,
        int_neg: i32, -128_i32, -128_i32,

        long_zero: i64, 0_i64, 0_i64,
        long_big: i64, -2147483647_i64, -2147483647_i64,
        long_small: i64, -128_i64, -128_i64,

        float: f32, 1.234_f32, 1.234_f32,
        double: f64, 1.234_f64, 1.234_f64,

        test_char: char, '💯', '💯',

        uuid: Uuid, Uuid::from_slice(&[4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87]).expect("parse error"),
        Uuid::parse_str("0436430c2b02624c2032570501212b57").expect("parse error"),

        binary_short: Bytes, Bytes::from(&[4u8, 5u8][..]), Bytes::from(&[4u8, 5u8][..]),
        binary_long: Bytes, Bytes::from(&[4u8; 500][..]), Bytes::from(&[4u8; 500][..]),

        string_short: ByteString, ByteString::from("Hello there"), ByteString::from("Hello there"),
        string_long: ByteString, ByteString::from(LOREM), ByteString::from(LOREM),

        variant_ubyte: Variant, Variant::Ubyte(255_u8), Variant::Ubyte(255_u8),
        variant_ushort: Variant, Variant::Ushort(350_u16), Variant::Ushort(350_u16),

        variant_uint_zero: Variant, Variant::Uint(0_u32), Variant::Uint(0_u32),
        variant_uint_small: Variant, Variant::Uint(128_u32), Variant::Uint(128_u32),
        variant_uint_big: Variant, Variant::Uint(2147483647_u32), Variant::Uint(2147483647_u32),

        variant_ulong_zero: Variant, Variant::Ulong(0_u64), Variant::Ulong(0_u64),
        variant_ulong_small: Variant, Variant::Ulong(128_u64), Variant::Ulong(128_u64),
        variant_ulong_big: Variant, Variant::Ulong(2147483649_u64), Variant::Ulong(2147483649_u64),

        variant_byte: Variant, Variant::Byte(-128_i8), Variant::Byte(-128_i8),
        variant_short: Variant, Variant::Short(-255_i16), Variant::Short(-255_i16),

        variant_int_zero: Variant, Variant::Int(0_i32), Variant::Int(0_i32),
        variant_int_small: Variant, Variant::Int(-50000_i32), Variant::Int(-50000_i32),
        variant_int_neg: Variant, Variant::Int(-128_i32), Variant::Int(-128_i32),

        variant_long_zero: Variant, Variant::Long(0_i64), Variant::Long(0_i64),
        variant_long_big: Variant, Variant::Long(-2147483647_i64), Variant::Long(-2147483647_i64),
        variant_long_small: Variant, Variant::Long(-128_i64), Variant::Long(-128_i64),

        variant_float: Variant, Variant::Float(OrderedFloat(1.234_f32)), Variant::Float(OrderedFloat(1.234_f32)),
        variant_double: Variant, Variant::Double(OrderedFloat(1.234_f64)), Variant::Double(OrderedFloat(1.234_f64)),

        variant_char: Variant, Variant::Char('💯'), Variant::Char('💯'),

        variant_uuid: Variant, Variant::Uuid(Uuid::from_slice(&[4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87]).expect("parse error")),
        Variant::Uuid(Uuid::parse_str("0436430c2b02624c2032570501212b57").expect("parse error")),

        variant_binary_short: Variant, Variant::Binary(Bytes::from(&[4u8, 5u8][..])), Variant::Binary(Bytes::from(&[4u8, 5u8][..])),
        variant_binary_long: Variant, Variant::Binary(Bytes::from(&[4u8; 500][..])), Variant::Binary(Bytes::from(&[4u8; 500][..])),

        variant_string_short: Variant, Variant::String(ByteString::from("Hello there").into()), Variant::String(ByteString::from("Hello there").into()),
        variant_string_long: Variant, Variant::String(ByteString::from(LOREM).into()), Variant::String(ByteString::from(LOREM).into()),

    }

    #[test]
    fn test_bool_true() {
        // One-byte form: the format code alone carries the value.
        let mut short_form = BytesMut::with_capacity(0);
        short_form.put_u8(0x41);
        assert!(unwrap_value(bool::decode(&short_form)));

        // Two-byte form: format code followed by a value byte.
        let mut long_form = BytesMut::with_capacity(0);
        long_form.put_u8(0x56);
        long_form.put_u8(0x01);
        assert!(unwrap_value(bool::decode(&long_form)));
    }

    #[test]
    fn test_bool_false() {
        // One-byte form: the format code alone carries the value.
        let mut short_form = BytesMut::with_capacity(0);
        short_form.put_u8(0x42u8);
        assert!(!unwrap_value(bool::decode(&short_form)));

        // Two-byte form: format code followed by a value byte.
        let mut long_form = BytesMut::with_capacity(0);
        long_form.put_u8(0x56);
        long_form.put_u8(0x00);
        assert!(!unwrap_value(bool::decode(&long_form)));
    }

    #[test]
    fn test_timestamp() {
        let mut encoded = BytesMut::with_capacity(0);
        let datetime = Utc.ymd(2011, 7, 26).and_hms_milli(18, 21, 3, 521);
        datetime.encode(&mut encoded);

        let expected = Utc.ymd(2011, 7, 26).and_hms_milli(18, 21, 3, 521);
        assert_eq!(expected, unwrap_value(DateTime::<Utc>::decode(&encoded)));
    }

    #[test]
    fn test_timestamp_pre_unix() {
        // A date before 1970 exercises the negative-milliseconds path.
        let mut encoded = BytesMut::with_capacity(0);
        let datetime = Utc.ymd(1968, 7, 26).and_hms_milli(18, 21, 3, 521);
        datetime.encode(&mut encoded);

        let expected = Utc.ymd(1968, 7, 26).and_hms_milli(18, 21, 3, 521);
        assert_eq!(expected, unwrap_value(DateTime::<Utc>::decode(&encoded)));
    }

    #[test]
    fn variant_null() {
        let mut encoded = BytesMut::with_capacity(0);
        Variant::Null.encode(&mut encoded);
        let decoded = unwrap_value(Variant::decode(&encoded));
        assert_eq!(Variant::Null, decoded);
    }

    #[test]
    fn variant_bool_true() {
        let mut short_form = BytesMut::with_capacity(0);
        short_form.put_u8(0x41);
        assert_eq!(
            Variant::Boolean(true),
            unwrap_value(Variant::decode(&short_form))
        );

        let mut long_form = BytesMut::with_capacity(0);
        long_form.put_u8(0x56);
        long_form.put_u8(0x01);
        assert_eq!(
            Variant::Boolean(true),
            unwrap_value(Variant::decode(&long_form))
        );
    }

    #[test]
    fn variant_bool_false() {
        let mut short_form = BytesMut::with_capacity(0);
        short_form.put_u8(0x42u8);
        assert_eq!(
            Variant::Boolean(false),
            unwrap_value(Variant::decode(&short_form))
        );

        let mut long_form = BytesMut::with_capacity(0);
        long_form.put_u8(0x56);
        long_form.put_u8(0x00);
        assert_eq!(
            Variant::Boolean(false),
            unwrap_value(Variant::decode(&long_form))
        );
    }

    #[test]
    fn variant_timestamp() {
        let mut encoded = BytesMut::with_capacity(0);
        let datetime = Utc.ymd(2011, 7, 26).and_hms_milli(18, 21, 3, 521);
        Variant::Timestamp(datetime).encode(&mut encoded);

        let expected = Utc.ymd(2011, 7, 26).and_hms_milli(18, 21, 3, 521);
        assert_eq!(
            Variant::Timestamp(expected),
            unwrap_value(Variant::decode(&encoded))
        );
    }

    #[test]
    fn variant_timestamp_pre_unix() {
        // A date before 1970 exercises the negative-milliseconds path.
        let mut encoded = BytesMut::with_capacity(0);
        let datetime = Utc.ymd(1968, 7, 26).and_hms_milli(18, 21, 3, 521);
        Variant::Timestamp(datetime).encode(&mut encoded);

        let expected = Utc.ymd(1968, 7, 26).and_hms_milli(18, 21, 3, 521);
        assert_eq!(
            Variant::Timestamp(expected),
            unwrap_value(Variant::decode(&encoded))
        );
    }

    #[test]
    fn option_i8() {
        let mut some_buf = BytesMut::with_capacity(0);
        Some(42i8).encode(&mut some_buf);

        assert_eq!(Some(42), unwrap_value(Option::<i8>::decode(&some_buf)));

        let mut none_buf = BytesMut::with_capacity(0);
        let absent: Option<i8> = None;
        absent.encode(&mut none_buf);

        assert_eq!(None, unwrap_value(Option::<i8>::decode(&none_buf)));
    }

    #[test]
    fn option_string() {
        let mut some_buf = BytesMut::with_capacity(0);
        Some(ByteString::from("hello")).encode(&mut some_buf);

        assert_eq!(
            Some(ByteString::from("hello")),
            unwrap_value(Option::<ByteString>::decode(&some_buf))
        );

        let mut none_buf = BytesMut::with_capacity(0);
        let absent: Option<ByteString> = None;
        absent.encode(&mut none_buf);

        assert_eq!(None, unwrap_value(Option::<ByteString>::decode(&none_buf)));
    }
}