use anyhow::Result;
use tracing::info;
use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
use crate::schema::{JobState, JobStateJob, JobStateResult, JobStatus, KillData, Response};
/// Options accepted by the `kill` command.
#[derive(Debug)]
pub struct KillOpts<'a> {
    /// Identifier of the job to signal; used to open the job directory.
    pub job_id: &'a str,
    /// Optional root override handed to `resolve_root`.
    pub root: Option<&'a str>,
    /// Signal name; it is upper-cased before being dispatched.
    pub signal: &'a str,
    /// When `true`, return right after signalling without observing the job state.
    pub no_wait: bool,
}

impl Default for KillOpts<'_> {
    /// Empty job id, no root override, the `"TERM"` signal, and waiting enabled.
    fn default() -> Self {
        Self {
            job_id: "",
            root: None,
            signal: "TERM",
            no_wait: false,
        }
    }
}
/// Entry point of the `kill` command.
///
/// Delegates the actual work to [`execute_inner`], wraps the resulting data in a
/// `"kill"` [`Response`] and calls `print()` on it.
///
/// # Errors
/// Returns whatever error [`execute_inner`] produced; nothing is printed in that case.
pub fn execute(opts: KillOpts) -> Result<()> {
    Response::new("kill", execute_inner(opts)?).print();
    Ok(())
}
pub fn execute_inner(opts: KillOpts) -> Result<KillData> {
let root = resolve_root(opts.root);
let job_dir = JobDir::open(&root, opts.job_id)?;
let state = job_dir.read_state()?;
let signal_upper = opts.signal.to_uppercase();
if *state.status() == JobStatus::Created {
return Err(anyhow::Error::new(InvalidJobState(format!(
"job {} is in 'created' state and has not been started; cannot send signal",
opts.job_id
))));
}
if *state.status() != JobStatus::Running {
return Ok(KillData {
job_id: job_dir.job_id.clone(),
signal: signal_upper,
state: if opts.no_wait {
None
} else {
Some(state.status().as_str().to_string())
},
exit_code: if opts.no_wait {
None
} else {
state.exit_code()
},
terminated_signal: if opts.no_wait {
None
} else {
state.result.signal.clone()
},
observed_within_ms: if opts.no_wait { None } else { Some(0) },
});
}
if let Some(pid) = state.pid {
#[cfg(windows)]
send_signal(pid, &signal_upper, state.windows_job_name.as_deref())?;
#[cfg(not(windows))]
send_signal(pid, &signal_upper)?;
info!(job_id = %job_dir.job_id, pid, signal = %signal_upper, "signal sent");
let now = crate::run::now_rfc3339_pub();
let new_state = JobState {
job: JobStateJob {
id: job_dir.job_id.clone(),
status: JobStatus::Killed,
started_at: state.started_at().map(|s| s.to_string()),
},
result: JobStateResult {
exit_code: None,
signal: Some(signal_upper.clone()),
duration_ms: None,
},
pid: Some(pid),
finished_at: Some(now.clone()),
updated_at: now,
windows_job_name: None,
};
job_dir.write_state(&new_state)?;
}
if opts.no_wait {
return Ok(KillData {
job_id: job_dir.job_id.clone(),
signal: signal_upper,
state: None,
exit_code: None,
terminated_signal: None,
observed_within_ms: None,
});
}
let obs = observe_post_signal(&job_dir, std::time::Duration::from_secs(3));
Ok(KillData {
job_id: job_dir.job_id.clone(),
signal: signal_upper,
state: Some(obs.state),
exit_code: obs.exit_code,
terminated_signal: obs.terminated_signal,
observed_within_ms: Some(obs.observed_within_ms),
})
}
/// Snapshot of a job's state as seen by `observe_post_signal` after a signal was sent.
struct PostSignalObservation {
/// Status string of the observed state (`"running"` when the state could not be read).
state: String,
/// Exit code taken from the observed state, if any.
exit_code: Option<i32>,
/// Signal name recorded in the observed state's result, if any.
terminated_signal: Option<String>,
/// Milliseconds elapsed between the start of polling and this observation.
observed_within_ms: u64,
}
/// Polls the job state every 100 ms until it is no longer non-terminal or `budget` elapses.
///
/// The state is always read at least once. If the budget runs out, the state is read one
/// final time and reported whatever its status; if that final read fails, the observation
/// falls back to `"running"` with no exit code and no signal.
fn observe_post_signal(job_dir: &JobDir, budget: std::time::Duration) -> PostSignalObservation {
    let started = std::time::Instant::now();
    let deadline = started + budget;
    let pause = std::time::Duration::from_millis(100);

    // Either the first terminal state seen, or the outcome of the final re-read
    // performed once the deadline has passed.
    let last_read = loop {
        match job_dir.read_state() {
            Ok(st) if !st.status().is_non_terminal() => break Ok(st),
            // Still non-terminal, or unreadable right now: keep polling.
            _ => {}
        }
        if std::time::Instant::now() >= deadline {
            break job_dir.read_state();
        }
        std::thread::sleep(pause);
    };

    match last_read {
        Ok(st) => PostSignalObservation {
            state: st.status().as_str().to_string(),
            exit_code: st.exit_code(),
            terminated_signal: st.result.signal.clone(),
            observed_within_ms: started.elapsed().as_millis() as u64,
        },
        Err(_) => PostSignalObservation {
            state: "running".to_string(),
            exit_code: None,
            terminated_signal: None,
            observed_within_ms: started.elapsed().as_millis() as u64,
        },
    }
}
#[cfg(unix)]
fn send_signal(pid: u32, signal: &str) -> Result<()> {
let signum: libc::c_int = match signal {
"TERM" => libc::SIGTERM,
"INT" => libc::SIGINT,
"KILL" => libc::SIGKILL,
_ => libc::SIGKILL, };
let pgid = -(pid as libc::pid_t);
let ret = unsafe { libc::kill(pgid, signum) };
if ret != 0 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::ESRCH) {
let ret2 = unsafe { libc::kill(pid as libc::pid_t, signum) };
if ret2 != 0 {
let err2 = std::io::Error::last_os_error();
if err2.raw_os_error() != Some(libc::ESRCH) {
return Err(err2.into());
}
}
} else {
return Err(err.into());
}
}
Ok(())
}
/// Windows counterpart of signal delivery: every signal name results in a process-tree kill.
///
/// When `job_name` is given, the named Job Object is opened and terminated with exit
/// code 1. Otherwise the work is delegated to `send_signal_no_job(pid)`. An unrecognized
/// signal name is only logged at debug level; it is handled like the known ones.
///
/// # Errors
/// Fails when the named Job Object cannot be opened or terminated, or when the
/// no-job fallback fails.
#[cfg(windows)]
fn send_signal(pid: u32, signal: &str, job_name: Option<&str>) -> Result<()> {
    use tracing::debug;
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{
        JOB_OBJECT_ALL_ACCESS, OpenJobObjectW, TerminateJobObject,
    };
    use windows::core::HSTRING;

    // Descriptive only: the mapping text is never used, but the unknown arm logs.
    let _mapping_note = match signal {
        "TERM" => "TerminateJobObject (TERM→process-tree kill)",
        "INT" => "TerminateJobObject (INT→process-tree kill)",
        "KILL" => "TerminateJobObject (KILL→process-tree kill)",
        unknown => {
            debug!(
                signal = unknown,
                "unknown signal mapped to KILL (process-tree kill)"
            );
            "TerminateJobObject (unknown→process-tree kill)"
        }
    };

    let Some(name) = job_name else {
        return send_signal_no_job(pid);
    };

    let wide_name = HSTRING::from(name);
    // SAFETY: `job` comes from the successful `OpenJobObjectW` call and is closed
    // exactly once, after its last use.
    unsafe {
        let job = OpenJobObjectW(JOB_OBJECT_ALL_ACCESS, false, &wide_name)
            .map_err(|e| anyhow::anyhow!("OpenJobObjectW({name}) failed: {e}"))?;
        let outcome = TerminateJobObject(job, 1)
            .map_err(|e| anyhow::anyhow!("TerminateJobObject({name}) failed: {e}"));
        // Close the handle whether or not termination succeeded.
        let _ = CloseHandle(job);
        outcome
    }
}
/// Kills `pid` when no named Job Object is known for it.
///
/// The process is opened, assigned to a freshly created anonymous Job Object, and that
/// job is terminated with exit code 1. If the assignment fails, both handles are closed
/// and the function falls back to `terminate_process_tree(pid)`.
///
/// Both handles are closed on every path, including the path where creating the
/// Job Object fails after the process handle was already opened.
///
/// # Errors
/// Fails when the process cannot be opened, the Job Object cannot be created, the job
/// cannot be terminated, or the process-tree fallback fails.
#[cfg(windows)]
fn send_signal_no_job(pid: u32) -> Result<()> {
    use windows::Win32::Foundation::{CloseHandle, HANDLE};
    use windows::Win32::System::JobObjects::{
        AssignProcessToJobObject, CreateJobObjectW, TerminateJobObject,
    };
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};

    // SAFETY: each handle is obtained from a successful API call, used only while open,
    // and closed exactly once on every exit path.
    unsafe {
        let proc_handle: HANDLE = OpenProcess(PROCESS_TERMINATE | PROCESS_SET_QUOTA, false, pid)?;
        let job: HANDLE = match CreateJobObjectW(None, None) {
            Ok(job) => job,
            Err(e) => {
                // Do not leak the process handle when the job cannot be created.
                let _ = CloseHandle(proc_handle);
                return Err(e.into());
            }
        };

        if AssignProcessToJobObject(job, proc_handle).is_err() {
            let _ = CloseHandle(job);
            let _ = CloseHandle(proc_handle);
            return terminate_process_tree(pid);
        }

        // Release both handles before looking at the outcome so that the success and
        // failure paths share the same cleanup.
        let terminated = TerminateJobObject(job, 1);
        let _ = CloseHandle(proc_handle);
        let _ = CloseHandle(job);
        terminated.map_err(|e| anyhow::anyhow!("TerminateJobObject failed: {}", e))?;
    }
    Ok(())
}
/// Terminates `root_pid` and every descendant found in a process snapshot.
///
/// A ToolHelp snapshot is walked to collect `(pid, parent pid)` pairs, the descendants of
/// `root_pid` are gathered breadth-first, and the resulting list is terminated in reverse
/// order (most recently discovered descendants first, the root last) with exit code 1.
///
/// # Errors
/// Fails when the snapshot cannot be created, when a process cannot be opened for a
/// reason other than `ERROR_INVALID_PARAMETER`, or when `TerminateProcess` fails.
/// The first such failure aborts the remaining terminations.
#[cfg(windows)]
fn terminate_process_tree(root_pid: u32) -> Result<()> {
    use windows::Win32::Foundation::{CloseHandle, ERROR_INVALID_PARAMETER};
    use windows::Win32::System::Diagnostics::ToolHelp::{
        CreateToolhelp32Snapshot, PROCESSENTRY32, Process32First, Process32Next, TH32CS_SNAPPROCESS,
    };
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_TERMINATE, TerminateProcess};

    // A fresh entry with `dwSize` filled in, as built before every ToolHelp call.
    let blank_entry = || PROCESSENTRY32 {
        dwSize: std::mem::size_of::<PROCESSENTRY32>() as u32,
        ..Default::default()
    };

    // SAFETY: the snapshot and process handles come from successful API calls and are
    // each closed once after their last use.
    unsafe {
        let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
            .map_err(|e| anyhow::anyhow!("CreateToolhelp32Snapshot failed: {}", e))?;

        // Collect (pid, parent pid) for every process in the snapshot.
        let mut relations: Vec<(u32, u32)> = Vec::new();
        let mut entry = blank_entry();
        let mut has_entry = Process32First(snapshot, &mut entry).is_ok();
        while has_entry {
            relations.push((entry.th32ProcessID, entry.th32ParentProcessID));
            entry = blank_entry();
            has_entry = Process32Next(snapshot, &mut entry).is_ok();
        }
        let _ = CloseHandle(snapshot);

        // Breadth-first expansion; the list doubles as the work queue, and the
        // `contains` check keeps any pid from being queued twice.
        let mut to_kill: Vec<u32> = vec![root_pid];
        let mut cursor = 0;
        while let Some(&parent) = to_kill.get(cursor) {
            for &(child_pid, parent_pid) in &relations {
                if parent_pid == parent && !to_kill.contains(&child_pid) {
                    to_kill.push(child_pid);
                }
            }
            cursor += 1;
        }

        for &target_pid in to_kill.iter().rev() {
            let handle = match OpenProcess(PROCESS_TERMINATE, false, target_pid) {
                Ok(h) => h,
                // NOTE(review): presumably this code means the process already exited
                // since the snapshot was taken; confirm against the OpenProcess docs.
                Err(e) if e.code() == ERROR_INVALID_PARAMETER.to_hresult() => continue,
                Err(e) => {
                    return Err(anyhow::anyhow!(
                        "OpenProcess for pid {} failed (process may still be running): {}",
                        target_pid,
                        e
                    ));
                }
            };
            let outcome = TerminateProcess(handle, 1);
            let _ = CloseHandle(handle);
            outcome.map_err(|e| {
                anyhow::anyhow!("TerminateProcess for pid {} failed: {}", target_pid, e)
            })?;
        }
    }
    Ok(())
}
/// Placeholder for platforms that are neither Unix nor Windows: always fails.
#[cfg(not(any(unix, windows)))]
fn send_signal(_pid: u32, _signal: &str) -> Result<()> {
    Err(anyhow::anyhow!("kill not supported on this platform"))
}