Skip to main content

ralph_adapters/
pty_executor.rs

1//! PTY executor for running prompts with full terminal emulation.
2//!
3//! Spawns CLI tools in a pseudo-terminal to preserve rich TUI features like
4//! colors, spinners, and animations. Supports both interactive mode (user
5//! input forwarded) and observe mode (output-only).
6//!
7//! Key features:
8//! - PTY creation via `portable-pty` for cross-platform support
9//! - Idle timeout with activity tracking (output AND input reset timer)
10//! - Double Ctrl+C handling (first forwards, second terminates)
11//! - Raw mode management with cleanup on exit/crash
12//!
13//! Architecture:
14//! - Uses `tokio::select!` for non-blocking I/O multiplexing
15//! - Spawns separate tasks for PTY output and user input
16//! - Enables responsive Ctrl+C handling even when PTY is idle
17
18// Exit codes and PIDs are always within i32 range in practice
19#![allow(clippy::cast_possible_wrap)]
20
21use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
22use crate::cli_backend::{CliBackend, OutputFormat};
23use crate::stream_handler::{SessionResult, StreamHandler};
24#[cfg(unix)]
25use nix::sys::signal::{Signal, kill};
26#[cfg(unix)]
27use nix::unistd::Pid;
28use portable_pty::{CommandBuilder, PtyPair, PtySize, native_pty_system};
29use std::io::{self, Read, Write};
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use std::time::{Duration, Instant};
33use tokio::sync::{mpsc, watch};
34use tracing::{debug, info, warn};
35
36/// Result of a PTY execution.
37#[derive(Debug)]
38pub struct PtyExecutionResult {
39    /// The accumulated output (ANSI sequences preserved).
40    pub output: String,
41    /// The ANSI-stripped output for event parsing.
42    pub stripped_output: String,
43    /// Extracted text content from NDJSON stream (for Claude's stream-json output).
44    /// When Claude outputs `--output-format stream-json`, event tags like
45    /// `<event topic="...">` are inside JSON string values. This field contains
46    /// the extracted text content for proper event parsing.
47    /// Empty for non-JSON backends (use `stripped_output` instead).
48    pub extracted_text: String,
49    /// Whether the process exited successfully.
50    pub success: bool,
51    /// The exit code if available.
52    pub exit_code: Option<i32>,
53    /// How the process was terminated.
54    pub termination: TerminationType,
55}
56
57/// How the PTY process was terminated.
58#[derive(Debug, Clone, PartialEq, Eq)]
59pub enum TerminationType {
60    /// Process exited naturally.
61    Natural,
62    /// Terminated due to idle timeout.
63    IdleTimeout,
64    /// Terminated by user (double Ctrl+C).
65    UserInterrupt,
66    /// Force killed by user (Ctrl+\).
67    ForceKill,
68}
69
70/// Configuration for PTY execution.
71#[derive(Debug, Clone)]
72pub struct PtyConfig {
73    /// Enable interactive mode (forward user input).
74    pub interactive: bool,
75    /// Idle timeout in seconds (0 = disabled).
76    pub idle_timeout_secs: u32,
77    /// Terminal width.
78    pub cols: u16,
79    /// Terminal height.
80    pub rows: u16,
81    /// Workspace root directory for command execution.
82    /// This is captured at startup to avoid `current_dir()` failures when the
83    /// working directory no longer exists (e.g., in E2E test workspaces).
84    pub workspace_root: std::path::PathBuf,
85}
86
87impl Default for PtyConfig {
88    fn default() -> Self {
89        Self {
90            interactive: true,
91            idle_timeout_secs: 30,
92            cols: 80,
93            rows: 24,
94            workspace_root: std::env::current_dir()
95                .unwrap_or_else(|_| std::path::PathBuf::from(".")),
96        }
97    }
98}
99
100impl PtyConfig {
101    /// Creates config from environment, falling back to defaults.
102    pub fn from_env() -> Self {
103        let cols = std::env::var("COLUMNS")
104            .ok()
105            .and_then(|s| s.parse().ok())
106            .unwrap_or(80);
107        let rows = std::env::var("LINES")
108            .ok()
109            .and_then(|s| s.parse().ok())
110            .unwrap_or(24);
111
112        Self {
113            cols,
114            rows,
115            ..Default::default()
116        }
117    }
118
119    /// Sets the workspace root directory.
120    pub fn with_workspace_root(mut self, root: impl Into<std::path::PathBuf>) -> Self {
121        self.workspace_root = root.into();
122        self
123    }
124}
125
126/// State machine for double Ctrl+C detection.
127#[derive(Debug)]
128pub struct CtrlCState {
129    /// When the first Ctrl+C was pressed (if any).
130    first_press: Option<Instant>,
131    /// Window duration for double-press detection.
132    window: Duration,
133}
134
135/// Action to take after handling Ctrl+C.
136#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum CtrlCAction {
138    /// Forward the Ctrl+C to Claude and start/restart the window.
139    ForwardAndStartWindow,
140    /// Terminate Claude (second Ctrl+C within window).
141    Terminate,
142}
143
144impl CtrlCState {
145    /// Creates a new Ctrl+C state tracker.
146    pub fn new() -> Self {
147        Self {
148            first_press: None,
149            window: Duration::from_secs(1),
150        }
151    }
152
153    /// Handles a Ctrl+C keypress and returns the action to take.
154    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
155        match self.first_press {
156            Some(first) if now.duration_since(first) < self.window => {
157                // Second Ctrl+C within window - terminate
158                self.first_press = None;
159                CtrlCAction::Terminate
160            }
161            _ => {
162                // First Ctrl+C or window expired - forward and start window
163                self.first_press = Some(now);
164                CtrlCAction::ForwardAndStartWindow
165            }
166        }
167    }
168}
169
170impl Default for CtrlCState {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176/// Executor for running prompts in a pseudo-terminal.
177pub struct PtyExecutor {
178    backend: CliBackend,
179    config: PtyConfig,
180    // Channel ends for TUI integration
181    output_tx: mpsc::UnboundedSender<Vec<u8>>,
182    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
183    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
184    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
185    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
186    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
187    // Termination notification for TUI
188    terminated_tx: watch::Sender<bool>,
189    terminated_rx: Option<watch::Receiver<bool>>,
190    // Explicit TUI mode flag - set via set_tui_mode() when TUI is connected.
191    // This replaces the previous inference via output_rx.is_none() which broke
192    // after the streaming refactor (handle() is no longer called in TUI mode).
193    tui_mode: bool,
194}
195
196impl PtyExecutor {
197    /// Creates a new PTY executor with the given backend and configuration.
198    pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
199        let (output_tx, output_rx) = mpsc::unbounded_channel();
200        let (input_tx, input_rx) = mpsc::unbounded_channel();
201        let (control_tx, control_rx) = mpsc::unbounded_channel();
202        let (terminated_tx, terminated_rx) = watch::channel(false);
203
204        Self {
205            backend,
206            config,
207            output_tx,
208            output_rx: Some(output_rx),
209            input_tx: Some(input_tx),
210            input_rx,
211            control_tx: Some(control_tx),
212            control_rx,
213            terminated_tx,
214            terminated_rx: Some(terminated_rx),
215            tui_mode: false,
216        }
217    }
218
219    /// Sets the TUI mode flag.
220    ///
221    /// When TUI mode is enabled, PTY output is sent to the TUI channel instead of
222    /// being written directly to stdout. This flag must be set before calling any
223    /// of the run methods when using the TUI.
224    ///
225    /// # Arguments
226    /// * `enabled` - Whether TUI mode should be active
227    pub fn set_tui_mode(&mut self, enabled: bool) {
228        self.tui_mode = enabled;
229    }
230
231    /// Updates the backend configuration for this executor.
232    ///
233    /// This allows switching backends between iterations without recreating
234    /// the entire executor. Critical for hat-level backend configuration support.
235    ///
236    /// # Arguments
237    /// * `backend` - The new backend configuration to use
238    pub fn set_backend(&mut self, backend: CliBackend) {
239        self.backend = backend;
240    }
241
242    /// Returns a handle for TUI integration.
243    ///
244    /// Can only be called once - panics if called multiple times.
245    pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
246        crate::pty_handle::PtyHandle {
247            output_rx: self.output_rx.take().expect("handle() already called"),
248            input_tx: self.input_tx.take().expect("handle() already called"),
249            control_tx: self.control_tx.take().expect("handle() already called"),
250            terminated_rx: self.terminated_rx.take().expect("handle() already called"),
251        }
252    }
253
254    /// Spawns Claude in a PTY and returns the PTY pair, child process, stdin input, and temp file.
255    ///
256    /// The temp file is returned to keep it alive for the duration of execution.
257    /// For large prompts (>7000 chars), Claude is instructed to read from a temp file.
258    /// If the temp file is dropped before Claude reads it, the file is deleted and Claude hangs.
259    ///
260    /// The stdin_input is returned so callers can write it to the PTY after taking the writer.
261    /// This is necessary because `take_writer()` can only be called once per PTY.
262    fn spawn_pty(
263        &self,
264        prompt: &str,
265    ) -> io::Result<(
266        PtyPair,
267        Box<dyn portable_pty::Child + Send>,
268        Option<String>,
269        Option<tempfile::NamedTempFile>,
270    )> {
271        let pty_system = native_pty_system();
272
273        let pair = pty_system
274            .openpty(PtySize {
275                rows: self.config.rows,
276                cols: self.config.cols,
277                pixel_width: 0,
278                pixel_height: 0,
279            })
280            .map_err(|e| io::Error::other(e.to_string()))?;
281
282        let (cmd, args, stdin_input, temp_file) =
283            self.backend.build_command(prompt, self.config.interactive);
284
285        let mut cmd_builder = CommandBuilder::new(&cmd);
286        cmd_builder.args(&args);
287
288        // Set explicit working directory from config (captured at startup to avoid
289        // current_dir() failures when workspace no longer exists)
290        cmd_builder.cwd(&self.config.workspace_root);
291
292        // Set up environment for PTY
293        cmd_builder.env("TERM", "xterm-256color");
294        let child = pair
295            .slave
296            .spawn_command(cmd_builder)
297            .map_err(|e| io::Error::other(e.to_string()))?;
298
299        // Return stdin_input so callers can write it after taking the writer
300        Ok((pair, child, stdin_input, temp_file))
301    }
302
303    /// Runs in observe mode (output-only, no input forwarding).
304    ///
305    /// This is an async function that listens for interrupt signals via the shared
306    /// `interrupt_rx` watch channel from the event loop.
307    /// Uses a separate thread for blocking PTY reads and tokio::select! for signal handling.
308    ///
309    /// Returns when the process exits, idle timeout triggers, or interrupt is received.
310    ///
311    /// # Arguments
312    /// * `prompt` - The prompt to execute
313    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if PTY allocation fails, the command cannot be spawned,
318    /// or an I/O error occurs during output handling.
319    pub async fn run_observe(
320        &self,
321        prompt: &str,
322        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
323    ) -> io::Result<PtyExecutionResult> {
324        // Keep temp_file alive for the duration of execution (large prompts use temp files)
325        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
326
327        let reader = pair
328            .master
329            .try_clone_reader()
330            .map_err(|e| io::Error::other(e.to_string()))?;
331
332        // Write stdin input if present (for stdin prompt mode)
333        if let Some(ref input) = stdin_input {
334            // Small delay to let process initialize
335            tokio::time::sleep(Duration::from_millis(100)).await;
336            let mut writer = pair
337                .master
338                .take_writer()
339                .map_err(|e| io::Error::other(e.to_string()))?;
340            writer.write_all(input.as_bytes())?;
341            writer.write_all(b"\n")?;
342            writer.flush()?;
343        }
344
345        // Drop the slave to signal EOF when master closes
346        drop(pair.slave);
347
348        let mut output = Vec::new();
349        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
350            None
351        } else {
352            Some(Duration::from_secs(u64::from(
353                self.config.idle_timeout_secs,
354            )))
355        };
356
357        let mut termination = TerminationType::Natural;
358        let mut last_activity = Instant::now();
359
360        // Flag for termination request (shared with reader thread)
361        let should_terminate = Arc::new(AtomicBool::new(false));
362
363        // Spawn blocking reader thread that sends output via channel
364        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
365        let should_terminate_reader = Arc::clone(&should_terminate);
366        // Check if TUI is handling output (output_rx taken by handle())
367        let tui_connected = self.tui_mode;
368        let tui_output_tx = if tui_connected {
369            Some(self.output_tx.clone())
370        } else {
371            None
372        };
373
374        debug!("Spawning PTY output reader thread (observe mode)");
375        std::thread::spawn(move || {
376            let mut reader = reader;
377            let mut buf = [0u8; 4096];
378
379            loop {
380                if should_terminate_reader.load(Ordering::SeqCst) {
381                    debug!("PTY reader: termination requested");
382                    break;
383                }
384
385                match reader.read(&mut buf) {
386                    Ok(0) => {
387                        debug!("PTY reader: EOF");
388                        let _ = output_tx.blocking_send(OutputEvent::Eof);
389                        break;
390                    }
391                    Ok(n) => {
392                        let data = buf[..n].to_vec();
393                        // Send to TUI channel if connected
394                        if let Some(ref tx) = tui_output_tx {
395                            let _ = tx.send(data.clone());
396                        }
397                        // Send to main loop
398                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
399                            break;
400                        }
401                    }
402                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
403                        std::thread::sleep(Duration::from_millis(10));
404                    }
405                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
406                    Err(e) => {
407                        debug!(error = %e, "PTY reader error");
408                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
409                        break;
410                    }
411                }
412            }
413        });
414
415        // Main event loop using tokio::select! for interruptibility
416        loop {
417            // Calculate timeout for idle check
418            let idle_timeout = timeout_duration.map(|d| {
419                let elapsed = last_activity.elapsed();
420                if elapsed >= d {
421                    Duration::from_millis(1) // Trigger immediately
422                } else {
423                    d.saturating_sub(elapsed)
424                }
425            });
426
427            tokio::select! {
428                // Check for interrupt signal from event loop
429                _ = interrupt_rx.changed() => {
430                    if *interrupt_rx.borrow() {
431                        debug!("Interrupt received in observe mode, terminating");
432                        termination = TerminationType::UserInterrupt;
433                        should_terminate.store(true, Ordering::SeqCst);
434                        let _ = self.terminate_child(&mut child, true).await;
435                        break;
436                    }
437                }
438
439                // Check for output from reader thread
440                event = output_rx.recv() => {
441                    match event {
442                        Some(OutputEvent::Data(data)) => {
443                            // Only write to stdout if TUI is NOT handling output
444                            if !tui_connected {
445                                io::stdout().write_all(&data)?;
446                                io::stdout().flush()?;
447                            }
448                            output.extend_from_slice(&data);
449                            last_activity = Instant::now();
450                        }
451                        Some(OutputEvent::Eof) | None => {
452                            debug!("Output channel closed, process likely exited");
453                            break;
454                        }
455                        Some(OutputEvent::Error(e)) => {
456                            debug!(error = %e, "Reader thread reported error");
457                            break;
458                        }
459                    }
460                }
461
462                // Check for idle timeout
463                _ = async {
464                    if let Some(timeout) = idle_timeout {
465                        tokio::time::sleep(timeout).await;
466                    } else {
467                        // No timeout configured, wait forever
468                        std::future::pending::<()>().await;
469                    }
470                } => {
471                    warn!(
472                        timeout_secs = self.config.idle_timeout_secs,
473                        "Idle timeout triggered"
474                    );
475                    termination = TerminationType::IdleTimeout;
476                    should_terminate.store(true, Ordering::SeqCst);
477                    self.terminate_child(&mut child, true).await?;
478                    break;
479                }
480            }
481
482            // Check if child has exited
483            if let Some(status) = child
484                .try_wait()
485                .map_err(|e| io::Error::other(e.to_string()))?
486            {
487                let exit_code = status.exit_code() as i32;
488                debug!(exit_status = ?status, exit_code, "Child process exited");
489
490                // Drain any remaining output from channel
491                while let Ok(event) = output_rx.try_recv() {
492                    if let OutputEvent::Data(data) = event {
493                        if !tui_connected {
494                            io::stdout().write_all(&data)?;
495                            io::stdout().flush()?;
496                        }
497                        output.extend_from_slice(&data);
498                    }
499                }
500
501                let final_termination = resolve_termination_type(exit_code, termination);
502                // run_observe doesn't parse JSON, so extracted_text is empty
503                return Ok(build_result(
504                    &output,
505                    status.success(),
506                    Some(exit_code),
507                    final_termination,
508                    String::new(),
509                ));
510            }
511        }
512
513        // Signal reader thread to stop
514        should_terminate.store(true, Ordering::SeqCst);
515
516        // Wait for child to fully exit (interruptible + bounded)
517        let status = self
518            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
519            .await?;
520
521        let (success, exit_code, final_termination) = match status {
522            Some(s) => {
523                let code = s.exit_code() as i32;
524                (
525                    s.success(),
526                    Some(code),
527                    resolve_termination_type(code, termination),
528                )
529            }
530            None => {
531                warn!("Timed out waiting for child to exit after termination");
532                (false, None, termination)
533            }
534        };
535
536        // run_observe doesn't parse JSON, so extracted_text is empty
537        Ok(build_result(
538            &output,
539            success,
540            exit_code,
541            final_termination,
542            String::new(),
543        ))
544    }
545
546    /// Runs in observe mode with streaming event handling for JSON output.
547    ///
548    /// When the backend's output format is `StreamJson`, this method parses
549    /// NDJSON lines and dispatches events to the provided handler for real-time
550    /// display. For `Text` format, behaves identically to `run_observe`.
551    ///
552    /// # Arguments
553    /// * `prompt` - The prompt to execute
554    /// * `interrupt_rx` - Watch channel receiver for interrupt signals
555    /// * `handler` - Handler to receive streaming events
556    ///
557    /// # Errors
558    ///
559    /// Returns an error if PTY allocation fails, the command cannot be spawned,
560    /// or an I/O error occurs during output handling.
561    pub async fn run_observe_streaming<H: StreamHandler>(
562        &self,
563        prompt: &str,
564        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
565        handler: &mut H,
566    ) -> io::Result<PtyExecutionResult> {
567        // Check output format to decide parsing strategy
568        let output_format = self.backend.output_format;
569
570        // StreamJson format uses NDJSON line parsing
571        // Text format streams raw output directly to handler
572        let is_stream_json = output_format == OutputFormat::StreamJson;
573
574        // Keep temp_file alive for the duration of execution
575        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
576
577        let reader = pair
578            .master
579            .try_clone_reader()
580            .map_err(|e| io::Error::other(e.to_string()))?;
581
582        // Write stdin input if present (for stdin prompt mode)
583        if let Some(ref input) = stdin_input {
584            tokio::time::sleep(Duration::from_millis(100)).await;
585            let mut writer = pair
586                .master
587                .take_writer()
588                .map_err(|e| io::Error::other(e.to_string()))?;
589            writer.write_all(input.as_bytes())?;
590            writer.write_all(b"\n")?;
591            writer.flush()?;
592        }
593
594        drop(pair.slave);
595
596        let mut output = Vec::new();
597        let mut line_buffer = String::new();
598        // Accumulate extracted text from NDJSON for event parsing
599        let mut extracted_text = String::new();
600        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
601            None
602        } else {
603            Some(Duration::from_secs(u64::from(
604                self.config.idle_timeout_secs,
605            )))
606        };
607
608        let mut termination = TerminationType::Natural;
609        let mut last_activity = Instant::now();
610
611        let should_terminate = Arc::new(AtomicBool::new(false));
612
613        // Spawn blocking reader thread
614        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
615        let should_terminate_reader = Arc::clone(&should_terminate);
616        let tui_connected = self.tui_mode;
617        let tui_output_tx = if tui_connected {
618            Some(self.output_tx.clone())
619        } else {
620            None
621        };
622
623        debug!("Spawning PTY output reader thread (streaming mode)");
624        std::thread::spawn(move || {
625            let mut reader = reader;
626            let mut buf = [0u8; 4096];
627
628            loop {
629                if should_terminate_reader.load(Ordering::SeqCst) {
630                    debug!("PTY reader: termination requested");
631                    break;
632                }
633
634                match reader.read(&mut buf) {
635                    Ok(0) => {
636                        debug!("PTY reader: EOF");
637                        let _ = output_tx.blocking_send(OutputEvent::Eof);
638                        break;
639                    }
640                    Ok(n) => {
641                        let data = buf[..n].to_vec();
642                        if let Some(ref tx) = tui_output_tx {
643                            let _ = tx.send(data.clone());
644                        }
645                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
646                            break;
647                        }
648                    }
649                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
650                        std::thread::sleep(Duration::from_millis(10));
651                    }
652                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
653                    Err(e) => {
654                        debug!(error = %e, "PTY reader error");
655                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
656                        break;
657                    }
658                }
659            }
660        });
661
662        // Main event loop with JSON line parsing
663        loop {
664            let idle_timeout = timeout_duration.map(|d| {
665                let elapsed = last_activity.elapsed();
666                if elapsed >= d {
667                    Duration::from_millis(1)
668                } else {
669                    d.saturating_sub(elapsed)
670                }
671            });
672
673            tokio::select! {
674                _ = interrupt_rx.changed() => {
675                    if *interrupt_rx.borrow() {
676                        debug!("Interrupt received in streaming observe mode, terminating");
677                        termination = TerminationType::UserInterrupt;
678                        should_terminate.store(true, Ordering::SeqCst);
679                        let _ = self.terminate_child(&mut child, true).await;
680                        break;
681                    }
682                }
683
684                event = output_rx.recv() => {
685                    match event {
686                        Some(OutputEvent::Data(data)) => {
687                            output.extend_from_slice(&data);
688                            last_activity = Instant::now();
689
690                            if let Ok(text) = std::str::from_utf8(&data) {
691                                if is_stream_json {
692                                    // StreamJson format: Parse JSON lines from the data
693                                    line_buffer.push_str(text);
694
695                                    // Process complete lines
696                                    while let Some(newline_pos) = line_buffer.find('\n') {
697                                        let line = line_buffer[..newline_pos].to_string();
698                                        line_buffer = line_buffer[newline_pos + 1..].to_string();
699
700                                        if let Some(event) = ClaudeStreamParser::parse_line(&line) {
701                                            dispatch_stream_event(event, handler, &mut extracted_text);
702                                        }
703                                    }
704                                } else {
705                                    // Text format: Stream raw output directly to handler
706                                    // This preserves ANSI escape codes for TUI rendering
707                                    handler.on_text(text);
708                                }
709                            }
710                        }
711                        Some(OutputEvent::Eof) | None => {
712                            debug!("Output channel closed");
713                            // Process any remaining content in buffer (StreamJson only)
714                            if is_stream_json && !line_buffer.is_empty()
715                                && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
716                            {
717                                dispatch_stream_event(event, handler, &mut extracted_text);
718                            }
719                            break;
720                        }
721                        Some(OutputEvent::Error(e)) => {
722                            debug!(error = %e, "Reader thread reported error");
723                            handler.on_error(&e);
724                            break;
725                        }
726                    }
727                }
728
729                _ = async {
730                    if let Some(timeout) = idle_timeout {
731                        tokio::time::sleep(timeout).await;
732                    } else {
733                        std::future::pending::<()>().await;
734                    }
735                } => {
736                    warn!(
737                        timeout_secs = self.config.idle_timeout_secs,
738                        "Idle timeout triggered"
739                    );
740                    termination = TerminationType::IdleTimeout;
741                    should_terminate.store(true, Ordering::SeqCst);
742                    self.terminate_child(&mut child, true).await?;
743                    break;
744                }
745            }
746
747            // Check if child has exited
748            if let Some(status) = child
749                .try_wait()
750                .map_err(|e| io::Error::other(e.to_string()))?
751            {
752                let exit_code = status.exit_code() as i32;
753                debug!(exit_status = ?status, exit_code, "Child process exited");
754
755                // Drain remaining output
756                while let Ok(event) = output_rx.try_recv() {
757                    if let OutputEvent::Data(data) = event {
758                        output.extend_from_slice(&data);
759                        if let Ok(text) = std::str::from_utf8(&data) {
760                            if is_stream_json {
761                                // StreamJson: parse JSON lines
762                                line_buffer.push_str(text);
763                                while let Some(newline_pos) = line_buffer.find('\n') {
764                                    let line = line_buffer[..newline_pos].to_string();
765                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
766                                    if let Some(event) = ClaudeStreamParser::parse_line(&line) {
767                                        dispatch_stream_event(event, handler, &mut extracted_text);
768                                    }
769                                }
770                            } else {
771                                // Text: stream raw output to handler
772                                handler.on_text(text);
773                            }
774                        }
775                    }
776                }
777
778                // Process final buffer content (StreamJson only)
779                if is_stream_json
780                    && !line_buffer.is_empty()
781                    && let Some(event) = ClaudeStreamParser::parse_line(&line_buffer)
782                {
783                    dispatch_stream_event(event, handler, &mut extracted_text);
784                }
785
786                let final_termination = resolve_termination_type(exit_code, termination);
787                // Pass extracted_text for event parsing from NDJSON
788                return Ok(build_result(
789                    &output,
790                    status.success(),
791                    Some(exit_code),
792                    final_termination,
793                    extracted_text,
794                ));
795            }
796        }
797
798        should_terminate.store(true, Ordering::SeqCst);
799
800        let status = self
801            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
802            .await?;
803
804        let (success, exit_code, final_termination) = match status {
805            Some(s) => {
806                let code = s.exit_code() as i32;
807                (
808                    s.success(),
809                    Some(code),
810                    resolve_termination_type(code, termination),
811                )
812            }
813            None => {
814                warn!("Timed out waiting for child to exit after termination");
815                (false, None, termination)
816            }
817        };
818
819        // Pass extracted_text for event parsing from NDJSON
820        Ok(build_result(
821            &output,
822            success,
823            exit_code,
824            final_termination,
825            extracted_text,
826        ))
827    }
828
829    /// Runs in interactive mode (bidirectional I/O).
830    ///
831    /// Uses `tokio::select!` for non-blocking I/O multiplexing between:
832    /// 1. PTY output (from blocking reader via channel)
833    /// 2. User input (from stdin thread via channel)
834    /// 3. Interrupt signal from event loop
835    /// 4. Idle timeout
836    ///
837    /// This design ensures Ctrl+C is always responsive, even when the PTY
838    /// has no output (e.g., during long-running tool calls).
839    ///
840    /// # Arguments
841    /// * `prompt` - The prompt to execute
842    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
843    ///
844    /// # Errors
845    ///
846    /// Returns an error if PTY allocation fails, the command cannot be spawned,
847    /// or an I/O error occurs during bidirectional communication.
848    #[allow(clippy::too_many_lines)] // Complex state machine requires cohesive implementation
849    pub async fn run_interactive(
850        &mut self,
851        prompt: &str,
852        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
853    ) -> io::Result<PtyExecutionResult> {
854        // Keep temp_file alive for the duration of execution (large prompts use temp files)
855        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
856
857        let reader = pair
858            .master
859            .try_clone_reader()
860            .map_err(|e| io::Error::other(e.to_string()))?;
861        let mut writer = pair
862            .master
863            .take_writer()
864            .map_err(|e| io::Error::other(e.to_string()))?;
865
866        // Keep master for resize operations
867        let master = pair.master;
868
869        // Drop the slave to signal EOF when master closes
870        drop(pair.slave);
871
872        // Store stdin_input for writing after reader thread starts
873        let pending_stdin = stdin_input;
874
875        let mut output = Vec::new();
876        let timeout_duration = if self.config.idle_timeout_secs > 0 {
877            Some(Duration::from_secs(u64::from(
878                self.config.idle_timeout_secs,
879            )))
880        } else {
881            None
882        };
883
884        let mut ctrl_c_state = CtrlCState::new();
885        let mut termination = TerminationType::Natural;
886        let mut last_activity = Instant::now();
887
888        // Flag for termination request (shared with spawned tasks)
889        let should_terminate = Arc::new(AtomicBool::new(false));
890
891        // Spawn output reading task (blocking read wrapped in spawn_blocking via channel)
892        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
893        let should_terminate_output = Arc::clone(&should_terminate);
894        // Check if TUI is handling output (output_rx taken by handle())
895        let tui_connected = self.tui_mode;
896        let tui_output_tx = if tui_connected {
897            Some(self.output_tx.clone())
898        } else {
899            None
900        };
901
902        debug!("Spawning PTY output reader thread");
903        std::thread::spawn(move || {
904            debug!("PTY output reader thread started");
905            let mut reader = reader;
906            let mut buf = [0u8; 4096];
907
908            loop {
909                if should_terminate_output.load(Ordering::SeqCst) {
910                    debug!("PTY output reader: termination requested");
911                    break;
912                }
913
914                match reader.read(&mut buf) {
915                    Ok(0) => {
916                        // EOF - PTY closed
917                        debug!("PTY output reader: EOF received");
918                        let _ = output_tx.blocking_send(OutputEvent::Eof);
919                        break;
920                    }
921                    Ok(n) => {
922                        let data = buf[..n].to_vec();
923                        // Send to TUI channel if connected
924                        if let Some(ref tx) = tui_output_tx {
925                            let _ = tx.send(data.clone());
926                        }
927                        // Send to main loop
928                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
929                            debug!("PTY output reader: channel closed");
930                            break;
931                        }
932                    }
933                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
934                        // Non-blocking mode: no data available, yield briefly
935                        std::thread::sleep(Duration::from_millis(1));
936                    }
937                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {
938                        // Interrupted by signal, retry
939                    }
940                    Err(e) => {
941                        warn!("PTY output reader: error - {}", e);
942                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
943                        break;
944                    }
945                }
946            }
947            debug!("PTY output reader thread exiting");
948        });
949
950        // Spawn input reading task - ONLY when TUI is NOT connected
951        // In TUI mode (observation mode), user input should not be captured from stdin.
952        // The TUI has its own input handling, and raw Ctrl+C should go directly to the
953        // signal handler (interrupt_rx) without racing with the stdin reader.
954        let mut input_rx = if tui_connected {
955            debug!("TUI connected - skipping stdin reader thread");
956            None
957        } else {
958            let (input_tx, input_rx) = mpsc::unbounded_channel::<InputEvent>();
959            let should_terminate_input = Arc::clone(&should_terminate);
960
961            std::thread::spawn(move || {
962                let mut stdin = io::stdin();
963                let mut buf = [0u8; 1];
964
965                loop {
966                    if should_terminate_input.load(Ordering::SeqCst) {
967                        break;
968                    }
969
970                    match stdin.read(&mut buf) {
971                        Ok(0) => break, // EOF
972                        Ok(1) => {
973                            let byte = buf[0];
974                            let event = match byte {
975                                3 => InputEvent::CtrlC,          // Ctrl+C
976                                28 => InputEvent::CtrlBackslash, // Ctrl+\
977                                _ => InputEvent::Data(vec![byte]),
978                            };
979                            if input_tx.send(event).is_err() {
980                                break;
981                            }
982                        }
983                        Ok(_) => {} // Shouldn't happen with 1-byte buffer
984                        Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
985                        Err(_) => break,
986                    }
987                }
988            });
989            Some(input_rx)
990        };
991
992        // Write stdin input after threads are spawned (so we capture any output)
993        // Give Claude's TUI a moment to initialize before sending the prompt
994        if let Some(ref input) = pending_stdin {
995            tokio::time::sleep(Duration::from_millis(100)).await;
996            writer.write_all(input.as_bytes())?;
997            writer.write_all(b"\n")?;
998            writer.flush()?;
999            last_activity = Instant::now();
1000        }
1001
1002        // Main select loop - this is the key fix for blocking I/O
1003        // We use tokio::select! to multiplex between output, input, and timeout
1004        loop {
1005            // Check if child has exited (non-blocking check before select)
1006            if let Some(status) = child
1007                .try_wait()
1008                .map_err(|e| io::Error::other(e.to_string()))?
1009            {
1010                let exit_code = status.exit_code() as i32;
1011                debug!(exit_status = ?status, exit_code, "Child process exited");
1012
1013                // Drain remaining output already buffered.
1014                while let Ok(event) = output_rx.try_recv() {
1015                    if let OutputEvent::Data(data) = event {
1016                        if !tui_connected {
1017                            io::stdout().write_all(&data)?;
1018                            io::stdout().flush()?;
1019                        }
1020                        output.extend_from_slice(&data);
1021                    }
1022                }
1023
1024                // Give the reader thread a brief window to flush any final bytes/EOF.
1025                // This avoids races where fast-exiting commands drop output before we return.
1026                let drain_deadline = Instant::now() + Duration::from_millis(200);
1027                loop {
1028                    let remaining = drain_deadline.saturating_duration_since(Instant::now());
1029                    if remaining.is_zero() {
1030                        break;
1031                    }
1032                    match tokio::time::timeout(remaining, output_rx.recv()).await {
1033                        Ok(Some(OutputEvent::Data(data))) => {
1034                            if !tui_connected {
1035                                io::stdout().write_all(&data)?;
1036                                io::stdout().flush()?;
1037                            }
1038                            output.extend_from_slice(&data);
1039                        }
1040                        Ok(Some(OutputEvent::Eof) | None) => break,
1041                        Ok(Some(OutputEvent::Error(e))) => {
1042                            debug!(error = %e, "PTY read error after exit");
1043                            break;
1044                        }
1045                        Err(_) => break, // timeout
1046                    }
1047                }
1048
1049                should_terminate.store(true, Ordering::SeqCst);
1050                // Signal TUI that PTY has terminated
1051                let _ = self.terminated_tx.send(true);
1052
1053                let final_termination = resolve_termination_type(exit_code, termination);
1054                // run_interactive doesn't parse JSON, so extracted_text is empty
1055                return Ok(build_result(
1056                    &output,
1057                    status.success(),
1058                    Some(exit_code),
1059                    final_termination,
1060                    String::new(),
1061                ));
1062            }
1063
1064            // Build the timeout future (or a never-completing one if disabled)
1065            let timeout_future = async {
1066                match timeout_duration {
1067                    Some(d) => {
1068                        let elapsed = last_activity.elapsed();
1069                        if elapsed >= d {
1070                            tokio::time::sleep(Duration::ZERO).await
1071                        } else {
1072                            tokio::time::sleep(d.saturating_sub(elapsed)).await
1073                        }
1074                    }
1075                    None => std::future::pending::<()>().await,
1076                }
1077            };
1078
1079            tokio::select! {
1080                // PTY output received
1081                output_event = output_rx.recv() => {
1082                    match output_event {
1083                        Some(OutputEvent::Data(data)) => {
1084                            // Only write to stdout if TUI is NOT handling output
1085                            if !tui_connected {
1086                                io::stdout().write_all(&data)?;
1087                                io::stdout().flush()?;
1088                            }
1089                            output.extend_from_slice(&data);
1090
1091                            last_activity = Instant::now();
1092                        }
1093                        Some(OutputEvent::Eof) => {
1094                            debug!("PTY EOF received");
1095                            break;
1096                        }
1097                        Some(OutputEvent::Error(e)) => {
1098                            debug!(error = %e, "PTY read error");
1099                            break;
1100                        }
1101                        None => {
1102                            // Channel closed, reader thread exited
1103                            break;
1104                        }
1105                    }
1106                }
1107
1108                // User input received (from stdin) - only active when TUI is NOT connected
1109                input_event = async {
1110                    match input_rx.as_mut() {
1111                        Some(rx) => rx.recv().await,
1112                        None => std::future::pending().await, // Never resolves when TUI is connected
1113                    }
1114                } => {
1115                    match input_event {
1116                        Some(InputEvent::CtrlC) => {
1117                            match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1118                                CtrlCAction::ForwardAndStartWindow => {
1119                                    // Forward Ctrl+C to Claude
1120                                    let _ = writer.write_all(&[3]);
1121                                    let _ = writer.flush();
1122                                    last_activity = Instant::now();
1123                                }
1124                                CtrlCAction::Terminate => {
1125                                    info!("Double Ctrl+C detected, terminating");
1126                                    termination = TerminationType::UserInterrupt;
1127                                    should_terminate.store(true, Ordering::SeqCst);
1128                                    self.terminate_child(&mut child, true).await?;
1129                                    break;
1130                                }
1131                            }
1132                        }
1133                        Some(InputEvent::CtrlBackslash) => {
1134                            info!("Ctrl+\\ detected, force killing");
1135                            termination = TerminationType::ForceKill;
1136                            should_terminate.store(true, Ordering::SeqCst);
1137                            self.terminate_child(&mut child, false).await?;
1138                            break;
1139                        }
1140                        Some(InputEvent::Data(data)) => {
1141                            // Forward to Claude
1142                            let _ = writer.write_all(&data);
1143                            let _ = writer.flush();
1144                            last_activity = Instant::now();
1145                        }
1146                        None => {
1147                            // Input channel closed (stdin EOF)
1148                            debug!("Input channel closed");
1149                        }
1150                    }
1151                }
1152
1153                // TUI input received (convert to InputEvent for unified handling)
1154                tui_input = self.input_rx.recv() => {
1155                    if let Some(data) = tui_input {
1156                        match InputEvent::from_bytes(data) {
1157                            InputEvent::CtrlC => {
1158                                match ctrl_c_state.handle_ctrl_c(Instant::now()) {
1159                                    CtrlCAction::ForwardAndStartWindow => {
1160                                        let _ = writer.write_all(&[3]);
1161                                        let _ = writer.flush();
1162                                        last_activity = Instant::now();
1163                                    }
1164                                    CtrlCAction::Terminate => {
1165                                        info!("Double Ctrl+C detected, terminating");
1166                                        termination = TerminationType::UserInterrupt;
1167                                        should_terminate.store(true, Ordering::SeqCst);
1168                                        self.terminate_child(&mut child, true).await?;
1169                                        break;
1170                                    }
1171                                }
1172                            }
1173                            InputEvent::CtrlBackslash => {
1174                                info!("Ctrl+\\ detected, force killing");
1175                                termination = TerminationType::ForceKill;
1176                                should_terminate.store(true, Ordering::SeqCst);
1177                                self.terminate_child(&mut child, false).await?;
1178                                break;
1179                            }
1180                            InputEvent::Data(bytes) => {
1181                                let _ = writer.write_all(&bytes);
1182                                let _ = writer.flush();
1183                                last_activity = Instant::now();
1184                            }
1185                        }
1186                    }
1187                }
1188
1189                // Control commands from TUI
1190                control_cmd = self.control_rx.recv() => {
1191                    if let Some(cmd) = control_cmd {
1192                        use crate::pty_handle::ControlCommand;
1193                        match cmd {
1194                            ControlCommand::Kill => {
1195                                info!("Control command: Kill");
1196                                termination = TerminationType::UserInterrupt;
1197                                should_terminate.store(true, Ordering::SeqCst);
1198                                self.terminate_child(&mut child, true).await?;
1199                                break;
1200                            }
1201                            ControlCommand::Resize(cols, rows) => {
1202                                debug!(cols, rows, "Control command: Resize");
1203                                // Resize the PTY to match TUI dimensions
1204                                if let Err(e) = master.resize(PtySize {
1205                                    rows,
1206                                    cols,
1207                                    pixel_width: 0,
1208                                    pixel_height: 0,
1209                                }) {
1210                                    warn!("Failed to resize PTY: {}", e);
1211                                }
1212                            }
1213                            ControlCommand::Skip | ControlCommand::Abort => {
1214                                // These are handled at orchestrator level, not here
1215                                debug!("Control command: {:?} (ignored at PTY level)", cmd);
1216                            }
1217                        }
1218                    }
1219                }
1220
1221                // Idle timeout expired
1222                _ = timeout_future => {
1223                    warn!(
1224                        timeout_secs = self.config.idle_timeout_secs,
1225                        "Idle timeout triggered"
1226                    );
1227                    termination = TerminationType::IdleTimeout;
1228                    should_terminate.store(true, Ordering::SeqCst);
1229                    self.terminate_child(&mut child, true).await?;
1230                    break;
1231                }
1232
1233                // Interrupt signal from event loop
1234                _ = interrupt_rx.changed() => {
1235                    if *interrupt_rx.borrow() {
1236                        debug!("Interrupt received in interactive mode, terminating");
1237                        termination = TerminationType::UserInterrupt;
1238                        should_terminate.store(true, Ordering::SeqCst);
1239                        self.terminate_child(&mut child, true).await?;
1240                        break;
1241                    }
1242                }
1243            }
1244        }
1245
1246        // Ensure termination flag is set for spawned threads
1247        should_terminate.store(true, Ordering::SeqCst);
1248
1249        // Signal TUI that PTY has terminated
1250        let _ = self.terminated_tx.send(true);
1251
1252        // Wait for child to fully exit (interruptible + bounded)
1253        let status = self
1254            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1255            .await?;
1256
1257        let (success, exit_code, final_termination) = match status {
1258            Some(s) => {
1259                let code = s.exit_code() as i32;
1260                (
1261                    s.success(),
1262                    Some(code),
1263                    resolve_termination_type(code, termination),
1264                )
1265            }
1266            None => {
1267                warn!("Timed out waiting for child to exit after termination");
1268                (false, None, termination)
1269            }
1270        };
1271
1272        // run_interactive doesn't parse JSON, so extracted_text is empty
1273        Ok(build_result(
1274            &output,
1275            success,
1276            exit_code,
1277            final_termination,
1278            String::new(),
1279        ))
1280    }
1281
1282    /// Terminates the child process.
1283    ///
1284    /// If `graceful` is true, sends SIGTERM and waits up to 5 seconds before SIGKILL.
1285    /// If `graceful` is false, sends SIGKILL immediately.
1286    ///
1287    /// This is an async function to avoid blocking the tokio runtime during the
1288    /// grace period wait. Previously used `std::thread::sleep` which blocked the
1289    /// worker thread for up to 5 seconds, making the TUI appear frozen.
1290    #[allow(clippy::unused_self)] // Self is conceptually the right receiver for this method
1291    #[allow(clippy::unused_async)] // Kept async to preserve signature parity with Unix implementation
1292    #[cfg(not(unix))]
1293    async fn terminate_child(
1294        &self,
1295        child: &mut Box<dyn portable_pty::Child + Send>,
1296        _graceful: bool,
1297    ) -> io::Result<()> {
1298        child.kill()
1299    }
1300
1301    #[cfg(unix)]
1302    async fn terminate_child(
1303        &self,
1304        child: &mut Box<dyn portable_pty::Child + Send>,
1305        graceful: bool,
1306    ) -> io::Result<()> {
1307        let pid = match child.process_id() {
1308            Some(id) => Pid::from_raw(id as i32),
1309            None => return Ok(()), // Already exited
1310        };
1311
1312        if graceful {
1313            debug!(pid = %pid, "Sending SIGTERM");
1314            let _ = kill(pid, Signal::SIGTERM);
1315
1316            // Wait up to 5 seconds for graceful exit (reduced from 5s for better UX)
1317            let grace_period = Duration::from_secs(2);
1318            let start = Instant::now();
1319
1320            while start.elapsed() < grace_period {
1321                if child
1322                    .try_wait()
1323                    .map_err(|e| io::Error::other(e.to_string()))?
1324                    .is_some()
1325                {
1326                    return Ok(());
1327                }
1328                // Use async sleep to avoid blocking the tokio runtime
1329                tokio::time::sleep(Duration::from_millis(50)).await;
1330            }
1331
1332            // Still running after grace period - force kill
1333            debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1334        }
1335
1336        debug!(pid = %pid, "Sending SIGKILL");
1337        let _ = kill(pid, Signal::SIGKILL);
1338        Ok(())
1339    }
1340
1341    /// Waits for the child process to exit, optionally with a timeout.
1342    ///
1343    /// This is interruptible by the shared interrupt channel from the event loop.
1344    /// When interrupted, returns `Ok(None)` to let the caller handle termination.
1345    async fn wait_for_exit(
1346        &self,
1347        child: &mut Box<dyn portable_pty::Child + Send>,
1348        max_wait: Option<Duration>,
1349        interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1350    ) -> io::Result<Option<portable_pty::ExitStatus>> {
1351        let start = Instant::now();
1352
1353        loop {
1354            if let Some(status) = child
1355                .try_wait()
1356                .map_err(|e| io::Error::other(e.to_string()))?
1357            {
1358                return Ok(Some(status));
1359            }
1360
1361            if let Some(max) = max_wait
1362                && start.elapsed() >= max
1363            {
1364                return Ok(None);
1365            }
1366
1367            tokio::select! {
1368                _ = interrupt_rx.changed() => {
1369                    if *interrupt_rx.borrow() {
1370                        debug!("Interrupt received while waiting for child exit");
1371                        return Ok(None);
1372                    }
1373                }
1374                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1375            }
1376        }
1377    }
1378}
1379
1380/// Input events from the user.
1381#[derive(Debug)]
1382enum InputEvent {
1383    /// Ctrl+C pressed.
1384    CtrlC,
1385    /// Ctrl+\ pressed.
1386    CtrlBackslash,
1387    /// Regular data to forward.
1388    Data(Vec<u8>),
1389}
1390
1391impl InputEvent {
1392    /// Creates an InputEvent from raw bytes.
1393    fn from_bytes(data: Vec<u8>) -> Self {
1394        if data.len() == 1 {
1395            match data[0] {
1396                3 => return InputEvent::CtrlC,
1397                28 => return InputEvent::CtrlBackslash,
1398                _ => {}
1399            }
1400        }
1401        InputEvent::Data(data)
1402    }
1403}
1404
1405/// Output events from the PTY.
1406#[derive(Debug)]
1407enum OutputEvent {
1408    /// Data received from PTY.
1409    Data(Vec<u8>),
1410    /// PTY reached EOF (process exited).
1411    Eof,
1412    /// Error reading from PTY.
1413    Error(String),
1414}
1415
1416/// Strips ANSI escape sequences from raw bytes.
1417///
1418/// Uses `strip-ansi-escapes` for direct byte-level ANSI removal without terminal
1419/// emulation. This ensures ALL content is preserved regardless of output size,
1420/// unlike vt100's terminal simulation which can lose content that scrolls off.
1421fn strip_ansi(bytes: &[u8]) -> String {
1422    let stripped = strip_ansi_escapes::strip(bytes);
1423    String::from_utf8_lossy(&stripped).into_owned()
1424}
1425
1426/// Determines the final termination type, accounting for SIGINT exit code.
1427///
1428/// Exit code 130 indicates the process was killed by SIGINT (Ctrl+C forwarded to PTY).
1429fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1430    if exit_code == 130 {
1431        info!("Child process killed by SIGINT");
1432        TerminationType::UserInterrupt
1433    } else {
1434        default
1435    }
1436}
1437
1438/// Dispatches a Claude stream event to the appropriate handler method.
1439/// Also accumulates text content into `extracted_text` for event parsing.
1440fn dispatch_stream_event<H: StreamHandler>(
1441    event: ClaudeStreamEvent,
1442    handler: &mut H,
1443    extracted_text: &mut String,
1444) {
1445    match event {
1446        ClaudeStreamEvent::System { .. } => {
1447            // Session initialization - could log in verbose mode but not user-facing
1448        }
1449        ClaudeStreamEvent::Assistant { message, .. } => {
1450            for block in message.content {
1451                match block {
1452                    ContentBlock::Text { text } => {
1453                        handler.on_text(&text);
1454                        // Accumulate text for event parsing
1455                        extracted_text.push_str(&text);
1456                        extracted_text.push('\n');
1457                    }
1458                    ContentBlock::ToolUse { name, id, input } => {
1459                        handler.on_tool_call(&name, &id, &input)
1460                    }
1461                }
1462            }
1463        }
1464        ClaudeStreamEvent::User { message } => {
1465            for block in message.content {
1466                match block {
1467                    UserContentBlock::ToolResult {
1468                        tool_use_id,
1469                        content,
1470                    } => {
1471                        handler.on_tool_result(&tool_use_id, &content);
1472                    }
1473                }
1474            }
1475        }
1476        ClaudeStreamEvent::Result {
1477            duration_ms,
1478            total_cost_usd,
1479            num_turns,
1480            is_error,
1481        } => {
1482            if is_error {
1483                handler.on_error("Session ended with error");
1484            }
1485            handler.on_complete(&SessionResult {
1486                duration_ms,
1487                total_cost_usd,
1488                num_turns,
1489                is_error,
1490            });
1491        }
1492    }
1493}
1494
1495/// Builds a `PtyExecutionResult` from the accumulated output and exit status.
1496///
1497/// # Arguments
1498/// * `output` - Raw bytes from PTY
1499/// * `success` - Whether process exited successfully
1500/// * `exit_code` - Process exit code if available
1501/// * `termination` - How the process was terminated
1502/// * `extracted_text` - Text extracted from NDJSON stream (for Claude's stream-json)
1503fn build_result(
1504    output: &[u8],
1505    success: bool,
1506    exit_code: Option<i32>,
1507    termination: TerminationType,
1508    extracted_text: String,
1509) -> PtyExecutionResult {
1510    PtyExecutionResult {
1511        output: String::from_utf8_lossy(output).to_string(),
1512        stripped_output: strip_ansi(output),
1513        extracted_text,
1514        success,
1515        exit_code,
1516        termination,
1517    }
1518}
1519
1520#[cfg(test)]
1521mod tests {
1522    use super::*;
1523    use crate::claude_stream::{AssistantMessage, UserMessage};
1524    #[cfg(unix)]
1525    use crate::cli_backend::PromptMode;
1526    use crate::stream_handler::{SessionResult, StreamHandler};
1527    #[cfg(unix)]
1528    use tempfile::TempDir;
1529
1530    #[test]
1531    fn test_double_ctrl_c_within_window() {
1532        let mut state = CtrlCState::new();
1533        let now = Instant::now();
1534
1535        // First Ctrl+C: should forward and start window
1536        let action = state.handle_ctrl_c(now);
1537        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
1538
1539        // Second Ctrl+C within 1 second: should terminate
1540        let later = now + Duration::from_millis(500);
1541        let action = state.handle_ctrl_c(later);
1542        assert_eq!(action, CtrlCAction::Terminate);
1543    }
1544
1545    #[test]
1546    fn test_input_event_from_bytes_ctrl_c() {
1547        let event = InputEvent::from_bytes(vec![3]);
1548        assert!(matches!(event, InputEvent::CtrlC));
1549    }
1550
1551    #[test]
1552    fn test_input_event_from_bytes_ctrl_backslash() {
1553        let event = InputEvent::from_bytes(vec![28]);
1554        assert!(matches!(event, InputEvent::CtrlBackslash));
1555    }
1556
1557    #[test]
1558    fn test_input_event_from_bytes_data() {
1559        let event = InputEvent::from_bytes(vec![b'a']);
1560        assert!(matches!(event, InputEvent::Data(_)));
1561
1562        let event = InputEvent::from_bytes(vec![1, 2, 3]);
1563        assert!(matches!(event, InputEvent::Data(_)));
1564    }
1565
1566    #[test]
1567    fn test_ctrl_c_window_expires() {
1568        let mut state = CtrlCState::new();
1569        let now = Instant::now();
1570
1571        // First Ctrl+C
1572        state.handle_ctrl_c(now);
1573
1574        // Wait 2 seconds (window expires)
1575        let later = now + Duration::from_secs(2);
1576
1577        // Second Ctrl+C: window expired, should forward and start new window
1578        let action = state.handle_ctrl_c(later);
1579        assert_eq!(action, CtrlCAction::ForwardAndStartWindow);
1580    }
1581
1582    #[test]
1583    fn test_strip_ansi_basic() {
1584        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n";
1585        let stripped = strip_ansi(input);
1586        assert!(stripped.contains("Thinking..."));
1587        assert!(!stripped.contains("\x1b["));
1588    }
1589
1590    #[test]
1591    fn test_completion_promise_extraction() {
1592        // Simulate Claude output with heavy ANSI formatting
1593        let input = b"\x1b[1;36m  Thinking...\x1b[0m\r\n\
1594                      \x1b[2K\x1b[1;32m  Done!\x1b[0m\r\n\
1595                      \x1b[33mLOOP_COMPLETE\x1b[0m\r\n";
1596
1597        let stripped = strip_ansi(input);
1598
1599        // Event parser sees clean text
1600        assert!(stripped.contains("LOOP_COMPLETE"));
1601        assert!(!stripped.contains("\x1b["));
1602    }
1603
1604    #[test]
1605    fn test_event_tag_extraction() {
1606        // Event tags may be wrapped in ANSI codes
1607        let input = b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n\
1608                      Task completed successfully\r\n\
1609                      \x1b[90m</event>\x1b[0m\r\n";
1610
1611        let stripped = strip_ansi(input);
1612
1613        assert!(stripped.contains("<event topic=\"build.done\">"));
1614        assert!(stripped.contains("</event>"));
1615    }
1616
1617    #[test]
1618    fn test_large_output_preserves_early_events() {
1619        // Regression test: ensure event tags aren't lost when output is large
1620        let mut input = Vec::new();
1621
1622        // Event tag at the beginning
1623        input.extend_from_slice(b"<event topic=\"build.task\">Implement feature X</event>\r\n");
1624
1625        // Simulate 500 lines of verbose output (would overflow any terminal)
1626        for i in 0..500 {
1627            input.extend_from_slice(format!("Line {}: Processing step {}...\r\n", i, i).as_bytes());
1628        }
1629
1630        let stripped = strip_ansi(&input);
1631
1632        // Event tag should still be present - no scrollback loss with strip-ansi-escapes
1633        assert!(
1634            stripped.contains("<event topic=\"build.task\">"),
1635            "Event tag was lost - strip_ansi is not preserving all content"
1636        );
1637        assert!(stripped.contains("Implement feature X"));
1638        assert!(stripped.contains("Line 499")); // Last line should be present too
1639    }
1640
1641    #[test]
1642    fn test_pty_config_defaults() {
1643        let config = PtyConfig::default();
1644        assert!(config.interactive);
1645        assert_eq!(config.idle_timeout_secs, 30);
1646        assert_eq!(config.cols, 80);
1647        assert_eq!(config.rows, 24);
1648    }
1649
1650    #[test]
1651    fn test_pty_config_from_env_matches_env_or_defaults() {
1652        let cols = std::env::var("COLUMNS")
1653            .ok()
1654            .and_then(|value| value.parse::<u16>().ok())
1655            .unwrap_or(80);
1656        let rows = std::env::var("LINES")
1657            .ok()
1658            .and_then(|value| value.parse::<u16>().ok())
1659            .unwrap_or(24);
1660
1661        let config = PtyConfig::from_env();
1662        assert_eq!(config.cols, cols);
1663        assert_eq!(config.rows, rows);
1664    }
1665
1666    /// Verifies that the idle timeout logic in run_interactive correctly handles
1667    /// activity resets. Per spec (interactive-mode.spec.md lines 155-159):
1668    /// - Timeout resets on agent output (any bytes from PTY)
1669    /// - Timeout resets on user input (any key forwarded to agent)
1670    ///
1671    /// This test validates the timeout calculation logic that enables resets.
1672    /// The actual reset happens in the select! branches at lines 497, 523, and 545.
1673    #[test]
1674    fn test_idle_timeout_reset_logic() {
1675        // Simulate the timeout calculation used in run_interactive
1676        let timeout_duration = Duration::from_secs(30);
1677
1678        // Simulate 25 seconds of inactivity
1679        let simulated_25s = Duration::from_secs(25);
1680
1681        // Remaining time before timeout
1682        let remaining = timeout_duration.saturating_sub(simulated_25s);
1683        assert_eq!(remaining.as_secs(), 5);
1684
1685        // After activity (output or input), last_activity would be reset to now
1686        let last_activity_after_reset = Instant::now();
1687
1688        // Now elapsed is 0, full timeout duration available again
1689        let elapsed = last_activity_after_reset.elapsed();
1690        assert!(elapsed < Duration::from_millis(100)); // Should be near-zero
1691
1692        // Timeout calculation would give full duration minus small elapsed
1693        let new_remaining = timeout_duration.saturating_sub(elapsed);
1694        assert!(new_remaining > Duration::from_secs(29)); // Should be nearly full timeout
1695    }
1696
1697    #[test]
1698    fn test_extracted_text_field_exists() {
1699        // Test that PtyExecutionResult has extracted_text field
1700        // This is for NDJSON output where event tags are inside JSON strings
1701        let result = PtyExecutionResult {
1702            output: String::new(),
1703            stripped_output: String::new(),
1704            extracted_text: String::from("<event topic=\"build.done\">Test</event>"),
1705            success: true,
1706            exit_code: Some(0),
1707            termination: TerminationType::Natural,
1708        };
1709
1710        assert!(
1711            result
1712                .extracted_text
1713                .contains("<event topic=\"build.done\">")
1714        );
1715    }
1716
1717    #[test]
1718    fn test_build_result_includes_extracted_text() {
1719        // Test that build_result properly handles extracted_text
1720        let output = b"raw output";
1721        let extracted = "extracted text with <event topic=\"test\">payload</event>";
1722        let result = build_result(
1723            output,
1724            true,
1725            Some(0),
1726            TerminationType::Natural,
1727            extracted.to_string(),
1728        );
1729
1730        assert_eq!(result.extracted_text, extracted);
1731        assert!(result.stripped_output.contains("raw output"));
1732    }
1733
1734    #[test]
1735    fn test_resolve_termination_type_handles_sigint_exit_code() {
1736        let termination = resolve_termination_type(130, TerminationType::Natural);
1737        assert_eq!(termination, TerminationType::UserInterrupt);
1738
1739        let termination = resolve_termination_type(0, TerminationType::ForceKill);
1740        assert_eq!(termination, TerminationType::ForceKill);
1741    }
1742
1743    #[derive(Default)]
1744    struct CapturingHandler {
1745        texts: Vec<String>,
1746        tool_calls: Vec<(String, String, serde_json::Value)>,
1747        tool_results: Vec<(String, String)>,
1748        errors: Vec<String>,
1749        completions: Vec<SessionResult>,
1750    }
1751
1752    impl StreamHandler for CapturingHandler {
1753        fn on_text(&mut self, text: &str) {
1754            self.texts.push(text.to_string());
1755        }
1756
1757        fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
1758            self.tool_calls
1759                .push((name.to_string(), id.to_string(), input.clone()));
1760        }
1761
1762        fn on_tool_result(&mut self, id: &str, output: &str) {
1763            self.tool_results.push((id.to_string(), output.to_string()));
1764        }
1765
1766        fn on_error(&mut self, error: &str) {
1767            self.errors.push(error.to_string());
1768        }
1769
1770        fn on_complete(&mut self, result: &SessionResult) {
1771            self.completions.push(result.clone());
1772        }
1773    }
1774
1775    #[test]
1776    fn test_dispatch_stream_event_routes_text_and_tool_calls() {
1777        let mut handler = CapturingHandler::default();
1778        let mut extracted_text = String::new();
1779
1780        let event = ClaudeStreamEvent::Assistant {
1781            message: AssistantMessage {
1782                content: vec![
1783                    ContentBlock::Text {
1784                        text: "Hello".to_string(),
1785                    },
1786                    ContentBlock::ToolUse {
1787                        id: "tool-1".to_string(),
1788                        name: "Read".to_string(),
1789                        input: serde_json::json!({"path": "README.md"}),
1790                    },
1791                ],
1792            },
1793            usage: None,
1794        };
1795
1796        dispatch_stream_event(event, &mut handler, &mut extracted_text);
1797
1798        assert_eq!(handler.texts, vec!["Hello".to_string()]);
1799        assert_eq!(handler.tool_calls.len(), 1);
1800        assert!(extracted_text.contains("Hello"));
1801        assert!(extracted_text.ends_with('\n'));
1802    }
1803
1804    #[test]
1805    fn test_dispatch_stream_event_routes_tool_results_and_completion() {
1806        let mut handler = CapturingHandler::default();
1807        let mut extracted_text = String::new();
1808
1809        let event = ClaudeStreamEvent::User {
1810            message: UserMessage {
1811                content: vec![UserContentBlock::ToolResult {
1812                    tool_use_id: "tool-1".to_string(),
1813                    content: "done".to_string(),
1814                }],
1815            },
1816        };
1817
1818        dispatch_stream_event(event, &mut handler, &mut extracted_text);
1819        assert_eq!(handler.tool_results.len(), 1);
1820        assert_eq!(handler.tool_results[0].0, "tool-1");
1821        assert_eq!(handler.tool_results[0].1, "done");
1822
1823        let event = ClaudeStreamEvent::Result {
1824            duration_ms: 12,
1825            total_cost_usd: 0.01,
1826            num_turns: 2,
1827            is_error: true,
1828        };
1829
1830        dispatch_stream_event(event, &mut handler, &mut extracted_text);
1831        assert_eq!(handler.errors.len(), 1);
1832        assert_eq!(handler.completions.len(), 1);
1833        assert!(handler.completions[0].is_error);
1834    }
1835
1836    #[test]
1837    fn test_dispatch_stream_event_system_noop() {
1838        let mut handler = CapturingHandler::default();
1839        let mut extracted_text = String::new();
1840
1841        let event = ClaudeStreamEvent::System {
1842            session_id: "session-1".to_string(),
1843            model: "claude-test".to_string(),
1844            tools: Vec::new(),
1845        };
1846
1847        dispatch_stream_event(event, &mut handler, &mut extracted_text);
1848
1849        assert!(handler.texts.is_empty());
1850        assert!(handler.tool_calls.is_empty());
1851        assert!(handler.tool_results.is_empty());
1852        assert!(handler.errors.is_empty());
1853        assert!(handler.completions.is_empty());
1854        assert!(extracted_text.is_empty());
1855    }
1856
1857    /// Regression test: TUI mode should not spawn stdin reader thread
1858    ///
1859    /// Bug: In TUI mode, Ctrl+C required double-press to exit because the stdin
1860    /// reader thread (which captures byte 0x03) raced with the signal handler.
1861    /// The stdin reader would win, triggering "double Ctrl+C" logic instead of
1862    /// clean exit via interrupt_rx.
1863    ///
1864    /// Fix: When tui_connected=true, skip spawning stdin reader entirely.
1865    /// TUI mode is observation-only; user input should not be captured from stdin.
1866    /// The TUI has its own input handling (Ctrl+a q), and raw Ctrl+C goes directly
1867    /// to the signal handler (interrupt_rx) without racing.
1868    ///
1869    /// This test documents the expected behavior. The actual fix is in
1870    /// run_interactive() where `let mut input_rx = if !tui_connected { ... }`.
1871    #[test]
1872    fn test_tui_mode_stdin_reader_bypass() {
1873        // The tui_connected flag is now determined by the explicit tui_mode field,
1874        // set via set_tui_mode(true) when TUI is connected.
1875        // Previously used output_rx.is_none() which broke after streaming refactor.
1876
1877        // Simulate TUI connected scenario (tui_mode = true)
1878        let tui_mode = true;
1879        let tui_connected = tui_mode;
1880
1881        // When TUI is connected, stdin reader is skipped
1882        // (verified by: input_rx becomes None instead of Some(channel))
1883        assert!(
1884            tui_connected,
1885            "When tui_mode is true, stdin reader must be skipped"
1886        );
1887
1888        // In non-TUI mode, stdin reader is spawned
1889        let tui_mode_disabled = false;
1890        let tui_connected_non_tui = tui_mode_disabled;
1891        assert!(
1892            !tui_connected_non_tui,
1893            "When tui_mode is false, stdin reader must be spawned"
1894        );
1895    }
1896
1897    #[test]
1898    fn test_tui_mode_default_is_false() {
1899        // Create a PtyExecutor and verify tui_mode defaults to false
1900        let backend = CliBackend::claude();
1901        let config = PtyConfig::default();
1902        let executor = PtyExecutor::new(backend, config);
1903
1904        // tui_mode should default to false
1905        assert!(!executor.tui_mode, "tui_mode should default to false");
1906    }
1907
1908    #[test]
1909    fn test_set_tui_mode() {
1910        // Create a PtyExecutor and verify set_tui_mode works
1911        let backend = CliBackend::claude();
1912        let config = PtyConfig::default();
1913        let mut executor = PtyExecutor::new(backend, config);
1914
1915        // Initially false
1916        assert!(!executor.tui_mode, "tui_mode should start as false");
1917
1918        // Set to true
1919        executor.set_tui_mode(true);
1920        assert!(
1921            executor.tui_mode,
1922            "tui_mode should be true after set_tui_mode(true)"
1923        );
1924
1925        // Set back to false
1926        executor.set_tui_mode(false);
1927        assert!(
1928            !executor.tui_mode,
1929            "tui_mode should be false after set_tui_mode(false)"
1930        );
1931    }
1932
1933    #[test]
1934    fn test_build_result_populates_fields() {
1935        let output = b"\x1b[31mHello\x1b[0m\n";
1936        let extracted = "extracted text".to_string();
1937
1938        let result = build_result(
1939            output,
1940            true,
1941            Some(0),
1942            TerminationType::Natural,
1943            extracted.clone(),
1944        );
1945
1946        assert_eq!(result.output, String::from_utf8_lossy(output));
1947        assert!(result.stripped_output.contains("Hello"));
1948        assert!(!result.stripped_output.contains("\x1b["));
1949        assert_eq!(result.extracted_text, extracted);
1950        assert!(result.success);
1951        assert_eq!(result.exit_code, Some(0));
1952        assert_eq!(result.termination, TerminationType::Natural);
1953    }
1954
1955    #[cfg(unix)]
1956    #[tokio::test]
1957    async fn test_run_observe_executes_arg_prompt() {
1958        let temp_dir = TempDir::new().expect("temp dir");
1959        let backend = CliBackend {
1960            command: "sh".to_string(),
1961            args: vec!["-c".to_string()],
1962            prompt_mode: PromptMode::Arg,
1963            prompt_flag: None,
1964            output_format: OutputFormat::Text,
1965        };
1966        let config = PtyConfig {
1967            interactive: false,
1968            idle_timeout_secs: 0,
1969            cols: 80,
1970            rows: 24,
1971            workspace_root: temp_dir.path().to_path_buf(),
1972        };
1973        let executor = PtyExecutor::new(backend, config);
1974        let (_tx, rx) = tokio::sync::watch::channel(false);
1975
1976        let result = executor
1977            .run_observe("echo hello-pty", rx)
1978            .await
1979            .expect("run_observe");
1980
1981        assert!(result.success);
1982        assert!(result.output.contains("hello-pty"));
1983        assert!(result.stripped_output.contains("hello-pty"));
1984        assert_eq!(result.exit_code, Some(0));
1985        assert_eq!(result.termination, TerminationType::Natural);
1986    }
1987
1988    #[cfg(unix)]
1989    #[tokio::test]
1990    async fn test_run_observe_writes_stdin_prompt() {
1991        let temp_dir = TempDir::new().expect("temp dir");
1992        let backend = CliBackend {
1993            command: "sh".to_string(),
1994            args: vec!["-c".to_string(), "read line; echo \"$line\"".to_string()],
1995            prompt_mode: PromptMode::Stdin,
1996            prompt_flag: None,
1997            output_format: OutputFormat::Text,
1998        };
1999        let config = PtyConfig {
2000            interactive: false,
2001            idle_timeout_secs: 0,
2002            cols: 80,
2003            rows: 24,
2004            workspace_root: temp_dir.path().to_path_buf(),
2005        };
2006        let executor = PtyExecutor::new(backend, config);
2007        let (_tx, rx) = tokio::sync::watch::channel(false);
2008
2009        let result = executor
2010            .run_observe("stdin-line", rx)
2011            .await
2012            .expect("run_observe");
2013
2014        assert!(result.success);
2015        assert!(result.output.contains("stdin-line"));
2016        assert!(result.stripped_output.contains("stdin-line"));
2017        assert_eq!(result.termination, TerminationType::Natural);
2018    }
2019
2020    #[cfg(unix)]
2021    #[tokio::test]
2022    async fn test_run_observe_streaming_text_routes_output() {
2023        let temp_dir = TempDir::new().expect("temp dir");
2024        let backend = CliBackend {
2025            command: "sh".to_string(),
2026            args: vec!["-c".to_string()],
2027            prompt_mode: PromptMode::Arg,
2028            prompt_flag: None,
2029            output_format: OutputFormat::Text,
2030        };
2031        let config = PtyConfig {
2032            interactive: false,
2033            idle_timeout_secs: 0,
2034            cols: 80,
2035            rows: 24,
2036            workspace_root: temp_dir.path().to_path_buf(),
2037        };
2038        let executor = PtyExecutor::new(backend, config);
2039        let (_tx, rx) = tokio::sync::watch::channel(false);
2040        let mut handler = CapturingHandler::default();
2041
2042        let result = executor
2043            .run_observe_streaming("printf 'alpha\\nbeta\\n'", rx, &mut handler)
2044            .await
2045            .expect("run_observe_streaming");
2046
2047        assert!(result.success);
2048        let captured = handler.texts.join("");
2049        assert!(captured.contains("alpha"), "captured: {captured}");
2050        assert!(captured.contains("beta"), "captured: {captured}");
2051        assert!(handler.completions.is_empty());
2052        assert!(result.extracted_text.is_empty());
2053    }
2054
2055    #[cfg(unix)]
2056    #[tokio::test]
2057    async fn test_run_observe_streaming_parses_stream_json() {
2058        let temp_dir = TempDir::new().expect("temp dir");
2059        let backend = CliBackend {
2060            command: "sh".to_string(),
2061            args: vec!["-c".to_string()],
2062            prompt_mode: PromptMode::Arg,
2063            prompt_flag: None,
2064            output_format: OutputFormat::StreamJson,
2065        };
2066        let config = PtyConfig {
2067            interactive: false,
2068            idle_timeout_secs: 0,
2069            cols: 80,
2070            rows: 24,
2071            workspace_root: temp_dir.path().to_path_buf(),
2072        };
2073        let executor = PtyExecutor::new(backend, config);
2074        let (_tx, rx) = tokio::sync::watch::channel(false);
2075        let mut handler = CapturingHandler::default();
2076
2077        let script = r#"printf '%s\n' '{"type":"assistant","message":{"content":[{"type":"text","text":"Hello stream"}]}}' '{"type":"result","duration_ms":1,"total_cost_usd":0.0,"num_turns":1,"is_error":false}'"#;
2078        let result = executor
2079            .run_observe_streaming(script, rx, &mut handler)
2080            .await
2081            .expect("run_observe_streaming");
2082
2083        assert!(result.success);
2084        assert!(
2085            handler
2086                .texts
2087                .iter()
2088                .any(|text| text.contains("Hello stream"))
2089        );
2090        assert_eq!(handler.completions.len(), 1);
2091        assert!(result.extracted_text.contains("Hello stream"));
2092        assert_eq!(result.termination, TerminationType::Natural);
2093    }
2094
2095    #[cfg(unix)]
2096    #[tokio::test]
2097    async fn test_run_interactive_in_tui_mode() {
2098        let temp_dir = TempDir::new().expect("temp dir");
2099        let backend = CliBackend {
2100            command: "sh".to_string(),
2101            args: vec!["-c".to_string()],
2102            prompt_mode: PromptMode::Arg,
2103            prompt_flag: None,
2104            output_format: OutputFormat::Text,
2105        };
2106        let config = PtyConfig {
2107            interactive: true,
2108            idle_timeout_secs: 0,
2109            cols: 80,
2110            rows: 24,
2111            workspace_root: temp_dir.path().to_path_buf(),
2112        };
2113        let mut executor = PtyExecutor::new(backend, config);
2114        executor.set_tui_mode(true);
2115        let (_tx, rx) = tokio::sync::watch::channel(false);
2116
2117        let result = executor
2118            .run_interactive("echo hello-tui", rx)
2119            .await
2120            .expect("run_interactive");
2121
2122        assert!(result.success);
2123        assert!(result.output.contains("hello-tui"));
2124        assert!(result.stripped_output.contains("hello-tui"));
2125        assert_eq!(result.exit_code, Some(0));
2126        assert_eq!(result.termination, TerminationType::Natural);
2127    }
2128}