command_group/tokio/child/
unix.rs

1use std::{
2	convert::TryInto,
3	io::{Error, Result},
4	ops::ControlFlow,
5	os::unix::process::ExitStatusExt,
6	process::ExitStatus,
7};
8
9use nix::{
10	errno::Errno,
11	libc,
12	sys::{
13		signal::{killpg, Signal},
14		wait::WaitPidFlag,
15	},
16	unistd::Pid,
17};
18use tokio::{
19	process::{Child, ChildStderr, ChildStdin, ChildStdout},
20	task::spawn_blocking,
21};
22
23pub(super) struct ChildImp {
24	pgid: Pid,
25	inner: Child,
26}
27
28impl ChildImp {
29	pub(super) fn new(inner: Child) -> Self {
30		let pid = inner
31			.id()
32			.expect("Command was reaped before we could read its PID")
33			.try_into()
34			.expect("Command PID > i32::MAX");
35		Self {
36			pgid: Pid::from_raw(pid),
37			inner,
38		}
39	}
40
41	pub(super) fn take_stdin(&mut self) -> Option<ChildStdin> {
42		self.inner.stdin.take()
43	}
44
45	pub(super) fn take_stdout(&mut self) -> Option<ChildStdout> {
46		self.inner.stdout.take()
47	}
48
49	pub(super) fn take_stderr(&mut self) -> Option<ChildStderr> {
50		self.inner.stderr.take()
51	}
52
53	pub fn inner(&mut self) -> &mut Child {
54		&mut self.inner
55	}
56
57	pub fn into_inner(self) -> Child {
58		self.inner
59	}
60
61	pub(super) fn signal_imp(&self, sig: Signal) -> Result<()> {
62		killpg(self.pgid, sig).map_err(Error::from)
63	}
64
65	pub fn start_kill(&mut self) -> Result<()> {
66		self.signal_imp(Signal::SIGKILL)
67	}
68
69	pub fn id(&self) -> Option<u32> {
70		self.inner.id()
71	}
72
73	fn wait_imp(pgid: i32, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
74		// Wait for processes in a loop until every process in this
75		// process group has exited (this ensures that we reap any
76		// zombies that may have been created if the parent exited after
77		// spawning children, but didn't wait for those children to
78		// exit).
79		let mut parent_exit_status: Option<ExitStatus> = None;
80		loop {
81			// we can't use the safe wrapper directly because it doesn't
82			// return the raw status, and we need it to convert to the
83			// std's ExitStatus.
84			let mut status: i32 = 0;
85			match unsafe { libc::waitpid(-pgid, &mut status as *mut libc::c_int, flag.bits()) } {
86				0 => {
87					// Zero should only happen if WNOHANG was passed in,
88					// and means that no processes have yet to exit.
89					return Ok(ControlFlow::Continue(()));
90				}
91				-1 => {
92					match Errno::last() {
93						Errno::ECHILD => {
94							// No more children to reap; this is a
95							// graceful exit.
96							return Ok(ControlFlow::Break(parent_exit_status));
97						}
98						errno => {
99							return Err(Error::from(errno));
100						}
101					}
102				}
103				pid => {
104					// *A* process exited. Was it the parent process
105					// that we started? If so, collect the exit signal,
106					// otherwise we reaped a zombie process and should
107					// continue in the loop.
108					if pgid == pid {
109						parent_exit_status = Some(ExitStatus::from_raw(status));
110					} else {
111						// Reaped a zombie child; keep looping.
112					}
113				}
114			};
115		}
116	}
117
118	pub async fn wait(&mut self) -> Result<ExitStatus> {
119		const MAX_RETRY_ATTEMPT: usize = 10;
120
121		// Always wait for parent to exit first.
122		//
123		// It's likely that all its children has already exited and reaped by
124		// the time the parent exits.
125		let status = self.inner.wait().await?;
126
127		let pgid = self.pgid.as_raw();
128
129		// Try reaping all children, if there are some that are still alive after
130		// several attempts, then spawn a blocking task to reap them.
131		for retry_attempt in 1..=MAX_RETRY_ATTEMPT {
132			if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() {
133				break;
134			} else if retry_attempt == MAX_RETRY_ATTEMPT {
135				spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??;
136			}
137		}
138
139		Ok(status)
140	}
141
142	pub fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
143		match Self::wait_imp(self.pgid.as_raw(), WaitPidFlag::WNOHANG)? {
144			ControlFlow::Break(res) => Ok(res),
145			ControlFlow::Continue(()) => self.inner.try_wait(),
146		}
147	}
148}
149
150impl crate::UnixChildExt for ChildImp {
151	fn signal(&self, sig: Signal) -> Result<()> {
152		self.signal_imp(sig)
153	}
154}