dataflow_rs/engine/functions/
http.rs1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::AsyncFunctionHandler;
3use crate::engine::message::{Change, Message};
4use async_trait::async_trait;
5use reqwest::{
6 header::{HeaderMap, HeaderName, HeaderValue},
7 Client, Method,
8};
9use serde_json::{json, Value};
10use std::convert::TryFrom;
11use std::str::FromStr;
12use std::time::Duration;
13
14pub struct HttpFunction {
20 client: Client,
21}
22
23impl HttpFunction {
24 pub fn new(timeout_secs: u64) -> Self {
25 let client = Client::builder()
26 .timeout(Duration::from_secs(timeout_secs))
27 .build()
28 .expect("Failed to create HTTP client");
29
30 Self { client }
31 }
32}
33
34#[async_trait]
35impl AsyncFunctionHandler for HttpFunction {
36 async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
37 let url = input
39 .get("url")
40 .and_then(Value::as_str)
41 .ok_or_else(|| DataflowError::Validation("URL is required".to_string()))?;
42
43 let method_str = input.get("method").and_then(Value::as_str).unwrap_or("GET");
45
46 let method = Method::from_str(method_str)
47 .map_err(|e| DataflowError::Validation(format!("Invalid HTTP method: {e}")))?;
48
49 let supports_body = method != Method::GET && method != Method::HEAD;
51
52 let mut request = self.client.request(method, url);
54
55 if let Some(headers) = input.get("headers").and_then(Value::as_object) {
57 let mut header_map = HeaderMap::new();
58
59 for (key, value) in headers {
60 if let Some(value_str) = value.as_str() {
61 let header_name = HeaderName::try_from(key).map_err(|e| {
62 DataflowError::Validation(format!("Invalid header name '{key}': {e}"))
63 })?;
64
65 let header_value = HeaderValue::try_from(value_str).map_err(|e| {
66 DataflowError::Validation(format!(
67 "Invalid header value '{value_str}': {e}"
68 ))
69 })?;
70
71 header_map.insert(header_name, header_value);
72 }
73 }
74
75 request = request.headers(header_map);
76 }
77
78 if let Some(body) = input.get("body") {
80 if supports_body {
81 request = request.json(body);
82 }
83 }
84
85 let response = request.send().await.map_err(|e| {
87 if e.is_timeout() {
88 DataflowError::Timeout(format!("HTTP request timed out: {e}"))
89 } else if e.is_connect() {
90 DataflowError::Http {
91 status: 0,
92 message: format!("Connection error: {e}"),
93 }
94 } else {
95 DataflowError::Http {
96 status: e.status().map_or(0, |s| s.as_u16()),
97 message: format!("HTTP request failed: {e}"),
98 }
99 }
100 })?;
101
102 let status = response.status();
104 let status_code = status.as_u16() as usize;
105
106 let response_body = response.text().await.map_err(|e| DataflowError::Http {
108 status: status.as_u16(),
109 message: format!("Failed to read response body: {e}"),
110 })?;
111
112 let response_value =
114 serde_json::from_str::<Value>(&response_body).unwrap_or_else(|_| json!(response_body));
115
116 message.temp_data = json!({
118 "status": status_code,
119 "body": response_value,
120 "success": status.is_success(),
121 });
122
123 Ok((
125 status_code,
126 vec![Change {
127 path: "temp_data".to_string(),
128 old_value: Value::Null,
129 new_value: message.temp_data.clone(),
130 }],
131 ))
132 }
133}