use std::time::Instant;
use reqwest::header::HeaderMap;
use super::error::{categorise_http, BackendError};
use super::observability;
use super::retry::{parse_retry_after, BackendRetryPolicy};
/// Sends `body` as an HTTP POST to `url`, retrying failed attempts according to `policy`.
///
/// At most `policy.max_retries() + 1` attempts are made. An attempt is retried, while
/// retries remain, when either:
/// - the request fails to send (delay taken from `policy.delay_for_transport`), or
/// - the response status satisfies [`BackendRetryPolicy::is_retryable_status`] (delay taken
///   from `policy.delay_for_response`, given whatever [`parse_retry_after`] extracts from
///   the response headers).
///
/// # Arguments
/// - `http`: client used to issue the request.
/// - `policy`: supplies the retry budget and the delay before each retry.
/// - `url`: the address the request is actually sent to.
/// - `display_url`: if present, reported to observability hooks instead of `url`.
/// - `headers`, `body`: request headers and payload; cloned for every attempt.
/// - `provider`, `model`: copied into any [`BackendError`] this function constructs and
///   forwarded to [`categorise_http`].
/// - `api_key_env`: forwarded unchanged to [`categorise_http`].
///
/// # Returns
/// On an HTTP 200 response, the response body together with the zero-based index of the
/// attempt that succeeded (i.e. the number of retries that were needed).
///
/// # Errors
/// - [`BackendError::Generic`] with `status: None` when sending fails on the final attempt.
/// - [`BackendError::Generic`] carrying the response status when the response body cannot
///   be read; this case is not retried.
/// - The error produced by [`categorise_http`] for any non-200 response that is not
///   retryable or arrives after the retry budget is spent.
///
/// Every error returned from inside the retry loop is reported through
/// `observability::on_error` before it is returned.
pub(crate) async fn call_with_retry(
    http: &reqwest::Client,
    policy: &BackendRetryPolicy,
    url: &str,
    display_url: Option<&str>,
    headers: HeaderMap,
    body: Vec<u8>,
    provider: &str,
    model: &str,
    api_key_env: Option<&str>,
) -> Result<(Vec<u8>, u32), BackendError> {
    let max_retries = policy.max_retries();
    let mut last_status: Option<u16> = None;
    let log_url = display_url.unwrap_or(url);

    for attempt in 0..=max_retries {
        let retries_remain = attempt < max_retries;
        let send_start = Instant::now();
        observability::on_http_send(log_url, body.len());

        // Clone headers and body so the originals stay available for later attempts.
        let send_result = http
            .post(url)
            .headers(headers.clone())
            .body(body.clone())
            .send()
            .await;

        let response = match send_result {
            Ok(response) => response,
            Err(e) if retries_remain => {
                let reason = if e.is_timeout() {
                    "timeout"
                } else if e.is_connect() {
                    "connect"
                } else {
                    "transport"
                };
                let delay = policy.delay_for_transport(attempt);
                observability::on_retry_scheduled(attempt, delay.as_millis() as u64, reason);
                tokio::time::sleep(delay).await;
                continue;
            }
            Err(e) => {
                let err = BackendError::Generic {
                    provider: provider.into(),
                    model: model.into(),
                    status: None,
                    message: format!("transport failure after {} attempts: {e}", attempt + 1),
                };
                observability::on_error(err.category(), None, &err.to_string());
                return Err(err);
            }
        };

        let status = response.status().as_u16();
        // `bytes()` consumes the response, so keep a copy of the headers first.
        let response_headers = response.headers().clone();

        let response_bytes = match response.bytes().await {
            Ok(bytes) => bytes,
            Err(e) => {
                let err = BackendError::Generic {
                    provider: provider.into(),
                    model: model.into(),
                    status: Some(status),
                    message: format!("failed to read response bytes: {e}"),
                };
                // Report this failure like every other terminal error path so that a
                // broken body read is not silently missing from telemetry.
                observability::on_error(err.category(), Some(status), &err.to_string());
                return Err(err);
            }
        };

        // The reported latency covers both sending the request and reading the body.
        observability::on_http_recv(
            status,
            response_bytes.len(),
            send_start.elapsed().as_millis() as u64,
        );
        last_status = Some(status);

        // Only an exact 200 counts as success; any other status (including other 2xx
        // codes) falls through to the retry / error handling below.
        if status == 200 {
            return Ok((response_bytes.to_vec(), attempt));
        }

        if BackendRetryPolicy::is_retryable_status(status) && retries_remain {
            let retry_after = parse_retry_after(&response_headers);
            let delay = policy.delay_for_response(attempt, retry_after);
            observability::on_retry_scheduled(
                attempt,
                delay.as_millis() as u64,
                &status.to_string(),
            );
            tokio::time::sleep(delay).await;
            continue;
        }

        // Non-retryable status, or the retry budget is spent: classify and report.
        let body_str = String::from_utf8_lossy(&response_bytes).into_owned();
        let err = categorise_http(
            provider,
            model,
            status,
            &response_headers,
            &body_str,
            api_key_env,
        );
        observability::on_error(err.category(), Some(status), &err.to_string());
        return Err(err);
    }

    // Defensive fallback: the final loop iteration always returns, so this is not
    // expected to be reached. It exists so the function has a value on every path.
    Err(BackendError::Generic {
        provider: provider.into(),
        model: model.into(),
        status: last_status,
        message: format!("retry budget exhausted ({max_retries} retries)"),
    })
}
// Test-only module (compiled only under `cfg(test)`); it currently contains no tests.
#[cfg(test)]
mod tests {
}