// running_process_core/lib.rs

1use std::collections::VecDeque;
2use std::fs::OpenOptions;
3use std::io::Read;
4use std::io::Write;
5use std::path::PathBuf;
6use std::process::{Child, Command, Stdio};
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::thread;
10use std::time::{Duration, Instant};
11
12use thiserror::Error;
13
14pub mod containment;
15#[cfg(feature = "originator-scan")]
16pub mod originator;
17mod public_symbols;
18mod rust_debug;
19
20pub use containment::{ContainedChild, ContainedProcessGroup, Containment, ORIGINATOR_ENV_VAR};
21#[cfg(feature = "originator-scan")]
22pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
23pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
24
/// Opens a debug scope that lasts until the end of the enclosing block.
///
/// Expands to a `let` binding that holds the value returned by
/// `RustDebugScopeGuard::enter`, called with `$label` and the call site's
/// `file!()` / `line!()`. Because the value is bound to a local, it is dropped
/// when the enclosing block ends.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
32
/// Name of the environment variable that, when set, holds the path of a file
/// to which the PID of every spawned child is appended, one per line.
const CHILD_PID_LOG_PATH_ENV: &str = "RUNNING_PROCESS_CHILD_PID_LOG_PATH";
34
/// Identifies which of a child's output streams a captured line belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}

impl StreamKind {
    /// Returns the lowercase stream name: `"stdout"` or `"stderr"`.
    pub fn as_str(self) -> &'static str {
        const STDOUT_NAME: &str = "stdout";
        const STDERR_NAME: &str = "stderr";

        match self {
            StreamKind::Stdout => STDOUT_NAME,
            StreamKind::Stderr => STDERR_NAME,
        }
    }
}
49
/// One captured line of child output, tagged with the stream it is reported under.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamEvent {
    /// Stream this line is attributed to.
    pub stream: StreamKind,
    /// Raw bytes of the line, without its line terminator.
    pub line: Vec<u8>,
}
55
/// Outcome of a blocking read from one of the capture queues.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadStatus<T> {
    /// An item was taken from the queue.
    Line(T),
    /// The timeout elapsed before an item became available.
    Timeout,
    /// The queue is empty and the stream(s) feeding it are closed.
    Eof,
}
62
/// Errors reported by [`NativeProcess`] operations.
#[derive(Debug, Error)]
pub enum ProcessError {
    /// `start` was called on a handle that already holds a child.
    #[error("process already started")]
    AlreadyStarted,
    /// The operation needs a child but none has been started on this handle.
    #[error("process is not running")]
    NotRunning,
    /// The child has no stdin pipe (it was not piped, or it was already closed).
    #[error("process stdin is not available")]
    StdinUnavailable,
    /// Spawning the child failed; also used for errors in the post-spawn
    /// setup steps performed by `start`.
    #[error("failed to spawn process: {0}")]
    Spawn(std::io::Error),
    /// An I/O error while interacting with the child (writing stdin, polling,
    /// killing, waiting).
    #[error("failed to read process output: {0}")]
    Io(std::io::Error),
    /// The wait timeout elapsed before the child exited.
    #[error("process timed out")]
    Timeout,
}
78
/// How the command to run is specified.
#[derive(Debug, Clone)]
pub enum CommandSpec {
    /// A command line handed to the platform shell (`sh` on non-Windows,
    /// `cmd` on Windows).
    Shell(String),
    /// Program followed by its arguments; element 0 is the program to execute.
    Argv(Vec<String>),
}
84
/// How the child's standard input is set up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StdinMode {
    /// Leave stdin unconfigured on the `Command` (the standard-library default).
    Inherit,
    /// Create a pipe so data can be sent with `write_stdin`.
    Piped,
    /// Attach stdin to the null device.
    Null,
}
91
/// How captured stderr lines are reported.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StderrMode {
    /// Report stderr lines as stdout lines; the stderr-specific accessors then
    /// return nothing.
    Stdout,
    /// Keep stderr lines separate from stdout lines.
    Pipe,
}
97
/// Signals that can be sent through the `unix_signal_*` helpers.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnixSignal {
    /// `SIGINT`.
    Interrupt,
    /// `SIGTERM`.
    Terminate,
    /// `SIGKILL`.
    Kill,
}
105
/// Everything needed to spawn a child process.
#[derive(Debug, Clone)]
pub struct ProcessConfig {
    /// The command to run.
    pub command: CommandSpec,
    /// Working directory for the child; `None` leaves it unset on the `Command`.
    pub cwd: Option<PathBuf>,
    /// When `Some`, the child's environment is cleared and replaced by exactly
    /// these variables; `None` leaves the environment untouched.
    pub env: Option<Vec<(String, String)>>,
    /// When `true`, stdout and stderr are piped and read line by line by
    /// background threads.
    pub capture: bool,
    /// Whether captured stderr lines are reported as stdout or kept separate.
    pub stderr_mode: StderrMode,
    /// Extra process-creation flags; only read on Windows.
    pub creationflags: Option<u32>,
    /// On Unix, put the child into its own process group when no containment
    /// policy is set.
    pub create_process_group: bool,
    /// How the child's stdin is set up.
    pub stdin_mode: StdinMode,
    /// Scheduling priority: applied with `setpriority` in the child on Unix,
    /// and mapped to creation flags on Windows.
    pub nice: Option<i32>,
    /// Optional containment policy. `None` preserves existing behaviour.
    /// `Some(Contained)` sets `PR_SET_PDEATHSIG(SIGKILL)` on Linux and uses
    /// the existing Job Object on Windows. `Some(Detached)` creates a new
    /// session (`setsid`) on Unix so the child survives the parent.
    pub containment: Option<Containment>,
}
123
/// Captured-output bookkeeping, protected by the mutex in `SharedState`.
///
/// The `*_queue` fields hold lines that have not been read or drained yet.
/// The `*_history` fields keep every captured line until explicitly cleared,
/// and each `*_history_bytes` counter tracks the total line length held in
/// the matching history.
#[derive(Default)]
struct QueueState {
    // Unread lines, per stream and combined.
    stdout_queue: VecDeque<Vec<u8>>,
    stderr_queue: VecDeque<Vec<u8>>,
    combined_queue: VecDeque<StreamEvent>,
    // Retained lines, per stream and combined.
    stdout_history: VecDeque<Vec<u8>>,
    stderr_history: VecDeque<Vec<u8>>,
    combined_history: VecDeque<StreamEvent>,
    // Sum of line lengths currently held in each history.
    stdout_history_bytes: usize,
    stderr_history_bytes: usize,
    combined_history_bytes: usize,
    // Set once the matching pipe's reader thread has finished (or from the
    // start when capture is disabled).
    stdout_closed: bool,
    stderr_closed: bool,
}
138
/// Sentinel value for the returncode atomic: process has not exited yet.
/// Real exit codes are `i32` values widened to `i64`, so `i64::MIN` can never
/// collide with one.
const RETURNCODE_NOT_SET: i64 = i64::MIN;
141
/// State shared between a `NativeProcess` handle and its background threads.
struct SharedState {
    /// Capture queues, histories and stream-closed flags.
    queues: Mutex<QueueState>,
    /// Notified whenever a line is queued, a stream closes, or the exit code
    /// is recorded.
    condvar: Condvar,
    /// Atomic exit code. `RETURNCODE_NOT_SET` means "not exited yet".
    /// Updated by a background waiter thread — reading is lock-free.
    returncode: AtomicI64,
}
149
/// Owns a Windows Job Object handle, stored as an integer and closed on drop.
// NOTE(review): the handle is presumably kept as `usize` rather than a raw
// pointer so the wrapper stays `Send` — confirm.
#[cfg(windows)]
struct WindowsJobHandle(usize);

#[cfg(windows)]
impl Drop for WindowsJobHandle {
    /// Closes the job handle; the return value of `CloseHandle` is ignored.
    fn drop(&mut self) {
        // SAFETY: assumes `self.0` is a job handle this wrapper exclusively
        // owns and that has not been closed elsewhere — verify at the
        // construction sites.
        unsafe {
            winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
        }
    }
}
161
/// The spawned child plus platform resources that must live as long as it does.
struct ChildState {
    /// Handle to the spawned process.
    child: Child,
    /// Job object the child was assigned to; never read, held only so the
    /// handle is closed when this state is dropped.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
167
168impl SharedState {
169    fn new(capture: bool) -> Self {
170        let queues = QueueState {
171            stdout_closed: !capture,
172            stderr_closed: !capture,
173            ..QueueState::default()
174        };
175        Self {
176            queues: Mutex::new(queues),
177            condvar: Condvar::new(),
178            returncode: AtomicI64::new(RETURNCODE_NOT_SET),
179        }
180    }
181}
182
/// Handle to a child process that can be started, fed input, read from,
/// waited on and killed.
pub struct NativeProcess {
    /// How to spawn the child and what to capture.
    config: ProcessConfig,
    /// The spawned child; `None` until `start` succeeds. Shared with the
    /// exit-waiter thread.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Capture queues and exit code shared with the background threads.
    shared: Arc<SharedState>,
}
188
189impl NativeProcess {
190    pub fn new(config: ProcessConfig) -> Self {
191        Self {
192            shared: Arc::new(SharedState::new(config.capture)),
193            child: Arc::new(Mutex::new(None)),
194            config,
195        }
196    }
197
    /// Spawns the configured command.
    ///
    /// Forwards to `public_symbols::rp_native_process_start_public`.
    /// NOTE(review): that shim presumably calls back into `start_impl` —
    /// confirm in `public_symbols`.
    // Preserve a stable Rust frame here in release user dumps.
    #[inline(never)]
    pub fn start(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_start_public(self)
    }
203
204    fn start_impl(&self) -> Result<(), ProcessError> {
205        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::start");
206        let mut guard = self.child.lock().expect("child mutex poisoned");
207        if guard.is_some() {
208            return Err(ProcessError::AlreadyStarted);
209        }
210
211        let mut command = self.build_command();
212        match self.config.stdin_mode {
213            StdinMode::Inherit => {}
214            StdinMode::Piped => {
215                command.stdin(Stdio::piped());
216            }
217            StdinMode::Null => {
218                command.stdin(Stdio::null());
219            }
220        }
221        if self.config.capture {
222            command.stdout(Stdio::piped());
223            command.stderr(Stdio::piped());
224        }
225
226        let mut child = command.spawn().map_err(ProcessError::Spawn)?;
227        log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
228        #[cfg(windows)]
229        let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
230            .map_err(ProcessError::Spawn)?;
231        if self.config.capture {
232            let stdout = child.stdout.take().expect("stdout pipe missing");
233            let stderr = child.stderr.take().expect("stderr pipe missing");
234            self.spawn_reader(stdout, StreamKind::Stdout, StreamKind::Stdout);
235            self.spawn_reader(
236                stderr,
237                StreamKind::Stderr,
238                match self.config.stderr_mode {
239                    StderrMode::Stdout => StreamKind::Stdout,
240                    StderrMode::Pipe => StreamKind::Stderr,
241                },
242            );
243        }
244        *guard = Some(ChildState {
245            child,
246            #[cfg(windows)]
247            _job: job,
248        });
249        drop(guard);
250        self.spawn_exit_waiter();
251        Ok(())
252    }
253
254    /// Background thread that polls for process exit and stores the exit code
255    /// atomically. This makes `returncode` auto-update without explicit `poll()`.
256    fn spawn_exit_waiter(&self) {
257        let child = Arc::clone(&self.child);
258        let shared = Arc::clone(&self.shared);
259        thread::spawn(move || loop {
260            if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
261                return;
262            }
263            {
264                let mut guard = child.lock().expect("child mutex poisoned");
265                if let Some(child_state) = guard.as_mut() {
266                    match child_state.child.try_wait() {
267                        Ok(Some(status)) => {
268                            let code = exit_code(status);
269                            shared.returncode.store(code as i64, Ordering::Release);
270                            shared.condvar.notify_all();
271                            return;
272                        }
273                        Ok(None) => {}
274                        Err(_) => return,
275                    }
276                } else {
277                    return;
278                }
279            }
280            thread::sleep(Duration::from_millis(1));
281        });
282    }
283
284    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
285        let mut guard = self.child.lock().expect("child mutex poisoned");
286        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
287        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
288        use std::io::Write;
289        stdin.write_all(data).map_err(ProcessError::Io)?;
290        stdin.flush().map_err(ProcessError::Io)?;
291        drop(child.stdin.take());
292        Ok(())
293    }
294
295    pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
296        // Fast path: check atomic set by background waiter thread.
297        if let Some(code) = self.returncode() {
298            return Ok(Some(code));
299        }
300        let mut guard = self.child.lock().expect("child mutex poisoned");
301        let Some(child_state) = guard.as_mut() else {
302            return Ok(self.returncode());
303        };
304        let child = &mut child_state.child;
305        let status = child.try_wait().map_err(ProcessError::Io)?;
306        if let Some(status) = status {
307            let code = exit_code(status);
308            self.set_returncode(code);
309            return Ok(Some(code));
310        }
311        Ok(None)
312    }
313
    /// Waits for the child to exit, optionally bounded by `timeout`.
    ///
    /// Forwards to `public_symbols::rp_native_process_wait_public`.
    /// NOTE(review): that shim presumably calls back into `wait_impl` —
    /// confirm in `public_symbols`.
    // Preserve a stable Rust frame here in release user dumps.
    #[inline(never)]
    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
        public_symbols::rp_native_process_wait_public(self, timeout)
    }
319
320    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
321        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::wait");
322        if self.child.lock().expect("child mutex poisoned").is_none() {
323            return self.returncode().ok_or(ProcessError::NotRunning);
324        }
325        let start = Instant::now();
326        loop {
327            if let Some(code) = self.poll()? {
328                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
329                return Ok(code);
330            }
331            if timeout.is_some_and(|limit| start.elapsed() >= limit) {
332                return Err(ProcessError::Timeout);
333            }
334            thread::sleep(Duration::from_millis(10));
335        }
336    }
337
    /// Kills the child.
    ///
    /// Forwards to `public_symbols::rp_native_process_kill_public`.
    /// NOTE(review): that shim presumably calls back into `kill_impl` —
    /// confirm in `public_symbols`.
    // Preserve a stable Rust frame here in release user dumps.
    #[inline(never)]
    pub fn kill(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_kill_public(self)
    }
343
344    fn kill_impl(&self) -> Result<(), ProcessError> {
345        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::kill");
346        let mut guard = self.child.lock().expect("child mutex poisoned");
347        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
348        child.kill().map_err(ProcessError::Io)?;
349        let status = child.wait().map_err(ProcessError::Io)?;
350        self.set_returncode(exit_code(status));
351        Ok(())
352    }
353
    /// Stops the child. This simply forwards to [`Self::kill`]; there is no
    /// separate graceful-termination path here.
    pub fn terminate(&self) -> Result<(), ProcessError> {
        self.kill()
    }
357
    /// Shuts the process handle down.
    ///
    /// Forwards to `public_symbols::rp_native_process_close_public`.
    /// NOTE(review): that shim presumably calls back into `close_impl` —
    /// confirm in `public_symbols`.
    // Preserve a stable Rust frame here in release user dumps.
    #[inline(never)]
    pub fn close(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_close_public(self)
    }
363
364    fn close_impl(&self) -> Result<(), ProcessError> {
365        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::close");
366        if self.child.lock().expect("child mutex poisoned").is_none() {
367            return Ok(());
368        }
369        if self.poll()?.is_none() {
370            self.kill()?;
371        } else {
372            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
373        }
374        Ok(())
375    }
376
377    pub fn pid(&self) -> Option<u32> {
378        self.child
379            .lock()
380            .expect("child mutex poisoned")
381            .as_ref()
382            .map(|state| state.child.id())
383    }
384
385    pub fn returncode(&self) -> Option<i32> {
386        let v = self.shared.returncode.load(Ordering::Acquire);
387        if v == RETURNCODE_NOT_SET {
388            None
389        } else {
390            Some(v as i32)
391        }
392    }
393
394    pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
395        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
396            return false;
397        }
398        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
399        match stream {
400            StreamKind::Stdout => !guard.stdout_queue.is_empty(),
401            StreamKind::Stderr => !guard.stderr_queue.is_empty(),
402        }
403    }
404
405    pub fn has_pending_combined(&self) -> bool {
406        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
407        !guard.combined_queue.is_empty()
408    }
409
410    pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
411        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
412            return Vec::new();
413        }
414        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
415        let queue = match stream {
416            StreamKind::Stdout => &mut guard.stdout_queue,
417            StreamKind::Stderr => &mut guard.stderr_queue,
418        };
419        queue.drain(..).collect()
420    }
421
422    pub fn drain_combined(&self) -> Vec<StreamEvent> {
423        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
424        guard.combined_queue.drain(..).collect()
425    }
426
427    pub fn read_stream(
428        &self,
429        stream: StreamKind,
430        timeout: Option<Duration>,
431    ) -> ReadStatus<Vec<u8>> {
432        let deadline = timeout.map(|limit| Instant::now() + limit);
433        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
434
435        loop {
436            if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
437                return ReadStatus::Eof;
438            }
439
440            let queue = match stream {
441                StreamKind::Stdout => &mut guard.stdout_queue,
442                StreamKind::Stderr => &mut guard.stderr_queue,
443            };
444            if let Some(line) = queue.pop_front() {
445                return ReadStatus::Line(line);
446            }
447
448            let closed = match stream {
449                StreamKind::Stdout => {
450                    if self.config.stderr_mode == StderrMode::Stdout {
451                        guard.stdout_closed && guard.stderr_closed
452                    } else {
453                        guard.stdout_closed
454                    }
455                }
456                StreamKind::Stderr => guard.stderr_closed,
457            };
458            if closed {
459                return ReadStatus::Eof;
460            }
461
462            match deadline {
463                Some(deadline) => {
464                    let now = Instant::now();
465                    if now >= deadline {
466                        return ReadStatus::Timeout;
467                    }
468                    let wait = deadline.saturating_duration_since(now);
469                    let result = self
470                        .shared
471                        .condvar
472                        .wait_timeout(guard, wait)
473                        .expect("queue mutex poisoned");
474                    guard = result.0;
475                    if result.1.timed_out() {
476                        return ReadStatus::Timeout;
477                    }
478                }
479                None => {
480                    guard = self
481                        .shared
482                        .condvar
483                        .wait(guard)
484                        .expect("queue mutex poisoned");
485                }
486            }
487        }
488    }
489
    /// Reads the next event from the combined stdout/stderr queue, optionally
    /// bounded by `timeout`.
    ///
    /// Forwards to `public_symbols::rp_native_process_read_combined_public`.
    /// NOTE(review): that shim presumably calls back into
    /// `read_combined_impl` — confirm in `public_symbols`.
    // Preserve a stable Rust frame here in release user dumps.
    #[inline(never)]
    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
        public_symbols::rp_native_process_read_combined_public(self, timeout)
    }
495
496    fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
497        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::read_combined");
498        let deadline = timeout.map(|limit| Instant::now() + limit);
499        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
500
501        loop {
502            if let Some(event) = guard.combined_queue.pop_front() {
503                return ReadStatus::Line(event);
504            }
505            if guard.stdout_closed && guard.stderr_closed {
506                return ReadStatus::Eof;
507            }
508
509            match deadline {
510                Some(deadline) => {
511                    let now = Instant::now();
512                    if now >= deadline {
513                        return ReadStatus::Timeout;
514                    }
515                    let wait = deadline.saturating_duration_since(now);
516                    let result = self
517                        .shared
518                        .condvar
519                        .wait_timeout(guard, wait)
520                        .expect("queue mutex poisoned");
521                    guard = result.0;
522                    if result.1.timed_out() {
523                        return ReadStatus::Timeout;
524                    }
525                }
526                None => {
527                    guard = self
528                        .shared
529                        .condvar
530                        .wait(guard)
531                        .expect("queue mutex poisoned");
532                }
533            }
534        }
535    }
536
537    pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
538        self.shared
539            .queues
540            .lock()
541            .expect("queue mutex poisoned")
542            .stdout_history
543            .clone()
544            .into_iter()
545            .collect()
546    }
547
548    pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
549        if self.config.stderr_mode == StderrMode::Stdout {
550            return Vec::new();
551        }
552        self.shared
553            .queues
554            .lock()
555            .expect("queue mutex poisoned")
556            .stderr_history
557            .clone()
558            .into_iter()
559            .collect()
560    }
561
562    pub fn captured_combined(&self) -> Vec<StreamEvent> {
563        self.shared
564            .queues
565            .lock()
566            .expect("queue mutex poisoned")
567            .combined_history
568            .clone()
569            .into_iter()
570            .collect()
571    }
572
573    pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
574        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
575            return 0;
576        }
577        let guard = self.shared.queues.lock().expect("queue mutex poisoned");
578        match stream {
579            StreamKind::Stdout => guard.stdout_history_bytes,
580            StreamKind::Stderr => guard.stderr_history_bytes,
581        }
582    }
583
584    pub fn captured_combined_bytes(&self) -> usize {
585        self.shared
586            .queues
587            .lock()
588            .expect("queue mutex poisoned")
589            .combined_history_bytes
590    }
591
592    pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
593        if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
594            return 0;
595        }
596        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
597        match stream {
598            StreamKind::Stdout => {
599                let released = guard.stdout_history_bytes;
600                guard.stdout_history.clear();
601                guard.stdout_history_bytes = 0;
602                released
603            }
604            StreamKind::Stderr => {
605                let released = guard.stderr_history_bytes;
606                guard.stderr_history.clear();
607                guard.stderr_history_bytes = 0;
608                released
609            }
610        }
611    }
612
613    pub fn clear_captured_combined(&self) -> usize {
614        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
615        let released = guard.combined_history_bytes;
616        guard.combined_history.clear();
617        guard.combined_history_bytes = 0;
618        released
619    }
620
621    fn build_command(&self) -> Command {
622        let mut command = match &self.config.command {
623            CommandSpec::Shell(command) => shell_command(command),
624            CommandSpec::Argv(argv) => {
625                let mut command = Command::new(&argv[0]);
626                if argv.len() > 1 {
627                    command.args(&argv[1..]);
628                }
629                command
630            }
631        };
632        if let Some(cwd) = &self.config.cwd {
633            command.current_dir(cwd);
634        }
635        if let Some(env) = &self.config.env {
636            command.env_clear();
637            command.envs(env.iter().map(|(k, v)| (k, v)));
638        }
639        #[cfg(windows)]
640        {
641            use std::os::windows::process::CommandExt;
642
643            let flags =
644                self.config.creationflags.unwrap_or(0) | windows_priority_flags(self.config.nice);
645            if flags != 0 {
646                command.creation_flags(flags);
647            }
648        }
649        #[cfg(unix)]
650        {
651            let create_process_group = self.config.create_process_group;
652            let nice = self.config.nice;
653            let containment = self.config.containment;
654
655            let needs_pre_exec = create_process_group || nice.is_some() || containment.is_some();
656
657            if needs_pre_exec {
658                use std::os::unix::process::CommandExt;
659
660                unsafe {
661                    command.pre_exec(move || {
662                        match containment {
663                            Some(Containment::Contained) => {
664                                // Place child into its own process group.
665                                if libc::setpgid(0, 0) == -1 {
666                                    return Err(std::io::Error::last_os_error());
667                                }
668                                // Linux: ask the kernel to SIGKILL us when the
669                                // parent thread dies.
670                                // CAVEAT: PR_SET_PDEATHSIG is per-thread, not
671                                // per-process. If the spawning thread exits
672                                // before the process, the child is killed early.
673                                #[cfg(target_os = "linux")]
674                                {
675                                    if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) == -1 {
676                                        return Err(std::io::Error::last_os_error());
677                                    }
678                                    if libc::getppid() == 1 {
679                                        libc::_exit(1);
680                                    }
681                                }
682                            }
683                            Some(Containment::Detached) => {
684                                // Create a new session so the child is fully
685                                // independent of the parent.
686                                if libc::setsid() == -1 {
687                                    return Err(std::io::Error::last_os_error());
688                                }
689                            }
690                            None => {
691                                if create_process_group && libc::setpgid(0, 0) == -1 {
692                                    return Err(std::io::Error::last_os_error());
693                                }
694                            }
695                        }
696                        if let Some(nice) = nice {
697                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
698                            if result == -1 {
699                                return Err(std::io::Error::last_os_error());
700                            }
701                        }
702                        Ok(())
703                    });
704                }
705            }
706        }
707        command
708    }
709
710    fn spawn_reader<R>(&self, pipe: R, source_stream: StreamKind, visible_stream: StreamKind)
711    where
712        R: Read + Send + 'static,
713    {
714        let shared = Arc::clone(&self.shared);
715        thread::spawn(move || {
716            let mut reader = pipe;
717            let mut chunk = [0_u8; 4096];
718            let mut pending = Vec::new();
719
720            loop {
721                match reader.read(&mut chunk) {
722                    Ok(0) => break,
723                    Ok(n) => feed_chunk(&shared, visible_stream, &mut pending, &chunk[..n]),
724                    Err(_) => break,
725                }
726            }
727
728            if !pending.is_empty() {
729                emit_line(&shared, visible_stream, std::mem::take(&mut pending));
730            }
731
732            let mut guard = shared.queues.lock().expect("queue mutex poisoned");
733            match source_stream {
734                StreamKind::Stdout => guard.stdout_closed = true,
735                StreamKind::Stderr => guard.stderr_closed = true,
736            }
737            shared.condvar.notify_all();
738        });
739    }
740
741    fn set_returncode(&self, code: i32) {
742        self.shared.returncode.store(code as i64, Ordering::Release);
743        self.shared.condvar.notify_all();
744    }
745
746    fn wait_for_capture_completion_impl(&self) {
747        crate::rp_rust_debug_scope!(
748            "running_process_core::NativeProcess::wait_for_capture_completion"
749        );
750        if !self.config.capture {
751            return;
752        }
753
754        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
755        while !(guard.stdout_closed && guard.stderr_closed) {
756            guard = self
757                .shared
758                .condvar
759                .wait(guard)
760                .expect("queue mutex poisoned");
761        }
762    }
763}
764
765#[cfg(unix)]
766pub fn unix_set_priority(pid: u32, nice: i32) -> Result<(), std::io::Error> {
767    let result = unsafe { libc::setpriority(libc::PRIO_PROCESS, pid, nice) };
768    if result == -1 {
769        return Err(std::io::Error::last_os_error());
770    }
771    Ok(())
772}
773
774#[cfg(unix)]
775pub fn unix_signal_process(pid: u32, signal: UnixSignal) -> Result<(), std::io::Error> {
776    let result = unsafe { libc::kill(pid as i32, unix_signal_raw(signal)) };
777    if result == -1 {
778        return Err(std::io::Error::last_os_error());
779    }
780    Ok(())
781}
782
783#[cfg(unix)]
784pub fn unix_signal_process_group(pid: i32, signal: UnixSignal) -> Result<(), std::io::Error> {
785    let result = unsafe { libc::killpg(pid, unix_signal_raw(signal)) };
786    if result == -1 {
787        return Err(std::io::Error::last_os_error());
788    }
789    Ok(())
790}
791
792fn log_spawned_child_pid(pid: u32) -> Result<(), std::io::Error> {
793    let Some(path) = std::env::var_os(CHILD_PID_LOG_PATH_ENV) else {
794        return Ok(());
795    };
796
797    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
798    file.write_all(format!("{pid}\n").as_bytes())?;
799    file.flush()?;
800    Ok(())
801}
802
/// Creates an anonymous Job Object flagged `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`
/// and assigns `child` to it.
///
/// On success the job handle is returned wrapped in a `WindowsJobHandle`.
///
/// # Errors
///
/// Returns the last OS error if creating the job, configuring it, or
/// assigning the process fails. On the two later failures the job handle is
/// closed before returning.
#[cfg(windows)]
fn assign_child_to_windows_kill_on_close_job_impl(
    child: &Child,
) -> Result<WindowsJobHandle, std::io::Error> {
    crate::rp_rust_debug_scope!("running_process_core::assign_child_to_windows_kill_on_close_job");
    use std::mem::zeroed;
    use std::os::windows::io::AsRawHandle;

    use winapi::shared::minwindef::FALSE;
    use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE};
    use winapi::um::jobapi2::{
        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
    };
    use winapi::um::winnt::{
        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
    };

    let handle = child.as_raw_handle();
    // Null security attributes and a null name: an unnamed job object.
    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
    if job.is_null() || job == INVALID_HANDLE_VALUE {
        return Err(std::io::Error::last_os_error());
    }

    // Start from an all-zero limit structure and set only the kill-on-close flag.
    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    let ok = unsafe {
        SetInformationJobObject(
            job,
            JobObjectExtendedLimitInformation,
            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
        )
    };
    if ok == FALSE {
        // Read the error before `CloseHandle` can overwrite it.
        let err = std::io::Error::last_os_error();
        unsafe { CloseHandle(job) };
        return Err(err);
    }

    let ok = unsafe { AssignProcessToJobObject(job, handle.cast()) };
    if ok == FALSE {
        // Read the error before `CloseHandle` can overwrite it.
        let err = std::io::Error::last_os_error();
        unsafe { CloseHandle(job) };
        return Err(err);
    }

    Ok(WindowsJobHandle(job as usize))
}
852
853fn feed_chunk(shared: &Arc<SharedState>, stream: StreamKind, pending: &mut Vec<u8>, chunk: &[u8]) {
854    let mut start = 0;
855    let mut index = 0;
856
857    while index < chunk.len() {
858        if chunk[index] == b'\n' {
859            let end = if index > start && chunk[index - 1] == b'\r' {
860                index - 1
861            } else {
862                index
863            };
864            pending.extend_from_slice(&chunk[start..end]);
865            if !pending.is_empty() {
866                emit_line(shared, stream, std::mem::take(pending));
867            }
868            start = index + 1;
869        }
870        index += 1;
871    }
872
873    pending.extend_from_slice(&chunk[start..]);
874}
875
876fn emit_line(shared: &Arc<SharedState>, stream: StreamKind, line: Vec<u8>) {
877    let event = StreamEvent { stream, line };
878    let mut guard = shared.queues.lock().expect("queue mutex poisoned");
879    match event.stream {
880        StreamKind::Stdout => {
881            guard.stdout_history_bytes += event.line.len();
882            guard.stdout_history.push_back(event.line.clone());
883            guard.stdout_queue.push_back(event.line.clone());
884        }
885        StreamKind::Stderr => {
886            guard.stderr_history_bytes += event.line.len();
887            guard.stderr_history.push_back(event.line.clone());
888            guard.stderr_queue.push_back(event.line.clone());
889        }
890    }
891    guard.combined_history_bytes += event.line.len();
892    guard.combined_history.push_back(event.clone());
893    guard.combined_queue.push_back(event);
894    shared.condvar.notify_all();
895}
896
/// Wraps `command` in an invocation of the platform shell.
///
/// * Windows: `cmd` receives `/D /S /C "<command>"` as raw arguments, so the command text
///   is passed through without the standard library's argument quoting.
/// * Elsewhere: `sh -lc <command>`.
fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    let shell = {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    };

    #[cfg(not(windows))]
    let shell = {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    };

    shell
}
915
/// Converts an [`std::process::ExitStatus`] into a single integer.
///
/// When the status carries an exit code, that code is returned unchanged. Otherwise:
///
/// * Unix: the negated signal number is returned, or `-1` if no signal is reported either.
/// * Other platforms: `1`.
fn exit_code(status: std::process::ExitStatus) -> i32 {
    if let Some(code) = status.code() {
        return code;
    }

    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        match status.signal() {
            Some(signal) => -signal,
            None => -1,
        }
    }
    #[cfg(not(unix))]
    {
        1
    }
}
929
930#[cfg(unix)]
931fn unix_signal_raw(signal: UnixSignal) -> i32 {
932    match signal {
933        UnixSignal::Interrupt => libc::SIGINT,
934        UnixSignal::Terminate => libc::SIGTERM,
935        UnixSignal::Kill => libc::SIGKILL,
936    }
937}
938
/// Translates an optional nice value into a Windows priority-class flag.
///
/// | nice            | result                        |
/// |-----------------|-------------------------------|
/// | `>= 15`         | `IDLE_PRIORITY_CLASS`         |
/// | `1..=14`        | `BELOW_NORMAL_PRIORITY_CLASS` |
/// | `-14..=-1`      | `ABOVE_NORMAL_PRIORITY_CLASS` |
/// | `<= -15`        | `HIGH_PRIORITY_CLASS`         |
/// | `0` or `None`   | `0` (no priority flag)        |
#[cfg(windows)]
fn windows_priority_flags(nice: Option<i32>) -> u32 {
    const IDLE_PRIORITY_CLASS: u32 = 0x0000_0040;
    const BELOW_NORMAL_PRIORITY_CLASS: u32 = 0x0000_4000;
    const ABOVE_NORMAL_PRIORITY_CLASS: u32 = 0x0000_8000;
    const HIGH_PRIORITY_CLASS: u32 = 0x0000_0080;

    // A missing nice value behaves exactly like an explicit 0.
    match nice.unwrap_or(0) {
        15..=i32::MAX => IDLE_PRIORITY_CLASS,
        1..=14 => BELOW_NORMAL_PRIORITY_CLASS,
        -14..=-1 => ABOVE_NORMAL_PRIORITY_CLASS,
        i32::MIN..=-15 => HIGH_PRIORITY_CLASS,
        0 => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline configuration used by the tests that never spawn the process:
    /// `echo` as argv, stderr merged into stdout, inherited stdin, nothing else set.
    /// Tests that need a different field override it with struct-update syntax.
    fn echo_config(capture: bool) -> ProcessConfig {
        ProcessConfig {
            command: CommandSpec::Argv(vec!["echo".into()]),
            cwd: None,
            env: None,
            capture,
            stderr_mode: StderrMode::Stdout,
            creationflags: None,
            create_process_group: false,
            stdin_mode: StdinMode::Inherit,
            nice: None,
            containment: None,
        }
    }

    // ── StreamKind tests ──

    #[test]
    fn stream_kind_as_str_stdout() {
        assert_eq!(StreamKind::Stdout.as_str(), "stdout");
    }

    #[test]
    fn stream_kind_as_str_stderr() {
        assert_eq!(StreamKind::Stderr.as_str(), "stderr");
    }

    #[test]
    fn stream_kind_equality() {
        assert_eq!(StreamKind::Stdout, StreamKind::Stdout);
        assert_ne!(StreamKind::Stdout, StreamKind::Stderr);
    }

    // ── StreamEvent tests ──

    #[test]
    fn stream_event_clone() {
        let event = StreamEvent {
            stream: StreamKind::Stdout,
            line: b"hello".to_vec(),
        };
        let cloned = event.clone();
        assert_eq!(event, cloned);
    }

    // ── ReadStatus tests ──

    #[test]
    fn read_status_line_variant() {
        let status: ReadStatus<Vec<u8>> = ReadStatus::Line(b"data".to_vec());
        assert!(matches!(status, ReadStatus::Line(ref v) if v == b"data"));
    }

    #[test]
    fn read_status_timeout_variant() {
        let status: ReadStatus<Vec<u8>> = ReadStatus::Timeout;
        assert!(matches!(status, ReadStatus::Timeout));
    }

    #[test]
    fn read_status_eof_variant() {
        let status: ReadStatus<Vec<u8>> = ReadStatus::Eof;
        assert!(matches!(status, ReadStatus::Eof));
    }

    // ── ProcessError tests ──

    #[test]
    fn process_error_display_already_started() {
        assert_eq!(
            ProcessError::AlreadyStarted.to_string(),
            "process already started"
        );
    }

    #[test]
    fn process_error_display_not_running() {
        assert_eq!(
            ProcessError::NotRunning.to_string(),
            "process is not running"
        );
    }

    #[test]
    fn process_error_display_stdin_unavailable() {
        assert_eq!(
            ProcessError::StdinUnavailable.to_string(),
            "process stdin is not available"
        );
    }

    #[test]
    fn process_error_display_timeout() {
        assert_eq!(ProcessError::Timeout.to_string(), "process timed out");
    }

    #[test]
    fn process_error_display_spawn() {
        let err = ProcessError::Spawn(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "not found",
        ));
        assert!(err.to_string().contains("not found"));
    }

    #[test]
    fn process_error_display_io() {
        let err = ProcessError::Io(std::io::Error::new(
            std::io::ErrorKind::BrokenPipe,
            "broken",
        ));
        assert!(err.to_string().contains("broken"));
    }

    // ── CommandSpec tests ──

    #[test]
    fn command_spec_shell_variant() {
        let spec = CommandSpec::Shell("echo hello".to_string());
        assert!(matches!(spec, CommandSpec::Shell(ref s) if s == "echo hello"));
    }

    #[test]
    fn command_spec_argv_variant() {
        let spec = CommandSpec::Argv(vec!["echo".to_string(), "hello".to_string()]);
        assert!(matches!(spec, CommandSpec::Argv(ref v) if v.len() == 2));
    }

    // ── StdinMode / StderrMode tests ──

    #[test]
    fn stdin_mode_equality() {
        assert_eq!(StdinMode::Inherit, StdinMode::Inherit);
        assert_ne!(StdinMode::Piped, StdinMode::Null);
    }

    #[test]
    fn stderr_mode_equality() {
        assert_eq!(StderrMode::Stdout, StderrMode::Stdout);
        assert_ne!(StderrMode::Stdout, StderrMode::Pipe);
    }

    // ── SharedState tests ──

    #[test]
    fn shared_state_new_with_capture() {
        let state = SharedState::new(true);
        let queues = state.queues.lock().unwrap();
        assert!(!queues.stdout_closed);
        assert!(!queues.stderr_closed);
        assert!(queues.stdout_queue.is_empty());
        assert!(queues.stderr_queue.is_empty());
    }

    #[test]
    fn shared_state_new_without_capture() {
        let state = SharedState::new(false);
        let queues = state.queues.lock().unwrap();
        assert!(queues.stdout_closed);
        assert!(queues.stderr_closed);
    }

    #[test]
    fn shared_state_returncode_initially_not_set() {
        let state = SharedState::new(true);
        let code = state.returncode.load(Ordering::Acquire);
        assert_eq!(code, RETURNCODE_NOT_SET);
    }

    // ── feed_chunk tests ──

    #[test]
    fn feed_chunk_single_line_with_newline() {
        let shared = Arc::new(SharedState::new(true));
        let mut pending = Vec::new();
        feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"hello\n");
        let queues = shared.queues.lock().unwrap();
        assert_eq!(queues.stdout_queue.len(), 1);
        assert_eq!(queues.stdout_queue[0], b"hello");
        assert!(pending.is_empty());
    }

    #[test]
    fn feed_chunk_crlf_stripping() {
        let shared = Arc::new(SharedState::new(true));
        let mut pending = Vec::new();
        feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"hello\r\n");
        let queues = shared.queues.lock().unwrap();
        assert_eq!(queues.stdout_queue.len(), 1);
        assert_eq!(queues.stdout_queue[0], b"hello");
    }

    #[test]
    fn feed_chunk_multiple_lines() {
        let shared = Arc::new(SharedState::new(true));
        let mut pending = Vec::new();
        feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"a\nb\nc\n");
        let queues = shared.queues.lock().unwrap();
        assert_eq!(queues.stdout_queue.len(), 3);
        assert_eq!(queues.stdout_queue[0], b"a");
        assert_eq!(queues.stdout_queue[1], b"b");
        assert_eq!(queues.stdout_queue[2], b"c");
    }

    #[test]
    fn feed_chunk_no_newline_stays_pending() {
        let shared = Arc::new(SharedState::new(true));
        let mut pending = Vec::new();
        feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"partial");
        let queues = shared.queues.lock().unwrap();
        assert!(queues.stdout_queue.is_empty());
        assert_eq!(pending, b"partial");
    }

    #[test]
    fn feed_chunk_accumulates_pending() {
        let shared = Arc::new(SharedState::new(true));
        let mut pending = Vec::new();
        feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"hel");
        feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"lo\n");
        let queues = shared.queues.lock().unwrap();
        assert_eq!(queues.stdout_queue.len(), 1);
        assert_eq!(queues.stdout_queue[0], b"hello");
        assert!(pending.is_empty());
    }

    #[test]
    fn feed_chunk_empty_line_not_emitted() {
        let shared = Arc::new(SharedState::new(true));
        let mut pending = Vec::new();
        feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"\n");
        let queues = shared.queues.lock().unwrap();
        assert!(queues.stdout_queue.is_empty());
    }

    #[test]
    fn feed_chunk_stderr_goes_to_stderr_queue() {
        let shared = Arc::new(SharedState::new(true));
        let mut pending = Vec::new();
        feed_chunk(&shared, StreamKind::Stderr, &mut pending, b"error\n");
        let queues = shared.queues.lock().unwrap();
        assert!(queues.stdout_queue.is_empty());
        assert_eq!(queues.stderr_queue.len(), 1);
        assert_eq!(queues.stderr_queue[0], b"error");
    }

    // ── emit_line tests ──

    #[test]
    fn emit_line_updates_all_queues_and_history() {
        let shared = Arc::new(SharedState::new(true));
        emit_line(&shared, StreamKind::Stdout, b"test".to_vec());
        let queues = shared.queues.lock().unwrap();
        assert_eq!(queues.stdout_queue.len(), 1);
        assert_eq!(queues.stdout_history.len(), 1);
        assert_eq!(queues.stdout_history_bytes, 4);
        assert_eq!(queues.combined_queue.len(), 1);
        assert_eq!(queues.combined_history.len(), 1);
        assert_eq!(queues.combined_history_bytes, 4);
    }

    #[test]
    fn emit_line_stderr_updates_stderr_queues() {
        let shared = Arc::new(SharedState::new(true));
        emit_line(&shared, StreamKind::Stderr, b"err".to_vec());
        let queues = shared.queues.lock().unwrap();
        assert_eq!(queues.stderr_queue.len(), 1);
        assert_eq!(queues.stderr_history.len(), 1);
        assert_eq!(queues.stderr_history_bytes, 3);
        assert_eq!(queues.combined_queue.len(), 1);
        assert_eq!(queues.combined_history_bytes, 3);
    }

    // ── NativeProcess unit tests (no process spawn) ──

    #[test]
    fn native_process_returncode_none_before_start() {
        let process = NativeProcess::new(echo_config(false));
        assert!(process.returncode().is_none());
    }

    #[test]
    fn native_process_pid_none_before_start() {
        let process = NativeProcess::new(echo_config(false));
        assert!(process.pid().is_none());
    }

    #[test]
    fn native_process_has_pending_false_when_no_capture() {
        let process = NativeProcess::new(echo_config(false));
        assert!(!process.has_pending_stream(StreamKind::Stdout));
        assert!(!process.has_pending_combined());
    }

    #[test]
    fn native_process_drain_empty_when_no_capture() {
        let process = NativeProcess::new(echo_config(false));
        assert!(process.drain_stream(StreamKind::Stdout).is_empty());
        assert!(process.drain_combined().is_empty());
    }

    #[test]
    fn native_process_stderr_not_pending_when_merged() {
        let process = NativeProcess::new(echo_config(true));
        assert!(!process.has_pending_stream(StreamKind::Stderr));
    }

    #[test]
    fn native_process_drain_stderr_empty_when_merged() {
        let process = NativeProcess::new(echo_config(true));
        assert!(process.drain_stream(StreamKind::Stderr).is_empty());
    }

    #[test]
    fn native_process_captured_stderr_empty_when_merged() {
        let process = NativeProcess::new(echo_config(true));
        assert!(process.captured_stderr().is_empty());
    }

    #[test]
    fn native_process_captured_stream_bytes_zero_when_merged_stderr() {
        let process = NativeProcess::new(echo_config(true));
        assert_eq!(process.captured_stream_bytes(StreamKind::Stderr), 0);
    }

    #[test]
    fn native_process_clear_captured_stderr_zero_when_merged() {
        let process = NativeProcess::new(echo_config(true));
        assert_eq!(process.clear_captured_stream(StreamKind::Stderr), 0);
    }

    #[test]
    fn native_process_read_stream_eof_when_stderr_merged() {
        let process = NativeProcess::new(echo_config(true));
        assert_eq!(
            process.read_stream(StreamKind::Stderr, Some(Duration::from_millis(10))),
            ReadStatus::Eof
        );
    }

    // ── log_spawned_child_pid ──

    #[test]
    fn log_spawned_child_pid_noop_without_env() {
        // Use the crate constant so the test cannot drift from the variable the code reads.
        // NOTE(review): this mutates process-wide state while other tests may run in
        // parallel; confirm no other test relies on this variable being set.
        std::env::remove_var(CHILD_PID_LOG_PATH_ENV);
        assert!(log_spawned_child_pid(12345).is_ok());
    }

    // ── shell_command ──

    #[test]
    fn shell_command_creates_command() {
        let cmd = shell_command("echo test");
        let _ = format!("{:?}", cmd);
    }

    // ── exit_code ──
    //
    // These go through the platform shell instead of an external interpreter so they do
    // not depend on a `python` binary being installed and on PATH.

    #[test]
    fn exit_code_from_success() {
        let output = shell_command("exit 0")
            .output()
            .expect("platform shell should be spawnable");
        assert_eq!(exit_code(output.status), 0);
    }

    #[test]
    fn exit_code_from_nonzero() {
        let output = shell_command("exit 42")
            .output()
            .expect("platform shell should be spawnable");
        assert_eq!(exit_code(output.status), 42);
    }

    // ── windows_priority_flags ──

    #[cfg(windows)]
    mod windows_tests {
        use super::*;

        const IDLE_PRIORITY_CLASS: u32 = 0x0000_0040;
        const BELOW_NORMAL_PRIORITY_CLASS: u32 = 0x0000_4000;
        const ABOVE_NORMAL_PRIORITY_CLASS: u32 = 0x0000_8000;
        const HIGH_PRIORITY_CLASS: u32 = 0x0000_0080;

        #[test]
        fn priority_flags_none() {
            assert_eq!(windows_priority_flags(None), 0);
        }

        #[test]
        fn priority_flags_zero() {
            assert_eq!(windows_priority_flags(Some(0)), 0);
        }

        #[test]
        fn priority_flags_high_nice_idle() {
            assert_eq!(windows_priority_flags(Some(15)), IDLE_PRIORITY_CLASS);
            assert_eq!(windows_priority_flags(Some(20)), IDLE_PRIORITY_CLASS);
        }

        #[test]
        fn priority_flags_low_positive_below_normal() {
            assert_eq!(windows_priority_flags(Some(1)), BELOW_NORMAL_PRIORITY_CLASS);
            assert_eq!(
                windows_priority_flags(Some(14)),
                BELOW_NORMAL_PRIORITY_CLASS
            );
        }

        #[test]
        fn priority_flags_negative_above_normal() {
            assert_eq!(
                windows_priority_flags(Some(-1)),
                ABOVE_NORMAL_PRIORITY_CLASS
            );
            assert_eq!(
                windows_priority_flags(Some(-14)),
                ABOVE_NORMAL_PRIORITY_CLASS
            );
        }

        #[test]
        fn priority_flags_very_negative_high() {
            assert_eq!(windows_priority_flags(Some(-15)), HIGH_PRIORITY_CLASS);
            assert_eq!(windows_priority_flags(Some(-20)), HIGH_PRIORITY_CLASS);
        }
    }

    // ── ProcessConfig ──

    #[test]
    fn process_config_clone() {
        // Spelled out in full on purpose: every field is given a non-default value.
        let config = ProcessConfig {
            command: CommandSpec::Shell("echo".to_string()),
            cwd: Some("/tmp".into()),
            env: Some(vec![("KEY".to_string(), "VAL".to_string())]),
            capture: true,
            stderr_mode: StderrMode::Pipe,
            creationflags: Some(0x10),
            create_process_group: true,
            stdin_mode: StdinMode::Piped,
            nice: Some(5),
            containment: None,
        };
        let cloned = config.clone();
        assert!(cloned.capture);
        assert_eq!(cloned.nice, Some(5));
    }

    // ── render_rust_debug_traces ──

    #[test]
    fn render_rust_debug_traces_returns_string() {
        let result = render_rust_debug_traces();
        let _ = result.len();
    }

    // ── RustDebugScopeGuard ──

    #[test]
    fn rust_debug_scope_guard_enters_and_drops() {
        let guard = RustDebugScopeGuard::enter("test_scope", file!(), line!());
        let traces = render_rust_debug_traces();
        assert!(traces.contains("test_scope"));
        drop(guard);
    }

    // ── Unix signal tests ──

    #[cfg(unix)]
    mod unix_tests {
        use super::*;

        #[test]
        fn unix_signal_raw_values() {
            assert_eq!(unix_signal_raw(UnixSignal::Interrupt), libc::SIGINT);
            assert_eq!(unix_signal_raw(UnixSignal::Terminate), libc::SIGTERM);
            assert_eq!(unix_signal_raw(UnixSignal::Kill), libc::SIGKILL);
        }

        #[test]
        fn unix_signal_enum_equality() {
            assert_eq!(UnixSignal::Interrupt, UnixSignal::Interrupt);
            assert_ne!(UnixSignal::Interrupt, UnixSignal::Kill);
        }

        #[test]
        fn exit_code_from_signal_is_negated_signal_number() {
            use std::os::unix::process::ExitStatusExt;

            // A raw wait status equal to a bare signal number denotes "killed by that
            // signal", so no exit code is available and the signal branch is taken.
            let status = std::process::ExitStatus::from_raw(libc::SIGKILL);
            assert_eq!(exit_code(status), -libc::SIGKILL);
        }
    }

    // ── wait_for_capture_completion ──

    #[test]
    fn wait_for_capture_completion_noop_without_capture() {
        let process = NativeProcess::new(echo_config(false));
        process.wait_for_capture_completion_impl();
    }

    // ── build_command tests ──

    #[test]
    fn build_command_from_argv() {
        let process = NativeProcess::new(ProcessConfig {
            command: CommandSpec::Argv(vec!["echo".into(), "hello".into(), "world".into()]),
            ..echo_config(false)
        });
        let cmd = process.build_command();
        assert_eq!(cmd.get_program(), "echo");
        let args: Vec<_> = cmd.get_args().collect();
        assert_eq!(args, vec!["hello", "world"]);
    }

    #[test]
    fn build_command_from_shell() {
        let process = NativeProcess::new(ProcessConfig {
            command: CommandSpec::Shell("echo test".into()),
            ..echo_config(false)
        });
        let cmd = process.build_command();
        // Shell commands go through the OS shell
        let program = cmd.get_program().to_string_lossy().to_string();
        #[cfg(windows)]
        assert!(
            program.contains("cmd"),
            "expected cmd shell, got {}",
            program
        );
        #[cfg(not(windows))]
        assert!(program.contains("sh"), "expected sh shell, got {}", program);
    }

    #[test]
    fn build_command_with_cwd() {
        let tmp = std::env::temp_dir();
        let process = NativeProcess::new(ProcessConfig {
            cwd: Some(tmp.clone()),
            ..echo_config(false)
        });
        let cmd = process.build_command();
        assert_eq!(cmd.get_current_dir().unwrap(), &tmp);
    }

    #[test]
    fn build_command_with_env() {
        let process = NativeProcess::new(ProcessConfig {
            env: Some(vec![
                ("FOO".into(), "bar".into()),
                ("BAZ".into(), "qux".into()),
            ]),
            ..echo_config(false)
        });
        let cmd = process.build_command();
        let envs: Vec<_> = cmd.get_envs().collect();
        assert!(envs
            .iter()
            .any(|(k, v)| *k == "FOO" && *v == Some(std::ffi::OsStr::new("bar"))));
        assert!(envs
            .iter()
            .any(|(k, v)| *k == "BAZ" && *v == Some(std::ffi::OsStr::new("qux"))));
    }

    #[test]
    fn build_command_single_argv() {
        let process = NativeProcess::new(echo_config(false));
        let cmd = process.build_command();
        assert_eq!(cmd.get_program(), "echo");
        assert_eq!(cmd.get_args().count(), 0);
    }

    // ── set_returncode tests ──

    #[test]
    fn set_returncode_updates_shared_state() {
        let process = NativeProcess::new(echo_config(false));
        assert!(process.returncode().is_none());
        process.set_returncode(42);
        assert_eq!(process.returncode(), Some(42));
    }

    #[test]
    fn set_returncode_overwrites() {
        let process = NativeProcess::new(echo_config(false));
        process.set_returncode(1);
        process.set_returncode(2);
        assert_eq!(process.returncode(), Some(2));
    }

    // ── SharedState with capture ──

    #[test]
    fn shared_state_with_capture_queues_open() {
        let state = SharedState::new(true);
        let guard = state.queues.lock().unwrap();
        assert!(!guard.stdout_closed);
        assert!(!guard.stderr_closed);
    }

    #[test]
    fn shared_state_without_capture_queues_closed() {
        let state = SharedState::new(false);
        let guard = state.queues.lock().unwrap();
        assert!(guard.stdout_closed);
        assert!(guard.stderr_closed);
    }

    // ── ProcessError Display additional variants ──

    #[test]
    fn process_error_display_io_variant() {
        let err = ProcessError::Io(std::io::Error::new(
            std::io::ErrorKind::BrokenPipe,
            "pipe broken",
        ));
        let msg = format!("{}", err);
        assert!(msg.contains("pipe broken"));
    }

    #[test]
    fn process_error_display_spawn_variant() {
        let err = ProcessError::Spawn(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "not found",
        ));
        let msg = format!("{}", err);
        assert!(msg.contains("not found"));
    }

    // ── shell_command produces a command ──

    #[test]
    fn shell_command_returns_command_with_shell() {
        let cmd = shell_command("echo test");
        let program = cmd.get_program().to_string_lossy().to_string();
        #[cfg(windows)]
        assert!(program.contains("cmd"));
        #[cfg(not(windows))]
        assert!(program.contains("sh"));
    }
}