Skip to main content

ironflow_engine/executor/
shell.rs

1//! Shell step executor.
2
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::shell::Shell;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::ShellConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18/// Executor for shell steps.
19///
20/// Runs a shell command and captures stdout, stderr, and exit code.
21pub struct ShellExecutor<'a> {
22    config: &'a ShellConfig,
23}
24
25impl<'a> ShellExecutor<'a> {
26    /// Create a new shell executor from a config reference.
27    pub fn new(config: &'a ShellConfig) -> Self {
28        Self { config }
29    }
30}
31
32impl StepExecutor for ShellExecutor<'_> {
33    async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34        let start = Instant::now();
35
36        let mut shell = Shell::new(&self.config.command);
37        if let Some(secs) = self.config.timeout_secs {
38            shell = shell.timeout(Duration::from_secs(secs));
39        }
40        if let Some(ref dir) = self.config.dir {
41            shell = shell.dir(dir);
42        }
43        for (key, value) in &self.config.env {
44            shell = shell.env(key, value);
45        }
46        if self.config.clean_env {
47            shell = shell.clean_env();
48        }
49
50        let output = shell.run().await?;
51        let duration_ms = start.elapsed().as_millis() as u64;
52
53        info!(
54            step_kind = "shell",
55            command = %self.config.command,
56            exit_code = output.exit_code(),
57            duration_ms,
58            "shell step completed"
59        );
60
61        #[cfg(feature = "prometheus")]
62        {
63            use ironflow_core::metric_names::{
64                SHELL_DURATION_SECONDS, SHELL_TOTAL, STATUS_SUCCESS,
65            };
66            use metrics::{counter, histogram};
67            counter!(SHELL_TOTAL, "status" => STATUS_SUCCESS).increment(1);
68            histogram!(SHELL_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
69        }
70
71        Ok(StepOutput {
72            output: json!({
73                "stdout": output.stdout(),
74                "stderr": output.stderr(),
75                "exit_code": output.exit_code(),
76            }),
77            duration_ms,
78            cost_usd: Decimal::ZERO,
79            input_tokens: None,
80            output_tokens: None,
81            debug_messages: None,
82        })
83    }
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;

    /// Build a replay-mode provider to satisfy the `execute` signature.
    ///
    /// The shell executor ignores its provider argument, so this value is
    /// never actually consulted by these tests.
    fn create_test_provider() -> Arc<dyn AgentProvider> {
        let inner = ClaudeCodeProvider::new();
        Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ))
    }

    /// Execute `config` through a fresh [`ShellExecutor`] and return the raw result.
    async fn run_shell(config: &ShellConfig) -> Result<StepOutput, EngineError> {
        let provider = create_test_provider();
        ShellExecutor::new(config).execute(&provider).await
    }

    /// Borrow the captured stdout from a step output, panicking if it is not a string.
    fn stdout_of(step: &StepOutput) -> &str {
        step.output["stdout"]
            .as_str()
            .expect("stdout should be a JSON string")
    }

    #[tokio::test]
    async fn shell_simple_command() {
        let config = ShellConfig::new("echo hello");

        // `expect` surfaces the underlying error if the step fails.
        let step = run_shell(&config)
            .await
            .expect("`echo hello` should succeed");

        assert_eq!(step.output["exit_code"].as_i64(), Some(0));
        assert!(stdout_of(&step).contains("hello"));
    }

    #[tokio::test]
    async fn shell_nonzero_exit_returns_error() {
        let config = ShellConfig::new("exit 1");

        assert!(run_shell(&config).await.is_err());
    }

    #[tokio::test]
    async fn shell_env_variables() {
        let config = ShellConfig::new("echo $MY_VAR").env("MY_VAR", "test_value");

        let step = run_shell(&config)
            .await
            .expect("echoing a configured env var should succeed");

        assert!(stdout_of(&step).contains("test_value"));
    }

    #[tokio::test]
    async fn shell_step_output_has_structure() {
        let config = ShellConfig::new("echo test");

        let step = run_shell(&config)
            .await
            .expect("`echo test` should succeed");

        for key in ["stdout", "stderr", "exit_code"] {
            assert!(step.output.get(key).is_some(), "missing output key `{key}`");
        }
        assert_eq!(step.cost_usd, Decimal::ZERO);
        // duration_ms can be 0 when the command completes in under 1ms (CI)
        assert!(step.duration_ms < 5000);
    }

    #[tokio::test]
    async fn shell_command_with_pipe() {
        let config = ShellConfig::new("echo hello | grep hello");

        let step = run_shell(&config)
            .await
            .expect("piped command should succeed");

        assert_eq!(step.output["exit_code"].as_i64(), Some(0));
        assert!(stdout_of(&step).contains("hello"));
    }
}