process_wrap/tokio/
process_group.rs1use std::{
2 future::Future,
3 io::{Error, Result},
4 ops::ControlFlow,
5 os::unix::process::ExitStatusExt,
6 process::ExitStatus,
7};
8
9use nix::{
10 errno::Errno,
11 libc,
12 sys::{
13 signal::{killpg, Signal},
14 wait::WaitPidFlag,
15 },
16 unistd::Pid,
17};
18use tokio::{
19 process::{Child, Command},
20 task::spawn_blocking,
21};
22#[cfg(feature = "tracing")]
23use tracing::instrument;
24
25use crate::ChildExitStatus;
26
27use super::{TokioChildWrapper, TokioCommandWrap, TokioCommandWrapper};
28
/// Command wrapper that puts the spawned command into a process group.
///
/// The stored ID is handed to `Command::process_group` right before the
/// command is spawned. A raw value of `0` is what [`ProcessGroup::leader`]
/// stores; any other value comes from [`ProcessGroup::attach_to`].
#[derive(Clone, Copy, Debug)]
pub struct ProcessGroup {
    /// Process group ID passed to the command at spawn time.
    leader: Pid,
}
44
45impl ProcessGroup {
46 pub fn leader() -> Self {
48 Self {
49 leader: Pid::from_raw(0),
50 }
51 }
52
53 pub fn attach_to(leader: u32) -> Self {
55 Self {
56 leader: Pid::from_raw(leader as i32),
57 }
58 }
59}
60
/// Child wrapper produced by [`ProcessGroup`].
///
/// It remembers a process group ID so that signals are delivered with
/// `killpg` and waits can reap children by group (`waitpid(-pgid, ..)`).
#[derive(Debug)]
pub struct ProcessGroupChild {
    /// The wrapped child this layer delegates to.
    inner: Box<dyn TokioChildWrapper>,
    /// Cached exit state: starts as `Running` and is switched to `Exited`
    /// once a wait observes the exit, so later waits can answer immediately.
    exit_status: ChildExitStatus,
    /// Process group ID used when signalling and when reaping.
    pgid: Pid,
}
68
69impl ProcessGroupChild {
70 #[cfg_attr(feature = "tracing", instrument(level = "debug"))]
71 pub(crate) fn new(inner: Box<dyn TokioChildWrapper>, pgid: Pid) -> Self {
72 Self {
73 inner,
74 exit_status: ChildExitStatus::Running,
75 pgid,
76 }
77 }
78
79 pub fn pgid(&self) -> u32 {
83 self.pgid.as_raw() as _
84 }
85}
86
87impl TokioCommandWrapper for ProcessGroup {
88 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
89 fn pre_spawn(&mut self, command: &mut Command, _core: &TokioCommandWrap) -> Result<()> {
90 command.process_group(self.leader.as_raw());
91 Ok(())
92 }
93
94 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
95 fn wrap_child(
96 &mut self,
97 inner: Box<dyn TokioChildWrapper>,
98 _core: &TokioCommandWrap,
99 ) -> Result<Box<dyn TokioChildWrapper>> {
100 let pgid = Pid::from_raw(
101 i32::try_from(
102 inner
103 .id()
104 .expect("Command was reaped before we could read its PID"),
105 )
106 .expect("Command PID > i32::MAX"),
107 );
108
109 Ok(Box::new(ProcessGroupChild::new(inner, pgid)))
110 }
111}
112
113impl ProcessGroupChild {
114 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
115 fn signal_imp(&self, sig: Signal) -> Result<()> {
116 killpg(self.pgid, sig).map_err(Error::from)
117 }
118
119 #[cfg_attr(feature = "tracing", instrument(level = "debug"))]
120 fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
121 let mut parent_exit_status: Option<ExitStatus> = None;
126 loop {
127 let mut status: i32 = 0;
130 match unsafe {
131 libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits())
132 } {
133 0 => {
134 return Ok(ControlFlow::Continue(()));
137 }
138 -1 => {
139 match Errno::last() {
140 Errno::ECHILD => {
141 return Ok(ControlFlow::Break(parent_exit_status));
143 }
144 errno => {
145 return Err(Error::from(errno));
146 }
147 }
148 }
149 pid => {
150 if pgid == Pid::from_raw(pid) {
154 parent_exit_status = Some(ExitStatus::from_raw(status));
155 } else {
156 }
158 }
159 };
160 }
161 }
162}
163
164impl TokioChildWrapper for ProcessGroupChild {
165 fn inner(&self) -> &Child {
166 self.inner.inner()
167 }
168 fn inner_mut(&mut self) -> &mut Child {
169 self.inner.inner_mut()
170 }
171 fn into_inner(self: Box<Self>) -> Child {
172 self.inner.into_inner()
173 }
174
175 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
176 fn start_kill(&mut self) -> Result<()> {
177 self.signal_imp(Signal::SIGKILL)
178 }
179
180 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
181 fn wait(&mut self) -> Box<dyn Future<Output = Result<ExitStatus>> + Send + '_> {
182 Box::new(async {
183 if let ChildExitStatus::Exited(status) = &self.exit_status {
184 return Ok(*status);
185 }
186
187 const MAX_RETRY_ATTEMPT: usize = 10;
188 let pgid = self.pgid;
189
190 let status = Box::into_pin(self.inner.wait()).await?;
193 self.exit_status = ChildExitStatus::Exited(status);
194
195 for _ in 1..MAX_RETRY_ATTEMPT {
197 if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() {
198 return Ok(status);
199 }
200 }
201
202 spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??;
205 Ok(status)
206 })
207 }
208
209 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
210 fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
211 if let ChildExitStatus::Exited(status) = &self.exit_status {
212 return Ok(Some(*status));
213 }
214
215 match Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)? {
216 ControlFlow::Break(res) => {
217 if let Some(status) = res {
218 self.exit_status = ChildExitStatus::Exited(status);
219 }
220 Ok(res)
221 }
222 ControlFlow::Continue(()) => {
223 let exited = self.inner.try_wait()?;
224 if let Some(exited) = exited {
225 self.exit_status = ChildExitStatus::Exited(exited);
226 }
227 Ok(exited)
228 }
229 }
230 }
231
232 fn signal(&self, sig: i32) -> Result<()> {
233 self.signal_imp(Signal::try_from(sig)?)
234 }
235}