1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7#[cfg(unix)]
8use super::backend::PtySlave;
9use super::backend::{Backend, PtyBackend, PtyChild, PtyMaster, PtySize};
10#[cfg(unix)]
11use super::posix_terminal_input_relay_worker;
12#[cfg(windows)]
13use super::{
14 apply_windows_pty_priority, assign_child_to_windows_kill_on_close_job,
15 assign_conpty_conhost_to_job, conhost_children_of_current_process,
16};
17use super::{
18 is_ignorable_process_control_error, poll_pty_process, record_pty_input_metrics,
19 spawn_pty_reader, store_pty_returncode, write_pty_input, IdleDetectorCore, NativePtyHandles,
20 PtyError, PtyReadShared, PtyReadState,
21};
22
23#[cfg(unix)]
24use super::pty_posix as pty_platform;
25#[cfg(windows)]
26use super::pty_windows;
27
28pub struct NativePtyProcess {
34 pub argv: Vec<String>,
36 pub cwd: Option<String>,
38 pub env: Option<Vec<(String, String)>>,
40 pub rows: u16,
42 pub cols: u16,
44 #[cfg(windows)]
46 pub nice: Option<i32>,
47 pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
49 pub reader: Arc<PtyReadShared>,
51 pub returncode: Arc<Mutex<Option<i32>>>,
53 pub input_bytes_total: Arc<AtomicUsize>,
55 pub newline_events_total: Arc<AtomicUsize>,
57 pub submit_events_total: Arc<AtomicUsize>,
59 pub echo: Arc<AtomicBool>,
61 pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
63 pub output_bytes_total: Arc<AtomicUsize>,
65 pub control_churn_bytes_total: Arc<AtomicUsize>,
67 pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
69 pub terminal_input_relay_stop: Arc<AtomicBool>,
71 pub terminal_input_relay_active: Arc<AtomicBool>,
73 pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
75}
76
77pub(super) fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
78 cwd.map(str::to_owned).or_else(|| {
79 std::env::current_dir()
80 .ok()
81 .map(|cwd| cwd.to_string_lossy().to_string())
82 })
83}
84
85impl NativePtyProcess {
86 pub fn new(
90 argv: Vec<String>,
91 cwd: Option<String>,
92 env: Option<Vec<(String, String)>>,
93 rows: u16,
94 cols: u16,
95 nice: Option<i32>,
96 ) -> Result<Self, PtyError> {
97 if argv.is_empty() {
98 return Err(PtyError::Other("command cannot be empty".into()));
99 }
100 #[cfg(not(windows))]
101 let _ = nice;
102 Ok(Self {
103 argv,
104 cwd,
105 env,
106 rows,
107 cols,
108 #[cfg(windows)]
109 nice,
110 handles: Arc::new(Mutex::new(None)),
111 reader: Arc::new(PtyReadShared {
112 state: Mutex::new(PtyReadState {
113 chunks: VecDeque::new(),
114 closed: false,
115 }),
116 condvar: Condvar::new(),
117 }),
118 returncode: Arc::new(Mutex::new(None)),
119 input_bytes_total: Arc::new(AtomicUsize::new(0)),
120 newline_events_total: Arc::new(AtomicUsize::new(0)),
121 submit_events_total: Arc::new(AtomicUsize::new(0)),
122 echo: Arc::new(AtomicBool::new(false)),
123 idle_detector: Arc::new(Mutex::new(None)),
124 output_bytes_total: Arc::new(AtomicUsize::new(0)),
125 control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
126 reader_worker: Mutex::new(None),
127 terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
128 terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
129 terminal_input_relay_worker: Mutex::new(None),
130 })
131 }
132
133 pub fn mark_reader_closed(&self) {
135 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
136 guard.closed = true;
137 self.reader.condvar.notify_all();
138 }
139
140 pub fn store_returncode(&self, code: i32) {
142 store_pty_returncode(&self.returncode, code);
143 }
144
145 pub(super) fn join_reader_worker(&self) {
146 if let Some(worker) = self
147 .reader_worker
148 .lock()
149 .expect("pty reader worker mutex poisoned")
150 .take()
151 {
152 let _ = worker.join();
153 }
154 }
155
156 pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
158 record_pty_input_metrics(
159 &self.input_bytes_total,
160 &self.newline_events_total,
161 &self.submit_events_total,
162 data,
163 submit,
164 );
165 }
166
167 pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
169 self.record_input_metrics(data, submit);
170 write_pty_input(&self.handles, data)?;
171 Ok(())
172 }
173
174 pub fn request_terminal_input_relay_stop(&self) {
176 self.terminal_input_relay_stop
177 .store(true, Ordering::Release);
178 self.terminal_input_relay_active
179 .store(false, Ordering::Release);
180 }
181
182 pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
184 let mut worker_guard = self
185 .terminal_input_relay_worker
186 .lock()
187 .expect("pty terminal input relay mutex poisoned");
188 if worker_guard.is_some() && self.terminal_input_relay_active() {
189 return Ok(());
190 }
191 if self
192 .handles
193 .lock()
194 .expect("pty handles mutex poisoned")
195 .is_none()
196 {
197 return Err(PtyError::NotRunning);
198 }
199
200 self.terminal_input_relay_stop
201 .store(false, Ordering::Release);
202 self.terminal_input_relay_active
203 .store(true, Ordering::Release);
204
205 let handles = Arc::clone(&self.handles);
206 let returncode = Arc::clone(&self.returncode);
207 let input_bytes_total = Arc::clone(&self.input_bytes_total);
208 let newline_events_total = Arc::clone(&self.newline_events_total);
209 let submit_events_total = Arc::clone(&self.submit_events_total);
210 let stop = Arc::clone(&self.terminal_input_relay_stop);
211 let active = Arc::clone(&self.terminal_input_relay_active);
212
213 #[cfg(windows)]
214 {
215 let capture = super::terminal_input::TerminalInputCore::new();
216 capture.start_impl().map_err(PtyError::Io)?;
217 *worker_guard = Some(thread::spawn(move || {
218 loop {
219 if stop.load(Ordering::Acquire) {
220 break;
221 }
222 match poll_pty_process(&handles, &returncode) {
223 Ok(Some(_)) => break,
224 Ok(None) => {}
225 Err(_) => break,
226 }
227 match super::terminal_input::wait_for_terminal_input_event(
228 &capture.state,
229 &capture.condvar,
230 Some(Duration::from_millis(50)),
231 ) {
232 super::terminal_input::TerminalInputWaitOutcome::Event(event) => {
233 record_pty_input_metrics(
234 &input_bytes_total,
235 &newline_events_total,
236 &submit_events_total,
237 &event.data,
238 event.submit,
239 );
240 if write_pty_input(&handles, &event.data).is_err() {
241 break;
242 }
243 }
244 super::terminal_input::TerminalInputWaitOutcome::Timeout => continue,
245 super::terminal_input::TerminalInputWaitOutcome::Closed => break,
246 }
247 }
248 active.store(false, Ordering::Release);
249 let _ = capture.stop_impl();
250 }));
251 Ok(())
252 }
253
254 #[cfg(unix)]
255 {
256 if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
257 self.terminal_input_relay_active
258 .store(false, Ordering::Release);
259 return Ok(());
260 }
261
262 *worker_guard = Some(thread::spawn(move || {
263 posix_terminal_input_relay_worker(
264 handles,
265 returncode,
266 input_bytes_total,
267 newline_events_total,
268 submit_events_total,
269 stop,
270 active,
271 );
272 }));
273 Ok(())
274 }
275 }
276
277 pub fn stop_terminal_input_relay_impl(&self) {
279 self.request_terminal_input_relay_stop();
280 if let Some(worker) = self
281 .terminal_input_relay_worker
282 .lock()
283 .expect("pty terminal input relay mutex poisoned")
284 .take()
285 {
286 let _ = worker.join();
287 }
288 }
289
290 pub fn terminal_input_relay_active(&self) -> bool {
292 self.terminal_input_relay_active.load(Ordering::Acquire)
293 }
294
295 #[inline(never)]
297 pub fn close_impl(&self) -> Result<(), PtyError> {
298 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_impl");
299 self.stop_terminal_input_relay_impl();
300 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
301 let Some(handles) = guard.take() else {
302 self.mark_reader_closed();
303 return Ok(());
304 };
305 drop(guard);
306
307 #[cfg(windows)]
308 let NativePtyHandles {
309 master,
310 writer,
311 mut child,
312 _job,
313 } = handles;
314 #[cfg(not(windows))]
315 let NativePtyHandles {
316 master,
317 writer,
318 mut child,
319 } = handles;
320
321 #[cfg(windows)]
322 {
323 {
324 crate::rp_rust_debug_scope!(
325 "running_process::NativePtyProcess::close_impl.drop_job"
326 );
327 drop(_job);
328 }
329
330 {
331 crate::rp_rust_debug_scope!(
332 "running_process::NativePtyProcess::close_impl.wait_job_exit"
333 );
334 let wait_deadline = Instant::now() + Duration::from_secs(2);
335 loop {
336 match child.try_wait() {
337 Ok(Some(status)) => {
338 let code = status as i32;
339 self.store_returncode(code);
340 break;
341 }
342 Ok(None) if Instant::now() < wait_deadline => {
343 thread::sleep(Duration::from_millis(10));
350 }
351 Ok(None) => {
352 if let Err(err) = child.kill() {
353 if !is_ignorable_process_control_error(&err) {
354 return Err(PtyError::Io(err));
355 }
356 }
357 let code = match child.wait() {
358 Ok(status) => status as i32,
359 Err(_) => -9,
360 };
361 self.store_returncode(code);
362 break;
363 }
364 Err(_) => {
365 self.store_returncode(-9);
366 break;
367 }
368 }
369 }
370 }
371 {
372 crate::rp_rust_debug_scope!(
373 "running_process::NativePtyProcess::close_impl.drop_writer"
374 );
375 drop(writer);
376 }
377 {
378 crate::rp_rust_debug_scope!(
379 "running_process::NativePtyProcess::close_impl.drop_master"
380 );
381 drop(master);
382 }
383 drop(child);
384 {
385 crate::rp_rust_debug_scope!(
386 "running_process::NativePtyProcess::close_impl.join_reader"
387 );
388 self.join_reader_worker();
389 }
390 self.mark_reader_closed();
391 Ok(())
392 }
393
394 #[cfg(not(windows))]
395 {
396 drop(writer);
397 drop(master);
398
399 let code = {
400 crate::rp_rust_debug_scope!(
401 "running_process::NativePtyProcess::close_impl.wait_child"
402 );
403 match child.wait() {
404 Ok(status) => status as i32,
405 Err(_) => -9,
406 }
407 };
408 drop(child);
409
410 self.store_returncode(code);
411 {
412 crate::rp_rust_debug_scope!(
413 "running_process::NativePtyProcess::close_impl.join_reader"
414 );
415 self.join_reader_worker();
416 }
417 self.mark_reader_closed();
418 Ok(())
419 }
420 }
421
422 #[inline(never)]
424 pub fn close_nonblocking(&self) {
425 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_nonblocking");
426 #[cfg(windows)]
427 self.request_terminal_input_relay_stop();
428 let Ok(mut guard) = self.handles.lock() else {
429 return;
430 };
431 let Some(handles) = guard.take() else {
432 self.mark_reader_closed();
433 return;
434 };
435 drop(guard);
436
437 #[cfg(windows)]
438 let NativePtyHandles {
439 master,
440 writer,
441 mut child,
442 _job,
443 } = handles;
444 #[cfg(not(windows))]
445 let NativePtyHandles {
446 master,
447 writer,
448 mut child,
449 } = handles;
450
451 if let Err(err) = child.kill() {
452 if !is_ignorable_process_control_error(&err) {
453 return;
454 }
455 }
456 drop(writer);
457 drop(master);
458 drop(child);
459 #[cfg(windows)]
460 drop(_job);
461 self.mark_reader_closed();
462 }
463
464 pub fn start_impl(&self) -> Result<(), PtyError> {
466 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::start");
467 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
468 if guard.is_some() {
469 return Err(PtyError::AlreadyStarted);
470 }
471
472 #[cfg(windows)]
475 let conhost_pids_before = conhost_children_of_current_process();
476
477 let (mut master, slave) = Backend::openpty(PtySize {
478 rows: self.rows,
479 cols: self.cols,
480 pixel_width: 0,
481 pixel_height: 0,
482 })
483 .map_err(|e| PtyError::Spawn(e.to_string()))?;
484
485 let argv: Vec<std::ffi::OsString> =
487 self.argv.iter().map(std::ffi::OsString::from).collect();
488 let cwd = resolved_spawn_cwd(self.cwd.as_deref());
489 let env: Option<Vec<(std::ffi::OsString, std::ffi::OsString)>> =
490 self.env.as_ref().map(|e| {
491 e.iter()
492 .map(|(k, v)| (std::ffi::OsString::from(k), std::ffi::OsString::from(v)))
493 .collect()
494 });
495
496 let reader = master
497 .try_clone_reader()
498 .map_err(|e| PtyError::Spawn(e.to_string()))?;
499 let writer = master
500 .take_writer()
501 .map_err(|e| PtyError::Spawn(e.to_string()))?;
502 let cwd_path = cwd.as_deref().map(std::path::Path::new);
503 let child = slave
504 .spawn(&argv, cwd_path, env.as_deref())
505 .map_err(|e| PtyError::Spawn(e.to_string()))?;
506 #[cfg(windows)]
509 let job = assign_child_to_windows_kill_on_close_job(PtyChild::as_raw_handle(&child))?;
510 #[cfg(windows)]
511 assign_conpty_conhost_to_job(&job, &conhost_pids_before);
512 #[cfg(windows)]
513 apply_windows_pty_priority(PtyChild::as_raw_handle(&child), self.nice)?;
514 let shared = Arc::clone(&self.reader);
515 let echo = Arc::clone(&self.echo);
516 let idle_detector = Arc::clone(&self.idle_detector);
517 let output_bytes = Arc::clone(&self.output_bytes_total);
518 let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
519 let reader_worker = thread::spawn(move || {
520 spawn_pty_reader(
521 reader,
522 shared,
523 echo,
524 idle_detector,
525 output_bytes,
526 churn_bytes,
527 );
528 });
529 *self
530 .reader_worker
531 .lock()
532 .expect("pty reader worker mutex poisoned") = Some(reader_worker);
533
534 *guard = Some(NativePtyHandles {
535 master: Box::new(master) as Box<dyn PtyMaster>,
536 writer,
537 child: Box::new(child) as Box<dyn PtyChild>,
538 #[cfg(windows)]
539 _job: job,
540 });
541 Ok(())
542 }
543
544 pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
546 #[cfg(windows)]
547 {
548 pty_windows::respond_to_queries(self, data)
549 }
550
551 #[cfg(unix)]
552 {
553 pty_platform::respond_to_queries(self, data)
554 }
555 }
556
557 pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
559 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::resize");
560 let guard = self.handles.lock().expect("pty handles mutex poisoned");
561 if let Some(handles) = guard.as_ref() {
562 #[cfg(windows)]
563 {
564 let _ = (rows, cols, handles);
565 return Ok(());
569 }
570
571 #[cfg(not(windows))]
572 handles
573 .master
574 .resize(PtySize {
575 rows,
576 cols,
577 pixel_width: 0,
578 pixel_height: 0,
579 })
580 .map_err(|e| PtyError::Other(e.to_string()))?;
581 }
582 Ok(())
583 }
584
585 pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
587 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::send_interrupt");
588 #[cfg(windows)]
589 {
590 pty_windows::send_interrupt(self)
591 }
592
593 #[cfg(unix)]
594 {
595 pty_platform::send_interrupt(self)
596 }
597 }
598
599 pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
603 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::wait");
604 if let Some(code) = *self
606 .returncode
607 .lock()
608 .expect("pty returncode mutex poisoned")
609 {
610 return Ok(code);
611 }
612 let start = Instant::now();
613 loop {
614 if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
615 return Ok(code);
616 }
617 if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
618 return Err(PtyError::Timeout);
619 }
620 thread::sleep(Duration::from_millis(10));
624 }
625 }
626
627 pub fn terminate_impl(&self) -> Result<(), PtyError> {
629 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate");
630 #[cfg(windows)]
631 {
632 if self
633 .handles
634 .lock()
635 .expect("pty handles mutex poisoned")
636 .is_none()
637 {
638 return Err(PtyError::NotRunning);
639 }
640 self.close_impl()
641 }
642
643 #[cfg(unix)]
644 {
645 pty_platform::terminate(self)
646 }
647 }
648
649 pub fn kill_impl(&self) -> Result<(), PtyError> {
651 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill");
652 #[cfg(windows)]
653 {
654 if self
655 .handles
656 .lock()
657 .expect("pty handles mutex poisoned")
658 .is_none()
659 {
660 return Err(PtyError::NotRunning);
661 }
662 self.close_impl()
663 }
664
665 #[cfg(unix)]
666 {
667 pty_platform::kill(self)
668 }
669 }
670
671 pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
673 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate_tree");
674 #[cfg(windows)]
675 {
676 pty_windows::terminate_tree(self)
677 }
678
679 #[cfg(unix)]
680 {
681 pty_platform::terminate_tree(self)
682 }
683 }
684
685 pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
687 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill_tree");
688 #[cfg(windows)]
689 {
690 pty_windows::kill_tree(self)
691 }
692
693 #[cfg(unix)]
694 {
695 pty_platform::kill_tree(self)
696 }
697 }
698
699 pub fn pid(&self) -> Result<Option<u32>, PtyError> {
701 let guard = self.handles.lock().expect("pty handles mutex poisoned");
702 if let Some(handles) = guard.as_ref() {
703 #[cfg(unix)]
704 if let Some(pid) = handles.master.process_group_leader() {
705 if let Ok(pid) = u32::try_from(pid) {
706 return Ok(Some(pid));
707 }
708 }
709 return Ok(Some(handles.child.pid()));
710 }
711 Ok(None)
712 }
713
714 pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
717 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
718 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
719 loop {
720 if let Some(chunk) = guard.chunks.pop_front() {
721 return Ok(Some(chunk));
722 }
723 if guard.closed {
724 return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
725 }
726 match deadline {
727 Some(deadline) => {
728 let now = Instant::now();
729 if now >= deadline {
730 return Ok(None); }
732 let wait = deadline.saturating_duration_since(now);
733 let result = self
734 .reader
735 .condvar
736 .wait_timeout(guard, wait)
737 .expect("pty read mutex poisoned");
738 guard = result.0;
739 }
740 None => {
741 guard = self
742 .reader
743 .condvar
744 .wait(guard)
745 .expect("pty read mutex poisoned");
746 }
747 }
748 }
749 }
750
751 pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
753 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
754 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
755 loop {
756 if guard.closed {
757 return true;
758 }
759 match deadline {
760 Some(deadline) => {
761 let now = Instant::now();
762 if now >= deadline {
763 return false;
764 }
765 let wait = deadline.saturating_duration_since(now);
766 let result = self
767 .reader
768 .condvar
769 .wait_timeout(guard, wait)
770 .expect("pty read mutex poisoned");
771 guard = result.0;
772 }
773 None => {
774 guard = self
775 .reader
776 .condvar
777 .wait(guard)
778 .expect("pty read mutex poisoned");
779 }
780 }
781 }
782 }
783
784 pub fn wait_and_drain_impl(
786 &self,
787 timeout: Option<f64>,
788 drain_timeout: f64,
789 ) -> Result<i32, PtyError> {
790 let code = self.wait_impl(timeout)?;
791 let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
792 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
793 while !guard.closed {
794 let remaining = deadline.saturating_duration_since(Instant::now());
795 if remaining.is_zero() {
796 break;
797 }
798 let result = self
799 .reader
800 .condvar
801 .wait_timeout(guard, remaining)
802 .expect("pty read mutex poisoned");
803 guard = result.0;
804 }
805 Ok(code)
806 }
807
808 pub fn set_echo(&self, enabled: bool) {
810 self.echo.store(enabled, Ordering::Release);
811 }
812
813 pub fn echo_enabled(&self) -> bool {
815 self.echo.load(Ordering::Acquire)
816 }
817
818 pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
820 let mut guard = self
821 .idle_detector
822 .lock()
823 .expect("idle detector mutex poisoned");
824 *guard = Some(Arc::clone(detector));
825 }
826
827 pub fn detach_idle_detector(&self) {
829 let mut guard = self
830 .idle_detector
831 .lock()
832 .expect("idle detector mutex poisoned");
833 *guard = None;
834 }
835
836 pub fn pty_input_bytes_total(&self) -> usize {
838 self.input_bytes_total.load(Ordering::Acquire)
839 }
840
841 pub fn pty_newline_events_total(&self) -> usize {
843 self.newline_events_total.load(Ordering::Acquire)
844 }
845
846 pub fn pty_submit_events_total(&self) -> usize {
848 self.submit_events_total.load(Ordering::Acquire)
849 }
850
851 pub fn pty_output_bytes_total(&self) -> usize {
853 self.output_bytes_total.load(Ordering::Acquire)
854 }
855
856 pub fn pty_control_churn_bytes_total(&self) -> usize {
858 self.control_churn_bytes_total.load(Ordering::Acquire)
859 }
860}
861
/// Switches that control what an `InteractivePtySession` does on behalf of
/// its caller. Every switch defaults to `true`.
#[derive(Debug, Clone, Copy)]
pub struct InteractivePtyOptions {
    /// Value given to the process's echo flag when the session starts.
    pub echo_output: bool,
    /// Whether starting the session also starts the terminal-input relay.
    pub relay_terminal_input: bool,
    /// Whether pumped output chunks are passed to `respond_to_queries_impl`.
    pub respond_to_queries: bool,
}

impl Default for InteractivePtyOptions {
    /// Returns options with every feature switched on.
    fn default() -> Self {
        const ENABLED: bool = true;
        Self {
            echo_output: ENABLED,
            relay_terminal_input: ENABLED,
            respond_to_queries: ENABLED,
        }
    }
}
885
/// Outcome of one `InteractivePtySession::pump_output` call.
///
/// The default value has no chunks and `stream_closed == false`.
#[derive(Debug, Default)]
pub struct InteractivePtyPumpResult {
    /// Output chunks collected during the call, in the order they were read.
    pub chunks: Vec<Vec<u8>>,
    /// `true` when the call ended because the output stream was reported
    /// closed.
    pub stream_closed: bool,
}
894
895pub struct InteractivePtySession {
900 process: NativePtyProcess,
901 options: InteractivePtyOptions,
902}
903
904impl InteractivePtySession {
905 pub fn new(process: NativePtyProcess) -> Self {
907 Self::with_options(process, InteractivePtyOptions::default())
908 }
909
910 pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
912 Self { process, options }
913 }
914
915 pub fn process(&self) -> &NativePtyProcess {
917 &self.process
918 }
919
920 pub fn start(&self) -> Result<(), PtyError> {
922 self.process.set_echo(self.options.echo_output);
923 self.process.start_impl()?;
924 if self.options.relay_terminal_input {
925 self.process.start_terminal_input_relay_impl()?;
926 }
927 Ok(())
928 }
929
930 pub fn pump_output(
935 &self,
936 timeout: Option<f64>,
937 consume_all: bool,
938 ) -> Result<InteractivePtyPumpResult, PtyError> {
939 let mut pumped = InteractivePtyPumpResult::default();
940 let mut next_timeout = timeout;
941 loop {
942 match self.process.read_chunk_impl(next_timeout) {
943 Ok(Some(chunk)) => {
944 if self.options.respond_to_queries {
945 self.process.respond_to_queries_impl(&chunk)?;
946 }
947 pumped.chunks.push(chunk);
948 if !consume_all {
949 break;
950 }
951 next_timeout = Some(0.0);
952 }
953 Ok(None) => break,
954 Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
955 pumped.stream_closed = true;
956 break;
957 }
958 Err(err) => return Err(err),
959 }
960 }
961 Ok(pumped)
962 }
963
964 pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
966 self.process.resize_impl(rows, cols)
967 }
968
969 pub fn send_interrupt(&self) -> Result<(), PtyError> {
971 self.process.send_interrupt_impl()
972 }
973
974 pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
976 self.process.wait_impl(timeout)
977 }
978
979 pub fn wait_and_drain(
981 &self,
982 timeout: Option<f64>,
983 drain_timeout: f64,
984 ) -> Result<i32, PtyError> {
985 self.process.wait_and_drain_impl(timeout, drain_timeout)
986 }
987
988 pub fn terminate(&self) -> Result<(), PtyError> {
990 self.process.terminate_impl()
991 }
992
993 pub fn kill(&self) -> Result<(), PtyError> {
995 self.process.kill_impl()
996 }
997
998 pub fn close(&self) -> Result<(), PtyError> {
1000 self.process.close_impl()
1001 }
1002}
1003
1004impl Drop for NativePtyProcess {
1005 fn drop(&mut self) {
1006 self.close_nonblocking();
1007 }
1008}