Skip to main content

running_process/pty/
native_pty_process.rs

1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7use super::backend::{Backend, PtyBackend, PtyChild, PtyMaster, PtySize};
8#[cfg(unix)]
9use super::backend::PtySlave;
10use super::{
11    is_ignorable_process_control_error, poll_pty_process, record_pty_input_metrics,
12    spawn_pty_reader, store_pty_returncode, write_pty_input, IdleDetectorCore, NativePtyHandles,
13    PtyError, PtyReadShared, PtyReadState,
14};
15#[cfg(unix)]
16use super::posix_terminal_input_relay_worker;
17#[cfg(windows)]
18use super::{
19    apply_windows_pty_priority, assign_child_to_windows_kill_on_close_job,
20    assign_conpty_conhost_to_job, conhost_children_of_current_process,
21};
22
23#[cfg(unix)]
24use super::pty_posix as pty_platform;
25#[cfg(windows)]
26use super::pty_windows;
27
/// Low-level handle for one child process attached to a native PTY.
///
/// `NativePtyProcess::new` builds the value without spawning anything;
/// `start_impl` fills `handles` and launches the reader thread, and
/// `close_impl` / `close_nonblocking` tear everything down again. The
/// `Arc`-wrapped fields are the ones cloned into the worker threads.
pub struct NativePtyProcess {
    /// Command line to spawn; `new` rejects an empty vector.
    pub argv: Vec<String>,
    /// Working directory for the child. When `None`, `start_impl` resolves a
    /// fallback through `resolved_spawn_cwd`.
    pub cwd: Option<String>,
    /// Optional environment entries forwarded to the backend's spawn call.
    pub env: Option<Vec<(String, String)>>,
    /// Row count handed to `Backend::openpty` by `start_impl`.
    pub rows: u16,
    /// Column count handed to `Backend::openpty` by `start_impl`.
    pub cols: u16,
    /// Priority hint passed to `apply_windows_pty_priority` after spawn.
    #[cfg(windows)]
    pub nice: Option<i32>,
    /// `Some` while a child is attached: set by `start_impl`, taken by
    /// `close_impl` / `close_nonblocking`.
    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
    /// Chunk queue, closed flag and condvar shared with the reader thread.
    pub reader: Arc<PtyReadShared>,
    /// Cached exit code; `None` until one has been stored.
    pub returncode: Arc<Mutex<Option<i32>>>,
    /// Input counter maintained by `record_pty_input_metrics`.
    pub input_bytes_total: Arc<AtomicUsize>,
    /// Input counter maintained by `record_pty_input_metrics`.
    pub newline_events_total: Arc<AtomicUsize>,
    /// Input counter maintained by `record_pty_input_metrics`.
    pub submit_events_total: Arc<AtomicUsize>,
    /// When true, the reader thread writes PTY output to stdout.
    pub echo: Arc<AtomicBool>,
    /// When set, the reader thread feeds output directly to the idle detector.
    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
    /// Visible (non-control) output bytes seen by the reader thread.
    pub output_bytes_total: Arc<AtomicUsize>,
    /// Control churn bytes (ANSI escapes, BS, CR, DEL) seen by the reader.
    pub control_churn_bytes_total: Arc<AtomicUsize>,
    /// Join handle of the reader thread spawned by `start_impl`; taken by
    /// `join_reader_worker`.
    pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
    /// Set to ask the terminal-input relay worker to exit.
    pub terminal_input_relay_stop: Arc<AtomicBool>,
    /// True while a terminal-input relay is considered to be running.
    pub terminal_input_relay_active: Arc<AtomicBool>,
    /// Join handle of the relay thread, if one was spawned.
    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
}
55
56pub(super) fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
57    cwd.map(str::to_owned).or_else(|| {
58        std::env::current_dir()
59            .ok()
60            .map(|cwd| cwd.to_string_lossy().to_string())
61    })
62}
63
64impl NativePtyProcess {
65    pub fn new(
66        argv: Vec<String>,
67        cwd: Option<String>,
68        env: Option<Vec<(String, String)>>,
69        rows: u16,
70        cols: u16,
71        nice: Option<i32>,
72    ) -> Result<Self, PtyError> {
73        if argv.is_empty() {
74            return Err(PtyError::Other("command cannot be empty".into()));
75        }
76        #[cfg(not(windows))]
77        let _ = nice;
78        Ok(Self {
79            argv,
80            cwd,
81            env,
82            rows,
83            cols,
84            #[cfg(windows)]
85            nice,
86            handles: Arc::new(Mutex::new(None)),
87            reader: Arc::new(PtyReadShared {
88                state: Mutex::new(PtyReadState {
89                    chunks: VecDeque::new(),
90                    closed: false,
91                }),
92                condvar: Condvar::new(),
93            }),
94            returncode: Arc::new(Mutex::new(None)),
95            input_bytes_total: Arc::new(AtomicUsize::new(0)),
96            newline_events_total: Arc::new(AtomicUsize::new(0)),
97            submit_events_total: Arc::new(AtomicUsize::new(0)),
98            echo: Arc::new(AtomicBool::new(false)),
99            idle_detector: Arc::new(Mutex::new(None)),
100            output_bytes_total: Arc::new(AtomicUsize::new(0)),
101            control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
102            reader_worker: Mutex::new(None),
103            terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
104            terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
105            terminal_input_relay_worker: Mutex::new(None),
106        })
107    }
108
109    pub fn mark_reader_closed(&self) {
110        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
111        guard.closed = true;
112        self.reader.condvar.notify_all();
113    }
114
    /// Records `code` in the shared return-code slot by delegating to
    /// `store_pty_returncode`.
    pub fn store_returncode(&self, code: i32) {
        store_pty_returncode(&self.returncode, code);
    }
118
119    pub(super) fn join_reader_worker(&self) {
120        if let Some(worker) = self
121            .reader_worker
122            .lock()
123            .expect("pty reader worker mutex poisoned")
124            .take()
125        {
126            let _ = worker.join();
127        }
128    }
129
    /// Forwards `data` and the `submit` flag, together with this process's
    /// three input counters, to `record_pty_input_metrics`.
    pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
        record_pty_input_metrics(
            &self.input_bytes_total,
            &self.newline_events_total,
            &self.submit_events_total,
            data,
            submit,
        );
    }
139
    /// Records input metrics for `data`, then writes it to the PTY through
    /// `write_pty_input`.
    ///
    /// Metrics are recorded before the write is attempted, so the counters
    /// advance even when the write itself returns an error.
    ///
    /// # Errors
    /// Propagates any error returned by `write_pty_input`.
    pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
        self.record_input_metrics(data, submit);
        write_pty_input(&self.handles, data)?;
        Ok(())
    }
145
    /// Signals the terminal-input relay to stop without waiting for it.
    ///
    /// Sets the stop flag and clears the active flag; the worker thread is
    /// not joined here (`stop_terminal_input_relay_impl` does that).
    pub fn request_terminal_input_relay_stop(&self) {
        self.terminal_input_relay_stop
            .store(true, Ordering::Release);
        self.terminal_input_relay_active
            .store(false, Ordering::Release);
    }
152
153    pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
154        let mut worker_guard = self
155            .terminal_input_relay_worker
156            .lock()
157            .expect("pty terminal input relay mutex poisoned");
158        if worker_guard.is_some() && self.terminal_input_relay_active() {
159            return Ok(());
160        }
161        if self
162            .handles
163            .lock()
164            .expect("pty handles mutex poisoned")
165            .is_none()
166        {
167            return Err(PtyError::NotRunning);
168        }
169
170        self.terminal_input_relay_stop
171            .store(false, Ordering::Release);
172        self.terminal_input_relay_active
173            .store(true, Ordering::Release);
174
175        let handles = Arc::clone(&self.handles);
176        let returncode = Arc::clone(&self.returncode);
177        let input_bytes_total = Arc::clone(&self.input_bytes_total);
178        let newline_events_total = Arc::clone(&self.newline_events_total);
179        let submit_events_total = Arc::clone(&self.submit_events_total);
180        let stop = Arc::clone(&self.terminal_input_relay_stop);
181        let active = Arc::clone(&self.terminal_input_relay_active);
182
183        #[cfg(windows)]
184        {
185            let capture = super::terminal_input::TerminalInputCore::new();
186            capture.start_impl().map_err(PtyError::Io)?;
187            *worker_guard = Some(thread::spawn(move || {
188                loop {
189                    if stop.load(Ordering::Acquire) {
190                        break;
191                    }
192                    match poll_pty_process(&handles, &returncode) {
193                        Ok(Some(_)) => break,
194                        Ok(None) => {}
195                        Err(_) => break,
196                    }
197                    match super::terminal_input::wait_for_terminal_input_event(
198                        &capture.state,
199                        &capture.condvar,
200                        Some(Duration::from_millis(50)),
201                    ) {
202                        super::terminal_input::TerminalInputWaitOutcome::Event(event) => {
203                            record_pty_input_metrics(
204                                &input_bytes_total,
205                                &newline_events_total,
206                                &submit_events_total,
207                                &event.data,
208                                event.submit,
209                            );
210                            if write_pty_input(&handles, &event.data).is_err() {
211                                break;
212                            }
213                        }
214                        super::terminal_input::TerminalInputWaitOutcome::Timeout => continue,
215                        super::terminal_input::TerminalInputWaitOutcome::Closed => break,
216                    }
217                }
218                active.store(false, Ordering::Release);
219                let _ = capture.stop_impl();
220            }));
221            Ok(())
222        }
223
224        #[cfg(unix)]
225        {
226            if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
227                self.terminal_input_relay_active
228                    .store(false, Ordering::Release);
229                return Ok(());
230            }
231
232            *worker_guard = Some(thread::spawn(move || {
233                posix_terminal_input_relay_worker(
234                    handles,
235                    returncode,
236                    input_bytes_total,
237                    newline_events_total,
238                    submit_events_total,
239                    stop,
240                    active,
241                );
242            }));
243            Ok(())
244        }
245    }
246
247    pub fn stop_terminal_input_relay_impl(&self) {
248        self.request_terminal_input_relay_stop();
249        if let Some(worker) = self
250            .terminal_input_relay_worker
251            .lock()
252            .expect("pty terminal input relay mutex poisoned")
253            .take()
254        {
255            let _ = worker.join();
256        }
257    }
258
    /// Returns the current value of the relay's active flag.
    pub fn terminal_input_relay_active(&self) -> bool {
        self.terminal_input_relay_active.load(Ordering::Acquire)
    }
262
263    /// Synchronously tear down the PTY and reap the child.
264    #[inline(never)]
265    pub fn close_impl(&self) -> Result<(), PtyError> {
266        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_impl");
267        self.stop_terminal_input_relay_impl();
268        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
269        let Some(handles) = guard.take() else {
270            self.mark_reader_closed();
271            return Ok(());
272        };
273        drop(guard);
274
275        #[cfg(windows)]
276        let NativePtyHandles {
277            master,
278            writer,
279            mut child,
280            _job,
281        } = handles;
282        #[cfg(not(windows))]
283        let NativePtyHandles {
284            master,
285            writer,
286            mut child,
287        } = handles;
288
289        #[cfg(windows)]
290        {
291            {
292                crate::rp_rust_debug_scope!(
293                    "running_process::NativePtyProcess::close_impl.drop_job"
294                );
295                drop(_job);
296            }
297
298            {
299                crate::rp_rust_debug_scope!(
300                    "running_process::NativePtyProcess::close_impl.wait_job_exit"
301                );
302                let wait_deadline = Instant::now() + Duration::from_secs(2);
303                loop {
304                    match child.try_wait() {
305                        Ok(Some(status)) => {
306                            let code = status as i32;
307                            self.store_returncode(code);
308                            break;
309                        }
310                        Ok(None) if Instant::now() < wait_deadline => {
311                            // #199: intentional — `PtyChild` doesn't
312                            // expose a "wait with timeout" method
313                            // (portable-pty's Child trait doesn't
314                            // either). Polling at 10ms inside a
315                            // bounded 2-second deadline is the
316                            // close-path graceful-exit watcher.
317                            thread::sleep(Duration::from_millis(10));
318                        }
319                        Ok(None) => {
320                            if let Err(err) = child.kill() {
321                                if !is_ignorable_process_control_error(&err) {
322                                    return Err(PtyError::Io(err));
323                                }
324                            }
325                            let code = match child.wait() {
326                                Ok(status) => status as i32,
327                                Err(_) => -9,
328                            };
329                            self.store_returncode(code);
330                            break;
331                        }
332                        Err(_) => {
333                            self.store_returncode(-9);
334                            break;
335                        }
336                    }
337                }
338            }
339            {
340                crate::rp_rust_debug_scope!(
341                    "running_process::NativePtyProcess::close_impl.drop_writer"
342                );
343                drop(writer);
344            }
345            {
346                crate::rp_rust_debug_scope!(
347                    "running_process::NativePtyProcess::close_impl.drop_master"
348                );
349                drop(master);
350            }
351            drop(child);
352            {
353                crate::rp_rust_debug_scope!(
354                    "running_process::NativePtyProcess::close_impl.join_reader"
355                );
356                self.join_reader_worker();
357            }
358            self.mark_reader_closed();
359            Ok(())
360        }
361
362        #[cfg(not(windows))]
363        {
364            drop(writer);
365            drop(master);
366
367            let code = {
368                crate::rp_rust_debug_scope!(
369                    "running_process::NativePtyProcess::close_impl.wait_child"
370                );
371                match child.wait() {
372                    Ok(status) => status as i32,
373                    Err(_) => -9,
374                }
375            };
376            drop(child);
377
378            self.store_returncode(code);
379            {
380                crate::rp_rust_debug_scope!(
381                    "running_process::NativePtyProcess::close_impl.join_reader"
382                );
383                self.join_reader_worker();
384            }
385            self.mark_reader_closed();
386            Ok(())
387        }
388    }
389
390    /// Best-effort, non-blocking teardown for use from `Drop`.
391    #[inline(never)]
392    pub fn close_nonblocking(&self) {
393        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_nonblocking");
394        #[cfg(windows)]
395        self.request_terminal_input_relay_stop();
396        let Ok(mut guard) = self.handles.lock() else {
397            return;
398        };
399        let Some(handles) = guard.take() else {
400            self.mark_reader_closed();
401            return;
402        };
403        drop(guard);
404
405        #[cfg(windows)]
406        let NativePtyHandles {
407            master,
408            writer,
409            mut child,
410            _job,
411        } = handles;
412        #[cfg(not(windows))]
413        let NativePtyHandles {
414            master,
415            writer,
416            mut child,
417        } = handles;
418
419        if let Err(err) = child.kill() {
420            if !is_ignorable_process_control_error(&err) {
421                return;
422            }
423        }
424        drop(writer);
425        drop(master);
426        drop(child);
427        #[cfg(windows)]
428        drop(_job);
429        self.mark_reader_closed();
430    }
431
    /// Opens a PTY, spawns the configured command on it and starts the
    /// reader thread.
    ///
    /// The handles mutex is held for the whole call, so a concurrent second
    /// start observes either no handles or the fully populated ones.
    ///
    /// # Errors
    /// * `PtyError::AlreadyStarted` when handles are already present.
    /// * `PtyError::Spawn` when opening the PTY, cloning the reader, taking
    ///   the writer, or spawning the child fails.
    /// * On Windows, any error from `assign_child_to_windows_kill_on_close_job`
    ///   or `apply_windows_pty_priority`.
    ///
    /// # Panics
    /// Panics if the handles or reader-worker mutex is poisoned.
    pub fn start_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::start");
        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
        if guard.is_some() {
            return Err(PtyError::AlreadyStarted);
        }

        // Snapshot our conhost.exe children before openpty() so we can diff
        // after spawn to find the new conhost.exe created by ConPTY.
        #[cfg(windows)]
        let conhost_pids_before = conhost_children_of_current_process();

        let (mut master, slave) = Backend::openpty(PtySize {
            rows: self.rows,
            cols: self.cols,
            pixel_width: 0,
            pixel_height: 0,
        })
        .map_err(|e| PtyError::Spawn(e.to_string()))?;

        // Build argv/cwd/env in the shape the backend wants.
        let argv: Vec<std::ffi::OsString> =
            self.argv.iter().map(std::ffi::OsString::from).collect();
        let cwd = resolved_spawn_cwd(self.cwd.as_deref());
        let env: Option<Vec<(std::ffi::OsString, std::ffi::OsString)>> =
            self.env.as_ref().map(|e| {
                e.iter()
                    .map(|(k, v)| (std::ffi::OsString::from(k), std::ffi::OsString::from(v)))
                    .collect()
            });

        // Reader and writer ends are obtained from the master before the
        // child is spawned on the slave side.
        let reader = master
            .try_clone_reader()
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        let writer = master
            .take_writer()
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        let cwd_path = cwd.as_deref().map(std::path::Path::new);
        let child = slave
            .spawn(&argv, cwd_path, env.as_deref())
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        // The trait's PtyChild::as_raw_handle returns Option<RawHandle>
        // matching portable_pty's signature; pass directly.
        #[cfg(windows)]
        let job =
            assign_child_to_windows_kill_on_close_job(PtyChild::as_raw_handle(&child))?;
        #[cfg(windows)]
        assign_conpty_conhost_to_job(&job, &conhost_pids_before);
        #[cfg(windows)]
        apply_windows_pty_priority(PtyChild::as_raw_handle(&child), self.nice)?;
        // Clone the shared state the reader thread needs; the thread owns
        // these clones for its whole lifetime.
        let shared = Arc::clone(&self.reader);
        let echo = Arc::clone(&self.echo);
        let idle_detector = Arc::clone(&self.idle_detector);
        let output_bytes = Arc::clone(&self.output_bytes_total);
        let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
        let reader_worker = thread::spawn(move || {
            spawn_pty_reader(
                reader,
                shared,
                echo,
                idle_detector,
                output_bytes,
                churn_bytes,
            );
        });
        // Keep the join handle so `join_reader_worker` can wait for it.
        *self
            .reader_worker
            .lock()
            .expect("pty reader worker mutex poisoned") = Some(reader_worker);

        // Publishing the handles is the last step; from here on the process
        // counts as started.
        *guard = Some(NativePtyHandles {
            master: Box::new(master) as Box<dyn PtyMaster>,
            writer,
            child: Box::new(child) as Box<dyn PtyChild>,
            #[cfg(windows)]
            _job: job,
        });
        Ok(())
    }
511
    /// Hands `data` to the platform module's `respond_to_queries`
    /// (`pty_windows` on Windows, `pty_platform` on Unix) and returns its
    /// result unchanged.
    // NOTE(review): presumably `data` is PTY output that may contain terminal
    // queries needing a reply — confirm against the platform modules.
    pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
        #[cfg(windows)]
        {
            pty_windows::respond_to_queries(self, data)
        }

        #[cfg(unix)]
        {
            pty_platform::respond_to_queries(self, data)
        }
    }
523
524    pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
525        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::resize");
526        let guard = self.handles.lock().expect("pty handles mutex poisoned");
527        if let Some(handles) = guard.as_ref() {
528            #[cfg(windows)]
529            {
530                let _ = (rows, cols, handles);
531                // ConPTY resize can leave ClosePseudoConsole blocked during
532                // teardown on Windows. Keep resize as a no-op until the
533                // backend can cancel the outstanding PTY read safely.
534                return Ok(());
535            }
536
537            #[cfg(not(windows))]
538            handles
539                .master
540                .resize(PtySize {
541                    rows,
542                    cols,
543                    pixel_width: 0,
544                    pixel_height: 0,
545                })
546                .map_err(|e| PtyError::Other(e.to_string()))?;
547        }
548        Ok(())
549    }
550
    /// Delegates to the platform module's `send_interrupt`
    /// (`pty_windows` on Windows, `pty_platform` on Unix) and returns its
    /// result unchanged.
    pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::send_interrupt");
        #[cfg(windows)]
        {
            pty_windows::send_interrupt(self)
        }

        #[cfg(unix)]
        {
            pty_platform::send_interrupt(self)
        }
    }
563
564    pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
565        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::wait");
566        // Fast path: already exited.
567        if let Some(code) = *self
568            .returncode
569            .lock()
570            .expect("pty returncode mutex poisoned")
571        {
572            return Ok(code);
573        }
574        let start = Instant::now();
575        loop {
576            if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
577                return Ok(code);
578            }
579            if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
580                return Err(PtyError::Timeout);
581            }
582            // #199: intentional — `wait_impl` poll. Same constraint
583            // as the close_impl variant above: no per-Child wait
584            // primitive on the trait surface.
585            thread::sleep(Duration::from_millis(10));
586        }
587    }
588
589    pub fn terminate_impl(&self) -> Result<(), PtyError> {
590        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate");
591        #[cfg(windows)]
592        {
593            if self
594                .handles
595                .lock()
596                .expect("pty handles mutex poisoned")
597                .is_none()
598            {
599                return Err(PtyError::NotRunning);
600            }
601            self.close_impl()
602        }
603
604        #[cfg(unix)]
605        {
606            pty_platform::terminate(self)
607        }
608    }
609
610    pub fn kill_impl(&self) -> Result<(), PtyError> {
611        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill");
612        #[cfg(windows)]
613        {
614            if self
615                .handles
616                .lock()
617                .expect("pty handles mutex poisoned")
618                .is_none()
619            {
620                return Err(PtyError::NotRunning);
621            }
622            self.close_impl()
623        }
624
625        #[cfg(unix)]
626        {
627            pty_platform::kill(self)
628        }
629    }
630
    /// Delegates to the platform module's `terminate_tree`
    /// (`pty_windows` on Windows, `pty_platform` on Unix) and returns its
    /// result unchanged.
    pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate_tree");
        #[cfg(windows)]
        {
            pty_windows::terminate_tree(self)
        }

        #[cfg(unix)]
        {
            pty_platform::terminate_tree(self)
        }
    }
643
    /// Delegates to the platform module's `kill_tree`
    /// (`pty_windows` on Windows, `pty_platform` on Unix) and returns its
    /// result unchanged.
    pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill_tree");
        #[cfg(windows)]
        {
            pty_windows::kill_tree(self)
        }

        #[cfg(unix)]
        {
            pty_platform::kill_tree(self)
        }
    }
656
657    /// Get the PID of the child process, if running.
658    pub fn pid(&self) -> Result<Option<u32>, PtyError> {
659        let guard = self.handles.lock().expect("pty handles mutex poisoned");
660        if let Some(handles) = guard.as_ref() {
661            #[cfg(unix)]
662            if let Some(pid) = handles.master.process_group_leader() {
663                if let Ok(pid) = u32::try_from(pid) {
664                    return Ok(Some(pid));
665                }
666            }
667            return Ok(Some(handles.child.pid()));
668        }
669        Ok(None)
670    }
671
672    /// Wait for a chunk of output from the PTY reader.
673    /// Returns `Ok(Some(chunk))` on data, `Ok(None)` on timeout, `Err` on closed.
674    pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
675        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
676        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
677        loop {
678            if let Some(chunk) = guard.chunks.pop_front() {
679                return Ok(Some(chunk));
680            }
681            if guard.closed {
682                return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
683            }
684            match deadline {
685                Some(deadline) => {
686                    let now = Instant::now();
687                    if now >= deadline {
688                        return Ok(None); // timeout
689                    }
690                    let wait = deadline.saturating_duration_since(now);
691                    let result = self
692                        .reader
693                        .condvar
694                        .wait_timeout(guard, wait)
695                        .expect("pty read mutex poisoned");
696                    guard = result.0;
697                }
698                None => {
699                    guard = self
700                        .reader
701                        .condvar
702                        .wait(guard)
703                        .expect("pty read mutex poisoned");
704                }
705            }
706        }
707    }
708
709    /// Wait for the reader thread to close.
710    pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
711        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
712        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
713        loop {
714            if guard.closed {
715                return true;
716            }
717            match deadline {
718                Some(deadline) => {
719                    let now = Instant::now();
720                    if now >= deadline {
721                        return false;
722                    }
723                    let wait = deadline.saturating_duration_since(now);
724                    let result = self
725                        .reader
726                        .condvar
727                        .wait_timeout(guard, wait)
728                        .expect("pty read mutex poisoned");
729                    guard = result.0;
730                }
731                None => {
732                    guard = self
733                        .reader
734                        .condvar
735                        .wait(guard)
736                        .expect("pty read mutex poisoned");
737                }
738            }
739        }
740    }
741
742    /// Wait for exit then drain remaining output.
743    pub fn wait_and_drain_impl(
744        &self,
745        timeout: Option<f64>,
746        drain_timeout: f64,
747    ) -> Result<i32, PtyError> {
748        let code = self.wait_impl(timeout)?;
749        let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
750        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
751        while !guard.closed {
752            let remaining = deadline.saturating_duration_since(Instant::now());
753            if remaining.is_zero() {
754                break;
755            }
756            let result = self
757                .reader
758                .condvar
759                .wait_timeout(guard, remaining)
760                .expect("pty read mutex poisoned");
761            guard = result.0;
762        }
763        Ok(code)
764    }
765
    /// Stores `enabled` in the shared echo flag read by the reader thread.
    pub fn set_echo(&self, enabled: bool) {
        self.echo.store(enabled, Ordering::Release);
    }
769
    /// Returns the current value of the echo flag.
    pub fn echo_enabled(&self) -> bool {
        self.echo.load(Ordering::Acquire)
    }
773
774    pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
775        let mut guard = self
776            .idle_detector
777            .lock()
778            .expect("idle detector mutex poisoned");
779        *guard = Some(Arc::clone(detector));
780    }
781
782    pub fn detach_idle_detector(&self) {
783        let mut guard = self
784            .idle_detector
785            .lock()
786            .expect("idle detector mutex poisoned");
787        *guard = None;
788    }
789
    /// Returns the current value of the `input_bytes_total` counter.
    pub fn pty_input_bytes_total(&self) -> usize {
        self.input_bytes_total.load(Ordering::Acquire)
    }
793
    /// Returns the current value of the `newline_events_total` counter.
    pub fn pty_newline_events_total(&self) -> usize {
        self.newline_events_total.load(Ordering::Acquire)
    }
797
    /// Returns the current value of the `submit_events_total` counter.
    pub fn pty_submit_events_total(&self) -> usize {
        self.submit_events_total.load(Ordering::Acquire)
    }
801
    /// Returns the current value of the `output_bytes_total` counter
    /// (visible output bytes seen by the reader thread).
    pub fn pty_output_bytes_total(&self) -> usize {
        self.output_bytes_total.load(Ordering::Acquire)
    }
805
    /// Returns the current value of the `control_churn_bytes_total` counter
    /// (control bytes seen by the reader thread).
    pub fn pty_control_churn_bytes_total(&self) -> usize {
        self.control_churn_bytes_total.load(Ordering::Acquire)
    }
809}
810
/// Feature switches for an interactive PTY session.
///
/// The `Default` value turns on everything a terminal-style session usually
/// needs: output echo, terminal input relay, and automatic PTY query replies.
#[derive(Debug, Clone, Copy)]
pub struct InteractivePtyOptions {
    /// Echo PTY output (applied through `set_echo` when the session starts).
    pub echo_output: bool,
    /// Start the terminal input relay right after the process is started.
    pub relay_terminal_input: bool,
    /// Pass every pumped chunk to `respond_to_queries_impl`.
    pub respond_to_queries: bool,
}

impl Default for InteractivePtyOptions {
    /// Returns options with every interactive feature enabled.
    fn default() -> Self {
        const ENABLED: bool = true;
        Self {
            echo_output: ENABLED,
            relay_terminal_input: ENABLED,
            respond_to_queries: ENABLED,
        }
    }
}
831
/// Outcome of one `InteractivePtySession::pump_output` call.
#[derive(Debug, Default)]
pub struct InteractivePtyPumpResult {
    /// Chunks read during the call, in the order they were received.
    pub chunks: Vec<Vec<u8>>,
    /// True when the call ended because the PTY stream was reported closed.
    pub stream_closed: bool,
}
837
/// Canonical interactive PTY recipe for downstream Rust consumers.
///
/// `NativePtyProcess` remains the low-level primitive. This wrapper owns the
/// interactive setup that callers commonly forget to assemble correctly.
pub struct InteractivePtySession {
    /// The wrapped low-level process; exposed read-only through `process()`.
    process: NativePtyProcess,
    /// Feature switches consulted by `start` and `pump_output`.
    options: InteractivePtyOptions,
}
846
847impl InteractivePtySession {
848    pub fn new(process: NativePtyProcess) -> Self {
849        Self::with_options(process, InteractivePtyOptions::default())
850    }
851
852    pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
853        Self { process, options }
854    }
855
856    pub fn process(&self) -> &NativePtyProcess {
857        &self.process
858    }
859
860    pub fn start(&self) -> Result<(), PtyError> {
861        self.process.set_echo(self.options.echo_output);
862        self.process.start_impl()?;
863        if self.options.relay_terminal_input {
864            self.process.start_terminal_input_relay_impl()?;
865        }
866        Ok(())
867    }
868
869    pub fn pump_output(
870        &self,
871        timeout: Option<f64>,
872        consume_all: bool,
873    ) -> Result<InteractivePtyPumpResult, PtyError> {
874        let mut pumped = InteractivePtyPumpResult::default();
875        let mut next_timeout = timeout;
876        loop {
877            match self.process.read_chunk_impl(next_timeout) {
878                Ok(Some(chunk)) => {
879                    if self.options.respond_to_queries {
880                        self.process.respond_to_queries_impl(&chunk)?;
881                    }
882                    pumped.chunks.push(chunk);
883                    if !consume_all {
884                        break;
885                    }
886                    next_timeout = Some(0.0);
887                }
888                Ok(None) => break,
889                Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
890                    pumped.stream_closed = true;
891                    break;
892                }
893                Err(err) => return Err(err),
894            }
895        }
896        Ok(pumped)
897    }
898
899    pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
900        self.process.resize_impl(rows, cols)
901    }
902
903    pub fn send_interrupt(&self) -> Result<(), PtyError> {
904        self.process.send_interrupt_impl()
905    }
906
907    pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
908        self.process.wait_impl(timeout)
909    }
910
911    pub fn wait_and_drain(
912        &self,
913        timeout: Option<f64>,
914        drain_timeout: f64,
915    ) -> Result<i32, PtyError> {
916        self.process.wait_and_drain_impl(timeout, drain_timeout)
917    }
918
919    pub fn terminate(&self) -> Result<(), PtyError> {
920        self.process.terminate_impl()
921    }
922
923    pub fn kill(&self) -> Result<(), PtyError> {
924        self.process.kill_impl()
925    }
926
927    pub fn close(&self) -> Result<(), PtyError> {
928        self.process.close_impl()
929    }
930}
931
/// Dropping the process performs the best-effort, non-blocking teardown
/// instead of the synchronous `close_impl`.
impl Drop for NativePtyProcess {
    fn drop(&mut self) {
        self.close_nonblocking();
    }
}