Skip to main content

openjd_sessions/
subprocess.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// Copyright by contributors to this project.
3// SPDX-License-Identifier: (Apache-2.0 OR MIT)
4
5//! Async subprocess execution with real-time message streaming.
6
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::time::Duration;
10
11use tokio::io::{AsyncBufReadExt, BufReader};
12use tokio::process::Command;
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15
16use crate::action::ActionMessage;
17use crate::action::ActionState;
18use crate::action_filter::{ActionFilter, ActionMessageKind, ActionMessageValue};
19use crate::error::SessionError;
20use crate::logging::LogContent;
21use crate::runner::CancelMethod;
22use crate::session_log;
23use crate::session_user::SessionUser;
24use std::sync::Arc;
25
/// Grace time to wait for `c.wait()` to reap the child when no terminate
/// has been issued from inside the stdout-read loop.
///
/// This covers the non-kill exits of the read loop (natural EOF, read
/// error) and a `NotifyThenTerminate` cancellation, where the process may
/// still be winding down gracefully in response to the notify signal.
/// When the bound elapses the caller escalates to `send_terminate`.
const STDOUT_GRACE_TIME: Duration = Duration::from_secs(5);

/// Grace time to wait for `c.wait()` to reap the child after we have
/// already issued `send_terminate` from inside the stdout-read loop
/// (timeout fired, urgent cancel with `time_limit=0`, or
/// `CancelMethod::Terminate`).
///
/// `send_terminate` resolves to SIGKILL on Unix and `TerminateProcess`
/// (via `kill_process_tree`) on Windows, so by the time the read loop has
/// exited the child is expected to be dead already and `c.wait()` only
/// has to reap it. Two seconds is a generous bound for runtime scheduling
/// delay (especially on loaded Windows CI) while still being much tighter
/// than the 5s we allow on graceful-exit paths.
///
/// A shorter bound here means the three "timeout should have fired
/// quickly" integration tests in `tests/test_session.rs` don't pick up
/// 3-4s of dead time on every run, which in turn means their 10s
/// assertion holds with comfortable margin on slow CI.
const STDOUT_GRACE_TIME_POST_TERMINATE: Duration = Duration::from_secs(2);

/// Grace time to keep draining stdout once a kill signal has been sent.
///
/// Bounds how long the read loop waits for EOF: processes that inherited
/// the pipe may keep it open after the main child has been killed, and
/// without this bound the loop could wait forever.
const STDOUT_DRAIN_AFTER_KILL: Duration = Duration::from_secs(1);
56
/// Upper bound, in bytes, on the length of a single output line that is
/// kept after reading; longer lines are cut by [`truncate_line`].
pub(crate) const LOG_LINE_MAX_LENGTH: usize = 64 * 1024;

/// Clamp `line` to at most `LOG_LINE_MAX_LENGTH` bytes.
///
/// The cut never lands inside a multi-byte UTF-8 sequence: when the limit
/// falls in the middle of a character, the whole character is dropped, so
/// the result may be up to three bytes shorter than the limit. Lines that
/// already fit are returned unchanged.
pub(crate) fn truncate_line(line: &str) -> &str {
    if line.len() <= LOG_LINE_MAX_LENGTH {
        return line;
    }
    // Walk back from the limit to the nearest character boundary. Index 0
    // is always a boundary, so the loop terminates.
    let mut end = LOG_LINE_MAX_LENGTH;
    while !line.is_char_boundary(end) {
        end -= 1;
    }
    &line[..end]
}
68
/// Result of running a subprocess action.
///
/// Produced by `run_subprocess` after the child process has been waited on.
#[derive(Debug)]
pub struct SubprocessResult {
    /// Final outcome of the action. `run_subprocess` picks it in this order
    /// of precedence: timed out, canceled, a fail message was seen in the
    /// output, then the process exit status (success or failure).
    pub state: ActionState,
    /// Exit code reported by the child, or `None` when no exit status was
    /// obtained or the status carries no exit code.
    pub exit_code: Option<i32>,
    /// Output lines as returned by the action filter, each followed by
    /// `\n`. Only populated when `SubprocessConfig::debug_collect_stdout`
    /// is `true`; otherwise left empty.
    pub stdout: String,
}
76
/// Configuration for running a subprocess.
pub struct SubprocessConfig {
    /// Command line: `args[0]` is the program, the rest are its arguments.
    /// Must not be empty.
    pub args: Vec<String>,
    /// Overrides applied on top of the current process environment:
    /// `Some(value)` sets the variable, `None` removes it.
    pub env_vars: HashMap<String, Option<String>>,
    /// Working directory for the child. When set, it is also where
    /// `cancel_info.json` is written on a `NotifyThenTerminate` cancel.
    pub working_dir: Option<PathBuf>,
    /// Maximum run time; when it elapses the process is terminated and the
    /// action is reported as timed out. `None` means no limit.
    pub timeout: Option<Duration>,
    /// User to run as. `run_subprocess` only supports the current process
    /// user and returns an error for any other user.
    pub user: Option<Arc<dyn SessionUser>>,
    /// How the process is stopped when cancellation is requested.
    pub cancel_method: CancelMethod,
    /// Optional time limit attached to a cancel request. Its current value
    /// is read when cancellation is observed: a zero duration forces an
    /// immediate terminate, any other value caps the `NotifyThenTerminate`
    /// grace period.
    pub cancel_request_rx: Option<tokio::sync::watch::Receiver<Option<Duration>>>,
    /// Whether to accumulate all stdout into `SubprocessResult.stdout`.
    /// Intended for debugging only — production callers should leave this
    /// `false` and observe output through the real-time callback.
    /// Default is `false` — lines are still streamed through the filter and
    /// callback in real time, but the collected string stays empty.
    pub debug_collect_stdout: bool,
}
93
94// ---------------------------------------------------------------------------
95// Platform-specific signal / process-group helpers
96// ---------------------------------------------------------------------------
97
98#[cfg(unix)]
99mod platform {
100    use super::*;
101
102    /// Send SIGTERM to the process group.
103    pub fn notify_process_group(pgid: i32) -> Result<(), std::io::Error> {
104        nix::sys::signal::killpg(
105            nix::unistd::Pid::from_raw(pgid),
106            nix::sys::signal::Signal::SIGTERM,
107        )
108        .map_err(std::io::Error::other)
109    }
110
111    /// Send SIGKILL to the process group.
112    pub fn terminate_process_group(pgid: i32) -> Result<(), std::io::Error> {
113        nix::sys::signal::killpg(
114            nix::unistd::Pid::from_raw(pgid),
115            nix::sys::signal::Signal::SIGKILL,
116        )
117        .map_err(std::io::Error::other)
118    }
119
120    /// Send SIGKILL to the process group.
121    pub fn send_terminate(pid: i32) {
122        let _ = terminate_process_group(pid);
123    }
124
125    /// Send SIGTERM to the process group.
126    pub fn send_notify(pid: i32) {
127        let _ = notify_process_group(pid);
128    }
129
130    /// Spawn a delayed SIGKILL after a grace period.
131    pub fn spawn_delayed_terminate(pid: i32, delay: Duration) {
132        tokio::spawn(async move {
133            tokio::time::sleep(delay).await;
134            let _ = terminate_process_group(pid);
135        });
136    }
137
138    /// Configure the Command for POSIX: setsid + dup2 stderr→stdout via pre_exec.
139    ///
140    /// Returns `None` — on POSIX the merge happens in the child via dup2,
141    /// so the caller reads from `child.stdout` as normal.
142    ///
143    /// # Safety
144    /// Calls `pre_exec` which runs in the forked child before exec.
145    pub unsafe fn configure_command(
146        cmd: &mut Command,
147        use_setsid: bool,
148    ) -> Option<Box<dyn tokio::io::AsyncRead + Unpin + Send>> {
149        cmd.pre_exec(move || {
150            // Redirect stderr to stdout so output ordering is preserved
151            if nix::libc::dup2(1, 2) == -1 {
152                return Err(std::io::Error::last_os_error());
153            }
154            if use_setsid {
155                nix::libc::setsid();
156            }
157            Ok(())
158        });
159        None
160    }
161}
162
#[cfg(windows)]
mod platform {
    use super::*;

    use std::collections::HashSet;

    use windows::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
    use windows::Win32::System::Threading::{
        GetExitCodeProcess, OpenProcess, TerminateProcess, CREATE_NEW_PROCESS_GROUP,
        PROCESS_QUERY_INFORMATION, PROCESS_TERMINATE,
    };

    /// Send CTRL_BREAK_EVENT to a process group for graceful cancellation.
    ///
    /// Mirrors Python's `_signal_win_subprocess.py`: detach from current console,
    /// attach to the target's console, send CTRL_BREAK, then re-attach to our own.
    ///
    /// Note: When running as a Windows service (Session 0), console manipulation
    /// doesn't work reliably. In that case we return false so the caller falls
    /// back to terminate (immediate kill).
    ///
    /// Returns `true` only when the event was generated successfully.
    fn send_ctrl_break(pid: u32) -> bool {
        use windows::Win32::System::Console::{
            AttachConsole, FreeConsole, GenerateConsoleCtrlEvent, CTRL_BREAK_EVENT,
        };

        // Console APIs don't work from Session 0 (Windows services).
        // Fall back to terminate for reliable cancellation.
        if crate::win32::is_session_zero() {
            log::info!(target: "openjd.sessions", "Running in Session 0, skipping CTRL_BREAK (will fall back to terminate)");
            return false;
        }

        // SAFETY: plain Win32 console calls taking only integer arguments.
        unsafe {
            // Detach from our console
            let _ = FreeConsole();
            // Attach to the target process's console
            if AttachConsole(pid).is_err() {
                // Re-attach to parent if we can't attach to target
                let _ = AttachConsole(u32::MAX); // ATTACH_PARENT_PROCESS
                return false;
            }
            let ok = GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid).is_ok();
            // Detach from target and re-attach to parent
            let _ = FreeConsole();
            let _ = AttachConsole(u32::MAX);
            ok
        }
    }

    /// Kill a single process by PID. Returns `true` when the process was
    /// opened and `TerminateProcess` succeeded.
    fn kill_process(pid: u32) -> bool {
        // SAFETY: the handle is only used while open and is closed exactly
        // once before returning.
        unsafe {
            match OpenProcess(PROCESS_TERMINATE, false, pid) {
                Ok(h) => {
                    let ok = TerminateProcess(h, 1).is_ok();
                    let _ = CloseHandle(h);
                    ok
                }
                Err(_) => false,
            }
        }
    }

    /// Check if a process is still alive.
    ///
    /// Returns `false` when the process cannot be opened or its exit code
    /// cannot be queried.
    #[allow(dead_code)]
    fn is_process_alive(pid: u32) -> bool {
        // SAFETY: the handle is only used while open and is closed exactly
        // once before returning.
        unsafe {
            match OpenProcess(PROCESS_QUERY_INFORMATION, false, pid) {
                Ok(h) => {
                    let mut code = 0u32;
                    let _ = GetExitCodeProcess(h, &mut code);
                    let _ = CloseHandle(h);
                    code == STILL_ACTIVE.0 as u32
                }
                Err(_) => false,
            }
        }
    }

    /// Take one Toolhelp snapshot of the process table and return it as
    /// `(pid, parent_pid)` pairs. Returns an empty list when the snapshot
    /// cannot be created.
    fn snapshot_process_parents() -> Vec<(u32, u32)> {
        use windows::Win32::System::Diagnostics::ToolHelp::{
            CreateToolhelp32Snapshot, Process32FirstW, Process32NextW, PROCESSENTRY32W,
            TH32CS_SNAPPROCESS,
        };
        let mut pairs = Vec::new();
        // SAFETY: `entry` is a properly sized PROCESSENTRY32W that outlives
        // the enumeration, and the snapshot handle is closed exactly once.
        unsafe {
            if let Ok(snap) = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) {
                let mut entry = PROCESSENTRY32W {
                    dwSize: std::mem::size_of::<PROCESSENTRY32W>() as u32,
                    ..Default::default()
                };
                if Process32FirstW(snap, &mut entry).is_ok() {
                    loop {
                        pairs.push((entry.th32ProcessID, entry.th32ParentProcessID));
                        if Process32NextW(snap, &mut entry).is_err() {
                            break;
                        }
                    }
                }
                let _ = CloseHandle(snap);
            }
        }
        pairs
    }

    /// Collect `root_pid` and all of its descendants, parents before
    /// children.
    ///
    /// Works from a single process-table snapshot and walks it iteratively
    /// with a visited set. Parent ids recorded by Windows are never
    /// updated, so after PID reuse (or for pid 0, whose recorded parent is
    /// itself) the parent relation can contain cycles; the visited set
    /// guarantees termination where unbounded recursion would overflow the
    /// stack.
    ///
    /// NOTE(review): a recorded parent id may belong to a dead process
    /// whose pid was reused, so an unrelated process can appear as a
    /// descendant. Ruling that out needs a creation-time comparison.
    fn collect_tree(root_pid: u32) -> Vec<u32> {
        let mut children_of: HashMap<u32, Vec<u32>> = HashMap::new();
        for (pid, parent_pid) in snapshot_process_parents() {
            children_of.entry(parent_pid).or_default().push(pid);
        }

        let mut visited = HashSet::new();
        let mut ordered = Vec::new();
        let mut pending = vec![root_pid];
        while let Some(pid) = pending.pop() {
            if !visited.insert(pid) {
                continue;
            }
            ordered.push(pid);
            if let Some(children) = children_of.get(&pid) {
                pending.extend(children.iter().copied());
            }
        }
        ordered
    }

    /// Kill a process tree: collect all descendants, then kill leaf-to-root.
    /// Mirrors Python's `_windows_process_killer.py`.
    ///
    /// Pid 0 is refused: it is the sentinel used when the child pid is
    /// unavailable, and on Windows it denotes the idle process, whose
    /// "tree" would span unrelated system processes.
    fn kill_process_tree(root_pid: u32) {
        if root_pid == 0 {
            return;
        }
        // Every process appears after its parent in the collected list, so
        // walking it backwards kills children before their parents.
        for &pid in collect_tree(root_pid).iter().rev() {
            kill_process(pid);
        }
    }

    /// Terminate: kill the entire process tree.
    pub fn send_terminate(pid: i32) {
        kill_process_tree(pid as u32);
    }

    /// Notify: send CTRL_BREAK_EVENT for graceful shutdown.
    ///
    /// Falls back to an immediate terminate when the event cannot be sent.
    pub fn send_notify(pid: i32) {
        if !send_ctrl_break(pid as u32) {
            log::warn!(target: "openjd.sessions", "Failed to send CTRL_BREAK to pid {pid}, falling back to terminate");
            send_terminate(pid);
        }
    }

    /// Delayed terminate: kill the process tree after a grace period.
    ///
    /// The spawned task is detached and fires regardless of whether the
    /// process has already exited.
    pub fn spawn_delayed_terminate(pid: i32, delay: Duration) {
        tokio::spawn(async move {
            tokio::time::sleep(delay).await;
            kill_process_tree(pid as u32);
        });
    }

    /// Configure the Command for Windows: CREATE_NEW_PROCESS_GROUP + merge stderr into stdout.
    ///
    /// Creates a single OS pipe and sets both stdout and stderr to the write end,
    /// mirroring POSIX `dup2(1, 2)`. Returns the read end as an async reader,
    /// or `None` when the pipe could not be created (stdout and stderr are
    /// then configured as separate pipes).
    ///
    /// The pipe handles are created *non-inheritable*. The standard
    /// library duplicates stdio handles as inheritable for the one child
    /// being spawned; leaving our originals inheritable would leak the
    /// write end into any other process spawned concurrently by this
    /// process and hold off EOF until that unrelated process exits.
    ///
    /// # Safety
    /// This function is safe on Windows (no pre_exec).
    pub unsafe fn configure_command(
        cmd: &mut Command,
        _use_setsid: bool,
    ) -> Option<Box<dyn tokio::io::AsyncRead + Unpin + Send>> {
        use std::os::windows::io::{FromRawHandle, OwnedHandle};
        use windows::Win32::Foundation::HANDLE;
        use windows::Win32::System::Pipes::CreatePipe;

        // CREATE_NEW_PROCESS_GROUP is required for CTRL_BREAK_EVENT to work
        cmd.creation_flags(CREATE_NEW_PROCESS_GROUP.0);

        // Create an anonymous pipe: read_handle for us, write_handle for the child
        let mut read_handle = HANDLE::default();
        let mut write_handle = HANDLE::default();
        if CreatePipe(&mut read_handle, &mut write_handle, None, 0).is_err() {
            // Fall back to separate pipes
            cmd.stdout(std::process::Stdio::piped());
            cmd.stderr(std::process::Stdio::piped());
            return None;
        }

        // SAFETY (both conversions): CreatePipe succeeded, so each raw
        // handle is valid and owned by nobody else; ownership moves into
        // the OwnedHandle, which closes it on drop.
        let read_owned = OwnedHandle::from_raw_handle(read_handle.0);
        let write_owned = OwnedHandle::from_raw_handle(write_handle.0);

        // stderr shares the write end through a second handle to the same
        // pipe.
        match write_owned.try_clone() {
            Ok(stderr_owned) => {
                cmd.stderr(std::process::Stdio::from(stderr_owned));
            }
            Err(_) => {
                // Fall back: just use the one handle for stdout, pipe stderr separately
                cmd.stderr(std::process::Stdio::piped());
            }
        }
        cmd.stdout(std::process::Stdio::from(write_owned));

        // Convert read handle to an async reader
        let read_std = std::fs::File::from(read_owned);
        Some(Box::new(tokio::fs::File::from_std(read_std)))
    }
}
385
386use platform::*;
387
388/// Write cancel_info.json to the working directory as required by the OpenJD spec
389/// for NotifyThenTerminate cancelation.
390fn write_cancel_info(working_dir: &Path, terminate_delay: Duration) {
391    let notify_end = std::time::SystemTime::now() + terminate_delay;
392    let secs = notify_end
393        .duration_since(std::time::UNIX_EPOCH)
394        .unwrap_or_default()
395        .as_secs();
396    // Format as ISO 8601 UTC
397    let s = secs % 60;
398    let m = (secs / 60) % 60;
399    let h = (secs / 3600) % 24;
400    let total_days = secs / 86400;
401    // Simple date calculation from days since epoch
402    let (y, mo, d) = days_to_ymd(total_days);
403    let timestamp = format!("{y:04}-{mo:02}-{d:02}T{h:02}:{m:02}:{s:02}Z");
404    let info = serde_json::json!({ "NotifyEnd": timestamp });
405    let path = working_dir.join("cancel_info.json");
406    let _ = std::fs::write(&path, serde_json::to_string(&info).unwrap_or_default());
407}
408
/// Convert a day count since the Unix epoch (1970-01-01 = day 0) into a
/// proleptic Gregorian `(year, month, day)` triple, with `month` in
/// `1..=12` and `day` in `1..=31`.
fn days_to_ymd(total_days: u64) -> (u64, u64, u64) {
    // Algorithm from http://howardhinnant.github.io/date_algorithms.html
    // It counts years from March 1st so the leap day is the last day of
    // the (shifted) year, working in 400-year "eras" of fixed length.
    const DAYS_PER_ERA: u64 = 146_097;
    const DAYS_FROM_0000_03_01_TO_EPOCH: u64 = 719_468;

    let shifted = total_days + DAYS_FROM_0000_03_01_TO_EPOCH;
    let era = shifted / DAYS_PER_ERA;
    let day_of_era = shifted % DAYS_PER_ERA;
    let year_of_era =
        (day_of_era - day_of_era / 1460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;

    // January and February belong to the next civil year.
    let (month, year_carry) = if march_based_month < 10 {
        (march_based_month + 3, 0)
    } else {
        (march_based_month - 9, 1)
    };
    (year_of_era + era * 400 + year_carry, month, day)
}
424
425/// Format a command argument list for logging, applying redaction for any
426/// `openjd_redacted_env:` tokens that may appear in the arguments.
427pub(crate) fn format_command_for_log(args: &[String]) -> String {
428    let joined =
429        shlex::try_join(args.iter().map(|s| s.as_str())).unwrap_or_else(|_| args.join(" "));
430    crate::action_filter::redact_openjd_redacted_env_requests(&joined)
431}
432
433/// Run a subprocess asynchronously with real-time stdout streaming through an ActionFilter.
434///
435/// Spawns the process with merged environment variables, streams stdout line-by-line
436/// through the ActionFilter, supports cancellation and timeout, uses process groups
437/// (setsid) for proper signal delivery, and handles stdout grace time for detached
438/// grandchild processes.
439///
440/// Parsed `ActionMessage` values are sent through `message_tx` in real-time as
441/// stdout lines are processed.
442pub async fn run_subprocess(
443    config: SubprocessConfig,
444    filter: &mut ActionFilter,
445    session_id: &str,
446    message_tx: mpsc::UnboundedSender<ActionMessage>,
447    cancel_token: CancellationToken,
448) -> Result<SubprocessResult, SessionError> {
449    let args = &config.args;
450    if args.is_empty() {
451        return Err(SessionError::Runtime("No command specified".into()));
452    }
453
454    // Cross-user execution must go through the helper binary.
455    if config.user.as_deref().is_some_and(|u| !u.is_process_user()) {
456        return Err(SessionError::Runtime(
457            "Cross-user subprocess execution requires the helper binary. \
458             Use run_via_helper instead of run_subprocess for cross-user actions."
459                .into(),
460        ));
461    }
462
463    // Build merged environment
464    let mut merged: HashMap<String, String> = std::env::vars().collect();
465    for (k, v) in &config.env_vars {
466        match v {
467            Some(val) => {
468                merged.insert(k.clone(), val.clone());
469            }
470            None => {
471                merged.remove(k);
472            }
473        }
474    }
475
476    // Log the command line (redacting any openjd_redacted_env tokens)
477    session_log!(
478        info,
479        session_id,
480        LogContent::FILE_PATH | LogContent::PROCESS_CONTROL,
481        "Running command {}",
482        format_command_for_log(args)
483    );
484
485    // Spawn the process via tokio::process::Command (same-user only;
486    // cross-user was rejected above).
487    #[cfg(windows)]
488    let win32_process_handle: Option<windows::Win32::Foundation::HANDLE> = None;
489
490    #[allow(unused_mut)]
491    let (mut child, pid, stdout_for_reading): (
492        Option<tokio::process::Child>,
493        i32,
494        Option<Box<dyn tokio::io::AsyncRead + Unpin + Send>>,
495    ) = {
496        let mut cmd = Command::new(&args[0]);
497        cmd.args(&args[1..]);
498        cmd.env_clear();
499        for (k, v) in &merged {
500            cmd.env(k, v);
501        }
502        if let Some(dir) = &config.working_dir {
503            cmd.current_dir(dir);
504        }
505        let merged_reader = unsafe { configure_command(&mut cmd, true) };
506        if merged_reader.is_none() {
507            cmd.stdout(std::process::Stdio::piped());
508        }
509        let mut c = cmd.spawn().map_err(|e| {
510            session_log!(
511                info,
512                session_id,
513                LogContent::EXCEPTION_INFO | LogContent::PROCESS_CONTROL,
514                "Process failed to start: '{}': {}",
515                args[0],
516                e
517            );
518            SessionError::SubprocessStart {
519                command: args[0].clone(),
520                source: e,
521            }
522        })?;
523        let p = c.id().unwrap_or(0) as i32;
524        let stdout = merged_reader.or_else(|| {
525            c.stdout
526                .take()
527                .map(|s| Box::new(s) as Box<dyn tokio::io::AsyncRead + Unpin + Send>)
528        });
529        (Some(c), p, stdout)
530    };
531
532    session_log!(
533        info,
534        session_id,
535        LogContent::PROCESS_CONTROL,
536        "Command started as pid: {}",
537        pid
538    );
539    session_log!(
540        info,
541        session_id,
542        LogContent::BANNER | LogContent::COMMAND_OUTPUT,
543        "Output:"
544    );
545
546    // Read merged stdout+stderr from the child
547    let mut cancel_requested = false;
548    let mut timed_out = false;
549    // True once we've issued `send_terminate` on the process tree from
550    // inside the stdout read loop (timeout path, urgent cancel, or
551    // `CancelMethod::Terminate`). `send_terminate` is the crate's
552    // platform-agnostic "kill now" — SIGKILL on Unix, `TerminateProcess`
553    // via `kill_process_tree` on Windows.
554    //
555    // Stays false for `NotifyThenTerminate` cancel — there the terminate
556    // is only scheduled (via `spawn_delayed_terminate`), so the process
557    // may still be winding down gracefully in response to the notify
558    // signal when the loop exits and needs the longer `STDOUT_GRACE_TIME`
559    // on the final `c.wait()`.
560    let mut terminate_sent = false;
561    let mut stdout_collected = String::new();
562    let mut saw_fail = false;
563
564    if let Some(stdout) = stdout_for_reading {
565        let mut reader = BufReader::new(stdout);
566        let mut line_buf = Vec::new();
567
568        // Create timeout future once, pin it for reuse across loop iterations
569        let timeout_fut = async {
570            match config.timeout {
571                Some(d) => tokio::time::sleep(d).await,
572                None => std::future::pending().await,
573            }
574        };
575        tokio::pin!(timeout_fut);
576
577        // Grace period for stdout to drain after process termination.
578        // On Windows, killed processes (especially MSYS2 sh.exe) may leave
579        // inherited pipe handles open in orphaned child processes, preventing
580        // EOF. This deadline ensures we don't hang forever.
581        let drain_deadline = tokio::time::sleep(Duration::MAX);
582        tokio::pin!(drain_deadline);
583
584        loop {
585            tokio::select! {
586                biased;
587
588                _ = &mut drain_deadline, if cancel_requested => {
589                    session_log!(info, session_id, LogContent::PROCESS_CONTROL,
590                        "Stdout drain grace period expired, stopping read loop");
591                    break;
592                }
593
594                _ = cancel_token.cancelled(), if !cancel_requested => {
595                    cancel_requested = true;
596                    drain_deadline.as_mut().reset(tokio::time::Instant::now() + STDOUT_DRAIN_AFTER_KILL);
597                    let time_limit = config.cancel_request_rx.as_ref()
598                        .and_then(|rx| *rx.borrow());
599
600                    match (&config.cancel_method, time_limit) {
601                        (_, Some(limit)) if limit.is_zero() => {
602                            session_log!(info, session_id, LogContent::PROCESS_CONTROL, "Urgent cancel (time_limit=0), sending SIGKILL to process group {}", pid);
603                            send_terminate(pid);
604                            terminate_sent = true;
605                        }
606                        (CancelMethod::Terminate, _) => {
607                            session_log!(info, session_id, LogContent::PROCESS_CONTROL, "Sending SIGKILL to process group {}", pid);
608                            send_terminate(pid);
609                            terminate_sent = true;
610                        }
611                        (CancelMethod::NotifyThenTerminate { terminate_delay }, _) => {
612                            let delay = match time_limit {
613                                Some(limit) => limit.min(*terminate_delay),
614                                None => *terminate_delay,
615                            };
616                            if let Some(dir) = &config.working_dir {
617                                write_cancel_info(dir, delay);
618                            }
619                            session_log!(info, session_id, LogContent::PROCESS_CONTROL, "Sending SIGTERM to process group {} (grace period: {:?})", pid, delay);
620                            send_notify(pid);
621                            spawn_delayed_terminate(pid, delay);
622                            // Deliberately do NOT set terminate_sent here —
623                            // the terminate is only scheduled (via
624                            // `spawn_delayed_terminate`), not yet delivered.
625                            // The process may still be exiting gracefully in
626                            // response to the notify signal, and the final
627                            // `c.wait()` should get the longer
628                            // `STDOUT_GRACE_TIME`.
629                        }
630                    }
631                }
632
633                _ = &mut timeout_fut, if !cancel_requested && !timed_out => {
634                    timed_out = true;
635                    cancel_requested = true;
636                    drain_deadline.as_mut().reset(tokio::time::Instant::now() + STDOUT_DRAIN_AFTER_KILL);
637                    session_log!(info, session_id, LogContent::PROCESS_CONTROL, "Action timed out, sending SIGKILL to process group");
638                    send_terminate(pid);
639                    terminate_sent = true;
640                }
641
642                n = reader.read_until(b'\n', &mut line_buf) => {
643                    match n {
644                        Ok(0) => break, // EOF
645                        Ok(_) => {
646                            // Strip trailing newline (and \r on Windows)
647                            if line_buf.last() == Some(&b'\n') {
648                                line_buf.pop();
649                            }
650                            if line_buf.last() == Some(&b'\r') {
651                                line_buf.pop();
652                            }
653                            let line = String::from_utf8_lossy(&line_buf);
654                            let line = truncate_line(&line).to_string();
655                            line_buf.clear();
656                            let (display, pass_through) = process_line(&line, filter, session_id, &message_tx, &mut saw_fail);
657                            if pass_through && filter.min_log_level() <= 20 {
658                                session_log!(info, session_id, LogContent::COMMAND_OUTPUT, "{}", display);
659                            }
660                            if config.debug_collect_stdout {
661                                stdout_collected.push_str(&display);
662                                stdout_collected.push('\n');
663                            }
664                        }
665                        Err(_) => break,
666                    }
667                }
668            }
669        }
670    }
671
672    // Wait for process to exit
673    let exit_status = if let Some(ref mut c) = child {
674        // If we already issued `send_terminate` from inside the read loop
675        // (timeout, urgent cancel, or Terminate cancel), the child should
676        // be dead and `c.wait()` just needs to reap it — a short bound
677        // is plenty. Otherwise give the full 5s to accommodate graceful
678        // shutdown (natural EOF or `NotifyThenTerminate`).
679        let grace = if terminate_sent {
680            STDOUT_GRACE_TIME_POST_TERMINATE
681        } else {
682            STDOUT_GRACE_TIME
683        };
684        match tokio::time::timeout(grace, c.wait()).await {
685            Ok(Ok(s)) => Some(s),
686            Ok(Err(_)) => {
687                send_terminate(pid);
688                None
689            }
690            Err(_) => {
691                send_terminate(pid);
692                c.wait().await.ok()
693            }
694        }
695    } else {
696        // Windows cross-user: wait on the raw process handle
697        #[cfg(windows)]
698        {
699            win32_process_handle.map(|h| {
700                use std::os::windows::process::ExitStatusExt;
701                use windows::Win32::System::Threading::{GetExitCodeProcess, WaitForSingleObject};
702                unsafe {
703                    let _ = WaitForSingleObject(h, 60000);
704                    let mut code = 0u32;
705                    let _ = GetExitCodeProcess(h, &mut code);
706                    let _ = windows::Win32::Foundation::CloseHandle(h);
707                    std::process::ExitStatus::from_raw(code)
708                }
709            })
710        }
711        #[cfg(not(windows))]
712        {
713            None
714        }
715    };
716
717    let exit_code = exit_status.and_then(|s| s.code());
718    session_log!(
719        info,
720        session_id,
721        LogContent::PROCESS_CONTROL,
722        "Process exit code: {}",
723        exit_code.map_or("N/A".to_string(), |c| c.to_string())
724    );
725
726    let state = if timed_out {
727        ActionState::Timeout
728    } else if cancel_requested || cancel_token.is_cancelled() {
729        ActionState::Canceled
730    } else if saw_fail {
731        ActionState::Failed
732    } else if exit_status.is_some_and(|s| s.success()) {
733        ActionState::Success
734    } else {
735        ActionState::Failed
736    };
737
738    Ok(SubprocessResult {
739        state,
740        exit_code,
741        stdout: stdout_collected,
742    })
743}
744
745pub(crate) fn process_line(
746    line: &str,
747    filter: &mut ActionFilter,
748    session_id: &str,
749    message_tx: &mpsc::UnboundedSender<ActionMessage>,
750    saw_fail: &mut bool,
751) -> (String, bool) {
752    let (callbacks, pass_through, display) = filter.filter_message(line, session_id);
753    for cb in callbacks {
754        let cancel = cb.cancel;
755        let msg = match cb.kind {
756            ActionMessageKind::Progress => {
757                if let ActionMessageValue::Float(v) = cb.value {
758                    Some(ActionMessage::Progress(v))
759                } else {
760                    None
761                }
762            }
763            ActionMessageKind::Status => {
764                if let ActionMessageValue::String(s) = cb.value {
765                    Some(ActionMessage::Status(s))
766                } else {
767                    None
768                }
769            }
770            ActionMessageKind::Fail => {
771                if let ActionMessageValue::String(s) = cb.value {
772                    *saw_fail = true;
773                    Some(ActionMessage::Fail(s))
774                } else {
775                    None
776                }
777            }
778            ActionMessageKind::Env => {
779                if let ActionMessageValue::EnvVar { name, value } = cb.value {
780                    Some(ActionMessage::SetEnv { name, value })
781                } else {
782                    None
783                }
784            }
785            ActionMessageKind::UnsetEnv => {
786                if let ActionMessageValue::String(name) = cb.value {
787                    Some(ActionMessage::UnsetEnv { name })
788                } else {
789                    None
790                }
791            }
792            ActionMessageKind::RedactedEnv => {
793                if let ActionMessageValue::EnvVar { name, value } = cb.value {
794                    Some(ActionMessage::RedactedEnv { name, value })
795                } else {
796                    None
797                }
798            }
799            _ => None,
800        };
801        if let Some(msg) = msg {
802            let _ = message_tx.send(msg);
803        }
804        if cancel {
805            let fail_msg = "Action canceled due to malformed command".to_string();
806            let _ = message_tx.send(ActionMessage::CancelMarkFailed {
807                fail_message: fail_msg,
808            });
809        }
810    }
811    (display, pass_through)
812}
813
814#[cfg(test)]
815mod tests {
816    #[allow(unused_imports)]
817    use super::*;
818
819    #[cfg(unix)]
820    #[tokio::test]
821    async fn test_cancel_ntt_with_zero_time_limit_is_immediate() {
822        use tokio_util::sync::CancellationToken;
823
824        let token = CancellationToken::new();
825        let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
826        let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
827
828        let config = SubprocessConfig {
829            args: vec!["sleep".into(), "30".into()],
830            env_vars: HashMap::new(),
831            working_dir: None,
832            timeout: None,
833            user: None,
834            cancel_method: CancelMethod::NotifyThenTerminate {
835                terminate_delay: Duration::from_secs(60),
836            },
837            cancel_request_rx: Some(cancel_rx),
838            debug_collect_stdout: false,
839        };
840
841        let t = token.clone();
842        tokio::spawn(async move {
843            tokio::time::sleep(Duration::from_millis(200)).await;
844            let _ = _cancel_tx.send(Some(Duration::ZERO));
845            t.cancel();
846        });
847
848        let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
849        let start = std::time::Instant::now();
850        let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
851            .await
852            .unwrap();
853        let elapsed = start.elapsed();
854
855        assert_eq!(result.state, ActionState::Canceled);
856        assert!(
857            elapsed < Duration::from_secs(5),
858            "took {:?}, expected < 5s",
859            elapsed
860        );
861    }
862
863    #[cfg(unix)]
864    #[tokio::test]
865    async fn test_cancel_ntt_without_time_limit_uses_default() {
866        use tokio_util::sync::CancellationToken;
867
868        let token = CancellationToken::new();
869        let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
870        let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
871
872        // Use python3 to reliably ignore SIGTERM in a single process. Using
873        // `sh -c 'trap "" TERM; sleep 30'` is fragile: some sh implementations
874        // install a userspace handler for the trap instead of SIG_IGN, so the
875        // SIG_IGN inheritance rule does not protect the child `sleep`, and
876        // killpg(SIGTERM) kills `sleep` — terminating the script within ms
877        // rather than waiting for the 1s SIGKILL.
878        //
879        // The script writes a sentinel file after installing the handler so
880        // the test can wait for readiness before canceling — avoiding a race
881        // where SIGTERM arrives before SIG_IGN is installed.
882        let ready_dir = tempfile::tempdir().unwrap();
883        let ready_path = ready_dir.path().join("ready");
884        let py_script = format!(
885            "import signal, time, pathlib; signal.signal(signal.SIGTERM, signal.SIG_IGN); pathlib.Path('{}').write_text('ok'); time.sleep(30)",
886            ready_path.display()
887        );
888        let config = SubprocessConfig {
889            args: vec!["python3".into(), "-c".into(), py_script],
890            env_vars: HashMap::new(),
891            working_dir: None,
892            timeout: None,
893            user: None,
894            cancel_method: CancelMethod::NotifyThenTerminate {
895                terminate_delay: Duration::from_secs(1),
896            },
897            cancel_request_rx: Some(cancel_rx),
898            debug_collect_stdout: false,
899        };
900
901        let t = token.clone();
902        let ready_path_clone = ready_path.clone();
903        tokio::spawn(async move {
904            // Wait for the python process to install its SIGTERM handler
905            for _ in 0..100 {
906                if ready_path_clone.exists() {
907                    break;
908                }
909                tokio::time::sleep(Duration::from_millis(50)).await;
910            }
911            t.cancel();
912        });
913
914        let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
915        let start = std::time::Instant::now();
916        let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
917            .await
918            .unwrap();
919        let elapsed = start.elapsed();
920
921        assert_eq!(result.state, ActionState::Canceled);
922        // After cancel, the 1s terminate_delay must elapse before SIGKILL
923        assert!(
924            elapsed >= Duration::from_millis(800),
925            "took {:?}, expected >= 800ms",
926            elapsed
927        );
928        assert!(
929            elapsed < Duration::from_secs(10),
930            "took {:?}, expected < 10s",
931            elapsed
932        );
933    }
934
935    #[cfg(unix)]
936    #[tokio::test]
937    async fn test_cancel_terminate_ignores_time_limit() {
938        use tokio_util::sync::CancellationToken;
939
940        let token = CancellationToken::new();
941        let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
942        let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
943
944        let config = SubprocessConfig {
945            args: vec!["sleep".into(), "30".into()],
946            env_vars: HashMap::new(),
947            working_dir: None,
948            timeout: None,
949            user: None,
950            cancel_method: CancelMethod::Terminate,
951            cancel_request_rx: Some(cancel_rx),
952            debug_collect_stdout: false,
953        };
954
955        let t = token.clone();
956        tokio::spawn(async move {
957            tokio::time::sleep(Duration::from_millis(200)).await;
958            let _ = _cancel_tx.send(Some(Duration::from_secs(10)));
959            t.cancel();
960        });
961
962        let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
963        let start = std::time::Instant::now();
964        let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
965            .await
966            .unwrap();
967        let elapsed = start.elapsed();
968
969        assert_eq!(result.state, ActionState::Canceled);
970        assert!(
971            elapsed < Duration::from_secs(2),
972            "took {:?}, expected < 2s",
973            elapsed
974        );
975    }
976
977    #[cfg(windows)]
978    #[tokio::test]
979    async fn test_cancel_terminate_on_windows() {
980        use tokio_util::sync::CancellationToken;
981
982        let token = CancellationToken::new();
983        let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(None);
984        let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
985
986        // Use powershell sleep which is a real process (not a shell builtin)
987        let config = SubprocessConfig {
988            args: vec![
989                "powershell".into(),
990                "-Command".into(),
991                "Start-Sleep 30".into(),
992            ],
993            env_vars: HashMap::new(),
994            working_dir: None,
995            timeout: None,
996            user: None,
997            cancel_method: CancelMethod::Terminate,
998            cancel_request_rx: Some(cancel_rx),
999            debug_collect_stdout: false,
1000        };
1001
1002        let t = token.clone();
1003        tokio::spawn(async move {
1004            tokio::time::sleep(Duration::from_millis(500)).await;
1005            t.cancel();
1006        });
1007
1008        let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
1009        let start = std::time::Instant::now();
1010        let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
1011            .await
1012            .unwrap();
1013        let elapsed = start.elapsed();
1014
1015        assert_eq!(result.state, ActionState::Canceled);
1016        assert!(
1017            elapsed < Duration::from_secs(5),
1018            "Cancel took {:?}, expected < 5s — process was not killed promptly",
1019            elapsed
1020        );
1021    }
1022
1023    #[test]
1024    fn test_format_command_for_log_simple() {
1025        let args = vec!["echo".to_string(), "hello".to_string(), "world".to_string()];
1026        let result = format_command_for_log(&args);
1027        assert_eq!(result, "echo hello world");
1028    }
1029
1030    #[test]
1031    fn test_format_command_for_log_with_spaces() {
1032        let args = vec!["echo".to_string(), "hello world".to_string()];
1033        let result = format_command_for_log(&args);
1034        // Should be shell-quoted
1035        assert!(result.contains("hello world"), "got: {result}");
1036    }
1037
1038    #[test]
1039    fn test_format_command_for_log_redacts_secret() {
1040        let args = vec![
1041            "python".to_string(),
1042            "-c".to_string(),
1043            "print('openjd_redacted_env: PASSWORD=secret123')".to_string(),
1044        ];
1045        let result = format_command_for_log(&args);
1046        assert!(!result.contains("secret123"), "secret leaked in: {result}");
1047        assert!(
1048            result.contains("openjd_redacted_env:"),
1049            "token missing in: {result}"
1050        );
1051        assert!(
1052            result.contains("********"),
1053            "redaction missing in: {result}"
1054        );
1055    }
1056
1057    #[test]
1058    fn test_format_command_for_log_no_redaction_needed() {
1059        let args = vec![
1060            "python".to_string(),
1061            "-c".to_string(),
1062            "print('hello')".to_string(),
1063        ];
1064        let result = format_command_for_log(&args);
1065        assert!(
1066            result.contains("print('hello')") || result.contains("print"),
1067            "got: {result}"
1068        );
1069        assert!(!result.contains("********"));
1070    }
1071
1072    // ── Tier 1: Pure function tests ──────────────────────────────────
1073
1074    #[test]
1075    fn test_days_to_ymd_epoch() {
1076        assert_eq!(days_to_ymd(0), (1970, 1, 1));
1077    }
1078
1079    #[test]
1080    fn test_days_to_ymd_known_date() {
1081        // 2024-02-29 is a leap day. Days since epoch = 19782
1082        assert_eq!(days_to_ymd(19782), (2024, 2, 29));
1083    }
1084
1085    #[test]
1086    fn test_days_to_ymd_end_of_year() {
1087        // 2023-12-31 = day 19722
1088        assert_eq!(days_to_ymd(19722), (2023, 12, 31));
1089    }
1090
1091    #[test]
1092    fn test_days_to_ymd_y2k() {
1093        // 2000-01-01 = day 10957
1094        assert_eq!(days_to_ymd(10957), (2000, 1, 1));
1095    }
1096
1097    #[test]
1098    fn test_write_cancel_info_creates_file() {
1099        let dir = tempfile::tempdir().unwrap();
1100        write_cancel_info(dir.path(), Duration::from_secs(30));
1101        let path = dir.path().join("cancel_info.json");
1102        assert!(path.exists());
1103        let content: serde_json::Value =
1104            serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
1105        let ts = content["NotifyEnd"].as_str().unwrap();
1106        assert!(ts.ends_with('Z'), "Expected UTC timestamp, got: {ts}");
1107        assert!(ts.contains('T'), "Expected ISO 8601, got: {ts}");
1108    }
1109
1110    #[test]
1111    fn test_process_line_plain_text() {
1112        let (tx, _rx) = mpsc::unbounded_channel();
1113        let mut filter = ActionFilter::new("test", true, false);
1114        let mut saw_fail = false;
1115        let (display, pass_through) =
1116            process_line("hello world", &mut filter, "test", &tx, &mut saw_fail);
1117        assert!(pass_through);
1118        assert_eq!(display, "hello world");
1119        assert!(!saw_fail);
1120    }
1121
1122    #[test]
1123    fn test_process_line_progress() {
1124        let (tx, mut rx) = mpsc::unbounded_channel();
1125        let mut filter = ActionFilter::new("test", true, false);
1126        let mut saw_fail = false;
1127        let (_display, _pass_through) = process_line(
1128            "openjd_progress: 0.5",
1129            &mut filter,
1130            "test",
1131            &tx,
1132            &mut saw_fail,
1133        );
1134        assert!(!saw_fail);
1135        match rx.try_recv().unwrap() {
1136            ActionMessage::Progress(v) => assert!((v - 0.5).abs() < f64::EPSILON),
1137            other => panic!("Expected Progress, got: {other:?}"),
1138        }
1139    }
1140
1141    #[test]
1142    fn test_process_line_status() {
1143        let (tx, mut rx) = mpsc::unbounded_channel();
1144        let mut filter = ActionFilter::new("test", true, false);
1145        let mut saw_fail = false;
1146        process_line(
1147            "openjd_status: rendering frame 42",
1148            &mut filter,
1149            "test",
1150            &tx,
1151            &mut saw_fail,
1152        );
1153        assert!(!saw_fail);
1154        match rx.try_recv().unwrap() {
1155            ActionMessage::Status(s) => assert_eq!(s, "rendering frame 42"),
1156            other => panic!("Expected Status, got: {other:?}"),
1157        }
1158    }
1159
1160    #[test]
1161    fn test_process_line_fail() {
1162        let (tx, mut rx) = mpsc::unbounded_channel();
1163        let mut filter = ActionFilter::new("test", true, false);
1164        let mut saw_fail = false;
1165        process_line(
1166            "openjd_fail: out of memory",
1167            &mut filter,
1168            "test",
1169            &tx,
1170            &mut saw_fail,
1171        );
1172        assert!(saw_fail);
1173        match rx.try_recv().unwrap() {
1174            ActionMessage::Fail(s) => assert_eq!(s, "out of memory"),
1175            other => panic!("Expected Fail, got: {other:?}"),
1176        }
1177    }
1178
1179    #[test]
1180    fn test_process_line_env() {
1181        let (tx, mut rx) = mpsc::unbounded_channel();
1182        let mut filter = ActionFilter::new("test", true, false);
1183        let mut saw_fail = false;
1184        process_line(
1185            "openjd_env: MY_VAR=my_value",
1186            &mut filter,
1187            "test",
1188            &tx,
1189            &mut saw_fail,
1190        );
1191        match rx.try_recv().unwrap() {
1192            ActionMessage::SetEnv { name, value } => {
1193                assert_eq!(name, "MY_VAR");
1194                assert_eq!(value, "my_value");
1195            }
1196            other => panic!("Expected SetEnv, got: {other:?}"),
1197        }
1198    }
1199
1200    #[test]
1201    fn test_process_line_unset_env() {
1202        let (tx, mut rx) = mpsc::unbounded_channel();
1203        let mut filter = ActionFilter::new("test", true, false);
1204        let mut saw_fail = false;
1205        process_line(
1206            "openjd_unset_env: MY_VAR",
1207            &mut filter,
1208            "test",
1209            &tx,
1210            &mut saw_fail,
1211        );
1212        match rx.try_recv().unwrap() {
1213            ActionMessage::UnsetEnv { name } => assert_eq!(name, "MY_VAR"),
1214            other => panic!("Expected UnsetEnv, got: {other:?}"),
1215        }
1216    }
1217
1218    #[test]
1219    fn test_process_line_redacted_env() {
1220        let (tx, mut rx) = mpsc::unbounded_channel();
1221        let mut filter = ActionFilter::new("test", true, false);
1222        let mut saw_fail = false;
1223        process_line(
1224            "openjd_redacted_env: SECRET=hunter2",
1225            &mut filter,
1226            "test",
1227            &tx,
1228            &mut saw_fail,
1229        );
1230        match rx.try_recv().unwrap() {
1231            ActionMessage::RedactedEnv { name, value } => {
1232                assert_eq!(name, "SECRET");
1233                assert_eq!(value, "hunter2");
1234            }
1235            other => panic!("Expected RedactedEnv, got: {other:?}"),
1236        }
1237    }
1238
1239    // ── Tier 2: Same-user integration tests ──────────────────────────
1240
1241    #[cfg(unix)]
1242    fn run_simple(args: Vec<String>) -> (SubprocessResult, Vec<ActionMessage>) {
1243        run_with_config(SubprocessConfig {
1244            args,
1245            env_vars: HashMap::new(),
1246            working_dir: None,
1247            timeout: None,
1248            user: None,
1249            cancel_method: CancelMethod::Terminate,
1250            cancel_request_rx: None,
1251            debug_collect_stdout: true,
1252        })
1253    }
1254
1255    #[cfg(unix)]
1256    fn run_with_config(config: SubprocessConfig) -> (SubprocessResult, Vec<ActionMessage>) {
1257        let rt = tokio::runtime::Builder::new_current_thread()
1258            .enable_all()
1259            .build()
1260            .unwrap();
1261        rt.block_on(async {
1262            let (msg_tx, mut msg_rx) = mpsc::unbounded_channel();
1263            let mut filter = ActionFilter::new("test", true, false);
1264            let token = CancellationToken::new();
1265            let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
1266                .await
1267                .unwrap();
1268            let mut msgs = Vec::new();
1269            while let Ok(m) = msg_rx.try_recv() {
1270                msgs.push(m);
1271            }
1272            (result, msgs)
1273        })
1274    }
1275
1276    #[cfg(unix)]
1277    #[test]
1278    fn test_run_subprocess_success() {
1279        let (r, _) = run_simple(vec!["echo".into(), "hello".into()]);
1280        assert_eq!(r.state, ActionState::Success);
1281        assert_eq!(r.exit_code, Some(0));
1282        assert!(r.stdout.contains("hello"), "stdout: {}", r.stdout);
1283    }
1284
1285    #[cfg(unix)]
1286    #[test]
1287    fn test_run_subprocess_failure_exit_code() {
1288        let (r, _) = run_simple(vec!["sh".into(), "-c".into(), "exit 42".into()]);
1289        assert_eq!(r.state, ActionState::Failed);
1290        assert_eq!(r.exit_code, Some(42));
1291    }
1292
1293    #[cfg(unix)]
1294    #[test]
1295    fn test_run_subprocess_command_not_found() {
1296        let rt = tokio::runtime::Builder::new_current_thread()
1297            .enable_all()
1298            .build()
1299            .unwrap();
1300        let err = rt.block_on(async {
1301            let (msg_tx, _) = mpsc::unbounded_channel();
1302            let mut filter = ActionFilter::new("test", true, false);
1303            let token = CancellationToken::new();
1304            let config = SubprocessConfig {
1305                args: vec!["/nonexistent/binary_xyz".into()],
1306                env_vars: HashMap::new(),
1307                working_dir: None,
1308                timeout: None,
1309                user: None,
1310                cancel_method: CancelMethod::Terminate,
1311                cancel_request_rx: None,
1312                debug_collect_stdout: false,
1313            };
1314            run_subprocess(config, &mut filter, "test", msg_tx, token).await
1315        });
1316        assert!(err.is_err());
1317        let msg = err.unwrap_err().to_string();
1318        assert!(msg.contains("/nonexistent/binary_xyz"), "error: {msg}");
1319    }
1320
1321    #[cfg(unix)]
1322    #[test]
1323    fn test_run_subprocess_empty_args() {
1324        let rt = tokio::runtime::Builder::new_current_thread()
1325            .enable_all()
1326            .build()
1327            .unwrap();
1328        let err = rt.block_on(async {
1329            let (msg_tx, _) = mpsc::unbounded_channel();
1330            let mut filter = ActionFilter::new("test", true, false);
1331            let token = CancellationToken::new();
1332            let config = SubprocessConfig {
1333                args: vec![],
1334                env_vars: HashMap::new(),
1335                working_dir: None,
1336                timeout: None,
1337                user: None,
1338                cancel_method: CancelMethod::Terminate,
1339                cancel_request_rx: None,
1340                debug_collect_stdout: false,
1341            };
1342            run_subprocess(config, &mut filter, "test", msg_tx, token).await
1343        });
1344        assert!(err.is_err());
1345        assert!(
1346            err.unwrap_err().to_string().contains("No command"),
1347            "expected empty args error"
1348        );
1349    }
1350
1351    #[cfg(unix)]
1352    #[test]
1353    fn test_run_subprocess_timeout() {
1354        let rt = tokio::runtime::Builder::new_current_thread()
1355            .enable_all()
1356            .build()
1357            .unwrap();
1358        let (r, _) = rt.block_on(async {
1359            let (msg_tx, mut msg_rx) = mpsc::unbounded_channel();
1360            let mut filter = ActionFilter::new("test", true, false);
1361            let token = CancellationToken::new();
1362            let config = SubprocessConfig {
1363                args: vec!["sleep".into(), "30".into()],
1364                env_vars: HashMap::new(),
1365                working_dir: None,
1366                timeout: Some(Duration::from_millis(500)),
1367                user: None,
1368                cancel_method: CancelMethod::Terminate,
1369                cancel_request_rx: None,
1370                debug_collect_stdout: false,
1371            };
1372            let r = run_subprocess(config, &mut filter, "test", msg_tx, token)
1373                .await
1374                .unwrap();
1375            let mut msgs = Vec::new();
1376            while let Ok(m) = msg_rx.try_recv() {
1377                msgs.push(m);
1378            }
1379            (r, msgs)
1380        });
1381        assert_eq!(r.state, ActionState::Timeout);
1382    }
1383
1384    #[cfg(unix)]
1385    #[test]
1386    fn test_run_subprocess_timeout_drains_stdout() {
1387        let rt = tokio::runtime::Builder::new_current_thread()
1388            .enable_all()
1389            .build()
1390            .unwrap();
1391        let (r, _) = rt.block_on(async {
1392            let (msg_tx, mut msg_rx) = mpsc::unbounded_channel();
1393            let mut filter = ActionFilter::new("test", true, false);
1394            let token = CancellationToken::new();
1395            let config = SubprocessConfig {
1396                args: vec![
1397                    "sh".into(),
1398                    "-c".into(),
1399                    "echo before_timeout; sleep 30".into(),
1400                ],
1401                env_vars: HashMap::new(),
1402                working_dir: None,
1403                timeout: Some(Duration::from_millis(500)),
1404                user: None,
1405                cancel_method: CancelMethod::Terminate,
1406                cancel_request_rx: None,
1407                debug_collect_stdout: true,
1408            };
1409            let r = run_subprocess(config, &mut filter, "test", msg_tx, token)
1410                .await
1411                .unwrap();
1412            let mut msgs = Vec::new();
1413            while let Ok(m) = msg_rx.try_recv() {
1414                msgs.push(m);
1415            }
1416            (r, msgs)
1417        });
1418        assert_eq!(r.state, ActionState::Timeout);
1419        assert!(
1420            r.stdout.contains("before_timeout"),
1421            "output before timeout should be captured: {:?}",
1422            r.stdout
1423        );
1424    }
1425
1426    #[cfg(unix)]
1427    #[test]
1428    fn test_run_subprocess_env_vars() {
1429        let mut env = HashMap::new();
1430        env.insert("OPENJD_TEST_VAR".into(), Some("test_value_42".into()));
1431        let (r, _) = run_with_config(SubprocessConfig {
1432            args: vec!["sh".into(), "-c".into(), "echo $OPENJD_TEST_VAR".into()],
1433            env_vars: env,
1434            working_dir: None,
1435            timeout: None,
1436            user: None,
1437            cancel_method: CancelMethod::Terminate,
1438            cancel_request_rx: None,
1439            debug_collect_stdout: true,
1440        });
1441        assert_eq!(r.state, ActionState::Success);
1442        assert!(r.stdout.contains("test_value_42"), "stdout: {}", r.stdout);
1443    }
1444
1445    #[cfg(unix)]
1446    #[test]
1447    fn test_run_subprocess_env_var_unset() {
1448        // Set a var then unset it — should not appear in child
1449        std::env::set_var("OPENJD_UNSET_TEST", "should_be_gone");
1450        let mut env = HashMap::new();
1451        env.insert("OPENJD_UNSET_TEST".into(), None);
1452        let (r, _) = run_with_config(SubprocessConfig {
1453            args: vec![
1454                "sh".into(),
1455                "-c".into(),
1456                "echo VAL=${OPENJD_UNSET_TEST:-UNSET}".into(),
1457            ],
1458            env_vars: env,
1459            working_dir: None,
1460            timeout: None,
1461            user: None,
1462            cancel_method: CancelMethod::Terminate,
1463            cancel_request_rx: None,
1464            debug_collect_stdout: true,
1465        });
1466        assert_eq!(r.state, ActionState::Success);
1467        assert!(r.stdout.contains("VAL=UNSET"), "stdout: {}", r.stdout);
1468    }
1469
1470    #[cfg(unix)]
1471    #[test]
1472    fn test_run_subprocess_working_dir() {
1473        let dir = tempfile::tempdir().unwrap();
1474        let (r, _) = run_with_config(SubprocessConfig {
1475            args: vec!["pwd".into()],
1476            env_vars: HashMap::new(),
1477            working_dir: Some(dir.path().to_path_buf()),
1478            timeout: None,
1479            user: None,
1480            cancel_method: CancelMethod::Terminate,
1481            cancel_request_rx: None,
1482            debug_collect_stdout: true,
1483        });
1484        assert_eq!(r.state, ActionState::Success);
1485        // Resolve symlinks for comparison (macOS /tmp -> /private/tmp)
1486        let expected = dir.path().canonicalize().unwrap();
1487        let actual = PathBuf::from(r.stdout.trim()).canonicalize().unwrap();
1488        assert_eq!(actual, expected);
1489    }
1490
1491    #[cfg(unix)]
1492    #[test]
1493    fn test_run_subprocess_openjd_progress() {
1494        let (r, msgs) = run_simple(vec![
1495            "sh".into(),
1496            "-c".into(),
1497            "echo 'openjd_progress: 0.75'".into(),
1498        ]);
1499        assert_eq!(r.state, ActionState::Success);
1500        assert!(
1501            msgs.iter().any(
1502                |m| matches!(m, ActionMessage::Progress(v) if (*v - 0.75).abs() < f64::EPSILON)
1503            ),
1504            "Expected Progress(0.75), got: {msgs:?}"
1505        );
1506    }
1507
1508    #[cfg(unix)]
1509    #[test]
1510    fn test_run_subprocess_openjd_status() {
1511        let (r, msgs) = run_simple(vec![
1512            "sh".into(),
1513            "-c".into(),
1514            "echo 'openjd_status: rendering'".into(),
1515        ]);
1516        assert_eq!(r.state, ActionState::Success);
1517        assert!(
1518            msgs.iter()
1519                .any(|m| matches!(m, ActionMessage::Status(s) if s == "rendering")),
1520            "Expected Status(rendering), got: {msgs:?}"
1521        );
1522    }
1523
1524    #[cfg(unix)]
1525    #[test]
1526    fn test_run_subprocess_openjd_fail_sets_failed() {
1527        let (r, msgs) = run_simple(vec![
1528            "sh".into(),
1529            "-c".into(),
1530            "echo 'openjd_fail: something broke'".into(),
1531        ]);
1532        assert_eq!(
1533            r.state,
1534            ActionState::Failed,
1535            "openjd_fail should cause Failed state even with exit 0"
1536        );
1537        assert!(
1538            msgs.iter()
1539                .any(|m| matches!(m, ActionMessage::Fail(s) if s == "something broke")),
1540            "Expected Fail message, got: {msgs:?}"
1541        );
1542    }
1543
1544    #[cfg(unix)]
1545    #[test]
1546    fn test_run_subprocess_openjd_env() {
1547        let (r, msgs) = run_simple(vec![
1548            "sh".into(),
1549            "-c".into(),
1550            "echo 'openjd_env: FOO=bar'".into(),
1551        ]);
1552        assert_eq!(r.state, ActionState::Success);
1553        assert!(msgs.iter().any(|m| matches!(m, ActionMessage::SetEnv { name, value } if name == "FOO" && value == "bar")),
1554            "Expected SetEnv, got: {msgs:?}");
1555    }
1556
1557    #[cfg(unix)]
1558    #[test]
1559    fn test_run_subprocess_stderr_merged() {
1560        // stderr should be merged into stdout
1561        let (r, _) = run_simple(vec![
1562            "sh".into(),
1563            "-c".into(),
1564            "echo stdout_line; echo stderr_line >&2".into(),
1565        ]);
1566        assert_eq!(r.state, ActionState::Success);
1567        assert!(r.stdout.contains("stdout_line"), "stdout: {}", r.stdout);
1568        assert!(
1569            r.stdout.contains("stderr_line"),
1570            "stderr should be merged into stdout: {}",
1571            r.stdout
1572        );
1573    }
1574
1575    #[cfg(unix)]
1576    #[test]
1577    fn test_run_subprocess_multiline_output() {
1578        let (r, _) = run_simple(vec![
1579            "sh".into(),
1580            "-c".into(),
1581            "echo line1; echo line2; echo line3".into(),
1582        ]);
1583        assert_eq!(r.state, ActionState::Success);
1584        assert!(r.stdout.contains("line1\n"), "stdout: {:?}", r.stdout);
1585        assert!(r.stdout.contains("line2\n"), "stdout: {:?}", r.stdout);
1586        assert!(r.stdout.contains("line3\n"), "stdout: {:?}", r.stdout);
1587    }
1588
1589    #[cfg(unix)]
1590    #[test]
1591    fn test_run_subprocess_debug_collect_stdout_false_by_default() {
1592        let (r, _) = run_simple(vec!["echo".into(), "hello".into()]);
1593        // run_simple sets debug_collect_stdout: true, so stdout is captured
1594        assert!(r.stdout.contains("hello"));
1595
1596        // With debug_collect_stdout: false (default), stdout should be empty
1597        let rt = tokio::runtime::Builder::new_current_thread()
1598            .enable_all()
1599            .build()
1600            .unwrap();
1601        let r = rt.block_on(async {
1602            let (msg_tx, _) = mpsc::unbounded_channel();
1603            let mut filter = ActionFilter::new("test", true, false);
1604            let token = CancellationToken::new();
1605            let config = SubprocessConfig {
1606                args: vec!["echo".into(), "hello".into()],
1607                env_vars: HashMap::new(),
1608                working_dir: None,
1609                timeout: None,
1610                user: None,
1611                cancel_method: CancelMethod::Terminate,
1612                cancel_request_rx: None,
1613                debug_collect_stdout: false,
1614            };
1615            run_subprocess(config, &mut filter, "test", msg_tx, token)
1616                .await
1617                .unwrap()
1618        });
1619        assert_eq!(r.state, ActionState::Success);
1620        assert!(
1621            r.stdout.is_empty(),
1622            "stdout should be empty when debug_collect_stdout is false: {:?}",
1623            r.stdout
1624        );
1625    }
1626
1627    #[test]
1628    fn test_truncate_line_multibyte_boundary() {
1629        // '€' is 3 bytes in UTF-8. LOG_LINE_MAX_LENGTH (65536) % 3 == 1,
1630        // so the byte boundary falls inside a multi-byte character.
1631        let s = "€".repeat(LOG_LINE_MAX_LENGTH); // 3 * LOG_LINE_MAX_LENGTH bytes
1632        let truncated = truncate_line(&s);
1633        assert!(truncated.len() <= LOG_LINE_MAX_LENGTH);
1634        // Must be valid UTF-8 (the fact that we can call .chars() without panic proves it)
1635        assert!(truncated.chars().count() > 0);
1636    }
1637
1638    #[test]
1639    fn test_truncate_line_short_line_unchanged() {
1640        let s = "hello";
1641        assert_eq!(truncate_line(s), "hello");
1642    }
1643
1644    #[cfg(unix)]
1645    #[test]
1646    fn test_run_subprocess_invalid_utf8_continues() {
1647        // printf outputs raw bytes: valid line, then 0xFF (invalid UTF-8), then another valid line.
1648        // All lines including those after invalid bytes must be captured.
1649        let (r, _) = run_simple(vec![
1650            "sh".into(),
1651            "-c".into(),
1652            r#"echo before; printf '\xff\n'; echo after"#.into(),
1653        ]);
1654        assert_eq!(r.state, ActionState::Success);
1655        assert!(
1656            r.stdout.contains("before"),
1657            "line before invalid UTF-8 should be captured: {:?}",
1658            r.stdout
1659        );
1660        assert!(
1661            r.stdout.contains("after"),
1662            "line after invalid UTF-8 should be captured: {:?}",
1663            r.stdout
1664        );
1665    }
1666
1667    #[cfg(unix)]
1668    #[test]
1669    fn test_run_subprocess_progress_error_in_stdout() {
1670        let (r, _) = run_simple(vec![
1671            "sh".into(),
1672            "-c".into(),
1673            "echo 'openjd_progress: 200.0'".into(),
1674        ]);
1675        assert!(
1676            r.stdout.contains("ERROR"),
1677            "out-of-range progress error should appear in stdout: {:?}",
1678            r.stdout
1679        );
1680    }
1681
1682    /// When a process exits with non-zero and the cancel token has been
1683    /// cancelled, the result should be `Canceled` not `Failed`.
1684    ///
1685    /// This covers the pyo3 binding's cancel path where the cross-user helper
1686    /// kills the process (non-zero exit) while the token is cancelled, but the
1687    /// select loop's cancel branch may not have fired (so `cancel_requested`
1688    /// could be false). The `is_cancelled()` check in state determination
1689    /// ensures the correct result regardless of select ordering.
1690    #[cfg(unix)]
1691    #[tokio::test]
1692    async fn test_cancel_token_set_but_process_killed_externally() {
1693        use tokio_util::sync::CancellationToken;
1694
1695        let token = CancellationToken::new();
1696        let (msg_tx, _msg_rx) = tokio::sync::mpsc::unbounded_channel();
1697
1698        // Process that exits immediately with non-zero
1699        let config = SubprocessConfig {
1700            args: vec!["sh".into(), "-c".into(), "exit 42".into()],
1701            env_vars: HashMap::new(),
1702            working_dir: None,
1703            timeout: None,
1704            user: None,
1705            cancel_method: CancelMethod::Terminate,
1706            cancel_request_rx: None,
1707            debug_collect_stdout: false,
1708        };
1709
1710        // Cancel from OS thread — simulates the pyo3 binding's cancel path
1711        let token_clone = token.clone();
1712        std::thread::spawn(move || {
1713            token_clone.cancel();
1714        });
1715
1716        // Small yield to let the OS thread run
1717        tokio::time::sleep(Duration::from_millis(1)).await;
1718
1719        let mut filter = crate::action_filter::ActionFilter::new("test", true, false);
1720        let result = run_subprocess(config, &mut filter, "test", msg_tx, token)
1721            .await
1722            .unwrap();
1723
1724        // The token is cancelled and the process exited non-zero.
1725        // The result must be Canceled, not Failed.
1726        assert_eq!(
1727            result.state,
1728            ActionState::Canceled,
1729            "Non-zero exit with cancelled token should be Canceled, not {:?}",
1730            result.state
1731        );
1732    }
1733}