Skip to main content

ralph_adapters/
pty_executor.rs

1//! PTY executor for running prompts with full terminal emulation.
2//!
3//! Spawns CLI tools in a pseudo-terminal to preserve rich TUI features like
4//! colors, spinners, and animations. Supports both interactive mode (user
5//! input forwarded) and observe mode (output-only).
6//!
7//! Key features:
8//! - PTY creation via `portable-pty` for cross-platform support
9//! - Idle timeout with activity tracking (output AND input reset timer)
10//! - Double Ctrl+C handling (first forwards, second terminates)
11//! - Raw mode management with cleanup on exit/crash
12//!
13//! Architecture:
14//! - Uses `tokio::select!` for non-blocking I/O multiplexing
15//! - Spawns separate tasks for PTY output and user input
16//! - Enables responsive Ctrl+C handling even when PTY is idle
17
18// Exit codes and PIDs are always within i32 range in practice
19#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::stream_handler::{SessionResult, StreamHandler};
24#[cfg(unix)]
25use nix::sys::signal::{Signal, kill};
26#[cfg(unix)]
27use nix::unistd::Pid;
28use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
29use std::io::{self, Read, Write};
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use std::time::{Duration, Instant};
33use tokio::sync::{mpsc, watch};
34use tracing::{debug, info, warn};
35
36/// Result of a PTY execution.
37#[derive(Debug)]
38pub struct PtyExecutionResult {
39    /// The accumulated output (ANSI sequences preserved).
40    pub output: String,
41    /// The ANSI-stripped output for event parsing.
42    pub stripped_output: String,
43    /// Extracted text content from NDJSON stream (for Claude's stream-json output).
44    /// When Claude outputs `--output-format stream-json`, event tags like
45    /// `<event topic="...">` are inside JSON string values. This field contains
46    /// the extracted text content for proper event parsing.
47    /// Empty for non-JSON backends (use `stripped_output` instead).
48    pub extracted_text: String,
49    /// Whether the process exited successfully.
50    pub success: bool,
51    /// The exit code if available.
52    pub exit_code: Option<i32>,
53    /// How the process was terminated.
54    pub termination: TerminationType,
55}
56
57/// How the PTY process was terminated.
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub enum TerminationType {
60    /// Process exited naturally.
61    Natural,
62    /// Terminated due to idle timeout.
63    IdleTimeout,
64    /// Terminated by user (double Ctrl+C).
65    UserInterrupt,
66    /// Force killed by user (Ctrl+\).
67    ForceKill,
68}
69
70/// Configuration for PTY execution.
71#[derive(Debug, Clone)]
72pub struct PtyConfig {
73    /// Enable interactive mode (forward user input).
74    pub interactive: bool,
75    /// Idle timeout in seconds (0 = disabled).
76    pub idle_timeout_secs: u32,
77    /// Terminal width.
78    pub cols: u16,
79    /// Terminal height.
80    pub rows: u16,
81    /// Workspace root directory for command execution.
82    /// This is captured at startup to avoid `current_dir()` failures when the
83    /// working directory no longer exists (e.g., in E2E test workspaces).
84    pub workspace_root: std::path::PathBuf,
85}
86
87impl Default for PtyConfig {
88    fn default() -> Self {
89        Self {
90            interactive: true,
91            idle_timeout_secs: 30,
92            cols: 80,
93            rows: 24,
94            workspace_root: std::env::current_dir()
95                .unwrap_or_else(|_| std::path::PathBuf::from(".")),
96        }
97    }
98}
99
100impl PtyConfig {
101    /// Creates config from environment, falling back to defaults.
102    pub fn from_env() -> Self {
103        let cols = std::env::var("COLUMNS")
104            .ok()
105            .and_then(|s| s.parse().ok())
106            .unwrap_or(80);
107        let rows = std::env::var("LINES")
108            .ok()
109            .and_then(|s| s.parse().ok())
110            .unwrap_or(24);
111
112        Self {
113            cols,
114            rows,
115            ..Default::default()
116        }
117    }
118
119    /// Sets the workspace root directory.
120    pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
121        self.workspace_root = root.into();
122        self
123    }
124}
125
126/// State machine for double Ctrl+C detection.
127#[derive(Debug)]
128pub struct CtrlCState {
129    /// When the first Ctrl+C was pressed (if any).
130    first_press: Option<Instant>,
131    /// Window duration for double-press detection.
132    window: Duration,
133}
134
135/// Action to take after handling Ctrl+C.
136#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum CtrlCAction {
138    /// Forward the Ctrl+C to Claude and start/restart the window.
139    ForwardAndStartWindow,
140    /// Terminate Claude (second Ctrl+C within window).
141    Terminate,
142}
143
144impl CtrlCState {
145    /// Creates a new Ctrl+C state tracker.
146    pub fn new() -> Self {
147        Self {
148            first_press: None,
149            window: Duration::from_secs(1),
150        }
151    }
152
153    /// Handles a Ctrl+C keypress and returns the action to take.
154    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
155        match self.first_press {
156            Some(first) if now.duration_since(first) < self.window => {
157                // Second Ctrl+C within window - terminate
158                self.first_press = None;
159                CtrlCAction::Terminate
160            }
161            _ => {
162                // First Ctrl+C or window expired - forward and start window
163                self.first_press = Some(now);
164                CtrlCAction::ForwardAndStartWindow
165            }
166        }
167    }
168}
169
170impl Default for CtrlCState {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176/// Executor for running prompts in a pseudo-terminal.
177pub struct PtyExecutor {
178    backend: CliBackend,
179    config: PtyConfig,
180    // Channel ends for TUI integration
181    output_tx: mpsc::UnboundedSender<Vec<u8>>,
182    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
183    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
184    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
185    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
186    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
187    // Termination notification for TUI
188    terminated_tx: watch::Sender<bool>,
189    terminated_rx: Option<watch::Receiver<bool>>,
190    // Explicit TUI mode flag - set via set_tui_mode() when TUI is connected.
191    // This replaces the previous inference via output_rx.is_none() which broke
192    // after the streaming refactor (handle() is no longer called in TUI mode).
193    tui_mode: bool,
194}
195
196impl PtyExecutor {
197    /// Creates a new PTY executor with the given backend and configuration.
198    pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
199        let (output_tx, output_rx) = mpsc::unbounded_channel();
200        let (input_tx, input_rx) = mpsc::unbounded_channel();
201        let (control_tx, control_rx) = mpsc::unbounded_channel();
202        let (terminated_tx, terminated_rx) = watch::channel(false);
203
204        Self {
205            backend,
206            config,
207            output_tx,
208            output_rx: Some(output_rx),
209            input_tx: Some(input_tx),
210            input_rx,
211            control_tx: Some(control_tx),
212            control_rx,
213            terminated_tx,
214            terminated_rx: Some(terminated_rx),
215            tui_mode: false,
216        }
217    }
218
219    /// Sets the TUI mode flag.
220    ///
221    /// When TUI mode is enabled, PTY output is sent to the TUI channel instead of
222    /// being written directly to stdout. This flag must be set before calling any
223    /// of the run methods when using the TUI.
224    ///
225    /// # Arguments
226    /// * `enabled` - Whether TUI mode should be active
227    pub fn set_tui_mode(&mut self, enabled: bool) {
228        self.tui_mode = enabled;
229    }
230
231    /// Updates the backend configuration for this executor.
232    ///
233    /// This allows switching backends between iterations without recreating
234    /// the entire executor. Critical for hat-level backend configuration support.
235    ///
236    /// # Arguments
237    /// * `backend` - The new backend configuration to use
238    pub fn set_backend(&mut self, backend: CliBackend) {
239        self.backend = backend;
240    }
241
242    /// Returns a handle for TUI integration.
243    ///
244    /// Can only be called once - panics if called multiple times.
245    pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
246        crate::pty_handle::PtyHandle {
247            output_rx: self.output_rx.take().expect("handle() already called"),
248            input_tx: self.input_tx.take().expect("handle() already called"),
249            control_tx: self.control_tx.take().expect("handle() already called"),
250            terminated_rx: self.terminated_rx.take().expect("handle() already called"),
251        }
252    }
253
254    /// Spawns Claude in a PTY and returns the PTY pair, child process, stdin input, and temp file.
255    ///
256    /// The temp file is returned to keep it alive for the duration of execution.
257    /// For large prompts (>7000 chars), Claude is instructed to read from a temp file.
258    /// If the temp file is dropped before Claude reads it, the file is deleted and Claude hangs.
259    ///
260    /// The stdin_input is returned so callers can write it to the PTY after taking the writer.
261    /// This is necessary because `take_writer()` can only be called once per PTY.
262    fn spawn_pty(
263        &self,
264        prompt: &str,
265    ) -> io::Result<(
266        PtyPair,
267        Box<dyn portable_pty::Child + Send>,
268        Option<String>,
269        Option<tempfile::NamedTempFile>,
270    )> {
271        let pty_system = native_pty_system();
272
273        let pair = pty_system
274            .openpty(PtySize {
275                rows: self.config.rows,
276                cols: self.config.cols,
277                pixel_width: 0,
278                pixel_height: 0,
279            })
280            .map_err(|e| io::Error::other(e.to_string()))?;
281
282        let (cmd, args, stdin_input, temp_file) =
283            self.backend.build_command(prompt, self.config.interactive);
284
285        let mut cmd_builder = CommandBuilder::new(&cmd);
286        cmd_builder.args(&args);
287
288        // Set explicit working directory from config (captured at startup to avoid
289        // current_dir() failures when workspace no longer exists)
290        cmd_builder.cwd(&self.config.workspace_root);
291
292        // Set up environment for PTY
293        cmd_builder.env("TERM", "xterm-256color");
294        let child = pair
295            .slave
296            .spawn_command(cmd_builder)
297            .map_err(|e| io::Error::other(e.to_string()))?;
298
299        // Return stdin_input so callers can write it after taking the writer
300        Ok((pair, child, stdin_input, temp_file))
301    }
302
303    /// Runs in observe mode (output-only, no input forwarding).
304    ///
305    /// This is an async function that listens for interrupt signals via the shared
306    /// `interrupt_rx` watch channel from the event loop.
307    /// Uses a separate thread for blocking PTY reads and tokio::select! for signal handling.
308    ///
309    /// Returns when the process exits, idle timeout triggers, or interrupt is received.
310    ///
311    /// # Arguments
312    /// * `prompt` - The prompt to execute
313    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if PTY allocation fails, the command cannot be spawned,
318    /// or an I/O error occurs during output handling.
319    pub async fn run_observe(
320        &self,
321        prompt: &str,
322        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
323    ) -> io::Result<PtyExecutionResult> {
324        // Keep temp_file alive for the duration of execution (large prompts use temp files)
325        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
326
327        let reader = pair
328            .master
329            .try_clone_reader()
330            .map_err(|e| io::Error::other(e.to_string()))?;
331
332        // Write stdin input if present (for stdin prompt mode)
333        if let Some(ref input) = stdin_input {
334            // Small delay to let process initialize
335            tokio::time::sleep(Duration::from_millis(100)).await;
336            let mut writer = pair
337                .master
338                .take_writer()
339                .map_err(|e| io::Error::other(e.to_string()))?;
340            writer.write_all(input.as_bytes())?;
341            writer.write_all(b"\n")?;
342            writer.flush()?;
343        }
344
345        // Drop the slave to signal EOF when master closes
346        drop(pair.slave);
347
348        let mut output = Vec::new();
349        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
350            None
351        } else {
352            Some(Duration::from_secs(u64::from(
353                self.config.idle_timeout_secs,
354            )))
355        };
356
357        let mut termination = TerminationType::Natural;
358        let mut last_activity = Instant::now();
359
360        // Flag for termination request (shared with reader thread)
361        let should_terminate = Arc::new(AtomicBool::new(false));
362
363        // Spawn blocking reader thread that sends output via channel
364        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
365        let should_terminate_reader = Arc::clone(&should_terminate);
366        // Check if TUI is handling output (output_rx taken by handle())
367        let tui_connected = self.tui_mode;
368        let tui_output_tx = if tui_connected {
369            Some(self.output_tx.clone())
370        } else {
371            None
372        };
373
374        debug!("Spawning PTY output reader thread (observe mode)");
375        std::thread::spawn(move || {
376            let mut reader = reader;
377            let mut buf = [0u8; 4096];
378
379            loop {
380                if should_terminate_reader.load(Ordering::SeqCst) {
381                    debug!("PTY reader: termination requested");
382                    break;
383                }
384
385                match reader.read(&mut buf) {
386                    Ok(0) => {
387                        debug!("PTY reader: EOF");
388                        let _ = output_tx.blocking_send(OutputEvent::Eof);
389                        break;
390                    }
391                    Ok(n) => {
392                        let data = buf[..n].to_vec();
393                        // Send to TUI channel if connected
394                        if let Some(ref tx) = tui_output_tx {
395                            let _ = tx.send(data.clone());
396                        }
397                        // Send to main loop
398                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
399                            break;
400                        }
401                    }
402                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
403                        std::thread::sleep(Duration::from_millis(10));
404                    }
405                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
406                    Err(e) => {
407                        debug!(error = %e, "PTY reader error");
408                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
409                        break;
410                    }
411                }
412            }
413        });
414
415        // Main event loop using tokio::select! for interruptibility
416        loop {
417            // Calculate timeout for idle check
418            let idle_timeout = timeout_duration.map(|d| {
419                let elapsed = last_activity.elapsed();
420                if elapsed >= d {
421                    Duration::from_millis(1) // Trigger immediately
422                } else {
423                    d.saturating_sub(elapsed)
424                }
425            });
426
427            tokio::select! {
428                // Check for interrupt signal from event loop
429                _ = interrupt_rx.changed() => {
430                    if *interrupt_rx.borrow() {
431                        debug!("Interrupt received in observe mode, terminating");
432                        termination = TerminationType::UserInterrupt;
433                        should_terminate.store(true, Ordering::SeqCst);
434                        let _ = self.terminate_child(&mut child, true).await;
435                        break;
436                    }
437                }
438
439                // Check for output from reader thread
440                event = output_rx.recv() => {
441                    match event {
442                        Some(OutputEvent::Data(data)) => {
443                            // Only write to stdout if TUI is NOT handling output
444                            if !tui_connected {
445                                io::stdout().write_all(&data)?;
446                                io::stdout().flush()?;
447                            }
448                            output.extend_from_slice(&data);
449                            last_activity = Instant::now();
450                        }
451                        Some(OutputEvent::Eof) | None => {
452                            debug!("Output channel closed, process likely exited");
453                            break;
454                        }
455                        Some(OutputEvent::Error(e)) => {
456                            debug!(error = %e, "Reader thread reported error");
457                            break;
458                        }
459                    }
460                }
461
462                // Check for idle timeout
463                _ = async {
464                    if let Some(timeout) = idle_timeout {
465                        tokio::time::sleep(timeout).await;
466                    } else {
467                        // No timeout configured, wait forever
468                        std::future::pending::<()>().await;
469                    }
470                } => {
471                    warn!(
472                        timeout_secs = self.config.idle_timeout_secs,
473                        "Idle timeout triggered"
474                    );
475                    termination = TerminationType::IdleTimeout;
476                    should_terminate.store(true, Ordering::SeqCst);
477                    self.terminate_child(&mut child, true).await?;
478                    break;
479                }
480            }
481
482            // Check if child has exited
483            if let Some(status) = child
484                .try_wait()
485                .map_err(|e| io::Error::other(e.to_string()))?
486            {
487                let exit_code = status.exit_code() as i32;
488                debug!(exit_status = ?status, exit_code, "Child process exited");
489
490                // Drain any remaining output from channel
491                while let Ok(event) = output_rx.try_recv() {
492                    if let OutputEvent::Data(data) = event {
493                        if !tui_connected {
494                            io::stdout().write_all(&data)?;
495                            io::stdout().flush()?;
496                        }
497                        output.extend_from_slice(&data);
498                    }
499                }
500
501                let final_termination = resolve_termination_type(exit_code, termination);
502                // run_observe doesn't parse JSON, so extracted_text is empty
503                return Ok(build_result(
504                    &output,
505                    status.success(),
506                    Some(exit_code),
507                    final_termination,
508                    String::new(),
509                ));
510            }
511        }
512
513        // Signal reader thread to stop
514        should_terminate.store(true, Ordering::SeqCst);
515
516        // Wait for child to fully exit (interruptible + bounded)
517        let status = self
518            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
519            .await?;
520
521        let (success, exit_code, final_termination) = match status {
522            Some(s) => {
523                let code = s.exit_code() as i32;
524                (
525                    s.success(),
526                    Some(code),
527                    resolve_termination_type(code, termination),
528                )
529            }
530            None => {
531                warn!("Timed out waiting for child to exit after termination");
532                (false, None, termination)
533            }
534        };
535
536        // run_observe doesn't parse JSON, so extracted_text is empty
537        Ok(build_result(
538            &output,
539            success,
540            exit_code,
541            final_termination,
542            String::new(),
543        ))
544    }
545
546    /// Runs in observe mode with streaming event handling for JSON output.
547    ///
548    /// When the backend's output format is `StreamJson`, this method parses
549    /// NDJSON lines and dispatches events to the provided handler for real-time
550    /// display. For `Text` format, behaves identically to `run_observe`.
551    ///
552    /// # Arguments
553    /// * `prompt` - The prompt to execute
554    /// * `interrupt_rx` - Watch channel receiver for interrupt signals
555    /// * `handler` - Handler to receive streaming events
556    ///
557    /// # Errors
558    ///
559    /// Returns an error if PTY allocation fails, the command cannot be spawned,
560    /// or an I/O error occurs during output handling.
561    pub async fn run_observe_streaming<H: StreamHandler>(
562        &self,
563        prompt: &str,
564        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
565        handler: &mut H,
566    ) -> io::Result<PtyExecutionResult> {
567        // Check output format to decide parsing strategy
568        let output_format = self.backend.output_format;
569
570        // StreamJson format uses NDJSON line parsing
571        // Text format streams raw output directly to handler
572        let is_stream_json = output_format == OutputFormat::StreamJson;
573
574        // Keep temp_file alive for the duration of execution
575        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
576
577        let reader = pair
578            .master
579            .try_clone_reader()
580            .map_err(|e| io::Error::other(e.to_string()))?;
581
582        // Write stdin input if present (for stdin prompt mode)
583        if let Some(ref input) = stdin_input {
584            tokio::time::sleep(Duration::from_millis(100)).await;
585            let mut writer = pair
586                .master
587                .take_writer()
588                .map_err(|e| io::Error::other(e.to_string()))?;
589            writer.write_all(input.as_bytes())?;
590            writer.write_all(b"\n")?;
591            writer.flush()?;
592        }
593
594        drop(pair.slave);
595
596        let mut output = Vec::new();
597        let mut line_buffer = String::new();
598        // Accumulate extracted text from NDJSON for event parsing
599        let mut extracted_text = String::new();
600        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
601            None
602        } else {
603            Some(Duration::from_secs(u64::from(
604                self.config.idle_timeout_secs,
605            )))
606        };
607
608        let mut termination = TerminationType::Natural;
609        let mut last_activity = Instant::now();
610
611        let should_terminate = Arc::new(AtomicBool::new(false));
612
613        // Spawn blocking reader thread
614        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
615        let should_terminate_reader = Arc::clone(&should_terminate);
616        let tui_connected = self.tui_mode;
617        let tui_output_tx = if tui_connected {
618            Some(self.output_tx.clone())
619        } else {
620            None
621        };
622
623        debug!("Spawning PTY output reader thread (streaming mode)");
624        std::thread::spawn(move || {
625            let mut reader = reader;
626            let mut buf = [0u8; 4096];
627
628            loop {
629                if should_terminate_reader.load(Ordering::SeqCst) {
630                    debug!("PTY reader: termination requested");
631                    break;
632                }
633
634                match reader.read(&mut buf) {
635                    Ok(0) => {
636                        debug!("PTY reader: EOF");
637                        let _ = output_tx.blocking_send(OutputEvent::Eof);
638                        break;
639                    }
640                    Ok(n) => {
641                        let data = buf[..n].to_vec();
642                        if let Some(ref tx) = tui_output_tx {
643                            let _ = tx.send(data.clone());
644                        }
645                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
646                            break;
647                        }
648                    }
649                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
650                        std::thread::sleep(Duration::from_millis(10));
651                    }
652                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
653                    Err(e) => {
654                        debug!(error = %e, "PTY reader error");
655                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
656                        break;
657                    }
658                }
659            }
660        });
661
662        // Main event loop with JSON line parsing
663        loop {
664            let idle_timeout = timeout_duration.map(|d| {
665                let elapsed = last_activity.elapsed();
666                if elapsed >= d {
667                    Duration::from_millis(1)
668                } else {
669                    d.saturating_sub(elapsed)
670                }
671            });
672
673            tokio::select! {
674                _ = interrupt_rx.changed() => {
675                    if *interrupt_rx.borrow() {
676                        debug!("Interrupt received in streaming observe mode, terminating");
677                        termination = TerminationType::UserInterrupt;
678                        should_terminate.store(true, Ordering::SeqCst);
679                        let _ = self.terminate_child(&mut child, true).await;
680                        break;
681                    }
682                }
683
684                event = output_rx.recv() => {
685                    match event {
686                        Some(OutputEvent::Data(data)) => {
687                            output.extend_from_slice(&data);
688                            last_activity = Instant::now();
689
690                            if let Ok(text) = std::str::from_utf8(&data) {
691                                if is_stream_json {
692                                    // StreamJson format: Parse JSON lines from the data
693                                    line_buffer.push_str(text);
694
695                                    // Process complete lines
696                                    while let Some(newline_pos) = line_buffer.find('\n') {
697                                        let line = line_buffer[..newline_pos].to_string();
698                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
699
700                                        if let Some(event) = ClaudeStreamParser::parse_line(&line) {
701                                            dispatch_stream_event(event, handler, &mut extracted_text);
702                                        }
703                                    }
704                                } else {
705                                    // Text format: Stream raw output directly to handler
706                                    // This preserves ANSI escape codes for TUI rendering
707                                    handler.on_text(text);
708                                }
709                            }
710                        }
711                        Some(OutputEvent::Eof) | None => {
712                            debug!("Output channel closed");
713                            // Process any remaining content in buffer (StreamJson only)
714                            if is_stream_json && !line_buffer.is_empty()
715                                && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
716                            {
717                                dispatch_stream_event(event, handler, &mut extracted_text);
718                            }
719                            break;
720                        }
721                        Some(OutputEvent::Error(e)) => {
722                            debug!(error = %e, "Reader thread reported error");
723                            handler.on_error(&e);
724                            break;
725                        }
726                    }
727                }
728
729                _ = async {
730                    if let Some(timeout) = idle_timeout {
731                        tokio::time::sleep(timeout).await;
732                    } else {
733                        std::future::pending::<()>().await;
734                    }
735                } => {
736                    warn!(
737                        timeout_secs = self.config.idle_timeout_secs,
738                        "Idle timeout triggered"
739                    );
740                    termination = TerminationType::IdleTimeout;
741                    should_terminate.store(true, Ordering::SeqCst);
742                    self.terminate_child(&mut child, true).await?;
743                    break;
744                }
745            }
746
747            // Check if child has exited
748            if let Some(status) = child
749                .try_wait()
750                .map_err(|e| io::Error::other(e.to_string()))?
751            {
752                let exit_code = status.exit_code() as i32;
753                debug!(exit_status = ?status, exit_code, "Child process exited");
754
755                // Drain remaining output
756                while let Ok(event) = output_rx.try_recv() {
757                    if let OutputEvent::Data(data) = event {
758                        output.extend_from_slice(&data);
759                        if let Ok(text) = std::str::from_utf8(&data) {
760                            if is_stream_json {
761                                // StreamJson: parse JSON lines
762                                line_buffer.push_str(text);
763                                while let Some(newline_pos) = line_buffer.find('\n') {
764                                    let line = line_buffer[..newline_pos].to_string();
765                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
766                                    if let Some(event) = ClaudeStreamParser::parse_line(&line) {
767                                        dispatch_stream_event(event, handler, &mut extracted_text);
768                                    }
769                                }
770                            } else {
771                                // Text: stream raw output to handler
772                                handler.on_text(text);
773                            }
774                        }
775                    }
776                }
777
778                // Process final buffer content (StreamJson only)
779                if is_stream_json
780                    && !line_buffer.is_empty()
781                    && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
782                {
783                    dispatch_stream_event(event, handler, &mut extracted_text);
784                }
785
786                let final_termination = resolve_termination_type(exit_code, termination);
787                // Pass extracted_text for event parsing from NDJSON
788                return Ok(build_result(
789                    &output,
790                    status.success(),
791                    Some(exit_code),
792                    final_termination,
793                    extracted_text,
794                ));
795            }
796        }
797
798        should_terminate.store(true, Ordering::SeqCst);
799
800        let status = self
801            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
802            .await?;
803
804        let (success, exit_code, final_termination) = match status {
805            Some(s) => {
806                let code = s.exit_code() as i32;
807                (
808                    s.success(),
809                    Some(code),
810                    resolve_termination_type(code, termination),
811                )
812            }
813            None => {
814                warn!("Timed out waiting for child to exit after termination");
815                (false, None, termination)
816            }
817        };
818
819        // Pass extracted_text for event parsing from NDJSON
820        Ok(build_result(
821            &output,
822            success,
823            exit_code,
824            final_termination,
825            extracted_text,
826        ))
827    }
828
829    /// Runs in interactive mode (bidirectional I/O).
830    ///
831    /// Uses `tokio::select!` for non-blocking I/O multiplexing between:
832    /// 1. PTY output (from blocking reader via channel)
833    /// 2. User input (from stdin thread via channel)
834    /// 3. Interrupt signal from event loop
835    /// 4. Idle timeout
836    ///
837    /// This design ensures Ctrl+C is always responsive, even when the PTY
838    /// has no output (e.g., during long-running tool calls).
839    ///
840    /// # Arguments
841    /// * `prompt` - The prompt to execute
842    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
843    ///
844    /// # Errors
845    ///
846    /// Returns an error if PTY allocation fails, the command cannot be spawned,
847    /// or an I/O error occurs during bidirectional communication.
848    #[allow(clippy::too_many_lines)] // Complex state machine requires cohesive implementation
849    pub async fn run_interactive(
850        &mut self,
851        prompt: &str,
852        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
853    ) -> io::Result<PtyExecutionResult> {
854        // Keep temp_file alive for the duration of execution (large prompts use temp files)
855        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
856
857        let reader = pair
858            .master
859            .try_clone_reader()
860            .map_err(|e| io::Error::other(e.to_string()))?;
861        let mut writer = pair
862            .master
863            .take_writer()
864            .map_err(|e| io::Error::other(e.to_string()))?;
865
866        // Keep master for resize operations
867        let master = pair.master;
868
869        // Drop the slave to signal EOF when master closes
870        drop(pair.slave);
871
872        // Store stdin_input for writing after reader thread starts
873        let pending_stdin = stdin_input;
874
875        let mut output = Vec::new();
876        let timeout_duration = if self.config.idle_timeout_secs > 0 {
877            Some(Duration::from_secs(u64::from(
878                self.config.idle_timeout_secs,
879            )))
880        } else {
881            None
882        };
883
884        let mut ctrl_c_state = CtrlCState::new();
885        let mut termination = TerminationType::Natural;
886        let mut last_activity = Instant::now();
887
888        // Flag for termination request (shared with spawned tasks)
889        let should_terminate = Arc::new(AtomicBool::new(false));
890
891        // Spawn output reading task (blocking read wrapped in spawn_blocking via channel)
892        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
893        let should_terminate_output = Arc::clone(&should_terminate);
894        // Check if TUI is handling output (output_rx taken by handle())
895        let tui_connected = self.tui_mode;
896        let tui_output_tx = if tui_connected {
897            Some(self.output_tx.clone())
898        } else {
899            None
900        };
901
902        debug!("Spawning PTY output reader thread");
903        std::thread::spawn(move || {
904            debug!("PTY output reader thread started");
905            let mut reader = reader;
906            let mut buf = [0u8; 4096];
907
908            loop {
909                if should_terminate_output.load(Ordering::SeqCst) {
910                    debug!("PTY output reader: termination requested");
911                    break;
912                }
913
914                match reader.read(&mut buf) {
915                    Ok(0) => {
916                        // EOF - PTY closed
917                        debug!("PTY output reader: EOF received");
918                        let _ = output_tx.blocking_send(OutputEvent::Eof);
919                        break;
920                    }
921                    Ok(n) => {
922                        let data = buf[..n].to_vec();
923                        // Send to TUI channel if connected
924                        if let Some(ref tx) = tui_output_tx {
925                            let _ = tx.send(data.clone());
926                        }
927                        // Send to main loop
928                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
929                            debug!("PTY output reader: channel closed");
930                            break;
931                        }
932                    }
933                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
934                        // Non-blocking mode: no data available, yield briefly
935                        std::thread::sleep(Duration::from_millis(1));
936                    }
937                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {
938                        // Interrupted by signal, retry
939                    }
940                    Err(e) => {
941                        warn!("PTY output reader: error - {}", e);
942                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
943                        break;
944                    }
945                }
946            }
947            debug!("PTY output reader thread exiting");
948        });
949
950        // Spawn input reading task - ONLY when TUI is NOT connected
951        // In TUI mode (observation mode), user input should not be captured from stdin.
952        // The TUI has its own input handling, and raw Ctrl+C should go directly to the
953        // signal handler (interrupt_rx) without racing with the stdin reader.
954        let mut input_rx = if tui_connected {
955            debug!("TUI connected - skipping stdin reader thread");
956            None
957        } else {
958            let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
959            let should_terminate_input = Arc::clone(&should_terminate);
960
961            std::thread::spawn(move || {
962                let mut stdin = io::stdin();
963                let mut buf = [0u8; 1];
964
965                loop {
966                    if should_terminate_input.load(Ordering::SeqCst) {
967                        break;
968                    }
969
970                    match stdin.read(&mut buf) {
971                        Ok(0) => break, // EOF
972                        Ok(1) => {
973                            let byte = buf[0];
974                            let event = match byte {
975                                3 => InputEvent::CtrlC,          // Ctrl+C
976                                28 => InputEvent::CtrlBackslash, // Ctrl+\
977                                _ => InputEvent::Data(vec![byte]),
978                            };
979                            if input_tx.send(event).is_err() {
980                                break;
981                            }
982                        }
983                        Ok(_) => {} // Shouldn't happen with 1-byte buffer
984                        Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
985                        Err(_) => break,
986                    }
987                }
988            });
989            Some(input_rx)
990        };
991
992        // Write stdin input after threads are spawned (so we capture any output)
993        // Give Claude's TUI a moment to initialize before sending the prompt
994        if let Some(ref input) = pending_stdin {
995            tokio::time::sleep(Duration::from_millis(100)).await;
996            writer.write_all(input.as_bytes())?;
997            writer.write_all(b"\n")?;
998            writer.flush()?;
999            last_activity = Instant::now();
1000        }
1001
1002        // Main select loop - this is the key fix for blocking I/O
1003        // We use tokio::select! to multiplex between output, input, and timeout
1004        loop {
1005            // Check if child has exited (non-blocking check before select)
1006            if let Some(status) = child
1007                .try_wait()
1008                .map_err(|e| io::Error::other(e.to_string()))?
1009            {
1010                let exit_code = status.exit_code() as i32;
1011                debug!(exit_status = ?status, exit_code, "Child process exited");
1012
1013                // Drain remaining output from channel
1014                while let Ok(event) = output_rx.try_recv() {
1015                    if let OutputEvent::Data(data) = event {
1016                        if !tui_connected {
1017                            io::stdout().write_all(&data)?;
1018                            io::stdout().flush()?;
1019                        }
1020                        output.extend_from_slice(&data);
1021                    }
1022                }
1023
1024                should_terminate.store(true, Ordering::SeqCst);
1025                // Signal TUI that PTY has terminated
1026                let _ = self.terminated_tx.send(true);
1027
1028                let final_termination = resolve_termination_type(exit_code, termination);
1029                // run_interactive doesn't parse JSON, so extracted_text is empty
1030                return Ok(build_result(
1031                    &output,
1032                    status.success(),
1033                    Some(exit_code),
1034                    final_termination,
1035                    String::new(),
1036                ));
1037            }
1038
1039            // Build the timeout future (or a never-completing one if disabled)
1040            let timeout_future = async {
1041                match timeout_duration {
1042                    Some(d) => {
1043                        let elapsed = last_activity.elapsed();
1044                        if elapsed >= d {
1045                            tokio::time::sleep(Duration::ZERO).await
1046                        } else {
1047                            tokio::time::sleep(d.saturating_sub(elapsed)).await
1048                        }
1049                    }
1050                    None => std::future::pending::<()>().await,
1051                }
1052            };
1053
1054            tokio::select! {
1055                // PTY output received
1056                output_event = output_rx.recv() => {
1057                    match output_event {
1058                        Some(OutputEvent::Data(data)) => {
1059                            // Only write to stdout if TUI is NOT handling output
1060                            if !tui_connected {
1061                                io::stdout().write_all(&data)?;
1062                                io::stdout().flush()?;
1063                            }
1064                            output.extend_from_slice(&data);
1065
1066                            last_activity = Instant::now();
1067                        }
1068                        Some(OutputEvent::Eof) => {
1069                            debug!("PTY EOF received");
1070                            break;
1071                        }
1072                        Some(OutputEvent::Error(e)) => {
1073                            debug!(error = %e, "PTY read error");
1074                            break;
1075                        }
1076                        None => {
1077                            // Channel closed, reader thread exited
1078                            break;
1079                        }
1080                    }
1081                }
1082
1083                // User input received (from stdin) - only active when TUI is NOT connected
1084                input_event = async {
1085                    match input_rx.as_mut() {
1086                        Some(rx) => rx.recv().await,
1087                        None => std::future::pending().await, // Never resolves when TUI is connected
1088                    }
1089                } => {
1090                    match input_event {
1091                        Some(InputEvent::CtrlC) => {
1092                            match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1093                                CtrlCAction::ForwardAndStartWindow => {
1094                                    // Forward Ctrl+C to Claude
1095                                    let _ = writer.write_all(&[3]);
1096                                    let _ = writer.flush();
1097                                    last_activity = Instant::now();
1098                                }
1099                                CtrlCAction::Terminate => {
1100                                    info!("Double Ctrl+C detected, terminating");
1101                                    termination = TerminationType::UserInterrupt;
1102                                    should_terminate.store(true, Ordering::SeqCst);
1103                                    self.terminate_child(&mut child, true).await?;
1104                                    break;
1105                                }
1106                            }
1107                        }
1108                        Some(InputEvent::CtrlBackslash) => {
1109                            info!("Ctrl+\\ detected, force killing");
1110                            termination = TerminationType::ForceKill;
1111                            should_terminate.store(true, Ordering::SeqCst);
1112                            self.terminate_child(&mut child, false).await?;
1113                            break;
1114                        }
1115                        Some(InputEvent::Data(data)) => {
1116                            // Forward to Claude
1117                            let _ = writer.write_all(&data);
1118                            let _ = writer.flush();
1119                            last_activity = Instant::now();
1120                        }
1121                        None => {
1122                            // Input channel closed (stdin EOF)
1123                            debug!("Input channel closed");
1124                        }
1125                    }
1126                }
1127
1128                // TUI input received (convert to InputEvent for unified handling)
1129                tui_input = self.input_rx.recv() => {
1130                    if let Some(data) = tui_input {
1131                        match InputEvent::from_bytes(data) {
1132                            InputEvent::CtrlC => {
1133                                match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1134                                    CtrlCAction::ForwardAndStartWindow => {
1135                                        let _ = writer.write_all(&[3]);
1136                                        let _ = writer.flush();
1137                                        last_activity = Instant::now();
1138                                    }
1139                                    CtrlCAction::Terminate => {
1140                                        info!("Double Ctrl+C detected, terminating");
1141                                        termination = TerminationType::UserInterrupt;
1142                                        should_terminate.store(true, Ordering::SeqCst);
1143                                        self.terminate_child(&mut child, true).await?;
1144                                        break;
1145                                    }
1146                                }
1147                            }
1148                            InputEvent::CtrlBackslash => {
1149                                info!("Ctrl+\\ detected, force killing");
1150                                termination = TerminationType::ForceKill;
1151                                should_terminate.store(true, Ordering::SeqCst);
1152                                self.terminate_child(&mut child, false).await?;
1153                                break;
1154                            }
1155                            InputEvent::Data(bytes) => {
1156                                let _ = writer.write_all(&bytes);
1157                                let _ = writer.flush();
1158                                last_activity = Instant::now();
1159                            }
1160                        }
1161                    }
1162                }
1163
1164                // Control commands from TUI
1165                control_cmd = self.control_rx.recv() => {
1166                    if let Some(cmd) = control_cmd {
1167                        use crate::pty_handle::ControlCommand;
1168                        match cmd {
1169                            ControlCommand::Kill => {
1170                                info!("Control command: Kill");
1171                                termination = TerminationType::UserInterrupt;
1172                                should_terminate.store(true, Ordering::SeqCst);
1173                                self.terminate_child(&mut child, true).await?;
1174                                break;
1175                            }
1176                            ControlCommand::Resize(cols, rows) => {
1177                                debug!(cols, rows, "Control command: Resize");
1178                                // Resize the PTY to match TUI dimensions
1179                                if let Err(e) = master.resize(PtySize {
1180                                    rows,
1181                                    cols,
1182                                    pixel_width: 0,
1183                                    pixel_height: 0,
1184                                }) {
1185                                    warn!("Failed to resize PTY: {}", e);
1186                                }
1187                            }
1188                            ControlCommand::Skip | ControlCommand::Abort => {
1189                                // These are handled at orchestrator level, not here
1190                                debug!("Control command: {:?} (ignored at PTY level)", cmd);
1191                            }
1192                        }
1193                    }
1194                }
1195
1196                // Idle timeout expired
1197                _ = timeout_future => {
1198                    warn!(
1199                        timeout_secs = self.config.idle_timeout_secs,
1200                        "Idle timeout triggered"
1201                    );
1202                    termination = TerminationType::IdleTimeout;
1203                    should_terminate.store(true, Ordering::SeqCst);
1204                    self.terminate_child(&mut child, true).await?;
1205                    break;
1206                }
1207
1208                // Interrupt signal from event loop
1209                _ = interrupt_rx.changed() => {
1210                    if *interrupt_rx.borrow() {
1211                        debug!("Interrupt received in interactive mode, terminating");
1212                        termination = TerminationType::UserInterrupt;
1213                        should_terminate.store(true, Ordering::SeqCst);
1214                        self.terminate_child(&mut child, true).await?;
1215                        break;
1216                    }
1217                }
1218            }
1219        }
1220
1221        // Ensure termination flag is set for spawned threads
1222        should_terminate.store(true, Ordering::SeqCst);
1223
1224        // Signal TUI that PTY has terminated
1225        let _ = self.terminated_tx.send(true);
1226
1227        // Wait for child to fully exit (interruptible + bounded)
1228        let status = self
1229            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1230            .await?;
1231
1232        let (success, exit_code, final_termination) = match status {
1233            Some(s) => {
1234                let code = s.exit_code() as i32;
1235                (
1236                    s.success(),
1237                    Some(code),
1238                    resolve_termination_type(code, termination),
1239                )
1240            }
1241            None => {
1242                warn!("Timed out waiting for child to exit after termination");
1243                (false, None, termination)
1244            }
1245        };
1246
1247        // run_interactive doesn't parse JSON, so extracted_text is empty
1248        Ok(build_result(
1249            &output,
1250            success,
1251            exit_code,
1252            final_termination,
1253            String::new(),
1254        ))
1255    }
1256
1257    /// Terminates the child process.
1258    ///
1259    /// If `graceful` is true, sends SIGTERM and waits up to 5 seconds before SIGKILL.
1260    /// If `graceful` is false, sends SIGKILL immediately.
1261    ///
1262    /// This is an async function to avoid blocking the tokio runtime during the
1263    /// grace period wait. Previously used `std::thread::sleep` which blocked the
1264    /// worker thread for up to 5 seconds, making the TUI appear frozen.
1265    #[allow(clippy::unused_self)] // Self is conceptually the right receiver for this method
1266    #[allow(clippy::unused_async)] // Kept async to preserve signature parity with Unix implementation
1267    #[cfg(not(unix))]
1268    async fn terminate_child(
1269        &self,
1270        child: &mut Box<dyn portable_pty::Child + Send>,
1271        _graceful: bool,
1272    ) -> io::Result<()> {
1273        child.kill()
1274    }
1275
1276    #[cfg(unix)]
1277    async fn terminate_child(
1278        &self,
1279        child: &mut Box<dyn portable_pty::Child + Send>,
1280        graceful: bool,
1281    ) -> io::Result<()> {
1282        let pid = match child.process_id() {
1283            Some(id) => Pid::from_raw(id as i32),
1284            None => return Ok(()), // Already exited
1285        };
1286
1287        if graceful {
1288            debug!(pid = %pid, "Sending SIGTERM");
1289            let _ = kill(pid, Signal::SIGTERM);
1290
1291            // Wait up to 5 seconds for graceful exit (reduced from 5s for better UX)
1292            let grace_period = Duration::from_secs(2);
1293            let start = Instant::now();
1294
1295            while start.elapsed() < grace_period {
1296                if child
1297                    .try_wait()
1298                    .map_err(|e| io::Error::other(e.to_string()))?
1299                    .is_some()
1300                {
1301                    return Ok(());
1302                }
1303                // Use async sleep to avoid blocking the tokio runtime
1304                tokio::time::sleep(Duration::from_millis(50)).await;
1305            }
1306
1307            // Still running after grace period - force kill
1308            debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1309        }
1310
1311        debug!(pid = %pid, "Sending SIGKILL");
1312        let _ = kill(pid, Signal::SIGKILL);
1313        Ok(())
1314    }
1315
1316    /// Waits for the child process to exit, optionally with a timeout.
1317    ///
1318    /// This is interruptible by the shared interrupt channel from the event loop.
1319    /// When interrupted, returns `Ok(None)` to let the caller handle termination.
1320    async fn wait_for_exit(
1321        &self,
1322        child: &mut Box<dyn portable_pty::Child + Send>,
1323        max_wait: Option<Duration>,
1324        interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1325    ) -> io::Result<Option<portable_pty::ExitStatus>> {
1326        let start = Instant::now();
1327
1328        loop {
1329            if let Some(status) = child
1330                .try_wait()
1331                .map_err(|e| io::Error::other(e.to_string()))?
1332            {
1333                return Ok(Some(status));
1334            }
1335
1336            if let Some(max) = max_wait
1337                && start.elapsed() >= max
1338            {
1339                return Ok(None);
1340            }
1341
1342            tokio::select! {
1343                _ = interrupt_rx.changed() => {
1344                    if *interrupt_rx.borrow() {
1345                        debug!("Interrupt received while waiting for child exit");
1346                        return Ok(None);
1347                    }
1348                }
1349                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1350            }
1351        }
1352    }
1353}
1354
1355/// Input events from the user.
1356#[derive(Debug)]
1357enum InputEvent {
1358    /// Ctrl+C pressed.
1359    CtrlC,
1360    /// Ctrl+\ pressed.
1361    CtrlBackslash,
1362    /// Regular data to forward.
1363    Data(Vec<u8>),
1364}
1365
1366impl InputEvent {
1367    /// Creates an InputEvent from raw bytes.
1368    fn from_bytes(data: Vec<u8>) -> Self {
1369        if data.len() == 1 {
1370            match data[0] {
1371                3 => return InputEvent::CtrlC,
1372                28 => return InputEvent::CtrlBackslash,
1373                _ => {}
1374            }
1375        }
1376        InputEvent::Data(data)
1377    }
1378}
1379
1380/// Output events from the PTY.
1381#[derive(Debug)]
1382enum OutputEvent {
1383    /// Data received from PTY.
1384    Data(Vec<u8>),
1385    /// PTY reached EOF (process exited).
1386    Eof,
1387    /// Error reading from PTY.
1388    Error(String),
1389}
1390
1391/// Strips ANSI escape sequences from raw bytes.
1392///
1393/// Uses `strip-ansi-escapes` for direct byte-level ANSI removal without terminal
1394/// emulation. This ensures ALL content is preserved regardless of output size,
1395/// unlike vt100's terminal simulation which can lose content that scrolls off.
1396fn strip_ansi(bytes: &[u8]) -> String {
1397    let stripped = strip_ansi_escapes::strip(bytes);
1398    String::from_utf8_lossy(&stripped).into_owned()
1399}
1400
1401/// Determines the final termination type, accounting for SIGINT exit code.
1402///
1403/// Exit code 130 indicates the process was killed by SIGINT (Ctrl+C forwarded to PTY).
1404fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1405    if exit_code == 130 {
1406        info!("Child process killed by SIGINT");
1407        TerminationType::UserInterrupt
1408    } else {
1409        default
1410    }
1411}
1412
1413/// Dispatches a Claude stream event to the appropriate handler method.
1414/// Also accumulates text content into `extracted_text` for event parsing.
1415fn dispatch_stream_event<H: StreamHandler>(
1416    event: ClaudeStreamEvent,
1417    handler: &mut H,
1418    extracted_text: &mut String,
1419) {
1420    match event {
1421        ClaudeStreamEvent::System { .. } => {
1422            // Session initialization - could log in verbose mode but not user-facing
1423        }
1424        ClaudeStreamEvent::Assistant { message, .. } => {
1425            for block in message.content {
1426                match block {
1427                    ContentBlock::Text { text } => {
1428                        handler.on_text(&text);
1429                        // Accumulate text for event parsing
1430                        extracted_text.push_str(&text);
1431                        extracted_text.push('\n');
1432                    }
1433                    ContentBlock::ToolUse { name, id, input } => {
1434                        handler.on_tool_call(&name, &id, &input)
1435                    }
1436                }
1437            }
1438        }
1439        ClaudeStreamEvent::User { message } => {
1440            for block in message.content {
1441                match block {
1442                    UserContentBlock::ToolResult {
1443                        tool_use_id,
1444                        content,
1445                    } => {
1446                        handler.on_tool_result(&tool_use_id, &content);
1447                    }
1448                }
1449            }
1450        }
1451        ClaudeStreamEvent::Result {
1452            duration_ms,
1453            total_cost_usd,
1454            num_turns,
1455            is_error,
1456        } => {
1457            if is_error {
1458                handler.on_error("Session ended with error");
1459            }
1460            handler.on_complete(&SessionResult {
1461                duration_ms,
1462                total_cost_usd,
1463                num_turns,
1464                is_error,
1465            });
1466        }
1467    }
1468}
1469
1470/// Builds a `PtyExecutionResult` from the accumulated output and exit status.
1471///
1472/// # Arguments
1473/// * `output` - Raw bytes from PTY
1474/// * `success` - Whether process exited successfully
1475/// * `exit_code` - Process exit code if available
1476/// * `termination` - How the process was terminated
1477/// * `extracted_text` - Text extracted from NDJSON stream (for Claude's stream-json)
1478fn build_result(
1479    output: &[u8],
1480    success: bool,
1481    exit_code: Option<i32>,
1482    termination: TerminationType,
1483    extracted_text: String,
1484) -> PtyExecutionResult {
1485    PtyExecutionResult {
1486        output: String::from_utf8_lossy(output).to_string(),
1487        stripped_output: strip_ansi(output),
1488        extracted_text,
1489        success,
1490        exit_code,
1491        termination,
1492    }
1493}
1494
1495#[cfg(test)]
1496mod tests {
1497    use super::*;
1498
1499    #[test]
1500    fn test_double_ctrl_c_within_window() {
1501        let mut state = CtrlCState::new();
1502        let now = Instant::now();
1503
1504        // First Ctrl+C: should forward and start window
1505        let action = state.handle_ctrl_c(now);
1506        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
1507
1508        // Second Ctrl+C within 1 second: should terminate
1509        let later = now + Duration::from_millis(500);
1510        let action = state.handle_ctrl_c(later);
1511        assert_eq!(action, CtrlCAction::Terminate);
1512    }
1513
1514    #[test]
1515    fn test_ctrl_c_window_expires() {
1516        let mut state = CtrlCState::new();
1517        let now = Instant::now();
1518
1519        // First Ctrl+C
1520        state.handle_ctrl_c(now);
1521
1522        // Wait 2 seconds (window expires)
1523        let later = now + Duration::from_secs(2);
1524
1525        // Second Ctrl+C: window expired, should forward and start new window
1526        let action = state.handle_ctrl_c(later);
1527        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
1528    }
1529
1530    #[test]
1531    fn test_strip_ansi_basic() {
1532        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n";
1533        let stripped = strip_ansi(input);
1534        assert!(stripped.contains("Thinking..."));
1535        assert!(!stripped.contains("\x1b["));
1536    }
1537
1538    #[test]
1539    fn test_completion_promise_extraction() {
1540        // Simulate Claude output with heavy ANSI formatting
1541        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n\
1542                      \x1b[2K\x1b[1;32m  Done!\x1b[0m\r\n\
1543                      \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";
1544
1545        let stripped = strip_ansi(input);
1546
1547        // Event parser sees clean text
1548        assert!(stripped.contains("LOOP_COMPLETE"));
1549        assert!(!stripped.contains("\x1b["));
1550    }
1551
1552    #[test]
1553    fn test_event_tag_extraction() {
1554        // Event tags may be wrapped in ANSI codes
1555        let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
1556                      Task completed successfully\r\n\
1557                      \x1b[90m</event>\x1b[0m\r\n";
1558
1559        let stripped = strip_ansi(input);
1560
1561        assert!(stripped.contains("<event topic=\"build.done\">"));
1562        assert!(stripped.contains("</event>"));
1563    }
1564
1565    #[test]
1566    fn test_large_output_preserves_early_events() {
1567        // Regression test: ensure event tags aren't lost when output is large
1568        let mut input = Vec::new();
1569
1570        // Event tag at the beginning
1571        input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");
1572
1573        // Simulate 500 lines of verbose output (would overflow any terminal)
1574        for i in 0..500 {
1575            input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
1576        }
1577
1578        let stripped = strip_ansi(&input);
1579
1580        // Event tag should still be present - no scrollback loss with strip-ansi-escapes
1581        assert!(
1582            stripped.contains("<event topic=\"build.task\">"),
1583            "Event tag was lost - strip_ansi is not preserving all content"
1584        );
1585        assert!(stripped.contains("Implement feature X"));
1586        assert!(stripped.contains("Line 499")); // Last line should be present too
1587    }
1588
1589    #[test]
1590    fn test_pty_config_defaults() {
1591        let config = PtyConfig::default();
1592        assert!(config.interactive);
1593        assert_eq!(config.idle_timeout_secs, 30);
1594        assert_eq!(config.cols, 80);
1595        assert_eq!(config.rows, 24);
1596    }
1597
1598    /// Verifies that the idle timeout logic in run_interactive correctly handles
1599    /// activity resets. Per spec (interactive-mode.spec.md lines 155-159):
1600    /// - Timeout resets on agent output (any bytes from PTY)
1601    /// - Timeout resets on user input (any key forwarded to agent)
1602    ///
1603    /// This test validates the timeout calculation logic that enables resets.
1604    /// The actual reset happens in the select! branches at lines 497, 523, and 545.
1605    #[test]
1606    fn test_idle_timeout_reset_logic() {
1607        // Simulate the timeout calculation used in run_interactive
1608        let timeout_duration = Duration::from_secs(30);
1609
1610        // Simulate 25 seconds of inactivity
1611        let simulated_25s = Duration::from_secs(25);
1612
1613        // Remaining time before timeout
1614        let remaining = timeout_duration.saturating_sub(simulated_25s);
1615        assert_eq!(remaining.as_secs(), 5);
1616
1617        // After activity (output or input), last_activity would be reset to now
1618        let last_activity_after_reset = Instant::now();
1619
1620        // Now elapsed is 0, full timeout duration available again
1621        let elapsed = last_activity_after_reset.elapsed();
1622        assert!(elapsed < Duration::from_millis(100)); // Should be near-zero
1623
1624        // Timeout calculation would give full duration minus small elapsed
1625        let new_remaining = timeout_duration.saturating_sub(elapsed);
1626        assert!(new_remaining > Duration::from_secs(29)); // Should be nearly full timeout
1627    }
1628
1629    #[test]
1630    fn test_extracted_text_field_exists() {
1631        // Test that PtyExecutionResult has extracted_text field
1632        // This is for NDJSON output where event tags are inside JSON strings
1633        let result = PtyExecutionResult {
1634            output: String::new(),
1635            stripped_output: String::new(),
1636            extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
1637            success: true,
1638            exit_code: Some(0),
1639            termination: TerminationType::Natural,
1640        };
1641
1642        assert!(
1643            result
1644                .extracted_text
1645                .contains("<event topic=\"build.done\">")
1646        );
1647    }
1648
1649    #[test]
1650    fn test_build_result_includes_extracted_text() {
1651        // Test that build_result properly handles extracted_text
1652        let output = b"raw output";
1653        let extracted = "extracted text with <event topic=\"test\">payload</event>";
1654        let result = build_result(
1655            output,
1656            true,
1657            Some(0),
1658            TerminationType::Natural,
1659            extracted.to_string(),
1660        );
1661
1662        assert_eq!(result.extracted_text, extracted);
1663        assert!(result.stripped_output.contains("raw output"));
1664    }
1665
1666    /// Regression test: TUI mode should not spawn stdin reader thread
1667    ///
1668    /// Bug: In TUI mode, Ctrl+C required double-press to exit because the stdin
1669    /// reader thread (which captures byte 0x03) raced with the signal handler.
1670    /// The stdin reader would win, triggering "double Ctrl+C" logic instead of
1671    /// clean exit via interrupt_rx.
1672    ///
1673    /// Fix: When tui_connected=true, skip spawning stdin reader entirely.
1674    /// TUI mode is observation-only; user input should not be captured from stdin.
1675    /// The TUI has its own input handling (Ctrl+a q), and raw Ctrl+C goes directly
1676    /// to the signal handler (interrupt_rx) without racing.
1677    ///
1678    /// This test documents the expected behavior. The actual fix is in
1679    /// run_interactive() where `let mut input_rx = if !tui_connected { ... }`.
1680    #[test]
1681    fn test_tui_mode_stdin_reader_bypass() {
1682        // The tui_connected flag is now determined by the explicit tui_mode field,
1683        // set via set_tui_mode(true) when TUI is connected.
1684        // Previously used output_rx.is_none() which broke after streaming refactor.
1685
1686        // Simulate TUI connected scenario (tui_mode = true)
1687        let tui_mode = true;
1688        let tui_connected = tui_mode;
1689
1690        // When TUI is connected, stdin reader is skipped
1691        // (verified by: input_rx becomes None instead of Some(channel))
1692        assert!(
1693            tui_connected,
1694            "When tui_mode is true, stdin reader must be skipped"
1695        );
1696
1697        // In non-TUI mode, stdin reader is spawned
1698        let tui_mode_disabled = false;
1699        let tui_connected_non_tui = tui_mode_disabled;
1700        assert!(
1701            !tui_connected_non_tui,
1702            "When tui_mode is false, stdin reader must be spawned"
1703        );
1704    }
1705
1706    #[test]
1707    fn test_tui_mode_default_is_false() {
1708        // Create a PtyExecutor and verify tui_mode defaults to false
1709        let backend = CliBackend::claude();
1710        let config = PtyConfig::default();
1711        let executor = PtyExecutor::new(backend, config);
1712
1713        // tui_mode should default to false
1714        assert!(!executor.tui_mode, "tui_mode should default to false");
1715    }
1716
1717    #[test]
1718    fn test_set_tui_mode() {
1719        // Create a PtyExecutor and verify set_tui_mode works
1720        let backend = CliBackend::claude();
1721        let config = PtyConfig::default();
1722        let mut executor = PtyExecutor::new(backend, config);
1723
1724        // Initially false
1725        assert!(!executor.tui_mode, "tui_mode should start as false");
1726
1727        // Set to true
1728        executor.set_tui_mode(true);
1729        assert!(
1730            executor.tui_mode,
1731            "tui_mode should be true after set_tui_mode(true)"
1732        );
1733
1734        // Set back to false
1735        executor.set_tui_mode(false);
1736        assert!(
1737            !executor.tui_mode,
1738            "tui_mode should be false after set_tui_mode(false)"
1739        );
1740    }
1741}