amqp_codec/codec/
decode.rs

1use std::collections::HashMap;
2use std::convert::TryFrom;
3use std::hash::{BuildHasher, Hash};
4use std::{char, str, u8};
5
6use byteorder::{BigEndian, ByteOrder};
7use bytes::Bytes;
8use bytestring::ByteString;
9use chrono::{DateTime, TimeZone, Utc};
10use fxhash::FxHashMap;
11use ordered_float::OrderedFloat;
12use uuid::Uuid;
13
14use crate::codec::{self, ArrayDecode, Decode, DecodeFormatted};
15use crate::errors::AmqpParseError;
16use crate::framing::{self, AmqpFrame, SaslFrame, HEADER_LEN};
17use crate::protocol::{self, CompoundHeader};
18use crate::types::{
19    Descriptor, List, Multiple, Str, Symbol, Variant, VariantMap, VecStringMap, VecSymbolMap,
20};
21
22macro_rules! be_read {
23    ($input:ident, $fn:ident, $size:expr) => {{
24        decode_check_len!($input, $size);
25        let x = BigEndian::$fn($input);
26        Ok((&$input[$size..], x))
27    }};
28}
29
30fn read_u8(input: &[u8]) -> Result<(&[u8], u8), AmqpParseError> {
31    decode_check_len!(input, 1);
32    Ok((&input[1..], input[0]))
33}
34
35fn read_i8(input: &[u8]) -> Result<(&[u8], i8), AmqpParseError> {
36    decode_check_len!(input, 1);
37    Ok((&input[1..], input[0] as i8))
38}
39
40fn read_bytes_u8(input: &[u8]) -> Result<(&[u8], &[u8]), AmqpParseError> {
41    let (input, len) = read_u8(input)?;
42    let len = len as usize;
43    decode_check_len!(input, len);
44    let (bytes, input) = input.split_at(len);
45    Ok((input, bytes))
46}
47
48fn read_bytes_u32(input: &[u8]) -> Result<(&[u8], &[u8]), AmqpParseError> {
49    let result: Result<(&[u8], u32), AmqpParseError> = be_read!(input, read_u32, 4);
50    let (input, len) = result?;
51    let len = len as usize;
52    decode_check_len!(input, len);
53    let (bytes, input) = input.split_at(len);
54    Ok((input, bytes))
55}
56
57#[macro_export]
58macro_rules! validate_code {
59    ($fmt:ident, $code:expr) => {
60        if $fmt != $code {
61            return Err(AmqpParseError::InvalidFormatCode($fmt));
62        }
63    };
64}
65
66impl DecodeFormatted for bool {
67    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
68        match fmt {
69            codec::FORMATCODE_BOOLEAN => read_u8(input).map(|(i, o)| (i, o != 0)),
70            codec::FORMATCODE_BOOLEAN_TRUE => Ok((input, true)),
71            codec::FORMATCODE_BOOLEAN_FALSE => Ok((input, false)),
72            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
73        }
74    }
75}
76
77impl DecodeFormatted for u8 {
78    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
79        validate_code!(fmt, codec::FORMATCODE_UBYTE);
80        read_u8(input)
81    }
82}
83
84impl DecodeFormatted for u16 {
85    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
86        validate_code!(fmt, codec::FORMATCODE_USHORT);
87        be_read!(input, read_u16, 2)
88    }
89}
90
91impl DecodeFormatted for u32 {
92    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
93        match fmt {
94            codec::FORMATCODE_UINT => be_read!(input, read_u32, 4),
95            codec::FORMATCODE_SMALLUINT => read_u8(input).map(|(i, o)| (i, u32::from(o))),
96            codec::FORMATCODE_UINT_0 => Ok((input, 0)),
97            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
98        }
99    }
100}
101
102impl DecodeFormatted for u64 {
103    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
104        match fmt {
105            codec::FORMATCODE_ULONG => be_read!(input, read_u64, 8),
106            codec::FORMATCODE_SMALLULONG => read_u8(input).map(|(i, o)| (i, u64::from(o))),
107            codec::FORMATCODE_ULONG_0 => Ok((input, 0)),
108            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
109        }
110    }
111}
112
113impl DecodeFormatted for i8 {
114    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
115        validate_code!(fmt, codec::FORMATCODE_BYTE);
116        read_i8(input)
117    }
118}
119
120impl DecodeFormatted for i16 {
121    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
122        validate_code!(fmt, codec::FORMATCODE_SHORT);
123        be_read!(input, read_i16, 2)
124    }
125}
126
127impl DecodeFormatted for i32 {
128    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
129        match fmt {
130            codec::FORMATCODE_INT => be_read!(input, read_i32, 4),
131            codec::FORMATCODE_SMALLINT => read_i8(input).map(|(i, o)| (i, i32::from(o))),
132            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
133        }
134    }
135}
136
137impl DecodeFormatted for i64 {
138    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
139        match fmt {
140            codec::FORMATCODE_LONG => be_read!(input, read_i64, 8),
141            codec::FORMATCODE_SMALLLONG => read_i8(input).map(|(i, o)| (i, i64::from(o))),
142            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
143        }
144    }
145}
146
147impl DecodeFormatted for f32 {
148    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
149        validate_code!(fmt, codec::FORMATCODE_FLOAT);
150        be_read!(input, read_f32, 4)
151    }
152}
153
154impl DecodeFormatted for f64 {
155    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
156        validate_code!(fmt, codec::FORMATCODE_DOUBLE);
157        be_read!(input, read_f64, 8)
158    }
159}
160
161impl DecodeFormatted for char {
162    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
163        validate_code!(fmt, codec::FORMATCODE_CHAR);
164        let result: Result<(&[u8], u32), AmqpParseError> = be_read!(input, read_u32, 4);
165        let (i, o) = result?;
166        if let Some(c) = char::from_u32(o) {
167            Ok((i, c))
168        } else {
169            Err(AmqpParseError::InvalidChar(o))
170        } // todo: replace with CharTryFromError once try_from is stabilized
171    }
172}
173
174impl DecodeFormatted for DateTime<Utc> {
175    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
176        validate_code!(fmt, codec::FORMATCODE_TIMESTAMP);
177        be_read!(input, read_i64, 8).map(|(i, o)| (i, datetime_from_millis(o)))
178    }
179}
180
181impl DecodeFormatted for Uuid {
182    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
183        validate_code!(fmt, codec::FORMATCODE_UUID);
184        decode_check_len!(input, 16);
185        let uuid = Uuid::from_slice(&input[..16])?;
186        Ok((&input[16..], uuid))
187    }
188}
189
190impl DecodeFormatted for Bytes {
191    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
192        match fmt {
193            codec::FORMATCODE_BINARY8 => {
194                read_bytes_u8(input).map(|(i, o)| (i, Bytes::copy_from_slice(o)))
195            }
196            codec::FORMATCODE_BINARY32 => {
197                read_bytes_u32(input).map(|(i, o)| (i, Bytes::copy_from_slice(o)))
198            }
199            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
200        }
201    }
202}
203
204impl DecodeFormatted for ByteString {
205    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
206        match fmt {
207            codec::FORMATCODE_STRING8 => {
208                let (input, bytes) = read_bytes_u8(input)?;
209                Ok((input, ByteString::try_from(bytes)?))
210            }
211            codec::FORMATCODE_STRING32 => {
212                let (input, bytes) = read_bytes_u32(input)?;
213                Ok((input, ByteString::try_from(bytes)?))
214            }
215            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
216        }
217    }
218}
219
220impl DecodeFormatted for Str {
221    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
222        match fmt {
223            codec::FORMATCODE_STRING8 => {
224                let (input, bytes) = read_bytes_u8(input)?;
225                Ok((input, Str::from_str(str::from_utf8(bytes)?)))
226            }
227            codec::FORMATCODE_STRING32 => {
228                let (input, bytes) = read_bytes_u32(input)?;
229                Ok((input, Str::from_str(str::from_utf8(bytes)?)))
230            }
231            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
232        }
233    }
234}
235
236impl DecodeFormatted for Symbol {
237    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
238        match fmt {
239            codec::FORMATCODE_SYMBOL8 => {
240                let (input, bytes) = read_bytes_u8(input)?;
241                Ok((input, Symbol::from_slice(str::from_utf8(bytes)?)))
242            }
243            codec::FORMATCODE_SYMBOL32 => {
244                let (input, bytes) = read_bytes_u32(input)?;
245                Ok((input, Symbol::from_slice(str::from_utf8(bytes)?)))
246            }
247            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
248        }
249    }
250}
251
252impl ArrayDecode for Symbol {
253    fn array_decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError> {
254        let (input, bytes) = read_bytes_u32(input)?;
255        Ok((input, Symbol::from_slice(str::from_utf8(bytes)?)))
256    }
257}
258
259impl<K: Decode + Eq + Hash, V: Decode, S: BuildHasher + Default> DecodeFormatted
260    for HashMap<K, V, S>
261{
262    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
263        let (input, header) = decode_map_header(input, fmt)?;
264        let mut map_input = &input[..header.size as usize];
265        let count = header.count / 2;
266        let mut map: HashMap<K, V, S> =
267            HashMap::with_capacity_and_hasher(count as usize, Default::default());
268        for _ in 0..count {
269            let (input1, key) = K::decode(map_input)?;
270            let (input2, value) = V::decode(input1)?;
271            map_input = input2;
272            map.insert(key, value); // todo: ensure None returned?
273        }
274        // todo: validate map_input is empty
275        Ok((&input[header.size as usize..], map))
276    }
277}
278
279impl<T: DecodeFormatted> DecodeFormatted for Vec<T> {
280    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
281        let (input, header) = decode_array_header(input, fmt)?;
282        let item_fmt = input[0]; // todo: support descriptor
283        let mut input = &input[1..];
284        let mut result: Vec<T> = Vec::with_capacity(header.count as usize);
285        for _ in 0..header.count {
286            let (new_input, decoded) = T::decode_with_format(input, item_fmt)?;
287            result.push(decoded);
288            input = new_input;
289        }
290        Ok((input, result))
291    }
292}
293
294impl DecodeFormatted for VecSymbolMap {
295    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
296        let (input, header) = decode_map_header(input, fmt)?;
297        let mut map_input = &input[..header.size as usize];
298        let count = header.count / 2;
299        let mut map = Vec::with_capacity(count as usize);
300        for _ in 0..count {
301            let (input1, key) = Symbol::decode(map_input)?;
302            let (input2, value) = Variant::decode(input1)?;
303            map_input = input2;
304            map.push((key, value)); // todo: ensure None returned?
305        }
306        // todo: validate map_input is empty
307        Ok((&input[header.size as usize..], VecSymbolMap(map)))
308    }
309}
310
311impl DecodeFormatted for VecStringMap {
312    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
313        let (input, header) = decode_map_header(input, fmt)?;
314        let mut map_input = &input[..header.size as usize];
315        let count = header.count / 2;
316        let mut map = Vec::with_capacity(count as usize);
317        for _ in 0..count {
318            let (input1, key) = Str::decode(map_input)?;
319            let (input2, value) = Variant::decode(input1)?;
320            map_input = input2;
321            map.push((key, value)); // todo: ensure None returned?
322        }
323        // todo: validate map_input is empty
324        Ok((&input[header.size as usize..], VecStringMap(map)))
325    }
326}
327
328impl<T: ArrayDecode + DecodeFormatted> DecodeFormatted for Multiple<T> {
329    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
330        match fmt {
331            codec::FORMATCODE_ARRAY8 | codec::FORMATCODE_ARRAY32 => {
332                let (input, items) = Vec::<T>::decode_with_format(input, fmt)?;
333                Ok((input, Multiple(items)))
334            }
335            _ => {
336                let (input, item) = T::decode_with_format(input, fmt)?;
337                Ok((input, Multiple(vec![item])))
338            }
339        }
340    }
341}
342
343impl DecodeFormatted for List {
344    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
345        let (mut input, header) = decode_list_header(input, fmt)?;
346        let mut result: Vec<Variant> = Vec::with_capacity(header.count as usize);
347        for _ in 0..header.count {
348            let (new_input, decoded) = Variant::decode(input)?;
349            result.push(decoded);
350            input = new_input;
351        }
352        Ok((input, List(result)))
353    }
354}
355
356impl DecodeFormatted for Variant {
357    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
358        match fmt {
359            codec::FORMATCODE_NULL => Ok((input, Variant::Null)),
360            codec::FORMATCODE_BOOLEAN => {
361                bool::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Boolean(o)))
362            }
363            codec::FORMATCODE_BOOLEAN_FALSE => Ok((input, Variant::Boolean(false))),
364            codec::FORMATCODE_BOOLEAN_TRUE => Ok((input, Variant::Boolean(true))),
365            codec::FORMATCODE_UINT_0 => Ok((input, Variant::Uint(0))),
366            codec::FORMATCODE_ULONG_0 => Ok((input, Variant::Ulong(0))),
367            codec::FORMATCODE_UBYTE => {
368                u8::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Ubyte(o)))
369            }
370            codec::FORMATCODE_USHORT => {
371                u16::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Ushort(o)))
372            }
373            codec::FORMATCODE_UINT => {
374                u32::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Uint(o)))
375            }
376            codec::FORMATCODE_ULONG => {
377                u64::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Ulong(o)))
378            }
379            codec::FORMATCODE_BYTE => {
380                i8::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Byte(o)))
381            }
382            codec::FORMATCODE_SHORT => {
383                i16::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Short(o)))
384            }
385            codec::FORMATCODE_INT => {
386                i32::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Int(o)))
387            }
388            codec::FORMATCODE_LONG => {
389                i64::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Long(o)))
390            }
391            codec::FORMATCODE_SMALLUINT => {
392                u32::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Uint(o)))
393            }
394            codec::FORMATCODE_SMALLULONG => {
395                u64::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Ulong(o)))
396            }
397            codec::FORMATCODE_SMALLINT => {
398                i32::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Int(o)))
399            }
400            codec::FORMATCODE_SMALLLONG => {
401                i64::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Long(o)))
402            }
403            codec::FORMATCODE_FLOAT => f32::decode_with_format(input, fmt)
404                .map(|(i, o)| (i, Variant::Float(OrderedFloat(o)))),
405            codec::FORMATCODE_DOUBLE => f64::decode_with_format(input, fmt)
406                .map(|(i, o)| (i, Variant::Double(OrderedFloat(o)))),
407            // codec::FORMATCODE_DECIMAL32 => x::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Decimal(o))),
408            // codec::FORMATCODE_DECIMAL64 => x::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Decimal(o))),
409            // codec::FORMATCODE_DECIMAL128 => x::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Decimal(o))),
410            codec::FORMATCODE_CHAR => {
411                char::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Char(o)))
412            }
413            codec::FORMATCODE_TIMESTAMP => DateTime::<Utc>::decode_with_format(input, fmt)
414                .map(|(i, o)| (i, Variant::Timestamp(o))),
415            codec::FORMATCODE_UUID => {
416                Uuid::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Uuid(o)))
417            }
418            codec::FORMATCODE_BINARY8 => {
419                Bytes::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Binary(o)))
420            }
421            codec::FORMATCODE_BINARY32 => {
422                Bytes::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Binary(o)))
423            }
424            codec::FORMATCODE_STRING8 => ByteString::decode_with_format(input, fmt)
425                .map(|(i, o)| (i, Variant::String(o.into()))),
426            codec::FORMATCODE_STRING32 => ByteString::decode_with_format(input, fmt)
427                .map(|(i, o)| (i, Variant::String(o.into()))),
428            codec::FORMATCODE_SYMBOL8 => {
429                Symbol::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Symbol(o)))
430            }
431            codec::FORMATCODE_SYMBOL32 => {
432                Symbol::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Symbol(o)))
433            }
434            codec::FORMATCODE_LIST0 => Ok((input, Variant::List(List(vec![])))),
435            codec::FORMATCODE_LIST8 => {
436                List::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::List(o)))
437            }
438            codec::FORMATCODE_LIST32 => {
439                List::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::List(o)))
440            }
441            codec::FORMATCODE_MAP8 => FxHashMap::<Variant, Variant>::decode_with_format(input, fmt)
442                .map(|(i, o)| (i, Variant::Map(VariantMap::new(o)))),
443            codec::FORMATCODE_MAP32 => {
444                FxHashMap::<Variant, Variant>::decode_with_format(input, fmt)
445                    .map(|(i, o)| (i, Variant::Map(VariantMap::new(o))))
446            }
447            // codec::FORMATCODE_ARRAY8 => Vec::<Variant>::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Array(o))),
448            // codec::FORMATCODE_ARRAY32 => Vec::<Variant>::decode_with_format(input, fmt).map(|(i, o)| (i, Variant::Array(o))),
449            codec::FORMATCODE_DESCRIBED => {
450                let (input, descriptor) = Descriptor::decode(input)?;
451                let (input, value) = Variant::decode(input)?;
452                Ok((input, Variant::Described((descriptor, Box::new(value)))))
453            }
454            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
455        }
456    }
457}
458
459impl<T: DecodeFormatted> DecodeFormatted for Option<T> {
460    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
461        match fmt {
462            codec::FORMATCODE_NULL => Ok((input, None)),
463            _ => T::decode_with_format(input, fmt).map(|(i, o)| (i, Some(o))),
464        }
465    }
466}
467
468impl DecodeFormatted for Descriptor {
469    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError> {
470        match fmt {
471            codec::FORMATCODE_SMALLULONG => {
472                u64::decode_with_format(input, fmt).map(|(i, o)| (i, Descriptor::Ulong(o)))
473            }
474            codec::FORMATCODE_ULONG => {
475                u64::decode_with_format(input, fmt).map(|(i, o)| (i, Descriptor::Ulong(o)))
476            }
477            codec::FORMATCODE_SYMBOL8 => {
478                Symbol::decode_with_format(input, fmt).map(|(i, o)| (i, Descriptor::Symbol(o)))
479            }
480            codec::FORMATCODE_SYMBOL32 => {
481                Symbol::decode_with_format(input, fmt).map(|(i, o)| (i, Descriptor::Symbol(o)))
482            }
483            _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
484        }
485    }
486}
487
488impl Decode for AmqpFrame {
489    fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError> {
490        let (input, channel_id) = decode_frame_header(input, framing::FRAME_TYPE_AMQP)?;
491        let (input, performative) = protocol::Frame::decode(input)?;
492        Ok((input, AmqpFrame::new(channel_id, performative)))
493    }
494}
495
496impl Decode for SaslFrame {
497    fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError> {
498        let (input, _) = decode_frame_header(input, framing::FRAME_TYPE_SASL)?;
499        let (input, frame) = protocol::SaslFrameBody::decode(input)?;
500        Ok((input, SaslFrame { body: frame }))
501    }
502}
503
504fn decode_frame_header(
505    input: &[u8],
506    expected_frame_type: u8,
507) -> Result<(&[u8], u16), AmqpParseError> {
508    decode_check_len!(input, 4);
509    let doff = input[0];
510    let frame_type = input[1];
511    if frame_type != expected_frame_type {
512        return Err(AmqpParseError::UnexpectedFrameType(frame_type));
513    }
514
515    let channel_id = BigEndian::read_u16(&input[2..]);
516    let doff = doff as usize * 4;
517    if doff < HEADER_LEN {
518        return Err(AmqpParseError::InvalidSize);
519    }
520    let ext_header_len = doff - HEADER_LEN;
521    decode_check_len!(input, ext_header_len + 4);
522    let input = &input[ext_header_len + 4..]; // skipping remaining two header bytes and ext header
523    Ok((input, channel_id))
524}
525
526fn decode_array_header(input: &[u8], fmt: u8) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
527    match fmt {
528        codec::FORMATCODE_ARRAY8 => decode_compound8(input),
529        codec::FORMATCODE_ARRAY32 => decode_compound32(input),
530        _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
531    }
532}
533
534pub(crate) fn decode_list_header(
535    input: &[u8],
536    fmt: u8,
537) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
538    match fmt {
539        codec::FORMATCODE_LIST0 => Ok((input, CompoundHeader::empty())),
540        codec::FORMATCODE_LIST8 => decode_compound8(input),
541        codec::FORMATCODE_LIST32 => decode_compound32(input),
542        _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
543    }
544}
545
546pub(crate) fn decode_map_header(
547    input: &[u8],
548    fmt: u8,
549) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
550    match fmt {
551        codec::FORMATCODE_MAP8 => decode_compound8(input),
552        codec::FORMATCODE_MAP32 => decode_compound32(input),
553        _ => Err(AmqpParseError::InvalidFormatCode(fmt)),
554    }
555}
556
557fn decode_compound8(input: &[u8]) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
558    decode_check_len!(input, 2);
559    let size = input[0] - 1; // -1 for 1 byte count
560    let count = input[1];
561    Ok((
562        &input[2..],
563        CompoundHeader {
564            size: u32::from(size),
565            count: u32::from(count),
566        },
567    ))
568}
569
570fn decode_compound32(input: &[u8]) -> Result<(&[u8], CompoundHeader), AmqpParseError> {
571    decode_check_len!(input, 8);
572    let size = BigEndian::read_u32(input) - 4; // -4 for 4 byte count
573    let count = BigEndian::read_u32(&input[4..]);
574    Ok((&input[8..], CompoundHeader { size, count }))
575}
576
577fn datetime_from_millis(millis: i64) -> DateTime<Utc> {
578    let seconds = millis / 1000;
579    if seconds < 0 {
580        // In order to handle time before 1970 correctly, we need to subtract a second
581        // and use the nanoseconds field to add it back. This is a result of the nanoseconds
582        // parameter being u32
583        let nanoseconds = ((1000 + (millis - (seconds * 1000))) * 1_000_000).abs() as u32;
584        Utc.timestamp(seconds - 1, nanoseconds)
585    } else {
586        let nanoseconds = ((millis - (seconds * 1000)) * 1_000_000).abs() as u32;
587        Utc.timestamp(seconds, nanoseconds)
588    }
589}
590
591#[cfg(test)]
592mod tests {
593    use super::*;
594    use crate::codec::Encode;
595    use bytes::{BufMut, BytesMut};
596
597    const LOREM: &str = include_str!("lorem.txt");
598
599    macro_rules! decode_tests {
600        ($($name:ident: $kind:ident, $test:expr, $expected:expr,)*) => {
601        $(
602            #[test]
603            fn $name() {
604                let b1 = &mut BytesMut::with_capacity(($test).encoded_size());
605                ($test).encode(b1);
606                assert_eq!($expected, unwrap_value($kind::decode(b1)));
607            }
608        )*
609        }
610    }
611
612    decode_tests! {
613        ubyte: u8, 255_u8, 255_u8,
614        ushort: u16, 350_u16, 350_u16,
615
616        uint_zero: u32, 0_u32, 0_u32,
617        uint_small: u32, 128_u32, 128_u32,
618        uint_big: u32, 2147483647_u32, 2147483647_u32,
619
620        ulong_zero: u64, 0_u64, 0_u64,
621        ulong_small: u64, 128_u64, 128_u64,
622        uulong_big: u64, 2147483649_u64, 2147483649_u64,
623
624        byte: i8, -128_i8, -128_i8,
625        short: i16, -255_i16, -255_i16,
626
627        int_zero: i32, 0_i32, 0_i32,
628        int_small: i32, -50000_i32, -50000_i32,
629        int_neg: i32, -128_i32, -128_i32,
630
631        long_zero: i64, 0_i64, 0_i64,
632        long_big: i64, -2147483647_i64, -2147483647_i64,
633        long_small: i64, -128_i64, -128_i64,
634
635        float: f32, 1.234_f32, 1.234_f32,
636        double: f64, 1.234_f64, 1.234_f64,
637
638        test_char: char, '💯', '💯',
639
640        uuid: Uuid, Uuid::from_slice(&[4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87]).expect("parse error"),
641        Uuid::parse_str("0436430c2b02624c2032570501212b57").expect("parse error"),
642
643        binary_short: Bytes, Bytes::from(&[4u8, 5u8][..]), Bytes::from(&[4u8, 5u8][..]),
644        binary_long: Bytes, Bytes::from(&[4u8; 500][..]), Bytes::from(&[4u8; 500][..]),
645
646        string_short: ByteString, ByteString::from("Hello there"), ByteString::from("Hello there"),
647        string_long: ByteString, ByteString::from(LOREM), ByteString::from(LOREM),
648
649        // symbol_short: Symbol, Symbol::from("Hello there"), Symbol::from("Hello there"),
650        // symbol_long: Symbol, Symbol::from(LOREM), Symbol::from(LOREM),
651
652        variant_ubyte: Variant, Variant::Ubyte(255_u8), Variant::Ubyte(255_u8),
653        variant_ushort: Variant, Variant::Ushort(350_u16), Variant::Ushort(350_u16),
654
655        variant_uint_zero: Variant, Variant::Uint(0_u32), Variant::Uint(0_u32),
656        variant_uint_small: Variant, Variant::Uint(128_u32), Variant::Uint(128_u32),
657        variant_uint_big: Variant, Variant::Uint(2147483647_u32), Variant::Uint(2147483647_u32),
658
659        variant_ulong_zero: Variant, Variant::Ulong(0_u64), Variant::Ulong(0_u64),
660        variant_ulong_small: Variant, Variant::Ulong(128_u64), Variant::Ulong(128_u64),
661        variant_ulong_big: Variant, Variant::Ulong(2147483649_u64), Variant::Ulong(2147483649_u64),
662
663        variant_byte: Variant, Variant::Byte(-128_i8), Variant::Byte(-128_i8),
664        variant_short: Variant, Variant::Short(-255_i16), Variant::Short(-255_i16),
665
666        variant_int_zero: Variant, Variant::Int(0_i32), Variant::Int(0_i32),
667        variant_int_small: Variant, Variant::Int(-50000_i32), Variant::Int(-50000_i32),
668        variant_int_neg: Variant, Variant::Int(-128_i32), Variant::Int(-128_i32),
669
670        variant_long_zero: Variant, Variant::Long(0_i64), Variant::Long(0_i64),
671        variant_long_big: Variant, Variant::Long(-2147483647_i64), Variant::Long(-2147483647_i64),
672        variant_long_small: Variant, Variant::Long(-128_i64), Variant::Long(-128_i64),
673
674        variant_float: Variant, Variant::Float(OrderedFloat(1.234_f32)), Variant::Float(OrderedFloat(1.234_f32)),
675        variant_double: Variant, Variant::Double(OrderedFloat(1.234_f64)), Variant::Double(OrderedFloat(1.234_f64)),
676
677        variant_char: Variant, Variant::Char('💯'), Variant::Char('💯'),
678
679        variant_uuid: Variant, Variant::Uuid(Uuid::from_slice(&[4, 54, 67, 12, 43, 2, 98, 76, 32, 50, 87, 5, 1, 33, 43, 87]).expect("parse error")),
680        Variant::Uuid(Uuid::parse_str("0436430c2b02624c2032570501212b57").expect("parse error")),
681
682        variant_binary_short: Variant, Variant::Binary(Bytes::from(&[4u8, 5u8][..])), Variant::Binary(Bytes::from(&[4u8, 5u8][..])),
683        variant_binary_long: Variant, Variant::Binary(Bytes::from(&[4u8; 500][..])), Variant::Binary(Bytes::from(&[4u8; 500][..])),
684
685        variant_string_short: Variant, Variant::String(ByteString::from("Hello there").into()), Variant::String(ByteString::from("Hello there").into()),
686        variant_string_long: Variant, Variant::String(ByteString::from(LOREM).into()), Variant::String(ByteString::from(LOREM).into()),
687
688        // variant_symbol_short: Variant, Variant::Symbol(Symbol::from("Hello there")), Variant::Symbol(Symbol::from("Hello there")),
689        // variant_symbol_long: Variant, Variant::Symbol(Symbol::from(LOREM)), Variant::Symbol(Symbol::from(LOREM)),
690    }
691
692    fn unwrap_value<T>(res: Result<(&[u8], T), AmqpParseError>) -> T {
693        let r = res.map(|(_i, o)| o);
694        assert!(r.is_ok());
695        r.unwrap()
696    }
697
698    #[test]
699    fn test_bool_true() {
700        let b1 = &mut BytesMut::with_capacity(0);
701        b1.put_u8(0x41);
702        assert_eq!(true, unwrap_value(bool::decode(b1)));
703
704        let b2 = &mut BytesMut::with_capacity(0);
705        b2.put_u8(0x56);
706        b2.put_u8(0x01);
707        assert_eq!(true, unwrap_value(bool::decode(b2)));
708    }
709
710    #[test]
711    fn test_bool_false() {
712        let b1 = &mut BytesMut::with_capacity(0);
713        b1.put_u8(0x42u8);
714        assert_eq!(false, unwrap_value(bool::decode(b1)));
715
716        let b2 = &mut BytesMut::with_capacity(0);
717        b2.put_u8(0x56);
718        b2.put_u8(0x00);
719        assert_eq!(false, unwrap_value(bool::decode(b2)));
720    }
721
722    /// UTC with a precision of milliseconds. For example, 1311704463521
723    /// represents the moment 2011-07-26T18:21:03.521Z.
724    #[test]
725    fn test_timestamp() {
726        let b1 = &mut BytesMut::with_capacity(0);
727        let datetime = Utc.ymd(2011, 7, 26).and_hms_milli(18, 21, 3, 521);
728        datetime.encode(b1);
729
730        let expected = Utc.ymd(2011, 7, 26).and_hms_milli(18, 21, 3, 521);
731        assert_eq!(expected, unwrap_value(DateTime::<Utc>::decode(b1)));
732    }
733
734    #[test]
735    fn test_timestamp_pre_unix() {
736        let b1 = &mut BytesMut::with_capacity(0);
737        let datetime = Utc.ymd(1968, 7, 26).and_hms_milli(18, 21, 3, 521);
738        datetime.encode(b1);
739
740        let expected = Utc.ymd(1968, 7, 26).and_hms_milli(18, 21, 3, 521);
741        assert_eq!(expected, unwrap_value(DateTime::<Utc>::decode(b1)));
742    }
743
744    #[test]
745    fn variant_null() {
746        let mut b = BytesMut::with_capacity(0);
747        Variant::Null.encode(&mut b);
748        let t = unwrap_value(Variant::decode(&mut b));
749        assert_eq!(Variant::Null, t);
750    }
751
752    #[test]
753    fn variant_bool_true() {
754        let b1 = &mut BytesMut::with_capacity(0);
755        b1.put_u8(0x41);
756        assert_eq!(Variant::Boolean(true), unwrap_value(Variant::decode(b1)));
757
758        let b2 = &mut BytesMut::with_capacity(0);
759        b2.put_u8(0x56);
760        b2.put_u8(0x01);
761        assert_eq!(Variant::Boolean(true), unwrap_value(Variant::decode(b2)));
762    }
763
764    #[test]
765    fn variant_bool_false() {
766        let b1 = &mut BytesMut::with_capacity(0);
767        b1.put_u8(0x42u8);
768        assert_eq!(Variant::Boolean(false), unwrap_value(Variant::decode(b1)));
769
770        let b2 = &mut BytesMut::with_capacity(0);
771        b2.put_u8(0x56);
772        b2.put_u8(0x00);
773        assert_eq!(Variant::Boolean(false), unwrap_value(Variant::decode(b2)));
774    }
775
776    /// UTC with a precision of milliseconds. For example, 1311704463521
777    /// represents the moment 2011-07-26T18:21:03.521Z.
778    #[test]
779    fn variant_timestamp() {
780        let b1 = &mut BytesMut::with_capacity(0);
781        let datetime = Utc.ymd(2011, 7, 26).and_hms_milli(18, 21, 3, 521);
782        Variant::Timestamp(datetime).encode(b1);
783
784        let expected = Utc.ymd(2011, 7, 26).and_hms_milli(18, 21, 3, 521);
785        assert_eq!(
786            Variant::Timestamp(expected),
787            unwrap_value(Variant::decode(b1))
788        );
789    }
790
791    #[test]
792    fn variant_timestamp_pre_unix() {
793        let b1 = &mut BytesMut::with_capacity(0);
794        let datetime = Utc.ymd(1968, 7, 26).and_hms_milli(18, 21, 3, 521);
795        Variant::Timestamp(datetime).encode(b1);
796
797        let expected = Utc.ymd(1968, 7, 26).and_hms_milli(18, 21, 3, 521);
798        assert_eq!(
799            Variant::Timestamp(expected),
800            unwrap_value(Variant::decode(b1))
801        );
802    }
803
804    #[test]
805    fn option_i8() {
806        let b1 = &mut BytesMut::with_capacity(0);
807        Some(42i8).encode(b1);
808
809        assert_eq!(Some(42), unwrap_value(Option::<i8>::decode(b1)));
810
811        let b2 = &mut BytesMut::with_capacity(0);
812        let o1: Option<i8> = None;
813        o1.encode(b2);
814
815        assert_eq!(None, unwrap_value(Option::<i8>::decode(b2)));
816    }
817
818    #[test]
819    fn option_string() {
820        let b1 = &mut BytesMut::with_capacity(0);
821        Some(ByteString::from("hello")).encode(b1);
822
823        assert_eq!(
824            Some(ByteString::from("hello")),
825            unwrap_value(Option::<ByteString>::decode(b1))
826        );
827
828        let b2 = &mut BytesMut::with_capacity(0);
829        let o1: Option<ByteString> = None;
830        o1.encode(b2);
831
832        assert_eq!(None, unwrap_value(Option::<ByteString>::decode(b2)));
833    }
834}