Skip to main content

ralph_adapters/
pty_executor.rs

1//! PTY executor for running prompts with full terminal emulation.
2//!
3//! Spawns CLI tools in a pseudo-terminal to preserve rich TUI features like
4//! colors, spinners, and animations. Supports both interactive mode (user
5//! input forwarded) and observe mode (output-only).
6//!
7//! Key features:
8//! - PTY creation via `portable-pty` for cross-platform support
9//! - Idle timeout with activity tracking (output AND input reset timer)
10//! - Double Ctrl+C handling (first forwards, second terminates)
11//! - Raw mode management with cleanup on exit/crash
12//!
13//! Architecture:
14//! - Uses `tokio::select!` for non-blocking I/O multiplexing
15//! - Spawns separate tasks for PTY output and user input
16//! - Enables responsive Ctrl+C handling even when PTY is idle
17
18// Exit codes and PIDs are always within i32 range in practice
19#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::copilot_stream::{
24    CopilotStreamParser, CopilotStreamState, dispatch_copilot_stream_event,
25};
26use crate::pi_stream::{PiSessionState, PiStreamParser, dispatch_pi_stream_event};
27use crate::stream_handler::{SessionResult, StreamHandler};
28#[cfg(unix)]
29use nix::sys::signal::{Signal, kill};
30#[cfg(unix)]
31use nix::unistd::Pid;
32use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
33use std::env;
34use std::io::{self, Read, Write};
35use std::sync::Arc;
36use std::sync::atomic::{AtomicBool, Ordering};
37use std::time::{Duration, Instant};
38use tokio::sync::{mpsc, watch};
39use tracing::{debug, info, warn};
40
41/// Result of a PTY execution.
42#[derive(Debug)]
43pub struct PtyExecutionResult {
44    /// The accumulated output (ANSI sequences preserved).
45    pub output: String,
46    /// The ANSI-stripped output for event parsing.
47    pub stripped_output: String,
48    /// Extracted text content from NDJSON stream (for Claude's stream-json output).
49    /// When Claude outputs `--output-format stream-json`, event tags like
50    /// `<event topic="...">` are inside JSON string values. This field contains
51    /// the extracted text content for proper event parsing.
52    /// Empty for non-JSON backends (use `stripped_output` instead).
53    pub extracted_text: String,
54    /// Whether the process exited successfully.
55    pub success: bool,
56    /// The exit code if available.
57    pub exit_code: Option<i32>,
58    /// How the process was terminated.
59    pub termination: TerminationType,
60    /// Total session cost in USD, if available from stream metadata.
61    pub total_cost_usd: f64,
62    /// Total input tokens in the session.
63    pub input_tokens: u64,
64    /// Total output tokens in the session.
65    pub output_tokens: u64,
66    /// Total cache-read tokens in the session.
67    pub cache_read_tokens: u64,
68    /// Total cache-write tokens in the session.
69    pub cache_write_tokens: u64,
70}
71
/// The reason a PTY session came to an end.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TerminationType {
    /// The process finished on its own.
    Natural,
    /// The session was stopped because the idle timeout elapsed.
    IdleTimeout,
    /// The user ended the session (double Ctrl+C).
    UserInterrupt,
    /// The user force-killed the session (Ctrl+\).
    ForceKill,
}
84
/// Configuration for PTY execution.
#[derive(Debug, Clone)]
pub struct PtyConfig {
    /// Enable interactive mode (forward user input).
    pub interactive: bool,
    /// Idle timeout in seconds (0 = disabled).
    pub idle_timeout_secs: u32,
    /// Terminal width in columns.
    pub cols: u16,
    /// Terminal height in rows.
    pub rows: u16,
    /// Workspace root directory for command execution.
    ///
    /// Captured at startup so that spawning does not depend on `current_dir()`,
    /// which fails when the working directory no longer exists (e.g., in E2E
    /// test workspaces).
    pub workspace_root: std::path::PathBuf,
}

impl Default for PtyConfig {
    /// Interactive, 30 s idle timeout, 80x24 terminal, rooted at the current
    /// directory (or `"."` when the current directory cannot be resolved).
    fn default() -> Self {
        Self {
            interactive: true,
            idle_timeout_secs: 30,
            cols: 80,
            rows: 24,
            workspace_root: std::env::current_dir()
                .unwrap_or_else(|_| std::path::PathBuf::from(".")),
        }
    }
}

impl PtyConfig {
    /// Creates config from the environment, falling back to defaults.
    ///
    /// The terminal size is read from `COLUMNS` and `LINES`. A variable that is
    /// unset, not a valid `u16`, or zero is ignored in favour of the default
    /// (80 columns / 24 rows); a zero dimension would otherwise be handed to
    /// the PTY as its size. Every other field takes its [`Default`] value.
    pub fn from_env() -> Self {
        let defaults = Self::default();
        Self {
            cols: Self::dimension_from_env("COLUMNS", defaults.cols),
            rows: Self::dimension_from_env("LINES", defaults.rows),
            ..defaults
        }
    }

    /// Sets the workspace root directory.
    pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
        self.workspace_root = root.into();
        self
    }

    /// Reads a positive terminal dimension from the environment variable `var`.
    ///
    /// Surrounding whitespace is tolerated. Returns `fallback` when the
    /// variable is missing, unparsable, or zero.
    fn dimension_from_env(var: &str, fallback: u16) -> u16 {
        std::env::var(var)
            .ok()
            .and_then(|raw| raw.trim().parse::<u16>().ok())
            .filter(|&cells| cells > 0)
            .unwrap_or(fallback)
    }
}
140
/// Tracks Ctrl+C presses so a quick second press can be told apart from a first one.
#[derive(Debug)]
pub struct CtrlCState {
    /// Timestamp of the press that opened the current window, if one is open.
    first_press: Option<Instant>,
    /// Maximum gap between two presses for them to count as a double press.
    window: Duration,
}

/// What the caller should do in response to a Ctrl+C keypress.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CtrlCAction {
    /// Pass the Ctrl+C through to Claude and (re)open the double-press window.
    ForwardAndStartWindow,
    /// Shut Claude down: this was the second Ctrl+C inside the window.
    Terminate,
}

impl CtrlCState {
    /// Builds a tracker with no press recorded and a one-second window.
    pub fn new() -> Self {
        let window = Duration::from_secs(1);
        Self {
            first_press: None,
            window,
        }
    }

    /// Records a Ctrl+C observed at `now` and reports how to react.
    ///
    /// A press that arrives strictly less than `window` after the recorded one
    /// yields [`CtrlCAction::Terminate`] and clears the record. Any other press
    /// (the first one, or one after the window lapsed) is recorded as the new
    /// window start and yields [`CtrlCAction::ForwardAndStartWindow`].
    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
        let is_double_press = self
            .first_press
            .is_some_and(|opened_at| now.duration_since(opened_at) < self.window);

        if is_double_press {
            self.first_press = None;
            return CtrlCAction::Terminate;
        }

        self.first_press = Some(now);
        CtrlCAction::ForwardAndStartWindow
    }
}

impl Default for CtrlCState {
    /// Same as [`CtrlCState::new`].
    fn default() -> Self {
        Self::new()
    }
}
190
191/// Executor for running prompts in a pseudo-terminal.
192pub struct PtyExecutor {
193    backend: CliBackend,
194    config: PtyConfig,
195    // Channel ends for TUI integration
196    output_tx: mpsc::UnboundedSender<Vec<u8>>,
197    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
198    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
199    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
200    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
201    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
202    // Termination notification for TUI
203    terminated_tx: watch::Sender<bool>,
204    terminated_rx: Option<watch::Receiver<bool>>,
205    // Explicit TUI mode flag - set via set_tui_mode() when TUI is connected.
206    // This replaces the previous inference via output_rx.is_none() which broke
207    // after the streaming refactor (handle() is no longer called in TUI mode).
208    tui_mode: bool,
209}
210
211impl PtyExecutor {
212    /// Creates a new PTY executor with the given backend and configuration.
213    pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
214        let (output_tx, output_rx) = mpsc::unbounded_channel();
215        let (input_tx, input_rx) = mpsc::unbounded_channel();
216        let (control_tx, control_rx) = mpsc::unbounded_channel();
217        let (terminated_tx, terminated_rx) = watch::channel(false);
218
219        Self {
220            backend,
221            config,
222            output_tx,
223            output_rx: Some(output_rx),
224            input_tx: Some(input_tx),
225            input_rx,
226            control_tx: Some(control_tx),
227            control_rx,
228            terminated_tx,
229            terminated_rx: Some(terminated_rx),
230            tui_mode: false,
231        }
232    }
233
234    /// Sets the TUI mode flag.
235    ///
236    /// When TUI mode is enabled, PTY output is sent to the TUI channel instead of
237    /// being written directly to stdout. This flag must be set before calling any
238    /// of the run methods when using the TUI.
239    ///
240    /// # Arguments
241    /// * `enabled` - Whether TUI mode should be active
242    pub fn set_tui_mode(&mut self, enabled: bool) {
243        self.tui_mode = enabled;
244    }
245
246    /// Updates the backend configuration for this executor.
247    ///
248    /// This allows switching backends between iterations without recreating
249    /// the entire executor. Critical for hat-level backend configuration support.
250    ///
251    /// # Arguments
252    /// * `backend` - The new backend configuration to use
253    pub fn set_backend(&mut self, backend: CliBackend) {
254        self.backend = backend;
255    }
256
257    /// Returns a handle for TUI integration.
258    ///
259    /// Can only be called once - panics if called multiple times.
260    pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
261        crate::pty_handle::PtyHandle {
262            output_rx: self.output_rx.take().expect("handle() already called"),
263            input_tx: self.input_tx.take().expect("handle() already called"),
264            control_tx: self.control_tx.take().expect("handle() already called"),
265            terminated_rx: self.terminated_rx.take().expect("handle() already called"),
266        }
267    }
268
269    /// Spawns Claude in a PTY and returns the PTY pair, child process, stdin input, and temp file.
270    ///
271    /// The temp file is returned to keep it alive for the duration of execution.
272    /// For large prompts (>7000 chars), Claude is instructed to read from a temp file.
273    /// If the temp file is dropped before Claude reads it, the file is deleted and Claude hangs.
274    ///
275    /// The stdin_input is returned so callers can write it to the PTY after taking the writer.
276    /// This is necessary because `take_writer()` can only be called once per PTY.
277    fn spawn_pty(
278        &self,
279        prompt: &str,
280    ) -> io::Result<(
281        PtyPair,
282        Box<dyn portable_pty::Child + Send>,
283        Option<String>,
284        Option<tempfile::NamedTempFile>,
285    )> {
286        let pty_system = native_pty_system();
287
288        let pair = pty_system
289            .openpty(PtySize {
290                rows: self.config.rows,
291                cols: self.config.cols,
292                pixel_width: 0,
293                pixel_height: 0,
294            })
295            .map_err(|e| io::Error::other(e.to_string()))?;
296
297        // Build the command. For non-interactive PTY mode with large prompts,
298        // force arg mode because the PTY line discipline limits canonical
299        // input to ~4 KB per line. Large prompts (30-50 KB+) deadlock when
300        // written through PTY stdin. By forcing arg mode, the prompt is passed
301        // as a command argument (or via temp file for prompts > 7000 chars),
302        // bypassing the PTY input path entirely.  See #280.
303        let use_pty_safe = !self.config.interactive && prompt.len() > 4000;
304        let (cmd, args, stdin_input, temp_file) = if use_pty_safe {
305            self.backend.build_command_pty(prompt)
306        } else {
307            self.backend.build_command(prompt, self.config.interactive)
308        };
309
310        let mut cmd_builder = CommandBuilder::new(&cmd);
311        cmd_builder.args(&args);
312
313        // Set explicit working directory from config (captured at startup to avoid
314        // current_dir() failures when workspace no longer exists)
315        cmd_builder.cwd(&self.config.workspace_root);
316
317        // Set up environment for PTY
318        cmd_builder.env("TERM", "xterm-256color");
319        inject_ralph_runtime_env(&mut cmd_builder, &self.config.workspace_root);
320
321        // Apply backend-specific environment variables (e.g., Agent Teams env var)
322        for (key, value) in &self.backend.env_vars {
323            cmd_builder.env(key, value);
324        }
325        let child = pair
326            .slave
327            .spawn_command(cmd_builder)
328            .map_err(|e| io::Error::other(e.to_string()))?;
329
330        // Return stdin_input so callers can write it after taking the writer
331        Ok((pair, child, stdin_input, temp_file))
332    }
333
334    /// Runs in observe mode (output-only, no input forwarding).
335    ///
336    /// This is an async function that listens for interrupt signals via the shared
337    /// `interrupt_rx` watch channel from the event loop.
338    /// Uses a separate thread for blocking PTY reads and tokio::select! for signal handling.
339    ///
340    /// Returns when the process exits, idle timeout triggers, or interrupt is received.
341    ///
342    /// # Arguments
343    /// * `prompt` - The prompt to execute
344    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if PTY allocation fails, the command cannot be spawned,
349    /// or an I/O error occurs during output handling.
350    pub async fn run_observe(
351        &self,
352        prompt: &str,
353        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
354    ) -> io::Result<PtyExecutionResult> {
355        // Keep temp_file alive for the duration of execution (large prompts use temp files)
356        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
357
358        let reader = pair
359            .master
360            .try_clone_reader()
361            .map_err(|e| io::Error::other(e.to_string()))?;
362
363        // Write stdin input if present (for stdin prompt mode)
364        if let Some(ref input) = stdin_input {
365            // Small delay to let process initialize
366            tokio::time::sleep(Duration::from_millis(100)).await;
367            let mut writer = pair
368                .master
369                .take_writer()
370                .map_err(|e| io::Error::other(e.to_string()))?;
371            writer.write_all(input.as_bytes())?;
372            writer.write_all(b"\n")?;
373            writer.flush()?;
374        }
375
376        // Drop the slave to signal EOF when master closes
377        drop(pair.slave);
378
379        let mut output = Vec::new();
380        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
381            None
382        } else {
383            Some(Duration::from_secs(u64::from(
384                self.config.idle_timeout_secs,
385            )))
386        };
387
388        let mut termination = TerminationType::Natural;
389        let mut last_activity = Instant::now();
390
391        // Flag for termination request (shared with reader thread)
392        let should_terminate = Arc::new(AtomicBool::new(false));
393
394        // Spawn blocking reader thread that sends output via channel
395        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
396        let should_terminate_reader = Arc::clone(&should_terminate);
397        // Check if TUI is handling output (output_rx taken by handle())
398        let tui_connected = self.tui_mode;
399        let tui_output_tx = if tui_connected {
400            Some(self.output_tx.clone())
401        } else {
402            None
403        };
404
405        debug!("Spawning PTY output reader thread (observe mode)");
406        std::thread::spawn(move || {
407            let mut reader = reader;
408            let mut buf = [0u8; 4096];
409
410            loop {
411                if should_terminate_reader.load(Ordering::SeqCst) {
412                    debug!("PTY reader: termination requested");
413                    break;
414                }
415
416                match reader.read(&mut buf) {
417                    Ok(0) => {
418                        debug!("PTY reader: EOF");
419                        let _ = output_tx.blocking_send(OutputEvent::Eof);
420                        break;
421                    }
422                    Ok(n) => {
423                        let data = buf[..n].to_vec();
424                        // Send to TUI channel if connected
425                        if let Some(ref tx) = tui_output_tx {
426                            let _ = tx.send(data.clone());
427                        }
428                        // Send to main loop
429                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
430                            break;
431                        }
432                    }
433                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
434                        std::thread::sleep(Duration::from_millis(10));
435                    }
436                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
437                    Err(e) => {
438                        debug!(error = %e, "PTY reader error");
439                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
440                        break;
441                    }
442                }
443            }
444        });
445
446        // Main event loop using tokio::select! for interruptibility
447        loop {
448            // Calculate timeout for idle check
449            let idle_timeout = timeout_duration.map(|d| {
450                let elapsed = last_activity.elapsed();
451                if elapsed >= d {
452                    Duration::from_millis(1) // Trigger immediately
453                } else {
454                    d.saturating_sub(elapsed)
455                }
456            });
457
458            tokio::select! {
459                // Check for interrupt signal from event loop
460                _ = interrupt_rx.changed() => {
461                    if *interrupt_rx.borrow() {
462                        debug!("Interrupt received in observe mode, terminating");
463                        termination = TerminationType::UserInterrupt;
464                        should_terminate.store(true, Ordering::SeqCst);
465                        let _ = self.terminate_child(&mut child, true).await;
466                        break;
467                    }
468                }
469
470                // Check for output from reader thread
471                event = output_rx.recv() => {
472                    match event {
473                        Some(OutputEvent::Data(data)) => {
474                            // Only write to stdout if TUI is NOT handling output
475                            if !tui_connected {
476                                io::stdout().write_all(&data)?;
477                                io::stdout().flush()?;
478                            }
479                            output.extend_from_slice(&data);
480                            last_activity = Instant::now();
481                        }
482                        Some(OutputEvent::Eof) | None => {
483                            debug!("Output channel closed, process likely exited");
484                            break;
485                        }
486                        Some(OutputEvent::Error(e)) => {
487                            debug!(error = %e, "Reader thread reported error");
488                            break;
489                        }
490                    }
491                }
492
493                // Check for idle timeout
494                _ = async {
495                    if let Some(timeout) = idle_timeout {
496                        tokio::time::sleep(timeout).await;
497                    } else {
498                        // No timeout configured, wait forever
499                        std::future::pending::<()>().await;
500                    }
501                } => {
502                    warn!(
503                        timeout_secs = self.config.idle_timeout_secs,
504                        "Idle timeout triggered"
505                    );
506                    termination = TerminationType::IdleTimeout;
507                    should_terminate.store(true, Ordering::SeqCst);
508                    self.terminate_child(&mut child, true).await?;
509                    break;
510                }
511            }
512
513            // Check if child has exited
514            if let Some(status) = child
515                .try_wait()
516                .map_err(|e| io::Error::other(e.to_string()))?
517            {
518                let exit_code = status.exit_code() as i32;
519                debug!(exit_status = ?status, exit_code, "Child process exited");
520
521                // Drain any remaining output from channel
522                while let Ok(event) = output_rx.try_recv() {
523                    if let OutputEvent::Data(data) = event {
524                        if !tui_connected {
525                            io::stdout().write_all(&data)?;
526                            io::stdout().flush()?;
527                        }
528                        output.extend_from_slice(&data);
529                    }
530                }
531
532                // Give the reader thread a brief window to flush any final bytes/EOF.
533                // This avoids races where fast-exiting commands can drop tail output.
534                let drain_deadline = Instant::now() + Duration::from_millis(200);
535                loop {
536                    let remaining = drain_deadline.saturating_duration_since(Instant::now());
537                    if remaining.is_zero() {
538                        break;
539                    }
540                    match tokio::time::timeout(remaining, output_rx.recv()).await {
541                        Ok(Some(OutputEvent::Data(data))) => {
542                            if !tui_connected {
543                                io::stdout().write_all(&data)?;
544                                io::stdout().flush()?;
545                            }
546                            output.extend_from_slice(&data);
547                        }
548                        Ok(Some(OutputEvent::Eof) | None) => break,
549                        Ok(Some(OutputEvent::Error(e))) => {
550                            debug!(error = %e, "PTY read error after exit");
551                            break;
552                        }
553                        Err(_) => break,
554                    }
555                }
556
557                let final_termination = resolve_termination_type(exit_code, termination);
558                // run_observe doesn't parse JSON, so extracted_text is empty
559                return Ok(build_result(
560                    &output,
561                    status.success(),
562                    Some(exit_code),
563                    final_termination,
564                    String::new(),
565                    None,
566                ));
567            }
568        }
569
570        // Signal reader thread to stop
571        should_terminate.store(true, Ordering::SeqCst);
572
573        // Wait for child to fully exit (interruptible + bounded)
574        let status = self
575            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
576            .await?;
577
578        let (success, exit_code, final_termination) = match status {
579            Some(s) => {
580                let code = s.exit_code() as i32;
581                (
582                    s.success(),
583                    Some(code),
584                    resolve_termination_type(code, termination),
585                )
586            }
587            None => {
588                warn!("Timed out waiting for child to exit after termination");
589                (false, None, termination)
590            }
591        };
592
593        // run_observe doesn't parse JSON, so extracted_text is empty
594        Ok(build_result(
595            &output,
596            success,
597            exit_code,
598            final_termination,
599            String::new(),
600            None,
601        ))
602    }
603
604    /// Runs in observe mode with streaming event handling for JSON output.
605    ///
606    /// When the backend's output format is `StreamJson`, this method parses
607    /// NDJSON lines and dispatches events to the provided handler for real-time
608    /// display. For `Text` format, behaves identically to `run_observe`.
609    ///
610    /// # Arguments
611    /// * `prompt` - The prompt to execute
612    /// * `interrupt_rx` - Watch channel receiver for interrupt signals
613    /// * `handler` - Handler to receive streaming events
614    ///
615    /// # Errors
616    ///
617    /// Returns an error if PTY allocation fails, the command cannot be spawned,
618    /// or an I/O error occurs during output handling.
619    pub async fn run_observe_streaming<H: StreamHandler>(
620        &self,
621        prompt: &str,
622        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
623        handler: &mut H,
624    ) -> io::Result<PtyExecutionResult> {
625        // Check output format to decide parsing strategy
626        let output_format = self.backend.output_format;
627
628        // StreamJson format uses NDJSON line parsing (Claude)
629        // CopilotStreamJson format uses JSONL line parsing (Copilot prompt mode)
630        // PiStreamJson format uses NDJSON line parsing (Pi)
631        // Text format streams raw output directly to handler
632        let is_stream_json = output_format == OutputFormat::StreamJson;
633        let is_copilot_stream = output_format == OutputFormat::CopilotStreamJson;
634        let is_pi_stream = output_format == OutputFormat::PiStreamJson;
635        // Pi thinking deltas are noisy for plain console output but useful in TUI.
636        let show_pi_thinking = is_pi_stream && self.tui_mode;
637        let is_real_pi_backend = self.backend.command == "pi";
638
639        if is_pi_stream && is_real_pi_backend {
640            let configured_provider =
641                extract_cli_flag_value(&self.backend.args, "--provider", "-p")
642                    .unwrap_or_else(|| "auto".to_string());
643            let configured_model = extract_cli_flag_value(&self.backend.args, "--model", "-m")
644                .unwrap_or_else(|| "default".to_string());
645            handler.on_text(&format!(
646                "Pi configured: provider={configured_provider}, model={configured_model}\n"
647            ));
648        }
649
650        // Keep temp_file alive for the duration of execution
651        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
652
653        let reader = pair
654            .master
655            .try_clone_reader()
656            .map_err(|e| io::Error::other(e.to_string()))?;
657
658        // Write stdin input if present (for stdin prompt mode)
659        if let Some(ref input) = stdin_input {
660            tokio::time::sleep(Duration::from_millis(100)).await;
661            let mut writer = pair
662                .master
663                .take_writer()
664                .map_err(|e| io::Error::other(e.to_string()))?;
665            writer.write_all(input.as_bytes())?;
666            writer.write_all(b"\n")?;
667            writer.flush()?;
668        }
669
670        drop(pair.slave);
671
672        let mut output = Vec::new();
673        let mut line_buffer = String::new();
674        // Accumulate extracted text from NDJSON for event parsing
675        let mut extracted_text = String::new();
676        // Pi session state for accumulating cost/turns (wall-clock for duration)
677        let mut pi_state = PiSessionState::new();
678        let mut copilot_state = CopilotStreamState::new();
679        let mut completion: Option<SessionResult> = None;
680        let start_time = Instant::now();
681        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
682            None
683        } else {
684            Some(Duration::from_secs(u64::from(
685                self.config.idle_timeout_secs,
686            )))
687        };
688
689        let mut termination = TerminationType::Natural;
690        let mut last_activity = Instant::now();
691
692        let should_terminate = Arc::new(AtomicBool::new(false));
693
694        // Spawn blocking reader thread
695        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
696        let should_terminate_reader = Arc::clone(&should_terminate);
697        let tui_connected = self.tui_mode;
698        let tui_output_tx = if tui_connected {
699            Some(self.output_tx.clone())
700        } else {
701            None
702        };
703
704        debug!("Spawning PTY output reader thread (streaming mode)");
705        std::thread::spawn(move || {
706            let mut reader = reader;
707            let mut buf = [0u8; 4096];
708
709            loop {
710                if should_terminate_reader.load(Ordering::SeqCst) {
711                    debug!("PTY reader: termination requested");
712                    break;
713                }
714
715                match reader.read(&mut buf) {
716                    Ok(0) => {
717                        debug!("PTY reader: EOF");
718                        let _ = output_tx.blocking_send(OutputEvent::Eof);
719                        break;
720                    }
721                    Ok(n) => {
722                        let data = buf[..n].to_vec();
723                        if let Some(ref tx) = tui_output_tx {
724                            let _ = tx.send(data.clone());
725                        }
726                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
727                            break;
728                        }
729                    }
730                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
731                        std::thread::sleep(Duration::from_millis(10));
732                    }
733                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
734                    Err(e) => {
735                        debug!(error = %e, "PTY reader error");
736                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
737                        break;
738                    }
739                }
740            }
741        });
742
743        // Main event loop with JSON line parsing
744        loop {
745            let idle_timeout = timeout_duration.map(|d| {
746                let elapsed = last_activity.elapsed();
747                if elapsed >= d {
748                    Duration::from_millis(1)
749                } else {
750                    d.saturating_sub(elapsed)
751                }
752            });
753
754            tokio::select! {
755                _ = interrupt_rx.changed() => {
756                    if *interrupt_rx.borrow() {
757                        debug!("Interrupt received in streaming observe mode, terminating");
758                        termination = TerminationType::UserInterrupt;
759                        should_terminate.store(true, Ordering::SeqCst);
760                        let _ = self.terminate_child(&mut child, true).await;
761                        break;
762                    }
763                }
764
765                event = output_rx.recv() => {
766                    match event {
767                        Some(OutputEvent::Data(data)) => {
768                            output.extend_from_slice(&data);
769                            last_activity = Instant::now();
770
771                            if let Ok(text) = std::str::from_utf8(&data) {
772                                if is_stream_json {
773                                    // StreamJson format: Parse JSON lines from the data
774                                    line_buffer.push_str(text);
775
776                                    // Process complete lines
777                                    while let Some(newline_pos) = line_buffer.find('\n') {
778                                        let line = line_buffer[..newline_pos].to_string();
779                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
780
781                                        if let Some(event) = ClaudeStreamParser::parse_line(&line) {
782                                            if let ClaudeStreamEvent::Result {
783                                                duration_ms,
784                                                total_cost_usd,
785                                                num_turns,
786                                                is_error,
787                                            } = &event
788                                            {
789                                                completion = Some(SessionResult {
790                                                    duration_ms: *duration_ms,
791                                                    total_cost_usd: *total_cost_usd,
792                                                    num_turns: *num_turns,
793                                                    is_error: *is_error,
794                                                    ..Default::default()
795                                                });
796                                            }
797                                            dispatch_stream_event(event, handler, &mut extracted_text);
798                                        }
799                                    }
800                                } else if is_copilot_stream {
801                                    line_buffer.push_str(text);
802
803                                    while let Some(newline_pos) = line_buffer.find('\n') {
804                                        let line = line_buffer[..newline_pos].to_string();
805                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
806
807                                        if let Some(session_result) = handle_copilot_stream_line(
808                                            &line,
809                                            handler,
810                                            &mut extracted_text,
811                                            &mut copilot_state,
812                                        ) {
813                                            completion = Some(session_result);
814                                        }
815                                    }
816                                } else if is_pi_stream {
817                                    // PiStreamJson format: Parse NDJSON lines from pi
818                                    line_buffer.push_str(text);
819
820                                    while let Some(newline_pos) = line_buffer.find('\n') {
821                                        let line = line_buffer[..newline_pos].to_string();
822                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
823
824                                        if let Some(event) = PiStreamParser::parse_line(&line) {
825                                            dispatch_pi_stream_event(
826                                                event,
827                                                handler,
828                                                &mut extracted_text,
829                                                &mut pi_state,
830                                                show_pi_thinking,
831                                            );
832                                        }
833                                    }
834                                } else {
835                                    // Text format: Stream raw output directly to handler
836                                    // This preserves ANSI escape codes for TUI rendering
837                                    handler.on_text(text);
838                                }
839                            }
840                        }
841                        Some(OutputEvent::Eof) | None => {
842                            debug!("Output channel closed");
843                            // Process any remaining content in buffer
844                            if is_stream_json && !line_buffer.is_empty()
845                                && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
846                            {
847                                if let ClaudeStreamEvent::Result {
848                                    duration_ms,
849                                    total_cost_usd,
850                                    num_turns,
851                                    is_error,
852                                } = &event
853                                {
854                                    completion = Some(SessionResult {
855                                        duration_ms: *duration_ms,
856                                        total_cost_usd: *total_cost_usd,
857                                        num_turns: *num_turns,
858                                        is_error: *is_error,
859                                        ..Default::default()
860                                    });
861                                }
862                                dispatch_stream_event(event, handler, &mut extracted_text);
863                            } else if is_copilot_stream && !line_buffer.is_empty() {
864                                if let Some(session_result) = handle_copilot_stream_line(
865                                    &line_buffer,
866                                    handler,
867                                    &mut extracted_text,
868                                    &mut copilot_state,
869                                ) {
870                                    completion = Some(session_result);
871                                }
872                            } else if is_pi_stream && !line_buffer.is_empty()
873                                && let Some(event) = PiStreamParser::parse_line(&line_buffer)
874                            {
875                                dispatch_pi_stream_event(
876                                    event,
877                                    handler,
878                                    &mut extracted_text,
879                                    &mut pi_state,
880                                    show_pi_thinking,
881                                );
882                            }
883                            break;
884                        }
885                        Some(OutputEvent::Error(e)) => {
886                            debug!(error = %e, "Reader thread reported error");
887                            handler.on_error(&e);
888                            break;
889                        }
890                    }
891                }
892
893                _ = async {
894                    if let Some(timeout) = idle_timeout {
895                        tokio::time::sleep(timeout).await;
896                    } else {
897                        std::future::pending::<()>().await;
898                    }
899                } => {
900                    warn!(
901                        timeout_secs = self.config.idle_timeout_secs,
902                        "Idle timeout triggered"
903                    );
904                    termination = TerminationType::IdleTimeout;
905                    should_terminate.store(true, Ordering::SeqCst);
906                    self.terminate_child(&mut child, true).await?;
907                    break;
908                }
909            }
910
911            // Check if child has exited
912            if let Some(status) = child
913                .try_wait()
914                .map_err(|e| io::Error::other(e.to_string()))?
915            {
916                let exit_code = status.exit_code() as i32;
917                debug!(exit_status = ?status, exit_code, "Child process exited");
918
919                // Drain remaining output
920                while let Ok(event) = output_rx.try_recv() {
921                    if let OutputEvent::Data(data) = event {
922                        output.extend_from_slice(&data);
923                        if let Ok(text) = std::str::from_utf8(&data) {
924                            if is_stream_json {
925                                // StreamJson: parse JSON lines
926                                line_buffer.push_str(text);
927                                while let Some(newline_pos) = line_buffer.find('\n') {
928                                    let line = line_buffer[..newline_pos].to_string();
929                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
930                                    if let Some(event) = ClaudeStreamParser::parse_line(&line) {
931                                        if let ClaudeStreamEvent::Result {
932                                            duration_ms,
933                                            total_cost_usd,
934                                            num_turns,
935                                            is_error,
936                                        } = &event
937                                        {
938                                            completion = Some(SessionResult {
939                                                duration_ms: *duration_ms,
940                                                total_cost_usd: *total_cost_usd,
941                                                num_turns: *num_turns,
942                                                is_error: *is_error,
943                                                ..Default::default()
944                                            });
945                                        }
946                                        dispatch_stream_event(event, handler, &mut extracted_text);
947                                    }
948                                }
949                            } else if is_copilot_stream {
950                                line_buffer.push_str(text);
951                                while let Some(newline_pos) = line_buffer.find('\n') {
952                                    let line = line_buffer[..newline_pos].to_string();
953                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
954                                    if let Some(session_result) = handle_copilot_stream_line(
955                                        &line,
956                                        handler,
957                                        &mut extracted_text,
958                                        &mut copilot_state,
959                                    ) {
960                                        completion = Some(session_result);
961                                    }
962                                }
963                            } else if is_pi_stream {
964                                // PiStreamJson: parse NDJSON lines
965                                line_buffer.push_str(text);
966                                while let Some(newline_pos) = line_buffer.find('\n') {
967                                    let line = line_buffer[..newline_pos].to_string();
968                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
969                                    if let Some(event) = PiStreamParser::parse_line(&line) {
970                                        dispatch_pi_stream_event(
971                                            event,
972                                            handler,
973                                            &mut extracted_text,
974                                            &mut pi_state,
975                                            show_pi_thinking,
976                                        );
977                                    }
978                                }
979                            } else {
980                                // Text: stream raw output to handler
981                                handler.on_text(text);
982                            }
983                        }
984                    }
985                }
986
987                // Give the reader thread a brief window to flush any final bytes/EOF.
988                // This avoids races where fast-exiting commands can drop tail output.
989                let drain_deadline = Instant::now() + Duration::from_millis(200);
990                loop {
991                    let remaining = drain_deadline.saturating_duration_since(Instant::now());
992                    if remaining.is_zero() {
993                        break;
994                    }
995                    match tokio::time::timeout(remaining, output_rx.recv()).await {
996                        Ok(Some(OutputEvent::Data(data))) => {
997                            output.extend_from_slice(&data);
998                            if let Ok(text) = std::str::from_utf8(&data) {
999                                if is_stream_json {
1000                                    // StreamJson: parse JSON lines
1001                                    line_buffer.push_str(text);
1002                                    while let Some(newline_pos) = line_buffer.find('\n') {
1003                                        let line = line_buffer[..newline_pos].to_string();
1004                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
1005                                        if let Some(event) = ClaudeStreamParser::parse_line(&line) {
1006                                            if let ClaudeStreamEvent::Result {
1007                                                duration_ms,
1008                                                total_cost_usd,
1009                                                num_turns,
1010                                                is_error,
1011                                            } = &event
1012                                            {
1013                                                completion = Some(SessionResult {
1014                                                    duration_ms: *duration_ms,
1015                                                    total_cost_usd: *total_cost_usd,
1016                                                    num_turns: *num_turns,
1017                                                    is_error: *is_error,
1018                                                    ..Default::default()
1019                                                });
1020                                            }
1021                                            dispatch_stream_event(
1022                                                event,
1023                                                handler,
1024                                                &mut extracted_text,
1025                                            );
1026                                        }
1027                                    }
1028                                } else if is_copilot_stream {
1029                                    line_buffer.push_str(text);
1030                                    while let Some(newline_pos) = line_buffer.find('\n') {
1031                                        let line = line_buffer[..newline_pos].to_string();
1032                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
1033                                        handle_copilot_stream_line(
1034                                            &line,
1035                                            handler,
1036                                            &mut extracted_text,
1037                                            &mut copilot_state,
1038                                        );
1039                                    }
1040                                } else if is_pi_stream {
1041                                    // PiStreamJson: parse NDJSON lines
1042                                    line_buffer.push_str(text);
1043                                    while let Some(newline_pos) = line_buffer.find('\n') {
1044                                        let line = line_buffer[..newline_pos].to_string();
1045                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
1046                                        if let Some(event) = PiStreamParser::parse_line(&line) {
1047                                            dispatch_pi_stream_event(
1048                                                event,
1049                                                handler,
1050                                                &mut extracted_text,
1051                                                &mut pi_state,
1052                                                show_pi_thinking,
1053                                            );
1054                                        }
1055                                    }
1056                                } else {
1057                                    // Text: stream raw output to handler
1058                                    handler.on_text(text);
1059                                }
1060                            }
1061                        }
1062                        Ok(Some(OutputEvent::Eof) | None) => break,
1063                        Ok(Some(OutputEvent::Error(e))) => {
1064                            debug!(error = %e, "PTY read error after exit");
1065                            break;
1066                        }
1067                        Err(_) => break,
1068                    }
1069                }
1070
1071                // Process final buffer content
1072                if is_stream_json
1073                    && !line_buffer.is_empty()
1074                    && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
1075                {
1076                    if let ClaudeStreamEvent::Result {
1077                        duration_ms,
1078                        total_cost_usd,
1079                        num_turns,
1080                        is_error,
1081                    } = &event
1082                    {
1083                        completion = Some(SessionResult {
1084                            duration_ms: *duration_ms,
1085                            total_cost_usd: *total_cost_usd,
1086                            num_turns: *num_turns,
1087                            is_error: *is_error,
1088                            ..Default::default()
1089                        });
1090                    }
1091                    dispatch_stream_event(event, handler, &mut extracted_text);
1092                } else if is_copilot_stream && !line_buffer.is_empty() {
1093                    if let Some(session_result) = handle_copilot_stream_line(
1094                        &line_buffer,
1095                        handler,
1096                        &mut extracted_text,
1097                        &mut copilot_state,
1098                    ) {
1099                        completion = Some(session_result);
1100                    }
1101                } else if is_pi_stream
1102                    && !line_buffer.is_empty()
1103                    && let Some(event) = PiStreamParser::parse_line(&line_buffer)
1104                {
1105                    dispatch_pi_stream_event(
1106                        event,
1107                        handler,
1108                        &mut extracted_text,
1109                        &mut pi_state,
1110                        show_pi_thinking,
1111                    );
1112                }
1113
1114                let final_termination = resolve_termination_type(exit_code, termination);
1115
1116                // Synthesize on_complete for Pi sessions (pi has no dedicated result event)
1117                if is_pi_stream {
1118                    if is_real_pi_backend {
1119                        let stream_provider =
1120                            pi_state.stream_provider.as_deref().unwrap_or("unknown");
1121                        let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1122                        handler.on_text(&format!(
1123                            "Pi stream: provider={stream_provider}, model={stream_model}\n"
1124                        ));
1125                    }
1126                    let session_result = SessionResult {
1127                        duration_ms: start_time.elapsed().as_millis() as u64,
1128                        total_cost_usd: pi_state.total_cost_usd,
1129                        num_turns: pi_state.num_turns,
1130                        is_error: !status.success(),
1131                        input_tokens: pi_state.input_tokens,
1132                        output_tokens: pi_state.output_tokens,
1133                        cache_read_tokens: pi_state.cache_read_tokens,
1134                        cache_write_tokens: pi_state.cache_write_tokens,
1135                    };
1136                    handler.on_complete(&session_result);
1137                    completion = Some(session_result);
1138                }
1139
1140                // Pass extracted_text for event parsing from NDJSON
1141                return Ok(build_result(
1142                    &output,
1143                    status.success(),
1144                    Some(exit_code),
1145                    final_termination,
1146                    extracted_text,
1147                    completion.as_ref(),
1148                ));
1149            }
1150        }
1151
1152        should_terminate.store(true, Ordering::SeqCst);
1153
1154        let status = self
1155            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1156            .await?;
1157
1158        let (success, exit_code, final_termination) = match status {
1159            Some(s) => {
1160                let code = s.exit_code() as i32;
1161                (
1162                    s.success(),
1163                    Some(code),
1164                    resolve_termination_type(code, termination),
1165                )
1166            }
1167            None => {
1168                warn!("Timed out waiting for child to exit after termination");
1169                (false, None, termination)
1170            }
1171        };
1172
1173        // Synthesize on_complete for Pi sessions (pi has no dedicated result event)
1174        if is_pi_stream {
1175            if is_real_pi_backend {
1176                let stream_provider = pi_state.stream_provider.as_deref().unwrap_or("unknown");
1177                let stream_model = pi_state.stream_model.as_deref().unwrap_or("unknown");
1178                handler.on_text(&format!(
1179                    "Pi stream: provider={stream_provider}, model={stream_model}\n"
1180                ));
1181            }
1182            let session_result = SessionResult {
1183                duration_ms: start_time.elapsed().as_millis() as u64,
1184                total_cost_usd: pi_state.total_cost_usd,
1185                num_turns: pi_state.num_turns,
1186                is_error: !success,
1187                input_tokens: pi_state.input_tokens,
1188                output_tokens: pi_state.output_tokens,
1189                cache_read_tokens: pi_state.cache_read_tokens,
1190                cache_write_tokens: pi_state.cache_write_tokens,
1191            };
1192            handler.on_complete(&session_result);
1193            completion = Some(session_result);
1194        }
1195
1196        // Pass extracted_text for event parsing from NDJSON
1197        Ok(build_result(
1198            &output,
1199            success,
1200            exit_code,
1201            final_termination,
1202            extracted_text,
1203            completion.as_ref(),
1204        ))
1205    }
1206
1207    /// Runs in interactive mode (bidirectional I/O).
1208    ///
1209    /// Uses `tokio::select!` for non-blocking I/O multiplexing between:
1210    /// 1. PTY output (from blocking reader via channel)
1211    /// 2. User input (from stdin thread via channel)
1212    /// 3. Interrupt signal from event loop
1213    /// 4. Idle timeout
1214    ///
1215    /// This design ensures Ctrl+C is always responsive, even when the PTY
1216    /// has no output (e.g., during long-running tool calls).
1217    ///
1218    /// # Arguments
1219    /// * `prompt` - The prompt to execute
1220    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
1221    ///
1222    /// # Errors
1223    ///
1224    /// Returns an error if PTY allocation fails, the command cannot be spawned,
1225    /// or an I/O error occurs during bidirectional communication.
1226    #[allow(clippy::too_many_lines)] // Complex state machine requires cohesive implementation
1227    pub async fn run_interactive(
1228        &mut self,
1229        prompt: &str,
1230        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
1231    ) -> io::Result<PtyExecutionResult> {
1232        // Keep temp_file alive for the duration of execution (large prompts use temp files)
1233        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
1234
1235        let reader = pair
1236            .master
1237            .try_clone_reader()
1238            .map_err(|e| io::Error::other(e.to_string()))?;
1239        let mut writer = pair
1240            .master
1241            .take_writer()
1242            .map_err(|e| io::Error::other(e.to_string()))?;
1243
1244        // Keep master for resize operations
1245        let master = pair.master;
1246
1247        // Drop our slave handle so the master reader sees EOF once the child exits
1248        drop(pair.slave);
1249
1250        // Store stdin_input for writing after reader thread starts
1251        let pending_stdin = stdin_input;
1252
1253        let mut output = Vec::new();
1254        let timeout_duration = if self.config.idle_timeout_secs > 0 {
1255            Some(Duration::from_secs(u64::from(
1256                self.config.idle_timeout_secs,
1257            )))
1258        } else {
1259            None
1260        };
1261
1262        let mut ctrl_c_state = CtrlCState::new();
1263        let mut termination = TerminationType::Natural;
1264        let mut last_activity = Instant::now();
1265
1266        // Flag for termination request (shared with spawned tasks)
1267        let should_terminate = Arc::new(AtomicBool::new(false));
1268
1269        // Spawn output reader thread (blocking read on an OS thread, forwarded via channel)
1270        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
1271        let should_terminate_output = Arc::clone(&should_terminate);
1272        // Check if TUI is handling output (output_rx taken by handle())
1273        let tui_connected = self.tui_mode;
1274        let tui_output_tx = if tui_connected {
1275            Some(self.output_tx.clone())
1276        } else {
1277            None
1278        };
1279
1280        debug!("Spawning PTY output reader thread");
1281        std::thread::spawn(move || {
1282            debug!("PTY output reader thread started");
1283            let mut reader = reader;
1284            let mut buf = [0u8; 4096];
1285
1286            loop {
1287                if should_terminate_output.load(Ordering::SeqCst) {
1288                    debug!("PTY output reader: termination requested");
1289                    break;
1290                }
1291
1292                match reader.read(&mut buf) {
1293                    Ok(0) => {
1294                        // EOF - PTY closed
1295                        debug!("PTY output reader: EOF received");
1296                        let _ = output_tx.blocking_send(OutputEvent::Eof);
1297                        break;
1298                    }
1299                    Ok(n) => {
1300                        let data = buf[..n].to_vec();
1301                        // Send to TUI channel if connected
1302                        if let Some(ref tx) = tui_output_tx {
1303                            let _ = tx.send(data.clone());
1304                        }
1305                        // Send to main loop
1306                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
1307                            debug!("PTY output reader: channel closed");
1308                            break;
1309                        }
1310                    }
1311                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1312                        // Non-blocking mode: no data available, yield briefly
1313                        std::thread::sleep(Duration::from_millis(1));
1314                    }
1315                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {
1316                        // Interrupted by signal, retry
1317                    }
1318                    Err(e) => {
1319                        warn!("PTY output reader: error - {}", e);
1320                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
1321                        break;
1322                    }
1323                }
1324            }
1325            debug!("PTY output reader thread exiting");
1326        });
1327
1328        // Spawn input reading task - ONLY when TUI is NOT connected
1329        // In TUI mode (observation mode), user input should not be captured from stdin.
1330        // The TUI has its own input handling, and raw Ctrl+C should go directly to the
1331        // signal handler (interrupt_rx) without racing with the stdin reader.
1332        let mut input_rx = if tui_connected {
1333            debug!("TUI connected - skipping stdin reader thread");
1334            None
1335        } else {
1336            let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
1337            let should_terminate_input = Arc::clone(&should_terminate);
1338
1339            std::thread::spawn(move || {
1340                let mut stdin = io::stdin();
1341                let mut buf = [0u8; 1];
1342
1343                loop {
1344                    if should_terminate_input.load(Ordering::SeqCst) {
1345                        break;
1346                    }
1347
1348                    match stdin.read(&mut buf) {
1349                        Ok(0) => break, // EOF
1350                        Ok(1) => {
1351                            let byte = buf[0];
1352                            let event = match byte {
1353                                3 => InputEvent::CtrlC,          // Ctrl+C
1354                                28 => InputEvent::CtrlBackslash, // Ctrl+\
1355                                _ => InputEvent::Data(vec![byte]),
1356                            };
1357                            if input_tx.send(event).is_err() {
1358                                break;
1359                            }
1360                        }
1361                        Ok(_) => {} // Shouldn't happen with 1-byte buffer
1362                        Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
1363                        Err(_) => break,
1364                    }
1365                }
1366            });
1367            Some(input_rx)
1368        };
1369
1370        // Write stdin input after threads are spawned (so we capture any output)
1371        // Give Claude's TUI a moment to initialize before sending the prompt
1372        if let Some(ref input) = pending_stdin {
1373            tokio::time::sleep(Duration::from_millis(100)).await;
1374            writer.write_all(input.as_bytes())?;
1375            writer.write_all(b"\n")?;
1376            writer.flush()?;
1377            last_activity = Instant::now();
1378        }
1379
1380        // Main select loop - this is the key fix for blocking I/O
1381        // We use tokio::select! to multiplex between output, input, and timeout
1382        loop {
1383            // Check if child has exited (non-blocking check before select)
1384            if let Some(status) = child
1385                .try_wait()
1386                .map_err(|e| io::Error::other(e.to_string()))?
1387            {
1388                let exit_code = status.exit_code() as i32;
1389                debug!(exit_status = ?status, exit_code, "Child process exited");
1390
1391                // Drain remaining output already buffered.
1392                while let Ok(event) = output_rx.try_recv() {
1393                    if let OutputEvent::Data(data) = event {
1394                        if !tui_connected {
1395                            io::stdout().write_all(&data)?;
1396                            io::stdout().flush()?;
1397                        }
1398                        output.extend_from_slice(&data);
1399                    }
1400                }
1401
1402                // Give the reader thread a brief window to flush any final bytes/EOF.
1403                // This avoids races where fast-exiting commands drop output before we return.
1404                let drain_deadline = Instant::now() + Duration::from_millis(200);
1405                loop {
1406                    let remaining = drain_deadline.saturating_duration_since(Instant::now());
1407                    if remaining.is_zero() {
1408                        break;
1409                    }
1410                    match tokio::time::timeout(remaining, output_rx.recv()).await {
1411                        Ok(Some(OutputEvent::Data(data))) => {
1412                            if !tui_connected {
1413                                io::stdout().write_all(&data)?;
1414                                io::stdout().flush()?;
1415                            }
1416                            output.extend_from_slice(&data);
1417                        }
1418                        Ok(Some(OutputEvent::Eof) | None) => break,
1419                        Ok(Some(OutputEvent::Error(e))) => {
1420                            debug!(error = %e, "PTY read error after exit");
1421                            break;
1422                        }
1423                        Err(_) => break, // timeout
1424                    }
1425                }
1426
1427                should_terminate.store(true, Ordering::SeqCst);
1428                // Signal TUI that PTY has terminated
1429                let _ = self.terminated_tx.send(true);
1430
1431                let final_termination = resolve_termination_type(exit_code, termination);
1432                // run_interactive doesn't parse JSON, so extracted_text is empty
1433                return Ok(build_result(
1434                    &output,
1435                    status.success(),
1436                    Some(exit_code),
1437                    final_termination,
1438                    String::new(),
1439                    None,
1440                ));
1441            }
1442
1443            // Build the timeout future (or a never-completing one if disabled)
1444            let timeout_future = async {
1445                match timeout_duration {
1446                    Some(d) => {
1447                        let elapsed = last_activity.elapsed();
1448                        if elapsed >= d {
1449                            tokio::time::sleep(Duration::ZERO).await
1450                        } else {
1451                            tokio::time::sleep(d.saturating_sub(elapsed)).await
1452                        }
1453                    }
1454                    None => std::future::pending::<()>().await,
1455                }
1456            };
1457
1458            tokio::select! {
1459                // PTY output received
1460                output_event = output_rx.recv() => {
1461                    match output_event {
1462                        Some(OutputEvent::Data(data)) => {
1463                            // Only write to stdout if TUI is NOT handling output
1464                            if !tui_connected {
1465                                io::stdout().write_all(&data)?;
1466                                io::stdout().flush()?;
1467                            }
1468                            output.extend_from_slice(&data);
1469
1470                            last_activity = Instant::now();
1471                        }
1472                        Some(OutputEvent::Eof) => {
1473                            debug!("PTY EOF received");
1474                            break;
1475                        }
1476                        Some(OutputEvent::Error(e)) => {
1477                            debug!(error = %e, "PTY read error");
1478                            break;
1479                        }
1480                        None => {
1481                            // Channel closed, reader thread exited
1482                            break;
1483                        }
1484                    }
1485                }
1486
1487                // User input received (from stdin) - only active when TUI is NOT connected
1488                input_event = async {
1489                    match input_rx.as_mut() {
1490                        Some(rx) => rx.recv().await,
1491                        None => std::future::pending().await, // Never resolves when TUI is connected
1492                    }
1493                } => {
1494                    match input_event {
1495                        Some(InputEvent::CtrlC) => {
1496                            match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1497                                CtrlCAction::ForwardAndStartWindow => {
1498                                    // Forward Ctrl+C to Claude
1499                                    let _ = writer.write_all(&[3]);
1500                                    let _ = writer.flush();
1501                                    last_activity = Instant::now();
1502                                }
1503                                CtrlCAction::Terminate => {
1504                                    info!("Double Ctrl+C detected, terminating");
1505                                    termination = TerminationType::UserInterrupt;
1506                                    should_terminate.store(true, Ordering::SeqCst);
1507                                    self.terminate_child(&mut child, true).await?;
1508                                    break;
1509                                }
1510                            }
1511                        }
1512                        Some(InputEvent::CtrlBackslash) => {
1513                            info!("Ctrl+\\ detected, force killing");
1514                            termination = TerminationType::ForceKill;
1515                            should_terminate.store(true, Ordering::SeqCst);
1516                            self.terminate_child(&mut child, false).await?;
1517                            break;
1518                        }
1519                        Some(InputEvent::Data(data)) => {
1520                            // Forward to Claude
1521                            let _ = writer.write_all(&data);
1522                            let _ = writer.flush();
1523                            last_activity = Instant::now();
1524                        }
1525                        None => {
1526                            // Input channel closed (stdin EOF)
1527                            debug!("Input channel closed");
1528                        }
1529                    }
1530                }
1531
1532                // TUI input received (convert to InputEvent for unified handling)
1533                tui_input = self.input_rx.recv() => {
1534                    if let Some(data) = tui_input {
1535                        match InputEvent::from_bytes(data) {
1536                            InputEvent::CtrlC => {
1537                                match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1538                                    CtrlCAction::ForwardAndStartWindow => {
1539                                        let _ = writer.write_all(&[3]);
1540                                        let _ = writer.flush();
1541                                        last_activity = Instant::now();
1542                                    }
1543                                    CtrlCAction::Terminate => {
1544                                        info!("Double Ctrl+C detected, terminating");
1545                                        termination = TerminationType::UserInterrupt;
1546                                        should_terminate.store(true, Ordering::SeqCst);
1547                                        self.terminate_child(&mut child, true).await?;
1548                                        break;
1549                                    }
1550                                }
1551                            }
1552                            InputEvent::CtrlBackslash => {
1553                                info!("Ctrl+\\ detected, force killing");
1554                                termination = TerminationType::ForceKill;
1555                                should_terminate.store(true, Ordering::SeqCst);
1556                                self.terminate_child(&mut child, false).await?;
1557                                break;
1558                            }
1559                            InputEvent::Data(bytes) => {
1560                                let _ = writer.write_all(&bytes);
1561                                let _ = writer.flush();
1562                                last_activity = Instant::now();
1563                            }
1564                        }
1565                    }
1566                }
1567
1568                // Control commands from TUI
1569                control_cmd = self.control_rx.recv() => {
1570                    if let Some(cmd) = control_cmd {
1571                        use crate::pty_handle::ControlCommand;
1572                        match cmd {
1573                            ControlCommand::Kill => {
1574                                info!("Control command: Kill");
1575                                termination = TerminationType::UserInterrupt;
1576                                should_terminate.store(true, Ordering::SeqCst);
1577                                self.terminate_child(&mut child, true).await?;
1578                                break;
1579                            }
1580                            ControlCommand::Resize(cols, rows) => {
1581                                debug!(cols, rows, "Control command: Resize");
1582                                // Resize the PTY to match TUI dimensions
1583                                if let Err(e) = master.resize(PtySize {
1584                                    rows,
1585                                    cols,
1586                                    pixel_width: 0,
1587                                    pixel_height: 0,
1588                                }) {
1589                                    warn!("Failed to resize PTY: {}", e);
1590                                }
1591                            }
1592                            ControlCommand::Skip | ControlCommand::Abort => {
1593                                // These are handled at orchestrator level, not here
1594                                debug!("Control command: {:?} (ignored at PTY level)", cmd);
1595                            }
1596                        }
1597                    }
1598                }
1599
1600                // Idle timeout expired
1601                _ = timeout_future => {
1602                    warn!(
1603                        timeout_secs = self.config.idle_timeout_secs,
1604                        "Idle timeout triggered"
1605                    );
1606                    termination = TerminationType::IdleTimeout;
1607                    should_terminate.store(true, Ordering::SeqCst);
1608                    self.terminate_child(&mut child, true).await?;
1609                    break;
1610                }
1611
1612                // Interrupt signal from event loop
1613                _ = interrupt_rx.changed() => {
1614                    if *interrupt_rx.borrow() {
1615                        debug!("Interrupt received in interactive mode, terminating");
1616                        termination = TerminationType::UserInterrupt;
1617                        should_terminate.store(true, Ordering::SeqCst);
1618                        self.terminate_child(&mut child, true).await?;
1619                        break;
1620                    }
1621                }
1622            }
1623        }
1624
1625        // Ensure termination flag is set for spawned threads
1626        should_terminate.store(true, Ordering::SeqCst);
1627
1628        // Signal TUI that PTY has terminated
1629        let _ = self.terminated_tx.send(true);
1630
1631        // Wait for child to fully exit (interruptible + bounded)
1632        let status = self
1633            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1634            .await?;
1635
1636        let (success, exit_code, final_termination) = match status {
1637            Some(s) => {
1638                let code = s.exit_code() as i32;
1639                (
1640                    s.success(),
1641                    Some(code),
1642                    resolve_termination_type(code, termination),
1643                )
1644            }
1645            None => {
1646                warn!("Timed out waiting for child to exit after termination");
1647                (false, None, termination)
1648            }
1649        };
1650
1651        // run_interactive doesn't parse JSON, so extracted_text is empty
1652        Ok(build_result(
1653            &output,
1654            success,
1655            exit_code,
1656            final_termination,
1657            String::new(),
1658            None,
1659        ))
1660    }
1661
1662    /// Terminates the child process.
1663    ///
1664    /// If `graceful` is true, sends SIGTERM and waits up to 5 seconds before SIGKILL.
1665    /// If `graceful` is false, sends SIGKILL immediately.
1666    ///
1667    /// This is an async function to avoid blocking the tokio runtime during the
1668    /// grace period wait. Previously used `std::thread::sleep` which blocked the
1669    /// worker thread for up to 5 seconds, making the TUI appear frozen.
1670    #[allow(clippy::unused_self)] // Self is conceptually the right receiver for this method
1671    #[allow(clippy::unused_async)] // Kept async to preserve signature parity with Unix implementation
1672    #[cfg(not(unix))]
1673    async fn terminate_child(
1674        &self,
1675        child: &mut Box<dyn portable_pty::Child + Send>,
1676        _graceful: bool,
1677    ) -> io::Result<()> {
1678        child.kill()
1679    }
1680
1681    #[cfg(unix)]
1682    async fn terminate_child(
1683        &self,
1684        child: &mut Box<dyn portable_pty::Child + Send>,
1685        graceful: bool,
1686    ) -> io::Result<()> {
1687        let pid = match child.process_id() {
1688            Some(id) => Pid::from_raw(id as i32),
1689            None => return Ok(()), // Already exited
1690        };
1691
1692        if graceful {
1693            debug!(pid = %pid, "Sending SIGTERM");
1694            let _ = kill(pid, Signal::SIGTERM);
1695
1696            // Wait up to 5 seconds for graceful exit (reduced from 5s for better UX)
1697            let grace_period = Duration::from_secs(2);
1698            let start = Instant::now();
1699
1700            while start.elapsed() < grace_period {
1701                if child
1702                    .try_wait()
1703                    .map_err(|e| io::Error::other(e.to_string()))?
1704                    .is_some()
1705                {
1706                    return Ok(());
1707                }
1708                // Use async sleep to avoid blocking the tokio runtime
1709                tokio::time::sleep(Duration::from_millis(50)).await;
1710            }
1711
1712            // Still running after grace period - force kill
1713            debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1714        }
1715
1716        debug!(pid = %pid, "Sending SIGKILL");
1717        let _ = kill(pid, Signal::SIGKILL);
1718        Ok(())
1719    }
1720
1721    /// Waits for the child process to exit, optionally with a timeout.
1722    ///
1723    /// This is interruptible by the shared interrupt channel from the event loop.
1724    /// When interrupted, returns `Ok(None)` to let the caller handle termination.
1725    async fn wait_for_exit(
1726        &self,
1727        child: &mut Box<dyn portable_pty::Child + Send>,
1728        max_wait: Option<Duration>,
1729        interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1730    ) -> io::Result<Option<portable_pty::ExitStatus>> {
1731        let start = Instant::now();
1732
1733        loop {
1734            if let Some(status) = child
1735                .try_wait()
1736                .map_err(|e| io::Error::other(e.to_string()))?
1737            {
1738                return Ok(Some(status));
1739            }
1740
1741            if let Some(max) = max_wait
1742                && start.elapsed() >= max
1743            {
1744                return Ok(None);
1745            }
1746
1747            tokio::select! {
1748                _ = interrupt_rx.changed() => {
1749                    if *interrupt_rx.borrow() {
1750                        debug!("Interrupt received while waiting for child exit");
1751                        return Ok(None);
1752                    }
1753                }
1754                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1755            }
1756        }
1757    }
1758}
1759
1760fn handle_copilot_stream_line<H: StreamHandler>(
1761    line: &str,
1762    handler: &mut H,
1763    extracted_text: &mut String,
1764    copilot_state: &mut CopilotStreamState,
1765) -> Option<SessionResult> {
1766    let event = CopilotStreamParser::parse_line(line)?;
1767    dispatch_copilot_stream_event(event, handler, extracted_text, copilot_state)
1768}
1769
1770fn inject_ralph_runtime_env(cmd_builder: &mut CommandBuilder, workspace_root: &std::path::Path) {
1771    let Ok(current_exe) = env::current_exe() else {
1772        return;
1773    };
1774    let Some(bin_dir) = current_exe.parent() else {
1775        return;
1776    };
1777
1778    let mut path_entries = vec![bin_dir.to_path_buf()];
1779    if let Some(existing_path) = env::var_os("PATH") {
1780        path_entries.extend(env::split_paths(&existing_path));
1781    }
1782
1783    if let Ok(joined_path) = env::join_paths(path_entries) {
1784        cmd_builder.env("PATH", joined_path);
1785    }
1786    cmd_builder.env("RALPH_BIN", current_exe);
1787    cmd_builder.env("RALPH_WORKSPACE_ROOT", workspace_root);
1788    if std::path::Path::new("/var/tmp").is_dir() {
1789        cmd_builder.env("TMPDIR", "/var/tmp");
1790        cmd_builder.env("TMP", "/var/tmp");
1791        cmd_builder.env("TEMP", "/var/tmp");
1792    }
1793}
1794
/// A unit of user input, classified for the PTY event loop.
#[derive(Debug)]
enum InputEvent {
    /// The Ctrl+C key (byte `3`).
    CtrlC,
    /// The Ctrl+\ key (byte `28`).
    CtrlBackslash,
    /// Any other bytes, to be forwarded as-is.
    Data(Vec<u8>),
}

impl InputEvent {
    /// Classifies a raw input chunk.
    ///
    /// Only a chunk consisting of exactly one byte can be a control key:
    /// `[3]` becomes `CtrlC` and `[28]` becomes `CtrlBackslash`. Everything else,
    /// including empty and multi-byte chunks, is wrapped unchanged in `Data`.
    fn from_bytes(data: Vec<u8>) -> Self {
        match data.as_slice() {
            [3] => InputEvent::CtrlC,
            [28] => InputEvent::CtrlBackslash,
            _ => InputEvent::Data(data),
        }
    }
}
1819
/// Messages describing what was read from the PTY.
#[derive(Debug)]
enum OutputEvent {
    /// A chunk of raw bytes read from the PTY.
    Data(Vec<u8>),
    /// The PTY reported end-of-file (process exited).
    Eof,
    /// Reading from the PTY failed; carries the error text.
    Error(String),
}
1830
1831/// Strips ANSI escape sequences from raw bytes.
1832///
1833/// Uses `strip-ansi-escapes` for direct byte-level ANSI removal without terminal
1834/// emulation. This ensures ALL content is preserved regardless of output size,
1835/// unlike vt100's terminal simulation which can lose content that scrolls off.
1836fn strip_ansi(bytes: &[u8]) -> String {
1837    let stripped = strip_ansi_escapes::strip(bytes);
1838    String::from_utf8_lossy(&stripped).into_owned()
1839}
1840
1841/// Determines the final termination type, accounting for SIGINT exit code.
1842///
1843/// Exit code 130 indicates the process was killed by SIGINT (Ctrl+C forwarded to PTY).
1844fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1845    if exit_code == 130 {
1846        info!("Child process killed by SIGINT");
1847        TerminationType::UserInterrupt
1848    } else {
1849        default
1850    }
1851}
1852
/// Looks up the value given for a CLI flag in `args`.
///
/// Two spellings are recognised, for both `long_flag` and `short_flag`:
///
/// * separate arguments: `--flag value` (the value must not start with `-`,
///   otherwise it is taken to be another flag and the search continues);
/// * a single argument: `--flag=value` (the value must be non-empty).
///
/// Arguments are scanned left to right and the first usable value wins.
/// Returns `None` when no occurrence of the flag carries a usable value.
fn extract_cli_flag_value(args: &[String], long_flag: &str, short_flag: &str) -> Option<String> {
    // The `flag=` prefixes do not depend on the argument being inspected, so
    // build them once instead of allocating two strings per argument.
    let long_prefix = format!("{long_flag}=");
    let short_prefix = format!("{short_flag}=");

    for (i, arg) in args.iter().enumerate() {
        if arg == long_flag || arg == short_flag {
            // `--flag value`: accept the next argument unless it looks like a flag.
            let next_value = args.get(i + 1).filter(|value| !value.starts_with('-'));
            if let Some(value) = next_value {
                return Some(value.clone());
            }
            continue;
        }

        // `--flag=value` / `-f=value`: an empty value does not count.
        let inline_value = arg
            .strip_prefix(long_prefix.as_str())
            .filter(|value| !value.is_empty())
            .or_else(|| {
                arg.strip_prefix(short_prefix.as_str())
                    .filter(|value| !value.is_empty())
            });
        if let Some(value) = inline_value {
            return Some(value.to_string());
        }
    }

    None
}
1879
1880/// Dispatches a Claude stream event to the appropriate handler method.
1881/// Also accumulates text content into `extracted_text` for event parsing.
1882fn dispatch_stream_event<H: StreamHandler>(
1883    event: ClaudeStreamEvent,
1884    handler: &mut H,
1885    extracted_text: &mut String,
1886) {
1887    match event {
1888        ClaudeStreamEvent::System { .. } => {
1889            // Session initialization - could log in verbose mode but not user-facing
1890        }
1891        ClaudeStreamEvent::Assistant { message, .. } => {
1892            for block in message.content {
1893                match block {
1894                    ContentBlock::Text { text } => {
1895                        handler.on_text(&text);
1896                        // Accumulate text for event parsing
1897                        extracted_text.push_str(&text);
1898                        extracted_text.push('\n');
1899                    }
1900                    ContentBlock::ToolUse { name, id, input } => {
1901                        handler.on_tool_call(&name, &id, &input)
1902                    }
1903                }
1904            }
1905        }
1906        ClaudeStreamEvent::User { message } => {
1907            for block in message.content {
1908                match block {
1909                    UserContentBlock::ToolResult {
1910                        tool_use_id,
1911                        content,
1912                    } => {
1913                        handler.on_tool_result(&tool_use_id, &content);
1914                    }
1915                }
1916            }
1917        }
1918        ClaudeStreamEvent::Result {
1919            duration_ms,
1920            total_cost_usd,
1921            num_turns,
1922            is_error,
1923        } => {
1924            if is_error {
1925                handler.on_error("Session ended with error");
1926            }
1927            handler.on_complete(&SessionResult {
1928                duration_ms,
1929                total_cost_usd,
1930                num_turns,
1931                is_error,
1932                ..Default::default()
1933            });
1934        }
1935    }
1936}
1937
1938/// Builds a `PtyExecutionResult` from the accumulated output and exit status.
1939///
1940/// # Arguments
1941/// * `output` - Raw bytes from PTY
1942/// * `success` - Whether process exited successfully
1943/// * `exit_code` - Process exit code if available
1944/// * `termination` - How the process was terminated
1945/// * `extracted_text` - Text extracted from NDJSON stream (for Claude's stream-json)
1946fn build_result(
1947    output: &[u8],
1948    success: bool,
1949    exit_code: Option<i32>,
1950    termination: TerminationType,
1951    extracted_text: String,
1952    session_result: Option<&SessionResult>,
1953) -> PtyExecutionResult {
1954    let (total_cost_usd, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens) =
1955        if let Some(result) = session_result {
1956            (
1957                result.total_cost_usd,
1958                result.input_tokens,
1959                result.output_tokens,
1960                result.cache_read_tokens,
1961                result.cache_write_tokens,
1962            )
1963        } else {
1964            (0.0, 0, 0, 0, 0)
1965        };
1966
1967    PtyExecutionResult {
1968        output: String::from_utf8_lossy(output).to_string(),
1969        stripped_output: strip_ansi(output),
1970        extracted_text,
1971        success,
1972        exit_code,
1973        termination,
1974        total_cost_usd,
1975        input_tokens,
1976        output_tokens,
1977        cache_read_tokens,
1978        cache_write_tokens,
1979    }
1980}
1981
1982#[cfg(test)]
1983mod tests {
1984    use super::*;
1985    use crate::claude_stream::{AssistantMessage, UserMessage};
1986    #[cfg(unix)]
1987    use crate::cli_backend::PromptMode;
1988    use crate::stream_handler::{SessionResult, StreamHandler};
1989    #[cfg(unix)]
1990    use tempfile::TempDir;
1991
1992    #[test]
1993    fn test_double_ctrl_c_within_window() {
1994        let mut state = CtrlCState::new();
1995        let now = Instant::now();
1996
1997        // First Ctrl+C: should forward and start window
1998        let action = state.handle_ctrl_c(now);
1999        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
2000
2001        // Second Ctrl+C within 1 second: should terminate
2002        let later = now + Duration::from_millis(500);
2003        let action = state.handle_ctrl_c(later);
2004        assert_eq!(action, CtrlCAction::Terminate);
2005    }
2006
2007    #[test]
2008    fn test_input_event_from_bytes_ctrl_c() {
2009        let event = InputEvent::from_bytes(vec![3]);
2010        assert!(matches!(event, InputEvent::CtrlC));
2011    }
2012
2013    #[test]
2014    fn test_input_event_from_bytes_ctrl_backslash() {
2015        let event = InputEvent::from_bytes(vec![28]);
2016        assert!(matches!(event, InputEvent::CtrlBackslash));
2017    }
2018
2019    #[test]
2020    fn test_input_event_from_bytes_data() {
2021        let event = InputEvent::from_bytes(vec![b'a']);
2022        assert!(matches!(event, InputEvent::Data(_)));
2023
2024        let event = InputEvent::from_bytes(vec![1, 2, 3]);
2025        assert!(matches!(event, InputEvent::Data(_)));
2026    }
2027
2028    #[test]
2029    fn test_ctrl_c_window_expires() {
2030        let mut state = CtrlCState::new();
2031        let now = Instant::now();
2032
2033        // First Ctrl+C
2034        state.handle_ctrl_c(now);
2035
2036        // Wait 2 seconds (window expires)
2037        let later = now + Duration::from_secs(2);
2038
2039        // Second Ctrl+C: window expired, should forward and start new window
2040        let action = state.handle_ctrl_c(later);
2041        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
2042    }
2043
2044    #[test]
2045    fn test_strip_ansi_basic() {
2046        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n";
2047        let stripped = strip_ansi(input);
2048        assert!(stripped.contains("Thinking..."));
2049        assert!(!stripped.contains("\x1b["));
2050    }
2051
2052    #[test]
2053    fn test_completion_promise_extraction() {
2054        // Simulate Claude output with heavy ANSI formatting
2055        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n\
2056                      \x1b[2K\x1b[1;32m  Done!\x1b[0m\r\n\
2057                      \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";
2058
2059        let stripped = strip_ansi(input);
2060
2061        // Event parser sees clean text
2062        assert!(stripped.contains("LOOP_COMPLETE"));
2063        assert!(!stripped.contains("\x1b["));
2064    }
2065
2066    #[test]
2067    fn test_event_tag_extraction() {
2068        // Event tags may be wrapped in ANSI codes
2069        let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
2070                      Task completed successfully\r\n\
2071                      \x1b[90m</event>\x1b[0m\r\n";
2072
2073        let stripped = strip_ansi(input);
2074
2075        assert!(stripped.contains("<event topic=\"build.done\">"));
2076        assert!(stripped.contains("</event>"));
2077    }
2078
2079    #[test]
2080    fn test_large_output_preserves_early_events() {
2081        // Regression test: ensure event tags aren't lost when output is large
2082        let mut input = Vec::new();
2083
2084        // Event tag at the beginning
2085        input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");
2086
2087        // Simulate 500 lines of verbose output (would overflow any terminal)
2088        for i in 0..500 {
2089            input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
2090        }
2091
2092        let stripped = strip_ansi(&input);
2093
2094        // Event tag should still be present - no scrollback loss with strip-ansi-escapes
2095        assert!(
2096            stripped.contains("<event topic=\"build.task\">"),
2097            "Event tag was lost - strip_ansi is not preserving all content"
2098        );
2099        assert!(stripped.contains("Implement feature X"));
2100        assert!(stripped.contains("Line 499")); // Last line should be present too
2101    }
2102
2103    #[test]
2104    fn test_pty_config_defaults() {
2105        let config = PtyConfig::default();
2106        assert!(config.interactive);
2107        assert_eq!(config.idle_timeout_secs, 30);
2108        assert_eq!(config.cols, 80);
2109        assert_eq!(config.rows, 24);
2110    }
2111
2112    #[test]
2113    fn test_pty_config_from_env_matches_env_or_defaults() {
2114        let cols = std::env::var("COLUMNS")
2115            .ok()
2116            .and_then(|value| value.parse::<u16>().ok())
2117            .unwrap_or(80);
2118        let rows = std::env::var("LINES")
2119            .ok()
2120            .and_then(|value| value.parse::<u16>().ok())
2121            .unwrap_or(24);
2122
2123        let config = PtyConfig::from_env();
2124        assert_eq!(config.cols, cols);
2125        assert_eq!(config.rows, rows);
2126    }
2127
2128    /// Verifies that the idle timeout logic in run_interactive correctly handles
2129    /// activity resets. Per spec (interactive-mode.spec.md lines 155-159):
2130    /// - Timeout resets on agent output (any bytes from PTY)
2131    /// - Timeout resets on user input (any key forwarded to agent)
2132    ///
2133    /// This test validates the timeout calculation logic that enables resets.
2134    /// The actual reset happens in the select! branches at lines 497, 523, and 545.
2135    #[test]
2136    fn test_idle_timeout_reset_logic() {
2137        // Simulate the timeout calculation used in run_interactive
2138        let timeout_duration = Duration::from_secs(30);
2139
2140        // Simulate 25 seconds of inactivity
2141        let simulated_25s = Duration::from_secs(25);
2142
2143        // Remaining time before timeout
2144        let remaining = timeout_duration.saturating_sub(simulated_25s);
2145        assert_eq!(remaining.as_secs(), 5);
2146
2147        // After activity (output or input), last_activity would be reset to now
2148        let last_activity_after_reset = Instant::now();
2149
2150        // Now elapsed is 0, full timeout duration available again
2151        let elapsed = last_activity_after_reset.elapsed();
2152        assert!(elapsed < Duration::from_millis(100)); // Should be near-zero
2153
2154        // Timeout calculation would give full duration minus small elapsed
2155        let new_remaining = timeout_duration.saturating_sub(elapsed);
2156        assert!(new_remaining > Duration::from_secs(29)); // Should be nearly full timeout
2157    }
2158
2159    #[test]
2160    fn test_extracted_text_field_exists() {
2161        // Test that PtyExecutionResult has extracted_text field
2162        // This is for NDJSON output where event tags are inside JSON strings
2163        let result = PtyExecutionResult {
2164            output: String::new(),
2165            stripped_output: String::new(),
2166            extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
2167            success: true,
2168            exit_code: Some(0),
2169            termination: TerminationType::Natural,
2170            total_cost_usd: 0.0,
2171            input_tokens: 0,
2172            output_tokens: 0,
2173            cache_read_tokens: 0,
2174            cache_write_tokens: 0,
2175        };
2176
2177        assert!(
2178            result
2179                .extracted_text
2180                .contains("<event topic=\"build.done\">")
2181        );
2182    }
2183
2184    #[test]
2185    fn test_build_result_includes_extracted_text() {
2186        // Test that build_result properly handles extracted_text
2187        let output = b"raw output";
2188        let extracted = "extracted text with <event topic=\"test\">payload</event>";
2189        let result = build_result(
2190            output,
2191            true,
2192            Some(0),
2193            TerminationType::Natural,
2194            extracted.to_string(),
2195            None,
2196        );
2197
2198        assert_eq!(result.extracted_text, extracted);
2199        assert!(result.stripped_output.contains("raw output"));
2200    }
2201
2202    #[test]
2203    fn test_resolve_termination_type_handles_sigint_exit_code() {
2204        let termination = resolve_termination_type(130, TerminationType::Natural);
2205        assert_eq!(termination, TerminationType::UserInterrupt);
2206
2207        let termination = resolve_termination_type(0, TerminationType::ForceKill);
2208        assert_eq!(termination, TerminationType::ForceKill);
2209    }
2210
2211    #[test]
2212    fn test_extract_cli_flag_value_supports_split_and_equals_syntax() {
2213        let args = vec![
2214            "--provider".to_string(),
2215            "anthropic".to_string(),
2216            "--model=claude-sonnet-4".to_string(),
2217        ];
2218
2219        assert_eq!(
2220            extract_cli_flag_value(&args, "--provider", "-p"),
2221            Some("anthropic".to_string())
2222        );
2223        assert_eq!(
2224            extract_cli_flag_value(&args, "--model", "-m"),
2225            Some("claude-sonnet-4".to_string())
2226        );
2227        assert_eq!(extract_cli_flag_value(&args, "--foo", "-f"), None);
2228    }
2229
2230    #[derive(Default)]
2231    struct CapturingHandler {
2232        texts: Vec<String>,
2233        tool_calls: Vec<(String, String, serde_json::Value)>,
2234        tool_results: Vec<(String, String)>,
2235        errors: Vec<String>,
2236        completions: Vec<SessionResult>,
2237    }
2238
2239    impl StreamHandler for CapturingHandler {
2240        fn on_text(&mut self, text: &str) {
2241            self.texts.push(text.to_string());
2242        }
2243
2244        fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
2245            self.tool_calls
2246                .push((name.to_string(), id.to_string(), input.clone()));
2247        }
2248
2249        fn on_tool_result(&mut self, id: &str, output: &str) {
2250            self.tool_results.push((id.to_string(), output.to_string()));
2251        }
2252
2253        fn on_error(&mut self, error: &str) {
2254            self.errors.push(error.to_string());
2255        }
2256
2257        fn on_complete(&mut self, result: &SessionResult) {
2258            self.completions.push(result.clone());
2259        }
2260    }
2261
2262    #[test]
2263    fn test_dispatch_stream_event_routes_text_and_tool_calls() {
2264        let mut handler = CapturingHandler::default();
2265        let mut extracted_text = String::new();
2266
2267        let event = ClaudeStreamEvent::Assistant {
2268            message: AssistantMessage {
2269                content: vec![
2270                    ContentBlock::Text {
2271                        text: "Hello".to_string(),
2272                    },
2273                    ContentBlock::ToolUse {
2274                        id: "tool-1".to_string(),
2275                        name: "Read".to_string(),
2276                        input: serde_json::json!({"path": "README.md"}),
2277                    },
2278                ],
2279            },
2280            usage: None,
2281        };
2282
2283        dispatch_stream_event(event, &mut handler, &mut extracted_text);
2284
2285        assert_eq!(handler.texts, vec!["Hello".to_string()]);
2286        assert_eq!(handler.tool_calls.len(), 1);
2287        assert!(extracted_text.contains("Hello"));
2288        assert!(extracted_text.ends_with('\n'));
2289    }
2290
2291    #[test]
2292    fn test_dispatch_stream_event_routes_tool_results_and_completion() {
2293        let mut handler = CapturingHandler::default();
2294        let mut extracted_text = String::new();
2295
2296        let event = ClaudeStreamEvent::User {
2297            message: UserMessage {
2298                content: vec![UserContentBlock::ToolResult {
2299                    tool_use_id: "tool-1".to_string(),
2300                    content: "done".to_string(),
2301                }],
2302            },
2303        };
2304
2305        dispatch_stream_event(event, &mut handler, &mut extracted_text);
2306        assert_eq!(handler.tool_results.len(), 1);
2307        assert_eq!(handler.tool_results[0].0, "tool-1");
2308        assert_eq!(handler.tool_results[0].1, "done");
2309
2310        let event = ClaudeStreamEvent::Result {
2311            duration_ms: 12,
2312            total_cost_usd: 0.01,
2313            num_turns: 2,
2314            is_error: true,
2315        };
2316
2317        dispatch_stream_event(event, &mut handler, &mut extracted_text);
2318        assert_eq!(handler.errors.len(), 1);
2319        assert_eq!(handler.completions.len(), 1);
2320        assert!(handler.completions[0].is_error);
2321    }
2322
2323    #[test]
2324    fn test_dispatch_stream_event_system_noop() {
2325        let mut handler = CapturingHandler::default();
2326        let mut extracted_text = String::new();
2327
2328        let event = ClaudeStreamEvent::System {
2329            session_id: "session-1".to_string(),
2330            model: "claude-test".to_string(),
2331            tools: Vec::new(),
2332        };
2333
2334        dispatch_stream_event(event, &mut handler, &mut extracted_text);
2335
2336        assert!(handler.texts.is_empty());
2337        assert!(handler.tool_calls.is_empty());
2338        assert!(handler.tool_results.is_empty());
2339        assert!(handler.errors.is_empty());
2340        assert!(handler.completions.is_empty());
2341        assert!(extracted_text.is_empty());
2342    }
2343
2344    /// Regression test: TUI mode should not spawn stdin reader thread
2345    ///
2346    /// Bug: In TUI mode, Ctrl+C required double-press to exit because the stdin
2347    /// reader thread (which captures byte 0x03) raced with the signal handler.
2348    /// The stdin reader would win, triggering "double Ctrl+C" logic instead of
2349    /// clean exit via interrupt_rx.
2350    ///
2351    /// Fix: When tui_connected=true, skip spawning stdin reader entirely.
2352    /// TUI mode is observation-only; user input should not be captured from stdin.
2353    /// The TUI has its own input handling (Ctrl+a q), and raw Ctrl+C goes directly
2354    /// to the signal handler (interrupt_rx) without racing.
2355    ///
2356    /// This test documents the expected behavior. The actual fix is in
2357    /// run_interactive() where `let mut input_rx = if !tui_connected { ... }`.
2358    #[test]
2359    fn test_tui_mode_stdin_reader_bypass() {
2360        // The tui_connected flag is now determined by the explicit tui_mode field,
2361        // set via set_tui_mode(true) when TUI is connected.
2362        // Previously used output_rx.is_none() which broke after streaming refactor.
2363
2364        // Simulate TUI connected scenario (tui_mode = true)
2365        let tui_mode = true;
2366        let tui_connected = tui_mode;
2367
2368        // When TUI is connected, stdin reader is skipped
2369        // (verified by: input_rx becomes None instead of Some(channel))
2370        assert!(
2371            tui_connected,
2372            "When tui_mode is true, stdin reader must be skipped"
2373        );
2374
2375        // In non-TUI mode, stdin reader is spawned
2376        let tui_mode_disabled = false;
2377        let tui_connected_non_tui = tui_mode_disabled;
2378        assert!(
2379            !tui_connected_non_tui,
2380            "When tui_mode is false, stdin reader must be spawned"
2381        );
2382    }
2383
2384    #[test]
2385    fn test_tui_mode_default_is_false() {
2386        // Create a PtyExecutor and verify tui_mode defaults to false
2387        let backend = CliBackend::claude();
2388        let config = PtyConfig::default();
2389        let executor = PtyExecutor::new(backend, config);
2390
2391        // tui_mode should default to false
2392        assert!(!executor.tui_mode, "tui_mode should default to false");
2393    }
2394
2395    #[test]
2396    fn test_set_tui_mode() {
2397        // Create a PtyExecutor and verify set_tui_mode works
2398        let backend = CliBackend::claude();
2399        let config = PtyConfig::default();
2400        let mut executor = PtyExecutor::new(backend, config);
2401
2402        // Initially false
2403        assert!(!executor.tui_mode, "tui_mode should start as false");
2404
2405        // Set to true
2406        executor.set_tui_mode(true);
2407        assert!(
2408            executor.tui_mode,
2409            "tui_mode should be true after set_tui_mode(true)"
2410        );
2411
2412        // Set back to false
2413        executor.set_tui_mode(false);
2414        assert!(
2415            !executor.tui_mode,
2416            "tui_mode should be false after set_tui_mode(false)"
2417        );
2418    }
2419
2420    #[test]
2421    fn test_build_result_populates_fields() {
2422        let output = b"\x1b[31mHello\x1b[0m\n";
2423        let extracted = "extracted text".to_string();
2424
2425        let result = build_result(
2426            output,
2427            true,
2428            Some(0),
2429            TerminationType::Natural,
2430            extracted.clone(),
2431            None,
2432        );
2433
2434        assert_eq!(result.output, String::from_utf8_lossy(output));
2435        assert!(result.stripped_output.contains("Hello"));
2436        assert!(!result.stripped_output.contains("\x1b["));
2437        assert_eq!(result.extracted_text, extracted);
2438        assert!(result.success);
2439        assert_eq!(result.exit_code, Some(0));
2440        assert_eq!(result.termination, TerminationType::Natural);
2441    }
2442
2443    #[cfg(unix)]
2444    #[tokio::test]
2445    async fn test_run_observe_executes_arg_prompt() {
2446        let temp_dir = TempDir::new().expect("temp dir");
2447        let backend = CliBackend {
2448            command: "sh".to_string(),
2449            args: vec!["-c".to_string()],
2450            prompt_mode: PromptMode::Arg,
2451            prompt_flag: None,
2452            output_format: OutputFormat::Text,
2453            env_vars: vec![],
2454        };
2455        let config = PtyConfig {
2456            interactive: false,
2457            idle_timeout_secs: 0,
2458            cols: 80,
2459            rows: 24,
2460            workspace_root: temp_dir.path().to_path_buf(),
2461        };
2462        let executor = PtyExecutor::new(backend, config);
2463        let (_tx, rx) = tokio::sync::watch::channel(false);
2464
2465        let result = executor
2466            .run_observe("echo hello-pty", rx)
2467            .await
2468            .expect("run_observe");
2469
2470        assert!(result.success);
2471        assert!(result.output.contains("hello-pty"));
2472        assert!(result.stripped_output.contains("hello-pty"));
2473        assert_eq!(result.exit_code, Some(0));
2474        assert_eq!(result.termination, TerminationType::Natural);
2475    }
2476
2477    #[cfg(unix)]
2478    #[tokio::test]
2479    async fn test_run_observe_writes_stdin_prompt() {
2480        let temp_dir = TempDir::new().expect("temp dir");
2481        let backend = CliBackend {
2482            command: "sh".to_string(),
2483            args: vec!["-c".to_string(), "read line; echo \"$line\"".to_string()],
2484            prompt_mode: PromptMode::Stdin,
2485            prompt_flag: None,
2486            output_format: OutputFormat::Text,
2487            env_vars: vec![],
2488        };
2489        let config = PtyConfig {
2490            interactive: false,
2491            idle_timeout_secs: 0,
2492            cols: 80,
2493            rows: 24,
2494            workspace_root: temp_dir.path().to_path_buf(),
2495        };
2496        let executor = PtyExecutor::new(backend, config);
2497        let (_tx, rx) = tokio::sync::watch::channel(false);
2498
2499        let result = executor
2500            .run_observe("stdin-line", rx)
2501            .await
2502            .expect("run_observe");
2503
2504        assert!(result.success);
2505        assert!(result.output.contains("stdin-line"));
2506        assert!(result.stripped_output.contains("stdin-line"));
2507        assert_eq!(result.termination, TerminationType::Natural);
2508    }
2509
2510    /// Regression test for #280: large stdin-mode prompts deadlocked the PTY
2511    /// because the PTY line discipline limits canonical input to ~4KB. The fix
2512    /// converts stdin-mode to arg-mode in non-interactive PTY execution via
2513    /// `build_command_pty`, so the prompt is passed as a command argument
2514    /// (with temp file for very large prompts) instead of through PTY stdin.
2515    #[cfg(unix)]
2516    #[tokio::test]
2517    async fn test_pty_converts_stdin_to_arg_for_large_prompt() {
2518        let _temp_dir = TempDir::new().expect("temp dir");
2519        let backend = CliBackend {
2520            command: "echo".to_string(),
2521            args: vec![],
2522            prompt_mode: PromptMode::Stdin,
2523            prompt_flag: Some("-p".to_string()),
2524            output_format: OutputFormat::Text,
2525            env_vars: vec![],
2526        };
2527
2528        // Verify build_command_pty converts stdin to arg mode
2529        let large_prompt = "x".repeat(32_000);
2530        let (cmd, args, stdin_input, temp_file) = backend.build_command_pty(&large_prompt);
2531        assert_eq!(cmd, "echo");
2532        // stdin_input should be None (converted to arg mode)
2533        assert!(stdin_input.is_none(), "PTY mode should not use stdin");
2534        // Large prompt should use temp file
2535        assert!(temp_file.is_some(), "Large prompt should use temp file");
2536        // Args should contain the temp file instruction
2537        assert!(
2538            args.iter().any(|a| a.contains("Please read and execute")),
2539            "args should contain temp file instruction: {:?}",
2540            args
2541        );
2542
2543        // Also verify a small prompt goes directly as arg
2544        let small_prompt = "hello world";
2545        let (_, args, stdin_input, temp_file) = backend.build_command_pty(small_prompt);
2546        assert!(stdin_input.is_none());
2547        assert!(temp_file.is_none());
2548        assert!(args.iter().any(|a| a == small_prompt));
2549    }
2550
2551    /// Verify that PTY execution with stdin-mode backend completes without
2552    /// deadlock by confirming the prompt is delivered via arg mode.
2553    #[cfg(unix)]
2554    #[tokio::test]
2555    async fn test_run_observe_large_stdin_backend_does_not_deadlock() {
2556        let temp_dir = TempDir::new().expect("temp dir");
2557        // Use echo which just prints its args — confirms the prompt arrives via
2558        // arg mode (not stdin) in PTY context.
2559        let backend = CliBackend {
2560            command: "echo".to_string(),
2561            args: vec![],
2562            prompt_mode: PromptMode::Stdin,
2563            prompt_flag: Some("-p".to_string()),
2564            output_format: OutputFormat::Text,
2565            env_vars: vec![],
2566        };
2567        let config = PtyConfig {
2568            interactive: false,
2569            idle_timeout_secs: 0,
2570            cols: 32768,
2571            rows: 24,
2572            workspace_root: temp_dir.path().to_path_buf(),
2573        };
2574        let executor = PtyExecutor::new(backend, config);
2575        let (_tx, rx) = tokio::sync::watch::channel(false);
2576
2577        let large_prompt = "x".repeat(32_000);
2578
2579        // Before the fix, this would hang forever with stdin-mode backends.
2580        let result = tokio::time::timeout(
2581            std::time::Duration::from_secs(5),
2582            executor.run_observe(&large_prompt, rx),
2583        )
2584        .await
2585        .expect("should not deadlock")
2586        .expect("run_observe");
2587
2588        assert!(result.success);
2589        // echo should have printed the temp file instruction
2590        assert!(
2591            result.output.contains("Please read and execute"),
2592            "output should contain temp file instruction: {}",
2593            &result.output[..result.output.len().min(200)]
2594        );
2595    }
2596
2597    #[cfg(unix)]
2598    #[tokio::test]
2599    async fn test_run_observe_streaming_text_routes_output() {
2600        let temp_dir = TempDir::new().expect("temp dir");
2601        let backend = CliBackend {
2602            command: "sh".to_string(),
2603            args: vec!["-c".to_string()],
2604            prompt_mode: PromptMode::Arg,
2605            prompt_flag: None,
2606            output_format: OutputFormat::Text,
2607            env_vars: vec![],
2608        };
2609        let config = PtyConfig {
2610            interactive: false,
2611            idle_timeout_secs: 0,
2612            cols: 80,
2613            rows: 24,
2614            workspace_root: temp_dir.path().to_path_buf(),
2615        };
2616        let executor = PtyExecutor::new(backend, config);
2617        let (_tx, rx) = tokio::sync::watch::channel(false);
2618        let mut handler = CapturingHandler::default();
2619
2620        let result = executor
2621            .run_observe_streaming("printf 'alpha\\nbeta\\n'", rx, &mut handler)
2622            .await
2623            .expect("run_observe_streaming");
2624
2625        assert!(result.success);
2626        let captured = handler.texts.join("");
2627        assert!(captured.contains("alpha"), "captured: {captured}");
2628        assert!(captured.contains("beta"), "captured: {captured}");
2629        assert!(handler.completions.is_empty());
2630        assert!(result.extracted_text.is_empty());
2631    }
2632
2633    #[cfg(unix)]
2634    #[tokio::test]
2635    async fn test_run_observe_streaming_parses_stream_json() {
2636        let temp_dir = TempDir::new().expect("temp dir");
2637        let backend = CliBackend {
2638            command: "sh".to_string(),
2639            args: vec!["-c".to_string()],
2640            prompt_mode: PromptMode::Arg,
2641            prompt_flag: None,
2642            output_format: OutputFormat::StreamJson,
2643            env_vars: vec![],
2644        };
2645        let config = PtyConfig {
2646            interactive: false,
2647            idle_timeout_secs: 0,
2648            cols: 80,
2649            rows: 24,
2650            workspace_root: temp_dir.path().to_path_buf(),
2651        };
2652        let executor = PtyExecutor::new(backend, config);
2653        let (_tx, rx) = tokio::sync::watch::channel(false);
2654        let mut handler = CapturingHandler::default();
2655
2656        let script = r#"printf '%s\n' '{"type":"assistant","message":{"content":[{"type":"text","text":"Hello stream"}]}}' '{"type":"result","duration_ms":1,"total_cost_usd":0.0,"num_turns":1,"is_error":false}'"#;
2657        let result = executor
2658            .run_observe_streaming(script, rx, &mut handler)
2659            .await
2660            .expect("run_observe_streaming");
2661
2662        assert!(result.success);
2663        assert!(
2664            handler
2665                .texts
2666                .iter()
2667                .any(|text| text.contains("Hello stream"))
2668        );
2669        assert_eq!(handler.completions.len(), 1);
2670        assert!(result.extracted_text.contains("Hello stream"));
2671        assert_eq!(result.termination, TerminationType::Natural);
2672    }
2673
2674    #[cfg(unix)]
2675    #[tokio::test]
2676    async fn test_run_interactive_in_tui_mode() {
2677        let temp_dir = TempDir::new().expect("temp dir");
2678        let backend = CliBackend {
2679            command: "sh".to_string(),
2680            args: vec!["-c".to_string()],
2681            prompt_mode: PromptMode::Arg,
2682            prompt_flag: None,
2683            output_format: OutputFormat::Text,
2684            env_vars: vec![],
2685        };
2686        let config = PtyConfig {
2687            interactive: true,
2688            idle_timeout_secs: 0,
2689            cols: 80,
2690            rows: 24,
2691            workspace_root: temp_dir.path().to_path_buf(),
2692        };
2693        let mut executor = PtyExecutor::new(backend, config);
2694        executor.set_tui_mode(true);
2695        let (_tx, rx) = tokio::sync::watch::channel(false);
2696
2697        let result = executor
2698            .run_interactive("echo hello-tui", rx)
2699            .await
2700            .expect("run_interactive");
2701
2702        assert!(result.success);
2703        assert!(result.output.contains("hello-tui"));
2704        assert!(result.stripped_output.contains("hello-tui"));
2705        assert_eq!(result.exit_code, Some(0));
2706        assert_eq!(result.termination, TerminationType::Natural);
2707    }
2708}