Skip to main content

ironflow_engine/executor/
shell.rs

1//! Shell step executor.
2
3use std::process::Stdio;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use rust_decimal::Decimal;
8use serde_json::json;
9use tokio::io::{AsyncBufReadExt, BufReader};
10use tokio::process::Command;
11use tokio::spawn;
12use tracing::info;
13
14use ironflow_core::error::OperationError;
15use ironflow_core::operations::shell::Shell;
16use ironflow_core::provider::AgentProvider;
17use ironflow_core::utils::truncate_output;
18
19use crate::config::ShellConfig;
20use crate::error::EngineError;
21use crate::log_sender::StepLogSender;
22use crate::notify::LogStream;
23
24use super::{StepExecutor, StepOutput};
25
26const DEFAULT_SHELL_TIMEOUT: Duration = Duration::from_secs(300);
27
28/// Read lines from an async reader, emit each line to the sender, and
29/// accumulate the full output as a single `String`.
30async fn read_and_stream<R: tokio::io::AsyncRead + Unpin>(
31    reader: R,
32    sender: StepLogSender,
33    stream: LogStream,
34) -> String {
35    let mut lines = BufReader::new(reader).lines();
36    let mut collected = String::new();
37    while let Ok(Some(line)) = lines.next_line().await {
38        sender.emit(stream, &line);
39        if !collected.is_empty() {
40            collected.push('\n');
41        }
42        collected.push_str(&line);
43    }
44    collected
45}
46
47/// Executor for shell steps.
48///
49/// Runs a shell command and captures stdout, stderr, and exit code.
50/// When a [`StepLogSender`] is attached, stdout and stderr are streamed
51/// line-by-line in real time.
52pub struct ShellExecutor<'a> {
53    config: &'a ShellConfig,
54    log_sender: Option<StepLogSender>,
55}
56
57impl<'a> ShellExecutor<'a> {
58    /// Create a new shell executor from a config reference.
59    pub fn new(config: &'a ShellConfig) -> Self {
60        Self {
61            config,
62            log_sender: None,
63        }
64    }
65
66    /// Attach a log sender for real-time line streaming.
67    pub fn with_log_sender(mut self, sender: StepLogSender) -> Self {
68        self.log_sender = Some(sender);
69        self
70    }
71}
72
73impl StepExecutor for ShellExecutor<'_> {
74    async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
75        match self.log_sender {
76            Some(ref sender) => self.execute_streaming(sender.clone()).await,
77            None => self.execute_buffered().await,
78        }
79    }
80}
81
82impl ShellExecutor<'_> {
83    /// Non-streaming execution via [`Shell::run()`].
84    async fn execute_buffered(&self) -> Result<StepOutput, EngineError> {
85        let start = Instant::now();
86
87        let mut shell = Shell::new(&self.config.command);
88        if let Some(secs) = self.config.timeout_secs {
89            shell = shell.timeout(Duration::from_secs(secs));
90        }
91        if let Some(ref dir) = self.config.dir {
92            shell = shell.dir(dir);
93        }
94        for (key, value) in &self.config.env {
95            shell = shell.env(key, value);
96        }
97        if self.config.clean_env {
98            shell = shell.clean_env();
99        }
100
101        let output = shell.run().await?;
102        let duration_ms = start.elapsed().as_millis() as u64;
103
104        info!(
105            step_kind = "shell",
106            command = %self.config.command,
107            exit_code = output.exit_code(),
108            duration_ms,
109            "shell step completed"
110        );
111
112        self.record_metrics(duration_ms);
113
114        Ok(StepOutput {
115            output: json!({
116                "stdout": output.stdout(),
117                "stderr": output.stderr(),
118                "exit_code": output.exit_code(),
119            }),
120            duration_ms,
121            cost_usd: Decimal::ZERO,
122            input_tokens: None,
123            output_tokens: None,
124            model: None,
125            debug_messages: None,
126        })
127    }
128
129    /// Streaming execution: reads stdout/stderr line-by-line and forwards
130    /// each line to the [`StepLogSender`] in real time.
131    async fn execute_streaming(&self, sender: StepLogSender) -> Result<StepOutput, EngineError> {
132        let start = Instant::now();
133
134        let mut cmd = Command::new("sh");
135        cmd.arg("-c").arg(&self.config.command);
136        cmd.stdout(Stdio::piped())
137            .stderr(Stdio::piped())
138            .kill_on_drop(true);
139
140        if self.config.clean_env {
141            cmd.env_clear();
142        }
143        if let Some(ref dir) = self.config.dir {
144            cmd.current_dir(dir);
145        }
146        for (key, value) in &self.config.env {
147            cmd.env(key, value);
148        }
149
150        let mut child = cmd.spawn().map_err(|e| {
151            EngineError::Operation(OperationError::Shell {
152                exit_code: -1,
153                stderr: format!("failed to spawn shell: {e}"),
154            })
155        })?;
156
157        let stdout_pipe = child.stdout.take().expect("stdout piped");
158        let stderr_pipe = child.stderr.take().expect("stderr piped");
159
160        let stdout_task = spawn(read_and_stream(
161            stdout_pipe,
162            sender.clone(),
163            LogStream::Stdout,
164        ));
165        let stderr_task = spawn(read_and_stream(stderr_pipe, sender, LogStream::Stderr));
166
167        let timeout_dur = self
168            .config
169            .timeout_secs
170            .map(Duration::from_secs)
171            .unwrap_or(DEFAULT_SHELL_TIMEOUT);
172
173        let status = match tokio::time::timeout(timeout_dur, child.wait()).await {
174            Ok(Ok(status)) => status,
175            Ok(Err(e)) => {
176                return Err(EngineError::Operation(OperationError::Shell {
177                    exit_code: -1,
178                    stderr: format!("failed to wait for shell: {e}"),
179                }));
180            }
181            Err(_) => {
182                child.kill().await.ok();
183                return Err(EngineError::Operation(OperationError::Timeout {
184                    step: self.config.command.clone(),
185                    limit: timeout_dur,
186                }));
187            }
188        };
189
190        let raw_stdout = stdout_task.await.unwrap_or_default();
191        let raw_stderr = stderr_task.await.unwrap_or_default();
192
193        let stdout = truncate_output(raw_stdout.as_bytes(), "shell stdout");
194        let stderr = truncate_output(raw_stderr.as_bytes(), "shell stderr");
195
196        let exit_code = status.code().unwrap_or(-1);
197        let duration_ms = start.elapsed().as_millis() as u64;
198
199        info!(
200            step_kind = "shell",
201            command = %self.config.command,
202            exit_code,
203            duration_ms,
204            streaming = true,
205            "shell step completed"
206        );
207
208        self.record_metrics(duration_ms);
209
210        if exit_code != 0 {
211            return Err(EngineError::Operation(OperationError::Shell {
212                exit_code,
213                stderr: stderr.clone(),
214            }));
215        }
216
217        Ok(StepOutput {
218            output: json!({
219                "stdout": stdout,
220                "stderr": stderr,
221                "exit_code": exit_code,
222            }),
223            duration_ms,
224            cost_usd: Decimal::ZERO,
225            input_tokens: None,
226            output_tokens: None,
227            model: None,
228            debug_messages: None,
229        })
230    }
231
232    #[allow(unused_variables)]
233    fn record_metrics(&self, duration_ms: u64) {
234        #[cfg(feature = "prometheus")]
235        {
236            use ironflow_core::metric_names::{
237                SHELL_DURATION_SECONDS, SHELL_TOTAL, STATUS_SUCCESS,
238            };
239            use metrics::{counter, histogram};
240            counter!(SHELL_TOTAL, "status" => STATUS_SUCCESS).increment(1);
241            histogram!(SHELL_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
242        }
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use ironflow_core::providers::claude::ClaudeCodeProvider;
250    use ironflow_core::providers::record_replay::RecordReplayProvider;
251
252    fn create_test_provider() -> Arc<dyn AgentProvider> {
253        let inner = ClaudeCodeProvider::new();
254        Arc::new(RecordReplayProvider::replay(
255            inner,
256            "/tmp/ironflow-fixtures",
257        ))
258    }
259
260    #[tokio::test]
261    async fn shell_simple_command() {
262        let config = ShellConfig::new("echo hello");
263        let executor = ShellExecutor::new(&config);
264        let provider = create_test_provider();
265
266        let result = executor.execute(&provider).await;
267        assert!(result.is_ok());
268        let output = result.unwrap();
269        assert_eq!(output.output["exit_code"].as_i64().unwrap(), 0);
270        assert!(output.output["stdout"].as_str().unwrap().contains("hello"));
271    }
272
273    #[tokio::test]
274    async fn shell_nonzero_exit_returns_error() {
275        let config = ShellConfig::new("exit 1");
276        let executor = ShellExecutor::new(&config);
277        let provider = create_test_provider();
278
279        let result = executor.execute(&provider).await;
280        assert!(result.is_err());
281    }
282
283    #[tokio::test]
284    async fn shell_env_variables() {
285        let config = ShellConfig::new("echo $MY_VAR").env("MY_VAR", "test_value");
286        let executor = ShellExecutor::new(&config);
287        let provider = create_test_provider();
288
289        let result = executor.execute(&provider).await;
290        assert!(result.is_ok());
291        let output = result.unwrap();
292        assert!(
293            output.output["stdout"]
294                .as_str()
295                .unwrap()
296                .contains("test_value")
297        );
298    }
299
300    #[tokio::test]
301    async fn shell_step_output_has_structure() {
302        let config = ShellConfig::new("echo test");
303        let executor = ShellExecutor::new(&config);
304        let provider = create_test_provider();
305
306        let output = executor.execute(&provider).await.unwrap();
307        assert!(output.output.get("stdout").is_some());
308        assert!(output.output.get("stderr").is_some());
309        assert!(output.output.get("exit_code").is_some());
310        assert_eq!(output.cost_usd, Decimal::ZERO);
311        assert!(output.duration_ms < 5000);
312    }
313
314    #[tokio::test]
315    async fn shell_command_with_pipe() {
316        let config = ShellConfig::new("echo hello | grep hello");
317        let executor = ShellExecutor::new(&config);
318        let provider = create_test_provider();
319
320        let result = executor.execute(&provider).await;
321        assert!(result.is_ok());
322        let output = result.unwrap();
323        assert_eq!(output.output["exit_code"].as_i64().unwrap(), 0);
324        assert!(output.output["stdout"].as_str().unwrap().contains("hello"));
325    }
326
327    #[tokio::test]
328    async fn shell_streaming_emits_lines() {
329        let config = ShellConfig::new("echo line1 && echo line2");
330        let (sender, mut receiver) = crate::log_sender::channel();
331        let step_sender = StepLogSender::new(
332            sender,
333            uuid::Uuid::now_v7(),
334            uuid::Uuid::now_v7(),
335            "test".to_string(),
336        );
337        let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
338        let provider = create_test_provider();
339
340        let result = executor.execute(&provider).await;
341        assert!(result.is_ok());
342
343        let output = result.unwrap();
344        assert!(output.output["stdout"].as_str().unwrap().contains("line1"));
345        assert!(output.output["stdout"].as_str().unwrap().contains("line2"));
346
347        let mut lines = Vec::new();
348        while let Ok(line) = receiver.try_recv() {
349            lines.push(line);
350        }
351        assert!(lines.len() >= 2);
352        assert_eq!(lines[0].stream, LogStream::Stdout);
353        assert_eq!(lines[0].line, "line1");
354        assert_eq!(lines[1].line, "line2");
355    }
356
357    #[tokio::test]
358    async fn shell_streaming_captures_stderr() {
359        let config = ShellConfig::new("echo err >&2");
360        let (sender, mut receiver) = crate::log_sender::channel();
361        let step_sender = StepLogSender::new(
362            sender,
363            uuid::Uuid::now_v7(),
364            uuid::Uuid::now_v7(),
365            "test".to_string(),
366        );
367        let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
368        let provider = create_test_provider();
369
370        let result = executor.execute(&provider).await;
371        assert!(result.is_ok());
372
373        let mut stderr_lines = Vec::new();
374        while let Ok(line) = receiver.try_recv() {
375            if line.stream == LogStream::Stderr {
376                stderr_lines.push(line);
377            }
378        }
379        assert!(!stderr_lines.is_empty());
380        assert_eq!(stderr_lines[0].line, "err");
381    }
382
383    #[tokio::test]
384    async fn shell_streaming_nonzero_exit_returns_error() {
385        let config = ShellConfig::new("exit 42");
386        let (sender, _receiver) = crate::log_sender::channel();
387        let step_sender = StepLogSender::new(
388            sender,
389            uuid::Uuid::now_v7(),
390            uuid::Uuid::now_v7(),
391            "test".to_string(),
392        );
393        let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
394        let provider = create_test_provider();
395
396        let result = executor.execute(&provider).await;
397        assert!(result.is_err());
398    }
399}