1#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::stream_handler::{SessionResult, StreamHandler};
24#[cfg(unix)]
25use nix::sys::signal::{Signal, kill};
26#[cfg(unix)]
27use nix::unistd::Pid;
28use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
29use std::io::{self, Read, Write};
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use std::time::{Duration, Instant};
33use tokio::sync::{mpsc, watch};
34use tracing::{debug, info, warn};
35
36#[derive(Debug)]
38pub struct PtyExecutionResult {
39 pub output: String,
41 pub stripped_output: String,
43 pub extracted_text: String,
49 pub success: bool,
51 pub exit_code: Option<i32>,
53 pub termination: TerminationType,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
59pub enum TerminationType {
60 Natural,
62 IdleTimeout,
64 UserInterrupt,
66 ForceKill,
68}
69
70#[derive(Debug, Clone)]
72pub struct PtyConfig {
73 pub interactive: bool,
75 pub idle_timeout_secs: u32,
77 pub cols: u16,
79 pub rows: u16,
81 pub workspace_root: std::path::PathBuf,
85}
86
87impl Default for PtyConfig {
88 fn default() -> Self {
89 Self {
90 interactive: true,
91 idle_timeout_secs: 30,
92 cols: 80,
93 rows: 24,
94 workspace_root: std::env::current_dir()
95 .unwrap_or_else(|_| std::path::PathBuf::from(".")),
96 }
97 }
98}
99
100impl PtyConfig {
101 pub fn from_env() -> Self {
103 let cols = std::env::var("COLUMNS")
104 .ok()
105 .and_then(|s| s.parse().ok())
106 .unwrap_or(80);
107 let rows = std::env::var("LINES")
108 .ok()
109 .and_then(|s| s.parse().ok())
110 .unwrap_or(24);
111
112 Self {
113 cols,
114 rows,
115 ..Default::default()
116 }
117 }
118
119 pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
121 self.workspace_root = root.into();
122 self
123 }
124}
125
126#[derive(Debug)]
128pub struct CtrlCState {
129 first_press: Option<Instant>,
131 window: Duration,
133}
134
135#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum CtrlCAction {
138 ForwardAndStartWindow,
140 Terminate,
142}
143
144impl CtrlCState {
145 pub fn new() -> Self {
147 Self {
148 first_press: None,
149 window: Duration::from_secs(1),
150 }
151 }
152
153 pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
155 match self.first_press {
156 Some(first) if now.duration_since(first) < self.window => {
157 self.first_press = None;
159 CtrlCAction::Terminate
160 }
161 _ => {
162 self.first_press = Some(now);
164 CtrlCAction::ForwardAndStartWindow
165 }
166 }
167 }
168}
169
170impl Default for CtrlCState {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176pub struct PtyExecutor {
178 backend: CliBackend,
179 config: PtyConfig,
180 output_tx: mpsc::UnboundedSender<Vec<u8>>,
182 output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
183 input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
184 input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
185 control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
186 control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
187 terminated_tx: watch::Sender<bool>,
189 terminated_rx: Option<watch::Receiver<bool>>,
190 tui_mode: bool,
194}
195
196impl PtyExecutor {
197 pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
199 let (output_tx, output_rx) = mpsc::unbounded_channel();
200 let (input_tx, input_rx) = mpsc::unbounded_channel();
201 let (control_tx, control_rx) = mpsc::unbounded_channel();
202 let (terminated_tx, terminated_rx) = watch::channel(false);
203
204 Self {
205 backend,
206 config,
207 output_tx,
208 output_rx: Some(output_rx),
209 input_tx: Some(input_tx),
210 input_rx,
211 control_tx: Some(control_tx),
212 control_rx,
213 terminated_tx,
214 terminated_rx: Some(terminated_rx),
215 tui_mode: false,
216 }
217 }
218
219 pub fn set_tui_mode(&mut self, enabled: bool) {
228 self.tui_mode = enabled;
229 }
230
231 pub fn set_backend(&mut self, backend: CliBackend) {
239 self.backend = backend;
240 }
241
242 pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
246 crate::pty_handle::PtyHandle {
247 output_rx: self.output_rx.take().expect("handle() already called"),
248 input_tx: self.input_tx.take().expect("handle() already called"),
249 control_tx: self.control_tx.take().expect("handle() already called"),
250 terminated_rx: self.terminated_rx.take().expect("handle() already called"),
251 }
252 }
253
254 fn spawn_pty(
263 &self,
264 prompt: &str,
265 ) -> io::Result<(
266 PtyPair,
267 Box<dyn portable_pty::Child + Send>,
268 Option<String>,
269 Option<tempfile::NamedTempFile>,
270 )> {
271 let pty_system = native_pty_system();
272
273 let pair = pty_system
274 .openpty(PtySize {
275 rows: self.config.rows,
276 cols: self.config.cols,
277 pixel_width: 0,
278 pixel_height: 0,
279 })
280 .map_err(|e| io::Error::other(e.to_string()))?;
281
282 let (cmd, args, stdin_input, temp_file) =
283 self.backend.build_command(prompt, self.config.interactive);
284
285 let mut cmd_builder = CommandBuilder::new(&cmd);
286 cmd_builder.args(&args);
287
288 cmd_builder.cwd(&self.config.workspace_root);
291
292 cmd_builder.env("TERM", "xterm-256color");
294 let child = pair
295 .slave
296 .spawn_command(cmd_builder)
297 .map_err(|e| io::Error::other(e.to_string()))?;
298
299 Ok((pair, child, stdin_input, temp_file))
301 }
302
303 pub async fn run_observe(
320 &self,
321 prompt: &str,
322 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
323 ) -> io::Result<PtyExecutionResult> {
324 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
326
327 let reader = pair
328 .master
329 .try_clone_reader()
330 .map_err(|e| io::Error::other(e.to_string()))?;
331
332 if let Some(ref input) = stdin_input {
334 tokio::time::sleep(Duration::from_millis(100)).await;
336 let mut writer = pair
337 .master
338 .take_writer()
339 .map_err(|e| io::Error::other(e.to_string()))?;
340 writer.write_all(input.as_bytes())?;
341 writer.write_all(b"\n")?;
342 writer.flush()?;
343 }
344
345 drop(pair.slave);
347
348 let mut output = Vec::new();
349 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
350 None
351 } else {
352 Some(Duration::from_secs(u64::from(
353 self.config.idle_timeout_secs,
354 )))
355 };
356
357 let mut termination = TerminationType::Natural;
358 let mut last_activity = Instant::now();
359
360 let should_terminate = Arc::new(AtomicBool::new(false));
362
363 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
365 let should_terminate_reader = Arc::clone(&should_terminate);
366 let tui_connected = self.tui_mode;
368 let tui_output_tx = if tui_connected {
369 Some(self.output_tx.clone())
370 } else {
371 None
372 };
373
374 debug!("Spawning PTY output reader thread (observe mode)");
375 std::thread::spawn(move || {
376 let mut reader = reader;
377 let mut buf = [0u8; 4096];
378
379 loop {
380 if should_terminate_reader.load(Ordering::SeqCst) {
381 debug!("PTY reader: termination requested");
382 break;
383 }
384
385 match reader.read(&mut buf) {
386 Ok(0) => {
387 debug!("PTY reader: EOF");
388 let _ = output_tx.blocking_send(OutputEvent::Eof);
389 break;
390 }
391 Ok(n) => {
392 let data = buf[..n].to_vec();
393 if let Some(ref tx) = tui_output_tx {
395 let _ = tx.send(data.clone());
396 }
397 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
399 break;
400 }
401 }
402 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
403 std::thread::sleep(Duration::from_millis(10));
404 }
405 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
406 Err(e) => {
407 debug!(error = %e, "PTY reader error");
408 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
409 break;
410 }
411 }
412 }
413 });
414
415 loop {
417 let idle_timeout = timeout_duration.map(|d| {
419 let elapsed = last_activity.elapsed();
420 if elapsed >= d {
421 Duration::from_millis(1) } else {
423 d.saturating_sub(elapsed)
424 }
425 });
426
427 tokio::select! {
428 _ = interrupt_rx.changed() => {
430 if *interrupt_rx.borrow() {
431 debug!("Interrupt received in observe mode, terminating");
432 termination = TerminationType::UserInterrupt;
433 should_terminate.store(true, Ordering::SeqCst);
434 let _ = self.terminate_child(&mut child, true).await;
435 break;
436 }
437 }
438
439 event = output_rx.recv() => {
441 match event {
442 Some(OutputEvent::Data(data)) => {
443 if !tui_connected {
445 io::stdout().write_all(&data)?;
446 io::stdout().flush()?;
447 }
448 output.extend_from_slice(&data);
449 last_activity = Instant::now();
450 }
451 Some(OutputEvent::Eof) | None => {
452 debug!("Output channel closed, process likely exited");
453 break;
454 }
455 Some(OutputEvent::Error(e)) => {
456 debug!(error = %e, "Reader thread reported error");
457 break;
458 }
459 }
460 }
461
462 _ = async {
464 if let Some(timeout) = idle_timeout {
465 tokio::time::sleep(timeout).await;
466 } else {
467 std::future::pending::<()>().await;
469 }
470 } => {
471 warn!(
472 timeout_secs = self.config.idle_timeout_secs,
473 "Idle timeout triggered"
474 );
475 termination = TerminationType::IdleTimeout;
476 should_terminate.store(true, Ordering::SeqCst);
477 self.terminate_child(&mut child, true).await?;
478 break;
479 }
480 }
481
482 if let Some(status) = child
484 .try_wait()
485 .map_err(|e| io::Error::other(e.to_string()))?
486 {
487 let exit_code = status.exit_code() as i32;
488 debug!(exit_status = ?status, exit_code, "Child process exited");
489
490 while let Ok(event) = output_rx.try_recv() {
492 if let OutputEvent::Data(data) = event {
493 if !tui_connected {
494 io::stdout().write_all(&data)?;
495 io::stdout().flush()?;
496 }
497 output.extend_from_slice(&data);
498 }
499 }
500
501 let final_termination = resolve_termination_type(exit_code, termination);
502 return Ok(build_result(
504 &output,
505 status.success(),
506 Some(exit_code),
507 final_termination,
508 String::new(),
509 ));
510 }
511 }
512
513 should_terminate.store(true, Ordering::SeqCst);
515
516 let status = self
518 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
519 .await?;
520
521 let (success, exit_code, final_termination) = match status {
522 Some(s) => {
523 let code = s.exit_code() as i32;
524 (
525 s.success(),
526 Some(code),
527 resolve_termination_type(code, termination),
528 )
529 }
530 None => {
531 warn!("Timed out waiting for child to exit after termination");
532 (false, None, termination)
533 }
534 };
535
536 Ok(build_result(
538 &output,
539 success,
540 exit_code,
541 final_termination,
542 String::new(),
543 ))
544 }
545
546 pub async fn run_observe_streaming<H: StreamHandler>(
562 &self,
563 prompt: &str,
564 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
565 handler: &mut H,
566 ) -> io::Result<PtyExecutionResult> {
567 let output_format = self.backend.output_format;
569
570 let is_stream_json = output_format == OutputFormat::StreamJson;
573
574 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
576
577 let reader = pair
578 .master
579 .try_clone_reader()
580 .map_err(|e| io::Error::other(e.to_string()))?;
581
582 if let Some(ref input) = stdin_input {
584 tokio::time::sleep(Duration::from_millis(100)).await;
585 let mut writer = pair
586 .master
587 .take_writer()
588 .map_err(|e| io::Error::other(e.to_string()))?;
589 writer.write_all(input.as_bytes())?;
590 writer.write_all(b"\n")?;
591 writer.flush()?;
592 }
593
594 drop(pair.slave);
595
596 let mut output = Vec::new();
597 let mut line_buffer = String::new();
598 let mut extracted_text = String::new();
600 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
601 None
602 } else {
603 Some(Duration::from_secs(u64::from(
604 self.config.idle_timeout_secs,
605 )))
606 };
607
608 let mut termination = TerminationType::Natural;
609 let mut last_activity = Instant::now();
610
611 let should_terminate = Arc::new(AtomicBool::new(false));
612
613 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
615 let should_terminate_reader = Arc::clone(&should_terminate);
616 let tui_connected = self.tui_mode;
617 let tui_output_tx = if tui_connected {
618 Some(self.output_tx.clone())
619 } else {
620 None
621 };
622
623 debug!("Spawning PTY output reader thread (streaming mode)");
624 std::thread::spawn(move || {
625 let mut reader = reader;
626 let mut buf = [0u8; 4096];
627
628 loop {
629 if should_terminate_reader.load(Ordering::SeqCst) {
630 debug!("PTY reader: termination requested");
631 break;
632 }
633
634 match reader.read(&mut buf) {
635 Ok(0) => {
636 debug!("PTY reader: EOF");
637 let _ = output_tx.blocking_send(OutputEvent::Eof);
638 break;
639 }
640 Ok(n) => {
641 let data = buf[..n].to_vec();
642 if let Some(ref tx) = tui_output_tx {
643 let _ = tx.send(data.clone());
644 }
645 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
646 break;
647 }
648 }
649 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
650 std::thread::sleep(Duration::from_millis(10));
651 }
652 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
653 Err(e) => {
654 debug!(error = %e, "PTY reader error");
655 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
656 break;
657 }
658 }
659 }
660 });
661
662 loop {
664 let idle_timeout = timeout_duration.map(|d| {
665 let elapsed = last_activity.elapsed();
666 if elapsed >= d {
667 Duration::from_millis(1)
668 } else {
669 d.saturating_sub(elapsed)
670 }
671 });
672
673 tokio::select! {
674 _ = interrupt_rx.changed() => {
675 if *interrupt_rx.borrow() {
676 debug!("Interrupt received in streaming observe mode, terminating");
677 termination = TerminationType::UserInterrupt;
678 should_terminate.store(true, Ordering::SeqCst);
679 let _ = self.terminate_child(&mut child, true).await;
680 break;
681 }
682 }
683
684 event = output_rx.recv() => {
685 match event {
686 Some(OutputEvent::Data(data)) => {
687 output.extend_from_slice(&data);
688 last_activity = Instant::now();
689
690 if let Ok(text) = std::str::from_utf8(&data) {
691 if is_stream_json {
692 line_buffer.push_str(text);
694
695 while let Some(newline_pos) = line_buffer.find('\n') {
697 let line = line_buffer[..newline_pos].to_string();
698 line_buffer = line_buffer[newline_pos + 1..].to_string();
699
700 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
701 dispatch_stream_event(event, handler, &mut extracted_text);
702 }
703 }
704 } else {
705 handler.on_text(text);
708 }
709 }
710 }
711 Some(OutputEvent::Eof) | None => {
712 debug!("Output channel closed");
713 if is_stream_json && !line_buffer.is_empty()
715 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
716 {
717 dispatch_stream_event(event, handler, &mut extracted_text);
718 }
719 break;
720 }
721 Some(OutputEvent::Error(e)) => {
722 debug!(error = %e, "Reader thread reported error");
723 handler.on_error(&e);
724 break;
725 }
726 }
727 }
728
729 _ = async {
730 if let Some(timeout) = idle_timeout {
731 tokio::time::sleep(timeout).await;
732 } else {
733 std::future::pending::<()>().await;
734 }
735 } => {
736 warn!(
737 timeout_secs = self.config.idle_timeout_secs,
738 "Idle timeout triggered"
739 );
740 termination = TerminationType::IdleTimeout;
741 should_terminate.store(true, Ordering::SeqCst);
742 self.terminate_child(&mut child, true).await?;
743 break;
744 }
745 }
746
747 if let Some(status) = child
749 .try_wait()
750 .map_err(|e| io::Error::other(e.to_string()))?
751 {
752 let exit_code = status.exit_code() as i32;
753 debug!(exit_status = ?status, exit_code, "Child process exited");
754
755 while let Ok(event) = output_rx.try_recv() {
757 if let OutputEvent::Data(data) = event {
758 output.extend_from_slice(&data);
759 if let Ok(text) = std::str::from_utf8(&data) {
760 if is_stream_json {
761 line_buffer.push_str(text);
763 while let Some(newline_pos) = line_buffer.find('\n') {
764 let line = line_buffer[..newline_pos].to_string();
765 line_buffer = line_buffer[newline_pos + 1..].to_string();
766 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
767 dispatch_stream_event(event, handler, &mut extracted_text);
768 }
769 }
770 } else {
771 handler.on_text(text);
773 }
774 }
775 }
776 }
777
778 if is_stream_json
780 && !line_buffer.is_empty()
781 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
782 {
783 dispatch_stream_event(event, handler, &mut extracted_text);
784 }
785
786 let final_termination = resolve_termination_type(exit_code, termination);
787 return Ok(build_result(
789 &output,
790 status.success(),
791 Some(exit_code),
792 final_termination,
793 extracted_text,
794 ));
795 }
796 }
797
798 should_terminate.store(true, Ordering::SeqCst);
799
800 let status = self
801 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
802 .await?;
803
804 let (success, exit_code, final_termination) = match status {
805 Some(s) => {
806 let code = s.exit_code() as i32;
807 (
808 s.success(),
809 Some(code),
810 resolve_termination_type(code, termination),
811 )
812 }
813 None => {
814 warn!("Timed out waiting for child to exit after termination");
815 (false, None, termination)
816 }
817 };
818
819 Ok(build_result(
821 &output,
822 success,
823 exit_code,
824 final_termination,
825 extracted_text,
826 ))
827 }
828
829 #[allow(clippy::too_many_lines)] pub async fn run_interactive(
850 &mut self,
851 prompt: &str,
852 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
853 ) -> io::Result<PtyExecutionResult> {
854 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
856
857 let reader = pair
858 .master
859 .try_clone_reader()
860 .map_err(|e| io::Error::other(e.to_string()))?;
861 let mut writer = pair
862 .master
863 .take_writer()
864 .map_err(|e| io::Error::other(e.to_string()))?;
865
866 let master = pair.master;
868
869 drop(pair.slave);
871
872 let pending_stdin = stdin_input;
874
875 let mut output = Vec::new();
876 let timeout_duration = if self.config.idle_timeout_secs > 0 {
877 Some(Duration::from_secs(u64::from(
878 self.config.idle_timeout_secs,
879 )))
880 } else {
881 None
882 };
883
884 let mut ctrl_c_state = CtrlCState::new();
885 let mut termination = TerminationType::Natural;
886 let mut last_activity = Instant::now();
887
888 let should_terminate = Arc::new(AtomicBool::new(false));
890
891 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
893 let should_terminate_output = Arc::clone(&should_terminate);
894 let tui_connected = self.tui_mode;
896 let tui_output_tx = if tui_connected {
897 Some(self.output_tx.clone())
898 } else {
899 None
900 };
901
902 debug!("Spawning PTY output reader thread");
903 std::thread::spawn(move || {
904 debug!("PTY output reader thread started");
905 let mut reader = reader;
906 let mut buf = [0u8; 4096];
907
908 loop {
909 if should_terminate_output.load(Ordering::SeqCst) {
910 debug!("PTY output reader: termination requested");
911 break;
912 }
913
914 match reader.read(&mut buf) {
915 Ok(0) => {
916 debug!("PTY output reader: EOF received");
918 let _ = output_tx.blocking_send(OutputEvent::Eof);
919 break;
920 }
921 Ok(n) => {
922 let data = buf[..n].to_vec();
923 if let Some(ref tx) = tui_output_tx {
925 let _ = tx.send(data.clone());
926 }
927 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
929 debug!("PTY output reader: channel closed");
930 break;
931 }
932 }
933 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
934 std::thread::sleep(Duration::from_millis(1));
936 }
937 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
938 }
940 Err(e) => {
941 warn!("PTY output reader: error - {}", e);
942 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
943 break;
944 }
945 }
946 }
947 debug!("PTY output reader thread exiting");
948 });
949
950 let mut input_rx = if tui_connected {
955 debug!("TUI connected - skipping stdin reader thread");
956 None
957 } else {
958 let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
959 let should_terminate_input = Arc::clone(&should_terminate);
960
961 std::thread::spawn(move || {
962 let mut stdin = io::stdin();
963 let mut buf = [0u8; 1];
964
965 loop {
966 if should_terminate_input.load(Ordering::SeqCst) {
967 break;
968 }
969
970 match stdin.read(&mut buf) {
971 Ok(0) => break, Ok(1) => {
973 let byte = buf[0];
974 let event = match byte {
975 3 => InputEvent::CtrlC, 28 => InputEvent::CtrlBackslash, _ => InputEvent::Data(vec![byte]),
978 };
979 if input_tx.send(event).is_err() {
980 break;
981 }
982 }
983 Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
985 Err(_) => break,
986 }
987 }
988 });
989 Some(input_rx)
990 };
991
992 if let Some(ref input) = pending_stdin {
995 tokio::time::sleep(Duration::from_millis(100)).await;
996 writer.write_all(input.as_bytes())?;
997 writer.write_all(b"\n")?;
998 writer.flush()?;
999 last_activity = Instant::now();
1000 }
1001
1002 loop {
1005 if let Some(status) = child
1007 .try_wait()
1008 .map_err(|e| io::Error::other(e.to_string()))?
1009 {
1010 let exit_code = status.exit_code() as i32;
1011 debug!(exit_status = ?status, exit_code, "Child process exited");
1012
1013 while let Ok(event) = output_rx.try_recv() {
1015 if let OutputEvent::Data(data) = event {
1016 if !tui_connected {
1017 io::stdout().write_all(&data)?;
1018 io::stdout().flush()?;
1019 }
1020 output.extend_from_slice(&data);
1021 }
1022 }
1023
1024 should_terminate.store(true, Ordering::SeqCst);
1025 let _ = self.terminated_tx.send(true);
1027
1028 let final_termination = resolve_termination_type(exit_code, termination);
1029 return Ok(build_result(
1031 &output,
1032 status.success(),
1033 Some(exit_code),
1034 final_termination,
1035 String::new(),
1036 ));
1037 }
1038
1039 let timeout_future = async {
1041 match timeout_duration {
1042 Some(d) => {
1043 let elapsed = last_activity.elapsed();
1044 if elapsed >= d {
1045 tokio::time::sleep(Duration::ZERO).await
1046 } else {
1047 tokio::time::sleep(d.saturating_sub(elapsed)).await
1048 }
1049 }
1050 None => std::future::pending::<()>().await,
1051 }
1052 };
1053
1054 tokio::select! {
1055 output_event = output_rx.recv() => {
1057 match output_event {
1058 Some(OutputEvent::Data(data)) => {
1059 if !tui_connected {
1061 io::stdout().write_all(&data)?;
1062 io::stdout().flush()?;
1063 }
1064 output.extend_from_slice(&data);
1065
1066 last_activity = Instant::now();
1067 }
1068 Some(OutputEvent::Eof) => {
1069 debug!("PTY EOF received");
1070 break;
1071 }
1072 Some(OutputEvent::Error(e)) => {
1073 debug!(error = %e, "PTY read error");
1074 break;
1075 }
1076 None => {
1077 break;
1079 }
1080 }
1081 }
1082
1083 input_event = async {
1085 match input_rx.as_mut() {
1086 Some(rx) => rx.recv().await,
1087 None => std::future::pending().await, }
1089 } => {
1090 match input_event {
1091 Some(InputEvent::CtrlC) => {
1092 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1093 CtrlCAction::ForwardAndStartWindow => {
1094 let _ = writer.write_all(&[3]);
1096 let _ = writer.flush();
1097 last_activity = Instant::now();
1098 }
1099 CtrlCAction::Terminate => {
1100 info!("Double Ctrl+C detected, terminating");
1101 termination = TerminationType::UserInterrupt;
1102 should_terminate.store(true, Ordering::SeqCst);
1103 self.terminate_child(&mut child, true).await?;
1104 break;
1105 }
1106 }
1107 }
1108 Some(InputEvent::CtrlBackslash) => {
1109 info!("Ctrl+\\ detected, force killing");
1110 termination = TerminationType::ForceKill;
1111 should_terminate.store(true, Ordering::SeqCst);
1112 self.terminate_child(&mut child, false).await?;
1113 break;
1114 }
1115 Some(InputEvent::Data(data)) => {
1116 let _ = writer.write_all(&data);
1118 let _ = writer.flush();
1119 last_activity = Instant::now();
1120 }
1121 None => {
1122 debug!("Input channel closed");
1124 }
1125 }
1126 }
1127
1128 tui_input = self.input_rx.recv() => {
1130 if let Some(data) = tui_input {
1131 match InputEvent::from_bytes(data) {
1132 InputEvent::CtrlC => {
1133 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1134 CtrlCAction::ForwardAndStartWindow => {
1135 let _ = writer.write_all(&[3]);
1136 let _ = writer.flush();
1137 last_activity = Instant::now();
1138 }
1139 CtrlCAction::Terminate => {
1140 info!("Double Ctrl+C detected, terminating");
1141 termination = TerminationType::UserInterrupt;
1142 should_terminate.store(true, Ordering::SeqCst);
1143 self.terminate_child(&mut child, true).await?;
1144 break;
1145 }
1146 }
1147 }
1148 InputEvent::CtrlBackslash => {
1149 info!("Ctrl+\\ detected, force killing");
1150 termination = TerminationType::ForceKill;
1151 should_terminate.store(true, Ordering::SeqCst);
1152 self.terminate_child(&mut child, false).await?;
1153 break;
1154 }
1155 InputEvent::Data(bytes) => {
1156 let _ = writer.write_all(&bytes);
1157 let _ = writer.flush();
1158 last_activity = Instant::now();
1159 }
1160 }
1161 }
1162 }
1163
1164 control_cmd = self.control_rx.recv() => {
1166 if let Some(cmd) = control_cmd {
1167 use crate::pty_handle::ControlCommand;
1168 match cmd {
1169 ControlCommand::Kill => {
1170 info!("Control command: Kill");
1171 termination = TerminationType::UserInterrupt;
1172 should_terminate.store(true, Ordering::SeqCst);
1173 self.terminate_child(&mut child, true).await?;
1174 break;
1175 }
1176 ControlCommand::Resize(cols, rows) => {
1177 debug!(cols, rows, "Control command: Resize");
1178 if let Err(e) = master.resize(PtySize {
1180 rows,
1181 cols,
1182 pixel_width: 0,
1183 pixel_height: 0,
1184 }) {
1185 warn!("Failed to resize PTY: {}", e);
1186 }
1187 }
1188 ControlCommand::Skip | ControlCommand::Abort => {
1189 debug!("Control command: {:?} (ignored at PTY level)", cmd);
1191 }
1192 }
1193 }
1194 }
1195
1196 _ = timeout_future => {
1198 warn!(
1199 timeout_secs = self.config.idle_timeout_secs,
1200 "Idle timeout triggered"
1201 );
1202 termination = TerminationType::IdleTimeout;
1203 should_terminate.store(true, Ordering::SeqCst);
1204 self.terminate_child(&mut child, true).await?;
1205 break;
1206 }
1207
1208 _ = interrupt_rx.changed() => {
1210 if *interrupt_rx.borrow() {
1211 debug!("Interrupt received in interactive mode, terminating");
1212 termination = TerminationType::UserInterrupt;
1213 should_terminate.store(true, Ordering::SeqCst);
1214 self.terminate_child(&mut child, true).await?;
1215 break;
1216 }
1217 }
1218 }
1219 }
1220
1221 should_terminate.store(true, Ordering::SeqCst);
1223
1224 let _ = self.terminated_tx.send(true);
1226
1227 let status = self
1229 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1230 .await?;
1231
1232 let (success, exit_code, final_termination) = match status {
1233 Some(s) => {
1234 let code = s.exit_code() as i32;
1235 (
1236 s.success(),
1237 Some(code),
1238 resolve_termination_type(code, termination),
1239 )
1240 }
1241 None => {
1242 warn!("Timed out waiting for child to exit after termination");
1243 (false, None, termination)
1244 }
1245 };
1246
1247 Ok(build_result(
1249 &output,
1250 success,
1251 exit_code,
1252 final_termination,
1253 String::new(),
1254 ))
1255 }
1256
1257 #[allow(clippy::unused_self)] #[allow(clippy::unused_async)] #[cfg(not(unix))]
1268 async fn terminate_child(
1269 &self,
1270 child: &mut Box<dyn portable_pty::Child + Send>,
1271 _graceful: bool,
1272 ) -> io::Result<()> {
1273 child.kill()
1274 }
1275
1276 #[cfg(unix)]
1277 async fn terminate_child(
1278 &self,
1279 child: &mut Box<dyn portable_pty::Child + Send>,
1280 graceful: bool,
1281 ) -> io::Result<()> {
1282 let pid = match child.process_id() {
1283 Some(id) => Pid::from_raw(id as i32),
1284 None => return Ok(()), };
1286
1287 if graceful {
1288 debug!(pid = %pid, "Sending SIGTERM");
1289 let _ = kill(pid, Signal::SIGTERM);
1290
1291 let grace_period = Duration::from_secs(2);
1293 let start = Instant::now();
1294
1295 while start.elapsed() < grace_period {
1296 if child
1297 .try_wait()
1298 .map_err(|e| io::Error::other(e.to_string()))?
1299 .is_some()
1300 {
1301 return Ok(());
1302 }
1303 tokio::time::sleep(Duration::from_millis(50)).await;
1305 }
1306
1307 debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1309 }
1310
1311 debug!(pid = %pid, "Sending SIGKILL");
1312 let _ = kill(pid, Signal::SIGKILL);
1313 Ok(())
1314 }
1315
1316 async fn wait_for_exit(
1321 &self,
1322 child: &mut Box<dyn portable_pty::Child + Send>,
1323 max_wait: Option<Duration>,
1324 interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1325 ) -> io::Result<Option<portable_pty::ExitStatus>> {
1326 let start = Instant::now();
1327
1328 loop {
1329 if let Some(status) = child
1330 .try_wait()
1331 .map_err(|e| io::Error::other(e.to_string()))?
1332 {
1333 return Ok(Some(status));
1334 }
1335
1336 if let Some(max) = max_wait
1337 && start.elapsed() >= max
1338 {
1339 return Ok(None);
1340 }
1341
1342 tokio::select! {
1343 _ = interrupt_rx.changed() => {
1344 if *interrupt_rx.borrow() {
1345 debug!("Interrupt received while waiting for child exit");
1346 return Ok(None);
1347 }
1348 }
1349 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1350 }
1351 }
1352 }
1353}
1354
1355#[derive(Debug)]
1357enum InputEvent {
1358 CtrlC,
1360 CtrlBackslash,
1362 Data(Vec<u8>),
1364}
1365
1366impl InputEvent {
1367 fn from_bytes(data: Vec<u8>) -> Self {
1369 if data.len() == 1 {
1370 match data[0] {
1371 3 => return InputEvent::CtrlC,
1372 28 => return InputEvent::CtrlBackslash,
1373 _ => {}
1374 }
1375 }
1376 InputEvent::Data(data)
1377 }
1378}
1379
1380#[derive(Debug)]
1382enum OutputEvent {
1383 Data(Vec<u8>),
1385 Eof,
1387 Error(String),
1389}
1390
1391fn strip_ansi(bytes: &[u8]) -> String {
1397 let stripped = strip_ansi_escapes::strip(bytes);
1398 String::from_utf8_lossy(&stripped).into_owned()
1399}
1400
1401fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1405 if exit_code == 130 {
1406 info!("Child process killed by SIGINT");
1407 TerminationType::UserInterrupt
1408 } else {
1409 default
1410 }
1411}
1412
1413fn dispatch_stream_event<H: StreamHandler>(
1416 event: ClaudeStreamEvent,
1417 handler: &mut H,
1418 extracted_text: &mut String,
1419) {
1420 match event {
1421 ClaudeStreamEvent::System { .. } => {
1422 }
1424 ClaudeStreamEvent::Assistant { message, .. } => {
1425 for block in message.content {
1426 match block {
1427 ContentBlock::Text { text } => {
1428 handler.on_text(&text);
1429 extracted_text.push_str(&text);
1431 extracted_text.push('\n');
1432 }
1433 ContentBlock::ToolUse { name, id, input } => {
1434 handler.on_tool_call(&name, &id, &input)
1435 }
1436 }
1437 }
1438 }
1439 ClaudeStreamEvent::User { message } => {
1440 for block in message.content {
1441 match block {
1442 UserContentBlock::ToolResult {
1443 tool_use_id,
1444 content,
1445 } => {
1446 handler.on_tool_result(&tool_use_id, &content);
1447 }
1448 }
1449 }
1450 }
1451 ClaudeStreamEvent::Result {
1452 duration_ms,
1453 total_cost_usd,
1454 num_turns,
1455 is_error,
1456 } => {
1457 if is_error {
1458 handler.on_error("Session ended with error");
1459 }
1460 handler.on_complete(&SessionResult {
1461 duration_ms,
1462 total_cost_usd,
1463 num_turns,
1464 is_error,
1465 });
1466 }
1467 }
1468}
1469
1470fn build_result(
1479 output: &[u8],
1480 success: bool,
1481 exit_code: Option<i32>,
1482 termination: TerminationType,
1483 extracted_text: String,
1484) -> PtyExecutionResult {
1485 PtyExecutionResult {
1486 output: String::from_utf8_lossy(output).to_string(),
1487 stripped_output: strip_ansi(output),
1488 extracted_text,
1489 success,
1490 exit_code,
1491 termination,
1492 }
1493}
1494
1495#[cfg(test)]
1496mod tests {
1497 use super::*;
1498
1499 #[test]
1500 fn test_double_ctrl_c_within_window() {
1501 let mut state = CtrlCState::new();
1502 let now = Instant::now();
1503
1504 let action = state.handle_ctrl_c(now);
1506 assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
1507
1508 let later = now + Duration::from_millis(500);
1510 let action = state.handle_ctrl_c(later);
1511 assert_eq!(action, CtrlCAction::Terminate);
1512 }
1513
1514 #[test]
1515 fn test_ctrl_c_window_expires() {
1516 let mut state = CtrlCState::new();
1517 let now = Instant::now();
1518
1519 state.handle_ctrl_c(now);
1521
1522 let later = now + Duration::from_secs(2);
1524
1525 let action = state.handle_ctrl_c(later);
1527 assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
1528 }
1529
1530 #[test]
1531 fn test_strip_ansi_basic() {
1532 let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n";
1533 let stripped = strip_ansi(input);
1534 assert!(stripped.contains("Thinking..."));
1535 assert!(!stripped.contains("\x1b["));
1536 }
1537
1538 #[test]
1539 fn test_completion_promise_extraction() {
1540 let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n\
1542 \x1b[2K\x1b[1;32m Done!\x1b[0m\r\n\
1543 \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";
1544
1545 let stripped = strip_ansi(input);
1546
1547 assert!(stripped.contains("LOOP_COMPLETE"));
1549 assert!(!stripped.contains("\x1b["));
1550 }
1551
1552 #[test]
1553 fn test_event_tag_extraction() {
1554 let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
1556 Task completed successfully\r\n\
1557 \x1b[90m</event>\x1b[0m\r\n";
1558
1559 let stripped = strip_ansi(input);
1560
1561 assert!(stripped.contains("<event topic=\"build.done\">"));
1562 assert!(stripped.contains("</event>"));
1563 }
1564
1565 #[test]
1566 fn test_large_output_preserves_early_events() {
1567 let mut input = Vec::new();
1569
1570 input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");
1572
1573 for i in 0..500 {
1575 input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
1576 }
1577
1578 let stripped = strip_ansi(&input);
1579
1580 assert!(
1582 stripped.contains("<event topic=\"build.task\">"),
1583 "Event tag was lost - strip_ansi is not preserving all content"
1584 );
1585 assert!(stripped.contains("Implement feature X"));
1586 assert!(stripped.contains("Line 499")); }
1588
1589 #[test]
1590 fn test_pty_config_defaults() {
1591 let config = PtyConfig::default();
1592 assert!(config.interactive);
1593 assert_eq!(config.idle_timeout_secs, 30);
1594 assert_eq!(config.cols, 80);
1595 assert_eq!(config.rows, 24);
1596 }
1597
1598 #[test]
1606 fn test_idle_timeout_reset_logic() {
1607 let timeout_duration = Duration::from_secs(30);
1609
1610 let simulated_25s = Duration::from_secs(25);
1612
1613 let remaining = timeout_duration.saturating_sub(simulated_25s);
1615 assert_eq!(remaining.as_secs(), 5);
1616
1617 let last_activity_after_reset = Instant::now();
1619
1620 let elapsed = last_activity_after_reset.elapsed();
1622 assert!(elapsed < Duration::from_millis(100)); let new_remaining = timeout_duration.saturating_sub(elapsed);
1626 assert!(new_remaining > Duration::from_secs(29)); }
1628
1629 #[test]
1630 fn test_extracted_text_field_exists() {
1631 let result = PtyExecutionResult {
1634 output: String::new(),
1635 stripped_output: String::new(),
1636 extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
1637 success: true,
1638 exit_code: Some(0),
1639 termination: TerminationType::Natural,
1640 };
1641
1642 assert!(
1643 result
1644 .extracted_text
1645 .contains("<event topic=\"build.done\">")
1646 );
1647 }
1648
1649 #[test]
1650 fn test_build_result_includes_extracted_text() {
1651 let output = b"raw output";
1653 let extracted = "extracted text with <event topic=\"test\">payload</event>";
1654 let result = build_result(
1655 output,
1656 true,
1657 Some(0),
1658 TerminationType::Natural,
1659 extracted.to_string(),
1660 );
1661
1662 assert_eq!(result.extracted_text, extracted);
1663 assert!(result.stripped_output.contains("raw output"));
1664 }
1665
1666 #[test]
1681 fn test_tui_mode_stdin_reader_bypass() {
1682 let tui_mode = true;
1688 let tui_connected = tui_mode;
1689
1690 assert!(
1693 tui_connected,
1694 "When tui_mode is true, stdin reader must be skipped"
1695 );
1696
1697 let tui_mode_disabled = false;
1699 let tui_connected_non_tui = tui_mode_disabled;
1700 assert!(
1701 !tui_connected_non_tui,
1702 "When tui_mode is false, stdin reader must be spawned"
1703 );
1704 }
1705
1706 #[test]
1707 fn test_tui_mode_default_is_false() {
1708 let backend = CliBackend::claude();
1710 let config = PtyConfig::default();
1711 let executor = PtyExecutor::new(backend, config);
1712
1713 assert!(!executor.tui_mode, "tui_mode should default to false");
1715 }
1716
1717 #[test]
1718 fn test_set_tui_mode() {
1719 let backend = CliBackend::claude();
1721 let config = PtyConfig::default();
1722 let mut executor = PtyExecutor::new(backend, config);
1723
1724 assert!(!executor.tui_mode, "tui_mode should start as false");
1726
1727 executor.set_tui_mode(true);
1729 assert!(
1730 executor.tui_mode,
1731 "tui_mode should be true after set_tui_mode(true)"
1732 );
1733
1734 executor.set_tui_mode(false);
1736 assert!(
1737 !executor.tui_mode,
1738 "tui_mode should be false after set_tui_mode(false)"
1739 );
1740 }
1741}