1use std::collections::VecDeque;
2use std::io::Read;
3use std::process::{Child, Command, Stdio};
4use std::sync::atomic::{AtomicI64, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9pub mod console_detect;
10pub mod containment;
11mod helpers;
12#[cfg(feature = "originator-scan")]
13pub mod originator;
/// Code included from the build output directory; compiled only when the
/// `client` feature is enabled.
#[cfg(feature = "client")]
pub mod proto {
    /// Contents of `running_process.daemon.v1.rs`, read from `OUT_DIR` at compile time.
    // NOTE(review): presumably generated by the build script from the daemon
    // protocol definitions — confirm against build.rs.
    pub mod daemon {
        include!(concat!(env!("OUT_DIR"), "/running_process.daemon.v1.rs"));
    }
}
24
25#[cfg(feature = "client")]
26pub mod client;
27
28#[cfg(feature = "telemetry")]
31#[path = "daemon/telemetry.rs"]
32pub mod telemetry;
33
34#[cfg(feature = "daemon")]
37pub mod daemon;
38pub mod pty;
39mod public_symbols;
40mod rust_debug;
41pub mod spawn;
42mod types;
43#[cfg(unix)]
44mod unix;
45#[cfg(windows)]
46mod windows;
47
48pub use console_detect::{monitor_console_windows, ConsoleWindowInfo};
49pub use containment::{ContainedProcessGroup, ORIGINATOR_ENV_VAR};
50#[cfg(feature = "originator-scan")]
51pub use originator::{find_processes_by_originator, OriginatorProcessInfo};
52pub use rust_debug::{render_rust_debug_traces, RustDebugScopeGuard};
53pub use spawn::{
54 spawn, spawn_daemon, spawn_daemon_with_clear_env, DaemonChild, SpawnStdio, SpawnedChild,
55 StdioSource,
56};
57pub use types::{
58 CommandSpec, ProcessConfig, ProcessError, ReadStatus, RunOutput, StderrMode, StdinMode,
59 StreamEvent, StreamKind,
60};
61
62pub(crate) use helpers::{exit_code, feed_chunk, kill_drain_deadline, log_spawned_child_pid};
63#[cfg(unix)]
64pub use unix::{unix_set_priority, unix_signal_process, unix_signal_process_group, UnixSignal};
65#[cfg(windows)]
66pub(crate) use windows::{
67 assign_child_to_windows_kill_on_close_job_impl, windows_priority_flags, CapturePipeHandles,
68 WindowsJobHandle,
69};
70
/// Opens a debug scope labelled `$label` for the rest of the enclosing block.
///
/// Expands to a `let` binding holding the value returned by
/// `RustDebugScopeGuard::enter`, which is called with the label plus
/// `file!()` and `line!()`. The binding stays alive until the surrounding
/// block ends.
// NOTE(review): presumably the guard records the scope exit when dropped —
// confirm in the `rust_debug` module.
#[macro_export]
macro_rules! rp_rust_debug_scope {
    ($label:expr) => {
        let _running_process_rust_debug_scope =
            $crate::RustDebugScopeGuard::enter($label, file!(), line!());
    };
}
78
/// Captured-output bookkeeping, guarded by `SharedState::queues`.
#[derive(Default)]
struct QueueState {
    /// Stdout lines not yet consumed by `read_stream` / `drain_stream`.
    stdout_queue: VecDeque<Vec<u8>>,
    /// Stderr lines not yet consumed by `read_stream` / `drain_stream`.
    stderr_queue: VecDeque<Vec<u8>>,
    /// Events not yet consumed by `read_combined` / `drain_combined`.
    combined_queue: VecDeque<StreamEvent>,
    /// Every stdout line emitted so far, until `clear_captured_stream` empties it.
    stdout_history: VecDeque<Vec<u8>>,
    /// Every stderr line emitted so far, until `clear_captured_stream` empties it.
    stderr_history: VecDeque<Vec<u8>>,
    /// Every event emitted so far, until `clear_captured_combined` empties it.
    combined_history: VecDeque<StreamEvent>,
    /// Unsplit stdout bytes exactly as read from the pipe (see `append_raw`).
    stdout_raw: Vec<u8>,
    /// Unsplit stderr bytes exactly as read from the pipe (see `append_raw`).
    stderr_raw: Vec<u8>,
    /// Sum of the lengths of the lines held in `stdout_history`.
    stdout_history_bytes: usize,
    /// Sum of the lengths of the lines held in `stderr_history`.
    stderr_history_bytes: usize,
    /// Sum of the lengths of the lines held in `combined_history`.
    combined_history_bytes: usize,
    /// Set once the stdout reader finished, capture is disabled, or a drain
    /// deadline forced the stream closed.
    stdout_closed: bool,
    /// Set once the stderr reader finished, capture is disabled, or a drain
    /// deadline forced the stream closed.
    stderr_closed: bool,
}
95
/// Sentinel held in `SharedState::returncode` until an exit code is recorded.
/// Exit codes are stored as `i32` widened to `i64`, so `i64::MIN` can never
/// collide with a real code.
const RETURNCODE_NOT_SET: i64 = i64::MIN;
98
/// State shared between a `NativeProcess` and its background threads.
struct SharedState {
    /// Captured output queues, histories and stream-closed flags.
    queues: Mutex<QueueState>,
    /// Notified when lines are queued, a stream closes, or the return code is stored.
    condvar: Condvar,
    /// Exit code widened to `i64`, or `RETURNCODE_NOT_SET` while it is unknown.
    returncode: AtomicI64,
}
106
/// Everything owned on behalf of a spawned child process.
struct ChildState {
    /// Handle to the spawned process.
    child: Child,
    /// Job handle returned when the child was assigned to a job at start time;
    /// stored only to keep it alive as long as the child entry.
    // NOTE(review): presumably a kill-on-close job object, so dropping it ends
    // the job's processes — confirm in the `windows` module.
    #[cfg(windows)]
    _job: WindowsJobHandle,
}
112
113impl SharedState {
114 fn new(capture: bool) -> Self {
115 let queues = QueueState {
116 stdout_closed: !capture,
117 stderr_closed: !capture,
118 ..QueueState::default()
119 };
120 Self {
121 queues: Mutex::new(queues),
122 condvar: Condvar::new(),
123 returncode: AtomicI64::new(RETURNCODE_NOT_SET),
124 }
125 }
126}
127
/// Handle to a single child process with optional captured output.
pub struct NativeProcess {
    /// Settings the process is (or will be) started with.
    config: ProcessConfig,
    /// `None` until `start` has spawned the child; shared with the exit-waiter thread.
    child: Arc<Mutex<Option<ChildState>>>,
    /// Output queues, condition variable and return code shared with helper threads.
    shared: Arc<SharedState>,
    /// Raw handles of the capture pipes, kept so pending reads can be cancelled on kill.
    #[cfg(windows)]
    capture_pipe_handles: Arc<Mutex<CapturePipeHandles>>,
}
135
136impl NativeProcess {
137 pub fn new(config: ProcessConfig) -> Self {
138 Self {
139 shared: Arc::new(SharedState::new(config.capture)),
140 child: Arc::new(Mutex::new(None)),
141 config,
142 #[cfg(windows)]
143 capture_pipe_handles: Arc::new(Mutex::new(CapturePipeHandles::default())),
144 }
145 }
146
/// Starts the process.
///
/// Forwards to `public_symbols::rp_native_process_start_public`.
// NOTE(review): presumably that shim calls `start_impl`, and `#[inline(never)]`
// keeps this entry point as its own stack frame — confirm in `public_symbols`.
#[inline(never)]
pub fn start(&self) -> Result<(), ProcessError> {
    public_symbols::rp_native_process_start_public(self)
}
152
153 fn start_impl(&self) -> Result<(), ProcessError> {
154 crate::rp_rust_debug_scope!("running_process::NativeProcess::start");
155 let mut guard = self.child.lock().expect("child mutex poisoned");
156 if guard.is_some() {
157 return Err(ProcessError::AlreadyStarted);
158 }
159
160 let mut command = self.build_command();
161 match self.config.stdin_mode {
162 StdinMode::Inherit => {}
163 StdinMode::Piped => {
164 command.stdin(Stdio::piped());
165 }
166 StdinMode::Null => {
167 command.stdin(Stdio::null());
168 }
169 }
170 if self.config.capture {
171 command.stdout(Stdio::piped());
172 command.stderr(Stdio::piped());
173 }
174
175 let mut child = command.spawn().map_err(ProcessError::Spawn)?;
176 log_spawned_child_pid(child.id()).map_err(ProcessError::Spawn)?;
177 #[cfg(windows)]
178 let job = public_symbols::rp_assign_child_to_windows_kill_on_close_job_public(&child)
179 .map_err(ProcessError::Spawn)?;
180 if self.config.capture {
181 let stdout = child.stdout.take().expect("stdout pipe missing");
182 let stderr = child.stderr.take().expect("stderr pipe missing");
183 #[cfg(windows)]
184 {
185 use std::os::windows::io::AsRawHandle;
186 let mut handles = self
187 .capture_pipe_handles
188 .lock()
189 .expect("capture pipe handles mutex poisoned");
190 handles.stdout = Some(stdout.as_raw_handle() as usize);
191 handles.stderr = Some(stderr.as_raw_handle() as usize);
192 }
193 self.spawn_reader(
194 stdout,
195 StreamKind::Stdout,
196 StreamKind::Stdout,
197 self.pipe_done_callback(StreamKind::Stdout),
198 );
199 self.spawn_reader(
200 stderr,
201 StreamKind::Stderr,
202 match self.config.stderr_mode {
203 StderrMode::Stdout => StreamKind::Stdout,
204 StderrMode::Pipe => StreamKind::Stderr,
205 },
206 self.pipe_done_callback(StreamKind::Stderr),
207 );
208 }
209 *guard = Some(ChildState {
210 child,
211 #[cfg(windows)]
212 _job: job,
213 });
214 drop(guard);
215 self.spawn_exit_waiter();
216 Ok(())
217 }
218
219 fn spawn_exit_waiter(&self) {
222 let child = Arc::clone(&self.child);
223 let shared = Arc::clone(&self.shared);
224 thread::spawn(move || loop {
225 if shared.returncode.load(Ordering::Acquire) != RETURNCODE_NOT_SET {
226 return;
227 }
228 {
229 let mut guard = child.lock().expect("child mutex poisoned");
230 if let Some(child_state) = guard.as_mut() {
231 match child_state.child.try_wait() {
232 Ok(Some(status)) => {
233 let code = exit_code(status);
234 shared.returncode.store(code as i64, Ordering::Release);
235 shared.condvar.notify_all();
236 return;
237 }
238 Ok(None) => {}
239 Err(_) => return,
240 }
241 } else {
242 return;
243 }
244 }
245 thread::sleep(Duration::from_millis(10));
251 });
252 }
253
254 pub fn write_stdin(&self, data: &[u8]) -> Result<(), ProcessError> {
255 let mut guard = self.child.lock().expect("child mutex poisoned");
256 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
257 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
258 use std::io::Write;
259 stdin.write_all(data).map_err(ProcessError::Io)?;
260 stdin.flush().map_err(ProcessError::Io)?;
261 drop(child.stdin.take());
262 Ok(())
263 }
264
265 pub fn write_stdin_streaming(&self, data: &[u8]) -> Result<(), ProcessError> {
270 let mut guard = self.child.lock().expect("child mutex poisoned");
271 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
272 let stdin = child.stdin.as_mut().ok_or(ProcessError::StdinUnavailable)?;
273 use std::io::Write;
274 stdin.write_all(data).map_err(ProcessError::Io)?;
275 stdin.flush().map_err(ProcessError::Io)?;
276 Ok(())
277 }
278
279 pub fn close_stdin(&self) -> Result<(), ProcessError> {
282 let mut guard = self.child.lock().expect("child mutex poisoned");
283 let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
284 drop(child.stdin.take());
285 Ok(())
286 }
287
288 pub fn poll(&self) -> Result<Option<i32>, ProcessError> {
289 if let Some(code) = self.returncode() {
291 return Ok(Some(code));
292 }
293 let mut guard = self.child.lock().expect("child mutex poisoned");
294 let Some(child_state) = guard.as_mut() else {
295 return Ok(self.returncode());
296 };
297 let child = &mut child_state.child;
298 let status = child.try_wait().map_err(ProcessError::Io)?;
299 if let Some(status) = status {
300 let code = exit_code(status);
301 self.set_returncode(code);
302 return Ok(Some(code));
303 }
304 Ok(None)
305 }
306
/// Waits for the process to exit, for at most `timeout` when one is given.
///
/// Forwards to `public_symbols::rp_native_process_wait_public`.
// NOTE(review): presumably that shim calls `wait_impl`, and `#[inline(never)]`
// keeps this entry point as its own stack frame — confirm in `public_symbols`.
#[inline(never)]
pub fn wait(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
    public_symbols::rp_native_process_wait_public(self, timeout)
}
312
313 fn wait_impl(&self, timeout: Option<Duration>) -> Result<i32, ProcessError> {
314 crate::rp_rust_debug_scope!("running_process::NativeProcess::wait");
315 if self.child.lock().expect("child mutex poisoned").is_none() {
316 return self.returncode().ok_or(ProcessError::NotRunning);
317 }
318 if let Some(code) = self.returncode() {
320 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
321 return Ok(code);
322 }
323 let start = Instant::now();
324 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
325 loop {
326 let rc = self.shared.returncode.load(Ordering::Acquire);
328 if rc != RETURNCODE_NOT_SET {
329 drop(guard);
330 let code = rc as i32;
331 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
332 return Ok(code);
333 }
334 if let Some(limit) = timeout {
335 let elapsed = start.elapsed();
336 if elapsed >= limit {
337 return Err(ProcessError::Timeout);
338 }
339 let remaining = limit - elapsed;
340 let wait_time = remaining.min(Duration::from_millis(50));
342 guard = self
343 .shared
344 .condvar
345 .wait_timeout(guard, wait_time)
346 .expect("queue mutex poisoned")
347 .0;
348 } else {
349 guard = self
351 .shared
352 .condvar
353 .wait_timeout(guard, Duration::from_millis(50))
354 .expect("queue mutex poisoned")
355 .0;
356 }
357 }
358 }
359
/// Kills the process.
///
/// Forwards to `public_symbols::rp_native_process_kill_public`.
// NOTE(review): presumably that shim calls `kill_impl`, and `#[inline(never)]`
// keeps this entry point as its own stack frame — confirm in `public_symbols`.
#[inline(never)]
pub fn kill(&self) -> Result<(), ProcessError> {
    public_symbols::rp_native_process_kill_public(self)
}
365
/// Kills the child, reaps it, records its exit code and drains captured output.
///
/// # Errors
/// `NotRunning` if no child is stored; `Io` if `kill` or `wait` fails.
fn kill_impl(&self) -> Result<(), ProcessError> {
    crate::rp_rust_debug_scope!("running_process::NativeProcess::kill");
    {
        // Scoped so the child lock is released before waiting on the readers.
        let mut guard = self.child.lock().expect("child mutex poisoned");
        let child = &mut guard.as_mut().ok_or(ProcessError::NotRunning)?.child;
        child.kill().map_err(ProcessError::Io)?;
        // Reap the process so its exit status can be recorded.
        let status = child.wait().map_err(ProcessError::Io)?;
        self.set_returncode(exit_code(status));
    }
    // Abort reads that are still pending on the capture pipes.
    #[cfg(windows)]
    self.cancel_capture_io();
    // Bounded wait for the readers; the result of the call is ignored here.
    public_symbols::rp_native_process_wait_for_capture_completion_with_deadline_public(
        self,
        kill_drain_deadline(),
    );
    Ok(())
}
402
/// Terminates the process. This simply calls `kill`, so it is not a softer
/// alternative to it.
pub fn terminate(&self) -> Result<(), ProcessError> {
    self.kill()
}
406
407 pub fn terminate_group_soft(&self) -> Result<(), ProcessError> {
422 #[cfg(unix)]
423 {
424 if !self.config.create_process_group {
425 return Ok(());
426 }
427 let pid = match self.pid() {
428 Some(p) => p as i32,
429 None => return Err(ProcessError::NotRunning),
430 };
431 let result = unsafe { libc::kill(-pid, libc::SIGTERM) };
432 if result != 0 {
433 let err = std::io::Error::last_os_error();
434 if err.raw_os_error() != Some(libc::ESRCH) {
435 return Err(ProcessError::Io(err));
436 }
437 }
438 Ok(())
439 }
440 #[cfg(windows)]
441 {
442 if !self.config.create_process_group {
443 return Ok(());
448 }
449 let pid = match self.pid() {
450 Some(p) => p,
451 None => return Err(ProcessError::NotRunning),
452 };
453 let ok = unsafe {
457 winapi::um::wincon::GenerateConsoleCtrlEvent(
458 winapi::um::wincon::CTRL_BREAK_EVENT,
459 pid,
460 )
461 };
462 if ok == 0 {
463 let err = std::io::Error::last_os_error();
464 if err.raw_os_error() != Some(6) {
470 return Err(ProcessError::Io(err));
471 }
472 }
473 Ok(())
474 }
475 }
476
/// Releases the process, killing it if it is still running.
///
/// Forwards to `public_symbols::rp_native_process_close_public`.
// NOTE(review): presumably that shim calls `close_impl`, and `#[inline(never)]`
// keeps this entry point as its own stack frame — confirm in `public_symbols`.
#[inline(never)]
pub fn close(&self) -> Result<(), ProcessError> {
    public_symbols::rp_native_process_close_public(self)
}
482
483 fn close_impl(&self) -> Result<(), ProcessError> {
484 crate::rp_rust_debug_scope!("running_process::NativeProcess::close");
485 if self.child.lock().expect("child mutex poisoned").is_none() {
486 return Ok(());
487 }
488 if self.poll()?.is_none() {
489 self.kill()?;
490 } else {
491 public_symbols::rp_native_process_wait_for_capture_completion_public(self);
492 }
493 Ok(())
494 }
495
496 pub fn pid(&self) -> Option<u32> {
497 self.child
498 .lock()
499 .expect("child mutex poisoned")
500 .as_ref()
501 .map(|state| state.child.id())
502 }
503
504 pub fn returncode(&self) -> Option<i32> {
505 let v = self.shared.returncode.load(Ordering::Acquire);
506 if v == RETURNCODE_NOT_SET {
507 None
508 } else {
509 Some(v as i32)
510 }
511 }
512
513 pub fn has_pending_stream(&self, stream: StreamKind) -> bool {
514 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
515 return false;
516 }
517 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
518 match stream {
519 StreamKind::Stdout => !guard.stdout_queue.is_empty(),
520 StreamKind::Stderr => !guard.stderr_queue.is_empty(),
521 }
522 }
523
524 pub fn has_pending_combined(&self) -> bool {
525 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
526 !guard.combined_queue.is_empty()
527 }
528
529 pub fn drain_stream(&self, stream: StreamKind) -> Vec<Vec<u8>> {
530 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
531 return Vec::new();
532 }
533 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
534 let queue = match stream {
535 StreamKind::Stdout => &mut guard.stdout_queue,
536 StreamKind::Stderr => &mut guard.stderr_queue,
537 };
538 queue.drain(..).collect()
539 }
540
541 pub fn drain_combined(&self) -> Vec<StreamEvent> {
542 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
543 guard.combined_queue.drain(..).collect()
544 }
545
546 pub fn read_stream(
547 &self,
548 stream: StreamKind,
549 timeout: Option<Duration>,
550 ) -> ReadStatus<Vec<u8>> {
551 let deadline = timeout.map(|limit| Instant::now() + limit);
552 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
553
554 loop {
555 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
556 return ReadStatus::Eof;
557 }
558
559 let queue = match stream {
560 StreamKind::Stdout => &mut guard.stdout_queue,
561 StreamKind::Stderr => &mut guard.stderr_queue,
562 };
563 if let Some(line) = queue.pop_front() {
564 return ReadStatus::Line(line);
565 }
566
567 let closed = match stream {
568 StreamKind::Stdout => {
569 if self.config.stderr_mode == StderrMode::Stdout {
570 guard.stdout_closed && guard.stderr_closed
571 } else {
572 guard.stdout_closed
573 }
574 }
575 StreamKind::Stderr => guard.stderr_closed,
576 };
577 if closed {
578 return ReadStatus::Eof;
579 }
580
581 match deadline {
582 Some(deadline) => {
583 let now = Instant::now();
584 if now >= deadline {
585 return ReadStatus::Timeout;
586 }
587 let wait = deadline.saturating_duration_since(now);
588 let result = self
589 .shared
590 .condvar
591 .wait_timeout(guard, wait)
592 .expect("queue mutex poisoned");
593 guard = result.0;
594 if result.1.timed_out() {
595 return ReadStatus::Timeout;
596 }
597 }
598 None => {
599 guard = self
600 .shared
601 .condvar
602 .wait(guard)
603 .expect("queue mutex poisoned");
604 }
605 }
606 }
607 }
608
/// Reads the next event from the combined stdout/stderr stream, waiting at
/// most `timeout` when one is given.
///
/// Forwards to `public_symbols::rp_native_process_read_combined_public`.
// NOTE(review): presumably that shim calls `read_combined_impl`, and
// `#[inline(never)]` keeps this entry point as its own stack frame — confirm.
#[inline(never)]
pub fn read_combined(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
    public_symbols::rp_native_process_read_combined_public(self, timeout)
}
614
615 fn read_combined_impl(&self, timeout: Option<Duration>) -> ReadStatus<StreamEvent> {
616 crate::rp_rust_debug_scope!("running_process::NativeProcess::read_combined");
617 let deadline = timeout.map(|limit| Instant::now() + limit);
618 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
619
620 loop {
621 if let Some(event) = guard.combined_queue.pop_front() {
622 return ReadStatus::Line(event);
623 }
624 if guard.stdout_closed && guard.stderr_closed {
625 return ReadStatus::Eof;
626 }
627
628 match deadline {
629 Some(deadline) => {
630 let now = Instant::now();
631 if now >= deadline {
632 return ReadStatus::Timeout;
633 }
634 let wait = deadline.saturating_duration_since(now);
635 let result = self
636 .shared
637 .condvar
638 .wait_timeout(guard, wait)
639 .expect("queue mutex poisoned");
640 guard = result.0;
641 if result.1.timed_out() {
642 return ReadStatus::Timeout;
643 }
644 }
645 None => {
646 guard = self
647 .shared
648 .condvar
649 .wait(guard)
650 .expect("queue mutex poisoned");
651 }
652 }
653 }
654 }
655
656 pub fn captured_stdout(&self) -> Vec<Vec<u8>> {
657 self.shared
658 .queues
659 .lock()
660 .expect("queue mutex poisoned")
661 .stdout_history
662 .clone()
663 .into_iter()
664 .collect()
665 }
666
667 fn captured_stdout_raw(&self) -> Vec<u8> {
668 self.shared
669 .queues
670 .lock()
671 .expect("queue mutex poisoned")
672 .stdout_raw
673 .clone()
674 }
675
676 pub fn captured_stderr(&self) -> Vec<Vec<u8>> {
677 if self.config.stderr_mode == StderrMode::Stdout {
678 return Vec::new();
679 }
680 self.shared
681 .queues
682 .lock()
683 .expect("queue mutex poisoned")
684 .stderr_history
685 .clone()
686 .into_iter()
687 .collect()
688 }
689
690 fn captured_stderr_raw(&self) -> Vec<u8> {
691 if self.config.stderr_mode == StderrMode::Stdout {
692 return Vec::new();
693 }
694 self.shared
695 .queues
696 .lock()
697 .expect("queue mutex poisoned")
698 .stderr_raw
699 .clone()
700 }
701
702 pub fn captured_combined(&self) -> Vec<StreamEvent> {
703 self.shared
704 .queues
705 .lock()
706 .expect("queue mutex poisoned")
707 .combined_history
708 .clone()
709 .into_iter()
710 .collect()
711 }
712
713 pub fn captured_stream_bytes(&self, stream: StreamKind) -> usize {
714 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
715 return 0;
716 }
717 let guard = self.shared.queues.lock().expect("queue mutex poisoned");
718 match stream {
719 StreamKind::Stdout => guard.stdout_history_bytes,
720 StreamKind::Stderr => guard.stderr_history_bytes,
721 }
722 }
723
724 pub fn captured_combined_bytes(&self) -> usize {
725 self.shared
726 .queues
727 .lock()
728 .expect("queue mutex poisoned")
729 .combined_history_bytes
730 }
731
732 pub fn clear_captured_stream(&self, stream: StreamKind) -> usize {
733 if stream == StreamKind::Stderr && self.config.stderr_mode == StderrMode::Stdout {
734 return 0;
735 }
736 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
737 match stream {
738 StreamKind::Stdout => {
739 let released = guard.stdout_history_bytes;
740 guard.stdout_history.clear();
741 guard.stdout_raw.clear();
742 guard.stdout_history_bytes = 0;
743 released
744 }
745 StreamKind::Stderr => {
746 let released = guard.stderr_history_bytes;
747 guard.stderr_history.clear();
748 guard.stderr_raw.clear();
749 guard.stderr_history_bytes = 0;
750 released
751 }
752 }
753 }
754
755 pub fn clear_captured_combined(&self) -> usize {
756 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
757 let released = guard.combined_history_bytes;
758 guard.combined_history.clear();
759 guard.combined_history_bytes = 0;
760 released
761 }
762
763 fn build_command(&self) -> Command {
764 let mut command = match &self.config.command {
765 CommandSpec::Shell(command) => shell_command(command),
766 CommandSpec::Argv(argv) => {
767 let mut command = Command::new(&argv[0]);
768 if argv.len() > 1 {
769 command.args(&argv[1..]);
770 }
771 command
772 }
773 };
774 if let Some(cwd) = &self.config.cwd {
775 command.current_dir(cwd);
776 }
777 if let Some(env) = &self.config.env {
778 command.env_clear();
779 command.envs(env.iter().map(|(k, v)| (k, v)));
780 }
781 #[cfg(windows)]
782 {
783 use std::os::windows::process::CommandExt;
784
785 const CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
790 let extra = if self.config.create_process_group {
791 CREATE_NEW_PROCESS_GROUP
792 } else {
793 0
794 };
795 let flags = self.config.creationflags.unwrap_or(0)
796 | extra
797 | windows_priority_flags(self.config.nice);
798 if flags != 0 {
799 command.creation_flags(flags);
800 }
801 }
802 #[cfg(unix)]
803 {
804 let create_process_group = self.config.create_process_group;
805 let nice = self.config.nice;
806
807 if create_process_group || nice.is_some() {
808 use std::os::unix::process::CommandExt;
809
810 unsafe {
811 command.pre_exec(move || {
812 if create_process_group && libc::setpgid(0, 0) == -1 {
813 return Err(std::io::Error::last_os_error());
814 }
815 if let Some(nice) = nice {
816 let result = libc::setpriority(libc::PRIO_PROCESS, 0, nice);
817 if result == -1 {
818 return Err(std::io::Error::last_os_error());
819 }
820 }
821 Ok(())
822 });
823 }
824 }
825 }
826 command
827 }
828
829 fn spawn_reader<R>(
830 &self,
831 pipe: R,
832 source_stream: StreamKind,
833 visible_stream: StreamKind,
834 on_pipe_done: Box<dyn FnOnce() + Send>,
835 ) where
836 R: Read + Send + 'static,
837 {
838 let shared = Arc::clone(&self.shared);
839 thread::spawn(move || {
840 let mut reader = pipe;
841 let mut chunk = vec![0_u8; 65536];
842 let mut pending = Vec::new();
843
844 loop {
845 match reader.read(&mut chunk) {
846 Ok(0) => break,
847 Ok(n) => {
848 append_raw(&shared, visible_stream, &chunk[..n]);
849 let lines = feed_chunk(&mut pending, &chunk[..n]);
850 emit_lines(&shared, visible_stream, lines);
851 }
852 Err(_) => break,
853 }
854 }
855
856 if !pending.is_empty() {
857 emit_lines(&shared, visible_stream, vec![std::mem::take(&mut pending)]);
858 }
859
860 on_pipe_done();
865 drop(reader);
866
867 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
868 match source_stream {
869 StreamKind::Stdout => guard.stdout_closed = true,
870 StreamKind::Stderr => guard.stderr_closed = true,
871 }
872 shared.condvar.notify_all();
873 });
874 }
875
/// Builds the callback a reader thread invokes when it is done with its pipe.
///
/// The callback clears the stored raw handle of `stream`, so that
/// `cancel_capture_io` no longer uses it.
#[cfg(windows)]
fn pipe_done_callback(&self, stream: StreamKind) -> Box<dyn FnOnce() + Send> {
    let registry = Arc::clone(&self.capture_pipe_handles);
    Box::new(move || {
        let mut slots = registry
            .lock()
            .expect("capture pipe handles mutex poisoned");
        let slot = match stream {
            StreamKind::Stdout => &mut slots.stdout,
            StreamKind::Stderr => &mut slots.stderr,
        };
        *slot = None;
    })
}
889
/// Builds the callback a reader thread invokes when it is done with its pipe.
///
/// Outside Windows no raw pipe handles are tracked, so the callback does nothing.
#[cfg(not(windows))]
fn pipe_done_callback(&self, _stream: StreamKind) -> Box<dyn FnOnce() + Send> {
    Box::new(|| {})
}
894
/// Requests cancellation of I/O pending on the capture pipes that are still
/// registered. The result of each `CancelIoEx` call is ignored.
#[cfg(windows)]
fn cancel_capture_io(&self) {
    crate::rp_rust_debug_scope!("running_process::NativeProcess::cancel_capture_io");
    use winapi::shared::ntdef::HANDLE;
    use winapi::um::ioapiset::CancelIoEx;

    let registry = self
        .capture_pipe_handles
        .lock()
        .expect("capture pipe handles mutex poisoned");
    let cancel = |slot: Option<usize>| {
        if let Some(raw) = slot {
            // SAFETY: a reader clears its slot (under this same mutex) before
            // it drops the pipe, so a handle still present here has not been
            // closed while we hold the lock.
            unsafe {
                CancelIoEx(raw as HANDLE, std::ptr::null_mut());
            }
        }
    };
    cancel(registry.stdout);
    cancel(registry.stderr);
}
926
/// Records `code` as the process exit code and wakes every thread waiting on
/// the shared condition variable.
fn set_returncode(&self, code: i32) {
    // Stored as `i64` so that `RETURNCODE_NOT_SET` stays distinguishable.
    self.shared.returncode.store(code as i64, Ordering::Release);
    self.shared.condvar.notify_all();
}
931
932 fn wait_for_capture_completion_impl(&self) {
933 crate::rp_rust_debug_scope!(
934 "running_process::NativeProcess::wait_for_capture_completion"
935 );
936 if !self.config.capture {
937 return;
938 }
939
940 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
941 while !(guard.stdout_closed && guard.stderr_closed) {
942 guard = self
943 .shared
944 .condvar
945 .wait(guard)
946 .expect("queue mutex poisoned");
947 }
948 }
949
950 fn wait_for_capture_completion_with_deadline_impl(&self, deadline: Instant) -> bool {
958 crate::rp_rust_debug_scope!(
959 "running_process::NativeProcess::wait_for_capture_completion_with_deadline"
960 );
961 if !self.config.capture {
962 return true;
963 }
964
965 let mut guard = self.shared.queues.lock().expect("queue mutex poisoned");
966 while !(guard.stdout_closed && guard.stderr_closed) {
967 let now = Instant::now();
968 if now >= deadline {
969 guard.stdout_closed = true;
970 guard.stderr_closed = true;
971 self.shared.condvar.notify_all();
972 return false;
973 }
974 let (next_guard, result) = self
975 .shared
976 .condvar
977 .wait_timeout(guard, deadline - now)
978 .expect("queue mutex poisoned");
979 guard = next_guard;
980 if result.timed_out() && !(guard.stdout_closed && guard.stderr_closed) {
981 guard.stdout_closed = true;
982 guard.stderr_closed = true;
983 self.shared.condvar.notify_all();
984 return false;
985 }
986 }
987 true
988 }
989}
990
991fn emit_lines(shared: &Arc<SharedState>, stream: StreamKind, lines: Vec<Vec<u8>>) {
992 if lines.is_empty() {
993 return;
994 }
995 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
996 for line in lines {
997 let line_len = line.len();
998 match stream {
999 StreamKind::Stdout => {
1000 guard.stdout_history_bytes += line_len;
1001 guard.stdout_history.push_back(line.clone());
1002 guard.stdout_queue.push_back(line.clone());
1003 }
1004 StreamKind::Stderr => {
1005 guard.stderr_history_bytes += line_len;
1006 guard.stderr_history.push_back(line.clone());
1007 guard.stderr_queue.push_back(line.clone());
1008 }
1009 }
1010 let event = StreamEvent { stream, line };
1011 guard.combined_history_bytes += line_len;
1012 guard.combined_history.push_back(event.clone());
1013 guard.combined_queue.push_back(event);
1014 }
1015 shared.condvar.notify_all();
1016}
1017
1018fn append_raw(shared: &Arc<SharedState>, stream: StreamKind, chunk: &[u8]) {
1019 if chunk.is_empty() {
1020 return;
1021 }
1022 let mut guard = shared.queues.lock().expect("queue mutex poisoned");
1023 match stream {
1024 StreamKind::Stdout => guard.stdout_raw.extend_from_slice(chunk),
1025 StreamKind::Stderr => guard.stderr_raw.extend_from_slice(chunk),
1026 }
1027}
1028
1029pub fn run_command(
1035 mut config: ProcessConfig,
1036 timeout: Option<Duration>,
1037) -> Result<RunOutput, ProcessError> {
1038 config.capture = true;
1039 let process = NativeProcess::new(config);
1040 process.start()?;
1041
1042 let exit_code = match process.wait(timeout) {
1043 Ok(code) => code,
1044 Err(ProcessError::Timeout) => {
1045 match process.kill() {
1046 Ok(()) | Err(ProcessError::NotRunning) => {}
1047 Err(error) => return Err(error),
1048 }
1049 return Err(ProcessError::Timeout);
1050 }
1051 Err(error) => return Err(error),
1052 };
1053
1054 Ok(RunOutput {
1055 stdout: process.captured_stdout_raw(),
1056 stderr: process.captured_stderr_raw(),
1057 exit_code,
1058 })
1059}
1060
/// Wraps `command` in the platform shell.
///
/// * Windows: `cmd /D /S /C "<command>"`, passed as raw arguments so the
///   command text reaches `cmd` unescaped.
/// * Elsewhere: `sh -lc <command>`.
pub(crate) fn shell_command(command: &str) -> Command {
    #[cfg(windows)]
    {
        use std::os::windows::process::CommandExt;

        let mut shell = Command::new("cmd");
        shell
            .raw_arg("/D /S /C \"")
            .raw_arg(command)
            .raw_arg("\"");
        shell
    }
    #[cfg(not(windows))]
    {
        let mut shell = Command::new("sh");
        shell.args(["-lc", command]);
        shell
    }
}
1079
1080#[cfg(test)]
1081mod tests;