use std::cmp;
use std::collections::VecDeque;
use crate::Error;
use crate::Result;
use crate::ranges;
use super::RangeBuf;
/// Maximum size, in bytes, of each chunk that `SendBuf::write()` splits its
/// input into before queueing it.
///
/// This is the value used in test builds; it is presumably kept tiny so that
/// even short test inputs are spread over several queued chunks.
#[cfg(test)]
const SEND_BUFFER_SIZE: usize = 5;
/// Maximum size, in bytes, of each chunk that `SendBuf::write()` splits its
/// input into before queueing it (value used in non-test builds).
#[cfg(not(test))]
const SEND_BUFFER_SIZE: usize = 4096;
/// Send-side buffer for the data of a single stream.
///
/// Written data is queued as a sequence of `RangeBuf` chunks, handed out
/// again through `emit()`, re-queued through `retransmit()` and released
/// once acknowledged through `ack_and_drop()`.
#[derive(Debug, Default)]
pub struct SendBuf {
/// Queued chunks of written data, in increasing offset order.
data: VecDeque<RangeBuf>,
/// Index into `data` of the next chunk `emit()` will look at.
pos: usize,
/// Offset at which the next written byte will be placed.
off: u64,
/// Largest offset that `emit()` has handed out so far.
emit_off: u64,
/// Number of queued bytes still waiting to be emitted (this grows again
/// when `retransmit()` re-queues data).
len: u64,
/// Upper bound on the write offset: `cap()` reports `max_data - off`, and
/// `emit()` stops once the front offset reaches it.
max_data: u64,
/// Offset stored by `update_blocked_at()`; this type only stores it and
/// returns it from `blocked_at()`.
blocked_at: Option<u64>,
/// Final offset of the stream, once it is known.
fin_off: Option<u64>,
/// Set once `shutdown()` has been called.
shutdown: bool,
/// Set of offset ranges that have been acknowledged.
acked: ranges::RangeSet,
/// Error code passed to `stop()`, if the stream was stopped.
error: Option<u64>,
}
impl SendBuf {
    /// Creates an empty send buffer that accepts data up to offset
    /// `max_data`. Every other field starts at its `Default` value.
    pub fn new(max_data: u64) -> SendBuf {
        SendBuf {
            max_data,
            ..SendBuf::default()
        }
    }

    /// Appends `data` at the current write offset, optionally marking the
    /// end of `data` as the final offset of the stream.
    ///
    /// The input is truncated to the remaining capacity (see `cap()`); when
    /// that happens `fin` is dropped, because the tail of the input was not
    /// buffered. Accepted data is queued in chunks of at most
    /// `SEND_BUFFER_SIZE` bytes.
    ///
    /// Returns the number of bytes accepted, which may be smaller than
    /// `data.len()`.
    ///
    /// # Errors
    ///
    /// * `Error::StreamStopped` if the buffer was stopped via `stop()`.
    /// * `Error::FinalSize` if a final offset is already known and this
    ///   write would extend past it, or would end exactly on it without
    ///   `fin` being set.
    pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize> {
        // Computed from the untruncated input on purpose: the final-size
        // checks below apply to what the caller asked to write.
        let max_off = self.off + data.len() as u64;

        // Fails if the stream was stopped.
        let capacity = self.cap()?;

        if data.len() > capacity {
            // Only part of the input fits, so this cannot be the end of the
            // stream yet.
            let len = capacity;
            data = &data[..len];

            fin = false;
        }

        if let Some(fin_off) = self.fin_off {
            // Can't write past the final offset.
            if max_off > fin_off {
                return Err(Error::FinalSize);
            }

            // Can't "undo" the final offset.
            if max_off == fin_off && !fin {
                return Err(Error::FinalSize);
            }
        }

        if fin {
            self.fin_off = Some(max_off);
        }

        // Don't queue data that is already covered by the contiguous acked
        // prefix.
        if self.ack_off() >= max_off {
            return Ok(data.len());
        }

        // The final offset (if any) was recorded above, so an empty input
        // needs no queued chunk.
        if data.is_empty() {
            return Ok(data.len());
        }

        let mut len = 0;

        for chunk in data.chunks(SEND_BUFFER_SIZE) {
            len += chunk.len();

            // Only the last chunk of the input carries the fin flag.
            let fin = len == data.len() && fin;

            let buf = RangeBuf::from(chunk, self.off, fin);
            self.data.push_back(buf);

            self.off += chunk.len() as u64;
            self.len += chunk.len() as u64;
        }

        Ok(len)
    }

    /// Copies queued data into `out`, starting at the current front offset.
    ///
    /// Copying stops when `out` is full, when nothing is ready, when the
    /// next queued chunk is not contiguous with what was copied so far, or
    /// when the front offset reaches `max_data`.
    ///
    /// Returns `(written, fin)`: the number of bytes copied into `out`, and
    /// whether the offset just past the copied data is the stream's final
    /// offset. As written, this method never returns `Err`.
    pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
        // Space still available in `out`.
        let mut out_len = out.len();

        // Stream offset corresponding to `out[0]`.
        let out_off = self.off_front();

        // Stream offset of the next byte to copy.
        let mut next_off = out_off;

        while out_len > 0 &&
            self.ready() &&
            self.off_front() == next_off &&
            self.off_front() < self.max_data
        {
            let buf = match self.data.get_mut(self.pos) {
                Some(v) => v,
                None => break,
            };

            // Skip chunks that have nothing left to emit.
            if buf.is_empty() {
                self.pos += 1;
                continue;
            }

            let buf_len = cmp::min(buf.len(), out_len);
            let partial = buf_len < buf.len();

            // Copy data to the output buffer.
            let out_pos = (next_off - out_off) as usize;
            out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);

            self.len -= buf_len as u64;

            out_len -= buf_len;

            next_off = buf.off() + buf_len as u64;

            buf.consume(buf_len);

            if partial {
                // `out` is full; the rest of this chunk stays queued.
                break;
            }

            self.pos += 1;
        }

        // The fin flag is derived from the recorded final offset rather than
        // from the chunks themselves.
        let fin = self.fin_off == Some(next_off);

        // Track the largest offset handed out so far; `reset()` reports it.
        self.emit_off = cmp::max(self.emit_off, next_off);

        Ok((out.len() - out_len, fin))
    }

    /// Raises the maximum offset to `max_data`; a smaller value is ignored.
    pub fn update_max_data(&mut self, max_data: u64) {
        self.max_data = cmp::max(self.max_data, max_data);
    }

    /// Stores the offset at which the stream was reported blocked (or
    /// clears it with `None`).
    pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
        self.blocked_at = blocked_at;
    }

    /// Returns the value last stored by `update_blocked_at()`.
    pub fn blocked_at(&self) -> Option<u64> {
        self.blocked_at
    }

    /// Records the range `off..off + len` as acknowledged.
    pub fn ack(&mut self, off: u64, len: usize) {
        self.acked.insert(off..off + len as u64);
    }

    /// Records the range `off..off + len` as acknowledged and drops, from
    /// the front of the queue, every chunk that lies entirely below the
    /// contiguous acked prefix (see `ack_off()`).
    pub fn ack_and_drop(&mut self, off: u64, len: usize) {
        self.ack(off, len);

        let ack_off = self.ack_off();

        if self.data.is_empty() {
            return;
        }

        // The newly acked range starts past the contiguous acked prefix, so
        // it cannot have made any additional chunk droppable.
        if off > ack_off {
            return;
        }

        let mut drop_until = None;

        for (i, buf) in self.data.iter_mut().enumerate() {
            // Chunk starts at or after the acked prefix: nothing more to
            // drop.
            if buf.off >= ack_off {
                break;
            }

            // The acked prefix ends inside this chunk: it is only partially
            // acked, so it must be kept.
            if buf.off < ack_off && ack_off < buf.max_off() {
                break;
            }

            // Chunk is fully covered by the acked prefix.
            drop_until = Some(i);
        }

        if let Some(drop) = drop_until {
            self.data.drain(..=drop);

            // Shift the emit position by the number of removed chunks.
            // `retransmit()` may have moved `pos` back into the range that
            // was just dropped, hence the saturating subtraction.
            self.pos = self.pos.saturating_sub(drop + 1);
        }
    }

    /// Marks the range `off..off + len` to be emitted again.
    ///
    /// Every queued chunk overlapping the range has its read position moved
    /// back so the overlapping bytes become pending again, and the pending
    /// byte count is increased accordingly. Nothing happens when the queue
    /// is empty or the whole range lies within the contiguous acked prefix.
    pub fn retransmit(&mut self, off: u64, len: usize) {
        let max_off = off + len as u64;
        let ack_off = self.ack_off();

        if self.data.is_empty() {
            return;
        }

        if max_off <= ack_off {
            return;
        }

        // NOTE: the upper bound is evaluated once, before any chunk is
        // inserted below.
        for i in 0..self.data.len() {
            let buf = &mut self.data[i];

            // Chunk starts at or after the end of the range: done.
            if buf.off >= max_off {
                break;
            }

            // Chunk ends before the range starts: not affected.
            if off > buf.max_off() {
                continue;
            }

            // When the range ends inside this chunk, split it there so the
            // part past the range keeps its own read position.
            // (`split_off` presumably returns the tail and leaves the head
            // in `buf` -- confirm against `RangeBuf`.)
            let new_buf = if buf.off < max_off && max_off < buf.max_off() {
                Some(buf.split_off((max_off - buf.off) as usize))
            } else {
                None
            };

            let prev_pos = buf.pos;

            // Move the read position back: to the start of the range when it
            // begins inside this chunk (never forward), otherwise to the
            // start of the chunk.
            buf.pos = if off > buf.off && off <= buf.max_off() {
                cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
            } else {
                buf.start
            };

            // Make sure `emit()` revisits this chunk.
            self.pos = cmp::min(self.pos, i);

            // Bytes between the new and old read positions are pending again.
            self.len += (prev_pos - buf.pos) as u64;

            if let Some(b) = new_buf {
                self.data.insert(i + 1, b);
            }
        }
    }

    /// Discards all queued data and fixes the final offset of the stream at
    /// the first offset that was never emitted.
    ///
    /// Returns `(emit_off, unsent_len)`: the largest offset emitted so far
    /// and the number of written bytes that were never emitted.
    pub fn reset(&mut self) -> (u64, u64) {
        let unsent_off = cmp::max(self.off_front(), self.emit_off);
        let unsent_len = self.off_back().saturating_sub(unsent_off);

        self.fin_off = Some(unsent_off);

        // Drop all buffered data.
        self.data.clear();

        // Mark everything below the new final offset as acked. The range
        // must stop at `unsent_off`: acking up to the old write offset would
        // leave `acked` larger than `0..fin_off` whenever unsent data
        // existed, and `is_complete()` could then never return true.
        // Inserting the `u64` range directly also avoids a lossy
        // `u64 -> usize` cast on 32-bit targets.
        self.off = unsent_off;
        self.acked.insert(0..unsent_off);

        self.pos = 0;
        self.len = 0;

        (self.emit_off, unsent_len)
    }

    /// Stops the stream with `error_code`, resetting the buffer.
    ///
    /// Returns the same pair as `reset()`.
    ///
    /// # Errors
    ///
    /// `Error::Done` if the stream was already stopped.
    pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
        if self.error.is_some() {
            return Err(Error::Done);
        }

        let (max_off, unsent) = self.reset();

        self.error = Some(error_code);

        Ok((max_off, unsent))
    }

    /// Shuts the stream down, resetting the buffer.
    ///
    /// Returns the same pair as `reset()`.
    ///
    /// # Errors
    ///
    /// `Error::Done` if the stream was already shut down.
    pub fn shutdown(&mut self) -> Result<(u64, u64)> {
        if self.shutdown {
            return Err(Error::Done);
        }

        self.shutdown = true;

        Ok(self.reset())
    }

    /// Returns the offset at which the next written byte will be placed.
    pub fn off_back(&self) -> u64 {
        self.off
    }

    /// Returns the offset of the next byte to be emitted: the offset of the
    /// first non-empty chunk at or after the emit position, or the write
    /// offset when there is none.
    pub fn off_front(&self) -> u64 {
        let mut pos = self.pos;

        // Skip chunks that have nothing left to emit.
        while let Some(b) = self.data.get(pos) {
            if !b.is_empty() {
                return b.off();
            }

            pos += 1;
        }

        self.off
    }

    /// Returns the maximum offset (`max_data`) currently allowed.
    pub fn max_off(&self) -> u64 {
        self.max_data
    }

    /// Returns true when the final offset is known and equals the current
    /// write offset.
    pub fn is_fin(&self) -> bool {
        self.fin_off == Some(self.off)
    }

    /// Returns true when the final offset is known and the acked set is
    /// exactly `0..fin_off`.
    pub fn is_complete(&self) -> bool {
        match self.fin_off {
            Some(fin_off) => self.acked == (0..fin_off),
            None => false,
        }
    }

    /// Returns true if the stream was stopped via `stop()`.
    pub fn is_stopped(&self) -> bool {
        self.error.is_some()
    }

    /// Returns true if the stream was shut down via `shutdown()`.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown
    }

    /// Returns true when there is queued data that has not been emitted.
    pub fn ready(&self) -> bool {
        !self.data.is_empty() && self.off_front() < self.off
    }

    /// Returns the end of the contiguous acked prefix, i.e. the end of the
    /// first acked range if that range starts at offset 0, and 0 otherwise.
    pub fn ack_off(&self) -> u64 {
        match self.acked.iter().next() {
            // Only a first range starting at 0 counts as a prefix.
            Some(std::ops::Range { start: 0, end }) => end,

            Some(_) | None => 0,
        }
    }

    /// Returns how many more bytes `write()` will currently accept.
    ///
    /// # Errors
    ///
    /// `Error::StreamStopped` (carrying the stop error code) if the stream
    /// was stopped.
    pub fn cap(&self) -> Result<usize> {
        if let Some(e) = self.error {
            return Err(Error::StreamStopped(e));
        }

        // Clamp instead of casting blindly: on targets where `usize` is
        // narrower than 64 bits a plain `as usize` would keep only the low
        // bits and could report a tiny (even zero) capacity for a large
        // window.
        let remaining = self.max_data - self.off;

        Ok(cmp::min(remaining, usize::MAX as u64) as usize)
    }

    /// Returns the number of chunks currently held in the queue.
    // Presumably only used by tests elsewhere in the crate, hence the lint
    // allowance.
    #[allow(dead_code)]
    pub fn bufs_count(&self) -> usize {
        self.data.len()
    }
}
// Unit tests for `SendBuf`. In test builds `SEND_BUFFER_SIZE` is 5, so the
// inputs used here are queued as several small chunks.
#[cfg(test)]
mod tests {
use super::*;
// Emitting from a buffer nothing was written to yields no bytes and no fin.
#[test]
fn empty_write() {
let mut buf = [0; 5];
let mut send = SendBuf::new(u64::MAX);
assert_eq!(send.len, 0);
let (written, fin) = send.emit(&mut buf).unwrap();
assert_eq!(written, 0);
assert!(!fin);
}
// Two consecutive writes (the second with fin) are emitted in a single call
// when the output buffer is large enough.
#[test]
fn multi_write() {
let mut buf = [0; 128];
let mut send = SendBuf::new(u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(second, true).is_ok());
assert_eq!(send.len, 19);
let (written, fin) = send.emit(&mut buf[..128]).unwrap();
assert_eq!(written, 19);
assert!(fin);
assert_eq!(&buf[..written], b"somethinghelloworld");
assert_eq!(send.len, 0);
}
// Data is emitted across several smaller output buffers; fin is only
// reported together with the last bytes.
#[test]
fn split_write() {
let mut buf = [0; 10];
let mut send = SendBuf::new(u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(second, true).is_ok());
assert_eq!(send.len, 19);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 10);
assert!(!fin);
assert_eq!(&buf[..written], b"somethingh");
assert_eq!(send.len, 9);
assert_eq!(send.off_front(), 10);
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"ellow");
assert_eq!(send.len, 4);
assert_eq!(send.off_front(), 15);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 4);
assert!(fin);
assert_eq!(&buf[..written], b"orld");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 19);
}
// Ranges passed to `retransmit()` become pending again and are emitted
// before the data that was never sent.
#[test]
fn resend() {
let mut buf = [0; 15];
let mut send = SendBuf::new(u64::MAX);
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.off_front(), 0);
assert!(send.write(second, true).is_ok());
assert_eq!(send.off_front(), 0);
assert_eq!(send.len, 19);
let (written, fin) = send.emit(&mut buf[..4]).unwrap();
assert_eq!(written, 4);
assert!(!fin);
assert_eq!(&buf[..written], b"some");
assert_eq!(send.len, 15);
assert_eq!(send.off_front(), 4);
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"thing");
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 9);
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"hello");
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 14);
send.retransmit(4, 5);
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 4);
send.retransmit(0, 4);
assert_eq!(send.len, 14);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..11]).unwrap();
assert_eq!(written, 9);
assert!(!fin);
assert_eq!(&buf[..written], b"something");
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 14);
let (written, fin) = send.emit(&mut buf[..11]).unwrap();
assert_eq!(written, 5);
assert!(fin);
assert_eq!(&buf[..written], b"world");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 19);
}
// Writes are truncated to the capacity left under `max_data` (0 for a
// default-constructed buffer); raising the limit lets more data through.
#[test]
fn write_blocked_by_off() {
let mut buf = [0; 10];
let mut send = SendBuf::default();
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert_eq!(send.write(first, false), Ok(0));
assert_eq!(send.len, 0);
assert_eq!(send.write(second, true), Ok(0));
assert_eq!(send.len, 0);
send.update_max_data(5);
assert_eq!(send.write(first, false), Ok(5));
assert_eq!(send.len, 5);
assert_eq!(send.write(second, true), Ok(0));
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"somet");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 5);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 0);
assert!(!fin);
assert_eq!(&buf[..written], b"");
assert_eq!(send.len, 0);
send.update_max_data(15);
assert_eq!(send.write(&first[5..], false), Ok(4));
assert_eq!(send.len, 4);
assert_eq!(send.write(second, true), Ok(6));
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 5);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 10);
assert!(!fin);
assert_eq!(&buf[..10], b"hinghellow");
assert_eq!(send.len, 0);
send.update_max_data(25);
assert_eq!(send.write(&second[6..], true), Ok(4));
assert_eq!(send.len, 4);
assert_eq!(send.off_front(), 15);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 4);
assert!(fin);
assert_eq!(&buf[..written], b"orld");
assert_eq!(send.len, 0);
}
// An empty write with fin records the final offset without queueing data;
// the following emit reports fin.
#[test]
fn zero_len_write() {
let mut buf = [0; 10];
let mut send = SendBuf::new(u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(&[], true).is_ok());
assert_eq!(send.len, 9);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 9);
assert!(fin);
assert_eq!(&buf[..written], b"something");
assert_eq!(send.len, 0);
}
// A retransmit range that overlaps both emitted and not-yet-emitted bytes
// only adds the already-emitted part to the pending length.
#[test]
fn send_buf_len_on_retransmit() {
let mut buf = [0; 15];
let mut send = SendBuf::new(u64::MAX);
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 0);
let first = b"something";
assert!(send.write(first, false).is_ok());
assert_eq!(send.off_front(), 0);
assert_eq!(send.len, 9);
let (written, fin) = send.emit(&mut buf[..4]).unwrap();
assert_eq!(written, 4);
assert!(!fin);
assert_eq!(&buf[..written], b"some");
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 4);
send.retransmit(3, 5);
assert_eq!(send.len, 6);
assert_eq!(send.off_front(), 3);
}
// Stopping the stream reports the largest emitted offset even while a
// retransmission of earlier data is pending.
#[test]
fn send_buf_final_size_retransmit() {
let mut buf = [0; 50];
let mut send = SendBuf::new(u64::MAX);
send.write(&buf, false).unwrap();
assert_eq!(send.off_front(), 0);
let (written, _fin) = send.emit(&mut buf).unwrap();
assert_eq!(written, buf.len());
assert_eq!(send.off_front(), buf.len() as u64);
send.retransmit(40, 10);
let (fin_off, unsent) = send.stop(0).unwrap();
assert_eq!(fin_off, 50);
assert_eq!(unsent, 0);
}
}