Skip to main content

running_process_core/
lib.rs

1use std::collections::VecDeque;
2use std::fs::OpenOptions;
3use std::io::Read;
4use std::io::Write;
5use std::path::PathBuf;
6use std::process::{Child, Command, Stdio};
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::thread;
10use std::time::{Duration, Instant};
11
12use thiserror::Error;
13
14pub mod console_detect;
15pub mod containment;
16#[cfg(feature = "originator-scan")]
17pub mod originator;
18pub mod pty;
19mod public_symbols;
20mod rust_debug;
21
22pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
23pub use containment::{ContainedChild, ContainedProcessGroup, Containment, ORIGINATOR_ENV_VAR};
24#[cfg(feature = "originator-scan")]
25pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
26pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
27
28#[macro_export]
29macro_rules! rp_rust_debug_scope {
30    ($label:expr) => {
31        let _running_process_rust_debug_scope =
32            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
33    };
34}
35
36const CHILD_PID_LOG_PATH_ENV: &str = "RUNNING_PROCESS_CHILD_PID_LOG_PATH";
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum StreamKind {
40    Stdout,
41    Stderr,
42}
43
44impl StreamKind {
45    pub fn as_str(self) -> &'static str {
46        match self {
47            Self::Stdout => "stdout",
48            Self::Stderr => "stderr",
49        }
50    }
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct StreamEvent {
55    pub stream: StreamKind,
56    pub line: Vec<u8>,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum ReadStatus<T> {
61    Line(T),
62    Timeout,
63    Eof,
64}
65
66#[derive(Debug, Error)]
67pub enum ProcessError {
68    #[error("process already started")]
69    AlreadyStarted,
70    #[error("process is not running")]
71    NotRunning,
72    #[error("process stdin is not available")]
73    StdinUnavailable,
74    #[error("failed to spawn process: {0}")]
75    Spawn(std::io::Error),
76    #[error("failed to read process output: {0}")]
77    Io(std::io::Error),
78    #[error("process timed out")]
79    Timeout,
80}
81
82#[derive(Debug, Clone)]
83pub enum CommandSpec {
84    Shell(String),
85    Argv(Vec<String>),
86}
87
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum StdinMode {
90    Inherit,
91    Piped,
92    Null,
93}
94
95#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96pub enum StderrMode {
97    Stdout,
98    Pipe,
99}
100
101#[cfg(unix)]
102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
103pub enum UnixSignal {
104    Interrupt,
105    Terminate,
106    Kill,
107}
108
109#[derive(Debug, Clone)]
110pub struct ProcessConfig {
111    pub command: CommandSpec,
112    pub cwd: Option<PathBuf>,
113    pub env: Option<Vec<(String, String)>>,
114    pub capture: bool,
115    pub stderr_mode: StderrMode,
116    pub creationflags: Option<u32>,
117    pub create_process_group: bool,
118    pub stdin_mode: StdinMode,
119    pub nice: Option<i32>,
120    /// Optional containment policy. `None` preserves existing behaviour.
121    /// `Some(Contained)` sets `PR_SET_PDEATHSIG(SIGKILL)` on Linux and uses
122    /// the existing Job Object on Windows. `Some(Detached)` creates a new
123    /// session (`setsid`) on Unix so the child survives the parent.
124    pub containment: Option<Containment>,
125}
126
127#[derive(Default)]
128struct QueueState {
129    stdout_queue: VecDeque<Vec<u8>>,
130    stderr_queue: VecDeque<Vec<u8>>,
131    combined_queue: VecDeque<StreamEvent>,
132    stdout_history: VecDeque<Vec<u8>>,
133    stderr_history: VecDeque<Vec<u8>>,
134    combined_history: VecDeque<StreamEvent>,
135    stdout_history_bytes: usize,
136    stderr_history_bytes: usize,
137    combined_history_bytes: usize,
138    stdout_closed: bool,
139    stderr_closed: bool,
140}
141
142/// Sentinel value for returncode atomic: process has not exited yet.
143const RETURNCODE_NOT_SET: i64 = i64::MIN;
144
145struct SharedState {
146    queues: Mutex<QueueState>,
147    condvar: Condvar,
148    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
149    /// Updated by a background waiter thread — reading is lock-free.
150    returncode: AtomicI64,
151}
152
153#[cfg(windows)]
154struct WindowsJobHandle(usize);
155
156#[cfg(windows)]
157impl Drop for WindowsJobHandle {
158    fn drop(&mut self) {
159        unsafe {
160            winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
161        }
162    }
163}
164
165struct ChildState {
166    child: Child,
167    #[cfg(windows)]
168    _job: WindowsJobHandle,
169}
170
171impl SharedState {
172    fn new(capture: bool) -> Self {
173        let queues = QueueState {
174            stdout_closed: !capture,
175            stderr_closed: !capture,
176            ..QueueState::default()
177        };
178        Self {
179            queues: Mutex::new(queues),
180            condvar: Condvar::new(),
181            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
182        }
183    }
184}
185
186pub struct NativeProcess {
187    config: ProcessConfig,
188    child: Arc<Mutex<Option<ChildState>>>,
189    shared: Arc<SharedState>,
190}
191
192impl NativeProcess {
193    pub fn new(config: ProcessConfig) -> Self {
194        Self {
195            shared: Arc::new(SharedState::new(config.capture)),
196            child: Arc::new(Mutex::new(None)),
197            config,
198        }
199    }
200
201    // Preserve a stable Rust frame here in release user dumps.
202    #[inline(never)]
203    pub fn start(&self) -> Result<(), ProcessError> {
204        public_symbols::rp_native_process_start_public(self)
205    }
206
207    fn start_impl(&self) -> Result<(), ProcessError> {
208        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::start");
209        let mut guard = self.child.lock().expect("child mutex poisoned");
210        if guard.is_some() {
211            return Err(ProcessError::AlreadyStarted);
212        }
213
214        let mut command = self.build_command();
215        match self.config.stdin_mode {
216            StdinMode::Inherit => {}
217            StdinMode::Piped => {
218                command.stdin(Stdio::piped());
219            }
220            StdinMode::Null => {
221                command.stdin(Stdio::null());
222            }
223        }
224        if self.config.capture {
225            command.stdout(Stdio::piped());
226            command.stderr(Stdio::piped());
227        }
228
229        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
230        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
231        #[cfg(windows)]
232        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
233            .map_err(ProcessError::Spawn)?;
234        if self.config.capture {
235            let stdout = child.stdout.take().expect("stdout pipe missing");
236            let stderr = child.stderr.take().expect("stderr pipe missing");
237            self.spawn_reader(stdout, StreamKind::Stdout, StreamKind::Stdout);
238            self.spawn_reader(
239                stderr,
240                StreamKind::Stderr,
241                match self.config.stderr_mode {
242                    StderrMode::Stdout => StreamKind::Stdout,
243                    StderrMode::Pipe => StreamKind::Stderr,
244                },
245            );
246        }
247        *guard = Some(ChildState {
248            child,
249            #[cfg(windows)]
250            _job: job,
251        });
252        drop(guard);
253        self.spawn_exit_waiter();
254        Ok(())
255    }
256
257    /// Background thread that polls for process exit and stores the exit code
258    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
259    fn spawn_exit_waiter(&self) {
260        let child = Arc::clone(&self.child);
261        let shared = Arc::clone(&self.shared);
262        thread::spawn(move || loop {
263            if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
264                return;
265            }
266            {
267                let mut guard = child.lock().expect("child mutex poisoned");
268                if let Some(child_state) = guard.as_mut() {
269                    match child_state.child.try_wait() {
270                        Ok(Some(status)) => {
271                            let code = exit_code(status);
272                            shared.returncode.store(code as i64, Ordering::Release);
273                            shared.condvar.notify_all();
274                            return;
275                        }
276                        Ok(None) => {}
277                        Err(_) => return,
278                    }
279                } else {
280                    return;
281                }
282            }
283            thread::sleep(Duration::from_millis(10));
284        });
285    }
286
287    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
288        let mut guard = self.child.lock().expect("child mutex poisoned");
289        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
290        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
291        use std::io::Write;
292        stdin.write_all(data).map_err(ProcessError::Io)?;
293        stdin.flush().map_err(ProcessError::Io)?;
294        drop(child.stdin.take());
295        Ok(())
296    }
297
298    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
299        // Fast path: check atomic set by background waiter thread.
300        if let Some(code) = self.returncode() {
301            return Ok(Some(code));
302        }
303        let mut guard = self.child.lock().expect("child mutex poisoned");
304        let Some(child_state) = guard.as_mut() else {
305            return Ok(self.returncode());
306        };
307        let child = &mut child_state.child;
308        let status = child.try_wait().map_err(ProcessError::Io)?;
309        if let Some(status) = status {
310            let code = exit_code(status);
311            self.set_returncode(code);
312            return Ok(Some(code));
313        }
314        Ok(None)
315    }
316
317    // Preserve a stable Rust frame here in release user dumps.
318    #[inline(never)]
319    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
320        public_symbols::rp_native_process_wait_public(self, timeout)
321    }
322
323    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
324        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::wait");
325        if self.child.lock().expect("child mutex poisoned").is_none() {
326            return self.returncode().ok_or(ProcessError::NotRunning);
327        }
328        // Fast path: already exited.
329        if let Some(code) = self.returncode() {
330            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
331            return Ok(code);
332        }
333        let start = Instant::now();
334        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
335        loop {
336            // Check returncode (set by exit-waiter thread via atomic + condvar).
337            let rc = self.shared.returncode.load(Ordering::Acquire);
338            if rc != RETURNCODE_NOT_SET {
339                drop(guard);
340                let code = rc as i32;
341                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
342                return Ok(code);
343            }
344            if let Some(limit) = timeout {
345                let elapsed = start.elapsed();
346                if elapsed >= limit {
347                    return Err(ProcessError::Timeout);
348                }
349                let remaining = limit - elapsed;
350                // Wait on condvar with timeout, capped at 50ms to recheck.
351                let wait_time = remaining.min(Duration::from_millis(50));
352                guard = self
353                    .shared
354                    .condvar
355                    .wait_timeout(guard, wait_time)
356                    .expect("queue mutex poisoned")
357                    .0;
358            } else {
359                // Wait on condvar with periodic recheck.
360                guard = self
361                    .shared
362                    .condvar
363                    .wait_timeout(guard, Duration::from_millis(50))
364                    .expect("queue mutex poisoned")
365                    .0;
366            }
367        }
368    }
369
370    // Preserve a stable Rust frame here in release user dumps.
371    #[inline(never)]
372    pub fn kill(&self) -> Result<(), ProcessError> {
373        public_symbols::rp_native_process_kill_public(self)
374    }
375
376    fn kill_impl(&self) -> Result<(), ProcessError> {
377        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::kill");
378        let mut guard = self.child.lock().expect("child mutex poisoned");
379        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
380        child.kill().map_err(ProcessError::Io)?;
381        let status = child.wait().map_err(ProcessError::Io)?;
382        self.set_returncode(exit_code(status));
383        Ok(())
384    }
385
386    pub fn terminate(&self) -> Result<(), ProcessError> {
387        self.kill()
388    }
389
390    // Preserve a stable Rust frame here in release user dumps.
391    #[inline(never)]
392    pub fn close(&self) -> Result<(), ProcessError> {
393        public_symbols::rp_native_process_close_public(self)
394    }
395
396    fn close_impl(&self) -> Result<(), ProcessError> {
397        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::close");
398        if self.child.lock().expect("child mutex poisoned").is_none() {
399            return Ok(());
400        }
401        if self.poll()?.is_none() {
402            self.kill()?;
403        } else {
404            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
405        }
406        Ok(())
407    }
408
409    pub fn pid(&self) -> Option<u32> {
410        self.child
411            .lock()
412            .expect("child mutex poisoned")
413            .as_ref()
414            .map(|state| state.child.id())
415    }
416
417    pub fn returncode(&self) -> Option<i32> {
418        let v = self.shared.returncode.load(Ordering::Acquire);
419        if v == RETURNCODE_NOT_SET {
420            None
421        } else {
422            Some(v as i32)
423        }
424    }
425
426    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
427        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
428            return false;
429        }
430        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
431        match stream {
432            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
433            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
434        }
435    }
436
437    pub fn has_pending_combined(&self) -> bool {
438        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
439        !guard.combined_queue.is_empty()
440    }
441
442    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
443        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
444            return Vec::new();
445        }
446        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
447        let queue = match stream {
448            StreamKind::Stdout => &mut guard.stdout_queue,
449            StreamKind::Stderr => &mut guard.stderr_queue,
450        };
451        queue.drain(..).collect()
452    }
453
454    pub fn drain_combined(&self) -> Vec<StreamEvent> {
455        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
456        guard.combined_queue.drain(..).collect()
457    }
458
459    pub fn read_stream(
460        &self,
461        stream: StreamKind,
462        timeout: Option<Duration>,
463    ) -> ReadStatus<Vec<u8>> {
464        let deadline = timeout.map(|limit| Instant::now() + limit);
465        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
466
467        loop {
468            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
469                return ReadStatus::Eof;
470            }
471
472            let queue = match stream {
473                StreamKind::Stdout => &mut guard.stdout_queue,
474                StreamKind::Stderr => &mut guard.stderr_queue,
475            };
476            if let Some(line) = queue.pop_front() {
477                return ReadStatus::Line(line);
478            }
479
480            let closed = match stream {
481                StreamKind::Stdout => {
482                    if self.config.stderr_mode == StderrMode::Stdout {
483                        guard.stdout_closed && guard.stderr_closed
484                    } else {
485                        guard.stdout_closed
486                    }
487                }
488                StreamKind::Stderr => guard.stderr_closed,
489            };
490            if closed {
491                return ReadStatus::Eof;
492            }
493
494            match deadline {
495                Some(deadline) => {
496                    let now = Instant::now();
497                    if now >= deadline {
498                        return ReadStatus::Timeout;
499                    }
500                    let wait = deadline.saturating_duration_since(now);
501                    let result = self
502                        .shared
503                        .condvar
504                        .wait_timeout(guard, wait)
505                        .expect("queue mutex poisoned");
506                    guard = result.0;
507                    if result.1.timed_out() {
508                        return ReadStatus::Timeout;
509                    }
510                }
511                None => {
512                    guard = self
513                        .shared
514                        .condvar
515                        .wait(guard)
516                        .expect("queue mutex poisoned");
517                }
518            }
519        }
520    }
521
522    // Preserve a stable Rust frame here in release user dumps.
523    #[inline(never)]
524    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
525        public_symbols::rp_native_process_read_combined_public(self, timeout)
526    }
527
528    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
529        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::read_combined");
530        let deadline = timeout.map(|limit| Instant::now() + limit);
531        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
532
533        loop {
534            if let Some(event) = guard.combined_queue.pop_front() {
535                return ReadStatus::Line(event);
536            }
537            if guard.stdout_closed && guard.stderr_closed {
538                return ReadStatus::Eof;
539            }
540
541            match deadline {
542                Some(deadline) => {
543                    let now = Instant::now();
544                    if now >= deadline {
545                        return ReadStatus::Timeout;
546                    }
547                    let wait = deadline.saturating_duration_since(now);
548                    let result = self
549                        .shared
550                        .condvar
551                        .wait_timeout(guard, wait)
552                        .expect("queue mutex poisoned");
553                    guard = result.0;
554                    if result.1.timed_out() {
555                        return ReadStatus::Timeout;
556                    }
557                }
558                None => {
559                    guard = self
560                        .shared
561                        .condvar
562                        .wait(guard)
563                        .expect("queue mutex poisoned");
564                }
565            }
566        }
567    }
568
569    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
570        self.shared
571            .queues
572            .lock()
573            .expect("queue mutex poisoned")
574            .stdout_history
575            .clone()
576            .into_iter()
577            .collect()
578    }
579
580    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
581        if self.config.stderr_mode == StderrMode::Stdout {
582            return Vec::new();
583        }
584        self.shared
585            .queues
586            .lock()
587            .expect("queue mutex poisoned")
588            .stderr_history
589            .clone()
590            .into_iter()
591            .collect()
592    }
593
594    pub fn captured_combined(&self) -> Vec<StreamEvent> {
595        self.shared
596            .queues
597            .lock()
598            .expect("queue mutex poisoned")
599            .combined_history
600            .clone()
601            .into_iter()
602            .collect()
603    }
604
605    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
606        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
607            return 0;
608        }
609        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
610        match stream {
611            StreamKind::Stdout => guard.stdout_history_bytes,
612            StreamKind::Stderr => guard.stderr_history_bytes,
613        }
614    }
615
616    pub fn captured_combined_bytes(&self) -> usize {
617        self.shared
618            .queues
619            .lock()
620            .expect("queue mutex poisoned")
621            .combined_history_bytes
622    }
623
624    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
625        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
626            return 0;
627        }
628        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
629        match stream {
630            StreamKind::Stdout => {
631                let released = guard.stdout_history_bytes;
632                guard.stdout_history.clear();
633                guard.stdout_history_bytes = 0;
634                released
635            }
636            StreamKind::Stderr => {
637                let released = guard.stderr_history_bytes;
638                guard.stderr_history.clear();
639                guard.stderr_history_bytes = 0;
640                released
641            }
642        }
643    }
644
645    pub fn clear_captured_combined(&self) -> usize {
646        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
647        let released = guard.combined_history_bytes;
648        guard.combined_history.clear();
649        guard.combined_history_bytes = 0;
650        released
651    }
652
653    fn build_command(&self) -> Command {
654        let mut command = match &self.config.command {
655            CommandSpec::Shell(command) => shell_command(command),
656            CommandSpec::Argv(argv) => {
657                let mut command = Command::new(&argv[0]);
658                if argv.len() > 1 {
659                    command.args(&argv[1..]);
660                }
661                command
662            }
663        };
664        if let Some(cwd) = &self.config.cwd {
665            command.current_dir(cwd);
666        }
667        if let Some(env) = &self.config.env {
668            command.env_clear();
669            command.envs(env.iter().map(|(k, v)| (k, v)));
670        }
671        #[cfg(windows)]
672        {
673            use std::os::windows::process::CommandExt;
674
675            let flags =
676                self.config.creationflags.unwrap_or(0) | windows_priority_flags(self.config.nice);
677            if flags != 0 {
678                command.creation_flags(flags);
679            }
680        }
681        #[cfg(unix)]
682        {
683            let create_process_group = self.config.create_process_group;
684            let nice = self.config.nice;
685            let containment = self.config.containment;
686
687            let needs_pre_exec = create_process_group || nice.is_some() || containment.is_some();
688
689            if needs_pre_exec {
690                use std::os::unix::process::CommandExt;
691
692                unsafe {
693                    command.pre_exec(move || {
694                        match containment {
695                            Some(Containment::Contained) => {
696                                // Place child into its own process group.
697                                if libc::setpgid(0, 0) == -1 {
698                                    return Err(std::io::Error::last_os_error());
699                                }
700                                // Linux: ask the kernel to SIGKILL us when the
701                                // parent thread dies.
702                                // CAVEAT: PR_SET_PDEATHSIG is per-thread, not
703                                // per-process. If the spawning thread exits
704                                // before the process, the child is killed early.
705                                #[cfg(target_os = "linux")]
706                                {
707                                    if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) == -1 {
708                                        return Err(std::io::Error::last_os_error());
709                                    }
710                                    if libc::getppid() == 1 {
711                                        libc::_exit(1);
712                                    }
713                                }
714                            }
715                            Some(Containment::Detached) => {
716                                // Create a new session so the child is fully
717                                // independent of the parent.
718                                if libc::setsid() == -1 {
719                                    return Err(std::io::Error::last_os_error());
720                                }
721                            }
722                            None => {
723                                if create_process_group && libc::setpgid(0, 0) == -1 {
724                                    return Err(std::io::Error::last_os_error());
725                                }
726                            }
727                        }
728                        if let Some(nice) = nice {
729                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
730                            if result == -1 {
731                                return Err(std::io::Error::last_os_error());
732                            }
733                        }
734                        Ok(())
735                    });
736                }
737            }
738        }
739        command
740    }
741
742    fn spawn_reader<R>(&self, pipe: R, source_stream: StreamKind, visible_stream: StreamKind)
743    where
744        R: Read + Send + 'static,
745    {
746        let shared = Arc::clone(&self.shared);
747        thread::spawn(move || {
748            let mut reader = pipe;
749            let mut chunk = vec![0_u8; 65536];
750            let mut pending = Vec::new();
751
752            loop {
753                match reader.read(&mut chunk) {
754                    Ok(0) => break,
755                    Ok(n) => {
756                        let lines = feed_chunk(&mut pending, &chunk[..n]);
757                        emit_lines(&shared, visible_stream, lines);
758                    }
759                    Err(_) => break,
760                }
761            }
762
763            if !pending.is_empty() {
764                emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
765            }
766
767            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
768            match source_stream {
769                StreamKind::Stdout => guard.stdout_closed = true,
770                StreamKind::Stderr => guard.stderr_closed = true,
771            }
772            shared.condvar.notify_all();
773        });
774    }
775
776    fn set_returncode(&self, code: i32) {
777        self.shared.returncode.store(code as i64, Ordering::Release);
778        self.shared.condvar.notify_all();
779    }
780
781    fn wait_for_capture_completion_impl(&self) {
782        crate::rp_rust_debug_scope!(
783            "running_process_core::NativeProcess::wait_for_capture_completion"
784        );
785        if !self.config.capture {
786            return;
787        }
788
789        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
790        while !(guard.stdout_closed && guard.stderr_closed) {
791            guard = self
792                .shared
793                .condvar
794                .wait(guard)
795                .expect("queue mutex poisoned");
796        }
797    }
798}
799
800#[cfg(unix)]
801pub fn unix_set_priority(pid: u32, nice: i32) -> Result<(), std::io::Error> {
802    let result = unsafe { libc::setpriority(libc::PRIO_PROCESS, pid, nice) };
803    if result == -1 {
804        return Err(std::io::Error::last_os_error());
805    }
806    Ok(())
807}
808
809#[cfg(unix)]
810pub fn unix_signal_process(pid: u32, signal: UnixSignal) -> Result<(), std::io::Error> {
811    let result = unsafe { libc::kill(pid as i32, unix_signal_raw(signal)) };
812    if result == -1 {
813        return Err(std::io::Error::last_os_error());
814    }
815    Ok(())
816}
817
818#[cfg(unix)]
819pub fn unix_signal_process_group(pid: i32, signal: UnixSignal) -> Result<(), std::io::Error> {
820    let result = unsafe { libc::killpg(pid, unix_signal_raw(signal)) };
821    if result == -1 {
822        return Err(std::io::Error::last_os_error());
823    }
824    Ok(())
825}
826
827fn log_spawned_child_pid(pid: u32) -> Result<(), std::io::Error> {
828    let Some(path) = std::env::var_os(CHILD_PID_LOG_PATH_ENV) else {
829        return Ok(());
830    };
831
832    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
833    file.write_all(format!("{pid}\n").as_bytes())?;
834    file.flush()?;
835    Ok(())
836}
837
838#[cfg(windows)]
839fn assign_child_to_windows_kill_on_close_job_impl(
840    child: &Child,
841) -> Result<WindowsJobHandle, std::io::Error> {
842    crate::rp_rust_debug_scope!("running_process_core::assign_child_to_windows_kill_on_close_job");
843    use std::mem::zeroed;
844    use std::os::windows::io::AsRawHandle;
845
846    use winapi::shared::minwindef::FALSE;
847    use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE};
848    use winapi::um::jobapi2::{
849        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
850    };
851    use winapi::um::winnt::{
852        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
853        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
854    };
855
856    let handle = child.as_raw_handle();
857    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
858    if job.is_null() || job == INVALID_HANDLE_VALUE {
859        return Err(std::io::Error::last_os_error());
860    }
861
862    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
863    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
864    let ok = unsafe {
865        SetInformationJobObject(
866            job,
867            JobObjectExtendedLimitInformation,
868            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
869            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
870        )
871    };
872    if ok == FALSE {
873        let err = std::io::Error::last_os_error();
874        unsafe { CloseHandle(job) };
875        return Err(err);
876    }
877
878    let ok = unsafe { AssignProcessToJobObject(job, handle.cast()) };
879    if ok == FALSE {
880        let err = std::io::Error::last_os_error();
881        unsafe { CloseHandle(job) };
882        return Err(err);
883    }
884
885    Ok(WindowsJobHandle(job as usize))
886}
887
888fn feed_chunk(pending: &mut Vec<u8>, chunk: &[u8]) -> Vec<Vec<u8>> {
889    let mut lines = Vec::new();
890    let mut start = 0;
891    let mut index = 0;
892
893    while index < chunk.len() {
894        if chunk[index] == b'\n' {
895            let end = if index > start && chunk[index - 1] == b'\r' {
896                index - 1
897            } else {
898                index
899            };
900            pending.extend_from_slice(&chunk[start..end]);
901            if !pending.is_empty() {
902                lines.push(std::mem::take(pending));
903            }
904            start = index + 1;
905        }
906        index += 1;
907    }
908
909    pending.extend_from_slice(&chunk[start..]);
910    lines
911}
912
913fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
914    if lines.is_empty() {
915        return;
916    }
917    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
918    for line in lines {
919        let line_len = line.len();
920        match stream {
921            StreamKind::Stdout => {
922                guard.stdout_history_bytes += line_len;
923                guard.stdout_history.push_back(line.clone());
924                guard.stdout_queue.push_back(line.clone());
925            }
926            StreamKind::Stderr => {
927                guard.stderr_history_bytes += line_len;
928                guard.stderr_history.push_back(line.clone());
929                guard.stderr_queue.push_back(line.clone());
930            }
931        }
932        let event = StreamEvent { stream, line };
933        guard.combined_history_bytes += line_len;
934        guard.combined_history.push_back(event.clone());
935        guard.combined_queue.push_back(event);
936    }
937    shared.condvar.notify_all();
938}
939
940fn shell_command(command: &str) -> Command {
941    #[cfg(windows)]
942    {
943        use std::os::windows::process::CommandExt;
944
945        let mut cmd = Command::new("cmd");
946        cmd.raw_arg("/D /S /C \"");
947        cmd.raw_arg(command);
948        cmd.raw_arg("\"");
949        cmd
950    }
951    #[cfg(not(windows))]
952    {
953        let mut cmd = Command::new("sh");
954        cmd.arg("-lc").arg(command);
955        cmd
956    }
957}
958
959fn exit_code(status: std::process::ExitStatus) -> i32 {
960    #[cfg(unix)]
961    {
962        use std::os::unix::process::ExitStatusExt;
963        status
964            .code()
965            .unwrap_or_else(|| -status.signal().unwrap_or(1))
966    }
967    #[cfg(not(unix))]
968    {
969        status.code().unwrap_or(1)
970    }
971}
972
973#[cfg(unix)]
974fn unix_signal_raw(signal: UnixSignal) -> i32 {
975    match signal {
976        UnixSignal::Interrupt => libc::SIGINT,
977        UnixSignal::Terminate => libc::SIGTERM,
978        UnixSignal::Kill => libc::SIGKILL,
979    }
980}
981
982#[cfg(windows)]
983fn windows_priority_flags(nice: Option<i32>) -> u32 {
984    const IDLE_PRIORITY_CLASS: u32 = 0x0000_0040;
985    const BELOW_NORMAL_PRIORITY_CLASS: u32 = 0x0000_4000;
986    const ABOVE_NORMAL_PRIORITY_CLASS: u32 = 0x0000_8000;
987    const HIGH_PRIORITY_CLASS: u32 = 0x0000_0080;
988
989    match nice {
990        Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
991        Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
992        Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
993        Some(value) if value <= -1 => ABOVE_NORMAL_PRIORITY_CLASS,
994        _ => 0,
995    }
996}
997#[cfg(test)]
998mod tests {
999    use super::*;
1000
1001    // ── StreamKind tests ──
1002
1003    #[test]
1004    fn stream_kind_as_str_stdout() {
1005        assert_eq!(StreamKind::Stdout.as_str(), "stdout");
1006    }
1007
1008    #[test]
1009    fn stream_kind_as_str_stderr() {
1010        assert_eq!(StreamKind::Stderr.as_str(), "stderr");
1011    }
1012
1013    #[test]
1014    fn stream_kind_equality() {
1015        assert_eq!(StreamKind::Stdout, StreamKind::Stdout);
1016        assert_ne!(StreamKind::Stdout, StreamKind::Stderr);
1017    }
1018
1019    // ── StreamEvent tests ──
1020
1021    #[test]
1022    fn stream_event_clone() {
1023        let event = StreamEvent {
1024            stream: StreamKind::Stdout,
1025            line: b"hello".to_vec(),
1026        };
1027        let cloned = event.clone();
1028        assert_eq!(event, cloned);
1029    }
1030
1031    // ── ReadStatus tests ──
1032
1033    #[test]
1034    fn read_status_line_variant() {
1035        let status: ReadStatus<Vec<u8>> = ReadStatus::Line(b"data".to_vec());
1036        assert!(matches!(status, ReadStatus::Line(ref v) if v == b"data"));
1037    }
1038
1039    #[test]
1040    fn read_status_timeout_variant() {
1041        let status: ReadStatus<Vec<u8>> = ReadStatus::Timeout;
1042        assert!(matches!(status, ReadStatus::Timeout));
1043    }
1044
1045    #[test]
1046    fn read_status_eof_variant() {
1047        let status: ReadStatus<Vec<u8>> = ReadStatus::Eof;
1048        assert!(matches!(status, ReadStatus::Eof));
1049    }
1050
1051    // ── ProcessError tests ──
1052
1053    #[test]
1054    fn process_error_display_already_started() {
1055        assert_eq!(
1056            ProcessError::AlreadyStarted.to_string(),
1057            "process already started"
1058        );
1059    }
1060
1061    #[test]
1062    fn process_error_display_not_running() {
1063        assert_eq!(
1064            ProcessError::NotRunning.to_string(),
1065            "process is not running"
1066        );
1067    }
1068
1069    #[test]
1070    fn process_error_display_stdin_unavailable() {
1071        assert_eq!(
1072            ProcessError::StdinUnavailable.to_string(),
1073            "process stdin is not available"
1074        );
1075    }
1076
1077    #[test]
1078    fn process_error_display_timeout() {
1079        assert_eq!(ProcessError::Timeout.to_string(), "process timed out");
1080    }
1081
1082    #[test]
1083    fn process_error_display_spawn() {
1084        let err = ProcessError::Spawn(std::io::Error::new(
1085            std::io::ErrorKind::NotFound,
1086            "not found",
1087        ));
1088        assert!(err.to_string().contains("not found"));
1089    }
1090
1091    #[test]
1092    fn process_error_display_io() {
1093        let err = ProcessError::Io(std::io::Error::new(
1094            std::io::ErrorKind::BrokenPipe,
1095            "broken",
1096        ));
1097        assert!(err.to_string().contains("broken"));
1098    }
1099
1100    // ── CommandSpec tests ──
1101
1102    #[test]
1103    fn command_spec_shell_variant() {
1104        let spec = CommandSpec::Shell("echo hello".to_string());
1105        assert!(matches!(spec, CommandSpec::Shell(ref s) if s == "echo hello"));
1106    }
1107
1108    #[test]
1109    fn command_spec_argv_variant() {
1110        let spec = CommandSpec::Argv(vec!["echo".to_string(), "hello".to_string()]);
1111        assert!(matches!(spec, CommandSpec::Argv(ref v) if v.len() == 2));
1112    }
1113
1114    // ── StdinMode / StderrMode tests ──
1115
1116    #[test]
1117    fn stdin_mode_equality() {
1118        assert_eq!(StdinMode::Inherit, StdinMode::Inherit);
1119        assert_ne!(StdinMode::Piped, StdinMode::Null);
1120    }
1121
1122    #[test]
1123    fn stderr_mode_equality() {
1124        assert_eq!(StderrMode::Stdout, StderrMode::Stdout);
1125        assert_ne!(StderrMode::Stdout, StderrMode::Pipe);
1126    }
1127
1128    // ── SharedState tests ──
1129
1130    #[test]
1131    fn shared_state_new_with_capture() {
1132        let state = SharedState::new(true);
1133        let queues = state.queues.lock().unwrap();
1134        assert!(!queues.stdout_closed);
1135        assert!(!queues.stderr_closed);
1136        assert!(queues.stdout_queue.is_empty());
1137        assert!(queues.stderr_queue.is_empty());
1138    }
1139
1140    #[test]
1141    fn shared_state_new_without_capture() {
1142        let state = SharedState::new(false);
1143        let queues = state.queues.lock().unwrap();
1144        assert!(queues.stdout_closed);
1145        assert!(queues.stderr_closed);
1146    }
1147
1148    #[test]
1149    fn shared_state_returncode_initially_not_set() {
1150        let state = SharedState::new(true);
1151        let code = state.returncode.load(Ordering::Acquire);
1152        assert_eq!(code, RETURNCODE_NOT_SET);
1153    }
1154
1155    // ── feed_chunk tests ──
1156
1157    #[test]
1158    fn feed_chunk_single_line_with_newline() {
1159        let shared = Arc::new(SharedState::new(true));
1160        let mut pending = Vec::new();
1161        let lines = feed_chunk(&mut pending, b"hello\n");
1162        emit_lines(&shared, StreamKind::Stdout, lines);
1163        let queues = shared.queues.lock().unwrap();
1164        assert_eq!(queues.stdout_queue.len(), 1);
1165        assert_eq!(queues.stdout_queue[0], b"hello");
1166        assert!(pending.is_empty());
1167    }
1168
1169    #[test]
1170    fn feed_chunk_crlf_stripping() {
1171        let shared = Arc::new(SharedState::new(true));
1172        let mut pending = Vec::new();
1173        let lines = feed_chunk(&mut pending, b"hello\r\n");
1174        emit_lines(&shared, StreamKind::Stdout, lines);
1175        let queues = shared.queues.lock().unwrap();
1176        assert_eq!(queues.stdout_queue.len(), 1);
1177        assert_eq!(queues.stdout_queue[0], b"hello");
1178    }
1179
1180    #[test]
1181    fn feed_chunk_multiple_lines() {
1182        let shared = Arc::new(SharedState::new(true));
1183        let mut pending = Vec::new();
1184        let lines = feed_chunk(&mut pending, b"a\nb\nc\n");
1185        emit_lines(&shared, StreamKind::Stdout, lines);
1186        let queues = shared.queues.lock().unwrap();
1187        assert_eq!(queues.stdout_queue.len(), 3);
1188        assert_eq!(queues.stdout_queue[0], b"a");
1189        assert_eq!(queues.stdout_queue[1], b"b");
1190        assert_eq!(queues.stdout_queue[2], b"c");
1191    }
1192
1193    #[test]
1194    fn feed_chunk_no_newline_stays_pending() {
1195        let mut pending = Vec::new();
1196        let lines = feed_chunk(&mut pending, b"partial");
1197        assert!(lines.is_empty());
1198        assert_eq!(pending, b"partial");
1199    }
1200
1201    #[test]
1202    fn feed_chunk_accumulates_pending() {
1203        let shared = Arc::new(SharedState::new(true));
1204        let mut pending = Vec::new();
1205        let lines1 = feed_chunk(&mut pending, b"hel");
1206        emit_lines(&shared, StreamKind::Stdout, lines1);
1207        let lines2 = feed_chunk(&mut pending, b"lo\n");
1208        emit_lines(&shared, StreamKind::Stdout, lines2);
1209        let queues = shared.queues.lock().unwrap();
1210        assert_eq!(queues.stdout_queue.len(), 1);
1211        assert_eq!(queues.stdout_queue[0], b"hello");
1212        assert!(pending.is_empty());
1213    }
1214
1215    #[test]
1216    fn feed_chunk_empty_line_not_emitted() {
1217        let shared = Arc::new(SharedState::new(true));
1218        let mut pending = Vec::new();
1219        let lines = feed_chunk(&mut pending, b"\n");
1220        emit_lines(&shared, StreamKind::Stdout, lines);
1221        let queues = shared.queues.lock().unwrap();
1222        assert!(queues.stdout_queue.is_empty());
1223    }
1224
1225    #[test]
1226    fn feed_chunk_stderr_goes_to_stderr_queue() {
1227        let shared = Arc::new(SharedState::new(true));
1228        let mut pending = Vec::new();
1229        let lines = feed_chunk(&mut pending, b"error\n");
1230        emit_lines(&shared, StreamKind::Stderr, lines);
1231        let queues = shared.queues.lock().unwrap();
1232        assert!(queues.stdout_queue.is_empty());
1233        assert_eq!(queues.stderr_queue.len(), 1);
1234        assert_eq!(queues.stderr_queue[0], b"error");
1235    }
1236
1237    // ── emit_lines tests ──
1238
1239    #[test]
1240    fn emit_lines_updates_all_queues_and_history() {
1241        let shared = Arc::new(SharedState::new(true));
1242        emit_lines(&shared, StreamKind::Stdout, vec![b"test".to_vec()]);
1243        let queues = shared.queues.lock().unwrap();
1244        assert_eq!(queues.stdout_queue.len(), 1);
1245        assert_eq!(queues.stdout_history.len(), 1);
1246        assert_eq!(queues.stdout_history_bytes, 4);
1247        assert_eq!(queues.combined_queue.len(), 1);
1248        assert_eq!(queues.combined_history.len(), 1);
1249        assert_eq!(queues.combined_history_bytes, 4);
1250    }
1251
1252    #[test]
1253    fn emit_lines_stderr_updates_stderr_queues() {
1254        let shared = Arc::new(SharedState::new(true));
1255        emit_lines(&shared, StreamKind::Stderr, vec![b"err".to_vec()]);
1256        let queues = shared.queues.lock().unwrap();
1257        assert_eq!(queues.stderr_queue.len(), 1);
1258        assert_eq!(queues.stderr_history.len(), 1);
1259        assert_eq!(queues.stderr_history_bytes, 3);
1260        assert_eq!(queues.combined_queue.len(), 1);
1261        assert_eq!(queues.combined_history_bytes, 3);
1262    }
1263
1264    // ── NativeProcess unit tests (no process spawn) ──
1265
1266    #[test]
1267    fn native_process_returncode_none_before_start() {
1268        let process = NativeProcess::new(ProcessConfig {
1269            command: CommandSpec::Argv(vec!["echo".into()]),
1270            cwd: None,
1271            env: None,
1272            capture: false,
1273            stderr_mode: StderrMode::Stdout,
1274            creationflags: None,
1275            create_process_group: false,
1276            stdin_mode: StdinMode::Inherit,
1277            nice: None,
1278            containment: None,
1279        });
1280        assert!(process.returncode().is_none());
1281    }
1282
1283    #[test]
1284    fn native_process_pid_none_before_start() {
1285        let process = NativeProcess::new(ProcessConfig {
1286            command: CommandSpec::Argv(vec!["echo".into()]),
1287            cwd: None,
1288            env: None,
1289            capture: false,
1290            stderr_mode: StderrMode::Stdout,
1291            creationflags: None,
1292            create_process_group: false,
1293            stdin_mode: StdinMode::Inherit,
1294            nice: None,
1295            containment: None,
1296        });
1297        assert!(process.pid().is_none());
1298    }
1299
1300    #[test]
1301    fn native_process_has_pending_false_when_no_capture() {
1302        let process = NativeProcess::new(ProcessConfig {
1303            command: CommandSpec::Argv(vec!["echo".into()]),
1304            cwd: None,
1305            env: None,
1306            capture: false,
1307            stderr_mode: StderrMode::Stdout,
1308            creationflags: None,
1309            create_process_group: false,
1310            stdin_mode: StdinMode::Inherit,
1311            nice: None,
1312            containment: None,
1313        });
1314        assert!(!process.has_pending_stream(StreamKind::Stdout));
1315        assert!(!process.has_pending_combined());
1316    }
1317
1318    #[test]
1319    fn native_process_drain_empty_when_no_capture() {
1320        let process = NativeProcess::new(ProcessConfig {
1321            command: CommandSpec::Argv(vec!["echo".into()]),
1322            cwd: None,
1323            env: None,
1324            capture: false,
1325            stderr_mode: StderrMode::Stdout,
1326            creationflags: None,
1327            create_process_group: false,
1328            stdin_mode: StdinMode::Inherit,
1329            nice: None,
1330            containment: None,
1331        });
1332        assert!(process.drain_stream(StreamKind::Stdout).is_empty());
1333        assert!(process.drain_combined().is_empty());
1334    }
1335
1336    #[test]
1337    fn native_process_stderr_not_pending_when_merged() {
1338        let process = NativeProcess::new(ProcessConfig {
1339            command: CommandSpec::Argv(vec!["echo".into()]),
1340            cwd: None,
1341            env: None,
1342            capture: true,
1343            stderr_mode: StderrMode::Stdout,
1344            creationflags: None,
1345            create_process_group: false,
1346            stdin_mode: StdinMode::Inherit,
1347            nice: None,
1348            containment: None,
1349        });
1350        assert!(!process.has_pending_stream(StreamKind::Stderr));
1351    }
1352
1353    #[test]
1354    fn native_process_drain_stderr_empty_when_merged() {
1355        let process = NativeProcess::new(ProcessConfig {
1356            command: CommandSpec::Argv(vec!["echo".into()]),
1357            cwd: None,
1358            env: None,
1359            capture: true,
1360            stderr_mode: StderrMode::Stdout,
1361            creationflags: None,
1362            create_process_group: false,
1363            stdin_mode: StdinMode::Inherit,
1364            nice: None,
1365            containment: None,
1366        });
1367        assert!(process.drain_stream(StreamKind::Stderr).is_empty());
1368    }
1369
1370    #[test]
1371    fn native_process_captured_stderr_empty_when_merged() {
1372        let process = NativeProcess::new(ProcessConfig {
1373            command: CommandSpec::Argv(vec!["echo".into()]),
1374            cwd: None,
1375            env: None,
1376            capture: true,
1377            stderr_mode: StderrMode::Stdout,
1378            creationflags: None,
1379            create_process_group: false,
1380            stdin_mode: StdinMode::Inherit,
1381            nice: None,
1382            containment: None,
1383        });
1384        assert!(process.captured_stderr().is_empty());
1385    }
1386
1387    #[test]
1388    fn native_process_captured_stream_bytes_zero_when_merged_stderr() {
1389        let process = NativeProcess::new(ProcessConfig {
1390            command: CommandSpec::Argv(vec!["echo".into()]),
1391            cwd: None,
1392            env: None,
1393            capture: true,
1394            stderr_mode: StderrMode::Stdout,
1395            creationflags: None,
1396            create_process_group: false,
1397            stdin_mode: StdinMode::Inherit,
1398            nice: None,
1399            containment: None,
1400        });
1401        assert_eq!(process.captured_stream_bytes(StreamKind::Stderr), 0);
1402    }
1403
1404    #[test]
1405    fn native_process_clear_captured_stderr_zero_when_merged() {
1406        let process = NativeProcess::new(ProcessConfig {
1407            command: CommandSpec::Argv(vec!["echo".into()]),
1408            cwd: None,
1409            env: None,
1410            capture: true,
1411            stderr_mode: StderrMode::Stdout,
1412            creationflags: None,
1413            create_process_group: false,
1414            stdin_mode: StdinMode::Inherit,
1415            nice: None,
1416            containment: None,
1417        });
1418        assert_eq!(process.clear_captured_stream(StreamKind::Stderr), 0);
1419    }
1420
1421    #[test]
1422    fn native_process_read_stream_eof_when_stderr_merged() {
1423        let process = NativeProcess::new(ProcessConfig {
1424            command: CommandSpec::Argv(vec!["echo".into()]),
1425            cwd: None,
1426            env: None,
1427            capture: true,
1428            stderr_mode: StderrMode::Stdout,
1429            creationflags: None,
1430            create_process_group: false,
1431            stdin_mode: StdinMode::Inherit,
1432            nice: None,
1433            containment: None,
1434        });
1435        assert_eq!(
1436            process.read_stream(StreamKind::Stderr, Some(Duration::from_millis(10))),
1437            ReadStatus::Eof
1438        );
1439    }
1440
1441    // ── log_spawned_child_pid ──
1442
1443    #[test]
1444    fn log_spawned_child_pid_noop_without_env() {
1445        std::env::remove_var("RUNNING_PROCESS_CHILD_PID_LOG_PATH");
1446        assert!(log_spawned_child_pid(12345).is_ok());
1447    }
1448
1449    // ── shell_command ──
1450
1451    #[test]
1452    fn shell_command_creates_command() {
1453        let cmd = shell_command("echo test");
1454        let _ = format!("{:?}", cmd);
1455    }
1456
1457    // ── exit_code ──
1458
1459    #[test]
1460    fn exit_code_from_success() {
1461        let output = std::process::Command::new("python")
1462            .args(["-c", "pass"])
1463            .output()
1464            .unwrap();
1465        assert_eq!(exit_code(output.status), 0);
1466    }
1467
1468    #[test]
1469    fn exit_code_from_nonzero() {
1470        let output = std::process::Command::new("python")
1471            .args(["-c", "import sys; sys.exit(42)"])
1472            .output()
1473            .unwrap();
1474        assert_eq!(exit_code(output.status), 42);
1475    }
1476
1477    // ── windows_priority_flags ──
1478
1479    #[cfg(windows)]
1480    mod windows_tests {
1481        use super::*;
1482
1483        const IDLE_PRIORITY_CLASS: u32 = 0x0000_0040;
1484        const BELOW_NORMAL_PRIORITY_CLASS: u32 = 0x0000_4000;
1485        const ABOVE_NORMAL_PRIORITY_CLASS: u32 = 0x0000_8000;
1486        const HIGH_PRIORITY_CLASS: u32 = 0x0000_0080;
1487
1488        #[test]
1489        fn priority_flags_none() {
1490            assert_eq!(windows_priority_flags(None), 0);
1491        }
1492
1493        #[test]
1494        fn priority_flags_zero() {
1495            assert_eq!(windows_priority_flags(Some(0)), 0);
1496        }
1497
1498        #[test]
1499        fn priority_flags_high_nice_idle() {
1500            assert_eq!(windows_priority_flags(Some(15)), IDLE_PRIORITY_CLASS);
1501            assert_eq!(windows_priority_flags(Some(20)), IDLE_PRIORITY_CLASS);
1502        }
1503
1504        #[test]
1505        fn priority_flags_low_positive_below_normal() {
1506            assert_eq!(windows_priority_flags(Some(1)), BELOW_NORMAL_PRIORITY_CLASS);
1507            assert_eq!(
1508                windows_priority_flags(Some(14)),
1509                BELOW_NORMAL_PRIORITY_CLASS
1510            );
1511        }
1512
1513        #[test]
1514        fn priority_flags_negative_above_normal() {
1515            assert_eq!(
1516                windows_priority_flags(Some(-1)),
1517                ABOVE_NORMAL_PRIORITY_CLASS
1518            );
1519            assert_eq!(
1520                windows_priority_flags(Some(-14)),
1521                ABOVE_NORMAL_PRIORITY_CLASS
1522            );
1523        }
1524
1525        #[test]
1526        fn priority_flags_very_negative_high() {
1527            assert_eq!(windows_priority_flags(Some(-15)), HIGH_PRIORITY_CLASS);
1528            assert_eq!(windows_priority_flags(Some(-20)), HIGH_PRIORITY_CLASS);
1529        }
1530    }
1531
1532    // ── ProcessConfig ──
1533
1534    #[test]
1535    fn process_config_clone() {
1536        let config = ProcessConfig {
1537            command: CommandSpec::Shell("echo".to_string()),
1538            cwd: Some("/tmp".into()),
1539            env: Some(vec![("KEY".to_string(), "VAL".to_string())]),
1540            capture: true,
1541            stderr_mode: StderrMode::Pipe,
1542            creationflags: Some(0x10),
1543            create_process_group: true,
1544            stdin_mode: StdinMode::Piped,
1545            nice: Some(5),
1546            containment: None,
1547        };
1548        let cloned = config.clone();
1549        assert!(cloned.capture);
1550        assert_eq!(cloned.nice, Some(5));
1551    }
1552
1553    // ── render_rust_debug_traces ──
1554
1555    #[test]
1556    fn render_rust_debug_traces_returns_string() {
1557        let result = render_rust_debug_traces();
1558        let _ = result.len();
1559    }
1560
1561    // ── RustDebugScopeGuard ──
1562
1563    #[test]
1564    fn rust_debug_scope_guard_enters_and_drops() {
1565        let _guard = RustDebugScopeGuard::enter("test_scope", file!(), line!());
1566        let traces = render_rust_debug_traces();
1567        assert!(traces.contains("test_scope"));
1568        drop(_guard);
1569    }
1570
1571    // ── Unix signal tests ──
1572
1573    #[cfg(unix)]
1574    mod unix_tests {
1575        use super::*;
1576
1577        #[test]
1578        fn unix_signal_raw_values() {
1579            assert_eq!(unix_signal_raw(UnixSignal::Interrupt), libc::SIGINT);
1580            assert_eq!(unix_signal_raw(UnixSignal::Terminate), libc::SIGTERM);
1581            assert_eq!(unix_signal_raw(UnixSignal::Kill), libc::SIGKILL);
1582        }
1583
1584        #[test]
1585        fn unix_signal_enum_equality() {
1586            assert_eq!(UnixSignal::Interrupt, UnixSignal::Interrupt);
1587            assert_ne!(UnixSignal::Interrupt, UnixSignal::Kill);
1588        }
1589    }
1590
1591    // ── wait_for_capture_completion ──
1592
1593    #[test]
1594    fn wait_for_capture_completion_noop_without_capture() {
1595        let process = NativeProcess::new(ProcessConfig {
1596            command: CommandSpec::Argv(vec!["echo".into()]),
1597            cwd: None,
1598            env: None,
1599            capture: false,
1600            stderr_mode: StderrMode::Stdout,
1601            creationflags: None,
1602            create_process_group: false,
1603            stdin_mode: StdinMode::Inherit,
1604            nice: None,
1605            containment: None,
1606        });
1607        process.wait_for_capture_completion_impl();
1608    }
1609
1610    // ── build_command tests ──
1611
1612    #[test]
1613    fn build_command_from_argv() {
1614        let process = NativeProcess::new(ProcessConfig {
1615            command: CommandSpec::Argv(vec!["echo".into(), "hello".into(), "world".into()]),
1616            cwd: None,
1617            env: None,
1618            capture: false,
1619            stderr_mode: StderrMode::Stdout,
1620            creationflags: None,
1621            create_process_group: false,
1622            stdin_mode: StdinMode::Inherit,
1623            nice: None,
1624            containment: None,
1625        });
1626        let cmd = process.build_command();
1627        assert_eq!(cmd.get_program(), "echo");
1628        let args: Vec<_> = cmd.get_args().collect();
1629        assert_eq!(args, vec!["hello", "world"]);
1630    }
1631
1632    #[test]
1633    fn build_command_from_shell() {
1634        let process = NativeProcess::new(ProcessConfig {
1635            command: CommandSpec::Shell("echo test".into()),
1636            cwd: None,
1637            env: None,
1638            capture: false,
1639            stderr_mode: StderrMode::Stdout,
1640            creationflags: None,
1641            create_process_group: false,
1642            stdin_mode: StdinMode::Inherit,
1643            nice: None,
1644            containment: None,
1645        });
1646        let cmd = process.build_command();
1647        // Shell commands go through the OS shell
1648        let program = cmd.get_program().to_string_lossy().to_string();
1649        #[cfg(windows)]
1650        assert!(
1651            program.contains("cmd"),
1652            "expected cmd shell, got {}",
1653            program
1654        );
1655        #[cfg(not(windows))]
1656        assert!(program.contains("sh"), "expected sh shell, got {}", program);
1657    }
1658
1659    #[test]
1660    fn build_command_with_cwd() {
1661        let tmp = std::env::temp_dir();
1662        let process = NativeProcess::new(ProcessConfig {
1663            command: CommandSpec::Argv(vec!["echo".into()]),
1664            cwd: Some(tmp.clone()),
1665            env: None,
1666            capture: false,
1667            stderr_mode: StderrMode::Stdout,
1668            creationflags: None,
1669            create_process_group: false,
1670            stdin_mode: StdinMode::Inherit,
1671            nice: None,
1672            containment: None,
1673        });
1674        let cmd = process.build_command();
1675        assert_eq!(cmd.get_current_dir().unwrap(), &tmp);
1676    }
1677
1678    #[test]
1679    fn build_command_with_env() {
1680        let process = NativeProcess::new(ProcessConfig {
1681            command: CommandSpec::Argv(vec!["echo".into()]),
1682            cwd: None,
1683            env: Some(vec![
1684                ("FOO".into(), "bar".into()),
1685                ("BAZ".into(), "qux".into()),
1686            ]),
1687            capture: false,
1688            stderr_mode: StderrMode::Stdout,
1689            creationflags: None,
1690            create_process_group: false,
1691            stdin_mode: StdinMode::Inherit,
1692            nice: None,
1693            containment: None,
1694        });
1695        let cmd = process.build_command();
1696        let envs: Vec<_> = cmd.get_envs().collect();
1697        assert!(envs
1698            .iter()
1699            .any(|(k, v)| *k == "FOO" && *v == Some(std::ffi::OsStr::new("bar"))));
1700        assert!(envs
1701            .iter()
1702            .any(|(k, v)| *k == "BAZ" && *v == Some(std::ffi::OsStr::new("qux"))));
1703    }
1704
1705    #[test]
1706    fn build_command_single_argv() {
1707        let process = NativeProcess::new(ProcessConfig {
1708            command: CommandSpec::Argv(vec!["echo".into()]),
1709            cwd: None,
1710            env: None,
1711            capture: false,
1712            stderr_mode: StderrMode::Stdout,
1713            creationflags: None,
1714            create_process_group: false,
1715            stdin_mode: StdinMode::Inherit,
1716            nice: None,
1717            containment: None,
1718        });
1719        let cmd = process.build_command();
1720        assert_eq!(cmd.get_program(), "echo");
1721        assert_eq!(cmd.get_args().count(), 0);
1722    }
1723
1724    // ── set_returncode tests ──
1725
1726    #[test]
1727    fn set_returncode_updates_shared_state() {
1728        let process = NativeProcess::new(ProcessConfig {
1729            command: CommandSpec::Argv(vec!["echo".into()]),
1730            cwd: None,
1731            env: None,
1732            capture: false,
1733            stderr_mode: StderrMode::Stdout,
1734            creationflags: None,
1735            create_process_group: false,
1736            stdin_mode: StdinMode::Inherit,
1737            nice: None,
1738            containment: None,
1739        });
1740        assert!(process.returncode().is_none());
1741        process.set_returncode(42);
1742        assert_eq!(process.returncode(), Some(42));
1743    }
1744
1745    #[test]
1746    fn set_returncode_overwrites() {
1747        let process = NativeProcess::new(ProcessConfig {
1748            command: CommandSpec::Argv(vec!["echo".into()]),
1749            cwd: None,
1750            env: None,
1751            capture: false,
1752            stderr_mode: StderrMode::Stdout,
1753            creationflags: None,
1754            create_process_group: false,
1755            stdin_mode: StdinMode::Inherit,
1756            nice: None,
1757            containment: None,
1758        });
1759        process.set_returncode(1);
1760        process.set_returncode(2);
1761        assert_eq!(process.returncode(), Some(2));
1762    }
1763
1764    // ── SharedState with capture ──
1765
1766    #[test]
1767    fn shared_state_with_capture_queues_open() {
1768        let state = SharedState::new(true);
1769        let guard = state.queues.lock().unwrap();
1770        assert!(!guard.stdout_closed);
1771        assert!(!guard.stderr_closed);
1772    }
1773
1774    #[test]
1775    fn shared_state_without_capture_queues_closed() {
1776        let state = SharedState::new(false);
1777        let guard = state.queues.lock().unwrap();
1778        assert!(guard.stdout_closed);
1779        assert!(guard.stderr_closed);
1780    }
1781
1782    // ── ProcessError Display additional variants ──
1783
1784    #[test]
1785    fn process_error_display_io_variant() {
1786        let err = ProcessError::Io(std::io::Error::new(
1787            std::io::ErrorKind::BrokenPipe,
1788            "pipe broken",
1789        ));
1790        let msg = format!("{}", err);
1791        assert!(msg.contains("pipe broken"));
1792    }
1793
1794    #[test]
1795    fn process_error_display_spawn_variant() {
1796        let err = ProcessError::Spawn(std::io::Error::new(
1797            std::io::ErrorKind::NotFound,
1798            "not found",
1799        ));
1800        let msg = format!("{}", err);
1801        assert!(msg.contains("not found"));
1802    }
1803
1804    // ── shell_command produces a command ──
1805
1806    #[test]
1807    fn shell_command_returns_command_with_shell() {
1808        let cmd = shell_command("echo test");
1809        let program = cmd.get_program().to_string_lossy().to_string();
1810        #[cfg(windows)]
1811        assert!(program.contains("cmd"));
1812        #[cfg(not(windows))]
1813        assert!(program.contains("sh"));
1814    }
1815}