rabbitmq_stream_protocol/codec/
encoder.rs

1use std::{collections::HashMap, io::Write};
2
3use byteorder::{BigEndian, WriteBytesExt};
4
5use crate::{error::EncodeError, types::Header, types::PublishedMessage, ResponseCode};
6
7use crate::types::PublishingError;
8
9use super::Encoder;
10
11impl Encoder for i8 {
12    fn encoded_size(&self) -> u32 {
13        1
14    }
15
16    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
17        writer.write_i8(*self)?;
18        Ok(())
19    }
20}
21
22impl Encoder for i32 {
23    fn encoded_size(&self) -> u32 {
24        4
25    }
26
27    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
28        writer.write_i32::<BigEndian>(*self)?;
29        Ok(())
30    }
31}
32
33impl Encoder for u8 {
34    fn encoded_size(&self) -> u32 {
35        1
36    }
37
38    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
39        writer.write_u8(*self)?;
40        Ok(())
41    }
42}
43
44impl Encoder for u16 {
45    fn encoded_size(&self) -> u32 {
46        2
47    }
48
49    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
50        writer.write_u16::<BigEndian>(*self)?;
51        Ok(())
52    }
53}
54
55impl Encoder for u32 {
56    fn encoded_size(&self) -> u32 {
57        4
58    }
59
60    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
61        writer.write_u32::<BigEndian>(*self)?;
62        Ok(())
63    }
64}
65
66impl Encoder for u64 {
67    fn encoded_size(&self) -> u32 {
68        8
69    }
70
71    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
72        writer.write_u64::<BigEndian>(*self)?;
73        Ok(())
74    }
75}
76
77impl Encoder for i64 {
78    fn encoded_size(&self) -> u32 {
79        8
80    }
81
82    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
83        writer.write_i64::<BigEndian>(*self)?;
84        Ok(())
85    }
86}
87
88impl Encoder for Header {
89    fn encoded_size(&self) -> u32 {
90        2 + 2
91    }
92
93    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
94        writer.write_u16::<BigEndian>(self.key())?;
95        writer.write_u16::<BigEndian>(self.version())?;
96
97        Ok(())
98    }
99}
100
101impl Encoder for PublishedMessage {
102    fn encoded_size(&self) -> u32 {
103        self.publishing_id.encoded_size() + 4 + self.message.encoded_size()
104    }
105
106    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
107        self.publishing_id.encode(writer)?;
108        self.message.encoded_size().encode(writer)?;
109        self.message.encode(writer)?;
110        Ok(())
111    }
112
113    fn encoded_size_version_2(&self) -> u32 {
114        self.publishing_id.encoded_size()
115            + self.filter_value.encoded_size()
116            + 4
117            + self.message.encoded_size()
118    }
119
120    fn encode_version_2(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
121        self.publishing_id.encode(writer)?;
122        self.filter_value.encode(writer)?;
123        self.message.encoded_size().encode(writer)?;
124        self.message.encode(writer)?;
125        Ok(())
126    }
127}
128
129impl Encoder for Vec<PublishedMessage> {
130    fn encoded_size(&self) -> u32 {
131        4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
132    }
133
134    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
135        writer.write_u32::<BigEndian>(self.len() as u32)?;
136        for x in self {
137            x.encode(writer)?;
138        }
139        Ok(())
140    }
141
142    fn encoded_size_version_2(&self) -> u32 {
143        4 + self
144            .iter()
145            .fold(0, |acc, v| acc + v.encoded_size_version_2())
146    }
147
148    fn encode_version_2(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
149        writer.write_u32::<BigEndian>(self.len() as u32)?;
150        for x in self {
151            x.encode_version_2(writer)?;
152        }
153        Ok(())
154    }
155}
156
157impl Encoder for &str {
158    fn encoded_size(&self) -> u32 {
159        2 + self.len() as u32
160    }
161
162    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
163        writer.write_i16::<BigEndian>(self.len() as i16)?;
164        writer.write_all(self.as_bytes())?;
165        Ok(())
166    }
167}
168
169impl Encoder for HashMap<String, String> {
170    fn encoded_size(&self) -> u32 {
171        4 + self.iter().fold(0, |acc, (k, v)| {
172            acc + k.as_str().encoded_size() + v.as_str().encoded_size()
173        })
174    }
175
176    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
177        writer.write_u32::<BigEndian>(self.len() as u32)?;
178
179        for (k, v) in self {
180            k.as_str().encode(writer)?;
181            v.as_str().encode(writer)?;
182        }
183        Ok(())
184    }
185}
186
187impl Encoder for Option<String> {
188    fn encoded_size(&self) -> u32 {
189        2 + self.as_ref().map_or(0, |string| string.len() as u32)
190    }
191
192    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
193        match self {
194            Some(string) => string.as_str().encode(writer)?,
195            None => writer.write_i16::<BigEndian>(0)?,
196        }
197        Ok(())
198    }
199}
200impl Encoder for Vec<String> {
201    fn encoded_size(&self) -> u32 {
202        4 + self
203            .iter()
204            .fold(0, |acc, v| acc + v.as_str().encoded_size())
205    }
206
207    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
208        writer.write_u32::<BigEndian>(self.len() as u32)?;
209        for x in self {
210            x.as_str().encode(writer)?;
211        }
212        Ok(())
213    }
214}
215
216impl Encoder for Vec<u8> {
217    fn encoded_size(&self) -> u32 {
218        4 + self.len() as u32
219    }
220
221    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
222        writer.write_i32::<BigEndian>(self.len() as i32)?;
223        writer.write_all(self)?;
224        Ok(())
225    }
226}
227
228impl Encoder for Vec<u32> {
229    fn encoded_size(&self) -> u32 {
230        4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
231    }
232
233    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
234        writer.write_i32::<BigEndian>(self.len() as i32)?;
235        for x in self {
236            x.encode(writer)?;
237        }
238        Ok(())
239    }
240}
241
242impl Encoder for Vec<u16> {
243    fn encoded_size(&self) -> u32 {
244        4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
245    }
246
247    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
248        writer.write_i32::<BigEndian>(self.len() as i32)?;
249        for x in self {
250            x.encode(writer)?;
251        }
252        Ok(())
253    }
254}
255impl Encoder for Vec<u64> {
256    fn encoded_size(&self) -> u32 {
257        4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
258    }
259
260    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
261        writer.write_i32::<BigEndian>(self.len() as i32)?;
262        for x in self {
263            x.encode(writer)?;
264        }
265        Ok(())
266    }
267}
268
269impl Encoder for ResponseCode {
270    fn encoded_size(&self) -> u32 {
271        2
272    }
273
274    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
275        writer.write_u16::<BigEndian>(self.into())?;
276        Ok(())
277    }
278}
279
280pub fn encode_response_code(code: u16) -> u16 {
281    code | 0b1000_0000_0000_0000
282}
283
284impl Encoder for PublishingError {
285    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
286        self.publishing_id.encode(writer)?;
287        self.error_code.encode(writer)?;
288        Ok(())
289    }
290
291    fn encoded_size(&self) -> u32 {
292        8 + 2
293    }
294}
295
296impl Encoder for Vec<PublishingError> {
297    fn encoded_size(&self) -> u32 {
298        4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
299    }
300
301    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
302        writer.write_i32::<BigEndian>(self.len() as i32)?;
303        for x in self {
304            x.encode(writer)?;
305        }
306        Ok(())
307    }
308}