Skip to main content

ralph_adapters/
pty_executor.rs

//! PTY executor for running prompts with full terminal emulation.
//!
//! Spawns CLI tools in a pseudo-terminal to preserve rich TUI features like
//! colors, spinners, and animations. Supports both interactive mode (user
//! input forwarded) and observe mode (output-only).
//!
//! Key features:
//! - PTY creation via `portable-pty` for cross-platform support
//! - Idle timeout with activity tracking (output AND input reset timer)
//! - Double Ctrl+C handling (first forwards, second terminates)
//! - Raw mode management with cleanup on exit/crash
//!
//! Architecture:
//! - Uses `tokio::select!` for non-blocking I/O multiplexing
//! - Spawns separate tasks for PTY output and user input
//! - Enables responsive Ctrl+C handling even when PTY is idle
17
18// Exit codes and PIDs are always within i32 range in practice
19#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::copilot_stream::{
24    CopilotStreamParser, CopilotStreamState, dispatch_copilot_stream_event,
25};
26use crate::pi_stream::{PiSessionState, PiStreamParser, dispatch_pi_stream_event};
27use crate::stream_handler::{SessionResult, StreamHandler};
28#[cfg(unix)]
29use nix::sys::signal::{Signal, kill};
30#[cfg(unix)]
31use nix::unistd::Pid;
32use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
33use std::env;
34use std::io::{self, Read, Write};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::{Duration, Instant};
38use tokio::sync::{mpsc, watch};
39use tracing::{debug, info, warn};
40
41/// Result of a PTY execution.
42#[derive(Debug)]
43pub struct PtyExecutionResult {
44    /// The accumulated output (ANSI sequences preserved).
45    pub output: String,
46    /// The ANSI-stripped output for event parsing.
47    pub stripped_output: String,
48    /// Extracted text content from NDJSON stream (for Claude's stream-json output).
49    /// When Claude outputs `--output-format stream-json`, event tags like
50    /// `<event topic="...">` are inside JSON string values. This field contains
51    /// the extracted text content for proper event parsing.
52    /// Empty for non-JSON backends (use `stripped_output` instead).
53    pub extracted_text: String,
54    /// Whether the process exited successfully.
55    pub success: bool,
56    /// The exit code if available.
57    pub exit_code: Option<i32>,
58    /// How the process was terminated.
59    pub termination: TerminationType,
60    /// Total session cost in USD, if available from stream metadata.
61    pub total_cost_usd: f64,
62    /// Total input tokens in the session.
63    pub input_tokens: u64,
64    /// Total output tokens in the session.
65    pub output_tokens: u64,
66    /// Total cache-read tokens in the session.
67    pub cache_read_tokens: u64,
68    /// Total cache-write tokens in the session.
69    pub cache_write_tokens: u64,
70}
71
/// How the PTY process was terminated.
///
/// Recorded on [`PtyExecutionResult`] so callers can tell a normal exit apart
/// from a timeout or a user-initiated stop.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationType {
    /// Process exited naturally.
    Natural,
    /// Terminated due to idle timeout.
    IdleTimeout,
    /// Terminated by user (double Ctrl+C).
    UserInterrupt,
    /// Force killed by user (Ctrl+\).
    ForceKill,
}
84
/// Configuration for PTY execution.
#[derive(Debug, Clone)]
pub struct PtyConfig {
    /// Enable interactive mode (forward user input).
    pub interactive: bool,
    /// Idle timeout in seconds (0 = disabled).
    pub idle_timeout_secs: u32,
    /// Terminal width in columns.
    pub cols: u16,
    /// Terminal height in rows.
    pub rows: u16,
    /// Workspace root directory for command execution.
    /// This is captured at startup to avoid `current_dir()` failures when the
    /// working directory no longer exists (e.g., in E2E test workspaces).
    pub workspace_root: std::path::PathBuf,
}

impl Default for PtyConfig {
    /// Interactive, 30 second idle timeout, 80x24 terminal, rooted at the
    /// current directory (or `.` when the current directory cannot be read).
    fn default() -> Self {
        Self {
            interactive: true,
            idle_timeout_secs: 30,
            cols: Self::DEFAULT_COLS,
            rows: Self::DEFAULT_ROWS,
            workspace_root: std::env::current_dir()
                .unwrap_or_else(|_| std::path::PathBuf::from(".")),
        }
    }
}

impl PtyConfig {
    /// Terminal width used when none is configured.
    const DEFAULT_COLS: u16 = 80;
    /// Terminal height used when none is configured.
    const DEFAULT_ROWS: u16 = 24;

    /// Creates config from environment, falling back to defaults.
    ///
    /// Reads the terminal size from `COLUMNS` and `LINES`. A variable that is
    /// unset, not a valid `u16`, or zero is ignored and the default (80 columns,
    /// 24 rows) is used instead. All other fields come from [`Default`].
    pub fn from_env() -> Self {
        Self {
            cols: Self::dimension_from_env("COLUMNS", Self::DEFAULT_COLS),
            rows: Self::dimension_from_env("LINES", Self::DEFAULT_ROWS),
            ..Default::default()
        }
    }

    /// Reads one terminal dimension from the environment variable `name`,
    /// returning `fallback` when the value is missing or unusable.
    fn dimension_from_env(name: &str, fallback: u16) -> u16 {
        std::env::var(name)
            .ok()
            // Tolerate surrounding whitespace, e.g. values exported from shell scripts.
            .and_then(|raw| raw.trim().parse::<u16>().ok())
            // A zero-sized terminal is never what the user meant; use the default.
            .filter(|&value| value > 0)
            .unwrap_or(fallback)
    }

    /// Sets the workspace root directory.
    pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
        self.workspace_root = root.into();
        self
    }
}
140
/// State machine for double Ctrl+C detection.
#[derive(Debug)]
pub struct CtrlCState {
    /// When the first Ctrl+C was pressed (if any).
    first_press: Option<Instant>,
    /// Window duration for double-press detection.
    window: Duration,
}

/// Action to take after handling Ctrl+C.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CtrlCAction {
    /// Forward the Ctrl+C to Claude and start/restart the window.
    ForwardAndStartWindow,
    /// Terminate Claude (second Ctrl+C within window).
    Terminate,
}

impl CtrlCState {
    /// Creates a new Ctrl+C state tracker with a one-second double-press window.
    pub fn new() -> Self {
        Self {
            first_press: None,
            window: Duration::from_secs(1),
        }
    }

    /// Handles a Ctrl+C keypress and returns the action to take.
    ///
    /// `now` is the time of the keypress. A press that arrives strictly less
    /// than the window after the recorded first press yields
    /// [`CtrlCAction::Terminate`] and clears the state. Any other press is
    /// recorded as the new first press and yields
    /// [`CtrlCAction::ForwardAndStartWindow`].
    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
        match self.first_press {
            // `saturating_duration_since` never panics if `now` is earlier than
            // the recorded press; such a press counts as zero elapsed time.
            Some(first) if now.saturating_duration_since(first) < self.window => {
                // Second Ctrl+C within window - terminate
                self.first_press = None;
                CtrlCAction::Terminate
            }
            _ => {
                // First Ctrl+C or window expired - forward and start window
                self.first_press = Some(now);
                CtrlCAction::ForwardAndStartWindow
            }
        }
    }
}

impl Default for CtrlCState {
    /// Equivalent to [`CtrlCState::new`].
    fn default() -> Self {
        Self::new()
    }
}
190
191/// Executor for running prompts in a pseudo-terminal.
192pub struct PtyExecutor {
193    backend: CliBackend,
194    config: PtyConfig,
195    // Channel ends for TUI integration
196    output_tx: mpsc::UnboundedSender<Vec<u8>>,
197    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
198    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
199    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
200    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
201    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
202    // Termination notification for TUI
203    terminated_tx: watch::Sender<bool>,
204    terminated_rx: Option<watch::Receiver<bool>>,
205    // Explicit TUI mode flag - set via set_tui_mode() when TUI is connected.
206    // This replaces the previous inference via output_rx.is_none() which broke
207    // after the streaming refactor (handle() is no longer called in TUI mode).
208    tui_mode: bool,
209}
210
211impl PtyExecutor {
212    /// Creates a new PTY executor with the given backend and configuration.
213    pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
214        let (output_tx, output_rx) = mpsc::unbounded_channel();
215        let (input_tx, input_rx) = mpsc::unbounded_channel();
216        let (control_tx, control_rx) = mpsc::unbounded_channel();
217        let (terminated_tx, terminated_rx) = watch::channel(false);
218
219        Self {
220            backend,
221            config,
222            output_tx,
223            output_rx: Some(output_rx),
224            input_tx: Some(input_tx),
225            input_rx,
226            control_tx: Some(control_tx),
227            control_rx,
228            terminated_tx,
229            terminated_rx: Some(terminated_rx),
230            tui_mode: false,
231        }
232    }
233
234    /// Sets the TUI mode flag.
235    ///
236    /// When TUI mode is enabled, PTY output is sent to the TUI channel instead of
237    /// being written directly to stdout. This flag must be set before calling any
238    /// of the run methods when using the TUI.
239    ///
240    /// # Arguments
241    /// * `enabled` - Whether TUI mode should be active
242    pub fn set_tui_mode(&mut self, enabled: bool) {
243        self.tui_mode = enabled;
244    }
245
246    /// Updates the backend configuration for this executor.
247    ///
248    /// This allows switching backends between iterations without recreating
249    /// the entire executor. Critical for hat-level backend configuration support.
250    ///
251    /// # Arguments
252    /// * `backend` - The new backend configuration to use
253    pub fn set_backend(&mut self, backend: CliBackend) {
254        self.backend = backend;
255    }
256
257    /// Returns a handle for TUI integration.
258    ///
259    /// Can only be called once - panics if called multiple times.
260    pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
261        crate::pty_handle::PtyHandle {
262            output_rx: self.output_rx.take().expect("handle() already called"),
263            input_tx: self.input_tx.take().expect("handle() already called"),
264            control_tx: self.control_tx.take().expect("handle() already called"),
265            terminated_rx: self.terminated_rx.take().expect("handle() already called"),
266        }
267    }
268
269    /// Spawns Claude in a PTY and returns the PTY pair, child process, stdin input, and temp file.
270    ///
271    /// The temp file is returned to keep it alive for the duration of execution.
272    /// For large prompts (>7000 chars), Claude is instructed to read from a temp file.
273    /// If the temp file is dropped before Claude reads it, the file is deleted and Claude hangs.
274    ///
275    /// The stdin_input is returned so callers can write it to the PTY after taking the writer.
276    /// This is necessary because `take_writer()` can only be called once per PTY.
277    fn spawn_pty(
278        &self,
279        prompt: &str,
280    ) -> io::Result<(
281        PtyPair,
282        Box<dyn portable_pty::Child + Send>,
283        Option<String>,
284        Option<tempfile::NamedTempFile>,
285    )> {
286        let pty_system = native_pty_system();
287
288        let pair = pty_system
289            .openpty(PtySize {
290                rows: self.config.rows,
291                cols: self.config.cols,
292                pixel_width: 0,
293                pixel_height: 0,
294            })
295            .map_err(|e| io::Error::other(e.to_string()))?;
296
297        let (cmd, args, stdin_input, temp_file) =
298            self.backend.build_command(prompt, self.config.interactive);
299
300        let mut cmd_builder = CommandBuilder::new(&cmd);
301        cmd_builder.args(&args);
302
303        // Set explicit working directory from config (captured at startup to avoid
304        // current_dir() failures when workspace no longer exists)
305        cmd_builder.cwd(&self.config.workspace_root);
306
307        // Set up environment for PTY
308        cmd_builder.env("TERM", "xterm-256color");
309        inject_ralph_runtime_env(&mut cmd_builder, &self.config.workspace_root);
310
311        // Apply backend-specific environment variables (e.g., Agent Teams env var)
312        for (key, value) in &self.backend.env_vars {
313            cmd_builder.env(key, value);
314        }
315        let child = pair
316            .slave
317            .spawn_command(cmd_builder)
318            .map_err(|e| io::Error::other(e.to_string()))?;
319
320        // Return stdin_input so callers can write it after taking the writer
321        Ok((pair, child, stdin_input, temp_file))
322    }
323
324    /// Runs in observe mode (output-only, no input forwarding).
325    ///
326    /// This is an async function that listens for interrupt signals via the shared
327    /// `interrupt_rx` watch channel from the event loop.
328    /// Uses a separate thread for blocking PTY reads and tokio::select! for signal handling.
329    ///
330    /// Returns when the process exits, idle timeout triggers, or interrupt is received.
331    ///
332    /// # Arguments
333    /// * `prompt` - The prompt to execute
334    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if PTY allocation fails, the command cannot be spawned,
339    /// or an I/O error occurs during output handling.
340    pub async fn run_observe(
341        &self,
342        prompt: &str,
343        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
344    ) -> io::Result<PtyExecutionResult> {
345        // Keep temp_file alive for the duration of execution (large prompts use temp files)
346        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
347
348        let reader = pair
349            .master
350            .try_clone_reader()
351            .map_err(|e| io::Error::other(e.to_string()))?;
352
353        // Write stdin input if present (for stdin prompt mode)
354        if let Some(ref input) = stdin_input {
355            // Small delay to let process initialize
356            tokio::time::sleep(Duration::from_millis(100)).await;
357            let mut writer = pair
358                .master
359                .take_writer()
360                .map_err(|e| io::Error::other(e.to_string()))?;
361            writer.write_all(input.as_bytes())?;
362            writer.write_all(b"\n")?;
363            writer.flush()?;
364        }
365
366        // Drop the slave to signal EOF when master closes
367        drop(pair.slave);
368
369        let mut output = Vec::new();
370        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
371            None
372        } else {
373            Some(Duration::from_secs(u64::from(
374                self.config.idle_timeout_secs,
375            )))
376        };
377
378        let mut termination = TerminationType::Natural;
379        let mut last_activity = Instant::now();
380
381        // Flag for termination request (shared with reader thread)
382        let should_terminate = Arc::new(AtomicBool::new(false));
383
384        // Spawn blocking reader thread that sends output via channel
385        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
386        let should_terminate_reader = Arc::clone(&should_terminate);
387        // Check if TUI is handling output (output_rx taken by handle())
388        let tui_connected = self.tui_mode;
389        let tui_output_tx = if tui_connected {
390            Some(self.output_tx.clone())
391        } else {
392            None
393        };
394
395        debug!("Spawning PTY output reader thread (observe mode)");
396        std::thread::spawn(move || {
397            let mut reader = reader;
398            let mut buf = [0u8; 4096];
399
400            loop {
401                if should_terminate_reader.load(Ordering::SeqCst) {
402                    debug!("PTY reader: termination requested");
403                    break;
404                }
405
406                match reader.read(&mut buf) {
407                    Ok(0) => {
408                        debug!("PTY reader: EOF");
409                        let _ = output_tx.blocking_send(OutputEvent::Eof);
410                        break;
411                    }
412                    Ok(n) => {
413                        let data = buf[..n].to_vec();
414                        // Send to TUI channel if connected
415                        if let Some(ref tx) = tui_output_tx {
416                            let _ = tx.send(data.clone());
417                        }
418                        // Send to main loop
419                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
420                            break;
421                        }
422                    }
423                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
424                        std::thread::sleep(Duration::from_millis(10));
425                    }
426                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
427                    Err(e) => {
428                        debug!(error = %e, "PTY reader error");
429                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
430                        break;
431                    }
432                }
433            }
434        });
435
436        // Main event loop using tokio::select! for interruptibility
437        loop {
438            // Calculate timeout for idle check
439            let idle_timeout = timeout_duration.map(|d| {
440                let elapsed = last_activity.elapsed();
441                if elapsed >= d {
442                    Duration::from_millis(1) // Trigger immediately
443                } else {
444                    d.saturating_sub(elapsed)
445                }
446            });
447
448            tokio::select! {
449                // Check for interrupt signal from event loop
450                _ = interrupt_rx.changed() => {
451                    if *interrupt_rx.borrow() {
452                        debug!("Interrupt received in observe mode, terminating");
453                        termination = TerminationType::UserInterrupt;
454                        should_terminate.store(true, Ordering::SeqCst);
455                        let _ = self.terminate_child(&mut child, true).await;
456                        break;
457                    }
458                }
459
460                // Check for output from reader thread
461                event = output_rx.recv() => {
462                    match event {
463                        Some(OutputEvent::Data(data)) => {
464                            // Only write to stdout if TUI is NOT handling output
465                            if !tui_connected {
466                                io::stdout().write_all(&data)?;
467                                io::stdout().flush()?;
468                            }
469                            output.extend_from_slice(&data);
470                            last_activity = Instant::now();
471                        }
472                        Some(OutputEvent::Eof) | None => {
473                            debug!("Output channel closed, process likely exited");
474                            break;
475                        }
476                        Some(OutputEvent::Error(e)) => {
477                            debug!(error = %e, "Reader thread reported error");
478                            break;
479                        }
480                    }
481                }
482
483                // Check for idle timeout
484                _ = async {
485                    if let Some(timeout) = idle_timeout {
486                        tokio::time::sleep(timeout).await;
487                    } else {
488                        // No timeout configured, wait forever
489                        std::future::pending::<()>().await;
490                    }
491                } => {
492                    warn!(
493                        timeout_secs = self.config.idle_timeout_secs,
494                        "Idle timeout triggered"
495                    );
496                    termination = TerminationType::IdleTimeout;
497                    should_terminate.store(true, Ordering::SeqCst);
498                    self.terminate_child(&mut child, true).await?;
499                    break;
500                }
501            }
502
503            // Check if child has exited
504            if let Some(status) = child
505                .try_wait()
506                .map_err(|e| io::Error::other(e.to_string()))?
507            {
508                let exit_code = status.exit_code() as i32;
509                debug!(exit_status = ?status, exit_code, "Child process exited");
510
511                // Drain any remaining output from channel
512                while let Ok(event) = output_rx.try_recv() {
513                    if let OutputEvent::Data(data) = event {
514                        if !tui_connected {
515                            io::stdout().write_all(&data)?;
516                            io::stdout().flush()?;
517                        }
518                        output.extend_from_slice(&data);
519                    }
520                }
521
522                // Give the reader thread a brief window to flush any final bytes/EOF.
523                // This avoids races where fast-exiting commands can drop tail output.
524                let drain_deadline = Instant::now() + Duration::from_millis(200);
525                loop {
526                    let remaining = drain_deadline.saturating_duration_since(Instant::now());
527                    if remaining.is_zero() {
528                        break;
529                    }
530                    match tokio::time::timeout(remaining, output_rx.recv()).await {
531                        Ok(Some(OutputEvent::Data(data))) => {
532                            if !tui_connected {
533                                io::stdout().write_all(&data)?;
534                                io::stdout().flush()?;
535                            }
536                            output.extend_from_slice(&data);
537                        }
538                        Ok(Some(OutputEvent::Eof) | None) => break,
539                        Ok(Some(OutputEvent::Error(e))) => {
540                            debug!(error = %e, "PTY read error after exit");
541                            break;
542                        }
543                        Err(_) => break,
544                    }
545                }
546
547                let final_termination = resolve_termination_type(exit_code, termination);
548                // run_observe doesn't parse JSON, so extracted_text is empty
549                return Ok(build_result(
550                    &output,
551                    status.success(),
552                    Some(exit_code),
553                    final_termination,
554                    String::new(),
555                    None,
556                ));
557            }
558        }
559
560        // Signal reader thread to stop
561        should_terminate.store(true, Ordering::SeqCst);
562
563        // Wait for child to fully exit (interruptible + bounded)
564        let status = self
565            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
566            .await?;
567
568        let (success, exit_code, final_termination) = match status {
569            Some(s) => {
570                let code = s.exit_code() as i32;
571                (
572                    s.success(),
573                    Some(code),
574                    resolve_termination_type(code, termination),
575                )
576            }
577            None => {
578                warn!("Timed out waiting for child to exit after termination");
579                (false, None, termination)
580            }
581        };
582
583        // run_observe doesn't parse JSON, so extracted_text is empty
584        Ok(build_result(
585            &output,
586            success,
587            exit_code,
588            final_termination,
589            String::new(),
590            None,
591        ))
592    }
593
594    /// Runs in observe mode with streaming event handling for JSON output.
595    ///
596    /// When the backend's output format is `StreamJson`, this method parses
597    /// NDJSON lines and dispatches events to the provided handler for real-time
598    /// display. For `Text` format, behaves identically to `run_observe`.
599    ///
600    /// # Arguments
601    /// * `prompt` - The prompt to execute
602    /// * `interrupt_rx` - Watch channel receiver for interrupt signals
603    /// * `handler` - Handler to receive streaming events
604    ///
605    /// # Errors
606    ///
607    /// Returns an error if PTY allocation fails, the command cannot be spawned,
608    /// or an I/O error occurs during output handling.
609    pub async fn run_observe_streaming<H: StreamHandler>(
610        &self,
611        prompt: &str,
612        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
613        handler: &mut H,
614    ) -> io::Result<PtyExecutionResult> {
615        // Check output format to decide parsing strategy
616        let output_format = self.backend.output_format;
617
618        // StreamJson format uses NDJSON line parsing (Claude)
619        // CopilotStreamJson format uses JSONL line parsing (Copilot prompt mode)
620        // PiStreamJson format uses NDJSON line parsing (Pi)
621        // Text format streams raw output directly to handler
622        let is_stream_json = output_format == OutputFormat::StreamJson;
623        let is_copilot_stream = output_format == OutputFormat::CopilotStreamJson;
624        let is_pi_stream = output_format == OutputFormat::PiStreamJson;
625        // Pi thinking deltas are noisy for plain console output but useful in TUI.
626        let show_pi_thinking = is_pi_stream && self.tui_mode;
627        let is_real_pi_backend = self.backend.command == "pi";
628
629        if is_pi_stream && is_real_pi_backend {
630            let configured_provider =
631                extract_cli_flag_value(&self.backend.args, "--provider", "-p")
632                    .unwrap_or_else(|| "auto".to_string());
633            let configured_model = extract_cli_flag_value(&self.backend.args, "--model", "-m")
634                .unwrap_or_else(|| "default".to_string());
635            handler.on_text(&format!(
636                "Pi configured: provider={configured_provider}, model={configured_model}\n"
637            ));
638        }
639
640        // Keep temp_file alive for the duration of execution
641        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
642
643        let reader = pair
644            .master
645            .try_clone_reader()
646            .map_err(|e| io::Error::other(e.to_string()))?;
647
648        // Write stdin input if present (for stdin prompt mode)
649        if let Some(ref input) = stdin_input {
650            tokio::time::sleep(Duration::from_millis(100)).await;
651            let mut writer = pair
652                .master
653                .take_writer()
654                .map_err(|e| io::Error::other(e.to_string()))?;
655            writer.write_all(input.as_bytes())?;
656            writer.write_all(b"\n")?;
657            writer.flush()?;
658        }
659
660        drop(pair.slave);
661
662        let mut output = Vec::new();
663        let mut line_buffer = String::new();
664        // Accumulate extracted text from NDJSON for event parsing
665        let mut extracted_text = String::new();
666        // Pi session state for accumulating cost/turns (wall-clock for duration)
667        let mut pi_state = PiSessionState::new();
668        let mut copilot_state = CopilotStreamState::new();
669        let mut completion: Option<SessionResult> = None;
670        let start_time = Instant::now();
671        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
672            None
673        } else {
674            Some(Duration::from_secs(u64::from(
675                self.config.idle_timeout_secs,
676            )))
677        };
678
679        let mut termination = TerminationType::Natural;
680        let mut last_activity = Instant::now();
681
682        let should_terminate = Arc::new(AtomicBool::new(false));
683
684        // Spawn blocking reader thread
685        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
686        let should_terminate_reader = Arc::clone(&should_terminate);
687        let tui_connected = self.tui_mode;
688        let tui_output_tx = if tui_connected {
689            Some(self.output_tx.clone())
690        } else {
691            None
692        };
693
694        debug!("Spawning PTY output reader thread (streaming mode)");
695        std::thread::spawn(move || {
696            let mut reader = reader;
697            let mut buf = [0u8; 4096];
698
699            loop {
700                if should_terminate_reader.load(Ordering::SeqCst) {
701                    debug!("PTY reader: termination requested");
702                    break;
703                }
704
705                match reader.read(&mut buf) {
706                    Ok(0) => {
707                        debug!("PTY reader: EOF");
708                        let _ = output_tx.blocking_send(OutputEvent::Eof);
709                        break;
710                    }
711                    Ok(n) => {
712                        let data = buf[..n].to_vec();
713                        if let Some(ref tx) = tui_output_tx {
714                            let _ = tx.send(data.clone());
715                        }
716                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
717                            break;
718                        }
719                    }
720                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
721                        std::thread::sleep(Duration::from_millis(10));
722                    }
723                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
724                    Err(e) => {
725                        debug!(error = %e, "PTY reader error");
726                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
727                        break;
728                    }
729                }
730            }
731        });
732
733        // Main event loop with JSON line parsing
734        loop {
735            let idle_timeout = timeout_duration.map(|d| {
736                let elapsed = last_activity.elapsed();
737                if elapsed >= d {
738                    Duration::from_millis(1)
739                } else {
740                    d.saturating_sub(elapsed)
741                }
742            });
743
744            tokio::select! {
745                _ = interrupt_rx.changed() => {
746                    if *interrupt_rx.borrow() {
747                        debug!("Interrupt received in streaming observe mode, terminating");
748                        termination = TerminationType::UserInterrupt;
749                        should_terminate.store(true, Ordering::SeqCst);
750                        let _ = self.terminate_child(&mut child, true).await;
751                        break;
752                    }
753                }
754
755                event = output_rx.recv() => {
756                    match event {
757                        Some(OutputEvent::Data(data)) => {
758                            output.extend_from_slice(&data);
759                            last_activity = Instant::now();
760
761                            if let Ok(text) = std::str::from_utf8(&data) {
762                                if is_stream_json {
763                                    // StreamJson format: Parse JSON lines from the data
764                                    line_buffer.push_str(text);
765
766                                    // Process complete lines
767                                    while let Some(newline_pos) = line_buffer.find('\n') {
768                                        let line = line_buffer[..newline_pos].to_string();
769                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
770
771                                        if let Some(event) = ClaudeStreamParser::parse_line(&line) {
772                                            if let ClaudeStreamEvent::Result {
773                                                duration_ms,
774                                                total_cost_usd,
775                                                num_turns,
776                                                is_error,
777                                            } = &event
778                                            {
779                                                completion = Some(SessionResult {
780                                                    duration_ms: *duration_ms,
781                                                    total_cost_usd: *total_cost_usd,
782                                                    num_turns: *num_turns,
783                                                    is_error: *is_error,
784                                                    ..Default::default()
785                                                });
786                                            }
787                                            dispatch_stream_event(event, handler, &mut extracted_text);
788                                        }
789                                    }
790                                } else if is_copilot_stream {
791                                    line_buffer.push_str(text);
792
793                                    while let Some(newline_pos) = line_buffer.find('\n') {
794                                        let line = line_buffer[..newline_pos].to_string();
795                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
796
797                                        if let Some(session_result) = handle_copilot_stream_line(
798                                            &line,
799                                            handler,
800                                            &mut extracted_text,
801                                            &mut copilot_state,
802                                        ) {
803                                            completion = Some(session_result);
804                                        }
805                                    }
806                                } else if is_pi_stream {
807                                    // PiStreamJson format: Parse NDJSON lines from pi
808                                    line_buffer.push_str(text);
809
810                                    while let Some(newline_pos) = line_buffer.find('\n') {
811                                        let line = line_buffer[..newline_pos].to_string();
812                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
813
814                                        if let Some(event) = PiStreamParser::parse_line(&line) {
815                                            dispatch_pi_stream_event(
816                                                event,
817                                                handler,
818                                                &mut extracted_text,
819                                                &mut pi_state,
820                                                show_pi_thinking,
821                                            );
822                                        }
823                                    }
824                                } else {
825                                    // Text format: Stream raw output directly to handler
826                                    // This preserves ANSI escape codes for TUI rendering
827                                    handler.on_text(text);
828                                }
829                            }
830                        }
831                        Some(OutputEvent::Eof) | None => {
832                            debug!("Output channel closed");
833                            // Process any remaining content in buffer
834                            if is_stream_json && !line_buffer.is_empty()
835                                && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
836                            {
837                                if let ClaudeStreamEvent::Result {
838                                    duration_ms,
839                                    total_cost_usd,
840                                    num_turns,
841                                    is_error,
842                                } = &event
843                                {
844                                    completion = Some(SessionResult {
845                                        duration_ms: *duration_ms,
846                                        total_cost_usd: *total_cost_usd,
847                                        num_turns: *num_turns,
848                                        is_error: *is_error,
849                                        ..Default::default()
850                                    });
851                                }
852                                dispatch_stream_event(event, handler, &mut extracted_text);
853                            } else if is_copilot_stream && !line_buffer.is_empty() {
854                                if let Some(session_result) = handle_copilot_stream_line(
855                                    &line_buffer,
856                                    handler,
857                                    &mut extracted_text,
858                                    &mut copilot_state,
859                                ) {
860                                    completion = Some(session_result);
861                                }
862                            } else if is_pi_stream && !line_buffer.is_empty()
863                                && let Some(event) = PiStreamParser::parse_line(&line_buffer)
864                            {
865                                dispatch_pi_stream_event(
866                                    event,
867                                    handler,
868                                    &mut extracted_text,
869                                    &mut pi_state,
870                                    show_pi_thinking,
871                                );
872                            }
873                            break;
874                        }
875                        Some(OutputEvent::Error(e)) => {
876                            debug!(error = %e, "Reader thread reported error");
877                            handler.on_error(&e);
878                            break;
879                        }
880                    }
881                }
882
883                _ = async {
884                    if let Some(timeout) = idle_timeout {
885                        tokio::time::sleep(timeout).await;
886                    } else {
887                        std::future::pending::<()>().await;
888                    }
889                } => {
890                    warn!(
891                        timeout_secs = self.config.idle_timeout_secs,
892                        "Idle timeout triggered"
893                    );
894                    termination = TerminationType::IdleTimeout;
895                    should_terminate.store(true, Ordering::SeqCst);
896                    self.terminate_child(&mut child, true).await?;
897                    break;
898                }
899            }
900
901            // Check if child has exited
902            if let Some(status) = child
903                .try_wait()
904                .map_err(|e| io::Error::other(e.to_string()))?
905            {
906                let exit_code = status.exit_code() as i32;
907                debug!(exit_status = ?status, exit_code, "Child process exited");
908
909                // Drain remaining output
910                while let Ok(event) = output_rx.try_recv() {
911                    if let OutputEvent::Data(data) = event {
912                        output.extend_from_slice(&data);
913                        if let Ok(text) = std::str::from_utf8(&data) {
914                            if is_stream_json {
915                                // StreamJson: parse JSON lines
916                                line_buffer.push_str(text);
917                                while let Some(newline_pos) = line_buffer.find('\n') {
918                                    let line = line_buffer[..newline_pos].to_string();
919                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
920                                    if let Some(event) = ClaudeStreamParser::parse_line(&line) {
921                                        if let ClaudeStreamEvent::Result {
922                                            duration_ms,
923                                            total_cost_usd,
924                                            num_turns,
925                                            is_error,
926                                        } = &event
927                                        {
928                                            completion = Some(SessionResult {
929                                                duration_ms: *duration_ms,
930                                                total_cost_usd: *total_cost_usd,
931                                                num_turns: *num_turns,
932                                                is_error: *is_error,
933                                                ..Default::default()
934                                            });
935                                        }
936                                        dispatch_stream_event(event, handler, &mut extracted_text);
937                                    }
938                                }
939                            } else if is_copilot_stream {
940                                line_buffer.push_str(text);
941                                while let Some(newline_pos) = line_buffer.find('\n') {
942                                    let line = line_buffer[..newline_pos].to_string();
943                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
944                                    if let Some(session_result) = handle_copilot_stream_line(
945                                        &line,
946                                        handler,
947                                        &mut extracted_text,
948                                        &mut copilot_state,
949                                    ) {
950                                        completion = Some(session_result);
951                                    }
952                                }
953                            } else if is_pi_stream {
954                                // PiStreamJson: parse NDJSON lines
955                                line_buffer.push_str(text);
956                                while let Some(newline_pos) = line_buffer.find('\n') {
957                                    let line = line_buffer[..newline_pos].to_string();
958                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
959                                    if let Some(event) = PiStreamParser::parse_line(&line) {
960                                        dispatch_pi_stream_event(
961                                            event,
962                                            handler,
963                                            &mut extracted_text,
964                                            &mut pi_state,
965                                            show_pi_thinking,
966                                        );
967                                    }
968                                }
969                            } else {
970                                // Text: stream raw output to handler
971                                handler.on_text(text);
972                            }
973                        }
974                    }
975                }
976
977                // Give the reader thread a brief window to flush any final bytes/EOF.
978                // This avoids races where fast-exiting commands can drop tail output.
979                let drain_deadline = Instant::now() + Duration::from_millis(200);
980                loop {
981                    let remaining = drain_deadline.saturating_duration_since(Instant::now());
982                    if remaining.is_zero() {
983                        break;
984                    }
985                    match tokio::time::timeout(remaining, output_rx.recv()).await {
986                        Ok(Some(OutputEvent::Data(data))) => {
987                            output.extend_from_slice(&data);
988                            if let Ok(text) = std::str::from_utf8(&data) {
989                                if is_stream_json {
990                                    // StreamJson: parse JSON lines
991                                    line_buffer.push_str(text);
992                                    while let Some(newline_pos) = line_buffer.find('\n') {
993                                        let line = line_buffer[..newline_pos].to_string();
994                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
995                                        if let Some(event) = ClaudeStreamParser::parse_line(&line) {
996                                            if let ClaudeStreamEvent::Result {
997                                                duration_ms,
998                                                total_cost_usd,
999                                                num_turns,
1000                                                is_error,
1001                                            } = &event
1002                                            {
1003                                                completion = Some(SessionResult {
1004                                                    duration_ms: *duration_ms,
1005                                                    total_cost_usd: *total_cost_usd,
1006                                                    num_turns: *num_turns,
1007                                                    is_error: *is_error,
1008                                                    ..Default::default()
1009                                                });
1010                                            }
1011                                            dispatch_stream_event(
1012                                                event,
1013                                                handler,
1014                                                &mut extracted_text,
1015                                            );
1016                                        }
1017                                    }
1018                                } else if is_copilot_stream {
1019                                    line_buffer.push_str(text);
1020                                    while let Some(newline_pos) = line_buffer.find('\n') {
1021                                        let line = line_buffer[..newline_pos].to_string();
1022                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
1023                                        handle_copilot_stream_line(
1024                                            &line,
1025                                            handler,
1026                                            &mut extracted_text,
1027                                            &mut copilot_state,
1028                                        );
1029                                    }
1030                                } else if is_pi_stream {
1031                                    // PiStreamJson: parse NDJSON lines
1032                                    line_buffer.push_str(text);
1033                                    while let Some(newline_pos) = line_buffer.find('\n') {
1034                                        let line = line_buffer[..newline_pos].to_string();
1035                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
1036                                        if let Some(event) = PiStreamParser::parse_line(&line) {
1037                                            dispatch_pi_stream_event(
1038                                                event,
1039                                                handler,
1040                                                &mut extracted_text,
1041                                                &mut pi_state,
1042                                                show_pi_thinking,
1043                                            );
1044                                        }
1045                                    }
1046                                } else {
1047                                    // Text: stream raw output to handler
1048                                    handler.on_text(text);
1049                                }
1050                            }
1051                        }
1052                        Ok(Some(OutputEvent::Eof) | None) => break,
1053                        Ok(Some(OutputEvent::Error(e))) => {
1054                            debug!(error = %e, "PTY read error after exit");
1055                            break;
1056                        }
1057                        Err(_) => break,
1058                    }
1059                }
1060
1061                // Process final buffer content
1062                if is_stream_json
1063                    && !line_buffer.is_empty()
1064                    && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
1065                {
1066                    if let ClaudeStreamEvent::Result {
1067                        duration_ms,
1068                        total_cost_usd,
1069                        num_turns,
1070                        is_error,
1071                    } = &event
1072                    {
1073                        completion = Some(SessionResult {
1074                            duration_ms: *duration_ms,
1075                            total_cost_usd: *total_cost_usd,
1076                            num_turns: *num_turns,
1077                            is_error: *is_error,
1078                            ..Default::default()
1079                        });
1080                    }
1081                    dispatch_stream_event(event, handler, &mut extracted_text);
1082                } else if is_copilot_stream && !line_buffer.is_empty() {
1083                    if let Some(session_result) = handle_copilot_stream_line(
1084                        &line_buffer,
1085                        handler,
1086                        &mut extracted_text,
1087                        &mut copilot_state,
1088                    ) {
1089                        completion = Some(session_result);
1090                    }
1091                } else if is_pi_stream
1092                    && !line_buffer.is_empty()
1093                    && let Some(event) = PiStreamParser::parse_line(&line_buffer)
1094                {
1095                    dispatch_pi_stream_event(
1096                        event,
1097                        handler,
1098                        &mut extracted_text,
1099                        &mut pi_state,
1100                        show_pi_thinking,
1101                    );
1102                }
1103
1104                let final_termination = resolve_termination_type(exit_code, termination);
1105
1106                // Synthesize on_complete for Pi sessions (pi has no dedicated result event)
1107                if is_pi_stream {
1108                    if is_real_pi_backend {
1109                        let stream_provider =
1110                            pi_state.stream_provider.as_deref().unwrap_or("unknown");
1111                        let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1112                        handler.on_text(&format!(
1113                            "Pi stream: provider={stream_provider}, model={stream_model}\n"
1114                        ));
1115                    }
1116                    let session_result = SessionResult {
1117                        duration_ms: start_time.elapsed().as_millis() as u64,
1118                        total_cost_usd: pi_state.total_cost_usd,
1119                        num_turns: pi_state.num_turns,
1120                        is_error: !status.success(),
1121                        input_tokens: pi_state.input_tokens,
1122                        output_tokens: pi_state.output_tokens,
1123                        cache_read_tokens: pi_state.cache_read_tokens,
1124                        cache_write_tokens: pi_state.cache_write_tokens,
1125                    };
1126                    handler.on_complete(&session_result);
1127                    completion = Some(session_result);
1128                }
1129
1130                // Pass extracted_text for event parsing from NDJSON
1131                return Ok(build_result(
1132                    &output,
1133                    status.success(),
1134                    Some(exit_code),
1135                    final_termination,
1136                    extracted_text,
1137                    completion.as_ref(),
1138                ));
1139            }
1140        }
1141
1142        should_terminate.store(true, Ordering::SeqCst);
1143
1144        let status = self
1145            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1146            .await?;
1147
1148        let (success, exit_code, final_termination) = match status {
1149            Some(s) => {
1150                let code = s.exit_code() as i32;
1151                (
1152                    s.success(),
1153                    Some(code),
1154                    resolve_termination_type(code, termination),
1155                )
1156            }
1157            None => {
1158                warn!("Timed out waiting for child to exit after termination");
1159                (false, None, termination)
1160            }
1161        };
1162
1163        // Synthesize on_complete for Pi sessions (pi has no dedicated result event)
1164        if is_pi_stream {
1165            if is_real_pi_backend {
1166                let stream_provider = pi_state.stream_provider.as_deref().unwrap_or("unknown");
1167                let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1168                handler.on_text(&format!(
1169                    "Pi stream: provider={stream_provider}, model={stream_model}\n"
1170                ));
1171            }
1172            let session_result = SessionResult {
1173                duration_ms: start_time.elapsed().as_millis() as u64,
1174                total_cost_usd: pi_state.total_cost_usd,
1175                num_turns: pi_state.num_turns,
1176                is_error: !success,
1177                input_tokens: pi_state.input_tokens,
1178                output_tokens: pi_state.output_tokens,
1179                cache_read_tokens: pi_state.cache_read_tokens,
1180                cache_write_tokens: pi_state.cache_write_tokens,
1181            };
1182            handler.on_complete(&session_result);
1183            completion = Some(session_result);
1184        }
1185
1186        // Pass extracted_text for event parsing from NDJSON
1187        Ok(build_result(
1188            &output,
1189            success,
1190            exit_code,
1191            final_termination,
1192            extracted_text,
1193            completion.as_ref(),
1194        ))
1195    }
1196
1197    /// Runs in interactive mode (bidirectional I/O).
1198    ///
1199    /// Uses `tokio::select!` for non-blocking I/O multiplexing between:
1200    /// 1. PTY output (from blocking reader via channel)
1201    /// 2. User input (from stdin thread via channel)
1202    /// 3. Interrupt signal from event loop
1203    /// 4. Idle timeout
1204    ///
1205    /// This design ensures Ctrl+C is always responsive, even when the PTY
1206    /// has no output (e.g., during long-running tool calls).
1207    ///
1208    /// # Arguments
1209    /// * `prompt` - The prompt to execute
1210    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
1211    ///
1212    /// # Errors
1213    ///
1214    /// Returns an error if PTY allocation fails, the command cannot be spawned,
1215    /// or an I/O error occurs during bidirectional communication.
1216    #[allow(clippy::too_many_lines)] // Complex state machine requires cohesive implementation
1217    pub async fn run_interactive(
1218        &mut self,
1219        prompt: &str,
1220        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
1221    ) -> io::Result<PtyExecutionResult> {
1222        // Keep temp_file alive for the duration of execution (large prompts use temp files)
1223        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
1224
1225        let reader = pair
1226            .master
1227            .try_clone_reader()
1228            .map_err(|e| io::Error::other(e.to_string()))?;
1229        let mut writer = pair
1230            .master
1231            .take_writer()
1232            .map_err(|e| io::Error::other(e.to_string()))?;
1233
1234        // Keep master for resize operations
1235        let master = pair.master;
1236
1237        // Drop the slave to signal EOF when master closes
1238        drop(pair.slave);
1239
1240        // Store stdin_input for writing after reader thread starts
1241        let pending_stdin = stdin_input;
1242
1243        let mut output = Vec::new();
1244        let timeout_duration = if self.config.idle_timeout_secs > 0 {
1245            Some(Duration::from_secs(u64::from(
1246                self.config.idle_timeout_secs,
1247            )))
1248        } else {
1249            None
1250        };
1251
1252        let mut ctrl_c_state = CtrlCState::new();
1253        let mut termination = TerminationType::Natural;
1254        let mut last_activity = Instant::now();
1255
1256        // Flag for termination request (shared with spawned tasks)
1257        let should_terminate = Arc::new(AtomicBool::new(false));
1258
1259        // Spawn output reader thread (blocking reads on a dedicated OS thread, forwarded to the main loop via channel)
1260        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
1261        let should_terminate_output = Arc::clone(&should_terminate);
1262        // Check if TUI is handling output (output_rx taken by handle())
1263        let tui_connected = self.tui_mode;
1264        let tui_output_tx = if tui_connected {
1265            Some(self.output_tx.clone())
1266        } else {
1267            None
1268        };
1269
1270        debug!("Spawning PTY output reader thread");
1271        std::thread::spawn(move || {
1272            debug!("PTY output reader thread started");
1273            let mut reader = reader;
1274            let mut buf = [0u8; 4096];
1275
1276            loop {
1277                if should_terminate_output.load(Ordering::SeqCst) {
1278                    debug!("PTY output reader: termination requested");
1279                    break;
1280                }
1281
1282                match reader.read(&mut buf) {
1283                    Ok(0) => {
1284                        // EOF - PTY closed
1285                        debug!("PTY output reader: EOF received");
1286                        let _ = output_tx.blocking_send(OutputEvent::Eof);
1287                        break;
1288                    }
1289                    Ok(n) => {
1290                        let data = buf[..n].to_vec();
1291                        // Send to TUI channel if connected
1292                        if let Some(ref tx) = tui_output_tx {
1293                            let _ = tx.send(data.clone());
1294                        }
1295                        // Send to main loop
1296                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
1297                            debug!("PTY output reader: channel closed");
1298                            break;
1299                        }
1300                    }
1301                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1302                        // Non-blocking mode: no data available, yield briefly
1303                        std::thread::sleep(Duration::from_millis(1));
1304                    }
1305                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {
1306                        // Interrupted by signal, retry
1307                    }
1308                    Err(e) => {
1309                        warn!("PTY output reader: error - {}", e);
1310                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
1311                        break;
1312                    }
1313                }
1314            }
1315            debug!("PTY output reader thread exiting");
1316        });
1317
1318        // Spawn input reading task - ONLY when TUI is NOT connected
1319        // In TUI mode (observation mode), user input should not be captured from stdin.
1320        // The TUI has its own input handling, and raw Ctrl+C should go directly to the
1321        // signal handler (interrupt_rx) without racing with the stdin reader.
1322        let mut input_rx = if tui_connected {
1323            debug!("TUI connected - skipping stdin reader thread");
1324            None
1325        } else {
1326            let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
1327            let should_terminate_input = Arc::clone(&should_terminate);
1328
1329            std::thread::spawn(move || {
1330                let mut stdin = io::stdin();
1331                let mut buf = [0u8; 1];
1332
1333                loop {
1334                    if should_terminate_input.load(Ordering::SeqCst) {
1335                        break;
1336                    }
1337
1338                    match stdin.read(&mut buf) {
1339                        Ok(0) => break, // EOF
1340                        Ok(1) => {
1341                            let byte = buf[0];
1342                            let event = match byte {
1343                                3 => InputEvent::CtrlC,          // Ctrl+C
1344                                28 => InputEvent::CtrlBackslash, // Ctrl+\
1345                                _ => InputEvent::Data(vec![byte]),
1346                            };
1347                            if input_tx.send(event).is_err() {
1348                                break;
1349                            }
1350                        }
1351                        Ok(_) => {} // Shouldn't happen with 1-byte buffer
1352                        Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
1353                        Err(_) => break,
1354                    }
1355                }
1356            });
1357            Some(input_rx)
1358        };
1359
1360        // Write stdin input after threads are spawned (so we capture any output)
1361        // Give Claude's TUI a moment to initialize before sending the prompt
1362        if let Some(ref input) = pending_stdin {
1363            tokio::time::sleep(Duration::from_millis(100)).await;
1364            writer.write_all(input.as_bytes())?;
1365            writer.write_all(b"\n")?;
1366            writer.flush()?;
1367            last_activity = Instant::now();
1368        }
1369
1370        // Main select loop - this is the key fix for blocking I/O
1371        // We use tokio::select! to multiplex between output, input, and timeout
1372        loop {
1373            // Check if child has exited (non-blocking check before select)
1374            if let Some(status) = child
1375                .try_wait()
1376                .map_err(|e| io::Error::other(e.to_string()))?
1377            {
1378                let exit_code = status.exit_code() as i32;
1379                debug!(exit_status = ?status, exit_code, "Child process exited");
1380
1381                // Drain remaining output already buffered.
1382                while let Ok(event) = output_rx.try_recv() {
1383                    if let OutputEvent::Data(data) = event {
1384                        if !tui_connected {
1385                            io::stdout().write_all(&data)?;
1386                            io::stdout().flush()?;
1387                        }
1388                        output.extend_from_slice(&data);
1389                    }
1390                }
1391
1392                // Give the reader thread a brief window to flush any final bytes/EOF.
1393                // This avoids races where fast-exiting commands drop output before we return.
1394                let drain_deadline = Instant::now() + Duration::from_millis(200);
1395                loop {
1396                    let remaining = drain_deadline.saturating_duration_since(Instant::now());
1397                    if remaining.is_zero() {
1398                        break;
1399                    }
1400                    match tokio::time::timeout(remaining, output_rx.recv()).await {
1401                        Ok(Some(OutputEvent::Data(data))) => {
1402                            if !tui_connected {
1403                                io::stdout().write_all(&data)?;
1404                                io::stdout().flush()?;
1405                            }
1406                            output.extend_from_slice(&data);
1407                        }
1408                        Ok(Some(OutputEvent::Eof) | None) => break,
1409                        Ok(Some(OutputEvent::Error(e))) => {
1410                            debug!(error = %e, "PTY read error after exit");
1411                            break;
1412                        }
1413                        Err(_) => break, // timeout
1414                    }
1415                }
1416
1417                should_terminate.store(true, Ordering::SeqCst);
1418                // Signal TUI that PTY has terminated
1419                let _ = self.terminated_tx.send(true);
1420
1421                let final_termination = resolve_termination_type(exit_code, termination);
1422                // run_interactive doesn't parse JSON, so extracted_text is empty
1423                return Ok(build_result(
1424                    &output,
1425                    status.success(),
1426                    Some(exit_code),
1427                    final_termination,
1428                    String::new(),
1429                    None,
1430                ));
1431            }
1432
1433            // Build the timeout future (or a never-completing one if disabled)
1434            let timeout_future = async {
1435                match timeout_duration {
1436                    Some(d) => {
1437                        let elapsed = last_activity.elapsed();
1438                        if elapsed >= d {
1439                            tokio::time::sleep(Duration::ZERO).await
1440                        } else {
1441                            tokio::time::sleep(d.saturating_sub(elapsed)).await
1442                        }
1443                    }
1444                    None => std::future::pending::<()>().await,
1445                }
1446            };
1447
1448            tokio::select! {
1449                // PTY output received
1450                output_event = output_rx.recv() => {
1451                    match output_event {
1452                        Some(OutputEvent::Data(data)) => {
1453                            // Only write to stdout if TUI is NOT handling output
1454                            if !tui_connected {
1455                                io::stdout().write_all(&data)?;
1456                                io::stdout().flush()?;
1457                            }
1458                            output.extend_from_slice(&data);
1459
1460                            last_activity = Instant::now();
1461                        }
1462                        Some(OutputEvent::Eof) => {
1463                            debug!("PTY EOF received");
1464                            break;
1465                        }
1466                        Some(OutputEvent::Error(e)) => {
1467                            debug!(error = %e, "PTY read error");
1468                            break;
1469                        }
1470                        None => {
1471                            // Channel closed, reader thread exited
1472                            break;
1473                        }
1474                    }
1475                }
1476
1477                // User input received (from stdin) - only active when TUI is NOT connected
1478                input_event = async {
1479                    match input_rx.as_mut() {
1480                        Some(rx) => rx.recv().await,
1481                        None => std::future::pending().await, // Never resolves when TUI is connected
1482                    }
1483                } => {
1484                    match input_event {
1485                        Some(InputEvent::CtrlC) => {
1486                            match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1487                                CtrlCAction::ForwardAndStartWindow => {
1488                                    // Forward Ctrl+C to Claude
1489                                    let _ = writer.write_all(&[3]);
1490                                    let _ = writer.flush();
1491                                    last_activity = Instant::now();
1492                                }
1493                                CtrlCAction::Terminate => {
1494                                    info!("Double Ctrl+C detected, terminating");
1495                                    termination = TerminationType::UserInterrupt;
1496                                    should_terminate.store(true, Ordering::SeqCst);
1497                                    self.terminate_child(&mut child, true).await?;
1498                                    break;
1499                                }
1500                            }
1501                        }
1502                        Some(InputEvent::CtrlBackslash) => {
1503                            info!("Ctrl+\\ detected, force killing");
1504                            termination = TerminationType::ForceKill;
1505                            should_terminate.store(true, Ordering::SeqCst);
1506                            self.terminate_child(&mut child, false).await?;
1507                            break;
1508                        }
1509                        Some(InputEvent::Data(data)) => {
1510                            // Forward to Claude
1511                            let _ = writer.write_all(&data);
1512                            let _ = writer.flush();
1513                            last_activity = Instant::now();
1514                        }
1515                        None => {
1516                            // Input channel closed (stdin EOF)
1517                            debug!("Input channel closed");
1518                        }
1519                    }
1520                }
1521
1522                // TUI input received (convert to InputEvent for unified handling)
1523                tui_input = self.input_rx.recv() => {
1524                    if let Some(data) = tui_input {
1525                        match InputEvent::from_bytes(data) {
1526                            InputEvent::CtrlC => {
1527                                match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1528                                    CtrlCAction::ForwardAndStartWindow => {
1529                                        let _ = writer.write_all(&[3]);
1530                                        let _ = writer.flush();
1531                                        last_activity = Instant::now();
1532                                    }
1533                                    CtrlCAction::Terminate => {
1534                                        info!("Double Ctrl+C detected, terminating");
1535                                        termination = TerminationType::UserInterrupt;
1536                                        should_terminate.store(true, Ordering::SeqCst);
1537                                        self.terminate_child(&mut child, true).await?;
1538                                        break;
1539                                    }
1540                                }
1541                            }
1542                            InputEvent::CtrlBackslash => {
1543                                info!("Ctrl+\\ detected, force killing");
1544                                termination = TerminationType::ForceKill;
1545                                should_terminate.store(true, Ordering::SeqCst);
1546                                self.terminate_child(&mut child, false).await?;
1547                                break;
1548                            }
1549                            InputEvent::Data(bytes) => {
1550                                let _ = writer.write_all(&bytes);
1551                                let _ = writer.flush();
1552                                last_activity = Instant::now();
1553                            }
1554                        }
1555                    }
1556                }
1557
1558                // Control commands from TUI
1559                control_cmd = self.control_rx.recv() => {
1560                    if let Some(cmd) = control_cmd {
1561                        use crate::pty_handle::ControlCommand;
1562                        match cmd {
1563                            ControlCommand::Kill => {
1564                                info!("Control command: Kill");
1565                                termination = TerminationType::UserInterrupt;
1566                                should_terminate.store(true, Ordering::SeqCst);
1567                                self.terminate_child(&mut child, true).await?;
1568                                break;
1569                            }
1570                            ControlCommand::Resize(cols, rows) => {
1571                                debug!(cols, rows, "Control command: Resize");
1572                                // Resize the PTY to match TUI dimensions
1573                                if let Err(e) = master.resize(PtySize {
1574                                    rows,
1575                                    cols,
1576                                    pixel_width: 0,
1577                                    pixel_height: 0,
1578                                }) {
1579                                    warn!("Failed to resize PTY: {}", e);
1580                                }
1581                            }
1582                            ControlCommand::Skip | ControlCommand::Abort => {
1583                                // These are handled at orchestrator level, not here
1584                                debug!("Control command: {:?} (ignored at PTY level)", cmd);
1585                            }
1586                        }
1587                    }
1588                }
1589
1590                // Idle timeout expired
1591                _ = timeout_future => {
1592                    warn!(
1593                        timeout_secs = self.config.idle_timeout_secs,
1594                        "Idle timeout triggered"
1595                    );
1596                    termination = TerminationType::IdleTimeout;
1597                    should_terminate.store(true, Ordering::SeqCst);
1598                    self.terminate_child(&mut child, true).await?;
1599                    break;
1600                }
1601
1602                // Interrupt signal from event loop
1603                _ = interrupt_rx.changed() => {
1604                    if *interrupt_rx.borrow() {
1605                        debug!("Interrupt received in interactive mode, terminating");
1606                        termination = TerminationType::UserInterrupt;
1607                        should_terminate.store(true, Ordering::SeqCst);
1608                        self.terminate_child(&mut child, true).await?;
1609                        break;
1610                    }
1611                }
1612            }
1613        }
1614
1615        // Ensure termination flag is set for spawned threads
1616        should_terminate.store(true, Ordering::SeqCst);
1617
1618        // Signal TUI that PTY has terminated
1619        let _ = self.terminated_tx.send(true);
1620
1621        // Wait for child to fully exit (interruptible + bounded)
1622        let status = self
1623            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1624            .await?;
1625
1626        let (success, exit_code, final_termination) = match status {
1627            Some(s) => {
1628                let code = s.exit_code() as i32;
1629                (
1630                    s.success(),
1631                    Some(code),
1632                    resolve_termination_type(code, termination),
1633                )
1634            }
1635            None => {
1636                warn!("Timed out waiting for child to exit after termination");
1637                (false, None, termination)
1638            }
1639        };
1640
1641        // run_interactive doesn't parse JSON, so extracted_text is empty
1642        Ok(build_result(
1643            &output,
1644            success,
1645            exit_code,
1646            final_termination,
1647            String::new(),
1648            None,
1649        ))
1650    }
1651
    /// Terminates the child process.
    ///
    /// On Unix, `graceful == true` sends SIGTERM and polls for up to 2 seconds before
    /// escalating to SIGKILL, while `graceful == false` sends SIGKILL immediately.
    /// This non-Unix variant ignores the flag and always calls `child.kill()`.
    ///
    /// This is an async function to avoid blocking the tokio runtime during the
    /// grace period wait. Previously used `std::thread::sleep` which blocked the
    /// worker thread for up to 5 seconds, making the TUI appear frozen.
    ///
    /// # Errors
    /// Returns whatever error `child.kill()` reports.
    #[allow(clippy::unused_self)] // Self is conceptually the right receiver for this method
    #[allow(clippy::unused_async)] // Kept async to preserve signature parity with Unix implementation
    #[cfg(not(unix))]
    async fn terminate_child(
        &self,
        child: &mut Box<dyn portable_pty::Child + Send>,
        _graceful: bool,
    ) -> io::Result<()> {
        // No graceful path here: the flag is accepted only for signature parity.
        child.kill()
    }
1670
1671    #[cfg(unix)]
1672    async fn terminate_child(
1673        &self,
1674        child: &mut Box<dyn portable_pty::Child + Send>,
1675        graceful: bool,
1676    ) -> io::Result<()> {
1677        let pid = match child.process_id() {
1678            Some(id) => Pid::from_raw(id as i32),
1679            None => return Ok(()), // Already exited
1680        };
1681
1682        if graceful {
1683            debug!(pid = %pid, "Sending SIGTERM");
1684            let _ = kill(pid, Signal::SIGTERM);
1685
1686            // Wait up to 5 seconds for graceful exit (reduced from 5s for better UX)
1687            let grace_period = Duration::from_secs(2);
1688            let start = Instant::now();
1689
1690            while start.elapsed() < grace_period {
1691                if child
1692                    .try_wait()
1693                    .map_err(|e| io::Error::other(e.to_string()))?
1694                    .is_some()
1695                {
1696                    return Ok(());
1697                }
1698                // Use async sleep to avoid blocking the tokio runtime
1699                tokio::time::sleep(Duration::from_millis(50)).await;
1700            }
1701
1702            // Still running after grace period - force kill
1703            debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1704        }
1705
1706        debug!(pid = %pid, "Sending SIGKILL");
1707        let _ = kill(pid, Signal::SIGKILL);
1708        Ok(())
1709    }
1710
1711    /// Waits for the child process to exit, optionally with a timeout.
1712    ///
1713    /// This is interruptible by the shared interrupt channel from the event loop.
1714    /// When interrupted, returns `Ok(None)` to let the caller handle termination.
1715    async fn wait_for_exit(
1716        &self,
1717        child: &mut Box<dyn portable_pty::Child + Send>,
1718        max_wait: Option<Duration>,
1719        interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1720    ) -> io::Result<Option<portable_pty::ExitStatus>> {
1721        let start = Instant::now();
1722
1723        loop {
1724            if let Some(status) = child
1725                .try_wait()
1726                .map_err(|e| io::Error::other(e.to_string()))?
1727            {
1728                return Ok(Some(status));
1729            }
1730
1731            if let Some(max) = max_wait
1732                && start.elapsed() >= max
1733            {
1734                return Ok(None);
1735            }
1736
1737            tokio::select! {
1738                _ = interrupt_rx.changed() => {
1739                    if *interrupt_rx.borrow() {
1740                        debug!("Interrupt received while waiting for child exit");
1741                        return Ok(None);
1742                    }
1743                }
1744                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1745            }
1746        }
1747    }
1748}
1749
1750fn handle_copilot_stream_line<H: StreamHandler>(
1751    line: &str,
1752    handler: &mut H,
1753    extracted_text: &mut String,
1754    copilot_state: &mut CopilotStreamState,
1755) -> Option<SessionResult> {
1756    let event = CopilotStreamParser::parse_line(line)?;
1757    dispatch_copilot_stream_event(event, handler, extracted_text, copilot_state)
1758}
1759
1760fn inject_ralph_runtime_env(cmd_builder: &mut CommandBuilder, workspace_root: &std::path::Path) {
1761    let Ok(current_exe) = env::current_exe() else {
1762        return;
1763    };
1764    let Some(bin_dir) = current_exe.parent() else {
1765        return;
1766    };
1767
1768    let mut path_entries = vec![bin_dir.to_path_buf()];
1769    if let Some(existing_path) = env::var_os("PATH") {
1770        path_entries.extend(env::split_paths(&existing_path));
1771    }
1772
1773    if let Ok(joined_path) = env::join_paths(path_entries) {
1774        cmd_builder.env("PATH", joined_path);
1775    }
1776    cmd_builder.env("RALPH_BIN", current_exe);
1777    cmd_builder.env("RALPH_WORKSPACE_ROOT", workspace_root);
1778    if std::path::Path::new("/var/tmp").is_dir() {
1779        cmd_builder.env("TMPDIR", "/var/tmp");
1780        cmd_builder.env("TMP", "/var/tmp");
1781        cmd_builder.env("TEMP", "/var/tmp");
1782    }
1783}
1784
/// A unit of user input, classified so control keys can be handled specially.
#[derive(Debug)]
enum InputEvent {
    /// Ctrl+C (a lone byte `3`).
    CtrlC,
    /// Ctrl+\ (a lone byte `28`).
    CtrlBackslash,
    /// Any other bytes, to be forwarded unchanged.
    Data(Vec<u8>),
}

impl InputEvent {
    /// Classifies a raw input chunk.
    ///
    /// Only a chunk consisting of exactly one byte is ever treated as a control
    /// key; everything else (including an empty chunk, or a longer chunk that
    /// happens to contain `3` or `28`) is returned as `Data`.
    fn from_bytes(data: Vec<u8>) -> Self {
        match data.as_slice() {
            [3] => InputEvent::CtrlC,
            [28] => InputEvent::CtrlBackslash,
            _ => InputEvent::Data(data),
        }
    }
}
1809
/// Output events from the PTY.
///
/// Received over the output channel by the executor's select loop, which
/// appends `Data` to the accumulated output and stops reading on `Eof` or `Error`.
#[derive(Debug)]
enum OutputEvent {
    /// Data received from PTY.
    Data(Vec<u8>),
    /// PTY reached EOF (process exited).
    Eof,
    /// Error reading from PTY; carries the error text, which is only logged.
    Error(String),
}
1820
1821/// Strips ANSI escape sequences from raw bytes.
1822///
1823/// Uses `strip-ansi-escapes` for direct byte-level ANSI removal without terminal
1824/// emulation. This ensures ALL content is preserved regardless of output size,
1825/// unlike vt100's terminal simulation which can lose content that scrolls off.
1826fn strip_ansi(bytes: &[u8]) -> String {
1827    let stripped = strip_ansi_escapes::strip(bytes);
1828    String::from_utf8_lossy(&stripped).into_owned()
1829}
1830
1831/// Determines the final termination type, accounting for SIGINT exit code.
1832///
1833/// Exit code 130 indicates the process was killed by SIGINT (Ctrl+C forwarded to PTY).
1834fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1835    if exit_code == 130 {
1836        info!("Child process killed by SIGINT");
1837        TerminationType::UserInterrupt
1838    } else {
1839        default
1840    }
1841}
1842
/// Finds the value supplied for a CLI flag in `args`.
///
/// Two spellings are recognised, for both the long and the short flag:
/// * separate arguments - `--flag value` / `-f value`; the following argument is
///   accepted only if it does not start with `-`
/// * inline - `--flag=value` / `-f=value`; an empty value is ignored
///
/// Arguments are scanned left to right and the first usable value wins. An
/// occurrence of the flag without a usable value is skipped, so a later
/// occurrence can still match. Returns `None` when no value is found.
fn extract_cli_flag_value(args: &[String], long_flag: &str, short_flag: &str) -> Option<String> {
    // Built once up front rather than re-allocated for every argument.
    let long_prefix = format!("{long_flag}=");
    let short_prefix = format!("{short_flag}=");

    for (i, arg) in args.iter().enumerate() {
        if arg == long_flag || arg == short_flag {
            match args.get(i + 1) {
                Some(value) if !value.starts_with('-') => return Some(value.clone()),
                // Missing value, or the next argument is another flag: keep scanning.
                _ => continue,
            }
        }

        let inline_value = arg
            .strip_prefix(long_prefix.as_str())
            .filter(|value| !value.is_empty())
            .or_else(|| {
                arg.strip_prefix(short_prefix.as_str())
                    .filter(|value| !value.is_empty())
            });
        if let Some(value) = inline_value {
            return Some(value.to_string());
        }
    }

    None
}
1869
1870/// Dispatches a Claude stream event to the appropriate handler method.
1871/// Also accumulates text content into `extracted_text` for event parsing.
1872fn dispatch_stream_event<H: StreamHandler>(
1873    event: ClaudeStreamEvent,
1874    handler: &mut H,
1875    extracted_text: &mut String,
1876) {
1877    match event {
1878        ClaudeStreamEvent::System { .. } => {
1879            // Session initialization - could log in verbose mode but not user-facing
1880        }
1881        ClaudeStreamEvent::Assistant { message, .. } => {
1882            for block in message.content {
1883                match block {
1884                    ContentBlock::Text { text } => {
1885                        handler.on_text(&text);
1886                        // Accumulate text for event parsing
1887                        extracted_text.push_str(&text);
1888                        extracted_text.push('\n');
1889                    }
1890                    ContentBlock::ToolUse { name, id, input } => {
1891                        handler.on_tool_call(&name, &id, &input)
1892                    }
1893                }
1894            }
1895        }
1896        ClaudeStreamEvent::User { message } => {
1897            for block in message.content {
1898                match block {
1899                    UserContentBlock::ToolResult {
1900                        tool_use_id,
1901                        content,
1902                    } => {
1903                        handler.on_tool_result(&tool_use_id, &content);
1904                    }
1905                }
1906            }
1907        }
1908        ClaudeStreamEvent::Result {
1909            duration_ms,
1910            total_cost_usd,
1911            num_turns,
1912            is_error,
1913        } => {
1914            if is_error {
1915                handler.on_error("Session ended with error");
1916            }
1917            handler.on_complete(&SessionResult {
1918                duration_ms,
1919                total_cost_usd,
1920                num_turns,
1921                is_error,
1922                ..Default::default()
1923            });
1924        }
1925    }
1926}
1927
1928/// Builds a `PtyExecutionResult` from the accumulated output and exit status.
1929///
1930/// # Arguments
1931/// * `output` - Raw bytes from PTY
1932/// * `success` - Whether process exited successfully
1933/// * `exit_code` - Process exit code if available
1934/// * `termination` - How the process was terminated
1935/// * `extracted_text` - Text extracted from NDJSON stream (for Claude's stream-json)
1936fn build_result(
1937    output: &[u8],
1938    success: bool,
1939    exit_code: Option<i32>,
1940    termination: TerminationType,
1941    extracted_text: String,
1942    session_result: Option<&SessionResult>,
1943) -> PtyExecutionResult {
1944    let (total_cost_usd, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens) =
1945        if let Some(result) = session_result {
1946            (
1947                result.total_cost_usd,
1948                result.input_tokens,
1949                result.output_tokens,
1950                result.cache_read_tokens,
1951                result.cache_write_tokens,
1952            )
1953        } else {
1954            (0.0, 0, 0, 0, 0)
1955        };
1956
1957    PtyExecutionResult {
1958        output: String::from_utf8_lossy(output).to_string(),
1959        stripped_output: strip_ansi(output),
1960        extracted_text,
1961        success,
1962        exit_code,
1963        termination,
1964        total_cost_usd,
1965        input_tokens,
1966        output_tokens,
1967        cache_read_tokens,
1968        cache_write_tokens,
1969    }
1970}
1971
1972#[cfg(test)]
1973mod tests {
1974    use super::*;
1975    use crate::claude_stream::{AssistantMessage, UserMessage};
1976    #[cfg(unix)]
1977    use crate::cli_backend::PromptMode;
1978    use crate::stream_handler::{SessionResult, StreamHandler};
1979    #[cfg(unix)]
1980    use tempfile::TempDir;
1981
1982    #[test]
1983    fn test_double_ctrl_c_within_window() {
1984        let mut state = CtrlCState::new();
1985        let now = Instant::now();
1986
1987        // First Ctrl+C: should forward and start window
1988        let action = state.handle_ctrl_c(now);
1989        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
1990
1991        // Second Ctrl+C within 1 second: should terminate
1992        let later = now + Duration::from_millis(500);
1993        let action = state.handle_ctrl_c(later);
1994        assert_eq!(action, CtrlCAction::Terminate);
1995    }
1996
1997    #[test]
1998    fn test_input_event_from_bytes_ctrl_c() {
1999        let event = InputEvent::from_bytes(vec![3]);
2000        assert!(matches!(event, InputEvent::CtrlC));
2001    }
2002
2003    #[test]
2004    fn test_input_event_from_bytes_ctrl_backslash() {
2005        let event = InputEvent::from_bytes(vec![28]);
2006        assert!(matches!(event, InputEvent::CtrlBackslash));
2007    }
2008
2009    #[test]
2010    fn test_input_event_from_bytes_data() {
2011        let event = InputEvent::from_bytes(vec![b'a']);
2012        assert!(matches!(event, InputEvent::Data(_)));
2013
2014        let event = InputEvent::from_bytes(vec![1, 2, 3]);
2015        assert!(matches!(event, InputEvent::Data(_)));
2016    }
2017
2018    #[test]
2019    fn test_ctrl_c_window_expires() {
2020        let mut state = CtrlCState::new();
2021        let now = Instant::now();
2022
2023        // First Ctrl+C
2024        state.handle_ctrl_c(now);
2025
2026        // Wait 2 seconds (window expires)
2027        let later = now + Duration::from_secs(2);
2028
2029        // Second Ctrl+C: window expired, should forward and start new window
2030        let action = state.handle_ctrl_c(later);
2031        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
2032    }
2033
2034    #[test]
2035    fn test_strip_ansi_basic() {
2036        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n";
2037        let stripped = strip_ansi(input);
2038        assert!(stripped.contains("Thinking..."));
2039        assert!(!stripped.contains("\x1b["));
2040    }
2041
2042    #[test]
2043    fn test_completion_promise_extraction() {
2044        // Simulate Claude output with heavy ANSI formatting
2045        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n\
2046                      \x1b[2K\x1b[1;32m  Done!\x1b[0m\r\n\
2047                      \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";
2048
2049        let stripped = strip_ansi(input);
2050
2051        // Event parser sees clean text
2052        assert!(stripped.contains("LOOP_COMPLETE"));
2053        assert!(!stripped.contains("\x1b["));
2054    }
2055
2056    #[test]
2057    fn test_event_tag_extraction() {
2058        // Event tags may be wrapped in ANSI codes
2059        let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
2060                      Task completed successfully\r\n\
2061                      \x1b[90m</event>\x1b[0m\r\n";
2062
2063        let stripped = strip_ansi(input);
2064
2065        assert!(stripped.contains("<event topic=\"build.done\">"));
2066        assert!(stripped.contains("</event>"));
2067    }
2068
2069    #[test]
2070    fn test_large_output_preserves_early_events() {
2071        // Regression test: ensure event tags aren't lost when output is large
2072        let mut input = Vec::new();
2073
2074        // Event tag at the beginning
2075        input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");
2076
2077        // Simulate 500 lines of verbose output (would overflow any terminal)
2078        for i in 0..500 {
2079            input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
2080        }
2081
2082        let stripped = strip_ansi(&input);
2083
2084        // Event tag should still be present - no scrollback loss with strip-ansi-escapes
2085        assert!(
2086            stripped.contains("<event topic=\"build.task\">"),
2087            "Event tag was lost - strip_ansi is not preserving all content"
2088        );
2089        assert!(stripped.contains("Implement feature X"));
2090        assert!(stripped.contains("Line 499")); // Last line should be present too
2091    }
2092
2093    #[test]
2094    fn test_pty_config_defaults() {
2095        let config = PtyConfig::default();
2096        assert!(config.interactive);
2097        assert_eq!(config.idle_timeout_secs, 30);
2098        assert_eq!(config.cols, 80);
2099        assert_eq!(config.rows, 24);
2100    }
2101
2102    #[test]
2103    fn test_pty_config_from_env_matches_env_or_defaults() {
2104        let cols = std::env::var("COLUMNS")
2105            .ok()
2106            .and_then(|value| value.parse::<u16>().ok())
2107            .unwrap_or(80);
2108        let rows = std::env::var("LINES")
2109            .ok()
2110            .and_then(|value| value.parse::<u16>().ok())
2111            .unwrap_or(24);
2112
2113        let config = PtyConfig::from_env();
2114        assert_eq!(config.cols, cols);
2115        assert_eq!(config.rows, rows);
2116    }
2117
    /// Verifies that the idle timeout logic in run_interactive correctly handles
    /// activity resets. Per spec (interactive-mode.spec.md lines 155-159):
    /// - Timeout resets on agent output (any bytes from PTY)
    /// - Timeout resets on user input (any key forwarded to agent)
    ///
    /// This test validates the timeout calculation logic that enables resets.
    /// The actual reset happens in the `select!` branches of run_interactive that
    /// assign `last_activity = Instant::now()` (PTY output, stdin input, TUI input).
    /// Note that this test only exercises the `Duration` arithmetic; it does not
    /// drive run_interactive itself.
    #[test]
    fn test_idle_timeout_reset_logic() {
        // Simulate the timeout calculation used in run_interactive
        let timeout_duration = Duration::from_secs(30);

        // Simulate 25 seconds of inactivity
        let simulated_25s = Duration::from_secs(25);

        // Remaining time before timeout
        let remaining = timeout_duration.saturating_sub(simulated_25s);
        assert_eq!(remaining.as_secs(), 5);

        // After activity (output or input), last_activity would be reset to now
        let last_activity_after_reset = Instant::now();

        // Now elapsed is 0, full timeout duration available again
        let elapsed = last_activity_after_reset.elapsed();
        assert!(elapsed < Duration::from_millis(100)); // Should be near-zero

        // Timeout calculation would give full duration minus small elapsed
        let new_remaining = timeout_duration.saturating_sub(elapsed);
        assert!(new_remaining > Duration::from_secs(29)); // Should be nearly full timeout
    }
2148
2149    #[test]
2150    fn test_extracted_text_field_exists() {
2151        // Test that PtyExecutionResult has extracted_text field
2152        // This is for NDJSON output where event tags are inside JSON strings
2153        let result = PtyExecutionResult {
2154            output: String::new(),
2155            stripped_output: String::new(),
2156            extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
2157            success: true,
2158            exit_code: Some(0),
2159            termination: TerminationType::Natural,
2160            total_cost_usd: 0.0,
2161            input_tokens: 0,
2162            output_tokens: 0,
2163            cache_read_tokens: 0,
2164            cache_write_tokens: 0,
2165        };
2166
2167        assert!(
2168            result
2169                .extracted_text
2170                .contains("<event topic=\"build.done\">")
2171        );
2172    }
2173
2174    #[test]
2175    fn test_build_result_includes_extracted_text() {
2176        // Test that build_result properly handles extracted_text
2177        let output = b"raw output";
2178        let extracted = "extracted text with <event topic=\"test\">payload</event>";
2179        let result = build_result(
2180            output,
2181            true,
2182            Some(0),
2183            TerminationType::Natural,
2184            extracted.to_string(),
2185            None,
2186        );
2187
2188        assert_eq!(result.extracted_text, extracted);
2189        assert!(result.stripped_output.contains("raw output"));
2190    }
2191
2192    #[test]
2193    fn test_resolve_termination_type_handles_sigint_exit_code() {
2194        let termination = resolve_termination_type(130, TerminationType::Natural);
2195        assert_eq!(termination, TerminationType::UserInterrupt);
2196
2197        let termination = resolve_termination_type(0, TerminationType::ForceKill);
2198        assert_eq!(termination, TerminationType::ForceKill);
2199    }
2200
2201    #[test]
2202    fn test_extract_cli_flag_value_supports_split_and_equals_syntax() {
2203        let args = vec![
2204            "--provider".to_string(),
2205            "anthropic".to_string(),
2206            "--model=claude-sonnet-4".to_string(),
2207        ];
2208
2209        assert_eq!(
2210            extract_cli_flag_value(&args, "--provider", "-p"),
2211            Some("anthropic".to_string())
2212        );
2213        assert_eq!(
2214            extract_cli_flag_value(&args, "--model", "-m"),
2215            Some("claude-sonnet-4".to_string())
2216        );
2217        assert_eq!(extract_cli_flag_value(&args, "--foo", "-f"), None);
2218    }
2219
2220    #[derive(Default)]
2221    struct CapturingHandler {
2222        texts: Vec<String>,
2223        tool_calls: Vec<(String, String, serde_json::Value)>,
2224        tool_results: Vec<(String, String)>,
2225        errors: Vec<String>,
2226        completions: Vec<SessionResult>,
2227    }
2228
2229    impl StreamHandler for CapturingHandler {
2230        fn on_text(&mut self, text: &str) {
2231            self.texts.push(text.to_string());
2232        }
2233
2234        fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
2235            self.tool_calls
2236                .push((name.to_string(), id.to_string(), input.clone()));
2237        }
2238
2239        fn on_tool_result(&mut self, id: &str, output: &str) {
2240            self.tool_results.push((id.to_string(), output.to_string()));
2241        }
2242
2243        fn on_error(&mut self, error: &str) {
2244            self.errors.push(error.to_string());
2245        }
2246
2247        fn on_complete(&mut self, result: &SessionResult) {
2248            self.completions.push(result.clone());
2249        }
2250    }
2251
2252    #[test]
2253    fn test_dispatch_stream_event_routes_text_and_tool_calls() {
2254        let mut handler = CapturingHandler::default();
2255        let mut extracted_text = String::new();
2256
2257        let event = ClaudeStreamEvent::Assistant {
2258            message: AssistantMessage {
2259                content: vec![
2260                    ContentBlock::Text {
2261                        text: "Hello".to_string(),
2262                    },
2263                    ContentBlock::ToolUse {
2264                        id: "tool-1".to_string(),
2265                        name: "Read".to_string(),
2266                        input: serde_json::json!({"path": "README.md"}),
2267                    },
2268                ],
2269            },
2270            usage: None,
2271        };
2272
2273        dispatch_stream_event(event, &mut handler, &mut extracted_text);
2274
2275        assert_eq!(handler.texts, vec!["Hello".to_string()]);
2276        assert_eq!(handler.tool_calls.len(), 1);
2277        assert!(extracted_text.contains("Hello"));
2278        assert!(extracted_text.ends_with('\n'));
2279    }
2280
2281    #[test]
2282    fn test_dispatch_stream_event_routes_tool_results_and_completion() {
2283        let mut handler = CapturingHandler::default();
2284        let mut extracted_text = String::new();
2285
2286        let event = ClaudeStreamEvent::User {
2287            message: UserMessage {
2288                content: vec![UserContentBlock::ToolResult {
2289                    tool_use_id: "tool-1".to_string(),
2290                    content: "done".to_string(),
2291                }],
2292            },
2293        };
2294
2295        dispatch_stream_event(event, &mut handler, &mut extracted_text);
2296        assert_eq!(handler.tool_results.len(), 1);
2297        assert_eq!(handler.tool_results[0].0, "tool-1");
2298        assert_eq!(handler.tool_results[0].1, "done");
2299
2300        let event = ClaudeStreamEvent::Result {
2301            duration_ms: 12,
2302            total_cost_usd: 0.01,
2303            num_turns: 2,
2304            is_error: true,
2305        };
2306
2307        dispatch_stream_event(event, &mut handler, &mut extracted_text);
2308        assert_eq!(handler.errors.len(), 1);
2309        assert_eq!(handler.completions.len(), 1);
2310        assert!(handler.completions[0].is_error);
2311    }
2312
2313    #[test]
2314    fn test_dispatch_stream_event_system_noop() {
2315        let mut handler = CapturingHandler::default();
2316        let mut extracted_text = String::new();
2317
2318        let event = ClaudeStreamEvent::System {
2319            session_id: "session-1".to_string(),
2320            model: "claude-test".to_string(),
2321            tools: Vec::new(),
2322        };
2323
2324        dispatch_stream_event(event, &mut handler, &mut extracted_text);
2325
2326        assert!(handler.texts.is_empty());
2327        assert!(handler.tool_calls.is_empty());
2328        assert!(handler.tool_results.is_empty());
2329        assert!(handler.errors.is_empty());
2330        assert!(handler.completions.is_empty());
2331        assert!(extracted_text.is_empty());
2332    }
2333
2334    /// Regression test: TUI mode should not spawn stdin reader thread
2335    ///
2336    /// Bug: In TUI mode, Ctrl+C required double-press to exit because the stdin
2337    /// reader thread (which captures byte 0x03) raced with the signal handler.
2338    /// The stdin reader would win, triggering "double Ctrl+C" logic instead of
2339    /// clean exit via interrupt_rx.
2340    ///
2341    /// Fix: When tui_connected=true, skip spawning stdin reader entirely.
2342    /// TUI mode is observation-only; user input should not be captured from stdin.
2343    /// The TUI has its own input handling (Ctrl+a q), and raw Ctrl+C goes directly
2344    /// to the signal handler (interrupt_rx) without racing.
2345    ///
2346    /// This test documents the expected behavior. The actual fix is in
2347    /// run_interactive() where `let mut input_rx = if !tui_connected { ... }`.
2348    #[test]
2349    fn test_tui_mode_stdin_reader_bypass() {
2350        // The tui_connected flag is now determined by the explicit tui_mode field,
2351        // set via set_tui_mode(true) when TUI is connected.
2352        // Previously used output_rx.is_none() which broke after streaming refactor.
2353
2354        // Simulate TUI connected scenario (tui_mode = true)
2355        let tui_mode = true;
2356        let tui_connected = tui_mode;
2357
2358        // When TUI is connected, stdin reader is skipped
2359        // (verified by: input_rx becomes None instead of Some(channel))
2360        assert!(
2361            tui_connected,
2362            "When tui_mode is true, stdin reader must be skipped"
2363        );
2364
2365        // In non-TUI mode, stdin reader is spawned
2366        let tui_mode_disabled = false;
2367        let tui_connected_non_tui = tui_mode_disabled;
2368        assert!(
2369            !tui_connected_non_tui,
2370            "When tui_mode is false, stdin reader must be spawned"
2371        );
2372    }
2373
2374    #[test]
2375    fn test_tui_mode_default_is_false() {
2376        // Create a PtyExecutor and verify tui_mode defaults to false
2377        let backend = CliBackend::claude();
2378        let config = PtyConfig::default();
2379        let executor = PtyExecutor::new(backend, config);
2380
2381        // tui_mode should default to false
2382        assert!(!executor.tui_mode, "tui_mode should default to false");
2383    }
2384
2385    #[test]
2386    fn test_set_tui_mode() {
2387        // Create a PtyExecutor and verify set_tui_mode works
2388        let backend = CliBackend::claude();
2389        let config = PtyConfig::default();
2390        let mut executor = PtyExecutor::new(backend, config);
2391
2392        // Initially false
2393        assert!(!executor.tui_mode, "tui_mode should start as false");
2394
2395        // Set to true
2396        executor.set_tui_mode(true);
2397        assert!(
2398            executor.tui_mode,
2399            "tui_mode should be true after set_tui_mode(true)"
2400        );
2401
2402        // Set back to false
2403        executor.set_tui_mode(false);
2404        assert!(
2405            !executor.tui_mode,
2406            "tui_mode should be false after set_tui_mode(false)"
2407        );
2408    }
2409
2410    #[test]
2411    fn test_build_result_populates_fields() {
2412        let output = b"\x1b[31mHello\x1b[0m\n";
2413        let extracted = "extracted text".to_string();
2414
2415        let result = build_result(
2416            output,
2417            true,
2418            Some(0),
2419            TerminationType::Natural,
2420            extracted.clone(),
2421            None,
2422        );
2423
2424        assert_eq!(result.output, String::from_utf8_lossy(output));
2425        assert!(result.stripped_output.contains("Hello"));
2426        assert!(!result.stripped_output.contains("\x1b["));
2427        assert_eq!(result.extracted_text, extracted);
2428        assert!(result.success);
2429        assert_eq!(result.exit_code, Some(0));
2430        assert_eq!(result.termination, TerminationType::Natural);
2431    }
2432
2433    #[cfg(unix)]
2434    #[tokio::test]
2435    async fn test_run_observe_executes_arg_prompt() {
2436        let temp_dir = TempDir::new().expect("temp dir");
2437        let backend = CliBackend {
2438            command: "sh".to_string(),
2439            args: vec!["-c".to_string()],
2440            prompt_mode: PromptMode::Arg,
2441            prompt_flag: None,
2442            output_format: OutputFormat::Text,
2443            env_vars: vec![],
2444        };
2445        let config = PtyConfig {
2446            interactive: false,
2447            idle_timeout_secs: 0,
2448            cols: 80,
2449            rows: 24,
2450            workspace_root: temp_dir.path().to_path_buf(),
2451        };
2452        let executor = PtyExecutor::new(backend, config);
2453        let (_tx, rx) = tokio::sync::watch::channel(false);
2454
2455        let result = executor
2456            .run_observe("echo hello-pty", rx)
2457            .await
2458            .expect("run_observe");
2459
2460        assert!(result.success);
2461        assert!(result.output.contains("hello-pty"));
2462        assert!(result.stripped_output.contains("hello-pty"));
2463        assert_eq!(result.exit_code, Some(0));
2464        assert_eq!(result.termination, TerminationType::Natural);
2465    }
2466
2467    #[cfg(unix)]
2468    #[tokio::test]
2469    async fn test_run_observe_writes_stdin_prompt() {
2470        let temp_dir = TempDir::new().expect("temp dir");
2471        let backend = CliBackend {
2472            command: "sh".to_string(),
2473            args: vec!["-c".to_string(), "read line; echo \"$line\"".to_string()],
2474            prompt_mode: PromptMode::Stdin,
2475            prompt_flag: None,
2476            output_format: OutputFormat::Text,
2477            env_vars: vec![],
2478        };
2479        let config = PtyConfig {
2480            interactive: false,
2481            idle_timeout_secs: 0,
2482            cols: 80,
2483            rows: 24,
2484            workspace_root: temp_dir.path().to_path_buf(),
2485        };
2486        let executor = PtyExecutor::new(backend, config);
2487        let (_tx, rx) = tokio::sync::watch::channel(false);
2488
2489        let result = executor
2490            .run_observe("stdin-line", rx)
2491            .await
2492            .expect("run_observe");
2493
2494        assert!(result.success);
2495        assert!(result.output.contains("stdin-line"));
2496        assert!(result.stripped_output.contains("stdin-line"));
2497        assert_eq!(result.termination, TerminationType::Natural);
2498    }
2499
2500    #[cfg(unix)]
2501    #[tokio::test]
2502    async fn test_run_observe_streaming_text_routes_output() {
2503        let temp_dir = TempDir::new().expect("temp dir");
2504        let backend = CliBackend {
2505            command: "sh".to_string(),
2506            args: vec!["-c".to_string()],
2507            prompt_mode: PromptMode::Arg,
2508            prompt_flag: None,
2509            output_format: OutputFormat::Text,
2510            env_vars: vec![],
2511        };
2512        let config = PtyConfig {
2513            interactive: false,
2514            idle_timeout_secs: 0,
2515            cols: 80,
2516            rows: 24,
2517            workspace_root: temp_dir.path().to_path_buf(),
2518        };
2519        let executor = PtyExecutor::new(backend, config);
2520        let (_tx, rx) = tokio::sync::watch::channel(false);
2521        let mut handler = CapturingHandler::default();
2522
2523        let result = executor
2524            .run_observe_streaming("printf 'alpha\\nbeta\\n'", rx, &mut handler)
2525            .await
2526            .expect("run_observe_streaming");
2527
2528        assert!(result.success);
2529        let captured = handler.texts.join("");
2530        assert!(captured.contains("alpha"), "captured: {captured}");
2531        assert!(captured.contains("beta"), "captured: {captured}");
2532        assert!(handler.completions.is_empty());
2533        assert!(result.extracted_text.is_empty());
2534    }
2535
2536    #[cfg(unix)]
2537    #[tokio::test]
2538    async fn test_run_observe_streaming_parses_stream_json() {
2539        let temp_dir = TempDir::new().expect("temp dir");
2540        let backend = CliBackend {
2541            command: "sh".to_string(),
2542            args: vec!["-c".to_string()],
2543            prompt_mode: PromptMode::Arg,
2544            prompt_flag: None,
2545            output_format: OutputFormat::StreamJson,
2546            env_vars: vec![],
2547        };
2548        let config = PtyConfig {
2549            interactive: false,
2550            idle_timeout_secs: 0,
2551            cols: 80,
2552            rows: 24,
2553            workspace_root: temp_dir.path().to_path_buf(),
2554        };
2555        let executor = PtyExecutor::new(backend, config);
2556        let (_tx, rx) = tokio::sync::watch::channel(false);
2557        let mut handler = CapturingHandler::default();
2558
2559        let script = r#"printf '%s\n' '{"type":"assistant","message":{"content":[{"type":"text","text":"Hello stream"}]}}' '{"type":"result","duration_ms":1,"total_cost_usd":0.0,"num_turns":1,"is_error":false}'"#;
2560        let result = executor
2561            .run_observe_streaming(script, rx, &mut handler)
2562            .await
2563            .expect("run_observe_streaming");
2564
2565        assert!(result.success);
2566        assert!(
2567            handler
2568                .texts
2569                .iter()
2570                .any(|text| text.contains("Hello stream"))
2571        );
2572        assert_eq!(handler.completions.len(), 1);
2573        assert!(result.extracted_text.contains("Hello stream"));
2574        assert_eq!(result.termination, TerminationType::Natural);
2575    }
2576
2577    #[cfg(unix)]
2578    #[tokio::test]
2579    async fn test_run_interactive_in_tui_mode() {
2580        let temp_dir = TempDir::new().expect("temp dir");
2581        let backend = CliBackend {
2582            command: "sh".to_string(),
2583            args: vec!["-c".to_string()],
2584            prompt_mode: PromptMode::Arg,
2585            prompt_flag: None,
2586            output_format: OutputFormat::Text,
2587            env_vars: vec![],
2588        };
2589        let config = PtyConfig {
2590            interactive: true,
2591            idle_timeout_secs: 0,
2592            cols: 80,
2593            rows: 24,
2594            workspace_root: temp_dir.path().to_path_buf(),
2595        };
2596        let mut executor = PtyExecutor::new(backend, config);
2597        executor.set_tui_mode(true);
2598        let (_tx, rx) = tokio::sync::watch::channel(false);
2599
2600        let result = executor
2601            .run_interactive("echo hello-tui", rx)
2602            .await
2603            .expect("run_interactive");
2604
2605        assert!(result.success);
2606        assert!(result.output.contains("hello-tui"));
2607        assert!(result.stripped_output.contains("hello-tui"));
2608        assert_eq!(result.exit_code, Some(0));
2609        assert_eq!(result.termination, TerminationType::Natural);
2610    }
2611}