process_wrap/tokio/
process_group.rs1use std::{
2 future::Future,
3 io::{Error, Result},
4 ops::ControlFlow,
5 os::unix::process::ExitStatusExt,
6 pin::Pin,
7 process::ExitStatus,
8};
9
10use nix::{
11 errno::Errno,
12 libc,
13 sys::{
14 signal::{killpg, Signal},
15 wait::WaitPidFlag,
16 },
17 unistd::Pid,
18};
19use tokio::{process::Command, task::spawn_blocking};
20#[cfg(feature = "tracing")]
21use tracing::instrument;
22
23use crate::ChildExitStatus;
24
25use super::{ChildWrapper, CommandWrap, CommandWrapper};
26
27#[derive(Clone, Copy, Debug)]
39pub struct ProcessGroup {
40 leader: Pid,
41}
42
43impl ProcessGroup {
44 pub fn leader() -> Self {
46 Self {
47 leader: Pid::from_raw(0),
48 }
49 }
50
51 pub fn attach_to(leader: u32) -> Self {
53 Self {
54 leader: Pid::from_raw(leader as i32),
55 }
56 }
57}
58
59#[derive(Debug)]
61pub struct ProcessGroupChild {
62 inner: Box<dyn ChildWrapper>,
63 exit_status: ChildExitStatus,
64 pgid: Pid,
65}
66
67impl ProcessGroupChild {
68 #[cfg_attr(feature = "tracing", instrument(level = "debug"))]
69 pub(crate) fn new(inner: Box<dyn ChildWrapper>, pgid: Pid) -> Self {
70 Self {
71 inner,
72 exit_status: ChildExitStatus::Running,
73 pgid,
74 }
75 }
76
77 pub fn pgid(&self) -> u32 {
81 self.pgid.as_raw() as _
82 }
83}
84
85impl CommandWrapper for ProcessGroup {
86 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
87 fn pre_spawn(&mut self, command: &mut Command, _core: &CommandWrap) -> Result<()> {
88 command.process_group(self.leader.as_raw());
89 Ok(())
90 }
91
92 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
93 fn wrap_child(
94 &mut self,
95 inner: Box<dyn ChildWrapper>,
96 _core: &CommandWrap,
97 ) -> Result<Box<dyn ChildWrapper>> {
98 let pgid = Pid::from_raw(
99 i32::try_from(
100 inner
101 .id()
102 .expect("Command was reaped before we could read its PID"),
103 )
104 .expect("Command PID > i32::MAX"),
105 );
106
107 Ok(Box::new(ProcessGroupChild::new(inner, pgid)))
108 }
109}
110
111impl ProcessGroupChild {
112 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
113 fn signal_imp(&self, sig: Signal) -> Result<()> {
114 killpg(self.pgid, sig).map_err(Error::from)
115 }
116
117 #[cfg_attr(feature = "tracing", instrument(level = "debug"))]
118 fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
119 let mut parent_exit_status: Option<ExitStatus> = None;
124 loop {
125 let mut status: i32 = 0;
128 match unsafe {
129 libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits())
130 } {
131 0 => {
132 return Ok(ControlFlow::Continue(()));
135 }
136 -1 => {
137 match Errno::last() {
138 Errno::ECHILD => {
139 return Ok(ControlFlow::Break(parent_exit_status));
141 }
142 errno => {
143 return Err(Error::from(errno));
144 }
145 }
146 }
147 pid => {
148 if pgid == Pid::from_raw(pid) {
152 parent_exit_status = Some(ExitStatus::from_raw(status));
153 } else {
154 }
156 }
157 };
158 }
159 }
160}
161
162impl ChildWrapper for ProcessGroupChild {
163 fn inner(&self) -> &dyn ChildWrapper {
164 self.inner.inner()
165 }
166 fn inner_mut(&mut self) -> &mut dyn ChildWrapper {
167 self.inner.inner_mut()
168 }
169 fn into_inner(self: Box<Self>) -> Box<dyn ChildWrapper> {
170 self.inner.into_inner()
171 }
172
173 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
174 fn start_kill(&mut self) -> Result<()> {
175 self.signal_imp(Signal::SIGKILL)
176 }
177
178 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
179 fn wait(&mut self) -> Pin<Box<dyn Future<Output = Result<ExitStatus>> + Send + '_>> {
180 Box::pin(async {
181 if let ChildExitStatus::Exited(status) = &self.exit_status {
182 return Ok(*status);
183 }
184
185 const MAX_RETRY_ATTEMPT: usize = 10;
186 let pgid = self.pgid;
187
188 let status = self.inner.wait().await?;
191 self.exit_status = ChildExitStatus::Exited(status);
192
193 for _ in 1..MAX_RETRY_ATTEMPT {
195 if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() {
196 return Ok(status);
197 }
198 }
199
200 let _ = spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??;
203 Ok(status)
204 })
205 }
206
207 #[cfg_attr(feature = "tracing", instrument(level = "debug", skip(self)))]
208 fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
209 if let ChildExitStatus::Exited(status) = &self.exit_status {
210 return Ok(Some(*status));
211 }
212
213 match Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)? {
214 ControlFlow::Break(res) => {
215 if let Some(status) = res {
216 self.exit_status = ChildExitStatus::Exited(status);
217 }
218 Ok(res)
219 }
220 ControlFlow::Continue(()) => {
221 let exited = self.inner.try_wait()?;
222 if let Some(exited) = exited {
223 self.exit_status = ChildExitStatus::Exited(exited);
224 }
225 Ok(exited)
226 }
227 }
228 }
229
230 fn signal(&self, sig: i32) -> Result<()> {
231 self.signal_imp(Signal::try_from(sig)?)
232 }
233}