process_wrap/tokio/
core.rs

1use std::{
2	future::Future,
3	io::Result,
4	process::{ExitStatus, Output},
5};
6
7use futures::future::try_join3;
8#[cfg(unix)]
9use nix::{
10	sys::signal::{kill, Signal},
11	unistd::Pid,
12};
13use tokio::{
14	io::{AsyncRead, AsyncReadExt},
15	process::{Child, ChildStderr, ChildStdin, ChildStdout, Command},
16};
17
18crate::generic_wrap::Wrap!(
19	TokioCommandWrap,
20	Command,
21	TokioCommandWrapper,
22	Child,
23	TokioChildWrapper,
24	|child| child
25);
26
27crate::generic_wrap::MaybeAnyTrait! {
28/// Wrapper for `tokio::process::Child`.
29///
30/// This trait exposes most of the functionality of the underlying [`Child`]. It is implemented for
31/// [`Child`] and by wrappers.
32///
33/// The required methods are `inner`, `inner_mut`, and `into_inner`. That provides access to the
34/// underlying `Child` and allows the wrapper to be dropped and the `Child` to be used directly if
35/// necessary.
36///
37/// It also makes it possible for all the other methods to have default implementations. Some are
38/// direct passthroughs to the underlying `Child`, while others are more complex.
39///
40/// Here's a simple example of a wrapper:
41///
42/// ```rust
43/// use process_wrap::tokio::*;
44/// use tokio::process::Child;
45///
46/// #[derive(Debug)]
47/// pub struct YourChildWrapper(Child);
48///
49/// impl TokioChildWrapper for YourChildWrapper {
50///     fn inner(&self) -> &Child {
51///         &self.0
52///     }
53///
54///     fn inner_mut(&mut self) -> &mut Child {
55///         &mut self.0
56///     }
57///
58///     fn into_inner(self: Box<Self>) -> Child {
59///         (*self).0
60///     }
61/// }
62/// ```
63pub trait TokioChildWrapper {
64	/// Obtain a reference to the underlying `Child`.
65	fn inner(&self) -> &Child;
66
67	/// Obtain a mutable reference to the underlying `Child`.
68	fn inner_mut(&mut self) -> &mut Child;
69
70	/// Consume the wrapper and return the underlying `Child`.
71	///
72	/// Note that this may disrupt whatever the wrappers were doing. However, wrappers must ensure
73	/// that the `Child` is in a consistent state when this is called or they are dropped, so that
74	/// this is always safe.
75	fn into_inner(self: Box<Self>) -> Child;
76
77	/// Obtain a clone if possible.
78	///
79	/// Some implementations may make it possible to clone the implementing structure, even though
80	/// Tokio's `Child` isn't `Clone`. In those cases, this method should be overridden.
81	fn try_clone(&self) -> Option<Box<dyn TokioChildWrapper>> {
82		None
83	}
84
85	/// Obtain the `Child`'s stdin.
86	///
87	/// By default this is a passthrough to the underlying `Child`.
88	fn stdin(&mut self) -> &mut Option<ChildStdin> {
89		&mut self.inner_mut().stdin
90	}
91
92	/// Obtain the `Child`'s stdout.
93	///
94	/// By default this is a passthrough to the underlying `Child`.
95	fn stdout(&mut self) -> &mut Option<ChildStdout> {
96		&mut self.inner_mut().stdout
97	}
98
99	/// Obtain the `Child`'s stderr.
100	///
101	/// By default this is a passthrough to the underlying `Child`.
102	fn stderr(&mut self) -> &mut Option<ChildStderr> {
103		&mut self.inner_mut().stderr
104	}
105
106	/// Obtain the `Child`'s process ID.
107	///
108	/// In general this should be the PID of the top-level spawned process that was spawned
109	/// However, that may vary depending on what a wrapper does.
110	///
111	/// Returns an `Option` to resemble Tokio's API, but isn't expected to be `None` in practice.
112	fn id(&self) -> Option<u32> {
113		self.inner().id()
114	}
115
116	/// Kill the `Child` and wait for it to exit.
117	///
118	/// By default this calls `start_kill()` and then `wait()`, which is the same way it is done on
119	/// the underlying `Child`, but that way implementing either or both of those methods will use
120	/// them when calling `kill()`, instead of requiring a stub implementation.
121	fn kill(&mut self) -> Box<dyn Future<Output = Result<()>> + Send + '_> {
122		Box::new(async {
123			self.start_kill()?;
124			Box::into_pin(self.wait()).await?;
125			Ok(())
126		})
127	}
128
129	/// Kill the `Child` without waiting for it to exit.
130	///
131	/// By default this is a passthrough to the underlying `Child`, which:
132	/// - on Unix, sends a `SIGKILL` signal to the process;
133	/// - otherwise, passes through to the `kill()` method.
134	fn start_kill(&mut self) -> Result<()> {
135		self.inner_mut().start_kill()
136	}
137
138	/// Check if the `Child` has exited without waiting, and if it has, return its exit status.
139	///
140	/// Wrappers must ensure that repeatedly calling this (or other wait methods) after the child
141	/// has exited will always return the same result.
142	///
143	/// By default this is a passthrough to the underlying `Child`.
144	fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
145		self.inner_mut().try_wait()
146	}
147
148	/// Wait for the `Child` to exit and return its exit status.
149	///
150	/// Wrappers must ensure that repeatedly calling this (or other wait methods) after the child
151	/// has exited will always return the same result.
152	///
153	/// By default this is a passthrough to the underlying `Child`.
154	fn wait(&mut self) -> Box<dyn Future<Output = Result<ExitStatus>> + Send + '_> {
155		Box::new(self.inner_mut().wait())
156	}
157
158	/// Wait for the `Child` to exit and return its exit status and outputs.
159	///
160	/// Note that this method reads the child's stdout and stderr to completion into memory.
161	///
162	/// By default this is a reimplementation of the Tokio method, so that it can use the wrapper's
163	/// `wait()` method instead of the underlying `Child`'s `wait()`.
164	fn wait_with_output(mut self: Box<Self>) -> Box<dyn Future<Output = Result<Output>> + Send>
165	where
166		Self: 'static,
167	{
168		Box::new(async move {
169			async fn read_to_end<A: AsyncRead + Unpin>(io: &mut Option<A>) -> Result<Vec<u8>> {
170				let mut vec = Vec::new();
171				if let Some(io) = io.as_mut() {
172					io.read_to_end(&mut vec).await?;
173				}
174				Ok(vec)
175			}
176
177			let mut stdout_pipe = self.stdout().take();
178			let mut stderr_pipe = self.stderr().take();
179
180			let stdout_fut = read_to_end(&mut stdout_pipe);
181			let stderr_fut = read_to_end(&mut stderr_pipe);
182
183			let (status, stdout, stderr) =
184				try_join3(Box::into_pin(self.wait()), stdout_fut, stderr_fut).await?;
185
186			// Drop happens after `try_join` due to <https://github.com/tokio-rs/tokio/issues/4309>
187			drop(stdout_pipe);
188			drop(stderr_pipe);
189
190			Ok(Output {
191				status,
192				stdout,
193				stderr,
194			})
195		})
196	}
197
198	/// Send a signal to the `Child`.
199	///
200	/// This method is only available on Unix. It doesn't exist on Tokio's `Child`, nor on std's. It
201	/// was introduced by command-group to abstract over the signal behaviour between process groups
202	/// and unwrapped processes.
203	#[cfg(unix)]
204	fn signal(&self, sig: i32) -> Result<()> {
205		if let Some(id) = self.id() {
206			kill(
207				Pid::from_raw(i32::try_from(id).map_err(std::io::Error::other)?),
208				Signal::try_from(sig)?,
209			)
210			.map_err(std::io::Error::from)
211		} else {
212			Ok(())
213		}
214	}
215}
216}
217
218impl TokioChildWrapper for Child {
219	fn inner(&self) -> &Child {
220		self
221	}
222	fn inner_mut(&mut self) -> &mut Child {
223		self
224	}
225	fn into_inner(self: Box<Self>) -> Child {
226		*self
227	}
228}