1#![allow(clippy::too_many_lines)]
8
9use crate::ir::Task as IRTask;
10use std::collections::BTreeMap;
11use std::path::PathBuf;
12use std::process::Stdio;
13use thiserror::Error;
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::Command;
16
/// Errors produced while running a task as a child process.
///
/// Every variant carries the ID of the task that failed so callers can
/// report which task was affected.
#[derive(Debug, Error)]
pub enum RunnerError {
    /// The task's command list was empty, so there was nothing to execute.
    #[error("Task '{task}' has empty command")]
    EmptyCommand { task: String },

    /// The child process for the task could not be started.
    #[error("Failed to spawn task '{task}': {source}")]
    SpawnFailed {
        /// ID of the task whose process failed to start.
        task: String,
        /// I/O error returned by the spawn attempt.
        #[source]
        source: std::io::Error,
    },

    /// The child process was started, but waiting for it to exit failed.
    #[error("Task '{task}' execution failed: {source}")]
    ExecutionFailed {
        /// ID of the task whose process could not be awaited.
        task: String,
        /// I/O error returned while waiting for the process.
        #[source]
        source: std::io::Error,
    },
}
40
/// Result of running (or deliberately not running) a single task.
#[derive(Debug, Clone)]
pub struct TaskOutput {
    /// ID of the task this result belongs to.
    pub task_id: String,
    /// Exit code reported for the task; `0` for synthetic results.
    pub exit_code: i32,
    /// Captured standard output; empty when nothing was captured.
    pub stdout: String,
    /// Captured standard error; empty when nothing was captured.
    pub stderr: String,
    /// Whether the task is considered to have succeeded.
    pub success: bool,
    /// `true` when the result was served from cache instead of being executed.
    pub from_cache: bool,
    /// Time attributed to the task, in milliseconds.
    pub duration_ms: u64,
}

impl TaskOutput {
    /// Builds the successful result reported for a cache hit.
    ///
    /// The output streams are empty and `duration_ms` is recorded as given.
    #[must_use]
    pub const fn from_cache(task_id: String, duration_ms: u64) -> Self {
        Self::synthetic_success(task_id, true, duration_ms)
    }

    /// Builds the successful placeholder result reported for a dry run.
    ///
    /// The output streams are empty and the duration is zero.
    #[must_use]
    pub const fn dry_run(task_id: String) -> Self {
        Self::synthetic_success(task_id, false, 0)
    }

    /// Shared constructor for results that did not come from a real process:
    /// exit code `0`, empty streams, `success == true`.
    const fn synthetic_success(task_id: String, from_cache: bool, duration_ms: u64) -> Self {
        Self {
            task_id,
            exit_code: 0,
            stdout: String::new(),
            stderr: String::new(),
            success: true,
            from_cache,
            duration_ms,
        }
    }
}
89
/// Shell binary used for shell-mode tasks unless a runner is built with an
/// explicit shell path.
pub const DEFAULT_SHELL: &str = "/bin/sh";
92
93pub struct IRTaskRunner {
95 project_root: PathBuf,
97 capture_output: cuenv_core::OutputCapture,
99 shell_path: String,
101}
102
103impl IRTaskRunner {
104 #[must_use]
106 pub fn new(project_root: PathBuf, capture_output: cuenv_core::OutputCapture) -> Self {
107 Self {
108 project_root,
109 capture_output,
110 shell_path: DEFAULT_SHELL.to_string(),
111 }
112 }
113
114 #[must_use]
116 pub fn with_shell(
117 project_root: PathBuf,
118 capture_output: cuenv_core::OutputCapture,
119 shell_path: impl Into<String>,
120 ) -> Self {
121 Self {
122 project_root,
123 capture_output,
124 shell_path: shell_path.into(),
125 }
126 }
127
128 #[tracing::instrument(
137 name = "execute_task",
138 fields(task_id = %task.id, shell = task.shell),
139 skip(self, env)
140 )]
141 pub async fn execute(
142 &self,
143 task: &IRTask,
144 env: BTreeMap<String, String>,
145 ) -> Result<TaskOutput, RunnerError> {
146 if task.command.is_empty() {
147 return Err(RunnerError::EmptyCommand {
148 task: task.id.clone(),
149 });
150 }
151
152 let start = std::time::Instant::now();
153
154 let mut cmd = if task.shell {
156 let shell_cmd = task.command.join(" ");
158 tracing::debug!(shell_cmd = %shell_cmd, shell = %self.shell_path, "Running in shell mode");
159
160 let mut c = Command::new(&self.shell_path);
161 c.arg("-c");
162 c.arg(&shell_cmd);
163 c
164 } else {
165 tracing::debug!(cmd = ?task.command, "Running in direct mode");
167
168 let mut c = Command::new(&task.command[0]);
169 if task.command.len() > 1 {
170 c.args(&task.command[1..]);
171 }
172 c
173 };
174
175 cmd.current_dir(&self.project_root);
177
178 cmd.env_clear();
180 for (k, v) in &env {
181 cmd.env(k, v);
182 }
183
184 if !env.contains_key("PATH")
187 && let Ok(path) = std::env::var("PATH")
188 {
189 cmd.env("PATH", path);
190 }
191 if !env.contains_key("HOME")
192 && let Ok(home) = std::env::var("HOME")
193 {
194 cmd.env("HOME", home);
195 }
196
197 if !env.contains_key("FORCE_COLOR") {
200 cmd.env("FORCE_COLOR", "1");
201 }
202 if !env.contains_key("CLICOLOR_FORCE") {
203 cmd.env("CLICOLOR_FORCE", "1");
204 }
205
206 if self.capture_output.should_capture() {
208 cmd.stdout(Stdio::piped());
209 cmd.stderr(Stdio::piped());
210 } else {
211 cmd.stdout(Stdio::inherit());
212 cmd.stderr(Stdio::inherit());
213 }
214
215 tracing::info!(task = %task.id, "Starting task execution");
217
218 let mut child = cmd.spawn().map_err(|e| RunnerError::SpawnFailed {
220 task: task.id.clone(),
221 source: e,
222 })?;
223
224 let (stdout_content, stderr_content) = if self.capture_output.should_capture() {
226 let stdout_handle = child.stdout.take();
227 let stderr_handle = child.stderr.take();
228
229 let task_id_stdout = task.id.clone();
230 let task_id_stderr = task.id.clone();
231
232 let stdout_task = tokio::spawn(async move {
234 let mut lines = Vec::new();
235 if let Some(stdout) = stdout_handle {
236 let mut reader = BufReader::new(stdout).lines();
237 while let Ok(Some(line)) = reader.next_line().await {
238 cuenv_events::emit_task_output!(&task_id_stdout, "stdout", &line);
240 lines.push(line);
241 }
242 }
243 lines.join("\n")
244 });
245
246 let stderr_task = tokio::spawn(async move {
248 let mut lines = Vec::new();
249 if let Some(stderr) = stderr_handle {
250 let mut reader = BufReader::new(stderr).lines();
251 while let Ok(Some(line)) = reader.next_line().await {
252 cuenv_events::emit_task_output!(&task_id_stderr, "stderr", &line);
254 lines.push(line);
255 }
256 }
257 lines.join("\n")
258 });
259
260 let stdout = stdout_task.await.unwrap_or_default();
262 let stderr = stderr_task.await.unwrap_or_default();
263 (stdout, stderr)
264 } else {
265 (String::new(), String::new())
266 };
267
268 let status = child
270 .wait()
271 .await
272 .map_err(|e| RunnerError::ExecutionFailed {
273 task: task.id.clone(),
274 source: e,
275 })?;
276
277 let duration = start.elapsed();
278 let exit_code = status.code().unwrap_or(-1);
279 let success = status.success();
280
281 let duration_ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
282 tracing::info!(
283 task = %task.id,
284 exit_code = exit_code,
285 success = success,
286 duration_ms,
287 "Task execution completed"
288 );
289
290 Ok(TaskOutput {
291 task_id: task.id.clone(),
292 exit_code,
293 stdout: stdout_content,
294 stderr: stderr_content,
295 success,
296 from_cache: false,
297 duration_ms,
298 })
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ir::CachePolicy;
    use std::collections::BTreeMap;
    use std::path::Path;
    use tempfile::TempDir;

    /// Builds an [`IRTask`] with the given id, command and shell flag; every
    /// other field is left empty or at its neutral value.
    fn make_task(id: &str, command: &[&str], shell: bool) -> IRTask {
        let command = command.iter().copied().map(String::from).collect();
        IRTask {
            id: id.to_owned(),
            runtime: None,
            command,
            shell,
            env: BTreeMap::new(),
            secrets: BTreeMap::new(),
            resources: None,
            concurrency_group: None,
            inputs: Vec::new(),
            outputs: Vec::new(),
            depends_on: Vec::new(),
            cache_policy: CachePolicy::Normal,
            deployment: false,
            manual_approval: false,
            matrix: None,
            artifact_downloads: Vec::new(),
            params: BTreeMap::new(),
            phase: None,
            label: None,
            priority: None,
            contributor: None,
            condition: None,
            provider_hints: None,
        }
    }

    /// Creates an output-capturing runner rooted in `dir` with the default shell.
    fn capturing_runner(dir: &TempDir) -> IRTaskRunner {
        IRTaskRunner::new(dir.path().to_path_buf(), true.into())
    }

    /// Direct mode runs the program with its arguments and captures stdout.
    #[tokio::test]
    async fn test_simple_command() {
        let workdir = TempDir::new().unwrap();
        let runner = capturing_runner(&workdir);
        // Prefer the absolute path when present; otherwise rely on PATH lookup.
        let has_bin_echo = Path::new("/bin/echo").exists();
        let echo = if has_bin_echo { "/bin/echo" } else { "echo" };
        let task = make_task("test", &[echo, "hello"], false);

        let output = runner.execute(&task, BTreeMap::new()).await.unwrap();

        assert!(output.success);
        assert_eq!(output.exit_code, 0);
        assert!(output.stdout.contains("hello"));
        assert!(!output.from_cache);
    }

    /// Shell mode lets the shell interpret operators such as `&&`.
    #[tokio::test]
    async fn test_shell_mode() {
        let workdir = TempDir::new().unwrap();
        let runner = capturing_runner(&workdir);
        let task = make_task("test", &["echo", "hello", "&&", "echo", "world"], true);

        let output = runner.execute(&task, BTreeMap::new()).await.unwrap();

        assert!(output.success);
        assert!(output.stdout.contains("hello"));
        assert!(output.stdout.contains("world"));
    }

    /// Variables passed in `env` are visible to the child process.
    #[tokio::test]
    async fn test_env_injection() {
        let workdir = TempDir::new().unwrap();
        let runner = capturing_runner(&workdir);
        let task = make_task("test", &["printenv", "MY_VAR"], false);

        let mut env = BTreeMap::new();
        env.insert("MY_VAR".to_string(), "test_value".to_string());
        let output = runner.execute(&task, env).await.unwrap();

        assert!(output.success);
        assert!(output.stdout.contains("test_value"));
    }

    /// A non-zero exit is reported through the output, not as an error.
    #[tokio::test]
    async fn test_failing_command() {
        let workdir = TempDir::new().unwrap();
        let runner = capturing_runner(&workdir);
        let task = make_task("test", &["false"], false);

        let output = runner.execute(&task, BTreeMap::new()).await.unwrap();

        assert!(!output.success);
        assert_ne!(output.exit_code, 0);
    }

    /// An empty command list is rejected before anything is spawned.
    #[tokio::test]
    async fn test_empty_command_error() {
        let workdir = TempDir::new().unwrap();
        let runner = capturing_runner(&workdir);
        let task = make_task("test", &[], false);

        let outcome = runner.execute(&task, BTreeMap::new()).await;
        assert!(matches!(outcome, Err(RunnerError::EmptyCommand { .. })));
    }

    /// Cache hits are successful, flagged as cached, and keep the given duration.
    #[test]
    fn test_cached_output() {
        let cached = TaskOutput::from_cache("test".to_string(), 100);
        assert!(cached.success);
        assert!(cached.from_cache);
        assert_eq!(cached.duration_ms, 100);
    }

    /// Dry runs are successful, not cached, and take zero time.
    #[test]
    fn test_dry_run_output() {
        let dry = TaskOutput::dry_run("test".to_string());
        assert!(dry.success);
        assert!(!dry.from_cache);
        assert_eq!(dry.duration_ms, 0);
    }

    /// A custom shell path is used for shell-mode tasks.
    #[tokio::test]
    #[ignore = "requires /bin/bash which may not exist in sandboxed builds"]
    async fn test_custom_shell() {
        let workdir = TempDir::new().unwrap();
        let runner =
            IRTaskRunner::with_shell(workdir.path().to_path_buf(), true.into(), "/bin/bash");
        let task = make_task("test", &["echo", "$BASH_VERSION"], true);

        let output = runner.execute(&task, BTreeMap::new()).await.unwrap();

        assert!(output.success);
    }

    /// `new` falls back to `/bin/sh`.
    #[test]
    fn test_runner_default_shell() {
        let workdir = TempDir::new().unwrap();
        let runner = capturing_runner(&workdir);
        assert_eq!(runner.shell_path, "/bin/sh");
    }
}