worldinterface_connector/connectors/
http_request.rs1use std::collections::HashMap;
7use std::time::Duration;
8
9use serde_json::{json, Value};
10use worldinterface_core::descriptor::{ConnectorCategory, Descriptor};
11
12use crate::context::InvocationContext;
13use crate::error::ConnectorError;
14use crate::traits::Connector;
15
16pub struct HttpRequestConnector {
19 client: reqwest::blocking::Client,
20}
21
22impl HttpRequestConnector {
23 pub fn new() -> Self {
24 Self {
25 client: reqwest::blocking::Client::builder()
26 .build()
27 .expect("failed to build HTTP client"),
28 }
29 }
30}
31
32impl Default for HttpRequestConnector {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl Connector for HttpRequestConnector {
39 fn describe(&self) -> Descriptor {
40 Descriptor {
41 name: "http.request".into(),
42 display_name: "HTTP Request".into(),
43 description: "Makes an HTTP request to an external URL.".into(),
44 category: ConnectorCategory::Http,
45 input_schema: Some(json!({
46 "type": "object",
47 "required": ["url"],
48 "properties": {
49 "method": { "type": "string", "enum": ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD"] },
50 "url": { "type": "string" },
51 "headers": { "type": "object" },
52 "body": {},
53 "timeout_ms": { "type": "integer", "minimum": 0 }
54 }
55 })),
56 output_schema: Some(json!({
57 "type": "object",
58 "properties": {
59 "status": { "type": "integer" },
60 "headers": { "type": "object" },
61 "body": { "type": "string" }
62 }
63 })),
64 idempotent: false,
65 side_effects: true,
66 }
67 }
68
69 fn invoke(&self, ctx: &InvocationContext, params: &Value) -> Result<Value, ConnectorError> {
70 if ctx.cancellation.is_cancelled() {
72 return Err(ConnectorError::Cancelled);
73 }
74
75 let url = params.get("url").and_then(Value::as_str).ok_or_else(|| {
76 ConnectorError::InvalidParams("missing or invalid 'url' (expected string)".into())
77 })?;
78
79 let method_str = params.get("method").and_then(Value::as_str).unwrap_or("GET");
80
81 let method: reqwest::Method = method_str.parse().map_err(|_| {
82 ConnectorError::InvalidParams(format!("unknown HTTP method: '{method_str}'"))
83 })?;
84
85 let mut request = self.client.request(method, url);
87
88 request = request.header("X-Idempotency-Key", ctx.run_id.to_string());
90
91 if let Some(headers) = params.get("headers").and_then(Value::as_object) {
93 for (key, value) in headers {
94 if let Some(val_str) = value.as_str() {
95 request = request.header(key.as_str(), val_str);
96 }
97 }
98 }
99
100 if let Some(body) = params.get("body") {
102 match body {
103 Value::String(s) => {
104 request = request.body(s.clone());
105 }
106 other => {
107 request =
108 request.header("Content-Type", "application/json").body(other.to_string());
109 }
110 }
111 }
112
113 if let Some(timeout_ms) = params.get("timeout_ms").and_then(Value::as_u64) {
115 request = request.timeout(Duration::from_millis(timeout_ms));
116 }
117
118 let response = request.send().map_err(|e| classify_reqwest_error(&e, url))?;
120
121 let status = response.status().as_u16();
123 let response_headers: HashMap<String, String> = response
124 .headers()
125 .iter()
126 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
127 .collect();
128 let body = response
129 .text()
130 .map_err(|e| ConnectorError::Retryable(format!("failed to read response body: {e}")))?;
131
132 Ok(json!({
133 "status": status,
134 "headers": response_headers,
135 "body": body,
136 }))
137 }
138}
139
140fn classify_reqwest_error(err: &reqwest::Error, url: &str) -> ConnectorError {
141 if err.is_builder() {
142 ConnectorError::InvalidParams(format!("invalid URL: {url}"))
143 } else {
144 ConnectorError::Retryable(format!("HTTP request failed: {err}"))
146 }
147}
148
#[cfg(test)]
mod tests {
    use std::io::{BufRead, BufReader, Write};
    use std::net::TcpListener;

    use serde_json::json;
    use uuid::Uuid;
    use worldinterface_core::id::{FlowRunId, NodeId, StepRunId};

    use super::*;
    use crate::context::{CancellationToken, InvocationContext};

    /// Builds an invocation context with fresh identifiers and a new
    /// cancellation token.
    fn test_ctx() -> InvocationContext {
        InvocationContext {
            flow_run_id: FlowRunId::new(),
            node_id: NodeId::new(),
            step_run_id: StepRunId::new(),
            run_id: Uuid::new_v4(),
            attempt_id: Uuid::new_v4(),
            attempt_number: 1,
            cancellation: CancellationToken::new(),
        }
    }

    /// Invokes a freshly constructed connector with a fresh context.
    fn invoke_with(params: Value) -> Result<Value, ConnectorError> {
        HttpRequestConnector::new().invoke(&test_ctx(), &params)
    }

    /// Starts a one-shot HTTP server on an ephemeral loopback port.
    ///
    /// The server thread accepts a single connection, collects the request
    /// line and headers (everything up to the first empty line), and hands
    /// them to `handler`. The handler returns `(status line, headers, body)`,
    /// which is written back with a matching `Content-Length`. The request
    /// body, if any, is not parsed.
    ///
    /// Returns the server's base URL and a join handle that yields the
    /// collected request lines.
    fn mock_server<F>(handler: F) -> (String, std::thread::JoinHandle<Vec<String>>)
    where
        F: FnOnce(&[String]) -> (String, Vec<(String, String)>, String) + Send + 'static,
    {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let url = format!("http://127.0.0.1:{port}");

        let handle = std::thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();
            let reader = BufReader::new(stream.try_clone().unwrap());

            // Request line plus headers; the blank separator line ends the scan.
            let lines: Vec<String> = reader
                .lines()
                .map(|line| line.unwrap())
                .take_while(|line| !line.is_empty())
                .collect();

            let (status, headers, body) = handler(&lines);

            let mut response = format!("HTTP/1.1 {status}\r\n");
            for (k, v) in &headers {
                response.push_str(&format!("{k}: {v}\r\n"));
            }
            // `String::len` is the byte length, which is what Content-Length needs.
            response.push_str(&format!("Content-Length: {}\r\n", body.len()));
            response.push_str("\r\n");
            response.push_str(&body);

            stream.write_all(response.as_bytes()).unwrap();
            stream.flush().unwrap();

            lines
        });

        (url, handle)
    }

    /// Handler that answers `200 OK` with a plain-text `ok` body.
    fn simple_ok_handler(_lines: &[String]) -> (String, Vec<(String, String)>, String) {
        ("200 OK".into(), vec![("Content-Type".into(), "text/plain".into())], "ok".into())
    }

    #[test]
    fn http_get_success() {
        let (url, handle) = mock_server(simple_ok_handler);
        let result = invoke_with(json!({"url": url, "method": "GET"})).unwrap();

        assert_eq!(result["status"], 200);
        assert_eq!(result["body"], "ok");
        handle.join().unwrap();
    }

    #[test]
    fn http_post_with_body() {
        let (url, handle) = mock_server(|lines| {
            assert!(lines[0].starts_with("POST"));
            (
                "201 Created".into(),
                vec![("Content-Type".into(), "application/json".into())],
                r#"{"id":1}"#.into(),
            )
        });

        let result = invoke_with(json!({
            "url": url,
            "method": "POST",
            "body": "request body"
        }))
        .unwrap();

        assert_eq!(result["status"], 201);
        assert_eq!(result["body"], r#"{"id":1}"#);
        handle.join().unwrap();
    }

    #[test]
    fn http_includes_idempotency_header() {
        let (url, handle) = mock_server(|lines| {
            // Header names are case-insensitive, so compare in lowercase.
            let has_key = lines.iter().any(|l| l.to_lowercase().starts_with("x-idempotency-key:"));
            assert!(has_key, "X-Idempotency-Key header not found in: {lines:?}");
            simple_ok_handler(lines)
        });

        invoke_with(json!({"url": url})).unwrap();
        handle.join().unwrap();
    }

    #[test]
    fn http_returns_response_headers() {
        let (url, handle) = mock_server(|_| {
            (
                "200 OK".into(),
                vec![
                    ("Content-Type".into(), "text/plain".into()),
                    ("X-Custom".into(), "custom-value".into()),
                ],
                "ok".into(),
            )
        });

        let result = invoke_with(json!({"url": url})).unwrap();

        // The connector reports header names in lowercase.
        let headers = result["headers"].as_object().unwrap();
        assert_eq!(headers["x-custom"], "custom-value");
        handle.join().unwrap();
    }

    #[test]
    fn http_connection_refused_is_retryable() {
        // Bind to obtain a free port, then release it so nothing is listening there.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        drop(listener);

        let result =
            invoke_with(json!({"url": format!("http://127.0.0.1:{port}"), "timeout_ms": 1000}));

        assert!(matches!(result, Err(ConnectorError::Retryable(_))));
    }

    #[test]
    fn http_invalid_url_is_invalid_params() {
        let result = invoke_with(json!({"url": "not a valid url"}));
        // An unparseable URL surfaces from `send()` as a builder error, which
        // the connector classifies as invalid parameters.
        assert!(matches!(result, Err(ConnectorError::InvalidParams(_))));
    }

    #[test]
    fn http_missing_url_is_invalid_params() {
        let result = invoke_with(json!({"method": "GET"}));
        assert!(matches!(result, Err(ConnectorError::InvalidParams(_))));
    }

    #[test]
    fn http_missing_method_defaults_to_get() {
        let (url, handle) = mock_server(|lines| {
            assert!(lines[0].starts_with("GET"));
            simple_ok_handler(lines)
        });

        let result = invoke_with(json!({"url": url})).unwrap();

        assert_eq!(result["status"], 200);
        handle.join().unwrap();
    }

    #[test]
    fn http_5xx_is_successful_output() {
        let (url, handle) =
            mock_server(|_| ("500 Internal Server Error".into(), vec![], "server error".into()));

        let result = invoke_with(json!({"url": url})).unwrap();

        assert_eq!(result["status"], 500);
        assert_eq!(result["body"], "server error");
        handle.join().unwrap();
    }

    #[test]
    fn http_4xx_is_successful_output() {
        let (url, handle) = mock_server(|_| ("404 Not Found".into(), vec![], "not found".into()));

        let result = invoke_with(json!({"url": url})).unwrap();

        assert_eq!(result["status"], 404);
        assert_eq!(result["body"], "not found");
        handle.join().unwrap();
    }

    #[test]
    fn http_timeout_is_retryable() {
        // The server stalls far longer than the client timeout. Its thread is
        // deliberately not joined so the test does not wait out the sleep.
        let (url, _handle) = mock_server(|_| {
            std::thread::sleep(std::time::Duration::from_secs(5));
            simple_ok_handler(&[])
        });

        let result = invoke_with(json!({"url": url, "timeout_ms": 100}));

        assert!(matches!(result, Err(ConnectorError::Retryable(_))));
    }

    #[test]
    fn http_descriptor() {
        let desc = HttpRequestConnector::new().describe();
        assert_eq!(desc.name, "http.request");
        assert_eq!(desc.category, ConnectorCategory::Http);
        assert!(!desc.idempotent);
        assert!(desc.side_effects);
        assert!(desc.input_schema.is_some());
        assert!(desc.output_schema.is_some());
    }
}