1use crate::output_stream::broadcast::BroadcastOutputStream;
2use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
3use crate::output_stream::{BackpressureControl, FromStreamOptions};
4use crate::panic_on_drop::PanicOnDrop;
5use crate::terminate_on_drop::TerminateOnDrop;
6use crate::{CollectorError, LineParsingOptions, OutputStream, signal};
7use std::borrow::Cow;
8use std::fmt::Debug;
9use std::io;
10use std::process::{ExitStatus, Stdio};
11use std::time::Duration;
12use thiserror::Error;
13use tokio::process::Child;
14
15#[derive(Debug, Error)]
16pub enum TerminationError {
17 #[error("Failed to send '{signal}' signal to process: {source}")]
18 SignallingFailed {
19 source: io::Error,
20 signal: &'static str,
21 },
22
23 #[error(
24 "Failed to terminate process. Graceful SIGINT termination failure: {not_terminated_after_sigint}. Graceful SIGTERM termination failure: {not_terminated_after_sigterm}. Forceful termination failure: {not_terminated_after_sigkill}"
25 )]
26 TerminationFailed {
27 not_terminated_after_sigint: io::Error,
28 not_terminated_after_sigterm: io::Error,
29 not_terminated_after_sigkill: io::Error,
30 },
31}
32
33#[derive(Debug)]
35pub enum RunningState {
36 Running,
38
39 Terminated(ExitStatus),
41
42 Uncertain(io::Error),
44}
45
46impl RunningState {
47 pub fn as_bool(&self) -> bool {
48 match self {
49 RunningState::Running => true,
50 RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
51 }
52 }
53}
54
55impl From<RunningState> for bool {
56 fn from(is_running: RunningState) -> Self {
57 match is_running {
58 RunningState::Running => true,
59 RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
60 }
61 }
62}
63
64#[derive(Debug, Error)]
66pub enum WaitError {
67 #[error("A general io error occurred")]
68 IoError(#[from] io::Error),
69
70 #[error("Collector failed")]
71 CollectorFailed(#[from] CollectorError),
72}
73
74#[derive(Debug)]
75pub struct ProcessHandle<O: OutputStream> {
76 pub(crate) name: Cow<'static, str>,
77 child: Child,
78 std_out_stream: O,
79 std_err_stream: O,
80 panic_on_drop: Option<PanicOnDrop>,
81}
82
83impl ProcessHandle<BroadcastOutputStream> {
84 pub fn spawn(
85 name: impl Into<Cow<'static, str>>,
86 cmd: tokio::process::Command,
87 ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
88 Self::spawn_with_capacity(name, cmd, 128, 128)
89 }
90
91 pub fn spawn_with_capacity(
92 name: impl Into<Cow<'static, str>>,
93 mut cmd: tokio::process::Command,
94 stdout_channel_capacity: usize,
95 stderr_channel_capacity: usize,
96 ) -> io::Result<ProcessHandle<BroadcastOutputStream>> {
97 Self::prepare_command(&mut cmd).spawn().map(|child| {
98 Self::new_from_child_with_piped_io_and_capacity(
99 name,
100 child,
101 stdout_channel_capacity,
102 stderr_channel_capacity,
103 )
104 })
105 }
106
107 fn new_from_child_with_piped_io_and_capacity(
108 name: impl Into<Cow<'static, str>>,
109 mut child: Child,
110 stdout_channel_capacity: usize,
111 stderr_channel_capacity: usize,
112 ) -> ProcessHandle<BroadcastOutputStream> {
113 let stdout = child
114 .stdout
115 .take()
116 .expect("Child process stdout wasn't captured");
117 let stderr = child
118 .stderr
119 .take()
120 .expect("Child process stderr wasn't captured");
121
122 let (child, std_out_stream, std_err_stream) = (
123 child,
124 BroadcastOutputStream::from_stream(
125 stdout,
126 FromStreamOptions {
127 channel_capacity: stdout_channel_capacity,
128 ..Default::default()
129 },
130 ),
131 BroadcastOutputStream::from_stream(
132 stderr,
133 FromStreamOptions {
134 channel_capacity: stderr_channel_capacity,
135 ..Default::default()
136 },
137 ),
138 );
139
140 let mut this = ProcessHandle {
141 name: name.into(),
142 child,
143 std_out_stream,
144 std_err_stream,
145 panic_on_drop: None,
146 };
147 this.must_be_terminated();
148 this
149 }
150
151 pub async fn wait_with_output(
152 &mut self,
153 options: LineParsingOptions,
154 ) -> Result<(ExitStatus, Vec<String>, Vec<String>), WaitError> {
155 let out_collector = self.std_out_stream.collect_lines_into_vec(options);
156 let err_collector = self.std_err_stream.collect_lines_into_vec(options);
157
158 let status = self.wait().await?;
159 let std_out = out_collector.cancel().await?;
160 let std_err = err_collector.cancel().await?;
161
162 Ok((status, std_out, std_err))
163 }
164}
165
166impl ProcessHandle<SingleSubscriberOutputStream> {
167 pub fn spawn(
168 name: impl Into<Cow<'static, str>>,
169 cmd: tokio::process::Command,
170 ) -> io::Result<Self> {
171 Self::spawn_with_capacity(name, cmd, 128, 128)
172 }
173
174 pub fn spawn_with_capacity(
175 name: impl Into<Cow<'static, str>>,
176 mut cmd: tokio::process::Command,
177 stdout_channel_capacity: usize,
178 stderr_channel_capacity: usize,
179 ) -> io::Result<Self> {
180 Self::prepare_command(&mut cmd).spawn().map(|child| {
181 Self::new_from_child_with_piped_io_and_capacity(
182 name,
183 child,
184 stdout_channel_capacity,
185 stderr_channel_capacity,
186 )
187 })
188 }
189
190 fn new_from_child_with_piped_io_and_capacity(
191 name: impl Into<Cow<'static, str>>,
192 mut child: Child,
193 stdout_channel_capacity: usize,
194 stderr_channel_capacity: usize,
195 ) -> Self {
196 let stdout = child
197 .stdout
198 .take()
199 .expect("Child process stdout wasn't captured");
200 let stderr = child
201 .stderr
202 .take()
203 .expect("Child process stderr wasn't captured");
204
205 let (child, std_out_stream, std_err_stream) = (
206 child,
207 SingleSubscriberOutputStream::from_stream(
208 stdout,
209 BackpressureControl::DropLatestIncomingIfBufferFull,
210 FromStreamOptions {
211 channel_capacity: stdout_channel_capacity,
212 ..Default::default()
213 },
214 ),
215 SingleSubscriberOutputStream::from_stream(
216 stderr,
217 BackpressureControl::DropLatestIncomingIfBufferFull,
218 FromStreamOptions {
219 channel_capacity: stderr_channel_capacity,
220 ..Default::default()
221 },
222 ),
223 );
224
225 let mut this = ProcessHandle {
226 name: name.into(),
227 child,
228 std_out_stream,
229 std_err_stream,
230 panic_on_drop: None,
231 };
232 this.must_be_terminated();
233 this
234 }
235
236 pub async fn wait_with_output(
237 &mut self,
238 options: LineParsingOptions,
239 ) -> Result<(ExitStatus, Vec<String>, Vec<String>), WaitError> {
240 let out_collector = self.std_out_stream.collect_lines_into_vec(options);
241 let err_collector = self.std_err_stream.collect_lines_into_vec(options);
242
243 let status = self.wait().await?;
244 let std_out = out_collector.cancel().await?;
245 let std_err = err_collector.cancel().await?;
246
247 Ok((status, std_out, std_err))
248 }
249}
250
251impl<O: OutputStream> ProcessHandle<O> {
252 fn prepare_platform_specifics(
260 command: &mut tokio::process::Command,
261 ) -> &mut tokio::process::Command {
262 #[cfg(windows)]
263 {
264 use windows::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
265
266 let flag = if self.graceful_exit {
267 CREATE_NEW_PROCESS_GROUP.0
268 } else {
269 0
270 };
271 command.creation_flags(flag)
272 }
273 #[cfg(not(windows))]
274 {
275 command
276 }
277 }
278
279 fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
280 Self::prepare_platform_specifics(command)
281 .stdout(Stdio::piped())
282 .stderr(Stdio::piped())
283 .kill_on_drop(true)
289 }
290
291 pub fn id(&self) -> Option<u32> {
292 self.child.id()
293 }
294
295 pub fn is_running(&mut self) -> RunningState {
297 match self.child.try_wait() {
298 Ok(None) => RunningState::Running,
299 Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
300 Err(err) => RunningState::Uncertain(err),
301 }
302 }
303
304 pub fn stdout(&self) -> &O {
305 &self.std_out_stream
306 }
307 pub fn stdout_mut(&mut self) -> &mut O {
308 &mut self.std_out_stream
309 }
310
311 pub fn stderr(&self) -> &O {
312 &self.std_err_stream
313 }
314
315 pub fn stderr_mut(&mut self) -> &mut O {
316 &mut self.std_err_stream
317 }
318
319 pub fn must_be_terminated(&mut self) {
337 self.panic_on_drop = Some(PanicOnDrop {
338 resource_name: "ProcessHandle".into(),
339 details: "Call `terminate()` before the type is dropped!".into(),
340 armed: true,
341 });
342 }
343
344 pub fn must_not_be_terminated(&mut self) {
345 if let Some(mut it) = self.panic_on_drop.take() {
346 it.defuse()
347 }
348 }
349
350 pub fn terminate_on_drop(
356 self,
357 graceful_termination_timeout: Duration,
358 forceful_termination_timeout: Duration,
359 ) -> TerminateOnDrop<O> {
360 TerminateOnDrop {
361 process_handle: self,
362 interrupt_timeout: graceful_termination_timeout,
363 terminate_timeout: forceful_termination_timeout,
364 }
365 }
366
367 pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
371 signal::send_interrupt(&self.child)
372 }
373
374 pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
378 signal::send_terminate(&self.child)
379 }
380
381 pub async fn terminate(
384 &mut self,
385 interrupt_timeout: Duration,
386 terminate_timeout: Duration,
387 ) -> Result<ExitStatus, TerminationError> {
388 self.must_not_be_terminated();
391
392 self.send_interrupt_signal()
393 .map_err(|err| TerminationError::SignallingFailed {
394 source: err,
395 signal: "SIGINT",
396 })?;
397
398 match self.wait_for_completion(Some(interrupt_timeout)).await {
399 Ok(exit_status) => Ok(exit_status),
400 Err(not_terminated_after_sigint) => {
401 tracing::warn!(
402 process = %self.name,
403 error = %not_terminated_after_sigint,
404 "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
405 );
406
407 self.send_terminate_signal()
408 .map_err(|err| TerminationError::SignallingFailed {
409 source: err,
410 signal: "SIGTERM",
411 })?;
412
413 match self.wait_for_completion(Some(terminate_timeout)).await {
414 Ok(exit_status) => Ok(exit_status),
415 Err(not_terminated_after_sigterm) => {
416 tracing::warn!(
417 process = %self.name,
418 error = %not_terminated_after_sigterm,
419 "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
420 );
421
422 match self.kill().await {
423 Ok(()) => {
424 match self.wait_for_completion(Some(Duration::from_secs(3))).await {
432 Ok(exit_status) => Ok(exit_status),
433 Err(not_terminated_after_sigkill) => {
434 tracing::error!(
436 "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
437 self.name
438 );
439 Err(TerminationError::TerminationFailed {
440 not_terminated_after_sigint,
441 not_terminated_after_sigterm,
442 not_terminated_after_sigkill,
443 })
444 }
445 }
446 }
447 Err(not_terminated_after_sigkill) => {
448 tracing::error!(
449 process = %self.name,
450 error = %not_terminated_after_sigkill,
451 "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
452 );
453
454 Err(TerminationError::TerminationFailed {
455 not_terminated_after_sigint,
456 not_terminated_after_sigterm,
457 not_terminated_after_sigkill,
458 })
459 }
460 }
461 }
462 }
463 }
464 }
465 }
466
467 pub async fn kill(&mut self) -> io::Result<()> {
468 self.child.kill().await
469 }
470
471 async fn wait(&mut self) -> io::Result<ExitStatus> {
476 match self.child.wait().await {
477 Ok(status) => {
478 self.must_not_be_terminated();
479 Ok(status)
480 }
481 Err(err) => Err(err),
482 }
483 }
484
485 pub async fn wait_for_completion(
487 &mut self,
488 timeout: Option<Duration>,
489 ) -> io::Result<ExitStatus> {
490 match timeout {
491 None => self.wait().await,
492 Some(timeout) => match tokio::time::timeout(timeout, self.wait()).await {
493 Ok(exit_status) => exit_status,
494 Err(err) => Err(err.into()),
495 },
496 }
497 }
498
499 pub async fn wait_for_completion_or_terminate(
500 &mut self,
501 wait_timeout: Duration,
502 interrupt_timeout: Duration,
503 terminate_timeout: Duration,
504 ) -> Result<ExitStatus, TerminationError> {
505 match self.wait_for_completion(Some(wait_timeout)).await {
506 Ok(exit_status) => Ok(exit_status),
507 Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
508 }
509 }
510
511 pub fn into_inner(self) -> (Child, O, O) {
514 (self.child, self.std_out_stream, self.std_err_stream)
515 }
516}
517
518#[cfg(test)]
519mod tests {
520 use super::*;
521 use assertr::prelude::*;
522
523 #[tokio::test]
524 async fn test_termination() {
525 let mut cmd = tokio::process::Command::new("sleep");
526 cmd.arg("5");
527
528 let started_at = jiff::Zoned::now();
529 let mut handle = ProcessHandle::<BroadcastOutputStream>::spawn("sleep", cmd).unwrap();
530 tokio::time::sleep(Duration::from_millis(100)).await;
531 let exit_status = handle
532 .terminate(Duration::from_secs(1), Duration::from_secs(1))
533 .await
534 .unwrap();
535 let terminated_at = jiff::Zoned::now();
536
537 let ran_for = started_at.duration_until(&terminated_at);
541 assert_that(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);
542
543 assert_that(exit_status.code()).is_none();
545 }
546}