1#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
2
3use std::mem;
6
7use bytes::{BufMut, BytesMut};
8
9use thiserror::Error;
10
11#[derive(Copy, Clone, Debug, Error, PartialEq, Eq)]
13pub enum Error {
14 #[error("Not enough space in buffer segment")]
16 NotEnoughSpace,
17
18 #[error("Would overwrite previously received data")]
20 WouldOverwrite,
21}
22
23#[derive(Debug)]
25pub struct BytesQuilt {
26 tail_offset: usize,
27 segments: Vec<Segment>,
28 buffer_tail: BytesMut,
29}
30
31#[derive(Copy, Clone, Debug, PartialEq)]
32enum Status {
33 Missing,
34 Received,
35}
36
37#[derive(Clone, Debug, PartialEq)]
38struct Segment {
39 status: Status,
40 offset: usize,
41 buffer: BytesMut,
42}
43
44#[derive(Copy, Clone, Debug, PartialEq, Eq)]
46pub struct MissingSegment {
47 offset: usize,
48 length: usize,
49}
50
51impl Default for BytesQuilt {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl BytesQuilt {
58 pub fn new() -> Self {
60 Self {
61 tail_offset: 0,
62 segments: Vec::new(),
63 buffer_tail: BytesMut::new(),
64 }
65 }
66
67 pub fn with_capacity(capacity: usize) -> Self {
69 Self {
70 tail_offset: 0,
71 segments: Vec::new(),
72 buffer_tail: BytesMut::with_capacity(capacity),
73 }
74 }
75
76 fn write_offset_at_index(
77 &mut self,
78 index: usize,
79 offset: usize,
80 bytes: &[u8],
81 ) -> Result<(), Error> {
82 use std::cmp::Ordering;
83 let segment = &mut self.segments[index];
84 if segment.status == Status::Received {
85 return Err(Error::WouldOverwrite);
86 }
87 match segment.buffer.capacity().cmp(&bytes.len()) {
88 Ordering::Less => return Err(Error::NotEnoughSpace),
91 Ordering::Equal => {
92 segment.status = Status::Received;
93 segment.buffer.put(bytes);
94 }
95 Ordering::Greater => {
96 segment.status = Status::Received;
97 segment.buffer.put(bytes);
98 let new_relative_offset = segment.buffer.len();
99 let remaining_segment = segment.buffer.split_off(new_relative_offset);
100 self.segments.insert(
101 index + 1,
102 Segment::missing(offset + new_relative_offset, remaining_segment),
103 );
104 }
105 };
106 Ok(())
107 }
108
109 pub fn put_at(&mut self, offset: usize, src: &[u8]) -> Result<Option<MissingSegment>, Error> {
113 let mut missing_segment = None;
114 debug_assert!(
115 self.segments
116 .first()
117 .map(|segment| segment.offset == 0)
118 .unwrap_or(true),
119 "first segment offset should be zero, found {:?}",
120 self.segments.first()
121 );
122 if self.tail_offset > offset {
123 match self
125 .segments
126 .binary_search_by_key(&offset, |segment| segment.offset)
127 {
128 Ok(index) => {
129 self.write_offset_at_index(index, offset, src)?;
130 }
131 Err(index) => {
132 let segment = &mut self.segments[index - 1];
136 let to_write_buffer = segment.buffer.split_off(offset - segment.offset);
137 let segment = Segment::missing(offset, to_write_buffer);
138 self.segments.insert(index, segment);
139 self.write_offset_at_index(index, offset, src)?;
140 }
141 };
142 return Ok(None);
143 } else if self.tail_offset + self.buffer_tail.len() < offset {
144 if !self.buffer_tail.is_empty() {
145 let head_offset = self.tail_offset;
146 let head_received_bytes = self.buffer_tail.split();
147 self.tail_offset += head_received_bytes.len();
148 self.segments
149 .push(Segment::received(head_offset, head_received_bytes));
150 }
151
152 let head_offset = self.tail_offset;
153 self.tail_offset = offset;
154
155 let tail_bytes = self.buffer_tail.split_off(offset - head_offset);
156 let head_bytes = mem::replace(&mut self.buffer_tail, tail_bytes);
157
158 debug_assert!(head_bytes.is_empty());
161 let segment = Segment::missing(head_offset, head_bytes);
162 missing_segment = segment.missing_segment();
163 self.segments.push(segment);
164 } else if self.tail_offset == offset && !self.buffer_tail.is_empty() {
165 return Err(Error::WouldOverwrite);
167 }
168 self.buffer_tail.put(src);
169 Ok(missing_segment)
170 }
171
172 pub fn missing_segments(&self) -> impl '_ + Iterator<Item = MissingSegment> {
174 self.segments.iter().filter_map(Segment::missing_segment)
175 }
176
177 pub fn into_inner(self) -> BytesMut {
179 let mut segments = self.segments.into_iter();
180 if let Some(segment) = segments.next() {
181 debug_assert!(
184 !segment.is_missing(),
185 "a segment at offset {} of size {} is missing",
186 segment.offset,
187 segment.buffer.len(),
188 );
189 let mut buffer: BytesMut = segment.buffer;
190 for segment in segments {
191 debug_assert!(
192 !segment.is_missing(),
193 "a segment at offset {} of size {} is missing",
194 segment.offset,
195 segment.buffer.len(),
196 );
197 buffer.unsplit(segment.buffer);
198 }
199 buffer.unsplit(self.buffer_tail);
200 return buffer;
201 }
202 self.buffer_tail
203 }
204}
205
206impl Segment {
207 fn missing(offset: usize, buffer: BytesMut) -> Self {
208 Self {
209 status: Status::Missing,
210 offset,
211 buffer,
212 }
213 }
214
215 fn received(offset: usize, buffer: BytesMut) -> Self {
216 Self {
217 status: Status::Received,
218 offset,
219 buffer,
220 }
221 }
222
223 fn is_missing(&self) -> bool {
224 self.status == Status::Missing
225 }
226
227 fn missing_segment(&self) -> Option<MissingSegment> {
228 match self.status {
229 Status::Missing => Some(MissingSegment {
230 offset: self.offset,
231 length: self.buffer.capacity(),
232 }),
233 Status::Received => None,
234 }
235 }
236}
237
238impl MissingSegment {
239 pub fn offsets_for(self, frame_size: usize) -> impl Iterator<Item = usize> {
243 let offset = self.offset;
244 let number_of_frames = self.length / frame_size;
245 (0..number_of_frames).map(move |index| (index * frame_size) + offset)
246 }
247}
248
249#[cfg(test)]
250mod tests {
251 use super::*;
252
253 mod missing_segment {
254 use super::*;
255
256 #[test]
257 fn one_offset_missing() {
258 let segment = MissingSegment {
259 offset: 0,
260 length: 10,
261 };
262 assert_eq!(&[0][..], segment.offsets_for(10).collect::<Vec<_>>());
263 let segment = MissingSegment {
264 offset: 10,
265 length: 10,
266 };
267 assert_eq!(&[10][..], segment.offsets_for(10).collect::<Vec<_>>());
268 }
269
270 #[test]
271 fn two_offsets_missing() {
272 let segment = MissingSegment {
273 offset: 0,
274 length: 10,
275 };
276 assert_eq!(&[0, 5][..], segment.offsets_for(5).collect::<Vec<_>>());
277 let segment = MissingSegment {
278 offset: 10,
279 length: 10,
280 };
281 assert_eq!(&[10, 15][..], segment.offsets_for(5).collect::<Vec<_>>());
282 }
283
284 #[test]
285 fn many_offsets_missing() {
286 let segment = MissingSegment {
287 offset: 5,
288 length: 10,
289 };
290 assert_eq!(
291 &[5, 6, 7, 8, 9, 10, 11, 12, 13, 14][..],
292 segment.offsets_for(1).collect::<Vec<_>>()
293 );
294 }
295 }
296
297 #[test]
298 fn offsets_for_frame_size_five() {
299 let missing_segment = MissingSegment {
300 offset: 0,
301 length: 10,
302 };
303 assert_eq!(
304 &[0, 5][..],
305 missing_segment.offsets_for(5).collect::<Vec<_>>()
306 );
307 }
308
309 #[test]
310 fn offsets_for_frame_size_two() {
311 let missing_segment = MissingSegment {
312 offset: 0,
313 length: 10,
314 };
315 assert_eq!(
316 &[0, 2, 4, 6, 8][..],
317 missing_segment.offsets_for(2).collect::<Vec<_>>()
318 );
319 }
320
321 #[test]
322 fn fill_in_order() {
323 let mut buffer = BytesQuilt::with_capacity(20);
324 buffer.put_at(0, &[5_u8, 4, 3, 2, 1]).expect("write fail");
325 let bytes = buffer.into_inner();
326 assert_eq!(&[5_u8, 4, 3, 2, 1][..], bytes.as_ref())
327 }
328
329 #[test]
330 fn fill_in_order_produces_no_missing_segments() {
331 let mut buffer = BytesQuilt::with_capacity(20);
332 for offset in 0..20 {
333 buffer.put_at(offset, &[3]).expect("write fail");
334 }
335 assert!(buffer.missing_segments().next().is_none());
336 let bytes = buffer.into_inner();
337 assert_eq!(vec![3; 20], bytes.as_ref())
338 }
339
340 #[test]
341 fn detect_missing_segments() {
342 let mut buffer = BytesQuilt::with_capacity(20);
343 let missing_segment = buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
344 assert_eq!(
345 Some(MissingSegment {
346 offset: 0,
347 length: 5
348 }),
349 missing_segment
350 );
351 }
352
353 #[test]
354 fn detect_multiple_missing_segments() {
355 let mut buffer = BytesQuilt::with_capacity(20);
356 buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
357 buffer.put_at(15, &[1, 2, 3, 4, 5]).expect("write fail");
358 assert_eq!(
359 vec![
360 MissingSegment {
361 offset: 0,
362 length: 5
363 },
364 MissingSegment {
365 offset: 10,
366 length: 5
367 }
368 ],
369 buffer.missing_segments().collect::<Vec<_>>()
370 );
371 }
372
373 #[test]
374 fn detect_missing_segments_of_different_sizes() {
375 let mut buffer = BytesQuilt::with_capacity(40);
376 buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
377 buffer.put_at(15, &[1, 2, 3, 4, 5]).expect("write fail");
378 buffer.put_at(35, &[1, 2, 3, 4, 5]).expect("write fail");
379 assert_eq!(
380 vec![
381 MissingSegment {
382 offset: 0,
383 length: 5
384 },
385 MissingSegment {
386 offset: 10,
387 length: 5
388 },
389 MissingSegment {
390 offset: 20,
391 length: 15
392 }
393 ],
394 buffer.missing_segments().collect::<Vec<_>>()
395 );
396 }
397
398 #[test]
399 fn split_missing_segments_on_incomplete_writes() {
400 let mut buffer = BytesQuilt::with_capacity(40);
401 buffer.put_at(15, &[1, 2, 3, 4, 5]).expect("write fail");
402 assert_eq!(
403 vec![MissingSegment {
404 offset: 0,
405 length: 15
406 }],
407 buffer.missing_segments().collect::<Vec<_>>()
408 );
409 buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
410 assert_eq!(
411 vec![
412 MissingSegment {
413 offset: 0,
414 length: 5
415 },
416 MissingSegment {
417 offset: 10,
418 length: 5
419 },
420 ],
421 buffer.missing_segments().collect::<Vec<_>>()
422 );
423 }
424
425 #[test]
426 fn fill_out_of_order_start_aligned_segment() {
427 let mut buffer = BytesQuilt::with_capacity(20);
428 buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
429 buffer.put_at(0, &[10, 9, 8, 7, 6]).expect("write fail");
430 let bytes = buffer.into_inner();
431 assert_eq!(&[10, 9, 8, 7, 6, 5, 4, 3, 2, 1][..], bytes.as_ref())
432 }
433
434 #[test]
435 fn partial_fill_out_of_order_start_aligned_segment() {
436 let mut buffer = BytesQuilt::with_capacity(20);
437 buffer.put_at(4, &[2, 1]).expect("write fail");
438 buffer.put_at(0, &[6, 5]).expect("write fail");
439 buffer.put_at(2, &[4, 3]).expect("write fail");
440 let bytes = buffer.into_inner();
441 assert_eq!(&[6, 5, 4, 3, 2, 1][..], bytes.as_ref())
442 }
443
444 #[test]
445 fn fill_out_of_order_non_aligned_segment() {
446 let mut buffer = BytesQuilt::with_capacity(20);
447 buffer.put_at(4, &[2, 1]).expect("write fail");
448 buffer.put_at(2, &[4, 3]).expect("write fail");
449 buffer.put_at(0, &[6, 5]).expect("write fail");
450 let bytes = buffer.into_inner();
451 assert_eq!(&[6, 5, 4, 3, 2, 1][..], bytes.as_ref())
452 }
453
454 #[test]
455 fn partial_fill_out_of_order_non_aligned_segment() {
456 let mut buffer = BytesQuilt::with_capacity(20);
457 buffer.put_at(6, &[2, 1]).expect("write fail");
458 buffer.put_at(2, &[6, 5]).expect("write fail");
459 buffer.put_at(0, &[8, 7]).expect("write fail");
460 buffer.put_at(4, &[4, 3]).expect("write fail");
461 let bytes = buffer.into_inner();
462 assert_eq!(&[8, 7, 6, 5, 4, 3, 2, 1][..], bytes.as_ref())
463 }
464
465 #[test]
466 fn fails_to_overfill_a_missing_segment() {
467 let mut buffer = BytesQuilt::with_capacity(20);
468 buffer.put_at(4, &[2, 1]).expect("write fail");
469 assert_eq!(Err(Error::NotEnoughSpace), buffer.put_at(2, &[4, 3, 7, 8]));
470 }
471
472 #[test]
473 fn fails_to_overwrite_a_received_segment() {
474 let mut buffer = BytesQuilt::with_capacity(20);
475 buffer.put_at(4, &[2, 1]).expect("write fail");
476 buffer.put_at(2, &[4, 3]).expect("write fail");
477 assert_eq!(Err(Error::WouldOverwrite), buffer.put_at(2, &[7, 8]));
478 }
479
480 #[test]
481 fn fails_to_overwrite_a_received_segment_in_the_tail() {
482 let mut buffer = BytesQuilt::with_capacity(20);
483 buffer.put_at(4, &[2, 1]).expect("write fail");
484 assert_eq!(Err(Error::WouldOverwrite), buffer.put_at(4, &[7, 8]));
485 }
486}