ralph_workflow/pipeline/
prompt.rs

1//! Prompt-based command execution.
2
3use crate::agents::{is_glm_like_agent, JsonParserType};
4use crate::common::{format_argv_for_log, split_command, truncate_text};
5use crate::config::Config;
6use crate::logger::Colors;
7use crate::logger::Logger;
8use crate::logger::{argv_requests_json, format_generic_json_for_display};
9use crate::pipeline::Timer;
10use std::fs::{self, File, OpenOptions};
11use std::io::{self, BufRead, BufReader, Read, Write};
12use std::path::Path;
13use std::process::{Child, ChildStdout, Command, Stdio};
14
15#[cfg(any(test, feature = "test-utils"))]
16use std::sync::Arc;
17
18/// A line-oriented reader that processes data as it arrives.
19///
20/// Unlike `BufReader::lines()`, this reader yields lines immediately
21/// when newlines are encountered, without waiting for the buffer to fill.
22/// This enables real-time streaming for agents that output NDJSON gradually.
23///
24/// # Buffer Size Limit
25///
26/// The reader enforces a maximum buffer size to prevent memory exhaustion
27/// from malicious or malformed input that never contains newlines.
28/// If the buffer exceeds this limit, subsequent reads will fail with an error.
29struct StreamingLineReader<R: Read> {
30    inner: BufReader<R>,
31    buffer: Vec<u8>,
32    consumed: usize,
33}
34
35/// Maximum buffer size in bytes to prevent unbounded memory growth.
36///
37/// This limits the impact of agents that output continuous data without newlines.
38/// The value of 1 MiB was chosen to:
39/// - Handle most legitimate JSON documents (typically < 100KB)
40/// - Allow for reasonably long single-line JSON outputs
41/// - Prevent memory exhaustion from malicious input
42/// - Keep the buffer size manageable for most systems
43///
44/// If your use case requires larger single-line JSON, consider:
45/// - Modifying your agent to output NDJSON (newline-delimited JSON)
46/// - Adjusting this constant and rebuilding
47const MAX_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB
48
49impl<R: Read> StreamingLineReader<R> {
50    /// Create a new streaming line reader with a small buffer for low latency.
51    fn new(inner: R) -> Self {
52        // Use a smaller buffer (1KB) than default (8KB) for lower latency.
53        // This trades slightly more syscalls for faster response to newlines.
54        const BUFFER_SIZE: usize = 1024;
55        Self {
56            inner: BufReader::with_capacity(BUFFER_SIZE, inner),
57            buffer: Vec::new(),
58            consumed: 0,
59        }
60    }
61
62    /// Fill the internal buffer from the underlying reader.
63    ///
64    /// # Errors
65    ///
66    /// Returns an error if the buffer would exceed `MAX_BUFFER_SIZE`.
67    /// This prevents memory exhaustion from malicious input that never contains newlines.
68    fn fill_buffer(&mut self) -> io::Result<usize> {
69        // Check if we're approaching the limit before reading more
70        let current_size = self.buffer.len() - self.consumed;
71        if current_size >= MAX_BUFFER_SIZE {
72            return Err(io::Error::other(format!(
73                "StreamingLineReader buffer exceeded maximum size of {MAX_BUFFER_SIZE} bytes. \
74                This may indicate malformed input or an agent that is not sending newlines."
75            )));
76        }
77
78        let mut read_buf = [0u8; 256];
79        let n = self.inner.read(&mut read_buf)?;
80        if n > 0 {
81            // Check if adding this data would exceed the limit
82            let new_size = current_size + n;
83            if new_size > MAX_BUFFER_SIZE {
84                return Err(io::Error::other(format!(
85                    "StreamingLineReader buffer would exceed maximum size of {MAX_BUFFER_SIZE} bytes. \
86                    This may indicate malformed input or an agent that is not sending newlines."
87                )));
88            }
89            self.buffer.extend_from_slice(&read_buf[..n]);
90        }
91        Ok(n)
92    }
93}
94
95impl<R: Read> Read for StreamingLineReader<R> {
96    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
97        // First, consume from the buffer
98        let available = self.buffer.len() - self.consumed;
99        if available > 0 {
100            let to_copy = available.min(buf.len());
101            buf[..to_copy].copy_from_slice(&self.buffer[self.consumed..self.consumed + to_copy]);
102            self.consumed += to_copy;
103
104            // Compact the buffer if we've consumed everything
105            if self.consumed == self.buffer.len() {
106                self.buffer.clear();
107                self.consumed = 0;
108            }
109            return Ok(to_copy);
110        }
111
112        // Buffer empty - read directly from underlying reader
113        self.inner.read(buf)
114    }
115}
116
117impl<R: Read> BufRead for StreamingLineReader<R> {
118    fn fill_buf(&mut self) -> io::Result<&[u8]> {
119        const MAX_ATTEMPTS: usize = 8; // Prevent infinite loop
120
121        // If we have unconsumed data, return it
122        if self.consumed < self.buffer.len() {
123            return Ok(&self.buffer[self.consumed..]);
124        }
125
126        // Buffer was fully consumed - clear and try to read more
127        self.buffer.clear();
128        self.consumed = 0;
129
130        // Try to fill the buffer with at least some data
131        let mut total_read = 0;
132        for _ in 0..MAX_ATTEMPTS {
133            match self.fill_buffer()? {
134                0 if total_read == 0 => return Ok(&[]), // EOF
135                0 => break,                             // No more data available right now
136                n => {
137                    total_read += n;
138                    // Check if we have a newline
139                    if self.buffer.contains(&b'\n') {
140                        break;
141                    }
142                }
143            }
144        }
145
146        Ok(&self.buffer[self.consumed..])
147    }
148
149    fn consume(&mut self, amt: usize) {
150        self.consumed = (self.consumed + amt).min(self.buffer.len());
151
152        // Compact the buffer if we've consumed everything
153        if self.consumed == self.buffer.len() {
154            self.buffer.clear();
155            self.consumed = 0;
156        }
157    }
158}
159
160use super::clipboard::get_platform_clipboard_command;
161use super::types::CommandResult;
162
163/// A single prompt-based agent invocation.
164pub struct PromptCommand<'a> {
165    pub label: &'a str,
166    pub display_name: &'a str,
167    pub cmd_str: &'a str,
168    pub prompt: &'a str,
169    pub logfile: &'a str,
170    pub parser_type: JsonParserType,
171    pub env_vars: &'a std::collections::HashMap<String, String>,
172}
173
174/// Runtime services required for running agent commands.
175pub struct PipelineRuntime<'a> {
176    pub timer: &'a mut Timer,
177    pub logger: &'a Logger,
178    pub colors: &'a Colors,
179    pub config: &'a Config,
180    /// Optional agent executor for mocking subprocess execution in tests.
181    #[cfg(any(test, feature = "test-utils"))]
182    pub agent_executor: Option<Arc<dyn super::test_trait::AgentExecutor>>,
183}
184
185/// Command configuration for building an agent command.
186struct CommandConfig<'a> {
187    cmd_str: &'a str,
188    prompt: &'a str,
189    env_vars: &'a std::collections::HashMap<String, String>,
190    logfile: &'a str,
191    parser_type: JsonParserType,
192}
193
194/// Saves the prompt to a file and optionally copies it to the clipboard.
195fn save_prompt_to_file_and_clipboard(
196    prompt: &str,
197    prompt_path: &std::path::PathBuf,
198    interactive: bool,
199    logger: &Logger,
200    colors: Colors,
201) -> io::Result<()> {
202    // Save prompt to file
203    if let Some(parent) = prompt_path.parent() {
204        fs::create_dir_all(parent)?;
205    }
206    fs::write(prompt_path, prompt)?;
207    logger.info(&format!(
208        "Prompt saved to {}{}{}",
209        colors.cyan(),
210        prompt_path.display(),
211        colors.reset()
212    ));
213
214    // Copy to clipboard if interactive
215    if interactive {
216        if let Some(clipboard_cmd) = get_platform_clipboard_command() {
217            if let Ok(mut child) = Command::new(clipboard_cmd.binary)
218                .args(clipboard_cmd.args)
219                .stdin(Stdio::piped())
220                .spawn()
221            {
222                if let Some(mut stdin) = child.stdin.take() {
223                    let _ = stdin.write_all(prompt.as_bytes());
224                }
225                let _ = child.wait();
226                logger.info(&format!(
227                    "Prompt copied to clipboard {}({}){}",
228                    colors.dim(),
229                    clipboard_cmd.paste_hint,
230                    colors.reset()
231                ));
232            }
233        }
234    }
235    Ok(())
236}
237
238/// Builds and configures the agent command with environment variables.
239fn build_agent_command(
240    config: &CommandConfig<'_>,
241    anthropic_env_vars_to_sanitize: &[&str],
242    logger: &Logger,
243    colors: Colors,
244) -> io::Result<(Vec<String>, Command)> {
245    let argv = split_command(config.cmd_str)?;
246    if argv.is_empty() || config.cmd_str.trim().is_empty() {
247        return Err(io::Error::new(
248            io::ErrorKind::InvalidInput,
249            "Agent command is empty or contains only whitespace",
250        ));
251    }
252
253    let mut argv_for_log = argv.clone();
254    argv_for_log.push("<PROMPT>".to_string());
255    let display_cmd = truncate_text(&format_argv_for_log(&argv_for_log), 160);
256    logger.info(&format!(
257        "Executing: {}{}{}",
258        colors.dim(),
259        display_cmd,
260        colors.reset()
261    ));
262
263    // GLM-specific debug logging (only for CCS/Claude-based GLM)
264    let is_glm_cmd = is_glm_like_agent(config.cmd_str);
265    if is_glm_cmd {
266        logger.info(&format!("GLM command details: {display_cmd}"));
267        if argv.iter().any(|arg| arg == "-p") {
268            logger.info("GLM command includes '-p' flag (correct)");
269        } else {
270            logger.warn("GLM command may be missing '-p' flag");
271        }
272    }
273
274    let _uses_json = config.parser_type != JsonParserType::Generic || argv_requests_json(&argv);
275    logger.info(&format!("Using {} parser...", config.parser_type));
276
277    if let Some(parent) = Path::new(config.logfile).parent() {
278        fs::create_dir_all(parent)?;
279    }
280    File::create(config.logfile)?;
281
282    let mut command = Command::new(&argv[0]);
283    command.args(&argv[1..]);
284    command.arg(config.prompt);
285
286    // Inject environment variables from agent config
287    if !config.env_vars.is_empty() {
288        logger.info(&format!(
289            "Injecting {} environment variable(s) into subprocess",
290            config.env_vars.len()
291        ));
292        for key in config.env_vars.keys() {
293            logger.info(&format!("  - {key}"));
294        }
295        for (key, value) in config.env_vars {
296            command.env(key, value);
297        }
298    }
299
300    // Set agent-side buffering disabling environment variables for real-time streaming.
301    // These are only set if not already explicitly configured by the user's env_vars.
302    // This mitigates the issue where AI agents buffer their stdout instead of streaming.
303    //
304    // Note: NODE_ENV is set to "production" (not "development") because production mode
305    // disables buffering in Node.js applications. This is necessary for real-time streaming
306    // but may affect error stack traces and logging levels in Node.js agents.
307    let buffering_vars = [("PYTHONUNBUFFERED", "1"), ("NODE_ENV", "production")];
308    for (key, value) in buffering_vars {
309        if !config.env_vars.contains_key(key) {
310            command.env(key, value);
311        }
312    }
313
314    // Clear problematic Anthropic env vars that weren't explicitly set by the agent.
315    for &var in anthropic_env_vars_to_sanitize {
316        if !config.env_vars.contains_key(var) {
317            command.env_remove(var);
318        }
319    }
320
321    Ok((argv, command))
322}
323
324/// Spawns the agent process with special error handling for `NotFound` and `PermissionDenied`.
325fn spawn_agent_process(
326    mut command: Command,
327    argv: &[String],
328) -> io::Result<Result<Child, CommandResult>> {
329    match command
330        .stdin(Stdio::null())
331        .stdout(Stdio::piped())
332        .stderr(Stdio::piped())
333        .spawn()
334    {
335        Ok(child) => Ok(Ok(child)),
336        Err(e)
337            if matches!(
338                e.kind(),
339                io::ErrorKind::NotFound | io::ErrorKind::PermissionDenied
340            ) =>
341        {
342            let exit_code = if e.kind() == io::ErrorKind::NotFound {
343                127
344            } else {
345                126
346            };
347            Ok(Err(CommandResult {
348                exit_code,
349                stderr: format!("{}: {}", argv[0], e),
350            }))
351        }
352        Err(e) => Err(e),
353    }
354}
355
356/// Streams agent output based on parser type.
357fn stream_agent_output(
358    stdout: ChildStdout,
359    cmd: &PromptCommand<'_>,
360    runtime: &PipelineRuntime<'_>,
361) -> io::Result<()> {
362    // Use StreamingLineReader for real-time streaming instead of BufReader::lines().
363    // StreamingLineReader yields lines immediately when newlines are found,
364    // enabling character-by-character streaming for agents that output NDJSON gradually.
365    let reader = StreamingLineReader::new(stdout);
366
367    if cmd.parser_type != JsonParserType::Generic
368        || argv_requests_json(&split_command(cmd.cmd_str)?)
369    {
370        let stdout_io = io::stdout();
371        let mut out = stdout_io.lock();
372
373        match cmd.parser_type {
374            JsonParserType::Claude => {
375                let p = crate::json_parser::ClaudeParser::new(
376                    *runtime.colors,
377                    runtime.config.verbosity,
378                )
379                .with_display_name(cmd.display_name)
380                .with_log_file(cmd.logfile)
381                .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
382                p.parse_stream(reader)?;
383            }
384            JsonParserType::Codex => {
385                let p =
386                    crate::json_parser::CodexParser::new(*runtime.colors, runtime.config.verbosity)
387                        .with_display_name(cmd.display_name)
388                        .with_log_file(cmd.logfile)
389                        .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
390                p.parse_stream(reader)?;
391            }
392            JsonParserType::Gemini => {
393                let p = crate::json_parser::GeminiParser::new(
394                    *runtime.colors,
395                    runtime.config.verbosity,
396                )
397                .with_display_name(cmd.display_name)
398                .with_log_file(cmd.logfile)
399                .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
400                p.parse_stream(reader)?;
401            }
402            JsonParserType::OpenCode => {
403                let p = crate::json_parser::OpenCodeParser::new(
404                    *runtime.colors,
405                    runtime.config.verbosity,
406                )
407                .with_display_name(cmd.display_name)
408                .with_log_file(cmd.logfile)
409                .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
410                p.parse_stream(reader)?;
411            }
412            JsonParserType::Generic => {
413                let mut logfile = OpenOptions::new()
414                    .create(true)
415                    .append(true)
416                    .open(cmd.logfile)?;
417
418                let mut buf = String::new();
419                for line in reader.lines() {
420                    let line = line?;
421                    // Write raw line to log file for extraction
422                    writeln!(logfile, "{line}")?;
423                    buf.push_str(&line);
424                    buf.push('\n');
425                }
426                logfile.flush()?;
427                // Ensure data is written to disk before continuing
428                // This prevents race conditions where extraction runs before OS commits writes
429                let _ = logfile.sync_all();
430
431                let formatted = format_generic_json_for_display(&buf, runtime.config.verbosity);
432                out.write_all(formatted.as_bytes())?;
433            }
434        }
435    } else {
436        let mut logfile = OpenOptions::new()
437            .create(true)
438            .append(true)
439            .open(cmd.logfile)?;
440
441        let stdout_io = io::stdout();
442        let mut out = stdout_io.lock();
443
444        for line in reader.lines() {
445            let line = line?;
446            writeln!(out, "{line}")?;
447            writeln!(logfile, "{line}")?;
448        }
449        logfile.flush()?;
450        // Ensure data is written to disk before continuing
451        // This prevents race conditions where extraction runs before OS commits writes
452        let _ = logfile.sync_all();
453    }
454    Ok(())
455}
456
457/// Waits for process completion and collects stderr output.
458fn wait_for_completion_and_collect_stderr(
459    mut child: Child,
460    stderr_join_handle: Option<std::thread::JoinHandle<io::Result<String>>>,
461    runtime: &PipelineRuntime<'_>,
462) -> io::Result<(i32, String)> {
463    let status = child.wait()?;
464    let exit_code = status.code().unwrap_or(1);
465
466    if status.code().is_none() && runtime.config.verbosity.is_debug() {
467        runtime
468            .logger
469            .warn("Process terminated by signal (no exit code), treating as failure");
470    }
471
472    let stderr_output = match stderr_join_handle {
473        Some(handle) => match handle.join() {
474            Ok(result) => result?,
475            Err(panic_payload) => {
476                let panic_msg = panic_payload.downcast_ref::<String>().map_or_else(
477                    || {
478                        panic_payload.downcast_ref::<&str>().map_or_else(
479                            || "<unknown panic>".to_string(),
480                            std::string::ToString::to_string,
481                        )
482                    },
483                    std::clone::Clone::clone,
484                );
485                runtime.logger.warn(&format!(
486                    "Stderr collection thread panicked: {panic_msg}. This may indicate a bug."
487                ));
488                String::new()
489            }
490        },
491        None => String::new(),
492    };
493
494    if !stderr_output.is_empty() && runtime.config.verbosity.is_debug() {
495        runtime.logger.warn(&format!(
496            "Agent stderr output detected ({} bytes):",
497            stderr_output.len()
498        ));
499        for (i, line) in stderr_output.lines().take(5).enumerate() {
500            runtime.logger.info(&format!("  stderr[{i}]: {line}"));
501        }
502        if stderr_output.lines().count() > 5 {
503            runtime.logger.info(&format!(
504                "  ... ({} more lines, see log file for full output)",
505                stderr_output.lines().count() - 5
506            ));
507        }
508    }
509
510    Ok((exit_code, stderr_output))
511}
512
513/// Run a command with a prompt argument.
514///
515/// This is an internal helper for `run_with_fallback`.
516pub fn run_with_prompt(
517    cmd: &PromptCommand<'_>,
518    runtime: &mut PipelineRuntime<'_>,
519) -> io::Result<CommandResult> {
520    const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
521        "ANTHROPIC_API_KEY",
522        "ANTHROPIC_BASE_URL",
523        "ANTHROPIC_AUTH_TOKEN",
524        "ANTHROPIC_MODEL",
525        "ANTHROPIC_DEFAULT_HAIKU_MODEL",
526        "ANTHROPIC_DEFAULT_OPUS_MODEL",
527        "ANTHROPIC_DEFAULT_SONNET_MODEL",
528    ];
529
530    runtime.timer.start_phase();
531    runtime.logger.step(&format!(
532        "{}{}{}",
533        runtime.colors.bold(),
534        cmd.label,
535        runtime.colors.reset()
536    ));
537
538    save_prompt_to_file_and_clipboard(
539        cmd.prompt,
540        &runtime.config.prompt_path,
541        runtime.config.behavior.interactive,
542        runtime.logger,
543        *runtime.colors,
544    )?;
545
546    // Use agent executor if provided (for testing)
547    #[cfg(any(test, feature = "test-utils"))]
548    {
549        if let Some(executor) = runtime.agent_executor.clone() {
550            return run_with_agent_executor(cmd, runtime, &executor);
551        }
552    }
553
554    run_with_subprocess(cmd, runtime, ANTHROPIC_ENV_VARS_TO_SANITIZE)
555}
556
557/// Run agent using the real subprocess execution.
558#[cfg(not(any(test, feature = "test-utils")))]
559fn run_with_subprocess(
560    cmd: &PromptCommand<'_>,
561    runtime: &mut PipelineRuntime<'_>,
562    anthropic_env_vars_to_sanitize: &[&str],
563) -> io::Result<CommandResult> {
564    let (argv, command) = build_agent_command(
565        &CommandConfig {
566            cmd_str: cmd.cmd_str,
567            prompt: cmd.prompt,
568            env_vars: cmd.env_vars,
569            logfile: cmd.logfile,
570            parser_type: cmd.parser_type,
571        },
572        anthropic_env_vars_to_sanitize,
573        runtime.logger,
574        *runtime.colors,
575    )?;
576
577    let mut child = match spawn_agent_process(command, &argv)? {
578        Ok(child) => child,
579        Err(result) => return Ok(result),
580    };
581
582    let stdout = child
583        .stdout
584        .take()
585        .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
586
587    let stderr_join_handle = child.stderr.take().map(|stderr| {
588        std::thread::spawn(move || -> io::Result<String> {
589            const STDERR_MAX_BYTES: usize = 512 * 1024;
590
591            let mut reader = BufReader::new(stderr);
592            let mut buf = [0u8; 8192];
593            let mut collected = Vec::<u8>::new();
594            let mut truncated = false;
595
596            loop {
597                let n = reader.read(&mut buf)?;
598                if n == 0 {
599                    break;
600                }
601
602                let remaining = STDERR_MAX_BYTES.saturating_sub(collected.len());
603                if remaining == 0 {
604                    truncated = true;
605                    break;
606                }
607
608                let to_take = remaining.min(n);
609                collected.extend_from_slice(&buf[..to_take]);
610                if to_take < n {
611                    truncated = true;
612                    break;
613                }
614            }
615
616            let mut stderr_output = String::from_utf8_lossy(&collected).into_owned();
617            if truncated {
618                if !stderr_output.ends_with('\n') {
619                    stderr_output.push('\n');
620                }
621                stderr_output.push_str("<stderr truncated>");
622            }
623
624            Ok(stderr_output)
625        })
626    });
627
628    stream_agent_output(stdout, cmd, runtime)?;
629
630    let (exit_code, stderr_output) =
631        wait_for_completion_and_collect_stderr(child, stderr_join_handle, runtime)?;
632
633    if runtime.config.verbosity.is_verbose() {
634        runtime.logger.info(&format!(
635            "Phase elapsed: {}",
636            runtime.timer.phase_elapsed_formatted()
637        ));
638    }
639
640    Ok(CommandResult {
641        exit_code,
642        stderr: stderr_output,
643    })
644}
645
646/// Run agent using the real subprocess execution.
647#[cfg(any(test, feature = "test-utils"))]
648fn run_with_subprocess(
649    cmd: &PromptCommand<'_>,
650    runtime: &mut PipelineRuntime<'_>,
651    anthropic_env_vars_to_sanitize: &[&str],
652) -> io::Result<CommandResult> {
653    let (argv, command) = build_agent_command(
654        &CommandConfig {
655            cmd_str: cmd.cmd_str,
656            prompt: cmd.prompt,
657            env_vars: cmd.env_vars,
658            logfile: cmd.logfile,
659            parser_type: cmd.parser_type,
660        },
661        anthropic_env_vars_to_sanitize,
662        runtime.logger,
663        *runtime.colors,
664    )?;
665
666    let mut child = match spawn_agent_process(command, &argv)? {
667        Ok(child) => child,
668        Err(result) => return Ok(result),
669    };
670
671    let stdout = child
672        .stdout
673        .take()
674        .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
675
676    let stderr_join_handle = child.stderr.take().map(|stderr| {
677        std::thread::spawn(move || -> io::Result<String> {
678            const STDERR_MAX_BYTES: usize = 512 * 1024;
679
680            let mut reader = BufReader::new(stderr);
681            let mut buf = [0u8; 8192];
682            let mut collected = Vec::<u8>::new();
683            let mut truncated = false;
684
685            loop {
686                let n = reader.read(&mut buf)?;
687                if n == 0 {
688                    break;
689                }
690
691                let remaining = STDERR_MAX_BYTES.saturating_sub(collected.len());
692                if remaining == 0 {
693                    truncated = true;
694                    break;
695                }
696
697                let to_take = remaining.min(n);
698                collected.extend_from_slice(&buf[..to_take]);
699                if to_take < n {
700                    truncated = true;
701                    break;
702                }
703            }
704
705            let mut stderr_output = String::from_utf8_lossy(&collected).into_owned();
706            if truncated {
707                if !stderr_output.ends_with('\n') {
708                    stderr_output.push('\n');
709                }
710                stderr_output.push_str("<stderr truncated>");
711            }
712
713            Ok(stderr_output)
714        })
715    });
716
717    stream_agent_output(stdout, cmd, runtime)?;
718
719    let (exit_code, stderr_output) =
720        wait_for_completion_and_collect_stderr(child, stderr_join_handle, runtime)?;
721
722    if runtime.config.verbosity.is_verbose() {
723        runtime.logger.info(&format!(
724            "Phase elapsed: {}",
725            runtime.timer.phase_elapsed_formatted()
726        ));
727    }
728
729    Ok(CommandResult {
730        exit_code,
731        stderr: stderr_output,
732    })
733}
734
735/// Run agent using the mocked AgentExecutor (for testing).
736#[cfg(any(test, feature = "test-utils"))]
737fn run_with_agent_executor(
738    cmd: &PromptCommand<'_>,
739    runtime: &mut PipelineRuntime<'_>,
740    executor: &std::sync::Arc<dyn super::test_trait::AgentExecutor>,
741) -> io::Result<CommandResult> {
742    use super::test_trait::AgentCommandConfig;
743
744    let (argv, _command) = build_agent_command(
745        &CommandConfig {
746            cmd_str: cmd.cmd_str,
747            prompt: cmd.prompt,
748            env_vars: cmd.env_vars,
749            logfile: cmd.logfile,
750            parser_type: cmd.parser_type,
751        },
752        &[],
753        runtime.logger,
754        *runtime.colors,
755    )?;
756
757    let display_cmd = truncate_text(&format_argv_for_log(&argv), 160);
758    runtime.logger.info(&format!(
759        "Executing (mocked): {}{}{}",
760        runtime.colors.dim(),
761        display_cmd,
762        runtime.colors.reset()
763    ));
764
765    let result = executor.execute(&AgentCommandConfig {
766        cmd: cmd.cmd_str.to_string(),
767        prompt: cmd.prompt.to_string(),
768        env_vars: cmd.env_vars.clone(),
769        parser_type: cmd.parser_type,
770        logfile: cmd.logfile.to_string(),
771        display_name: cmd.display_name.to_string(),
772    })?;
773
774    if runtime.config.verbosity.is_verbose() {
775        runtime.logger.info(&format!(
776            "Phase elapsed: {}",
777            runtime.timer.phase_elapsed_formatted()
778        ));
779    }
780
781    Ok(CommandResult {
782        exit_code: result.exit_code,
783        stderr: result.stderr,
784    })
785}