1use integer_encoding::VarInt;
2
/// Length, in bytes, of the scratch buffer that `encode_size` writes a size prefix into.
///
/// `Decoder` also uses it as the cap on how many bytes it buffers while waiting for a
/// size prefix to become decodable.
// NOTE(review): presumably 10 because that is the longest varint encoding of a 64-bit
// integer (ceil(64 / 7) bytes) — confirm against the `integer_encoding` crate.
pub const MAX_ENCODED_SIZE: usize = 10;
6
7pub fn encode_size<'a>(message: &[u8], buf: &'a mut [u8; MAX_ENCODED_SIZE]) -> &'a [u8] {
10 let varint_size = message.len().encode_var(buf);
11 &buf[..varint_size]
12}
13
14pub fn decode_size(data: &[u8]) -> Option<(usize, usize)> {
17 usize::decode_var(data)
18}
19
/// Incremental decoder for a stream of size-prefixed messages.
///
/// Bytes that do not yet form a complete message are kept in an internal buffer
/// between calls. The derived `Default` starts with that buffer empty.
#[derive(Debug, Default)]
pub struct Decoder {
    /// Size prefix and payload bytes of the message currently being assembled.
    stored: Vec<u8>,
}
32
33impl Decoder {
34 fn try_decode(&mut self, data: &[u8], mut decoded_callback: impl FnMut(&[u8])) {
35 let mut next_data = data;
36 loop {
37 if let Some((expected_size, used_bytes)) = decode_size(next_data) {
38 let remaining = &next_data[used_bytes..];
39 if remaining.len() >= expected_size {
40 let (decoded, not_decoded) = remaining.split_at(expected_size);
41 decoded_callback(decoded);
42 if !not_decoded.is_empty() {
43 next_data = not_decoded;
44 continue;
45 }
46 else {
47 break;
48 }
49 }
50 }
51 self.stored.extend_from_slice(next_data);
52 break;
53 }
54 }
55
56 fn store_and_decoded_data<'a>(&mut self, data: &'a [u8]) -> Option<(&[u8], &'a [u8])> {
57 let ((expected_size, used_bytes), data) = match decode_size(&self.stored) {
59 Some(size_info) => (size_info, data),
60 None => {
61 let max_remaining = (MAX_ENCODED_SIZE - self.stored.len()).min(data.len());
63 self.stored.extend_from_slice(&data[..max_remaining]);
64
65 if let Some(x) = decode_size(&self.stored) {
66 (x, &data[max_remaining..])
68 }
69 else {
70 return None;
72 }
73 }
74 };
75
76 let remaining = expected_size - (self.stored.len() - used_bytes);
78 if data.len() < remaining {
79 self.stored.extend_from_slice(data);
81 None
82 }
83 else {
84 let (to_store, remaining) = data.split_at(remaining);
86 self.stored.extend_from_slice(to_store);
87 Some((&self.stored[used_bytes..], remaining))
88 }
89 }
90
91 pub fn decode(&mut self, data: &[u8], mut decoded_callback: impl FnMut(&[u8])) {
96 if self.stored.is_empty() {
97 self.try_decode(data, decoded_callback);
98 }
99 else {
100 if let Some((decoded_data, remaining)) = self.store_and_decoded_data(data) {
102 decoded_callback(decoded_data);
103 self.stored.clear();
104 self.try_decode(remaining, decoded_callback);
105 }
106 }
107 }
108
109 pub fn stored_size(&self) -> usize {
113 self.stored.len()
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    const MESSAGE_SIZE: usize = 20;
    // A 20-byte payload needs a single prefix byte (checked by `encode_one_message`).
    const ENCODED_MESSAGE_SIZE: usize = 1 + MESSAGE_SIZE;
    const MESSAGE: [u8; MESSAGE_SIZE] = [42; MESSAGE_SIZE];
    const MESSAGE_A: [u8; MESSAGE_SIZE] = [b'A'; MESSAGE_SIZE];
    const MESSAGE_B: [u8; MESSAGE_SIZE] = [b'B'; MESSAGE_SIZE];
    const MESSAGE_C: [u8; MESSAGE_SIZE] = [b'C'; MESSAGE_SIZE];

    /// Appends `message` to `buffer`, preceded by its encoded size.
    fn encode_message(buffer: &mut Vec<u8>, message: &[u8]) {
        let mut prefix = [0; MAX_ENCODED_SIZE];
        buffer.extend_from_slice(encode_size(message, &mut prefix));
        buffer.extend_from_slice(message);
    }

    #[test]
    fn encode_one_message() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);

        assert_eq!(ENCODED_MESSAGE_SIZE, buffer.len());
        let (expected_size, used_bytes) = decode_size(&buffer).unwrap();
        assert_eq!(MESSAGE_SIZE, expected_size);
        assert_eq!(1, used_bytes);
        assert_eq!(&MESSAGE[..], &buffer[used_bytes..]);
    }

    #[test]
    fn encode_one_big_message() {
        let payload = vec![0u8; 1000];
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &payload);

        // 1000 does not fit in one prefix byte, so the prefix takes two.
        assert_eq!(1002, buffer.len());
        let (expected_size, used_bytes) = decode_size(&buffer).unwrap();
        assert_eq!(1000, expected_size);
        assert_eq!(2, used_bytes);
        assert_eq!(&payload[..], &buffer[used_bytes..]);
    }

    #[test]
    fn decode_one_message() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);

        let mut decoder = Decoder::default();
        let mut times_called = 0;
        decoder.decode(&buffer, |decoded| {
            times_called += 1;
            assert_eq!(&MESSAGE[..], decoded);
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored_size());
    }

    #[test]
    fn decode_message_no_size() {
        // An empty message is still reported to the callback.
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &[]);

        let mut decoder = Decoder::default();
        let mut times_called = 0;
        decoder.decode(&buffer, |_decoded| {
            times_called += 1;
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored_size());
    }

    #[test]
    fn decode_message_one_byte() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &[0xFF]);

        let mut decoder = Decoder::default();
        let mut times_called = 0;
        decoder.decode(&buffer, |decoded| {
            times_called += 1;
            assert_eq!(&[0xFFu8][..], decoded);
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored_size());
    }

    #[test]
    fn decode_multiple_messages_exact() {
        let messages = [&MESSAGE_A, &MESSAGE_B, &MESSAGE_C];
        let mut buffer = Vec::new();
        for message in messages.iter() {
            encode_message(&mut buffer, &message[..]);
        }

        let mut decoder = Decoder::default();
        let mut times_called = 0;
        decoder.decode(&buffer, |decoded| {
            assert_eq!(&messages[times_called][..], decoded);
            times_called += 1;
        });

        assert_eq!(3, times_called);
        assert_eq!(0, decoder.stored_size());
    }

    #[test]
    fn decode_one_message_in_two_parts() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);

        const SPLIT: usize = ENCODED_MESSAGE_SIZE / 2;
        let (first, second) = buffer.split_at(SPLIT);

        let mut decoder = Decoder::default();
        let mut times_called = 0;

        // The first half alone must only be buffered.
        decoder.decode(first, |_decoded| {
            times_called += 1;
        });
        assert_eq!(0, times_called);
        assert_eq!(SPLIT, decoder.stored_size());

        decoder.decode(second, |decoded| {
            times_called += 1;
            assert_eq!(&MESSAGE[..], decoded);
        });
        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored_size());
    }

    #[test]
    fn decode_two_messages_in_two_parts() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);
        encode_message(&mut buffer, &MESSAGE);

        // The cut falls inside the first message.
        const SPLIT: usize = ENCODED_MESSAGE_SIZE * 2 / 3;
        let (first, second) = buffer.split_at(SPLIT);

        let mut decoder = Decoder::default();
        let mut times_called = 0;

        decoder.decode(first, |_decoded| {
            times_called += 1;
        });
        assert_eq!(0, times_called);
        assert_eq!(SPLIT, decoder.stored_size());

        decoder.decode(second, |decoded| {
            times_called += 1;
            assert_eq!(&MESSAGE[..], decoded);
        });
        assert_eq!(2, times_called);
        assert_eq!(0, decoder.stored_size());
    }

    #[test]
    fn decode_byte_per_byte() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);

        let mut decoder = Decoder::default();
        let mut times_called = 0;
        let last = buffer.len() - 1;

        for (i, byte) in buffer.chunks(1).enumerate() {
            decoder.decode(byte, |decoded| {
                // The callback may only fire on the very last byte.
                assert_eq!(last, i);
                times_called += 1;
                assert_eq!(&MESSAGE[..], decoded);
            });

            if i < last {
                assert_eq!(i + 1, decoder.stored_size());
            }
        }

        assert_eq!(0, decoder.stored_size());
        assert_eq!(1, times_called);
    }

    #[test]
    fn decode_message_after_non_enough_padding() {
        let msg = [0u8; 1000];
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &msg);

        // First chunk: the complete two-byte size prefix and no payload at all.
        let (prefix, payload) = buffer.split_at(2);

        let mut decoder = Decoder::default();
        let mut times_called = 0;

        decoder.decode(prefix, |_decoded| {
            times_called += 1;
        });
        assert_eq!(0, times_called);
        assert_eq!(2, decoder.stored_size());

        decoder.decode(payload, |decoded| {
            times_called += 1;
            assert_eq!(&msg[..], decoded);
        });
        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored_size());
    }

    #[test]
    fn decode_message_var_size_in_two_data() {
        let msg = [0u8; 1000];
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &msg);

        // The two-byte size prefix arrives one byte at a time.
        let (prefix_first, rest) = buffer.split_at(1);
        let (prefix_second, payload) = rest.split_at(1);

        let mut decoder = Decoder::default();
        let mut times_called = 0;

        decoder.decode(prefix_first, |_decoded| {
            times_called += 1;
        });
        assert_eq!(0, times_called);
        assert_eq!(1, decoder.stored_size());

        decoder.decode(prefix_second, |_decoded| {
            times_called += 1;
        });
        assert_eq!(0, times_called);
        assert_eq!(2, decoder.stored_size());

        decoder.decode(payload, |decoded| {
            times_called += 1;
            assert_eq!(&msg[..], decoded);
        });
        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored_size());
    }
}