ironflow_engine/executor/
shell.rs1use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::shell::Shell;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::ShellConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18pub struct ShellExecutor<'a> {
22 config: &'a ShellConfig,
23}
24
25impl<'a> ShellExecutor<'a> {
26 pub fn new(config: &'a ShellConfig) -> Self {
28 Self { config }
29 }
30}
31
32impl StepExecutor for ShellExecutor<'_> {
33 async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34 let start = Instant::now();
35
36 let mut shell = Shell::new(&self.config.command);
37 if let Some(secs) = self.config.timeout_secs {
38 shell = shell.timeout(Duration::from_secs(secs));
39 }
40 if let Some(ref dir) = self.config.dir {
41 shell = shell.dir(dir);
42 }
43 for (key, value) in &self.config.env {
44 shell = shell.env(key, value);
45 }
46 if self.config.clean_env {
47 shell = shell.clean_env();
48 }
49
50 let output = shell.run().await?;
51 let duration_ms = start.elapsed().as_millis() as u64;
52
53 info!(
54 step_kind = "shell",
55 command = %self.config.command,
56 exit_code = output.exit_code(),
57 duration_ms,
58 "shell step completed"
59 );
60
61 #[cfg(feature = "prometheus")]
62 {
63 use ironflow_core::metric_names::{
64 SHELL_DURATION_SECONDS, SHELL_TOTAL, STATUS_SUCCESS,
65 };
66 use metrics::{counter, histogram};
67 counter!(SHELL_TOTAL, "status" => STATUS_SUCCESS).increment(1);
68 histogram!(SHELL_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
69 }
70
71 Ok(StepOutput {
72 output: json!({
73 "stdout": output.stdout(),
74 "stderr": output.stderr(),
75 "exit_code": output.exit_code(),
76 }),
77 duration_ms,
78 cost_usd: Decimal::ZERO,
79 input_tokens: None,
80 output_tokens: None,
81 debug_messages: None,
82 })
83 }
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;

    /// Builds the provider handed to `execute`: a replay wrapper around a
    /// Claude Code provider, pointed at a fixed fixtures directory.
    fn create_test_provider() -> Arc<dyn AgentProvider> {
        let replay =
            RecordReplayProvider::replay(ClaudeCodeProvider::new(), "/tmp/ironflow-fixtures");
        Arc::new(replay)
    }

    /// Executes `config` through a fresh `ShellExecutor` with the test provider.
    async fn run(config: &ShellConfig) -> Result<StepOutput, EngineError> {
        let provider = create_test_provider();
        ShellExecutor::new(config).execute(&provider).await
    }

    #[tokio::test]
    async fn shell_simple_command() {
        let step = run(&ShellConfig::new("echo hello"))
            .await
            .expect("echo should succeed");

        assert_eq!(step.output["exit_code"].as_i64(), Some(0));
        assert!(
            step.output["stdout"]
                .as_str()
                .is_some_and(|text| text.contains("hello"))
        );
    }

    #[tokio::test]
    async fn shell_nonzero_exit_returns_error() {
        let outcome = run(&ShellConfig::new("exit 1")).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn shell_env_variables() {
        let config = ShellConfig::new("echo $MY_VAR").env("MY_VAR", "test_value");

        let step = run(&config)
            .await
            .expect("command with env var should succeed");

        assert!(
            step.output["stdout"]
                .as_str()
                .is_some_and(|text| text.contains("test_value"))
        );
    }

    #[tokio::test]
    async fn shell_step_output_has_structure() {
        let step = run(&ShellConfig::new("echo test")).await.unwrap();

        // Every shell step reports these three keys in its JSON payload.
        for key in ["stdout", "stderr", "exit_code"] {
            assert!(step.output.get(key).is_some());
        }
        assert_eq!(step.cost_usd, Decimal::ZERO);
        assert!(step.duration_ms < 5000);
    }

    #[tokio::test]
    async fn shell_command_with_pipe() {
        let step = run(&ShellConfig::new("echo hello | grep hello"))
            .await
            .expect("piped command should succeed");

        assert_eq!(step.output["exit_code"].as_i64(), Some(0));
        assert!(
            step.output["stdout"]
                .as_str()
                .is_some_and(|text| text.contains("hello"))
        );
    }
}