1#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::stream_handler::{SessionResult, StreamHandler};
24#[cfg(unix)]
25use nix::sys::signal::{Signal, kill};
26#[cfg(unix)]
27use nix::unistd::Pid;
28use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
29use std::io::{self, Read, Write};
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use std::time::{Duration, Instant};
33use tokio::sync::{mpsc, watch};
34use tracing::{debug, info, warn};
35
/// Outcome of one PTY-backed command execution, as assembled by `build_result`.
#[derive(Debug)]
pub struct PtyExecutionResult {
    /// Everything captured from the PTY, decoded lossily as UTF-8.
    pub output: String,
    /// The captured output with ANSI escape sequences stripped.
    pub stripped_output: String,
    /// Text accumulated from parsed stream events; the non-streaming run modes
    /// pass an empty string here.
    pub extracted_text: String,
    /// Whether the child's exit status reported success (`false` when no exit
    /// status could be obtained).
    pub success: bool,
    /// Exit code of the child, or `None` when waiting for the exit gave up.
    pub exit_code: Option<i32>,
    /// Why the execution ended.
    pub termination: TerminationType,
}
56
/// Reason a PTY execution ended.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationType {
    /// The run ended without the executor terminating the child (the initial value).
    Natural,
    /// No activity was seen for the configured idle timeout, so the child was terminated.
    IdleTimeout,
    /// The run was interrupted: interrupt signal, double Ctrl+C, a `Kill` control
    /// command, or a child exit code of 130.
    UserInterrupt,
    /// Ctrl+\ was received and the child was killed without a grace period.
    ForceKill,
}
69
/// Settings used when spawning a command inside a PTY.
#[derive(Debug, Clone)]
pub struct PtyConfig {
    /// Passed to the backend when building the command; the observe run modes
    /// also disable the idle timeout when this is `false`.
    pub interactive: bool,
    /// Idle timeout in seconds; `0` disables it.
    pub idle_timeout_secs: u32,
    /// Terminal width in columns.
    pub cols: u16,
    /// Terminal height in rows.
    pub rows: u16,
    /// Working directory the child process is started in.
    pub workspace_root: std::path::PathBuf,
}

impl Default for PtyConfig {
    /// Interactive, 30 s idle timeout, 80x24 terminal, rooted at the current
    /// directory (or `.` when the current directory cannot be determined).
    fn default() -> Self {
        Self {
            interactive: true,
            idle_timeout_secs: 30,
            cols: 80,
            rows: 24,
            workspace_root: std::env::current_dir()
                .unwrap_or_else(|_| std::path::PathBuf::from(".")),
        }
    }
}

impl PtyConfig {
    /// Builds a config whose terminal size comes from the `COLUMNS` / `LINES`
    /// environment variables.
    ///
    /// A variable that is unset, not a valid `u16`, or zero is ignored and the
    /// default dimension is used instead; a zero-sized PTY is never requested.
    /// Surrounding whitespace in the value is tolerated.
    pub fn from_env() -> Self {
        let defaults = Self::default();
        Self {
            cols: Self::dimension_from_env("COLUMNS", defaults.cols),
            rows: Self::dimension_from_env("LINES", defaults.rows),
            ..defaults
        }
    }

    /// Returns the config with `workspace_root` replaced by `root`.
    pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
        self.workspace_root = root.into();
        self
    }

    /// Reads a positive terminal dimension from environment variable `var`,
    /// returning `fallback` when it is missing, unparsable, or zero.
    fn dimension_from_env(var: &str, fallback: u16) -> u16 {
        std::env::var(var)
            .ok()
            .and_then(|raw| raw.trim().parse::<u16>().ok())
            .filter(|&value| value > 0)
            .unwrap_or(fallback)
    }
}
125
/// Tracks Ctrl+C presses so that two presses in quick succession can be told
/// apart from a single one.
#[derive(Debug)]
pub struct CtrlCState {
    /// Time of the press that opened the current window, if any.
    first_press: Option<Instant>,
    /// How long after the first press a second press counts as a "double".
    window: Duration,
}

/// What the caller should do in response to a Ctrl+C press.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CtrlCAction {
    /// Forward the Ctrl+C to the child; a new double-press window has started.
    ForwardAndStartWindow,
    /// A second press arrived inside the window: terminate the child.
    Terminate,
}

impl CtrlCState {
    /// Creates a tracker with no pending press and a one-second window.
    pub fn new() -> Self {
        let window = Duration::from_secs(1);
        Self {
            first_press: None,
            window,
        }
    }

    /// Records a Ctrl+C press observed at `now` and decides how to react.
    ///
    /// Returns [`CtrlCAction::Terminate`] (and clears the window) when a
    /// previous press happened less than the window ago; otherwise remembers
    /// `now` as the first press and returns
    /// [`CtrlCAction::ForwardAndStartWindow`].
    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
        let is_double_press = self
            .first_press
            .is_some_and(|earlier| now.duration_since(earlier) < self.window);

        if is_double_press {
            self.first_press = None;
            return CtrlCAction::Terminate;
        }

        self.first_press = Some(now);
        CtrlCAction::ForwardAndStartWindow
    }
}

impl Default for CtrlCState {
    /// Same as [`CtrlCState::new`].
    fn default() -> Self {
        Self::new()
    }
}
175
/// Runs a CLI backend inside a PTY and exposes channels for an optional TUI
/// front end (see `handle`).
pub struct PtyExecutor {
    /// Backend used to build the command line for each run.
    backend: CliBackend,
    /// PTY size, idle timeout and working directory settings.
    config: PtyConfig,
    /// Sender for raw PTY output chunks; cloned into the reader thread when
    /// `tui_mode` is set.
    output_tx: mpsc::UnboundedSender<Vec<u8>>,
    /// Receiving end of `output_tx`, moved out by `handle`.
    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
    /// Sender for input bytes, moved out by `handle`.
    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
    /// Input bytes arriving from the handle; polled by `run_interactive`.
    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    /// Sender for control commands, moved out by `handle`.
    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
    /// Control commands arriving from the handle; polled by `run_interactive`.
    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
    /// Set to `true` by `run_interactive` when the run is over.
    terminated_tx: watch::Sender<bool>,
    /// Receiving end of `terminated_tx`, moved out by `handle`.
    terminated_rx: Option<watch::Receiver<bool>>,
    /// Whether a TUI is attached (output goes to `output_tx` rather than stdout).
    tui_mode: bool,
}
195
196impl PtyExecutor {
197 pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
199 let (output_tx, output_rx) = mpsc::unbounded_channel();
200 let (input_tx, input_rx) = mpsc::unbounded_channel();
201 let (control_tx, control_rx) = mpsc::unbounded_channel();
202 let (terminated_tx, terminated_rx) = watch::channel(false);
203
204 Self {
205 backend,
206 config,
207 output_tx,
208 output_rx: Some(output_rx),
209 input_tx: Some(input_tx),
210 input_rx,
211 control_tx: Some(control_tx),
212 control_rx,
213 terminated_tx,
214 terminated_rx: Some(terminated_rx),
215 tui_mode: false,
216 }
217 }
218
    /// Enables or disables TUI mode.
    ///
    /// While enabled, the run methods forward raw PTY output to the handle's
    /// output channel; `run_observe` and `run_interactive` then stop echoing to
    /// stdout, and `run_interactive` does not start its stdin reader thread.
    pub fn set_tui_mode(&mut self, enabled: bool) {
        self.tui_mode = enabled;
    }
230
    /// Replaces the backend used to build the command for subsequent runs.
    pub fn set_backend(&mut self, backend: CliBackend) {
        self.backend = backend;
    }
241
242 pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
246 crate::pty_handle::PtyHandle {
247 output_rx: self.output_rx.take().expect("handle() already called"),
248 input_tx: self.input_tx.take().expect("handle() already called"),
249 control_tx: self.control_tx.take().expect("handle() already called"),
250 terminated_rx: self.terminated_rx.take().expect("handle() already called"),
251 }
252 }
253
254 fn spawn_pty(
263 &self,
264 prompt: &str,
265 ) -> io::Result<(
266 PtyPair,
267 Box<dyn portable_pty::Child + Send>,
268 Option<String>,
269 Option<tempfile::NamedTempFile>,
270 )> {
271 let pty_system = native_pty_system();
272
273 let pair = pty_system
274 .openpty(PtySize {
275 rows: self.config.rows,
276 cols: self.config.cols,
277 pixel_width: 0,
278 pixel_height: 0,
279 })
280 .map_err(|e| io::Error::other(e.to_string()))?;
281
282 let (cmd, args, stdin_input, temp_file) =
283 self.backend.build_command(prompt, self.config.interactive);
284
285 let mut cmd_builder = CommandBuilder::new(&cmd);
286 cmd_builder.args(&args);
287
288 cmd_builder.cwd(&self.config.workspace_root);
291
292 cmd_builder.env("TERM", "xterm-256color");
294 let child = pair
295 .slave
296 .spawn_command(cmd_builder)
297 .map_err(|e| io::Error::other(e.to_string()))?;
298
299 Ok((pair, child, stdin_input, temp_file))
301 }
302
303 pub async fn run_observe(
320 &self,
321 prompt: &str,
322 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
323 ) -> io::Result<PtyExecutionResult> {
324 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
326
327 let reader = pair
328 .master
329 .try_clone_reader()
330 .map_err(|e| io::Error::other(e.to_string()))?;
331
332 if let Some(ref input) = stdin_input {
334 tokio::time::sleep(Duration::from_millis(100)).await;
336 let mut writer = pair
337 .master
338 .take_writer()
339 .map_err(|e| io::Error::other(e.to_string()))?;
340 writer.write_all(input.as_bytes())?;
341 writer.write_all(b"\n")?;
342 writer.flush()?;
343 }
344
345 drop(pair.slave);
347
348 let mut output = Vec::new();
349 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
350 None
351 } else {
352 Some(Duration::from_secs(u64::from(
353 self.config.idle_timeout_secs,
354 )))
355 };
356
357 let mut termination = TerminationType::Natural;
358 let mut last_activity = Instant::now();
359
360 let should_terminate = Arc::new(AtomicBool::new(false));
362
363 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
365 let should_terminate_reader = Arc::clone(&should_terminate);
366 let tui_connected = self.tui_mode;
368 let tui_output_tx = if tui_connected {
369 Some(self.output_tx.clone())
370 } else {
371 None
372 };
373
374 debug!("Spawning PTY output reader thread (observe mode)");
375 std::thread::spawn(move || {
376 let mut reader = reader;
377 let mut buf = [0u8; 4096];
378
379 loop {
380 if should_terminate_reader.load(Ordering::SeqCst) {
381 debug!("PTY reader: termination requested");
382 break;
383 }
384
385 match reader.read(&mut buf) {
386 Ok(0) => {
387 debug!("PTY reader: EOF");
388 let _ = output_tx.blocking_send(OutputEvent::Eof);
389 break;
390 }
391 Ok(n) => {
392 let data = buf[..n].to_vec();
393 if let Some(ref tx) = tui_output_tx {
395 let _ = tx.send(data.clone());
396 }
397 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
399 break;
400 }
401 }
402 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
403 std::thread::sleep(Duration::from_millis(10));
404 }
405 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
406 Err(e) => {
407 debug!(error = %e, "PTY reader error");
408 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
409 break;
410 }
411 }
412 }
413 });
414
415 loop {
417 let idle_timeout = timeout_duration.map(|d| {
419 let elapsed = last_activity.elapsed();
420 if elapsed >= d {
421 Duration::from_millis(1) } else {
423 d.saturating_sub(elapsed)
424 }
425 });
426
427 tokio::select! {
428 _ = interrupt_rx.changed() => {
430 if *interrupt_rx.borrow() {
431 debug!("Interrupt received in observe mode, terminating");
432 termination = TerminationType::UserInterrupt;
433 should_terminate.store(true, Ordering::SeqCst);
434 let _ = self.terminate_child(&mut child, true).await;
435 break;
436 }
437 }
438
439 event = output_rx.recv() => {
441 match event {
442 Some(OutputEvent::Data(data)) => {
443 if !tui_connected {
445 io::stdout().write_all(&data)?;
446 io::stdout().flush()?;
447 }
448 output.extend_from_slice(&data);
449 last_activity = Instant::now();
450 }
451 Some(OutputEvent::Eof) | None => {
452 debug!("Output channel closed, process likely exited");
453 break;
454 }
455 Some(OutputEvent::Error(e)) => {
456 debug!(error = %e, "Reader thread reported error");
457 break;
458 }
459 }
460 }
461
462 _ = async {
464 if let Some(timeout) = idle_timeout {
465 tokio::time::sleep(timeout).await;
466 } else {
467 std::future::pending::<()>().await;
469 }
470 } => {
471 warn!(
472 timeout_secs = self.config.idle_timeout_secs,
473 "Idle timeout triggered"
474 );
475 termination = TerminationType::IdleTimeout;
476 should_terminate.store(true, Ordering::SeqCst);
477 self.terminate_child(&mut child, true).await?;
478 break;
479 }
480 }
481
482 if let Some(status) = child
484 .try_wait()
485 .map_err(|e| io::Error::other(e.to_string()))?
486 {
487 let exit_code = status.exit_code() as i32;
488 debug!(exit_status = ?status, exit_code, "Child process exited");
489
490 while let Ok(event) = output_rx.try_recv() {
492 if let OutputEvent::Data(data) = event {
493 if !tui_connected {
494 io::stdout().write_all(&data)?;
495 io::stdout().flush()?;
496 }
497 output.extend_from_slice(&data);
498 }
499 }
500
501 let final_termination = resolve_termination_type(exit_code, termination);
502 return Ok(build_result(
504 &output,
505 status.success(),
506 Some(exit_code),
507 final_termination,
508 String::new(),
509 ));
510 }
511 }
512
513 should_terminate.store(true, Ordering::SeqCst);
515
516 let status = self
518 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
519 .await?;
520
521 let (success, exit_code, final_termination) = match status {
522 Some(s) => {
523 let code = s.exit_code() as i32;
524 (
525 s.success(),
526 Some(code),
527 resolve_termination_type(code, termination),
528 )
529 }
530 None => {
531 warn!("Timed out waiting for child to exit after termination");
532 (false, None, termination)
533 }
534 };
535
536 Ok(build_result(
538 &output,
539 success,
540 exit_code,
541 final_termination,
542 String::new(),
543 ))
544 }
545
546 pub async fn run_observe_streaming<H: StreamHandler>(
562 &self,
563 prompt: &str,
564 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
565 handler: &mut H,
566 ) -> io::Result<PtyExecutionResult> {
567 let output_format = self.backend.output_format;
569
570 let is_stream_json = output_format == OutputFormat::StreamJson;
573
574 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
576
577 let reader = pair
578 .master
579 .try_clone_reader()
580 .map_err(|e| io::Error::other(e.to_string()))?;
581
582 if let Some(ref input) = stdin_input {
584 tokio::time::sleep(Duration::from_millis(100)).await;
585 let mut writer = pair
586 .master
587 .take_writer()
588 .map_err(|e| io::Error::other(e.to_string()))?;
589 writer.write_all(input.as_bytes())?;
590 writer.write_all(b"\n")?;
591 writer.flush()?;
592 }
593
594 drop(pair.slave);
595
596 let mut output = Vec::new();
597 let mut line_buffer = String::new();
598 let mut extracted_text = String::new();
600 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
601 None
602 } else {
603 Some(Duration::from_secs(u64::from(
604 self.config.idle_timeout_secs,
605 )))
606 };
607
608 let mut termination = TerminationType::Natural;
609 let mut last_activity = Instant::now();
610
611 let should_terminate = Arc::new(AtomicBool::new(false));
612
613 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
615 let should_terminate_reader = Arc::clone(&should_terminate);
616 let tui_connected = self.tui_mode;
617 let tui_output_tx = if tui_connected {
618 Some(self.output_tx.clone())
619 } else {
620 None
621 };
622
623 debug!("Spawning PTY output reader thread (streaming mode)");
624 std::thread::spawn(move || {
625 let mut reader = reader;
626 let mut buf = [0u8; 4096];
627
628 loop {
629 if should_terminate_reader.load(Ordering::SeqCst) {
630 debug!("PTY reader: termination requested");
631 break;
632 }
633
634 match reader.read(&mut buf) {
635 Ok(0) => {
636 debug!("PTY reader: EOF");
637 let _ = output_tx.blocking_send(OutputEvent::Eof);
638 break;
639 }
640 Ok(n) => {
641 let data = buf[..n].to_vec();
642 if let Some(ref tx) = tui_output_tx {
643 let _ = tx.send(data.clone());
644 }
645 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
646 break;
647 }
648 }
649 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
650 std::thread::sleep(Duration::from_millis(10));
651 }
652 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
653 Err(e) => {
654 debug!(error = %e, "PTY reader error");
655 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
656 break;
657 }
658 }
659 }
660 });
661
662 loop {
664 let idle_timeout = timeout_duration.map(|d| {
665 let elapsed = last_activity.elapsed();
666 if elapsed >= d {
667 Duration::from_millis(1)
668 } else {
669 d.saturating_sub(elapsed)
670 }
671 });
672
673 tokio::select! {
674 _ = interrupt_rx.changed() => {
675 if *interrupt_rx.borrow() {
676 debug!("Interrupt received in streaming observe mode, terminating");
677 termination = TerminationType::UserInterrupt;
678 should_terminate.store(true, Ordering::SeqCst);
679 let _ = self.terminate_child(&mut child, true).await;
680 break;
681 }
682 }
683
684 event = output_rx.recv() => {
685 match event {
686 Some(OutputEvent::Data(data)) => {
687 output.extend_from_slice(&data);
688 last_activity = Instant::now();
689
690 if let Ok(text) = std::str::from_utf8(&data) {
691 if is_stream_json {
692 line_buffer.push_str(text);
694
695 while let Some(newline_pos) = line_buffer.find('\n') {
697 let line = line_buffer[..newline_pos].to_string();
698 line_buffer = line_buffer[newline_pos + 1..].to_string();
699
700 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
701 dispatch_stream_event(event, handler, &mut extracted_text);
702 }
703 }
704 } else {
705 handler.on_text(text);
708 }
709 }
710 }
711 Some(OutputEvent::Eof) | None => {
712 debug!("Output channel closed");
713 if is_stream_json && !line_buffer.is_empty()
715 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
716 {
717 dispatch_stream_event(event, handler, &mut extracted_text);
718 }
719 break;
720 }
721 Some(OutputEvent::Error(e)) => {
722 debug!(error = %e, "Reader thread reported error");
723 handler.on_error(&e);
724 break;
725 }
726 }
727 }
728
729 _ = async {
730 if let Some(timeout) = idle_timeout {
731 tokio::time::sleep(timeout).await;
732 } else {
733 std::future::pending::<()>().await;
734 }
735 } => {
736 warn!(
737 timeout_secs = self.config.idle_timeout_secs,
738 "Idle timeout triggered"
739 );
740 termination = TerminationType::IdleTimeout;
741 should_terminate.store(true, Ordering::SeqCst);
742 self.terminate_child(&mut child, true).await?;
743 break;
744 }
745 }
746
747 if let Some(status) = child
749 .try_wait()
750 .map_err(|e| io::Error::other(e.to_string()))?
751 {
752 let exit_code = status.exit_code() as i32;
753 debug!(exit_status = ?status, exit_code, "Child process exited");
754
755 while let Ok(event) = output_rx.try_recv() {
757 if let OutputEvent::Data(data) = event {
758 output.extend_from_slice(&data);
759 if let Ok(text) = std::str::from_utf8(&data) {
760 if is_stream_json {
761 line_buffer.push_str(text);
763 while let Some(newline_pos) = line_buffer.find('\n') {
764 let line = line_buffer[..newline_pos].to_string();
765 line_buffer = line_buffer[newline_pos + 1..].to_string();
766 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
767 dispatch_stream_event(event, handler, &mut extracted_text);
768 }
769 }
770 } else {
771 handler.on_text(text);
773 }
774 }
775 }
776 }
777
778 if is_stream_json
780 && !line_buffer.is_empty()
781 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
782 {
783 dispatch_stream_event(event, handler, &mut extracted_text);
784 }
785
786 let final_termination = resolve_termination_type(exit_code, termination);
787 return Ok(build_result(
789 &output,
790 status.success(),
791 Some(exit_code),
792 final_termination,
793 extracted_text,
794 ));
795 }
796 }
797
798 should_terminate.store(true, Ordering::SeqCst);
799
800 let status = self
801 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
802 .await?;
803
804 let (success, exit_code, final_termination) = match status {
805 Some(s) => {
806 let code = s.exit_code() as i32;
807 (
808 s.success(),
809 Some(code),
810 resolve_termination_type(code, termination),
811 )
812 }
813 None => {
814 warn!("Timed out waiting for child to exit after termination");
815 (false, None, termination)
816 }
817 };
818
819 Ok(build_result(
821 &output,
822 success,
823 exit_code,
824 final_termination,
825 extracted_text,
826 ))
827 }
828
829 #[allow(clippy::too_many_lines)] pub async fn run_interactive(
850 &mut self,
851 prompt: &str,
852 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
853 ) -> io::Result<PtyExecutionResult> {
854 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
856
857 let reader = pair
858 .master
859 .try_clone_reader()
860 .map_err(|e| io::Error::other(e.to_string()))?;
861 let mut writer = pair
862 .master
863 .take_writer()
864 .map_err(|e| io::Error::other(e.to_string()))?;
865
866 let master = pair.master;
868
869 drop(pair.slave);
871
872 let pending_stdin = stdin_input;
874
875 let mut output = Vec::new();
876 let timeout_duration = if self.config.idle_timeout_secs > 0 {
877 Some(Duration::from_secs(u64::from(
878 self.config.idle_timeout_secs,
879 )))
880 } else {
881 None
882 };
883
884 let mut ctrl_c_state = CtrlCState::new();
885 let mut termination = TerminationType::Natural;
886 let mut last_activity = Instant::now();
887
888 let should_terminate = Arc::new(AtomicBool::new(false));
890
891 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
893 let should_terminate_output = Arc::clone(&should_terminate);
894 let tui_connected = self.tui_mode;
896 let tui_output_tx = if tui_connected {
897 Some(self.output_tx.clone())
898 } else {
899 None
900 };
901
902 debug!("Spawning PTY output reader thread");
903 std::thread::spawn(move || {
904 debug!("PTY output reader thread started");
905 let mut reader = reader;
906 let mut buf = [0u8; 4096];
907
908 loop {
909 if should_terminate_output.load(Ordering::SeqCst) {
910 debug!("PTY output reader: termination requested");
911 break;
912 }
913
914 match reader.read(&mut buf) {
915 Ok(0) => {
916 debug!("PTY output reader: EOF received");
918 let _ = output_tx.blocking_send(OutputEvent::Eof);
919 break;
920 }
921 Ok(n) => {
922 let data = buf[..n].to_vec();
923 if let Some(ref tx) = tui_output_tx {
925 let _ = tx.send(data.clone());
926 }
927 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
929 debug!("PTY output reader: channel closed");
930 break;
931 }
932 }
933 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
934 std::thread::sleep(Duration::from_millis(1));
936 }
937 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
938 }
940 Err(e) => {
941 warn!("PTY output reader: error - {}", e);
942 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
943 break;
944 }
945 }
946 }
947 debug!("PTY output reader thread exiting");
948 });
949
950 let mut input_rx = if tui_connected {
955 debug!("TUI connected - skipping stdin reader thread");
956 None
957 } else {
958 let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
959 let should_terminate_input = Arc::clone(&should_terminate);
960
961 std::thread::spawn(move || {
962 let mut stdin = io::stdin();
963 let mut buf = [0u8; 1];
964
965 loop {
966 if should_terminate_input.load(Ordering::SeqCst) {
967 break;
968 }
969
970 match stdin.read(&mut buf) {
971 Ok(0) => break, Ok(1) => {
973 let byte = buf[0];
974 let event = match byte {
975 3 => InputEvent::CtrlC, 28 => InputEvent::CtrlBackslash, _ => InputEvent::Data(vec![byte]),
978 };
979 if input_tx.send(event).is_err() {
980 break;
981 }
982 }
983 Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
985 Err(_) => break,
986 }
987 }
988 });
989 Some(input_rx)
990 };
991
992 if let Some(ref input) = pending_stdin {
995 tokio::time::sleep(Duration::from_millis(100)).await;
996 writer.write_all(input.as_bytes())?;
997 writer.write_all(b"\n")?;
998 writer.flush()?;
999 last_activity = Instant::now();
1000 }
1001
1002 loop {
1005 if let Some(status) = child
1007 .try_wait()
1008 .map_err(|e| io::Error::other(e.to_string()))?
1009 {
1010 let exit_code = status.exit_code() as i32;
1011 debug!(exit_status = ?status, exit_code, "Child process exited");
1012
1013 while let Ok(event) = output_rx.try_recv() {
1015 if let OutputEvent::Data(data) = event {
1016 if !tui_connected {
1017 io::stdout().write_all(&data)?;
1018 io::stdout().flush()?;
1019 }
1020 output.extend_from_slice(&data);
1021 }
1022 }
1023
1024 let drain_deadline = Instant::now() + Duration::from_millis(200);
1027 loop {
1028 let remaining = drain_deadline.saturating_duration_since(Instant::now());
1029 if remaining.is_zero() {
1030 break;
1031 }
1032 match tokio::time::timeout(remaining, output_rx.recv()).await {
1033 Ok(Some(OutputEvent::Data(data))) => {
1034 if !tui_connected {
1035 io::stdout().write_all(&data)?;
1036 io::stdout().flush()?;
1037 }
1038 output.extend_from_slice(&data);
1039 }
1040 Ok(Some(OutputEvent::Eof) | None) => break,
1041 Ok(Some(OutputEvent::Error(e))) => {
1042 debug!(error = %e, "PTY read error after exit");
1043 break;
1044 }
1045 Err(_) => break, }
1047 }
1048
1049 should_terminate.store(true, Ordering::SeqCst);
1050 let _ = self.terminated_tx.send(true);
1052
1053 let final_termination = resolve_termination_type(exit_code, termination);
1054 return Ok(build_result(
1056 &output,
1057 status.success(),
1058 Some(exit_code),
1059 final_termination,
1060 String::new(),
1061 ));
1062 }
1063
1064 let timeout_future = async {
1066 match timeout_duration {
1067 Some(d) => {
1068 let elapsed = last_activity.elapsed();
1069 if elapsed >= d {
1070 tokio::time::sleep(Duration::ZERO).await
1071 } else {
1072 tokio::time::sleep(d.saturating_sub(elapsed)).await
1073 }
1074 }
1075 None => std::future::pending::<()>().await,
1076 }
1077 };
1078
1079 tokio::select! {
1080 output_event = output_rx.recv() => {
1082 match output_event {
1083 Some(OutputEvent::Data(data)) => {
1084 if !tui_connected {
1086 io::stdout().write_all(&data)?;
1087 io::stdout().flush()?;
1088 }
1089 output.extend_from_slice(&data);
1090
1091 last_activity = Instant::now();
1092 }
1093 Some(OutputEvent::Eof) => {
1094 debug!("PTY EOF received");
1095 break;
1096 }
1097 Some(OutputEvent::Error(e)) => {
1098 debug!(error = %e, "PTY read error");
1099 break;
1100 }
1101 None => {
1102 break;
1104 }
1105 }
1106 }
1107
1108 input_event = async {
1110 match input_rx.as_mut() {
1111 Some(rx) => rx.recv().await,
1112 None => std::future::pending().await, }
1114 } => {
1115 match input_event {
1116 Some(InputEvent::CtrlC) => {
1117 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1118 CtrlCAction::ForwardAndStartWindow => {
1119 let _ = writer.write_all(&[3]);
1121 let _ = writer.flush();
1122 last_activity = Instant::now();
1123 }
1124 CtrlCAction::Terminate => {
1125 info!("Double Ctrl+C detected, terminating");
1126 termination = TerminationType::UserInterrupt;
1127 should_terminate.store(true, Ordering::SeqCst);
1128 self.terminate_child(&mut child, true).await?;
1129 break;
1130 }
1131 }
1132 }
1133 Some(InputEvent::CtrlBackslash) => {
1134 info!("Ctrl+\\ detected, force killing");
1135 termination = TerminationType::ForceKill;
1136 should_terminate.store(true, Ordering::SeqCst);
1137 self.terminate_child(&mut child, false).await?;
1138 break;
1139 }
1140 Some(InputEvent::Data(data)) => {
1141 let _ = writer.write_all(&data);
1143 let _ = writer.flush();
1144 last_activity = Instant::now();
1145 }
1146 None => {
1147 debug!("Input channel closed");
1149 }
1150 }
1151 }
1152
1153 tui_input = self.input_rx.recv() => {
1155 if let Some(data) = tui_input {
1156 match InputEvent::from_bytes(data) {
1157 InputEvent::CtrlC => {
1158 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1159 CtrlCAction::ForwardAndStartWindow => {
1160 let _ = writer.write_all(&[3]);
1161 let _ = writer.flush();
1162 last_activity = Instant::now();
1163 }
1164 CtrlCAction::Terminate => {
1165 info!("Double Ctrl+C detected, terminating");
1166 termination = TerminationType::UserInterrupt;
1167 should_terminate.store(true, Ordering::SeqCst);
1168 self.terminate_child(&mut child, true).await?;
1169 break;
1170 }
1171 }
1172 }
1173 InputEvent::CtrlBackslash => {
1174 info!("Ctrl+\\ detected, force killing");
1175 termination = TerminationType::ForceKill;
1176 should_terminate.store(true, Ordering::SeqCst);
1177 self.terminate_child(&mut child, false).await?;
1178 break;
1179 }
1180 InputEvent::Data(bytes) => {
1181 let _ = writer.write_all(&bytes);
1182 let _ = writer.flush();
1183 last_activity = Instant::now();
1184 }
1185 }
1186 }
1187 }
1188
1189 control_cmd = self.control_rx.recv() => {
1191 if let Some(cmd) = control_cmd {
1192 use crate::pty_handle::ControlCommand;
1193 match cmd {
1194 ControlCommand::Kill => {
1195 info!("Control command: Kill");
1196 termination = TerminationType::UserInterrupt;
1197 should_terminate.store(true, Ordering::SeqCst);
1198 self.terminate_child(&mut child, true).await?;
1199 break;
1200 }
1201 ControlCommand::Resize(cols, rows) => {
1202 debug!(cols, rows, "Control command: Resize");
1203 if let Err(e) = master.resize(PtySize {
1205 rows,
1206 cols,
1207 pixel_width: 0,
1208 pixel_height: 0,
1209 }) {
1210 warn!("Failed to resize PTY: {}", e);
1211 }
1212 }
1213 ControlCommand::Skip | ControlCommand::Abort => {
1214 debug!("Control command: {:?} (ignored at PTY level)", cmd);
1216 }
1217 }
1218 }
1219 }
1220
1221 _ = timeout_future => {
1223 warn!(
1224 timeout_secs = self.config.idle_timeout_secs,
1225 "Idle timeout triggered"
1226 );
1227 termination = TerminationType::IdleTimeout;
1228 should_terminate.store(true, Ordering::SeqCst);
1229 self.terminate_child(&mut child, true).await?;
1230 break;
1231 }
1232
1233 _ = interrupt_rx.changed() => {
1235 if *interrupt_rx.borrow() {
1236 debug!("Interrupt received in interactive mode, terminating");
1237 termination = TerminationType::UserInterrupt;
1238 should_terminate.store(true, Ordering::SeqCst);
1239 self.terminate_child(&mut child, true).await?;
1240 break;
1241 }
1242 }
1243 }
1244 }
1245
1246 should_terminate.store(true, Ordering::SeqCst);
1248
1249 let _ = self.terminated_tx.send(true);
1251
1252 let status = self
1254 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1255 .await?;
1256
1257 let (success, exit_code, final_termination) = match status {
1258 Some(s) => {
1259 let code = s.exit_code() as i32;
1260 (
1261 s.success(),
1262 Some(code),
1263 resolve_termination_type(code, termination),
1264 )
1265 }
1266 None => {
1267 warn!("Timed out waiting for child to exit after termination");
1268 (false, None, termination)
1269 }
1270 };
1271
1272 Ok(build_result(
1274 &output,
1275 success,
1276 exit_code,
1277 final_termination,
1278 String::new(),
1279 ))
1280 }
1281
    /// Non-Unix variant: kills the child via `Child::kill`.
    ///
    /// The `_graceful` flag is ignored on this platform; the signature matches
    /// the Unix variant so callers are platform-independent.
    #[allow(clippy::unused_self)] #[allow(clippy::unused_async)] #[cfg(not(unix))]
    async fn terminate_child(
        &self,
        child: &mut Box<dyn portable_pty::Child + Send>,
        _graceful: bool,
    ) -> io::Result<()> {
        child.kill()
    }
1300
1301 #[cfg(unix)]
1302 async fn terminate_child(
1303 &self,
1304 child: &mut Box<dyn portable_pty::Child + Send>,
1305 graceful: bool,
1306 ) -> io::Result<()> {
1307 let pid = match child.process_id() {
1308 Some(id) => Pid::from_raw(id as i32),
1309 None => return Ok(()), };
1311
1312 if graceful {
1313 debug!(pid = %pid, "Sending SIGTERM");
1314 let _ = kill(pid, Signal::SIGTERM);
1315
1316 let grace_period = Duration::from_secs(2);
1318 let start = Instant::now();
1319
1320 while start.elapsed() < grace_period {
1321 if child
1322 .try_wait()
1323 .map_err(|e| io::Error::other(e.to_string()))?
1324 .is_some()
1325 {
1326 return Ok(());
1327 }
1328 tokio::time::sleep(Duration::from_millis(50)).await;
1330 }
1331
1332 debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1334 }
1335
1336 debug!(pid = %pid, "Sending SIGKILL");
1337 let _ = kill(pid, Signal::SIGKILL);
1338 Ok(())
1339 }
1340
1341 async fn wait_for_exit(
1346 &self,
1347 child: &mut Box<dyn portable_pty::Child + Send>,
1348 max_wait: Option<Duration>,
1349 interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1350 ) -> io::Result<Option<portable_pty::ExitStatus>> {
1351 let start = Instant::now();
1352
1353 loop {
1354 if let Some(status) = child
1355 .try_wait()
1356 .map_err(|e| io::Error::other(e.to_string()))?
1357 {
1358 return Ok(Some(status));
1359 }
1360
1361 if let Some(max) = max_wait
1362 && start.elapsed() >= max
1363 {
1364 return Ok(None);
1365 }
1366
1367 tokio::select! {
1368 _ = interrupt_rx.changed() => {
1369 if *interrupt_rx.borrow() {
1370 debug!("Interrupt received while waiting for child exit");
1371 return Ok(None);
1372 }
1373 }
1374 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1375 }
1376 }
1377 }
1378}
1379
/// A unit of user input destined for the PTY.
#[derive(Debug)]
enum InputEvent {
    /// The Ctrl+C byte (0x03).
    CtrlC,
    /// The Ctrl+\ byte (0x1C).
    CtrlBackslash,
    /// Any other input, forwarded verbatim.
    Data(Vec<u8>),
}

impl InputEvent {
    /// Classifies a chunk of input bytes.
    ///
    /// Only a chunk consisting of exactly one control byte is treated as a
    /// control key; everything else (including longer chunks that contain
    /// those bytes, and the empty chunk) is plain data.
    fn from_bytes(data: Vec<u8>) -> Self {
        match data.as_slice() {
            [3] => InputEvent::CtrlC,
            [28] => InputEvent::CtrlBackslash,
            _ => InputEvent::Data(data),
        }
    }
}
1404
/// Message sent from a PTY reader thread to the async run loop.
#[derive(Debug)]
enum OutputEvent {
    /// A chunk of bytes read from the PTY.
    Data(Vec<u8>),
    /// The reader saw end-of-file (a zero-length read).
    Eof,
    /// The reader hit an I/O error other than `WouldBlock`/`Interrupted`; carries its message.
    Error(String),
}
1415
1416fn strip_ansi(bytes: &[u8]) -> String {
1422 let stripped = strip_ansi_escapes::strip(bytes);
1423 String::from_utf8_lossy(&stripped).into_owned()
1424}
1425
1426fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1430 if exit_code == 130 {
1431 info!("Child process killed by SIGINT");
1432 TerminationType::UserInterrupt
1433 } else {
1434 default
1435 }
1436}
1437
1438fn dispatch_stream_event<H: StreamHandler>(
1441 event: ClaudeStreamEvent,
1442 handler: &mut H,
1443 extracted_text: &mut String,
1444) {
1445 match event {
1446 ClaudeStreamEvent::System { .. } => {
1447 }
1449 ClaudeStreamEvent::Assistant { message, .. } => {
1450 for block in message.content {
1451 match block {
1452 ContentBlock::Text { text } => {
1453 handler.on_text(&text);
1454 extracted_text.push_str(&text);
1456 extracted_text.push('\n');
1457 }
1458 ContentBlock::ToolUse { name, id, input } => {
1459 handler.on_tool_call(&name, &id, &input)
1460 }
1461 }
1462 }
1463 }
1464 ClaudeStreamEvent::User { message } => {
1465 for block in message.content {
1466 match block {
1467 UserContentBlock::ToolResult {
1468 tool_use_id,
1469 content,
1470 } => {
1471 handler.on_tool_result(&tool_use_id, &content);
1472 }
1473 }
1474 }
1475 }
1476 ClaudeStreamEvent::Result {
1477 duration_ms,
1478 total_cost_usd,
1479 num_turns,
1480 is_error,
1481 } => {
1482 if is_error {
1483 handler.on_error("Session ended with error");
1484 }
1485 handler.on_complete(&SessionResult {
1486 duration_ms,
1487 total_cost_usd,
1488 num_turns,
1489 is_error,
1490 });
1491 }
1492 }
1493}
1494
1495fn build_result(
1504 output: &[u8],
1505 success: bool,
1506 exit_code: Option<i32>,
1507 termination: TerminationType,
1508 extracted_text: String,
1509) -> PtyExecutionResult {
1510 PtyExecutionResult {
1511 output: String::from_utf8_lossy(output).to_string(),
1512 stripped_output: strip_ansi(output),
1513 extracted_text,
1514 success,
1515 exit_code,
1516 termination,
1517 }
1518}
1519
1520#[cfg(test)]
1521mod tests {
1522 use super::*;
1523 use crate::claude_stream::{AssistantMessage, UserMessage};
1524 #[cfg(unix)]
1525 use crate::cli_backend::PromptMode;
1526 use crate::stream_handler::{SessionResult, StreamHandler};
1527 #[cfg(unix)]
1528 use tempfile::TempDir;
1529
/// Two Ctrl+C presses 500 ms apart: the first is forwarded and opens the
/// window, the second terminates.
#[test]
fn test_double_ctrl_c_within_window() {
    let mut ctrl_c = CtrlCState::new();
    let first_press = Instant::now();
    let second_press = first_press + Duration::from_millis(500);

    assert_eq!(
        ctrl_c.handle_ctrl_c(first_press),
        CtrlCAction::ForwardAndStartWindow
    );
    assert_eq!(ctrl_c.handle_ctrl_c(second_press), CtrlCAction::Terminate);
}
1544
/// A lone byte `3` is classified as Ctrl+C.
#[test]
fn test_input_event_from_bytes_ctrl_c() {
    let ctrl_c_byte = vec![3];
    assert!(matches!(
        InputEvent::from_bytes(ctrl_c_byte),
        InputEvent::CtrlC
    ));
}
1550
/// A lone byte `28` is classified as Ctrl+\.
#[test]
fn test_input_event_from_bytes_ctrl_backslash() {
    let ctrl_backslash_byte = vec![28];
    assert!(matches!(
        InputEvent::from_bytes(ctrl_backslash_byte),
        InputEvent::CtrlBackslash
    ));
}
1556
/// Ordinary input — a printable byte, or a multi-byte chunk — is passed
/// through as plain data.
#[test]
fn test_input_event_from_bytes_data() {
    for payload in [vec![b'a'], vec![1, 2, 3]] {
        assert!(matches!(
            InputEvent::from_bytes(payload),
            InputEvent::Data(_)
        ));
    }
}
1565
/// A second Ctrl+C arriving 2 s after the first is treated as a fresh first
/// press rather than as a termination request.
#[test]
fn test_ctrl_c_window_expires() {
    let mut ctrl_c = CtrlCState::new();
    let first_press = Instant::now();
    let after_window = first_press + Duration::from_secs(2);

    // Only the state change matters for the first press.
    ctrl_c.handle_ctrl_c(first_press);

    assert_eq!(
        ctrl_c.handle_ctrl_c(after_window),
        CtrlCAction::ForwardAndStartWindow
    );
}
1581
/// Colour codes are removed while the visible text survives.
#[test]
fn test_strip_ansi_basic() {
    let plain = strip_ansi(b"\x1b[1;36m Thinking...\x1b[0m\r\n");

    assert!(!plain.contains("\x1b["));
    assert!(plain.contains("Thinking..."));
}
1589
/// The `LOOP_COMPLETE` marker stays detectable after stripping colour and
/// line-erase sequences from multi-line output.
#[test]
fn test_completion_promise_extraction() {
    const COLOURED_OUTPUT: &[u8] = b"\x1b[1;36m Thinking...\x1b[0m\r\n\
        \x1b[2K\x1b[1;32m Done!\x1b[0m\r\n\
        \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";

    let plain = strip_ansi(COLOURED_OUTPUT);

    assert!(!plain.contains("\x1b["));
    assert!(plain.contains("LOOP_COMPLETE"));
}
1603
/// `<event>` open and close tags wrapped in colour codes come out intact.
#[test]
fn test_event_tag_extraction() {
    const COLOURED_EVENT: &[u8] = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
        Task completed successfully\r\n\
        \x1b[90m</event>\x1b[0m\r\n";

    let plain = strip_ansi(COLOURED_EVENT);

    for tag in ["<event topic=\"build.done\">", "</event>"] {
        assert!(plain.contains(tag));
    }
}
1616
/// An event tag at the very start of the output must still be present after
/// 500 further lines have been appended, i.e. stripping does not truncate.
#[test]
fn test_large_output_preserves_early_events() {
    let mut raw: Vec<u8> =
        b"<event topic=\"build.task\">Implement feature X</event>\r\n".to_vec();

    for step in 0..500 {
        let line = format!("Line {step}: Processing step {step}...\r\n");
        raw.extend_from_slice(line.as_bytes());
    }

    let plain = strip_ansi(&raw);

    assert!(
        plain.contains("<event topic=\"build.task\">"),
        "Event tag was lost - strip_ansi is not preserving all content"
    );
    assert!(plain.contains("Implement feature X"));
    // The last generated line proves the tail is kept as well.
    assert!(plain.contains("Line 499"));
}
1640
/// `PtyConfig::default()` is interactive, uses a 30 s idle timeout and an
/// 80x24 terminal.
#[test]
fn test_pty_config_defaults() {
    let PtyConfig {
        interactive,
        idle_timeout_secs,
        cols,
        rows,
        ..
    } = PtyConfig::default();

    assert!(interactive);
    assert_eq!(idle_timeout_secs, 30);
    assert_eq!(cols, 80);
    assert_eq!(rows, 24);
}
1649
/// `PtyConfig::from_env()` takes its size from `COLUMNS` / `LINES` when they
/// parse as `u16`, and otherwise falls back to 80x24.
#[test]
fn test_pty_config_from_env_matches_env_or_defaults() {
    /// Reads `var` as a `u16`, using `fallback` when it is unset or unparsable.
    fn dimension_from_env(var: &str, fallback: u16) -> u16 {
        std::env::var(var)
            .ok()
            .and_then(|raw| raw.parse::<u16>().ok())
            .unwrap_or(fallback)
    }

    let expected_cols = dimension_from_env("COLUMNS", 80);
    let expected_rows = dimension_from_env("LINES", 24);

    let config = PtyConfig::from_env();

    assert_eq!(config.cols, expected_cols);
    assert_eq!(config.rows, expected_rows);
}
1665
/// Checks the arithmetic behind idle-timeout tracking: time remaining is the
/// timeout minus the time since the last activity, and a fresh activity
/// timestamp restores (almost) the full timeout.
#[test]
fn test_idle_timeout_reset_logic() {
    let idle_timeout = Duration::from_secs(30);

    // 25 s without activity leaves 5 s on the clock.
    let idle_for = Duration::from_secs(25);
    assert_eq!(idle_timeout.saturating_sub(idle_for).as_secs(), 5);

    // Taking a fresh timestamp stands in for "activity just happened".
    let activity_mark = Instant::now();
    let since_activity = activity_mark.elapsed();

    assert!(since_activity < Duration::from_millis(100));
    assert!(idle_timeout.saturating_sub(since_activity) > Duration::from_secs(29));
}
1696
/// `PtyExecutionResult` exposes an `extracted_text` field that can hold event markup.
#[test]
fn test_extracted_text_field_exists() {
    let event_markup = "<event topic=\"build.done\">Test</event>";

    let result = PtyExecutionResult {
        termination: TerminationType::Natural,
        exit_code: Some(0),
        success: true,
        extracted_text: event_markup.to_owned(),
        stripped_output: String::new(),
        output: String::new(),
    };

    let has_open_tag = result
        .extracted_text
        .contains("<event topic=\"build.done\">");
    assert!(has_open_tag);
}
1716
/// `build_result` carries the extracted text through verbatim and derives
/// `stripped_output` from the raw bytes.
#[test]
fn test_build_result_includes_extracted_text() {
    let extracted = String::from("extracted text with <event topic=\"test\">payload</event>");

    let result = build_result(
        b"raw output",
        true,
        Some(0),
        TerminationType::Natural,
        extracted.clone(),
    );

    assert!(result.stripped_output.contains("raw output"));
    assert_eq!(result.extracted_text, extracted);
}
1733
/// Exit code 130 overrides the default with `UserInterrupt`; any other code
/// leaves the default untouched.
#[test]
fn test_resolve_termination_type_handles_sigint_exit_code() {
    assert_eq!(
        resolve_termination_type(130, TerminationType::Natural),
        TerminationType::UserInterrupt
    );
    assert_eq!(
        resolve_termination_type(0, TerminationType::ForceKill),
        TerminationType::ForceKill
    );
}
1742
1743 #[derive(Default)]
1744 struct CapturingHandler {
1745 texts: Vec<String>,
1746 tool_calls: Vec<(String, String, serde_json::Value)>,
1747 tool_results: Vec<(String, String)>,
1748 errors: Vec<String>,
1749 completions: Vec<SessionResult>,
1750 }
1751
1752 impl StreamHandler for CapturingHandler {
1753 fn on_text(&mut self, text: &str) {
1754 self.texts.push(text.to_string());
1755 }
1756
1757 fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
1758 self.tool_calls
1759 .push((name.to_string(), id.to_string(), input.clone()));
1760 }
1761
1762 fn on_tool_result(&mut self, id: &str, output: &str) {
1763 self.tool_results.push((id.to_string(), output.to_string()));
1764 }
1765
1766 fn on_error(&mut self, error: &str) {
1767 self.errors.push(error.to_string());
1768 }
1769
1770 fn on_complete(&mut self, result: &SessionResult) {
1771 self.completions.push(result.clone());
1772 }
1773 }
1774
/// An assistant event with one text block and one tool-use block must reach
/// `on_text` and `on_tool_call` with the right arguments, and the text must be
/// appended to the extracted transcript followed by a newline.
///
/// The assertions pin exact values (not just counts or substrings) so that a
/// swapped `name`/`id`, a dropped `input`, or stray transcript content is
/// caught.
#[test]
fn test_dispatch_stream_event_routes_text_and_tool_calls() {
    let mut handler = CapturingHandler::default();
    let mut extracted_text = String::new();
    let tool_input = serde_json::json!({"path": "README.md"});

    let event = ClaudeStreamEvent::Assistant {
        message: AssistantMessage {
            content: vec![
                ContentBlock::Text {
                    text: "Hello".to_string(),
                },
                ContentBlock::ToolUse {
                    id: "tool-1".to_string(),
                    name: "Read".to_string(),
                    input: tool_input.clone(),
                },
            ],
        },
        usage: None,
    };

    dispatch_stream_event(event, &mut handler, &mut extracted_text);

    assert_eq!(handler.texts, vec!["Hello".to_string()]);
    // The handler records calls as (name, id, input).
    assert_eq!(
        handler.tool_calls,
        vec![("Read".to_string(), "tool-1".to_string(), tool_input)]
    );
    // Only text blocks feed the transcript; the tool call adds nothing.
    assert_eq!(extracted_text, "Hello\n");
}
1803
/// A user event's tool result reaches `on_tool_result`, and an erroring
/// result event triggers both `on_error` and `on_complete`.
#[test]
fn test_dispatch_stream_event_routes_tool_results_and_completion() {
    let mut handler = CapturingHandler::default();
    let mut extracted_text = String::new();

    let tool_result_event = ClaudeStreamEvent::User {
        message: UserMessage {
            content: vec![UserContentBlock::ToolResult {
                tool_use_id: String::from("tool-1"),
                content: String::from("done"),
            }],
        },
    };
    dispatch_stream_event(tool_result_event, &mut handler, &mut extracted_text);

    // Exactly one result, recorded as (id, output).
    assert_eq!(
        handler.tool_results,
        vec![("tool-1".to_string(), "done".to_string())]
    );

    let failed_session_event = ClaudeStreamEvent::Result {
        duration_ms: 12,
        total_cost_usd: 0.01,
        num_turns: 2,
        is_error: true,
    };
    dispatch_stream_event(failed_session_event, &mut handler, &mut extracted_text);

    assert_eq!(handler.errors.len(), 1);
    assert_eq!(handler.completions.len(), 1);
    assert!(handler.completions[0].is_error);
}
1835
/// A system event must not reach any handler callback and must leave the
/// extracted transcript empty.
#[test]
fn test_dispatch_stream_event_system_noop() {
    let mut handler = CapturingHandler::default();
    let mut extracted_text = String::new();

    let system_event = ClaudeStreamEvent::System {
        session_id: String::from("session-1"),
        model: String::from("claude-test"),
        tools: Vec::new(),
    };
    dispatch_stream_event(system_event, &mut handler, &mut extracted_text);

    assert!(extracted_text.is_empty());
    assert!(handler.completions.is_empty());
    assert!(handler.errors.is_empty());
    assert!(handler.tool_results.is_empty());
    assert!(handler.tool_calls.is_empty());
    assert!(handler.texts.is_empty());
}
1856
/// Documents the intended rule: the stdin reader is bypassed exactly when
/// `tui_mode` is on.
///
/// NOTE(review): this only mirrors the rule with local booleans; it does not
/// exercise the executor itself.
#[test]
fn test_tui_mode_stdin_reader_bypass() {
    let tui_enabled = true;
    let bypassed_with_tui = tui_enabled;
    assert!(
        bypassed_with_tui,
        "When tui_mode is true, stdin reader must be skipped"
    );

    let tui_disabled = false;
    let bypassed_without_tui = tui_disabled;
    assert!(
        !bypassed_without_tui,
        "When tui_mode is false, stdin reader must be spawned"
    );
}
1896
/// A freshly constructed executor has TUI mode switched off.
#[test]
fn test_tui_mode_default_is_false() {
    let executor = PtyExecutor::new(CliBackend::claude(), PtyConfig::default());

    assert!(!executor.tui_mode, "tui_mode should default to false");
}
1907
/// `set_tui_mode` toggles the executor's `tui_mode` flag in both directions.
#[test]
fn test_set_tui_mode() {
    let mut executor = PtyExecutor::new(CliBackend::claude(), PtyConfig::default());
    assert!(!executor.tui_mode, "tui_mode should start as false");

    executor.set_tui_mode(true);
    let enabled = executor.tui_mode;
    assert!(enabled, "tui_mode should be true after set_tui_mode(true)");

    executor.set_tui_mode(false);
    let still_enabled = executor.tui_mode;
    assert!(
        !still_enabled,
        "tui_mode should be false after set_tui_mode(false)"
    );
}
1932
/// Every field of the result returned by `build_result` reflects its inputs:
/// raw output decoded lossily, ANSI codes stripped from `stripped_output`,
/// and the remaining arguments passed through.
#[test]
fn test_build_result_populates_fields() {
    let raw = b"\x1b[31mHello\x1b[0m\n";
    let extracted = String::from("extracted text");

    let result = build_result(
        raw,
        true,
        Some(0),
        TerminationType::Natural,
        extracted.clone(),
    );

    // Pass-through fields.
    assert!(result.success);
    assert_eq!(result.exit_code, Some(0));
    assert_eq!(result.termination, TerminationType::Natural);
    assert_eq!(result.extracted_text, extracted);

    // Derived fields.
    assert_eq!(result.output, String::from_utf8_lossy(raw));
    assert!(result.stripped_output.contains("Hello"));
    assert!(!result.stripped_output.contains("\x1b["));
}
1954
1955 #[cfg(unix)]
1956 #[tokio::test]
1957 async fn test_run_observe_executes_arg_prompt() {
1958 let temp_dir = TempDir::new().expect("temp dir");
1959 let backend = CliBackend {
1960 command: "sh".to_string(),
1961 args: vec!["-c".to_string()],
1962 prompt_mode: PromptMode::Arg,
1963 prompt_flag: None,
1964 output_format: OutputFormat::Text,
1965 };
1966 let config = PtyConfig {
1967 interactive: false,
1968 idle_timeout_secs: 0,
1969 cols: 80,
1970 rows: 24,
1971 workspace_root: temp_dir.path().to_path_buf(),
1972 };
1973 let executor = PtyExecutor::new(backend, config);
1974 let (_tx, rx) = tokio::sync::watch::channel(false);
1975
1976 let result = executor
1977 .run_observe("echo hello-pty", rx)
1978 .await
1979 .expect("run_observe");
1980
1981 assert!(result.success);
1982 assert!(result.output.contains("hello-pty"));
1983 assert!(result.stripped_output.contains("hello-pty"));
1984 assert_eq!(result.exit_code, Some(0));
1985 assert_eq!(result.termination, TerminationType::Natural);
1986 }
1987
1988 #[cfg(unix)]
1989 #[tokio::test]
1990 async fn test_run_observe_writes_stdin_prompt() {
1991 let temp_dir = TempDir::new().expect("temp dir");
1992 let backend = CliBackend {
1993 command: "sh".to_string(),
1994 args: vec!["-c".to_string(), "read line; echo \"$line\"".to_string()],
1995 prompt_mode: PromptMode::Stdin,
1996 prompt_flag: None,
1997 output_format: OutputFormat::Text,
1998 };
1999 let config = PtyConfig {
2000 interactive: false,
2001 idle_timeout_secs: 0,
2002 cols: 80,
2003 rows: 24,
2004 workspace_root: temp_dir.path().to_path_buf(),
2005 };
2006 let executor = PtyExecutor::new(backend, config);
2007 let (_tx, rx) = tokio::sync::watch::channel(false);
2008
2009 let result = executor
2010 .run_observe("stdin-line", rx)
2011 .await
2012 .expect("run_observe");
2013
2014 assert!(result.success);
2015 assert!(result.output.contains("stdin-line"));
2016 assert!(result.stripped_output.contains("stdin-line"));
2017 assert_eq!(result.termination, TerminationType::Natural);
2018 }
2019
2020 #[cfg(unix)]
2021 #[tokio::test]
2022 async fn test_run_observe_streaming_text_routes_output() {
2023 let temp_dir = TempDir::new().expect("temp dir");
2024 let backend = CliBackend {
2025 command: "sh".to_string(),
2026 args: vec!["-c".to_string()],
2027 prompt_mode: PromptMode::Arg,
2028 prompt_flag: None,
2029 output_format: OutputFormat::Text,
2030 };
2031 let config = PtyConfig {
2032 interactive: false,
2033 idle_timeout_secs: 0,
2034 cols: 80,
2035 rows: 24,
2036 workspace_root: temp_dir.path().to_path_buf(),
2037 };
2038 let executor = PtyExecutor::new(backend, config);
2039 let (_tx, rx) = tokio::sync::watch::channel(false);
2040 let mut handler = CapturingHandler::default();
2041
2042 let result = executor
2043 .run_observe_streaming("printf 'alpha\\nbeta\\n'", rx, &mut handler)
2044 .await
2045 .expect("run_observe_streaming");
2046
2047 assert!(result.success);
2048 let captured = handler.texts.join("");
2049 assert!(captured.contains("alpha"), "captured: {captured}");
2050 assert!(captured.contains("beta"), "captured: {captured}");
2051 assert!(handler.completions.is_empty());
2052 assert!(result.extracted_text.is_empty());
2053 }
2054
2055 #[cfg(unix)]
2056 #[tokio::test]
2057 async fn test_run_observe_streaming_parses_stream_json() {
2058 let temp_dir = TempDir::new().expect("temp dir");
2059 let backend = CliBackend {
2060 command: "sh".to_string(),
2061 args: vec!["-c".to_string()],
2062 prompt_mode: PromptMode::Arg,
2063 prompt_flag: None,
2064 output_format: OutputFormat::StreamJson,
2065 };
2066 let config = PtyConfig {
2067 interactive: false,
2068 idle_timeout_secs: 0,
2069 cols: 80,
2070 rows: 24,
2071 workspace_root: temp_dir.path().to_path_buf(),
2072 };
2073 let executor = PtyExecutor::new(backend, config);
2074 let (_tx, rx) = tokio::sync::watch::channel(false);
2075 let mut handler = CapturingHandler::default();
2076
2077 let script = r#"printf '%s\n' '{"type":"assistant","message":{"content":[{"type":"text","text":"Hello stream"}]}}' '{"type":"result","duration_ms":1,"total_cost_usd":0.0,"num_turns":1,"is_error":false}'"#;
2078 let result = executor
2079 .run_observe_streaming(script, rx, &mut handler)
2080 .await
2081 .expect("run_observe_streaming");
2082
2083 assert!(result.success);
2084 assert!(
2085 handler
2086 .texts
2087 .iter()
2088 .any(|text| text.contains("Hello stream"))
2089 );
2090 assert_eq!(handler.completions.len(), 1);
2091 assert!(result.extracted_text.contains("Hello stream"));
2092 assert_eq!(result.termination, TerminationType::Natural);
2093 }
2094
2095 #[cfg(unix)]
2096 #[tokio::test]
2097 async fn test_run_interactive_in_tui_mode() {
2098 let temp_dir = TempDir::new().expect("temp dir");
2099 let backend = CliBackend {
2100 command: "sh".to_string(),
2101 args: vec!["-c".to_string()],
2102 prompt_mode: PromptMode::Arg,
2103 prompt_flag: None,
2104 output_format: OutputFormat::Text,
2105 };
2106 let config = PtyConfig {
2107 interactive: true,
2108 idle_timeout_secs: 0,
2109 cols: 80,
2110 rows: 24,
2111 workspace_root: temp_dir.path().to_path_buf(),
2112 };
2113 let mut executor = PtyExecutor::new(backend, config);
2114 executor.set_tui_mode(true);
2115 let (_tx, rx) = tokio::sync::watch::channel(false);
2116
2117 let result = executor
2118 .run_interactive("echo hello-tui", rx)
2119 .await
2120 .expect("run_interactive");
2121
2122 assert!(result.success);
2123 assert!(result.output.contains("hello-tui"));
2124 assert!(result.stripped_output.contains("hello-tui"));
2125 assert_eq!(result.exit_code, Some(0));
2126 assert_eq!(result.termination, TerminationType::Natural);
2127 }
2128}