serverless_fn/transport/http/
mod.rs1use std::collections::HashMap;
4use std::time::Duration;
5
6use async_trait::async_trait;
7use reqwest::Url;
8
9use crate::error::ServerlessError;
10use crate::transport::Transport;
11
12pub struct HttpTransport {
14 base_url: String,
15 client: reqwest::Client,
16}
17
18impl HttpTransport {
19 #[must_use]
21 pub fn new(base_url: String, timeout: Duration, retries: u32) -> Self {
22 let retry_policy = {
23 let url = Url::parse(&base_url).unwrap();
24 let host = url.host().unwrap();
25 reqwest::retry::for_host(format!("{host}"))
26 .max_retries_per_request(retries)
27 .classify_fn(|req_rep| {
28 if req_rep.error().is_some()
29 || matches!(
30 req_rep.status(),
31 Some(http::StatusCode::SERVICE_UNAVAILABLE)
32 )
33 {
34 req_rep.retryable()
35 } else {
36 req_rep.success()
37 }
38 })
39 };
40 let client = reqwest::Client::builder()
41 .timeout(timeout)
42 .retry(retry_policy)
43 .build()
44 .unwrap_or_else(|_| reqwest::Client::new());
45
46 Self { base_url, client }
47 }
48}
49
50#[async_trait]
51impl Transport for HttpTransport {
52 async fn call(
53 &self,
54 function_name: &str,
55 payload: Vec<u8>,
56 headers: Option<HashMap<String, String>>,
57 ) -> Result<Vec<u8>, ServerlessError> {
58 let url = format!("{}/{}", self.base_url, function_name);
59
60 let mut request_builder = self.client.post(&url).body(payload.clone());
61
62 if let Some(hdrs) = headers.clone() {
63 for (key, value) in hdrs {
64 request_builder = request_builder.header(&key, value);
65 }
66 }
67
68 match request_builder.send().await {
69 Ok(response) => {
70 let status = response.status();
71 if status.is_success() {
72 let bytes = response
73 .bytes()
74 .await
75 .map_err(|e| ServerlessError::Transport(e.to_string()))?;
76 return Ok(bytes.to_vec());
77 }
78
79 let error_body = response
80 .text()
81 .await
82 .unwrap_or_else(|_| format!("HTTP error: {}", status));
83 Err(ServerlessError::RemoteExecution(error_body))
84 }
85 Err(e) => Err(ServerlessError::RemoteExecution(format!("{e}"))),
86 }
87 }
88}
89
90#[must_use]
92pub fn create_http_transport(timeout: Duration, retries: u32) -> Box<dyn Transport> {
93 let base_url = std::env::var("SERVERLESS_BASE_URL")
94 .unwrap_or_else(|_| "http://localhost:3000".to_string());
95 Box::new(HttpTransport::new(base_url, timeout, retries))
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
101
102 #[test]
103 fn test_http_transport_creation() {
104 let _transport = create_http_transport(Duration::from_secs(30), 3);
105 }
107}