rabbitmq_stream_protocol/codec/
decoder.rs

1use std::collections::HashMap;
2
3use byteorder::ByteOrder;
4
5use crate::error::IncompleteError;
6use crate::message::Message;
7use crate::types::PublishedMessage;
8use crate::types::PublishingError;
9use crate::ResponseCode;
10use crate::{error::DecodeError, types::Header};
11
12use super::Decoder;
13
14impl Decoder for i8 {
15    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
16        read_i8(input).map(Ok)?
17    }
18}
19
20impl Decoder for i32 {
21    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
22        read_i32(input).map(Ok)?
23    }
24}
25
26impl Decoder for u8 {
27    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
28        read_u8(input).map(Ok)?
29    }
30}
31
32impl Decoder for u16 {
33    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
34        read_u16(input).map(Ok)?
35    }
36}
37
38impl Decoder for u32 {
39    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
40        read_u32(input).map(Ok)?
41    }
42}
43
44impl Decoder for u64 {
45    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
46        read_u64(input).map(Ok)?
47    }
48}
49impl Decoder for i64 {
50    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
51        read_i64(input).map(Ok)?
52    }
53}
54
55impl Decoder for Vec<u8> {
56    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
57        let (input, len) = read_i32(input)?;
58        let len = len as usize;
59        Ok((&input[len..], input[..len].to_vec()))
60    }
61}
62
63pub fn read_vec<T: Decoder>(input: &[u8]) -> Result<(&[u8], Vec<T>), DecodeError> {
64    let (mut input, len) = read_i32(input)?;
65    let len = len as usize;
66    let mut result: Vec<T> = Vec::with_capacity(len);
67    for _ in 0..len {
68        let (input1, value) = T::decode(input)?;
69        result.push(value);
70        input = input1
71    }
72    Ok((input, result))
73}
74
75impl Decoder for Vec<u32> {
76    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
77        let (input, result) = read_vec(input)?;
78        Ok((input, result))
79    }
80}
81impl Decoder for Vec<u16> {
82    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
83        let (input, result) = read_vec(input)?;
84        Ok((input, result))
85    }
86}
87
88impl Decoder for Vec<u64> {
89    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
90        let (input, result) = read_vec(input)?;
91        Ok((input, result))
92    }
93}
94
95impl Decoder for Header {
96    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
97        let (input, key) = read_u16(input)?;
98        let (input, version) = read_u16(input)?;
99
100        Ok((input, Header::new(extract_response_code(key), version)))
101    }
102}
103
104impl Decoder for PublishedMessage {
105    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
106        let (input, publishing_id) = u64::decode(input)?;
107        let (input, body) = read_vec::<u8>(input)?;
108        let (_, message) = Message::decode(&body)?;
109        Ok((input, PublishedMessage::new(publishing_id, message, None)))
110    }
111
112    fn decode_version_2(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
113        let (input, publishing_id) = u64::decode(input)?;
114        let (input, body) = read_vec::<u8>(input)?;
115        let (input, filter_value) = <Option<String>>::decode(input)?;
116        let (_, message) = Message::decode(&body)?;
117        Ok((
118            input,
119            PublishedMessage::new(publishing_id, message, filter_value),
120        ))
121    }
122}
123
124impl Decoder for Vec<PublishedMessage> {
125    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
126        let (mut input, len) = u32::decode(input)?;
127        let mut result = Vec::new();
128        for _ in 0..len {
129            let (input1, published_message) = PublishedMessage::decode(input)?;
130            result.push(published_message);
131            input = input1
132        }
133        Ok((input, result))
134    }
135}
136
/// Clears the most significant bit of `code`, keeping its low 15 bits.
fn extract_response_code(code: u16) -> u16 {
    // Every bit set except the top one (0x7FFF).
    const LOW_15_BITS: u16 = u16::MAX >> 1;
    code & LOW_15_BITS
}
140
141impl Decoder for Option<String> {
142    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
143        let (input, len) = read_i16(input)?;
144
145        if len == 0 {
146            return Ok((input, None));
147        }
148        let (bytes, input) = input.split_at(len as usize);
149        let string = String::from_utf8(bytes.to_vec())?;
150        Ok((input, Some(string)))
151    }
152}
153
154impl Decoder for HashMap<String, String> {
155    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
156        let (mut input, num_properties) = read_u32(input)?;
157
158        let mut map = HashMap::with_capacity(num_properties as usize);
159        for _ in 0..num_properties {
160            let (input1, key) = Option::<String>::decode(input)?;
161            let (input2, value) = Option::<String>::decode(input1)?;
162
163            if let (Some(k), Some(v)) = (key, value) {
164                map.insert(k, v);
165            }
166            input = input2;
167        }
168
169        Ok((input, map))
170    }
171}
172
173impl Decoder for Vec<String> {
174    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
175        let (mut input, num_properties) = read_u32(input)?;
176        let mut vec: Vec<String> = Vec::new();
177        for _ in 0..num_properties {
178            let (input1, value) = Option::<String>::decode(input)?;
179
180            if let Some(v) = value {
181                vec.push(v)
182            }
183            input = input1;
184        }
185
186        Ok((input, vec))
187    }
188}
189
190impl Decoder for PublishingError {
191    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
192        let (input, publishing_id) = read_u64(input)?;
193        let (input, code) = ResponseCode::decode(input)?;
194
195        Ok((input, PublishingError::new(publishing_id, code)))
196    }
197}
198
199impl Decoder for Vec<PublishingError> {
200    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
201        let (mut input, num_properties) = read_u32(input)?;
202        let mut vec: Vec<PublishingError> = Vec::new();
203        for _ in 0..num_properties {
204            let (input1, value) = PublishingError::decode(input)?;
205
206            vec.push(value);
207            input = input1;
208        }
209
210        Ok((input, vec))
211    }
212}
213
214pub fn check_len(input: &[u8], size: usize) -> Result<(), IncompleteError> {
215    if input.len() < size {
216        return Err(IncompleteError(size));
217    }
218    Ok(())
219}
220
/// Generates a public `fn $name(&[u8])` that reads one fixed-width value from
/// the front of a byte slice using the `byteorder::BigEndian` function of the
/// same name.
///
/// Arguments: the function name (which must also name a `BigEndian` reader),
/// the number of bytes the value occupies, and the value's type.
macro_rules! reader {
    ($name:ident, $width:expr, $out:ty) => {
        // NOTE(review): presumably not every generated reader is called
        // within the crate, hence the lint suppression; confirm.
        #[allow(unused)]
        pub fn $name(input: &[u8]) -> Result<(&[u8], $out), IncompleteError> {
            // Reject short input first; the split and the byteorder read
            // below both panic on a slice shorter than `$width`.
            check_len(input, $width)?;
            let (head, tail) = input.split_at($width);
            Ok((tail, byteorder::BigEndian::$name(head)))
        }
    };
}
231
232pub fn read_u8(input: &[u8]) -> Result<(&[u8], u8), IncompleteError> {
233    check_len(input, 1)?;
234    Ok((&input[1..], input[0]))
235}
236
237pub fn read_i8(input: &[u8]) -> Result<(&[u8], i8), IncompleteError> {
238    check_len(input, 1)?;
239    Ok((&input[1..], input[0] as i8))
240}
241
242pub fn read_exact(input: &[u8], len: usize) -> Result<(&[u8], &[u8]), IncompleteError> {
243    check_len(input, len)?;
244    Ok((&input[len..], &input[..len]))
245}
246
247reader!(read_i16, 2, i16);
248reader!(read_u16, 2, u16);
249reader!(read_u32, 4, u32);
250reader!(read_i32, 4, i32);
251reader!(read_u64, 8, u64);
252reader!(read_i64, 8, i64);
253reader!(read_f32, 4, f32);
254reader!(read_f64, 8, f64);