use std::cmp;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;
use crate::stream::RecvAction;
use crate::stream::RecvBufResetReturn;
use crate::Error;
use crate::Result;
use crate::flowcontrol;
use crate::range_buf::RangeBuf;
#[derive(Debug, Default)]
pub struct RecvBuf {
data: BTreeMap<u64, RangeBuf>,
off: u64,
len: u64,
flow_control: flowcontrol::FlowControl,
fin_off: Option<u64>,
error: Option<u64>,
drain: bool,
}
impl RecvBuf {
pub fn new(max_data: u64, initial_window: u64, max_window: u64) -> RecvBuf {
RecvBuf {
flow_control: flowcontrol::FlowControl::new(
max_data,
initial_window,
max_window,
),
..RecvBuf::default()
}
}
pub fn write(&mut self, buf: RangeBuf) -> Result<()> {
if buf.max_off() > self.max_data() {
return Err(Error::FlowControl);
}
if let Some(fin_off) = self.fin_off {
if buf.max_off() > fin_off {
return Err(Error::FinalSize);
}
if buf.fin() && fin_off != buf.max_off() {
return Err(Error::FinalSize);
}
}
if buf.fin() && buf.max_off() < self.len {
return Err(Error::FinalSize);
}
if self.fin_off.is_some() && buf.is_empty() {
return Ok(());
}
if buf.fin() {
self.fin_off = Some(buf.max_off());
}
if !buf.fin() && buf.is_empty() {
return Ok(());
}
if self.off >= buf.max_off() {
if !buf.is_empty() {
return Ok(());
}
}
let mut tmp_bufs = VecDeque::with_capacity(2);
tmp_bufs.push_back(buf);
'tmp: while let Some(mut buf) = tmp_bufs.pop_front() {
if self.off_front() > buf.off() {
buf = buf.split_off((self.off_front() - buf.off()) as usize);
}
if buf.off() < self.max_off() || buf.is_empty() {
for (_, b) in self.data.range(buf.off()..) {
let off = buf.off();
if b.off() > buf.max_off() {
break;
}
if off >= b.off() && buf.max_off() <= b.max_off() {
continue 'tmp;
}
if off >= b.off() && off < b.max_off() {
buf = buf.split_off((b.max_off() - off) as usize);
}
if off < b.off() && buf.max_off() > b.off() {
tmp_bufs
.push_back(buf.split_off((b.off() - off) as usize));
}
}
}
self.len = cmp::max(self.len, buf.max_off());
if !self.drain {
self.data.insert(buf.max_off(), buf);
} else {
self.off = self.len;
}
}
Ok(())
}
#[inline]
pub fn emit(&mut self, mut out: &mut [u8]) -> Result<(usize, bool)> {
self.emit_or_discard(RecvAction::Emit { out: &mut out })
}
pub fn emit_or_discard<B: bytes::BufMut>(
&mut self, mut action: RecvAction<B>,
) -> Result<(usize, bool)> {
let mut len = 0;
let mut cap = match &action {
RecvAction::Emit { out } => out.remaining_mut(),
RecvAction::Discard { len } => *len,
};
if !self.ready() {
return Err(Error::Done);
}
if let Some(e) = self.error {
self.data.clear();
return Err(Error::StreamReset(e));
}
while cap > 0 && self.ready() {
let mut entry = match self.data.first_entry() {
Some(entry) => entry,
None => break,
};
let buf = entry.get_mut();
let buf_len = cmp::min(buf.len(), cap);
if let RecvAction::Emit { ref mut out } = action {
debug_assert!(
cap <= out.remaining_mut(),
"We updated `cap` incorrectly"
);
out.put_slice(&buf[..buf_len])
}
self.off += buf_len as u64;
len += buf_len;
cap -= buf_len;
if buf_len < buf.len() {
buf.consume(buf_len);
break;
}
entry.remove();
}
self.flow_control.add_consumed(len as u64);
Ok((len, self.is_fin()))
}
pub fn reset(
&mut self, error_code: u64, final_size: u64,
) -> Result<RecvBufResetReturn> {
if let Some(fin_off) = self.fin_off {
if fin_off != final_size {
return Err(Error::FinalSize);
}
}
if final_size < self.len {
return Err(Error::FinalSize);
}
if self.error.is_some() {
return Ok(RecvBufResetReturn::zero());
}
let result = RecvBufResetReturn {
max_data_delta: final_size - self.len,
consumed_flowcontrol: final_size - self.off,
};
self.error = Some(error_code);
self.off = final_size;
self.data.clear();
let buf = RangeBuf::from(b"", final_size, true);
self.write(buf)?;
Ok(result)
}
pub fn update_max_data(&mut self, now: Instant) {
self.flow_control.update_max_data(now);
}
pub fn max_data_next(&mut self) -> u64 {
self.flow_control.max_data_next()
}
pub fn max_data(&self) -> u64 {
self.flow_control.max_data()
}
pub fn window(&self) -> u64 {
self.flow_control.window()
}
pub fn autotune_window(&mut self, now: Instant, rtt: Duration) {
self.flow_control.autotune_window(now, rtt);
}
pub fn shutdown(&mut self) -> Result<u64> {
if self.drain {
return Err(Error::Done);
}
self.drain = true;
self.data.clear();
let consumed = self.max_off() - self.off;
self.off = self.max_off();
Ok(consumed)
}
pub fn off_front(&self) -> u64 {
self.off
}
pub fn almost_full(&self) -> bool {
self.fin_off.is_none() && self.flow_control.should_update_max_data()
}
pub fn max_off(&self) -> u64 {
self.len
}
pub fn is_fin(&self) -> bool {
if self.fin_off == Some(self.off) {
return true;
}
false
}
pub fn is_draining(&self) -> bool {
self.drain
}
pub fn ready(&self) -> bool {
let (_, buf) = match self.data.first_key_value() {
Some(v) => v,
None => return false,
};
buf.off() == self.off
}
#[cfg(test)]
pub(crate) fn flow_control_for_tests(&self) -> &flowcontrol::FlowControl {
&self.flow_control
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::stream::DEFAULT_STREAM_WINDOW;
use bytes::BufMut as _;
use rstest::rstest;
fn assert_emit_discard(
recv: &mut RecvBuf, emit: bool, target_len: usize, result_len: usize,
is_fin: bool, test_bytes: Option<&[u8]>,
) {
let mut buf = Vec::<u8>::with_capacity(512).limit(target_len);
let action = if emit {
RecvAction::Emit { out: &mut buf }
} else {
RecvAction::Discard { len: target_len }
};
let (read, fin) = recv.emit_or_discard(action).unwrap();
let buf = buf.into_inner();
if emit {
assert_eq!(buf.len(), read);
if let Some(v) = test_bytes {
assert_eq!(&buf, v);
}
}
assert_eq!(read, result_len);
assert_eq!(is_fin, fin);
}
fn assert_emit_discard_done(recv: &mut RecvBuf, emit: bool) {
let mut buf = [0u8; 32];
let action = if emit {
RecvAction::Emit {
out: &mut buf.as_mut_slice(),
}
} else {
RecvAction::Discard { len: 32 }
};
assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
}
#[rstest]
fn empty_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn empty_stream_frame(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(15, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let buf = RangeBuf::from(b"hello", 0, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert_emit_discard(&mut recv, emit, 32, 5, false, None);
let buf = RangeBuf::from(b"", 10, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 0);
let buf = RangeBuf::from(b"", 16, false);
assert_eq!(recv.write(buf), Err(Error::FlowControl));
let buf = RangeBuf::from(b"", 5, true);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
let buf = RangeBuf::from(b"", 5, true);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
let buf = RangeBuf::from(b"aa", 3, true);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
let buf = RangeBuf::from(b"", 6, true);
assert_eq!(recv.write(buf), Err(Error::FinalSize));
let buf = RangeBuf::from(b"", 4, true);
assert_eq!(recv.write(buf), Err(Error::FinalSize));
assert_emit_discard(&mut recv, emit, 32, 0, true, None);
}
#[rstest]
fn ordered_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, true);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 0);
assert_emit_discard_done(&mut recv, emit);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_emit_discard_done(&mut recv, emit);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_emit_discard(
&mut recv,
emit,
32,
19,
true,
Some(b"helloworldsomething"),
);
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn shutdown(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 0);
assert_emit_discard_done(&mut recv, emit);
assert_eq!(recv.shutdown(), Ok(10));
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 10);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 10);
assert_eq!(recv.data.len(), 0);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
assert_eq!(
recv.reset(42, 123),
Ok(RecvBufResetReturn {
max_data_delta: 104,
consumed_flowcontrol: 104,
})
);
assert_eq!(recv.len, 123);
assert_eq!(recv.off, 123);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn split_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_emit_discard(&mut recv, emit, 10, 10, false, Some(b"somethingh"));
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 10);
assert_emit_discard(&mut recv, emit, 5, 5, false, Some(b"ellow"));
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 15);
assert_emit_discard(&mut recv, emit, 5, 4, true, Some(b"orld"));
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
}
#[test]
fn split_read_incremental_buf() {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let mut buf = Vec::new().limit(10);
assert_eq!(
recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
Ok((10, false))
);
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 10);
assert_eq!(buf.get_ref().len(), 10);
assert_eq!(buf.get_ref().as_slice(), b"somethingh");
buf.set_limit(5);
assert_eq!(
recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
Ok((5, false))
);
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 15);
assert_eq!(buf.get_ref().len(), 15);
assert_eq!(buf.get_ref().as_slice(), b"somethinghellow");
buf.set_limit(42);
assert_eq!(
recv.emit_or_discard(RecvAction::Emit { out: &mut buf }),
Ok((4, true))
);
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
assert_eq!(buf.get_ref().len(), 19);
assert_eq!(buf.get_ref().as_slice(), b"somethinghelloworld");
}
#[rstest]
fn incomplete_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let mut buf = [0u8; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let action = if emit {
RecvAction::Emit {
out: &mut buf.as_mut_slice(),
}
} else {
RecvAction::Discard { len: 32 }
};
assert_eq!(recv.emit_or_discard(action), Err(Error::Done));
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_emit_discard(
&mut recv,
emit,
32,
19,
true,
Some(b"somethinghelloworld"),
);
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
}
#[rstest]
fn zero_len_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"", 9, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert_emit_discard(&mut recv, emit, 32, 9, true, Some(b"something"));
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
}
#[rstest]
fn past_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
let third = RangeBuf::from(b"ello", 4, true);
let fourth = RangeBuf::from(b"ello", 5, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.write(third), Err(Error::FinalSize));
assert!(recv.write(fourth).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn fully_overlapping_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 4, false);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"something"));
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn fully_overlapping_read2(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 4, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somehello"));
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn fully_overlapping_read3(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 8);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
assert_emit_discard(&mut recv, emit, 32, 9, false, Some(b"somhellog"));
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn fully_overlapping_read_multi(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"somethingsomething", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
let third = RangeBuf::from(b"hello", 12, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 8);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 17);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 18);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
assert_emit_discard(
&mut recv,
emit,
32,
18,
false,
Some(b"somhellogsomhellog"),
);
assert_eq!(recv.len, 18);
assert_eq!(recv.off, 18);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn overlapping_start_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 8, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert_emit_discard(
&mut recv,
emit,
32,
13,
true,
Some(b"somethingello"),
);
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 13);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn overlapping_end_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"something", 3, true);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert_emit_discard(&mut recv, emit, 32, 12, true, Some(b"helsomething"));
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 12);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn overlapping_end_twice_read(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"he", 0, false);
let second = RangeBuf::from(b"ow", 4, false);
let third = RangeBuf::from(b"rl", 7, false);
let fourth = RangeBuf::from(b"helloworld", 0, true);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
assert!(recv.write(fourth).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 6);
assert_emit_discard(&mut recv, emit, 32, 10, true, Some(b"helloworld"));
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 10);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn overlapping_end_twice_and_contained_read(
#[values(true, false)] emit: bool,
) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"hellow", 0, false);
let second = RangeBuf::from(b"barfoo", 10, true);
let third = RangeBuf::from(b"rl", 7, false);
let fourth = RangeBuf::from(b"elloworldbarfoo", 1, true);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 16);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 16);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
assert!(recv.write(fourth).is_ok());
assert_eq!(recv.len, 16);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
assert_emit_discard(
&mut recv,
emit,
32,
16,
true,
Some(b"helloworldbarfoo"),
);
assert_eq!(recv.len, 16);
assert_eq!(recv.off, 16);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn partially_multi_overlapping_reordered_read(
#[values(true, false)] emit: bool,
) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"hello", 8, false);
let second = RangeBuf::from(b"something", 0, false);
let third = RangeBuf::from(b"moar", 11, true);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 15);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
assert_emit_discard(
&mut recv,
emit,
32,
15,
true,
Some(b"somethinhelloar"),
);
assert_eq!(recv.len, 15);
assert_eq!(recv.off, 15);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
}
#[rstest]
fn partially_multi_overlapping_reordered_read2(
#[values(true, false)] emit: bool,
) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"aaa", 0, false);
let second = RangeBuf::from(b"bbb", 2, false);
let third = RangeBuf::from(b"ccc", 4, false);
let fourth = RangeBuf::from(b"ddd", 6, false);
let fifth = RangeBuf::from(b"eee", 9, false);
let sixth = RangeBuf::from(b"fff", 11, false);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.write(fourth).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 4);
assert!(recv.write(sixth).is_ok());
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
assert!(recv.write(fifth).is_ok());
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 6);
assert_emit_discard(
&mut recv,
emit,
32,
14,
false,
Some(b"aabbbcdddeefff"),
);
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 14);
assert_eq!(recv.data.len(), 0);
assert_emit_discard_done(&mut recv, emit);
}
#[test]
fn mixed_read_actions() {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, true);
assert!(recv.write(second).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 0);
assert_emit_discard_done(&mut recv, true);
assert_emit_discard_done(&mut recv, false);
assert!(recv.write(third).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_emit_discard_done(&mut recv, true);
assert_emit_discard_done(&mut recv, false);
assert!(recv.write(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_emit_discard(&mut recv, true, 5, 5, false, Some(b"hello"));
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 5);
assert_emit_discard(&mut recv, false, 5, 5, false, None);
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 10);
assert_emit_discard(&mut recv, true, 9, 9, true, Some(b"something"));
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
assert_emit_discard_done(&mut recv, true);
assert_emit_discard_done(&mut recv, false);
}
}