Skip to main content

ralph_adapters/
cli_executor.rs

//! CLI executor for running prompts through backends.
//!
//! Executes prompts via CLI tools with real-time streaming output.
//! Supports an optional inactivity timeout: a stalled process is sent SIGTERM
//! and, if it does not exit within a short grace period, SIGKILL.
5
6#[cfg(test)]
7use crate::cli_backend::PromptMode;
8use crate::cli_backend::{CliBackend, OutputFormat};
9use crate::copilot_stream::CopilotStreamParser;
10#[cfg(unix)]
11use nix::sys::signal::{Signal, kill};
12#[cfg(unix)]
13use nix::unistd::Pid;
14use std::env;
15use std::io::Write;
16use std::process::Stdio;
17use std::time::Duration;
18use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
19use tokio::process::{Child, Command};
20use tracing::{debug, warn};
21
/// How long a backend may keep running after a line containing
/// `Event emitted:` has been observed before it is terminated.
const POST_EVENT_GRACE_TIMEOUT: Duration = Duration::from_secs(5);
/// How long to wait for the child to exit after SIGTERM before sending SIGKILL.
const TERMINATION_GRACE_TIMEOUT: Duration = Duration::from_secs(2);
24
/// Outcome of running a single prompt through a CLI backend.
#[derive(Debug)]
pub struct ExecutionResult {
    /// Everything the process printed, one line per entry; stderr lines are
    /// recorded with a `[stderr] ` prefix.
    pub output: String,
    /// `true` only when the process exited successfully and was not
    /// terminated because of a timeout.
    pub success: bool,
    /// The process exit code, if the exit status carries one.
    pub exit_code: Option<i32>,
    /// `true` when the executor terminated the process because a timeout elapsed.
    pub timed_out: bool,
}
37
38/// Executor for running prompts through CLI backends.
39#[derive(Debug)]
40pub struct CliExecutor {
41    backend: CliBackend,
42}
43
/// Message sent from a stream-reader task to the main execution loop.
enum StreamEvent {
    /// One line read from the child's stdout.
    StdoutLine(String),
    /// One line read from the child's stderr.
    StderrLine(String),
    /// The child's stdout reached end-of-file.
    StdoutEof,
    /// The child's stderr reached end-of-file.
    StderrEof,
}
50
/// Identifies which of the child's output streams a reader task is draining.
enum StreamKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}
55
56impl CliExecutor {
57    /// Creates a new executor with the given backend.
58    pub fn new(backend: CliBackend) -> Self {
59        Self { backend }
60    }
61
62    /// Executes a prompt and streams output to the provided writer.
63    ///
64    /// Output is streamed line-by-line to the writer while being accumulated
65    /// for the return value. If `timeout` is provided and the execution produces
66    /// no stdout/stderr activity for longer than that duration, the process
67    /// receives SIGTERM and the result indicates timeout.
68    ///
69    /// When `verbose` is true, stderr output is also written to the output writer
70    /// with a `[stderr]` prefix. When false, stderr is captured but not displayed.
71    pub async fn execute<W: Write + Send>(
72        &self,
73        prompt: &str,
74        mut output_writer: W,
75        timeout: Option<Duration>,
76        verbose: bool,
77    ) -> std::io::Result<ExecutionResult> {
78        // Note: _temp_file is kept alive for the duration of this function scope.
79        // Some Arg-mode backends use temp-file indirection for very large prompts.
80        let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
81
82        let mut command = Command::new(&cmd);
83        command.args(&args);
84        command.stdout(Stdio::piped());
85        command.stderr(Stdio::piped());
86        #[cfg(unix)]
87        command.process_group(0);
88
89        // Set working directory to current directory (mirrors PTY executor behavior)
90        // Use fallback to "." if current_dir fails (e.g., E2E test workspaces)
91        let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
92        command.current_dir(&cwd);
93        inject_ralph_runtime_env(&mut command, &cwd);
94
95        // Apply backend-specific environment variables (e.g., Agent Teams env var)
96        command.envs(self.backend.env_vars.iter().map(|(k, v)| (k, v)));
97
98        debug!(
99            command = %cmd,
100            args = ?args,
101            cwd = ?cwd,
102            "Spawning CLI command"
103        );
104
105        if stdin_input.is_some() {
106            command.stdin(Stdio::piped());
107        }
108
109        let mut child = command.spawn()?;
110
111        // Write to stdin if needed. Some short-lived commands can exit before
112        // consuming stdin, which surfaces as BrokenPipe. Treat that as benign
113        // and continue collecting output/exit status from the child.
114        if let Some(input) = stdin_input
115            && let Some(mut stdin) = child.stdin.take()
116        {
117            if let Err(err) = stdin.write_all(input.as_bytes()).await
118                && err.kind() != std::io::ErrorKind::BrokenPipe
119            {
120                return Err(err);
121            }
122            drop(stdin); // Close stdin to signal EOF
123        }
124
125        let mut timed_out = false;
126        let mut post_event_deadline: Option<tokio::time::Instant> = None;
127        let mut terminated_status = None;
128
129        // Take both stdout and stderr handles upfront to read concurrently.
130        // Each emitted line resets the inactivity timeout.
131        let stdout_handle = child.stdout.take();
132        let stderr_handle = child.stderr.take();
133        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(256);
134
135        let stdout_task = stdout_handle.map(|stdout| {
136            let tx = event_tx.clone();
137            tokio::spawn(async move { read_stream(stdout, tx, StreamKind::Stdout).await })
138        });
139        let stderr_task = stderr_handle.map(|stderr| {
140            let tx = event_tx.clone();
141            tokio::spawn(async move { read_stream(stderr, tx, StreamKind::Stderr).await })
142        });
143        drop(event_tx);
144
145        let mut stdout_done = stdout_task.is_none();
146        let mut stderr_done = stderr_task.is_none();
147        let mut accumulated_output = String::new();
148
149        if let Some(duration) = timeout {
150            debug!(
151                timeout_secs = duration.as_secs(),
152                "Executing with inactivity timeout"
153            );
154        }
155
156        while !stdout_done || !stderr_done {
157            let now = tokio::time::Instant::now();
158            let effective_timeout = match (timeout, post_event_deadline) {
159                (_, Some(deadline)) if deadline <= now => Some(Duration::ZERO),
160                (Some(duration), Some(deadline)) => {
161                    Some(duration.min(deadline.saturating_duration_since(now)))
162                }
163                (None, Some(deadline)) => Some(deadline.saturating_duration_since(now)),
164                (Some(duration), None) => Some(duration),
165                (None, None) => None,
166            };
167
168            let next_event = match effective_timeout {
169                Some(duration) => match tokio::time::timeout(duration, event_rx.recv()).await {
170                    Ok(event) => event,
171                    Err(_) => {
172                        warn!(
173                            timeout_secs = duration.as_secs(),
174                            "Execution inactivity timeout reached, sending SIGTERM"
175                        );
176                        timed_out = true;
177                        terminated_status = Some(Self::terminate_child_and_wait(&mut child).await?);
178                        break;
179                    }
180                },
181                None => event_rx.recv().await,
182            };
183
184            match next_event {
185                Some(StreamEvent::StdoutLine(line)) => {
186                    if line_signals_event_emitted(&line) {
187                        post_event_deadline.get_or_insert_with(|| {
188                            tokio::time::Instant::now() + POST_EVENT_GRACE_TIMEOUT
189                        });
190                    }
191                    if self.backend.output_format == OutputFormat::CopilotStreamJson {
192                        if let Some(text) = CopilotStreamParser::extract_text(&line) {
193                            write!(output_writer, "{text}")?;
194                            if !text.ends_with('\n') {
195                                writeln!(output_writer)?;
196                            }
197                        }
198                    } else {
199                        writeln!(output_writer, "{line}")?;
200                    }
201                    output_writer.flush()?;
202                    accumulated_output.push_str(&line);
203                    accumulated_output.push('\n');
204                }
205                Some(StreamEvent::StderrLine(line)) => {
206                    if line_signals_event_emitted(&line) {
207                        post_event_deadline.get_or_insert_with(|| {
208                            tokio::time::Instant::now() + POST_EVENT_GRACE_TIMEOUT
209                        });
210                    }
211                    if verbose {
212                        writeln!(output_writer, "[stderr] {line}")?;
213                        output_writer.flush()?;
214                    }
215                    accumulated_output.push_str("[stderr] ");
216                    accumulated_output.push_str(&line);
217                    accumulated_output.push('\n');
218                }
219                Some(StreamEvent::StdoutEof) => stdout_done = true,
220                Some(StreamEvent::StderrEof) => stderr_done = true,
221                None => {
222                    stdout_done = true;
223                    stderr_done = true;
224                }
225            }
226        }
227
228        let status = if let Some(status) = terminated_status {
229            status
230        } else {
231            child.wait().await?
232        };
233
234        if let Some(handle) = stdout_task {
235            handle.await.map_err(join_error_to_io)??;
236        }
237        if let Some(handle) = stderr_task {
238            handle.await.map_err(join_error_to_io)??;
239        }
240
241        Ok(ExecutionResult {
242            output: accumulated_output,
243            success: status.success() && !timed_out,
244            exit_code: status.code(),
245            timed_out,
246        })
247    }
248
249    /// Terminates the child process with SIGTERM, then SIGKILL if it ignores graceful shutdown.
250    async fn terminate_child_and_wait(
251        child: &mut Child,
252    ) -> std::io::Result<std::process::ExitStatus> {
253        #[cfg(not(unix))]
254        {
255            child.start_kill()?;
256            return child.wait().await;
257        }
258
259        #[cfg(unix)]
260        if let Some(pid) = child.id() {
261            #[allow(clippy::cast_possible_wrap)]
262            let pid = Pid::from_raw(pid as i32);
263            let pgid = Pid::from_raw(-pid.as_raw());
264            debug!(%pid, "Sending SIGTERM to child process group");
265            let _ = kill(pgid, Signal::SIGTERM);
266            match tokio::time::timeout(TERMINATION_GRACE_TIMEOUT, child.wait()).await {
267                Ok(status) => status,
268                Err(_) => {
269                    warn!(%pid, "Child process ignored SIGTERM, sending SIGKILL");
270                    let _ = kill(pgid, Signal::SIGKILL);
271                    child.wait().await
272                }
273            }
274        } else {
275            child.wait().await
276        }
277    }
278
279    /// Executes a prompt without streaming (captures all output).
280    ///
281    /// Uses no timeout by default. For timed execution, use `execute_capture_with_timeout`.
282    pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
283        self.execute_capture_with_timeout(prompt, None).await
284    }
285
286    /// Executes a prompt without streaming, with optional timeout.
287    pub async fn execute_capture_with_timeout(
288        &self,
289        prompt: &str,
290        timeout: Option<Duration>,
291    ) -> std::io::Result<ExecutionResult> {
292        // Use a sink that discards output for non-streaming execution
293        // verbose=false since output is being discarded anyway
294        let sink = std::io::sink();
295        self.execute(prompt, sink, timeout, false).await
296    }
297}
298
/// Reports whether an output line contains the `Event emitted:` marker.
fn line_signals_event_emitted(line: &str) -> bool {
    const EVENT_MARKER: &str = "Event emitted:";
    line.contains(EVENT_MARKER)
}
302
303async fn read_stream<R>(
304    stream: R,
305    tx: tokio::sync::mpsc::Sender<StreamEvent>,
306    stream_kind: StreamKind,
307) -> std::io::Result<()>
308where
309    R: AsyncRead + Unpin,
310{
311    let reader = BufReader::new(stream);
312    let mut lines = reader.lines();
313    while let Some(line) = lines.next_line().await? {
314        let event = match stream_kind {
315            StreamKind::Stdout => StreamEvent::StdoutLine(line),
316            StreamKind::Stderr => StreamEvent::StderrLine(line),
317        };
318        if tx.send(event).await.is_err() {
319            return Ok(());
320        }
321    }
322
323    let eof_event = match stream_kind {
324        StreamKind::Stdout => StreamEvent::StdoutEof,
325        StreamKind::Stderr => StreamEvent::StderrEof,
326    };
327    let _ = tx.send(eof_event).await;
328    Ok(())
329}
330
331fn join_error_to_io(error: tokio::task::JoinError) -> std::io::Error {
332    std::io::Error::other(error.to_string())
333}
334
335fn inject_ralph_runtime_env(command: &mut Command, workspace_root: &std::path::Path) {
336    let Ok(current_exe) = env::current_exe() else {
337        return;
338    };
339    let Some(bin_dir) = current_exe.parent() else {
340        return;
341    };
342
343    let mut path_entries = vec![bin_dir.to_path_buf()];
344    if let Some(existing_path) = env::var_os("PATH") {
345        path_entries.extend(env::split_paths(&existing_path));
346    }
347
348    if let Ok(joined_path) = env::join_paths(path_entries) {
349        command.env("PATH", joined_path);
350    }
351    command.env("RALPH_BIN", &current_exe);
352    command.env("RALPH_WORKSPACE_ROOT", workspace_root);
353
354    // Propagate RALPH_EVENTS_FILE so `ralph emit` from any CWD writes to the correct events file
355    let marker = workspace_root.join(".ralph/current-events");
356    if let Ok(relative) = std::fs::read_to_string(&marker) {
357        let abs = workspace_root.join(relative.trim());
358        command.env("RALPH_EVENTS_FILE", &abs);
359    }
360
361    if std::path::Path::new("/var/tmp").is_dir() {
362        command.env("TMPDIR", "/var/tmp");
363        command.env("TMP", "/var/tmp");
364        command.env("TEMP", "/var/tmp");
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a plain-text backend for `command` with the given fixed arguments.
    fn text_backend(command: &str, args: &[&str], prompt_mode: PromptMode) -> CliBackend {
        CliBackend {
            command: command.to_string(),
            args: args.iter().map(|arg| (*arg).to_string()).collect(),
            prompt_mode,
            prompt_flag: None,
            output_format: OutputFormat::Text,
            env_vars: vec![],
        }
    }

    /// Builds an executor that runs `sh -c <script>` with the prompt routed to stdin.
    fn shell_executor(script: &str) -> CliExecutor {
        CliExecutor::new(text_backend("sh", &["-c", script], PromptMode::Stdin))
    }

    #[tokio::test]
    async fn test_execute_echo() {
        // `echo` receives the prompt as its argument and prints it back.
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));
        let mut streamed = Vec::new();

        let result = executor
            .execute("hello world", &mut streamed, None, true)
            .await
            .unwrap();

        assert!(result.success);
        assert!(!result.timed_out);
        assert!(result.output.contains("hello world"));
    }

    #[tokio::test]
    async fn test_execute_stdin() {
        // `cat` copies the stdin-delivered prompt to stdout.
        let executor = CliExecutor::new(text_backend("cat", &[], PromptMode::Stdin));

        let result = executor.execute_capture("stdin test").await.unwrap();

        assert!(result.success);
        assert!(result.output.contains("stdin test"));
    }

    #[tokio::test]
    async fn test_execute_failure() {
        // `false` always exits with code 1.
        let executor = CliExecutor::new(text_backend("false", &[], PromptMode::Arg));

        let result = executor.execute_capture("").await.unwrap();

        assert!(!result.success);
        assert!(!result.timed_out);
        assert_eq!(result.exit_code, Some(1));
    }

    #[tokio::test]
    async fn test_execute_timeout() {
        // `sleep 10` stays silent; stdin mode keeps the prompt out of its arguments.
        let executor = CliExecutor::new(text_backend("sleep", &["10"], PromptMode::Stdin));

        // A 100ms inactivity timeout must fire long before the sleep ends.
        let result = executor
            .execute_capture_with_timeout("", Some(Duration::from_millis(100)))
            .await
            .unwrap();

        assert!(result.timed_out, "Expected execution to time out");
        assert!(
            !result.success,
            "Timed out execution should not be successful"
        );
    }

    #[tokio::test]
    async fn test_execute_timeout_resets_on_output_activity() {
        // The script is passed as the prompt argument to `sh -c`.
        let executor = CliExecutor::new(text_backend("sh", &["-c"], PromptMode::Arg));
        let script = "printf 'start\\n'; sleep 0.2; printf 'middle\\n'; sleep 0.2; printf 'done\\n'";

        let result = executor
            .execute_capture_with_timeout(script, Some(Duration::from_millis(300)))
            .await
            .unwrap();

        assert!(
            !result.timed_out,
            "Periodic output should reset the inactivity timeout"
        );
        assert!(result.success, "Periodic-output command should succeed");
        assert!(result.output.contains("start"));
        assert!(result.output.contains("middle"));
        assert!(result.output.contains("done"));
    }

    #[tokio::test]
    async fn test_execute_streams_output_before_inactivity_timeout() {
        let executor = shell_executor("printf 'hello\\n'; sleep 10");
        let mut streamed = Vec::new();

        let result = executor
            .execute("", &mut streamed, Some(Duration::from_millis(200)), false)
            .await
            .unwrap();

        assert!(
            result.timed_out,
            "Expected inactivity timeout after output stops"
        );
        assert_eq!(String::from_utf8(streamed).unwrap(), "hello\n");
        assert!(result.output.contains("hello"));
    }

    #[tokio::test]
    async fn test_execute_timeout_force_kills_processes_that_ignore_sigterm() {
        let executor = shell_executor("trap '' TERM; while :; do sleep 1; done");
        let clock = std::time::Instant::now();

        let result = executor
            .execute_capture_with_timeout("", Some(Duration::from_millis(100)))
            .await
            .unwrap();

        assert!(
            result.timed_out,
            "Expected ignored-SIGTERM command to time out"
        );
        assert!(
            clock.elapsed() < Duration::from_secs(5),
            "Executor should force-kill ignored-SIGTERM processes instead of hanging"
        );
    }

    #[tokio::test]
    async fn test_execute_uses_short_post_event_grace_timeout() {
        let executor = shell_executor("printf 'Event emitted: task.done\\n'; sleep 30");
        let clock = std::time::Instant::now();

        let result = executor
            .execute_capture_with_timeout("", Some(Duration::from_secs(30)))
            .await
            .unwrap();

        assert!(
            result.timed_out,
            "Expected lingering post-event process to be terminated"
        );
        assert!(
            clock.elapsed() < Duration::from_secs(10),
            "Event-emitting backends should use the short post-event grace timeout instead of the full inactivity timeout"
        );
        assert!(result.output.contains("Event emitted: task.done"));
    }

    #[tokio::test]
    async fn test_execute_post_event_deadline_does_not_reset_on_output_activity() {
        let executor = shell_executor(
            "printf 'Event emitted: task.done\\n'; while :; do printf 'heartbeat\\n'; sleep 1; done",
        );
        let clock = std::time::Instant::now();

        let result = executor
            .execute_capture_with_timeout("", Some(Duration::from_secs(30)))
            .await
            .unwrap();

        assert!(
            result.timed_out,
            "Expected noisy post-event process to be terminated"
        );
        assert!(
            clock.elapsed() < Duration::from_secs(10),
            "Event-emitting backends should respect the fixed post-event grace deadline even if they keep producing output"
        );
        assert!(result.output.contains("Event emitted: task.done"));
        assert!(result.output.contains("heartbeat"));
    }

    #[tokio::test]
    async fn test_execute_no_timeout_when_fast() {
        // `echo` finishes immediately, well inside the generous timeout.
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));

        let result = executor
            .execute_capture_with_timeout("fast", Some(Duration::from_secs(10)))
            .await
            .unwrap();

        assert!(!result.timed_out, "Fast command should not time out");
        assert!(result.success);
        assert!(result.output.contains("fast"));
    }

    #[tokio::test]
    async fn test_execute_copilot_stream_writes_extracted_text() {
        // `printf` emits two JSON lines; the test expects only the message content to be streamed.
        let backend = CliBackend {
            output_format: OutputFormat::CopilotStreamJson,
            ..text_backend(
                "printf",
                &[
                    "%s\n%s\n",
                    r#"{"type":"assistant.turn_start","data":{"turnId":"0"}}"#,
                    r#"{"type":"assistant.message","data":{"content":"hello from copilot"}}"#,
                ],
                PromptMode::Stdin,
            )
        };
        let executor = CliExecutor::new(backend);
        let mut streamed = Vec::new();

        let result = executor
            .execute("ignored", &mut streamed, None, false)
            .await
            .unwrap();

        assert!(result.success);
        assert!(result.output.contains("\"assistant.message\""));
        assert_eq!(String::from_utf8(streamed).unwrap(), "hello from copilot\n");
    }
}