command_group/tokio/child/
unix.rs1use std::{
2 convert::TryInto,
3 io::{Error, Result},
4 ops::ControlFlow,
5 os::unix::process::ExitStatusExt,
6 process::ExitStatus,
7};
8
9use nix::{
10 errno::Errno,
11 libc,
12 sys::{
13 signal::{killpg, Signal},
14 wait::WaitPidFlag,
15 },
16 unistd::Pid,
17};
18use tokio::{
19 process::{Child, ChildStderr, ChildStdin, ChildStdout},
20 task::spawn_blocking,
21};
22
23pub(super) struct ChildImp {
24 pgid: Pid,
25 inner: Child,
26}
27
28impl ChildImp {
29 pub(super) fn new(inner: Child) -> Self {
30 let pid = inner
31 .id()
32 .expect("Command was reaped before we could read its PID")
33 .try_into()
34 .expect("Command PID > i32::MAX");
35 Self {
36 pgid: Pid::from_raw(pid),
37 inner,
38 }
39 }
40
41 pub(super) fn take_stdin(&mut self) -> Option<ChildStdin> {
42 self.inner.stdin.take()
43 }
44
45 pub(super) fn take_stdout(&mut self) -> Option<ChildStdout> {
46 self.inner.stdout.take()
47 }
48
49 pub(super) fn take_stderr(&mut self) -> Option<ChildStderr> {
50 self.inner.stderr.take()
51 }
52
53 pub fn inner(&mut self) -> &mut Child {
54 &mut self.inner
55 }
56
57 pub fn into_inner(self) -> Child {
58 self.inner
59 }
60
61 pub(super) fn signal_imp(&self, sig: Signal) -> Result<()> {
62 killpg(self.pgid, sig).map_err(Error::from)
63 }
64
65 pub fn start_kill(&mut self) -> Result<()> {
66 self.signal_imp(Signal::SIGKILL)
67 }
68
69 pub fn id(&self) -> Option<u32> {
70 self.inner.id()
71 }
72
73 fn wait_imp(pgid: i32, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
74 let mut parent_exit_status: Option<ExitStatus> = None;
80 loop {
81 let mut status: i32 = 0;
85 match unsafe { libc::waitpid(-pgid, &mut status as *mut libc::c_int, flag.bits()) } {
86 0 => {
87 return Ok(ControlFlow::Continue(()));
90 }
91 -1 => {
92 match Errno::last() {
93 Errno::ECHILD => {
94 return Ok(ControlFlow::Break(parent_exit_status));
97 }
98 errno => {
99 return Err(Error::from(errno));
100 }
101 }
102 }
103 pid => {
104 if pgid == pid {
109 parent_exit_status = Some(ExitStatus::from_raw(status));
110 } else {
111 }
113 }
114 };
115 }
116 }
117
118 pub async fn wait(&mut self) -> Result<ExitStatus> {
119 const MAX_RETRY_ATTEMPT: usize = 10;
120
121 let status = self.inner.wait().await?;
126
127 let pgid = self.pgid.as_raw();
128
129 for retry_attempt in 1..=MAX_RETRY_ATTEMPT {
132 if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() {
133 break;
134 } else if retry_attempt == MAX_RETRY_ATTEMPT {
135 spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??;
136 }
137 }
138
139 Ok(status)
140 }
141
142 pub fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
143 match Self::wait_imp(self.pgid.as_raw(), WaitPidFlag::WNOHANG)? {
144 ControlFlow::Break(res) => Ok(res),
145 ControlFlow::Continue(()) => self.inner.try_wait(),
146 }
147 }
148}
149
150impl crate::UnixChildExt for ChildImp {
151 fn signal(&self, sig: Signal) -> Result<()> {
152 self.signal_imp(sig)
153 }
154}