1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
12pub mod reexports {
14 pub use portable_pty;
15}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
27#[derive(Debug, Error)]
28pub enum PtyError {
29 #[error("pseudo-terminal process already started")]
30 AlreadyStarted,
31 #[error("pseudo-terminal process is not running")]
32 NotRunning,
33 #[error("pseudo-terminal timed out")]
34 Timeout,
35 #[error("pseudo-terminal I/O error: {0}")]
36 Io(#[from] std::io::Error),
37 #[error("pseudo-terminal spawn failed: {0}")]
38 Spawn(String),
39 #[error("pseudo-terminal error: {0}")]
40 Other(String),
41}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44 if matches!(
45 err.kind(),
46 std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47 ) {
48 return true;
49 }
50 #[cfg(unix)]
51 if err.raw_os_error() == Some(libc::ESRCH) {
52 return true;
53 }
54 false
55}
56
57pub struct PtyReadState {
58 pub chunks: VecDeque<Vec<u8>>,
59 pub closed: bool,
60}
61
62pub struct PtyReadShared {
63 pub state: Mutex<PtyReadState>,
64 pub condvar: Condvar,
65}
66
67pub struct NativePtyHandles {
68 pub master: Box<dyn MasterPty + Send>,
69 pub writer: Box<dyn Write + Send>,
70 pub child: Box<dyn portable_pty::Child + Send + Sync>,
71 #[cfg(windows)]
72 pub _job: WindowsJobHandle,
73}
74
75#[cfg(windows)]
76pub struct WindowsJobHandle(pub usize);
77
78#[cfg(windows)]
79impl WindowsJobHandle {
80 pub fn assign_pid(&self, pid: u32) -> Result<(), std::io::Error> {
82 use winapi::um::handleapi::CloseHandle;
83 use winapi::um::processthreadsapi::OpenProcess;
84 use winapi::um::winnt::PROCESS_SET_QUOTA;
85 use winapi::um::winnt::PROCESS_TERMINATE;
86
87 let handle = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
88 if handle.is_null() {
89 return Err(std::io::Error::last_os_error());
90 }
91 let result = unsafe {
92 winapi::um::jobapi2::AssignProcessToJobObject(
93 self.0 as winapi::shared::ntdef::HANDLE,
94 handle,
95 )
96 };
97 unsafe { CloseHandle(handle) };
98 if result == 0 {
99 return Err(std::io::Error::last_os_error());
100 }
101 Ok(())
102 }
103}
104
105#[cfg(windows)]
106impl Drop for WindowsJobHandle {
107 fn drop(&mut self) {
108 unsafe {
109 winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
110 }
111 }
112}
113
114pub struct IdleMonitorState {
115 pub last_reset_at: Instant,
116 pub returncode: Option<i32>,
117 pub interrupted: bool,
118}
119
120pub struct IdleDetectorCore {
123 pub timeout_seconds: f64,
124 pub stability_window_seconds: f64,
125 pub sample_interval_seconds: f64,
126 pub reset_on_input: bool,
127 pub reset_on_output: bool,
128 pub count_control_churn_as_output: bool,
129 pub enabled: Arc<AtomicBool>,
130 pub state: Mutex<IdleMonitorState>,
131 pub condvar: Condvar,
132}
133
134impl IdleDetectorCore {
135 pub fn record_input(&self, byte_count: usize) {
136 if !self.reset_on_input || byte_count == 0 {
137 return;
138 }
139 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
140 guard.last_reset_at = Instant::now();
141 self.condvar.notify_all();
142 }
143
144 pub fn record_output(&self, data: &[u8]) {
145 if !self.reset_on_output || data.is_empty() {
146 return;
147 }
148 let control_bytes = control_churn_bytes(data);
149 let visible_output_bytes = data.len().saturating_sub(control_bytes);
150 let active_output =
151 visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
152 if !active_output {
153 return;
154 }
155 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
156 guard.last_reset_at = Instant::now();
157 self.condvar.notify_all();
158 }
159
160 pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
161 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
162 guard.returncode = Some(returncode);
163 guard.interrupted = interrupted;
164 self.condvar.notify_all();
165 }
166
167 pub fn enabled(&self) -> bool {
168 self.enabled.load(Ordering::Acquire)
169 }
170
171 pub fn set_enabled(&self, enabled: bool) {
172 let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
173 if enabled && !was_enabled {
174 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
175 guard.last_reset_at = Instant::now();
176 }
177 self.condvar.notify_all();
178 }
179
180 pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
181 let started = Instant::now();
182 let overall_timeout = timeout.map(Duration::from_secs_f64);
183 let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
184 let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
185
186 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
187 loop {
188 let now = Instant::now();
189 let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
190
191 if let Some(returncode) = guard.returncode {
192 let reason = if guard.interrupted {
193 "interrupt"
194 } else {
195 "process_exit"
196 };
197 return (false, reason.to_string(), idle_for, Some(returncode));
198 }
199
200 let enabled = self.enabled.load(Ordering::Acquire);
201 if enabled && idle_for >= min_idle {
202 return (true, "idle_timeout".to_string(), idle_for, None);
203 }
204
205 if let Some(limit) = overall_timeout {
206 if now.duration_since(started) >= limit {
207 return (false, "timeout".to_string(), idle_for, None);
208 }
209 }
210
211 let idle_remaining = if enabled {
212 (min_idle - idle_for).max(0.0)
213 } else {
214 sample_interval.as_secs_f64()
215 };
216 let mut wait_for =
217 sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
218 if let Some(limit) = overall_timeout {
219 let elapsed = now.duration_since(started);
220 if elapsed < limit {
221 let remaining = limit - elapsed;
222 wait_for = wait_for.min(remaining);
223 }
224 }
225 let result = self
226 .condvar
227 .wait_timeout(guard, wait_for)
228 .expect("idle monitor mutex poisoned");
229 guard = result.0;
230 }
231 }
232}
233
234pub struct NativePtyProcess {
235 pub argv: Vec<String>,
236 pub cwd: Option<String>,
237 pub env: Option<Vec<(String, String)>>,
238 pub rows: u16,
239 pub cols: u16,
240 #[cfg(windows)]
241 pub nice: Option<i32>,
242 pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
243 pub reader: Arc<PtyReadShared>,
244 pub returncode: Arc<Mutex<Option<i32>>>,
245 pub input_bytes_total: Arc<AtomicUsize>,
246 pub newline_events_total: Arc<AtomicUsize>,
247 pub submit_events_total: Arc<AtomicUsize>,
248 pub echo: Arc<AtomicBool>,
250 pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
252 pub output_bytes_total: Arc<AtomicUsize>,
254 pub control_churn_bytes_total: Arc<AtomicUsize>,
256 #[cfg(windows)]
257 pub terminal_input_relay_stop: Arc<AtomicBool>,
258 #[cfg(windows)]
259 pub terminal_input_relay_active: Arc<AtomicBool>,
260 #[cfg(windows)]
261 pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
262}
263
264fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
265 cwd.map(str::to_owned).or_else(|| {
266 std::env::current_dir()
267 .ok()
268 .map(|cwd| cwd.to_string_lossy().to_string())
269 })
270}
271
272impl NativePtyProcess {
273 pub fn new(
274 argv: Vec<String>,
275 cwd: Option<String>,
276 env: Option<Vec<(String, String)>>,
277 rows: u16,
278 cols: u16,
279 nice: Option<i32>,
280 ) -> Result<Self, PtyError> {
281 if argv.is_empty() {
282 return Err(PtyError::Other("command cannot be empty".into()));
283 }
284 #[cfg(not(windows))]
285 let _ = nice;
286 Ok(Self {
287 argv,
288 cwd,
289 env,
290 rows,
291 cols,
292 #[cfg(windows)]
293 nice,
294 handles: Arc::new(Mutex::new(None)),
295 reader: Arc::new(PtyReadShared {
296 state: Mutex::new(PtyReadState {
297 chunks: VecDeque::new(),
298 closed: false,
299 }),
300 condvar: Condvar::new(),
301 }),
302 returncode: Arc::new(Mutex::new(None)),
303 input_bytes_total: Arc::new(AtomicUsize::new(0)),
304 newline_events_total: Arc::new(AtomicUsize::new(0)),
305 submit_events_total: Arc::new(AtomicUsize::new(0)),
306 echo: Arc::new(AtomicBool::new(false)),
307 idle_detector: Arc::new(Mutex::new(None)),
308 output_bytes_total: Arc::new(AtomicUsize::new(0)),
309 control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
310 #[cfg(windows)]
311 terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
312 #[cfg(windows)]
313 terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
314 #[cfg(windows)]
315 terminal_input_relay_worker: Mutex::new(None),
316 })
317 }
318
319 pub fn mark_reader_closed(&self) {
320 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
321 guard.closed = true;
322 self.reader.condvar.notify_all();
323 }
324
325 pub fn store_returncode(&self, code: i32) {
326 store_pty_returncode(&self.returncode, code);
327 }
328
329 pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
330 record_pty_input_metrics(
331 &self.input_bytes_total,
332 &self.newline_events_total,
333 &self.submit_events_total,
334 data,
335 submit,
336 );
337 }
338
339 pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
340 self.record_input_metrics(data, submit);
341 write_pty_input(&self.handles, data)?;
342 Ok(())
343 }
344
345 #[cfg(windows)]
346 pub fn request_terminal_input_relay_stop(&self) {
347 self.terminal_input_relay_stop
348 .store(true, Ordering::Release);
349 self.terminal_input_relay_active
350 .store(false, Ordering::Release);
351 }
352
353 #[cfg(windows)]
354 pub fn stop_terminal_input_relay_impl(&self) {
355 self.request_terminal_input_relay_stop();
356 if let Some(worker) = self
357 .terminal_input_relay_worker
358 .lock()
359 .expect("pty terminal input relay mutex poisoned")
360 .take()
361 {
362 let _ = worker.join();
363 }
364 }
365
366 #[cfg(not(windows))]
367 pub fn stop_terminal_input_relay_impl(&self) {}
368
369 #[inline(never)]
371 pub fn close_impl(&self) -> Result<(), PtyError> {
372 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
373 self.stop_terminal_input_relay_impl();
374 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
375 let Some(handles) = guard.take() else {
376 self.mark_reader_closed();
377 return Ok(());
378 };
379 drop(guard);
380
381 #[cfg(windows)]
382 let NativePtyHandles {
383 master,
384 writer,
385 mut child,
386 _job,
387 } = handles;
388 #[cfg(not(windows))]
389 let NativePtyHandles {
390 master,
391 writer,
392 mut child,
393 } = handles;
394
395 if let Err(err) = child.kill() {
396 if !is_ignorable_process_control_error(&err) {
397 return Err(PtyError::Io(err));
398 }
399 }
400
401 drop(writer);
402 drop(master);
403
404 let code = match child.wait() {
405 Ok(status) => portable_exit_code(status),
406 Err(_) => -9,
407 };
408 drop(child);
409 #[cfg(windows)]
410 drop(_job);
411
412 self.store_returncode(code);
413 self.mark_reader_closed();
414 Ok(())
415 }
416
417 #[inline(never)]
419 pub fn close_nonblocking(&self) {
420 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
421 #[cfg(windows)]
422 self.request_terminal_input_relay_stop();
423 let Ok(mut guard) = self.handles.lock() else {
424 return;
425 };
426 let Some(handles) = guard.take() else {
427 self.mark_reader_closed();
428 return;
429 };
430 drop(guard);
431
432 #[cfg(windows)]
433 let NativePtyHandles {
434 master,
435 writer,
436 mut child,
437 _job,
438 } = handles;
439 #[cfg(not(windows))]
440 let NativePtyHandles {
441 master,
442 writer,
443 mut child,
444 } = handles;
445
446 if let Err(err) = child.kill() {
447 if !is_ignorable_process_control_error(&err) {
448 return;
449 }
450 }
451 drop(writer);
452 drop(master);
453 drop(child);
454 #[cfg(windows)]
455 drop(_job);
456 self.mark_reader_closed();
457 }
458
459 pub fn start_impl(&self) -> Result<(), PtyError> {
460 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
461 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
462 if guard.is_some() {
463 return Err(PtyError::AlreadyStarted);
464 }
465
466 #[cfg(windows)]
469 let conhost_pids_before = conhost_children_of_current_process();
470
471 let pty_system = native_pty_system();
472 let pair = pty_system
473 .openpty(PtySize {
474 rows: self.rows,
475 cols: self.cols,
476 pixel_width: 0,
477 pixel_height: 0,
478 })
479 .map_err(|e| PtyError::Spawn(e.to_string()))?;
480
481 let mut cmd = command_builder_from_argv(&self.argv);
482 let cwd = resolved_spawn_cwd(self.cwd.as_deref());
483 if let Some(cwd) = &cwd {
484 cmd.cwd(cwd);
485 }
486 if let Some(env) = &self.env {
487 cmd.env_clear();
488 for (key, value) in env {
489 cmd.env(key, value);
490 }
491 }
492
493 let reader = pair
494 .master
495 .try_clone_reader()
496 .map_err(|e| PtyError::Spawn(e.to_string()))?;
497 let writer = pair
498 .master
499 .take_writer()
500 .map_err(|e| PtyError::Spawn(e.to_string()))?;
501 let child = pair
502 .slave
503 .spawn_command(cmd)
504 .map_err(|e| PtyError::Spawn(e.to_string()))?;
505 #[cfg(windows)]
506 let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
507 #[cfg(windows)]
508 assign_conpty_conhost_to_job(&job, &conhost_pids_before);
509 #[cfg(windows)]
510 apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
511 let shared = Arc::clone(&self.reader);
512 let echo = Arc::clone(&self.echo);
513 let idle_detector = Arc::clone(&self.idle_detector);
514 let output_bytes = Arc::clone(&self.output_bytes_total);
515 let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
516 thread::spawn(move || {
517 spawn_pty_reader(
518 reader,
519 shared,
520 echo,
521 idle_detector,
522 output_bytes,
523 churn_bytes,
524 );
525 });
526
527 *guard = Some(NativePtyHandles {
528 master: pair.master,
529 writer,
530 child,
531 #[cfg(windows)]
532 _job: job,
533 });
534 Ok(())
535 }
536
537 pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
538 #[cfg(windows)]
539 {
540 pty_windows::respond_to_queries(self, data)
541 }
542
543 #[cfg(unix)]
544 {
545 pty_platform::respond_to_queries(self, data)
546 }
547 }
548
549 pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
550 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
551 let guard = self.handles.lock().expect("pty handles mutex poisoned");
552 if let Some(handles) = guard.as_ref() {
553 handles
554 .master
555 .resize(PtySize {
556 rows,
557 cols,
558 pixel_width: 0,
559 pixel_height: 0,
560 })
561 .map_err(|e| PtyError::Other(e.to_string()))?;
562 }
563 Ok(())
564 }
565
566 pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
567 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
568 #[cfg(windows)]
569 {
570 pty_windows::send_interrupt(self)
571 }
572
573 #[cfg(unix)]
574 {
575 pty_platform::send_interrupt(self)
576 }
577 }
578
579 pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
580 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
581 if let Some(code) = *self
583 .returncode
584 .lock()
585 .expect("pty returncode mutex poisoned")
586 {
587 return Ok(code);
588 }
589 let start = Instant::now();
590 loop {
591 if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
592 return Ok(code);
593 }
594 if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
595 return Err(PtyError::Timeout);
596 }
597 thread::sleep(Duration::from_millis(10));
598 }
599 }
600
601 pub fn terminate_impl(&self) -> Result<(), PtyError> {
602 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
603 #[cfg(windows)]
604 {
605 pty_windows::terminate(self)
606 }
607
608 #[cfg(unix)]
609 {
610 pty_platform::terminate(self)
611 }
612 }
613
614 pub fn kill_impl(&self) -> Result<(), PtyError> {
615 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
616 #[cfg(windows)]
617 {
618 pty_windows::kill(self)
619 }
620
621 #[cfg(unix)]
622 {
623 pty_platform::kill(self)
624 }
625 }
626
627 pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
628 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
629 #[cfg(windows)]
630 {
631 pty_windows::terminate_tree(self)
632 }
633
634 #[cfg(unix)]
635 {
636 pty_platform::terminate_tree(self)
637 }
638 }
639
640 pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
641 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
642 #[cfg(windows)]
643 {
644 pty_windows::kill_tree(self)
645 }
646
647 #[cfg(unix)]
648 {
649 pty_platform::kill_tree(self)
650 }
651 }
652
653 pub fn pid(&self) -> Result<Option<u32>, PtyError> {
655 let guard = self.handles.lock().expect("pty handles mutex poisoned");
656 if let Some(handles) = guard.as_ref() {
657 #[cfg(unix)]
658 if let Some(pid) = handles.master.process_group_leader() {
659 if let Ok(pid) = u32::try_from(pid) {
660 return Ok(Some(pid));
661 }
662 }
663 return Ok(handles.child.process_id());
664 }
665 Ok(None)
666 }
667
668 pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
671 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
672 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
673 loop {
674 if let Some(chunk) = guard.chunks.pop_front() {
675 return Ok(Some(chunk));
676 }
677 if guard.closed {
678 return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
679 }
680 match deadline {
681 Some(deadline) => {
682 let now = Instant::now();
683 if now >= deadline {
684 return Ok(None); }
686 let wait = deadline.saturating_duration_since(now);
687 let result = self
688 .reader
689 .condvar
690 .wait_timeout(guard, wait)
691 .expect("pty read mutex poisoned");
692 guard = result.0;
693 }
694 None => {
695 guard = self
696 .reader
697 .condvar
698 .wait(guard)
699 .expect("pty read mutex poisoned");
700 }
701 }
702 }
703 }
704
705 pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
707 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
708 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
709 loop {
710 if guard.closed {
711 return true;
712 }
713 match deadline {
714 Some(deadline) => {
715 let now = Instant::now();
716 if now >= deadline {
717 return false;
718 }
719 let wait = deadline.saturating_duration_since(now);
720 let result = self
721 .reader
722 .condvar
723 .wait_timeout(guard, wait)
724 .expect("pty read mutex poisoned");
725 guard = result.0;
726 }
727 None => {
728 guard = self
729 .reader
730 .condvar
731 .wait(guard)
732 .expect("pty read mutex poisoned");
733 }
734 }
735 }
736 }
737
738 pub fn wait_and_drain_impl(
740 &self,
741 timeout: Option<f64>,
742 drain_timeout: f64,
743 ) -> Result<i32, PtyError> {
744 let code = self.wait_impl(timeout)?;
745 let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
746 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
747 while !guard.closed {
748 let remaining = deadline.saturating_duration_since(Instant::now());
749 if remaining.is_zero() {
750 break;
751 }
752 let result = self
753 .reader
754 .condvar
755 .wait_timeout(guard, remaining)
756 .expect("pty read mutex poisoned");
757 guard = result.0;
758 }
759 Ok(code)
760 }
761
762 pub fn set_echo(&self, enabled: bool) {
763 self.echo.store(enabled, Ordering::Release);
764 }
765
766 pub fn echo_enabled(&self) -> bool {
767 self.echo.load(Ordering::Acquire)
768 }
769
770 pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
771 let mut guard = self
772 .idle_detector
773 .lock()
774 .expect("idle detector mutex poisoned");
775 *guard = Some(Arc::clone(detector));
776 }
777
778 pub fn detach_idle_detector(&self) {
779 let mut guard = self
780 .idle_detector
781 .lock()
782 .expect("idle detector mutex poisoned");
783 *guard = None;
784 }
785
786 pub fn pty_input_bytes_total(&self) -> usize {
787 self.input_bytes_total.load(Ordering::Acquire)
788 }
789
790 pub fn pty_newline_events_total(&self) -> usize {
791 self.newline_events_total.load(Ordering::Acquire)
792 }
793
794 pub fn pty_submit_events_total(&self) -> usize {
795 self.submit_events_total.load(Ordering::Acquire)
796 }
797
798 pub fn pty_output_bytes_total(&self) -> usize {
799 self.output_bytes_total.load(Ordering::Acquire)
800 }
801
802 pub fn pty_control_churn_bytes_total(&self) -> usize {
803 self.control_churn_bytes_total.load(Ordering::Acquire)
804 }
805}
806
807impl Drop for NativePtyProcess {
808 fn drop(&mut self) {
809 self.close_nonblocking();
810 }
811}
812
813pub fn control_churn_bytes(data: &[u8]) -> usize {
816 let mut total = 0;
817 let mut index = 0;
818 while index < data.len() {
819 let byte = data[index];
820 if byte == 0x1B {
821 let start = index;
822 index += 1;
823 if index < data.len() && data[index] == b'[' {
824 index += 1;
825 while index < data.len() {
826 let current = data[index];
827 index += 1;
828 if (0x40..=0x7E).contains(¤t) {
829 break;
830 }
831 }
832 }
833 total += index - start;
834 continue;
835 }
836 if matches!(byte, 0x08 | 0x0D | 0x7F) {
837 total += 1;
838 }
839 index += 1;
840 }
841 total
842}
843
844pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
845 let mut command = CommandBuilder::new(&argv[0]);
846 if argv.len() > 1 {
847 command.args(
848 argv[1..]
849 .iter()
850 .map(OsString::from)
851 .collect::<Vec<OsString>>(),
852 );
853 }
854 command
855}
856
857#[inline(never)]
858pub fn spawn_pty_reader(
859 mut reader: Box<dyn Read + Send>,
860 shared: Arc<PtyReadShared>,
861 echo: Arc<AtomicBool>,
862 idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
863 output_bytes_total: Arc<AtomicUsize>,
864 control_churn_bytes_total: Arc<AtomicUsize>,
865) {
866 crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
867 let idle_detector_snapshot = idle_detector
868 .lock()
869 .expect("idle detector mutex poisoned")
870 .clone();
871 let mut chunk = vec![0_u8; 65536];
872 loop {
873 match reader.read(&mut chunk) {
874 Ok(0) => break,
875 Ok(n) => {
876 let data = &chunk[..n];
877
878 let churn = control_churn_bytes(data);
879 let visible = data.len().saturating_sub(churn);
880 output_bytes_total.fetch_add(visible, Ordering::Relaxed);
881 control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
882
883 if echo.load(Ordering::Relaxed) {
884 let _ = std::io::stdout().write_all(data);
885 let _ = std::io::stdout().flush();
886 }
887
888 if let Some(ref detector) = idle_detector_snapshot {
889 detector.record_output(data);
890 }
891
892 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
893 guard.chunks.push_back(data.to_vec());
894 shared.condvar.notify_all();
895 }
896 Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
897 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
898 thread::sleep(Duration::from_millis(10));
899 continue;
900 }
901 Err(_) => break,
902 }
903 }
904 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
905 guard.closed = true;
906 shared.condvar.notify_all();
907}
908
909pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
910 if let Some(signal) = status.signal() {
911 let signal = signal.to_ascii_lowercase();
912 if signal.contains("interrupt") {
913 return -2;
914 }
915 if signal.contains("terminated") {
916 return -15;
917 }
918 if signal.contains("killed") {
919 return -9;
920 }
921 }
922 status.exit_code() as i32
923}
924
925pub fn input_contains_newline(data: &[u8]) -> bool {
926 data.iter().any(|byte| matches!(*byte, b'\r' | b'\n'))
927}
928
929pub fn record_pty_input_metrics(
930 input_bytes_total: &Arc<AtomicUsize>,
931 newline_events_total: &Arc<AtomicUsize>,
932 submit_events_total: &Arc<AtomicUsize>,
933 data: &[u8],
934 submit: bool,
935) {
936 input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
937 if input_contains_newline(data) {
938 newline_events_total.fetch_add(1, Ordering::AcqRel);
939 }
940 if submit {
941 submit_events_total.fetch_add(1, Ordering::AcqRel);
942 }
943}
944
945pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
946 *returncode.lock().expect("pty returncode mutex poisoned") = Some(code);
947}
948
949pub fn poll_pty_process(
950 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
951 returncode: &Arc<Mutex<Option<i32>>>,
952) -> Result<Option<i32>, std::io::Error> {
953 let mut guard = handles.lock().expect("pty handles mutex poisoned");
954 let Some(handles) = guard.as_mut() else {
955 return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
956 };
957 let status = handles.child.try_wait()?;
958 let code = status.map(portable_exit_code);
959 if let Some(code) = code {
960 store_pty_returncode(returncode, code);
961 return Ok(Some(code));
962 }
963 Ok(None)
964}
965
966pub fn write_pty_input(
967 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
968 data: &[u8],
969) -> Result<(), std::io::Error> {
970 let mut guard = handles.lock().expect("pty handles mutex poisoned");
971 let handles = guard.as_mut().ok_or_else(|| {
972 std::io::Error::new(
973 std::io::ErrorKind::NotConnected,
974 "Pseudo-terminal process is not running",
975 )
976 })?;
977 #[cfg(windows)]
978 let payload = pty_windows::input_payload(data);
979 #[cfg(unix)]
980 let payload = pty_platform::input_payload(data);
981 handles.writer.write_all(&payload)?;
982 handles.writer.flush()
983}
984
985#[cfg(windows)]
986pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
987 let mut translated = Vec::with_capacity(data.len());
988 let mut index = 0usize;
989 while index < data.len() {
990 let current = data[index];
991 if current == b'\r' {
992 translated.push(current);
993 if index + 1 < data.len() && data[index + 1] == b'\n' {
994 translated.push(b'\n');
995 index += 2;
996 continue;
997 }
998 index += 1;
999 continue;
1000 }
1001 if current == b'\n' {
1002 translated.push(b'\r');
1003 index += 1;
1004 continue;
1005 }
1006 translated.push(current);
1007 index += 1;
1008 }
1009 translated
1010}
1011
1012#[cfg(windows)]
1013#[inline(never)]
1014pub fn assign_child_to_windows_kill_on_close_job(
1015 handle: Option<std::os::windows::io::RawHandle>,
1016) -> Result<WindowsJobHandle, PtyError> {
1017 crate::rp_rust_debug_scope!(
1018 "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
1019 );
1020 use std::mem::zeroed;
1021
1022 use winapi::shared::minwindef::FALSE;
1023 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
1024 use winapi::um::jobapi2::{
1025 AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
1026 };
1027 use winapi::um::winnt::{
1028 JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
1029 JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
1030 };
1031
1032 let Some(handle) = handle else {
1033 return Err(PtyError::Other(
1034 "Pseudo-terminal child does not expose a Windows process handle".into(),
1035 ));
1036 };
1037
1038 let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
1039 if job.is_null() || job == INVALID_HANDLE_VALUE {
1040 return Err(PtyError::Io(std::io::Error::last_os_error()));
1041 }
1042
1043 let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
1044 info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
1045 let result = unsafe {
1046 SetInformationJobObject(
1047 job,
1048 JobObjectExtendedLimitInformation,
1049 (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
1050 std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
1051 )
1052 };
1053 if result == FALSE {
1054 let err = std::io::Error::last_os_error();
1055 unsafe {
1056 winapi::um::handleapi::CloseHandle(job);
1057 }
1058 return Err(PtyError::Io(err));
1059 }
1060
1061 let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
1062 if result == FALSE {
1063 let err = std::io::Error::last_os_error();
1064 unsafe {
1065 winapi::um::handleapi::CloseHandle(job);
1066 }
1067 return Err(PtyError::Io(err));
1068 }
1069
1070 Ok(WindowsJobHandle(job as usize))
1071}
1072
1073#[cfg(windows)]
1075#[derive(Debug, Clone)]
1076pub struct ChildProcessInfo {
1077 pub pid: u32,
1078 pub name: String,
1079}
1080
1081#[cfg(windows)]
1084pub fn find_child_processes(parent_pid: u32) -> Vec<ChildProcessInfo> {
1085 use winapi::um::handleapi::CloseHandle;
1086 use winapi::um::tlhelp32::{
1087 CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1088 };
1089
1090 let mut children = Vec::new();
1091 let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1092 if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1093 return children;
1094 }
1095
1096 let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1097 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1098
1099 if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1100 loop {
1101 if entry.th32ParentProcessID == parent_pid {
1102 let name_bytes = &entry.szExeFile;
1103 let name_len = name_bytes
1104 .iter()
1105 .position(|&b| b == 0)
1106 .unwrap_or(name_bytes.len());
1107 let name = String::from_utf8_lossy(
1108 &name_bytes[..name_len]
1109 .iter()
1110 .map(|&c| c as u8)
1111 .collect::<Vec<u8>>(),
1112 )
1113 .into_owned();
1114 children.push(ChildProcessInfo {
1115 pid: entry.th32ProcessID,
1116 name,
1117 });
1118 }
1119 if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1120 break;
1121 }
1122 }
1123 }
1124
1125 unsafe { CloseHandle(snapshot) };
1126 children
1127}
1128
1129#[cfg(windows)]
1131fn conhost_children_of_current_process() -> Vec<u32> {
1132 let our_pid = std::process::id();
1133 find_child_processes(our_pid)
1134 .into_iter()
1135 .filter(|c| c.name.eq_ignore_ascii_case("conhost.exe"))
1136 .map(|c| c.pid)
1137 .collect()
1138}
1139
1140#[cfg(windows)]
1144fn assign_conpty_conhost_to_job(job: &WindowsJobHandle, before_pids: &[u32]) {
1145 let after_pids = conhost_children_of_current_process();
1146 for pid in after_pids {
1147 if !before_pids.contains(&pid) {
1148 let _ = job.assign_pid(pid);
1150 }
1151 }
1152}
1153
1154#[cfg(windows)]
1157#[derive(Debug, Clone)]
1158pub struct OrphanConhostInfo {
1159 pub pid: u32,
1161 pub parent_pid: u32,
1163 pub parent_name: String,
1165}
1166
1167#[cfg(windows)]
1173pub fn find_orphan_conhosts() -> Vec<OrphanConhostInfo> {
1174 use winapi::um::handleapi::CloseHandle;
1175 use winapi::um::tlhelp32::{
1176 CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1177 };
1178
1179 let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1180 if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1181 return Vec::new();
1182 }
1183
1184 let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1185 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1186
1187 let mut all_pids = std::collections::HashSet::new();
1189 let mut conhosts: Vec<(u32, u32)> = Vec::new(); let mut parent_names: std::collections::HashMap<u32, String> = std::collections::HashMap::new();
1191
1192 if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1193 loop {
1194 let name_bytes = &entry.szExeFile;
1195 let name_len = name_bytes
1196 .iter()
1197 .position(|&b| b == 0)
1198 .unwrap_or(name_bytes.len());
1199 let name = String::from_utf8_lossy(
1200 &name_bytes[..name_len]
1201 .iter()
1202 .map(|&c| c as u8)
1203 .collect::<Vec<u8>>(),
1204 )
1205 .into_owned();
1206
1207 all_pids.insert(entry.th32ProcessID);
1208 parent_names.insert(entry.th32ProcessID, name.clone());
1209
1210 if name.eq_ignore_ascii_case("conhost.exe") {
1211 conhosts.push((entry.th32ProcessID, entry.th32ParentProcessID));
1212 }
1213
1214 if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1215 break;
1216 }
1217 }
1218 }
1219
1220 unsafe { CloseHandle(snapshot) };
1221
1222 conhosts
1224 .into_iter()
1225 .filter(|&(_, parent_pid)| !all_pids.contains(&parent_pid))
1226 .map(|(pid, parent_pid)| OrphanConhostInfo {
1227 pid,
1228 parent_pid,
1229 parent_name: parent_names.get(&parent_pid).cloned().unwrap_or_default(),
1230 })
1231 .collect()
1232}
1233
1234#[cfg(windows)]
1235#[inline(never)]
1236pub fn apply_windows_pty_priority(
1237 handle: Option<std::os::windows::io::RawHandle>,
1238 nice: Option<i32>,
1239) -> Result<(), PtyError> {
1240 crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
1241 use winapi::um::processthreadsapi::SetPriorityClass;
1242 use winapi::um::winbase::{
1243 ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
1244 IDLE_PRIORITY_CLASS,
1245 };
1246
1247 let Some(handle) = handle else {
1248 return Ok(());
1249 };
1250 let flags = match nice {
1251 Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
1252 Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
1253 Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
1254 Some(value) if value <= -1 => ABOVE_NORMAL_PRIORITY_CLASS,
1255 _ => 0,
1256 };
1257 if flags == 0 {
1258 return Ok(());
1259 }
1260 let result = unsafe { SetPriorityClass(handle.cast(), flags) };
1261 if result == 0 {
1262 return Err(PtyError::Io(std::io::Error::last_os_error()));
1263 }
1264 Ok(())
1265}
1266
1267#[cfg(test)]
1268mod tests {
1269 use super::resolved_spawn_cwd;
1270
1271 #[test]
1272 fn resolved_spawn_cwd_preserves_explicit_value() {
1273 assert_eq!(
1274 resolved_spawn_cwd(Some("C:\\temp\\explicit")),
1275 Some("C:\\temp\\explicit".to_string())
1276 );
1277 }
1278
1279 #[test]
1280 fn resolved_spawn_cwd_defaults_to_current_dir_when_unset() {
1281 let expected = std::env::current_dir()
1282 .ok()
1283 .map(|cwd| cwd.to_string_lossy().to_string());
1284 assert_eq!(resolved_spawn_cwd(None), expected);
1285 }
1286}