1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
/// Re-exports of third-party crates whose types appear in this module's public items.
pub mod reexports {
    /// The `portable_pty` crate, re-exported unchanged.
    pub use portable_pty;
}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
/// Errors produced by the pseudo-terminal helpers in this module.
#[derive(Debug, Error)]
pub enum PtyError {
    /// Returned by `NativePtyProcess::start_impl` when handles are already stored.
    #[error("pseudo-terminal process already started")]
    AlreadyStarted,
    /// The process is not running.
    // NOTE(review): not constructed anywhere in this file; presumably used by the
    // platform modules — confirm.
    #[error("pseudo-terminal process is not running")]
    NotRunning,
    /// Returned by `NativePtyProcess::wait_impl` when its timeout elapses first.
    #[error("pseudo-terminal timed out")]
    Timeout,
    /// An underlying I/O failure; `?` converts `std::io::Error` into this variant.
    #[error("pseudo-terminal I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// Opening the PTY, obtaining its reader/writer, or spawning the command failed.
    #[error("pseudo-terminal spawn failed: {0}")]
    Spawn(String),
    /// Any other failure, carried as a message (empty command, resize failure,
    /// reading from a closed stream, ...).
    #[error("pseudo-terminal error: {0}")]
    Other(String),
}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44 if matches!(
45 err.kind(),
46 std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47 ) {
48 return true;
49 }
50 #[cfg(unix)]
51 if err.raw_os_error() == Some(libc::ESRCH) {
52 return true;
53 }
54 false
55}
56
/// Mutable state shared between the PTY reader thread and consumers of its output.
pub struct PtyReadState {
    /// Output chunks in arrival order; the reader thread pushes to the back and
    /// `read_chunk_impl` pops from the front.
    pub chunks: VecDeque<Vec<u8>>,
    /// Set once the reader thread finishes or the process is closed.
    pub closed: bool,
}
61
/// A `PtyReadState` paired with the condition variable used to signal changes to it.
pub struct PtyReadShared {
    /// The guarded read state.
    pub state: Mutex<PtyReadState>,
    /// Notified whenever a chunk is queued or `closed` becomes true.
    pub condvar: Condvar,
}
66
/// The live OS resources of a started pseudo-terminal process.
pub struct NativePtyHandles {
    /// Master side of the PTY; used for resizing and, on Unix, process-group lookup.
    pub master: Box<dyn MasterPty + Send>,
    /// Writer obtained from the master; input for the child is written here.
    pub writer: Box<dyn Write + Send>,
    /// Handle to the spawned child process.
    pub child: Box<dyn portable_pty::Child + Send + Sync>,
    /// Job object the child was assigned to in `start_impl`; closed when dropped.
    #[cfg(windows)]
    pub _job: WindowsJobHandle,
}
74
/// Owning wrapper around a Windows job-object handle, stored as an integer.
///
/// The handle is closed when the wrapper is dropped.
#[cfg(windows)]
pub struct WindowsJobHandle(pub usize);
77
#[cfg(windows)]
impl Drop for WindowsJobHandle {
    /// Closes the wrapped handle with `CloseHandle`; the call's result is ignored.
    fn drop(&mut self) {
        // SAFETY: assumes `self.0` holds a job handle that has not been closed yet —
        // the only constructor visible in this file is
        // `assign_child_to_windows_kill_on_close_job`; confirm for other call sites.
        unsafe {
            winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
        }
    }
}
86
/// Mutable state tracked by an `IdleDetectorCore`.
pub struct IdleMonitorState {
    /// Moment the idle clock was last restarted (by input, output, or enabling).
    pub last_reset_at: Instant,
    /// Exit code recorded by `mark_exit`, if the process has exited.
    pub returncode: Option<i32>,
    /// Whether the recorded exit was flagged as an interrupt by `mark_exit`.
    pub interrupted: bool,
}
92
/// Detects when a process has produced no qualifying activity for long enough.
pub struct IdleDetectorCore {
    /// Idle threshold in seconds; `wait` uses the larger of this and
    /// `stability_window_seconds`.
    pub timeout_seconds: f64,
    /// Second lower bound on the idle duration `wait` requires, in seconds.
    pub stability_window_seconds: f64,
    /// Upper bound, in seconds, on each individual condvar wait inside `wait`
    /// (floored at one millisecond).
    pub sample_interval_seconds: f64,
    /// When true, `record_input` restarts the idle clock.
    pub reset_on_input: bool,
    /// When true, `record_output` may restart the idle clock.
    pub reset_on_output: bool,
    /// When true, output consisting only of control-churn bytes also counts as activity.
    pub count_control_churn_as_output: bool,
    /// Idle timeouts are only reported by `wait` while this flag is set.
    pub enabled: Arc<AtomicBool>,
    /// Idle clock and exit information.
    pub state: Mutex<IdleMonitorState>,
    /// Notified whenever `state` or `enabled` changes.
    pub condvar: Condvar,
}
106
107impl IdleDetectorCore {
108 pub fn record_input(&self, byte_count: usize) {
109 if !self.reset_on_input || byte_count == 0 {
110 return;
111 }
112 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
113 guard.last_reset_at = Instant::now();
114 self.condvar.notify_all();
115 }
116
117 pub fn record_output(&self, data: &[u8]) {
118 if !self.reset_on_output || data.is_empty() {
119 return;
120 }
121 let control_bytes = control_churn_bytes(data);
122 let visible_output_bytes = data.len().saturating_sub(control_bytes);
123 let active_output =
124 visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
125 if !active_output {
126 return;
127 }
128 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
129 guard.last_reset_at = Instant::now();
130 self.condvar.notify_all();
131 }
132
133 pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
134 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
135 guard.returncode = Some(returncode);
136 guard.interrupted = interrupted;
137 self.condvar.notify_all();
138 }
139
140 pub fn enabled(&self) -> bool {
141 self.enabled.load(Ordering::Acquire)
142 }
143
144 pub fn set_enabled(&self, enabled: bool) {
145 let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
146 if enabled && !was_enabled {
147 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
148 guard.last_reset_at = Instant::now();
149 }
150 self.condvar.notify_all();
151 }
152
153 pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
154 let started = Instant::now();
155 let overall_timeout = timeout.map(Duration::from_secs_f64);
156 let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
157 let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
158
159 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
160 loop {
161 let now = Instant::now();
162 let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
163
164 if let Some(returncode) = guard.returncode {
165 let reason = if guard.interrupted {
166 "interrupt"
167 } else {
168 "process_exit"
169 };
170 return (false, reason.to_string(), idle_for, Some(returncode));
171 }
172
173 let enabled = self.enabled.load(Ordering::Acquire);
174 if enabled && idle_for >= min_idle {
175 return (true, "idle_timeout".to_string(), idle_for, None);
176 }
177
178 if let Some(limit) = overall_timeout {
179 if now.duration_since(started) >= limit {
180 return (false, "timeout".to_string(), idle_for, None);
181 }
182 }
183
184 let idle_remaining = if enabled {
185 (min_idle - idle_for).max(0.0)
186 } else {
187 sample_interval.as_secs_f64()
188 };
189 let mut wait_for =
190 sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
191 if let Some(limit) = overall_timeout {
192 let elapsed = now.duration_since(started);
193 if elapsed < limit {
194 let remaining = limit - elapsed;
195 wait_for = wait_for.min(remaining);
196 }
197 }
198 let result = self
199 .condvar
200 .wait_timeout(guard, wait_for)
201 .expect("idle monitor mutex poisoned");
202 guard = result.0;
203 }
204 }
205}
206
/// A command that can be started on a pseudo-terminal, plus its runtime state.
pub struct NativePtyProcess {
    /// Program followed by its arguments; `new` rejects an empty vector.
    pub argv: Vec<String>,
    /// Working directory applied to the command in `start_impl`, if any.
    pub cwd: Option<String>,
    /// When `Some`, the child's environment is cleared and replaced by these pairs.
    pub env: Option<Vec<(String, String)>>,
    /// Row count used when the PTY is opened.
    pub rows: u16,
    /// Column count used when the PTY is opened.
    pub cols: u16,
    /// Niceness hint passed to `apply_windows_pty_priority` in `start_impl`.
    #[cfg(windows)]
    pub nice: Option<i32>,
    /// Live handles; `None` before `start_impl` and after a close.
    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
    /// Output queue filled by the reader thread.
    pub reader: Arc<PtyReadShared>,
    /// Exit code, once known.
    pub returncode: Arc<Mutex<Option<i32>>>,
    /// Total bytes passed to `record_input_metrics`.
    pub input_bytes_total: Arc<AtomicUsize>,
    /// Number of recorded inputs that contained `\r` or `\n`.
    pub newline_events_total: Arc<AtomicUsize>,
    /// Number of recorded inputs flagged as a submit.
    pub submit_events_total: Arc<AtomicUsize>,
    /// When set, the reader thread copies child output to this process's stdout.
    pub echo: Arc<AtomicBool>,
    /// Idle detector slot managed by `attach_idle_detector` / `detach_idle_detector`.
    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
    /// Output bytes not classified as control churn, counted by the reader thread.
    pub output_bytes_total: Arc<AtomicUsize>,
    /// Output bytes classified as control churn, counted by the reader thread.
    pub control_churn_bytes_total: Arc<AtomicUsize>,
    /// Set by `request_terminal_input_relay_stop`.
    #[cfg(windows)]
    pub terminal_input_relay_stop: Arc<AtomicBool>,
    /// Cleared by `request_terminal_input_relay_stop`.
    #[cfg(windows)]
    pub terminal_input_relay_active: Arc<AtomicBool>,
    /// Relay worker thread, joined by `stop_terminal_input_relay_impl`.
    #[cfg(windows)]
    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
}
236
237impl NativePtyProcess {
238 pub fn new(
239 argv: Vec<String>,
240 cwd: Option<String>,
241 env: Option<Vec<(String, String)>>,
242 rows: u16,
243 cols: u16,
244 nice: Option<i32>,
245 ) -> Result<Self, PtyError> {
246 if argv.is_empty() {
247 return Err(PtyError::Other("command cannot be empty".into()));
248 }
249 #[cfg(not(windows))]
250 let _ = nice;
251 Ok(Self {
252 argv,
253 cwd,
254 env,
255 rows,
256 cols,
257 #[cfg(windows)]
258 nice,
259 handles: Arc::new(Mutex::new(None)),
260 reader: Arc::new(PtyReadShared {
261 state: Mutex::new(PtyReadState {
262 chunks: VecDeque::new(),
263 closed: false,
264 }),
265 condvar: Condvar::new(),
266 }),
267 returncode: Arc::new(Mutex::new(None)),
268 input_bytes_total: Arc::new(AtomicUsize::new(0)),
269 newline_events_total: Arc::new(AtomicUsize::new(0)),
270 submit_events_total: Arc::new(AtomicUsize::new(0)),
271 echo: Arc::new(AtomicBool::new(false)),
272 idle_detector: Arc::new(Mutex::new(None)),
273 output_bytes_total: Arc::new(AtomicUsize::new(0)),
274 control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
275 #[cfg(windows)]
276 terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
277 #[cfg(windows)]
278 terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
279 #[cfg(windows)]
280 terminal_input_relay_worker: Mutex::new(None),
281 })
282 }
283
284 pub fn mark_reader_closed(&self) {
285 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
286 guard.closed = true;
287 self.reader.condvar.notify_all();
288 }
289
290 pub fn store_returncode(&self, code: i32) {
291 store_pty_returncode(&self.returncode, code);
292 }
293
294 pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
295 record_pty_input_metrics(
296 &self.input_bytes_total,
297 &self.newline_events_total,
298 &self.submit_events_total,
299 data,
300 submit,
301 );
302 }
303
304 pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
305 self.record_input_metrics(data, submit);
306 write_pty_input(&self.handles, data)?;
307 Ok(())
308 }
309
310 #[cfg(windows)]
311 pub fn request_terminal_input_relay_stop(&self) {
312 self.terminal_input_relay_stop
313 .store(true, Ordering::Release);
314 self.terminal_input_relay_active
315 .store(false, Ordering::Release);
316 }
317
318 #[cfg(windows)]
319 pub fn stop_terminal_input_relay_impl(&self) {
320 self.request_terminal_input_relay_stop();
321 if let Some(worker) = self
322 .terminal_input_relay_worker
323 .lock()
324 .expect("pty terminal input relay mutex poisoned")
325 .take()
326 {
327 let _ = worker.join();
328 }
329 }
330
    /// No-op on non-Windows targets; the relay fields only exist on Windows builds.
    #[cfg(not(windows))]
    pub fn stop_terminal_input_relay_impl(&self) {}
333
334 #[inline(never)]
336 pub fn close_impl(&self) -> Result<(), PtyError> {
337 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
338 self.stop_terminal_input_relay_impl();
339 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
340 let Some(handles) = guard.take() else {
341 self.mark_reader_closed();
342 return Ok(());
343 };
344 drop(guard);
345
346 #[cfg(windows)]
347 let NativePtyHandles {
348 master,
349 writer,
350 mut child,
351 _job,
352 } = handles;
353 #[cfg(not(windows))]
354 let NativePtyHandles {
355 master,
356 writer,
357 mut child,
358 } = handles;
359
360 if let Err(err) = child.kill() {
361 if !is_ignorable_process_control_error(&err) {
362 return Err(PtyError::Io(err));
363 }
364 }
365
366 drop(writer);
367 drop(master);
368
369 let code = match child.wait() {
370 Ok(status) => portable_exit_code(status),
371 Err(_) => -9,
372 };
373 drop(child);
374 #[cfg(windows)]
375 drop(_job);
376
377 self.store_returncode(code);
378 self.mark_reader_closed();
379 Ok(())
380 }
381
382 #[inline(never)]
384 pub fn close_nonblocking(&self) {
385 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
386 #[cfg(windows)]
387 self.request_terminal_input_relay_stop();
388 let Ok(mut guard) = self.handles.lock() else {
389 return;
390 };
391 let Some(handles) = guard.take() else {
392 self.mark_reader_closed();
393 return;
394 };
395 drop(guard);
396
397 #[cfg(windows)]
398 let NativePtyHandles {
399 master,
400 writer,
401 mut child,
402 _job,
403 } = handles;
404 #[cfg(not(windows))]
405 let NativePtyHandles {
406 master,
407 writer,
408 mut child,
409 } = handles;
410
411 if let Err(err) = child.kill() {
412 if !is_ignorable_process_control_error(&err) {
413 return;
414 }
415 }
416 drop(writer);
417 drop(master);
418 drop(child);
419 #[cfg(windows)]
420 drop(_job);
421 self.mark_reader_closed();
422 }
423
424 pub fn start_impl(&self) -> Result<(), PtyError> {
425 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
426 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
427 if guard.is_some() {
428 return Err(PtyError::AlreadyStarted);
429 }
430
431 let pty_system = native_pty_system();
432 let pair = pty_system
433 .openpty(PtySize {
434 rows: self.rows,
435 cols: self.cols,
436 pixel_width: 0,
437 pixel_height: 0,
438 })
439 .map_err(|e| PtyError::Spawn(e.to_string()))?;
440
441 let mut cmd = command_builder_from_argv(&self.argv);
442 if let Some(cwd) = &self.cwd {
443 cmd.cwd(cwd);
444 }
445 if let Some(env) = &self.env {
446 cmd.env_clear();
447 for (key, value) in env {
448 cmd.env(key, value);
449 }
450 }
451
452 let reader = pair.master.try_clone_reader().map_err(|e| PtyError::Spawn(e.to_string()))?;
453 let writer = pair.master.take_writer().map_err(|e| PtyError::Spawn(e.to_string()))?;
454 let child = pair.slave.spawn_command(cmd).map_err(|e| PtyError::Spawn(e.to_string()))?;
455 #[cfg(windows)]
456 let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
457 #[cfg(windows)]
458 apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
459 let shared = Arc::clone(&self.reader);
460 let echo = Arc::clone(&self.echo);
461 let idle_detector = Arc::clone(&self.idle_detector);
462 let output_bytes = Arc::clone(&self.output_bytes_total);
463 let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
464 thread::spawn(move || {
465 spawn_pty_reader(
466 reader,
467 shared,
468 echo,
469 idle_detector,
470 output_bytes,
471 churn_bytes,
472 );
473 });
474
475 *guard = Some(NativePtyHandles {
476 master: pair.master,
477 writer,
478 child,
479 #[cfg(windows)]
480 _job: job,
481 });
482 Ok(())
483 }
484
485 pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
486 #[cfg(windows)]
487 {
488 pty_windows::respond_to_queries(self, data)
489 }
490
491 #[cfg(unix)]
492 {
493 pty_platform::respond_to_queries(self, data)
494 }
495 }
496
497 pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
498 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
499 let guard = self.handles.lock().expect("pty handles mutex poisoned");
500 if let Some(handles) = guard.as_ref() {
501 handles
502 .master
503 .resize(PtySize {
504 rows,
505 cols,
506 pixel_width: 0,
507 pixel_height: 0,
508 })
509 .map_err(|e| PtyError::Other(e.to_string()))?;
510 }
511 Ok(())
512 }
513
514 pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
515 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
516 #[cfg(windows)]
517 {
518 pty_windows::send_interrupt(self)
519 }
520
521 #[cfg(unix)]
522 {
523 pty_platform::send_interrupt(self)
524 }
525 }
526
527 pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
528 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
529 if let Some(code) = *self.returncode.lock().expect("pty returncode mutex poisoned") {
531 return Ok(code);
532 }
533 let start = Instant::now();
534 loop {
535 if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
536 return Ok(code);
537 }
538 if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
539 return Err(PtyError::Timeout);
540 }
541 thread::sleep(Duration::from_millis(10));
542 }
543 }
544
545 pub fn terminate_impl(&self) -> Result<(), PtyError> {
546 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
547 #[cfg(windows)]
548 {
549 pty_windows::terminate(self)
550 }
551
552 #[cfg(unix)]
553 {
554 pty_platform::terminate(self)
555 }
556 }
557
558 pub fn kill_impl(&self) -> Result<(), PtyError> {
559 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
560 #[cfg(windows)]
561 {
562 pty_windows::kill(self)
563 }
564
565 #[cfg(unix)]
566 {
567 pty_platform::kill(self)
568 }
569 }
570
571 pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
572 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
573 #[cfg(windows)]
574 {
575 pty_windows::terminate_tree(self)
576 }
577
578 #[cfg(unix)]
579 {
580 pty_platform::terminate_tree(self)
581 }
582 }
583
584 pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
585 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
586 #[cfg(windows)]
587 {
588 pty_windows::kill_tree(self)
589 }
590
591 #[cfg(unix)]
592 {
593 pty_platform::kill_tree(self)
594 }
595 }
596
597 pub fn pid(&self) -> Result<Option<u32>, PtyError> {
599 let guard = self.handles.lock().expect("pty handles mutex poisoned");
600 if let Some(handles) = guard.as_ref() {
601 #[cfg(unix)]
602 if let Some(pid) = handles.master.process_group_leader() {
603 if let Ok(pid) = u32::try_from(pid) {
604 return Ok(Some(pid));
605 }
606 }
607 return Ok(handles.child.process_id());
608 }
609 Ok(None)
610 }
611
612 pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
615 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
616 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
617 loop {
618 if let Some(chunk) = guard.chunks.pop_front() {
619 return Ok(Some(chunk));
620 }
621 if guard.closed {
622 return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
623 }
624 match deadline {
625 Some(deadline) => {
626 let now = Instant::now();
627 if now >= deadline {
628 return Ok(None); }
630 let wait = deadline.saturating_duration_since(now);
631 let result = self
632 .reader
633 .condvar
634 .wait_timeout(guard, wait)
635 .expect("pty read mutex poisoned");
636 guard = result.0;
637 }
638 None => {
639 guard = self
640 .reader
641 .condvar
642 .wait(guard)
643 .expect("pty read mutex poisoned");
644 }
645 }
646 }
647 }
648
649 pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
651 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
652 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
653 loop {
654 if guard.closed {
655 return true;
656 }
657 match deadline {
658 Some(deadline) => {
659 let now = Instant::now();
660 if now >= deadline {
661 return false;
662 }
663 let wait = deadline.saturating_duration_since(now);
664 let result = self
665 .reader
666 .condvar
667 .wait_timeout(guard, wait)
668 .expect("pty read mutex poisoned");
669 guard = result.0;
670 }
671 None => {
672 guard = self
673 .reader
674 .condvar
675 .wait(guard)
676 .expect("pty read mutex poisoned");
677 }
678 }
679 }
680 }
681
682 pub fn wait_and_drain_impl(
684 &self,
685 timeout: Option<f64>,
686 drain_timeout: f64,
687 ) -> Result<i32, PtyError> {
688 let code = self.wait_impl(timeout)?;
689 let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
690 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
691 while !guard.closed {
692 let remaining = deadline.saturating_duration_since(Instant::now());
693 if remaining.is_zero() {
694 break;
695 }
696 let result = self
697 .reader
698 .condvar
699 .wait_timeout(guard, remaining)
700 .expect("pty read mutex poisoned");
701 guard = result.0;
702 }
703 Ok(code)
704 }
705
    /// Sets the echo flag that the reader thread consults before copying output to stdout.
    pub fn set_echo(&self, enabled: bool) {
        self.echo.store(enabled, Ordering::Release);
    }
709
    /// Returns the current value of the echo flag.
    pub fn echo_enabled(&self) -> bool {
        self.echo.load(Ordering::Acquire)
    }
713
714 pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
715 let mut guard = self
716 .idle_detector
717 .lock()
718 .expect("idle detector mutex poisoned");
719 *guard = Some(Arc::clone(detector));
720 }
721
722 pub fn detach_idle_detector(&self) {
723 let mut guard = self
724 .idle_detector
725 .lock()
726 .expect("idle detector mutex poisoned");
727 *guard = None;
728 }
729
    /// Returns the total number of input bytes recorded so far.
    pub fn pty_input_bytes_total(&self) -> usize {
        self.input_bytes_total.load(Ordering::Acquire)
    }
733
    /// Returns how many recorded inputs contained a carriage return or line feed.
    pub fn pty_newline_events_total(&self) -> usize {
        self.newline_events_total.load(Ordering::Acquire)
    }
737
    /// Returns how many recorded inputs were flagged as a submit.
    pub fn pty_submit_events_total(&self) -> usize {
        self.submit_events_total.load(Ordering::Acquire)
    }
741
    /// Returns the total output bytes counted as visible (not control churn) by the
    /// reader thread.
    pub fn pty_output_bytes_total(&self) -> usize {
        self.output_bytes_total.load(Ordering::Acquire)
    }
745
    /// Returns the total output bytes counted as control churn by the reader thread.
    pub fn pty_control_churn_bytes_total(&self) -> usize {
        self.control_churn_bytes_total.load(Ordering::Acquire)
    }
749}
750
impl Drop for NativePtyProcess {
    /// Closes the process via `close_nonblocking`, which kills the child without
    /// waiting for it to exit.
    fn drop(&mut self) {
        self.close_nonblocking();
    }
}
756
/// Counts the bytes in `data` that belong to terminal control traffic rather than
/// visible output.
///
/// Counted as churn:
/// * an ESC byte (`0x1B`) on its own;
/// * a CSI sequence: ESC, `[`, then every byte up to and including the first final
///   byte in `0x40..=0x7E` (an unterminated sequence is counted through the end of
///   `data`);
/// * backspace (`0x08`), carriage return (`0x0D`) and delete (`0x7F`).
///
/// The byte following a non-CSI ESC is not part of the escape and is classified on
/// its own.
pub fn control_churn_bytes(data: &[u8]) -> usize {
    let mut total = 0;
    let mut index = 0;
    while index < data.len() {
        let byte = data[index];
        if byte == 0x1B {
            let start = index;
            index += 1;
            if data.get(index) == Some(&b'[') {
                index += 1;
                // Skip parameter/intermediate bytes through the final byte, or to the
                // end of the buffer when the sequence is cut off.
                index = match data[index..]
                    .iter()
                    .position(|current| (0x40..=0x7E).contains(current))
                {
                    Some(offset) => index + offset + 1,
                    None => data.len(),
                };
            }
            total += index - start;
            continue;
        }
        if matches!(byte, 0x08 | 0x0D | 0x7F) {
            total += 1;
        }
        index += 1;
    }
    total
}
787
788pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
789 let mut command = CommandBuilder::new(&argv[0]);
790 if argv.len() > 1 {
791 command.args(
792 argv[1..]
793 .iter()
794 .map(OsString::from)
795 .collect::<Vec<OsString>>(),
796 );
797 }
798 command
799}
800
801#[inline(never)]
802pub fn spawn_pty_reader(
803 mut reader: Box<dyn Read + Send>,
804 shared: Arc<PtyReadShared>,
805 echo: Arc<AtomicBool>,
806 idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
807 output_bytes_total: Arc<AtomicUsize>,
808 control_churn_bytes_total: Arc<AtomicUsize>,
809) {
810 crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
811 let idle_detector_snapshot = idle_detector
812 .lock()
813 .expect("idle detector mutex poisoned")
814 .clone();
815 let mut chunk = vec![0_u8; 65536];
816 loop {
817 match reader.read(&mut chunk) {
818 Ok(0) => break,
819 Ok(n) => {
820 let data = &chunk[..n];
821
822 let churn = control_churn_bytes(data);
823 let visible = data.len().saturating_sub(churn);
824 output_bytes_total.fetch_add(visible, Ordering::Relaxed);
825 control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
826
827 if echo.load(Ordering::Relaxed) {
828 let _ = std::io::stdout().write_all(data);
829 let _ = std::io::stdout().flush();
830 }
831
832 if let Some(ref detector) = idle_detector_snapshot {
833 detector.record_output(data);
834 }
835
836 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
837 guard.chunks.push_back(data.to_vec());
838 shared.condvar.notify_all();
839 }
840 Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
841 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
842 thread::sleep(Duration::from_millis(10));
843 continue;
844 }
845 Err(_) => break,
846 }
847 }
848 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
849 guard.closed = true;
850 shared.condvar.notify_all();
851}
852
853pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
854 if let Some(signal) = status.signal() {
855 let signal = signal.to_ascii_lowercase();
856 if signal.contains("interrupt") {
857 return -2;
858 }
859 if signal.contains("terminated") {
860 return -15;
861 }
862 if signal.contains("killed") {
863 return -9;
864 }
865 }
866 status.exit_code() as i32
867}
868
/// Returns `true` when `data` contains a carriage return or a line feed.
pub fn input_contains_newline(data: &[u8]) -> bool {
    data.contains(&b'\r') || data.contains(&b'\n')
}
872
873pub fn record_pty_input_metrics(
874 input_bytes_total: &Arc<AtomicUsize>,
875 newline_events_total: &Arc<AtomicUsize>,
876 submit_events_total: &Arc<AtomicUsize>,
877 data: &[u8],
878 submit: bool,
879) {
880 input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
881 if input_contains_newline(data) {
882 newline_events_total.fetch_add(1, Ordering::AcqRel);
883 }
884 if submit {
885 submit_events_total.fetch_add(1, Ordering::AcqRel);
886 }
887}
888
/// Stores `code` in the shared return-code slot, overwriting any previous value.
///
/// # Panics
/// Panics if the return-code mutex is poisoned.
pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
    let mut slot = returncode.lock().expect("pty returncode mutex poisoned");
    *slot = Some(code);
}
892
893pub fn poll_pty_process(
894 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
895 returncode: &Arc<Mutex<Option<i32>>>,
896) -> Result<Option<i32>, std::io::Error> {
897 let mut guard = handles.lock().expect("pty handles mutex poisoned");
898 let Some(handles) = guard.as_mut() else {
899 return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
900 };
901 let status = handles.child.try_wait()?;
902 let code = status.map(portable_exit_code);
903 if let Some(code) = code {
904 store_pty_returncode(returncode, code);
905 return Ok(Some(code));
906 }
907 Ok(None)
908}
909
910pub fn write_pty_input(
911 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
912 data: &[u8],
913) -> Result<(), std::io::Error> {
914 let mut guard = handles.lock().expect("pty handles mutex poisoned");
915 let handles = guard.as_mut().ok_or_else(|| {
916 std::io::Error::new(
917 std::io::ErrorKind::NotConnected,
918 "Pseudo-terminal process is not running",
919 )
920 })?;
921 #[cfg(windows)]
922 let payload = pty_windows::input_payload(data);
923 #[cfg(unix)]
924 let payload = pty_platform::input_payload(data);
925 handles.writer.write_all(&payload)?;
926 handles.writer.flush()
927}
928
/// Rewrites line endings in `data` for delivery to a Windows terminal.
///
/// A lone `\n` becomes `\r`; `\r\n` pairs and lone `\r` bytes pass through unchanged,
/// as does every other byte.
#[cfg(windows)]
pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
    let mut translated = Vec::with_capacity(data.len());
    let mut bytes = data.iter().copied().peekable();
    while let Some(current) = bytes.next() {
        match current {
            b'\r' => {
                translated.push(b'\r');
                // Keep the line feed of an existing CRLF pair as-is.
                if bytes.next_if_eq(&b'\n').is_some() {
                    translated.push(b'\n');
                }
            }
            b'\n' => translated.push(b'\r'),
            other => translated.push(other),
        }
    }
    translated
}
955
/// Creates a job object flagged `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` and assigns the
/// child process to it.
///
/// On success the job handle is returned wrapped in a `WindowsJobHandle`, which closes
/// it when dropped. On any failure after the job was created, the job handle is closed
/// before returning.
///
/// # Errors
/// Returns `PtyError::Other` when `handle` is `None`, and `PtyError::Io` carrying the
/// last OS error when creating the job, configuring it, or assigning the process fails.
#[cfg(windows)]
#[inline(never)]
pub fn assign_child_to_windows_kill_on_close_job(
    handle: Option<std::os::windows::io::RawHandle>,
) -> Result<WindowsJobHandle, PtyError> {
    crate::rp_rust_debug_scope!(
        "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
    );
    use std::mem::zeroed;

    use winapi::shared::minwindef::FALSE;
    use winapi::um::handleapi::INVALID_HANDLE_VALUE;
    use winapi::um::jobapi2::{
        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
    };
    use winapi::um::winnt::{
        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
    };

    let Some(handle) = handle else {
        return Err(PtyError::Other(
            "Pseudo-terminal child does not expose a Windows process handle".into(),
        ));
    };

    // SAFETY: both arguments are null pointers; the returned handle is validated
    // below before any use.
    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
    if job.is_null() || job == INVALID_HANDLE_VALUE {
        return Err(PtyError::Io(std::io::Error::last_os_error()));
    }

    // SAFETY: assumes an all-zero bit pattern is a valid value for this winapi
    // struct — TODO confirm against the struct definition.
    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    // SAFETY: `job` passed the validity check above; the pointer and size describe the
    // local `info` value, which outlives the call.
    let result = unsafe {
        SetInformationJobObject(
            job,
            JobObjectExtendedLimitInformation,
            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
        )
    };
    if result == FALSE {
        // Capture the OS error first so the cleanup call below cannot replace it.
        let err = std::io::Error::last_os_error();
        // SAFETY: `job` is the handle created above and has not been closed yet.
        unsafe {
            winapi::um::handleapi::CloseHandle(job);
        }
        return Err(PtyError::Io(err));
    }

    // SAFETY: `job` is valid (checked above); `handle` is supplied by the caller and
    // is assumed to be a live process handle.
    let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
    if result == FALSE {
        // Capture the OS error first so the cleanup call below cannot replace it.
        let err = std::io::Error::last_os_error();
        // SAFETY: `job` is the handle created above and has not been closed yet.
        unsafe {
            winapi::um::handleapi::CloseHandle(job);
        }
        return Err(PtyError::Io(err));
    }

    // Ownership of the job handle moves to the wrapper, which closes it on drop.
    Ok(WindowsJobHandle(job as usize))
}
1016
/// Maps a Unix-style niceness onto a Windows priority class and applies it to the
/// process behind `handle`.
///
/// Mapping: `>= 15` idle, `1..=14` below normal, `<= -15` high, `-14..=-1` above
/// normal. A missing handle, a missing niceness, or a niceness of zero leaves the
/// priority untouched.
///
/// # Errors
/// Returns `PtyError::Io` with the last OS error when `SetPriorityClass` fails.
#[cfg(windows)]
#[inline(never)]
pub fn apply_windows_pty_priority(
    handle: Option<std::os::windows::io::RawHandle>,
    nice: Option<i32>,
) -> Result<(), PtyError> {
    crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
    use winapi::um::processthreadsapi::SetPriorityClass;
    use winapi::um::winbase::{
        ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
        IDLE_PRIORITY_CLASS,
    };

    let Some(handle) = handle else {
        return Ok(());
    };
    let priority_class = match nice {
        None | Some(0) => return Ok(()),
        Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
        Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
        Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
        Some(_) => ABOVE_NORMAL_PRIORITY_CLASS,
    };

    // SAFETY: `handle` is supplied by the caller and is assumed to be a live process
    // handle; the call takes no pointers.
    let succeeded = unsafe { SetPriorityClass(handle.cast(), priority_class) } != 0;
    if succeeded {
        Ok(())
    } else {
        Err(PtyError::Io(std::io::Error::last_os_error()))
    }
}