1use crate::error::{SpawnError, TerminationError, WaitError};
2use crate::output::{Output, RawOutput};
3use crate::output_stream::broadcast::BroadcastOutputStream;
4use crate::output_stream::single_subscriber::SingleSubscriberOutputStream;
5use crate::output_stream::{BackpressureControl, FromStreamOptions};
6use crate::panic_on_drop::PanicOnDrop;
7use crate::terminate_on_drop::TerminateOnDrop;
8use crate::{LineParsingOptions, NumBytes, OutputStream, signal};
9use std::borrow::Cow;
10use std::fmt::Debug;
11use std::io;
12use std::process::{ExitStatus, Stdio};
13use std::time::Duration;
14use tokio::process::{Child, ChildStdin};
15
/// Stream name handed to the output-stream constructors for the child's stdout.
const STDOUT_STREAM_NAME: &str = "stdout";
/// Stream name handed to the output-stream constructors for the child's stderr.
const STDERR_STREAM_NAME: &str = "stderr";
18
/// Per-stream settings used when spawning a process whose output is exposed through a
/// `SingleSubscriberOutputStream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SingleSubscriberStreamConfig {
    /// Forwarded as `FromStreamOptions::chunk_size`; asserted to be non-zero when spawning.
    pub(crate) chunk_size: NumBytes,
    /// Forwarded as `FromStreamOptions::channel_capacity`.
    pub(crate) channel_capacity: usize,
    /// Passed to `SingleSubscriberOutputStream::from_stream` alongside the stream options.
    pub(crate) backpressure_control: BackpressureControl,
}
25
/// How long `ProcessHandle::terminate` waits for the process to exit after `kill` reported
/// success, before the termination is reported as failed.
const SIGKILL_WAIT_TIMEOUT: Duration = Duration::from_secs(3);
31
/// The stdin side of a spawned child process as tracked by a `ProcessHandle`.
#[derive(Debug)]
pub enum Stdin {
    /// The handle still owns the child's stdin pipe.
    Open(ChildStdin),
    /// No stdin pipe is held: it was either never captured or has been closed via `close`.
    Closed,
}
44
45impl Stdin {
46 #[must_use]
48 pub fn is_open(&self) -> bool {
49 matches!(self, Stdin::Open(_))
50 }
51
52 pub fn as_mut(&mut self) -> Option<&mut ChildStdin> {
54 match self {
55 Stdin::Open(stdin) => Some(stdin),
56 Stdin::Closed => None,
57 }
58 }
59
60 pub fn close(&mut self) {
65 *self = Stdin::Closed;
66 }
67}
68
/// Outcome of probing whether a process is still running.
#[derive(Debug)]
pub enum RunningState {
    /// The process has not exited yet.
    Running,

    /// The process exited with the contained status.
    Terminated(ExitStatus),

    /// Probing the process failed; the contained error is the reason.
    Uncertain(io::Error),
}

impl RunningState {
    /// Collapses the state into a boolean: `true` only for [`RunningState::Running`].
    ///
    /// Both [`RunningState::Terminated`] and [`RunningState::Uncertain`] map to `false`.
    #[must_use]
    pub fn as_bool(&self) -> bool {
        match self {
            RunningState::Terminated(_) | RunningState::Uncertain(_) => false,
            RunningState::Running => true,
        }
    }
}

impl From<RunningState> for bool {
    /// Same mapping as [`RunningState::as_bool`], consuming the state.
    fn from(is_running: RunningState) -> Self {
        is_running.as_bool()
    }
}
101
/// Handle to a spawned child process together with its stdin and its stdout/stderr output
/// streams of type `O`.
///
/// The handle carries two drop guards, both armed by `must_be_terminated` and disarmed by
/// `must_not_be_terminated`: `cleanup_on_drop` and `panic_on_drop`.
#[derive(Debug)]
pub struct ProcessHandle<O: OutputStream> {
    /// Name used to identify the process in errors and log events.
    pub(crate) name: Cow<'static, str>,
    /// The underlying tokio child process.
    child: Child,
    /// The child's stdin, if it was captured and not yet closed.
    std_in: Stdin,
    /// Output stream built from the child's stdout pipe.
    std_out_stream: O,
    /// Output stream built from the child's stderr pipe.
    std_err_stream: O,
    /// When `true`, the `Drop` impl calls `start_kill` on the child.
    cleanup_on_drop: bool,
    /// Drop guard installed by `must_be_terminated`; `None` once defused.
    // NOTE(review): presumably panics when dropped while still armed - confirm in `PanicOnDrop`.
    panic_on_drop: Option<PanicOnDrop>,
}
120
121impl<O: OutputStream> Drop for ProcessHandle<O> {
122 fn drop(&mut self) {
123 if self.cleanup_on_drop {
124 if let Err(err) = self.child.start_kill() {
128 tracing::warn!(
129 process = %self.name,
130 error = %err,
131 "Failed to kill process while dropping an armed ProcessHandle"
132 );
133 }
134 }
135 }
136}
137
138impl ProcessHandle<BroadcastOutputStream> {
139 pub(crate) fn spawn_with_capacity(
144 name: impl Into<Cow<'static, str>>,
145 mut cmd: tokio::process::Command,
146 stdout_chunk_size: NumBytes,
147 stderr_chunk_size: NumBytes,
148 stdout_channel_capacity: usize,
149 stderr_channel_capacity: usize,
150 ) -> Result<ProcessHandle<BroadcastOutputStream>, SpawnError> {
151 stdout_chunk_size.assert_non_zero("stdout_chunk_size");
152 stderr_chunk_size.assert_non_zero("stderr_chunk_size");
153
154 let process_name = name.into();
155 Self::prepare_command(&mut cmd)
156 .spawn()
157 .map(|child| {
158 Self::new_from_child_with_piped_io_and_capacity(
159 process_name.clone(),
160 child,
161 stdout_chunk_size,
162 stderr_chunk_size,
163 stdout_channel_capacity,
164 stderr_channel_capacity,
165 )
166 })
167 .map_err(|source| SpawnError::SpawnFailed {
168 process_name,
169 source,
170 })
171 }
172
173 fn new_from_child_with_piped_io_and_capacity(
174 name: impl Into<Cow<'static, str>>,
175 mut child: Child,
176 stdout_chunk_size: NumBytes,
177 stderr_chunk_size: NumBytes,
178 stdout_channel_capacity: usize,
179 stderr_channel_capacity: usize,
180 ) -> ProcessHandle<BroadcastOutputStream> {
181 let std_in = match child.stdin.take() {
182 Some(stdin) => Stdin::Open(stdin),
183 None => Stdin::Closed,
184 };
185 let stdout = child
186 .stdout
187 .take()
188 .expect("Child process stdout wasn't captured");
189 let stderr = child
190 .stderr
191 .take()
192 .expect("Child process stderr wasn't captured");
193
194 let std_out_stream = BroadcastOutputStream::from_stream(
195 stdout,
196 STDOUT_STREAM_NAME,
197 FromStreamOptions {
198 chunk_size: stdout_chunk_size,
199 channel_capacity: stdout_channel_capacity,
200 },
201 );
202 let std_err_stream = BroadcastOutputStream::from_stream(
203 stderr,
204 STDERR_STREAM_NAME,
205 FromStreamOptions {
206 chunk_size: stderr_chunk_size,
207 channel_capacity: stderr_channel_capacity,
208 },
209 );
210
211 let mut this = ProcessHandle {
212 name: name.into(),
213 child,
214 std_in,
215 std_out_stream,
216 std_err_stream,
217 cleanup_on_drop: false,
218 panic_on_drop: None,
219 };
220 this.must_be_terminated();
221 this
222 }
223
224 pub async fn wait_for_completion_with_output(
246 &mut self,
247 timeout: Option<Duration>,
248 options: LineParsingOptions,
249 ) -> Result<Output, WaitError> {
250 let out_collector = self.stdout().collect_lines_into_vec(options);
251 let err_collector = self.stderr().collect_lines_into_vec(options);
252
253 let status = self.wait_for_completion(timeout).await?;
254
255 let stdout = out_collector.wait().await?;
256 let stderr = err_collector.wait().await?;
257
258 Ok(Output {
259 status,
260 stdout,
261 stderr,
262 })
263 }
264
265 pub async fn wait_for_completion_with_raw_output(
273 &mut self,
274 timeout: Option<Duration>,
275 ) -> Result<RawOutput, WaitError> {
276 let out_collector = self.stdout().collect_chunks_into_vec();
277 let err_collector = self.stderr().collect_chunks_into_vec();
278
279 let status = self.wait_for_completion(timeout).await?;
280
281 let stdout = out_collector.wait().await?;
282 let stderr = err_collector.wait().await?;
283
284 Ok(RawOutput {
285 status,
286 stdout,
287 stderr,
288 })
289 }
290
291 pub async fn wait_for_completion_with_output_or_terminate(
299 &mut self,
300 wait_timeout: Duration,
301 interrupt_timeout: Duration,
302 terminate_timeout: Duration,
303 options: LineParsingOptions,
304 ) -> Result<Output, WaitError> {
305 let out_collector = self.stdout().collect_lines_into_vec(options);
306 let err_collector = self.stderr().collect_lines_into_vec(options);
307
308 let status = self
309 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
310 .await?;
311
312 let stdout = out_collector.wait().await?;
313 let stderr = err_collector.wait().await?;
314
315 Ok(Output {
316 status,
317 stdout,
318 stderr,
319 })
320 }
321
322 pub async fn wait_for_completion_with_raw_output_or_terminate(
330 &mut self,
331 wait_timeout: Duration,
332 interrupt_timeout: Duration,
333 terminate_timeout: Duration,
334 ) -> Result<RawOutput, WaitError> {
335 let out_collector = self.stdout().collect_chunks_into_vec();
336 let err_collector = self.stderr().collect_chunks_into_vec();
337
338 let status = self
339 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
340 .await?;
341
342 let stdout = out_collector.wait().await?;
343 let stderr = err_collector.wait().await?;
344
345 Ok(RawOutput {
346 status,
347 stdout,
348 stderr,
349 })
350 }
351}
352
353impl ProcessHandle<SingleSubscriberOutputStream> {
354 pub(crate) fn spawn_with_capacity(
359 name: impl Into<Cow<'static, str>>,
360 mut cmd: tokio::process::Command,
361 stdout_config: SingleSubscriberStreamConfig,
362 stderr_config: SingleSubscriberStreamConfig,
363 ) -> Result<Self, SpawnError> {
364 stdout_config
365 .chunk_size
366 .assert_non_zero("stdout_config.chunk_size");
367 stderr_config
368 .chunk_size
369 .assert_non_zero("stderr_config.chunk_size");
370
371 let process_name = name.into();
372 Self::prepare_command(&mut cmd)
373 .spawn()
374 .map(|child| {
375 Self::new_from_child_with_piped_io_and_capacity(
376 process_name.clone(),
377 child,
378 stdout_config,
379 stderr_config,
380 )
381 })
382 .map_err(|source| SpawnError::SpawnFailed {
383 process_name,
384 source,
385 })
386 }
387
388 fn new_from_child_with_piped_io_and_capacity(
389 name: impl Into<Cow<'static, str>>,
390 mut child: Child,
391 stdout_config: SingleSubscriberStreamConfig,
392 stderr_config: SingleSubscriberStreamConfig,
393 ) -> Self {
394 let std_in = match child.stdin.take() {
395 Some(stdin) => Stdin::Open(stdin),
396 None => Stdin::Closed,
397 };
398 let stdout = child
399 .stdout
400 .take()
401 .expect("Child process stdout wasn't captured");
402 let stderr = child
403 .stderr
404 .take()
405 .expect("Child process stderr wasn't captured");
406
407 let std_out_stream = SingleSubscriberOutputStream::from_stream(
408 stdout,
409 STDOUT_STREAM_NAME,
410 stdout_config.backpressure_control,
411 FromStreamOptions {
412 chunk_size: stdout_config.chunk_size,
413 channel_capacity: stdout_config.channel_capacity,
414 },
415 );
416 let std_err_stream = SingleSubscriberOutputStream::from_stream(
417 stderr,
418 STDERR_STREAM_NAME,
419 stderr_config.backpressure_control,
420 FromStreamOptions {
421 chunk_size: stderr_config.chunk_size,
422 channel_capacity: stderr_config.channel_capacity,
423 },
424 );
425
426 let mut this = ProcessHandle {
427 name: name.into(),
428 child,
429 std_in,
430 std_out_stream,
431 std_err_stream,
432 cleanup_on_drop: false,
433 panic_on_drop: None,
434 };
435 this.must_be_terminated();
436 this
437 }
438
439 pub async fn wait_for_completion_with_output(
460 &mut self,
461 timeout: Option<Duration>,
462 options: LineParsingOptions,
463 ) -> Result<Output, WaitError> {
464 let out_collector = self.stdout().collect_lines_into_vec(options);
465 let err_collector = self.stderr().collect_lines_into_vec(options);
466
467 let status = self.wait_for_completion(timeout).await?;
468
469 let stdout = out_collector.wait().await?;
470 let stderr = err_collector.wait().await?;
471
472 Ok(Output {
473 status,
474 stdout,
475 stderr,
476 })
477 }
478
479 pub async fn wait_for_completion_with_raw_output(
487 &mut self,
488 timeout: Option<Duration>,
489 ) -> Result<RawOutput, WaitError> {
490 let out_collector = self.stdout().collect_chunks_into_vec();
491 let err_collector = self.stderr().collect_chunks_into_vec();
492
493 let status = self.wait_for_completion(timeout).await?;
494
495 let stdout = out_collector.wait().await?;
496 let stderr = err_collector.wait().await?;
497
498 Ok(RawOutput {
499 status,
500 stdout,
501 stderr,
502 })
503 }
504
505 pub async fn wait_for_completion_with_output_or_terminate(
513 &mut self,
514 wait_timeout: Duration,
515 interrupt_timeout: Duration,
516 terminate_timeout: Duration,
517 options: LineParsingOptions,
518 ) -> Result<Output, WaitError> {
519 let out_collector = self.stdout().collect_lines_into_vec(options);
520 let err_collector = self.stderr().collect_lines_into_vec(options);
521
522 let status = self
523 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
524 .await?;
525
526 let stdout = out_collector.wait().await?;
527 let stderr = err_collector.wait().await?;
528
529 Ok(Output {
530 status,
531 stdout,
532 stderr,
533 })
534 }
535
536 pub async fn wait_for_completion_with_raw_output_or_terminate(
544 &mut self,
545 wait_timeout: Duration,
546 interrupt_timeout: Duration,
547 terminate_timeout: Duration,
548 ) -> Result<RawOutput, WaitError> {
549 let out_collector = self.stdout().collect_chunks_into_vec();
550 let err_collector = self.stderr().collect_chunks_into_vec();
551
552 let status = self
553 .wait_for_completion_or_terminate(wait_timeout, interrupt_timeout, terminate_timeout)
554 .await?;
555
556 let stdout = out_collector.wait().await?;
557 let stderr = err_collector.wait().await?;
558
559 Ok(RawOutput {
560 status,
561 stdout,
562 stderr,
563 })
564 }
565}
566
567impl<O: OutputStream> ProcessHandle<O> {
568 fn prepare_platform_specifics(
576 command: &mut tokio::process::Command,
577 ) -> &mut tokio::process::Command {
578 #[cfg(windows)]
579 {
580 use windows_sys::Win32::System::Threading::CREATE_NEW_PROCESS_GROUP;
581 command.creation_flags(CREATE_NEW_PROCESS_GROUP)
582 }
583 #[cfg(not(windows))]
584 {
585 command
586 }
587 }
588
589 fn prepare_command(command: &mut tokio::process::Command) -> &mut tokio::process::Command {
590 Self::prepare_platform_specifics(command)
591 .stdin(Stdio::piped())
592 .stdout(Stdio::piped())
593 .stderr(Stdio::piped())
594 .kill_on_drop(false)
598 }
599
    /// Returns the child's OS process id as reported by [`Child::id`], which may be `None`.
    pub fn id(&self) -> Option<u32> {
        self.child.id()
    }
606
607 fn try_reap_exit_status(&mut self) -> Result<Option<ExitStatus>, io::Error> {
608 match self.child.try_wait() {
609 Ok(Some(exit_status)) => {
610 self.must_not_be_terminated();
611 Ok(Some(exit_status))
612 }
613 Ok(None) => Ok(None),
614 Err(err) => Err(err),
615 }
616 }
617
618 fn signalling_failed_or_reap(
619 &mut self,
620 signal: &'static str,
621 source: io::Error,
622 ) -> Result<ExitStatus, TerminationError> {
623 match self.try_reap_exit_status() {
624 Ok(Some(exit_status)) => Ok(exit_status),
625 Ok(None) | Err(_) => Err(TerminationError::SignallingFailed {
626 process_name: self.name.clone(),
627 source,
628 signal,
629 }),
630 }
631 }
632
633 pub fn is_running(&mut self) -> RunningState {
640 match self.try_reap_exit_status() {
641 Ok(None) => RunningState::Running,
642 Ok(Some(exit_status)) => RunningState::Terminated(exit_status),
643 Err(err) => RunningState::Uncertain(err),
644 }
645 }
646
    /// Returns mutable access to the handle's [`Stdin`], e.g. to write to it via
    /// `Stdin::as_mut` or to close it via `Stdin::close`.
    pub fn stdin(&mut self) -> &mut Stdin {
        &mut self.std_in
    }
677
    /// Returns the output stream built from the child's stdout.
    pub fn stdout(&self) -> &O {
        &self.std_out_stream
    }
686
    /// Returns the output stream built from the child's stderr.
    pub fn stderr(&self) -> &O {
        &self.std_err_stream
    }
695
696 pub fn must_be_terminated(&mut self) {
714 self.cleanup_on_drop = true;
715 self.panic_on_drop = Some(PanicOnDrop::new(
716 "tokio_process_tools::ProcessHandle",
717 "The process was not terminated.",
718 "Call `wait_for_completion` or `terminate` before the type is dropped!",
719 ));
720 }
721
722 pub fn must_not_be_terminated(&mut self) {
731 self.cleanup_on_drop = false;
732 self.defuse_drop_panic();
733 }
734
735 fn defuse_drop_panic(&mut self) {
736 if let Some(mut it) = self.panic_on_drop.take() {
737 it.defuse();
738 }
739 }
740
741 pub fn terminate_on_drop(
750 self,
751 graceful_termination_timeout: Duration,
752 forceful_termination_timeout: Duration,
753 ) -> TerminateOnDrop<O> {
754 TerminateOnDrop {
755 process_handle: self,
756 interrupt_timeout: graceful_termination_timeout,
757 terminate_timeout: forceful_termination_timeout,
758 }
759 }
760
    /// Sends the interrupt signal to the child by delegating to `signal::send_interrupt`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `signal::send_interrupt`.
    pub fn send_interrupt_signal(&mut self) -> Result<(), io::Error> {
        signal::send_interrupt(&self.child)
    }
771
    /// Sends the terminate signal to the child by delegating to `signal::send_terminate`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `signal::send_terminate`.
    pub fn send_terminate_signal(&mut self) -> Result<(), io::Error> {
        signal::send_terminate(&self.child)
    }
782
783 pub async fn terminate(
793 &mut self,
794 interrupt_timeout: Duration,
795 terminate_timeout: Duration,
796 ) -> Result<ExitStatus, TerminationError> {
797 self.defuse_drop_panic();
802
803 if let Some(exit_status) =
804 self.try_reap_exit_status()
805 .map_err(|source| TerminationError::SignallingFailed {
806 process_name: self.name.clone(),
807 source,
808 signal: "SIGINT",
809 })?
810 {
811 return Ok(exit_status);
812 }
813
814 if let Err(err) = self.send_interrupt_signal() {
815 return self.signalling_failed_or_reap("SIGINT", err);
816 }
817
818 match self.wait_for_completion(Some(interrupt_timeout)).await {
819 Ok(exit_status) => Ok(exit_status),
820 Err(not_terminated_after_sigint) => {
821 tracing::warn!(
822 process = %self.name,
823 error = %not_terminated_after_sigint,
824 "Graceful shutdown using SIGINT (or equivalent on current platform) failed. Attempting graceful shutdown using SIGTERM signal."
825 );
826
827 if let Err(err) = self.send_terminate_signal() {
828 return self.signalling_failed_or_reap("SIGTERM", err);
829 }
830
831 match self.wait_for_completion(Some(terminate_timeout)).await {
832 Ok(exit_status) => Ok(exit_status),
833 Err(not_terminated_after_sigterm) => {
834 tracing::warn!(
835 process = %self.name,
836 error = %not_terminated_after_sigterm,
837 "Graceful shutdown using SIGTERM (or equivalent on current platform) failed. Attempting forceful shutdown using SIGKILL signal."
838 );
839
840 match self.kill().await {
841 Ok(()) => {
842 match self.wait_for_completion(Some(SIGKILL_WAIT_TIMEOUT)).await {
850 Ok(exit_status) => Ok(exit_status),
851 Err(not_terminated_after_sigkill) => {
852 tracing::error!(
854 "Process, having custom name '{}', did not terminate after receiving a SIGINT, SIGTERM and SIGKILL event (or equivalent on the current platform). Something must have gone horribly wrong... Process may still be running. Manual intervention and investigation required!",
855 self.name
856 );
857 Err(TerminationError::TerminationFailed {
858 process_name: self.name.clone(),
859 sigint_error: not_terminated_after_sigint.to_string(),
860 sigterm_error: not_terminated_after_sigterm.to_string(),
861 sigkill_error: io::Error::new(
862 io::ErrorKind::TimedOut,
863 not_terminated_after_sigkill.to_string(),
864 ),
865 })
866 }
867 }
868 }
869 Err(kill_error) => {
870 if let Ok(Some(exit_status)) = self.try_reap_exit_status() {
871 return Ok(exit_status);
872 }
873 tracing::error!(
874 process = %self.name,
875 error = %kill_error,
876 "Forceful shutdown using SIGKILL (or equivalent on current platform) failed. Process may still be running. Manual intervention required!"
877 );
878
879 Err(TerminationError::TerminationFailed {
880 process_name: self.name.clone(),
881 sigint_error: not_terminated_after_sigint.to_string(),
882 sigterm_error: not_terminated_after_sigterm.to_string(),
883 sigkill_error: kill_error,
884 })
885 }
886 }
887 }
888 }
889 }
890 }
891 }
892
    /// Forcefully kills the child by delegating to [`Child::kill`].
    ///
    /// The drop guards of this handle are not changed by this call.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by [`Child::kill`].
    pub async fn kill(&mut self) -> io::Result<()> {
        self.child.kill().await
    }
903
904 async fn wait(&mut self) -> io::Result<ExitStatus> {
909 match self.child.wait().await {
910 Ok(status) => {
911 self.must_not_be_terminated();
912 Ok(status)
913 }
914 Err(err) => Err(err),
915 }
916 }
917
918 pub async fn wait_for_completion(
933 &mut self,
934 timeout: Option<Duration>,
935 ) -> Result<ExitStatus, WaitError> {
936 match timeout {
937 None => self.wait().await.map_err(|source| WaitError::IoError {
938 process_name: self.name.clone(),
939 source,
940 }),
941 Some(timeout_duration) => {
942 match tokio::time::timeout(timeout_duration, self.wait()).await {
943 Ok(Ok(exit_status)) => Ok(exit_status),
944 Ok(Err(source)) => Err(WaitError::IoError {
945 process_name: self.name.clone(),
946 source,
947 }),
948 Err(_elapsed) => Err(WaitError::Timeout {
949 process_name: self.name.clone(),
950 timeout: timeout_duration,
951 }),
952 }
953 }
954 }
955 }
956
957 pub async fn wait_for_completion_or_terminate(
969 &mut self,
970 wait_timeout: Duration,
971 interrupt_timeout: Duration,
972 terminate_timeout: Duration,
973 ) -> Result<ExitStatus, TerminationError> {
974 match self.wait_for_completion(Some(wait_timeout)).await {
975 Ok(exit_status) => Ok(exit_status),
976 Err(_err) => self.terminate(interrupt_timeout, terminate_timeout).await,
977 }
978 }
979
    /// Consumes the handle and returns the wrapped [`Child`] together with the stdout and
    /// stderr streams, in that order.
    ///
    /// Both drop guards are disarmed first, and this type's `Drop` impl does not run for the
    /// consumed handle. The remaining owned fields (`name`, `std_in`, `panic_on_drop`) are
    /// dropped here; the caller becomes responsible for the returned child.
    pub fn into_inner(mut self) -> (Child, O, O) {
        self.must_not_be_terminated();
        // `ProcessHandle` implements `Drop`, so its fields cannot simply be moved out.
        // `ManuallyDrop` suppresses the destructor; the fields are taken apart by hand below.
        let mut this = std::mem::ManuallyDrop::new(self);

        // SAFETY: `this` is wrapped in `ManuallyDrop`, so no destructor runs for it afterwards.
        // Every field that needs dropping is handled exactly once: `child` and both streams
        // are moved out with `ptr::read`, while `name`, `std_in` and `panic_on_drop` are
        // dropped in place. `cleanup_on_drop` is a `bool` and needs no drop. `this` is not
        // accessed again after this block.
        unsafe {
            let child = std::ptr::read(&raw const this.child);
            let stdout = std::ptr::read(&raw const this.std_out_stream);
            let stderr = std::ptr::read(&raw const this.std_err_stream);

            std::ptr::drop_in_place(&raw mut this.name);
            std::ptr::drop_in_place(&raw mut this.std_in);
            std::ptr::drop_in_place(&raw mut this.panic_on_drop);

            (child, stdout, stderr)
        }
    }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003 use super::*;
1004 use assertr::prelude::*;
1005 use std::fs;
1006 use std::sync::{Arc, Mutex};
1007 use tokio::io::AsyncWriteExt;
1008
1009 use crate::Next;
1010
    /// Terminating a running `sleep 5` shortly after start returns quickly and yields an
    /// exit status without an exit code.
    #[tokio::test]
    async fn test_termination() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let started_at = jiff::Zoned::now();
        let mut handle = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;
        let exit_status = handle
            .terminate(Duration::from_secs(1), Duration::from_secs(1))
            .await
            .unwrap();
        let terminated_at = jiff::Zoned::now();

        // Expect roughly the 100ms sleep above, with a generous tolerance.
        let ran_for = started_at.duration_until(&terminated_at);
        assert_that!(ran_for.as_secs_f32()).is_close_to(0.1, 0.5);

        // No exit code is reported for the terminated process.
        assert_that!(exit_status.code()).is_none();
    }
1037
    /// `terminate` on a process that already exited on its own returns that successful exit
    /// status.
    #[tokio::test]
    async fn terminate_returns_normal_exit_when_process_already_exited() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c").arg("exit 0");

        let mut handle = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();

        // Give the shell time to exit before terminating.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let exit_status = handle
            .terminate(Duration::from_millis(50), Duration::from_millis(50))
            .await
            .unwrap();

        assert_that!(exit_status.success()).is_true();
    }
1057
    /// Data written to the stdin of `cat` is echoed on stdout; closing stdin lets `cat` exit.
    #[tokio::test]
    async fn test_stdin_write_and_read() {
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        // Stdin is piped by default and therefore open right after spawning.
        assert_that!(process.stdin().is_open()).is_true();

        let test_data = b"Hello from stdin!\n";
        if let Some(stdin) = process.stdin().as_mut() {
            stdin.write_all(test_data).await.unwrap();
            stdin.flush().await.unwrap();
        }

        // Closing stdin signals EOF to `cat`.
        process.stdin().close();
        assert_that!(process.stdin().is_open()).is_false();

        let output = process
            .wait_for_completion_with_output(
                Some(Duration::from_secs(2)),
                LineParsingOptions::default(),
            )
            .await
            .unwrap();

        assert_that!(output.status.success()).is_true();
        assert_that!(&output.stdout).has_length(1);
        assert_that!(output.stdout[0].as_str()).is_equal_to("Hello from stdin!");
    }
1093
    /// Closing stdin without writing anything makes `cat` exit successfully.
    #[tokio::test]
    async fn test_stdin_close_sends_eof() {
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        process.stdin().close();
        assert_that!(process.stdin().is_open()).is_false();

        // `cat` must terminate on its own within the timeout.
        let status = process
            .wait_for_completion(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that!(status.success()).is_true();
    }
1115
    /// Several consecutive writes to stdin arrive as separate lines, in order, on stdout.
    #[tokio::test]
    async fn test_stdin_multiple_writes() {
        let cmd = tokio::process::Command::new("cat");
        let mut process = crate::Process::new(cmd)
            .name("cat")
            .spawn_broadcast()
            .unwrap();

        if let Some(stdin) = process.stdin().as_mut() {
            stdin.write_all(b"Line 1\n").await.unwrap();
            stdin.write_all(b"Line 2\n").await.unwrap();
            stdin.write_all(b"Line 3\n").await.unwrap();
            stdin.flush().await.unwrap();
        }

        process.stdin().close();

        let output = process
            .wait_for_completion_with_output(
                Some(Duration::from_secs(2)),
                LineParsingOptions::default(),
            )
            .await
            .unwrap();

        assert_that!(&output.stdout).has_length(3);
        assert_that!(output.stdout[0].as_str()).is_equal_to("Line 1");
        assert_that!(output.stdout[1].as_str()).is_equal_to("Line 2");
        assert_that!(output.stdout[2].as_str()).is_equal_to("Line 3");
    }
1147
    /// Commands written to the stdin of an interactive `sh` are executed and their output is
    /// collected from stdout.
    #[tokio::test]
    async fn test_shell_command_dispatch() {
        let cmd = tokio::process::Command::new("sh");

        let mut process = crate::Process::new(cmd).spawn_broadcast().unwrap();

        // Start collecting before any command is sent.
        let collector = process
            .stdout()
            .collect_lines_into_vec(LineParsingOptions::default());

        if let Some(stdin) = process.stdin().as_mut() {
            stdin
                .write_all(b"printf 'Hello from shell\\n'\nexit\n")
                .await
                .unwrap();
            stdin.flush().await.unwrap();
        }

        // Give the shell time to run the commands.
        tokio::time::sleep(Duration::from_millis(500)).await;

        process.stdin().close();
        process
            .wait_for_completion(Some(Duration::from_secs(1)))
            .await
            .unwrap();

        let collected = collector.wait().await.unwrap();
        assert_that!(&collected).has_length(1);
        assert_that!(collected[0].as_str()).is_equal_to("Hello from shell");
    }
1181
    /// `into_inner` on a still-running process must not panic; the returned child is then
    /// killed and reaped manually.
    #[tokio::test]
    async fn test_into_inner_defuses_panic_guard() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let process = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();

        let (mut child, _stdout, _stderr) = process.into_inner();
        child.kill().await.unwrap();
        let _status = child.wait().await.unwrap();
    }
1196
    /// Same as the `into_inner` test above, but with a heap-allocated (owned) process name,
    /// exercising the in-place drop of the `name` field.
    #[tokio::test]
    async fn test_into_inner_with_owned_name_drops_owned_string() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let process = crate::Process::new(cmd)
            .with_name(format!("sleeper-{}", 7))
            .spawn_broadcast()
            .unwrap();

        let (mut child, _stdout, _stderr) = process.into_inner();
        child.kill().await.unwrap();
        let _status = child.wait().await.unwrap();
    }
1215
    /// `defuse_drop_panic` removes only the panic guard; `cleanup_on_drop` stays armed.
    #[tokio::test]
    async fn test_defusing_drop_panic_keeps_cleanup_guard_armed() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let mut process = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();

        // Freshly spawned handles have both guards armed.
        assert_that!(process.cleanup_on_drop).is_true();
        assert_that!(
            process
                .panic_on_drop
                .as_ref()
                .is_some_and(PanicOnDrop::is_armed)
        )
        .is_true();

        process.defuse_drop_panic();

        assert_that!(process.cleanup_on_drop).is_true();
        assert_that!(&process.panic_on_drop).is_none();

        // Clean up the still-running child.
        process.kill().await.unwrap();
        process.wait_for_completion(None).await.unwrap();
    }
1243
    /// A successful `wait_for_completion` disarms both the cleanup and the panic guard.
    #[tokio::test]
    async fn test_wait_for_completion_disarms_cleanup_and_panic_guards() {
        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("0.1");

        let mut process = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();

        process
            .wait_for_completion(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that!(process.cleanup_on_drop).is_false();
        assert_that!(&process.panic_on_drop).is_none();
    }
1262
    /// After `must_not_be_terminated`, dropping the handle leaves the child process alive.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_must_not_be_terminated_allows_process_to_survive_handle_drop() {
        use nix::errno::Errno;
        use nix::sys::signal::{self, Signal};
        use nix::sys::wait::waitpid;
        use nix::unistd::Pid;

        let mut cmd = tokio::process::Command::new("sleep");
        cmd.arg("5");

        let mut process = crate::Process::new(cmd)
            .name("sleep")
            .spawn_broadcast()
            .unwrap();
        let pid = process.id().unwrap();

        process.must_not_be_terminated();
        assert_that!(process.cleanup_on_drop).is_false();
        assert_that!(&process.panic_on_drop).is_none();
        drop(process);

        // Signal "none" only checks that the process can still be signalled.
        let pid = Pid::from_raw(pid.cast_signed());
        assert_that!(signal::kill(pid, None).is_ok()).is_true();

        // Clean up the surviving process; ECHILD is tolerated when reaping.
        signal::kill(pid, Signal::SIGKILL).unwrap();
        match waitpid(pid, None) {
            Ok(_) | Err(Errno::ECHILD) => {}
            Err(err) => panic!("waitpid failed: {err}"),
        }
    }
1294
    /// Dropping a disarmed handle still closes the child's stdin: the child's `cat` sees EOF,
    /// finishes, and the follow-up command writes its marker file.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_must_not_be_terminated_still_closes_stdin_on_drop() {
        use nix::sys::wait::waitpid;
        use nix::unistd::Pid;
        use tempfile::tempdir;

        let temp_dir = tempdir().unwrap();
        let output_file = temp_dir.path().join("stdin-result.txt");
        // Escape single quotes so the path can be embedded in a single-quoted shell string.
        let output_file = output_file.to_str().unwrap().replace('\'', "'\"'\"'");

        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c")
            .arg(format!("cat >/dev/null; printf eof > '{output_file}'"));

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();
        let pid = Pid::from_raw(process.id().unwrap().cast_signed());

        process.must_not_be_terminated();
        drop(process);

        // `waitpid` blocks, so it runs on the blocking pool, bounded by a timeout.
        tokio::time::timeout(
            Duration::from_secs(2),
            tokio::task::spawn_blocking(move || waitpid(pid, None)),
        )
        .await
        .unwrap()
        .unwrap()
        .unwrap();

        assert_that!(fs::read_to_string(temp_dir.path().join("stdin-result.txt")).unwrap())
            .is_equal_to("eof");
    }
1331
    /// Dropping a disarmed handle of an endlessly writing `yes` process lets that process
    /// end within the timeout.
    // NOTE(review): presumably `yes` dies from a broken stdout pipe once the handle's
    // streams are gone - confirm against the output stream implementation.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_must_not_be_terminated_does_not_keep_stdout_pipe_alive() {
        use nix::sys::wait::waitpid;
        use nix::unistd::Pid;

        let mut cmd = tokio::process::Command::new("yes");
        cmd.arg("tick");

        let mut process = crate::Process::new(cmd)
            .name("yes")
            .spawn_broadcast()
            .unwrap();
        let pid = Pid::from_raw(process.id().unwrap().cast_signed());

        process.must_not_be_terminated();
        drop(process);

        // `waitpid` blocks, so it runs on the blocking pool, bounded by a timeout.
        tokio::time::timeout(
            Duration::from_secs(2),
            tokio::task::spawn_blocking(move || waitpid(pid, None)),
        )
        .await
        .unwrap()
        .unwrap()
        .unwrap();
    }
1359
    /// Output that does not end with a newline is still reported as a final line.
    #[tokio::test]
    async fn test_wait_for_completion_with_output_preserves_unterminated_final_line() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c").arg("printf tail");

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();

        let output = process
            .wait_for_completion_with_output(
                Some(Duration::from_secs(2)),
                LineParsingOptions::default(),
            )
            .await
            .unwrap();

        assert_that!(output.status.success()).is_true();
        assert_that!(output.stdout).contains_exactly(["tail"]);
        assert_that!(output.stderr).is_empty();
    }
1382
    /// Raw output collection on broadcast streams returns the exact bytes written to stdout
    /// and stderr, including embedded newlines.
    #[tokio::test]
    async fn test_broadcast_wait_for_completion_with_raw_output_preserves_bytes() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c")
            .arg("printf 'out\nraw'; printf 'err\nraw' >&2");

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();

        let output = process
            .wait_for_completion_with_raw_output(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that!(output.status.success()).is_true();
        assert_that!(output.stdout).is_equal_to(b"out\nraw".to_vec());
        assert_that!(output.stderr).is_equal_to(b"err\nraw".to_vec());
    }
1403
    /// Raw output collection on single-subscriber streams returns the exact bytes written to
    /// stdout and stderr, including embedded newlines.
    #[tokio::test]
    async fn test_single_subscriber_wait_for_completion_with_raw_output_preserves_bytes() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c")
            .arg("printf 'out\nraw'; printf 'err\nraw' >&2");

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_single_subscriber()
            .unwrap();

        let output = process
            .wait_for_completion_with_raw_output(Some(Duration::from_secs(2)))
            .await
            .unwrap();

        assert_that!(output.status.success()).is_true();
        assert_that!(output.stdout).is_equal_to(b"out\nraw".to_vec());
        assert_that!(output.stderr).is_equal_to(b"err\nraw".to_vec());
    }
1424
    /// An async line inspector also sees a final line that is not newline-terminated.
    #[tokio::test]
    async fn test_inspect_lines_async_preserves_unterminated_final_line() {
        let mut cmd = tokio::process::Command::new("sh");
        cmd.arg("-c").arg("printf tail");

        let mut process = crate::Process::new(cmd)
            .name("sh")
            .spawn_broadcast()
            .unwrap();

        // Shared buffer recording every line the inspector callback receives.
        let seen = Arc::new(Mutex::new(Vec::<String>::new()));
        let seen_in_task = Arc::clone(&seen);
        let inspector = process.stdout().inspect_lines_async(
            move |line| {
                let seen = Arc::clone(&seen_in_task);
                let line = line.into_owned();
                async move {
                    seen.lock().expect("lock").push(line);
                    Next::Continue
                }
            },
            LineParsingOptions::default(),
        );

        process
            .wait_for_completion(Some(Duration::from_secs(2)))
            .await
            .unwrap();
        inspector.wait().await.unwrap();

        let seen = seen.lock().expect("lock").clone();
        assert_that!(seen).contains_exactly(["tail"]);
    }
1458}