use std::cmp;
use std::collections::hash_map;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use crate::Error;
use crate::Result;
/// Largest number of bytes stored in a single buffer when
/// `SendBuf::push_slice` splits the caller's data into chunks.
const MAX_WRITE_SIZE: usize = 1000;
/// Collection of streams, together with per-direction stream budgets and the
/// sets/queues of stream IDs that are flushable, readable, writable or almost
/// full.
#[derive(Default)]
pub struct StreamMap {
/// All known streams, keyed by stream ID.
streams: HashMap<u64, Stream>,
/// Budget consumed when a local bidirectional stream is created.
peer_max_streams_bidi: u64,
/// Budget consumed when a local unidirectional stream is created.
peer_max_streams_uni: u64,
/// Budget consumed when a non-local bidirectional stream is created.
local_max_streams_bidi: u64,
/// Budget consumed when a non-local unidirectional stream is created.
local_max_streams_uni: u64,
/// FIFO queue of stream IDs fed by `push_flushable()` and drained by
/// `pop_flushable()`.
flushable: VecDeque<u64>,
/// IDs of streams currently marked readable.
readable: HashSet<u64>,
/// IDs of streams currently marked writable.
writable: HashSet<u64>,
/// IDs of streams currently marked as almost full.
almost_full: HashSet<u64>,
}
impl StreamMap {
pub fn get(&self, id: u64) -> Option<&Stream> {
self.streams.get(&id)
}
pub fn get_mut(&mut self, id: u64) -> Option<&mut Stream> {
self.streams.get_mut(&id)
}
pub(crate) fn get_or_create(
&mut self, id: u64, local_params: &crate::TransportParams,
peer_params: &crate::TransportParams, local: bool, is_server: bool,
) -> Result<&mut Stream> {
let stream = match self.streams.entry(id) {
hash_map::Entry::Vacant(v) => {
if local != is_local(id, is_server) {
return Err(Error::InvalidStreamState);
}
let (max_rx_data, max_tx_data) = match (local, is_bidi(id)) {
(true, true) => (
local_params.initial_max_stream_data_bidi_local,
peer_params.initial_max_stream_data_bidi_remote,
),
(true, false) => (0, peer_params.initial_max_stream_data_uni),
(false, true) => (
local_params.initial_max_stream_data_bidi_remote,
peer_params.initial_max_stream_data_bidi_local,
),
(false, false) =>
(local_params.initial_max_stream_data_uni, 0),
};
match (is_local(id, is_server), is_bidi(id)) {
(true, true) =>
self.peer_max_streams_bidi = self
.peer_max_streams_bidi
.checked_sub(1)
.ok_or(Error::StreamLimit)?,
(true, false) =>
self.peer_max_streams_uni = self
.peer_max_streams_uni
.checked_sub(1)
.ok_or(Error::StreamLimit)?,
(false, true) =>
self.local_max_streams_bidi = self
.local_max_streams_bidi
.checked_sub(1)
.ok_or(Error::StreamLimit)?,
(false, false) =>
self.local_max_streams_uni = self
.local_max_streams_uni
.checked_sub(1)
.ok_or(Error::StreamLimit)?,
};
let s = Stream::new(max_rx_data, max_tx_data);
v.insert(s)
},
hash_map::Entry::Occupied(v) => v.into_mut(),
};
if stream.is_writable() {
self.writable.insert(id);
}
Ok(stream)
}
pub fn push_flushable(&mut self, stream_id: u64) {
self.flushable.push_back(stream_id);
}
pub fn pop_flushable(&mut self) -> Option<u64> {
self.flushable.pop_front()
}
pub fn mark_readable(&mut self, stream_id: u64, readable: bool) {
if readable {
self.readable.insert(stream_id);
} else {
self.readable.remove(&stream_id);
}
}
pub fn mark_writable(&mut self, stream_id: u64, writable: bool) {
if writable {
self.writable.insert(stream_id);
} else {
self.writable.remove(&stream_id);
}
}
pub fn mark_almost_full(&mut self, stream_id: u64, almost_full: bool) {
if almost_full {
self.almost_full.insert(stream_id);
} else {
self.almost_full.remove(&stream_id);
}
}
pub fn update_local_max_streams_bidi(&mut self, v: u64) {
self.local_max_streams_bidi = cmp::max(self.local_max_streams_bidi, v);
}
pub fn update_local_max_streams_uni(&mut self, v: u64) {
self.local_max_streams_uni = cmp::max(self.local_max_streams_uni, v);
}
pub fn update_peer_max_streams_bidi(&mut self, v: u64) {
self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, v);
}
pub fn update_peer_max_streams_uni(&mut self, v: u64) {
self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, v);
}
pub fn readable(&self) -> StreamIter {
StreamIter::from(&self.readable)
}
pub fn writable(&self) -> StreamIter {
StreamIter::from(&self.writable)
}
pub fn almost_full(&self) -> StreamIter {
StreamIter::from(&self.almost_full)
}
pub fn has_flushable(&self) -> bool {
!self.flushable.is_empty()
}
pub fn has_almost_full(&self) -> bool {
!self.almost_full.is_empty()
}
#[cfg(test)]
pub fn iter_mut(&mut self) -> hash_map::IterMut<u64, Stream> {
self.streams.iter_mut()
}
}
/// A single stream, made of a receive-side and a send-side buffer.
#[derive(Default)]
pub struct Stream {
/// Buffer holding data received on the stream.
pub recv: RecvBuf,
/// Buffer holding data queued to be sent on the stream.
pub send: SendBuf,
}
impl Stream {
    /// Builds a stream whose receive buffer is limited to `max_rx_data` and
    /// whose send buffer is limited to `max_tx_data`.
    pub fn new(max_rx_data: u64, max_tx_data: u64) -> Stream {
        let recv = RecvBuf::new(max_rx_data);
        let send = SendBuf::new(max_tx_data);

        Stream { recv, send }
    }

    /// Returns true if the receive buffer has data ready to be read.
    pub fn is_readable(&self) -> bool {
        self.recv.ready()
    }

    /// Returns true if the send side still accepts data: it was not shut
    /// down, is not finished, and its write offset is below the send limit.
    pub fn is_writable(&self) -> bool {
        let send = &self.send;

        if send.shutdown || send.is_fin() {
            return false;
        }

        send.off < send.max_data
    }

    /// Returns true if the send buffer holds data whose starting offset is
    /// below the send limit.
    pub fn is_flushable(&self) -> bool {
        let send = &self.send;

        send.ready() && send.off() < send.max_data
    }
}
/// Returns true if the stream was created by this endpoint.
///
/// A stream counts as local when the least significant bit of its ID is set
/// exactly when `is_server` is true.
pub fn is_local(stream_id: u64, is_server: bool) -> bool {
    let low_bit_set = stream_id & 0x1 == 1;

    low_bit_set == is_server
}
/// Returns true if the stream is bidirectional, i.e. the second least
/// significant bit of its ID is clear.
pub fn is_bidi(stream_id: u64) -> bool {
    stream_id & 0x2 != 0x2
}
/// An iterator over a snapshot of stream IDs.
///
/// The IDs are copied out of the source set when the iterator is built, so
/// later changes to the set are not reflected. The iteration order is
/// unspecified.
#[derive(Default)]
pub struct StreamIter {
    /// IDs that have not been yielded yet.
    streams: Vec<u64>,
}

impl StreamIter {
    /// Builds an iterator that yields every ID currently in `streams`.
    fn from(streams: &HashSet<u64>) -> Self {
        StreamIter {
            streams: streams.iter().copied().collect(),
        }
    }
}

impl Iterator for StreamIter {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        self.streams.pop()
    }

    // `ExactSizeIterator` requires `size_hint()` to report the exact number
    // of remaining items; the default implementation returns `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.streams.len();

        (remaining, Some(remaining))
    }
}

impl ExactSizeIterator for StreamIter {
    fn len(&self) -> usize {
        self.streams.len()
    }
}
/// Receive-side stream buffer.
///
/// Incoming data may arrive out of order; it is held in a heap until it can
/// be read contiguously via `pop()`.
#[derive(Default)]
pub struct RecvBuf {
/// Received chunks that have not been read yet.
data: BinaryHeap<RangeBuf>,
/// Offset up to which data has been read; advanced by `pop()`.
off: u64,
/// Highest end offset among the chunks stored by `push()`.
len: u64,
/// Highest offset `push()` accepts before returning `FlowControl`.
max_data: u64,
/// Next value for `max_data`; grows as data is read and is applied by
/// `update_max_data()`.
max_data_next: u64,
/// Final offset of the stream, once known.
fin_off: Option<u64>,
/// Set by `shutdown()`; when true `push()` no longer stores data.
drain: bool,
}
impl RecvBuf {
    /// Creates an empty receive buffer that accepts data up to `max_data`.
    fn new(max_data: u64) -> RecvBuf {
        RecvBuf {
            max_data,
            max_data_next: max_data,
            ..Default::default()
        }
    }

    /// Stores a received chunk, discarding any part that was already read
    /// or that overlaps a chunk already stored.
    ///
    /// # Errors
    ///
    /// * `Error::FlowControl` if the chunk ends beyond `max_data`.
    /// * `Error::FinalSize` if the chunk contradicts a known final offset,
    ///   or declares a final offset below data already received.
    pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
        let end = buf.max_off();
        let is_fin = buf.fin();
        let empty = buf.is_empty();

        if end > self.max_data {
            return Err(Error::FlowControl);
        }

        if let Some(fin_off) = self.fin_off {
            // Data past the final offset, or a different final offset.
            let past_fin = end > fin_off;
            let moves_fin = is_fin && fin_off != end;

            if past_fin || moves_fin {
                return Err(Error::FinalSize);
            }
        }

        // The final offset can't be lower than what was already received.
        if is_fin && end < self.len {
            return Err(Error::FinalSize);
        }

        // Final offset already known and nothing new to store.
        if self.fin_off.is_some() && empty {
            return Ok(());
        }

        if is_fin {
            self.fin_off = Some(end);
        }

        // An empty chunk that doesn't carry fin has nothing to offer.
        if !is_fin && empty {
            return Ok(());
        }

        // Everything in the chunk was read already. An empty fin chunk is
        // deliberately let through.
        if self.off >= end && !empty {
            return Ok(());
        }

        if self.drain {
            return Ok(());
        }

        let mut pending = Some(buf);

        while let Some(mut chunk) = pending.take() {
            // Drop the part that lies before the read offset.
            if self.off > chunk.off() {
                chunk = chunk.split_off((self.off - chunk.off()) as usize);
            }

            for stored in self.data.iter() {
                // Completely covered by a stored chunk: nothing to add.
                if chunk.off() >= stored.off() &&
                    chunk.max_off() <= stored.max_off()
                {
                    return Ok(());
                }

                // Starts inside a stored chunk: keep only what follows it.
                if chunk.off() >= stored.off() && chunk.off() < stored.max_off()
                {
                    chunk =
                        chunk.split_off((stored.max_off() - chunk.off()) as usize);
                }

                // Ends inside (or beyond) a stored chunk: keep the leading
                // part now and re-process the rest in the next round.
                if chunk.off() < stored.off() && chunk.max_off() > stored.off()
                {
                    pending = Some(
                        chunk.split_off((stored.off() - chunk.off()) as usize),
                    );
                }
            }

            self.len = cmp::max(self.len, chunk.max_off());
            self.data.push(chunk);
        }

        Ok(())
    }

    /// Copies contiguous data starting at the read offset into `out`.
    ///
    /// Returns the number of bytes written and whether the read offset
    /// reached the final offset.
    ///
    /// # Errors
    ///
    /// `Error::Done` if no chunk starts at the current read offset.
    pub fn pop(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
        if !self.ready() {
            return Err(Error::Done);
        }

        let mut written = 0;
        let mut remaining = out.len();

        while remaining > 0 && self.ready() {
            let mut head = match self.data.pop() {
                Some(v) => v,
                None => break,
            };

            // Chunk larger than the space left: put the excess back.
            if head.len() > remaining {
                let tail = head.split_off(remaining);
                self.data.push(tail);
            }

            let n = head.len();
            out[written..written + n].copy_from_slice(&head.data);

            self.off += n as u64;
            written += n;
            remaining -= n;
        }

        // Data was consumed, so the future flow control limit can grow.
        self.max_data_next = self.max_data_next.saturating_add(written as u64);

        Ok((written, self.is_fin()))
    }

    /// Records `final_size` as the final offset of the stream.
    ///
    /// Returns the difference between `final_size` and the highest offset
    /// received so far.
    ///
    /// # Errors
    ///
    /// `Error::FinalSize` if a different final offset is already known, or
    /// if `final_size` is below the highest offset received.
    pub fn reset(&mut self, final_size: u64) -> Result<usize> {
        let conflicts =
            self.fin_off.map_or(false, |fin_off| fin_off != final_size);

        if conflicts || final_size < self.len {
            return Err(Error::FinalSize);
        }

        self.fin_off = Some(final_size);

        Ok((final_size - self.len) as usize)
    }

    /// Applies the pending flow control limit and returns the new value.
    pub fn update_max_data(&mut self) -> u64 {
        self.max_data = self.max_data_next;
        self.max_data
    }

    /// Stops storing incoming data and drops everything buffered.
    ///
    /// # Errors
    ///
    /// `Error::Done` if the buffer was already shut down.
    pub fn shutdown(&mut self) -> Result<()> {
        if self.drain {
            return Err(Error::Done);
        }

        self.drain = true;
        self.data.clear();

        Ok(())
    }

    /// Returns true when the final offset is unknown, a larger limit is
    /// pending, and the space left under the current limit is smaller than
    /// half of the pending limit.
    pub fn almost_full(&self) -> bool {
        if self.fin_off.is_some() || self.max_data_next == self.max_data {
            return false;
        }

        self.max_data_next / 2 > self.max_data - self.len
    }

    /// Returns true once the read offset equals the final offset.
    pub fn is_fin(&self) -> bool {
        self.fin_off == Some(self.off)
    }

    /// Returns true if the lowest-offset chunk starts at the read offset.
    fn ready(&self) -> bool {
        self.data.peek().map_or(false, |buf| buf.off == self.off)
    }
}
/// Send-side stream buffer.
///
/// Data is queued by the application and later taken out with `pop()`;
/// buffers can be pushed back for retransmission.
#[derive(Default)]
pub struct SendBuf {
/// Chunks waiting to be sent.
data: BinaryHeap<RangeBuf>,
/// Offset at which `push_slice()` places the next application data.
off: u64,
/// Number of bytes currently queued (added by `push()`, removed by
/// `pop()`).
len: u64,
/// Highest offset that may be sent; `pop()` never returns data past it.
max_data: u64,
/// Offset advanced by `ack()`.
ack_off: u64,
/// Final offset of the stream, once known.
fin_off: Option<u64>,
/// Set by `shutdown()`; when true new data is silently discarded.
shutdown: bool,
}
impl SendBuf {
    /// Creates an empty send buffer that may send up to offset `max_data`.
    fn new(max_data: u64) -> SendBuf {
        SendBuf {
            max_data,
            ..SendBuf::default()
        }
    }

    /// Queues application data at the current write offset.
    ///
    /// The data is truncated to the available capacity (in which case `fin`
    /// is dropped) and split into chunks of at most `MAX_WRITE_SIZE` bytes.
    /// Returns the number of bytes accepted. After `shutdown()` the data is
    /// discarded but reported as fully accepted.
    ///
    /// # Errors
    ///
    /// Propagates `Error::FinalSize` from `push()`.
    pub fn push_slice(
        &mut self, mut data: &[u8], mut fin: bool,
    ) -> Result<usize> {
        if self.shutdown {
            return Ok(data.len());
        }

        if data.is_empty() {
            // Still push an empty buffer so that `fin` gets recorded.
            let buf = RangeBuf::from(&[], self.off, fin);
            return self.push(buf).map(|_| 0);
        }

        let capacity = self.cap();

        if data.len() > capacity {
            // Only part of the data fits, so this can't be the end of the
            // stream.
            data = &data[..capacity];
            fin = false;
        }

        let mut queued = 0;

        for chunk in data.chunks(MAX_WRITE_SIZE) {
            queued += chunk.len();

            // Only the last chunk carries the fin flag.
            let fin = fin && queued == data.len();

            let buf = RangeBuf::from(chunk, self.off, fin);
            self.push(buf)?;

            self.off += chunk.len() as u64;
        }

        Ok(data.len())
    }

    /// Queues a buffer, either new data or data being retransmitted.
    ///
    /// # Errors
    ///
    /// `Error::FinalSize` if the buffer ends past the known final offset, or
    /// ends exactly at it without carrying the fin flag.
    pub fn push(&mut self, buf: RangeBuf) -> Result<()> {
        if let Some(fin_off) = self.fin_off {
            // Can't write past the final offset.
            if buf.max_off() > fin_off {
                return Err(Error::FinalSize);
            }

            // Can't "undo" the final offset.
            if buf.max_off() == fin_off && !buf.fin() {
                return Err(Error::FinalSize);
            }
        }

        if self.shutdown {
            return Ok(());
        }

        // Record the final offset *before* the ACK check below: an empty fin
        // buffer on a fresh stream, or one pushed after all previous data
        // was acknowledged, satisfies `ack_off >= max_off` and would
        // otherwise lose its fin.
        if buf.fin() {
            self.fin_off = Some(buf.max_off());
        }

        // Don't queue data that was already fully acknowledged.
        if self.ack_off >= buf.max_off() {
            return Ok(());
        }

        self.len += buf.len() as u64;

        // The final offset is already recorded, so an empty buffer has
        // nothing left to contribute.
        if buf.is_empty() {
            return Ok(());
        }

        self.data.push(buf);

        Ok(())
    }

    /// Takes up to `max_data` contiguous bytes out of the buffer, starting
    /// at the lowest queued offset and never going past the send limit.
    ///
    /// The returned buffer has its fin flag set when it ends at the final
    /// offset. It is empty when nothing can be sent.
    pub fn pop(&mut self, max_data: usize) -> Result<RangeBuf> {
        let mut out = RangeBuf::default();
        out.data =
            Vec::with_capacity(cmp::min(max_data as u64, self.len) as usize);

        let mut out_len = max_data;
        let mut out_off = self.data.peek().map_or(0, RangeBuf::off);

        while out_len > 0 &&
            self.ready() &&
            self.off() == out_off &&
            self.off() < self.max_data
        {
            let mut buf = match self.data.pop() {
                Some(v) => v,
                None => break,
            };

            if buf.len() > out_len || buf.max_off() > self.max_data {
                // Compare in u64 and narrow afterwards: the result is never
                // larger than `out_len`, so it always fits in a usize,
                // whereas narrowing the flow control window first could
                // truncate it on 32-bit targets.
                let new_len =
                    cmp::min(out_len as u64, self.max_data - buf.off())
                        as usize;

                let new_buf = buf.split_off(new_len);
                self.data.push(new_buf);
            }

            if out.is_empty() {
                out.off = buf.off;
            }

            self.len -= buf.len() as u64;

            out_len -= buf.len();
            out_off = buf.max_off();

            out.data.extend_from_slice(&buf.data);
        }

        out.fin = self.fin_off == Some(out.max_off());

        Ok(out)
    }

    /// Raises the send limit to `max_data` if that is larger than the
    /// current limit.
    pub fn update_max_data(&mut self, max_data: u64) {
        self.max_data = cmp::max(self.max_data, max_data);
    }

    /// Advances the acknowledged offset by `len` when the acknowledged range
    /// starts exactly at it.
    ///
    /// NOTE(review): ranges starting anywhere else are ignored; confirm this
    /// is intended for out-of-order acknowledgements.
    pub fn ack(&mut self, off: u64, len: usize) {
        if self.ack_off == off {
            self.ack_off += len as u64;
        }
    }

    /// Stops accepting data and drops everything queued.
    ///
    /// # Errors
    ///
    /// `Error::Done` if the buffer was already shut down.
    pub fn shutdown(&mut self) -> Result<()> {
        if self.shutdown {
            return Err(Error::Done);
        }

        self.shutdown = true;
        self.data.clear();

        Ok(())
    }

    /// Returns true once the write offset equals the final offset.
    pub fn is_fin(&self) -> bool {
        self.fin_off == Some(self.off)
    }

    /// Returns true if there is queued data.
    fn ready(&self) -> bool {
        !self.data.is_empty()
    }

    /// Returns the lowest queued offset, or the write offset when nothing is
    /// queued.
    fn off(&self) -> u64 {
        match self.data.peek() {
            Some(v) => v.off(),
            None => self.off,
        }
    }

    /// Returns how many more bytes can be written before hitting the send
    /// limit.
    ///
    /// The value saturates at `usize::MAX` instead of being truncated, so a
    /// window wider than the platform's address space is never reported as
    /// a small (or zero) capacity on 32-bit targets.
    pub fn cap(&self) -> usize {
        let window = self.max_data.saturating_sub(self.off);

        cmp::min(window, std::usize::MAX as u64) as usize
    }
}
/// A chunk of stream data together with its position in the stream.
///
/// Comparison and equality look at the offset only, and the ordering is
/// reversed so that a `BinaryHeap<RangeBuf>` yields the lowest offset first.
#[derive(Clone, Debug, Default, Eq)]
pub struct RangeBuf {
    /// The bytes of the chunk.
    data: Vec<u8>,
    /// Offset of the first byte within the stream.
    off: u64,
    /// Whether the chunk is flagged as the end of the stream.
    fin: bool,
}

impl RangeBuf {
    /// Creates a chunk by copying `buf`, placed at offset `off`.
    pub(crate) fn from(buf: &[u8], off: u64, fin: bool) -> RangeBuf {
        let data = buf.to_vec();

        RangeBuf { data, off, fin }
    }

    /// Returns whether the chunk is flagged as the end of the stream.
    pub fn fin(&self) -> bool {
        self.fin
    }

    /// Returns the offset of the first byte of the chunk.
    pub fn off(&self) -> u64 {
        self.off
    }

    /// Returns the offset just past the last byte of the chunk.
    pub fn max_off(&self) -> u64 {
        self.off + self.data.len() as u64
    }

    /// Returns the number of bytes in the chunk.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns true if the chunk holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Splits the chunk in two at index `at`.
    ///
    /// `self` keeps the first `at` bytes and loses its fin flag; the
    /// returned chunk holds the rest and inherits the original fin flag.
    ///
    /// # Panics
    ///
    /// Panics if `at` is greater than the chunk length.
    pub fn split_off(&mut self, at: usize) -> RangeBuf {
        let tail = self.data.split_off(at);
        let carried_fin = std::mem::replace(&mut self.fin, false);

        RangeBuf {
            data: tail,
            off: self.off + at as u64,
            fin: carried_fin,
        }
    }
}

impl std::ops::Deref for RangeBuf {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        self.data.as_slice()
    }
}

impl std::ops::DerefMut for RangeBuf {
    fn deref_mut(&mut self) -> &mut [u8] {
        self.data.as_mut_slice()
    }
}

impl Ord for RangeBuf {
    fn cmp(&self, other: &RangeBuf) -> cmp::Ordering {
        // Operands are swapped on purpose: lower offsets compare as greater.
        other.off.cmp(&self.off)
    }
}

impl PartialOrd for RangeBuf {
    fn partial_cmp(&self, other: &RangeBuf) -> Option<cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl PartialEq for RangeBuf {
    fn eq(&self, other: &RangeBuf) -> bool {
        other.off == self.off
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empty_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn empty_stream_frame() {
let mut recv = RecvBuf::new(15);
assert_eq!(recv.len, 0);
let buf = RangeBuf::from(b"hello", 0, false);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let mut buf = [0; 32];
assert_eq!(recv.pop(&mut buf), Ok((5, false)));
let buf = RangeBuf::from(b"", 10, false);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 0);
let buf = RangeBuf::from(b"", 16, false);
assert_eq!(recv.push(buf), Err(Error::FlowControl));
let buf = RangeBuf::from(b"", 5, true);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
let buf = RangeBuf::from(b"", 5, true);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
let buf = RangeBuf::from(b"aa", 3, true);
assert!(recv.push(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 1);
let buf = RangeBuf::from(b"", 6, true);
assert_eq!(recv.push(buf), Err(Error::FinalSize));
let buf = RangeBuf::from(b"", 4, true);
assert_eq!(recv.push(buf), Err(Error::FinalSize));
let mut buf = [0; 32];
assert_eq!(recv.pop(&mut buf), Ok((0, true)));
}
#[test]
fn ordered_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, true);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
assert!(recv.push(third).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 19);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"helloworldsomething");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn split_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
assert_eq!(len, 10);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somethingh");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 10);
let (len, fin) = recv.pop(&mut buf[..5]).unwrap();
assert_eq!(len, 5);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"ellow");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 15);
let (len, fin) = recv.pop(&mut buf[..10]).unwrap();
assert_eq!(len, 4);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"orld");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
}
#[test]
fn incomplete_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"helloworld", 9, true);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 0);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 19);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethinghelloworld");
assert_eq!(recv.len, 19);
assert_eq!(recv.off, 19);
}
#[test]
fn zero_len_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"", 9, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
}
#[test]
fn past_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
let third = RangeBuf::from(b"ello", 4, true);
let fourth = RangeBuf::from(b"ello", 5, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.push(third), Err(Error::FinalSize));
assert!(recv.push(fourth).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 4, false);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"something");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read2() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 4, false);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somehello");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read3() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 8);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 9);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somhellog");
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 9);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn fully_overlapping_read_multi() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"somethingsomething", 0, false);
let second = RangeBuf::from(b"hello", 3, false);
let third = RangeBuf::from(b"hello", 12, false);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 8);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(third).is_ok());
assert_eq!(recv.len, 17);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 18);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 18);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"somhellogsomhellog");
assert_eq!(recv.len, 18);
assert_eq!(recv.off, 18);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn overlapping_start_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"something", 0, false);
let second = RangeBuf::from(b"hello", 8, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 13);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethingello");
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 13);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn overlapping_end_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"something", 3, true);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 12);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"helsomething");
assert_eq!(recv.len, 12);
assert_eq!(recv.off, 12);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn partially_multi_overlapping_reordered_read() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 8, false);
let second = RangeBuf::from(b"something", 0, false);
let third = RangeBuf::from(b"moar", 11, true);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 13);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.push(third).is_ok());
assert_eq!(recv.len, 15);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 15);
assert_eq!(fin, true);
assert_eq!(&buf[..len], b"somethinhelloar");
assert_eq!(recv.len, 15);
assert_eq!(recv.off, 15);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn partially_multi_overlapping_reordered_read2() {
let mut recv = RecvBuf::new(std::u64::MAX);
assert_eq!(recv.len, 0);
let mut buf = [0; 32];
let first = RangeBuf::from(b"aaa", 0, false);
let second = RangeBuf::from(b"bbb", 2, false);
let third = RangeBuf::from(b"ccc", 4, false);
let fourth = RangeBuf::from(b"ddd", 6, false);
let fifth = RangeBuf::from(b"eee", 9, false);
let sixth = RangeBuf::from(b"fff", 11, false);
assert!(recv.push(second).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);
assert!(recv.push(fourth).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 2);
assert!(recv.push(third).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 3);
assert!(recv.push(first).is_ok());
assert_eq!(recv.len, 9);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 4);
assert!(recv.push(sixth).is_ok());
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 5);
assert!(recv.push(fifth).is_ok());
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 6);
let (len, fin) = recv.pop(&mut buf).unwrap();
assert_eq!(len, 14);
assert_eq!(fin, false);
assert_eq!(&buf[..len], b"aabbbcdddeefff");
assert_eq!(recv.len, 14);
assert_eq!(recv.off, 14);
assert_eq!(recv.data.len(), 0);
assert_eq!(recv.pop(&mut buf), Err(Error::Done));
}
#[test]
fn empty_write() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let write = send.pop(std::usize::MAX).unwrap();
assert_eq!(write.len(), 0);
assert_eq!(write.fin(), false);
}
#[test]
fn multi_write() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.push_slice(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.push_slice(second, true).is_ok());
assert_eq!(send.len, 19);
let write = send.pop(128).unwrap();
assert_eq!(write.len(), 19);
assert_eq!(write.fin(), true);
assert_eq!(&write[..], b"somethinghelloworld");
assert_eq!(send.len, 0);
}
#[test]
fn split_write() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.push_slice(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.push_slice(second, true).is_ok());
assert_eq!(send.len, 19);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 10);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"somethingh");
assert_eq!(send.len, 9);
let write = send.pop(5).unwrap();
assert_eq!(write.off(), 10);
assert_eq!(write.len(), 5);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"ellow");
assert_eq!(send.len, 4);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 15);
assert_eq!(write.len(), 4);
assert_eq!(write.fin(), true);
assert_eq!(&write[..], b"orld");
assert_eq!(send.len, 0);
}
#[test]
fn resend() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
assert_eq!(send.off(), 0);
let first = b"something";
let second = b"helloworld";
assert!(send.push_slice(first, false).is_ok());
assert_eq!(send.off(), 0);
assert!(send.push_slice(second, true).is_ok());
assert_eq!(send.off(), 0);
let write1 = send.pop(4).unwrap();
assert_eq!(write1.off(), 0);
assert_eq!(write1.len(), 4);
assert_eq!(write1.fin(), false);
assert_eq!(&write1[..], b"some");
assert_eq!(send.len, 15);
assert_eq!(send.off(), 4);
let write2 = send.pop(5).unwrap();
assert_eq!(write2.off(), 4);
assert_eq!(write2.len(), 5);
assert_eq!(write2.fin(), false);
assert_eq!(&write2[..], b"thing");
assert_eq!(send.len, 10);
assert_eq!(send.off(), 9);
let write3 = send.pop(5).unwrap();
assert_eq!(write3.off(), 9);
assert_eq!(write3.len(), 5);
assert_eq!(write3.fin(), false);
assert_eq!(&write3[..], b"hello");
assert_eq!(send.len, 5);
assert_eq!(send.off(), 14);
send.push(write2).unwrap();
assert_eq!(send.len, 10);
assert_eq!(send.off(), 4);
send.push(write1).unwrap();
assert_eq!(send.len, 14);
assert_eq!(send.off(), 0);
let write4 = send.pop(11).unwrap();
assert_eq!(write4.off(), 0);
assert_eq!(write4.len(), 9);
assert_eq!(write4.fin(), false);
assert_eq!(&write4[..], b"something");
assert_eq!(send.len, 5);
assert_eq!(send.off(), 14);
let write5 = send.pop(11).unwrap();
assert_eq!(write5.off(), 14);
assert_eq!(write5.len(), 5);
assert_eq!(write5.fin(), true);
assert_eq!(&write5[..], b"world");
assert_eq!(send.len, 0);
assert_eq!(send.off(), 19);
}
#[test]
fn write_blocked_by_off() {
let mut send = SendBuf::default();
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert_eq!(send.push_slice(first, false), Ok(0));
assert_eq!(send.len, 0);
assert_eq!(send.push_slice(second, true), Ok(0));
assert_eq!(send.len, 0);
send.update_max_data(5);
assert_eq!(send.push_slice(first, false), Ok(5));
assert_eq!(send.len, 5);
assert_eq!(send.push_slice(second, true), Ok(0));
assert_eq!(send.len, 5);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 5);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"somet");
assert_eq!(send.len, 0);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 0);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"");
assert_eq!(send.len, 0);
send.update_max_data(15);
assert_eq!(send.push_slice(&first[5..], false), Ok(4));
assert_eq!(send.len, 4);
assert_eq!(send.push_slice(second, true), Ok(6));
assert_eq!(send.len, 10);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 5);
assert_eq!(write.len(), 10);
assert_eq!(write.fin(), false);
assert_eq!(&write[..], b"hinghellow");
assert_eq!(send.len, 0);
send.update_max_data(25);
assert_eq!(send.push_slice(&second[6..], true), Ok(4));
assert_eq!(send.len, 4);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 15);
assert_eq!(write.len(), 4);
assert_eq!(write.fin(), true);
assert_eq!(&write[..], b"orld");
assert_eq!(send.len, 0);
}
#[test]
fn zero_len_write() {
let mut send = SendBuf::new(std::u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
assert!(send.push_slice(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.push_slice(&[], true).is_ok());
assert_eq!(send.len, 9);
let write = send.pop(10).unwrap();
assert_eq!(write.off(), 0);
assert_eq!(write.len(), 9);
assert_eq!(write.fin(), true);
assert_eq!(&write[..], b"something");
assert_eq!(send.len, 0);
}
#[test]
fn recv_flow_control() {
let mut stream = Stream::new(15, 0);
assert!(!stream.recv.almost_full());
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, false);
let third = RangeBuf::from(b"something", 10, false);
assert_eq!(stream.recv.push(second), Ok(()));
assert_eq!(stream.recv.push(first), Ok(()));
assert!(!stream.recv.almost_full());
assert_eq!(stream.recv.push(third), Err(Error::FlowControl));
let (len, fin) = stream.recv.pop(&mut buf).unwrap();
assert_eq!(&buf[..len], b"helloworld");
assert_eq!(fin, false);
assert!(stream.recv.almost_full());
assert_eq!(stream.recv.update_max_data(), 25);
assert!(!stream.recv.almost_full());
let third = RangeBuf::from(b"something", 10, false);
assert_eq!(stream.recv.push(third), Ok(()));
}
#[test]
fn recv_past_fin() {
let mut stream = Stream::new(15, 0);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, false);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.push(second), Err(Error::FinalSize));
}
#[test]
fn recv_fin_dup() {
let mut stream = Stream::new(15, 0);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"hello", 0, true);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.push(second), Ok(()));
let mut buf = [0; 32];
let (len, fin) = stream.recv.pop(&mut buf).unwrap();
assert_eq!(&buf[..len], b"hello");
assert_eq!(fin, true);
}
#[test]
fn recv_fin_change() {
let mut stream = Stream::new(15, 0);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, true);
assert_eq!(stream.recv.push(second), Ok(()));
assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
}
#[test]
fn recv_fin_lower_than_received() {
let mut stream = Stream::new(15, 0);
assert!(!stream.recv.almost_full());
let first = RangeBuf::from(b"hello", 0, true);
let second = RangeBuf::from(b"world", 5, false);
assert_eq!(stream.recv.push(second), Ok(()));
assert_eq!(stream.recv.push(first), Err(Error::FinalSize));
}
#[test]
fn recv_fin_flow_control() {
let mut stream = Stream::new(15, 0);
assert!(!stream.recv.almost_full());
let mut buf = [0; 32];
let first = RangeBuf::from(b"hello", 0, false);
let second = RangeBuf::from(b"world", 5, true);
assert_eq!(stream.recv.push(first), Ok(()));
assert_eq!(stream.recv.push(second), Ok(()));
let (len, fin) = stream.recv.pop(&mut buf).unwrap();
assert_eq!(&buf[..len], b"helloworld");
assert_eq!(fin, true);
assert!(!stream.recv.almost_full());
}
/// `reset(10)` after a fin-flagged 5-byte buffer was accepted fails with
/// `Error::FinalSize`.
#[test]
fn recv_fin_reset_mismatch() {
    let mut stream = Stream::new(15, 0);
    assert!(!stream.recv.almost_full());

    let with_fin = RangeBuf::from(b"hello", 0, true);
    assert_eq!(stream.recv.push(with_fin), Ok(()));

    // The reset value (10) disagrees with the fin-flagged data ending at 5.
    assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
}
/// Calling `reset(5)` twice after 5 bytes were accepted returns `Ok(0)` both
/// times.
#[test]
fn recv_reset_dup() {
    let mut stream = Stream::new(15, 0);
    assert!(!stream.recv.almost_full());

    let data = RangeBuf::from(b"hello", 0, false);
    assert_eq!(stream.recv.push(data), Ok(()));

    // The repeated reset carries the same value and must be tolerated.
    assert_eq!(stream.recv.reset(5), Ok(0));
    assert_eq!(stream.recv.reset(5), Ok(0));
}
/// A second `reset` with a different value (10 after 5) fails with
/// `Error::FinalSize`.
#[test]
fn recv_reset_change() {
    let mut stream = Stream::new(15, 0);
    assert!(!stream.recv.almost_full());

    let data = RangeBuf::from(b"hello", 0, false);
    assert_eq!(stream.recv.push(data), Ok(()));

    // The first reset is accepted; the second one contradicts it.
    assert_eq!(stream.recv.reset(5), Ok(0));
    assert_eq!(stream.recv.reset(10), Err(Error::FinalSize));
}
/// `reset(4)` after 5 bytes were already accepted fails with
/// `Error::FinalSize`.
#[test]
fn recv_reset_lower_than_received() {
    let mut stream = Stream::new(15, 0);
    assert!(!stream.recv.almost_full());

    let data = RangeBuf::from(b"hello", 0, false);
    assert_eq!(stream.recv.push(data), Ok(()));

    // 4 is below the 5 bytes that were pushed above.
    assert_eq!(stream.recv.reset(4), Err(Error::FinalSize));
}
/// Pushes 19 bytes in three slices and checks what `send.pop` hands back.
///
/// The first `pop(25)` yields only the first 15 bytes and the next one an
/// empty buffer (presumably because `Stream::new(0, 15)` caps sendable data
/// at 15 bytes -- confirm against `Stream::new`). Pushing those 15 bytes
/// again at offset 0 makes them poppable once more, here in a 10-byte and a
/// 5-byte piece.
#[test]
fn send_flow_control() {
    let mut stream = Stream::new(0, 15);

    let chunks: [&[u8]; 3] = [b"hello", b"world", b"something"];
    for &chunk in chunks.iter() {
        assert!(stream.send.push_slice(chunk, false).is_ok());
    }

    // Asking for 25 bytes only returns 15 of the 19 that were pushed.
    let capped = stream.send.pop(25).unwrap();
    assert_eq!(capped.off(), 0);
    assert_eq!(capped.len(), 15);
    assert!(!capped.fin());
    assert_eq!(capped.data, b"helloworldsomet");

    // A further pop comes back empty.
    let empty = stream.send.pop(25).unwrap();
    assert_eq!(empty.off(), 0);
    assert_eq!(empty.len(), 0);
    assert!(!empty.fin());
    assert_eq!(empty.data, b"");

    // Put the same 15 bytes back at offset 0.
    let resend = RangeBuf::from(b"helloworldsomet", 0, false);
    assert_eq!(stream.send.push(resend), Ok(()));

    // They now come out again, split by the requested size of 10.
    let head = stream.send.pop(10).unwrap();
    assert_eq!(head.off(), 0);
    assert_eq!(head.len(), 10);
    assert!(!head.fin());
    assert_eq!(head.data, b"helloworld");

    let tail = stream.send.pop(10).unwrap();
    assert_eq!(tail.off(), 10);
    assert_eq!(tail.len(), 5);
    assert!(!tail.fin());
    assert_eq!(tail.data, b"somet");
}
/// Once a slice has been pushed with fin set, any further `push_slice` fails
/// with `Error::FinalSize`.
#[test]
fn send_past_fin() {
    let mut stream = Stream::new(0, 15);

    assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));

    // The second slice finishes the stream.
    assert_eq!(stream.send.push_slice(b"world", true), Ok(5));
    assert!(stream.send.is_fin());

    // Nothing more may be written after the fin.
    assert_eq!(
        stream.send.push_slice(b"third", false),
        Err(Error::FinalSize)
    );
}
/// Pushing an identical fin-flagged buffer to the send side twice succeeds
/// both times.
#[test]
fn send_fin_dup() {
    let mut stream = Stream::new(0, 15);

    let original = RangeBuf::from(b"hello", 0, true);
    assert_eq!(stream.send.push(original), Ok(()));

    let duplicate = RangeBuf::from(b"hello", 0, true);
    assert_eq!(stream.send.push(duplicate), Ok(()));
}
/// After "hello" was pushed with fin set, pushing the same range again
/// without the fin flag fails with `Error::FinalSize`.
#[test]
fn send_undo_fin() {
    let mut stream = Stream::new(0, 15);

    assert_eq!(stream.send.push_slice(b"hello", true), Ok(5));
    assert!(stream.send.is_fin());

    // Same bytes and offset, but fin is cleared this time.
    let without_fin = RangeBuf::from(b"hello", 0, false);
    assert_eq!(stream.send.push(without_fin), Err(Error::FinalSize));
}
/// A 15-byte fin-flagged write on a stream built with `Stream::new(0, 15)`
/// is popped in one piece with fin set (15 is presumably the send limit, per
/// the test name -- confirm against `Stream::new`).
#[test]
fn send_fin_max_data_match() {
    let mut stream = Stream::new(0, 15);

    let payload = b"hellohellohello";
    assert!(stream.send.push_slice(payload, true).is_ok());

    let popped = stream.send.pop(15).unwrap();
    assert_eq!(popped.off(), 0);
    assert_eq!(popped.len(), 15);
    assert!(popped.fin());
    assert_eq!(popped.data, payload);
}
/// An empty slice pushed with fin set finishes the stream, and the fin flag
/// is reported on the pop that returns the previously pushed "hello".
#[test]
fn send_fin_zero_length() {
    let mut stream = Stream::new(0, 15);

    assert_eq!(stream.send.push_slice(b"hello", false), Ok(5));

    // Zero bytes of data, only the fin flag.
    assert_eq!(stream.send.push_slice(b"", true), Ok(0));
    assert!(stream.send.is_fin());

    let popped = stream.send.pop(5).unwrap();
    assert_eq!(popped.off(), 0);
    assert_eq!(popped.len(), 5);
    assert!(popped.fin());
    assert_eq!(popped.data, b"hello");
}
/// After "hello" has been popped, pushing "elloworld" at offset 1 is accepted
/// and the next pop returns only "world", with the fin flag set.
#[test]
fn recv_data_below_off() {
    let mut stream = Stream::new(15, 0);
    let mut out = [0; 10];

    let initial = RangeBuf::from(b"hello", 0, false);
    assert_eq!(stream.recv.push(initial), Ok(()));

    let (read, fin) = stream.recv.pop(&mut out).unwrap();
    assert_eq!(&out[..read], b"hello");
    assert!(!fin);

    // This buffer overlaps bytes 1..5, which were already read above.
    let overlapping = RangeBuf::from(b"elloworld", 1, true);
    assert_eq!(stream.recv.push(overlapping), Ok(()));

    let (read, fin) = stream.recv.pop(&mut out).unwrap();
    assert_eq!(&out[..read], b"world");
    assert!(fin);
}
}