ironflow_engine/executor/
http.rs1use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::http::Http;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::HttpConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18pub struct HttpExecutor<'a> {
22 config: &'a HttpConfig,
23}
24
25impl<'a> HttpExecutor<'a> {
26 pub fn new(config: &'a HttpConfig) -> Self {
28 Self { config }
29 }
30}
31
32impl StepExecutor for HttpExecutor<'_> {
33 async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34 let start = Instant::now();
35
36 let mut http = match self.config.method.to_uppercase().as_str() {
37 "GET" => Http::get(&self.config.url),
38 "POST" => Http::post(&self.config.url),
39 "PUT" => Http::put(&self.config.url),
40 "PATCH" => Http::patch(&self.config.url),
41 "DELETE" => Http::delete(&self.config.url),
42 other => {
43 return Err(EngineError::StepConfig(format!(
44 "unsupported HTTP method: {other}"
45 )));
46 }
47 };
48
49 for (name, value) in &self.config.headers {
50 http = http.header(name, value);
51 }
52 if let Some(ref body) = self.config.body {
53 http = http.json(body.clone());
54 }
55 if let Some(secs) = self.config.timeout_secs {
56 http = http.timeout(Duration::from_secs(secs));
57 }
58
59 let output = http.run().await?;
60 let duration_ms = start.elapsed().as_millis() as u64;
61
62 info!(
63 step_kind = "http",
64 method = %self.config.method,
65 url = %self.config.url,
66 status = output.status(),
67 duration_ms,
68 "http step completed"
69 );
70
71 Ok(StepOutput {
72 output: json!({
73 "status": output.status(),
74 "headers": output.headers(),
75 "body": output.body(),
76 }),
77 duration_ms,
78 cost_usd: Decimal::ZERO,
79 input_tokens: None,
80 output_tokens: None,
81 })
82 }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use axum::Router;
    use axum::routing::{delete, get, patch, post, put};
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use tokio::net::TcpListener;

    /// Keys that a successful step output is expected to contain.
    const OUTPUT_KEYS: [&str; 3] = ["status", "headers", "body"];

    /// Builds the provider argument required by `execute`.
    ///
    /// The provider is created through `RecordReplayProvider::replay` around a
    /// `ClaudeCodeProvider`, pointing at the `/tmp/ironflow-fixtures` directory.
    fn create_test_provider() -> Arc<dyn AgentProvider> {
        let replaying =
            RecordReplayProvider::replay(ClaudeCodeProvider::new(), "/tmp/ironflow-fixtures");
        Arc::new(replaying)
    }

    /// Handler shared by every route of the local test server.
    async fn ok() -> &'static str {
        "ok"
    }

    /// Spawns a local axum server on an OS-assigned port and returns its base URL.
    ///
    /// The listener binds to `127.0.0.1` while the returned URL uses `localhost`.
    /// NOTE(review): this relies on `localhost` reaching the IPv4 loopback —
    /// confirm on hosts that resolve it to IPv6 first.
    async fn start_test_server() -> String {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();

        let app = Router::new()
            .route("/status/200", get(ok))
            .route("/post", post(ok))
            .route("/put", put(ok))
            .route("/patch", patch(ok))
            .route("/delete", delete(ok))
            .route("/headers", get(ok));

        // The server task is detached; it lives until the test runtime shuts down.
        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });

        format!("http://localhost:{port}")
    }

    /// A GET request succeeds and yields the three expected output keys.
    #[tokio::test]
    async fn http_get_method() {
        let base = start_test_server().await;
        let url = format!("{base}/status/200");
        let config = HttpConfig::get(&url);
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        let outcome = executor.execute(&provider).await;
        assert!(outcome.is_ok());
        let step = outcome.unwrap();
        for key in OUTPUT_KEYS {
            assert!(step.output.get(key).is_some());
        }
    }

    /// A POST request against the test server succeeds.
    #[tokio::test]
    async fn http_post_method() {
        let base = start_test_server().await;
        let url = format!("{base}/post");
        let config = HttpConfig::post(&url);
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        assert!(executor.execute(&provider).await.is_ok());
    }

    /// A PUT request against the test server succeeds.
    #[tokio::test]
    async fn http_put_method() {
        let base = start_test_server().await;
        let url = format!("{base}/put");
        let config = HttpConfig::put(&url);
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        assert!(executor.execute(&provider).await.is_ok());
    }

    /// A PATCH request against the test server succeeds.
    #[tokio::test]
    async fn http_patch_method() {
        let base = start_test_server().await;
        let url = format!("{base}/patch");
        let config = HttpConfig::patch(&url);
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        assert!(executor.execute(&provider).await.is_ok());
    }

    /// A DELETE request against the test server succeeds.
    #[tokio::test]
    async fn http_delete_method() {
        let base = start_test_server().await;
        let url = format!("{base}/delete");
        let config = HttpConfig::delete(&url);
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        assert!(executor.execute(&provider).await.is_ok());
    }

    /// An unknown method is rejected with a `StepConfig` error.
    #[tokio::test]
    async fn http_unsupported_method_returns_error() {
        let base = start_test_server().await;
        let url = format!("{base}/status/200");
        let mut config = HttpConfig::get(&url);
        config.method = String::from("INVALID");
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        let outcome = executor.execute(&provider).await;
        assert!(outcome.is_err());
        let Err(EngineError::StepConfig(message)) = outcome else {
            panic!("expected StepConfig error");
        };
        assert!(message.contains("unsupported HTTP method"));
    }

    /// Custom request headers do not prevent the request from succeeding.
    #[tokio::test]
    async fn http_with_custom_headers() {
        let base = start_test_server().await;
        let url = format!("{base}/headers");
        let config = HttpConfig::get(&url)
            .header("X-Custom-Header", "test-value")
            .header("Authorization", "Bearer token");
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        assert!(executor.execute(&provider).await.is_ok());
    }

    /// A POST request carrying a JSON body succeeds.
    #[tokio::test]
    async fn http_with_json_body() {
        let base = start_test_server().await;
        let url = format!("{base}/post");
        let payload = json!({"key": "value", "number": 42});
        let config = HttpConfig::post(&url).json(payload);
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        assert!(executor.execute(&provider).await.is_ok());
    }

    /// The step output exposes the expected keys and reports zero cost.
    #[tokio::test]
    async fn http_step_output_has_structure() {
        let base = start_test_server().await;
        let url = format!("{base}/status/200");
        let config = HttpConfig::get(&url);
        let executor = HttpExecutor::new(&config);
        let provider = create_test_provider();

        let step = executor.execute(&provider).await.unwrap();
        for key in OUTPUT_KEYS {
            assert!(step.output.get(key).is_some());
        }
        assert_eq!(step.cost_usd, Decimal::ZERO);
    }
}