use std::{
    cmp::Ordering,
    collections::{BinaryHeap, binary_heap::PeekMut},
    mem,
};

use bytes::{Buf, Bytes, BytesMut};

use crate::range_set::RangeSet;

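/// Helper to assemble stream data received out of order into a contiguous, in-order stream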
#[derive(Debug, Default)]
pub(super) struct Assembler {
    state: State,
    data: BinaryHeap<Buffer>,
    /// Total number of bytes buffered in `data`, which may count overlapping ranges twice
    buffered: usize,
    /// Memory held by the buffers in `data`; can exceed `buffered` when a buffer references
    /// only part of a larger allocation
    allocated: usize,
    /// Number of bytes the application has consumed so far
    bytes_read: u64,
    /// Highest stream offset observed so far (one past the last byte inserted)
    end: u64,
}

impl Assembler {
    pub(super) fn new() -> Self {
        Self::default()
    }

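    /// Reset to the initial state, reusing the existing heap allocation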
    pub(super) fn reinit(&mut self) {
        let old_data = mem::take(&mut self.data);
        *self = Self::default();
        self.data = old_data;
        self.data.clear();
    }

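    /// Record whether the stream will be read in order or not.
    ///
    /// Switching to unordered mode builds the set of already-received ranges; requesting
    /// ordered reads after the stream has been read out of order fails with `IllegalOrderedRead`.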
    pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
        if ordered && !self.state.is_ordered() {
            return Err(IllegalOrderedRead);
        } else if !ordered && self.state.is_ordered() {
            // Switching to unordered mode: coalesce buffered chunks, then record every
            // range received so far so future inserts can be deduplicated against it.
            if !self.data.is_empty() {
                self.defragment();
            }
            let mut recvd = RangeSet::new();
            recvd.insert(0..self.bytes_read);
            for chunk in &self.data {
                recvd.insert(chunk.offset..chunk.offset + chunk.bytes.len() as u64);
            }
            self.state = State::Unordered { recvd };
        }
        Ok(())
    }

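    /// Return the next buffered chunk, up to `max_length` bytes.
    ///
    /// In ordered mode only data starting exactly at the current read offset is returned;
    /// in unordered mode the lowest-offset buffered chunk is returned regardless of gaps.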
    pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> Option<Chunk> {
        loop {
            let mut chunk = self.data.peek_mut()?;

            if ordered {
                if chunk.offset > self.bytes_read {
                    // Next chunk starts after the current read index
                    return None;
                } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
                    // Next chunk is useless as the read index is already beyond its end
                    self.buffered -= chunk.bytes.len();
                    self.allocated -= chunk.allocation_size;
                    PeekMut::pop(chunk);
                    continue;
                }

                // Determine the start of the unread bytes within this chunk
                let start = (self.bytes_read - chunk.offset) as usize;
                if start > 0 {
                    chunk.bytes.advance(start);
                    chunk.offset += start as u64;
                    self.buffered -= start;
                }
            }

            return Some(if max_length < chunk.bytes.len() {
                self.bytes_read += max_length as u64;
                let offset = chunk.offset;
                chunk.offset += max_length as u64;
                self.buffered -= max_length;
                Chunk::new(offset, chunk.bytes.split_to(max_length))
            } else {
                self.bytes_read += chunk.bytes.len() as u64;
                self.buffered -= chunk.bytes.len();
                self.allocated -= chunk.allocation_size;
                let chunk = PeekMut::pop(chunk);
                Chunk::new(chunk.offset, chunk.bytes)
            });
        }
    }

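    /// Copy fragmented chunk data into contiguous buffers, reducing memory overhead from
    /// buffers that reference only a small slice of a larger allocation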
    fn defragment(&mut self) {
        let new = BinaryHeap::with_capacity(self.data.len());
        let old = mem::replace(&mut self.data, new);
        let mut buffers = old.into_sorted_vec();
        self.buffered = 0;
        let mut fragmented_buffered = 0;
        // First pass: drop duplicate bytes and count how much data still lives in
        // fragmented (under-utilized) allocations.
        let mut offset = 0;
        for chunk in buffers.iter_mut().rev() {
            chunk.try_mark_defragment(offset);
            let size = chunk.bytes.len();
            offset = chunk.offset + size as u64;
            self.buffered += size;
            if !chunk.defragmented {
                fragmented_buffered += size;
            }
        }
        self.allocated = self.buffered;
        // Second pass: copy fragmented chunks into contiguous buffers, keeping
        // already-defragmented chunks as they are.
        let mut buffer = BytesMut::with_capacity(fragmented_buffered);
        let mut offset = 0;
        for chunk in buffers.into_iter().rev() {
            if chunk.defragmented {
                // Chunk already uses its allocation efficiently; keep it (drop if empty)
                if !chunk.bytes.is_empty() {
                    self.data.push(chunk);
                }
                continue;
            }
            if chunk.offset != offset + (buffer.len() as u64) {
                // Not contiguous with the accumulated data; flush and start a new run
                if !buffer.is_empty() {
                    self.data
                        .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
                }
                offset = chunk.offset;
            }
            buffer.extend_from_slice(&chunk.bytes);
        }
        if !buffer.is_empty() {
            self.data
                .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
        }
    }

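    /// Buffer a chunk of stream data received at `offset`.
    ///
    /// `allocation_size` is the size of the allocation backing `bytes` and is used to decide
    /// when defragmentation is worthwhile. Data already read or already received is discarded.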
    pub(super) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, allocation_size: usize) {
        debug_assert!(
            bytes.len() <= allocation_size,
            "allocation_size less than bytes.len(): {:?} < {:?}",
            allocation_size,
            bytes.len()
        );
        self.end = self.end.max(offset + bytes.len() as u64);
        if let State::Unordered { ref mut recvd } = self.state {
            // Discard the parts of the incoming data that have already been received
            for duplicate in recvd.replace(offset..offset + bytes.len() as u64) {
                if duplicate.start > offset {
                    let buffer = Buffer::new(
                        offset,
                        bytes.split_to((duplicate.start - offset) as usize),
                        allocation_size,
                    );
                    self.buffered += buffer.bytes.len();
                    self.allocated += buffer.allocation_size;
                    self.data.push(buffer);
                    offset = duplicate.start;
                }
                bytes.advance((duplicate.end - offset) as usize);
                offset = duplicate.end;
            }
        } else if offset < self.bytes_read {
            if (offset + bytes.len() as u64) <= self.bytes_read {
                // Everything here has already been consumed by the application
                return;
            } else {
                // Drop the prefix that has already been consumed
                let diff = self.bytes_read - offset;
                offset += diff;
                bytes.advance(diff as usize);
            }
        }

        if bytes.is_empty() {
            return;
        }
        let buffer = Buffer::new(offset, bytes, allocation_size);
        self.buffered += buffer.bytes.len();
        self.allocated += buffer.allocation_size;
        self.data.push(buffer);

        // Defragment if the memory we hold on to greatly exceeds the amount of data
        // that is actually useful to the application.
        let buffered = self.buffered.min((self.end - self.bytes_read) as usize);
        let over_allocation = self.allocated - buffered;
        let threshold = 32768.max(buffered * 3 / 2);
        if over_allocation > threshold {
            self.defragment()
        }
    }

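    /// Number of bytes consumed by the application so far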
    pub(super) fn bytes_read(&self) -> u64 {
        self.bytes_read
    }

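    /// Discard all buffered data without resetting the read offset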
    pub(super) fn clear(&mut self) {
        self.data.clear();
        self.buffered = 0;
        self.allocated = 0;
    }
}

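/// A contiguous chunk of stream data returned by `read`, along with its offset in the stream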
#[derive(Debug, PartialEq, Eq)]
pub struct Chunk {
    /// The offset in the stream
    pub offset: u64,
    /// The contents of the chunk
    pub bytes: Bytes,
}

impl Chunk {
    fn new(offset: u64, bytes: Bytes) -> Self {
        Self { offset, bytes }
    }
}

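/// A buffered chunk of stream data together with bookkeeping for the defragmentation heuristic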
#[derive(Debug, Eq)]
struct Buffer {
    offset: u64,
    bytes: Bytes,
    /// Size of the allocation backing `bytes`, which may be larger than `bytes.len()` when
    /// the buffer references only part of a bigger receive buffer
    allocation_size: usize,
    /// Whether this buffer already uses its allocation efficiently
    defragmented: bool,
}

impl Buffer {
    /// Constructs a new fragmented buffer containing `bytes` from an allocation of `allocation_size`
    fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: false,
        }
    }

    /// Constructs a new defragmented buffer whose allocation matches its contents exactly
    fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
        let allocation_size = bytes.len();
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: true,
        }
    }

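    /// Discard any bytes before `offset` and mark the buffer as defragmented if what remains
    /// occupies most of its allocation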
    fn try_mark_defragment(&mut self, offset: u64) {
        let duplicate = offset.saturating_sub(self.offset) as usize;
        self.offset = self.offset.max(offset);
        if duplicate >= self.bytes.len() {
            // Everything up to `offset` was already received; drop the whole buffer
            self.bytes = Bytes::new();
            self.defragmented = true;
            self.allocation_size = 0;
            return;
        }
        self.bytes.advance(duplicate);
        // Count as defragmented once the remaining bytes occupy most of the allocation
        self.defragmented = self.defragmented || self.bytes.len() * 6 / 5 >= self.allocation_size;
        if self.defragmented {
            self.allocation_size = self.bytes.len();
        }
    }
}

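// Ordered so that the max-heap yields the buffer with the lowest offset first; among buffers
// at the same offset, the longest one is preferred.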
impl Ord for Buffer {
    fn cmp(&self, other: &Self) -> Ordering {
        self.offset
            .cmp(&other.offset)
            .reverse()
            .then(self.bytes.len().cmp(&other.bytes.len()))
    }
}

impl PartialOrd for Buffer {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Buffer {
    fn eq(&self, other: &Self) -> bool {
        (self.offset, self.bytes.len()) == (other.offset, other.bytes.len())
    }
}

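/// Whether the stream is consumed in order; unordered mode tracks which ranges have already
/// been received so duplicates can be discarded on insert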
#[derive(Debug)]
enum State {
    Ordered,
    Unordered {
        /// Ranges of the stream received so far, used to discard duplicate data on insert
        recvd: RangeSet,
    },
}

impl State {
    fn is_ordered(&self) -> bool {
        matches!(self, Self::Ordered)
    }
}

impl Default for State {
    fn default() -> Self {
        Self::Ordered
    }
}

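/// Error indicating that an ordered read was requested on a stream that has already been read
/// out of order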
#[derive(Debug)]
pub struct IllegalOrderedRead;

#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    #[test]
    fn assemble_ordered() {
        let mut x = Assembler::new();
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 1), Some(ref y) if &y[..] == b"1");
        assert_matches!(next(&mut x, 3), Some(ref y) if &y[..] == b"23");
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        x.insert(6, Bytes::from_static(b"789"), 3);
        x.insert(9, Bytes::from_static(b"10"), 2);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"789");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"10");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_unordered() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_duplicate() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_duplicate_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains_compact() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_overlapping() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"4");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_overlapping_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 4);
        x.insert(1, Bytes::from_static(b"234"), 4);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_old() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(9, Bytes::from_static(b"jkl"), 4);
        x.insert(12, Bytes::from_static(b"mno"), 4);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abcdef"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jklmno"))
        );
    }

    #[test]
    fn defrag_with_missing_prefix() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
    }

    #[test]
    fn defrag_read_chunk() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(7, Bytes::from_static(b"hij"), 4);
        x.insert(11, Bytes::from_static(b"lmn"), 4);
        x.defragment();
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"abcdef");
        x.insert(5, Bytes::from_static(b"fghijklmn"), 9);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"ghijklmn");
        x.insert(13, Bytes::from_static(b"nopq"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"opq");
        x.insert(15, Bytes::from_static(b"pqrs"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"rs");
        assert_matches!(x.read(usize::MAX, true), None);
    }

    #[test]
    fn unordered_happy_path() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abc"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
    }

    #[test]
    fn unordered_dedup() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(0, Bytes::from_static(b"abcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"a"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(1, Bytes::from_static(b"bc"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(6, Bytes::from_static(b"ghi"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jkl"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(12, Bytes::from_static(b"mno"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, false), None);
    }

    #[test]
    fn chunks_dedup() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(1, Bytes::from_static(b"bcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abcd")))
        );
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(4, Bytes::from_static(b"efghi")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(9, Bytes::from_static(b"jkl")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(12, Bytes::from_static(b"mno")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
    }

    #[test]
    fn ordered_eager_discard() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(x.data.len(), 1);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        x.insert(0, Bytes::from_static(b"ab"), 2);
        assert_eq!(x.data.len(), 0);
        x.insert(2, Bytes::from_static(b"cd"), 2);
        assert_eq!(
            x.data.peek(),
            Some(&Buffer::new(3, Bytes::from_static(b"d"), 2))
        );
    }

    #[test]
    fn ordered_insert_unordered_read() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.ensure_ordering(false).unwrap();
        assert_eq!(
            x.read(3, false),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        assert_eq!(x.read(3, false), None);
    }

    fn next_unordered(x: &mut Assembler) -> Chunk {
        x.read(usize::MAX, false).unwrap()
    }

    fn next(x: &mut Assembler, size: usize) -> Option<Bytes> {
        x.read(size, true).map(|chunk| chunk.bytes)
    }
}