process_wrap/tokio/core.rs
1use std::{
2 future::Future,
3 io::Result,
4 process::{ExitStatus, Output},
5};
6
7use futures::future::try_join3;
8#[cfg(unix)]
9use nix::{
10 sys::signal::{kill, Signal},
11 unistd::Pid,
12};
13use tokio::{
14 io::{AsyncRead, AsyncReadExt},
15 process::{Child, ChildStderr, ChildStdin, ChildStdout, Command},
16};
17
// Instantiate the crate's generic wrapper machinery for Tokio's process types.
//
// The arguments pair Tokio's `Command` and `Child` with the Tokio-specific names used by this
// module, and end with an identity closure over the spawned child.
//
// NOTE(review): `Wrap!` lives in `crate::generic_wrap`, outside this file. Presumably it generates
// `TokioCommandWrap` and the `TokioCommandWrapper` trait (neither is defined here) and refers to
// the `TokioChildWrapper` trait declared below — confirm against the macro definition.
crate::generic_wrap::Wrap!(
    TokioCommandWrap,
    Command,
    TokioCommandWrapper,
    Child,
    TokioChildWrapper,
    |child| child
);
26
27crate::generic_wrap::MaybeAnyTrait! {
28/// Wrapper for `tokio::process::Child`.
29///
30/// This trait exposes most of the functionality of the underlying [`Child`]. It is implemented for
31/// [`Child`] and by wrappers.
32///
33/// The required methods are `inner`, `inner_mut`, and `into_inner`. That provides access to the
34/// underlying `Child` and allows the wrapper to be dropped and the `Child` to be used directly if
35/// necessary.
36///
37/// It also makes it possible for all the other methods to have default implementations. Some are
38/// direct passthroughs to the underlying `Child`, while others are more complex.
39///
40/// Here's a simple example of a wrapper:
41///
42/// ```rust
43/// use process_wrap::tokio::*;
44/// use tokio::process::Child;
45///
46/// #[derive(Debug)]
47/// pub struct YourChildWrapper(Child);
48///
49/// impl TokioChildWrapper for YourChildWrapper {
50/// fn inner(&self) -> &Child {
51/// &self.0
52/// }
53///
54/// fn inner_mut(&mut self) -> &mut Child {
55/// &mut self.0
56/// }
57///
58/// fn into_inner(self: Box<Self>) -> Child {
59/// (*self).0
60/// }
61/// }
62/// ```
63pub trait TokioChildWrapper {
64 /// Obtain a reference to the underlying `Child`.
65 fn inner(&self) -> &Child;
66
67 /// Obtain a mutable reference to the underlying `Child`.
68 fn inner_mut(&mut self) -> &mut Child;
69
70 /// Consume the wrapper and return the underlying `Child`.
71 ///
72 /// Note that this may disrupt whatever the wrappers were doing. However, wrappers must ensure
73 /// that the `Child` is in a consistent state when this is called or they are dropped, so that
74 /// this is always safe.
75 fn into_inner(self: Box<Self>) -> Child;
76
77 /// Obtain a clone if possible.
78 ///
79 /// Some implementations may make it possible to clone the implementing structure, even though
80 /// Tokio's `Child` isn't `Clone`. In those cases, this method should be overridden.
81 fn try_clone(&self) -> Option<Box<dyn TokioChildWrapper>> {
82 None
83 }
84
85 /// Obtain the `Child`'s stdin.
86 ///
87 /// By default this is a passthrough to the underlying `Child`.
88 fn stdin(&mut self) -> &mut Option<ChildStdin> {
89 &mut self.inner_mut().stdin
90 }
91
92 /// Obtain the `Child`'s stdout.
93 ///
94 /// By default this is a passthrough to the underlying `Child`.
95 fn stdout(&mut self) -> &mut Option<ChildStdout> {
96 &mut self.inner_mut().stdout
97 }
98
99 /// Obtain the `Child`'s stderr.
100 ///
101 /// By default this is a passthrough to the underlying `Child`.
102 fn stderr(&mut self) -> &mut Option<ChildStderr> {
103 &mut self.inner_mut().stderr
104 }
105
106 /// Obtain the `Child`'s process ID.
107 ///
108 /// In general this should be the PID of the top-level spawned process that was spawned
109 /// However, that may vary depending on what a wrapper does.
110 ///
111 /// Returns an `Option` to resemble Tokio's API, but isn't expected to be `None` in practice.
112 fn id(&self) -> Option<u32> {
113 self.inner().id()
114 }
115
116 /// Kill the `Child` and wait for it to exit.
117 ///
118 /// By default this calls `start_kill()` and then `wait()`, which is the same way it is done on
119 /// the underlying `Child`, but that way implementing either or both of those methods will use
120 /// them when calling `kill()`, instead of requiring a stub implementation.
121 fn kill(&mut self) -> Box<dyn Future<Output = Result<()>> + Send + '_> {
122 Box::new(async {
123 self.start_kill()?;
124 Box::into_pin(self.wait()).await?;
125 Ok(())
126 })
127 }
128
129 /// Kill the `Child` without waiting for it to exit.
130 ///
131 /// By default this is a passthrough to the underlying `Child`, which:
132 /// - on Unix, sends a `SIGKILL` signal to the process;
133 /// - otherwise, passes through to the `kill()` method.
134 fn start_kill(&mut self) -> Result<()> {
135 self.inner_mut().start_kill()
136 }
137
138 /// Check if the `Child` has exited without waiting, and if it has, return its exit status.
139 ///
140 /// Wrappers must ensure that repeatedly calling this (or other wait methods) after the child
141 /// has exited will always return the same result.
142 ///
143 /// By default this is a passthrough to the underlying `Child`.
144 fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
145 self.inner_mut().try_wait()
146 }
147
148 /// Wait for the `Child` to exit and return its exit status.
149 ///
150 /// Wrappers must ensure that repeatedly calling this (or other wait methods) after the child
151 /// has exited will always return the same result.
152 ///
153 /// By default this is a passthrough to the underlying `Child`.
154 fn wait(&mut self) -> Box<dyn Future<Output = Result<ExitStatus>> + Send + '_> {
155 Box::new(self.inner_mut().wait())
156 }
157
158 /// Wait for the `Child` to exit and return its exit status and outputs.
159 ///
160 /// Note that this method reads the child's stdout and stderr to completion into memory.
161 ///
162 /// By default this is a reimplementation of the Tokio method, so that it can use the wrapper's
163 /// `wait()` method instead of the underlying `Child`'s `wait()`.
164 fn wait_with_output(mut self: Box<Self>) -> Box<dyn Future<Output = Result<Output>> + Send>
165 where
166 Self: 'static,
167 {
168 Box::new(async move {
169 async fn read_to_end<A: AsyncRead + Unpin>(io: &mut Option<A>) -> Result<Vec<u8>> {
170 let mut vec = Vec::new();
171 if let Some(io) = io.as_mut() {
172 io.read_to_end(&mut vec).await?;
173 }
174 Ok(vec)
175 }
176
177 let mut stdout_pipe = self.stdout().take();
178 let mut stderr_pipe = self.stderr().take();
179
180 let stdout_fut = read_to_end(&mut stdout_pipe);
181 let stderr_fut = read_to_end(&mut stderr_pipe);
182
183 let (status, stdout, stderr) =
184 try_join3(Box::into_pin(self.wait()), stdout_fut, stderr_fut).await?;
185
186 // Drop happens after `try_join` due to <https://github.com/tokio-rs/tokio/issues/4309>
187 drop(stdout_pipe);
188 drop(stderr_pipe);
189
190 Ok(Output {
191 status,
192 stdout,
193 stderr,
194 })
195 })
196 }
197
198 /// Send a signal to the `Child`.
199 ///
200 /// This method is only available on Unix. It doesn't exist on Tokio's `Child`, nor on std's. It
201 /// was introduced by command-group to abstract over the signal behaviour between process groups
202 /// and unwrapped processes.
203 #[cfg(unix)]
204 fn signal(&self, sig: i32) -> Result<()> {
205 if let Some(id) = self.id() {
206 kill(
207 Pid::from_raw(i32::try_from(id).map_err(std::io::Error::other)?),
208 Signal::try_from(sig)?,
209 )
210 .map_err(std::io::Error::from)
211 } else {
212 Ok(())
213 }
214 }
215}
216}
217
218impl TokioChildWrapper for Child {
219 fn inner(&self) -> &Child {
220 self
221 }
222 fn inner_mut(&mut self) -> &mut Child {
223 self
224 }
225 fn into_inner(self: Box<Self>) -> Child {
226 *self
227 }
228}