Skip to main content

running_process_core/pty/
mod.rs

1use std::collections::VecDeque;
2use std::ffi::OsString;
3use std::io::{Read, Write};
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Condvar, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
10use thiserror::Error;
11
12/// Re-exports for downstream crates that need portable-pty types.
13pub mod reexports {
14    pub use portable_pty;
15}
16
17#[cfg(unix)]
18mod pty_posix;
19#[cfg(windows)]
20mod pty_windows;
21
22pub mod terminal_input;
23
24#[cfg(unix)]
25use pty_posix as pty_platform;
26
27#[derive(Debug, Error)]
28pub enum PtyError {
29    #[error("pseudo-terminal process already started")]
30    AlreadyStarted,
31    #[error("pseudo-terminal process is not running")]
32    NotRunning,
33    #[error("pseudo-terminal timed out")]
34    Timeout,
35    #[error("pseudo-terminal I/O error: {0}")]
36    Io(#[from] std::io::Error),
37    #[error("pseudo-terminal spawn failed: {0}")]
38    Spawn(String),
39    #[error("pseudo-terminal error: {0}")]
40    Other(String),
41}
42
43pub fn is_ignorable_process_control_error(err: &std::io::Error) -> bool {
44    if matches!(
45        err.kind(),
46        std::io::ErrorKind::NotFound | std::io::ErrorKind::InvalidInput
47    ) {
48        return true;
49    }
50    #[cfg(unix)]
51    if err.raw_os_error() == Some(libc::ESRCH) {
52        return true;
53    }
54    false
55}
56
57pub struct PtyReadState {
58    pub chunks: VecDeque<Vec<u8>>,
59    pub closed: bool,
60}
61
62pub struct PtyReadShared {
63    pub state: Mutex<PtyReadState>,
64    pub condvar: Condvar,
65}
66
67pub struct NativePtyHandles {
68    pub master: Box<dyn MasterPty + Send>,
69    pub writer: Box<dyn Write + Send>,
70    pub child: Box<dyn portable_pty::Child + Send + Sync>,
71    #[cfg(windows)]
72    pub _job: WindowsJobHandle,
73}
74
75#[cfg(windows)]
76pub struct WindowsJobHandle(pub usize);
77
78#[cfg(windows)]
79impl Drop for WindowsJobHandle {
80    fn drop(&mut self) {
81        unsafe {
82            winapi::um::handleapi::CloseHandle(self.0 as winapi::shared::ntdef::HANDLE);
83        }
84    }
85}
86
87pub struct IdleMonitorState {
88    pub last_reset_at: Instant,
89    pub returncode: Option<i32>,
90    pub interrupted: bool,
91}
92
93/// Core idle detection logic, shareable across threads via Arc.
94/// The reader thread calls `record_output` directly.
95pub struct IdleDetectorCore {
96    pub timeout_seconds: f64,
97    pub stability_window_seconds: f64,
98    pub sample_interval_seconds: f64,
99    pub reset_on_input: bool,
100    pub reset_on_output: bool,
101    pub count_control_churn_as_output: bool,
102    pub enabled: Arc<AtomicBool>,
103    pub state: Mutex<IdleMonitorState>,
104    pub condvar: Condvar,
105}
106
107impl IdleDetectorCore {
108    pub fn record_input(&self, byte_count: usize) {
109        if !self.reset_on_input || byte_count == 0 {
110            return;
111        }
112        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
113        guard.last_reset_at = Instant::now();
114        self.condvar.notify_all();
115    }
116
117    pub fn record_output(&self, data: &[u8]) {
118        if !self.reset_on_output || data.is_empty() {
119            return;
120        }
121        let control_bytes = control_churn_bytes(data);
122        let visible_output_bytes = data.len().saturating_sub(control_bytes);
123        let active_output =
124            visible_output_bytes > 0 || (self.count_control_churn_as_output && control_bytes > 0);
125        if !active_output {
126            return;
127        }
128        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
129        guard.last_reset_at = Instant::now();
130        self.condvar.notify_all();
131    }
132
133    pub fn mark_exit(&self, returncode: i32, interrupted: bool) {
134        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
135        guard.returncode = Some(returncode);
136        guard.interrupted = interrupted;
137        self.condvar.notify_all();
138    }
139
140    pub fn enabled(&self) -> bool {
141        self.enabled.load(Ordering::Acquire)
142    }
143
144    pub fn set_enabled(&self, enabled: bool) {
145        let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
146        if enabled && !was_enabled {
147            let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
148            guard.last_reset_at = Instant::now();
149        }
150        self.condvar.notify_all();
151    }
152
153    pub fn wait(&self, timeout: Option<f64>) -> (bool, String, f64, Option<i32>) {
154        let started = Instant::now();
155        let overall_timeout = timeout.map(Duration::from_secs_f64);
156        let min_idle = self.timeout_seconds.max(self.stability_window_seconds);
157        let sample_interval = Duration::from_secs_f64(self.sample_interval_seconds.max(0.001));
158
159        let mut guard = self.state.lock().expect("idle monitor mutex poisoned");
160        loop {
161            let now = Instant::now();
162            let idle_for = now.duration_since(guard.last_reset_at).as_secs_f64();
163
164            if let Some(returncode) = guard.returncode {
165                let reason = if guard.interrupted {
166                    "interrupt"
167                } else {
168                    "process_exit"
169                };
170                return (false, reason.to_string(), idle_for, Some(returncode));
171            }
172
173            let enabled = self.enabled.load(Ordering::Acquire);
174            if enabled && idle_for >= min_idle {
175                return (true, "idle_timeout".to_string(), idle_for, None);
176            }
177
178            if let Some(limit) = overall_timeout {
179                if now.duration_since(started) >= limit {
180                    return (false, "timeout".to_string(), idle_for, None);
181                }
182            }
183
184            let idle_remaining = if enabled {
185                (min_idle - idle_for).max(0.0)
186            } else {
187                sample_interval.as_secs_f64()
188            };
189            let mut wait_for =
190                sample_interval.min(Duration::from_secs_f64(idle_remaining.max(0.001)));
191            if let Some(limit) = overall_timeout {
192                let elapsed = now.duration_since(started);
193                if elapsed < limit {
194                    let remaining = limit - elapsed;
195                    wait_for = wait_for.min(remaining);
196                }
197            }
198            let result = self
199                .condvar
200                .wait_timeout(guard, wait_for)
201                .expect("idle monitor mutex poisoned");
202            guard = result.0;
203        }
204    }
205}
206
207pub struct NativePtyProcess {
208    pub argv: Vec<String>,
209    pub cwd: Option<String>,
210    pub env: Option<Vec<(String, String)>>,
211    pub rows: u16,
212    pub cols: u16,
213    #[cfg(windows)]
214    pub nice: Option<i32>,
215    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
216    pub reader: Arc<PtyReadShared>,
217    pub returncode: Arc<Mutex<Option<i32>>>,
218    pub input_bytes_total: Arc<AtomicUsize>,
219    pub newline_events_total: Arc<AtomicUsize>,
220    pub submit_events_total: Arc<AtomicUsize>,
221    /// When true, the reader thread writes PTY output to stdout.
222    pub echo: Arc<AtomicBool>,
223    /// When set, the reader thread feeds output directly to the idle detector.
224    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
225    /// Visible (non-control) output bytes seen by the reader thread.
226    pub output_bytes_total: Arc<AtomicUsize>,
227    /// Control churn bytes (ANSI escapes, BS, CR, DEL) seen by the reader.
228    pub control_churn_bytes_total: Arc<AtomicUsize>,
229    #[cfg(windows)]
230    pub terminal_input_relay_stop: Arc<AtomicBool>,
231    #[cfg(windows)]
232    pub terminal_input_relay_active: Arc<AtomicBool>,
233    #[cfg(windows)]
234    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
235}
236
237impl NativePtyProcess {
238    pub fn new(
239        argv: Vec<String>,
240        cwd: Option<String>,
241        env: Option<Vec<(String, String)>>,
242        rows: u16,
243        cols: u16,
244        nice: Option<i32>,
245    ) -> Result<Self, PtyError> {
246        if argv.is_empty() {
247            return Err(PtyError::Other("command cannot be empty".into()));
248        }
249        #[cfg(not(windows))]
250        let _ = nice;
251        Ok(Self {
252            argv,
253            cwd,
254            env,
255            rows,
256            cols,
257            #[cfg(windows)]
258            nice,
259            handles: Arc::new(Mutex::new(None)),
260            reader: Arc::new(PtyReadShared {
261                state: Mutex::new(PtyReadState {
262                    chunks: VecDeque::new(),
263                    closed: false,
264                }),
265                condvar: Condvar::new(),
266            }),
267            returncode: Arc::new(Mutex::new(None)),
268            input_bytes_total: Arc::new(AtomicUsize::new(0)),
269            newline_events_total: Arc::new(AtomicUsize::new(0)),
270            submit_events_total: Arc::new(AtomicUsize::new(0)),
271            echo: Arc::new(AtomicBool::new(false)),
272            idle_detector: Arc::new(Mutex::new(None)),
273            output_bytes_total: Arc::new(AtomicUsize::new(0)),
274            control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
275            #[cfg(windows)]
276            terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
277            #[cfg(windows)]
278            terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
279            #[cfg(windows)]
280            terminal_input_relay_worker: Mutex::new(None),
281        })
282    }
283
284    pub fn mark_reader_closed(&self) {
285        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
286        guard.closed = true;
287        self.reader.condvar.notify_all();
288    }
289
290    pub fn store_returncode(&self, code: i32) {
291        store_pty_returncode(&self.returncode, code);
292    }
293
294    pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
295        record_pty_input_metrics(
296            &self.input_bytes_total,
297            &self.newline_events_total,
298            &self.submit_events_total,
299            data,
300            submit,
301        );
302    }
303
304    pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
305        self.record_input_metrics(data, submit);
306        write_pty_input(&self.handles, data)?;
307        Ok(())
308    }
309
310    #[cfg(windows)]
311    pub fn request_terminal_input_relay_stop(&self) {
312        self.terminal_input_relay_stop
313            .store(true, Ordering::Release);
314        self.terminal_input_relay_active
315            .store(false, Ordering::Release);
316    }
317
318    #[cfg(windows)]
319    pub fn stop_terminal_input_relay_impl(&self) {
320        self.request_terminal_input_relay_stop();
321        if let Some(worker) = self
322            .terminal_input_relay_worker
323            .lock()
324            .expect("pty terminal input relay mutex poisoned")
325            .take()
326        {
327            let _ = worker.join();
328        }
329    }
330
331    #[cfg(not(windows))]
332    pub fn stop_terminal_input_relay_impl(&self) {}
333
334    /// Synchronously tear down the PTY and reap the child.
335    #[inline(never)]
336    pub fn close_impl(&self) -> Result<(), PtyError> {
337        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_impl");
338        self.stop_terminal_input_relay_impl();
339        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
340        let Some(handles) = guard.take() else {
341            self.mark_reader_closed();
342            return Ok(());
343        };
344        drop(guard);
345
346        #[cfg(windows)]
347        let NativePtyHandles {
348            master,
349            writer,
350            mut child,
351            _job,
352        } = handles;
353        #[cfg(not(windows))]
354        let NativePtyHandles {
355            master,
356            writer,
357            mut child,
358        } = handles;
359
360        if let Err(err) = child.kill() {
361            if !is_ignorable_process_control_error(&err) {
362                return Err(PtyError::Io(err));
363            }
364        }
365
366        drop(writer);
367        drop(master);
368
369        let code = match child.wait() {
370            Ok(status) => portable_exit_code(status),
371            Err(_) => -9,
372        };
373        drop(child);
374        #[cfg(windows)]
375        drop(_job);
376
377        self.store_returncode(code);
378        self.mark_reader_closed();
379        Ok(())
380    }
381
382    /// Best-effort, non-blocking teardown for use from `Drop`.
383    #[inline(never)]
384    pub fn close_nonblocking(&self) {
385        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::close_nonblocking");
386        #[cfg(windows)]
387        self.request_terminal_input_relay_stop();
388        let Ok(mut guard) = self.handles.lock() else {
389            return;
390        };
391        let Some(handles) = guard.take() else {
392            self.mark_reader_closed();
393            return;
394        };
395        drop(guard);
396
397        #[cfg(windows)]
398        let NativePtyHandles {
399            master,
400            writer,
401            mut child,
402            _job,
403        } = handles;
404        #[cfg(not(windows))]
405        let NativePtyHandles {
406            master,
407            writer,
408            mut child,
409        } = handles;
410
411        if let Err(err) = child.kill() {
412            if !is_ignorable_process_control_error(&err) {
413                return;
414            }
415        }
416        drop(writer);
417        drop(master);
418        drop(child);
419        #[cfg(windows)]
420        drop(_job);
421        self.mark_reader_closed();
422    }
423
424    pub fn start_impl(&self) -> Result<(), PtyError> {
425        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::start");
426        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
427        if guard.is_some() {
428            return Err(PtyError::AlreadyStarted);
429        }
430
431        let pty_system = native_pty_system();
432        let pair = pty_system
433            .openpty(PtySize {
434                rows: self.rows,
435                cols: self.cols,
436                pixel_width: 0,
437                pixel_height: 0,
438            })
439            .map_err(|e| PtyError::Spawn(e.to_string()))?;
440
441        let mut cmd = command_builder_from_argv(&self.argv);
442        if let Some(cwd) = &self.cwd {
443            cmd.cwd(cwd);
444        }
445        if let Some(env) = &self.env {
446            cmd.env_clear();
447            for (key, value) in env {
448                cmd.env(key, value);
449            }
450        }
451
452        let reader = pair.master.try_clone_reader().map_err(|e| PtyError::Spawn(e.to_string()))?;
453        let writer = pair.master.take_writer().map_err(|e| PtyError::Spawn(e.to_string()))?;
454        let child = pair.slave.spawn_command(cmd).map_err(|e| PtyError::Spawn(e.to_string()))?;
455        #[cfg(windows)]
456        let job = assign_child_to_windows_kill_on_close_job(child.as_raw_handle())?;
457        #[cfg(windows)]
458        apply_windows_pty_priority(child.as_raw_handle(), self.nice)?;
459        let shared = Arc::clone(&self.reader);
460        let echo = Arc::clone(&self.echo);
461        let idle_detector = Arc::clone(&self.idle_detector);
462        let output_bytes = Arc::clone(&self.output_bytes_total);
463        let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
464        thread::spawn(move || {
465            spawn_pty_reader(
466                reader,
467                shared,
468                echo,
469                idle_detector,
470                output_bytes,
471                churn_bytes,
472            );
473        });
474
475        *guard = Some(NativePtyHandles {
476            master: pair.master,
477            writer,
478            child,
479            #[cfg(windows)]
480            _job: job,
481        });
482        Ok(())
483    }
484
485    pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
486        #[cfg(windows)]
487        {
488            pty_windows::respond_to_queries(self, data)
489        }
490
491        #[cfg(unix)]
492        {
493            pty_platform::respond_to_queries(self, data)
494        }
495    }
496
497    pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
498        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::resize");
499        let guard = self.handles.lock().expect("pty handles mutex poisoned");
500        if let Some(handles) = guard.as_ref() {
501            handles
502                .master
503                .resize(PtySize {
504                    rows,
505                    cols,
506                    pixel_width: 0,
507                    pixel_height: 0,
508                })
509                .map_err(|e| PtyError::Other(e.to_string()))?;
510        }
511        Ok(())
512    }
513
514    pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
515        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::send_interrupt");
516        #[cfg(windows)]
517        {
518            pty_windows::send_interrupt(self)
519        }
520
521        #[cfg(unix)]
522        {
523            pty_platform::send_interrupt(self)
524        }
525    }
526
527    pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
528        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::wait");
529        // Fast path: already exited.
530        if let Some(code) = *self.returncode.lock().expect("pty returncode mutex poisoned") {
531            return Ok(code);
532        }
533        let start = Instant::now();
534        loop {
535            if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
536                return Ok(code);
537            }
538            if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
539                return Err(PtyError::Timeout);
540            }
541            thread::sleep(Duration::from_millis(10));
542        }
543    }
544
545    pub fn terminate_impl(&self) -> Result<(), PtyError> {
546        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate");
547        #[cfg(windows)]
548        {
549            pty_windows::terminate(self)
550        }
551
552        #[cfg(unix)]
553        {
554            pty_platform::terminate(self)
555        }
556    }
557
558    pub fn kill_impl(&self) -> Result<(), PtyError> {
559        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill");
560        #[cfg(windows)]
561        {
562            pty_windows::kill(self)
563        }
564
565        #[cfg(unix)]
566        {
567            pty_platform::kill(self)
568        }
569    }
570
571    pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
572        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::terminate_tree");
573        #[cfg(windows)]
574        {
575            pty_windows::terminate_tree(self)
576        }
577
578        #[cfg(unix)]
579        {
580            pty_platform::terminate_tree(self)
581        }
582    }
583
584    pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
585        crate::rp_rust_debug_scope!("running_process_core::NativePtyProcess::kill_tree");
586        #[cfg(windows)]
587        {
588            pty_windows::kill_tree(self)
589        }
590
591        #[cfg(unix)]
592        {
593            pty_platform::kill_tree(self)
594        }
595    }
596
597    /// Get the PID of the child process, if running.
598    pub fn pid(&self) -> Result<Option<u32>, PtyError> {
599        let guard = self.handles.lock().expect("pty handles mutex poisoned");
600        if let Some(handles) = guard.as_ref() {
601            #[cfg(unix)]
602            if let Some(pid) = handles.master.process_group_leader() {
603                if let Ok(pid) = u32::try_from(pid) {
604                    return Ok(Some(pid));
605                }
606            }
607            return Ok(handles.child.process_id());
608        }
609        Ok(None)
610    }
611
612    /// Wait for a chunk of output from the PTY reader.
613    /// Returns `Ok(Some(chunk))` on data, `Ok(None)` on timeout, `Err` on closed.
614    pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
615        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
616        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
617        loop {
618            if let Some(chunk) = guard.chunks.pop_front() {
619                return Ok(Some(chunk));
620            }
621            if guard.closed {
622                return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
623            }
624            match deadline {
625                Some(deadline) => {
626                    let now = Instant::now();
627                    if now >= deadline {
628                        return Ok(None); // timeout
629                    }
630                    let wait = deadline.saturating_duration_since(now);
631                    let result = self
632                        .reader
633                        .condvar
634                        .wait_timeout(guard, wait)
635                        .expect("pty read mutex poisoned");
636                    guard = result.0;
637                }
638                None => {
639                    guard = self
640                        .reader
641                        .condvar
642                        .wait(guard)
643                        .expect("pty read mutex poisoned");
644                }
645            }
646        }
647    }
648
649    /// Wait for the reader thread to close.
650    pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
651        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
652        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
653        loop {
654            if guard.closed {
655                return true;
656            }
657            match deadline {
658                Some(deadline) => {
659                    let now = Instant::now();
660                    if now >= deadline {
661                        return false;
662                    }
663                    let wait = deadline.saturating_duration_since(now);
664                    let result = self
665                        .reader
666                        .condvar
667                        .wait_timeout(guard, wait)
668                        .expect("pty read mutex poisoned");
669                    guard = result.0;
670                }
671                None => {
672                    guard = self
673                        .reader
674                        .condvar
675                        .wait(guard)
676                        .expect("pty read mutex poisoned");
677                }
678            }
679        }
680    }
681
682    /// Wait for exit then drain remaining output.
683    pub fn wait_and_drain_impl(
684        &self,
685        timeout: Option<f64>,
686        drain_timeout: f64,
687    ) -> Result<i32, PtyError> {
688        let code = self.wait_impl(timeout)?;
689        let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
690        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
691        while !guard.closed {
692            let remaining = deadline.saturating_duration_since(Instant::now());
693            if remaining.is_zero() {
694                break;
695            }
696            let result = self
697                .reader
698                .condvar
699                .wait_timeout(guard, remaining)
700                .expect("pty read mutex poisoned");
701            guard = result.0;
702        }
703        Ok(code)
704    }
705
706    pub fn set_echo(&self, enabled: bool) {
707        self.echo.store(enabled, Ordering::Release);
708    }
709
710    pub fn echo_enabled(&self) -> bool {
711        self.echo.load(Ordering::Acquire)
712    }
713
714    pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
715        let mut guard = self
716            .idle_detector
717            .lock()
718            .expect("idle detector mutex poisoned");
719        *guard = Some(Arc::clone(detector));
720    }
721
722    pub fn detach_idle_detector(&self) {
723        let mut guard = self
724            .idle_detector
725            .lock()
726            .expect("idle detector mutex poisoned");
727        *guard = None;
728    }
729
730    pub fn pty_input_bytes_total(&self) -> usize {
731        self.input_bytes_total.load(Ordering::Acquire)
732    }
733
734    pub fn pty_newline_events_total(&self) -> usize {
735        self.newline_events_total.load(Ordering::Acquire)
736    }
737
738    pub fn pty_submit_events_total(&self) -> usize {
739        self.submit_events_total.load(Ordering::Acquire)
740    }
741
742    pub fn pty_output_bytes_total(&self) -> usize {
743        self.output_bytes_total.load(Ordering::Acquire)
744    }
745
746    pub fn pty_control_churn_bytes_total(&self) -> usize {
747        self.control_churn_bytes_total.load(Ordering::Acquire)
748    }
749}
750
751impl Drop for NativePtyProcess {
752    fn drop(&mut self) {
753        self.close_nonblocking();
754    }
755}
756
757// ── Helper functions ──
758
759pub fn control_churn_bytes(data: &[u8]) -> usize {
760    let mut total = 0;
761    let mut index = 0;
762    while index < data.len() {
763        let byte = data[index];
764        if byte == 0x1B {
765            let start = index;
766            index += 1;
767            if index < data.len() && data[index] == b'[' {
768                index += 1;
769                while index < data.len() {
770                    let current = data[index];
771                    index += 1;
772                    if (0x40..=0x7E).contains(&current) {
773                        break;
774                    }
775                }
776            }
777            total += index - start;
778            continue;
779        }
780        if matches!(byte, 0x08 | 0x0D | 0x7F) {
781            total += 1;
782        }
783        index += 1;
784    }
785    total
786}
787
788pub fn command_builder_from_argv(argv: &[String]) -> CommandBuilder {
789    let mut command = CommandBuilder::new(&argv[0]);
790    if argv.len() > 1 {
791        command.args(
792            argv[1..]
793                .iter()
794                .map(OsString::from)
795                .collect::<Vec<OsString>>(),
796        );
797    }
798    command
799}
800
801#[inline(never)]
802pub fn spawn_pty_reader(
803    mut reader: Box<dyn Read + Send>,
804    shared: Arc<PtyReadShared>,
805    echo: Arc<AtomicBool>,
806    idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
807    output_bytes_total: Arc<AtomicUsize>,
808    control_churn_bytes_total: Arc<AtomicUsize>,
809) {
810    crate::rp_rust_debug_scope!("running_process_core::spawn_pty_reader");
811    let idle_detector_snapshot = idle_detector
812        .lock()
813        .expect("idle detector mutex poisoned")
814        .clone();
815    let mut chunk = vec![0_u8; 65536];
816    loop {
817        match reader.read(&mut chunk) {
818            Ok(0) => break,
819            Ok(n) => {
820                let data = &chunk[..n];
821
822                let churn = control_churn_bytes(data);
823                let visible = data.len().saturating_sub(churn);
824                output_bytes_total.fetch_add(visible, Ordering::Relaxed);
825                control_churn_bytes_total.fetch_add(churn, Ordering::Relaxed);
826
827                if echo.load(Ordering::Relaxed) {
828                    let _ = std::io::stdout().write_all(data);
829                    let _ = std::io::stdout().flush();
830                }
831
832                if let Some(ref detector) = idle_detector_snapshot {
833                    detector.record_output(data);
834                }
835
836                let mut guard = shared.state.lock().expect("pty read mutex poisoned");
837                guard.chunks.push_back(data.to_vec());
838                shared.condvar.notify_all();
839            }
840            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
841            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
842                thread::sleep(Duration::from_millis(10));
843                continue;
844            }
845            Err(_) => break,
846        }
847    }
848    let mut guard = shared.state.lock().expect("pty read mutex poisoned");
849    guard.closed = true;
850    shared.condvar.notify_all();
851}
852
853pub fn portable_exit_code(status: portable_pty::ExitStatus) -> i32 {
854    if let Some(signal) = status.signal() {
855        let signal = signal.to_ascii_lowercase();
856        if signal.contains("interrupt") {
857            return -2;
858        }
859        if signal.contains("terminated") {
860            return -15;
861        }
862        if signal.contains("killed") {
863            return -9;
864        }
865    }
866    status.exit_code() as i32
867}
868
869pub fn input_contains_newline(data: &[u8]) -> bool {
870    data.iter().any(|byte| matches!(*byte, b'\r' | b'\n'))
871}
872
873pub fn record_pty_input_metrics(
874    input_bytes_total: &Arc<AtomicUsize>,
875    newline_events_total: &Arc<AtomicUsize>,
876    submit_events_total: &Arc<AtomicUsize>,
877    data: &[u8],
878    submit: bool,
879) {
880    input_bytes_total.fetch_add(data.len(), Ordering::AcqRel);
881    if input_contains_newline(data) {
882        newline_events_total.fetch_add(1, Ordering::AcqRel);
883    }
884    if submit {
885        submit_events_total.fetch_add(1, Ordering::AcqRel);
886    }
887}
888
889pub fn store_pty_returncode(returncode: &Arc<Mutex<Option<i32>>>, code: i32) {
890    *returncode.lock().expect("pty returncode mutex poisoned") = Some(code);
891}
892
893pub fn poll_pty_process(
894    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
895    returncode: &Arc<Mutex<Option<i32>>>,
896) -> Result<Option<i32>, std::io::Error> {
897    let mut guard = handles.lock().expect("pty handles mutex poisoned");
898    let Some(handles) = guard.as_mut() else {
899        return Ok(*returncode.lock().expect("pty returncode mutex poisoned"));
900    };
901    let status = handles.child.try_wait()?;
902    let code = status.map(portable_exit_code);
903    if let Some(code) = code {
904        store_pty_returncode(returncode, code);
905        return Ok(Some(code));
906    }
907    Ok(None)
908}
909
910pub fn write_pty_input(
911    handles: &Arc<Mutex<Option<NativePtyHandles>>>,
912    data: &[u8],
913) -> Result<(), std::io::Error> {
914    let mut guard = handles.lock().expect("pty handles mutex poisoned");
915    let handles = guard.as_mut().ok_or_else(|| {
916        std::io::Error::new(
917            std::io::ErrorKind::NotConnected,
918            "Pseudo-terminal process is not running",
919        )
920    })?;
921    #[cfg(windows)]
922    let payload = pty_windows::input_payload(data);
923    #[cfg(unix)]
924    let payload = pty_platform::input_payload(data);
925    handles.writer.write_all(&payload)?;
926    handles.writer.flush()
927}
928
929#[cfg(windows)]
930pub fn windows_terminal_input_payload(data: &[u8]) -> Vec<u8> {
931    let mut translated = Vec::with_capacity(data.len());
932    let mut index = 0usize;
933    while index < data.len() {
934        let current = data[index];
935        if current == b'\r' {
936            translated.push(current);
937            if index + 1 < data.len() && data[index + 1] == b'\n' {
938                translated.push(b'\n');
939                index += 2;
940                continue;
941            }
942            index += 1;
943            continue;
944        }
945        if current == b'\n' {
946            translated.push(b'\r');
947            index += 1;
948            continue;
949        }
950        translated.push(current);
951        index += 1;
952    }
953    translated
954}
955
956#[cfg(windows)]
957#[inline(never)]
958pub fn assign_child_to_windows_kill_on_close_job(
959    handle: Option<std::os::windows::io::RawHandle>,
960) -> Result<WindowsJobHandle, PtyError> {
961    crate::rp_rust_debug_scope!(
962        "running_process_core::pty::assign_child_to_windows_kill_on_close_job"
963    );
964    use std::mem::zeroed;
965
966    use winapi::shared::minwindef::FALSE;
967    use winapi::um::handleapi::INVALID_HANDLE_VALUE;
968    use winapi::um::jobapi2::{
969        AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
970    };
971    use winapi::um::winnt::{
972        JobObjectExtendedLimitInformation, JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
973        JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
974    };
975
976    let Some(handle) = handle else {
977        return Err(PtyError::Other(
978            "Pseudo-terminal child does not expose a Windows process handle".into(),
979        ));
980    };
981
982    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
983    if job.is_null() || job == INVALID_HANDLE_VALUE {
984        return Err(PtyError::Io(std::io::Error::last_os_error()));
985    }
986
987    let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { zeroed() };
988    info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
989    let result = unsafe {
990        SetInformationJobObject(
991            job,
992            JobObjectExtendedLimitInformation,
993            (&mut info as *mut JOBOBJECT_EXTENDED_LIMIT_INFORMATION).cast(),
994            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>() as u32,
995        )
996    };
997    if result == FALSE {
998        let err = std::io::Error::last_os_error();
999        unsafe {
1000            winapi::um::handleapi::CloseHandle(job);
1001        }
1002        return Err(PtyError::Io(err));
1003    }
1004
1005    let result = unsafe { AssignProcessToJobObject(job, handle.cast()) };
1006    if result == FALSE {
1007        let err = std::io::Error::last_os_error();
1008        unsafe {
1009            winapi::um::handleapi::CloseHandle(job);
1010        }
1011        return Err(PtyError::Io(err));
1012    }
1013
1014    Ok(WindowsJobHandle(job as usize))
1015}
1016
1017#[cfg(windows)]
1018#[inline(never)]
1019pub fn apply_windows_pty_priority(
1020    handle: Option<std::os::windows::io::RawHandle>,
1021    nice: Option<i32>,
1022) -> Result<(), PtyError> {
1023    crate::rp_rust_debug_scope!("running_process_core::pty::apply_windows_pty_priority");
1024    use winapi::um::processthreadsapi::SetPriorityClass;
1025    use winapi::um::winbase::{
1026        ABOVE_NORMAL_PRIORITY_CLASS, BELOW_NORMAL_PRIORITY_CLASS, HIGH_PRIORITY_CLASS,
1027        IDLE_PRIORITY_CLASS,
1028    };
1029
1030    let Some(handle) = handle else {
1031        return Ok(());
1032    };
1033    let flags = match nice {
1034        Some(value) if value >= 15 => IDLE_PRIORITY_CLASS,
1035        Some(value) if value >= 1 => BELOW_NORMAL_PRIORITY_CLASS,
1036        Some(value) if value <= -15 => HIGH_PRIORITY_CLASS,
1037        Some(value) if value <= -1 => ABOVE_NORMAL_PRIORITY_CLASS,
1038        _ => 0,
1039    };
1040    if flags == 0 {
1041        return Ok(());
1042    }
1043    let result = unsafe { SetPriorityClass(handle.cast(), flags) };
1044    if result == 0 {
1045        return Err(PtyError::Io(std::io::Error::last_os_error()));
1046    }
1047    Ok(())
1048}