ant_quic/connection/
assembler.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8use std::{
9    cmp::Ordering,
10    collections::{BinaryHeap, binary_heap::PeekMut},
11    mem,
12};
13
14use bytes::{Buf, Bytes, BytesMut};
15
16use crate::range_set::RangeSet;
17
/// Helper to assemble unordered stream frames into an ordered stream
///
/// Received frames are kept in a heap keyed by stream offset and handed back through `read`,
/// either strictly in stream order or in whatever order they are available.
#[derive(Debug, Default)]
pub(super) struct Assembler {
    /// Whether reads are currently served in ordered or unordered mode; the default is ordered.
    state: State,
    /// Buffered stream data; `Buffer`'s ordering puts the lowest stream offset at the top of the
    /// heap.
    data: BinaryHeap<Buffer>,
    /// Total number of buffered bytes, including duplicates in ordered mode.
    buffered: usize,
    /// Estimated number of allocated bytes, will never be less than `buffered`.
    allocated: usize,
    /// Number of bytes read by the application. When only ordered reads have been used, this is the
    /// length of the contiguous prefix of the stream which has been consumed by the application,
    /// aka the stream offset.
    bytes_read: u64,
    /// Largest end offset (`offset + bytes.len()`) of any frame passed to `insert` so far.
    end: u64,
}
33
34impl Assembler {
35    pub(super) fn new() -> Self {
36        Self::default()
37    }
38
39    /// Reset to the initial state
40    pub(super) fn reinit(&mut self) {
41        let old_data = mem::take(&mut self.data);
42        *self = Self::default();
43        self.data = old_data;
44        self.data.clear();
45    }
46
47    pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
48        if ordered && !self.state.is_ordered() {
49            return Err(IllegalOrderedRead);
50        } else if !ordered && self.state.is_ordered() {
51            // Enter unordered mode
52            if !self.data.is_empty() {
53                // Get rid of possible duplicates
54                self.defragment();
55            }
56            let mut recvd = RangeSet::new();
57            recvd.insert(0..self.bytes_read);
58            for chunk in &self.data {
59                recvd.insert(chunk.offset..chunk.offset + chunk.bytes.len() as u64);
60            }
61            self.state = State::Unordered { recvd };
62        }
63        Ok(())
64    }
65
66    /// Get the the next chunk
67    pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> Option<Chunk> {
68        loop {
69            let mut chunk = self.data.peek_mut()?;
70
71            if ordered {
72                if chunk.offset > self.bytes_read {
73                    // Next chunk is after current read index
74                    return None;
75                } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
76                    // Next chunk is useless as the read index is beyond its end
77                    self.buffered -= chunk.bytes.len();
78                    self.allocated -= chunk.allocation_size;
79                    PeekMut::pop(chunk);
80                    continue;
81                }
82
83                // Determine `start` and `len` of the slice of useful data in chunk
84                let start = (self.bytes_read - chunk.offset) as usize;
85                if start > 0 {
86                    chunk.bytes.advance(start);
87                    chunk.offset += start as u64;
88                    self.buffered -= start;
89                }
90            }
91
92            return Some(if max_length < chunk.bytes.len() {
93                self.bytes_read += max_length as u64;
94                let offset = chunk.offset;
95                chunk.offset += max_length as u64;
96                self.buffered -= max_length;
97                Chunk::new(offset, chunk.bytes.split_to(max_length))
98            } else {
99                self.bytes_read += chunk.bytes.len() as u64;
100                self.buffered -= chunk.bytes.len();
101                self.allocated -= chunk.allocation_size;
102                let chunk = PeekMut::pop(chunk);
103                Chunk::new(chunk.offset, chunk.bytes)
104            });
105        }
106    }
107
108    /// Copy fragmented chunk data to new chunks backed by a single buffer
109    ///
110    /// This makes sure we're not unnecessarily holding on to many larger allocations.
111    /// We merge contiguous chunks in the process of doing so.
112    fn defragment(&mut self) {
113        let new = BinaryHeap::with_capacity(self.data.len());
114        let old = mem::replace(&mut self.data, new);
115        let mut buffers = old.into_sorted_vec();
116        self.buffered = 0;
117        let mut fragmented_buffered = 0;
118        let mut offset = 0;
119        for chunk in buffers.iter_mut().rev() {
120            chunk.try_mark_defragment(offset);
121            let size = chunk.bytes.len();
122            offset = chunk.offset + size as u64;
123            self.buffered += size;
124            if !chunk.defragmented {
125                fragmented_buffered += size;
126            }
127        }
128        self.allocated = self.buffered;
129        let mut buffer = BytesMut::with_capacity(fragmented_buffered);
130        let mut offset = 0;
131        for chunk in buffers.into_iter().rev() {
132            if chunk.defragmented {
133                // bytes might be empty after try_mark_defragment
134                if !chunk.bytes.is_empty() {
135                    self.data.push(chunk);
136                }
137                continue;
138            }
139            // Overlap is resolved by try_mark_defragment
140            if chunk.offset != offset + (buffer.len() as u64) {
141                if !buffer.is_empty() {
142                    self.data
143                        .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
144                }
145                offset = chunk.offset;
146            }
147            buffer.extend_from_slice(&chunk.bytes);
148        }
149        if !buffer.is_empty() {
150            self.data
151                .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
152        }
153    }
154
155    // Note: If a packet contains many frames from the same stream, the estimated over-allocation
156    // will be much higher because we are counting the same allocation multiple times.
157    pub(super) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, allocation_size: usize) {
158        debug_assert!(
159            bytes.len() <= allocation_size,
160            "allocation_size less than bytes.len(): {:?} < {:?}",
161            allocation_size,
162            bytes.len()
163        );
164        self.end = self.end.max(offset + bytes.len() as u64);
165        if let State::Unordered { ref mut recvd } = self.state {
166            // Discard duplicate data
167            for duplicate in recvd.replace(offset..offset + bytes.len() as u64) {
168                if duplicate.start > offset {
169                    let buffer = Buffer::new(
170                        offset,
171                        bytes.split_to((duplicate.start - offset) as usize),
172                        allocation_size,
173                    );
174                    self.buffered += buffer.bytes.len();
175                    self.allocated += buffer.allocation_size;
176                    self.data.push(buffer);
177                    offset = duplicate.start;
178                }
179                bytes.advance((duplicate.end - offset) as usize);
180                offset = duplicate.end;
181            }
182        } else if offset < self.bytes_read {
183            if (offset + bytes.len() as u64) <= self.bytes_read {
184                return;
185            } else {
186                let diff = self.bytes_read - offset;
187                offset += diff;
188                bytes.advance(diff as usize);
189            }
190        }
191
192        if bytes.is_empty() {
193            return;
194        }
195        let buffer = Buffer::new(offset, bytes, allocation_size);
196        self.buffered += buffer.bytes.len();
197        self.allocated += buffer.allocation_size;
198        self.data.push(buffer);
199        // `self.buffered` also counts duplicate bytes, therefore we use
200        // `self.end - self.bytes_read` as an upper bound of buffered unique
201        // bytes. This will cause a defragmentation if the amount of duplicate
202        // bytes exceedes a proportion of the receive window size.
203        let buffered = self.buffered.min((self.end - self.bytes_read) as usize);
204        let over_allocation = self.allocated - buffered;
205        // Rationale: on the one hand, we want to defragment rarely, ideally never
206        // in non-pathological scenarios. However, a pathological or malicious
207        // peer could send us one-byte frames, and since we use reference-counted
208        // buffers in order to prevent copying, this could result in keeping a lot
209        // of memory allocated. This limits over-allocation in proportion to the
210        // buffered data. The constants are chosen somewhat arbitrarily and try to
211        // balance between defragmentation overhead and over-allocation.
212        let threshold = 32768.max(buffered * 3 / 2);
213        if over_allocation > threshold {
214            self.defragment()
215        }
216    }
217
218    /// Number of bytes consumed by the application
219    pub(super) fn bytes_read(&self) -> u64 {
220        self.bytes_read
221    }
222
223    /// Discard all buffered data
224    pub(super) fn clear(&mut self) {
225        self.data.clear();
226        self.buffered = 0;
227        self.allocated = 0;
228    }
229}
230
/// A chunk of data from the receive stream
///
/// This is what `Assembler::read` hands back to its caller.
#[derive(Debug, PartialEq, Eq)]
pub struct Chunk {
    /// The offset in the stream, i.e. the stream position of the first byte of `bytes`
    pub offset: u64,
    /// The contents of the chunk
    pub bytes: Bytes,
}
239
240impl Chunk {
241    fn new(offset: u64, bytes: Bytes) -> Self {
242        Self { offset, bytes }
243    }
244}
245
/// A piece of buffered stream data, as stored in the assembler's heap
///
/// Equality and ordering are implemented by hand and only look at `offset` and the length of
/// `bytes`.
#[derive(Debug, Eq)]
struct Buffer {
    /// Stream offset of the first byte of `bytes`
    offset: u64,
    /// The buffered data itself
    bytes: Bytes,
    /// Size of the allocation behind `bytes`, if `defragmented == false`.
    /// Otherwise this will be set to `bytes.len()` by `try_mark_defragment`.
    /// Will never be less than `bytes.len()`.
    allocation_size: usize,
    /// Set once the buffer no longer counts as over-allocated: it was either created by
    /// `new_defragmented` or flagged by `try_mark_defragment`.
    defragmented: bool,
}
256
257impl Buffer {
258    /// Constructs a new fragmented Buffer
259    fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
260        Self {
261            offset,
262            bytes,
263            allocation_size,
264            defragmented: false,
265        }
266    }
267
268    /// Constructs a new defragmented Buffer
269    fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
270        let allocation_size = bytes.len();
271        Self {
272            offset,
273            bytes,
274            allocation_size,
275            defragmented: true,
276        }
277    }
278
279    /// Discards data before `offset` and flags `self` as defragmented if it has good utilization
280    fn try_mark_defragment(&mut self, offset: u64) {
281        let duplicate = offset.saturating_sub(self.offset) as usize;
282        self.offset = self.offset.max(offset);
283        if duplicate >= self.bytes.len() {
284            // All bytes are duplicate
285            self.bytes = Bytes::new();
286            self.defragmented = true;
287            self.allocation_size = 0;
288            return;
289        }
290        self.bytes.advance(duplicate);
291        // Make sure that fragmented buffers with high utilization become defragmented and
292        // defragmented buffers remain defragmented
293        self.defragmented = self.defragmented || self.bytes.len() * 6 / 5 >= self.allocation_size;
294        if self.defragmented {
295            // Make sure that defragmented buffers do not contribute to over-allocation
296            self.allocation_size = self.bytes.len();
297        }
298    }
299}
300
301impl Ord for Buffer {
302    // Invert ordering based on offset (max-heap, min offset first),
303    // prioritize longer chunks at the same offset.
304    fn cmp(&self, other: &Self) -> Ordering {
305        self.offset
306            .cmp(&other.offset)
307            .reverse()
308            .then(self.bytes.len().cmp(&other.bytes.len()))
309    }
310}
311
312impl PartialOrd for Buffer {
313    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
314        Some(self.cmp(other))
315    }
316}
317
318impl PartialEq for Buffer {
319    fn eq(&self, other: &Self) -> bool {
320        (self.offset, self.bytes.len()) == (other.offset, other.bytes.len())
321    }
322}
323
324#[derive(Debug)]
325enum State {
326    Ordered,
327    Unordered {
328        /// The set of offsets that have been received from the peer, including portions not yet
329        /// read by the application.
330        recvd: RangeSet,
331    },
332}
333
334impl State {
335    fn is_ordered(&self) -> bool {
336        matches!(self, Self::Ordered)
337    }
338}
339
340impl Default for State {
341    fn default() -> Self {
342        Self::Ordered
343    }
344}
345
/// Error indicating that an ordered read was performed on a stream after an unordered read
///
/// Implements [`std::error::Error`], so it can be boxed or chained like any other error.
#[derive(Debug)]
pub struct IllegalOrderedRead;

impl std::fmt::Display for IllegalOrderedRead {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ordered read after unordered read")
    }
}

impl std::error::Error for IllegalOrderedRead {}
349
#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    // Ordered reads return data in stream order and never more than the requested length.
    #[test]
    fn assemble_ordered() {
        let mut x = Assembler::new();
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 1), Some(ref y) if &y[..] == b"1");
        assert_matches!(next(&mut x, 3), Some(ref y) if &y[..] == b"23");
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        x.insert(6, Bytes::from_static(b"789"), 3);
        x.insert(9, Bytes::from_static(b"10"), 2);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"789");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"10");
        assert_matches!(next(&mut x, 32), None);
    }

    // Frames inserted out of order after switching to unordered mode; note that the `next` helper
    // still reads with `ordered == true`, so nothing is returned until the gap at 0 is filled.
    #[test]
    fn assemble_unordered() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        assert_matches!(next(&mut x, 32), None);
    }

    // The same frame inserted twice is only returned once.
    #[test]
    fn assemble_duplicate() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    // Same as `assemble_duplicate`, with an explicit defragmentation before reading.
    #[test]
    fn assemble_duplicate_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    // A frame that lies entirely inside an earlier frame adds nothing to the output.
    #[test]
    fn assemble_contained() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    // Same as `assemble_contained`, with an explicit defragmentation before reading.
    #[test]
    fn assemble_contained_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    // A later frame that fully covers an earlier one is returned in one piece.
    #[test]
    fn assemble_contains() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    // Same as `assemble_contains`, with an explicit defragmentation before reading.
    #[test]
    fn assemble_contains_compact() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    // Partially overlapping frames: without defragmentation the second read only yields the
    // part that has not been read yet.
    #[test]
    fn assemble_overlapping() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"4");
        assert_matches!(next(&mut x, 32), None);
    }

    // Partially overlapping frames are merged into one chunk by defragmentation. The
    // allocation size of 4 keeps both frames below the utilization threshold, so they get copied.
    #[test]
    fn assemble_overlapping_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 4);
        x.insert(1, Bytes::from_static(b"234"), 4);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        assert_matches!(next(&mut x, 32), None);
    }

    // Several one-byte frames followed by a frame covering all of them.
    #[test]
    fn assemble_complex() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    // Same as `assemble_complex`, with an explicit defragmentation before reading.
    #[test]
    fn assemble_complex_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    // Data that has already been read is not returned again when it is re-inserted.
    #[test]
    fn assemble_old() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), None);
    }

    // Defragmentation merges contiguous chunks and keeps chunks separated by a gap apart.
    #[test]
    fn compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(9, Bytes::from_static(b"jkl"), 4);
        x.insert(12, Bytes::from_static(b"mno"), 4);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abcdef"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jklmno"))
        );
    }

    // Defragmentation keeps the offset of data that does not start at the beginning of the stream.
    #[test]
    fn defrag_with_missing_prefix() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
    }

    // Ordered reads after a defragmentation, mixed with inserts that overlap data already read.
    #[test]
    fn defrag_read_chunk() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(7, Bytes::from_static(b"hij"), 4);
        x.insert(11, Bytes::from_static(b"lmn"), 4);
        x.defragment();
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"abcdef");
        x.insert(5, Bytes::from_static(b"fghijklmn"), 9);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"ghijklmn");
        x.insert(13, Bytes::from_static(b"nopq"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"opq");
        x.insert(15, Bytes::from_static(b"pqrs"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"rs");
        assert_matches!(x.read(usize::MAX, true), None);
    }

    // Unordered mode without duplicates: every inserted frame comes back exactly once.
    #[test]
    fn unordered_happy_path() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abc"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
    }

    // Unordered mode only buffers the parts of a frame that have not been received before.
    #[test]
    fn unordered_dedup() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(0, Bytes::from_static(b"abcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"a"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(1, Bytes::from_static(b"bc"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(6, Bytes::from_static(b"ghi"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jkl"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(12, Bytes::from_static(b"mno"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, false), None);
    }

    // Ordered reads skip over duplicate data that is still sitting in the buffer.
    #[test]
    fn chunks_dedup() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(1, Bytes::from_static(b"bcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abcd")))
        );
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(4, Bytes::from_static(b"efghi")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(9, Bytes::from_static(b"jkl")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(12, Bytes::from_static(b"mno")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
    }

    // In ordered mode, `insert` itself drops or trims data that lies before the read position.
    #[test]
    fn ordered_eager_discard() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(x.data.len(), 1);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        x.insert(0, Bytes::from_static(b"ab"), 2);
        assert_eq!(x.data.len(), 0);
        x.insert(2, Bytes::from_static(b"cd"), 2);
        assert_eq!(
            x.data.peek(),
            Some(&Buffer::new(3, Bytes::from_static(b"d"), 2))
        );
    }

    // Duplicates buffered in ordered mode are gone once the assembler switches to unordered mode.
    #[test]
    fn ordered_insert_unordered_read() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.ensure_ordering(false).unwrap();
        assert_eq!(
            x.read(3, false),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        assert_eq!(x.read(3, false), None);
    }

    // Unordered read without a length limit; panics if nothing is buffered.
    fn next_unordered(x: &mut Assembler) -> Chunk {
        x.read(usize::MAX, false).unwrap()
    }

    // Ordered read of at most `size` bytes, returning only the data.
    fn next(x: &mut Assembler, size: usize) -> Option<Bytes> {
        x.read(size, true).map(|chunk| chunk.bytes)
    }
}