1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
12pub mod reexports {
14 pub use portable_pty;
15}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
27#[derive(Debug, Error)]
28pub enum PtyError {
29 #[error("pseudo-terminal process already started")]
30 AlreadyStarted,
31 #[error("pseudo-terminal process is not running")]
32 NotRunning,
33 #[error("pseudo-terminal timed out")]
34 Timeout,
35 #[error("pseudo-terminal I/O error: {0}")]
36 Io(#[from] std::io::Error),
37 #[error("pseudo-terminal spawn failed: {0}")]
38 Spawn(String),
39 #[error("pseudo-terminal error: {0}")]
40 Other(String),
41}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44 if matches!(
45 err.kind(),
46 std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47 ) {
48 return true;
49 }
50 #[cfg(unix)]
51 if err.raw_os_error() == Some(libc::ESRCH) {
52 return true;
53 }
54 false
55}
56
57pub struct PtyReadState {
58 pub chunks: VecDeque<Vec<u8>>,
59 pub closed: bool,
60}
61
62pub struct PtyReadShared {
63 pub state: Mutex<PtyReadState>,
64 pub condvar: Condvar,
65}
66
67pub struct NativePtyHandles {
68 pub master: Box<dyn MasterPty + Send>,
69 pub writer: Box<dyn Write + Send>,
70 pub child: Box<dyn portable_pty::Child + Send + Sync>,
71 #[cfg(windows)]
72 pub _job: WindowsJobHandle,
73}
74
75#[cfg(windows)]
76pub struct WindowsJobHandle(pub usize);
77
78#[cfg(windows)]
79impl WindowsJobHandle {
80 pub fn assign_pid(&self, pid: u32) -> Result<(), std::io::Error> {
82 use winapi::um::handleapi::CloseHandle;
83 use winapi::um::processthreadsapi::OpenProcess;
84 use winapi::um::winnt::PROCESS_SET_QUOTA;
85 use winapi::um::winnt::PROCESS_TERMINATE;
86
87 let handle = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
88 if handle.is_null() {
89 return Err(std::io::Error::last_os_error());
90 }
91 let result = unsafe {
92 winapi::um::jobapi2::AssignProcessToJobObject(
93 self.0 as winapi::shared::ntdef::HANDLE,
94 handle,
95 )
96 };
97 unsafe { CloseHandle(handle) };
98 if result == 0 {
99 return Err(std::io::Error::last_os_error());
100 }
101 Ok(())
102 }
103}
104
105#[cfg(windows)]
106impl Drop for WindowsJobHandle {
107 fn drop(&mut self) {
108 unsafe {
109 winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
110 }
111 }
112}
113
114pub struct IdleMonitorState {
115 pub last_reset_at: Instant,
116 pub returncode: Option<i32>,
117 pub interrupted: bool,
118}
119
120pub struct IdleDetectorCore {
123 pub timeout_seconds: f64,
124 pub stability_window_seconds: f64,
125 pub sample_interval_seconds: f64,
126 pub reset_on_input: bool,
127 pub reset_on_output: bool,
128 pub count_control_churn_as_output: bool,
129 pub enabled: Arc<AtomicBool>,
130 pub state: Mutex<IdleMonitorState>,
131 pub condvar: Condvar,
132}
133
134impl IdleDetectorCore {
135 pub fn record_input(&self, byte_count: usize) {
136 if !self.reset_on_input || byte_count == 0 {
137 return;
138 }
139 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
140 guard.last_reset_at = Instant::now();
141 self.condvar.notify_all();
142 }
143
144 pub fn record_output(&self, data: &[u8]) {
145 if !self.reset_on_output || data.is_empty() {
146 return;
147 }
148 let control_bytes = control_churn_bytes(data);
149 let visible_output_bytes = data.len().saturating_sub(control_bytes);
150 let active_output =
151 visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
152 if !active_output {
153 return;
154 }
155 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
156 guard.last_reset_at = Instant::now();
157 self.condvar.notify_all();
158 }
159
160 pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
161 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
162 guard.returncode = Some(returncode);
163 guard.interrupted = interrupted;
164 self.condvar.notify_all();
165 }
166
167 pub fn enabled(&self) -> bool {
168 self.enabled.load(Ordering::Acquire)
169 }
170
171 pub fn set_enabled(&self, enabled: bool) {
172 let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
173 if enabled && !was_enabled {
174 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
175 guard.last_reset_at = Instant::now();
176 }
177 self.condvar.notify_all();
178 }
179
180 pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
181 let started = Instant::now();
182 let overall_timeout = timeout.map(Duration::from_secs_f64);
183 let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
184 let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
185
186 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
187 loop {
188 let now = Instant::now();
189 let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
190
191 if let Some(returncode) = guard.returncode {
192 let reason = if guard.interrupted {
193 "interrupt"
194 } else {
195 "process_exit"
196 };
197 return (false, reason.to_string(), idle_for, Some(returncode));
198 }
199
200 let enabled = self.enabled.load(Ordering::Acquire);
201 if enabled && idle_for >= min_idle {
202 return (true, "idle_timeout".to_string(), idle_for, None);
203 }
204
205 if let Some(limit) = overall_timeout {
206 if now.duration_since(started) >= limit {
207 return (false, "timeout".to_string(), idle_for, None);
208 }
209 }
210
211 let idle_remaining = if enabled {
212 (min_idle - idle_for).max(0.0)
213 } else {
214 sample_interval.as_secs_f64()
215 };
216 let mut wait_for =
217 sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
218 if let Some(limit) = overall_timeout {
219 let elapsed = now.duration_since(started);
220 if elapsed < limit {
221 let remaining = limit - elapsed;
222 wait_for = wait_for.min(remaining);
223 }
224 }
225 let result = self
226 .condvar
227 .wait_timeout(guard, wait_for)
228 .expect("idle monitor mutex poisoned");
229 guard = result.0;
230 }
231 }
232}
233
234pub struct NativePtyProcess {
235 pub argv: Vec<String>,
236 pub cwd: Option<String>,
237 pub env: Option<Vec<(String, String)>>,
238 pub rows: u16,
239 pub cols: u16,
240 #[cfg(windows)]
241 pub nice: Option<i32>,
242 pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
243 pub reader: Arc<PtyReadShared>,
244 pub returncode: Arc<Mutex<Option<i32>>>,
245 pub input_bytes_total: Arc<AtomicUsize>,
246 pub newline_events_total: Arc<AtomicUsize>,
247 pub submit_events_total: Arc<AtomicUsize>,
248 pub echo: Arc<AtomicBool>,
250 pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
252 pub output_bytes_total: Arc<AtomicUsize>,
254 pub control_churn_bytes_total: Arc<AtomicUsize>,
256 #[cfg(windows)]
257 pub terminal_input_relay_stop: Arc<AtomicBool>,
258 #[cfg(windows)]
259 pub terminal_input_relay_active: Arc<AtomicBool>,
260 #[cfg(windows)]
261 pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
262}
263
264impl NativePtyProcess {
265 pub fn new(
266 argv: Vec<String>,
267 cwd: Option<String>,
268 env: Option<Vec<(String, String)>>,
269 rows: u16,
270 cols: u16,
271 nice: Option<i32>,
272 ) -> Result<Self, PtyError> {
273 if argv.is_empty() {
274 return Err(PtyError::Other("command cannot be empty".into()));
275 }
276 #[cfg(not(windows))]
277 let _ = nice;
278 Ok(Self {
279 argv,
280 cwd,
281 env,
282 rows,
283 cols,
284 #[cfg(windows)]
285 nice,
286 handles: Arc::new(Mutex::new(None)),
287 reader: Arc::new(PtyReadShared {
288 state: Mutex::new(PtyReadState {
289 chunks: VecDeque::new(),
290 closed: false,
291 }),
292 condvar: Condvar::new(),
293 }),
294 returncode: Arc::new(Mutex::new(None)),
295 input_bytes_total: Arc::new(AtomicUsize::new(0)),
296 newline_events_total: Arc::new(AtomicUsize::new(0)),
297 submit_events_total: Arc::new(AtomicUsize::new(0)),
298 echo: Arc::new(AtomicBool::new(false)),
299 idle_detector: Arc::new(Mutex::new(None)),
300 output_bytes_total: Arc::new(AtomicUsize::new(0)),
301 control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
302 #[cfg(windows)]
303 terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
304 #[cfg(windows)]
305 terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
306 #[cfg(windows)]
307 terminal_input_relay_worker: Mutex::new(None),
308 })
309 }
310
311 pub fn mark_reader_closed(&self) {
312 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
313 guard.closed = true;
314 self.reader.condvar.notify_all();
315 }
316
317 pub fn store_returncode(&self, code: i32) {
318 store_pty_returncode(&self.returncode, code);
319 }
320
321 pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
322 record_pty_input_metrics(
323 &self.input_bytes_total,
324 &self.newline_events_total,
325 &self.submit_events_total,
326 data,
327 submit,
328 );
329 }
330
331 pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
332 self.record_input_metrics(data, submit);
333 write_pty_input(&self.handles, data)?;
334 Ok(())
335 }
336
337 #[cfg(windows)]
338 pub fn request_terminal_input_relay_stop(&self) {
339 self.terminal_input_relay_stop
340 .store(true, Ordering::Release);
341 self.terminal_input_relay_active
342 .store(false, Ordering::Release);
343 }
344
345 #[cfg(windows)]
346 pub fn stop_terminal_input_relay_impl(&self) {
347 self.request_terminal_input_relay_stop();
348 if let Some(worker) = self
349 .terminal_input_relay_worker
350 .lock()
351 .expect("pty terminal input relay mutex poisoned")
352 .take()
353 {
354 let _ = worker.join();
355 }
356 }
357
358 #[cfg(not(windows))]
359 pub fn stop_terminal_input_relay_impl(&self) {}
360
361 #[inline(never)]
363 pub fn close_impl(&self) -> Result<(), PtyError> {
364 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
365 self.stop_terminal_input_relay_impl();
366 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
367 let Some(handles) = guard.take() else {
368 self.mark_reader_closed();
369 return Ok(());
370 };
371 drop(guard);
372
373 #[cfg(windows)]
374 let NativePtyHandles {
375 master,
376 writer,
377 mut child,
378 _job,
379 } = handles;
380 #[cfg(not(windows))]
381 let NativePtyHandles {
382 master,
383 writer,
384 mut child,
385 } = handles;
386
387 if let Err(err) = child.kill() {
388 if !is_ignorable_process_control_error(&err) {
389 return Err(PtyError::Io(err));
390 }
391 }
392
393 drop(writer);
394 drop(master);
395
396 let code = match child.wait() {
397 Ok(status) => portable_exit_code(status),
398 Err(_) => -9,
399 };
400 drop(child);
401 #[cfg(windows)]
402 drop(_job);
403
404 self.store_returncode(code);
405 self.mark_reader_closed();
406 Ok(())
407 }
408
409 #[inline(never)]
411 pub fn close_nonblocking(&self) {
412 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
413 #[cfg(windows)]
414 self.request_terminal_input_relay_stop();
415 let Ok(mut guard) = self.handles.lock() else {
416 return;
417 };
418 let Some(handles) = guard.take() else {
419 self.mark_reader_closed();
420 return;
421 };
422 drop(guard);
423
424 #[cfg(windows)]
425 let NativePtyHandles {
426 master,
427 writer,
428 mut child,
429 _job,
430 } = handles;
431 #[cfg(not(windows))]
432 let NativePtyHandles {
433 master,
434 writer,
435 mut child,
436 } = handles;
437
438 if let Err(err) = child.kill() {
439 if !is_ignorable_process_control_error(&err) {
440 return;
441 }
442 }
443 drop(writer);
444 drop(master);
445 drop(child);
446 #[cfg(windows)]
447 drop(_job);
448 self.mark_reader_closed();
449 }
450
451 pub fn start_impl(&self) -> Result<(), PtyError> {
452 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
453 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
454 if guard.is_some() {
455 return Err(PtyError::AlreadyStarted);
456 }
457
458 #[cfg(windows)]
461 let conhost_pids_before = conhost_children_of_current_process();
462
463 let pty_system = native_pty_system();
464 let pair = pty_system
465 .openpty(PtySize {
466 rows: self.rows,
467 cols: self.cols,
468 pixel_width: 0,
469 pixel_height: 0,
470 })
471 .map_err(|e| PtyError::Spawn(e.to_string()))?;
472
473 let mut cmd = command_builder_from_argv(&self.argv);
474 if let Some(cwd) = &self.cwd {
475 cmd.cwd(cwd);
476 }
477 if let Some(env) = &self.env {
478 cmd.env_clear();
479 for (key, value) in env {
480 cmd.env(key, value);
481 }
482 }
483
484 let reader = pair.master.try_clone_reader().map_err(|e| PtyError::Spawn(e.to_string()))?;
485 let writer = pair.master.take_writer().map_err(|e| PtyError::Spawn(e.to_string()))?;
486 let child = pair.slave.spawn_command(cmd).map_err(|e| PtyError::Spawn(e.to_string()))?;
487 #[cfg(windows)]
488 let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
489 #[cfg(windows)]
490 assign_conpty_conhost_to_job(&job, &conhost_pids_before);
491 #[cfg(windows)]
492 apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
493 let shared = Arc::clone(&self.reader);
494 let echo = Arc::clone(&self.echo);
495 let idle_detector = Arc::clone(&self.idle_detector);
496 let output_bytes = Arc::clone(&self.output_bytes_total);
497 let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
498 thread::spawn(move || {
499 spawn_pty_reader(
500 reader,
501 shared,
502 echo,
503 idle_detector,
504 output_bytes,
505 churn_bytes,
506 );
507 });
508
509 *guard = Some(NativePtyHandles {
510 master: pair.master,
511 writer,
512 child,
513 #[cfg(windows)]
514 _job: job,
515 });
516 Ok(())
517 }
518
519 pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
520 #[cfg(windows)]
521 {
522 pty_windows::respond_to_queries(self, data)
523 }
524
525 #[cfg(unix)]
526 {
527 pty_platform::respond_to_queries(self, data)
528 }
529 }
530
531 pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
532 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
533 let guard = self.handles.lock().expect("pty handles mutex poisoned");
534 if let Some(handles) = guard.as_ref() {
535 handles
536 .master
537 .resize(PtySize {
538 rows,
539 cols,
540 pixel_width: 0,
541 pixel_height: 0,
542 })
543 .map_err(|e| PtyError::Other(e.to_string()))?;
544 }
545 Ok(())
546 }
547
548 pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
549 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
550 #[cfg(windows)]
551 {
552 pty_windows::send_interrupt(self)
553 }
554
555 #[cfg(unix)]
556 {
557 pty_platform::send_interrupt(self)
558 }
559 }
560
561 pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
562 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
563 if let Some(code) = *self.returncode.lock().expect("pty returncode mutex poisoned") {
565 return Ok(code);
566 }
567 let start = Instant::now();
568 loop {
569 if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
570 return Ok(code);
571 }
572 if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
573 return Err(PtyError::Timeout);
574 }
575 thread::sleep(Duration::from_millis(10));
576 }
577 }
578
579 pub fn terminate_impl(&self) -> Result<(), PtyError> {
580 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
581 #[cfg(windows)]
582 {
583 pty_windows::terminate(self)
584 }
585
586 #[cfg(unix)]
587 {
588 pty_platform::terminate(self)
589 }
590 }
591
592 pub fn kill_impl(&self) -> Result<(), PtyError> {
593 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
594 #[cfg(windows)]
595 {
596 pty_windows::kill(self)
597 }
598
599 #[cfg(unix)]
600 {
601 pty_platform::kill(self)
602 }
603 }
604
605 pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
606 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
607 #[cfg(windows)]
608 {
609 pty_windows::terminate_tree(self)
610 }
611
612 #[cfg(unix)]
613 {
614 pty_platform::terminate_tree(self)
615 }
616 }
617
618 pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
619 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
620 #[cfg(windows)]
621 {
622 pty_windows::kill_tree(self)
623 }
624
625 #[cfg(unix)]
626 {
627 pty_platform::kill_tree(self)
628 }
629 }
630
631 pub fn pid(&self) -> Result<Option<u32>, PtyError> {
633 let guard = self.handles.lock().expect("pty handles mutex poisoned");
634 if let Some(handles) = guard.as_ref() {
635 #[cfg(unix)]
636 if let Some(pid) = handles.master.process_group_leader() {
637 if let Ok(pid) = u32::try_from(pid) {
638 return Ok(Some(pid));
639 }
640 }
641 return Ok(handles.child.process_id());
642 }
643 Ok(None)
644 }
645
646 pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
649 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
650 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
651 loop {
652 if let Some(chunk) = guard.chunks.pop_front() {
653 return Ok(Some(chunk));
654 }
655 if guard.closed {
656 return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
657 }
658 match deadline {
659 Some(deadline) => {
660 let now = Instant::now();
661 if now >= deadline {
662 return Ok(None); }
664 let wait = deadline.saturating_duration_since(now);
665 let result = self
666 .reader
667 .condvar
668 .wait_timeout(guard, wait)
669 .expect("pty read mutex poisoned");
670 guard = result.0;
671 }
672 None => {
673 guard = self
674 .reader
675 .condvar
676 .wait(guard)
677 .expect("pty read mutex poisoned");
678 }
679 }
680 }
681 }
682
683 pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
685 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
686 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
687 loop {
688 if guard.closed {
689 return true;
690 }
691 match deadline {
692 Some(deadline) => {
693 let now = Instant::now();
694 if now >= deadline {
695 return false;
696 }
697 let wait = deadline.saturating_duration_since(now);
698 let result = self
699 .reader
700 .condvar
701 .wait_timeout(guard, wait)
702 .expect("pty read mutex poisoned");
703 guard = result.0;
704 }
705 None => {
706 guard = self
707 .reader
708 .condvar
709 .wait(guard)
710 .expect("pty read mutex poisoned");
711 }
712 }
713 }
714 }
715
716 pub fn wait_and_drain_impl(
718 &self,
719 timeout: Option<f64>,
720 drain_timeout: f64,
721 ) -> Result<i32, PtyError> {
722 let code = self.wait_impl(timeout)?;
723 let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
724 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
725 while !guard.closed {
726 let remaining = deadline.saturating_duration_since(Instant::now());
727 if remaining.is_zero() {
728 break;
729 }
730 let result = self
731 .reader
732 .condvar
733 .wait_timeout(guard, remaining)
734 .expect("pty read mutex poisoned");
735 guard = result.0;
736 }
737 Ok(code)
738 }
739
740 pub fn set_echo(&self, enabled: bool) {
741 self.echo.store(enabled, Ordering::Release);
742 }
743
744 pub fn echo_enabled(&self) -> bool {
745 self.echo.load(Ordering::Acquire)
746 }
747
748 pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
749 let mut guard = self
750 .idle_detector
751 .lock()
752 .expect("idle detector mutex poisoned");
753 *guard = Some(Arc::clone(detector));
754 }
755
756 pub fn detach_idle_detector(&self) {
757 let mut guard = self
758 .idle_detector
759 .lock()
760 .expect("idle detector mutex poisoned");
761 *guard = None;
762 }
763
764 pub fn pty_input_bytes_total(&self) -> usize {
765 self.input_bytes_total.load(Ordering::Acquire)
766 }
767
768 pub fn pty_newline_events_total(&self) -> usize {
769 self.newline_events_total.load(Ordering::Acquire)
770 }
771
772 pub fn pty_submit_events_total(&self) -> usize {
773 self.submit_events_total.load(Ordering::Acquire)
774 }
775
776 pub fn pty_output_bytes_total(&self) -> usize {
777 self.output_bytes_total.load(Ordering::Acquire)
778 }
779
780 pub fn pty_control_churn_bytes_total(&self) -> usize {
781 self.control_churn_bytes_total.load(Ordering::Acquire)
782 }
783}
784
785impl Drop for NativePtyProcess {
786 fn drop(&mut self) {
787 self.close_nonblocking();
788 }
789}
790
791pub fn control_churn_bytes(data: &[u8]) -> usize {
794 let mut total = 0;
795 let mut index = 0;
796 while index < data.len() {
797 let byte = data[index];
798 if byte == 0x1B {
799 let start = index;
800 index += 1;
801 if index < data.len() && data[index] == b'[' {
802 index += 1;
803 while index < data.len() {
804 let current = data[index];
805 index += 1;
806 if (0x40..=0x7E).contains(¤t) {
807 break;
808 }
809 }
810 }
811 total += index - start;
812 continue;
813 }
814 if matches!(byte, 0x08 | 0x0D | 0x7F) {
815 total += 1;
816 }
817 index += 1;
818 }
819 total
820}
821
822pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
823 let mut command = CommandBuilder::new(&argv[0]);
824 if argv.len() > 1 {
825 command.args(
826 argv[1..]
827 .iter()
828 .map(OsString::from)
829 .collect::<Vec<OsString>>(),
830 );
831 }
832 command
833}
834
835#[inline(never)]
836pub fn spawn_pty_reader(
837 mut reader: Box<dyn Read + Send>,
838 shared: Arc<PtyReadShared>,
839 echo: Arc<AtomicBool>,
840 idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
841 output_bytes_total: Arc<AtomicUsize>,
842 control_churn_bytes_total: Arc<AtomicUsize>,
843) {
844 crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
845 let idle_detector_snapshot = idle_detector
846 .lock()
847 .expect("idle detector mutex poisoned")
848 .clone();
849 let mut chunk = vec![0_u8; 65536];
850 loop {
851 match reader.read(&mut chunk) {
852 Ok(0) => break,
853 Ok(n) => {
854 let data = &chunk[..n];
855
856 let churn = control_churn_bytes(data);
857 let visible = data.len().saturating_sub(churn);
858 output_bytes_total.fetch_add(visible, Ordering::Relaxed);
859 control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
860
861 if echo.load(Ordering::Relaxed) {
862 let _ = std::io::stdout().write_all(data);
863 let _ = std::io::stdout().flush();
864 }
865
866 if let Some(ref detector) = idle_detector_snapshot {
867 detector.record_output(data);
868 }
869
870 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
871 guard.chunks.push_back(data.to_vec());
872 shared.condvar.notify_all();
873 }
874 Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
875 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
876 thread::sleep(Duration::from_millis(10));
877 continue;
878 }
879 Err(_) => break,
880 }
881 }
882 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
883 guard.closed = true;
884 shared.condvar.notify_all();
885}
886
887pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
888 if let Some(signal) = status.signal() {
889 let signal = signal.to_ascii_lowercase();
890 if signal.contains("interrupt") {
891 return -2;
892 }
893 if signal.contains("terminated") {
894 return -15;
895 }
896 if signal.contains("killed") {
897 return -9;
898 }
899 }
900 status.exit_code() as i32
901}
902
903pub fn input_contains_newline(data: &[u8]) -> bool {
904 data.iter().any(|byte| matches!(*byte, b'\r' | b'\n'))
905}
906
907pub fn record_pty_input_metrics(
908 input_bytes_total: &Arc<AtomicUsize>,
909 newline_events_total: &Arc<AtomicUsize>,
910 submit_events_total: &Arc<AtomicUsize>,
911 data: &[u8],
912 submit: bool,
913) {
914 input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
915 if input_contains_newline(data) {
916 newline_events_total.fetch_add(1, Ordering::AcqRel);
917 }
918 if submit {
919 submit_events_total.fetch_add(1, Ordering::AcqRel);
920 }
921}
922
923pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
924 *returncode.lock().expect("pty returncode mutex poisoned") = Some(code);
925}
926
927pub fn poll_pty_process(
928 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
929 returncode: &Arc<Mutex<Option<i32>>>,
930) -> Result<Option<i32>, std::io::Error> {
931 let mut guard = handles.lock().expect("pty handles mutex poisoned");
932 let Some(handles) = guard.as_mut() else {
933 return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
934 };
935 let status = handles.child.try_wait()?;
936 let code = status.map(portable_exit_code);
937 if let Some(code) = code {
938 store_pty_returncode(returncode, code);
939 return Ok(Some(code));
940 }
941 Ok(None)
942}
943
944pub fn write_pty_input(
945 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
946 data: &[u8],
947) -> Result<(), std::io::Error> {
948 let mut guard = handles.lock().expect("pty handles mutex poisoned");
949 let handles = guard.as_mut().ok_or_else(|| {
950 std::io::Error::new(
951 std::io::ErrorKind::NotConnected,
952 "Pseudo-terminal process is not running",
953 )
954 })?;
955 #[cfg(windows)]
956 let payload = pty_windows::input_payload(data);
957 #[cfg(unix)]
958 let payload = pty_platform::input_payload(data);
959 handles.writer.write_all(&payload)?;
960 handles.writer.flush()
961}
962
963#[cfg(windows)]
964pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
965 let mut translated = Vec::with_capacity(data.len());
966 let mut index = 0usize;
967 while index < data.len() {
968 let current = data[index];
969 if current == b'\r' {
970 translated.push(current);
971 if index + 1 < data.len() && data[index + 1] == b'\n' {
972 translated.push(b'\n');
973 index += 2;
974 continue;
975 }
976 index += 1;
977 continue;
978 }
979 if current == b'\n' {
980 translated.push(b'\r');
981 index += 1;
982 continue;
983 }
984 translated.push(current);
985 index += 1;
986 }
987 translated
988}
989
990#[cfg(windows)]
991#[inline(never)]
992pub fn assign_child_to_windows_kill_on_close_job(
993 handle: Option<std::os::windows::io::RawHandle>,
994) -> Result<WindowsJobHandle, PtyError> {
995 crate::rp_rust_debug_scope!(
996 "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
997 );
998 use std::mem::zeroed;
999
1000 use winapi::shared::minwindef::FALSE;
1001 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
1002 use winapi::um::jobapi2::{
1003 AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
1004 };
1005 use winapi::um::winnt::{
1006 JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
1007 JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
1008 };
1009
1010 let Some(handle) = handle else {
1011 return Err(PtyError::Other(
1012 "Pseudo-terminal child does not expose a Windows process handle".into(),
1013 ));
1014 };
1015
1016 let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
1017 if job.is_null() || job == INVALID_HANDLE_VALUE {
1018 return Err(PtyError::Io(std::io::Error::last_os_error()));
1019 }
1020
1021 let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
1022 info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
1023 let result = unsafe {
1024 SetInformationJobObject(
1025 job,
1026 JobObjectExtendedLimitInformation,
1027 (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
1028 std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
1029 )
1030 };
1031 if result == FALSE {
1032 let err = std::io::Error::last_os_error();
1033 unsafe {
1034 winapi::um::handleapi::CloseHandle(job);
1035 }
1036 return Err(PtyError::Io(err));
1037 }
1038
1039 let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
1040 if result == FALSE {
1041 let err = std::io::Error::last_os_error();
1042 unsafe {
1043 winapi::um::handleapi::CloseHandle(job);
1044 }
1045 return Err(PtyError::Io(err));
1046 }
1047
1048 Ok(WindowsJobHandle(job as usize))
1049}
1050
1051#[cfg(windows)]
1053#[derive(Debug, Clone)]
1054pub struct ChildProcessInfo {
1055 pub pid: u32,
1056 pub name: String,
1057}
1058
1059#[cfg(windows)]
1062pub fn find_child_processes(parent_pid: u32) -> Vec<ChildProcessInfo> {
1063 use winapi::um::handleapi::CloseHandle;
1064 use winapi::um::tlhelp32::{
1065 CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1066 };
1067
1068 let mut children = Vec::new();
1069 let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1070 if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1071 return children;
1072 }
1073
1074 let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1075 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1076
1077 if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1078 loop {
1079 if entry.th32ParentProcessID == parent_pid {
1080 let name_bytes = &entry.szExeFile;
1081 let name_len = name_bytes.iter().position(|&b| b == 0).unwrap_or(name_bytes.len());
1082 let name = String::from_utf8_lossy(
1083 &name_bytes[..name_len].iter().map(|&c| c as u8).collect::<Vec<u8>>(),
1084 )
1085 .into_owned();
1086 children.push(ChildProcessInfo {
1087 pid: entry.th32ProcessID,
1088 name,
1089 });
1090 }
1091 if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1092 break;
1093 }
1094 }
1095 }
1096
1097 unsafe { CloseHandle(snapshot) };
1098 children
1099}
1100
1101#[cfg(windows)]
1103fn conhost_children_of_current_process() -> Vec<u32> {
1104 let our_pid = std::process::id();
1105 find_child_processes(our_pid)
1106 .into_iter()
1107 .filter(|c| c.name.eq_ignore_ascii_case("conhost.exe"))
1108 .map(|c| c.pid)
1109 .collect()
1110}
1111
1112#[cfg(windows)]
1116fn assign_conpty_conhost_to_job(job: &WindowsJobHandle, before_pids: &[u32]) {
1117 let after_pids = conhost_children_of_current_process();
1118 for pid in after_pids {
1119 if !before_pids.contains(&pid) {
1120 let _ = job.assign_pid(pid);
1122 }
1123 }
1124}
1125
1126#[cfg(windows)]
1129#[derive(Debug, Clone)]
1130pub struct OrphanConhostInfo {
1131 pub pid: u32,
1133 pub parent_pid: u32,
1135 pub parent_name: String,
1137}
1138
1139#[cfg(windows)]
1145pub fn find_orphan_conhosts() -> Vec<OrphanConhostInfo> {
1146 use winapi::um::handleapi::CloseHandle;
1147 use winapi::um::tlhelp32::{
1148 CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1149 };
1150
1151 let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1152 if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1153 return Vec::new();
1154 }
1155
1156 let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1157 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1158
1159 let mut all_pids = std::collections::HashSet::new();
1161 let mut conhosts: Vec<(u32, u32)> = Vec::new(); let mut parent_names: std::collections::HashMap<u32, String> = std::collections::HashMap::new();
1163
1164 if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1165 loop {
1166 let name_bytes = &entry.szExeFile;
1167 let name_len = name_bytes.iter().position(|&b| b == 0).unwrap_or(name_bytes.len());
1168 let name = String::from_utf8_lossy(
1169 &name_bytes[..name_len].iter().map(|&c| c as u8).collect::<Vec<u8>>(),
1170 )
1171 .into_owned();
1172
1173 all_pids.insert(entry.th32ProcessID);
1174 parent_names.insert(entry.th32ProcessID, name.clone());
1175
1176 if name.eq_ignore_ascii_case("conhost.exe") {
1177 conhosts.push((entry.th32ProcessID, entry.th32ParentProcessID));
1178 }
1179
1180 if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1181 break;
1182 }
1183 }
1184 }
1185
1186 unsafe { CloseHandle(snapshot) };
1187
1188 conhosts
1190 .into_iter()
1191 .filter(|&(_, parent_pid)| !all_pids.contains(&parent_pid))
1192 .map(|(pid, parent_pid)| OrphanConhostInfo {
1193 pid,
1194 parent_pid,
1195 parent_name: parent_names.get(&parent_pid).cloned().unwrap_or_default(),
1196 })
1197 .collect()
1198}
1199
1200#[cfg(windows)]
1201#[inline(never)]
1202pub fn apply_windows_pty_priority(
1203 handle: Option<std::os::windows::io::RawHandle>,
1204 nice: Option<i32>,
1205) -> Result<(), PtyError> {
1206 crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
1207 use winapi::um::processthreadsapi::SetPriorityClass;
1208 use winapi::um::winbase::{
1209 ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
1210 IDLE_PRIORITY_CLASS,
1211 };
1212
1213 let Some(handle) = handle else {
1214 return Ok(());
1215 };
1216 let flags = match nice {
1217 Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
1218 Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
1219 Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
1220 Some(value) if value <= -1 => ABOVE_NORMAL_PRIORITY_CLASS,
1221 _ => 0,
1222 };
1223 if flags == 0 {
1224 return Ok(());
1225 }
1226 let result = unsafe { SetPriorityClass(handle.cast(), flags) };
1227 if result == 0 {
1228 return Err(PtyError::Io(std::io::Error::last_os_error()));
1229 }
1230 Ok(())
1231}