use anyhow::{anyhow, Result};
use derive_more::Display;
use futures::channel::oneshot::channel;
use shared_child::SharedChild;
use std::process::Command;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[cfg(feature = "lock")]
pub mod lock;
#[cfg(unix)]
use {
futures::future::{AbortHandle, Abortable},
shared_child::unix::SharedChildExt,
};
/// Extension trait for command builders that lets the caller request that the
/// spawned child be detached from the parent's process group.
///
/// `T` is the builder type handed back by the method so calls can be chained;
/// the implementations in this file use the implementing type itself.
pub trait ProcessGroupExt<T> {
/// Configures the command so the child is started in its own group and
/// returns the builder for further chaining.
///
/// In the implementations in this file the Unix variant registers a
/// `pre_exec` hook that calls `setsid`, and the non-Unix variant returns the
/// builder unchanged.
fn new_process_group(&mut self) -> &mut T;
}
impl ProcessGroupExt<Command> for Command {
#[cfg(unix)]
fn new_process_group(&mut self) -> &mut Command {
use std::io;
use std::os::unix::process::CommandExt;
unsafe {
self.pre_exec(|| {
nix::unistd::setsid().map_err(|e| io::Error::from(e))?;
Ok(())
});
}
self
}
#[cfg(not(unix))]
fn new_process_group(&mut self) -> &mut Command {
self
}
}
impl ProcessGroupExt<tokio::process::Command> for tokio::process::Command {
#[cfg(unix)]
fn new_process_group(&mut self) -> &mut tokio::process::Command {
use std::io;
unsafe {
self.pre_exec(|| {
nix::unistd::setsid().map_err(|e| io::Error::from(e))?;
Ok(())
});
}
self
}
#[cfg(not(unix))]
fn new_process_group(&mut self) -> &mut tokio::process::Command {
self
}
}
/// Outcome of waiting for a child process, as produced by
/// `ProcessHandle::wait_until_finished`.
#[derive(Display)]
pub enum ExeUnitExitStatus {
/// The process ended without an exit code (`ExitStatus::code()` returned
/// `None`), which on Unix means it was terminated by a signal.
#[display(fmt = "Aborted - {}", _0)]
Aborted(std::process::ExitStatus),
/// The process ended and reported an exit code.
#[display(fmt = "Finished - {}", _0)]
Finished(std::process::ExitStatus),
/// Waiting for the process failed with an I/O error.
#[display(fmt = "Error - {}", _0)]
Error(std::io::Error),
}
/// Cloneable handle to a spawned child process.
///
/// Clones share the same underlying child through the `Arc`, so any clone can
/// wait on, signal, or kill the process.
#[derive(Clone)]
pub struct ProcessHandle {
// Shared child process; `SharedChild` allows waiting and killing through `&self`.
process: Arc<SharedChild>,
}
impl ProcessHandle {
pub fn new(mut command: &mut Command) -> Result<ProcessHandle> {
Ok(ProcessHandle {
process: Arc::new(SharedChild::spawn(&mut command)?),
})
}
pub fn kill(&self) {
let _ = self.process.kill();
}
pub fn pid(&self) -> u32 {
self.process.id()
}
#[cfg(unix)]
pub async fn terminate(&self, timeout: Duration) -> Result<()> {
let process = self.process.clone();
if let Err(_) = process.send_signal(libc::SIGTERM) {
return self.check_if_running();
}
let process = self.clone();
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::task::spawn_local(async move {
tokio::time::sleep(timeout).await;
abort_handle.abort();
});
let _ = Abortable::new(process.wait_until_finished(), abort_registration).await;
self.check_if_running()
}
#[cfg(not(unix))]
pub async fn terminate(&self, _timeout: Duration) -> Result<()> {
Err(anyhow!(
"Process termination not supported on non-UNIX systems"
))
}
pub fn check_if_running(&self) -> Result<()> {
let terminate_result = self.process.try_wait();
match terminate_result {
Ok(expected_status) => match expected_status {
Some(_status) => Ok(()),
None => Err(anyhow!(
"Process [pid={}] is still running.",
self.process.id()
)),
},
Err(error) => Err(anyhow!(
"Failed to wait for process [pid={}]. Error: {}",
self.process.id(),
error
)),
}
}
pub async fn wait_until_finished(self) -> ExeUnitExitStatus {
let process = self.process.clone();
let (sender, receiver) = channel::<ExeUnitExitStatus>();
thread::spawn(move || {
let result = process.wait();
let status = match result {
Ok(status) => match status.code() {
None => ExeUnitExitStatus::Aborted(status),
Some(_code) => ExeUnitExitStatus::Finished(status),
},
Err(error) => ExeUnitExitStatus::Error(error),
};
sender.send(status)
});
return receiver.await.unwrap();
}
}