command_timeout/
lib.rs

1// src/lib.rs
2
3use std::os::unix::process::CommandExt; // For pre_exec
4use std::process::{Command as StdCommand, ExitStatus, Stdio};
5use std::time::{Duration, Instant};
6use thiserror::Error;
7use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
8use tokio::process::{Child, Command as TokioCommand};
9use tokio::time::sleep_until;
10use tracing::{debug, instrument, warn};
11// --- Add nix imports ---
12use nix::sys::signal::{killpg, Signal};
13use nix::unistd::Pid;
14// --- End add ---
15
16// --- Structs and Enums ---
17
/// Result of a command executed with `run_command_with_timeout`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandOutput {
    /// Bytes collected from the command's standard output (stdout).
    pub stdout: Vec<u8>,
    /// Bytes collected from the command's standard error (stderr).
    pub stderr: Vec<u8>,
    /// Exit status of the command.
    ///
    /// A process killed because of a timeout is still reaped afterwards, so this is
    /// normally `Some` even then (a signal-terminated status such as `SIGKILL`).
    /// It is `None` only when waiting for the killed process failed.
    pub exit_status: Option<ExitStatus>,
    /// Wall-clock time from just before the command was spawned until its output
    /// had been drained and its exit status collected.
    pub duration: Duration,
    /// `true` if a deadline (maximum or activity timeout) elapsed while the command
    /// was still running and the runner set out to kill it.
    pub timed_out: bool,
}
32
33/// Errors that can occur during command execution.
34#[derive(Error, Debug)]
35pub enum CommandError {
36    #[error("Failed to spawn command")]
37    Spawn(#[source] std::io::Error),
38    #[error("Failed to get command stdout")]
39    StdoutPipe,
40    #[error("Failed to get command stderr")]
41    StderrPipe,
42    #[error("I/O error reading command output")]
43    Io(#[source] std::io::Error),
44    #[error("Failed to kill command")]
45    Kill(#[source] std::io::Error),
46    #[error("Failed to wait for command exit")]
47    Wait(#[source] std::io::Error),
48    #[error("Invalid timeout configuration: {0}")]
49    InvalidTimeout(String),
50}
51
/// Immutable timeout parameters for one command run, fixed before spawning.
#[derive(Clone, Copy, Debug)]
struct TimeoutConfig {
    /// Run time granted before the command may be killed for inactivity.
    minimum: Duration,
    /// Hard upper bound on run time, measured from `start_time`.
    maximum: Duration,
    /// Allowed silence on stdout/stderr before an inactivity kill.
    activity: Duration,
    /// Instant captured just before the command was spawned.
    start_time: Instant,
    /// `start_time + maximum`: the instant after which the command is always killed.
    absolute_deadline: Instant,
}
61
62/// Holds the state during command execution.
63struct CommandExecutionState<R1: AsyncRead + Unpin, R2: AsyncRead + Unpin> {
64    child: Child,
65    stdout_reader: Option<BufReader<R1>>,
66    stderr_reader: Option<BufReader<R2>>,
67    stdout_buffer: Vec<u8>,
68    stderr_buffer: Vec<u8>,
69    stdout_read_buffer: Vec<u8>,
70    stderr_read_buffer: Vec<u8>,
71    current_deadline: Instant,
72    timed_out: bool,
73    exit_status: Option<ExitStatus>,
74}
75
76// --- Helper Functions (Definitions Before Use) ---
77
78/// Validates the timeout durations.
79fn validate_timeouts(min: Duration, max: Duration, activity: Duration) -> Result<(), CommandError> {
80    if min > max {
81        return Err(CommandError::InvalidTimeout(format!(
82            "minimum_timeout ({:?}) cannot be greater than maximum_timeout ({:?})",
83            min, max
84        )));
85    }
86    if activity == Duration::ZERO {
87        return Err(CommandError::InvalidTimeout(
88            "activity_timeout must be positive".to_string(),
89        ));
90    }
91    Ok(())
92}
93
94/// Spawns the command, sets up pipes, and initializes the execution state.
95/// Note: The command passed in should already have pre_exec configured if needed.
96fn spawn_command_and_setup_state(
97    command: &mut StdCommand,
98    initial_deadline: Instant,
99) -> Result<CommandExecutionState<impl AsyncRead + Unpin, impl AsyncRead + Unpin>, CommandError> {
100    command.stdout(Stdio::piped());
101    command.stderr(Stdio::piped());
102
103    // Command already has pre_exec set by the caller function
104    let mut tokio_cmd = TokioCommand::from(std::mem::replace(command, StdCommand::new("")));
105
106    let mut child = tokio_cmd
107        .kill_on_drop(true)
108        .spawn()
109        .map_err(CommandError::Spawn)?;
110
111    debug!(pid = child.id(), "Process spawned successfully");
112
113    let stdout_pipe = child.stdout.take().ok_or(CommandError::StdoutPipe)?;
114    let stderr_pipe = child.stderr.take().ok_or(CommandError::StderrPipe)?;
115
116    debug!(deadline = ?initial_deadline, "Initial deadline set");
117
118    Ok(CommandExecutionState {
119        child,
120        stdout_reader: Some(BufReader::new(stdout_pipe)),
121        stderr_reader: Some(BufReader::new(stderr_pipe)),
122        stdout_buffer: Vec::new(),
123        stderr_buffer: Vec::new(),
124        stdout_read_buffer: Vec::with_capacity(1024),
125        stderr_read_buffer: Vec::with_capacity(1024),
126        current_deadline: initial_deadline,
127        timed_out: false,
128        exit_status: None,
129    })
130}
131
132/// Calculates the next deadline based on activity, capped by the absolute deadline.
133fn calculate_new_deadline(absolute_deadline: Instant, activity_timeout: Duration) -> Instant {
134    let potential_new_deadline = Instant::now() + activity_timeout;
135    let new_deadline = std::cmp::min(potential_new_deadline, absolute_deadline);
136    debug!(
137        potential = ?potential_new_deadline,
138        absolute = ?absolute_deadline,
139        new = ?new_deadline,
140        "Calculated new deadline based on activity"
141    );
142    new_deadline
143}
144
145/// Updates the current deadline based on detected activity.
146#[instrument(level = "debug", skip(current_deadline, timeouts))]
147fn handle_stream_activity(
148    bytes_read: usize,
149    stream_name: &str,
150    current_deadline: &mut Instant,
151    timeouts: &TimeoutConfig,
152) {
153    debug!(
154        bytes = bytes_read,
155        stream = stream_name,
156        "Activity detected"
157    );
158    let new_deadline = calculate_new_deadline(timeouts.absolute_deadline, timeouts.activity);
159    
160    if *current_deadline < timeouts.absolute_deadline && new_deadline != *current_deadline {
161        debug!(old = ?*current_deadline, new = ?new_deadline, "Updating deadline");
162        *current_deadline = new_deadline;
163    } else {
164        debug!(deadline = ?*current_deadline, "Deadline remains unchanged (likely at absolute limit or no change)");
165    }
166}
167
168/// Reads a chunk from the stream using read_buf.
169async fn read_stream_chunk<R: AsyncRead + Unpin>(
170    reader: &mut BufReader<R>,
171    buf: &mut Vec<u8>,
172) -> std::io::Result<Option<usize>> {
173    // read_buf appends to the vector's initialized part.
174    // Caller MUST clear the buffer afterwards if reusing it.
175    match reader.read_buf(buf).await {
176        Ok(0) => Ok(None),    // EOF
177        Ok(n) => Ok(Some(n)), // Read n bytes
178        Err(e) => Err(e),
179    }
180}
181
182/// Drains remaining data from an optional reader into a buffer.
183async fn drain_reader<R: AsyncRead + Unpin>(
184    reader_opt: &mut Option<BufReader<R>>,
185    buffer: &mut Vec<u8>,
186    read_buf: &mut Vec<u8>, // Use the per-stream read buffer
187    stream_name: &str,
188) -> Result<(), CommandError> {
189    if let Some(reader) = reader_opt.as_mut() {
190        debug!("Draining remaining output from {}", stream_name);
191        loop {
192            read_buf.clear(); // Clear temporary buffer before reading new chunk
193            match read_stream_chunk(reader, read_buf).await {
194                Ok(Some(n)) => {
195                    if n > 0 {
196                        debug!("Drained {} bytes from {}", n, stream_name);
197                        buffer.extend_from_slice(&read_buf[..n]);
198                    } else {
199                        debug!("Drained 0 bytes from {}, treating as EOF.", stream_name);
200                        break; // Should be caught by Ok(None) but handles defensively
201                    }
202                }
203                Ok(None) => {
204                    // EOF
205                    debug!("EOF reached while draining {}", stream_name);
206                    break; // Finished draining
207                }
208                Err(e) => {
209                    // Ignore expected errors after process exit, log others
210                    if matches!(
211                        e.kind(),
212                        std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::ConnectionReset
213                    ) {
214                        debug!(
215                            "{} closed while draining ({}): {}",
216                            stream_name,
217                            e.kind(),
218                            e
219                        );
220                    } else {
221                        warn!("Error draining remaining {} output: {}", stream_name, e);
222                        // Don't return error, just stop draining and report what was gathered
223                    }
224                    break; // Stop draining on error
225                }
226            }
227        }
228        // Mark as drained by removing the reader
229        *reader_opt = None;
230        debug!("Finished draining {}", stream_name);
231    }
232    Ok(())
233}
234
235/// Handles the timeout event: logs, attempts to kill process group, checks for immediate exit.
236/// Returns Ok(Some(status)) if process exited before kill, Ok(None) if kill attempted/succeeded, Err on failure.
237#[instrument(level = "debug", skip(child, timeouts))]
238async fn handle_timeout_event(
239    child: &mut Child,
240    triggered_deadline: Instant,
241    timeouts: &TimeoutConfig,
242) -> Result<Option<ExitStatus>, CommandError> {
243    let now = Instant::now();
244    let elapsed = now.duration_since(timeouts.start_time);
245    debug!(deadline = ?triggered_deadline.duration_since(timeouts.start_time), elapsed = ?elapsed, "Timeout check triggered");
246    let killed_reason;
247
248    if now >= timeouts.absolute_deadline {
249        debug!(timeout=?timeouts.maximum, "Maximum timeout exceeded");
250        killed_reason = "maximum timeout";
251    } else {
252        debug!(timeout=?timeouts.activity, min_duration=?timeouts.minimum, "Activity timeout likely exceeded after minimum duration");
253        killed_reason = "activity timeout";
254    }
255
256    let pid_opt = child.id(); // Get the PID (u32) of the direct child
257
258    if let Some(pid_u32) = pid_opt {
259        warn!(
260            pid = pid_u32,
261            reason = killed_reason,
262            elapsed = ?elapsed,
263            "Killing process group due to timeout"
264        );
265        // Convert u32 PID to nix's Pid type (i32)
266        let pid = Pid::from_raw(pid_u32 as i32);
267        // Send SIGKILL to the entire process group.
268        // killpg takes the PID of any process in the group (usually the leader)
269        // and signals the entire group associated with that process.
270        match killpg(pid, Signal::SIGKILL) {
271            Ok(()) => {
272                debug!(
273                    pid = pid_u32,
274                    pgid = pid.as_raw(),
275                    "Process group kill signal (SIGKILL) sent successfully."
276                );
277                // Signal sent, the final wait() in finalize_exit_status will reap the original child.
278                Ok(None)
279            }
280            Err(e) => {
281                // ESRCH means the process group doesn't exist (likely all processes exited quickly)
282                if e == nix::errno::Errno::ESRCH {
283                    warn!(pid = pid_u32, error = %e, "Failed to kill process group (ESRCH - likely already exited). Checking child status.");
284                    // Check if the *original* child process exited
285                    match child.try_wait() {
286                        Ok(Some(status)) => {
287                            debug!(pid = pid_u32, status = %status, "Original child had already exited before kill signal processed");
288                            return Ok(Some(status)); // Treat as natural exit
289                        }
290                        Ok(None) => {
291                            debug!(pid = pid_u32, "Original child still running or uncollected after killpg failed (ESRCH).");
292                            // Proceed as if timeout kill was attempted.
293                            return Ok(None);
294                        }
295                        Err(wait_err) => {
296                            warn!(pid = pid_u32, error = %wait_err, "Error checking child status after failed killpg (ESRCH)");
297                            return Err(CommandError::Wait(wait_err));
298                        }
299                    }
300                } else {
301                    // Another error occurred during killpg (e.g., permissions EPERM)
302                    warn!(pid = pid_u32, pgid = pid.as_raw(), error = %e, "Failed to send kill signal to process group.");
303                    // Map nix::Error to std::io::Error for CommandError::Kill
304                    return Err(CommandError::Kill(std::io::Error::new(
305                        std::io::ErrorKind::Other,
306                        format!("Failed to kill process group for PID {}: {}", pid_u32, e),
307                    )));
308                }
309            }
310        }
311    } else {
312        // This case should be extremely unlikely if spawn succeeded.
313        warn!(
314            "Could not get PID to kill process for timeout. Process might have exited abnormally."
315        );
316        // Cannot attempt kill, treat as timed out, let finalize_exit_status handle wait().
317        Ok(None)
318    }
319}
320
321/// The main `select!` loop monitoring the process, streams, and timeouts.
322async fn run_command_loop(
323    state: &mut CommandExecutionState<impl AsyncRead + Unpin, impl AsyncRead + Unpin>,
324    timeouts: &TimeoutConfig,
325) -> Result<(), CommandError> {
326    loop {
327        let deadline_sleep = sleep_until(state.current_deadline.into());
328        tokio::pin!(deadline_sleep);
329
330        // Conditions to enable select branches
331        let can_read_stdout = state.stdout_reader.is_some() && state.exit_status.is_none();
332        let can_read_stderr = state.stderr_reader.is_some() && state.exit_status.is_none();
333        let can_check_exit = state.exit_status.is_none();
334        let can_check_timeout = state.exit_status.is_none();
335
336        tokio::select! {
337            biased; // Prioritize checking exit status
338
339            // 1. Check for process exit
340            result = state.child.wait(), if can_check_exit => {
341                state.exit_status = match result {
342                    Ok(status) => {
343                        debug!(status = %status, "Process exited naturally");
344                        Some(status)
345                    },
346                    Err(e) => {
347                        warn!(error = %e, "Error waiting for process exit");
348                        return Err(CommandError::Wait(e));
349                    }
350                };
351                break; // Process finished naturally
352            }
353
354            // 2. Read from stdout (Safer access pattern)
355            read_result = async {
356                if let Some(reader) = state.stdout_reader.as_mut() {
357                    if state.exit_status.is_none() {
358                       read_stream_chunk(reader, &mut state.stdout_read_buffer).await
359                    } else { Ok(None) } // Treat as EOF if process exited
360                } else { Ok(None) } // Reader gone
361            }, if can_read_stdout => {
362                match read_result {
363                    Ok(Some(n)) => {
364                        state.stdout_buffer.extend_from_slice(&state.stdout_read_buffer[..n]);
365                        handle_stream_activity(n, "stdout", &mut state.current_deadline, timeouts);
366                    }
367                    Ok(None) => { // EOF or reader gone or process exited during poll
368                        if state.stdout_reader.is_some() {
369                           debug!("Stdout pipe closed (EOF) or process exited during read.");
370                           state.stdout_reader = None; // Mark as closed
371                        }
372                    }
373                    Err(e) => {
374                        warn!(error = %e, "Error reading stdout");
375                        state.stdout_read_buffer.clear(); // Clear buffer even on error
376                        return Err(CommandError::Io(e));
377                    }
378                }
379                state.stdout_read_buffer.clear(); // Clear after processing
380            }
381
382            // 3. Read from stderr (Safer access pattern)
383            read_result = async {
384                 if let Some(reader) = state.stderr_reader.as_mut() {
385                     if state.exit_status.is_none() {
386                        read_stream_chunk(reader, &mut state.stderr_read_buffer).await
387                     } else { Ok(None) } // Treat as EOF if process exited
388                 } else { Ok(None) } // Reader gone
389            }, if can_read_stderr => {
390                 match read_result {
391                    Ok(Some(n)) => {
392                        state.stderr_buffer.extend_from_slice(&state.stderr_read_buffer[..n]);
393                        handle_stream_activity(n, "stderr", &mut state.current_deadline, timeouts);
394                    }
395                    Ok(None) => { // EOF or reader gone or process exited during poll
396                         if state.stderr_reader.is_some() {
397                            debug!("Stderr pipe closed (EOF) or process exited during read.");
398                            state.stderr_reader = None; // Mark as closed
399                         }
400                    }
401                    Err(e) => {
402                        warn!(error = %e, "Error reading stderr");
403                        state.stderr_read_buffer.clear(); // Clear on error
404                        return Err(CommandError::Io(e));
405                    }
406                }
407                state.stderr_read_buffer.clear(); // Clear after processing
408            }
409
410            
411// 4. Check for timeout
412_ = &mut deadline_sleep, if can_check_timeout => {
413    let now = Instant::now();
414    let triggered_deadline = if now >= timeouts.absolute_deadline {
415        debug!("Absolute deadline exceeded. Triggering timeout.");
416        timeouts.absolute_deadline
417    } else {
418        debug!("Activity timeout likely exceeded. Triggering timeout.");
419        state.current_deadline
420    };
421
422    match handle_timeout_event(&mut state.child, triggered_deadline, timeouts).await {
423        Ok(Some(status)) => {
424            debug!("Timeout detected but process already exited.");
425            state.exit_status = Some(status);
426            state.timed_out = false; // Not actually killed by us
427        }
428        Ok(None) => {
429            state.timed_out = true; // Timeout occurred, kill attempted/succeeded.
430        }
431        Err(e) => {
432            return Err(e); // Error during kill or subsequent check
433        }
434    }
435    break; // Exit loop after timeout event
436}
437}
438} // end loop
439
440    Ok(())
441}
442
443/// Waits for the child process to exit if it was killed and status isn't known yet.
444async fn finalize_exit_status(
445    child: &mut Child,
446    current_status: Option<ExitStatus>,
447    timed_out: bool,
448) -> Result<Option<ExitStatus>, CommandError> {
449    if timed_out && current_status.is_none() {
450        debug!(
451            pid = child.id(),
452            "Waiting for process to exit after kill signal..."
453        );
454        match child.wait().await {
455            Ok(status) => {
456                debug!(pid = child.id(), status = %status, "Process exited after kill");
457                Ok(Some(status))
458            }
459            Err(e) => {
460                warn!(pid = child.id(), error = %e, "Error waiting for process exit after kill. Proceeding without status.");
461                Ok(None) // Kill attempted, but final status couldn't be obtained
462            }
463        }
464    } else {
465        Ok(current_status) // Already have status, or didn't time out
466    }
467}
468
469// --- Public API Function ---
470
471/// Runs a standard library `Command` asynchronously with sophisticated timeout logic.
472///
473/// (Rustdoc remains the same)
474#[instrument(skip(command), fields(command = ?command.get_program(), args = ?command.get_args()))]
475pub async fn run_command_with_timeout(
476    mut command: StdCommand,
477    minimum_timeout: Duration,
478    maximum_timeout: Duration,
479    activity_timeout: Duration,
480) -> Result<CommandOutput, CommandError> {
481    validate_timeouts(minimum_timeout, maximum_timeout, activity_timeout)?;
482
483    let start_time = Instant::now();
484    let absolute_deadline = start_time + maximum_timeout;
485    let initial_deadline = std::cmp::min(
486        absolute_deadline,
487        start_time + std::cmp::max(minimum_timeout, activity_timeout),
488    );
489
490    let timeout_config = TimeoutConfig {
491        minimum: minimum_timeout,
492        maximum: maximum_timeout,
493        activity: activity_timeout,
494        start_time,
495        absolute_deadline,
496    };
497
498    // Configure the command to run in its own process group
499    // This MUST be done before spawning the command.
500    // Take ownership to modify, then pass the modified command to spawn_command_and_setup_state
501    let mut std_cmd = std::mem::replace(&mut command, StdCommand::new("")); // Take ownership temporarily
502    unsafe {
503        std_cmd.pre_exec(|| {
504            // libc::setpgid(0, 0) makes the new process its own group leader.
505            // Pass 0 for both pid and pgid to achieve this for the calling process.
506            if libc::setpgid(0, 0) == 0 {
507                Ok(())
508            } else {
509                // Capture the error from the OS if setpgid fails
510                Err(std::io::Error::last_os_error())
511            }
512        });
513    }
514    // Put the modified command back for spawning
515    command = std_cmd;
516
517    // Setup state (spawns command with pre_exec hook)
518    let mut state = spawn_command_and_setup_state(&mut command, initial_deadline)?;
519
520    // Main execution loop
521    run_command_loop(&mut state, &timeout_config).await?;
522
523    // Drain remaining output after loop exit (natural exit or timeout break)
524    debug!("Command loop finished. Draining remaining output streams.");
525    drain_reader(
526        &mut state.stdout_reader,
527        &mut state.stdout_buffer,
528        &mut state.stdout_read_buffer,
529        "stdout",
530    )
531    .await?;
532    drain_reader(
533        &mut state.stderr_reader,
534        &mut state.stderr_buffer,
535        &mut state.stderr_read_buffer,
536        "stderr",
537    )
538    .await?;
539
540    // Post-loop processing: Final wait if killed and status not yet obtained
541    let final_exit_status = finalize_exit_status(
542        &mut state.child,
543        state.exit_status, // Use status potentially set in loop
544        state.timed_out,
545    )
546    .await?;
547
548    let end_time = Instant::now();
549    let duration = end_time.duration_since(start_time);
550
551    debug!(
552        duration = ?duration,
553        exit_status = ?final_exit_status,
554        timed_out = state.timed_out,
555        stdout_len = state.stdout_buffer.len(),
556        stderr_len = state.stderr_buffer.len(),
557        "Command execution finished."
558    );
559
560    Ok(CommandOutput {
561        stdout: state.stdout_buffer,
562        stderr: state.stderr_buffer,
563        exit_status: final_exit_status,
564        duration,
565        timed_out: state.timed_out,
566    })
567}
568
569// ----------- Tests -----------
570#[cfg(test)]
571mod tests {
572    use super::*;
573    use libc;
574    use std::os::unix::process::ExitStatusExt; // For signal checking
575    use tokio::runtime::Runtime;
576    use tracing_subscriber::{fmt, EnvFilter}; // Make sure libc is in scope for SIGKILL constant
577
578    // Helper to initialize tracing for tests
579    fn setup_tracing() {
580        // Use `RUST_LOG=debug` env var to see logs, default info
581        let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
582        fmt()
583            .with_env_filter(filter)
584            .with_test_writer()
585            .try_init()
586            .ok(); // ok() ignores errors if already initialized
587    }
588
589    // Helper to run async tests
590    fn run_async_test<F, Fut>(test_fn: F)
591    where
592        F: FnOnce() -> Fut,
593        Fut: std::future::Future<Output = ()>,
594    {
595        setup_tracing();
596        let rt = Runtime::new().unwrap();
597        rt.block_on(test_fn());
598    }
599
600    #[test]
601    fn test_command_runs_successfully_within_timeouts() {
602        run_async_test(|| async {
603            let mut cmd = StdCommand::new("sh");
604            cmd.arg("-c")
605                .arg("echo 'Hello'; sleep 0.1; echo 'World' >&2");
606
607            let min_timeout = Duration::from_millis(50);
608            let max_timeout = Duration::from_secs(2);
609            let activity_timeout = Duration::from_secs(1);
610
611            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
612                .await
613                .expect("Command failed unexpectedly");
614
615            assert_eq!(result.stdout, b"Hello\n");
616            assert_eq!(result.stderr, b"World\n");
617            assert!(result.exit_status.is_some(), "Exit status should be Some");
618            assert_eq!(
619                result.exit_status.unwrap().code(),
620                Some(0),
621                "Exit code should be 0"
622            );
623            assert!(!result.timed_out, "Should not have timed out");
624            assert!(
625                result.duration >= Duration::from_millis(100),
626                "Duration should be >= 100ms"
627            );
628            assert!(
629                result.duration < max_timeout,
630                "Duration should be < max_timeout"
631            );
632        });
633    }
634
635    #[test]
636    fn test_command_exits_quickly_before_min_timeout() {
637        run_async_test(|| async {
638            let mut cmd = StdCommand::new("echo");
639            cmd.arg("Immediate exit");
640
641            let min_timeout = Duration::from_secs(2); // Long min timeout
642            let max_timeout = Duration::from_secs(5);
643            let activity_timeout = Duration::from_secs(1);
644
645            let start = Instant::now();
646            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
647                .await
648                .expect("Command failed unexpectedly");
649
650            let duration = start.elapsed();
651
652            assert_eq!(result.stdout, b"Immediate exit\n");
653            assert!(result.stderr.is_empty(), "Stderr should be empty");
654            assert!(result.exit_status.is_some(), "Exit status should be Some");
655            assert_eq!(
656                result.exit_status.unwrap().code(),
657                Some(0),
658                "Exit code should be 0"
659            );
660            assert!(!result.timed_out, "Should not have timed out");
661            assert!(
662                duration < Duration::from_millis(500),
663                "Test duration should be short"
664            );
665            assert!(
666                result.duration < Duration::from_millis(500),
667                "Reported duration should be short"
668            );
669        });
670    }
671
672    #[test]
673    fn test_maximum_timeout_kills_long_running_command() {
674        run_async_test(|| async {
675            let mut cmd = StdCommand::new("sleep");
676            cmd.arg("5"); // Sleeps for 5 seconds
677
678            let min_timeout = Duration::from_millis(100);
679            let max_timeout = Duration::from_secs(1); // Max timeout is 1 second
680            let activity_timeout = Duration::from_secs(10); // Activity > max to ensure max triggers
681
682            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
683                .await
684                .expect("Command failed unexpectedly");
685
686            assert!(result.stdout.is_empty(), "Stdout should be empty");
687            assert!(result.stderr.is_empty(), "Stderr should be empty");
688            assert!(
689                result.exit_status.is_some(),
690                "Exit status should be Some after kill"
691            );
692            // SIGKILL is signal 9
693            assert_eq!(
694                result.exit_status.unwrap().signal(),
695                Some(libc::SIGKILL as i32),
696                "Should be killed by SIGKILL"
697            );
698            assert!(result.timed_out, "Should have timed out");
699            assert!(
700                result.duration >= max_timeout,
701                "Duration should be >= max_timeout"
702            );
703            // Allow slightly more buffer for process group kill and reaping
704            assert!(
705                result.duration < max_timeout + Duration::from_millis(750),
706                "Duration allow buffer"
707            );
708        });
709    }
710
711    #[test]
712    fn test_activity_timeout_kills_idle_command_after_min_timeout() {
713        run_async_test(|| async {
714            let mut cmd = StdCommand::new("sh");
715            cmd.arg("-c")
716                .arg("echo 'Initial output'; sleep 5; echo 'This should not appear'");
717
718            let min_timeout = Duration::from_millis(200);
719            let max_timeout = Duration::from_secs(10);
720            let activity_timeout = Duration::from_secs(1); // Kill after 1s of inactivity
721
722            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
723                .await
724                .expect("Command failed unexpectedly");
725
726            assert_eq!(result.stdout, b"Initial output\n");
727            assert!(result.stderr.is_empty(), "Stderr should be empty");
728            assert!(
729                result.exit_status.is_some(),
730                "Exit status should be Some after kill"
731            );
732            // SIGKILL is signal 9
733            assert_eq!(
734                result.exit_status.unwrap().signal(),
735                Some(libc::SIGKILL as i32),
736                "Should be killed by SIGKILL"
737            );
738            assert!(result.timed_out, "Should have timed out");
739
740            // Duration Assertions (Process group kill should be faster now)
741            // 1. Should run for at least the minimum guaranteed time
742            assert!(
743                result.duration >= min_timeout,
744                "Duration ({:?}) should be >= min_timeout ({:?})",
745                result.duration,
746                min_timeout
747            );
748
749            // 2. Should run for approximately the activity timeout after the initial echo.
750            let lower_bound = activity_timeout; // Kill signal sent *at* activity timeout
751            let upper_bound = activity_timeout + Duration::from_millis(750); // Allow generous buffer for reaping
752            assert!(
753                result.duration >= lower_bound,
754                "Duration ({:?}) should be >= activity_timeout ({:?})",
755                result.duration,
756                lower_bound
757            );
758            assert!(
759                result.duration < upper_bound,
760                "Duration ({:?}) should be < activity_timeout plus buffer ({:?})",
761                result.duration,
762                upper_bound
763            );
764
765            // 3. Must be killed before the internal sleep finishes
766            assert!(
767                result.duration < Duration::from_secs(5),
768                "Should be killed before sleep 5 ends"
769            );
770        });
771    }
772
773    #[test]
774    fn test_activity_resets_timeout_allowing_completion() {
775        run_async_test(|| async {
776            let mut cmd = StdCommand::new("sh");
777            cmd.arg("-c")
778                .arg("echo '1'; sleep 0.5; echo '2' >&2; sleep 0.5; echo '3'; sleep 0.5; echo '4'");
779
780            let min_timeout = Duration::from_millis(100);
781            let max_timeout = Duration::from_secs(5);
782            let activity_timeout = Duration::from_secs(1); // Activity timeout > sleep interval
783
784            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
785                .await
786                .expect("Command failed unexpectedly");
787
788            assert_eq!(result.stdout, b"1\n3\n4\n");
789            assert_eq!(result.stderr, b"2\n");
790            assert!(result.exit_status.is_some(), "Exit status should be Some");
791            assert_eq!(
792                result.exit_status.unwrap().code(),
793                Some(0),
794                "Exit code should be 0"
795            );
796            assert!(!result.timed_out, "Should not have timed out");
797            assert!(
798                result.duration > Duration::from_secs(1),
799                "Duration should be > 1s (actual ~1.5s)"
800            );
801            assert!(
802                result.duration < max_timeout,
803                "Duration should be < max_timeout"
804            );
805        });
806    }
807
808    #[test]
809    fn test_binary_output_is_handled() {
810        run_async_test(|| async {
811            let mut cmd = StdCommand::new("head");
812            cmd.arg("-c").arg("50").arg("/dev/urandom");
813
814            let min_timeout = Duration::from_millis(50);
815            let max_timeout = Duration::from_secs(2);
816            let activity_timeout = Duration::from_secs(1);
817
818            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
819                .await
820                .expect("Command failed unexpectedly");
821
822            assert_eq!(result.stdout.len(), 50, "Stdout length should be 50");
823            assert!(result.stderr.is_empty(), "Stderr should be empty");
824            assert!(result.exit_status.is_some(), "Exit status should be Some");
825            assert_eq!(
826                result.exit_status.unwrap().code(),
827                Some(0),
828                "Exit code should be 0"
829            );
830            assert!(!result.timed_out, "Should not have timed out");
831        });
832    }
833
834    #[test]
835    fn test_command_not_found() {
836        run_async_test(|| async {
837            let cmd = StdCommand::new("a_command_that_does_not_exist_hopefully"); // removed mut
838
839            let min_timeout = Duration::from_millis(50);
840            let max_timeout = Duration::from_secs(2);
841            let activity_timeout = Duration::from_secs(1);
842
843            let result =
844                run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout).await;
845
846            assert!(result.is_err(), "Should return error");
847            match result.err().unwrap() {
848                CommandError::Spawn(e) => {
849                    assert_eq!(
850                        e.kind(),
851                        std::io::ErrorKind::NotFound,
852                        "Error kind should be NotFound"
853                    );
854                }
855                e => panic!("Expected CommandError::Spawn, got {:?}", e),
856            }
857        });
858    }
859
860    #[test]
861    fn test_min_timeout_greater_than_max_timeout() {
862        run_async_test(|| async {
863            let cmd = StdCommand::new("echo"); // removed mut
864                                               // cmd.arg("test"); // Don't need args
865
866            let min_timeout = Duration::from_secs(2);
867            let max_timeout = Duration::from_secs(1); // Invalid config
868            let activity_timeout = Duration::from_secs(1);
869
870            let result =
871                run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout).await;
872
873            assert!(result.is_err(), "Should return error");
874            match result.err().unwrap() {
875                CommandError::InvalidTimeout(_) => {} // Expected error
876                e => panic!("Expected CommandError::InvalidTimeout, got {:?}", e),
877            }
878        });
879    }
880
881    #[test]
882    fn test_zero_activity_timeout() {
883        run_async_test(|| async {
884            let cmd = StdCommand::new("echo"); // removed mut
885                                               // cmd.arg("test"); // Don't need args
886
887            let min_timeout = Duration::from_millis(100);
888            let max_timeout = Duration::from_secs(1);
889            let activity_timeout = Duration::ZERO; // Invalid config
890
891            let result =
892                run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout).await;
893
894            assert!(result.is_err(), "Should return error");
895            match result.err().unwrap() {
896                CommandError::InvalidTimeout(_) => {} // Expected error
897                e => panic!("Expected CommandError::InvalidTimeout, got {:?}", e),
898            }
899        });
900    }
901
902    #[test]
903    fn test_process_exits_with_error_code() {
904        run_async_test(|| async {
905            let mut cmd = StdCommand::new("sh");
906            cmd.arg("-c").arg("echo 'Error message' >&2; exit 55");
907
908            let min_timeout = Duration::from_millis(50);
909            let max_timeout = Duration::from_secs(2);
910            let activity_timeout = Duration::from_secs(1);
911
912            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
913                .await
914                .expect("Command failed unexpectedly");
915
916            assert!(result.stdout.is_empty(), "Stdout should be empty");
917            assert_eq!(result.stderr, b"Error message\n");
918            assert!(result.exit_status.is_some(), "Exit status should be Some");
919            assert_eq!(
920                result.exit_status.unwrap().code(),
921                Some(55),
922                "Exit code should be 55"
923            );
924            assert!(!result.timed_out, "Should not have timed out");
925        });
926    }
927
928    #[test]
929    fn test_continuous_output_does_not_timeout() {
930        run_async_test(|| async {
931            let mut cmd = StdCommand::new("sh");
932            // Continuously output numbers for ~2 seconds, sleeping shortly
933            cmd.arg("-c")
934                .arg("i=0; while [ $i -lt 20 ]; do echo $i; i=$((i+1)); sleep 0.1; done");
935
936            let min_timeout = Duration::from_millis(50);
937            let max_timeout = Duration::from_secs(10);
938            let activity_timeout = Duration::from_millis(500); // activity > sleep
939
940            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
941                .await
942                .expect("Command failed unexpectedly");
943
944            assert!(!result.stdout.is_empty(), "Stdout should not be empty");
945            assert!(result.stderr.is_empty(), "Stderr should be empty");
946            assert!(result.exit_status.is_some(), "Exit status should be Some");
947            assert_eq!(
948                result.exit_status.unwrap().code(),
949                Some(0),
950                "Exit code should be 0"
951            );
952            assert!(!result.timed_out, "Should not have timed out");
953            assert!(
954                result.duration > Duration::from_secs(2),
955                "Duration should be > 2s"
956            ); // 20 * 0.1s
957            assert!(
958                result.duration < Duration::from_secs(3),
959                "Duration should be < 3s"
960            );
961        });
962    }
963
964    #[test]
965    fn test_timeout_immediately_if_min_timeout_is_zero_and_no_activity() {
966        run_async_test(|| async {
967            let mut cmd = StdCommand::new("sleep");
968            cmd.arg("5");
969
970            let min_timeout = Duration::ZERO; // Allows immediate check
971            let max_timeout = Duration::from_secs(10);
972            let activity_timeout = Duration::from_millis(100); // Check quickly
973
974            let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
975                .await
976                .expect("Command failed unexpectedly");
977
978            assert!(result.stdout.is_empty(), "Stdout should be empty");
979            assert!(result.stderr.is_empty(), "Stderr should be empty");
980            assert!(
981                result.exit_status.is_some(),
982                "Exit status should be Some after kill"
983            );
984            // SIGKILL is signal 9
985            assert_eq!(
986                result.exit_status.unwrap().signal(),
987                Some(libc::SIGKILL as i32),
988                "Should be killed by SIGKILL"
989            );
990            assert!(result.timed_out, "Should have timed out");
991            // Should be killed after activity_timeout (since min is 0)
992            assert!(
993                result.duration >= activity_timeout,
994                "Duration should be >= activity_timeout"
995            );
996            // Allow slightly more buffer for process group kill and reaping
997            assert!(
998                result.duration < activity_timeout + Duration::from_millis(750),
999                "Duration allow buffer"
1000            );
1001        });
1002    }
1003
1004    //  ----- tests for calculate_new_deadline -----
1005    #[test]
1006fn test_calculate_new_deadline_absolute_deadline_passed() {
1007    let absolute_deadline = Instant::now() - Duration::from_secs(1); // Already passed
1008    let activity_timeout = Duration::from_secs(5);
1009
1010    let new_deadline = calculate_new_deadline(absolute_deadline, activity_timeout);
1011
1012    assert_eq!(
1013        new_deadline, absolute_deadline,
1014        "New deadline should be the absolute deadline when it has already passed"
1015    );
1016}
1017
/// When `now + activity_timeout` falls before the absolute deadline, the activity
/// window should win. The new deadline should lie `activity_timeout` after the
/// moment of the call and strictly before the absolute deadline.
#[test]
fn test_calculate_new_deadline_activity_timeout_before_absolute_deadline() {
    let absolute_deadline = Instant::now() + Duration::from_secs(10);
    let activity_timeout = Duration::from_secs(5);

    // Bracket the call so the activity-based deadline can be bounded tightly.
    // Checking only "<= absolute" and "in the future" would also pass if the
    // absolute deadline were returned unconditionally.
    let before = Instant::now();
    let new_deadline = calculate_new_deadline(absolute_deadline, activity_timeout);
    let after = Instant::now();

    assert!(
        new_deadline < absolute_deadline,
        "New deadline should be earlier than the absolute deadline when the activity window is shorter"
    );
    assert!(
        new_deadline >= before + activity_timeout && new_deadline <= after + activity_timeout,
        "New deadline ({:?}) should be activity_timeout ({:?}) after the call",
        new_deadline,
        activity_timeout
    );
}
1034    // ----- tests for handle_stream_activity -----
1035    #[test]
1036fn test_handle_stream_activity_updates_deadline() {
1037    let mut current_deadline = Instant::now() + Duration::from_secs(5);
1038    let timeouts = TimeoutConfig {
1039        minimum: Duration::from_secs(1),
1040        maximum: Duration::from_secs(10),
1041        activity: Duration::from_secs(3),
1042        start_time: Instant::now(),
1043        absolute_deadline: Instant::now() + Duration::from_secs(10),
1044    };
1045
1046    handle_stream_activity(10, "stdout", &mut current_deadline, &timeouts);
1047
1048    assert!(
1049        current_deadline > Instant::now(),
1050        "Current deadline should be updated to a future time"
1051    );
1052    assert!(
1053        current_deadline <= timeouts.absolute_deadline,
1054        "Current deadline should not exceed the absolute deadline"
1055    );
1056}
1057
/// Once the current deadline sits exactly at the absolute deadline, stream
/// activity should leave it untouched.
#[test]
fn test_handle_stream_activity_no_update_at_absolute_limit() {
    let absolute_deadline = Instant::now() + Duration::from_secs(5);
    let timeouts = TimeoutConfig {
        minimum: Duration::from_secs(1),
        maximum: Duration::from_secs(10),
        activity: Duration::from_secs(3),
        start_time: Instant::now(),
        absolute_deadline,
    };
    // Begin pinned at the absolute limit.
    let mut current_deadline = absolute_deadline;

    handle_stream_activity(10, "stderr", &mut current_deadline, &timeouts);

    assert_eq!(
        current_deadline, absolute_deadline,
        "Current deadline should remain unchanged when at the absolute limit"
    );
}
1076}
1077
1078    // ----- tests for run_command_loop -----
/// `run_command_loop` should return cleanly and record an exit status when the
/// child finishes on its own well before any deadline.
#[test]
fn test_run_command_loop_exits_on_process_finish() {
    run_async_test(|| async {
        let mut cmd = StdCommand::new("echo");
        cmd.arg("Test");

        let start = Instant::now();
        let timeouts = TimeoutConfig {
            minimum: Duration::from_secs(1),
            maximum: Duration::from_secs(5),
            activity: Duration::from_secs(2),
            start_time: start,
            absolute_deadline: start + Duration::from_secs(5),
        };

        let mut state = spawn_command_and_setup_state(&mut cmd, timeouts.absolute_deadline)
            .expect("Failed to spawn command");
        let loop_result = run_command_loop(&mut state, &timeouts).await;

        assert!(loop_result.is_ok(), "Command loop should exit without errors");
        assert!(
            state.exit_status.is_some(),
            "Exit status should be set when process finishes naturally"
        );
    });
}
1105
/// When the absolute deadline passes first, `run_command_loop` should return
/// cleanly, mark the state as timed out, and leave `exit_status` unset.
#[test]
fn test_run_command_loop_exits_on_timeout() {
    run_async_test(|| async {
        // Outlives the 2s absolute deadline below.
        let mut cmd = StdCommand::new("sleep");
        cmd.arg("5");

        let start = Instant::now();
        let timeouts = TimeoutConfig {
            minimum: Duration::from_secs(1),
            maximum: Duration::from_secs(2),
            activity: Duration::from_secs(10),
            start_time: start,
            absolute_deadline: start + Duration::from_secs(2),
        };

        let mut state = spawn_command_and_setup_state(&mut cmd, timeouts.absolute_deadline)
            .expect("Failed to spawn command");
        let loop_result = run_command_loop(&mut state, &timeouts).await;

        assert!(loop_result.is_ok(), "Command loop should exit without errors");
        assert!(
            state.exit_status.is_none(),
            "Exit status should be None when process is killed due to timeout"
        );
        assert!(state.timed_out, "State should indicate that the process timed out");
    });
}
1133
1134
/// A busy command that never writes anything should still be killed with
/// SIGKILL once the absolute (maximum) deadline passes.
#[test]
fn test_absolute_deadline_kills_infinite_loop_command() {
    run_async_test(|| async {
        // Busy loop with no output.
        let mut cmd = StdCommand::new("sh");
        cmd.args(["-c", "while true; do :; done"]);

        let max_timeout = Duration::from_secs(2);
        let output = run_command_with_timeout(
            cmd,
            Duration::from_secs(1),
            max_timeout,
            // Longer than max_timeout, so the absolute deadline is reached first.
            Duration::from_secs(10),
        )
        .await
        .expect("Command failed unexpectedly");

        assert!(output.stdout.is_empty(), "Stdout should be empty");
        assert!(output.stderr.is_empty(), "Stderr should be empty");
        let status = output
            .exit_status
            .expect("Exit status should be Some after kill");
        assert_eq!(
            status.signal(),
            Some(libc::SIGKILL as i32),
            "Should be killed by SIGKILL"
        );
        assert!(output.timed_out, "Should have timed out");

        let reap_buffer = Duration::from_millis(750);
        assert!(
            output.duration >= max_timeout,
            "Duration should be >= max_timeout"
        );
        assert!(
            output.duration < max_timeout + reap_buffer,
            "Duration should allow a small buffer for process group kill and reaping"
        );
    });
}
1172
/// A command that writes output without pause should still be killed at the
/// absolute (maximum) deadline. Constant activity keeps the activity timer from
/// firing, but it must never extend the run past `max_timeout`.
///
/// The output comes from a shell loop rather than `yes`. `yes` can fill a pipe at
/// very high rates, and every captured byte is accumulated in
/// `CommandOutput::stdout`, so a two-second run could allocate gigabytes of memory
/// on a fast machine. The shell loop produces endless output at a much lower rate.
#[test]
fn test_infinite_output_command() {
    run_async_test(|| async {
        let mut cmd = StdCommand::new("sh");
        cmd.arg("-c").arg("while :; do echo infinite; done");

        let min_timeout = Duration::from_secs(1);
        let max_timeout = Duration::from_secs(2); // Absolute deadline of 2 seconds
        let activity_timeout = Duration::from_secs(1); // Never reached: output is continuous

        let result = run_command_with_timeout(cmd, min_timeout, max_timeout, activity_timeout)
            .await
            .expect("Command failed unexpectedly");

        assert!(
            result.stdout.starts_with(b"infinite\n"),
            "Stdout should contain the repeated output line"
        );
        assert!(
            result.stderr.is_empty(),
            "Stderr should be empty for the output loop"
        );
        let status = result
            .exit_status
            .expect("Exit status should be Some after timeout");
        assert_eq!(
            status.signal(),
            Some(libc::SIGKILL as i32),
            "Should be killed by SIGKILL"
        );
        assert!(result.timed_out, "Should have timed out");
        assert!(
            result.duration >= max_timeout,
            "Duration should be >= max_timeout"
        );
        assert!(
            result.duration < max_timeout + Duration::from_millis(750),
            "Duration should allow a small buffer for process group kill and reaping"
        );
    });
}
1216
1217
1218} // end tests mod