Skip to main content

running_process_core/pty/
mod.rs

1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
12/// Re-exports for downstream crates that need portable-pty types.
13pub mod reexports {
14    pub use portable_pty;
15}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
27#[derive(Debug, Error)]
28pub enum PtyError {
29    #[error("pseudo-terminal process already started")]
30    AlreadyStarted,
31    #[error("pseudo-terminal process is not running")]
32    NotRunning,
33    #[error("pseudo-terminal timed out")]
34    Timeout,
35    #[error("pseudo-terminal I/O error: {0}")]
36    Io(#[from] std::io::Error),
37    #[error("pseudo-terminal spawn failed: {0}")]
38    Spawn(String),
39    #[error("pseudo-terminal error: {0}")]
40    Other(String),
41}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44    if matches!(
45        err.kind(),
46        std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47    ) {
48        return true;
49    }
50    #[cfg(unix)]
51    if err.raw_os_error() == Some(libc::ESRCH) {
52        return true;
53    }
54    false
55}
56
57pub struct PtyReadState {
58    pub chunks: VecDeque<Vec<u8>>,
59    pub closed: bool,
60}
61
62pub struct PtyReadShared {
63    pub state: Mutex<PtyReadState>,
64    pub condvar: Condvar,
65}
66
67pub struct NativePtyHandles {
68    pub master: Box<dyn MasterPty + Send>,
69    pub writer: Box<dyn Write + Send>,
70    pub child: Box<dyn portable_pty::Child + Send + Sync>,
71    #[cfg(windows)]
72    pub _job: WindowsJobHandle,
73}
74
75#[cfg(windows)]
76pub struct WindowsJobHandle(pub usize);
77
78#[cfg(windows)]
79impl WindowsJobHandle {
80    /// Assign an additional process (by PID) to this Job Object.
81    pub fn assign_pid(&self, pid: u32) -> Result<(), std::io::Error> {
82        use winapi::um::handleapi::CloseHandle;
83        use winapi::um::processthreadsapi::OpenProcess;
84        use winapi::um::winnt::PROCESS_SET_QUOTA;
85        use winapi::um::winnt::PROCESS_TERMINATE;
86
87        let handle = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
88        if handle.is_null() {
89            return Err(std::io::Error::last_os_error());
90        }
91        let result = unsafe {
92            winapi::um::jobapi2::AssignProcessToJobObject(
93                self.0 as winapi::shared::ntdef::HANDLE,
94                handle,
95            )
96        };
97        unsafe { CloseHandle(handle) };
98        if result == 0 {
99            return Err(std::io::Error::last_os_error());
100        }
101        Ok(())
102    }
103}
104
105#[cfg(windows)]
106impl Drop for WindowsJobHandle {
107    fn drop(&mut self) {
108        unsafe {
109            winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
110        }
111    }
112}
113
114pub struct IdleMonitorState {
115    pub last_reset_at: Instant,
116    pub returncode: Option<i32>,
117    pub interrupted: bool,
118}
119
120/// Core idle detection logic, shareable across threads via Arc.
121/// The reader thread calls `record_output` directly.
122pub struct IdleDetectorCore {
123    pub timeout_seconds: f64,
124    pub stability_window_seconds: f64,
125    pub sample_interval_seconds: f64,
126    pub reset_on_input: bool,
127    pub reset_on_output: bool,
128    pub count_control_churn_as_output: bool,
129    pub enabled: Arc<AtomicBool>,
130    pub state: Mutex<IdleMonitorState>,
131    pub condvar: Condvar,
132}
133
134impl IdleDetectorCore {
135    pub fn record_input(&self, byte_count: usize) {
136        if !self.reset_on_input || byte_count == 0 {
137            return;
138        }
139        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
140        guard.last_reset_at = Instant::now();
141        self.condvar.notify_all();
142    }
143
144    pub fn record_output(&self, data: &[u8]) {
145        if !self.reset_on_output || data.is_empty() {
146            return;
147        }
148        let control_bytes = control_churn_bytes(data);
149        let visible_output_bytes = data.len().saturating_sub(control_bytes);
150        let active_output =
151            visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
152        if !active_output {
153            return;
154        }
155        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
156        guard.last_reset_at = Instant::now();
157        self.condvar.notify_all();
158    }
159
160    pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
161        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
162        guard.returncode = Some(returncode);
163        guard.interrupted = interrupted;
164        self.condvar.notify_all();
165    }
166
167    pub fn enabled(&self) -> bool {
168        self.enabled.load(Ordering::Acquire)
169    }
170
171    pub fn set_enabled(&self, enabled: bool) {
172        let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
173        if enabled && !was_enabled {
174            let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
175            guard.last_reset_at = Instant::now();
176        }
177        self.condvar.notify_all();
178    }
179
180    pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
181        let started = Instant::now();
182        let overall_timeout = timeout.map(Duration::from_secs_f64);
183        let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
184        let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
185
186        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
187        loop {
188            let now = Instant::now();
189            let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
190
191            if let Some(returncode) = guard.returncode {
192                let reason = if guard.interrupted {
193                    "interrupt"
194                } else {
195                    "process_exit"
196                };
197                return (false, reason.to_string(), idle_for, Some(returncode));
198            }
199
200            let enabled = self.enabled.load(Ordering::Acquire);
201            if enabled && idle_for >= min_idle {
202                return (true, "idle_timeout".to_string(), idle_for, None);
203            }
204
205            if let Some(limit) = overall_timeout {
206                if now.duration_since(started) >= limit {
207                    return (false, "timeout".to_string(), idle_for, None);
208                }
209            }
210
211            let idle_remaining = if enabled {
212                (min_idle - idle_for).max(0.0)
213            } else {
214                sample_interval.as_secs_f64()
215            };
216            let mut wait_for =
217                sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
218            if let Some(limit) = overall_timeout {
219                let elapsed = now.duration_since(started);
220                if elapsed < limit {
221                    let remaining = limit - elapsed;
222                    wait_for = wait_for.min(remaining);
223                }
224            }
225            let result = self
226                .condvar
227                .wait_timeout(guard, wait_for)
228                .expect("idle monitor mutex poisoned");
229            guard = result.0;
230        }
231    }
232}
233
234pub struct NativePtyProcess {
235    pub argv: Vec<String>,
236    pub cwd: Option<String>,
237    pub env: Option<Vec<(String, String)>>,
238    pub rows: u16,
239    pub cols: u16,
240    #[cfg(windows)]
241    pub nice: Option<i32>,
242    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
243    pub reader: Arc<PtyReadShared>,
244    pub returncode: Arc<Mutex<Option<i32>>>,
245    pub input_bytes_total: Arc<AtomicUsize>,
246    pub newline_events_total: Arc<AtomicUsize>,
247    pub submit_events_total: Arc<AtomicUsize>,
248    /// When true, the reader thread writes PTY output to stdout.
249    pub echo: Arc<AtomicBool>,
250    /// When set, the reader thread feeds output directly to the idle detector.
251    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
252    /// Visible (non-control) output bytes seen by the reader thread.
253    pub output_bytes_total: Arc<AtomicUsize>,
254    /// Control churn bytes (ANSI escapes, BS, CR, DEL) seen by the reader.
255    pub control_churn_bytes_total: Arc<AtomicUsize>,
256    #[cfg(windows)]
257    pub terminal_input_relay_stop: Arc<AtomicBool>,
258    #[cfg(windows)]
259    pub terminal_input_relay_active: Arc<AtomicBool>,
260    #[cfg(windows)]
261    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
262}
263
264impl NativePtyProcess {
265    pub fn new(
266        argv: Vec<String>,
267        cwd: Option<String>,
268        env: Option<Vec<(String, String)>>,
269        rows: u16,
270        cols: u16,
271        nice: Option<i32>,
272    ) -> Result<Self, PtyError> {
273        if argv.is_empty() {
274            return Err(PtyError::Other("command cannot be empty".into()));
275        }
276        #[cfg(not(windows))]
277        let _ = nice;
278        Ok(Self {
279            argv,
280            cwd,
281            env,
282            rows,
283            cols,
284            #[cfg(windows)]
285            nice,
286            handles: Arc::new(Mutex::new(None)),
287            reader: Arc::new(PtyReadShared {
288                state: Mutex::new(PtyReadState {
289                    chunks: VecDeque::new(),
290                    closed: false,
291                }),
292                condvar: Condvar::new(),
293            }),
294            returncode: Arc::new(Mutex::new(None)),
295            input_bytes_total: Arc::new(AtomicUsize::new(0)),
296            newline_events_total: Arc::new(AtomicUsize::new(0)),
297            submit_events_total: Arc::new(AtomicUsize::new(0)),
298            echo: Arc::new(AtomicBool::new(false)),
299            idle_detector: Arc::new(Mutex::new(None)),
300            output_bytes_total: Arc::new(AtomicUsize::new(0)),
301            control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
302            #[cfg(windows)]
303            terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
304            #[cfg(windows)]
305            terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
306            #[cfg(windows)]
307            terminal_input_relay_worker: Mutex::new(None),
308        })
309    }
310
311    pub fn mark_reader_closed(&self) {
312        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
313        guard.closed = true;
314        self.reader.condvar.notify_all();
315    }
316
317    pub fn store_returncode(&self, code: i32) {
318        store_pty_returncode(&self.returncode, code);
319    }
320
321    pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
322        record_pty_input_metrics(
323            &self.input_bytes_total,
324            &self.newline_events_total,
325            &self.submit_events_total,
326            data,
327            submit,
328        );
329    }
330
331    pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
332        self.record_input_metrics(data, submit);
333        write_pty_input(&self.handles, data)?;
334        Ok(())
335    }
336
337    #[cfg(windows)]
338    pub fn request_terminal_input_relay_stop(&self) {
339        self.terminal_input_relay_stop
340            .store(true, Ordering::Release);
341        self.terminal_input_relay_active
342            .store(false, Ordering::Release);
343    }
344
345    #[cfg(windows)]
346    pub fn stop_terminal_input_relay_impl(&self) {
347        self.request_terminal_input_relay_stop();
348        if let Some(worker) = self
349            .terminal_input_relay_worker
350            .lock()
351            .expect("pty terminal input relay mutex poisoned")
352            .take()
353        {
354            let _ = worker.join();
355        }
356    }
357
358    #[cfg(not(windows))]
359    pub fn stop_terminal_input_relay_impl(&self) {}
360
361    /// Synchronously tear down the PTY and reap the child.
362    #[inline(never)]
363    pub fn close_impl(&self) -> Result<(), PtyError> {
364        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
365        self.stop_terminal_input_relay_impl();
366        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
367        let Some(handles) = guard.take() else {
368            self.mark_reader_closed();
369            return Ok(());
370        };
371        drop(guard);
372
373        #[cfg(windows)]
374        let NativePtyHandles {
375            master,
376            writer,
377            mut child,
378            _job,
379        } = handles;
380        #[cfg(not(windows))]
381        let NativePtyHandles {
382            master,
383            writer,
384            mut child,
385        } = handles;
386
387        if let Err(err) = child.kill() {
388            if !is_ignorable_process_control_error(&err) {
389                return Err(PtyError::Io(err));
390            }
391        }
392
393        drop(writer);
394        drop(master);
395
396        let code = match child.wait() {
397            Ok(status) => portable_exit_code(status),
398            Err(_) => -9,
399        };
400        drop(child);
401        #[cfg(windows)]
402        drop(_job);
403
404        self.store_returncode(code);
405        self.mark_reader_closed();
406        Ok(())
407    }
408
409    /// Best-effort, non-blocking teardown for use from `Drop`.
410    #[inline(never)]
411    pub fn close_nonblocking(&self) {
412        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
413        #[cfg(windows)]
414        self.request_terminal_input_relay_stop();
415        let Ok(mut guard) = self.handles.lock() else {
416            return;
417        };
418        let Some(handles) = guard.take() else {
419            self.mark_reader_closed();
420            return;
421        };
422        drop(guard);
423
424        #[cfg(windows)]
425        let NativePtyHandles {
426            master,
427            writer,
428            mut child,
429            _job,
430        } = handles;
431        #[cfg(not(windows))]
432        let NativePtyHandles {
433            master,
434            writer,
435            mut child,
436        } = handles;
437
438        if let Err(err) = child.kill() {
439            if !is_ignorable_process_control_error(&err) {
440                return;
441            }
442        }
443        drop(writer);
444        drop(master);
445        drop(child);
446        #[cfg(windows)]
447        drop(_job);
448        self.mark_reader_closed();
449    }
450
451    pub fn start_impl(&self) -> Result<(), PtyError> {
452        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
453        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
454        if guard.is_some() {
455            return Err(PtyError::AlreadyStarted);
456        }
457
458        // Snapshot our conhost.exe children before openpty() so we can diff
459        // after spawn to find the new conhost.exe created by ConPTY.
460        #[cfg(windows)]
461        let conhost_pids_before = conhost_children_of_current_process();
462
463        let pty_system = native_pty_system();
464        let pair = pty_system
465            .openpty(PtySize {
466                rows: self.rows,
467                cols: self.cols,
468                pixel_width: 0,
469                pixel_height: 0,
470            })
471            .map_err(|e| PtyError::Spawn(e.to_string()))?;
472
473        let mut cmd = command_builder_from_argv(&self.argv);
474        if let Some(cwd) = &self.cwd {
475            cmd.cwd(cwd);
476        }
477        if let Some(env) = &self.env {
478            cmd.env_clear();
479            for (key, value) in env {
480                cmd.env(key, value);
481            }
482        }
483
484        let reader = pair.master.try_clone_reader().map_err(|e| PtyError::Spawn(e.to_string()))?;
485        let writer = pair.master.take_writer().map_err(|e| PtyError::Spawn(e.to_string()))?;
486        let child = pair.slave.spawn_command(cmd).map_err(|e| PtyError::Spawn(e.to_string()))?;
487        #[cfg(windows)]
488        let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
489        #[cfg(windows)]
490        assign_conpty_conhost_to_job(&job, &conhost_pids_before);
491        #[cfg(windows)]
492        apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
493        let shared = Arc::clone(&self.reader);
494        let echo = Arc::clone(&self.echo);
495        let idle_detector = Arc::clone(&self.idle_detector);
496        let output_bytes = Arc::clone(&self.output_bytes_total);
497        let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
498        thread::spawn(move || {
499            spawn_pty_reader(
500                reader,
501                shared,
502                echo,
503                idle_detector,
504                output_bytes,
505                churn_bytes,
506            );
507        });
508
509        *guard = Some(NativePtyHandles {
510            master: pair.master,
511            writer,
512            child,
513            #[cfg(windows)]
514            _job: job,
515        });
516        Ok(())
517    }
518
519    pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
520        #[cfg(windows)]
521        {
522            pty_windows::respond_to_queries(self, data)
523        }
524
525        #[cfg(unix)]
526        {
527            pty_platform::respond_to_queries(self, data)
528        }
529    }
530
531    pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
532        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
533        let guard = self.handles.lock().expect("pty handles mutex poisoned");
534        if let Some(handles) = guard.as_ref() {
535            handles
536                .master
537                .resize(PtySize {
538                    rows,
539                    cols,
540                    pixel_width: 0,
541                    pixel_height: 0,
542                })
543                .map_err(|e| PtyError::Other(e.to_string()))?;
544        }
545        Ok(())
546    }
547
548    pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
549        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
550        #[cfg(windows)]
551        {
552            pty_windows::send_interrupt(self)
553        }
554
555        #[cfg(unix)]
556        {
557            pty_platform::send_interrupt(self)
558        }
559    }
560
561    pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
562        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
563        // Fast path: already exited.
564        if let Some(code) = *self.returncode.lock().expect("pty returncode mutex poisoned") {
565            return Ok(code);
566        }
567        let start = Instant::now();
568        loop {
569            if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
570                return Ok(code);
571            }
572            if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
573                return Err(PtyError::Timeout);
574            }
575            thread::sleep(Duration::from_millis(10));
576        }
577    }
578
579    pub fn terminate_impl(&self) -> Result<(), PtyError> {
580        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
581        #[cfg(windows)]
582        {
583            pty_windows::terminate(self)
584        }
585
586        #[cfg(unix)]
587        {
588            pty_platform::terminate(self)
589        }
590    }
591
592    pub fn kill_impl(&self) -> Result<(), PtyError> {
593        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
594        #[cfg(windows)]
595        {
596            pty_windows::kill(self)
597        }
598
599        #[cfg(unix)]
600        {
601            pty_platform::kill(self)
602        }
603    }
604
605    pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
606        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
607        #[cfg(windows)]
608        {
609            pty_windows::terminate_tree(self)
610        }
611
612        #[cfg(unix)]
613        {
614            pty_platform::terminate_tree(self)
615        }
616    }
617
618    pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
619        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
620        #[cfg(windows)]
621        {
622            pty_windows::kill_tree(self)
623        }
624
625        #[cfg(unix)]
626        {
627            pty_platform::kill_tree(self)
628        }
629    }
630
631    /// Get the PID of the child process, if running.
632    pub fn pid(&self) -> Result<Option<u32>, PtyError> {
633        let guard = self.handles.lock().expect("pty handles mutex poisoned");
634        if let Some(handles) = guard.as_ref() {
635            #[cfg(unix)]
636            if let Some(pid) = handles.master.process_group_leader() {
637                if let Ok(pid) = u32::try_from(pid) {
638                    return Ok(Some(pid));
639                }
640            }
641            return Ok(handles.child.process_id());
642        }
643        Ok(None)
644    }
645
646    /// Wait for a chunk of output from the PTY reader.
647    /// Returns `Ok(Some(chunk))` on data, `Ok(None)` on timeout, `Err` on closed.
648    pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
649        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
650        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
651        loop {
652            if let Some(chunk) = guard.chunks.pop_front() {
653                return Ok(Some(chunk));
654            }
655            if guard.closed {
656                return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
657            }
658            match deadline {
659                Some(deadline) => {
660                    let now = Instant::now();
661                    if now >= deadline {
662                        return Ok(None); // timeout
663                    }
664                    let wait = deadline.saturating_duration_since(now);
665                    let result = self
666                        .reader
667                        .condvar
668                        .wait_timeout(guard, wait)
669                        .expect("pty read mutex poisoned");
670                    guard = result.0;
671                }
672                None => {
673                    guard = self
674                        .reader
675                        .condvar
676                        .wait(guard)
677                        .expect("pty read mutex poisoned");
678                }
679            }
680        }
681    }
682
683    /// Wait for the reader thread to close.
684    pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
685        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
686        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
687        loop {
688            if guard.closed {
689                return true;
690            }
691            match deadline {
692                Some(deadline) => {
693                    let now = Instant::now();
694                    if now >= deadline {
695                        return false;
696                    }
697                    let wait = deadline.saturating_duration_since(now);
698                    let result = self
699                        .reader
700                        .condvar
701                        .wait_timeout(guard, wait)
702                        .expect("pty read mutex poisoned");
703                    guard = result.0;
704                }
705                None => {
706                    guard = self
707                        .reader
708                        .condvar
709                        .wait(guard)
710                        .expect("pty read mutex poisoned");
711                }
712            }
713        }
714    }
715
716    /// Wait for exit then drain remaining output.
717    pub fn wait_and_drain_impl(
718        &self,
719        timeout: Option<f64>,
720        drain_timeout: f64,
721    ) -> Result<i32, PtyError> {
722        let code = self.wait_impl(timeout)?;
723        let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
724        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
725        while !guard.closed {
726            let remaining = deadline.saturating_duration_since(Instant::now());
727            if remaining.is_zero() {
728                break;
729            }
730            let result = self
731                .reader
732                .condvar
733                .wait_timeout(guard, remaining)
734                .expect("pty read mutex poisoned");
735            guard = result.0;
736        }
737        Ok(code)
738    }
739
740    pub fn set_echo(&self, enabled: bool) {
741        self.echo.store(enabled, Ordering::Release);
742    }
743
744    pub fn echo_enabled(&self) -> bool {
745        self.echo.load(Ordering::Acquire)
746    }
747
748    pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
749        let mut guard = self
750            .idle_detector
751            .lock()
752            .expect("idle detector mutex poisoned");
753        *guard = Some(Arc::clone(detector));
754    }
755
756    pub fn detach_idle_detector(&self) {
757        let mut guard = self
758            .idle_detector
759            .lock()
760            .expect("idle detector mutex poisoned");
761        *guard = None;
762    }
763
764    pub fn pty_input_bytes_total(&self) -> usize {
765        self.input_bytes_total.load(Ordering::Acquire)
766    }
767
768    pub fn pty_newline_events_total(&self) -> usize {
769        self.newline_events_total.load(Ordering::Acquire)
770    }
771
772    pub fn pty_submit_events_total(&self) -> usize {
773        self.submit_events_total.load(Ordering::Acquire)
774    }
775
776    pub fn pty_output_bytes_total(&self) -> usize {
777        self.output_bytes_total.load(Ordering::Acquire)
778    }
779
780    pub fn pty_control_churn_bytes_total(&self) -> usize {
781        self.control_churn_bytes_total.load(Ordering::Acquire)
782    }
783}
784
785impl Drop for NativePtyProcess {
786    fn drop(&mut self) {
787        self.close_nonblocking();
788    }
789}
790
791// ── Helper functions ──
792
793pub fn control_churn_bytes(data: &[u8]) -> usize {
794    let mut total = 0;
795    let mut index = 0;
796    while index < data.len() {
797        let byte = data[index];
798        if byte == 0x1B {
799            let start = index;
800            index += 1;
801            if index < data.len() && data[index] == b'[' {
802                index += 1;
803                while index < data.len() {
804                    let current = data[index];
805                    index += 1;
806                    if (0x40..=0x7E).contains(&current) {
807                        break;
808                    }
809                }
810            }
811            total += index - start;
812            continue;
813        }
814        if matches!(byte, 0x08 | 0x0D | 0x7F) {
815            total += 1;
816        }
817        index += 1;
818    }
819    total
820}
821
822pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
823    let mut command = CommandBuilder::new(&argv[0]);
824    if argv.len() > 1 {
825        command.args(
826            argv[1..]
827                .iter()
828                .map(OsString::from)
829                .collect::<Vec<OsString>>(),
830        );
831    }
832    command
833}
834
835#[inline(never)]
836pub fn spawn_pty_reader(
837    mut reader: Box<dyn Read + Send>,
838    shared: Arc<PtyReadShared>,
839    echo: Arc<AtomicBool>,
840    idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
841    output_bytes_total: Arc<AtomicUsize>,
842    control_churn_bytes_total: Arc<AtomicUsize>,
843) {
844    crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
845    let idle_detector_snapshot = idle_detector
846        .lock()
847        .expect("idle detector mutex poisoned")
848        .clone();
849    let mut chunk = vec![0_u8; 65536];
850    loop {
851        match reader.read(&mut chunk) {
852            Ok(0) => break,
853            Ok(n) => {
854                let data = &chunk[..n];
855
856                let churn = control_churn_bytes(data);
857                let visible = data.len().saturating_sub(churn);
858                output_bytes_total.fetch_add(visible, Ordering::Relaxed);
859                control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
860
861                if echo.load(Ordering::Relaxed) {
862                    let _ = std::io::stdout().write_all(data);
863                    let _ = std::io::stdout().flush();
864                }
865
866                if let Some(ref detector) = idle_detector_snapshot {
867                    detector.record_output(data);
868                }
869
870                let mut guard = shared.state.lock().expect("pty read mutex poisoned");
871                guard.chunks.push_back(data.to_vec());
872                shared.condvar.notify_all();
873            }
874            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
875            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
876                thread::sleep(Duration::from_millis(10));
877                continue;
878            }
879            Err(_) => break,
880        }
881    }
882    let mut guard = shared.state.lock().expect("pty read mutex poisoned");
883    guard.closed = true;
884    shared.condvar.notify_all();
885}
886
887pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
888    if let Some(signal) = status.signal() {
889        let signal = signal.to_ascii_lowercase();
890        if signal.contains("interrupt") {
891            return -2;
892        }
893        if signal.contains("terminated") {
894            return -15;
895        }
896        if signal.contains("killed") {
897            return -9;
898        }
899    }
900    status.exit_code() as i32
901}
902
903pub fn input_contains_newline(data: &[u8]) -> bool {
904    data.iter().any(|byte| matches!(*byte, b'\r' | b'\n'))
905}
906
907pub fn record_pty_input_metrics(
908    input_bytes_total: &Arc<AtomicUsize>,
909    newline_events_total: &Arc<AtomicUsize>,
910    submit_events_total: &Arc<AtomicUsize>,
911    data: &[u8],
912    submit: bool,
913) {
914    input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
915    if input_contains_newline(data) {
916        newline_events_total.fetch_add(1, Ordering::AcqRel);
917    }
918    if submit {
919        submit_events_total.fetch_add(1, Ordering::AcqRel);
920    }
921}
922
923pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
924    *returncode.lock().expect("pty returncode mutex poisoned") = Some(code);
925}
926
927pub fn poll_pty_process(
928    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
929    returncode: &Arc<Mutex<Option<i32>>>,
930) -> Result<Option<i32>, std::io::Error> {
931    let mut guard = handles.lock().expect("pty handles mutex poisoned");
932    let Some(handles) = guard.as_mut() else {
933        return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
934    };
935    let status = handles.child.try_wait()?;
936    let code = status.map(portable_exit_code);
937    if let Some(code) = code {
938        store_pty_returncode(returncode, code);
939        return Ok(Some(code));
940    }
941    Ok(None)
942}
943
944pub fn write_pty_input(
945    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
946    data: &[u8],
947) -> Result<(), std::io::Error> {
948    let mut guard = handles.lock().expect("pty handles mutex poisoned");
949    let handles = guard.as_mut().ok_or_else(|| {
950        std::io::Error::new(
951            std::io::ErrorKind::NotConnected,
952            "Pseudo-terminal process is not running",
953        )
954    })?;
955    #[cfg(windows)]
956    let payload = pty_windows::input_payload(data);
957    #[cfg(unix)]
958    let payload = pty_platform::input_payload(data);
959    handles.writer.write_all(&payload)?;
960    handles.writer.flush()
961}
962
963#[cfg(windows)]
964pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
965    let mut translated = Vec::with_capacity(data.len());
966    let mut index = 0usize;
967    while index < data.len() {
968        let current = data[index];
969        if current == b'\r' {
970            translated.push(current);
971            if index + 1 < data.len() && data[index + 1] == b'\n' {
972                translated.push(b'\n');
973                index += 2;
974                continue;
975            }
976            index += 1;
977            continue;
978        }
979        if current == b'\n' {
980            translated.push(b'\r');
981            index += 1;
982            continue;
983        }
984        translated.push(current);
985        index += 1;
986    }
987    translated
988}
989
990#[cfg(windows)]
991#[inline(never)]
992pub fn assign_child_to_windows_kill_on_close_job(
993    handle: Option<std::os::windows::io::RawHandle>,
994) -> Result<WindowsJobHandle, PtyError> {
995    crate::rp_rust_debug_scope!(
996        "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
997    );
998    use std::mem::zeroed;
999
1000    use winapi::shared::minwindef::FALSE;
1001    use winapi::um::handleapi::INVALID_HANDLE_VALUE;
1002    use winapi::um::jobapi2::{
1003        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
1004    };
1005    use winapi::um::winnt::{
1006        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
1007        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
1008    };
1009
1010    let Some(handle) = handle else {
1011        return Err(PtyError::Other(
1012            "Pseudo-terminal child does not expose a Windows process handle".into(),
1013        ));
1014    };
1015
1016    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
1017    if job.is_null() || job == INVALID_HANDLE_VALUE {
1018        return Err(PtyError::Io(std::io::Error::last_os_error()));
1019    }
1020
1021    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
1022    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
1023    let result = unsafe {
1024        SetInformationJobObject(
1025            job,
1026            JobObjectExtendedLimitInformation,
1027            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
1028            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
1029        )
1030    };
1031    if result == FALSE {
1032        let err = std::io::Error::last_os_error();
1033        unsafe {
1034            winapi::um::handleapi::CloseHandle(job);
1035        }
1036        return Err(PtyError::Io(err));
1037    }
1038
1039    let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
1040    if result == FALSE {
1041        let err = std::io::Error::last_os_error();
1042        unsafe {
1043            winapi::um::handleapi::CloseHandle(job);
1044        }
1045        return Err(PtyError::Io(err));
1046    }
1047
1048    Ok(WindowsJobHandle(job as usize))
1049}
1050
1051/// Information about a child process found via Toolhelp snapshot.
1052#[cfg(windows)]
1053#[derive(Debug, Clone)]
1054pub struct ChildProcessInfo {
1055    pub pid: u32,
1056    pub name: String,
1057}
1058
1059/// Find all direct child processes of a given parent PID using the Windows Toolhelp API.
1060/// Returns PID and process name for each child.
1061#[cfg(windows)]
1062pub fn find_child_processes(parent_pid: u32) -> Vec<ChildProcessInfo> {
1063    use winapi::um::handleapi::CloseHandle;
1064    use winapi::um::tlhelp32::{
1065        CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1066    };
1067
1068    let mut children = Vec::new();
1069    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1070    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1071        return children;
1072    }
1073
1074    let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1075    entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1076
1077    if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1078        loop {
1079            if entry.th32ParentProcessID == parent_pid {
1080                let name_bytes = &entry.szExeFile;
1081                let name_len = name_bytes.iter().position(|&b| b == 0).unwrap_or(name_bytes.len());
1082                let name = String::from_utf8_lossy(
1083                    &name_bytes[..name_len].iter().map(|&c| c as u8).collect::<Vec<u8>>(),
1084                )
1085                .into_owned();
1086                children.push(ChildProcessInfo {
1087                    pid: entry.th32ProcessID,
1088                    name,
1089                });
1090            }
1091            if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1092                break;
1093            }
1094        }
1095    }
1096
1097    unsafe { CloseHandle(snapshot) };
1098    children
1099}
1100
1101/// Return PIDs of all conhost.exe processes that are children of the current process.
1102#[cfg(windows)]
1103fn conhost_children_of_current_process() -> Vec<u32> {
1104    let our_pid = std::process::id();
1105    find_child_processes(our_pid)
1106        .into_iter()
1107        .filter(|c| c.name.eq_ignore_ascii_case("conhost.exe"))
1108        .map(|c| c.pid)
1109        .collect()
1110}
1111
1112/// After spawning a ConPTY child, find the new conhost.exe process that was created
1113/// by the ConPTY infrastructure (child of our process, not present in the "before"
1114/// snapshot) and assign it to the Job Object so it gets cleaned up on Job close.
1115#[cfg(windows)]
1116fn assign_conpty_conhost_to_job(job: &WindowsJobHandle, before_pids: &[u32]) {
1117    let after_pids = conhost_children_of_current_process();
1118    for pid in after_pids {
1119        if !before_pids.contains(&pid) {
1120            // This is a newly created conhost.exe — assign it to the Job.
1121            let _ = job.assign_pid(pid);
1122        }
1123    }
1124}
1125
1126/// A conhost.exe process whose parent is no longer alive — likely an orphan
1127/// from a dead ConPTY session.
1128#[cfg(windows)]
1129#[derive(Debug, Clone)]
1130pub struct OrphanConhostInfo {
1131    /// PID of the orphaned conhost.exe.
1132    pub pid: u32,
1133    /// PID that was the parent when the snapshot was taken.
1134    pub parent_pid: u32,
1135    /// Name of the parent process, if it can be resolved (empty if parent is dead).
1136    pub parent_name: String,
1137}
1138
1139/// Scan all conhost.exe processes on the system and return those whose parent
1140/// process is no longer alive. These are likely orphans from dead ConPTY sessions.
1141///
1142/// Uses `CreateToolhelp32Snapshot` for a point-in-time snapshot — no sysinfo
1143/// dependency, so it's lightweight and can be called frequently.
1144#[cfg(windows)]
1145pub fn find_orphan_conhosts() -> Vec<OrphanConhostInfo> {
1146    use winapi::um::handleapi::CloseHandle;
1147    use winapi::um::tlhelp32::{
1148        CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1149    };
1150
1151    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1152    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1153        return Vec::new();
1154    }
1155
1156    let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1157    entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1158
1159    // First pass: collect all PIDs and identify conhost.exe processes.
1160    let mut all_pids = std::collections::HashSet::new();
1161    let mut conhosts: Vec<(u32, u32)> = Vec::new(); // (pid, parent_pid)
1162    let mut parent_names: std::collections::HashMap<u32, String> = std::collections::HashMap::new();
1163
1164    if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1165        loop {
1166            let name_bytes = &entry.szExeFile;
1167            let name_len = name_bytes.iter().position(|&b| b == 0).unwrap_or(name_bytes.len());
1168            let name = String::from_utf8_lossy(
1169                &name_bytes[..name_len].iter().map(|&c| c as u8).collect::<Vec<u8>>(),
1170            )
1171            .into_owned();
1172
1173            all_pids.insert(entry.th32ProcessID);
1174            parent_names.insert(entry.th32ProcessID, name.clone());
1175
1176            if name.eq_ignore_ascii_case("conhost.exe") {
1177                conhosts.push((entry.th32ProcessID, entry.th32ParentProcessID));
1178            }
1179
1180            if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1181                break;
1182            }
1183        }
1184    }
1185
1186    unsafe { CloseHandle(snapshot) };
1187
1188    // Second pass: filter to conhosts whose parent PID is not in the live set.
1189    conhosts
1190        .into_iter()
1191        .filter(|&(_, parent_pid)| !all_pids.contains(&parent_pid))
1192        .map(|(pid, parent_pid)| OrphanConhostInfo {
1193            pid,
1194            parent_pid,
1195            parent_name: parent_names.get(&parent_pid).cloned().unwrap_or_default(),
1196        })
1197        .collect()
1198}
1199
1200#[cfg(windows)]
1201#[inline(never)]
1202pub fn apply_windows_pty_priority(
1203    handle: Option<std::os::windows::io::RawHandle>,
1204    nice: Option<i32>,
1205) -> Result<(), PtyError> {
1206    crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
1207    use winapi::um::processthreadsapi::SetPriorityClass;
1208    use winapi::um::winbase::{
1209        ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
1210        IDLE_PRIORITY_CLASS,
1211    };
1212
1213    let Some(handle) = handle else {
1214        return Ok(());
1215    };
1216    let flags = match nice {
1217        Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
1218        Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
1219        Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
1220        Some(value) if value <= -1 => ABOVE_NORMAL_PRIORITY_CLASS,
1221        _ => 0,
1222    };
1223    if flags == 0 {
1224        return Ok(());
1225    }
1226    let result = unsafe { SetPriorityClass(handle.cast(), flags) };
1227    if result == 0 {
1228        return Err(PtyError::Io(std::io::Error::last_os_error()));
1229    }
1230    Ok(())
1231}