Skip to main content

running_process_core/pty/
mod.rs

1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
12/// Re-exports for downstream crates that need portable-pty types.
13pub mod reexports {
14    pub use portable_pty;
15}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
27#[derive(Debug, Error)]
28pub enum PtyError {
29    #[error("pseudo-terminal process already started")]
30    AlreadyStarted,
31    #[error("pseudo-terminal process is not running")]
32    NotRunning,
33    #[error("pseudo-terminal timed out")]
34    Timeout,
35    #[error("pseudo-terminal I/O error: {0}")]
36    Io(#[from] std::io::Error),
37    #[error("pseudo-terminal spawn failed: {0}")]
38    Spawn(String),
39    #[error("pseudo-terminal error: {0}")]
40    Other(String),
41}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44    if matches!(
45        err.kind(),
46        std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47    ) {
48        return true;
49    }
50    #[cfg(unix)]
51    if err.raw_os_error() == Some(libc::ESRCH) {
52        return true;
53    }
54    false
55}
56
57pub struct PtyReadState {
58    pub chunks: VecDeque<Vec<u8>>,
59    pub closed: bool,
60}
61
62pub struct PtyReadShared {
63    pub state: Mutex<PtyReadState>,
64    pub condvar: Condvar,
65}
66
67pub struct NativePtyHandles {
68    pub master: Box<dyn MasterPty + Send>,
69    pub writer: Box<dyn Write + Send>,
70    pub child: Box<dyn portable_pty::Child + Send + Sync>,
71    #[cfg(windows)]
72    pub _job: WindowsJobHandle,
73}
74
75#[cfg(windows)]
76pub struct WindowsJobHandle(pub usize);
77
78#[cfg(windows)]
79impl WindowsJobHandle {
80    /// Assign an additional process (by PID) to this Job Object.
81    pub fn assign_pid(&self, pid: u32) -> Result<(), std::io::Error> {
82        use winapi::um::handleapi::CloseHandle;
83        use winapi::um::processthreadsapi::OpenProcess;
84        use winapi::um::winnt::PROCESS_SET_QUOTA;
85        use winapi::um::winnt::PROCESS_TERMINATE;
86
87        let handle = unsafe { OpenProcess(PROCESS_SET_QUOTA | PROCESS_TERMINATE, 0, pid) };
88        if handle.is_null() {
89            return Err(std::io::Error::last_os_error());
90        }
91        let result = unsafe {
92            winapi::um::jobapi2::AssignProcessToJobObject(
93                self.0 as winapi::shared::ntdef::HANDLE,
94                handle,
95            )
96        };
97        unsafe { CloseHandle(handle) };
98        if result == 0 {
99            return Err(std::io::Error::last_os_error());
100        }
101        Ok(())
102    }
103}
104
105#[cfg(windows)]
106impl Drop for WindowsJobHandle {
107    fn drop(&mut self) {
108        unsafe {
109            winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
110        }
111    }
112}
113
114pub struct IdleMonitorState {
115    pub last_reset_at: Instant,
116    pub returncode: Option<i32>,
117    pub interrupted: bool,
118}
119
120/// Core idle detection logic, shareable across threads via Arc.
121/// The reader thread calls `record_output` directly.
122pub struct IdleDetectorCore {
123    pub timeout_seconds: f64,
124    pub stability_window_seconds: f64,
125    pub sample_interval_seconds: f64,
126    pub reset_on_input: bool,
127    pub reset_on_output: bool,
128    pub count_control_churn_as_output: bool,
129    pub enabled: Arc<AtomicBool>,
130    pub state: Mutex<IdleMonitorState>,
131    pub condvar: Condvar,
132}
133
134impl IdleDetectorCore {
135    pub fn record_input(&self, byte_count: usize) {
136        if !self.reset_on_input || byte_count == 0 {
137            return;
138        }
139        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
140        guard.last_reset_at = Instant::now();
141        self.condvar.notify_all();
142    }
143
144    pub fn record_output(&self, data: &[u8]) {
145        if !self.reset_on_output || data.is_empty() {
146            return;
147        }
148        let control_bytes = control_churn_bytes(data);
149        let visible_output_bytes = data.len().saturating_sub(control_bytes);
150        let active_output =
151            visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
152        if !active_output {
153            return;
154        }
155        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
156        guard.last_reset_at = Instant::now();
157        self.condvar.notify_all();
158    }
159
160    pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
161        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
162        guard.returncode = Some(returncode);
163        guard.interrupted = interrupted;
164        self.condvar.notify_all();
165    }
166
167    pub fn enabled(&self) -> bool {
168        self.enabled.load(Ordering::Acquire)
169    }
170
171    pub fn set_enabled(&self, enabled: bool) {
172        let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
173        if enabled && !was_enabled {
174            let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
175            guard.last_reset_at = Instant::now();
176        }
177        self.condvar.notify_all();
178    }
179
180    pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
181        let started = Instant::now();
182        let overall_timeout = timeout.map(Duration::from_secs_f64);
183        let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
184        let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
185
186        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
187        loop {
188            let now = Instant::now();
189            let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
190
191            if let Some(returncode) = guard.returncode {
192                let reason = if guard.interrupted {
193                    "interrupt"
194                } else {
195                    "process_exit"
196                };
197                return (false, reason.to_string(), idle_for, Some(returncode));
198            }
199
200            let enabled = self.enabled.load(Ordering::Acquire);
201            if enabled && idle_for >= min_idle {
202                return (true, "idle_timeout".to_string(), idle_for, None);
203            }
204
205            if let Some(limit) = overall_timeout {
206                if now.duration_since(started) >= limit {
207                    return (false, "timeout".to_string(), idle_for, None);
208                }
209            }
210
211            let idle_remaining = if enabled {
212                (min_idle - idle_for).max(0.0)
213            } else {
214                sample_interval.as_secs_f64()
215            };
216            let mut wait_for =
217                sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
218            if let Some(limit) = overall_timeout {
219                let elapsed = now.duration_since(started);
220                if elapsed < limit {
221                    let remaining = limit - elapsed;
222                    wait_for = wait_for.min(remaining);
223                }
224            }
225            let result = self
226                .condvar
227                .wait_timeout(guard, wait_for)
228                .expect("idle monitor mutex poisoned");
229            guard = result.0;
230        }
231    }
232}
233
234pub struct NativePtyProcess {
235    pub argv: Vec<String>,
236    pub cwd: Option<String>,
237    pub env: Option<Vec<(String, String)>>,
238    pub rows: u16,
239    pub cols: u16,
240    #[cfg(windows)]
241    pub nice: Option<i32>,
242    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
243    pub reader: Arc<PtyReadShared>,
244    pub returncode: Arc<Mutex<Option<i32>>>,
245    pub input_bytes_total: Arc<AtomicUsize>,
246    pub newline_events_total: Arc<AtomicUsize>,
247    pub submit_events_total: Arc<AtomicUsize>,
248    /// When true, the reader thread writes PTY output to stdout.
249    pub echo: Arc<AtomicBool>,
250    /// When set, the reader thread feeds output directly to the idle detector.
251    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
252    /// Visible (non-control) output bytes seen by the reader thread.
253    pub output_bytes_total: Arc<AtomicUsize>,
254    /// Control churn bytes (ANSI escapes, BS, CR, DEL) seen by the reader.
255    pub control_churn_bytes_total: Arc<AtomicUsize>,
256    #[cfg(windows)]
257    pub terminal_input_relay_stop: Arc<AtomicBool>,
258    #[cfg(windows)]
259    pub terminal_input_relay_active: Arc<AtomicBool>,
260    #[cfg(windows)]
261    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
262}
263
264fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
265    cwd.map(str::to_owned).or_else(|| {
266        std::env::current_dir()
267            .ok()
268            .map(|cwd| cwd.to_string_lossy().to_string())
269    })
270}
271
272impl NativePtyProcess {
273    pub fn new(
274        argv: Vec<String>,
275        cwd: Option<String>,
276        env: Option<Vec<(String, String)>>,
277        rows: u16,
278        cols: u16,
279        nice: Option<i32>,
280    ) -> Result<Self, PtyError> {
281        if argv.is_empty() {
282            return Err(PtyError::Other("command cannot be empty".into()));
283        }
284        #[cfg(not(windows))]
285        let _ = nice;
286        Ok(Self {
287            argv,
288            cwd,
289            env,
290            rows,
291            cols,
292            #[cfg(windows)]
293            nice,
294            handles: Arc::new(Mutex::new(None)),
295            reader: Arc::new(PtyReadShared {
296                state: Mutex::new(PtyReadState {
297                    chunks: VecDeque::new(),
298                    closed: false,
299                }),
300                condvar: Condvar::new(),
301            }),
302            returncode: Arc::new(Mutex::new(None)),
303            input_bytes_total: Arc::new(AtomicUsize::new(0)),
304            newline_events_total: Arc::new(AtomicUsize::new(0)),
305            submit_events_total: Arc::new(AtomicUsize::new(0)),
306            echo: Arc::new(AtomicBool::new(false)),
307            idle_detector: Arc::new(Mutex::new(None)),
308            output_bytes_total: Arc::new(AtomicUsize::new(0)),
309            control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
310            #[cfg(windows)]
311            terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
312            #[cfg(windows)]
313            terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
314            #[cfg(windows)]
315            terminal_input_relay_worker: Mutex::new(None),
316        })
317    }
318
319    pub fn mark_reader_closed(&self) {
320        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
321        guard.closed = true;
322        self.reader.condvar.notify_all();
323    }
324
325    pub fn store_returncode(&self, code: i32) {
326        store_pty_returncode(&self.returncode, code);
327    }
328
329    pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
330        record_pty_input_metrics(
331            &self.input_bytes_total,
332            &self.newline_events_total,
333            &self.submit_events_total,
334            data,
335            submit,
336        );
337    }
338
339    pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
340        self.record_input_metrics(data, submit);
341        write_pty_input(&self.handles, data)?;
342        Ok(())
343    }
344
345    #[cfg(windows)]
346    pub fn request_terminal_input_relay_stop(&self) {
347        self.terminal_input_relay_stop
348            .store(true, Ordering::Release);
349        self.terminal_input_relay_active
350            .store(false, Ordering::Release);
351    }
352
353    #[cfg(windows)]
354    pub fn stop_terminal_input_relay_impl(&self) {
355        self.request_terminal_input_relay_stop();
356        if let Some(worker) = self
357            .terminal_input_relay_worker
358            .lock()
359            .expect("pty terminal input relay mutex poisoned")
360            .take()
361        {
362            let _ = worker.join();
363        }
364    }
365
366    #[cfg(not(windows))]
367    pub fn stop_terminal_input_relay_impl(&self) {}
368
369    /// Synchronously tear down the PTY and reap the child.
370    #[inline(never)]
371    pub fn close_impl(&self) -> Result<(), PtyError> {
372        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
373        self.stop_terminal_input_relay_impl();
374        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
375        let Some(handles) = guard.take() else {
376            self.mark_reader_closed();
377            return Ok(());
378        };
379        drop(guard);
380
381        #[cfg(windows)]
382        let NativePtyHandles {
383            master,
384            writer,
385            mut child,
386            _job,
387        } = handles;
388        #[cfg(not(windows))]
389        let NativePtyHandles {
390            master,
391            writer,
392            mut child,
393        } = handles;
394
395        if let Err(err) = child.kill() {
396            if !is_ignorable_process_control_error(&err) {
397                return Err(PtyError::Io(err));
398            }
399        }
400
401        drop(writer);
402        drop(master);
403
404        let code = match child.wait() {
405            Ok(status) => portable_exit_code(status),
406            Err(_) => -9,
407        };
408        drop(child);
409        #[cfg(windows)]
410        drop(_job);
411
412        self.store_returncode(code);
413        self.mark_reader_closed();
414        Ok(())
415    }
416
417    /// Best-effort, non-blocking teardown for use from `Drop`.
418    #[inline(never)]
419    pub fn close_nonblocking(&self) {
420        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
421        #[cfg(windows)]
422        self.request_terminal_input_relay_stop();
423        let Ok(mut guard) = self.handles.lock() else {
424            return;
425        };
426        let Some(handles) = guard.take() else {
427            self.mark_reader_closed();
428            return;
429        };
430        drop(guard);
431
432        #[cfg(windows)]
433        let NativePtyHandles {
434            master,
435            writer,
436            mut child,
437            _job,
438        } = handles;
439        #[cfg(not(windows))]
440        let NativePtyHandles {
441            master,
442            writer,
443            mut child,
444        } = handles;
445
446        if let Err(err) = child.kill() {
447            if !is_ignorable_process_control_error(&err) {
448                return;
449            }
450        }
451        drop(writer);
452        drop(master);
453        drop(child);
454        #[cfg(windows)]
455        drop(_job);
456        self.mark_reader_closed();
457    }
458
459    pub fn start_impl(&self) -> Result<(), PtyError> {
460        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
461        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
462        if guard.is_some() {
463            return Err(PtyError::AlreadyStarted);
464        }
465
466        // Snapshot our conhost.exe children before openpty() so we can diff
467        // after spawn to find the new conhost.exe created by ConPTY.
468        #[cfg(windows)]
469        let conhost_pids_before = conhost_children_of_current_process();
470
471        let pty_system = native_pty_system();
472        let pair = pty_system
473            .openpty(PtySize {
474                rows: self.rows,
475                cols: self.cols,
476                pixel_width: 0,
477                pixel_height: 0,
478            })
479            .map_err(|e| PtyError::Spawn(e.to_string()))?;
480
481        let mut cmd = command_builder_from_argv(&self.argv);
482        let cwd = resolved_spawn_cwd(self.cwd.as_deref());
483        if let Some(cwd) = &cwd {
484            cmd.cwd(cwd);
485        }
486        if let Some(env) = &self.env {
487            cmd.env_clear();
488            for (key, value) in env {
489                cmd.env(key, value);
490            }
491        }
492
493        let reader = pair
494            .master
495            .try_clone_reader()
496            .map_err(|e| PtyError::Spawn(e.to_string()))?;
497        let writer = pair
498            .master
499            .take_writer()
500            .map_err(|e| PtyError::Spawn(e.to_string()))?;
501        let child = pair
502            .slave
503            .spawn_command(cmd)
504            .map_err(|e| PtyError::Spawn(e.to_string()))?;
505        #[cfg(windows)]
506        let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
507        #[cfg(windows)]
508        assign_conpty_conhost_to_job(&job, &conhost_pids_before);
509        #[cfg(windows)]
510        apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
511        let shared = Arc::clone(&self.reader);
512        let echo = Arc::clone(&self.echo);
513        let idle_detector = Arc::clone(&self.idle_detector);
514        let output_bytes = Arc::clone(&self.output_bytes_total);
515        let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
516        thread::spawn(move || {
517            spawn_pty_reader(
518                reader,
519                shared,
520                echo,
521                idle_detector,
522                output_bytes,
523                churn_bytes,
524            );
525        });
526
527        *guard = Some(NativePtyHandles {
528            master: pair.master,
529            writer,
530            child,
531            #[cfg(windows)]
532            _job: job,
533        });
534        Ok(())
535    }
536
537    pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
538        #[cfg(windows)]
539        {
540            pty_windows::respond_to_queries(self, data)
541        }
542
543        #[cfg(unix)]
544        {
545            pty_platform::respond_to_queries(self, data)
546        }
547    }
548
549    pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
550        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
551        let guard = self.handles.lock().expect("pty handles mutex poisoned");
552        if let Some(handles) = guard.as_ref() {
553            handles
554                .master
555                .resize(PtySize {
556                    rows,
557                    cols,
558                    pixel_width: 0,
559                    pixel_height: 0,
560                })
561                .map_err(|e| PtyError::Other(e.to_string()))?;
562        }
563        Ok(())
564    }
565
566    pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
567        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
568        #[cfg(windows)]
569        {
570            pty_windows::send_interrupt(self)
571        }
572
573        #[cfg(unix)]
574        {
575            pty_platform::send_interrupt(self)
576        }
577    }
578
579    pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
580        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
581        // Fast path: already exited.
582        if let Some(code) = *self
583            .returncode
584            .lock()
585            .expect("pty returncode mutex poisoned")
586        {
587            return Ok(code);
588        }
589        let start = Instant::now();
590        loop {
591            if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
592                return Ok(code);
593            }
594            if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
595                return Err(PtyError::Timeout);
596            }
597            thread::sleep(Duration::from_millis(10));
598        }
599    }
600
601    pub fn terminate_impl(&self) -> Result<(), PtyError> {
602        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
603        #[cfg(windows)]
604        {
605            pty_windows::terminate(self)
606        }
607
608        #[cfg(unix)]
609        {
610            pty_platform::terminate(self)
611        }
612    }
613
614    pub fn kill_impl(&self) -> Result<(), PtyError> {
615        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
616        #[cfg(windows)]
617        {
618            pty_windows::kill(self)
619        }
620
621        #[cfg(unix)]
622        {
623            pty_platform::kill(self)
624        }
625    }
626
627    pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
628        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
629        #[cfg(windows)]
630        {
631            pty_windows::terminate_tree(self)
632        }
633
634        #[cfg(unix)]
635        {
636            pty_platform::terminate_tree(self)
637        }
638    }
639
640    pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
641        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
642        #[cfg(windows)]
643        {
644            pty_windows::kill_tree(self)
645        }
646
647        #[cfg(unix)]
648        {
649            pty_platform::kill_tree(self)
650        }
651    }
652
653    /// Get the PID of the child process, if running.
654    pub fn pid(&self) -> Result<Option<u32>, PtyError> {
655        let guard = self.handles.lock().expect("pty handles mutex poisoned");
656        if let Some(handles) = guard.as_ref() {
657            #[cfg(unix)]
658            if let Some(pid) = handles.master.process_group_leader() {
659                if let Ok(pid) = u32::try_from(pid) {
660                    return Ok(Some(pid));
661                }
662            }
663            return Ok(handles.child.process_id());
664        }
665        Ok(None)
666    }
667
668    /// Wait for a chunk of output from the PTY reader.
669    /// Returns `Ok(Some(chunk))` on data, `Ok(None)` on timeout, `Err` on closed.
670    pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
671        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
672        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
673        loop {
674            if let Some(chunk) = guard.chunks.pop_front() {
675                return Ok(Some(chunk));
676            }
677            if guard.closed {
678                return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
679            }
680            match deadline {
681                Some(deadline) => {
682                    let now = Instant::now();
683                    if now >= deadline {
684                        return Ok(None); // timeout
685                    }
686                    let wait = deadline.saturating_duration_since(now);
687                    let result = self
688                        .reader
689                        .condvar
690                        .wait_timeout(guard, wait)
691                        .expect("pty read mutex poisoned");
692                    guard = result.0;
693                }
694                None => {
695                    guard = self
696                        .reader
697                        .condvar
698                        .wait(guard)
699                        .expect("pty read mutex poisoned");
700                }
701            }
702        }
703    }
704
705    /// Wait for the reader thread to close.
706    pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
707        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
708        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
709        loop {
710            if guard.closed {
711                return true;
712            }
713            match deadline {
714                Some(deadline) => {
715                    let now = Instant::now();
716                    if now >= deadline {
717                        return false;
718                    }
719                    let wait = deadline.saturating_duration_since(now);
720                    let result = self
721                        .reader
722                        .condvar
723                        .wait_timeout(guard, wait)
724                        .expect("pty read mutex poisoned");
725                    guard = result.0;
726                }
727                None => {
728                    guard = self
729                        .reader
730                        .condvar
731                        .wait(guard)
732                        .expect("pty read mutex poisoned");
733                }
734            }
735        }
736    }
737
738    /// Wait for exit then drain remaining output.
739    pub fn wait_and_drain_impl(
740        &self,
741        timeout: Option<f64>,
742        drain_timeout: f64,
743    ) -> Result<i32, PtyError> {
744        let code = self.wait_impl(timeout)?;
745        let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
746        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
747        while !guard.closed {
748            let remaining = deadline.saturating_duration_since(Instant::now());
749            if remaining.is_zero() {
750                break;
751            }
752            let result = self
753                .reader
754                .condvar
755                .wait_timeout(guard, remaining)
756                .expect("pty read mutex poisoned");
757            guard = result.0;
758        }
759        Ok(code)
760    }
761
762    pub fn set_echo(&self, enabled: bool) {
763        self.echo.store(enabled, Ordering::Release);
764    }
765
766    pub fn echo_enabled(&self) -> bool {
767        self.echo.load(Ordering::Acquire)
768    }
769
770    pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
771        let mut guard = self
772            .idle_detector
773            .lock()
774            .expect("idle detector mutex poisoned");
775        *guard = Some(Arc::clone(detector));
776    }
777
778    pub fn detach_idle_detector(&self) {
779        let mut guard = self
780            .idle_detector
781            .lock()
782            .expect("idle detector mutex poisoned");
783        *guard = None;
784    }
785
786    pub fn pty_input_bytes_total(&self) -> usize {
787        self.input_bytes_total.load(Ordering::Acquire)
788    }
789
790    pub fn pty_newline_events_total(&self) -> usize {
791        self.newline_events_total.load(Ordering::Acquire)
792    }
793
794    pub fn pty_submit_events_total(&self) -> usize {
795        self.submit_events_total.load(Ordering::Acquire)
796    }
797
798    pub fn pty_output_bytes_total(&self) -> usize {
799        self.output_bytes_total.load(Ordering::Acquire)
800    }
801
802    pub fn pty_control_churn_bytes_total(&self) -> usize {
803        self.control_churn_bytes_total.load(Ordering::Acquire)
804    }
805}
806
807impl Drop for NativePtyProcess {
808    fn drop(&mut self) {
809        self.close_nonblocking();
810    }
811}
812
813// ── Helper functions ──
814
815pub fn control_churn_bytes(data: &[u8]) -> usize {
816    let mut total = 0;
817    let mut index = 0;
818    while index < data.len() {
819        let byte = data[index];
820        if byte == 0x1B {
821            let start = index;
822            index += 1;
823            if index < data.len() && data[index] == b'[' {
824                index += 1;
825                while index < data.len() {
826                    let current = data[index];
827                    index += 1;
828                    if (0x40..=0x7E).contains(&current) {
829                        break;
830                    }
831                }
832            }
833            total += index - start;
834            continue;
835        }
836        if matches!(byte, 0x08 | 0x0D | 0x7F) {
837            total += 1;
838        }
839        index += 1;
840    }
841    total
842}
843
844pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
845    let mut command = CommandBuilder::new(&argv[0]);
846    if argv.len() > 1 {
847        command.args(
848            argv[1..]
849                .iter()
850                .map(OsString::from)
851                .collect::<Vec<OsString>>(),
852        );
853    }
854    command
855}
856
857#[inline(never)]
858pub fn spawn_pty_reader(
859    mut reader: Box<dyn Read + Send>,
860    shared: Arc<PtyReadShared>,
861    echo: Arc<AtomicBool>,
862    idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
863    output_bytes_total: Arc<AtomicUsize>,
864    control_churn_bytes_total: Arc<AtomicUsize>,
865) {
866    crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
867    let idle_detector_snapshot = idle_detector
868        .lock()
869        .expect("idle detector mutex poisoned")
870        .clone();
871    let mut chunk = vec![0_u8; 65536];
872    loop {
873        match reader.read(&mut chunk) {
874            Ok(0) => break,
875            Ok(n) => {
876                let data = &chunk[..n];
877
878                let churn = control_churn_bytes(data);
879                let visible = data.len().saturating_sub(churn);
880                output_bytes_total.fetch_add(visible, Ordering::Relaxed);
881                control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
882
883                if echo.load(Ordering::Relaxed) {
884                    let _ = std::io::stdout().write_all(data);
885                    let _ = std::io::stdout().flush();
886                }
887
888                if let Some(ref detector) = idle_detector_snapshot {
889                    detector.record_output(data);
890                }
891
892                let mut guard = shared.state.lock().expect("pty read mutex poisoned");
893                guard.chunks.push_back(data.to_vec());
894                shared.condvar.notify_all();
895            }
896            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
897            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
898                thread::sleep(Duration::from_millis(10));
899                continue;
900            }
901            Err(_) => break,
902        }
903    }
904    let mut guard = shared.state.lock().expect("pty read mutex poisoned");
905    guard.closed = true;
906    shared.condvar.notify_all();
907}
908
909pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
910    if let Some(signal) = status.signal() {
911        let signal = signal.to_ascii_lowercase();
912        if signal.contains("interrupt") {
913            return -2;
914        }
915        if signal.contains("terminated") {
916            return -15;
917        }
918        if signal.contains("killed") {
919            return -9;
920        }
921    }
922    status.exit_code() as i32
923}
924
925pub fn input_contains_newline(data: &[u8]) -> bool {
926    data.iter().any(|byte| matches!(*byte, b'\r' | b'\n'))
927}
928
929pub fn record_pty_input_metrics(
930    input_bytes_total: &Arc<AtomicUsize>,
931    newline_events_total: &Arc<AtomicUsize>,
932    submit_events_total: &Arc<AtomicUsize>,
933    data: &[u8],
934    submit: bool,
935) {
936    input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
937    if input_contains_newline(data) {
938        newline_events_total.fetch_add(1, Ordering::AcqRel);
939    }
940    if submit {
941        submit_events_total.fetch_add(1, Ordering::AcqRel);
942    }
943}
944
945pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
946    *returncode.lock().expect("pty returncode mutex poisoned") = Some(code);
947}
948
949pub fn poll_pty_process(
950    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
951    returncode: &Arc<Mutex<Option<i32>>>,
952) -> Result<Option<i32>, std::io::Error> {
953    let mut guard = handles.lock().expect("pty handles mutex poisoned");
954    let Some(handles) = guard.as_mut() else {
955        return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
956    };
957    let status = handles.child.try_wait()?;
958    let code = status.map(portable_exit_code);
959    if let Some(code) = code {
960        store_pty_returncode(returncode, code);
961        return Ok(Some(code));
962    }
963    Ok(None)
964}
965
966pub fn write_pty_input(
967    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
968    data: &[u8],
969) -> Result<(), std::io::Error> {
970    let mut guard = handles.lock().expect("pty handles mutex poisoned");
971    let handles = guard.as_mut().ok_or_else(|| {
972        std::io::Error::new(
973            std::io::ErrorKind::NotConnected,
974            "Pseudo-terminal process is not running",
975        )
976    })?;
977    #[cfg(windows)]
978    let payload = pty_windows::input_payload(data);
979    #[cfg(unix)]
980    let payload = pty_platform::input_payload(data);
981    handles.writer.write_all(&payload)?;
982    handles.writer.flush()
983}
984
985#[cfg(windows)]
986pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
987    let mut translated = Vec::with_capacity(data.len());
988    let mut index = 0usize;
989    while index < data.len() {
990        let current = data[index];
991        if current == b'\r' {
992            translated.push(current);
993            if index + 1 < data.len() && data[index + 1] == b'\n' {
994                translated.push(b'\n');
995                index += 2;
996                continue;
997            }
998            index += 1;
999            continue;
1000        }
1001        if current == b'\n' {
1002            translated.push(b'\r');
1003            index += 1;
1004            continue;
1005        }
1006        translated.push(current);
1007        index += 1;
1008    }
1009    translated
1010}
1011
1012#[cfg(windows)]
1013#[inline(never)]
1014pub fn assign_child_to_windows_kill_on_close_job(
1015    handle: Option<std::os::windows::io::RawHandle>,
1016) -> Result<WindowsJobHandle, PtyError> {
1017    crate::rp_rust_debug_scope!(
1018        "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
1019    );
1020    use std::mem::zeroed;
1021
1022    use winapi::shared::minwindef::FALSE;
1023    use winapi::um::handleapi::INVALID_HANDLE_VALUE;
1024    use winapi::um::jobapi2::{
1025        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
1026    };
1027    use winapi::um::winnt::{
1028        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
1029        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
1030    };
1031
1032    let Some(handle) = handle else {
1033        return Err(PtyError::Other(
1034            "Pseudo-terminal child does not expose a Windows process handle".into(),
1035        ));
1036    };
1037
1038    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
1039    if job.is_null() || job == INVALID_HANDLE_VALUE {
1040        return Err(PtyError::Io(std::io::Error::last_os_error()));
1041    }
1042
1043    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
1044    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
1045    let result = unsafe {
1046        SetInformationJobObject(
1047            job,
1048            JobObjectExtendedLimitInformation,
1049            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
1050            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
1051        )
1052    };
1053    if result == FALSE {
1054        let err = std::io::Error::last_os_error();
1055        unsafe {
1056            winapi::um::handleapi::CloseHandle(job);
1057        }
1058        return Err(PtyError::Io(err));
1059    }
1060
1061    let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
1062    if result == FALSE {
1063        let err = std::io::Error::last_os_error();
1064        unsafe {
1065            winapi::um::handleapi::CloseHandle(job);
1066        }
1067        return Err(PtyError::Io(err));
1068    }
1069
1070    Ok(WindowsJobHandle(job as usize))
1071}
1072
1073/// Information about a child process found via Toolhelp snapshot.
1074#[cfg(windows)]
1075#[derive(Debug, Clone)]
1076pub struct ChildProcessInfo {
1077    pub pid: u32,
1078    pub name: String,
1079}
1080
1081/// Find all direct child processes of a given parent PID using the Windows Toolhelp API.
1082/// Returns PID and process name for each child.
1083#[cfg(windows)]
1084pub fn find_child_processes(parent_pid: u32) -> Vec<ChildProcessInfo> {
1085    use winapi::um::handleapi::CloseHandle;
1086    use winapi::um::tlhelp32::{
1087        CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1088    };
1089
1090    let mut children = Vec::new();
1091    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1092    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1093        return children;
1094    }
1095
1096    let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1097    entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1098
1099    if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1100        loop {
1101            if entry.th32ParentProcessID == parent_pid {
1102                let name_bytes = &entry.szExeFile;
1103                let name_len = name_bytes
1104                    .iter()
1105                    .position(|&b| b == 0)
1106                    .unwrap_or(name_bytes.len());
1107                let name = String::from_utf8_lossy(
1108                    &name_bytes[..name_len]
1109                        .iter()
1110                        .map(|&c| c as u8)
1111                        .collect::<Vec<u8>>(),
1112                )
1113                .into_owned();
1114                children.push(ChildProcessInfo {
1115                    pid: entry.th32ProcessID,
1116                    name,
1117                });
1118            }
1119            if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1120                break;
1121            }
1122        }
1123    }
1124
1125    unsafe { CloseHandle(snapshot) };
1126    children
1127}
1128
1129/// Return PIDs of all conhost.exe processes that are children of the current process.
1130#[cfg(windows)]
1131fn conhost_children_of_current_process() -> Vec<u32> {
1132    let our_pid = std::process::id();
1133    find_child_processes(our_pid)
1134        .into_iter()
1135        .filter(|c| c.name.eq_ignore_ascii_case("conhost.exe"))
1136        .map(|c| c.pid)
1137        .collect()
1138}
1139
1140/// After spawning a ConPTY child, find the new conhost.exe process that was created
1141/// by the ConPTY infrastructure (child of our process, not present in the "before"
1142/// snapshot) and assign it to the Job Object so it gets cleaned up on Job close.
1143#[cfg(windows)]
1144fn assign_conpty_conhost_to_job(job: &WindowsJobHandle, before_pids: &[u32]) {
1145    let after_pids = conhost_children_of_current_process();
1146    for pid in after_pids {
1147        if !before_pids.contains(&pid) {
1148            // This is a newly created conhost.exe — assign it to the Job.
1149            let _ = job.assign_pid(pid);
1150        }
1151    }
1152}
1153
1154/// A conhost.exe process whose parent is no longer alive — likely an orphan
1155/// from a dead ConPTY session.
1156#[cfg(windows)]
1157#[derive(Debug, Clone)]
1158pub struct OrphanConhostInfo {
1159    /// PID of the orphaned conhost.exe.
1160    pub pid: u32,
1161    /// PID that was the parent when the snapshot was taken.
1162    pub parent_pid: u32,
1163    /// Name of the parent process, if it can be resolved (empty if parent is dead).
1164    pub parent_name: String,
1165}
1166
1167/// Scan all conhost.exe processes on the system and return those whose parent
1168/// process is no longer alive. These are likely orphans from dead ConPTY sessions.
1169///
1170/// Uses `CreateToolhelp32Snapshot` for a point-in-time snapshot — no sysinfo
1171/// dependency, so it's lightweight and can be called frequently.
1172#[cfg(windows)]
1173pub fn find_orphan_conhosts() -> Vec<OrphanConhostInfo> {
1174    use winapi::um::handleapi::CloseHandle;
1175    use winapi::um::tlhelp32::{
1176        CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, TH32CS_SNAPPROCESS,
1177    };
1178
1179    let snapshot = unsafe { CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0) };
1180    if snapshot == winapi::um::handleapi::INVALID_HANDLE_VALUE {
1181        return Vec::new();
1182    }
1183
1184    let mut entry: PROCESSENTRY32 = unsafe { std::mem::zeroed() };
1185    entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;
1186
1187    // First pass: collect all PIDs and identify conhost.exe processes.
1188    let mut all_pids = std::collections::HashSet::new();
1189    let mut conhosts: Vec<(u32, u32)> = Vec::new(); // (pid, parent_pid)
1190    let mut parent_names: std::collections::HashMap<u32, String> = std::collections::HashMap::new();
1191
1192    if unsafe { Process32First(snapshot, &mut entry) } != 0 {
1193        loop {
1194            let name_bytes = &entry.szExeFile;
1195            let name_len = name_bytes
1196                .iter()
1197                .position(|&b| b == 0)
1198                .unwrap_or(name_bytes.len());
1199            let name = String::from_utf8_lossy(
1200                &name_bytes[..name_len]
1201                    .iter()
1202                    .map(|&c| c as u8)
1203                    .collect::<Vec<u8>>(),
1204            )
1205            .into_owned();
1206
1207            all_pids.insert(entry.th32ProcessID);
1208            parent_names.insert(entry.th32ProcessID, name.clone());
1209
1210            if name.eq_ignore_ascii_case("conhost.exe") {
1211                conhosts.push((entry.th32ProcessID, entry.th32ParentProcessID));
1212            }
1213
1214            if unsafe { Process32Next(snapshot, &mut entry) } == 0 {
1215                break;
1216            }
1217        }
1218    }
1219
1220    unsafe { CloseHandle(snapshot) };
1221
1222    // Second pass: filter to conhosts whose parent PID is not in the live set.
1223    conhosts
1224        .into_iter()
1225        .filter(|&(_, parent_pid)| !all_pids.contains(&parent_pid))
1226        .map(|(pid, parent_pid)| OrphanConhostInfo {
1227            pid,
1228            parent_pid,
1229            parent_name: parent_names.get(&parent_pid).cloned().unwrap_or_default(),
1230        })
1231        .collect()
1232}
1233
1234#[cfg(windows)]
1235#[inline(never)]
1236pub fn apply_windows_pty_priority(
1237    handle: Option<std::os::windows::io::RawHandle>,
1238    nice: Option<i32>,
1239) -> Result<(), PtyError> {
1240    crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
1241    use winapi::um::processthreadsapi::SetPriorityClass;
1242    use winapi::um::winbase::{
1243        ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
1244        IDLE_PRIORITY_CLASS,
1245    };
1246
1247    let Some(handle) = handle else {
1248        return Ok(());
1249    };
1250    let flags = match nice {
1251        Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
1252        Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
1253        Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
1254        Some(value) if value <= -1 => ABOVE_NORMAL_PRIORITY_CLASS,
1255        _ => 0,
1256    };
1257    if flags == 0 {
1258        return Ok(());
1259    }
1260    let result = unsafe { SetPriorityClass(handle.cast(), flags) };
1261    if result == 0 {
1262        return Err(PtyError::Io(std::io::Error::last_os_error()));
1263    }
1264    Ok(())
1265}
1266
1267#[cfg(test)]
1268mod tests {
1269    use super::resolved_spawn_cwd;
1270
1271    #[test]
1272    fn resolved_spawn_cwd_preserves_explicit_value() {
1273        assert_eq!(
1274            resolved_spawn_cwd(Some("C:\\temp\\explicit")),
1275            Some("C:\\temp\\explicit".to_string())
1276        );
1277    }
1278
1279    #[test]
1280    fn resolved_spawn_cwd_defaults_to_current_dir_when_unset() {
1281        let expected = std::env::current_dir()
1282            .ok()
1283            .map(|cwd| cwd.to_string_lossy().to_string());
1284        assert_eq!(resolved_spawn_cwd(None), expected);
1285    }
1286}