use std::time::Duration;
use async_trait::async_trait;
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
use reqwest::{Client, ClientBuilder};
use serde_json::Value;
use crate::error::{Result, VectorizerError};
use crate::transport::{Protocol, Transport};
/// Number of retries granted to a request that keeps receiving HTTP 429;
/// once they are used up, the next 429 is returned as a rate-limit error.
const RETRY_AFTER_MAX_ATTEMPTS: u32 = 3;
/// Upper bound, in seconds, applied to a delay parsed from `Retry-After`.
const RETRY_AFTER_MAX_SECS: u64 = 30;
/// Fallback delay, in seconds, used when the `Retry-After` header is absent,
/// zero, or not a non-negative integer.
const RETRY_AFTER_DEFAULT_SECS: u64 = 1;
/// HTTP implementation of the crate's `Transport` trait, backed by a
/// `reqwest` client.
pub struct HttpTransport {
/// Client configured in `new` with the default headers (JSON content type
/// plus the optional credential) and the request timeout.
client: Client,
/// Prefix concatenated verbatim with each request path to form the URL.
base_url: String,
}
impl HttpTransport {
pub fn new(base_url: &str, api_key: Option<&str>, timeout_secs: u64) -> Result<Self> {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
if let Some(key) = api_key {
let (header_name, header_value) = if looks_like_jwt(key) {
("Authorization", format!("Bearer {key}"))
} else {
("X-API-Key", key.to_string())
};
headers.insert(
header_name,
HeaderValue::from_str(&header_value).map_err(|e| {
VectorizerError::configuration(format!("Invalid auth credential: {e}"))
})?,
);
}
let client = ClientBuilder::new()
.timeout(std::time::Duration::from_secs(timeout_secs))
.default_headers(headers)
.build()
.map_err(|e| {
VectorizerError::configuration(format!("Failed to create HTTP client: {e}"))
})?;
Ok(Self {
client,
base_url: base_url.to_string(),
})
}
}
/// Reports whether `token` has the outward shape of a JWT: exactly three
/// dot-separated segments, none of them empty.
///
/// Only the shape is inspected; the segments are neither decoded nor
/// validated.
fn looks_like_jwt(token: &str) -> bool {
    let mut segment_count = 0usize;
    for segment in token.split('.') {
        segment_count += 1;
        // A fourth segment or an empty one rules the token out immediately.
        if segment_count > 3 || segment.is_empty() {
            return false;
        }
    }
    segment_count == 3
}
impl HttpTransport {
    /// Sends a single request and returns the response body as text.
    ///
    /// The URL is `base_url` followed directly by `path`. `method` must be one
    /// of `"GET"`, `"POST"`, `"PUT"` or `"DELETE"`; `body`, when present, is
    /// serialised as the JSON payload.
    ///
    /// An HTTP 429 answer is retried up to `RETRY_AFTER_MAX_ATTEMPTS` times,
    /// sleeping between attempts for the delay derived from the `Retry-After`
    /// header.
    ///
    /// # Errors
    ///
    /// * configuration error for an unsupported `method`;
    /// * network error when sending fails or the success body cannot be read;
    /// * rate-limit error when the server still answers 429 after all retries;
    /// * server error for any other non-success status.
    async fn request(&self, method: &str, path: &str, body: Option<&Value>) -> Result<String> {
        let url = format!("{}{}", self.base_url, path);
        let mut attempts_remaining = RETRY_AFTER_MAX_ATTEMPTS;

        loop {
            // `send` consumes the builder, so a fresh one is assembled for
            // every attempt.
            let builder = match method {
                "GET" => self.client.get(&url),
                "POST" => self.client.post(&url),
                "PUT" => self.client.put(&url),
                "DELETE" => self.client.delete(&url),
                _ => {
                    return Err(VectorizerError::configuration(format!(
                        "Unsupported HTTP method: {method}"
                    )));
                }
            };
            let builder = match body {
                Some(payload) => builder.json(payload),
                None => builder,
            };

            let response = builder
                .send()
                .await
                .map_err(|e| VectorizerError::network(format!("HTTP request failed: {e}")))?;
            let status = response.status();

            // Everything except 429 is final: either the body or a server error.
            if status.as_u16() != 429 {
                if status.is_success() {
                    return response.text().await.map_err(|e| {
                        VectorizerError::network(format!("Failed to read response: {e}"))
                    });
                }
                let error_text = response
                    .text()
                    .await
                    .unwrap_or_else(|_| "Unknown error".to_string());
                return Err(VectorizerError::server(format!(
                    "HTTP {status}: {error_text}"
                )));
            }

            // Rate limited: work out the back-off before the body consumes
            // the response.
            let retry_after = {
                let header_text = response
                    .headers()
                    .get(reqwest::header::RETRY_AFTER)
                    .and_then(|v| v.to_str().ok());
                parse_retry_after_secs(header_text)
            };
            let body_text = response
                .text()
                .await
                .unwrap_or_else(|_| "Unknown error".to_string());

            if attempts_remaining == 0 {
                return Err(VectorizerError::rate_limit(format!(
                    "HTTP 429 after {RETRY_AFTER_MAX_ATTEMPTS} retries: {body_text}",
                )));
            }

            tracing::info!(
                "Vectorizer 429 — sleeping {retry_after:?} before retry \
                 (remaining attempts={attempts_remaining})",
            );
            attempts_remaining -= 1;
            tokio::time::sleep(retry_after).await;
        }
    }
}
/// Converts a raw `Retry-After` header value into the delay to wait before
/// retrying.
///
/// Only the integer delay-seconds form is interpreted. The result is always
/// between `RETRY_AFTER_DEFAULT_SECS` and `RETRY_AFTER_MAX_SECS`:
///
/// * missing header, zero, or a value that is not a non-negative integer
///   (including the HTTP-date form) yields the default delay;
/// * any larger value is capped at the maximum, including values too large to
///   fit in a `u64`, which previously fell back to the default delay.
#[doc(hidden)]
pub fn parse_retry_after_secs(value: Option<&str>) -> Duration {
    let Some(raw) = value.map(str::trim) else {
        return Duration::from_secs(RETRY_AFTER_DEFAULT_SECS);
    };
    let secs = match raw.parse::<u64>() {
        // A zero delay would turn the retry loop into a hot loop.
        Ok(0) => RETRY_AFTER_DEFAULT_SECS,
        Ok(secs) => secs.min(RETRY_AFTER_MAX_SECS),
        // The server asked for an enormous wait: honour it as far as the cap
        // allows rather than retrying almost immediately.
        Err(e) if matches!(e.kind(), std::num::IntErrorKind::PosOverflow) => RETRY_AFTER_MAX_SECS,
        Err(_) => RETRY_AFTER_DEFAULT_SECS,
    };
    Duration::from_secs(secs)
}
/// `Transport` implementation for HTTP: each verb delegates to the private
/// `request` helper with the matching method string.
#[async_trait]
impl Transport for HttpTransport {
/// Issues a `GET` for `path` without a request body and returns the
/// response body as text.
async fn get(&self, path: &str) -> Result<String> {
self.request("GET", path, None).await
}
/// Issues a `POST` to `path`, sending `data` as the JSON body when present.
async fn post(&self, path: &str, data: Option<&Value>) -> Result<String> {
self.request("POST", path, data).await
}
/// Issues a `PUT` to `path`, sending `data` as the JSON body when present.
async fn put(&self, path: &str, data: Option<&Value>) -> Result<String> {
self.request("PUT", path, data).await
}
/// Issues a `DELETE` for `path` without a request body.
async fn delete(&self, path: &str) -> Result<String> {
self.request("DELETE", path, None).await
}
/// Identifies this transport as the HTTP protocol variant.
fn protocol(&self) -> Protocol {
Protocol::Http
}
}
impl HttpTransport {
    /// Uploads a file as `multipart/form-data` and returns the response body
    /// as text.
    ///
    /// The file content is sent in a part named `file` carrying `filename`;
    /// every entry of `form_fields` is added as an extra text field. The URL
    /// is `base_url` followed directly by `path`. Unlike `request`, this call
    /// is sent once and HTTP 429 is not retried.
    ///
    /// # Errors
    ///
    /// * network error when sending fails or the success body cannot be read;
    /// * server error for any non-success status.
    pub async fn post_multipart(
        &self,
        path: &str,
        file_bytes: Vec<u8>,
        filename: &str,
        form_fields: std::collections::HashMap<String, String>,
    ) -> Result<String> {
        let endpoint = format!("{}{}", self.base_url, path);

        // Start from the file part, then append each caller-supplied field.
        let upload = reqwest::multipart::Part::bytes(file_bytes).file_name(filename.to_string());
        let form = form_fields.into_iter().fold(
            reqwest::multipart::Form::new().part("file", upload),
            |acc, (key, value)| acc.text(key, value),
        );

        let response = self
            .client
            .post(&endpoint)
            .multipart(form)
            .send()
            .await
            .map_err(|e| VectorizerError::network(format!("File upload failed: {e}")))?;

        let status = response.status();
        if status.is_success() {
            return response
                .text()
                .await
                .map_err(|e| VectorizerError::network(format!("Failed to read response: {e}")));
        }

        let error_text = response
            .text()
            .await
            .unwrap_or_else(|_| "Unknown error".to_string());
        Err(VectorizerError::server(format!(
            "HTTP {status}: {error_text}"
        )))
    }
}