1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
12pub mod reexports {
14 pub use portable_pty;
15}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
27#[derive(Debug, Error)]
28pub enum PtyError {
29 #[error("pseudo-terminal process already started")]
30 AlreadyStarted,
31 #[error("pseudo-terminal process is not running")]
32 NotRunning,
33 #[error("pseudo-terminal timed out")]
34 Timeout,
35 #[error("pseudo-terminal I/O error: {0}")]
36 Io(#[from] std::io::Error),
37 #[error("pseudo-terminal spawn failed: {0}")]
38 Spawn(String),
39 #[error("pseudo-terminal error: {0}")]
40 Other(String),
41}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44 if matches!(
45 err.kind(),
46 std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47 ) {
48 return true;
49 }
50 #[cfg(unix)]
51 if err.raw_os_error() == Some(libc::ESRCH) {
52 return true;
53 }
54 false
55}
56
57pub struct PtyReadState {
58 pub chunks: VecDeque<Vec<u8>>,
59 pub closed: bool,
60}
61
62pub struct PtyReadShared {
63 pub state: Mutex<PtyReadState>,
64 pub condvar: Condvar,
65}
66
67pub struct NativePtyHandles {
68 pub master: Box<dyn MasterPty + Send>,
69 pub writer: Box<dyn Write + Send>,
70 pub child: Box<dyn portable_pty::Child + Send + Sync>,
71 #[cfg(windows)]
72 pub _job: WindowsJobHandle,
73}
74
75#[cfg(windows)]
76pub struct WindowsJobHandle(pub usize);
77
78#[cfg(windows)]
79impl WindowsJobHandle {
80 pub fn assign_pid(&self, pid: u32) -> Result<(), std::io::Error> {
82 use winapi::um::handleapi::CloseHandle;
83 use winapi::um::processthreadsapi::OpenProcess;
84 use winapi::um::winnt::PROCESS_SET_QUOTA;
85 use winapi::um::winnt::PROCESS_TERMINATE;
86
87 let handle = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
88 if handle.is_null() {
89 return Err(std::io::Error::last_os_error());
90 }
91 let result = unsafe {
92 winapi::um::jobapi2::AssignProcessToJobObject(
93 self.0 as winapi::shared::ntdef::HANDLE,
94 handle,
95 )
96 };
97 unsafe { CloseHandle(handle) };
98 if result == 0 {
99 return Err(std::io::Error::last_os_error());
100 }
101 Ok(())
102 }
103}
104
105#[cfg(windows)]
106impl Drop for WindowsJobHandle {
107 fn drop(&mut self) {
108 unsafe {
109 winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
110 }
111 }
112}
113
114pub struct IdleMonitorState {
115 pub last_reset_at: Instant,
116 pub returncode: Option<i32>,
117 pub interrupted: bool,
118}
119
120pub struct IdleDetectorCore {
123 pub timeout_seconds: f64,
124 pub stability_window_seconds: f64,
125 pub sample_interval_seconds: f64,
126 pub reset_on_input: bool,
127 pub reset_on_output: bool,
128 pub count_control_churn_as_output: bool,
129 pub enabled: Arc<AtomicBool>,
130 pub state: Mutex<IdleMonitorState>,
131 pub condvar: Condvar,
132}
133
134impl IdleDetectorCore {
135 pub fn record_input(&self, byte_count: usize) {
136 if !self.reset_on_input || byte_count == 0 {
137 return;
138 }
139 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
140 guard.last_reset_at = Instant::now();
141 self.condvar.notify_all();
142 }
143
144 pub fn record_output(&self, data: &[u8]) {
145 if !self.reset_on_output || data.is_empty() {
146 return;
147 }
148 let control_bytes = control_churn_bytes(data);
149 let visible_output_bytes = data.len().saturating_sub(control_bytes);
150 let active_output =
151 visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
152 if !active_output {
153 return;
154 }
155 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
156 guard.last_reset_at = Instant::now();
157 self.condvar.notify_all();
158 }
159
160 pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
161 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
162 guard.returncode = Some(returncode);
163 guard.interrupted = interrupted;
164 self.condvar.notify_all();
165 }
166
167 pub fn enabled(&self) -> bool {
168 self.enabled.load(Ordering::Acquire)
169 }
170
171 pub fn set_enabled(&self, enabled: bool) {
172 let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
173 if enabled && !was_enabled {
174 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
175 guard.last_reset_at = Instant::now();
176 }
177 self.condvar.notify_all();
178 }
179
180 pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
181 let started = Instant::now();
182 let overall_timeout = timeout.map(Duration::from_secs_f64);
183 let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
184 let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
185
186 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
187 loop {
188 let now = Instant::now();
189 let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
190
191 if let Some(returncode) = guard.returncode {
192 let reason = if guard.interrupted {
193 "interrupt"
194 } else {
195 "process_exit"
196 };
197 return (false, reason.to_string(), idle_for, Some(returncode));
198 }
199
200 let enabled = self.enabled.load(Ordering::Acquire);
201 if enabled && idle_for >= min_idle {
202 return (true, "idle_timeout".to_string(), idle_for, None);
203 }
204
205 if let Some(limit) = overall_timeout {
206 if now.duration_since(started) >= limit {
207 return (false, "timeout".to_string(), idle_for, None);
208 }
209 }
210
211 let idle_remaining = if enabled {
212 (min_idle - idle_for).max(0.0)
213 } else {
214 sample_interval.as_secs_f64()
215 };
216 let mut wait_for =
217 sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
218 if let Some(limit) = overall_timeout {
219 let elapsed = now.duration_since(started);
220 if elapsed < limit {
221 let remaining = limit - elapsed;
222 wait_for = wait_for.min(remaining);
223 }
224 }
225 let result = self
226 .condvar
227 .wait_timeout(guard, wait_for)
228 .expect("idle monitor mutex poisoned");
229 guard = result.0;
230 }
231 }
232}
233
234pub struct NativePtyProcess {
235 pub argv: Vec<String>,
236 pub cwd: Option<String>,
237 pub env: Option<Vec<(String, String)>>,
238 pub rows: u16,
239 pub cols: u16,
240 #[cfg(windows)]
241 pub nice: Option<i32>,
242 pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
243 pub reader: Arc<PtyReadShared>,
244 pub returncode: Arc<Mutex<Option<i32>>>,
245 pub input_bytes_total: Arc<AtomicUsize>,
246 pub newline_events_total: Arc<AtomicUsize>,
247 pub submit_events_total: Arc<AtomicUsize>,
248 pub echo: Arc<AtomicBool>,
250 pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
252 pub output_bytes_total: Arc<AtomicUsize>,
254 pub control_churn_bytes_total: Arc<AtomicUsize>,
256 pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
257 pub terminal_input_relay_stop: Arc<AtomicBool>,
258 pub terminal_input_relay_active: Arc<AtomicBool>,
259 pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
260}
261
262impl NativePtyProcess {
263 pub fn new(
264 argv: Vec<String>,
265 cwd: Option<String>,
266 env: Option<Vec<(String, String)>>,
267 rows: u16,
268 cols: u16,
269 nice: Option<i32>,
270 ) -> Result<Self, PtyError> {
271 if argv.is_empty() {
272 return Err(PtyError::Other("command cannot be empty".into()));
273 }
274 #[cfg(not(windows))]
275 let _ = nice;
276 Ok(Self {
277 argv,
278 cwd,
279 env,
280 rows,
281 cols,
282 #[cfg(windows)]
283 nice,
284 handles: Arc::new(Mutex::new(None)),
285 reader: Arc::new(PtyReadShared {
286 state: Mutex::new(PtyReadState {
287 chunks: VecDeque::new(),
288 closed: false,
289 }),
290 condvar: Condvar::new(),
291 }),
292 returncode: Arc::new(Mutex::new(None)),
293 input_bytes_total: Arc::new(AtomicUsize::new(0)),
294 newline_events_total: Arc::new(AtomicUsize::new(0)),
295 submit_events_total: Arc::new(AtomicUsize::new(0)),
296 echo: Arc::new(AtomicBool::new(false)),
297 idle_detector: Arc::new(Mutex::new(None)),
298 output_bytes_total: Arc::new(AtomicUsize::new(0)),
299 control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
300 reader_worker: Mutex::new(None),
301 terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
302 terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
303 terminal_input_relay_worker: Mutex::new(None),
304 })
305 }
306
307 pub fn mark_reader_closed(&self) {
308 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
309 guard.closed = true;
310 self.reader.condvar.notify_all();
311 }
312
313 pub fn store_returncode(&self, code: i32) {
314 store_pty_returncode(&self.returncode, code);
315 }
316
317 fn join_reader_worker(&self) {
318 if let Some(worker) = self
319 .reader_worker
320 .lock()
321 .expect("pty reader worker mutex poisoned")
322 .take()
323 {
324 let _ = worker.join();
325 }
326 }
327
328 pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
329 record_pty_input_metrics(
330 &self.input_bytes_total,
331 &self.newline_events_total,
332 &self.submit_events_total,
333 data,
334 submit,
335 );
336 }
337
338 pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
339 self.record_input_metrics(data, submit);
340 write_pty_input(&self.handles, data)?;
341 Ok(())
342 }
343
344 pub fn request_terminal_input_relay_stop(&self) {
345 self.terminal_input_relay_stop
346 .store(true, Ordering::Release);
347 self.terminal_input_relay_active
348 .store(false, Ordering::Release);
349 }
350
351 pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
352 let mut worker_guard = self
353 .terminal_input_relay_worker
354 .lock()
355 .expect("pty terminal input relay mutex poisoned");
356 if worker_guard.is_some() && self.terminal_input_relay_active() {
357 return Ok(());
358 }
359 if self
360 .handles
361 .lock()
362 .expect("pty handles mutex poisoned")
363 .is_none()
364 {
365 return Err(PtyError::NotRunning);
366 }
367
368 self.terminal_input_relay_stop
369 .store(false, Ordering::Release);
370 self.terminal_input_relay_active
371 .store(true, Ordering::Release);
372
373 let handles = Arc::clone(&self.handles);
374 let returncode = Arc::clone(&self.returncode);
375 let input_bytes_total = Arc::clone(&self.input_bytes_total);
376 let newline_events_total = Arc::clone(&self.newline_events_total);
377 let submit_events_total = Arc::clone(&self.submit_events_total);
378 let stop = Arc::clone(&self.terminal_input_relay_stop);
379 let active = Arc::clone(&self.terminal_input_relay_active);
380
381 #[cfg(windows)]
382 {
383 let capture = terminal_input::TerminalInputCore::new();
384 capture.start_impl().map_err(PtyError::Io)?;
385 *worker_guard = Some(thread::spawn(move || {
386 loop {
387 if stop.load(Ordering::Acquire) {
388 break;
389 }
390 match poll_pty_process(&handles, &returncode) {
391 Ok(Some(_)) => break,
392 Ok(None) => {}
393 Err(_) => break,
394 }
395 match terminal_input::wait_for_terminal_input_event(
396 &capture.state,
397 &capture.condvar,
398 Some(Duration::from_millis(50)),
399 ) {
400 terminal_input::TerminalInputWaitOutcome::Event(event) => {
401 record_pty_input_metrics(
402 &input_bytes_total,
403 &newline_events_total,
404 &submit_events_total,
405 &event.data,
406 event.submit,
407 );
408 if write_pty_input(&handles, &event.data).is_err() {
409 break;
410 }
411 }
412 terminal_input::TerminalInputWaitOutcome::Timeout => continue,
413 terminal_input::TerminalInputWaitOutcome::Closed => break,
414 }
415 }
416 active.store(false, Ordering::Release);
417 let _ = capture.stop_impl();
418 }));
419 Ok(())
420 }
421
422 #[cfg(unix)]
423 {
424 if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
425 self.terminal_input_relay_active
426 .store(false, Ordering::Release);
427 return Ok(());
428 }
429
430 *worker_guard = Some(thread::spawn(move || {
431 posix_terminal_input_relay_worker(
432 handles,
433 returncode,
434 input_bytes_total,
435 newline_events_total,
436 submit_events_total,
437 stop,
438 active,
439 );
440 }));
441 Ok(())
442 }
443 }
444
445 pub fn stop_terminal_input_relay_impl(&self) {
446 self.request_terminal_input_relay_stop();
447 if let Some(worker) = self
448 .terminal_input_relay_worker
449 .lock()
450 .expect("pty terminal input relay mutex poisoned")
451 .take()
452 {
453 let _ = worker.join();
454 }
455 }
456
457 pub fn terminal_input_relay_active(&self) -> bool {
458 self.terminal_input_relay_active.load(Ordering::Acquire)
459 }
460
461 #[inline(never)]
463 pub fn close_impl(&self) -> Result<(), PtyError> {
464 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
465 self.stop_terminal_input_relay_impl();
466 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
467 let Some(handles) = guard.take() else {
468 self.mark_reader_closed();
469 return Ok(());
470 };
471 drop(guard);
472
473 #[cfg(windows)]
474 let NativePtyHandles {
475 master,
476 writer,
477 mut child,
478 _job,
479 } = handles;
480 #[cfg(not(windows))]
481 let NativePtyHandles {
482 master,
483 writer,
484 mut child,
485 } = handles;
486
487 #[cfg(windows)]
488 {
489 {
490 crate::rp_rust_debug_scope!(
491 "running_process_core::NativePtyProcess::close_impl.drop_job"
492 );
493 drop(_job);
494 }
495
496 {
497 crate::rp_rust_debug_scope!(
498 "running_process_core::NativePtyProcess::close_impl.wait_job_exit"
499 );
500 let wait_deadline = Instant::now() + Duration::from_secs(2);
501 loop {
502 match child.try_wait() {
503 Ok(Some(status)) => {
504 let code = portable_exit_code(status);
505 self.store_returncode(code);
506 break;
507 }
508 Ok(None) if Instant::now() < wait_deadline => {
509 thread::sleep(Duration::from_millis(10));
510 }
511 Ok(None) => {
512 if let Err(err) = child.kill() {
513 if !is_ignorable_process_control_error(&err) {
514 return Err(PtyError::Io(err));
515 }
516 }
517 let code = match child.wait() {
518 Ok(status) => portable_exit_code(status),
519 Err(_) => -9,
520 };
521 self.store_returncode(code);
522 break;
523 }
524 Err(_) => {
525 self.store_returncode(-9);
526 break;
527 }
528 }
529 }
530 }
531 {
532 crate::rp_rust_debug_scope!(
533 "running_process_core::NativePtyProcess::close_impl.drop_writer"
534 );
535 drop(writer);
536 }
537 {
538 crate::rp_rust_debug_scope!(
539 "running_process_core::NativePtyProcess::close_impl.drop_master"
540 );
541 drop(master);
542 }
543 drop(child);
544 {
545 crate::rp_rust_debug_scope!(
546 "running_process_core::NativePtyProcess::close_impl.join_reader"
547 );
548 self.join_reader_worker();
549 }
550 self.mark_reader_closed();
551 return Ok(());
552 }
553
554 #[cfg(not(windows))]
555 {
556 drop(writer);
557 drop(master);
558
559 let code = {
560 crate::rp_rust_debug_scope!(
561 "running_process_core::NativePtyProcess::close_impl.wait_child"
562 );
563 match child.wait() {
564 Ok(status) => portable_exit_code(status),
565 Err(_) => -9,
566 }
567 };
568 drop(child);
569
570 self.store_returncode(code);
571 {
572 crate::rp_rust_debug_scope!(
573 "running_process_core::NativePtyProcess::close_impl.join_reader"
574 );
575 self.join_reader_worker();
576 }
577 self.mark_reader_closed();
578 return Ok(());
579 }
580 }
581
582 #[inline(never)]
584 pub fn close_nonblocking(&self) {
585 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
586 #[cfg(windows)]
587 self.request_terminal_input_relay_stop();
588 let Ok(mut guard) = self.handles.lock() else {
589 return;
590 };
591 let Some(handles) = guard.take() else {
592 self.mark_reader_closed();
593 return;
594 };
595 drop(guard);
596
597 #[cfg(windows)]
598 let NativePtyHandles {
599 master,
600 writer,
601 mut child,
602 _job,
603 } = handles;
604 #[cfg(not(windows))]
605 let NativePtyHandles {
606 master,
607 writer,
608 mut child,
609 } = handles;
610
611 if let Err(err) = child.kill() {
612 if !is_ignorable_process_control_error(&err) {
613 return;
614 }
615 }
616 drop(writer);
617 drop(master);
618 drop(child);
619 #[cfg(windows)]
620 drop(_job);
621 self.mark_reader_closed();
622 }
623
624 pub fn start_impl(&self) -> Result<(), PtyError> {
625 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
626 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
627 if guard.is_some() {
628 return Err(PtyError::AlreadyStarted);
629 }
630
631 #[cfg(windows)]
634 let conhost_pids_before = conhost_children_of_current_process();
635
636 let pty_system = native_pty_system();
637 let pair = pty_system
638 .openpty(PtySize {
639 rows: self.rows,
640 cols: self.cols,
641 pixel_width: 0,
642 pixel_height: 0,
643 })
644 .map_err(|e| PtyError::Spawn(e.to_string()))?;
645
646 let mut cmd = command_builder_from_argv(&self.argv);
647 if let Some(cwd) = &self.cwd {
648 cmd.cwd(cwd);
649 }
650 if let Some(env) = &self.env {
651 cmd.env_clear();
652 for (key, value) in env {
653 cmd.env(key, value);
654 }
655 }
656
657 let reader = pair
658 .master
659 .try_clone_reader()
660 .map_err(|e| PtyError::Spawn(e.to_string()))?;
661 let writer = pair
662 .master
663 .take_writer()
664 .map_err(|e| PtyError::Spawn(e.to_string()))?;
665 let child = pair
666 .slave
667 .spawn_command(cmd)
668 .map_err(|e| PtyError::Spawn(e.to_string()))?;
669 #[cfg(windows)]
670 let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
671 #[cfg(windows)]
672 assign_conpty_conhost_to_job(&job, &conhost_pids_before);
673 #[cfg(windows)]
674 apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
675 let shared = Arc::clone(&self.reader);
676 let echo = Arc::clone(&self.echo);
677 let idle_detector = Arc::clone(&self.idle_detector);
678 let output_bytes = Arc::clone(&self.output_bytes_total);
679 let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
680 let reader_worker = thread::spawn(move || {
681 spawn_pty_reader(
682 reader,
683 shared,
684 echo,
685 idle_detector,
686 output_bytes,
687 churn_bytes,
688 );
689 });
690 *self
691 .reader_worker
692 .lock()
693 .expect("pty reader worker mutex poisoned") = Some(reader_worker);
694
695 *guard = Some(NativePtyHandles {
696 master: pair.master,
697 writer,
698 child,
699 #[cfg(windows)]
700 _job: job,
701 });
702 Ok(())
703 }
704
705 pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
706 #[cfg(windows)]
707 {
708 pty_windows::respond_to_queries(self, data)
709 }
710
711 #[cfg(unix)]
712 {
713 pty_platform::respond_to_queries(self, data)
714 }
715 }
716
717 pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
718 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
719 let guard = self.handles.lock().expect("pty handles mutex poisoned");
720 if let Some(handles) = guard.as_ref() {
721 #[cfg(windows)]
722 {
723 let _ = (rows, cols, handles);
724 return Ok(());
728 }
729
730 #[cfg(not(windows))]
731 handles
732 .master
733 .resize(PtySize {
734 rows,
735 cols,
736 pixel_width: 0,
737 pixel_height: 0,
738 })
739 .map_err(|e| PtyError::Other(e.to_string()))?;
740 }
741 Ok(())
742 }
743
744 pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
745 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
746 #[cfg(windows)]
747 {
748 pty_windows::send_interrupt(self)
749 }
750
751 #[cfg(unix)]
752 {
753 pty_platform::send_interrupt(self)
754 }
755 }
756
757 pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
758 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
759 if let Some(code) = *self
761 .returncode
762 .lock()
763 .expect("pty returncode mutex poisoned")
764 {
765 return Ok(code);
766 }
767 let start = Instant::now();
768 loop {
769 if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
770 return Ok(code);
771 }
772 if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
773 return Err(PtyError::Timeout);
774 }
775 thread::sleep(Duration::from_millis(10));
776 }
777 }
778
779 pub fn terminate_impl(&self) -> Result<(), PtyError> {
780 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
781 #[cfg(windows)]
782 {
783 if self
784 .handles
785 .lock()
786 .expect("pty handles mutex poisoned")
787 .is_none()
788 {
789 return Err(PtyError::NotRunning);
790 }
791 return self.close_impl();
792 }
793
794 #[cfg(unix)]
795 {
796 pty_platform::terminate(self)
797 }
798 }
799
800 pub fn kill_impl(&self) -> Result<(), PtyError> {
801 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
802 #[cfg(windows)]
803 {
804 if self
805 .handles
806 .lock()
807 .expect("pty handles mutex poisoned")
808 .is_none()
809 {
810 return Err(PtyError::NotRunning);
811 }
812 return self.close_impl();
813 }
814
815 #[cfg(unix)]
816 {
817 pty_platform::kill(self)
818 }
819 }
820
821 pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
822 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
823 #[cfg(windows)]
824 {
825 pty_windows::terminate_tree(self)
826 }
827
828 #[cfg(unix)]
829 {
830 pty_platform::terminate_tree(self)
831 }
832 }
833
834 pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
835 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
836 #[cfg(windows)]
837 {
838 pty_windows::kill_tree(self)
839 }
840
841 #[cfg(unix)]
842 {
843 pty_platform::kill_tree(self)
844 }
845 }
846
847 pub fn pid(&self) -> Result<Option<u32>, PtyError> {
849 let guard = self.handles.lock().expect("pty handles mutex poisoned");
850 if let Some(handles) = guard.as_ref() {
851 #[cfg(unix)]
852 if let Some(pid) = handles.master.process_group_leader() {
853 if let Ok(pid) = u32::try_from(pid) {
854 return Ok(Some(pid));
855 }
856 }
857 return Ok(handles.child.process_id());
858 }
859 Ok(None)
860 }
861
862 pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
865 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
866 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
867 loop {
868 if let Some(chunk) = guard.chunks.pop_front() {
869 return Ok(Some(chunk));
870 }
871 if guard.closed {
872 return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
873 }
874 match deadline {
875 Some(deadline) => {
876 let now = Instant::now();
877 if now >= deadline {
878 return Ok(None); }
880 let wait = deadline.saturating_duration_since(now);
881 let result = self
882 .reader
883 .condvar
884 .wait_timeout(guard, wait)
885 .expect("pty read mutex poisoned");
886 guard = result.0;
887 }
888 None => {
889 guard = self
890 .reader
891 .condvar
892 .wait(guard)
893 .expect("pty read mutex poisoned");
894 }
895 }
896 }
897 }
898
899 pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
901 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
902 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
903 loop {
904 if guard.closed {
905 return true;
906 }
907 match deadline {
908 Some(deadline) => {
909 let now = Instant::now();
910 if now >= deadline {
911 return false;
912 }
913 let wait = deadline.saturating_duration_since(now);
914 let result = self
915 .reader
916 .condvar
917 .wait_timeout(guard, wait)
918 .expect("pty read mutex poisoned");
919 guard = result.0;
920 }
921 None => {
922 guard = self
923 .reader
924 .condvar
925 .wait(guard)
926 .expect("pty read mutex poisoned");
927 }
928 }
929 }
930 }
931
932 pub fn wait_and_drain_impl(
934 &self,
935 timeout: Option<f64>,
936 drain_timeout: f64,
937 ) -> Result<i32, PtyError> {
938 let code = self.wait_impl(timeout)?;
939 let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
940 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
941 while !guard.closed {
942 let remaining = deadline.saturating_duration_since(Instant::now());
943 if remaining.is_zero() {
944 break;
945 }
946 let result = self
947 .reader
948 .condvar
949 .wait_timeout(guard, remaining)
950 .expect("pty read mutex poisoned");
951 guard = result.0;
952 }
953 Ok(code)
954 }
955
956 pub fn set_echo(&self, enabled: bool) {
957 self.echo.store(enabled, Ordering::Release);
958 }
959
960 pub fn echo_enabled(&self) -> bool {
961 self.echo.load(Ordering::Acquire)
962 }
963
964 pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
965 let mut guard = self
966 .idle_detector
967 .lock()
968 .expect("idle detector mutex poisoned");
969 *guard = Some(Arc::clone(detector));
970 }
971
972 pub fn detach_idle_detector(&self) {
973 let mut guard = self
974 .idle_detector
975 .lock()
976 .expect("idle detector mutex poisoned");
977 *guard = None;
978 }
979
980 pub fn pty_input_bytes_total(&self) -> usize {
981 self.input_bytes_total.load(Ordering::Acquire)
982 }
983
984 pub fn pty_newline_events_total(&self) -> usize {
985 self.newline_events_total.load(Ordering::Acquire)
986 }
987
988 pub fn pty_submit_events_total(&self) -> usize {
989 self.submit_events_total.load(Ordering::Acquire)
990 }
991
992 pub fn pty_output_bytes_total(&self) -> usize {
993 self.output_bytes_total.load(Ordering::Acquire)
994 }
995
996 pub fn pty_control_churn_bytes_total(&self) -> usize {
997 self.control_churn_bytes_total.load(Ordering::Acquire)
998 }
999}
1000
1001#[derive(Debug, Clone, Copy)]
1006pub struct InteractivePtyOptions {
1007 pub echo_output: bool,
1008 pub relay_terminal_input: bool,
1009 pub respond_to_queries: bool,
1010}
1011
1012impl Default for InteractivePtyOptions {
1013 fn default() -> Self {
1014 Self {
1015 echo_output: true,
1016 relay_terminal_input: true,
1017 respond_to_queries: true,
1018 }
1019 }
1020}
1021
1022#[derive(Debug, Default)]
1023pub struct InteractivePtyPumpResult {
1024 pub chunks: Vec<Vec<u8>>,
1025 pub stream_closed: bool,
1026}
1027
1028pub struct InteractivePtySession {
1033 process: NativePtyProcess,
1034 options: InteractivePtyOptions,
1035}
1036
1037impl InteractivePtySession {
1038 pub fn new(process: NativePtyProcess) -> Self {
1039 Self::with_options(process, InteractivePtyOptions::default())
1040 }
1041
1042 pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
1043 Self { process, options }
1044 }
1045
1046 pub fn process(&self) -> &NativePtyProcess {
1047 &self.process
1048 }
1049
1050 pub fn start(&self) -> Result<(), PtyError> {
1051 self.process.set_echo(self.options.echo_output);
1052 self.process.start_impl()?;
1053 if self.options.relay_terminal_input {
1054 self.process.start_terminal_input_relay_impl()?;
1055 }
1056 Ok(())
1057 }
1058
1059 pub fn pump_output(
1060 &self,
1061 timeout: Option<f64>,
1062 consume_all: bool,
1063 ) -> Result<InteractivePtyPumpResult, PtyError> {
1064 let mut pumped = InteractivePtyPumpResult::default();
1065 let mut next_timeout = timeout;
1066 loop {
1067 match self.process.read_chunk_impl(next_timeout) {
1068 Ok(Some(chunk)) => {
1069 if self.options.respond_to_queries {
1070 self.process.respond_to_queries_impl(&chunk)?;
1071 }
1072 pumped.chunks.push(chunk);
1073 if !consume_all {
1074 break;
1075 }
1076 next_timeout = Some(0.0);
1077 }
1078 Ok(None) => break,
1079 Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
1080 pumped.stream_closed = true;
1081 break;
1082 }
1083 Err(err) => return Err(err),
1084 }
1085 }
1086 Ok(pumped)
1087 }
1088
1089 pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
1090 self.process.resize_impl(rows, cols)
1091 }
1092
1093 pub fn send_interrupt(&self) -> Result<(), PtyError> {
1094 self.process.send_interrupt_impl()
1095 }
1096
1097 pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
1098 self.process.wait_impl(timeout)
1099 }
1100
1101 pub fn wait_and_drain(
1102 &self,
1103 timeout: Option<f64>,
1104 drain_timeout: f64,
1105 ) -> Result<i32, PtyError> {
1106 self.process.wait_and_drain_impl(timeout, drain_timeout)
1107 }
1108
1109 pub fn terminate(&self) -> Result<(), PtyError> {
1110 self.process.terminate_impl()
1111 }
1112
1113 pub fn kill(&self) -> Result<(), PtyError> {
1114 self.process.kill_impl()
1115 }
1116
1117 pub fn close(&self) -> Result<(), PtyError> {
1118 self.process.close_impl()
1119 }
1120}
1121
1122impl Drop for NativePtyProcess {
1123 fn drop(&mut self) {
1124 self.close_nonblocking();
1125 }
1126}
1127
1128pub fn control_churn_bytes(data: &[u8]) -> usize {
1131 let mut total = 0;
1132 let mut index = 0;
1133 while index < data.len() {
1134 let byte = data[index];
1135 if byte == 0x1B {
1136 let start = index;
1137 index += 1;
1138 if index < data.len() && data[index] == b'[' {
1139 index += 1;
1140 while index < data.len() {
1141 let current = data[index];
1142 index += 1;
1143 if (0x40..=0x7E).contains(¤t) {
1144 break;
1145 }
1146 }
1147 }
1148 total += index - start;
1149 continue;
1150 }
1151 if matches!(byte, 0x08 | 0x0D | 0x7F) {
1152 total += 1;
1153 }
1154 index += 1;
1155 }
1156 total
1157}
1158
1159pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
1160 let mut command = CommandBuilder::new(&argv[0]);
1161 if argv.len() > 1 {
1162 command.args(
1163 argv[1..]
1164 .iter()
1165 .map(OsString::from)
1166 .collect::<Vec<OsString>>(),
1167 );
1168 }
1169 command
1170}
1171
1172#[inline(never)]
1173pub fn spawn_pty_reader(
1174 mut reader: Box<dyn Read + Send>,
1175 shared: Arc<PtyReadShared>,
1176 echo: Arc<AtomicBool>,
1177 idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
1178 output_bytes_total: Arc<AtomicUsize>,
1179 control_churn_bytes_total: Arc<AtomicUsize>,
1180) {
1181 crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
1182 let idle_detector_snapshot = idle_detector
1183 .lock()
1184 .expect("idle detector mutex poisoned")
1185 .clone();
1186 let mut chunk = vec![0_u8; 65536];
1187 loop {
1188 match reader.read(&mut chunk) {
1189 Ok(0) => break,
1190 Ok(n) => {
1191 let data = &chunk[..n];
1192
1193 let churn = control_churn_bytes(data);
1194 let visible = data.len().saturating_sub(churn);
1195 output_bytes_total.fetch_add(visible, Ordering::Relaxed);
1196 control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
1197
1198 if echo.load(Ordering::Relaxed) {
1199 let _ = std::io::stdout().write_all(data);
1200 let _ = std::io::stdout().flush();
1201 }
1202
1203 if let Some(ref detector) = idle_detector_snapshot {
1204 detector.record_output(data);
1205 }
1206
1207 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
1208 guard.chunks.push_back(data.to_vec());
1209 shared.condvar.notify_all();
1210 }
1211 Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
1212 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
1213 thread::sleep(Duration::from_millis(10));
1214 continue;
1215 }
1216 Err(_) => break,
1217 }
1218 }
1219 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
1220 guard.closed = true;
1221 shared.condvar.notify_all();
1222}
1223
1224pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
1225 if let Some(signal) = status.signal() {
1226 let signal = signal.to_ascii_lowercase();
1227 if signal.contains("interrupt") {
1228 return -2;
1229 }
1230 if signal.contains("terminated") {
1231 return -15;
1232 }
1233 if signal.contains("killed") {
1234 return -9;
1235 }
1236 }
1237 status.exit_code() as i32
1238}
1239
1240pub fn input_contains_newline(data: &[u8]) -> bool {
1241 data.iter().any(|byte| matches!(*byte, b'\r' | b'\n'))
1242}
1243
1244#[cfg(unix)]
1245struct PosixTerminalModeGuard {
1246 stdin_fd: i32,
1247 original_mode: libc::termios,
1248}
1249
1250#[cfg(unix)]
1251impl Drop for PosixTerminalModeGuard {
1252 fn drop(&mut self) {
1253 unsafe {
1254 libc::tcsetattr(self.stdin_fd, libc::TCSANOW, &self.original_mode);
1255 }
1256 }
1257}
1258
1259#[cfg(unix)]
1260fn acquire_posix_terminal_mode_guard() -> Result<PosixTerminalModeGuard, std::io::Error> {
1261 let stdin_fd = libc::STDIN_FILENO;
1262 let mut original_mode = unsafe { std::mem::zeroed::<libc::termios>() };
1263 if unsafe { libc::tcgetattr(stdin_fd, &mut original_mode) } != 0 {
1264 return Err(std::io::Error::last_os_error());
1265 }
1266 let mut raw_mode = original_mode;
1267 unsafe {
1268 libc::cfmakeraw(&mut raw_mode);
1269 }
1270 if unsafe { libc::tcsetattr(stdin_fd, libc::TCSANOW, &raw_mode) } != 0 {
1271 return Err(std::io::Error::last_os_error());
1272 }
1273 Ok(PosixTerminalModeGuard {
1274 stdin_fd,
1275 original_mode,
1276 })
1277}
1278
1279#[cfg(unix)]
1280#[inline(never)]
1281fn posix_terminal_input_relay_worker(
1282 handles: Arc<Mutex<Option<NativePtyHandles>>>,
1283 returncode: Arc<Mutex<Option<i32>>>,
1284 input_bytes_total: Arc<AtomicUsize>,
1285 newline_events_total: Arc<AtomicUsize>,
1286 submit_events_total: Arc<AtomicUsize>,
1287 stop: Arc<AtomicBool>,
1288 active: Arc<AtomicBool>,
1289) {
1290 let _terminal_guard = match acquire_posix_terminal_mode_guard() {
1291 Ok(guard) => guard,
1292 Err(_) => {
1293 active.store(false, Ordering::Release);
1294 return;
1295 }
1296 };
1297
1298 let stdin_fd = libc::STDIN_FILENO;
1299 let mut buffer = vec![0_u8; 65536];
1300 loop {
1301 if stop.load(Ordering::Acquire) {
1302 break;
1303 }
1304 match poll_pty_process(&handles, &returncode) {
1305 Ok(Some(_)) => break,
1306 Ok(None) => {}
1307 Err(_) => break,
1308 }
1309
1310 let mut pollfd = libc::pollfd {
1311 fd: stdin_fd,
1312 events: libc::POLLIN,
1313 revents: 0,
1314 };
1315 let poll_result = unsafe { libc::poll(&mut pollfd, 1, 50) };
1316 if poll_result < 0 {
1317 let err = std::io::Error::last_os_error();
1318 if err.kind() == std::io::ErrorKind::Interrupted {
1319 continue;
1320 }
1321 break;
1322 }
1323 if poll_result == 0 || pollfd.revents & libc::POLLIN == 0 {
1324 continue;
1325 }
1326
1327 let read_result = unsafe { libc::read(stdin_fd, buffer.as_mut_ptr().cast(), buffer.len()) };
1328 if read_result < 0 {
1329 let err = std::io::Error::last_os_error();
1330 if err.kind() == std::io::ErrorKind::Interrupted {
1331 continue;
1332 }
1333 break;
1334 }
1335 if read_result == 0 {
1336 continue;
1337 }
1338
1339 let mut data = buffer[..read_result as usize].to_vec();
1340 loop {
1341 let mut drain_pollfd = libc::pollfd {
1342 fd: stdin_fd,
1343 events: libc::POLLIN,
1344 revents: 0,
1345 };
1346 let drain_ready = unsafe { libc::poll(&mut drain_pollfd, 1, 0) };
1347 if drain_ready <= 0 || drain_pollfd.revents & libc::POLLIN == 0 {
1348 break;
1349 }
1350 let drain_result =
1351 unsafe { libc::read(stdin_fd, buffer.as_mut_ptr().cast(), buffer.len()) };
1352 if drain_result <= 0 {
1353 break;
1354 }
1355 data.extend_from_slice(&buffer[..drain_result as usize]);
1356 }
1357
1358 record_pty_input_metrics(
1359 &input_bytes_total,
1360 &newline_events_total,
1361 &submit_events_total,
1362 &data,
1363 input_contains_newline(&data),
1364 );
1365 if write_pty_input(&handles, &data).is_err() {
1366 break;
1367 }
1368 }
1369
1370 active.store(false, Ordering::Release);
1371}
1372
1373pub fn record_pty_input_metrics(
1374 input_bytes_total: &Arc<AtomicUsize>,
1375 newline_events_total: &Arc<AtomicUsize>,
1376 submit_events_total: &Arc<AtomicUsize>,
1377 data: &[u8],
1378 submit: bool,
1379) {
1380 input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
1381 if input_contains_newline(data) {
1382 newline_events_total.fetch_add(1, Ordering::AcqRel);
1383 }
1384 if submit {
1385 submit_events_total.fetch_add(1, Ordering::AcqRel);
1386 }
1387}
1388
1389pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
1390 *returncode.lock().expect("pty returncode mutex poisoned") = Some(code);
1391}
1392
1393pub fn poll_pty_process(
1394 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
1395 returncode: &Arc<Mutex<Option<i32>>>,
1396) -> Result<Option<i32>, std::io::Error> {
1397 let mut guard = handles.lock().expect("pty handles mutex poisoned");
1398 let Some(handles) = guard.as_mut() else {
1399 return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
1400 };
1401 let status = handles.child.try_wait()?;
1402 let code = status.map(portable_exit_code);
1403 if let Some(code) = code {
1404 store_pty_returncode(returncode, code);
1405 return Ok(Some(code));
1406 }
1407 Ok(None)
1408}
1409
1410pub fn write_pty_input(
1411 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
1412 data: &[u8],
1413) -> Result<(), std::io::Error> {
1414 let mut guard = handles.lock().expect("pty handles mutex poisoned");
1415 let handles = guard.as_mut().ok_or_else(|| {
1416 std::io::Error::new(
1417 std::io::ErrorKind::NotConnected,
1418 "Pseudo-terminal process is not running",
1419 )
1420 })?;
1421 #[cfg(windows)]
1422 let payload = pty_windows::input_payload(data);
1423 #[cfg(unix)]
1424 let payload = pty_platform::input_payload(data);
1425 handles.writer.write_all(&payload)?;
1426 handles.writer.flush()
1427}
1428
1429#[cfg(windows)]
1430pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
1431 let mut translated = Vec::with_capacity(data.len());
1432 let mut index = 0usize;
1433 while index < data.len() {
1434 let current = data[index];
1435 if current == b'\r' {
1436 translated.push(current);
1437 if index + 1 < data.len() && data[index + 1] == b'\n' {
1438 translated.push(b'\n');
1439 index += 2;
1440 continue;
1441 }
1442 index += 1;
1443 continue;
1444 }
1445 if current == b'\n' {
1446 translated.push(b'\r');
1447 index += 1;
1448 continue;
1449 }
1450 translated.push(current);
1451 index += 1;
1452 }
1453 translated
1454}
1455
1456#[cfg(windows)]
1457#[inline(never)]
1458pub fn assign_child_to_windows_kill_on_close_job(
1459 handle: Option<std::os::windows::io::RawHandle>,
1460) -> Result<WindowsJobHandle, PtyError> {
1461 crate::rp_rust_debug_scope!(
1462 "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
1463 );
1464 use std::mem::zeroed;
1465
1466 use winapi::shared::minwindef::FALSE;
1467 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
1468 use winapi::um::jobapi2::{
1469 AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
1470 };
1471 use winapi::um::winnt::{
1472 JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
1473 JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
1474 };
1475
1476 let Some(handle) = handle else {
1477 return Err(PtyError::Other(
1478 "Pseudo-terminal child does not expose a Windows process handle".into(),
1479 ));
1480 };
1481
1482 let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
1483 if job.is_null() || job == INVALID_HANDLE_VALUE {
1484 return Err(PtyError::Io(std::io::Error::last_os_error()));
1485 }
1486
1487 let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
1488 info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
1489 let result = unsafe {
1490 SetInformationJobObject(
1491 job,
1492 JobObjectExtendedLimitInformation,
1493 (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
1494 std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
1495 )
1496 };
1497 if result == FALSE {
1498 let err = std::io::Error::last_os_error();
1499 unsafe {
1500 winapi::um::handleapi::CloseHandle(job);
1501 }
1502 return Err(PtyError::Io(err));
1503 }
1504
1505 let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
1506 if result == FALSE {
1507 let err = std::io::Error::last_os_error();
1508 unsafe {
1509 winapi::um::handleapi::CloseHandle(job);
1510 }
1511 return Err(PtyError::Io(err));
1512 }
1513
1514 Ok(WindowsJobHandle(job as usize))
1515}
1516
1517#[cfg(windows)]
1519#[derive(Debug, Clone)]
1520pub struct ChildProcessInfo {
1521 pub pid: u32,
1522 pub name: String,
1523}
1524
1525#[cfg(windows)]
1528pub fn find_child_processes(parent_pid: u32) -> Vec<ChildProcessInfo> {
1529 use winapi::um::handleapi::CloseHandle;
1530 use winapi::um::tlhelp32::{
1531 CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1532 };
1533
1534 let mut children = Vec::new();
1535 let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1536 if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1537 return children;
1538 }
1539
1540 let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1541 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1542
1543 if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1544 loop {
1545 if entry.th32ParentProcessID == parent_pid {
1546 let name_bytes = &entry.szExeFile;
1547 let name_len = name_bytes
1548 .iter()
1549 .position(|&b| b == 0)
1550 .unwrap_or(name_bytes.len());
1551 let name = String::from_utf8_lossy(
1552 &name_bytes[..name_len]
1553 .iter()
1554 .map(|&c| c as u8)
1555 .collect::<Vec<u8>>(),
1556 )
1557 .into_owned();
1558 children.push(ChildProcessInfo {
1559 pid: entry.th32ProcessID,
1560 name,
1561 });
1562 }
1563 if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1564 break;
1565 }
1566 }
1567 }
1568
1569 unsafe { CloseHandle(snapshot) };
1570 children
1571}
1572
1573#[cfg(windows)]
1575fn conhost_children_of_current_process() -> Vec<u32> {
1576 let our_pid = std::process::id();
1577 find_child_processes(our_pid)
1578 .into_iter()
1579 .filter(|c| c.name.eq_ignore_ascii_case("conhost.exe"))
1580 .map(|c| c.pid)
1581 .collect()
1582}
1583
1584#[cfg(windows)]
1588fn assign_conpty_conhost_to_job(job: &WindowsJobHandle, before_pids: &[u32]) {
1589 let after_pids = conhost_children_of_current_process();
1590 for pid in after_pids {
1591 if !before_pids.contains(&pid) {
1592 let _ = job.assign_pid(pid);
1594 }
1595 }
1596}
1597
1598#[cfg(windows)]
1601#[derive(Debug, Clone)]
1602pub struct OrphanConhostInfo {
1603 pub pid: u32,
1605 pub parent_pid: u32,
1607 pub parent_name: String,
1609}
1610
1611#[cfg(windows)]
1617pub fn find_orphan_conhosts() -> Vec<OrphanConhostInfo> {
1618 use winapi::um::handleapi::CloseHandle;
1619 use winapi::um::tlhelp32::{
1620 CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1621 };
1622
1623 let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1624 if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1625 return Vec::new();
1626 }
1627
1628 let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1629 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1630
1631 let mut all_pids = std::collections::HashSet::new();
1633 let mut conhosts: Vec<(u32, u32)> = Vec::new(); let mut parent_names: std::collections::HashMap<u32, String> = std::collections::HashMap::new();
1635
1636 if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1637 loop {
1638 let name_bytes = &entry.szExeFile;
1639 let name_len = name_bytes
1640 .iter()
1641 .position(|&b| b == 0)
1642 .unwrap_or(name_bytes.len());
1643 let name = String::from_utf8_lossy(
1644 &name_bytes[..name_len]
1645 .iter()
1646 .map(|&c| c as u8)
1647 .collect::<Vec<u8>>(),
1648 )
1649 .into_owned();
1650
1651 all_pids.insert(entry.th32ProcessID);
1652 parent_names.insert(entry.th32ProcessID, name.clone());
1653
1654 if name.eq_ignore_ascii_case("conhost.exe") {
1655 conhosts.push((entry.th32ProcessID, entry.th32ParentProcessID));
1656 }
1657
1658 if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1659 break;
1660 }
1661 }
1662 }
1663
1664 unsafe { CloseHandle(snapshot) };
1665
1666 conhosts
1668 .into_iter()
1669 .filter(|&(_, parent_pid)| !all_pids.contains(&parent_pid))
1670 .map(|(pid, parent_pid)| OrphanConhostInfo {
1671 pid,
1672 parent_pid,
1673 parent_name: parent_names.get(&parent_pid).cloned().unwrap_or_default(),
1674 })
1675 .collect()
1676}
1677
1678#[cfg(windows)]
1679#[inline(never)]
1680pub fn apply_windows_pty_priority(
1681 handle: Option<std::os::windows::io::RawHandle>,
1682 nice: Option<i32>,
1683) -> Result<(), PtyError> {
1684 crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
1685 use winapi::um::processthreadsapi::SetPriorityClass;
1686 use winapi::um::winbase::{
1687 ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
1688 IDLE_PRIORITY_CLASS,
1689 };
1690
1691 let Some(handle) = handle else {
1692 return Ok(());
1693 };
1694 let flags = match nice {
1695 Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
1696 Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
1697 Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
1698 Some(value) if value <= -1 => ABOVE_NORMAL_PRIORITY_CLASS,
1699 _ => 0,
1700 };
1701 if flags == 0 {
1702 return Ok(());
1703 }
1704 let result = unsafe { SetPriorityClass(handle.cast(), flags) };
1705 if result == 0 {
1706 return Err(PtyError::Io(std::io::Error::last_os_error()));
1707 }
1708 Ok(())
1709}