Skip to main content

worldinterface_connector/connectors/
http_request.rs

1//! The `http.request` connector — makes HTTP requests to external endpoints.
2//!
3//! Uses `reqwest::blocking::Client` because AQ's `ExecutorHandler::execute()`
4//! is synchronous and runs on OS threads (not tokio worker threads).
5
6use std::collections::HashMap;
7use std::time::Duration;
8
9use serde_json::{json, Value};
10use worldinterface_core::descriptor::{ConnectorCategory, Descriptor};
11
12use crate::context::InvocationContext;
13use crate::error::ConnectorError;
14use crate::traits::Connector;
15
16/// Makes HTTP requests to external endpoints. Injects `X-Idempotency-Key`
17/// header with the `run_id` on every request (Invariant 3).
18pub struct HttpRequestConnector {
19    client: reqwest::blocking::Client,
20}
21
22impl HttpRequestConnector {
23    pub fn new() -> Self {
24        Self {
25            client: reqwest::blocking::Client::builder()
26                .build()
27                .expect("failed to build HTTP client"),
28        }
29    }
30}
31
32impl Default for HttpRequestConnector {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl Connector for HttpRequestConnector {
39    fn describe(&self) -> Descriptor {
40        Descriptor {
41            name: "http.request".into(),
42            display_name: "HTTP Request".into(),
43            description: "Makes an HTTP request to an external URL.".into(),
44            category: ConnectorCategory::Http,
45            input_schema: Some(json!({
46                "type": "object",
47                "required": ["url"],
48                "properties": {
49                    "method": { "type": "string", "enum": ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"] },
50                    "url": { "type": "string" },
51                    "headers": { "type": "object" },
52                    "body": {},
53                    "timeout_ms": { "type": "integer", "minimum": 0 }
54                }
55            })),
56            output_schema: Some(json!({
57                "type": "object",
58                "properties": {
59                    "status": { "type": "integer" },
60                    "headers": { "type": "object" },
61                    "body": { "type": "string" }
62                }
63            })),
64            idempotent: false,
65            side_effects: true,
66        }
67    }
68
69    fn invoke(&self, ctx: &InvocationContext, params: &Value) -> Result<Value, ConnectorError> {
70        // Check cancellation before starting the request
71        if ctx.cancellation.is_cancelled() {
72            return Err(ConnectorError::Cancelled);
73        }
74
75        let url = params.get("url").and_then(Value::as_str).ok_or_else(|| {
76            ConnectorError::InvalidParams("missing or invalid 'url' (expected string)".into())
77        })?;
78
79        let method_str = params.get("method").and_then(Value::as_str).unwrap_or("GET");
80
81        let method: reqwest::Method = method_str.parse().map_err(|_| {
82            ConnectorError::InvalidParams(format!("unknown HTTP method: '{method_str}'"))
83        })?;
84
85        // Build request
86        let mut request = self.client.request(method, url);
87
88        // Inject idempotency key (Invariant 3)
89        request = request.header("X-Idempotency-Key", ctx.run_id.to_string());
90
91        // Apply custom headers
92        if let Some(headers) = params.get("headers").and_then(Value::as_object) {
93            for (key, value) in headers {
94                if let Some(val_str) = value.as_str() {
95                    request = request.header(key.as_str(), val_str);
96                }
97            }
98        }
99
100        // Apply body
101        if let Some(body) = params.get("body") {
102            match body {
103                Value::String(s) => {
104                    request = request.body(s.clone());
105                }
106                other => {
107                    request =
108                        request.header("Content-Type", "application/json").body(other.to_string());
109                }
110            }
111        }
112
113        // Apply timeout
114        if let Some(timeout_ms) = params.get("timeout_ms").and_then(Value::as_u64) {
115            request = request.timeout(Duration::from_millis(timeout_ms));
116        }
117
118        // Send request
119        let response = request.send().map_err(|e| classify_reqwest_error(&e, url))?;
120
121        // Build output — all HTTP responses (including 4xx, 5xx) are success
122        let status = response.status().as_u16();
123        let response_headers: HashMap<String, String> = response
124            .headers()
125            .iter()
126            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
127            .collect();
128        let body = response
129            .text()
130            .map_err(|e| ConnectorError::Retryable(format!("failed to read response body: {e}")))?;
131
132        Ok(json!({
133            "status": status,
134            "headers": response_headers,
135            "body": body,
136        }))
137    }
138}
139
140fn classify_reqwest_error(err: &reqwest::Error, url: &str) -> ConnectorError {
141    if err.is_builder() {
142        ConnectorError::InvalidParams(format!("invalid URL: {url}"))
143    } else {
144        // Connection, timeout, and other transport errors are retryable
145        ConnectorError::Retryable(format!("HTTP request failed: {err}"))
146    }
147}
148
#[cfg(test)]
mod tests {
    use std::io::{BufRead, BufReader, Read, Write};
    use std::net::TcpListener;

    use serde_json::json;
    use uuid::Uuid;
    use worldinterface_core::id::{FlowRunId, NodeId, StepRunId};

    use super::*;
    use crate::context::{CancellationToken, InvocationContext};

    /// Builds an invocation context with fresh identifiers and a
    /// cancellation token that has not been cancelled.
    fn test_ctx() -> InvocationContext {
        InvocationContext {
            flow_run_id: FlowRunId::new(),
            node_id: NodeId::new(),
            step_run_id: StepRunId::new(),
            run_id: Uuid::new_v4(),
            attempt_id: Uuid::new_v4(),
            attempt_number: 1,
            cancellation: CancellationToken::new(),
        }
    }

    /// Spawn a minimal mock HTTP server that accepts one connection, reads the
    /// request, and writes a canned response. Returns (url, join_handle).
    /// The `handler` receives the raw request lines (request line + headers)
    /// and returns (status_line, headers, body). The join handle yields the
    /// request lines that were received.
    fn mock_server<F>(handler: F) -> (String, std::thread::JoinHandle<Vec<String>>)
    where
        F: FnOnce(&[String]) -> (String, Vec<(String, String)>, String) + Send + 'static,
    {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let url = format!("http://127.0.0.1:{}", addr.port());

        let handle = std::thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();
            let mut reader = BufReader::new(stream.try_clone().unwrap());

            // Read the request line and headers, up to the blank separator line.
            let mut lines = Vec::new();
            for line in reader.by_ref().lines() {
                let line = line.unwrap();
                if line.is_empty() {
                    break;
                }
                lines.push(line);
            }

            // Drain the request body (if any) before responding. Closing a
            // socket that still has unread bytes can reset the connection and
            // make the client miss the response.
            let content_length = lines
                .iter()
                .filter_map(|line| line.split_once(':'))
                .find(|(name, _)| name.trim().eq_ignore_ascii_case("content-length"))
                .and_then(|(_, value)| value.trim().parse::<usize>().ok())
                .unwrap_or(0);
            let mut request_body = vec![0u8; content_length];
            reader.read_exact(&mut request_body).unwrap();

            let (status, headers, body) = handler(&lines);

            let header_block: String =
                headers.iter().map(|(k, v)| format!("{k}: {v}\r\n")).collect();
            let response = format!(
                "HTTP/1.1 {status}\r\n{header_block}Content-Length: {}\r\n\r\n{body}",
                body.len()
            );

            stream.write_all(response.as_bytes()).unwrap();
            stream.flush().unwrap();

            lines
        });

        (url, handle)
    }

    /// Canned handler: `200 OK`, `text/plain`, body `ok`.
    fn simple_ok_handler(_lines: &[String]) -> (String, Vec<(String, String)>, String) {
        ("200 OK".into(), vec![("Content-Type".into(), "text/plain".into())], "ok".into())
    }

    #[test]
    fn http_get_success() {
        let (url, handle) = mock_server(simple_ok_handler);
        let ctx = test_ctx();
        let result = HttpRequestConnector::new()
            .invoke(&ctx, &json!({"url": url, "method": "GET"}))
            .unwrap();

        assert_eq!(result["status"], 200);
        assert_eq!(result["body"], "ok");
        handle.join().unwrap();
    }

    #[test]
    fn http_post_with_body() {
        let (url, handle) = mock_server(|lines| {
            // Verify it's a POST
            assert!(lines[0].starts_with("POST"));
            (
                "201 Created".into(),
                vec![("Content-Type".into(), "application/json".into())],
                r#"{"id":1}"#.into(),
            )
        });

        let ctx = test_ctx();
        let result = HttpRequestConnector::new()
            .invoke(
                &ctx,
                &json!({
                    "url": url,
                    "method": "POST",
                    "body": "request body"
                }),
            )
            .unwrap();

        assert_eq!(result["status"], 201);
        assert_eq!(result["body"], r#"{"id":1}"#);
        handle.join().unwrap();
    }

    #[test]
    fn http_includes_idempotency_header() {
        let (url, handle) = mock_server(|lines| {
            // Find the idempotency key header
            let has_key = lines.iter().any(|l| l.to_lowercase().starts_with("x-idempotency-key:"));
            assert!(has_key, "X-Idempotency-Key header not found in: {lines:?}");
            simple_ok_handler(lines)
        });

        let ctx = test_ctx();
        HttpRequestConnector::new().invoke(&ctx, &json!({"url": url})).unwrap();
        handle.join().unwrap();
    }

    #[test]
    fn http_returns_response_headers() {
        let (url, handle) = mock_server(|_| {
            (
                "200 OK".into(),
                vec![
                    ("Content-Type".into(), "text/plain".into()),
                    ("X-Custom".into(), "custom-value".into()),
                ],
                "ok".into(),
            )
        });

        let ctx = test_ctx();
        let result = HttpRequestConnector::new().invoke(&ctx, &json!({"url": url})).unwrap();

        let headers = result["headers"].as_object().unwrap();
        assert_eq!(headers["x-custom"], "custom-value");
        handle.join().unwrap();
    }

    #[test]
    fn http_connection_refused_is_retryable() {
        // Bind a port then drop the listener so it's closed
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        drop(listener);

        let ctx = test_ctx();
        let result = HttpRequestConnector::new()
            .invoke(&ctx, &json!({"url": format!("http://127.0.0.1:{port}"), "timeout_ms": 1000}));

        assert!(matches!(result, Err(ConnectorError::Retryable(_))));
    }

    #[test]
    fn http_invalid_url_is_invalid_params() {
        let ctx = test_ctx();
        let result = HttpRequestConnector::new().invoke(&ctx, &json!({"url": "not a valid url"}));
        // An unparsable URL is rejected while the request is being built, so
        // it must surface as InvalidParams rather than a retryable error.
        assert!(matches!(result, Err(ConnectorError::InvalidParams(_))));
    }

    #[test]
    fn http_missing_url_is_invalid_params() {
        let ctx = test_ctx();
        let result = HttpRequestConnector::new().invoke(&ctx, &json!({"method": "GET"}));
        assert!(matches!(result, Err(ConnectorError::InvalidParams(_))));
    }

    #[test]
    fn http_missing_method_defaults_to_get() {
        let (url, handle) = mock_server(|lines| {
            assert!(lines[0].starts_with("GET"));
            simple_ok_handler(lines)
        });

        let ctx = test_ctx();
        let result = HttpRequestConnector::new().invoke(&ctx, &json!({"url": url})).unwrap();

        assert_eq!(result["status"], 200);
        handle.join().unwrap();
    }

    #[test]
    fn http_5xx_is_successful_output() {
        let (url, handle) =
            mock_server(|_| ("500 Internal Server Error".into(), vec![], "server error".into()));

        let ctx = test_ctx();
        let result = HttpRequestConnector::new().invoke(&ctx, &json!({"url": url})).unwrap();

        assert_eq!(result["status"], 500);
        assert_eq!(result["body"], "server error");
        handle.join().unwrap();
    }

    #[test]
    fn http_4xx_is_successful_output() {
        let (url, handle) = mock_server(|_| ("404 Not Found".into(), vec![], "not found".into()));

        let ctx = test_ctx();
        let result = HttpRequestConnector::new().invoke(&ctx, &json!({"url": url})).unwrap();

        assert_eq!(result["status"], 404);
        assert_eq!(result["body"], "not found");
        handle.join().unwrap();
    }

    #[test]
    fn http_timeout_is_retryable() {
        // The server thread is intentionally not joined: it outlives the
        // client's timeout by design.
        let (url, _handle) = mock_server(|_| {
            // Sleep longer than the timeout
            std::thread::sleep(std::time::Duration::from_secs(5));
            simple_ok_handler(&[])
        });

        let ctx = test_ctx();
        let result =
            HttpRequestConnector::new().invoke(&ctx, &json!({"url": url, "timeout_ms": 100}));

        assert!(matches!(result, Err(ConnectorError::Retryable(_))));
    }

    #[test]
    fn http_descriptor() {
        let desc = HttpRequestConnector::new().describe();
        assert_eq!(desc.name, "http.request");
        assert_eq!(desc.category, ConnectorCategory::Http);
        assert!(!desc.idempotent);
        assert!(desc.side_effects);
        assert!(desc.input_schema.is_some());
        assert!(desc.output_schema.is_some());
    }
}