1use anyhow::Result;
19use tracing::info;
20
21use crate::jobstore::{InvalidJobState, JobDir, resolve_root};
22use crate::schema::{JobState, JobStateJob, JobStateResult, JobStatus, KillData, Response};
23
/// Options for the `kill` command.
///
/// Every field is a borrowed slice or a plain flag, so the struct is cheap to copy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct KillOpts<'a> {
    /// Identifier of the job to signal.
    pub job_id: &'a str,
    /// Optional job-store root; passed to `resolve_root` to locate the job directory.
    pub root: Option<&'a str>,
    /// Signal name. It is upper-cased before use. On Unix, `TERM`, `INT` and `KILL` are
    /// recognised and any other name is delivered as `SIGKILL`. On Windows every name
    /// results in the job's processes being terminated.
    pub signal: &'a str,
    /// When `true`, return right after signalling without polling for the job's
    /// resulting state. The reported data then carries only the job id and signal.
    pub no_wait: bool,
}
34
35impl<'a> Default for KillOpts<'a> {
36 fn default() -> Self {
37 KillOpts {
38 job_id: "",
39 root: None,
40 signal: "TERM",
41 no_wait: false,
42 }
43 }
44}
45
46pub fn execute(opts: KillOpts) -> Result<()> {
48 let data = execute_inner(opts)?;
49 let response = Response::new("kill", data);
50 response.print();
51 Ok(())
52}
53
54pub fn execute_inner(opts: KillOpts) -> Result<KillData> {
56 let root = resolve_root(opts.root);
57 let job_dir = JobDir::open(&root, opts.job_id)?;
58
59 let state = job_dir.read_state()?;
60 let signal_upper = opts.signal.to_uppercase();
61
62 if *state.status() == JobStatus::Created {
63 return Err(anyhow::Error::new(InvalidJobState(format!(
64 "job {} is in 'created' state and has not been started; cannot send signal",
65 opts.job_id
66 ))));
67 }
68
69 if *state.status() != JobStatus::Running {
70 return Ok(KillData {
71 job_id: job_dir.job_id.clone(),
72 signal: signal_upper,
73 state: if opts.no_wait {
74 None
75 } else {
76 Some(state.status().as_str().to_string())
77 },
78 exit_code: if opts.no_wait {
79 None
80 } else {
81 state.exit_code()
82 },
83 terminated_signal: if opts.no_wait {
84 None
85 } else {
86 state.result.signal.clone()
87 },
88 observed_within_ms: if opts.no_wait { None } else { Some(0) },
89 });
90 }
91
92 if let Some(pid) = state.pid {
93 #[cfg(windows)]
94 send_signal(pid, &signal_upper, state.windows_job_name.as_deref())?;
95 #[cfg(not(windows))]
96 send_signal(pid, &signal_upper)?;
97
98 info!(job_id = %job_dir.job_id, pid, signal = %signal_upper, "signal sent");
99
100 let now = crate::run::now_rfc3339_pub();
101 let new_state = JobState {
102 job: JobStateJob {
103 id: job_dir.job_id.clone(),
104 status: JobStatus::Killed,
105 started_at: state.started_at().map(|s| s.to_string()),
106 },
107 result: JobStateResult {
108 exit_code: None,
109 signal: Some(signal_upper.clone()),
110 duration_ms: None,
111 },
112 pid: Some(pid),
113 finished_at: Some(now.clone()),
114 updated_at: now,
115 windows_job_name: None,
116 };
117 job_dir.write_state(&new_state)?;
118 }
119
120 if opts.no_wait {
121 return Ok(KillData {
122 job_id: job_dir.job_id.clone(),
123 signal: signal_upper,
124 state: None,
125 exit_code: None,
126 terminated_signal: None,
127 observed_within_ms: None,
128 });
129 }
130
131 let obs = observe_post_signal(&job_dir, std::time::Duration::from_secs(3));
132
133 Ok(KillData {
134 job_id: job_dir.job_id.clone(),
135 signal: signal_upper,
136 state: Some(obs.state),
137 exit_code: obs.exit_code,
138 terminated_signal: obs.terminated_signal,
139 observed_within_ms: Some(obs.observed_within_ms),
140 })
141}
142
/// Job state seen while polling after a signal was delivered.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PostSignalObservation {
    /// Status string of the job as last read (e.g. the value of `JobStatus::as_str`).
    state: String,
    /// Exit code recorded in the job state, if any.
    exit_code: Option<i32>,
    /// Signal recorded in the job state's result, if any.
    terminated_signal: Option<String>,
    /// Milliseconds elapsed between the start of polling and this observation.
    observed_within_ms: u64,
}
149
150fn observe_post_signal(job_dir: &JobDir, budget: std::time::Duration) -> PostSignalObservation {
151 let start = std::time::Instant::now();
152 let deadline = start + budget;
153 let poll_interval = std::time::Duration::from_millis(100);
154
155 loop {
156 if let Ok(st) = job_dir.read_state()
157 && !st.status().is_non_terminal()
158 {
159 return PostSignalObservation {
160 state: st.status().as_str().to_string(),
161 exit_code: st.exit_code(),
162 terminated_signal: st.result.signal.clone(),
163 observed_within_ms: start.elapsed().as_millis() as u64,
164 };
165 }
166 if std::time::Instant::now() >= deadline {
167 break;
168 }
169 std::thread::sleep(poll_interval);
170 }
171
172 if let Ok(st) = job_dir.read_state() {
173 PostSignalObservation {
174 state: st.status().as_str().to_string(),
175 exit_code: st.exit_code(),
176 terminated_signal: st.result.signal.clone(),
177 observed_within_ms: start.elapsed().as_millis() as u64,
178 }
179 } else {
180 PostSignalObservation {
181 state: "running".to_string(),
182 exit_code: None,
183 terminated_signal: None,
184 observed_within_ms: start.elapsed().as_millis() as u64,
185 }
186 }
187}
188
189#[cfg(unix)]
190fn send_signal(pid: u32, signal: &str) -> Result<()> {
191 let signum: libc::c_int = match signal {
192 "TERM" => libc::SIGTERM,
193 "INT" => libc::SIGINT,
194 "KILL" => libc::SIGKILL,
195 _ => libc::SIGKILL, };
197 let pgid = -(pid as libc::pid_t);
202 let ret = unsafe { libc::kill(pgid, signum) };
203 if ret != 0 {
204 let err = std::io::Error::last_os_error();
205 if err.raw_os_error() == Some(libc::ESRCH) {
206 let ret2 = unsafe { libc::kill(pid as libc::pid_t, signum) };
208 if ret2 != 0 {
209 let err2 = std::io::Error::last_os_error();
210 if err2.raw_os_error() != Some(libc::ESRCH) {
211 return Err(err2.into());
212 }
213 }
214 } else {
215 return Err(err.into());
216 }
217 }
218 Ok(())
219}
220
/// Terminates the job's processes on Windows, which has no POSIX signals.
///
/// Every requested signal results in termination with exit code 1. Names other than
/// `TERM`, `INT` and `KILL` are additionally logged at debug level.
///
/// When `job_name` is given, that named job object is opened and terminated as a whole.
/// Otherwise the work is delegated to `send_signal_no_job(pid)`.
///
/// # Errors
/// Fails if the named job object cannot be opened or terminated, or if the pid-based
/// path fails.
#[cfg(windows)]
fn send_signal(pid: u32, signal: &str, job_name: Option<&str>) -> Result<()> {
    use tracing::debug;
    use windows::Win32::Foundation::CloseHandle;
    use windows::Win32::System::JobObjects::{
        JOB_OBJECT_ALL_ACCESS, OpenJobObjectW, TerminateJobObject,
    };
    use windows::core::HSTRING;

    if !matches!(signal, "TERM" | "INT" | "KILL") {
        debug!(
            signal = signal,
            "unknown signal mapped to KILL (process-tree kill)"
        );
    }

    let Some(name) = job_name else {
        return send_signal_no_job(pid);
    };

    let wide_name = HSTRING::from(name);
    // SAFETY: `wide_name` outlives the call, and the job handle is closed exactly once below.
    unsafe {
        let job = OpenJobObjectW(JOB_OBJECT_ALL_ACCESS, false, &wide_name)
            .map_err(|e| anyhow::anyhow!("OpenJobObjectW({name}) failed: {e}"))?;
        let outcome = TerminateJobObject(job, 1)
            .map_err(|e| anyhow::anyhow!("TerminateJobObject({name}) failed: {e}"));
        let _ = CloseHandle(job);
        outcome
    }
}
270
/// Kills `pid` when no named job object is available.
///
/// The process is placed in a fresh, anonymous job object, which is then terminated with
/// exit code 1. If the process cannot be assigned to that job, `terminate_process_tree`
/// is used instead.
///
/// NOTE(review): per the `AssignProcessToJobObject` documentation, only processes created
/// *after* assignment become part of the job, so children the target had already spawned
/// are not terminated by the job path. Confirm whether that is intended.
///
/// # Errors
/// Fails if the process cannot be opened, the job object cannot be created, the job
/// cannot be terminated, or the process-tree fallback fails.
#[cfg(windows)]
fn send_signal_no_job(pid: u32) -> Result<()> {
    use windows::Win32::Foundation::{CloseHandle, HANDLE};
    use windows::Win32::System::JobObjects::{
        AssignProcessToJobObject, CreateJobObjectW, TerminateJobObject,
    };
    use windows::Win32::System::Threading::{OpenProcess, PROCESS_SET_QUOTA, PROCESS_TERMINATE};

    // Owns a Win32 handle and closes it on drop, so every exit path (including `?`)
    // releases it. The original code leaked the process handle when job creation failed.
    struct HandleGuard(HANDLE);

    impl Drop for HandleGuard {
        fn drop(&mut self) {
            // SAFETY: the guard exclusively owns a handle returned by a successful Win32
            // call, and this is the only place it is closed.
            let _ = unsafe { CloseHandle(self.0) };
        }
    }

    // SAFETY: plain Win32 calls on handles owned by the guards above.
    unsafe {
        let process = HandleGuard(OpenProcess(
            PROCESS_TERMINATE | PROCESS_SET_QUOTA,
            false,
            pid,
        )?);
        let job = HandleGuard(CreateJobObjectW(None, None)?);

        if AssignProcessToJobObject(job.0, process.0).is_err() {
            // Release both handles before walking the process tree.
            drop(job);
            drop(process);
            return terminate_process_tree(pid);
        }

        TerminateJobObject(job.0, 1)
            .map_err(|e| anyhow::anyhow!("TerminateJobObject failed: {}", e))?;
    }
    Ok(())
}
322
323#[cfg(windows)]
332fn terminate_process_tree(root_pid: u32) -> Result<()> {
333 use windows::Win32::Foundation::CloseHandle;
334 use windows::Win32::System::Diagnostics::ToolHelp::{
335 CreateToolhelp32Snapshot, PROCESSENTRY32, Process32First, Process32Next, TH32CS_SNAPPROCESS,
336 };
337 use windows::Win32::System::Threading::{OpenProcess, PROCESS_TERMINATE, TerminateProcess};
338
339 unsafe {
340 let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
344 .map_err(|e| anyhow::anyhow!("CreateToolhelp32Snapshot failed: {}", e))?;
345
346 let mut entries: Vec<(u32, u32)> = Vec::new();
347 let mut entry = PROCESSENTRY32 {
348 dwSize: std::mem::size_of::<PROCESSENTRY32>() as u32,
349 ..Default::default()
350 };
351
352 if Process32First(snapshot, &mut entry).is_ok() {
353 loop {
354 entries.push((entry.th32ProcessID, entry.th32ParentProcessID));
355 entry = PROCESSENTRY32 {
356 dwSize: std::mem::size_of::<PROCESSENTRY32>() as u32,
357 ..Default::default()
358 };
359 if Process32Next(snapshot, &mut entry).is_err() {
360 break;
361 }
362 }
363 }
364 let _ = CloseHandle(snapshot);
365
366 let mut to_kill: Vec<u32> = vec![root_pid];
368 let mut i = 0;
369 while i < to_kill.len() {
370 let parent = to_kill[i];
371 for &(child_pid, parent_pid) in &entries {
372 if parent_pid == parent && !to_kill.contains(&child_pid) {
373 to_kill.push(child_pid);
374 }
375 }
376 i += 1;
377 }
378
379 use windows::Win32::Foundation::ERROR_INVALID_PARAMETER;
385
386 for &target_pid in to_kill.iter().rev() {
387 match OpenProcess(PROCESS_TERMINATE, false, target_pid) {
388 Ok(h) => {
389 let result = TerminateProcess(h, 1);
390 let _ = CloseHandle(h);
391 result.map_err(|e| {
392 anyhow::anyhow!("TerminateProcess for pid {} failed: {}", target_pid, e)
393 })?;
394 }
395 Err(e) => {
396 if e.code() != ERROR_INVALID_PARAMETER.to_hresult() {
402 return Err(anyhow::anyhow!(
403 "OpenProcess for pid {} failed (process may still be running): {}",
404 target_pid,
405 e
406 ));
407 }
408 }
410 }
411 }
412 }
413 Ok(())
414}
415
/// Fallback for targets that are neither Unix nor Windows: delivering signals is
/// unsupported, so this always returns an error.
#[cfg(not(any(unix, windows)))]
fn send_signal(_pid: u32, _signal: &str) -> Result<()> {
    Err(anyhow::anyhow!("kill not supported on this platform"))
}