amqp_codec/codec/
mod.rs

1use bytes::BytesMut;
2use std::marker::Sized;
3
4use crate::errors::AmqpParseError;
5
/// Returns early from the enclosing function with
/// `AmqpParseError::Incomplete(Some(size))` when `$buf` holds fewer than
/// `$size` bytes.
///
/// `$buf` must be an identifier naming a value with a `len()` method and
/// `$size` an expression evaluating to `usize`. Because the expansion contains
/// a bare `return Err(..)`, the macro can only be used inside a function whose
/// return type is a `Result` with `AmqpParseError` as its error type.
///
/// `$size` is evaluated exactly once, so an expression with side effects (or
/// an expensive one) is not run a second time on the failure path.
macro_rules! decode_check_len {
    ($buf:ident, $size:expr) => {{
        // Bind the requested length once instead of expanding `$size` twice.
        let required: usize = $size;
        if $buf.len() < required {
            return Err(AmqpParseError::Incomplete(Some(required)));
        }
    }};
}
13
14#[macro_use]
15mod decode;
16mod encode;
17
18pub(crate) use self::decode::decode_list_header;
19
20pub trait Encode {
21    fn encoded_size(&self) -> usize;
22
23    fn encode(&self, buf: &mut BytesMut);
24}
25
26pub trait ArrayEncode {
27    const ARRAY_FORMAT_CODE: u8;
28
29    fn array_encoded_size(&self) -> usize;
30
31    fn array_encode(&self, buf: &mut BytesMut);
32}
33
34pub trait Decode
35where
36    Self: Sized,
37{
38    fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError>;
39}
40
41pub trait DecodeFormatted
42where
43    Self: Sized,
44{
45    fn decode_with_format(input: &[u8], fmt: u8) -> Result<(&[u8], Self), AmqpParseError>;
46}
47
48pub trait ArrayDecode: Sized {
49    fn array_decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError>;
50}
51
52impl<T: DecodeFormatted> Decode for T {
53    fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpParseError> {
54        let (input, fmt) = decode_format_code(input)?;
55        T::decode_with_format(input, fmt)
56    }
57}
58
59pub fn decode_format_code(input: &[u8]) -> Result<(&[u8], u8), AmqpParseError> {
60    decode_check_len!(input, 1);
61    Ok((&input[1..], input[0]))
62}
63
// Format codes: the single byte that `decode_format_code` reads from the
// front of an encoded value. The grouping below follows the original
// "fixed width" / "variable" / "compound" markers.

/// Format code of a described value.
pub const FORMATCODE_DESCRIBED: u8 = 0x00;

// ---- fixed width ----

// null and boolean
pub const FORMATCODE_NULL: u8 = 0x40;
pub const FORMATCODE_BOOLEAN: u8 = 0x56;
pub const FORMATCODE_BOOLEAN_TRUE: u8 = 0x41;
pub const FORMATCODE_BOOLEAN_FALSE: u8 = 0x42;

// zero-valued unsigned shortcuts
pub const FORMATCODE_UINT_0: u8 = 0x43;
pub const FORMATCODE_ULONG_0: u8 = 0x44;

// unsigned integers
pub const FORMATCODE_UBYTE: u8 = 0x50;
pub const FORMATCODE_USHORT: u8 = 0x60;
pub const FORMATCODE_UINT: u8 = 0x70;
pub const FORMATCODE_ULONG: u8 = 0x80;

// signed integers
pub const FORMATCODE_BYTE: u8 = 0x51;
pub const FORMATCODE_SHORT: u8 = 0x61;
pub const FORMATCODE_INT: u8 = 0x71;
pub const FORMATCODE_LONG: u8 = 0x81;

// "small" integer encodings
pub const FORMATCODE_SMALLUINT: u8 = 0x52;
pub const FORMATCODE_SMALLULONG: u8 = 0x53;
pub const FORMATCODE_SMALLINT: u8 = 0x54;
pub const FORMATCODE_SMALLLONG: u8 = 0x55;

// floating point
pub const FORMATCODE_FLOAT: u8 = 0x72;
pub const FORMATCODE_DOUBLE: u8 = 0x82;

// Decimal format codes are kept for reference but are not enabled:
// pub const FORMATCODE_DECIMAL32: u8 = 0x74;
// pub const FORMATCODE_DECIMAL64: u8 = 0x84;
// pub const FORMATCODE_DECIMAL128: u8 = 0x94;

// char, timestamp, uuid
pub const FORMATCODE_CHAR: u8 = 0x73;
pub const FORMATCODE_TIMESTAMP: u8 = 0x83;
pub const FORMATCODE_UUID: u8 = 0x98;

// ---- variable ----

pub const FORMATCODE_BINARY8: u8 = 0xa0;
pub const FORMATCODE_BINARY32: u8 = 0xb0;
pub const FORMATCODE_STRING8: u8 = 0xa1;
pub const FORMATCODE_STRING32: u8 = 0xb1;
pub const FORMATCODE_SYMBOL8: u8 = 0xa3;
pub const FORMATCODE_SYMBOL32: u8 = 0xb3;

// ---- compound ----

pub const FORMATCODE_LIST0: u8 = 0x45;
pub const FORMATCODE_LIST8: u8 = 0xc0;
pub const FORMATCODE_LIST32: u8 = 0xd0;
pub const FORMATCODE_MAP8: u8 = 0xc1;
pub const FORMATCODE_MAP32: u8 = 0xd1;
pub const FORMATCODE_ARRAY8: u8 = 0xe0;
pub const FORMATCODE_ARRAY32: u8 = 0xf0;
104
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};

    use crate::codec::{Decode, Encode};
    use crate::errors::AmqpCodecError;
    use crate::framing::{AmqpFrame, SaslFrame};
    use crate::protocol::SaslFrameBody;

    /// Decodes a captured SASL frame, checks that its body is the
    /// `SaslMechanisms` variant, and verifies that re-encoding reproduces the
    /// original bytes.
    #[test]
    fn test_sasl_mechanisms() -> Result<(), AmqpCodecError> {
        let data = b"\x02\x01\0\0\0S@\xc02\x01\xe0/\x04\xb3\0\0\0\x07MSSBCBS\0\0\0\x05PLAIN\0\0\0\tANONYMOUS\0\0\0\x08EXTERNAL";

        let (remainder, frame) = SaslFrame::decode(data.as_ref())?;
        assert!(remainder.is_empty());
        match frame.body {
            SaslFrameBody::SaslMechanisms(_) => (),
            _ => panic!("error"),
        }

        let mut buf = BytesMut::with_capacity(frame.encoded_size());
        frame.encode(&mut buf);
        // Drop the first four encoded bytes, which have no counterpart in
        // `data` (presumably the frame size prefix — confirm against the
        // frame encoder). The split-off head is discarded explicitly so the
        // `#[must_use]` result is not silently ignored.
        let _ = buf.split_to(4);
        assert_eq!(Bytes::from_static(data), buf.freeze());

        Ok(())
    }

    /// Decodes a captured AMQP frame, checks that its performative is named
    /// "Disposition", and verifies that re-encoding reproduces the original
    /// bytes.
    #[test]
    fn test_disposition() -> Result<(), AmqpCodecError> {
        let data = b"\x02\0\0\0\0S\x15\xc0\x0c\x06AC@A\0S$\xc0\x01\0B";

        let (remainder, frame) = AmqpFrame::decode(data.as_ref())?;
        assert!(remainder.is_empty());
        assert_eq!(frame.performative().name(), "Disposition");

        let mut buf = BytesMut::with_capacity(frame.encoded_size());
        frame.encode(&mut buf);
        // See `test_sasl_mechanisms`: the leading four bytes are not part of
        // `data`, and the split-off head is discarded on purpose.
        let _ = buf.split_to(4);
        assert_eq!(Bytes::from_static(data), buf.freeze());

        Ok(())
    }
}