Skip to main content

edgebase_core/
http_client.rs

1//! EdgeBase Rust SDK — Internal HTTP client (#133: set_context / X-EdgeBase-Context removed)
2
3use crate::Error;
4use reqwest::{Client, Method};
5use serde_json::Value;
6use std::time::Duration;
7
8pub struct HttpClient {
9    client: Client,
10    base_url: String,
11    service_key: String,
12    #[cfg_attr(not(test), allow(dead_code))]
13    timeout_ms: Option<u64>,
14}
15
/// Parses a raw timeout setting into a number of milliseconds.
///
/// Surrounding whitespace is ignored. Returns `None` when `raw` is `None`,
/// when the trimmed text is not a valid `u64`, or when the parsed value is `0`.
fn parse_timeout_ms(raw: Option<&str>) -> Option<u64> {
    let text = raw?;
    let millis: u64 = text.trim().parse().ok()?;
    match millis {
        // Zero is treated the same as "not configured".
        0 => None,
        positive => Some(positive),
    }
}
20
21impl HttpClient {
22    pub fn new(base_url: &str, service_key: &str) -> Result<Self, Error> {
23        let url = base_url.trim_end_matches('/').to_string();
24        let timeout_ms = parse_timeout_ms(std::env::var("EDGEBASE_HTTP_TIMEOUT_MS").ok().as_deref());
25        let mut builder = Client::builder();
26        if let Some(timeout_ms) = timeout_ms {
27            builder = builder.timeout(Duration::from_millis(timeout_ms));
28        }
29        Ok(Self {
30            client: builder.build()?,
31            base_url: url,
32            service_key: service_key.to_string(),
33            timeout_ms,
34        })
35    }
36
37    pub fn base_url(&self) -> &str {
38        &self.base_url
39    }
40
41    #[cfg_attr(not(test), allow(dead_code))]
42    pub(crate) fn timeout_ms(&self) -> Option<u64> {
43        self.timeout_ms
44    }
45
46    #[cfg_attr(not(test), allow(dead_code))]
47    pub(crate) fn parse_timeout_ms_for_tests(raw: Option<&str>) -> Option<u64> {
48        parse_timeout_ms(raw)
49    }
50
51    fn build_request(&self, method: Method, path: &str) -> reqwest::RequestBuilder {
52        let url = format!("{}{}", self.base_url, path);
53        let mut req = self.client.request(method, &url);
54        // Token refresh failed — proceed as unauthenticated
55        if let Ok(key) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| self.service_key.clone())) {
56            if !key.is_empty() {
57                req = req.header("X-EdgeBase-Service-Key", &key);
58                req = req.header("Authorization", format!("Bearer {}", key));
59            }
60        }
61        req
62    }
63
64    async fn send(&self, req: reqwest::RequestBuilder) -> Result<Value, Error> {
65        let resp = req.send().await?;
66        let status = resp.status();
67        let text = resp.text().await?;
68        if !status.is_success() {
69            let msg = serde_json::from_str::<Value>(&text)
70                .ok()
71                .and_then(|v| {
72                    v.get("error")
73                        .or_else(|| v.get("message"))
74                        .and_then(|m| m.as_str())
75                        .map(|s| s.to_string())
76                })
77                .unwrap_or_else(|| text.clone());
78            return Err(Error::Api {
79                status: status.as_u16(),
80                message: msg,
81            });
82        }
83        if text.is_empty() {
84            return Ok(Value::Null);
85        }
86        Ok(serde_json::from_str(&text)?)
87    }
88
89    /// Send request with 429 retry and transport retry. Rebuilds request on each attempt.
90    async fn send_with_retry(
91        &self,
92        method: Method,
93        path: &str,
94        body: Option<&Value>,
95        query: Option<&std::collections::HashMap<String, String>>,
96    ) -> Result<Value, Error> {
97        let max_retries: usize = 3;
98        for attempt in 0..=max_retries {
99            let mut req = self.build_request(method.clone(), path);
100            if let Some(b) = body {
101                req = req.json(b);
102            }
103            if let Some(q) = query {
104                req = req.query(q);
105            }
106
107            match req.send().await {
108                Ok(resp) => {
109                    let status = resp.status();
110                    // 429 retry with Retry-After header
111                    if status.as_u16() == 429 && attempt < max_retries {
112                        let retry_after = resp
113                            .headers()
114                            .get("retry-after")
115                            .and_then(|v| v.to_str().ok())
116                            .and_then(|v| v.parse::<u64>().ok());
117                        let base_ms = retry_after.map(|s| s * 1000).unwrap_or(1000 * (1u64 << attempt));
118                        // Jitter: 0–25% of base delay (use simple pseudo-random from time nanos)
119                        let nanos = std::time::SystemTime::now()
120                            .duration_since(std::time::UNIX_EPOCH)
121                            .map(|d| d.subsec_nanos())
122                            .unwrap_or(0);
123                        let jitter = (base_ms as f64 * 0.25 * (nanos % 1_000_000) as f64 / 1_000_000.0) as u64;
124                        let delay = std::cmp::min(base_ms + jitter, 10000);
125                        tokio::time::sleep(Duration::from_millis(delay)).await;
126                        continue;
127                    }
128                    let text = resp.text().await?;
129                    if !status.is_success() {
130                        let msg = serde_json::from_str::<Value>(&text)
131                            .ok()
132                            .and_then(|v| {
133                                v.get("error")
134                                    .or_else(|| v.get("message"))
135                                    .and_then(|m| m.as_str())
136                                    .map(|s| s.to_string())
137                            })
138                            .unwrap_or_else(|| text.clone());
139                        return Err(Error::Api {
140                            status: status.as_u16(),
141                            message: msg,
142                        });
143                    }
144                    if text.is_empty() {
145                        return Ok(Value::Null);
146                    }
147                    return Ok(serde_json::from_str(&text)?);
148                }
149                Err(e) => {
150                    if attempt < 2 {
151                        tokio::time::sleep(Duration::from_millis(50 * (attempt as u64 + 1))).await;
152                        continue;
153                    }
154                    return Err(e.into());
155                }
156            }
157        }
158        Err(Error::Api {
159            status: 0,
160            message: "Request failed after retries".to_string(),
161        })
162    }
163
164    pub async fn get(&self, path: &str) -> Result<Value, Error> {
165        self.send_with_retry(Method::GET, path, None, None).await
166    }
167
168    pub async fn get_with_query(&self, path: &str, query: &std::collections::HashMap<String, String>) -> Result<Value, Error> {
169        self.send_with_retry(Method::GET, path, None, Some(query)).await
170    }
171
172    pub async fn post(&self, path: &str, body: &Value) -> Result<Value, Error> {
173        self.send_with_retry(Method::POST, path, Some(body), None).await
174    }
175
176    pub async fn post_with_query(&self, path: &str, body: &Value, query: &std::collections::HashMap<String, String>) -> Result<Value, Error> {
177        self.send_with_retry(Method::POST, path, Some(body), Some(query)).await
178    }
179
180    pub async fn patch(&self, path: &str, body: &Value) -> Result<Value, Error> {
181        self.send_with_retry(Method::PATCH, path, Some(body), None).await
182    }
183
184    pub async fn delete(&self, path: &str) -> Result<Value, Error> {
185        self.send_with_retry(Method::DELETE, path, None, None).await
186    }
187
188    pub async fn delete_with_body(&self, path: &str, body: &Value) -> Result<Value, Error> {
189        self.send_with_retry(Method::DELETE, path, Some(body), None).await
190    }
191
192    /// HEAD request — returns `true` if the resource exists (2xx status).
193    pub async fn head(&self, path: &str) -> Result<bool, Error> {
194        let req = self.build_request(Method::HEAD, path);
195        let resp = req.send().await?;
196        Ok(resp.status().is_success())
197    }
198
199    pub async fn put(&self, path: &str, body: &Value) -> Result<Value, Error> {
200        self.send_with_retry(Method::PUT, path, Some(body), None).await
201    }
202
203    pub async fn put_with_query(&self, path: &str, body: &Value, query: &std::collections::HashMap<String, String>) -> Result<Value, Error> {
204        self.send_with_retry(Method::PUT, path, Some(body), Some(query)).await
205    }
206
207    /// Multipart file upload.
208    pub async fn upload_multipart(
209        &self, path: &str, key: &str, data: Vec<u8>, content_type: &str,
210    ) -> Result<Value, Error> {
211        use reqwest::multipart::{Form, Part};
212        let part = Part::bytes(data)
213            .file_name(key.to_string())
214            .mime_str(content_type)
215            .map_err(|e| Error::Url(e.to_string()))?;
216        let form = Form::new()
217            .part("file", part)
218            .text("key", key.to_string());
219        let url = format!("{}{}", self.base_url, path);
220        let mut req = self.client.post(&url).multipart(form);
221        // Token refresh failed — proceed as unauthenticated
222        if let Ok(key) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| self.service_key.clone())) {
223            if !key.is_empty() {
224                req = req.header("X-EdgeBase-Service-Key", &key);
225                req = req.header("Authorization", format!("Bearer {}", key));
226            }
227        }
228        self.send(req).await
229    }
230
231    /// POST raw bytes (for multipart upload-part).
232    pub async fn post_bytes(&self, path: &str, data: Vec<u8>, content_type: &str) -> Result<Value, Error> {
233        let req = self.build_request(Method::POST, path)
234            .header("Content-Type", content_type)
235            .body(data);
236        self.send(req).await
237    }
238
239    /// Download raw bytes.
240    pub async fn download_raw(&self, path: &str) -> Result<Vec<u8>, Error> {
241        let req = self.build_request(Method::GET, path);
242        let resp = req.send().await?;
243        let status = resp.status();
244        if !status.is_success() {
245            let msg = resp.text().await.unwrap_or_default();
246            return Err(Error::Api { status: status.as_u16(), message: msg });
247        }
248        Ok(resp.bytes().await.map(|b| b.to_vec())?)
249    }
250}