Skip to main content

running_process_core/pty/
mod.rs

1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
/// Re-exports for downstream crates that need portable-pty types.
///
/// The whole `portable_pty` crate is exposed as `reexports::portable_pty`.
pub mod reexports {
    pub use portable_pty;
}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
/// Errors reported by the pseudo-terminal process wrapper.
#[derive(Debug, Error)]
pub enum PtyError {
    /// A start was requested while process handles are already installed.
    #[error("pseudo-terminal process already started")]
    AlreadyStarted,
    /// An operation needed live process handles but none are installed.
    #[error("pseudo-terminal process is not running")]
    NotRunning,
    /// A bounded wait elapsed before the awaited condition occurred.
    #[error("pseudo-terminal timed out")]
    Timeout,
    /// An underlying I/O error; converts automatically from `std::io::Error`.
    #[error("pseudo-terminal I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// Opening the PTY, cloning its reader/writer, or spawning the child failed.
    #[error("pseudo-terminal spawn failed: {0}")]
    Spawn(String),
    /// Any other failure, described by a free-form message.
    #[error("pseudo-terminal error: {0}")]
    Other(String),
}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44    if matches!(
45        err.kind(),
46        std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47    ) {
48        return true;
49    }
50    #[cfg(unix)]
51    if err.raw_os_error() == Some(libc::ESRCH) {
52        return true;
53    }
54    false
55}
56
/// Mutable state of the PTY output queue, guarded by `PtyReadShared::state`.
pub struct PtyReadState {
    /// Output chunks waiting to be consumed; readers pop from the front.
    pub chunks: VecDeque<Vec<u8>>,
    /// Set once the stream is closed; readers stop waiting when they see it.
    pub closed: bool,
}
61
/// Output queue plus the condition variable used to wait on it.
pub struct PtyReadShared {
    /// The queue and its closed flag.
    pub state: Mutex<PtyReadState>,
    /// Notified when `state` changes (e.g. when the stream is marked closed).
    pub condvar: Condvar,
}
66
/// OS-level resources owned by a running PTY process.
pub struct NativePtyHandles {
    /// Master side of the PTY (used for resizing and, on Unix, PID lookup).
    pub master: Box<dyn MasterPty + Send>,
    /// Writer taken from the master; carries input to the child.
    pub writer: Box<dyn Write + Send>,
    /// Handle to the spawned child; polled, killed and waited on at teardown.
    pub child: Box<dyn portable_pty::Child + Send + Sync>,
    /// Job Object the child was assigned to at start.
    /// NOTE(review): presumably a kill-on-close job — confirm against the
    /// helper that creates it.
    #[cfg(windows)]
    pub _job: WindowsJobHandle,
}
74
/// Owning wrapper around a Windows Job Object handle.
///
/// The raw `HANDLE` is stored as a `usize` and cast back at each use site.
#[cfg(windows)]
pub struct WindowsJobHandle(pub usize);
77
#[cfg(windows)]
impl WindowsJobHandle {
    /// Assign an additional process (by PID) to this Job Object.
    ///
    /// The process is opened with `PROCESS_SET_QUOTA | PROCESS_TERMINATE`,
    /// assigned to the job, and the temporary process handle is closed again
    /// before returning.
    ///
    /// # Errors
    /// Returns the OS error reported by `OpenProcess` or by
    /// `AssignProcessToJobObject` when either call fails.
    pub fn assign_pid(&self, pid: u32) -> Result<(), std::io::Error> {
        use winapi::um::handleapi::CloseHandle;
        use winapi::um::processthreadsapi::OpenProcess;
        use winapi::um::winnt::PROCESS_SET_QUOTA;
        use winapi::um::winnt::PROCESS_TERMINATE;

        // SAFETY: OpenProcess only takes plain integer arguments; a failed
        // open is reported through a null handle, which is checked below.
        let handle = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
        if handle.is_null() {
            return Err(std::io::Error::last_os_error());
        }
        // SAFETY: `handle` was just verified to be non-null and is owned by
        // this function; `self.0` is the job handle this wrapper owns.
        let result = unsafe {
            winapi::um::jobapi2::AssignProcessToJobObject(
                self.0 as winapi::shared::ntdef::HANDLE,
                handle,
            )
        };
        // Read the thread's last-error value *before* calling CloseHandle:
        // any later API call may overwrite it and hide the real failure.
        let outcome = if result == 0 {
            Err(std::io::Error::last_os_error())
        } else {
            Ok(())
        };
        // SAFETY: `handle` is a valid process handle opened above and is not
        // used again after this call.
        unsafe { CloseHandle(handle) };
        outcome
    }
}
104
#[cfg(windows)]
impl Drop for WindowsJobHandle {
    /// Closes the wrapped job handle; the result of the close is ignored.
    fn drop(&mut self) {
        let raw_job = self.0 as winapi::shared::ntdef::HANDLE;
        // SAFETY: relies on `self.0` being the job handle this wrapper owns,
        // which is closed exactly once, here.
        unsafe {
            winapi::um::handleapi::CloseHandle(raw_job);
        }
    }
}
113
/// Mutable state of the idle detector, guarded by `IdleDetectorCore::state`.
pub struct IdleMonitorState {
    /// Moment the idle clock was last restarted (input, output, or enabling).
    pub last_reset_at: Instant,
    /// Exit code of the process once an exit has been recorded.
    pub returncode: Option<i32>,
    /// Whether the recorded exit was flagged as an interrupt.
    pub interrupted: bool,
}
119
/// Core idle detection logic, shareable across threads via Arc.
/// The reader thread calls `record_output` directly.
pub struct IdleDetectorCore {
    /// Idle time, in seconds, required before idleness is reported.
    pub timeout_seconds: f64,
    /// Second idle threshold in seconds; the larger of this and
    /// `timeout_seconds` is the effective minimum idle time.
    pub stability_window_seconds: f64,
    /// Upper bound, in seconds, on each sleep between checks (floored at 1 ms).
    pub sample_interval_seconds: f64,
    /// When true, recorded input restarts the idle clock.
    pub reset_on_input: bool,
    /// When true, recorded output restarts the idle clock.
    pub reset_on_output: bool,
    /// When true, output consisting only of control churn also counts.
    pub count_control_churn_as_output: bool,
    /// Whether idle timeouts are currently being reported.
    pub enabled: Arc<AtomicBool>,
    /// Idle clock and exit information.
    pub state: Mutex<IdleMonitorState>,
    /// Notified whenever `state` or `enabled` changes.
    pub condvar: Condvar,
}
133
134impl IdleDetectorCore {
135    pub fn record_input(&self, byte_count: usize) {
136        if !self.reset_on_input || byte_count == 0 {
137            return;
138        }
139        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
140        guard.last_reset_at = Instant::now();
141        self.condvar.notify_all();
142    }
143
144    pub fn record_output(&self, data: &[u8]) {
145        if !self.reset_on_output || data.is_empty() {
146            return;
147        }
148        let control_bytes = control_churn_bytes(data);
149        let visible_output_bytes = data.len().saturating_sub(control_bytes);
150        let active_output =
151            visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
152        if !active_output {
153            return;
154        }
155        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
156        guard.last_reset_at = Instant::now();
157        self.condvar.notify_all();
158    }
159
160    pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
161        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
162        guard.returncode = Some(returncode);
163        guard.interrupted = interrupted;
164        self.condvar.notify_all();
165    }
166
167    pub fn enabled(&self) -> bool {
168        self.enabled.load(Ordering::Acquire)
169    }
170
171    pub fn set_enabled(&self, enabled: bool) {
172        let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
173        if enabled && !was_enabled {
174            let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
175            guard.last_reset_at = Instant::now();
176        }
177        self.condvar.notify_all();
178    }
179
180    pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
181        let started = Instant::now();
182        let overall_timeout = timeout.map(Duration::from_secs_f64);
183        let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
184        let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
185
186        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
187        loop {
188            let now = Instant::now();
189            let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
190
191            if let Some(returncode) = guard.returncode {
192                let reason = if guard.interrupted {
193                    "interrupt"
194                } else {
195                    "process_exit"
196                };
197                return (false, reason.to_string(), idle_for, Some(returncode));
198            }
199
200            let enabled = self.enabled.load(Ordering::Acquire);
201            if enabled && idle_for >= min_idle {
202                return (true, "idle_timeout".to_string(), idle_for, None);
203            }
204
205            if let Some(limit) = overall_timeout {
206                if now.duration_since(started) >= limit {
207                    return (false, "timeout".to_string(), idle_for, None);
208                }
209            }
210
211            let idle_remaining = if enabled {
212                (min_idle - idle_for).max(0.0)
213            } else {
214                sample_interval.as_secs_f64()
215            };
216            let mut wait_for =
217                sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
218            if let Some(limit) = overall_timeout {
219                let elapsed = now.duration_since(started);
220                if elapsed < limit {
221                    let remaining = limit - elapsed;
222                    wait_for = wait_for.min(remaining);
223                }
224            }
225            let result = self
226                .condvar
227                .wait_timeout(guard, wait_for)
228                .expect("idle monitor mutex poisoned");
229            guard = result.0;
230        }
231    }
232}
233
/// A child process attached to a pseudo-terminal, plus the bookkeeping shared
/// with its reader and terminal-input relay threads.
pub struct NativePtyProcess {
    /// Command and arguments to spawn; `new` rejects an empty vector.
    pub argv: Vec<String>,
    /// Working directory for the child, when set.
    pub cwd: Option<String>,
    /// When set, the child's environment is cleared and replaced by these pairs.
    pub env: Option<Vec<(String, String)>>,
    /// PTY height, in rows, used when the PTY is opened.
    pub rows: u16,
    /// PTY width, in columns, used when the PTY is opened.
    pub cols: u16,
    /// Priority hint applied to the child after spawn (Windows only).
    #[cfg(windows)]
    pub nice: Option<i32>,
    /// Live OS handles; `None` before start and after close.
    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
    /// Output queue shared with the reader thread.
    pub reader: Arc<PtyReadShared>,
    /// Exit code, once one has been stored.
    pub returncode: Arc<Mutex<Option<i32>>>,
    /// Input metric counter. NOTE(review): presumably total bytes written — confirm.
    pub input_bytes_total: Arc<AtomicUsize>,
    /// Input metric counter. NOTE(review): presumably newline events — confirm.
    pub newline_events_total: Arc<AtomicUsize>,
    /// Input metric counter. NOTE(review): presumably submit events — confirm.
    pub submit_events_total: Arc<AtomicUsize>,
    /// When true, the reader thread writes PTY output to stdout.
    pub echo: Arc<AtomicBool>,
    /// When set, the reader thread feeds output directly to the idle detector.
    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
    /// Visible (non-control) output bytes seen by the reader thread.
    pub output_bytes_total: Arc<AtomicUsize>,
    /// Control churn bytes (ANSI escapes, BS, CR, DEL) seen by the reader.
    pub control_churn_bytes_total: Arc<AtomicUsize>,
    /// Join handle of the reader thread, while one exists.
    pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
    /// Set to ask the terminal-input relay thread to stop.
    pub terminal_input_relay_stop: Arc<AtomicBool>,
    /// True while the terminal-input relay is considered running.
    pub terminal_input_relay_active: Arc<AtomicBool>,
    /// Join handle of the terminal-input relay thread, while one exists.
    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
}
261
262impl NativePtyProcess {
263    pub fn new(
264        argv: Vec<String>,
265        cwd: Option<String>,
266        env: Option<Vec<(String, String)>>,
267        rows: u16,
268        cols: u16,
269        nice: Option<i32>,
270    ) -> Result<Self, PtyError> {
271        if argv.is_empty() {
272            return Err(PtyError::Other("command cannot be empty".into()));
273        }
274        #[cfg(not(windows))]
275        let _ = nice;
276        Ok(Self {
277            argv,
278            cwd,
279            env,
280            rows,
281            cols,
282            #[cfg(windows)]
283            nice,
284            handles: Arc::new(Mutex::new(None)),
285            reader: Arc::new(PtyReadShared {
286                state: Mutex::new(PtyReadState {
287                    chunks: VecDeque::new(),
288                    closed: false,
289                }),
290                condvar: Condvar::new(),
291            }),
292            returncode: Arc::new(Mutex::new(None)),
293            input_bytes_total: Arc::new(AtomicUsize::new(0)),
294            newline_events_total: Arc::new(AtomicUsize::new(0)),
295            submit_events_total: Arc::new(AtomicUsize::new(0)),
296            echo: Arc::new(AtomicBool::new(false)),
297            idle_detector: Arc::new(Mutex::new(None)),
298            output_bytes_total: Arc::new(AtomicUsize::new(0)),
299            control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
300            reader_worker: Mutex::new(None),
301            terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
302            terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
303            terminal_input_relay_worker: Mutex::new(None),
304        })
305    }
306
307    pub fn mark_reader_closed(&self) {
308        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
309        guard.closed = true;
310        self.reader.condvar.notify_all();
311    }
312
    /// Records `code` as this process's exit code by delegating to
    /// `store_pty_returncode` with the shared return-code slot.
    pub fn store_returncode(&self, code: i32) {
        store_pty_returncode(&self.returncode, code);
    }
316
317    fn join_reader_worker(&self) {
318        if let Some(worker) = self
319            .reader_worker
320            .lock()
321            .expect("pty reader worker mutex poisoned")
322            .take()
323        {
324            let _ = worker.join();
325        }
326    }
327
    /// Updates this process's input counters for `data` by delegating to
    /// `record_pty_input_metrics`; `submit` is forwarded unchanged.
    pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
        record_pty_input_metrics(
            &self.input_bytes_total,
            &self.newline_events_total,
            &self.submit_events_total,
            data,
            submit,
        );
    }
337
    /// Sends `data` to the child through the PTY.
    ///
    /// Input metrics are recorded first, so they are updated even when the
    /// subsequent write fails.
    ///
    /// # Errors
    /// Propagates any error returned by `write_pty_input`.
    pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
        self.record_input_metrics(data, submit);
        write_pty_input(&self.handles, data)?;
        Ok(())
    }
343
344    pub fn request_terminal_input_relay_stop(&self) {
345        self.terminal_input_relay_stop
346            .store(true, Ordering::Release);
347        self.terminal_input_relay_active
348            .store(false, Ordering::Release);
349    }
350
351    pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
352        let mut worker_guard = self
353            .terminal_input_relay_worker
354            .lock()
355            .expect("pty terminal input relay mutex poisoned");
356        if worker_guard.is_some() && self.terminal_input_relay_active() {
357            return Ok(());
358        }
359        if self
360            .handles
361            .lock()
362            .expect("pty handles mutex poisoned")
363            .is_none()
364        {
365            return Err(PtyError::NotRunning);
366        }
367
368        self.terminal_input_relay_stop
369            .store(false, Ordering::Release);
370        self.terminal_input_relay_active
371            .store(true, Ordering::Release);
372
373        let handles = Arc::clone(&self.handles);
374        let returncode = Arc::clone(&self.returncode);
375        let input_bytes_total = Arc::clone(&self.input_bytes_total);
376        let newline_events_total = Arc::clone(&self.newline_events_total);
377        let submit_events_total = Arc::clone(&self.submit_events_total);
378        let stop = Arc::clone(&self.terminal_input_relay_stop);
379        let active = Arc::clone(&self.terminal_input_relay_active);
380
381        #[cfg(windows)]
382        {
383            let capture = terminal_input::TerminalInputCore::new();
384            capture.start_impl().map_err(PtyError::Io)?;
385            *worker_guard = Some(thread::spawn(move || {
386                loop {
387                    if stop.load(Ordering::Acquire) {
388                        break;
389                    }
390                    match poll_pty_process(&handles, &returncode) {
391                        Ok(Some(_)) => break,
392                        Ok(None) => {}
393                        Err(_) => break,
394                    }
395                    match terminal_input::wait_for_terminal_input_event(
396                        &capture.state,
397                        &capture.condvar,
398                        Some(Duration::from_millis(50)),
399                    ) {
400                        terminal_input::TerminalInputWaitOutcome::Event(event) => {
401                            record_pty_input_metrics(
402                                &input_bytes_total,
403                                &newline_events_total,
404                                &submit_events_total,
405                                &event.data,
406                                event.submit,
407                            );
408                            if write_pty_input(&handles, &event.data).is_err() {
409                                break;
410                            }
411                        }
412                        terminal_input::TerminalInputWaitOutcome::Timeout => continue,
413                        terminal_input::TerminalInputWaitOutcome::Closed => break,
414                    }
415                }
416                active.store(false, Ordering::Release);
417                let _ = capture.stop_impl();
418            }));
419            Ok(())
420        }
421
422        #[cfg(unix)]
423        {
424            if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
425                self.terminal_input_relay_active
426                    .store(false, Ordering::Release);
427                return Ok(());
428            }
429
430            *worker_guard = Some(thread::spawn(move || {
431                posix_terminal_input_relay_worker(
432                    handles,
433                    returncode,
434                    input_bytes_total,
435                    newline_events_total,
436                    submit_events_total,
437                    stop,
438                    active,
439                );
440            }));
441            Ok(())
442        }
443    }
444
445    pub fn stop_terminal_input_relay_impl(&self) {
446        self.request_terminal_input_relay_stop();
447        if let Some(worker) = self
448            .terminal_input_relay_worker
449            .lock()
450            .expect("pty terminal input relay mutex poisoned")
451            .take()
452        {
453            let _ = worker.join();
454        }
455    }
456
    /// Returns the current value of the relay's active flag.
    pub fn terminal_input_relay_active(&self) -> bool {
        self.terminal_input_relay_active.load(Ordering::Acquire)
    }
460
    /// Synchronously tear down the PTY and reap the child.
    ///
    /// Stops the terminal-input relay, takes the handles out of the shared
    /// slot, releases them in a platform-specific order, stores the child's
    /// exit code, joins the reader thread and marks the output stream closed.
    /// When no handles are installed, only the stream is marked closed.
    ///
    /// # Errors
    /// On Windows, returns `PtyError::Io` when killing a child that did not
    /// exit within the grace period fails with a non-ignorable error.
    #[inline(never)]
    pub fn close_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
        self.stop_terminal_input_relay_impl();
        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
        let Some(handles) = guard.take() else {
            self.mark_reader_closed();
            return Ok(());
        };
        // Release the handles lock before the potentially slow teardown below.
        drop(guard);

        #[cfg(windows)]
        let NativePtyHandles {
            master,
            writer,
            mut child,
            _job,
        } = handles;
        #[cfg(not(windows))]
        let NativePtyHandles {
            master,
            writer,
            mut child,
        } = handles;

        #[cfg(windows)]
        {
            // The job handle is released first, before waiting on the child.
            {
                crate::rp_rust_debug_scope!(
                    "running_process_core::NativePtyProcess::close_impl.drop_job"
                );
                drop(_job);
            }

            {
                crate::rp_rust_debug_scope!(
                    "running_process_core::NativePtyProcess::close_impl.wait_job_exit"
                );
                // Give the child up to two seconds to exit before killing it.
                let wait_deadline = Instant::now() + Duration::from_secs(2);
                loop {
                    match child.try_wait() {
                        Ok(Some(status)) => {
                            let code = portable_exit_code(status);
                            self.store_returncode(code);
                            break;
                        }
                        Ok(None) if Instant::now() < wait_deadline => {
                            thread::sleep(Duration::from_millis(10));
                        }
                        Ok(None) => {
                            if let Err(err) = child.kill() {
                                if !is_ignorable_process_control_error(&err) {
                                    return Err(PtyError::Io(err));
                                }
                            }
                            // -9 stands in for the exit code when it cannot be read.
                            let code = match child.wait() {
                                Ok(status) => portable_exit_code(status),
                                Err(_) => -9,
                            };
                            self.store_returncode(code);
                            break;
                        }
                        Err(_) => {
                            self.store_returncode(-9);
                            break;
                        }
                    }
                }
            }
            {
                crate::rp_rust_debug_scope!(
                    "running_process_core::NativePtyProcess::close_impl.drop_writer"
                );
                drop(writer);
            }
            {
                crate::rp_rust_debug_scope!(
                    "running_process_core::NativePtyProcess::close_impl.drop_master"
                );
                drop(master);
            }
            drop(child);
            {
                crate::rp_rust_debug_scope!(
                    "running_process_core::NativePtyProcess::close_impl.join_reader"
                );
                self.join_reader_worker();
            }
            self.mark_reader_closed();
            return Ok(());
        }

        #[cfg(not(windows))]
        {
            // The PTY ends are closed before the blocking wait on the child.
            drop(writer);
            drop(master);

            let code = {
                crate::rp_rust_debug_scope!(
                    "running_process_core::NativePtyProcess::close_impl.wait_child"
                );
                // -9 stands in for the exit code when waiting fails.
                match child.wait() {
                    Ok(status) => portable_exit_code(status),
                    Err(_) => -9,
                }
            };
            drop(child);

            self.store_returncode(code);
            {
                crate::rp_rust_debug_scope!(
                    "running_process_core::NativePtyProcess::close_impl.join_reader"
                );
                self.join_reader_worker();
            }
            self.mark_reader_closed();
            return Ok(());
        }
    }
581
    /// Best-effort, non-blocking teardown for use from `Drop`.
    ///
    /// Kills the child and releases the handles without waiting for the
    /// child, joining the reader thread, or storing an exit code. A poisoned
    /// handles mutex makes this a no-op instead of a panic.
    #[inline(never)]
    pub fn close_nonblocking(&self) {
        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
        // Only a stop *request* is issued (Windows only); the relay thread is
        // not joined here.
        #[cfg(windows)]
        self.request_terminal_input_relay_stop();
        let Ok(mut guard) = self.handles.lock() else {
            return;
        };
        let Some(handles) = guard.take() else {
            self.mark_reader_closed();
            return;
        };
        drop(guard);

        #[cfg(windows)]
        let NativePtyHandles {
            master,
            writer,
            mut child,
            _job,
        } = handles;
        #[cfg(not(windows))]
        let NativePtyHandles {
            master,
            writer,
            mut child,
        } = handles;

        // NOTE(review): on a non-ignorable kill error this returns before
        // `mark_reader_closed()` is called and lets the handles drop
        // implicitly rather than in the explicit order below — confirm that
        // this is intended.
        if let Err(err) = child.kill() {
            if !is_ignorable_process_control_error(&err) {
                return;
            }
        }
        drop(writer);
        drop(master);
        drop(child);
        #[cfg(windows)]
        drop(_job);
        self.mark_reader_closed();
    }
623
    /// Opens a PTY, spawns the configured command in it and starts the
    /// reader thread.
    ///
    /// The handles mutex is held for the whole call, and the new handles are
    /// installed only after every step has succeeded.
    ///
    /// # Errors
    /// Returns `PtyError::AlreadyStarted` when handles are already installed,
    /// `PtyError::Spawn` when opening the PTY, cloning its reader, taking its
    /// writer or spawning the command fails, and (Windows) whatever error the
    /// job-assignment or priority helpers report.
    pub fn start_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
        if guard.is_some() {
            return Err(PtyError::AlreadyStarted);
        }

        // Snapshot our conhost.exe children before openpty() so we can diff
        // after spawn to find the new conhost.exe created by ConPTY.
        #[cfg(windows)]
        let conhost_pids_before = conhost_children_of_current_process();

        let pty_system = native_pty_system();
        let pair = pty_system
            .openpty(PtySize {
                rows: self.rows,
                cols: self.cols,
                pixel_width: 0,
                pixel_height: 0,
            })
            .map_err(|e| PtyError::Spawn(e.to_string()))?;

        let mut cmd = command_builder_from_argv(&self.argv);
        if let Some(cwd) = &self.cwd {
            cmd.cwd(cwd);
        }
        // An explicit environment replaces the inherited one entirely.
        if let Some(env) = &self.env {
            cmd.env_clear();
            for (key, value) in env {
                cmd.env(key, value);
            }
        }

        let reader = pair
            .master
            .try_clone_reader()
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        let writer = pair
            .master
            .take_writer()
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        let child = pair
            .slave
            .spawn_command(cmd)
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        #[cfg(windows)]
        let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
        #[cfg(windows)]
        assign_conpty_conhost_to_job(&job, &conhost_pids_before);
        #[cfg(windows)]
        apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
        // Clone the shared state the reader thread needs before moving it in.
        let shared = Arc::clone(&self.reader);
        let echo = Arc::clone(&self.echo);
        let idle_detector = Arc::clone(&self.idle_detector);
        let output_bytes = Arc::clone(&self.output_bytes_total);
        let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
        let reader_worker = thread::spawn(move || {
            spawn_pty_reader(
                reader,
                shared,
                echo,
                idle_detector,
                output_bytes,
                churn_bytes,
            );
        });
        *self
            .reader_worker
            .lock()
            .expect("pty reader worker mutex poisoned") = Some(reader_worker);

        *guard = Some(NativePtyHandles {
            master: pair.master,
            writer,
            child,
            #[cfg(windows)]
            _job: job,
        });
        Ok(())
    }
704
    /// Forwards `data` to the platform-specific `respond_to_queries` routine
    /// (`pty_windows` on Windows, `pty_posix` on Unix) and returns its result.
    pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
        #[cfg(windows)]
        {
            pty_windows::respond_to_queries(self, data)
        }

        #[cfg(unix)]
        {
            pty_platform::respond_to_queries(self, data)
        }
    }
716
717    pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
718        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
719        let guard = self.handles.lock().expect("pty handles mutex poisoned");
720        if let Some(handles) = guard.as_ref() {
721            #[cfg(windows)]
722            {
723                let _ = (rows, cols, handles);
724                // ConPTY resize can leave ClosePseudoConsole blocked during
725                // teardown on Windows. Keep resize as a no-op until the
726                // backend can cancel the outstanding PTY read safely.
727                return Ok(());
728            }
729
730            #[cfg(not(windows))]
731            handles
732                .master
733                .resize(PtySize {
734                    rows,
735                    cols,
736                    pixel_width: 0,
737                    pixel_height: 0,
738                })
739                .map_err(|e| PtyError::Other(e.to_string()))?;
740        }
741        Ok(())
742    }
743
    /// Delegates to the platform-specific `send_interrupt` routine
    /// (`pty_windows` on Windows, `pty_posix` on Unix) and returns its result.
    pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
        #[cfg(windows)]
        {
            pty_windows::send_interrupt(self)
        }

        #[cfg(unix)]
        {
            pty_platform::send_interrupt(self)
        }
    }
756
757    pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
758        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
759        // Fast path: already exited.
760        if let Some(code) = *self
761            .returncode
762            .lock()
763            .expect("pty returncode mutex poisoned")
764        {
765            return Ok(code);
766        }
767        let start = Instant::now();
768        loop {
769            if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
770                return Ok(code);
771            }
772            if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
773                return Err(PtyError::Timeout);
774            }
775            thread::sleep(Duration::from_millis(10));
776        }
777    }
778
779    pub fn terminate_impl(&self) -> Result<(), PtyError> {
780        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
781        #[cfg(windows)]
782        {
783            if self
784                .handles
785                .lock()
786                .expect("pty handles mutex poisoned")
787                .is_none()
788            {
789                return Err(PtyError::NotRunning);
790            }
791            return self.close_impl();
792        }
793
794        #[cfg(unix)]
795        {
796            pty_platform::terminate(self)
797        }
798    }
799
800    pub fn kill_impl(&self) -> Result<(), PtyError> {
801        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
802        #[cfg(windows)]
803        {
804            if self
805                .handles
806                .lock()
807                .expect("pty handles mutex poisoned")
808                .is_none()
809            {
810                return Err(PtyError::NotRunning);
811            }
812            return self.close_impl();
813        }
814
815        #[cfg(unix)]
816        {
817            pty_platform::kill(self)
818        }
819    }
820
    /// Delegates to the platform-specific `terminate_tree` routine
    /// (`pty_windows` on Windows, `pty_posix` on Unix) and returns its result.
    pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
        #[cfg(windows)]
        {
            pty_windows::terminate_tree(self)
        }

        #[cfg(unix)]
        {
            pty_platform::terminate_tree(self)
        }
    }
833
    /// Delegates to the platform-specific `kill_tree` routine
    /// (`pty_windows` on Windows, `pty_posix` on Unix) and returns its result.
    pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
        #[cfg(windows)]
        {
            pty_windows::kill_tree(self)
        }

        #[cfg(unix)]
        {
            pty_platform::kill_tree(self)
        }
    }
846
847    /// Get the PID of the child process, if running.
848    pub fn pid(&self) -> Result<Option<u32>, PtyError> {
849        let guard = self.handles.lock().expect("pty handles mutex poisoned");
850        if let Some(handles) = guard.as_ref() {
851            #[cfg(unix)]
852            if let Some(pid) = handles.master.process_group_leader() {
853                if let Ok(pid) = u32::try_from(pid) {
854                    return Ok(Some(pid));
855                }
856            }
857            return Ok(handles.child.process_id());
858        }
859        Ok(None)
860    }
861
862    /// Wait for a chunk of output from the PTY reader.
863    /// Returns `Ok(Some(chunk))` on data, `Ok(None)` on timeout, `Err` on closed.
864    pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
865        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
866        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
867        loop {
868            if let Some(chunk) = guard.chunks.pop_front() {
869                return Ok(Some(chunk));
870            }
871            if guard.closed {
872                return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
873            }
874            match deadline {
875                Some(deadline) => {
876                    let now = Instant::now();
877                    if now >= deadline {
878                        return Ok(None); // timeout
879                    }
880                    let wait = deadline.saturating_duration_since(now);
881                    let result = self
882                        .reader
883                        .condvar
884                        .wait_timeout(guard, wait)
885                        .expect("pty read mutex poisoned");
886                    guard = result.0;
887                }
888                None => {
889                    guard = self
890                        .reader
891                        .condvar
892                        .wait(guard)
893                        .expect("pty read mutex poisoned");
894                }
895            }
896        }
897    }
898
899    /// Wait for the reader thread to close.
900    pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
901        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
902        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
903        loop {
904            if guard.closed {
905                return true;
906            }
907            match deadline {
908                Some(deadline) => {
909                    let now = Instant::now();
910                    if now >= deadline {
911                        return false;
912                    }
913                    let wait = deadline.saturating_duration_since(now);
914                    let result = self
915                        .reader
916                        .condvar
917                        .wait_timeout(guard, wait)
918                        .expect("pty read mutex poisoned");
919                    guard = result.0;
920                }
921                None => {
922                    guard = self
923                        .reader
924                        .condvar
925                        .wait(guard)
926                        .expect("pty read mutex poisoned");
927                }
928            }
929        }
930    }
931
932    /// Wait for exit then drain remaining output.
933    pub fn wait_and_drain_impl(
934        &self,
935        timeout: Option<f64>,
936        drain_timeout: f64,
937    ) -> Result<i32, PtyError> {
938        let code = self.wait_impl(timeout)?;
939        let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
940        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
941        while !guard.closed {
942            let remaining = deadline.saturating_duration_since(Instant::now());
943            if remaining.is_zero() {
944                break;
945            }
946            let result = self
947                .reader
948                .condvar
949                .wait_timeout(guard, remaining)
950                .expect("pty read mutex poisoned");
951            guard = result.0;
952        }
953        Ok(code)
954    }
955
    /// Store `enabled` in the shared echo flag using `Release` ordering.
    ///
    /// NOTE(review): presumably this is the flag `spawn_pty_reader` checks
    /// before mirroring output to stdout; confirm where the reader is spawned.
    pub fn set_echo(&self, enabled: bool) {
        self.echo.store(enabled, Ordering::Release);
    }
959
    /// Read the current value of the echo flag using `Acquire` ordering.
    pub fn echo_enabled(&self) -> bool {
        self.echo.load(Ordering::Acquire)
    }
963
    /// Store a clone of `detector` in this process's idle-detector slot,
    /// replacing whatever was there before.
    ///
    /// Panics if the idle-detector mutex is poisoned.
    ///
    /// NOTE(review): `spawn_pty_reader` clones the slot's contents once when it
    /// starts. If it is handed this same slot, a detector attached after the
    /// reader is running would not be seen by that reader. Confirm.
    pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
        let mut guard = self
            .idle_detector
            .lock()
            .expect("idle detector mutex poisoned");
        *guard = Some(Arc::clone(detector));
    }
971
    /// Clear this process's idle-detector slot.
    ///
    /// Panics if the idle-detector mutex is poisoned.
    ///
    /// NOTE(review): `spawn_pty_reader` works from a clone taken when it
    /// starts, so clearing the slot may not stop an already running reader
    /// from reporting to the old detector. Confirm.
    pub fn detach_idle_detector(&self) {
        let mut guard = self
            .idle_detector
            .lock()
            .expect("idle detector mutex poisoned");
        *guard = None;
    }
979
    /// Current value of the `input_bytes_total` counter (`Acquire` load).
    pub fn pty_input_bytes_total(&self) -> usize {
        self.input_bytes_total.load(Ordering::Acquire)
    }
983
    /// Current value of the `newline_events_total` counter (`Acquire` load).
    pub fn pty_newline_events_total(&self) -> usize {
        self.newline_events_total.load(Ordering::Acquire)
    }
987
    /// Current value of the `submit_events_total` counter (`Acquire` load).
    pub fn pty_submit_events_total(&self) -> usize {
        self.submit_events_total.load(Ordering::Acquire)
    }
991
    /// Current value of the `output_bytes_total` counter (`Acquire` load).
    pub fn pty_output_bytes_total(&self) -> usize {
        self.output_bytes_total.load(Ordering::Acquire)
    }
995
    /// Current value of the `control_churn_bytes_total` counter (`Acquire` load).
    pub fn pty_control_churn_bytes_total(&self) -> usize {
        self.control_churn_bytes_total.load(Ordering::Acquire)
    }
999}
1000
/// Feature switches for an interactive PTY session.
///
/// The [`Default`] value enables everything a terminal-style session usually
/// needs: output echo, terminal input relay, and automatic PTY query replies.
#[derive(Debug, Clone, Copy)]
pub struct InteractivePtyOptions {
    /// Value handed to `set_echo` when the session starts.
    pub echo_output: bool,
    /// Whether starting the session also starts the terminal input relay.
    pub relay_terminal_input: bool,
    /// Whether each pumped chunk is passed to the query responder.
    pub respond_to_queries: bool,
}

impl Default for InteractivePtyOptions {
    /// Every interactive feature starts enabled.
    fn default() -> Self {
        let enabled = true;
        Self {
            echo_output: enabled,
            relay_terminal_input: enabled,
            respond_to_queries: enabled,
        }
    }
}
1021
/// Outcome of one `InteractivePtySession::pump_output` call.
#[derive(Debug, Default)]
pub struct InteractivePtyPumpResult {
    /// Output chunks read during the call, in the order they were read.
    pub chunks: Vec<Vec<u8>>,
    /// Set when the read reported that the pseudo-terminal stream is closed.
    pub stream_closed: bool,
}
1027
1028/// Canonical interactive PTY recipe for downstream Rust consumers.
1029///
1030/// `NativePtyProcess` remains the low-level primitive. This wrapper owns the
1031/// interactive setup that callers commonly forget to assemble correctly.
1032pub struct InteractivePtySession {
1033    process: NativePtyProcess,
1034    options: InteractivePtyOptions,
1035}
1036
1037impl InteractivePtySession {
1038    pub fn new(process: NativePtyProcess) -> Self {
1039        Self::with_options(process, InteractivePtyOptions::default())
1040    }
1041
1042    pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
1043        Self { process, options }
1044    }
1045
1046    pub fn process(&self) -> &NativePtyProcess {
1047        &self.process
1048    }
1049
1050    pub fn start(&self) -> Result<(), PtyError> {
1051        self.process.set_echo(self.options.echo_output);
1052        self.process.start_impl()?;
1053        if self.options.relay_terminal_input {
1054            self.process.start_terminal_input_relay_impl()?;
1055        }
1056        Ok(())
1057    }
1058
1059    pub fn pump_output(
1060        &self,
1061        timeout: Option<f64>,
1062        consume_all: bool,
1063    ) -> Result<InteractivePtyPumpResult, PtyError> {
1064        let mut pumped = InteractivePtyPumpResult::default();
1065        let mut next_timeout = timeout;
1066        loop {
1067            match self.process.read_chunk_impl(next_timeout) {
1068                Ok(Some(chunk)) => {
1069                    if self.options.respond_to_queries {
1070                        self.process.respond_to_queries_impl(&chunk)?;
1071                    }
1072                    pumped.chunks.push(chunk);
1073                    if !consume_all {
1074                        break;
1075                    }
1076                    next_timeout = Some(0.0);
1077                }
1078                Ok(None) => break,
1079                Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
1080                    pumped.stream_closed = true;
1081                    break;
1082                }
1083                Err(err) => return Err(err),
1084            }
1085        }
1086        Ok(pumped)
1087    }
1088
1089    pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
1090        self.process.resize_impl(rows, cols)
1091    }
1092
1093    pub fn send_interrupt(&self) -> Result<(), PtyError> {
1094        self.process.send_interrupt_impl()
1095    }
1096
1097    pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
1098        self.process.wait_impl(timeout)
1099    }
1100
1101    pub fn wait_and_drain(
1102        &self,
1103        timeout: Option<f64>,
1104        drain_timeout: f64,
1105    ) -> Result<i32, PtyError> {
1106        self.process.wait_and_drain_impl(timeout, drain_timeout)
1107    }
1108
1109    pub fn terminate(&self) -> Result<(), PtyError> {
1110        self.process.terminate_impl()
1111    }
1112
1113    pub fn kill(&self) -> Result<(), PtyError> {
1114        self.process.kill_impl()
1115    }
1116
1117    pub fn close(&self) -> Result<(), PtyError> {
1118        self.process.close_impl()
1119    }
1120}
1121
/// Dropping a `NativePtyProcess` runs its `close_nonblocking` cleanup.
impl Drop for NativePtyProcess {
    fn drop(&mut self) {
        self.close_nonblocking();
    }
}
1127
1128// ── Helper functions ──
1129
/// Count the bytes in `data` that are terminal control traffic.
///
/// Counted bytes are:
/// - every byte of an `ESC [` sequence, up to and including the first byte in
///   `0x40..=0x7E` (or to the end of `data` if the sequence is unterminated);
/// - a lone `ESC` that is not followed by `[`;
/// - backspace (`0x08`), carriage return (`0x0D`) and delete (`0x7F`).
pub fn control_churn_bytes(data: &[u8]) -> usize {
    let mut churn = 0usize;
    let mut rest = data;
    while let Some((&head, tail)) = rest.split_first() {
        match head {
            0x1B => {
                let consumed = match tail.split_first() {
                    Some((&b'[', body)) => {
                        // Include the final byte when one is present.
                        let body_len = body
                            .iter()
                            .position(|byte| (0x40..=0x7E).contains(byte))
                            .map_or(body.len(), |at| at + 1);
                        2 + body_len
                    }
                    _ => 1,
                };
                churn += consumed;
                rest = &rest[consumed..];
            }
            0x08 | 0x0D | 0x7F => {
                churn += 1;
                rest = tail;
            }
            _ => rest = tail,
        }
    }
    churn
}
1158
1159pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
1160    let mut command = CommandBuilder::new(&argv[0]);
1161    if argv.len() > 1 {
1162        command.args(
1163            argv[1..]
1164                .iter()
1165                .map(OsString::from)
1166                .collect::<Vec<OsString>>(),
1167        );
1168    }
1169    command
1170}
1171
1172#[inline(never)]
1173pub fn spawn_pty_reader(
1174    mut reader: Box<dyn Read + Send>,
1175    shared: Arc<PtyReadShared>,
1176    echo: Arc<AtomicBool>,
1177    idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
1178    output_bytes_total: Arc<AtomicUsize>,
1179    control_churn_bytes_total: Arc<AtomicUsize>,
1180) {
1181    crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
1182    let idle_detector_snapshot = idle_detector
1183        .lock()
1184        .expect("idle detector mutex poisoned")
1185        .clone();
1186    let mut chunk = vec![0_u8; 65536];
1187    loop {
1188        match reader.read(&mut chunk) {
1189            Ok(0) => break,
1190            Ok(n) => {
1191                let data = &chunk[..n];
1192
1193                let churn = control_churn_bytes(data);
1194                let visible = data.len().saturating_sub(churn);
1195                output_bytes_total.fetch_add(visible, Ordering::Relaxed);
1196                control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
1197
1198                if echo.load(Ordering::Relaxed) {
1199                    let _ = std::io::stdout().write_all(data);
1200                    let _ = std::io::stdout().flush();
1201                }
1202
1203                if let Some(ref detector) = idle_detector_snapshot {
1204                    detector.record_output(data);
1205                }
1206
1207                let mut guard = shared.state.lock().expect("pty read mutex poisoned");
1208                guard.chunks.push_back(data.to_vec());
1209                shared.condvar.notify_all();
1210            }
1211            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
1212            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
1213                thread::sleep(Duration::from_millis(10));
1214                continue;
1215            }
1216            Err(_) => break,
1217        }
1218    }
1219    let mut guard = shared.state.lock().expect("pty read mutex poisoned");
1220    guard.closed = true;
1221    shared.condvar.notify_all();
1222}
1223
1224pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
1225    if let Some(signal) = status.signal() {
1226        let signal = signal.to_ascii_lowercase();
1227        if signal.contains("interrupt") {
1228            return -2;
1229        }
1230        if signal.contains("terminated") {
1231            return -15;
1232        }
1233        if signal.contains("killed") {
1234            return -9;
1235        }
1236    }
1237    status.exit_code() as i32
1238}
1239
/// Report whether `data` contains a carriage return or a line feed.
pub fn input_contains_newline(data: &[u8]) -> bool {
    data.contains(&b'\r') || data.contains(&b'\n')
}
1243
/// Remembers a terminal's attributes so they can be restored when the guard
/// is dropped.
#[cfg(unix)]
struct PosixTerminalModeGuard {
    /// File descriptor whose attributes are restored on drop.
    stdin_fd: i32,
    /// Attributes to write back on drop.
    original_mode: libc::termios,
}

#[cfg(unix)]
impl Drop for PosixTerminalModeGuard {
    /// Write `original_mode` back to `stdin_fd`; the result is ignored.
    fn drop(&mut self) {
        // SAFETY: `original_mode` is a reference to a live `termios` owned by
        // `self`; `tcsetattr` only reads through it.
        unsafe {
            libc::tcsetattr(self.stdin_fd, libc::TCSANOW, &self.original_mode);
        }
    }
}
1258
1259#[cfg(unix)]
1260fn acquire_posix_terminal_mode_guard() -> Result<PosixTerminalModeGuard, std::io::Error> {
1261    let stdin_fd = libc::STDIN_FILENO;
1262    let mut original_mode = unsafe { std::mem::zeroed::<libc::termios>() };
1263    if unsafe { libc::tcgetattr(stdin_fd, &mut original_mode) } != 0 {
1264        return Err(std::io::Error::last_os_error());
1265    }
1266    let mut raw_mode = original_mode;
1267    unsafe {
1268        libc::cfmakeraw(&mut raw_mode);
1269    }
1270    if unsafe { libc::tcsetattr(stdin_fd, libc::TCSANOW, &raw_mode) } != 0 {
1271        return Err(std::io::Error::last_os_error());
1272    }
1273    Ok(PosixTerminalModeGuard {
1274        stdin_fd,
1275        original_mode,
1276    })
1277}
1278
1279#[cfg(unix)]
1280#[inline(never)]
1281fn posix_terminal_input_relay_worker(
1282    handles: Arc<Mutex<Option<NativePtyHandles>>>,
1283    returncode: Arc<Mutex<Option<i32>>>,
1284    input_bytes_total: Arc<AtomicUsize>,
1285    newline_events_total: Arc<AtomicUsize>,
1286    submit_events_total: Arc<AtomicUsize>,
1287    stop: Arc<AtomicBool>,
1288    active: Arc<AtomicBool>,
1289) {
1290    let _terminal_guard = match acquire_posix_terminal_mode_guard() {
1291        Ok(guard) => guard,
1292        Err(_) => {
1293            active.store(false, Ordering::Release);
1294            return;
1295        }
1296    };
1297
1298    let stdin_fd = libc::STDIN_FILENO;
1299    let mut buffer = vec![0_u8; 65536];
1300    loop {
1301        if stop.load(Ordering::Acquire) {
1302            break;
1303        }
1304        match poll_pty_process(&handles, &returncode) {
1305            Ok(Some(_)) => break,
1306            Ok(None) => {}
1307            Err(_) => break,
1308        }
1309
1310        let mut pollfd = libc::pollfd {
1311            fd: stdin_fd,
1312            events: libc::POLLIN,
1313            revents: 0,
1314        };
1315        let poll_result = unsafe { libc::poll(&mut pollfd, 1, 50) };
1316        if poll_result < 0 {
1317            let err = std::io::Error::last_os_error();
1318            if err.kind() == std::io::ErrorKind::Interrupted {
1319                continue;
1320            }
1321            break;
1322        }
1323        if poll_result == 0 || pollfd.revents & libc::POLLIN == 0 {
1324            continue;
1325        }
1326
1327        let read_result = unsafe { libc::read(stdin_fd, buffer.as_mut_ptr().cast(), buffer.len()) };
1328        if read_result < 0 {
1329            let err = std::io::Error::last_os_error();
1330            if err.kind() == std::io::ErrorKind::Interrupted {
1331                continue;
1332            }
1333            break;
1334        }
1335        if read_result == 0 {
1336            continue;
1337        }
1338
1339        let mut data = buffer[..read_result as usize].to_vec();
1340        loop {
1341            let mut drain_pollfd = libc::pollfd {
1342                fd: stdin_fd,
1343                events: libc::POLLIN,
1344                revents: 0,
1345            };
1346            let drain_ready = unsafe { libc::poll(&mut drain_pollfd, 1, 0) };
1347            if drain_ready <= 0 || drain_pollfd.revents & libc::POLLIN == 0 {
1348                break;
1349            }
1350            let drain_result =
1351                unsafe { libc::read(stdin_fd, buffer.as_mut_ptr().cast(), buffer.len()) };
1352            if drain_result <= 0 {
1353                break;
1354            }
1355            data.extend_from_slice(&buffer[..drain_result as usize]);
1356        }
1357
1358        record_pty_input_metrics(
1359            &input_bytes_total,
1360            &newline_events_total,
1361            &submit_events_total,
1362            &data,
1363            input_contains_newline(&data),
1364        );
1365        if write_pty_input(&handles, &data).is_err() {
1366            break;
1367        }
1368    }
1369
1370    active.store(false, Ordering::Release);
1371}
1372
1373pub fn record_pty_input_metrics(
1374    input_bytes_total: &Arc<AtomicUsize>,
1375    newline_events_total: &Arc<AtomicUsize>,
1376    submit_events_total: &Arc<AtomicUsize>,
1377    data: &[u8],
1378    submit: bool,
1379) {
1380    input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
1381    if input_contains_newline(data) {
1382        newline_events_total.fetch_add(1, Ordering::AcqRel);
1383    }
1384    if submit {
1385        submit_events_total.fetch_add(1, Ordering::AcqRel);
1386    }
1387}
1388
/// Record `code` as the stored return code, replacing any previous value.
///
/// Panics if the return-code mutex is poisoned.
pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
    let mut slot = returncode.lock().expect("pty returncode mutex poisoned");
    *slot = Some(code);
}
1392
1393pub fn poll_pty_process(
1394    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
1395    returncode: &Arc<Mutex<Option<i32>>>,
1396) -> Result<Option<i32>, std::io::Error> {
1397    let mut guard = handles.lock().expect("pty handles mutex poisoned");
1398    let Some(handles) = guard.as_mut() else {
1399        return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
1400    };
1401    let status = handles.child.try_wait()?;
1402    let code = status.map(portable_exit_code);
1403    if let Some(code) = code {
1404        store_pty_returncode(returncode, code);
1405        return Ok(Some(code));
1406    }
1407    Ok(None)
1408}
1409
1410pub fn write_pty_input(
1411    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
1412    data: &[u8],
1413) -> Result<(), std::io::Error> {
1414    let mut guard = handles.lock().expect("pty handles mutex poisoned");
1415    let handles = guard.as_mut().ok_or_else(|| {
1416        std::io::Error::new(
1417            std::io::ErrorKind::NotConnected,
1418            "Pseudo-terminal process is not running",
1419        )
1420    })?;
1421    #[cfg(windows)]
1422    let payload = pty_windows::input_payload(data);
1423    #[cfg(unix)]
1424    let payload = pty_platform::input_payload(data);
1425    handles.writer.write_all(&payload)?;
1426    handles.writer.flush()
1427}
1428
/// Translate line endings in terminal input for the Windows PTY.
///
/// A lone line feed becomes a carriage return. A carriage return is kept, and
/// a line feed directly after it is kept too, so `\r\n` passes through
/// unchanged. All other bytes are copied as-is.
#[cfg(windows)]
pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(data.len());
    let mut bytes = data.iter().copied().peekable();
    while let Some(byte) = bytes.next() {
        match byte {
            b'\r' => {
                out.push(b'\r');
                // Keep the LF of a CRLF pair instead of turning it into a second CR.
                if bytes.next_if_eq(&b'\n').is_some() {
                    out.push(b'\n');
                }
            }
            b'\n' => out.push(b'\r'),
            other => out.push(other),
        }
    }
    out
}
1455
/// Create a Job Object flagged `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` and assign
/// the child process to it.
///
/// On success the job handle is returned wrapped in `WindowsJobHandle` (stored
/// as a `usize`).
///
/// # Errors
///
/// - `PtyError::Other` when `handle` is `None`.
/// - `PtyError::Io` with the last OS error when creating the job, configuring
///   it, or assigning the process fails. In the latter two cases the job
///   handle is closed before returning.
#[cfg(windows)]
#[inline(never)]
pub fn assign_child_to_windows_kill_on_close_job(
    handle: Option<std::os::windows::io::RawHandle>,
) -> Result<WindowsJobHandle, PtyError> {
    crate::rp_rust_debug_scope!(
        "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
    );
    use std::mem::zeroed;

    use winapi::shared::minwindef::FALSE;
    use winapi::um::handleapi::INVALID_HANDLE_VALUE;
    use winapi::um::jobapi2::{
        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
    };
    use winapi::um::winnt::{
        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
    };

    let Some(handle) = handle else {
        return Err(PtyError::Other(
            "Pseudo-terminal child does not expose a Windows process handle".into(),
        ));
    };

    // Anonymous job with default security attributes.
    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
    if job.is_null() || job == INVALID_HANDLE_VALUE {
        return Err(PtyError::Io(std::io::Error::last_os_error()));
    }

    // Start from an all-zero limit structure and set only the kill-on-close flag.
    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    let result = unsafe {
        SetInformationJobObject(
            job,
            JobObjectExtendedLimitInformation,
            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
        )
    };
    if result == FALSE {
        // Capture the error before CloseHandle can overwrite the thread's last error.
        let err = std::io::Error::last_os_error();
        unsafe {
            winapi::um::handleapi::CloseHandle(job);
        }
        return Err(PtyError::Io(err));
    }

    let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
    if result == FALSE {
        // Capture the error before CloseHandle can overwrite the thread's last error.
        let err = std::io::Error::last_os_error();
        unsafe {
            winapi::um::handleapi::CloseHandle(job);
        }
        return Err(PtyError::Io(err));
    }

    Ok(WindowsJobHandle(job as usize))
}
1516
/// Information about a child process found via Toolhelp snapshot.
#[cfg(windows)]
#[derive(Debug, Clone)]
pub struct ChildProcessInfo {
    /// Process ID of the child.
    pub pid: u32,
    /// Executable name from the snapshot entry, decoded lossily as UTF-8.
    pub name: String,
}
1524
/// List the direct children of `parent_pid` using a Toolhelp process snapshot.
///
/// Each result carries the child's PID and its executable name (the
/// NUL-terminated `szExeFile` field, decoded lossily as UTF-8). An empty
/// vector is returned when the snapshot cannot be created.
#[cfg(windows)]
pub fn find_child_processes(parent_pid: u32) -> Vec<ChildProcessInfo> {
    use winapi::um::handleapi::CloseHandle;
    use winapi::um::tlhelp32::{
        CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
    };

    let mut found = Vec::new();
    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
        return found;
    }

    let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
    // The API requires the structure size to be set before the first call.
    entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;

    let mut has_entry = unsafe { Process32First(snapshot, &mut entry) } != 0;
    while has_entry {
        if entry.th32ParentProcessID == parent_pid {
            let raw = &entry.szExeFile;
            let len = raw.iter().position(|&c| c == 0).unwrap_or(raw.len());
            let bytes: Vec<u8> = raw[..len].iter().map(|&c| c as u8).collect();
            found.push(ChildProcessInfo {
                pid: entry.th32ProcessID,
                name: String::from_utf8_lossy(&bytes).into_owned(),
            });
        }
        has_entry = unsafe { Process32Next(snapshot, &mut entry) } != 0;
    }

    unsafe { CloseHandle(snapshot) };
    found
}
1572
/// Collect the PIDs of the current process's direct children whose executable
/// name is `conhost.exe` (ASCII case-insensitive).
#[cfg(windows)]
fn conhost_children_of_current_process() -> Vec<u32> {
    let mut pids = Vec::new();
    for child in find_child_processes(std::process::id()) {
        if child.name.eq_ignore_ascii_case("conhost.exe") {
            pids.push(child.pid);
        }
    }
    pids
}
1583
/// Assign newly appeared `conhost.exe` children of this process to `job`.
///
/// Intended to run right after spawning a ConPTY child: any `conhost.exe`
/// child that is not listed in `before_pids` is treated as the one created for
/// the new ConPTY and is added to the Job Object so it is cleaned up when the
/// job closes. Assignment failures are ignored.
#[cfg(windows)]
fn assign_conpty_conhost_to_job(job: &WindowsJobHandle, before_pids: &[u32]) {
    conhost_children_of_current_process()
        .into_iter()
        .filter(|pid| !before_pids.contains(pid))
        .for_each(|pid| {
            // Best effort: a conhost that cannot be assigned is left alone.
            let _ = job.assign_pid(pid);
        });
}
1597
/// A conhost.exe process whose parent is no longer alive — likely an orphan
/// from a dead ConPTY session.
#[cfg(windows)]
#[derive(Debug, Clone)]
pub struct OrphanConhostInfo {
    /// PID of the orphaned conhost.exe.
    pub pid: u32,
    /// PID that was the parent when the snapshot was taken.
    pub parent_pid: u32,
    /// Name of the parent process, if it can be resolved (empty if parent is dead).
    ///
    /// As produced by `find_orphan_conhosts` this is always empty: only
    /// conhosts whose parent PID is absent from the snapshot are reported, and
    /// names are only known for PIDs present in that snapshot.
    pub parent_name: String,
}
1610
/// Scan all conhost.exe processes on the system and return those whose parent
/// process is no longer alive. These are likely orphans from dead ConPTY sessions.
///
/// Uses `CreateToolhelp32Snapshot` for a point-in-time snapshot — no sysinfo
/// dependency, so it's lightweight and can be called frequently. An empty
/// vector is returned when the snapshot cannot be created.
///
/// `parent_name` in every result is empty: a parent that is missing from the
/// snapshot has no name to look up there.
///
/// NOTE(review): "alive" means "PID present in the snapshot". If the parent's
/// PID has been reused by an unrelated process, the conhost would not be
/// reported. Confirm whether that matters to callers.
#[cfg(windows)]
pub fn find_orphan_conhosts() -> Vec<OrphanConhostInfo> {
    use winapi::um::handleapi::CloseHandle;
    use winapi::um::tlhelp32::{
        CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
    };

    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
        return Vec::new();
    }

    let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
    // The API requires the structure size to be set before the first call.
    entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;

    // First pass: collect all PIDs and identify conhost.exe processes.
    let mut all_pids = std::collections::HashSet::new();
    let mut conhosts: Vec<(u32, u32)> = Vec::new(); // (pid, parent_pid)

    if unsafe { Process32First(snapshot, &mut entry) } != 0 {
        loop {
            let name_bytes = &entry.szExeFile;
            let name_len = name_bytes
                .iter()
                .position(|&b| b == 0)
                .unwrap_or(name_bytes.len());
            let name = String::from_utf8_lossy(
                &name_bytes[..name_len]
                    .iter()
                    .map(|&c| c as u8)
                    .collect::<Vec<u8>>(),
            )
            .into_owned();

            all_pids.insert(entry.th32ProcessID);

            if name.eq_ignore_ascii_case("conhost.exe") {
                conhosts.push((entry.th32ProcessID, entry.th32ParentProcessID));
            }

            if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
                break;
            }
        }
    }

    unsafe { CloseHandle(snapshot) };

    // Second pass: filter to conhosts whose parent PID is not in the live set.
    // Such a parent has no snapshot entry, hence no resolvable name.
    conhosts
        .into_iter()
        .filter(|&(_, parent_pid)| !all_pids.contains(&parent_pid))
        .map(|(pid, parent_pid)| OrphanConhostInfo {
            pid,
            parent_pid,
            parent_name: String::new(),
        })
        .collect()
}
1677
/// Map a Unix-style `nice` value onto a Windows priority class and apply it
/// to the process behind `handle`.
///
/// Mapping: `nice >= 15` is idle, `1..=14` is below normal, `-14..=-1` is
/// above normal, `nice <= -15` is high. Nothing is changed when `handle` is
/// `None`, `nice` is `None`, or `nice` is `0`.
///
/// # Errors
///
/// Returns `PtyError::Io` with the last OS error when `SetPriorityClass` fails.
#[cfg(windows)]
#[inline(never)]
pub fn apply_windows_pty_priority(
    handle: Option<std::os::windows::io::RawHandle>,
    nice: Option<i32>,
) -> Result<(), PtyError> {
    crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
    use winapi::um::processthreadsapi::SetPriorityClass;
    use winapi::um::winbase::{
        ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
        IDLE_PRIORITY_CLASS,
    };

    let (Some(handle), Some(nice)) = (handle, nice) else {
        return Ok(());
    };
    let priority_class = match nice {
        i32::MIN..=-15 => HIGH_PRIORITY_CLASS,
        -14..=-1 => ABOVE_NORMAL_PRIORITY_CLASS,
        0 => return Ok(()),
        1..=14 => BELOW_NORMAL_PRIORITY_CLASS,
        15..=i32::MAX => IDLE_PRIORITY_CLASS,
    };
    let applied = unsafe { SetPriorityClass(handle.cast(), priority_class) };
    if applied == 0 {
        return Err(PtyError::Io(std::io::Error::last_os_error()));
    }
    Ok(())
}