ironflow_engine/executor/
http.rs1use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::http::Http;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::HttpConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18pub struct HttpExecutor<'a> {
22 config: &'a HttpConfig,
23}
24
25impl<'a> HttpExecutor<'a> {
26 pub fn new(config: &'a HttpConfig) -> Self {
28 Self { config }
29 }
30}
31
32impl StepExecutor for HttpExecutor<'_> {
33 async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34 let start = Instant::now();
35
36 let mut http = match self.config.method.to_uppercase().as_str() {
37 "GET" => Http::get(&self.config.url),
38 "POST" => Http::post(&self.config.url),
39 "PUT" => Http::put(&self.config.url),
40 "PATCH" => Http::patch(&self.config.url),
41 "DELETE" => Http::delete(&self.config.url),
42 other => {
43 return Err(EngineError::StepConfig(format!(
44 "unsupported HTTP method: {other}"
45 )));
46 }
47 };
48
49 for (name, value) in &self.config.headers {
50 http = http.header(name, value);
51 }
52 if let Some(ref body) = self.config.body {
53 http = http.json(body.clone());
54 }
55 if let Some(secs) = self.config.timeout_secs {
56 http = http.timeout(Duration::from_secs(secs));
57 }
58
59 let output = http.run().await?;
60 let duration_ms = start.elapsed().as_millis() as u64;
61
62 info!(
63 step_kind = "http",
64 method = %self.config.method,
65 url = %self.config.url,
66 status = output.status(),
67 duration_ms,
68 "http step completed"
69 );
70
71 #[cfg(feature = "prometheus")]
72 {
73 use ironflow_core::metric_names::{HTTP_DURATION_SECONDS, HTTP_TOTAL, STATUS_SUCCESS};
74 use metrics::{counter, histogram};
75 counter!(HTTP_TOTAL, "method" => self.config.method.clone(), "status" => STATUS_SUCCESS).increment(1);
76 histogram!(HTTP_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
77 }
78
79 Ok(StepOutput {
80 output: json!({
81 "status": output.status(),
82 "headers": output.headers(),
83 "body": output.body(),
84 }),
85 duration_ms,
86 cost_usd: Decimal::ZERO,
87 input_tokens: None,
88 output_tokens: None,
89 debug_messages: None,
90 })
91 }
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use axum::Router;
    use axum::routing::{delete, get, patch, post, put};
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use tokio::net::TcpListener;

    /// Builds the provider handed to `execute`; the HTTP executor ignores it,
    /// but the trait signature requires one.
    fn create_test_provider() -> Arc<dyn AgentProvider> {
        Arc::new(RecordReplayProvider::replay(
            ClaudeCodeProvider::new(),
            "/tmp/ironflow-fixtures",
        ))
    }

    /// Handler shared by every route of the local test server.
    async fn ok() -> &'static str {
        "ok"
    }

    /// Starts a throwaway axum server on an OS-assigned port and returns its base URL.
    async fn start_test_server() -> String {
        let routes = Router::new()
            .route("/status/200", get(ok))
            .route("/post", post(ok))
            .route("/put", put(ok))
            .route("/patch", patch(ok))
            .route("/delete", delete(ok))
            .route("/headers", get(ok));

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        tokio::spawn(async move {
            axum::serve(listener, routes).await.unwrap();
        });

        format!("http://localhost:{port}")
    }

    /// Runs one HTTP step for `config` against a fresh executor and provider.
    async fn run_step(config: &HttpConfig) -> Result<StepOutput, EngineError> {
        let executor = HttpExecutor::new(config);
        let provider = create_test_provider();
        executor.execute(&provider).await
    }

    /// Asserts that the step output exposes the three response keys.
    fn assert_response_keys(step: &StepOutput) {
        assert!(step.output.get("status").is_some());
        assert!(step.output.get("headers").is_some());
        assert!(step.output.get("body").is_some());
    }

    #[tokio::test]
    async fn http_get_method() {
        let base = start_test_server().await;
        let url = format!("{base}/status/200");
        let config = HttpConfig::get(&url);

        let result = run_step(&config).await;
        assert!(result.is_ok());
        assert_response_keys(&result.unwrap());
    }

    #[tokio::test]
    async fn http_post_method() {
        let base = start_test_server().await;
        let url = format!("{base}/post");
        let config = HttpConfig::post(&url);

        assert!(run_step(&config).await.is_ok());
    }

    #[tokio::test]
    async fn http_put_method() {
        let base = start_test_server().await;
        let url = format!("{base}/put");
        let config = HttpConfig::put(&url);

        assert!(run_step(&config).await.is_ok());
    }

    #[tokio::test]
    async fn http_patch_method() {
        let base = start_test_server().await;
        let url = format!("{base}/patch");
        let config = HttpConfig::patch(&url);

        assert!(run_step(&config).await.is_ok());
    }

    #[tokio::test]
    async fn http_delete_method() {
        let base = start_test_server().await;
        let url = format!("{base}/delete");
        let config = HttpConfig::delete(&url);

        assert!(run_step(&config).await.is_ok());
    }

    #[tokio::test]
    async fn http_unsupported_method_returns_error() {
        let base = start_test_server().await;
        let url = format!("{base}/status/200");
        let mut config = HttpConfig::get(&url);
        config.method = "INVALID".to_string();

        let result = run_step(&config).await;
        assert!(result.is_err());
        let Err(EngineError::StepConfig(msg)) = result else {
            panic!("expected StepConfig error");
        };
        assert!(msg.contains("unsupported HTTP method"));
    }

    #[tokio::test]
    async fn http_with_custom_headers() {
        let base = start_test_server().await;
        let url = format!("{base}/headers");
        let config = HttpConfig::get(&url)
            .header("X-Custom-Header", "test-value")
            .header("Authorization", "Bearer token");

        assert!(run_step(&config).await.is_ok());
    }

    #[tokio::test]
    async fn http_with_json_body() {
        let base = start_test_server().await;
        let url = format!("{base}/post");
        let payload = json!({"key": "value", "number": 42});
        let config = HttpConfig::post(&url).json(payload);

        assert!(run_step(&config).await.is_ok());
    }

    #[tokio::test]
    async fn http_step_output_has_structure() {
        let base = start_test_server().await;
        let url = format!("{base}/status/200");
        let config = HttpConfig::get(&url);

        let step = run_step(&config).await.unwrap();
        assert_response_keys(&step);
        assert_eq!(step.cost_usd, Decimal::ZERO);
    }
}