1use crate::output_stream::broadcast::BroadcastOutputStream;
2use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
3use crate::output_stream::{BackpressureControl, FromStreamOptions};
4use crate::panic_on_drop::PanicOnDrop;
5use crate::terminate_on_drop::TerminateOnDrop;
6use crate::{CollectorError, LineParsingOptions, OutputStream, signal};
7use std::borrow::Cow;
8use std::fmt::Debug;
9use std::io;
10use std::process::{ExitStatus, Stdio};
11use std::time::Duration;
12use thiserror::Error;
13use tokio::process::Child;
14
15#[derive(Debug, Error)]
16pub enum TerminationError {
17 #[error("Failed to send '{signal}' signal to process: {source}")]
18 SignallingFailed {
19 source: io::Error,
20 signal: &'static str,
21 },
22
23 #[error(
24 "Failed to terminate process. Graceful SIGINT termination failure: {not_terminated_after_sigint}. Graceful SIGTERM termination failure: {not_terminated_after_sigterm}. Forceful termination failure: {not_terminated_after_sigkill}"
25 )]
26 TerminationFailed {
27 not_terminated_after_sigint: io::Error,
28 not_terminated_after_sigterm: io::Error,
29 not_terminated_after_sigkill: io::Error,
30 },
31}
32
/// Result of asking a process handle whether its process is still running.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process exited with the contained status.
    Terminated(ExitStatus),

    /// The state could not be determined; the contained error explains why.
    Uncertain(io::Error),
}

impl RunningState {
    /// Collapses the state into a boolean: `true` only for [`RunningState::Running`].
    ///
    /// Both `Terminated` and `Uncertain` map to `false`.
    pub fn as_bool(&self) -> bool {
        // Deliberately exhaustive so that adding a variant forces a decision here.
        match self {
            Self::Running => true,
            Self::Terminated(_) | Self::Uncertain(_) => false,
        }
    }
}

impl From<RunningState> for bool {
    /// Same mapping as [`RunningState::as_bool`], consuming the state.
    fn from(is_running: RunningState) -> Self {
        is_running.as_bool()
    }
}
63
64#[derive(Debug, Error)]
66pub enum WaitError {
67 #[error("A general io error occurred")]
68 IoError(#[from] io::Error),
69
70 #[error("Collector failed")]
71 CollectorFailed(#[from] CollectorError),
72}
73
74#[derive(Debug)]
75pub struct ProcessHandle<O: OutputStream> {
76 pub(crate) name: Cow<'static, str>,
77 child: Child,
78 std_out_stream: O,
79 std_err_stream: O,
80 panic_on_drop: Option<PanicOnDrop>,
81}
82
83impl ProcessHandle<BroadcastOutputStream> {
84 pub fn spawn(
85 name: impl Into<Cow<'static, str>>,
86 cmd: tokio::process::Command,
87 ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
88 Self::spawn_with_capacity(name, cmd, 128, 128)
89 }
90
91 pub fn spawn_with_capacity(
92 name: impl Into<Cow<'static, str>>,
93 mut cmd: tokio::process::Command,
94 stdout_channel_capacity: usize,
95 stderr_channel_capacity: usize,
96 ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
97 Self::prepare_command(&mut cmd).spawn().map(|child| {
98 Self::new_from_child_with_piped_io_and_capacity(
99 name,
100 child,
101 stdout_channel_capacity,
102 stderr_channel_capacity,
103 )
104 })
105 }
106
107 fn new_from_child_with_piped_io_and_capacity(
108 name: impl Into<Cow<'static, str>>,
109 mut child: Child,
110 stdout_channel_capacity: usize,
111 stderr_channel_capacity: usize,
112 ) -> ProcessHandle<BroadcastOutputStream> {
113 let stdout = child
114 .stdout
115 .take()
116 .expect("Child process stdout wasn't captured");
117 let stderr = child
118 .stderr
119 .take()
120 .expect("Child process stderr wasn't captured");
121
122 let (child, std_out_stream, std_err_stream) = (
123 child,
124 BroadcastOutputStream::from_stream(
125 stdout,
126 FromStreamOptions {
127 channel_capacity: stdout_channel_capacity,
128 ..Default::default()
129 },
130 ),
131 BroadcastOutputStream::from_stream(
132 stderr,
133 FromStreamOptions {
134 channel_capacity: stderr_channel_capacity,
135 ..Default::default()
136 },
137 ),
138 );
139
140 let mut this = ProcessHandle {
141 name: name.into(),
142 child,
143 std_out_stream,
144 std_err_stream,
145 panic_on_drop: None,
146 };
147 this.must_be_terminated();
148 this
149 }
150
151 pub async fn wait_with_output(
152 &mut self,
153 options: LineParsingOptions,
154 ) -> Result<(ExitStatus, Vec<String>, Vec<String>), WaitError> {
155 let out_collector = self.std_out_stream.collect_lines_into_vec(options);
156 let err_collector = self.std_err_stream.collect_lines_into_vec(options);
157
158 let status = self.wait().await?;
159 let std_out = out_collector.cancel().await?;
160 let std_err = err_collector.cancel().await?;
161
162 Ok((status, std_out, std_err))
163 }
164}
165
166impl ProcessHandle<SingleSubscriberOutputStream> {
167 pub fn spawn(
168 name: impl Into<Cow<'static, str>>,
169 cmd: tokio::process::Command,
170 ) -> io::Result<Self> {
171 Self::spawn_with_capacity(name, cmd, 128, 128)
172 }
173
174 pub fn spawn_with_capacity(
175 name: impl Into<Cow<'static, str>>,
176 mut cmd: tokio::process::Command,
177 stdout_channel_capacity: usize,
178 stderr_channel_capacity: usize,
179 ) -> io::Result<Self> {
180 Self::prepare_command(&mut cmd).spawn().map(|child| {
181 Self::new_from_child_with_piped_io_and_capacity(
182 name,
183 child,
184 stdout_channel_capacity,
185 stderr_channel_capacity,
186 )
187 })
188 }
189
190 fn new_from_child_with_piped_io_and_capacity(
191 name: impl Into<Cow<'static, str>>,
192 mut child: Child,
193 stdout_channel_capacity: usize,
194 stderr_channel_capacity: usize,
195 ) -> Self {
196 let stdout = child
197 .stdout
198 .take()
199 .expect("Child process stdout wasn't captured");
200 let stderr = child
201 .stderr
202 .take()
203 .expect("Child process stderr wasn't captured");
204
205 let (child, std_out_stream, std_err_stream) = (
206 child,
207 SingleSubscriberOutputStream::from_stream(
208 stdout,
209 BackpressureControl::DropLatestIncomingIfBufferFull,
210 FromStreamOptions {
211 channel_capacity: stdout_channel_capacity,
212 ..Default::default()
213 },
214 ),
215 SingleSubscriberOutputStream::from_stream(
216 stderr,
217 BackpressureControl::DropLatestIncomingIfBufferFull,
218 FromStreamOptions {
219 channel_capacity: stderr_channel_capacity,
220 ..Default::default()
221 },
222 ),
223 );
224
225 let mut this = ProcessHandle {
226 name: name.into(),
227 child,
228 std_out_stream,
229 std_err_stream,
230 panic_on_drop: None,
231 };
232 this.must_be_terminated();
233 this
234 }
235
236 pub async fn wait_with_output(
237 &mut self,
238 options: LineParsingOptions,
239 ) -> Result<(ExitStatus, Vec<String>, Vec<String>), WaitError> {
240 let out_collector = self.std_out_stream.collect_lines_into_vec(options);
241 let err_collector = self.std_err_stream.collect_lines_into_vec(options);
242
243 let status = self.wait().await?;
244 let std_out = out_collector.cancel().await?;
245 let std_err = err_collector.cancel().await?;
246
247 Ok((status, std_out, std_err))
248 }
249}
250
251impl<O: OutputStream> ProcessHandle<O> {
252 fn prepare_platform_specifics(
260 command: &mut tokio::process::Command,
261 ) -> &mut tokio::process::Command {
262 #[cfg(windows)]
263 {
264 use windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
265
266 let flag = if self.graceful_exit {
267 CREATE_NEW_PROCESS_GROUP.0
268 } else {
269 0
270 };
271 command.creation_flags(flag)
272 }
273 #[cfg(not(windows))]
274 {
275 command
276 }
277 }
278
279 fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
280 Self::prepare_platform_specifics(command)
281 .stdout(Stdio::piped())
282 .stderr(Stdio::piped())
283 .kill_on_drop(true)
289 }
290
291 pub fn id(&self) -> Option<u32> {
292 self.child.id()
293 }
294
295 pub fn is_running(&mut self) -> RunningState {
297 match self.child.try_wait() {
298 Ok(None) => RunningState::Running,
299 Ok(Some(exit_status)) => {
300 self.must_not_be_terminated();
301 RunningState::Terminated(exit_status)
302 }
303 Err(err) => RunningState::Uncertain(err),
304 }
305 }
306
307 pub fn stdout(&self) -> &O {
308 &self.std_out_stream
309 }
310 pub fn stdout_mut(&mut self) -> &mut O {
311 &mut self.std_out_stream
312 }
313
314 pub fn stderr(&self) -> &O {
315 &self.std_err_stream
316 }
317
318 pub fn stderr_mut(&mut self) -> &mut O {
319 &mut self.std_err_stream
320 }
321
322 pub fn must_be_terminated(&mut self) {
340 self.panic_on_drop = Some(PanicOnDrop {
341 resource_name: "ProcessHandle".into(),
342 details: "Call `terminate()` before the type is dropped!".into(),
343 armed: true,
344 });
345 }
346
347 pub fn must_not_be_terminated(&mut self) {
348 if let Some(mut it) = self.panic_on_drop.take() {
349 it.defuse()
350 }
351 }
352
353 pub fn terminate_on_drop(
359 self,
360 graceful_termination_timeout: Duration,
361 forceful_termination_timeout: Duration,
362 ) -> TerminateOnDrop<O> {
363 TerminateOnDrop {
364 process_handle: self,
365 interrupt_timeout: graceful_termination_timeout,
366 terminate_timeout: forceful_termination_timeout,
367 }
368 }
369
370 pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
374 signal::send_interrupt(&self.child)
375 }
376
377 pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
381 signal::send_terminate(&self.child)
382 }
383
384 pub async fn terminate(
387 &mut self,
388 interrupt_timeout: Duration,
389 terminate_timeout: Duration,
390 ) -> Result<ExitStatus, TerminationError> {
391 self.must_not_be_terminated();
394
395 self.send_interrupt_signal()
396 .map_err(|err| TerminationError::SignallingFailed {
397 source: err,
398 signal: "SIGINT",
399 })?;
400
401 match self.wait_for_completion(Some(interrupt_timeout)).await {
402 Ok(exit_status) => Ok(exit_status),
403 Err(not_terminated_after_sigint) => {
404 tracing::warn!(
405 process = %self.name,
406 error = %not_terminated_after_sigint,
407 "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
408 );
409
410 self.send_terminate_signal()
411 .map_err(|err| TerminationError::SignallingFailed {
412 source: err,
413 signal: "SIGTERM",
414 })?;
415
416 match self.wait_for_completion(Some(terminate_timeout)).await {
417 Ok(exit_status) => Ok(exit_status),
418 Err(not_terminated_after_sigterm) => {
419 tracing::warn!(
420 process = %self.name,
421 error = %not_terminated_after_sigterm,
422 "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
423 );
424
425 match self.kill().await {
426 Ok(()) => {
427 match self.wait_for_completion(Some(Duration::from_secs(3))).await {
435 Ok(exit_status) => Ok(exit_status),
436 Err(not_terminated_after_sigkill) => {
437 tracing::error!(
439 "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
440 self.name
441 );
442 Err(TerminationError::TerminationFailed {
443 not_terminated_after_sigint,
444 not_terminated_after_sigterm,
445 not_terminated_after_sigkill,
446 })
447 }
448 }
449 }
450 Err(not_terminated_after_sigkill) => {
451 tracing::error!(
452 process = %self.name,
453 error = %not_terminated_after_sigkill,
454 "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
455 );
456
457 Err(TerminationError::TerminationFailed {
458 not_terminated_after_sigint,
459 not_terminated_after_sigterm,
460 not_terminated_after_sigkill,
461 })
462 }
463 }
464 }
465 }
466 }
467 }
468 }
469
470 pub async fn kill(&mut self) -> io::Result<()> {
471 self.child.kill().await
472 }
473
474 async fn wait(&mut self) -> io::Result<ExitStatus> {
479 match self.child.wait().await {
480 Ok(status) => {
481 self.must_not_be_terminated();
482 Ok(status)
483 }
484 Err(err) => Err(err),
485 }
486 }
487
488 pub async fn wait_for_completion(
490 &mut self,
491 timeout: Option<Duration>,
492 ) -> io::Result<ExitStatus> {
493 match timeout {
494 None => self.wait().await,
495 Some(timeout) => match tokio::time::timeout(timeout, self.wait()).await {
496 Ok(exit_status) => exit_status,
497 Err(err) => Err(err.into()),
498 },
499 }
500 }
501
502 pub async fn wait_for_completion_or_terminate(
503 &mut self,
504 wait_timeout: Duration,
505 interrupt_timeout: Duration,
506 terminate_timeout: Duration,
507 ) -> Result<ExitStatus, TerminationError> {
508 match self.wait_for_completion(Some(wait_timeout)).await {
509 Ok(exit_status) => Ok(exit_status),
510 Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
511 }
512 }
513
514 pub fn into_inner(self) -> (Child, O, O) {
517 (self.child, self.std_out_stream, self.std_err_stream)
518 }
519}
520
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;

    /// Spawns `sleep 5`, terminates it after about 100 ms, and checks that termination was
    /// quick and that the process did not exit with a regular exit code.
    #[tokio::test]
    async fn test_termination() {
        let mut sleep_cmd = tokio::process::Command::new("sleep");
        sleep_cmd.arg("5");

        let started_at = jiff::Zoned::now();
        let mut process =
            ProcessHandle::<BroadcastOutputStream>::spawn("sleep", sleep_cmd).unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;

        let one_second = Duration::from_secs(1);
        let exit_status = process.terminate(one_second, one_second).await.unwrap();
        let terminated_at = jiff::Zoned::now();

        // The process should be gone long before its five seconds are over.
        let ran_for = started_at.duration_until(&terminated_at);
        assert_that(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);

        // A process ended by a signal reports no exit code.
        assert_that(exit_status.code()).is_none();
    }
}