1#![allow(clippy::cast_possible_wrap)]
20
21use crate::cli_backend::{CliBackend, OutputFormat};
22use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
23use crate::stream_handler::{SessionResult, StreamHandler};
24use nix::sys::signal::{kill, Signal};
25use nix::unistd::Pid;
26use portable_pty::{native_pty_system, CommandBuilder, PtyPair, PtySize};
27use std::io::{self, Read, Write};
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use tokio::sync::{mpsc, watch};
32use tracing::{debug, info, warn};
33
/// Outcome of running a command inside the PTY.
#[derive(Debug)]
pub struct PtyExecutionResult {
    /// Everything read from the PTY, decoded as lossy UTF-8 (escape sequences included).
    pub output: String,
    /// The same bytes as `output` with ANSI escape sequences stripped.
    pub stripped_output: String,
    /// Text gathered from assistant text blocks by the streaming runner;
    /// the other runners leave it empty.
    pub extracted_text: String,
    /// Whether the child's exit status reported success; `false` when no
    /// exit status could be obtained.
    pub success: bool,
    /// Exit code of the child, or `None` when it had not exited by the time
    /// the runner stopped waiting.
    pub exit_code: Option<i32>,
    /// Why the run ended.
    pub termination: TerminationType,
}
54
/// Reason a PTY run came to an end.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationType {
    /// Nothing in the executor ended the run; this is the initial value.
    Natural,
    /// The idle timeout elapsed without activity and the child was terminated.
    IdleTimeout,
    /// The run was interrupted: interrupt channel, double Ctrl+C, a `Kill`
    /// control command, or a child exit code of 130.
    UserInterrupt,
    /// Ctrl+\ was received and the child was killed without a grace period.
    ForceKill,
}
67
/// Settings that control how a command is run inside the pseudo-terminal.
#[derive(Debug, Clone)]
pub struct PtyConfig {
    /// Passed to the backend when building the command. The observe runners
    /// only arm the idle timeout when this is `true`.
    pub interactive: bool,
    /// Seconds without activity before the child is terminated; `0` disables
    /// the timeout.
    pub idle_timeout_secs: u32,
    /// Terminal width, in columns, requested when the PTY is opened.
    pub cols: u16,
    /// Terminal height, in rows, requested when the PTY is opened.
    pub rows: u16,
}

impl Default for PtyConfig {
    /// Interactive mode, a 30 second idle timeout and an 80x24 terminal.
    fn default() -> Self {
        Self {
            interactive: true,
            idle_timeout_secs: 30,
            cols: 80,
            rows: 24,
        }
    }
}

impl PtyConfig {
    /// Builds a configuration whose terminal size comes from the `COLUMNS`
    /// and `LINES` environment variables.
    ///
    /// A variable that is unset, not a valid `u16`, or zero falls back to the
    /// corresponding [`Default`] dimension. All other fields take their
    /// default values.
    pub fn from_env() -> Self {
        let defaults = Self::default();
        let columns = std::env::var("COLUMNS").ok();
        let lines = std::env::var("LINES").ok();

        Self {
            cols: Self::parse_dimension(columns.as_deref(), defaults.cols),
            rows: Self::parse_dimension(lines.as_deref(), defaults.rows),
            ..defaults
        }
    }

    /// Parses one terminal dimension, returning `fallback` when `raw` is
    /// absent, unparsable, or zero.
    ///
    /// Zero is rejected because a PTY with no rows or columns is not a usable
    /// terminal; surrounding whitespace is tolerated.
    fn parse_dimension(raw: Option<&str>, fallback: u16) -> u16 {
        raw.and_then(|value| value.trim().parse::<u16>().ok())
            .filter(|&size| size > 0)
            .unwrap_or(fallback)
    }
}
111
/// Tracks Ctrl+C presses so that two presses in quick succession can be told
/// apart from a single one.
#[derive(Debug)]
pub struct CtrlCState {
    /// When the currently open window started, if one is open.
    first_press: Option<Instant>,
    /// How long after the first press a second press counts as a "double".
    window: Duration,
}

/// What the caller should do in response to a Ctrl+C press.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CtrlCAction {
    /// Pass the Ctrl+C on to the child; a new double-press window has started.
    ForwardAndStartWindow,
    /// A second press landed inside the window: terminate the child.
    Terminate,
}

impl CtrlCState {
    /// Creates a state with no press recorded and a one second window.
    pub fn new() -> Self {
        let window = Duration::from_secs(1);
        Self {
            first_press: None,
            window,
        }
    }

    /// Records a Ctrl+C press observed at `now` and decides how to react.
    ///
    /// A press strictly less than the window after the recorded first press
    /// yields [`CtrlCAction::Terminate`] and clears the state. Any other
    /// press (none recorded, or the window has run out) is recorded as the
    /// new first press and yields [`CtrlCAction::ForwardAndStartWindow`].
    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
        let is_second_press = self
            .first_press
            .is_some_and(|opened_at| now.duration_since(opened_at) < self.window);

        if is_second_press {
            self.first_press = None;
            return CtrlCAction::Terminate;
        }

        self.first_press = Some(now);
        CtrlCAction::ForwardAndStartWindow
    }
}

impl Default for CtrlCState {
    /// Same as [`CtrlCState::new`].
    fn default() -> Self {
        Self::new()
    }
}
161
/// Runs a CLI backend inside a pseudo-terminal and owns the channels used to
/// talk to an optional external front end (handed out via `handle()`).
pub struct PtyExecutor {
    /// Backend that builds the command line to spawn.
    backend: CliBackend,
    /// Terminal size, interactivity and idle-timeout settings.
    config: PtyConfig,
    /// Sender for raw PTY output; cloned into the reader thread once the
    /// handle has been taken.
    output_tx: mpsc::UnboundedSender<Vec<u8>>,
    /// Receiving end of `output_tx`; `None` after `handle()` has taken it.
    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
    /// Sender for input bytes; `None` after `handle()` has taken it.
    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
    /// Input bytes arriving from the handle, read by `run_interactive`.
    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    /// Sender for control commands; `None` after `handle()` has taken it.
    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
    /// Control commands arriving from the handle, read by `run_interactive`.
    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
    /// Set to `true` when `run_interactive` finishes.
    terminated_tx: watch::Sender<bool>,
    /// Receiving end of `terminated_tx`; `None` after `handle()` has taken it.
    terminated_rx: Option<watch::Receiver<bool>>,
}
177
178impl PtyExecutor {
179 pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
181 let (output_tx, output_rx) = mpsc::unbounded_channel();
182 let (input_tx, input_rx) = mpsc::unbounded_channel();
183 let (control_tx, control_rx) = mpsc::unbounded_channel();
184 let (terminated_tx, terminated_rx) = watch::channel(false);
185
186 Self {
187 backend,
188 config,
189 output_tx,
190 output_rx: Some(output_rx),
191 input_tx: Some(input_tx),
192 input_rx,
193 control_tx: Some(control_tx),
194 control_rx,
195 terminated_tx,
196 terminated_rx: Some(terminated_rx),
197 }
198 }
199
200 pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
204 crate::pty_handle::PtyHandle {
205 output_rx: self.output_rx.take().expect("handle() already called"),
206 input_tx: self.input_tx.take().expect("handle() already called"),
207 control_tx: self.control_tx.take().expect("handle() already called"),
208 terminated_rx: self.terminated_rx.take().expect("handle() already called"),
209 }
210 }
211
212 fn spawn_pty(&self, prompt: &str) -> io::Result<(PtyPair, Box<dyn portable_pty::Child + Send>, Option<String>, Option<tempfile::NamedTempFile>)> {
221 let pty_system = native_pty_system();
222
223 let pair = pty_system
224 .openpty(PtySize {
225 rows: self.config.rows,
226 cols: self.config.cols,
227 pixel_width: 0,
228 pixel_height: 0,
229 })
230 .map_err(|e| io::Error::other(e.to_string()))?;
231
232 let (cmd, args, stdin_input, temp_file) = self.backend.build_command(prompt, self.config.interactive);
233
234 let mut cmd_builder = CommandBuilder::new(&cmd);
235 cmd_builder.args(&args);
236
237 let cwd = std::env::current_dir()
239 .map_err(|e| io::Error::other(format!("Failed to get current directory: {}", e)))?;
240 cmd_builder.cwd(&cwd);
241
242 cmd_builder.env("TERM", "xterm-256color");
244 let child = pair
245 .slave
246 .spawn_command(cmd_builder)
247 .map_err(|e| io::Error::other(e.to_string()))?;
248
249 Ok((pair, child, stdin_input, temp_file))
251 }
252
253 pub async fn run_observe(
270 &self,
271 prompt: &str,
272 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
273 ) -> io::Result<PtyExecutionResult> {
274 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
276
277 let reader = pair.master.try_clone_reader()
278 .map_err(|e| io::Error::other(e.to_string()))?;
279
280 if let Some(ref input) = stdin_input {
282 tokio::time::sleep(Duration::from_millis(100)).await;
284 let mut writer = pair.master.take_writer()
285 .map_err(|e| io::Error::other(e.to_string()))?;
286 writer.write_all(input.as_bytes())?;
287 writer.write_all(b"\n")?;
288 writer.flush()?;
289 }
290
291 drop(pair.slave);
293
294 let mut output = Vec::new();
295 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
296 None
297 } else {
298 Some(Duration::from_secs(u64::from(self.config.idle_timeout_secs)))
299 };
300
301 let mut termination = TerminationType::Natural;
302 let mut last_activity = Instant::now();
303
304 let should_terminate = Arc::new(AtomicBool::new(false));
306
307 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
309 let should_terminate_reader = Arc::clone(&should_terminate);
310 let tui_connected = self.output_rx.is_none();
312 let tui_output_tx = if tui_connected {
313 Some(self.output_tx.clone())
314 } else {
315 None
316 };
317
318 debug!("Spawning PTY output reader thread (observe mode)");
319 std::thread::spawn(move || {
320 let mut reader = reader;
321 let mut buf = [0u8; 4096];
322
323 loop {
324 if should_terminate_reader.load(Ordering::SeqCst) {
325 debug!("PTY reader: termination requested");
326 break;
327 }
328
329 match reader.read(&mut buf) {
330 Ok(0) => {
331 debug!("PTY reader: EOF");
332 let _ = output_tx.blocking_send(OutputEvent::Eof);
333 break;
334 }
335 Ok(n) => {
336 let data = buf[..n].to_vec();
337 if let Some(ref tx) = tui_output_tx {
339 let _ = tx.send(data.clone());
340 }
341 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
343 break;
344 }
345 }
346 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
347 std::thread::sleep(Duration::from_millis(10));
348 }
349 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
350 Err(e) => {
351 debug!(error = %e, "PTY reader error");
352 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
353 break;
354 }
355 }
356 }
357 });
358
359 loop {
361 let idle_timeout = timeout_duration.map(|d| {
363 let elapsed = last_activity.elapsed();
364 if elapsed >= d {
365 Duration::from_millis(1) } else {
367 d - elapsed
368 }
369 });
370
371 tokio::select! {
372 _ = interrupt_rx.changed() => {
374 if *interrupt_rx.borrow() {
375 debug!("Interrupt received in observe mode, terminating");
376 termination = TerminationType::UserInterrupt;
377 should_terminate.store(true, Ordering::SeqCst);
378 let _ = self.terminate_child(&mut child, true).await;
379 break;
380 }
381 }
382
383 event = output_rx.recv() => {
385 match event {
386 Some(OutputEvent::Data(data)) => {
387 if !tui_connected {
389 io::stdout().write_all(&data)?;
390 io::stdout().flush()?;
391 }
392 output.extend_from_slice(&data);
393 last_activity = Instant::now();
394 }
395 Some(OutputEvent::Eof) | None => {
396 debug!("Output channel closed, process likely exited");
397 break;
398 }
399 Some(OutputEvent::Error(e)) => {
400 debug!(error = %e, "Reader thread reported error");
401 break;
402 }
403 }
404 }
405
406 _ = async {
408 if let Some(timeout) = idle_timeout {
409 tokio::time::sleep(timeout).await;
410 } else {
411 std::future::pending::<()>().await;
413 }
414 } => {
415 warn!(
416 timeout_secs = self.config.idle_timeout_secs,
417 "Idle timeout triggered"
418 );
419 termination = TerminationType::IdleTimeout;
420 should_terminate.store(true, Ordering::SeqCst);
421 self.terminate_child(&mut child, true).await?;
422 break;
423 }
424 }
425
426 if let Some(status) = child.try_wait().map_err(|e| io::Error::other(e.to_string()))? {
428 let exit_code = status.exit_code() as i32;
429 debug!(exit_status = ?status, exit_code, "Child process exited");
430
431 while let Ok(event) = output_rx.try_recv() {
433 if let OutputEvent::Data(data) = event {
434 if !tui_connected {
435 io::stdout().write_all(&data)?;
436 io::stdout().flush()?;
437 }
438 output.extend_from_slice(&data);
439 }
440 }
441
442 let final_termination = resolve_termination_type(exit_code, termination);
443 return Ok(build_result(&output, status.success(), Some(exit_code), final_termination, String::new()));
445 }
446 }
447
448 should_terminate.store(true, Ordering::SeqCst);
450
451 let status = self
453 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
454 .await?;
455
456 let (success, exit_code, final_termination) = match status {
457 Some(s) => {
458 let code = s.exit_code() as i32;
459 (s.success(), Some(code), resolve_termination_type(code, termination))
460 }
461 None => {
462 warn!("Timed out waiting for child to exit after termination");
463 (false, None, termination)
464 }
465 };
466
467 Ok(build_result(&output, success, exit_code, final_termination, String::new()))
469 }
470
471 pub async fn run_observe_streaming<H: StreamHandler>(
487 &self,
488 prompt: &str,
489 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
490 handler: &mut H,
491 ) -> io::Result<PtyExecutionResult> {
492 let output_format = self.backend.output_format;
494
495 if output_format != OutputFormat::StreamJson {
497 return self.run_observe(prompt, interrupt_rx).await;
498 }
499
500 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
502
503 let reader = pair.master.try_clone_reader()
504 .map_err(|e| io::Error::other(e.to_string()))?;
505
506 if let Some(ref input) = stdin_input {
508 tokio::time::sleep(Duration::from_millis(100)).await;
509 let mut writer = pair.master.take_writer()
510 .map_err(|e| io::Error::other(e.to_string()))?;
511 writer.write_all(input.as_bytes())?;
512 writer.write_all(b"\n")?;
513 writer.flush()?;
514 }
515
516 drop(pair.slave);
517
518 let mut output = Vec::new();
519 let mut line_buffer = String::new();
520 let mut extracted_text = String::new();
522 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
523 None
524 } else {
525 Some(Duration::from_secs(u64::from(self.config.idle_timeout_secs)))
526 };
527
528 let mut termination = TerminationType::Natural;
529 let mut last_activity = Instant::now();
530
531 let should_terminate = Arc::new(AtomicBool::new(false));
532
533 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
535 let should_terminate_reader = Arc::clone(&should_terminate);
536 let tui_connected = self.output_rx.is_none();
537 let tui_output_tx = if tui_connected {
538 Some(self.output_tx.clone())
539 } else {
540 None
541 };
542
543 debug!("Spawning PTY output reader thread (streaming mode)");
544 std::thread::spawn(move || {
545 let mut reader = reader;
546 let mut buf = [0u8; 4096];
547
548 loop {
549 if should_terminate_reader.load(Ordering::SeqCst) {
550 debug!("PTY reader: termination requested");
551 break;
552 }
553
554 match reader.read(&mut buf) {
555 Ok(0) => {
556 debug!("PTY reader: EOF");
557 let _ = output_tx.blocking_send(OutputEvent::Eof);
558 break;
559 }
560 Ok(n) => {
561 let data = buf[..n].to_vec();
562 if let Some(ref tx) = tui_output_tx {
563 let _ = tx.send(data.clone());
564 }
565 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
566 break;
567 }
568 }
569 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
570 std::thread::sleep(Duration::from_millis(10));
571 }
572 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
573 Err(e) => {
574 debug!(error = %e, "PTY reader error");
575 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
576 break;
577 }
578 }
579 }
580 });
581
582 loop {
584 let idle_timeout = timeout_duration.map(|d| {
585 let elapsed = last_activity.elapsed();
586 if elapsed >= d {
587 Duration::from_millis(1)
588 } else {
589 d - elapsed
590 }
591 });
592
593 tokio::select! {
594 _ = interrupt_rx.changed() => {
595 if *interrupt_rx.borrow() {
596 debug!("Interrupt received in streaming observe mode, terminating");
597 termination = TerminationType::UserInterrupt;
598 should_terminate.store(true, Ordering::SeqCst);
599 let _ = self.terminate_child(&mut child, true).await;
600 break;
601 }
602 }
603
604 event = output_rx.recv() => {
605 match event {
606 Some(OutputEvent::Data(data)) => {
607 output.extend_from_slice(&data);
608 last_activity = Instant::now();
609
610 if let Ok(text) = std::str::from_utf8(&data) {
612 line_buffer.push_str(text);
613
614 while let Some(newline_pos) = line_buffer.find('\n') {
616 let line = line_buffer[..newline_pos].to_string();
617 line_buffer = line_buffer[newline_pos + 1..].to_string();
618
619 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
620 dispatch_stream_event(event, handler, &mut extracted_text);
621 }
622 }
623 }
624 }
625 Some(OutputEvent::Eof) | None => {
626 debug!("Output channel closed");
627 if !line_buffer.is_empty() {
629 if let Some(event) = ClaudeStreamParser::parse_line(&line_buffer) {
630 dispatch_stream_event(event, handler, &mut extracted_text);
631 }
632 }
633 break;
634 }
635 Some(OutputEvent::Error(e)) => {
636 debug!(error = %e, "Reader thread reported error");
637 handler.on_error(&e);
638 break;
639 }
640 }
641 }
642
643 _ = async {
644 if let Some(timeout) = idle_timeout {
645 tokio::time::sleep(timeout).await;
646 } else {
647 std::future::pending::<()>().await;
648 }
649 } => {
650 warn!(
651 timeout_secs = self.config.idle_timeout_secs,
652 "Idle timeout triggered"
653 );
654 termination = TerminationType::IdleTimeout;
655 should_terminate.store(true, Ordering::SeqCst);
656 self.terminate_child(&mut child, true).await?;
657 break;
658 }
659 }
660
661 if let Some(status) = child.try_wait().map_err(|e| io::Error::other(e.to_string()))? {
663 let exit_code = status.exit_code() as i32;
664 debug!(exit_status = ?status, exit_code, "Child process exited");
665
666 while let Ok(event) = output_rx.try_recv() {
668 if let OutputEvent::Data(data) = event {
669 output.extend_from_slice(&data);
670 if let Ok(text) = std::str::from_utf8(&data) {
671 line_buffer.push_str(text);
672 while let Some(newline_pos) = line_buffer.find('\n') {
673 let line = line_buffer[..newline_pos].to_string();
674 line_buffer = line_buffer[newline_pos + 1..].to_string();
675 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
676 dispatch_stream_event(event, handler, &mut extracted_text);
677 }
678 }
679 }
680 }
681 }
682
683 if !line_buffer.is_empty() {
685 if let Some(event) = ClaudeStreamParser::parse_line(&line_buffer) {
686 dispatch_stream_event(event, handler, &mut extracted_text);
687 }
688 }
689
690 let final_termination = resolve_termination_type(exit_code, termination);
691 return Ok(build_result(&output, status.success(), Some(exit_code), final_termination, extracted_text));
693 }
694 }
695
696 should_terminate.store(true, Ordering::SeqCst);
697
698 let status = self
699 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
700 .await?;
701
702 let (success, exit_code, final_termination) = match status {
703 Some(s) => {
704 let code = s.exit_code() as i32;
705 (s.success(), Some(code), resolve_termination_type(code, termination))
706 }
707 None => {
708 warn!("Timed out waiting for child to exit after termination");
709 (false, None, termination)
710 }
711 };
712
713 Ok(build_result(&output, success, exit_code, final_termination, extracted_text))
715 }
716
717 #[allow(clippy::too_many_lines)] pub async fn run_interactive(
738 &mut self,
739 prompt: &str,
740 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
741 ) -> io::Result<PtyExecutionResult> {
742 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
744
745 let reader = pair.master.try_clone_reader()
746 .map_err(|e| io::Error::other(e.to_string()))?;
747 let mut writer = pair.master.take_writer()
748 .map_err(|e| io::Error::other(e.to_string()))?;
749
750 let master = pair.master;
752
753 drop(pair.slave);
755
756 let pending_stdin = stdin_input;
758
759 let mut output = Vec::new();
760 let timeout_duration = if self.config.idle_timeout_secs > 0 {
761 Some(Duration::from_secs(u64::from(self.config.idle_timeout_secs)))
762 } else {
763 None
764 };
765
766 let mut ctrl_c_state = CtrlCState::new();
767 let mut termination = TerminationType::Natural;
768 let mut last_activity = Instant::now();
769
770 let should_terminate = Arc::new(AtomicBool::new(false));
772
773 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
775 let should_terminate_output = Arc::clone(&should_terminate);
776 let tui_connected = self.output_rx.is_none();
778 let tui_output_tx = if tui_connected {
779 Some(self.output_tx.clone())
780 } else {
781 None
782 };
783
784 debug!("Spawning PTY output reader thread");
785 std::thread::spawn(move || {
786 debug!("PTY output reader thread started");
787 let mut reader = reader;
788 let mut buf = [0u8; 4096];
789
790 loop {
791 if should_terminate_output.load(Ordering::SeqCst) {
792 debug!("PTY output reader: termination requested");
793 break;
794 }
795
796 match reader.read(&mut buf) {
797 Ok(0) => {
798 debug!("PTY output reader: EOF received");
800 let _ = output_tx.blocking_send(OutputEvent::Eof);
801 break;
802 }
803 Ok(n) => {
804 let data = buf[..n].to_vec();
805 if let Some(ref tx) = tui_output_tx {
807 let _ = tx.send(data.clone());
808 }
809 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
811 debug!("PTY output reader: channel closed");
812 break;
813 }
814 }
815 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
816 std::thread::sleep(Duration::from_millis(1));
818 }
819 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
820 }
822 Err(e) => {
823 warn!("PTY output reader: error - {}", e);
824 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
825 break;
826 }
827 }
828 }
829 debug!("PTY output reader thread exiting");
830 });
831
832 let (input_tx, mut input_rx) = mpsc::unbounded_channel::<InputEvent>();
834 let should_terminate_input = Arc::clone(&should_terminate);
835
836 std::thread::spawn(move || {
837 let mut stdin = io::stdin();
838 let mut buf = [0u8; 1];
839
840 loop {
841 if should_terminate_input.load(Ordering::SeqCst) {
842 break;
843 }
844
845 match stdin.read(&mut buf) {
846 Ok(0) => break, Ok(1) => {
848 let byte = buf[0];
849 let event = match byte {
850 3 => InputEvent::CtrlC, 28 => InputEvent::CtrlBackslash, _ => InputEvent::Data(vec![byte]),
853 };
854 if input_tx.send(event).is_err() {
855 break;
856 }
857 }
858 Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
860 Err(_) => break,
861 }
862 }
863 });
864
865 if let Some(ref input) = pending_stdin {
868 tokio::time::sleep(Duration::from_millis(100)).await;
869 writer.write_all(input.as_bytes())?;
870 writer.write_all(b"\n")?;
871 writer.flush()?;
872 last_activity = Instant::now();
873 }
874
875 loop {
878 if let Some(status) = child.try_wait()
880 .map_err(|e| io::Error::other(e.to_string()))?
881 {
882 let exit_code = status.exit_code() as i32;
883 debug!(exit_status = ?status, exit_code, "Child process exited");
884
885 while let Ok(event) = output_rx.try_recv() {
887 if let OutputEvent::Data(data) = event {
888 if !tui_connected {
889 io::stdout().write_all(&data)?;
890 io::stdout().flush()?;
891 }
892 output.extend_from_slice(&data);
893 }
894 }
895
896 should_terminate.store(true, Ordering::SeqCst);
897 let _ = self.terminated_tx.send(true);
899
900 let final_termination = resolve_termination_type(exit_code, termination);
901 return Ok(build_result(&output, status.success(), Some(exit_code), final_termination, String::new()));
903 }
904
905 let timeout_future = async {
907 match timeout_duration {
908 Some(d) => {
909 let elapsed = last_activity.elapsed();
910 if elapsed >= d {
911 tokio::time::sleep(Duration::ZERO).await
912 } else {
913 tokio::time::sleep(d - elapsed).await
914 }
915 }
916 None => std::future::pending::<()>().await,
917 }
918 };
919
920 tokio::select! {
921 output_event = output_rx.recv() => {
923 match output_event {
924 Some(OutputEvent::Data(data)) => {
925 if !tui_connected {
927 io::stdout().write_all(&data)?;
928 io::stdout().flush()?;
929 }
930 output.extend_from_slice(&data);
931
932 last_activity = Instant::now();
933 }
934 Some(OutputEvent::Eof) => {
935 debug!("PTY EOF received");
936 break;
937 }
938 Some(OutputEvent::Error(e)) => {
939 debug!(error = %e, "PTY read error");
940 break;
941 }
942 None => {
943 break;
945 }
946 }
947 }
948
949 input_event = async { input_rx.recv().await } => {
951 match input_event {
952 Some(InputEvent::CtrlC) => {
953 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
954 CtrlCAction::ForwardAndStartWindow => {
955 let _ = writer.write_all(&[3]);
957 let _ = writer.flush();
958 last_activity = Instant::now();
959 }
960 CtrlCAction::Terminate => {
961 info!("Double Ctrl+C detected, terminating");
962 termination = TerminationType::UserInterrupt;
963 should_terminate.store(true, Ordering::SeqCst);
964 self.terminate_child(&mut child, true).await?;
965 break;
966 }
967 }
968 }
969 Some(InputEvent::CtrlBackslash) => {
970 info!("Ctrl+\\ detected, force killing");
971 termination = TerminationType::ForceKill;
972 should_terminate.store(true, Ordering::SeqCst);
973 self.terminate_child(&mut child, false).await?;
974 break;
975 }
976 Some(InputEvent::Data(data)) => {
977 let _ = writer.write_all(&data);
979 let _ = writer.flush();
980 last_activity = Instant::now();
981 }
982 None => {
983 debug!("Input channel closed");
985 }
986 }
987 }
988
989 tui_input = self.input_rx.recv() => {
991 if let Some(data) = tui_input {
992 match InputEvent::from_bytes(data) {
993 InputEvent::CtrlC => {
994 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
995 CtrlCAction::ForwardAndStartWindow => {
996 let _ = writer.write_all(&[3]);
997 let _ = writer.flush();
998 last_activity = Instant::now();
999 }
1000 CtrlCAction::Terminate => {
1001 info!("Double Ctrl+C detected, terminating");
1002 termination = TerminationType::UserInterrupt;
1003 should_terminate.store(true, Ordering::SeqCst);
1004 self.terminate_child(&mut child, true).await?;
1005 break;
1006 }
1007 }
1008 }
1009 InputEvent::CtrlBackslash => {
1010 info!("Ctrl+\\ detected, force killing");
1011 termination = TerminationType::ForceKill;
1012 should_terminate.store(true, Ordering::SeqCst);
1013 self.terminate_child(&mut child, false).await?;
1014 break;
1015 }
1016 InputEvent::Data(bytes) => {
1017 let _ = writer.write_all(&bytes);
1018 let _ = writer.flush();
1019 last_activity = Instant::now();
1020 }
1021 }
1022 }
1023 }
1024
1025 control_cmd = self.control_rx.recv() => {
1027 if let Some(cmd) = control_cmd {
1028 use crate::pty_handle::ControlCommand;
1029 match cmd {
1030 ControlCommand::Kill => {
1031 info!("Control command: Kill");
1032 termination = TerminationType::UserInterrupt;
1033 should_terminate.store(true, Ordering::SeqCst);
1034 self.terminate_child(&mut child, true).await?;
1035 break;
1036 }
1037 ControlCommand::Resize(cols, rows) => {
1038 debug!(cols, rows, "Control command: Resize");
1039 if let Err(e) = master.resize(PtySize {
1041 rows,
1042 cols,
1043 pixel_width: 0,
1044 pixel_height: 0,
1045 }) {
1046 warn!("Failed to resize PTY: {}", e);
1047 }
1048 }
1049 ControlCommand::Skip | ControlCommand::Abort => {
1050 debug!("Control command: {:?} (ignored at PTY level)", cmd);
1052 }
1053 }
1054 }
1055 }
1056
1057 _ = timeout_future => {
1059 warn!(
1060 timeout_secs = self.config.idle_timeout_secs,
1061 "Idle timeout triggered"
1062 );
1063 termination = TerminationType::IdleTimeout;
1064 should_terminate.store(true, Ordering::SeqCst);
1065 self.terminate_child(&mut child, true).await?;
1066 break;
1067 }
1068
1069 _ = interrupt_rx.changed() => {
1071 if *interrupt_rx.borrow() {
1072 debug!("Interrupt received in interactive mode, terminating");
1073 termination = TerminationType::UserInterrupt;
1074 should_terminate.store(true, Ordering::SeqCst);
1075 self.terminate_child(&mut child, true).await?;
1076 break;
1077 }
1078 }
1079 }
1080 }
1081
1082 should_terminate.store(true, Ordering::SeqCst);
1084
1085 let _ = self.terminated_tx.send(true);
1087
1088 let status = self
1090 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1091 .await?;
1092
1093 let (success, exit_code, final_termination) = match status {
1094 Some(s) => {
1095 let code = s.exit_code() as i32;
1096 (s.success(), Some(code), resolve_termination_type(code, termination))
1097 }
1098 None => {
1099 warn!("Timed out waiting for child to exit after termination");
1100 (false, None, termination)
1101 }
1102 };
1103
1104 Ok(build_result(&output, success, exit_code, final_termination, String::new()))
1106 }
1107
1108 #[allow(clippy::unused_self)] async fn terminate_child(&self, child: &mut Box<dyn portable_pty::Child + Send>, graceful: bool) -> io::Result<()> {
1118 let pid = match child.process_id() {
1119 Some(id) => Pid::from_raw(id as i32),
1120 None => return Ok(()), };
1122
1123 if graceful {
1124 debug!(pid = %pid, "Sending SIGTERM");
1125 let _ = kill(pid, Signal::SIGTERM);
1126
1127 let grace_period = Duration::from_secs(2);
1129 let start = Instant::now();
1130
1131 while start.elapsed() < grace_period {
1132 if child.try_wait()
1133 .map_err(|e| io::Error::other(e.to_string()))?
1134 .is_some()
1135 {
1136 return Ok(());
1137 }
1138 tokio::time::sleep(Duration::from_millis(50)).await;
1140 }
1141
1142 debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1144 }
1145
1146 debug!(pid = %pid, "Sending SIGKILL");
1147 let _ = kill(pid, Signal::SIGKILL);
1148 Ok(())
1149 }
1150
1151 async fn wait_for_exit(
1156 &self,
1157 child: &mut Box<dyn portable_pty::Child + Send>,
1158 max_wait: Option<Duration>,
1159 interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1160 ) -> io::Result<Option<portable_pty::ExitStatus>> {
1161 let start = Instant::now();
1162
1163 loop {
1164 if let Some(status) = child
1165 .try_wait()
1166 .map_err(|e| io::Error::other(e.to_string()))?
1167 {
1168 return Ok(Some(status));
1169 }
1170
1171 if let Some(max) = max_wait {
1172 if start.elapsed() >= max {
1173 return Ok(None);
1174 }
1175 }
1176
1177 tokio::select! {
1178 _ = interrupt_rx.changed() => {
1179 if *interrupt_rx.borrow() {
1180 debug!("Interrupt received while waiting for child exit");
1181 return Ok(None);
1182 }
1183 }
1184 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1185 }
1186 }
1187 }
1188}
1189
/// A unit of user input, with the two control bytes the executor acts on
/// singled out.
#[derive(Debug)]
enum InputEvent {
    /// The single byte 3 (Ctrl+C).
    CtrlC,
    /// The single byte 28 (Ctrl+\).
    CtrlBackslash,
    /// Any other input, passed through unchanged.
    Data(Vec<u8>),
}

impl InputEvent {
    /// Classifies a chunk of input bytes.
    ///
    /// Only a chunk consisting of exactly one byte can be a control event;
    /// everything else — including longer chunks that happen to contain
    /// byte 3 or 28 — is returned as `Data`.
    fn from_bytes(data: Vec<u8>) -> Self {
        match data.as_slice() {
            [3] => InputEvent::CtrlC,
            [28] => InputEvent::CtrlBackslash,
            _ => InputEvent::Data(data),
        }
    }
}
1214
/// Message sent from a PTY reader thread to the async run loop.
#[derive(Debug)]
enum OutputEvent {
    /// A chunk of bytes read from the PTY.
    Data(Vec<u8>),
    /// The PTY read returned zero bytes.
    Eof,
    /// The PTY read failed; carries the error's message.
    Error(String),
}
1225
1226fn strip_ansi(bytes: &[u8]) -> String {
1232 let stripped = strip_ansi_escapes::strip(bytes);
1233 String::from_utf8_lossy(&stripped).into_owned()
1234}
1235
1236fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1240 if exit_code == 130 {
1241 info!("Child process killed by SIGINT");
1242 TerminationType::UserInterrupt
1243 } else {
1244 default
1245 }
1246}
1247
1248fn dispatch_stream_event<H: StreamHandler>(
1251 event: ClaudeStreamEvent,
1252 handler: &mut H,
1253 extracted_text: &mut String,
1254) {
1255 match event {
1256 ClaudeStreamEvent::System { .. } => {
1257 }
1259 ClaudeStreamEvent::Assistant { message, .. } => {
1260 for block in message.content {
1261 match block {
1262 ContentBlock::Text { text } => {
1263 handler.on_text(&text);
1264 extracted_text.push_str(&text);
1266 extracted_text.push('\n');
1267 }
1268 ContentBlock::ToolUse { name, id, input } => {
1269 handler.on_tool_call(&name, &id, &input)
1270 }
1271 }
1272 }
1273 }
1274 ClaudeStreamEvent::User { message } => {
1275 for block in message.content {
1276 match block {
1277 UserContentBlock::ToolResult { tool_use_id, content } => {
1278 handler.on_tool_result(&tool_use_id, &content);
1279 }
1280 }
1281 }
1282 }
1283 ClaudeStreamEvent::Result { duration_ms, total_cost_usd, num_turns, is_error } => {
1284 if is_error {
1285 handler.on_error("Session ended with error");
1286 }
1287 handler.on_complete(&SessionResult {
1288 duration_ms,
1289 total_cost_usd,
1290 num_turns,
1291 is_error,
1292 });
1293 }
1294 }
1295}
1296
1297fn build_result(
1306 output: &[u8],
1307 success: bool,
1308 exit_code: Option<i32>,
1309 termination: TerminationType,
1310 extracted_text: String,
1311) -> PtyExecutionResult {
1312 PtyExecutionResult {
1313 output: String::from_utf8_lossy(output).to_string(),
1314 stripped_output: strip_ansi(output),
1315 extracted_text,
1316 success,
1317 exit_code,
1318 termination,
1319 }
1320}
1321
#[cfg(test)]
mod tests {
    use super::*;

    /// A second Ctrl+C 500 ms after the first falls inside the window and terminates.
    #[test]
    fn test_double_ctrl_c_within_window() {
        let mut state = CtrlCState::new();
        let now = Instant::now();

        // First press: forwarded, window opens.
        let action = state.handle_ctrl_c(now);
        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);

        // Second press inside the window.
        let later = now + Duration::from_millis(500);
        let action = state.handle_ctrl_c(later);
        assert_eq!(action, CtrlCAction::Terminate);
    }

    /// A second Ctrl+C two seconds later is outside the window and starts a new one.
    #[test]
    fn test_ctrl_c_window_expires() {
        let mut state = CtrlCState::new();
        let now = Instant::now();

        state.handle_ctrl_c(now);

        let later = now + Duration::from_secs(2);

        let action = state.handle_ctrl_c(later);
        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
    }

    /// Colour escape sequences are removed while the visible text survives.
    #[test]
    fn test_strip_ansi_basic() {
        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n";
        let stripped = strip_ansi(input);
        assert!(stripped.contains("Thinking..."));
        assert!(!stripped.contains("\x1b["));
    }

    /// The `LOOP_COMPLETE` marker is still present after escape sequences are stripped.
    #[test]
    fn test_completion_promise_extraction() {
        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n\
                      \x1b[2K\x1b[1;32m  Done!\x1b[0m\r\n\
                      \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";

        let stripped = strip_ansi(input);

        assert!(stripped.contains("LOOP_COMPLETE"));
        assert!(!stripped.contains("\x1b["));
    }

    /// `<event>` tags wrapped in colour codes come out of stripping intact.
    #[test]
    fn test_event_tag_extraction() {
        let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
                      Task completed successfully\r\n\
                      \x1b[90m</event>\x1b[0m\r\n";

        let stripped = strip_ansi(input);

        assert!(stripped.contains("<event topic=\"build.done\">"));
        assert!(stripped.contains("</event>"));
    }

    /// Stripping keeps the whole input: an event at the very start is still
    /// there after 500 further lines, and so is the last line.
    #[test]
    fn test_large_output_preserves_early_events() {
        let mut input = Vec::new();

        // Event written first, before the bulk of the output.
        input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");

        for i in 0..500 {
            input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
        }

        let stripped = strip_ansi(&input);

        assert!(
            stripped.contains("<event topic=\"build.task\">"),
            "Event tag was lost - strip_ansi is not preserving all content"
        );
        assert!(stripped.contains("Implement feature X"));
        assert!(stripped.contains("Line 499"));
    }

    /// `PtyConfig::default()` is interactive with a 30 s idle timeout and an 80x24 terminal.
    #[test]
    fn test_pty_config_defaults() {
        let config = PtyConfig::default();
        assert!(config.interactive);
        assert_eq!(config.idle_timeout_secs, 30);
        assert_eq!(config.cols, 80);
        assert_eq!(config.rows, 24);
    }

    /// Checks the remaining-time arithmetic used for the idle timeout on
    /// plain `Duration`/`Instant` values; it does not run the executor.
    #[test]
    fn test_idle_timeout_reset_logic() {
        let timeout_duration = Duration::from_secs(30);

        let simulated_25s = Duration::from_secs(25);

        // 25 s into a 30 s timeout leaves 5 s.
        let remaining = timeout_duration.saturating_sub(simulated_25s);
        assert_eq!(remaining.as_secs(), 5);

        // Resetting the activity timestamp restores (nearly) the full timeout.
        let last_activity_after_reset = Instant::now();

        let elapsed = last_activity_after_reset.elapsed();
        assert!(elapsed < Duration::from_millis(100));
        let new_remaining = timeout_duration.saturating_sub(elapsed);
        assert!(new_remaining > Duration::from_secs(29));
    }

    /// `PtyExecutionResult` can be built by hand with an `extracted_text` value.
    #[test]
    fn test_extracted_text_field_exists() {
        let result = PtyExecutionResult {
            output: String::new(),
            stripped_output: String::new(),
            extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
            success: true,
            exit_code: Some(0),
            termination: TerminationType::Natural,
        };

        assert!(result.extracted_text.contains("<event topic=\"build.done\">"));
    }

    /// `build_result` stores the extracted text unchanged and derives
    /// `stripped_output` from the raw bytes.
    #[test]
    fn test_build_result_includes_extracted_text() {
        let output = b"raw output";
        let extracted = "extracted text with <event topic=\"test\">payload</event>";
        let result = build_result(output, true, Some(0), TerminationType::Natural, extracted.to_string());

        assert_eq!(result.extracted_text, extracted);
        assert!(result.stripped_output.contains("raw output"));
    }
}