1use anyhow::{anyhow, Result};
2use derive_more::Display;
3use futures::channel::oneshot::channel;
4use shared_child::SharedChild;
5use std::process::Command;
6use std::sync::Arc;
7use std::thread;
8use std::time::Duration;
9
10#[cfg(feature = "lock")]
11pub mod lock;
12
13#[cfg(unix)]
14use {
15 futures::future::{AbortHandle, Abortable},
16 shared_child::unix::SharedChildExt,
17};
18
19pub trait ProcessGroupExt<T> {
20 fn new_process_group(&mut self) -> &mut T;
21}
22
23impl ProcessGroupExt<Command> for Command {
24 #[cfg(unix)]
25 fn new_process_group(&mut self) -> &mut Command {
26 use std::io;
29 use std::os::unix::process::CommandExt;
30
31 unsafe {
32 self.pre_exec(|| {
33 nix::unistd::setsid().map_err(|e| io::Error::from(e))?;
34 Ok(())
35 });
36 }
37 self
38 }
39
40 #[cfg(not(unix))]
41 fn new_process_group(&mut self) -> &mut Command {
42 self
43 }
44}
45
46impl ProcessGroupExt<tokio::process::Command> for tokio::process::Command {
47 #[cfg(unix)]
48 fn new_process_group(&mut self) -> &mut tokio::process::Command {
49 use std::io;
50
51 unsafe {
52 self.pre_exec(|| {
53 nix::unistd::setsid().map_err(|e| io::Error::from(e))?;
54 Ok(())
55 });
56 }
57 self
58 }
59
60 #[cfg(not(unix))]
61 fn new_process_group(&mut self) -> &mut tokio::process::Command {
62 self
63 }
64}
65
66#[derive(Display)]
67pub enum ExeUnitExitStatus {
68 #[display(fmt = "Aborted - {}", _0)]
69 Aborted(std::process::ExitStatus),
70 #[display(fmt = "Finished - {}", _0)]
71 Finished(std::process::ExitStatus),
72 #[display(fmt = "Error - {}", _0)]
73 Error(std::io::Error),
74}
75
76#[derive(Clone)]
77pub struct ProcessHandle {
78 process: Arc<SharedChild>,
79}
80
81impl ProcessHandle {
82 pub fn new(mut command: &mut Command) -> Result<ProcessHandle> {
83 Ok(ProcessHandle {
84 process: Arc::new(SharedChild::spawn(&mut command)?),
85 })
86 }
87
88 pub fn kill(&self) {
89 let _ = self.process.kill();
90 }
91
92 pub fn pid(&self) -> u32 {
93 self.process.id()
94 }
95
96 #[cfg(unix)]
97 pub async fn terminate(&self, timeout: Duration) -> Result<()> {
98 let process = self.process.clone();
99 if let Err(_) = process.send_signal(libc::SIGTERM) {
100 return self.check_if_running();
105 }
106
107 let process = self.clone();
108 let (abort_handle, abort_registration) = AbortHandle::new_pair();
109
110 tokio::task::spawn_local(async move {
111 tokio::time::sleep(timeout).await;
112 abort_handle.abort();
113 });
114
115 let _ = Abortable::new(process.wait_until_finished(), abort_registration).await;
116 self.check_if_running()
117 }
118
119 #[cfg(not(unix))]
120 pub async fn terminate(&self, _timeout: Duration) -> Result<()> {
121 Err(anyhow!(
123 "Process termination not supported on non-UNIX systems"
124 ))
125 }
126
127 pub fn check_if_running(&self) -> Result<()> {
128 let terminate_result = self.process.try_wait();
129 match terminate_result {
130 Ok(expected_status) => match expected_status {
131 Some(_status) => Ok(()),
133 None => Err(anyhow!(
134 "Process [pid={}] is still running.",
135 self.process.id()
136 )),
137 },
138 Err(error) => Err(anyhow!(
139 "Failed to wait for process [pid={}]. Error: {}",
140 self.process.id(),
141 error
142 )),
143 }
144 }
145
146 pub async fn wait_until_finished(self) -> ExeUnitExitStatus {
147 let process = self.process.clone();
148 let (sender, receiver) = channel::<ExeUnitExitStatus>();
149
150 thread::spawn(move || {
151 let result = process.wait();
152
153 let status = match result {
154 Ok(status) => match status.code() {
155 None => ExeUnitExitStatus::Aborted(status),
157 Some(_code) => ExeUnitExitStatus::Finished(status),
158 },
159 Err(error) => ExeUnitExitStatus::Error(error),
160 };
161 sender.send(status)
162 });
163
164 return receiver.await.unwrap();
168 }
169}