process_wrap/tokio/process_group.rs

1use std::{
2	future::Future,
3	io::{Error, Result},
4	ops::ControlFlow,
5	os::unix::process::ExitStatusExt,
6	pin::Pin,
7	process::ExitStatus,
8};
9
10use nix::{
11	errno::Errno,
12	libc,
13	sys::{
14		signal::{killpg, Signal},
15		wait::WaitPidFlag,
16	},
17	unistd::Pid,
18};
19use tokio::{process::Command, task::spawn_blocking};
20#[cfg(feature = "tracing")]
21use tracing::instrument;
22
23use crate::ChildExitStatus;
24
25use super::{ChildWrapper, CommandWrap, CommandWrapper};
26
27/// Wrapper which sets the process group of a `Command`.
28///
29/// This wrapper is only available on Unix.
30///
31/// It sets the process group of a [`Command`], either to itself as the leader of a new group, or to
32/// an existing one by its PGID. See [setpgid(2)](https://pubs.opengroup.org/onlinepubs/9699919799/functions/setpgid.html).
33///
34/// Process groups direct signals to all members of the group, and also serve to control job
35/// placement in foreground or background, among other actions.
36///
37/// This wrapper provides a child wrapper: [`ProcessGroupChild`].
38#[derive(Clone, Copy, Debug)]
39pub struct ProcessGroup {
40	leader: Pid,
41}
42
43impl ProcessGroup {
44	/// Create a process group wrapper setting up a new process group with the command as the leader.
45	pub fn leader() -> Self {
46		Self {
47			leader: Pid::from_raw(0),
48		}
49	}
50
51	/// Create a process group wrapper attaching the command to an existing process group ID.
52	pub fn attach_to(leader: u32) -> Self {
53		Self {
54			leader: Pid::from_raw(leader as i32),
55		}
56	}
57}
58
59/// Wrapper for `Child` which ensures that all processes in the group are reaped.
60#[derive(Debug)]
61pub struct ProcessGroupChild {
62	inner: Box<dyn ChildWrapper>,
63	exit_status: ChildExitStatus,
64	pgid: Pid,
65}
66
67impl ProcessGroupChild {
68	#[cfg_attr(feature = "tracing", instrument(level = "debug"))]
69	pub(crate) fn new(inner: Box<dyn ChildWrapper>, pgid: Pid) -> Self {
70		Self {
71			inner,
72			exit_status: ChildExitStatus::Running,
73			pgid,
74		}
75	}
76
77	/// Get the process group ID of this child process.
78	///
79	/// See: [`man 'setpgid(2)'`](https://www.man7.org/linux/man-pages/man2/setpgid.2.html)
80	pub fn pgid(&self) -> u32 {
81		self.pgid.as_raw() as _
82	}
83}
84
85impl CommandWrapper for ProcessGroup {
86	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
87	fn pre_spawn(&mut self, command: &mut Command, _core: &CommandWrap) -> Result<()> {
88		command.process_group(self.leader.as_raw());
89		Ok(())
90	}
91
92	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
93	fn wrap_child(
94		&mut self,
95		inner: Box<dyn ChildWrapper>,
96		_core: &CommandWrap,
97	) -> Result<Box<dyn ChildWrapper>> {
98		let pgid = Pid::from_raw(
99			i32::try_from(
100				inner
101					.id()
102					.expect("Command was reaped before we could read its PID"),
103			)
104			.expect("Command PID > i32::MAX"),
105		);
106
107		Ok(Box::new(ProcessGroupChild::new(inner, pgid)))
108	}
109}
110
111impl ProcessGroupChild {
112	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
113	fn signal_imp(&self, sig: Signal) -> Result<()> {
114		killpg(self.pgid, sig).map_err(Error::from)
115	}
116
117	#[cfg_attr(feature = "tracing", instrument(level = "debug"))]
118	fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
119		// wait for processes in a loop until every process in this group has
120		// exited (this ensures that we reap any zombies that may have been
121		// created if the parent exited after spawning children, but didn't wait
122		// for those children to exit)
123		let mut parent_exit_status: Option<ExitStatus> = None;
124		loop {
125			// we can't use the safe wrapper directly because it doesn't return
126			// the raw status, and we need it to convert to the std's ExitStatus
127			let mut status: i32 = 0;
128			match unsafe {
129				libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits())
130			} {
131				0 => {
132					// zero should only happen if WNOHANG was passed in,
133					// and means that no processes have yet to exit
134					return Ok(ControlFlow::Continue(()));
135				}
136				-1 => {
137					match Errno::last() {
138						Errno::ECHILD => {
139							// no more children to reap; this is a graceful exit
140							return Ok(ControlFlow::Break(parent_exit_status));
141						}
142						errno => {
143							return Err(Error::from(errno));
144						}
145					}
146				}
147				pid => {
148					// a process exited. was it the parent process that we
149					// started? if so, collect the exit signal, otherwise we
150					// reaped a zombie process and should continue looping
151					if pgid == Pid::from_raw(pid) {
152						parent_exit_status = Some(ExitStatus::from_raw(status));
153					} else {
154						// reaped a zombie child; keep looping
155					}
156				}
157			};
158		}
159	}
160}
161
162impl ChildWrapper for ProcessGroupChild {
163	fn inner(&self) -> &dyn ChildWrapper {
164		self.inner.inner()
165	}
166	fn inner_mut(&mut self) -> &mut dyn ChildWrapper {
167		self.inner.inner_mut()
168	}
169	fn into_inner(self: Box<Self>) -> Box<dyn ChildWrapper> {
170		self.inner.into_inner()
171	}
172
173	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
174	fn start_kill(&mut self) -> Result<()> {
175		self.signal_imp(Signal::SIGKILL)
176	}
177
178	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
179	fn wait(&mut self) -> Pin<Box<dyn Future<Output = Result<ExitStatus>> + Send + '_>> {
180		Box::pin(async {
181			if let ChildExitStatus::Exited(status) = &self.exit_status {
182				return Ok(*status);
183			}
184
185			const MAX_RETRY_ATTEMPT: usize = 10;
186			let pgid = self.pgid;
187
188			// always wait for parent to exit first, as by the time it does,
189			// it's likely that all its children have already been reaped.
190			let status = self.inner.wait().await?;
191			self.exit_status = ChildExitStatus::Exited(status);
192
193			// nevertheless, now try reaping all children a few times...
194			for _ in 1..MAX_RETRY_ATTEMPT {
195				if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() {
196					return Ok(status);
197				}
198			}
199
200			// ...finally, if there are some that are still alive,
201			// block in the background to reap them fully.
202			let _ = spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??;
203			Ok(status)
204		})
205	}
206
207	#[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
208	fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
209		if let ChildExitStatus::Exited(status) = &self.exit_status {
210			return Ok(Some(*status));
211		}
212
213		match Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)? {
214			ControlFlow::Break(res) => {
215				if let Some(status) = res {
216					self.exit_status = ChildExitStatus::Exited(status);
217				}
218				Ok(res)
219			}
220			ControlFlow::Continue(()) => {
221				let exited = self.inner.try_wait()?;
222				if let Some(exited) = exited {
223					self.exit_status = ChildExitStatus::Exited(exited);
224				}
225				Ok(exited)
226			}
227		}
228	}
229
230	fn signal(&self, sig: i32) -> Result<()> {
231		self.signal_imp(Signal::try_from(sig)?)
232	}
233}