1use std::os::unix::process::CommandExt; use std::process::{Command as StdCommand, ExitStatus, Stdio};
5use std::time::{Duration, Instant};
6use thiserror::Error;
7use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
8use tokio::process::{Child, Command as TokioCommand};
9use tokio::time::sleep_until;
10use tracing::{debug, instrument, warn};
11use nix::sys::signal::{killpg, Signal};
13use nix::unistd::Pid;
14#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct CommandOutput {
21 pub stdout: Vec<u8>,
23 pub stderr: Vec<u8>,
25 pub exit_status: Option<ExitStatus>,
27 pub duration: Duration,
29 pub timed_out: bool,
31}
32
33#[derive(Error, Debug)]
35pub enum CommandError {
36 #[error("Failed to spawn command")]
37 Spawn(#[source] std::io::Error),
38 #[error("Failed to get command stdout")]
39 StdoutPipe,
40 #[error("Failed to get command stderr")]
41 StderrPipe,
42 #[error("I/O error reading command output")]
43 Io(#[source] std::io::Error),
44 #[error("Failed to kill command")]
45 Kill(#[source] std::io::Error),
46 #[error("Failed to wait for command exit")]
47 Wait(#[source] std::io::Error),
48 #[error("Invalid timeout configuration: {0}")]
49 InvalidTimeout(String),
50}
51
52#[derive(Clone, Copy, Debug)]
54struct TimeoutConfig {
55 minimum: Duration,
56 maximum: Duration,
57 activity: Duration,
58 start_time: Instant,
59 absolute_deadline: Instant,
60}
61
62struct CommandExecutionState<R1: AsyncRead + Unpin, R2: AsyncRead + Unpin> {
64 child: Child,
65 stdout_reader: Option<BufReader<R1>>,
66 stderr_reader: Option<BufReader<R2>>,
67 stdout_buffer: Vec<u8>,
68 stderr_buffer: Vec<u8>,
69 stdout_read_buffer: Vec<u8>,
70 stderr_read_buffer: Vec<u8>,
71 current_deadline: Instant,
72 timed_out: bool,
73 exit_status: Option<ExitStatus>,
74}
75
76fn validate_timeouts(min: Duration, max: Duration, activity: Duration) -> Result<(), CommandError> {
80 if min > max {
81 return Err(CommandError::InvalidTimeout(format!(
82 "minimum_timeout ({:?}) cannot be greater than maximum_timeout ({:?})",
83 min, max
84 )));
85 }
86 if activity == Duration::ZERO {
87 return Err(CommandError::InvalidTimeout(
88 "activity_timeout must be positive".to_string(),
89 ));
90 }
91 Ok(())
92}
93
94fn spawn_command_and_setup_state(
97 command: &mut StdCommand,
98 initial_deadline: Instant,
99) -> Result<CommandExecutionState<impl AsyncRead + Unpin, impl AsyncRead + Unpin>, CommandError> {
100 command.stdout(Stdio::piped());
101 command.stderr(Stdio::piped());
102
103 let mut tokio_cmd = TokioCommand::from(std::mem::replace(command, StdCommand::new("")));
105
106 let mut child = tokio_cmd
107 .kill_on_drop(true)
108 .spawn()
109 .map_err(CommandError::Spawn)?;
110
111 debug!(pid = child.id(), "Process spawned successfully");
112
113 let stdout_pipe = child.stdout.take().ok_or(CommandError::StdoutPipe)?;
114 let stderr_pipe = child.stderr.take().ok_or(CommandError::StderrPipe)?;
115
116 debug!(deadline = ?initial_deadline, "Initial deadline set");
117
118 Ok(CommandExecutionState {
119 child,
120 stdout_reader: Some(BufReader::new(stdout_pipe)),
121 stderr_reader: Some(BufReader::new(stderr_pipe)),
122 stdout_buffer: Vec::new(),
123 stderr_buffer: Vec::new(),
124 stdout_read_buffer: Vec::with_capacity(1024),
125 stderr_read_buffer: Vec::with_capacity(1024),
126 current_deadline: initial_deadline,
127 timed_out: false,
128 exit_status: None,
129 })
130}
131
132fn calculate_new_deadline(absolute_deadline: Instant, activity_timeout: Duration) -> Instant {
134 let potential_new_deadline = Instant::now() + activity_timeout;
135 let new_deadline = std::cmp::min(potential_new_deadline, absolute_deadline);
136 debug!(
137 potential = ?potential_new_deadline,
138 absolute = ?absolute_deadline,
139 new = ?new_deadline,
140 "Calculated new deadline based on activity"
141 );
142 new_deadline
143}
144
145#[instrument(level = "debug", skip(current_deadline, timeouts))]
147fn handle_stream_activity(
148 bytes_read: usize,
149 stream_name: &str,
150 current_deadline: &mut Instant,
151 timeouts: &TimeoutConfig,
152) {
153 debug!(
154 bytes = bytes_read,
155 stream = stream_name,
156 "Activity detected"
157 );
158 let new_deadline = calculate_new_deadline(timeouts.absolute_deadline, timeouts.activity);
159
160 if *current_deadline < timeouts.absolute_deadline && new_deadline != *current_deadline {
161 debug!(old = ?*current_deadline, new = ?new_deadline, "Updating deadline");
162 *current_deadline = new_deadline;
163 } else {
164 debug!(deadline = ?*current_deadline, "Deadline remains unchanged (likely at absolute limit or no change)");
165 }
166}
167
168async fn read_stream_chunk<R: AsyncRead + Unpin>(
170 reader: &mut BufReader<R>,
171 buf: &mut Vec<u8>,
172) -> std::io::Result<Option<usize>> {
173 match reader.read_buf(buf).await {
176 Ok(0) => Ok(None), Ok(n) => Ok(Some(n)), Err(e) => Err(e),
179 }
180}
181
182async fn drain_reader<R: AsyncRead + Unpin>(
184 reader_opt: &mut Option<BufReader<R>>,
185 buffer: &mut Vec<u8>,
186 read_buf: &mut Vec<u8>, stream_name: &str,
188) -> Result<(), CommandError> {
189 if let Some(reader) = reader_opt.as_mut() {
190 debug!("Draining remaining output from {}", stream_name);
191 loop {
192 read_buf.clear(); match read_stream_chunk(reader, read_buf).await {
194 Ok(Some(n)) => {
195 if n > 0 {
196 debug!("Drained {} bytes from {}", n, stream_name);
197 buffer.extend_from_slice(&read_buf[..n]);
198 } else {
199 debug!("Drained 0 bytes from {}, treating as EOF.", stream_name);
200 break; }
202 }
203 Ok(None) => {
204 debug!("EOF reached while draining {}", stream_name);
206 break; }
208 Err(e) => {
209 if matches!(
211 e.kind(),
212 std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::ConnectionReset
213 ) {
214 debug!(
215 "{} closed while draining ({}): {}",
216 stream_name,
217 e.kind(),
218 e
219 );
220 } else {
221 warn!("Error draining remaining {} output: {}", stream_name, e);
222 }
224 break; }
226 }
227 }
228 *reader_opt = None;
230 debug!("Finished draining {}", stream_name);
231 }
232 Ok(())
233}
234
235#[instrument(level = "debug", skip(child, timeouts))]
238async fn handle_timeout_event(
239 child: &mut Child,
240 triggered_deadline: Instant,
241 timeouts: &TimeoutConfig,
242) -> Result<Option<ExitStatus>, CommandError> {
243 let now = Instant::now();
244 let elapsed = now.duration_since(timeouts.start_time);
245 debug!(deadline = ?triggered_deadline.duration_since(timeouts.start_time), elapsed = ?elapsed, "Timeout check triggered");
246 let killed_reason;
247
248 if now >= timeouts.absolute_deadline {
249 debug!(timeout=?timeouts.maximum, "Maximum timeout exceeded");
250 killed_reason = "maximum timeout";
251 } else {
252 debug!(timeout=?timeouts.activity, min_duration=?timeouts.minimum, "Activity timeout likely exceeded after minimum duration");
253 killed_reason = "activity timeout";
254 }
255
256 let pid_opt = child.id(); if let Some(pid_u32) = pid_opt {
259 warn!(
260 pid = pid_u32,
261 reason = killed_reason,
262 elapsed = ?elapsed,
263 "Killing process group due to timeout"
264 );
265 let pid = Pid::from_raw(pid_u32 as i32);
267 match killpg(pid, Signal::SIGKILL) {
271 Ok(()) => {
272 debug!(
273 pid = pid_u32,
274 pgid = pid.as_raw(),
275 "Process group kill signal (SIGKILL) sent successfully."
276 );
277 Ok(None)
279 }
280 Err(e) => {
281 if e == nix::errno::Errno::ESRCH {
283 warn!(pid = pid_u32, error = %e, "Failed to kill process group (ESRCH - likely already exited). Checking child status.");
284 match child.try_wait() {
286 Ok(Some(status)) => {
287 debug!(pid = pid_u32, status = %status, "Original child had already exited before kill signal processed");
288 return Ok(Some(status)); }
290 Ok(None) => {
291 debug!(pid = pid_u32, "Original child still running or uncollected after killpg failed (ESRCH).");
292 return Ok(None);
294 }
295 Err(wait_err) => {
296 warn!(pid = pid_u32, error = %wait_err, "Error checking child status after failed killpg (ESRCH)");
297 return Err(CommandError::Wait(wait_err));
298 }
299 }
300 } else {
301 warn!(pid = pid_u32, pgid = pid.as_raw(), error = %e, "Failed to send kill signal to process group.");
303 return Err(CommandError::Kill(std::io::Error::new(
305 std::io::ErrorKind::Other,
306 format!("Failed to kill process group for PID {}: {}", pid_u32, e),
307 )));
308 }
309 }
310 }
311 } else {
312 warn!(
314 "Could not get PID to kill process for timeout. Process might have exited abnormally."
315 );
316 Ok(None)
318 }
319}
320
321async fn run_command_loop(
323 state: &mut CommandExecutionState<impl AsyncRead + Unpin, impl AsyncRead + Unpin>,
324 timeouts: &TimeoutConfig,
325) -> Result<(), CommandError> {
326 loop {
327 let deadline_sleep = sleep_until(state.current_deadline.into());
328 tokio::pin!(deadline_sleep);
329
330 let can_read_stdout = state.stdout_reader.is_some() && state.exit_status.is_none();
332 let can_read_stderr = state.stderr_reader.is_some() && state.exit_status.is_none();
333 let can_check_exit = state.exit_status.is_none();
334 let can_check_timeout = state.exit_status.is_none();
335
336 tokio::select! {
337 biased; result = state.child.wait(), if can_check_exit => {
341 state.exit_status = match result {
342 Ok(status) => {
343 debug!(status = %status, "Process exited naturally");
344 Some(status)
345 },
346 Err(e) => {
347 warn!(error = %e, "Error waiting for process exit");
348 return Err(CommandError::Wait(e));
349 }
350 };
351 break; }
353
354 read_result = async {
356 if let Some(reader) = state.stdout_reader.as_mut() {
357 if state.exit_status.is_none() {
358 read_stream_chunk(reader, &mut state.stdout_read_buffer).await
359 } else { Ok(None) } } else { Ok(None) } }, if can_read_stdout => {
362 match read_result {
363 Ok(Some(n)) => {
364 state.stdout_buffer.extend_from_slice(&state.stdout_read_buffer[..n]);
365 handle_stream_activity(n, "stdout", &mut state.current_deadline, timeouts);
366 }
367 Ok(None) => { if state.stdout_reader.is_some() {
369 debug!("Stdout pipe closed (EOF) or process exited during read.");
370 state.stdout_reader = None; }
372 }
373 Err(e) => {
374 warn!(error = %e, "Error reading stdout");
375 state.stdout_read_buffer.clear(); return Err(CommandError::Io(e));
377 }
378 }
379 state.stdout_read_buffer.clear(); }
381
382 read_result = async {
384 if let Some(reader) = state.stderr_reader.as_mut() {
385 if state.exit_status.is_none() {
386 read_stream_chunk(reader, &mut state.stderr_read_buffer).await
387 } else { Ok(None) } } else { Ok(None) } }, if can_read_stderr => {
390 match read_result {
391 Ok(Some(n)) => {
392 state.stderr_buffer.extend_from_slice(&state.stderr_read_buffer[..n]);
393 handle_stream_activity(n, "stderr", &mut state.current_deadline, timeouts);
394 }
395 Ok(None) => { if state.stderr_reader.is_some() {
397 debug!("Stderr pipe closed (EOF) or process exited during read.");
398 state.stderr_reader = None; }
400 }
401 Err(e) => {
402 warn!(error = %e, "Error reading stderr");
403 state.stderr_read_buffer.clear(); return Err(CommandError::Io(e));
405 }
406 }
407 state.stderr_read_buffer.clear(); }
409
410
411_ = &mut deadline_sleep, if can_check_timeout => {
413 let now = Instant::now();
414 let triggered_deadline = if now >= timeouts.absolute_deadline {
415 debug!("Absolute deadline exceeded. Triggering timeout.");
416 timeouts.absolute_deadline
417 } else {
418 debug!("Activity timeout likely exceeded. Triggering timeout.");
419 state.current_deadline
420 };
421
422 match handle_timeout_event(&mut state.child, triggered_deadline, timeouts).await {
423 Ok(Some(status)) => {
424 debug!("Timeout detected but process already exited.");
425 state.exit_status = Some(status);
426 state.timed_out = false; }
428 Ok(None) => {
429 state.timed_out = true; }
431 Err(e) => {
432 return Err(e); }
434 }
435 break; }
437}
438} Ok(())
441}
442
443async fn finalize_exit_status(
445 child: &mut Child,
446 current_status: Option<ExitStatus>,
447 timed_out: bool,
448) -> Result<Option<ExitStatus>, CommandError> {
449 if timed_out && current_status.is_none() {
450 debug!(
451 pid = child.id(),
452 "Waiting for process to exit after kill signal..."
453 );
454 match child.wait().await {
455 Ok(status) => {
456 debug!(pid = child.id(), status = %status, "Process exited after kill");
457 Ok(Some(status))
458 }
459 Err(e) => {
460 warn!(pid = child.id(), error = %e, "Error waiting for process exit after kill. Proceeding without status.");
461 Ok(None) }
463 }
464 } else {
465 Ok(current_status) }
467}
468
469#[instrument(skip(command), fields(command = ?command.get_program(), args = ?command.get_args()))]
475pub async fn run_command_with_timeout(
476 mut command: StdCommand,
477 minimum_timeout: Duration,
478 maximum_timeout: Duration,
479 activity_timeout: Duration,
480) -> Result<CommandOutput, CommandError> {
481 validate_timeouts(minimum_timeout, maximum_timeout, activity_timeout)?;
482
483 let start_time = Instant::now();
484 let absolute_deadline = start_time + maximum_timeout;
485 let initial_deadline = std::cmp::min(
486 absolute_deadline,
487 start_time + std::cmp::max(minimum_timeout, activity_timeout),
488 );
489
490 let timeout_config = TimeoutConfig {
491 minimum: minimum_timeout,
492 maximum: maximum_timeout,
493 activity: activity_timeout,
494 start_time,
495 absolute_deadline,
496 };
497
498 let mut std_cmd = std::mem::replace(&mut command, StdCommand::new("")); unsafe {
503 std_cmd.pre_exec(|| {
504 if libc::setpgid(0, 0) == 0 {
507 Ok(())
508 } else {
509 Err(std::io::Error::last_os_error())
511 }
512 });
513 }
514 command = std_cmd;
516
517 let mut state = spawn_command_and_setup_state(&mut command, initial_deadline)?;
519
520 run_command_loop(&mut state, &timeout_config).await?;
522
523 debug!("Command loop finished. Draining remaining output streams.");
525 drain_reader(
526 &mut state.stdout_reader,
527 &mut state.stdout_buffer,
528 &mut state.stdout_read_buffer,
529 "stdout",
530 )
531 .await?;
532 drain_reader(
533 &mut state.stderr_reader,
534 &mut state.stderr_buffer,
535 &mut state.stderr_read_buffer,
536 "stderr",
537 )
538 .await?;
539
540 let final_exit_status = finalize_exit_status(
542 &mut state.child,
543 state.exit_status, state.timed_out,
545 )
546 .await?;
547
548 let end_time = Instant::now();
549 let duration = end_time.duration_since(start_time);
550
551 debug!(
552 duration = ?duration,
553 exit_status = ?final_exit_status,
554 timed_out = state.timed_out,
555 stdout_len = state.stdout_buffer.len(),
556 stderr_len = state.stderr_buffer.len(),
557 "Command execution finished."
558 );
559
560 Ok(CommandOutput {
561 stdout: state.stdout_buffer,
562 stderr: state.stderr_buffer,
563 exit_status: final_exit_status,
564 duration,
565 timed_out: state.timed_out,
566 })
567}
568
569#[cfg(test)]
571mod tests {
572 use super::*;
573 use libc;
574 use std::os::unix::process::ExitStatusExt; use tokio::runtime::Runtime;
576 use tracing_subscriber::{fmt, EnvFilter}; fn setup_tracing() {
580 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
582 fmt()
583 .with_env_filter(filter)
584 .with_test_writer()
585 .try_init()
586 .ok(); }
588
589 fn run_async_test<F, Fut>(test_fn: F)
591 where
592 F: FnOnce() -> Fut,
593 Fut: std::future::Future<Output = ()>,
594 {
595 setup_tracing();
596 let rt = Runtime::new().unwrap();
597 rt.block_on(test_fn());
598 }
599
600 #[test]
601 fn test_command_runs_successfully_within_timeouts() {
602 run_async_test(|| async {
603 let mut cmd = StdCommand::new("sh");
604 cmd.arg("-c")
605 .arg("echo 'Hello'; sleep 0.1; echo 'World' >&2");
606
607 let min_timeout = Duration::from_millis(50);
608 let max_timeout = Duration::from_secs(2);
609 let activity_timeout = Duration::from_secs(1);
610
611 let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
612 .await
613 .expect("Command failed unexpectedly");
614
615 assert_eq!(result.stdout, b"Hello\n");
616 assert_eq!(result.stderr, b"World\n");
617 assert!(result.exit_status.is_some(), "Exit status should be Some");
618 assert_eq!(
619 result.exit_status.unwrap().code(),
620 Some(0),
621 "Exit code should be 0"
622 );
623 assert!(!result.timed_out, "Should not have timed out");
624 assert!(
625 result.duration >= Duration::from_millis(100),
626 "Duration should be >= 100ms"
627 );
628 assert!(
629 result.duration < max_timeout,
630 "Duration should be < max_timeout"
631 );
632 });
633 }
634
635 #[test]
636 fn test_command_exits_quickly_before_min_timeout() {
637 run_async_test(|| async {
638 let mut cmd = StdCommand::new("echo");
639 cmd.arg("Immediate exit");
640
641 let min_timeout = Duration::from_secs(2); let max_timeout = Duration::from_secs(5);
643 let activity_timeout = Duration::from_secs(1);
644
645 let start = Instant::now();
646 let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
647 .await
648 .expect("Command failed unexpectedly");
649
650 let duration = start.elapsed();
651
652 assert_eq!(result.stdout, b"Immediate exit\n");
653 assert!(result.stderr.is_empty(), "Stderr should be empty");
654 assert!(result.exit_status.is_some(), "Exit status should be Some");
655 assert_eq!(
656 result.exit_status.unwrap().code(),
657 Some(0),
658 "Exit code should be 0"
659 );
660 assert!(!result.timed_out, "Should not have timed out");
661 assert!(
662 duration < Duration::from_millis(500),
663 "Test duration should be short"
664 );
665 assert!(
666 result.duration < Duration::from_millis(500),
667 "Reported duration should be short"
668 );
669 });
670 }
671
672 #[test]
673 fn test_maximum_timeout_kills_long_running_command() {
674 run_async_test(|| async {
675 let mut cmd = StdCommand::new("sleep");
676 cmd.arg("5"); let min_timeout = Duration::from_millis(100);
679 let max_timeout = Duration::from_secs(1); let activity_timeout = Duration::from_secs(10); let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
683 .await
684 .expect("Command failed unexpectedly");
685
686 assert!(result.stdout.is_empty(), "Stdout should be empty");
687 assert!(result.stderr.is_empty(), "Stderr should be empty");
688 assert!(
689 result.exit_status.is_some(),
690 "Exit status should be Some after kill"
691 );
692 assert_eq!(
694 result.exit_status.unwrap().signal(),
695 Some(libc::SIGKILL as i32),
696 "Should be killed by SIGKILL"
697 );
698 assert!(result.timed_out, "Should have timed out");
699 assert!(
700 result.duration >= max_timeout,
701 "Duration should be >= max_timeout"
702 );
703 assert!(
705 result.duration < max_timeout + Duration::from_millis(750),
706 "Duration allow buffer"
707 );
708 });
709 }
710
711 #[test]
712 fn test_activity_timeout_kills_idle_command_after_min_timeout() {
713 run_async_test(|| async {
714 let mut cmd = StdCommand::new("sh");
715 cmd.arg("-c")
716 .arg("echo 'Initial output'; sleep 5; echo 'This should not appear'");
717
718 let min_timeout = Duration::from_millis(200);
719 let max_timeout = Duration::from_secs(10);
720 let activity_timeout = Duration::from_secs(1); let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
723 .await
724 .expect("Command failed unexpectedly");
725
726 assert_eq!(result.stdout, b"Initial output\n");
727 assert!(result.stderr.is_empty(), "Stderr should be empty");
728 assert!(
729 result.exit_status.is_some(),
730 "Exit status should be Some after kill"
731 );
732 assert_eq!(
734 result.exit_status.unwrap().signal(),
735 Some(libc::SIGKILL as i32),
736 "Should be killed by SIGKILL"
737 );
738 assert!(result.timed_out, "Should have timed out");
739
740 assert!(
743 result.duration >= min_timeout,
744 "Duration ({:?}) should be >= min_timeout ({:?})",
745 result.duration,
746 min_timeout
747 );
748
749 let lower_bound = activity_timeout; let upper_bound = activity_timeout + Duration::from_millis(750); assert!(
753 result.duration >= lower_bound,
754 "Duration ({:?}) should be >= activity_timeout ({:?})",
755 result.duration,
756 lower_bound
757 );
758 assert!(
759 result.duration < upper_bound,
760 "Duration ({:?}) should be < activity_timeout plus buffer ({:?})",
761 result.duration,
762 upper_bound
763 );
764
765 assert!(
767 result.duration < Duration::from_secs(5),
768 "Should be killed before sleep 5 ends"
769 );
770 });
771 }
772
773 #[test]
774 fn test_activity_resets_timeout_allowing_completion() {
775 run_async_test(|| async {
776 let mut cmd = StdCommand::new("sh");
777 cmd.arg("-c")
778 .arg("echo '1'; sleep 0.5; echo '2' >&2; sleep 0.5; echo '3'; sleep 0.5; echo '4'");
779
780 let min_timeout = Duration::from_millis(100);
781 let max_timeout = Duration::from_secs(5);
782 let activity_timeout = Duration::from_secs(1); let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
785 .await
786 .expect("Command failed unexpectedly");
787
788 assert_eq!(result.stdout, b"1\n3\n4\n");
789 assert_eq!(result.stderr, b"2\n");
790 assert!(result.exit_status.is_some(), "Exit status should be Some");
791 assert_eq!(
792 result.exit_status.unwrap().code(),
793 Some(0),
794 "Exit code should be 0"
795 );
796 assert!(!result.timed_out, "Should not have timed out");
797 assert!(
798 result.duration > Duration::from_secs(1),
799 "Duration should be > 1s (actual ~1.5s)"
800 );
801 assert!(
802 result.duration < max_timeout,
803 "Duration should be < max_timeout"
804 );
805 });
806 }
807
808 #[test]
809 fn test_binary_output_is_handled() {
810 run_async_test(|| async {
811 let mut cmd = StdCommand::new("head");
812 cmd.arg("-c").arg("50").arg("/dev/urandom");
813
814 let min_timeout = Duration::from_millis(50);
815 let max_timeout = Duration::from_secs(2);
816 let activity_timeout = Duration::from_secs(1);
817
818 let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
819 .await
820 .expect("Command failed unexpectedly");
821
822 assert_eq!(result.stdout.len(), 50, "Stdout length should be 50");
823 assert!(result.stderr.is_empty(), "Stderr should be empty");
824 assert!(result.exit_status.is_some(), "Exit status should be Some");
825 assert_eq!(
826 result.exit_status.unwrap().code(),
827 Some(0),
828 "Exit code should be 0"
829 );
830 assert!(!result.timed_out, "Should not have timed out");
831 });
832 }
833
834 #[test]
835 fn test_command_not_found() {
836 run_async_test(|| async {
837 let cmd = StdCommand::new("a_command_that_does_not_exist_hopefully"); let min_timeout = Duration::from_millis(50);
840 let max_timeout = Duration::from_secs(2);
841 let activity_timeout = Duration::from_secs(1);
842
843 let result =
844 run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout).await;
845
846 assert!(result.is_err(), "Should return error");
847 match result.err().unwrap() {
848 CommandError::Spawn(e) => {
849 assert_eq!(
850 e.kind(),
851 std::io::ErrorKind::NotFound,
852 "Error kind should be NotFound"
853 );
854 }
855 e => panic!("Expected CommandError::Spawn, got {:?}", e),
856 }
857 });
858 }
859
860 #[test]
861 fn test_min_timeout_greater_than_max_timeout() {
862 run_async_test(|| async {
863 let cmd = StdCommand::new("echo"); let min_timeout = Duration::from_secs(2);
867 let max_timeout = Duration::from_secs(1); let activity_timeout = Duration::from_secs(1);
869
870 let result =
871 run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout).await;
872
873 assert!(result.is_err(), "Should return error");
874 match result.err().unwrap() {
875 CommandError::InvalidTimeout(_) => {} e => panic!("Expected CommandError::InvalidTimeout, got {:?}", e),
877 }
878 });
879 }
880
881 #[test]
882 fn test_zero_activity_timeout() {
883 run_async_test(|| async {
884 let cmd = StdCommand::new("echo"); let min_timeout = Duration::from_millis(100);
888 let max_timeout = Duration::from_secs(1);
889 let activity_timeout = Duration::ZERO; let result =
892 run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout).await;
893
894 assert!(result.is_err(), "Should return error");
895 match result.err().unwrap() {
896 CommandError::InvalidTimeout(_) => {} e => panic!("Expected CommandError::InvalidTimeout, got {:?}", e),
898 }
899 });
900 }
901
902 #[test]
903 fn test_process_exits_with_error_code() {
904 run_async_test(|| async {
905 let mut cmd = StdCommand::new("sh");
906 cmd.arg("-c").arg("echo 'Error message' >&2; exit 55");
907
908 let min_timeout = Duration::from_millis(50);
909 let max_timeout = Duration::from_secs(2);
910 let activity_timeout = Duration::from_secs(1);
911
912 let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
913 .await
914 .expect("Command failed unexpectedly");
915
916 assert!(result.stdout.is_empty(), "Stdout should be empty");
917 assert_eq!(result.stderr, b"Error message\n");
918 assert!(result.exit_status.is_some(), "Exit status should be Some");
919 assert_eq!(
920 result.exit_status.unwrap().code(),
921 Some(55),
922 "Exit code should be 55"
923 );
924 assert!(!result.timed_out, "Should not have timed out");
925 });
926 }
927
928 #[test]
929 fn test_continuous_output_does_not_timeout() {
930 run_async_test(|| async {
931 let mut cmd = StdCommand::new("sh");
932 cmd.arg("-c")
934 .arg("i=0; while [ $i -lt 20 ]; do echo $i; i=$((i+1)); sleep 0.1; done");
935
936 let min_timeout = Duration::from_millis(50);
937 let max_timeout = Duration::from_secs(10);
938 let activity_timeout = Duration::from_millis(500); let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
941 .await
942 .expect("Command failed unexpectedly");
943
944 assert!(!result.stdout.is_empty(), "Stdout should not be empty");
945 assert!(result.stderr.is_empty(), "Stderr should be empty");
946 assert!(result.exit_status.is_some(), "Exit status should be Some");
947 assert_eq!(
948 result.exit_status.unwrap().code(),
949 Some(0),
950 "Exit code should be 0"
951 );
952 assert!(!result.timed_out, "Should not have timed out");
953 assert!(
954 result.duration > Duration::from_secs(2),
955 "Duration should be > 2s"
956 ); assert!(
958 result.duration < Duration::from_secs(3),
959 "Duration should be < 3s"
960 );
961 });
962 }
963
964 #[test]
965 fn test_timeout_immediately_if_min_timeout_is_zero_and_no_activity() {
966 run_async_test(|| async {
967 let mut cmd = StdCommand::new("sleep");
968 cmd.arg("5");
969
970 let min_timeout = Duration::ZERO; let max_timeout = Duration::from_secs(10);
972 let activity_timeout = Duration::from_millis(100); let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
975 .await
976 .expect("Command failed unexpectedly");
977
978 assert!(result.stdout.is_empty(), "Stdout should be empty");
979 assert!(result.stderr.is_empty(), "Stderr should be empty");
980 assert!(
981 result.exit_status.is_some(),
982 "Exit status should be Some after kill"
983 );
984 assert_eq!(
986 result.exit_status.unwrap().signal(),
987 Some(libc::SIGKILL as i32),
988 "Should be killed by SIGKILL"
989 );
990 assert!(result.timed_out, "Should have timed out");
991 assert!(
993 result.duration >= activity_timeout,
994 "Duration should be >= activity_timeout"
995 );
996 assert!(
998 result.duration < activity_timeout + Duration::from_millis(750),
999 "Duration allow buffer"
1000 );
1001 });
1002 }
1003
1004 #[test]
1006fn test_calculate_new_deadline_absolute_deadline_passed() {
1007 let absolute_deadline = Instant::now() - Duration::from_secs(1); let activity_timeout = Duration::from_secs(5);
1009
1010 let new_deadline = calculate_new_deadline(absolute_deadline, activity_timeout);
1011
1012 assert_eq!(
1013 new_deadline, absolute_deadline,
1014 "New deadline should be the absolute deadline when it has already passed"
1015 );
1016}
1017
1018#[test]
1019fn test_calculate_new_deadline_activity_timeout_before_absolute_deadline() {
1020 let absolute_deadline = Instant::now() + Duration::from_secs(10);
1021 let activity_timeout = Duration::from_secs(5);
1022
1023 let new_deadline = calculate_new_deadline(absolute_deadline, activity_timeout);
1024
1025 assert!(
1026 new_deadline <= absolute_deadline,
1027 "New deadline should not exceed the absolute deadline"
1028 );
1029 assert!(
1030 new_deadline > Instant::now(),
1031 "New deadline should be in the future"
1032 );
1033}
1034 #[test]
1036fn test_handle_stream_activity_updates_deadline() {
1037 let mut current_deadline = Instant::now() + Duration::from_secs(5);
1038 let timeouts = TimeoutConfig {
1039 minimum: Duration::from_secs(1),
1040 maximum: Duration::from_secs(10),
1041 activity: Duration::from_secs(3),
1042 start_time: Instant::now(),
1043 absolute_deadline: Instant::now() + Duration::from_secs(10),
1044 };
1045
1046 handle_stream_activity(10, "stdout", &mut current_deadline, &timeouts);
1047
1048 assert!(
1049 current_deadline > Instant::now(),
1050 "Current deadline should be updated to a future time"
1051 );
1052 assert!(
1053 current_deadline <= timeouts.absolute_deadline,
1054 "Current deadline should not exceed the absolute deadline"
1055 );
1056}
1057
1058#[test]
1059fn test_handle_stream_activity_no_update_at_absolute_limit() {
1060 let absolute_deadline = Instant::now() + Duration::from_secs(5);
1061 let mut current_deadline = absolute_deadline; let timeouts = TimeoutConfig {
1063 minimum: Duration::from_secs(1),
1064 maximum: Duration::from_secs(10),
1065 activity: Duration::from_secs(3),
1066 start_time: Instant::now(),
1067 absolute_deadline,
1068 };
1069
1070 handle_stream_activity(10, "stderr", &mut current_deadline, &timeouts);
1071
1072 assert_eq!(
1073 current_deadline, absolute_deadline,
1074 "Current deadline should remain unchanged when at the absolute limit"
1075 );
1076}
1077
1078 #[test]
1080fn test_run_command_loop_exits_on_process_finish() {
1081 run_async_test(|| async {
1082 let mut cmd = StdCommand::new("echo");
1083 cmd.arg("Test");
1084
1085 let timeouts = TimeoutConfig {
1086 minimum: Duration::from_secs(1),
1087 maximum: Duration::from_secs(5),
1088 activity: Duration::from_secs(2),
1089 start_time: Instant::now(),
1090 absolute_deadline: Instant::now() + Duration::from_secs(5),
1091 };
1092
1093 let mut state = spawn_command_and_setup_state(&mut cmd, timeouts.absolute_deadline)
1094 .expect("Failed to spawn command");
1095
1096 let result = run_command_loop(&mut state, &timeouts).await;
1097
1098 assert!(result.is_ok(), "Command loop should exit without errors");
1099 assert!(
1100 state.exit_status.is_some(),
1101 "Exit status should be set when process finishes naturally"
1102 );
1103 });
1104}
1105
1106#[test]
1107fn test_run_command_loop_exits_on_timeout() {
1108 run_async_test(|| async {
1109 let mut cmd = StdCommand::new("sleep");
1110 cmd.arg("5");
1111
1112 let timeouts = TimeoutConfig {
1113 minimum: Duration::from_secs(1),
1114 maximum: Duration::from_secs(2), activity: Duration::from_secs(10),
1116 start_time: Instant::now(),
1117 absolute_deadline: Instant::now() + Duration::from_secs(2),
1118 };
1119
1120 let mut state = spawn_command_and_setup_state(&mut cmd, timeouts.absolute_deadline)
1121 .expect("Failed to spawn command");
1122
1123 let result = run_command_loop(&mut state, &timeouts).await;
1124
1125 assert!(result.is_ok(), "Command loop should exit without errors");
1126 assert!(
1127 state.exit_status.is_none(),
1128 "Exit status should be None when process is killed due to timeout"
1129 );
1130 assert!(state.timed_out, "State should indicate that the process timed out");
1131 });
1132}
1133
1134
1135#[test]
1136fn test_absolute_deadline_kills_infinite_loop_command() {
1137 run_async_test(|| async {
1138 let mut cmd = StdCommand::new("sh");
1139 cmd.arg("-c").arg("while true; do :; done"); let min_timeout = Duration::from_secs(1);
1142 let max_timeout = Duration::from_secs(2); let activity_timeout = Duration::from_secs(10); let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
1146 .await
1147 .expect("Command failed unexpectedly");
1148
1149 assert!(result.stdout.is_empty(), "Stdout should be empty");
1150 assert!(result.stderr.is_empty(), "Stderr should be empty");
1151 assert!(
1152 result.exit_status.is_some(),
1153 "Exit status should be Some after kill"
1154 );
1155 assert_eq!(
1157 result.exit_status.unwrap().signal(),
1158 Some(libc::SIGKILL as i32),
1159 "Should be killed by SIGKILL"
1160 );
1161 assert!(result.timed_out, "Should have timed out");
1162 assert!(
1163 result.duration >= max_timeout,
1164 "Duration should be >= max_timeout"
1165 );
1166 assert!(
1167 result.duration < max_timeout + Duration::from_millis(750),
1168 "Duration should allow a small buffer for process group kill and reaping"
1169 );
1170 });
1171}
1172
1173#[test]
1174fn test_infinite_output_command() {
1175 run_async_test(|| async {
1176 let mut cmd = StdCommand::new("yes");
1177 cmd.arg("infinite");
1178
1179 let min_timeout = Duration::from_secs(1);
1180 let max_timeout = Duration::from_secs(2); let activity_timeout = Duration::from_secs(1); let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
1184 .await
1185 .expect("Command failed unexpectedly");
1186
1187 assert!(
1188 !result.stdout.is_empty(),
1189 "Stdout should not be empty for infinite output"
1190 );
1191 assert!(
1192 result.stderr.is_empty(),
1193 "Stderr should be empty for the `yes` command"
1194 );
1195 assert!(
1196 result.exit_status.is_some(),
1197 "Exit status should be Some after timeout"
1198 );
1199 assert_eq!(
1201 result.exit_status.unwrap().signal(),
1202 Some(libc::SIGKILL as i32),
1203 "Should be killed by SIGKILL"
1204 );
1205 assert!(result.timed_out, "Should have timed out");
1206 assert!(
1207 result.duration >= max_timeout,
1208 "Duration should be >= max_timeout"
1209 );
1210 assert!(
1211 result.duration < max_timeout + Duration::from_millis(750),
1212 "Duration should allow a small buffer for process group kill and reaping"
1213 );
1214 });
1215}
1216
1217
1218}