1use std::collections::VecDeque;
2use std::fs::OpenOptions;
3use std::io::Read;
4use std::io::Write;
5use std::path::PathBuf;
6use std::process::{Child, Command, Stdio};
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::thread;
10use std::time::{Duration, Instant};
11
12use thiserror::Error;
13
14pub mod console_detect;
15pub mod containment;
16#[cfg(feature = "originator-scan")]
17pub mod originator;
18pub mod pty;
19mod public_symbols;
20mod rust_debug;
21
22pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
23pub use containment::{ContainedChild, ContainedProcessGroup, Containment, ORIGINATOR_ENV_VAR};
24#[cfg(feature = "originator-scan")]
25pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
26pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
27
/// Enters a Rust debug scope labelled `$label` for the rest of the enclosing block.
///
/// Expands to a `let` binding that stores the value returned by
/// `RustDebugScopeGuard::enter`, called with `$label` and the call site's
/// `file!()` and `line!()`.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        // Bound to a named local so the guard lives until the end of the
        // caller's block. NOTE(review): presumably dropping the guard marks
        // the scope exit; confirm in `rust_debug`.
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
35
/// Name of the environment variable that, when set, names a file to which the
/// PID of every spawned child is appended (see `log_spawned_child_pid`).
const CHILD_PID_LOG_PATH_ENV: &str = "RUNNING_PROCESS_CHILD_PID_LOG_PATH";
37
/// Identifies one of the two output streams of a child process.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}

impl StreamKind {
    /// Returns the lowercase name of the stream: `"stdout"` or `"stderr"`.
    pub fn as_str(self) -> &'static str {
        let name = match self {
            StreamKind::Stdout => "stdout",
            StreamKind::Stderr => "stderr",
        };
        name
    }
}
52
/// One captured line together with the stream it is attributed to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamEvent {
    /// Stream the line is reported under.
    pub stream: StreamKind,
    /// Raw bytes of the line, without the line terminator.
    pub line: Vec<u8>,
}
58
/// Outcome of a blocking read from one of the capture queues.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadStatus<T> {
    /// An item was available and has been removed from the queue.
    Line(T),
    /// The requested timeout elapsed before an item became available.
    Timeout,
    /// The queue is empty and the relevant stream(s) are closed.
    Eof,
}
65
/// Errors returned by `NativeProcess` operations.
#[derive(Debug, Error)]
pub enum ProcessError {
    /// `start` was called while a child is already stored.
    #[error("process already started")]
    AlreadyStarted,
    /// The operation needs a started child but none is stored.
    #[error("process is not running")]
    NotRunning,
    /// The child has no stdin handle available for writing.
    #[error("process stdin is not available")]
    StdinUnavailable,
    /// Spawning the child, or a follow-up step of `start`, failed.
    #[error("failed to spawn process: {0}")]
    Spawn(std::io::Error),
    /// An I/O error from writing stdin, polling, waiting for or killing the child.
    #[error("failed to read process output: {0}")]
    Io(std::io::Error),
    /// `wait` reached its timeout before a return code was recorded.
    #[error("process timed out")]
    Timeout,
}
81
/// How the command to execute is specified.
#[derive(Debug, Clone)]
pub enum CommandSpec {
    /// A command line passed to the platform shell (see `shell_command`).
    Shell(String),
    /// A program (element 0) followed by its arguments.
    Argv(Vec<String>),
}
87
/// How the child's stdin is configured by `start`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StdinMode {
    /// Leave stdin at the `Command` default (no explicit configuration).
    Inherit,
    /// Create a pipe so data can be sent with `write_stdin`.
    Piped,
    /// Connect stdin to the null device.
    Null,
}
94
/// Where captured stderr lines are made visible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StderrMode {
    /// Report stderr lines as stdout lines (merged into the stdout queue/history).
    Stdout,
    /// Keep stderr lines in their own queue/history.
    Pipe,
}
100
/// Signals that can be sent through the `unix_signal_*` helpers.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnixSignal {
    /// Maps to `SIGINT`.
    Interrupt,
    /// Maps to `SIGTERM`.
    Terminate,
    /// Maps to `SIGKILL`.
    Kill,
}
108
/// Everything needed to spawn and supervise one child process.
#[derive(Debug, Clone)]
pub struct ProcessConfig {
    /// Command to run, either through the shell or as an argv vector.
    pub command: CommandSpec,
    /// Working directory for the child; unchanged when `None`.
    pub cwd: Option<PathBuf>,
    /// When `Some`, the child's environment is cleared and replaced by these pairs.
    pub env: Option<Vec<(String, String)>>,
    /// When true, stdout and stderr are piped and read by background threads.
    pub capture: bool,
    /// Whether captured stderr is merged into stdout or kept separate.
    pub stderr_mode: StderrMode,
    /// Windows only: process creation flags, OR-ed with the priority flags from `nice`.
    pub creationflags: Option<u32>,
    /// Unix only: call `setpgid(0, 0)` in the child when no containment is configured.
    pub create_process_group: bool,
    /// How the child's stdin is set up.
    pub stdin_mode: StdinMode,
    /// Unix: niceness applied with `setpriority` in the child.
    /// Windows: mapped to a priority class by `windows_priority_flags`.
    pub nice: Option<i32>,
    /// Unix only: optional containment policy applied before exec.
    pub containment: Option<Containment>,
}
126
/// Mutable capture state shared between reader threads and consumers.
///
/// The `*_queue` fields hold lines not yet consumed; the `*_history` fields
/// keep every emitted line until explicitly cleared.
#[derive(Default)]
struct QueueState {
    /// Unread lines attributed to stdout.
    stdout_queue: VecDeque<Vec<u8>>,
    /// Unread lines attributed to stderr.
    stderr_queue: VecDeque<Vec<u8>>,
    /// Unread lines of both streams in emission order.
    combined_queue: VecDeque<StreamEvent>,
    /// All stdout lines emitted since the last clear.
    stdout_history: VecDeque<Vec<u8>>,
    /// All stderr lines emitted since the last clear.
    stderr_history: VecDeque<Vec<u8>>,
    /// All lines of both streams emitted since the last clear.
    combined_history: VecDeque<StreamEvent>,
    /// Sum of line lengths currently accounted to `stdout_history`.
    stdout_history_bytes: usize,
    /// Sum of line lengths currently accounted to `stderr_history`.
    stderr_history_bytes: usize,
    /// Sum of line lengths currently accounted to `combined_history`.
    combined_history_bytes: usize,
    /// Set once the stdout reader finished (or immediately when not capturing).
    stdout_closed: bool,
    /// Set once the stderr reader finished (or immediately when not capturing).
    stderr_closed: bool,
}
141
/// Sentinel stored in `SharedState::returncode` while no exit code has been
/// recorded; it lies outside the `i32` range used for real exit codes.
const RETURNCODE_NOT_SET: i64 = i64::MIN;
144
/// State shared between a `NativeProcess` and its background threads.
struct SharedState {
    /// Capture queues, histories and stream-closed flags.
    queues: Mutex<QueueState>,
    /// Notified when lines arrive, a stream closes, or the return code is set.
    condvar: Condvar,
    /// Exit code widened to `i64`, or `RETURNCODE_NOT_SET` while unknown.
    returncode: AtomicI64,
}
152
/// Owns a Windows job-object handle, stored as an integer; the handle is
/// closed when this value is dropped.
#[cfg(windows)]
struct WindowsJobHandle(usize);
155
#[cfg(windows)]
impl Drop for WindowsJobHandle {
    /// Closes the stored job handle.
    fn drop(&mut self) {
        // SAFETY (as far as visible here): the only constructor in this file
        // stores a handle returned by `CreateJobObjectW`, and it is closed
        // exactly once, on drop.
        unsafe {
            winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
        }
    }
}
164
/// The spawned child plus platform resources that must live as long as it does.
struct ChildState {
    /// Handle to the spawned process.
    child: Child,
    /// Windows only: job object the child was assigned to; kept alive here and
    /// closed when this state is dropped.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
170
171impl SharedState {
172 fn new(capture: bool) -> Self {
173 let queues = QueueState {
174 stdout_closed: !capture,
175 stderr_closed: !capture,
176 ..QueueState::default()
177 };
178 Self {
179 queues: Mutex::new(queues),
180 condvar: Condvar::new(),
181 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
182 }
183 }
184}
185
/// A child process that is spawned on demand and supervised by background threads.
pub struct NativeProcess {
    /// Configuration the child is spawned from.
    config: ProcessConfig,
    /// The spawned child; `None` until `start` succeeds.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Capture queues and return code shared with the background threads.
    shared: Arc<SharedState>,
}
191
192impl NativeProcess {
193 pub fn new(config: ProcessConfig) -> Self {
194 Self {
195 shared: Arc::new(SharedState::new(config.capture)),
196 child: Arc::new(Mutex::new(None)),
197 config,
198 }
199 }
200
    /// Spawns the configured child process.
    ///
    /// Delegates to `public_symbols::rp_native_process_start_public`.
    /// NOTE(review): presumably that wrapper forwards to `start_impl`; confirm.
    #[inline(never)]
    pub fn start(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_start_public(self)
    }
206
207 fn start_impl(&self) -> Result<(), ProcessError> {
208 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::start");
209 let mut guard = self.child.lock().expect("child mutex poisoned");
210 if guard.is_some() {
211 return Err(ProcessError::AlreadyStarted);
212 }
213
214 let mut command = self.build_command();
215 match self.config.stdin_mode {
216 StdinMode::Inherit => {}
217 StdinMode::Piped => {
218 command.stdin(Stdio::piped());
219 }
220 StdinMode::Null => {
221 command.stdin(Stdio::null());
222 }
223 }
224 if self.config.capture {
225 command.stdout(Stdio::piped());
226 command.stderr(Stdio::piped());
227 }
228
229 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
230 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
231 #[cfg(windows)]
232 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
233 .map_err(ProcessError::Spawn)?;
234 if self.config.capture {
235 let stdout = child.stdout.take().expect("stdout pipe missing");
236 let stderr = child.stderr.take().expect("stderr pipe missing");
237 self.spawn_reader(stdout, StreamKind::Stdout, StreamKind::Stdout);
238 self.spawn_reader(
239 stderr,
240 StreamKind::Stderr,
241 match self.config.stderr_mode {
242 StderrMode::Stdout => StreamKind::Stdout,
243 StderrMode::Pipe => StreamKind::Stderr,
244 },
245 );
246 }
247 *guard = Some(ChildState {
248 child,
249 #[cfg(windows)]
250 _job: job,
251 });
252 drop(guard);
253 self.spawn_exit_waiter();
254 Ok(())
255 }
256
    /// Spawns a background thread that polls the child until it exits.
    ///
    /// Each iteration the thread stops if a return code is already recorded;
    /// otherwise it locks the child slot and calls `try_wait`. On exit it
    /// stores the code in `shared.returncode` and notifies `shared.condvar`.
    /// Between polls it sleeps for 10 ms.
    fn spawn_exit_waiter(&self) {
        let child = Arc::clone(&self.child);
        let shared = Arc::clone(&self.shared);
        thread::spawn(move || loop {
            // Another path (`poll`, `kill_impl`) may already have recorded the code.
            if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
                return;
            }
            {
                // Inner scope so the child lock is released before sleeping.
                let mut guard = child.lock().expect("child mutex poisoned");
                if let Some(child_state) = guard.as_mut() {
                    match child_state.child.try_wait() {
                        Ok(Some(status)) => {
                            let code = exit_code(status);
                            shared.returncode.store(code as i64, Ordering::Release);
                            shared.condvar.notify_all();
                            return;
                        }
                        Ok(None) => {}
                        // NOTE(review): a `try_wait` error ends this thread
                        // without recording any return code.
                        Err(_) => return,
                    }
                } else {
                    return;
                }
            }
            thread::sleep(Duration::from_millis(10));
        });
    }
286
287 pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
288 let mut guard = self.child.lock().expect("child mutex poisoned");
289 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
290 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
291 use std::io::Write;
292 stdin.write_all(data).map_err(ProcessError::Io)?;
293 stdin.flush().map_err(ProcessError::Io)?;
294 drop(child.stdin.take());
295 Ok(())
296 }
297
298 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
299 if let Some(code) = self.returncode() {
301 return Ok(Some(code));
302 }
303 let mut guard = self.child.lock().expect("child mutex poisoned");
304 let Some(child_state) = guard.as_mut() else {
305 return Ok(self.returncode());
306 };
307 let child = &mut child_state.child;
308 let status = child.try_wait().map_err(ProcessError::Io)?;
309 if let Some(status) = status {
310 let code = exit_code(status);
311 self.set_returncode(code);
312 return Ok(Some(code));
313 }
314 Ok(None)
315 }
316
    /// Waits for the child to exit, optionally bounded by `timeout`.
    ///
    /// Delegates to `public_symbols::rp_native_process_wait_public`.
    /// NOTE(review): presumably that wrapper forwards to `wait_impl`; confirm.
    #[inline(never)]
    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
        public_symbols::rp_native_process_wait_public(self, timeout)
    }
322
    /// Blocks until a return code is recorded or `timeout` elapses.
    ///
    /// On success the capture-completion wait is run before the code is
    /// returned. This function does not poll the child itself; it only
    /// observes `shared.returncode`, which other code paths set.
    ///
    /// # Errors
    ///
    /// * `ProcessError::NotRunning` if no child is stored and no code is recorded.
    /// * `ProcessError::Timeout` if `timeout` elapses first.
    fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
        crate::rp_rust_debug_scope!("running_process_core::NativeProcess::wait");
        if self.child.lock().expect("child mutex poisoned").is_none() {
            return self.returncode().ok_or(ProcessError::NotRunning);
        }
        // Fast path: the exit code is already known.
        if let Some(code) = self.returncode() {
            public_symbols::rp_native_process_wait_for_capture_completion_public(self);
            return Ok(code);
        }
        let start = Instant::now();
        let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
        loop {
            let rc = self.shared.returncode.load(Ordering::Acquire);
            if rc != RETURNCODE_NOT_SET {
                // Release the queue lock before the capture-completion wait.
                drop(guard);
                let code = rc as i32;
                public_symbols::rp_native_process_wait_for_capture_completion_public(self);
                return Ok(code);
            }
            if let Some(limit) = timeout {
                let elapsed = start.elapsed();
                if elapsed >= limit {
                    return Err(ProcessError::Timeout);
                }
                // Cannot underflow: `elapsed < limit` was just checked.
                let remaining = limit - elapsed;
                // Wait in slices of at most 50 ms, re-reading the return code after each.
                let wait_time = remaining.min(Duration::from_millis(50));
                guard = self
                    .shared
                    .condvar
                    .wait_timeout(guard, wait_time)
                    .expect("queue mutex poisoned")
                    .0;
            } else {
                // No deadline: still wake every 50 ms to re-read the return code.
                guard = self
                    .shared
                    .condvar
                    .wait_timeout(guard, Duration::from_millis(50))
                    .expect("queue mutex poisoned")
                    .0;
            }
        }
    }
369
    /// Forcibly stops the child process.
    ///
    /// Delegates to `public_symbols::rp_native_process_kill_public`.
    /// NOTE(review): presumably that wrapper forwards to `kill_impl`; confirm.
    #[inline(never)]
    pub fn kill(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_kill_public(self)
    }
375
376 fn kill_impl(&self) -> Result<(), ProcessError> {
377 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::kill");
378 let mut guard = self.child.lock().expect("child mutex poisoned");
379 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
380 child.kill().map_err(ProcessError::Io)?;
381 let status = child.wait().map_err(ProcessError::Io)?;
382 self.set_returncode(exit_code(status));
383 Ok(())
384 }
385
    /// Terminates the child; currently identical to [`NativeProcess::kill`].
    pub fn terminate(&self) -> Result<(), ProcessError> {
        self.kill()
    }
389
    /// Shuts the process down if it was started.
    ///
    /// Delegates to `public_symbols::rp_native_process_close_public`.
    /// NOTE(review): presumably that wrapper forwards to `close_impl`; confirm.
    #[inline(never)]
    pub fn close(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_close_public(self)
    }
395
396 fn close_impl(&self) -> Result<(), ProcessError> {
397 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::close");
398 if self.child.lock().expect("child mutex poisoned").is_none() {
399 return Ok(());
400 }
401 if self.poll()?.is_none() {
402 self.kill()?;
403 } else {
404 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
405 }
406 Ok(())
407 }
408
409 pub fn pid(&self) -> Option<u32> {
410 self.child
411 .lock()
412 .expect("child mutex poisoned")
413 .as_ref()
414 .map(|state| state.child.id())
415 }
416
417 pub fn returncode(&self) -> Option<i32> {
418 let v = self.shared.returncode.load(Ordering::Acquire);
419 if v == RETURNCODE_NOT_SET {
420 None
421 } else {
422 Some(v as i32)
423 }
424 }
425
426 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
427 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
428 return false;
429 }
430 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
431 match stream {
432 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
433 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
434 }
435 }
436
437 pub fn has_pending_combined(&self) -> bool {
438 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
439 !guard.combined_queue.is_empty()
440 }
441
442 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
443 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
444 return Vec::new();
445 }
446 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
447 let queue = match stream {
448 StreamKind::Stdout => &mut guard.stdout_queue,
449 StreamKind::Stderr => &mut guard.stderr_queue,
450 };
451 queue.drain(..).collect()
452 }
453
454 pub fn drain_combined(&self) -> Vec<StreamEvent> {
455 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
456 guard.combined_queue.drain(..).collect()
457 }
458
459 pub fn read_stream(
460 &self,
461 stream: StreamKind,
462 timeout: Option<Duration>,
463 ) -> ReadStatus<Vec<u8>> {
464 let deadline = timeout.map(|limit| Instant::now() + limit);
465 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
466
467 loop {
468 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
469 return ReadStatus::Eof;
470 }
471
472 let queue = match stream {
473 StreamKind::Stdout => &mut guard.stdout_queue,
474 StreamKind::Stderr => &mut guard.stderr_queue,
475 };
476 if let Some(line) = queue.pop_front() {
477 return ReadStatus::Line(line);
478 }
479
480 let closed = match stream {
481 StreamKind::Stdout => {
482 if self.config.stderr_mode == StderrMode::Stdout {
483 guard.stdout_closed && guard.stderr_closed
484 } else {
485 guard.stdout_closed
486 }
487 }
488 StreamKind::Stderr => guard.stderr_closed,
489 };
490 if closed {
491 return ReadStatus::Eof;
492 }
493
494 match deadline {
495 Some(deadline) => {
496 let now = Instant::now();
497 if now >= deadline {
498 return ReadStatus::Timeout;
499 }
500 let wait = deadline.saturating_duration_since(now);
501 let result = self
502 .shared
503 .condvar
504 .wait_timeout(guard, wait)
505 .expect("queue mutex poisoned");
506 guard = result.0;
507 if result.1.timed_out() {
508 return ReadStatus::Timeout;
509 }
510 }
511 None => {
512 guard = self
513 .shared
514 .condvar
515 .wait(guard)
516 .expect("queue mutex poisoned");
517 }
518 }
519 }
520 }
521
    /// Reads the next event from the combined queue, optionally bounded by `timeout`.
    ///
    /// Delegates to `public_symbols::rp_native_process_read_combined_public`.
    /// NOTE(review): presumably that wrapper forwards to `read_combined_impl`; confirm.
    #[inline(never)]
    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
        public_symbols::rp_native_process_read_combined_public(self, timeout)
    }
527
528 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
529 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::read_combined");
530 let deadline = timeout.map(|limit| Instant::now() + limit);
531 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
532
533 loop {
534 if let Some(event) = guard.combined_queue.pop_front() {
535 return ReadStatus::Line(event);
536 }
537 if guard.stdout_closed && guard.stderr_closed {
538 return ReadStatus::Eof;
539 }
540
541 match deadline {
542 Some(deadline) => {
543 let now = Instant::now();
544 if now >= deadline {
545 return ReadStatus::Timeout;
546 }
547 let wait = deadline.saturating_duration_since(now);
548 let result = self
549 .shared
550 .condvar
551 .wait_timeout(guard, wait)
552 .expect("queue mutex poisoned");
553 guard = result.0;
554 if result.1.timed_out() {
555 return ReadStatus::Timeout;
556 }
557 }
558 None => {
559 guard = self
560 .shared
561 .condvar
562 .wait(guard)
563 .expect("queue mutex poisoned");
564 }
565 }
566 }
567 }
568
569 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
570 self.shared
571 .queues
572 .lock()
573 .expect("queue mutex poisoned")
574 .stdout_history
575 .clone()
576 .into_iter()
577 .collect()
578 }
579
580 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
581 if self.config.stderr_mode == StderrMode::Stdout {
582 return Vec::new();
583 }
584 self.shared
585 .queues
586 .lock()
587 .expect("queue mutex poisoned")
588 .stderr_history
589 .clone()
590 .into_iter()
591 .collect()
592 }
593
594 pub fn captured_combined(&self) -> Vec<StreamEvent> {
595 self.shared
596 .queues
597 .lock()
598 .expect("queue mutex poisoned")
599 .combined_history
600 .clone()
601 .into_iter()
602 .collect()
603 }
604
605 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
606 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
607 return 0;
608 }
609 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
610 match stream {
611 StreamKind::Stdout => guard.stdout_history_bytes,
612 StreamKind::Stderr => guard.stderr_history_bytes,
613 }
614 }
615
616 pub fn captured_combined_bytes(&self) -> usize {
617 self.shared
618 .queues
619 .lock()
620 .expect("queue mutex poisoned")
621 .combined_history_bytes
622 }
623
624 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
625 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
626 return 0;
627 }
628 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
629 match stream {
630 StreamKind::Stdout => {
631 let released = guard.stdout_history_bytes;
632 guard.stdout_history.clear();
633 guard.stdout_history_bytes = 0;
634 released
635 }
636 StreamKind::Stderr => {
637 let released = guard.stderr_history_bytes;
638 guard.stderr_history.clear();
639 guard.stderr_history_bytes = 0;
640 released
641 }
642 }
643 }
644
645 pub fn clear_captured_combined(&self) -> usize {
646 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
647 let released = guard.combined_history_bytes;
648 guard.combined_history.clear();
649 guard.combined_history_bytes = 0;
650 released
651 }
652
    /// Builds the `Command` described by the configuration (stdio is set up by the caller).
    ///
    /// Applies the command line, working directory, environment and the
    /// platform-specific options (Windows creation flags; Unix process group,
    /// containment and niceness via `pre_exec`).
    ///
    /// # Panics
    ///
    /// Panics if the command is `CommandSpec::Argv` with an empty vector,
    /// because `argv[0]` is indexed unconditionally.
    fn build_command(&self) -> Command {
        let mut command = match &self.config.command {
            CommandSpec::Shell(command) => shell_command(command),
            CommandSpec::Argv(argv) => {
                let mut command = Command::new(&argv[0]);
                if argv.len() > 1 {
                    command.args(&argv[1..]);
                }
                command
            }
        };
        if let Some(cwd) = &self.config.cwd {
            command.current_dir(cwd);
        }
        if let Some(env) = &self.config.env {
            // An explicit environment replaces the inherited one entirely.
            command.env_clear();
            command.envs(env.iter().map(|(k, v)| (k, v)));
        }
        #[cfg(windows)]
        {
            use std::os::windows::process::CommandExt;

            // Combine caller-supplied creation flags with the priority class derived from `nice`.
            let flags =
                self.config.creationflags.unwrap_or(0) | windows_priority_flags(self.config.nice);
            if flags != 0 {
                command.creation_flags(flags);
            }
        }
        #[cfg(unix)]
        {
            // Copy the values out so the `move` closure does not borrow `self`.
            let create_process_group = self.config.create_process_group;
            let nice = self.config.nice;
            let containment = self.config.containment;

            let needs_pre_exec = create_process_group || nice.is_some() || containment.is_some();

            if needs_pre_exec {
                use std::os::unix::process::CommandExt;

                // The closure is registered with `pre_exec`, i.e. it runs in the
                // child before the new program is executed; it only makes
                // direct libc calls.
                unsafe {
                    command.pre_exec(move || {
                        match containment {
                            Some(Containment::Contained) => {
                                // Put the child into its own process group.
                                if libc::setpgid(0, 0) == -1 {
                                    return Err(std::io::Error::last_os_error());
                                }
                                #[cfg(target_os = "linux")]
                                {
                                    // Ask for SIGKILL when the parent dies.
                                    if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) == -1 {
                                        return Err(std::io::Error::last_os_error());
                                    }
                                    // NOTE(review): presumably detects a parent that died
                                    // before the prctl took effect (reparented to pid 1); confirm.
                                    if libc::getppid() == 1 {
                                        libc::_exit(1);
                                    }
                                }
                            }
                            Some(Containment::Detached) => {
                                // Start a new session for the child.
                                if libc::setsid() == -1 {
                                    return Err(std::io::Error::last_os_error());
                                }
                            }
                            None => {
                                // Without containment, only honor `create_process_group`.
                                if create_process_group && libc::setpgid(0, 0) == -1 {
                                    return Err(std::io::Error::last_os_error());
                                }
                            }
                        }
                        if let Some(nice) = nice {
                            // `who == 0` addresses the calling process, i.e. the child itself.
                            let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
                            if result == -1 {
                                return Err(std::io::Error::last_os_error());
                            }
                        }
                        Ok(())
                    });
                }
            }
        }
        command
    }
741
742 fn spawn_reader<R>(&self, pipe: R, source_stream: StreamKind, visible_stream: StreamKind)
743 where
744 R: Read + Send + 'static,
745 {
746 let shared = Arc::clone(&self.shared);
747 thread::spawn(move || {
748 let mut reader = pipe;
749 let mut chunk = vec![0_u8; 65536];
750 let mut pending = Vec::new();
751
752 loop {
753 match reader.read(&mut chunk) {
754 Ok(0) => break,
755 Ok(n) => {
756 let lines = feed_chunk(&mut pending, &chunk[..n]);
757 emit_lines(&shared, visible_stream, lines);
758 }
759 Err(_) => break,
760 }
761 }
762
763 if !pending.is_empty() {
764 emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
765 }
766
767 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
768 match source_stream {
769 StreamKind::Stdout => guard.stdout_closed = true,
770 StreamKind::Stderr => guard.stderr_closed = true,
771 }
772 shared.condvar.notify_all();
773 });
774 }
775
776 fn set_returncode(&self, code: i32) {
777 self.shared.returncode.store(code as i64, Ordering::Release);
778 self.shared.condvar.notify_all();
779 }
780
781 fn wait_for_capture_completion_impl(&self) {
782 crate::rp_rust_debug_scope!(
783 "running_process_core::NativeProcess::wait_for_capture_completion"
784 );
785 if !self.config.capture {
786 return;
787 }
788
789 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
790 while !(guard.stdout_closed && guard.stderr_closed) {
791 guard = self
792 .shared
793 .condvar
794 .wait(guard)
795 .expect("queue mutex poisoned");
796 }
797 }
798}
799
800#[cfg(unix)]
801pub fn unix_set_priority(pid: u32, nice: i32) -> Result<(), std::io::Error> {
802 let result = unsafe { libc::setpriority(libc::PRIO_PROCESS, pid, nice) };
803 if result == -1 {
804 return Err(std::io::Error::last_os_error());
805 }
806 Ok(())
807}
808
809#[cfg(unix)]
810pub fn unix_signal_process(pid: u32, signal: UnixSignal) -> Result<(), std::io::Error> {
811 let result = unsafe { libc::kill(pid as i32, unix_signal_raw(signal)) };
812 if result == -1 {
813 return Err(std::io::Error::last_os_error());
814 }
815 Ok(())
816}
817
818#[cfg(unix)]
819pub fn unix_signal_process_group(pid: i32, signal: UnixSignal) -> Result<(), std::io::Error> {
820 let result = unsafe { libc::killpg(pid, unix_signal_raw(signal)) };
821 if result == -1 {
822 return Err(std::io::Error::last_os_error());
823 }
824 Ok(())
825}
826
827fn log_spawned_child_pid(pid: u32) -> Result<(), std::io::Error> {
828 let Some(path) = std::env::var_os(CHILD_PID_LOG_PATH_ENV) else {
829 return Ok(());
830 };
831
832 let mut file = OpenOptions::new().create(true).append(true).open(path)?;
833 file.write_all(format!("{pid}\n").as_bytes())?;
834 file.flush()?;
835 Ok(())
836}
837
/// Creates a job object configured with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`
/// and assigns `child` to it.
///
/// Returns the job handle wrapped in `WindowsJobHandle`, which closes it on drop.
///
/// # Errors
///
/// Returns the last OS error if creating the job, configuring it, or
/// assigning the process fails; the job handle is closed on the latter two paths.
#[cfg(windows)]
fn assign_child_to_windows_kill_on_close_job_impl(
    child: &Child,
) -> Result<WindowsJobHandle, std::io::Error> {
    crate::rp_rust_debug_scope!("running_process_core::assign_child_to_windows_kill_on_close_job");
    use std::mem::zeroed;
    use std::os::windows::io::AsRawHandle;

    use winapi::shared::minwindef::FALSE;
    use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE};
    use winapi::um::jobapi2::{
        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
    };
    use winapi::um::winnt::{
        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
    };

    let handle = child.as_raw_handle();
    // Anonymous job object with default security attributes.
    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
    if job.is_null() || job == INVALID_HANDLE_VALUE {
        return Err(std::io::Error::last_os_error());
    }

    // Start from an all-zero limit structure and set only the kill-on-close flag.
    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    let ok = unsafe {
        SetInformationJobObject(
            job,
            JobObjectExtendedLimitInformation,
            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
        )
    };
    if ok == FALSE {
        // Capture the error before `CloseHandle` can overwrite it.
        let err = std::io::Error::last_os_error();
        unsafe { CloseHandle(job) };
        return Err(err);
    }

    let ok = unsafe { AssignProcessToJobObject(job, handle.cast()) };
    if ok == FALSE {
        // Capture the error before `CloseHandle` can overwrite it.
        let err = std::io::Error::last_os_error();
        unsafe { CloseHandle(job) };
        return Err(err);
    }

    Ok(WindowsJobHandle(job as usize))
}
887
/// Splits `chunk` into complete lines, carrying partial data over in `pending`.
///
/// Bytes from earlier calls that did not end in `\n` are kept in `pending`
/// and prepended to the next line. Each returned line has its `\n` removed,
/// along with a directly preceding `\r` - including when the `\r` arrived at
/// the end of the previous chunk and the `\n` starts this one. Empty lines
/// are dropped. Whatever follows the last `\n` stays in `pending`.
fn feed_chunk(pending: &mut Vec<u8>, chunk: &[u8]) -> Vec<Vec<u8>> {
    let mut lines = Vec::new();
    let mut rest = chunk;

    while let Some(newline_at) = rest.iter().position(|&byte| byte == b'\n') {
        pending.extend_from_slice(&rest[..newline_at]);
        // Check the assembled line rather than the current chunk, so a CRLF
        // split across two reads is still recognized.
        if pending.last() == Some(&b'\r') {
            pending.pop();
        }
        if !pending.is_empty() {
            lines.push(std::mem::take(pending));
        }
        rest = &rest[newline_at + 1..];
    }

    pending.extend_from_slice(rest);
    lines
}
912
913fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
914 if lines.is_empty() {
915 return;
916 }
917 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
918 for line in lines {
919 let line_len = line.len();
920 match stream {
921 StreamKind::Stdout => {
922 guard.stdout_history_bytes += line_len;
923 guard.stdout_history.push_back(line.clone());
924 guard.stdout_queue.push_back(line.clone());
925 }
926 StreamKind::Stderr => {
927 guard.stderr_history_bytes += line_len;
928 guard.stderr_history.push_back(line.clone());
929 guard.stderr_queue.push_back(line.clone());
930 }
931 }
932 let event = StreamEvent { stream, line };
933 guard.combined_history_bytes += line_len;
934 guard.combined_history.push_back(event.clone());
935 guard.combined_queue.push_back(event);
936 }
937 shared.condvar.notify_all();
938}
939
/// Builds the platform shell invocation that runs `command`.
///
/// On Windows this is `cmd /D /S /C "<command>"`, passed as raw arguments; on
/// other platforms it is `sh -lc <command>`.
fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    let shell = {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        // Raw arguments bypass the standard quoting so the command line
        // reaches `cmd` exactly as written between the outer quotes.
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    };
    #[cfg(not(windows))]
    let shell = {
        let mut shell = Command::new("sh");
        shell.arg("-lc");
        shell.arg(command);
        shell
    };
    shell
}
958
/// Converts an `ExitStatus` into a single integer return code.
///
/// A regular exit yields the process's exit code. Without an exit code, Unix
/// yields the negated number of the terminating signal (or -1 if no signal
/// is reported) and other platforms yield 1.
fn exit_code(status: std::process::ExitStatus) -> i32 {
    if let Some(code) = status.code() {
        return code;
    }
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        match status.signal() {
            Some(signal) => -signal,
            None => -1,
        }
    }
    #[cfg(not(unix))]
    {
        1
    }
}
972
/// Maps a `UnixSignal` to the corresponding `libc` signal number.
#[cfg(unix)]
fn unix_signal_raw(signal: UnixSignal) -> i32 {
    match signal {
        UnixSignal::Interrupt => libc::SIGINT,
        UnixSignal::Terminate => libc::SIGTERM,
        UnixSignal::Kill => libc::SIGKILL,
    }
}
981
/// Maps a Unix-style niceness to a Windows priority-class creation flag.
///
/// * `>= 15` - idle, `1..=14` - below normal
/// * `<= -15` - high, `-14..=-1` - above normal
/// * `0` or `None` - `0` (no priority flag)
#[cfg(windows)]
fn windows_priority_flags(nice: Option<i32>) -> u32 {
    const IDLE_PRIORITY_CLASS: u32 = 0x0000_0040;
    const BELOW_NORMAL_PRIORITY_CLASS: u32 = 0x0000_4000;
    const ABOVE_NORMAL_PRIORITY_CLASS: u32 = 0x0000_8000;
    const HIGH_PRIORITY_CLASS: u32 = 0x0000_0080;

    let Some(level) = nice else {
        return 0;
    };
    if level >= 15 {
        IDLE_PRIORITY_CLASS
    } else if level >= 1 {
        BELOW_NORMAL_PRIORITY_CLASS
    } else if level <= -15 {
        HIGH_PRIORITY_CLASS
    } else if level <= -1 {
        ABOVE_NORMAL_PRIORITY_CLASS
    } else {
        0
    }
}
997#[cfg(test)]
998mod tests {
999 use super::*;
1000
1001 #[test]
1004 fn stream_kind_as_str_stdout() {
1005 assert_eq!(StreamKind::Stdout.as_str(), "stdout");
1006 }
1007
1008 #[test]
1009 fn stream_kind_as_str_stderr() {
1010 assert_eq!(StreamKind::Stderr.as_str(), "stderr");
1011 }
1012
1013 #[test]
1014 fn stream_kind_equality() {
1015 assert_eq!(StreamKind::Stdout, StreamKind::Stdout);
1016 assert_ne!(StreamKind::Stdout, StreamKind::Stderr);
1017 }
1018
1019 #[test]
1022 fn stream_event_clone() {
1023 let event = StreamEvent {
1024 stream: StreamKind::Stdout,
1025 line: b"hello".to_vec(),
1026 };
1027 let cloned = event.clone();
1028 assert_eq!(event, cloned);
1029 }
1030
1031 #[test]
1034 fn read_status_line_variant() {
1035 let status: ReadStatus<Vec<u8>> = ReadStatus::Line(b"data".to_vec());
1036 assert!(matches!(status, ReadStatus::Line(ref v) if v == b"data"));
1037 }
1038
1039 #[test]
1040 fn read_status_timeout_variant() {
1041 let status: ReadStatus<Vec<u8>> = ReadStatus::Timeout;
1042 assert!(matches!(status, ReadStatus::Timeout));
1043 }
1044
1045 #[test]
1046 fn read_status_eof_variant() {
1047 let status: ReadStatus<Vec<u8>> = ReadStatus::Eof;
1048 assert!(matches!(status, ReadStatus::Eof));
1049 }
1050
1051 #[test]
1054 fn process_error_display_already_started() {
1055 assert_eq!(
1056 ProcessError::AlreadyStarted.to_string(),
1057 "process already started"
1058 );
1059 }
1060
1061 #[test]
1062 fn process_error_display_not_running() {
1063 assert_eq!(
1064 ProcessError::NotRunning.to_string(),
1065 "process is not running"
1066 );
1067 }
1068
1069 #[test]
1070 fn process_error_display_stdin_unavailable() {
1071 assert_eq!(
1072 ProcessError::StdinUnavailable.to_string(),
1073 "process stdin is not available"
1074 );
1075 }
1076
1077 #[test]
1078 fn process_error_display_timeout() {
1079 assert_eq!(ProcessError::Timeout.to_string(), "process timed out");
1080 }
1081
1082 #[test]
1083 fn process_error_display_spawn() {
1084 let err = ProcessError::Spawn(std::io::Error::new(
1085 std::io::ErrorKind::NotFound,
1086 "not found",
1087 ));
1088 assert!(err.to_string().contains("not found"));
1089 }
1090
1091 #[test]
1092 fn process_error_display_io() {
1093 let err = ProcessError::Io(std::io::Error::new(
1094 std::io::ErrorKind::BrokenPipe,
1095 "broken",
1096 ));
1097 assert!(err.to_string().contains("broken"));
1098 }
1099
1100 #[test]
1103 fn command_spec_shell_variant() {
1104 let spec = CommandSpec::Shell("echo hello".to_string());
1105 assert!(matches!(spec, CommandSpec::Shell(ref s) if s == "echo hello"));
1106 }
1107
1108 #[test]
1109 fn command_spec_argv_variant() {
1110 let spec = CommandSpec::Argv(vec!["echo".to_string(), "hello".to_string()]);
1111 assert!(matches!(spec, CommandSpec::Argv(ref v) if v.len() == 2));
1112 }
1113
1114 #[test]
1117 fn stdin_mode_equality() {
1118 assert_eq!(StdinMode::Inherit, StdinMode::Inherit);
1119 assert_ne!(StdinMode::Piped, StdinMode::Null);
1120 }
1121
1122 #[test]
1123 fn stderr_mode_equality() {
1124 assert_eq!(StderrMode::Stdout, StderrMode::Stdout);
1125 assert_ne!(StderrMode::Stdout, StderrMode::Pipe);
1126 }
1127
1128 #[test]
1131 fn shared_state_new_with_capture() {
1132 let state = SharedState::new(true);
1133 let queues = state.queues.lock().unwrap();
1134 assert!(!queues.stdout_closed);
1135 assert!(!queues.stderr_closed);
1136 assert!(queues.stdout_queue.is_empty());
1137 assert!(queues.stderr_queue.is_empty());
1138 }
1139
1140 #[test]
1141 fn shared_state_new_without_capture() {
1142 let state = SharedState::new(false);
1143 let queues = state.queues.lock().unwrap();
1144 assert!(queues.stdout_closed);
1145 assert!(queues.stderr_closed);
1146 }
1147
1148 #[test]
1149 fn shared_state_returncode_initially_not_set() {
1150 let state = SharedState::new(true);
1151 let code = state.returncode.load(Ordering::Acquire);
1152 assert_eq!(code, RETURNCODE_NOT_SET);
1153 }
1154
1155 #[test]
1158 fn feed_chunk_single_line_with_newline() {
1159 let shared = Arc::new(SharedState::new(true));
1160 let mut pending = Vec::new();
1161 let lines = feed_chunk(&mut pending, b"hello\n");
1162 emit_lines(&shared, StreamKind::Stdout, lines);
1163 let queues = shared.queues.lock().unwrap();
1164 assert_eq!(queues.stdout_queue.len(), 1);
1165 assert_eq!(queues.stdout_queue[0], b"hello");
1166 assert!(pending.is_empty());
1167 }
1168
1169 #[test]
1170 fn feed_chunk_crlf_stripping() {
1171 let shared = Arc::new(SharedState::new(true));
1172 let mut pending = Vec::new();
1173 let lines = feed_chunk(&mut pending, b"hello\r\n");
1174 emit_lines(&shared, StreamKind::Stdout, lines);
1175 let queues = shared.queues.lock().unwrap();
1176 assert_eq!(queues.stdout_queue.len(), 1);
1177 assert_eq!(queues.stdout_queue[0], b"hello");
1178 }
1179
1180 #[test]
1181 fn feed_chunk_multiple_lines() {
1182 let shared = Arc::new(SharedState::new(true));
1183 let mut pending = Vec::new();
1184 let lines = feed_chunk(&mut pending, b"a\nb\nc\n");
1185 emit_lines(&shared, StreamKind::Stdout, lines);
1186 let queues = shared.queues.lock().unwrap();
1187 assert_eq!(queues.stdout_queue.len(), 3);
1188 assert_eq!(queues.stdout_queue[0], b"a");
1189 assert_eq!(queues.stdout_queue[1], b"b");
1190 assert_eq!(queues.stdout_queue[2], b"c");
1191 }
1192
1193 #[test]
1194 fn feed_chunk_no_newline_stays_pending() {
1195 let mut pending = Vec::new();
1196 let lines = feed_chunk(&mut pending, b"partial");
1197 assert!(lines.is_empty());
1198 assert_eq!(pending, b"partial");
1199 }
1200
1201 #[test]
1202 fn feed_chunk_accumulates_pending() {
1203 let shared = Arc::new(SharedState::new(true));
1204 let mut pending = Vec::new();
1205 let lines1 = feed_chunk(&mut pending, b"hel");
1206 emit_lines(&shared, StreamKind::Stdout, lines1);
1207 let lines2 = feed_chunk(&mut pending, b"lo\n");
1208 emit_lines(&shared, StreamKind::Stdout, lines2);
1209 let queues = shared.queues.lock().unwrap();
1210 assert_eq!(queues.stdout_queue.len(), 1);
1211 assert_eq!(queues.stdout_queue[0], b"hello");
1212 assert!(pending.is_empty());
1213 }
1214
1215 #[test]
1216 fn feed_chunk_empty_line_not_emitted() {
1217 let shared = Arc::new(SharedState::new(true));
1218 let mut pending = Vec::new();
1219 let lines = feed_chunk(&mut pending, b"\n");
1220 emit_lines(&shared, StreamKind::Stdout, lines);
1221 let queues = shared.queues.lock().unwrap();
1222 assert!(queues.stdout_queue.is_empty());
1223 }
1224
1225 #[test]
1226 fn feed_chunk_stderr_goes_to_stderr_queue() {
1227 let shared = Arc::new(SharedState::new(true));
1228 let mut pending = Vec::new();
1229 let lines = feed_chunk(&mut pending, b"error\n");
1230 emit_lines(&shared, StreamKind::Stderr, lines);
1231 let queues = shared.queues.lock().unwrap();
1232 assert!(queues.stdout_queue.is_empty());
1233 assert_eq!(queues.stderr_queue.len(), 1);
1234 assert_eq!(queues.stderr_queue[0], b"error");
1235 }
1236
1237 #[test]
1240 fn emit_lines_updates_all_queues_and_history() {
1241 let shared = Arc::new(SharedState::new(true));
1242 emit_lines(&shared, StreamKind::Stdout, vec![b"test".to_vec()]);
1243 let queues = shared.queues.lock().unwrap();
1244 assert_eq!(queues.stdout_queue.len(), 1);
1245 assert_eq!(queues.stdout_history.len(), 1);
1246 assert_eq!(queues.stdout_history_bytes, 4);
1247 assert_eq!(queues.combined_queue.len(), 1);
1248 assert_eq!(queues.combined_history.len(), 1);
1249 assert_eq!(queues.combined_history_bytes, 4);
1250 }
1251
1252 #[test]
1253 fn emit_lines_stderr_updates_stderr_queues() {
1254 let shared = Arc::new(SharedState::new(true));
1255 emit_lines(&shared, StreamKind::Stderr, vec![b"err".to_vec()]);
1256 let queues = shared.queues.lock().unwrap();
1257 assert_eq!(queues.stderr_queue.len(), 1);
1258 assert_eq!(queues.stderr_history.len(), 1);
1259 assert_eq!(queues.stderr_history_bytes, 3);
1260 assert_eq!(queues.combined_queue.len(), 1);
1261 assert_eq!(queues.combined_history_bytes, 3);
1262 }
1263
1264 #[test]
1267 fn native_process_returncode_none_before_start() {
1268 let process = NativeProcess::new(ProcessConfig {
1269 command: CommandSpec::Argv(vec!["echo".into()]),
1270 cwd: None,
1271 env: None,
1272 capture: false,
1273 stderr_mode: StderrMode::Stdout,
1274 creationflags: None,
1275 create_process_group: false,
1276 stdin_mode: StdinMode::Inherit,
1277 nice: None,
1278 containment: None,
1279 });
1280 assert!(process.returncode().is_none());
1281 }
1282
1283 #[test]
1284 fn native_process_pid_none_before_start() {
1285 let process = NativeProcess::new(ProcessConfig {
1286 command: CommandSpec::Argv(vec!["echo".into()]),
1287 cwd: None,
1288 env: None,
1289 capture: false,
1290 stderr_mode: StderrMode::Stdout,
1291 creationflags: None,
1292 create_process_group: false,
1293 stdin_mode: StdinMode::Inherit,
1294 nice: None,
1295 containment: None,
1296 });
1297 assert!(process.pid().is_none());
1298 }
1299
1300 #[test]
1301 fn native_process_has_pending_false_when_no_capture() {
1302 let process = NativeProcess::new(ProcessConfig {
1303 command: CommandSpec::Argv(vec!["echo".into()]),
1304 cwd: None,
1305 env: None,
1306 capture: false,
1307 stderr_mode: StderrMode::Stdout,
1308 creationflags: None,
1309 create_process_group: false,
1310 stdin_mode: StdinMode::Inherit,
1311 nice: None,
1312 containment: None,
1313 });
1314 assert!(!process.has_pending_stream(StreamKind::Stdout));
1315 assert!(!process.has_pending_combined());
1316 }
1317
1318 #[test]
1319 fn native_process_drain_empty_when_no_capture() {
1320 let process = NativeProcess::new(ProcessConfig {
1321 command: CommandSpec::Argv(vec!["echo".into()]),
1322 cwd: None,
1323 env: None,
1324 capture: false,
1325 stderr_mode: StderrMode::Stdout,
1326 creationflags: None,
1327 create_process_group: false,
1328 stdin_mode: StdinMode::Inherit,
1329 nice: None,
1330 containment: None,
1331 });
1332 assert!(process.drain_stream(StreamKind::Stdout).is_empty());
1333 assert!(process.drain_combined().is_empty());
1334 }
1335
1336 #[test]
1337 fn native_process_stderr_not_pending_when_merged() {
1338 let process = NativeProcess::new(ProcessConfig {
1339 command: CommandSpec::Argv(vec!["echo".into()]),
1340 cwd: None,
1341 env: None,
1342 capture: true,
1343 stderr_mode: StderrMode::Stdout,
1344 creationflags: None,
1345 create_process_group: false,
1346 stdin_mode: StdinMode::Inherit,
1347 nice: None,
1348 containment: None,
1349 });
1350 assert!(!process.has_pending_stream(StreamKind::Stderr));
1351 }
1352
1353 #[test]
1354 fn native_process_drain_stderr_empty_when_merged() {
1355 let process = NativeProcess::new(ProcessConfig {
1356 command: CommandSpec::Argv(vec!["echo".into()]),
1357 cwd: None,
1358 env: None,
1359 capture: true,
1360 stderr_mode: StderrMode::Stdout,
1361 creationflags: None,
1362 create_process_group: false,
1363 stdin_mode: StdinMode::Inherit,
1364 nice: None,
1365 containment: None,
1366 });
1367 assert!(process.drain_stream(StreamKind::Stderr).is_empty());
1368 }
1369
1370 #[test]
1371 fn native_process_captured_stderr_empty_when_merged() {
1372 let process = NativeProcess::new(ProcessConfig {
1373 command: CommandSpec::Argv(vec!["echo".into()]),
1374 cwd: None,
1375 env: None,
1376 capture: true,
1377 stderr_mode: StderrMode::Stdout,
1378 creationflags: None,
1379 create_process_group: false,
1380 stdin_mode: StdinMode::Inherit,
1381 nice: None,
1382 containment: None,
1383 });
1384 assert!(process.captured_stderr().is_empty());
1385 }
1386
1387 #[test]
1388 fn native_process_captured_stream_bytes_zero_when_merged_stderr() {
1389 let process = NativeProcess::new(ProcessConfig {
1390 command: CommandSpec::Argv(vec!["echo".into()]),
1391 cwd: None,
1392 env: None,
1393 capture: true,
1394 stderr_mode: StderrMode::Stdout,
1395 creationflags: None,
1396 create_process_group: false,
1397 stdin_mode: StdinMode::Inherit,
1398 nice: None,
1399 containment: None,
1400 });
1401 assert_eq!(process.captured_stream_bytes(StreamKind::Stderr), 0);
1402 }
1403
1404 #[test]
1405 fn native_process_clear_captured_stderr_zero_when_merged() {
1406 let process = NativeProcess::new(ProcessConfig {
1407 command: CommandSpec::Argv(vec!["echo".into()]),
1408 cwd: None,
1409 env: None,
1410 capture: true,
1411 stderr_mode: StderrMode::Stdout,
1412 creationflags: None,
1413 create_process_group: false,
1414 stdin_mode: StdinMode::Inherit,
1415 nice: None,
1416 containment: None,
1417 });
1418 assert_eq!(process.clear_captured_stream(StreamKind::Stderr), 0);
1419 }
1420
1421 #[test]
1422 fn native_process_read_stream_eof_when_stderr_merged() {
1423 let process = NativeProcess::new(ProcessConfig {
1424 command: CommandSpec::Argv(vec!["echo".into()]),
1425 cwd: None,
1426 env: None,
1427 capture: true,
1428 stderr_mode: StderrMode::Stdout,
1429 creationflags: None,
1430 create_process_group: false,
1431 stdin_mode: StdinMode::Inherit,
1432 nice: None,
1433 containment: None,
1434 });
1435 assert_eq!(
1436 process.read_stream(StreamKind::Stderr, Some(Duration::from_millis(10))),
1437 ReadStatus::Eof
1438 );
1439 }
1440
1441 #[test]
1444 fn log_spawned_child_pid_noop_without_env() {
1445 std::env::remove_var("RUNNING_PROCESS_CHILD_PID_LOG_PATH");
1446 assert!(log_spawned_child_pid(12345).is_ok());
1447 }
1448
1449 #[test]
1452 fn shell_command_creates_command() {
1453 let cmd = shell_command("echo test");
1454 let _ = format!("{:?}", cmd);
1455 }
1456
1457 #[test]
1460 fn exit_code_from_success() {
1461 let output = std::process::Command::new("python")
1462 .args(["-c", "pass"])
1463 .output()
1464 .unwrap();
1465 assert_eq!(exit_code(output.status), 0);
1466 }
1467
1468 #[test]
1469 fn exit_code_from_nonzero() {
1470 let output = std::process::Command::new("python")
1471 .args(["-c", "import sys; sys.exit(42)"])
1472 .output()
1473 .unwrap();
1474 assert_eq!(exit_code(output.status), 42);
1475 }
1476
1477 #[cfg(windows)]
1480 mod windows_tests {
1481 use super::*;
1482
1483 const IDLE_PRIORITY_CLASS: u32 = 0x0000_0040;
1484 const BELOW_NORMAL_PRIORITY_CLASS: u32 = 0x0000_4000;
1485 const ABOVE_NORMAL_PRIORITY_CLASS: u32 = 0x0000_8000;
1486 const HIGH_PRIORITY_CLASS: u32 = 0x0000_0080;
1487
1488 #[test]
1489 fn priority_flags_none() {
1490 assert_eq!(windows_priority_flags(None), 0);
1491 }
1492
1493 #[test]
1494 fn priority_flags_zero() {
1495 assert_eq!(windows_priority_flags(Some(0)), 0);
1496 }
1497
1498 #[test]
1499 fn priority_flags_high_nice_idle() {
1500 assert_eq!(windows_priority_flags(Some(15)), IDLE_PRIORITY_CLASS);
1501 assert_eq!(windows_priority_flags(Some(20)), IDLE_PRIORITY_CLASS);
1502 }
1503
1504 #[test]
1505 fn priority_flags_low_positive_below_normal() {
1506 assert_eq!(windows_priority_flags(Some(1)), BELOW_NORMAL_PRIORITY_CLASS);
1507 assert_eq!(
1508 windows_priority_flags(Some(14)),
1509 BELOW_NORMAL_PRIORITY_CLASS
1510 );
1511 }
1512
1513 #[test]
1514 fn priority_flags_negative_above_normal() {
1515 assert_eq!(
1516 windows_priority_flags(Some(-1)),
1517 ABOVE_NORMAL_PRIORITY_CLASS
1518 );
1519 assert_eq!(
1520 windows_priority_flags(Some(-14)),
1521 ABOVE_NORMAL_PRIORITY_CLASS
1522 );
1523 }
1524
1525 #[test]
1526 fn priority_flags_very_negative_high() {
1527 assert_eq!(windows_priority_flags(Some(-15)), HIGH_PRIORITY_CLASS);
1528 assert_eq!(windows_priority_flags(Some(-20)), HIGH_PRIORITY_CLASS);
1529 }
1530 }
1531
1532 #[test]
1535 fn process_config_clone() {
1536 let config = ProcessConfig {
1537 command: CommandSpec::Shell("echo".to_string()),
1538 cwd: Some("/tmp".into()),
1539 env: Some(vec![("KEY".to_string(), "VAL".to_string())]),
1540 capture: true,
1541 stderr_mode: StderrMode::Pipe,
1542 creationflags: Some(0x10),
1543 create_process_group: true,
1544 stdin_mode: StdinMode::Piped,
1545 nice: Some(5),
1546 containment: None,
1547 };
1548 let cloned = config.clone();
1549 assert!(cloned.capture);
1550 assert_eq!(cloned.nice, Some(5));
1551 }
1552
1553 #[test]
1556 fn render_rust_debug_traces_returns_string() {
1557 let result = render_rust_debug_traces();
1558 let _ = result.len();
1559 }
1560
1561 #[test]
1564 fn rust_debug_scope_guard_enters_and_drops() {
1565 let _guard = RustDebugScopeGuard::enter("test_scope", file!(), line!());
1566 let traces = render_rust_debug_traces();
1567 assert!(traces.contains("test_scope"));
1568 drop(_guard);
1569 }
1570
1571 #[cfg(unix)]
1574 mod unix_tests {
1575 use super::*;
1576
1577 #[test]
1578 fn unix_signal_raw_values() {
1579 assert_eq!(unix_signal_raw(UnixSignal::Interrupt), libc::SIGINT);
1580 assert_eq!(unix_signal_raw(UnixSignal::Terminate), libc::SIGTERM);
1581 assert_eq!(unix_signal_raw(UnixSignal::Kill), libc::SIGKILL);
1582 }
1583
1584 #[test]
1585 fn unix_signal_enum_equality() {
1586 assert_eq!(UnixSignal::Interrupt, UnixSignal::Interrupt);
1587 assert_ne!(UnixSignal::Interrupt, UnixSignal::Kill);
1588 }
1589 }
1590
1591 #[test]
1594 fn wait_for_capture_completion_noop_without_capture() {
1595 let process = NativeProcess::new(ProcessConfig {
1596 command: CommandSpec::Argv(vec!["echo".into()]),
1597 cwd: None,
1598 env: None,
1599 capture: false,
1600 stderr_mode: StderrMode::Stdout,
1601 creationflags: None,
1602 create_process_group: false,
1603 stdin_mode: StdinMode::Inherit,
1604 nice: None,
1605 containment: None,
1606 });
1607 process.wait_for_capture_completion_impl();
1608 }
1609
1610 #[test]
1613 fn build_command_from_argv() {
1614 let process = NativeProcess::new(ProcessConfig {
1615 command: CommandSpec::Argv(vec!["echo".into(), "hello".into(), "world".into()]),
1616 cwd: None,
1617 env: None,
1618 capture: false,
1619 stderr_mode: StderrMode::Stdout,
1620 creationflags: None,
1621 create_process_group: false,
1622 stdin_mode: StdinMode::Inherit,
1623 nice: None,
1624 containment: None,
1625 });
1626 let cmd = process.build_command();
1627 assert_eq!(cmd.get_program(), "echo");
1628 let args: Vec<_> = cmd.get_args().collect();
1629 assert_eq!(args, vec!["hello", "world"]);
1630 }
1631
1632 #[test]
1633 fn build_command_from_shell() {
1634 let process = NativeProcess::new(ProcessConfig {
1635 command: CommandSpec::Shell("echo test".into()),
1636 cwd: None,
1637 env: None,
1638 capture: false,
1639 stderr_mode: StderrMode::Stdout,
1640 creationflags: None,
1641 create_process_group: false,
1642 stdin_mode: StdinMode::Inherit,
1643 nice: None,
1644 containment: None,
1645 });
1646 let cmd = process.build_command();
1647 let program = cmd.get_program().to_string_lossy().to_string();
1649 #[cfg(windows)]
1650 assert!(
1651 program.contains("cmd"),
1652 "expected cmd shell, got {}",
1653 program
1654 );
1655 #[cfg(not(windows))]
1656 assert!(program.contains("sh"), "expected sh shell, got {}", program);
1657 }
1658
1659 #[test]
1660 fn build_command_with_cwd() {
1661 let tmp = std::env::temp_dir();
1662 let process = NativeProcess::new(ProcessConfig {
1663 command: CommandSpec::Argv(vec!["echo".into()]),
1664 cwd: Some(tmp.clone()),
1665 env: None,
1666 capture: false,
1667 stderr_mode: StderrMode::Stdout,
1668 creationflags: None,
1669 create_process_group: false,
1670 stdin_mode: StdinMode::Inherit,
1671 nice: None,
1672 containment: None,
1673 });
1674 let cmd = process.build_command();
1675 assert_eq!(cmd.get_current_dir().unwrap(), &tmp);
1676 }
1677
1678 #[test]
1679 fn build_command_with_env() {
1680 let process = NativeProcess::new(ProcessConfig {
1681 command: CommandSpec::Argv(vec!["echo".into()]),
1682 cwd: None,
1683 env: Some(vec![
1684 ("FOO".into(), "bar".into()),
1685 ("BAZ".into(), "qux".into()),
1686 ]),
1687 capture: false,
1688 stderr_mode: StderrMode::Stdout,
1689 creationflags: None,
1690 create_process_group: false,
1691 stdin_mode: StdinMode::Inherit,
1692 nice: None,
1693 containment: None,
1694 });
1695 let cmd = process.build_command();
1696 let envs: Vec<_> = cmd.get_envs().collect();
1697 assert!(envs
1698 .iter()
1699 .any(|(k, v)| *k == "FOO" && *v == Some(std::ffi::OsStr::new("bar"))));
1700 assert!(envs
1701 .iter()
1702 .any(|(k, v)| *k == "BAZ" && *v == Some(std::ffi::OsStr::new("qux"))));
1703 }
1704
1705 #[test]
1706 fn build_command_single_argv() {
1707 let process = NativeProcess::new(ProcessConfig {
1708 command: CommandSpec::Argv(vec!["echo".into()]),
1709 cwd: None,
1710 env: None,
1711 capture: false,
1712 stderr_mode: StderrMode::Stdout,
1713 creationflags: None,
1714 create_process_group: false,
1715 stdin_mode: StdinMode::Inherit,
1716 nice: None,
1717 containment: None,
1718 });
1719 let cmd = process.build_command();
1720 assert_eq!(cmd.get_program(), "echo");
1721 assert_eq!(cmd.get_args().count(), 0);
1722 }
1723
1724 #[test]
1727 fn set_returncode_updates_shared_state() {
1728 let process = NativeProcess::new(ProcessConfig {
1729 command: CommandSpec::Argv(vec!["echo".into()]),
1730 cwd: None,
1731 env: None,
1732 capture: false,
1733 stderr_mode: StderrMode::Stdout,
1734 creationflags: None,
1735 create_process_group: false,
1736 stdin_mode: StdinMode::Inherit,
1737 nice: None,
1738 containment: None,
1739 });
1740 assert!(process.returncode().is_none());
1741 process.set_returncode(42);
1742 assert_eq!(process.returncode(), Some(42));
1743 }
1744
1745 #[test]
1746 fn set_returncode_overwrites() {
1747 let process = NativeProcess::new(ProcessConfig {
1748 command: CommandSpec::Argv(vec!["echo".into()]),
1749 cwd: None,
1750 env: None,
1751 capture: false,
1752 stderr_mode: StderrMode::Stdout,
1753 creationflags: None,
1754 create_process_group: false,
1755 stdin_mode: StdinMode::Inherit,
1756 nice: None,
1757 containment: None,
1758 });
1759 process.set_returncode(1);
1760 process.set_returncode(2);
1761 assert_eq!(process.returncode(), Some(2));
1762 }
1763
1764 #[test]
1767 fn shared_state_with_capture_queues_open() {
1768 let state = SharedState::new(true);
1769 let guard = state.queues.lock().unwrap();
1770 assert!(!guard.stdout_closed);
1771 assert!(!guard.stderr_closed);
1772 }
1773
1774 #[test]
1775 fn shared_state_without_capture_queues_closed() {
1776 let state = SharedState::new(false);
1777 let guard = state.queues.lock().unwrap();
1778 assert!(guard.stdout_closed);
1779 assert!(guard.stderr_closed);
1780 }
1781
1782 #[test]
1785 fn process_error_display_io_variant() {
1786 let err = ProcessError::Io(std::io::Error::new(
1787 std::io::ErrorKind::BrokenPipe,
1788 "pipe broken",
1789 ));
1790 let msg = format!("{}", err);
1791 assert!(msg.contains("pipe broken"));
1792 }
1793
1794 #[test]
1795 fn process_error_display_spawn_variant() {
1796 let err = ProcessError::Spawn(std::io::Error::new(
1797 std::io::ErrorKind::NotFound,
1798 "not found",
1799 ));
1800 let msg = format!("{}", err);
1801 assert!(msg.contains("not found"));
1802 }
1803
1804 #[test]
1807 fn shell_command_returns_command_with_shell() {
1808 let cmd = shell_command("echo test");
1809 let program = cmd.get_program().to_string_lossy().to_string();
1810 #[cfg(windows)]
1811 assert!(program.contains("cmd"));
1812 #[cfg(not(windows))]
1813 assert!(program.contains("sh"));
1814 }
1815}