ralph_workflow/pipeline/
prompt.rs

1//! Prompt-based command execution.
2
3use crate::agents::{is_glm_like_agent, JsonParserType};
4use crate::common::{format_argv_for_log, split_command, truncate_text};
5use crate::config::Config;
6use crate::logger::Colors;
7use crate::logger::Logger;
8use crate::logger::{argv_requests_json, format_generic_json_for_display};
9use crate::pipeline::Timer;
10use std::fs::{self, File, OpenOptions};
11use std::io::{self, BufRead, BufReader, Read, Write};
12use std::path::Path;
13use std::process::{Child, ChildStdout, Command, Stdio};
14
15#[cfg(any(test, feature = "test-utils"))]
16use std::sync::Arc;
17
/// A line-oriented reader over an agent's stdout that hands data to the
/// caller in small increments.
///
/// A refill (see the `BufRead` impl) issues reads of at most 256 bytes each.
/// It stops as soon as a newline has been buffered, the source reports EOF,
/// or 8 reads have been made. Line-based consumers (`lines()`,
/// `read_until`) therefore see each line shortly after it arrives, which
/// suits agents that emit NDJSON gradually.
///
/// # Buffer size limit
///
/// `fill_buffer` refuses to grow the internal buffer past `MAX_BUFFER_SIZE`
/// and returns an error instead.
///
/// NOTE(review): `fill_buf` only refills once the buffer is empty and reads
/// at most 8 × 256 bytes per refill. The internal buffer therefore never
/// exceeds about 2 KiB and that error does not occur in practice. A line
/// without newlines is accumulated by the *caller* (e.g. the `String` built by
/// `lines()`), which this type does not bound.
struct StreamingLineReader<R: Read> {
    /// Underlying source, wrapped in a small `BufReader`.
    inner: BufReader<R>,
    /// Bytes read from `inner` but not yet fully handed out.
    buffer: Vec<u8>,
    /// Number of leading bytes of `buffer` already handed out.
    consumed: usize,
}
34
/// Upper bound, in bytes, on unconsumed data held in a `StreamingLineReader`'s
/// internal buffer.
///
/// `fill_buffer` returns an error rather than letting the buffer grow past
/// this size.
///
/// NOTE(review): with the current `fill_buf`, a refill only starts once the
/// buffer is empty and performs at most 8 reads of 256 bytes. The internal
/// buffer therefore stays around 2 KiB and this limit is not reached in
/// practice. It also does not cap line length: callers such as
/// `BufRead::lines` accumulate a newline-free line in their own `String`.
/// Enforcing a real per-line cap would require tracking bytes handed out
/// since the last newline.
const MAX_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB
48
/// Maximum prompt size in bytes that is passed to the agent as a single
/// command-line argument.
///
/// Operating-system limits on process arguments:
/// - Linux: a *single* argument may not exceed `MAX_ARG_STRLEN` (32 pages,
///   i.e. 128 KiB with 4 KiB pages, including the terminating NUL). The total
///   of all arguments plus the environment is bounded by `ARG_MAX`
///   (commonly 2 MiB).
/// - macOS: `ARG_MAX` is roughly 1 MiB for arguments plus environment.
/// - Windows: the entire command line is limited to 32,767 characters, so
///   prompts larger than ~32 KiB can still fail to spawn there.
///
/// The value stays below Linux's per-argument cap, with headroom for the
/// truncation notice. A prompt cut down to this size therefore does not hit
/// `E2BIG` on Linux. (A previous value of 200 KiB exceeded that cap.) Spawn
/// failures that still occur, such as `ArgumentListTooLong`, are converted
/// into a `CommandResult` by `spawn_agent_process`.
const MAX_PROMPT_SIZE: usize = 120 * 1024; // 120 KiB
61
62/// Truncate a prompt that exceeds the safe size limit.
63///
64/// This function intelligently truncates prompts by:
65/// 1. Looking for `{{LAST_OUTPUT}}` marker sections (from XSD retry templates)
66/// 2. Truncating from the beginning of LAST_OUTPUT content (keeping the end)
67/// 3. If no marker found, truncating from the middle to preserve start/end context
68///
69/// Returns the original prompt if within limits, or a truncated version with a marker.
70fn truncate_prompt_if_needed(prompt: &str, logger: &Logger) -> String {
71    if prompt.len() <= MAX_PROMPT_SIZE {
72        return prompt.to_string();
73    }
74
75    let excess = prompt.len() - MAX_PROMPT_SIZE;
76    logger.warn(&format!(
77        "Prompt exceeds safe limit ({} bytes > {} bytes), truncating {} bytes",
78        prompt.len(),
79        MAX_PROMPT_SIZE,
80        excess
81    ));
82
83    // Strategy: Find the largest contiguous block of content that looks like
84    // log output or previous agent output, and truncate from its beginning.
85    // This preserves the task instructions at the start and the most recent
86    // output at the end (which is most relevant for XSD retry errors).
87
88    // Look for common markers that indicate the start of embedded output
89    let truncation_markers = [
90        "\n---\n",            // Common section separator
91        "\n```\n",            // Code block start
92        "\n<last-output>",    // Explicit marker
93        "\nPrevious output:", // Text marker
94    ];
95
96    for marker in truncation_markers {
97        if let Some(marker_pos) = prompt.find(marker) {
98            // Found a marker - truncate content after it
99            let content_start = marker_pos + marker.len();
100            if content_start < prompt.len() {
101                let before_marker = &prompt[..content_start];
102                let after_marker = &prompt[content_start..];
103
104                if after_marker.len() > excess + 100 {
105                    // Truncate from the beginning of the content section
106                    let keep_from = excess + 100; // Keep extra for clean line boundary
107                    let truncated_content = &after_marker[keep_from..];
108
109                    // Find next newline for clean truncation
110                    let clean_start = truncated_content.find('\n').map(|i| i + 1).unwrap_or(0);
111
112                    return format!(
113                        "{}\n[... {} bytes truncated to fit CLI argument limit ...]\n{}",
114                        before_marker,
115                        keep_from + clean_start,
116                        &truncated_content[clean_start..]
117                    );
118                }
119            }
120        }
121    }
122
123    // Fallback: truncate from the middle, preserving start and end
124    let keep_start = MAX_PROMPT_SIZE / 3;
125    let keep_end = MAX_PROMPT_SIZE / 3;
126    let start_part = &prompt[..keep_start];
127    let end_part = &prompt[prompt.len() - keep_end..];
128
129    // Find clean line boundaries
130    let start_end = start_part.rfind('\n').map(|i| i + 1).unwrap_or(keep_start);
131    let end_start = end_part.find('\n').map(|i| i + 1).unwrap_or(0);
132
133    format!(
134        "{}\n\n[... {} bytes truncated to fit CLI argument limit ...]\n\n{}",
135        &prompt[..start_end],
136        prompt.len() - start_end - (keep_end - end_start),
137        &end_part[end_start..]
138    )
139}
140
141impl<R: Read> StreamingLineReader<R> {
142    /// Create a new streaming line reader with a small buffer for low latency.
143    fn new(inner: R) -> Self {
144        // Use a smaller buffer (1KB) than default (8KB) for lower latency.
145        // This trades slightly more syscalls for faster response to newlines.
146        const BUFFER_SIZE: usize = 1024;
147        Self {
148            inner: BufReader::with_capacity(BUFFER_SIZE, inner),
149            buffer: Vec::new(),
150            consumed: 0,
151        }
152    }
153
154    /// Fill the internal buffer from the underlying reader.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if the buffer would exceed `MAX_BUFFER_SIZE`.
159    /// This prevents memory exhaustion from malicious input that never contains newlines.
160    fn fill_buffer(&mut self) -> io::Result<usize> {
161        // Check if we're approaching the limit before reading more
162        let current_size = self.buffer.len() - self.consumed;
163        if current_size >= MAX_BUFFER_SIZE {
164            return Err(io::Error::other(format!(
165                "StreamingLineReader buffer exceeded maximum size of {MAX_BUFFER_SIZE} bytes. \
166                This may indicate malformed input or an agent that is not sending newlines."
167            )));
168        }
169
170        let mut read_buf = [0u8; 256];
171        let n = self.inner.read(&mut read_buf)?;
172        if n > 0 {
173            // Check if adding this data would exceed the limit
174            let new_size = current_size + n;
175            if new_size > MAX_BUFFER_SIZE {
176                return Err(io::Error::other(format!(
177                    "StreamingLineReader buffer would exceed maximum size of {MAX_BUFFER_SIZE} bytes. \
178                    This may indicate malformed input or an agent that is not sending newlines."
179                )));
180            }
181            self.buffer.extend_from_slice(&read_buf[..n]);
182        }
183        Ok(n)
184    }
185}
186
187impl<R: Read> Read for StreamingLineReader<R> {
188    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
189        // First, consume from the buffer
190        let available = self.buffer.len() - self.consumed;
191        if available > 0 {
192            let to_copy = available.min(buf.len());
193            buf[..to_copy].copy_from_slice(&self.buffer[self.consumed..self.consumed + to_copy]);
194            self.consumed += to_copy;
195
196            // Compact the buffer if we've consumed everything
197            if self.consumed == self.buffer.len() {
198                self.buffer.clear();
199                self.consumed = 0;
200            }
201            return Ok(to_copy);
202        }
203
204        // Buffer empty - read directly from underlying reader
205        self.inner.read(buf)
206    }
207}
208
209impl<R: Read> BufRead for StreamingLineReader<R> {
210    fn fill_buf(&mut self) -> io::Result<&[u8]> {
211        const MAX_ATTEMPTS: usize = 8; // Prevent infinite loop
212
213        // If we have unconsumed data, return it
214        if self.consumed < self.buffer.len() {
215            return Ok(&self.buffer[self.consumed..]);
216        }
217
218        // Buffer was fully consumed - clear and try to read more
219        self.buffer.clear();
220        self.consumed = 0;
221
222        // Try to fill the buffer with at least some data
223        let mut total_read = 0;
224        for _ in 0..MAX_ATTEMPTS {
225            match self.fill_buffer()? {
226                0 if total_read == 0 => return Ok(&[]), // EOF
227                0 => break,                             // No more data available right now
228                n => {
229                    total_read += n;
230                    // Check if we have a newline
231                    if self.buffer.contains(&b'\n') {
232                        break;
233                    }
234                }
235            }
236        }
237
238        Ok(&self.buffer[self.consumed..])
239    }
240
241    fn consume(&mut self, amt: usize) {
242        self.consumed = (self.consumed + amt).min(self.buffer.len());
243
244        // Compact the buffer if we've consumed everything
245        if self.consumed == self.buffer.len() {
246            self.buffer.clear();
247            self.consumed = 0;
248        }
249    }
250}
251
252use super::clipboard::get_platform_clipboard_command;
253use super::types::CommandResult;
254
255/// A single prompt-based agent invocation.
256pub struct PromptCommand<'a> {
257    pub label: &'a str,
258    pub display_name: &'a str,
259    pub cmd_str: &'a str,
260    pub prompt: &'a str,
261    pub logfile: &'a str,
262    pub parser_type: JsonParserType,
263    pub env_vars: &'a std::collections::HashMap<String, String>,
264}
265
266/// Runtime services required for running agent commands.
267pub struct PipelineRuntime<'a> {
268    pub timer: &'a mut Timer,
269    pub logger: &'a Logger,
270    pub colors: &'a Colors,
271    pub config: &'a Config,
272    /// Optional agent executor for mocking subprocess execution in tests.
273    #[cfg(any(test, feature = "test-utils"))]
274    pub agent_executor: Option<Arc<dyn super::test_trait::AgentExecutor>>,
275}
276
277/// Command configuration for building an agent command.
278struct CommandConfig<'a> {
279    cmd_str: &'a str,
280    prompt: &'a str,
281    env_vars: &'a std::collections::HashMap<String, String>,
282    logfile: &'a str,
283    parser_type: JsonParserType,
284}
285
286/// Saves the prompt to a file and optionally copies it to the clipboard.
287fn save_prompt_to_file_and_clipboard(
288    prompt: &str,
289    prompt_path: &std::path::PathBuf,
290    interactive: bool,
291    logger: &Logger,
292    colors: Colors,
293) -> io::Result<()> {
294    // Save prompt to file
295    if let Some(parent) = prompt_path.parent() {
296        fs::create_dir_all(parent)?;
297    }
298    fs::write(prompt_path, prompt)?;
299    logger.info(&format!(
300        "Prompt saved to {}{}{}",
301        colors.cyan(),
302        prompt_path.display(),
303        colors.reset()
304    ));
305
306    // Copy to clipboard if interactive
307    if interactive {
308        if let Some(clipboard_cmd) = get_platform_clipboard_command() {
309            if let Ok(mut child) = Command::new(clipboard_cmd.binary)
310                .args(clipboard_cmd.args)
311                .stdin(Stdio::piped())
312                .spawn()
313            {
314                if let Some(mut stdin) = child.stdin.take() {
315                    let _ = stdin.write_all(prompt.as_bytes());
316                }
317                let _ = child.wait();
318                logger.info(&format!(
319                    "Prompt copied to clipboard {}({}){}",
320                    colors.dim(),
321                    clipboard_cmd.paste_hint,
322                    colors.reset()
323                ));
324            }
325        }
326    }
327    Ok(())
328}
329
330/// Builds and configures the agent command with environment variables.
331fn build_agent_command(
332    config: &CommandConfig<'_>,
333    anthropic_env_vars_to_sanitize: &[&str],
334    logger: &Logger,
335    colors: Colors,
336) -> io::Result<(Vec<String>, Command)> {
337    let argv = split_command(config.cmd_str)?;
338    if argv.is_empty() || config.cmd_str.trim().is_empty() {
339        return Err(io::Error::new(
340            io::ErrorKind::InvalidInput,
341            "Agent command is empty or contains only whitespace",
342        ));
343    }
344
345    let mut argv_for_log = argv.clone();
346    argv_for_log.push("<PROMPT>".to_string());
347    let display_cmd = truncate_text(&format_argv_for_log(&argv_for_log), 160);
348    logger.info(&format!(
349        "Executing: {}{}{}",
350        colors.dim(),
351        display_cmd,
352        colors.reset()
353    ));
354
355    // GLM-specific debug logging (only for CCS/Claude-based GLM)
356    let is_glm_cmd = is_glm_like_agent(config.cmd_str);
357    if is_glm_cmd {
358        logger.info(&format!("GLM command details: {display_cmd}"));
359        if argv.iter().any(|arg| arg == "-p") {
360            logger.info("GLM command includes '-p' flag (correct)");
361        } else {
362            logger.warn("GLM command may be missing '-p' flag");
363        }
364    }
365
366    let _uses_json = config.parser_type != JsonParserType::Generic || argv_requests_json(&argv);
367    logger.info(&format!("Using {} parser...", config.parser_type));
368
369    if let Some(parent) = Path::new(config.logfile).parent() {
370        fs::create_dir_all(parent)?;
371    }
372    File::create(config.logfile)?;
373
374    let mut command = Command::new(&argv[0]);
375    command.args(&argv[1..]);
376
377    // Truncate prompt if it exceeds safe CLI argument limits to prevent E2BIG errors
378    let prompt = truncate_prompt_if_needed(config.prompt, logger);
379    command.arg(&prompt);
380
381    // Inject environment variables from agent config
382    if !config.env_vars.is_empty() {
383        logger.info(&format!(
384            "Injecting {} environment variable(s) into subprocess",
385            config.env_vars.len()
386        ));
387        for key in config.env_vars.keys() {
388            logger.info(&format!("  - {key}"));
389        }
390        for (key, value) in config.env_vars {
391            command.env(key, value);
392        }
393    }
394
395    // Set agent-side buffering disabling environment variables for real-time streaming.
396    // These are only set if not already explicitly configured by the user's env_vars.
397    // This mitigates the issue where AI agents buffer their stdout instead of streaming.
398    //
399    // Note: NODE_ENV is set to "production" (not "development") because production mode
400    // disables buffering in Node.js applications. This is necessary for real-time streaming
401    // but may affect error stack traces and logging levels in Node.js agents.
402    let buffering_vars = [("PYTHONUNBUFFERED", "1"), ("NODE_ENV", "production")];
403    for (key, value) in buffering_vars {
404        if !config.env_vars.contains_key(key) {
405            command.env(key, value);
406        }
407    }
408
409    // Clear problematic Anthropic env vars that weren't explicitly set by the agent.
410    for &var in anthropic_env_vars_to_sanitize {
411        if !config.env_vars.contains_key(var) {
412            command.env_remove(var);
413        }
414    }
415
416    Ok((argv, command))
417}
418
419/// Spawns the agent process, converting ALL spawn errors into `CommandResult`.
420///
421/// This ensures that any failure to spawn the agent process is handled by the
422/// fallback system instead of crashing the pipeline. Common errors:
423/// - `NotFound` (exit code 127): Command not found
424/// - `PermissionDenied` (exit code 126): Permission denied  
425/// - `ArgumentListTooLong` (exit code 7): Prompt too large for command-line argument
426/// - Other errors (exit code 1): Converted to CommandResult for fallback handling
427fn spawn_agent_process(mut command: Command, argv: &[String]) -> Result<Child, CommandResult> {
428    match command
429        .stdin(Stdio::null())
430        .stdout(Stdio::piped())
431        .stderr(Stdio::piped())
432        .spawn()
433    {
434        Ok(child) => Ok(child),
435        Err(e) => {
436            // Convert ALL spawn errors to CommandResult so fallback can handle them.
437            // This prevents any spawn failure from crashing the entire pipeline.
438            let (exit_code, detail) = match e.kind() {
439                io::ErrorKind::NotFound => (127, "command not found"),
440                io::ErrorKind::PermissionDenied => (126, "permission denied"),
441                io::ErrorKind::ArgumentListTooLong => {
442                    (7, "argument list too long (prompt exceeds OS limit)")
443                }
444                io::ErrorKind::InvalidInput => (22, "invalid input"),
445                io::ErrorKind::OutOfMemory => (12, "out of memory"),
446                _ => (1, "spawn failed"),
447            };
448
449            Err(CommandResult {
450                exit_code,
451                stderr: format!("{}: {} - {}", argv[0], detail, e),
452            })
453        }
454    }
455}
456
457/// Streams agent output based on parser type.
458fn stream_agent_output(
459    stdout: ChildStdout,
460    cmd: &PromptCommand<'_>,
461    runtime: &PipelineRuntime<'_>,
462) -> io::Result<()> {
463    // Use StreamingLineReader for real-time streaming instead of BufReader::lines().
464    // StreamingLineReader yields lines immediately when newlines are found,
465    // enabling character-by-character streaming for agents that output NDJSON gradually.
466    let reader = StreamingLineReader::new(stdout);
467
468    if cmd.parser_type != JsonParserType::Generic
469        || argv_requests_json(&split_command(cmd.cmd_str)?)
470    {
471        let stdout_io = io::stdout();
472        let mut out = stdout_io.lock();
473
474        match cmd.parser_type {
475            JsonParserType::Claude => {
476                let p = crate::json_parser::ClaudeParser::new(
477                    *runtime.colors,
478                    runtime.config.verbosity,
479                )
480                .with_display_name(cmd.display_name)
481                .with_log_file(cmd.logfile)
482                .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
483                p.parse_stream(reader)?;
484            }
485            JsonParserType::Codex => {
486                let p =
487                    crate::json_parser::CodexParser::new(*runtime.colors, runtime.config.verbosity)
488                        .with_display_name(cmd.display_name)
489                        .with_log_file(cmd.logfile)
490                        .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
491                p.parse_stream(reader)?;
492            }
493            JsonParserType::Gemini => {
494                let p = crate::json_parser::GeminiParser::new(
495                    *runtime.colors,
496                    runtime.config.verbosity,
497                )
498                .with_display_name(cmd.display_name)
499                .with_log_file(cmd.logfile)
500                .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
501                p.parse_stream(reader)?;
502            }
503            JsonParserType::OpenCode => {
504                let p = crate::json_parser::OpenCodeParser::new(
505                    *runtime.colors,
506                    runtime.config.verbosity,
507                )
508                .with_display_name(cmd.display_name)
509                .with_log_file(cmd.logfile)
510                .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
511                p.parse_stream(reader)?;
512            }
513            JsonParserType::Generic => {
514                let mut logfile = OpenOptions::new()
515                    .create(true)
516                    .append(true)
517                    .open(cmd.logfile)?;
518
519                let mut buf = String::new();
520                for line in reader.lines() {
521                    let line = line?;
522                    // Write raw line to log file for extraction
523                    writeln!(logfile, "{line}")?;
524                    buf.push_str(&line);
525                    buf.push('\n');
526                }
527                logfile.flush()?;
528                // Ensure data is written to disk before continuing
529                // This prevents race conditions where extraction runs before OS commits writes
530                let _ = logfile.sync_all();
531
532                let formatted = format_generic_json_for_display(&buf, runtime.config.verbosity);
533                out.write_all(formatted.as_bytes())?;
534            }
535        }
536    } else {
537        let mut logfile = OpenOptions::new()
538            .create(true)
539            .append(true)
540            .open(cmd.logfile)?;
541
542        let stdout_io = io::stdout();
543        let mut out = stdout_io.lock();
544
545        for line in reader.lines() {
546            let line = line?;
547            writeln!(out, "{line}")?;
548            writeln!(logfile, "{line}")?;
549        }
550        logfile.flush()?;
551        // Ensure data is written to disk before continuing
552        // This prevents race conditions where extraction runs before OS commits writes
553        let _ = logfile.sync_all();
554    }
555    Ok(())
556}
557
558/// Waits for process completion and collects stderr output.
559fn wait_for_completion_and_collect_stderr(
560    mut child: Child,
561    stderr_join_handle: Option<std::thread::JoinHandle<io::Result<String>>>,
562    runtime: &PipelineRuntime<'_>,
563) -> io::Result<(i32, String)> {
564    let status = child.wait()?;
565    let exit_code = status.code().unwrap_or(1);
566
567    if status.code().is_none() && runtime.config.verbosity.is_debug() {
568        runtime
569            .logger
570            .warn("Process terminated by signal (no exit code), treating as failure");
571    }
572
573    let stderr_output = match stderr_join_handle {
574        Some(handle) => match handle.join() {
575            Ok(result) => result?,
576            Err(panic_payload) => {
577                let panic_msg = panic_payload.downcast_ref::<String>().map_or_else(
578                    || {
579                        panic_payload.downcast_ref::<&str>().map_or_else(
580                            || "<unknown panic>".to_string(),
581                            std::string::ToString::to_string,
582                        )
583                    },
584                    std::clone::Clone::clone,
585                );
586                runtime.logger.warn(&format!(
587                    "Stderr collection thread panicked: {panic_msg}. This may indicate a bug."
588                ));
589                String::new()
590            }
591        },
592        None => String::new(),
593    };
594
595    if !stderr_output.is_empty() && runtime.config.verbosity.is_debug() {
596        runtime.logger.warn(&format!(
597            "Agent stderr output detected ({} bytes):",
598            stderr_output.len()
599        ));
600        for (i, line) in stderr_output.lines().take(5).enumerate() {
601            runtime.logger.info(&format!("  stderr[{i}]: {line}"));
602        }
603        if stderr_output.lines().count() > 5 {
604            runtime.logger.info(&format!(
605                "  ... ({} more lines, see log file for full output)",
606                stderr_output.lines().count() - 5
607            ));
608        }
609    }
610
611    Ok((exit_code, stderr_output))
612}
613
614/// Run a command with a prompt argument.
615///
616/// This is an internal helper for `run_with_fallback`.
617pub fn run_with_prompt(
618    cmd: &PromptCommand<'_>,
619    runtime: &mut PipelineRuntime<'_>,
620) -> io::Result<CommandResult> {
621    const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
622        "ANTHROPIC_API_KEY",
623        "ANTHROPIC_BASE_URL",
624        "ANTHROPIC_AUTH_TOKEN",
625        "ANTHROPIC_MODEL",
626        "ANTHROPIC_DEFAULT_HAIKU_MODEL",
627        "ANTHROPIC_DEFAULT_OPUS_MODEL",
628        "ANTHROPIC_DEFAULT_SONNET_MODEL",
629    ];
630
631    runtime.timer.start_phase();
632    runtime.logger.step(&format!(
633        "{}{}{}",
634        runtime.colors.bold(),
635        cmd.label,
636        runtime.colors.reset()
637    ));
638
639    save_prompt_to_file_and_clipboard(
640        cmd.prompt,
641        &runtime.config.prompt_path,
642        runtime.config.behavior.interactive,
643        runtime.logger,
644        *runtime.colors,
645    )?;
646
647    // Use agent executor if provided (for testing)
648    #[cfg(any(test, feature = "test-utils"))]
649    {
650        if let Some(executor) = runtime.agent_executor.clone() {
651            return run_with_agent_executor(cmd, runtime, &executor);
652        }
653    }
654
655    run_with_subprocess(cmd, runtime, ANTHROPIC_ENV_VARS_TO_SANITIZE)
656}
657
658/// Run agent using the real subprocess execution.
659#[cfg(not(any(test, feature = "test-utils")))]
660fn run_with_subprocess(
661    cmd: &PromptCommand<'_>,
662    runtime: &mut PipelineRuntime<'_>,
663    anthropic_env_vars_to_sanitize: &[&str],
664) -> io::Result<CommandResult> {
665    let (argv, command) = build_agent_command(
666        &CommandConfig {
667            cmd_str: cmd.cmd_str,
668            prompt: cmd.prompt,
669            env_vars: cmd.env_vars,
670            logfile: cmd.logfile,
671            parser_type: cmd.parser_type,
672        },
673        anthropic_env_vars_to_sanitize,
674        runtime.logger,
675        *runtime.colors,
676    )?;
677
678    let mut child = match spawn_agent_process(command, &argv) {
679        Ok(child) => child,
680        Err(result) => return Ok(result),
681    };
682
683    let stdout = child
684        .stdout
685        .take()
686        .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
687
688    let stderr_join_handle = child.stderr.take().map(|stderr| {
689        std::thread::spawn(move || -> io::Result<String> {
690            const STDERR_MAX_BYTES: usize = 512 * 1024;
691
692            let mut reader = BufReader::new(stderr);
693            let mut buf = [0u8; 8192];
694            let mut collected = Vec::<u8>::new();
695            let mut truncated = false;
696
697            loop {
698                let n = reader.read(&mut buf)?;
699                if n == 0 {
700                    break;
701                }
702
703                let remaining = STDERR_MAX_BYTES.saturating_sub(collected.len());
704                if remaining == 0 {
705                    truncated = true;
706                    break;
707                }
708
709                let to_take = remaining.min(n);
710                collected.extend_from_slice(&buf[..to_take]);
711                if to_take < n {
712                    truncated = true;
713                    break;
714                }
715            }
716
717            let mut stderr_output = String::from_utf8_lossy(&collected).into_owned();
718            if truncated {
719                if !stderr_output.ends_with('\n') {
720                    stderr_output.push('\n');
721                }
722                stderr_output.push_str("<stderr truncated>");
723            }
724
725            Ok(stderr_output)
726        })
727    });
728
729    stream_agent_output(stdout, cmd, runtime)?;
730
731    let (exit_code, stderr_output) =
732        wait_for_completion_and_collect_stderr(child, stderr_join_handle, runtime)?;
733
734    if runtime.config.verbosity.is_verbose() {
735        runtime.logger.info(&format!(
736            "Phase elapsed: {}",
737            runtime.timer.phase_elapsed_formatted()
738        ));
739    }
740
741    Ok(CommandResult {
742        exit_code,
743        stderr: stderr_output,
744    })
745}
746
747/// Run agent using the real subprocess execution.
748#[cfg(any(test, feature = "test-utils"))]
749fn run_with_subprocess(
750    cmd: &PromptCommand<'_>,
751    runtime: &mut PipelineRuntime<'_>,
752    anthropic_env_vars_to_sanitize: &[&str],
753) -> io::Result<CommandResult> {
754    let (argv, command) = build_agent_command(
755        &CommandConfig {
756            cmd_str: cmd.cmd_str,
757            prompt: cmd.prompt,
758            env_vars: cmd.env_vars,
759            logfile: cmd.logfile,
760            parser_type: cmd.parser_type,
761        },
762        anthropic_env_vars_to_sanitize,
763        runtime.logger,
764        *runtime.colors,
765    )?;
766
767    let mut child = match spawn_agent_process(command, &argv) {
768        Ok(child) => child,
769        Err(result) => return Ok(result),
770    };
771
772    let stdout = child
773        .stdout
774        .take()
775        .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
776
777    let stderr_join_handle = child.stderr.take().map(|stderr| {
778        std::thread::spawn(move || -> io::Result<String> {
779            const STDERR_MAX_BYTES: usize = 512 * 1024;
780
781            let mut reader = BufReader::new(stderr);
782            let mut buf = [0u8; 8192];
783            let mut collected = Vec::<u8>::new();
784            let mut truncated = false;
785
786            loop {
787                let n = reader.read(&mut buf)?;
788                if n == 0 {
789                    break;
790                }
791
792                let remaining = STDERR_MAX_BYTES.saturating_sub(collected.len());
793                if remaining == 0 {
794                    truncated = true;
795                    break;
796                }
797
798                let to_take = remaining.min(n);
799                collected.extend_from_slice(&buf[..to_take]);
800                if to_take < n {
801                    truncated = true;
802                    break;
803                }
804            }
805
806            let mut stderr_output = String::from_utf8_lossy(&collected).into_owned();
807            if truncated {
808                if !stderr_output.ends_with('\n') {
809                    stderr_output.push('\n');
810                }
811                stderr_output.push_str("<stderr truncated>");
812            }
813
814            Ok(stderr_output)
815        })
816    });
817
818    stream_agent_output(stdout, cmd, runtime)?;
819
820    let (exit_code, stderr_output) =
821        wait_for_completion_and_collect_stderr(child, stderr_join_handle, runtime)?;
822
823    if runtime.config.verbosity.is_verbose() {
824        runtime.logger.info(&format!(
825            "Phase elapsed: {}",
826            runtime.timer.phase_elapsed_formatted()
827        ));
828    }
829
830    Ok(CommandResult {
831        exit_code,
832        stderr: stderr_output,
833    })
834}
835
/// Run agent using the mocked AgentExecutor (for testing).
///
/// No process is spawned here. The argv is still built through
/// `build_agent_command` so the "Executing (mocked)" log line matches what the
/// real path would print. The `Command` it returns is never run. The request is
/// then handed to `executor` as an owned `AgentCommandConfig`.
///
/// When verbose logging is enabled, the elapsed phase time is logged after the
/// executor returns.
///
/// # Errors
///
/// Propagates any error returned by `build_agent_command` or by
/// `executor.execute`.
#[cfg(any(test, feature = "test-utils"))]
fn run_with_agent_executor(
    cmd: &PromptCommand<'_>,
    runtime: &mut PipelineRuntime<'_>,
    executor: &std::sync::Arc<dyn super::test_trait::AgentExecutor>,
) -> io::Result<CommandResult> {
    use super::test_trait::AgentCommandConfig;

    let palette = *runtime.colors;

    let build_config = CommandConfig {
        cmd_str: cmd.cmd_str,
        prompt: cmd.prompt,
        env_vars: cmd.env_vars,
        logfile: cmd.logfile,
        parser_type: cmd.parser_type,
    };
    // Only the argv matters here (for the log line); the built command is unused.
    let (argv, _built_command) =
        build_agent_command(&build_config, &[], runtime.logger, palette)?;

    let shown_cmd = truncate_text(&format_argv_for_log(&argv), 160);
    runtime.logger.info(&format!(
        "Executing (mocked): {}{}{}",
        palette.dim(),
        shown_cmd,
        palette.reset()
    ));

    // The executor trait takes owned data, so copy every borrowed field.
    let request = AgentCommandConfig {
        cmd: cmd.cmd_str.to_string(),
        prompt: cmd.prompt.to_string(),
        env_vars: cmd.env_vars.clone(),
        parser_type: cmd.parser_type,
        logfile: cmd.logfile.to_string(),
        display_name: cmd.display_name.to_string(),
    };
    let outcome = executor.execute(&request)?;

    if runtime.config.verbosity.is_verbose() {
        let elapsed = runtime.timer.phase_elapsed_formatted();
        runtime.logger.info(&format!("Phase elapsed: {}", elapsed));
    }

    Ok(CommandResult {
        exit_code: outcome.exit_code,
        stderr: outcome.stderr,
    })
}
887
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a `Logger` with fresh `Colors` for tests that need one.
    fn test_logger() -> Logger {
        let colors = Colors::new();
        Logger::new(colors)
    }

    #[test]
    fn test_truncate_prompt_small_content() {
        let logger = test_logger();
        // Short enough that the prompt is expected back unchanged.
        let short_prompt = "This is a small prompt that fits within limits.";

        let output = truncate_prompt_if_needed(short_prompt, &logger);

        assert_eq!(output, short_prompt);
    }

    #[test]
    fn test_truncate_prompt_large_content_with_marker() {
        let logger = test_logger();
        // A short header plus a `---` separator, followed by filler that pushes
        // the total well past MAX_PROMPT_SIZE.
        let header = "Task: Do something\n\n---\n";
        let filler = "x".repeat(MAX_PROMPT_SIZE + 50_000);
        let mut content = String::from(header);
        content.push_str(&filler);

        let output = truncate_prompt_if_needed(&content, &logger);

        assert!(output.len() < content.len(), "oversized prompt should shrink");
        assert!(output.contains("truncated"), "truncation marker expected");
        assert!(output.starts_with("Task:"), "header should be kept");
    }

    #[test]
    fn test_truncate_prompt_large_content_fallback() {
        let logger = test_logger();
        // Oversized content with no section separator at all.
        let content = "a".repeat(MAX_PROMPT_SIZE + 50_000);

        let output = truncate_prompt_if_needed(&content, &logger);

        assert!(output.len() < content.len(), "oversized prompt should shrink");
        assert!(output.contains("truncated"), "truncation marker expected");
    }

    #[test]
    fn test_truncate_prompt_preserves_end() {
        let logger = test_logger();
        // The trailing marker stands in for the most relevant (latest) content.
        let mut content = String::from("Instructions\n\n---\n");
        content.push_str(&"m".repeat(MAX_PROMPT_SIZE));
        content.push_str("\nIMPORTANT_END_MARKER");

        let output = truncate_prompt_if_needed(&content, &logger);

        assert!(
            output.contains("IMPORTANT_END_MARKER"),
            "tail of the prompt should be kept"
        );
    }

    #[test]
    fn test_spawn_agent_process_command_not_found() {
        const MISSING_BINARY: &str = "/nonexistent/command/that/does/not/exist";
        let argv = vec![MISSING_BINARY.to_string()];

        let outcome = spawn_agent_process(Command::new(MISSING_BINARY), &argv);

        // A missing binary is expected as Err(CommandResult) with exit code 127,
        // not as a panic.
        let failure = outcome.expect_err("spawning a missing binary should fail");
        assert_eq!(failure.exit_code, 127);
        assert!(failure.stderr.contains("command not found"));
    }

    #[test]
    fn test_spawn_agent_process_converts_all_errors_to_command_result() {
        // An empty program name cannot be spawned; the failure is expected as a
        // CommandResult so callers can fall back, not as a propagated io::Error.
        let argv = vec!["".to_string()];

        let outcome = spawn_agent_process(Command::new(""), &argv);

        let failure = outcome.expect_err("spawning an empty command should fail");
        assert_ne!(failure.exit_code, 0);
    }
}
980}