ralph_adapters/
pty_executor.rs

1//! PTY executor for running prompts with full terminal emulation.
2//!
3//! Spawns CLI tools in a pseudo-terminal to preserve rich TUI features like
4//! colors, spinners, and animations. Supports both interactive mode (user
5//! input forwarded) and observe mode (output-only).
6//!
7//! Key features:
8//! - PTY creation via `portable-pty` for cross-platform support
9//! - Idle timeout with activity tracking (output AND input reset timer)
10//! - Double Ctrl+C handling (first forwards, second terminates)
11//! - Raw mode management with cleanup on exit/crash
12//!
13//! Architecture:
14//! - Uses `tokio::select!` for non-blocking I/O multiplexing
15//! - Spawns separate tasks for PTY output and user input
16//! - Enables responsive Ctrl+C handling even when PTY is idle
17
18// Exit codes and PIDs are always within i32 range in practice
19#![allow(clippy::cast_possible_wrap)]
20
21use crate::cli_backend::{CliBackend, OutputFormat};
22use crate::claude_stream::{ClaudeStreamEvent, ClaudeStreamParser, ContentBlock, UserContentBlock};
23use crate::stream_handler::{SessionResult, StreamHandler};
24use nix::sys::signal::{kill, Signal};
25use nix::unistd::Pid;
26use portable_pty::{native_pty_system, CommandBuilder, PtyPair, PtySize};
27use std::io::{self, Read, Write};
28use std::sync::atomic::{AtomicBool, Ordering};
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use tokio::sync::{mpsc, watch};
32use tracing::{debug, info, warn};
33
34/// Result of a PTY execution.
35#[derive(Debug)]
36pub struct PtyExecutionResult {
37    /// The accumulated output (ANSI sequences preserved).
38    pub output: String,
39    /// The ANSI-stripped output for event parsing.
40    pub stripped_output: String,
41    /// Extracted text content from NDJSON stream (for Claude's stream-json output).
42    /// When Claude outputs `--output-format stream-json`, event tags like
43    /// `<event topic="...">` are inside JSON string values. This field contains
44    /// the extracted text content for proper event parsing.
45    /// Empty for non-JSON backends (use `stripped_output` instead).
46    pub extracted_text: String,
47    /// Whether the process exited successfully.
48    pub success: bool,
49    /// The exit code if available.
50    pub exit_code: Option<i32>,
51    /// How the process was terminated.
52    pub termination: TerminationType,
53}
54
/// Reason a PTY-hosted process stopped running.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TerminationType {
    /// The process finished on its own.
    Natural,
    /// The idle timeout fired and the process was terminated.
    IdleTimeout,
    /// The user requested termination (double Ctrl+C).
    UserInterrupt,
    /// The user force-killed the process (Ctrl+\).
    ForceKill,
}
67
/// Settings that control how a command is run inside the PTY.
#[derive(Debug, Clone)]
pub struct PtyConfig {
    /// Enable interactive mode (forward user input).
    pub interactive: bool,
    /// Idle timeout in seconds (0 = disabled).
    pub idle_timeout_secs: u32,
    /// Terminal width in columns.
    pub cols: u16,
    /// Terminal height in rows.
    pub rows: u16,
}

impl Default for PtyConfig {
    /// Interactive mode, a 30 second idle timeout and an 80x24 terminal.
    fn default() -> Self {
        Self {
            interactive: true,
            idle_timeout_secs: 30,
            cols: Self::DEFAULT_COLS,
            rows: Self::DEFAULT_ROWS,
        }
    }
}

impl PtyConfig {
    /// Terminal width used when `COLUMNS` is missing or unusable.
    const DEFAULT_COLS: u16 = 80;
    /// Terminal height used when `LINES` is missing or unusable.
    const DEFAULT_ROWS: u16 = 24;

    /// Creates config from the environment, falling back to defaults.
    ///
    /// The terminal size is read from `COLUMNS` and `LINES`. A variable that is
    /// unset, not a valid `u16`, or zero is ignored and the default (80 columns,
    /// 24 rows) is used for that dimension. All other fields come from
    /// [`Default`].
    pub fn from_env() -> Self {
        Self {
            cols: Self::dimension_from_env("COLUMNS", Self::DEFAULT_COLS),
            rows: Self::dimension_from_env("LINES", Self::DEFAULT_ROWS),
            ..Default::default()
        }
    }

    /// Reads one terminal dimension from the environment variable `var`.
    ///
    /// Surrounding whitespace is tolerated. Zero is rejected because a PTY with
    /// a zero-sized dimension is not usable by the programs running inside it.
    fn dimension_from_env(var: &str, fallback: u16) -> u16 {
        std::env::var(var)
            .ok()
            .and_then(|raw| raw.trim().parse::<u16>().ok())
            .filter(|&cells| cells > 0)
            .unwrap_or(fallback)
    }
}
111
/// Tracks Ctrl+C keypresses so that two presses in quick succession can be told
/// apart from isolated ones.
#[derive(Debug)]
pub struct CtrlCState {
    /// Timestamp of the press that opened the current window, if one is open.
    first_press: Option<Instant>,
    /// How long after that press a second one still counts as a double press.
    window: Duration,
}

/// What the caller should do in response to a Ctrl+C keypress.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CtrlCAction {
    /// Pass the Ctrl+C through to Claude and open (or reopen) the detection window.
    ForwardAndStartWindow,
    /// Terminate Claude: this was the second Ctrl+C inside the window.
    Terminate,
}

impl CtrlCState {
    /// Builds a tracker with no pending press and a one-second window.
    pub fn new() -> Self {
        let window = Duration::from_secs(1);
        Self {
            first_press: None,
            window,
        }
    }

    /// Records a Ctrl+C seen at `now` and reports what to do about it.
    ///
    /// Returns [`CtrlCAction::Terminate`] when a previous press happened strictly
    /// less than one window ago; the window is then cleared. Otherwise the press
    /// opens a new window starting at `now` and
    /// [`CtrlCAction::ForwardAndStartWindow`] is returned.
    pub fn handle_ctrl_c(&mut self, now: Instant) -> CtrlCAction {
        let is_double_press = self
            .first_press
            .is_some_and(|opened_at| now.duration_since(opened_at) < self.window);

        if is_double_press {
            // Clear the window so a further press starts from scratch.
            self.first_press = None;
            return CtrlCAction::Terminate;
        }

        // Either nothing was pending or the window had already expired.
        self.first_press = Some(now);
        CtrlCAction::ForwardAndStartWindow
    }
}

impl Default for CtrlCState {
    /// Same as [`CtrlCState::new`].
    fn default() -> Self {
        Self::new()
    }
}
161
162/// Executor for running prompts in a pseudo-terminal.
163pub struct PtyExecutor {
164    backend: CliBackend,
165    config: PtyConfig,
166    // Channel ends for TUI integration
167    output_tx: mpsc::UnboundedSender<Vec<u8>>,
168    output_rx: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
169    input_tx: Option<mpsc::UnboundedSender<Vec<u8>>>,
170    input_rx: mpsc::UnboundedReceiver<Vec<u8>>,
171    control_tx: Option<mpsc::UnboundedSender<crate::pty_handle::ControlCommand>>,
172    control_rx: mpsc::UnboundedReceiver<crate::pty_handle::ControlCommand>,
173    // Termination notification for TUI
174    terminated_tx: watch::Sender<bool>,
175    terminated_rx: Option<watch::Receiver<bool>>,
176}
177
178impl PtyExecutor {
179    /// Creates a new PTY executor with the given backend and configuration.
180    pub fn new(backend: CliBackend, config: PtyConfig) -> Self {
181        let (output_tx, output_rx) = mpsc::unbounded_channel();
182        let (input_tx, input_rx) = mpsc::unbounded_channel();
183        let (control_tx, control_rx) = mpsc::unbounded_channel();
184        let (terminated_tx, terminated_rx) = watch::channel(false);
185
186        Self {
187            backend,
188            config,
189            output_tx,
190            output_rx: Some(output_rx),
191            input_tx: Some(input_tx),
192            input_rx,
193            control_tx: Some(control_tx),
194            control_rx,
195            terminated_tx,
196            terminated_rx: Some(terminated_rx),
197        }
198    }
199
200    /// Returns a handle for TUI integration.
201    ///
202    /// Can only be called once - panics if called multiple times.
203    pub fn handle(&mut self) -> crate::pty_handle::PtyHandle {
204        crate::pty_handle::PtyHandle {
205            output_rx: self.output_rx.take().expect("handle() already called"),
206            input_tx: self.input_tx.take().expect("handle() already called"),
207            control_tx: self.control_tx.take().expect("handle() already called"),
208            terminated_rx: self.terminated_rx.take().expect("handle() already called"),
209        }
210    }
211
212    /// Spawns Claude in a PTY and returns the PTY pair, child process, stdin input, and temp file.
213    ///
214    /// The temp file is returned to keep it alive for the duration of execution.
215    /// For large prompts (>7000 chars), Claude is instructed to read from a temp file.
216    /// If the temp file is dropped before Claude reads it, the file is deleted and Claude hangs.
217    ///
218    /// The stdin_input is returned so callers can write it to the PTY after taking the writer.
219    /// This is necessary because `take_writer()` can only be called once per PTY.
220    fn spawn_pty(&self, prompt: &str) -> io::Result<(PtyPair, Box<dyn portable_pty::Child + Send>, Option<String>, Option<tempfile::NamedTempFile>)> {
221        let pty_system = native_pty_system();
222
223        let pair = pty_system
224            .openpty(PtySize {
225                rows: self.config.rows,
226                cols: self.config.cols,
227                pixel_width: 0,
228                pixel_height: 0,
229            })
230            .map_err(|e| io::Error::other(e.to_string()))?;
231
232        let (cmd, args, stdin_input, temp_file) = self.backend.build_command(prompt, self.config.interactive);
233
234        let mut cmd_builder = CommandBuilder::new(&cmd);
235        cmd_builder.args(&args);
236
237        // Set explicit working directory
238        let cwd = std::env::current_dir()
239            .map_err(|e| io::Error::other(format!("Failed to get current directory: {}", e)))?;
240        cmd_builder.cwd(&cwd);
241
242        // Set up environment for PTY
243        cmd_builder.env("TERM", "xterm-256color");
244        let child = pair
245            .slave
246            .spawn_command(cmd_builder)
247            .map_err(|e| io::Error::other(e.to_string()))?;
248
249        // Return stdin_input so callers can write it after taking the writer
250        Ok((pair, child, stdin_input, temp_file))
251    }
252
253    /// Runs in observe mode (output-only, no input forwarding).
254    ///
255    /// This is an async function that listens for interrupt signals via the shared
256    /// `interrupt_rx` watch channel from the event loop.
257    /// Uses a separate thread for blocking PTY reads and tokio::select! for signal handling.
258    ///
259    /// Returns when the process exits, idle timeout triggers, or interrupt is received.
260    ///
261    /// # Arguments
262    /// * `prompt` - The prompt to execute
263    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
264    ///
265    /// # Errors
266    ///
267    /// Returns an error if PTY allocation fails, the command cannot be spawned,
268    /// or an I/O error occurs during output handling.
269    pub async fn run_observe(
270        &self,
271        prompt: &str,
272        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
273    ) -> io::Result<PtyExecutionResult> {
274        // Keep temp_file alive for the duration of execution (large prompts use temp files)
275        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
276
277        let reader = pair.master.try_clone_reader()
278            .map_err(|e| io::Error::other(e.to_string()))?;
279
280        // Write stdin input if present (for stdin prompt mode)
281        if let Some(ref input) = stdin_input {
282            // Small delay to let process initialize
283            tokio::time::sleep(Duration::from_millis(100)).await;
284            let mut writer = pair.master.take_writer()
285                .map_err(|e| io::Error::other(e.to_string()))?;
286            writer.write_all(input.as_bytes())?;
287            writer.write_all(b"\n")?;
288            writer.flush()?;
289        }
290
291        // Drop the slave to signal EOF when master closes
292        drop(pair.slave);
293
294        let mut output = Vec::new();
295        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
296            None
297        } else {
298            Some(Duration::from_secs(u64::from(self.config.idle_timeout_secs)))
299        };
300
301        let mut termination = TerminationType::Natural;
302        let mut last_activity = Instant::now();
303
304        // Flag for termination request (shared with reader thread)
305        let should_terminate = Arc::new(AtomicBool::new(false));
306
307        // Spawn blocking reader thread that sends output via channel
308        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
309        let should_terminate_reader = Arc::clone(&should_terminate);
310        // Check if TUI is handling output (output_rx taken by handle())
311        let tui_connected = self.output_rx.is_none();
312        let tui_output_tx = if tui_connected {
313            Some(self.output_tx.clone())
314        } else {
315            None
316        };
317
318        debug!("Spawning PTY output reader thread (observe mode)");
319        std::thread::spawn(move || {
320            let mut reader = reader;
321            let mut buf = [0u8; 4096];
322
323            loop {
324                if should_terminate_reader.load(Ordering::SeqCst) {
325                    debug!("PTY reader: termination requested");
326                    break;
327                }
328
329                match reader.read(&mut buf) {
330                    Ok(0) => {
331                        debug!("PTY reader: EOF");
332                        let _ = output_tx.blocking_send(OutputEvent::Eof);
333                        break;
334                    }
335                    Ok(n) => {
336                        let data = buf[..n].to_vec();
337                        // Send to TUI channel if connected
338                        if let Some(ref tx) = tui_output_tx {
339                            let _ = tx.send(data.clone());
340                        }
341                        // Send to main loop
342                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
343                            break;
344                        }
345                    }
346                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
347                        std::thread::sleep(Duration::from_millis(10));
348                    }
349                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
350                    Err(e) => {
351                        debug!(error = %e, "PTY reader error");
352                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
353                        break;
354                    }
355                }
356            }
357        });
358
359        // Main event loop using tokio::select! for interruptibility
360        loop {
361            // Calculate timeout for idle check
362            let idle_timeout = timeout_duration.map(|d| {
363                let elapsed = last_activity.elapsed();
364                if elapsed >= d {
365                    Duration::from_millis(1) // Trigger immediately
366                } else {
367                    d - elapsed
368                }
369            });
370
371            tokio::select! {
372                // Check for interrupt signal from event loop
373                _ = interrupt_rx.changed() => {
374                    if *interrupt_rx.borrow() {
375                        debug!("Interrupt received in observe mode, terminating");
376                        termination = TerminationType::UserInterrupt;
377                        should_terminate.store(true, Ordering::SeqCst);
378                        let _ = self.terminate_child(&mut child, true).await;
379                        break;
380                    }
381                }
382
383                // Check for output from reader thread
384                event = output_rx.recv() => {
385                    match event {
386                        Some(OutputEvent::Data(data)) => {
387                            // Only write to stdout if TUI is NOT handling output
388                            if !tui_connected {
389                                io::stdout().write_all(&data)?;
390                                io::stdout().flush()?;
391                            }
392                            output.extend_from_slice(&data);
393                            last_activity = Instant::now();
394                        }
395                        Some(OutputEvent::Eof) | None => {
396                            debug!("Output channel closed, process likely exited");
397                            break;
398                        }
399                        Some(OutputEvent::Error(e)) => {
400                            debug!(error = %e, "Reader thread reported error");
401                            break;
402                        }
403                    }
404                }
405
406                // Check for idle timeout
407                _ = async {
408                    if let Some(timeout) = idle_timeout {
409                        tokio::time::sleep(timeout).await;
410                    } else {
411                        // No timeout configured, wait forever
412                        std::future::pending::<()>().await;
413                    }
414                } => {
415                    warn!(
416                        timeout_secs = self.config.idle_timeout_secs,
417                        "Idle timeout triggered"
418                    );
419                    termination = TerminationType::IdleTimeout;
420                    should_terminate.store(true, Ordering::SeqCst);
421                    self.terminate_child(&mut child, true).await?;
422                    break;
423                }
424            }
425
426            // Check if child has exited
427            if let Some(status) = child.try_wait().map_err(|e| io::Error::other(e.to_string()))? {
428                let exit_code = status.exit_code() as i32;
429                debug!(exit_status = ?status, exit_code, "Child process exited");
430
431                // Drain any remaining output from channel
432                while let Ok(event) = output_rx.try_recv() {
433                    if let OutputEvent::Data(data) = event {
434                        if !tui_connected {
435                            io::stdout().write_all(&data)?;
436                            io::stdout().flush()?;
437                        }
438                        output.extend_from_slice(&data);
439                    }
440                }
441
442                let final_termination = resolve_termination_type(exit_code, termination);
443                // run_observe doesn't parse JSON, so extracted_text is empty
444                return Ok(build_result(&output, status.success(), Some(exit_code), final_termination, String::new()));
445            }
446        }
447
448        // Signal reader thread to stop
449        should_terminate.store(true, Ordering::SeqCst);
450
451        // Wait for child to fully exit (interruptible + bounded)
452        let status = self
453            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
454            .await?;
455
456        let (success, exit_code, final_termination) = match status {
457            Some(s) => {
458                let code = s.exit_code() as i32;
459                (s.success(), Some(code), resolve_termination_type(code, termination))
460            }
461            None => {
462                warn!("Timed out waiting for child to exit after termination");
463                (false, None, termination)
464            }
465        };
466
467        // run_observe doesn't parse JSON, so extracted_text is empty
468        Ok(build_result(&output, success, exit_code, final_termination, String::new()))
469    }
470
471    /// Runs in observe mode with streaming event handling for JSON output.
472    ///
473    /// When the backend's output format is `StreamJson`, this method parses
474    /// NDJSON lines and dispatches events to the provided handler for real-time
475    /// display. For `Text` format, behaves identically to `run_observe`.
476    ///
477    /// # Arguments
478    /// * `prompt` - The prompt to execute
479    /// * `interrupt_rx` - Watch channel receiver for interrupt signals
480    /// * `handler` - Handler to receive streaming events
481    ///
482    /// # Errors
483    ///
484    /// Returns an error if PTY allocation fails, the command cannot be spawned,
485    /// or an I/O error occurs during output handling.
486    pub async fn run_observe_streaming<H: StreamHandler>(
487        &self,
488        prompt: &str,
489        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
490        handler: &mut H,
491    ) -> io::Result<PtyExecutionResult> {
492        // Check output format to decide parsing strategy
493        let output_format = self.backend.output_format;
494
495        // If not StreamJson, delegate to regular run_observe
496        if output_format != OutputFormat::StreamJson {
497            return self.run_observe(prompt, interrupt_rx).await;
498        }
499
500        // Keep temp_file alive for the duration of execution
501        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
502
503        let reader = pair.master.try_clone_reader()
504            .map_err(|e| io::Error::other(e.to_string()))?;
505
506        // Write stdin input if present (for stdin prompt mode)
507        if let Some(ref input) = stdin_input {
508            tokio::time::sleep(Duration::from_millis(100)).await;
509            let mut writer = pair.master.take_writer()
510                .map_err(|e| io::Error::other(e.to_string()))?;
511            writer.write_all(input.as_bytes())?;
512            writer.write_all(b"\n")?;
513            writer.flush()?;
514        }
515
516        drop(pair.slave);
517
518        let mut output = Vec::new();
519        let mut line_buffer = String::new();
520        // Accumulate extracted text from NDJSON for event parsing
521        let mut extracted_text = String::new();
522        let timeout_duration = if !self.config.interactive || self.config.idle_timeout_secs == 0 {
523            None
524        } else {
525            Some(Duration::from_secs(u64::from(self.config.idle_timeout_secs)))
526        };
527
528        let mut termination = TerminationType::Natural;
529        let mut last_activity = Instant::now();
530
531        let should_terminate = Arc::new(AtomicBool::new(false));
532
533        // Spawn blocking reader thread
534        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
535        let should_terminate_reader = Arc::clone(&should_terminate);
536        let tui_connected = self.output_rx.is_none();
537        let tui_output_tx = if tui_connected {
538            Some(self.output_tx.clone())
539        } else {
540            None
541        };
542
543        debug!("Spawning PTY output reader thread (streaming mode)");
544        std::thread::spawn(move || {
545            let mut reader = reader;
546            let mut buf = [0u8; 4096];
547
548            loop {
549                if should_terminate_reader.load(Ordering::SeqCst) {
550                    debug!("PTY reader: termination requested");
551                    break;
552                }
553
554                match reader.read(&mut buf) {
555                    Ok(0) => {
556                        debug!("PTY reader: EOF");
557                        let _ = output_tx.blocking_send(OutputEvent::Eof);
558                        break;
559                    }
560                    Ok(n) => {
561                        let data = buf[..n].to_vec();
562                        if let Some(ref tx) = tui_output_tx {
563                            let _ = tx.send(data.clone());
564                        }
565                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
566                            break;
567                        }
568                    }
569                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
570                        std::thread::sleep(Duration::from_millis(10));
571                    }
572                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
573                    Err(e) => {
574                        debug!(error = %e, "PTY reader error");
575                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
576                        break;
577                    }
578                }
579            }
580        });
581
582        // Main event loop with JSON line parsing
583        loop {
584            let idle_timeout = timeout_duration.map(|d| {
585                let elapsed = last_activity.elapsed();
586                if elapsed >= d {
587                    Duration::from_millis(1)
588                } else {
589                    d - elapsed
590                }
591            });
592
593            tokio::select! {
594                _ = interrupt_rx.changed() => {
595                    if *interrupt_rx.borrow() {
596                        debug!("Interrupt received in streaming observe mode, terminating");
597                        termination = TerminationType::UserInterrupt;
598                        should_terminate.store(true, Ordering::SeqCst);
599                        let _ = self.terminate_child(&mut child, true).await;
600                        break;
601                    }
602                }
603
604                event = output_rx.recv() => {
605                    match event {
606                        Some(OutputEvent::Data(data)) => {
607                            output.extend_from_slice(&data);
608                            last_activity = Instant::now();
609
610                            // Parse JSON lines from the data
611                            if let Ok(text) = std::str::from_utf8(&data) {
612                                line_buffer.push_str(text);
613
614                                // Process complete lines
615                                while let Some(newline_pos) = line_buffer.find('\n') {
616                                    let line = line_buffer[..newline_pos].to_string();
617                                    line_buffer = line_buffer[newline_pos + 1..].to_string();
618
619                                    if let Some(event) = ClaudeStreamParser::parse_line(&line) {
620                                        dispatch_stream_event(event, handler, &mut extracted_text);
621                                    }
622                                }
623                            }
624                        }
625                        Some(OutputEvent::Eof) | None => {
626                            debug!("Output channel closed");
627                            // Process any remaining content in buffer
628                            if !line_buffer.is_empty() {
629                                if let Some(event) = ClaudeStreamParser::parse_line(&line_buffer) {
630                                    dispatch_stream_event(event, handler, &mut extracted_text);
631                                }
632                            }
633                            break;
634                        }
635                        Some(OutputEvent::Error(e)) => {
636                            debug!(error = %e, "Reader thread reported error");
637                            handler.on_error(&e);
638                            break;
639                        }
640                    }
641                }
642
643                _ = async {
644                    if let Some(timeout) = idle_timeout {
645                        tokio::time::sleep(timeout).await;
646                    } else {
647                        std::future::pending::<()>().await;
648                    }
649                } => {
650                    warn!(
651                        timeout_secs = self.config.idle_timeout_secs,
652                        "Idle timeout triggered"
653                    );
654                    termination = TerminationType::IdleTimeout;
655                    should_terminate.store(true, Ordering::SeqCst);
656                    self.terminate_child(&mut child, true).await?;
657                    break;
658                }
659            }
660
661            // Check if child has exited
662            if let Some(status) = child.try_wait().map_err(|e| io::Error::other(e.to_string()))? {
663                let exit_code = status.exit_code() as i32;
664                debug!(exit_status = ?status, exit_code, "Child process exited");
665
666                // Drain remaining output
667                while let Ok(event) = output_rx.try_recv() {
668                    if let OutputEvent::Data(data) = event {
669                        output.extend_from_slice(&data);
670                        if let Ok(text) = std::str::from_utf8(&data) {
671                            line_buffer.push_str(text);
672                            while let Some(newline_pos) = line_buffer.find('\n') {
673                                let line = line_buffer[..newline_pos].to_string();
674                                line_buffer = line_buffer[newline_pos + 1..].to_string();
675                                if let Some(event) = ClaudeStreamParser::parse_line(&line) {
676                                    dispatch_stream_event(event, handler, &mut extracted_text);
677                                }
678                            }
679                        }
680                    }
681                }
682
683                // Process final buffer content
684                if !line_buffer.is_empty() {
685                    if let Some(event) = ClaudeStreamParser::parse_line(&line_buffer) {
686                        dispatch_stream_event(event, handler, &mut extracted_text);
687                    }
688                }
689
690                let final_termination = resolve_termination_type(exit_code, termination);
691                // Pass extracted_text for event parsing from NDJSON
692                return Ok(build_result(&output, status.success(), Some(exit_code), final_termination, extracted_text));
693            }
694        }
695
696        should_terminate.store(true, Ordering::SeqCst);
697
698        let status = self
699            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
700            .await?;
701
702        let (success, exit_code, final_termination) = match status {
703            Some(s) => {
704                let code = s.exit_code() as i32;
705                (s.success(), Some(code), resolve_termination_type(code, termination))
706            }
707            None => {
708                warn!("Timed out waiting for child to exit after termination");
709                (false, None, termination)
710            }
711        };
712
713        // Pass extracted_text for event parsing from NDJSON
714        Ok(build_result(&output, success, exit_code, final_termination, extracted_text))
715    }
716
717    /// Runs in interactive mode (bidirectional I/O).
718    ///
719    /// Uses `tokio::select!` for non-blocking I/O multiplexing between:
720    /// 1. PTY output (from blocking reader via channel)
721    /// 2. User input (from stdin thread via channel)
722    /// 3. Interrupt signal from event loop
723    /// 4. Idle timeout
724    ///
725    /// This design ensures Ctrl+C is always responsive, even when the PTY
726    /// has no output (e.g., during long-running tool calls).
727    ///
728    /// # Arguments
729    /// * `prompt` - The prompt to execute
730    /// * `interrupt_rx` - Watch channel receiver for interrupt signals from the event loop
731    ///
732    /// # Errors
733    ///
734    /// Returns an error if PTY allocation fails, the command cannot be spawned,
735    /// or an I/O error occurs during bidirectional communication.
736    #[allow(clippy::too_many_lines)] // Complex state machine requires cohesive implementation
737    pub async fn run_interactive(
738        &mut self,
739        prompt: &str,
740        mut interrupt_rx: tokio::sync::watch::Receiver<bool>,
741    ) -> io::Result<PtyExecutionResult> {
742        // Keep temp_file alive for the duration of execution (large prompts use temp files)
743        let (pair, mut child, stdin_input, _temp_file) = self.spawn_pty(prompt)?;
744
745        let reader = pair.master.try_clone_reader()
746            .map_err(|e| io::Error::other(e.to_string()))?;
747        let mut writer = pair.master.take_writer()
748            .map_err(|e| io::Error::other(e.to_string()))?;
749
750        // Keep master for resize operations
751        let master = pair.master;
752
753        // Drop the slave to signal EOF when master closes
754        drop(pair.slave);
755
756        // Store stdin_input for writing after reader thread starts
757        let pending_stdin = stdin_input;
758
759        let mut output = Vec::new();
760        let timeout_duration = if self.config.idle_timeout_secs > 0 {
761            Some(Duration::from_secs(u64::from(self.config.idle_timeout_secs)))
762        } else {
763            None
764        };
765
766        let mut ctrl_c_state = CtrlCState::new();
767        let mut termination = TerminationType::Natural;
768        let mut last_activity = Instant::now();
769
770        // Flag for termination request (shared with spawned tasks)
771        let should_terminate = Arc::new(AtomicBool::new(false));
772
773        // Spawn output reading task (blocking read wrapped in spawn_blocking via channel)
774        let (output_tx, mut output_rx) = mpsc::channel::<OutputEvent>(256);
775        let should_terminate_output = Arc::clone(&should_terminate);
776        // Check if TUI is handling output (output_rx taken by handle())
777        let tui_connected = self.output_rx.is_none();
778        let tui_output_tx = if tui_connected {
779            Some(self.output_tx.clone())
780        } else {
781            None
782        };
783
784        debug!("Spawning PTY output reader thread");
785        std::thread::spawn(move || {
786            debug!("PTY output reader thread started");
787            let mut reader = reader;
788            let mut buf = [0u8; 4096];
789
790            loop {
791                if should_terminate_output.load(Ordering::SeqCst) {
792                    debug!("PTY output reader: termination requested");
793                    break;
794                }
795
796                match reader.read(&mut buf) {
797                    Ok(0) => {
798                        // EOF - PTY closed
799                        debug!("PTY output reader: EOF received");
800                        let _ = output_tx.blocking_send(OutputEvent::Eof);
801                        break;
802                    }
803                    Ok(n) => {
804                        let data = buf[..n].to_vec();
805                        // Send to TUI channel if connected
806                        if let Some(ref tx) = tui_output_tx {
807                            let _ = tx.send(data.clone());
808                        }
809                        // Send to main loop
810                        if output_tx.blocking_send(OutputEvent::Data(data)).is_err() {
811                            debug!("PTY output reader: channel closed");
812                            break;
813                        }
814                    }
815                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
816                        // Non-blocking mode: no data available, yield briefly
817                        std::thread::sleep(Duration::from_millis(1));
818                    }
819                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {
820                        // Interrupted by signal, retry
821                    }
822                    Err(e) => {
823                        warn!("PTY output reader: error - {}", e);
824                        let _ = output_tx.blocking_send(OutputEvent::Error(e.to_string()));
825                        break;
826                    }
827                }
828            }
829            debug!("PTY output reader thread exiting");
830        });
831
832        // Spawn input reading task
833        let (input_tx, mut input_rx) = mpsc::unbounded_channel::<InputEvent>();
834        let should_terminate_input = Arc::clone(&should_terminate);
835
836        std::thread::spawn(move || {
837            let mut stdin = io::stdin();
838            let mut buf = [0u8; 1];
839
840            loop {
841                if should_terminate_input.load(Ordering::SeqCst) {
842                    break;
843                }
844
845                match stdin.read(&mut buf) {
846                    Ok(0) => break, // EOF
847                    Ok(1) => {
848                        let byte = buf[0];
849                        let event = match byte {
850                            3 => InputEvent::CtrlC,           // Ctrl+C
851                            28 => InputEvent::CtrlBackslash,  // Ctrl+\
852                            _ => InputEvent::Data(vec![byte]),
853                        };
854                        if input_tx.send(event).is_err() {
855                            break;
856                        }
857                    }
858                    Ok(_) => {} // Shouldn't happen with 1-byte buffer
859                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
860                    Err(_) => break,
861                }
862            }
863        });
864
865        // Write stdin input after threads are spawned (so we capture any output)
866        // Give Claude's TUI a moment to initialize before sending the prompt
867        if let Some(ref input) = pending_stdin {
868            tokio::time::sleep(Duration::from_millis(100)).await;
869            writer.write_all(input.as_bytes())?;
870            writer.write_all(b"\n")?;
871            writer.flush()?;
872            last_activity = Instant::now();
873        }
874
875        // Main select loop - this is the key fix for blocking I/O
876        // We use tokio::select! to multiplex between output, input, and timeout
877        loop {
878            // Check if child has exited (non-blocking check before select)
879            if let Some(status) = child.try_wait()
880                .map_err(|e| io::Error::other(e.to_string()))?
881            {
882                let exit_code = status.exit_code() as i32;
883                debug!(exit_status = ?status, exit_code, "Child process exited");
884
885                // Drain remaining output from channel
886                while let Ok(event) = output_rx.try_recv() {
887                    if let OutputEvent::Data(data) = event {
888                        if !tui_connected {
889                            io::stdout().write_all(&data)?;
890                            io::stdout().flush()?;
891                        }
892                        output.extend_from_slice(&data);
893                    }
894                }
895
896                should_terminate.store(true, Ordering::SeqCst);
897                // Signal TUI that PTY has terminated
898                let _ = self.terminated_tx.send(true);
899
900                let final_termination = resolve_termination_type(exit_code, termination);
901                // run_interactive doesn't parse JSON, so extracted_text is empty
902                return Ok(build_result(&output, status.success(), Some(exit_code), final_termination, String::new()));
903            }
904
905            // Build the timeout future (or a never-completing one if disabled)
906            let timeout_future = async {
907                match timeout_duration {
908                    Some(d) => {
909                        let elapsed = last_activity.elapsed();
910                        if elapsed >= d {
911                            tokio::time::sleep(Duration::ZERO).await
912                        } else {
913                            tokio::time::sleep(d - elapsed).await
914                        }
915                    }
916                    None => std::future::pending::<()>().await,
917                }
918            };
919
920            tokio::select! {
921                // PTY output received
922                output_event = output_rx.recv() => {
923                    match output_event {
924                        Some(OutputEvent::Data(data)) => {
925                            // Only write to stdout if TUI is NOT handling output
926                            if !tui_connected {
927                                io::stdout().write_all(&data)?;
928                                io::stdout().flush()?;
929                            }
930                            output.extend_from_slice(&data);
931
932                            last_activity = Instant::now();
933                        }
934                        Some(OutputEvent::Eof) => {
935                            debug!("PTY EOF received");
936                            break;
937                        }
938                        Some(OutputEvent::Error(e)) => {
939                            debug!(error = %e, "PTY read error");
940                            break;
941                        }
942                        None => {
943                            // Channel closed, reader thread exited
944                            break;
945                        }
946                    }
947                }
948
949                // User input received (from stdin)
950                input_event = async { input_rx.recv().await } => {
951                    match input_event {
952                        Some(InputEvent::CtrlC) => {
953                            match ctrl_c_state.handle_ctrl_c(Instant::now()) {
954                                CtrlCAction::ForwardAndStartWindow => {
955                                    // Forward Ctrl+C to Claude
956                                    let _ = writer.write_all(&[3]);
957                                    let _ = writer.flush();
958                                    last_activity = Instant::now();
959                                }
960                                CtrlCAction::Terminate => {
961                                    info!("Double Ctrl+C detected, terminating");
962                                    termination = TerminationType::UserInterrupt;
963                                    should_terminate.store(true, Ordering::SeqCst);
964                                    self.terminate_child(&mut child, true).await?;
965                                    break;
966                                }
967                            }
968                        }
969                        Some(InputEvent::CtrlBackslash) => {
970                            info!("Ctrl+\\ detected, force killing");
971                            termination = TerminationType::ForceKill;
972                            should_terminate.store(true, Ordering::SeqCst);
973                            self.terminate_child(&mut child, false).await?;
974                            break;
975                        }
976                        Some(InputEvent::Data(data)) => {
977                            // Forward to Claude
978                            let _ = writer.write_all(&data);
979                            let _ = writer.flush();
980                            last_activity = Instant::now();
981                        }
982                        None => {
983                            // Input channel closed (stdin EOF)
984                            debug!("Input channel closed");
985                        }
986                    }
987                }
988
989                // TUI input received (convert to InputEvent for unified handling)
990                tui_input = self.input_rx.recv() => {
991                    if let Some(data) = tui_input {
992                        match InputEvent::from_bytes(data) {
993                            InputEvent::CtrlC => {
994                                match ctrl_c_state.handle_ctrl_c(Instant::now()) {
995                                    CtrlCAction::ForwardAndStartWindow => {
996                                        let _ = writer.write_all(&[3]);
997                                        let _ = writer.flush();
998                                        last_activity = Instant::now();
999                                    }
1000                                    CtrlCAction::Terminate => {
1001                                        info!("Double Ctrl+C detected, terminating");
1002                                        termination = TerminationType::UserInterrupt;
1003                                        should_terminate.store(true, Ordering::SeqCst);
1004                                        self.terminate_child(&mut child, true).await?;
1005                                        break;
1006                                    }
1007                                }
1008                            }
1009                            InputEvent::CtrlBackslash => {
1010                                info!("Ctrl+\\ detected, force killing");
1011                                termination = TerminationType::ForceKill;
1012                                should_terminate.store(true, Ordering::SeqCst);
1013                                self.terminate_child(&mut child, false).await?;
1014                                break;
1015                            }
1016                            InputEvent::Data(bytes) => {
1017                                let _ = writer.write_all(&bytes);
1018                                let _ = writer.flush();
1019                                last_activity = Instant::now();
1020                            }
1021                        }
1022                    }
1023                }
1024
1025                // Control commands from TUI
1026                control_cmd = self.control_rx.recv() => {
1027                    if let Some(cmd) = control_cmd {
1028                        use crate::pty_handle::ControlCommand;
1029                        match cmd {
1030                            ControlCommand::Kill => {
1031                                info!("Control command: Kill");
1032                                termination = TerminationType::UserInterrupt;
1033                                should_terminate.store(true, Ordering::SeqCst);
1034                                self.terminate_child(&mut child, true).await?;
1035                                break;
1036                            }
1037                            ControlCommand::Resize(cols, rows) => {
1038                                debug!(cols, rows, "Control command: Resize");
1039                                // Resize the PTY to match TUI dimensions
1040                                if let Err(e) = master.resize(PtySize {
1041                                    rows,
1042                                    cols,
1043                                    pixel_width: 0,
1044                                    pixel_height: 0,
1045                                }) {
1046                                    warn!("Failed to resize PTY: {}", e);
1047                                }
1048                            }
1049                            ControlCommand::Skip | ControlCommand::Abort => {
1050                                // These are handled at orchestrator level, not here
1051                                debug!("Control command: {:?} (ignored at PTY level)", cmd);
1052                            }
1053                        }
1054                    }
1055                }
1056
1057                // Idle timeout expired
1058                _ = timeout_future => {
1059                    warn!(
1060                        timeout_secs = self.config.idle_timeout_secs,
1061                        "Idle timeout triggered"
1062                    );
1063                    termination = TerminationType::IdleTimeout;
1064                    should_terminate.store(true, Ordering::SeqCst);
1065                    self.terminate_child(&mut child, true).await?;
1066                    break;
1067                }
1068
1069                // Interrupt signal from event loop
1070                _ = interrupt_rx.changed() => {
1071                    if *interrupt_rx.borrow() {
1072                        debug!("Interrupt received in interactive mode, terminating");
1073                        termination = TerminationType::UserInterrupt;
1074                        should_terminate.store(true, Ordering::SeqCst);
1075                        self.terminate_child(&mut child, true).await?;
1076                        break;
1077                    }
1078                }
1079            }
1080        }
1081
1082        // Ensure termination flag is set for spawned threads
1083        should_terminate.store(true, Ordering::SeqCst);
1084
1085        // Signal TUI that PTY has terminated
1086        let _ = self.terminated_tx.send(true);
1087
1088        // Wait for child to fully exit (interruptible + bounded)
1089        let status = self
1090            .wait_for_exit(&mut child, Some(Duration::from_secs(2)), &mut interrupt_rx)
1091            .await?;
1092
1093        let (success, exit_code, final_termination) = match status {
1094            Some(s) => {
1095                let code = s.exit_code() as i32;
1096                (s.success(), Some(code), resolve_termination_type(code, termination))
1097            }
1098            None => {
1099                warn!("Timed out waiting for child to exit after termination");
1100                (false, None, termination)
1101            }
1102        };
1103
1104        // run_interactive doesn't parse JSON, so extracted_text is empty
1105        Ok(build_result(&output, success, exit_code, final_termination, String::new()))
1106    }
1107
1108    /// Terminates the child process.
1109    ///
1110    /// If `graceful` is true, sends SIGTERM and waits up to 5 seconds before SIGKILL.
1111    /// If `graceful` is false, sends SIGKILL immediately.
1112    ///
1113    /// This is an async function to avoid blocking the tokio runtime during the
1114    /// grace period wait. Previously used `std::thread::sleep` which blocked the
1115    /// worker thread for up to 5 seconds, making the TUI appear frozen.
1116    #[allow(clippy::unused_self)] // Self is conceptually the right receiver for this method
1117    async fn terminate_child(&self, child: &mut Box<dyn portable_pty::Child + Send>, graceful: bool) -> io::Result<()> {
1118        let pid = match child.process_id() {
1119            Some(id) => Pid::from_raw(id as i32),
1120            None => return Ok(()), // Already exited
1121        };
1122
1123        if graceful {
1124            debug!(pid = %pid, "Sending SIGTERM");
1125            let _ = kill(pid, Signal::SIGTERM);
1126
1127            // Wait up to 5 seconds for graceful exit (reduced from 5s for better UX)
1128            let grace_period = Duration::from_secs(2);
1129            let start = Instant::now();
1130
1131            while start.elapsed() < grace_period {
1132                if child.try_wait()
1133                    .map_err(|e| io::Error::other(e.to_string()))?
1134                    .is_some()
1135                {
1136                    return Ok(());
1137                }
1138                // Use async sleep to avoid blocking the tokio runtime
1139                tokio::time::sleep(Duration::from_millis(50)).await;
1140            }
1141
1142            // Still running after grace period - force kill
1143            debug!(pid = %pid, "Grace period expired, sending SIGKILL");
1144        }
1145
1146        debug!(pid = %pid, "Sending SIGKILL");
1147        let _ = kill(pid, Signal::SIGKILL);
1148        Ok(())
1149    }
1150
1151    /// Waits for the child process to exit, optionally with a timeout.
1152    ///
1153    /// This is interruptible by the shared interrupt channel from the event loop.
1154    /// When interrupted, returns `Ok(None)` to let the caller handle termination.
1155    async fn wait_for_exit(
1156        &self,
1157        child: &mut Box<dyn portable_pty::Child + Send>,
1158        max_wait: Option<Duration>,
1159        interrupt_rx: &mut tokio::sync::watch::Receiver<bool>,
1160    ) -> io::Result<Option<portable_pty::ExitStatus>> {
1161        let start = Instant::now();
1162
1163        loop {
1164            if let Some(status) = child
1165                .try_wait()
1166                .map_err(|e| io::Error::other(e.to_string()))?
1167            {
1168                return Ok(Some(status));
1169            }
1170
1171            if let Some(max) = max_wait {
1172                if start.elapsed() >= max {
1173                    return Ok(None);
1174                }
1175            }
1176
1177            tokio::select! {
1178                _ = interrupt_rx.changed() => {
1179                    if *interrupt_rx.borrow() {
1180                        debug!("Interrupt received while waiting for child exit");
1181                        return Ok(None);
1182                    }
1183                }
1184                _ = tokio::time::sleep(Duration::from_millis(50)) => {}
1185            }
1186        }
1187    }
1188}
1189
/// A unit of user input, classified for the interactive loop.
#[derive(Debug)]
enum InputEvent {
    /// The Ctrl+C key (byte `3`).
    CtrlC,
    /// The Ctrl+\ key (byte `28`).
    CtrlBackslash,
    /// Any other bytes, to be forwarded unchanged.
    Data(Vec<u8>),
}

impl InputEvent {
    /// Classifies a raw input chunk.
    ///
    /// Only a chunk consisting of exactly one byte is treated as a control
    /// key; everything else, including an empty chunk or a longer chunk that
    /// happens to contain a control byte, is returned as [`InputEvent::Data`].
    fn from_bytes(data: Vec<u8>) -> Self {
        match data.as_slice() {
            [3] => Self::CtrlC,
            [28] => Self::CtrlBackslash,
            _ => Self::Data(data),
        }
    }
}
1214
/// Messages sent from the PTY reader thread to the async main loop.
#[derive(Debug)]
enum OutputEvent {
    /// A chunk of bytes read from the PTY.
    Data(Vec<u8>),
    /// The PTY read returned zero bytes (end of stream).
    Eof,
    /// A PTY read failed; carries the error rendered as text.
    Error(String),
}
1225
1226/// Strips ANSI escape sequences from raw bytes.
1227///
1228/// Uses `strip-ansi-escapes` for direct byte-level ANSI removal without terminal
1229/// emulation. This ensures ALL content is preserved regardless of output size,
1230/// unlike vt100's terminal simulation which can lose content that scrolls off.
1231fn strip_ansi(bytes: &[u8]) -> String {
1232    let stripped = strip_ansi_escapes::strip(bytes);
1233    String::from_utf8_lossy(&stripped).into_owned()
1234}
1235
1236/// Determines the final termination type, accounting for SIGINT exit code.
1237///
1238/// Exit code 130 indicates the process was killed by SIGINT (Ctrl+C forwarded to PTY).
1239fn resolve_termination_type(exit_code: i32, default: TerminationType) -> TerminationType {
1240    if exit_code == 130 {
1241        info!("Child process killed by SIGINT");
1242        TerminationType::UserInterrupt
1243    } else {
1244        default
1245    }
1246}
1247
1248/// Dispatches a Claude stream event to the appropriate handler method.
1249/// Also accumulates text content into `extracted_text` for event parsing.
1250fn dispatch_stream_event<H: StreamHandler>(
1251    event: ClaudeStreamEvent,
1252    handler: &mut H,
1253    extracted_text: &mut String,
1254) {
1255    match event {
1256        ClaudeStreamEvent::System { .. } => {
1257            // Session initialization - could log in verbose mode but not user-facing
1258        }
1259        ClaudeStreamEvent::Assistant { message, .. } => {
1260            for block in message.content {
1261                match block {
1262                    ContentBlock::Text { text } => {
1263                        handler.on_text(&text);
1264                        // Accumulate text for event parsing
1265                        extracted_text.push_str(&text);
1266                        extracted_text.push('\n');
1267                    }
1268                    ContentBlock::ToolUse { name, id, input } => {
1269                        handler.on_tool_call(&name, &id, &input)
1270                    }
1271                }
1272            }
1273        }
1274        ClaudeStreamEvent::User { message } => {
1275            for block in message.content {
1276                match block {
1277                    UserContentBlock::ToolResult { tool_use_id, content } => {
1278                        handler.on_tool_result(&tool_use_id, &content);
1279                    }
1280                }
1281            }
1282        }
1283        ClaudeStreamEvent::Result { duration_ms, total_cost_usd, num_turns, is_error } => {
1284            if is_error {
1285                handler.on_error("Session ended with error");
1286            }
1287            handler.on_complete(&SessionResult {
1288                duration_ms,
1289                total_cost_usd,
1290                num_turns,
1291                is_error,
1292            });
1293        }
1294    }
1295}
1296
1297/// Builds a `PtyExecutionResult` from the accumulated output and exit status.
1298///
1299/// # Arguments
1300/// * `output` - Raw bytes from PTY
1301/// * `success` - Whether process exited successfully
1302/// * `exit_code` - Process exit code if available
1303/// * `termination` - How the process was terminated
1304/// * `extracted_text` - Text extracted from NDJSON stream (for Claude's stream-json)
1305fn build_result(
1306    output: &[u8],
1307    success: bool,
1308    exit_code: Option<i32>,
1309    termination: TerminationType,
1310    extracted_text: String,
1311) -> PtyExecutionResult {
1312    PtyExecutionResult {
1313        output: String::from_utf8_lossy(output).to_string(),
1314        stripped_output: strip_ansi(output),
1315        extracted_text,
1316        success,
1317        exit_code,
1318        termination,
1319    }
1320}
1321
#[cfg(test)]
mod tests {
    use super::*;

    /// A second Ctrl+C 500 ms after the first must terminate.
    #[test]
    fn test_double_ctrl_c_within_window() {
        let first_press = Instant::now();
        let second_press = first_press + Duration::from_millis(500);
        let mut state = CtrlCState::new();

        // The first press is forwarded and opens the double-press window.
        assert_eq!(state.handle_ctrl_c(first_press), CtrlCAction::ForwardAndStartWindow);
        // The second press lands inside the window.
        assert_eq!(state.handle_ctrl_c(second_press), CtrlCAction::Terminate);
    }

    /// A second Ctrl+C 2 s after the first is treated as a fresh first press.
    #[test]
    fn test_ctrl_c_window_expires() {
        let first_press = Instant::now();
        let late_press = first_press + Duration::from_secs(2);
        let mut state = CtrlCState::new();

        let _ = state.handle_ctrl_c(first_press);

        // The window has expired, so this press is forwarded and starts a new window.
        assert_eq!(state.handle_ctrl_c(late_press), CtrlCAction::ForwardAndStartWindow);
    }

    /// Colour codes are removed while the visible text survives.
    #[test]
    fn test_strip_ansi_basic() {
        let cleaned = strip_ansi(b"\x1b[1;36m  Thinking...\x1b[0m\r\n");

        assert!(cleaned.contains("Thinking..."));
        assert!(!cleaned.contains("\x1b["));
    }

    /// The completion marker is visible after heavy ANSI formatting is stripped.
    #[test]
    fn test_completion_promise_extraction() {
        // Simulated Claude output, one styled line per slice.
        let raw = [
            b"\x1b[1;36m  Thinking...\x1b[0m\r\n".as_slice(),
            b"\x1b[2K\x1b[1;32m  Done!\x1b[0m\r\n".as_slice(),
            b"\x1b[33mLOOP_COMPLETE\x1b[0m\r\n".as_slice(),
        ]
        .concat();

        let cleaned = strip_ansi(&raw);

        // The event parser sees clean text.
        assert!(cleaned.contains("LOOP_COMPLETE"));
        assert!(!cleaned.contains("\x1b["));
    }

    /// Event tags wrapped in ANSI codes come out intact.
    #[test]
    fn test_event_tag_extraction() {
        let raw = [
            b"\x1b[90m<event topic=\"build.done\">\x1b[0m\r\n".as_slice(),
            b"Task completed successfully\r\n".as_slice(),
            b"\x1b[90m</event>\x1b[0m\r\n".as_slice(),
        ]
        .concat();

        let cleaned = strip_ansi(&raw);

        assert!(cleaned.contains("<event topic=\"build.done\">"));
        assert!(cleaned.contains("</event>"));
    }

    /// Regression test: an event tag at the start of a large output must not be lost.
    #[test]
    fn test_large_output_preserves_early_events() {
        // Event tag first, then 500 lines of verbose output (more than any terminal screen).
        let mut raw = b"<event topic=\"build.task\">Implement feature X</event>\r\n".to_vec();
        raw.extend(
            (0..500).flat_map(|i| format!("Line {i}: Processing step {i}...\r\n").into_bytes()),
        );

        let cleaned = strip_ansi(&raw);

        // Both the first and the last line must still be present.
        assert!(
            cleaned.contains("<event topic=\"build.task\">"),
            "Event tag was lost - strip_ansi is not preserving all content"
        );
        assert!(cleaned.contains("Implement feature X"));
        assert!(cleaned.contains("Line 499"));
    }

    /// Pins the default `PtyConfig` values.
    #[test]
    fn test_pty_config_defaults() {
        let defaults = PtyConfig::default();

        assert!(defaults.interactive);
        assert_eq!(defaults.idle_timeout_secs, 30);
        assert_eq!(defaults.cols, 80);
        assert_eq!(defaults.rows, 24);
    }

    /// Checks the remaining-time arithmetic behind the idle timeout in
    /// `run_interactive`: the time left is the timeout minus the time since the
    /// last activity, and resetting `last_activity` restores (almost) the full
    /// timeout. The reset itself happens in the `select!` branches that handle
    /// PTY output and user input; this test only covers the calculation.
    #[test]
    fn test_idle_timeout_reset_logic() {
        let timeout_duration = Duration::from_secs(30);

        // After 25 s of inactivity, 5 s remain.
        let idle_for = Duration::from_secs(25);
        assert_eq!(timeout_duration.saturating_sub(idle_for).as_secs(), 5);

        // Activity resets the reference point to "now", so elapsed is near zero.
        let last_activity_after_reset = Instant::now();
        let elapsed = last_activity_after_reset.elapsed();
        assert!(elapsed < Duration::from_millis(100));

        // Nearly the whole timeout is available again.
        let remaining = timeout_duration.saturating_sub(elapsed);
        assert!(remaining > Duration::from_secs(29));
    }

    /// `PtyExecutionResult` exposes `extracted_text`, used for NDJSON output
    /// where event tags sit inside JSON strings.
    #[test]
    fn test_extracted_text_field_exists() {
        let extracted_text = String::from("<event topic=\"build.done\">Test</event>");
        let result = PtyExecutionResult {
            output: String::new(),
            stripped_output: String::new(),
            extracted_text,
            success: true,
            exit_code: Some(0),
            termination: TerminationType::Natural,
        };

        assert!(result.extracted_text.contains("<event topic=\"build.done\">"));
    }

    /// `build_result` passes `extracted_text` through and strips the raw output.
    #[test]
    fn test_build_result_includes_extracted_text() {
        let extracted = "extracted text with <event topic=\"test\">payload</event>";

        let result = build_result(
            b"raw output",
            true,
            Some(0),
            TerminationType::Natural,
            extracted.to_string(),
        );

        assert_eq!(result.extracted_text, extracted);
        assert!(result.stripped_output.contains("raw output"));
    }
}