Skip to main content

running_process/pty/
native_pty_process.rs

1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7use super::backend::{Backend, PtyBackend, PtyChild, PtyMaster, PtySize, PtySlave};
8use super::{
9    is_ignorable_process_control_error, poll_pty_process, record_pty_input_metrics,
10    spawn_pty_reader, store_pty_returncode, write_pty_input, IdleDetectorCore, NativePtyHandles,
11    PtyError, PtyReadShared, PtyReadState,
12};
13#[cfg(unix)]
14use super::posix_terminal_input_relay_worker;
15#[cfg(windows)]
16use super::{
17    apply_windows_pty_priority, assign_child_to_windows_kill_on_close_job,
18    assign_conpty_conhost_to_job, conhost_children_of_current_process,
19};
20
21#[cfg(unix)]
22use super::pty_posix as pty_platform;
23#[cfg(windows)]
24use super::pty_windows;
25
/// A child process attached to a native pseudo-terminal.
///
/// Holds the spawn configuration together with the state shared between the
/// owning thread, the output reader thread and the terminal input relay
/// thread.
pub struct NativePtyProcess {
    /// Command line to spawn; `new` rejects an empty vector.
    pub argv: Vec<String>,
    /// Working directory for the child. `None` falls back to the current
    /// directory of this process when the child is started.
    pub cwd: Option<String>,
    /// Environment variable pairs handed to the spawn call, if any.
    pub env: Option<Vec<(String, String)>>,
    /// PTY height in rows requested when the PTY is opened.
    pub rows: u16,
    /// PTY width in columns requested when the PTY is opened.
    pub cols: u16,
    /// Priority hint passed to `apply_windows_pty_priority` after spawn.
    #[cfg(windows)]
    pub nice: Option<i32>,
    /// Live PTY handles; `None` before start and after close.
    pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
    /// Output chunk queue, closed flag and condition variable shared with
    /// the reader thread.
    pub reader: Arc<PtyReadShared>,
    /// Exit code of the child once it has been recorded.
    pub returncode: Arc<Mutex<Option<i32>>>,
    /// Input byte counter maintained through `record_pty_input_metrics`.
    pub input_bytes_total: Arc<AtomicUsize>,
    /// Newline event counter maintained through `record_pty_input_metrics`.
    pub newline_events_total: Arc<AtomicUsize>,
    /// Submit event counter maintained through `record_pty_input_metrics`.
    pub submit_events_total: Arc<AtomicUsize>,
    /// When true, the reader thread writes PTY output to stdout.
    pub echo: Arc<AtomicBool>,
    /// When set, the reader thread feeds output directly to the idle detector.
    pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
    /// Visible (non-control) output bytes seen by the reader thread.
    pub output_bytes_total: Arc<AtomicUsize>,
    /// Control churn bytes (ANSI escapes, BS, CR, DEL) seen by the reader.
    pub control_churn_bytes_total: Arc<AtomicUsize>,
    /// Join handle of the reader thread spawned by `start_impl`.
    pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
    /// Set to ask the terminal input relay thread to exit.
    pub terminal_input_relay_stop: Arc<AtomicBool>,
    /// True while the terminal input relay is considered running.
    pub terminal_input_relay_active: Arc<AtomicBool>,
    /// Join handle of the terminal input relay thread, if one was spawned.
    pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
}
53
54pub(super) fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
55    cwd.map(str::to_owned).or_else(|| {
56        std::env::current_dir()
57            .ok()
58            .map(|cwd| cwd.to_string_lossy().to_string())
59    })
60}
61
62impl NativePtyProcess {
63    pub fn new(
64        argv: Vec<String>,
65        cwd: Option<String>,
66        env: Option<Vec<(String, String)>>,
67        rows: u16,
68        cols: u16,
69        nice: Option<i32>,
70    ) -> Result<Self, PtyError> {
71        if argv.is_empty() {
72            return Err(PtyError::Other("command cannot be empty".into()));
73        }
74        #[cfg(not(windows))]
75        let _ = nice;
76        Ok(Self {
77            argv,
78            cwd,
79            env,
80            rows,
81            cols,
82            #[cfg(windows)]
83            nice,
84            handles: Arc::new(Mutex::new(None)),
85            reader: Arc::new(PtyReadShared {
86                state: Mutex::new(PtyReadState {
87                    chunks: VecDeque::new(),
88                    closed: false,
89                }),
90                condvar: Condvar::new(),
91            }),
92            returncode: Arc::new(Mutex::new(None)),
93            input_bytes_total: Arc::new(AtomicUsize::new(0)),
94            newline_events_total: Arc::new(AtomicUsize::new(0)),
95            submit_events_total: Arc::new(AtomicUsize::new(0)),
96            echo: Arc::new(AtomicBool::new(false)),
97            idle_detector: Arc::new(Mutex::new(None)),
98            output_bytes_total: Arc::new(AtomicUsize::new(0)),
99            control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
100            reader_worker: Mutex::new(None),
101            terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
102            terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
103            terminal_input_relay_worker: Mutex::new(None),
104        })
105    }
106
107    pub fn mark_reader_closed(&self) {
108        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
109        guard.closed = true;
110        self.reader.condvar.notify_all();
111    }
112
113    pub fn store_returncode(&self, code: i32) {
114        store_pty_returncode(&self.returncode, code);
115    }
116
117    pub(super) fn join_reader_worker(&self) {
118        if let Some(worker) = self
119            .reader_worker
120            .lock()
121            .expect("pty reader worker mutex poisoned")
122            .take()
123        {
124            let _ = worker.join();
125        }
126    }
127
128    pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
129        record_pty_input_metrics(
130            &self.input_bytes_total,
131            &self.newline_events_total,
132            &self.submit_events_total,
133            data,
134            submit,
135        );
136    }
137
138    pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
139        self.record_input_metrics(data, submit);
140        write_pty_input(&self.handles, data)?;
141        Ok(())
142    }
143
144    pub fn request_terminal_input_relay_stop(&self) {
145        self.terminal_input_relay_stop
146            .store(true, Ordering::Release);
147        self.terminal_input_relay_active
148            .store(false, Ordering::Release);
149    }
150
151    pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
152        let mut worker_guard = self
153            .terminal_input_relay_worker
154            .lock()
155            .expect("pty terminal input relay mutex poisoned");
156        if worker_guard.is_some() && self.terminal_input_relay_active() {
157            return Ok(());
158        }
159        if self
160            .handles
161            .lock()
162            .expect("pty handles mutex poisoned")
163            .is_none()
164        {
165            return Err(PtyError::NotRunning);
166        }
167
168        self.terminal_input_relay_stop
169            .store(false, Ordering::Release);
170        self.terminal_input_relay_active
171            .store(true, Ordering::Release);
172
173        let handles = Arc::clone(&self.handles);
174        let returncode = Arc::clone(&self.returncode);
175        let input_bytes_total = Arc::clone(&self.input_bytes_total);
176        let newline_events_total = Arc::clone(&self.newline_events_total);
177        let submit_events_total = Arc::clone(&self.submit_events_total);
178        let stop = Arc::clone(&self.terminal_input_relay_stop);
179        let active = Arc::clone(&self.terminal_input_relay_active);
180
181        #[cfg(windows)]
182        {
183            let capture = super::terminal_input::TerminalInputCore::new();
184            capture.start_impl().map_err(PtyError::Io)?;
185            *worker_guard = Some(thread::spawn(move || {
186                loop {
187                    if stop.load(Ordering::Acquire) {
188                        break;
189                    }
190                    match poll_pty_process(&handles, &returncode) {
191                        Ok(Some(_)) => break,
192                        Ok(None) => {}
193                        Err(_) => break,
194                    }
195                    match super::terminal_input::wait_for_terminal_input_event(
196                        &capture.state,
197                        &capture.condvar,
198                        Some(Duration::from_millis(50)),
199                    ) {
200                        super::terminal_input::TerminalInputWaitOutcome::Event(event) => {
201                            record_pty_input_metrics(
202                                &input_bytes_total,
203                                &newline_events_total,
204                                &submit_events_total,
205                                &event.data,
206                                event.submit,
207                            );
208                            if write_pty_input(&handles, &event.data).is_err() {
209                                break;
210                            }
211                        }
212                        super::terminal_input::TerminalInputWaitOutcome::Timeout => continue,
213                        super::terminal_input::TerminalInputWaitOutcome::Closed => break,
214                    }
215                }
216                active.store(false, Ordering::Release);
217                let _ = capture.stop_impl();
218            }));
219            Ok(())
220        }
221
222        #[cfg(unix)]
223        {
224            if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
225                self.terminal_input_relay_active
226                    .store(false, Ordering::Release);
227                return Ok(());
228            }
229
230            *worker_guard = Some(thread::spawn(move || {
231                posix_terminal_input_relay_worker(
232                    handles,
233                    returncode,
234                    input_bytes_total,
235                    newline_events_total,
236                    submit_events_total,
237                    stop,
238                    active,
239                );
240            }));
241            Ok(())
242        }
243    }
244
245    pub fn stop_terminal_input_relay_impl(&self) {
246        self.request_terminal_input_relay_stop();
247        if let Some(worker) = self
248            .terminal_input_relay_worker
249            .lock()
250            .expect("pty terminal input relay mutex poisoned")
251            .take()
252        {
253            let _ = worker.join();
254        }
255    }
256
257    pub fn terminal_input_relay_active(&self) -> bool {
258        self.terminal_input_relay_active.load(Ordering::Acquire)
259    }
260
    /// Synchronously tear down the PTY and reap the child.
    ///
    /// The terminal input relay is stopped first. If no handles are present
    /// the reader is simply marked closed and `Ok(())` is returned.
    ///
    /// On Windows the job object is dropped, the child is polled for up to
    /// two seconds and killed if it is still running, and only then are the
    /// writer and master dropped. Elsewhere the writer and master are dropped
    /// first and the call blocks on `child.wait()`. Both paths store a return
    /// code (`-9` when waiting fails), join the reader thread and mark the
    /// reader closed.
    ///
    /// # Errors
    ///
    /// On Windows, returns `PtyError::Io` when killing the child fails with
    /// an error that `is_ignorable_process_control_error` does not accept.
    ///
    /// # Panics
    ///
    /// Panics if the handles mutex is poisoned.
    #[inline(never)]
    pub fn close_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_impl");
        self.stop_terminal_input_relay_impl();
        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
        let Some(handles) = guard.take() else {
            self.mark_reader_closed();
            return Ok(());
        };
        // Release the lock before the potentially slow teardown below.
        drop(guard);

        #[cfg(windows)]
        let NativePtyHandles {
            master,
            writer,
            mut child,
            _job,
        } = handles;
        #[cfg(not(windows))]
        let NativePtyHandles {
            master,
            writer,
            mut child,
        } = handles;

        #[cfg(windows)]
        {
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.drop_job"
                );
                drop(_job);
            }

            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.wait_job_exit"
                );
                let wait_deadline = Instant::now() + Duration::from_secs(2);
                loop {
                    match child.try_wait() {
                        Ok(Some(status)) => {
                            let code = status as i32;
                            self.store_returncode(code);
                            break;
                        }
                        Ok(None) if Instant::now() < wait_deadline => {
                            // #199: intentional — `PtyChild` doesn't
                            // expose a "wait with timeout" method
                            // (portable-pty's Child trait doesn't
                            // either). Polling at 10ms inside a
                            // bounded 2-second deadline is the
                            // close-path graceful-exit watcher.
                            thread::sleep(Duration::from_millis(10));
                        }
                        // Deadline passed with the child still running:
                        // kill it, then reap it.
                        Ok(None) => {
                            if let Err(err) = child.kill() {
                                if !is_ignorable_process_control_error(&err) {
                                    return Err(PtyError::Io(err));
                                }
                            }
                            let code = match child.wait() {
                                Ok(status) => status as i32,
                                Err(_) => -9,
                            };
                            self.store_returncode(code);
                            break;
                        }
                        // The child's state cannot be queried; record the
                        // fallback code and continue with the teardown.
                        Err(_) => {
                            self.store_returncode(-9);
                            break;
                        }
                    }
                }
            }
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.drop_writer"
                );
                drop(writer);
            }
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.drop_master"
                );
                drop(master);
            }
            drop(child);
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.join_reader"
                );
                self.join_reader_worker();
            }
            self.mark_reader_closed();
            Ok(())
        }

        #[cfg(not(windows))]
        {
            // Both PTY ends owned by this process are released before
            // blocking on the child.
            drop(writer);
            drop(master);

            let code = {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.wait_child"
                );
                match child.wait() {
                    Ok(status) => status as i32,
                    Err(_) => -9,
                }
            };
            drop(child);

            self.store_returncode(code);
            {
                crate::rp_rust_debug_scope!(
                    "running_process::NativePtyProcess::close_impl.join_reader"
                );
                self.join_reader_worker();
            }
            self.mark_reader_closed();
            Ok(())
        }
    }
387
    /// Best-effort, non-blocking teardown for use from `Drop`.
    ///
    /// Unlike `close_impl`, this neither joins any thread nor waits for the
    /// child: it kills the child, drops the handles and marks the reader
    /// closed. Failures are swallowed because there is no caller to report
    /// them to.
    #[inline(never)]
    pub fn close_nonblocking(&self) {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_nonblocking");
        // Only the stop request is issued here; the relay thread is not joined.
        #[cfg(windows)]
        self.request_terminal_input_relay_stop();
        // A poisoned handles mutex is treated as "nothing to clean up".
        let Ok(mut guard) = self.handles.lock() else {
            return;
        };
        let Some(handles) = guard.take() else {
            self.mark_reader_closed();
            return;
        };
        drop(guard);

        #[cfg(windows)]
        let NativePtyHandles {
            master,
            writer,
            mut child,
            _job,
        } = handles;
        #[cfg(not(windows))]
        let NativePtyHandles {
            master,
            writer,
            mut child,
        } = handles;

        // On a non-ignorable kill error the function returns early; the
        // handles are then released by going out of scope and the reader is
        // not marked closed here.
        if let Err(err) = child.kill() {
            if !is_ignorable_process_control_error(&err) {
                return;
            }
        }
        drop(writer);
        drop(master);
        drop(child);
        #[cfg(windows)]
        drop(_job);
        self.mark_reader_closed();
    }
429
    /// Opens a PTY, spawns the child on it and starts the reader thread.
    ///
    /// The handles mutex is held for the whole call, and the handles are
    /// stored only after every step has succeeded.
    ///
    /// # Errors
    ///
    /// Returns `PtyError::AlreadyStarted` when handles are already present,
    /// and `PtyError::Spawn` when opening the PTY, cloning the reader, taking
    /// the writer or spawning the child fails. On Windows, errors from
    /// assigning the kill-on-close job or applying the priority are
    /// propagated as well.
    ///
    /// # Panics
    ///
    /// Panics if the handles or reader-worker mutex is poisoned.
    pub fn start_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::start");
        let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
        if guard.is_some() {
            return Err(PtyError::AlreadyStarted);
        }

        // Snapshot our conhost.exe children before openpty() so we can diff
        // after spawn to find the new conhost.exe created by ConPTY.
        #[cfg(windows)]
        let conhost_pids_before = conhost_children_of_current_process();

        let (mut master, slave) = Backend::openpty(PtySize {
            rows: self.rows,
            cols: self.cols,
            pixel_width: 0,
            pixel_height: 0,
        })
        .map_err(|e| PtyError::Spawn(e.to_string()))?;

        // Build argv/cwd/env in the shape the backend wants.
        let argv: Vec<std::ffi::OsString> =
            self.argv.iter().map(std::ffi::OsString::from).collect();
        let cwd = resolved_spawn_cwd(self.cwd.as_deref());
        let env: Option<Vec<(std::ffi::OsString, std::ffi::OsString)>> =
            self.env.as_ref().map(|e| {
                e.iter()
                    .map(|(k, v)| (std::ffi::OsString::from(k), std::ffi::OsString::from(v)))
                    .collect()
            });

        let reader = master
            .try_clone_reader()
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        let writer = master
            .take_writer()
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        let cwd_path = cwd.as_deref().map(std::path::Path::new);
        let child = slave
            .spawn(&argv, cwd_path, env.as_deref())
            .map_err(|e| PtyError::Spawn(e.to_string()))?;
        // The trait's PtyChild::as_raw_handle returns Option<RawHandle>
        // matching portable_pty's signature; pass directly.
        #[cfg(windows)]
        let job =
            assign_child_to_windows_kill_on_close_job(PtyChild::as_raw_handle(&child))?;
        #[cfg(windows)]
        assign_conpty_conhost_to_job(&job, &conhost_pids_before);
        #[cfg(windows)]
        apply_windows_pty_priority(PtyChild::as_raw_handle(&child), self.nice)?;
        // Clone the shared state the reader thread needs so the closure can
        // own it.
        let shared = Arc::clone(&self.reader);
        let echo = Arc::clone(&self.echo);
        let idle_detector = Arc::clone(&self.idle_detector);
        let output_bytes = Arc::clone(&self.output_bytes_total);
        let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
        let reader_worker = thread::spawn(move || {
            spawn_pty_reader(
                reader,
                shared,
                echo,
                idle_detector,
                output_bytes,
                churn_bytes,
            );
        });
        // Keep the join handle so `close_impl` can wait for the reader thread.
        *self
            .reader_worker
            .lock()
            .expect("pty reader worker mutex poisoned") = Some(reader_worker);

        *guard = Some(NativePtyHandles {
            master: Box::new(master) as Box<dyn PtyMaster>,
            writer,
            child: Box::new(child) as Box<dyn PtyChild>,
            #[cfg(windows)]
            _job: job,
        });
        Ok(())
    }
509
    /// Passes an output chunk to the platform module's `respond_to_queries`
    /// (`pty_windows` on Windows, `pty_posix` on Unix).
    ///
    /// # Errors
    ///
    /// Propagates whatever the platform implementation returns.
    pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
        #[cfg(windows)]
        {
            pty_windows::respond_to_queries(self, data)
        }

        #[cfg(unix)]
        {
            pty_platform::respond_to_queries(self, data)
        }
    }
521
522    pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
523        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::resize");
524        let guard = self.handles.lock().expect("pty handles mutex poisoned");
525        if let Some(handles) = guard.as_ref() {
526            #[cfg(windows)]
527            {
528                let _ = (rows, cols, handles);
529                // ConPTY resize can leave ClosePseudoConsole blocked during
530                // teardown on Windows. Keep resize as a no-op until the
531                // backend can cancel the outstanding PTY read safely.
532                return Ok(());
533            }
534
535            #[cfg(not(windows))]
536            handles
537                .master
538                .resize(PtySize {
539                    rows,
540                    cols,
541                    pixel_width: 0,
542                    pixel_height: 0,
543                })
544                .map_err(|e| PtyError::Other(e.to_string()))?;
545        }
546        Ok(())
547    }
548
    /// Delegates to the platform module's `send_interrupt` (`pty_windows` on
    /// Windows, `pty_posix` on Unix).
    ///
    /// # Errors
    ///
    /// Propagates whatever the platform implementation returns.
    pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::send_interrupt");
        #[cfg(windows)]
        {
            pty_windows::send_interrupt(self)
        }

        #[cfg(unix)]
        {
            pty_platform::send_interrupt(self)
        }
    }
561
562    pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
563        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::wait");
564        // Fast path: already exited.
565        if let Some(code) = *self
566            .returncode
567            .lock()
568            .expect("pty returncode mutex poisoned")
569        {
570            return Ok(code);
571        }
572        let start = Instant::now();
573        loop {
574            if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
575                return Ok(code);
576            }
577            if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
578                return Err(PtyError::Timeout);
579            }
580            // #199: intentional — `wait_impl` poll. Same constraint
581            // as the close_impl variant above: no per-Child wait
582            // primitive on the trait surface.
583            thread::sleep(Duration::from_millis(10));
584        }
585    }
586
587    pub fn terminate_impl(&self) -> Result<(), PtyError> {
588        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate");
589        #[cfg(windows)]
590        {
591            if self
592                .handles
593                .lock()
594                .expect("pty handles mutex poisoned")
595                .is_none()
596            {
597                return Err(PtyError::NotRunning);
598            }
599            self.close_impl()
600        }
601
602        #[cfg(unix)]
603        {
604            pty_platform::terminate(self)
605        }
606    }
607
608    pub fn kill_impl(&self) -> Result<(), PtyError> {
609        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill");
610        #[cfg(windows)]
611        {
612            if self
613                .handles
614                .lock()
615                .expect("pty handles mutex poisoned")
616                .is_none()
617            {
618                return Err(PtyError::NotRunning);
619            }
620            self.close_impl()
621        }
622
623        #[cfg(unix)]
624        {
625            pty_platform::kill(self)
626        }
627    }
628
    /// Delegates to the platform module's `terminate_tree` (`pty_windows` on
    /// Windows, `pty_posix` on Unix).
    ///
    /// # Errors
    ///
    /// Propagates whatever the platform implementation returns.
    pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate_tree");
        #[cfg(windows)]
        {
            pty_windows::terminate_tree(self)
        }

        #[cfg(unix)]
        {
            pty_platform::terminate_tree(self)
        }
    }
641
    /// Delegates to the platform module's `kill_tree` (`pty_windows` on
    /// Windows, `pty_posix` on Unix).
    ///
    /// # Errors
    ///
    /// Propagates whatever the platform implementation returns.
    pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
        crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill_tree");
        #[cfg(windows)]
        {
            pty_windows::kill_tree(self)
        }

        #[cfg(unix)]
        {
            pty_platform::kill_tree(self)
        }
    }
654
655    /// Get the PID of the child process, if running.
656    pub fn pid(&self) -> Result<Option<u32>, PtyError> {
657        let guard = self.handles.lock().expect("pty handles mutex poisoned");
658        if let Some(handles) = guard.as_ref() {
659            #[cfg(unix)]
660            if let Some(pid) = handles.master.process_group_leader() {
661                if let Ok(pid) = u32::try_from(pid) {
662                    return Ok(Some(pid));
663                }
664            }
665            return Ok(Some(handles.child.pid()));
666        }
667        Ok(None)
668    }
669
670    /// Wait for a chunk of output from the PTY reader.
671    /// Returns `Ok(Some(chunk))` on data, `Ok(None)` on timeout, `Err` on closed.
672    pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
673        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
674        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
675        loop {
676            if let Some(chunk) = guard.chunks.pop_front() {
677                return Ok(Some(chunk));
678            }
679            if guard.closed {
680                return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
681            }
682            match deadline {
683                Some(deadline) => {
684                    let now = Instant::now();
685                    if now >= deadline {
686                        return Ok(None); // timeout
687                    }
688                    let wait = deadline.saturating_duration_since(now);
689                    let result = self
690                        .reader
691                        .condvar
692                        .wait_timeout(guard, wait)
693                        .expect("pty read mutex poisoned");
694                    guard = result.0;
695                }
696                None => {
697                    guard = self
698                        .reader
699                        .condvar
700                        .wait(guard)
701                        .expect("pty read mutex poisoned");
702                }
703            }
704        }
705    }
706
707    /// Wait for the reader thread to close.
708    pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
709        let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
710        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
711        loop {
712            if guard.closed {
713                return true;
714            }
715            match deadline {
716                Some(deadline) => {
717                    let now = Instant::now();
718                    if now >= deadline {
719                        return false;
720                    }
721                    let wait = deadline.saturating_duration_since(now);
722                    let result = self
723                        .reader
724                        .condvar
725                        .wait_timeout(guard, wait)
726                        .expect("pty read mutex poisoned");
727                    guard = result.0;
728                }
729                None => {
730                    guard = self
731                        .reader
732                        .condvar
733                        .wait(guard)
734                        .expect("pty read mutex poisoned");
735                }
736            }
737        }
738    }
739
740    /// Wait for exit then drain remaining output.
741    pub fn wait_and_drain_impl(
742        &self,
743        timeout: Option<f64>,
744        drain_timeout: f64,
745    ) -> Result<i32, PtyError> {
746        let code = self.wait_impl(timeout)?;
747        let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
748        let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
749        while !guard.closed {
750            let remaining = deadline.saturating_duration_since(Instant::now());
751            if remaining.is_zero() {
752                break;
753            }
754            let result = self
755                .reader
756                .condvar
757                .wait_timeout(guard, remaining)
758                .expect("pty read mutex poisoned");
759            guard = result.0;
760        }
761        Ok(code)
762    }
763
764    pub fn set_echo(&self, enabled: bool) {
765        self.echo.store(enabled, Ordering::Release);
766    }
767
768    pub fn echo_enabled(&self) -> bool {
769        self.echo.load(Ordering::Acquire)
770    }
771
772    pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
773        let mut guard = self
774            .idle_detector
775            .lock()
776            .expect("idle detector mutex poisoned");
777        *guard = Some(Arc::clone(detector));
778    }
779
780    pub fn detach_idle_detector(&self) {
781        let mut guard = self
782            .idle_detector
783            .lock()
784            .expect("idle detector mutex poisoned");
785        *guard = None;
786    }
787
788    pub fn pty_input_bytes_total(&self) -> usize {
789        self.input_bytes_total.load(Ordering::Acquire)
790    }
791
792    pub fn pty_newline_events_total(&self) -> usize {
793        self.newline_events_total.load(Ordering::Acquire)
794    }
795
796    pub fn pty_submit_events_total(&self) -> usize {
797        self.submit_events_total.load(Ordering::Acquire)
798    }
799
800    pub fn pty_output_bytes_total(&self) -> usize {
801        self.output_bytes_total.load(Ordering::Acquire)
802    }
803
804    pub fn pty_control_churn_bytes_total(&self) -> usize {
805        self.control_churn_bytes_total.load(Ordering::Acquire)
806    }
807}
808
/// Feature switches for an interactive PTY session.
///
/// The `Default` value turns on everything a terminal-style session usually
/// needs: output echo, terminal input relay, and automatic PTY query replies.
#[derive(Debug, Clone, Copy)]
pub struct InteractivePtyOptions {
    /// Value applied to the process echo flag when the session starts.
    pub echo_output: bool,
    /// Whether starting the session also starts the terminal input relay.
    pub relay_terminal_input: bool,
    /// Whether pumped output chunks are passed to the query responder.
    pub respond_to_queries: bool,
}

impl Default for InteractivePtyOptions {
    /// Returns options with every interactive feature enabled.
    fn default() -> Self {
        let enabled = true;
        Self {
            echo_output: enabled,
            relay_terminal_input: enabled,
            respond_to_queries: enabled,
        }
    }
}
829
/// Outcome of one `InteractivePtySession::pump_output` call.
#[derive(Debug, Default)]
pub struct InteractivePtyPumpResult {
    /// Output chunks read during the call, in the order they were read.
    pub chunks: Vec<Vec<u8>>,
    /// True when the call stopped because the PTY stream was reported closed.
    pub stream_closed: bool,
}
835
836/// Canonical interactive PTY recipe for downstream Rust consumers.
837///
838/// `NativePtyProcess` remains the low-level primitive. This wrapper owns the
839/// interactive setup that callers commonly forget to assemble correctly.
840pub struct InteractivePtySession {
841    process: NativePtyProcess,
842    options: InteractivePtyOptions,
843}
844
845impl InteractivePtySession {
846    pub fn new(process: NativePtyProcess) -> Self {
847        Self::with_options(process, InteractivePtyOptions::default())
848    }
849
850    pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
851        Self { process, options }
852    }
853
854    pub fn process(&self) -> &NativePtyProcess {
855        &self.process
856    }
857
858    pub fn start(&self) -> Result<(), PtyError> {
859        self.process.set_echo(self.options.echo_output);
860        self.process.start_impl()?;
861        if self.options.relay_terminal_input {
862            self.process.start_terminal_input_relay_impl()?;
863        }
864        Ok(())
865    }
866
867    pub fn pump_output(
868        &self,
869        timeout: Option<f64>,
870        consume_all: bool,
871    ) -> Result<InteractivePtyPumpResult, PtyError> {
872        let mut pumped = InteractivePtyPumpResult::default();
873        let mut next_timeout = timeout;
874        loop {
875            match self.process.read_chunk_impl(next_timeout) {
876                Ok(Some(chunk)) => {
877                    if self.options.respond_to_queries {
878                        self.process.respond_to_queries_impl(&chunk)?;
879                    }
880                    pumped.chunks.push(chunk);
881                    if !consume_all {
882                        break;
883                    }
884                    next_timeout = Some(0.0);
885                }
886                Ok(None) => break,
887                Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
888                    pumped.stream_closed = true;
889                    break;
890                }
891                Err(err) => return Err(err),
892            }
893        }
894        Ok(pumped)
895    }
896
897    pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
898        self.process.resize_impl(rows, cols)
899    }
900
901    pub fn send_interrupt(&self) -> Result<(), PtyError> {
902        self.process.send_interrupt_impl()
903    }
904
905    pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
906        self.process.wait_impl(timeout)
907    }
908
909    pub fn wait_and_drain(
910        &self,
911        timeout: Option<f64>,
912        drain_timeout: f64,
913    ) -> Result<i32, PtyError> {
914        self.process.wait_and_drain_impl(timeout, drain_timeout)
915    }
916
917    pub fn terminate(&self) -> Result<(), PtyError> {
918        self.process.terminate_impl()
919    }
920
921    pub fn kill(&self) -> Result<(), PtyError> {
922        self.process.kill_impl()
923    }
924
925    pub fn close(&self) -> Result<(), PtyError> {
926        self.process.close_impl()
927    }
928}
929
930impl Drop for NativePtyProcess {
931    fn drop(&mut self) {
932        self.close_nonblocking();
933    }
934}