1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7use super::backend::{Backend, PtyBackend, PtyChild, PtyMaster, PtySize};
8#[cfg(unix)]
9use super::backend::PtySlave;
10use super::{
11 is_ignorable_process_control_error, poll_pty_process, record_pty_input_metrics,
12 spawn_pty_reader, store_pty_returncode, write_pty_input, IdleDetectorCore, NativePtyHandles,
13 PtyError, PtyReadShared, PtyReadState,
14};
15#[cfg(unix)]
16use super::posix_terminal_input_relay_worker;
17#[cfg(windows)]
18use super::{
19 apply_windows_pty_priority, assign_child_to_windows_kill_on_close_job,
20 assign_conpty_conhost_to_job, conhost_children_of_current_process,
21};
22
23#[cfg(unix)]
24use super::pty_posix as pty_platform;
25#[cfg(windows)]
26use super::pty_windows;
27
28pub struct NativePtyProcess {
29 pub argv: Vec<String>,
30 pub cwd: Option<String>,
31 pub env: Option<Vec<(String, String)>>,
32 pub rows: u16,
33 pub cols: u16,
34 #[cfg(windows)]
35 pub nice: Option<i32>,
36 pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
37 pub reader: Arc<PtyReadShared>,
38 pub returncode: Arc<Mutex<Option<i32>>>,
39 pub input_bytes_total: Arc<AtomicUsize>,
40 pub newline_events_total: Arc<AtomicUsize>,
41 pub submit_events_total: Arc<AtomicUsize>,
42 pub echo: Arc<AtomicBool>,
44 pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
46 pub output_bytes_total: Arc<AtomicUsize>,
48 pub control_churn_bytes_total: Arc<AtomicUsize>,
50 pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
51 pub terminal_input_relay_stop: Arc<AtomicBool>,
52 pub terminal_input_relay_active: Arc<AtomicBool>,
53 pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
54}
55
56pub(super) fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
57 cwd.map(str::to_owned).or_else(|| {
58 std::env::current_dir()
59 .ok()
60 .map(|cwd| cwd.to_string_lossy().to_string())
61 })
62}
63
64impl NativePtyProcess {
65 pub fn new(
66 argv: Vec<String>,
67 cwd: Option<String>,
68 env: Option<Vec<(String, String)>>,
69 rows: u16,
70 cols: u16,
71 nice: Option<i32>,
72 ) -> Result<Self, PtyError> {
73 if argv.is_empty() {
74 return Err(PtyError::Other("command cannot be empty".into()));
75 }
76 #[cfg(not(windows))]
77 let _ = nice;
78 Ok(Self {
79 argv,
80 cwd,
81 env,
82 rows,
83 cols,
84 #[cfg(windows)]
85 nice,
86 handles: Arc::new(Mutex::new(None)),
87 reader: Arc::new(PtyReadShared {
88 state: Mutex::new(PtyReadState {
89 chunks: VecDeque::new(),
90 closed: false,
91 }),
92 condvar: Condvar::new(),
93 }),
94 returncode: Arc::new(Mutex::new(None)),
95 input_bytes_total: Arc::new(AtomicUsize::new(0)),
96 newline_events_total: Arc::new(AtomicUsize::new(0)),
97 submit_events_total: Arc::new(AtomicUsize::new(0)),
98 echo: Arc::new(AtomicBool::new(false)),
99 idle_detector: Arc::new(Mutex::new(None)),
100 output_bytes_total: Arc::new(AtomicUsize::new(0)),
101 control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
102 reader_worker: Mutex::new(None),
103 terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
104 terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
105 terminal_input_relay_worker: Mutex::new(None),
106 })
107 }
108
109 pub fn mark_reader_closed(&self) {
110 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
111 guard.closed = true;
112 self.reader.condvar.notify_all();
113 }
114
115 pub fn store_returncode(&self, code: i32) {
116 store_pty_returncode(&self.returncode, code);
117 }
118
119 pub(super) fn join_reader_worker(&self) {
120 if let Some(worker) = self
121 .reader_worker
122 .lock()
123 .expect("pty reader worker mutex poisoned")
124 .take()
125 {
126 let _ = worker.join();
127 }
128 }
129
130 pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
131 record_pty_input_metrics(
132 &self.input_bytes_total,
133 &self.newline_events_total,
134 &self.submit_events_total,
135 data,
136 submit,
137 );
138 }
139
140 pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
141 self.record_input_metrics(data, submit);
142 write_pty_input(&self.handles, data)?;
143 Ok(())
144 }
145
146 pub fn request_terminal_input_relay_stop(&self) {
147 self.terminal_input_relay_stop
148 .store(true, Ordering::Release);
149 self.terminal_input_relay_active
150 .store(false, Ordering::Release);
151 }
152
153 pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
154 let mut worker_guard = self
155 .terminal_input_relay_worker
156 .lock()
157 .expect("pty terminal input relay mutex poisoned");
158 if worker_guard.is_some() && self.terminal_input_relay_active() {
159 return Ok(());
160 }
161 if self
162 .handles
163 .lock()
164 .expect("pty handles mutex poisoned")
165 .is_none()
166 {
167 return Err(PtyError::NotRunning);
168 }
169
170 self.terminal_input_relay_stop
171 .store(false, Ordering::Release);
172 self.terminal_input_relay_active
173 .store(true, Ordering::Release);
174
175 let handles = Arc::clone(&self.handles);
176 let returncode = Arc::clone(&self.returncode);
177 let input_bytes_total = Arc::clone(&self.input_bytes_total);
178 let newline_events_total = Arc::clone(&self.newline_events_total);
179 let submit_events_total = Arc::clone(&self.submit_events_total);
180 let stop = Arc::clone(&self.terminal_input_relay_stop);
181 let active = Arc::clone(&self.terminal_input_relay_active);
182
183 #[cfg(windows)]
184 {
185 let capture = super::terminal_input::TerminalInputCore::new();
186 capture.start_impl().map_err(PtyError::Io)?;
187 *worker_guard = Some(thread::spawn(move || {
188 loop {
189 if stop.load(Ordering::Acquire) {
190 break;
191 }
192 match poll_pty_process(&handles, &returncode) {
193 Ok(Some(_)) => break,
194 Ok(None) => {}
195 Err(_) => break,
196 }
197 match super::terminal_input::wait_for_terminal_input_event(
198 &capture.state,
199 &capture.condvar,
200 Some(Duration::from_millis(50)),
201 ) {
202 super::terminal_input::TerminalInputWaitOutcome::Event(event) => {
203 record_pty_input_metrics(
204 &input_bytes_total,
205 &newline_events_total,
206 &submit_events_total,
207 &event.data,
208 event.submit,
209 );
210 if write_pty_input(&handles, &event.data).is_err() {
211 break;
212 }
213 }
214 super::terminal_input::TerminalInputWaitOutcome::Timeout => continue,
215 super::terminal_input::TerminalInputWaitOutcome::Closed => break,
216 }
217 }
218 active.store(false, Ordering::Release);
219 let _ = capture.stop_impl();
220 }));
221 Ok(())
222 }
223
224 #[cfg(unix)]
225 {
226 if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
227 self.terminal_input_relay_active
228 .store(false, Ordering::Release);
229 return Ok(());
230 }
231
232 *worker_guard = Some(thread::spawn(move || {
233 posix_terminal_input_relay_worker(
234 handles,
235 returncode,
236 input_bytes_total,
237 newline_events_total,
238 submit_events_total,
239 stop,
240 active,
241 );
242 }));
243 Ok(())
244 }
245 }
246
247 pub fn stop_terminal_input_relay_impl(&self) {
248 self.request_terminal_input_relay_stop();
249 if let Some(worker) = self
250 .terminal_input_relay_worker
251 .lock()
252 .expect("pty terminal input relay mutex poisoned")
253 .take()
254 {
255 let _ = worker.join();
256 }
257 }
258
259 pub fn terminal_input_relay_active(&self) -> bool {
260 self.terminal_input_relay_active.load(Ordering::Acquire)
261 }
262
263 #[inline(never)]
265 pub fn close_impl(&self) -> Result<(), PtyError> {
266 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_impl");
267 self.stop_terminal_input_relay_impl();
268 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
269 let Some(handles) = guard.take() else {
270 self.mark_reader_closed();
271 return Ok(());
272 };
273 drop(guard);
274
275 #[cfg(windows)]
276 let NativePtyHandles {
277 master,
278 writer,
279 mut child,
280 _job,
281 } = handles;
282 #[cfg(not(windows))]
283 let NativePtyHandles {
284 master,
285 writer,
286 mut child,
287 } = handles;
288
289 #[cfg(windows)]
290 {
291 {
292 crate::rp_rust_debug_scope!(
293 "running_process::NativePtyProcess::close_impl.drop_job"
294 );
295 drop(_job);
296 }
297
298 {
299 crate::rp_rust_debug_scope!(
300 "running_process::NativePtyProcess::close_impl.wait_job_exit"
301 );
302 let wait_deadline = Instant::now() + Duration::from_secs(2);
303 loop {
304 match child.try_wait() {
305 Ok(Some(status)) => {
306 let code = status as i32;
307 self.store_returncode(code);
308 break;
309 }
310 Ok(None) if Instant::now() < wait_deadline => {
311 thread::sleep(Duration::from_millis(10));
318 }
319 Ok(None) => {
320 if let Err(err) = child.kill() {
321 if !is_ignorable_process_control_error(&err) {
322 return Err(PtyError::Io(err));
323 }
324 }
325 let code = match child.wait() {
326 Ok(status) => status as i32,
327 Err(_) => -9,
328 };
329 self.store_returncode(code);
330 break;
331 }
332 Err(_) => {
333 self.store_returncode(-9);
334 break;
335 }
336 }
337 }
338 }
339 {
340 crate::rp_rust_debug_scope!(
341 "running_process::NativePtyProcess::close_impl.drop_writer"
342 );
343 drop(writer);
344 }
345 {
346 crate::rp_rust_debug_scope!(
347 "running_process::NativePtyProcess::close_impl.drop_master"
348 );
349 drop(master);
350 }
351 drop(child);
352 {
353 crate::rp_rust_debug_scope!(
354 "running_process::NativePtyProcess::close_impl.join_reader"
355 );
356 self.join_reader_worker();
357 }
358 self.mark_reader_closed();
359 Ok(())
360 }
361
362 #[cfg(not(windows))]
363 {
364 drop(writer);
365 drop(master);
366
367 let code = {
368 crate::rp_rust_debug_scope!(
369 "running_process::NativePtyProcess::close_impl.wait_child"
370 );
371 match child.wait() {
372 Ok(status) => status as i32,
373 Err(_) => -9,
374 }
375 };
376 drop(child);
377
378 self.store_returncode(code);
379 {
380 crate::rp_rust_debug_scope!(
381 "running_process::NativePtyProcess::close_impl.join_reader"
382 );
383 self.join_reader_worker();
384 }
385 self.mark_reader_closed();
386 Ok(())
387 }
388 }
389
390 #[inline(never)]
392 pub fn close_nonblocking(&self) {
393 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_nonblocking");
394 #[cfg(windows)]
395 self.request_terminal_input_relay_stop();
396 let Ok(mut guard) = self.handles.lock() else {
397 return;
398 };
399 let Some(handles) = guard.take() else {
400 self.mark_reader_closed();
401 return;
402 };
403 drop(guard);
404
405 #[cfg(windows)]
406 let NativePtyHandles {
407 master,
408 writer,
409 mut child,
410 _job,
411 } = handles;
412 #[cfg(not(windows))]
413 let NativePtyHandles {
414 master,
415 writer,
416 mut child,
417 } = handles;
418
419 if let Err(err) = child.kill() {
420 if !is_ignorable_process_control_error(&err) {
421 return;
422 }
423 }
424 drop(writer);
425 drop(master);
426 drop(child);
427 #[cfg(windows)]
428 drop(_job);
429 self.mark_reader_closed();
430 }
431
432 pub fn start_impl(&self) -> Result<(), PtyError> {
433 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::start");
434 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
435 if guard.is_some() {
436 return Err(PtyError::AlreadyStarted);
437 }
438
439 #[cfg(windows)]
442 let conhost_pids_before = conhost_children_of_current_process();
443
444 let (mut master, slave) = Backend::openpty(PtySize {
445 rows: self.rows,
446 cols: self.cols,
447 pixel_width: 0,
448 pixel_height: 0,
449 })
450 .map_err(|e| PtyError::Spawn(e.to_string()))?;
451
452 let argv: Vec<std::ffi::OsString> =
454 self.argv.iter().map(std::ffi::OsString::from).collect();
455 let cwd = resolved_spawn_cwd(self.cwd.as_deref());
456 let env: Option<Vec<(std::ffi::OsString, std::ffi::OsString)>> =
457 self.env.as_ref().map(|e| {
458 e.iter()
459 .map(|(k, v)| (std::ffi::OsString::from(k), std::ffi::OsString::from(v)))
460 .collect()
461 });
462
463 let reader = master
464 .try_clone_reader()
465 .map_err(|e| PtyError::Spawn(e.to_string()))?;
466 let writer = master
467 .take_writer()
468 .map_err(|e| PtyError::Spawn(e.to_string()))?;
469 let cwd_path = cwd.as_deref().map(std::path::Path::new);
470 let child = slave
471 .spawn(&argv, cwd_path, env.as_deref())
472 .map_err(|e| PtyError::Spawn(e.to_string()))?;
473 #[cfg(windows)]
476 let job =
477 assign_child_to_windows_kill_on_close_job(PtyChild::as_raw_handle(&child))?;
478 #[cfg(windows)]
479 assign_conpty_conhost_to_job(&job, &conhost_pids_before);
480 #[cfg(windows)]
481 apply_windows_pty_priority(PtyChild::as_raw_handle(&child), self.nice)?;
482 let shared = Arc::clone(&self.reader);
483 let echo = Arc::clone(&self.echo);
484 let idle_detector = Arc::clone(&self.idle_detector);
485 let output_bytes = Arc::clone(&self.output_bytes_total);
486 let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
487 let reader_worker = thread::spawn(move || {
488 spawn_pty_reader(
489 reader,
490 shared,
491 echo,
492 idle_detector,
493 output_bytes,
494 churn_bytes,
495 );
496 });
497 *self
498 .reader_worker
499 .lock()
500 .expect("pty reader worker mutex poisoned") = Some(reader_worker);
501
502 *guard = Some(NativePtyHandles {
503 master: Box::new(master) as Box<dyn PtyMaster>,
504 writer,
505 child: Box::new(child) as Box<dyn PtyChild>,
506 #[cfg(windows)]
507 _job: job,
508 });
509 Ok(())
510 }
511
512 pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
513 #[cfg(windows)]
514 {
515 pty_windows::respond_to_queries(self, data)
516 }
517
518 #[cfg(unix)]
519 {
520 pty_platform::respond_to_queries(self, data)
521 }
522 }
523
524 pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
525 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::resize");
526 let guard = self.handles.lock().expect("pty handles mutex poisoned");
527 if let Some(handles) = guard.as_ref() {
528 #[cfg(windows)]
529 {
530 let _ = (rows, cols, handles);
531 return Ok(());
535 }
536
537 #[cfg(not(windows))]
538 handles
539 .master
540 .resize(PtySize {
541 rows,
542 cols,
543 pixel_width: 0,
544 pixel_height: 0,
545 })
546 .map_err(|e| PtyError::Other(e.to_string()))?;
547 }
548 Ok(())
549 }
550
551 pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
552 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::send_interrupt");
553 #[cfg(windows)]
554 {
555 pty_windows::send_interrupt(self)
556 }
557
558 #[cfg(unix)]
559 {
560 pty_platform::send_interrupt(self)
561 }
562 }
563
564 pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
565 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::wait");
566 if let Some(code) = *self
568 .returncode
569 .lock()
570 .expect("pty returncode mutex poisoned")
571 {
572 return Ok(code);
573 }
574 let start = Instant::now();
575 loop {
576 if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
577 return Ok(code);
578 }
579 if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
580 return Err(PtyError::Timeout);
581 }
582 thread::sleep(Duration::from_millis(10));
586 }
587 }
588
589 pub fn terminate_impl(&self) -> Result<(), PtyError> {
590 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate");
591 #[cfg(windows)]
592 {
593 if self
594 .handles
595 .lock()
596 .expect("pty handles mutex poisoned")
597 .is_none()
598 {
599 return Err(PtyError::NotRunning);
600 }
601 self.close_impl()
602 }
603
604 #[cfg(unix)]
605 {
606 pty_platform::terminate(self)
607 }
608 }
609
610 pub fn kill_impl(&self) -> Result<(), PtyError> {
611 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill");
612 #[cfg(windows)]
613 {
614 if self
615 .handles
616 .lock()
617 .expect("pty handles mutex poisoned")
618 .is_none()
619 {
620 return Err(PtyError::NotRunning);
621 }
622 self.close_impl()
623 }
624
625 #[cfg(unix)]
626 {
627 pty_platform::kill(self)
628 }
629 }
630
631 pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
632 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate_tree");
633 #[cfg(windows)]
634 {
635 pty_windows::terminate_tree(self)
636 }
637
638 #[cfg(unix)]
639 {
640 pty_platform::terminate_tree(self)
641 }
642 }
643
644 pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
645 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill_tree");
646 #[cfg(windows)]
647 {
648 pty_windows::kill_tree(self)
649 }
650
651 #[cfg(unix)]
652 {
653 pty_platform::kill_tree(self)
654 }
655 }
656
657 pub fn pid(&self) -> Result<Option<u32>, PtyError> {
659 let guard = self.handles.lock().expect("pty handles mutex poisoned");
660 if let Some(handles) = guard.as_ref() {
661 #[cfg(unix)]
662 if let Some(pid) = handles.master.process_group_leader() {
663 if let Ok(pid) = u32::try_from(pid) {
664 return Ok(Some(pid));
665 }
666 }
667 return Ok(Some(handles.child.pid()));
668 }
669 Ok(None)
670 }
671
672 pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
675 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
676 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
677 loop {
678 if let Some(chunk) = guard.chunks.pop_front() {
679 return Ok(Some(chunk));
680 }
681 if guard.closed {
682 return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
683 }
684 match deadline {
685 Some(deadline) => {
686 let now = Instant::now();
687 if now >= deadline {
688 return Ok(None); }
690 let wait = deadline.saturating_duration_since(now);
691 let result = self
692 .reader
693 .condvar
694 .wait_timeout(guard, wait)
695 .expect("pty read mutex poisoned");
696 guard = result.0;
697 }
698 None => {
699 guard = self
700 .reader
701 .condvar
702 .wait(guard)
703 .expect("pty read mutex poisoned");
704 }
705 }
706 }
707 }
708
709 pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
711 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
712 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
713 loop {
714 if guard.closed {
715 return true;
716 }
717 match deadline {
718 Some(deadline) => {
719 let now = Instant::now();
720 if now >= deadline {
721 return false;
722 }
723 let wait = deadline.saturating_duration_since(now);
724 let result = self
725 .reader
726 .condvar
727 .wait_timeout(guard, wait)
728 .expect("pty read mutex poisoned");
729 guard = result.0;
730 }
731 None => {
732 guard = self
733 .reader
734 .condvar
735 .wait(guard)
736 .expect("pty read mutex poisoned");
737 }
738 }
739 }
740 }
741
742 pub fn wait_and_drain_impl(
744 &self,
745 timeout: Option<f64>,
746 drain_timeout: f64,
747 ) -> Result<i32, PtyError> {
748 let code = self.wait_impl(timeout)?;
749 let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
750 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
751 while !guard.closed {
752 let remaining = deadline.saturating_duration_since(Instant::now());
753 if remaining.is_zero() {
754 break;
755 }
756 let result = self
757 .reader
758 .condvar
759 .wait_timeout(guard, remaining)
760 .expect("pty read mutex poisoned");
761 guard = result.0;
762 }
763 Ok(code)
764 }
765
766 pub fn set_echo(&self, enabled: bool) {
767 self.echo.store(enabled, Ordering::Release);
768 }
769
770 pub fn echo_enabled(&self) -> bool {
771 self.echo.load(Ordering::Acquire)
772 }
773
774 pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
775 let mut guard = self
776 .idle_detector
777 .lock()
778 .expect("idle detector mutex poisoned");
779 *guard = Some(Arc::clone(detector));
780 }
781
782 pub fn detach_idle_detector(&self) {
783 let mut guard = self
784 .idle_detector
785 .lock()
786 .expect("idle detector mutex poisoned");
787 *guard = None;
788 }
789
790 pub fn pty_input_bytes_total(&self) -> usize {
791 self.input_bytes_total.load(Ordering::Acquire)
792 }
793
794 pub fn pty_newline_events_total(&self) -> usize {
795 self.newline_events_total.load(Ordering::Acquire)
796 }
797
798 pub fn pty_submit_events_total(&self) -> usize {
799 self.submit_events_total.load(Ordering::Acquire)
800 }
801
802 pub fn pty_output_bytes_total(&self) -> usize {
803 self.output_bytes_total.load(Ordering::Acquire)
804 }
805
806 pub fn pty_control_churn_bytes_total(&self) -> usize {
807 self.control_churn_bytes_total.load(Ordering::Acquire)
808 }
809}
810
811#[derive(Debug, Clone, Copy)]
816pub struct InteractivePtyOptions {
817 pub echo_output: bool,
818 pub relay_terminal_input: bool,
819 pub respond_to_queries: bool,
820}
821
822impl Default for InteractivePtyOptions {
823 fn default() -> Self {
824 Self {
825 echo_output: true,
826 relay_terminal_input: true,
827 respond_to_queries: true,
828 }
829 }
830}
831
832#[derive(Debug, Default)]
833pub struct InteractivePtyPumpResult {
834 pub chunks: Vec<Vec<u8>>,
835 pub stream_closed: bool,
836}
837
838pub struct InteractivePtySession {
843 process: NativePtyProcess,
844 options: InteractivePtyOptions,
845}
846
847impl InteractivePtySession {
848 pub fn new(process: NativePtyProcess) -> Self {
849 Self::with_options(process, InteractivePtyOptions::default())
850 }
851
852 pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
853 Self { process, options }
854 }
855
856 pub fn process(&self) -> &NativePtyProcess {
857 &self.process
858 }
859
860 pub fn start(&self) -> Result<(), PtyError> {
861 self.process.set_echo(self.options.echo_output);
862 self.process.start_impl()?;
863 if self.options.relay_terminal_input {
864 self.process.start_terminal_input_relay_impl()?;
865 }
866 Ok(())
867 }
868
869 pub fn pump_output(
870 &self,
871 timeout: Option<f64>,
872 consume_all: bool,
873 ) -> Result<InteractivePtyPumpResult, PtyError> {
874 let mut pumped = InteractivePtyPumpResult::default();
875 let mut next_timeout = timeout;
876 loop {
877 match self.process.read_chunk_impl(next_timeout) {
878 Ok(Some(chunk)) => {
879 if self.options.respond_to_queries {
880 self.process.respond_to_queries_impl(&chunk)?;
881 }
882 pumped.chunks.push(chunk);
883 if !consume_all {
884 break;
885 }
886 next_timeout = Some(0.0);
887 }
888 Ok(None) => break,
889 Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
890 pumped.stream_closed = true;
891 break;
892 }
893 Err(err) => return Err(err),
894 }
895 }
896 Ok(pumped)
897 }
898
899 pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
900 self.process.resize_impl(rows, cols)
901 }
902
903 pub fn send_interrupt(&self) -> Result<(), PtyError> {
904 self.process.send_interrupt_impl()
905 }
906
907 pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
908 self.process.wait_impl(timeout)
909 }
910
911 pub fn wait_and_drain(
912 &self,
913 timeout: Option<f64>,
914 drain_timeout: f64,
915 ) -> Result<i32, PtyError> {
916 self.process.wait_and_drain_impl(timeout, drain_timeout)
917 }
918
919 pub fn terminate(&self) -> Result<(), PtyError> {
920 self.process.terminate_impl()
921 }
922
923 pub fn kill(&self) -> Result<(), PtyError> {
924 self.process.kill_impl()
925 }
926
927 pub fn close(&self) -> Result<(), PtyError> {
928 self.process.close_impl()
929 }
930}
931
932impl Drop for NativePtyProcess {
933 fn drop(&mut self) {
934 self.close_nonblocking();
935 }
936}