1#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::copilot_stream::{
24 CopilotStreamParser, CopilotStreamState, dispatch_copilot_stream_event,
25};
26use crate::pi_stream::{PiSessionState, PiStreamParser, dispatch_pi_stream_event};
27use crate::stream_handler::{SessionResult, StreamHandler};
28#[cfg(unix)]
29use nix::sys::signal::{Signal, kill};
30#[cfg(unix)]
31use nix::unistd::Pid;
32use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
33use std::env;
34use std::io::{self, Read, Write};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::{Duration, Instant};
38use tokio::sync::{mpsc, watch};
39use tracing::{debug, info, warn};
40
41#[derive(Debug)]
43pub struct PtyExecutionResult {
44 pub output: String,
46 pub stripped_output: String,
48 pub extracted_text: String,
54 pub success: bool,
56 pub exit_code: Option<i32>,
58 pub termination: TerminationType,
60 pub total_cost_usd: f64,
62 pub input_tokens: u64,
64 pub output_tokens: u64,
66 pub cache_read_tokens: u64,
68 pub cache_write_tokens: u64,
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum TerminationType {
75 Natural,
77 IdleTimeout,
79 UserInterrupt,
81 ForceKill,
83}
84
85#[derive(Debug, Clone)]
87pub struct PtyConfig {
88 pub interactive: bool,
90 pub idle_timeout_secs: u32,
92 pub cols: u16,
94 pub rows: u16,
96 pub workspace_root: std::path::PathBuf,
100}
101
102impl Default for PtyConfig {
103 fn default() -> Self {
104 Self {
105 interactive: true,
106 idle_timeout_secs: 30,
107 cols: 80,
108 rows: 24,
109 workspace_root: std::env::current_dir()
110 .unwrap_or_else(|_| std::path::PathBuf::from(".")),
111 }
112 }
113}
114
115impl PtyConfig {
116 pub fn from_env() -> Self {
118 let cols = std::env::var("COLUMNS")
119 .ok()
120 .and_then(|s| s.parse().ok())
121 .unwrap_or(80);
122 let rows = std::env::var("LINES")
123 .ok()
124 .and_then(|s| s.parse().ok())
125 .unwrap_or(24);
126
127 Self {
128 cols,
129 rows,
130 ..Default::default()
131 }
132 }
133
134 pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
136 self.workspace_root = root.into();
137 self
138 }
139}
140
141#[derive(Debug)]
143pub struct CtrlCState {
144 first_press: Option<Instant>,
146 window: Duration,
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum CtrlCAction {
153 ForwardAndStartWindow,
155 Terminate,
157}
158
159impl CtrlCState {
160 pub fn new() -> Self {
162 Self {
163 first_press: None,
164 window: Duration::from_secs(1),
165 }
166 }
167
168 pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
170 match self.first_press {
171 Some(first) if now.duration_since(first) < self.window => {
172 self.first_press = None;
174 CtrlCAction::Terminate
175 }
176 _ => {
177 self.first_press = Some(now);
179 CtrlCAction::ForwardAndStartWindow
180 }
181 }
182 }
183}
184
185impl Default for CtrlCState {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191pub struct PtyExecutor {
193 backend: CliBackend,
194 config: PtyConfig,
195 output_tx: mpsc::UnboundedSender<Vec<u8>>,
197 output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
198 input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
199 input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
200 control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
201 control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
202 terminated_tx: watch::Sender<bool>,
204 terminated_rx: Option<watch::Receiver<bool>>,
205 tui_mode: bool,
209}
210
211impl PtyExecutor {
212 pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
214 let (output_tx, output_rx) = mpsc::unbounded_channel();
215 let (input_tx, input_rx) = mpsc::unbounded_channel();
216 let (control_tx, control_rx) = mpsc::unbounded_channel();
217 let (terminated_tx, terminated_rx) = watch::channel(false);
218
219 Self {
220 backend,
221 config,
222 output_tx,
223 output_rx: Some(output_rx),
224 input_tx: Some(input_tx),
225 input_rx,
226 control_tx: Some(control_tx),
227 control_rx,
228 terminated_tx,
229 terminated_rx: Some(terminated_rx),
230 tui_mode: false,
231 }
232 }
233
234 pub fn set_tui_mode(&mut self, enabled: bool) {
243 self.tui_mode = enabled;
244 }
245
246 pub fn set_backend(&mut self, backend: CliBackend) {
254 self.backend = backend;
255 }
256
257 pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
261 crate::pty_handle::PtyHandle {
262 output_rx: self.output_rx.take().expect("handle() already called"),
263 input_tx: self.input_tx.take().expect("handle() already called"),
264 control_tx: self.control_tx.take().expect("handle() already called"),
265 terminated_rx: self.terminated_rx.take().expect("handle() already called"),
266 }
267 }
268
269 fn spawn_pty(
278 &self,
279 prompt: &str,
280 ) -> io::Result<(
281 PtyPair,
282 Box<dyn portable_pty::Child + Send>,
283 Option<String>,
284 Option<tempfile::NamedTempFile>,
285 )> {
286 let pty_system = native_pty_system();
287
288 let pair = pty_system
289 .openpty(PtySize {
290 rows: self.config.rows,
291 cols: self.config.cols,
292 pixel_width: 0,
293 pixel_height: 0,
294 })
295 .map_err(|e| io::Error::other(e.to_string()))?;
296
297 let use_pty_safe = !self.config.interactive && prompt.len() > 4000;
304 let (cmd, args, stdin_input, temp_file) = if use_pty_safe {
305 self.backend.build_command_pty(prompt)
306 } else {
307 self.backend.build_command(prompt, self.config.interactive)
308 };
309
310 let mut cmd_builder = CommandBuilder::new(&cmd);
311 cmd_builder.args(&args);
312
313 cmd_builder.cwd(&self.config.workspace_root);
316
317 cmd_builder.env("TERM", "xterm-256color");
319 inject_ralph_runtime_env(&mut cmd_builder, &self.config.workspace_root);
320
321 for (key, value) in &self.backend.env_vars {
323 cmd_builder.env(key, value);
324 }
325 let child = pair
326 .slave
327 .spawn_command(cmd_builder)
328 .map_err(|e| io::Error::other(e.to_string()))?;
329
330 Ok((pair, child, stdin_input, temp_file))
332 }
333
334 pub async fn run_observe(
351 &self,
352 prompt: &str,
353 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
354 ) -> io::Result<PtyExecutionResult> {
355 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
357
358 let reader = pair
359 .master
360 .try_clone_reader()
361 .map_err(|e| io::Error::other(e.to_string()))?;
362
363 if let Some(ref input) = stdin_input {
365 tokio::time::sleep(Duration::from_millis(100)).await;
367 let mut writer = pair
368 .master
369 .take_writer()
370 .map_err(|e| io::Error::other(e.to_string()))?;
371 writer.write_all(input.as_bytes())?;
372 writer.write_all(b"\n")?;
373 writer.flush()?;
374 }
375
376 drop(pair.slave);
378
379 let mut output = Vec::new();
380 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
381 None
382 } else {
383 Some(Duration::from_secs(u64::from(
384 self.config.idle_timeout_secs,
385 )))
386 };
387
388 let mut termination = TerminationType::Natural;
389 let mut last_activity = Instant::now();
390
391 let should_terminate = Arc::new(AtomicBool::new(false));
393
394 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
396 let should_terminate_reader = Arc::clone(&should_terminate);
397 let tui_connected = self.tui_mode;
399 let tui_output_tx = if tui_connected {
400 Some(self.output_tx.clone())
401 } else {
402 None
403 };
404
405 debug!("Spawning PTY output reader thread (observe mode)");
406 std::thread::spawn(move || {
407 let mut reader = reader;
408 let mut buf = [0u8; 4096];
409
410 loop {
411 if should_terminate_reader.load(Ordering::SeqCst) {
412 debug!("PTY reader: termination requested");
413 break;
414 }
415
416 match reader.read(&mut buf) {
417 Ok(0) => {
418 debug!("PTY reader: EOF");
419 let _ = output_tx.blocking_send(OutputEvent::Eof);
420 break;
421 }
422 Ok(n) => {
423 let data = buf[..n].to_vec();
424 if let Some(ref tx) = tui_output_tx {
426 let _ = tx.send(data.clone());
427 }
428 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
430 break;
431 }
432 }
433 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
434 std::thread::sleep(Duration::from_millis(10));
435 }
436 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
437 Err(e) => {
438 debug!(error = %e, "PTY reader error");
439 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
440 break;
441 }
442 }
443 }
444 });
445
446 loop {
448 let idle_timeout = timeout_duration.map(|d| {
450 let elapsed = last_activity.elapsed();
451 if elapsed >= d {
452 Duration::from_millis(1) } else {
454 d.saturating_sub(elapsed)
455 }
456 });
457
458 tokio::select! {
459 _ = interrupt_rx.changed() => {
461 if *interrupt_rx.borrow() {
462 debug!("Interrupt received in observe mode, terminating");
463 termination = TerminationType::UserInterrupt;
464 should_terminate.store(true, Ordering::SeqCst);
465 let _ = self.terminate_child(&mut child, true).await;
466 break;
467 }
468 }
469
470 event = output_rx.recv() => {
472 match event {
473 Some(OutputEvent::Data(data)) => {
474 if !tui_connected {
476 io::stdout().write_all(&data)?;
477 io::stdout().flush()?;
478 }
479 output.extend_from_slice(&data);
480 last_activity = Instant::now();
481 }
482 Some(OutputEvent::Eof) | None => {
483 debug!("Output channel closed, process likely exited");
484 break;
485 }
486 Some(OutputEvent::Error(e)) => {
487 debug!(error = %e, "Reader thread reported error");
488 break;
489 }
490 }
491 }
492
493 _ = async {
495 if let Some(timeout) = idle_timeout {
496 tokio::time::sleep(timeout).await;
497 } else {
498 std::future::pending::<()>().await;
500 }
501 } => {
502 warn!(
503 timeout_secs = self.config.idle_timeout_secs,
504 "Idle timeout triggered"
505 );
506 termination = TerminationType::IdleTimeout;
507 should_terminate.store(true, Ordering::SeqCst);
508 self.terminate_child(&mut child, true).await?;
509 break;
510 }
511 }
512
513 if let Some(status) = child
515 .try_wait()
516 .map_err(|e| io::Error::other(e.to_string()))?
517 {
518 let exit_code = status.exit_code() as i32;
519 debug!(exit_status = ?status, exit_code, "Child process exited");
520
521 while let Ok(event) = output_rx.try_recv() {
523 if let OutputEvent::Data(data) = event {
524 if !tui_connected {
525 io::stdout().write_all(&data)?;
526 io::stdout().flush()?;
527 }
528 output.extend_from_slice(&data);
529 }
530 }
531
532 let drain_deadline = Instant::now() + Duration::from_millis(200);
535 loop {
536 let remaining = drain_deadline.saturating_duration_since(Instant::now());
537 if remaining.is_zero() {
538 break;
539 }
540 match tokio::time::timeout(remaining, output_rx.recv()).await {
541 Ok(Some(OutputEvent::Data(data))) => {
542 if !tui_connected {
543 io::stdout().write_all(&data)?;
544 io::stdout().flush()?;
545 }
546 output.extend_from_slice(&data);
547 }
548 Ok(Some(OutputEvent::Eof) | None) => break,
549 Ok(Some(OutputEvent::Error(e))) => {
550 debug!(error = %e, "PTY read error after exit");
551 break;
552 }
553 Err(_) => break,
554 }
555 }
556
557 let final_termination = resolve_termination_type(exit_code, termination);
558 return Ok(build_result(
560 &output,
561 status.success(),
562 Some(exit_code),
563 final_termination,
564 String::new(),
565 None,
566 ));
567 }
568 }
569
570 should_terminate.store(true, Ordering::SeqCst);
572
573 let status = self
575 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
576 .await?;
577
578 let (success, exit_code, final_termination) = match status {
579 Some(s) => {
580 let code = s.exit_code() as i32;
581 (
582 s.success(),
583 Some(code),
584 resolve_termination_type(code, termination),
585 )
586 }
587 None => {
588 warn!("Timed out waiting for child to exit after termination");
589 (false, None, termination)
590 }
591 };
592
593 Ok(build_result(
595 &output,
596 success,
597 exit_code,
598 final_termination,
599 String::new(),
600 None,
601 ))
602 }
603
604 pub async fn run_observe_streaming<H: StreamHandler>(
620 &self,
621 prompt: &str,
622 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
623 handler: &mut H,
624 ) -> io::Result<PtyExecutionResult> {
625 let output_format = self.backend.output_format;
627
628 let is_stream_json = output_format == OutputFormat::StreamJson;
633 let is_copilot_stream = output_format == OutputFormat::CopilotStreamJson;
634 let is_pi_stream = output_format == OutputFormat::PiStreamJson;
635 let show_pi_thinking = is_pi_stream && self.tui_mode;
637 let is_real_pi_backend = self.backend.command == "pi";
638
639 if is_pi_stream && is_real_pi_backend {
640 let configured_provider =
641 extract_cli_flag_value(&self.backend.args, "--provider", "-p")
642 .unwrap_or_else(|| "auto".to_string());
643 let configured_model = extract_cli_flag_value(&self.backend.args, "--model", "-m")
644 .unwrap_or_else(|| "default".to_string());
645 handler.on_text(&format!(
646 "Pi configured: provider={configured_provider}, model={configured_model}\n"
647 ));
648 }
649
650 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
652
653 let reader = pair
654 .master
655 .try_clone_reader()
656 .map_err(|e| io::Error::other(e.to_string()))?;
657
658 if let Some(ref input) = stdin_input {
660 tokio::time::sleep(Duration::from_millis(100)).await;
661 let mut writer = pair
662 .master
663 .take_writer()
664 .map_err(|e| io::Error::other(e.to_string()))?;
665 writer.write_all(input.as_bytes())?;
666 writer.write_all(b"\n")?;
667 writer.flush()?;
668 }
669
670 drop(pair.slave);
671
672 let mut output = Vec::new();
673 let mut line_buffer = String::new();
674 let mut extracted_text = String::new();
676 let mut pi_state = PiSessionState::new();
678 let mut copilot_state = CopilotStreamState::new();
679 let mut completion: Option<SessionResult> = None;
680 let start_time = Instant::now();
681 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
682 None
683 } else {
684 Some(Duration::from_secs(u64::from(
685 self.config.idle_timeout_secs,
686 )))
687 };
688
689 let mut termination = TerminationType::Natural;
690 let mut last_activity = Instant::now();
691
692 let should_terminate = Arc::new(AtomicBool::new(false));
693
694 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
696 let should_terminate_reader = Arc::clone(&should_terminate);
697 let tui_connected = self.tui_mode;
698 let tui_output_tx = if tui_connected {
699 Some(self.output_tx.clone())
700 } else {
701 None
702 };
703
704 debug!("Spawning PTY output reader thread (streaming mode)");
705 std::thread::spawn(move || {
706 let mut reader = reader;
707 let mut buf = [0u8; 4096];
708
709 loop {
710 if should_terminate_reader.load(Ordering::SeqCst) {
711 debug!("PTY reader: termination requested");
712 break;
713 }
714
715 match reader.read(&mut buf) {
716 Ok(0) => {
717 debug!("PTY reader: EOF");
718 let _ = output_tx.blocking_send(OutputEvent::Eof);
719 break;
720 }
721 Ok(n) => {
722 let data = buf[..n].to_vec();
723 if let Some(ref tx) = tui_output_tx {
724 let _ = tx.send(data.clone());
725 }
726 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
727 break;
728 }
729 }
730 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
731 std::thread::sleep(Duration::from_millis(10));
732 }
733 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
734 Err(e) => {
735 debug!(error = %e, "PTY reader error");
736 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
737 break;
738 }
739 }
740 }
741 });
742
743 loop {
745 let idle_timeout = timeout_duration.map(|d| {
746 let elapsed = last_activity.elapsed();
747 if elapsed >= d {
748 Duration::from_millis(1)
749 } else {
750 d.saturating_sub(elapsed)
751 }
752 });
753
754 tokio::select! {
755 _ = interrupt_rx.changed() => {
756 if *interrupt_rx.borrow() {
757 debug!("Interrupt received in streaming observe mode, terminating");
758 termination = TerminationType::UserInterrupt;
759 should_terminate.store(true, Ordering::SeqCst);
760 let _ = self.terminate_child(&mut child, true).await;
761 break;
762 }
763 }
764
765 event = output_rx.recv() => {
766 match event {
767 Some(OutputEvent::Data(data)) => {
768 output.extend_from_slice(&data);
769 last_activity = Instant::now();
770
771 if let Ok(text) = std::str::from_utf8(&data) {
772 if is_stream_json {
773 line_buffer.push_str(text);
775
776 while let Some(newline_pos) = line_buffer.find('\n') {
778 let line = line_buffer[..newline_pos].to_string();
779 line_buffer = line_buffer[newline_pos + 1..].to_string();
780
781 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
782 if let ClaudeStreamEvent::Result {
783 duration_ms,
784 total_cost_usd,
785 num_turns,
786 is_error,
787 } = &event
788 {
789 completion = Some(SessionResult {
790 duration_ms: *duration_ms,
791 total_cost_usd: *total_cost_usd,
792 num_turns: *num_turns,
793 is_error: *is_error,
794 ..Default::default()
795 });
796 }
797 dispatch_stream_event(event, handler, &mut extracted_text);
798 }
799 }
800 } else if is_copilot_stream {
801 line_buffer.push_str(text);
802
803 while let Some(newline_pos) = line_buffer.find('\n') {
804 let line = line_buffer[..newline_pos].to_string();
805 line_buffer = line_buffer[newline_pos + 1..].to_string();
806
807 if let Some(session_result) = handle_copilot_stream_line(
808 &line,
809 handler,
810 &mut extracted_text,
811 &mut copilot_state,
812 ) {
813 completion = Some(session_result);
814 }
815 }
816 } else if is_pi_stream {
817 line_buffer.push_str(text);
819
820 while let Some(newline_pos) = line_buffer.find('\n') {
821 let line = line_buffer[..newline_pos].to_string();
822 line_buffer = line_buffer[newline_pos + 1..].to_string();
823
824 if let Some(event) = PiStreamParser::parse_line(&line) {
825 dispatch_pi_stream_event(
826 event,
827 handler,
828 &mut extracted_text,
829 &mut pi_state,
830 show_pi_thinking,
831 );
832 }
833 }
834 } else {
835 handler.on_text(text);
838 }
839 }
840 }
841 Some(OutputEvent::Eof) | None => {
842 debug!("Output channel closed");
843 if is_stream_json && !line_buffer.is_empty()
845 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
846 {
847 if let ClaudeStreamEvent::Result {
848 duration_ms,
849 total_cost_usd,
850 num_turns,
851 is_error,
852 } = &event
853 {
854 completion = Some(SessionResult {
855 duration_ms: *duration_ms,
856 total_cost_usd: *total_cost_usd,
857 num_turns: *num_turns,
858 is_error: *is_error,
859 ..Default::default()
860 });
861 }
862 dispatch_stream_event(event, handler, &mut extracted_text);
863 } else if is_copilot_stream && !line_buffer.is_empty() {
864 if let Some(session_result) = handle_copilot_stream_line(
865 &line_buffer,
866 handler,
867 &mut extracted_text,
868 &mut copilot_state,
869 ) {
870 completion = Some(session_result);
871 }
872 } else if is_pi_stream && !line_buffer.is_empty()
873 && let Some(event) = PiStreamParser::parse_line(&line_buffer)
874 {
875 dispatch_pi_stream_event(
876 event,
877 handler,
878 &mut extracted_text,
879 &mut pi_state,
880 show_pi_thinking,
881 );
882 }
883 break;
884 }
885 Some(OutputEvent::Error(e)) => {
886 debug!(error = %e, "Reader thread reported error");
887 handler.on_error(&e);
888 break;
889 }
890 }
891 }
892
893 _ = async {
894 if let Some(timeout) = idle_timeout {
895 tokio::time::sleep(timeout).await;
896 } else {
897 std::future::pending::<()>().await;
898 }
899 } => {
900 warn!(
901 timeout_secs = self.config.idle_timeout_secs,
902 "Idle timeout triggered"
903 );
904 termination = TerminationType::IdleTimeout;
905 should_terminate.store(true, Ordering::SeqCst);
906 self.terminate_child(&mut child, true).await?;
907 break;
908 }
909 }
910
911 if let Some(status) = child
913 .try_wait()
914 .map_err(|e| io::Error::other(e.to_string()))?
915 {
916 let exit_code = status.exit_code() as i32;
917 debug!(exit_status = ?status, exit_code, "Child process exited");
918
919 while let Ok(event) = output_rx.try_recv() {
921 if let OutputEvent::Data(data) = event {
922 output.extend_from_slice(&data);
923 if let Ok(text) = std::str::from_utf8(&data) {
924 if is_stream_json {
925 line_buffer.push_str(text);
927 while let Some(newline_pos) = line_buffer.find('\n') {
928 let line = line_buffer[..newline_pos].to_string();
929 line_buffer = line_buffer[newline_pos + 1..].to_string();
930 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
931 if let ClaudeStreamEvent::Result {
932 duration_ms,
933 total_cost_usd,
934 num_turns,
935 is_error,
936 } = &event
937 {
938 completion = Some(SessionResult {
939 duration_ms: *duration_ms,
940 total_cost_usd: *total_cost_usd,
941 num_turns: *num_turns,
942 is_error: *is_error,
943 ..Default::default()
944 });
945 }
946 dispatch_stream_event(event, handler, &mut extracted_text);
947 }
948 }
949 } else if is_copilot_stream {
950 line_buffer.push_str(text);
951 while let Some(newline_pos) = line_buffer.find('\n') {
952 let line = line_buffer[..newline_pos].to_string();
953 line_buffer = line_buffer[newline_pos + 1..].to_string();
954 if let Some(session_result) = handle_copilot_stream_line(
955 &line,
956 handler,
957 &mut extracted_text,
958 &mut copilot_state,
959 ) {
960 completion = Some(session_result);
961 }
962 }
963 } else if is_pi_stream {
964 line_buffer.push_str(text);
966 while let Some(newline_pos) = line_buffer.find('\n') {
967 let line = line_buffer[..newline_pos].to_string();
968 line_buffer = line_buffer[newline_pos + 1..].to_string();
969 if let Some(event) = PiStreamParser::parse_line(&line) {
970 dispatch_pi_stream_event(
971 event,
972 handler,
973 &mut extracted_text,
974 &mut pi_state,
975 show_pi_thinking,
976 );
977 }
978 }
979 } else {
980 handler.on_text(text);
982 }
983 }
984 }
985 }
986
987 let drain_deadline = Instant::now() + Duration::from_millis(200);
990 loop {
991 let remaining = drain_deadline.saturating_duration_since(Instant::now());
992 if remaining.is_zero() {
993 break;
994 }
995 match tokio::time::timeout(remaining, output_rx.recv()).await {
996 Ok(Some(OutputEvent::Data(data))) => {
997 output.extend_from_slice(&data);
998 if let Ok(text) = std::str::from_utf8(&data) {
999 if is_stream_json {
1000 line_buffer.push_str(text);
1002 while let Some(newline_pos) = line_buffer.find('\n') {
1003 let line = line_buffer[..newline_pos].to_string();
1004 line_buffer = line_buffer[newline_pos + 1..].to_string();
1005 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
1006 if let ClaudeStreamEvent::Result {
1007 duration_ms,
1008 total_cost_usd,
1009 num_turns,
1010 is_error,
1011 } = &event
1012 {
1013 completion = Some(SessionResult {
1014 duration_ms: *duration_ms,
1015 total_cost_usd: *total_cost_usd,
1016 num_turns: *num_turns,
1017 is_error: *is_error,
1018 ..Default::default()
1019 });
1020 }
1021 dispatch_stream_event(
1022 event,
1023 handler,
1024 &mut extracted_text,
1025 );
1026 }
1027 }
1028 } else if is_copilot_stream {
1029 line_buffer.push_str(text);
1030 while let Some(newline_pos) = line_buffer.find('\n') {
1031 let line = line_buffer[..newline_pos].to_string();
1032 line_buffer = line_buffer[newline_pos + 1..].to_string();
1033 handle_copilot_stream_line(
1034 &line,
1035 handler,
1036 &mut extracted_text,
1037 &mut copilot_state,
1038 );
1039 }
1040 } else if is_pi_stream {
1041 line_buffer.push_str(text);
1043 while let Some(newline_pos) = line_buffer.find('\n') {
1044 let line = line_buffer[..newline_pos].to_string();
1045 line_buffer = line_buffer[newline_pos + 1..].to_string();
1046 if let Some(event) = PiStreamParser::parse_line(&line) {
1047 dispatch_pi_stream_event(
1048 event,
1049 handler,
1050 &mut extracted_text,
1051 &mut pi_state,
1052 show_pi_thinking,
1053 );
1054 }
1055 }
1056 } else {
1057 handler.on_text(text);
1059 }
1060 }
1061 }
1062 Ok(Some(OutputEvent::Eof) | None) => break,
1063 Ok(Some(OutputEvent::Error(e))) => {
1064 debug!(error = %e, "PTY read error after exit");
1065 break;
1066 }
1067 Err(_) => break,
1068 }
1069 }
1070
1071 if is_stream_json
1073 && !line_buffer.is_empty()
1074 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
1075 {
1076 if let ClaudeStreamEvent::Result {
1077 duration_ms,
1078 total_cost_usd,
1079 num_turns,
1080 is_error,
1081 } = &event
1082 {
1083 completion = Some(SessionResult {
1084 duration_ms: *duration_ms,
1085 total_cost_usd: *total_cost_usd,
1086 num_turns: *num_turns,
1087 is_error: *is_error,
1088 ..Default::default()
1089 });
1090 }
1091 dispatch_stream_event(event, handler, &mut extracted_text);
1092 } else if is_copilot_stream && !line_buffer.is_empty() {
1093 if let Some(session_result) = handle_copilot_stream_line(
1094 &line_buffer,
1095 handler,
1096 &mut extracted_text,
1097 &mut copilot_state,
1098 ) {
1099 completion = Some(session_result);
1100 }
1101 } else if is_pi_stream
1102 && !line_buffer.is_empty()
1103 && let Some(event) = PiStreamParser::parse_line(&line_buffer)
1104 {
1105 dispatch_pi_stream_event(
1106 event,
1107 handler,
1108 &mut extracted_text,
1109 &mut pi_state,
1110 show_pi_thinking,
1111 );
1112 }
1113
1114 let final_termination = resolve_termination_type(exit_code, termination);
1115
1116 if is_pi_stream {
1118 if is_real_pi_backend {
1119 let stream_provider =
1120 pi_state.stream_provider.as_deref().unwrap_or("unknown");
1121 let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1122 handler.on_text(&format!(
1123 "Pi stream: provider={stream_provider}, model={stream_model}\n"
1124 ));
1125 }
1126 let session_result = SessionResult {
1127 duration_ms: start_time.elapsed().as_millis() as u64,
1128 total_cost_usd: pi_state.total_cost_usd,
1129 num_turns: pi_state.num_turns,
1130 is_error: !status.success(),
1131 input_tokens: pi_state.input_tokens,
1132 output_tokens: pi_state.output_tokens,
1133 cache_read_tokens: pi_state.cache_read_tokens,
1134 cache_write_tokens: pi_state.cache_write_tokens,
1135 };
1136 handler.on_complete(&session_result);
1137 completion = Some(session_result);
1138 }
1139
1140 return Ok(build_result(
1142 &output,
1143 status.success(),
1144 Some(exit_code),
1145 final_termination,
1146 extracted_text,
1147 completion.as_ref(),
1148 ));
1149 }
1150 }
1151
1152 should_terminate.store(true, Ordering::SeqCst);
1153
1154 let status = self
1155 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1156 .await?;
1157
1158 let (success, exit_code, final_termination) = match status {
1159 Some(s) => {
1160 let code = s.exit_code() as i32;
1161 (
1162 s.success(),
1163 Some(code),
1164 resolve_termination_type(code, termination),
1165 )
1166 }
1167 None => {
1168 warn!("Timed out waiting for child to exit after termination");
1169 (false, None, termination)
1170 }
1171 };
1172
1173 if is_pi_stream {
1175 if is_real_pi_backend {
1176 let stream_provider = pi_state.stream_provider.as_deref().unwrap_or("unknown");
1177 let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1178 handler.on_text(&format!(
1179 "Pi stream: provider={stream_provider}, model={stream_model}\n"
1180 ));
1181 }
1182 let session_result = SessionResult {
1183 duration_ms: start_time.elapsed().as_millis() as u64,
1184 total_cost_usd: pi_state.total_cost_usd,
1185 num_turns: pi_state.num_turns,
1186 is_error: !success,
1187 input_tokens: pi_state.input_tokens,
1188 output_tokens: pi_state.output_tokens,
1189 cache_read_tokens: pi_state.cache_read_tokens,
1190 cache_write_tokens: pi_state.cache_write_tokens,
1191 };
1192 handler.on_complete(&session_result);
1193 completion = Some(session_result);
1194 }
1195
1196 Ok(build_result(
1198 &output,
1199 success,
1200 exit_code,
1201 final_termination,
1202 extracted_text,
1203 completion.as_ref(),
1204 ))
1205 }
1206
1207 #[allow(clippy::too_many_lines)] pub async fn run_interactive(
1228 &mut self,
1229 prompt: &str,
1230 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
1231 ) -> io::Result<PtyExecutionResult> {
1232 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
1234
1235 let reader = pair
1236 .master
1237 .try_clone_reader()
1238 .map_err(|e| io::Error::other(e.to_string()))?;
1239 let mut writer = pair
1240 .master
1241 .take_writer()
1242 .map_err(|e| io::Error::other(e.to_string()))?;
1243
1244 let master = pair.master;
1246
1247 drop(pair.slave);
1249
1250 let pending_stdin = stdin_input;
1252
1253 let mut output = Vec::new();
1254 let timeout_duration = if self.config.idle_timeout_secs > 0 {
1255 Some(Duration::from_secs(u64::from(
1256 self.config.idle_timeout_secs,
1257 )))
1258 } else {
1259 None
1260 };
1261
1262 let mut ctrl_c_state = CtrlCState::new();
1263 let mut termination = TerminationType::Natural;
1264 let mut last_activity = Instant::now();
1265
1266 let should_terminate = Arc::new(AtomicBool::new(false));
1268
1269 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
1271 let should_terminate_output = Arc::clone(&should_terminate);
1272 let tui_connected = self.tui_mode;
1274 let tui_output_tx = if tui_connected {
1275 Some(self.output_tx.clone())
1276 } else {
1277 None
1278 };
1279
1280 debug!("Spawning PTY output reader thread");
1281 std::thread::spawn(move || {
1282 debug!("PTY output reader thread started");
1283 let mut reader = reader;
1284 let mut buf = [0u8; 4096];
1285
1286 loop {
1287 if should_terminate_output.load(Ordering::SeqCst) {
1288 debug!("PTY output reader: termination requested");
1289 break;
1290 }
1291
1292 match reader.read(&mut buf) {
1293 Ok(0) => {
1294 debug!("PTY output reader: EOF received");
1296 let _ = output_tx.blocking_send(OutputEvent::Eof);
1297 break;
1298 }
1299 Ok(n) => {
1300 let data = buf[..n].to_vec();
1301 if let Some(ref tx) = tui_output_tx {
1303 let _ = tx.send(data.clone());
1304 }
1305 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
1307 debug!("PTY output reader: channel closed");
1308 break;
1309 }
1310 }
1311 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1312 std::thread::sleep(Duration::from_millis(1));
1314 }
1315 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
1316 }
1318 Err(e) => {
1319 warn!("PTY output reader: error - {}", e);
1320 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
1321 break;
1322 }
1323 }
1324 }
1325 debug!("PTY output reader thread exiting");
1326 });
1327
1328 let mut input_rx = if tui_connected {
1333 debug!("TUI connected - skipping stdin reader thread");
1334 None
1335 } else {
1336 let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
1337 let should_terminate_input = Arc::clone(&should_terminate);
1338
1339 std::thread::spawn(move || {
1340 let mut stdin = io::stdin();
1341 let mut buf = [0u8; 1];
1342
1343 loop {
1344 if should_terminate_input.load(Ordering::SeqCst) {
1345 break;
1346 }
1347
1348 match stdin.read(&mut buf) {
1349 Ok(0) => break, Ok(1) => {
1351 let byte = buf[0];
1352 let event = match byte {
1353 3 => InputEvent::CtrlC, 28 => InputEvent::CtrlBackslash, _ => InputEvent::Data(vec![byte]),
1356 };
1357 if input_tx.send(event).is_err() {
1358 break;
1359 }
1360 }
1361 Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
1363 Err(_) => break,
1364 }
1365 }
1366 });
1367 Some(input_rx)
1368 };
1369
1370 if let Some(ref input) = pending_stdin {
1373 tokio::time::sleep(Duration::from_millis(100)).await;
1374 writer.write_all(input.as_bytes())?;
1375 writer.write_all(b"\n")?;
1376 writer.flush()?;
1377 last_activity = Instant::now();
1378 }
1379
1380 loop {
1383 if let Some(status) = child
1385 .try_wait()
1386 .map_err(|e| io::Error::other(e.to_string()))?
1387 {
1388 let exit_code = status.exit_code() as i32;
1389 debug!(exit_status = ?status, exit_code, "Child process exited");
1390
1391 while let Ok(event) = output_rx.try_recv() {
1393 if let OutputEvent::Data(data) = event {
1394 if !tui_connected {
1395 io::stdout().write_all(&data)?;
1396 io::stdout().flush()?;
1397 }
1398 output.extend_from_slice(&data);
1399 }
1400 }
1401
1402 let drain_deadline = Instant::now() + Duration::from_millis(200);
1405 loop {
1406 let remaining = drain_deadline.saturating_duration_since(Instant::now());
1407 if remaining.is_zero() {
1408 break;
1409 }
1410 match tokio::time::timeout(remaining, output_rx.recv()).await {
1411 Ok(Some(OutputEvent::Data(data))) => {
1412 if !tui_connected {
1413 io::stdout().write_all(&data)?;
1414 io::stdout().flush()?;
1415 }
1416 output.extend_from_slice(&data);
1417 }
1418 Ok(Some(OutputEvent::Eof) | None) => break,
1419 Ok(Some(OutputEvent::Error(e))) => {
1420 debug!(error = %e, "PTY read error after exit");
1421 break;
1422 }
1423 Err(_) => break, }
1425 }
1426
1427 should_terminate.store(true, Ordering::SeqCst);
1428 let _ = self.terminated_tx.send(true);
1430
1431 let final_termination = resolve_termination_type(exit_code, termination);
1432 return Ok(build_result(
1434 &output,
1435 status.success(),
1436 Some(exit_code),
1437 final_termination,
1438 String::new(),
1439 None,
1440 ));
1441 }
1442
1443 let timeout_future = async {
1445 match timeout_duration {
1446 Some(d) => {
1447 let elapsed = last_activity.elapsed();
1448 if elapsed >= d {
1449 tokio::time::sleep(Duration::ZERO).await
1450 } else {
1451 tokio::time::sleep(d.saturating_sub(elapsed)).await
1452 }
1453 }
1454 None => std::future::pending::<()>().await,
1455 }
1456 };
1457
1458 tokio::select! {
1459 output_event = output_rx.recv() => {
1461 match output_event {
1462 Some(OutputEvent::Data(data)) => {
1463 if !tui_connected {
1465 io::stdout().write_all(&data)?;
1466 io::stdout().flush()?;
1467 }
1468 output.extend_from_slice(&data);
1469
1470 last_activity = Instant::now();
1471 }
1472 Some(OutputEvent::Eof) => {
1473 debug!("PTY EOF received");
1474 break;
1475 }
1476 Some(OutputEvent::Error(e)) => {
1477 debug!(error = %e, "PTY read error");
1478 break;
1479 }
1480 None => {
1481 break;
1483 }
1484 }
1485 }
1486
1487 input_event = async {
1489 match input_rx.as_mut() {
1490 Some(rx) => rx.recv().await,
1491 None => std::future::pending().await, }
1493 } => {
1494 match input_event {
1495 Some(InputEvent::CtrlC) => {
1496 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1497 CtrlCAction::ForwardAndStartWindow => {
1498 let _ = writer.write_all(&[3]);
1500 let _ = writer.flush();
1501 last_activity = Instant::now();
1502 }
1503 CtrlCAction::Terminate => {
1504 info!("Double Ctrl+C detected, terminating");
1505 termination = TerminationType::UserInterrupt;
1506 should_terminate.store(true, Ordering::SeqCst);
1507 self.terminate_child(&mut child, true).await?;
1508 break;
1509 }
1510 }
1511 }
1512 Some(InputEvent::CtrlBackslash) => {
1513 info!("Ctrl+\\ detected, force killing");
1514 termination = TerminationType::ForceKill;
1515 should_terminate.store(true, Ordering::SeqCst);
1516 self.terminate_child(&mut child, false).await?;
1517 break;
1518 }
1519 Some(InputEvent::Data(data)) => {
1520 let _ = writer.write_all(&data);
1522 let _ = writer.flush();
1523 last_activity = Instant::now();
1524 }
1525 None => {
1526 debug!("Input channel closed");
1528 }
1529 }
1530 }
1531
1532 tui_input = self.input_rx.recv() => {
1534 if let Some(data) = tui_input {
1535 match InputEvent::from_bytes(data) {
1536 InputEvent::CtrlC => {
1537 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1538 CtrlCAction::ForwardAndStartWindow => {
1539 let _ = writer.write_all(&[3]);
1540 let _ = writer.flush();
1541 last_activity = Instant::now();
1542 }
1543 CtrlCAction::Terminate => {
1544 info!("Double Ctrl+C detected, terminating");
1545 termination = TerminationType::UserInterrupt;
1546 should_terminate.store(true, Ordering::SeqCst);
1547 self.terminate_child(&mut child, true).await?;
1548 break;
1549 }
1550 }
1551 }
1552 InputEvent::CtrlBackslash => {
1553 info!("Ctrl+\\ detected, force killing");
1554 termination = TerminationType::ForceKill;
1555 should_terminate.store(true, Ordering::SeqCst);
1556 self.terminate_child(&mut child, false).await?;
1557 break;
1558 }
1559 InputEvent::Data(bytes) => {
1560 let _ = writer.write_all(&bytes);
1561 let _ = writer.flush();
1562 last_activity = Instant::now();
1563 }
1564 }
1565 }
1566 }
1567
1568 control_cmd = self.control_rx.recv() => {
1570 if let Some(cmd) = control_cmd {
1571 use crate::pty_handle::ControlCommand;
1572 match cmd {
1573 ControlCommand::Kill => {
1574 info!("Control command: Kill");
1575 termination = TerminationType::UserInterrupt;
1576 should_terminate.store(true, Ordering::SeqCst);
1577 self.terminate_child(&mut child, true).await?;
1578 break;
1579 }
1580 ControlCommand::Resize(cols, rows) => {
1581 debug!(cols, rows, "Control command: Resize");
1582 if let Err(e) = master.resize(PtySize {
1584 rows,
1585 cols,
1586 pixel_width: 0,
1587 pixel_height: 0,
1588 }) {
1589 warn!("Failed to resize PTY: {}", e);
1590 }
1591 }
1592 ControlCommand::Skip | ControlCommand::Abort => {
1593 debug!("Control command: {:?} (ignored at PTY level)", cmd);
1595 }
1596 }
1597 }
1598 }
1599
1600 _ = timeout_future => {
1602 warn!(
1603 timeout_secs = self.config.idle_timeout_secs,
1604 "Idle timeout triggered"
1605 );
1606 termination = TerminationType::IdleTimeout;
1607 should_terminate.store(true, Ordering::SeqCst);
1608 self.terminate_child(&mut child, true).await?;
1609 break;
1610 }
1611
1612 _ = interrupt_rx.changed() => {
1614 if *interrupt_rx.borrow() {
1615 debug!("Interrupt received in interactive mode, terminating");
1616 termination = TerminationType::UserInterrupt;
1617 should_terminate.store(true, Ordering::SeqCst);
1618 self.terminate_child(&mut child, true).await?;
1619 break;
1620 }
1621 }
1622 }
1623 }
1624
1625 should_terminate.store(true, Ordering::SeqCst);
1627
1628 let _ = self.terminated_tx.send(true);
1630
1631 let status = self
1633 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1634 .await?;
1635
1636 let (success, exit_code, final_termination) = match status {
1637 Some(s) => {
1638 let code = s.exit_code() as i32;
1639 (
1640 s.success(),
1641 Some(code),
1642 resolve_termination_type(code, termination),
1643 )
1644 }
1645 None => {
1646 warn!("Timed out waiting for child to exit after termination");
1647 (false, None, termination)
1648 }
1649 };
1650
1651 Ok(build_result(
1653 &output,
1654 success,
1655 exit_code,
1656 final_termination,
1657 String::new(),
1658 None,
1659 ))
1660 }
1661
1662 #[allow(clippy::unused_self)] #[allow(clippy::unused_async)] #[cfg(not(unix))]
1673 async fn terminate_child(
1674 &self,
1675 child: &mut Box<dyn portable_pty::Child + Send>,
1676 _graceful: bool,
1677 ) -> io::Result<()> {
1678 child.kill()
1679 }
1680
1681 #[cfg(unix)]
1682 async fn terminate_child(
1683 &self,
1684 child: &mut Box<dyn portable_pty::Child + Send>,
1685 graceful: bool,
1686 ) -> io::Result<()> {
1687 let pid = match child.process_id() {
1688 Some(id) => Pid::from_raw(id as i32),
1689 None => return Ok(()), };
1691
1692 if graceful {
1693 debug!(pid = %pid, "Sending SIGTERM");
1694 let _ = kill(pid, Signal::SIGTERM);
1695
1696 let grace_period = Duration::from_secs(2);
1698 let start = Instant::now();
1699
1700 while start.elapsed() < grace_period {
1701 if child
1702 .try_wait()
1703 .map_err(|e| io::Error::other(e.to_string()))?
1704 .is_some()
1705 {
1706 return Ok(());
1707 }
1708 tokio::time::sleep(Duration::from_millis(50)).await;
1710 }
1711
1712 debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1714 }
1715
1716 debug!(pid = %pid, "Sending SIGKILL");
1717 let _ = kill(pid, Signal::SIGKILL);
1718 Ok(())
1719 }
1720
1721 async fn wait_for_exit(
1726 &self,
1727 child: &mut Box<dyn portable_pty::Child + Send>,
1728 max_wait: Option<Duration>,
1729 interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1730 ) -> io::Result<Option<portable_pty::ExitStatus>> {
1731 let start = Instant::now();
1732
1733 loop {
1734 if let Some(status) = child
1735 .try_wait()
1736 .map_err(|e| io::Error::other(e.to_string()))?
1737 {
1738 return Ok(Some(status));
1739 }
1740
1741 if let Some(max) = max_wait
1742 && start.elapsed() >= max
1743 {
1744 return Ok(None);
1745 }
1746
1747 tokio::select! {
1748 _ = interrupt_rx.changed() => {
1749 if *interrupt_rx.borrow() {
1750 debug!("Interrupt received while waiting for child exit");
1751 return Ok(None);
1752 }
1753 }
1754 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1755 }
1756 }
1757 }
1758}
1759
1760fn handle_copilot_stream_line<H: StreamHandler>(
1761 line: &str,
1762 handler: &mut H,
1763 extracted_text: &mut String,
1764 copilot_state: &mut CopilotStreamState,
1765) -> Option<SessionResult> {
1766 let event = CopilotStreamParser::parse_line(line)?;
1767 dispatch_copilot_stream_event(event, handler, extracted_text, copilot_state)
1768}
1769
1770fn inject_ralph_runtime_env(cmd_builder: &mut CommandBuilder, workspace_root: &std::path::Path) {
1771 let Ok(current_exe) = env::current_exe() else {
1772 return;
1773 };
1774 let Some(bin_dir) = current_exe.parent() else {
1775 return;
1776 };
1777
1778 let mut path_entries = vec![bin_dir.to_path_buf()];
1779 if let Some(existing_path) = env::var_os("PATH") {
1780 path_entries.extend(env::split_paths(&existing_path));
1781 }
1782
1783 if let Ok(joined_path) = env::join_paths(path_entries) {
1784 cmd_builder.env("PATH", joined_path);
1785 }
1786 cmd_builder.env("RALPH_BIN", current_exe);
1787 cmd_builder.env("RALPH_WORKSPACE_ROOT", workspace_root);
1788 if std::path::Path::new("/var/tmp").is_dir() {
1789 cmd_builder.env("TMPDIR", "/var/tmp");
1790 cmd_builder.env("TMP", "/var/tmp");
1791 cmd_builder.env("TEMP", "/var/tmp");
1792 }
1793}
1794
1795#[derive(Debug)]
1797enum InputEvent {
1798 CtrlC,
1800 CtrlBackslash,
1802 Data(Vec<u8>),
1804}
1805
1806impl InputEvent {
1807 fn from_bytes(data: Vec<u8>) -> Self {
1809 if data.len() == 1 {
1810 match data[0] {
1811 3 => return InputEvent::CtrlC,
1812 28 => return InputEvent::CtrlBackslash,
1813 _ => {}
1814 }
1815 }
1816 InputEvent::Data(data)
1817 }
1818}
1819
1820#[derive(Debug)]
1822enum OutputEvent {
1823 Data(Vec<u8>),
1825 Eof,
1827 Error(String),
1829}
1830
1831fn strip_ansi(bytes: &[u8]) -> String {
1837 let stripped = strip_ansi_escapes::strip(bytes);
1838 String::from_utf8_lossy(&stripped).into_owned()
1839}
1840
1841fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1845 if exit_code == 130 {
1846 info!("Child process killed by SIGINT");
1847 TerminationType::UserInterrupt
1848 } else {
1849 default
1850 }
1851}
1852
1853fn extract_cli_flag_value(args: &[String], long_flag: &str, short_flag: &str) -> Option<String> {
1854 for (i, arg) in args.iter().enumerate() {
1855 if arg == long_flag || arg == short_flag {
1856 if let Some(value) = args.get(i + 1)
1857 && !value.starts_with('-')
1858 {
1859 return Some(value.clone());
1860 }
1861 continue;
1862 }
1863
1864 if let Some(value) = arg.strip_prefix(&format!("{long_flag}="))
1865 && !value.is_empty()
1866 {
1867 return Some(value.to_string());
1868 }
1869
1870 if let Some(value) = arg.strip_prefix(&format!("{short_flag}="))
1871 && !value.is_empty()
1872 {
1873 return Some(value.to_string());
1874 }
1875 }
1876
1877 None
1878}
1879
1880fn dispatch_stream_event<H: StreamHandler>(
1883 event: ClaudeStreamEvent,
1884 handler: &mut H,
1885 extracted_text: &mut String,
1886) {
1887 match event {
1888 ClaudeStreamEvent::System { .. } => {
1889 }
1891 ClaudeStreamEvent::Assistant { message, .. } => {
1892 for block in message.content {
1893 match block {
1894 ContentBlock::Text { text } => {
1895 handler.on_text(&text);
1896 extracted_text.push_str(&text);
1898 extracted_text.push('\n');
1899 }
1900 ContentBlock::ToolUse { name, id, input } => {
1901 handler.on_tool_call(&name, &id, &input)
1902 }
1903 }
1904 }
1905 }
1906 ClaudeStreamEvent::User { message } => {
1907 for block in message.content {
1908 match block {
1909 UserContentBlock::ToolResult {
1910 tool_use_id,
1911 content,
1912 } => {
1913 handler.on_tool_result(&tool_use_id, &content);
1914 }
1915 }
1916 }
1917 }
1918 ClaudeStreamEvent::Result {
1919 duration_ms,
1920 total_cost_usd,
1921 num_turns,
1922 is_error,
1923 } => {
1924 if is_error {
1925 handler.on_error("Session ended with error");
1926 }
1927 handler.on_complete(&SessionResult {
1928 duration_ms,
1929 total_cost_usd,
1930 num_turns,
1931 is_error,
1932 ..Default::default()
1933 });
1934 }
1935 }
1936}
1937
1938fn build_result(
1947 output: &[u8],
1948 success: bool,
1949 exit_code: Option<i32>,
1950 termination: TerminationType,
1951 extracted_text: String,
1952 session_result: Option<&SessionResult>,
1953) -> PtyExecutionResult {
1954 let (total_cost_usd, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens) =
1955 if let Some(result) = session_result {
1956 (
1957 result.total_cost_usd,
1958 result.input_tokens,
1959 result.output_tokens,
1960 result.cache_read_tokens,
1961 result.cache_write_tokens,
1962 )
1963 } else {
1964 (0.0, 0, 0, 0, 0)
1965 };
1966
1967 PtyExecutionResult {
1968 output: String::from_utf8_lossy(output).to_string(),
1969 stripped_output: strip_ansi(output),
1970 extracted_text,
1971 success,
1972 exit_code,
1973 termination,
1974 total_cost_usd,
1975 input_tokens,
1976 output_tokens,
1977 cache_read_tokens,
1978 cache_write_tokens,
1979 }
1980}
1981
1982#[cfg(test)]
1983mod tests {
1984 use super::*;
1985 use crate::claude_stream::{AssistantMessage, UserMessage};
1986 #[cfg(unix)]
1987 use crate::cli_backend::PromptMode;
1988 use crate::stream_handler::{SessionResult, StreamHandler};
1989 #[cfg(unix)]
1990 use tempfile::TempDir;
1991
1992 #[test]
1993 fn test_double_ctrl_c_within_window() {
1994 let mut state = CtrlCState::new();
1995 let now = Instant::now();
1996
1997 let action = state.handle_ctrl_c(now);
1999 assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
2000
2001 let later = now + Duration::from_millis(500);
2003 let action = state.handle_ctrl_c(later);
2004 assert_eq!(action, CtrlCAction::Terminate);
2005 }
2006
2007 #[test]
2008 fn test_input_event_from_bytes_ctrl_c() {
2009 let event = InputEvent::from_bytes(vec![3]);
2010 assert!(matches!(event, InputEvent::CtrlC));
2011 }
2012
2013 #[test]
2014 fn test_input_event_from_bytes_ctrl_backslash() {
2015 let event = InputEvent::from_bytes(vec![28]);
2016 assert!(matches!(event, InputEvent::CtrlBackslash));
2017 }
2018
2019 #[test]
2020 fn test_input_event_from_bytes_data() {
2021 let event = InputEvent::from_bytes(vec![b'a']);
2022 assert!(matches!(event, InputEvent::Data(_)));
2023
2024 let event = InputEvent::from_bytes(vec![1, 2, 3]);
2025 assert!(matches!(event, InputEvent::Data(_)));
2026 }
2027
2028 #[test]
2029 fn test_ctrl_c_window_expires() {
2030 let mut state = CtrlCState::new();
2031 let now = Instant::now();
2032
2033 state.handle_ctrl_c(now);
2035
2036 let later = now + Duration::from_secs(2);
2038
2039 let action = state.handle_ctrl_c(later);
2041 assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
2042 }
2043
2044 #[test]
2045 fn test_strip_ansi_basic() {
2046 let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n";
2047 let stripped = strip_ansi(input);
2048 assert!(stripped.contains("Thinking..."));
2049 assert!(!stripped.contains("\x1b["));
2050 }
2051
2052 #[test]
2053 fn test_completion_promise_extraction() {
2054 let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n\
2056 \x1b[2K\x1b[1;32m Done!\x1b[0m\r\n\
2057 \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";
2058
2059 let stripped = strip_ansi(input);
2060
2061 assert!(stripped.contains("LOOP_COMPLETE"));
2063 assert!(!stripped.contains("\x1b["));
2064 }
2065
2066 #[test]
2067 fn test_event_tag_extraction() {
2068 let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
2070 Task completed successfully\r\n\
2071 \x1b[90m</event>\x1b[0m\r\n";
2072
2073 let stripped = strip_ansi(input);
2074
2075 assert!(stripped.contains("<event topic=\"build.done\">"));
2076 assert!(stripped.contains("</event>"));
2077 }
2078
2079 #[test]
2080 fn test_large_output_preserves_early_events() {
2081 let mut input = Vec::new();
2083
2084 input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");
2086
2087 for i in 0..500 {
2089 input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
2090 }
2091
2092 let stripped = strip_ansi(&input);
2093
2094 assert!(
2096 stripped.contains("<event topic=\"build.task\">"),
2097 "Event tag was lost - strip_ansi is not preserving all content"
2098 );
2099 assert!(stripped.contains("Implement feature X"));
2100 assert!(stripped.contains("Line 499")); }
2102
2103 #[test]
2104 fn test_pty_config_defaults() {
2105 let config = PtyConfig::default();
2106 assert!(config.interactive);
2107 assert_eq!(config.idle_timeout_secs, 30);
2108 assert_eq!(config.cols, 80);
2109 assert_eq!(config.rows, 24);
2110 }
2111
2112 #[test]
2113 fn test_pty_config_from_env_matches_env_or_defaults() {
2114 let cols = std::env::var("COLUMNS")
2115 .ok()
2116 .and_then(|value| value.parse::<u16>().ok())
2117 .unwrap_or(80);
2118 let rows = std::env::var("LINES")
2119 .ok()
2120 .and_then(|value| value.parse::<u16>().ok())
2121 .unwrap_or(24);
2122
2123 let config = PtyConfig::from_env();
2124 assert_eq!(config.cols, cols);
2125 assert_eq!(config.rows, rows);
2126 }
2127
2128 #[test]
2136 fn test_idle_timeout_reset_logic() {
2137 let timeout_duration = Duration::from_secs(30);
2139
2140 let simulated_25s = Duration::from_secs(25);
2142
2143 let remaining = timeout_duration.saturating_sub(simulated_25s);
2145 assert_eq!(remaining.as_secs(), 5);
2146
2147 let last_activity_after_reset = Instant::now();
2149
2150 let elapsed = last_activity_after_reset.elapsed();
2152 assert!(elapsed < Duration::from_millis(100)); let new_remaining = timeout_duration.saturating_sub(elapsed);
2156 assert!(new_remaining > Duration::from_secs(29)); }
2158
2159 #[test]
2160 fn test_extracted_text_field_exists() {
2161 let result = PtyExecutionResult {
2164 output: String::new(),
2165 stripped_output: String::new(),
2166 extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
2167 success: true,
2168 exit_code: Some(0),
2169 termination: TerminationType::Natural,
2170 total_cost_usd: 0.0,
2171 input_tokens: 0,
2172 output_tokens: 0,
2173 cache_read_tokens: 0,
2174 cache_write_tokens: 0,
2175 };
2176
2177 assert!(
2178 result
2179 .extracted_text
2180 .contains("<event topic=\"build.done\">")
2181 );
2182 }
2183
2184 #[test]
2185 fn test_build_result_includes_extracted_text() {
2186 let output = b"raw output";
2188 let extracted = "extracted text with <event topic=\"test\">payload</event>";
2189 let result = build_result(
2190 output,
2191 true,
2192 Some(0),
2193 TerminationType::Natural,
2194 extracted.to_string(),
2195 None,
2196 );
2197
2198 assert_eq!(result.extracted_text, extracted);
2199 assert!(result.stripped_output.contains("raw output"));
2200 }
2201
2202 #[test]
2203 fn test_resolve_termination_type_handles_sigint_exit_code() {
2204 let termination = resolve_termination_type(130, TerminationType::Natural);
2205 assert_eq!(termination, TerminationType::UserInterrupt);
2206
2207 let termination = resolve_termination_type(0, TerminationType::ForceKill);
2208 assert_eq!(termination, TerminationType::ForceKill);
2209 }
2210
2211 #[test]
2212 fn test_extract_cli_flag_value_supports_split_and_equals_syntax() {
2213 let args = vec![
2214 "--provider".to_string(),
2215 "anthropic".to_string(),
2216 "--model=claude-sonnet-4".to_string(),
2217 ];
2218
2219 assert_eq!(
2220 extract_cli_flag_value(&args, "--provider", "-p"),
2221 Some("anthropic".to_string())
2222 );
2223 assert_eq!(
2224 extract_cli_flag_value(&args, "--model", "-m"),
2225 Some("claude-sonnet-4".to_string())
2226 );
2227 assert_eq!(extract_cli_flag_value(&args, "--foo", "-f"), None);
2228 }
2229
2230 #[derive(Default)]
2231 struct CapturingHandler {
2232 texts: Vec<String>,
2233 tool_calls: Vec<(String, String, serde_json::Value)>,
2234 tool_results: Vec<(String, String)>,
2235 errors: Vec<String>,
2236 completions: Vec<SessionResult>,
2237 }
2238
2239 impl StreamHandler for CapturingHandler {
2240 fn on_text(&mut self, text: &str) {
2241 self.texts.push(text.to_string());
2242 }
2243
2244 fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
2245 self.tool_calls
2246 .push((name.to_string(), id.to_string(), input.clone()));
2247 }
2248
2249 fn on_tool_result(&mut self, id: &str, output: &str) {
2250 self.tool_results.push((id.to_string(), output.to_string()));
2251 }
2252
2253 fn on_error(&mut self, error: &str) {
2254 self.errors.push(error.to_string());
2255 }
2256
2257 fn on_complete(&mut self, result: &SessionResult) {
2258 self.completions.push(result.clone());
2259 }
2260 }
2261
2262 #[test]
2263 fn test_dispatch_stream_event_routes_text_and_tool_calls() {
2264 let mut handler = CapturingHandler::default();
2265 let mut extracted_text = String::new();
2266
2267 let event = ClaudeStreamEvent::Assistant {
2268 message: AssistantMessage {
2269 content: vec![
2270 ContentBlock::Text {
2271 text: "Hello".to_string(),
2272 },
2273 ContentBlock::ToolUse {
2274 id: "tool-1".to_string(),
2275 name: "Read".to_string(),
2276 input: serde_json::json!({"path": "README.md"}),
2277 },
2278 ],
2279 },
2280 usage: None,
2281 };
2282
2283 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2284
2285 assert_eq!(handler.texts, vec!["Hello".to_string()]);
2286 assert_eq!(handler.tool_calls.len(), 1);
2287 assert!(extracted_text.contains("Hello"));
2288 assert!(extracted_text.ends_with('\n'));
2289 }
2290
2291 #[test]
2292 fn test_dispatch_stream_event_routes_tool_results_and_completion() {
2293 let mut handler = CapturingHandler::default();
2294 let mut extracted_text = String::new();
2295
2296 let event = ClaudeStreamEvent::User {
2297 message: UserMessage {
2298 content: vec![UserContentBlock::ToolResult {
2299 tool_use_id: "tool-1".to_string(),
2300 content: "done".to_string(),
2301 }],
2302 },
2303 };
2304
2305 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2306 assert_eq!(handler.tool_results.len(), 1);
2307 assert_eq!(handler.tool_results[0].0, "tool-1");
2308 assert_eq!(handler.tool_results[0].1, "done");
2309
2310 let event = ClaudeStreamEvent::Result {
2311 duration_ms: 12,
2312 total_cost_usd: 0.01,
2313 num_turns: 2,
2314 is_error: true,
2315 };
2316
2317 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2318 assert_eq!(handler.errors.len(), 1);
2319 assert_eq!(handler.completions.len(), 1);
2320 assert!(handler.completions[0].is_error);
2321 }
2322
2323 #[test]
2324 fn test_dispatch_stream_event_system_noop() {
2325 let mut handler = CapturingHandler::default();
2326 let mut extracted_text = String::new();
2327
2328 let event = ClaudeStreamEvent::System {
2329 session_id: "session-1".to_string(),
2330 model: "claude-test".to_string(),
2331 tools: Vec::new(),
2332 };
2333
2334 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2335
2336 assert!(handler.texts.is_empty());
2337 assert!(handler.tool_calls.is_empty());
2338 assert!(handler.tool_results.is_empty());
2339 assert!(handler.errors.is_empty());
2340 assert!(handler.completions.is_empty());
2341 assert!(extracted_text.is_empty());
2342 }
2343
2344 #[test]
2359 fn test_tui_mode_stdin_reader_bypass() {
2360 let tui_mode = true;
2366 let tui_connected = tui_mode;
2367
2368 assert!(
2371 tui_connected,
2372 "When tui_mode is true, stdin reader must be skipped"
2373 );
2374
2375 let tui_mode_disabled = false;
2377 let tui_connected_non_tui = tui_mode_disabled;
2378 assert!(
2379 !tui_connected_non_tui,
2380 "When tui_mode is false, stdin reader must be spawned"
2381 );
2382 }
2383
2384 #[test]
2385 fn test_tui_mode_default_is_false() {
2386 let backend = CliBackend::claude();
2388 let config = PtyConfig::default();
2389 let executor = PtyExecutor::new(backend, config);
2390
2391 assert!(!executor.tui_mode, "tui_mode should default to false");
2393 }
2394
2395 #[test]
2396 fn test_set_tui_mode() {
2397 let backend = CliBackend::claude();
2399 let config = PtyConfig::default();
2400 let mut executor = PtyExecutor::new(backend, config);
2401
2402 assert!(!executor.tui_mode, "tui_mode should start as false");
2404
2405 executor.set_tui_mode(true);
2407 assert!(
2408 executor.tui_mode,
2409 "tui_mode should be true after set_tui_mode(true)"
2410 );
2411
2412 executor.set_tui_mode(false);
2414 assert!(
2415 !executor.tui_mode,
2416 "tui_mode should be false after set_tui_mode(false)"
2417 );
2418 }
2419
2420 #[test]
2421 fn test_build_result_populates_fields() {
2422 let output = b"\x1b[31mHello\x1b[0m\n";
2423 let extracted = "extracted text".to_string();
2424
2425 let result = build_result(
2426 output,
2427 true,
2428 Some(0),
2429 TerminationType::Natural,
2430 extracted.clone(),
2431 None,
2432 );
2433
2434 assert_eq!(result.output, String::from_utf8_lossy(output));
2435 assert!(result.stripped_output.contains("Hello"));
2436 assert!(!result.stripped_output.contains("\x1b["));
2437 assert_eq!(result.extracted_text, extracted);
2438 assert!(result.success);
2439 assert_eq!(result.exit_code, Some(0));
2440 assert_eq!(result.termination, TerminationType::Natural);
2441 }
2442
2443 #[cfg(unix)]
2444 #[tokio::test]
2445 async fn test_run_observe_executes_arg_prompt() {
2446 let temp_dir = TempDir::new().expect("temp dir");
2447 let backend = CliBackend {
2448 command: "sh".to_string(),
2449 args: vec!["-c".to_string()],
2450 prompt_mode: PromptMode::Arg,
2451 prompt_flag: None,
2452 output_format: OutputFormat::Text,
2453 env_vars: vec![],
2454 };
2455 let config = PtyConfig {
2456 interactive: false,
2457 idle_timeout_secs: 0,
2458 cols: 80,
2459 rows: 24,
2460 workspace_root: temp_dir.path().to_path_buf(),
2461 };
2462 let executor = PtyExecutor::new(backend, config);
2463 let (_tx, rx) = tokio::sync::watch::channel(false);
2464
2465 let result = executor
2466 .run_observe("echo hello-pty", rx)
2467 .await
2468 .expect("run_observe");
2469
2470 assert!(result.success);
2471 assert!(result.output.contains("hello-pty"));
2472 assert!(result.stripped_output.contains("hello-pty"));
2473 assert_eq!(result.exit_code, Some(0));
2474 assert_eq!(result.termination, TerminationType::Natural);
2475 }
2476
2477 #[cfg(unix)]
2478 #[tokio::test]
2479 async fn test_run_observe_writes_stdin_prompt() {
2480 let temp_dir = TempDir::new().expect("temp dir");
2481 let backend = CliBackend {
2482 command: "sh".to_string(),
2483 args: vec!["-c".to_string(), "read line; echo \"$line\"".to_string()],
2484 prompt_mode: PromptMode::Stdin,
2485 prompt_flag: None,
2486 output_format: OutputFormat::Text,
2487 env_vars: vec![],
2488 };
2489 let config = PtyConfig {
2490 interactive: false,
2491 idle_timeout_secs: 0,
2492 cols: 80,
2493 rows: 24,
2494 workspace_root: temp_dir.path().to_path_buf(),
2495 };
2496 let executor = PtyExecutor::new(backend, config);
2497 let (_tx, rx) = tokio::sync::watch::channel(false);
2498
2499 let result = executor
2500 .run_observe("stdin-line", rx)
2501 .await
2502 .expect("run_observe");
2503
2504 assert!(result.success);
2505 assert!(result.output.contains("stdin-line"));
2506 assert!(result.stripped_output.contains("stdin-line"));
2507 assert_eq!(result.termination, TerminationType::Natural);
2508 }
2509
2510 #[cfg(unix)]
2516 #[tokio::test]
2517 async fn test_pty_converts_stdin_to_arg_for_large_prompt() {
2518 let _temp_dir = TempDir::new().expect("temp dir");
2519 let backend = CliBackend {
2520 command: "echo".to_string(),
2521 args: vec![],
2522 prompt_mode: PromptMode::Stdin,
2523 prompt_flag: Some("-p".to_string()),
2524 output_format: OutputFormat::Text,
2525 env_vars: vec![],
2526 };
2527
2528 let large_prompt = "x".repeat(32_000);
2530 let (cmd, args, stdin_input, temp_file) = backend.build_command_pty(&large_prompt);
2531 assert_eq!(cmd, "echo");
2532 assert!(stdin_input.is_none(), "PTY mode should not use stdin");
2534 assert!(temp_file.is_some(), "Large prompt should use temp file");
2536 assert!(
2538 args.iter().any(|a| a.contains("Please read and execute")),
2539 "args should contain temp file instruction: {:?}",
2540 args
2541 );
2542
2543 let small_prompt = "hello world";
2545 let (_, args, stdin_input, temp_file) = backend.build_command_pty(small_prompt);
2546 assert!(stdin_input.is_none());
2547 assert!(temp_file.is_none());
2548 assert!(args.iter().any(|a| a == small_prompt));
2549 }
2550
2551 #[cfg(unix)]
2554 #[tokio::test]
2555 async fn test_run_observe_large_stdin_backend_does_not_deadlock() {
2556 let temp_dir = TempDir::new().expect("temp dir");
2557 let backend = CliBackend {
2560 command: "echo".to_string(),
2561 args: vec![],
2562 prompt_mode: PromptMode::Stdin,
2563 prompt_flag: Some("-p".to_string()),
2564 output_format: OutputFormat::Text,
2565 env_vars: vec![],
2566 };
2567 let config = PtyConfig {
2568 interactive: false,
2569 idle_timeout_secs: 0,
2570 cols: 32768,
2571 rows: 24,
2572 workspace_root: temp_dir.path().to_path_buf(),
2573 };
2574 let executor = PtyExecutor::new(backend, config);
2575 let (_tx, rx) = tokio::sync::watch::channel(false);
2576
2577 let large_prompt = "x".repeat(32_000);
2578
2579 let result = tokio::time::timeout(
2581 std::time::Duration::from_secs(5),
2582 executor.run_observe(&large_prompt, rx),
2583 )
2584 .await
2585 .expect("should not deadlock")
2586 .expect("run_observe");
2587
2588 assert!(result.success);
2589 assert!(
2591 result.output.contains("Please read and execute"),
2592 "output should contain temp file instruction: {}",
2593 &result.output[..result.output.len().min(200)]
2594 );
2595 }
2596
2597 #[cfg(unix)]
2598 #[tokio::test]
2599 async fn test_run_observe_streaming_text_routes_output() {
2600 let temp_dir = TempDir::new().expect("temp dir");
2601 let backend = CliBackend {
2602 command: "sh".to_string(),
2603 args: vec!["-c".to_string()],
2604 prompt_mode: PromptMode::Arg,
2605 prompt_flag: None,
2606 output_format: OutputFormat::Text,
2607 env_vars: vec![],
2608 };
2609 let config = PtyConfig {
2610 interactive: false,
2611 idle_timeout_secs: 0,
2612 cols: 80,
2613 rows: 24,
2614 workspace_root: temp_dir.path().to_path_buf(),
2615 };
2616 let executor = PtyExecutor::new(backend, config);
2617 let (_tx, rx) = tokio::sync::watch::channel(false);
2618 let mut handler = CapturingHandler::default();
2619
2620 let result = executor
2621 .run_observe_streaming("printf 'alpha\\nbeta\\n'", rx, &mut handler)
2622 .await
2623 .expect("run_observe_streaming");
2624
2625 assert!(result.success);
2626 let captured = handler.texts.join("");
2627 assert!(captured.contains("alpha"), "captured: {captured}");
2628 assert!(captured.contains("beta"), "captured: {captured}");
2629 assert!(handler.completions.is_empty());
2630 assert!(result.extracted_text.is_empty());
2631 }
2632
2633 #[cfg(unix)]
2634 #[tokio::test]
2635 async fn test_run_observe_streaming_parses_stream_json() {
2636 let temp_dir = TempDir::new().expect("temp dir");
2637 let backend = CliBackend {
2638 command: "sh".to_string(),
2639 args: vec!["-c".to_string()],
2640 prompt_mode: PromptMode::Arg,
2641 prompt_flag: None,
2642 output_format: OutputFormat::StreamJson,
2643 env_vars: vec![],
2644 };
2645 let config = PtyConfig {
2646 interactive: false,
2647 idle_timeout_secs: 0,
2648 cols: 80,
2649 rows: 24,
2650 workspace_root: temp_dir.path().to_path_buf(),
2651 };
2652 let executor = PtyExecutor::new(backend, config);
2653 let (_tx, rx) = tokio::sync::watch::channel(false);
2654 let mut handler = CapturingHandler::default();
2655
2656 let script = r#"printf '%s\n' '{"type":"assistant","message":{"content":[{"type":"text","text":"Hello stream"}]}}' '{"type":"result","duration_ms":1,"total_cost_usd":0.0,"num_turns":1,"is_error":false}'"#;
2657 let result = executor
2658 .run_observe_streaming(script, rx, &mut handler)
2659 .await
2660 .expect("run_observe_streaming");
2661
2662 assert!(result.success);
2663 assert!(
2664 handler
2665 .texts
2666 .iter()
2667 .any(|text| text.contains("Hello stream"))
2668 );
2669 assert_eq!(handler.completions.len(), 1);
2670 assert!(result.extracted_text.contains("Hello stream"));
2671 assert_eq!(result.termination, TerminationType::Natural);
2672 }
2673
2674 #[cfg(unix)]
2675 #[tokio::test]
2676 async fn test_run_interactive_in_tui_mode() {
2677 let temp_dir = TempDir::new().expect("temp dir");
2678 let backend = CliBackend {
2679 command: "sh".to_string(),
2680 args: vec!["-c".to_string()],
2681 prompt_mode: PromptMode::Arg,
2682 prompt_flag: None,
2683 output_format: OutputFormat::Text,
2684 env_vars: vec![],
2685 };
2686 let config = PtyConfig {
2687 interactive: true,
2688 idle_timeout_secs: 0,
2689 cols: 80,
2690 rows: 24,
2691 workspace_root: temp_dir.path().to_path_buf(),
2692 };
2693 let mut executor = PtyExecutor::new(backend, config);
2694 executor.set_tui_mode(true);
2695 let (_tx, rx) = tokio::sync::watch::channel(false);
2696
2697 let result = executor
2698 .run_interactive("echo hello-tui", rx)
2699 .await
2700 .expect("run_interactive");
2701
2702 assert!(result.success);
2703 assert!(result.output.contains("hello-tui"));
2704 assert!(result.stripped_output.contains("hello-tui"));
2705 assert_eq!(result.exit_code, Some(0));
2706 assert_eq!(result.termination, TerminationType::Natural);
2707 }
2708}