ironflow_engine/executor/
http.rs1use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::http::Http;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::HttpConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18pub struct HttpExecutor<'a> {
22 config: &'a HttpConfig,
23}
24
25impl<'a> HttpExecutor<'a> {
26 pub fn new(config: &'a HttpConfig) -> Self {
28 Self { config }
29 }
30}
31
32impl StepExecutor for HttpExecutor<'_> {
33 async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34 let start = Instant::now();
35
36 let mut http = match self.config.method.to_uppercase().as_str() {
37 "GET" => Http::get(&self.config.url),
38 "POST" => Http::post(&self.config.url),
39 "PUT" => Http::put(&self.config.url),
40 "PATCH" => Http::patch(&self.config.url),
41 "DELETE" => Http::delete(&self.config.url),
42 other => {
43 return Err(EngineError::StepConfig(format!(
44 "unsupported HTTP method: {other}"
45 )));
46 }
47 };
48
49 for (name, value) in &self.config.headers {
50 http = http.header(name, value);
51 }
52 if let Some(ref body) = self.config.body {
53 http = http.json(body.clone());
54 }
55 if let Some(secs) = self.config.timeout_secs {
56 http = http.timeout(Duration::from_secs(secs));
57 }
58
59 let output = http.run().await?;
60 let duration_ms = start.elapsed().as_millis() as u64;
61
62 info!(
63 step_kind = "http",
64 method = %self.config.method,
65 url = %self.config.url,
66 status = output.status(),
67 duration_ms,
68 "http step completed"
69 );
70
71 Ok(StepOutput {
72 output: json!({
73 "status": output.status(),
74 "headers": output.headers(),
75 "body": output.body(),
76 }),
77 duration_ms,
78 cost_usd: Decimal::ZERO,
79 input_tokens: None,
80 output_tokens: None,
81 })
82 }
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use ironflow_core::providers::claude::ClaudeCodeProvider;
89 use ironflow_core::providers::record_replay::RecordReplayProvider;
90
91 fn create_test_provider() -> Arc<dyn AgentProvider> {
92 let inner = ClaudeCodeProvider::new();
93 Arc::new(RecordReplayProvider::replay(
94 inner,
95 "/tmp/ironflow-fixtures",
96 ))
97 }
98
99 #[tokio::test]
100 async fn http_get_method() {
101 let config = HttpConfig::get("http://httpbin.org/status/200");
102 let executor = HttpExecutor::new(&config);
103 let provider = create_test_provider();
104
105 let result = executor.execute(&provider).await;
106 assert!(result.is_ok());
107 let output = result.unwrap();
108 assert!(output.output.get("status").is_some());
109 assert!(output.output.get("headers").is_some());
110 assert!(output.output.get("body").is_some());
111 }
112
113 #[tokio::test]
114 async fn http_post_method() {
115 let config = HttpConfig::post("http://httpbin.org/post");
116 let executor = HttpExecutor::new(&config);
117 let provider = create_test_provider();
118
119 let result = executor.execute(&provider).await;
120 assert!(result.is_ok());
121 }
122
123 #[tokio::test]
124 async fn http_put_method() {
125 let config = HttpConfig::put("http://httpbin.org/put");
126 let executor = HttpExecutor::new(&config);
127 let provider = create_test_provider();
128
129 let result = executor.execute(&provider).await;
130 assert!(result.is_ok());
131 }
132
133 #[tokio::test]
134 async fn http_patch_method() {
135 let config = HttpConfig::patch("http://httpbin.org/patch");
136 let executor = HttpExecutor::new(&config);
137 let provider = create_test_provider();
138
139 let result = executor.execute(&provider).await;
140 assert!(result.is_ok());
141 }
142
143 #[tokio::test]
144 async fn http_delete_method() {
145 let config = HttpConfig::delete("http://httpbin.org/delete");
146 let executor = HttpExecutor::new(&config);
147 let provider = create_test_provider();
148
149 let result = executor.execute(&provider).await;
150 assert!(result.is_ok());
151 }
152
153 #[tokio::test]
154 async fn http_unsupported_method_returns_error() {
155 let mut config = HttpConfig::get("http://httpbin.org/status/200");
156 config.method = "INVALID".to_string();
157 let executor = HttpExecutor::new(&config);
158 let provider = create_test_provider();
159
160 let result = executor.execute(&provider).await;
161 assert!(result.is_err());
162 match result {
163 Err(EngineError::StepConfig(msg)) => {
164 assert!(msg.contains("unsupported HTTP method"));
165 }
166 _ => panic!("expected StepConfig error"),
167 }
168 }
169
170 #[tokio::test]
171 async fn http_with_custom_headers() {
172 let config = HttpConfig::get("http://httpbin.org/headers")
173 .header("X-Custom-Header", "test-value")
174 .header("Authorization", "Bearer token");
175 let executor = HttpExecutor::new(&config);
176 let provider = create_test_provider();
177
178 let result = executor.execute(&provider).await;
179 assert!(result.is_ok());
180 }
181
182 #[tokio::test]
183 async fn http_with_json_body() {
184 let config =
185 HttpConfig::post("http://httpbin.org/post").json(json!({"key": "value", "number": 42}));
186 let executor = HttpExecutor::new(&config);
187 let provider = create_test_provider();
188
189 let result = executor.execute(&provider).await;
190 assert!(result.is_ok());
191 }
192
193 #[tokio::test]
194 async fn http_step_output_has_structure() {
195 let config = HttpConfig::get("http://httpbin.org/status/200");
196 let executor = HttpExecutor::new(&config);
197 let provider = create_test_provider();
198
199 let output = executor.execute(&provider).await.unwrap();
200 assert!(output.output.get("status").is_some());
201 assert!(output.output.get("headers").is_some());
202 assert!(output.output.get("body").is_some());
203 assert_eq!(output.cost_usd, Decimal::ZERO);
204 assert!(output.duration_ms > 0);
205 }
206}