use std::{
    cmp::Ordering,
    collections::{BinaryHeap, binary_heap::PeekMut},
    mem,
};

use bytes::{Buf, Bytes, BytesMut};

use crate::range_set::RangeSet;

/// Helper to assemble unordered stream frames into an ordered stream
#[derive(Debug, Default)]
pub(super) struct Assembler {
    state: State,
    data: BinaryHeap<Buffer>,
    /// Number of bytes currently buffered across all chunks; overlapping data is counted once
    /// per chunk
    buffered: usize,
    /// Estimated number of allocated bytes backing the buffered chunks; never less than `buffered`
    allocated: usize,
    /// Number of bytes the application has read. In ordered mode this is the offset of the next
    /// byte to be delivered.
    bytes_read: u64,
    /// Largest stream offset (exclusive) covered by any inserted chunk
    end: u64,
}

impl Assembler {
    pub(super) fn new() -> Self {
        Self::default()
    }

    /// Reset to the initial state
    pub(super) fn reinit(&mut self) {
        // Keep the heap's existing allocation for reuse
        let old_data = mem::take(&mut self.data);
        *self = Self::default();
        self.data = old_data;
        self.data.clear();
    }

    pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
        if ordered && !self.state.is_ordered() {
            return Err(IllegalOrderedRead);
        } else if !ordered && self.state.is_ordered() {
            // Enter unordered mode
            if !self.data.is_empty() {
                // Get rid of possible duplicates between buffered chunks
                self.defragment();
            }
            let mut recvd = RangeSet::new();
            recvd.insert(0..self.bytes_read);
            for chunk in &self.data {
                recvd.insert(chunk.offset..chunk.offset + chunk.bytes.len() as u64);
            }
            self.state = State::Unordered { recvd };
        }
        Ok(())
    }

    /// Returns the next chunk of buffered data, up to `max_length` bytes; in ordered mode only
    /// data contiguous with the current read offset is returned
    pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> Option<Chunk> {
        loop {
            let mut chunk = self.data.peek_mut()?;

            if ordered {
                if chunk.offset > self.bytes_read {
                    // The next chunk starts after the read offset, so there is a gap
                    return None;
                } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
                    // The chunk lies entirely before the read offset and can be discarded
                    self.buffered -= chunk.bytes.len();
                    self.allocated -= chunk.allocation_size;
                    PeekMut::pop(chunk);
                    continue;
                }

                // Skip any prefix of the chunk that has already been read
                let start = (self.bytes_read - chunk.offset) as usize;
                if start > 0 {
                    chunk.bytes.advance(start);
                    chunk.offset += start as u64;
                    self.buffered -= start;
                }
            }

            return Some(if max_length < chunk.bytes.len() {
                self.bytes_read += max_length as u64;
                let offset = chunk.offset;
                chunk.offset += max_length as u64;
                self.buffered -= max_length;
                Chunk::new(offset, chunk.bytes.split_to(max_length))
            } else {
                self.bytes_read += chunk.bytes.len() as u64;
                self.buffered -= chunk.bytes.len();
                self.allocated -= chunk.allocation_size;
                let chunk = PeekMut::pop(chunk);
                Chunk::new(chunk.offset, chunk.bytes)
            });
        }
    }

    /// Copy fragmented chunk data into fresh, merged buffers
    ///
    /// This releases the original (possibly much larger) allocations backing small fragments
    /// and merges contiguous chunks in the process.
    fn defragment(&mut self) {
        let new = BinaryHeap::with_capacity(self.data.len());
        let old = mem::replace(&mut self.data, new);
        let mut buffers = old.into_sorted_vec();
        self.buffered = 0;
        let mut fragmented_buffered = 0;
        let mut offset = 0;
        // Walk the chunks in ascending stream-offset order (the heap's ordering is reversed)
        for chunk in buffers.iter_mut().rev() {
            chunk.try_mark_defragment(offset);
            let size = chunk.bytes.len();
            offset = chunk.offset + size as u64;
            self.buffered += size;
            if !chunk.defragmented {
                fragmented_buffered += size;
            }
        }
        self.allocated = self.buffered;
        let mut buffer = BytesMut::with_capacity(fragmented_buffered);
        let mut offset = 0;
        for chunk in buffers.into_iter().rev() {
            if chunk.defragmented {
                // This chunk already uses its allocation well; keep it without copying
                if !chunk.bytes.is_empty() {
                    self.data.push(chunk);
                }
                continue;
            }
            if chunk.offset != offset + (buffer.len() as u64) {
                // Discontinuity: flush the accumulated bytes as one defragmented chunk
                if !buffer.is_empty() {
                    self.data
                        .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
                }
                offset = chunk.offset;
            }
            buffer.extend_from_slice(&chunk.bytes);
        }
        if !buffer.is_empty() {
            self.data
                .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
        }
    }

    /// Inserts `bytes` at stream offset `offset`
    ///
    /// `allocation_size` is the size of the allocation backing `bytes`, which may be larger
    /// than `bytes.len()` when the chunk is a slice of a bigger receive buffer.
    pub(super) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, allocation_size: usize) {
        debug_assert!(
            bytes.len() <= allocation_size,
            "allocation_size less than bytes.len(): {:?} < {:?}",
            allocation_size,
            bytes.len()
        );
        self.end = self.end.max(offset + bytes.len() as u64);
        if let State::Unordered { ref mut recvd } = self.state {
            // Discard any ranges we've already received, buffering only the new data
            for duplicate in recvd.replace(offset..offset + bytes.len() as u64) {
                if duplicate.start > offset {
                    let buffer = Buffer::new(
                        offset,
                        bytes.split_to((duplicate.start - offset) as usize),
                        allocation_size,
                    );
                    self.buffered += buffer.bytes.len();
                    self.allocated += buffer.allocation_size;
                    self.data.push(buffer);
                    offset = duplicate.start;
                }
                bytes.advance((duplicate.end - offset) as usize);
                offset = duplicate.end;
            }
        } else if offset < self.bytes_read {
            if (offset + bytes.len() as u64) <= self.bytes_read {
                // Everything here has already been read; nothing to do
                return;
            } else {
                let diff = self.bytes_read - offset;
                offset += diff;
                bytes.advance(diff as usize);
            }
        }

        if bytes.is_empty() {
            return;
        }
        let buffer = Buffer::new(offset, bytes, allocation_size);
        self.buffered += buffer.bytes.len();
        self.allocated += buffer.allocation_size;
        self.data.push(buffer);
        // `self.buffered` may double-count overlapping data, so bound the estimate of unique
        // buffered bytes by the span that has not yet been read
        let buffered = self.buffered.min((self.end - self.bytes_read) as usize);
        let over_allocation = self.allocated - buffered;
        // Defragment rarely: only when over-allocation exceeds a generous threshold. Without
        // this, a peer sending many tiny frames backed by large reference-counted buffers
        // could pin an excessive amount of memory.
        let threshold = 32768.max(buffered * 3 / 2);
        if over_allocation > threshold {
            self.defragment()
        }
    }

    /// Number of bytes consumed by the application so far
    pub(super) fn bytes_read(&self) -> u64 {
        self.bytes_read
    }

    /// Discard all buffered data
    pub(super) fn clear(&mut self) {
        self.data.clear();
        self.buffered = 0;
        self.allocated = 0;
    }
}

/// A chunk of data from the receive stream
#[derive(Debug, PartialEq, Eq)]
pub struct Chunk {
    /// The offset in the stream
    pub offset: u64,
    /// The contents of the chunk
    pub bytes: Bytes,
}

impl Chunk {
    fn new(offset: u64, bytes: Bytes) -> Self {
        Self { offset, bytes }
    }
}

#[derive(Debug, Eq)]
struct Buffer {
    offset: u64,
    bytes: Bytes,
    /// Size of the allocation backing `bytes`; set to `bytes.len()` when the buffer is marked
    /// defragmented
    allocation_size: usize,
    /// Whether this buffer no longer contributes to over-allocation
    defragmented: bool,
}

impl Buffer {
    /// Constructs a new fragmented buffer
    fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: false,
        }
    }

    /// Constructs a new defragmented buffer whose allocation exactly fits its contents
    fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
        let allocation_size = bytes.len();
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: true,
        }
    }

    /// Discards data before `offset` and marks the buffer as defragmented if its remaining
    /// bytes make good use of the underlying allocation
    fn try_mark_defragment(&mut self, offset: u64) {
        let duplicate = offset.saturating_sub(self.offset) as usize;
        self.offset = self.offset.max(offset);
        if duplicate >= self.bytes.len() {
            // The entire buffer is duplicate data
            self.bytes = Bytes::new();
            self.defragmented = true;
            self.allocation_size = 0;
            return;
        }
        self.bytes.advance(duplicate);
        // Buffers using at least ~5/6 of their allocation count as defragmented
        self.defragmented = self.defragmented || self.bytes.len() * 6 / 5 >= self.allocation_size;
        if self.defragmented {
            // Defragmented buffers must not contribute to over-allocation
            self.allocation_size = self.bytes.len();
        }
    }
}

impl Ord for Buffer {
    // Reverse the offset ordering so the max-heap yields the lowest offset first; for equal
    // offsets, prefer the longer chunk
    fn cmp(&self, other: &Self) -> Ordering {
        self.offset
            .cmp(&other.offset)
            .reverse()
            .then(self.bytes.len().cmp(&other.bytes.len()))
    }
}

impl PartialOrd for Buffer {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Buffer {
    fn eq(&self, other: &Self) -> bool {
        (self.offset, self.bytes.len()) == (other.offset, other.bytes.len())
    }
}

#[derive(Debug, Default)]
enum State {
    #[default]
    Ordered,
    Unordered {
        /// The set of offsets that have been received from the peer, including portions not
        /// yet read by the application
        recvd: RangeSet,
    },
}

impl State {
    fn is_ordered(&self) -> bool {
        matches!(self, Self::Ordered)
    }
}

/// Error indicating that an ordered read was requested on a stream already read unordered
#[derive(Debug)]
pub struct IllegalOrderedRead;

#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    #[test]
    fn assemble_ordered() {
        let mut x = Assembler::new();
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 1), Some(ref y) if &y[..] == b"1");
        assert_matches!(next(&mut x, 3), Some(ref y) if &y[..] == b"23");
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        x.insert(6, Bytes::from_static(b"789"), 3);
        x.insert(9, Bytes::from_static(b"10"), 2);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"789");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"10");
        assert_matches!(next(&mut x, 32), None);
    }
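
    // Sketch of an additional check (test name is new here, only the `next` helper below is
    // assumed): `bytes_read()` should advance by exactly the number of bytes handed out, even
    // when `max_length` splits a buffered chunk in two.
    #[test]
    fn bytes_read_tracks_partial_reads() {
        let mut x = Assembler::new();
        assert_eq!(x.bytes_read(), 0);
        x.insert(0, Bytes::from_static(b"abcdef"), 6);
        assert_matches!(next(&mut x, 4), Some(ref y) if &y[..] == b"abcd");
        assert_eq!(x.bytes_read(), 4);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"ef");
        assert_eq!(x.bytes_read(), 6);
    }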

    #[test]
    fn assemble_unordered() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        assert_matches!(next(&mut x, 32), None);
    }
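
    // Sketch of an additional check (test name is new here): once unordered reads have been
    // enabled, switching back to ordered mode is rejected with `IllegalOrderedRead`, while
    // requesting unordered mode again remains fine.
    #[test]
    fn ensure_ordering_rejects_switch_back() {
        let mut x = Assembler::new();
        x.ensure_ordering(true).unwrap();
        x.ensure_ordering(false).unwrap();
        assert!(x.ensure_ordering(false).is_ok());
        assert!(x.ensure_ordering(true).is_err());
    }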

    #[test]
    fn assemble_duplicate() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_duplicate_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains_compact() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_overlapping() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"4");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_overlapping_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 4);
        x.insert(1, Bytes::from_static(b"234"), 4);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_old() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(9, Bytes::from_static(b"jkl"), 4);
        x.insert(12, Bytes::from_static(b"mno"), 4);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abcdef"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jklmno"))
        );
    }

    #[test]
    fn defrag_with_missing_prefix() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
    }

    #[test]
    fn defrag_read_chunk() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(7, Bytes::from_static(b"hij"), 4);
        x.insert(11, Bytes::from_static(b"lmn"), 4);
        x.defragment();
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"abcdef");
        x.insert(5, Bytes::from_static(b"fghijklmn"), 9);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"ghijklmn");
        x.insert(13, Bytes::from_static(b"nopq"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"opq");
        x.insert(15, Bytes::from_static(b"pqrs"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"rs");
        assert_matches!(x.read(usize::MAX, true), None);
    }

    #[test]
    fn unordered_happy_path() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abc"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
    }

    #[test]
    fn unordered_dedup() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(0, Bytes::from_static(b"abcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"a"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(1, Bytes::from_static(b"bc"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(6, Bytes::from_static(b"ghi"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jkl"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(12, Bytes::from_static(b"mno"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, false), None);
    }

    #[test]
    fn chunks_dedup() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(1, Bytes::from_static(b"bcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abcd")))
        );
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(4, Bytes::from_static(b"efghi")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(9, Bytes::from_static(b"jkl")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(12, Bytes::from_static(b"mno")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
    }

    #[test]
    fn ordered_eager_discard() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(x.data.len(), 1);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        x.insert(0, Bytes::from_static(b"ab"), 2);
        assert_eq!(x.data.len(), 0);
        x.insert(2, Bytes::from_static(b"cd"), 2);
        assert_eq!(
            x.data.peek(),
            Some(&Buffer::new(3, Bytes::from_static(b"d"), 2))
        );
    }
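
    // Sketch of an additional check (test name is new here): `clear()` drops every buffered
    // chunk and its accounting, while leaving the read offset untouched.
    #[test]
    fn clear_discards_buffered_chunks() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.insert(3, Bytes::from_static(b"def"), 3);
        x.clear();
        assert_eq!(x.data.len(), 0);
        assert_matches!(next(&mut x, 32), None);
    }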

    #[test]
    fn ordered_insert_unordered_read() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.ensure_ordering(false).unwrap();
        assert_eq!(
            x.read(3, false),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        assert_eq!(x.read(3, false), None);
    }
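
    // Sketch of an additional check (test name is new here): `reinit()` resets the read
    // offset and discards buffered data so the assembler can be reused from offset 0.
    #[test]
    fn reinit_resets_assembler() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"abc");
        assert_eq!(x.bytes_read(), 3);
        x.reinit();
        assert_eq!(x.bytes_read(), 0);
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"xyz"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"xyz");
    }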

    fn next_unordered(x: &mut Assembler) -> Chunk {
        x.read(usize::MAX, false).unwrap()
    }

    fn next(x: &mut Assembler, size: usize) -> Option<Bytes> {
        x.read(size, true).map(|chunk| chunk.bytes)
    }
}