1#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::copilot_stream::{
24 CopilotStreamParser, CopilotStreamState, dispatch_copilot_stream_event,
25};
26use crate::pi_stream::{PiSessionState, PiStreamParser, dispatch_pi_stream_event};
27use crate::stream_handler::{SessionResult, StreamHandler};
28#[cfg(unix)]
29use nix::sys::signal::{Signal, kill};
30#[cfg(unix)]
31use nix::unistd::Pid;
32use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
33use std::env;
34use std::io::{self, Read, Write};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::{Duration, Instant};
38use tokio::sync::{mpsc, watch};
39use tracing::{debug, info, warn};
40
/// Outcome of one PTY-backed command execution.
///
/// Values of this type are returned by the `PtyExecutor::run_*` methods.
/// NOTE(review): the struct is assembled by `build_result`, which is not
/// visible from here; field descriptions marked "presumably" should be
/// confirmed against that helper.
#[derive(Debug)]
pub struct PtyExecutionResult {
    /// Captured PTY output (presumably a lossy UTF-8 rendering of the raw bytes).
    pub output: String,
    /// Presumably `output` with terminal escape sequences removed — TODO confirm.
    pub stripped_output: String,
    /// Text collected by the stream-event dispatchers in streaming mode; the
    /// non-streaming runners pass an empty string for it.
    pub extracted_text: String,
    /// Whether the child's exit status reported success (`false` when the
    /// exit status could not be obtained).
    pub success: bool,
    /// Exit code of the child, or `None` when waiting for the exit timed out.
    pub exit_code: Option<i32>,
    /// How the run ended.
    pub termination: TerminationType,
    /// Session cost in USD (presumably copied from the session result, when one exists).
    pub total_cost_usd: f64,
    /// Input token count (presumably copied from the session result).
    pub input_tokens: u64,
    /// Output token count (presumably copied from the session result).
    pub output_tokens: u64,
    /// Cache-read token count (presumably copied from the session result).
    pub cache_read_tokens: u64,
    /// Cache-write token count (presumably copied from the session result).
    pub cache_write_tokens: u64,
}
71
/// Reason a PTY execution ended.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationType {
    /// Initial state of every run: nothing in the executor ended the process early.
    Natural,
    /// The executor terminated the child because no output arrived within
    /// the configured idle timeout.
    IdleTimeout,
    /// The run was ended in response to a user interrupt.
    UserInterrupt,
    /// NOTE(review): presumably set when the child had to be killed
    /// forcibly; the code assigning it is not visible here — confirm.
    ForceKill,
}
84
/// Settings that control how a command is run inside a pseudo-terminal.
#[derive(Debug, Clone)]
pub struct PtyConfig {
    /// Whether the backend command is built in interactive mode. The observe
    /// runners only apply the idle timeout when this is `true`.
    pub interactive: bool,
    /// Seconds without output before the child is terminated; `0` disables
    /// the idle timeout.
    pub idle_timeout_secs: u32,
    /// Width of the PTY in character cells.
    pub cols: u16,
    /// Height of the PTY in character cells.
    pub rows: u16,
    /// Working directory for the spawned command.
    pub workspace_root: std::path::PathBuf,
}

impl Default for PtyConfig {
    /// Interactive, 30 s idle timeout, 80x24 terminal, rooted at the current
    /// directory (or `"."` when the current directory cannot be determined).
    fn default() -> Self {
        Self {
            interactive: true,
            idle_timeout_secs: 30,
            cols: 80,
            rows: 24,
            workspace_root: std::env::current_dir()
                .unwrap_or_else(|_| std::path::PathBuf::from(".")),
        }
    }
}

impl PtyConfig {
    /// Builds a configuration whose terminal size comes from the `COLUMNS`
    /// and `LINES` environment variables.
    ///
    /// A variable that is unset, not a valid `u16`, or zero falls back to the
    /// default (80 columns / 24 rows); a zero-sized PTY is never requested.
    /// All other fields take their [`Default`] values.
    pub fn from_env() -> Self {
        let cols = Self::dimension_from_env("COLUMNS", 80);
        let rows = Self::dimension_from_env("LINES", 24);

        Self {
            cols,
            rows,
            ..Default::default()
        }
    }

    /// Reads one terminal dimension from `var`, returning `fallback` when the
    /// value is missing, unparsable, or zero.
    fn dimension_from_env(var: &str, fallback: u16) -> u16 {
        std::env::var(var)
            .ok()
            // Tolerate surrounding whitespace such as a trailing newline.
            .and_then(|raw| raw.trim().parse::<u16>().ok())
            .filter(|&cells| cells > 0)
            .unwrap_or(fallback)
    }

    /// Returns the configuration with `workspace_root` replaced by `root`.
    pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
        self.workspace_root = root.into();
        self
    }
}
140
/// Tracks Ctrl+C presses so that two presses in quick succession can be
/// told apart from a single one.
#[derive(Debug)]
pub struct CtrlCState {
    /// Time of the press that opened the current window, if any.
    first_press: Option<Instant>,
    /// Maximum gap between two presses for them to count as a double press.
    window: Duration,
}

/// What the caller should do in response to a Ctrl+C press.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CtrlCAction {
    /// First press (or the previous window expired): a new window starts now.
    ForwardAndStartWindow,
    /// Second press arrived inside the window.
    Terminate,
}

impl CtrlCState {
    /// Creates a tracker with no pending press and a one-second window.
    pub fn new() -> Self {
        let window = Duration::from_secs(1);
        Self {
            first_press: None,
            window,
        }
    }

    /// Registers a Ctrl+C press observed at `now` and reports the action to take.
    ///
    /// A press strictly less than the window after the recorded one yields
    /// [`CtrlCAction::Terminate`] and clears the recorded press. Any other
    /// press records `now` and yields [`CtrlCAction::ForwardAndStartWindow`].
    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
        let is_second_press = self
            .first_press
            .map_or(false, |opened_at| now.duration_since(opened_at) < self.window);

        if is_second_press {
            self.first_press = None;
            return CtrlCAction::Terminate;
        }

        self.first_press = Some(now);
        CtrlCAction::ForwardAndStartWindow
    }
}

impl Default for CtrlCState {
    /// Equivalent to [`CtrlCState::new`].
    fn default() -> Self {
        Self::new()
    }
}
190
/// Runs a CLI backend inside a pseudo-terminal.
///
/// The executor owns both ends of several channels; the halves stored as
/// `Option` are handed out once through `handle()`.
pub struct PtyExecutor {
    /// Backend whose command line is built and spawned.
    backend: CliBackend,
    /// Terminal size, timeout and working-directory settings.
    config: PtyConfig,
    /// Sender onto which reader threads forward raw PTY output chunks when
    /// TUI mode is enabled.
    output_tx: mpsc::UnboundedSender<Vec<u8>>,
    /// Receiving half of `output_tx`; taken by `handle()`.
    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
    /// Sending half of the input channel; taken by `handle()`.
    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
    /// Receiving half of the input channel.
    /// NOTE(review): presumably drained by the interactive runner — confirm.
    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    /// Sending half of the control channel; taken by `handle()`.
    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
    /// Receiving half of the control channel.
    /// NOTE(review): presumably drained by the interactive runner — confirm.
    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
    /// Set to `true` by the interactive runner once the child has exited.
    terminated_tx: watch::Sender<bool>,
    /// Receiving half of `terminated_tx`; taken by `handle()`.
    terminated_rx: Option<watch::Receiver<bool>>,
    /// When `true`, output is forwarded over `output_tx` instead of being
    /// echoed to stdout.
    tui_mode: bool,
}
210
211impl PtyExecutor {
212 pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
214 let (output_tx, output_rx) = mpsc::unbounded_channel();
215 let (input_tx, input_rx) = mpsc::unbounded_channel();
216 let (control_tx, control_rx) = mpsc::unbounded_channel();
217 let (terminated_tx, terminated_rx) = watch::channel(false);
218
219 Self {
220 backend,
221 config,
222 output_tx,
223 output_rx: Some(output_rx),
224 input_tx: Some(input_tx),
225 input_rx,
226 control_tx: Some(control_tx),
227 control_rx,
228 terminated_tx,
229 terminated_rx: Some(terminated_rx),
230 tui_mode: false,
231 }
232 }
233
/// Enables or disables TUI mode.
///
/// While enabled, the runners forward PTY output chunks over the executor's
/// output channel and do not echo them to stdout.
pub fn set_tui_mode(&mut self, enabled: bool) {
    self.tui_mode = enabled;
}
245
/// Replaces the backend; subsequently spawned commands are built from it.
pub fn set_backend(&mut self, backend: CliBackend) {
    self.backend = backend;
}
256
257 pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
261 crate::pty_handle::PtyHandle {
262 output_rx: self.output_rx.take().expect("handle() already called"),
263 input_tx: self.input_tx.take().expect("handle() already called"),
264 control_tx: self.control_tx.take().expect("handle() already called"),
265 terminated_rx: self.terminated_rx.take().expect("handle() already called"),
266 }
267 }
268
269 fn spawn_pty(
278 &self,
279 prompt: &str,
280 ) -> io::Result<(
281 PtyPair,
282 Box<dyn portable_pty::Child + Send>,
283 Option<String>,
284 Option<tempfile::NamedTempFile>,
285 )> {
286 let pty_system = native_pty_system();
287
288 let pair = pty_system
289 .openpty(PtySize {
290 rows: self.config.rows,
291 cols: self.config.cols,
292 pixel_width: 0,
293 pixel_height: 0,
294 })
295 .map_err(|e| io::Error::other(e.to_string()))?;
296
297 let (cmd, args, stdin_input, temp_file) =
298 self.backend.build_command(prompt, self.config.interactive);
299
300 let mut cmd_builder = CommandBuilder::new(&cmd);
301 cmd_builder.args(&args);
302
303 cmd_builder.cwd(&self.config.workspace_root);
306
307 cmd_builder.env("TERM", "xterm-256color");
309 inject_ralph_runtime_env(&mut cmd_builder, &self.config.workspace_root);
310
311 for (key, value) in &self.backend.env_vars {
313 cmd_builder.env(key, value);
314 }
315 let child = pair
316 .slave
317 .spawn_command(cmd_builder)
318 .map_err(|e| io::Error::other(e.to_string()))?;
319
320 Ok((pair, child, stdin_input, temp_file))
322 }
323
324 pub async fn run_observe(
341 &self,
342 prompt: &str,
343 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
344 ) -> io::Result<PtyExecutionResult> {
345 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
347
348 let reader = pair
349 .master
350 .try_clone_reader()
351 .map_err(|e| io::Error::other(e.to_string()))?;
352
353 if let Some(ref input) = stdin_input {
355 tokio::time::sleep(Duration::from_millis(100)).await;
357 let mut writer = pair
358 .master
359 .take_writer()
360 .map_err(|e| io::Error::other(e.to_string()))?;
361 writer.write_all(input.as_bytes())?;
362 writer.write_all(b"\n")?;
363 writer.flush()?;
364 }
365
366 drop(pair.slave);
368
369 let mut output = Vec::new();
370 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
371 None
372 } else {
373 Some(Duration::from_secs(u64::from(
374 self.config.idle_timeout_secs,
375 )))
376 };
377
378 let mut termination = TerminationType::Natural;
379 let mut last_activity = Instant::now();
380
381 let should_terminate = Arc::new(AtomicBool::new(false));
383
384 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
386 let should_terminate_reader = Arc::clone(&should_terminate);
387 let tui_connected = self.tui_mode;
389 let tui_output_tx = if tui_connected {
390 Some(self.output_tx.clone())
391 } else {
392 None
393 };
394
395 debug!("Spawning PTY output reader thread (observe mode)");
396 std::thread::spawn(move || {
397 let mut reader = reader;
398 let mut buf = [0u8; 4096];
399
400 loop {
401 if should_terminate_reader.load(Ordering::SeqCst) {
402 debug!("PTY reader: termination requested");
403 break;
404 }
405
406 match reader.read(&mut buf) {
407 Ok(0) => {
408 debug!("PTY reader: EOF");
409 let _ = output_tx.blocking_send(OutputEvent::Eof);
410 break;
411 }
412 Ok(n) => {
413 let data = buf[..n].to_vec();
414 if let Some(ref tx) = tui_output_tx {
416 let _ = tx.send(data.clone());
417 }
418 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
420 break;
421 }
422 }
423 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
424 std::thread::sleep(Duration::from_millis(10));
425 }
426 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
427 Err(e) => {
428 debug!(error = %e, "PTY reader error");
429 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
430 break;
431 }
432 }
433 }
434 });
435
436 loop {
438 let idle_timeout = timeout_duration.map(|d| {
440 let elapsed = last_activity.elapsed();
441 if elapsed >= d {
442 Duration::from_millis(1) } else {
444 d.saturating_sub(elapsed)
445 }
446 });
447
448 tokio::select! {
449 _ = interrupt_rx.changed() => {
451 if *interrupt_rx.borrow() {
452 debug!("Interrupt received in observe mode, terminating");
453 termination = TerminationType::UserInterrupt;
454 should_terminate.store(true, Ordering::SeqCst);
455 let _ = self.terminate_child(&mut child, true).await;
456 break;
457 }
458 }
459
460 event = output_rx.recv() => {
462 match event {
463 Some(OutputEvent::Data(data)) => {
464 if !tui_connected {
466 io::stdout().write_all(&data)?;
467 io::stdout().flush()?;
468 }
469 output.extend_from_slice(&data);
470 last_activity = Instant::now();
471 }
472 Some(OutputEvent::Eof) | None => {
473 debug!("Output channel closed, process likely exited");
474 break;
475 }
476 Some(OutputEvent::Error(e)) => {
477 debug!(error = %e, "Reader thread reported error");
478 break;
479 }
480 }
481 }
482
483 _ = async {
485 if let Some(timeout) = idle_timeout {
486 tokio::time::sleep(timeout).await;
487 } else {
488 std::future::pending::<()>().await;
490 }
491 } => {
492 warn!(
493 timeout_secs = self.config.idle_timeout_secs,
494 "Idle timeout triggered"
495 );
496 termination = TerminationType::IdleTimeout;
497 should_terminate.store(true, Ordering::SeqCst);
498 self.terminate_child(&mut child, true).await?;
499 break;
500 }
501 }
502
503 if let Some(status) = child
505 .try_wait()
506 .map_err(|e| io::Error::other(e.to_string()))?
507 {
508 let exit_code = status.exit_code() as i32;
509 debug!(exit_status = ?status, exit_code, "Child process exited");
510
511 while let Ok(event) = output_rx.try_recv() {
513 if let OutputEvent::Data(data) = event {
514 if !tui_connected {
515 io::stdout().write_all(&data)?;
516 io::stdout().flush()?;
517 }
518 output.extend_from_slice(&data);
519 }
520 }
521
522 let drain_deadline = Instant::now() + Duration::from_millis(200);
525 loop {
526 let remaining = drain_deadline.saturating_duration_since(Instant::now());
527 if remaining.is_zero() {
528 break;
529 }
530 match tokio::time::timeout(remaining, output_rx.recv()).await {
531 Ok(Some(OutputEvent::Data(data))) => {
532 if !tui_connected {
533 io::stdout().write_all(&data)?;
534 io::stdout().flush()?;
535 }
536 output.extend_from_slice(&data);
537 }
538 Ok(Some(OutputEvent::Eof) | None) => break,
539 Ok(Some(OutputEvent::Error(e))) => {
540 debug!(error = %e, "PTY read error after exit");
541 break;
542 }
543 Err(_) => break,
544 }
545 }
546
547 let final_termination = resolve_termination_type(exit_code, termination);
548 return Ok(build_result(
550 &output,
551 status.success(),
552 Some(exit_code),
553 final_termination,
554 String::new(),
555 None,
556 ));
557 }
558 }
559
560 should_terminate.store(true, Ordering::SeqCst);
562
563 let status = self
565 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
566 .await?;
567
568 let (success, exit_code, final_termination) = match status {
569 Some(s) => {
570 let code = s.exit_code() as i32;
571 (
572 s.success(),
573 Some(code),
574 resolve_termination_type(code, termination),
575 )
576 }
577 None => {
578 warn!("Timed out waiting for child to exit after termination");
579 (false, None, termination)
580 }
581 };
582
583 Ok(build_result(
585 &output,
586 success,
587 exit_code,
588 final_termination,
589 String::new(),
590 None,
591 ))
592 }
593
594 pub async fn run_observe_streaming<H: StreamHandler>(
610 &self,
611 prompt: &str,
612 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
613 handler: &mut H,
614 ) -> io::Result<PtyExecutionResult> {
615 let output_format = self.backend.output_format;
617
618 let is_stream_json = output_format == OutputFormat::StreamJson;
623 let is_copilot_stream = output_format == OutputFormat::CopilotStreamJson;
624 let is_pi_stream = output_format == OutputFormat::PiStreamJson;
625 let show_pi_thinking = is_pi_stream && self.tui_mode;
627 let is_real_pi_backend = self.backend.command == "pi";
628
629 if is_pi_stream && is_real_pi_backend {
630 let configured_provider =
631 extract_cli_flag_value(&self.backend.args, "--provider", "-p")
632 .unwrap_or_else(|| "auto".to_string());
633 let configured_model = extract_cli_flag_value(&self.backend.args, "--model", "-m")
634 .unwrap_or_else(|| "default".to_string());
635 handler.on_text(&format!(
636 "Pi configured: provider={configured_provider}, model={configured_model}\n"
637 ));
638 }
639
640 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
642
643 let reader = pair
644 .master
645 .try_clone_reader()
646 .map_err(|e| io::Error::other(e.to_string()))?;
647
648 if let Some(ref input) = stdin_input {
650 tokio::time::sleep(Duration::from_millis(100)).await;
651 let mut writer = pair
652 .master
653 .take_writer()
654 .map_err(|e| io::Error::other(e.to_string()))?;
655 writer.write_all(input.as_bytes())?;
656 writer.write_all(b"\n")?;
657 writer.flush()?;
658 }
659
660 drop(pair.slave);
661
662 let mut output = Vec::new();
663 let mut line_buffer = String::new();
664 let mut extracted_text = String::new();
666 let mut pi_state = PiSessionState::new();
668 let mut copilot_state = CopilotStreamState::new();
669 let mut completion: Option<SessionResult> = None;
670 let start_time = Instant::now();
671 let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
672 None
673 } else {
674 Some(Duration::from_secs(u64::from(
675 self.config.idle_timeout_secs,
676 )))
677 };
678
679 let mut termination = TerminationType::Natural;
680 let mut last_activity = Instant::now();
681
682 let should_terminate = Arc::new(AtomicBool::new(false));
683
684 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
686 let should_terminate_reader = Arc::clone(&should_terminate);
687 let tui_connected = self.tui_mode;
688 let tui_output_tx = if tui_connected {
689 Some(self.output_tx.clone())
690 } else {
691 None
692 };
693
694 debug!("Spawning PTY output reader thread (streaming mode)");
695 std::thread::spawn(move || {
696 let mut reader = reader;
697 let mut buf = [0u8; 4096];
698
699 loop {
700 if should_terminate_reader.load(Ordering::SeqCst) {
701 debug!("PTY reader: termination requested");
702 break;
703 }
704
705 match reader.read(&mut buf) {
706 Ok(0) => {
707 debug!("PTY reader: EOF");
708 let _ = output_tx.blocking_send(OutputEvent::Eof);
709 break;
710 }
711 Ok(n) => {
712 let data = buf[..n].to_vec();
713 if let Some(ref tx) = tui_output_tx {
714 let _ = tx.send(data.clone());
715 }
716 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
717 break;
718 }
719 }
720 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
721 std::thread::sleep(Duration::from_millis(10));
722 }
723 Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
724 Err(e) => {
725 debug!(error = %e, "PTY reader error");
726 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
727 break;
728 }
729 }
730 }
731 });
732
733 loop {
735 let idle_timeout = timeout_duration.map(|d| {
736 let elapsed = last_activity.elapsed();
737 if elapsed >= d {
738 Duration::from_millis(1)
739 } else {
740 d.saturating_sub(elapsed)
741 }
742 });
743
744 tokio::select! {
745 _ = interrupt_rx.changed() => {
746 if *interrupt_rx.borrow() {
747 debug!("Interrupt received in streaming observe mode, terminating");
748 termination = TerminationType::UserInterrupt;
749 should_terminate.store(true, Ordering::SeqCst);
750 let _ = self.terminate_child(&mut child, true).await;
751 break;
752 }
753 }
754
755 event = output_rx.recv() => {
756 match event {
757 Some(OutputEvent::Data(data)) => {
758 output.extend_from_slice(&data);
759 last_activity = Instant::now();
760
761 if let Ok(text) = std::str::from_utf8(&data) {
762 if is_stream_json {
763 line_buffer.push_str(text);
765
766 while let Some(newline_pos) = line_buffer.find('\n') {
768 let line = line_buffer[..newline_pos].to_string();
769 line_buffer = line_buffer[newline_pos + 1..].to_string();
770
771 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
772 if let ClaudeStreamEvent::Result {
773 duration_ms,
774 total_cost_usd,
775 num_turns,
776 is_error,
777 } = &event
778 {
779 completion = Some(SessionResult {
780 duration_ms: *duration_ms,
781 total_cost_usd: *total_cost_usd,
782 num_turns: *num_turns,
783 is_error: *is_error,
784 ..Default::default()
785 });
786 }
787 dispatch_stream_event(event, handler, &mut extracted_text);
788 }
789 }
790 } else if is_copilot_stream {
791 line_buffer.push_str(text);
792
793 while let Some(newline_pos) = line_buffer.find('\n') {
794 let line = line_buffer[..newline_pos].to_string();
795 line_buffer = line_buffer[newline_pos + 1..].to_string();
796
797 if let Some(session_result) = handle_copilot_stream_line(
798 &line,
799 handler,
800 &mut extracted_text,
801 &mut copilot_state,
802 ) {
803 completion = Some(session_result);
804 }
805 }
806 } else if is_pi_stream {
807 line_buffer.push_str(text);
809
810 while let Some(newline_pos) = line_buffer.find('\n') {
811 let line = line_buffer[..newline_pos].to_string();
812 line_buffer = line_buffer[newline_pos + 1..].to_string();
813
814 if let Some(event) = PiStreamParser::parse_line(&line) {
815 dispatch_pi_stream_event(
816 event,
817 handler,
818 &mut extracted_text,
819 &mut pi_state,
820 show_pi_thinking,
821 );
822 }
823 }
824 } else {
825 handler.on_text(text);
828 }
829 }
830 }
831 Some(OutputEvent::Eof) | None => {
832 debug!("Output channel closed");
833 if is_stream_json && !line_buffer.is_empty()
835 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
836 {
837 if let ClaudeStreamEvent::Result {
838 duration_ms,
839 total_cost_usd,
840 num_turns,
841 is_error,
842 } = &event
843 {
844 completion = Some(SessionResult {
845 duration_ms: *duration_ms,
846 total_cost_usd: *total_cost_usd,
847 num_turns: *num_turns,
848 is_error: *is_error,
849 ..Default::default()
850 });
851 }
852 dispatch_stream_event(event, handler, &mut extracted_text);
853 } else if is_copilot_stream && !line_buffer.is_empty() {
854 if let Some(session_result) = handle_copilot_stream_line(
855 &line_buffer,
856 handler,
857 &mut extracted_text,
858 &mut copilot_state,
859 ) {
860 completion = Some(session_result);
861 }
862 } else if is_pi_stream && !line_buffer.is_empty()
863 && let Some(event) = PiStreamParser::parse_line(&line_buffer)
864 {
865 dispatch_pi_stream_event(
866 event,
867 handler,
868 &mut extracted_text,
869 &mut pi_state,
870 show_pi_thinking,
871 );
872 }
873 break;
874 }
875 Some(OutputEvent::Error(e)) => {
876 debug!(error = %e, "Reader thread reported error");
877 handler.on_error(&e);
878 break;
879 }
880 }
881 }
882
883 _ = async {
884 if let Some(timeout) = idle_timeout {
885 tokio::time::sleep(timeout).await;
886 } else {
887 std::future::pending::<()>().await;
888 }
889 } => {
890 warn!(
891 timeout_secs = self.config.idle_timeout_secs,
892 "Idle timeout triggered"
893 );
894 termination = TerminationType::IdleTimeout;
895 should_terminate.store(true, Ordering::SeqCst);
896 self.terminate_child(&mut child, true).await?;
897 break;
898 }
899 }
900
901 if let Some(status) = child
903 .try_wait()
904 .map_err(|e| io::Error::other(e.to_string()))?
905 {
906 let exit_code = status.exit_code() as i32;
907 debug!(exit_status = ?status, exit_code, "Child process exited");
908
909 while let Ok(event) = output_rx.try_recv() {
911 if let OutputEvent::Data(data) = event {
912 output.extend_from_slice(&data);
913 if let Ok(text) = std::str::from_utf8(&data) {
914 if is_stream_json {
915 line_buffer.push_str(text);
917 while let Some(newline_pos) = line_buffer.find('\n') {
918 let line = line_buffer[..newline_pos].to_string();
919 line_buffer = line_buffer[newline_pos + 1..].to_string();
920 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
921 if let ClaudeStreamEvent::Result {
922 duration_ms,
923 total_cost_usd,
924 num_turns,
925 is_error,
926 } = &event
927 {
928 completion = Some(SessionResult {
929 duration_ms: *duration_ms,
930 total_cost_usd: *total_cost_usd,
931 num_turns: *num_turns,
932 is_error: *is_error,
933 ..Default::default()
934 });
935 }
936 dispatch_stream_event(event, handler, &mut extracted_text);
937 }
938 }
939 } else if is_copilot_stream {
940 line_buffer.push_str(text);
941 while let Some(newline_pos) = line_buffer.find('\n') {
942 let line = line_buffer[..newline_pos].to_string();
943 line_buffer = line_buffer[newline_pos + 1..].to_string();
944 if let Some(session_result) = handle_copilot_stream_line(
945 &line,
946 handler,
947 &mut extracted_text,
948 &mut copilot_state,
949 ) {
950 completion = Some(session_result);
951 }
952 }
953 } else if is_pi_stream {
954 line_buffer.push_str(text);
956 while let Some(newline_pos) = line_buffer.find('\n') {
957 let line = line_buffer[..newline_pos].to_string();
958 line_buffer = line_buffer[newline_pos + 1..].to_string();
959 if let Some(event) = PiStreamParser::parse_line(&line) {
960 dispatch_pi_stream_event(
961 event,
962 handler,
963 &mut extracted_text,
964 &mut pi_state,
965 show_pi_thinking,
966 );
967 }
968 }
969 } else {
970 handler.on_text(text);
972 }
973 }
974 }
975 }
976
977 let drain_deadline = Instant::now() + Duration::from_millis(200);
980 loop {
981 let remaining = drain_deadline.saturating_duration_since(Instant::now());
982 if remaining.is_zero() {
983 break;
984 }
985 match tokio::time::timeout(remaining, output_rx.recv()).await {
986 Ok(Some(OutputEvent::Data(data))) => {
987 output.extend_from_slice(&data);
988 if let Ok(text) = std::str::from_utf8(&data) {
989 if is_stream_json {
990 line_buffer.push_str(text);
992 while let Some(newline_pos) = line_buffer.find('\n') {
993 let line = line_buffer[..newline_pos].to_string();
994 line_buffer = line_buffer[newline_pos + 1..].to_string();
995 if let Some(event) = ClaudeStreamParser::parse_line(&line) {
996 if let ClaudeStreamEvent::Result {
997 duration_ms,
998 total_cost_usd,
999 num_turns,
1000 is_error,
1001 } = &event
1002 {
1003 completion = Some(SessionResult {
1004 duration_ms: *duration_ms,
1005 total_cost_usd: *total_cost_usd,
1006 num_turns: *num_turns,
1007 is_error: *is_error,
1008 ..Default::default()
1009 });
1010 }
1011 dispatch_stream_event(
1012 event,
1013 handler,
1014 &mut extracted_text,
1015 );
1016 }
1017 }
1018 } else if is_copilot_stream {
1019 line_buffer.push_str(text);
1020 while let Some(newline_pos) = line_buffer.find('\n') {
1021 let line = line_buffer[..newline_pos].to_string();
1022 line_buffer = line_buffer[newline_pos + 1..].to_string();
1023 handle_copilot_stream_line(
1024 &line,
1025 handler,
1026 &mut extracted_text,
1027 &mut copilot_state,
1028 );
1029 }
1030 } else if is_pi_stream {
1031 line_buffer.push_str(text);
1033 while let Some(newline_pos) = line_buffer.find('\n') {
1034 let line = line_buffer[..newline_pos].to_string();
1035 line_buffer = line_buffer[newline_pos + 1..].to_string();
1036 if let Some(event) = PiStreamParser::parse_line(&line) {
1037 dispatch_pi_stream_event(
1038 event,
1039 handler,
1040 &mut extracted_text,
1041 &mut pi_state,
1042 show_pi_thinking,
1043 );
1044 }
1045 }
1046 } else {
1047 handler.on_text(text);
1049 }
1050 }
1051 }
1052 Ok(Some(OutputEvent::Eof) | None) => break,
1053 Ok(Some(OutputEvent::Error(e))) => {
1054 debug!(error = %e, "PTY read error after exit");
1055 break;
1056 }
1057 Err(_) => break,
1058 }
1059 }
1060
1061 if is_stream_json
1063 && !line_buffer.is_empty()
1064 && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
1065 {
1066 if let ClaudeStreamEvent::Result {
1067 duration_ms,
1068 total_cost_usd,
1069 num_turns,
1070 is_error,
1071 } = &event
1072 {
1073 completion = Some(SessionResult {
1074 duration_ms: *duration_ms,
1075 total_cost_usd: *total_cost_usd,
1076 num_turns: *num_turns,
1077 is_error: *is_error,
1078 ..Default::default()
1079 });
1080 }
1081 dispatch_stream_event(event, handler, &mut extracted_text);
1082 } else if is_copilot_stream && !line_buffer.is_empty() {
1083 if let Some(session_result) = handle_copilot_stream_line(
1084 &line_buffer,
1085 handler,
1086 &mut extracted_text,
1087 &mut copilot_state,
1088 ) {
1089 completion = Some(session_result);
1090 }
1091 } else if is_pi_stream
1092 && !line_buffer.is_empty()
1093 && let Some(event) = PiStreamParser::parse_line(&line_buffer)
1094 {
1095 dispatch_pi_stream_event(
1096 event,
1097 handler,
1098 &mut extracted_text,
1099 &mut pi_state,
1100 show_pi_thinking,
1101 );
1102 }
1103
1104 let final_termination = resolve_termination_type(exit_code, termination);
1105
1106 if is_pi_stream {
1108 if is_real_pi_backend {
1109 let stream_provider =
1110 pi_state.stream_provider.as_deref().unwrap_or("unknown");
1111 let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1112 handler.on_text(&format!(
1113 "Pi stream: provider={stream_provider}, model={stream_model}\n"
1114 ));
1115 }
1116 let session_result = SessionResult {
1117 duration_ms: start_time.elapsed().as_millis() as u64,
1118 total_cost_usd: pi_state.total_cost_usd,
1119 num_turns: pi_state.num_turns,
1120 is_error: !status.success(),
1121 input_tokens: pi_state.input_tokens,
1122 output_tokens: pi_state.output_tokens,
1123 cache_read_tokens: pi_state.cache_read_tokens,
1124 cache_write_tokens: pi_state.cache_write_tokens,
1125 };
1126 handler.on_complete(&session_result);
1127 completion = Some(session_result);
1128 }
1129
1130 return Ok(build_result(
1132 &output,
1133 status.success(),
1134 Some(exit_code),
1135 final_termination,
1136 extracted_text,
1137 completion.as_ref(),
1138 ));
1139 }
1140 }
1141
1142 should_terminate.store(true, Ordering::SeqCst);
1143
1144 let status = self
1145 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1146 .await?;
1147
1148 let (success, exit_code, final_termination) = match status {
1149 Some(s) => {
1150 let code = s.exit_code() as i32;
1151 (
1152 s.success(),
1153 Some(code),
1154 resolve_termination_type(code, termination),
1155 )
1156 }
1157 None => {
1158 warn!("Timed out waiting for child to exit after termination");
1159 (false, None, termination)
1160 }
1161 };
1162
1163 if is_pi_stream {
1165 if is_real_pi_backend {
1166 let stream_provider = pi_state.stream_provider.as_deref().unwrap_or("unknown");
1167 let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1168 handler.on_text(&format!(
1169 "Pi stream: provider={stream_provider}, model={stream_model}\n"
1170 ));
1171 }
1172 let session_result = SessionResult {
1173 duration_ms: start_time.elapsed().as_millis() as u64,
1174 total_cost_usd: pi_state.total_cost_usd,
1175 num_turns: pi_state.num_turns,
1176 is_error: !success,
1177 input_tokens: pi_state.input_tokens,
1178 output_tokens: pi_state.output_tokens,
1179 cache_read_tokens: pi_state.cache_read_tokens,
1180 cache_write_tokens: pi_state.cache_write_tokens,
1181 };
1182 handler.on_complete(&session_result);
1183 completion = Some(session_result);
1184 }
1185
1186 Ok(build_result(
1188 &output,
1189 success,
1190 exit_code,
1191 final_termination,
1192 extracted_text,
1193 completion.as_ref(),
1194 ))
1195 }
1196
1197 #[allow(clippy::too_many_lines)] pub async fn run_interactive(
1218 &mut self,
1219 prompt: &str,
1220 mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
1221 ) -> io::Result<PtyExecutionResult> {
1222 let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
1224
1225 let reader = pair
1226 .master
1227 .try_clone_reader()
1228 .map_err(|e| io::Error::other(e.to_string()))?;
1229 let mut writer = pair
1230 .master
1231 .take_writer()
1232 .map_err(|e| io::Error::other(e.to_string()))?;
1233
1234 let master = pair.master;
1236
1237 drop(pair.slave);
1239
1240 let pending_stdin = stdin_input;
1242
1243 let mut output = Vec::new();
1244 let timeout_duration = if self.config.idle_timeout_secs > 0 {
1245 Some(Duration::from_secs(u64::from(
1246 self.config.idle_timeout_secs,
1247 )))
1248 } else {
1249 None
1250 };
1251
1252 let mut ctrl_c_state = CtrlCState::new();
1253 let mut termination = TerminationType::Natural;
1254 let mut last_activity = Instant::now();
1255
1256 let should_terminate = Arc::new(AtomicBool::new(false));
1258
1259 let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
1261 let should_terminate_output = Arc::clone(&should_terminate);
1262 let tui_connected = self.tui_mode;
1264 let tui_output_tx = if tui_connected {
1265 Some(self.output_tx.clone())
1266 } else {
1267 None
1268 };
1269
1270 debug!("Spawning PTY output reader thread");
1271 std::thread::spawn(move || {
1272 debug!("PTY output reader thread started");
1273 let mut reader = reader;
1274 let mut buf = [0u8; 4096];
1275
1276 loop {
1277 if should_terminate_output.load(Ordering::SeqCst) {
1278 debug!("PTY output reader: termination requested");
1279 break;
1280 }
1281
1282 match reader.read(&mut buf) {
1283 Ok(0) => {
1284 debug!("PTY output reader: EOF received");
1286 let _ = output_tx.blocking_send(OutputEvent::Eof);
1287 break;
1288 }
1289 Ok(n) => {
1290 let data = buf[..n].to_vec();
1291 if let Some(ref tx) = tui_output_tx {
1293 let _ = tx.send(data.clone());
1294 }
1295 if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
1297 debug!("PTY output reader: channel closed");
1298 break;
1299 }
1300 }
1301 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1302 std::thread::sleep(Duration::from_millis(1));
1304 }
1305 Err(e) if e.kind() == io::ErrorKind::Interrupted => {
1306 }
1308 Err(e) => {
1309 warn!("PTY output reader: error - {}", e);
1310 let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
1311 break;
1312 }
1313 }
1314 }
1315 debug!("PTY output reader thread exiting");
1316 });
1317
1318 let mut input_rx = if tui_connected {
1323 debug!("TUI connected - skipping stdin reader thread");
1324 None
1325 } else {
1326 let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
1327 let should_terminate_input = Arc::clone(&should_terminate);
1328
1329 std::thread::spawn(move || {
1330 let mut stdin = io::stdin();
1331 let mut buf = [0u8; 1];
1332
1333 loop {
1334 if should_terminate_input.load(Ordering::SeqCst) {
1335 break;
1336 }
1337
1338 match stdin.read(&mut buf) {
1339 Ok(0) => break, Ok(1) => {
1341 let byte = buf[0];
1342 let event = match byte {
1343 3 => InputEvent::CtrlC, 28 => InputEvent::CtrlBackslash, _ => InputEvent::Data(vec![byte]),
1346 };
1347 if input_tx.send(event).is_err() {
1348 break;
1349 }
1350 }
1351 Ok(_) => {} Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
1353 Err(_) => break,
1354 }
1355 }
1356 });
1357 Some(input_rx)
1358 };
1359
1360 if let Some(ref input) = pending_stdin {
1363 tokio::time::sleep(Duration::from_millis(100)).await;
1364 writer.write_all(input.as_bytes())?;
1365 writer.write_all(b"\n")?;
1366 writer.flush()?;
1367 last_activity = Instant::now();
1368 }
1369
1370 loop {
1373 if let Some(status) = child
1375 .try_wait()
1376 .map_err(|e| io::Error::other(e.to_string()))?
1377 {
1378 let exit_code = status.exit_code() as i32;
1379 debug!(exit_status = ?status, exit_code, "Child process exited");
1380
1381 while let Ok(event) = output_rx.try_recv() {
1383 if let OutputEvent::Data(data) = event {
1384 if !tui_connected {
1385 io::stdout().write_all(&data)?;
1386 io::stdout().flush()?;
1387 }
1388 output.extend_from_slice(&data);
1389 }
1390 }
1391
1392 let drain_deadline = Instant::now() + Duration::from_millis(200);
1395 loop {
1396 let remaining = drain_deadline.saturating_duration_since(Instant::now());
1397 if remaining.is_zero() {
1398 break;
1399 }
1400 match tokio::time::timeout(remaining, output_rx.recv()).await {
1401 Ok(Some(OutputEvent::Data(data))) => {
1402 if !tui_connected {
1403 io::stdout().write_all(&data)?;
1404 io::stdout().flush()?;
1405 }
1406 output.extend_from_slice(&data);
1407 }
1408 Ok(Some(OutputEvent::Eof) | None) => break,
1409 Ok(Some(OutputEvent::Error(e))) => {
1410 debug!(error = %e, "PTY read error after exit");
1411 break;
1412 }
1413 Err(_) => break, }
1415 }
1416
1417 should_terminate.store(true, Ordering::SeqCst);
1418 let _ = self.terminated_tx.send(true);
1420
1421 let final_termination = resolve_termination_type(exit_code, termination);
1422 return Ok(build_result(
1424 &output,
1425 status.success(),
1426 Some(exit_code),
1427 final_termination,
1428 String::new(),
1429 None,
1430 ));
1431 }
1432
1433 let timeout_future = async {
1435 match timeout_duration {
1436 Some(d) => {
1437 let elapsed = last_activity.elapsed();
1438 if elapsed >= d {
1439 tokio::time::sleep(Duration::ZERO).await
1440 } else {
1441 tokio::time::sleep(d.saturating_sub(elapsed)).await
1442 }
1443 }
1444 None => std::future::pending::<()>().await,
1445 }
1446 };
1447
1448 tokio::select! {
1449 output_event = output_rx.recv() => {
1451 match output_event {
1452 Some(OutputEvent::Data(data)) => {
1453 if !tui_connected {
1455 io::stdout().write_all(&data)?;
1456 io::stdout().flush()?;
1457 }
1458 output.extend_from_slice(&data);
1459
1460 last_activity = Instant::now();
1461 }
1462 Some(OutputEvent::Eof) => {
1463 debug!("PTY EOF received");
1464 break;
1465 }
1466 Some(OutputEvent::Error(e)) => {
1467 debug!(error = %e, "PTY read error");
1468 break;
1469 }
1470 None => {
1471 break;
1473 }
1474 }
1475 }
1476
1477 input_event = async {
1479 match input_rx.as_mut() {
1480 Some(rx) => rx.recv().await,
1481 None => std::future::pending().await, }
1483 } => {
1484 match input_event {
1485 Some(InputEvent::CtrlC) => {
1486 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1487 CtrlCAction::ForwardAndStartWindow => {
1488 let _ = writer.write_all(&[3]);
1490 let _ = writer.flush();
1491 last_activity = Instant::now();
1492 }
1493 CtrlCAction::Terminate => {
1494 info!("Double Ctrl+C detected, terminating");
1495 termination = TerminationType::UserInterrupt;
1496 should_terminate.store(true, Ordering::SeqCst);
1497 self.terminate_child(&mut child, true).await?;
1498 break;
1499 }
1500 }
1501 }
1502 Some(InputEvent::CtrlBackslash) => {
1503 info!("Ctrl+\\ detected, force killing");
1504 termination = TerminationType::ForceKill;
1505 should_terminate.store(true, Ordering::SeqCst);
1506 self.terminate_child(&mut child, false).await?;
1507 break;
1508 }
1509 Some(InputEvent::Data(data)) => {
1510 let _ = writer.write_all(&data);
1512 let _ = writer.flush();
1513 last_activity = Instant::now();
1514 }
1515 None => {
1516 debug!("Input channel closed");
1518 }
1519 }
1520 }
1521
1522 tui_input = self.input_rx.recv() => {
1524 if let Some(data) = tui_input {
1525 match InputEvent::from_bytes(data) {
1526 InputEvent::CtrlC => {
1527 match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1528 CtrlCAction::ForwardAndStartWindow => {
1529 let _ = writer.write_all(&[3]);
1530 let _ = writer.flush();
1531 last_activity = Instant::now();
1532 }
1533 CtrlCAction::Terminate => {
1534 info!("Double Ctrl+C detected, terminating");
1535 termination = TerminationType::UserInterrupt;
1536 should_terminate.store(true, Ordering::SeqCst);
1537 self.terminate_child(&mut child, true).await?;
1538 break;
1539 }
1540 }
1541 }
1542 InputEvent::CtrlBackslash => {
1543 info!("Ctrl+\\ detected, force killing");
1544 termination = TerminationType::ForceKill;
1545 should_terminate.store(true, Ordering::SeqCst);
1546 self.terminate_child(&mut child, false).await?;
1547 break;
1548 }
1549 InputEvent::Data(bytes) => {
1550 let _ = writer.write_all(&bytes);
1551 let _ = writer.flush();
1552 last_activity = Instant::now();
1553 }
1554 }
1555 }
1556 }
1557
1558 control_cmd = self.control_rx.recv() => {
1560 if let Some(cmd) = control_cmd {
1561 use crate::pty_handle::ControlCommand;
1562 match cmd {
1563 ControlCommand::Kill => {
1564 info!("Control command: Kill");
1565 termination = TerminationType::UserInterrupt;
1566 should_terminate.store(true, Ordering::SeqCst);
1567 self.terminate_child(&mut child, true).await?;
1568 break;
1569 }
1570 ControlCommand::Resize(cols, rows) => {
1571 debug!(cols, rows, "Control command: Resize");
1572 if let Err(e) = master.resize(PtySize {
1574 rows,
1575 cols,
1576 pixel_width: 0,
1577 pixel_height: 0,
1578 }) {
1579 warn!("Failed to resize PTY: {}", e);
1580 }
1581 }
1582 ControlCommand::Skip | ControlCommand::Abort => {
1583 debug!("Control command: {:?} (ignored at PTY level)", cmd);
1585 }
1586 }
1587 }
1588 }
1589
1590 _ = timeout_future => {
1592 warn!(
1593 timeout_secs = self.config.idle_timeout_secs,
1594 "Idle timeout triggered"
1595 );
1596 termination = TerminationType::IdleTimeout;
1597 should_terminate.store(true, Ordering::SeqCst);
1598 self.terminate_child(&mut child, true).await?;
1599 break;
1600 }
1601
1602 _ = interrupt_rx.changed() => {
1604 if *interrupt_rx.borrow() {
1605 debug!("Interrupt received in interactive mode, terminating");
1606 termination = TerminationType::UserInterrupt;
1607 should_terminate.store(true, Ordering::SeqCst);
1608 self.terminate_child(&mut child, true).await?;
1609 break;
1610 }
1611 }
1612 }
1613 }
1614
1615 should_terminate.store(true, Ordering::SeqCst);
1617
1618 let _ = self.terminated_tx.send(true);
1620
1621 let status = self
1623 .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1624 .await?;
1625
1626 let (success, exit_code, final_termination) = match status {
1627 Some(s) => {
1628 let code = s.exit_code() as i32;
1629 (
1630 s.success(),
1631 Some(code),
1632 resolve_termination_type(code, termination),
1633 )
1634 }
1635 None => {
1636 warn!("Timed out waiting for child to exit after termination");
1637 (false, None, termination)
1638 }
1639 };
1640
1641 Ok(build_result(
1643 &output,
1644 success,
1645 exit_code,
1646 final_termination,
1647 String::new(),
1648 None,
1649 ))
1650 }
1651
1652 #[allow(clippy::unused_self)] #[allow(clippy::unused_async)] #[cfg(not(unix))]
1663 async fn terminate_child(
1664 &self,
1665 child: &mut Box<dyn portable_pty::Child + Send>,
1666 _graceful: bool,
1667 ) -> io::Result<()> {
1668 child.kill()
1669 }
1670
1671 #[cfg(unix)]
1672 async fn terminate_child(
1673 &self,
1674 child: &mut Box<dyn portable_pty::Child + Send>,
1675 graceful: bool,
1676 ) -> io::Result<()> {
1677 let pid = match child.process_id() {
1678 Some(id) => Pid::from_raw(id as i32),
1679 None => return Ok(()), };
1681
1682 if graceful {
1683 debug!(pid = %pid, "Sending SIGTERM");
1684 let _ = kill(pid, Signal::SIGTERM);
1685
1686 let grace_period = Duration::from_secs(2);
1688 let start = Instant::now();
1689
1690 while start.elapsed() < grace_period {
1691 if child
1692 .try_wait()
1693 .map_err(|e| io::Error::other(e.to_string()))?
1694 .is_some()
1695 {
1696 return Ok(());
1697 }
1698 tokio::time::sleep(Duration::from_millis(50)).await;
1700 }
1701
1702 debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1704 }
1705
1706 debug!(pid = %pid, "Sending SIGKILL");
1707 let _ = kill(pid, Signal::SIGKILL);
1708 Ok(())
1709 }
1710
1711 async fn wait_for_exit(
1716 &self,
1717 child: &mut Box<dyn portable_pty::Child + Send>,
1718 max_wait: Option<Duration>,
1719 interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1720 ) -> io::Result<Option<portable_pty::ExitStatus>> {
1721 let start = Instant::now();
1722
1723 loop {
1724 if let Some(status) = child
1725 .try_wait()
1726 .map_err(|e| io::Error::other(e.to_string()))?
1727 {
1728 return Ok(Some(status));
1729 }
1730
1731 if let Some(max) = max_wait
1732 && start.elapsed() >= max
1733 {
1734 return Ok(None);
1735 }
1736
1737 tokio::select! {
1738 _ = interrupt_rx.changed() => {
1739 if *interrupt_rx.borrow() {
1740 debug!("Interrupt received while waiting for child exit");
1741 return Ok(None);
1742 }
1743 }
1744 _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1745 }
1746 }
1747 }
1748}
1749
1750fn handle_copilot_stream_line<H: StreamHandler>(
1751 line: &str,
1752 handler: &mut H,
1753 extracted_text: &mut String,
1754 copilot_state: &mut CopilotStreamState,
1755) -> Option<SessionResult> {
1756 let event = CopilotStreamParser::parse_line(line)?;
1757 dispatch_copilot_stream_event(event, handler, extracted_text, copilot_state)
1758}
1759
1760fn inject_ralph_runtime_env(cmd_builder: &mut CommandBuilder, workspace_root: &std::path::Path) {
1761 let Ok(current_exe) = env::current_exe() else {
1762 return;
1763 };
1764 let Some(bin_dir) = current_exe.parent() else {
1765 return;
1766 };
1767
1768 let mut path_entries = vec![bin_dir.to_path_buf()];
1769 if let Some(existing_path) = env::var_os("PATH") {
1770 path_entries.extend(env::split_paths(&existing_path));
1771 }
1772
1773 if let Ok(joined_path) = env::join_paths(path_entries) {
1774 cmd_builder.env("PATH", joined_path);
1775 }
1776 cmd_builder.env("RALPH_BIN", current_exe);
1777 cmd_builder.env("RALPH_WORKSPACE_ROOT", workspace_root);
1778 if std::path::Path::new("/var/tmp").is_dir() {
1779 cmd_builder.env("TMPDIR", "/var/tmp");
1780 cmd_builder.env("TMP", "/var/tmp");
1781 cmd_builder.env("TEMP", "/var/tmp");
1782 }
1783}
1784
/// A unit of user input destined for the PTY.
#[derive(Debug)]
enum InputEvent {
    /// A lone `0x03` byte (Ctrl+C).
    CtrlC,
    /// A lone `0x1c` byte (Ctrl+\).
    CtrlBackslash,
    /// Any other input, carried as raw bytes.
    Data(Vec<u8>),
}

impl InputEvent {
    /// Classifies a chunk of input bytes.
    ///
    /// Only a chunk of exactly one byte is recognised as a control key;
    /// anything else (including empty or multi-byte chunks that contain
    /// control bytes) is returned unchanged as `Data`.
    fn from_bytes(data: Vec<u8>) -> Self {
        match data.as_slice() {
            [3] => InputEvent::CtrlC,
            [28] => InputEvent::CtrlBackslash,
            _ => InputEvent::Data(data),
        }
    }
}
1809
/// Messages produced while reading PTY output.
#[derive(Debug)]
enum OutputEvent {
    /// A chunk of raw output bytes.
    Data(Vec<u8>),
    /// The output stream has ended.
    Eof,
    /// Reading failed; carries the error rendered as text.
    Error(String),
}
1820
1821fn strip_ansi(bytes: &[u8]) -> String {
1827 let stripped = strip_ansi_escapes::strip(bytes);
1828 String::from_utf8_lossy(&stripped).into_owned()
1829}
1830
1831fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1835 if exit_code == 130 {
1836 info!("Child process killed by SIGINT");
1837 TerminationType::UserInterrupt
1838 } else {
1839 default
1840 }
1841}
1842
/// Finds the value given for a CLI flag in `args`.
///
/// Two spellings are recognised for both `long_flag` and `short_flag`:
///
/// - split form, `--flag value`: the next argument is taken as the value
///   unless it is missing or starts with `-`;
/// - inline form, `--flag=value`: the text after `=` is taken as the value
///   unless it is empty.
///
/// Arguments are scanned left to right and the first usable value wins; an
/// occurrence without a usable value is skipped and scanning continues.
/// Returns `None` when no occurrence yields a value.
fn extract_cli_flag_value(args: &[String], long_flag: &str, short_flag: &str) -> Option<String> {
    for (i, arg) in args.iter().enumerate() {
        if arg == long_flag || arg == short_flag {
            match args.get(i + 1) {
                Some(value) if !value.starts_with('-') => return Some(value.clone()),
                // Flag without a usable value: keep looking at later arguments.
                _ => continue,
            }
        }

        // Inline `flag=value` form. Stripping the flag and then `=` matches
        // exactly the same arguments as stripping "flag=" while avoiding a
        // fresh `format!` allocation for every argument.
        let inline_value = [long_flag, short_flag].iter().find_map(|flag| {
            arg.strip_prefix(*flag)
                .and_then(|rest| rest.strip_prefix('='))
                .filter(|value| !value.is_empty())
        });
        if let Some(value) = inline_value {
            return Some(value.to_string());
        }
    }

    None
}
1869
1870fn dispatch_stream_event<H: StreamHandler>(
1873 event: ClaudeStreamEvent,
1874 handler: &mut H,
1875 extracted_text: &mut String,
1876) {
1877 match event {
1878 ClaudeStreamEvent::System { .. } => {
1879 }
1881 ClaudeStreamEvent::Assistant { message, .. } => {
1882 for block in message.content {
1883 match block {
1884 ContentBlock::Text { text } => {
1885 handler.on_text(&text);
1886 extracted_text.push_str(&text);
1888 extracted_text.push('\n');
1889 }
1890 ContentBlock::ToolUse { name, id, input } => {
1891 handler.on_tool_call(&name, &id, &input)
1892 }
1893 }
1894 }
1895 }
1896 ClaudeStreamEvent::User { message } => {
1897 for block in message.content {
1898 match block {
1899 UserContentBlock::ToolResult {
1900 tool_use_id,
1901 content,
1902 } => {
1903 handler.on_tool_result(&tool_use_id, &content);
1904 }
1905 }
1906 }
1907 }
1908 ClaudeStreamEvent::Result {
1909 duration_ms,
1910 total_cost_usd,
1911 num_turns,
1912 is_error,
1913 } => {
1914 if is_error {
1915 handler.on_error("Session ended with error");
1916 }
1917 handler.on_complete(&SessionResult {
1918 duration_ms,
1919 total_cost_usd,
1920 num_turns,
1921 is_error,
1922 ..Default::default()
1923 });
1924 }
1925 }
1926}
1927
1928fn build_result(
1937 output: &[u8],
1938 success: bool,
1939 exit_code: Option<i32>,
1940 termination: TerminationType,
1941 extracted_text: String,
1942 session_result: Option<&SessionResult>,
1943) -> PtyExecutionResult {
1944 let (total_cost_usd, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens) =
1945 if let Some(result) = session_result {
1946 (
1947 result.total_cost_usd,
1948 result.input_tokens,
1949 result.output_tokens,
1950 result.cache_read_tokens,
1951 result.cache_write_tokens,
1952 )
1953 } else {
1954 (0.0, 0, 0, 0, 0)
1955 };
1956
1957 PtyExecutionResult {
1958 output: String::from_utf8_lossy(output).to_string(),
1959 stripped_output: strip_ansi(output),
1960 extracted_text,
1961 success,
1962 exit_code,
1963 termination,
1964 total_cost_usd,
1965 input_tokens,
1966 output_tokens,
1967 cache_read_tokens,
1968 cache_write_tokens,
1969 }
1970}
1971
1972#[cfg(test)]
1973mod tests {
1974 use super::*;
1975 use crate::claude_stream::{AssistantMessage, UserMessage};
1976 #[cfg(unix)]
1977 use crate::cli_backend::PromptMode;
1978 use crate::stream_handler::{SessionResult, StreamHandler};
1979 #[cfg(unix)]
1980 use tempfile::TempDir;
1981
1982 #[test]
1983 fn test_double_ctrl_c_within_window() {
1984 let mut state = CtrlCState::new();
1985 let now = Instant::now();
1986
1987 let action = state.handle_ctrl_c(now);
1989 assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
1990
1991 let later = now + Duration::from_millis(500);
1993 let action = state.handle_ctrl_c(later);
1994 assert_eq!(action, CtrlCAction::Terminate);
1995 }
1996
1997 #[test]
1998 fn test_input_event_from_bytes_ctrl_c() {
1999 let event = InputEvent::from_bytes(vec![3]);
2000 assert!(matches!(event, InputEvent::CtrlC));
2001 }
2002
2003 #[test]
2004 fn test_input_event_from_bytes_ctrl_backslash() {
2005 let event = InputEvent::from_bytes(vec![28]);
2006 assert!(matches!(event, InputEvent::CtrlBackslash));
2007 }
2008
2009 #[test]
2010 fn test_input_event_from_bytes_data() {
2011 let event = InputEvent::from_bytes(vec![b'a']);
2012 assert!(matches!(event, InputEvent::Data(_)));
2013
2014 let event = InputEvent::from_bytes(vec![1, 2, 3]);
2015 assert!(matches!(event, InputEvent::Data(_)));
2016 }
2017
2018 #[test]
2019 fn test_ctrl_c_window_expires() {
2020 let mut state = CtrlCState::new();
2021 let now = Instant::now();
2022
2023 state.handle_ctrl_c(now);
2025
2026 let later = now + Duration::from_secs(2);
2028
2029 let action = state.handle_ctrl_c(later);
2031 assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
2032 }
2033
2034 #[test]
2035 fn test_strip_ansi_basic() {
2036 let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n";
2037 let stripped = strip_ansi(input);
2038 assert!(stripped.contains("Thinking..."));
2039 assert!(!stripped.contains("\x1b["));
2040 }
2041
2042 #[test]
2043 fn test_completion_promise_extraction() {
2044 let input = b"\x1b[1;36m Thinking...\x1b[0m\r\n\
2046 \x1b[2K\x1b[1;32m Done!\x1b[0m\r\n\
2047 \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";
2048
2049 let stripped = strip_ansi(input);
2050
2051 assert!(stripped.contains("LOOP_COMPLETE"));
2053 assert!(!stripped.contains("\x1b["));
2054 }
2055
2056 #[test]
2057 fn test_event_tag_extraction() {
2058 let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
2060 Task completed successfully\r\n\
2061 \x1b[90m</event>\x1b[0m\r\n";
2062
2063 let stripped = strip_ansi(input);
2064
2065 assert!(stripped.contains("<event topic=\"build.done\">"));
2066 assert!(stripped.contains("</event>"));
2067 }
2068
2069 #[test]
2070 fn test_large_output_preserves_early_events() {
2071 let mut input = Vec::new();
2073
2074 input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");
2076
2077 for i in 0..500 {
2079 input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
2080 }
2081
2082 let stripped = strip_ansi(&input);
2083
2084 assert!(
2086 stripped.contains("<event topic=\"build.task\">"),
2087 "Event tag was lost - strip_ansi is not preserving all content"
2088 );
2089 assert!(stripped.contains("Implement feature X"));
2090 assert!(stripped.contains("Line 499")); }
2092
2093 #[test]
2094 fn test_pty_config_defaults() {
2095 let config = PtyConfig::default();
2096 assert!(config.interactive);
2097 assert_eq!(config.idle_timeout_secs, 30);
2098 assert_eq!(config.cols, 80);
2099 assert_eq!(config.rows, 24);
2100 }
2101
2102 #[test]
2103 fn test_pty_config_from_env_matches_env_or_defaults() {
2104 let cols = std::env::var("COLUMNS")
2105 .ok()
2106 .and_then(|value| value.parse::<u16>().ok())
2107 .unwrap_or(80);
2108 let rows = std::env::var("LINES")
2109 .ok()
2110 .and_then(|value| value.parse::<u16>().ok())
2111 .unwrap_or(24);
2112
2113 let config = PtyConfig::from_env();
2114 assert_eq!(config.cols, cols);
2115 assert_eq!(config.rows, rows);
2116 }
2117
2118 #[test]
2126 fn test_idle_timeout_reset_logic() {
2127 let timeout_duration = Duration::from_secs(30);
2129
2130 let simulated_25s = Duration::from_secs(25);
2132
2133 let remaining = timeout_duration.saturating_sub(simulated_25s);
2135 assert_eq!(remaining.as_secs(), 5);
2136
2137 let last_activity_after_reset = Instant::now();
2139
2140 let elapsed = last_activity_after_reset.elapsed();
2142 assert!(elapsed < Duration::from_millis(100)); let new_remaining = timeout_duration.saturating_sub(elapsed);
2146 assert!(new_remaining > Duration::from_secs(29)); }
2148
2149 #[test]
2150 fn test_extracted_text_field_exists() {
2151 let result = PtyExecutionResult {
2154 output: String::new(),
2155 stripped_output: String::new(),
2156 extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
2157 success: true,
2158 exit_code: Some(0),
2159 termination: TerminationType::Natural,
2160 total_cost_usd: 0.0,
2161 input_tokens: 0,
2162 output_tokens: 0,
2163 cache_read_tokens: 0,
2164 cache_write_tokens: 0,
2165 };
2166
2167 assert!(
2168 result
2169 .extracted_text
2170 .contains("<event topic=\"build.done\">")
2171 );
2172 }
2173
2174 #[test]
2175 fn test_build_result_includes_extracted_text() {
2176 let output = b"raw output";
2178 let extracted = "extracted text with <event topic=\"test\">payload</event>";
2179 let result = build_result(
2180 output,
2181 true,
2182 Some(0),
2183 TerminationType::Natural,
2184 extracted.to_string(),
2185 None,
2186 );
2187
2188 assert_eq!(result.extracted_text, extracted);
2189 assert!(result.stripped_output.contains("raw output"));
2190 }
2191
2192 #[test]
2193 fn test_resolve_termination_type_handles_sigint_exit_code() {
2194 let termination = resolve_termination_type(130, TerminationType::Natural);
2195 assert_eq!(termination, TerminationType::UserInterrupt);
2196
2197 let termination = resolve_termination_type(0, TerminationType::ForceKill);
2198 assert_eq!(termination, TerminationType::ForceKill);
2199 }
2200
2201 #[test]
2202 fn test_extract_cli_flag_value_supports_split_and_equals_syntax() {
2203 let args = vec![
2204 "--provider".to_string(),
2205 "anthropic".to_string(),
2206 "--model=claude-sonnet-4".to_string(),
2207 ];
2208
2209 assert_eq!(
2210 extract_cli_flag_value(&args, "--provider", "-p"),
2211 Some("anthropic".to_string())
2212 );
2213 assert_eq!(
2214 extract_cli_flag_value(&args, "--model", "-m"),
2215 Some("claude-sonnet-4".to_string())
2216 );
2217 assert_eq!(extract_cli_flag_value(&args, "--foo", "-f"), None);
2218 }
2219
2220 #[derive(Default)]
2221 struct CapturingHandler {
2222 texts: Vec<String>,
2223 tool_calls: Vec<(String, String, serde_json::Value)>,
2224 tool_results: Vec<(String, String)>,
2225 errors: Vec<String>,
2226 completions: Vec<SessionResult>,
2227 }
2228
2229 impl StreamHandler for CapturingHandler {
2230 fn on_text(&mut self, text: &str) {
2231 self.texts.push(text.to_string());
2232 }
2233
2234 fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
2235 self.tool_calls
2236 .push((name.to_string(), id.to_string(), input.clone()));
2237 }
2238
2239 fn on_tool_result(&mut self, id: &str, output: &str) {
2240 self.tool_results.push((id.to_string(), output.to_string()));
2241 }
2242
2243 fn on_error(&mut self, error: &str) {
2244 self.errors.push(error.to_string());
2245 }
2246
2247 fn on_complete(&mut self, result: &SessionResult) {
2248 self.completions.push(result.clone());
2249 }
2250 }
2251
2252 #[test]
2253 fn test_dispatch_stream_event_routes_text_and_tool_calls() {
2254 let mut handler = CapturingHandler::default();
2255 let mut extracted_text = String::new();
2256
2257 let event = ClaudeStreamEvent::Assistant {
2258 message: AssistantMessage {
2259 content: vec![
2260 ContentBlock::Text {
2261 text: "Hello".to_string(),
2262 },
2263 ContentBlock::ToolUse {
2264 id: "tool-1".to_string(),
2265 name: "Read".to_string(),
2266 input: serde_json::json!({"path": "README.md"}),
2267 },
2268 ],
2269 },
2270 usage: None,
2271 };
2272
2273 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2274
2275 assert_eq!(handler.texts, vec!["Hello".to_string()]);
2276 assert_eq!(handler.tool_calls.len(), 1);
2277 assert!(extracted_text.contains("Hello"));
2278 assert!(extracted_text.ends_with('\n'));
2279 }
2280
2281 #[test]
2282 fn test_dispatch_stream_event_routes_tool_results_and_completion() {
2283 let mut handler = CapturingHandler::default();
2284 let mut extracted_text = String::new();
2285
2286 let event = ClaudeStreamEvent::User {
2287 message: UserMessage {
2288 content: vec![UserContentBlock::ToolResult {
2289 tool_use_id: "tool-1".to_string(),
2290 content: "done".to_string(),
2291 }],
2292 },
2293 };
2294
2295 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2296 assert_eq!(handler.tool_results.len(), 1);
2297 assert_eq!(handler.tool_results[0].0, "tool-1");
2298 assert_eq!(handler.tool_results[0].1, "done");
2299
2300 let event = ClaudeStreamEvent::Result {
2301 duration_ms: 12,
2302 total_cost_usd: 0.01,
2303 num_turns: 2,
2304 is_error: true,
2305 };
2306
2307 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2308 assert_eq!(handler.errors.len(), 1);
2309 assert_eq!(handler.completions.len(), 1);
2310 assert!(handler.completions[0].is_error);
2311 }
2312
2313 #[test]
2314 fn test_dispatch_stream_event_system_noop() {
2315 let mut handler = CapturingHandler::default();
2316 let mut extracted_text = String::new();
2317
2318 let event = ClaudeStreamEvent::System {
2319 session_id: "session-1".to_string(),
2320 model: "claude-test".to_string(),
2321 tools: Vec::new(),
2322 };
2323
2324 dispatch_stream_event(event, &mut handler, &mut extracted_text);
2325
2326 assert!(handler.texts.is_empty());
2327 assert!(handler.tool_calls.is_empty());
2328 assert!(handler.tool_results.is_empty());
2329 assert!(handler.errors.is_empty());
2330 assert!(handler.completions.is_empty());
2331 assert!(extracted_text.is_empty());
2332 }
2333
2334 #[test]
2349 fn test_tui_mode_stdin_reader_bypass() {
2350 let tui_mode = true;
2356 let tui_connected = tui_mode;
2357
2358 assert!(
2361 tui_connected,
2362 "When tui_mode is true, stdin reader must be skipped"
2363 );
2364
2365 let tui_mode_disabled = false;
2367 let tui_connected_non_tui = tui_mode_disabled;
2368 assert!(
2369 !tui_connected_non_tui,
2370 "When tui_mode is false, stdin reader must be spawned"
2371 );
2372 }
2373
2374 #[test]
2375 fn test_tui_mode_default_is_false() {
2376 let backend = CliBackend::claude();
2378 let config = PtyConfig::default();
2379 let executor = PtyExecutor::new(backend, config);
2380
2381 assert!(!executor.tui_mode, "tui_mode should default to false");
2383 }
2384
2385 #[test]
2386 fn test_set_tui_mode() {
2387 let backend = CliBackend::claude();
2389 let config = PtyConfig::default();
2390 let mut executor = PtyExecutor::new(backend, config);
2391
2392 assert!(!executor.tui_mode, "tui_mode should start as false");
2394
2395 executor.set_tui_mode(true);
2397 assert!(
2398 executor.tui_mode,
2399 "tui_mode should be true after set_tui_mode(true)"
2400 );
2401
2402 executor.set_tui_mode(false);
2404 assert!(
2405 !executor.tui_mode,
2406 "tui_mode should be false after set_tui_mode(false)"
2407 );
2408 }
2409
2410 #[test]
2411 fn test_build_result_populates_fields() {
2412 let output = b"\x1b[31mHello\x1b[0m\n";
2413 let extracted = "extracted text".to_string();
2414
2415 let result = build_result(
2416 output,
2417 true,
2418 Some(0),
2419 TerminationType::Natural,
2420 extracted.clone(),
2421 None,
2422 );
2423
2424 assert_eq!(result.output, String::from_utf8_lossy(output));
2425 assert!(result.stripped_output.contains("Hello"));
2426 assert!(!result.stripped_output.contains("\x1b["));
2427 assert_eq!(result.extracted_text, extracted);
2428 assert!(result.success);
2429 assert_eq!(result.exit_code, Some(0));
2430 assert_eq!(result.termination, TerminationType::Natural);
2431 }
2432
2433 #[cfg(unix)]
2434 #[tokio::test]
2435 async fn test_run_observe_executes_arg_prompt() {
2436 let temp_dir = TempDir::new().expect("temp dir");
2437 let backend = CliBackend {
2438 command: "sh".to_string(),
2439 args: vec!["-c".to_string()],
2440 prompt_mode: PromptMode::Arg,
2441 prompt_flag: None,
2442 output_format: OutputFormat::Text,
2443 env_vars: vec![],
2444 };
2445 let config = PtyConfig {
2446 interactive: false,
2447 idle_timeout_secs: 0,
2448 cols: 80,
2449 rows: 24,
2450 workspace_root: temp_dir.path().to_path_buf(),
2451 };
2452 let executor = PtyExecutor::new(backend, config);
2453 let (_tx, rx) = tokio::sync::watch::channel(false);
2454
2455 let result = executor
2456 .run_observe("echo hello-pty", rx)
2457 .await
2458 .expect("run_observe");
2459
2460 assert!(result.success);
2461 assert!(result.output.contains("hello-pty"));
2462 assert!(result.stripped_output.contains("hello-pty"));
2463 assert_eq!(result.exit_code, Some(0));
2464 assert_eq!(result.termination, TerminationType::Natural);
2465 }
2466
2467 #[cfg(unix)]
2468 #[tokio::test]
2469 async fn test_run_observe_writes_stdin_prompt() {
2470 let temp_dir = TempDir::new().expect("temp dir");
2471 let backend = CliBackend {
2472 command: "sh".to_string(),
2473 args: vec!["-c".to_string(), "read line; echo \"$line\"".to_string()],
2474 prompt_mode: PromptMode::Stdin,
2475 prompt_flag: None,
2476 output_format: OutputFormat::Text,
2477 env_vars: vec![],
2478 };
2479 let config = PtyConfig {
2480 interactive: false,
2481 idle_timeout_secs: 0,
2482 cols: 80,
2483 rows: 24,
2484 workspace_root: temp_dir.path().to_path_buf(),
2485 };
2486 let executor = PtyExecutor::new(backend, config);
2487 let (_tx, rx) = tokio::sync::watch::channel(false);
2488
2489 let result = executor
2490 .run_observe("stdin-line", rx)
2491 .await
2492 .expect("run_observe");
2493
2494 assert!(result.success);
2495 assert!(result.output.contains("stdin-line"));
2496 assert!(result.stripped_output.contains("stdin-line"));
2497 assert_eq!(result.termination, TerminationType::Natural);
2498 }
2499
2500 #[cfg(unix)]
2501 #[tokio::test]
2502 async fn test_run_observe_streaming_text_routes_output() {
2503 let temp_dir = TempDir::new().expect("temp dir");
2504 let backend = CliBackend {
2505 command: "sh".to_string(),
2506 args: vec!["-c".to_string()],
2507 prompt_mode: PromptMode::Arg,
2508 prompt_flag: None,
2509 output_format: OutputFormat::Text,
2510 env_vars: vec![],
2511 };
2512 let config = PtyConfig {
2513 interactive: false,
2514 idle_timeout_secs: 0,
2515 cols: 80,
2516 rows: 24,
2517 workspace_root: temp_dir.path().to_path_buf(),
2518 };
2519 let executor = PtyExecutor::new(backend, config);
2520 let (_tx, rx) = tokio::sync::watch::channel(false);
2521 let mut handler = CapturingHandler::default();
2522
2523 let result = executor
2524 .run_observe_streaming("printf 'alpha\\nbeta\\n'", rx, &mut handler)
2525 .await
2526 .expect("run_observe_streaming");
2527
2528 assert!(result.success);
2529 let captured = handler.texts.join("");
2530 assert!(captured.contains("alpha"), "captured: {captured}");
2531 assert!(captured.contains("beta"), "captured: {captured}");
2532 assert!(handler.completions.is_empty());
2533 assert!(result.extracted_text.is_empty());
2534 }
2535
2536 #[cfg(unix)]
2537 #[tokio::test]
2538 async fn test_run_observe_streaming_parses_stream_json() {
2539 let temp_dir = TempDir::new().expect("temp dir");
2540 let backend = CliBackend {
2541 command: "sh".to_string(),
2542 args: vec!["-c".to_string()],
2543 prompt_mode: PromptMode::Arg,
2544 prompt_flag: None,
2545 output_format: OutputFormat::StreamJson,
2546 env_vars: vec![],
2547 };
2548 let config = PtyConfig {
2549 interactive: false,
2550 idle_timeout_secs: 0,
2551 cols: 80,
2552 rows: 24,
2553 workspace_root: temp_dir.path().to_path_buf(),
2554 };
2555 let executor = PtyExecutor::new(backend, config);
2556 let (_tx, rx) = tokio::sync::watch::channel(false);
2557 let mut handler = CapturingHandler::default();
2558
2559 let script = r#"printf '%s\n' '{"type":"assistant","message":{"content":[{"type":"text","text":"Hello stream"}]}}' '{"type":"result","duration_ms":1,"total_cost_usd":0.0,"num_turns":1,"is_error":false}'"#;
2560 let result = executor
2561 .run_observe_streaming(script, rx, &mut handler)
2562 .await
2563 .expect("run_observe_streaming");
2564
2565 assert!(result.success);
2566 assert!(
2567 handler
2568 .texts
2569 .iter()
2570 .any(|text| text.contains("Hello stream"))
2571 );
2572 assert_eq!(handler.completions.len(), 1);
2573 assert!(result.extracted_text.contains("Hello stream"));
2574 assert_eq!(result.termination, TerminationType::Natural);
2575 }
2576
2577 #[cfg(unix)]
2578 #[tokio::test]
2579 async fn test_run_interactive_in_tui_mode() {
2580 let temp_dir = TempDir::new().expect("temp dir");
2581 let backend = CliBackend {
2582 command: "sh".to_string(),
2583 args: vec!["-c".to_string()],
2584 prompt_mode: PromptMode::Arg,
2585 prompt_flag: None,
2586 output_format: OutputFormat::Text,
2587 env_vars: vec![],
2588 };
2589 let config = PtyConfig {
2590 interactive: true,
2591 idle_timeout_secs: 0,
2592 cols: 80,
2593 rows: 24,
2594 workspace_root: temp_dir.path().to_path_buf(),
2595 };
2596 let mut executor = PtyExecutor::new(backend, config);
2597 executor.set_tui_mode(true);
2598 let (_tx, rx) = tokio::sync::watch::channel(false);
2599
2600 let result = executor
2601 .run_interactive("echo hello-tui", rx)
2602 .await
2603 .expect("run_interactive");
2604
2605 assert!(result.success);
2606 assert!(result.output.contains("hello-tui"));
2607 assert!(result.stripped_output.contains("hello-tui"));
2608 assert_eq!(result.exit_code, Some(0));
2609 assert_eq!(result.termination, TerminationType::Natural);
2610 }
2611}