Skip to main content

ironflow_engine/executor/
http.rs

1//! HTTP step executor.
2
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use rust_decimal::Decimal;
7use serde_json::json;
8use tracing::info;
9
10use ironflow_core::operations::http::Http;
11use ironflow_core::provider::AgentProvider;
12
13use crate::config::HttpConfig;
14use crate::error::EngineError;
15
16use super::{StepExecutor, StepOutput};
17
18/// Executor for HTTP steps.
19///
20/// Sends an HTTP request and captures the response status, headers, and body.
21pub struct HttpExecutor<'a> {
22    config: &'a HttpConfig,
23}
24
25impl<'a> HttpExecutor<'a> {
26    /// Create a new HTTP executor from a config reference.
27    pub fn new(config: &'a HttpConfig) -> Self {
28        Self { config }
29    }
30}
31
32impl StepExecutor for HttpExecutor<'_> {
33    async fn execute(&self, _provider: &Arc<dyn AgentProvider>) -> Result<StepOutput, EngineError> {
34        let start = Instant::now();
35
36        let mut http = match self.config.method.to_uppercase().as_str() {
37            "GET" => Http::get(&self.config.url),
38            "POST" => Http::post(&self.config.url),
39            "PUT" => Http::put(&self.config.url),
40            "PATCH" => Http::patch(&self.config.url),
41            "DELETE" => Http::delete(&self.config.url),
42            other => {
43                return Err(EngineError::StepConfig(format!(
44                    "unsupported HTTP method: {other}"
45                )));
46            }
47        };
48
49        for (name, value) in &self.config.headers {
50            http = http.header(name, value);
51        }
52        if let Some(ref body) = self.config.body {
53            http = http.json(body.clone());
54        }
55        if let Some(secs) = self.config.timeout_secs {
56            http = http.timeout(Duration::from_secs(secs));
57        }
58
59        let output = http.run().await?;
60        let duration_ms = start.elapsed().as_millis() as u64;
61
62        info!(
63            step_kind = "http",
64            method = %self.config.method,
65            url = %self.config.url,
66            status = output.status(),
67            duration_ms,
68            "http step completed"
69        );
70
71        Ok(StepOutput {
72            output: json!({
73                "status": output.status(),
74                "headers": output.headers(),
75                "body": output.body(),
76            }),
77            duration_ms,
78            cost_usd: Decimal::ZERO,
79            input_tokens: None,
80            output_tokens: None,
81        })
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88    use ironflow_core::providers::claude::ClaudeCodeProvider;
89    use ironflow_core::providers::record_replay::RecordReplayProvider;
90
91    fn create_test_provider() -> Arc<dyn AgentProvider> {
92        let inner = ClaudeCodeProvider::new();
93        Arc::new(RecordReplayProvider::replay(
94            inner,
95            "/tmp/ironflow-fixtures",
96        ))
97    }
98
99    #[tokio::test]
100    async fn http_get_method() {
101        let config = HttpConfig::get("http://httpbin.org/status/200");
102        let executor = HttpExecutor::new(&config);
103        let provider = create_test_provider();
104
105        let result = executor.execute(&provider).await;
106        assert!(result.is_ok());
107        let output = result.unwrap();
108        assert!(output.output.get("status").is_some());
109        assert!(output.output.get("headers").is_some());
110        assert!(output.output.get("body").is_some());
111    }
112
113    #[tokio::test]
114    async fn http_post_method() {
115        let config = HttpConfig::post("http://httpbin.org/post");
116        let executor = HttpExecutor::new(&config);
117        let provider = create_test_provider();
118
119        let result = executor.execute(&provider).await;
120        assert!(result.is_ok());
121    }
122
123    #[tokio::test]
124    async fn http_put_method() {
125        let config = HttpConfig::put("http://httpbin.org/put");
126        let executor = HttpExecutor::new(&config);
127        let provider = create_test_provider();
128
129        let result = executor.execute(&provider).await;
130        assert!(result.is_ok());
131    }
132
133    #[tokio::test]
134    async fn http_patch_method() {
135        let config = HttpConfig::patch("http://httpbin.org/patch");
136        let executor = HttpExecutor::new(&config);
137        let provider = create_test_provider();
138
139        let result = executor.execute(&provider).await;
140        assert!(result.is_ok());
141    }
142
143    #[tokio::test]
144    async fn http_delete_method() {
145        let config = HttpConfig::delete("http://httpbin.org/delete");
146        let executor = HttpExecutor::new(&config);
147        let provider = create_test_provider();
148
149        let result = executor.execute(&provider).await;
150        assert!(result.is_ok());
151    }
152
153    #[tokio::test]
154    async fn http_unsupported_method_returns_error() {
155        let mut config = HttpConfig::get("http://httpbin.org/status/200");
156        config.method = "INVALID".to_string();
157        let executor = HttpExecutor::new(&config);
158        let provider = create_test_provider();
159
160        let result = executor.execute(&provider).await;
161        assert!(result.is_err());
162        match result {
163            Err(EngineError::StepConfig(msg)) => {
164                assert!(msg.contains("unsupported HTTP method"));
165            }
166            _ => panic!("expected StepConfig error"),
167        }
168    }
169
170    #[tokio::test]
171    async fn http_with_custom_headers() {
172        let config = HttpConfig::get("http://httpbin.org/headers")
173            .header("X-Custom-Header", "test-value")
174            .header("Authorization", "Bearer token");
175        let executor = HttpExecutor::new(&config);
176        let provider = create_test_provider();
177
178        let result = executor.execute(&provider).await;
179        assert!(result.is_ok());
180    }
181
182    #[tokio::test]
183    async fn http_with_json_body() {
184        let config =
185            HttpConfig::post("http://httpbin.org/post").json(json!({"key": "value", "number": 42}));
186        let executor = HttpExecutor::new(&config);
187        let provider = create_test_provider();
188
189        let result = executor.execute(&provider).await;
190        assert!(result.is_ok());
191    }
192
193    #[tokio::test]
194    async fn http_step_output_has_structure() {
195        let config = HttpConfig::get("http://httpbin.org/status/200");
196        let executor = HttpExecutor::new(&config);
197        let provider = create_test_provider();
198
199        let output = executor.execute(&provider).await.unwrap();
200        assert!(output.output.get("status").is_some());
201        assert!(output.output.get("headers").is_some());
202        assert!(output.output.get("body").is_some());
203        assert_eq!(output.cost_usd, Decimal::ZERO);
204        assert!(output.duration_ms > 0);
205    }
206}