nym_ordered_buffer/
buffer.rs

1// Copyright 2020-2023 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use log::*;
5use std::collections::BTreeMap;
6use thiserror::Error;
7
8#[derive(Debug, Error, PartialEq, Eq)]
9pub enum OrderedMessageError {
10    #[error("received message with sequence number {received}, which is way higher than our current {current}")]
11    MessageSequenceTooLarge { current: u64, received: u64 },
12
13    #[error("received message with sequence number {received}, while we're already at {current}!")]
14    MessageAlreadyReconstructed { current: u64, received: u64 },
15
16    #[error("attempted to overwrite message at sequence {received}")]
17    AttemptedToOverwriteSequence { received: u64 },
18}
19
/// Stores messages and emits them in order.
///
/// Messages may be written in any order, but a read only returns the contiguous run
/// of messages starting at `next_sequence` (the lowest sequence number not yet read).
/// This avoids returning gaps while we wait for the buffer to fill up with the full
/// sequence.
#[derive(Debug)]
pub struct OrderedMessageBuffer {
    /// Sequence number of the next message a read will return; every sequence below
    /// it has already been read and is rejected on write.
    next_sequence: u64,
    /// Messages written but not yet read, keyed (and therefore ordered) by sequence number.
    messages: BTreeMap<u64, Vec<u8>>,
}
30
/// The result of a successful read from an [`OrderedMessageBuffer`]: a gapless,
/// in-order run of message payloads.
#[derive(PartialEq, Eq, Debug)]
pub struct ReadContiguousData {
    /// Payloads of every message in the run, concatenated in sequence order.
    pub data: Vec<u8>,
    /// Sequence number of the final message whose payload is included in `data`.
    pub last_sequence: u64,
}
37
/// How far ahead of the next expected sequence number a written message may be
/// (inclusive) before it is rejected as malformed.
const MAX_REASONABLE_OFFSET: u64 = 1_000;
39
40impl OrderedMessageBuffer {
41    pub fn new() -> OrderedMessageBuffer {
42        OrderedMessageBuffer {
43            next_sequence: 0,
44            messages: BTreeMap::new(),
45        }
46    }
47
48    /// Writes a message to the buffer. messages are sort on insertion, so
49    /// that later on multiple reads for incomplete sequences don't result in
50    /// useless sort work.
51    pub fn write(&mut self, sequence: u64, data: Vec<u8>) -> Result<(), OrderedMessageError> {
52        // reject messages that have clearly malformed sequence
53        if sequence > self.next_sequence + MAX_REASONABLE_OFFSET {
54            return Err(OrderedMessageError::MessageSequenceTooLarge {
55                current: self.next_sequence,
56                received: sequence,
57            });
58        }
59
60        if self.messages.contains_key(&sequence) {
61            return Err(OrderedMessageError::AttemptedToOverwriteSequence { received: sequence });
62        }
63
64        if sequence < self.next_sequence {
65            return Err(OrderedMessageError::MessageAlreadyReconstructed {
66                current: self.next_sequence,
67                received: sequence,
68            });
69        }
70
71        trace!(
72            "Writing message index: {} length {} to OrderedMessageBuffer.",
73            sequence,
74            data.len()
75        );
76
77        self.messages.insert(sequence, data);
78        Ok(())
79    }
80
81    /// Checks whether the buffer contains enough contiguous regions to read until the specified target sequence.
82    pub fn can_read_until(&self, target: u64) -> bool {
83        for seq in self.next_sequence..=target {
84            if !self.messages.contains_key(&seq) {
85                return false;
86            }
87        }
88        true
89    }
90
91    /// Returns `Option<Vec<u8>>` where it's `Some(bytes)` if there is gapless
92    /// ordered data in the buffer, and `None` if the buffer is empty or has
93    /// gaps in the contained data.
94    ///
95    /// E.g. if the buffer contains messages with indexes 0, 1, 2, and 4, then
96    /// a read will return the bytes of messages 0, 1, 2. Subsequent reads will
97    /// return `None` until message 3 comes in, at which point 3, 4, and any
98    /// further contiguous messages which have arrived will be returned.
99    #[must_use]
100    pub fn read(&mut self) -> Option<ReadContiguousData> {
101        if !self.messages.contains_key(&self.next_sequence) {
102            return None;
103        }
104
105        let mut contiguous_messages = Vec::new();
106        let mut seq = self.next_sequence;
107
108        while let Some(mut data) = self.messages.remove(&seq) {
109            contiguous_messages.append(&mut data);
110            seq += 1;
111        }
112
113        let high_water = seq;
114        self.next_sequence = high_water;
115        trace!("Next high water mark is: {high_water}");
116
117        trace!(
118            "Returning {} bytes from ordered message buffer",
119            contiguous_messages.len()
120        );
121        Some(ReadContiguousData {
122            data: contiguous_messages,
123            last_sequence: self.next_sequence - 1,
124        })
125    }
126}
127
128impl Default for OrderedMessageBuffer {
129    fn default() -> Self {
130        OrderedMessageBuffer::new()
131    }
132}
133
#[cfg(test)]
mod test_chunking_and_reassembling {
    use super::*;

    #[test]
    fn trying_to_write_unreasonable_high_sequence() {
        let mut buffer = OrderedMessageBuffer::new();
        let first_message = vec![1, 2, 3, 4];
        let second_message = vec![5, 6, 7, 8];

        buffer.write(0, first_message).unwrap();
        buffer.write(1, second_message).unwrap();

        assert_eq!(
            Err(OrderedMessageError::MessageSequenceTooLarge {
                current: 0,
                received: 12345678
            }),
            buffer.write(12345678, b"foomp".to_vec())
        )
    }

    #[test]
    fn writing_exactly_at_the_reasonable_offset_is_allowed() {
        let mut buffer = OrderedMessageBuffer::new();

        // the bound is inclusive: exactly `MAX_REASONABLE_OFFSET` ahead is still accepted...
        buffer.write(MAX_REASONABLE_OFFSET, vec![1]).unwrap();

        // ...but anything further ahead is rejected
        assert_eq!(
            Err(OrderedMessageError::MessageSequenceTooLarge {
                current: 0,
                received: MAX_REASONABLE_OFFSET + 1
            }),
            buffer.write(MAX_REASONABLE_OFFSET + 1, vec![2])
        )
    }

    #[test]
    fn trying_to_overwrite_sequence() {
        let mut buffer = OrderedMessageBuffer::new();
        let message = vec![1, 2, 3, 4];

        buffer.write(0, message.clone()).unwrap();
        buffer.write(1, message.clone()).unwrap();
        buffer.write(2, message.clone()).unwrap();
        buffer.write(3, message.clone()).unwrap();

        for seq in 0..=3 {
            assert_eq!(
                Err(OrderedMessageError::AttemptedToOverwriteSequence { received: seq }),
                buffer.write(seq, message.clone())
            )
        }
    }

    #[test]
    fn writing_past_data() {
        let mut buffer = OrderedMessageBuffer::new();
        let message = vec![1, 2, 3, 4];

        buffer.write(0, message.clone()).unwrap();
        buffer.write(1, message.clone()).unwrap();
        buffer.write(2, message.clone()).unwrap();
        buffer.write(3, message.clone()).unwrap();
        let _ = buffer.read().unwrap();

        for seq in 0..=3 {
            assert_eq!(
                Err(OrderedMessageError::MessageAlreadyReconstructed {
                    current: 4,
                    received: seq
                }),
                buffer.write(seq, message.clone())
            )
        }
    }

    #[test]
    fn can_read_until_requires_every_message_up_to_the_target() {
        let mut buffer = OrderedMessageBuffer::new();
        assert!(!buffer.can_read_until(0));

        buffer.write(0, vec![0]).unwrap();
        buffer.write(1, vec![1]).unwrap();
        buffer.write(3, vec![3]).unwrap();

        assert!(buffer.can_read_until(0));
        assert!(buffer.can_read_until(1));
        assert!(!buffer.can_read_until(2));
        assert!(!buffer.can_read_until(3));

        buffer.write(2, vec![2]).unwrap();
        assert!(buffer.can_read_until(3));
        assert!(!buffer.can_read_until(4));
    }

    #[test]
    fn can_read_until_is_trivially_true_for_already_read_sequences() {
        let mut buffer = OrderedMessageBuffer::new();
        buffer.write(0, vec![0]).unwrap();
        buffer.write(1, vec![1]).unwrap();
        let _ = buffer.read().unwrap();

        // nothing is buffered any more, but sequences 0 and 1 were already read
        assert!(buffer.can_read_until(0));
        assert!(buffer.can_read_until(1));
        assert!(!buffer.can_read_until(2));
    }

    #[test]
    fn read_reports_the_last_sequence_it_consumed() {
        let mut buffer = OrderedMessageBuffer::new();
        buffer.write(0, vec![0]).unwrap();
        buffer.write(1, vec![1]).unwrap();
        buffer.write(3, vec![3]).unwrap();

        assert_eq!(
            Some(ReadContiguousData {
                data: vec![0, 1],
                last_sequence: 1
            }),
            buffer.read()
        );

        buffer.write(2, vec![2]).unwrap();
        assert_eq!(
            Some(ReadContiguousData {
                data: vec![2, 3],
                last_sequence: 3
            }),
            buffer.read()
        );
    }

    mod reading_from_and_writing_to_the_buffer {
        use super::*;

        mod when_full_ordered_sequence_exists {
            use super::*;

            #[test]
            fn read_returns_ordered_bytes_and_resets_buffer() {
                let mut buffer = OrderedMessageBuffer::new();

                let first_message = vec![1, 2, 3, 4];
                let second_message = vec![5, 6, 7, 8];

                buffer.write(0, first_message).unwrap();
                let first_read = buffer.read().unwrap().data;
                assert_eq!(vec![1, 2, 3, 4], first_read);

                buffer.write(1, second_message).unwrap();
                let second_read = buffer.read().unwrap().data;
                assert_eq!(vec![5, 6, 7, 8], second_read);

                assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
            }

            #[test]
            fn test_multiple_adds_stacks_up_bytes_in_the_buffer() {
                let mut buffer = OrderedMessageBuffer::new();

                let first_message = vec![1, 2, 3, 4];
                let second_message = vec![5, 6, 7, 8];

                buffer.write(0, first_message).unwrap();
                buffer.write(1, second_message).unwrap();
                let second_read = buffer.read();
                assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], second_read.unwrap().data);
                assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
            }

            #[test]
            fn out_of_order_adds_results_in_ordered_byte_vector() {
                let mut buffer = OrderedMessageBuffer::new();

                let first_message = vec![1, 2, 3, 4];
                let second_message = vec![5, 6, 7, 8];

                buffer.write(1, second_message).unwrap();
                buffer.write(0, first_message).unwrap();
                let read = buffer.read().unwrap().data;
                assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], read);
                assert_eq!(None, buffer.read()); // second read on fully ordered result set is empty
            }
        }

        mod when_there_are_gaps_in_the_sequence {
            use super::*;

            /// Buffer holding messages 0, 1 and 3 (a gap at 2), nothing read yet.
            fn setup() -> OrderedMessageBuffer {
                let mut buffer = OrderedMessageBuffer::new();

                let zero_message = vec![0, 0, 0, 0];
                let one_message = vec![1, 1, 1, 1];
                let three_message = vec![3, 3, 3, 3];

                buffer.write(0, zero_message).unwrap();
                buffer.write(1, one_message).unwrap();
                buffer.write(3, three_message).unwrap();
                buffer
            }

            #[test]
            fn everything_up_to_the_indexing_gap_is_returned() {
                let mut buffer = setup();
                let ordered_bytes = buffer.read().unwrap().data;
                assert_eq!([0, 0, 0, 0, 1, 1, 1, 1].to_vec(), ordered_bytes);

                // we shouldn't get any more from a second attempt if nothing is added
                assert_eq!(None, buffer.read());

                // let's add another message, leaving a gap in place at index 2
                let five_message = vec![5, 5, 5, 5];
                buffer.write(5, five_message).unwrap();
                assert_eq!(None, buffer.read());
            }

            #[test]
            fn filling_the_gap_allows_us_to_get_everything() {
                let mut buffer = setup();
                let _ = buffer.read(); // that burns the first two. We still have a gap before the 3s.

                let two_message = vec![2, 2, 2, 2];
                buffer.write(2, two_message).unwrap();

                let more_ordered_bytes = buffer.read().unwrap().data;
                assert_eq!([2, 2, 2, 2, 3, 3, 3, 3].to_vec(), more_ordered_bytes);

                // let's add another message
                let five_message = vec![5, 5, 5, 5];
                buffer.write(5, five_message).unwrap();

                assert_eq!(None, buffer.read());

                // let's fill in the gap of 4s now and read again
                let four_message = vec![4, 4, 4, 4];
                buffer.write(4, four_message).unwrap();

                assert_eq!(
                    [4, 4, 4, 4, 5, 5, 5, 5].to_vec(),
                    buffer.read().unwrap().data
                );

                // at this point we should again get back nothing if we try a read
                assert_eq!(None, buffer.read());
            }

            #[test]
            fn filling_the_gap_allows_us_to_get_everything_when_last_element_is_empty() {
                let mut buffer = OrderedMessageBuffer::new();
                let zero_message = vec![0, 0, 0, 0];
                let one_message = vec![1, 1, 1, 1];
                let two_message: Vec<u8> = vec![];

                buffer.write(0, zero_message).unwrap();
                assert!(buffer.read().is_some()); // burn the buffer

                buffer.write(2, two_message).unwrap();
                buffer.write(1, one_message).unwrap();

                // the empty message still counts towards the contiguous run
                assert_eq!(
                    Some(ReadContiguousData {
                        data: vec![1, 1, 1, 1],
                        last_sequence: 2
                    }),
                    buffer.read()
                );
                assert_eq!(buffer.next_sequence, 3);
            }

            #[test]
            fn works_with_gaps_bigger_than_one() {
                let mut buffer = OrderedMessageBuffer::new();
                let zero_message = vec![0, 0, 0, 0];
                let one_message = vec![1, 1, 1, 1];
                let two_message = vec![2, 2, 2, 2];
                let three_message = vec![3, 3, 3, 3];
                let four_message = vec![4, 4, 4, 4];

                buffer.write(0, zero_message).unwrap();
                assert!(buffer.read().is_some());
                assert_eq!(buffer.next_sequence, 1);

                buffer.write(4, four_message).unwrap();
                assert!(buffer.read().is_none());
                assert_eq!(buffer.next_sequence, 1);

                buffer.write(3, three_message).unwrap();
                assert!(buffer.read().is_none());
                assert_eq!(buffer.next_sequence, 1);

                buffer.write(2, two_message).unwrap();
                assert!(buffer.read().is_none());
                assert_eq!(buffer.next_sequence, 1);

                buffer.write(1, one_message).unwrap();
                let read = buffer.read().unwrap();
                assert_eq!(
                    vec![1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4],
                    read.data
                );
                assert_eq!(read.last_sequence, 4);
                assert_eq!(buffer.next_sequence, 5)
            }
        }
    }
}