use crate::CoreError;
use crate::io::buffer::{BufferState, ReadState};
use crate::io::writer::WriterState;
use crate::reactor::{Fd, Token};
/// A file descriptor held by a `DrainState`, together with the reactor token
/// it has been registered under (if any).
pub(crate) struct FdSlot {
/// Token returned by `Reactor::add` for this descriptor. Slots are created
/// with `None` here; `DrainState::register_with_reactor` fills it in.
pub token: Option<Token>,
/// The descriptor this slot wraps.
pub fd: Fd,
}
/// State for pumping data over up to three descriptors: an optional stdin
/// that a payload is written to, and optional stdout/stderr that are read
/// into a shared `BufferState`.
///
/// `F` is a callback of type `FnMut(&[u8]) -> bool`; it is handed to
/// `BufferState::read_from_fd` on every read. NOTE(review): the meaning of its
/// return value is defined by `BufferState` — presumably "stop reading early";
/// confirm there.
// 64-byte alignment — presumably chosen to match a cache line; TODO confirm.
#[repr(align(64))]
pub struct DrainState<F>
where
F: FnMut(&[u8]) -> bool,
{
/// Slot read by the stdout paths; `None` if no stdout descriptor was given
/// or once the stream has been taken/dropped.
pub(crate) stdout_slot: Option<FdSlot>,
/// Slot read by the stderr paths; `None` if no stderr descriptor was given
/// or once the stream has been taken/dropped.
pub(crate) stderr_slot: Option<FdSlot>,
/// Slot written by the stdin paths; only populated when both a descriptor
/// and a payload were supplied to `new`.
pub(crate) stdin_slot: Option<FdSlot>,
/// Receives everything read from stdout/stderr; built from the `limit`
/// passed to `new`.
pub(crate) buffer: BufferState,
/// Holds the stdin payload and writes it to the stdin descriptor.
pub(crate) writer: WriterState,
/// Optional callback forwarded to `BufferState::read_from_fd`.
pub(crate) early_exit: Option<F>,
}
impl<F> DrainState<F>
where
    F: FnMut(&[u8]) -> bool,
{
    /// Builds the drain state from the optional descriptors and stdin payload.
    ///
    /// * `stdin_fd` / `stdin_buf` – a stdin slot is only created when *both*
    ///   are present; with no payload the descriptor is not stored (it is
    ///   simply dropped when this function returns).
    /// * `stdout_fd` / `stderr_fd` – each present descriptor gets its own slot.
    /// * `limit` – forwarded to `BufferState::new`.
    /// * `early_exit` – optional callback later handed to
    ///   `BufferState::read_from_fd`.
    ///
    /// Every descriptor that ends up in a slot has `set_nonblock` called on it
    /// first (stdin, then stdout, then stderr).
    ///
    /// # Errors
    ///
    /// Returns the error of the first failing `set_nonblock` call.
    pub fn new(
        stdin_fd: Option<Fd>,
        stdin_buf: Option<Box<[u8]>>,
        stdout_fd: Option<Fd>,
        stderr_fd: Option<Fd>,
        limit: usize,
        early_exit: Option<F>,
    ) -> Result<Self, CoreError> {
        // Without a payload there is nothing to write, so stdin gets no slot.
        let stdin_slot = if stdin_buf.is_some() {
            Self::nonblocking_slot(stdin_fd)?
        } else {
            None
        };
        let stdout_slot = Self::nonblocking_slot(stdout_fd)?;
        let stderr_slot = Self::nonblocking_slot(stderr_fd)?;

        Ok(Self {
            stdin_slot,
            stdout_slot,
            stderr_slot,
            buffer: BufferState::new(limit),
            writer: WriterState::new(stdin_buf),
            early_exit,
        })
    }

    /// Wraps `fd` (if any) in an unregistered [`FdSlot`] after calling
    /// `set_nonblock` on it.
    fn nonblocking_slot(fd: Option<Fd>) -> Result<Option<FdSlot>, CoreError> {
        let Some(fd) = fd else {
            return Ok(None);
        };
        fd.set_nonblock()?;
        Ok(Some(FdSlot { token: None, fd }))
    }

    /// Returns `true` once none of the three slots is populated any more.
    #[inline(always)]
    pub fn is_done(&self) -> bool {
        self.stdin_slot.is_none() && self.stdout_slot.is_none() && self.stderr_slot.is_none()
    }

    /// Lets the writer push the stdin payload to the stdin descriptor.
    ///
    /// Returns `Ok(true)` when there is no stdin slot or the writer reports it
    /// is done (in which case the slot is released), `Ok(false)` otherwise.
    ///
    /// # Errors
    ///
    /// Propagates any error from `WriterState::write_to_fd`.
    #[inline(always)]
    pub fn write_stdin(&mut self) -> Result<bool, CoreError> {
        let Some(slot) = &self.stdin_slot else {
            return Ok(true);
        };
        let done = self.writer.write_to_fd(&slot.fd)?;
        if done {
            // Releasing the slot drops the descriptor it holds.
            self.stdin_slot = None;
        }
        Ok(done)
    }

    /// Reads from stdout (`is_stdout == true`) or stderr into the buffer.
    ///
    /// Returns `Ok(true)` when the chosen stream has no slot, or when the read
    /// reported anything other than `ReadState::Open` (the slot is then
    /// released); returns `Ok(false)` while the stream is still open.
    ///
    /// # Errors
    ///
    /// Propagates any error from `BufferState::read_from_fd`.
    #[inline(always)]
    pub fn read_fd(&mut self, is_stdout: bool) -> Result<bool, CoreError> {
        let slot = if is_stdout {
            &mut self.stdout_slot
        } else {
            &mut self.stderr_slot
        };
        let Some(FdSlot { fd, .. }) = slot.as_ref() else {
            return Ok(true);
        };
        let read_state = self
            .buffer
            .read_from_fd(fd, is_stdout, &mut self.early_exit)?;
        if read_state != ReadState::Open {
            *slot = None;
            return Ok(true);
        }
        Ok(false)
    }

    /// Moves every populated slot out of the state (stdin, stdout, stderr, in
    /// that order), leaving all three empty.
    ///
    /// This does not touch the reactor; the returned slots keep their tokens.
    pub(crate) fn take_all_slots(&mut self) -> Vec<FdSlot> {
        let mut slots = Vec::with_capacity(3);
        slots.extend(self.stdin_slot.take());
        slots.extend(self.stdout_slot.take());
        slots.extend(self.stderr_slot.take());
        slots
    }

    /// Registers every populated slot with `reactor` and stores the returned
    /// token in the slot.
    ///
    /// stdin is added with flags `(false, true)`, stdout and stderr with
    /// `(true, false)` — presumably `(readable, writable)` interest; confirm
    /// against `Reactor::add`.
    ///
    /// # Errors
    ///
    /// Returns the first error from `Reactor::add`. Slots are updated in place,
    /// so a failed registration leaves every slot (and its descriptor) inside
    /// the state rather than dropping it.
    pub(crate) fn register_with_reactor(
        &mut self,
        reactor: &mut crate::reactor::Reactor,
    ) -> Result<(), CoreError> {
        for (slot, readable, writable) in [
            (&mut self.stdin_slot, false, true),
            (&mut self.stdout_slot, true, false),
            (&mut self.stderr_slot, true, false),
        ] {
            if let Some(slot) = slot.as_mut() {
                slot.token = Some(reactor.add(&slot.fd, readable, writable)?);
            }
        }
        Ok(())
    }

    /// Returns `true` if the stdout slot exists and carries `token`.
    pub(crate) fn stdout_matches(&self, token: Token) -> bool {
        self.stdout_slot
            .as_ref()
            .is_some_and(|slot| slot.token == Some(token))
    }

    /// Returns `true` if the stderr slot exists and carries `token`.
    pub(crate) fn stderr_matches(&self, token: Token) -> bool {
        self.stderr_slot
            .as_ref()
            .is_some_and(|slot| slot.token == Some(token))
    }

    /// Returns `true` if the stdin slot exists and carries `token`.
    pub(crate) fn stdin_matches(&self, token: Token) -> bool {
        self.stdin_slot
            .as_ref()
            .is_some_and(|slot| slot.token == Some(token))
    }

    /// Removes the stdout slot (if any) and deletes its descriptor from
    /// `reactor`. The slot is released even when `Reactor::del` fails.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Reactor::del`.
    pub(crate) fn drop_stdout(
        &mut self,
        reactor: &mut crate::reactor::Reactor,
    ) -> Result<(), CoreError> {
        if let Some(slot) = self.stdout_slot.take() {
            reactor.del(&slot.fd)?;
        }
        Ok(())
    }

    /// Removes the stderr slot (if any) and deletes its descriptor from
    /// `reactor`. The slot is released even when `Reactor::del` fails.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Reactor::del`.
    pub(crate) fn drop_stderr(
        &mut self,
        reactor: &mut crate::reactor::Reactor,
    ) -> Result<(), CoreError> {
        if let Some(slot) = self.stderr_slot.take() {
            reactor.del(&slot.fd)?;
        }
        Ok(())
    }

    /// Removes the stdin slot (if any), deletes its descriptor from `reactor`
    /// and then clears the writer's payload.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Reactor::del`; in that case the writer's
    /// payload is left untouched.
    pub(crate) fn drop_stdin(
        &mut self,
        reactor: &mut crate::reactor::Reactor,
    ) -> Result<(), CoreError> {
        if let Some(slot) = self.stdin_slot.take() {
            reactor.del(&slot.fd)?;
        }
        self.writer.buf = None;
        Ok(())
    }

    /// Reads from stdout into the buffer and drops the stdout slot from the
    /// state and the reactor once the read reports anything other than
    /// `ReadState::Open`. Does nothing when there is no stdout slot.
    ///
    /// # Errors
    ///
    /// Propagates errors from the read and from `drop_stdout`.
    pub(crate) fn handle_stdout_ready(
        &mut self,
        reactor: &mut crate::reactor::Reactor,
    ) -> Result<(), CoreError> {
        let Some(slot) = &self.stdout_slot else {
            return Ok(());
        };
        let read_state = self
            .buffer
            .read_from_fd(&slot.fd, true, &mut self.early_exit)?;
        if read_state != ReadState::Open {
            self.drop_stdout(reactor)?;
        }
        Ok(())
    }

    /// Reads from stderr into the buffer and drops the stderr slot from the
    /// state and the reactor once the read reports anything other than
    /// `ReadState::Open`. Does nothing when there is no stderr slot.
    ///
    /// # Errors
    ///
    /// Propagates errors from the read and from `drop_stderr`.
    pub(crate) fn handle_stderr_ready(
        &mut self,
        reactor: &mut crate::reactor::Reactor,
    ) -> Result<(), CoreError> {
        let Some(slot) = &self.stderr_slot else {
            return Ok(());
        };
        let read_state = self
            .buffer
            .read_from_fd(&slot.fd, false, &mut self.early_exit)?;
        if read_state != ReadState::Open {
            self.drop_stderr(reactor)?;
        }
        Ok(())
    }

    /// Lets the writer push more of the payload to stdin and drops the stdin
    /// slot from the state and the reactor once the writer reports it is done.
    /// Does nothing when there is no stdin slot.
    ///
    /// # Errors
    ///
    /// Propagates errors from the write and from `drop_stdin`.
    pub(crate) fn handle_stdin_writable(
        &mut self,
        reactor: &mut crate::reactor::Reactor,
    ) -> Result<(), CoreError> {
        let Some(slot) = &self.stdin_slot else {
            return Ok(());
        };
        let done = self.writer.write_to_fd(&slot.fd)?;
        if done {
            self.drop_stdin(reactor)?;
        }
        Ok(())
    }

    /// Consumes the state and returns the first two vectors produced by
    /// `BufferState::into_parts` (named `stdout` and `stderr` here), discarding
    /// the two boolean flags.
    pub fn into_parts(self) -> (Vec<u8>, Vec<u8>) {
        let (stdout, stderr, _, _) = self.into_parts_with_state();
        (stdout, stderr)
    }

    /// Forwards to `BufferState::output_limit_exceeded`.
    #[inline(always)]
    pub fn output_limit_exceeded(&self) -> bool {
        self.buffer.output_limit_exceeded()
    }

    /// Forwards to `BufferState::stdout_early_exited`.
    #[inline(always)]
    pub fn stdout_early_exited(&self) -> bool {
        self.buffer.stdout_early_exited()
    }

    /// Consumes the state and returns everything `BufferState::into_parts`
    /// yields: the two byte vectors plus two boolean flags.
    ///
    /// NOTE(review): the flags are presumably "output limit exceeded" and
    /// "stdout early-exited"; their order is defined by `BufferState`.
    pub(crate) fn into_parts_with_state(mut self) -> (Vec<u8>, Vec<u8>, bool, bool) {
        // `mem::take` swaps in a default buffer so the field can be consumed
        // without moving out of `self` piecemeal.
        std::mem::take(&mut self.buffer).into_parts()
    }
}