use super::{IntoInnerError, DEFAULT_BUF_SIZE};
use duplex::HalfDuplex;
#[cfg(feature = "layered-io")]
use layered_io::{default_suggested_buffer_size, Bufferable, HalfDuplexLayered};
#[cfg(read_initializer)]
use std::io::Initializer;
use std::io::{self, BufRead, Error, ErrorKind, IoSlice, IoSliceMut, Read, Write};
use std::{cmp, fmt};
#[cfg(not(windows))]
use {
io_extras::os::rustix::{AsRawFd, RawFd},
io_lifetimes::{AsFd, BorrowedFd},
};
#[cfg(windows)]
use {
io_extras::os::windows::{
AsHandleOrSocket, AsRawHandleOrSocket, BorrowedHandleOrSocket, RawHandleOrSocket,
},
io_lifetimes::{AsHandle, AsSocket, BorrowedHandle, BorrowedSocket},
std::os::windows::io::{AsRawHandle, AsRawSocket, RawHandle, RawSocket},
};
/// A buffered wrapper around a [`HalfDuplex`] stream.
///
/// All state and logic live in the crate-private `BufDuplexerBackend`; the
/// inherent methods and trait impls on this type forward to it.
pub struct BufDuplexer<Inner: HalfDuplex> {
/// Backend holding the wrapped stream together with its reader and writer buffers.
inner: BufDuplexerBackend<Inner>,
}
/// Crate-private implementation behind `BufDuplexer`: the wrapped stream plus
/// one buffer for outgoing data and one for incoming data.
pub(crate) struct BufDuplexerBackend<Inner: HalfDuplex> {
/// The wrapped stream. It is `Some` until `into_inner` takes it out; the
/// accessors that `unwrap` it panic afterwards.
inner: Option<Inner>,
/// Bytes accepted by `write*` that have not yet been written to `inner`.
writer_buf: Vec<u8>,
/// Set to `true` for the duration of each call into `inner`'s write methods
/// and reset afterwards; `Drop` skips its flush if it is still `true`.
panicked: bool,
/// Fixed-size storage for data read from `inner`.
reader_buf: Box<[u8]>,
/// Index of the first unread byte in `reader_buf`.
pos: usize,
/// One past the last valid byte in `reader_buf`; `reader_buf[pos..cap]` is
/// the data that has been read from `inner` but not yet consumed.
cap: usize,
}
impl<Inner: HalfDuplex> BufDuplexer<Inner> {
    /// Wraps `inner` using the backend's default buffer capacities
    /// (see `BufDuplexerBackend::new`).
    #[inline]
    pub fn new(inner: Inner) -> Self {
        let inner = BufDuplexerBackend::new(inner);
        Self { inner }
    }

    /// Wraps `inner` with a reader buffer of `reader_capacity` bytes and a
    /// writer buffer of `writer_capacity` bytes.
    #[inline]
    pub fn with_capacities(reader_capacity: usize, writer_capacity: usize, inner: Inner) -> Self {
        let inner = BufDuplexerBackend::with_capacities(reader_capacity, writer_capacity, inner);
        Self { inner }
    }

    /// Returns a shared reference to the wrapped stream.
    #[inline]
    pub fn get_ref(&self) -> &Inner {
        BufDuplexerBackend::get_ref(&self.inner)
    }

    /// Returns a mutable reference to the wrapped stream.
    ///
    /// Reading from or writing to it directly bypasses the buffers held here.
    #[inline]
    pub fn get_mut(&mut self) -> &mut Inner {
        BufDuplexerBackend::get_mut(&mut self.inner)
    }

    /// Returns the bytes currently held in the writer buffer, i.e. data that
    /// has been accepted but not yet written to the wrapped stream.
    #[inline]
    pub fn writer_buffer(&self) -> &[u8] {
        BufDuplexerBackend::writer_buffer(&self.inner)
    }

    /// Returns the bytes currently held in the reader buffer, i.e. data that
    /// has been read from the wrapped stream but not yet consumed.
    pub fn reader_buffer(&self) -> &[u8] {
        BufDuplexerBackend::reader_buffer(&self.inner)
    }

    /// Returns the capacity of the writer buffer in bytes.
    #[inline]
    pub fn writer_capacity(&self) -> usize {
        BufDuplexerBackend::writer_capacity(&self.inner)
    }

    /// Returns the size of the reader buffer in bytes.
    pub fn reader_capacity(&self) -> usize {
        BufDuplexerBackend::reader_capacity(&self.inner)
    }

    /// Flushes pending output and returns the wrapped stream.
    ///
    /// # Errors
    ///
    /// If the backend fails to flush, its error is re-wrapped so that the
    /// returned `IntoInnerError` carries a `BufDuplexer` again.
    pub fn into_inner(self) -> Result<Inner, IntoInnerError<Self>> {
        match self.inner.into_inner() {
            Ok(stream) => Ok(stream),
            Err(err) => Err(err.new_wrapped(|inner| Self { inner })),
        }
    }
}
impl<Inner: HalfDuplex> BufDuplexerBackend<Inner> {
/// Creates a backend around `inner` whose reader and writer buffers both
/// use `DEFAULT_BUF_SIZE`.
pub fn new(inner: Inner) -> Self {
Self::with_capacities(DEFAULT_BUF_SIZE, DEFAULT_BUF_SIZE, inner)
}
/// Creates a backend around `inner` with a reader buffer of
/// `reader_capacity` bytes and a writer buffer with `writer_capacity`
/// bytes of reserved capacity. Both buffers start out empty.
pub fn with_capacities(reader_capacity: usize, writer_capacity: usize, inner: Inner) -> Self {
// Without `read_initializer`, the reader storage is simply zero-filled.
#[cfg(not(read_initializer))]
let buffer = vec![0; reader_capacity];
// With `read_initializer`, the storage is allocated without zeroing and
// handed to the stream's `Initializer`.
// NOTE(review): soundness relies on `initialize` making the bytes safe to
// expose to `inner.read` — confirm against the `Initializer` contract.
#[cfg(read_initializer)]
let buffer = unsafe {
let mut buffer = Vec::with_capacity(reader_capacity);
buffer.set_len(reader_capacity);
inner.initializer().initialize(&mut buffer);
buffer
};
Self {
inner: Some(inner),
writer_buf: Vec::with_capacity(writer_capacity),
panicked: false,
reader_buf: buffer.into_boxed_slice(),
pos: 0,
cap: 0,
}
}
/// Writes the entire writer buffer to the wrapped stream.
///
/// Bytes that were successfully written are removed from the front of the
/// buffer even when this returns early, so a later call resumes with the
/// unwritten remainder.
///
/// # Errors
///
/// Returns `ErrorKind::WriteZero` if the stream reports writing zero bytes,
/// or the stream's own error. `ErrorKind::Interrupted` is retried.
///
/// # Panics
///
/// Panics if the wrapped stream has already been taken.
pub(super) fn flush_buf(&mut self) -> io::Result<()> {
// Tracks how much of the buffer has been written; its `Drop` removes
// that prefix, which also runs on early return and during unwinding.
struct BufGuard<'a> {
buffer: &'a mut Vec<u8>,
written: usize,
}
impl<'a> BufGuard<'a> {
fn new(buffer: &'a mut Vec<u8>) -> Self {
Self { buffer, written: 0 }
}
// The part of the buffer that still has to be written.
fn remaining(&self) -> &[u8] {
&self.buffer[self.written..]
}
// Records that `amt` more bytes have been written.
fn consume(&mut self, amt: usize) {
self.written += amt;
}
// True once every buffered byte has been written.
fn done(&self) -> bool {
self.written >= self.buffer.len()
}
}
impl Drop for BufGuard<'_> {
fn drop(&mut self) {
if self.written > 0 {
self.buffer.drain(..self.written);
}
}
}
let mut guard = BufGuard::new(&mut self.writer_buf);
let inner = self.inner.as_mut().unwrap();
while !guard.done() {
// Mark the call into `inner` so that, if it panics, `Drop` for the
// backend sees `panicked == true` and does not flush again.
self.panicked = true;
let r = inner.write(guard.remaining());
self.panicked = false;
match r {
Ok(0) => {
return Err(Error::new(
ErrorKind::WriteZero,
"failed to write the buffered data",
));
}
Ok(n) => guard.consume(n),
// Interrupted writes are simply retried.
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
}
/// Appends as much of `buf` as fits in the writer buffer's spare capacity
/// (so the buffer never reallocates) and returns how many bytes were taken.
pub(super) fn write_to_buf(&mut self, buf: &[u8]) -> usize {
let available = self.writer_buf.capacity() - self.writer_buf.len();
let amt_to_buffer = available.min(buf.len());
self.writer_buf.extend_from_slice(&buf[..amt_to_buffer]);
amt_to_buffer
}
/// Returns a shared reference to the wrapped stream.
///
/// # Panics
///
/// Panics if the stream has already been taken.
#[inline]
pub fn get_ref(&self) -> &Inner {
self.inner.as_ref().unwrap()
}
/// Returns a mutable reference to the wrapped stream.
///
/// # Panics
///
/// Panics if the stream has already been taken.
#[inline]
pub fn get_mut(&mut self) -> &mut Inner {
self.inner.as_mut().unwrap()
}
/// Returns the bytes waiting in the writer buffer.
#[inline]
pub fn writer_buffer(&self) -> &[u8] {
&self.writer_buf
}
/// Returns the unread bytes in the reader buffer (`reader_buf[pos..cap]`).
pub fn reader_buffer(&self) -> &[u8] {
&self.reader_buf[self.pos..self.cap]
}
/// Returns the capacity of the writer buffer.
#[inline]
pub fn writer_capacity(&self) -> usize {
self.writer_buf.capacity()
}
/// Returns the total size of the reader buffer.
pub fn reader_capacity(&self) -> usize {
self.reader_buf.len()
}
/// Flushes the writer buffer and returns the wrapped stream.
///
/// On success the stream is taken out of `self`, so the subsequent `Drop`
/// finds `None` and does nothing. The reader buffer is not returned; any
/// unread bytes in it are dropped with `self`.
///
/// # Errors
///
/// If flushing fails, returns an `IntoInnerError` holding both `self` and
/// the I/O error.
pub fn into_inner(mut self) -> Result<Inner, IntoInnerError<Self>> {
match self.flush_buf() {
Err(e) => Err(IntoInnerError::new(self, e)),
Ok(()) => Ok(self.inner.take().unwrap()),
}
}
/// Marks the reader buffer as empty by resetting `pos` and `cap` to zero.
#[inline]
fn discard_reader_buffer(&mut self) {
self.pos = 0;
self.cap = 0;
}
}
/// Writing goes through the backend's writer buffer; every method forwards
/// to the corresponding `Write` method of `BufDuplexerBackend`.
impl<Inner: HalfDuplex> Write for BufDuplexer<Inner> {
    /// Forwards to the backend's buffered `write`.
    #[inline]
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        Write::write(&mut self.inner, buf)
    }

    /// Forwards to the backend's buffered `write_all`.
    #[inline]
    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
        Write::write_all(&mut self.inner, buf)
    }

    /// Forwards to the backend's buffered `write_vectored`.
    #[inline]
    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
        Write::write_vectored(&mut self.inner, bufs)
    }

    /// Reports whatever the backend reports for vectored-write support.
    #[cfg(can_vector)]
    #[inline]
    fn is_write_vectored(&self) -> bool {
        Write::is_write_vectored(&self.inner)
    }

    /// Forwards to the backend's `flush`.
    #[inline]
    fn flush(&mut self) -> io::Result<()> {
        Write::flush(&mut self.inner)
    }
}
impl<Inner: HalfDuplex> Write for BufDuplexerBackend<Inner> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.writer_buf.len() + buf.len() > self.writer_buf.capacity() {
self.flush_buf()?;
}
if buf.len() >= self.writer_buf.capacity() {
self.panicked = true;
let r = self.get_mut().write(buf);
self.panicked = false;
r
} else {
self.writer_buf.extend_from_slice(buf);
Ok(buf.len())
}
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
if self.writer_buf.len() + buf.len() > self.writer_buf.capacity() {
self.flush_buf()?;
}
if buf.len() >= self.writer_buf.capacity() {
self.panicked = true;
let r = self.get_mut().write_all(buf);
self.panicked = false;
r
} else {
self.writer_buf.extend_from_slice(buf);
Ok(())
}
}
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
if self.writer_buf.len() + total_len > self.writer_buf.capacity() {
self.flush_buf()?;
}
if total_len >= self.writer_buf.capacity() {
self.panicked = true;
let r = self.get_mut().write_vectored(bufs);
self.panicked = false;
r
} else {
bufs.iter()
.for_each(|b| self.writer_buf.extend_from_slice(b));
Ok(total_len)
}
}
#[cfg(can_vector)]
#[inline]
fn is_write_vectored(&self) -> bool {
self.get_ref().is_write_vectored()
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.flush_buf().and_then(|()| self.get_mut().flush())
}
}
/// Reading first flushes any pending output, then reads through the
/// backend's reader buffer.
impl<Inner: HalfDuplex> Read for BufDuplexer<Inner> {
    /// Flushes the writer half, then reads into `buf` via the backend.
    #[inline]
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.flush().and_then(|()| self.inner.read(buf))
    }

    /// Flushes the writer half, then performs a vectored read via the backend.
    #[inline]
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        self.inner
            .flush()
            .and_then(|()| self.inner.read_vectored(bufs))
    }

    /// Reports whatever the backend reports for vectored-read support.
    #[cfg(can_vector)]
    #[inline]
    fn is_read_vectored(&self) -> bool {
        Read::is_read_vectored(&self.inner)
    }

    /// Returns the backend's `Initializer`.
    #[cfg(read_initializer)]
    #[inline]
    unsafe fn initializer(&self) -> Initializer {
        self.inner.initializer()
    }
}
/// Buffered reading. This impl does not flush the writer buffer itself.
impl<Inner: HalfDuplex> Read for BufDuplexerBackend<Inner> {
    /// Reads into `buf`, serving from the reader buffer when possible.
    ///
    /// When the reader buffer is empty and `buf` is at least as large as it,
    /// the read goes directly to the wrapped stream, skipping the buffer.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let buffer_is_empty = self.pos == self.cap;
        if buffer_is_empty && buf.len() >= self.reader_buf.len() {
            self.discard_reader_buffer();
            return self.get_mut().read(buf);
        }
        let mut available = self.fill_buf()?;
        let nread = available.read(buf)?;
        self.consume(nread);
        Ok(nread)
    }

    /// Vectored counterpart of [`read`](Read::read); the bypass decision uses
    /// the combined length of `bufs`.
    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
        let requested = bufs.iter().map(|b| b.len()).sum::<usize>();
        let buffer_is_empty = self.pos == self.cap;
        if buffer_is_empty && requested >= self.reader_buf.len() {
            self.discard_reader_buffer();
            return self.get_mut().read_vectored(bufs);
        }
        let mut available = self.fill_buf()?;
        let nread = available.read_vectored(bufs)?;
        self.consume(nread);
        Ok(nread)
    }

    /// Reports whether the wrapped stream supports vectored reads.
    #[cfg(can_vector)]
    fn is_read_vectored(&self) -> bool {
        self.get_ref().is_read_vectored()
    }

    /// Returns the wrapped stream's `Initializer`.
    #[cfg(read_initializer)]
    unsafe fn initializer(&self) -> Initializer {
        self.get_ref().initializer()
    }
}
/// Buffered-read access, forwarded to the backend.
impl<Inner: HalfDuplex> BufRead for BufDuplexer<Inner> {
    /// Returns the backend's buffered data, refilling it if it is empty.
    ///
    /// NOTE(review): unlike `read_until` and `read_line` below, this does not
    /// flush the writer buffer before reading — confirm this is intended.
    #[inline]
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        BufRead::fill_buf(&mut self.inner)
    }

    /// Marks `amt` bytes of the backend's reader buffer as consumed.
    #[inline]
    fn consume(&mut self, amt: usize) {
        BufRead::consume(&mut self.inner, amt)
    }

    /// Flushes the writer half, then forwards to the backend's `read_until`.
    #[inline]
    fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize> {
        Write::flush(&mut self.inner)?;
        BufRead::read_until(&mut self.inner, byte, buf)
    }

    /// Flushes the writer half, then forwards to the backend's `read_line`.
    #[inline]
    fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
        Write::flush(&mut self.inner)?;
        BufRead::read_line(&mut self.inner, buf)
    }
}
/// Direct access to the reader buffer.
impl<Inner: HalfDuplex> BufRead for BufDuplexerBackend<Inner> {
    /// Returns the unread part of the reader buffer, first refilling it with
    /// a single `read` from the wrapped stream if it is empty.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        if self.pos >= self.cap {
            debug_assert!(self.pos == self.cap);
            // Borrow the fields separately so the stream can read straight
            // into the buffer. On error `cap` and `pos` are left untouched.
            let stream = self.inner.as_mut().unwrap();
            self.cap = stream.read(&mut self.reader_buf)?;
            self.pos = 0;
        }
        Ok(&self.reader_buf[self.pos..self.cap])
    }

    /// Advances the read position by `amt`, clamped to the end of the
    /// buffered data.
    fn consume(&mut self, amt: usize) {
        let advanced = self.pos + amt;
        self.pos = cmp::min(advanced, self.cap);
    }
}
/// Debug output is exactly the backend's debug output.
impl<Inner: HalfDuplex> fmt::Debug for BufDuplexer<Inner>
where
    Inner: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.inner, fmt)
    }
}
/// Formats as a `BufDuplexer` struct showing the wrapped stream and the
/// fill level of each buffer as `used/total`.
impl<Inner: HalfDuplex> fmt::Debug for BufDuplexerBackend<Inner>
where
    Inner: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = fmt.debug_struct("BufDuplexer");
        out.field("inner", self.get_ref());
        // Unread bytes versus total reader storage.
        out.field(
            "reader_buffer",
            &format_args!("{}/{}", self.cap - self.pos, self.reader_buf.len()),
        );
        // Pending bytes versus writer capacity.
        out.field(
            "writer_buffer",
            &format_args!("{}/{}", self.writer_buf.len(), self.writer_buf.capacity()),
        );
        out.finish()
    }
}
/// Best-effort flush of pending output when the backend is dropped.
impl<Inner: HalfDuplex> Drop for BufDuplexerBackend<Inner> {
    fn drop(&mut self) {
        // Nothing to do once the stream has been taken by `into_inner`, and
        // no second attempt if a write into the stream panicked.
        if self.inner.is_none() || self.panicked {
            return;
        }
        // Errors cannot be reported from `drop`, so they are discarded.
        let _ = self.flush_buf();
    }
}
#[cfg(not(windows))]
impl<Inner: HalfDuplex + AsRawFd> AsRawFd for BufDuplexer<Inner> {
#[inline]
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsRawHandle> AsRawHandle for BufDuplexer<Inner> {
    /// Returns the raw handle reported by the backend.
    #[inline]
    fn as_raw_handle(&self) -> RawHandle {
        AsRawHandle::as_raw_handle(&self.inner)
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsRawSocket> AsRawSocket for BufDuplexer<Inner> {
    /// Returns the raw socket reported by the backend.
    #[inline]
    fn as_raw_socket(&self) -> RawSocket {
        AsRawSocket::as_raw_socket(&self.inner)
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsRawHandleOrSocket> AsRawHandleOrSocket for BufDuplexer<Inner> {
    /// Returns the raw handle-or-socket reported by the backend.
    #[inline]
    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
        AsRawHandleOrSocket::as_raw_handle_or_socket(&self.inner)
    }
}
#[cfg(not(windows))]
impl<Inner: HalfDuplex + AsRawFd> AsRawFd for BufDuplexerBackend<Inner> {
    /// Returns the wrapped stream's raw file descriptor.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn as_raw_fd(&self) -> RawFd {
        self.get_ref().as_raw_fd()
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsRawHandle> AsRawHandle for BufDuplexerBackend<Inner> {
    /// Returns the wrapped stream's raw handle.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn as_raw_handle(&self) -> RawHandle {
        self.get_ref().as_raw_handle()
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsRawSocket> AsRawSocket for BufDuplexerBackend<Inner> {
    /// Returns the wrapped stream's raw socket.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn as_raw_socket(&self) -> RawSocket {
        self.get_ref().as_raw_socket()
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsRawHandleOrSocket> AsRawHandleOrSocket for BufDuplexerBackend<Inner> {
    /// Returns the wrapped stream's raw handle-or-socket.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn as_raw_handle_or_socket(&self) -> RawHandleOrSocket {
        self.get_ref().as_raw_handle_or_socket()
    }
}
#[cfg(not(windows))]
impl<Inner: HalfDuplex + AsFd> AsFd for BufDuplexer<Inner> {
#[inline]
fn as_fd(&self) -> BorrowedFd<'_> {
self.inner.as_fd()
}
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsHandle> AsHandle for BufDuplexer<Inner> {
    /// Borrows the handle exposed by the backend.
    #[inline]
    fn as_handle(&self) -> BorrowedHandle<'_> {
        AsHandle::as_handle(&self.inner)
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsSocket> AsSocket for BufDuplexer<Inner> {
    /// Borrows the socket exposed by the backend.
    #[inline]
    fn as_socket(&self) -> BorrowedSocket<'_> {
        AsSocket::as_socket(&self.inner)
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsHandleOrSocket> AsHandleOrSocket for BufDuplexer<Inner> {
    /// Borrows the handle-or-socket exposed by the backend.
    #[inline]
    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
        AsHandleOrSocket::as_handle_or_socket(&self.inner)
    }
}
#[cfg(not(windows))]
impl<Inner: HalfDuplex + AsFd> AsFd for BufDuplexerBackend<Inner> {
    /// Borrows the wrapped stream's file descriptor.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.get_ref().as_fd()
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsHandle> AsHandle for BufDuplexerBackend<Inner> {
    /// Borrows the wrapped stream's handle.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn as_handle(&self) -> BorrowedHandle<'_> {
        self.get_ref().as_handle()
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsSocket> AsSocket for BufDuplexerBackend<Inner> {
    /// Borrows the wrapped stream's socket.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn as_socket(&self) -> BorrowedSocket<'_> {
        self.get_ref().as_socket()
    }
}
#[cfg(windows)]
impl<Inner: HalfDuplex + AsHandleOrSocket> AsHandleOrSocket for BufDuplexerBackend<Inner> {
    /// Borrows the wrapped stream's handle-or-socket.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn as_handle_or_socket(&self) -> BorrowedHandleOrSocket<'_> {
        self.get_ref().as_handle_or_socket()
    }
}
// Marker impl (no methods): a `BufDuplexer` is a `Terminal` whenever its
// inner stream implements `WriteTerminal`.
#[cfg(feature = "terminal-io")]
impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::Terminal for BufDuplexer<Inner> {}
// Marker impl (no methods): the backend is a `Terminal` whenever its inner
// stream implements `WriteTerminal`.
#[cfg(feature = "terminal-io")]
impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::Terminal
for BufDuplexerBackend<Inner>
{
}
/// Terminal output queries, answered by the backend.
#[cfg(feature = "terminal-io")]
impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::WriteTerminal
    for BufDuplexer<Inner>
{
    /// Returns the color support reported by the backend.
    #[inline]
    fn color_support(&self) -> terminal_io::TerminalColorSupport {
        terminal_io::WriteTerminal::color_support(&self.inner)
    }

    /// Returns the color preference reported by the backend.
    #[inline]
    fn color_preference(&self) -> bool {
        terminal_io::WriteTerminal::color_preference(&self.inner)
    }

    /// Returns whether the backend reports its output as a terminal.
    #[inline]
    fn is_output_terminal(&self) -> bool {
        terminal_io::WriteTerminal::is_output_terminal(&self.inner)
    }
}
/// Terminal output queries, answered by the wrapped stream.
#[cfg(feature = "terminal-io")]
impl<Inner: HalfDuplex + terminal_io::WriteTerminal> terminal_io::WriteTerminal
    for BufDuplexerBackend<Inner>
{
    /// Returns the wrapped stream's color support.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn color_support(&self) -> terminal_io::TerminalColorSupport {
        self.get_ref().color_support()
    }

    /// Returns the wrapped stream's color preference.
    ///
    /// Panics if the stream has already been taken.
    #[inline]
    fn color_preference(&self) -> bool {
        self.get_ref().color_preference()
    }

    /// Returns whether the wrapped stream's output is a terminal, or `false`
    /// if the stream has already been taken.
    #[inline]
    fn is_output_terminal(&self) -> bool {
        self.inner
            .as_ref()
            .map_or(false, |inner| inner.is_output_terminal())
    }
}
/// `Bufferable` support, forwarded to the backend.
#[cfg(feature = "layered-io")]
impl<Inner: HalfDuplexLayered> Bufferable for BufDuplexer<Inner> {
    /// Forwards to the backend's `abandon`.
    #[inline]
    fn abandon(&mut self) {
        Bufferable::abandon(&mut self.inner)
    }

    /// Returns the buffer size suggested by the backend.
    #[inline]
    fn suggested_buffer_size(&self) -> usize {
        Bufferable::suggested_buffer_size(&self.inner)
    }
}
/// `Bufferable` support that tolerates the wrapped stream having been taken.
#[cfg(feature = "layered-io")]
impl<Inner: HalfDuplexLayered> Bufferable for BufDuplexerBackend<Inner> {
    /// Calls `abandon` on the wrapped stream, if it is still present.
    #[inline]
    fn abandon(&mut self) {
        if let Some(inner) = &mut self.inner {
            inner.abandon()
        }
    }

    /// Returns the larger of the wrapped stream's minimum and suggested
    /// buffer sizes, or the library default if the stream has been taken.
    #[inline]
    fn suggested_buffer_size(&self) -> usize {
        if let Some(inner) = &self.inner {
            cmp::max(inner.minimum_buffer_size(), inner.suggested_buffer_size())
        } else {
            default_suggested_buffer_size(self)
        }
    }
}