Skip to main content

serverless_fn/transport/http/
mod.rs

1//! HTTP transport implementation.
2
3use std::collections::HashMap;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use reqwest::Url;
8
9use crate::error::ServerlessError;
10use crate::transport::Transport;
11
12/// HTTP transport implementation.
13pub struct HttpTransport {
14    base_url: String,
15    client: reqwest::Client,
16}
17
18impl HttpTransport {
19    /// Creates a new HTTP transport with the specified base URL.
20    #[must_use]
21    pub fn new(base_url: String, timeout: Duration, retries: u32) -> Self {
22        let retry_policy = {
23            let url = Url::parse(&base_url).unwrap();
24            let host = url.host().unwrap();
25            reqwest::retry::for_host(format!("{host}"))
26                .max_retries_per_request(retries)
27                .classify_fn(|req_rep| {
28                    if req_rep.error().is_some()
29                        || matches!(
30                            req_rep.status(),
31                            Some(http::StatusCode::SERVICE_UNAVAILABLE)
32                        )
33                    {
34                        req_rep.retryable()
35                    } else {
36                        req_rep.success()
37                    }
38                })
39        };
40        let client = reqwest::Client::builder()
41            .timeout(timeout)
42            .retry(retry_policy)
43            .build()
44            .unwrap_or_else(|_| reqwest::Client::new());
45
46        Self { base_url, client }
47    }
48}
49
50#[async_trait]
51impl Transport for HttpTransport {
52    async fn call(
53        &self,
54        function_name: &str,
55        payload: Vec<u8>,
56        headers: Option<HashMap<String, String>>,
57    ) -> Result<Vec<u8>, ServerlessError> {
58        let url = format!("{}/{}", self.base_url, function_name);
59
60        let mut request_builder = self.client.post(&url).body(payload.clone());
61
62        if let Some(hdrs) = headers.clone() {
63            for (key, value) in hdrs {
64                request_builder = request_builder.header(&key, value);
65            }
66        }
67
68        match request_builder.send().await {
69            Ok(response) => {
70                let status = response.status();
71                if status.is_success() {
72                    let bytes = response
73                        .bytes()
74                        .await
75                        .map_err(|e| ServerlessError::Transport(e.to_string()))?;
76                    return Ok(bytes.to_vec());
77                }
78
79                let error_body = response
80                    .text()
81                    .await
82                    .unwrap_or_else(|_| format!("HTTP error: {}", status));
83                Err(ServerlessError::RemoteExecution(error_body))
84            }
85            Err(e) => Err(ServerlessError::RemoteExecution(format!("{e}"))),
86        }
87    }
88}
89
90/// Creates HTTP transport with default configuration.
91#[must_use]
92pub fn create_http_transport(timeout: Duration, retries: u32) -> Box<dyn Transport> {
93    let base_url = std::env::var("SERVERLESS_BASE_URL")
94        .unwrap_or_else(|_| "http://localhost:3000".to_string());
95    Box::new(HttpTransport::new(base_url, timeout, retries))
96}
97
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for the default transport factory.
    #[test]
    fn test_http_transport_creation() {
        let timeout = Duration::from_secs(30);
        let retries = 3;
        // Nothing is asserted: the test passes as long as construction does not panic.
        let _ = create_http_transport(timeout, retries);
    }
}
107}