Skip to main content

cuenv_ci/executor/
runner.rs

//! IR Task Runner
//!
//! Executes individual IR tasks with proper command handling, environment
//! injection, and output streaming with secret redaction.
5
6// Task execution with output streaming and error handling
7#![allow(clippy::too_many_lines)]
8
9use crate::ir::Task as IRTask;
10use std::collections::BTreeMap;
11use std::path::PathBuf;
12use std::process::Stdio;
13use thiserror::Error;
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::Command;
16
17/// Error types for task execution
18#[derive(Debug, Error)]
19pub enum RunnerError {
20    /// Task command is empty
21    #[error("Task '{task}' has empty command")]
22    EmptyCommand { task: String },
23
24    /// Process spawn failed
25    #[error("Failed to spawn task '{task}': {source}")]
26    SpawnFailed {
27        task: String,
28        #[source]
29        source: std::io::Error,
30    },
31
32    /// Process execution failed
33    #[error("Task '{task}' execution failed: {source}")]
34    ExecutionFailed {
35        task: String,
36        #[source]
37        source: std::io::Error,
38    },
39}
40
/// Result record produced for one task, whether it actually ran, was served
/// from cache, or was only a dry run.
#[derive(Debug, Clone)]
pub struct TaskOutput {
    /// ID of the task this record belongs to
    pub task_id: String,
    /// Exit code reported by the process (0 for cached and dry-run records)
    pub exit_code: i32,
    /// Text collected from the process's stdout (empty when nothing was captured)
    pub stdout: String,
    /// Text collected from the process's stderr (empty when nothing was captured)
    pub stderr: String,
    /// `true` when the task is considered successful
    pub success: bool,
    /// `true` when the record came from the cache instead of an execution
    pub from_cache: bool,
    /// Wall-clock duration in milliseconds
    pub duration_ms: u64,
}

impl TaskOutput {
    /// Build the record for a cache hit: successful, exit code 0, no captured
    /// output, flagged as cached, with the supplied duration.
    #[must_use]
    pub const fn from_cache(task_id: String, duration_ms: u64) -> Self {
        Self::synthetic_success(task_id, true, duration_ms)
    }

    /// Build the record for a dry run: successful, exit code 0, no captured
    /// output, not cached, zero duration.
    #[must_use]
    pub const fn dry_run(task_id: String) -> Self {
        Self::synthetic_success(task_id, false, 0)
    }

    /// Shared constructor for records that did not come from a real process:
    /// always successful with empty output streams.
    const fn synthetic_success(task_id: String, from_cache: bool, duration_ms: u64) -> Self {
        Self {
            task_id,
            exit_code: 0,
            stdout: String::new(),
            stderr: String::new(),
            success: true,
            from_cache,
            duration_ms,
        }
    }
}
89
/// Shell executable used for shell-mode tasks when a runner is created
/// without an explicit shell path.
pub const DEFAULT_SHELL: &str = "/bin/sh";
92
93/// Runner for executing IR tasks
94pub struct IRTaskRunner {
95    /// Working directory for task execution
96    project_root: PathBuf,
97    /// Whether to capture output
98    capture_output: cuenv_core::OutputCapture,
99    /// Shell path for shell-mode execution
100    shell_path: String,
101}
102
103impl IRTaskRunner {
104    /// Create a new task runner with default shell
105    #[must_use]
106    pub fn new(project_root: PathBuf, capture_output: cuenv_core::OutputCapture) -> Self {
107        Self {
108            project_root,
109            capture_output,
110            shell_path: DEFAULT_SHELL.to_string(),
111        }
112    }
113
114    /// Create a new task runner with custom shell path
115    #[must_use]
116    pub fn with_shell(
117        project_root: PathBuf,
118        capture_output: cuenv_core::OutputCapture,
119        shell_path: impl Into<String>,
120    ) -> Self {
121        Self {
122            project_root,
123            capture_output,
124            shell_path: shell_path.into(),
125        }
126    }
127
128    /// Execute a single IR task
129    ///
130    /// # Arguments
131    /// * `task` - The IR task definition
132    /// * `env` - Environment variables to inject (includes resolved secrets)
133    ///
134    /// # Errors
135    /// Returns error if the task command is empty or execution fails
136    #[tracing::instrument(
137        name = "execute_task",
138        fields(task_id = %task.id, shell = task.shell),
139        skip(self, env)
140    )]
141    pub async fn execute(
142        &self,
143        task: &IRTask,
144        env: BTreeMap<String, String>,
145    ) -> Result<TaskOutput, RunnerError> {
146        if task.command.is_empty() {
147            return Err(RunnerError::EmptyCommand {
148                task: task.id.clone(),
149            });
150        }
151
152        let start = std::time::Instant::now();
153
154        // Build command based on shell mode
155        let mut cmd = if task.shell {
156            // Shell mode: wrap command in shell -c
157            let shell_cmd = task.command.join(" ");
158            tracing::debug!(shell_cmd = %shell_cmd, shell = %self.shell_path, "Running in shell mode");
159
160            let mut c = Command::new(&self.shell_path);
161            c.arg("-c");
162            c.arg(&shell_cmd);
163            c
164        } else {
165            // Direct mode: execve
166            tracing::debug!(cmd = ?task.command, "Running in direct mode");
167
168            let mut c = Command::new(&task.command[0]);
169            if task.command.len() > 1 {
170                c.args(&task.command[1..]);
171            }
172            c
173        };
174
175        // Set working directory
176        cmd.current_dir(&self.project_root);
177
178        // Clear environment and inject our variables
179        cmd.env_clear();
180        for (k, v) in &env {
181            cmd.env(k, v);
182        }
183
184        // Only inject essential env vars if not already provided in the env map
185        // (CI orchestrator may provide custom PATH with tool directories)
186        if !env.contains_key("PATH")
187            && let Ok(path) = std::env::var("PATH")
188        {
189            cmd.env("PATH", path);
190        }
191        if !env.contains_key("HOME")
192            && let Ok(home) = std::env::var("HOME")
193        {
194            cmd.env("HOME", home);
195        }
196
197        // Force color output even when stdout is piped
198        // These are widely supported: FORCE_COLOR by Node.js/chalk, CLICOLOR_FORCE by BSD/macOS
199        if !env.contains_key("FORCE_COLOR") {
200            cmd.env("FORCE_COLOR", "1");
201        }
202        if !env.contains_key("CLICOLOR_FORCE") {
203            cmd.env("CLICOLOR_FORCE", "1");
204        }
205
206        // Configure output - always pipe for streaming, or inherit if not capturing
207        if self.capture_output.should_capture() {
208            cmd.stdout(Stdio::piped());
209            cmd.stderr(Stdio::piped());
210        } else {
211            cmd.stdout(Stdio::inherit());
212            cmd.stderr(Stdio::inherit());
213        }
214
215        // Execute
216        tracing::info!(task = %task.id, "Starting task execution");
217
218        // Spawn the process
219        let mut child = cmd.spawn().map_err(|e| RunnerError::SpawnFailed {
220            task: task.id.clone(),
221            source: e,
222        })?;
223
224        // Collect output while streaming via events
225        let (stdout_content, stderr_content) = if self.capture_output.should_capture() {
226            let stdout_handle = child.stdout.take();
227            let stderr_handle = child.stderr.take();
228
229            let task_id_stdout = task.id.clone();
230            let task_id_stderr = task.id.clone();
231
232            // Stream stdout line-by-line
233            let stdout_task = tokio::spawn(async move {
234                let mut lines = Vec::new();
235                if let Some(stdout) = stdout_handle {
236                    let mut reader = BufReader::new(stdout).lines();
237                    while let Ok(Some(line)) = reader.next_line().await {
238                        // Emit via event system - redaction happens in the event layer
239                        cuenv_events::emit_task_output!(&task_id_stdout, "stdout", &line);
240                        lines.push(line);
241                    }
242                }
243                lines.join("\n")
244            });
245
246            // Stream stderr line-by-line
247            let stderr_task = tokio::spawn(async move {
248                let mut lines = Vec::new();
249                if let Some(stderr) = stderr_handle {
250                    let mut reader = BufReader::new(stderr).lines();
251                    while let Ok(Some(line)) = reader.next_line().await {
252                        // Emit via event system - redaction happens in the event layer
253                        cuenv_events::emit_task_output!(&task_id_stderr, "stderr", &line);
254                        lines.push(line);
255                    }
256                }
257                lines.join("\n")
258            });
259
260            // Wait for both streams to complete
261            let stdout = stdout_task.await.unwrap_or_default();
262            let stderr = stderr_task.await.unwrap_or_default();
263            (stdout, stderr)
264        } else {
265            (String::new(), String::new())
266        };
267
268        // Wait for the process to complete
269        let status = child
270            .wait()
271            .await
272            .map_err(|e| RunnerError::ExecutionFailed {
273                task: task.id.clone(),
274                source: e,
275            })?;
276
277        let duration = start.elapsed();
278        let exit_code = status.code().unwrap_or(-1);
279        let success = status.success();
280
281        let duration_ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
282        tracing::info!(
283            task = %task.id,
284            exit_code = exit_code,
285            success = success,
286            duration_ms,
287            "Task execution completed"
288        );
289
290        Ok(TaskOutput {
291            task_id: task.id.clone(),
292            exit_code,
293            stdout: stdout_content,
294            stderr: stderr_content,
295            success,
296            from_cache: false,
297            duration_ms,
298        })
299    }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ir::CachePolicy;
    use std::collections::BTreeMap;
    use std::path::Path;
    use tempfile::TempDir;

    /// Build a minimal IR task with the given id, command parts and shell flag;
    /// every other field is left empty / unset.
    fn make_task(id: &str, command: &[&str], shell: bool) -> IRTask {
        let command = command.iter().copied().map(String::from).collect();
        IRTask {
            id: id.to_string(),
            runtime: None,
            command,
            shell,
            env: BTreeMap::new(),
            secrets: BTreeMap::new(),
            resources: None,
            concurrency_group: None,
            inputs: Vec::new(),
            outputs: Vec::new(),
            depends_on: Vec::new(),
            cache_policy: CachePolicy::Normal,
            deployment: false,
            manual_approval: false,
            matrix: None,
            artifact_downloads: Vec::new(),
            params: BTreeMap::new(),
            phase: None,
            label: None,
            priority: None,
            contributor: None,
            condition: None,
            provider_hints: None,
        }
    }

    /// Runner rooted in `dir` with output capture enabled and the default shell.
    fn capturing_runner(dir: &TempDir) -> IRTaskRunner {
        IRTaskRunner::new(dir.path().to_path_buf(), true.into())
    }

    /// Direct mode runs the program and captures its stdout.
    #[tokio::test]
    async fn test_simple_command() {
        let dir = TempDir::new().unwrap();
        let runner = capturing_runner(&dir);
        // Prefer the absolute path when present; otherwise rely on PATH lookup.
        let echo_path = match Path::new("/bin/echo").exists() {
            true => "/bin/echo",
            false => "echo",
        };
        let task = make_task("test", &[echo_path, "hello"], false);

        let output = runner.execute(&task, BTreeMap::new()).await.unwrap();

        assert!(output.success);
        assert_eq!(output.exit_code, 0);
        assert!(output.stdout.contains("hello"));
        assert!(!output.from_cache);
    }

    /// Shell mode joins the parts so shell operators such as `&&` work.
    #[tokio::test]
    async fn test_shell_mode() {
        let dir = TempDir::new().unwrap();
        let runner = capturing_runner(&dir);
        let task = make_task("test", &["echo", "hello", "&&", "echo", "world"], true);

        let output = runner.execute(&task, BTreeMap::new()).await.unwrap();

        assert!(output.success);
        assert!(output.stdout.contains("hello"));
        assert!(output.stdout.contains("world"));
    }

    /// Variables passed in `env` are visible to the child process.
    #[tokio::test]
    async fn test_env_injection() {
        let dir = TempDir::new().unwrap();
        let runner = capturing_runner(&dir);
        let task = make_task("test", &["printenv", "MY_VAR"], false);

        let mut env = BTreeMap::new();
        env.insert("MY_VAR".to_string(), "test_value".to_string());
        let output = runner.execute(&task, env).await.unwrap();

        assert!(output.success);
        assert!(output.stdout.contains("test_value"));
    }

    /// A non-zero exit is reported in the output, not as an `Err`.
    #[tokio::test]
    async fn test_failing_command() {
        let dir = TempDir::new().unwrap();
        let runner = capturing_runner(&dir);
        let task = make_task("test", &["false"], false);

        let output = runner.execute(&task, BTreeMap::new()).await.unwrap();

        assert!(!output.success);
        assert_ne!(output.exit_code, 0);
    }

    /// An empty command list is rejected before anything is spawned.
    #[tokio::test]
    async fn test_empty_command_error() {
        let dir = TempDir::new().unwrap();
        let runner = capturing_runner(&dir);
        let task = make_task("test", &[], false);

        let outcome = runner.execute(&task, BTreeMap::new()).await;
        assert!(matches!(outcome, Err(RunnerError::EmptyCommand { .. })));
    }

    /// Cached records are successful, flagged as cached and keep the duration.
    #[test]
    fn test_cached_output() {
        let cached = TaskOutput::from_cache("test".to_string(), 100);
        assert!(cached.success);
        assert!(cached.from_cache);
        assert_eq!(cached.duration_ms, 100);
    }

    /// Dry-run records are successful, not cached and have zero duration.
    #[test]
    fn test_dry_run_output() {
        let dry = TaskOutput::dry_run("test".to_string());
        assert!(dry.success);
        assert!(!dry.from_cache);
        assert_eq!(dry.duration_ms, 0);
    }

    /// A custom shell path is used for shell-mode tasks.
    #[tokio::test]
    #[ignore = "requires /bin/bash which may not exist in sandboxed builds"]
    async fn test_custom_shell() {
        let dir = TempDir::new().unwrap();
        // Use /bin/bash (available on most Unix systems)
        let runner = IRTaskRunner::with_shell(dir.path().to_path_buf(), true.into(), "/bin/bash");
        let task = make_task("test", &["echo", "$BASH_VERSION"], true);

        let output = runner.execute(&task, BTreeMap::new()).await.unwrap();

        // On systems with bash, this should succeed and output something
        assert!(output.success);
    }

    /// `new` configures the default shell path.
    #[test]
    fn test_runner_default_shell() {
        let dir = TempDir::new().unwrap();
        let runner = capturing_runner(&dir);
        assert_eq!(runner.shell_path, "/bin/sh");
    }
}