ya_utils_process/
lib.rs

1use anyhow::{anyhow, Result};
2use derive_more::Display;
3use futures::channel::oneshot::channel;
4use shared_child::SharedChild;
5use std::process::Command;
6use std::sync::Arc;
7use std::thread;
8use std::time::Duration;
9
10#[cfg(feature = "lock")]
11pub mod lock;
12
13#[cfg(unix)]
14use {
15    futures::future::{AbortHandle, Abortable},
16    shared_child::unix::SharedChildExt,
17};
18
/// Extension trait for command builders whose child should be detached into
/// its own group before it starts executing.
///
/// `T` is the builder type returned for chaining; the implementations in this
/// file use the implementing command type itself.
pub trait ProcessGroupExt<T> {
    /// Configures the command so the spawned child gets its own group, and
    /// returns the builder to allow further chained configuration.
    ///
    /// The implementations in this file call `setsid` in the child on Unix
    /// and leave the command untouched on other platforms.
    fn new_process_group(&mut self) -> &mut T;
}
22
23impl ProcessGroupExt<Command> for Command {
24    #[cfg(unix)]
25    fn new_process_group(&mut self) -> &mut Command {
26        // FIXME: Linux: refactor and use the tokio-process-ns crate
27
28        use std::io;
29        use std::os::unix::process::CommandExt;
30
31        unsafe {
32            self.pre_exec(|| {
33                nix::unistd::setsid().map_err(|e| io::Error::from(e))?;
34                Ok(())
35            });
36        }
37        self
38    }
39
40    #[cfg(not(unix))]
41    fn new_process_group(&mut self) -> &mut Command {
42        self
43    }
44}
45
46impl ProcessGroupExt<tokio::process::Command> for tokio::process::Command {
47    #[cfg(unix)]
48    fn new_process_group(&mut self) -> &mut tokio::process::Command {
49        use std::io;
50
51        unsafe {
52            self.pre_exec(|| {
53                nix::unistd::setsid().map_err(|e| io::Error::from(e))?;
54                Ok(())
55            });
56        }
57        self
58    }
59
60    #[cfg(not(unix))]
61    fn new_process_group(&mut self) -> &mut tokio::process::Command {
62        self
63    }
64}
65
66#[derive(Display)]
67pub enum ExeUnitExitStatus {
68    #[display(fmt = "Aborted - {}", _0)]
69    Aborted(std::process::ExitStatus),
70    #[display(fmt = "Finished - {}", _0)]
71    Finished(std::process::ExitStatus),
72    #[display(fmt = "Error - {}", _0)]
73    Error(std::io::Error),
74}
75
76#[derive(Clone)]
77pub struct ProcessHandle {
78    process: Arc<SharedChild>,
79}
80
81impl ProcessHandle {
82    pub fn new(mut command: &mut Command) -> Result<ProcessHandle> {
83        Ok(ProcessHandle {
84            process: Arc::new(SharedChild::spawn(&mut command)?),
85        })
86    }
87
88    pub fn kill(&self) {
89        let _ = self.process.kill();
90    }
91
92    pub fn pid(&self) -> u32 {
93        self.process.id()
94    }
95
96    #[cfg(unix)]
97    pub async fn terminate(&self, timeout: Duration) -> Result<()> {
98        let process = self.process.clone();
99        if let Err(_) = process.send_signal(libc::SIGTERM) {
100            // Error means, that probably process was already terminated, because:
101            // - We have permissions to send signal, since we created this process.
102            // - We specified correct signal SIGTERM.
103            // But better let's check.
104            return self.check_if_running();
105        }
106
107        let process = self.clone();
108        let (abort_handle, abort_registration) = AbortHandle::new_pair();
109
110        tokio::task::spawn_local(async move {
111            tokio::time::sleep(timeout).await;
112            abort_handle.abort();
113        });
114
115        let _ = Abortable::new(process.wait_until_finished(), abort_registration).await;
116        self.check_if_running()
117    }
118
119    #[cfg(not(unix))]
120    pub async fn terminate(&self, _timeout: Duration) -> Result<()> {
121        // TODO: Implement termination for Windows
122        Err(anyhow!(
123            "Process termination not supported on non-UNIX systems"
124        ))
125    }
126
127    pub fn check_if_running(&self) -> Result<()> {
128        let terminate_result = self.process.try_wait();
129        match terminate_result {
130            Ok(expected_status) => match expected_status {
131                // Process already exited. Terminate was successful.
132                Some(_status) => Ok(()),
133                None => Err(anyhow!(
134                    "Process [pid={}] is still running.",
135                    self.process.id()
136                )),
137            },
138            Err(error) => Err(anyhow!(
139                "Failed to wait for process [pid={}]. Error: {}",
140                self.process.id(),
141                error
142            )),
143        }
144    }
145
146    pub async fn wait_until_finished(self) -> ExeUnitExitStatus {
147        let process = self.process.clone();
148        let (sender, receiver) = channel::<ExeUnitExitStatus>();
149
150        thread::spawn(move || {
151            let result = process.wait();
152
153            let status = match result {
154                Ok(status) => match status.code() {
155                    // status.code() will return None in case of termination by signal.
156                    None => ExeUnitExitStatus::Aborted(status),
157                    Some(_code) => ExeUnitExitStatus::Finished(status),
158                },
159                Err(error) => ExeUnitExitStatus::Error(error),
160            };
161            sender.send(status)
162        });
163
164        // Note: unwrap can't fail here. All sender, receiver and thread will
165        // end their lifetime before await will return. There's no danger
166        // that one of them will be dropped earlier.
167        return receiver.await.unwrap();
168    }
169}