1use std::collections::VecDeque;
2use std::io::Read;
3use std::process::{Child, Command, Stdio};
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9pub mod console_detect;
10pub mod containment;
11mod helpers;
12#[cfg(feature = "originator-scan")]
13pub mod originator;
14#[cfg(feature = "client")]
19pub mod proto {
20 pub mod daemon {
21 include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
22 }
23}
24
25#[cfg(feature = "client")]
26pub mod client;
27
28#[cfg(feature = "daemon")]
31pub mod daemon;
32pub mod pty;
33mod public_symbols;
34mod rust_debug;
35pub mod spawn;
36mod types;
37#[cfg(unix)]
38mod unix;
39#[cfg(windows)]
40mod windows;
41
42pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
43pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
44#[cfg(feature = "originator-scan")]
45pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
46pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
47pub use spawn::{
48 spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
49 StdioSource,
50};
51pub use types::{
52 CommandSpec, ProcessConfig, ProcessError, ReadStatus, StderrMode, StdinMode, StreamEvent,
53 StreamKind,
54};
55
56pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
57#[cfg(unix)]
58pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
59#[cfg(windows)]
60pub(crate) use windows::{
61 assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
62 WindowsJobHandle,
63};
64
65#[macro_export]
66macro_rules! rp_rust_debug_scope {
67 ($label:expr) => {
68 let _running_process_rust_debug_scope =
69 $crate::RustDebugScopeGuard::enter($label, file!(), line!());
70 };
71}
72
73#[derive(Default)]
74struct QueueState {
75 stdout_queue: VecDeque<Vec<u8>>,
76 stderr_queue: VecDeque<Vec<u8>>,
77 combined_queue: VecDeque<StreamEvent>,
78 stdout_history: VecDeque<Vec<u8>>,
79 stderr_history: VecDeque<Vec<u8>>,
80 combined_history: VecDeque<StreamEvent>,
81 stdout_history_bytes: usize,
82 stderr_history_bytes: usize,
83 combined_history_bytes: usize,
84 stdout_closed: bool,
85 stderr_closed: bool,
86}
87
88const RETURNCODE_NOT_SET: i64 = i64::MIN;
90
91struct SharedState {
92 queues: Mutex<QueueState>,
93 condvar: Condvar,
94 returncode: AtomicI64,
97}
98
99struct ChildState {
100 child: Child,
101 #[cfg(windows)]
102 _job: WindowsJobHandle,
103}
104
105impl SharedState {
106 fn new(capture: bool) -> Self {
107 let queues = QueueState {
108 stdout_closed: !capture,
109 stderr_closed: !capture,
110 ..QueueState::default()
111 };
112 Self {
113 queues: Mutex::new(queues),
114 condvar: Condvar::new(),
115 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
116 }
117 }
118}
119
120pub struct NativeProcess {
121 config: ProcessConfig,
122 child: Arc<Mutex<Option<ChildState>>>,
123 shared: Arc<SharedState>,
124 #[cfg(windows)]
125 capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
126}
127
128impl NativeProcess {
129 pub fn new(config: ProcessConfig) -> Self {
130 Self {
131 shared: Arc::new(SharedState::new(config.capture)),
132 child: Arc::new(Mutex::new(None)),
133 config,
134 #[cfg(windows)]
135 capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
136 }
137 }
138
139 #[inline(never)]
141 pub fn start(&self) -> Result<(), ProcessError> {
142 public_symbols::rp_native_process_start_public(self)
143 }
144
145 fn start_impl(&self) -> Result<(), ProcessError> {
146 crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
147 let mut guard = self.child.lock().expect("child mutex poisoned");
148 if guard.is_some() {
149 return Err(ProcessError::AlreadyStarted);
150 }
151
152 let mut command = self.build_command();
153 match self.config.stdin_mode {
154 StdinMode::Inherit => {}
155 StdinMode::Piped => {
156 command.stdin(Stdio::piped());
157 }
158 StdinMode::Null => {
159 command.stdin(Stdio::null());
160 }
161 }
162 if self.config.capture {
163 command.stdout(Stdio::piped());
164 command.stderr(Stdio::piped());
165 }
166
167 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
168 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
169 #[cfg(windows)]
170 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
171 .map_err(ProcessError::Spawn)?;
172 if self.config.capture {
173 let stdout = child.stdout.take().expect("stdout pipe missing");
174 let stderr = child.stderr.take().expect("stderr pipe missing");
175 #[cfg(windows)]
176 {
177 use std::os::windows::io::AsRawHandle;
178 let mut handles = self
179 .capture_pipe_handles
180 .lock()
181 .expect("capture pipe handles mutex poisoned");
182 handles.stdout = Some(stdout.as_raw_handle() as usize);
183 handles.stderr = Some(stderr.as_raw_handle() as usize);
184 }
185 self.spawn_reader(
186 stdout,
187 StreamKind::Stdout,
188 StreamKind::Stdout,
189 self.pipe_done_callback(StreamKind::Stdout),
190 );
191 self.spawn_reader(
192 stderr,
193 StreamKind::Stderr,
194 match self.config.stderr_mode {
195 StderrMode::Stdout => StreamKind::Stdout,
196 StderrMode::Pipe => StreamKind::Stderr,
197 },
198 self.pipe_done_callback(StreamKind::Stderr),
199 );
200 }
201 *guard = Some(ChildState {
202 child,
203 #[cfg(windows)]
204 _job: job,
205 });
206 drop(guard);
207 self.spawn_exit_waiter();
208 Ok(())
209 }
210
211 fn spawn_exit_waiter(&self) {
214 let child = Arc::clone(&self.child);
215 let shared = Arc::clone(&self.shared);
216 thread::spawn(move || loop {
217 if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
218 return;
219 }
220 {
221 let mut guard = child.lock().expect("child mutex poisoned");
222 if let Some(child_state) = guard.as_mut() {
223 match child_state.child.try_wait() {
224 Ok(Some(status)) => {
225 let code = exit_code(status);
226 shared.returncode.store(code as i64, Ordering::Release);
227 shared.condvar.notify_all();
228 return;
229 }
230 Ok(None) => {}
231 Err(_) => return,
232 }
233 } else {
234 return;
235 }
236 }
237 thread::sleep(Duration::from_millis(10));
243 });
244 }
245
246 pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
247 let mut guard = self.child.lock().expect("child mutex poisoned");
248 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
249 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
250 use std::io::Write;
251 stdin.write_all(data).map_err(ProcessError::Io)?;
252 stdin.flush().map_err(ProcessError::Io)?;
253 drop(child.stdin.take());
254 Ok(())
255 }
256
257 pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
262 let mut guard = self.child.lock().expect("child mutex poisoned");
263 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
264 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
265 use std::io::Write;
266 stdin.write_all(data).map_err(ProcessError::Io)?;
267 stdin.flush().map_err(ProcessError::Io)?;
268 Ok(())
269 }
270
271 pub fn close_stdin(&self) -> Result<(), ProcessError> {
274 let mut guard = self.child.lock().expect("child mutex poisoned");
275 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
276 drop(child.stdin.take());
277 Ok(())
278 }
279
280 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
281 if let Some(code) = self.returncode() {
283 return Ok(Some(code));
284 }
285 let mut guard = self.child.lock().expect("child mutex poisoned");
286 let Some(child_state) = guard.as_mut() else {
287 return Ok(self.returncode());
288 };
289 let child = &mut child_state.child;
290 let status = child.try_wait().map_err(ProcessError::Io)?;
291 if let Some(status) = status {
292 let code = exit_code(status);
293 self.set_returncode(code);
294 return Ok(Some(code));
295 }
296 Ok(None)
297 }
298
299 #[inline(never)]
301 pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
302 public_symbols::rp_native_process_wait_public(self, timeout)
303 }
304
305 fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
306 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
307 if self.child.lock().expect("child mutex poisoned").is_none() {
308 return self.returncode().ok_or(ProcessError::NotRunning);
309 }
310 if let Some(code) = self.returncode() {
312 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
313 return Ok(code);
314 }
315 let start = Instant::now();
316 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
317 loop {
318 let rc = self.shared.returncode.load(Ordering::Acquire);
320 if rc != RETURNCODE_NOT_SET {
321 drop(guard);
322 let code = rc as i32;
323 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
324 return Ok(code);
325 }
326 if let Some(limit) = timeout {
327 let elapsed = start.elapsed();
328 if elapsed >= limit {
329 return Err(ProcessError::Timeout);
330 }
331 let remaining = limit - elapsed;
332 let wait_time = remaining.min(Duration::from_millis(50));
334 guard = self
335 .shared
336 .condvar
337 .wait_timeout(guard, wait_time)
338 .expect("queue mutex poisoned")
339 .0;
340 } else {
341 guard = self
343 .shared
344 .condvar
345 .wait_timeout(guard, Duration::from_millis(50))
346 .expect("queue mutex poisoned")
347 .0;
348 }
349 }
350 }
351
352 #[inline(never)]
354 pub fn kill(&self) -> Result<(), ProcessError> {
355 public_symbols::rp_native_process_kill_public(self)
356 }
357
358 fn kill_impl(&self) -> Result<(), ProcessError> {
359 crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
360 {
361 let mut guard = self.child.lock().expect("child mutex poisoned");
362 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
363 child.kill().map_err(ProcessError::Io)?;
364 let status = child.wait().map_err(ProcessError::Io)?;
365 self.set_returncode(exit_code(status));
366 }
367 #[cfg(windows)]
375 self.cancel_capture_io();
376 public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
389 self,
390 kill_drain_deadline(),
391 );
392 Ok(())
393 }
394
395 pub fn terminate(&self) -> Result<(), ProcessError> {
396 self.kill()
397 }
398
399 pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
414 #[cfg(unix)]
415 {
416 if !self.config.create_process_group {
417 return Ok(());
418 }
419 let pid = match self.pid() {
420 Some(p) => p as i32,
421 None => return Err(ProcessError::NotRunning),
422 };
423 let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
424 if result != 0 {
425 let err = std::io::Error::last_os_error();
426 if err.raw_os_error() != Some(libc::ESRCH) {
427 return Err(ProcessError::Io(err));
428 }
429 }
430 Ok(())
431 }
432 #[cfg(windows)]
433 {
434 if !self.config.create_process_group {
435 return Ok(());
440 }
441 let pid = match self.pid() {
442 Some(p) => p,
443 None => return Err(ProcessError::NotRunning),
444 };
445 let ok = unsafe {
449 winapi::um::wincon::GenerateConsoleCtrlEvent(
450 winapi::um::wincon::CTRL_BREAK_EVENT,
451 pid,
452 )
453 };
454 if ok == 0 {
455 let err = std::io::Error::last_os_error();
456 if err.raw_os_error() != Some(6) {
462 return Err(ProcessError::Io(err));
463 }
464 }
465 Ok(())
466 }
467 }
468
469 #[inline(never)]
471 pub fn close(&self) -> Result<(), ProcessError> {
472 public_symbols::rp_native_process_close_public(self)
473 }
474
475 fn close_impl(&self) -> Result<(), ProcessError> {
476 crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
477 if self.child.lock().expect("child mutex poisoned").is_none() {
478 return Ok(());
479 }
480 if self.poll()?.is_none() {
481 self.kill()?;
482 } else {
483 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
484 }
485 Ok(())
486 }
487
488 pub fn pid(&self) -> Option<u32> {
489 self.child
490 .lock()
491 .expect("child mutex poisoned")
492 .as_ref()
493 .map(|state| state.child.id())
494 }
495
496 pub fn returncode(&self) -> Option<i32> {
497 let v = self.shared.returncode.load(Ordering::Acquire);
498 if v == RETURNCODE_NOT_SET {
499 None
500 } else {
501 Some(v as i32)
502 }
503 }
504
505 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
506 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
507 return false;
508 }
509 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
510 match stream {
511 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
512 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
513 }
514 }
515
516 pub fn has_pending_combined(&self) -> bool {
517 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
518 !guard.combined_queue.is_empty()
519 }
520
521 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
522 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
523 return Vec::new();
524 }
525 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
526 let queue = match stream {
527 StreamKind::Stdout => &mut guard.stdout_queue,
528 StreamKind::Stderr => &mut guard.stderr_queue,
529 };
530 queue.drain(..).collect()
531 }
532
533 pub fn drain_combined(&self) -> Vec<StreamEvent> {
534 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
535 guard.combined_queue.drain(..).collect()
536 }
537
538 pub fn read_stream(
539 &self,
540 stream: StreamKind,
541 timeout: Option<Duration>,
542 ) -> ReadStatus<Vec<u8>> {
543 let deadline = timeout.map(|limit| Instant::now() + limit);
544 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
545
546 loop {
547 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
548 return ReadStatus::Eof;
549 }
550
551 let queue = match stream {
552 StreamKind::Stdout => &mut guard.stdout_queue,
553 StreamKind::Stderr => &mut guard.stderr_queue,
554 };
555 if let Some(line) = queue.pop_front() {
556 return ReadStatus::Line(line);
557 }
558
559 let closed = match stream {
560 StreamKind::Stdout => {
561 if self.config.stderr_mode == StderrMode::Stdout {
562 guard.stdout_closed && guard.stderr_closed
563 } else {
564 guard.stdout_closed
565 }
566 }
567 StreamKind::Stderr => guard.stderr_closed,
568 };
569 if closed {
570 return ReadStatus::Eof;
571 }
572
573 match deadline {
574 Some(deadline) => {
575 let now = Instant::now();
576 if now >= deadline {
577 return ReadStatus::Timeout;
578 }
579 let wait = deadline.saturating_duration_since(now);
580 let result = self
581 .shared
582 .condvar
583 .wait_timeout(guard, wait)
584 .expect("queue mutex poisoned");
585 guard = result.0;
586 if result.1.timed_out() {
587 return ReadStatus::Timeout;
588 }
589 }
590 None => {
591 guard = self
592 .shared
593 .condvar
594 .wait(guard)
595 .expect("queue mutex poisoned");
596 }
597 }
598 }
599 }
600
601 #[inline(never)]
603 pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
604 public_symbols::rp_native_process_read_combined_public(self, timeout)
605 }
606
607 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
608 crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
609 let deadline = timeout.map(|limit| Instant::now() + limit);
610 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
611
612 loop {
613 if let Some(event) = guard.combined_queue.pop_front() {
614 return ReadStatus::Line(event);
615 }
616 if guard.stdout_closed && guard.stderr_closed {
617 return ReadStatus::Eof;
618 }
619
620 match deadline {
621 Some(deadline) => {
622 let now = Instant::now();
623 if now >= deadline {
624 return ReadStatus::Timeout;
625 }
626 let wait = deadline.saturating_duration_since(now);
627 let result = self
628 .shared
629 .condvar
630 .wait_timeout(guard, wait)
631 .expect("queue mutex poisoned");
632 guard = result.0;
633 if result.1.timed_out() {
634 return ReadStatus::Timeout;
635 }
636 }
637 None => {
638 guard = self
639 .shared
640 .condvar
641 .wait(guard)
642 .expect("queue mutex poisoned");
643 }
644 }
645 }
646 }
647
648 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
649 self.shared
650 .queues
651 .lock()
652 .expect("queue mutex poisoned")
653 .stdout_history
654 .clone()
655 .into_iter()
656 .collect()
657 }
658
659 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
660 if self.config.stderr_mode == StderrMode::Stdout {
661 return Vec::new();
662 }
663 self.shared
664 .queues
665 .lock()
666 .expect("queue mutex poisoned")
667 .stderr_history
668 .clone()
669 .into_iter()
670 .collect()
671 }
672
673 pub fn captured_combined(&self) -> Vec<StreamEvent> {
674 self.shared
675 .queues
676 .lock()
677 .expect("queue mutex poisoned")
678 .combined_history
679 .clone()
680 .into_iter()
681 .collect()
682 }
683
684 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
685 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
686 return 0;
687 }
688 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
689 match stream {
690 StreamKind::Stdout => guard.stdout_history_bytes,
691 StreamKind::Stderr => guard.stderr_history_bytes,
692 }
693 }
694
695 pub fn captured_combined_bytes(&self) -> usize {
696 self.shared
697 .queues
698 .lock()
699 .expect("queue mutex poisoned")
700 .combined_history_bytes
701 }
702
703 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
704 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
705 return 0;
706 }
707 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
708 match stream {
709 StreamKind::Stdout => {
710 let released = guard.stdout_history_bytes;
711 guard.stdout_history.clear();
712 guard.stdout_history_bytes = 0;
713 released
714 }
715 StreamKind::Stderr => {
716 let released = guard.stderr_history_bytes;
717 guard.stderr_history.clear();
718 guard.stderr_history_bytes = 0;
719 released
720 }
721 }
722 }
723
724 pub fn clear_captured_combined(&self) -> usize {
725 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
726 let released = guard.combined_history_bytes;
727 guard.combined_history.clear();
728 guard.combined_history_bytes = 0;
729 released
730 }
731
732 fn build_command(&self) -> Command {
733 let mut command = match &self.config.command {
734 CommandSpec::Shell(command) => shell_command(command),
735 CommandSpec::Argv(argv) => {
736 let mut command = Command::new(&argv[0]);
737 if argv.len() > 1 {
738 command.args(&argv[1..]);
739 }
740 command
741 }
742 };
743 if let Some(cwd) = &self.config.cwd {
744 command.current_dir(cwd);
745 }
746 if let Some(env) = &self.config.env {
747 command.env_clear();
748 command.envs(env.iter().map(|(k, v)| (k, v)));
749 }
750 #[cfg(windows)]
751 {
752 use std::os::windows::process::CommandExt;
753
754 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
759 let extra = if self.config.create_process_group {
760 CREATE_NEW_PROCESS_GROUP
761 } else {
762 0
763 };
764 let flags = self.config.creationflags.unwrap_or(0)
765 | extra
766 | windows_priority_flags(self.config.nice);
767 if flags != 0 {
768 command.creation_flags(flags);
769 }
770 }
771 #[cfg(unix)]
772 {
773 let create_process_group = self.config.create_process_group;
774 let nice = self.config.nice;
775
776 if create_process_group || nice.is_some() {
777 use std::os::unix::process::CommandExt;
778
779 unsafe {
780 command.pre_exec(move || {
781 if create_process_group && libc::setpgid(0, 0) == -1 {
782 return Err(std::io::Error::last_os_error());
783 }
784 if let Some(nice) = nice {
785 let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
786 if result == -1 {
787 return Err(std::io::Error::last_os_error());
788 }
789 }
790 Ok(())
791 });
792 }
793 }
794 }
795 command
796 }
797
798 fn spawn_reader<R>(
799 &self,
800 pipe: R,
801 source_stream: StreamKind,
802 visible_stream: StreamKind,
803 on_pipe_done: Box<dyn FnOnce() + Send>,
804 ) where
805 R: Read + Send + 'static,
806 {
807 let shared = Arc::clone(&self.shared);
808 thread::spawn(move || {
809 let mut reader = pipe;
810 let mut chunk = vec![0_u8; 65536];
811 let mut pending = Vec::new();
812
813 loop {
814 match reader.read(&mut chunk) {
815 Ok(0) => break,
816 Ok(n) => {
817 let lines = feed_chunk(&mut pending, &chunk[..n]);
818 emit_lines(&shared, visible_stream, lines);
819 }
820 Err(_) => break,
821 }
822 }
823
824 if !pending.is_empty() {
825 emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
826 }
827
828 on_pipe_done();
833 drop(reader);
834
835 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
836 match source_stream {
837 StreamKind::Stdout => guard.stdout_closed = true,
838 StreamKind::Stderr => guard.stderr_closed = true,
839 }
840 shared.condvar.notify_all();
841 });
842 }
843
844 #[cfg(windows)]
845 fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
846 let handles = Arc::clone(&self.capture_pipe_handles);
847 Box::new(move || {
848 let mut guard = handles
849 .lock()
850 .expect("capture pipe handles mutex poisoned");
851 match stream {
852 StreamKind::Stdout => guard.stdout = None,
853 StreamKind::Stderr => guard.stderr = None,
854 }
855 })
856 }
857
858 #[cfg(not(windows))]
859 fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
860 Box::new(|| {})
861 }
862
863 #[cfg(windows)]
869 fn cancel_capture_io(&self) {
870 crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
871 use winapi::shared::ntdef::HANDLE;
872 use winapi::um::ioapiset::CancelIoEx;
873 let guard = self
874 .capture_pipe_handles
875 .lock()
876 .expect("capture pipe handles mutex poisoned");
877 if let Some(h) = guard.stdout {
878 unsafe {
885 CancelIoEx(h as HANDLE, std::ptr::null_mut());
886 }
887 }
888 if let Some(h) = guard.stderr {
889 unsafe {
890 CancelIoEx(h as HANDLE, std::ptr::null_mut());
891 }
892 }
893 }
894
895 fn set_returncode(&self, code: i32) {
896 self.shared.returncode.store(code as i64, Ordering::Release);
897 self.shared.condvar.notify_all();
898 }
899
900 fn wait_for_capture_completion_impl(&self) {
901 crate::rp_rust_debug_scope!(
902 "running_process::NativeProcess::wait_for_capture_completion"
903 );
904 if !self.config.capture {
905 return;
906 }
907
908 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
909 while !(guard.stdout_closed && guard.stderr_closed) {
910 guard = self
911 .shared
912 .condvar
913 .wait(guard)
914 .expect("queue mutex poisoned");
915 }
916 }
917
918 fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
926 crate::rp_rust_debug_scope!(
927 "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
928 );
929 if !self.config.capture {
930 return true;
931 }
932
933 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
934 while !(guard.stdout_closed && guard.stderr_closed) {
935 let now = Instant::now();
936 if now >= deadline {
937 guard.stdout_closed = true;
938 guard.stderr_closed = true;
939 self.shared.condvar.notify_all();
940 return false;
941 }
942 let (next_guard, result) = self
943 .shared
944 .condvar
945 .wait_timeout(guard, deadline - now)
946 .expect("queue mutex poisoned");
947 guard = next_guard;
948 if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
949 guard.stdout_closed = true;
950 guard.stderr_closed = true;
951 self.shared.condvar.notify_all();
952 return false;
953 }
954 }
955 true
956 }
957}
958
959fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
960 if lines.is_empty() {
961 return;
962 }
963 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
964 for line in lines {
965 let line_len = line.len();
966 match stream {
967 StreamKind::Stdout => {
968 guard.stdout_history_bytes += line_len;
969 guard.stdout_history.push_back(line.clone());
970 guard.stdout_queue.push_back(line.clone());
971 }
972 StreamKind::Stderr => {
973 guard.stderr_history_bytes += line_len;
974 guard.stderr_history.push_back(line.clone());
975 guard.stderr_queue.push_back(line.clone());
976 }
977 }
978 let event = StreamEvent { stream, line };
979 guard.combined_history_bytes += line_len;
980 guard.combined_history.push_back(event.clone());
981 guard.combined_queue.push_back(event);
982 }
983 shared.condvar.notify_all();
984}
985
986pub(crate) fn shell_command(command: &str) -> Command {
987 #[cfg(windows)]
988 {
989 use std::os::windows::process::CommandExt;
990
991 let mut cmd = Command::new("cmd");
992 cmd.raw_arg("/D /S /C \"");
993 cmd.raw_arg(command);
994 cmd.raw_arg("\"");
995 cmd
996 }
997 #[cfg(not(windows))]
998 {
999 let mut cmd = Command::new("sh");
1000 cmd.arg("-lc").arg(command);
1001 cmd
1002 }
1003}
1004
1005#[cfg(test)]
1006mod tests;