use std::cell::RefCell;
use std::fmt;
use std::io::{self as stdio, Read as _, Write};
use std::os::fd::{AsFd, OwnedFd};
use std::pin::Pin;
use std::process::Stdio;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::unix::pipe;
use crate::{Error, ExecResult, Status};
#[derive(Debug, Clone)]
pub struct IoConfig {
pub stdin: Io,
pub stdout: Io,
pub stderr: Io,
}
impl Default for IoConfig {
fn default() -> Self {
Self {
stdin: Io::Stdin,
stdout: Io::Stdout,
stderr: Io::Stderr,
}
}
}
pub(crate) type StdioCollectSink = Rc<dyn Fn(&[u8]) -> ExecResult>;
#[expect(private_interfaces, reason = "TODO")]
#[derive(Clone)]
pub enum Io {
Stdin,
Stdout,
Stderr,
Close,
Collect(StdioCollectSink),
File(Arc<std::fs::File>),
PipeSender(Rc<AsyncCell<pipe::Sender>>),
PipeReceiver(Rc<AsyncCell<pipe::Receiver>>),
}
impl fmt::Debug for Io {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Stdin => write!(f, "Stdin"),
Self::Stdout => write!(f, "Stdout"),
Self::Stderr => write!(f, "Stderr"),
Self::Close => write!(f, "Close"),
Self::Collect(_) => f.debug_tuple("Collect").finish_non_exhaustive(),
Self::File(_) => f.debug_tuple("File").finish_non_exhaustive(),
Self::PipeSender(_) => f.debug_tuple("PipeSender").finish_non_exhaustive(),
Self::PipeReceiver(_) => f.debug_tuple("PipeReceiver").finish_non_exhaustive(),
}
}
}
impl Io {
pub fn new_os_pipe() -> Result<(Io, Io), Error> {
let (r, w) = os_pipe::pipe().map_err(Error::CreatePipe)?;
let tx = Io::PipeSender(Rc::new(AsyncCell {
blocking: w.into(),
non_blocking: RefCell::new(None),
}));
let rx = Io::PipeReceiver(Rc::new(AsyncCell {
blocking: r.into(),
non_blocking: RefCell::new(None),
}));
Ok((tx, rx))
}
pub fn to_stdio(&self) -> Result<(Stdio, Option<StdioCollectSink>), Error> {
let stdio = match self {
Io::Stdin => stdio::stdin()
.as_fd()
.try_clone_to_owned()
.map_err(Error::CloneHandle)?
.into(),
Io::Stdout => stdio::stdout().into(),
Io::Stderr => stdio::stderr().into(),
Io::Close => Stdio::null(),
Io::Collect(f) => return Ok((Stdio::piped(), Some(Rc::clone(f)))),
Io::File(f) => (**f).try_clone().map_err(Error::CloneHandle)?.into(),
Io::PipeSender(tx) => tx.blocking.try_clone().map_err(Error::CloneHandle)?.into(),
Io::PipeReceiver(rx) => rx.blocking.try_clone().map_err(Error::CloneHandle)?.into(),
};
Ok((stdio, None))
}
pub async fn write_all(&self, bytes: impl AsRef<[u8]> + Send + 'static) -> ExecResult {
match self {
Self::Close | Self::PipeReceiver(_) => Err(Error::PipeClosed),
Self::Collect(sink) => sink(bytes.as_ref()),
Self::Stdin => Err(Error::PipeClosed),
Self::File(f) => {
tokio::task::spawn_blocking({
let f = Arc::clone(f);
move || (&*f).write_all(bytes.as_ref())
})
.await
.expect("no panic")
.map_err(Error::ReadWrite)?;
Ok(Status::SUCCESS)
}
Self::Stdout | Self::Stderr => {
let is_stdout = matches!(self, Self::Stdout);
tokio::task::spawn_blocking(move || {
let lock = if is_stdout {
&mut stdio::stdout().lock() as &mut dyn stdio::Write
} else {
&mut stdio::stderr().lock()
};
lock.write_all(bytes.as_ref())?;
lock.flush()
})
.await
.expect("no panic")
.map_err(Error::ReadWrite)?;
Ok(Status::SUCCESS)
}
Self::PipeSender(tx) => {
(&**tx)
.write_all(bytes.as_ref())
.await
.map_err(Error::ReadWrite)?;
Ok(Status::SUCCESS)
}
}
}
pub async fn read_to_string(&self) -> ExecResult<String> {
self.read_to_end()
.await
.and_then(|buf| String::from_utf8(buf).map_err(|_| Error::InvalidUtf8))
}
pub async fn read_to_end(&self) -> ExecResult<Vec<u8>> {
match self {
Self::Stdin => tokio::task::spawn_blocking(move || {
let mut buf = Vec::new();
stdio::stdin().lock().read_to_end(&mut buf)?;
Ok(buf)
})
.await
.expect("no panic")
.map_err(Error::ReadWrite),
Self::File(f) => tokio::task::spawn_blocking({
let f = Arc::clone(f);
move || {
let mut buf = Vec::new();
(&*f).read_to_end(&mut buf)?;
Ok(buf)
}
})
.await
.expect("no panic")
.map_err(Error::ReadWrite),
Self::PipeReceiver(rx) => {
let mut buf = Vec::new();
(&**rx)
.read_to_end(&mut buf)
.await
.map_err(Error::ReadWrite)?;
Ok(buf)
}
Self::Stdout | Self::Stderr | Self::Close | Self::Collect(_) | Self::PipeSender(_) => {
Err(Error::PipeClosed)
}
}
}
}
pub(crate) struct AsyncCell<T> {
blocking: OwnedFd,
non_blocking: RefCell<Option<T>>,
}
impl AsyncRead for &AsyncCell<pipe::Receiver> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<stdio::Result<()>> {
let mut rx = self.non_blocking.borrow_mut();
let rx = match &mut *rx {
Some(rx) => rx,
None => rx.insert(pipe::Receiver::from_owned_fd_unchecked(
self.blocking.try_clone()?,
)?),
};
Pin::new(rx).poll_read(cx, buf)
}
}
impl AsyncWrite for &AsyncCell<pipe::Sender> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, stdio::Error>> {
let mut tx = self.non_blocking.borrow_mut();
let tx = match &mut *tx {
Some(tx) => tx,
None => tx.insert(pipe::Sender::from_owned_fd_unchecked(
self.blocking.try_clone()?,
)?),
};
Pin::new(tx).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), stdio::Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), stdio::Error>> {
Poll::Ready(Ok(()))
}
}