use mio::net::UnixStream;
use pipebuf::PBufRdWr;
use std::io::{ErrorKind, Result};
/// Link between a `mio` Unix stream socket and a pipe-buffer pair.
///
/// The struct itself holds only configuration; the stream and the
/// pipe-buffer are passed in on every call to the processing methods.
#[derive(Debug, Clone)]
pub struct UnixStreamLink {
    /// Length limit handed to each `input_from` call when reading from
    /// the stream.
    max_read_unit: usize,
    /// While `true`, `process_out` returns immediately without touching
    /// the stream.
    pause_writes: bool,
    /// While `true`, `process_in` returns immediately without touching
    /// the stream.
    pause_reads: bool,
}
impl UnixStreamLink {
#[inline]
pub fn new() -> Self {
Self {
max_read_unit: 2048,
pause_writes: true,
pause_reads: true,
}
}
#[inline]
pub fn set_max_read_unit(&mut self, max_read_unit: usize) {
self.max_read_unit = max_read_unit;
}
#[inline]
pub fn set_pause_writes(&mut self, pause: bool) {
self.pause_writes = pause;
}
#[inline]
pub fn set_pause_reads(&mut self, pause: bool) {
self.pause_reads = pause;
}
pub fn process(&mut self, stream: &mut UnixStream, mut pbuf: PBufRdWr) -> Result<bool> {
let rd_activity = self.process_out(stream, pbuf.reborrow())?;
let wr_activity = self.process_in(stream, pbuf.reborrow())?;
Ok(rd_activity || wr_activity)
}
pub fn process_out(&mut self, stream: &mut UnixStream, mut pbuf: PBufRdWr) -> Result<bool> {
if self.pause_writes {
return Ok(false);
}
let mut prd = pbuf.rd;
let trip = prd.tripwire();
match prd.output_to(stream, false) {
Err(ref e) if e.kind() == ErrorKind::WouldBlock => (),
Err(e) => return Err(e),
Ok(_) => {
if prd.is_empty() && prd.has_pending_eof() {
let shutdown = if prd.is_aborted() {
if !pbuf.wr.is_eof() {
pbuf.wr.abort();
}
std::net::Shutdown::Both
} else {
std::net::Shutdown::Write
};
match retry!(stream.shutdown(shutdown)) {
Err(ref e) if e.kind() == ErrorKind::WouldBlock => (),
Err(e) => return Err(e),
Ok(_) => {
prd.consume_eof();
}
}
}
}
}
Ok(prd.is_tripped(trip))
}
pub fn process_in(&mut self, stream: &mut UnixStream, pbuf: PBufRdWr) -> Result<bool> {
let mut pwr = pbuf.wr;
if self.pause_reads || pwr.is_eof() {
return Ok(false);
}
let trip = pwr.tripwire();
if let Err(e) = pwr.input_from(stream, self.max_read_unit) {
match e.kind() {
ErrorKind::ConnectionReset | ErrorKind::ConnectionAborted => pwr.abort(),
ErrorKind::WouldBlock => (),
_ => return Err(e),
}
}
Ok(pwr.is_tripped(trip))
}
}
impl Default for UnixStreamLink {
fn default() -> Self {
Self::new()
}
}