rabbitmq_stream_protocol/codec/
decoder.rs1use std::collections::HashMap;
2
3use byteorder::ByteOrder;
4
5use crate::error::IncompleteError;
6use crate::message::Message;
7use crate::types::PublishedMessage;
8use crate::types::PublishingError;
9use crate::ResponseCode;
10use crate::{error::DecodeError, types::Header};
11
12use super::Decoder;
13
14impl Decoder for i8 {
15 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
16 read_i8(input).map(Ok)?
17 }
18}
19
20impl Decoder for i32 {
21 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
22 read_i32(input).map(Ok)?
23 }
24}
25
26impl Decoder for u8 {
27 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
28 read_u8(input).map(Ok)?
29 }
30}
31
32impl Decoder for u16 {
33 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
34 read_u16(input).map(Ok)?
35 }
36}
37
38impl Decoder for u32 {
39 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
40 read_u32(input).map(Ok)?
41 }
42}
43
44impl Decoder for u64 {
45 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
46 read_u64(input).map(Ok)?
47 }
48}
49impl Decoder for i64 {
50 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
51 read_i64(input).map(Ok)?
52 }
53}
54
55impl Decoder for Vec<u8> {
56 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
57 let (input, len) = read_i32(input)?;
58 let len = len as usize;
59 Ok((&input[len..], input[..len].to_vec()))
60 }
61}
62
63pub fn read_vec<T: Decoder>(input: &[u8]) -> Result<(&[u8], Vec<T>), DecodeError> {
64 let (mut input, len) = read_i32(input)?;
65 let len = len as usize;
66 let mut result: Vec<T> = Vec::with_capacity(len);
67 for _ in 0..len {
68 let (input1, value) = T::decode(input)?;
69 result.push(value);
70 input = input1
71 }
72 Ok((input, result))
73}
74
75impl Decoder for Vec<u32> {
76 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
77 let (input, result) = read_vec(input)?;
78 Ok((input, result))
79 }
80}
81impl Decoder for Vec<u16> {
82 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
83 let (input, result) = read_vec(input)?;
84 Ok((input, result))
85 }
86}
87
88impl Decoder for Vec<u64> {
89 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
90 let (input, result) = read_vec(input)?;
91 Ok((input, result))
92 }
93}
94
95impl Decoder for Header {
96 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
97 let (input, key) = read_u16(input)?;
98 let (input, version) = read_u16(input)?;
99
100 Ok((input, Header::new(extract_response_code(key), version)))
101 }
102}
103
104impl Decoder for PublishedMessage {
105 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
106 let (input, publishing_id) = u64::decode(input)?;
107 let (input, body) = read_vec::<u8>(input)?;
108 let (_, message) = Message::decode(&body)?;
109 Ok((input, PublishedMessage::new(publishing_id, message, None)))
110 }
111
112 fn decode_version_2(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
113 let (input, publishing_id) = u64::decode(input)?;
114 let (input, body) = read_vec::<u8>(input)?;
115 let (input, filter_value) = <Option<String>>::decode(input)?;
116 let (_, message) = Message::decode(&body)?;
117 Ok((
118 input,
119 PublishedMessage::new(publishing_id, message, filter_value),
120 ))
121 }
122}
123
124impl Decoder for Vec<PublishedMessage> {
125 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
126 let (mut input, len) = u32::decode(input)?;
127 let mut result = Vec::new();
128 for _ in 0..len {
129 let (input1, published_message) = PublishedMessage::decode(input)?;
130 result.push(published_message);
131 input = input1
132 }
133 Ok((input, result))
134 }
135}
136
137fn extract_response_code(code: u16) -> u16 {
138 code & 0b0111_1111_1111_1111
139}
140
141impl Decoder for Option<String> {
142 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
143 let (input, len) = read_i16(input)?;
144
145 if len == 0 {
146 return Ok((input, None));
147 }
148 let (bytes, input) = input.split_at(len as usize);
149 let string = String::from_utf8(bytes.to_vec())?;
150 Ok((input, Some(string)))
151 }
152}
153
154impl Decoder for HashMap<String, String> {
155 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
156 let (mut input, num_properties) = read_u32(input)?;
157
158 let mut map = HashMap::with_capacity(num_properties as usize);
159 for _ in 0..num_properties {
160 let (input1, key) = Option::<String>::decode(input)?;
161 let (input2, value) = Option::<String>::decode(input1)?;
162
163 if let (Some(k), Some(v)) = (key, value) {
164 map.insert(k, v);
165 }
166 input = input2;
167 }
168
169 Ok((input, map))
170 }
171}
172
173impl Decoder for Vec<String> {
174 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
175 let (mut input, num_properties) = read_u32(input)?;
176 let mut vec: Vec<String> = Vec::new();
177 for _ in 0..num_properties {
178 let (input1, value) = Option::<String>::decode(input)?;
179
180 if let Some(v) = value {
181 vec.push(v)
182 }
183 input = input1;
184 }
185
186 Ok((input, vec))
187 }
188}
189
190impl Decoder for PublishingError {
191 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
192 let (input, publishing_id) = read_u64(input)?;
193 let (input, code) = ResponseCode::decode(input)?;
194
195 Ok((input, PublishingError::new(publishing_id, code)))
196 }
197}
198
199impl Decoder for Vec<PublishingError> {
200 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
201 let (mut input, num_properties) = read_u32(input)?;
202 let mut vec: Vec<PublishingError> = Vec::new();
203 for _ in 0..num_properties {
204 let (input1, value) = PublishingError::decode(input)?;
205
206 vec.push(value);
207 input = input1;
208 }
209
210 Ok((input, vec))
211 }
212}
213
214pub fn check_len(input: &[u8], size: usize) -> Result<(), IncompleteError> {
215 if input.len() < size {
216 return Err(IncompleteError(size));
217 }
218 Ok(())
219}
220
221macro_rules! reader {
222 ( $fn:ident, $size:expr, $ret:ty) => {
223 #[allow(unused)]
224 pub fn $fn(input: &[u8]) -> Result<(&[u8], $ret), IncompleteError> {
225 check_len(input, $size)?;
226 let x = byteorder::BigEndian::$fn(input);
227 Ok((&input[$size..], x))
228 }
229 };
230}
231
232pub fn read_u8(input: &[u8]) -> Result<(&[u8], u8), IncompleteError> {
233 check_len(input, 1)?;
234 Ok((&input[1..], input[0]))
235}
236
237pub fn read_i8(input: &[u8]) -> Result<(&[u8], i8), IncompleteError> {
238 check_len(input, 1)?;
239 Ok((&input[1..], input[0] as i8))
240}
241
242pub fn read_exact(input: &[u8], len: usize) -> Result<(&[u8], &[u8]), IncompleteError> {
243 check_len(input, len)?;
244 Ok((&input[len..], &input[..len]))
245}
246
247reader!(read_i16, 2, i16);
248reader!(read_u16, 2, u16);
249reader!(read_u32, 4, u32);
250reader!(read_i32, 4, i32);
251reader!(read_u64, 8, u64);
252reader!(read_i64, 8, i64);
253reader!(read_f32, 4, f32);
254reader!(read_f64, 8, f64);