Skip to main content

ironflow_engine/executor/
shell.rs

1//! Shell step executor.
2
3use std::process::Stdio;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use rust_decimal::Decimal;
8use serde_json::json;
9use tokio::io::{AsyncBufReadExt, BufReader};
10use tokio::process::Command;
11use tokio::spawn;
12use tracing::info;
13
14use ironflow_core::error::OperationError;
15use ironflow_core::operations::shell::Shell;
16use ironflow_core::provider::AgentProvider;
17use ironflow_core::utils::truncate_output;
18
19use crate::config::ShellConfig;
20use crate::error::EngineError;
21use crate::log_sender::StepLogSender;
22use crate::notify::LogStream;
23
24use super::{StepExecutor, StepOutput};
25
26const DEFAULT_SHELL_TIMEOUT: Duration = Duration::from_secs(300);
27
28/// Read lines from an async reader, emit each line to the sender, and
29/// accumulate the full output as a single `String`.
30async fn read_and_stream<R: tokio::io::AsyncRead + Unpin>(
31    reader: R,
32    sender: StepLogSender,
33    stream: LogStream,
34) -> String {
35    let mut lines = BufReader::new(reader).lines();
36    let mut collected = String::new();
37    while let Ok(Some(line)) = lines.next_line().await {
38        sender.emit(stream, &line);
39        if !collected.is_empty() {
40            collected.push('\n');
41        }
42        collected.push_str(&line);
43    }
44    collected
45}
46
47/// Executor for shell steps.
48///
49/// Runs a shell command and captures stdout, stderr, and exit code.
50/// When a [`StepLogSender`] is attached, stdout and stderr are streamed
51/// line-by-line in real time.
52pub struct ShellExecutor<'a> {
53    config: &'a ShellConfig,
54    log_sender: Option<StepLogSender>,
55}
56
57impl<'a> ShellExecutor<'a> {
58    /// Create a new shell executor from a config reference.
59    pub fn new(config: &'a ShellConfig) -> Self {
60        Self {
61            config,
62            log_sender: None,
63        }
64    }
65
66    /// Attach a log sender for real-time line streaming.
67    pub fn with_log_sender(mut self, sender: StepLogSender) -> Self {
68        self.log_sender = Some(sender);
69        self
70    }
71}
72
73impl StepExecutor for ShellExecutor<'_> {
74    async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
75        match self.log_sender {
76            Some(ref sender) => self.execute_streaming(sender.clone()).await,
77            None => self.execute_buffered().await,
78        }
79    }
80}
81
82impl ShellExecutor<'_> {
83    /// Non-streaming execution via [`Shell::run()`].
84    async fn execute_buffered(&self) -> Result<StepOutput, EngineError> {
85        let start = Instant::now();
86
87        let mut shell = Shell::new(&self.config.command);
88        if let Some(secs) = self.config.timeout_secs {
89            shell = shell.timeout(Duration::from_secs(secs));
90        }
91        if let Some(ref dir) = self.config.dir {
92            shell = shell.dir(dir);
93        }
94        for (key, value) in &self.config.env {
95            shell = shell.env(key, value);
96        }
97        if self.config.clean_env {
98            shell = shell.clean_env();
99        }
100
101        let output = shell.run().await?;
102        let duration_ms = start.elapsed().as_millis() as u64;
103
104        info!(
105            step_kind = "shell",
106            command = %self.config.command,
107            exit_code = output.exit_code(),
108            duration_ms,
109            "shell step completed"
110        );
111
112        self.record_metrics(duration_ms);
113
114        Ok(StepOutput {
115            output: json!({
116                "stdout": output.stdout(),
117                "stderr": output.stderr(),
118                "exit_code": output.exit_code(),
119            }),
120            duration_ms,
121            cost_usd: Decimal::ZERO,
122            input_tokens: None,
123            output_tokens: None,
124            debug_messages: None,
125        })
126    }
127
128    /// Streaming execution: reads stdout/stderr line-by-line and forwards
129    /// each line to the [`StepLogSender`] in real time.
130    async fn execute_streaming(&self, sender: StepLogSender) -> Result<StepOutput, EngineError> {
131        let start = Instant::now();
132
133        let mut cmd = Command::new("sh");
134        cmd.arg("-c").arg(&self.config.command);
135        cmd.stdout(Stdio::piped())
136            .stderr(Stdio::piped())
137            .kill_on_drop(true);
138
139        if self.config.clean_env {
140            cmd.env_clear();
141        }
142        if let Some(ref dir) = self.config.dir {
143            cmd.current_dir(dir);
144        }
145        for (key, value) in &self.config.env {
146            cmd.env(key, value);
147        }
148
149        let mut child = cmd.spawn().map_err(|e| {
150            EngineError::Operation(OperationError::Shell {
151                exit_code: -1,
152                stderr: format!("failed to spawn shell: {e}"),
153            })
154        })?;
155
156        let stdout_pipe = child.stdout.take().expect("stdout piped");
157        let stderr_pipe = child.stderr.take().expect("stderr piped");
158
159        let stdout_task = spawn(read_and_stream(
160            stdout_pipe,
161            sender.clone(),
162            LogStream::Stdout,
163        ));
164        let stderr_task = spawn(read_and_stream(stderr_pipe, sender, LogStream::Stderr));
165
166        let timeout_dur = self
167            .config
168            .timeout_secs
169            .map(Duration::from_secs)
170            .unwrap_or(DEFAULT_SHELL_TIMEOUT);
171
172        let status = match tokio::time::timeout(timeout_dur, child.wait()).await {
173            Ok(Ok(status)) => status,
174            Ok(Err(e)) => {
175                return Err(EngineError::Operation(OperationError::Shell {
176                    exit_code: -1,
177                    stderr: format!("failed to wait for shell: {e}"),
178                }));
179            }
180            Err(_) => {
181                child.kill().await.ok();
182                return Err(EngineError::Operation(OperationError::Timeout {
183                    step: self.config.command.clone(),
184                    limit: timeout_dur,
185                }));
186            }
187        };
188
189        let raw_stdout = stdout_task.await.unwrap_or_default();
190        let raw_stderr = stderr_task.await.unwrap_or_default();
191
192        let stdout = truncate_output(raw_stdout.as_bytes(), "shell stdout");
193        let stderr = truncate_output(raw_stderr.as_bytes(), "shell stderr");
194
195        let exit_code = status.code().unwrap_or(-1);
196        let duration_ms = start.elapsed().as_millis() as u64;
197
198        info!(
199            step_kind = "shell",
200            command = %self.config.command,
201            exit_code,
202            duration_ms,
203            streaming = true,
204            "shell step completed"
205        );
206
207        self.record_metrics(duration_ms);
208
209        if exit_code != 0 {
210            return Err(EngineError::Operation(OperationError::Shell {
211                exit_code,
212                stderr: stderr.clone(),
213            }));
214        }
215
216        Ok(StepOutput {
217            output: json!({
218                "stdout": stdout,
219                "stderr": stderr,
220                "exit_code": exit_code,
221            }),
222            duration_ms,
223            cost_usd: Decimal::ZERO,
224            input_tokens: None,
225            output_tokens: None,
226            debug_messages: None,
227        })
228    }
229
230    #[allow(unused_variables)]
231    fn record_metrics(&self, duration_ms: u64) {
232        #[cfg(feature = "prometheus")]
233        {
234            use ironflow_core::metric_names::{
235                SHELL_DURATION_SECONDS, SHELL_TOTAL, STATUS_SUCCESS,
236            };
237            use metrics::{counter, histogram};
238            counter!(SHELL_TOTAL, "status" => STATUS_SUCCESS).increment(1);
239            histogram!(SHELL_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
240        }
241    }
242}
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247    use ironflow_core::providers::claude::ClaudeCodeProvider;
248    use ironflow_core::providers::record_replay::RecordReplayProvider;
249
250    fn create_test_provider() -> Arc<dyn AgentProvider> {
251        let inner = ClaudeCodeProvider::new();
252        Arc::new(RecordReplayProvider::replay(
253            inner,
254            "/tmp/ironflow-fixtures",
255        ))
256    }
257
258    #[tokio::test]
259    async fn shell_simple_command() {
260        let config = ShellConfig::new("echo hello");
261        let executor = ShellExecutor::new(&config);
262        let provider = create_test_provider();
263
264        let result = executor.execute(&provider).await;
265        assert!(result.is_ok());
266        let output = result.unwrap();
267        assert_eq!(output.output["exit_code"].as_i64().unwrap(), 0);
268        assert!(output.output["stdout"].as_str().unwrap().contains("hello"));
269    }
270
271    #[tokio::test]
272    async fn shell_nonzero_exit_returns_error() {
273        let config = ShellConfig::new("exit 1");
274        let executor = ShellExecutor::new(&config);
275        let provider = create_test_provider();
276
277        let result = executor.execute(&provider).await;
278        assert!(result.is_err());
279    }
280
281    #[tokio::test]
282    async fn shell_env_variables() {
283        let config = ShellConfig::new("echo $MY_VAR").env("MY_VAR", "test_value");
284        let executor = ShellExecutor::new(&config);
285        let provider = create_test_provider();
286
287        let result = executor.execute(&provider).await;
288        assert!(result.is_ok());
289        let output = result.unwrap();
290        assert!(
291            output.output["stdout"]
292                .as_str()
293                .unwrap()
294                .contains("test_value")
295        );
296    }
297
298    #[tokio::test]
299    async fn shell_step_output_has_structure() {
300        let config = ShellConfig::new("echo test");
301        let executor = ShellExecutor::new(&config);
302        let provider = create_test_provider();
303
304        let output = executor.execute(&provider).await.unwrap();
305        assert!(output.output.get("stdout").is_some());
306        assert!(output.output.get("stderr").is_some());
307        assert!(output.output.get("exit_code").is_some());
308        assert_eq!(output.cost_usd, Decimal::ZERO);
309        assert!(output.duration_ms < 5000);
310    }
311
312    #[tokio::test]
313    async fn shell_command_with_pipe() {
314        let config = ShellConfig::new("echo hello | grep hello");
315        let executor = ShellExecutor::new(&config);
316        let provider = create_test_provider();
317
318        let result = executor.execute(&provider).await;
319        assert!(result.is_ok());
320        let output = result.unwrap();
321        assert_eq!(output.output["exit_code"].as_i64().unwrap(), 0);
322        assert!(output.output["stdout"].as_str().unwrap().contains("hello"));
323    }
324
325    #[tokio::test]
326    async fn shell_streaming_emits_lines() {
327        let config = ShellConfig::new("echo line1 && echo line2");
328        let (sender, mut receiver) = crate::log_sender::channel();
329        let step_sender = StepLogSender::new(
330            sender,
331            uuid::Uuid::now_v7(),
332            uuid::Uuid::now_v7(),
333            "test".to_string(),
334        );
335        let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
336        let provider = create_test_provider();
337
338        let result = executor.execute(&provider).await;
339        assert!(result.is_ok());
340
341        let output = result.unwrap();
342        assert!(output.output["stdout"].as_str().unwrap().contains("line1"));
343        assert!(output.output["stdout"].as_str().unwrap().contains("line2"));
344
345        let mut lines = Vec::new();
346        while let Ok(line) = receiver.try_recv() {
347            lines.push(line);
348        }
349        assert!(lines.len() >= 2);
350        assert_eq!(lines[0].stream, LogStream::Stdout);
351        assert_eq!(lines[0].line, "line1");
352        assert_eq!(lines[1].line, "line2");
353    }
354
355    #[tokio::test]
356    async fn shell_streaming_captures_stderr() {
357        let config = ShellConfig::new("echo err >&2");
358        let (sender, mut receiver) = crate::log_sender::channel();
359        let step_sender = StepLogSender::new(
360            sender,
361            uuid::Uuid::now_v7(),
362            uuid::Uuid::now_v7(),
363            "test".to_string(),
364        );
365        let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
366        let provider = create_test_provider();
367
368        let result = executor.execute(&provider).await;
369        assert!(result.is_ok());
370
371        let mut stderr_lines = Vec::new();
372        while let Ok(line) = receiver.try_recv() {
373            if line.stream == LogStream::Stderr {
374                stderr_lines.push(line);
375            }
376        }
377        assert!(!stderr_lines.is_empty());
378        assert_eq!(stderr_lines[0].line, "err");
379    }
380
381    #[tokio::test]
382    async fn shell_streaming_nonzero_exit_returns_error() {
383        let config = ShellConfig::new("exit 42");
384        let (sender, _receiver) = crate::log_sender::channel();
385        let step_sender = StepLogSender::new(
386            sender,
387            uuid::Uuid::now_v7(),
388            uuid::Uuid::now_v7(),
389            "test".to_string(),
390        );
391        let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
392        let provider = create_test_provider();
393
394        let result = executor.execute(&provider).await;
395        assert!(result.is_err());
396    }
397}