1use crate::agents::{is_glm_like_agent, JsonParserType};
4use crate::common::{format_argv_for_log, split_command, truncate_text};
5use crate::config::Config;
6use crate::logger::Colors;
7use crate::logger::Logger;
8use crate::logger::{argv_requests_json, format_generic_json_for_display};
9use crate::pipeline::Timer;
10use std::fs::{self, File, OpenOptions};
11use std::io::{self, BufRead, BufReader, Read, Write};
12use std::path::Path;
13use std::process::{Child, ChildStdout, Command, Stdio};
14
15#[cfg(any(test, feature = "test-utils"))]
16use std::sync::Arc;
17
/// Reader that keeps its own growable buffer of pending bytes on top of a [`BufReader`].
///
/// Bytes pulled from `inner` are appended to `buffer`; `consumed` marks how many of those
/// bytes have already been handed to the caller.
struct StreamingLineReader<R>
where
    R: Read,
{
    /// Buffered handle on the underlying byte source.
    inner: BufReader<R>,
    /// Bytes read from `inner` that have not all been consumed yet.
    buffer: Vec<u8>,
    /// Offset into `buffer` of the first byte that has not been consumed.
    consumed: usize,
}
34
35const MAX_BUFFER_SIZE: usize = 1024 * 1024; impl<R: Read> StreamingLineReader<R> {
50 fn new(inner: R) -> Self {
52 const BUFFER_SIZE: usize = 1024;
55 Self {
56 inner: BufReader::with_capacity(BUFFER_SIZE, inner),
57 buffer: Vec::new(),
58 consumed: 0,
59 }
60 }
61
62 fn fill_buffer(&mut self) -> io::Result<usize> {
69 let current_size = self.buffer.len() - self.consumed;
71 if current_size >= MAX_BUFFER_SIZE {
72 return Err(io::Error::other(format!(
73 "StreamingLineReader buffer exceeded maximum size of {MAX_BUFFER_SIZE} bytes. \
74 This may indicate malformed input or an agent that is not sending newlines."
75 )));
76 }
77
78 let mut read_buf = [0u8; 256];
79 let n = self.inner.read(&mut read_buf)?;
80 if n > 0 {
81 let new_size = current_size + n;
83 if new_size > MAX_BUFFER_SIZE {
84 return Err(io::Error::other(format!(
85 "StreamingLineReader buffer would exceed maximum size of {MAX_BUFFER_SIZE} bytes. \
86 This may indicate malformed input or an agent that is not sending newlines."
87 )));
88 }
89 self.buffer.extend_from_slice(&read_buf[..n]);
90 }
91 Ok(n)
92 }
93}
94
95impl<R: Read> Read for StreamingLineReader<R> {
96 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
97 let available = self.buffer.len() - self.consumed;
99 if available > 0 {
100 let to_copy = available.min(buf.len());
101 buf[..to_copy].copy_from_slice(&self.buffer[self.consumed..self.consumed + to_copy]);
102 self.consumed += to_copy;
103
104 if self.consumed == self.buffer.len() {
106 self.buffer.clear();
107 self.consumed = 0;
108 }
109 return Ok(to_copy);
110 }
111
112 self.inner.read(buf)
114 }
115}
116
117impl<R: Read> BufRead for StreamingLineReader<R> {
118 fn fill_buf(&mut self) -> io::Result<&[u8]> {
119 const MAX_ATTEMPTS: usize = 8; if self.consumed < self.buffer.len() {
123 return Ok(&self.buffer[self.consumed..]);
124 }
125
126 self.buffer.clear();
128 self.consumed = 0;
129
130 let mut total_read = 0;
132 for _ in 0..MAX_ATTEMPTS {
133 match self.fill_buffer()? {
134 0 if total_read == 0 => return Ok(&[]), 0 => break, n => {
137 total_read += n;
138 if self.buffer.contains(&b'\n') {
140 break;
141 }
142 }
143 }
144 }
145
146 Ok(&self.buffer[self.consumed..])
147 }
148
149 fn consume(&mut self, amt: usize) {
150 self.consumed = (self.consumed + amt).min(self.buffer.len());
151
152 if self.consumed == self.buffer.len() {
154 self.buffer.clear();
155 self.consumed = 0;
156 }
157 }
158}
159
160use super::clipboard::get_platform_clipboard_command;
161use super::types::CommandResult;
162
/// Describes a single agent invocation: what to run, the prompt to pass, and where to log.
pub struct PromptCommand<'a> {
    /// Heading logged (in bold) as the step title when the command starts.
    pub label: &'a str,
    /// Name handed to the JSON parsers via `with_display_name`.
    pub display_name: &'a str,
    /// Agent command line; it is split into an argv before spawning.
    pub cmd_str: &'a str,
    /// Prompt text; saved to the configured prompt file and passed as the final argument.
    pub prompt: &'a str,
    /// Path of the log file that receives the agent's output.
    pub logfile: &'a str,
    /// Selects which parser consumes the agent's stdout.
    pub parser_type: JsonParserType,
    /// Extra environment variables set on the agent subprocess.
    pub env_vars: &'a std::collections::HashMap<String, String>,
}
173
/// Services borrowed for the duration of a command run: timing, logging, colors and config.
pub struct PipelineRuntime<'a> {
    /// Phase timer; a new phase is started at the beginning of each prompt run.
    pub timer: &'a mut Timer,
    /// Destination for step, info and warning messages.
    pub logger: &'a Logger,
    /// Color sequences interpolated into log messages.
    pub colors: &'a Colors,
    /// Pipeline configuration (verbosity, prompt path, interactive flag, streaming metrics).
    pub config: &'a Config,
    /// Test-only hook: when set, prompt runs go to this executor instead of a subprocess.
    #[cfg(any(test, feature = "test-utils"))]
    pub agent_executor: Option<Arc<dyn super::test_trait::AgentExecutor>>,
}
184
/// Inputs required to build the agent subprocess command.
struct CommandConfig<'a> {
    /// Agent command line to split into an argv.
    cmd_str: &'a str,
    /// Prompt text appended as the final argument.
    prompt: &'a str,
    /// Extra environment variables set on the subprocess.
    env_vars: &'a std::collections::HashMap<String, String>,
    /// Path of the log file created before the command runs.
    logfile: &'a str,
    /// Parser type, reported in the log when the command is built.
    parser_type: JsonParserType,
}
193
194fn save_prompt_to_file_and_clipboard(
196 prompt: &str,
197 prompt_path: &std::path::PathBuf,
198 interactive: bool,
199 logger: &Logger,
200 colors: Colors,
201) -> io::Result<()> {
202 if let Some(parent) = prompt_path.parent() {
204 fs::create_dir_all(parent)?;
205 }
206 fs::write(prompt_path, prompt)?;
207 logger.info(&format!(
208 "Prompt saved to {}{}{}",
209 colors.cyan(),
210 prompt_path.display(),
211 colors.reset()
212 ));
213
214 if interactive {
216 if let Some(clipboard_cmd) = get_platform_clipboard_command() {
217 if let Ok(mut child) = Command::new(clipboard_cmd.binary)
218 .args(clipboard_cmd.args)
219 .stdin(Stdio::piped())
220 .spawn()
221 {
222 if let Some(mut stdin) = child.stdin.take() {
223 let _ = stdin.write_all(prompt.as_bytes());
224 }
225 let _ = child.wait();
226 logger.info(&format!(
227 "Prompt copied to clipboard {}({}){}",
228 colors.dim(),
229 clipboard_cmd.paste_hint,
230 colors.reset()
231 ));
232 }
233 }
234 }
235 Ok(())
236}
237
238fn build_agent_command(
240 config: &CommandConfig<'_>,
241 anthropic_env_vars_to_sanitize: &[&str],
242 logger: &Logger,
243 colors: Colors,
244) -> io::Result<(Vec<String>, Command)> {
245 let argv = split_command(config.cmd_str)?;
246 if argv.is_empty() || config.cmd_str.trim().is_empty() {
247 return Err(io::Error::new(
248 io::ErrorKind::InvalidInput,
249 "Agent command is empty or contains only whitespace",
250 ));
251 }
252
253 let mut argv_for_log = argv.clone();
254 argv_for_log.push("<PROMPT>".to_string());
255 let display_cmd = truncate_text(&format_argv_for_log(&argv_for_log), 160);
256 logger.info(&format!(
257 "Executing: {}{}{}",
258 colors.dim(),
259 display_cmd,
260 colors.reset()
261 ));
262
263 let is_glm_cmd = is_glm_like_agent(config.cmd_str);
265 if is_glm_cmd {
266 logger.info(&format!("GLM command details: {display_cmd}"));
267 if argv.iter().any(|arg| arg == "-p") {
268 logger.info("GLM command includes '-p' flag (correct)");
269 } else {
270 logger.warn("GLM command may be missing '-p' flag");
271 }
272 }
273
274 let _uses_json = config.parser_type != JsonParserType::Generic || argv_requests_json(&argv);
275 logger.info(&format!("Using {} parser...", config.parser_type));
276
277 if let Some(parent) = Path::new(config.logfile).parent() {
278 fs::create_dir_all(parent)?;
279 }
280 File::create(config.logfile)?;
281
282 let mut command = Command::new(&argv[0]);
283 command.args(&argv[1..]);
284 command.arg(config.prompt);
285
286 if !config.env_vars.is_empty() {
288 logger.info(&format!(
289 "Injecting {} environment variable(s) into subprocess",
290 config.env_vars.len()
291 ));
292 for key in config.env_vars.keys() {
293 logger.info(&format!(" - {key}"));
294 }
295 for (key, value) in config.env_vars {
296 command.env(key, value);
297 }
298 }
299
300 let buffering_vars = [("PYTHONUNBUFFERED", "1"), ("NODE_ENV", "production")];
308 for (key, value) in buffering_vars {
309 if !config.env_vars.contains_key(key) {
310 command.env(key, value);
311 }
312 }
313
314 for &var in anthropic_env_vars_to_sanitize {
316 if !config.env_vars.contains_key(var) {
317 command.env_remove(var);
318 }
319 }
320
321 Ok((argv, command))
322}
323
324fn spawn_agent_process(
326 mut command: Command,
327 argv: &[String],
328) -> io::Result<Result<Child, CommandResult>> {
329 match command
330 .stdin(Stdio::null())
331 .stdout(Stdio::piped())
332 .stderr(Stdio::piped())
333 .spawn()
334 {
335 Ok(child) => Ok(Ok(child)),
336 Err(e)
337 if matches!(
338 e.kind(),
339 io::ErrorKind::NotFound | io::ErrorKind::PermissionDenied
340 ) =>
341 {
342 let exit_code = if e.kind() == io::ErrorKind::NotFound {
343 127
344 } else {
345 126
346 };
347 Ok(Err(CommandResult {
348 exit_code,
349 stderr: format!("{}: {}", argv[0], e),
350 }))
351 }
352 Err(e) => Err(e),
353 }
354}
355
356fn stream_agent_output(
358 stdout: ChildStdout,
359 cmd: &PromptCommand<'_>,
360 runtime: &PipelineRuntime<'_>,
361) -> io::Result<()> {
362 let reader = StreamingLineReader::new(stdout);
366
367 if cmd.parser_type != JsonParserType::Generic
368 || argv_requests_json(&split_command(cmd.cmd_str)?)
369 {
370 let stdout_io = io::stdout();
371 let mut out = stdout_io.lock();
372
373 match cmd.parser_type {
374 JsonParserType::Claude => {
375 let p = crate::json_parser::ClaudeParser::new(
376 *runtime.colors,
377 runtime.config.verbosity,
378 )
379 .with_display_name(cmd.display_name)
380 .with_log_file(cmd.logfile)
381 .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
382 p.parse_stream(reader)?;
383 }
384 JsonParserType::Codex => {
385 let p =
386 crate::json_parser::CodexParser::new(*runtime.colors, runtime.config.verbosity)
387 .with_display_name(cmd.display_name)
388 .with_log_file(cmd.logfile)
389 .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
390 p.parse_stream(reader)?;
391 }
392 JsonParserType::Gemini => {
393 let p = crate::json_parser::GeminiParser::new(
394 *runtime.colors,
395 runtime.config.verbosity,
396 )
397 .with_display_name(cmd.display_name)
398 .with_log_file(cmd.logfile)
399 .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
400 p.parse_stream(reader)?;
401 }
402 JsonParserType::OpenCode => {
403 let p = crate::json_parser::OpenCodeParser::new(
404 *runtime.colors,
405 runtime.config.verbosity,
406 )
407 .with_display_name(cmd.display_name)
408 .with_log_file(cmd.logfile)
409 .with_show_streaming_metrics(runtime.config.show_streaming_metrics);
410 p.parse_stream(reader)?;
411 }
412 JsonParserType::Generic => {
413 let mut logfile = OpenOptions::new()
414 .create(true)
415 .append(true)
416 .open(cmd.logfile)?;
417
418 let mut buf = String::new();
419 for line in reader.lines() {
420 let line = line?;
421 writeln!(logfile, "{line}")?;
423 buf.push_str(&line);
424 buf.push('\n');
425 }
426 logfile.flush()?;
427 let _ = logfile.sync_all();
430
431 let formatted = format_generic_json_for_display(&buf, runtime.config.verbosity);
432 out.write_all(formatted.as_bytes())?;
433 }
434 }
435 } else {
436 let mut logfile = OpenOptions::new()
437 .create(true)
438 .append(true)
439 .open(cmd.logfile)?;
440
441 let stdout_io = io::stdout();
442 let mut out = stdout_io.lock();
443
444 for line in reader.lines() {
445 let line = line?;
446 writeln!(out, "{line}")?;
447 writeln!(logfile, "{line}")?;
448 }
449 logfile.flush()?;
450 let _ = logfile.sync_all();
453 }
454 Ok(())
455}
456
457fn wait_for_completion_and_collect_stderr(
459 mut child: Child,
460 stderr_join_handle: Option<std::thread::JoinHandle<io::Result<String>>>,
461 runtime: &PipelineRuntime<'_>,
462) -> io::Result<(i32, String)> {
463 let status = child.wait()?;
464 let exit_code = status.code().unwrap_or(1);
465
466 if status.code().is_none() && runtime.config.verbosity.is_debug() {
467 runtime
468 .logger
469 .warn("Process terminated by signal (no exit code), treating as failure");
470 }
471
472 let stderr_output = match stderr_join_handle {
473 Some(handle) => match handle.join() {
474 Ok(result) => result?,
475 Err(panic_payload) => {
476 let panic_msg = panic_payload.downcast_ref::<String>().map_or_else(
477 || {
478 panic_payload.downcast_ref::<&str>().map_or_else(
479 || "<unknown panic>".to_string(),
480 std::string::ToString::to_string,
481 )
482 },
483 std::clone::Clone::clone,
484 );
485 runtime.logger.warn(&format!(
486 "Stderr collection thread panicked: {panic_msg}. This may indicate a bug."
487 ));
488 String::new()
489 }
490 },
491 None => String::new(),
492 };
493
494 if !stderr_output.is_empty() && runtime.config.verbosity.is_debug() {
495 runtime.logger.warn(&format!(
496 "Agent stderr output detected ({} bytes):",
497 stderr_output.len()
498 ));
499 for (i, line) in stderr_output.lines().take(5).enumerate() {
500 runtime.logger.info(&format!(" stderr[{i}]: {line}"));
501 }
502 if stderr_output.lines().count() > 5 {
503 runtime.logger.info(&format!(
504 " ... ({} more lines, see log file for full output)",
505 stderr_output.lines().count() - 5
506 ));
507 }
508 }
509
510 Ok((exit_code, stderr_output))
511}
512
513pub fn run_with_prompt(
517 cmd: &PromptCommand<'_>,
518 runtime: &mut PipelineRuntime<'_>,
519) -> io::Result<CommandResult> {
520 const ANTHROPIC_ENV_VARS_TO_SANITIZE: &[&str] = &[
521 "ANTHROPIC_API_KEY",
522 "ANTHROPIC_BASE_URL",
523 "ANTHROPIC_AUTH_TOKEN",
524 "ANTHROPIC_MODEL",
525 "ANTHROPIC_DEFAULT_HAIKU_MODEL",
526 "ANTHROPIC_DEFAULT_OPUS_MODEL",
527 "ANTHROPIC_DEFAULT_SONNET_MODEL",
528 ];
529
530 runtime.timer.start_phase();
531 runtime.logger.step(&format!(
532 "{}{}{}",
533 runtime.colors.bold(),
534 cmd.label,
535 runtime.colors.reset()
536 ));
537
538 save_prompt_to_file_and_clipboard(
539 cmd.prompt,
540 &runtime.config.prompt_path,
541 runtime.config.behavior.interactive,
542 runtime.logger,
543 *runtime.colors,
544 )?;
545
546 #[cfg(any(test, feature = "test-utils"))]
548 {
549 if let Some(executor) = runtime.agent_executor.clone() {
550 return run_with_agent_executor(cmd, runtime, &executor);
551 }
552 }
553
554 run_with_subprocess(cmd, runtime, ANTHROPIC_ENV_VARS_TO_SANITIZE)
555}
556
557#[cfg(not(any(test, feature = "test-utils")))]
559fn run_with_subprocess(
560 cmd: &PromptCommand<'_>,
561 runtime: &mut PipelineRuntime<'_>,
562 anthropic_env_vars_to_sanitize: &[&str],
563) -> io::Result<CommandResult> {
564 let (argv, command) = build_agent_command(
565 &CommandConfig {
566 cmd_str: cmd.cmd_str,
567 prompt: cmd.prompt,
568 env_vars: cmd.env_vars,
569 logfile: cmd.logfile,
570 parser_type: cmd.parser_type,
571 },
572 anthropic_env_vars_to_sanitize,
573 runtime.logger,
574 *runtime.colors,
575 )?;
576
577 let mut child = match spawn_agent_process(command, &argv)? {
578 Ok(child) => child,
579 Err(result) => return Ok(result),
580 };
581
582 let stdout = child
583 .stdout
584 .take()
585 .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
586
587 let stderr_join_handle = child.stderr.take().map(|stderr| {
588 std::thread::spawn(move || -> io::Result<String> {
589 const STDERR_MAX_BYTES: usize = 512 * 1024;
590
591 let mut reader = BufReader::new(stderr);
592 let mut buf = [0u8; 8192];
593 let mut collected = Vec::<u8>::new();
594 let mut truncated = false;
595
596 loop {
597 let n = reader.read(&mut buf)?;
598 if n == 0 {
599 break;
600 }
601
602 let remaining = STDERR_MAX_BYTES.saturating_sub(collected.len());
603 if remaining == 0 {
604 truncated = true;
605 break;
606 }
607
608 let to_take = remaining.min(n);
609 collected.extend_from_slice(&buf[..to_take]);
610 if to_take < n {
611 truncated = true;
612 break;
613 }
614 }
615
616 let mut stderr_output = String::from_utf8_lossy(&collected).into_owned();
617 if truncated {
618 if !stderr_output.ends_with('\n') {
619 stderr_output.push('\n');
620 }
621 stderr_output.push_str("<stderr truncated>");
622 }
623
624 Ok(stderr_output)
625 })
626 });
627
628 stream_agent_output(stdout, cmd, runtime)?;
629
630 let (exit_code, stderr_output) =
631 wait_for_completion_and_collect_stderr(child, stderr_join_handle, runtime)?;
632
633 if runtime.config.verbosity.is_verbose() {
634 runtime.logger.info(&format!(
635 "Phase elapsed: {}",
636 runtime.timer.phase_elapsed_formatted()
637 ));
638 }
639
640 Ok(CommandResult {
641 exit_code,
642 stderr: stderr_output,
643 })
644}
645
/// Runs the agent as a real subprocess and returns its exit code and captured stderr.
///
/// This is the variant compiled for tests and the `test-utils` feature.
///
/// Stdout is streamed on the calling thread while a helper thread collects stderr, so a
/// full stderr pipe cannot stall the agent. At most 512 KiB of stderr is kept; anything
/// beyond that is read and discarded, and the captured text is marked `<stderr truncated>`.
///
/// A program that cannot be found or executed is reported as a `CommandResult` (see
/// `spawn_agent_process`), not as an error.
///
/// # Errors
///
/// Propagates failures from building or spawning the command, streaming stdout, waiting for
/// the child and collecting stderr. When streaming fails, the child is killed and reaped
/// before the error is returned so no agent process is left running unattended.
#[cfg(any(test, feature = "test-utils"))]
fn run_with_subprocess(
    cmd: &PromptCommand<'_>,
    runtime: &mut PipelineRuntime<'_>,
    anthropic_env_vars_to_sanitize: &[&str],
) -> io::Result<CommandResult> {
    let (argv, command) = build_agent_command(
        &CommandConfig {
            cmd_str: cmd.cmd_str,
            prompt: cmd.prompt,
            env_vars: cmd.env_vars,
            logfile: cmd.logfile,
            parser_type: cmd.parser_type,
        },
        anthropic_env_vars_to_sanitize,
        runtime.logger,
        *runtime.colors,
    )?;

    let mut child = match spawn_agent_process(command, &argv)? {
        Ok(child) => child,
        Err(result) => return Ok(result),
    };

    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;

    let stderr_join_handle = child.stderr.take().map(|stderr| {
        std::thread::spawn(move || -> io::Result<String> {
            const STDERR_MAX_BYTES: usize = 512 * 1024;

            let mut reader = BufReader::new(stderr);
            let mut buf = [0u8; 8192];
            let mut collected = Vec::<u8>::new();
            let mut truncated = false;

            loop {
                let n = match reader.read(&mut buf) {
                    Ok(0) => break,
                    Ok(n) => n,
                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                    Err(e) => return Err(e),
                };

                // Keep reading after the cap is reached and discard the excess. Closing the
                // pipe early would make the agent's later stderr writes fail.
                let remaining = STDERR_MAX_BYTES.saturating_sub(collected.len());
                let to_take = remaining.min(n);
                collected.extend_from_slice(&buf[..to_take]);
                if to_take < n {
                    truncated = true;
                }
            }

            let mut stderr_output = String::from_utf8_lossy(&collected).into_owned();
            if truncated {
                if !stderr_output.ends_with('\n') {
                    stderr_output.push('\n');
                }
                stderr_output.push_str("<stderr truncated>");
            }

            Ok(stderr_output)
        })
    });

    if let Err(stream_err) = stream_agent_output(stdout, cmd, runtime) {
        // Stop and reap the agent instead of leaving it running. The stderr thread is left
        // detached; it ends once the pipe reaches end of input.
        let _ = child.kill();
        let _ = child.wait();
        return Err(stream_err);
    }

    let (exit_code, stderr_output) =
        wait_for_completion_and_collect_stderr(child, stderr_join_handle, runtime)?;

    if runtime.config.verbosity.is_verbose() {
        runtime.logger.info(&format!(
            "Phase elapsed: {}",
            runtime.timer.phase_elapsed_formatted()
        ));
    }

    Ok(CommandResult {
        exit_code,
        stderr: stderr_output,
    })
}
734
/// Runs the prompt through an injected `AgentExecutor` instead of spawning a subprocess.
///
/// `build_agent_command` is still called (with an empty sanitize list), so the command line
/// is validated and the log file is prepared exactly as for a real run. The `Command` it
/// returns is never spawned.
///
/// # Errors
///
/// Propagates errors from building the command and from the executor.
#[cfg(any(test, feature = "test-utils"))]
fn run_with_agent_executor(
    cmd: &PromptCommand<'_>,
    runtime: &mut PipelineRuntime<'_>,
    executor: &std::sync::Arc<dyn super::test_trait::AgentExecutor>,
) -> io::Result<CommandResult> {
    use super::test_trait::AgentCommandConfig;

    let build_config = CommandConfig {
        cmd_str: cmd.cmd_str,
        prompt: cmd.prompt,
        env_vars: cmd.env_vars,
        logfile: cmd.logfile,
        parser_type: cmd.parser_type,
    };
    let (argv, _unspawned_command) =
        build_agent_command(&build_config, &[], runtime.logger, *runtime.colors)?;

    let shown = truncate_text(&format_argv_for_log(&argv), 160);
    let dim = runtime.colors.dim();
    let reset = runtime.colors.reset();
    runtime
        .logger
        .info(&format!("Executing (mocked): {dim}{shown}{reset}"));

    let request = AgentCommandConfig {
        cmd: cmd.cmd_str.to_string(),
        prompt: cmd.prompt.to_string(),
        env_vars: cmd.env_vars.clone(),
        parser_type: cmd.parser_type,
        logfile: cmd.logfile.to_string(),
        display_name: cmd.display_name.to_string(),
    };
    let outcome = executor.execute(&request)?;

    if runtime.config.verbosity.is_verbose() {
        let elapsed = runtime.timer.phase_elapsed_formatted();
        runtime.logger.info(&format!("Phase elapsed: {elapsed}"));
    }

    Ok(CommandResult {
        exit_code: outcome.exit_code,
        stderr: outcome.stderr,
    })
}