1use std::collections::VecDeque;
2use std::fs::OpenOptions;
3use std::io::Read;
4use std::io::Write;
5use std::path::PathBuf;
6use std::process::{Child, Command, Stdio};
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::{Arc, Condvar, Mutex};
9use std::thread;
10use std::time::{Duration, Instant};
11
12use thiserror::Error;
13
14pub mod containment;
15#[cfg(feature = "originator-scan")]
16pub mod originator;
17mod public_symbols;
18mod rust_debug;
19
20pub use containment::{ContainedChild, ContainedProcessGroup, Containment, ORIGINATOR_ENV_VAR};
21#[cfg(feature = "originator-scan")]
22pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
23pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
24
/// Creates a [`RustDebugScopeGuard`] that lives for the rest of the enclosing block.
///
/// Expands to a `let` binding whose value is
/// `RustDebugScopeGuard::enter($label, file!(), line!())`, so the guard records
/// the label together with the call site's file and line. The guard is bound to
/// a named local (not `_`), which means it is dropped when the surrounding
/// scope ends rather than immediately.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
32
/// Name of the environment variable that, when set, names a file to which the
/// PID of every spawned child is appended (one decimal PID per line).
const CHILD_PID_LOG_PATH_ENV: &str = "RUNNING_PROCESS_CHILD_PID_LOG_PATH";
34
/// Identifies which output stream a captured line is attributed to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}

impl StreamKind {
    /// Returns the lowercase stream name: `"stdout"` or `"stderr"`.
    pub fn as_str(self) -> &'static str {
        match self {
            StreamKind::Stderr => "stderr",
            StreamKind::Stdout => "stdout",
        }
    }
}
49
/// One captured line of child output, tagged with the stream it is attributed to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamEvent {
    /// Stream the line is reported under.
    pub stream: StreamKind,
    /// Raw bytes of the line, without the trailing line terminator.
    pub line: Vec<u8>,
}
55
/// Outcome of a blocking read from one of the capture queues.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadStatus<T> {
    /// An item was available and has been removed from the queue.
    Line(T),
    /// The requested timeout elapsed before an item arrived.
    Timeout,
    /// The queue is empty and the relevant stream(s) are closed.
    Eof,
}
62
/// Errors reported by [`NativeProcess`] operations.
#[derive(Debug, Error)]
pub enum ProcessError {
    /// `start` was called while a child is already stored in the process.
    #[error("process already started")]
    AlreadyStarted,
    /// The operation needs a started child but none is present.
    #[error("process is not running")]
    NotRunning,
    /// The child has no stdin handle available for writing.
    #[error("process stdin is not available")]
    StdinUnavailable,
    /// Spawning the child, or the setup performed right after spawning, failed.
    #[error("failed to spawn process: {0}")]
    Spawn(std::io::Error),
    /// An I/O operation on the running child failed.
    #[error("failed to read process output: {0}")]
    Io(std::io::Error),
    /// The child did not exit within the requested timeout.
    #[error("process timed out")]
    Timeout,
}
78
/// How the command to run is specified.
#[derive(Debug, Clone)]
pub enum CommandSpec {
    /// A command line handed to the platform shell (`cmd` on Windows, `sh` elsewhere).
    Shell(String),
    /// A program (first element) followed by its arguments, run without a shell.
    Argv(Vec<String>),
}
84
/// How the child's standard input is configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StdinMode {
    /// Leave stdin untouched on the command (the `Command` default applies).
    Inherit,
    /// Create a pipe so the parent can write to the child's stdin.
    Piped,
    /// Connect stdin to the null device.
    Null,
}
91
/// How captured stderr lines are exposed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StderrMode {
    /// Report stderr lines as stdout lines; the stderr-specific accessors
    /// then return nothing.
    Stdout,
    /// Keep stderr lines in their own queue and history.
    Pipe,
}
97
/// Signals that can be delivered through the `unix_signal_*` helpers.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnixSignal {
    /// `SIGINT`.
    Interrupt,
    /// `SIGTERM`.
    Terminate,
    /// `SIGKILL`.
    Kill,
}
105
/// Everything needed to spawn a [`NativeProcess`].
#[derive(Debug, Clone)]
pub struct ProcessConfig {
    /// The command to run, either through the shell or as an argv list.
    pub command: CommandSpec,
    /// Working directory for the child; `None` leaves it unset on the command.
    pub cwd: Option<PathBuf>,
    /// When `Some`, the child's environment is cleared and replaced by exactly
    /// these pairs; when `None` the environment is left untouched.
    pub env: Option<Vec<(String, String)>>,
    /// When `true`, stdout and stderr are piped and collected line by line.
    pub capture: bool,
    /// Whether captured stderr lines are reported as stdout or kept separate.
    pub stderr_mode: StderrMode,
    /// Extra Windows process-creation flags, OR-ed with the priority flags
    /// derived from `nice`. Only read on Windows.
    pub creationflags: Option<u32>,
    /// On Unix, put the child in its own process group (`setpgid(0, 0)`) when
    /// no `containment` is configured.
    pub create_process_group: bool,
    /// How the child's stdin is wired up.
    pub stdin_mode: StdinMode,
    /// Requested niceness: applied with `setpriority` on Unix and mapped to a
    /// priority class on Windows.
    pub nice: Option<i32>,
    /// Optional containment policy applied in the child before exec on Unix.
    pub containment: Option<Containment>,
}
123
/// Capture buffers shared between the reader threads and the public accessors.
///
/// The `*_queue` fields hold lines that have not been read or drained yet; the
/// `*_history` fields keep every emitted line until explicitly cleared.
#[derive(Default)]
struct QueueState {
    /// Unread lines attributed to stdout.
    stdout_queue: VecDeque<Vec<u8>>,
    /// Unread lines attributed to stderr.
    stderr_queue: VecDeque<Vec<u8>>,
    /// Unread lines from both streams, in emission order.
    combined_queue: VecDeque<StreamEvent>,
    /// All stdout lines emitted since the last clear.
    stdout_history: VecDeque<Vec<u8>>,
    /// All stderr lines emitted since the last clear.
    stderr_history: VecDeque<Vec<u8>>,
    /// All lines from both streams emitted since the last clear.
    combined_history: VecDeque<StreamEvent>,
    /// Sum of the lengths of the lines in `stdout_history`.
    stdout_history_bytes: usize,
    /// Sum of the lengths of the lines in `stderr_history`.
    stderr_history_bytes: usize,
    /// Sum of the lengths of the lines in `combined_history`.
    combined_history_bytes: usize,
    /// Set once the stdout reader has finished (or from the start when capture is off).
    stdout_closed: bool,
    /// Set once the stderr reader has finished (or from the start when capture is off).
    stderr_closed: bool,
}
138
/// Sentinel stored in `SharedState::returncode` while no exit code is known.
/// It lies outside the `i32` range, so it cannot collide with a real exit code.
const RETURNCODE_NOT_SET: i64 = i64::MIN;
141
/// State shared between a [`NativeProcess`] and its background threads.
struct SharedState {
    /// Capture queues, histories and stream-closed flags.
    queues: Mutex<QueueState>,
    /// Notified whenever a line is emitted, a stream closes, or the exit code is set.
    condvar: Condvar,
    /// Exit code widened to `i64`, or `RETURNCODE_NOT_SET` while unknown.
    returncode: AtomicI64,
}
149
/// Owns a Windows job-object handle, stored as an integer, and closes it on drop.
#[cfg(windows)]
struct WindowsJobHandle(usize);

#[cfg(windows)]
impl Drop for WindowsJobHandle {
    /// Closes the wrapped handle; the result of `CloseHandle` is ignored.
    fn drop(&mut self) {
        unsafe {
            winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
        }
    }
}
161
/// The spawned child together with the resources that must live as long as it.
struct ChildState {
    /// Handle to the spawned process.
    child: Child,
    /// Job object the child was assigned to; kept only so it is closed when
    /// this state is dropped.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
167
168impl SharedState {
169 fn new(capture: bool) -> Self {
170 let queues = QueueState {
171 stdout_closed: !capture,
172 stderr_closed: !capture,
173 ..QueueState::default()
174 };
175 Self {
176 queues: Mutex::new(queues),
177 condvar: Condvar::new(),
178 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
179 }
180 }
181}
182
/// A child process that can be started once, polled, waited on, killed, and
/// whose output can optionally be captured line by line.
pub struct NativeProcess {
    /// Spawn configuration supplied at construction time.
    config: ProcessConfig,
    /// The spawned child, or `None` before `start` succeeds.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Capture queues and exit code shared with the background threads.
    shared: Arc<SharedState>,
}
188
189impl NativeProcess {
190 pub fn new(config: ProcessConfig) -> Self {
191 Self {
192 shared: Arc::new(SharedState::new(config.capture)),
193 child: Arc::new(Mutex::new(None)),
194 config,
195 }
196 }
197
    /// Spawns the configured command.
    ///
    /// Forwards to `public_symbols::rp_native_process_start_public`; marked
    /// `#[inline(never)]` so this entry point stays a distinct function.
    /// NOTE(review): the forwarder presumably calls `start_impl` — confirm in
    /// `public_symbols`.
    #[inline(never)]
    pub fn start(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_start_public(self)
    }
203
204 fn start_impl(&self) -> Result<(), ProcessError> {
205 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::start");
206 let mut guard = self.child.lock().expect("child mutex poisoned");
207 if guard.is_some() {
208 return Err(ProcessError::AlreadyStarted);
209 }
210
211 let mut command = self.build_command();
212 match self.config.stdin_mode {
213 StdinMode::Inherit => {}
214 StdinMode::Piped => {
215 command.stdin(Stdio::piped());
216 }
217 StdinMode::Null => {
218 command.stdin(Stdio::null());
219 }
220 }
221 if self.config.capture {
222 command.stdout(Stdio::piped());
223 command.stderr(Stdio::piped());
224 }
225
226 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
227 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
228 #[cfg(windows)]
229 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
230 .map_err(ProcessError::Spawn)?;
231 if self.config.capture {
232 let stdout = child.stdout.take().expect("stdout pipe missing");
233 let stderr = child.stderr.take().expect("stderr pipe missing");
234 self.spawn_reader(stdout, StreamKind::Stdout, StreamKind::Stdout);
235 self.spawn_reader(
236 stderr,
237 StreamKind::Stderr,
238 match self.config.stderr_mode {
239 StderrMode::Stdout => StreamKind::Stdout,
240 StderrMode::Pipe => StreamKind::Stderr,
241 },
242 );
243 }
244 *guard = Some(ChildState {
245 child,
246 #[cfg(windows)]
247 _job: job,
248 });
249 drop(guard);
250 self.spawn_exit_waiter();
251 Ok(())
252 }
253
    /// Starts a background thread that polls the child until it exits and
    /// publishes its exit code.
    ///
    /// The thread stops as soon as a return code is already recorded, the child
    /// slot is empty, `try_wait` fails, or the child has exited (in which case
    /// the code is stored and all condvar waiters are woken).
    fn spawn_exit_waiter(&self) {
        let child = Arc::clone(&self.child);
        let shared = Arc::clone(&self.shared);
        thread::spawn(move || loop {
            // Someone else (poll/kill) already recorded the exit code.
            if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
                return;
            }
            {
                // Hold the child lock only for the non-blocking check so other
                // callers can use the child between polls.
                let mut guard = child.lock().expect("child mutex poisoned");
                if let Some(child_state) = guard.as_mut() {
                    match child_state.child.try_wait() {
                        Ok(Some(status)) => {
                            let code = exit_code(status);
                            shared.returncode.store(code as i64, Ordering::Release);
                            shared.condvar.notify_all();
                            return;
                        }
                        // Still running: poll again after the sleep below.
                        Ok(None) => {}
                        // The error is not reported; the thread simply stops.
                        Err(_) => return,
                    }
                } else {
                    return;
                }
            }
            thread::sleep(Duration::from_millis(1));
        });
    }
283
    /// Writes `data` to the child's stdin, flushes it, and then closes stdin.
    ///
    /// Because the stdin handle is taken and dropped after a successful write,
    /// a later call returns [`ProcessError::StdinUnavailable`].
    ///
    /// # Errors
    ///
    /// * [`ProcessError::NotRunning`] if no child has been started.
    /// * [`ProcessError::StdinUnavailable`] if the child has no stdin handle.
    /// * [`ProcessError::Io`] if writing or flushing fails; stdin is left open
    ///   in that case.
    pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
        let mut guard = self.child.lock().expect("child mutex poisoned");
        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
        let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
        use std::io::Write;
        stdin.write_all(data).map_err(ProcessError::Io)?;
        stdin.flush().map_err(ProcessError::Io)?;
        // Dropping the handle closes the pipe, signalling end-of-input to the child.
        drop(child.stdin.take());
        Ok(())
    }
294
295 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
296 if let Some(code) = self.returncode() {
298 return Ok(Some(code));
299 }
300 let mut guard = self.child.lock().expect("child mutex poisoned");
301 let Some(child_state) = guard.as_mut() else {
302 return Ok(self.returncode());
303 };
304 let child = &mut child_state.child;
305 let status = child.try_wait().map_err(ProcessError::Io)?;
306 if let Some(status) = status {
307 let code = exit_code(status);
308 self.set_returncode(code);
309 return Ok(Some(code));
310 }
311 Ok(None)
312 }
313
    /// Waits for the child to exit, optionally giving up after `timeout`.
    ///
    /// Forwards to `public_symbols::rp_native_process_wait_public`; marked
    /// `#[inline(never)]` so this entry point stays a distinct function.
    /// NOTE(review): the forwarder presumably calls `wait_impl` — confirm in
    /// `public_symbols`.
    #[inline(never)]
    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
        public_symbols::rp_native_process_wait_public(self, timeout)
    }
319
320 fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
321 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::wait");
322 if self.child.lock().expect("child mutex poisoned").is_none() {
323 return self.returncode().ok_or(ProcessError::NotRunning);
324 }
325 let start = Instant::now();
326 loop {
327 if let Some(code) = self.poll()? {
328 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
329 return Ok(code);
330 }
331 if timeout.is_some_and(|limit| start.elapsed() >= limit) {
332 return Err(ProcessError::Timeout);
333 }
334 thread::sleep(Duration::from_millis(10));
335 }
336 }
337
    /// Kills the child process.
    ///
    /// Forwards to `public_symbols::rp_native_process_kill_public`; marked
    /// `#[inline(never)]` so this entry point stays a distinct function.
    /// NOTE(review): the forwarder presumably calls `kill_impl` — confirm in
    /// `public_symbols`.
    #[inline(never)]
    pub fn kill(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_kill_public(self)
    }
343
344 fn kill_impl(&self) -> Result<(), ProcessError> {
345 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::kill");
346 let mut guard = self.child.lock().expect("child mutex poisoned");
347 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
348 child.kill().map_err(ProcessError::Io)?;
349 let status = child.wait().map_err(ProcessError::Io)?;
350 self.set_returncode(exit_code(status));
351 Ok(())
352 }
353
    /// Terminates the child. This is currently identical to [`Self::kill`].
    pub fn terminate(&self) -> Result<(), ProcessError> {
        self.kill()
    }
357
    /// Shuts the process down.
    ///
    /// Forwards to `public_symbols::rp_native_process_close_public`; marked
    /// `#[inline(never)]` so this entry point stays a distinct function.
    /// NOTE(review): the forwarder presumably calls `close_impl` — confirm in
    /// `public_symbols`.
    #[inline(never)]
    pub fn close(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_close_public(self)
    }
363
364 fn close_impl(&self) -> Result<(), ProcessError> {
365 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::close");
366 if self.child.lock().expect("child mutex poisoned").is_none() {
367 return Ok(());
368 }
369 if self.poll()?.is_none() {
370 self.kill()?;
371 } else {
372 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
373 }
374 Ok(())
375 }
376
377 pub fn pid(&self) -> Option<u32> {
378 self.child
379 .lock()
380 .expect("child mutex poisoned")
381 .as_ref()
382 .map(|state| state.child.id())
383 }
384
385 pub fn returncode(&self) -> Option<i32> {
386 let v = self.shared.returncode.load(Ordering::Acquire);
387 if v == RETURNCODE_NOT_SET {
388 None
389 } else {
390 Some(v as i32)
391 }
392 }
393
394 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
395 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
396 return false;
397 }
398 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
399 match stream {
400 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
401 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
402 }
403 }
404
405 pub fn has_pending_combined(&self) -> bool {
406 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
407 !guard.combined_queue.is_empty()
408 }
409
410 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
411 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
412 return Vec::new();
413 }
414 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
415 let queue = match stream {
416 StreamKind::Stdout => &mut guard.stdout_queue,
417 StreamKind::Stderr => &mut guard.stderr_queue,
418 };
419 queue.drain(..).collect()
420 }
421
422 pub fn drain_combined(&self) -> Vec<StreamEvent> {
423 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
424 guard.combined_queue.drain(..).collect()
425 }
426
427 pub fn read_stream(
428 &self,
429 stream: StreamKind,
430 timeout: Option<Duration>,
431 ) -> ReadStatus<Vec<u8>> {
432 let deadline = timeout.map(|limit| Instant::now() + limit);
433 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
434
435 loop {
436 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
437 return ReadStatus::Eof;
438 }
439
440 let queue = match stream {
441 StreamKind::Stdout => &mut guard.stdout_queue,
442 StreamKind::Stderr => &mut guard.stderr_queue,
443 };
444 if let Some(line) = queue.pop_front() {
445 return ReadStatus::Line(line);
446 }
447
448 let closed = match stream {
449 StreamKind::Stdout => {
450 if self.config.stderr_mode == StderrMode::Stdout {
451 guard.stdout_closed && guard.stderr_closed
452 } else {
453 guard.stdout_closed
454 }
455 }
456 StreamKind::Stderr => guard.stderr_closed,
457 };
458 if closed {
459 return ReadStatus::Eof;
460 }
461
462 match deadline {
463 Some(deadline) => {
464 let now = Instant::now();
465 if now >= deadline {
466 return ReadStatus::Timeout;
467 }
468 let wait = deadline.saturating_duration_since(now);
469 let result = self
470 .shared
471 .condvar
472 .wait_timeout(guard, wait)
473 .expect("queue mutex poisoned");
474 guard = result.0;
475 if result.1.timed_out() {
476 return ReadStatus::Timeout;
477 }
478 }
479 None => {
480 guard = self
481 .shared
482 .condvar
483 .wait(guard)
484 .expect("queue mutex poisoned");
485 }
486 }
487 }
488 }
489
    /// Reads the next event from the combined stdout/stderr queue.
    ///
    /// Forwards to `public_symbols::rp_native_process_read_combined_public`;
    /// marked `#[inline(never)]` so this entry point stays a distinct function.
    /// NOTE(review): the forwarder presumably calls `read_combined_impl` —
    /// confirm in `public_symbols`.
    #[inline(never)]
    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
        public_symbols::rp_native_process_read_combined_public(self, timeout)
    }
495
496 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
497 crate::rp_rust_debug_scope!("running_process_core::NativeProcess::read_combined");
498 let deadline = timeout.map(|limit| Instant::now() + limit);
499 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
500
501 loop {
502 if let Some(event) = guard.combined_queue.pop_front() {
503 return ReadStatus::Line(event);
504 }
505 if guard.stdout_closed && guard.stderr_closed {
506 return ReadStatus::Eof;
507 }
508
509 match deadline {
510 Some(deadline) => {
511 let now = Instant::now();
512 if now >= deadline {
513 return ReadStatus::Timeout;
514 }
515 let wait = deadline.saturating_duration_since(now);
516 let result = self
517 .shared
518 .condvar
519 .wait_timeout(guard, wait)
520 .expect("queue mutex poisoned");
521 guard = result.0;
522 if result.1.timed_out() {
523 return ReadStatus::Timeout;
524 }
525 }
526 None => {
527 guard = self
528 .shared
529 .condvar
530 .wait(guard)
531 .expect("queue mutex poisoned");
532 }
533 }
534 }
535 }
536
537 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
538 self.shared
539 .queues
540 .lock()
541 .expect("queue mutex poisoned")
542 .stdout_history
543 .clone()
544 .into_iter()
545 .collect()
546 }
547
548 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
549 if self.config.stderr_mode == StderrMode::Stdout {
550 return Vec::new();
551 }
552 self.shared
553 .queues
554 .lock()
555 .expect("queue mutex poisoned")
556 .stderr_history
557 .clone()
558 .into_iter()
559 .collect()
560 }
561
562 pub fn captured_combined(&self) -> Vec<StreamEvent> {
563 self.shared
564 .queues
565 .lock()
566 .expect("queue mutex poisoned")
567 .combined_history
568 .clone()
569 .into_iter()
570 .collect()
571 }
572
573 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
574 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
575 return 0;
576 }
577 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
578 match stream {
579 StreamKind::Stdout => guard.stdout_history_bytes,
580 StreamKind::Stderr => guard.stderr_history_bytes,
581 }
582 }
583
584 pub fn captured_combined_bytes(&self) -> usize {
585 self.shared
586 .queues
587 .lock()
588 .expect("queue mutex poisoned")
589 .combined_history_bytes
590 }
591
592 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
593 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
594 return 0;
595 }
596 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
597 match stream {
598 StreamKind::Stdout => {
599 let released = guard.stdout_history_bytes;
600 guard.stdout_history.clear();
601 guard.stdout_history_bytes = 0;
602 released
603 }
604 StreamKind::Stderr => {
605 let released = guard.stderr_history_bytes;
606 guard.stderr_history.clear();
607 guard.stderr_history_bytes = 0;
608 released
609 }
610 }
611 }
612
613 pub fn clear_captured_combined(&self) -> usize {
614 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
615 let released = guard.combined_history_bytes;
616 guard.combined_history.clear();
617 guard.combined_history_bytes = 0;
618 released
619 }
620
621 fn build_command(&self) -> Command {
622 let mut command = match &self.config.command {
623 CommandSpec::Shell(command) => shell_command(command),
624 CommandSpec::Argv(argv) => {
625 let mut command = Command::new(&argv[0]);
626 if argv.len() > 1 {
627 command.args(&argv[1..]);
628 }
629 command
630 }
631 };
632 if let Some(cwd) = &self.config.cwd {
633 command.current_dir(cwd);
634 }
635 if let Some(env) = &self.config.env {
636 command.env_clear();
637 command.envs(env.iter().map(|(k, v)| (k, v)));
638 }
639 #[cfg(windows)]
640 {
641 use std::os::windows::process::CommandExt;
642
643 let flags =
644 self.config.creationflags.unwrap_or(0) | windows_priority_flags(self.config.nice);
645 if flags != 0 {
646 command.creation_flags(flags);
647 }
648 }
649 #[cfg(unix)]
650 {
651 let create_process_group = self.config.create_process_group;
652 let nice = self.config.nice;
653 let containment = self.config.containment;
654
655 let needs_pre_exec = create_process_group || nice.is_some() || containment.is_some();
656
657 if needs_pre_exec {
658 use std::os::unix::process::CommandExt;
659
660 unsafe {
661 command.pre_exec(move || {
662 match containment {
663 Some(Containment::Contained) => {
664 if libc::setpgid(0, 0) == -1 {
666 return Err(std::io::Error::last_os_error());
667 }
668 #[cfg(target_os = "linux")]
674 {
675 if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGKILL) == -1 {
676 return Err(std::io::Error::last_os_error());
677 }
678 if libc::getppid() == 1 {
679 libc::_exit(1);
680 }
681 }
682 }
683 Some(Containment::Detached) => {
684 if libc::setsid() == -1 {
687 return Err(std::io::Error::last_os_error());
688 }
689 }
690 None => {
691 if create_process_group && libc::setpgid(0, 0) == -1 {
692 return Err(std::io::Error::last_os_error());
693 }
694 }
695 }
696 if let Some(nice) = nice {
697 let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
698 if result == -1 {
699 return Err(std::io::Error::last_os_error());
700 }
701 }
702 Ok(())
703 });
704 }
705 }
706 }
707 command
708 }
709
710 fn spawn_reader<R>(&self, pipe: R, source_stream: StreamKind, visible_stream: StreamKind)
711 where
712 R: Read + Send + 'static,
713 {
714 let shared = Arc::clone(&self.shared);
715 thread::spawn(move || {
716 let mut reader = pipe;
717 let mut chunk = [0_u8; 4096];
718 let mut pending = Vec::new();
719
720 loop {
721 match reader.read(&mut chunk) {
722 Ok(0) => break,
723 Ok(n) => feed_chunk(&shared, visible_stream, &mut pending, &chunk[..n]),
724 Err(_) => break,
725 }
726 }
727
728 if !pending.is_empty() {
729 emit_line(&shared, visible_stream, std::mem::take(&mut pending));
730 }
731
732 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
733 match source_stream {
734 StreamKind::Stdout => guard.stdout_closed = true,
735 StreamKind::Stderr => guard.stderr_closed = true,
736 }
737 shared.condvar.notify_all();
738 });
739 }
740
741 fn set_returncode(&self, code: i32) {
742 self.shared.returncode.store(code as i64, Ordering::Release);
743 self.shared.condvar.notify_all();
744 }
745
746 fn wait_for_capture_completion_impl(&self) {
747 crate::rp_rust_debug_scope!(
748 "running_process_core::NativeProcess::wait_for_capture_completion"
749 );
750 if !self.config.capture {
751 return;
752 }
753
754 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
755 while !(guard.stdout_closed && guard.stderr_closed) {
756 guard = self
757 .shared
758 .condvar
759 .wait(guard)
760 .expect("queue mutex poisoned");
761 }
762 }
763}
764
765#[cfg(unix)]
766pub fn unix_set_priority(pid: u32, nice: i32) -> Result<(), std::io::Error> {
767 let result = unsafe { libc::setpriority(libc::PRIO_PROCESS, pid, nice) };
768 if result == -1 {
769 return Err(std::io::Error::last_os_error());
770 }
771 Ok(())
772}
773
774#[cfg(unix)]
775pub fn unix_signal_process(pid: u32, signal: UnixSignal) -> Result<(), std::io::Error> {
776 let result = unsafe { libc::kill(pid as i32, unix_signal_raw(signal)) };
777 if result == -1 {
778 return Err(std::io::Error::last_os_error());
779 }
780 Ok(())
781}
782
783#[cfg(unix)]
784pub fn unix_signal_process_group(pid: i32, signal: UnixSignal) -> Result<(), std::io::Error> {
785 let result = unsafe { libc::killpg(pid, unix_signal_raw(signal)) };
786 if result == -1 {
787 return Err(std::io::Error::last_os_error());
788 }
789 Ok(())
790}
791
792fn log_spawned_child_pid(pid: u32) -> Result<(), std::io::Error> {
793 let Some(path) = std::env::var_os(CHILD_PID_LOG_PATH_ENV) else {
794 return Ok(());
795 };
796
797 let mut file = OpenOptions::new().create(true).append(true).open(path)?;
798 file.write_all(format!("{pid}\n").as_bytes())?;
799 file.flush()?;
800 Ok(())
801}
802
/// Creates a job object configured with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`
/// and assigns `child` to it.
///
/// On success the job handle is returned wrapped in [`WindowsJobHandle`], which
/// closes it on drop. On any failure the job handle is closed here and the OS
/// error is returned.
#[cfg(windows)]
fn assign_child_to_windows_kill_on_close_job_impl(
    child: &Child,
) -> Result<WindowsJobHandle, std::io::Error> {
    crate::rp_rust_debug_scope!("running_process_core::assign_child_to_windows_kill_on_close_job");
    use std::mem::zeroed;
    use std::os::windows::io::AsRawHandle;

    use winapi::shared::minwindef::FALSE;
    use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE};
    use winapi::um::jobapi2::{
        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
    };
    use winapi::um::winnt::{
        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
    };

    let handle = child.as_raw_handle();
    // Anonymous job object with default security attributes.
    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
    if job.is_null() || job == INVALID_HANDLE_VALUE {
        return Err(std::io::Error::last_os_error());
    }

    // Start from an all-zero limit structure and set only the kill-on-close flag.
    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    let ok = unsafe {
        SetInformationJobObject(
            job,
            JobObjectExtendedLimitInformation,
            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
        )
    };
    if ok == FALSE {
        // Capture the error before CloseHandle can overwrite it.
        let err = std::io::Error::last_os_error();
        unsafe { CloseHandle(job) };
        return Err(err);
    }

    let ok = unsafe { AssignProcessToJobObject(job, handle.cast()) };
    if ok == FALSE {
        // Capture the error before CloseHandle can overwrite it.
        let err = std::io::Error::last_os_error();
        unsafe { CloseHandle(job) };
        return Err(err);
    }

    Ok(WindowsJobHandle(job as usize))
}
852
853fn feed_chunk(shared: &Arc<SharedState>, stream: StreamKind, pending: &mut Vec<u8>, chunk: &[u8]) {
854 let mut start = 0;
855 let mut index = 0;
856
857 while index < chunk.len() {
858 if chunk[index] == b'\n' {
859 let end = if index > start && chunk[index - 1] == b'\r' {
860 index - 1
861 } else {
862 index
863 };
864 pending.extend_from_slice(&chunk[start..end]);
865 if !pending.is_empty() {
866 emit_line(shared, stream, std::mem::take(pending));
867 }
868 start = index + 1;
869 }
870 index += 1;
871 }
872
873 pending.extend_from_slice(&chunk[start..]);
874}
875
876fn emit_line(shared: &Arc<SharedState>, stream: StreamKind, line: Vec<u8>) {
877 let event = StreamEvent { stream, line };
878 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
879 match event.stream {
880 StreamKind::Stdout => {
881 guard.stdout_history_bytes += event.line.len();
882 guard.stdout_history.push_back(event.line.clone());
883 guard.stdout_queue.push_back(event.line.clone());
884 }
885 StreamKind::Stderr => {
886 guard.stderr_history_bytes += event.line.len();
887 guard.stderr_history.push_back(event.line.clone());
888 guard.stderr_queue.push_back(event.line.clone());
889 }
890 }
891 guard.combined_history_bytes += event.line.len();
892 guard.combined_history.push_back(event.clone());
893 guard.combined_queue.push_back(event);
894 shared.condvar.notify_all();
895}
896
/// Wraps `command` in an invocation of the platform shell.
///
/// On Windows this is `cmd /D /S /C "<command>"`, with the command passed as a
/// raw, unescaped argument; elsewhere it is `sh -lc <command>`.
fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    let shell = {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        // The surrounding quotes are added here and consumed by `/S`.
        shell.raw_arg("/D /S /C \"");
        shell.raw_arg(command);
        shell.raw_arg("\"");
        shell
    };
    #[cfg(not(windows))]
    let shell = {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    };
    shell
}
915
/// Converts an `ExitStatus` into a single integer code.
///
/// A normal exit yields the process's exit code. On Unix, termination by a
/// signal yields the negated signal number (or `-1` if no signal is reported).
/// On other platforms a missing code yields `1`.
fn exit_code(status: std::process::ExitStatus) -> i32 {
    #[cfg(unix)]
    let code = {
        use std::os::unix::process::ExitStatusExt;
        match status.code() {
            Some(code) => code,
            None => -status.signal().unwrap_or(1),
        }
    };
    #[cfg(not(unix))]
    let code = status.code().unwrap_or(1);
    code
}
929
930#[cfg(unix)]
931fn unix_signal_raw(signal: UnixSignal) -> i32 {
932 match signal {
933 UnixSignal::Interrupt => libc::SIGINT,
934 UnixSignal::Terminate => libc::SIGTERM,
935 UnixSignal::Kill => libc::SIGKILL,
936 }
937}
938
/// Maps a Unix-style niceness to a Windows priority-class creation flag.
///
/// * `nice >= 15` → idle
/// * `1..=14` → below normal
/// * `nice <= -15` → high
/// * `-14..=-1` → above normal
/// * `0` or `None` → `0` (no priority flag)
#[cfg(windows)]
fn windows_priority_flags(nice: Option<i32>) -> u32 {
    const IDLE_PRIORITY_CLASS: u32 = 0x0000_0040;
    const BELOW_NORMAL_PRIORITY_CLASS: u32 = 0x0000_4000;
    const ABOVE_NORMAL_PRIORITY_CLASS: u32 = 0x0000_8000;
    const HIGH_PRIORITY_CLASS: u32 = 0x0000_0080;

    let Some(level) = nice else {
        return 0;
    };
    match level {
        15..=i32::MAX => IDLE_PRIORITY_CLASS,
        1..=14 => BELOW_NORMAL_PRIORITY_CLASS,
        i32::MIN..=-15 => HIGH_PRIORITY_CLASS,
        -14..=-1 => ABOVE_NORMAL_PRIORITY_CLASS,
        0 => 0,
    }
}
954#[cfg(test)]
955mod tests {
956 use super::*;
957
958 #[test]
961 fn stream_kind_as_str_stdout() {
962 assert_eq!(StreamKind::Stdout.as_str(), "stdout");
963 }
964
965 #[test]
966 fn stream_kind_as_str_stderr() {
967 assert_eq!(StreamKind::Stderr.as_str(), "stderr");
968 }
969
970 #[test]
971 fn stream_kind_equality() {
972 assert_eq!(StreamKind::Stdout, StreamKind::Stdout);
973 assert_ne!(StreamKind::Stdout, StreamKind::Stderr);
974 }
975
976 #[test]
979 fn stream_event_clone() {
980 let event = StreamEvent {
981 stream: StreamKind::Stdout,
982 line: b"hello".to_vec(),
983 };
984 let cloned = event.clone();
985 assert_eq!(event, cloned);
986 }
987
988 #[test]
991 fn read_status_line_variant() {
992 let status: ReadStatus<Vec<u8>> = ReadStatus::Line(b"data".to_vec());
993 assert!(matches!(status, ReadStatus::Line(ref v) if v == b"data"));
994 }
995
996 #[test]
997 fn read_status_timeout_variant() {
998 let status: ReadStatus<Vec<u8>> = ReadStatus::Timeout;
999 assert!(matches!(status, ReadStatus::Timeout));
1000 }
1001
1002 #[test]
1003 fn read_status_eof_variant() {
1004 let status: ReadStatus<Vec<u8>> = ReadStatus::Eof;
1005 assert!(matches!(status, ReadStatus::Eof));
1006 }
1007
1008 #[test]
1011 fn process_error_display_already_started() {
1012 assert_eq!(
1013 ProcessError::AlreadyStarted.to_string(),
1014 "process already started"
1015 );
1016 }
1017
1018 #[test]
1019 fn process_error_display_not_running() {
1020 assert_eq!(
1021 ProcessError::NotRunning.to_string(),
1022 "process is not running"
1023 );
1024 }
1025
1026 #[test]
1027 fn process_error_display_stdin_unavailable() {
1028 assert_eq!(
1029 ProcessError::StdinUnavailable.to_string(),
1030 "process stdin is not available"
1031 );
1032 }
1033
1034 #[test]
1035 fn process_error_display_timeout() {
1036 assert_eq!(ProcessError::Timeout.to_string(), "process timed out");
1037 }
1038
1039 #[test]
1040 fn process_error_display_spawn() {
1041 let err = ProcessError::Spawn(std::io::Error::new(
1042 std::io::ErrorKind::NotFound,
1043 "not found",
1044 ));
1045 assert!(err.to_string().contains("not found"));
1046 }
1047
1048 #[test]
1049 fn process_error_display_io() {
1050 let err = ProcessError::Io(std::io::Error::new(
1051 std::io::ErrorKind::BrokenPipe,
1052 "broken",
1053 ));
1054 assert!(err.to_string().contains("broken"));
1055 }
1056
1057 #[test]
1060 fn command_spec_shell_variant() {
1061 let spec = CommandSpec::Shell("echo hello".to_string());
1062 assert!(matches!(spec, CommandSpec::Shell(ref s) if s == "echo hello"));
1063 }
1064
1065 #[test]
1066 fn command_spec_argv_variant() {
1067 let spec = CommandSpec::Argv(vec!["echo".to_string(), "hello".to_string()]);
1068 assert!(matches!(spec, CommandSpec::Argv(ref v) if v.len() == 2));
1069 }
1070
1071 #[test]
1074 fn stdin_mode_equality() {
1075 assert_eq!(StdinMode::Inherit, StdinMode::Inherit);
1076 assert_ne!(StdinMode::Piped, StdinMode::Null);
1077 }
1078
1079 #[test]
1080 fn stderr_mode_equality() {
1081 assert_eq!(StderrMode::Stdout, StderrMode::Stdout);
1082 assert_ne!(StderrMode::Stdout, StderrMode::Pipe);
1083 }
1084
1085 #[test]
1088 fn shared_state_new_with_capture() {
1089 let state = SharedState::new(true);
1090 let queues = state.queues.lock().unwrap();
1091 assert!(!queues.stdout_closed);
1092 assert!(!queues.stderr_closed);
1093 assert!(queues.stdout_queue.is_empty());
1094 assert!(queues.stderr_queue.is_empty());
1095 }
1096
1097 #[test]
1098 fn shared_state_new_without_capture() {
1099 let state = SharedState::new(false);
1100 let queues = state.queues.lock().unwrap();
1101 assert!(queues.stdout_closed);
1102 assert!(queues.stderr_closed);
1103 }
1104
1105 #[test]
1106 fn shared_state_returncode_initially_not_set() {
1107 let state = SharedState::new(true);
1108 let code = state.returncode.load(Ordering::Acquire);
1109 assert_eq!(code, RETURNCODE_NOT_SET);
1110 }
1111
1112 #[test]
1115 fn feed_chunk_single_line_with_newline() {
1116 let shared = Arc::new(SharedState::new(true));
1117 let mut pending = Vec::new();
1118 feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"hello\n");
1119 let queues = shared.queues.lock().unwrap();
1120 assert_eq!(queues.stdout_queue.len(), 1);
1121 assert_eq!(queues.stdout_queue[0], b"hello");
1122 assert!(pending.is_empty());
1123 }
1124
1125 #[test]
1126 fn feed_chunk_crlf_stripping() {
1127 let shared = Arc::new(SharedState::new(true));
1128 let mut pending = Vec::new();
1129 feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"hello\r\n");
1130 let queues = shared.queues.lock().unwrap();
1131 assert_eq!(queues.stdout_queue.len(), 1);
1132 assert_eq!(queues.stdout_queue[0], b"hello");
1133 }
1134
1135 #[test]
1136 fn feed_chunk_multiple_lines() {
1137 let shared = Arc::new(SharedState::new(true));
1138 let mut pending = Vec::new();
1139 feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"a\nb\nc\n");
1140 let queues = shared.queues.lock().unwrap();
1141 assert_eq!(queues.stdout_queue.len(), 3);
1142 assert_eq!(queues.stdout_queue[0], b"a");
1143 assert_eq!(queues.stdout_queue[1], b"b");
1144 assert_eq!(queues.stdout_queue[2], b"c");
1145 }
1146
1147 #[test]
1148 fn feed_chunk_no_newline_stays_pending() {
1149 let shared = Arc::new(SharedState::new(true));
1150 let mut pending = Vec::new();
1151 feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"partial");
1152 let queues = shared.queues.lock().unwrap();
1153 assert!(queues.stdout_queue.is_empty());
1154 assert_eq!(pending, b"partial");
1155 }
1156
1157 #[test]
1158 fn feed_chunk_accumulates_pending() {
1159 let shared = Arc::new(SharedState::new(true));
1160 let mut pending = Vec::new();
1161 feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"hel");
1162 feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"lo\n");
1163 let queues = shared.queues.lock().unwrap();
1164 assert_eq!(queues.stdout_queue.len(), 1);
1165 assert_eq!(queues.stdout_queue[0], b"hello");
1166 assert!(pending.is_empty());
1167 }
1168
1169 #[test]
1170 fn feed_chunk_empty_line_not_emitted() {
1171 let shared = Arc::new(SharedState::new(true));
1172 let mut pending = Vec::new();
1173 feed_chunk(&shared, StreamKind::Stdout, &mut pending, b"\n");
1174 let queues = shared.queues.lock().unwrap();
1175 assert!(queues.stdout_queue.is_empty());
1176 }
1177
1178 #[test]
1179 fn feed_chunk_stderr_goes_to_stderr_queue() {
1180 let shared = Arc::new(SharedState::new(true));
1181 let mut pending = Vec::new();
1182 feed_chunk(&shared, StreamKind::Stderr, &mut pending, b"error\n");
1183 let queues = shared.queues.lock().unwrap();
1184 assert!(queues.stdout_queue.is_empty());
1185 assert_eq!(queues.stderr_queue.len(), 1);
1186 assert_eq!(queues.stderr_queue[0], b"error");
1187 }
1188
1189 #[test]
1192 fn emit_line_updates_all_queues_and_history() {
1193 let shared = Arc::new(SharedState::new(true));
1194 emit_line(&shared, StreamKind::Stdout, b"test".to_vec());
1195 let queues = shared.queues.lock().unwrap();
1196 assert_eq!(queues.stdout_queue.len(), 1);
1197 assert_eq!(queues.stdout_history.len(), 1);
1198 assert_eq!(queues.stdout_history_bytes, 4);
1199 assert_eq!(queues.combined_queue.len(), 1);
1200 assert_eq!(queues.combined_history.len(), 1);
1201 assert_eq!(queues.combined_history_bytes, 4);
1202 }
1203
1204 #[test]
1205 fn emit_line_stderr_updates_stderr_queues() {
1206 let shared = Arc::new(SharedState::new(true));
1207 emit_line(&shared, StreamKind::Stderr, b"err".to_vec());
1208 let queues = shared.queues.lock().unwrap();
1209 assert_eq!(queues.stderr_queue.len(), 1);
1210 assert_eq!(queues.stderr_history.len(), 1);
1211 assert_eq!(queues.stderr_history_bytes, 3);
1212 assert_eq!(queues.combined_queue.len(), 1);
1213 assert_eq!(queues.combined_history_bytes, 3);
1214 }
1215
1216 #[test]
1219 fn native_process_returncode_none_before_start() {
1220 let process = NativeProcess::new(ProcessConfig {
1221 command: CommandSpec::Argv(vec!["echo".into()]),
1222 cwd: None,
1223 env: None,
1224 capture: false,
1225 stderr_mode: StderrMode::Stdout,
1226 creationflags: None,
1227 create_process_group: false,
1228 stdin_mode: StdinMode::Inherit,
1229 nice: None,
1230 containment: None,
1231 });
1232 assert!(process.returncode().is_none());
1233 }
1234
1235 #[test]
1236 fn native_process_pid_none_before_start() {
1237 let process = NativeProcess::new(ProcessConfig {
1238 command: CommandSpec::Argv(vec!["echo".into()]),
1239 cwd: None,
1240 env: None,
1241 capture: false,
1242 stderr_mode: StderrMode::Stdout,
1243 creationflags: None,
1244 create_process_group: false,
1245 stdin_mode: StdinMode::Inherit,
1246 nice: None,
1247 containment: None,
1248 });
1249 assert!(process.pid().is_none());
1250 }
1251
1252 #[test]
1253 fn native_process_has_pending_false_when_no_capture() {
1254 let process = NativeProcess::new(ProcessConfig {
1255 command: CommandSpec::Argv(vec!["echo".into()]),
1256 cwd: None,
1257 env: None,
1258 capture: false,
1259 stderr_mode: StderrMode::Stdout,
1260 creationflags: None,
1261 create_process_group: false,
1262 stdin_mode: StdinMode::Inherit,
1263 nice: None,
1264 containment: None,
1265 });
1266 assert!(!process.has_pending_stream(StreamKind::Stdout));
1267 assert!(!process.has_pending_combined());
1268 }
1269
1270 #[test]
1271 fn native_process_drain_empty_when_no_capture() {
1272 let process = NativeProcess::new(ProcessConfig {
1273 command: CommandSpec::Argv(vec!["echo".into()]),
1274 cwd: None,
1275 env: None,
1276 capture: false,
1277 stderr_mode: StderrMode::Stdout,
1278 creationflags: None,
1279 create_process_group: false,
1280 stdin_mode: StdinMode::Inherit,
1281 nice: None,
1282 containment: None,
1283 });
1284 assert!(process.drain_stream(StreamKind::Stdout).is_empty());
1285 assert!(process.drain_combined().is_empty());
1286 }
1287
1288 #[test]
1289 fn native_process_stderr_not_pending_when_merged() {
1290 let process = NativeProcess::new(ProcessConfig {
1291 command: CommandSpec::Argv(vec!["echo".into()]),
1292 cwd: None,
1293 env: None,
1294 capture: true,
1295 stderr_mode: StderrMode::Stdout,
1296 creationflags: None,
1297 create_process_group: false,
1298 stdin_mode: StdinMode::Inherit,
1299 nice: None,
1300 containment: None,
1301 });
1302 assert!(!process.has_pending_stream(StreamKind::Stderr));
1303 }
1304
1305 #[test]
1306 fn native_process_drain_stderr_empty_when_merged() {
1307 let process = NativeProcess::new(ProcessConfig {
1308 command: CommandSpec::Argv(vec!["echo".into()]),
1309 cwd: None,
1310 env: None,
1311 capture: true,
1312 stderr_mode: StderrMode::Stdout,
1313 creationflags: None,
1314 create_process_group: false,
1315 stdin_mode: StdinMode::Inherit,
1316 nice: None,
1317 containment: None,
1318 });
1319 assert!(process.drain_stream(StreamKind::Stderr).is_empty());
1320 }
1321
1322 #[test]
1323 fn native_process_captured_stderr_empty_when_merged() {
1324 let process = NativeProcess::new(ProcessConfig {
1325 command: CommandSpec::Argv(vec!["echo".into()]),
1326 cwd: None,
1327 env: None,
1328 capture: true,
1329 stderr_mode: StderrMode::Stdout,
1330 creationflags: None,
1331 create_process_group: false,
1332 stdin_mode: StdinMode::Inherit,
1333 nice: None,
1334 containment: None,
1335 });
1336 assert!(process.captured_stderr().is_empty());
1337 }
1338
1339 #[test]
1340 fn native_process_captured_stream_bytes_zero_when_merged_stderr() {
1341 let process = NativeProcess::new(ProcessConfig {
1342 command: CommandSpec::Argv(vec!["echo".into()]),
1343 cwd: None,
1344 env: None,
1345 capture: true,
1346 stderr_mode: StderrMode::Stdout,
1347 creationflags: None,
1348 create_process_group: false,
1349 stdin_mode: StdinMode::Inherit,
1350 nice: None,
1351 containment: None,
1352 });
1353 assert_eq!(process.captured_stream_bytes(StreamKind::Stderr), 0);
1354 }
1355
1356 #[test]
1357 fn native_process_clear_captured_stderr_zero_when_merged() {
1358 let process = NativeProcess::new(ProcessConfig {
1359 command: CommandSpec::Argv(vec!["echo".into()]),
1360 cwd: None,
1361 env: None,
1362 capture: true,
1363 stderr_mode: StderrMode::Stdout,
1364 creationflags: None,
1365 create_process_group: false,
1366 stdin_mode: StdinMode::Inherit,
1367 nice: None,
1368 containment: None,
1369 });
1370 assert_eq!(process.clear_captured_stream(StreamKind::Stderr), 0);
1371 }
1372
1373 #[test]
1374 fn native_process_read_stream_eof_when_stderr_merged() {
1375 let process = NativeProcess::new(ProcessConfig {
1376 command: CommandSpec::Argv(vec!["echo".into()]),
1377 cwd: None,
1378 env: None,
1379 capture: true,
1380 stderr_mode: StderrMode::Stdout,
1381 creationflags: None,
1382 create_process_group: false,
1383 stdin_mode: StdinMode::Inherit,
1384 nice: None,
1385 containment: None,
1386 });
1387 assert_eq!(
1388 process.read_stream(StreamKind::Stderr, Some(Duration::from_millis(10))),
1389 ReadStatus::Eof
1390 );
1391 }
1392
1393 #[test]
1396 fn log_spawned_child_pid_noop_without_env() {
1397 std::env::remove_var("RUNNING_PROCESS_CHILD_PID_LOG_PATH");
1398 assert!(log_spawned_child_pid(12345).is_ok());
1399 }
1400
1401 #[test]
1404 fn shell_command_creates_command() {
1405 let cmd = shell_command("echo test");
1406 let _ = format!("{:?}", cmd);
1407 }
1408
1409 #[test]
1412 fn exit_code_from_success() {
1413 let output = std::process::Command::new("python")
1414 .args(["-c", "pass"])
1415 .output()
1416 .unwrap();
1417 assert_eq!(exit_code(output.status), 0);
1418 }
1419
1420 #[test]
1421 fn exit_code_from_nonzero() {
1422 let output = std::process::Command::new("python")
1423 .args(["-c", "import sys; sys.exit(42)"])
1424 .output()
1425 .unwrap();
1426 assert_eq!(exit_code(output.status), 42);
1427 }
1428
1429 #[cfg(windows)]
1432 mod windows_tests {
1433 use super::*;
1434
1435 const IDLE_PRIORITY_CLASS: u32 = 0x0000_0040;
1436 const BELOW_NORMAL_PRIORITY_CLASS: u32 = 0x0000_4000;
1437 const ABOVE_NORMAL_PRIORITY_CLASS: u32 = 0x0000_8000;
1438 const HIGH_PRIORITY_CLASS: u32 = 0x0000_0080;
1439
1440 #[test]
1441 fn priority_flags_none() {
1442 assert_eq!(windows_priority_flags(None), 0);
1443 }
1444
1445 #[test]
1446 fn priority_flags_zero() {
1447 assert_eq!(windows_priority_flags(Some(0)), 0);
1448 }
1449
1450 #[test]
1451 fn priority_flags_high_nice_idle() {
1452 assert_eq!(windows_priority_flags(Some(15)), IDLE_PRIORITY_CLASS);
1453 assert_eq!(windows_priority_flags(Some(20)), IDLE_PRIORITY_CLASS);
1454 }
1455
1456 #[test]
1457 fn priority_flags_low_positive_below_normal() {
1458 assert_eq!(windows_priority_flags(Some(1)), BELOW_NORMAL_PRIORITY_CLASS);
1459 assert_eq!(
1460 windows_priority_flags(Some(14)),
1461 BELOW_NORMAL_PRIORITY_CLASS
1462 );
1463 }
1464
1465 #[test]
1466 fn priority_flags_negative_above_normal() {
1467 assert_eq!(
1468 windows_priority_flags(Some(-1)),
1469 ABOVE_NORMAL_PRIORITY_CLASS
1470 );
1471 assert_eq!(
1472 windows_priority_flags(Some(-14)),
1473 ABOVE_NORMAL_PRIORITY_CLASS
1474 );
1475 }
1476
1477 #[test]
1478 fn priority_flags_very_negative_high() {
1479 assert_eq!(windows_priority_flags(Some(-15)), HIGH_PRIORITY_CLASS);
1480 assert_eq!(windows_priority_flags(Some(-20)), HIGH_PRIORITY_CLASS);
1481 }
1482 }
1483
1484 #[test]
1487 fn process_config_clone() {
1488 let config = ProcessConfig {
1489 command: CommandSpec::Shell("echo".to_string()),
1490 cwd: Some("/tmp".into()),
1491 env: Some(vec![("KEY".to_string(), "VAL".to_string())]),
1492 capture: true,
1493 stderr_mode: StderrMode::Pipe,
1494 creationflags: Some(0x10),
1495 create_process_group: true,
1496 stdin_mode: StdinMode::Piped,
1497 nice: Some(5),
1498 containment: None,
1499 };
1500 let cloned = config.clone();
1501 assert!(cloned.capture);
1502 assert_eq!(cloned.nice, Some(5));
1503 }
1504
1505 #[test]
1508 fn render_rust_debug_traces_returns_string() {
1509 let result = render_rust_debug_traces();
1510 let _ = result.len();
1511 }
1512
1513 #[test]
1516 fn rust_debug_scope_guard_enters_and_drops() {
1517 let _guard = RustDebugScopeGuard::enter("test_scope", file!(), line!());
1518 let traces = render_rust_debug_traces();
1519 assert!(traces.contains("test_scope"));
1520 drop(_guard);
1521 }
1522
1523 #[cfg(unix)]
1526 mod unix_tests {
1527 use super::*;
1528
1529 #[test]
1530 fn unix_signal_raw_values() {
1531 assert_eq!(unix_signal_raw(UnixSignal::Interrupt), libc::SIGINT);
1532 assert_eq!(unix_signal_raw(UnixSignal::Terminate), libc::SIGTERM);
1533 assert_eq!(unix_signal_raw(UnixSignal::Kill), libc::SIGKILL);
1534 }
1535
1536 #[test]
1537 fn unix_signal_enum_equality() {
1538 assert_eq!(UnixSignal::Interrupt, UnixSignal::Interrupt);
1539 assert_ne!(UnixSignal::Interrupt, UnixSignal::Kill);
1540 }
1541 }
1542
1543 #[test]
1546 fn wait_for_capture_completion_noop_without_capture() {
1547 let process = NativeProcess::new(ProcessConfig {
1548 command: CommandSpec::Argv(vec!["echo".into()]),
1549 cwd: None,
1550 env: None,
1551 capture: false,
1552 stderr_mode: StderrMode::Stdout,
1553 creationflags: None,
1554 create_process_group: false,
1555 stdin_mode: StdinMode::Inherit,
1556 nice: None,
1557 containment: None,
1558 });
1559 process.wait_for_capture_completion_impl();
1560 }
1561
1562 #[test]
1565 fn build_command_from_argv() {
1566 let process = NativeProcess::new(ProcessConfig {
1567 command: CommandSpec::Argv(vec!["echo".into(), "hello".into(), "world".into()]),
1568 cwd: None,
1569 env: None,
1570 capture: false,
1571 stderr_mode: StderrMode::Stdout,
1572 creationflags: None,
1573 create_process_group: false,
1574 stdin_mode: StdinMode::Inherit,
1575 nice: None,
1576 containment: None,
1577 });
1578 let cmd = process.build_command();
1579 assert_eq!(cmd.get_program(), "echo");
1580 let args: Vec<_> = cmd.get_args().collect();
1581 assert_eq!(args, vec!["hello", "world"]);
1582 }
1583
1584 #[test]
1585 fn build_command_from_shell() {
1586 let process = NativeProcess::new(ProcessConfig {
1587 command: CommandSpec::Shell("echo test".into()),
1588 cwd: None,
1589 env: None,
1590 capture: false,
1591 stderr_mode: StderrMode::Stdout,
1592 creationflags: None,
1593 create_process_group: false,
1594 stdin_mode: StdinMode::Inherit,
1595 nice: None,
1596 containment: None,
1597 });
1598 let cmd = process.build_command();
1599 let program = cmd.get_program().to_string_lossy().to_string();
1601 #[cfg(windows)]
1602 assert!(
1603 program.contains("cmd"),
1604 "expected cmd shell, got {}",
1605 program
1606 );
1607 #[cfg(not(windows))]
1608 assert!(program.contains("sh"), "expected sh shell, got {}", program);
1609 }
1610
1611 #[test]
1612 fn build_command_with_cwd() {
1613 let tmp = std::env::temp_dir();
1614 let process = NativeProcess::new(ProcessConfig {
1615 command: CommandSpec::Argv(vec!["echo".into()]),
1616 cwd: Some(tmp.clone()),
1617 env: None,
1618 capture: false,
1619 stderr_mode: StderrMode::Stdout,
1620 creationflags: None,
1621 create_process_group: false,
1622 stdin_mode: StdinMode::Inherit,
1623 nice: None,
1624 containment: None,
1625 });
1626 let cmd = process.build_command();
1627 assert_eq!(cmd.get_current_dir().unwrap(), &tmp);
1628 }
1629
1630 #[test]
1631 fn build_command_with_env() {
1632 let process = NativeProcess::new(ProcessConfig {
1633 command: CommandSpec::Argv(vec!["echo".into()]),
1634 cwd: None,
1635 env: Some(vec![
1636 ("FOO".into(), "bar".into()),
1637 ("BAZ".into(), "qux".into()),
1638 ]),
1639 capture: false,
1640 stderr_mode: StderrMode::Stdout,
1641 creationflags: None,
1642 create_process_group: false,
1643 stdin_mode: StdinMode::Inherit,
1644 nice: None,
1645 containment: None,
1646 });
1647 let cmd = process.build_command();
1648 let envs: Vec<_> = cmd.get_envs().collect();
1649 assert!(envs
1650 .iter()
1651 .any(|(k, v)| *k == "FOO" && *v == Some(std::ffi::OsStr::new("bar"))));
1652 assert!(envs
1653 .iter()
1654 .any(|(k, v)| *k == "BAZ" && *v == Some(std::ffi::OsStr::new("qux"))));
1655 }
1656
1657 #[test]
1658 fn build_command_single_argv() {
1659 let process = NativeProcess::new(ProcessConfig {
1660 command: CommandSpec::Argv(vec!["echo".into()]),
1661 cwd: None,
1662 env: None,
1663 capture: false,
1664 stderr_mode: StderrMode::Stdout,
1665 creationflags: None,
1666 create_process_group: false,
1667 stdin_mode: StdinMode::Inherit,
1668 nice: None,
1669 containment: None,
1670 });
1671 let cmd = process.build_command();
1672 assert_eq!(cmd.get_program(), "echo");
1673 assert_eq!(cmd.get_args().count(), 0);
1674 }
1675
1676 #[test]
1679 fn set_returncode_updates_shared_state() {
1680 let process = NativeProcess::new(ProcessConfig {
1681 command: CommandSpec::Argv(vec!["echo".into()]),
1682 cwd: None,
1683 env: None,
1684 capture: false,
1685 stderr_mode: StderrMode::Stdout,
1686 creationflags: None,
1687 create_process_group: false,
1688 stdin_mode: StdinMode::Inherit,
1689 nice: None,
1690 containment: None,
1691 });
1692 assert!(process.returncode().is_none());
1693 process.set_returncode(42);
1694 assert_eq!(process.returncode(), Some(42));
1695 }
1696
1697 #[test]
1698 fn set_returncode_overwrites() {
1699 let process = NativeProcess::new(ProcessConfig {
1700 command: CommandSpec::Argv(vec!["echo".into()]),
1701 cwd: None,
1702 env: None,
1703 capture: false,
1704 stderr_mode: StderrMode::Stdout,
1705 creationflags: None,
1706 create_process_group: false,
1707 stdin_mode: StdinMode::Inherit,
1708 nice: None,
1709 containment: None,
1710 });
1711 process.set_returncode(1);
1712 process.set_returncode(2);
1713 assert_eq!(process.returncode(), Some(2));
1714 }
1715
1716 #[test]
1719 fn shared_state_with_capture_queues_open() {
1720 let state = SharedState::new(true);
1721 let guard = state.queues.lock().unwrap();
1722 assert!(!guard.stdout_closed);
1723 assert!(!guard.stderr_closed);
1724 }
1725
1726 #[test]
1727 fn shared_state_without_capture_queues_closed() {
1728 let state = SharedState::new(false);
1729 let guard = state.queues.lock().unwrap();
1730 assert!(guard.stdout_closed);
1731 assert!(guard.stderr_closed);
1732 }
1733
1734 #[test]
1737 fn process_error_display_io_variant() {
1738 let err = ProcessError::Io(std::io::Error::new(
1739 std::io::ErrorKind::BrokenPipe,
1740 "pipe broken",
1741 ));
1742 let msg = format!("{}", err);
1743 assert!(msg.contains("pipe broken"));
1744 }
1745
1746 #[test]
1747 fn process_error_display_spawn_variant() {
1748 let err = ProcessError::Spawn(std::io::Error::new(
1749 std::io::ErrorKind::NotFound,
1750 "not found",
1751 ));
1752 let msg = format!("{}", err);
1753 assert!(msg.contains("not found"));
1754 }
1755
1756 #[test]
1759 fn shell_command_returns_command_with_shell() {
1760 let cmd = shell_command("echo test");
1761 let program = cmd.get_program().to_string_lossy().to_string();
1762 #[cfg(windows)]
1763 assert!(program.contains("cmd"));
1764 #[cfg(not(windows))]
1765 assert!(program.contains("sh"));
1766 }
1767}