Skip to main content

running_process_core/pty/
mod.rs

1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
12/// Re-exports for downstream crates that need portable-pty types.
13pub mod reexports {
14    pub use portable_pty;
15}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
27#[derive(Debug, Error)]
28pub enum PtyError {
29    #[error("pseudo-terminal process already started")]
30    AlreadyStarted,
31    #[error("pseudo-terminal process is not running")]
32    NotRunning,
33    #[error("pseudo-terminal timed out")]
34    Timeout,
35    #[error("pseudo-terminal I/O error: {0}")]
36    Io(#[from] std::io::Error),
37    #[error("pseudo-terminal spawn failed: {0}")]
38    Spawn(String),
39    #[error("pseudo-terminal error: {0}")]
40    Other(String),
41}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44    if matches!(
45        err.kind(),
46        std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47    ) {
48        return true;
49    }
50    #[cfg(unix)]
51    if err.raw_os_error() == Some(libc::ESRCH) {
52        return true;
53    }
54    false
55}
56
/// Mutable state shared between the PTY reader and its consumers.
pub struct PtyReadState {
    /// Output chunks waiting to be consumed; consumers pop from the front.
    pub chunks: VecDeque<Vec<u8>>,
    /// Set once the stream is finished; waiters stop blocking when they see it.
    pub closed: bool,
}
61
62pub struct PtyReadShared {
63    pub state: Mutex<PtyReadState>,
64    pub condvar: Condvar,
65}
66
67pub struct NativePtyHandles {
68    pub master: Box<dyn MasterPty + Send>,
69    pub writer: Box<dyn Write + Send>,
70    pub child: Box<dyn portable_pty::Child + Send + Sync>,
71    #[cfg(windows)]
72    pub _job: WindowsJobHandle,
73}
74
/// Owning wrapper around a raw Windows Job Object handle, stored as an integer.
///
/// The wrapped handle is closed when the wrapper is dropped.
#[cfg(windows)]
pub struct WindowsJobHandle(pub usize);
77
#[cfg(windows)]
impl WindowsJobHandle {
    /// Assign an additional process (by PID) to this Job Object.
    ///
    /// # Errors
    /// Returns the OS error when the process cannot be opened with
    /// `PROCESS_SET_QUOTA | PROCESS_TERMINATE` access, or when
    /// `AssignProcessToJobObject` rejects the assignment.
    pub fn assign_pid(&self, pid: u32) -> Result<(), std::io::Error> {
        use winapi::um::handleapi::CloseHandle;
        use winapi::um::jobapi2::AssignProcessToJobObject;
        use winapi::um::processthreadsapi::OpenProcess;
        use winapi::um::winnt::PROCESS_SET_QUOTA;
        use winapi::um::winnt::PROCESS_TERMINATE;

        // SAFETY: `OpenProcess` only takes plain integer arguments; failure is
        // reported through a null return value, which is checked below.
        let process = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
        if process.is_null() {
            return Err(std::io::Error::last_os_error());
        }

        // SAFETY: `process` is the live handle opened above and `self.0` is the
        // job handle owned by this wrapper.
        let assigned = unsafe {
            AssignProcessToJobObject(self.0 as winapi::shared::ntdef::HANDLE, process)
        };
        // Read the thread's last-error value *before* calling `CloseHandle`:
        // any intervening API call may overwrite it and hide the real failure.
        let outcome = if assigned == 0 {
            Err(std::io::Error::last_os_error())
        } else {
            Ok(())
        };

        // SAFETY: `process` was opened by this function and is closed exactly once.
        unsafe { CloseHandle(process) };
        outcome
    }
}
104
#[cfg(windows)]
impl Drop for WindowsJobHandle {
    /// Closes the wrapped job handle; the result of `CloseHandle` is ignored.
    fn drop(&mut self) {
        use winapi::shared::ntdef::HANDLE;
        use winapi::um::handleapi::CloseHandle;

        let raw = self.0 as HANDLE;
        // SAFETY: the wrapper owns this handle, and `drop` runs at most once.
        unsafe {
            CloseHandle(raw);
        }
    }
}
113
/// Mutable state tracked by the idle detector.
pub struct IdleMonitorState {
    /// Moment of the most recent activity that reset the idle timer.
    pub last_reset_at: Instant,
    /// Exit code of the watched process, once it has been reported.
    pub returncode: Option<i32>,
    /// Whether the reported exit was caused by an interrupt.
    pub interrupted: bool,
}
119
120/// Core idle detection logic, shareable across threads via Arc.
121/// The reader thread calls `record_output` directly.
122pub struct IdleDetectorCore {
123    pub timeout_seconds: f64,
124    pub stability_window_seconds: f64,
125    pub sample_interval_seconds: f64,
126    pub reset_on_input: bool,
127    pub reset_on_output: bool,
128    pub count_control_churn_as_output: bool,
129    pub enabled: Arc<AtomicBool>,
130    pub state: Mutex<IdleMonitorState>,
131    pub condvar: Condvar,
132}
133
134impl IdleDetectorCore {
135    pub fn record_input(&self, byte_count: usize) {
136        if !self.reset_on_input || byte_count == 0 {
137            return;
138        }
139        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
140        guard.last_reset_at = Instant::now();
141        self.condvar.notify_all();
142    }
143
144    pub fn record_output(&self, data: &[u8]) {
145        if !self.reset_on_output || data.is_empty() {
146            return;
147        }
148        let control_bytes = control_churn_bytes(data);
149        let visible_output_bytes = data.len().saturating_sub(control_bytes);
150        let active_output =
151            visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
152        if !active_output {
153            return;
154        }
155        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
156        guard.last_reset_at = Instant::now();
157        self.condvar.notify_all();
158    }
159
160    pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
161        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
162        guard.returncode = Some(returncode);
163        guard.interrupted = interrupted;
164        self.condvar.notify_all();
165    }
166
167    pub fn enabled(&self) -> bool {
168        self.enabled.load(Ordering::Acquire)
169    }
170
171    pub fn set_enabled(&self, enabled: bool) {
172        let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
173        if enabled && !was_enabled {
174            let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
175            guard.last_reset_at = Instant::now();
176        }
177        self.condvar.notify_all();
178    }
179
180    pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
181        let started = Instant::now();
182        let overall_timeout = timeout.map(Duration::from_secs_f64);
183        let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
184        let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
185
186        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
187        loop {
188            let now = Instant::now();
189            let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
190
191            if let Some(returncode) = guard.returncode {
192                let reason = if guard.interrupted {
193                    "interrupt"
194                } else {
195                    "process_exit"
196                };
197                return (false, reason.to_string(), idle_for, Some(returncode));
198            }
199
200            let enabled = self.enabled.load(Ordering::Acquire);
201            if enabled && idle_for >= min_idle {
202                return (true, "idle_timeout".to_string(), idle_for, None);
203            }
204
205            if let Some(limit) = overall_timeout {
206                if now.duration_since(started) >= limit {
207                    return (false, "timeout".to_string(), idle_for, None);
208                }
209            }
210
211            let idle_remaining = if enabled {
212                (min_idle - idle_for).max(0.0)
213            } else {
214                sample_interval.as_secs_f64()
215            };
216            let mut wait_for =
217                sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
218            if let Some(limit) = overall_timeout {
219                let elapsed = now.duration_since(started);
220                if elapsed < limit {
221                    let remaining = limit - elapsed;
222                    wait_for = wait_for.min(remaining);
223                }
224            }
225            let result = self
226                .condvar
227                .wait_timeout(guard, wait_for)
228                .expect("idle monitor mutex poisoned");
229            guard = result.0;
230        }
231    }
232}
233
234pub struct NativePtyProcess {
235    pub argv: Vec<String>,
236    pub cwd: Option<String>,
237    pub env: Option<Vec<(String, String)>>,
238    pub rows: u16,
239    pub cols: u16,
240    #[cfg(windows)]
241    pub nice: Option<i32>,
242    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
243    pub reader: Arc<PtyReadShared>,
244    pub returncode: Arc<Mutex<Option<i32>>>,
245    pub input_bytes_total: Arc<AtomicUsize>,
246    pub newline_events_total: Arc<AtomicUsize>,
247    pub submit_events_total: Arc<AtomicUsize>,
248    /// When true, the reader thread writes PTY output to stdout.
249    pub echo: Arc<AtomicBool>,
250    /// When set, the reader thread feeds output directly to the idle detector.
251    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
252    /// Visible (non-control) output bytes seen by the reader thread.
253    pub output_bytes_total: Arc<AtomicUsize>,
254    /// Control churn bytes (ANSI escapes, BS, CR, DEL) seen by the reader.
255    pub control_churn_bytes_total: Arc<AtomicUsize>,
256    pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
257    pub terminal_input_relay_stop: Arc<AtomicBool>,
258    pub terminal_input_relay_active: Arc<AtomicBool>,
259    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
260}
261
/// Picks the working directory for a spawn.
///
/// An explicit `cwd` is returned unchanged; otherwise the current directory of this
/// process is used (lossily converted to UTF-8), or `None` if it cannot be read.
fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
    match cwd {
        Some(explicit) => Some(explicit.to_owned()),
        None => match std::env::current_dir() {
            Ok(dir) => Some(dir.to_string_lossy().into_owned()),
            Err(_) => None,
        },
    }
}
269
270impl NativePtyProcess {
271    pub fn new(
272        argv: Vec<String>,
273        cwd: Option<String>,
274        env: Option<Vec<(String, String)>>,
275        rows: u16,
276        cols: u16,
277        nice: Option<i32>,
278    ) -> Result<Self, PtyError> {
279        if argv.is_empty() {
280            return Err(PtyError::Other("command cannot be empty".into()));
281        }
282        #[cfg(not(windows))]
283        let _ = nice;
284        Ok(Self {
285            argv,
286            cwd,
287            env,
288            rows,
289            cols,
290            #[cfg(windows)]
291            nice,
292            handles: Arc::new(Mutex::new(None)),
293            reader: Arc::new(PtyReadShared {
294                state: Mutex::new(PtyReadState {
295                    chunks: VecDeque::new(),
296                    closed: false,
297                }),
298                condvar: Condvar::new(),
299            }),
300            returncode: Arc::new(Mutex::new(None)),
301            input_bytes_total: Arc::new(AtomicUsize::new(0)),
302            newline_events_total: Arc::new(AtomicUsize::new(0)),
303            submit_events_total: Arc::new(AtomicUsize::new(0)),
304            echo: Arc::new(AtomicBool::new(false)),
305            idle_detector: Arc::new(Mutex::new(None)),
306            output_bytes_total: Arc::new(AtomicUsize::new(0)),
307            control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
308            reader_worker: Mutex::new(None),
309            terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
310            terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
311            terminal_input_relay_worker: Mutex::new(None),
312        })
313    }
314
315    pub fn mark_reader_closed(&self) {
316        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
317        guard.closed = true;
318        self.reader.condvar.notify_all();
319    }
320
321    pub fn store_returncode(&self, code: i32) {
322        store_pty_returncode(&self.returncode, code);
323    }
324
325    fn join_reader_worker(&self) {
326        if let Some(worker) = self
327            .reader_worker
328            .lock()
329            .expect("pty reader worker mutex poisoned")
330            .take()
331        {
332            let _ = worker.join();
333        }
334    }
335
336    pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
337        record_pty_input_metrics(
338            &self.input_bytes_total,
339            &self.newline_events_total,
340            &self.submit_events_total,
341            data,
342            submit,
343        );
344    }
345
346    pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
347        self.record_input_metrics(data, submit);
348        write_pty_input(&self.handles, data)?;
349        Ok(())
350    }
351
352    pub fn request_terminal_input_relay_stop(&self) {
353        self.terminal_input_relay_stop
354            .store(true, Ordering::Release);
355        self.terminal_input_relay_active
356            .store(false, Ordering::Release);
357    }
358
359    pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
360        let mut worker_guard = self
361            .terminal_input_relay_worker
362            .lock()
363            .expect("pty terminal input relay mutex poisoned");
364        if worker_guard.is_some() && self.terminal_input_relay_active() {
365            return Ok(());
366        }
367        if self
368            .handles
369            .lock()
370            .expect("pty handles mutex poisoned")
371            .is_none()
372        {
373            return Err(PtyError::NotRunning);
374        }
375
376        self.terminal_input_relay_stop
377            .store(false, Ordering::Release);
378        self.terminal_input_relay_active
379            .store(true, Ordering::Release);
380
381        let handles = Arc::clone(&self.handles);
382        let returncode = Arc::clone(&self.returncode);
383        let input_bytes_total = Arc::clone(&self.input_bytes_total);
384        let newline_events_total = Arc::clone(&self.newline_events_total);
385        let submit_events_total = Arc::clone(&self.submit_events_total);
386        let stop = Arc::clone(&self.terminal_input_relay_stop);
387        let active = Arc::clone(&self.terminal_input_relay_active);
388
389        #[cfg(windows)]
390        {
391            let capture = terminal_input::TerminalInputCore::new();
392            capture.start_impl().map_err(PtyError::Io)?;
393            *worker_guard = Some(thread::spawn(move || {
394                loop {
395                    if stop.load(Ordering::Acquire) {
396                        break;
397                    }
398                    match poll_pty_process(&handles, &returncode) {
399                        Ok(Some(_)) => break,
400                        Ok(None) => {}
401                        Err(_) => break,
402                    }
403                    match terminal_input::wait_for_terminal_input_event(
404                        &capture.state,
405                        &capture.condvar,
406                        Some(Duration::from_millis(50)),
407                    ) {
408                        terminal_input::TerminalInputWaitOutcome::Event(event) => {
409                            record_pty_input_metrics(
410                                &input_bytes_total,
411                                &newline_events_total,
412                                &submit_events_total,
413                                &event.data,
414                                event.submit,
415                            );
416                            if write_pty_input(&handles, &event.data).is_err() {
417                                break;
418                            }
419                        }
420                        terminal_input::TerminalInputWaitOutcome::Timeout => continue,
421                        terminal_input::TerminalInputWaitOutcome::Closed => break,
422                    }
423                }
424                active.store(false, Ordering::Release);
425                let _ = capture.stop_impl();
426            }));
427            Ok(())
428        }
429
430        #[cfg(unix)]
431        {
432            if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
433                self.terminal_input_relay_active
434                    .store(false, Ordering::Release);
435                return Ok(());
436            }
437
438            *worker_guard = Some(thread::spawn(move || {
439                posix_terminal_input_relay_worker(
440                    handles,
441                    returncode,
442                    input_bytes_total,
443                    newline_events_total,
444                    submit_events_total,
445                    stop,
446                    active,
447                );
448            }));
449            Ok(())
450        }
451    }
452
453    pub fn stop_terminal_input_relay_impl(&self) {
454        self.request_terminal_input_relay_stop();
455        if let Some(worker) = self
456            .terminal_input_relay_worker
457            .lock()
458            .expect("pty terminal input relay mutex poisoned")
459            .take()
460        {
461            let _ = worker.join();
462        }
463    }
464
465    pub fn terminal_input_relay_active(&self) -> bool {
466        self.terminal_input_relay_active.load(Ordering::Acquire)
467    }
468
469    /// Synchronously tear down the PTY and reap the child.
470    #[inline(never)]
471    pub fn close_impl(&self) -> Result<(), PtyError> {
472        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
473        self.stop_terminal_input_relay_impl();
474        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
475        let Some(handles) = guard.take() else {
476            self.mark_reader_closed();
477            return Ok(());
478        };
479        drop(guard);
480
481        #[cfg(windows)]
482        let NativePtyHandles {
483            master,
484            writer,
485            mut child,
486            _job,
487        } = handles;
488        #[cfg(not(windows))]
489        let NativePtyHandles {
490            master,
491            writer,
492            mut child,
493        } = handles;
494
495        #[cfg(windows)]
496        {
497            {
498                crate::rp_rust_debug_scope!(
499                    "running_process_core::NativePtyProcess::close_impl.drop_job"
500                );
501                drop(_job);
502            }
503
504            {
505                crate::rp_rust_debug_scope!(
506                    "running_process_core::NativePtyProcess::close_impl.wait_job_exit"
507                );
508                let wait_deadline = Instant::now() + Duration::from_secs(2);
509                loop {
510                    match child.try_wait() {
511                        Ok(Some(status)) => {
512                            let code = portable_exit_code(status);
513                            self.store_returncode(code);
514                            break;
515                        }
516                        Ok(None) if Instant::now() < wait_deadline => {
517                            thread::sleep(Duration::from_millis(10));
518                        }
519                        Ok(None) => {
520                            if let Err(err) = child.kill() {
521                                if !is_ignorable_process_control_error(&err) {
522                                    return Err(PtyError::Io(err));
523                                }
524                            }
525                            let code = match child.wait() {
526                                Ok(status) => portable_exit_code(status),
527                                Err(_) => -9,
528                            };
529                            self.store_returncode(code);
530                            break;
531                        }
532                        Err(_) => {
533                            self.store_returncode(-9);
534                            break;
535                        }
536                    }
537                }
538            }
539            {
540                crate::rp_rust_debug_scope!(
541                    "running_process_core::NativePtyProcess::close_impl.drop_writer"
542                );
543                drop(writer);
544            }
545            {
546                crate::rp_rust_debug_scope!(
547                    "running_process_core::NativePtyProcess::close_impl.drop_master"
548                );
549                drop(master);
550            }
551            drop(child);
552            {
553                crate::rp_rust_debug_scope!(
554                    "running_process_core::NativePtyProcess::close_impl.join_reader"
555                );
556                self.join_reader_worker();
557            }
558            self.mark_reader_closed();
559            Ok(())
560        }
561
562        #[cfg(not(windows))]
563        {
564            drop(writer);
565            drop(master);
566
567            let code = {
568                crate::rp_rust_debug_scope!(
569                    "running_process_core::NativePtyProcess::close_impl.wait_child"
570                );
571                match child.wait() {
572                    Ok(status) => portable_exit_code(status),
573                    Err(_) => -9,
574                }
575            };
576            drop(child);
577
578            self.store_returncode(code);
579            {
580                crate::rp_rust_debug_scope!(
581                    "running_process_core::NativePtyProcess::close_impl.join_reader"
582                );
583                self.join_reader_worker();
584            }
585            self.mark_reader_closed();
586            Ok(())
587        }
588    }
589
590    /// Best-effort, non-blocking teardown for use from `Drop`.
591    #[inline(never)]
592    pub fn close_nonblocking(&self) {
593        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
594        #[cfg(windows)]
595        self.request_terminal_input_relay_stop();
596        let Ok(mut guard) = self.handles.lock() else {
597            return;
598        };
599        let Some(handles) = guard.take() else {
600            self.mark_reader_closed();
601            return;
602        };
603        drop(guard);
604
605        #[cfg(windows)]
606        let NativePtyHandles {
607            master,
608            writer,
609            mut child,
610            _job,
611        } = handles;
612        #[cfg(not(windows))]
613        let NativePtyHandles {
614            master,
615            writer,
616            mut child,
617        } = handles;
618
619        if let Err(err) = child.kill() {
620            if !is_ignorable_process_control_error(&err) {
621                return;
622            }
623        }
624        drop(writer);
625        drop(master);
626        drop(child);
627        #[cfg(windows)]
628        drop(_job);
629        self.mark_reader_closed();
630    }
631
632    pub fn start_impl(&self) -> Result<(), PtyError> {
633        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
634        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
635        if guard.is_some() {
636            return Err(PtyError::AlreadyStarted);
637        }
638
639        // Snapshot our conhost.exe children before openpty() so we can diff
640        // after spawn to find the new conhost.exe created by ConPTY.
641        #[cfg(windows)]
642        let conhost_pids_before = conhost_children_of_current_process();
643
644        let pty_system = native_pty_system();
645        let pair = pty_system
646            .openpty(PtySize {
647                rows: self.rows,
648                cols: self.cols,
649                pixel_width: 0,
650                pixel_height: 0,
651            })
652            .map_err(|e| PtyError::Spawn(e.to_string()))?;
653
654        let mut cmd = command_builder_from_argv(&self.argv);
655        let cwd = resolved_spawn_cwd(self.cwd.as_deref());
656        if let Some(cwd) = &cwd {
657            cmd.cwd(cwd);
658        }
659        if let Some(env) = &self.env {
660            cmd.env_clear();
661            for (key, value) in env {
662                cmd.env(key, value);
663            }
664        }
665
666        let reader = pair
667            .master
668            .try_clone_reader()
669            .map_err(|e| PtyError::Spawn(e.to_string()))?;
670        let writer = pair
671            .master
672            .take_writer()
673            .map_err(|e| PtyError::Spawn(e.to_string()))?;
674        let child = pair
675            .slave
676            .spawn_command(cmd)
677            .map_err(|e| PtyError::Spawn(e.to_string()))?;
678        #[cfg(windows)]
679        let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
680        #[cfg(windows)]
681        assign_conpty_conhost_to_job(&job, &conhost_pids_before);
682        #[cfg(windows)]
683        apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
684        let shared = Arc::clone(&self.reader);
685        let echo = Arc::clone(&self.echo);
686        let idle_detector = Arc::clone(&self.idle_detector);
687        let output_bytes = Arc::clone(&self.output_bytes_total);
688        let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
689        let reader_worker = thread::spawn(move || {
690            spawn_pty_reader(
691                reader,
692                shared,
693                echo,
694                idle_detector,
695                output_bytes,
696                churn_bytes,
697            );
698        });
699        *self
700            .reader_worker
701            .lock()
702            .expect("pty reader worker mutex poisoned") = Some(reader_worker);
703
704        *guard = Some(NativePtyHandles {
705            master: pair.master,
706            writer,
707            child,
708            #[cfg(windows)]
709            _job: job,
710        });
711        Ok(())
712    }
713
714    pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
715        #[cfg(windows)]
716        {
717            pty_windows::respond_to_queries(self, data)
718        }
719
720        #[cfg(unix)]
721        {
722            pty_platform::respond_to_queries(self, data)
723        }
724    }
725
726    pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
727        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
728        let guard = self.handles.lock().expect("pty handles mutex poisoned");
729        if let Some(handles) = guard.as_ref() {
730            #[cfg(windows)]
731            {
732                let _ = (rows, cols, handles);
733                // ConPTY resize can leave ClosePseudoConsole blocked during
734                // teardown on Windows. Keep resize as a no-op until the
735                // backend can cancel the outstanding PTY read safely.
736                return Ok(());
737            }
738
739            #[cfg(not(windows))]
740            handles
741                .master
742                .resize(PtySize {
743                    rows,
744                    cols,
745                    pixel_width: 0,
746                    pixel_height: 0,
747                })
748                .map_err(|e| PtyError::Other(e.to_string()))?;
749        }
750        Ok(())
751    }
752
753    pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
754        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
755        #[cfg(windows)]
756        {
757            pty_windows::send_interrupt(self)
758        }
759
760        #[cfg(unix)]
761        {
762            pty_platform::send_interrupt(self)
763        }
764    }
765
766    pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
767        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
768        // Fast path: already exited.
769        if let Some(code) = *self
770            .returncode
771            .lock()
772            .expect("pty returncode mutex poisoned")
773        {
774            return Ok(code);
775        }
776        let start = Instant::now();
777        loop {
778            if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
779                return Ok(code);
780            }
781            if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
782                return Err(PtyError::Timeout);
783            }
784            thread::sleep(Duration::from_millis(10));
785        }
786    }
787
788    pub fn terminate_impl(&self) -> Result<(), PtyError> {
789        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
790        #[cfg(windows)]
791        {
792            if self
793                .handles
794                .lock()
795                .expect("pty handles mutex poisoned")
796                .is_none()
797            {
798                return Err(PtyError::NotRunning);
799            }
800            self.close_impl()
801        }
802
803        #[cfg(unix)]
804        {
805            pty_platform::terminate(self)
806        }
807    }
808
809    pub fn kill_impl(&self) -> Result<(), PtyError> {
810        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
811        #[cfg(windows)]
812        {
813            if self
814                .handles
815                .lock()
816                .expect("pty handles mutex poisoned")
817                .is_none()
818            {
819                return Err(PtyError::NotRunning);
820            }
821            self.close_impl()
822        }
823
824        #[cfg(unix)]
825        {
826            pty_platform::kill(self)
827        }
828    }
829
830    pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
831        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
832        #[cfg(windows)]
833        {
834            pty_windows::terminate_tree(self)
835        }
836
837        #[cfg(unix)]
838        {
839            pty_platform::terminate_tree(self)
840        }
841    }
842
843    pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
844        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
845        #[cfg(windows)]
846        {
847            pty_windows::kill_tree(self)
848        }
849
850        #[cfg(unix)]
851        {
852            pty_platform::kill_tree(self)
853        }
854    }
855
856    /// Get the PID of the child process, if running.
857    pub fn pid(&self) -> Result<Option<u32>, PtyError> {
858        let guard = self.handles.lock().expect("pty handles mutex poisoned");
859        if let Some(handles) = guard.as_ref() {
860            #[cfg(unix)]
861            if let Some(pid) = handles.master.process_group_leader() {
862                if let Ok(pid) = u32::try_from(pid) {
863                    return Ok(Some(pid));
864                }
865            }
866            return Ok(handles.child.process_id());
867        }
868        Ok(None)
869    }
870
871    /// Wait for a chunk of output from the PTY reader.
872    /// Returns `Ok(Some(chunk))` on data, `Ok(None)` on timeout, `Err` on closed.
873    pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
874        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
875        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
876        loop {
877            if let Some(chunk) = guard.chunks.pop_front() {
878                return Ok(Some(chunk));
879            }
880            if guard.closed {
881                return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
882            }
883            match deadline {
884                Some(deadline) => {
885                    let now = Instant::now();
886                    if now >= deadline {
887                        return Ok(None); // timeout
888                    }
889                    let wait = deadline.saturating_duration_since(now);
890                    let result = self
891                        .reader
892                        .condvar
893                        .wait_timeout(guard, wait)
894                        .expect("pty read mutex poisoned");
895                    guard = result.0;
896                }
897                None => {
898                    guard = self
899                        .reader
900                        .condvar
901                        .wait(guard)
902                        .expect("pty read mutex poisoned");
903                }
904            }
905        }
906    }
907
908    /// Wait for the reader thread to close.
909    pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
910        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
911        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
912        loop {
913            if guard.closed {
914                return true;
915            }
916            match deadline {
917                Some(deadline) => {
918                    let now = Instant::now();
919                    if now >= deadline {
920                        return false;
921                    }
922                    let wait = deadline.saturating_duration_since(now);
923                    let result = self
924                        .reader
925                        .condvar
926                        .wait_timeout(guard, wait)
927                        .expect("pty read mutex poisoned");
928                    guard = result.0;
929                }
930                None => {
931                    guard = self
932                        .reader
933                        .condvar
934                        .wait(guard)
935                        .expect("pty read mutex poisoned");
936                }
937            }
938        }
939    }
940
941    /// Wait for exit then drain remaining output.
942    pub fn wait_and_drain_impl(
943        &self,
944        timeout: Option<f64>,
945        drain_timeout: f64,
946    ) -> Result<i32, PtyError> {
947        let code = self.wait_impl(timeout)?;
948        let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
949        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
950        while !guard.closed {
951            let remaining = deadline.saturating_duration_since(Instant::now());
952            if remaining.is_zero() {
953                break;
954            }
955            let result = self
956                .reader
957                .condvar
958                .wait_timeout(guard, remaining)
959                .expect("pty read mutex poisoned");
960            guard = result.0;
961        }
962        Ok(code)
963    }
964
    /// Store the echo flag (`Release` ordering).
    pub fn set_echo(&self, enabled: bool) {
        self.echo.store(enabled, Ordering::Release);
    }
968
    /// Return the current value of the echo flag (`Acquire` ordering).
    pub fn echo_enabled(&self) -> bool {
        self.echo.load(Ordering::Acquire)
    }
972
973    pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
974        let mut guard = self
975            .idle_detector
976            .lock()
977            .expect("idle detector mutex poisoned");
978        *guard = Some(Arc::clone(detector));
979    }
980
981    pub fn detach_idle_detector(&self) {
982        let mut guard = self
983            .idle_detector
984            .lock()
985            .expect("idle detector mutex poisoned");
986        *guard = None;
987    }
988
    /// Current value of the `input_bytes_total` counter.
    pub fn pty_input_bytes_total(&self) -> usize {
        self.input_bytes_total.load(Ordering::Acquire)
    }
992
    /// Current value of the `newline_events_total` counter.
    pub fn pty_newline_events_total(&self) -> usize {
        self.newline_events_total.load(Ordering::Acquire)
    }
996
    /// Current value of the `submit_events_total` counter.
    pub fn pty_submit_events_total(&self) -> usize {
        self.submit_events_total.load(Ordering::Acquire)
    }
1000
    /// Current value of the `output_bytes_total` counter.
    pub fn pty_output_bytes_total(&self) -> usize {
        self.output_bytes_total.load(Ordering::Acquire)
    }
1004
    /// Current value of the `control_churn_bytes_total` counter.
    pub fn pty_control_churn_bytes_total(&self) -> usize {
        self.control_churn_bytes_total.load(Ordering::Acquire)
    }
1008}
1009
/// Feature switches for an interactive PTY session.
///
/// The `Default` value enables everything a terminal-style session usually
/// needs: output echo, terminal input relay, and automatic PTY query replies.
#[derive(Debug, Clone, Copy)]
pub struct InteractivePtyOptions {
    /// Echo PTY output.
    pub echo_output: bool,
    /// Relay terminal input to the PTY.
    pub relay_terminal_input: bool,
    /// Reply automatically to PTY queries.
    pub respond_to_queries: bool,
}

impl Default for InteractivePtyOptions {
    /// All three features start enabled.
    fn default() -> Self {
        const ENABLED: bool = true;
        InteractivePtyOptions {
            echo_output: ENABLED,
            relay_terminal_input: ENABLED,
            respond_to_queries: ENABLED,
        }
    }
}
1030
/// Outcome of one `InteractivePtySession::pump_output` call.
#[derive(Debug, Default)]
pub struct InteractivePtyPumpResult {
    /// Output chunks collected during the call, in the order they were read.
    pub chunks: Vec<Vec<u8>>,
    /// `true` when the call stopped because the PTY stream reported closed.
    pub stream_closed: bool,
}
1036
/// Canonical interactive PTY recipe for downstream Rust consumers.
///
/// `NativePtyProcess` remains the low-level primitive. This wrapper owns the
/// interactive setup that callers commonly forget to assemble correctly.
pub struct InteractivePtySession {
    /// The wrapped low-level PTY process.
    process: NativePtyProcess,
    /// Feature switches chosen at construction time.
    options: InteractivePtyOptions,
}
1045
1046impl InteractivePtySession {
1047    pub fn new(process: NativePtyProcess) -> Self {
1048        Self::with_options(process, InteractivePtyOptions::default())
1049    }
1050
1051    pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
1052        Self { process, options }
1053    }
1054
1055    pub fn process(&self) -> &NativePtyProcess {
1056        &self.process
1057    }
1058
1059    pub fn start(&self) -> Result<(), PtyError> {
1060        self.process.set_echo(self.options.echo_output);
1061        self.process.start_impl()?;
1062        if self.options.relay_terminal_input {
1063            self.process.start_terminal_input_relay_impl()?;
1064        }
1065        Ok(())
1066    }
1067
1068    pub fn pump_output(
1069        &self,
1070        timeout: Option<f64>,
1071        consume_all: bool,
1072    ) -> Result<InteractivePtyPumpResult, PtyError> {
1073        let mut pumped = InteractivePtyPumpResult::default();
1074        let mut next_timeout = timeout;
1075        loop {
1076            match self.process.read_chunk_impl(next_timeout) {
1077                Ok(Some(chunk)) => {
1078                    if self.options.respond_to_queries {
1079                        self.process.respond_to_queries_impl(&chunk)?;
1080                    }
1081                    pumped.chunks.push(chunk);
1082                    if !consume_all {
1083                        break;
1084                    }
1085                    next_timeout = Some(0.0);
1086                }
1087                Ok(None) => break,
1088                Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
1089                    pumped.stream_closed = true;
1090                    break;
1091                }
1092                Err(err) => return Err(err),
1093            }
1094        }
1095        Ok(pumped)
1096    }
1097
1098    pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
1099        self.process.resize_impl(rows, cols)
1100    }
1101
1102    pub fn send_interrupt(&self) -> Result<(), PtyError> {
1103        self.process.send_interrupt_impl()
1104    }
1105
1106    pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
1107        self.process.wait_impl(timeout)
1108    }
1109
1110    pub fn wait_and_drain(
1111        &self,
1112        timeout: Option<f64>,
1113        drain_timeout: f64,
1114    ) -> Result<i32, PtyError> {
1115        self.process.wait_and_drain_impl(timeout, drain_timeout)
1116    }
1117
1118    pub fn terminate(&self) -> Result<(), PtyError> {
1119        self.process.terminate_impl()
1120    }
1121
1122    pub fn kill(&self) -> Result<(), PtyError> {
1123        self.process.kill_impl()
1124    }
1125
1126    pub fn close(&self) -> Result<(), PtyError> {
1127        self.process.close_impl()
1128    }
1129}
1130
/// Dropping a `NativePtyProcess` calls `close_nonblocking` on it.
impl Drop for NativePtyProcess {
    fn drop(&mut self) {
        self.close_nonblocking();
    }
}
1136
1137// ── Helper functions ──
1138
/// Count the bytes in `data` that belong to terminal control traffic.
///
/// Counted are:
/// - a lone ESC (`0x1B`) byte;
/// - a whole CSI sequence: ESC, `[`, and everything up to and including the
///   first byte in `0x40..=0x7E` (or to the end of `data` if the sequence is
///   unterminated);
/// - each backspace (`0x08`), carriage return (`0x0D`) and DEL (`0x7F`)
///   outside such a sequence.
pub fn control_churn_bytes(data: &[u8]) -> usize {
    let mut churn = 0usize;
    let mut rest = data;
    while let Some((&first, tail)) = rest.split_first() {
        rest = tail;
        match first {
            0x1B => {
                churn += 1;
                if let Some((&b'[', params)) = rest.split_first() {
                    // Body length includes the final byte when one exists.
                    let body_len = params
                        .iter()
                        .position(|byte| (0x40..=0x7E).contains(byte))
                        .map_or(params.len(), |final_index| final_index + 1);
                    churn += 1 + body_len;
                    rest = &params[body_len..];
                }
            }
            0x08 | 0x0D | 0x7F => churn += 1,
            _ => {}
        }
    }
    churn
}
1167
1168pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
1169    let mut command = CommandBuilder::new(&argv[0]);
1170    if argv.len() > 1 {
1171        command.args(
1172            argv[1..]
1173                .iter()
1174                .map(OsString::from)
1175                .collect::<Vec<OsString>>(),
1176        );
1177    }
1178    command
1179}
1180
1181#[inline(never)]
1182pub fn spawn_pty_reader(
1183    mut reader: Box<dyn Read + Send>,
1184    shared: Arc<PtyReadShared>,
1185    echo: Arc<AtomicBool>,
1186    idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
1187    output_bytes_total: Arc<AtomicUsize>,
1188    control_churn_bytes_total: Arc<AtomicUsize>,
1189) {
1190    crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
1191    let idle_detector_snapshot = idle_detector
1192        .lock()
1193        .expect("idle detector mutex poisoned")
1194        .clone();
1195    let mut chunk = vec![0_u8; 65536];
1196    loop {
1197        match reader.read(&mut chunk) {
1198            Ok(0) => break,
1199            Ok(n) => {
1200                let data = &chunk[..n];
1201
1202                let churn = control_churn_bytes(data);
1203                let visible = data.len().saturating_sub(churn);
1204                output_bytes_total.fetch_add(visible, Ordering::Relaxed);
1205                control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
1206
1207                if echo.load(Ordering::Relaxed) {
1208                    let _ = std::io::stdout().write_all(data);
1209                    let _ = std::io::stdout().flush();
1210                }
1211
1212                if let Some(ref detector) = idle_detector_snapshot {
1213                    detector.record_output(data);
1214                }
1215
1216                let mut guard = shared.state.lock().expect("pty read mutex poisoned");
1217                guard.chunks.push_back(data.to_vec());
1218                shared.condvar.notify_all();
1219            }
1220            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
1221            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
1222                thread::sleep(Duration::from_millis(10));
1223                continue;
1224            }
1225            Err(_) => break,
1226        }
1227    }
1228    let mut guard = shared.state.lock().expect("pty read mutex poisoned");
1229    guard.closed = true;
1230    shared.condvar.notify_all();
1231}
1232
1233pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
1234    if let Some(signal) = status.signal() {
1235        let signal = signal.to_ascii_lowercase();
1236        if signal.contains("interrupt") {
1237            return -2;
1238        }
1239        if signal.contains("terminated") {
1240            return -15;
1241        }
1242        if signal.contains("killed") {
1243            return -9;
1244        }
1245    }
1246    status.exit_code() as i32
1247}
1248
/// Return `true` when `data` contains a carriage return or a line feed.
pub fn input_contains_newline(data: &[u8]) -> bool {
    data.contains(&b'\r') || data.contains(&b'\n')
}
1252
/// Remembers a terminal's original `termios` settings so they can be
/// restored when the guard is dropped.
#[cfg(unix)]
struct PosixTerminalModeGuard {
    /// File descriptor whose terminal mode was changed.
    stdin_fd: i32,
    /// Settings captured before the mode was changed.
    original_mode: libc::termios,
}
1258
/// Restores the saved terminal settings on drop.
#[cfg(unix)]
impl Drop for PosixTerminalModeGuard {
    fn drop(&mut self) {
        // Best-effort restore: the return value of `tcsetattr` is ignored
        // because `drop` has no way to report a failure.
        unsafe {
            libc::tcsetattr(self.stdin_fd, libc::TCSANOW, &self.original_mode);
        }
    }
}
1267
1268#[cfg(unix)]
1269fn acquire_posix_terminal_mode_guard() -> Result<PosixTerminalModeGuard, std::io::Error> {
1270    let stdin_fd = libc::STDIN_FILENO;
1271    let mut original_mode = unsafe { std::mem::zeroed::<libc::termios>() };
1272    if unsafe { libc::tcgetattr(stdin_fd, &mut original_mode) } != 0 {
1273        return Err(std::io::Error::last_os_error());
1274    }
1275    let mut raw_mode = original_mode;
1276    unsafe {
1277        libc::cfmakeraw(&mut raw_mode);
1278    }
1279    if unsafe { libc::tcsetattr(stdin_fd, libc::TCSANOW, &raw_mode) } != 0 {
1280        return Err(std::io::Error::last_os_error());
1281    }
1282    Ok(PosixTerminalModeGuard {
1283        stdin_fd,
1284        original_mode,
1285    })
1286}
1287
1288#[cfg(unix)]
1289#[inline(never)]
1290fn posix_terminal_input_relay_worker(
1291    handles: Arc<Mutex<Option<NativePtyHandles>>>,
1292    returncode: Arc<Mutex<Option<i32>>>,
1293    input_bytes_total: Arc<AtomicUsize>,
1294    newline_events_total: Arc<AtomicUsize>,
1295    submit_events_total: Arc<AtomicUsize>,
1296    stop: Arc<AtomicBool>,
1297    active: Arc<AtomicBool>,
1298) {
1299    let _terminal_guard = match acquire_posix_terminal_mode_guard() {
1300        Ok(guard) => guard,
1301        Err(_) => {
1302            active.store(false, Ordering::Release);
1303            return;
1304        }
1305    };
1306
1307    let stdin_fd = libc::STDIN_FILENO;
1308    let mut buffer = vec![0_u8; 65536];
1309    loop {
1310        if stop.load(Ordering::Acquire) {
1311            break;
1312        }
1313        match poll_pty_process(&handles, &returncode) {
1314            Ok(Some(_)) => break,
1315            Ok(None) => {}
1316            Err(_) => break,
1317        }
1318
1319        let mut pollfd = libc::pollfd {
1320            fd: stdin_fd,
1321            events: libc::POLLIN,
1322            revents: 0,
1323        };
1324        let poll_result = unsafe { libc::poll(&mut pollfd, 1, 50) };
1325        if poll_result < 0 {
1326            let err = std::io::Error::last_os_error();
1327            if err.kind() == std::io::ErrorKind::Interrupted {
1328                continue;
1329            }
1330            break;
1331        }
1332        if poll_result == 0 || pollfd.revents & libc::POLLIN == 0 {
1333            continue;
1334        }
1335
1336        let read_result = unsafe { libc::read(stdin_fd, buffer.as_mut_ptr().cast(), buffer.len()) };
1337        if read_result < 0 {
1338            let err = std::io::Error::last_os_error();
1339            if err.kind() == std::io::ErrorKind::Interrupted {
1340                continue;
1341            }
1342            break;
1343        }
1344        if read_result == 0 {
1345            continue;
1346        }
1347
1348        let mut data = buffer[..read_result as usize].to_vec();
1349        loop {
1350            let mut drain_pollfd = libc::pollfd {
1351                fd: stdin_fd,
1352                events: libc::POLLIN,
1353                revents: 0,
1354            };
1355            let drain_ready = unsafe { libc::poll(&mut drain_pollfd, 1, 0) };
1356            if drain_ready <= 0 || drain_pollfd.revents & libc::POLLIN == 0 {
1357                break;
1358            }
1359            let drain_result =
1360                unsafe { libc::read(stdin_fd, buffer.as_mut_ptr().cast(), buffer.len()) };
1361            if drain_result <= 0 {
1362                break;
1363            }
1364            data.extend_from_slice(&buffer[..drain_result as usize]);
1365        }
1366
1367        record_pty_input_metrics(
1368            &input_bytes_total,
1369            &newline_events_total,
1370            &submit_events_total,
1371            &data,
1372            input_contains_newline(&data),
1373        );
1374        if write_pty_input(&handles, &data).is_err() {
1375            break;
1376        }
1377    }
1378
1379    active.store(false, Ordering::Release);
1380}
1381
1382pub fn record_pty_input_metrics(
1383    input_bytes_total: &Arc<AtomicUsize>,
1384    newline_events_total: &Arc<AtomicUsize>,
1385    submit_events_total: &Arc<AtomicUsize>,
1386    data: &[u8],
1387    submit: bool,
1388) {
1389    input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
1390    if input_contains_newline(data) {
1391        newline_events_total.fetch_add(1, Ordering::AcqRel);
1392    }
1393    if submit {
1394        submit_events_total.fetch_add(1, Ordering::AcqRel);
1395    }
1396}
1397
/// Record `code` as the process return code, overwriting any earlier value.
///
/// # Panics
///
/// Panics if the return-code mutex is poisoned.
pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
    let mut slot = returncode.lock().expect("pty returncode mutex poisoned");
    *slot = Some(code);
}
1401
1402pub fn poll_pty_process(
1403    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
1404    returncode: &Arc<Mutex<Option<i32>>>,
1405) -> Result<Option<i32>, std::io::Error> {
1406    let mut guard = handles.lock().expect("pty handles mutex poisoned");
1407    let Some(handles) = guard.as_mut() else {
1408        return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
1409    };
1410    let status = handles.child.try_wait()?;
1411    let code = status.map(portable_exit_code);
1412    if let Some(code) = code {
1413        store_pty_returncode(returncode, code);
1414        return Ok(Some(code));
1415    }
1416    Ok(None)
1417}
1418
1419pub fn write_pty_input(
1420    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
1421    data: &[u8],
1422) -> Result<(), std::io::Error> {
1423    let mut guard = handles.lock().expect("pty handles mutex poisoned");
1424    let handles = guard.as_mut().ok_or_else(|| {
1425        std::io::Error::new(
1426            std::io::ErrorKind::NotConnected,
1427            "Pseudo-terminal process is not running",
1428        )
1429    })?;
1430    #[cfg(windows)]
1431    let payload = pty_windows::input_payload(data);
1432    #[cfg(unix)]
1433    let payload = pty_platform::input_payload(data);
1434    handles.writer.write_all(&payload)?;
1435    handles.writer.flush()
1436}
1437
/// Translate line endings in terminal input.
///
/// A lone `\n` becomes `\r`; `\r\n` pairs and lone `\r` bytes pass through
/// unchanged, as does every other byte.
#[cfg(windows)]
pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
    let mut translated = Vec::with_capacity(data.len());
    let mut bytes = data.iter().copied().peekable();
    while let Some(byte) = bytes.next() {
        match byte {
            b'\r' => {
                translated.push(b'\r');
                // Keep an existing CRLF pair intact instead of turning its
                // LF into a second CR.
                if bytes.next_if_eq(&b'\n').is_some() {
                    translated.push(b'\n');
                }
            }
            b'\n' => translated.push(b'\r'),
            other => translated.push(other),
        }
    }
    translated
}
1464
/// Create a job object flagged `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` and
/// assign the child process identified by `handle` to it.
///
/// On success the job handle is returned wrapped in `WindowsJobHandle`
/// (stored as `usize`).
///
/// # Errors
///
/// - `PtyError::Other` when `handle` is `None`.
/// - `PtyError::Io` carrying the last OS error when creating the job,
///   configuring it, or assigning the process fails. The job handle is closed
///   before returning from the latter two failures.
#[cfg(windows)]
#[inline(never)]
pub fn assign_child_to_windows_kill_on_close_job(
    handle: Option<std::os::windows::io::RawHandle>,
) -> Result<WindowsJobHandle, PtyError> {
    crate::rp_rust_debug_scope!(
        "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
    );
    use std::mem::zeroed;

    use winapi::shared::minwindef::FALSE;
    use winapi::um::handleapi::INVALID_HANDLE_VALUE;
    use winapi::um::jobapi2::{
        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
    };
    use winapi::um::winnt::{
        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
    };

    let Some(handle) = handle else {
        return Err(PtyError::Other(
            "Pseudo-terminal child does not expose a Windows process handle".into(),
        ));
    };

    // Anonymous job object with default security attributes.
    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
    if job.is_null() || job == INVALID_HANDLE_VALUE {
        return Err(PtyError::Io(std::io::Error::last_os_error()));
    }

    // Start from an all-zero limit structure and set only the kill-on-close flag.
    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    let result = unsafe {
        SetInformationJobObject(
            job,
            JobObjectExtendedLimitInformation,
            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
        )
    };
    if result == FALSE {
        // Capture the error before CloseHandle, which may overwrite it.
        let err = std::io::Error::last_os_error();
        unsafe {
            winapi::um::handleapi::CloseHandle(job);
        }
        return Err(PtyError::Io(err));
    }

    let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
    if result == FALSE {
        // Capture the error before CloseHandle, which may overwrite it.
        let err = std::io::Error::last_os_error();
        unsafe {
            winapi::um::handleapi::CloseHandle(job);
        }
        return Err(PtyError::Io(err));
    }

    Ok(WindowsJobHandle(job as usize))
}
1525
/// Information about a child process found via Toolhelp snapshot.
#[cfg(windows)]
#[derive(Debug, Clone)]
pub struct ChildProcessInfo {
    /// Process id taken from the snapshot entry.
    pub pid: u32,
    /// Executable name taken from the snapshot entry, decoded lossily.
    pub name: String,
}
1533
/// List the direct children of `parent_pid` using a Toolhelp process snapshot.
///
/// Each result carries the child's PID and executable name. An empty vector
/// is returned when the snapshot cannot be created.
#[cfg(windows)]
pub fn find_child_processes(parent_pid: u32) -> Vec<ChildProcessInfo> {
    use winapi::um::handleapi::CloseHandle;
    use winapi::um::tlhelp32::{
        CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
    };

    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
        return Vec::new();
    }

    // The API requires `dwSize` to be set before the first call.
    let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
    entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;

    let mut found = Vec::new();
    let mut has_entry = unsafe { Process32First(snapshot, &mut entry) } != 0;
    while has_entry {
        if entry.th32ParentProcessID == parent_pid {
            // `szExeFile` is NUL-terminated; keep only the bytes before the NUL.
            let raw_name: Vec<u8> = entry
                .szExeFile
                .iter()
                .take_while(|&&unit| unit != 0)
                .map(|&unit| unit as u8)
                .collect();
            found.push(ChildProcessInfo {
                pid: entry.th32ProcessID,
                name: String::from_utf8_lossy(&raw_name).into_owned(),
            });
        }
        has_entry = unsafe { Process32Next(snapshot, &mut entry) } != 0;
    }

    unsafe { CloseHandle(snapshot) };
    found
}
1581
/// Collect the PIDs of every direct child of the current process whose
/// executable name is `conhost.exe` (compared ASCII case-insensitively).
#[cfg(windows)]
fn conhost_children_of_current_process() -> Vec<u32> {
    let mut pids = Vec::new();
    for child in find_child_processes(std::process::id()) {
        if child.name.eq_ignore_ascii_case("conhost.exe") {
            pids.push(child.pid);
        }
    }
    pids
}
1592
/// Assign conhost.exe children that appeared since `before_pids` was captured
/// to `job`, so they are cleaned up when the Job Object closes.
///
/// Every conhost.exe child of the current process whose PID is absent from
/// `before_pids` is treated as newly created by the ConPTY infrastructure.
/// Assignment failures are ignored.
#[cfg(windows)]
fn assign_conpty_conhost_to_job(job: &WindowsJobHandle, before_pids: &[u32]) {
    conhost_children_of_current_process()
        .into_iter()
        .filter(|pid| !before_pids.contains(pid))
        .for_each(|new_pid| {
            // Best-effort: a conhost that cannot be assigned is left alone.
            let _ = job.assign_pid(new_pid);
        });
}
1606
/// A conhost.exe process whose parent is no longer alive — likely an orphan
/// from a dead ConPTY session.
#[cfg(windows)]
#[derive(Debug, Clone)]
pub struct OrphanConhostInfo {
    /// PID of the orphaned conhost.exe.
    pub pid: u32,
    /// PID that was the parent when the snapshot was taken.
    pub parent_pid: u32,
    /// Name of the parent process, if it can be resolved (empty if parent is dead).
    /// NOTE(review): a parent absent from the snapshot cannot be resolved, so
    /// expect this to be empty for genuine orphans.
    pub parent_name: String,
}
1619
/// Scan all conhost.exe processes on the system and return those whose parent
/// process is no longer alive. These are likely orphans from dead ConPTY sessions.
///
/// Uses `CreateToolhelp32Snapshot` for a point-in-time snapshot — no sysinfo
/// dependency, so it's lightweight and can be called frequently.
///
/// A parent counts as "dead" when its PID does not appear in the same
/// snapshot. Such a parent has no snapshot entry to take a name from, so
/// `parent_name` is always empty in the results. An empty vector is returned
/// when the snapshot cannot be created.
#[cfg(windows)]
pub fn find_orphan_conhosts() -> Vec<OrphanConhostInfo> {
    use winapi::um::handleapi::CloseHandle;
    use winapi::um::tlhelp32::{
        CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
    };

    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
        return Vec::new();
    }

    // The API requires `dwSize` to be set before the first call.
    let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
    entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;

    // First pass: collect all PIDs and identify conhost.exe processes.
    let mut all_pids = std::collections::HashSet::new();
    let mut conhosts: Vec<(u32, u32)> = Vec::new(); // (pid, parent_pid)

    if unsafe { Process32First(snapshot, &mut entry) } != 0 {
        loop {
            // `szExeFile` is NUL-terminated; keep only the bytes before the NUL.
            let name_bytes: Vec<u8> = entry
                .szExeFile
                .iter()
                .take_while(|&&unit| unit != 0)
                .map(|&unit| unit as u8)
                .collect();
            let name = String::from_utf8_lossy(&name_bytes);

            all_pids.insert(entry.th32ProcessID);

            if name.eq_ignore_ascii_case("conhost.exe") {
                conhosts.push((entry.th32ProcessID, entry.th32ParentProcessID));
            }

            if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
                break;
            }
        }
    }

    unsafe { CloseHandle(snapshot) };

    // Second pass: filter to conhosts whose parent PID is not in the live set.
    // No per-process name table is kept: a name lookup keyed by a PID that is
    // absent from the snapshot can never succeed.
    conhosts
        .into_iter()
        .filter(|&(_, parent_pid)| !all_pids.contains(&parent_pid))
        .map(|(pid, parent_pid)| OrphanConhostInfo {
            pid,
            parent_pid,
            parent_name: String::new(),
        })
        .collect()
}
1686
/// Map a Unix-style `nice` value onto a Windows priority class and apply it
/// to the process behind `handle`.
///
/// Mapping: `>= 15` → idle, `1..=14` → below normal, `<= -15` → high,
/// `-14..=-1` → above normal. Nothing is changed when `handle` is `None`,
/// `nice` is `None`, or `nice` is `0`.
///
/// # Errors
///
/// Returns `PtyError::Io` with the last OS error when `SetPriorityClass`
/// fails.
#[cfg(windows)]
#[inline(never)]
pub fn apply_windows_pty_priority(
    handle: Option<std::os::windows::io::RawHandle>,
    nice: Option<i32>,
) -> Result<(), PtyError> {
    crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
    use winapi::um::processthreadsapi::SetPriorityClass;
    use winapi::um::winbase::{
        ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
        IDLE_PRIORITY_CLASS,
    };

    let Some(process) = handle else {
        return Ok(());
    };

    let priority_class = match nice {
        // No adjustment requested.
        None | Some(0) => return Ok(()),
        Some(level) if level >= 15 => IDLE_PRIORITY_CLASS,
        Some(level) if level > 0 => BELOW_NORMAL_PRIORITY_CLASS,
        Some(level) if level <= -15 => HIGH_PRIORITY_CLASS,
        // Remaining values are -14..=-1.
        Some(_) => ABOVE_NORMAL_PRIORITY_CLASS,
    };

    let applied = unsafe { SetPriorityClass(process.cast(), priority_class) };
    if applied == 0 {
        return Err(PtyError::Io(std::io::Error::last_os_error()));
    }
    Ok(())
}
1719
#[cfg(test)]
mod tests {
    use super::resolved_spawn_cwd;

    /// An explicitly supplied working directory comes back unchanged.
    #[test]
    fn resolved_spawn_cwd_preserves_explicit_value() {
        let explicit = "C:\\temp\\explicit";
        let resolved = resolved_spawn_cwd(Some(explicit));
        assert_eq!(resolved, Some(explicit.to_string()));
    }

    /// Without an explicit value the current directory is used, or `None`
    /// when the current directory cannot be determined.
    #[test]
    fn resolved_spawn_cwd_defaults_to_current_dir_when_unset() {
        let current = std::env::current_dir().ok();
        let expected = current.map(|dir| dir.to_string_lossy().to_string());
        assert_eq!(resolved_spawn_cwd(None), expected);
    }
}