ironflow_engine/executor/
shell.rs1use std::process::Stdio;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use rust_decimal::Decimal;
8use serde_json::json;
9use tokio::io::{AsyncBufReadExt, BufReader};
10use tokio::process::Command;
11use tokio::spawn;
12use tracing::info;
13
14use ironflow_core::error::OperationError;
15use ironflow_core::operations::shell::Shell;
16use ironflow_core::provider::AgentProvider;
17use ironflow_core::utils::truncate_output;
18
19use crate::config::ShellConfig;
20use crate::error::EngineError;
21use crate::log_sender::StepLogSender;
22use crate::notify::LogStream;
23
24use super::{StepExecutor, StepOutput};
25
26const DEFAULT_SHELL_TIMEOUT: Duration = Duration::from_secs(300);
27
28async fn read_and_stream<R: tokio::io::AsyncRead + Unpin>(
31 reader: R,
32 sender: StepLogSender,
33 stream: LogStream,
34) -> String {
35 let mut lines = BufReader::new(reader).lines();
36 let mut collected = String::new();
37 while let Ok(Some(line)) = lines.next_line().await {
38 sender.emit(stream, &line);
39 if !collected.is_empty() {
40 collected.push('\n');
41 }
42 collected.push_str(&line);
43 }
44 collected
45}
46
47pub struct ShellExecutor<'a> {
53 config: &'a ShellConfig,
54 log_sender: Option<StepLogSender>,
55}
56
57impl<'a> ShellExecutor<'a> {
58 pub fn new(config: &'a ShellConfig) -> Self {
60 Self {
61 config,
62 log_sender: None,
63 }
64 }
65
66 pub fn with_log_sender(mut self, sender: StepLogSender) -> Self {
68 self.log_sender = Some(sender);
69 self
70 }
71}
72
73impl StepExecutor for ShellExecutor<'_> {
74 async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
75 match self.log_sender {
76 Some(ref sender) => self.execute_streaming(sender.clone()).await,
77 None => self.execute_buffered().await,
78 }
79 }
80}
81
82impl ShellExecutor<'_> {
83 async fn execute_buffered(&self) -> Result<StepOutput, EngineError> {
85 let start = Instant::now();
86
87 let mut shell = Shell::new(&self.config.command);
88 if let Some(secs) = self.config.timeout_secs {
89 shell = shell.timeout(Duration::from_secs(secs));
90 }
91 if let Some(ref dir) = self.config.dir {
92 shell = shell.dir(dir);
93 }
94 for (key, value) in &self.config.env {
95 shell = shell.env(key, value);
96 }
97 if self.config.clean_env {
98 shell = shell.clean_env();
99 }
100
101 let output = shell.run().await?;
102 let duration_ms = start.elapsed().as_millis() as u64;
103
104 info!(
105 step_kind = "shell",
106 command = %self.config.command,
107 exit_code = output.exit_code(),
108 duration_ms,
109 "shell step completed"
110 );
111
112 self.record_metrics(duration_ms);
113
114 Ok(StepOutput {
115 output: json!({
116 "stdout": output.stdout(),
117 "stderr": output.stderr(),
118 "exit_code": output.exit_code(),
119 }),
120 duration_ms,
121 cost_usd: Decimal::ZERO,
122 input_tokens: None,
123 output_tokens: None,
124 model: None,
125 debug_messages: None,
126 })
127 }
128
129 async fn execute_streaming(&self, sender: StepLogSender) -> Result<StepOutput, EngineError> {
132 let start = Instant::now();
133
134 let mut cmd = Command::new("sh");
135 cmd.arg("-c").arg(&self.config.command);
136 cmd.stdout(Stdio::piped())
137 .stderr(Stdio::piped())
138 .kill_on_drop(true);
139
140 if self.config.clean_env {
141 cmd.env_clear();
142 }
143 if let Some(ref dir) = self.config.dir {
144 cmd.current_dir(dir);
145 }
146 for (key, value) in &self.config.env {
147 cmd.env(key, value);
148 }
149
150 let mut child = cmd.spawn().map_err(|e| {
151 EngineError::Operation(OperationError::Shell {
152 exit_code: -1,
153 stderr: format!("failed to spawn shell: {e}"),
154 })
155 })?;
156
157 let stdout_pipe = child.stdout.take().expect("stdout piped");
158 let stderr_pipe = child.stderr.take().expect("stderr piped");
159
160 let stdout_task = spawn(read_and_stream(
161 stdout_pipe,
162 sender.clone(),
163 LogStream::Stdout,
164 ));
165 let stderr_task = spawn(read_and_stream(stderr_pipe, sender, LogStream::Stderr));
166
167 let timeout_dur = self
168 .config
169 .timeout_secs
170 .map(Duration::from_secs)
171 .unwrap_or(DEFAULT_SHELL_TIMEOUT);
172
173 let status = match tokio::time::timeout(timeout_dur, child.wait()).await {
174 Ok(Ok(status)) => status,
175 Ok(Err(e)) => {
176 return Err(EngineError::Operation(OperationError::Shell {
177 exit_code: -1,
178 stderr: format!("failed to wait for shell: {e}"),
179 }));
180 }
181 Err(_) => {
182 child.kill().await.ok();
183 return Err(EngineError::Operation(OperationError::Timeout {
184 step: self.config.command.clone(),
185 limit: timeout_dur,
186 }));
187 }
188 };
189
190 let raw_stdout = stdout_task.await.unwrap_or_default();
191 let raw_stderr = stderr_task.await.unwrap_or_default();
192
193 let stdout = truncate_output(raw_stdout.as_bytes(), "shell stdout");
194 let stderr = truncate_output(raw_stderr.as_bytes(), "shell stderr");
195
196 let exit_code = status.code().unwrap_or(-1);
197 let duration_ms = start.elapsed().as_millis() as u64;
198
199 info!(
200 step_kind = "shell",
201 command = %self.config.command,
202 exit_code,
203 duration_ms,
204 streaming = true,
205 "shell step completed"
206 );
207
208 self.record_metrics(duration_ms);
209
210 if exit_code != 0 {
211 return Err(EngineError::Operation(OperationError::Shell {
212 exit_code,
213 stderr: stderr.clone(),
214 }));
215 }
216
217 Ok(StepOutput {
218 output: json!({
219 "stdout": stdout,
220 "stderr": stderr,
221 "exit_code": exit_code,
222 }),
223 duration_ms,
224 cost_usd: Decimal::ZERO,
225 input_tokens: None,
226 output_tokens: None,
227 model: None,
228 debug_messages: None,
229 })
230 }
231
232 #[allow(unused_variables)]
233 fn record_metrics(&self, duration_ms: u64) {
234 #[cfg(feature = "prometheus")]
235 {
236 use ironflow_core::metric_names::{
237 SHELL_DURATION_SECONDS, SHELL_TOTAL, STATUS_SUCCESS,
238 };
239 use metrics::{counter, histogram};
240 counter!(SHELL_TOTAL, "status" => STATUS_SUCCESS).increment(1);
241 histogram!(SHELL_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
242 }
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249 use ironflow_core::providers::claude::ClaudeCodeProvider;
250 use ironflow_core::providers::record_replay::RecordReplayProvider;
251
252 fn create_test_provider() -> Arc<dyn AgentProvider> {
253 let inner = ClaudeCodeProvider::new();
254 Arc::new(RecordReplayProvider::replay(
255 inner,
256 "/tmp/ironflow-fixtures",
257 ))
258 }
259
260 #[tokio::test]
261 async fn shell_simple_command() {
262 let config = ShellConfig::new("echo hello");
263 let executor = ShellExecutor::new(&config);
264 let provider = create_test_provider();
265
266 let result = executor.execute(&provider).await;
267 assert!(result.is_ok());
268 let output = result.unwrap();
269 assert_eq!(output.output["exit_code"].as_i64().unwrap(), 0);
270 assert!(output.output["stdout"].as_str().unwrap().contains("hello"));
271 }
272
273 #[tokio::test]
274 async fn shell_nonzero_exit_returns_error() {
275 let config = ShellConfig::new("exit 1");
276 let executor = ShellExecutor::new(&config);
277 let provider = create_test_provider();
278
279 let result = executor.execute(&provider).await;
280 assert!(result.is_err());
281 }
282
283 #[tokio::test]
284 async fn shell_env_variables() {
285 let config = ShellConfig::new("echo $MY_VAR").env("MY_VAR", "test_value");
286 let executor = ShellExecutor::new(&config);
287 let provider = create_test_provider();
288
289 let result = executor.execute(&provider).await;
290 assert!(result.is_ok());
291 let output = result.unwrap();
292 assert!(
293 output.output["stdout"]
294 .as_str()
295 .unwrap()
296 .contains("test_value")
297 );
298 }
299
300 #[tokio::test]
301 async fn shell_step_output_has_structure() {
302 let config = ShellConfig::new("echo test");
303 let executor = ShellExecutor::new(&config);
304 let provider = create_test_provider();
305
306 let output = executor.execute(&provider).await.unwrap();
307 assert!(output.output.get("stdout").is_some());
308 assert!(output.output.get("stderr").is_some());
309 assert!(output.output.get("exit_code").is_some());
310 assert_eq!(output.cost_usd, Decimal::ZERO);
311 assert!(output.duration_ms < 5000);
312 }
313
314 #[tokio::test]
315 async fn shell_command_with_pipe() {
316 let config = ShellConfig::new("echo hello | grep hello");
317 let executor = ShellExecutor::new(&config);
318 let provider = create_test_provider();
319
320 let result = executor.execute(&provider).await;
321 assert!(result.is_ok());
322 let output = result.unwrap();
323 assert_eq!(output.output["exit_code"].as_i64().unwrap(), 0);
324 assert!(output.output["stdout"].as_str().unwrap().contains("hello"));
325 }
326
327 #[tokio::test]
328 async fn shell_streaming_emits_lines() {
329 let config = ShellConfig::new("echo line1 && echo line2");
330 let (sender, mut receiver) = crate::log_sender::channel();
331 let step_sender = StepLogSender::new(
332 sender,
333 uuid::Uuid::now_v7(),
334 uuid::Uuid::now_v7(),
335 "test".to_string(),
336 );
337 let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
338 let provider = create_test_provider();
339
340 let result = executor.execute(&provider).await;
341 assert!(result.is_ok());
342
343 let output = result.unwrap();
344 assert!(output.output["stdout"].as_str().unwrap().contains("line1"));
345 assert!(output.output["stdout"].as_str().unwrap().contains("line2"));
346
347 let mut lines = Vec::new();
348 while let Ok(line) = receiver.try_recv() {
349 lines.push(line);
350 }
351 assert!(lines.len() >= 2);
352 assert_eq!(lines[0].stream, LogStream::Stdout);
353 assert_eq!(lines[0].line, "line1");
354 assert_eq!(lines[1].line, "line2");
355 }
356
357 #[tokio::test]
358 async fn shell_streaming_captures_stderr() {
359 let config = ShellConfig::new("echo err >&2");
360 let (sender, mut receiver) = crate::log_sender::channel();
361 let step_sender = StepLogSender::new(
362 sender,
363 uuid::Uuid::now_v7(),
364 uuid::Uuid::now_v7(),
365 "test".to_string(),
366 );
367 let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
368 let provider = create_test_provider();
369
370 let result = executor.execute(&provider).await;
371 assert!(result.is_ok());
372
373 let mut stderr_lines = Vec::new();
374 while let Ok(line) = receiver.try_recv() {
375 if line.stream == LogStream::Stderr {
376 stderr_lines.push(line);
377 }
378 }
379 assert!(!stderr_lines.is_empty());
380 assert_eq!(stderr_lines[0].line, "err");
381 }
382
383 #[tokio::test]
384 async fn shell_streaming_nonzero_exit_returns_error() {
385 let config = ShellConfig::new("exit 42");
386 let (sender, _receiver) = crate::log_sender::channel();
387 let step_sender = StepLogSender::new(
388 sender,
389 uuid::Uuid::now_v7(),
390 uuid::Uuid::now_v7(),
391 "test".to_string(),
392 );
393 let executor = ShellExecutor::new(&config).with_log_sender(step_sender);
394 let provider = create_test_provider();
395
396 let result = executor.execute(&provider).await;
397 assert!(result.is_err());
398 }
399}