dataflow_rs/engine/functions/
http.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::AsyncFunctionHandler;
3use crate::engine::message::{Change, Message};
4use async_trait::async_trait;
5use reqwest::{
6 header::{HeaderMap, HeaderName, HeaderValue},
7 Client, Method,
8};
9use serde_json::{json, Value};
10use std::convert::TryFrom;
11use std::str::FromStr;
12use std::time::Duration;
13
14pub struct HttpFunction {
20 client: Client,
21}
22
23impl HttpFunction {
24 pub fn new(timeout_secs: u64) -> Self {
25 let client = Client::builder()
26 .timeout(Duration::from_secs(timeout_secs))
27 .build()
28 .expect("Failed to create HTTP client");
29
30 Self { client }
31 }
32}
33
34#[async_trait]
35impl AsyncFunctionHandler for HttpFunction {
36 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
37 let url = input
39 .get("url")
40 .and_then(Value::as_str)
41 .ok_or_else(|| DataflowError::Validation("URL is required".to_string()))?;
42
43 let method_str = input.get("method").and_then(Value::as_str).unwrap_or("GET");
45
46 let method = Method::from_str(method_str)
47 .map_err(|e| DataflowError::Validation(format!("Invalid HTTP method: {}", e)))?;
48
49 let supports_body = method != Method::GET && method != Method::HEAD;
51
52 let mut request = self.client.request(method, url);
54
55 if let Some(headers) = input.get("headers").and_then(Value::as_object) {
57 let mut header_map = HeaderMap::new();
58
59 for (key, value) in headers {
60 if let Some(value_str) = value.as_str() {
61 let header_name = HeaderName::try_from(key).map_err(|e| {
62 DataflowError::Validation(format!("Invalid header name '{}': {}", key, e))
63 })?;
64
65 let header_value = HeaderValue::try_from(value_str).map_err(|e| {
66 DataflowError::Validation(format!(
67 "Invalid header value '{}': {}",
68 value_str, e
69 ))
70 })?;
71
72 header_map.insert(header_name, header_value);
73 }
74 }
75
76 request = request.headers(header_map);
77 }
78
79 if let Some(body) = input.get("body") {
81 if supports_body {
82 request = request.json(body);
83 }
84 }
85
86 let response = request.send().await.map_err(|e| {
88 if e.is_timeout() {
89 DataflowError::Timeout(format!("HTTP request timed out: {}", e))
90 } else if e.is_connect() {
91 DataflowError::Http {
92 status: 0,
93 message: format!("Connection error: {}", e),
94 }
95 } else {
96 DataflowError::Http {
97 status: e.status().map_or(0, |s| s.as_u16()),
98 message: format!("HTTP request failed: {}", e),
99 }
100 }
101 })?;
102
103 let status = response.status();
105 let status_code = status.as_u16() as usize;
106
107 let response_body = response.text().await.map_err(|e| DataflowError::Http {
109 status: status.as_u16(),
110 message: format!("Failed to read response body: {}", e),
111 })?;
112
113 let response_value =
115 serde_json::from_str::<Value>(&response_body).unwrap_or_else(|_| json!(response_body));
116
117 message.temp_data = json!({
119 "status": status_code,
120 "body": response_value,
121 "success": status.is_success(),
122 });
123
124 Ok((
126 status_code,
127 vec![Change {
128 path: "temp_data".to_string(),
129 old_value: Value::Null,
130 new_value: message.temp_data.clone(),
131 }],
132 ))
133 }
134}