1use std::collections::VecDeque;
10use std::io::Read;
11use std::process::{Child, Command, Stdio};
12use std::sync::atomic::{AtomicI64, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, Instant};
16
17pub mod console_detect;
18pub mod containment;
19mod helpers;
20#[cfg(feature = "originator-scan")]
21pub mod originator;
/// Daemon protocol types, compiled only when the `client` feature is enabled.
#[cfg(feature = "client")]
pub mod proto {
    /// Contents of `$OUT_DIR/running_process.daemon.v1.rs`, pulled in verbatim.
    ///
    /// NOTE(review): presumably generated by the build script — confirm in `build.rs`.
    #[allow(missing_docs)]
    pub mod daemon {
        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
    }
}
35
36#[cfg(feature = "client")]
37pub mod client;
38
39#[cfg(feature = "client")]
44pub mod broker;
45
46#[cfg(feature = "client")]
52pub mod maintenance;
53
54#[cfg(feature = "client")]
55pub mod cleanup;
56
57#[cfg(feature = "telemetry")]
60#[path = "daemon/telemetry.rs"]
61pub mod telemetry;
62
63#[cfg(feature = "daemon")]
66pub mod daemon;
68pub mod pty;
70mod public_symbols;
71mod rust_debug;
72pub mod spawn;
73pub mod terminal_graphics;
74mod types;
75#[cfg(unix)]
76mod unix;
77#[cfg(windows)]
78mod windows;
79
80pub use console_detect::{ConsoleWindowInfo, monitor_console_windows};
81pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
82#[cfg(feature = "originator-scan")]
83pub use originator::{OriginatorProcessInfo, find_processes_by_originator};
84pub use rust_debug::{RustDebugScopeGuard, render_rust_debug_traces};
85pub use spawn::{
86 DaemonChild, SpawnStdio, SpawnedChild, StdioSource, spawn, spawn_daemon,
87 spawn_daemon_with_clear_env,
88};
89pub use terminal_graphics::{
90 CapabilityStatus, EvidenceStrength, GraphicsCapability, GraphicsProtocol, TerminalCapabilities,
91 TerminalCapabilityInput, TerminalGraphicsCapabilities, TerminalProbeEvidence,
92 current_terminal_capabilities, current_terminal_capabilities_with_timeout,
93 detect_terminal_capabilities,
94};
95pub use types::{
96 CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
97 StreamEvent, StreamKind,
98};
99
100pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
101#[cfg(unix)]
102pub use unix::{UnixSignal, unix_set_priority, unix_signal_process, unix_signal_process_group};
103#[cfg(windows)]
104pub(crate) use windows::{
105 CapturePipeHandles, WindowsJobHandle, assign_child_to_windows_kill_on_close_job_impl,
106 windows_priority_flags,
107};
108
/// Opens a debug scope that lasts until the end of the enclosing block.
///
/// Expands to a `let` binding that holds the value returned by
/// `RustDebugScopeGuard::enter`, called with `$label` plus the `file!()` and
/// `line!()` of the invocation. Because the expansion is a statement, the
/// binding (and therefore the guard) lives until the caller's block ends.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
117
/// Captured output of one child process, protected by `SharedState::queues`.
#[derive(Default)]
struct QueueState {
    /// Stdout lines not yet consumed by `read_stream` / `drain_stream`.
    stdout_queue: VecDeque<Vec<u8>>,
    /// Stderr lines not yet consumed by `read_stream` / `drain_stream`.
    stderr_queue: VecDeque<Vec<u8>>,
    /// Events not yet consumed by `read_combined` / `drain_combined`.
    combined_queue: VecDeque<StreamEvent>,
    /// Every stdout line emitted since the last `clear_captured_stream`.
    stdout_history: VecDeque<Vec<u8>>,
    /// Every stderr line emitted since the last `clear_captured_stream`.
    stderr_history: VecDeque<Vec<u8>>,
    /// Every event emitted since the last `clear_captured_combined`.
    combined_history: VecDeque<StreamEvent>,
    /// Unsplit stdout bytes exactly as the reader thread received them.
    stdout_raw: Vec<u8>,
    /// Unsplit stderr bytes exactly as the reader thread received them.
    stderr_raw: Vec<u8>,
    /// Sum of the line lengths currently held in `stdout_history`.
    stdout_history_bytes: usize,
    /// Sum of the line lengths currently held in `stderr_history`.
    stderr_history_bytes: usize,
    /// Sum of the line lengths currently held in `combined_history`.
    combined_history_bytes: usize,
    /// True once the stdout reader has finished, when capture is disabled,
    /// or after a deadline-bounded drain gave up.
    stdout_closed: bool,
    /// True once the stderr reader has finished, when capture is disabled,
    /// or after a deadline-bounded drain gave up.
    stderr_closed: bool,
}
134
/// Sentinel stored in `SharedState::returncode` while no exit code is known.
/// Real exit codes are `i32` values widened to `i64`, so they never collide.
const RETURNCODE_NOT_SET: i64 = i64::MIN;

/// State shared between a `NativeProcess` and its background threads.
struct SharedState {
    /// Captured output and stream-closed flags.
    queues: Mutex<QueueState>,
    /// Notified when lines are emitted, a stream closes, or the exit code is stored.
    condvar: Condvar,
    /// Exit code of the child, or `RETURNCODE_NOT_SET`.
    returncode: AtomicI64,
}
145
/// A spawned child together with the OS resources tied to its lifetime.
struct ChildState {
    /// Handle to the spawned process.
    child: Child,
    /// Job object handle obtained in `start_impl`; held only to keep it alive.
    /// NOTE(review): name suggests a kill-on-close job — confirm in `windows`.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
151
152impl SharedState {
153 fn new(capture: bool) -> Self {
154 let queues = QueueState {
155 stdout_closed: !capture,
156 stderr_closed: !capture,
157 ..QueueState::default()
158 };
159 Self {
160 queues: Mutex::new(queues),
161 condvar: Condvar::new(),
162 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
163 }
164 }
165}
166
/// A child process driven through OS pipes, with optional output capture.
pub struct NativeProcess {
    /// Launch configuration supplied to [`NativeProcess::new`].
    config: ProcessConfig,
    /// The running child; `None` until `start` succeeds.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Output queues and exit code shared with the background threads.
    shared: Arc<SharedState>,
    /// Raw handles of the capture pipes, recorded so pending reads can be
    /// cancelled when the child is killed.
    #[cfg(windows)]
    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
}
180
181impl NativeProcess {
182 pub fn new(config: ProcessConfig) -> Self {
186 Self {
187 shared: Arc::new(SharedState::new(config.capture)),
188 child: Arc::new(Mutex::new(None)),
189 config,
190 #[cfg(windows)]
191 capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
192 }
193 }
194
    /// Spawns the configured child process.
    ///
    /// Delegates to `public_symbols::rp_native_process_start_public`.
    /// NOTE(review): presumably that wrapper forwards to `start_impl` — confirm
    /// in `public_symbols`.
    ///
    /// # Errors
    /// Returns whatever the delegate returns.
    #[inline(never)]
    pub fn start(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_start_public(self)
    }
204
205 fn start_impl(&self) -> Result<(), ProcessError> {
206 crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
207 let mut guard = self.child.lock().expect("child mutex poisoned");
208 if guard.is_some() {
209 return Err(ProcessError::AlreadyStarted);
210 }
211
212 let mut command = self.build_command();
213 match self.config.stdin_mode {
214 StdinMode::Inherit => {}
215 StdinMode::Piped => {
216 command.stdin(Stdio::piped());
217 }
218 StdinMode::Null => {
219 command.stdin(Stdio::null());
220 }
221 }
222 if self.config.capture {
223 command.stdout(Stdio::piped());
224 command.stderr(Stdio::piped());
225 }
226
227 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
228 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
229 #[cfg(windows)]
230 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
231 .map_err(ProcessError::Spawn)?;
232 if self.config.capture {
233 let stdout = child.stdout.take().expect("stdout pipe missing");
234 let stderr = child.stderr.take().expect("stderr pipe missing");
235 #[cfg(windows)]
236 {
237 use std::os::windows::io::AsRawHandle;
238 let mut handles = self
239 .capture_pipe_handles
240 .lock()
241 .expect("capture pipe handles mutex poisoned");
242 handles.stdout = Some(stdout.as_raw_handle() as usize);
243 handles.stderr = Some(stderr.as_raw_handle() as usize);
244 }
245 self.spawn_reader(
246 stdout,
247 StreamKind::Stdout,
248 StreamKind::Stdout,
249 self.pipe_done_callback(StreamKind::Stdout),
250 );
251 self.spawn_reader(
252 stderr,
253 StreamKind::Stderr,
254 match self.config.stderr_mode {
255 StderrMode::Stdout => StreamKind::Stdout,
256 StderrMode::Pipe => StreamKind::Stderr,
257 },
258 self.pipe_done_callback(StreamKind::Stderr),
259 );
260 }
261 *guard = Some(ChildState {
262 child,
263 #[cfg(windows)]
264 _job: job,
265 });
266 drop(guard);
267 self.spawn_exit_waiter();
268 Ok(())
269 }
270
271 fn spawn_exit_waiter(&self) {
274 let child = Arc::clone(&self.child);
275 let shared = Arc::clone(&self.shared);
276 thread::spawn(move || {
277 loop {
278 if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
279 return;
280 }
281 {
282 let mut guard = child.lock().expect("child mutex poisoned");
283 if let Some(child_state) = guard.as_mut() {
284 match child_state.child.try_wait() {
285 Ok(Some(status)) => {
286 let code = exit_code(status);
287 shared.returncode.store(code as i64, Ordering::Release);
288 shared.condvar.notify_all();
289 return;
290 }
291 Ok(None) => {}
292 Err(_) => return,
293 }
294 } else {
295 return;
296 }
297 }
298 thread::sleep(Duration::from_millis(10));
304 }
305 });
306 }
307
308 pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
310 let mut guard = self.child.lock().expect("child mutex poisoned");
311 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
312 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
313 use std::io::Write;
314 stdin.write_all(data).map_err(ProcessError::Io)?;
315 stdin.flush().map_err(ProcessError::Io)?;
316 drop(child.stdin.take());
317 Ok(())
318 }
319
320 pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
325 let mut guard = self.child.lock().expect("child mutex poisoned");
326 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
327 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
328 use std::io::Write;
329 stdin.write_all(data).map_err(ProcessError::Io)?;
330 stdin.flush().map_err(ProcessError::Io)?;
331 Ok(())
332 }
333
334 pub fn close_stdin(&self) -> Result<(), ProcessError> {
337 let mut guard = self.child.lock().expect("child mutex poisoned");
338 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
339 drop(child.stdin.take());
340 Ok(())
341 }
342
343 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
347 if let Some(code) = self.returncode() {
349 return Ok(Some(code));
350 }
351 let mut guard = self.child.lock().expect("child mutex poisoned");
352 let Some(child_state) = guard.as_mut() else {
353 return Ok(self.returncode());
354 };
355 let child = &mut child_state.child;
356 let status = child.try_wait().map_err(ProcessError::Io)?;
357 if let Some(status) = status {
358 let code = exit_code(status);
359 self.set_returncode(code);
360 return Ok(Some(code));
361 }
362 Ok(None)
363 }
364
    /// Waits for the child to exit, optionally bounded by `timeout`.
    ///
    /// Delegates to `public_symbols::rp_native_process_wait_public`.
    /// NOTE(review): presumably that wrapper forwards to `wait_impl` — confirm
    /// in `public_symbols`.
    ///
    /// # Errors
    /// Returns whatever the delegate returns.
    #[inline(never)]
    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
        public_symbols::rp_native_process_wait_public(self, timeout)
    }
374
375 fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
376 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
377 if self.child.lock().expect("child mutex poisoned").is_none() {
378 return self.returncode().ok_or(ProcessError::NotRunning);
379 }
380 if let Some(code) = self.returncode() {
382 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
383 return Ok(code);
384 }
385 let start = Instant::now();
386 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
387 loop {
388 let rc = self.shared.returncode.load(Ordering::Acquire);
390 if rc != RETURNCODE_NOT_SET {
391 drop(guard);
392 let code = rc as i32;
393 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
394 return Ok(code);
395 }
396 if let Some(limit) = timeout {
397 let elapsed = start.elapsed();
398 if elapsed >= limit {
399 return Err(ProcessError::Timeout);
400 }
401 let remaining = limit - elapsed;
402 let wait_time = remaining.min(Duration::from_millis(50));
404 guard = self
405 .shared
406 .condvar
407 .wait_timeout(guard, wait_time)
408 .expect("queue mutex poisoned")
409 .0;
410 } else {
411 guard = self
413 .shared
414 .condvar
415 .wait_timeout(guard, Duration::from_millis(50))
416 .expect("queue mutex poisoned")
417 .0;
418 }
419 }
420 }
421
    /// Forcibly stops the child process.
    ///
    /// Delegates to `public_symbols::rp_native_process_kill_public`.
    /// NOTE(review): presumably that wrapper forwards to `kill_impl` — confirm
    /// in `public_symbols`.
    ///
    /// # Errors
    /// Returns whatever the delegate returns.
    #[inline(never)]
    pub fn kill(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_kill_public(self)
    }
428
429 fn kill_impl(&self) -> Result<(), ProcessError> {
430 crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
431 {
432 let mut guard = self.child.lock().expect("child mutex poisoned");
433 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
434 child.kill().map_err(ProcessError::Io)?;
435 let status = child.wait().map_err(ProcessError::Io)?;
436 self.set_returncode(exit_code(status));
437 }
438 #[cfg(windows)]
446 self.cancel_capture_io();
447 public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
460 self,
461 kill_drain_deadline(),
462 );
463 Ok(())
464 }
465
    /// Stops the child process; currently identical to [`NativeProcess::kill`].
    ///
    /// # Errors
    /// Returns whatever `kill` returns.
    pub fn terminate(&self) -> Result<(), ProcessError> {
        self.kill()
    }
472
473 pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
488 #[cfg(unix)]
489 {
490 if !self.config.create_process_group {
491 return Ok(());
492 }
493 let pid = match self.pid() {
494 Some(p) => p as i32,
495 None => return Err(ProcessError::NotRunning),
496 };
497 let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
498 if result != 0 {
499 let err = std::io::Error::last_os_error();
500 if err.raw_os_error() != Some(libc::ESRCH) {
501 return Err(ProcessError::Io(err));
502 }
503 }
504 Ok(())
505 }
506 #[cfg(windows)]
507 {
508 if !self.config.create_process_group {
509 return Ok(());
514 }
515 let pid = match self.pid() {
516 Some(p) => p,
517 None => return Err(ProcessError::NotRunning),
518 };
519 let ok = unsafe {
523 winapi::um::wincon::GenerateConsoleCtrlEvent(
524 winapi::um::wincon::CTRL_BREAK_EVENT,
525 pid,
526 )
527 };
528 if ok == 0 {
529 let err = std::io::Error::last_os_error();
530 if err.raw_os_error() != Some(6) {
536 return Err(ProcessError::Io(err));
537 }
538 }
539 Ok(())
540 }
541 }
542
    /// Releases the child process, stopping it if necessary.
    ///
    /// Delegates to `public_symbols::rp_native_process_close_public`.
    /// NOTE(review): presumably that wrapper forwards to `close_impl` — confirm
    /// in `public_symbols`.
    ///
    /// # Errors
    /// Returns whatever the delegate returns.
    #[inline(never)]
    pub fn close(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_close_public(self)
    }
549
550 fn close_impl(&self) -> Result<(), ProcessError> {
551 crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
552 if self.child.lock().expect("child mutex poisoned").is_none() {
553 return Ok(());
554 }
555 if self.poll()?.is_none() {
556 self.kill()?;
557 } else {
558 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
559 }
560 Ok(())
561 }
562
563 pub fn pid(&self) -> Option<u32> {
565 self.child
566 .lock()
567 .expect("child mutex poisoned")
568 .as_ref()
569 .map(|state| state.child.id())
570 }
571
572 pub fn returncode(&self) -> Option<i32> {
574 let v = self.shared.returncode.load(Ordering::Acquire);
575 if v == RETURNCODE_NOT_SET {
576 None
577 } else {
578 Some(v as i32)
579 }
580 }
581
582 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
584 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
585 return false;
586 }
587 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
588 match stream {
589 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
590 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
591 }
592 }
593
594 pub fn has_pending_combined(&self) -> bool {
596 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
597 !guard.combined_queue.is_empty()
598 }
599
600 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
602 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
603 return Vec::new();
604 }
605 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
606 let queue = match stream {
607 StreamKind::Stdout => &mut guard.stdout_queue,
608 StreamKind::Stderr => &mut guard.stderr_queue,
609 };
610 queue.drain(..).collect()
611 }
612
613 pub fn drain_combined(&self) -> Vec<StreamEvent> {
615 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
616 guard.combined_queue.drain(..).collect()
617 }
618
619 pub fn read_stream(
624 &self,
625 stream: StreamKind,
626 timeout: Option<Duration>,
627 ) -> ReadStatus<Vec<u8>> {
628 let deadline = timeout.map(|limit| Instant::now() + limit);
629 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
630
631 loop {
632 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
633 return ReadStatus::Eof;
634 }
635
636 let queue = match stream {
637 StreamKind::Stdout => &mut guard.stdout_queue,
638 StreamKind::Stderr => &mut guard.stderr_queue,
639 };
640 if let Some(line) = queue.pop_front() {
641 return ReadStatus::Line(line);
642 }
643
644 let closed = match stream {
645 StreamKind::Stdout => {
646 if self.config.stderr_mode == StderrMode::Stdout {
647 guard.stdout_closed && guard.stderr_closed
648 } else {
649 guard.stdout_closed
650 }
651 }
652 StreamKind::Stderr => guard.stderr_closed,
653 };
654 if closed {
655 return ReadStatus::Eof;
656 }
657
658 match deadline {
659 Some(deadline) => {
660 let now = Instant::now();
661 if now >= deadline {
662 return ReadStatus::Timeout;
663 }
664 let wait = deadline.saturating_duration_since(now);
665 let result = self
666 .shared
667 .condvar
668 .wait_timeout(guard, wait)
669 .expect("queue mutex poisoned");
670 guard = result.0;
671 if result.1.timed_out() {
672 return ReadStatus::Timeout;
673 }
674 }
675 None => {
676 guard = self
677 .shared
678 .condvar
679 .wait(guard)
680 .expect("queue mutex poisoned");
681 }
682 }
683 }
684 }
685
    /// Reads the next event from the combined stdout/stderr queue.
    ///
    /// Delegates to `public_symbols::rp_native_process_read_combined_public`.
    /// NOTE(review): presumably that wrapper forwards to `read_combined_impl` —
    /// confirm in `public_symbols`.
    #[inline(never)]
    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
        public_symbols::rp_native_process_read_combined_public(self, timeout)
    }
692
693 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
694 crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
695 let deadline = timeout.map(|limit| Instant::now() + limit);
696 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
697
698 loop {
699 if let Some(event) = guard.combined_queue.pop_front() {
700 return ReadStatus::Line(event);
701 }
702 if guard.stdout_closed && guard.stderr_closed {
703 return ReadStatus::Eof;
704 }
705
706 match deadline {
707 Some(deadline) => {
708 let now = Instant::now();
709 if now >= deadline {
710 return ReadStatus::Timeout;
711 }
712 let wait = deadline.saturating_duration_since(now);
713 let result = self
714 .shared
715 .condvar
716 .wait_timeout(guard, wait)
717 .expect("queue mutex poisoned");
718 guard = result.0;
719 if result.1.timed_out() {
720 return ReadStatus::Timeout;
721 }
722 }
723 None => {
724 guard = self
725 .shared
726 .condvar
727 .wait(guard)
728 .expect("queue mutex poisoned");
729 }
730 }
731 }
732 }
733
734 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
736 self.shared
737 .queues
738 .lock()
739 .expect("queue mutex poisoned")
740 .stdout_history
741 .clone()
742 .into_iter()
743 .collect()
744 }
745
746 fn captured_stdout_raw(&self) -> Vec<u8> {
747 self.shared
748 .queues
749 .lock()
750 .expect("queue mutex poisoned")
751 .stdout_raw
752 .clone()
753 }
754
755 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
757 if self.config.stderr_mode == StderrMode::Stdout {
758 return Vec::new();
759 }
760 self.shared
761 .queues
762 .lock()
763 .expect("queue mutex poisoned")
764 .stderr_history
765 .clone()
766 .into_iter()
767 .collect()
768 }
769
770 fn captured_stderr_raw(&self) -> Vec<u8> {
771 if self.config.stderr_mode == StderrMode::Stdout {
772 return Vec::new();
773 }
774 self.shared
775 .queues
776 .lock()
777 .expect("queue mutex poisoned")
778 .stderr_raw
779 .clone()
780 }
781
782 pub fn captured_combined(&self) -> Vec<StreamEvent> {
784 self.shared
785 .queues
786 .lock()
787 .expect("queue mutex poisoned")
788 .combined_history
789 .clone()
790 .into_iter()
791 .collect()
792 }
793
794 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
796 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
797 return 0;
798 }
799 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
800 match stream {
801 StreamKind::Stdout => guard.stdout_history_bytes,
802 StreamKind::Stderr => guard.stderr_history_bytes,
803 }
804 }
805
806 pub fn captured_combined_bytes(&self) -> usize {
808 self.shared
809 .queues
810 .lock()
811 .expect("queue mutex poisoned")
812 .combined_history_bytes
813 }
814
815 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
817 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
818 return 0;
819 }
820 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
821 match stream {
822 StreamKind::Stdout => {
823 let released = guard.stdout_history_bytes;
824 guard.stdout_history.clear();
825 guard.stdout_raw.clear();
826 guard.stdout_history_bytes = 0;
827 released
828 }
829 StreamKind::Stderr => {
830 let released = guard.stderr_history_bytes;
831 guard.stderr_history.clear();
832 guard.stderr_raw.clear();
833 guard.stderr_history_bytes = 0;
834 released
835 }
836 }
837 }
838
839 pub fn clear_captured_combined(&self) -> usize {
841 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
842 let released = guard.combined_history_bytes;
843 guard.combined_history.clear();
844 guard.combined_history_bytes = 0;
845 released
846 }
847
848 fn build_command(&self) -> Command {
849 let mut command = match &self.config.command {
850 CommandSpec::Shell(command) => shell_command(command),
851 CommandSpec::Argv(argv) => {
852 let mut command = Command::new(&argv[0]);
853 if argv.len() > 1 {
854 command.args(&argv[1..]);
855 }
856 command
857 }
858 };
859 if let Some(cwd) = &self.config.cwd {
860 command.current_dir(cwd);
861 }
862 if let Some(env) = &self.config.env {
863 command.env_clear();
864 command.envs(env.iter().map(|(k, v)| (k, v)));
865 }
866 #[cfg(windows)]
867 {
868 use std::os::windows::process::CommandExt;
869
870 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
875 let extra = if self.config.create_process_group {
876 CREATE_NEW_PROCESS_GROUP
877 } else {
878 0
879 };
880 let flags = self.config.creationflags.unwrap_or(0)
881 | extra
882 | windows_priority_flags(self.config.nice);
883 if flags != 0 {
884 command.creation_flags(flags);
885 }
886 }
887 #[cfg(unix)]
888 {
889 let create_process_group = self.config.create_process_group;
890 let nice = self.config.nice;
891
892 if create_process_group || nice.is_some() {
893 use std::os::unix::process::CommandExt;
894
895 unsafe {
896 command.pre_exec(move || {
897 if create_process_group && libc::setpgid(0, 0) == -1 {
898 return Err(std::io::Error::last_os_error());
899 }
900 if let Some(nice) = nice {
901 let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
902 if result == -1 {
903 return Err(std::io::Error::last_os_error());
904 }
905 }
906 Ok(())
907 });
908 }
909 }
910 }
911 command
912 }
913
914 fn spawn_reader<R>(
915 &self,
916 pipe: R,
917 source_stream: StreamKind,
918 visible_stream: StreamKind,
919 on_pipe_done: Box<dyn FnOnce() + Send>,
920 ) where
921 R: Read + Send + 'static,
922 {
923 let shared = Arc::clone(&self.shared);
924 thread::spawn(move || {
925 let mut reader = pipe;
926 let mut chunk = vec![0_u8; 65536];
927 let mut pending = Vec::new();
928
929 loop {
930 match reader.read(&mut chunk) {
931 Ok(0) => break,
932 Ok(n) => {
933 append_raw(&shared, visible_stream, &chunk[..n]);
934 let lines = feed_chunk(&mut pending, &chunk[..n]);
935 emit_lines(&shared, visible_stream, lines);
936 }
937 Err(_) => break,
938 }
939 }
940
941 if !pending.is_empty() {
942 emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
943 }
944
945 on_pipe_done();
950 drop(reader);
951
952 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
953 match source_stream {
954 StreamKind::Stdout => guard.stdout_closed = true,
955 StreamKind::Stderr => guard.stderr_closed = true,
956 }
957 shared.condvar.notify_all();
958 });
959 }
960
961 #[cfg(windows)]
962 fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
963 let handles = Arc::clone(&self.capture_pipe_handles);
964 Box::new(move || {
965 let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
966 match stream {
967 StreamKind::Stdout => guard.stdout = None,
968 StreamKind::Stderr => guard.stderr = None,
969 }
970 })
971 }
972
    /// Builds the callback a reader thread runs once it is done with its pipe.
    ///
    /// Outside Windows no pipe handles are recorded, so the callback is a no-op.
    #[cfg(not(windows))]
    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
        Box::new(|| {})
    }
977
978 #[cfg(windows)]
984 fn cancel_capture_io(&self) {
985 crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
986 use winapi::shared::ntdef::HANDLE;
987 use winapi::um::ioapiset::CancelIoEx;
988 let guard = self
989 .capture_pipe_handles
990 .lock()
991 .expect("capture pipe handles mutex poisoned");
992 if let Some(h) = guard.stdout {
993 unsafe {
1000 CancelIoEx(h as HANDLE, std::ptr::null_mut());
1001 }
1002 }
1003 if let Some(h) = guard.stderr {
1004 unsafe {
1005 CancelIoEx(h as HANDLE, std::ptr::null_mut());
1006 }
1007 }
1008 }
1009
1010 fn set_returncode(&self, code: i32) {
1011 self.shared.returncode.store(code as i64, Ordering::Release);
1012 self.shared.condvar.notify_all();
1013 }
1014
1015 fn wait_for_capture_completion_impl(&self) {
1016 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
1017 if !self.config.capture {
1018 return;
1019 }
1020
1021 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1022 while !(guard.stdout_closed && guard.stderr_closed) {
1023 guard = self
1024 .shared
1025 .condvar
1026 .wait(guard)
1027 .expect("queue mutex poisoned");
1028 }
1029 }
1030
1031 fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
1039 crate::rp_rust_debug_scope!(
1040 "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
1041 );
1042 if !self.config.capture {
1043 return true;
1044 }
1045
1046 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1047 while !(guard.stdout_closed && guard.stderr_closed) {
1048 let now = Instant::now();
1049 if now >= deadline {
1050 guard.stdout_closed = true;
1051 guard.stderr_closed = true;
1052 self.shared.condvar.notify_all();
1053 return false;
1054 }
1055 let (next_guard, result) = self
1056 .shared
1057 .condvar
1058 .wait_timeout(guard, deadline - now)
1059 .expect("queue mutex poisoned");
1060 guard = next_guard;
1061 if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
1062 guard.stdout_closed = true;
1063 guard.stderr_closed = true;
1064 self.shared.condvar.notify_all();
1065 return false;
1066 }
1067 }
1068 true
1069 }
1070}
1071
1072fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
1073 if lines.is_empty() {
1074 return;
1075 }
1076 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1077 for line in lines {
1078 let line_len = line.len();
1079 match stream {
1080 StreamKind::Stdout => {
1081 guard.stdout_history_bytes += line_len;
1082 guard.stdout_history.push_back(line.clone());
1083 guard.stdout_queue.push_back(line.clone());
1084 }
1085 StreamKind::Stderr => {
1086 guard.stderr_history_bytes += line_len;
1087 guard.stderr_history.push_back(line.clone());
1088 guard.stderr_queue.push_back(line.clone());
1089 }
1090 }
1091 let event = StreamEvent { stream, line };
1092 guard.combined_history_bytes += line_len;
1093 guard.combined_history.push_back(event.clone());
1094 guard.combined_queue.push_back(event);
1095 }
1096 shared.condvar.notify_all();
1097}
1098
1099fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1100 if chunk.is_empty() {
1101 return;
1102 }
1103 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1104 match stream {
1105 StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1106 StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1107 }
1108}
1109
1110pub fn run_command(
1116 mut config: ProcessConfig,
1117 timeout: Option<Duration>,
1118) -> Result<RunOutput, ProcessError> {
1119 config.capture = true;
1120 let process = NativeProcess::new(config);
1121 process.start()?;
1122
1123 let exit_code = match process.wait(timeout) {
1124 Ok(code) => code,
1125 Err(ProcessError::Timeout) => {
1126 match process.kill() {
1127 Ok(()) | Err(ProcessError::NotRunning) => {}
1128 Err(error) => return Err(error),
1129 }
1130 return Err(ProcessError::Timeout);
1131 }
1132 Err(error) => return Err(error),
1133 };
1134
1135 Ok(RunOutput {
1136 stdout: process.captured_stdout_raw(),
1137 stderr: process.captured_stderr_raw(),
1138 exit_code,
1139 })
1140}
1141
/// Builds the `Command` that runs `command` through the platform shell.
///
/// * Windows: `cmd /D /S /C "<command>"`, passed as raw arguments so the text
///   reaches `cmd` without extra escaping.
/// * Elsewhere: `sh -lc <command>`.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    }
    #[cfg(not(windows))]
    {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    }
}
1160
1161#[cfg(test)]
1162mod tests;