// process_wrap/tokio/process_group.rs

1use std::{
2	future::Future,
3	io::{Error, Result},
4	ops::ControlFlow,
5	os::unix::process::ExitStatusExt,
6	process::ExitStatus,
7};
8
9use nix::{
10	errno::Errno,
11	libc,
12	sys::{
13		signal::{killpg, Signal},
14		wait::WaitPidFlag,
15	},
16	unistd::Pid,
17};
18use tokio::{
19	process::{Child, Command},
20	task::spawn_blocking,
21};
22#[cfg(feature = "tracing")]
23use tracing::instrument;
24
25use crate::ChildExitStatus;
26
27use super::{TokioChildWrapper, TokioCommandWrap, TokioCommandWrapper};
28
29/// Wrapper which sets the process group of a `Command`.
30///
31/// This wrapper is only available on Unix.
32///
33/// It sets the process group of a [`Command`], either to itself as the leader of a new group, or to
34/// an existing one by its PGID. See [setpgid(2)](https://pubs.opengroup.org/onlinepubs/9699919799/functions/setpgid.html).
35///
36/// Process groups direct signals to all members of the group, and also serve to control job
37/// placement in foreground or background, among other actions.
38///
39/// This wrapper provides a child wrapper: [`ProcessGroupChild`].
40#[derive(Clone, Copy, Debug)]
41pub struct ProcessGroup {
42	leader: Pid,
43}
44
45impl ProcessGroup {
46	/// Create a process group wrapper setting up a new process group with the command as the leader.
47	pub fn leader() -> Self {
48		Self {
49			leader: Pid::from_raw(0),
50		}
51	}
52
53	/// Create a process group wrapper attaching the command to an existing process group ID.
54	pub fn attach_to(leader: u32) -> Self {
55		Self {
56			leader: Pid::from_raw(leader as i32),
57		}
58	}
59}
60
61/// Wrapper for `Child` which ensures that all processes in the group are reaped.
62#[derive(Debug)]
63pub struct ProcessGroupChild {
64	inner: Box<dyn TokioChildWrapper>,
65	exit_status: ChildExitStatus,
66	pgid: Pid,
67}
68
69impl ProcessGroupChild {
70	#[cfg_attr(feature = "tracing", instrument(level = "debug"))]
71	pub(crate) fn new(inner: Box<dyn TokioChildWrapper>, pgid: Pid) -> Self {
72		Self {
73			inner,
74			exit_status: ChildExitStatus::Running,
75			pgid,
76		}
77	}
78
79	/// Get the process group ID of this child process.
80	///
81	/// See: [`man 'setpgid(2)'`](https://www.man7.org/linux/man-pages/man2/setpgid.2.html)
82	pub fn pgid(&self) -> u32 {
83		self.pgid.as_raw() as _
84	}
85}
86
87impl TokioCommandWrapper for ProcessGroup {
88	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
89	fn pre_spawn(&mut self, command: &mut Command, _core: &TokioCommandWrap) -> Result<()> {
90		command.process_group(self.leader.as_raw());
91		Ok(())
92	}
93
94	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
95	fn wrap_child(
96		&mut self,
97		inner: Box<dyn TokioChildWrapper>,
98		_core: &TokioCommandWrap,
99	) -> Result<Box<dyn TokioChildWrapper>> {
100		let pgid = Pid::from_raw(
101			i32::try_from(
102				inner
103					.id()
104					.expect("Command was reaped before we could read its PID"),
105			)
106			.expect("Command PID > i32::MAX"),
107		);
108
109		Ok(Box::new(ProcessGroupChild::new(inner, pgid)))
110	}
111}
112
113impl ProcessGroupChild {
114	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
115	fn signal_imp(&self, sig: Signal) -> Result<()> {
116		killpg(self.pgid, sig).map_err(Error::from)
117	}
118
119	#[cfg_attr(feature = "tracing", instrument(level = "debug"))]
120	fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
121		// wait for processes in a loop until every process in this group has
122		// exited (this ensures that we reap any zombies that may have been
123		// created if the parent exited after spawning children, but didn't wait
124		// for those children to exit)
125		let mut parent_exit_status: Option<ExitStatus> = None;
126		loop {
127			// we can't use the safe wrapper directly because it doesn't return
128			// the raw status, and we need it to convert to the std's ExitStatus
129			let mut status: i32 = 0;
130			match unsafe {
131				libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits())
132			} {
133				0 => {
134					// zero should only happen if WNOHANG was passed in,
135					// and means that no processes have yet to exit
136					return Ok(ControlFlow::Continue(()));
137				}
138				-1 => {
139					match Errno::last() {
140						Errno::ECHILD => {
141							// no more children to reap; this is a graceful exit
142							return Ok(ControlFlow::Break(parent_exit_status));
143						}
144						errno => {
145							return Err(Error::from(errno));
146						}
147					}
148				}
149				pid => {
150					// a process exited. was it the parent process that we
151					// started? if so, collect the exit signal, otherwise we
152					// reaped a zombie process and should continue looping
153					if pgid == Pid::from_raw(pid) {
154						parent_exit_status = Some(ExitStatus::from_raw(status));
155					} else {
156						// reaped a zombie child; keep looping
157					}
158				}
159			};
160		}
161	}
162}
163
164impl TokioChildWrapper for ProcessGroupChild {
165	fn inner(&self) -> &Child {
166		self.inner.inner()
167	}
168	fn inner_mut(&mut self) -> &mut Child {
169		self.inner.inner_mut()
170	}
171	fn into_inner(self: Box<Self>) -> Child {
172		self.inner.into_inner()
173	}
174
175	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
176	fn start_kill(&mut self) -> Result<()> {
177		self.signal_imp(Signal::SIGKILL)
178	}
179
180	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
181	fn wait(&mut self) -> Box<dyn Future<Output = Result<ExitStatus>> + Send + '_> {
182		Box::new(async {
183			if let ChildExitStatus::Exited(status) = &self.exit_status {
184				return Ok(*status);
185			}
186
187			const MAX_RETRY_ATTEMPT: usize = 10;
188			let pgid = self.pgid;
189
190			// always wait for parent to exit first, as by the time it does,
191			// it's likely that all its children have already been reaped.
192			let status = Box::into_pin(self.inner.wait()).await?;
193			self.exit_status = ChildExitStatus::Exited(status);
194
195			// nevertheless, now try reaping all children a few times...
196			for _ in 1..MAX_RETRY_ATTEMPT {
197				if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() {
198					return Ok(status);
199				}
200			}
201
202			// ...finally, if there are some that are still alive,
203			// block in the background to reap them fully.
204			spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??;
205			Ok(status)
206		})
207	}
208
209	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
210	fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
211		if let ChildExitStatus::Exited(status) = &self.exit_status {
212			return Ok(Some(*status));
213		}
214
215		match Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)? {
216			ControlFlow::Break(res) => {
217				if let Some(status) = res {
218					self.exit_status = ChildExitStatus::Exited(status);
219				}
220				Ok(res)
221			}
222			ControlFlow::Continue(()) => {
223				let exited = self.inner.try_wait()?;
224				if let Some(exited) = exited {
225					self.exit_status = ChildExitStatus::Exited(exited);
226				}
227				Ok(exited)
228			}
229		}
230	}
231
232	fn signal(&self, sig: i32) -> Result<()> {
233		self.signal_imp(Signal::try_from(sig)?)
234	}
235}