bytes_quilt/
lib.rs

1#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
2
3//! Provides a data structure for tracking random-access writes to a buffer.
4
5use std::mem;
6
7use bytes::{BufMut, BytesMut};
8
9use thiserror::Error;
10
/// The error type for writing to the `BytesQuilt`.
///
/// Both variants are produced by `BytesQuilt::put_at`.
#[derive(Copy, Clone, Debug, Error, PartialEq, Eq)]
pub enum Error {
    /// Attempted to write more bytes than the targeted missing segment can
    /// hold.
    #[error("Not enough space in buffer segment")]
    NotEnoughSpace,

    /// Attempted to write to a location that already holds received data.
    #[error("Would overwrite previously received data")]
    WouldOverwrite,
}
22
/// A byte buffer that tracks the locations of random-access writes.
///
/// Bytes written in order accumulate in a tail buffer. When a write skips
/// ahead, the skipped range is recorded as a missing segment that a later
/// write can fill in.
#[derive(Debug)]
pub struct BytesQuilt {
    /// Absolute offset at which `buffer_tail` begins.
    tail_offset: usize,
    /// Received and missing segments located before `tail_offset`, kept
    /// sorted by their absolute offset (lookups use a binary search).
    segments: Vec<Segment>,
    /// The bytes written in order starting at `tail_offset`.
    buffer_tail: BytesMut,
}
30
/// Whether the bytes of a `Segment` have been written yet.
#[derive(Copy, Clone, Debug, PartialEq)]
enum Status {
    /// No data has been written into the segment.
    Missing,
    /// The segment holds data that was written by the caller.
    Received,
}
36
/// A contiguous slice of the quilt that is either entirely missing or
/// entirely received.
#[derive(Clone, Debug, PartialEq)]
struct Segment {
    /// Whether this segment has been written to.
    status: Status,
    /// Absolute offset of the first byte of this segment.
    offset: usize,
    /// Backing storage for the segment. For a missing segment the buffer's
    /// capacity is what gets reported as the length of the gap.
    buffer: BytesMut,
}
43
/// A description of a segment in the buffer that hasn't been written to.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct MissingSegment {
    /// Absolute offset, from the beginning of the buffer, where the gap starts.
    offset: usize,
    /// Number of bytes that are missing.
    length: usize,
}
50
51impl Default for BytesQuilt {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl BytesQuilt {
58    /// Creates a new `BytesQuilt` with default capacity.
59    pub fn new() -> Self {
60        Self {
61            tail_offset: 0,
62            segments: Vec::new(),
63            buffer_tail: BytesMut::new(),
64        }
65    }
66
67    /// Creates a new `BytesQuilt` with the specified capacity.
68    pub fn with_capacity(capacity: usize) -> Self {
69        Self {
70            tail_offset: 0,
71            segments: Vec::new(),
72            buffer_tail: BytesMut::with_capacity(capacity),
73        }
74    }
75
76    fn write_offset_at_index(
77        &mut self,
78        index: usize,
79        offset: usize,
80        bytes: &[u8],
81    ) -> Result<(), Error> {
82        use std::cmp::Ordering;
83        let segment = &mut self.segments[index];
84        if segment.status == Status::Received {
85            return Err(Error::WouldOverwrite);
86        }
87        match segment.buffer.capacity().cmp(&bytes.len()) {
88            // TODO[ZS 2023-08-25]: This probably shouldn't even be an error,
89            // we should just grow the buffer.
90            Ordering::Less => return Err(Error::NotEnoughSpace),
91            Ordering::Equal => {
92                segment.status = Status::Received;
93                segment.buffer.put(bytes);
94            }
95            Ordering::Greater => {
96                segment.status = Status::Received;
97                segment.buffer.put(bytes);
98                let new_relative_offset = segment.buffer.len();
99                let remaining_segment = segment.buffer.split_off(new_relative_offset);
100                self.segments.insert(
101                    index + 1,
102                    Segment::missing(offset + new_relative_offset, remaining_segment),
103                );
104            }
105        };
106        Ok(())
107    }
108
109    /// Transfer bytes into `self` from `src` at `offset`.
110    ///
111    /// The `offset` is given from the beginning of the buffer.
112    pub fn put_at(&mut self, offset: usize, src: &[u8]) -> Result<Option<MissingSegment>, Error> {
113        let mut missing_segment = None;
114        debug_assert!(
115            self.segments
116                .first()
117                .map(|segment| segment.offset == 0)
118                .unwrap_or(true),
119            "first segment offset should be zero, found {:?}",
120            self.segments.first()
121        );
122        if self.tail_offset > offset {
123            // We should have a missing segment that this offset can write into
124            match self
125                .segments
126                .binary_search_by_key(&offset, |segment| segment.offset)
127            {
128                Ok(index) => {
129                    self.write_offset_at_index(index, offset, src)?;
130                }
131                Err(index) => {
132                    // This indexing might be safe because the first
133                    // entry in the segments vec should always start
134                    // with `offset = 0`
135                    let segment = &mut self.segments[index - 1];
136                    let to_write_buffer = segment.buffer.split_off(offset - segment.offset);
137                    let segment = Segment::missing(offset, to_write_buffer);
138                    self.segments.insert(index, segment);
139                    self.write_offset_at_index(index, offset, src)?;
140                }
141            };
142            return Ok(None);
143        } else if self.tail_offset + self.buffer_tail.len() < offset {
144            if !self.buffer_tail.is_empty() {
145                let head_offset = self.tail_offset;
146                let head_received_bytes = self.buffer_tail.split();
147                self.tail_offset += head_received_bytes.len();
148                self.segments
149                    .push(Segment::received(head_offset, head_received_bytes));
150            }
151
152            let head_offset = self.tail_offset;
153            self.tail_offset = offset;
154
155            let tail_bytes = self.buffer_tail.split_off(offset - head_offset);
156            let head_bytes = mem::replace(&mut self.buffer_tail, tail_bytes);
157
158            // This is true because of the conditional split above to
159            // identify and store a received segment
160            debug_assert!(head_bytes.is_empty());
161            let segment = Segment::missing(head_offset, head_bytes);
162            missing_segment = segment.missing_segment();
163            self.segments.push(segment);
164        } else if self.tail_offset == offset && !self.buffer_tail.is_empty() {
165            // Supposed to write at beginning of tail, but tail is not empty!
166            return Err(Error::WouldOverwrite);
167        }
168        self.buffer_tail.put(src);
169        Ok(missing_segment)
170    }
171
172    /// An iterator over each `MissingSegment` in the `BytesQuilt`.
173    pub fn missing_segments(&self) -> impl '_ + Iterator<Item = MissingSegment> {
174        self.segments.iter().filter_map(Segment::missing_segment)
175    }
176
177    /// Reassemble the inner `BytesMut` and return it.
178    pub fn into_inner(self) -> BytesMut {
179        let mut segments = self.segments.into_iter();
180        if let Some(segment) = segments.next() {
181            // TODO[ZS 2023-08-25]: initialize these unwritten
182            // sections with zeroes.
183            debug_assert!(
184                !segment.is_missing(),
185                "a segment at offset {} of size {} is missing",
186                segment.offset,
187                segment.buffer.len(),
188            );
189            let mut buffer: BytesMut = segment.buffer;
190            for segment in segments {
191                debug_assert!(
192                    !segment.is_missing(),
193                    "a segment at offset {} of size {} is missing",
194                    segment.offset,
195                    segment.buffer.len(),
196                );
197                buffer.unsplit(segment.buffer);
198            }
199            buffer.unsplit(self.buffer_tail);
200            return buffer;
201        }
202        self.buffer_tail
203    }
204}
205
206impl Segment {
207    fn missing(offset: usize, buffer: BytesMut) -> Self {
208        Self {
209            status: Status::Missing,
210            offset,
211            buffer,
212        }
213    }
214
215    fn received(offset: usize, buffer: BytesMut) -> Self {
216        Self {
217            status: Status::Received,
218            offset,
219            buffer,
220        }
221    }
222
223    fn is_missing(&self) -> bool {
224        self.status == Status::Missing
225    }
226
227    fn missing_segment(&self) -> Option<MissingSegment> {
228        match self.status {
229            Status::Missing => Some(MissingSegment {
230                offset: self.offset,
231                length: self.buffer.capacity(),
232            }),
233            Status::Received => None,
234        }
235    }
236}
237
238impl MissingSegment {
239    /// Returns an iterator of all the absolute offsets for byte
240    /// segments of a specific size that can fit within this
241    /// `MissingSegment`.
242    pub fn offsets_for(self, frame_size: usize) -> impl Iterator<Item = usize> {
243        let offset = self.offset;
244        let number_of_frames = self.length / frame_size;
245        (0..number_of_frames).map(move |index| (index * frame_size) + offset)
246    }
247}
248
// Unit tests for `MissingSegment::offsets_for` and for the write / gap
// tracking behaviour of `BytesQuilt`.
#[cfg(test)]
mod tests {
    use super::*;

    // Tests for splitting a `MissingSegment` into frame-sized offsets.
    mod missing_segment {
        use super::*;

        // A frame as large as the gap produces exactly one offset.
        #[test]
        fn one_offset_missing() {
            let segment = MissingSegment {
                offset: 0,
                length: 10,
            };
            assert_eq!(&[0][..], segment.offsets_for(10).collect::<Vec<_>>());
            let segment = MissingSegment {
                offset: 10,
                length: 10,
            };
            assert_eq!(&[10][..], segment.offsets_for(10).collect::<Vec<_>>());
        }

        // A frame half the size of the gap produces two offsets.
        #[test]
        fn two_offsets_missing() {
            let segment = MissingSegment {
                offset: 0,
                length: 10,
            };
            assert_eq!(&[0, 5][..], segment.offsets_for(5).collect::<Vec<_>>());
            let segment = MissingSegment {
                offset: 10,
                length: 10,
            };
            assert_eq!(&[10, 15][..], segment.offsets_for(5).collect::<Vec<_>>());
        }

        // Single-byte frames produce one offset per missing byte.
        #[test]
        fn many_offsets_missing() {
            let segment = MissingSegment {
                offset: 5,
                length: 10,
            };
            assert_eq!(
                &[5, 6, 7, 8, 9, 10, 11, 12, 13, 14][..],
                segment.offsets_for(1).collect::<Vec<_>>()
            );
        }
    }

    #[test]
    fn offsets_for_frame_size_five() {
        let missing_segment = MissingSegment {
            offset: 0,
            length: 10,
        };
        assert_eq!(
            &[0, 5][..],
            missing_segment.offsets_for(5).collect::<Vec<_>>()
        );
    }

    #[test]
    fn offsets_for_frame_size_two() {
        let missing_segment = MissingSegment {
            offset: 0,
            length: 10,
        };
        assert_eq!(
            &[0, 2, 4, 6, 8][..],
            missing_segment.offsets_for(2).collect::<Vec<_>>()
        );
    }

    // A single write at offset zero is returned unchanged.
    #[test]
    fn fill_in_order() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(0, &[5_u8, 4, 3, 2, 1]).expect("write fail");
        let bytes = buffer.into_inner();
        assert_eq!(&[5_u8, 4, 3, 2, 1][..], bytes.as_ref())
    }

    // Consecutive writes only ever append to the tail, so no gap is recorded.
    #[test]
    fn fill_in_order_produces_no_missing_segments() {
        let mut buffer = BytesQuilt::with_capacity(20);
        for offset in 0..20 {
            buffer.put_at(offset, &[3]).expect("write fail");
        }
        assert!(buffer.missing_segments().next().is_none());
        let bytes = buffer.into_inner();
        assert_eq!(vec![3; 20], bytes.as_ref())
    }

    // Skipping ahead reports the skipped range from `put_at` itself.
    #[test]
    fn detect_missing_segments() {
        let mut buffer = BytesQuilt::with_capacity(20);
        let missing_segment = buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
        assert_eq!(
            Some(MissingSegment {
                offset: 0,
                length: 5
            }),
            missing_segment
        );
    }

    // Each skip ahead adds another gap to `missing_segments`.
    #[test]
    fn detect_multiple_missing_segments() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
        buffer.put_at(15, &[1, 2, 3, 4, 5]).expect("write fail");
        assert_eq!(
            vec![
                MissingSegment {
                    offset: 0,
                    length: 5
                },
                MissingSegment {
                    offset: 10,
                    length: 5
                }
            ],
            buffer.missing_segments().collect::<Vec<_>>()
        );
    }

    // Gaps keep the size of the range that was actually skipped.
    #[test]
    fn detect_missing_segments_of_different_sizes() {
        let mut buffer = BytesQuilt::with_capacity(40);
        buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
        buffer.put_at(15, &[1, 2, 3, 4, 5]).expect("write fail");
        buffer.put_at(35, &[1, 2, 3, 4, 5]).expect("write fail");
        assert_eq!(
            vec![
                MissingSegment {
                    offset: 0,
                    length: 5
                },
                MissingSegment {
                    offset: 10,
                    length: 5
                },
                MissingSegment {
                    offset: 20,
                    length: 15
                }
            ],
            buffer.missing_segments().collect::<Vec<_>>()
        );
    }

    // Writing into the middle of a gap leaves a smaller gap on either side.
    #[test]
    fn split_missing_segments_on_incomplete_writes() {
        let mut buffer = BytesQuilt::with_capacity(40);
        buffer.put_at(15, &[1, 2, 3, 4, 5]).expect("write fail");
        assert_eq!(
            vec![MissingSegment {
                offset: 0,
                length: 15
            }],
            buffer.missing_segments().collect::<Vec<_>>()
        );
        buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
        assert_eq!(
            vec![
                MissingSegment {
                    offset: 0,
                    length: 5
                },
                MissingSegment {
                    offset: 10,
                    length: 5
                },
            ],
            buffer.missing_segments().collect::<Vec<_>>()
        );
    }

    // A gap filled exactly, starting at its first byte.
    #[test]
    fn fill_out_of_order_start_aligned_segment() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(5, &[5, 4, 3, 2, 1]).expect("write fail");
        buffer.put_at(0, &[10, 9, 8, 7, 6]).expect("write fail");
        let bytes = buffer.into_inner();
        assert_eq!(&[10, 9, 8, 7, 6, 5, 4, 3, 2, 1][..], bytes.as_ref())
    }

    // A gap filled front to back with two writes.
    #[test]
    fn partial_fill_out_of_order_start_aligned_segment() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(4, &[2, 1]).expect("write fail");
        buffer.put_at(0, &[6, 5]).expect("write fail");
        buffer.put_at(2, &[4, 3]).expect("write fail");
        let bytes = buffer.into_inner();
        assert_eq!(&[6, 5, 4, 3, 2, 1][..], bytes.as_ref())
    }

    // A gap filled back to front, so the first fill starts inside the gap.
    #[test]
    fn fill_out_of_order_non_aligned_segment() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(4, &[2, 1]).expect("write fail");
        buffer.put_at(2, &[4, 3]).expect("write fail");
        buffer.put_at(0, &[6, 5]).expect("write fail");
        let bytes = buffer.into_inner();
        assert_eq!(&[6, 5, 4, 3, 2, 1][..], bytes.as_ref())
    }

    // A gap filled by three writes: middle, front, then back.
    #[test]
    fn partial_fill_out_of_order_non_aligned_segment() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(6, &[2, 1]).expect("write fail");
        buffer.put_at(2, &[6, 5]).expect("write fail");
        buffer.put_at(0, &[8, 7]).expect("write fail");
        buffer.put_at(4, &[4, 3]).expect("write fail");
        let bytes = buffer.into_inner();
        assert_eq!(&[8, 7, 6, 5, 4, 3, 2, 1][..], bytes.as_ref())
    }

    // Four bytes cannot be written at offset 2 of a four byte gap.
    #[test]
    fn fails_to_overfill_a_missing_segment() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(4, &[2, 1]).expect("write fail");
        assert_eq!(Err(Error::NotEnoughSpace), buffer.put_at(2, &[4, 3, 7, 8]));
    }

    // Writing a second time to the start of a filled segment is rejected.
    #[test]
    fn fails_to_overwrite_a_received_segment() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(4, &[2, 1]).expect("write fail");
        buffer.put_at(2, &[4, 3]).expect("write fail");
        assert_eq!(Err(Error::WouldOverwrite), buffer.put_at(2, &[7, 8]));
    }

    // Writing a second time to the start of a non-empty tail is rejected.
    #[test]
    fn fails_to_overwrite_a_received_segment_in_the_tail() {
        let mut buffer = BytesQuilt::with_capacity(20);
        buffer.put_at(4, &[2, 1]).expect("write fail");
        assert_eq!(Err(Error::WouldOverwrite), buffer.put_at(4, &[7, 8]));
    }
}
486}