process_wrap/tokio/
core.rs

1use std::{
2	any::Any,
3	future::Future,
4	io::Result,
5	pin::Pin,
6	process::{ExitStatus, Output},
7};
8
9use futures::future::try_join3;
10#[cfg(unix)]
11use nix::{
12	sys::signal::{kill, Signal},
13	unistd::Pid,
14};
15use tokio::{
16	io::{AsyncRead, AsyncReadExt},
17	process::{Child, ChildStderr, ChildStdin, ChildStdout, Command},
18};
19
// Invokes the shared `Wrap!` macro from `crate::generic_wrap` for the Tokio flavour, handing it
// Tokio's `Command` and `Child` types, this module's `ChildWrapper` trait, and an identity
// closure over the child.
// NOTE(review): the macro body is not visible from this file; presumably it generates the
// command-wrapping types for this backend — confirm against `generic_wrap`.
crate::generic_wrap::Wrap!(Command, Child, ChildWrapper, |child| child);
21
22/// Wrapper for `tokio::process::Child`.
23///
24/// This trait exposes most of the functionality of the underlying [`Child`]. It is implemented for
25/// [`Child`] and by wrappers.
26///
27/// The required methods are `inner`, `inner_mut`, and `into_inner`. That provides access to the
28/// underlying `Child` and allows the wrapper to be dropped and the `Child` to be used directly if
29/// necessary.
30///
31/// It also makes it possible for all the other methods to have default implementations. Some are
32/// direct passthroughs to the underlying `Child`, while others are more complex.
33///
34/// Here's a simple example of a wrapper:
35///
36/// ```rust
37/// use process_wrap::tokio::*;
38/// use tokio::process::Child;
39///
40/// #[derive(Debug)]
41/// pub struct YourChildWrapper(Child);
42///
43/// impl ChildWrapper for YourChildWrapper {
44///     fn inner(&self) -> &dyn ChildWrapper {
45///         &self.0
46///     }
47///
48///     fn inner_mut(&mut self) -> &mut dyn ChildWrapper {
49///         &mut self.0
50///     }
51///
52///     fn into_inner(self: Box<Self>) -> Box<dyn ChildWrapper> {
53///         Box::new((*self).0)
54///     }
55/// }
56/// ```
57pub trait ChildWrapper: Any + std::fmt::Debug + Send {
58	/// Obtain a reference to the wrapped child.
59	fn inner(&self) -> &dyn ChildWrapper;
60
61	/// Obtain a mutable reference to the wrapped child.
62	fn inner_mut(&mut self) -> &mut dyn ChildWrapper;
63
64	/// Consume the current wrapper and return the wrapped child.
65	///
66	/// Note that this may disrupt whatever the current wrapper was doing. However, wrappers must
67	/// ensure that the wrapped child is in a consistent state when this is called or they are
68	/// dropped, so that this is always safe.
69	fn into_inner(self: Box<Self>) -> Box<dyn ChildWrapper>;
70
71	/// Obtain a clone if possible.
72	///
73	/// Some implementations may make it possible to clone the implementing structure, even though
74	/// Tokio's `Child` isn't `Clone`. In those cases, this method should be overridden.
75	fn try_clone(&self) -> Option<Box<dyn ChildWrapper>> {
76		None
77	}
78
79	/// Obtain the `Child`'s stdin.
80	///
81	/// By default this is a passthrough to the wrapped child.
82	fn stdin(&mut self) -> &mut Option<ChildStdin> {
83		self.inner_mut().stdin()
84	}
85
86	/// Obtain the `Child`'s stdout.
87	///
88	/// By default this is a passthrough to the wrapped child.
89	fn stdout(&mut self) -> &mut Option<ChildStdout> {
90		self.inner_mut().stdout()
91	}
92
93	/// Obtain the `Child`'s stderr.
94	///
95	/// By default this is a passthrough to the wrapped child.
96	fn stderr(&mut self) -> &mut Option<ChildStderr> {
97		self.inner_mut().stderr()
98	}
99
100	/// Obtain the `Child`'s process ID.
101	///
102	/// In general this should be the PID of the top-level spawned process that was spawned
103	/// However, that may vary depending on what a wrapper does.
104	///
105	/// Returns an `Option` to resemble Tokio's API, but isn't expected to be `None` in practice.
106	fn id(&self) -> Option<u32> {
107		self.inner().id()
108	}
109
110	/// Kill the `Child` and wait for it to exit.
111	///
112	/// By default this calls `start_kill()` and then `wait()`, which is the same way it is done on
113	/// the underlying `Child`, but that way implementing either or both of those methods will use
114	/// them when calling `kill()`, instead of requiring a stub implementation.
115	fn kill(&mut self) -> Box<dyn Future<Output = Result<()>> + Send + '_> {
116		Box::new(async {
117			self.start_kill()?;
118			self.wait().await?;
119			Ok(())
120		})
121	}
122
123	/// Kill the `Child` without waiting for it to exit.
124	///
125	/// By default this is a passthrough to the underlying `Child`, which:
126	/// - on Unix, sends a `SIGKILL` signal to the process;
127	/// - otherwise, passes through to the `kill()` method.
128	fn start_kill(&mut self) -> Result<()> {
129		self.inner_mut().start_kill()
130	}
131
132	/// Check if the `Child` has exited without waiting, and if it has, return its exit status.
133	///
134	/// Wrappers must ensure that repeatedly calling this (or other wait methods) after the child
135	/// has exited will always return the same result.
136	///
137	/// By default this is a passthrough to the underlying `Child`.
138	fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
139		self.inner_mut().try_wait()
140	}
141
142	/// Wait for the `Child` to exit and return its exit status.
143	///
144	/// Wrappers must ensure that repeatedly calling this (or other wait methods) after the child
145	/// has exited will always return the same result.
146	///
147	/// By default this is a passthrough to the underlying `Child`.
148	fn wait(&mut self) -> Pin<Box<dyn Future<Output = Result<ExitStatus>> + Send + '_>> {
149		Box::pin(self.inner_mut().wait())
150	}
151
152	/// Wait for the `Child` to exit and return its exit status and outputs.
153	///
154	/// Note that this method reads the child's stdout and stderr to completion into memory.
155	///
156	/// By default this is a reimplementation of the Tokio method, so that it can use the wrapper's
157	/// `wait()` method instead of the underlying `Child`'s `wait()`.
158	fn wait_with_output(mut self: Box<Self>) -> Box<dyn Future<Output = Result<Output>> + Send>
159	where
160		Self: 'static,
161	{
162		Box::new(async move {
163			async fn read_to_end<A: AsyncRead + Unpin>(io: &mut Option<A>) -> Result<Vec<u8>> {
164				let mut vec = Vec::new();
165				if let Some(io) = io.as_mut() {
166					io.read_to_end(&mut vec).await?;
167				}
168				Ok(vec)
169			}
170
171			let mut stdout_pipe = self.stdout().take();
172			let mut stderr_pipe = self.stderr().take();
173
174			let stdout_fut = read_to_end(&mut stdout_pipe);
175			let stderr_fut = read_to_end(&mut stderr_pipe);
176
177			let (status, stdout, stderr) = try_join3(self.wait(), stdout_fut, stderr_fut).await?;
178
179			// Drop happens after `try_join` due to <https://github.com/tokio-rs/tokio/issues/4309>
180			drop(stdout_pipe);
181			drop(stderr_pipe);
182
183			Ok(Output {
184				status,
185				stdout,
186				stderr,
187			})
188		})
189	}
190
191	/// Send a signal to the `Child`.
192	///
193	/// This method is only available on Unix. It doesn't exist on Tokio's `Child`, nor on std's. It
194	/// was introduced by command-group to abstract over the signal behaviour between process groups
195	/// and unwrapped processes.
196	#[cfg(unix)]
197	fn signal(&self, sig: i32) -> Result<()> {
198		self.inner().signal(sig)
199	}
200}
201
202impl ChildWrapper for Child {
203	fn inner(&self) -> &dyn ChildWrapper {
204		self
205	}
206	fn inner_mut(&mut self) -> &mut dyn ChildWrapper {
207		self
208	}
209	fn into_inner(self: Box<Self>) -> Box<dyn ChildWrapper> {
210		Box::new(*self)
211	}
212	fn stdin(&mut self) -> &mut Option<ChildStdin> {
213		&mut self.stdin
214	}
215	fn stdout(&mut self) -> &mut Option<ChildStdout> {
216		&mut self.stdout
217	}
218	fn stderr(&mut self) -> &mut Option<ChildStderr> {
219		&mut self.stderr
220	}
221	fn id(&self) -> Option<u32> {
222		Child::id(self)
223	}
224	fn start_kill(&mut self) -> Result<()> {
225		Child::start_kill(self)
226	}
227	fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
228		Child::try_wait(self)
229	}
230	fn wait(&mut self) -> Pin<Box<dyn Future<Output = Result<ExitStatus>> + Send + '_>> {
231		Box::pin(Child::wait(self))
232	}
233	#[cfg(unix)]
234	fn signal(&self, sig: i32) -> Result<()> {
235		if let Some(id) = self.id() {
236			kill(
237				Pid::from_raw(i32::try_from(id).map_err(std::io::Error::other)?),
238				Signal::try_from(sig)?,
239			)
240			.map_err(std::io::Error::from)
241		} else {
242			Ok(())
243		}
244	}
245}
246
247impl dyn ChildWrapper {
248	fn downcast_ref<T: 'static>(&self) -> Option<&T> {
249		(self as &dyn Any).downcast_ref()
250	}
251
252	fn is_raw_child(&self) -> bool {
253		self.downcast_ref::<Child>().is_some()
254	}
255
256	/// Obtain a reference to the underlying [`Child`].
257	pub fn inner_child(&self) -> &Child {
258		let mut inner = self;
259		while !inner.is_raw_child() {
260			inner = inner.inner();
261		}
262
263		// UNWRAP: we've just checked that it's Some with is_raw_child()
264		inner.downcast_ref().unwrap()
265	}
266
267	/// Obtain a mutable reference to the underlying [`Child`].
268	///
269	/// Modifying the raw child may be unsound depending on the layering of wrappers.
270	pub unsafe fn inner_child_mut(&mut self) -> &mut Child {
271		let mut inner = self;
272		while !inner.is_raw_child() {
273			inner = inner.inner_mut();
274		}
275
276		// UNWRAP: we've just checked that with is_raw_child()
277		(inner as &mut dyn Any).downcast_mut().unwrap()
278	}
279
280	/// Obtain the underlying [`Child`].
281	///
282	/// Unwrapping everything may be unsound depending on the state of the wrappers.
283	pub unsafe fn into_inner_child(self: Box<Self>) -> Child {
284		let mut inner = self;
285		while !inner.is_raw_child() {
286			inner = inner.into_inner();
287		}
288
289		// UNWRAP: we've just checked that with is_raw_child()
290		*(inner as Box<dyn Any>).downcast().unwrap()
291	}
292}