1use std::collections::VecDeque;
10use std::io::Read;
11use std::process::{Child, Command, Stdio};
12use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, Instant};
16
17use crate::observer::ObserverEmitter;
18
19pub mod console_detect;
20pub mod containment;
21mod helpers;
22pub mod observer;
27#[cfg(feature = "originator-scan")]
28pub mod originator;
#[cfg(feature = "client")]
/// Wire-format types for talking to the daemon (only built with the
/// `client` feature).
///
/// The contents are not written by hand: they are pulled in with `include!`
/// from a file in the build's `OUT_DIR`, so a build script is expected to
/// generate `running_process.daemon.v1.rs` before this crate compiles.
pub mod proto {
    // Generated items carry no doc comments, hence the lint opt-out.
    #[allow(missing_docs)]
    /// Generated definitions for the `running_process.daemon.v1` package
    /// (presumably protobuf output — confirm against the build script).
    pub mod daemon {
        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
    }
}
42
43#[cfg(feature = "client")]
44pub mod client;
45
46#[cfg(feature = "client")]
51pub mod broker;
52
53#[cfg(feature = "client")]
59pub mod maintenance;
60
61#[cfg(feature = "client")]
62pub mod cleanup;
63
64#[cfg(feature = "client")]
68pub mod boot_autostart;
69
70#[cfg(feature = "client")]
75pub mod runpm_config;
76
77#[cfg(feature = "test-support")]
82pub mod test_support;
83
84#[cfg(feature = "telemetry")]
87#[path = "daemon/telemetry.rs"]
88pub mod telemetry;
89
90#[cfg(feature = "daemon")]
93pub mod daemon;
95pub mod pty;
97mod public_symbols;
98mod rust_debug;
99pub mod spawn;
100pub mod systemd_killmode;
101pub mod terminal_graphics;
102mod types;
103#[cfg(unix)]
104mod unix;
105#[cfg(windows)]
106mod windows;
107
108pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
109pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
110pub use observer::{
111 CapabilitySupport, CategoryCapability, EventCategory, ObserverCapabilities, ObserverConfig,
112 ObserverEvent, ObserverEventKind, ObserverSubscriber,
113};
114#[cfg(feature = "originator-scan")]
115pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
116pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
117pub use spawn::{
118 spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
119 StdioSource,
120};
121pub use terminal_graphics::{
122 current_terminal_capabilities, current_terminal_capabilities_with_timeout,
123 detect_terminal_capabilities, CapabilityStatus, EvidenceStrength, GraphicsCapability,
124 GraphicsProtocol, TerminalCapabilities, TerminalCapabilityInput, TerminalGraphicsCapabilities,
125 TerminalProbeEvidence,
126};
127pub use types::{
128 CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
129 StreamEvent, StreamKind,
130};
131
132pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
133#[cfg(unix)]
134pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
135#[cfg(windows)]
136pub(crate) use windows::{
137 assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
138 WindowsJobHandle,
139};
140
/// Opens a Rust debug scope for the remainder of the enclosing block.
///
/// Expands to a `let` that binds the guard returned by
/// [`RustDebugScopeGuard::enter`] (called with the label plus the call site's
/// `file!()` / `line!()`), so the guard is dropped when the enclosing block
/// ends. Must be used in statement position.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        // Bound to a named local (not `_`) so the guard is not dropped immediately.
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
149
/// Captured-output bookkeeping shared between the reader threads and the
/// consumer-facing `NativeProcess` accessors. Always accessed under
/// `SharedState::queues`.
#[derive(Default)]
struct QueueState {
    /// Stdout-visible lines not yet consumed by `read_stream` / `drain_stream`.
    stdout_queue: VecDeque<Vec<u8>>,
    /// Stderr-visible lines not yet consumed by `read_stream` / `drain_stream`.
    stderr_queue: VecDeque<Vec<u8>>,
    /// Lines from both streams, in arrival order, not yet consumed by
    /// `read_combined` / `drain_combined`.
    combined_queue: VecDeque<StreamEvent>,
    /// Every stdout-visible line seen so far (until `clear_captured_stream`).
    stdout_history: VecDeque<Vec<u8>>,
    /// Every stderr-visible line seen so far (until `clear_captured_stream`).
    stderr_history: VecDeque<Vec<u8>>,
    /// Every combined event seen so far (until `clear_captured_combined`).
    combined_history: VecDeque<StreamEvent>,
    /// Unsplit stdout-visible bytes exactly as read from the pipe.
    stdout_raw: Vec<u8>,
    /// Unsplit stderr-visible bytes exactly as read from the pipe.
    stderr_raw: Vec<u8>,
    /// Sum of the line lengths currently held in `stdout_history`.
    stdout_history_bytes: usize,
    /// Sum of the line lengths currently held in `stderr_history`.
    stderr_history_bytes: usize,
    /// Sum of the line lengths currently held in `combined_history`.
    combined_history_bytes: usize,
    /// True once the stdout reader has finished (or capture is disabled, or a
    /// drain deadline forced it closed).
    stdout_closed: bool,
    /// True once the stderr reader has finished (or capture is disabled, or a
    /// drain deadline forced it closed).
    stderr_closed: bool,
}
166
/// Sentinel held in `SharedState::returncode` until an exit code is recorded.
/// Exit codes are stored as widened `i32` values, so `i64::MIN` can never
/// collide with a real one.
const RETURNCODE_NOT_SET: i64 = i64::MIN;
169
/// State shared (via `Arc`) between a `NativeProcess`, its reader threads and
/// its exit-waiter thread.
struct SharedState {
    /// Captured output and stream-closed flags.
    queues: Mutex<QueueState>,
    /// Paired with `queues`; notified when lines arrive, a stream closes, or
    /// the return code is recorded.
    condvar: Condvar,
    /// Exit code widened to `i64`, or `RETURNCODE_NOT_SET` while unknown.
    returncode: AtomicI64,
    /// Optional sink for lifecycle events (started / exited).
    observer: Option<ObserverEmitter>,
    /// Latches to `true` the first time an exit event is emitted so that
    /// competing exit paths report it only once.
    observer_exit_emitted: AtomicBool,
}
185
/// The spawned child plus any platform resources that must live as long as it.
struct ChildState {
    /// Handle to the spawned process.
    child: Child,
    /// Windows job object the child was assigned to at start; never read,
    /// only kept alive here (presumably a kill-on-close job, per the name of
    /// the helper that creates it — confirm).
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
191
192impl SharedState {
193 fn new(capture: bool) -> Self {
194 Self::with_observer(capture, None)
195 }
196
197 fn with_observer(capture: bool, observer: Option<ObserverEmitter>) -> Self {
198 let queues = QueueState {
199 stdout_closed: !capture,
200 stderr_closed: !capture,
201 ..QueueState::default()
202 };
203 Self {
204 queues: Mutex::new(queues),
205 condvar: Condvar::new(),
206 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
207 observer,
208 observer_exit_emitted: AtomicBool::new(false),
209 }
210 }
211
212 fn emit_exited(&self, pid: u32, exit_code: i32) {
215 let Some(emitter) = self.observer.as_ref() else {
216 return;
217 };
218 if self
219 .observer_exit_emitted
220 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
221 .is_ok()
222 {
223 emitter.emit_exited(pid, exit_code);
224 }
225 }
226}
227
/// A child process managed through plain OS pipes, with optional background
/// capture of stdout/stderr.
pub struct NativeProcess {
    /// Launch configuration (command, cwd, env, stdio modes, ...).
    config: ProcessConfig,
    /// The running child; `None` until `start` succeeds.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Capture queues, return code and observer shared with worker threads.
    shared: Arc<SharedState>,
    /// Raw handles of the capture pipes while their reader threads are alive,
    /// used by `cancel_capture_io`.
    #[cfg(windows)]
    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
}
241
242impl NativeProcess {
    /// Creates a process handle for `config` without an observer.
    ///
    /// Nothing is spawned until [`NativeProcess::start`] is called.
    pub fn new(config: ProcessConfig) -> Self {
        Self::new_with_observer(config, None)
    }
251
252 pub fn with_observer(
265 config: ProcessConfig,
266 observer: crate::observer::ObserverConfig,
267 ) -> (Self, ObserverSubscriber) {
268 let (emitter, subscriber) = ObserverEmitter::new(observer);
269 let process = Self::new_with_observer(config, Some(emitter));
270 (process, subscriber)
271 }
272
273 fn new_with_observer(config: ProcessConfig, observer: Option<ObserverEmitter>) -> Self {
274 let shared = match observer {
275 None => SharedState::new(config.capture),
277 Some(emitter) => SharedState::with_observer(config.capture, Some(emitter)),
278 };
279 Self {
280 shared: Arc::new(shared),
281 child: Arc::new(Mutex::new(None)),
282 config,
283 #[cfg(windows)]
284 capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
285 }
286 }
287
    /// Spawns the configured child process.
    ///
    /// Thin public entry point that delegates to
    /// `public_symbols::rp_native_process_start_public` (presumably a
    /// trampoline into `start_impl` — confirm in `public_symbols`).
    // Kept out of line so this entry point remains a distinct symbol.
    #[inline(never)]
    pub fn start(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_start_public(self)
    }
297
298 fn start_impl(&self) -> Result<(), ProcessError> {
299 crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
300 let mut guard = self.child.lock().expect("child mutex poisoned");
301 if guard.is_some() {
302 return Err(ProcessError::AlreadyStarted);
303 }
304
305 let mut command = self.build_command();
306 match self.config.stdin_mode {
307 StdinMode::Inherit => {}
308 StdinMode::Piped => {
309 command.stdin(Stdio::piped());
310 }
311 StdinMode::Null => {
312 command.stdin(Stdio::null());
313 }
314 }
315 if self.config.capture {
316 command.stdout(Stdio::piped());
317 command.stderr(Stdio::piped());
318 }
319
320 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
321 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
322 if let Some(emitter) = self.shared.observer.as_ref() {
325 emitter.emit_started(child.id());
326 }
327 #[cfg(windows)]
328 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
329 .map_err(ProcessError::Spawn)?;
330 if self.config.capture {
331 let stdout = child.stdout.take().expect("stdout pipe missing");
332 let stderr = child.stderr.take().expect("stderr pipe missing");
333 #[cfg(windows)]
334 {
335 use std::os::windows::io::AsRawHandle;
336 let mut handles = self
337 .capture_pipe_handles
338 .lock()
339 .expect("capture pipe handles mutex poisoned");
340 handles.stdout = Some(stdout.as_raw_handle() as usize);
341 handles.stderr = Some(stderr.as_raw_handle() as usize);
342 }
343 self.spawn_reader(
344 stdout,
345 StreamKind::Stdout,
346 StreamKind::Stdout,
347 self.pipe_done_callback(StreamKind::Stdout),
348 );
349 self.spawn_reader(
350 stderr,
351 StreamKind::Stderr,
352 match self.config.stderr_mode {
353 StderrMode::Stdout => StreamKind::Stdout,
354 StderrMode::Pipe => StreamKind::Stderr,
355 },
356 self.pipe_done_callback(StreamKind::Stderr),
357 );
358 }
359 *guard = Some(ChildState {
360 child,
361 #[cfg(windows)]
362 _job: job,
363 });
364 drop(guard);
365 self.spawn_exit_waiter();
366 Ok(())
367 }
368
369 fn spawn_exit_waiter(&self) {
372 let child = Arc::clone(&self.child);
373 let shared = Arc::clone(&self.shared);
374 thread::spawn(move || {
375 loop {
376 if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
377 return;
378 }
379 {
380 let mut guard = child.lock().expect("child mutex poisoned");
381 if let Some(child_state) = guard.as_mut() {
382 let pid = child_state.child.id();
383 match child_state.child.try_wait() {
384 Ok(Some(status)) => {
385 let code = exit_code(status);
386 shared.returncode.store(code as i64, Ordering::Release);
387 shared.emit_exited(pid, code);
391 shared.condvar.notify_all();
392 return;
393 }
394 Ok(None) => {}
395 Err(_) => return,
396 }
397 } else {
398 return;
399 }
400 }
401 thread::sleep(Duration::from_millis(10));
407 }
408 });
409 }
410
411 pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
413 let mut guard = self.child.lock().expect("child mutex poisoned");
414 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
415 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
416 use std::io::Write;
417 stdin.write_all(data).map_err(ProcessError::Io)?;
418 stdin.flush().map_err(ProcessError::Io)?;
419 drop(child.stdin.take());
420 Ok(())
421 }
422
423 pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
428 let mut guard = self.child.lock().expect("child mutex poisoned");
429 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
430 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
431 use std::io::Write;
432 stdin.write_all(data).map_err(ProcessError::Io)?;
433 stdin.flush().map_err(ProcessError::Io)?;
434 Ok(())
435 }
436
437 pub fn close_stdin(&self) -> Result<(), ProcessError> {
440 let mut guard = self.child.lock().expect("child mutex poisoned");
441 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
442 drop(child.stdin.take());
443 Ok(())
444 }
445
446 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
450 if let Some(code) = self.returncode() {
452 return Ok(Some(code));
453 }
454 let mut guard = self.child.lock().expect("child mutex poisoned");
455 let Some(child_state) = guard.as_mut() else {
456 return Ok(self.returncode());
457 };
458 let pid = child_state.child.id();
459 let child = &mut child_state.child;
460 let status = child.try_wait().map_err(ProcessError::Io)?;
461 if let Some(status) = status {
462 let code = exit_code(status);
463 self.set_returncode(code);
464 self.shared.emit_exited(pid, code);
465 return Ok(Some(code));
466 }
467 Ok(None)
468 }
469
    /// Waits for the child to exit, optionally bounded by `timeout`.
    ///
    /// Thin public entry point that delegates to
    /// `public_symbols::rp_native_process_wait_public` (presumably a
    /// trampoline into `wait_impl` — confirm in `public_symbols`).
    // Kept out of line so this entry point remains a distinct symbol.
    #[inline(never)]
    pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
        public_symbols::rp_native_process_wait_public(self, timeout)
    }
479
480 fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
481 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
482 if self.child.lock().expect("child mutex poisoned").is_none() {
483 return self.returncode().ok_or(ProcessError::NotRunning);
484 }
485 if let Some(code) = self.returncode() {
487 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
488 return Ok(code);
489 }
490 let start = Instant::now();
491 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
492 loop {
493 let rc = self.shared.returncode.load(Ordering::Acquire);
495 if rc != RETURNCODE_NOT_SET {
496 drop(guard);
497 let code = rc as i32;
498 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
499 return Ok(code);
500 }
501 if let Some(limit) = timeout {
502 let elapsed = start.elapsed();
503 if elapsed >= limit {
504 return Err(ProcessError::Timeout);
505 }
506 let remaining = limit - elapsed;
507 let wait_time = remaining.min(Duration::from_millis(50));
509 guard = self
510 .shared
511 .condvar
512 .wait_timeout(guard, wait_time)
513 .expect("queue mutex poisoned")
514 .0;
515 } else {
516 guard = self
518 .shared
519 .condvar
520 .wait_timeout(guard, Duration::from_millis(50))
521 .expect("queue mutex poisoned")
522 .0;
523 }
524 }
525 }
526
    /// Forcibly terminates the child.
    ///
    /// Thin public entry point that delegates to
    /// `public_symbols::rp_native_process_kill_public` (presumably a
    /// trampoline into `kill_impl` — confirm in `public_symbols`).
    // Kept out of line so this entry point remains a distinct symbol.
    #[inline(never)]
    pub fn kill(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_kill_public(self)
    }
533
534 fn kill_impl(&self) -> Result<(), ProcessError> {
535 crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
536 {
537 let mut guard = self.child.lock().expect("child mutex poisoned");
538 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
539 let pid = child.id();
540 child.kill().map_err(ProcessError::Io)?;
541 let status = child.wait().map_err(ProcessError::Io)?;
542 let code = exit_code(status);
543 self.set_returncode(code);
544 self.shared.emit_exited(pid, code);
547 }
548 #[cfg(windows)]
556 self.cancel_capture_io();
557 public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
570 self,
571 kill_drain_deadline(),
572 );
573 Ok(())
574 }
575
    /// Terminates the child. Currently identical to [`NativeProcess::kill`]:
    /// no softer signal is attempted first.
    pub fn terminate(&self) -> Result<(), ProcessError> {
        self.kill()
    }
582
583 pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
598 #[cfg(unix)]
599 {
600 if !self.config.create_process_group {
601 return Ok(());
602 }
603 let pid = match self.pid() {
604 Some(p) => p as i32,
605 None => return Err(ProcessError::NotRunning),
606 };
607 let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
608 if result != 0 {
609 let err = std::io::Error::last_os_error();
610 if err.raw_os_error() != Some(libc::ESRCH) {
611 return Err(ProcessError::Io(err));
612 }
613 }
614 Ok(())
615 }
616 #[cfg(windows)]
617 {
618 if !self.config.create_process_group {
619 return Ok(());
624 }
625 let pid = match self.pid() {
626 Some(p) => p,
627 None => return Err(ProcessError::NotRunning),
628 };
629 let ok = unsafe {
633 winapi::um::wincon::GenerateConsoleCtrlEvent(
634 winapi::um::wincon::CTRL_BREAK_EVENT,
635 pid,
636 )
637 };
638 if ok == 0 {
639 let err = std::io::Error::last_os_error();
640 if err.raw_os_error() != Some(6) {
646 return Err(ProcessError::Io(err));
647 }
648 }
649 Ok(())
650 }
651 }
652
    /// Releases the process: kills it if still running and lets capture finish.
    ///
    /// Thin public entry point that delegates to
    /// `public_symbols::rp_native_process_close_public` (presumably a
    /// trampoline into `close_impl` — confirm in `public_symbols`).
    // Kept out of line so this entry point remains a distinct symbol.
    #[inline(never)]
    pub fn close(&self) -> Result<(), ProcessError> {
        public_symbols::rp_native_process_close_public(self)
    }
659
660 fn close_impl(&self) -> Result<(), ProcessError> {
661 crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
662 if self.child.lock().expect("child mutex poisoned").is_none() {
663 return Ok(());
664 }
665 if self.poll()?.is_none() {
666 self.kill()?;
667 } else {
668 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
669 }
670 Ok(())
671 }
672
673 pub fn pid(&self) -> Option<u32> {
675 self.child
676 .lock()
677 .expect("child mutex poisoned")
678 .as_ref()
679 .map(|state| state.child.id())
680 }
681
682 pub fn returncode(&self) -> Option<i32> {
684 let v = self.shared.returncode.load(Ordering::Acquire);
685 if v == RETURNCODE_NOT_SET {
686 None
687 } else {
688 Some(v as i32)
689 }
690 }
691
692 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
694 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
695 return false;
696 }
697 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
698 match stream {
699 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
700 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
701 }
702 }
703
704 pub fn has_pending_combined(&self) -> bool {
706 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
707 !guard.combined_queue.is_empty()
708 }
709
710 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
712 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
713 return Vec::new();
714 }
715 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
716 let queue = match stream {
717 StreamKind::Stdout => &mut guard.stdout_queue,
718 StreamKind::Stderr => &mut guard.stderr_queue,
719 };
720 queue.drain(..).collect()
721 }
722
723 pub fn drain_combined(&self) -> Vec<StreamEvent> {
725 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
726 guard.combined_queue.drain(..).collect()
727 }
728
729 pub fn read_stream(
734 &self,
735 stream: StreamKind,
736 timeout: Option<Duration>,
737 ) -> ReadStatus<Vec<u8>> {
738 let deadline = timeout.map(|limit| Instant::now() + limit);
739 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
740
741 loop {
742 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
743 return ReadStatus::Eof;
744 }
745
746 let queue = match stream {
747 StreamKind::Stdout => &mut guard.stdout_queue,
748 StreamKind::Stderr => &mut guard.stderr_queue,
749 };
750 if let Some(line) = queue.pop_front() {
751 return ReadStatus::Line(line);
752 }
753
754 let closed = match stream {
755 StreamKind::Stdout => {
756 if self.config.stderr_mode == StderrMode::Stdout {
757 guard.stdout_closed && guard.stderr_closed
758 } else {
759 guard.stdout_closed
760 }
761 }
762 StreamKind::Stderr => guard.stderr_closed,
763 };
764 if closed {
765 return ReadStatus::Eof;
766 }
767
768 match deadline {
769 Some(deadline) => {
770 let now = Instant::now();
771 if now >= deadline {
772 return ReadStatus::Timeout;
773 }
774 let wait = deadline.saturating_duration_since(now);
775 let result = self
776 .shared
777 .condvar
778 .wait_timeout(guard, wait)
779 .expect("queue mutex poisoned");
780 guard = result.0;
781 if result.1.timed_out() {
782 return ReadStatus::Timeout;
783 }
784 }
785 None => {
786 guard = self
787 .shared
788 .condvar
789 .wait(guard)
790 .expect("queue mutex poisoned");
791 }
792 }
793 }
794 }
795
    /// Reads the next event from the combined stdout/stderr queue.
    ///
    /// Thin public entry point that delegates to
    /// `public_symbols::rp_native_process_read_combined_public` (presumably a
    /// trampoline into `read_combined_impl` — confirm in `public_symbols`).
    // Kept out of line so this entry point remains a distinct symbol.
    #[inline(never)]
    pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
        public_symbols::rp_native_process_read_combined_public(self, timeout)
    }
802
803 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
804 crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
805 let deadline = timeout.map(|limit| Instant::now() + limit);
806 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
807
808 loop {
809 if let Some(event) = guard.combined_queue.pop_front() {
810 return ReadStatus::Line(event);
811 }
812 if guard.stdout_closed && guard.stderr_closed {
813 return ReadStatus::Eof;
814 }
815
816 match deadline {
817 Some(deadline) => {
818 let now = Instant::now();
819 if now >= deadline {
820 return ReadStatus::Timeout;
821 }
822 let wait = deadline.saturating_duration_since(now);
823 let result = self
824 .shared
825 .condvar
826 .wait_timeout(guard, wait)
827 .expect("queue mutex poisoned");
828 guard = result.0;
829 if result.1.timed_out() {
830 return ReadStatus::Timeout;
831 }
832 }
833 None => {
834 guard = self
835 .shared
836 .condvar
837 .wait(guard)
838 .expect("queue mutex poisoned");
839 }
840 }
841 }
842 }
843
844 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
846 self.shared
847 .queues
848 .lock()
849 .expect("queue mutex poisoned")
850 .stdout_history
851 .clone()
852 .into_iter()
853 .collect()
854 }
855
856 fn captured_stdout_raw(&self) -> Vec<u8> {
857 self.shared
858 .queues
859 .lock()
860 .expect("queue mutex poisoned")
861 .stdout_raw
862 .clone()
863 }
864
865 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
867 if self.config.stderr_mode == StderrMode::Stdout {
868 return Vec::new();
869 }
870 self.shared
871 .queues
872 .lock()
873 .expect("queue mutex poisoned")
874 .stderr_history
875 .clone()
876 .into_iter()
877 .collect()
878 }
879
880 fn captured_stderr_raw(&self) -> Vec<u8> {
881 if self.config.stderr_mode == StderrMode::Stdout {
882 return Vec::new();
883 }
884 self.shared
885 .queues
886 .lock()
887 .expect("queue mutex poisoned")
888 .stderr_raw
889 .clone()
890 }
891
892 pub fn captured_combined(&self) -> Vec<StreamEvent> {
894 self.shared
895 .queues
896 .lock()
897 .expect("queue mutex poisoned")
898 .combined_history
899 .clone()
900 .into_iter()
901 .collect()
902 }
903
904 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
906 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
907 return 0;
908 }
909 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
910 match stream {
911 StreamKind::Stdout => guard.stdout_history_bytes,
912 StreamKind::Stderr => guard.stderr_history_bytes,
913 }
914 }
915
916 pub fn captured_combined_bytes(&self) -> usize {
918 self.shared
919 .queues
920 .lock()
921 .expect("queue mutex poisoned")
922 .combined_history_bytes
923 }
924
925 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
927 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
928 return 0;
929 }
930 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
931 match stream {
932 StreamKind::Stdout => {
933 let released = guard.stdout_history_bytes;
934 guard.stdout_history.clear();
935 guard.stdout_raw.clear();
936 guard.stdout_history_bytes = 0;
937 released
938 }
939 StreamKind::Stderr => {
940 let released = guard.stderr_history_bytes;
941 guard.stderr_history.clear();
942 guard.stderr_raw.clear();
943 guard.stderr_history_bytes = 0;
944 released
945 }
946 }
947 }
948
949 pub fn clear_captured_combined(&self) -> usize {
951 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
952 let released = guard.combined_history_bytes;
953 guard.combined_history.clear();
954 guard.combined_history_bytes = 0;
955 released
956 }
957
958 fn build_command(&self) -> Command {
959 let mut command = match &self.config.command {
960 CommandSpec::Shell(command) => shell_command(command),
961 CommandSpec::Argv(argv) => {
962 let mut command = Command::new(&argv[0]);
963 if argv.len() > 1 {
964 command.args(&argv[1..]);
965 }
966 command
967 }
968 };
969 if let Some(cwd) = &self.config.cwd {
970 command.current_dir(cwd);
971 }
972 if let Some(env) = &self.config.env {
973 command.env_clear();
974 command.envs(env.iter().map(|(k, v)| (k, v)));
975 }
976 #[cfg(windows)]
977 {
978 use std::os::windows::process::CommandExt;
979
980 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
985 let extra = if self.config.create_process_group {
986 CREATE_NEW_PROCESS_GROUP
987 } else {
988 0
989 };
990 let flags = self.config.creationflags.unwrap_or(0)
991 | extra
992 | windows_priority_flags(self.config.nice);
993 if flags != 0 {
994 command.creation_flags(flags);
995 }
996 }
997 #[cfg(unix)]
998 {
999 let create_process_group = self.config.create_process_group;
1000 let nice = self.config.nice;
1001
1002 if create_process_group || nice.is_some() {
1003 use std::os::unix::process::CommandExt;
1004
1005 unsafe {
1006 command.pre_exec(move || {
1007 if create_process_group && libc::setpgid(0, 0) == -1 {
1008 return Err(std::io::Error::last_os_error());
1009 }
1010 if let Some(nice) = nice {
1011 let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
1012 if result == -1 {
1013 return Err(std::io::Error::last_os_error());
1014 }
1015 }
1016 Ok(())
1017 });
1018 }
1019 }
1020 }
1021 command
1022 }
1023
1024 fn spawn_reader<R>(
1025 &self,
1026 pipe: R,
1027 source_stream: StreamKind,
1028 visible_stream: StreamKind,
1029 on_pipe_done: Box<dyn FnOnce() + Send>,
1030 ) where
1031 R: Read + Send + 'static,
1032 {
1033 let shared = Arc::clone(&self.shared);
1034 thread::spawn(move || {
1035 let mut reader = pipe;
1036 let mut chunk = vec![0_u8; 65536];
1037 let mut pending = Vec::new();
1038
1039 loop {
1040 match reader.read(&mut chunk) {
1041 Ok(0) => break,
1042 Ok(n) => {
1043 append_raw(&shared, visible_stream, &chunk[..n]);
1044 let lines = feed_chunk(&mut pending, &chunk[..n]);
1045 emit_lines(&shared, visible_stream, lines);
1046 }
1047 Err(_) => break,
1048 }
1049 }
1050
1051 if !pending.is_empty() {
1052 emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
1053 }
1054
1055 on_pipe_done();
1060 drop(reader);
1061
1062 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1063 match source_stream {
1064 StreamKind::Stdout => guard.stdout_closed = true,
1065 StreamKind::Stderr => guard.stderr_closed = true,
1066 }
1067 shared.condvar.notify_all();
1068 });
1069 }
1070
1071 #[cfg(windows)]
1072 fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
1073 let handles = Arc::clone(&self.capture_pipe_handles);
1074 Box::new(move || {
1075 let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
1076 match stream {
1077 StreamKind::Stdout => guard.stdout = None,
1078 StreamKind::Stderr => guard.stderr = None,
1079 }
1080 })
1081 }
1082
    /// Non-Windows: no per-pipe bookkeeping exists, so the reader-finished
    /// callback is a no-op.
    #[cfg(not(windows))]
    fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
        Box::new(|| {})
    }
1087
1088 #[cfg(windows)]
1094 fn cancel_capture_io(&self) {
1095 crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
1096 use winapi::shared::ntdef::HANDLE;
1097 use winapi::um::ioapiset::CancelIoEx;
1098 let guard = self
1099 .capture_pipe_handles
1100 .lock()
1101 .expect("capture pipe handles mutex poisoned");
1102 if let Some(h) = guard.stdout {
1103 unsafe {
1110 CancelIoEx(h as HANDLE, std::ptr::null_mut());
1111 }
1112 }
1113 if let Some(h) = guard.stderr {
1114 unsafe {
1115 CancelIoEx(h as HANDLE, std::ptr::null_mut());
1116 }
1117 }
1118 }
1119
    /// Records the child's exit code and wakes every thread waiting on the
    /// shared condvar.
    fn set_returncode(&self, code: i32) {
        // Publish the code first so woken waiters are guaranteed to see it.
        self.shared.returncode.store(code as i64, Ordering::Release);
        self.shared.condvar.notify_all();
    }
1124
1125 fn wait_for_capture_completion_impl(&self) {
1126 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
1127 if !self.config.capture {
1128 return;
1129 }
1130
1131 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1132 while !(guard.stdout_closed && guard.stderr_closed) {
1133 guard = self
1134 .shared
1135 .condvar
1136 .wait(guard)
1137 .expect("queue mutex poisoned");
1138 }
1139 }
1140
1141 fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
1149 crate::rp_rust_debug_scope!(
1150 "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
1151 );
1152 if !self.config.capture {
1153 return true;
1154 }
1155
1156 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
1157 while !(guard.stdout_closed && guard.stderr_closed) {
1158 let now = Instant::now();
1159 if now >= deadline {
1160 guard.stdout_closed = true;
1161 guard.stderr_closed = true;
1162 self.shared.condvar.notify_all();
1163 return false;
1164 }
1165 let (next_guard, result) = self
1166 .shared
1167 .condvar
1168 .wait_timeout(guard, deadline - now)
1169 .expect("queue mutex poisoned");
1170 guard = next_guard;
1171 if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
1172 guard.stdout_closed = true;
1173 guard.stderr_closed = true;
1174 self.shared.condvar.notify_all();
1175 return false;
1176 }
1177 }
1178 true
1179 }
1180}
1181
1182fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
1183 if lines.is_empty() {
1184 return;
1185 }
1186 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1187 for line in lines {
1188 let line_len = line.len();
1189 match stream {
1190 StreamKind::Stdout => {
1191 guard.stdout_history_bytes += line_len;
1192 guard.stdout_history.push_back(line.clone());
1193 guard.stdout_queue.push_back(line.clone());
1194 }
1195 StreamKind::Stderr => {
1196 guard.stderr_history_bytes += line_len;
1197 guard.stderr_history.push_back(line.clone());
1198 guard.stderr_queue.push_back(line.clone());
1199 }
1200 }
1201 let event = StreamEvent { stream, line };
1202 guard.combined_history_bytes += line_len;
1203 guard.combined_history.push_back(event.clone());
1204 guard.combined_queue.push_back(event);
1205 }
1206 shared.condvar.notify_all();
1207}
1208
1209fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1210 if chunk.is_empty() {
1211 return;
1212 }
1213 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1214 match stream {
1215 StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1216 StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1217 }
1218}
1219
1220pub fn run_command(
1226 mut config: ProcessConfig,
1227 timeout: Option<Duration>,
1228) -> Result<RunOutput, ProcessError> {
1229 config.capture = true;
1230 let process = NativeProcess::new(config);
1231 process.start()?;
1232
1233 let exit_code = match process.wait(timeout) {
1234 Ok(code) => code,
1235 Err(ProcessError::Timeout) => {
1236 match process.kill() {
1237 Ok(()) | Err(ProcessError::NotRunning) => {}
1238 Err(error) => return Err(error),
1239 }
1240 return Err(ProcessError::Timeout);
1241 }
1242 Err(error) => return Err(error),
1243 };
1244
1245 Ok(RunOutput {
1246 stdout: process.captured_stdout_raw(),
1247 stderr: process.captured_stderr_raw(),
1248 exit_code,
1249 })
1250}
1251
/// Builds the platform shell invocation for a command line.
///
/// * Windows: `cmd /D /S /C "<command>"`, assembled from raw (unescaped)
///   argument fragments so the text reaches `cmd` verbatim.
/// * Elsewhere: `sh -lc <command>`.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    }
    #[cfg(not(windows))]
    {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    }
}
1270
1271#[cfg(test)]
1272mod tests;