use std::io;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::process::Child;
use tracing::{debug, info, warn};
#[cfg(unix)]
/// Unix flavour of the process handle: remembers the child's PID so that signals can be
/// delivered to the process group identified by that PID (see the `killpg` calls in
/// `ProcessHandle::terminate` / `ProcessHandle::force_kill`).
pub struct ProcessHandle {
// PID captured from `Child::id()` when the handle was created; `None` if no PID was
// available at that point. It is used as the process-group id, which assumes the child
// leads its own group (presumably arranged via `configure_process_group` — verify at the spawn site).
pid: Option<u32>,
}
#[cfg(windows)]
/// Windows flavour of the process handle: owns the job object the child was assigned to.
pub struct ProcessHandle {
// Guard around the job-object handle created by `assign_to_job`. Dropping the guard closes
// the handle; the job is configured with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`.
job: Option<JobObjectGuard>,
}
#[cfg(windows)]
/// Owning wrapper around a Windows job-object handle; the handle is closed in `Drop`.
struct JobObjectGuard {
// Raw job-object handle as returned by `CreateJobObjectW`.
handle: windows::Win32::Foundation::HANDLE,
}
#[cfg(windows)]
// SAFETY: NOTE(review) — presumably sound because the guard only stores a raw job-object
// handle value and uses it solely to close the handle in `Drop`; confirm that `HANDLE`
// carries no thread-affine state in the `windows` crate version in use.
unsafe impl Send for JobObjectGuard {}
#[cfg(windows)]
// SAFETY: NOTE(review) — the guard defines no `&self` methods in this file, so shared
// references cannot be used to operate on the handle concurrently; confirm this stays true
// if methods are added.
unsafe impl Sync for JobObjectGuard {}
#[cfg(windows)]
impl Drop for JobObjectGuard {
    /// Closes the wrapped job-object handle and logs that it did so.
    ///
    /// Any error reported by `CloseHandle` is deliberately ignored: there is nothing
    /// useful a destructor could do about it.
    fn drop(&mut self) {
        use windows::Win32::Foundation::CloseHandle;

        // SAFETY: `self.handle` is the handle this guard was constructed with, and the guard
        // is being destroyed, so it is not used again through this value afterwards.
        let _ = unsafe { CloseHandle(self.handle) };
        debug!("Job object handle closed");
    }
}
/// A spawned tokio child process paired with the platform-specific handle used to
/// terminate it (process-group signals on Unix, a job object on Windows).
pub struct ManagedChild {
// The underlying tokio child; used for waiting, PID lookup and direct kill.
pub child: Child,
// Platform-specific termination handle created in `ManagedChild::new`.
pub handle: ProcessHandle,
}
// `dead_code` is allowed because the payloads / variants are not all read within this file.
#[allow(dead_code)]
#[derive(Debug)]
/// Result of a `terminate_with_timeout` call.
pub enum TerminationOutcome {
// The process finished within the timeout after the graceful termination request.
Exited(std::process::ExitStatus),
// For `ManagedChild`: the graceful wait timed out and the process finished after the force
// kill. `StreamingChildHandle` also returns this, with a placeholder status, when its
// status channel closes without delivering a status.
ForceKilled(std::process::ExitStatus),
// No exit status was observed within the allowed time (for `ManagedChild`, even after the
// force kill).
TimedOut,
}
impl ManagedChild {
    /// Wraps `child` together with a freshly created platform-specific [`ProcessHandle`].
    ///
    /// # Errors
    /// Returns whatever error handle creation reports (on Windows this is the job-object
    /// setup performed by `assign_to_job`).
    pub fn new(mut child: Child) -> io::Result<Self> {
        Self::create_handle(&mut child).map(|handle| Self { child, handle })
    }

    /// Unix: records the child's PID (if any) for later process-group signalling.
    #[cfg(unix)]
    fn create_handle(child: &mut Child) -> io::Result<ProcessHandle> {
        let pid = child.id();
        Ok(ProcessHandle { pid })
    }

    /// Windows: assigns the child to a new job object and keeps the guard in the handle.
    #[cfg(windows)]
    fn create_handle(child: &mut Child) -> io::Result<ProcessHandle> {
        assign_to_job(child).map(|job| ProcessHandle { job: Some(job) })
    }

    /// Requests graceful termination through the platform handle.
    pub fn terminate(&mut self) -> io::Result<()> {
        let child = &self.child;
        self.handle.terminate(child)
    }

    /// Forcefully stops the process: via the handle's `force_kill` on Unix, via
    /// `Child::kill` on Windows.
    pub async fn force_kill(&mut self) -> io::Result<()> {
        #[cfg(unix)]
        {
            self.handle.force_kill()
        }
        #[cfg(windows)]
        {
            self.child.kill().await
        }
    }

    /// Requests termination, waits up to `timeout` for the child to exit, and escalates to
    /// a force kill (followed by a second wait of up to `timeout`) if it does not.
    ///
    /// # Returns
    /// * `Exited(status)` – the child exited during the first wait.
    /// * `ForceKilled(status)` – the child exited during the wait after the force kill.
    /// * `TimedOut` – the child did not exit during either wait.
    ///
    /// # Errors
    /// Propagates failures from `terminate`, `force_kill` and `wait`.
    pub async fn terminate_with_timeout(
        &mut self,
        timeout: Duration,
    ) -> io::Result<TerminationOutcome> {
        self.terminate()?;

        // Graceful phase: an exit within the timeout ends the procedure here.
        if let Ok(graceful) = tokio::time::timeout(timeout, self.wait()).await {
            return graceful.map(TerminationOutcome::Exited);
        }

        // Escalation phase: force kill, then give the process one more timeout to go away.
        self.force_kill().await?;
        match tokio::time::timeout(timeout, self.wait()).await {
            Ok(forced) => forced.map(TerminationOutcome::ForceKilled),
            Err(_) => Ok(TerminationOutcome::TimedOut),
        }
    }

    /// Returns the PID reported by the underlying tokio child, if any.
    #[allow(dead_code)]
    pub fn id(&self) -> Option<u32> {
        self.child.id()
    }

    /// Waits for the child to exit and returns its exit status.
    pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
        let child = &mut self.child;
        child.wait().await
    }

    /// Kills the child directly through tokio's `Child::kill`, bypassing the handle.
    #[allow(dead_code)]
    pub async fn kill(&mut self) -> io::Result<()> {
        let child = &mut self.child;
        child.kill().await
    }
}
/// Remote-control handle for a child process whose lifecycle is driven elsewhere
/// (presumably by a background task that owns the other ends of the channels — verify at
/// the construction site).
///
/// The handle can request cancellation, observe the current PID and receive the final
/// exit status.
pub struct StreamingChildHandle {
    /// One-shot cancellation signal; `None` once cancellation has been requested.
    cancel_tx: Option<tokio::sync::oneshot::Sender<()>>,
    /// PID published by the owner of the process; `0` means "no PID available".
    current_pid: Arc<AtomicU32>,
    /// Receives the final exit status exactly once.
    final_status_rx: tokio::sync::oneshot::Receiver<std::process::ExitStatus>,
    /// Exit status cached after it has been received. A tokio oneshot receiver must not
    /// be polled again after it has yielded its value, so later calls are answered from
    /// this cache instead of touching `final_status_rx` again.
    final_status: Option<std::process::ExitStatus>,
}

#[allow(dead_code)]
impl StreamingChildHandle {
    /// Builds a handle from the cancellation sender, the shared PID cell and the receiver
    /// for the final exit status.
    pub fn new(
        cancel_tx: tokio::sync::oneshot::Sender<()>,
        current_pid: Arc<AtomicU32>,
        final_status_rx: tokio::sync::oneshot::Receiver<std::process::ExitStatus>,
    ) -> Self {
        Self {
            cancel_tx: Some(cancel_tx),
            current_pid,
            final_status_rx,
            final_status: None,
        }
    }

    /// Requests cancellation. Idempotent: only the first call sends the signal.
    ///
    /// Always returns `Ok(())`; a failed send (receiver already gone) is ignored because
    /// there is nobody left to cancel in that case.
    pub fn terminate(&mut self) -> io::Result<()> {
        if let Some(tx) = self.cancel_tx.take() {
            let _ = tx.send(());
        }
        Ok(())
    }

    /// Requests cancellation and waits up to `timeout` for the final exit status.
    ///
    /// # Returns
    /// * `Exited(status)` – a status was received (now or by an earlier call).
    /// * `ForceKilled(placeholder)` – the status channel was closed without a status; the
    ///   payload is a synthetic status built from raw value `0`.
    /// * `TimedOut` – no status arrived within `timeout`.
    pub async fn terminate_with_timeout(
        &mut self,
        timeout: Duration,
    ) -> io::Result<TerminationOutcome> {
        self.terminate()?;

        // The status may already have been consumed by an earlier `wait` or
        // `terminate_with_timeout`; polling the receiver again would panic.
        if let Some(status) = self.final_status {
            return Ok(TerminationOutcome::Exited(status));
        }

        match tokio::time::timeout(timeout, &mut self.final_status_rx).await {
            Ok(Ok(status)) => {
                self.final_status = Some(status);
                Ok(TerminationOutcome::Exited(status))
            }
            Ok(Err(_)) => Ok(TerminationOutcome::ForceKilled(
                Self::placeholder_status(),
            )),
            Err(_elapsed) => Ok(TerminationOutcome::TimedOut),
        }
    }

    /// Alias for [`terminate`](Self::terminate); this handle has no stronger kill mechanism.
    pub async fn kill(&mut self) -> io::Result<()> {
        self.terminate()
    }

    /// Waits for the final exit status.
    ///
    /// Safe to call repeatedly: once a status has been received it is returned from the
    /// cache on every subsequent call.
    ///
    /// # Errors
    /// Returns `BrokenPipe` if the sending side was dropped without delivering a status.
    pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
        if let Some(status) = self.final_status {
            return Ok(status);
        }
        let status = (&mut self.final_status_rx).await.map_err(|_| {
            io::Error::new(io::ErrorKind::BrokenPipe, "streaming child handle dropped")
        })?;
        self.final_status = Some(status);
        Ok(status)
    }

    /// Returns the currently published PID, or `None` while the cell holds `0`.
    pub fn id(&self) -> Option<u32> {
        match self.current_pid.load(Ordering::SeqCst) {
            0 => None,
            pid => Some(pid),
        }
    }

    /// Synthetic exit status (raw value `0`) used when no real status is available.
    #[cfg(unix)]
    fn placeholder_status() -> std::process::ExitStatus {
        use std::os::unix::process::ExitStatusExt;
        std::process::ExitStatus::from_raw(0)
    }

    /// Synthetic exit status (raw value `0`) used when no real status is available.
    #[cfg(not(unix))]
    fn placeholder_status() -> std::process::ExitStatus {
        use std::os::windows::process::ExitStatusExt;
        std::process::ExitStatus::from_raw(0)
    }
}
impl ProcessHandle {
    /// Sends `signal` to the process group whose id is `pid`.
    ///
    /// `ESRCH` (no such process group) is treated as success: the group is already gone,
    /// which is the state the caller wanted to reach. This mirrors how
    /// `cleanup_process_group` handles `ESRCH` and keeps `terminate_with_timeout` from
    /// failing when the child has exited before the signal was sent.
    ///
    /// # Errors
    /// Returns `InvalidInput` if `pid` does not fit into a signed 32-bit PID, or the
    /// wrapped `killpg` error for any failure other than `ESRCH`.
    #[cfg(unix)]
    fn signal_group(pid: u32, signal: nix::sys::signal::Signal) -> io::Result<()> {
        use nix::errno::Errno;
        use nix::sys::signal::killpg;
        use nix::unistd::Pid;

        // A plain `as i32` cast would silently wrap to a negative value for large inputs.
        let raw_pid = i32::try_from(pid).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("PID {} is out of range for a process group id", pid),
            )
        })?;

        debug!("Sending {} to process group {}", signal, pid);
        match killpg(Pid::from_raw(raw_pid), signal) {
            Ok(()) => {
                debug!("Successfully sent {} to process group {}", signal, pid);
                Ok(())
            }
            Err(Errno::ESRCH) => {
                debug!(
                    "Process group {} already gone; {} not delivered",
                    pid, signal
                );
                Ok(())
            }
            Err(e) => {
                warn!("Failed to send {} to process group {}: {}", signal, pid, e);
                Err(io::Error::other(e))
            }
        }
    }

    /// Unix: asks the child's process group to terminate by sending `SIGTERM`.
    ///
    /// Returns `Ok(())` without signalling when no PID was recorded. `_child` is unused on
    /// Unix; it exists so the signature matches the Windows implementation.
    #[cfg(unix)]
    pub fn terminate(&self, _child: &Child) -> io::Result<()> {
        let Some(pid) = self.pid else {
            warn!("No PID available for process group termination");
            return Ok(());
        };
        Self::signal_group(pid, nix::sys::signal::Signal::SIGTERM)
    }

    /// Unix: forcefully stops the child's process group by sending `SIGKILL`.
    ///
    /// Returns `Ok(())` without signalling when no PID was recorded.
    #[cfg(unix)]
    pub fn force_kill(&self) -> io::Result<()> {
        let Some(pid) = self.pid else {
            warn!("No PID available for process group force kill");
            return Ok(());
        };
        Self::signal_group(pid, nix::sys::signal::Signal::SIGKILL)
    }

    /// Windows: placeholder for graceful termination.
    ///
    /// This only logs; no termination request is delivered to the process here. Callers
    /// such as `ManagedChild::terminate_with_timeout` therefore rely on their force-kill
    /// step to actually stop the process on Windows.
    #[cfg(windows)]
    pub fn terminate(&self, child: &Child) -> io::Result<()> {
        match child.id() {
            Some(pid) => debug!("Terminating Windows process {}", pid),
            None => warn!("No PID available for Windows process termination"),
        }
        Ok(())
    }
}
#[cfg(unix)]
#[allow(dead_code)]
/// Arranges for the spawned child to lead its own session (and therefore its own process
/// group), so the whole tree can later be signalled with `killpg`.
///
/// The hook first tries `setsid()`; if that fails it falls back to
/// `setpgid(0, 0)`, which only creates a new process group. If both fail, the spawn fails
/// with the OS error reported by `setpgid`.
///
/// The hook intentionally does no logging: it runs in the forked child before `exec`,
/// where only async-signal-safe operations are reliable. Logging can take locks or
/// allocate, and another thread of the parent may have held those locks at fork time.
pub fn configure_process_group(cmd: &mut tokio::process::Command) {
    use nix::unistd::{setpgid, setsid, Pid};

    // SAFETY: the closure runs between `fork` and `exec`. It only invokes the
    // async-signal-safe syscalls `setsid` and `setpgid`, and on failure builds the error
    // directly from the raw errno value instead of boxing a custom error.
    unsafe {
        cmd.pre_exec(|| {
            setsid()
                .map(drop)
                .or_else(|_| setpgid(Pid::from_raw(0), Pid::from_raw(0)))
                .map_err(|errno| io::Error::from_raw_os_error(errno as i32))
        });
    }
}
#[cfg(windows)]
/// Creates a job object configured with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, assigns
/// `child` to it and returns a guard owning the job handle.
///
/// The job handle is wrapped in its guard immediately after creation, so every early
/// return closes it instead of leaking it.
///
/// # Errors
/// * `InvalidInput` if the child reports no PID (tokio reports `None` once the child has
///   been polled to completion).
/// * The wrapped Windows error if creating or configuring the job, opening the process,
///   or assigning it to the job fails.
fn assign_to_job(child: &Child) -> io::Result<JobObjectGuard> {
    use std::mem::size_of;
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::*;
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_ALL_ACCESS};

    // Fail with an error instead of panicking when the child has no PID any more.
    let pid = child.id().ok_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "child process has no PID; it may have already exited",
        )
    })?;

    // SAFETY: plain Win32 FFI calls. `info` outlives the `SetInformationJobObject` call
    // and the size passed matches its type; `process_handle` is closed exactly once below
    // and the job handle is closed exactly once by `JobObjectGuard::drop`.
    unsafe {
        let job = CreateJobObjectW(None, windows::core::PCWSTR::null())
            .map_err(|e| io::Error::other(e))?;
        // From here on the guard owns the job handle, including on error paths.
        let guard = JobObjectGuard { handle: job };

        let mut info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION::default();
        info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
        SetInformationJobObject(
            guard.handle,
            JobObjectExtendedLimitInformation,
            &info as *const _ as *const std::ffi::c_void,
            size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
        )
        .map_err(|e| io::Error::other(e))?;

        let process_handle = OpenProcess(PROCESS_ALL_ACCESS, false, pid)
            .map_err(|e| io::Error::other(e))?;

        // The process handle is only needed for the assignment; close it on both the
        // success and the failure path before looking at the result.
        let assigned = AssignProcessToJobObject(guard.handle, process_handle);
        let _ = CloseHandle(process_handle);
        assigned.map_err(|e| io::Error::other(e))?;

        debug!("Process assigned to job object successfully");
        Ok(guard)
    }
}
#[cfg(windows)]
/// Windows counterpart of the Unix `configure_process_group`: intentionally does nothing
/// to the command. On Windows the child is grouped after spawning instead, via the job
/// object created in `assign_to_job`.
pub fn configure_process_group(_cmd: &mut tokio::process::Command) {
}
#[derive(Debug, Clone, PartialEq, Eq)]
/// Result reported by `cleanup_process_group`.
pub enum PostCleanupOutcome {
// No process group was cleaned up; this is what the non-Unix stub returns.
// `dead_code` is allowed because the variant is not constructed on every platform.
#[allow(dead_code)]
NoPgid,
// SIGTERM was attempted and the group was gone by the time SIGKILL was attempted.
Terminated,
// SIGKILL was sent to the group (also reported when SIGKILL failed with an error other
// than ESRCH).
Killed,
// The group no longer existed when the initial SIGTERM was attempted.
AlreadyGone,
}
#[cfg(unix)]
/// Sweeps an entire process group: `SIGTERM`, a grace period, `SIGKILL`, then a final
/// signal to check whether any member is still present.
///
/// # Parameters
/// * `pgid` – id of the process group to clean up.
/// * `sigterm_grace_ms` – milliseconds to wait between `SIGTERM` and `SIGKILL`.
/// * `op`, `change_id` – optional context recorded as structured log fields only.
///
/// # Returns
/// * `NoPgid` – `pgid` is not a usable group id (`0`, `1`, or larger than `i32::MAX`);
///   nothing was signalled.
/// * `AlreadyGone` – the group did not exist when `SIGTERM` was attempted.
/// * `Terminated` – the group was gone when `SIGKILL` was attempted.
/// * `Killed` – `SIGKILL` was sent, or failed with an error other than `ESRCH`.
pub async fn cleanup_process_group(
    pgid: u32,
    sigterm_grace_ms: u64,
    op: Option<&str>,
    change_id: Option<&str>,
) -> PostCleanupOutcome {
    use nix::errno::Errno;
    use nix::sys::signal::{killpg, Signal};
    use nix::unistd::Pid;

    // `killpg` with a group id of 0 targets the caller's own process group and a group id
    // of 1 can end up signalling every process the caller is allowed to signal; values
    // above `i32::MAX` would wrap to a negative id. None of these must ever be signalled.
    let pgid_nix = match i32::try_from(pgid) {
        Ok(raw) if raw > 1 => Pid::from_raw(raw),
        _ => {
            warn!(
                pgid,
                op,
                change_id,
                "post-cleanup: refusing to signal invalid process group (pgid={})",
                pgid
            );
            return PostCleanupOutcome::NoPgid;
        }
    };

    // Phase 1: polite request.
    match killpg(pgid_nix, Signal::SIGTERM) {
        Ok(()) => {
            info!(
                pgid,
                op, change_id, "post-cleanup: SIGTERM sent to process group (pgid={})", pgid
            );
        }
        Err(Errno::ESRCH) => {
            debug!(
                pgid,
                op, change_id, "post-cleanup: process group already gone (ESRCH, pgid={})", pgid
            );
            return PostCleanupOutcome::AlreadyGone;
        }
        Err(e) => {
            // Not fatal: the SIGKILL phase below is still attempted.
            warn!(
                pgid,
                op, change_id, "post-cleanup: SIGTERM failed for pgid={}: {}", pgid, e
            );
        }
    }

    tokio::time::sleep(Duration::from_millis(sigterm_grace_ms)).await;

    // Phase 2: forced kill of whatever is left.
    let outcome = match killpg(pgid_nix, Signal::SIGKILL) {
        Ok(()) => {
            info!(
                pgid,
                op, change_id, "post-cleanup: SIGKILL sent to process group (pgid={})", pgid
            );
            PostCleanupOutcome::Killed
        }
        Err(Errno::ESRCH) => {
            debug!(
                pgid,
                op, change_id, "post-cleanup: process group gone before SIGKILL (pgid={})", pgid
            );
            PostCleanupOutcome::Terminated
        }
        Err(e) => {
            warn!(
                pgid,
                op, change_id, "post-cleanup: SIGKILL failed for pgid={}: {}", pgid, e
            );
            PostCleanupOutcome::Killed
        }
    };

    // Phase 3: verification sweep. A successful delivery here means members were still
    // present in the group; the result is only logged and does not change `outcome`.
    match killpg(pgid_nix, Signal::SIGKILL) {
        Ok(()) => {
            warn!(
                pgid,
                op,
                change_id,
                "post-cleanup: survivors detected after SIGKILL sweep (pgid={}); \
processes may have escaped to a new session",
                pgid
            );
        }
        Err(Errno::ESRCH) => {
            debug!(
                pgid,
                op, change_id, "post-cleanup: verified no live members in pgid={}", pgid
            );
        }
        Err(e) => {
            warn!(
                pgid,
                op, change_id, "post-cleanup: verification signal failed for pgid={}: {}", pgid, e
            );
        }
    }

    outcome
}
#[cfg(not(unix))]
/// Non-Unix stand-in for the process-group cleanup: sends no signals, logs that it is a
/// no-op and always returns `PostCleanupOutcome::NoPgid`. All parameters except `pgid`
/// (used only in the log message) are ignored.
pub async fn cleanup_process_group(
pgid: u32,
_sigterm_grace_ms: u64,
_op: Option<&str>,
_change_id: Option<&str>,
) -> PostCleanupOutcome {
debug!("post-cleanup: no-op on non-Unix platform (pgid={})", pgid);
PostCleanupOutcome::NoPgid
}
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use tokio::process::Command;

    /// Builds `sh -c <script>` with all standard streams detached and the process-group
    /// hook from `configure_process_group` installed.
    fn detached_shell(script: &str) -> Command {
        let mut cmd = Command::new("sh");
        cmd.arg("-c")
            .arg(script)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null());
        configure_process_group(&mut cmd);
        cmd
    }

    /// Probes the group with `SIGKILL`; only `ESRCH` counts as "gone".
    fn pgid_is_gone(pgid: u32) -> bool {
        use nix::errno::Errno;
        use nix::sys::signal::{killpg, Signal};
        use nix::unistd::Pid;

        let probe = killpg(Pid::from_raw(pgid as i32), Signal::SIGKILL);
        matches!(probe, Err(Errno::ESRCH))
    }

    /// A plain `sleep` reacts to the termination request and yields an exit status.
    #[tokio::test]
    async fn terminate_with_timeout_exits_cleanly() {
        let spawned = detached_shell("sleep 5").spawn().expect("spawn sleep");
        let mut managed = ManagedChild::new(spawned).expect("managed child");

        let outcome = managed
            .terminate_with_timeout(Duration::from_secs(1))
            .await
            .expect("terminate");

        assert!(matches!(
            outcome,
            TerminationOutcome::Exited(_) | TerminationOutcome::ForceKilled(_)
        ));
    }

    /// A shell that ignores `TERM` goes through the escalation path; any outcome is accepted.
    #[tokio::test]
    async fn terminate_with_timeout_force_kills() {
        let spawned = detached_shell("trap '' TERM; while true; do sleep 1; done")
            .spawn()
            .expect("spawn trap");
        let mut managed = ManagedChild::new(spawned).expect("managed child");

        let outcome = managed
            .terminate_with_timeout(Duration::from_millis(200))
            .await
            .expect("terminate");

        assert!(matches!(
            outcome,
            TerminationOutcome::Exited(_)
                | TerminationOutcome::ForceKilled(_)
                | TerminationOutcome::TimedOut
        ));
    }

    /// A background `sleep` left behind by a successful command is removed by the cleanup.
    #[tokio::test]
    async fn successful_command_backgrounded_child_is_cleaned_up() {
        let spawned = detached_shell("sleep 60 & exit 0").spawn().expect("spawn");
        let mut managed = ManagedChild::new(spawned).expect("managed child");
        let pgid = managed.id().unwrap_or(0);
        assert!(pgid > 0, "process must have a PID");

        managed.wait().await.expect("wait");
        cleanup_process_group(pgid, 50, Some("test"), Some("regression-1.6")).await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(
            pgid_is_gone(pgid),
            "process group {} should be gone after cleanup, but members remain",
            pgid
        );
    }

    /// Same as above, but the foreground command exits with a failure status.
    #[tokio::test]
    async fn failed_command_backgrounded_child_is_cleaned_up() {
        let spawned = detached_shell("sleep 60 & exit 1").spawn().expect("spawn");
        let mut managed = ManagedChild::new(spawned).expect("managed child");
        let pgid = managed.id().unwrap_or(0);
        assert!(pgid > 0);

        managed.wait().await.expect("wait");
        cleanup_process_group(pgid, 50, Some("test"), Some("regression-1.7")).await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(
            pgid_is_gone(pgid),
            "process group {} should be gone after cleanup, but members remain",
            pgid
        );
    }

    /// Cancelling a running command and then cleaning up leaves no group members behind.
    #[tokio::test]
    async fn cancellation_triggers_full_process_group_cleanup() {
        let spawned = detached_shell("sleep 60 & sleep 60").spawn().expect("spawn");
        let mut managed = ManagedChild::new(spawned).expect("managed child");
        let pgid = managed.id().unwrap_or(0);
        assert!(pgid > 0);

        // The termination result itself is irrelevant here; only the cleanup is checked.
        let _ = managed
            .terminate_with_timeout(Duration::from_millis(500))
            .await;
        cleanup_process_group(pgid, 50, Some("test"), Some("regression-1.8")).await;
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert!(
            pgid_is_gone(pgid),
            "process group {} should be gone after cancellation + cleanup",
            pgid
        );
    }
}