Skip to main content

vectorizer_sdk/
http_transport.rs

1//! HTTP transport implementation using reqwest
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
7use reqwest::{Client, ClientBuilder};
8use serde_json::Value;
9
10use crate::error::{Result, VectorizerError};
11use crate::transport::{Protocol, Transport};
12
/// Fallback delay, in seconds, used when a 429's `Retry-After` value is
/// absent, zero, or not a usable number of seconds. Keeping it above zero
/// stops the client from busy-looping against a server that is shedding load.
const RETRY_AFTER_DEFAULT_SECS: u64 = 1;

/// Upper bound, in seconds, on any `Retry-After` delay the client agrees
/// to sleep for, so a misconfigured server cannot stall the caller for long.
const RETRY_AFTER_MAX_SECS: u64 = 30;

/// How many times an HTTP 429 is retried before the rate-limit error is
/// handed back to the caller (issue #263).
const RETRY_AFTER_MAX_ATTEMPTS: u32 = 3;
23
24/// HTTP transport client
25pub struct HttpTransport {
26    client: Client,
27    base_url: String,
28}
29
30impl HttpTransport {
31    /// Create a new HTTP transport.
32    ///
33    /// The `api_key` argument carries either a raw Vectorizer API key
34    /// (created via `POST /auth/keys`) or a JWT minted by `POST /auth/login`.
35    /// The transport sniffs the shape — three dot-separated base64url
36    /// segments → JWT, sent as `Authorization: Bearer <token>`; otherwise
37    /// sent as `X-API-Key: <key>`. The server's auth middleware treats
38    /// Bearer-wrapped strings as JWTs and never falls back to the API-key
39    /// validator, so sending a raw API key under `Authorization: Bearer`
40    /// silently 401s. This sniff keeps the public method signature
41    /// unchanged while routing each credential down the path the server
42    /// actually accepts.
43    pub fn new(base_url: &str, api_key: Option<&str>, timeout_secs: u64) -> Result<Self> {
44        let mut headers = HeaderMap::new();
45        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
46
47        if let Some(key) = api_key {
48            let (header_name, header_value) = if looks_like_jwt(key) {
49                ("Authorization", format!("Bearer {key}"))
50            } else {
51                ("X-API-Key", key.to_string())
52            };
53            headers.insert(
54                header_name,
55                HeaderValue::from_str(&header_value).map_err(|e| {
56                    VectorizerError::configuration(format!("Invalid auth credential: {e}"))
57                })?,
58            );
59        }
60
61        let client = ClientBuilder::new()
62            .timeout(std::time::Duration::from_secs(timeout_secs))
63            .default_headers(headers)
64            .build()
65            .map_err(|e| {
66                VectorizerError::configuration(format!("Failed to create HTTP client: {e}"))
67            })?;
68
69        Ok(Self {
70            client,
71            base_url: base_url.to_string(),
72        })
73    }
74}
75
/// Cheap JWT shape sniff used to choose the auth header.
///
/// Returns `true` only when `token` consists of exactly three `.`-separated
/// segments, each of which is non-empty and made solely of base64url
/// characters (`A-Z`, `a-z`, `0-9`, `-`, `_`). `=` padding characters are
/// also tolerated even though compact JWTs normally omit them. Raw API keys
/// generated by `POST /auth/keys` are a single 32-char alphanumeric string,
/// so they fail this check and get routed to `X-API-Key`.
fn looks_like_jwt(token: &str) -> bool {
    /// One compact-JWS segment: non-empty base64url text.
    fn is_base64url_segment(segment: &str) -> bool {
        !segment.is_empty()
            && segment
                .bytes()
                .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'='))
    }

    let mut segments = token.split('.');
    match (segments.next(), segments.next(), segments.next(), segments.next()) {
        // Exactly three segments: the fourth `next()` must be exhausted.
        (Some(header), Some(payload), Some(signature), None) => {
            is_base64url_segment(header)
                && is_base64url_segment(payload)
                && is_base64url_segment(signature)
        }
        _ => false,
    }
}
96
97impl HttpTransport {
98    /// Make a generic request. Honors `Retry-After` on HTTP 429
99    /// responses (issue #263): the client sleeps for the header's
100    /// value (capped) and retries up to [`RETRY_AFTER_MAX_ATTEMPTS`]
101    /// times before surfacing a `RateLimit` error.
102    async fn request(&self, method: &str, path: &str, body: Option<&Value>) -> Result<String> {
103        let url = format!("{}{}", self.base_url, path);
104        let mut attempts_remaining = RETRY_AFTER_MAX_ATTEMPTS;
105
106        loop {
107            let mut request = match method {
108                "GET" => self.client.get(&url),
109                "POST" => self.client.post(&url),
110                "PUT" => self.client.put(&url),
111                "DELETE" => self.client.delete(&url),
112                "PATCH" => self.client.patch(&url),
113                _ => {
114                    return Err(VectorizerError::configuration(format!(
115                        "Unsupported HTTP method: {method}"
116                    )));
117                }
118            };
119
120            if let Some(data) = body {
121                request = request.json(data);
122            }
123
124            let response = request
125                .send()
126                .await
127                .map_err(|e| VectorizerError::network(format!("HTTP request failed: {e}")))?;
128
129            if response.status().as_u16() == 429 {
130                let retry_after = parse_retry_after_secs(
131                    response
132                        .headers()
133                        .get(reqwest::header::RETRY_AFTER)
134                        .and_then(|v| v.to_str().ok()),
135                );
136                let body_text = response
137                    .text()
138                    .await
139                    .unwrap_or_else(|_| "Unknown error".to_string());
140
141                if attempts_remaining == 0 {
142                    return Err(VectorizerError::rate_limit(format!(
143                        "HTTP 429 after {RETRY_AFTER_MAX_ATTEMPTS} retries: {body_text}",
144                    )));
145                }
146
147                tracing::info!(
148                    "Vectorizer 429 — sleeping {retry_after:?} before retry \
149                     (remaining attempts={attempts_remaining})",
150                );
151                attempts_remaining -= 1;
152                tokio::time::sleep(retry_after).await;
153                continue;
154            }
155
156            if !response.status().is_success() {
157                let status = response.status();
158                let error_text = response
159                    .text()
160                    .await
161                    .unwrap_or_else(|_| "Unknown error".to_string());
162                return Err(VectorizerError::server(format!(
163                    "HTTP {status}: {error_text}"
164                )));
165            }
166
167            return response
168                .text()
169                .await
170                .map_err(|e| VectorizerError::network(format!("Failed to read response: {e}")));
171        }
172    }
173}
174
175/// Parse a `Retry-After` header value (seconds form only). Returns a
176/// sensible default when missing/unparseable; caps the value so a
177/// misconfigured server can't pin the client into a long sleep.
178///
179/// Public for test consumption only; not part of the stable SDK API.
180#[doc(hidden)]
181pub fn parse_retry_after_secs(value: Option<&str>) -> Duration {
182    let raw = match value {
183        Some(v) => v.trim(),
184        None => return Duration::from_secs(RETRY_AFTER_DEFAULT_SECS),
185    };
186    let secs = raw.parse::<u64>().unwrap_or(RETRY_AFTER_DEFAULT_SECS);
187    let secs = if secs == 0 {
188        RETRY_AFTER_DEFAULT_SECS
189    } else {
190        secs.min(RETRY_AFTER_MAX_SECS)
191    };
192    Duration::from_secs(secs)
193}
194
195#[async_trait]
196impl Transport for HttpTransport {
197    async fn get(&self, path: &str) -> Result<String> {
198        self.request("GET", path, None).await
199    }
200
201    async fn post(&self, path: &str, data: Option<&Value>) -> Result<String> {
202        self.request("POST", path, data).await
203    }
204
205    async fn put(&self, path: &str, data: Option<&Value>) -> Result<String> {
206        self.request("PUT", path, data).await
207    }
208
209    async fn delete(&self, path: &str) -> Result<String> {
210        self.request("DELETE", path, None).await
211    }
212
213    async fn patch(&self, path: &str, data: Option<&Value>) -> Result<String> {
214        self.request("PATCH", path, data).await
215    }
216
217    fn protocol(&self) -> Protocol {
218        Protocol::Http
219    }
220}
221
222impl HttpTransport {
223    /// Upload a file using multipart/form-data (not part of Transport trait)
224    pub async fn post_multipart(
225        &self,
226        path: &str,
227        file_bytes: Vec<u8>,
228        filename: &str,
229        form_fields: std::collections::HashMap<String, String>,
230    ) -> Result<String> {
231        let url = format!("{}{}", self.base_url, path);
232
233        // Create multipart form
234        let mut form = reqwest::multipart::Form::new();
235
236        // Add file
237        let file_part = reqwest::multipart::Part::bytes(file_bytes).file_name(filename.to_string());
238        form = form.part("file", file_part);
239
240        // Add other form fields
241        for (key, value) in form_fields {
242            form = form.text(key, value);
243        }
244
245        let response = self
246            .client
247            .post(&url)
248            .multipart(form)
249            .send()
250            .await
251            .map_err(|e| VectorizerError::network(format!("File upload failed: {e}")))?;
252
253        if !response.status().is_success() {
254            let status = response.status();
255            let error_text = response
256                .text()
257                .await
258                .unwrap_or_else(|_| "Unknown error".to_string());
259            return Err(VectorizerError::server(format!(
260                "HTTP {status}: {error_text}"
261            )));
262        }
263
264        response
265            .text()
266            .await
267            .map_err(|e| VectorizerError::network(format!("Failed to read response: {e}")))
268    }
269}