1use crate::{
7 buffer::{
8 reader::storage::{Chunk, Infallible as _},
9 Error, Reader,
10 },
11 varint::VarInt,
12};
13use alloc::collections::{vec_deque, VecDeque};
14use bytes::BytesMut;
15
16mod duplex;
17mod probe;
18mod reader;
19mod request;
20mod slot;
21mod writer;
22
23#[cfg(test)]
24mod tests;
25
26use request::Request;
27use slot::Slot;
28
/// Smallest buffer size, in bytes, returned by `Reassembler::allocation_size`.
///
/// Larger offsets use power-of-two multiples of this value.
const MIN_BUFFER_ALLOCATION_SIZE: usize = 4 * 1024;
33
/// Sentinel stored in `Cursors::final_offset` while the final size is not yet known.
const UNKNOWN_FINAL_SIZE: u64 = u64::MAX;
39
40#[derive(Debug, PartialEq, Default)]
85pub struct Reassembler {
86 slots: VecDeque<Slot>,
87 cursors: Cursors,
88}
89
/// Offsets describing the progress of a `Reassembler`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Cursors {
    /// Offset of the next byte to be read; `skip` moves it forward.
    start_offset: u64,
    /// Highest offset observed so far from a write or a skip.
    max_recv_offset: u64,
    /// Final offset, or `UNKNOWN_FINAL_SIZE` while it is not known.
    final_offset: u64,
}
96
97impl Cursors {
98 #[inline]
99 fn final_size(&self) -> Option<u64> {
100 if self.final_offset == UNKNOWN_FINAL_SIZE {
101 None
102 } else {
103 Some(self.final_offset)
104 }
105 }
106}
107
108impl Default for Cursors {
109 #[inline]
110 fn default() -> Self {
111 Self {
112 start_offset: 0,
113 max_recv_offset: 0,
114 final_offset: UNKNOWN_FINAL_SIZE,
115 }
116 }
117}
118
119impl Reassembler {
120 #[inline]
122 pub fn new() -> Reassembler {
123 Self::default()
124 }
125
126 #[inline]
128 pub fn is_writing_complete(&self) -> bool {
129 self.final_size()
130 .is_some_and(|len| self.total_received_len() == len)
131 }
132
133 #[inline]
135 pub fn is_reading_complete(&self) -> bool {
136 self.final_size() == Some(self.cursors.start_offset)
137 }
138
139 #[inline]
141 pub fn final_size(&self) -> Option<u64> {
142 self.cursors.final_size()
143 }
144
145 #[inline]
149 pub fn len(&self) -> usize {
150 self.report().0
151 }
152
153 #[inline]
155 pub fn is_empty(&self) -> bool {
156 if let Some(slot) = self.slots.front() {
157 !slot.is_occupied(self.cursors.start_offset)
158 } else {
159 true
160 }
161 }
162
163 #[inline]
165 pub fn report(&self) -> (usize, usize) {
166 let mut bytes = 0;
167 let mut chunks = 0;
168 for chunk in self.iter() {
169 bytes += chunk.len();
170 chunks += 1;
171 }
172 (bytes, chunks)
173 }
174
175 #[inline]
177 pub fn write_at(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> {
178 let mut request = Request::new(offset, data, false)?;
179 self.write_reader(&mut request)?;
180 Ok(())
181 }
182
183 #[inline]
185 pub fn write_at_fin(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> {
186 let mut request = Request::new(offset, data, true)?;
187 self.write_reader(&mut request)?;
188 Ok(())
189 }
190
191 #[inline]
192 pub fn write_reader<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
193 where
194 R: Reader + ?Sized,
195 {
196 reader.skip_until(self.current_offset())?;
198
199 let snapshot = self.cursors;
201
202 self.check_reader_fin(reader)?;
203
204 if let Err(err) = self.write_reader_impl(reader) {
205 use core::any::TypeId;
206 if TypeId::of::<R::Error>() != TypeId::of::<core::convert::Infallible>() {
207 self.cursors = snapshot;
208 }
209 return Err(Error::ReaderError(err));
210 }
211
212 self.invariants();
213
214 Ok(())
215 }
216
217 #[inline]
219 fn check_reader_fin<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
220 where
221 R: Reader + ?Sized,
222 {
223 let buffered_offset = reader
224 .current_offset()
225 .checked_add_usize(reader.buffered_len())
226 .ok_or(Error::OutOfRange)?
227 .as_u64();
228
229 match (reader.final_offset(), self.final_size()) {
236 (Some(actual), Some(expected)) => {
237 ensure!(actual == expected, Err(Error::InvalidFin));
238 }
239 (Some(final_offset), None) => {
240 let final_offset = final_offset.as_u64();
241
242 ensure!(
244 self.cursors.max_recv_offset <= final_offset,
245 Err(Error::InvalidFin)
246 );
247
248 self.cursors.final_offset = final_offset;
249 }
250 (None, Some(expected)) => {
251 ensure!(expected >= buffered_offset, Err(Error::InvalidFin));
253 }
254 (None, None) => {}
255 }
256
257 self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(buffered_offset);
259
260 Ok(())
261 }
262
263 #[inline(always)]
264 fn write_reader_impl<R>(&mut self, reader: &mut R) -> Result<(), R::Error>
265 where
266 R: Reader + ?Sized,
267 {
268 if reader.buffer_is_empty() {
270 let _chunk = reader.read_chunk(0)?;
271 return Ok(());
272 }
273
274 let mut selected = None;
275
276 for idx in (0..self.slots.len()).rev() {
278 let Some(slot) = self.slots.get(idx) else {
279 unsafe {
280 assume!(false);
283 }
284 };
285
286 ensure!(slot.start() <= reader.current_offset().as_u64(), continue);
288
289 selected = Some(idx);
290 break;
291 }
292
293 let idx = if let Some(idx) = selected {
294 idx
295 } else {
296 let mut idx = 0;
297 let mut slot = self.allocate_slot(reader);
299
300 let filled = slot.try_write_reader(reader, &mut true)?;
302
303 if let Some(slot) = filled {
304 self.slots.push_front(slot);
305 idx += 1;
306 }
307 self.slots.push_front(slot);
308
309 ensure!(!reader.buffer_is_empty(), Ok(()));
310
311 idx
312 };
313
314 self.write_reader_at(reader, idx)?;
315 Ok(())
316 }
317
318 #[inline(always)]
319 fn write_reader_at<R>(&mut self, reader: &mut R, mut idx: usize) -> Result<(), R::Error>
320 where
321 R: Reader + ?Sized,
322 {
323 let initial_idx = idx;
324 let mut filled_slot = false;
325
326 unsafe {
327 assume!(
328 !reader.buffer_is_empty(),
329 "the first write should always be non-empty"
330 );
331 }
332
333 while !reader.buffer_is_empty() {
334 let Some(slot) = self.slots.get_mut(idx) else {
335 unsafe {
336 assume!(false);
339 }
340 };
341
342 let filled = slot.try_write_reader(reader, &mut filled_slot)?;
344
345 idx += 1;
346 if let Some(slot) = filled {
347 self.insert(idx, slot);
348 idx += 1;
349 }
350
351 ensure!(!reader.buffer_is_empty(), break);
353
354 self.write_reader_with_alloc(reader, &mut idx, &mut filled_slot)?;
356
357 continue;
358 }
359
360 if filled_slot {
362 self.unsplit_range(initial_idx..idx);
363 }
364
365 Ok(())
366 }
367
368 #[inline(always)]
369 fn write_reader_with_alloc<R>(
370 &mut self,
371 reader: &mut R,
372 idx: &mut usize,
373 filled_slot: &mut bool,
374 ) -> Result<(), R::Error>
375 where
376 R: Reader + ?Sized,
377 {
378 while !reader.buffer_is_empty() {
379 if let Some(next) = self.slots.get(*idx) {
380 ensure!(next.start() > reader.current_offset().as_u64(), break);
382 }
383
384 let mut slot = self.allocate_slot(reader);
386
387 let filled = slot.try_write_reader(reader, filled_slot)?;
389
390 self.insert(*idx, slot);
392
393 *idx += 1;
394 if let Some(slot) = filled {
395 self.insert(*idx, slot);
396 *idx += 1;
397 }
398 }
399
400 Ok(())
401 }
402
403 #[inline]
404 fn unsplit_range(&mut self, range: core::ops::Range<usize>) {
405 for idx in range.rev() {
407 let Some(slot) = self.slots.get(idx) else {
408 unsafe {
409 assume!(false);
412 }
413 };
414
415 ensure!(slot.is_full(), continue);
417
418 let start = slot.start();
419 let end = slot.end();
420
421 let Some(next) = self.slots.get(idx + 1) else {
422 continue;
423 };
424
425 ensure!(next.start() == end, continue);
426
427 let current_block = Self::align_offset(start, Self::allocation_size(start));
428 let next_block = Self::align_offset(next.start(), Self::allocation_size(next.start()));
429 ensure!(current_block == next_block, continue);
430
431 if let Some(next) = self.slots.remove(idx + 1) {
432 self.slots[idx].unsplit(next);
433 } else {
434 unsafe {
435 assume!(false, "idx + 1 was checked above");
438 }
439 }
440 }
441 }
442
443 #[inline]
448 pub fn skip(&mut self, len: VarInt) -> Result<(), Error> {
449 ensure!(len > VarInt::ZERO, Ok(()));
451
452 let new_start_offset = self
453 .cursors
454 .start_offset
455 .checked_add(len.as_u64())
456 .and_then(|v| VarInt::new(v).ok())
457 .ok_or(Error::OutOfRange)?;
458
459 if let Some(final_size) = self.final_size() {
460 ensure!(
461 final_size >= new_start_offset.as_u64(),
462 Err(Error::InvalidFin)
463 );
464 }
465
466 self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(new_start_offset.as_u64());
468
469 self.cursors.start_offset = new_start_offset.as_u64();
471
472 while let Some(mut slot) = self.slots.pop_front() {
474 if slot.end_allocated() < new_start_offset.as_u64() {
476 continue;
477 }
478
479 slot.skip_until(new_start_offset).unwrap();
481
482 if !slot.should_drop() {
484 self.slots.push_front(slot);
485 }
486
487 break;
488 }
489
490 self.invariants();
491
492 Ok(())
493 }
494
495 #[inline]
497 pub fn iter(&self) -> impl Iterator<Item = &[u8]> {
498 Iter::new(self)
499 }
500
501 #[inline]
503 pub fn drain(&mut self) -> impl Iterator<Item = BytesMut> + '_ {
504 Drain { inner: self }
505 }
506
507 #[inline]
509 pub fn pop(&mut self) -> Option<BytesMut> {
510 self.pop_watermarked(usize::MAX)
511 }
512
513 #[inline]
516 pub fn pop_watermarked(&mut self, watermark: usize) -> Option<BytesMut> {
517 let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else {
518 unsafe { assume!(false) }
519 };
520
521 ensure!(!chunk.is_empty(), None);
522
523 Some(chunk)
524 }
525
526 #[inline]
529 pub fn consumed_len(&self) -> u64 {
530 self.cursors.start_offset
531 }
532
533 #[inline]
538 pub fn total_received_len(&self) -> u64 {
539 let mut offset = self.cursors.start_offset;
540
541 for slot in &self.slots {
542 ensure!(slot.is_occupied(offset), offset);
543 offset = slot.end();
544 }
545
546 offset
547 }
548
549 #[inline]
553 pub fn reset(&mut self) {
554 self.slots.clear();
555 self.cursors = Default::default();
556 }
557
558 #[inline(always)]
559 fn insert(&mut self, idx: usize, slot: Slot) {
560 if self.slots.len() < idx {
561 debug_assert_eq!(self.slots.len() + 1, idx);
562 self.slots.push_back(slot);
563 } else {
564 self.slots.insert(idx, slot);
565 }
566 }
567
568 #[inline]
570 fn allocate_slot<R>(&mut self, reader: &R) -> Slot
571 where
572 R: Reader + ?Sized,
573 {
574 let start = reader.current_offset().as_u64();
575 let mut size = Self::allocation_size(start);
576 let mut offset = Self::align_offset(start, size);
577
578 if let Some(diff) = self.cursors.start_offset.checked_sub(offset) {
580 if diff > 0 {
581 debug_assert!(
582 reader.current_offset().as_u64() >= self.cursors.start_offset,
583 "requests should be split before allocating slots"
584 );
585 offset = self.cursors.start_offset;
586 size -= diff as usize;
587 }
588 }
589
590 if self.cursors.final_offset
591 - reader.current_offset().as_u64()
592 - reader.buffered_len() as u64
593 == 0
594 {
595 let size_candidate = (start - offset) as usize + reader.buffered_len();
596 if size_candidate < size {
597 size = size_candidate;
598 }
599 }
600
601 let buffer = BytesMut::with_capacity(size);
602
603 let end = offset + size as u64;
604 Slot::new(offset, end, buffer)
605 }
606
607 #[inline(always)]
609 fn align_offset(offset: u64, alignment: usize) -> u64 {
610 unsafe {
611 assume!(alignment > 0);
612 }
613 (offset / (alignment as u64)) * (alignment as u64)
614 }
615
616 #[inline(always)]
631 fn allocation_size(offset: u64) -> usize {
632 for pow in (2..=4).rev() {
633 let mult = 1 << pow;
634 let square = mult * mult;
635 let min_offset = (MIN_BUFFER_ALLOCATION_SIZE * square) as u64;
636 let allocation_size = MIN_BUFFER_ALLOCATION_SIZE * mult;
637
638 if offset >= min_offset {
639 return allocation_size;
640 }
641 }
642
643 MIN_BUFFER_ALLOCATION_SIZE
644 }
645
646 #[inline(always)]
647 fn invariants(&self) {
648 if cfg!(debug_assertions) {
649 assert_eq!(
650 self.total_received_len(),
651 self.consumed_len() + self.len() as u64
652 );
653
654 let (actual_len, chunks) = self.report();
655
656 assert_eq!(actual_len == 0, self.is_empty());
657 assert_eq!(self.iter().count(), chunks);
658
659 let mut prev_end = self.cursors.start_offset;
660
661 for (idx, slot) in self.slots.iter().enumerate() {
662 assert!(slot.start() >= prev_end, "{self:#?}");
663 assert!(!slot.should_drop(), "slot range should be non-empty");
664 prev_end = slot.end_allocated();
665
666 if slot.is_full() {
668 let start = slot.start();
669 let end = slot.end();
670
671 let Some(next) = self.slots.get(idx + 1) else {
672 continue;
673 };
674
675 ensure!(next.start() == end, continue);
676
677 let current_block = Self::align_offset(start, Self::allocation_size(start));
678 let next_block =
679 Self::align_offset(next.start(), Self::allocation_size(next.start()));
680 ensure!(current_block == next_block, continue);
681
682 panic!("unmerged slots at {idx} and {} {self:#?}", idx + 1);
683 }
684 }
685 }
686 }
687}
688
689pub struct Iter<'a> {
690 prev_end: u64,
691 inner: vec_deque::Iter<'a, Slot>,
692}
693
694impl<'a> Iter<'a> {
695 #[inline]
696 fn new(buffer: &'a Reassembler) -> Self {
697 Self {
698 prev_end: buffer.cursors.start_offset,
699 inner: buffer.slots.iter(),
700 }
701 }
702}
703
704impl<'a> Iterator for Iter<'a> {
705 type Item = &'a [u8];
706
707 #[inline]
708 fn next(&mut self) -> Option<Self::Item> {
709 let slot = self.inner.next()?;
710
711 ensure!(slot.is_occupied(self.prev_end), None);
712
713 self.prev_end = slot.end();
714 Some(slot.as_slice())
715 }
716}
717
718pub struct Drain<'a> {
719 inner: &'a mut Reassembler,
720}
721
722impl Iterator for Drain<'_> {
723 type Item = BytesMut;
724
725 #[inline]
726 fn next(&mut self) -> Option<Self::Item> {
727 self.inner.pop()
728 }
729
730 #[inline]
731 fn size_hint(&self) -> (usize, Option<usize>) {
732 let len = self.inner.slots.len();
733 (len, Some(len))
734 }
735}