1#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::copilot_stream::{
24 CopilotStreamParser, CopilotStreamState, dispatch_copilot_stream_event,
25};
26use crate::pi_stream::{PiSessionState, PiStreamParser, dispatch_pi_stream_event};
27use crate::stream_handler::{SessionResult, StreamHandler};
28#[cfg(unix)]
29use nix::sys::signal::{Signal, kill};
30#[cfg(unix)]
31use nix::unistd::Pid;
32use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
33use std::env;
34use std::io::{self, Read, Write};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::{Duration, Instant};
38use tokio::sync::{mpsc, watch};
39use tracing::{debug, info, warn};
40
/// Outcome of one PTY-backed backend run, as returned by the `run_*` methods.
///
/// NOTE(review): instances are assembled by `build_result`, which is not
/// visible in this part of the file. The per-field notes below are inferred
/// from the arguments passed at its call sites and should be confirmed there.
#[derive(Debug)]
pub struct PtyExecutionResult {
    /// Captured output of the run (presumably the bytes read from the PTY,
    /// converted to text — TODO confirm the conversion in `build_result`).
    pub output: String,
    /// Presumably `output` with terminal escape sequences removed — TODO confirm.
    pub stripped_output: String,
    /// Text accumulated from structured stream events. The streaming run
    /// passes its extracted text; the other runs pass an empty string.
    pub extracted_text: String,
    /// Whether the run succeeded. Call sites pass the child's exit-status
    /// success flag, or `false` when the child did not exit in time.
    pub success: bool,
    /// Exit code of the child, or `None` when no exit status was obtained.
    pub exit_code: Option<i32>,
    /// Why the session ended.
    pub termination: TerminationType,
    /// Cost of the session in USD (presumably taken from the optional
    /// `SessionResult` passed to `build_result` — TODO confirm).
    pub total_cost_usd: f64,
    /// Input token count (presumably from the session result — TODO confirm).
    pub input_tokens: u64,
    /// Output token count (presumably from the session result — TODO confirm).
    pub output_tokens: u64,
    /// Cache-read token count (presumably from the session result — TODO confirm).
    pub cache_read_tokens: u64,
    /// Cache-write token count (presumably from the session result — TODO confirm).
    pub cache_write_tokens: u64,
}
71
/// Why a PTY session ended.
///
/// The run loops start from `Natural` and overwrite it when they stop the
/// child themselves. The value is then passed through
/// `resolve_termination_type` together with the exit code (that function is
/// not visible here, so the final mapping should be checked there).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationType {
    /// The executor did not request termination; initial state of every run.
    Natural,
    /// The executor terminated the child because the idle timeout elapsed.
    IdleTimeout,
    /// The executor terminated the child after the interrupt channel
    /// reported `true`.
    UserInterrupt,
    /// NOTE(review): not assigned anywhere in the visible code; presumably
    /// reported when the child had to be force-killed — confirm.
    ForceKill,
}
84
/// Settings that control how the PTY is created and supervised.
#[derive(Debug, Clone)]
pub struct PtyConfig {
    /// Forwarded to the backend when building the command. The observe-mode
    /// runs only arm the idle timeout when this is `true`.
    pub interactive: bool,
    /// Seconds without output before the child is terminated; `0` disables
    /// the idle timeout.
    pub idle_timeout_secs: u32,
    /// Terminal width, in columns, used when opening the PTY.
    pub cols: u16,
    /// Terminal height, in rows, used when opening the PTY.
    pub rows: u16,
    /// Working directory for the spawned command.
    pub workspace_root: std::path::PathBuf,
}

impl Default for PtyConfig {
    /// Interactive, 30 s idle timeout, 80x24 terminal, rooted at the current
    /// directory (or `"."` when the current directory cannot be determined).
    fn default() -> Self {
        Self {
            interactive: true,
            idle_timeout_secs: 30,
            cols: Self::DEFAULT_COLS,
            rows: Self::DEFAULT_ROWS,
            workspace_root: std::env::current_dir()
                .unwrap_or_else(|_| std::path::PathBuf::from(".")),
        }
    }
}

impl PtyConfig {
    /// Fallback terminal width when `COLUMNS` is missing or unusable.
    const DEFAULT_COLS: u16 = 80;
    /// Fallback terminal height when `LINES` is missing or unusable.
    const DEFAULT_ROWS: u16 = 24;

    /// Builds a configuration whose terminal size comes from the `COLUMNS`
    /// and `LINES` environment variables; every other field uses
    /// [`Default`].
    ///
    /// A variable that is unset, not a valid `u16`, or zero falls back to the
    /// default (80 columns / 24 rows). Zero is rejected because a PTY with a
    /// zero dimension is not a usable terminal. Surrounding whitespace is
    /// ignored.
    pub fn from_env() -> Self {
        Self {
            cols: Self::dimension_from_env("COLUMNS", Self::DEFAULT_COLS),
            rows: Self::dimension_from_env("LINES", Self::DEFAULT_ROWS),
            ..Default::default()
        }
    }

    /// Returns the same configuration with `workspace_root` replaced by `root`.
    pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
        self.workspace_root = root.into();
        self
    }

    /// Reads a positive terminal dimension from environment variable `name`,
    /// returning `fallback` when it is unset, unparsable, or zero.
    fn dimension_from_env(name: &str, fallback: u16) -> u16 {
        std::env::var(name)
            .ok()
            .and_then(|raw| raw.trim().parse::<u16>().ok())
            .filter(|&cells| cells > 0)
            .unwrap_or(fallback)
    }
}
140
/// Tracks Ctrl+C presses so that two presses in quick succession can be told
/// apart from a single press.
#[derive(Debug)]
pub struct CtrlCState {
    /// Time of the press that opened the current window, if one is open.
    first_press: Option<Instant>,
    /// Maximum gap between two presses for them to count as a double press.
    window: Duration,
}

/// What the caller should do in response to a Ctrl+C press.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CtrlCAction {
    /// First press (or the previous window expired): a new window was started.
    ForwardAndStartWindow,
    /// Second press inside the window.
    Terminate,
}

impl CtrlCState {
    /// Gap within which a second press counts as a double press.
    const DOUBLE_PRESS_WINDOW: Duration = Duration::from_secs(1);

    /// Creates a tracker with no press recorded and a one-second window.
    pub fn new() -> Self {
        let window = Self::DOUBLE_PRESS_WINDOW;
        Self {
            window,
            first_press: None,
        }
    }

    /// Records a Ctrl+C press observed at `now` and reports the action to take.
    ///
    /// A press strictly less than the window after the recorded one yields
    /// [`CtrlCAction::Terminate`] and clears the record. Any other press
    /// becomes the new recorded press and yields
    /// [`CtrlCAction::ForwardAndStartWindow`].
    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
        // `take()` leaves the slot empty, which is exactly the state wanted
        // after a terminate; otherwise it is refilled with `now` below.
        if let Some(previous) = self.first_press.take() {
            if now.duration_since(previous) < self.window {
                return CtrlCAction::Terminate;
            }
        }

        self.first_press = Some(now);
        CtrlCAction::ForwardAndStartWindow
    }
}

impl Default for CtrlCState {
    /// Same as [`CtrlCState::new`].
    fn default() -> Self {
        Self::new()
    }
}
190
/// Runs a CLI backend inside a pseudo-terminal and collects its output.
///
/// The channel halves stored as `Option` are handed out once via `handle()`;
/// the other halves stay with the executor.
pub struct PtyExecutor {
    /// Backend that builds the command line and supplies extra env vars.
    backend: CliBackend,
    /// PTY size, idle timeout, interactivity and working directory.
    config: PtyConfig,
    /// Sender for raw PTY output; cloned into the reader thread when
    /// `tui_mode` is enabled.
    output_tx: mpsc::UnboundedSender<Vec<u8>>,
    /// Receiving half of the output channel, taken by `handle()`.
    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
    /// Sending half of the input channel, taken by `handle()`.
    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
    /// Receiving half of the input channel.
    /// NOTE(review): its consumer is not visible in this part of the file.
    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    /// Sending half of the control channel, taken by `handle()`.
    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
    /// Receiving half of the control channel.
    /// NOTE(review): its consumer is not visible in this part of the file.
    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
    /// Set to `true` by `run_interactive` once the child has exited.
    terminated_tx: watch::Sender<bool>,
    /// Receiving half of the termination flag, taken by `handle()`.
    terminated_rx: Option<watch::Receiver<bool>>,
    /// When `true`, output is forwarded through `output_tx` instead of being
    /// echoed to stdout, and `run_interactive` does not read from stdin.
    tui_mode: bool,
}
210
211impl PtyExecutor {
212 pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
214 let (output_tx, output_rx) = mpsc::unbounded_channel();
215 let (input_tx, input_rx) = mpsc::unbounded_channel();
216 let (control_tx, control_rx) = mpsc::unbounded_channel();
217 let (terminated_tx, terminated_rx) = watch::channel(false);
218
219 Self {
220 backend,
221 config,
222 output_tx,
223 output_rx: Some(output_rx),
224 input_tx: Some(input_tx),
225 input_rx,
226 control_tx: Some(control_tx),
227 control_rx,
228 terminated_tx,
229 terminated_rx: Some(terminated_rx),
230 tui_mode: false,
231 }
232 }
233
/// Enables or disables TUI mode for subsequent runs.
///
/// While enabled, the run methods forward raw PTY output through the
/// executor's output channel instead of echoing it to stdout, and
/// `run_interactive` skips spawning its stdin reader thread.
pub fn set_tui_mode(&mut self, enabled: bool) {
    self.tui_mode = enabled;
}
245
/// Replaces the backend used to build commands for subsequent runs.
pub fn set_backend(&mut self, backend: CliBackend) {
    self.backend = backend;
}
256
257 pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
261 crate::pty_handle::PtyHandle {
262 output_rx: self.output_rx.take().expect("handle() already called"),
263 input_tx: self.input_tx.take().expect("handle() already called"),
264 control_tx: self.control_tx.take().expect("handle() already called"),
265 terminated_rx: self.terminated_rx.take().expect("handle() already called"),
266 }
267 }
268
269 fn spawn_pty(
278 &self,
279 prompt: &str,
280 ) -> io::Result<(
281 PtyPair,
282 Box<dyn portable_pty::Child + Send>,
283 Option<String>,
284 Option<tempfile::NamedTempFile>,
285 )> {
286 let pty_system = native_pty_system();
287
288 let pair = pty_system
289 .openpty(PtySize {
290 rows: self.config.rows,
291 cols: self.config.cols,
292 pixel_width: 0,
293 pixel_height: 0,
294 })
295 .map_err(|e| io::Error::other(e.to_string()))?;
296
297 let use_pty_safe = !self.config.interactive && prompt.len() > 4000;
304 let (cmd, args, stdin_input, temp_file) = if use_pty_safe {
305 self.backend.build_command_pty(prompt)
306 } else {
307 self.backend.build_command(prompt, self.config.interactive)
308 };
309
310 let mut cmd_builder = CommandBuilder::new(&cmd);
311 cmd_builder.args(&args);
312
313 cmd_builder.cwd(&self.config.workspace_root);
316
317 cmd_builder.env("TERM", "xterm-256color");
319 inject_ralph_runtime_env(&mut cmd_builder, &self.config.workspace_root);
320
321 for (key, value) in &self.backend.env_vars {
323 cmd_builder.env(key, value);
324 }
325 let child = pair
326 .slave
327 .spawn_command(cmd_builder)
328 .map_err(|e| io::Error::other(e.to_string()))?;
329
330 Ok((pair, child, stdin_input, temp_file))
332 }
333
334 pub async fn run_observe(
351 &self,
352 prompt: &str,
353 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
354 ) -> io::Result<PtyExecutionResult> {
355 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
357
358 let reader = pair
359 .master
360 .try_clone_reader()
361 .map_err(|e| io::Error::other(e.to_string()))?;
362
363 if let Some(ref input) = stdin_input {
365 tokio::time::sleep(Duration::from_millis(100)).await;
367 let mut writer = pair
368 .master
369 .take_writer()
370 .map_err(|e| io::Error::other(e.to_string()))?;
371 writer.write_all(input.as_bytes())?;
372 writer.write_all(b"\n")?;
373 writer.flush()?;
374 }
375
376 drop(pair.slave);
378
379 let mut output = Vec::new();
380 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
381 None
382 } else {
383 Some(Duration::from_secs(u64::from(
384 self.config.idle_timeout_secs,
385 )))
386 };
387
388 let mut termination = TerminationType::Natural;
389 let mut last_activity = Instant::now();
390
391 let should_terminate = Arc::new(AtomicBool::new(false));
393
394 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
396 let should_terminate_reader = Arc::clone(&should_terminate);
397 let tui_connected = self.tui_mode;
399 let tui_output_tx = if tui_connected {
400 Some(self.output_tx.clone())
401 } else {
402 None
403 };
404
405 debug!("Spawning PTY output reader thread (observe mode)");
406 std::thread::spawn(move || {
407 let mut reader = reader;
408 let mut buf = [0u8; 4096];
409
410 loop {
411 if should_terminate_reader.load(Ordering::SeqCst) {
412 debug!("PTY reader: termination requested");
413 break;
414 }
415
416 match reader.read(&mut buf) {
417 Ok(0) => {
418 debug!("PTY reader: EOF");
419 let _ = output_tx.blocking_send(OutputEvent::Eof);
420 break;
421 }
422 Ok(n) => {
423 let data = buf[..n].to_vec();
424 if let Some(ref tx) = tui_output_tx {
426 let _ = tx.send(data.clone());
427 }
428 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
430 break;
431 }
432 }
433 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
434 std::thread::sleep(Duration::from_millis(10));
435 }
436 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
437 Err(e) => {
438 debug!(error = %e, "PTY reader error");
439 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
440 break;
441 }
442 }
443 }
444 });
445
446 loop {
448 let idle_timeout = timeout_duration.map(|d| {
450 let elapsed = last_activity.elapsed();
451 if elapsed >= d {
452 Duration::from_millis(1) } else {
454 d.saturating_sub(elapsed)
455 }
456 });
457
458 tokio::select! {
459 _ = interrupt_rx.changed() => {
461 if *interrupt_rx.borrow() {
462 debug!("Interrupt received in observe mode, terminating");
463 termination = TerminationType::UserInterrupt;
464 should_terminate.store(true, Ordering::SeqCst);
465 let _ = self.terminate_child(&mut child, true).await;
466 break;
467 }
468 }
469
470 event = output_rx.recv() => {
472 match event {
473 Some(OutputEvent::Data(data)) => {
474 if !tui_connected {
476 io::stdout().write_all(&data)?;
477 io::stdout().flush()?;
478 }
479 output.extend_from_slice(&data);
480 last_activity = Instant::now();
481 }
482 Some(OutputEvent::Eof) | None => {
483 debug!("Output channel closed, process likely exited");
484 break;
485 }
486 Some(OutputEvent::Error(e)) => {
487 debug!(error = %e, "Reader thread reported error");
488 break;
489 }
490 }
491 }
492
493 _ = async {
495 if let Some(timeout) = idle_timeout {
496 tokio::time::sleep(timeout).await;
497 } else {
498 std::future::pending::<()>().await;
500 }
501 } => {
502 warn!(
503 timeout_secs = self.config.idle_timeout_secs,
504 "Idle timeout triggered"
505 );
506 termination = TerminationType::IdleTimeout;
507 should_terminate.store(true, Ordering::SeqCst);
508 self.terminate_child(&mut child, true).await?;
509 break;
510 }
511 }
512
513 if let Some(status) = child
515 .try_wait()
516 .map_err(|e| io::Error::other(e.to_string()))?
517 {
518 let exit_code = status.exit_code() as i32;
519 debug!(exit_status = ?status, exit_code, "Child process exited");
520
521 while let Ok(event) = output_rx.try_recv() {
523 if let OutputEvent::Data(data) = event {
524 if !tui_connected {
525 io::stdout().write_all(&data)?;
526 io::stdout().flush()?;
527 }
528 output.extend_from_slice(&data);
529 }
530 }
531
532 let drain_deadline = Instant::now() + Duration::from_millis(200);
535 loop {
536 let remaining = drain_deadline.saturating_duration_since(Instant::now());
537 if remaining.is_zero() {
538 break;
539 }
540 match tokio::time::timeout(remaining, output_rx.recv()).await {
541 Ok(Some(OutputEvent::Data(data))) => {
542 if !tui_connected {
543 io::stdout().write_all(&data)?;
544 io::stdout().flush()?;
545 }
546 output.extend_from_slice(&data);
547 }
548 Ok(Some(OutputEvent::Eof) | None) => break,
549 Ok(Some(OutputEvent::Error(e))) => {
550 debug!(error = %e, "PTY read error after exit");
551 break;
552 }
553 Err(_) => break,
554 }
555 }
556
557 let final_termination = resolve_termination_type(exit_code, termination);
558 return Ok(build_result(
560 &output,
561 status.success(),
562 Some(exit_code),
563 final_termination,
564 String::new(),
565 None,
566 ));
567 }
568 }
569
570 should_terminate.store(true, Ordering::SeqCst);
572
573 let status = self
575 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
576 .await?;
577
578 let (success, exit_code, final_termination) = match status {
579 Some(s) => {
580 let code = s.exit_code() as i32;
581 (
582 s.success(),
583 Some(code),
584 resolve_termination_type(code, termination),
585 )
586 }
587 None => {
588 warn!("Timed out waiting for child to exit after termination");
589 (false, None, termination)
590 }
591 };
592
593 Ok(build_result(
595 &output,
596 success,
597 exit_code,
598 final_termination,
599 String::new(),
600 None,
601 ))
602 }
603
604 pub async fn run_observe_streaming<H: StreamHandler>(
620 &self,
621 prompt: &str,
622 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
623 handler: &mut H,
624 ) -> io::Result<PtyExecutionResult> {
625 let output_format = self.backend.output_format;
627
628 let is_stream_json = output_format == OutputFormat::StreamJson;
633 let is_copilot_stream = output_format == OutputFormat::CopilotStreamJson;
634 let is_pi_stream = output_format == OutputFormat::PiStreamJson;
635 let show_pi_thinking = is_pi_stream && self.tui_mode;
637 let is_real_pi_backend = self.backend.command == "pi";
638
639 if is_pi_stream && is_real_pi_backend {
640 let configured_provider =
641 extract_cli_flag_value(&self.backend.args, "--provider", "-p")
642 .unwrap_or_else(|| "auto".to_string());
643 let configured_model = extract_cli_flag_value(&self.backend.args, "--model", "-m")
644 .unwrap_or_else(|| "default".to_string());
645 handler.on_text(&format!(
646 "Pi configured: provider={configured_provider}, model={configured_model}\n"
647 ));
648 }
649
650 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
652
653 let reader = pair
654 .master
655 .try_clone_reader()
656 .map_err(|e| io::Error::other(e.to_string()))?;
657
658 if let Some(ref input) = stdin_input {
660 tokio::time::sleep(Duration::from_millis(100)).await;
661 let mut writer = pair
662 .master
663 .take_writer()
664 .map_err(|e| io::Error::other(e.to_string()))?;
665 writer.write_all(input.as_bytes())?;
666 writer.write_all(b"\n")?;
667 writer.flush()?;
668 }
669
670 drop(pair.slave);
671
672 let mut output = Vec::new();
673 let mut line_buffer = String::new();
674 let mut extracted_text = String::new();
676 let mut pi_state = PiSessionState::new();
678 let mut copilot_state = CopilotStreamState::new();
679 let mut completion: Option<SessionResult> = None;
680 let start_time = Instant::now();
681 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
682 None
683 } else {
684 Some(Duration::from_secs(u64::from(
685 self.config.idle_timeout_secs,
686 )))
687 };
688
689 let mut termination = TerminationType::Natural;
690 let mut last_activity = Instant::now();
691
692 let should_terminate = Arc::new(AtomicBool::new(false));
693
694 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
696 let should_terminate_reader = Arc::clone(&should_terminate);
697 let tui_connected = self.tui_mode;
698 let tui_output_tx = if tui_connected {
699 Some(self.output_tx.clone())
700 } else {
701 None
702 };
703
704 debug!("Spawning PTY output reader thread (streaming mode)");
705 std::thread::spawn(move || {
706 let mut reader = reader;
707 let mut buf = [0u8; 4096];
708
709 loop {
710 if should_terminate_reader.load(Ordering::SeqCst) {
711 debug!("PTY reader: termination requested");
712 break;
713 }
714
715 match reader.read(&mut buf) {
716 Ok(0) => {
717 debug!("PTY reader: EOF");
718 let _ = output_tx.blocking_send(OutputEvent::Eof);
719 break;
720 }
721 Ok(n) => {
722 let data = buf[..n].to_vec();
723 if let Some(ref tx) = tui_output_tx {
724 let _ = tx.send(data.clone());
725 }
726 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
727 break;
728 }
729 }
730 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
731 std::thread::sleep(Duration::from_millis(10));
732 }
733 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
734 Err(e) => {
735 debug!(error = %e, "PTY reader error");
736 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
737 break;
738 }
739 }
740 }
741 });
742
743 loop {
745 let idle_timeout = timeout_duration.map(|d| {
746 let elapsed = last_activity.elapsed();
747 if elapsed >= d {
748 Duration::from_millis(1)
749 } else {
750 d.saturating_sub(elapsed)
751 }
752 });
753
754 tokio::select! {
755 _ = interrupt_rx.changed() => {
756 if *interrupt_rx.borrow() {
757 debug!("Interrupt received in streaming observe mode, terminating");
758 termination = TerminationType::UserInterrupt;
759 should_terminate.store(true, Ordering::SeqCst);
760 let _ = self.terminate_child(&mut child, true).await;
761 break;
762 }
763 }
764
765 event = output_rx.recv() => {
766 match event {
767 Some(OutputEvent::Data(data)) => {
768 output.extend_from_slice(&data);
769 last_activity = Instant::now();
770
771 if let Ok(text) = std::str::from_utf8(&data) {
772 if is_stream_json {
773 line_buffer.push_str(text);
775
776 while let Some(newline_pos) = line_buffer.find('\n') {
778 let line = line_buffer[..newline_pos].to_string();
779 line_buffer = line_buffer[newline_pos + 1..].to_string();
780
781 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
782 if let ClaudeStreamEvent::Result {
783 duration_ms,
784 total_cost_usd,
785 num_turns,
786 is_error,
787 } = &event
788 {
789 completion = Some(SessionResult {
790 duration_ms: *duration_ms,
791 total_cost_usd: *total_cost_usd,
792 num_turns: *num_turns,
793 is_error: *is_error,
794 ..Default::default()
795 });
796 }
797 dispatch_stream_event(event, handler, &mut extracted_text);
798 }
799 }
800 } else if is_copilot_stream {
801 line_buffer.push_str(text);
802
803 while let Some(newline_pos) = line_buffer.find('\n') {
804 let line = line_buffer[..newline_pos].to_string();
805 line_buffer = line_buffer[newline_pos + 1..].to_string();
806
807 if let Some(session_result) = handle_copilot_stream_line(
808 &line,
809 handler,
810 &mut extracted_text,
811 &mut copilot_state,
812 ) {
813 completion = Some(session_result);
814 }
815 }
816 } else if is_pi_stream {
817 line_buffer.push_str(text);
819
820 while let Some(newline_pos) = line_buffer.find('\n') {
821 let line = line_buffer[..newline_pos].to_string();
822 line_buffer = line_buffer[newline_pos + 1..].to_string();
823
824 if let Some(event) = PiStreamParser::parse_line(&line) {
825 dispatch_pi_stream_event(
826 event,
827 handler,
828 &mut extracted_text,
829 &mut pi_state,
830 show_pi_thinking,
831 );
832 }
833 }
834 } else {
835 handler.on_text(text);
838 }
839 }
840 }
841 Some(OutputEvent::Eof) | None => {
842 debug!("Output channel closed");
843 if is_stream_json && !line_buffer.is_empty()
845 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
846 {
847 if let ClaudeStreamEvent::Result {
848 duration_ms,
849 total_cost_usd,
850 num_turns,
851 is_error,
852 } = &event
853 {
854 completion = Some(SessionResult {
855 duration_ms: *duration_ms,
856 total_cost_usd: *total_cost_usd,
857 num_turns: *num_turns,
858 is_error: *is_error,
859 ..Default::default()
860 });
861 }
862 dispatch_stream_event(event, handler, &mut extracted_text);
863 } else if is_copilot_stream && !line_buffer.is_empty() {
864 if let Some(session_result) = handle_copilot_stream_line(
865 &line_buffer,
866 handler,
867 &mut extracted_text,
868 &mut copilot_state,
869 ) {
870 completion = Some(session_result);
871 }
872 } else if is_pi_stream && !line_buffer.is_empty()
873 && let Some(event) = PiStreamParser::parse_line(&line_buffer)
874 {
875 dispatch_pi_stream_event(
876 event,
877 handler,
878 &mut extracted_text,
879 &mut pi_state,
880 show_pi_thinking,
881 );
882 }
883 break;
884 }
885 Some(OutputEvent::Error(e)) => {
886 debug!(error = %e, "Reader thread reported error");
887 handler.on_error(&e);
888 break;
889 }
890 }
891 }
892
893 _ = async {
894 if let Some(timeout) = idle_timeout {
895 tokio::time::sleep(timeout).await;
896 } else {
897 std::future::pending::<()>().await;
898 }
899 } => {
900 warn!(
901 timeout_secs = self.config.idle_timeout_secs,
902 "Idle timeout triggered"
903 );
904 termination = TerminationType::IdleTimeout;
905 should_terminate.store(true, Ordering::SeqCst);
906 self.terminate_child(&mut child, true).await?;
907 break;
908 }
909 }
910
911 if let Some(status) = child
913 .try_wait()
914 .map_err(|e| io::Error::other(e.to_string()))?
915 {
916 let exit_code = status.exit_code() as i32;
917 debug!(exit_status = ?status, exit_code, "Child process exited");
918
919 while let Ok(event) = output_rx.try_recv() {
921 if let OutputEvent::Data(data) = event {
922 output.extend_from_slice(&data);
923 if let Ok(text) = std::str::from_utf8(&data) {
924 if is_stream_json {
925 line_buffer.push_str(text);
927 while let Some(newline_pos) = line_buffer.find('\n') {
928 let line = line_buffer[..newline_pos].to_string();
929 line_buffer = line_buffer[newline_pos + 1..].to_string();
930 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
931 if let ClaudeStreamEvent::Result {
932 duration_ms,
933 total_cost_usd,
934 num_turns,
935 is_error,
936 } = &event
937 {
938 completion = Some(SessionResult {
939 duration_ms: *duration_ms,
940 total_cost_usd: *total_cost_usd,
941 num_turns: *num_turns,
942 is_error: *is_error,
943 ..Default::default()
944 });
945 }
946 dispatch_stream_event(event, handler, &mut extracted_text);
947 }
948 }
949 } else if is_copilot_stream {
950 line_buffer.push_str(text);
951 while let Some(newline_pos) = line_buffer.find('\n') {
952 let line = line_buffer[..newline_pos].to_string();
953 line_buffer = line_buffer[newline_pos + 1..].to_string();
954 if let Some(session_result) = handle_copilot_stream_line(
955 &line,
956 handler,
957 &mut extracted_text,
958 &mut copilot_state,
959 ) {
960 completion = Some(session_result);
961 }
962 }
963 } else if is_pi_stream {
964 line_buffer.push_str(text);
966 while let Some(newline_pos) = line_buffer.find('\n') {
967 let line = line_buffer[..newline_pos].to_string();
968 line_buffer = line_buffer[newline_pos + 1..].to_string();
969 if let Some(event) = PiStreamParser::parse_line(&line) {
970 dispatch_pi_stream_event(
971 event,
972 handler,
973 &mut extracted_text,
974 &mut pi_state,
975 show_pi_thinking,
976 );
977 }
978 }
979 } else {
980 handler.on_text(text);
982 }
983 }
984 }
985 }
986
987 let drain_deadline = Instant::now() + Duration::from_millis(200);
990 loop {
991 let remaining = drain_deadline.saturating_duration_since(Instant::now());
992 if remaining.is_zero() {
993 break;
994 }
995 match tokio::time::timeout(remaining, output_rx.recv()).await {
996 Ok(Some(OutputEvent::Data(data))) => {
997 output.extend_from_slice(&data);
998 if let Ok(text) = std::str::from_utf8(&data) {
999 if is_stream_json {
1000 line_buffer.push_str(text);
1002 while let Some(newline_pos) = line_buffer.find('\n') {
1003 let line = line_buffer[..newline_pos].to_string();
1004 line_buffer = line_buffer[newline_pos + 1..].to_string();
1005 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
1006 if let ClaudeStreamEvent::Result {
1007 duration_ms,
1008 total_cost_usd,
1009 num_turns,
1010 is_error,
1011 } = &event
1012 {
1013 completion = Some(SessionResult {
1014 duration_ms: *duration_ms,
1015 total_cost_usd: *total_cost_usd,
1016 num_turns: *num_turns,
1017 is_error: *is_error,
1018 ..Default::default()
1019 });
1020 }
1021 dispatch_stream_event(
1022 event,
1023 handler,
1024 &mut extracted_text,
1025 );
1026 }
1027 }
1028 } else if is_copilot_stream {
1029 line_buffer.push_str(text);
1030 while let Some(newline_pos) = line_buffer.find('\n') {
1031 let line = line_buffer[..newline_pos].to_string();
1032 line_buffer = line_buffer[newline_pos + 1..].to_string();
1033 handle_copilot_stream_line(
1034 &line,
1035 handler,
1036 &mut extracted_text,
1037 &mut copilot_state,
1038 );
1039 }
1040 } else if is_pi_stream {
1041 line_buffer.push_str(text);
1043 while let Some(newline_pos) = line_buffer.find('\n') {
1044 let line = line_buffer[..newline_pos].to_string();
1045 line_buffer = line_buffer[newline_pos + 1..].to_string();
1046 if let Some(event) = PiStreamParser::parse_line(&line) {
1047 dispatch_pi_stream_event(
1048 event,
1049 handler,
1050 &mut extracted_text,
1051 &mut pi_state,
1052 show_pi_thinking,
1053 );
1054 }
1055 }
1056 } else {
1057 handler.on_text(text);
1059 }
1060 }
1061 }
1062 Ok(Some(OutputEvent::Eof) | None) => break,
1063 Ok(Some(OutputEvent::Error(e))) => {
1064 debug!(error = %e, "PTY read error after exit");
1065 break;
1066 }
1067 Err(_) => break,
1068 }
1069 }
1070
1071 if is_stream_json
1073 && !line_buffer.is_empty()
1074 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
1075 {
1076 if let ClaudeStreamEvent::Result {
1077 duration_ms,
1078 total_cost_usd,
1079 num_turns,
1080 is_error,
1081 } = &event
1082 {
1083 completion = Some(SessionResult {
1084 duration_ms: *duration_ms,
1085 total_cost_usd: *total_cost_usd,
1086 num_turns: *num_turns,
1087 is_error: *is_error,
1088 ..Default::default()
1089 });
1090 }
1091 dispatch_stream_event(event, handler, &mut extracted_text);
1092 } else if is_copilot_stream && !line_buffer.is_empty() {
1093 if let Some(session_result) = handle_copilot_stream_line(
1094 &line_buffer,
1095 handler,
1096 &mut extracted_text,
1097 &mut copilot_state,
1098 ) {
1099 completion = Some(session_result);
1100 }
1101 } else if is_pi_stream
1102 && !line_buffer.is_empty()
1103 && let Some(event) = PiStreamParser::parse_line(&line_buffer)
1104 {
1105 dispatch_pi_stream_event(
1106 event,
1107 handler,
1108 &mut extracted_text,
1109 &mut pi_state,
1110 show_pi_thinking,
1111 );
1112 }
1113
1114 let final_termination = resolve_termination_type(exit_code, termination);
1115
1116 if is_pi_stream {
1118 if is_real_pi_backend {
1119 let stream_provider =
1120 pi_state.stream_provider.as_deref().unwrap_or("unknown");
1121 let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1122 handler.on_text(&format!(
1123 "Pi stream: provider={stream_provider}, model={stream_model}\n"
1124 ));
1125 }
1126 let session_result = SessionResult {
1127 duration_ms: start_time.elapsed().as_millis() as u64,
1128 total_cost_usd: pi_state.total_cost_usd,
1129 num_turns: pi_state.num_turns,
1130 is_error: !status.success(),
1131 input_tokens: pi_state.input_tokens,
1132 output_tokens: pi_state.output_tokens,
1133 cache_read_tokens: pi_state.cache_read_tokens,
1134 cache_write_tokens: pi_state.cache_write_tokens,
1135 };
1136 handler.on_complete(&session_result);
1137 completion = Some(session_result);
1138 }
1139
1140 return Ok(build_result(
1142 &output,
1143 status.success(),
1144 Some(exit_code),
1145 final_termination,
1146 extracted_text,
1147 completion.as_ref(),
1148 ));
1149 }
1150 }
1151
1152 should_terminate.store(true, Ordering::SeqCst);
1153
1154 let status = self
1155 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1156 .await?;
1157
1158 let (success, exit_code, final_termination) = match status {
1159 Some(s) => {
1160 let code = s.exit_code() as i32;
1161 (
1162 s.success(),
1163 Some(code),
1164 resolve_termination_type(code, termination),
1165 )
1166 }
1167 None => {
1168 warn!("Timed out waiting for child to exit after termination");
1169 (false, None, termination)
1170 }
1171 };
1172
1173 if is_pi_stream {
1175 if is_real_pi_backend {
1176 let stream_provider = pi_state.stream_provider.as_deref().unwrap_or("unknown");
1177 let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1178 handler.on_text(&format!(
1179 "Pi stream: provider={stream_provider}, model={stream_model}\n"
1180 ));
1181 }
1182 let session_result = SessionResult {
1183 duration_ms: start_time.elapsed().as_millis() as u64,
1184 total_cost_usd: pi_state.total_cost_usd,
1185 num_turns: pi_state.num_turns,
1186 is_error: !success,
1187 input_tokens: pi_state.input_tokens,
1188 output_tokens: pi_state.output_tokens,
1189 cache_read_tokens: pi_state.cache_read_tokens,
1190 cache_write_tokens: pi_state.cache_write_tokens,
1191 };
1192 handler.on_complete(&session_result);
1193 completion = Some(session_result);
1194 }
1195
1196 Ok(build_result(
1198 &output,
1199 success,
1200 exit_code,
1201 final_termination,
1202 extracted_text,
1203 completion.as_ref(),
1204 ))
1205 }
1206
1207 #[allow(clippy::too_many_lines)] pub async fn run_interactive(
1228 &mut self,
1229 prompt: &str,
1230 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
1231 ) -> io::Result<PtyExecutionResult> {
1232 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
1234
1235 let reader = pair
1236 .master
1237 .try_clone_reader()
1238 .map_err(|e| io::Error::other(e.to_string()))?;
1239 let mut writer = pair
1240 .master
1241 .take_writer()
1242 .map_err(|e| io::Error::other(e.to_string()))?;
1243
1244 let master = pair.master;
1246
1247 drop(pair.slave);
1249
1250 let pending_stdin = stdin_input;
1252
1253 let mut output = Vec::new();
1254 let timeout_duration = if self.config.idle_timeout_secs > 0 {
1255 Some(Duration::from_secs(u64::from(
1256 self.config.idle_timeout_secs,
1257 )))
1258 } else {
1259 None
1260 };
1261
1262 let mut ctrl_c_state = CtrlCState::new();
1263 let mut termination = TerminationType::Natural;
1264 let mut last_activity = Instant::now();
1265
1266 let should_terminate = Arc::new(AtomicBool::new(false));
1268
1269 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
1271 let should_terminate_output = Arc::clone(&should_terminate);
1272 let tui_connected = self.tui_mode;
1274 let tui_output_tx = if tui_connected {
1275 Some(self.output_tx.clone())
1276 } else {
1277 None
1278 };
1279
1280 debug!("Spawning PTY output reader thread");
1281 std::thread::spawn(move || {
1282 debug!("PTY output reader thread started");
1283 let mut reader = reader;
1284 let mut buf = [0u8; 4096];
1285
1286 loop {
1287 if should_terminate_output.load(Ordering::SeqCst) {
1288 debug!("PTY output reader: termination requested");
1289 break;
1290 }
1291
1292 match reader.read(&mut buf) {
1293 Ok(0) => {
1294 debug!("PTY output reader: EOF received");
1296 let _ = output_tx.blocking_send(OutputEvent::Eof);
1297 break;
1298 }
1299 Ok(n) => {
1300 let data = buf[..n].to_vec();
1301 if let Some(ref tx) = tui_output_tx {
1303 let _ = tx.send(data.clone());
1304 }
1305 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
1307 debug!("PTY output reader: channel closed");
1308 break;
1309 }
1310 }
1311 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1312 std::thread::sleep(Duration::from_millis(1));
1314 }
1315 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
1316 }
1318 Err(e) => {
1319 warn!("PTY output reader: error - {}", e);
1320 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
1321 break;
1322 }
1323 }
1324 }
1325 debug!("PTY output reader thread exiting");
1326 });
1327
1328 let mut input_rx = if tui_connected {
1333 debug!("TUI connected - skipping stdin reader thread");
1334 None
1335 } else {
1336 let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
1337 let should_terminate_input = Arc::clone(&should_terminate);
1338
1339 std::thread::spawn(move || {
1340 let mut stdin = io::stdin();
1341 let mut buf = [0u8; 1];
1342
1343 loop {
1344 if should_terminate_input.load(Ordering::SeqCst) {
1345 break;
1346 }
1347
1348 match stdin.read(&mut buf) {
1349 Ok(0) => break, Ok(1) => {
1351 let byte = buf[0];
1352 let event = match byte {
1353 3 => InputEvent::CtrlC, 28 => InputEvent::CtrlBackslash, _ => InputEvent::Data(vec![byte]),
1356 };
1357 if input_tx.send(event).is_err() {
1358 break;
1359 }
1360 }
1361 Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
1363 Err(_) => break,
1364 }
1365 }
1366 });
1367 Some(input_rx)
1368 };
1369
1370 if let Some(ref input) = pending_stdin {
1373 tokio::time::sleep(Duration::from_millis(100)).await;
1374 writer.write_all(input.as_bytes())?;
1375 writer.write_all(b"\n")?;
1376 writer.flush()?;
1377 last_activity = Instant::now();
1378 }
1379
1380 loop {
1383 if let Some(status) = child
1385 .try_wait()
1386 .map_err(|e| io::Error::other(e.to_string()))?
1387 {
1388 let exit_code = status.exit_code() as i32;
1389 debug!(exit_status = ?status, exit_code, "Child process exited");
1390
1391 while let Ok(event) = output_rx.try_recv() {
1393 if let OutputEvent::Data(data) = event {
1394 if !tui_connected {
1395 io::stdout().write_all(&data)?;
1396 io::stdout().flush()?;
1397 }
1398 output.extend_from_slice(&data);
1399 }
1400 }
1401
1402 let drain_deadline = Instant::now() + Duration::from_millis(200);
1405 loop {
1406 let remaining = drain_deadline.saturating_duration_since(Instant::now());
1407 if remaining.is_zero() {
1408 break;
1409 }
1410 match tokio::time::timeout(remaining, output_rx.recv()).await {
1411 Ok(Some(OutputEvent::Data(data))) => {
1412 if !tui_connected {
1413 io::stdout().write_all(&data)?;
1414 io::stdout().flush()?;
1415 }
1416 output.extend_from_slice(&data);
1417 }
1418 Ok(Some(OutputEvent::Eof) | None) => break,
1419 Ok(Some(OutputEvent::Error(e))) => {
1420 debug!(error = %e, "PTY read error after exit");
1421 break;
1422 }
1423 Err(_) => break, }
1425 }
1426
1427 should_terminate.store(true, Ordering::SeqCst);
1428 let _ = self.terminated_tx.send(true);
1430
1431 let final_termination = resolve_termination_type(exit_code, termination);
1432 return Ok(build_result(
1434 &output,
1435 status.success(),
1436 Some(exit_code),
1437 final_termination,
1438 String::new(),
1439 None,
1440 ));
1441 }
1442
1443 let timeout_future = async {
1445 match timeout_duration {
1446 Some(d) => {
1447 let elapsed = last_activity.elapsed();
1448 if elapsed >= d {
1449 tokio::time::sleep(Duration::ZERO).await
1450 } else {
1451 tokio::time::sleep(d.saturating_sub(elapsed)).await
1452 }
1453 }
1454 None => std::future::pending::<()>().await,
1455 }
1456 };
1457
1458 tokio::select! {
1459 output_event = output_rx.recv() => {
1461 match output_event {
1462 Some(OutputEvent::Data(data)) => {
1463 if !tui_connected {
1465 io::stdout().write_all(&data)?;
1466 io::stdout().flush()?;
1467 }
1468 output.extend_from_slice(&data);
1469
1470 last_activity = Instant::now();
1471 }
1472 Some(OutputEvent::Eof) => {
1473 debug!("PTY EOF received");
1474 break;
1475 }
1476 Some(OutputEvent::Error(e)) => {
1477 debug!(error = %e, "PTY read error");
1478 break;
1479 }
1480 None => {
1481 break;
1483 }
1484 }
1485 }
1486
1487 input_event = async {
1489 match input_rx.as_mut() {
1490 Some(rx) => rx.recv().await,
1491 None => std::future::pending().await, }
1493 } => {
1494 match input_event {
1495 Some(InputEvent::CtrlC) => {
1496 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1497 CtrlCAction::ForwardAndStartWindow => {
1498 let _ = writer.write_all(&[3]);
1500 let _ = writer.flush();
1501 last_activity = Instant::now();
1502 }
1503 CtrlCAction::Terminate => {
1504 info!("Double Ctrl+C detected, terminating");
1505 termination = TerminationType::UserInterrupt;
1506 should_terminate.store(true, Ordering::SeqCst);
1507 self.terminate_child(&mut child, true).await?;
1508 break;
1509 }
1510 }
1511 }
1512 Some(InputEvent::CtrlBackslash) => {
1513 info!("Ctrl+\\ detected, force killing");
1514 termination = TerminationType::ForceKill;
1515 should_terminate.store(true, Ordering::SeqCst);
1516 self.terminate_child(&mut child, false).await?;
1517 break;
1518 }
1519 Some(InputEvent::Data(data)) => {
1520 let _ = writer.write_all(&data);
1522 let _ = writer.flush();
1523 last_activity = Instant::now();
1524 }
1525 None => {
1526 debug!("Input channel closed");
1528 }
1529 }
1530 }
1531
1532 tui_input = self.input_rx.recv() => {
1534 if let Some(data) = tui_input {
1535 match InputEvent::from_bytes(data) {
1536 InputEvent::CtrlC => {
1537 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1538 CtrlCAction::ForwardAndStartWindow => {
1539 let _ = writer.write_all(&[3]);
1540 let _ = writer.flush();
1541 last_activity = Instant::now();
1542 }
1543 CtrlCAction::Terminate => {
1544 info!("Double Ctrl+C detected, terminating");
1545 termination = TerminationType::UserInterrupt;
1546 should_terminate.store(true, Ordering::SeqCst);
1547 self.terminate_child(&mut child, true).await?;
1548 break;
1549 }
1550 }
1551 }
1552 InputEvent::CtrlBackslash => {
1553 info!("Ctrl+\\ detected, force killing");
1554 termination = TerminationType::ForceKill;
1555 should_terminate.store(true, Ordering::SeqCst);
1556 self.terminate_child(&mut child, false).await?;
1557 break;
1558 }
1559 InputEvent::Data(bytes) => {
1560 let _ = writer.write_all(&bytes);
1561 let _ = writer.flush();
1562 last_activity = Instant::now();
1563 }
1564 }
1565 }
1566 }
1567
1568 control_cmd = self.control_rx.recv() => {
1570 if let Some(cmd) = control_cmd {
1571 use crate::pty_handle::ControlCommand;
1572 match cmd {
1573 ControlCommand::Kill => {
1574 info!("Control command: Kill");
1575 termination = TerminationType::UserInterrupt;
1576 should_terminate.store(true, Ordering::SeqCst);
1577 self.terminate_child(&mut child, true).await?;
1578 break;
1579 }
1580 ControlCommand::Resize(cols, rows) => {
1581 debug!(cols, rows, "Control command: Resize");
1582 if let Err(e) = master.resize(PtySize {
1584 rows,
1585 cols,
1586 pixel_width: 0,
1587 pixel_height: 0,
1588 }) {
1589 warn!("Failed to resize PTY: {}", e);
1590 }
1591 }
1592 ControlCommand::Skip | ControlCommand::Abort => {
1593 debug!("Control command: {:?} (ignored at PTY level)", cmd);
1595 }
1596 }
1597 }
1598 }
1599
1600 _ = timeout_future => {
1602 warn!(
1603 timeout_secs = self.config.idle_timeout_secs,
1604 "Idle timeout triggered"
1605 );
1606 termination = TerminationType::IdleTimeout;
1607 should_terminate.store(true, Ordering::SeqCst);
1608 self.terminate_child(&mut child, true).await?;
1609 break;
1610 }
1611
1612 _ = interrupt_rx.changed() => {
1614 if *interrupt_rx.borrow() {
1615 debug!("Interrupt received in interactive mode, terminating");
1616 termination = TerminationType::UserInterrupt;
1617 should_terminate.store(true, Ordering::SeqCst);
1618 self.terminate_child(&mut child, true).await?;
1619 break;
1620 }
1621 }
1622 }
1623 }
1624
1625 should_terminate.store(true, Ordering::SeqCst);
1627
1628 let _ = self.terminated_tx.send(true);
1630
1631 let status = self
1633 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1634 .await?;
1635
1636 let (success, exit_code, final_termination) = match status {
1637 Some(s) => {
1638 let code = s.exit_code() as i32;
1639 (
1640 s.success(),
1641 Some(code),
1642 resolve_termination_type(code, termination),
1643 )
1644 }
1645 None => {
1646 warn!("Timed out waiting for child to exit after termination");
1647 (false, None, termination)
1648 }
1649 };
1650
1651 Ok(build_result(
1653 &output,
1654 success,
1655 exit_code,
1656 final_termination,
1657 String::new(),
1658 None,
1659 ))
1660 }
1661
1662 #[allow(clippy::unused_self)] #[allow(clippy::unused_async)] #[cfg(not(unix))]
1673 async fn terminate_child(
1674 &self,
1675 child: &mut Box<dyn portable_pty::Child + Send>,
1676 _graceful: bool,
1677 ) -> io::Result<()> {
1678 child.kill()
1679 }
1680
1681 #[cfg(unix)]
1682 async fn terminate_child(
1683 &self,
1684 child: &mut Box<dyn portable_pty::Child + Send>,
1685 graceful: bool,
1686 ) -> io::Result<()> {
1687 let pid = match child.process_id() {
1688 Some(id) => Pid::from_raw(id as i32),
1689 None => return Ok(()), };
1691
1692 if graceful {
1693 debug!(pid = %pid, "Sending SIGTERM");
1694 let _ = kill(pid, Signal::SIGTERM);
1695
1696 let grace_period = Duration::from_secs(2);
1698 let start = Instant::now();
1699
1700 while start.elapsed() < grace_period {
1701 if child
1702 .try_wait()
1703 .map_err(|e| io::Error::other(e.to_string()))?
1704 .is_some()
1705 {
1706 return Ok(());
1707 }
1708 tokio::time::sleep(Duration::from_millis(50)).await;
1710 }
1711
1712 debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1714 }
1715
1716 debug!(pid = %pid, "Sending SIGKILL");
1717 let _ = kill(pid, Signal::SIGKILL);
1718 Ok(())
1719 }
1720
1721 async fn wait_for_exit(
1726 &self,
1727 child: &mut Box<dyn portable_pty::Child + Send>,
1728 max_wait: Option<Duration>,
1729 interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1730 ) -> io::Result<Option<portable_pty::ExitStatus>> {
1731 let start = Instant::now();
1732
1733 loop {
1734 if let Some(status) = child
1735 .try_wait()
1736 .map_err(|e| io::Error::other(e.to_string()))?
1737 {
1738 return Ok(Some(status));
1739 }
1740
1741 if let Some(max) = max_wait
1742 && start.elapsed() >= max
1743 {
1744 return Ok(None);
1745 }
1746
1747 tokio::select! {
1748 _ = interrupt_rx.changed() => {
1749 if *interrupt_rx.borrow() {
1750 debug!("Interrupt received while waiting for child exit");
1751 return Ok(None);
1752 }
1753 }
1754 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1755 }
1756 }
1757 }
1758}
1759
1760fn handle_copilot_stream_line<H: StreamHandler>(
1761 line: &str,
1762 handler: &mut H,
1763 extracted_text: &mut String,
1764 copilot_state: &mut CopilotStreamState,
1765) -> Option<SessionResult> {
1766 let event = CopilotStreamParser::parse_line(line)?;
1767 dispatch_copilot_stream_event(event, handler, extracted_text, copilot_state)
1768}
1769
1770fn inject_ralph_runtime_env(cmd_builder: &mut CommandBuilder, workspace_root: &std::path::Path) {
1771 let Ok(current_exe) = env::current_exe() else {
1772 return;
1773 };
1774 let Some(bin_dir) = current_exe.parent() else {
1775 return;
1776 };
1777
1778 let mut path_entries = vec![bin_dir.to_path_buf()];
1779 if let Some(existing_path) = env::var_os("PATH") {
1780 path_entries.extend(env::split_paths(&existing_path));
1781 }
1782
1783 if let Ok(joined_path) = env::join_paths(path_entries) {
1784 cmd_builder.env("PATH", joined_path);
1785 }
1786 cmd_builder.env("RALPH_BIN", current_exe);
1787 cmd_builder.env("RALPH_WORKSPACE_ROOT", workspace_root);
1788
1789 let marker = workspace_root.join(".ralph/current-events");
1791 if let Ok(relative) = std::fs::read_to_string(&marker) {
1792 let abs = workspace_root.join(relative.trim());
1793 cmd_builder.env("RALPH_EVENTS_FILE", abs);
1794 }
1795
1796 if std::path::Path::new("/var/tmp").is_dir() {
1797 cmd_builder.env("TMPDIR", "/var/tmp");
1798 cmd_builder.env("TMP", "/var/tmp");
1799 cmd_builder.env("TEMP", "/var/tmp");
1800 }
1801}
1802
/// A unit of user input on its way to the child PTY.
#[derive(Debug)]
enum InputEvent {
    /// A lone `0x03` byte (Ctrl+C).
    CtrlC,
    /// A lone `0x1C` byte (Ctrl+\).
    CtrlBackslash,
    /// Any other input, carried verbatim.
    Data(Vec<u8>),
}

impl InputEvent {
    /// Classifies a raw input chunk.
    ///
    /// Only a chunk consisting of exactly one byte can be a control event
    /// (`3` → [`InputEvent::CtrlC`], `28` → [`InputEvent::CtrlBackslash`]);
    /// everything else, including the empty chunk, becomes [`InputEvent::Data`].
    fn from_bytes(data: Vec<u8>) -> Self {
        match data.as_slice() {
            [3] => Self::CtrlC,
            [28] => Self::CtrlBackslash,
            _ => Self::Data(data),
        }
    }
}
1827
/// Event sent from the PTY output reader thread to the async run loop.
#[derive(Debug)]
enum OutputEvent {
    /// A chunk of raw bytes read from the PTY.
    Data(Vec<u8>),
    /// The reader reached end of output.
    Eof,
    /// A read failed; carries the error rendered as text.
    Error(String),
}
1838
1839fn strip_ansi(bytes: &[u8]) -> String {
1845 let stripped = strip_ansi_escapes::strip(bytes);
1846 String::from_utf8_lossy(&stripped).into_owned()
1847}
1848
1849fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1853 if exit_code == 130 {
1854 info!("Child process killed by SIGINT");
1855 TerminationType::UserInterrupt
1856 } else {
1857 default
1858 }
1859}
1860
/// Looks up the value of a CLI flag in `args`.
///
/// Two spellings are recognised, for both `long_flag` and `short_flag`:
/// - split form, `--flag value`: the following argument is the value unless it
///   is missing or starts with `-`;
/// - inline form, `--flag=value`: the text after `=` is the value unless it is
///   empty.
///
/// Arguments are scanned left to right and the first usable value wins.
/// Returns `None` when no argument yields a value.
fn extract_cli_flag_value(args: &[String], long_flag: &str, short_flag: &str) -> Option<String> {
    /// Returns the non-empty value of an inline `flag=value` argument.
    fn inline_value<'a>(arg: &'a str, flag: &str) -> Option<&'a str> {
        // Stripping the flag and then `=` matches the same inputs as stripping
        // "flag=" but needs no per-argument `format!` allocation.
        arg.strip_prefix(flag)?
            .strip_prefix('=')
            .filter(|value| !value.is_empty())
    }

    for (index, arg) in args.iter().enumerate() {
        if arg == long_flag || arg == short_flag {
            if let Some(value) = args
                .get(index + 1)
                .filter(|candidate| !candidate.starts_with('-'))
            {
                return Some(value.clone());
            }
            // Bare flag without a usable value: keep scanning later arguments.
            continue;
        }

        if let Some(value) =
            inline_value(arg, long_flag).or_else(|| inline_value(arg, short_flag))
        {
            return Some(value.to_owned());
        }
    }

    None
}
1887
1888fn dispatch_stream_event<H: StreamHandler>(
1891 event: ClaudeStreamEvent,
1892 handler: &mut H,
1893 extracted_text: &mut String,
1894) {
1895 match event {
1896 ClaudeStreamEvent::System { .. } => {
1897 }
1899 ClaudeStreamEvent::Assistant { message, .. } => {
1900 for block in message.content {
1901 match block {
1902 ContentBlock::Text { text } => {
1903 handler.on_text(&text);
1904 extracted_text.push_str(&text);
1906 extracted_text.push('\n');
1907 }
1908 ContentBlock::ToolUse { name, id, input } => {
1909 handler.on_tool_call(&name, &id, &input)
1910 }
1911 }
1912 }
1913 }
1914 ClaudeStreamEvent::User { message } => {
1915 for block in message.content {
1916 match block {
1917 UserContentBlock::ToolResult {
1918 tool_use_id,
1919 content,
1920 } => {
1921 handler.on_tool_result(&tool_use_id, &content);
1922 }
1923 }
1924 }
1925 }
1926 ClaudeStreamEvent::Result {
1927 duration_ms,
1928 total_cost_usd,
1929 num_turns,
1930 is_error,
1931 } => {
1932 if is_error {
1933 handler.on_error("Session ended with error");
1934 }
1935 handler.on_complete(&SessionResult {
1936 duration_ms,
1937 total_cost_usd,
1938 num_turns,
1939 is_error,
1940 ..Default::default()
1941 });
1942 }
1943 }
1944}
1945
1946fn build_result(
1955 output: &[u8],
1956 success: bool,
1957 exit_code: Option<i32>,
1958 termination: TerminationType,
1959 extracted_text: String,
1960 session_result: Option<&SessionResult>,
1961) -> PtyExecutionResult {
1962 let (total_cost_usd, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens) =
1963 if let Some(result) = session_result {
1964 (
1965 result.total_cost_usd,
1966 result.input_tokens,
1967 result.output_tokens,
1968 result.cache_read_tokens,
1969 result.cache_write_tokens,
1970 )
1971 } else {
1972 (0.0, 0, 0, 0, 0)
1973 };
1974
1975 PtyExecutionResult {
1976 output: String::from_utf8_lossy(output).to_string(),
1977 stripped_output: strip_ansi(output),
1978 extracted_text,
1979 success,
1980 exit_code,
1981 termination,
1982 total_cost_usd,
1983 input_tokens,
1984 output_tokens,
1985 cache_read_tokens,
1986 cache_write_tokens,
1987 }
1988}
1989
1990#[cfg(test)]
1991mod tests {
1992 use super::*;
1993 use crate::claude_stream::{AssistantMessage, UserMessage};
1994 #[cfg(unix)]
1995 use crate::cli_backend::PromptMode;
1996 use crate::stream_handler::{SessionResult, StreamHandler};
1997 #[cfg(unix)]
1998 use tempfile::TempDir;
1999
2000 #[test]
2001 fn test_double_ctrl_c_within_window() {
2002 let mut state = CtrlCState::new();
2003 let now = Instant::now();
2004
2005 let action = state.handle_ctrl_c(now);
2007 assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
2008
2009 let later = now + Duration::from_millis(500);
2011 let action = state.handle_ctrl_c(later);
2012 assert_eq!(action, CtrlCAction::Terminate);
2013 }
2014
2015 #[test]
2016 fn test_input_event_from_bytes_ctrl_c() {
2017 let event = InputEvent::from_bytes(vec![3]);
2018 assert!(matches!(event, InputEvent::CtrlC));
2019 }
2020
2021 #[test]
2022 fn test_input_event_from_bytes_ctrl_backslash() {
2023 let event = InputEvent::from_bytes(vec![28]);
2024 assert!(matches!(event, InputEvent::CtrlBackslash));
2025 }
2026
2027 #[test]
2028 fn test_input_event_from_bytes_data() {
2029 let event = InputEvent::from_bytes(vec![b'a']);
2030 assert!(matches!(event, InputEvent::Data(_)));
2031
2032 let event = InputEvent::from_bytes(vec![1, 2, 3]);
2033 assert!(matches!(event, InputEvent::Data(_)));
2034 }
2035
2036 #[test]
2037 fn test_ctrl_c_window_expires() {
2038 let mut state = CtrlCState::new();
2039 let now = Instant::now();
2040
2041 state.handle_ctrl_c(now);
2043
2044 let later = now + Duration::from_secs(2);
2046
2047 let action = state.handle_ctrl_c(later);
2049 assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
2050 }
2051
2052 #[test]
2053 fn test_strip_ansi_basic() {
2054 let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n";
2055 let stripped = strip_ansi(input);
2056 assert!(stripped.contains("Thinking..."));
2057 assert!(!stripped.contains("\x1b["));
2058 }
2059
2060 #[test]
2061 fn test_completion_promise_extraction() {
2062 let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n\
2064 \x1b[2K\x1b[1;32m Done!\x1b[0m\r\n\
2065 \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";
2066
2067 let stripped = strip_ansi(input);
2068
2069 assert!(stripped.contains("LOOP_COMPLETE"));
2071 assert!(!stripped.contains("\x1b["));
2072 }
2073
2074 #[test]
2075 fn test_event_tag_extraction() {
2076 let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
2078 Task completed successfully\r\n\
2079 \x1b[90m</event>\x1b[0m\r\n";
2080
2081 let stripped = strip_ansi(input);
2082
2083 assert!(stripped.contains("<event topic=\"build.done\">"));
2084 assert!(stripped.contains("</event>"));
2085 }
2086
2087 #[test]
2088 fn test_large_output_preserves_early_events() {
2089 let mut input = Vec::new();
2091
2092 input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");
2094
2095 for i in 0..500 {
2097 input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
2098 }
2099
2100 let stripped = strip_ansi(&input);
2101
2102 assert!(
2104 stripped.contains("<event topic=\"build.task\">"),
2105 "Event tag was lost - strip_ansi is not preserving all content"
2106 );
2107 assert!(stripped.contains("Implement feature X"));
2108 assert!(stripped.contains("Line 499")); }
2110
2111 #[test]
2112 fn test_pty_config_defaults() {
2113 let config = PtyConfig::default();
2114 assert!(config.interactive);
2115 assert_eq!(config.idle_timeout_secs, 30);
2116 assert_eq!(config.cols, 80);
2117 assert_eq!(config.rows, 24);
2118 }
2119
2120 #[test]
2121 fn test_pty_config_from_env_matches_env_or_defaults() {
2122 let cols = std::env::var("COLUMNS")
2123 .ok()
2124 .and_then(|value| value.parse::<u16>().ok())
2125 .unwrap_or(80);
2126 let rows = std::env::var("LINES")
2127 .ok()
2128 .and_then(|value| value.parse::<u16>().ok())
2129 .unwrap_or(24);
2130
2131 let config = PtyConfig::from_env();
2132 assert_eq!(config.cols, cols);
2133 assert_eq!(config.rows, rows);
2134 }
2135
2136 #[test]
2144 fn test_idle_timeout_reset_logic() {
2145 let timeout_duration = Duration::from_secs(30);
2147
2148 let simulated_25s = Duration::from_secs(25);
2150
2151 let remaining = timeout_duration.saturating_sub(simulated_25s);
2153 assert_eq!(remaining.as_secs(), 5);
2154
2155 let last_activity_after_reset = Instant::now();
2157
2158 let elapsed = last_activity_after_reset.elapsed();
2160 assert!(elapsed < Duration::from_millis(100)); let new_remaining = timeout_duration.saturating_sub(elapsed);
2164 assert!(new_remaining > Duration::from_secs(29)); }
2166
2167 #[test]
2168 fn test_extracted_text_field_exists() {
2169 let result = PtyExecutionResult {
2172 output: String::new(),
2173 stripped_output: String::new(),
2174 extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
2175 success: true,
2176 exit_code: Some(0),
2177 termination: TerminationType::Natural,
2178 total_cost_usd: 0.0,
2179 input_tokens: 0,
2180 output_tokens: 0,
2181 cache_read_tokens: 0,
2182 cache_write_tokens: 0,
2183 };
2184
2185 assert!(
2186 result
2187 .extracted_text
2188 .contains("<event topic=\"build.done\">")
2189 );
2190 }
2191
2192 #[test]
2193 fn test_build_result_includes_extracted_text() {
2194 let output = b"raw output";
2196 let extracted = "extracted text with <event topic=\"test\">payload</event>";
2197 let result = build_result(
2198 output,
2199 true,
2200 Some(0),
2201 TerminationType::Natural,
2202 extracted.to_string(),
2203 None,
2204 );
2205
2206 assert_eq!(result.extracted_text, extracted);
2207 assert!(result.stripped_output.contains("raw output"));
2208 }
2209
2210 #[test]
2211 fn test_resolve_termination_type_handles_sigint_exit_code() {
2212 let termination = resolve_termination_type(130, TerminationType::Natural);
2213 assert_eq!(termination, TerminationType::UserInterrupt);
2214
2215 let termination = resolve_termination_type(0, TerminationType::ForceKill);
2216 assert_eq!(termination, TerminationType::ForceKill);
2217 }
2218
2219 #[test]
2220 fn test_extract_cli_flag_value_supports_split_and_equals_syntax() {
2221 let args = vec![
2222 "--provider".to_string(),
2223 "anthropic".to_string(),
2224 "--model=claude-sonnet-4".to_string(),
2225 ];
2226
2227 assert_eq!(
2228 extract_cli_flag_value(&args, "--provider", "-p"),
2229 Some("anthropic".to_string())
2230 );
2231 assert_eq!(
2232 extract_cli_flag_value(&args, "--model", "-m"),
2233 Some("claude-sonnet-4".to_string())
2234 );
2235 assert_eq!(extract_cli_flag_value(&args, "--foo", "-f"), None);
2236 }
2237
    /// Test double that records every `StreamHandler` callback it receives.
    #[derive(Default)]
    struct CapturingHandler {
        /// Arguments of each `on_text` call, in order.
        texts: Vec<String>,
        /// `(name, id, input)` of each `on_tool_call` call.
        tool_calls: Vec<(String, String, serde_json::Value)>,
        /// `(id, output)` of each `on_tool_result` call.
        tool_results: Vec<(String, String)>,
        /// Arguments of each `on_error` call.
        errors: Vec<String>,
        /// Clone of the result passed to each `on_complete` call.
        completions: Vec<SessionResult>,
    }
2246
2247 impl StreamHandler for CapturingHandler {
2248 fn on_text(&mut self, text: &str) {
2249 self.texts.push(text.to_string());
2250 }
2251
2252 fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
2253 self.tool_calls
2254 .push((name.to_string(), id.to_string(), input.clone()));
2255 }
2256
2257 fn on_tool_result(&mut self, id: &str, output: &str) {
2258 self.tool_results.push((id.to_string(), output.to_string()));
2259 }
2260
2261 fn on_error(&mut self, error: &str) {
2262 self.errors.push(error.to_string());
2263 }
2264
2265 fn on_complete(&mut self, result: &SessionResult) {
2266 self.completions.push(result.clone());
2267 }
2268 }
2269
2270 #[test]
2271 fn test_dispatch_stream_event_routes_text_and_tool_calls() {
2272 let mut handler = CapturingHandler::default();
2273 let mut extracted_text = String::new();
2274
2275 let event = ClaudeStreamEvent::Assistant {
2276 message: AssistantMessage {
2277 content: vec![
2278 ContentBlock::Text {
2279 text: "Hello".to_string(),
2280 },
2281 ContentBlock::ToolUse {
2282 id: "tool-1".to_string(),
2283 name: "Read".to_string(),
2284 input: serde_json::json!({"path": "README.md"}),
2285 },
2286 ],
2287 },
2288 usage: None,
2289 };
2290
2291 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2292
2293 assert_eq!(handler.texts, vec!["Hello".to_string()]);
2294 assert_eq!(handler.tool_calls.len(), 1);
2295 assert!(extracted_text.contains("Hello"));
2296 assert!(extracted_text.ends_with('\n'));
2297 }
2298
2299 #[test]
2300 fn test_dispatch_stream_event_routes_tool_results_and_completion() {
2301 let mut handler = CapturingHandler::default();
2302 let mut extracted_text = String::new();
2303
2304 let event = ClaudeStreamEvent::User {
2305 message: UserMessage {
2306 content: vec![UserContentBlock::ToolResult {
2307 tool_use_id: "tool-1".to_string(),
2308 content: "done".to_string(),
2309 }],
2310 },
2311 };
2312
2313 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2314 assert_eq!(handler.tool_results.len(), 1);
2315 assert_eq!(handler.tool_results[0].0, "tool-1");
2316 assert_eq!(handler.tool_results[0].1, "done");
2317
2318 let event = ClaudeStreamEvent::Result {
2319 duration_ms: 12,
2320 total_cost_usd: 0.01,
2321 num_turns: 2,
2322 is_error: true,
2323 };
2324
2325 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2326 assert_eq!(handler.errors.len(), 1);
2327 assert_eq!(handler.completions.len(), 1);
2328 assert!(handler.completions[0].is_error);
2329 }
2330
2331 #[test]
2332 fn test_dispatch_stream_event_system_noop() {
2333 let mut handler = CapturingHandler::default();
2334 let mut extracted_text = String::new();
2335
2336 let event = ClaudeStreamEvent::System {
2337 session_id: "session-1".to_string(),
2338 model: "claude-test".to_string(),
2339 tools: Vec::new(),
2340 };
2341
2342 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2343
2344 assert!(handler.texts.is_empty());
2345 assert!(handler.tool_calls.is_empty());
2346 assert!(handler.tool_results.is_empty());
2347 assert!(handler.errors.is_empty());
2348 assert!(handler.completions.is_empty());
2349 assert!(extracted_text.is_empty());
2350 }
2351
2352 #[test]
2367 fn test_tui_mode_stdin_reader_bypass() {
2368 let tui_mode = true;
2374 let tui_connected = tui_mode;
2375
2376 assert!(
2379 tui_connected,
2380 "When tui_mode is true, stdin reader must be skipped"
2381 );
2382
2383 let tui_mode_disabled = false;
2385 let tui_connected_non_tui = tui_mode_disabled;
2386 assert!(
2387 !tui_connected_non_tui,
2388 "When tui_mode is false, stdin reader must be spawned"
2389 );
2390 }
2391
2392 #[test]
2393 fn test_tui_mode_default_is_false() {
2394 let backend = CliBackend::claude();
2396 let config = PtyConfig::default();
2397 let executor = PtyExecutor::new(backend, config);
2398
2399 assert!(!executor.tui_mode, "tui_mode should default to false");
2401 }
2402
2403 #[test]
2404 fn test_set_tui_mode() {
2405 let backend = CliBackend::claude();
2407 let config = PtyConfig::default();
2408 let mut executor = PtyExecutor::new(backend, config);
2409
2410 assert!(!executor.tui_mode, "tui_mode should start as false");
2412
2413 executor.set_tui_mode(true);
2415 assert!(
2416 executor.tui_mode,
2417 "tui_mode should be true after set_tui_mode(true)"
2418 );
2419
2420 executor.set_tui_mode(false);
2422 assert!(
2423 !executor.tui_mode,
2424 "tui_mode should be false after set_tui_mode(false)"
2425 );
2426 }
2427
2428 #[test]
2429 fn test_build_result_populates_fields() {
2430 let output = b"\x1b[31mHello\x1b[0m\n";
2431 let extracted = "extracted text".to_string();
2432
2433 let result = build_result(
2434 output,
2435 true,
2436 Some(0),
2437 TerminationType::Natural,
2438 extracted.clone(),
2439 None,
2440 );
2441
2442 assert_eq!(result.output, String::from_utf8_lossy(output));
2443 assert!(result.stripped_output.contains("Hello"));
2444 assert!(!result.stripped_output.contains("\x1b["));
2445 assert_eq!(result.extracted_text, extracted);
2446 assert!(result.success);
2447 assert_eq!(result.exit_code, Some(0));
2448 assert_eq!(result.termination, TerminationType::Natural);
2449 }
2450
    /// End-to-end check of `run_observe` with an argument-mode prompt: the backend
    /// is `sh -c` and the prompt `echo hello-pty` is expected to be executed, with
    /// its output captured and a natural, successful exit reported.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_run_observe_executes_arg_prompt() {
        let temp_dir = TempDir::new().expect("temp dir");
        let backend = CliBackend {
            command: "sh".to_string(),
            args: vec!["-c".to_string()],
            prompt_mode: PromptMode::Arg,
            prompt_flag: None,
            output_format: OutputFormat::Text,
            env_vars: vec![],
        };
        // NOTE(review): idle_timeout_secs = 0 presumably disables the idle timeout — confirm.
        let config = PtyConfig {
            interactive: false,
            idle_timeout_secs: 0,
            cols: 80,
            rows: 24,
            workspace_root: temp_dir.path().to_path_buf(),
        };
        let executor = PtyExecutor::new(backend, config);
        // Keep the sender alive for the whole test; the interrupt is never raised.
        let (_tx, rx) = tokio::sync::watch::channel(false);

        let result = executor
            .run_observe("echo hello-pty", rx)
            .await
            .expect("run_observe");

        assert!(result.success);
        assert!(result.output.contains("hello-pty"));
        assert!(result.stripped_output.contains("hello-pty"));
        assert_eq!(result.exit_code, Some(0));
        assert_eq!(result.termination, TerminationType::Natural);
    }
2484
    /// End-to-end check of `run_observe` with a stdin-mode prompt: the child shell
    /// reads one line and echoes it back, so the prompt must show up in the
    /// captured output.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_run_observe_writes_stdin_prompt() {
        let temp_dir = TempDir::new().expect("temp dir");
        let backend = CliBackend {
            command: "sh".to_string(),
            args: vec!["-c".to_string(), "read line; echo \"$line\"".to_string()],
            prompt_mode: PromptMode::Stdin,
            prompt_flag: None,
            output_format: OutputFormat::Text,
            env_vars: vec![],
        };
        let config = PtyConfig {
            interactive: false,
            idle_timeout_secs: 0,
            cols: 80,
            rows: 24,
            workspace_root: temp_dir.path().to_path_buf(),
        };
        let executor = PtyExecutor::new(backend, config);
        // Keep the sender alive for the whole test; the interrupt is never raised.
        let (_tx, rx) = tokio::sync::watch::channel(false);

        let result = executor
            .run_observe("stdin-line", rx)
            .await
            .expect("run_observe");

        assert!(result.success);
        assert!(result.output.contains("stdin-line"));
        assert!(result.stripped_output.contains("stdin-line"));
        assert_eq!(result.termination, TerminationType::Natural);
    }
2517
    /// Checks `CliBackend::build_command_pty` for a stdin-mode backend: no stdin
    /// payload is ever returned; a 32,000-character prompt is replaced by a temp
    /// file plus an instruction argument, while a short prompt is passed as a
    /// plain argument without a temp file.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_pty_converts_stdin_to_arg_for_large_prompt() {
        let _temp_dir = TempDir::new().expect("temp dir");
        let backend = CliBackend {
            command: "echo".to_string(),
            args: vec![],
            prompt_mode: PromptMode::Stdin,
            prompt_flag: Some("-p".to_string()),
            output_format: OutputFormat::Text,
            env_vars: vec![],
        };

        // Large prompt: expected to be routed through a temp file.
        let large_prompt = "x".repeat(32_000);
        let (cmd, args, stdin_input, temp_file) = backend.build_command_pty(&large_prompt);
        assert_eq!(cmd, "echo");
        assert!(stdin_input.is_none(), "PTY mode should not use stdin");
        assert!(temp_file.is_some(), "Large prompt should use temp file");
        assert!(
            args.iter().any(|a| a.contains("Please read and execute")),
            "args should contain temp file instruction: {:?}",
            args
        );

        // Small prompt: expected to be passed verbatim as an argument.
        let small_prompt = "hello world";
        let (_, args, stdin_input, temp_file) = backend.build_command_pty(small_prompt);
        assert!(stdin_input.is_none());
        assert!(temp_file.is_none());
        assert!(args.iter().any(|a| a == small_prompt));
    }
2558
2559 #[cfg(unix)]
2562 #[tokio::test]
2563 async fn test_run_observe_large_stdin_backend_does_not_deadlock() {
2564 let temp_dir = TempDir::new().expect("temp dir");
2565 let backend = CliBackend {
2568 command: "echo".to_string(),
2569 args: vec![],
2570 prompt_mode: PromptMode::Stdin,
2571 prompt_flag: Some("-p".to_string()),
2572 output_format: OutputFormat::Text,
2573 env_vars: vec![],
2574 };
2575 let config = PtyConfig {
2576 interactive: false,
2577 idle_timeout_secs: 0,
2578 cols: 32768,
2579 rows: 24,
2580 workspace_root: temp_dir.path().to_path_buf(),
2581 };
2582 let executor = PtyExecutor::new(backend, config);
2583 let (_tx, rx) = tokio::sync::watch::channel(false);
2584
2585 let large_prompt = "x".repeat(32_000);
2586
2587 let result = tokio::time::timeout(
2589 std::time::Duration::from_secs(5),
2590 executor.run_observe(&large_prompt, rx),
2591 )
2592 .await
2593 .expect("should not deadlock")
2594 .expect("run_observe");
2595
2596 assert!(result.success);
2597 assert!(
2599 result.output.contains("Please read and execute"),
2600 "output should contain temp file instruction: {}",
2601 &result.output[..result.output.len().min(200)]
2602 );
2603 }
2604
    /// With `OutputFormat::Text`, `run_observe_streaming` forwards the child's
    /// output to `on_text`, never calls `on_complete`, and leaves
    /// `extracted_text` empty.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_run_observe_streaming_text_routes_output() {
        let temp_dir = TempDir::new().expect("temp dir");
        let backend = CliBackend {
            command: "sh".to_string(),
            args: vec!["-c".to_string()],
            prompt_mode: PromptMode::Arg,
            prompt_flag: None,
            output_format: OutputFormat::Text,
            env_vars: vec![],
        };
        let config = PtyConfig {
            interactive: false,
            idle_timeout_secs: 0,
            cols: 80,
            rows: 24,
            workspace_root: temp_dir.path().to_path_buf(),
        };
        let executor = PtyExecutor::new(backend, config);
        // Keep the sender alive for the whole test; the interrupt is never raised.
        let (_tx, rx) = tokio::sync::watch::channel(false);
        let mut handler = CapturingHandler::default();

        let result = executor
            .run_observe_streaming("printf 'alpha\\nbeta\\n'", rx, &mut handler)
            .await
            .expect("run_observe_streaming");

        assert!(result.success);
        // Text may arrive in several chunks, so compare against the concatenation.
        let captured = handler.texts.join("");
        assert!(captured.contains("alpha"), "captured: {captured}");
        assert!(captured.contains("beta"), "captured: {captured}");
        assert!(handler.completions.is_empty());
        assert!(result.extracted_text.is_empty());
    }
2640
    /// With `OutputFormat::StreamJson`, `run_observe_streaming` parses the child's
    /// JSON lines: the assistant text reaches `on_text` and `extracted_text`, and
    /// the `result` line produces exactly one `on_complete` call.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_run_observe_streaming_parses_stream_json() {
        let temp_dir = TempDir::new().expect("temp dir");
        let backend = CliBackend {
            command: "sh".to_string(),
            args: vec!["-c".to_string()],
            prompt_mode: PromptMode::Arg,
            prompt_flag: None,
            output_format: OutputFormat::StreamJson,
            env_vars: vec![],
        };
        let config = PtyConfig {
            interactive: false,
            idle_timeout_secs: 0,
            cols: 80,
            rows: 24,
            workspace_root: temp_dir.path().to_path_buf(),
        };
        let executor = PtyExecutor::new(backend, config);
        // Keep the sender alive for the whole test; the interrupt is never raised.
        let (_tx, rx) = tokio::sync::watch::channel(false);
        let mut handler = CapturingHandler::default();

        // The shell prints two JSON lines: one assistant message, one final result.
        let script = r#"printf '%s\n' '{"type":"assistant","message":{"content":[{"type":"text","text":"Hello stream"}]}}' '{"type":"result","duration_ms":1,"total_cost_usd":0.0,"num_turns":1,"is_error":false}'"#;
        let result = executor
            .run_observe_streaming(script, rx, &mut handler)
            .await
            .expect("run_observe_streaming");

        assert!(result.success);
        assert!(
            handler
                .texts
                .iter()
                .any(|text| text.contains("Hello stream"))
        );
        assert_eq!(handler.completions.len(), 1);
        assert!(result.extracted_text.contains("Hello stream"));
        assert_eq!(result.termination, TerminationType::Natural);
    }
2681
    /// End-to-end check of `run_interactive` with TUI mode enabled: the command
    /// runs to completion, its output is captured, and a natural exit with code 0
    /// is reported.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_run_interactive_in_tui_mode() {
        let temp_dir = TempDir::new().expect("temp dir");
        let backend = CliBackend {
            command: "sh".to_string(),
            args: vec!["-c".to_string()],
            prompt_mode: PromptMode::Arg,
            prompt_flag: None,
            output_format: OutputFormat::Text,
            env_vars: vec![],
        };
        let config = PtyConfig {
            interactive: true,
            idle_timeout_secs: 0,
            cols: 80,
            rows: 24,
            workspace_root: temp_dir.path().to_path_buf(),
        };
        let mut executor = PtyExecutor::new(backend, config);
        executor.set_tui_mode(true);
        // Keep the sender alive for the whole test; the interrupt is never raised.
        let (_tx, rx) = tokio::sync::watch::channel(false);

        let result = executor
            .run_interactive("echo hello-tui", rx)
            .await
            .expect("run_interactive");

        assert!(result.success);
        assert!(result.output.contains("hello-tui"));
        assert!(result.stripped_output.contains("hello-tui"));
        assert_eq!(result.exit_code, Some(0));
        assert_eq!(result.termination, TerminationType::Natural);
    }
2716}