use mio::net::TcpStream;
use pipebuf::PBufRdWr;
use std::io::{ErrorKind, Result};
/// State for shuttling bytes between a `mio` TCP stream and a
/// bidirectional pipe-buffer.
///
/// The struct holds configuration and flags only; the stream and the
/// pipe-buffer are passed in on every `process*` call.
#[derive(Debug)]
pub struct TcpLink {
    /// Length limit handed to `input_from` on each `process_in` call.
    max_read_unit: usize,
    /// Most recently requested `set_nodelay` value for the stream.
    nodelay: bool,
    /// While `true`, `process_out` returns without touching the stream.
    pause_writes: bool,
    /// While `true`, `process_in` returns without touching the stream.
    pause_reads: bool,
    /// `true` when `nodelay` has changed and has not yet been applied
    /// to the stream by `process_out`.
    pending_set_nodelay: bool,
}
impl TcpLink {
    /// Create a link with the default settings: a read unit of 2048,
    /// `nodelay` off, and both reads and writes paused.
    ///
    /// Nothing is read or written until the corresponding direction is
    /// unpaused with [`Self::set_pause_reads`] / [`Self::set_pause_writes`].
    #[inline]
    pub fn new() -> Self {
        Self {
            max_read_unit: 2048,
            nodelay: false,
            pause_writes: true,
            pause_reads: true,
            pending_set_nodelay: false,
        }
    }

    /// Set the length limit that [`Self::process_in`] hands to
    /// `input_from` on each call.
    ///
    /// NOTE(review): how `input_from` treats a limit of 0 is not visible
    /// here -- confirm before passing 0.
    #[inline]
    pub fn set_max_read_unit(&mut self, max_read_unit: usize) {
        self.max_read_unit = max_read_unit;
    }

    /// Record the desired `nodelay` setting for the stream.
    ///
    /// No system call is made here.  If the value differs from the one
    /// currently recorded, it is marked as pending and applied by the
    /// next [`Self::process_out`] call that is not paused.
    #[inline]
    pub fn set_nodelay(&mut self, nodelay: bool) {
        if self.nodelay != nodelay {
            self.nodelay = nodelay;
            self.pending_set_nodelay = true;
        }
    }

    /// Pause or resume the outgoing direction.
    ///
    /// While paused, [`Self::process_out`] returns `Ok(false)` without
    /// touching the stream; this also defers any pending `nodelay`
    /// change.
    #[inline]
    pub fn set_pause_writes(&mut self, pause: bool) {
        self.pause_writes = pause;
    }

    /// Pause or resume the incoming direction.
    ///
    /// While paused, [`Self::process_in`] returns `Ok(false)` without
    /// touching the stream.
    #[inline]
    pub fn set_pause_reads(&mut self, pause: bool) {
        self.pause_reads = pause;
    }

    /// Run [`Self::process_out`] followed by [`Self::process_in`].
    ///
    /// Returns `Ok(true)` if either direction reported activity.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by either step.  If
    /// `process_out` fails, `process_in` is not attempted.
    pub fn process(&mut self, stream: &mut TcpStream, mut pbuf: PBufRdWr) -> Result<bool> {
        let rd_activity = self.process_out(stream, pbuf.reborrow())?;
        let wr_activity = self.process_in(stream, pbuf.reborrow())?;
        Ok(rd_activity || wr_activity)
    }

    /// Write pending data from `pbuf.rd` to the stream and, once the
    /// pipe has drained with an EOF pending, shut the stream down.
    ///
    /// Steps, in order:
    ///
    /// * If writes are paused, return `Ok(false)` immediately.
    /// * Apply a pending `nodelay` change to the stream.
    /// * Output buffered data to the stream.  `WouldBlock` is not
    ///   treated as an error.
    /// * If the output call succeeded and the pipe is now empty with an
    ///   EOF pending, shut down the stream: both directions if the pipe
    ///   was aborted (also aborting `pbuf.wr` unless it is already at
    ///   EOF), otherwise only the write direction.
    ///
    /// Returns `Ok(true)` if either `pbuf.rd` or `pbuf.wr` changed during
    /// the call.  The `pbuf.wr` side is included because this method may
    /// abort it.
    ///
    /// # Errors
    ///
    /// Returns any error from setting `nodelay`, from the output call
    /// (other than `WouldBlock`), or from the shutdown (other than
    /// `WouldBlock` and `NotConnected`).
    pub fn process_out(&mut self, stream: &mut TcpStream, mut pbuf: PBufRdWr) -> Result<bool> {
        if self.pause_writes {
            return Ok(false);
        }

        if self.pending_set_nodelay {
            // The flag is cleared before the attempt, so a failing call
            // is reported once and not repeated on later calls.
            self.pending_set_nodelay = false;
            // NOTE(review): `retry!` is defined elsewhere in the crate;
            // presumably it repeats the call on transient failures --
            // confirm.
            retry!(stream.set_nodelay(self.nodelay))?;
        }

        // The inbound pipe may be aborted below, so it needs its own
        // tripwire for that change to be reported as activity.
        let wr_trip = pbuf.wr.tripwire();
        let mut prd = pbuf.rd;
        let trip = prd.tripwire();
        // NOTE(review): the `false` argument presumably means "do not
        // force a flush" -- confirm against the pipebuf documentation.
        match prd.output_to(stream, false) {
            // Stream cannot take more right now; try again on a later call.
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => (),
            Err(e) => return Err(e),
            Ok(_) => {
                if prd.is_empty() && prd.has_pending_eof() {
                    let shutdown = if prd.is_aborted() {
                        // An abort takes down both directions, so the
                        // inbound pipe is aborted as well.
                        if !pbuf.wr.is_eof() {
                            pbuf.wr.abort();
                        }
                        std::net::Shutdown::Both
                    } else {
                        std::net::Shutdown::Write
                    };
                    match retry!(stream.shutdown(shutdown)) {
                        // Leave the EOF pending so that the shutdown is
                        // attempted again on a later call.
                        Err(ref e) if e.kind() == ErrorKind::WouldBlock => (),
                        // The stream is no longer connected: nothing is
                        // left to shut down, and nothing more can arrive.
                        Err(ref e) if e.kind() == ErrorKind::NotConnected => {
                            prd.consume_eof();
                            if !pbuf.wr.is_eof() {
                                pbuf.wr.abort();
                            }
                        }
                        Err(e) => return Err(e),
                        Ok(_) => {
                            prd.consume_eof();
                        }
                    }
                }
            }
        }
        Ok(prd.is_tripped(trip) || pbuf.wr.is_tripped(wr_trip))
    }

    /// Read available data from the stream into `pbuf.wr`.
    ///
    /// Returns `Ok(false)` immediately if reads are paused or the pipe is
    /// already at EOF.  Otherwise a single `input_from` call is made with
    /// the configured read unit as its length limit.  `WouldBlock` is not
    /// treated as an error, and `ConnectionReset` / `ConnectionAborted`
    /// abort the pipe instead of being returned.
    ///
    /// Returns `Ok(true)` if `pbuf.wr` changed during the call.
    ///
    /// # Errors
    ///
    /// Returns any other error reported by the input call.
    pub fn process_in(&mut self, stream: &mut TcpStream, pbuf: PBufRdWr) -> Result<bool> {
        let mut pwr = pbuf.wr;
        if self.pause_reads || pwr.is_eof() {
            return Ok(false);
        }
        let trip = pwr.tripwire();
        if let Err(e) = pwr.input_from(stream, self.max_read_unit) {
            match e.kind() {
                ErrorKind::ConnectionReset | ErrorKind::ConnectionAborted => pwr.abort(),
                ErrorKind::WouldBlock => (),
                _ => return Err(e),
            }
        }
        // The tripwire is checked even after an error because the pipe
        // may have been aborted just above.
        Ok(pwr.is_tripped(trip))
    }
}
impl Default for TcpLink {
fn default() -> Self {
Self::new()
}
}