ironflow_engine/executor/
shell.rs1use std::process::Stdio;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use rust_decimal::Decimal;
8use serde_json::json;
9use tokio::io::{AsyncBufReadExt, BufReader};
10use tokio::process::Command;
11use tokio::spawn;
12use tracing::info;
13
14use ironflow_core::error::OperationError;
15use ironflow_core::operations::shell::Shell;
16use ironflow_core::provider::AgentProvider;
17use ironflow_core::utils::truncate_output;
18
19use crate::config::ShellConfig;
20use crate::error::EngineError;
21use crate::log_sender::StepLogSender;
22use crate::notify::LogStream;
23
24use super::{StepExecutor, StepOutput};
25
/// Timeout applied to a streaming shell step when the config sets none (five minutes).
const DEFAULT_SHELL_TIMEOUT: Duration = Duration::from_secs(5 * 60);
27
28async fn read_and_stream<R: tokio::io::AsyncRead + Unpin>(
31 reader: R,
32 sender: StepLogSender,
33 stream: LogStream,
34) -> String {
35 let mut lines = BufReader::new(reader).lines();
36 let mut collected = String::new();
37 while let Ok(Some(line)) = lines.next_line().await {
38 sender.emit(stream, &line);
39 if !collected.is_empty() {
40 collected.push('\n');
41 }
42 collected.push_str(&line);
43 }
44 collected
45}
46
47pub struct ShellExecutor<'a> {
53 config: &'a ShellConfig,
54 log_sender: Option<StepLogSender>,
55}
56
57impl<'a> ShellExecutor<'a> {
58 pub fn new(config: &'a ShellConfig) -> Self {
60 Self {
61 config,
62 log_sender: None,
63 }
64 }
65
66 pub fn with_log_sender(mut self, sender: StepLogSender) -> Self {
68 self.log_sender = Some(sender);
69 self
70 }
71}
72
73impl StepExecutor for ShellExecutor<'_> {
74 async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
75 match self.log_sender {
76 Some(ref sender) => self.execute_streaming(sender.clone()).await,
77 None => self.execute_buffered().await,
78 }
79 }
80}
81
82impl ShellExecutor<'_> {
83 async fn execute_buffered(&self) -> Result<StepOutput, EngineError> {
85 let start = Instant::now();
86
87 let mut shell = Shell::new(&self.config.command);
88 if let Some(secs) = self.config.timeout_secs {
89 shell = shell.timeout(Duration::from_secs(secs));
90 }
91 if let Some(ref dir) = self.config.dir {
92 shell = shell.dir(dir);
93 }
94 for (key, value) in &self.config.env {
95 shell = shell.env(key, value);
96 }
97 if self.config.clean_env {
98 shell = shell.clean_env();
99 }
100
101 let output = shell.run().await?;
102 let duration_ms = start.elapsed().as_millis() as u64;
103
104 info!(
105 step_kind = "shell",
106 command = %self.config.command,
107 exit_code = output.exit_code(),
108 duration_ms,
109 "shell step completed"
110 );
111
112 self.record_metrics(duration_ms);
113
114 Ok(StepOutput {
115 output: json!({
116 "stdout": output.stdout(),
117 "stderr": output.stderr(),
118 "exit_code": output.exit_code(),
119 }),
120 duration_ms,
121 cost_usd: Decimal::ZERO,
122 input_tokens: None,
123 output_tokens: None,
124 debug_messages: None,
125 })
126 }
127
128 async fn execute_streaming(&self, sender: StepLogSender) -> Result<StepOutput, EngineError> {
131 let start = Instant::now();
132
133 let mut cmd = Command::new("sh");
134 cmd.arg("-c").arg(&self.config.command);
135 cmd.stdout(Stdio::piped())
136 .stderr(Stdio::piped())
137 .kill_on_drop(true);
138
139 if self.config.clean_env {
140 cmd.env_clear();
141 }
142 if let Some(ref dir) = self.config.dir {
143 cmd.current_dir(dir);
144 }
145 for (key, value) in &self.config.env {
146 cmd.env(key, value);
147 }
148
149 let mut child = cmd.spawn().map_err(|e| {
150 EngineError::Operation(OperationError::Shell {
151 exit_code: -1,
152 stderr: format!("failed to spawn shell: {e}"),
153 })
154 })?;
155
156 let stdout_pipe = child.stdout.take().expect("stdout piped");
157 let stderr_pipe = child.stderr.take().expect("stderr piped");
158
159 let stdout_task = spawn(read_and_stream(
160 stdout_pipe,
161 sender.clone(),
162 LogStream::Stdout,
163 ));
164 let stderr_task = spawn(read_and_stream(stderr_pipe, sender, LogStream::Stderr));
165
166 let timeout_dur = self
167 .config
168 .timeout_secs
169 .map(Duration::from_secs)
170 .unwrap_or(DEFAULT_SHELL_TIMEOUT);
171
172 let status = match tokio::time::timeout(timeout_dur, child.wait()).await {
173 Ok(Ok(status)) => status,
174 Ok(Err(e)) => {
175 return Err(EngineError::Operation(OperationError::Shell {
176 exit_code: -1,
177 stderr: format!("failed to wait for shell: {e}"),
178 }));
179 }
180 Err(_) => {
181 child.kill().await.ok();
182 return Err(EngineError::Operation(OperationError::Timeout {
183 step: self.config.command.clone(),
184 limit: timeout_dur,
185 }));
186 }
187 };
188
189 let raw_stdout = stdout_task.await.unwrap_or_default();
190 let raw_stderr = stderr_task.await.unwrap_or_default();
191
192 let stdout = truncate_output(raw_stdout.as_bytes(), "shell stdout");
193 let stderr = truncate_output(raw_stderr.as_bytes(), "shell stderr");
194
195 let exit_code = status.code().unwrap_or(-1);
196 let duration_ms = start.elapsed().as_millis() as u64;
197
198 info!(
199 step_kind = "shell",
200 command = %self.config.command,
201 exit_code,
202 duration_ms,
203 streaming = true,
204 "shell step completed"
205 );
206
207 self.record_metrics(duration_ms);
208
209 if exit_code != 0 {
210 return Err(EngineError::Operation(OperationError::Shell {
211 exit_code,
212 stderr: stderr.clone(),
213 }));
214 }
215
216 Ok(StepOutput {
217 output: json!({
218 "stdout": stdout,
219 "stderr": stderr,
220 "exit_code": exit_code,
221 }),
222 duration_ms,
223 cost_usd: Decimal::ZERO,
224 input_tokens: None,
225 output_tokens: None,
226 debug_messages: None,
227 })
228 }
229
230 #[allow(unused_variables)]
231 fn record_metrics(&self, duration_ms: u64) {
232 #[cfg(feature = "prometheus")]
233 {
234 use ironflow_core::metric_names::{
235 SHELL_DURATION_SECONDS, SHELL_TOTAL, STATUS_SUCCESS,
236 };
237 use metrics::{counter, histogram};
238 counter!(SHELL_TOTAL, "status" => STATUS_SUCCESS).increment(1);
239 histogram!(SHELL_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
240 }
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;

    /// Builds a replay-mode provider. `execute` requires one, although the
    /// shell executor never uses it.
    fn create_test_provider() -> Arc<dyn AgentProvider> {
        let replay =
            RecordReplayProvider::replay(ClaudeCodeProvider::new(), "/tmp/ironflow-fixtures");
        Arc::new(replay)
    }

    /// A plain `echo` succeeds with exit code 0 and its text on stdout.
    #[tokio::test]
    async fn shell_simple_command() {
        let provider = create_test_provider();
        let config = ShellConfig::new("echo hello");

        let result = ShellExecutor::new(&config).execute(&provider).await;
        assert!(result.is_ok());

        let step = result.unwrap();
        assert_eq!(step.output["exit_code"].as_i64(), Some(0));
        let stdout = step.output["stdout"].as_str().unwrap();
        assert!(stdout.contains("hello"));
    }

    /// A non-zero exit status on the buffered path surfaces as an error.
    #[tokio::test]
    async fn shell_nonzero_exit_returns_error() {
        let provider = create_test_provider();
        let config = ShellConfig::new("exit 1");

        let result = ShellExecutor::new(&config).execute(&provider).await;
        assert!(result.is_err());
    }

    /// Variables set on the config are visible to the command.
    #[tokio::test]
    async fn shell_env_variables() {
        let provider = create_test_provider();
        let config = ShellConfig::new("echo $MY_VAR").env("MY_VAR", "test_value");

        let result = ShellExecutor::new(&config).execute(&provider).await;
        assert!(result.is_ok());

        let step = result.unwrap();
        let stdout = step.output["stdout"].as_str().unwrap();
        assert!(stdout.contains("test_value"));
    }

    /// The step output carries the three JSON keys, zero cost and a short duration.
    #[tokio::test]
    async fn shell_step_output_has_structure() {
        let provider = create_test_provider();
        let config = ShellConfig::new("echo test");

        let step = ShellExecutor::new(&config)
            .execute(&provider)
            .await
            .unwrap();

        for key in ["stdout", "stderr", "exit_code"] {
            assert!(step.output.get(key).is_some());
        }
        assert_eq!(step.cost_usd, Decimal::ZERO);
        assert!(step.duration_ms < 5000);
    }

    /// Shell pipelines are interpreted by the shell.
    #[tokio::test]
    async fn shell_command_with_pipe() {
        let provider = create_test_provider();
        let config = ShellConfig::new("echo hello | grep hello");

        let result = ShellExecutor::new(&config).execute(&provider).await;
        assert!(result.is_ok());

        let step = result.unwrap();
        assert_eq!(step.output["exit_code"].as_i64(), Some(0));
        let stdout = step.output["stdout"].as_str().unwrap();
        assert!(stdout.contains("hello"));
    }

    /// With a log sender attached, each stdout line is emitted in order and
    /// also appears in the collected output.
    #[tokio::test]
    async fn shell_streaming_emits_lines() {
        let config = ShellConfig::new("echo line1 && echo line2");
        let (sender, mut receiver) = crate::log_sender::channel();
        let step_sender = StepLogSender::new(
            sender,
            uuid::Uuid::now_v7(),
            uuid::Uuid::now_v7(),
            "test".to_string(),
        );
        let provider = create_test_provider();

        let result = ShellExecutor::new(&config)
            .with_log_sender(step_sender)
            .execute(&provider)
            .await;
        assert!(result.is_ok());

        let step = result.unwrap();
        let stdout = step.output["stdout"].as_str().unwrap();
        assert!(stdout.contains("line1"));
        assert!(stdout.contains("line2"));

        // Drain everything that was emitted while the command ran.
        let lines: Vec<_> = std::iter::from_fn(|| receiver.try_recv().ok()).collect();
        assert!(lines.len() >= 2);
        assert_eq!(lines[0].stream, LogStream::Stdout);
        assert_eq!(lines[0].line, "line1");
        assert_eq!(lines[1].line, "line2");
    }

    /// Lines written to stderr are emitted tagged as `LogStream::Stderr`.
    #[tokio::test]
    async fn shell_streaming_captures_stderr() {
        let config = ShellConfig::new("echo err >&2");
        let (sender, mut receiver) = crate::log_sender::channel();
        let step_sender = StepLogSender::new(
            sender,
            uuid::Uuid::now_v7(),
            uuid::Uuid::now_v7(),
            "test".to_string(),
        );
        let provider = create_test_provider();

        let result = ShellExecutor::new(&config)
            .with_log_sender(step_sender)
            .execute(&provider)
            .await;
        assert!(result.is_ok());

        let stderr_lines: Vec<_> = std::iter::from_fn(|| receiver.try_recv().ok())
            .filter(|entry| entry.stream == LogStream::Stderr)
            .collect();
        assert!(!stderr_lines.is_empty());
        assert_eq!(stderr_lines[0].line, "err");
    }

    /// A non-zero exit status on the streaming path surfaces as an error.
    #[tokio::test]
    async fn shell_streaming_nonzero_exit_returns_error() {
        let config = ShellConfig::new("exit 42");
        // The receiver is kept alive for the duration of the test.
        let (sender, _receiver) = crate::log_sender::channel();
        let step_sender = StepLogSender::new(
            sender,
            uuid::Uuid::now_v7(),
            uuid::Uuid::now_v7(),
            "test".to_string(),
        );
        let provider = create_test_provider();

        let result = ShellExecutor::new(&config)
            .with_log_sender(step_sender)
            .execute(&provider)
            .await;
        assert!(result.is_err());
    }
}