use super::{PBufState, PBufTrip, PBufWr, PipeBuf};
#[cfg(feature = "std")]
use std::io::{ErrorKind, Write};
pub struct PBufRd<'a, T: 'static = u8> {
pub(crate) pb: &'a mut PipeBuf<T>,
}
impl<'a, T: Copy + Default + 'static> PBufRd<'a, T> {
#[inline(always)]
pub fn reborrow<'b, 'r>(&'r mut self) -> PBufRd<'b, T>
where
'a: 'b,
'r: 'b,
{
PBufRd { pb: &mut *self.pb }
}
#[inline]
pub fn tripwire(&self) -> PBufTrip {
self.pb.tripwire()
}
#[inline]
pub fn is_tripped(&self, trip: PBufTrip) -> bool {
self.tripwire() != trip
}
#[inline(always)]
pub fn data(&self) -> &[T] {
&self.pb.data[self.pb.rd..self.pb.wr]
}
#[inline(always)]
pub fn data_mut(&mut self) -> &mut [T] {
&mut self.pb.data[self.pb.rd..self.pb.wr]
}
#[inline]
#[track_caller]
pub fn consume(&mut self, len: usize) {
let rd = self.pb.rd + len;
if rd > self.pb.wr {
panic_consume_overflow();
}
self.pb.rd = rd;
}
#[inline(always)]
pub fn len(&self) -> usize {
self.pb.wr - self.pb.rd
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
self.pb.rd == self.pb.wr
}
#[inline]
pub fn consume_push(&mut self) -> bool {
if self.pb.state == PBufState::Push {
self.pb.state = PBufState::Open;
true
} else {
false
}
}
#[inline]
pub fn consume_eof(&mut self) -> bool {
match self.pb.state {
PBufState::Closing => {
self.pb.state = PBufState::Closed;
true
}
PBufState::Aborting => {
self.pb.state = PBufState::Aborted;
true
}
_ => false,
}
}
#[inline]
pub fn has_pending_eof(&self) -> bool {
matches!(self.pb.state, PBufState::Closing | PBufState::Aborting)
}
#[inline]
pub fn is_eof(&self) -> bool {
!matches!(self.pb.state, PBufState::Open | PBufState::Push)
}
#[inline]
pub fn is_aborted(&self) -> bool {
matches!(self.pb.state, PBufState::Aborting | PBufState::Aborted)
}
#[inline]
pub fn is_done(&self) -> bool {
self.pb.is_done()
}
#[inline(always)]
pub fn state(&self) -> PBufState {
self.pb.state
}
pub fn forward(&mut self, mut dest: PBufWr<'_, T>) {
if dest.is_eof() {
return;
}
let data = self.data();
let len = data.len();
dest.space(len).copy_from_slice(data);
dest.commit(len);
self.consume(len);
if self.consume_push() {
dest.push();
}
if self.consume_eof() {
if self.is_aborted() {
dest.abort();
} else {
dest.close();
}
}
}
}
impl<'a> PBufRd<'a, u8> {
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[track_caller]
pub fn output_to(&mut self, sink: &mut impl Write, force_flush: bool) -> std::io::Result<()> {
while !self.is_empty() {
match sink.write(self.data()) {
Err(ref e) if e.kind() == ErrorKind::Interrupted => (),
Err(e) => return Err(e),
Ok(0) => break, Ok(len) => {
if len > self.len() {
panic!("Faulty Write implementation consumed more data than it was given");
}
self.consume(len);
}
}
}
if self.consume_push() || force_flush {
loop {
match sink.flush() {
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
Ok(()) => break,
}
}
}
Ok(())
}
}
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
impl<'a> std::io::Read for PBufRd<'a, u8> {
fn read(&mut self, data: &mut [u8]) -> Result<usize, std::io::Error> {
self.pb.read(data)
}
}
#[inline(never)]
#[cold]
#[track_caller]
fn panic_consume_overflow() -> ! {
panic!("Illegal to consume more PipeBuf bytes than are available");
}