use super::ProcessHandle;
use crate::error::{TerminationAction, TerminationError};
use crate::output_stream::OutputStream;
use std::io;
use std::process::ExitStatus;
use std::time::Duration;
mod diagnostics;
#[cfg(any(unix, windows))]
mod shutdown;
pub(in crate::process_handle) use diagnostics::TerminationDiagnostics;
#[cfg(any(unix, windows))]
pub use shutdown::{
GracefulShutdown, GracefulShutdownBuilder, UnixGracefulPhase, UnixGracefulShutdown,
UnixGracefulSignal, WindowsGracefulShutdown,
};
/// Upper bound on how long to wait for the process to exit after the forceful kill signal
/// was sent successfully.
#[cfg(any(unix, windows))]
const FORCE_KILL_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
/// Short wait used after sending a signal failed, to check whether the process has exited
/// anyway.
#[cfg(any(unix, windows))]
const REAP_AFTER_SIGNAL_FAILURE_GRACE: Duration = Duration::from_millis(100);
/// Label for the forceful kill step as it appears in log output and recorded termination
/// diagnostics (Unix variant).
#[cfg(unix)]
const KILL_LABEL: &str = "SIGKILL";
/// Label for the forceful kill step as it appears in log output and recorded termination
/// diagnostics (Windows variant).
#[cfg(windows)]
const KILL_LABEL: &str = "TerminateProcess";
/// Generic label for the forceful kill step on platforms that are neither Unix nor Windows.
#[cfg(not(any(unix, windows)))]
const KILL_LABEL: &str = "kill";
/// One graceful step of a termination sequence, stripped of platform specifics: the label of
/// the signal to send and how long to wait for the process to exit afterwards.
#[cfg(any(unix, windows))]
#[derive(Debug, Clone, Copy)]
struct TerminationStep {
/// Signal name used in log output and in recorded diagnostics for this step.
signal_label: &'static str,
/// Time allowed for the process to exit after this step's signal was sent.
timeout: Duration,
}
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
    Stdout: OutputStream,
    Stderr: OutputStream,
{
    /// Forcefully kills the process and then waits, without a time limit, for it to complete.
    ///
    /// Stdin is closed before the kill signal is sent.
    ///
    /// # Errors
    ///
    /// Returns a [`TerminationError`] built from the recorded diagnostics when either sending
    /// the kill signal fails (no wait is attempted in that case) or waiting for completion
    /// fails.
    pub async fn kill(&mut self) -> Result<(), TerminationError> {
        self.stdin().close();

        let mut diagnostics = TerminationDiagnostics::default();

        match self.send_kill_signal() {
            Ok(()) => {}
            Err(signal_error) => {
                let action = TerminationAction::SendSignal {
                    signal_name: KILL_LABEL,
                };
                diagnostics.record(action, signal_error);
                return Err(diagnostics.into_termination_failed(self.name.clone()));
            }
        }

        match self.wait_for_completion_unbounded().await {
            Ok(_) => Ok(()),
            Err(wait_error) => {
                diagnostics.record(TerminationAction::WaitForExit, wait_error);
                Err(diagnostics.into_termination_failed(self.name.clone()))
            }
        }
    }
}
#[cfg(any(unix, windows))]
impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
where
Stdout: OutputStream,
Stderr: OutputStream,
{
/// Terminates the process using the platform-specific part of `shutdown`.
///
/// On Unix the `unix` sequence is used and each requested graceful signal is sent through
/// `self.group` as either an interrupt or a terminate signal. On Windows the `windows`
/// configuration is used and a CTRL_BREAK is sent through `self.group`. In both cases the
/// work is delegated to `terminate_with_hooks`, with `Self::try_reap_exit_status` as the
/// exit-status check.
///
/// # Errors
///
/// Propagates the [`TerminationError`] returned by `terminate_with_hooks`.
pub async fn terminate(
    &mut self,
    shutdown: GracefulShutdown,
) -> Result<ExitStatus, TerminationError> {
    #[cfg(unix)]
    let outcome = self
        .terminate_with_hooks(
            &shutdown.unix,
            Self::try_reap_exit_status,
            |handle, graceful_signal| match graceful_signal {
                UnixGracefulSignal::Terminate => handle.group.send_terminate(),
                UnixGracefulSignal::Interrupt => handle.group.send_interrupt(),
            },
        )
        .await;

    #[cfg(windows)]
    let outcome = self
        .terminate_with_hooks(&shutdown.windows, Self::try_reap_exit_status, |handle| {
            handle.group.send_ctrl_break()
        })
        .await;

    outcome
}
/// Unix termination entry point with injectable hooks.
///
/// Converts every phase of `sequence` into a [`TerminationStep`] (signal label plus timeout)
/// and runs the shared termination loop. `try_reap_exit_status` is forwarded unchanged;
/// `send_signal` is invoked with the signal of the phase whose step is currently being
/// attempted.
///
/// # Errors
///
/// Propagates the [`TerminationError`] returned by `run_termination_loop`.
#[doc(hidden)]
#[cfg(unix)]
pub async fn terminate_with_hooks<ExitStatusReaper, SignalSender>(
    &mut self,
    sequence: &UnixGracefulShutdown,
    try_reap_exit_status: ExitStatusReaper,
    mut send_signal: SignalSender,
) -> Result<ExitStatus, TerminationError>
where
    ExitStatusReaper: FnMut(&mut Self) -> Result<Option<ExitStatus>, io::Error>,
    SignalSender: FnMut(&mut Self, UnixGracefulSignal) -> Result<(), io::Error>,
{
    let phases = sequence.phases();

    // One step per phase, in phase order, so a step index is also a valid phase index.
    let mut steps: Vec<TerminationStep> = Vec::new();
    for phase in phases.iter() {
        steps.push(TerminationStep {
            signal_label: phase.signal.label(),
            timeout: phase.timeout,
        });
    }

    self.run_termination_loop(&steps, try_reap_exit_status, |handle, position, _| {
        send_signal(handle, phases[position].signal)
    })
    .await
}
/// Windows termination entry point with injectable hooks.
///
/// Runs the shared termination loop with a single graceful step labelled
/// `CTRL_BREAK_EVENT` whose timeout is `sequence.timeout`. `try_reap_exit_status` is
/// forwarded unchanged; `send_signal` is invoked when that step is attempted.
///
/// # Errors
///
/// Propagates the [`TerminationError`] returned by `run_termination_loop`.
#[doc(hidden)]
#[cfg(windows)]
pub async fn terminate_with_hooks<PreflightReaper, SignalSender>(
    &mut self,
    sequence: &WindowsGracefulShutdown,
    try_reap_exit_status: PreflightReaper,
    mut send_signal: SignalSender,
) -> Result<ExitStatus, TerminationError>
where
    PreflightReaper: FnMut(&mut Self) -> Result<Option<ExitStatus>, io::Error>,
    SignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
{
    let ctrl_break_step = TerminationStep {
        signal_label: "CTRL_BREAK_EVENT",
        timeout: sequence.timeout,
    };

    self.run_termination_loop(
        std::slice::from_ref(&ctrl_break_step),
        try_reap_exit_status,
        |handle, _, _| send_signal(handle),
    )
    .await
}
/// Shared termination driver used by both platform entry points.
///
/// The flow is:
/// 1. Call `try_reap_exit_status`; if it reports an exit status, that status is the result
///    and no signal is sent. A failure of this check is logged and recorded, and the
///    sequence continues.
/// 2. Attempt each graceful step in order via `attempt_graceful_phase`, stopping at the
///    first step that yields an exit status.
/// 3. If no step yielded an exit status, fall back to `attempt_forceful_kill`, handing over
///    the diagnostics collected so far.
///
/// The final result is passed through `disarm_after_successful_termination`.
///
/// `send_signal` receives the handle, the index of the step and the step itself.
#[cfg(any(unix, windows))]
async fn run_termination_loop<PreflightReaper, SignalSender>(
    &mut self,
    steps: &[TerminationStep],
    mut try_reap_exit_status: PreflightReaper,
    mut send_signal: SignalSender,
) -> Result<ExitStatus, TerminationError>
where
    PreflightReaper: FnMut(&mut Self) -> Result<Option<ExitStatus>, io::Error>,
    SignalSender: FnMut(&mut Self, usize, &TerminationStep) -> Result<(), io::Error>,
{
    debug_assert!(
        !steps.is_empty(),
        "run_termination_loop requires at least one graceful step",
    );

    let result = 'termination: {
        let mut diagnostics = TerminationDiagnostics::default();

        match try_reap_exit_status(self) {
            Ok(Some(already_exited)) => break 'termination Ok(already_exited),
            Ok(None) => {}
            Err(status_error) => {
                // Only needed for the log line; falls back to the kill label when there is
                // no graceful step at all.
                let first_phase_label = match steps.first() {
                    Some(first) => first.signal_label,
                    None => KILL_LABEL,
                };
                tracing::warn!(
                    process = %self.name,
                    signal = first_phase_label,
                    error = %status_error,
                    "Could not determine process state before termination. Attempting first graceful phase."
                );
                diagnostics.record(TerminationAction::CheckStatus, status_error);
            }
        }

        for (position, step) in steps.iter().enumerate() {
            // What gets tried if this step does not end the process: the next graceful
            // step, or the forceful kill after the last one.
            let next_label = match steps.get(position + 1) {
                Some(following) => following.signal_label,
                None => KILL_LABEL,
            };

            let sender = &mut send_signal;
            let exited = self
                .attempt_graceful_phase(
                    step.signal_label,
                    next_label,
                    step.timeout,
                    &mut diagnostics,
                    &mut |handle: &mut Self| sender(handle, position, step),
                )
                .await;

            if let Some(exit_status) = exited {
                break 'termination Ok(exit_status);
            }
        }

        self.attempt_forceful_kill(diagnostics).await
    };

    self.disarm_after_successful_termination(result)
}
/// Passes `result` through unchanged, calling `must_not_be_terminated` first when it is
/// `Ok`.
///
/// An `Err` result is returned as-is without touching the handle.
// NOTE(review): `must_not_be_terminated` presumably disarms a drop-time termination check;
// confirm against its definition.
#[doc(hidden)]
pub fn disarm_after_successful_termination<T>(
    &mut self,
    result: Result<T, TerminationError>,
) -> Result<T, TerminationError> {
    if result.is_err() {
        return result;
    }
    self.must_not_be_terminated();
    result
}
/// Runs one graceful shutdown phase: sends the signal, then waits for the process to exit.
///
/// Returns `Some(exit_status)` when the process exited during this phase and `None` when
/// the caller should move on to the next phase. Every failure is logged as a warning and
/// recorded in `diagnostics`:
///
/// * the signal could not be sent: the process is still given
///   `REAP_AFTER_SIGNAL_FAILURE_GRACE` to turn out to have exited;
/// * the wait returned no exit status within `timeout`: recorded as a timeout error;
/// * the wait itself failed.
///
/// `signal_name` and `next_signal_name` are used for logging and diagnostics only.
async fn attempt_graceful_phase<SignalSender>(
    &mut self,
    signal_name: &'static str,
    next_signal_name: &'static str,
    timeout: Duration,
    diagnostics: &mut TerminationDiagnostics,
    send_signal: &mut SignalSender,
) -> Option<ExitStatus>
where
    SignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
{
    if let Err(send_error) = send_signal(self) {
        tracing::warn!(
            process = %self.name,
            signal = signal_name,
            next_signal = next_signal_name,
            error = %send_error,
            "Graceful shutdown signal could not be sent. Attempting next shutdown phase."
        );
        diagnostics.record(TerminationAction::SendSignal { signal_name }, send_error);

        // The send may have failed because the process is already gone, so check briefly.
        let reaped = self
            .wait_for_exit_after_signal(REAP_AFTER_SIGNAL_FAILURE_GRACE)
            .await;
        return match reaped {
            Ok(exit_status) => exit_status,
            Err(reap_error) => {
                tracing::warn!(
                    process = %self.name,
                    signal = signal_name,
                    error = %reap_error,
                    "Could not determine process state after graceful signal send failed."
                );
                diagnostics.record(TerminationAction::CheckStatus, reap_error);
                None
            }
        };
    }

    let waited = self.wait_for_exit_after_signal(timeout).await;
    match waited {
        Ok(Some(exit_status)) => return Some(exit_status),
        Ok(None) => {
            let not_terminated = wait_timeout_error(timeout);
            tracing::warn!(
                process = %self.name,
                signal = signal_name,
                next_signal = next_signal_name,
                error = %not_terminated,
                "Graceful shutdown signal timed out. Attempting next shutdown phase."
            );
            diagnostics.record(TerminationAction::WaitForExit, not_terminated);
        }
        Err(wait_error) => {
            tracing::warn!(
                process = %self.name,
                signal = signal_name,
                next_signal = next_signal_name,
                error = %wait_error,
                "Wait for graceful shutdown failed. Attempting next shutdown phase."
            );
            diagnostics.record(TerminationAction::WaitForExit, wait_error);
        }
    }
    None
}
/// Last resort of the termination sequence: sends the kill signal and waits for the exit.
///
/// `diagnostics` carries the failures recorded by the earlier phases; failures from this
/// step are added to it before it is turned into the returned error.
///
/// * Kill signal sent: waits up to `FORCE_KILL_WAIT_TIMEOUT` for the process to exit.
/// * Kill signal could not be sent: still waits `REAP_AFTER_SIGNAL_FAILURE_GRACE` and
///   returns the exit status if one shows up.
///
/// # Errors
///
/// Returns a [`TerminationError`] built from all recorded diagnostics when no exit status
/// could be obtained.
async fn attempt_forceful_kill(
    &mut self,
    mut diagnostics: TerminationDiagnostics,
) -> Result<ExitStatus, TerminationError> {
    if let Err(kill_error) = self.send_kill_signal() {
        tracing::error!(
            process = %self.name,
            error = %kill_error,
            signal = KILL_LABEL,
            "Forceful shutdown failed. Process may still be running. Manual intervention required!"
        );
        diagnostics.record(
            TerminationAction::SendSignal {
                signal_name: KILL_LABEL,
            },
            kill_error,
        );

        // The kill may have failed because the process is already gone, so check briefly.
        let reaped = self
            .wait_for_exit_after_signal(REAP_AFTER_SIGNAL_FAILURE_GRACE)
            .await;
        match reaped {
            Ok(Some(exit_status)) => return Ok(exit_status),
            Ok(None) => {}
            Err(reap_error) => {
                tracing::warn!(
                    process = %self.name,
                    signal = KILL_LABEL,
                    error = %reap_error,
                    "Could not determine process state after forceful shutdown failed."
                );
                diagnostics.record(TerminationAction::CheckStatus, reap_error);
            }
        }
        return Err(diagnostics.into_termination_failed(self.name.clone()));
    }

    let waited = self
        .wait_for_exit_after_signal(FORCE_KILL_WAIT_TIMEOUT)
        .await;
    match waited {
        Ok(Some(exit_status)) => return Ok(exit_status),
        Ok(None) => {
            let not_terminated_after_kill = wait_timeout_error(FORCE_KILL_WAIT_TIMEOUT);
            tracing::error!(
                process = %self.name,
                kill_signal = KILL_LABEL,
                "Process did not terminate after all termination attempts. Process may still be running. Manual intervention and investigation required!"
            );
            diagnostics.record(TerminationAction::WaitForExit, not_terminated_after_kill);
        }
        Err(not_terminated_after_kill) => {
            tracing::error!(
                process = %self.name,
                kill_signal = KILL_LABEL,
                "Process did not terminate after all termination attempts. Process may still be running. Manual intervention and investigation required!"
            );
            diagnostics.record(TerminationAction::WaitForExit, not_terminated_after_kill);
        }
    }

    // Both wait failures above end the same way: report everything that was recorded.
    Err(diagnostics.into_termination_failed(self.name.clone()))
}
}
/// Builds the `io::Error` used to report that a process did not complete within `timeout`.
///
/// The error has kind [`io::ErrorKind::TimedOut`] and a message that embeds the `Debug`
/// rendering of `timeout`.
fn wait_timeout_error(timeout: Duration) -> io::Error {
    let message = format!("process did not complete within {timeout:?}");
    io::Error::new(io::ErrorKind::TimedOut, message)
}