ghoti_exec/
io.rs

1use std::cell::RefCell;
2use std::fmt;
3use std::io::{self as stdio, Read as _, Write};
4use std::os::fd::{AsFd, OwnedFd};
5use std::pin::Pin;
6use std::process::Stdio;
7use std::rc::Rc;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
12use tokio::net::unix::pipe;
13
14use crate::{Error, ExecResult, Status};
15
16#[derive(Debug, Clone)]
17pub struct IoConfig {
18    pub stdin: Io,
19    pub stdout: Io,
20    pub stderr: Io,
21}
22
23impl Default for IoConfig {
24    fn default() -> Self {
25        Self {
26            stdin: Io::Stdin,
27            stdout: Io::Stdout,
28            stderr: Io::Stderr,
29        }
30    }
31}
32
33pub(crate) type StdioCollectSink = Rc<dyn Fn(&[u8]) -> ExecResult>;
34
35#[expect(private_interfaces, reason = "TODO")]
36#[derive(Clone)]
37pub enum Io {
38    Stdin,
39    Stdout,
40    Stderr,
41    Close,
42    Collect(StdioCollectSink),
43
44    File(Arc<std::fs::File>),
45    PipeSender(Rc<AsyncCell<pipe::Sender>>),
46    PipeReceiver(Rc<AsyncCell<pipe::Receiver>>),
47}
48
49impl fmt::Debug for Io {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        match self {
52            Self::Stdin => write!(f, "Stdin"),
53            Self::Stdout => write!(f, "Stdout"),
54            Self::Stderr => write!(f, "Stderr"),
55            Self::Close => write!(f, "Close"),
56            Self::Collect(_) => f.debug_tuple("Collect").finish_non_exhaustive(),
57
58            Self::File(_) => f.debug_tuple("File").finish_non_exhaustive(),
59            Self::PipeSender(_) => f.debug_tuple("PipeSender").finish_non_exhaustive(),
60            Self::PipeReceiver(_) => f.debug_tuple("PipeReceiver").finish_non_exhaustive(),
61        }
62    }
63}
64
65impl Io {
66    /// Returns: (tx, rx)
67    pub fn new_os_pipe() -> Result<(Io, Io), Error> {
68        let (r, w) = os_pipe::pipe().map_err(Error::CreatePipe)?;
69        let tx = Io::PipeSender(Rc::new(AsyncCell {
70            blocking: w.into(),
71            non_blocking: RefCell::new(None),
72        }));
73        let rx = Io::PipeReceiver(Rc::new(AsyncCell {
74            blocking: r.into(),
75            non_blocking: RefCell::new(None),
76        }));
77        Ok((tx, rx))
78    }
79
80    pub fn to_stdio(&self) -> Result<(Stdio, Option<StdioCollectSink>), Error> {
81        let stdio = match self {
82            Io::Stdin => stdio::stdin()
83                .as_fd()
84                .try_clone_to_owned()
85                .map_err(Error::CloneHandle)?
86                .into(),
87            Io::Stdout => stdio::stdout().into(),
88            Io::Stderr => stdio::stderr().into(),
89            Io::Close => Stdio::null(),
90            Io::Collect(f) => return Ok((Stdio::piped(), Some(Rc::clone(f)))),
91            Io::File(f) => (**f).try_clone().map_err(Error::CloneHandle)?.into(),
92            Io::PipeSender(tx) => tx.blocking.try_clone().map_err(Error::CloneHandle)?.into(),
93            Io::PipeReceiver(rx) => rx.blocking.try_clone().map_err(Error::CloneHandle)?.into(),
94        };
95        Ok((stdio, None))
96    }
97
98    pub async fn write_all(&self, bytes: impl AsRef<[u8]> + Send + 'static) -> ExecResult {
99        match self {
100            Self::Close | Self::PipeReceiver(_) => Err(Error::PipeClosed),
101            Self::Collect(sink) => sink(bytes.as_ref()),
102            // FIXME
103            Self::Stdin => Err(Error::PipeClosed),
104            Self::File(f) => {
105                tokio::task::spawn_blocking({
106                    let f = Arc::clone(f);
107                    // `std::fs::File` does not need flush.
108                    move || (&*f).write_all(bytes.as_ref())
109                })
110                .await
111                .expect("no panic")
112                .map_err(Error::ReadWrite)?;
113                Ok(Status::SUCCESS)
114            }
115            // FIXME: Should bypass std's lock to avoid deadlocks when using print* macros.
116            Self::Stdout | Self::Stderr => {
117                let is_stdout = matches!(self, Self::Stdout);
118                tokio::task::spawn_blocking(move || {
119                    let lock = if is_stdout {
120                        &mut stdio::stdout().lock() as &mut dyn stdio::Write
121                    } else {
122                        &mut stdio::stderr().lock()
123                    };
124                    lock.write_all(bytes.as_ref())?;
125                    lock.flush()
126                })
127                .await
128                .expect("no panic")
129                .map_err(Error::ReadWrite)?;
130                Ok(Status::SUCCESS)
131            }
132            Self::PipeSender(tx) => {
133                (&**tx)
134                    .write_all(bytes.as_ref())
135                    .await
136                    .map_err(Error::ReadWrite)?;
137                Ok(Status::SUCCESS)
138            }
139        }
140    }
141
142    pub async fn read_to_string(&self) -> ExecResult<String> {
143        self.read_to_end()
144            .await
145            .and_then(|buf| String::from_utf8(buf).map_err(|_| Error::InvalidUtf8))
146    }
147
148    pub async fn read_to_end(&self) -> ExecResult<Vec<u8>> {
149        match self {
150            Self::Stdin => tokio::task::spawn_blocking(move || {
151                let mut buf = Vec::new();
152                stdio::stdin().lock().read_to_end(&mut buf)?;
153                Ok(buf)
154            })
155            .await
156            .expect("no panic")
157            .map_err(Error::ReadWrite),
158            Self::File(f) => tokio::task::spawn_blocking({
159                let f = Arc::clone(f);
160                move || {
161                    let mut buf = Vec::new();
162                    (&*f).read_to_end(&mut buf)?;
163                    Ok(buf)
164                }
165            })
166            .await
167            .expect("no panic")
168            .map_err(Error::ReadWrite),
169            Self::PipeReceiver(rx) => {
170                let mut buf = Vec::new();
171                (&**rx)
172                    .read_to_end(&mut buf)
173                    .await
174                    .map_err(Error::ReadWrite)?;
175                Ok(buf)
176            }
177            Self::Stdout | Self::Stderr | Self::Close | Self::Collect(_) | Self::PipeSender(_) => {
178                Err(Error::PipeClosed)
179            }
180        }
181    }
182}
183
184pub(crate) struct AsyncCell<T> {
185    blocking: OwnedFd,
186    non_blocking: RefCell<Option<T>>,
187}
188
189impl AsyncRead for &AsyncCell<pipe::Receiver> {
190    fn poll_read(
191        self: Pin<&mut Self>,
192        cx: &mut Context<'_>,
193        buf: &mut ReadBuf<'_>,
194    ) -> Poll<stdio::Result<()>> {
195        let mut rx = self.non_blocking.borrow_mut();
196        let rx = match &mut *rx {
197            Some(rx) => rx,
198            None => rx.insert(pipe::Receiver::from_owned_fd_unchecked(
199                self.blocking.try_clone()?,
200            )?),
201        };
202        Pin::new(rx).poll_read(cx, buf)
203    }
204}
205
206impl AsyncWrite for &AsyncCell<pipe::Sender> {
207    fn poll_write(
208        self: Pin<&mut Self>,
209        cx: &mut Context<'_>,
210        buf: &[u8],
211    ) -> Poll<Result<usize, stdio::Error>> {
212        let mut tx = self.non_blocking.borrow_mut();
213        let tx = match &mut *tx {
214            Some(tx) => tx,
215            None => tx.insert(pipe::Sender::from_owned_fd_unchecked(
216                self.blocking.try_clone()?,
217            )?),
218        };
219        Pin::new(tx).poll_write(cx, buf)
220    }
221
222    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), stdio::Error>> {
223        Poll::Ready(Ok(()))
224    }
225
226    fn poll_shutdown(
227        self: Pin<&mut Self>,
228        _cx: &mut Context<'_>,
229    ) -> Poll<Result<(), stdio::Error>> {
230        Poll::Ready(Ok(()))
231    }
232}