Skip to main content

running_process/pty/
native_pty_process.rs

1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7#[cfg(unix)]
8use super::backend::PtySlave;
9use super::backend::{Backend, PtyBackend, PtyChild, PtyMaster, PtySize};
10#[cfg(unix)]
11use super::posix_terminal_input_relay_worker;
12#[cfg(windows)]
13use super::{
14    apply_windows_pty_priority, assign_child_to_windows_kill_on_close_job,
15    assign_conpty_conhost_to_job, conhost_children_of_current_process,
16};
17use super::{
18    is_ignorable_process_control_error, poll_pty_process, record_pty_input_metrics,
19    spawn_pty_reader, store_pty_returncode, write_pty_input, IdleDetectorCore, NativePtyHandles,
20    PtyError, PtyReadShared, PtyReadState,
21};
22
23#[cfg(unix)]
24use super::pty_posix as pty_platform;
25#[cfg(windows)]
26use super::pty_windows;
27
/// Low-level native pseudo-terminal process wrapper.
///
/// The process is configured at construction time and is spawned by
/// [`Self::start_impl`]. Output is collected by a reader thread and exposed
/// through the chunk-reading methods.
///
/// Most state lives behind `Arc`s so it can be shared with the background
/// reader and terminal-input relay threads that the methods spawn.
pub struct NativePtyProcess {
    /// Command argv, including the executable as the first element.
    ///
    /// [`Self::new`] rejects an empty vector.
    pub argv: Vec<String>,
    /// Working directory used when spawning the child, or the current directory.
    ///
    /// `None` is resolved at spawn time via `resolved_spawn_cwd`, which falls
    /// back to the current directory of this process.
    pub cwd: Option<String>,
    /// Environment overrides passed to the child process.
    pub env: Option<Vec<(String, String)>>,
    /// Initial PTY row count.
    pub rows: u16,
    /// Initial PTY column count.
    pub cols: u16,
    /// Optional Windows process priority hint for the PTY child.
    ///
    /// The field only exists on Windows builds; it is applied in
    /// [`Self::start_impl`] right after the child is spawned.
    #[cfg(windows)]
    pub nice: Option<i32>,
    /// Native PTY handles for the running child, present after start.
    ///
    /// Set by [`Self::start_impl`] and taken (left as `None`) by
    /// [`Self::close_impl`] and [`Self::close_nonblocking`].
    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
    /// Shared reader queue and condition variable for PTY output.
    pub reader: Arc<PtyReadShared>,
    /// Cached child exit code once the process has exited.
    ///
    /// When set, [`Self::wait_impl`] returns it without polling the child.
    pub returncode: Arc<Mutex<Option<i32>>>,
    /// Total bytes written to the PTY input stream.
    pub input_bytes_total: Arc<AtomicUsize>,
    /// Count of input writes containing a newline.
    pub newline_events_total: Arc<AtomicUsize>,
    /// Count of explicit submit events recorded for PTY input.
    pub submit_events_total: Arc<AtomicUsize>,
    /// When true, the reader thread writes PTY output to stdout.
    pub echo: Arc<AtomicBool>,
    /// When set, the reader thread feeds output directly to the idle detector.
    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
    /// Visible (non-control) output bytes seen by the reader thread.
    pub output_bytes_total: Arc<AtomicUsize>,
    /// Control churn bytes (ANSI escapes, BS, CR, DEL) seen by the reader.
    pub control_churn_bytes_total: Arc<AtomicUsize>,
    /// Background worker that drains PTY output into the shared queue.
    ///
    /// Installed by [`Self::start_impl`] and joined by [`Self::close_impl`].
    pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
    /// Stop flag observed by the terminal input relay worker.
    pub terminal_input_relay_stop: Arc<AtomicBool>,
    /// Whether the terminal input relay worker is currently active.
    pub terminal_input_relay_active: Arc<AtomicBool>,
    /// Background worker that forwards local terminal input into the PTY.
    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
}
76
77pub(super) fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
78    cwd.map(str::to_owned).or_else(|| {
79        std::env::current_dir()
80            .ok()
81            .map(|cwd| cwd.to_string_lossy().to_string())
82    })
83}
84
85impl NativePtyProcess {
86    /// Create a pseudo-terminal process configuration.
87    ///
88    /// The child is not spawned until [`Self::start_impl`] is called.
89    pub fn new(
90        argv: Vec<String>,
91        cwd: Option<String>,
92        env: Option<Vec<(String, String)>>,
93        rows: u16,
94        cols: u16,
95        nice: Option<i32>,
96    ) -> Result<Self, PtyError> {
97        if argv.is_empty() {
98            return Err(PtyError::Other("command cannot be empty".into()));
99        }
100        #[cfg(not(windows))]
101        let _ = nice;
102        Ok(Self {
103            argv,
104            cwd,
105            env,
106            rows,
107            cols,
108            #[cfg(windows)]
109            nice,
110            handles: Arc::new(Mutex::new(None)),
111            reader: Arc::new(PtyReadShared {
112                state: Mutex::new(PtyReadState {
113                    chunks: VecDeque::new(),
114                    closed: false,
115                }),
116                condvar: Condvar::new(),
117            }),
118            returncode: Arc::new(Mutex::new(None)),
119            input_bytes_total: Arc::new(AtomicUsize::new(0)),
120            newline_events_total: Arc::new(AtomicUsize::new(0)),
121            submit_events_total: Arc::new(AtomicUsize::new(0)),
122            echo: Arc::new(AtomicBool::new(false)),
123            idle_detector: Arc::new(Mutex::new(None)),
124            output_bytes_total: Arc::new(AtomicUsize::new(0)),
125            control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
126            reader_worker: Mutex::new(None),
127            terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
128            terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
129            terminal_input_relay_worker: Mutex::new(None),
130        })
131    }
132
133    /// Mark the reader stream closed and wake all waiting readers.
134    pub fn mark_reader_closed(&self) {
135        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
136        guard.closed = true;
137        self.reader.condvar.notify_all();
138    }
139
    /// Store the process return code if it has been observed.
    ///
    /// Delegates to `store_pty_returncode` with this process's shared
    /// `returncode` slot.
    pub fn store_returncode(&self, code: i32) {
        store_pty_returncode(&self.returncode, code);
    }
144
145    pub(super) fn join_reader_worker(&self) {
146        if let Some(worker) = self
147            .reader_worker
148            .lock()
149            .expect("pty reader worker mutex poisoned")
150            .take()
151        {
152            let _ = worker.join();
153        }
154    }
155
    /// Record PTY input byte, newline, and submit counters.
    ///
    /// Forwards `data` and `submit` to `record_pty_input_metrics` together
    /// with this process's three shared input counters.
    pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
        record_pty_input_metrics(
            &self.input_bytes_total,
            &self.newline_events_total,
            &self.submit_events_total,
            data,
            submit,
        );
    }
166
    /// Write bytes to the PTY input stream and record input metrics.
    ///
    /// Metrics are recorded before the write is attempted, so the counters
    /// also reflect input whose write subsequently failed.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `write_pty_input`.
    pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
        self.record_input_metrics(data, submit);
        write_pty_input(&self.handles, data)?;
        Ok(())
    }
173
    /// Signal the terminal input relay worker to stop.
    ///
    /// Sets the stop flag and clears the active flag; it does not wait for
    /// the worker thread to exit (see [`Self::stop_terminal_input_relay_impl`]
    /// for the joining variant).
    pub fn request_terminal_input_relay_stop(&self) {
        self.terminal_input_relay_stop
            .store(true, Ordering::Release);
        self.terminal_input_relay_active
            .store(false, Ordering::Release);
    }
181
182    /// Start forwarding local terminal input into the PTY.
183    pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
184        let mut worker_guard = self
185            .terminal_input_relay_worker
186            .lock()
187            .expect("pty terminal input relay mutex poisoned");
188        if worker_guard.is_some() && self.terminal_input_relay_active() {
189            return Ok(());
190        }
191        if self
192            .handles
193            .lock()
194            .expect("pty handles mutex poisoned")
195            .is_none()
196        {
197            return Err(PtyError::NotRunning);
198        }
199
200        self.terminal_input_relay_stop
201            .store(false, Ordering::Release);
202        self.terminal_input_relay_active
203            .store(true, Ordering::Release);
204
205        let handles = Arc::clone(&self.handles);
206        let returncode = Arc::clone(&self.returncode);
207        let input_bytes_total = Arc::clone(&self.input_bytes_total);
208        let newline_events_total = Arc::clone(&self.newline_events_total);
209        let submit_events_total = Arc::clone(&self.submit_events_total);
210        let stop = Arc::clone(&self.terminal_input_relay_stop);
211        let active = Arc::clone(&self.terminal_input_relay_active);
212
213        #[cfg(windows)]
214        {
215            let capture = super::terminal_input::TerminalInputCore::new();
216            capture.start_impl().map_err(PtyError::Io)?;
217            *worker_guard = Some(thread::spawn(move || {
218                loop {
219                    if stop.load(Ordering::Acquire) {
220                        break;
221                    }
222                    match poll_pty_process(&handles, &returncode) {
223                        Ok(Some(_)) => break,
224                        Ok(None) => {}
225                        Err(_) => break,
226                    }
227                    match super::terminal_input::wait_for_terminal_input_event(
228                        &capture.state,
229                        &capture.condvar,
230                        Some(Duration::from_millis(50)),
231                    ) {
232                        super::terminal_input::TerminalInputWaitOutcome::Event(event) => {
233                            record_pty_input_metrics(
234                                &input_bytes_total,
235                                &newline_events_total,
236                                &submit_events_total,
237                                &event.data,
238                                event.submit,
239                            );
240                            if write_pty_input(&handles, &event.data).is_err() {
241                                break;
242                            }
243                        }
244                        super::terminal_input::TerminalInputWaitOutcome::Timeout => continue,
245                        super::terminal_input::TerminalInputWaitOutcome::Closed => break,
246                    }
247                }
248                active.store(false, Ordering::Release);
249                let _ = capture.stop_impl();
250            }));
251            Ok(())
252        }
253
254        #[cfg(unix)]
255        {
256            if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
257                self.terminal_input_relay_active
258                    .store(false, Ordering::Release);
259                return Ok(());
260            }
261
262            *worker_guard = Some(thread::spawn(move || {
263                posix_terminal_input_relay_worker(
264                    handles,
265                    returncode,
266                    input_bytes_total,
267                    newline_events_total,
268                    submit_events_total,
269                    stop,
270                    active,
271                );
272            }));
273            Ok(())
274        }
275    }
276
277    /// Stop the terminal input relay worker and wait for it to exit.
278    pub fn stop_terminal_input_relay_impl(&self) {
279        self.request_terminal_input_relay_stop();
280        if let Some(worker) = self
281            .terminal_input_relay_worker
282            .lock()
283            .expect("pty terminal input relay mutex poisoned")
284            .take()
285        {
286            let _ = worker.join();
287        }
288    }
289
    /// Return whether the terminal input relay worker is active.
    ///
    /// Reads the shared active flag with `Acquire` ordering.
    pub fn terminal_input_relay_active(&self) -> bool {
        self.terminal_input_relay_active.load(Ordering::Acquire)
    }
294
    /// Synchronously tear down the PTY and reap the child.
    ///
    /// Stops the terminal input relay, takes the native handles out of
    /// `self.handles`, waits for the child, stores its return code, joins the
    /// reader worker and marks the reader closed. When no handles are present
    /// the reader is simply marked closed and `Ok(())` is returned.
    ///
    /// The teardown order differs per platform; see the inline comments.
    ///
    /// # Errors
    ///
    /// On Windows, returns [`PtyError::Io`] when the child outlives the grace
    /// period and killing it fails with a non-ignorable error. In that case
    /// the function returns before the reader is joined or marked closed.
    #[inline(never)]
    pub fn close_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_impl");
        self.stop_terminal_input_relay_impl();
        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
        let Some(handles) = guard.take() else {
            self.mark_reader_closed();
            return Ok(());
        };
        // Release the handles lock before the (potentially slow) teardown.
        drop(guard);

        #[cfg(windows)]
        let NativePtyHandles {
            master,
            writer,
            mut child,
            _job,
        } = handles;
        #[cfg(not(windows))]
        let NativePtyHandles {
            master,
            writer,
            mut child,
        } = handles;

        #[cfg(windows)]
        {
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.drop_job"
                );
                // NOTE(review): presumably the kill-on-close job created in
                // `start_impl`, so dropping it is what asks the OS to end the
                // child — confirm against the job helper.
                drop(_job);
            }

            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.wait_job_exit"
                );
                // Give the child up to two seconds to exit before killing it.
                let wait_deadline = Instant::now() + Duration::from_secs(2);
                loop {
                    match child.try_wait() {
                        Ok(Some(status)) => {
                            // NOTE(review): `status`'s type is not visible
                            // here; `as i32` may wrap large values — confirm.
                            let code = status as i32;
                            self.store_returncode(code);
                            break;
                        }
                        Ok(None) if Instant::now() < wait_deadline => {
                            // #199: intentional — `PtyChild` doesn't
                            // expose a "wait with timeout" method
                            // (portable-pty's Child trait doesn't
                            // either). Polling at 10ms inside a
                            // bounded 2-second deadline is the
                            // close-path graceful-exit watcher.
                            thread::sleep(Duration::from_millis(10));
                        }
                        Ok(None) => {
                            // Grace period elapsed: kill, then reap.
                            if let Err(err) = child.kill() {
                                if !is_ignorable_process_control_error(&err) {
                                    return Err(PtyError::Io(err));
                                }
                            }
                            // -9 is the fallback code when the exit status
                            // cannot be obtained.
                            let code = match child.wait() {
                                Ok(status) => status as i32,
                                Err(_) => -9,
                            };
                            self.store_returncode(code);
                            break;
                        }
                        Err(_) => {
                            self.store_returncode(-9);
                            break;
                        }
                    }
                }
            }
            // The PTY ends are released only after the child has been reaped.
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.drop_writer"
                );
                drop(writer);
            }
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.drop_master"
                );
                drop(master);
            }
            drop(child);
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.join_reader"
                );
                self.join_reader_worker();
            }
            self.mark_reader_closed();
            Ok(())
        }

        #[cfg(not(windows))]
        {
            // Off Windows the PTY ends are released first, then the child is
            // waited on. The wait has no timeout, so this blocks until the
            // child has been reaped.
            drop(writer);
            drop(master);

            let code = {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.wait_child"
                );
                // -9 is the fallback code when the exit status cannot be
                // obtained.
                match child.wait() {
                    Ok(status) => status as i32,
                    Err(_) => -9,
                }
            };
            drop(child);

            self.store_returncode(code);
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.join_reader"
                );
                self.join_reader_worker();
            }
            self.mark_reader_closed();
            Ok(())
        }
    }
421
422    /// Best-effort, non-blocking teardown for use from `Drop`.
423    #[inline(never)]
424    pub fn close_nonblocking(&self) {
425        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_nonblocking");
426        #[cfg(windows)]
427        self.request_terminal_input_relay_stop();
428        let Ok(mut guard) = self.handles.lock() else {
429            return;
430        };
431        let Some(handles) = guard.take() else {
432            self.mark_reader_closed();
433            return;
434        };
435        drop(guard);
436
437        #[cfg(windows)]
438        let NativePtyHandles {
439            master,
440            writer,
441            mut child,
442            _job,
443        } = handles;
444        #[cfg(not(windows))]
445        let NativePtyHandles {
446            master,
447            writer,
448            mut child,
449        } = handles;
450
451        if let Err(err) = child.kill() {
452            if !is_ignorable_process_control_error(&err) {
453                return;
454            }
455        }
456        drop(writer);
457        drop(master);
458        drop(child);
459        #[cfg(windows)]
460        drop(_job);
461        self.mark_reader_closed();
462    }
463
464    /// Spawn the configured child process inside a native PTY.
465    pub fn start_impl(&self) -> Result<(), PtyError> {
466        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::start");
467        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
468        if guard.is_some() {
469            return Err(PtyError::AlreadyStarted);
470        }
471
472        // Snapshot our conhost.exe children before openpty() so we can diff
473        // after spawn to find the new conhost.exe created by ConPTY.
474        #[cfg(windows)]
475        let conhost_pids_before = conhost_children_of_current_process();
476
477        let (mut master, slave) = Backend::openpty(PtySize {
478            rows: self.rows,
479            cols: self.cols,
480            pixel_width: 0,
481            pixel_height: 0,
482        })
483        .map_err(|e| PtyError::Spawn(e.to_string()))?;
484
485        // Build argv/cwd/env in the shape the backend wants.
486        let argv: Vec<std::ffi::OsString> =
487            self.argv.iter().map(std::ffi::OsString::from).collect();
488        let cwd = resolved_spawn_cwd(self.cwd.as_deref());
489        let env: Option<Vec<(std::ffi::OsString, std::ffi::OsString)>> =
490            self.env.as_ref().map(|e| {
491                e.iter()
492                    .map(|(k, v)| (std::ffi::OsString::from(k), std::ffi::OsString::from(v)))
493                    .collect()
494            });
495
496        let reader = master
497            .try_clone_reader()
498            .map_err(|e| PtyError::Spawn(e.to_string()))?;
499        let writer = master
500            .take_writer()
501            .map_err(|e| PtyError::Spawn(e.to_string()))?;
502        let cwd_path = cwd.as_deref().map(std::path::Path::new);
503        let child = slave
504            .spawn(&argv, cwd_path, env.as_deref())
505            .map_err(|e| PtyError::Spawn(e.to_string()))?;
506        // The trait's PtyChild::as_raw_handle returns Option<RawHandle>
507        // matching portable_pty's signature; pass directly.
508        #[cfg(windows)]
509        let job = assign_child_to_windows_kill_on_close_job(PtyChild::as_raw_handle(&child))?;
510        #[cfg(windows)]
511        assign_conpty_conhost_to_job(&job, &conhost_pids_before);
512        #[cfg(windows)]
513        apply_windows_pty_priority(PtyChild::as_raw_handle(&child), self.nice)?;
514        let shared = Arc::clone(&self.reader);
515        let echo = Arc::clone(&self.echo);
516        let idle_detector = Arc::clone(&self.idle_detector);
517        let output_bytes = Arc::clone(&self.output_bytes_total);
518        let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
519        let reader_worker = thread::spawn(move || {
520            spawn_pty_reader(
521                reader,
522                shared,
523                echo,
524                idle_detector,
525                output_bytes,
526                churn_bytes,
527            );
528        });
529        *self
530            .reader_worker
531            .lock()
532            .expect("pty reader worker mutex poisoned") = Some(reader_worker);
533
534        *guard = Some(NativePtyHandles {
535            master: Box::new(master) as Box<dyn PtyMaster>,
536            writer,
537            child: Box::new(child) as Box<dyn PtyChild>,
538            #[cfg(windows)]
539            _job: job,
540        });
541        Ok(())
542    }
543
    /// Respond to terminal query escape sequences found in a PTY output chunk.
    ///
    /// Dispatches to the platform module's `respond_to_queries`
    /// (`pty_windows` on Windows, `pty_platform` on Unix) and returns its
    /// result unchanged.
    pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
        #[cfg(windows)]
        {
            pty_windows::respond_to_queries(self, data)
        }

        #[cfg(unix)]
        {
            pty_platform::respond_to_queries(self, data)
        }
    }
556
557    /// Resize the PTY to the given row and column dimensions.
558    pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
559        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::resize");
560        let guard = self.handles.lock().expect("pty handles mutex poisoned");
561        if let Some(handles) = guard.as_ref() {
562            #[cfg(windows)]
563            {
564                let _ = (rows, cols, handles);
565                // ConPTY resize can leave ClosePseudoConsole blocked during
566                // teardown on Windows. Keep resize as a no-op until the
567                // backend can cancel the outstanding PTY read safely.
568                return Ok(());
569            }
570
571            #[cfg(not(windows))]
572            handles
573                .master
574                .resize(PtySize {
575                    rows,
576                    cols,
577                    pixel_width: 0,
578                    pixel_height: 0,
579                })
580                .map_err(|e| PtyError::Other(e.to_string()))?;
581        }
582        Ok(())
583    }
584
    /// Send an interrupt signal or control event to the PTY child.
    ///
    /// Dispatches to the platform module's `send_interrupt`
    /// (`pty_windows` on Windows, `pty_platform` on Unix) and returns its
    /// result unchanged.
    pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::send_interrupt");
        #[cfg(windows)]
        {
            pty_windows::send_interrupt(self)
        }

        #[cfg(unix)]
        {
            pty_platform::send_interrupt(self)
        }
    }
598
599    /// Wait for the PTY child to exit and return its exit code.
600    ///
601    /// Returns a timeout error when `timeout` elapses before exit.
602    pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
603        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::wait");
604        // Fast path: already exited.
605        if let Some(code) = *self
606            .returncode
607            .lock()
608            .expect("pty returncode mutex poisoned")
609        {
610            return Ok(code);
611        }
612        let start = Instant::now();
613        loop {
614            if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
615                return Ok(code);
616            }
617            if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
618                return Err(PtyError::Timeout);
619            }
620            // #199: intentional — `wait_impl` poll. Same constraint
621            // as the close_impl variant above: no per-Child wait
622            // primitive on the trait surface.
623            thread::sleep(Duration::from_millis(10));
624        }
625    }
626
627    /// Request graceful termination of the PTY child.
628    pub fn terminate_impl(&self) -> Result<(), PtyError> {
629        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate");
630        #[cfg(windows)]
631        {
632            if self
633                .handles
634                .lock()
635                .expect("pty handles mutex poisoned")
636                .is_none()
637            {
638                return Err(PtyError::NotRunning);
639            }
640            self.close_impl()
641        }
642
643        #[cfg(unix)]
644        {
645            pty_platform::terminate(self)
646        }
647    }
648
649    /// Forcefully terminate the PTY child.
650    pub fn kill_impl(&self) -> Result<(), PtyError> {
651        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill");
652        #[cfg(windows)]
653        {
654            if self
655                .handles
656                .lock()
657                .expect("pty handles mutex poisoned")
658                .is_none()
659            {
660                return Err(PtyError::NotRunning);
661            }
662            self.close_impl()
663        }
664
665        #[cfg(unix)]
666        {
667            pty_platform::kill(self)
668        }
669    }
670
    /// Request graceful termination of the PTY child process tree.
    ///
    /// Dispatches to the platform module's `terminate_tree`
    /// (`pty_windows` on Windows, `pty_platform` on Unix) and returns its
    /// result unchanged.
    pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate_tree");
        #[cfg(windows)]
        {
            pty_windows::terminate_tree(self)
        }

        #[cfg(unix)]
        {
            pty_platform::terminate_tree(self)
        }
    }
684
    /// Forcefully terminate the PTY child process tree.
    ///
    /// Dispatches to the platform module's `kill_tree`
    /// (`pty_windows` on Windows, `pty_platform` on Unix) and returns its
    /// result unchanged.
    pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill_tree");
        #[cfg(windows)]
        {
            pty_windows::kill_tree(self)
        }

        #[cfg(unix)]
        {
            pty_platform::kill_tree(self)
        }
    }
698
699    /// Get the PID of the child process, if running.
700    pub fn pid(&self) -> Result<Option<u32>, PtyError> {
701        let guard = self.handles.lock().expect("pty handles mutex poisoned");
702        if let Some(handles) = guard.as_ref() {
703            #[cfg(unix)]
704            if let Some(pid) = handles.master.process_group_leader() {
705                if let Ok(pid) = u32::try_from(pid) {
706                    return Ok(Some(pid));
707                }
708            }
709            return Ok(Some(handles.child.pid()));
710        }
711        Ok(None)
712    }
713
714    /// Wait for a chunk of output from the PTY reader.
715    /// Returns `Ok(Some(chunk))` on data, `Ok(None)` on timeout, `Err` on closed.
716    pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
717        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
718        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
719        loop {
720            if let Some(chunk) = guard.chunks.pop_front() {
721                return Ok(Some(chunk));
722            }
723            if guard.closed {
724                return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
725            }
726            match deadline {
727                Some(deadline) => {
728                    let now = Instant::now();
729                    if now >= deadline {
730                        return Ok(None); // timeout
731                    }
732                    let wait = deadline.saturating_duration_since(now);
733                    let result = self
734                        .reader
735                        .condvar
736                        .wait_timeout(guard, wait)
737                        .expect("pty read mutex poisoned");
738                    guard = result.0;
739                }
740                None => {
741                    guard = self
742                        .reader
743                        .condvar
744                        .wait(guard)
745                        .expect("pty read mutex poisoned");
746                }
747            }
748        }
749    }
750
751    /// Wait for the reader thread to close.
752    pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
753        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
754        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
755        loop {
756            if guard.closed {
757                return true;
758            }
759            match deadline {
760                Some(deadline) => {
761                    let now = Instant::now();
762                    if now >= deadline {
763                        return false;
764                    }
765                    let wait = deadline.saturating_duration_since(now);
766                    let result = self
767                        .reader
768                        .condvar
769                        .wait_timeout(guard, wait)
770                        .expect("pty read mutex poisoned");
771                    guard = result.0;
772                }
773                None => {
774                    guard = self
775                        .reader
776                        .condvar
777                        .wait(guard)
778                        .expect("pty read mutex poisoned");
779                }
780            }
781        }
782    }
783
784    /// Wait for exit then drain remaining output.
785    pub fn wait_and_drain_impl(
786        &self,
787        timeout: Option<f64>,
788        drain_timeout: f64,
789    ) -> Result<i32, PtyError> {
790        let code = self.wait_impl(timeout)?;
791        let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
792        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
793        while !guard.closed {
794            let remaining = deadline.saturating_duration_since(Instant::now());
795            if remaining.is_zero() {
796                break;
797            }
798            let result = self
799                .reader
800                .condvar
801                .wait_timeout(guard, remaining)
802                .expect("pty read mutex poisoned");
803            guard = result.0;
804        }
805        Ok(code)
806    }
807
    /// Enable or disable echoing PTY output to stdout.
    ///
    /// Stores the shared echo flag with `Release` ordering.
    pub fn set_echo(&self, enabled: bool) {
        self.echo.store(enabled, Ordering::Release);
    }
812
    /// Return whether PTY output echoing is enabled.
    ///
    /// Loads the shared echo flag with `Acquire` ordering.
    pub fn echo_enabled(&self) -> bool {
        self.echo.load(Ordering::Acquire)
    }
817
818    /// Attach an idle detector that observes reader-thread output.
819    pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
820        let mut guard = self
821            .idle_detector
822            .lock()
823            .expect("idle detector mutex poisoned");
824        *guard = Some(Arc::clone(detector));
825    }
826
827    /// Detach the current idle detector, if one is attached.
828    pub fn detach_idle_detector(&self) {
829        let mut guard = self
830            .idle_detector
831            .lock()
832            .expect("idle detector mutex poisoned");
833        *guard = None;
834    }
835
836    /// Return total bytes written to PTY input.
837    pub fn pty_input_bytes_total(&self) -> usize {
838        self.input_bytes_total.load(Ordering::Acquire)
839    }
840
841    /// Return the number of PTY input writes containing newlines.
842    pub fn pty_newline_events_total(&self) -> usize {
843        self.newline_events_total.load(Ordering::Acquire)
844    }
845
846    /// Return the number of recorded PTY input submit events.
847    pub fn pty_submit_events_total(&self) -> usize {
848        self.submit_events_total.load(Ordering::Acquire)
849    }
850
851    /// Return visible PTY output bytes observed by the reader thread.
852    pub fn pty_output_bytes_total(&self) -> usize {
853        self.output_bytes_total.load(Ordering::Acquire)
854    }
855
856    /// Return control-churn bytes observed by the reader thread.
857    pub fn pty_control_churn_bytes_total(&self) -> usize {
858        self.control_churn_bytes_total.load(Ordering::Acquire)
859    }
860}
861
/// Feature switches for a real interactive PTY session.
///
/// The [`Default`] value turns on everything a terminal-style session usually
/// needs: output echo, terminal input relay, and automatic replies to PTY
/// queries.
#[derive(Debug, Clone, Copy)]
pub struct InteractivePtyOptions {
    /// When `true`, PTY output is echoed to stdout while the session runs.
    pub echo_output: bool,
    /// When `true`, input from the local terminal is relayed into the PTY.
    pub relay_terminal_input: bool,
    /// When `true`, terminal query escape sequences are answered automatically.
    pub respond_to_queries: bool,
}

impl Default for InteractivePtyOptions {
    /// Build the option set with every interactive helper enabled.
    fn default() -> Self {
        const ENABLED: bool = true;
        Self {
            respond_to_queries: ENABLED,
            relay_terminal_input: ENABLED,
            echo_output: ENABLED,
        }
    }
}
885
/// What a single interactive PTY pump operation produced.
///
/// The [`Default`] value holds no chunks and reports the stream as still open.
#[derive(Debug, Default)]
pub struct InteractivePtyPumpResult {
    /// Output chunks read from the PTY, in the order they were read.
    pub chunks: Vec<Vec<u8>>,
    /// `true` when the PTY stream was found closed while pumping output.
    pub stream_closed: bool,
}
894
895/// Canonical interactive PTY recipe for downstream Rust consumers.
896///
897/// `NativePtyProcess` remains the low-level primitive. This wrapper owns the
898/// interactive setup that callers commonly forget to assemble correctly.
899pub struct InteractivePtySession {
900    process: NativePtyProcess,
901    options: InteractivePtyOptions,
902}
903
904impl InteractivePtySession {
905    /// Create an interactive PTY session with default options.
906    pub fn new(process: NativePtyProcess) -> Self {
907        Self::with_options(process, InteractivePtyOptions::default())
908    }
909
910    /// Create an interactive PTY session with explicit options.
911    pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
912        Self { process, options }
913    }
914
915    /// Return the wrapped low-level PTY process.
916    pub fn process(&self) -> &NativePtyProcess {
917        &self.process
918    }
919
920    /// Start the wrapped PTY process and configured interactive helpers.
921    pub fn start(&self) -> Result<(), PtyError> {
922        self.process.set_echo(self.options.echo_output);
923        self.process.start_impl()?;
924        if self.options.relay_terminal_input {
925            self.process.start_terminal_input_relay_impl()?;
926        }
927        Ok(())
928    }
929
930    /// Read and optionally drain available PTY output.
931    ///
932    /// When query responses are enabled, terminal queries in each chunk are
933    /// answered before the chunk is returned.
934    pub fn pump_output(
935        &self,
936        timeout: Option<f64>,
937        consume_all: bool,
938    ) -> Result<InteractivePtyPumpResult, PtyError> {
939        let mut pumped = InteractivePtyPumpResult::default();
940        let mut next_timeout = timeout;
941        loop {
942            match self.process.read_chunk_impl(next_timeout) {
943                Ok(Some(chunk)) => {
944                    if self.options.respond_to_queries {
945                        self.process.respond_to_queries_impl(&chunk)?;
946                    }
947                    pumped.chunks.push(chunk);
948                    if !consume_all {
949                        break;
950                    }
951                    next_timeout = Some(0.0);
952                }
953                Ok(None) => break,
954                Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
955                    pumped.stream_closed = true;
956                    break;
957                }
958                Err(err) => return Err(err),
959            }
960        }
961        Ok(pumped)
962    }
963
964    /// Resize the interactive PTY.
965    pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
966        self.process.resize_impl(rows, cols)
967    }
968
969    /// Send an interrupt to the interactive PTY child.
970    pub fn send_interrupt(&self) -> Result<(), PtyError> {
971        self.process.send_interrupt_impl()
972    }
973
974    /// Wait for the interactive PTY child to exit.
975    pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
976        self.process.wait_impl(timeout)
977    }
978
979    /// Wait for the child to exit, then drain remaining PTY output.
980    pub fn wait_and_drain(
981        &self,
982        timeout: Option<f64>,
983        drain_timeout: f64,
984    ) -> Result<i32, PtyError> {
985        self.process.wait_and_drain_impl(timeout, drain_timeout)
986    }
987
988    /// Request graceful termination of the interactive PTY child.
989    pub fn terminate(&self) -> Result<(), PtyError> {
990        self.process.terminate_impl()
991    }
992
993    /// Forcefully terminate the interactive PTY child.
994    pub fn kill(&self) -> Result<(), PtyError> {
995        self.process.kill_impl()
996    }
997
998    /// Close the interactive PTY session.
999    pub fn close(&self) -> Result<(), PtyError> {
1000        self.process.close_impl()
1001    }
1002}
1003
1004impl Drop for NativePtyProcess {
1005    fn drop(&mut self) {
1006        self.close_nonblocking();
1007    }
1008}