use std::cmp;
use std::collections::VecDeque;
use crate::buffers::BufSplit;
use crate::range_buf::RangeBuf;
use crate::BufFactory;
use crate::Error;
use crate::Result;
use crate::buffers::DefaultBufFactory;
use crate::ranges;
/// Maximum size of each chunk `SendBuf::write()` copies into its own buffer.
///
/// Test builds use a very small value; presumably so that short test inputs
/// still span several chunks.
#[cfg(test)]
const SEND_BUFFER_SIZE: usize = 5;
/// Maximum size of each chunk `SendBuf::write()` copies into its own buffer.
#[cfg(not(test))]
const SEND_BUFFER_SIZE: usize = 4096;
/// A reservation of `reserved` bytes in a `SendBuf`, created by
/// `SendBuf::reserve_for_write()`.
///
/// Buffers are added through `append_buf()`. Dropping the reservation while
/// `reserved` is still non-zero panics (see the `Drop` impl).
struct SendReserve<'a, F: BufFactory> {
/// The send buffer the reserved bytes will be appended to.
inner: &'a mut SendBuf<F>,
/// Number of reserved bytes that have not been appended yet.
reserved: usize,
/// Whether the buffer that uses up the last reserved byte is flagged as fin.
fin: bool,
}
impl<F: BufFactory> SendReserve<'_, F> {
    /// Appends `buf` at the end of the underlying send buffer, consuming
    /// `buf.len()` bytes of the reservation.
    ///
    /// The buffer is flagged as fin only when it uses up the whole remaining
    /// reservation and the reservation itself was created with `fin`.
    ///
    /// # Errors
    ///
    /// Returns `Error::BufferTooShort` when `buf` is longer than what is left
    /// of the reservation; nothing is appended in that case.
    fn append_buf(&mut self, buf: F::Buf) -> Result<()> {
        let buf_len = buf.as_ref().len();

        if buf_len > self.reserved {
            return Err(Error::BufferTooShort);
        }

        let is_last = self.fin && buf_len == self.reserved;

        // New data always goes at the current end of the send buffer.
        let send_buf = &mut *self.inner;
        send_buf
            .data
            .push_back(RangeBuf::from_raw(buf, send_buf.off, is_last));
        send_buf.off += buf_len as u64;
        send_buf.len += buf_len as u64;

        self.reserved -= buf_len;

        Ok(())
    }
}
impl<F: BufFactory> Drop for SendReserve<'_, F> {
/// Panics if the reservation is dropped before every reserved byte has been
/// appended (i.e. `reserved` is not zero).
fn drop(&mut self) {
assert_eq!(self.reserved, 0)
}
}
/// Send-side buffer.
///
/// Written data is queued as a sequence of `RangeBuf`s. The buffer tracks
/// which offsets have been emitted and acked, and can rewind already emitted
/// ranges for retransmission.
#[derive(Debug, Default)]
pub struct SendBuf<F = DefaultBufFactory>
where
F: BufFactory,
{
/// Queued buffers; new data is pushed at the back.
data: VecDeque<RangeBuf<F>>,
/// Index into `data` from which `emit()` and `off_front()` start looking
/// for bytes that still need to be sent.
pos: usize,
/// Offset at which the next appended buffer starts; advanced on every
/// append.
off: u64,
/// Largest offset reached by `emit()` so far.
emit_off: u64,
/// Number of buffered bytes waiting to be emitted: grows on append and
/// retransmit, shrinks on emit, and is zeroed by `reset()`.
len: u64,
/// Upper offset limit: `cap()` is computed as `max_data - off` and `emit()`
/// stops once the front offset reaches it.
max_data: u64,
/// Value stored by `update_blocked_at()`; not interpreted in this file.
blocked_at: Option<u64>,
/// Final offset, once set by a write with `fin` or by `reset()`.
fin_off: Option<u64>,
/// Set by `shutdown()`.
shutdown: bool,
/// Ranges of offsets reported through `ack()`.
acked: ranges::RangeSet,
/// Error code passed to `stop()`, if it was called.
error: Option<u64>,
}
impl<F: BufFactory> SendBuf<F> {
pub fn new(max_data: u64) -> SendBuf<F> {
SendBuf {
max_data,
..SendBuf::default()
}
}
fn reserve_for_write(
&mut self, mut len: usize, mut fin: bool,
) -> Result<SendReserve<'_, F>> {
let max_off = self.off + len as u64;
if len > self.cap()? {
len = self.cap()?;
fin = false;
}
if let Some(fin_off) = self.fin_off {
if max_off > fin_off {
return Err(Error::FinalSize);
}
if max_off == fin_off && !fin {
return Err(Error::FinalSize);
}
}
if fin {
self.fin_off = Some(max_off);
}
if self.ack_off() >= max_off {
return Ok(SendReserve {
inner: self,
reserved: 0,
fin,
});
}
Ok(SendReserve {
inner: self,
reserved: len,
fin,
})
}
/// Copies as much of `data` as the buffer accepts, optionally marking the
/// end of the stream with `fin`.
///
/// The accepted prefix is split into chunks of at most `SEND_BUFFER_SIZE`
/// bytes, each stored in its own buffer. Returns the number of bytes
/// accepted, which may be less than `data.len()` (including zero).
///
/// # Errors
///
/// Propagates the errors of `reserve_for_write()` and of appending a chunk.
pub fn write(&mut self, data: &[u8], fin: bool) -> Result<usize> {
    let mut reserve = self.reserve_for_write(data.len(), fin)?;

    let accepted = reserve.reserved;
    if accepted == 0 {
        return Ok(0);
    }

    data[..accepted]
        .chunks(SEND_BUFFER_SIZE)
        .try_for_each(|chunk| reserve.append_buf(F::buf_from_slice(chunk)))?;

    Ok(accepted)
}
pub fn append_buf(
&mut self, mut data: F::Buf, cap: usize, fin: bool,
) -> Result<(usize, Option<F::Buf>)>
where
F::Buf: BufSplit,
{
let len = data.as_ref().len();
let mut reserve = self.reserve_for_write(cap.min(len), fin)?;
if reserve.reserved == 0 {
return Ok((0, Some(data)));
}
let remainder =
(reserve.reserved < len).then(|| data.split_at(reserve.reserved));
let ret = reserve.reserved;
reserve.append_buf(data)?;
Ok((ret, remainder))
}
/// Copies contiguous pending data, starting at `off_front()`, into `out`.
///
/// Returns the number of bytes written to `out` and whether the emitted data
/// ends exactly at the final offset of the stream.
pub fn emit(&mut self, out: &mut [u8]) -> Result<(usize, bool)> {
// Remaining room in `out`.
let mut out_len = out.len();
// Offset of the first emitted byte; positions in `out` are relative to it.
let out_off = self.off_front();
let mut next_off = out_off;
while out_len > 0 {
let off_front = self.off_front();
// Stop when nothing is queued, everything written was already emitted,
// the next pending buffer is not contiguous with what was emitted so
// far, or the `max_data` limit was reached.
if self.is_empty() ||
off_front >= self.off ||
off_front != next_off ||
off_front >= self.max_data
{
break;
}
let buf = match self.data.get_mut(self.pos) {
Some(v) => v,
None => break,
};
// Skip buffers that have nothing left to send.
if buf.is_empty() {
self.pos += 1;
continue;
}
let buf_len = cmp::min(buf.len(), out_len);
let partial = buf_len < buf.len();
// Copy data to the output buffer.
let out_pos = (next_off - out_off) as usize;
out[out_pos..out_pos + buf_len].copy_from_slice(&buf[..buf_len]);
self.len -= buf_len as u64;
out_len -= buf_len;
next_off = buf.off() + buf_len as u64;
buf.consume(buf_len);
if partial {
// `out` is full; the rest of this buffer stays pending.
break;
}
self.pos += 1;
}
// The fin flag is derived by comparing the end of the emitted data with
// the stream's final offset (if known), not from the individual buffers.
let fin = self.fin_off == Some(next_off);
// Record the largest offset emitted so far; `reset()` reports it.
self.emit_off = cmp::max(self.emit_off, next_off);
Ok((out.len() - out_len, fin))
}
pub fn update_max_data(&mut self, max_data: u64) {
self.max_data = cmp::max(self.max_data, max_data);
}
/// Stores `blocked_at`, replacing any previously stored value.
pub fn update_blocked_at(&mut self, blocked_at: Option<u64>) {
self.blocked_at = blocked_at;
}
/// Returns the value last stored with `update_blocked_at()`, if any.
pub fn blocked_at(&self) -> Option<u64> {
self.blocked_at
}
/// Records the `len` bytes starting at offset `off` as acked. Buffered data
/// is left untouched.
pub fn ack(&mut self, off: u64, len: usize) {
    let acked_end = off + len as u64;
    self.acked.insert(off..acked_end);
}
/// Records the `len` bytes starting at `off` as acked, then releases every
/// buffer at the front of the queue that lies entirely below the contiguous
/// acked offset (`ack_off()`).
///
/// Nothing is released when the newly acked range starts past the
/// contiguous acked offset.
pub fn ack_and_drop(&mut self, off: u64, len: usize) {
    self.ack(off, len);

    let ack_off = self.ack_off();

    if self.data.is_empty() || off > ack_off {
        return;
    }

    // Count the leading buffers that are fully covered by the contiguous
    // acked range. The first buffer that starts at or after `ack_off`, or
    // that straddles it, ends the run.
    let fully_acked = self
        .data
        .iter()
        .take_while(|buf| buf.off < ack_off && buf.max_off() <= ack_off)
        .count();

    if fully_acked > 0 {
        self.data.drain(..fully_acked);

        // `pos` may point inside the dropped run (e.g. after a rewind for
        // retransmission), so don't let the index go below zero.
        self.pos = self.pos.saturating_sub(fully_acked);
    }
}
/// Marks the `len` bytes starting at offset `off` as pending again, so that
/// a later `emit()` sends them once more.
///
/// Does nothing when no data is queued or when the range ends at or below
/// the contiguous acked offset.
pub fn retransmit(&mut self, off: u64, len: usize) {
let max_off = off + len as u64;
let ack_off = self.ack_off();
if self.data.is_empty() {
return;
}
if max_off <= ack_off {
return;
}
for i in 0..self.data.len() {
let buf = &mut self.data[i];
// This buffer and all later ones start at or after the end of the range.
if buf.off >= max_off {
break;
}
// This buffer ends before the range starts.
if off > buf.max_off() {
continue;
}
// Split the buffer in two if the range ends before the buffer's final
// offset, so that only the first part is rewound.
let new_buf = if buf.off < max_off && max_off < buf.max_off() {
Some(buf.split_off((max_off - buf.off) as usize))
} else {
None
};
let prev_pos = buf.pos;
// Rewind the buffer's position: to the start of the range when it begins
// inside the buffer (never moving the position forward), otherwise to the
// start of the buffer.
buf.pos = if off > buf.off && off <= buf.max_off() {
cmp::min(buf.pos, buf.start + (off - buf.off) as usize)
} else {
buf.start
};
// Make sure emission restarts no later than this buffer.
self.pos = cmp::min(self.pos, i);
// The rewound bytes count as pending again.
self.len += (prev_pos - buf.pos) as u64;
if let Some(b) = new_buf {
self.data.insert(i + 1, b);
}
}
}
/// Resets the buffer: drops all queued data, fixes the final offset at the
/// first offset that was never sent, and marks everything below it as acked.
///
/// Returns `(emit_off, unsent_len)`: the largest offset emitted so far and
/// the number of written bytes past the first unsent offset that were
/// discarded.
pub fn reset(&mut self) -> (u64, u64) {
    let unsent_off = cmp::max(self.off_front(), self.emit_off);
    let unsent_len = self.off_back().saturating_sub(unsent_off);

    self.fin_off = Some(unsent_off);

    // Drop all buffered data.
    self.data.clear();

    // Mark everything below the new end of the stream as acked. The range
    // is inserted directly as `u64` offsets: going through
    // `ack(0, off as usize)` would truncate the offset on targets where
    // `usize` is narrower than 64 bits.
    self.off = unsent_off;
    self.acked.insert(0..self.off);

    self.pos = 0;
    self.len = 0;

    (self.emit_off, unsent_len)
}
pub fn stop(&mut self, error_code: u64) -> Result<(u64, u64)> {
if self.error.is_some() {
return Err(Error::Done);
}
let (max_off, unsent) = self.reset();
self.error = Some(error_code);
Ok((max_off, unsent))
}
pub fn shutdown(&mut self) -> Result<(u64, u64)> {
if self.shutdown {
return Err(Error::Done);
}
self.shutdown = true;
Ok(self.reset())
}
/// Returns the offset at which the next appended buffer would start, i.e.
/// the end of the data written so far.
pub fn off_back(&self) -> u64 {
self.off
}
/// Returns the offset of the first pending byte: the offset of the first
/// non-empty buffer at or after `pos`, or the end-of-data offset when no
/// such buffer exists.
pub fn off_front(&self) -> u64 {
    self.data
        .iter()
        .skip(self.pos)
        // Buffers with nothing left to send are skipped.
        .find(|buf| !buf.is_empty())
        .map_or(self.off, |buf| buf.off())
}
/// Returns the current offset limit (`max_data`).
pub fn max_off(&self) -> u64 {
self.max_data
}
/// Returns true when the final offset is known and equals the end of the
/// data written so far.
pub fn is_fin(&self) -> bool {
    self.fin_off == Some(self.off)
}
/// Returns true when the final offset is known and the acked set is exactly
/// the range from offset 0 up to that final offset.
pub fn is_complete(&self) -> bool {
    matches!(self.fin_off, Some(fin_off) if self.acked == (0..fin_off))
}
/// Returns true when an error code has been recorded by `stop()`.
pub fn is_stopped(&self) -> bool {
self.error.is_some()
}
/// Returns true when `shutdown()` has been called.
pub fn is_shutdown(&self) -> bool {
self.shutdown
}
/// Returns true when no buffers are queued.
///
/// Note that `emit()` does not remove buffers from the queue, so this can
/// be false even when `len` is zero.
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
/// Returns the end of the contiguous acked range that starts at offset 0,
/// or 0 when the first acked range does not start at offset 0 (or nothing
/// was acked at all).
pub fn ack_off(&self) -> u64 {
    self.acked
        .iter()
        .next()
        // Only the first range counts, and only if it covers the start of
        // the stream.
        .filter(|first| first.start == 0)
        .map_or(0, |first| first.end)
}
pub fn cap(&self) -> Result<usize> {
if let Some(e) = self.error {
return Err(Error::StreamStopped(e));
}
Ok((self.max_data - self.off) as usize)
}
/// Returns the number of buffers currently queued.
#[allow(dead_code)]
pub fn bufs_count(&self) -> usize {
self.data.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
// Emitting from a buffer nothing was written to yields no bytes and no fin.
#[test]
fn empty_write() {
let mut buf = [0; 5];
let mut send = <SendBuf>::new(u64::MAX);
assert_eq!(send.len, 0);
let (written, fin) = send.emit(&mut buf).unwrap();
assert_eq!(written, 0);
assert!(!fin);
}
// Two writes (the second with fin) are emitted together in a single call
// when the output buffer is large enough.
#[test]
fn multi_write() {
let mut buf = [0; 128];
let mut send = <SendBuf>::new(u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(second, true).is_ok());
assert_eq!(send.len, 19);
let (written, fin) = send.emit(&mut buf[..128]).unwrap();
assert_eq!(written, 19);
assert!(fin);
assert_eq!(&buf[..written], b"somethinghelloworld");
assert_eq!(send.len, 0);
}
// Data is emitted across several calls with small output buffers; fin is
// only reported by the call that reaches the final offset.
#[test]
fn split_write() {
let mut buf = [0; 10];
let mut send = <SendBuf>::new(u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(second, true).is_ok());
assert_eq!(send.len, 19);
assert_eq!(send.off_front(), 0);
// First 10 bytes: all of `first` plus one byte of `second`.
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 10);
assert!(!fin);
assert_eq!(&buf[..written], b"somethingh");
assert_eq!(send.len, 9);
assert_eq!(send.off_front(), 10);
// Next 5 bytes.
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"ellow");
assert_eq!(send.len, 4);
assert_eq!(send.off_front(), 15);
// Remaining 4 bytes, reaching the final offset.
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 4);
assert!(fin);
assert_eq!(&buf[..written], b"orld");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 19);
}
// Already emitted ranges can be marked for retransmission and are emitted
// again before the data that was never sent.
#[test]
fn resend() {
let mut buf = [0; 15];
let mut send = <SendBuf>::new(u64::MAX);
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 0);
let first = b"something";
let second = b"helloworld";
assert!(send.write(first, false).is_ok());
assert_eq!(send.off_front(), 0);
assert!(send.write(second, true).is_ok());
assert_eq!(send.off_front(), 0);
assert_eq!(send.len, 19);
// Emit offsets 0..14 in three steps.
let (written, fin) = send.emit(&mut buf[..4]).unwrap();
assert_eq!(written, 4);
assert!(!fin);
assert_eq!(&buf[..written], b"some");
assert_eq!(send.len, 15);
assert_eq!(send.off_front(), 4);
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"thing");
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 9);
let (written, fin) = send.emit(&mut buf[..5]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"hello");
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 14);
// Rewind offsets 4..9, then 0..4: the pending length grows accordingly.
send.retransmit(4, 5);
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 4);
send.retransmit(0, 4);
assert_eq!(send.len, 14);
assert_eq!(send.off_front(), 0);
// Only the contiguous retransmitted range 0..9 is emitted here; the next
// pending data starts at offset 14.
let (written, fin) = send.emit(&mut buf[..11]).unwrap();
assert_eq!(written, 9);
assert!(!fin);
assert_eq!(&buf[..written], b"something");
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 14);
let (written, fin) = send.emit(&mut buf[..11]).unwrap();
assert_eq!(written, 5);
assert!(fin);
assert_eq!(&buf[..written], b"world");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 19);
}
// Writes are limited by `max_data`: only the part that fits is accepted, and
// more is accepted each time the limit is raised.
#[test]
fn write_blocked_by_off() {
let mut buf = [0; 10];
// The default buffer has `max_data == 0`, so nothing can be written yet.
let mut send = <SendBuf>::default();
assert_eq!(send.len, 0);
let first = b"something";
let second = b"helloworld";
assert_eq!(send.write(first, false), Ok(0));
assert_eq!(send.len, 0);
assert_eq!(send.write(second, true), Ok(0));
assert_eq!(send.len, 0);
// Limit of 5: only the first 5 bytes are accepted.
send.update_max_data(5);
assert_eq!(send.write(first, false), Ok(5));
assert_eq!(send.len, 5);
assert_eq!(send.write(second, true), Ok(0));
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 5);
assert!(!fin);
assert_eq!(&buf[..written], b"somet");
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 5);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 0);
assert!(!fin);
assert_eq!(&buf[..written], b"");
assert_eq!(send.len, 0);
// Limit of 15: the rest of `first` and 6 bytes of `second` fit.
send.update_max_data(15);
assert_eq!(send.write(&first[5..], false), Ok(4));
assert_eq!(send.len, 4);
assert_eq!(send.write(second, true), Ok(6));
assert_eq!(send.len, 10);
assert_eq!(send.off_front(), 5);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 10);
assert!(!fin);
assert_eq!(&buf[..10], b"hinghellow");
assert_eq!(send.len, 0);
// Limit of 25: the remainder of `second` fits and carries the fin.
send.update_max_data(25);
assert_eq!(send.write(&second[6..], true), Ok(4));
assert_eq!(send.len, 4);
assert_eq!(send.off_front(), 15);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 4);
assert!(fin);
assert_eq!(&buf[..written], b"orld");
assert_eq!(send.len, 0);
}
// An empty write with fin sets the final offset without queueing data, and
// the fin is reported once the earlier data has been emitted.
#[test]
fn zero_len_write() {
let mut buf = [0; 10];
let mut send = <SendBuf>::new(u64::MAX);
assert_eq!(send.len, 0);
let first = b"something";
assert!(send.write(first, false).is_ok());
assert_eq!(send.len, 9);
assert!(send.write(&[], true).is_ok());
assert_eq!(send.len, 9);
assert_eq!(send.off_front(), 0);
let (written, fin) = send.emit(&mut buf[..10]).unwrap();
assert_eq!(written, 9);
assert!(fin);
assert_eq!(&buf[..written], b"something");
assert_eq!(send.len, 0);
}
// Retransmitting a range that overlaps both emitted and not yet emitted data
// only adds the already emitted part to the pending length.
#[test]
fn send_buf_len_on_retransmit() {
let mut buf = [0; 15];
let mut send = <SendBuf>::new(u64::MAX);
assert_eq!(send.len, 0);
assert_eq!(send.off_front(), 0);
let first = b"something";
assert!(send.write(first, false).is_ok());
assert_eq!(send.off_front(), 0);
assert_eq!(send.len, 9);
let (written, fin) = send.emit(&mut buf[..4]).unwrap();
assert_eq!(written, 4);
assert!(!fin);
assert_eq!(&buf[..written], b"some");
assert_eq!(send.len, 5);
assert_eq!(send.off_front(), 4);
// Range 3..8: only offset 3 was emitted before, so `len` grows by one.
send.retransmit(3, 5);
assert_eq!(send.len, 6);
assert_eq!(send.off_front(), 3);
}
// Stopping the stream after a retransmission was queued still reports the
// largest emitted offset, with no unsent data.
#[test]
fn send_buf_final_size_retransmit() {
let mut buf = [0; 50];
let mut send = <SendBuf>::new(u64::MAX);
send.write(&buf, false).unwrap();
assert_eq!(send.off_front(), 0);
// Emit the whole buffer.
let (written, _fin) = send.emit(&mut buf).unwrap();
assert_eq!(written, buf.len());
assert_eq!(send.off_front(), buf.len() as u64);
// Rewind the last 10 bytes, then stop before they are emitted again.
send.retransmit(40, 10);
let (fin_off, unsent) = send.stop(0).unwrap();
assert_eq!(fin_off, 50);
assert_eq!(unsent, 0);
}
}