1use std::cell::RefCell;
2use std::fmt;
3use std::io::{self as stdio, Read as _, Write};
4use std::os::fd::{AsFd, OwnedFd};
5use std::pin::Pin;
6use std::process::Stdio;
7use std::rc::Rc;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
12use tokio::net::unix::pipe;
13
14use crate::{Error, ExecResult, Status};
15
16#[derive(Debug, Clone)]
17pub struct IoConfig {
18 pub stdin: Io,
19 pub stdout: Io,
20 pub stderr: Io,
21}
22
23impl Default for IoConfig {
24 fn default() -> Self {
25 Self {
26 stdin: Io::Stdin,
27 stdout: Io::Stdout,
28 stderr: Io::Stderr,
29 }
30 }
31}
32
33pub(crate) type StdioCollectSink = Rc<dyn Fn(&[u8]) -> ExecResult>;
34
35#[expect(private_interfaces, reason = "TODO")]
36#[derive(Clone)]
37pub enum Io {
38 Stdin,
39 Stdout,
40 Stderr,
41 Close,
42 Collect(StdioCollectSink),
43
44 File(Arc<std::fs::File>),
45 PipeSender(Rc<AsyncCell<pipe::Sender>>),
46 PipeReceiver(Rc<AsyncCell<pipe::Receiver>>),
47}
48
49impl fmt::Debug for Io {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 match self {
52 Self::Stdin => write!(f, "Stdin"),
53 Self::Stdout => write!(f, "Stdout"),
54 Self::Stderr => write!(f, "Stderr"),
55 Self::Close => write!(f, "Close"),
56 Self::Collect(_) => f.debug_tuple("Collect").finish_non_exhaustive(),
57
58 Self::File(_) => f.debug_tuple("File").finish_non_exhaustive(),
59 Self::PipeSender(_) => f.debug_tuple("PipeSender").finish_non_exhaustive(),
60 Self::PipeReceiver(_) => f.debug_tuple("PipeReceiver").finish_non_exhaustive(),
61 }
62 }
63}
64
65impl Io {
66 pub fn new_os_pipe() -> Result<(Io, Io), Error> {
68 let (r, w) = os_pipe::pipe().map_err(Error::CreatePipe)?;
69 let tx = Io::PipeSender(Rc::new(AsyncCell {
70 blocking: w.into(),
71 non_blocking: RefCell::new(None),
72 }));
73 let rx = Io::PipeReceiver(Rc::new(AsyncCell {
74 blocking: r.into(),
75 non_blocking: RefCell::new(None),
76 }));
77 Ok((tx, rx))
78 }
79
80 pub fn to_stdio(&self) -> Result<(Stdio, Option<StdioCollectSink>), Error> {
81 let stdio = match self {
82 Io::Stdin => stdio::stdin()
83 .as_fd()
84 .try_clone_to_owned()
85 .map_err(Error::CloneHandle)?
86 .into(),
87 Io::Stdout => stdio::stdout().into(),
88 Io::Stderr => stdio::stderr().into(),
89 Io::Close => Stdio::null(),
90 Io::Collect(f) => return Ok((Stdio::piped(), Some(Rc::clone(f)))),
91 Io::File(f) => (**f).try_clone().map_err(Error::CloneHandle)?.into(),
92 Io::PipeSender(tx) => tx.blocking.try_clone().map_err(Error::CloneHandle)?.into(),
93 Io::PipeReceiver(rx) => rx.blocking.try_clone().map_err(Error::CloneHandle)?.into(),
94 };
95 Ok((stdio, None))
96 }
97
98 pub async fn write_all(&self, bytes: impl AsRef<[u8]> + Send + 'static) -> ExecResult {
99 match self {
100 Self::Close | Self::PipeReceiver(_) => Err(Error::PipeClosed),
101 Self::Collect(sink) => sink(bytes.as_ref()),
102 Self::Stdin => Err(Error::PipeClosed),
104 Self::File(f) => {
105 tokio::task::spawn_blocking({
106 let f = Arc::clone(f);
107 move || (&*f).write_all(bytes.as_ref())
109 })
110 .await
111 .expect("no panic")
112 .map_err(Error::ReadWrite)?;
113 Ok(Status::SUCCESS)
114 }
115 Self::Stdout | Self::Stderr => {
117 let is_stdout = matches!(self, Self::Stdout);
118 tokio::task::spawn_blocking(move || {
119 let lock = if is_stdout {
120 &mut stdio::stdout().lock() as &mut dyn stdio::Write
121 } else {
122 &mut stdio::stderr().lock()
123 };
124 lock.write_all(bytes.as_ref())?;
125 lock.flush()
126 })
127 .await
128 .expect("no panic")
129 .map_err(Error::ReadWrite)?;
130 Ok(Status::SUCCESS)
131 }
132 Self::PipeSender(tx) => {
133 (&**tx)
134 .write_all(bytes.as_ref())
135 .await
136 .map_err(Error::ReadWrite)?;
137 Ok(Status::SUCCESS)
138 }
139 }
140 }
141
142 pub async fn read_to_string(&self) -> ExecResult<String> {
143 self.read_to_end()
144 .await
145 .and_then(|buf| String::from_utf8(buf).map_err(|_| Error::InvalidUtf8))
146 }
147
148 pub async fn read_to_end(&self) -> ExecResult<Vec<u8>> {
149 match self {
150 Self::Stdin => tokio::task::spawn_blocking(move || {
151 let mut buf = Vec::new();
152 stdio::stdin().lock().read_to_end(&mut buf)?;
153 Ok(buf)
154 })
155 .await
156 .expect("no panic")
157 .map_err(Error::ReadWrite),
158 Self::File(f) => tokio::task::spawn_blocking({
159 let f = Arc::clone(f);
160 move || {
161 let mut buf = Vec::new();
162 (&*f).read_to_end(&mut buf)?;
163 Ok(buf)
164 }
165 })
166 .await
167 .expect("no panic")
168 .map_err(Error::ReadWrite),
169 Self::PipeReceiver(rx) => {
170 let mut buf = Vec::new();
171 (&**rx)
172 .read_to_end(&mut buf)
173 .await
174 .map_err(Error::ReadWrite)?;
175 Ok(buf)
176 }
177 Self::Stdout | Self::Stderr | Self::Close | Self::Collect(_) | Self::PipeSender(_) => {
178 Err(Error::PipeClosed)
179 }
180 }
181 }
182}
183
/// Pairs an owned file descriptor with a lazily created async wrapper `T`.
pub(crate) struct AsyncCell<T> {
    /// The owned descriptor; it is duplicated (`try_clone`) whenever a
    /// `Stdio` or an async wrapper is built from it.
    blocking: OwnedFd,
    /// Async wrapper slot: starts empty and is filled the first time the
    /// cell is polled for I/O.
    non_blocking: RefCell<Option<T>>,
}
188
189impl AsyncRead for &AsyncCell<pipe::Receiver> {
190 fn poll_read(
191 self: Pin<&mut Self>,
192 cx: &mut Context<'_>,
193 buf: &mut ReadBuf<'_>,
194 ) -> Poll<stdio::Result<()>> {
195 let mut rx = self.non_blocking.borrow_mut();
196 let rx = match &mut *rx {
197 Some(rx) => rx,
198 None => rx.insert(pipe::Receiver::from_owned_fd_unchecked(
199 self.blocking.try_clone()?,
200 )?),
201 };
202 Pin::new(rx).poll_read(cx, buf)
203 }
204}
205
206impl AsyncWrite for &AsyncCell<pipe::Sender> {
207 fn poll_write(
208 self: Pin<&mut Self>,
209 cx: &mut Context<'_>,
210 buf: &[u8],
211 ) -> Poll<Result<usize, stdio::Error>> {
212 let mut tx = self.non_blocking.borrow_mut();
213 let tx = match &mut *tx {
214 Some(tx) => tx,
215 None => tx.insert(pipe::Sender::from_owned_fd_unchecked(
216 self.blocking.try_clone()?,
217 )?),
218 };
219 Pin::new(tx).poll_write(cx, buf)
220 }
221
222 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), stdio::Error>> {
223 Poll::Ready(Ok(()))
224 }
225
226 fn poll_shutdown(
227 self: Pin<&mut Self>,
228 _cx: &mut Context<'_>,
229 ) -> Poll<Result<(), stdio::Error>> {
230 Poll::Ready(Ok(()))
231 }
232}