1use std::collections::VecDeque;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3use std::sync::{Arc, Condvar, Mutex};
4use std::thread;
5use std::time::{Duration, Instant};
6
7use super::backend::{Backend, PtyBackend, PtyChild, PtyMaster, PtySize, PtySlave};
8use super::{
9 is_ignorable_process_control_error, poll_pty_process, record_pty_input_metrics,
10 spawn_pty_reader, store_pty_returncode, write_pty_input, IdleDetectorCore, NativePtyHandles,
11 PtyError, PtyReadShared, PtyReadState,
12};
13#[cfg(unix)]
14use super::posix_terminal_input_relay_worker;
15#[cfg(windows)]
16use super::{
17 apply_windows_pty_priority, assign_child_to_windows_kill_on_close_job,
18 assign_conpty_conhost_to_job, conhost_children_of_current_process,
19};
20
21#[cfg(unix)]
22use super::pty_posix as pty_platform;
23#[cfg(windows)]
24use super::pty_windows;
25
26pub struct NativePtyProcess {
27 pub argv: Vec<String>,
28 pub cwd: Option<String>,
29 pub env: Option<Vec<(String, String)>>,
30 pub rows: u16,
31 pub cols: u16,
32 #[cfg(windows)]
33 pub nice: Option<i32>,
34 pub handles: Arc<Mutex<Option<NativePtyHandles>>>,
35 pub reader: Arc<PtyReadShared>,
36 pub returncode: Arc<Mutex<Option<i32>>>,
37 pub input_bytes_total: Arc<AtomicUsize>,
38 pub newline_events_total: Arc<AtomicUsize>,
39 pub submit_events_total: Arc<AtomicUsize>,
40 pub echo: Arc<AtomicBool>,
42 pub idle_detector: Arc<Mutex<Option<Arc<IdleDetectorCore>>>>,
44 pub output_bytes_total: Arc<AtomicUsize>,
46 pub control_churn_bytes_total: Arc<AtomicUsize>,
48 pub reader_worker: Mutex<Option<thread::JoinHandle<()>>>,
49 pub terminal_input_relay_stop: Arc<AtomicBool>,
50 pub terminal_input_relay_active: Arc<AtomicBool>,
51 pub terminal_input_relay_worker: Mutex<Option<thread::JoinHandle<()>>>,
52}
53
54pub(super) fn resolved_spawn_cwd(cwd: Option<&str>) -> Option<String> {
55 cwd.map(str::to_owned).or_else(|| {
56 std::env::current_dir()
57 .ok()
58 .map(|cwd| cwd.to_string_lossy().to_string())
59 })
60}
61
62impl NativePtyProcess {
63 pub fn new(
64 argv: Vec<String>,
65 cwd: Option<String>,
66 env: Option<Vec<(String, String)>>,
67 rows: u16,
68 cols: u16,
69 nice: Option<i32>,
70 ) -> Result<Self, PtyError> {
71 if argv.is_empty() {
72 return Err(PtyError::Other("command cannot be empty".into()));
73 }
74 #[cfg(not(windows))]
75 let _ = nice;
76 Ok(Self {
77 argv,
78 cwd,
79 env,
80 rows,
81 cols,
82 #[cfg(windows)]
83 nice,
84 handles: Arc::new(Mutex::new(None)),
85 reader: Arc::new(PtyReadShared {
86 state: Mutex::new(PtyReadState {
87 chunks: VecDeque::new(),
88 closed: false,
89 }),
90 condvar: Condvar::new(),
91 }),
92 returncode: Arc::new(Mutex::new(None)),
93 input_bytes_total: Arc::new(AtomicUsize::new(0)),
94 newline_events_total: Arc::new(AtomicUsize::new(0)),
95 submit_events_total: Arc::new(AtomicUsize::new(0)),
96 echo: Arc::new(AtomicBool::new(false)),
97 idle_detector: Arc::new(Mutex::new(None)),
98 output_bytes_total: Arc::new(AtomicUsize::new(0)),
99 control_churn_bytes_total: Arc::new(AtomicUsize::new(0)),
100 reader_worker: Mutex::new(None),
101 terminal_input_relay_stop: Arc::new(AtomicBool::new(false)),
102 terminal_input_relay_active: Arc::new(AtomicBool::new(false)),
103 terminal_input_relay_worker: Mutex::new(None),
104 })
105 }
106
107 pub fn mark_reader_closed(&self) {
108 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
109 guard.closed = true;
110 self.reader.condvar.notify_all();
111 }
112
113 pub fn store_returncode(&self, code: i32) {
114 store_pty_returncode(&self.returncode, code);
115 }
116
117 pub(super) fn join_reader_worker(&self) {
118 if let Some(worker) = self
119 .reader_worker
120 .lock()
121 .expect("pty reader worker mutex poisoned")
122 .take()
123 {
124 let _ = worker.join();
125 }
126 }
127
128 pub fn record_input_metrics(&self, data: &[u8], submit: bool) {
129 record_pty_input_metrics(
130 &self.input_bytes_total,
131 &self.newline_events_total,
132 &self.submit_events_total,
133 data,
134 submit,
135 );
136 }
137
138 pub fn write_impl(&self, data: &[u8], submit: bool) -> Result<(), PtyError> {
139 self.record_input_metrics(data, submit);
140 write_pty_input(&self.handles, data)?;
141 Ok(())
142 }
143
144 pub fn request_terminal_input_relay_stop(&self) {
145 self.terminal_input_relay_stop
146 .store(true, Ordering::Release);
147 self.terminal_input_relay_active
148 .store(false, Ordering::Release);
149 }
150
151 pub fn start_terminal_input_relay_impl(&self) -> Result<(), PtyError> {
152 let mut worker_guard = self
153 .terminal_input_relay_worker
154 .lock()
155 .expect("pty terminal input relay mutex poisoned");
156 if worker_guard.is_some() && self.terminal_input_relay_active() {
157 return Ok(());
158 }
159 if self
160 .handles
161 .lock()
162 .expect("pty handles mutex poisoned")
163 .is_none()
164 {
165 return Err(PtyError::NotRunning);
166 }
167
168 self.terminal_input_relay_stop
169 .store(false, Ordering::Release);
170 self.terminal_input_relay_active
171 .store(true, Ordering::Release);
172
173 let handles = Arc::clone(&self.handles);
174 let returncode = Arc::clone(&self.returncode);
175 let input_bytes_total = Arc::clone(&self.input_bytes_total);
176 let newline_events_total = Arc::clone(&self.newline_events_total);
177 let submit_events_total = Arc::clone(&self.submit_events_total);
178 let stop = Arc::clone(&self.terminal_input_relay_stop);
179 let active = Arc::clone(&self.terminal_input_relay_active);
180
181 #[cfg(windows)]
182 {
183 let capture = super::terminal_input::TerminalInputCore::new();
184 capture.start_impl().map_err(PtyError::Io)?;
185 *worker_guard = Some(thread::spawn(move || {
186 loop {
187 if stop.load(Ordering::Acquire) {
188 break;
189 }
190 match poll_pty_process(&handles, &returncode) {
191 Ok(Some(_)) => break,
192 Ok(None) => {}
193 Err(_) => break,
194 }
195 match super::terminal_input::wait_for_terminal_input_event(
196 &capture.state,
197 &capture.condvar,
198 Some(Duration::from_millis(50)),
199 ) {
200 super::terminal_input::TerminalInputWaitOutcome::Event(event) => {
201 record_pty_input_metrics(
202 &input_bytes_total,
203 &newline_events_total,
204 &submit_events_total,
205 &event.data,
206 event.submit,
207 );
208 if write_pty_input(&handles, &event.data).is_err() {
209 break;
210 }
211 }
212 super::terminal_input::TerminalInputWaitOutcome::Timeout => continue,
213 super::terminal_input::TerminalInputWaitOutcome::Closed => break,
214 }
215 }
216 active.store(false, Ordering::Release);
217 let _ = capture.stop_impl();
218 }));
219 Ok(())
220 }
221
222 #[cfg(unix)]
223 {
224 if unsafe { libc::isatty(libc::STDIN_FILENO) } != 1 {
225 self.terminal_input_relay_active
226 .store(false, Ordering::Release);
227 return Ok(());
228 }
229
230 *worker_guard = Some(thread::spawn(move || {
231 posix_terminal_input_relay_worker(
232 handles,
233 returncode,
234 input_bytes_total,
235 newline_events_total,
236 submit_events_total,
237 stop,
238 active,
239 );
240 }));
241 Ok(())
242 }
243 }
244
245 pub fn stop_terminal_input_relay_impl(&self) {
246 self.request_terminal_input_relay_stop();
247 if let Some(worker) = self
248 .terminal_input_relay_worker
249 .lock()
250 .expect("pty terminal input relay mutex poisoned")
251 .take()
252 {
253 let _ = worker.join();
254 }
255 }
256
257 pub fn terminal_input_relay_active(&self) -> bool {
258 self.terminal_input_relay_active.load(Ordering::Acquire)
259 }
260
261 #[inline(never)]
263 pub fn close_impl(&self) -> Result<(), PtyError> {
264 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_impl");
265 self.stop_terminal_input_relay_impl();
266 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
267 let Some(handles) = guard.take() else {
268 self.mark_reader_closed();
269 return Ok(());
270 };
271 drop(guard);
272
273 #[cfg(windows)]
274 let NativePtyHandles {
275 master,
276 writer,
277 mut child,
278 _job,
279 } = handles;
280 #[cfg(not(windows))]
281 let NativePtyHandles {
282 master,
283 writer,
284 mut child,
285 } = handles;
286
287 #[cfg(windows)]
288 {
289 {
290 crate::rp_rust_debug_scope!(
291 "running_process::NativePtyProcess::close_impl.drop_job"
292 );
293 drop(_job);
294 }
295
296 {
297 crate::rp_rust_debug_scope!(
298 "running_process::NativePtyProcess::close_impl.wait_job_exit"
299 );
300 let wait_deadline = Instant::now() + Duration::from_secs(2);
301 loop {
302 match child.try_wait() {
303 Ok(Some(status)) => {
304 let code = status as i32;
305 self.store_returncode(code);
306 break;
307 }
308 Ok(None) if Instant::now() < wait_deadline => {
309 thread::sleep(Duration::from_millis(10));
316 }
317 Ok(None) => {
318 if let Err(err) = child.kill() {
319 if !is_ignorable_process_control_error(&err) {
320 return Err(PtyError::Io(err));
321 }
322 }
323 let code = match child.wait() {
324 Ok(status) => status as i32,
325 Err(_) => -9,
326 };
327 self.store_returncode(code);
328 break;
329 }
330 Err(_) => {
331 self.store_returncode(-9);
332 break;
333 }
334 }
335 }
336 }
337 {
338 crate::rp_rust_debug_scope!(
339 "running_process::NativePtyProcess::close_impl.drop_writer"
340 );
341 drop(writer);
342 }
343 {
344 crate::rp_rust_debug_scope!(
345 "running_process::NativePtyProcess::close_impl.drop_master"
346 );
347 drop(master);
348 }
349 drop(child);
350 {
351 crate::rp_rust_debug_scope!(
352 "running_process::NativePtyProcess::close_impl.join_reader"
353 );
354 self.join_reader_worker();
355 }
356 self.mark_reader_closed();
357 Ok(())
358 }
359
360 #[cfg(not(windows))]
361 {
362 drop(writer);
363 drop(master);
364
365 let code = {
366 crate::rp_rust_debug_scope!(
367 "running_process::NativePtyProcess::close_impl.wait_child"
368 );
369 match child.wait() {
370 Ok(status) => status as i32,
371 Err(_) => -9,
372 }
373 };
374 drop(child);
375
376 self.store_returncode(code);
377 {
378 crate::rp_rust_debug_scope!(
379 "running_process::NativePtyProcess::close_impl.join_reader"
380 );
381 self.join_reader_worker();
382 }
383 self.mark_reader_closed();
384 Ok(())
385 }
386 }
387
388 #[inline(never)]
390 pub fn close_nonblocking(&self) {
391 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::close_nonblocking");
392 #[cfg(windows)]
393 self.request_terminal_input_relay_stop();
394 let Ok(mut guard) = self.handles.lock() else {
395 return;
396 };
397 let Some(handles) = guard.take() else {
398 self.mark_reader_closed();
399 return;
400 };
401 drop(guard);
402
403 #[cfg(windows)]
404 let NativePtyHandles {
405 master,
406 writer,
407 mut child,
408 _job,
409 } = handles;
410 #[cfg(not(windows))]
411 let NativePtyHandles {
412 master,
413 writer,
414 mut child,
415 } = handles;
416
417 if let Err(err) = child.kill() {
418 if !is_ignorable_process_control_error(&err) {
419 return;
420 }
421 }
422 drop(writer);
423 drop(master);
424 drop(child);
425 #[cfg(windows)]
426 drop(_job);
427 self.mark_reader_closed();
428 }
429
430 pub fn start_impl(&self) -> Result<(), PtyError> {
431 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::start");
432 let mut guard = self.handles.lock().expect("pty handles mutex poisoned");
433 if guard.is_some() {
434 return Err(PtyError::AlreadyStarted);
435 }
436
437 #[cfg(windows)]
440 let conhost_pids_before = conhost_children_of_current_process();
441
442 let (mut master, slave) = Backend::openpty(PtySize {
443 rows: self.rows,
444 cols: self.cols,
445 pixel_width: 0,
446 pixel_height: 0,
447 })
448 .map_err(|e| PtyError::Spawn(e.to_string()))?;
449
450 let argv: Vec<std::ffi::OsString> =
452 self.argv.iter().map(std::ffi::OsString::from).collect();
453 let cwd = resolved_spawn_cwd(self.cwd.as_deref());
454 let env: Option<Vec<(std::ffi::OsString, std::ffi::OsString)>> =
455 self.env.as_ref().map(|e| {
456 e.iter()
457 .map(|(k, v)| (std::ffi::OsString::from(k), std::ffi::OsString::from(v)))
458 .collect()
459 });
460
461 let reader = master
462 .try_clone_reader()
463 .map_err(|e| PtyError::Spawn(e.to_string()))?;
464 let writer = master
465 .take_writer()
466 .map_err(|e| PtyError::Spawn(e.to_string()))?;
467 let cwd_path = cwd.as_deref().map(std::path::Path::new);
468 let child = slave
469 .spawn(&argv, cwd_path, env.as_deref())
470 .map_err(|e| PtyError::Spawn(e.to_string()))?;
471 #[cfg(windows)]
474 let job =
475 assign_child_to_windows_kill_on_close_job(PtyChild::as_raw_handle(&child))?;
476 #[cfg(windows)]
477 assign_conpty_conhost_to_job(&job, &conhost_pids_before);
478 #[cfg(windows)]
479 apply_windows_pty_priority(PtyChild::as_raw_handle(&child), self.nice)?;
480 let shared = Arc::clone(&self.reader);
481 let echo = Arc::clone(&self.echo);
482 let idle_detector = Arc::clone(&self.idle_detector);
483 let output_bytes = Arc::clone(&self.output_bytes_total);
484 let churn_bytes = Arc::clone(&self.control_churn_bytes_total);
485 let reader_worker = thread::spawn(move || {
486 spawn_pty_reader(
487 reader,
488 shared,
489 echo,
490 idle_detector,
491 output_bytes,
492 churn_bytes,
493 );
494 });
495 *self
496 .reader_worker
497 .lock()
498 .expect("pty reader worker mutex poisoned") = Some(reader_worker);
499
500 *guard = Some(NativePtyHandles {
501 master: Box::new(master) as Box<dyn PtyMaster>,
502 writer,
503 child: Box::new(child) as Box<dyn PtyChild>,
504 #[cfg(windows)]
505 _job: job,
506 });
507 Ok(())
508 }
509
510 pub fn respond_to_queries_impl(&self, data: &[u8]) -> Result<(), PtyError> {
511 #[cfg(windows)]
512 {
513 pty_windows::respond_to_queries(self, data)
514 }
515
516 #[cfg(unix)]
517 {
518 pty_platform::respond_to_queries(self, data)
519 }
520 }
521
522 pub fn resize_impl(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
523 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::resize");
524 let guard = self.handles.lock().expect("pty handles mutex poisoned");
525 if let Some(handles) = guard.as_ref() {
526 #[cfg(windows)]
527 {
528 let _ = (rows, cols, handles);
529 return Ok(());
533 }
534
535 #[cfg(not(windows))]
536 handles
537 .master
538 .resize(PtySize {
539 rows,
540 cols,
541 pixel_width: 0,
542 pixel_height: 0,
543 })
544 .map_err(|e| PtyError::Other(e.to_string()))?;
545 }
546 Ok(())
547 }
548
549 pub fn send_interrupt_impl(&self) -> Result<(), PtyError> {
550 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::send_interrupt");
551 #[cfg(windows)]
552 {
553 pty_windows::send_interrupt(self)
554 }
555
556 #[cfg(unix)]
557 {
558 pty_platform::send_interrupt(self)
559 }
560 }
561
562 pub fn wait_impl(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
563 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::wait");
564 if let Some(code) = *self
566 .returncode
567 .lock()
568 .expect("pty returncode mutex poisoned")
569 {
570 return Ok(code);
571 }
572 let start = Instant::now();
573 loop {
574 if let Some(code) = poll_pty_process(&self.handles, &self.returncode)? {
575 return Ok(code);
576 }
577 if timeout.is_some_and(|limit| start.elapsed() >= Duration::from_secs_f64(limit)) {
578 return Err(PtyError::Timeout);
579 }
580 thread::sleep(Duration::from_millis(10));
584 }
585 }
586
587 pub fn terminate_impl(&self) -> Result<(), PtyError> {
588 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate");
589 #[cfg(windows)]
590 {
591 if self
592 .handles
593 .lock()
594 .expect("pty handles mutex poisoned")
595 .is_none()
596 {
597 return Err(PtyError::NotRunning);
598 }
599 self.close_impl()
600 }
601
602 #[cfg(unix)]
603 {
604 pty_platform::terminate(self)
605 }
606 }
607
608 pub fn kill_impl(&self) -> Result<(), PtyError> {
609 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill");
610 #[cfg(windows)]
611 {
612 if self
613 .handles
614 .lock()
615 .expect("pty handles mutex poisoned")
616 .is_none()
617 {
618 return Err(PtyError::NotRunning);
619 }
620 self.close_impl()
621 }
622
623 #[cfg(unix)]
624 {
625 pty_platform::kill(self)
626 }
627 }
628
629 pub fn terminate_tree_impl(&self) -> Result<(), PtyError> {
630 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::terminate_tree");
631 #[cfg(windows)]
632 {
633 pty_windows::terminate_tree(self)
634 }
635
636 #[cfg(unix)]
637 {
638 pty_platform::terminate_tree(self)
639 }
640 }
641
642 pub fn kill_tree_impl(&self) -> Result<(), PtyError> {
643 crate::rp_rust_debug_scope!("running_process::NativePtyProcess::kill_tree");
644 #[cfg(windows)]
645 {
646 pty_windows::kill_tree(self)
647 }
648
649 #[cfg(unix)]
650 {
651 pty_platform::kill_tree(self)
652 }
653 }
654
655 pub fn pid(&self) -> Result<Option<u32>, PtyError> {
657 let guard = self.handles.lock().expect("pty handles mutex poisoned");
658 if let Some(handles) = guard.as_ref() {
659 #[cfg(unix)]
660 if let Some(pid) = handles.master.process_group_leader() {
661 if let Ok(pid) = u32::try_from(pid) {
662 return Ok(Some(pid));
663 }
664 }
665 return Ok(Some(handles.child.pid()));
666 }
667 Ok(None)
668 }
669
670 pub fn read_chunk_impl(&self, timeout: Option<f64>) -> Result<Option<Vec<u8>>, PtyError> {
673 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
674 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
675 loop {
676 if let Some(chunk) = guard.chunks.pop_front() {
677 return Ok(Some(chunk));
678 }
679 if guard.closed {
680 return Err(PtyError::Other("Pseudo-terminal stream is closed".into()));
681 }
682 match deadline {
683 Some(deadline) => {
684 let now = Instant::now();
685 if now >= deadline {
686 return Ok(None); }
688 let wait = deadline.saturating_duration_since(now);
689 let result = self
690 .reader
691 .condvar
692 .wait_timeout(guard, wait)
693 .expect("pty read mutex poisoned");
694 guard = result.0;
695 }
696 None => {
697 guard = self
698 .reader
699 .condvar
700 .wait(guard)
701 .expect("pty read mutex poisoned");
702 }
703 }
704 }
705 }
706
707 pub fn wait_for_reader_closed_impl(&self, timeout: Option<f64>) -> bool {
709 let deadline = timeout.map(|secs| Instant::now() + Duration::from_secs_f64(secs));
710 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
711 loop {
712 if guard.closed {
713 return true;
714 }
715 match deadline {
716 Some(deadline) => {
717 let now = Instant::now();
718 if now >= deadline {
719 return false;
720 }
721 let wait = deadline.saturating_duration_since(now);
722 let result = self
723 .reader
724 .condvar
725 .wait_timeout(guard, wait)
726 .expect("pty read mutex poisoned");
727 guard = result.0;
728 }
729 None => {
730 guard = self
731 .reader
732 .condvar
733 .wait(guard)
734 .expect("pty read mutex poisoned");
735 }
736 }
737 }
738 }
739
740 pub fn wait_and_drain_impl(
742 &self,
743 timeout: Option<f64>,
744 drain_timeout: f64,
745 ) -> Result<i32, PtyError> {
746 let code = self.wait_impl(timeout)?;
747 let deadline = Instant::now() + Duration::from_secs_f64(drain_timeout.max(0.0));
748 let mut guard = self.reader.state.lock().expect("pty read mutex poisoned");
749 while !guard.closed {
750 let remaining = deadline.saturating_duration_since(Instant::now());
751 if remaining.is_zero() {
752 break;
753 }
754 let result = self
755 .reader
756 .condvar
757 .wait_timeout(guard, remaining)
758 .expect("pty read mutex poisoned");
759 guard = result.0;
760 }
761 Ok(code)
762 }
763
764 pub fn set_echo(&self, enabled: bool) {
765 self.echo.store(enabled, Ordering::Release);
766 }
767
768 pub fn echo_enabled(&self) -> bool {
769 self.echo.load(Ordering::Acquire)
770 }
771
772 pub fn attach_idle_detector(&self, detector: &Arc<IdleDetectorCore>) {
773 let mut guard = self
774 .idle_detector
775 .lock()
776 .expect("idle detector mutex poisoned");
777 *guard = Some(Arc::clone(detector));
778 }
779
780 pub fn detach_idle_detector(&self) {
781 let mut guard = self
782 .idle_detector
783 .lock()
784 .expect("idle detector mutex poisoned");
785 *guard = None;
786 }
787
788 pub fn pty_input_bytes_total(&self) -> usize {
789 self.input_bytes_total.load(Ordering::Acquire)
790 }
791
792 pub fn pty_newline_events_total(&self) -> usize {
793 self.newline_events_total.load(Ordering::Acquire)
794 }
795
796 pub fn pty_submit_events_total(&self) -> usize {
797 self.submit_events_total.load(Ordering::Acquire)
798 }
799
800 pub fn pty_output_bytes_total(&self) -> usize {
801 self.output_bytes_total.load(Ordering::Acquire)
802 }
803
804 pub fn pty_control_churn_bytes_total(&self) -> usize {
805 self.control_churn_bytes_total.load(Ordering::Acquire)
806 }
807}
808
809#[derive(Debug, Clone, Copy)]
814pub struct InteractivePtyOptions {
815 pub echo_output: bool,
816 pub relay_terminal_input: bool,
817 pub respond_to_queries: bool,
818}
819
820impl Default for InteractivePtyOptions {
821 fn default() -> Self {
822 Self {
823 echo_output: true,
824 relay_terminal_input: true,
825 respond_to_queries: true,
826 }
827 }
828}
829
830#[derive(Debug, Default)]
831pub struct InteractivePtyPumpResult {
832 pub chunks: Vec<Vec<u8>>,
833 pub stream_closed: bool,
834}
835
836pub struct InteractivePtySession {
841 process: NativePtyProcess,
842 options: InteractivePtyOptions,
843}
844
845impl InteractivePtySession {
846 pub fn new(process: NativePtyProcess) -> Self {
847 Self::with_options(process, InteractivePtyOptions::default())
848 }
849
850 pub fn with_options(process: NativePtyProcess, options: InteractivePtyOptions) -> Self {
851 Self { process, options }
852 }
853
854 pub fn process(&self) -> &NativePtyProcess {
855 &self.process
856 }
857
858 pub fn start(&self) -> Result<(), PtyError> {
859 self.process.set_echo(self.options.echo_output);
860 self.process.start_impl()?;
861 if self.options.relay_terminal_input {
862 self.process.start_terminal_input_relay_impl()?;
863 }
864 Ok(())
865 }
866
867 pub fn pump_output(
868 &self,
869 timeout: Option<f64>,
870 consume_all: bool,
871 ) -> Result<InteractivePtyPumpResult, PtyError> {
872 let mut pumped = InteractivePtyPumpResult::default();
873 let mut next_timeout = timeout;
874 loop {
875 match self.process.read_chunk_impl(next_timeout) {
876 Ok(Some(chunk)) => {
877 if self.options.respond_to_queries {
878 self.process.respond_to_queries_impl(&chunk)?;
879 }
880 pumped.chunks.push(chunk);
881 if !consume_all {
882 break;
883 }
884 next_timeout = Some(0.0);
885 }
886 Ok(None) => break,
887 Err(PtyError::Other(message)) if message == "Pseudo-terminal stream is closed" => {
888 pumped.stream_closed = true;
889 break;
890 }
891 Err(err) => return Err(err),
892 }
893 }
894 Ok(pumped)
895 }
896
897 pub fn resize(&self, rows: u16, cols: u16) -> Result<(), PtyError> {
898 self.process.resize_impl(rows, cols)
899 }
900
901 pub fn send_interrupt(&self) -> Result<(), PtyError> {
902 self.process.send_interrupt_impl()
903 }
904
905 pub fn wait(&self, timeout: Option<f64>) -> Result<i32, PtyError> {
906 self.process.wait_impl(timeout)
907 }
908
909 pub fn wait_and_drain(
910 &self,
911 timeout: Option<f64>,
912 drain_timeout: f64,
913 ) -> Result<i32, PtyError> {
914 self.process.wait_and_drain_impl(timeout, drain_timeout)
915 }
916
917 pub fn terminate(&self) -> Result<(), PtyError> {
918 self.process.terminate_impl()
919 }
920
921 pub fn kill(&self) -> Result<(), PtyError> {
922 self.process.kill_impl()
923 }
924
925 pub fn close(&self) -> Result<(), PtyError> {
926 self.process.close_impl()
927 }
928}
929
930impl Drop for NativePtyProcess {
931 fn drop(&mut self) {
932 self.close_nonblocking();
933 }
934}