1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
12pub mod reexports {
14 pub use portable_pty;
15}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
27#[derive(Debug, Error)]
28pub enum PtyError {
29 #[error("pseudo-terminal process already started")]
30 AlreadyStarted,
31 #[error("pseudo-terminal process is not running")]
32 NotRunning,
33 #[error("pseudo-terminal timed out")]
34 Timeout,
35 #[error("pseudo-terminal I/O error: {0}")]
36 Io(#[from] std::io::Error),
37 #[error("pseudo-terminal spawn failed: {0}")]
38 Spawn(String),
39 #[error("pseudo-terminal error: {0}")]
40 Other(String),
41}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44 if matches!(
45 err.kind(),
46 std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47 ) {
48 return true;
49 }
50 #[cfg(unix)]
51 if err.raw_os_error() == Some(libc::ESRCH) {
52 return true;
53 }
54 false
55}
56
57pub struct PtyReadState {
58 pub chunks: VecDeque<Vec<u8>>,
59 pub closed: bool,
60}
61
62pub struct PtyReadShared {
63 pub state: Mutex<PtyReadState>,
64 pub condvar: Condvar,
65}
66
67pub struct NativePtyHandles {
68 pub master: Box<dyn MasterPty + Send>,
69 pub writer: Box<dyn Write + Send>,
70 pub child: Box<dyn portable_pty::Child + Send + Sync>,
71 #[cfg(windows)]
72 pub _job: WindowsJobHandle,
73}
74
75#[cfg(windows)]
76pub struct WindowsJobHandle(pub usize);
77
78#[cfg(windows)]
79impl WindowsJobHandle {
80 pub fn assign_pid(&self, pid: u32) -> Result<(), std::io::Error> {
82 use winapi::um::handleapi::CloseHandle;
83 use winapi::um::processthreadsapi::OpenProcess;
84 use winapi::um::winnt::PROCESS_SET_QUOTA;
85 use winapi::um::winnt::PROCESS_TERMINATE;
86
87 let handle = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
88 if handle.is_null() {
89 return Err(std::io::Error::last_os_error());
90 }
91 let result = unsafe {
92 winapi::um::jobapi2::AssignProcessToJobObject(
93 self.0 as winapi::shared::ntdef::HANDLE,
94 handle,
95 )
96 };
97 unsafe { CloseHandle(handle) };
98 if result == 0 {
99 return Err(std::io::Error::last_os_error());
100 }
101 Ok(())
102 }
103}
104
105#[cfg(windows)]
106impl Drop for WindowsJobHandle {
107 fn drop(&mut self) {
108 unsafe {
109 winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
110 }
111 }
112}
113
114pub struct IdleMonitorState {
115 pub last_reset_at: Instant,
116 pub returncode: Option<i32>,
117 pub interrupted: bool,
118}
119
120pub struct IdleDetectorCore {
123 pub timeout_seconds: f64,
124 pub stability_window_seconds: f64,
125 pub sample_interval_seconds: f64,
126 pub reset_on_input: bool,
127 pub reset_on_output: bool,
128 pub count_control_churn_as_output: bool,
129 pub enabled: Arc<AtomicBool>,
130 pub state: Mutex<IdleMonitorState>,
131 pub condvar: Condvar,
132}
133
134impl IdleDetectorCore {
135 pub fn record_input(&self, byte_count: usize) {
136 if !self.reset_on_input || byte_count == 0 {
137 return;
138 }
139 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
140 guard.last_reset_at = Instant::now();
141 self.condvar.notify_all();
142 }
143
144 pub fn record_output(&self, data: &[u8]) {
145 if !self.reset_on_output || data.is_empty() {
146 return;
147 }
148 let control_bytes = control_churn_bytes(data);
149 let visible_output_bytes = data.len().saturating_sub(control_bytes);
150 let active_output =
151 visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
152 if !active_output {
153 return;
154 }
155 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
156 guard.last_reset_at = Instant::now();
157 self.condvar.notify_all();
158 }
159
160 pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
161 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
162 guard.returncode = Some(returncode);
163 guard.interrupted = interrupted;
164 self.condvar.notify_all();
165 }
166
167 pub fn enabled(&self) -> bool {
168 self.enabled.load(Ordering::Acquire)
169 }
170
171 pub fn set_enabled(&self, enabled: bool) {
172 let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
173 if enabled && !was_enabled {
174 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
175 guard.last_reset_at = Instant::now();
176 }
177 self.condvar.notify_all();
178 }
179
180 pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
181 let started = Instant::now();
182 let overall_timeout = timeout.map(Duration::from_secs_f64);
183 let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
184 let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
185
186 let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
187 loop {
188 let now = Instant::now();
189 let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
190
191 if let Some(returncode) = guard.returncode {
192 let reason = if guard.interrupted {
193 "interrupt"
194 } else {
195 "process_exit"
196 };
197 return (false, reason.to_string(), idle_for, Some(returncode));
198 }
199
200 let enabled = self.enabled.load(Ordering::Acquire);
201 if enabled && idle_for >= min_idle {
202 return (true, "idle_timeout".to_string(), idle_for, None);
203 }
204
205 if let Some(limit) = overall_timeout {
206 if now.duration_since(started) >= limit {
207 return (false, "timeout".to_string(), idle_for, None);
208 }
209 }
210
211 let idle_remaining = if enabled {
212 (min_idle - idle_for).max(0.0)
213 } else {
214 sample_interval.as_secs_f64()
215 };
216 let mut wait_for =
217 sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
218 if let Some(limit) = overall_timeout {
219 let elapsed = now.duration_since(started);
220 if elapsed < limit {
221 let remaining = limit - elapsed;
222 wait_for = wait_for.min(remaining);
223 }
224 }
225 let result = self
226 .condvar
227 .wait_timeout(guard, wait_for)
228 .expect("idle monitor mutex poisoned");
229 guard = result.0;
230 }
231 }
232}
233
234pub struct NativePtyProcess {
235 pub argv: Vec<String>,
236 pub cwd: Option<String>,
237 pub env: Option<Vec<(String, String)>>,
238 pub rows: u16,
239 pub cols: u16,
240 #[cfg(windows)]
241 pub nice: Option<i32>,
242 pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
243 pub reader: Arc<PtyReadShared>,
244 pub returncode: Arc<Mutex<Option<i32>>>,
245 pub input_bytes_total: Arc<AtomicUsize>,
246 pub newline_events_total: Arc<AtomicUsize>,
247 pub submit_events_total: Arc<AtomicUsize>,
248 pub echo: Arc<AtomicBool>,
250 pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
252 pub output_bytes_total: Arc<AtomicUsize>,
254 pub control_churn_bytes_total: Arc<AtomicUsize>,
256 pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
257 pub terminal_input_relay_stop: Arc<AtomicBool>,
258 pub terminal_input_relay_active: Arc<AtomicBool>,
259 pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
260}
261
262fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
263 cwd.map(str::to_owned).or_else(|| {
264 std::env::current_dir()
265 .ok()
266 .map(|cwd| cwd.to_string_lossy().to_string())
267 })
268}
269
270impl NativePtyProcess {
271 pub fn new(
272 argv: Vec<String>,
273 cwd: Option<String>,
274 env: Option<Vec<(String, String)>>,
275 rows: u16,
276 cols: u16,
277 nice: Option<i32>,
278 ) -> Result<Self, PtyError> {
279 if argv.is_empty() {
280 return Err(PtyError::Other("command cannot be empty".into()));
281 }
282 #[cfg(not(windows))]
283 let _ = nice;
284 Ok(Self {
285 argv,
286 cwd,
287 env,
288 rows,
289 cols,
290 #[cfg(windows)]
291 nice,
292 handles: Arc::new(Mutex::new(None)),
293 reader: Arc::new(PtyReadShared {
294 state: Mutex::new(PtyReadState {
295 chunks: VecDeque::new(),
296 closed: false,
297 }),
298 condvar: Condvar::new(),
299 }),
300 returncode: Arc::new(Mutex::new(None)),
301 input_bytes_total: Arc::new(AtomicUsize::new(0)),
302 newline_events_total: Arc::new(AtomicUsize::new(0)),
303 submit_events_total: Arc::new(AtomicUsize::new(0)),
304 echo: Arc::new(AtomicBool::new(false)),
305 idle_detector: Arc::new(Mutex::new(None)),
306 output_bytes_total: Arc::new(AtomicUsize::new(0)),
307 control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
308 reader_worker: Mutex::new(None),
309 terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
310 terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
311 terminal_input_relay_worker: Mutex::new(None),
312 })
313 }
314
315 pub fn mark_reader_closed(&self) {
316 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
317 guard.closed = true;
318 self.reader.condvar.notify_all();
319 }
320
321 pub fn store_returncode(&self, code: i32) {
322 store_pty_returncode(&self.returncode, code);
323 }
324
325 fn join_reader_worker(&self) {
326 if let Some(worker) = self
327 .reader_worker
328 .lock()
329 .expect("pty reader worker mutex poisoned")
330 .take()
331 {
332 let _ = worker.join();
333 }
334 }
335
336 pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
337 record_pty_input_metrics(
338 &self.input_bytes_total,
339 &self.newline_events_total,
340 &self.submit_events_total,
341 data,
342 submit,
343 );
344 }
345
346 pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
347 self.record_input_metrics(data, submit);
348 write_pty_input(&self.handles, data)?;
349 Ok(())
350 }
351
352 pub fn request_terminal_input_relay_stop(&self) {
353 self.terminal_input_relay_stop
354 .store(true, Ordering::Release);
355 self.terminal_input_relay_active
356 .store(false, Ordering::Release);
357 }
358
359 pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
360 let mut worker_guard = self
361 .terminal_input_relay_worker
362 .lock()
363 .expect("pty terminal input relay mutex poisoned");
364 if worker_guard.is_some() && self.terminal_input_relay_active() {
365 return Ok(());
366 }
367 if self
368 .handles
369 .lock()
370 .expect("pty handles mutex poisoned")
371 .is_none()
372 {
373 return Err(PtyError::NotRunning);
374 }
375
376 self.terminal_input_relay_stop
377 .store(false, Ordering::Release);
378 self.terminal_input_relay_active
379 .store(true, Ordering::Release);
380
381 let handles = Arc::clone(&self.handles);
382 let returncode = Arc::clone(&self.returncode);
383 let input_bytes_total = Arc::clone(&self.input_bytes_total);
384 let newline_events_total = Arc::clone(&self.newline_events_total);
385 let submit_events_total = Arc::clone(&self.submit_events_total);
386 let stop = Arc::clone(&self.terminal_input_relay_stop);
387 let active = Arc::clone(&self.terminal_input_relay_active);
388
389 #[cfg(windows)]
390 {
391 let capture = terminal_input::TerminalInputCore::new();
392 capture.start_impl().map_err(PtyError::Io)?;
393 *worker_guard = Some(thread::spawn(move || {
394 loop {
395 if stop.load(Ordering::Acquire) {
396 break;
397 }
398 match poll_pty_process(&handles, &returncode) {
399 Ok(Some(_)) => break,
400 Ok(None) => {}
401 Err(_) => break,
402 }
403 match terminal_input::wait_for_terminal_input_event(
404 &capture.state,
405 &capture.condvar,
406 Some(Duration::from_millis(50)),
407 ) {
408 terminal_input::TerminalInputWaitOutcome::Event(event) => {
409 record_pty_input_metrics(
410 &input_bytes_total,
411 &newline_events_total,
412 &submit_events_total,
413 &event.data,
414 event.submit,
415 );
416 if write_pty_input(&handles, &event.data).is_err() {
417 break;
418 }
419 }
420 terminal_input::TerminalInputWaitOutcome::Timeout => continue,
421 terminal_input::TerminalInputWaitOutcome::Closed => break,
422 }
423 }
424 active.store(false, Ordering::Release);
425 let _ = capture.stop_impl();
426 }));
427 Ok(())
428 }
429
430 #[cfg(unix)]
431 {
432 if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
433 self.terminal_input_relay_active
434 .store(false, Ordering::Release);
435 return Ok(());
436 }
437
438 *worker_guard = Some(thread::spawn(move || {
439 posix_terminal_input_relay_worker(
440 handles,
441 returncode,
442 input_bytes_total,
443 newline_events_total,
444 submit_events_total,
445 stop,
446 active,
447 );
448 }));
449 Ok(())
450 }
451 }
452
453 pub fn stop_terminal_input_relay_impl(&self) {
454 self.request_terminal_input_relay_stop();
455 if let Some(worker) = self
456 .terminal_input_relay_worker
457 .lock()
458 .expect("pty terminal input relay mutex poisoned")
459 .take()
460 {
461 let _ = worker.join();
462 }
463 }
464
465 pub fn terminal_input_relay_active(&self) -> bool {
466 self.terminal_input_relay_active.load(Ordering::Acquire)
467 }
468
469 #[inline(never)]
471 pub fn close_impl(&self) -> Result<(), PtyError> {
472 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
473 self.stop_terminal_input_relay_impl();
474 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
475 let Some(handles) = guard.take() else {
476 self.mark_reader_closed();
477 return Ok(());
478 };
479 drop(guard);
480
481 #[cfg(windows)]
482 let NativePtyHandles {
483 master,
484 writer,
485 mut child,
486 _job,
487 } = handles;
488 #[cfg(not(windows))]
489 let NativePtyHandles {
490 master,
491 writer,
492 mut child,
493 } = handles;
494
495 #[cfg(windows)]
496 {
497 {
498 crate::rp_rust_debug_scope!(
499 "running_process_core::NativePtyProcess::close_impl.drop_job"
500 );
501 drop(_job);
502 }
503
504 {
505 crate::rp_rust_debug_scope!(
506 "running_process_core::NativePtyProcess::close_impl.wait_job_exit"
507 );
508 let wait_deadline = Instant::now() + Duration::from_secs(2);
509 loop {
510 match child.try_wait() {
511 Ok(Some(status)) => {
512 let code = portable_exit_code(status);
513 self.store_returncode(code);
514 break;
515 }
516 Ok(None) if Instant::now() < wait_deadline => {
517 thread::sleep(Duration::from_millis(10));
518 }
519 Ok(None) => {
520 if let Err(err) = child.kill() {
521 if !is_ignorable_process_control_error(&err) {
522 return Err(PtyError::Io(err));
523 }
524 }
525 let code = match child.wait() {
526 Ok(status) => portable_exit_code(status),
527 Err(_) => -9,
528 };
529 self.store_returncode(code);
530 break;
531 }
532 Err(_) => {
533 self.store_returncode(-9);
534 break;
535 }
536 }
537 }
538 }
539 {
540 crate::rp_rust_debug_scope!(
541 "running_process_core::NativePtyProcess::close_impl.drop_writer"
542 );
543 drop(writer);
544 }
545 {
546 crate::rp_rust_debug_scope!(
547 "running_process_core::NativePtyProcess::close_impl.drop_master"
548 );
549 drop(master);
550 }
551 drop(child);
552 {
553 crate::rp_rust_debug_scope!(
554 "running_process_core::NativePtyProcess::close_impl.join_reader"
555 );
556 self.join_reader_worker();
557 }
558 self.mark_reader_closed();
559 Ok(())
560 }
561
562 #[cfg(not(windows))]
563 {
564 drop(writer);
565 drop(master);
566
567 let code = {
568 crate::rp_rust_debug_scope!(
569 "running_process_core::NativePtyProcess::close_impl.wait_child"
570 );
571 match child.wait() {
572 Ok(status) => portable_exit_code(status),
573 Err(_) => -9,
574 }
575 };
576 drop(child);
577
578 self.store_returncode(code);
579 {
580 crate::rp_rust_debug_scope!(
581 "running_process_core::NativePtyProcess::close_impl.join_reader"
582 );
583 self.join_reader_worker();
584 }
585 self.mark_reader_closed();
586 Ok(())
587 }
588 }
589
590 #[inline(never)]
592 pub fn close_nonblocking(&self) {
593 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
594 #[cfg(windows)]
595 self.request_terminal_input_relay_stop();
596 let Ok(mut guard) = self.handles.lock() else {
597 return;
598 };
599 let Some(handles) = guard.take() else {
600 self.mark_reader_closed();
601 return;
602 };
603 drop(guard);
604
605 #[cfg(windows)]
606 let NativePtyHandles {
607 master,
608 writer,
609 mut child,
610 _job,
611 } = handles;
612 #[cfg(not(windows))]
613 let NativePtyHandles {
614 master,
615 writer,
616 mut child,
617 } = handles;
618
619 if let Err(err) = child.kill() {
620 if !is_ignorable_process_control_error(&err) {
621 return;
622 }
623 }
624 drop(writer);
625 drop(master);
626 drop(child);
627 #[cfg(windows)]
628 drop(_job);
629 self.mark_reader_closed();
630 }
631
632 pub fn start_impl(&self) -> Result<(), PtyError> {
633 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
634 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
635 if guard.is_some() {
636 return Err(PtyError::AlreadyStarted);
637 }
638
639 #[cfg(windows)]
642 let conhost_pids_before = conhost_children_of_current_process();
643
644 let pty_system = native_pty_system();
645 let pair = pty_system
646 .openpty(PtySize {
647 rows: self.rows,
648 cols: self.cols,
649 pixel_width: 0,
650 pixel_height: 0,
651 })
652 .map_err(|e| PtyError::Spawn(e.to_string()))?;
653
654 let mut cmd = command_builder_from_argv(&self.argv);
655 let cwd = resolved_spawn_cwd(self.cwd.as_deref());
656 if let Some(cwd) = &cwd {
657 cmd.cwd(cwd);
658 }
659 if let Some(env) = &self.env {
660 cmd.env_clear();
661 for (key, value) in env {
662 cmd.env(key, value);
663 }
664 }
665
666 let reader = pair
667 .master
668 .try_clone_reader()
669 .map_err(|e| PtyError::Spawn(e.to_string()))?;
670 let writer = pair
671 .master
672 .take_writer()
673 .map_err(|e| PtyError::Spawn(e.to_string()))?;
674 let child = pair
675 .slave
676 .spawn_command(cmd)
677 .map_err(|e| PtyError::Spawn(e.to_string()))?;
678 #[cfg(windows)]
679 let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
680 #[cfg(windows)]
681 assign_conpty_conhost_to_job(&job, &conhost_pids_before);
682 #[cfg(windows)]
683 apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
684 let shared = Arc::clone(&self.reader);
685 let echo = Arc::clone(&self.echo);
686 let idle_detector = Arc::clone(&self.idle_detector);
687 let output_bytes = Arc::clone(&self.output_bytes_total);
688 let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
689 let reader_worker = thread::spawn(move || {
690 spawn_pty_reader(
691 reader,
692 shared,
693 echo,
694 idle_detector,
695 output_bytes,
696 churn_bytes,
697 );
698 });
699 *self
700 .reader_worker
701 .lock()
702 .expect("pty reader worker mutex poisoned") = Some(reader_worker);
703
704 *guard = Some(NativePtyHandles {
705 master: pair.master,
706 writer,
707 child,
708 #[cfg(windows)]
709 _job: job,
710 });
711 Ok(())
712 }
713
714 pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
715 #[cfg(windows)]
716 {
717 pty_windows::respond_to_queries(self, data)
718 }
719
720 #[cfg(unix)]
721 {
722 pty_platform::respond_to_queries(self, data)
723 }
724 }
725
726 pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
727 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
728 let guard = self.handles.lock().expect("pty handles mutex poisoned");
729 if let Some(handles) = guard.as_ref() {
730 #[cfg(windows)]
731 {
732 let _ = (rows, cols, handles);
733 return Ok(());
737 }
738
739 #[cfg(not(windows))]
740 handles
741 .master
742 .resize(PtySize {
743 rows,
744 cols,
745 pixel_width: 0,
746 pixel_height: 0,
747 })
748 .map_err(|e| PtyError::Other(e.to_string()))?;
749 }
750 Ok(())
751 }
752
753 pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
754 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
755 #[cfg(windows)]
756 {
757 pty_windows::send_interrupt(self)
758 }
759
760 #[cfg(unix)]
761 {
762 pty_platform::send_interrupt(self)
763 }
764 }
765
766 pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
767 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
768 if let Some(code) = *self
770 .returncode
771 .lock()
772 .expect("pty returncode mutex poisoned")
773 {
774 return Ok(code);
775 }
776 let start = Instant::now();
777 loop {
778 if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
779 return Ok(code);
780 }
781 if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
782 return Err(PtyError::Timeout);
783 }
784 thread::sleep(Duration::from_millis(10));
785 }
786 }
787
788 pub fn terminate_impl(&self) -> Result<(), PtyError> {
789 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
790 #[cfg(windows)]
791 {
792 if self
793 .handles
794 .lock()
795 .expect("pty handles mutex poisoned")
796 .is_none()
797 {
798 return Err(PtyError::NotRunning);
799 }
800 self.close_impl()
801 }
802
803 #[cfg(unix)]
804 {
805 pty_platform::terminate(self)
806 }
807 }
808
809 pub fn kill_impl(&self) -> Result<(), PtyError> {
810 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
811 #[cfg(windows)]
812 {
813 if self
814 .handles
815 .lock()
816 .expect("pty handles mutex poisoned")
817 .is_none()
818 {
819 return Err(PtyError::NotRunning);
820 }
821 self.close_impl()
822 }
823
824 #[cfg(unix)]
825 {
826 pty_platform::kill(self)
827 }
828 }
829
830 pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
831 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
832 #[cfg(windows)]
833 {
834 pty_windows::terminate_tree(self)
835 }
836
837 #[cfg(unix)]
838 {
839 pty_platform::terminate_tree(self)
840 }
841 }
842
843 pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
844 crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
845 #[cfg(windows)]
846 {
847 pty_windows::kill_tree(self)
848 }
849
850 #[cfg(unix)]
851 {
852 pty_platform::kill_tree(self)
853 }
854 }
855
856 pub fn pid(&self) -> Result<Option<u32>, PtyError> {
858 let guard = self.handles.lock().expect("pty handles mutex poisoned");
859 if let Some(handles) = guard.as_ref() {
860 #[cfg(unix)]
861 if let Some(pid) = handles.master.process_group_leader() {
862 if let Ok(pid) = u32::try_from(pid) {
863 return Ok(Some(pid));
864 }
865 }
866 return Ok(handles.child.process_id());
867 }
868 Ok(None)
869 }
870
871 pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
874 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
875 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
876 loop {
877 if let Some(chunk) = guard.chunks.pop_front() {
878 return Ok(Some(chunk));
879 }
880 if guard.closed {
881 return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
882 }
883 match deadline {
884 Some(deadline) => {
885 let now = Instant::now();
886 if now >= deadline {
887 return Ok(None); }
889 let wait = deadline.saturating_duration_since(now);
890 let result = self
891 .reader
892 .condvar
893 .wait_timeout(guard, wait)
894 .expect("pty read mutex poisoned");
895 guard = result.0;
896 }
897 None => {
898 guard = self
899 .reader
900 .condvar
901 .wait(guard)
902 .expect("pty read mutex poisoned");
903 }
904 }
905 }
906 }
907
908 pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
910 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
911 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
912 loop {
913 if guard.closed {
914 return true;
915 }
916 match deadline {
917 Some(deadline) => {
918 let now = Instant::now();
919 if now >= deadline {
920 return false;
921 }
922 let wait = deadline.saturating_duration_since(now);
923 let result = self
924 .reader
925 .condvar
926 .wait_timeout(guard, wait)
927 .expect("pty read mutex poisoned");
928 guard = result.0;
929 }
930 None => {
931 guard = self
932 .reader
933 .condvar
934 .wait(guard)
935 .expect("pty read mutex poisoned");
936 }
937 }
938 }
939 }
940
941 pub fn wait_and_drain_impl(
943 &self,
944 timeout: Option<f64>,
945 drain_timeout: f64,
946 ) -> Result<i32, PtyError> {
947 let code = self.wait_impl(timeout)?;
948 let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
949 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
950 while !guard.closed {
951 let remaining = deadline.saturating_duration_since(Instant::now());
952 if remaining.is_zero() {
953 break;
954 }
955 let result = self
956 .reader
957 .condvar
958 .wait_timeout(guard, remaining)
959 .expect("pty read mutex poisoned");
960 guard = result.0;
961 }
962 Ok(code)
963 }
964
965 pub fn set_echo(&self, enabled: bool) {
966 self.echo.store(enabled, Ordering::Release);
967 }
968
969 pub fn echo_enabled(&self) -> bool {
970 self.echo.load(Ordering::Acquire)
971 }
972
973 pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
974 let mut guard = self
975 .idle_detector
976 .lock()
977 .expect("idle detector mutex poisoned");
978 *guard = Some(Arc::clone(detector));
979 }
980
981 pub fn detach_idle_detector(&self) {
982 let mut guard = self
983 .idle_detector
984 .lock()
985 .expect("idle detector mutex poisoned");
986 *guard = None;
987 }
988
989 pub fn pty_input_bytes_total(&self) -> usize {
990 self.input_bytes_total.load(Ordering::Acquire)
991 }
992
993 pub fn pty_newline_events_total(&self) -> usize {
994 self.newline_events_total.load(Ordering::Acquire)
995 }
996
997 pub fn pty_submit_events_total(&self) -> usize {
998 self.submit_events_total.load(Ordering::Acquire)
999 }
1000
1001 pub fn pty_output_bytes_total(&self) -> usize {
1002 self.output_bytes_total.load(Ordering::Acquire)
1003 }
1004
1005 pub fn pty_control_churn_bytes_total(&self) -> usize {
1006 self.control_churn_bytes_total.load(Ordering::Acquire)
1007 }
1008}
1009
1010#[derive(Debug, Clone, Copy)]
1015pub struct InteractivePtyOptions {
1016 pub echo_output: bool,
1017 pub relay_terminal_input: bool,
1018 pub respond_to_queries: bool,
1019}
1020
1021impl Default for InteractivePtyOptions {
1022 fn default() -> Self {
1023 Self {
1024 echo_output: true,
1025 relay_terminal_input: true,
1026 respond_to_queries: true,
1027 }
1028 }
1029}
1030
1031#[derive(Debug, Default)]
1032pub struct InteractivePtyPumpResult {
1033 pub chunks: Vec<Vec<u8>>,
1034 pub stream_closed: bool,
1035}
1036
1037pub struct InteractivePtySession {
1042 process: NativePtyProcess,
1043 options: InteractivePtyOptions,
1044}
1045
1046impl InteractivePtySession {
1047 pub fn new(process: NativePtyProcess) -> Self {
1048 Self::with_options(process, InteractivePtyOptions::default())
1049 }
1050
1051 pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
1052 Self { process, options }
1053 }
1054
1055 pub fn process(&self) -> &NativePtyProcess {
1056 &self.process
1057 }
1058
1059 pub fn start(&self) -> Result<(), PtyError> {
1060 self.process.set_echo(self.options.echo_output);
1061 self.process.start_impl()?;
1062 if self.options.relay_terminal_input {
1063 self.process.start_terminal_input_relay_impl()?;
1064 }
1065 Ok(())
1066 }
1067
1068 pub fn pump_output(
1069 &self,
1070 timeout: Option<f64>,
1071 consume_all: bool,
1072 ) -> Result<InteractivePtyPumpResult, PtyError> {
1073 let mut pumped = InteractivePtyPumpResult::default();
1074 let mut next_timeout = timeout;
1075 loop {
1076 match self.process.read_chunk_impl(next_timeout) {
1077 Ok(Some(chunk)) => {
1078 if self.options.respond_to_queries {
1079 self.process.respond_to_queries_impl(&chunk)?;
1080 }
1081 pumped.chunks.push(chunk);
1082 if !consume_all {
1083 break;
1084 }
1085 next_timeout = Some(0.0);
1086 }
1087 Ok(None) => break,
1088 Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
1089 pumped.stream_closed = true;
1090 break;
1091 }
1092 Err(err) => return Err(err),
1093 }
1094 }
1095 Ok(pumped)
1096 }
1097
1098 pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
1099 self.process.resize_impl(rows, cols)
1100 }
1101
1102 pub fn send_interrupt(&self) -> Result<(), PtyError> {
1103 self.process.send_interrupt_impl()
1104 }
1105
1106 pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
1107 self.process.wait_impl(timeout)
1108 }
1109
1110 pub fn wait_and_drain(
1111 &self,
1112 timeout: Option<f64>,
1113 drain_timeout: f64,
1114 ) -> Result<i32, PtyError> {
1115 self.process.wait_and_drain_impl(timeout, drain_timeout)
1116 }
1117
1118 pub fn terminate(&self) -> Result<(), PtyError> {
1119 self.process.terminate_impl()
1120 }
1121
1122 pub fn kill(&self) -> Result<(), PtyError> {
1123 self.process.kill_impl()
1124 }
1125
1126 pub fn close(&self) -> Result<(), PtyError> {
1127 self.process.close_impl()
1128 }
1129}
1130
1131impl Drop for NativePtyProcess {
1132 fn drop(&mut self) {
1133 self.close_nonblocking();
1134 }
1135}
1136
1137pub fn control_churn_bytes(data: &[u8]) -> usize {
1140 let mut total = 0;
1141 let mut index = 0;
1142 while index < data.len() {
1143 let byte = data[index];
1144 if byte == 0x1B {
1145 let start = index;
1146 index += 1;
1147 if index < data.len() && data[index] == b'[' {
1148 index += 1;
1149 while index < data.len() {
1150 let current = data[index];
1151 index += 1;
1152 if (0x40..=0x7E).contains(¤t) {
1153 break;
1154 }
1155 }
1156 }
1157 total += index - start;
1158 continue;
1159 }
1160 if matches!(byte, 0x08 | 0x0D | 0x7F) {
1161 total += 1;
1162 }
1163 index += 1;
1164 }
1165 total
1166}
1167
1168pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
1169 let mut command = CommandBuilder::new(&argv[0]);
1170 if argv.len() > 1 {
1171 command.args(
1172 argv[1..]
1173 .iter()
1174 .map(OsString::from)
1175 .collect::<Vec<OsString>>(),
1176 );
1177 }
1178 command
1179}
1180
1181#[inline(never)]
1182pub fn spawn_pty_reader(
1183 mut reader: Box<dyn Read + Send>,
1184 shared: Arc<PtyReadShared>,
1185 echo: Arc<AtomicBool>,
1186 idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
1187 output_bytes_total: Arc<AtomicUsize>,
1188 control_churn_bytes_total: Arc<AtomicUsize>,
1189) {
1190 crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
1191 let idle_detector_snapshot = idle_detector
1192 .lock()
1193 .expect("idle detector mutex poisoned")
1194 .clone();
1195 let mut chunk = vec![0_u8; 65536];
1196 loop {
1197 match reader.read(&mut chunk) {
1198 Ok(0) => break,
1199 Ok(n) => {
1200 let data = &chunk[..n];
1201
1202 let churn = control_churn_bytes(data);
1203 let visible = data.len().saturating_sub(churn);
1204 output_bytes_total.fetch_add(visible, Ordering::Relaxed);
1205 control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
1206
1207 if echo.load(Ordering::Relaxed) {
1208 let _ = std::io::stdout().write_all(data);
1209 let _ = std::io::stdout().flush();
1210 }
1211
1212 if let Some(ref detector) = idle_detector_snapshot {
1213 detector.record_output(data);
1214 }
1215
1216 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
1217 guard.chunks.push_back(data.to_vec());
1218 shared.condvar.notify_all();
1219 }
1220 Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
1221 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
1222 thread::sleep(Duration::from_millis(10));
1223 continue;
1224 }
1225 Err(_) => break,
1226 }
1227 }
1228 let mut guard = shared.state.lock().expect("pty read mutex poisoned");
1229 guard.closed = true;
1230 shared.condvar.notify_all();
1231}
1232
1233pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
1234 if let Some(signal) = status.signal() {
1235 let signal = signal.to_ascii_lowercase();
1236 if signal.contains("interrupt") {
1237 return -2;
1238 }
1239 if signal.contains("terminated") {
1240 return -15;
1241 }
1242 if signal.contains("killed") {
1243 return -9;
1244 }
1245 }
1246 status.exit_code() as i32
1247}
1248
1249pub fn input_contains_newline(data: &[u8]) -> bool {
1250 data.iter().any(|byte| matches!(*byte, b'\r' | b'\n'))
1251}
1252
1253#[cfg(unix)]
1254struct PosixTerminalModeGuard {
1255 stdin_fd: i32,
1256 original_mode: libc::termios,
1257}
1258
1259#[cfg(unix)]
1260impl Drop for PosixTerminalModeGuard {
1261 fn drop(&mut self) {
1262 unsafe {
1263 libc::tcsetattr(self.stdin_fd, libc::TCSANOW, &self.original_mode);
1264 }
1265 }
1266}
1267
1268#[cfg(unix)]
1269fn acquire_posix_terminal_mode_guard() -> Result<PosixTerminalModeGuard, std::io::Error> {
1270 let stdin_fd = libc::STDIN_FILENO;
1271 let mut original_mode = unsafe { std::mem::zeroed::<libc::termios>() };
1272 if unsafe { libc::tcgetattr(stdin_fd, &mut original_mode) } != 0 {
1273 return Err(std::io::Error::last_os_error());
1274 }
1275 let mut raw_mode = original_mode;
1276 unsafe {
1277 libc::cfmakeraw(&mut raw_mode);
1278 }
1279 if unsafe { libc::tcsetattr(stdin_fd, libc::TCSANOW, &raw_mode) } != 0 {
1280 return Err(std::io::Error::last_os_error());
1281 }
1282 Ok(PosixTerminalModeGuard {
1283 stdin_fd,
1284 original_mode,
1285 })
1286}
1287
1288#[cfg(unix)]
1289#[inline(never)]
1290fn posix_terminal_input_relay_worker(
1291 handles: Arc<Mutex<Option<NativePtyHandles>>>,
1292 returncode: Arc<Mutex<Option<i32>>>,
1293 input_bytes_total: Arc<AtomicUsize>,
1294 newline_events_total: Arc<AtomicUsize>,
1295 submit_events_total: Arc<AtomicUsize>,
1296 stop: Arc<AtomicBool>,
1297 active: Arc<AtomicBool>,
1298) {
1299 let _terminal_guard = match acquire_posix_terminal_mode_guard() {
1300 Ok(guard) => guard,
1301 Err(_) => {
1302 active.store(false, Ordering::Release);
1303 return;
1304 }
1305 };
1306
1307 let stdin_fd = libc::STDIN_FILENO;
1308 let mut buffer = vec![0_u8; 65536];
1309 loop {
1310 if stop.load(Ordering::Acquire) {
1311 break;
1312 }
1313 match poll_pty_process(&handles, &returncode) {
1314 Ok(Some(_)) => break,
1315 Ok(None) => {}
1316 Err(_) => break,
1317 }
1318
1319 let mut pollfd = libc::pollfd {
1320 fd: stdin_fd,
1321 events: libc::POLLIN,
1322 revents: 0,
1323 };
1324 let poll_result = unsafe { libc::poll(&mut pollfd, 1, 50) };
1325 if poll_result < 0 {
1326 let err = std::io::Error::last_os_error();
1327 if err.kind() == std::io::ErrorKind::Interrupted {
1328 continue;
1329 }
1330 break;
1331 }
1332 if poll_result == 0 || pollfd.revents & libc::POLLIN == 0 {
1333 continue;
1334 }
1335
1336 let read_result = unsafe { libc::read(stdin_fd, buffer.as_mut_ptr().cast(), buffer.len()) };
1337 if read_result < 0 {
1338 let err = std::io::Error::last_os_error();
1339 if err.kind() == std::io::ErrorKind::Interrupted {
1340 continue;
1341 }
1342 break;
1343 }
1344 if read_result == 0 {
1345 continue;
1346 }
1347
1348 let mut data = buffer[..read_result as usize].to_vec();
1349 loop {
1350 let mut drain_pollfd = libc::pollfd {
1351 fd: stdin_fd,
1352 events: libc::POLLIN,
1353 revents: 0,
1354 };
1355 let drain_ready = unsafe { libc::poll(&mut drain_pollfd, 1, 0) };
1356 if drain_ready <= 0 || drain_pollfd.revents & libc::POLLIN == 0 {
1357 break;
1358 }
1359 let drain_result =
1360 unsafe { libc::read(stdin_fd, buffer.as_mut_ptr().cast(), buffer.len()) };
1361 if drain_result <= 0 {
1362 break;
1363 }
1364 data.extend_from_slice(&buffer[..drain_result as usize]);
1365 }
1366
1367 record_pty_input_metrics(
1368 &input_bytes_total,
1369 &newline_events_total,
1370 &submit_events_total,
1371 &data,
1372 input_contains_newline(&data),
1373 );
1374 if write_pty_input(&handles, &data).is_err() {
1375 break;
1376 }
1377 }
1378
1379 active.store(false, Ordering::Release);
1380}
1381
1382pub fn record_pty_input_metrics(
1383 input_bytes_total: &Arc<AtomicUsize>,
1384 newline_events_total: &Arc<AtomicUsize>,
1385 submit_events_total: &Arc<AtomicUsize>,
1386 data: &[u8],
1387 submit: bool,
1388) {
1389 input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
1390 if input_contains_newline(data) {
1391 newline_events_total.fetch_add(1, Ordering::AcqRel);
1392 }
1393 if submit {
1394 submit_events_total.fetch_add(1, Ordering::AcqRel);
1395 }
1396}
1397
1398pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
1399 *returncode.lock().expect("pty returncode mutex poisoned") = Some(code);
1400}
1401
1402pub fn poll_pty_process(
1403 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
1404 returncode: &Arc<Mutex<Option<i32>>>,
1405) -> Result<Option<i32>, std::io::Error> {
1406 let mut guard = handles.lock().expect("pty handles mutex poisoned");
1407 let Some(handles) = guard.as_mut() else {
1408 return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
1409 };
1410 let status = handles.child.try_wait()?;
1411 let code = status.map(portable_exit_code);
1412 if let Some(code) = code {
1413 store_pty_returncode(returncode, code);
1414 return Ok(Some(code));
1415 }
1416 Ok(None)
1417}
1418
1419pub fn write_pty_input(
1420 handles: &Arc<Mutex<Option<NativePtyHandles>>>,
1421 data: &[u8],
1422) -> Result<(), std::io::Error> {
1423 let mut guard = handles.lock().expect("pty handles mutex poisoned");
1424 let handles = guard.as_mut().ok_or_else(|| {
1425 std::io::Error::new(
1426 std::io::ErrorKind::NotConnected,
1427 "Pseudo-terminal process is not running",
1428 )
1429 })?;
1430 #[cfg(windows)]
1431 let payload = pty_windows::input_payload(data);
1432 #[cfg(unix)]
1433 let payload = pty_platform::input_payload(data);
1434 handles.writer.write_all(&payload)?;
1435 handles.writer.flush()
1436}
1437
1438#[cfg(windows)]
1439pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
1440 let mut translated = Vec::with_capacity(data.len());
1441 let mut index = 0usize;
1442 while index < data.len() {
1443 let current = data[index];
1444 if current == b'\r' {
1445 translated.push(current);
1446 if index + 1 < data.len() && data[index + 1] == b'\n' {
1447 translated.push(b'\n');
1448 index += 2;
1449 continue;
1450 }
1451 index += 1;
1452 continue;
1453 }
1454 if current == b'\n' {
1455 translated.push(b'\r');
1456 index += 1;
1457 continue;
1458 }
1459 translated.push(current);
1460 index += 1;
1461 }
1462 translated
1463}
1464
1465#[cfg(windows)]
1466#[inline(never)]
1467pub fn assign_child_to_windows_kill_on_close_job(
1468 handle: Option<std::os::windows::io::RawHandle>,
1469) -> Result<WindowsJobHandle, PtyError> {
1470 crate::rp_rust_debug_scope!(
1471 "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
1472 );
1473 use std::mem::zeroed;
1474
1475 use winapi::shared::minwindef::FALSE;
1476 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
1477 use winapi::um::jobapi2::{
1478 AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
1479 };
1480 use winapi::um::winnt::{
1481 JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
1482 JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
1483 };
1484
1485 let Some(handle) = handle else {
1486 return Err(PtyError::Other(
1487 "Pseudo-terminal child does not expose a Windows process handle".into(),
1488 ));
1489 };
1490
1491 let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
1492 if job.is_null() || job == INVALID_HANDLE_VALUE {
1493 return Err(PtyError::Io(std::io::Error::last_os_error()));
1494 }
1495
1496 let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
1497 info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
1498 let result = unsafe {
1499 SetInformationJobObject(
1500 job,
1501 JobObjectExtendedLimitInformation,
1502 (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
1503 std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
1504 )
1505 };
1506 if result == FALSE {
1507 let err = std::io::Error::last_os_error();
1508 unsafe {
1509 winapi::um::handleapi::CloseHandle(job);
1510 }
1511 return Err(PtyError::Io(err));
1512 }
1513
1514 let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
1515 if result == FALSE {
1516 let err = std::io::Error::last_os_error();
1517 unsafe {
1518 winapi::um::handleapi::CloseHandle(job);
1519 }
1520 return Err(PtyError::Io(err));
1521 }
1522
1523 Ok(WindowsJobHandle(job as usize))
1524}
1525
1526#[cfg(windows)]
1528#[derive(Debug, Clone)]
1529pub struct ChildProcessInfo {
1530 pub pid: u32,
1531 pub name: String,
1532}
1533
1534#[cfg(windows)]
1537pub fn find_child_processes(parent_pid: u32) -> Vec<ChildProcessInfo> {
1538 use winapi::um::handleapi::CloseHandle;
1539 use winapi::um::tlhelp32::{
1540 CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1541 };
1542
1543 let mut children = Vec::new();
1544 let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1545 if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1546 return children;
1547 }
1548
1549 let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1550 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1551
1552 if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1553 loop {
1554 if entry.th32ParentProcessID == parent_pid {
1555 let name_bytes = &entry.szExeFile;
1556 let name_len = name_bytes
1557 .iter()
1558 .position(|&b| b == 0)
1559 .unwrap_or(name_bytes.len());
1560 let name = String::from_utf8_lossy(
1561 &name_bytes[..name_len]
1562 .iter()
1563 .map(|&c| c as u8)
1564 .collect::<Vec<u8>>(),
1565 )
1566 .into_owned();
1567 children.push(ChildProcessInfo {
1568 pid: entry.th32ProcessID,
1569 name,
1570 });
1571 }
1572 if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1573 break;
1574 }
1575 }
1576 }
1577
1578 unsafe { CloseHandle(snapshot) };
1579 children
1580}
1581
1582#[cfg(windows)]
1584fn conhost_children_of_current_process() -> Vec<u32> {
1585 let our_pid = std::process::id();
1586 find_child_processes(our_pid)
1587 .into_iter()
1588 .filter(|c| c.name.eq_ignore_ascii_case("conhost.exe"))
1589 .map(|c| c.pid)
1590 .collect()
1591}
1592
1593#[cfg(windows)]
1597fn assign_conpty_conhost_to_job(job: &WindowsJobHandle, before_pids: &[u32]) {
1598 let after_pids = conhost_children_of_current_process();
1599 for pid in after_pids {
1600 if !before_pids.contains(&pid) {
1601 let _ = job.assign_pid(pid);
1603 }
1604 }
1605}
1606
1607#[cfg(windows)]
1610#[derive(Debug, Clone)]
1611pub struct OrphanConhostInfo {
1612 pub pid: u32,
1614 pub parent_pid: u32,
1616 pub parent_name: String,
1618}
1619
1620#[cfg(windows)]
1626pub fn find_orphan_conhosts() -> Vec<OrphanConhostInfo> {
1627 use winapi::um::handleapi::CloseHandle;
1628 use winapi::um::tlhelp32::{
1629 CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1630 };
1631
1632 let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1633 if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1634 return Vec::new();
1635 }
1636
1637 let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1638 entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1639
1640 let mut all_pids = std::collections::HashSet::new();
1642 let mut conhosts: Vec<(u32, u32)> = Vec::new(); let mut parent_names: std::collections::HashMap<u32, String> = std::collections::HashMap::new();
1644
1645 if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1646 loop {
1647 let name_bytes = &entry.szExeFile;
1648 let name_len = name_bytes
1649 .iter()
1650 .position(|&b| b == 0)
1651 .unwrap_or(name_bytes.len());
1652 let name = String::from_utf8_lossy(
1653 &name_bytes[..name_len]
1654 .iter()
1655 .map(|&c| c as u8)
1656 .collect::<Vec<u8>>(),
1657 )
1658 .into_owned();
1659
1660 all_pids.insert(entry.th32ProcessID);
1661 parent_names.insert(entry.th32ProcessID, name.clone());
1662
1663 if name.eq_ignore_ascii_case("conhost.exe") {
1664 conhosts.push((entry.th32ProcessID, entry.th32ParentProcessID));
1665 }
1666
1667 if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1668 break;
1669 }
1670 }
1671 }
1672
1673 unsafe { CloseHandle(snapshot) };
1674
1675 conhosts
1677 .into_iter()
1678 .filter(|&(_, parent_pid)| !all_pids.contains(&parent_pid))
1679 .map(|(pid, parent_pid)| OrphanConhostInfo {
1680 pid,
1681 parent_pid,
1682 parent_name: parent_names.get(&parent_pid).cloned().unwrap_or_default(),
1683 })
1684 .collect()
1685}
1686
1687#[cfg(windows)]
1688#[inline(never)]
1689pub fn apply_windows_pty_priority(
1690 handle: Option<std::os::windows::io::RawHandle>,
1691 nice: Option<i32>,
1692) -> Result<(), PtyError> {
1693 crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
1694 use winapi::um::processthreadsapi::SetPriorityClass;
1695 use winapi::um::winbase::{
1696 ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
1697 IDLE_PRIORITY_CLASS,
1698 };
1699
1700 let Some(handle) = handle else {
1701 return Ok(());
1702 };
1703 let flags = match nice {
1704 Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
1705 Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
1706 Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
1707 Some(value) if value <= -1 => ABOVE_NORMAL_PRIORITY_CLASS,
1708 _ => 0,
1709 };
1710 if flags == 0 {
1711 return Ok(());
1712 }
1713 let result = unsafe { SetPriorityClass(handle.cast(), flags) };
1714 if result == 0 {
1715 return Err(PtyError::Io(std::io::Error::last_os_error()));
1716 }
1717 Ok(())
1718}
1719
1720#[cfg(test)]
1721mod tests {
1722 use super::resolved_spawn_cwd;
1723
1724 #[test]
1725 fn resolved_spawn_cwd_preserves_explicit_value() {
1726 assert_eq!(
1727 resolved_spawn_cwd(Some("C:\\temp\\explicit")),
1728 Some("C:\\temp\\explicit".to_string())
1729 );
1730 }
1731
1732 #[test]
1733 fn resolved_spawn_cwd_defaults_to_current_dir_when_unset() {
1734 let expected = std::env::current_dir()
1735 .ok()
1736 .map(|cwd| cwd.to_string_lossy().to_string());
1737 assert_eq!(resolved_spawn_cwd(None), expected);
1738 }
1739}