dataflow_rs/engine/functions/
http.rs

1use crate::engine::error::{DataflowError, Result};
2use crate::engine::functions::AsyncFunctionHandler;
3use crate::engine::message::{Change, Message};
4use async_trait::async_trait;
5use reqwest::{
6    header::{HeaderMap, HeaderName, HeaderValue},
7    Client, Method,
8};
9use serde_json::{json, Value};
10use std::convert::TryFrom;
11use std::str::FromStr;
12use std::time::Duration;
13
14/// An HTTP task function for making API requests asynchronously.
15///
16/// This implementation uses the async reqwest client for efficient non-blocking HTTP requests.
17/// It supports different HTTP methods, headers, and parsing responses
18/// into the message payload.
19pub struct HttpFunction {
20    client: Client,
21}
22
23impl HttpFunction {
24    pub fn new(timeout_secs: u64) -> Self {
25        let client = Client::builder()
26            .timeout(Duration::from_secs(timeout_secs))
27            .build()
28            .expect("Failed to create HTTP client");
29
30        Self { client }
31    }
32}
33
34#[async_trait]
35impl AsyncFunctionHandler for HttpFunction {
36    async fn execute(&self, message: &mut Message, input: &Value) -> Result<(usize, Vec<Change>)> {
37        // Extract URL
38        let url = input
39            .get("url")
40            .and_then(Value::as_str)
41            .ok_or_else(|| DataflowError::Validation("URL is required".to_string()))?;
42
43        // Extract method (default to GET)
44        let method_str = input.get("method").and_then(Value::as_str).unwrap_or("GET");
45
46        let method = Method::from_str(method_str)
47            .map_err(|e| DataflowError::Validation(format!("Invalid HTTP method: {}", e)))?;
48
49        // Determine whether this method supports a body
50        let supports_body = method != Method::GET && method != Method::HEAD;
51
52        // Build the request
53        let mut request = self.client.request(method, url);
54
55        // Add headers if present
56        if let Some(headers) = input.get("headers").and_then(Value::as_object) {
57            let mut header_map = HeaderMap::new();
58
59            for (key, value) in headers {
60                if let Some(value_str) = value.as_str() {
61                    let header_name = HeaderName::try_from(key).map_err(|e| {
62                        DataflowError::Validation(format!("Invalid header name '{}': {}", key, e))
63                    })?;
64
65                    let header_value = HeaderValue::try_from(value_str).map_err(|e| {
66                        DataflowError::Validation(format!(
67                            "Invalid header value '{}': {}",
68                            value_str, e
69                        ))
70                    })?;
71
72                    header_map.insert(header_name, header_value);
73                }
74            }
75
76            request = request.headers(header_map);
77        }
78
79        // Add body if present for methods that support it
80        if let Some(body) = input.get("body") {
81            if supports_body {
82                request = request.json(body);
83            }
84        }
85
86        // Make the request asynchronously
87        let response = request.send().await.map_err(|e| {
88            if e.is_timeout() {
89                DataflowError::Timeout(format!("HTTP request timed out: {}", e))
90            } else if e.is_connect() {
91                DataflowError::Http {
92                    status: 0,
93                    message: format!("Connection error: {}", e),
94                }
95            } else {
96                DataflowError::Http {
97                    status: e.status().map_or(0, |s| s.as_u16()),
98                    message: format!("HTTP request failed: {}", e),
99                }
100            }
101        })?;
102
103        // Get status code
104        let status = response.status();
105        let status_code = status.as_u16() as usize;
106
107        // Parse the response asynchronously
108        let response_body = response.text().await.map_err(|e| DataflowError::Http {
109            status: status.as_u16(),
110            message: format!("Failed to read response body: {}", e),
111        })?;
112
113        // Try to parse as JSON, but fall back to string if it's not valid JSON
114        let response_value =
115            serde_json::from_str::<Value>(&response_body).unwrap_or_else(|_| json!(response_body));
116
117        // Store response in message temp data
118        message.temp_data = json!({
119            "status": status_code,
120            "body": response_value,
121            "success": status.is_success(),
122        });
123
124        // Return changes
125        Ok((
126            status_code,
127            vec![Change {
128                path: "temp_data".to_string(),
129                old_value: Value::Null,
130                new_value: message.temp_data.clone(),
131            }],
132        ))
133    }
134}