1#[cfg(test)]
7use crate::cli_backend::PromptMode;
8use crate::cli_backend::{CliBackend, OutputFormat};
9use crate::copilot_stream::CopilotStreamParser;
10#[cfg(unix)]
11use nix::sys::signal::{Signal, kill};
12#[cfg(unix)]
13use nix::unistd::Pid;
14use std::env;
15use std::io::Write;
16use std::process::Stdio;
17use std::time::Duration;
18use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
19use tokio::process::{Child, Command};
20use tracing::{debug, warn};
21
22const POST_EVENT_GRACE_TIMEOUT: Duration = Duration::from_secs(5);
23const TERMINATION_GRACE_TIMEOUT: Duration = Duration::from_secs(2);
24
25#[derive(Debug)]
27pub struct ExecutionResult {
28 pub output: String,
30 pub success: bool,
32 pub exit_code: Option<i32>,
34 pub timed_out: bool,
36}
37
38#[derive(Debug)]
40pub struct CliExecutor {
41 backend: CliBackend,
42}
43
44enum StreamEvent {
45 StdoutLine(String),
46 StderrLine(String),
47 StdoutEof,
48 StderrEof,
49}
50
51enum StreamKind {
52 Stdout,
53 Stderr,
54}
55
56impl CliExecutor {
57 pub fn new(backend: CliBackend) -> Self {
59 Self { backend }
60 }
61
62 pub async fn execute<W: Write + Send>(
72 &self,
73 prompt: &str,
74 mut output_writer: W,
75 timeout: Option<Duration>,
76 verbose: bool,
77 ) -> std::io::Result<ExecutionResult> {
78 let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
81
82 let mut command = Command::new(&cmd);
83 command.args(&args);
84 command.stdout(Stdio::piped());
85 command.stderr(Stdio::piped());
86 #[cfg(unix)]
87 command.process_group(0);
88
89 let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
92 command.current_dir(&cwd);
93 inject_ralph_runtime_env(&mut command, &cwd);
94
95 command.envs(self.backend.env_vars.iter().map(|(k, v)| (k, v)));
97
98 debug!(
99 command = %cmd,
100 args = ?args,
101 cwd = ?cwd,
102 "Spawning CLI command"
103 );
104
105 if stdin_input.is_some() {
106 command.stdin(Stdio::piped());
107 }
108
109 let mut child = command.spawn()?;
110
111 if let Some(input) = stdin_input
115 && let Some(mut stdin) = child.stdin.take()
116 {
117 if let Err(err) = stdin.write_all(input.as_bytes()).await
118 && err.kind() != std::io::ErrorKind::BrokenPipe
119 {
120 return Err(err);
121 }
122 drop(stdin); }
124
125 let mut timed_out = false;
126 let mut post_event_deadline: Option<tokio::time::Instant> = None;
127 let mut terminated_status = None;
128
129 let stdout_handle = child.stdout.take();
132 let stderr_handle = child.stderr.take();
133 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(256);
134
135 let stdout_task = stdout_handle.map(|stdout| {
136 let tx = event_tx.clone();
137 tokio::spawn(async move { read_stream(stdout, tx, StreamKind::Stdout).await })
138 });
139 let stderr_task = stderr_handle.map(|stderr| {
140 let tx = event_tx.clone();
141 tokio::spawn(async move { read_stream(stderr, tx, StreamKind::Stderr).await })
142 });
143 drop(event_tx);
144
145 let mut stdout_done = stdout_task.is_none();
146 let mut stderr_done = stderr_task.is_none();
147 let mut accumulated_output = String::new();
148
149 if let Some(duration) = timeout {
150 debug!(
151 timeout_secs = duration.as_secs(),
152 "Executing with inactivity timeout"
153 );
154 }
155
156 while !stdout_done || !stderr_done {
157 let now = tokio::time::Instant::now();
158 let effective_timeout = match (timeout, post_event_deadline) {
159 (_, Some(deadline)) if deadline <= now => Some(Duration::ZERO),
160 (Some(duration), Some(deadline)) => {
161 Some(duration.min(deadline.saturating_duration_since(now)))
162 }
163 (None, Some(deadline)) => Some(deadline.saturating_duration_since(now)),
164 (Some(duration), None) => Some(duration),
165 (None, None) => None,
166 };
167
168 let next_event = match effective_timeout {
169 Some(duration) => match tokio::time::timeout(duration, event_rx.recv()).await {
170 Ok(event) => event,
171 Err(_) => {
172 warn!(
173 timeout_secs = duration.as_secs(),
174 "Execution inactivity timeout reached, sending SIGTERM"
175 );
176 timed_out = true;
177 terminated_status = Some(Self::terminate_child_and_wait(&mut child).await?);
178 break;
179 }
180 },
181 None => event_rx.recv().await,
182 };
183
184 match next_event {
185 Some(StreamEvent::StdoutLine(line)) => {
186 if line_signals_event_emitted(&line) {
187 post_event_deadline.get_or_insert_with(|| {
188 tokio::time::Instant::now() + POST_EVENT_GRACE_TIMEOUT
189 });
190 }
191 if self.backend.output_format == OutputFormat::CopilotStreamJson {
192 if let Some(text) = CopilotStreamParser::extract_text(&line) {
193 write!(output_writer, "{text}")?;
194 if !text.ends_with('\n') {
195 writeln!(output_writer)?;
196 }
197 }
198 } else {
199 writeln!(output_writer, "{line}")?;
200 }
201 output_writer.flush()?;
202 accumulated_output.push_str(&line);
203 accumulated_output.push('\n');
204 }
205 Some(StreamEvent::StderrLine(line)) => {
206 if line_signals_event_emitted(&line) {
207 post_event_deadline.get_or_insert_with(|| {
208 tokio::time::Instant::now() + POST_EVENT_GRACE_TIMEOUT
209 });
210 }
211 if verbose {
212 writeln!(output_writer, "[stderr] {line}")?;
213 output_writer.flush()?;
214 }
215 accumulated_output.push_str("[stderr] ");
216 accumulated_output.push_str(&line);
217 accumulated_output.push('\n');
218 }
219 Some(StreamEvent::StdoutEof) => stdout_done = true,
220 Some(StreamEvent::StderrEof) => stderr_done = true,
221 None => {
222 stdout_done = true;
223 stderr_done = true;
224 }
225 }
226 }
227
228 let status = if let Some(status) = terminated_status {
229 status
230 } else {
231 child.wait().await?
232 };
233
234 if let Some(handle) = stdout_task {
235 handle.await.map_err(join_error_to_io)??;
236 }
237 if let Some(handle) = stderr_task {
238 handle.await.map_err(join_error_to_io)??;
239 }
240
241 Ok(ExecutionResult {
242 output: accumulated_output,
243 success: status.success() && !timed_out,
244 exit_code: status.code(),
245 timed_out,
246 })
247 }
248
249 async fn terminate_child_and_wait(
251 child: &mut Child,
252 ) -> std::io::Result<std::process::ExitStatus> {
253 #[cfg(not(unix))]
254 {
255 child.start_kill()?;
256 return child.wait().await;
257 }
258
259 #[cfg(unix)]
260 if let Some(pid) = child.id() {
261 #[allow(clippy::cast_possible_wrap)]
262 let pid = Pid::from_raw(pid as i32);
263 let pgid = Pid::from_raw(-pid.as_raw());
264 debug!(%pid, "Sending SIGTERM to child process group");
265 let _ = kill(pgid, Signal::SIGTERM);
266 match tokio::time::timeout(TERMINATION_GRACE_TIMEOUT, child.wait()).await {
267 Ok(status) => status,
268 Err(_) => {
269 warn!(%pid, "Child process ignored SIGTERM, sending SIGKILL");
270 let _ = kill(pgid, Signal::SIGKILL);
271 child.wait().await
272 }
273 }
274 } else {
275 child.wait().await
276 }
277 }
278
279 pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
283 self.execute_capture_with_timeout(prompt, None).await
284 }
285
286 pub async fn execute_capture_with_timeout(
288 &self,
289 prompt: &str,
290 timeout: Option<Duration>,
291 ) -> std::io::Result<ExecutionResult> {
292 let sink = std::io::sink();
295 self.execute(prompt, sink, timeout, false).await
296 }
297}
298
299fn line_signals_event_emitted(line: &str) -> bool {
300 line.contains("Event emitted:")
301}
302
303async fn read_stream<R>(
304 stream: R,
305 tx: tokio::sync::mpsc::Sender<StreamEvent>,
306 stream_kind: StreamKind,
307) -> std::io::Result<()>
308where
309 R: AsyncRead + Unpin,
310{
311 let reader = BufReader::new(stream);
312 let mut lines = reader.lines();
313 while let Some(line) = lines.next_line().await? {
314 let event = match stream_kind {
315 StreamKind::Stdout => StreamEvent::StdoutLine(line),
316 StreamKind::Stderr => StreamEvent::StderrLine(line),
317 };
318 if tx.send(event).await.is_err() {
319 return Ok(());
320 }
321 }
322
323 let eof_event = match stream_kind {
324 StreamKind::Stdout => StreamEvent::StdoutEof,
325 StreamKind::Stderr => StreamEvent::StderrEof,
326 };
327 let _ = tx.send(eof_event).await;
328 Ok(())
329}
330
331fn join_error_to_io(error: tokio::task::JoinError) -> std::io::Error {
332 std::io::Error::other(error.to_string())
333}
334
335fn inject_ralph_runtime_env(command: &mut Command, workspace_root: &std::path::Path) {
336 let Ok(current_exe) = env::current_exe() else {
337 return;
338 };
339 let Some(bin_dir) = current_exe.parent() else {
340 return;
341 };
342
343 let mut path_entries = vec![bin_dir.to_path_buf()];
344 if let Some(existing_path) = env::var_os("PATH") {
345 path_entries.extend(env::split_paths(&existing_path));
346 }
347
348 if let Ok(joined_path) = env::join_paths(path_entries) {
349 command.env("PATH", joined_path);
350 }
351 command.env("RALPH_BIN", ¤t_exe);
352 command.env("RALPH_WORKSPACE_ROOT", workspace_root);
353
354 let marker = workspace_root.join(".ralph/current-events");
356 if let Ok(relative) = std::fs::read_to_string(&marker) {
357 let abs = workspace_root.join(relative.trim());
358 command.env("RALPH_EVENTS_FILE", &abs);
359 }
360
361 if std::path::Path::new("/var/tmp").is_dir() {
362 command.env("TMPDIR", "/var/tmp");
363 command.env("TMP", "/var/tmp");
364 command.env("TEMP", "/var/tmp");
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::*;
371
372 #[tokio::test]
373 async fn test_execute_echo() {
374 let backend = CliBackend {
376 command: "echo".to_string(),
377 args: vec![],
378 prompt_mode: PromptMode::Arg,
379 prompt_flag: None,
380 output_format: OutputFormat::Text,
381 env_vars: vec![],
382 };
383
384 let executor = CliExecutor::new(backend);
385 let mut output = Vec::new();
386
387 let result = executor
388 .execute("hello world", &mut output, None, true)
389 .await
390 .unwrap();
391
392 assert!(result.success);
393 assert!(!result.timed_out);
394 assert!(result.output.contains("hello world"));
395 }
396
397 #[tokio::test]
398 async fn test_execute_stdin() {
399 let backend = CliBackend {
401 command: "cat".to_string(),
402 args: vec![],
403 prompt_mode: PromptMode::Stdin,
404 prompt_flag: None,
405 output_format: OutputFormat::Text,
406 env_vars: vec![],
407 };
408
409 let executor = CliExecutor::new(backend);
410 let result = executor.execute_capture("stdin test").await.unwrap();
411
412 assert!(result.success);
413 assert!(result.output.contains("stdin test"));
414 }
415
416 #[tokio::test]
417 async fn test_execute_failure() {
418 let backend = CliBackend {
419 command: "false".to_string(), args: vec![],
421 prompt_mode: PromptMode::Arg,
422 prompt_flag: None,
423 output_format: OutputFormat::Text,
424 env_vars: vec![],
425 };
426
427 let executor = CliExecutor::new(backend);
428 let result = executor.execute_capture("").await.unwrap();
429
430 assert!(!result.success);
431 assert!(!result.timed_out);
432 assert_eq!(result.exit_code, Some(1));
433 }
434
435 #[tokio::test]
436 async fn test_execute_timeout() {
437 let backend = CliBackend {
441 command: "sleep".to_string(),
442 args: vec!["10".to_string()], prompt_mode: PromptMode::Stdin, prompt_flag: None,
445 output_format: OutputFormat::Text,
446 env_vars: vec![],
447 };
448
449 let executor = CliExecutor::new(backend);
450
451 let timeout = Some(Duration::from_millis(100));
453 let result = executor
454 .execute_capture_with_timeout("", timeout)
455 .await
456 .unwrap();
457
458 assert!(result.timed_out, "Expected execution to time out");
459 assert!(
460 !result.success,
461 "Timed out execution should not be successful"
462 );
463 }
464
465 #[tokio::test]
466 async fn test_execute_timeout_resets_on_output_activity() {
467 let backend = CliBackend {
468 command: "sh".to_string(),
469 args: vec!["-c".to_string()],
470 prompt_mode: PromptMode::Arg,
471 prompt_flag: None,
472 output_format: OutputFormat::Text,
473 env_vars: vec![],
474 };
475
476 let executor = CliExecutor::new(backend);
477 let timeout = Some(Duration::from_millis(300));
478 let result = executor
479 .execute_capture_with_timeout(
480 "printf 'start\\n'; sleep 0.2; printf 'middle\\n'; sleep 0.2; printf 'done\\n'",
481 timeout,
482 )
483 .await
484 .unwrap();
485
486 assert!(
487 !result.timed_out,
488 "Periodic output should reset the inactivity timeout"
489 );
490 assert!(result.success, "Periodic-output command should succeed");
491 assert!(result.output.contains("start"));
492 assert!(result.output.contains("middle"));
493 assert!(result.output.contains("done"));
494 }
495
496 #[tokio::test]
497 async fn test_execute_streams_output_before_inactivity_timeout() {
498 let backend = CliBackend {
499 command: "sh".to_string(),
500 args: vec!["-c".to_string(), "printf 'hello\\n'; sleep 10".to_string()],
501 prompt_mode: PromptMode::Stdin,
502 prompt_flag: None,
503 output_format: OutputFormat::Text,
504 env_vars: vec![],
505 };
506
507 let executor = CliExecutor::new(backend);
508 let mut output = Vec::new();
509 let result = executor
510 .execute("", &mut output, Some(Duration::from_millis(200)), false)
511 .await
512 .unwrap();
513
514 assert!(
515 result.timed_out,
516 "Expected inactivity timeout after output stops"
517 );
518 assert_eq!(String::from_utf8(output).unwrap(), "hello\n");
519 assert!(result.output.contains("hello"));
520 }
521
522 #[tokio::test]
523 async fn test_execute_timeout_force_kills_processes_that_ignore_sigterm() {
524 let backend = CliBackend {
525 command: "sh".to_string(),
526 args: vec![
527 "-c".to_string(),
528 "trap '' TERM; while :; do sleep 1; done".to_string(),
529 ],
530 prompt_mode: PromptMode::Stdin,
531 prompt_flag: None,
532 output_format: OutputFormat::Text,
533 env_vars: vec![],
534 };
535
536 let executor = CliExecutor::new(backend);
537 let started = std::time::Instant::now();
538 let result = executor
539 .execute_capture_with_timeout("", Some(Duration::from_millis(100)))
540 .await
541 .unwrap();
542
543 assert!(
544 result.timed_out,
545 "Expected ignored-SIGTERM command to time out"
546 );
547 assert!(
548 started.elapsed() < Duration::from_secs(5),
549 "Executor should force-kill ignored-SIGTERM processes instead of hanging"
550 );
551 }
552
553 #[tokio::test]
554 async fn test_execute_uses_short_post_event_grace_timeout() {
555 let backend = CliBackend {
556 command: "sh".to_string(),
557 args: vec![
558 "-c".to_string(),
559 "printf 'Event emitted: task.done\\n'; sleep 30".to_string(),
560 ],
561 prompt_mode: PromptMode::Stdin,
562 prompt_flag: None,
563 output_format: OutputFormat::Text,
564 env_vars: vec![],
565 };
566
567 let executor = CliExecutor::new(backend);
568 let started = std::time::Instant::now();
569 let result = executor
570 .execute_capture_with_timeout("", Some(Duration::from_secs(30)))
571 .await
572 .unwrap();
573
574 assert!(
575 result.timed_out,
576 "Expected lingering post-event process to be terminated"
577 );
578 assert!(
579 started.elapsed() < Duration::from_secs(10),
580 "Event-emitting backends should use the short post-event grace timeout instead of the full inactivity timeout"
581 );
582 assert!(result.output.contains("Event emitted: task.done"));
583 }
584
585 #[tokio::test]
586 async fn test_execute_post_event_deadline_does_not_reset_on_output_activity() {
587 let backend = CliBackend {
588 command: "sh".to_string(),
589 args: vec![
590 "-c".to_string(),
591 "printf 'Event emitted: task.done\\n'; while :; do printf 'heartbeat\\n'; sleep 1; done"
592 .to_string(),
593 ],
594 prompt_mode: PromptMode::Stdin,
595 prompt_flag: None,
596 output_format: OutputFormat::Text,
597 env_vars: vec![],
598 };
599
600 let executor = CliExecutor::new(backend);
601 let started = std::time::Instant::now();
602 let result = executor
603 .execute_capture_with_timeout("", Some(Duration::from_secs(30)))
604 .await
605 .unwrap();
606
607 assert!(
608 result.timed_out,
609 "Expected noisy post-event process to be terminated"
610 );
611 assert!(
612 started.elapsed() < Duration::from_secs(10),
613 "Event-emitting backends should respect the fixed post-event grace deadline even if they keep producing output"
614 );
615 assert!(result.output.contains("Event emitted: task.done"));
616 assert!(result.output.contains("heartbeat"));
617 }
618
619 #[tokio::test]
620 async fn test_execute_no_timeout_when_fast() {
621 let backend = CliBackend {
623 command: "echo".to_string(),
624 args: vec![],
625 prompt_mode: PromptMode::Arg,
626 prompt_flag: None,
627 output_format: OutputFormat::Text,
628 env_vars: vec![],
629 };
630
631 let executor = CliExecutor::new(backend);
632
633 let timeout = Some(Duration::from_secs(10));
635 let result = executor
636 .execute_capture_with_timeout("fast", timeout)
637 .await
638 .unwrap();
639
640 assert!(!result.timed_out, "Fast command should not time out");
641 assert!(result.success);
642 assert!(result.output.contains("fast"));
643 }
644
645 #[tokio::test]
646 async fn test_execute_copilot_stream_writes_extracted_text() {
647 let backend = CliBackend {
648 command: "printf".to_string(),
649 args: vec![
650 "%s\n%s\n".to_string(),
651 r#"{"type":"assistant.turn_start","data":{"turnId":"0"}}"#.to_string(),
652 r#"{"type":"assistant.message","data":{"content":"hello from copilot"}}"#
653 .to_string(),
654 ],
655 prompt_mode: PromptMode::Stdin,
656 prompt_flag: None,
657 output_format: OutputFormat::CopilotStreamJson,
658 env_vars: vec![],
659 };
660
661 let executor = CliExecutor::new(backend);
662 let mut output = Vec::new();
663
664 let result = executor
665 .execute("ignored", &mut output, None, false)
666 .await
667 .unwrap();
668
669 assert!(result.success);
670 assert!(result.output.contains("\"assistant.message\""));
671 assert_eq!(String::from_utf8(output).unwrap(), "hello from copilot\n");
672 }
673}