1use std::collections::VecDeque;
10use std::io::Read;
11use std::process::{Child, Command, Stdio};
12use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, Instant};
16
17use crate::observer::ObserverEmitter;
18
19pub mod console_detect;
20pub mod containment;
21mod helpers;
22pub mod observer;
27#[cfg(feature = "originator-scan")]
28pub mod originator;
/// Daemon wire types, compiled in only when the `client` feature is enabled.
#[cfg(feature = "client")]
pub mod proto {
    // The module body is the file `running_process.daemon.v1.rs` found under
    // `OUT_DIR` at compile time.
    // NOTE(review): presumably generated by the build script — confirm.
    #[allow(missing_docs)]
    pub mod daemon {
        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
    }
}
42
43#[cfg(feature = "client")]
44pub mod client;
45
46#[cfg(feature = "client")]
51pub mod broker;
52
53#[cfg(feature = "client")]
59pub mod maintenance;
60
61#[cfg(feature = "client")]
62pub mod cleanup;
63
64#[cfg(feature = "test-support")]
69pub mod test_support;
70
71#[cfg(feature = "telemetry")]
74#[path = "daemon/telemetry.rs"]
75pub mod telemetry;
76
77#[cfg(feature = "daemon")]
80pub mod daemon;
82pub mod pty;
84mod public_symbols;
85mod rust_debug;
86pub mod spawn;
87pub mod systemd_killmode;
88pub mod terminal_graphics;
89mod types;
90#[cfg(unix)]
91mod unix;
92#[cfg(windows)]
93mod windows;
94
95pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
96pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
97pub use observer::{
98 CapabilitySupport, CategoryCapability, EventCategory, ObserverCapabilities, ObserverConfig,
99 ObserverEvent, ObserverEventKind, ObserverSubscriber,
100};
101#[cfg(feature = "originator-scan")]
102pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
103pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
104pub use spawn::{
105 spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
106 StdioSource,
107};
108pub use terminal_graphics::{
109 current_terminal_capabilities, current_terminal_capabilities_with_timeout,
110 detect_terminal_capabilities, CapabilityStatus, EvidenceStrength, GraphicsCapability,
111 GraphicsProtocol, TerminalCapabilities, TerminalCapabilityInput, TerminalGraphicsCapabilities,
112 TerminalProbeEvidence,
113};
114pub use types::{
115 CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
116 StreamEvent, StreamKind,
117};
118
119pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
120#[cfg(unix)]
121pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
122#[cfg(windows)]
123pub(crate) use windows::{
124 assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
125 WindowsJobHandle,
126};
127
/// Enters a Rust debug scope for the rest of the enclosing block.
///
/// Expands to a `let` binding that holds the guard returned by
/// `RustDebugScopeGuard::enter`, called with `$label` and the call site's
/// `file!()` and `line!()`. Because the guard is bound to a local, it is
/// dropped when the enclosing block ends.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
136
137#[derive(Default)]
138struct QueueState {
139 stdout_queue: VecDeque<Vec<u8>>,
140 stderr_queue: VecDeque<Vec<u8>>,
141 combined_queue: VecDeque<StreamEvent>,
142 stdout_history: VecDeque<Vec<u8>>,
143 stderr_history: VecDeque<Vec<u8>>,
144 combined_history: VecDeque<StreamEvent>,
145 stdout_raw: Vec<u8>,
146 stderr_raw: Vec<u8>,
147 stdout_history_bytes: usize,
148 stderr_history_bytes: usize,
149 combined_history_bytes: usize,
150 stdout_closed: bool,
151 stderr_closed: bool,
152}
153
/// Sentinel held in `SharedState::returncode` until an exit code is recorded.
/// Real exit codes are `i32` values widened to `i64`, so they never equal it.
const RETURNCODE_NOT_SET: i64 = i64::MIN;
156
/// State shared between a `NativeProcess` handle and its background threads.
struct SharedState {
    /// Captured output queues, histories and per-stream closed flags.
    queues: Mutex<QueueState>,
    /// Notified when lines are queued, a stream closes, or the exit code is set.
    condvar: Condvar,
    /// Exit code widened to `i64`, or `RETURNCODE_NOT_SET` while none is known.
    returncode: AtomicI64,
    /// Optional emitter used for the started / exited observer events.
    observer: Option<ObserverEmitter>,
    /// Flipped to `true` by the first `emit_exited` call so the exited event
    /// is sent at most once.
    observer_exit_emitted: AtomicBool,
}
172
/// The spawned child and, on Windows, the job handle created for it at start.
struct ChildState {
    child: Child,
    // Never read; stored so the handle lives exactly as long as this entry.
    // NOTE(review): presumably a kill-on-close job object, judging by the
    // helper that creates it — confirm.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
178
179impl SharedState {
180 fn new(capture: bool) -> Self {
181 Self::with_observer(capture, None)
182 }
183
184 fn with_observer(capture: bool, observer: Option<ObserverEmitter>) -> Self {
185 let queues = QueueState {
186 stdout_closed: !capture,
187 stderr_closed: !capture,
188 ..QueueState::default()
189 };
190 Self {
191 queues: Mutex::new(queues),
192 condvar: Condvar::new(),
193 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
194 observer,
195 observer_exit_emitted: AtomicBool::new(false),
196 }
197 }
198
199 fn emit_exited(&self, pid: u32, exit_code: i32) {
202 let Some(emitter) = self.observer.as_ref() else {
203 return;
204 };
205 if self
206 .observer_exit_emitted
207 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
208 .is_ok()
209 {
210 emitter.emit_exited(pid, exit_code);
211 }
212 }
213}
214
/// Handle for spawning, observing and controlling a single child process.
pub struct NativeProcess {
    /// Launch configuration (command, stdio modes, capture flag, ...).
    config: ProcessConfig,
    /// `None` until `start` succeeds; shared with the exit-waiter thread.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Output queues, exit code and observer shared with background threads.
    shared: Arc<SharedState>,
    /// Raw handles of the capture pipes, recorded at start and cleared by each
    /// reader when it finishes; used by `cancel_capture_io`.
    #[cfg(windows)]
    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
}
228
229impl NativeProcess {
230 pub fn new(config: ProcessConfig) -> Self {
236 Self::new_with_observer(config, None)
237 }
238
239 pub fn with_observer(
252 config: ProcessConfig,
253 observer: crate::observer::ObserverConfig,
254 ) -> (Self, ObserverSubscriber) {
255 let (emitter, subscriber) = ObserverEmitter::new(observer);
256 let process = Self::new_with_observer(config, Some(emitter));
257 (process, subscriber)
258 }
259
260 fn new_with_observer(config: ProcessConfig, observer: Option<ObserverEmitter>) -> Self {
261 let shared = match observer {
262 None => SharedState::new(config.capture),
264 Some(emitter) => SharedState::with_observer(config.capture, Some(emitter)),
265 };
266 Self {
267 shared: Arc::new(shared),
268 child: Arc::new(Mutex::new(None)),
269 config,
270 #[cfg(windows)]
271 capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
272 }
273 }
274
/// Spawns the configured command.
///
/// Delegates to `public_symbols::rp_native_process_start_public`.
/// NOTE(review): presumably that wrapper forwards to `start_impl`, which
/// returns `ProcessError::AlreadyStarted` on a second call and
/// `ProcessError::Spawn` when spawning fails — confirm.
// Kept out of line. NOTE(review): presumably so the public-symbol frame is
// visible in stack traces — confirm.
#[inline(never)]
pub fn start(&self) -> Result<(), ProcessError> {
    public_symbols::rp_native_process_start_public(self)
}
284
285 fn start_impl(&self) -> Result<(), ProcessError> {
286 crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
287 let mut guard = self.child.lock().expect("child mutex poisoned");
288 if guard.is_some() {
289 return Err(ProcessError::AlreadyStarted);
290 }
291
292 let mut command = self.build_command();
293 match self.config.stdin_mode {
294 StdinMode::Inherit => {}
295 StdinMode::Piped => {
296 command.stdin(Stdio::piped());
297 }
298 StdinMode::Null => {
299 command.stdin(Stdio::null());
300 }
301 }
302 if self.config.capture {
303 command.stdout(Stdio::piped());
304 command.stderr(Stdio::piped());
305 }
306
307 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
308 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
309 if let Some(emitter) = self.shared.observer.as_ref() {
312 emitter.emit_started(child.id());
313 }
314 #[cfg(windows)]
315 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
316 .map_err(ProcessError::Spawn)?;
317 if self.config.capture {
318 let stdout = child.stdout.take().expect("stdout pipe missing");
319 let stderr = child.stderr.take().expect("stderr pipe missing");
320 #[cfg(windows)]
321 {
322 use std::os::windows::io::AsRawHandle;
323 let mut handles = self
324 .capture_pipe_handles
325 .lock()
326 .expect("capture pipe handles mutex poisoned");
327 handles.stdout = Some(stdout.as_raw_handle() as usize);
328 handles.stderr = Some(stderr.as_raw_handle() as usize);
329 }
330 self.spawn_reader(
331 stdout,
332 StreamKind::Stdout,
333 StreamKind::Stdout,
334 self.pipe_done_callback(StreamKind::Stdout),
335 );
336 self.spawn_reader(
337 stderr,
338 StreamKind::Stderr,
339 match self.config.stderr_mode {
340 StderrMode::Stdout => StreamKind::Stdout,
341 StderrMode::Pipe => StreamKind::Stderr,
342 },
343 self.pipe_done_callback(StreamKind::Stderr),
344 );
345 }
346 *guard = Some(ChildState {
347 child,
348 #[cfg(windows)]
349 _job: job,
350 });
351 drop(guard);
352 self.spawn_exit_waiter();
353 Ok(())
354 }
355
356 fn spawn_exit_waiter(&self) {
359 let child = Arc::clone(&self.child);
360 let shared = Arc::clone(&self.shared);
361 thread::spawn(move || {
362 loop {
363 if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
364 return;
365 }
366 {
367 let mut guard = child.lock().expect("child mutex poisoned");
368 if let Some(child_state) = guard.as_mut() {
369 let pid = child_state.child.id();
370 match child_state.child.try_wait() {
371 Ok(Some(status)) => {
372 let code = exit_code(status);
373 shared.returncode.store(code as i64, Ordering::Release);
374 shared.emit_exited(pid, code);
378 shared.condvar.notify_all();
379 return;
380 }
381 Ok(None) => {}
382 Err(_) => return,
383 }
384 } else {
385 return;
386 }
387 }
388 thread::sleep(Duration::from_millis(10));
394 }
395 });
396 }
397
398 pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
400 let mut guard = self.child.lock().expect("child mutex poisoned");
401 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
402 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
403 use std::io::Write;
404 stdin.write_all(data).map_err(ProcessError::Io)?;
405 stdin.flush().map_err(ProcessError::Io)?;
406 drop(child.stdin.take());
407 Ok(())
408 }
409
410 pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
415 let mut guard = self.child.lock().expect("child mutex poisoned");
416 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
417 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
418 use std::io::Write;
419 stdin.write_all(data).map_err(ProcessError::Io)?;
420 stdin.flush().map_err(ProcessError::Io)?;
421 Ok(())
422 }
423
424 pub fn close_stdin(&self) -> Result<(), ProcessError> {
427 let mut guard = self.child.lock().expect("child mutex poisoned");
428 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
429 drop(child.stdin.take());
430 Ok(())
431 }
432
433 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
437 if let Some(code) = self.returncode() {
439 return Ok(Some(code));
440 }
441 let mut guard = self.child.lock().expect("child mutex poisoned");
442 let Some(child_state) = guard.as_mut() else {
443 return Ok(self.returncode());
444 };
445 let pid = child_state.child.id();
446 let child = &mut child_state.child;
447 let status = child.try_wait().map_err(ProcessError::Io)?;
448 if let Some(status) = status {
449 let code = exit_code(status);
450 self.set_returncode(code);
451 self.shared.emit_exited(pid, code);
452 return Ok(Some(code));
453 }
454 Ok(None)
455 }
456
/// Waits for the child to exit, optionally bounded by `timeout`.
///
/// Delegates to `public_symbols::rp_native_process_wait_public`.
/// NOTE(review): presumably that wrapper forwards to `wait_impl`, which
/// returns the exit code, `ProcessError::Timeout` when `timeout` elapses,
/// or `ProcessError::NotRunning` if the process was never started — confirm.
// Kept out of line. NOTE(review): presumably so the public-symbol frame is
// visible in stack traces — confirm.
#[inline(never)]
pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
    public_symbols::rp_native_process_wait_public(self, timeout)
}
466
467 fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
468 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
469 if self.child.lock().expect("child mutex poisoned").is_none() {
470 return self.returncode().ok_or(ProcessError::NotRunning);
471 }
472 if let Some(code) = self.returncode() {
474 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
475 return Ok(code);
476 }
477 let start = Instant::now();
478 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
479 loop {
480 let rc = self.shared.returncode.load(Ordering::Acquire);
482 if rc != RETURNCODE_NOT_SET {
483 drop(guard);
484 let code = rc as i32;
485 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
486 return Ok(code);
487 }
488 if let Some(limit) = timeout {
489 let elapsed = start.elapsed();
490 if elapsed >= limit {
491 return Err(ProcessError::Timeout);
492 }
493 let remaining = limit - elapsed;
494 let wait_time = remaining.min(Duration::from_millis(50));
496 guard = self
497 .shared
498 .condvar
499 .wait_timeout(guard, wait_time)
500 .expect("queue mutex poisoned")
501 .0;
502 } else {
503 guard = self
505 .shared
506 .condvar
507 .wait_timeout(guard, Duration::from_millis(50))
508 .expect("queue mutex poisoned")
509 .0;
510 }
511 }
512 }
513
/// Forcibly kills the child.
///
/// Delegates to `public_symbols::rp_native_process_kill_public`.
/// NOTE(review): presumably that wrapper forwards to `kill_impl`, which
/// returns `ProcessError::NotRunning` if no child is stored and
/// `ProcessError::Io` if killing or reaping fails — confirm.
// Kept out of line. NOTE(review): presumably so the public-symbol frame is
// visible in stack traces — confirm.
#[inline(never)]
pub fn kill(&self) -> Result<(), ProcessError> {
    public_symbols::rp_native_process_kill_public(self)
}
520
521 fn kill_impl(&self) -> Result<(), ProcessError> {
522 crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
523 {
524 let mut guard = self.child.lock().expect("child mutex poisoned");
525 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
526 let pid = child.id();
527 child.kill().map_err(ProcessError::Io)?;
528 let status = child.wait().map_err(ProcessError::Io)?;
529 let code = exit_code(status);
530 self.set_returncode(code);
531 self.shared.emit_exited(pid, code);
534 }
535 #[cfg(windows)]
543 self.cancel_capture_io();
544 public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
557 self,
558 kill_drain_deadline(),
559 );
560 Ok(())
561 }
562
/// Terminates the child. Currently identical to [`kill`](Self::kill): there
/// is no graceful step here; see `terminate_group_soft` for a soft signal.
pub fn terminate(&self) -> Result<(), ProcessError> {
    self.kill()
}
569
570 pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
585 #[cfg(unix)]
586 {
587 if !self.config.create_process_group {
588 return Ok(());
589 }
590 let pid = match self.pid() {
591 Some(p) => p as i32,
592 None => return Err(ProcessError::NotRunning),
593 };
594 let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
595 if result != 0 {
596 let err = std::io::Error::last_os_error();
597 if err.raw_os_error() != Some(libc::ESRCH) {
598 return Err(ProcessError::Io(err));
599 }
600 }
601 Ok(())
602 }
603 #[cfg(windows)]
604 {
605 if !self.config.create_process_group {
606 return Ok(());
611 }
612 let pid = match self.pid() {
613 Some(p) => p,
614 None => return Err(ProcessError::NotRunning),
615 };
616 let ok = unsafe {
620 winapi::um::wincon::GenerateConsoleCtrlEvent(
621 winapi::um::wincon::CTRL_BREAK_EVENT,
622 pid,
623 )
624 };
625 if ok == 0 {
626 let err = std::io::Error::last_os_error();
627 if err.raw_os_error() != Some(6) {
633 return Err(ProcessError::Io(err));
634 }
635 }
636 Ok(())
637 }
638 }
639
/// Releases the process: kills it if still running, otherwise just lets
/// output capture finish.
///
/// Delegates to `public_symbols::rp_native_process_close_public`.
/// NOTE(review): presumably that wrapper forwards to `close_impl`, which is
/// a no-op when the process was never started — confirm.
// Kept out of line. NOTE(review): presumably so the public-symbol frame is
// visible in stack traces — confirm.
#[inline(never)]
pub fn close(&self) -> Result<(), ProcessError> {
    public_symbols::rp_native_process_close_public(self)
}
646
647 fn close_impl(&self) -> Result<(), ProcessError> {
648 crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
649 if self.child.lock().expect("child mutex poisoned").is_none() {
650 return Ok(());
651 }
652 if self.poll()?.is_none() {
653 self.kill()?;
654 } else {
655 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
656 }
657 Ok(())
658 }
659
660 pub fn pid(&self) -> Option<u32> {
662 self.child
663 .lock()
664 .expect("child mutex poisoned")
665 .as_ref()
666 .map(|state| state.child.id())
667 }
668
669 pub fn returncode(&self) -> Option<i32> {
671 let v = self.shared.returncode.load(Ordering::Acquire);
672 if v == RETURNCODE_NOT_SET {
673 None
674 } else {
675 Some(v as i32)
676 }
677 }
678
679 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
681 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
682 return false;
683 }
684 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
685 match stream {
686 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
687 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
688 }
689 }
690
691 pub fn has_pending_combined(&self) -> bool {
693 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
694 !guard.combined_queue.is_empty()
695 }
696
697 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
699 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
700 return Vec::new();
701 }
702 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
703 let queue = match stream {
704 StreamKind::Stdout => &mut guard.stdout_queue,
705 StreamKind::Stderr => &mut guard.stderr_queue,
706 };
707 queue.drain(..).collect()
708 }
709
710 pub fn drain_combined(&self) -> Vec<StreamEvent> {
712 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
713 guard.combined_queue.drain(..).collect()
714 }
715
716 pub fn read_stream(
721 &self,
722 stream: StreamKind,
723 timeout: Option<Duration>,
724 ) -> ReadStatus<Vec<u8>> {
725 let deadline = timeout.map(|limit| Instant::now() + limit);
726 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
727
728 loop {
729 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
730 return ReadStatus::Eof;
731 }
732
733 let queue = match stream {
734 StreamKind::Stdout => &mut guard.stdout_queue,
735 StreamKind::Stderr => &mut guard.stderr_queue,
736 };
737 if let Some(line) = queue.pop_front() {
738 return ReadStatus::Line(line);
739 }
740
741 let closed = match stream {
742 StreamKind::Stdout => {
743 if self.config.stderr_mode == StderrMode::Stdout {
744 guard.stdout_closed && guard.stderr_closed
745 } else {
746 guard.stdout_closed
747 }
748 }
749 StreamKind::Stderr => guard.stderr_closed,
750 };
751 if closed {
752 return ReadStatus::Eof;
753 }
754
755 match deadline {
756 Some(deadline) => {
757 let now = Instant::now();
758 if now >= deadline {
759 return ReadStatus::Timeout;
760 }
761 let wait = deadline.saturating_duration_since(now);
762 let result = self
763 .shared
764 .condvar
765 .wait_timeout(guard, wait)
766 .expect("queue mutex poisoned");
767 guard = result.0;
768 if result.1.timed_out() {
769 return ReadStatus::Timeout;
770 }
771 }
772 None => {
773 guard = self
774 .shared
775 .condvar
776 .wait(guard)
777 .expect("queue mutex poisoned");
778 }
779 }
780 }
781 }
782
/// Pops the next event from the combined stdout/stderr queue.
///
/// Delegates to `public_symbols::rp_native_process_read_combined_public`.
/// NOTE(review): presumably that wrapper forwards to `read_combined_impl`,
/// which blocks until an event, EOF on both streams, or `timeout` — confirm.
// Kept out of line. NOTE(review): presumably so the public-symbol frame is
// visible in stack traces — confirm.
#[inline(never)]
pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
    public_symbols::rp_native_process_read_combined_public(self, timeout)
}
789
790 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
791 crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
792 let deadline = timeout.map(|limit| Instant::now() + limit);
793 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
794
795 loop {
796 if let Some(event) = guard.combined_queue.pop_front() {
797 return ReadStatus::Line(event);
798 }
799 if guard.stdout_closed && guard.stderr_closed {
800 return ReadStatus::Eof;
801 }
802
803 match deadline {
804 Some(deadline) => {
805 let now = Instant::now();
806 if now >= deadline {
807 return ReadStatus::Timeout;
808 }
809 let wait = deadline.saturating_duration_since(now);
810 let result = self
811 .shared
812 .condvar
813 .wait_timeout(guard, wait)
814 .expect("queue mutex poisoned");
815 guard = result.0;
816 if result.1.timed_out() {
817 return ReadStatus::Timeout;
818 }
819 }
820 None => {
821 guard = self
822 .shared
823 .condvar
824 .wait(guard)
825 .expect("queue mutex poisoned");
826 }
827 }
828 }
829 }
830
831 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
833 self.shared
834 .queues
835 .lock()
836 .expect("queue mutex poisoned")
837 .stdout_history
838 .clone()
839 .into_iter()
840 .collect()
841 }
842
843 fn captured_stdout_raw(&self) -> Vec<u8> {
844 self.shared
845 .queues
846 .lock()
847 .expect("queue mutex poisoned")
848 .stdout_raw
849 .clone()
850 }
851
852 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
854 if self.config.stderr_mode == StderrMode::Stdout {
855 return Vec::new();
856 }
857 self.shared
858 .queues
859 .lock()
860 .expect("queue mutex poisoned")
861 .stderr_history
862 .clone()
863 .into_iter()
864 .collect()
865 }
866
867 fn captured_stderr_raw(&self) -> Vec<u8> {
868 if self.config.stderr_mode == StderrMode::Stdout {
869 return Vec::new();
870 }
871 self.shared
872 .queues
873 .lock()
874 .expect("queue mutex poisoned")
875 .stderr_raw
876 .clone()
877 }
878
879 pub fn captured_combined(&self) -> Vec<StreamEvent> {
881 self.shared
882 .queues
883 .lock()
884 .expect("queue mutex poisoned")
885 .combined_history
886 .clone()
887 .into_iter()
888 .collect()
889 }
890
891 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
893 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
894 return 0;
895 }
896 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
897 match stream {
898 StreamKind::Stdout => guard.stdout_history_bytes,
899 StreamKind::Stderr => guard.stderr_history_bytes,
900 }
901 }
902
903 pub fn captured_combined_bytes(&self) -> usize {
905 self.shared
906 .queues
907 .lock()
908 .expect("queue mutex poisoned")
909 .combined_history_bytes
910 }
911
912 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
914 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
915 return 0;
916 }
917 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
918 match stream {
919 StreamKind::Stdout => {
920 let released = guard.stdout_history_bytes;
921 guard.stdout_history.clear();
922 guard.stdout_raw.clear();
923 guard.stdout_history_bytes = 0;
924 released
925 }
926 StreamKind::Stderr => {
927 let released = guard.stderr_history_bytes;
928 guard.stderr_history.clear();
929 guard.stderr_raw.clear();
930 guard.stderr_history_bytes = 0;
931 released
932 }
933 }
934 }
935
936 pub fn clear_captured_combined(&self) -> usize {
938 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
939 let released = guard.combined_history_bytes;
940 guard.combined_history.clear();
941 guard.combined_history_bytes = 0;
942 released
943 }
944
945 fn build_command(&self) -> Command {
946 let mut command = match &self.config.command {
947 CommandSpec::Shell(command) => shell_command(command),
948 CommandSpec::Argv(argv) => {
949 let mut command = Command::new(&argv[0]);
950 if argv.len() > 1 {
951 command.args(&argv[1..]);
952 }
953 command
954 }
955 };
956 if let Some(cwd) = &self.config.cwd {
957 command.current_dir(cwd);
958 }
959 if let Some(env) = &self.config.env {
960 command.env_clear();
961 command.envs(env.iter().map(|(k, v)| (k, v)));
962 }
963 #[cfg(windows)]
964 {
965 use std::os::windows::process::CommandExt;
966
967 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
972 let extra = if self.config.create_process_group {
973 CREATE_NEW_PROCESS_GROUP
974 } else {
975 0
976 };
977 let flags = self.config.creationflags.unwrap_or(0)
978 | extra
979 | windows_priority_flags(self.config.nice);
980 if flags != 0 {
981 command.creation_flags(flags);
982 }
983 }
984 #[cfg(unix)]
985 {
986 let create_process_group = self.config.create_process_group;
987 let nice = self.config.nice;
988
989 if create_process_group || nice.is_some() {
990 use std::os::unix::process::CommandExt;
991
992 unsafe {
993 command.pre_exec(move || {
994 if create_process_group && libc::setpgid(0, 0) == -1 {
995 return Err(std::io::Error::last_os_error());
996 }
997 if let Some(nice) = nice {
998 let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
999 if result == -1 {
1000 return Err(std::io::Error::last_os_error());
1001 }
1002 }
1003 Ok(())
1004 });
1005 }
1006 }
1007 }
1008 command
1009 }
1010
1011 fn spawn_reader<R>(
1012 &self,
1013 pipe: R,
1014 source_stream: StreamKind,
1015 visible_stream: StreamKind,
1016 on_pipe_done: Box<dyn FnOnce() + Send>,
1017 ) where
1018 R: Read + Send + 'static,
1019 {
1020 let shared = Arc::clone(&self.shared);
1021 thread::spawn(move || {
1022 let mut reader = pipe;
1023 let mut chunk = vec![0_u8; 65536];
1024 let mut pending = Vec::new();
1025
1026 loop {
1027 match reader.read(&mut chunk) {
1028 Ok(0) => break,
1029 Ok(n) => {
1030 append_raw(&shared, visible_stream, &chunk[..n]);
1031 let lines = feed_chunk(&mut pending, &chunk[..n]);
1032 emit_lines(&shared, visible_stream, lines);
1033 }
1034 Err(_) => break,
1035 }
1036 }
1037
1038 if !pending.is_empty() {
1039 emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
1040 }
1041
1042 on_pipe_done();
1047 drop(reader);
1048
1049 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1050 match source_stream {
1051 StreamKind::Stdout => guard.stdout_closed = true,
1052 StreamKind::Stderr => guard.stderr_closed = true,
1053 }
1054 shared.condvar.notify_all();
1055 });
1056 }
1057
/// Builds the callback a reader thread runs when it is done with `stream`:
/// it forgets the raw pipe handle recorded at start, so `cancel_capture_io`
/// no longer targets it.
#[cfg(windows)]
fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
    let recorded = Arc::clone(&self.capture_pipe_handles);
    Box::new(move || {
        let mut handles = recorded
            .lock()
            .expect("capture pipe handles mutex poisoned");
        let slot = match stream {
            StreamKind::Stdout => &mut handles.stdout,
            StreamKind::Stderr => &mut handles.stderr,
        };
        *slot = None;
    })
}
1069
1070 #[cfg(not(windows))]
1071 fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
1072 Box::new(|| {})
1073 }
1074
/// Cancels pending I/O on the capture pipes whose handles are still
/// recorded (stdout first, then stderr), so blocked reader threads return.
///
/// The result of `CancelIoEx` is ignored: this is best effort.
#[cfg(windows)]
fn cancel_capture_io(&self) {
    crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
    use winapi::shared::ntdef::HANDLE;
    use winapi::um::ioapiset::CancelIoEx;

    let recorded = self
        .capture_pipe_handles
        .lock()
        .expect("capture pipe handles mutex poisoned");
    for raw in [recorded.stdout, recorded.stderr].iter().flatten() {
        // SAFETY: a handle is only recorded while its reader still owns the
        // pipe (the reader clears the entry before dropping it), and the
        // lock is held for the duration of the call.
        // NOTE(review): confirm the clear-before-drop ordering in the reader.
        unsafe {
            CancelIoEx(*raw as HANDLE, std::ptr::null_mut());
        }
    }
}
1106
1107 fn set_returncode(&self, code: i32) {
1108 self.shared.returncode.store(code as i64, Ordering::Release);
1109 self.shared.condvar.notify_all();
1110 }
1111
1112 fn wait_for_capture_completion_impl(&self) {
1113 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
1114 if !self.config.capture {
1115 return;
1116 }
1117
1118 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1119 while !(guard.stdout_closed && guard.stderr_closed) {
1120 guard = self
1121 .shared
1122 .condvar
1123 .wait(guard)
1124 .expect("queue mutex poisoned");
1125 }
1126 }
1127
1128 fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
1136 crate::rp_rust_debug_scope!(
1137 "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
1138 );
1139 if !self.config.capture {
1140 return true;
1141 }
1142
1143 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1144 while !(guard.stdout_closed && guard.stderr_closed) {
1145 let now = Instant::now();
1146 if now >= deadline {
1147 guard.stdout_closed = true;
1148 guard.stderr_closed = true;
1149 self.shared.condvar.notify_all();
1150 return false;
1151 }
1152 let (next_guard, result) = self
1153 .shared
1154 .condvar
1155 .wait_timeout(guard, deadline - now)
1156 .expect("queue mutex poisoned");
1157 guard = next_guard;
1158 if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
1159 guard.stdout_closed = true;
1160 guard.stderr_closed = true;
1161 self.shared.condvar.notify_all();
1162 return false;
1163 }
1164 }
1165 true
1166 }
1167}
1168
1169fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
1170 if lines.is_empty() {
1171 return;
1172 }
1173 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1174 for line in lines {
1175 let line_len = line.len();
1176 match stream {
1177 StreamKind::Stdout => {
1178 guard.stdout_history_bytes += line_len;
1179 guard.stdout_history.push_back(line.clone());
1180 guard.stdout_queue.push_back(line.clone());
1181 }
1182 StreamKind::Stderr => {
1183 guard.stderr_history_bytes += line_len;
1184 guard.stderr_history.push_back(line.clone());
1185 guard.stderr_queue.push_back(line.clone());
1186 }
1187 }
1188 let event = StreamEvent { stream, line };
1189 guard.combined_history_bytes += line_len;
1190 guard.combined_history.push_back(event.clone());
1191 guard.combined_queue.push_back(event);
1192 }
1193 shared.condvar.notify_all();
1194}
1195
1196fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1197 if chunk.is_empty() {
1198 return;
1199 }
1200 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1201 match stream {
1202 StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1203 StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1204 }
1205}
1206
1207pub fn run_command(
1213 mut config: ProcessConfig,
1214 timeout: Option<Duration>,
1215) -> Result<RunOutput, ProcessError> {
1216 config.capture = true;
1217 let process = NativeProcess::new(config);
1218 process.start()?;
1219
1220 let exit_code = match process.wait(timeout) {
1221 Ok(code) => code,
1222 Err(ProcessError::Timeout) => {
1223 match process.kill() {
1224 Ok(()) | Err(ProcessError::NotRunning) => {}
1225 Err(error) => return Err(error),
1226 }
1227 return Err(ProcessError::Timeout);
1228 }
1229 Err(error) => return Err(error),
1230 };
1231
1232 Ok(RunOutput {
1233 stdout: process.captured_stdout_raw(),
1234 stderr: process.captured_stderr_raw(),
1235 exit_code,
1236 })
1237}
1238
/// Builds the `Command` that runs `command` through the platform shell.
///
/// * Windows: `cmd /D /S /C "<command>"`, with the command text passed
///   through unescaped via `raw_arg`.
/// * Elsewhere: `sh -lc <command>`.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    }
    #[cfg(not(windows))]
    {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    }
}
1257
1258#[cfg(test)]
1259mod tests;