ironflow_engine/executor/
http.rs1use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::http::Http;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::HttpConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18pub struct HttpExecutor<'a> {
22 config: &'a HttpConfig,
23}
24
25impl<'a> HttpExecutor<'a> {
26 pub fn new(config: &'a HttpConfig) -> Self {
28 Self { config }
29 }
30}
31
32impl StepExecutor for HttpExecutor<'_> {
33 async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34 let start = Instant::now();
35
36 let mut http = match self.config.method.to_uppercase().as_str() {
37 "GET" => Http::get(&self.config.url),
38 "POST" => Http::post(&self.config.url),
39 "PUT" => Http::put(&self.config.url),
40 "PATCH" => Http::patch(&self.config.url),
41 "DELETE" => Http::delete(&self.config.url),
42 other => {
43 return Err(EngineError::StepConfig(format!(
44 "unsupported HTTP method: {other}"
45 )));
46 }
47 };
48
49 for (name, value) in &self.config.headers {
50 http = http.header(name, value);
51 }
52 if let Some(ref body) = self.config.body {
53 http = http.json(body.clone());
54 }
55 if let Some(secs) = self.config.timeout_secs {
56 http = http.timeout(Duration::from_secs(secs));
57 }
58
59 let output = http.run().await?;
60 let duration_ms = start.elapsed().as_millis() as u64;
61
62 info!(
63 step_kind = "http",
64 method = %self.config.method,
65 url = %self.config.url,
66 status = output.status(),
67 duration_ms,
68 "http step completed"
69 );
70
71 #[cfg(feature = "prometheus")]
72 {
73 use ironflow_core::metric_names::{HTTP_DURATION_SECONDS, HTTP_TOTAL, STATUS_SUCCESS};
74 use metrics::{counter, histogram};
75 counter!(HTTP_TOTAL, "method" => self.config.method.clone(), "status" => STATUS_SUCCESS).increment(1);
76 histogram!(HTTP_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
77 }
78
79 Ok(StepOutput {
80 output: json!({
81 "status": output.status(),
82 "headers": output.headers(),
83 "body": output.body(),
84 }),
85 duration_ms,
86 cost_usd: Decimal::ZERO,
87 input_tokens: None,
88 output_tokens: None,
89 })
90 }
91}
92
93#[cfg(test)]
94mod tests {
95 use super::*;
96 use axum::Router;
97 use axum::routing::{delete, get, patch, post, put};
98 use ironflow_core::providers::claude::ClaudeCodeProvider;
99 use ironflow_core::providers::record_replay::RecordReplayProvider;
100 use tokio::net::TcpListener;
101
102 fn create_test_provider() -> Arc<dyn AgentProvider> {
103 let inner = ClaudeCodeProvider::new();
104 Arc::new(RecordReplayProvider::replay(
105 inner,
106 "/tmp/ironflow-fixtures",
107 ))
108 }
109
110 async fn start_test_server() -> String {
111 let app = Router::new()
112 .route("/status/200", get(|| async { "ok" }))
113 .route("/post", post(|| async { "ok" }))
114 .route("/put", put(|| async { "ok" }))
115 .route("/patch", patch(|| async { "ok" }))
116 .route("/delete", delete(|| async { "ok" }))
117 .route("/headers", get(|| async { "ok" }));
118
119 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
120 let port = listener.local_addr().unwrap().port();
121 tokio::spawn(async move {
122 axum::serve(listener, app).await.unwrap();
123 });
124 format!("http://localhost:{port}")
125 }
126
127 #[tokio::test]
128 async fn http_get_method() {
129 let base = start_test_server().await;
130 let config = HttpConfig::get(&format!("{base}/status/200"));
131 let executor = HttpExecutor::new(&config);
132 let provider = create_test_provider();
133
134 let result = executor.execute(&provider).await;
135 assert!(result.is_ok());
136 let output = result.unwrap();
137 assert!(output.output.get("status").is_some());
138 assert!(output.output.get("headers").is_some());
139 assert!(output.output.get("body").is_some());
140 }
141
142 #[tokio::test]
143 async fn http_post_method() {
144 let base = start_test_server().await;
145 let config = HttpConfig::post(&format!("{base}/post"));
146 let executor = HttpExecutor::new(&config);
147 let provider = create_test_provider();
148
149 let result = executor.execute(&provider).await;
150 assert!(result.is_ok());
151 }
152
153 #[tokio::test]
154 async fn http_put_method() {
155 let base = start_test_server().await;
156 let config = HttpConfig::put(&format!("{base}/put"));
157 let executor = HttpExecutor::new(&config);
158 let provider = create_test_provider();
159
160 let result = executor.execute(&provider).await;
161 assert!(result.is_ok());
162 }
163
164 #[tokio::test]
165 async fn http_patch_method() {
166 let base = start_test_server().await;
167 let config = HttpConfig::patch(&format!("{base}/patch"));
168 let executor = HttpExecutor::new(&config);
169 let provider = create_test_provider();
170
171 let result = executor.execute(&provider).await;
172 assert!(result.is_ok());
173 }
174
175 #[tokio::test]
176 async fn http_delete_method() {
177 let base = start_test_server().await;
178 let config = HttpConfig::delete(&format!("{base}/delete"));
179 let executor = HttpExecutor::new(&config);
180 let provider = create_test_provider();
181
182 let result = executor.execute(&provider).await;
183 assert!(result.is_ok());
184 }
185
186 #[tokio::test]
187 async fn http_unsupported_method_returns_error() {
188 let base = start_test_server().await;
189 let mut config = HttpConfig::get(&format!("{base}/status/200"));
190 config.method = "INVALID".to_string();
191 let executor = HttpExecutor::new(&config);
192 let provider = create_test_provider();
193
194 let result = executor.execute(&provider).await;
195 assert!(result.is_err());
196 match result {
197 Err(EngineError::StepConfig(msg)) => {
198 assert!(msg.contains("unsupported HTTP method"));
199 }
200 _ => panic!("expected StepConfig error"),
201 }
202 }
203
204 #[tokio::test]
205 async fn http_with_custom_headers() {
206 let base = start_test_server().await;
207 let config = HttpConfig::get(&format!("{base}/headers"))
208 .header("X-Custom-Header", "test-value")
209 .header("Authorization", "Bearer token");
210 let executor = HttpExecutor::new(&config);
211 let provider = create_test_provider();
212
213 let result = executor.execute(&provider).await;
214 assert!(result.is_ok());
215 }
216
217 #[tokio::test]
218 async fn http_with_json_body() {
219 let base = start_test_server().await;
220 let config =
221 HttpConfig::post(&format!("{base}/post")).json(json!({"key": "value", "number": 42}));
222 let executor = HttpExecutor::new(&config);
223 let provider = create_test_provider();
224
225 let result = executor.execute(&provider).await;
226 assert!(result.is_ok());
227 }
228
229 #[tokio::test]
230 async fn http_step_output_has_structure() {
231 let base = start_test_server().await;
232 let config = HttpConfig::get(&format!("{base}/status/200"));
233 let executor = HttpExecutor::new(&config);
234 let provider = create_test_provider();
235
236 let output = executor.execute(&provider).await.unwrap();
237 assert!(output.output.get("status").is_some());
238 assert!(output.output.get("headers").is_some());
239 assert!(output.output.get("body").is_some());
240 assert_eq!(output.cost_usd, Decimal::ZERO);
241 }
242}