process_wrap/tokio/core.rs
1use std::{
2 any::Any,
3 future::Future,
4 io::Result,
5 pin::Pin,
6 process::{ExitStatus, Output},
7};
8
9use futures::future::try_join3;
10#[cfg(unix)]
11use nix::{
12 sys::signal::{kill, Signal},
13 unistd::Pid,
14};
15use tokio::{
16 io::{AsyncRead, AsyncReadExt},
17 process::{Child, ChildStderr, ChildStdin, ChildStdout, Command},
18};
19
20crate::generic_wrap::Wrap!(Command, Child, ChildWrapper, |child| child);
21
22/// Wrapper for `tokio::process::Child`.
23///
24/// This trait exposes most of the functionality of the underlying [`Child`]. It is implemented for
25/// [`Child`] and by wrappers.
26///
27/// The required methods are `inner`, `inner_mut`, and `into_inner`. That provides access to the
28/// underlying `Child` and allows the wrapper to be dropped and the `Child` to be used directly if
29/// necessary.
30///
31/// It also makes it possible for all the other methods to have default implementations. Some are
32/// direct passthroughs to the underlying `Child`, while others are more complex.
33///
34/// Here's a simple example of a wrapper:
35///
36/// ```rust
37/// use process_wrap::tokio::*;
38/// use tokio::process::Child;
39///
40/// #[derive(Debug)]
41/// pub struct YourChildWrapper(Child);
42///
43/// impl ChildWrapper for YourChildWrapper {
44/// fn inner(&self) -> &dyn ChildWrapper {
45/// &self.0
46/// }
47///
48/// fn inner_mut(&mut self) -> &mut dyn ChildWrapper {
49/// &mut self.0
50/// }
51///
52/// fn into_inner(self: Box<Self>) -> Box<dyn ChildWrapper> {
53/// Box::new((*self).0)
54/// }
55/// }
56/// ```
57pub trait ChildWrapper: Any + std::fmt::Debug + Send {
58 /// Obtain a reference to the wrapped child.
59 fn inner(&self) -> &dyn ChildWrapper;
60
61 /// Obtain a mutable reference to the wrapped child.
62 fn inner_mut(&mut self) -> &mut dyn ChildWrapper;
63
64 /// Consume the current wrapper and return the wrapped child.
65 ///
66 /// Note that this may disrupt whatever the current wrapper was doing. However, wrappers must
67 /// ensure that the wrapped child is in a consistent state when this is called or they are
68 /// dropped, so that this is always safe.
69 fn into_inner(self: Box<Self>) -> Box<dyn ChildWrapper>;
70
71 /// Obtain a clone if possible.
72 ///
73 /// Some implementations may make it possible to clone the implementing structure, even though
74 /// Tokio's `Child` isn't `Clone`. In those cases, this method should be overridden.
75 fn try_clone(&self) -> Option<Box<dyn ChildWrapper>> {
76 None
77 }
78
79 /// Obtain the `Child`'s stdin.
80 ///
81 /// By default this is a passthrough to the wrapped child.
82 fn stdin(&mut self) -> &mut Option<ChildStdin> {
83 self.inner_mut().stdin()
84 }
85
86 /// Obtain the `Child`'s stdout.
87 ///
88 /// By default this is a passthrough to the wrapped child.
89 fn stdout(&mut self) -> &mut Option<ChildStdout> {
90 self.inner_mut().stdout()
91 }
92
93 /// Obtain the `Child`'s stderr.
94 ///
95 /// By default this is a passthrough to the wrapped child.
96 fn stderr(&mut self) -> &mut Option<ChildStderr> {
97 self.inner_mut().stderr()
98 }
99
100 /// Obtain the `Child`'s process ID.
101 ///
102 /// In general this should be the PID of the top-level spawned process that was spawned
103 /// However, that may vary depending on what a wrapper does.
104 ///
105 /// Returns an `Option` to resemble Tokio's API, but isn't expected to be `None` in practice.
106 fn id(&self) -> Option<u32> {
107 self.inner().id()
108 }
109
110 /// Kill the `Child` and wait for it to exit.
111 ///
112 /// By default this calls `start_kill()` and then `wait()`, which is the same way it is done on
113 /// the underlying `Child`, but that way implementing either or both of those methods will use
114 /// them when calling `kill()`, instead of requiring a stub implementation.
115 fn kill(&mut self) -> Box<dyn Future<Output = Result<()>> + Send + '_> {
116 Box::new(async {
117 self.start_kill()?;
118 self.wait().await?;
119 Ok(())
120 })
121 }
122
123 /// Kill the `Child` without waiting for it to exit.
124 ///
125 /// By default this is a passthrough to the underlying `Child`, which:
126 /// - on Unix, sends a `SIGKILL` signal to the process;
127 /// - otherwise, passes through to the `kill()` method.
128 fn start_kill(&mut self) -> Result<()> {
129 self.inner_mut().start_kill()
130 }
131
132 /// Check if the `Child` has exited without waiting, and if it has, return its exit status.
133 ///
134 /// Wrappers must ensure that repeatedly calling this (or other wait methods) after the child
135 /// has exited will always return the same result.
136 ///
137 /// By default this is a passthrough to the underlying `Child`.
138 fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
139 self.inner_mut().try_wait()
140 }
141
142 /// Wait for the `Child` to exit and return its exit status.
143 ///
144 /// Wrappers must ensure that repeatedly calling this (or other wait methods) after the child
145 /// has exited will always return the same result.
146 ///
147 /// By default this is a passthrough to the underlying `Child`.
148 fn wait(&mut self) -> Pin<Box<dyn Future<Output = Result<ExitStatus>> + Send + '_>> {
149 Box::pin(self.inner_mut().wait())
150 }
151
152 /// Wait for the `Child` to exit and return its exit status and outputs.
153 ///
154 /// Note that this method reads the child's stdout and stderr to completion into memory.
155 ///
156 /// By default this is a reimplementation of the Tokio method, so that it can use the wrapper's
157 /// `wait()` method instead of the underlying `Child`'s `wait()`.
158 fn wait_with_output(mut self: Box<Self>) -> Box<dyn Future<Output = Result<Output>> + Send>
159 where
160 Self: 'static,
161 {
162 Box::new(async move {
163 async fn read_to_end<A: AsyncRead + Unpin>(io: &mut Option<A>) -> Result<Vec<u8>> {
164 let mut vec = Vec::new();
165 if let Some(io) = io.as_mut() {
166 io.read_to_end(&mut vec).await?;
167 }
168 Ok(vec)
169 }
170
171 let mut stdout_pipe = self.stdout().take();
172 let mut stderr_pipe = self.stderr().take();
173
174 let stdout_fut = read_to_end(&mut stdout_pipe);
175 let stderr_fut = read_to_end(&mut stderr_pipe);
176
177 let (status, stdout, stderr) = try_join3(self.wait(), stdout_fut, stderr_fut).await?;
178
179 // Drop happens after `try_join` due to <https://github.com/tokio-rs/tokio/issues/4309>
180 drop(stdout_pipe);
181 drop(stderr_pipe);
182
183 Ok(Output {
184 status,
185 stdout,
186 stderr,
187 })
188 })
189 }
190
191 /// Send a signal to the `Child`.
192 ///
193 /// This method is only available on Unix. It doesn't exist on Tokio's `Child`, nor on std's. It
194 /// was introduced by command-group to abstract over the signal behaviour between process groups
195 /// and unwrapped processes.
196 #[cfg(unix)]
197 fn signal(&self, sig: i32) -> Result<()> {
198 self.inner().signal(sig)
199 }
200}
201
202impl ChildWrapper for Child {
203 fn inner(&self) -> &dyn ChildWrapper {
204 self
205 }
206 fn inner_mut(&mut self) -> &mut dyn ChildWrapper {
207 self
208 }
209 fn into_inner(self: Box<Self>) -> Box<dyn ChildWrapper> {
210 Box::new(*self)
211 }
212 fn stdin(&mut self) -> &mut Option<ChildStdin> {
213 &mut self.stdin
214 }
215 fn stdout(&mut self) -> &mut Option<ChildStdout> {
216 &mut self.stdout
217 }
218 fn stderr(&mut self) -> &mut Option<ChildStderr> {
219 &mut self.stderr
220 }
221 fn id(&self) -> Option<u32> {
222 Child::id(self)
223 }
224 fn start_kill(&mut self) -> Result<()> {
225 Child::start_kill(self)
226 }
227 fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
228 Child::try_wait(self)
229 }
230 fn wait(&mut self) -> Pin<Box<dyn Future<Output = Result<ExitStatus>> + Send + '_>> {
231 Box::pin(Child::wait(self))
232 }
233 #[cfg(unix)]
234 fn signal(&self, sig: i32) -> Result<()> {
235 if let Some(id) = self.id() {
236 kill(
237 Pid::from_raw(i32::try_from(id).map_err(std::io::Error::other)?),
238 Signal::try_from(sig)?,
239 )
240 .map_err(std::io::Error::from)
241 } else {
242 Ok(())
243 }
244 }
245}
246
247impl dyn ChildWrapper {
248 fn downcast_ref<T: 'static>(&self) -> Option<&T> {
249 (self as &dyn Any).downcast_ref()
250 }
251
252 fn is_raw_child(&self) -> bool {
253 self.downcast_ref::<Child>().is_some()
254 }
255
256 /// Obtain a reference to the underlying [`Child`].
257 pub fn inner_child(&self) -> &Child {
258 let mut inner = self;
259 while !inner.is_raw_child() {
260 inner = inner.inner();
261 }
262
263 // UNWRAP: we've just checked that it's Some with is_raw_child()
264 inner.downcast_ref().unwrap()
265 }
266
267 /// Obtain a mutable reference to the underlying [`Child`].
268 ///
269 /// Modifying the raw child may be unsound depending on the layering of wrappers.
270 pub unsafe fn inner_child_mut(&mut self) -> &mut Child {
271 let mut inner = self;
272 while !inner.is_raw_child() {
273 inner = inner.inner_mut();
274 }
275
276 // UNWRAP: we've just checked that with is_raw_child()
277 (inner as &mut dyn Any).downcast_mut().unwrap()
278 }
279
280 /// Obtain the underlying [`Child`].
281 ///
282 /// Unwrapping everything may be unsound depending on the state of the wrappers.
283 pub unsafe fn into_inner_child(self: Box<Self>) -> Child {
284 let mut inner = self;
285 while !inner.is_raw_child() {
286 inner = inner.into_inner();
287 }
288
289 // UNWRAP: we've just checked that with is_raw_child()
290 *(inner as Box<dyn Any>).downcast().unwrap()
291 }
292}