1use std::collections::VecDeque;
10use std::io::Read;
11use std::process::{Child, Command, Stdio};
12use std::sync::atomic::{AtomicI64, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, Instant};
16
17pub mod console_detect;
18pub mod containment;
19mod helpers;
20#[cfg(feature = "originator-scan")]
21pub mod originator;
22#[cfg(feature = "client")]
27pub mod proto {
29 #[allow(missing_docs)]
31 pub mod daemon {
32 include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
33 }
34}
35
36#[cfg(feature = "client")]
37pub mod client;
38
39#[cfg(feature = "client")]
44pub mod broker;
45
46#[cfg(feature = "client")]
52pub mod maintenance;
53
54#[cfg(feature = "client")]
55pub mod cleanup;
56
57#[cfg(feature = "test-support")]
62pub mod test_support;
63
64#[cfg(feature = "telemetry")]
67#[path = "daemon/telemetry.rs"]
68pub mod telemetry;
69
70#[cfg(feature = "daemon")]
73pub mod daemon;
75pub mod pty;
77mod public_symbols;
78mod rust_debug;
79pub mod spawn;
80pub mod systemd_killmode;
81pub mod terminal_graphics;
82mod types;
83#[cfg(unix)]
84mod unix;
85#[cfg(windows)]
86mod windows;
87
88pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
89pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
90#[cfg(feature = "originator-scan")]
91pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
92pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
93pub use spawn::{
94 spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
95 StdioSource,
96};
97pub use terminal_graphics::{
98 current_terminal_capabilities, current_terminal_capabilities_with_timeout,
99 detect_terminal_capabilities, CapabilityStatus, EvidenceStrength, GraphicsCapability,
100 GraphicsProtocol, TerminalCapabilities, TerminalCapabilityInput, TerminalGraphicsCapabilities,
101 TerminalProbeEvidence,
102};
103pub use types::{
104 CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
105 StreamEvent, StreamKind,
106};
107
108pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
109#[cfg(unix)]
110pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
111#[cfg(windows)]
112pub(crate) use windows::{
113 assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
114 WindowsJobHandle,
115};
116
117#[macro_export]
118macro_rules! rp_rust_debug_scope {
120 ($label:expr) => {
121 let _running_process_rust_debug_scope =
122 $crate::RustDebugScopeGuard::enter($label, file!(), line!());
123 };
124}
125
126#[derive(Default)]
127struct QueueState {
128 stdout_queue: VecDeque<Vec<u8>>,
129 stderr_queue: VecDeque<Vec<u8>>,
130 combined_queue: VecDeque<StreamEvent>,
131 stdout_history: VecDeque<Vec<u8>>,
132 stderr_history: VecDeque<Vec<u8>>,
133 combined_history: VecDeque<StreamEvent>,
134 stdout_raw: Vec<u8>,
135 stderr_raw: Vec<u8>,
136 stdout_history_bytes: usize,
137 stderr_history_bytes: usize,
138 combined_history_bytes: usize,
139 stdout_closed: bool,
140 stderr_closed: bool,
141}
142
143const RETURNCODE_NOT_SET: i64 = i64::MIN;
145
146struct SharedState {
147 queues: Mutex<QueueState>,
148 condvar: Condvar,
149 returncode: AtomicI64,
152}
153
154struct ChildState {
155 child: Child,
156 #[cfg(windows)]
157 _job: WindowsJobHandle,
158}
159
160impl SharedState {
161 fn new(capture: bool) -> Self {
162 let queues = QueueState {
163 stdout_closed: !capture,
164 stderr_closed: !capture,
165 ..QueueState::default()
166 };
167 Self {
168 queues: Mutex::new(queues),
169 condvar: Condvar::new(),
170 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
171 }
172 }
173}
174
175pub struct NativeProcess {
182 config: ProcessConfig,
183 child: Arc<Mutex<Option<ChildState>>>,
184 shared: Arc<SharedState>,
185 #[cfg(windows)]
186 capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
187}
188
189impl NativeProcess {
190 pub fn new(config: ProcessConfig) -> Self {
194 Self {
195 shared: Arc::new(SharedState::new(config.capture)),
196 child: Arc::new(Mutex::new(None)),
197 config,
198 #[cfg(windows)]
199 capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
200 }
201 }
202
203 #[inline(never)]
205 pub fn start(&self) -> Result<(), ProcessError> {
210 public_symbols::rp_native_process_start_public(self)
211 }
212
213 fn start_impl(&self) -> Result<(), ProcessError> {
214 crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
215 let mut guard = self.child.lock().expect("child mutex poisoned");
216 if guard.is_some() {
217 return Err(ProcessError::AlreadyStarted);
218 }
219
220 let mut command = self.build_command();
221 match self.config.stdin_mode {
222 StdinMode::Inherit => {}
223 StdinMode::Piped => {
224 command.stdin(Stdio::piped());
225 }
226 StdinMode::Null => {
227 command.stdin(Stdio::null());
228 }
229 }
230 if self.config.capture {
231 command.stdout(Stdio::piped());
232 command.stderr(Stdio::piped());
233 }
234
235 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
236 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
237 #[cfg(windows)]
238 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
239 .map_err(ProcessError::Spawn)?;
240 if self.config.capture {
241 let stdout = child.stdout.take().expect("stdout pipe missing");
242 let stderr = child.stderr.take().expect("stderr pipe missing");
243 #[cfg(windows)]
244 {
245 use std::os::windows::io::AsRawHandle;
246 let mut handles = self
247 .capture_pipe_handles
248 .lock()
249 .expect("capture pipe handles mutex poisoned");
250 handles.stdout = Some(stdout.as_raw_handle() as usize);
251 handles.stderr = Some(stderr.as_raw_handle() as usize);
252 }
253 self.spawn_reader(
254 stdout,
255 StreamKind::Stdout,
256 StreamKind::Stdout,
257 self.pipe_done_callback(StreamKind::Stdout),
258 );
259 self.spawn_reader(
260 stderr,
261 StreamKind::Stderr,
262 match self.config.stderr_mode {
263 StderrMode::Stdout => StreamKind::Stdout,
264 StderrMode::Pipe => StreamKind::Stderr,
265 },
266 self.pipe_done_callback(StreamKind::Stderr),
267 );
268 }
269 *guard = Some(ChildState {
270 child,
271 #[cfg(windows)]
272 _job: job,
273 });
274 drop(guard);
275 self.spawn_exit_waiter();
276 Ok(())
277 }
278
279 fn spawn_exit_waiter(&self) {
282 let child = Arc::clone(&self.child);
283 let shared = Arc::clone(&self.shared);
284 thread::spawn(move || {
285 loop {
286 if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
287 return;
288 }
289 {
290 let mut guard = child.lock().expect("child mutex poisoned");
291 if let Some(child_state) = guard.as_mut() {
292 match child_state.child.try_wait() {
293 Ok(Some(status)) => {
294 let code = exit_code(status);
295 shared.returncode.store(code as i64, Ordering::Release);
296 shared.condvar.notify_all();
297 return;
298 }
299 Ok(None) => {}
300 Err(_) => return,
301 }
302 } else {
303 return;
304 }
305 }
306 thread::sleep(Duration::from_millis(10));
312 }
313 });
314 }
315
316 pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
318 let mut guard = self.child.lock().expect("child mutex poisoned");
319 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
320 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
321 use std::io::Write;
322 stdin.write_all(data).map_err(ProcessError::Io)?;
323 stdin.flush().map_err(ProcessError::Io)?;
324 drop(child.stdin.take());
325 Ok(())
326 }
327
328 pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
333 let mut guard = self.child.lock().expect("child mutex poisoned");
334 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
335 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
336 use std::io::Write;
337 stdin.write_all(data).map_err(ProcessError::Io)?;
338 stdin.flush().map_err(ProcessError::Io)?;
339 Ok(())
340 }
341
342 pub fn close_stdin(&self) -> Result<(), ProcessError> {
345 let mut guard = self.child.lock().expect("child mutex poisoned");
346 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
347 drop(child.stdin.take());
348 Ok(())
349 }
350
351 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
355 if let Some(code) = self.returncode() {
357 return Ok(Some(code));
358 }
359 let mut guard = self.child.lock().expect("child mutex poisoned");
360 let Some(child_state) = guard.as_mut() else {
361 return Ok(self.returncode());
362 };
363 let child = &mut child_state.child;
364 let status = child.try_wait().map_err(ProcessError::Io)?;
365 if let Some(status) = status {
366 let code = exit_code(status);
367 self.set_returncode(code);
368 return Ok(Some(code));
369 }
370 Ok(None)
371 }
372
373 #[inline(never)]
375 pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
380 public_symbols::rp_native_process_wait_public(self, timeout)
381 }
382
383 fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
384 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
385 if self.child.lock().expect("child mutex poisoned").is_none() {
386 return self.returncode().ok_or(ProcessError::NotRunning);
387 }
388 if let Some(code) = self.returncode() {
390 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
391 return Ok(code);
392 }
393 let start = Instant::now();
394 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
395 loop {
396 let rc = self.shared.returncode.load(Ordering::Acquire);
398 if rc != RETURNCODE_NOT_SET {
399 drop(guard);
400 let code = rc as i32;
401 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
402 return Ok(code);
403 }
404 if let Some(limit) = timeout {
405 let elapsed = start.elapsed();
406 if elapsed >= limit {
407 return Err(ProcessError::Timeout);
408 }
409 let remaining = limit - elapsed;
410 let wait_time = remaining.min(Duration::from_millis(50));
412 guard = self
413 .shared
414 .condvar
415 .wait_timeout(guard, wait_time)
416 .expect("queue mutex poisoned")
417 .0;
418 } else {
419 guard = self
421 .shared
422 .condvar
423 .wait_timeout(guard, Duration::from_millis(50))
424 .expect("queue mutex poisoned")
425 .0;
426 }
427 }
428 }
429
430 #[inline(never)]
432 pub fn kill(&self) -> Result<(), ProcessError> {
434 public_symbols::rp_native_process_kill_public(self)
435 }
436
437 fn kill_impl(&self) -> Result<(), ProcessError> {
438 crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
439 {
440 let mut guard = self.child.lock().expect("child mutex poisoned");
441 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
442 child.kill().map_err(ProcessError::Io)?;
443 let status = child.wait().map_err(ProcessError::Io)?;
444 self.set_returncode(exit_code(status));
445 }
446 #[cfg(windows)]
454 self.cancel_capture_io();
455 public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
468 self,
469 kill_drain_deadline(),
470 );
471 Ok(())
472 }
473
474 pub fn terminate(&self) -> Result<(), ProcessError> {
478 self.kill()
479 }
480
481 pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
496 #[cfg(unix)]
497 {
498 if !self.config.create_process_group {
499 return Ok(());
500 }
501 let pid = match self.pid() {
502 Some(p) => p as i32,
503 None => return Err(ProcessError::NotRunning),
504 };
505 let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
506 if result != 0 {
507 let err = std::io::Error::last_os_error();
508 if err.raw_os_error() != Some(libc::ESRCH) {
509 return Err(ProcessError::Io(err));
510 }
511 }
512 Ok(())
513 }
514 #[cfg(windows)]
515 {
516 if !self.config.create_process_group {
517 return Ok(());
522 }
523 let pid = match self.pid() {
524 Some(p) => p,
525 None => return Err(ProcessError::NotRunning),
526 };
527 let ok = unsafe {
531 winapi::um::wincon::GenerateConsoleCtrlEvent(
532 winapi::um::wincon::CTRL_BREAK_EVENT,
533 pid,
534 )
535 };
536 if ok == 0 {
537 let err = std::io::Error::last_os_error();
538 if err.raw_os_error() != Some(6) {
544 return Err(ProcessError::Io(err));
545 }
546 }
547 Ok(())
548 }
549 }
550
551 #[inline(never)]
553 pub fn close(&self) -> Result<(), ProcessError> {
555 public_symbols::rp_native_process_close_public(self)
556 }
557
558 fn close_impl(&self) -> Result<(), ProcessError> {
559 crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
560 if self.child.lock().expect("child mutex poisoned").is_none() {
561 return Ok(());
562 }
563 if self.poll()?.is_none() {
564 self.kill()?;
565 } else {
566 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
567 }
568 Ok(())
569 }
570
571 pub fn pid(&self) -> Option<u32> {
573 self.child
574 .lock()
575 .expect("child mutex poisoned")
576 .as_ref()
577 .map(|state| state.child.id())
578 }
579
580 pub fn returncode(&self) -> Option<i32> {
582 let v = self.shared.returncode.load(Ordering::Acquire);
583 if v == RETURNCODE_NOT_SET {
584 None
585 } else {
586 Some(v as i32)
587 }
588 }
589
590 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
592 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
593 return false;
594 }
595 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
596 match stream {
597 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
598 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
599 }
600 }
601
602 pub fn has_pending_combined(&self) -> bool {
604 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
605 !guard.combined_queue.is_empty()
606 }
607
608 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
610 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
611 return Vec::new();
612 }
613 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
614 let queue = match stream {
615 StreamKind::Stdout => &mut guard.stdout_queue,
616 StreamKind::Stderr => &mut guard.stderr_queue,
617 };
618 queue.drain(..).collect()
619 }
620
621 pub fn drain_combined(&self) -> Vec<StreamEvent> {
623 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
624 guard.combined_queue.drain(..).collect()
625 }
626
627 pub fn read_stream(
632 &self,
633 stream: StreamKind,
634 timeout: Option<Duration>,
635 ) -> ReadStatus<Vec<u8>> {
636 let deadline = timeout.map(|limit| Instant::now() + limit);
637 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
638
639 loop {
640 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
641 return ReadStatus::Eof;
642 }
643
644 let queue = match stream {
645 StreamKind::Stdout => &mut guard.stdout_queue,
646 StreamKind::Stderr => &mut guard.stderr_queue,
647 };
648 if let Some(line) = queue.pop_front() {
649 return ReadStatus::Line(line);
650 }
651
652 let closed = match stream {
653 StreamKind::Stdout => {
654 if self.config.stderr_mode == StderrMode::Stdout {
655 guard.stdout_closed && guard.stderr_closed
656 } else {
657 guard.stdout_closed
658 }
659 }
660 StreamKind::Stderr => guard.stderr_closed,
661 };
662 if closed {
663 return ReadStatus::Eof;
664 }
665
666 match deadline {
667 Some(deadline) => {
668 let now = Instant::now();
669 if now >= deadline {
670 return ReadStatus::Timeout;
671 }
672 let wait = deadline.saturating_duration_since(now);
673 let result = self
674 .shared
675 .condvar
676 .wait_timeout(guard, wait)
677 .expect("queue mutex poisoned");
678 guard = result.0;
679 if result.1.timed_out() {
680 return ReadStatus::Timeout;
681 }
682 }
683 None => {
684 guard = self
685 .shared
686 .condvar
687 .wait(guard)
688 .expect("queue mutex poisoned");
689 }
690 }
691 }
692 }
693
694 #[inline(never)]
696 pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
698 public_symbols::rp_native_process_read_combined_public(self, timeout)
699 }
700
701 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
702 crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
703 let deadline = timeout.map(|limit| Instant::now() + limit);
704 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
705
706 loop {
707 if let Some(event) = guard.combined_queue.pop_front() {
708 return ReadStatus::Line(event);
709 }
710 if guard.stdout_closed && guard.stderr_closed {
711 return ReadStatus::Eof;
712 }
713
714 match deadline {
715 Some(deadline) => {
716 let now = Instant::now();
717 if now >= deadline {
718 return ReadStatus::Timeout;
719 }
720 let wait = deadline.saturating_duration_since(now);
721 let result = self
722 .shared
723 .condvar
724 .wait_timeout(guard, wait)
725 .expect("queue mutex poisoned");
726 guard = result.0;
727 if result.1.timed_out() {
728 return ReadStatus::Timeout;
729 }
730 }
731 None => {
732 guard = self
733 .shared
734 .condvar
735 .wait(guard)
736 .expect("queue mutex poisoned");
737 }
738 }
739 }
740 }
741
742 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
744 self.shared
745 .queues
746 .lock()
747 .expect("queue mutex poisoned")
748 .stdout_history
749 .clone()
750 .into_iter()
751 .collect()
752 }
753
754 fn captured_stdout_raw(&self) -> Vec<u8> {
755 self.shared
756 .queues
757 .lock()
758 .expect("queue mutex poisoned")
759 .stdout_raw
760 .clone()
761 }
762
763 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
765 if self.config.stderr_mode == StderrMode::Stdout {
766 return Vec::new();
767 }
768 self.shared
769 .queues
770 .lock()
771 .expect("queue mutex poisoned")
772 .stderr_history
773 .clone()
774 .into_iter()
775 .collect()
776 }
777
778 fn captured_stderr_raw(&self) -> Vec<u8> {
779 if self.config.stderr_mode == StderrMode::Stdout {
780 return Vec::new();
781 }
782 self.shared
783 .queues
784 .lock()
785 .expect("queue mutex poisoned")
786 .stderr_raw
787 .clone()
788 }
789
790 pub fn captured_combined(&self) -> Vec<StreamEvent> {
792 self.shared
793 .queues
794 .lock()
795 .expect("queue mutex poisoned")
796 .combined_history
797 .clone()
798 .into_iter()
799 .collect()
800 }
801
802 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
804 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
805 return 0;
806 }
807 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
808 match stream {
809 StreamKind::Stdout => guard.stdout_history_bytes,
810 StreamKind::Stderr => guard.stderr_history_bytes,
811 }
812 }
813
814 pub fn captured_combined_bytes(&self) -> usize {
816 self.shared
817 .queues
818 .lock()
819 .expect("queue mutex poisoned")
820 .combined_history_bytes
821 }
822
823 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
825 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
826 return 0;
827 }
828 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
829 match stream {
830 StreamKind::Stdout => {
831 let released = guard.stdout_history_bytes;
832 guard.stdout_history.clear();
833 guard.stdout_raw.clear();
834 guard.stdout_history_bytes = 0;
835 released
836 }
837 StreamKind::Stderr => {
838 let released = guard.stderr_history_bytes;
839 guard.stderr_history.clear();
840 guard.stderr_raw.clear();
841 guard.stderr_history_bytes = 0;
842 released
843 }
844 }
845 }
846
847 pub fn clear_captured_combined(&self) -> usize {
849 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
850 let released = guard.combined_history_bytes;
851 guard.combined_history.clear();
852 guard.combined_history_bytes = 0;
853 released
854 }
855
856 fn build_command(&self) -> Command {
857 let mut command = match &self.config.command {
858 CommandSpec::Shell(command) => shell_command(command),
859 CommandSpec::Argv(argv) => {
860 let mut command = Command::new(&argv[0]);
861 if argv.len() > 1 {
862 command.args(&argv[1..]);
863 }
864 command
865 }
866 };
867 if let Some(cwd) = &self.config.cwd {
868 command.current_dir(cwd);
869 }
870 if let Some(env) = &self.config.env {
871 command.env_clear();
872 command.envs(env.iter().map(|(k, v)| (k, v)));
873 }
874 #[cfg(windows)]
875 {
876 use std::os::windows::process::CommandExt;
877
878 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
883 let extra = if self.config.create_process_group {
884 CREATE_NEW_PROCESS_GROUP
885 } else {
886 0
887 };
888 let flags = self.config.creationflags.unwrap_or(0)
889 | extra
890 | windows_priority_flags(self.config.nice);
891 if flags != 0 {
892 command.creation_flags(flags);
893 }
894 }
895 #[cfg(unix)]
896 {
897 let create_process_group = self.config.create_process_group;
898 let nice = self.config.nice;
899
900 if create_process_group || nice.is_some() {
901 use std::os::unix::process::CommandExt;
902
903 unsafe {
904 command.pre_exec(move || {
905 if create_process_group && libc::setpgid(0, 0) == -1 {
906 return Err(std::io::Error::last_os_error());
907 }
908 if let Some(nice) = nice {
909 let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
910 if result == -1 {
911 return Err(std::io::Error::last_os_error());
912 }
913 }
914 Ok(())
915 });
916 }
917 }
918 }
919 command
920 }
921
922 fn spawn_reader<R>(
923 &self,
924 pipe: R,
925 source_stream: StreamKind,
926 visible_stream: StreamKind,
927 on_pipe_done: Box<dyn FnOnce() + Send>,
928 ) where
929 R: Read + Send + 'static,
930 {
931 let shared = Arc::clone(&self.shared);
932 thread::spawn(move || {
933 let mut reader = pipe;
934 let mut chunk = vec![0_u8; 65536];
935 let mut pending = Vec::new();
936
937 loop {
938 match reader.read(&mut chunk) {
939 Ok(0) => break,
940 Ok(n) => {
941 append_raw(&shared, visible_stream, &chunk[..n]);
942 let lines = feed_chunk(&mut pending, &chunk[..n]);
943 emit_lines(&shared, visible_stream, lines);
944 }
945 Err(_) => break,
946 }
947 }
948
949 if !pending.is_empty() {
950 emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
951 }
952
953 on_pipe_done();
958 drop(reader);
959
960 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
961 match source_stream {
962 StreamKind::Stdout => guard.stdout_closed = true,
963 StreamKind::Stderr => guard.stderr_closed = true,
964 }
965 shared.condvar.notify_all();
966 });
967 }
968
969 #[cfg(windows)]
970 fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
971 let handles = Arc::clone(&self.capture_pipe_handles);
972 Box::new(move || {
973 let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
974 match stream {
975 StreamKind::Stdout => guard.stdout = None,
976 StreamKind::Stderr => guard.stderr = None,
977 }
978 })
979 }
980
981 #[cfg(not(windows))]
982 fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
983 Box::new(|| {})
984 }
985
986 #[cfg(windows)]
992 fn cancel_capture_io(&self) {
993 crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
994 use winapi::shared::ntdef::HANDLE;
995 use winapi::um::ioapiset::CancelIoEx;
996 let guard = self
997 .capture_pipe_handles
998 .lock()
999 .expect("capture pipe handles mutex poisoned");
1000 if let Some(h) = guard.stdout {
1001 unsafe {
1008 CancelIoEx(h as HANDLE, std::ptr::null_mut());
1009 }
1010 }
1011 if let Some(h) = guard.stderr {
1012 unsafe {
1013 CancelIoEx(h as HANDLE, std::ptr::null_mut());
1014 }
1015 }
1016 }
1017
1018 fn set_returncode(&self, code: i32) {
1019 self.shared.returncode.store(code as i64, Ordering::Release);
1020 self.shared.condvar.notify_all();
1021 }
1022
1023 fn wait_for_capture_completion_impl(&self) {
1024 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
1025 if !self.config.capture {
1026 return;
1027 }
1028
1029 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1030 while !(guard.stdout_closed && guard.stderr_closed) {
1031 guard = self
1032 .shared
1033 .condvar
1034 .wait(guard)
1035 .expect("queue mutex poisoned");
1036 }
1037 }
1038
1039 fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
1047 crate::rp_rust_debug_scope!(
1048 "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
1049 );
1050 if !self.config.capture {
1051 return true;
1052 }
1053
1054 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1055 while !(guard.stdout_closed && guard.stderr_closed) {
1056 let now = Instant::now();
1057 if now >= deadline {
1058 guard.stdout_closed = true;
1059 guard.stderr_closed = true;
1060 self.shared.condvar.notify_all();
1061 return false;
1062 }
1063 let (next_guard, result) = self
1064 .shared
1065 .condvar
1066 .wait_timeout(guard, deadline - now)
1067 .expect("queue mutex poisoned");
1068 guard = next_guard;
1069 if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
1070 guard.stdout_closed = true;
1071 guard.stderr_closed = true;
1072 self.shared.condvar.notify_all();
1073 return false;
1074 }
1075 }
1076 true
1077 }
1078}
1079
1080fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
1081 if lines.is_empty() {
1082 return;
1083 }
1084 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1085 for line in lines {
1086 let line_len = line.len();
1087 match stream {
1088 StreamKind::Stdout => {
1089 guard.stdout_history_bytes += line_len;
1090 guard.stdout_history.push_back(line.clone());
1091 guard.stdout_queue.push_back(line.clone());
1092 }
1093 StreamKind::Stderr => {
1094 guard.stderr_history_bytes += line_len;
1095 guard.stderr_history.push_back(line.clone());
1096 guard.stderr_queue.push_back(line.clone());
1097 }
1098 }
1099 let event = StreamEvent { stream, line };
1100 guard.combined_history_bytes += line_len;
1101 guard.combined_history.push_back(event.clone());
1102 guard.combined_queue.push_back(event);
1103 }
1104 shared.condvar.notify_all();
1105}
1106
1107fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1108 if chunk.is_empty() {
1109 return;
1110 }
1111 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1112 match stream {
1113 StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1114 StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1115 }
1116}
1117
1118pub fn run_command(
1124 mut config: ProcessConfig,
1125 timeout: Option<Duration>,
1126) -> Result<RunOutput, ProcessError> {
1127 config.capture = true;
1128 let process = NativeProcess::new(config);
1129 process.start()?;
1130
1131 let exit_code = match process.wait(timeout) {
1132 Ok(code) => code,
1133 Err(ProcessError::Timeout) => {
1134 match process.kill() {
1135 Ok(()) | Err(ProcessError::NotRunning) => {}
1136 Err(error) => return Err(error),
1137 }
1138 return Err(ProcessError::Timeout);
1139 }
1140 Err(error) => return Err(error),
1141 };
1142
1143 Ok(RunOutput {
1144 stdout: process.captured_stdout_raw(),
1145 stderr: process.captured_stderr_raw(),
1146 exit_code,
1147 })
1148}
1149
1150pub(crate) fn shell_command(command: &str) -> Command {
1151 #[cfg(windows)]
1152 {
1153 use std::os::windows::process::CommandExt;
1154
1155 let mut cmd = Command::new("cmd");
1156 cmd.raw_arg("/D /S /C \"");
1157 cmd.raw_arg(command);
1158 cmd.raw_arg("\"");
1159 cmd
1160 }
1161 #[cfg(not(windows))]
1162 {
1163 let mut cmd = Command::new("sh");
1164 cmd.arg("-lc").arg(command);
1165 cmd
1166 }
1167}
1168
1169#[cfg(test)]
1170mod tests;