ironflow_engine/executor/
http.rs1use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::http::Http;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::HttpConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18pub struct HttpExecutor<'a> {
22 config: &'a HttpConfig,
23}
24
25impl<'a> HttpExecutor<'a> {
26 pub fn new(config: &'a HttpConfig) -> Self {
28 Self { config }
29 }
30}
31
32impl StepExecutor for HttpExecutor<'_> {
33 async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34 let start = Instant::now();
35
36 let mut http = match self.config.method.to_uppercase().as_str() {
37 "GET" => Http::get(&self.config.url),
38 "POST" => Http::post(&self.config.url),
39 "PUT" => Http::put(&self.config.url),
40 "PATCH" => Http::patch(&self.config.url),
41 "DELETE" => Http::delete(&self.config.url),
42 other => {
43 return Err(EngineError::StepConfig(format!(
44 "unsupported HTTP method: {other}"
45 )));
46 }
47 };
48
49 for (name, value) in &self.config.headers {
50 http = http.header(name, value);
51 }
52 if let Some(ref body) = self.config.body {
53 http = http.json(body.clone());
54 }
55 if let Some(secs) = self.config.timeout_secs {
56 http = http.timeout(Duration::from_secs(secs));
57 }
58
59 let output = http.run().await?;
60 let duration_ms = start.elapsed().as_millis() as u64;
61
62 info!(
63 step_kind = "http",
64 method = %self.config.method,
65 url = %self.config.url,
66 status = output.status(),
67 duration_ms,
68 "http step completed"
69 );
70
71 #[cfg(feature = "prometheus")]
72 {
73 use ironflow_core::metric_names::{HTTP_DURATION_SECONDS, HTTP_TOTAL, STATUS_SUCCESS};
74 use metrics::{counter, histogram};
75 counter!(HTTP_TOTAL, "method" => self.config.method.clone(), "status" => STATUS_SUCCESS).increment(1);
76 histogram!(HTTP_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
77 }
78
79 Ok(StepOutput {
80 output: json!({
81 "status": output.status(),
82 "headers": output.headers(),
83 "body": output.body(),
84 }),
85 duration_ms,
86 cost_usd: Decimal::ZERO,
87 input_tokens: None,
88 output_tokens: None,
89 model: None,
90 debug_messages: None,
91 })
92 }
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use axum::Router;
    use axum::routing::{delete, get, patch, post, put};
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use tokio::net::TcpListener;

    /// Builds the provider argument required by `StepExecutor::execute`.
    ///
    /// `HttpExecutor` ignores the provider, so a replay wrapper is enough here.
    fn create_test_provider() -> Arc<dyn AgentProvider> {
        let inner = ClaudeCodeProvider::new();
        Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ))
    }

    /// Handler shared by every test route; always answers with the body `ok`.
    async fn ok() -> &'static str {
        "ok"
    }

    /// Starts a local axum server on an ephemeral port and returns its base URL.
    ///
    /// The URL is built from the address the listener is actually bound to.
    /// Using `localhost` instead could resolve to `::1` first while the server
    /// only listens on IPv4, which makes the tests flaky on dual-stack hosts.
    async fn start_test_server() -> String {
        let app = Router::new()
            .route("/status/200", get(ok))
            .route("/post", post(ok))
            .route("/put", put(ok))
            .route("/patch", patch(ok))
            .route("/delete", delete(ok))
            .route("/headers", get(ok));

        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("failed to bind an ephemeral test port");
        let addr = listener
            .local_addr()
            .expect("failed to read the bound test address");
        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });
        format!("http://{addr}")
    }

    /// Runs `config` through a fresh `HttpExecutor` with the test provider.
    async fn run_step(config: &HttpConfig) -> Result<StepOutput, EngineError> {
        let provider = create_test_provider();
        HttpExecutor::new(config).execute(&provider).await
    }

    /// Asserts that the step output carries the three expected top-level keys.
    fn assert_response_shape(output: &StepOutput) {
        for key in ["status", "headers", "body"] {
            assert!(
                output.output.get(key).is_some(),
                "step output is missing the `{key}` field"
            );
        }
    }

    #[tokio::test]
    async fn http_get_method() {
        let base = start_test_server().await;
        let config = HttpConfig::get(&format!("{base}/status/200"));

        let output = run_step(&config).await.expect("GET step should succeed");
        assert_response_shape(&output);
    }

    #[tokio::test]
    async fn http_post_method() {
        let base = start_test_server().await;
        let config = HttpConfig::post(&format!("{base}/post"));

        run_step(&config).await.expect("POST step should succeed");
    }

    #[tokio::test]
    async fn http_put_method() {
        let base = start_test_server().await;
        let config = HttpConfig::put(&format!("{base}/put"));

        run_step(&config).await.expect("PUT step should succeed");
    }

    #[tokio::test]
    async fn http_patch_method() {
        let base = start_test_server().await;
        let config = HttpConfig::patch(&format!("{base}/patch"));

        run_step(&config).await.expect("PATCH step should succeed");
    }

    #[tokio::test]
    async fn http_delete_method() {
        let base = start_test_server().await;
        let config = HttpConfig::delete(&format!("{base}/delete"));

        run_step(&config).await.expect("DELETE step should succeed");
    }

    #[tokio::test]
    async fn http_unsupported_method_returns_error() {
        let base = start_test_server().await;
        let mut config = HttpConfig::get(&format!("{base}/status/200"));
        config.method = "INVALID".to_string();

        match run_step(&config).await {
            Err(EngineError::StepConfig(msg)) => {
                assert!(
                    msg.contains("unsupported HTTP method"),
                    "unexpected error message: {msg}"
                );
            }
            Err(other) => panic!("expected StepConfig error, got {other:?}"),
            Ok(_) => panic!("expected StepConfig error, but the step succeeded"),
        }
    }

    #[tokio::test]
    async fn http_with_custom_headers() {
        let base = start_test_server().await;
        let config = HttpConfig::get(&format!("{base}/headers"))
            .header("X-Custom-Header", "test-value")
            .header("Authorization", "Bearer token");

        run_step(&config)
            .await
            .expect("GET step with custom headers should succeed");
    }

    #[tokio::test]
    async fn http_with_json_body() {
        let base = start_test_server().await;
        let config =
            HttpConfig::post(&format!("{base}/post")).json(json!({"key": "value", "number": 42}));

        run_step(&config)
            .await
            .expect("POST step with JSON body should succeed");
    }

    #[tokio::test]
    async fn http_step_output_has_structure() {
        let base = start_test_server().await;
        let config = HttpConfig::get(&format!("{base}/status/200"));

        let output = run_step(&config).await.expect("GET step should succeed");
        assert_response_shape(&output);
        assert_eq!(output.cost_usd, Decimal::ZERO);
    }
}