// dataflow_rs/engine/functions/http.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::AsyncFunctionHandler;
3use crate::engine::message::{Change, Message};
4use async_trait::async_trait;
5use reqwest::{
6    header::{HeaderMap, HeaderName, HeaderValue},
7    Client, Method,
8};
9use serde_json::{json, Value};
10use std::convert::TryFrom;
11use std::str::FromStr;
12use std::time::Duration;
13
14/// An HTTP task function for making API requests asynchronously.
15///
16/// This implementation uses the async reqwest client for efficient non-blocking HTTP requests.
17/// It supports different HTTP methods, headers, and parsing responses
18/// into the message payload.
19pub struct HttpFunction {
20    client: Client,
21}
22
23impl HttpFunction {
24    pub fn new(timeout_secs: u64) -> Self {
25        let client = Client::builder()
26            .timeout(Duration::from_secs(timeout_secs))
27            .build()
28            .expect("Failed to create HTTP client");
29
30        Self { client }
31    }
32}
33
34#[async_trait]
35impl AsyncFunctionHandler for HttpFunction {
36    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
37        // Extract URL
38        let url = input
39            .get("url")
40            .and_then(Value::as_str)
41            .ok_or_else(|| DataflowError::Validation("URL is required".to_string()))?;
42
43        // Extract method (default to GET)
44        let method_str = input.get("method").and_then(Value::as_str).unwrap_or("GET");
45
46        let method = Method::from_str(method_str)
47            .map_err(|e| DataflowError::Validation(format!("Invalid HTTP method: {e}")))?;
48
49        // Determine whether this method supports a body
50        let supports_body = method != Method::GET && method != Method::HEAD;
51
52        // Build the request
53        let mut request = self.client.request(method, url);
54
55        // Add headers if present
56        if let Some(headers) = input.get("headers").and_then(Value::as_object) {
57            let mut header_map = HeaderMap::new();
58
59            for (key, value) in headers {
60                if let Some(value_str) = value.as_str() {
61                    let header_name = HeaderName::try_from(key).map_err(|e| {
62                        DataflowError::Validation(format!("Invalid header name '{key}': {e}"))
63                    })?;
64
65                    let header_value = HeaderValue::try_from(value_str).map_err(|e| {
66                        DataflowError::Validation(format!(
67                            "Invalid header value '{value_str}': {e}"
68                        ))
69                    })?;
70
71                    header_map.insert(header_name, header_value);
72                }
73            }
74
75            request = request.headers(header_map);
76        }
77
78        // Add body if present for methods that support it
79        if let Some(body) = input.get("body") {
80            if supports_body {
81                request = request.json(body);
82            }
83        }
84
85        // Make the request asynchronously
86        let response = request.send().await.map_err(|e| {
87            if e.is_timeout() {
88                DataflowError::Timeout(format!("HTTP request timed out: {e}"))
89            } else if e.is_connect() {
90                DataflowError::Http {
91                    status: 0,
92                    message: format!("Connection error: {e}"),
93                }
94            } else {
95                DataflowError::Http {
96                    status: e.status().map_or(0, |s| s.as_u16()),
97                    message: format!("HTTP request failed: {e}"),
98                }
99            }
100        })?;
101
102        // Get status code
103        let status = response.status();
104        let status_code = status.as_u16() as usize;
105
106        // Parse the response asynchronously
107        let response_body = response.text().await.map_err(|e| DataflowError::Http {
108            status: status.as_u16(),
109            message: format!("Failed to read response body: {e}"),
110        })?;
111
112        // Try to parse as JSON, but fall back to string if it's not valid JSON
113        let response_value =
114            serde_json::from_str::<Value>(&response_body).unwrap_or_else(|_| json!(response_body));
115
116        // Store response in message temp data
117        message.temp_data = json!({
118            "status": status_code,
119            "body": response_value,
120            "success": status.is_success(),
121        });
122
123        // Return changes
124        Ok((
125            status_code,
126            vec![Change {
127                path: "temp_data".to_string(),
128                old_value: Value::Null,
129                new_value: message.temp_data.clone(),
130            }],
131        ))
132    }
133}