1#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::stream_handler::{SessionResult, StreamHandler};
24use nix::sys::signal::{Signal, kill};
25use nix::unistd::Pid;
26use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
27use std::io::{self, Read, Write};
28use std::sync::Arc;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::time::{Duration, Instant};
31use tokio::sync::{mpsc, watch};
32use tracing::{debug, info, warn};
33
/// Outcome of running a command inside a PTY.
///
/// Values of this type are assembled by `build_result` once the child process
/// has exited or the executor has given up waiting for it.
#[derive(Debug)]
pub struct PtyExecutionResult {
    /// Everything read from the PTY, decoded lossily as UTF-8 (escape sequences included).
    pub output: String,
    /// The same captured bytes after being passed through `strip_ansi`.
    pub stripped_output: String,
    /// Text collected from assistant text blocks in streaming mode; empty for the
    /// non-streaming run modes, which pass `String::new()`.
    pub extracted_text: String,
    /// `true` when the child's exit status reported success; `false` when the
    /// child did not exit within the post-termination wait.
    pub success: bool,
    /// Exit code of the child, or `None` when no exit status could be collected.
    pub exit_code: Option<i32>,
    /// Why the run ended.
    pub termination: TerminationType,
}
54
/// Reason a PTY run came to an end.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationType {
    /// The run ended without the executor terminating the child (the initial value
    /// of every run).
    Natural,
    /// No activity was seen for the configured idle timeout and the child was terminated.
    IdleTimeout,
    /// The run was stopped on request: interrupt signal, double Ctrl+C, a `Kill`
    /// control command, or the child exiting with code 130.
    UserInterrupt,
    /// Ctrl+\ was received and the child was killed without a grace period.
    ForceKill,
}
67
/// Settings that control how a command is run inside a PTY.
#[derive(Debug, Clone)]
pub struct PtyConfig {
    /// Passed to the backend when building the command; the observe modes also
    /// disable the idle timeout when this is `false`.
    pub interactive: bool,
    /// Seconds without activity before the child is terminated; `0` disables the timeout.
    pub idle_timeout_secs: u32,
    /// Terminal width, in columns, used when opening the PTY.
    pub cols: u16,
    /// Terminal height, in rows, used when opening the PTY.
    pub rows: u16,
}

impl Default for PtyConfig {
    /// Interactive mode, a 30 second idle timeout and an 80x24 terminal.
    fn default() -> Self {
        PtyConfig {
            rows: 24,
            cols: 80,
            idle_timeout_secs: 30,
            interactive: true,
        }
    }
}

impl PtyConfig {
    /// Builds a configuration whose terminal size comes from the `COLUMNS` and
    /// `LINES` environment variables.
    ///
    /// A variable that is unset, not valid Unicode, or not parseable as `u16`
    /// falls back to 80 columns / 24 rows. All other fields take their
    /// [`Default`] values.
    pub fn from_env() -> Self {
        /// Reads one terminal dimension from the environment, or `fallback`.
        fn dimension(var_name: &str, fallback: u16) -> u16 {
            match std::env::var(var_name) {
                Ok(raw) => raw.parse().unwrap_or(fallback),
                Err(_) => fallback,
            }
        }

        let cols = dimension("COLUMNS", 80);
        let rows = dimension("LINES", 24);

        PtyConfig {
            cols,
            rows,
            ..PtyConfig::default()
        }
    }
}
111
/// Tracks Ctrl+C presses so that two presses in quick succession can be told
/// apart from a single press.
#[derive(Debug)]
pub struct CtrlCState {
    /// When the press that opened the current window happened, if any.
    first_press: Option<Instant>,
    /// How long after the first press a second press counts as a "double" press.
    window: Duration,
}

/// What the caller should do in response to a Ctrl+C press.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CtrlCAction {
    /// Pass the Ctrl+C on to the child; a new double-press window has been opened.
    ForwardAndStartWindow,
    /// A second press arrived inside the window: terminate the child.
    Terminate,
}

impl CtrlCState {
    /// Creates a state with no pending press and a one second double-press window.
    pub fn new() -> Self {
        Self {
            first_press: None,
            window: Duration::from_secs(1),
        }
    }

    /// Records a Ctrl+C press observed at `now` and decides how to react.
    ///
    /// Returns [`CtrlCAction::Terminate`] when a previous press is on record and
    /// strictly less than the window has elapsed since it; the record is cleared
    /// in that case. Otherwise `now` becomes the new first press and
    /// [`CtrlCAction::ForwardAndStartWindow`] is returned.
    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
        match self.first_press {
            // `saturating_duration_since` treats a `now` that precedes the recorded
            // press as zero elapsed time. Plain `duration_since` is documented as
            // possibly panicking in that situation in future Rust versions.
            Some(first) if now.saturating_duration_since(first) < self.window => {
                self.first_press = None;
                CtrlCAction::Terminate
            }
            _ => {
                self.first_press = Some(now);
                CtrlCAction::ForwardAndStartWindow
            }
        }
    }
}

impl Default for CtrlCState {
    /// Same as [`CtrlCState::new`].
    fn default() -> Self {
        Self::new()
    }
}
161
/// Runs a CLI backend inside a pseudo-terminal.
///
/// The executor owns one end of each channel; the opposite ends are stored in
/// `Option`s until `handle()` moves them into a `PtyHandle`.
pub struct PtyExecutor {
    /// Backend used to build the command line that is spawned in the PTY.
    backend: CliBackend,
    /// Terminal size, interactivity and idle-timeout settings.
    config: PtyConfig,
    /// Sender for raw PTY output chunks; cloned into the reader thread when the
    /// matching receiver has been taken by `handle()`.
    output_tx: mpsc::UnboundedSender<Vec<u8>>,
    /// Receiver for raw PTY output; `None` once `handle()` has taken it.
    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
    /// Sender for input bytes; `None` once `handle()` has taken it.
    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
    /// Receiver for input bytes, polled by `run_interactive`.
    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    /// Sender for control commands; `None` once `handle()` has taken it.
    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
    /// Receiver for control commands, polled by `run_interactive`.
    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
    /// Set to `true` by `run_interactive` when the run is over.
    terminated_tx: watch::Sender<bool>,
    /// Receiver side of the termination flag; `None` once `handle()` has taken it.
    terminated_rx: Option<watch::Receiver<bool>>,
}
177
178impl PtyExecutor {
179 pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
181 let (output_tx, output_rx) = mpsc::unbounded_channel();
182 let (input_tx, input_rx) = mpsc::unbounded_channel();
183 let (control_tx, control_rx) = mpsc::unbounded_channel();
184 let (terminated_tx, terminated_rx) = watch::channel(false);
185
186 Self {
187 backend,
188 config,
189 output_tx,
190 output_rx: Some(output_rx),
191 input_tx: Some(input_tx),
192 input_rx,
193 control_tx: Some(control_tx),
194 control_rx,
195 terminated_tx,
196 terminated_rx: Some(terminated_rx),
197 }
198 }
199
200 pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
204 crate::pty_handle::PtyHandle {
205 output_rx: self.output_rx.take().expect("handle() already called"),
206 input_tx: self.input_tx.take().expect("handle() already called"),
207 control_tx: self.control_tx.take().expect("handle() already called"),
208 terminated_rx: self.terminated_rx.take().expect("handle() already called"),
209 }
210 }
211
212 fn spawn_pty(
221 &self,
222 prompt: &str,
223 ) -> io::Result<(
224 PtyPair,
225 Box<dyn portable_pty::Child + Send>,
226 Option<String>,
227 Option<tempfile::NamedTempFile>,
228 )> {
229 let pty_system = native_pty_system();
230
231 let pair = pty_system
232 .openpty(PtySize {
233 rows: self.config.rows,
234 cols: self.config.cols,
235 pixel_width: 0,
236 pixel_height: 0,
237 })
238 .map_err(|e| io::Error::other(e.to_string()))?;
239
240 let (cmd, args, stdin_input, temp_file) =
241 self.backend.build_command(prompt, self.config.interactive);
242
243 let mut cmd_builder = CommandBuilder::new(&cmd);
244 cmd_builder.args(&args);
245
246 let cwd = std::env::current_dir()
248 .map_err(|e| io::Error::other(format!("Failed to get current directory: {}", e)))?;
249 cmd_builder.cwd(&cwd);
250
251 cmd_builder.env("TERM", "xterm-256color");
253 let child = pair
254 .slave
255 .spawn_command(cmd_builder)
256 .map_err(|e| io::Error::other(e.to_string()))?;
257
258 Ok((pair, child, stdin_input, temp_file))
260 }
261
262 pub async fn run_observe(
279 &self,
280 prompt: &str,
281 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
282 ) -> io::Result<PtyExecutionResult> {
283 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
285
286 let reader = pair
287 .master
288 .try_clone_reader()
289 .map_err(|e| io::Error::other(e.to_string()))?;
290
291 if let Some(ref input) = stdin_input {
293 tokio::time::sleep(Duration::from_millis(100)).await;
295 let mut writer = pair
296 .master
297 .take_writer()
298 .map_err(|e| io::Error::other(e.to_string()))?;
299 writer.write_all(input.as_bytes())?;
300 writer.write_all(b"\n")?;
301 writer.flush()?;
302 }
303
304 drop(pair.slave);
306
307 let mut output = Vec::new();
308 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
309 None
310 } else {
311 Some(Duration::from_secs(u64::from(
312 self.config.idle_timeout_secs,
313 )))
314 };
315
316 let mut termination = TerminationType::Natural;
317 let mut last_activity = Instant::now();
318
319 let should_terminate = Arc::new(AtomicBool::new(false));
321
322 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
324 let should_terminate_reader = Arc::clone(&should_terminate);
325 let tui_connected = self.output_rx.is_none();
327 let tui_output_tx = if tui_connected {
328 Some(self.output_tx.clone())
329 } else {
330 None
331 };
332
333 debug!("Spawning PTY output reader thread (observe mode)");
334 std::thread::spawn(move || {
335 let mut reader = reader;
336 let mut buf = [0u8; 4096];
337
338 loop {
339 if should_terminate_reader.load(Ordering::SeqCst) {
340 debug!("PTY reader: termination requested");
341 break;
342 }
343
344 match reader.read(&mut buf) {
345 Ok(0) => {
346 debug!("PTY reader: EOF");
347 let _ = output_tx.blocking_send(OutputEvent::Eof);
348 break;
349 }
350 Ok(n) => {
351 let data = buf[..n].to_vec();
352 if let Some(ref tx) = tui_output_tx {
354 let _ = tx.send(data.clone());
355 }
356 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
358 break;
359 }
360 }
361 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
362 std::thread::sleep(Duration::from_millis(10));
363 }
364 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
365 Err(e) => {
366 debug!(error = %e, "PTY reader error");
367 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
368 break;
369 }
370 }
371 }
372 });
373
374 loop {
376 let idle_timeout = timeout_duration.map(|d| {
378 let elapsed = last_activity.elapsed();
379 if elapsed >= d {
380 Duration::from_millis(1) } else {
382 d.saturating_sub(elapsed)
383 }
384 });
385
386 tokio::select! {
387 _ = interrupt_rx.changed() => {
389 if *interrupt_rx.borrow() {
390 debug!("Interrupt received in observe mode, terminating");
391 termination = TerminationType::UserInterrupt;
392 should_terminate.store(true, Ordering::SeqCst);
393 let _ = self.terminate_child(&mut child, true).await;
394 break;
395 }
396 }
397
398 event = output_rx.recv() => {
400 match event {
401 Some(OutputEvent::Data(data)) => {
402 if !tui_connected {
404 io::stdout().write_all(&data)?;
405 io::stdout().flush()?;
406 }
407 output.extend_from_slice(&data);
408 last_activity = Instant::now();
409 }
410 Some(OutputEvent::Eof) | None => {
411 debug!("Output channel closed, process likely exited");
412 break;
413 }
414 Some(OutputEvent::Error(e)) => {
415 debug!(error = %e, "Reader thread reported error");
416 break;
417 }
418 }
419 }
420
421 _ = async {
423 if let Some(timeout) = idle_timeout {
424 tokio::time::sleep(timeout).await;
425 } else {
426 std::future::pending::<()>().await;
428 }
429 } => {
430 warn!(
431 timeout_secs = self.config.idle_timeout_secs,
432 "Idle timeout triggered"
433 );
434 termination = TerminationType::IdleTimeout;
435 should_terminate.store(true, Ordering::SeqCst);
436 self.terminate_child(&mut child, true).await?;
437 break;
438 }
439 }
440
441 if let Some(status) = child
443 .try_wait()
444 .map_err(|e| io::Error::other(e.to_string()))?
445 {
446 let exit_code = status.exit_code() as i32;
447 debug!(exit_status = ?status, exit_code, "Child process exited");
448
449 while let Ok(event) = output_rx.try_recv() {
451 if let OutputEvent::Data(data) = event {
452 if !tui_connected {
453 io::stdout().write_all(&data)?;
454 io::stdout().flush()?;
455 }
456 output.extend_from_slice(&data);
457 }
458 }
459
460 let final_termination = resolve_termination_type(exit_code, termination);
461 return Ok(build_result(
463 &output,
464 status.success(),
465 Some(exit_code),
466 final_termination,
467 String::new(),
468 ));
469 }
470 }
471
472 should_terminate.store(true, Ordering::SeqCst);
474
475 let status = self
477 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
478 .await?;
479
480 let (success, exit_code, final_termination) = match status {
481 Some(s) => {
482 let code = s.exit_code() as i32;
483 (
484 s.success(),
485 Some(code),
486 resolve_termination_type(code, termination),
487 )
488 }
489 None => {
490 warn!("Timed out waiting for child to exit after termination");
491 (false, None, termination)
492 }
493 };
494
495 Ok(build_result(
497 &output,
498 success,
499 exit_code,
500 final_termination,
501 String::new(),
502 ))
503 }
504
505 pub async fn run_observe_streaming<H: StreamHandler>(
521 &self,
522 prompt: &str,
523 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
524 handler: &mut H,
525 ) -> io::Result<PtyExecutionResult> {
526 let output_format = self.backend.output_format;
528
529 if output_format != OutputFormat::StreamJson {
531 return self.run_observe(prompt, interrupt_rx).await;
532 }
533
534 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
536
537 let reader = pair
538 .master
539 .try_clone_reader()
540 .map_err(|e| io::Error::other(e.to_string()))?;
541
542 if let Some(ref input) = stdin_input {
544 tokio::time::sleep(Duration::from_millis(100)).await;
545 let mut writer = pair
546 .master
547 .take_writer()
548 .map_err(|e| io::Error::other(e.to_string()))?;
549 writer.write_all(input.as_bytes())?;
550 writer.write_all(b"\n")?;
551 writer.flush()?;
552 }
553
554 drop(pair.slave);
555
556 let mut output = Vec::new();
557 let mut line_buffer = String::new();
558 let mut extracted_text = String::new();
560 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
561 None
562 } else {
563 Some(Duration::from_secs(u64::from(
564 self.config.idle_timeout_secs,
565 )))
566 };
567
568 let mut termination = TerminationType::Natural;
569 let mut last_activity = Instant::now();
570
571 let should_terminate = Arc::new(AtomicBool::new(false));
572
573 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
575 let should_terminate_reader = Arc::clone(&should_terminate);
576 let tui_connected = self.output_rx.is_none();
577 let tui_output_tx = if tui_connected {
578 Some(self.output_tx.clone())
579 } else {
580 None
581 };
582
583 debug!("Spawning PTY output reader thread (streaming mode)");
584 std::thread::spawn(move || {
585 let mut reader = reader;
586 let mut buf = [0u8; 4096];
587
588 loop {
589 if should_terminate_reader.load(Ordering::SeqCst) {
590 debug!("PTY reader: termination requested");
591 break;
592 }
593
594 match reader.read(&mut buf) {
595 Ok(0) => {
596 debug!("PTY reader: EOF");
597 let _ = output_tx.blocking_send(OutputEvent::Eof);
598 break;
599 }
600 Ok(n) => {
601 let data = buf[..n].to_vec();
602 if let Some(ref tx) = tui_output_tx {
603 let _ = tx.send(data.clone());
604 }
605 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
606 break;
607 }
608 }
609 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
610 std::thread::sleep(Duration::from_millis(10));
611 }
612 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
613 Err(e) => {
614 debug!(error = %e, "PTY reader error");
615 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
616 break;
617 }
618 }
619 }
620 });
621
622 loop {
624 let idle_timeout = timeout_duration.map(|d| {
625 let elapsed = last_activity.elapsed();
626 if elapsed >= d {
627 Duration::from_millis(1)
628 } else {
629 d.saturating_sub(elapsed)
630 }
631 });
632
633 tokio::select! {
634 _ = interrupt_rx.changed() => {
635 if *interrupt_rx.borrow() {
636 debug!("Interrupt received in streaming observe mode, terminating");
637 termination = TerminationType::UserInterrupt;
638 should_terminate.store(true, Ordering::SeqCst);
639 let _ = self.terminate_child(&mut child, true).await;
640 break;
641 }
642 }
643
644 event = output_rx.recv() => {
645 match event {
646 Some(OutputEvent::Data(data)) => {
647 output.extend_from_slice(&data);
648 last_activity = Instant::now();
649
650 if let Ok(text) = std::str::from_utf8(&data) {
652 line_buffer.push_str(text);
653
654 while let Some(newline_pos) = line_buffer.find('\n') {
656 let line = line_buffer[..newline_pos].to_string();
657 line_buffer = line_buffer[newline_pos + 1..].to_string();
658
659 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
660 dispatch_stream_event(event, handler, &mut extracted_text);
661 }
662 }
663 }
664 }
665 Some(OutputEvent::Eof) | None => {
666 debug!("Output channel closed");
667 if !line_buffer.is_empty()
669 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
670 {
671 dispatch_stream_event(event, handler, &mut extracted_text);
672 }
673 break;
674 }
675 Some(OutputEvent::Error(e)) => {
676 debug!(error = %e, "Reader thread reported error");
677 handler.on_error(&e);
678 break;
679 }
680 }
681 }
682
683 _ = async {
684 if let Some(timeout) = idle_timeout {
685 tokio::time::sleep(timeout).await;
686 } else {
687 std::future::pending::<()>().await;
688 }
689 } => {
690 warn!(
691 timeout_secs = self.config.idle_timeout_secs,
692 "Idle timeout triggered"
693 );
694 termination = TerminationType::IdleTimeout;
695 should_terminate.store(true, Ordering::SeqCst);
696 self.terminate_child(&mut child, true).await?;
697 break;
698 }
699 }
700
701 if let Some(status) = child
703 .try_wait()
704 .map_err(|e| io::Error::other(e.to_string()))?
705 {
706 let exit_code = status.exit_code() as i32;
707 debug!(exit_status = ?status, exit_code, "Child process exited");
708
709 while let Ok(event) = output_rx.try_recv() {
711 if let OutputEvent::Data(data) = event {
712 output.extend_from_slice(&data);
713 if let Ok(text) = std::str::from_utf8(&data) {
714 line_buffer.push_str(text);
715 while let Some(newline_pos) = line_buffer.find('\n') {
716 let line = line_buffer[..newline_pos].to_string();
717 line_buffer = line_buffer[newline_pos + 1..].to_string();
718 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
719 dispatch_stream_event(event, handler, &mut extracted_text);
720 }
721 }
722 }
723 }
724 }
725
726 if !line_buffer.is_empty()
728 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
729 {
730 dispatch_stream_event(event, handler, &mut extracted_text);
731 }
732
733 let final_termination = resolve_termination_type(exit_code, termination);
734 return Ok(build_result(
736 &output,
737 status.success(),
738 Some(exit_code),
739 final_termination,
740 extracted_text,
741 ));
742 }
743 }
744
745 should_terminate.store(true, Ordering::SeqCst);
746
747 let status = self
748 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
749 .await?;
750
751 let (success, exit_code, final_termination) = match status {
752 Some(s) => {
753 let code = s.exit_code() as i32;
754 (
755 s.success(),
756 Some(code),
757 resolve_termination_type(code, termination),
758 )
759 }
760 None => {
761 warn!("Timed out waiting for child to exit after termination");
762 (false, None, termination)
763 }
764 };
765
766 Ok(build_result(
768 &output,
769 success,
770 exit_code,
771 final_termination,
772 extracted_text,
773 ))
774 }
775
776 #[allow(clippy::too_many_lines)] pub async fn run_interactive(
797 &mut self,
798 prompt: &str,
799 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
800 ) -> io::Result<PtyExecutionResult> {
801 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
803
804 let reader = pair
805 .master
806 .try_clone_reader()
807 .map_err(|e| io::Error::other(e.to_string()))?;
808 let mut writer = pair
809 .master
810 .take_writer()
811 .map_err(|e| io::Error::other(e.to_string()))?;
812
813 let master = pair.master;
815
816 drop(pair.slave);
818
819 let pending_stdin = stdin_input;
821
822 let mut output = Vec::new();
823 let timeout_duration = if self.config.idle_timeout_secs > 0 {
824 Some(Duration::from_secs(u64::from(
825 self.config.idle_timeout_secs,
826 )))
827 } else {
828 None
829 };
830
831 let mut ctrl_c_state = CtrlCState::new();
832 let mut termination = TerminationType::Natural;
833 let mut last_activity = Instant::now();
834
835 let should_terminate = Arc::new(AtomicBool::new(false));
837
838 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
840 let should_terminate_output = Arc::clone(&should_terminate);
841 let tui_connected = self.output_rx.is_none();
843 let tui_output_tx = if tui_connected {
844 Some(self.output_tx.clone())
845 } else {
846 None
847 };
848
849 debug!("Spawning PTY output reader thread");
850 std::thread::spawn(move || {
851 debug!("PTY output reader thread started");
852 let mut reader = reader;
853 let mut buf = [0u8; 4096];
854
855 loop {
856 if should_terminate_output.load(Ordering::SeqCst) {
857 debug!("PTY output reader: termination requested");
858 break;
859 }
860
861 match reader.read(&mut buf) {
862 Ok(0) => {
863 debug!("PTY output reader: EOF received");
865 let _ = output_tx.blocking_send(OutputEvent::Eof);
866 break;
867 }
868 Ok(n) => {
869 let data = buf[..n].to_vec();
870 if let Some(ref tx) = tui_output_tx {
872 let _ = tx.send(data.clone());
873 }
874 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
876 debug!("PTY output reader: channel closed");
877 break;
878 }
879 }
880 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
881 std::thread::sleep(Duration::from_millis(1));
883 }
884 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
885 }
887 Err(e) => {
888 warn!("PTY output reader: error - {}", e);
889 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
890 break;
891 }
892 }
893 }
894 debug!("PTY output reader thread exiting");
895 });
896
897 let (input_tx, mut input_rx) = mpsc::unbounded_channel::<InputEvent>();
899 let should_terminate_input = Arc::clone(&should_terminate);
900
901 std::thread::spawn(move || {
902 let mut stdin = io::stdin();
903 let mut buf = [0u8; 1];
904
905 loop {
906 if should_terminate_input.load(Ordering::SeqCst) {
907 break;
908 }
909
910 match stdin.read(&mut buf) {
911 Ok(0) => break, Ok(1) => {
913 let byte = buf[0];
914 let event = match byte {
915 3 => InputEvent::CtrlC, 28 => InputEvent::CtrlBackslash, _ => InputEvent::Data(vec![byte]),
918 };
919 if input_tx.send(event).is_err() {
920 break;
921 }
922 }
923 Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
925 Err(_) => break,
926 }
927 }
928 });
929
930 if let Some(ref input) = pending_stdin {
933 tokio::time::sleep(Duration::from_millis(100)).await;
934 writer.write_all(input.as_bytes())?;
935 writer.write_all(b"\n")?;
936 writer.flush()?;
937 last_activity = Instant::now();
938 }
939
940 loop {
943 if let Some(status) = child
945 .try_wait()
946 .map_err(|e| io::Error::other(e.to_string()))?
947 {
948 let exit_code = status.exit_code() as i32;
949 debug!(exit_status = ?status, exit_code, "Child process exited");
950
951 while let Ok(event) = output_rx.try_recv() {
953 if let OutputEvent::Data(data) = event {
954 if !tui_connected {
955 io::stdout().write_all(&data)?;
956 io::stdout().flush()?;
957 }
958 output.extend_from_slice(&data);
959 }
960 }
961
962 should_terminate.store(true, Ordering::SeqCst);
963 let _ = self.terminated_tx.send(true);
965
966 let final_termination = resolve_termination_type(exit_code, termination);
967 return Ok(build_result(
969 &output,
970 status.success(),
971 Some(exit_code),
972 final_termination,
973 String::new(),
974 ));
975 }
976
977 let timeout_future = async {
979 match timeout_duration {
980 Some(d) => {
981 let elapsed = last_activity.elapsed();
982 if elapsed >= d {
983 tokio::time::sleep(Duration::ZERO).await
984 } else {
985 tokio::time::sleep(d.saturating_sub(elapsed)).await
986 }
987 }
988 None => std::future::pending::<()>().await,
989 }
990 };
991
992 tokio::select! {
993 output_event = output_rx.recv() => {
995 match output_event {
996 Some(OutputEvent::Data(data)) => {
997 if !tui_connected {
999 io::stdout().write_all(&data)?;
1000 io::stdout().flush()?;
1001 }
1002 output.extend_from_slice(&data);
1003
1004 last_activity = Instant::now();
1005 }
1006 Some(OutputEvent::Eof) => {
1007 debug!("PTY EOF received");
1008 break;
1009 }
1010 Some(OutputEvent::Error(e)) => {
1011 debug!(error = %e, "PTY read error");
1012 break;
1013 }
1014 None => {
1015 break;
1017 }
1018 }
1019 }
1020
1021 input_event = async { input_rx.recv().await } => {
1023 match input_event {
1024 Some(InputEvent::CtrlC) => {
1025 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1026 CtrlCAction::ForwardAndStartWindow => {
1027 let _ = writer.write_all(&[3]);
1029 let _ = writer.flush();
1030 last_activity = Instant::now();
1031 }
1032 CtrlCAction::Terminate => {
1033 info!("Double Ctrl+C detected, terminating");
1034 termination = TerminationType::UserInterrupt;
1035 should_terminate.store(true, Ordering::SeqCst);
1036 self.terminate_child(&mut child, true).await?;
1037 break;
1038 }
1039 }
1040 }
1041 Some(InputEvent::CtrlBackslash) => {
1042 info!("Ctrl+\\ detected, force killing");
1043 termination = TerminationType::ForceKill;
1044 should_terminate.store(true, Ordering::SeqCst);
1045 self.terminate_child(&mut child, false).await?;
1046 break;
1047 }
1048 Some(InputEvent::Data(data)) => {
1049 let _ = writer.write_all(&data);
1051 let _ = writer.flush();
1052 last_activity = Instant::now();
1053 }
1054 None => {
1055 debug!("Input channel closed");
1057 }
1058 }
1059 }
1060
1061 tui_input = self.input_rx.recv() => {
1063 if let Some(data) = tui_input {
1064 match InputEvent::from_bytes(data) {
1065 InputEvent::CtrlC => {
1066 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1067 CtrlCAction::ForwardAndStartWindow => {
1068 let _ = writer.write_all(&[3]);
1069 let _ = writer.flush();
1070 last_activity = Instant::now();
1071 }
1072 CtrlCAction::Terminate => {
1073 info!("Double Ctrl+C detected, terminating");
1074 termination = TerminationType::UserInterrupt;
1075 should_terminate.store(true, Ordering::SeqCst);
1076 self.terminate_child(&mut child, true).await?;
1077 break;
1078 }
1079 }
1080 }
1081 InputEvent::CtrlBackslash => {
1082 info!("Ctrl+\\ detected, force killing");
1083 termination = TerminationType::ForceKill;
1084 should_terminate.store(true, Ordering::SeqCst);
1085 self.terminate_child(&mut child, false).await?;
1086 break;
1087 }
1088 InputEvent::Data(bytes) => {
1089 let _ = writer.write_all(&bytes);
1090 let _ = writer.flush();
1091 last_activity = Instant::now();
1092 }
1093 }
1094 }
1095 }
1096
1097 control_cmd = self.control_rx.recv() => {
1099 if let Some(cmd) = control_cmd {
1100 use crate::pty_handle::ControlCommand;
1101 match cmd {
1102 ControlCommand::Kill => {
1103 info!("Control command: Kill");
1104 termination = TerminationType::UserInterrupt;
1105 should_terminate.store(true, Ordering::SeqCst);
1106 self.terminate_child(&mut child, true).await?;
1107 break;
1108 }
1109 ControlCommand::Resize(cols, rows) => {
1110 debug!(cols, rows, "Control command: Resize");
1111 if let Err(e) = master.resize(PtySize {
1113 rows,
1114 cols,
1115 pixel_width: 0,
1116 pixel_height: 0,
1117 }) {
1118 warn!("Failed to resize PTY: {}", e);
1119 }
1120 }
1121 ControlCommand::Skip | ControlCommand::Abort => {
1122 debug!("Control command: {:?} (ignored at PTY level)", cmd);
1124 }
1125 }
1126 }
1127 }
1128
1129 _ = timeout_future => {
1131 warn!(
1132 timeout_secs = self.config.idle_timeout_secs,
1133 "Idle timeout triggered"
1134 );
1135 termination = TerminationType::IdleTimeout;
1136 should_terminate.store(true, Ordering::SeqCst);
1137 self.terminate_child(&mut child, true).await?;
1138 break;
1139 }
1140
1141 _ = interrupt_rx.changed() => {
1143 if *interrupt_rx.borrow() {
1144 debug!("Interrupt received in interactive mode, terminating");
1145 termination = TerminationType::UserInterrupt;
1146 should_terminate.store(true, Ordering::SeqCst);
1147 self.terminate_child(&mut child, true).await?;
1148 break;
1149 }
1150 }
1151 }
1152 }
1153
1154 should_terminate.store(true, Ordering::SeqCst);
1156
1157 let _ = self.terminated_tx.send(true);
1159
1160 let status = self
1162 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1163 .await?;
1164
1165 let (success, exit_code, final_termination) = match status {
1166 Some(s) => {
1167 let code = s.exit_code() as i32;
1168 (
1169 s.success(),
1170 Some(code),
1171 resolve_termination_type(code, termination),
1172 )
1173 }
1174 None => {
1175 warn!("Timed out waiting for child to exit after termination");
1176 (false, None, termination)
1177 }
1178 };
1179
1180 Ok(build_result(
1182 &output,
1183 success,
1184 exit_code,
1185 final_termination,
1186 String::new(),
1187 ))
1188 }
1189
1190 #[allow(clippy::unused_self)] async fn terminate_child(
1200 &self,
1201 child: &mut Box<dyn portable_pty::Child + Send>,
1202 graceful: bool,
1203 ) -> io::Result<()> {
1204 let pid = match child.process_id() {
1205 Some(id) => Pid::from_raw(id as i32),
1206 None => return Ok(()), };
1208
1209 if graceful {
1210 debug!(pid = %pid, "Sending SIGTERM");
1211 let _ = kill(pid, Signal::SIGTERM);
1212
1213 let grace_period = Duration::from_secs(2);
1215 let start = Instant::now();
1216
1217 while start.elapsed() < grace_period {
1218 if child
1219 .try_wait()
1220 .map_err(|e| io::Error::other(e.to_string()))?
1221 .is_some()
1222 {
1223 return Ok(());
1224 }
1225 tokio::time::sleep(Duration::from_millis(50)).await;
1227 }
1228
1229 debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1231 }
1232
1233 debug!(pid = %pid, "Sending SIGKILL");
1234 let _ = kill(pid, Signal::SIGKILL);
1235 Ok(())
1236 }
1237
1238 async fn wait_for_exit(
1243 &self,
1244 child: &mut Box<dyn portable_pty::Child + Send>,
1245 max_wait: Option<Duration>,
1246 interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1247 ) -> io::Result<Option<portable_pty::ExitStatus>> {
1248 let start = Instant::now();
1249
1250 loop {
1251 if let Some(status) = child
1252 .try_wait()
1253 .map_err(|e| io::Error::other(e.to_string()))?
1254 {
1255 return Ok(Some(status));
1256 }
1257
1258 if let Some(max) = max_wait
1259 && start.elapsed() >= max
1260 {
1261 return Ok(None);
1262 }
1263
1264 tokio::select! {
1265 _ = interrupt_rx.changed() => {
1266 if *interrupt_rx.borrow() {
1267 debug!("Interrupt received while waiting for child exit");
1268 return Ok(None);
1269 }
1270 }
1271 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1272 }
1273 }
1274 }
1275}
1276
/// A unit of user input on its way to the child process.
#[derive(Debug)]
enum InputEvent {
    /// The single byte 3 (Ctrl+C).
    CtrlC,
    /// The single byte 28 (Ctrl+\).
    CtrlBackslash,
    /// Any other input, passed through unchanged.
    Data(Vec<u8>),
}

impl InputEvent {
    /// Classifies a buffer of input bytes.
    ///
    /// Only a buffer consisting of exactly one byte can be a control event;
    /// everything else, including an empty buffer, is returned as
    /// [`InputEvent::Data`].
    fn from_bytes(data: Vec<u8>) -> Self {
        match data.as_slice() {
            [3] => InputEvent::CtrlC,
            [28] => InputEvent::CtrlBackslash,
            _ => InputEvent::Data(data),
        }
    }
}
1301
/// Message sent from a PTY reader thread to the async event loop.
#[derive(Debug)]
enum OutputEvent {
    /// A chunk of bytes read from the PTY.
    Data(Vec<u8>),
    /// The reader got a zero-length read and has stopped.
    Eof,
    /// The reader hit an error (carried as its message) and has stopped.
    Error(String),
}
1312
1313fn strip_ansi(bytes: &[u8]) -> String {
1319 let stripped = strip_ansi_escapes::strip(bytes);
1320 String::from_utf8_lossy(&stripped).into_owned()
1321}
1322
1323fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1327 if exit_code == 130 {
1328 info!("Child process killed by SIGINT");
1329 TerminationType::UserInterrupt
1330 } else {
1331 default
1332 }
1333}
1334
1335fn dispatch_stream_event<H: StreamHandler>(
1338 event: ClaudeStreamEvent,
1339 handler: &mut H,
1340 extracted_text: &mut String,
1341) {
1342 match event {
1343 ClaudeStreamEvent::System { .. } => {
1344 }
1346 ClaudeStreamEvent::Assistant { message, .. } => {
1347 for block in message.content {
1348 match block {
1349 ContentBlock::Text { text } => {
1350 handler.on_text(&text);
1351 extracted_text.push_str(&text);
1353 extracted_text.push('\n');
1354 }
1355 ContentBlock::ToolUse { name, id, input } => {
1356 handler.on_tool_call(&name, &id, &input)
1357 }
1358 }
1359 }
1360 }
1361 ClaudeStreamEvent::User { message } => {
1362 for block in message.content {
1363 match block {
1364 UserContentBlock::ToolResult {
1365 tool_use_id,
1366 content,
1367 } => {
1368 handler.on_tool_result(&tool_use_id, &content);
1369 }
1370 }
1371 }
1372 }
1373 ClaudeStreamEvent::Result {
1374 duration_ms,
1375 total_cost_usd,
1376 num_turns,
1377 is_error,
1378 } => {
1379 if is_error {
1380 handler.on_error("Session ended with error");
1381 }
1382 handler.on_complete(&SessionResult {
1383 duration_ms,
1384 total_cost_usd,
1385 num_turns,
1386 is_error,
1387 });
1388 }
1389 }
1390}
1391
1392fn build_result(
1401 output: &[u8],
1402 success: bool,
1403 exit_code: Option<i32>,
1404 termination: TerminationType,
1405 extracted_text: String,
1406) -> PtyExecutionResult {
1407 PtyExecutionResult {
1408 output: String::from_utf8_lossy(output).to_string(),
1409 stripped_output: strip_ansi(output),
1410 extracted_text,
1411 success,
1412 exit_code,
1413 termination,
1414 }
1415}
1416
// Unit tests for the pure helpers in this file: Ctrl+C window handling, ANSI
// stripping, configuration defaults and result assembly. Nothing here spawns a PTY.
#[cfg(test)]
mod tests {
    use super::*;

    // Two presses 500 ms apart fall inside the one second window.
    #[test]
    fn test_double_ctrl_c_within_window() {
        let mut state = CtrlCState::new();
        let now = Instant::now();

        let action = state.handle_ctrl_c(now);
        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);

        let later = now + Duration::from_millis(500);
        let action = state.handle_ctrl_c(later);
        assert_eq!(action, CtrlCAction::Terminate);
    }

    // A second press two seconds later is outside the window and starts a new one.
    #[test]
    fn test_ctrl_c_window_expires() {
        let mut state = CtrlCState::new();
        let now = Instant::now();

        state.handle_ctrl_c(now);

        let later = now + Duration::from_secs(2);

        let action = state.handle_ctrl_c(later);
        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
    }

    // Colour codes are removed while the visible text is kept.
    #[test]
    fn test_strip_ansi_basic() {
        let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n";
        let stripped = strip_ansi(input);
        assert!(stripped.contains("Thinking..."));
        assert!(!stripped.contains("\x1b["));
    }

    // A marker wrapped in escape sequences is still findable after stripping.
    #[test]
    fn test_completion_promise_extraction() {
        let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n\
 \x1b[2K\x1b[1;32m Done!\x1b[0m\r\n\
 \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";

        let stripped = strip_ansi(input);

        assert!(stripped.contains("LOOP_COMPLETE"));
        assert!(!stripped.contains("\x1b["));
    }

    // Opening and closing event tags survive stripping.
    #[test]
    fn test_event_tag_extraction() {
        let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
 Task completed successfully\r\n\
 \x1b[90m</event>\x1b[0m\r\n";

        let stripped = strip_ansi(input);

        assert!(stripped.contains("<event topic=\"build.done\">"));
        assert!(stripped.contains("</event>"));
    }

    // Stripping a long input keeps both the first and the last line.
    #[test]
    fn test_large_output_preserves_early_events() {
        let mut input = Vec::new();

        input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");

        for i in 0..500 {
            input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
        }

        let stripped = strip_ansi(&input);

        assert!(
            stripped.contains("<event topic=\"build.task\">"),
            "Event tag was lost - strip_ansi is not preserving all content"
        );
        assert!(stripped.contains("Implement feature X"));
        assert!(stripped.contains("Line 499")); }

    // Pins the values produced by `PtyConfig::default()`.
    #[test]
    fn test_pty_config_defaults() {
        let config = PtyConfig::default();
        assert!(config.interactive);
        assert_eq!(config.idle_timeout_secs, 30);
        assert_eq!(config.cols, 80);
        assert_eq!(config.rows, 24);
    }

    // Exercises the remaining-time arithmetic used for the idle timeout. It
    // does not run any executor code; it only checks `saturating_sub` on
    // durations before and after a simulated activity reset.
    #[test]
    fn test_idle_timeout_reset_logic() {
        let timeout_duration = Duration::from_secs(30);

        let simulated_25s = Duration::from_secs(25);

        let remaining = timeout_duration.saturating_sub(simulated_25s);
        assert_eq!(remaining.as_secs(), 5);

        let last_activity_after_reset = Instant::now();

        let elapsed = last_activity_after_reset.elapsed();
        // NOTE(review): relies on wall-clock timing; could fail on a heavily loaded machine.
        assert!(elapsed < Duration::from_millis(100)); let new_remaining = timeout_duration.saturating_sub(elapsed);
        assert!(new_remaining > Duration::from_secs(29)); }

    // The result struct carries extracted text independently of the raw output.
    #[test]
    fn test_extracted_text_field_exists() {
        let result = PtyExecutionResult {
            output: String::new(),
            stripped_output: String::new(),
            extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
            success: true,
            exit_code: Some(0),
            termination: TerminationType::Natural,
        };

        assert!(
            result
                .extracted_text
                .contains("<event topic=\"build.done\">")
        );
    }

    // `build_result` stores the extracted text verbatim and fills in the stripped output.
    #[test]
    fn test_build_result_includes_extracted_text() {
        let output = b"raw output";
        let extracted = "extracted text with <event topic=\"test\">payload</event>";
        let result = build_result(
            output,
            true,
            Some(0),
            TerminationType::Natural,
            extracted.to_string(),
        );

        assert_eq!(result.extracted_text, extracted);
        assert!(result.stripped_output.contains("raw output"));
    }
}