1use crate::{
7    buffer::{
8        reader::storage::{Chunk, Infallible as _},
9        Error, Reader,
10    },
11    varint::VarInt,
12};
13use alloc::collections::{vec_deque, VecDeque};
14use bytes::BytesMut;
15
16mod duplex;
17mod probe;
18mod reader;
19mod request;
20mod slot;
21mod writer;
22
23#[cfg(test)]
24mod tests;
25
26use request::Request;
27use slot::Slot;
28
/// Minimum size, in bytes, of a single backing-buffer allocation for a slot.
/// `allocation_size` scales this up for larger stream offsets.
const MIN_BUFFER_ALLOCATION_SIZE: usize = 4096;

/// Sentinel stored in `Cursors::final_offset` while the stream's final size
/// is still unknown (`u64::MAX` can never be a valid final offset here).
const UNKNOWN_FINAL_SIZE: u64 = u64::MAX;
/// An ordered reassembly buffer: possibly out-of-order byte ranges are
/// written into `slots` and read back out as a contiguous stream starting
/// at the current read cursor.
#[derive(Debug, PartialEq, Default)]
pub struct Reassembler {
    // Buffered byte ranges, ordered by start offset
    slots: VecDeque<Slot>,
    // Read/receive progress through the logical byte stream
    cursors: Cursors,
}
89
/// Tracks the buffer's progress through the logical byte stream.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Cursors {
    // Offset of the next byte to be read by the application
    start_offset: u64,
    // Highest offset (exclusive) observed from any write so far
    max_recv_offset: u64,
    // Final size of the stream, or `UNKNOWN_FINAL_SIZE` if not yet known
    final_offset: u64,
}
96
97impl Cursors {
98    #[inline]
99    fn final_size(&self) -> Option<u64> {
100        if self.final_offset == UNKNOWN_FINAL_SIZE {
101            None
102        } else {
103            Some(self.final_offset)
104        }
105    }
106}
107
108impl Default for Cursors {
109    #[inline]
110    fn default() -> Self {
111        Self {
112            start_offset: 0,
113            max_recv_offset: 0,
114            final_offset: UNKNOWN_FINAL_SIZE,
115        }
116    }
117}
118
impl Reassembler {
    /// Creates a new, empty reassembly buffer.
    #[inline]
    pub fn new() -> Reassembler {
        Self::default()
    }

    /// Returns `true` once the final size is known and every byte up to that
    /// size has been received contiguously.
    #[inline]
    pub fn is_writing_complete(&self) -> bool {
        self.final_size()
            .is_some_and(|len| self.total_received_len() == len)
    }

    /// Returns `true` once the read cursor has advanced to the known final
    /// size of the stream.
    #[inline]
    pub fn is_reading_complete(&self) -> bool {
        self.final_size() == Some(self.cursors.start_offset)
    }

    /// Returns the total size of the stream, if it has been signalled by a
    /// writer (e.g. via [`Self::write_at_fin`]).
    #[inline]
    pub fn final_size(&self) -> Option<u64> {
        self.cursors.final_size()
    }

    /// Returns the number of bytes that are currently readable contiguously
    /// from the read cursor.
    #[inline]
    pub fn len(&self) -> usize {
        self.report().0
    }

    /// Returns `true` if no bytes are currently readable.
    #[inline]
    pub fn is_empty(&self) -> bool {
        if let Some(slot) = self.slots.front() {
            // Data is only readable if the first slot holds bytes starting
            // exactly at the read cursor; otherwise there is a gap.
            !slot.is_occupied(self.cursors.start_offset)
        } else {
            true
        }
    }

    /// Returns `(bytes, chunks)` for the contiguous readable prefix.
    #[inline]
    pub fn report(&self) -> (usize, usize) {
        let mut bytes = 0;
        let mut chunks = 0;
        for chunk in self.iter() {
            bytes += chunk.len();
            chunks += 1;
        }
        (bytes, chunks)
    }

    /// Writes `data` at the given stream `offset`.
    ///
    /// # Errors
    ///
    /// Fails if the resulting range is out of range for a `VarInt` or
    /// conflicts with a previously-signalled final size.
    #[inline]
    pub fn write_at(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> {
        let mut request = Request::new(offset, data, false)?;
        self.write_reader(&mut request)?;
        Ok(())
    }

    /// Writes `data` at the given stream `offset` and marks the end of the
    /// chunk as the final offset of the stream.
    #[inline]
    pub fn write_at_fin(&mut self, offset: VarInt, data: &[u8]) -> Result<(), Error> {
        let mut request = Request::new(offset, data, true)?;
        self.write_reader(&mut request)?;
        Ok(())
    }

    /// Drains the buffered contents of `reader` into the reassembler.
    #[inline]
    pub fn write_reader<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
    where
        R: Reader + ?Sized,
    {
        // Anything before the read cursor has already been consumed; drop it
        // from the reader before copying.
        reader.skip_until(self.current_offset())?;

        // Snapshot the cursors so they can be rolled back if the reader
        // fails mid-write (check_reader_fin may have mutated them).
        let snapshot = self.cursors;

        self.check_reader_fin(reader)?;

        if let Err(err) = self.write_reader_impl(reader) {
            use core::any::TypeId;
            // Infallible readers can never actually reach this path; the
            // TypeId comparison lets the rollback code be eliminated for
            // them at compile time.
            if TypeId::of::<R::Error>() != TypeId::of::<core::convert::Infallible>() {
                self.cursors = snapshot;
            }
            return Err(Error::ReaderError(err));
        }

        self.invariants();

        Ok(())
    }

    /// Validates the reader's final-offset claim against what the buffer
    /// already knows, then records the new maximum received offset.
    #[inline]
    fn check_reader_fin<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
    where
        R: Reader + ?Sized,
    {
        // Offset one past the last byte this reader will deliver.
        let buffered_offset = reader
            .current_offset()
            .checked_add_usize(reader.buffered_len())
            .ok_or(Error::OutOfRange)?
            .as_u64();

        match (reader.final_offset(), self.final_size()) {
            // Both sides claim a final size: they must agree.
            (Some(actual), Some(expected)) => {
                ensure!(actual == expected, Err(Error::InvalidFin));
            }
            // The reader is the first to announce a final size: it may not
            // contradict data already observed past that point.
            (Some(final_offset), None) => {
                let final_offset = final_offset.as_u64();

                ensure!(
                    self.cursors.max_recv_offset <= final_offset,
                    Err(Error::InvalidFin)
                );

                self.cursors.final_offset = final_offset;
            }
            // A final size is already known: the reader may not write past it.
            (None, Some(expected)) => {
                ensure!(expected >= buffered_offset, Err(Error::InvalidFin));
            }
            (None, None) => {}
        }

        self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(buffered_offset);

        Ok(())
    }

    /// Copies the reader's buffered bytes into slots, selecting an existing
    /// slot when one covers the reader's offset or allocating a fresh one.
    #[inline(always)]
    fn write_reader_impl<R>(&mut self, reader: &mut R) -> Result<(), R::Error>
    where
        R: Reader + ?Sized,
    {
        if reader.buffer_is_empty() {
            // Still poll the reader once so an empty (e.g. fin-only) write
            // can surface any pending reader error.
            let _chunk = reader.read_chunk(0)?;
            return Ok(());
        }

        let mut selected = None;

        // Scan backwards for the right-most slot starting at or before the
        // reader's offset; in-order writes usually land near the back.
        for idx in (0..self.slots.len()).rev() {
            let Some(slot) = self.slots.get(idx) else {
                // SAFETY: `idx` is always within `0..self.slots.len()`
                unsafe {
                    assume!(false);
                }
            };

            ensure!(slot.start() <= reader.current_offset().as_u64(), continue);

            selected = Some(idx);
            break;
        }

        let idx = if let Some(idx) = selected {
            idx
        } else {
            // No existing slot covers this offset: allocate one and place it
            // at the front of the deque.
            let mut idx = 0;
            let mut slot = self.allocate_slot(reader);

            let filled = slot.try_write_reader(reader, &mut true)?;

            // `try_write_reader` may split off an additional slot; push both
            // and account for the extra one in `idx`.
            if let Some(slot) = filled {
                self.slots.push_front(slot);
                idx += 1;
            }
            self.slots.push_front(slot);

            ensure!(!reader.buffer_is_empty(), Ok(()));

            idx
        };

        self.write_reader_at(reader, idx)?;
        Ok(())
    }

    /// Writes the reader into the slot at `idx` and the following slots,
    /// allocating new slots for any gaps, until the reader is drained.
    #[inline(always)]
    fn write_reader_at<R>(&mut self, reader: &mut R, mut idx: usize) -> Result<(), R::Error>
    where
        R: Reader + ?Sized,
    {
        let initial_idx = idx;
        let mut filled_slot = false;

        // SAFETY: callers only invoke this with a non-empty reader
        unsafe {
            assume!(
                !reader.buffer_is_empty(),
                "the first write should always be non-empty"
            );
        }

        while !reader.buffer_is_empty() {
            let Some(slot) = self.slots.get_mut(idx) else {
                // SAFETY: `idx` is kept in bounds by the insertions below
                unsafe {
                    assume!(false);
                }
            };

            let filled = slot.try_write_reader(reader, &mut filled_slot)?;

            idx += 1;
            if let Some(slot) = filled {
                // The write split off an extra slot; insert it right after
                // the one just written so offset order is preserved.
                self.insert(idx, slot);
                idx += 1;
            }

            ensure!(!reader.buffer_is_empty(), break);

            // Fill any gap before the next existing slot with fresh
            // allocations before continuing into it.
            self.write_reader_with_alloc(reader, &mut idx, &mut filled_slot)?;

            continue;
        }

        if filled_slot {
            // Newly-completed slots may now be mergeable with neighbors.
            self.unsplit_range(initial_idx..idx);
        }

        Ok(())
    }

    /// Allocates slots and writes into them until the reader is drained or
    /// the next existing slot can take over.
    #[inline(always)]
    fn write_reader_with_alloc<R>(
        &mut self,
        reader: &mut R,
        idx: &mut usize,
        filled_slot: &mut bool,
    ) -> Result<(), R::Error>
    where
        R: Reader + ?Sized,
    {
        while !reader.buffer_is_empty() {
            if let Some(next) = self.slots.get(*idx) {
                // Stop allocating once the next existing slot starts at or
                // before the reader's current offset.
                ensure!(next.start() > reader.current_offset().as_u64(), break);
            }

            let mut slot = self.allocate_slot(reader);

            let filled = slot.try_write_reader(reader, filled_slot)?;

            self.insert(*idx, slot);

            *idx += 1;
            if let Some(slot) = filled {
                self.insert(*idx, slot);
                *idx += 1;
            }
        }

        Ok(())
    }

    /// Merges adjacent full slots within `range` that belong to the same
    /// aligned allocation block, recombining their split buffers.
    #[inline]
    fn unsplit_range(&mut self, range: core::ops::Range<usize>) {
        // Iterate in reverse so removals don't shift unvisited indices.
        for idx in range.rev() {
            let Some(slot) = self.slots.get(idx) else {
                // SAFETY: `range` lies within the bounds of `self.slots`
                unsafe {
                    assume!(false);
                }
            };

            // Only completely-written slots can be merged.
            ensure!(slot.is_full(), continue);

            let start = slot.start();
            let end = slot.end();

            let Some(next) = self.slots.get(idx + 1) else {
                continue;
            };

            // The two slots must be byte-contiguous...
            ensure!(next.start() == end, continue);

            // ...and must have been carved from the same aligned allocation
            // block; otherwise their backing buffers are separate allocations
            // and cannot be unsplit.
            let current_block = Self::align_offset(start, Self::allocation_size(start));
            let next_block = Self::align_offset(next.start(), Self::allocation_size(next.start()));
            ensure!(current_block == next_block, continue);

            if let Some(next) = self.slots.remove(idx + 1) {
                self.slots[idx].unsplit(next);
            } else {
                unsafe {
                    assume!(false, "idx + 1 was checked above");
                }
            }
        }
    }

    /// Advances the read cursor by `len` bytes, discarding any buffered data
    /// that falls before the new offset.
    ///
    /// # Errors
    ///
    /// Returns `OutOfRange` if the new offset overflows a `VarInt`, or
    /// `InvalidFin` if it would move past a known final size.
    #[inline]
    pub fn skip(&mut self, len: VarInt) -> Result<(), Error> {
        ensure!(len > VarInt::ZERO, Ok(()));

        let new_start_offset = self
            .cursors
            .start_offset
            .checked_add(len.as_u64())
            .and_then(|v| VarInt::new(v).ok())
            .ok_or(Error::OutOfRange)?;

        if let Some(final_size) = self.final_size() {
            ensure!(
                final_size >= new_start_offset.as_u64(),
                Err(Error::InvalidFin)
            );
        }

        // Skipping counts as having "received" everything up to the new
        // cursor for the purposes of fin validation.
        self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(new_start_offset.as_u64());

        self.cursors.start_offset = new_start_offset.as_u64();

        // Drop slots entirely before the new cursor and trim the first slot
        // that reaches it.
        while let Some(mut slot) = self.slots.pop_front() {
            if slot.end_allocated() < new_start_offset.as_u64() {
                continue;
            }

            slot.skip_until(new_start_offset).unwrap();

            if !slot.should_drop() {
                self.slots.push_front(slot);
            }

            break;
        }

        self.invariants();

        Ok(())
    }

    /// Iterates over the contiguous readable chunks without consuming them.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &[u8]> {
        Iter::new(self)
    }

    /// Returns an iterator that pops readable chunks, consuming them from
    /// the buffer as it goes.
    #[inline]
    pub fn drain(&mut self) -> impl Iterator<Item = BytesMut> + '_ {
        Drain { inner: self }
    }

    /// Pops the next readable chunk, if any.
    #[inline]
    pub fn pop(&mut self) -> Option<BytesMut> {
        self.pop_watermarked(usize::MAX)
    }

    /// Pops the next readable chunk, limited to at most `watermark` bytes.
    #[inline]
    pub fn pop_watermarked(&mut self, watermark: usize) -> Option<BytesMut> {
        let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else {
            // SAFETY: reading from a Reassembler is expected to always yield
            // `BytesMut` chunks — NOTE(review): confirm against the storage
            // impl in the `reader` module.
            unsafe { assume!(false) }
        };

        ensure!(!chunk.is_empty(), None);

        Some(chunk)
    }

    /// Returns the number of bytes the application has consumed so far.
    #[inline]
    pub fn consumed_len(&self) -> u64 {
        self.cursors.start_offset
    }

    /// Returns the total number of contiguous bytes received from the start
    /// of the stream, whether already consumed or still buffered.
    #[inline]
    pub fn total_received_len(&self) -> u64 {
        let mut offset = self.cursors.start_offset;

        for slot in &self.slots {
            // Stop counting at the first gap.
            ensure!(slot.is_occupied(offset), offset);
            offset = slot.end();
        }

        offset
    }

    /// Clears all buffered data and resets the cursors to their initial
    /// state.
    #[inline]
    pub fn reset(&mut self) {
        self.slots.clear();
        self.cursors = Default::default();
    }

    /// Inserts `slot` at `idx`, falling back to `push_back` when `idx` is
    /// past the end (debug-asserted to be exactly one past).
    #[inline(always)]
    fn insert(&mut self, idx: usize, slot: Slot) {
        if self.slots.len() < idx {
            debug_assert_eq!(self.slots.len() + 1, idx);
            self.slots.push_back(slot);
        } else {
            self.slots.insert(idx, slot);
        }
    }

    /// Allocates a new slot sized and aligned for the reader's current
    /// offset.
    ///
    /// Slots are carved out of fixed-size, offset-aligned allocation blocks
    /// (see `allocation_size`/`align_offset`) so adjacent full slots can
    /// later be merged back together by `unsplit_range`.
    #[inline]
    fn allocate_slot<R>(&mut self, reader: &R) -> Slot
    where
        R: Reader + ?Sized,
    {
        let start = reader.current_offset().as_u64();
        let mut size = Self::allocation_size(start);
        let mut offset = Self::align_offset(start, size);

        // Never allocate capacity below the read cursor — that region has
        // already been consumed.
        if let Some(diff) = self.cursors.start_offset.checked_sub(offset) {
            if diff > 0 {
                debug_assert!(
                    reader.current_offset().as_u64() >= self.cursors.start_offset,
                    "requests should be split before allocating slots"
                );
                offset = self.cursors.start_offset;
                size -= diff as usize;
            }
        }

        // If this reader delivers exactly up to the stream's final offset,
        // shrink the allocation so no capacity is wasted past the end.
        if self.cursors.final_offset
            - reader.current_offset().as_u64()
            - reader.buffered_len() as u64
            == 0
        {
            let size_candidate = (start - offset) as usize + reader.buffered_len();
            if size_candidate < size {
                size = size_candidate;
            }
        }

        let buffer = BytesMut::with_capacity(size);

        let end = offset + size as u64;
        Slot::new(offset, end, buffer)
    }

    /// Rounds `offset` down to the nearest multiple of `alignment`.
    #[inline(always)]
    fn align_offset(offset: u64, alignment: usize) -> u64 {
        // SAFETY: all callers pass a non-zero allocation size
        unsafe {
            assume!(alignment > 0);
        }
        (offset / (alignment as u64)) * (alignment as u64)
    }

    /// Returns the allocation block size to use at the given stream offset.
    ///
    /// The size grows with the offset: `MIN_BUFFER_ALLOCATION_SIZE * mult`
    /// for `mult` in {16, 8, 4} once `offset >= MIN * mult^2`, otherwise the
    /// 4096-byte minimum — so long streams amortize allocations while short
    /// streams stay cheap.
    #[inline(always)]
    fn allocation_size(offset: u64) -> usize {
        for pow in (2..=4).rev() {
            let mult = 1 << pow;
            let square = mult * mult;
            let min_offset = (MIN_BUFFER_ALLOCATION_SIZE * square) as u64;
            let allocation_size = MIN_BUFFER_ALLOCATION_SIZE * mult;

            if offset >= min_offset {
                return allocation_size;
            }
        }

        MIN_BUFFER_ALLOCATION_SIZE
    }

    /// Debug-build consistency checks over the cursors and slot list.
    #[inline(always)]
    fn invariants(&self) {
        if cfg!(debug_assertions) {
            assert_eq!(
                self.total_received_len(),
                self.consumed_len() + self.len() as u64
            );

            let (actual_len, chunks) = self.report();

            assert_eq!(actual_len == 0, self.is_empty());
            assert_eq!(self.iter().count(), chunks);

            let mut prev_end = self.cursors.start_offset;

            for (idx, slot) in self.slots.iter().enumerate() {
                assert!(slot.start() >= prev_end, "{self:#?}");
                assert!(!slot.should_drop(), "slot range should be non-empty");
                prev_end = slot.end_allocated();

                if slot.is_full() {
                    let start = slot.start();
                    let end = slot.end();

                    let Some(next) = self.slots.get(idx + 1) else {
                        continue;
                    };

                    ensure!(next.start() == end, continue);

                    let current_block = Self::align_offset(start, Self::allocation_size(start));
                    let next_block =
                        Self::align_offset(next.start(), Self::allocation_size(next.start()));
                    ensure!(current_block == next_block, continue);

                    // Two full, contiguous slots from the same block should
                    // have been merged by `unsplit_range`.
                    panic!("unmerged slots at {idx} and {} {self:#?}", idx + 1);
                }
            }
        }
    }
}
688
/// Iterator over the contiguous readable chunks at the front of a
/// [`Reassembler`], yielded as byte slices.
pub struct Iter<'a> {
    // End offset of the previously-yielded slot; the next slot must be
    // occupied at this offset for the contiguous run to continue
    prev_end: u64,
    inner: vec_deque::Iter<'a, Slot>,
}
693
694impl<'a> Iter<'a> {
695    #[inline]
696    fn new(buffer: &'a Reassembler) -> Self {
697        Self {
698            prev_end: buffer.cursors.start_offset,
699            inner: buffer.slots.iter(),
700        }
701    }
702}
703
704impl<'a> Iterator for Iter<'a> {
705    type Item = &'a [u8];
706
707    #[inline]
708    fn next(&mut self) -> Option<Self::Item> {
709        let slot = self.inner.next()?;
710
711        ensure!(slot.is_occupied(self.prev_end), None);
712
713        self.prev_end = slot.end();
714        Some(slot.as_slice())
715    }
716}
717
/// Draining iterator that pops readable chunks out of a [`Reassembler`],
/// consuming them from the buffer.
pub struct Drain<'a> {
    inner: &'a mut Reassembler,
}
721
722impl Iterator for Drain<'_> {
723    type Item = BytesMut;
724
725    #[inline]
726    fn next(&mut self) -> Option<Self::Item> {
727        self.inner.pop()
728    }
729
730    #[inline]
731    fn size_hint(&self) -> (usize, Option<usize>) {
732        let len = self.inner.slots.len();
733        (len, Some(len))
734    }
735}