ntex_amqp_codec/codec/
mod.rs1use ntex_bytes::{Buf, Bytes, BytesMut};
2
3use crate::{error::AmqpParseError, types::Constructor};
4
/// Returns early from the enclosing function with
/// `AmqpParseError::Incomplete($size)` when `$buf` holds fewer than `$size`
/// bytes.
///
/// `$buf` must be an identifier naming a value that exposes `len()`, and the
/// enclosing function must return a `Result` whose error type accepts
/// `AmqpParseError`. When enough bytes are present the check falls through and
/// execution simply continues after the macro invocation.
macro_rules! decode_check_len {
    ($buf:ident, $size:expr) => {
        if $buf.len() < $size {
            return Err(AmqpParseError::Incomplete($size));
        }
    };
}
12
13#[macro_use]
14pub(crate) mod decode;
15mod encode;
16
17pub trait Encode {
19 fn encoded_size(&self) -> usize;
21
22 fn encode(&self, buf: &mut BytesMut);
24}
25
26pub trait ArrayEncode {
29 const ARRAY_CONSTRUCTOR: Constructor;
30
31 fn array_encoded_size(&self) -> usize;
33
34 fn array_encode(&self, buf: &mut BytesMut);
36}
37
38pub trait Decode
41where
42 Self: Sized,
43{
44 fn decode(input: &mut Bytes) -> Result<Self, AmqpParseError>;
46}
47
48pub trait DecodeFormatted
49where
50 Self: Sized,
51{
52 fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError>;
53}
54
55impl<T: DecodeFormatted> Decode for T {
56 fn decode(input: &mut Bytes) -> Result<Self, AmqpParseError> {
57 let fmt = decode_format_code(input)?;
58 T::decode_with_format(input, fmt)
59 }
60}
61
62pub fn decode_format_code(input: &mut Bytes) -> Result<u8, AmqpParseError> {
63 decode_check_len!(input, 1);
64 let code = input.get_u8();
65 Ok(code)
66}
67
/// One-byte format codes that prefix encoded AMQP values.
///
/// Constants are grouped by the kind of value their names refer to.
pub mod format_codes {
    // Described-type marker.
    pub const FORMATCODE_DESCRIBED: u8 = 0x00;

    // Null.
    pub const FORMATCODE_NULL: u8 = 0x40;

    // Booleans.
    pub const FORMATCODE_BOOLEAN: u8 = 0x56;
    pub const FORMATCODE_BOOLEAN_TRUE: u8 = 0x41;
    pub const FORMATCODE_BOOLEAN_FALSE: u8 = 0x42;

    // Zero-valued unsigned integers.
    pub const FORMATCODE_UINT_0: u8 = 0x43;
    pub const FORMATCODE_ULONG_0: u8 = 0x44;

    // Unsigned integers.
    pub const FORMATCODE_UBYTE: u8 = 0x50;
    pub const FORMATCODE_USHORT: u8 = 0x60;
    pub const FORMATCODE_UINT: u8 = 0x70;
    pub const FORMATCODE_ULONG: u8 = 0x80;

    // Signed integers.
    pub const FORMATCODE_BYTE: u8 = 0x51;
    pub const FORMATCODE_SHORT: u8 = 0x61;
    pub const FORMATCODE_INT: u8 = 0x71;
    pub const FORMATCODE_LONG: u8 = 0x81;

    // "Small" integer variants.
    pub const FORMATCODE_SMALLUINT: u8 = 0x52;
    pub const FORMATCODE_SMALLULONG: u8 = 0x53;
    pub const FORMATCODE_SMALLINT: u8 = 0x54;
    pub const FORMATCODE_SMALLLONG: u8 = 0x55;

    // Floating point and decimal numbers.
    pub const FORMATCODE_FLOAT: u8 = 0x72;
    pub const FORMATCODE_DOUBLE: u8 = 0x82;
    pub const FORMATCODE_DECIMAL32: u8 = 0x74;
    pub const FORMATCODE_DECIMAL64: u8 = 0x84;
    pub const FORMATCODE_DECIMAL128: u8 = 0x94;

    // Character, timestamp and UUID.
    pub const FORMATCODE_CHAR: u8 = 0x73;
    pub const FORMATCODE_TIMESTAMP: u8 = 0x83;
    pub const FORMATCODE_UUID: u8 = 0x98;

    // Binary data.
    pub const FORMATCODE_BINARY8: u8 = 0xa0;
    pub const FORMATCODE_BINARY32: u8 = 0xb0;

    // Strings.
    pub const FORMATCODE_STRING8: u8 = 0xa1;
    pub const FORMATCODE_STRING32: u8 = 0xb1;

    // Symbols.
    pub const FORMATCODE_SYMBOL8: u8 = 0xa3;
    pub const FORMATCODE_SYMBOL32: u8 = 0xb3;

    // Lists.
    pub const FORMATCODE_LIST0: u8 = 0x45;
    pub const FORMATCODE_LIST8: u8 = 0xc0;
    pub const FORMATCODE_LIST32: u8 = 0xd0;

    // Maps.
    pub const FORMATCODE_MAP8: u8 = 0xc1;
    pub const FORMATCODE_MAP32: u8 = 0xd1;

    // Arrays.
    pub const FORMATCODE_ARRAY8: u8 = 0xe0;
    pub const FORMATCODE_ARRAY32: u8 = 0xf0;
}
110
111pub use self::format_codes::*;
112
/// Header values of an encoded list.
///
/// Derives the common value-type traits so headers can be logged, copied and
/// compared without hand-written impls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ListHeader {
    /// NOTE(review): presumably the encoded size of the list in bytes; confirm
    /// against the decoder that fills this in.
    pub size: u32,
    /// NOTE(review): presumably the number of elements in the list; confirm
    /// against the decoder that fills this in.
    pub count: u32,
}
117
/// Header values of an encoded map.
///
/// Derives the common value-type traits so headers can be logged, copied and
/// compared without hand-written impls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MapHeader {
    /// NOTE(review): presumably the encoded size of the map in bytes; confirm
    /// against the decoder that fills this in.
    pub size: u32,
    /// NOTE(review): presumably the element count of the map; whether this
    /// counts entries or individual keys and values is not visible here.
    pub count: u32,
}
122
/// Header values of an encoded array.
///
/// Derives the common value-type traits so headers can be logged, copied and
/// compared without hand-written impls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ArrayHeader {
    /// NOTE(review): presumably the encoded size of the array in bytes; confirm
    /// against the decoder that fills this in.
    pub size: u32,
    /// NOTE(review): presumably the number of elements in the array; confirm
    /// against the decoder that fills this in.
    pub count: u32,
}
127
#[cfg(test)]
mod tests {
    use ntex_bytes::{Bytes, BytesMut};

    use crate::codec::{Decode, Encode};
    use crate::error::AmqpCodecError;
    use crate::framing::{AmqpFrame, SaslFrame};
    use crate::protocol::SaslFrameBody;

    /// Decodes a captured SASL mechanisms frame, checks that the whole input is
    /// consumed, and verifies that re-encoding reproduces the captured bytes.
    #[test]
    fn test_sasl_mechanisms() -> Result<(), AmqpCodecError> {
        let mut remaining = Bytes::from_static(
            b"\x02\x01\0\0\0S@\xc02\x01\xe0/\x04\xb3\0\0\0\x07MSSBCBS\0\0\0\x05PLAIN\0\0\0\tANONYMOUS\0\0\0\x08EXTERNAL");

        // Keep an untouched copy: decoding advances `remaining`.
        let expected = remaining.clone();
        let frame = SaslFrame::decode(&mut remaining).unwrap();
        assert!(remaining.is_empty());
        if !matches!(frame.body, SaslFrameBody::SaslMechanisms(_)) {
            panic!("error");
        }

        let mut encoded = BytesMut::new();
        encoded.reserve(frame.encoded_size());
        frame.encode(&mut encoded);
        // The encoder emits 4 leading bytes that are not part of the captured
        // input (presumably the frame size prefix); drop them before comparing.
        let _ = encoded.split_to(4);
        assert_eq!(expected, encoded.freeze());

        Ok(())
    }

    /// Decodes a captured frame, checks that its performative is reported as
    /// "Disposition", and verifies that re-encoding reproduces the captured
    /// bytes.
    #[test]
    fn test_disposition() -> Result<(), AmqpCodecError> {
        let data = Bytes::from_static(b"\x02\0\0\0\0S\x15\xc0\x0c\x06AC@A\0S$\xc0\x01\0B");

        // Decode from a clone so `data` stays intact for the comparison below.
        let mut cursor = data.clone();
        let frame = AmqpFrame::decode(&mut cursor)?;
        assert_eq!(frame.performative().name(), "Disposition");

        let mut encoded = BytesMut::new();
        encoded.reserve(frame.encoded_size());
        frame.encode(&mut encoded);
        // Drop the 4 leading bytes the encoder adds (presumably the frame size
        // prefix) before comparing with the captured input.
        let _ = encoded.split_to(4);
        assert_eq!(data, encoded.freeze());

        Ok(())
    }
}