1use super::ProcessHandle;
2use crate::error::{
3 TerminationAttemptError, TerminationAttemptOperation, TerminationAttemptPhase, TerminationError,
4};
5use crate::output_stream::OutputStream;
6use crate::signal;
7use std::borrow::Cow;
8use std::error::Error;
9use std::io;
10use std::process::ExitStatus;
11use std::time::Duration;
12
/// Maximum time the forceful-kill phase waits for the process to exit after the kill signal
/// has been sent. The same duration is reported as the output-collection timeout extension of
/// a force-kill outcome.
const FORCE_KILL_WAIT_TIMEOUT: Duration = Duration::from_secs(3);

/// Short grace period used to check whether the process exited anyway after sending a signal
/// to it failed.
const REAP_AFTER_SIGNAL_FAILURE_GRACE: Duration = Duration::from_millis(100);
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub(super) struct TerminationOutcome {
26 pub(super) exit_status: ExitStatus,
27 pub(super) output_collection_timeout_extension: Duration,
28}
29
30impl TerminationOutcome {
31 fn graceful_success(exit_status: ExitStatus) -> Self {
32 Self {
33 exit_status,
34 output_collection_timeout_extension: Duration::ZERO,
35 }
36 }
37
38 fn force_kill_success(exit_status: ExitStatus) -> Self {
39 Self {
40 exit_status,
41 output_collection_timeout_extension: FORCE_KILL_WAIT_TIMEOUT,
42 }
43 }
44}
45
46#[derive(Debug, Clone, Copy)]
47enum GracefulTerminationPhase {
48 Interrupt,
49 Terminate,
50}
51
52impl GracefulTerminationPhase {
53 fn attempt_phase(self) -> TerminationAttemptPhase {
54 match self {
55 Self::Interrupt => TerminationAttemptPhase::Interrupt,
56 Self::Terminate => TerminationAttemptPhase::Terminate,
57 }
58 }
59}
60
61#[derive(Debug, Default)]
62struct TerminationDiagnostics {
63 attempt_errors: Vec<TerminationAttemptError>,
64}
65
66impl TerminationDiagnostics {
67 fn record_preflight_status_error(&mut self, error: impl Error + Send + Sync + 'static) {
68 self.record(
69 TerminationAttemptPhase::Preflight,
70 TerminationAttemptOperation::CheckStatus,
71 None,
72 error,
73 );
74 }
75
76 fn record_graceful_signal_error(
77 &mut self,
78 phase: GracefulTerminationPhase,
79 signal_name: &'static str,
80 error: impl Error + Send + Sync + 'static,
81 ) {
82 self.record(
83 phase.attempt_phase(),
84 TerminationAttemptOperation::SendSignal,
85 Some(signal_name),
86 error,
87 );
88 }
89
90 fn record_graceful_wait_error(
91 &mut self,
92 phase: GracefulTerminationPhase,
93 signal_name: &'static str,
94 error: impl Error + Send + Sync + 'static,
95 ) {
96 self.record(
97 phase.attempt_phase(),
98 TerminationAttemptOperation::WaitForExit,
99 Some(signal_name),
100 error,
101 );
102 }
103
104 fn record_graceful_status_error(
105 &mut self,
106 phase: GracefulTerminationPhase,
107 signal_name: &'static str,
108 error: impl Error + Send + Sync + 'static,
109 ) {
110 self.record(
111 phase.attempt_phase(),
112 TerminationAttemptOperation::CheckStatus,
113 Some(signal_name),
114 error,
115 );
116 }
117
118 fn record_kill_signal_error(&mut self, error: impl Error + Send + Sync + 'static) {
119 self.record(
120 TerminationAttemptPhase::Kill,
121 TerminationAttemptOperation::SendSignal,
122 Some(signal::KILL_SIGNAL_NAME),
123 error,
124 );
125 }
126
127 fn record_kill_wait_error(&mut self, error: impl Error + Send + Sync + 'static) {
128 self.record(
129 TerminationAttemptPhase::Kill,
130 TerminationAttemptOperation::WaitForExit,
131 Some(signal::KILL_SIGNAL_NAME),
132 error,
133 );
134 }
135
136 fn record_kill_status_error(&mut self, error: impl Error + Send + Sync + 'static) {
137 self.record(
138 TerminationAttemptPhase::Kill,
139 TerminationAttemptOperation::CheckStatus,
140 Some(signal::KILL_SIGNAL_NAME),
141 error,
142 );
143 }
144
145 fn record(
146 &mut self,
147 phase: TerminationAttemptPhase,
148 operation: TerminationAttemptOperation,
149 signal_name: Option<&'static str>,
150 error: impl Error + Send + Sync + 'static,
151 ) {
152 self.attempt_errors.push(TerminationAttemptError {
153 phase,
154 operation,
155 signal_name,
156 source: Box::new(error),
157 });
158 }
159
160 #[must_use]
161 fn into_termination_failed(self, process_name: Cow<'static, str>) -> TerminationError {
162 assert!(
163 !self.attempt_errors.is_empty(),
164 "into_termination_failed must not be used when no error was recorded!",
165 );
166
167 TerminationError::TerminationFailed {
168 process_name,
169 attempt_errors: self.attempt_errors,
170 }
171 }
172
173 #[must_use]
174 fn into_signal_failed(self, process_name: Cow<'static, str>) -> TerminationError {
175 assert!(
176 !self.attempt_errors.is_empty(),
177 "into_signal_failed must not be used when no error was recorded!",
178 );
179
180 TerminationError::SignalFailed {
181 process_name,
182 attempt_errors: self.attempt_errors,
183 }
184 }
185}
186
187impl<Stdout, Stderr> ProcessHandle<Stdout, Stderr>
188where
189 Stdout: OutputStream,
190 Stderr: OutputStream,
191{
192 pub fn send_interrupt_signal(&mut self) -> Result<(), TerminationError> {
208 self.send_signal_with_preflight_reap(
209 GracefulTerminationPhase::Interrupt,
210 signal::INTERRUPT_SIGNAL_NAME,
211 signal::send_interrupt,
212 )
213 }
214
215 pub fn send_terminate_signal(&mut self) -> Result<(), TerminationError> {
230 self.send_signal_with_preflight_reap(
231 GracefulTerminationPhase::Terminate,
232 signal::TERMINATE_SIGNAL_NAME,
233 signal::send_terminate,
234 )
235 }
236
237 pub async fn terminate(
271 &mut self,
272 interrupt_timeout: Duration,
273 terminate_timeout: Duration,
274 ) -> Result<ExitStatus, TerminationError> {
275 self.terminate_detailed(interrupt_timeout, terminate_timeout)
276 .await
277 .map(|outcome| outcome.exit_status)
278 }
279
280 pub(super) async fn terminate_detailed(
281 &mut self,
282 interrupt_timeout: Duration,
283 terminate_timeout: Duration,
284 ) -> Result<TerminationOutcome, TerminationError> {
285 self.terminate_inner_with_preflight_reaper(
286 interrupt_timeout,
287 terminate_timeout,
288 Self::try_reap_exit_status,
289 Self::send_interrupt_signal_raw,
290 Self::send_terminate_signal_raw,
291 )
292 .await
293 }
294
295 #[cfg(test)]
296 async fn terminate_inner<InterruptSignalSender, TerminateSignalSender>(
297 &mut self,
298 interrupt_timeout: Duration,
299 terminate_timeout: Duration,
300 send_interrupt_signal: InterruptSignalSender,
301 send_terminate_signal: TerminateSignalSender,
302 ) -> Result<ExitStatus, TerminationError>
303 where
304 InterruptSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
305 TerminateSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
306 {
307 self.terminate_inner_detailed(
308 interrupt_timeout,
309 terminate_timeout,
310 send_interrupt_signal,
311 send_terminate_signal,
312 )
313 .await
314 .map(|outcome| outcome.exit_status)
315 }
316
317 #[cfg(test)]
318 async fn terminate_inner_detailed<InterruptSignalSender, TerminateSignalSender>(
319 &mut self,
320 interrupt_timeout: Duration,
321 terminate_timeout: Duration,
322 send_interrupt_signal: InterruptSignalSender,
323 send_terminate_signal: TerminateSignalSender,
324 ) -> Result<TerminationOutcome, TerminationError>
325 where
326 InterruptSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
327 TerminateSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
328 {
329 self.terminate_inner_with_preflight_reaper(
330 interrupt_timeout,
331 terminate_timeout,
332 Self::try_reap_exit_status,
333 send_interrupt_signal,
334 send_terminate_signal,
335 )
336 .await
337 }
338
339 async fn terminate_inner_with_preflight_reaper<
340 PreflightReaper,
341 InterruptSignalSender,
342 TerminateSignalSender,
343 >(
344 &mut self,
345 interrupt_timeout: Duration,
346 terminate_timeout: Duration,
347 mut try_reap_exit_status: PreflightReaper,
348 mut send_interrupt_signal: InterruptSignalSender,
349 mut send_terminate_signal: TerminateSignalSender,
350 ) -> Result<TerminationOutcome, TerminationError>
351 where
352 PreflightReaper: FnMut(&mut Self) -> Result<Option<ExitStatus>, io::Error>,
353 InterruptSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
354 TerminateSignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
355 {
356 let result = 'termination: {
357 let mut diagnostics = TerminationDiagnostics::default();
358
359 match try_reap_exit_status(self) {
360 Ok(Some(exit_status)) => {
361 break 'termination Ok(TerminationOutcome::graceful_success(exit_status));
362 }
363 Ok(None) => {}
364 Err(err) => {
365 tracing::warn!(
366 process = %self.name,
367 signal = signal::INTERRUPT_SIGNAL_NAME,
368 error = %err,
369 "Could not determine process state before termination. Attempting interrupt signal."
370 );
371 diagnostics.record_preflight_status_error(err);
372 }
373 }
374 if let Some(exit_status) = self
375 .attempt_graceful_phase(
376 signal::INTERRUPT_SIGNAL_NAME,
377 signal::TERMINATE_SIGNAL_NAME,
378 interrupt_timeout,
379 GracefulTerminationPhase::Interrupt,
380 &mut diagnostics,
381 &mut send_interrupt_signal,
382 )
383 .await
384 {
385 break 'termination Ok(exit_status);
386 }
387
388 if let Some(exit_status) = self
389 .attempt_graceful_phase(
390 signal::TERMINATE_SIGNAL_NAME,
391 signal::KILL_SIGNAL_NAME,
392 terminate_timeout,
393 GracefulTerminationPhase::Terminate,
394 &mut diagnostics,
395 &mut send_terminate_signal,
396 )
397 .await
398 {
399 break 'termination Ok(exit_status);
400 }
401
402 self.attempt_forceful_kill(diagnostics).await
403 };
404
405 self.disarm_after_successful_termination(result)
406 }
407
408 fn send_signal_with_preflight_reap<SignalSender>(
409 &mut self,
410 phase: GracefulTerminationPhase,
411 signal_name: &'static str,
412 send_signal: SignalSender,
413 ) -> Result<(), TerminationError>
414 where
415 SignalSender: FnOnce(&tokio::process::Child) -> Result<(), io::Error>,
416 {
417 self.send_signal_with_reaper(phase, signal_name, send_signal, Self::try_reap_exit_status)
418 }
419
420 fn send_signal_with_reaper<SignalSender, Reaper>(
421 &mut self,
422 phase: GracefulTerminationPhase,
423 signal_name: &'static str,
424 send_signal: SignalSender,
425 mut try_reap_exit_status: Reaper,
426 ) -> Result<(), TerminationError>
427 where
428 SignalSender: FnOnce(&tokio::process::Child) -> Result<(), io::Error>,
429 Reaper: FnMut(&mut Self) -> Result<Option<ExitStatus>, io::Error>,
430 {
431 let mut diagnostics = TerminationDiagnostics::default();
432
433 match try_reap_exit_status(self) {
435 Ok(Some(_exit_status)) => {
436 self.must_not_be_terminated();
437 Ok(())
438 }
439 Ok(None) => match send_signal(&self.child) {
440 Ok(()) => Ok(()),
441 Err(signal_error) => match try_reap_exit_status(self) {
444 Ok(Some(_exit_status)) => {
445 self.must_not_be_terminated();
446 Ok(())
447 }
448 Ok(None) => {
449 diagnostics.record_graceful_signal_error(phase, signal_name, signal_error);
450 Err(diagnostics.into_signal_failed(self.name.clone()))
451 }
452 Err(reap_error) => {
453 diagnostics.record_graceful_signal_error(phase, signal_name, signal_error);
454 diagnostics.record_graceful_status_error(phase, signal_name, reap_error);
455 Err(diagnostics.into_signal_failed(self.name.clone()))
456 }
457 },
458 },
459 Err(status_error) => {
460 diagnostics.record_graceful_status_error(phase, signal_name, status_error);
461 Err(diagnostics.into_signal_failed(self.name.clone()))
462 }
463 }
464 }
465
466 fn send_interrupt_signal_raw(&mut self) -> Result<(), io::Error> {
467 signal::send_interrupt(&self.child)
468 }
469
470 fn send_terminate_signal_raw(&mut self) -> Result<(), io::Error> {
471 signal::send_terminate(&self.child)
472 }
473
474 fn disarm_after_successful_termination<T>(
475 &mut self,
476 result: Result<T, TerminationError>,
477 ) -> Result<T, TerminationError> {
478 if result.is_ok() {
479 self.must_not_be_terminated();
480 }
481
482 result
483 }
484
485 async fn attempt_graceful_phase<SignalSender>(
486 &mut self,
487 signal_name: &'static str,
488 next_signal_name: &'static str,
489 timeout: Duration,
490 phase: GracefulTerminationPhase,
491 diagnostics: &mut TerminationDiagnostics,
492 send_signal: &mut SignalSender,
493 ) -> Option<TerminationOutcome>
494 where
495 SignalSender: FnMut(&mut Self) -> Result<(), io::Error>,
496 {
497 match send_signal(self) {
498 Ok(()) => {
499 self.wait_after_graceful_signal(
500 signal_name,
501 next_signal_name,
502 timeout,
503 phase,
504 diagnostics,
505 )
506 .await
507 }
508 Err(err) => {
509 tracing::warn!(
510 process = %self.name,
511 signal = signal_name,
512 next_signal = next_signal_name,
513 error = %err,
514 "Graceful shutdown signal could not be sent. Attempting next shutdown phase."
515 );
516 diagnostics.record_graceful_signal_error(phase, signal_name, err);
517 self.observe_exit_after_failed_signal(signal_name, phase, diagnostics)
518 .await
519 }
520 }
521 }
522
523 async fn wait_after_graceful_signal(
524 &mut self,
525 signal_name: &'static str,
526 next_signal_name: &'static str,
527 timeout: Duration,
528 phase: GracefulTerminationPhase,
529 diagnostics: &mut TerminationDiagnostics,
530 ) -> Option<TerminationOutcome> {
531 match self.wait_for_exit_after_signal(timeout).await {
532 Ok(Some(exit_status)) => Some(TerminationOutcome::graceful_success(exit_status)),
533 Ok(None) => {
534 let not_terminated = Self::wait_timeout_diagnostic(timeout);
535 tracing::warn!(
536 process = %self.name,
537 signal = signal_name,
538 next_signal = next_signal_name,
539 error = %not_terminated,
540 "Graceful shutdown signal timed out. Attempting next shutdown phase."
541 );
542 diagnostics.record_graceful_wait_error(phase, signal_name, not_terminated);
543 None
544 }
545 Err(wait_error) => {
546 tracing::warn!(
547 process = %self.name,
548 signal = signal_name,
549 next_signal = next_signal_name,
550 error = %wait_error,
551 "Graceful shutdown signal timed out. Attempting next shutdown phase."
552 );
553 diagnostics.record_graceful_wait_error(phase, signal_name, wait_error);
554 None
555 }
556 }
557 }
558
559 async fn observe_exit_after_failed_signal(
563 &mut self,
564 signal_name: &'static str,
565 phase: GracefulTerminationPhase,
566 diagnostics: &mut TerminationDiagnostics,
567 ) -> Option<TerminationOutcome> {
568 match self
569 .wait_for_exit_after_signal(REAP_AFTER_SIGNAL_FAILURE_GRACE)
570 .await
571 {
572 Ok(Some(exit_status)) => Some(TerminationOutcome::graceful_success(exit_status)),
573 Ok(None) => None,
574 Err(reap_error) => {
575 tracing::warn!(
576 process = %self.name,
577 signal = signal_name,
578 error = %reap_error,
579 "Could not determine process state after graceful signal send failed."
580 );
581 diagnostics.record_graceful_status_error(phase, signal_name, reap_error);
582 None
583 }
584 }
585 }
586
587 async fn attempt_forceful_kill(
588 &mut self,
589 mut diagnostics: TerminationDiagnostics,
590 ) -> Result<TerminationOutcome, TerminationError> {
591 match Self::start_kill_process_group(&mut self.child) {
592 Ok(()) => {
593 match self
599 .wait_for_exit_after_signal(FORCE_KILL_WAIT_TIMEOUT)
600 .await
601 {
602 Ok(Some(exit_status)) => {
603 Ok(TerminationOutcome::force_kill_success(exit_status))
604 }
605 Ok(None) => {
606 let not_terminated_after_kill =
607 Self::wait_timeout_diagnostic(FORCE_KILL_WAIT_TIMEOUT);
608 tracing::error!(
610 process = %self.name,
611 interrupt_signal = signal::INTERRUPT_SIGNAL_NAME,
612 terminate_signal = signal::TERMINATE_SIGNAL_NAME,
613 kill_signal = signal::KILL_SIGNAL_NAME,
614 "Process did not terminate after all termination attempts. Process may still be running. Manual intervention and investigation required!"
615 );
616 diagnostics.record_kill_wait_error(not_terminated_after_kill);
617 Err(diagnostics.into_termination_failed(self.name.clone()))
618 }
619 Err(not_terminated_after_kill) => {
620 tracing::error!(
622 process = %self.name,
623 interrupt_signal = signal::INTERRUPT_SIGNAL_NAME,
624 terminate_signal = signal::TERMINATE_SIGNAL_NAME,
625 kill_signal = signal::KILL_SIGNAL_NAME,
626 "Process did not terminate after all termination attempts. Process may still be running. Manual intervention and investigation required!"
627 );
628 diagnostics.record_kill_wait_error(not_terminated_after_kill);
629 Err(diagnostics.into_termination_failed(self.name.clone()))
630 }
631 }
632 }
633 Err(kill_error) => {
634 tracing::error!(
635 process = %self.name,
636 error = %kill_error,
637 signal = signal::KILL_SIGNAL_NAME,
638 "Forceful shutdown failed. Process may still be running. Manual intervention required!"
639 );
640 diagnostics.record_kill_signal_error(kill_error);
641
642 match self
645 .wait_for_exit_after_signal(REAP_AFTER_SIGNAL_FAILURE_GRACE)
646 .await
647 {
648 Ok(Some(exit_status)) => {
649 return Ok(TerminationOutcome::graceful_success(exit_status));
650 }
651 Ok(None) => {}
652 Err(reap_error) => {
653 tracing::warn!(
654 process = %self.name,
655 signal = signal::KILL_SIGNAL_NAME,
656 error = %reap_error,
657 "Could not determine process state after forceful shutdown failed."
658 );
659 diagnostics.record_kill_status_error(reap_error);
660 }
661 }
662
663 Err(diagnostics.into_termination_failed(self.name.clone()))
664 }
665 }
666 }
667
668 pub async fn kill(&mut self) -> Result<(), TerminationError> {
684 self.kill_inner(Self::start_kill_raw).await
685 }
686
687 async fn kill_inner<StartKill>(
688 &mut self,
689 mut start_kill: StartKill,
690 ) -> Result<(), TerminationError>
691 where
692 StartKill: FnMut(&mut Self) -> Result<(), io::Error>,
693 {
694 self.stdin().close();
695 let mut diagnostics = TerminationDiagnostics::default();
696
697 if let Err(err) = start_kill(self) {
698 diagnostics.record_kill_signal_error(err);
699 return Err(diagnostics.into_termination_failed(self.name.clone()));
700 }
701
702 if let Err(err) = self.wait_for_completion_unbounded_inner().await {
703 diagnostics.record_kill_wait_error(err);
704 return Err(diagnostics.into_termination_failed(self.name.clone()));
705 }
706
707 Ok(())
708 }
709
710 fn start_kill_raw(&mut self) -> Result<(), io::Error> {
711 Self::start_kill_process_group(&mut self.child)
712 }
713
714 fn start_kill_process_group(child: &mut tokio::process::Child) -> Result<(), io::Error> {
723 #[cfg(unix)]
724 {
725 match child.id() {
726 Some(pid) => signal::send_kill_to_process_group(pid),
727 None => child.start_kill(),
730 }
731 }
732 #[cfg(not(unix))]
733 {
734 child.start_kill()
735 }
736 }
737}
738
739#[cfg(test)]
740mod tests;