1use std::collections::VecDeque;
2use std::io::Read;
3use std::process::{Child, Command, Stdio};
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9pub mod console_detect;
10pub mod containment;
11mod helpers;
12#[cfg(feature = "originator-scan")]
13pub mod originator;
14#[cfg(feature = "client")]
19pub mod proto {
20 pub mod daemon {
21 include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
22 }
23}
24
25#[cfg(feature = "client")]
26pub mod client;
27
28#[cfg(feature = "telemetry")]
31#[path = "daemon/telemetry.rs"]
32pub mod telemetry;
33
34#[cfg(feature = "daemon")]
37pub mod daemon;
38pub mod pty;
39mod public_symbols;
40mod rust_debug;
41pub mod spawn;
42pub mod terminal_graphics;
43mod types;
44#[cfg(unix)]
45mod unix;
46#[cfg(windows)]
47mod windows;
48
49pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
50pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
51#[cfg(feature = "originator-scan")]
52pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
53pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
54pub use spawn::{
55 spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
56 StdioSource,
57};
58pub use terminal_graphics::{
59 current_terminal_capabilities, current_terminal_capabilities_with_timeout,
60 detect_terminal_capabilities, CapabilityStatus, EvidenceStrength, GraphicsCapability,
61 GraphicsProtocol, TerminalCapabilities, TerminalCapabilityInput, TerminalGraphicsCapabilities,
62 TerminalProbeEvidence,
63};
64pub use types::{
65 CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
66 StreamEvent, StreamKind,
67};
68
69pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
70#[cfg(unix)]
71pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
72#[cfg(windows)]
73pub(crate) use windows::{
74 assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
75 WindowsJobHandle,
76};
77
/// Opens a Rust debug-trace scope that stays active until the end of the
/// enclosing block.
///
/// Expands to a `let` binding that holds the guard returned by
/// `RustDebugScopeGuard::enter`, called with `$label` and the `file!()` /
/// `line!()` of the invocation site. Because the guard is bound to a local,
/// it is dropped when the surrounding block ends.
///
/// A trailing comma after the label is accepted, matching the convention of
/// the standard library's function-like macros.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr $(,)?) => {
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
85
86#[derive(Default)]
87struct QueueState {
88 stdout_queue: VecDeque<Vec<u8>>,
89 stderr_queue: VecDeque<Vec<u8>>,
90 combined_queue: VecDeque<StreamEvent>,
91 stdout_history: VecDeque<Vec<u8>>,
92 stderr_history: VecDeque<Vec<u8>>,
93 combined_history: VecDeque<StreamEvent>,
94 stdout_raw: Vec<u8>,
95 stderr_raw: Vec<u8>,
96 stdout_history_bytes: usize,
97 stderr_history_bytes: usize,
98 combined_history_bytes: usize,
99 stdout_closed: bool,
100 stderr_closed: bool,
101}
102
103const RETURNCODE_NOT_SET: i64 = i64::MIN;
105
106struct SharedState {
107 queues: Mutex<QueueState>,
108 condvar: Condvar,
109 returncode: AtomicI64,
112}
113
/// Everything that must stay alive for as long as the spawned child is
/// tracked.
struct ChildState {
    /// Handle to the spawned OS process.
    child: Child,
    /// Job object the child was assigned to at start-up. It is never read;
    /// it is only held so the handle lives as long as this struct.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
119
120impl SharedState {
121 fn new(capture: bool) -> Self {
122 let queues = QueueState {
123 stdout_closed: !capture,
124 stderr_closed: !capture,
125 ..QueueState::default()
126 };
127 Self {
128 queues: Mutex::new(queues),
129 condvar: Condvar::new(),
130 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
131 }
132 }
133}
134
135pub struct NativeProcess {
136 config: ProcessConfig,
137 child: Arc<Mutex<Option<ChildState>>>,
138 shared: Arc<SharedState>,
139 #[cfg(windows)]
140 capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
141}
142
143impl NativeProcess {
144 pub fn new(config: ProcessConfig) -> Self {
145 Self {
146 shared: Arc::new(SharedState::new(config.capture)),
147 child: Arc::new(Mutex::new(None)),
148 config,
149 #[cfg(windows)]
150 capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
151 }
152 }
153
154 #[inline(never)]
156 pub fn start(&self) -> Result<(), ProcessError> {
157 public_symbols::rp_native_process_start_public(self)
158 }
159
160 fn start_impl(&self) -> Result<(), ProcessError> {
161 crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
162 let mut guard = self.child.lock().expect("child mutex poisoned");
163 if guard.is_some() {
164 return Err(ProcessError::AlreadyStarted);
165 }
166
167 let mut command = self.build_command();
168 match self.config.stdin_mode {
169 StdinMode::Inherit => {}
170 StdinMode::Piped => {
171 command.stdin(Stdio::piped());
172 }
173 StdinMode::Null => {
174 command.stdin(Stdio::null());
175 }
176 }
177 if self.config.capture {
178 command.stdout(Stdio::piped());
179 command.stderr(Stdio::piped());
180 }
181
182 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
183 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
184 #[cfg(windows)]
185 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
186 .map_err(ProcessError::Spawn)?;
187 if self.config.capture {
188 let stdout = child.stdout.take().expect("stdout pipe missing");
189 let stderr = child.stderr.take().expect("stderr pipe missing");
190 #[cfg(windows)]
191 {
192 use std::os::windows::io::AsRawHandle;
193 let mut handles = self
194 .capture_pipe_handles
195 .lock()
196 .expect("capture pipe handles mutex poisoned");
197 handles.stdout = Some(stdout.as_raw_handle() as usize);
198 handles.stderr = Some(stderr.as_raw_handle() as usize);
199 }
200 self.spawn_reader(
201 stdout,
202 StreamKind::Stdout,
203 StreamKind::Stdout,
204 self.pipe_done_callback(StreamKind::Stdout),
205 );
206 self.spawn_reader(
207 stderr,
208 StreamKind::Stderr,
209 match self.config.stderr_mode {
210 StderrMode::Stdout => StreamKind::Stdout,
211 StderrMode::Pipe => StreamKind::Stderr,
212 },
213 self.pipe_done_callback(StreamKind::Stderr),
214 );
215 }
216 *guard = Some(ChildState {
217 child,
218 #[cfg(windows)]
219 _job: job,
220 });
221 drop(guard);
222 self.spawn_exit_waiter();
223 Ok(())
224 }
225
226 fn spawn_exit_waiter(&self) {
229 let child = Arc::clone(&self.child);
230 let shared = Arc::clone(&self.shared);
231 thread::spawn(move || loop {
232 if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
233 return;
234 }
235 {
236 let mut guard = child.lock().expect("child mutex poisoned");
237 if let Some(child_state) = guard.as_mut() {
238 match child_state.child.try_wait() {
239 Ok(Some(status)) => {
240 let code = exit_code(status);
241 shared.returncode.store(code as i64, Ordering::Release);
242 shared.condvar.notify_all();
243 return;
244 }
245 Ok(None) => {}
246 Err(_) => return,
247 }
248 } else {
249 return;
250 }
251 }
252 thread::sleep(Duration::from_millis(10));
258 });
259 }
260
261 pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
262 let mut guard = self.child.lock().expect("child mutex poisoned");
263 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
264 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
265 use std::io::Write;
266 stdin.write_all(data).map_err(ProcessError::Io)?;
267 stdin.flush().map_err(ProcessError::Io)?;
268 drop(child.stdin.take());
269 Ok(())
270 }
271
272 pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
277 let mut guard = self.child.lock().expect("child mutex poisoned");
278 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
279 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
280 use std::io::Write;
281 stdin.write_all(data).map_err(ProcessError::Io)?;
282 stdin.flush().map_err(ProcessError::Io)?;
283 Ok(())
284 }
285
286 pub fn close_stdin(&self) -> Result<(), ProcessError> {
289 let mut guard = self.child.lock().expect("child mutex poisoned");
290 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
291 drop(child.stdin.take());
292 Ok(())
293 }
294
295 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
296 if let Some(code) = self.returncode() {
298 return Ok(Some(code));
299 }
300 let mut guard = self.child.lock().expect("child mutex poisoned");
301 let Some(child_state) = guard.as_mut() else {
302 return Ok(self.returncode());
303 };
304 let child = &mut child_state.child;
305 let status = child.try_wait().map_err(ProcessError::Io)?;
306 if let Some(status) = status {
307 let code = exit_code(status);
308 self.set_returncode(code);
309 return Ok(Some(code));
310 }
311 Ok(None)
312 }
313
314 #[inline(never)]
316 pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
317 public_symbols::rp_native_process_wait_public(self, timeout)
318 }
319
320 fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
321 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
322 if self.child.lock().expect("child mutex poisoned").is_none() {
323 return self.returncode().ok_or(ProcessError::NotRunning);
324 }
325 if let Some(code) = self.returncode() {
327 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
328 return Ok(code);
329 }
330 let start = Instant::now();
331 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
332 loop {
333 let rc = self.shared.returncode.load(Ordering::Acquire);
335 if rc != RETURNCODE_NOT_SET {
336 drop(guard);
337 let code = rc as i32;
338 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
339 return Ok(code);
340 }
341 if let Some(limit) = timeout {
342 let elapsed = start.elapsed();
343 if elapsed >= limit {
344 return Err(ProcessError::Timeout);
345 }
346 let remaining = limit - elapsed;
347 let wait_time = remaining.min(Duration::from_millis(50));
349 guard = self
350 .shared
351 .condvar
352 .wait_timeout(guard, wait_time)
353 .expect("queue mutex poisoned")
354 .0;
355 } else {
356 guard = self
358 .shared
359 .condvar
360 .wait_timeout(guard, Duration::from_millis(50))
361 .expect("queue mutex poisoned")
362 .0;
363 }
364 }
365 }
366
367 #[inline(never)]
369 pub fn kill(&self) -> Result<(), ProcessError> {
370 public_symbols::rp_native_process_kill_public(self)
371 }
372
373 fn kill_impl(&self) -> Result<(), ProcessError> {
374 crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
375 {
376 let mut guard = self.child.lock().expect("child mutex poisoned");
377 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
378 child.kill().map_err(ProcessError::Io)?;
379 let status = child.wait().map_err(ProcessError::Io)?;
380 self.set_returncode(exit_code(status));
381 }
382 #[cfg(windows)]
390 self.cancel_capture_io();
391 public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
404 self,
405 kill_drain_deadline(),
406 );
407 Ok(())
408 }
409
410 pub fn terminate(&self) -> Result<(), ProcessError> {
411 self.kill()
412 }
413
414 pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
429 #[cfg(unix)]
430 {
431 if !self.config.create_process_group {
432 return Ok(());
433 }
434 let pid = match self.pid() {
435 Some(p) => p as i32,
436 None => return Err(ProcessError::NotRunning),
437 };
438 let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
439 if result != 0 {
440 let err = std::io::Error::last_os_error();
441 if err.raw_os_error() != Some(libc::ESRCH) {
442 return Err(ProcessError::Io(err));
443 }
444 }
445 Ok(())
446 }
447 #[cfg(windows)]
448 {
449 if !self.config.create_process_group {
450 return Ok(());
455 }
456 let pid = match self.pid() {
457 Some(p) => p,
458 None => return Err(ProcessError::NotRunning),
459 };
460 let ok = unsafe {
464 winapi::um::wincon::GenerateConsoleCtrlEvent(
465 winapi::um::wincon::CTRL_BREAK_EVENT,
466 pid,
467 )
468 };
469 if ok == 0 {
470 let err = std::io::Error::last_os_error();
471 if err.raw_os_error() != Some(6) {
477 return Err(ProcessError::Io(err));
478 }
479 }
480 Ok(())
481 }
482 }
483
484 #[inline(never)]
486 pub fn close(&self) -> Result<(), ProcessError> {
487 public_symbols::rp_native_process_close_public(self)
488 }
489
490 fn close_impl(&self) -> Result<(), ProcessError> {
491 crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
492 if self.child.lock().expect("child mutex poisoned").is_none() {
493 return Ok(());
494 }
495 if self.poll()?.is_none() {
496 self.kill()?;
497 } else {
498 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
499 }
500 Ok(())
501 }
502
503 pub fn pid(&self) -> Option<u32> {
504 self.child
505 .lock()
506 .expect("child mutex poisoned")
507 .as_ref()
508 .map(|state| state.child.id())
509 }
510
511 pub fn returncode(&self) -> Option<i32> {
512 let v = self.shared.returncode.load(Ordering::Acquire);
513 if v == RETURNCODE_NOT_SET {
514 None
515 } else {
516 Some(v as i32)
517 }
518 }
519
520 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
521 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
522 return false;
523 }
524 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
525 match stream {
526 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
527 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
528 }
529 }
530
531 pub fn has_pending_combined(&self) -> bool {
532 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
533 !guard.combined_queue.is_empty()
534 }
535
536 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
537 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
538 return Vec::new();
539 }
540 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
541 let queue = match stream {
542 StreamKind::Stdout => &mut guard.stdout_queue,
543 StreamKind::Stderr => &mut guard.stderr_queue,
544 };
545 queue.drain(..).collect()
546 }
547
548 pub fn drain_combined(&self) -> Vec<StreamEvent> {
549 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
550 guard.combined_queue.drain(..).collect()
551 }
552
553 pub fn read_stream(
554 &self,
555 stream: StreamKind,
556 timeout: Option<Duration>,
557 ) -> ReadStatus<Vec<u8>> {
558 let deadline = timeout.map(|limit| Instant::now() + limit);
559 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
560
561 loop {
562 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
563 return ReadStatus::Eof;
564 }
565
566 let queue = match stream {
567 StreamKind::Stdout => &mut guard.stdout_queue,
568 StreamKind::Stderr => &mut guard.stderr_queue,
569 };
570 if let Some(line) = queue.pop_front() {
571 return ReadStatus::Line(line);
572 }
573
574 let closed = match stream {
575 StreamKind::Stdout => {
576 if self.config.stderr_mode == StderrMode::Stdout {
577 guard.stdout_closed && guard.stderr_closed
578 } else {
579 guard.stdout_closed
580 }
581 }
582 StreamKind::Stderr => guard.stderr_closed,
583 };
584 if closed {
585 return ReadStatus::Eof;
586 }
587
588 match deadline {
589 Some(deadline) => {
590 let now = Instant::now();
591 if now >= deadline {
592 return ReadStatus::Timeout;
593 }
594 let wait = deadline.saturating_duration_since(now);
595 let result = self
596 .shared
597 .condvar
598 .wait_timeout(guard, wait)
599 .expect("queue mutex poisoned");
600 guard = result.0;
601 if result.1.timed_out() {
602 return ReadStatus::Timeout;
603 }
604 }
605 None => {
606 guard = self
607 .shared
608 .condvar
609 .wait(guard)
610 .expect("queue mutex poisoned");
611 }
612 }
613 }
614 }
615
616 #[inline(never)]
618 pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
619 public_symbols::rp_native_process_read_combined_public(self, timeout)
620 }
621
622 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
623 crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
624 let deadline = timeout.map(|limit| Instant::now() + limit);
625 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
626
627 loop {
628 if let Some(event) = guard.combined_queue.pop_front() {
629 return ReadStatus::Line(event);
630 }
631 if guard.stdout_closed && guard.stderr_closed {
632 return ReadStatus::Eof;
633 }
634
635 match deadline {
636 Some(deadline) => {
637 let now = Instant::now();
638 if now >= deadline {
639 return ReadStatus::Timeout;
640 }
641 let wait = deadline.saturating_duration_since(now);
642 let result = self
643 .shared
644 .condvar
645 .wait_timeout(guard, wait)
646 .expect("queue mutex poisoned");
647 guard = result.0;
648 if result.1.timed_out() {
649 return ReadStatus::Timeout;
650 }
651 }
652 None => {
653 guard = self
654 .shared
655 .condvar
656 .wait(guard)
657 .expect("queue mutex poisoned");
658 }
659 }
660 }
661 }
662
663 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
664 self.shared
665 .queues
666 .lock()
667 .expect("queue mutex poisoned")
668 .stdout_history
669 .clone()
670 .into_iter()
671 .collect()
672 }
673
674 fn captured_stdout_raw(&self) -> Vec<u8> {
675 self.shared
676 .queues
677 .lock()
678 .expect("queue mutex poisoned")
679 .stdout_raw
680 .clone()
681 }
682
683 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
684 if self.config.stderr_mode == StderrMode::Stdout {
685 return Vec::new();
686 }
687 self.shared
688 .queues
689 .lock()
690 .expect("queue mutex poisoned")
691 .stderr_history
692 .clone()
693 .into_iter()
694 .collect()
695 }
696
697 fn captured_stderr_raw(&self) -> Vec<u8> {
698 if self.config.stderr_mode == StderrMode::Stdout {
699 return Vec::new();
700 }
701 self.shared
702 .queues
703 .lock()
704 .expect("queue mutex poisoned")
705 .stderr_raw
706 .clone()
707 }
708
709 pub fn captured_combined(&self) -> Vec<StreamEvent> {
710 self.shared
711 .queues
712 .lock()
713 .expect("queue mutex poisoned")
714 .combined_history
715 .clone()
716 .into_iter()
717 .collect()
718 }
719
720 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
721 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
722 return 0;
723 }
724 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
725 match stream {
726 StreamKind::Stdout => guard.stdout_history_bytes,
727 StreamKind::Stderr => guard.stderr_history_bytes,
728 }
729 }
730
731 pub fn captured_combined_bytes(&self) -> usize {
732 self.shared
733 .queues
734 .lock()
735 .expect("queue mutex poisoned")
736 .combined_history_bytes
737 }
738
739 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
740 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
741 return 0;
742 }
743 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
744 match stream {
745 StreamKind::Stdout => {
746 let released = guard.stdout_history_bytes;
747 guard.stdout_history.clear();
748 guard.stdout_raw.clear();
749 guard.stdout_history_bytes = 0;
750 released
751 }
752 StreamKind::Stderr => {
753 let released = guard.stderr_history_bytes;
754 guard.stderr_history.clear();
755 guard.stderr_raw.clear();
756 guard.stderr_history_bytes = 0;
757 released
758 }
759 }
760 }
761
762 pub fn clear_captured_combined(&self) -> usize {
763 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
764 let released = guard.combined_history_bytes;
765 guard.combined_history.clear();
766 guard.combined_history_bytes = 0;
767 released
768 }
769
770 fn build_command(&self) -> Command {
771 let mut command = match &self.config.command {
772 CommandSpec::Shell(command) => shell_command(command),
773 CommandSpec::Argv(argv) => {
774 let mut command = Command::new(&argv[0]);
775 if argv.len() > 1 {
776 command.args(&argv[1..]);
777 }
778 command
779 }
780 };
781 if let Some(cwd) = &self.config.cwd {
782 command.current_dir(cwd);
783 }
784 if let Some(env) = &self.config.env {
785 command.env_clear();
786 command.envs(env.iter().map(|(k, v)| (k, v)));
787 }
788 #[cfg(windows)]
789 {
790 use std::os::windows::process::CommandExt;
791
792 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
797 let extra = if self.config.create_process_group {
798 CREATE_NEW_PROCESS_GROUP
799 } else {
800 0
801 };
802 let flags = self.config.creationflags.unwrap_or(0)
803 | extra
804 | windows_priority_flags(self.config.nice);
805 if flags != 0 {
806 command.creation_flags(flags);
807 }
808 }
809 #[cfg(unix)]
810 {
811 let create_process_group = self.config.create_process_group;
812 let nice = self.config.nice;
813
814 if create_process_group || nice.is_some() {
815 use std::os::unix::process::CommandExt;
816
817 unsafe {
818 command.pre_exec(move || {
819 if create_process_group && libc::setpgid(0, 0) == -1 {
820 return Err(std::io::Error::last_os_error());
821 }
822 if let Some(nice) = nice {
823 let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
824 if result == -1 {
825 return Err(std::io::Error::last_os_error());
826 }
827 }
828 Ok(())
829 });
830 }
831 }
832 }
833 command
834 }
835
836 fn spawn_reader<R>(
837 &self,
838 pipe: R,
839 source_stream: StreamKind,
840 visible_stream: StreamKind,
841 on_pipe_done: Box<dyn FnOnce() + Send>,
842 ) where
843 R: Read + Send + 'static,
844 {
845 let shared = Arc::clone(&self.shared);
846 thread::spawn(move || {
847 let mut reader = pipe;
848 let mut chunk = vec![0_u8; 65536];
849 let mut pending = Vec::new();
850
851 loop {
852 match reader.read(&mut chunk) {
853 Ok(0) => break,
854 Ok(n) => {
855 append_raw(&shared, visible_stream, &chunk[..n]);
856 let lines = feed_chunk(&mut pending, &chunk[..n]);
857 emit_lines(&shared, visible_stream, lines);
858 }
859 Err(_) => break,
860 }
861 }
862
863 if !pending.is_empty() {
864 emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
865 }
866
867 on_pipe_done();
872 drop(reader);
873
874 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
875 match source_stream {
876 StreamKind::Stdout => guard.stdout_closed = true,
877 StreamKind::Stderr => guard.stderr_closed = true,
878 }
879 shared.condvar.notify_all();
880 });
881 }
882
883 #[cfg(windows)]
884 fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
885 let handles = Arc::clone(&self.capture_pipe_handles);
886 Box::new(move || {
887 let mut guard = handles.lock().expect("capture pipe handles mutex poisoned");
888 match stream {
889 StreamKind::Stdout => guard.stdout = None,
890 StreamKind::Stderr => guard.stderr = None,
891 }
892 })
893 }
894
895 #[cfg(not(windows))]
896 fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
897 Box::new(|| {})
898 }
899
900 #[cfg(windows)]
906 fn cancel_capture_io(&self) {
907 crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
908 use winapi::shared::ntdef::HANDLE;
909 use winapi::um::ioapiset::CancelIoEx;
910 let guard = self
911 .capture_pipe_handles
912 .lock()
913 .expect("capture pipe handles mutex poisoned");
914 if let Some(h) = guard.stdout {
915 unsafe {
922 CancelIoEx(h as HANDLE, std::ptr::null_mut());
923 }
924 }
925 if let Some(h) = guard.stderr {
926 unsafe {
927 CancelIoEx(h as HANDLE, std::ptr::null_mut());
928 }
929 }
930 }
931
932 fn set_returncode(&self, code: i32) {
933 self.shared.returncode.store(code as i64, Ordering::Release);
934 self.shared.condvar.notify_all();
935 }
936
937 fn wait_for_capture_completion_impl(&self) {
938 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait_for_capture_completion");
939 if !self.config.capture {
940 return;
941 }
942
943 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
944 while !(guard.stdout_closed && guard.stderr_closed) {
945 guard = self
946 .shared
947 .condvar
948 .wait(guard)
949 .expect("queue mutex poisoned");
950 }
951 }
952
953 fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
961 crate::rp_rust_debug_scope!(
962 "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
963 );
964 if !self.config.capture {
965 return true;
966 }
967
968 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
969 while !(guard.stdout_closed && guard.stderr_closed) {
970 let now = Instant::now();
971 if now >= deadline {
972 guard.stdout_closed = true;
973 guard.stderr_closed = true;
974 self.shared.condvar.notify_all();
975 return false;
976 }
977 let (next_guard, result) = self
978 .shared
979 .condvar
980 .wait_timeout(guard, deadline - now)
981 .expect("queue mutex poisoned");
982 guard = next_guard;
983 if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
984 guard.stdout_closed = true;
985 guard.stderr_closed = true;
986 self.shared.condvar.notify_all();
987 return false;
988 }
989 }
990 true
991 }
992}
993
994fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
995 if lines.is_empty() {
996 return;
997 }
998 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
999 for line in lines {
1000 let line_len = line.len();
1001 match stream {
1002 StreamKind::Stdout => {
1003 guard.stdout_history_bytes += line_len;
1004 guard.stdout_history.push_back(line.clone());
1005 guard.stdout_queue.push_back(line.clone());
1006 }
1007 StreamKind::Stderr => {
1008 guard.stderr_history_bytes += line_len;
1009 guard.stderr_history.push_back(line.clone());
1010 guard.stderr_queue.push_back(line.clone());
1011 }
1012 }
1013 let event = StreamEvent { stream, line };
1014 guard.combined_history_bytes += line_len;
1015 guard.combined_history.push_back(event.clone());
1016 guard.combined_queue.push_back(event);
1017 }
1018 shared.condvar.notify_all();
1019}
1020
1021fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1022 if chunk.is_empty() {
1023 return;
1024 }
1025 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1026 match stream {
1027 StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1028 StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1029 }
1030}
1031
1032pub fn run_command(
1038 mut config: ProcessConfig,
1039 timeout: Option<Duration>,
1040) -> Result<RunOutput, ProcessError> {
1041 config.capture = true;
1042 let process = NativeProcess::new(config);
1043 process.start()?;
1044
1045 let exit_code = match process.wait(timeout) {
1046 Ok(code) => code,
1047 Err(ProcessError::Timeout) => {
1048 match process.kill() {
1049 Ok(()) | Err(ProcessError::NotRunning) => {}
1050 Err(error) => return Err(error),
1051 }
1052 return Err(ProcessError::Timeout);
1053 }
1054 Err(error) => return Err(error),
1055 };
1056
1057 Ok(RunOutput {
1058 stdout: process.captured_stdout_raw(),
1059 stderr: process.captured_stderr_raw(),
1060 exit_code,
1061 })
1062}
1063
/// Builds the `Command` that runs `command` through the platform shell.
///
/// * Windows: `cmd /D /S /C "<command>"`, with the command text passed
///   through verbatim via `raw_arg`.
/// * Elsewhere: `sh -lc <command>`.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    }
    #[cfg(not(windows))]
    {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    }
}
1082
1083#[cfg(test)]
1084mod tests;