// message_io/util/encoding.rs

1use integer_encoding::VarInt;
2
/// Upper bound on the number of bytes a varint-encoded `u64` can occupy.
///
/// Every varint byte carries 7 bits of payload, so 64 bits need
/// `ceil(64 / 7) = 10` bytes.
pub const MAX_ENCODED_SIZE: usize = (64 + 7 - 1) / 7;
6
7/// Encode a message, returning the bytes that must be sent before the message.
8/// A buffer is used to avoid heap allocation.
9pub fn encode_size<'a>(message: &[u8], buf: &'a mut [u8; MAX_ENCODED_SIZE]) -> &'a [u8] {
10    let varint_size = message.len().encode_var(buf);
11    &buf[..varint_size]
12}
13
14/// Decodes an encoded value in a buffer.
15/// The function returns the message size and the consumed bytes or none if the buffer is too small.
16pub fn decode_size(data: &[u8]) -> Option<(usize, usize)> {
17    usize::decode_var(data)
18}
19
/// Reassembles size-prefixed messages from a stream of data chunks.
///
/// A message may arrive split across several chunks, and one chunk may carry
/// several messages.
pub struct Decoder {
    /// Bytes of a not-yet-complete frame (size header and/or partial payload)
    /// carried over between calls to `decode`.
    stored: Vec<u8>,
}
24
25impl Default for Decoder {
26    /// Creates a new decoder.
27    /// It will only reserve memory in cases where decoding needs to keep data among messages.
28    fn default() -> Decoder {
29        Decoder { stored: Vec::new() }
30    }
31}
32
33impl Decoder {
34    fn try_decode(&mut self, data: &[u8], mut decoded_callback: impl FnMut(&[u8])) {
35        let mut next_data = data;
36        loop {
37            if let Some((expected_size, used_bytes)) = decode_size(next_data) {
38                let remaining = &next_data[used_bytes..];
39                if remaining.len() >= expected_size {
40                    let (decoded, not_decoded) = remaining.split_at(expected_size);
41                    decoded_callback(decoded);
42                    if !not_decoded.is_empty() {
43                        next_data = not_decoded;
44                        continue;
45                    }
46                    else {
47                        break;
48                    }
49                }
50            }
51            self.stored.extend_from_slice(next_data);
52            break;
53        }
54    }
55
56    fn store_and_decoded_data<'a>(&mut self, data: &'a [u8]) -> Option<(&[u8], &'a [u8])> {
57        // Process frame header
58        let ((expected_size, used_bytes), data) = match decode_size(&self.stored) {
59            Some(size_info) => (size_info, data),
60            None => {
61                // we append at most the potential data needed to decode the size
62                let max_remaining = (MAX_ENCODED_SIZE - self.stored.len()).min(data.len());
63                self.stored.extend_from_slice(&data[..max_remaining]);
64
65                if let Some(x) = decode_size(&self.stored) {
66                    // Now we know the size
67                    (x, &data[max_remaining..])
68                }
69                else {
70                    // We still don't know the size (data was too small)
71                    return None;
72                }
73            }
74        };
75
76        // At this point we know at least the expected size of the frame.
77        let remaining = expected_size - (self.stored.len() - used_bytes);
78        if data.len() < remaining {
79            // We need more data to decoder
80            self.stored.extend_from_slice(data);
81            None
82        }
83        else {
84            // We can complete a message here
85            let (to_store, remaining) = data.split_at(remaining);
86            self.stored.extend_from_slice(to_store);
87            Some((&self.stored[used_bytes..], remaining))
88        }
89    }
90
91    /// Tries to decode data without reserve any memory, direcly from `data`.
92    /// `decoded_callback` will be called for each decoded message.
93    /// If `data` is not enough to decoding a message, the data will be stored
94    /// until more data is decoded (more successives calls to this function).
95    pub fn decode(&mut self, data: &[u8], mut decoded_callback: impl FnMut(&[u8])) {
96        if self.stored.is_empty() {
97            self.try_decode(data, decoded_callback);
98        }
99        else {
100            //There was already data in the Decoder
101            if let Some((decoded_data, remaining)) = self.store_and_decoded_data(data) {
102                decoded_callback(decoded_data);
103                self.stored.clear();
104                self.try_decode(remaining, decoded_callback);
105            }
106        }
107    }
108
109    /// Returns the bytes len stored in this decoder.
110    /// It can include both, the padding bytes and the data message bytes.
111    /// After decoding a message, its bytes are removed from the decoder.
112    pub fn stored_size(&self) -> usize {
113        self.stored.len()
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    // Must stay below 128 so that the varint size header fits in a single byte.
    const MESSAGE_SIZE: usize = 20;
    // One header byte: any size below 128 fits in a single 7-bit varint group.
    const ENCODED_MESSAGE_SIZE: usize = 1 + MESSAGE_SIZE;
    const MESSAGE: [u8; MESSAGE_SIZE] = [42; MESSAGE_SIZE];
    const MESSAGE_A: [u8; MESSAGE_SIZE] = [b'A'; MESSAGE_SIZE];
    const MESSAGE_B: [u8; MESSAGE_SIZE] = [b'B'; MESSAGE_SIZE];
    const MESSAGE_C: [u8; MESSAGE_SIZE] = [b'C'; MESSAGE_SIZE];

    /// Appends `message` to `buffer` as a frame: varint size header + payload.
    fn encode_message(buffer: &mut Vec<u8>, message: &[u8]) {
        let mut buf = [0; MAX_ENCODED_SIZE];
        buffer.extend_from_slice(encode_size(message, &mut buf));
        buffer.extend_from_slice(message);
    }

    #[test]
    fn encode_one_message() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);

        assert_eq!(ENCODED_MESSAGE_SIZE, buffer.len());
        let (expected_size, used_bytes) = decode_size(&buffer).unwrap();
        assert_eq!(MESSAGE_SIZE, expected_size);
        assert_eq!(used_bytes, 1);
        assert_eq!(&MESSAGE, &buffer[used_bytes..]);
    }

    #[test]
    fn encode_one_big_message() {
        let message = [0u8; 1000];
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &message);

        // 1000 needs two 7-bit varint groups, hence a 2-byte header.
        assert_eq!(1002, buffer.len());
        let (expected_size, used_bytes) = decode_size(&buffer).unwrap();
        assert_eq!(1000, expected_size);
        assert_eq!(used_bytes, 2);
        assert_eq!(&message[..], &buffer[used_bytes..]);
    }

    #[test]
    // [ data  ]
    // [message]
    fn decode_one_message() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);

        let mut decoder = Decoder::default();
        let mut times_called = 0;
        decoder.decode(&buffer, |decoded| {
            times_called += 1;
            assert_eq!(MESSAGE, decoded);
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored.len());
    }

    #[test]
    // [      1B      ]
    // [ size (0) only ]  -> an empty message
    fn decode_message_no_size() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &[]);

        let mut decoder = Decoder::default();

        let mut times_called = 0;
        decoder.decode(&buffer, |decoded| {
            // An empty message is still reported, as an empty slice.
            assert!(decoded.is_empty());
            times_called += 1;
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored.len());
    }

    #[test]
    // [ 1B ][   1B   ]
    // [size][message ]
    fn decode_message_one_byte() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &[0xFF]);

        let mut decoder = Decoder::default();

        let mut times_called = 0;
        decoder.decode(&buffer, |decoded| {
            times_called += 1;
            assert_eq!([0xFF], decoded);
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored.len());
    }

    #[test]
    // [          data           ]
    // [message][message][message]
    fn decode_multiple_messages_exact() {
        let mut buffer = Vec::new();

        let messages = [&MESSAGE_A, &MESSAGE_B, &MESSAGE_C];
        encode_message(&mut buffer, messages[0]);
        encode_message(&mut buffer, messages[1]);
        encode_message(&mut buffer, messages[2]);

        let mut decoder = Decoder::default();

        let mut times_called = 0;
        decoder.decode(&buffer, |decoded| {
            assert_eq!(messages[times_called], decoded);
            times_called += 1;
        });

        assert_eq!(3, times_called);
        assert_eq!(0, decoder.stored.len());
    }

    #[test]
    // [ data ][ data ]
    // [    message   ]
    fn decode_one_message_in_two_parts() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);

        const SPLIT: usize = ENCODED_MESSAGE_SIZE / 2;
        let (first, second) = buffer.split_at(SPLIT);

        let mut decoder = Decoder::default();

        let mut times_called = 0;
        decoder.decode(first, |_decoded| {
            // Should not be called
            times_called += 1;
        });

        assert_eq!(0, times_called);
        assert_eq!(SPLIT, decoder.stored.len());

        decoder.decode(second, |decoded| {
            times_called += 1;
            assert_eq!(MESSAGE, decoded);
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored.len());
    }

    #[test]
    // [ data ][        data        ]
    // [   message   ][   message   ]
    fn decode_two_messages_in_two_parts() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);
        encode_message(&mut buffer, &MESSAGE);

        const SPLIT: usize = ENCODED_MESSAGE_SIZE * 2 / 3;
        let (first, second) = buffer.split_at(SPLIT);

        let mut decoder = Decoder::default();

        let mut times_called = 0;
        decoder.decode(first, |_decoded| {
            // Should not be called
            times_called += 1;
        });

        assert_eq!(0, times_called);
        assert_eq!(SPLIT, decoder.stored.len());

        decoder.decode(second, |decoded| {
            times_called += 1;
            assert_eq!(MESSAGE, decoded);
        });

        assert_eq!(2, times_called);
        assert_eq!(0, decoder.stored.len());
    }

    #[test]
    // [ 1B ][ 1B ][...][ 1B ]
    // [       message       ]
    fn decode_byte_per_byte() {
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &MESSAGE);

        let mut decoder = Decoder::default();

        let mut times_called = 0;
        for i in 0..buffer.len() {
            decoder.decode(&buffer[i..i + 1], |decoded| {
                // Only the very last byte completes the message.
                assert_eq!(buffer.len() - 1, i);
                times_called += 1;
                assert_eq!(MESSAGE, decoded);
            });

            if i < buffer.len() - 1 {
                assert_eq!(i + 1, decoder.stored.len());
            }
        }

        assert_eq!(0, decoder.stored.len());
        assert_eq!(1, times_called);
    }

    #[test]
    // [    2B    ][  remaining  ]
    // [size header][   payload   ]
    fn decode_message_after_non_enough_padding() {
        let msg = [0; 1000];
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &msg);

        // The first chunk carries exactly the 2-byte size header.
        let (header, remaining) = buffer.split_at(2);

        let mut decoder = Decoder::default();

        let mut times_called = 0;
        decoder.decode(header, |_decoded| {
            // Should not be called
            times_called += 1;
        });

        assert_eq!(0, times_called);
        assert_eq!(2, decoder.stored.len());

        decoder.decode(remaining, |decoded| {
            times_called += 1;
            assert_eq!(msg, decoded);
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored.len());
    }

    #[test]
    // [ 1B ][ 1B ][  remaining  ]
    // [size header][   payload   ]
    fn decode_message_var_size_in_two_data() {
        let msg = [0; 1000];
        let mut buffer = Vec::new();
        encode_message(&mut buffer, &msg);

        // The 2-byte size header itself is split across two chunks.
        let (start_1b, remaining) = buffer.split_at(1);

        let mut decoder = Decoder::default();

        let mut times_called = 0;
        decoder.decode(start_1b, |_decoded| {
            // Should not be called
            times_called += 1;
        });

        assert_eq!(0, times_called);
        assert_eq!(1, decoder.stored.len());

        let (next_1b, remaining) = remaining.split_at(1);

        decoder.decode(next_1b, |_decoded| {
            // Should not be called
            times_called += 1;
        });

        assert_eq!(0, times_called);
        assert_eq!(2, decoder.stored.len());

        decoder.decode(remaining, |decoded| {
            times_called += 1;
            assert_eq!(msg, decoded);
        });

        assert_eq!(1, times_called);
        assert_eq!(0, decoder.stored.len());
    }
}
392        assert_eq!(0, decoder.stored.len());
393    }
394}