1use log::*;
5use std::collections::BTreeMap;
6use thiserror::Error;
7
8#[derive(Debug, Error, PartialEq, Eq)]
9pub enum OrderedMessageError {
10 #[error("received message with sequence number {received}, which is way higher than our current {current}")]
11 MessageSequenceTooLarge { current: u64, received: u64 },
12
13 #[error("received message with sequence number {received}, while we're already at {current}!")]
14 MessageAlreadyReconstructed { current: u64, received: u64 },
15
16 #[error("attempted to overwrite message at sequence {received}")]
17 AttemptedToOverwriteSequence { received: u64 },
18}
19
/// Buffer that accepts sequence-numbered byte chunks in any order and hands
/// them back out in sequence order.
#[derive(Debug)]
pub struct OrderedMessageBuffer {
    /// Sequence number the next read starts from; everything below it has
    /// already been returned.
    next_sequence: u64,

    /// Chunks that were written but not read yet, keyed by their sequence number.
    messages: BTreeMap<u64, Vec<u8>>,
}
30
/// Result of a successful read from an `OrderedMessageBuffer`.
#[derive(Debug, PartialEq, Eq)]
pub struct ReadContiguousData {
    /// Payloads of all returned messages, concatenated in sequence order.
    pub data: Vec<u8>,

    /// Sequence number of the last message whose payload is included in `data`.
    pub last_sequence: u64,
}
37
/// Largest distance a written sequence number may be ahead of the buffer's
/// next expected sequence number; writes further ahead are rejected.
const MAX_REASONABLE_OFFSET: u64 = 1_000;
39
40impl OrderedMessageBuffer {
41 pub fn new() -> OrderedMessageBuffer {
42 OrderedMessageBuffer {
43 next_sequence: 0,
44 messages: BTreeMap::new(),
45 }
46 }
47
48 pub fn write(&mut self, sequence: u64, data: Vec<u8>) -> Result<(), OrderedMessageError> {
52 if sequence > self.next_sequence + MAX_REASONABLE_OFFSET {
54 return Err(OrderedMessageError::MessageSequenceTooLarge {
55 current: self.next_sequence,
56 received: sequence,
57 });
58 }
59
60 if self.messages.contains_key(&sequence) {
61 return Err(OrderedMessageError::AttemptedToOverwriteSequence { received: sequence });
62 }
63
64 if sequence < self.next_sequence {
65 return Err(OrderedMessageError::MessageAlreadyReconstructed {
66 current: self.next_sequence,
67 received: sequence,
68 });
69 }
70
71 trace!(
72 "Writing message index: {} length {} to OrderedMessageBuffer.",
73 sequence,
74 data.len()
75 );
76
77 self.messages.insert(sequence, data);
78 Ok(())
79 }
80
81 pub fn can_read_until(&self, target: u64) -> bool {
83 for seq in self.next_sequence..=target {
84 if !self.messages.contains_key(&seq) {
85 return false;
86 }
87 }
88 true
89 }
90
91 #[must_use]
100 pub fn read(&mut self) -> Option<ReadContiguousData> {
101 if !self.messages.contains_key(&self.next_sequence) {
102 return None;
103 }
104
105 let mut contiguous_messages = Vec::new();
106 let mut seq = self.next_sequence;
107
108 while let Some(mut data) = self.messages.remove(&seq) {
109 contiguous_messages.append(&mut data);
110 seq += 1;
111 }
112
113 let high_water = seq;
114 self.next_sequence = high_water;
115 trace!("Next high water mark is: {high_water}");
116
117 trace!(
118 "Returning {} bytes from ordered message buffer",
119 contiguous_messages.len()
120 );
121 Some(ReadContiguousData {
122 data: contiguous_messages,
123 last_sequence: self.next_sequence - 1,
124 })
125 }
126}
127
128impl Default for OrderedMessageBuffer {
129 fn default() -> Self {
130 OrderedMessageBuffer::new()
131 }
132}
133
#[cfg(test)]
mod test_chunking_and_reassembling {
    use super::*;

    // A sequence number far beyond the accepted window is refused, and the
    // error reports the buffer's current (still unread) position.
    #[test]
    fn trying_to_write_unreasonable_high_sequence() {
        let mut buffer = OrderedMessageBuffer::new();

        buffer.write(0, vec![1, 2, 3, 4]).unwrap();
        buffer.write(1, vec![5, 6, 7, 8]).unwrap();

        let outcome = buffer.write(12345678, b"foomp".to_vec());
        let expected = Err(OrderedMessageError::MessageSequenceTooLarge {
            current: 0,
            received: 12345678,
        });
        assert_eq!(expected, outcome)
    }

    // Writing to a sequence number that is still buffered must fail.
    #[test]
    fn trying_to_overwrite_sequence() {
        let mut buffer = OrderedMessageBuffer::new();
        let message = vec![1, 2, 3, 4];

        for seq in 0..=3 {
            buffer.write(seq, message.clone()).unwrap();
        }

        for seq in 0..=3 {
            let outcome = buffer.write(seq, message.clone());
            assert_eq!(
                Err(OrderedMessageError::AttemptedToOverwriteSequence { received: seq }),
                outcome
            )
        }
    }

    // Writing to a sequence number that was already read out must fail.
    #[test]
    fn writing_past_data() {
        let mut buffer = OrderedMessageBuffer::new();
        let message = vec![1, 2, 3, 4];

        for seq in 0..=3 {
            buffer.write(seq, message.clone()).unwrap();
        }
        let _ = buffer.read().unwrap();

        for seq in 0..=3 {
            let outcome = buffer.write(seq, message.clone());
            assert_eq!(
                Err(OrderedMessageError::MessageAlreadyReconstructed {
                    current: 4,
                    received: seq
                }),
                outcome
            )
        }
    }

    mod reading_from_and_writing_to_the_buffer {
        use super::*;

        mod when_full_ordered_sequence_exists {
            use super::*;

            #[test]
            fn read_returns_ordered_bytes_and_resets_buffer() {
                let mut buffer = OrderedMessageBuffer::new();

                buffer.write(0, vec![1, 2, 3, 4]).unwrap();
                assert_eq!(vec![1, 2, 3, 4], buffer.read().unwrap().data);

                buffer.write(1, vec![5, 6, 7, 8]).unwrap();
                assert_eq!(vec![5, 6, 7, 8], buffer.read().unwrap().data);

                // Everything has been consumed.
                assert_eq!(None, buffer.read());
            }

            #[test]
            fn test_multiple_adds_stacks_up_bytes_in_the_buffer() {
                let mut buffer = OrderedMessageBuffer::new();

                buffer.write(0, vec![1, 2, 3, 4]).unwrap();
                buffer.write(1, vec![5, 6, 7, 8]).unwrap();

                let combined = buffer.read().unwrap().data;
                assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], combined);
                assert_eq!(None, buffer.read());
            }

            #[test]
            fn out_of_order_adds_results_in_ordered_byte_vector() {
                let mut buffer = OrderedMessageBuffer::new();

                // Deliberately written in reverse order.
                buffer.write(1, vec![5, 6, 7, 8]).unwrap();
                buffer.write(0, vec![1, 2, 3, 4]).unwrap();

                let combined = buffer.read().unwrap().data;
                assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], combined);
                assert_eq!(None, buffer.read());
            }
        }

        mod when_there_are_gaps_in_the_sequence {
            use super::*;

            // Buffer holding sequences 0, 1 and 3 - sequence 2 is missing.
            fn setup() -> OrderedMessageBuffer {
                let mut buffer = OrderedMessageBuffer::new();

                buffer.write(0, vec![0, 0, 0, 0]).unwrap();
                buffer.write(1, vec![1, 1, 1, 1]).unwrap();
                buffer.write(3, vec![3, 3, 3, 3]).unwrap();

                buffer
            }

            #[test]
            fn everything_up_to_the_indexing_gap_is_returned() {
                let mut buffer = setup();

                let before_gap = buffer.read().unwrap().data;
                assert_eq!([0, 0, 0, 0, 1, 1, 1, 1].to_vec(), before_gap);

                // Sequence 2 is still missing, so nothing more can be read.
                assert_eq!(None, buffer.read());

                // Adding a message beyond the gap does not change that.
                buffer.write(5, vec![5, 5, 5, 5]).unwrap();
                assert_eq!(None, buffer.read());
            }

            #[test]
            fn filling_the_gap_allows_us_to_get_everything() {
                let mut buffer = setup();
                let _ = buffer.read();

                // Fill the gap at 2: sequences 2 and 3 become readable.
                buffer.write(2, vec![2, 2, 2, 2]).unwrap();
                let after_first_fill = buffer.read().unwrap().data;
                assert_eq!([2, 2, 2, 2, 3, 3, 3, 3].to_vec(), after_first_fill);

                // 5 arrives before 4, so nothing is readable yet.
                buffer.write(5, vec![5, 5, 5, 5]).unwrap();
                assert_eq!(None, buffer.read());

                // Fill the gap at 4: sequences 4 and 5 become readable.
                buffer.write(4, vec![4, 4, 4, 4]).unwrap();
                let after_second_fill = buffer.read().unwrap().data;
                assert_eq!([4, 4, 4, 4, 5, 5, 5, 5].to_vec(), after_second_fill);

                assert_eq!(None, buffer.read());
            }

            #[test]
            fn filling_the_gap_allows_us_to_get_everything_when_last_element_is_empty() {
                let mut buffer = OrderedMessageBuffer::new();

                buffer.write(0, vec![0, 0, 0, 0]).unwrap();
                assert!(buffer.read().is_some());

                // The empty payload at sequence 2 arrives before sequence 1.
                buffer.write(2, vec![]).unwrap();
                buffer.write(1, vec![2, 2, 2, 2]).unwrap();

                assert!(buffer.read().is_some());
                assert_eq!(buffer.next_sequence, 3);
            }

            #[test]
            fn works_with_gaps_bigger_than_one() {
                let mut buffer = OrderedMessageBuffer::new();

                buffer.write(0, vec![0, 0, 0, 0]).unwrap();
                assert!(buffer.read().is_some());
                assert_eq!(buffer.next_sequence, 1);

                // While sequence 1 is missing, later messages stay buffered
                // and the read position does not move.
                for seq in [4, 3, 2] {
                    buffer.write(seq, vec![2, 2, 2, 2]).unwrap();
                    assert!(buffer.read().is_none());
                    assert_eq!(buffer.next_sequence, 1);
                }

                // Sequence 1 closes the gap and releases everything up to 4.
                buffer.write(1, vec![2, 2, 2, 2]).unwrap();
                assert!(buffer.read().is_some());
                assert_eq!(buffer.next_sequence, 5)
            }
        }
    }
}