Skip to main content

ironflow_engine/executor/
http.rs

1//! HTTP step executor.
2
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::http::Http;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::HttpConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18/// Executor for HTTP steps.
19///
20/// Sends an HTTP request and captures the response status, headers, and body.
21pub struct HttpExecutor<'a> {
22    config: &'a HttpConfig,
23}
24
25impl<'a> HttpExecutor<'a> {
26    /// Create a new HTTP executor from a config reference.
27    pub fn new(config: &'a HttpConfig) -> Self {
28        Self { config }
29    }
30}
31
32impl StepExecutor for HttpExecutor<'_> {
33    async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34        let start = Instant::now();
35
36        let mut http = match self.config.method.to_uppercase().as_str() {
37            "GET" => Http::get(&self.config.url),
38            "POST" => Http::post(&self.config.url),
39            "PUT" => Http::put(&self.config.url),
40            "PATCH" => Http::patch(&self.config.url),
41            "DELETE" => Http::delete(&self.config.url),
42            other => {
43                return Err(EngineError::StepConfig(format!(
44                    "unsupported HTTP method: {other}"
45                )));
46            }
47        };
48
49        for (name, value) in &self.config.headers {
50            http = http.header(name, value);
51        }
52        if let Some(ref body) = self.config.body {
53            http = http.json(body.clone());
54        }
55        if let Some(secs) = self.config.timeout_secs {
56            http = http.timeout(Duration::from_secs(secs));
57        }
58
59        let output = http.run().await?;
60        let duration_ms = start.elapsed().as_millis() as u64;
61
62        info!(
63            step_kind = "http",
64            method = %self.config.method,
65            url = %self.config.url,
66            status = output.status(),
67            duration_ms,
68            "http step completed"
69        );
70
71        #[cfg(feature = "prometheus")]
72        {
73            use ironflow_core::metric_names::{HTTP_DURATION_SECONDS, HTTP_TOTAL, STATUS_SUCCESS};
74            use metrics::{counter, histogram};
75            counter!(HTTP_TOTAL, "method" => self.config.method.clone(), "status" => STATUS_SUCCESS).increment(1);
76            histogram!(HTTP_DURATION_SECONDS).record(duration_ms as f64 / 1000.0);
77        }
78
79        Ok(StepOutput {
80            output: json!({
81                "status": output.status(),
82                "headers": output.headers(),
83                "body": output.body(),
84            }),
85            duration_ms,
86            cost_usd: Decimal::ZERO,
87            input_tokens: None,
88            output_tokens: None,
89            model: None,
90            debug_messages: None,
91        })
92    }
93}
94
#[cfg(test)]
mod tests {
    use super::*;
    use axum::Router;
    use axum::routing::{delete, get, patch, post, put};
    use ironflow_core::providers::claude::ClaudeCodeProvider;
    use ironflow_core::providers::record_replay::RecordReplayProvider;
    use tokio::net::TcpListener;

    /// Builds a replay-mode provider. The HTTP executor ignores its provider
    /// argument, but `StepExecutor::execute` requires one.
    fn create_test_provider() -> Arc<dyn AgentProvider> {
        let inner = ClaudeCodeProvider::new();
        Arc::new(RecordReplayProvider::replay(
            inner,
            "/tmp/ironflow-fixtures",
        ))
    }

    /// Starts a local axum server on an ephemeral port and returns its base URL.
    ///
    /// Every route answers with the plain body `"ok"`.
    async fn start_test_server() -> String {
        let app = Router::new()
            .route("/status/200", get(|| async { "ok" }))
            .route("/post", post(|| async { "ok" }))
            .route("/put", put(|| async { "ok" }))
            .route("/patch", patch(|| async { "ok" }))
            .route("/delete", delete(|| async { "ok" }))
            .route("/headers", get(|| async { "ok" }));

        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        // Build the URL from the address the listener is actually bound to.
        // `localhost` may resolve to `::1` first, where nothing is listening.
        let addr = listener.local_addr().unwrap();
        tokio::spawn(async move {
            axum::serve(listener, app).await.unwrap();
        });
        format!("http://{addr}")
    }

    /// Runs an HTTP step for `config` with a fresh test provider.
    async fn run_step(config: &HttpConfig) -> Result<StepOutput, EngineError> {
        let provider = create_test_provider();
        HttpExecutor::new(config).execute(&provider).await
    }

    #[tokio::test]
    async fn http_get_method() {
        let base = start_test_server().await;
        let config = HttpConfig::get(&format!("{base}/status/200"));

        let output = run_step(&config).await.expect("GET step should succeed");
        assert!(output.output.get("status").is_some());
        assert!(output.output.get("headers").is_some());
        assert!(output.output.get("body").is_some());
    }

    #[tokio::test]
    async fn http_post_method() {
        let base = start_test_server().await;
        let config = HttpConfig::post(&format!("{base}/post"));

        run_step(&config).await.expect("POST step should succeed");
    }

    #[tokio::test]
    async fn http_put_method() {
        let base = start_test_server().await;
        let config = HttpConfig::put(&format!("{base}/put"));

        run_step(&config).await.expect("PUT step should succeed");
    }

    #[tokio::test]
    async fn http_patch_method() {
        let base = start_test_server().await;
        let config = HttpConfig::patch(&format!("{base}/patch"));

        run_step(&config).await.expect("PATCH step should succeed");
    }

    #[tokio::test]
    async fn http_delete_method() {
        let base = start_test_server().await;
        let config = HttpConfig::delete(&format!("{base}/delete"));

        run_step(&config).await.expect("DELETE step should succeed");
    }

    #[tokio::test]
    async fn http_unsupported_method_returns_error() {
        let base = start_test_server().await;
        let mut config = HttpConfig::get(&format!("{base}/status/200"));
        config.method = "INVALID".to_string();

        match run_step(&config).await {
            Err(EngineError::StepConfig(msg)) => {
                assert!(msg.contains("unsupported HTTP method"));
            }
            Err(other) => panic!("expected StepConfig error, got {other:?}"),
            Ok(_) => panic!("expected StepConfig error, got a successful step"),
        }
    }

    #[tokio::test]
    async fn http_with_custom_headers() {
        let base = start_test_server().await;
        let config = HttpConfig::get(&format!("{base}/headers"))
            .header("X-Custom-Header", "test-value")
            .header("Authorization", "Bearer token");

        run_step(&config)
            .await
            .expect("GET with custom headers should succeed");
    }

    #[tokio::test]
    async fn http_with_json_body() {
        let base = start_test_server().await;
        let config =
            HttpConfig::post(&format!("{base}/post")).json(json!({"key": "value", "number": 42}));

        run_step(&config)
            .await
            .expect("POST with JSON body should succeed");
    }

    #[tokio::test]
    async fn http_step_output_has_structure() {
        let base = start_test_server().await;
        let config = HttpConfig::get(&format!("{base}/status/200"));

        let output = run_step(&config).await.expect("GET step should succeed");
        assert!(output.output.get("status").is_some());
        assert!(output.output.get("headers").is_some());
        assert!(output.output.get("body").is_some());
        assert_eq!(output.cost_usd, Decimal::ZERO);
    }
}