1use super::ProcessHandle;
2use crate::error::{
3 TerminationAttemptError, TerminationAttemptOperation, TerminationAttemptPhase, TerminationError,
4};
5use crate::output_stream::OutputStream;
6use crate::signal;
7use std::borrow::Cow;
8use std::error::Error;
9use std::io;
10use std::process::ExitStatus;
11use std::time::Duration;
12
/// How long to wait for the process to exit after the kill signal has been sent.
///
/// The same duration is reported as the output-collection timeout extension of a
/// force-kill termination outcome.
const FORCE_KILL_WAIT_TIMEOUT: Duration = Duration::from_millis(3_000);
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub(super) struct TerminationOutcome {
21 pub(super) exit_status: ExitStatus,
22 pub(super) output_collection_timeout_extension: Duration,
23}
24
25impl TerminationOutcome {
26 fn graceful_success(exit_status: ExitStatus) -> Self {
27 Self {
28 exit_status,
29 output_collection_timeout_extension: Duration::ZERO,
30 }
31 }
32
33 fn force_kill_success(exit_status: ExitStatus) -> Self {
34 Self {
35 exit_status,
36 output_collection_timeout_extension: FORCE_KILL_WAIT_TIMEOUT,
37 }
38 }
39}
40
41#[derive(Debug, Clone, Copy)]
42enum GracefulTerminationPhase {
43 Interrupt,
44 Terminate,
45}
46
47impl GracefulTerminationPhase {
48 fn attempt_phase(self) -> TerminationAttemptPhase {
49 match self {
50 Self::Interrupt => TerminationAttemptPhase::Interrupt,
51 Self::Terminate => TerminationAttemptPhase::Terminate,
52 }
53 }
54}
55
56#[derive(Debug, Default)]
57struct TerminationDiagnostics {
58 attempt_errors: Vec<TerminationAttemptError>,
59}
60
61impl TerminationDiagnostics {
62 fn record_preflight_status_error(&mut self, error: impl Error + Send + Sync + 'static) {
63 self.record(
64 TerminationAttemptPhase::Preflight,
65 TerminationAttemptOperation::CheckStatus,
66 None,
67 error,
68 );
69 }
70
71 fn record_graceful_signal_error(
72 &mut self,
73 phase: GracefulTerminationPhase,
74 signal_name: &'static str,
75 error: impl Error + Send + Sync + 'static,
76 ) {
77 self.record(
78 phase.attempt_phase(),
79 TerminationAttemptOperation::SendSignal,
80 Some(signal_name),
81 error,
82 );
83 }
84
85 fn record_graceful_wait_error(
86 &mut self,
87 phase: GracefulTerminationPhase,
88 signal_name: &'static str,
89 error: impl Error + Send + Sync + 'static,
90 ) {
91 self.record(
92 phase.attempt_phase(),
93 TerminationAttemptOperation::WaitForExit,
94 Some(signal_name),
95 error,
96 );
97 }
98
99 fn record_graceful_status_error(
100 &mut self,
101 phase: GracefulTerminationPhase,
102 signal_name: &'static str,
103 error: impl Error + Send + Sync + 'static,
104 ) {
105 self.record(
106 phase.attempt_phase(),
107 TerminationAttemptOperation::CheckStatus,
108 Some(signal_name),
109 error,
110 );
111 }
112
113 fn record_kill_signal_error(&mut self, error: impl Error + Send + Sync + 'static) {
114 self.record(
115 TerminationAttemptPhase::Kill,
116 TerminationAttemptOperation::SendSignal,
117 Some(signal::KILL_SIGNAL_NAME),
118 error,
119 );
120 }
121
122 fn record_kill_wait_error(&mut self, error: impl Error + Send + Sync + 'static) {
123 self.record(
124 TerminationAttemptPhase::Kill,
125 TerminationAttemptOperation::WaitForExit,
126 Some(signal::KILL_SIGNAL_NAME),
127 error,
128 );
129 }
130
131 fn record_kill_status_error(&mut self, error: impl Error + Send + Sync + 'static) {
132 self.record(
133 TerminationAttemptPhase::Kill,
134 TerminationAttemptOperation::CheckStatus,
135 Some(signal::KILL_SIGNAL_NAME),
136 error,
137 );
138 }
139
140 fn record(
141 &mut self,
142 phase: TerminationAttemptPhase,
143 operation: TerminationAttemptOperation,
144 signal_name: Option<&'static str>,
145 error: impl Error + Send + Sync + 'static,
146 ) {
147 self.attempt_errors.push(TerminationAttemptError {
148 phase,
149 operation,
150 signal_name,
151 source: Box::new(error),
152 });
153 }
154
155 #[must_use]
156 fn into_termination_failed(self, process_name: Cow<'static, str>) -> TerminationError {
157 assert!(
158 !self.attempt_errors.is_empty(),
159 "into_termination_failed must not be used when no error was recorded!",
160 );
161
162 TerminationError::TerminationFailed {
163 process_name,
164 attempt_errors: self.attempt_errors,
165 }
166 }
167
168 #[must_use]
169 fn into_signal_failed(self, process_name: Cow<'static, str>) -> TerminationError {
170 assert!(
171 !self.attempt_errors.is_empty(),
172 "into_signal_failed must not be used when no error was recorded!",
173 );
174
175 TerminationError::SignalFailed {
176 process_name,
177 attempt_errors: self.attempt_errors,
178 }
179 }
180}
181
182impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
183where
184 Stdout: OutputStream,
185 Stderr: OutputStream,
186{
187 pub fn send_interrupt_signal(&mut self) -> Result<(), TerminationError> {
203 self.send_signal_with_preflight_reap(
204 GracefulTerminationPhase::Interrupt,
205 signal::INTERRUPT_SIGNAL_NAME,
206 signal::send_interrupt,
207 )
208 }
209
210 pub fn send_terminate_signal(&mut self) -> Result<(), TerminationError> {
225 self.send_signal_with_preflight_reap(
226 GracefulTerminationPhase::Terminate,
227 signal::TERMINATE_SIGNAL_NAME,
228 signal::send_terminate,
229 )
230 }
231
232 pub async fn terminate(
253 &mut self,
254 interrupt_timeout: Duration,
255 terminate_timeout: Duration,
256 ) -> Result<ExitStatus, TerminationError> {
257 self.terminate_detailed(interrupt_timeout, terminate_timeout)
258 .await
259 .map(|outcome| outcome.exit_status)
260 }
261
262 pub(super) async fn terminate_detailed(
263 &mut self,
264 interrupt_timeout: Duration,
265 terminate_timeout: Duration,
266 ) -> Result<TerminationOutcome, TerminationError> {
267 self.terminate_inner_with_preflight_reaper(
268 interrupt_timeout,
269 terminate_timeout,
270 Self::try_reap_exit_status,
271 Self::send_interrupt_signal_raw,
272 Self::send_terminate_signal_raw,
273 )
274 .await
275 }
276
277 #[cfg(test)]
278 async fn terminate_inner<InterruptSignalSender, TerminateSignalSender>(
279 &mut self,
280 interrupt_timeout: Duration,
281 terminate_timeout: Duration,
282 send_interrupt_signal: InterruptSignalSender,
283 send_terminate_signal: TerminateSignalSender,
284 ) -> Result<ExitStatus, TerminationError>
285 where
286 InterruptSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
287 TerminateSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
288 {
289 self.terminate_inner_detailed(
290 interrupt_timeout,
291 terminate_timeout,
292 send_interrupt_signal,
293 send_terminate_signal,
294 )
295 .await
296 .map(|outcome| outcome.exit_status)
297 }
298
299 #[cfg(test)]
300 async fn terminate_inner_detailed<InterruptSignalSender, TerminateSignalSender>(
301 &mut self,
302 interrupt_timeout: Duration,
303 terminate_timeout: Duration,
304 send_interrupt_signal: InterruptSignalSender,
305 send_terminate_signal: TerminateSignalSender,
306 ) -> Result<TerminationOutcome, TerminationError>
307 where
308 InterruptSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
309 TerminateSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
310 {
311 self.terminate_inner_with_preflight_reaper(
312 interrupt_timeout,
313 terminate_timeout,
314 Self::try_reap_exit_status,
315 send_interrupt_signal,
316 send_terminate_signal,
317 )
318 .await
319 }
320
321 async fn terminate_inner_with_preflight_reaper<
322 PreflightReaper,
323 InterruptSignalSender,
324 TerminateSignalSender,
325 >(
326 &mut self,
327 interrupt_timeout: Duration,
328 terminate_timeout: Duration,
329 mut try_reap_exit_status: PreflightReaper,
330 mut send_interrupt_signal: InterruptSignalSender,
331 mut send_terminate_signal: TerminateSignalSender,
332 ) -> Result<TerminationOutcome, TerminationError>
333 where
334 PreflightReaper: FnMut(&mut Self) -> Result<Option<ExitStatus>, io::Error>,
335 InterruptSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
336 TerminateSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
337 {
338 let result = 'termination: {
339 let mut diagnostics = TerminationDiagnostics::default();
340
341 match try_reap_exit_status(self) {
342 Ok(Some(exit_status)) => {
343 break 'termination Ok(TerminationOutcome::graceful_success(exit_status));
344 }
345 Ok(None) => {}
346 Err(err) => {
347 tracing::warn!(
348 process = %self.name,
349 signal = signal::INTERRUPT_SIGNAL_NAME,
350 error = %err,
351 "Could not determine process state before termination. Attempting interrupt signal."
352 );
353 diagnostics.record_preflight_status_error(err);
354 }
355 }
356 if let Some(exit_status) = self
357 .attempt_graceful_phase(
358 signal::INTERRUPT_SIGNAL_NAME,
359 signal::TERMINATE_SIGNAL_NAME,
360 interrupt_timeout,
361 GracefulTerminationPhase::Interrupt,
362 &mut diagnostics,
363 &mut send_interrupt_signal,
364 )
365 .await
366 {
367 break 'termination Ok(exit_status);
368 }
369
370 if let Some(exit_status) = self
371 .attempt_graceful_phase(
372 signal::TERMINATE_SIGNAL_NAME,
373 signal::KILL_SIGNAL_NAME,
374 terminate_timeout,
375 GracefulTerminationPhase::Terminate,
376 &mut diagnostics,
377 &mut send_terminate_signal,
378 )
379 .await
380 {
381 break 'termination Ok(exit_status);
382 }
383
384 self.attempt_forceful_kill(diagnostics).await
385 };
386
387 self.disarm_after_successful_termination(result)
388 }
389
390 fn send_signal_with_preflight_reap<SignalSender>(
391 &mut self,
392 phase: GracefulTerminationPhase,
393 signal_name: &'static str,
394 send_signal: SignalSender,
395 ) -> Result<(), TerminationError>
396 where
397 SignalSender: FnOnce(&tokio::process::Child) -> Result<(), io::Error>,
398 {
399 self.send_signal_with_reaper(phase, signal_name, send_signal, Self::try_reap_exit_status)
400 }
401
402 fn send_signal_with_reaper<SignalSender, Reaper>(
403 &mut self,
404 phase: GracefulTerminationPhase,
405 signal_name: &'static str,
406 send_signal: SignalSender,
407 mut try_reap_exit_status: Reaper,
408 ) -> Result<(), TerminationError>
409 where
410 SignalSender: FnOnce(&tokio::process::Child) -> Result<(), io::Error>,
411 Reaper: FnMut(&mut Self) -> Result<Option<ExitStatus>, io::Error>,
412 {
413 let mut diagnostics = TerminationDiagnostics::default();
414
415 match try_reap_exit_status(self) {
416 Ok(Some(_)) => {
417 self.must_not_be_terminated();
418 Ok(())
419 }
420 Ok(None) => match send_signal(&self.child) {
421 Ok(()) => Ok(()),
422 Err(signal_error) => match try_reap_exit_status(self) {
423 Ok(Some(_)) => {
424 self.must_not_be_terminated();
425 Ok(())
426 }
427 Ok(None) => {
428 diagnostics.record_graceful_signal_error(phase, signal_name, signal_error);
429 Err(diagnostics.into_signal_failed(self.name.clone()))
430 }
431 Err(reap_error) => {
432 diagnostics.record_graceful_signal_error(phase, signal_name, signal_error);
433 diagnostics.record_graceful_status_error(phase, signal_name, reap_error);
434 Err(diagnostics.into_signal_failed(self.name.clone()))
435 }
436 },
437 },
438 Err(status_error) => {
439 diagnostics.record_graceful_status_error(phase, signal_name, status_error);
440 Err(diagnostics.into_signal_failed(self.name.clone()))
441 }
442 }
443 }
444
445 fn send_interrupt_signal_raw(&mut self) -> Result<(), io::Error> {
446 signal::send_interrupt(&self.child)
447 }
448
449 fn send_terminate_signal_raw(&mut self) -> Result<(), io::Error> {
450 signal::send_terminate(&self.child)
451 }
452
453 fn disarm_after_successful_termination<T>(
454 &mut self,
455 result: Result<T, TerminationError>,
456 ) -> Result<T, TerminationError> {
457 if result.is_ok() {
458 self.must_not_be_terminated();
459 }
460
461 result
462 }
463
464 async fn attempt_graceful_phase<SignalSender>(
465 &mut self,
466 signal_name: &'static str,
467 next_signal_name: &'static str,
468 timeout: Duration,
469 phase: GracefulTerminationPhase,
470 diagnostics: &mut TerminationDiagnostics,
471 send_signal: &mut SignalSender,
472 ) -> Option<TerminationOutcome>
473 where
474 SignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
475 {
476 match send_signal(self) {
477 Ok(()) => {
478 self.wait_after_graceful_signal(
479 signal_name,
480 next_signal_name,
481 timeout,
482 phase,
483 diagnostics,
484 )
485 .await
486 }
487 Err(err) => {
488 tracing::warn!(
489 process = %self.name,
490 signal = signal_name,
491 next_signal = next_signal_name,
492 error = %err,
493 "Graceful shutdown signal could not be sent. Attempting next shutdown phase."
494 );
495 diagnostics.record_graceful_signal_error(phase, signal_name, err);
496 self.try_reap_after_failed_signal(signal_name, phase, diagnostics)
497 }
498 }
499 }
500
501 async fn wait_after_graceful_signal(
502 &mut self,
503 signal_name: &'static str,
504 next_signal_name: &'static str,
505 timeout: Duration,
506 phase: GracefulTerminationPhase,
507 diagnostics: &mut TerminationDiagnostics,
508 ) -> Option<TerminationOutcome> {
509 match self.wait_for_exit_after_signal(timeout).await {
510 Ok(Some(exit_status)) => Some(TerminationOutcome::graceful_success(exit_status)),
511 Ok(None) => {
512 let not_terminated = Self::wait_timeout_diagnostic(timeout);
513 tracing::warn!(
514 process = %self.name,
515 signal = signal_name,
516 next_signal = next_signal_name,
517 error = %not_terminated,
518 "Graceful shutdown signal timed out. Attempting next shutdown phase."
519 );
520 diagnostics.record_graceful_wait_error(phase, signal_name, not_terminated);
521 None
522 }
523 Err(wait_error) => {
524 tracing::warn!(
525 process = %self.name,
526 signal = signal_name,
527 next_signal = next_signal_name,
528 error = %wait_error,
529 "Graceful shutdown signal timed out. Attempting next shutdown phase."
530 );
531 diagnostics.record_graceful_wait_error(phase, signal_name, wait_error);
532 None
533 }
534 }
535 }
536
537 fn try_reap_after_failed_signal(
538 &mut self,
539 signal_name: &'static str,
540 phase: GracefulTerminationPhase,
541 diagnostics: &mut TerminationDiagnostics,
542 ) -> Option<TerminationOutcome> {
543 match self.try_reap_exit_status() {
544 Ok(Some(exit_status)) => Some(TerminationOutcome::graceful_success(exit_status)),
545 Ok(None) => None,
546 Err(reap_error) => {
547 tracing::warn!(
548 process = %self.name,
549 signal = signal_name,
550 error = %reap_error,
551 "Could not determine process state after graceful signal send failed."
552 );
553 diagnostics.record_graceful_status_error(phase, signal_name, reap_error);
554 None
555 }
556 }
557 }
558
559 async fn attempt_forceful_kill(
560 &mut self,
561 mut diagnostics: TerminationDiagnostics,
562 ) -> Result<TerminationOutcome, TerminationError> {
563 match Self::start_kill_process_group(&mut self.child) {
564 Ok(()) => {
565 match self
571 .wait_for_exit_after_signal(FORCE_KILL_WAIT_TIMEOUT)
572 .await
573 {
574 Ok(Some(exit_status)) => {
575 Ok(TerminationOutcome::force_kill_success(exit_status))
576 }
577 Ok(None) => {
578 let not_terminated_after_kill =
579 Self::wait_timeout_diagnostic(FORCE_KILL_WAIT_TIMEOUT);
580 tracing::error!(
582 process = %self.name,
583 interrupt_signal = signal::INTERRUPT_SIGNAL_NAME,
584 terminate_signal = signal::TERMINATE_SIGNAL_NAME,
585 kill_signal = signal::KILL_SIGNAL_NAME,
586 "Process did not terminate after all termination attempts. Process may still be running. Manual intervention and investigation required!"
587 );
588 diagnostics.record_kill_wait_error(not_terminated_after_kill);
589 Err(diagnostics.into_termination_failed(self.name.clone()))
590 }
591 Err(not_terminated_after_kill) => {
592 tracing::error!(
594 process = %self.name,
595 interrupt_signal = signal::INTERRUPT_SIGNAL_NAME,
596 terminate_signal = signal::TERMINATE_SIGNAL_NAME,
597 kill_signal = signal::KILL_SIGNAL_NAME,
598 "Process did not terminate after all termination attempts. Process may still be running. Manual intervention and investigation required!"
599 );
600 diagnostics.record_kill_wait_error(not_terminated_after_kill);
601 Err(diagnostics.into_termination_failed(self.name.clone()))
602 }
603 }
604 }
605 Err(kill_error) => {
606 tracing::error!(
607 process = %self.name,
608 error = %kill_error,
609 signal = signal::KILL_SIGNAL_NAME,
610 "Forceful shutdown failed. Process may still be running. Manual intervention required!"
611 );
612 diagnostics.record_kill_signal_error(kill_error);
613
614 match self.try_reap_exit_status() {
615 Ok(Some(exit_status)) => {
616 return Ok(TerminationOutcome::graceful_success(exit_status));
617 }
618 Ok(None) => {}
619 Err(reap_error) => {
620 tracing::warn!(
621 process = %self.name,
622 signal = signal::KILL_SIGNAL_NAME,
623 error = %reap_error,
624 "Could not determine process state after forceful shutdown failed."
625 );
626 diagnostics.record_kill_status_error(reap_error);
627 }
628 }
629
630 Err(diagnostics.into_termination_failed(self.name.clone()))
631 }
632 }
633 }
634
635 pub async fn kill(&mut self) -> Result<(), TerminationError> {
648 self.kill_inner(Self::start_kill_raw).await
649 }
650
651 async fn kill_inner<StartKill>(
652 &mut self,
653 mut start_kill: StartKill,
654 ) -> Result<(), TerminationError>
655 where
656 StartKill: FnMut(&mut Self) -> Result<(), io::Error>,
657 {
658 self.stdin().close();
659 let mut diagnostics = TerminationDiagnostics::default();
660
661 if let Err(err) = start_kill(self) {
662 diagnostics.record_kill_signal_error(err);
663 return Err(diagnostics.into_termination_failed(self.name.clone()));
664 }
665
666 if let Err(err) = self.wait_for_completion_unbounded_inner().await {
667 diagnostics.record_kill_wait_error(err);
668 return Err(diagnostics.into_termination_failed(self.name.clone()));
669 }
670
671 Ok(())
672 }
673
674 fn start_kill_raw(&mut self) -> Result<(), io::Error> {
675 Self::start_kill_process_group(&mut self.child)
676 }
677
678 fn start_kill_process_group(child: &mut tokio::process::Child) -> Result<(), io::Error> {
687 #[cfg(unix)]
688 {
689 match child.id() {
690 Some(pid) => signal::send_kill_to_process_group(pid),
691 None => child.start_kill(),
694 }
695 }
696 #[cfg(not(unix))]
697 {
698 child.start_kill()
699 }
700 }
701}
702
// Unit tests for this module live in the separate `tests` module file.
#[cfg(test)]
mod tests;