1use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
4use std::time::Duration;
5use tokio_util::sync::CancellationToken;
6use tracing::{debug, warn};
7use uuid::Uuid;
8
9use crate::models::{HttpError, HttpResult, RetryConfig};
10
/// Per-phase timeouts consumed by `HttpClient::new`.
///
/// Defaults: 10s connect, 300s read, 10s write.
#[derive(Debug, Clone)]
pub struct TimeoutConfig {
    /// Maximum time allowed to establish a connection.
    pub connect: Duration,
    /// Passed by `HttpClient::new` to reqwest as the overall request timeout.
    pub read: Duration,
    /// Write timeout. NOTE(review): `HttpClient::new` does not currently apply it.
    pub write: Duration,
}

impl Default for TimeoutConfig {
    fn default() -> Self {
        const TEN_SECONDS: Duration = Duration::from_secs(10);
        Self {
            connect: TEN_SECONDS,
            read: Duration::from_secs(5 * 60),
            write: TEN_SECONDS,
        }
    }
}
28
/// JSON-over-HTTP client with retry/backoff and an optional circuit breaker.
///
/// Construct with [`HttpClient::new`] and configure with the `with_*` builder methods.
pub struct HttpClient {
    /// Underlying reqwest client, built with the default headers and timeouts.
    client: reqwest::Client,
    /// Endpoint targeted by `post_json`.
    api_url: String,
    /// Retry/backoff policy shared by `post_json` and `send_streaming_request`.
    retry_config: RetryConfig,
    /// Optional breaker, checked before each call and updated with the call's outcome.
    circuit_breaker: Option<std::sync::Arc<crate::circuit_breaker::CircuitBreaker>>,
}
41
42impl HttpClient {
43 pub fn new(
45 api_url: impl Into<String>,
46 headers: HeaderMap,
47 timeout: Option<TimeoutConfig>,
48 ) -> Result<Self, HttpError> {
49 let timeout = timeout.unwrap_or_default();
50 let client = reqwest::Client::builder()
51 .default_headers(headers)
52 .connect_timeout(timeout.connect)
53 .timeout(timeout.read)
54 .build()?;
55
56 Ok(Self {
57 client,
58 api_url: api_url.into(),
59 retry_config: RetryConfig::default(),
60 circuit_breaker: None,
61 })
62 }
63
64 pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
66 self.retry_config = config;
67 self
68 }
69
70 pub fn with_circuit_breaker(
75 mut self,
76 cb: std::sync::Arc<crate::circuit_breaker::CircuitBreaker>,
77 ) -> Self {
78 self.circuit_breaker = Some(cb);
79 self
80 }
81
82 pub async fn post_json(
91 &self,
92 payload: &serde_json::Value,
93 cancel: Option<&CancellationToken>,
94 ) -> Result<HttpResult, HttpError> {
95 if let Some(cb) = &self.circuit_breaker {
97 cb.check()?;
98 }
99
100 let mut last_result: Option<HttpResult> = None;
101
102 for attempt in 0..=self.retry_config.max_retries {
103 if let Some(token) = cancel
105 && token.is_cancelled()
106 {
107 return Ok(HttpResult::interrupted());
108 }
109
110 let result = self.execute_request(payload, cancel).await;
111
112 match result {
113 Ok(hr) if hr.success => {
114 if let Some(status) = hr.status
116 && self.retry_config.is_retryable_status(status)
117 {
118 let delay = self.get_retry_delay(
119 hr.retry_after.as_deref(),
120 hr.retry_after_ms.as_deref(),
121 attempt,
122 );
123 last_result = Some(hr);
124 if attempt < self.retry_config.max_retries {
125 warn!(
126 status,
127 attempt = attempt + 1,
128 max = self.retry_config.max_retries,
129 "Retryable HTTP status, backing off for {:.1}s",
130 delay.as_secs_f64()
131 );
132 self.interruptible_sleep(delay, cancel).await?;
133 continue;
134 }
135 warn!(
136 status,
137 "Exhausted {} retries", self.retry_config.max_retries
138 );
139 self.cb_record_failure();
140 return Ok(last_result.unwrap_or_else(|| {
141 HttpResult::fail("Unexpected retry exhaustion", false)
142 }));
143 }
144 self.cb_record_success();
145 return Ok(hr);
146 }
147 Ok(hr) if hr.retryable => {
148 let retry_after = hr.retry_after.clone();
149 let retry_after_ms = hr.retry_after_ms.clone();
150 last_result = Some(hr);
151 if attempt < self.retry_config.max_retries {
152 let delay = self.get_retry_delay(
153 retry_after.as_deref(),
154 retry_after_ms.as_deref(),
155 attempt,
156 );
157 warn!(
158 error = last_result.as_ref().and_then(|r| r.error.as_deref()),
159 attempt = attempt + 1,
160 max = self.retry_config.max_retries,
161 "Retryable error, backing off for {:.1}s",
162 delay.as_secs_f64()
163 );
164 self.interruptible_sleep(delay, cancel).await?;
165 continue;
166 }
167 warn!("Exhausted {} retries", self.retry_config.max_retries);
168 self.cb_record_failure();
169 return Ok(last_result.unwrap_or_else(|| {
170 HttpResult::fail("Unexpected retry exhaustion", false)
171 }));
172 }
173 Ok(hr) => {
174 if hr.success {
175 self.cb_record_success();
176 } else {
177 self.cb_record_failure();
178 }
179 return Ok(hr);
180 }
181 Err(e) => {
182 self.cb_record_failure();
183 return Err(e);
184 }
185 }
186 }
187
188 self.cb_record_failure();
189 Ok(last_result.unwrap_or_else(|| HttpResult::fail("Unexpected retry exhaustion", false)))
190 }
191
192 fn cb_record_success(&self) {
194 if let Some(cb) = &self.circuit_breaker {
195 cb.record_success();
196 }
197 }
198
199 fn cb_record_failure(&self) {
201 if let Some(cb) = &self.circuit_breaker {
202 cb.record_failure();
203 }
204 }
205
206 async fn execute_request(
211 &self,
212 payload: &serde_json::Value,
213 cancel: Option<&CancellationToken>,
214 ) -> Result<HttpResult, HttpError> {
215 let request_id = Uuid::new_v4().to_string();
216 debug!(request_id = %request_id, api_url = %self.api_url, "Sending LLM request");
217
218 let request = self
219 .client
220 .post(&self.api_url)
221 .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
222 .header(
223 HeaderName::from_static("x-request-id"),
224 HeaderValue::from_str(&request_id)
225 .unwrap_or_else(|_| HeaderValue::from_static("unknown")),
226 )
227 .json(payload)
228 .send();
229
230 let response = match cancel {
231 Some(token) => {
232 tokio::select! {
233 resp = request => resp,
234 _ = token.cancelled() => {
235 return Ok(HttpResult::interrupted()
236 .with_request_id(request_id));
237 }
238 }
239 }
240 None => request.await,
241 };
242
243 match response {
244 Ok(resp) => {
245 let status = resp.status().as_u16();
246 debug!(request_id = %request_id, status, "LLM response received");
247 if self.retry_config.is_retryable_status(status) {
248 let retry_after = resp
250 .headers()
251 .get("retry-after")
252 .and_then(|v| v.to_str().ok())
253 .map(String::from);
254 let retry_after_ms = resp
255 .headers()
256 .get("retry-after-ms")
257 .and_then(|v| v.to_str().ok())
258 .map(String::from);
259 let body = resp.json::<serde_json::Value>().await.ok();
260 let mut result = HttpResult::retryable_status(status, body, retry_after)
261 .with_request_id(request_id);
262 result.retry_after_ms = retry_after_ms;
263 return Ok(result);
264 }
265 let body = resp.json::<serde_json::Value>().await?;
266 if status >= 400 {
267 let error_msg = body
268 .get("error")
269 .and_then(|e| e.get("message"))
270 .and_then(|m| m.as_str())
271 .map(|s| s.to_string())
272 .unwrap_or_else(|| format!("HTTP {status}"));
273 warn!(request_id = %request_id, status, error = %error_msg, "LLM request failed");
274 return Ok(HttpResult {
275 success: false,
276 status: Some(status),
277 body: Some(body),
278 error: Some(format!("[request_id={}] {}", request_id, error_msg)),
279 interrupted: false,
280 retryable: false,
281 request_id: Some(request_id),
282 retry_after: None,
283 retry_after_ms: None,
284 });
285 }
286 Ok(HttpResult::ok(status, body).with_request_id(request_id))
287 }
288 Err(e) if is_retryable_error(&e) => {
289 warn!(request_id = %request_id, error = %e, "LLM request retryable error");
290 Ok(
291 HttpResult::fail(format!("[request_id={}] {}", request_id, e), true)
292 .with_request_id(request_id),
293 )
294 }
295 Err(e) => {
296 warn!(request_id = %request_id, error = %e, "LLM request error");
297 Ok(
298 HttpResult::fail(format!("[request_id={}] {}", request_id, e), false)
299 .with_request_id(request_id),
300 )
301 }
302 }
303 }
304
305 fn get_retry_delay(
307 &self,
308 retry_after: Option<&str>,
309 retry_after_ms: Option<&str>,
310 attempt: u32,
311 ) -> Duration {
312 if let Some(parsed) = crate::models::parse_retry_after(retry_after, retry_after_ms) {
313 let max = Duration::from_millis(self.retry_config.max_delay_ms);
315 return parsed.min(max);
316 }
317 self.retry_config.delay_for_attempt(attempt)
318 }
319
320 async fn interruptible_sleep(
322 &self,
323 duration: Duration,
324 cancel: Option<&CancellationToken>,
325 ) -> Result<(), HttpError> {
326 match cancel {
327 Some(token) => {
328 tokio::select! {
329 _ = tokio::time::sleep(duration) => Ok(()),
330 _ = token.cancelled() => Err(HttpError::Interrupted),
331 }
332 }
333 None => {
334 tokio::time::sleep(duration).await;
335 Ok(())
336 }
337 }
338 }
339
340 pub async fn send_streaming_request(
349 &self,
350 url: &str,
351 payload: &serde_json::Value,
352 cancel: Option<&CancellationToken>,
353 ) -> Result<reqwest::Response, HttpError> {
354 if let Some(cb) = &self.circuit_breaker {
356 cb.check()?;
357 }
358
359 let mut last_error: Option<HttpError> = None;
360
361 for attempt in 0..=self.retry_config.max_retries {
362 if let Some(token) = cancel
364 && token.is_cancelled()
365 {
366 return Err(HttpError::Interrupted);
367 }
368
369 let request_id = Uuid::new_v4().to_string();
370 debug!(request_id = %request_id, api_url = %url, attempt, "Sending streaming LLM request");
371
372 let request = self
373 .client
374 .post(url)
375 .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
376 .header(
377 HeaderName::from_static("x-request-id"),
378 HeaderValue::from_str(&request_id)
379 .unwrap_or_else(|_| HeaderValue::from_static("unknown")),
380 )
381 .json(payload)
382 .send();
383
384 let response = match cancel {
385 Some(token) => {
386 tokio::select! {
387 resp = request => resp,
388 _ = token.cancelled() => {
389 return Err(HttpError::Interrupted);
390 }
391 }
392 }
393 None => request.await,
394 };
395
396 match response {
397 Ok(resp) => {
398 let status = resp.status().as_u16();
399
400 if self.retry_config.is_retryable_status(status) {
401 let retry_after = resp
403 .headers()
404 .get("retry-after")
405 .and_then(|v| v.to_str().ok())
406 .map(String::from);
407 let retry_after_ms = resp
408 .headers()
409 .get("retry-after-ms")
410 .and_then(|v| v.to_str().ok())
411 .map(String::from);
412 let body = resp.text().await.unwrap_or_default();
413 let error_msg = serde_json::from_str::<serde_json::Value>(&body)
414 .ok()
415 .and_then(|v| {
416 v.get("error")
417 .and_then(|e| e.get("message"))
418 .and_then(|m| m.as_str())
419 .map(String::from)
420 })
421 .unwrap_or_else(|| format!("HTTP {status}"));
422
423 last_error = Some(HttpError::Other(format!(
424 "[request_id={request_id}] {error_msg}"
425 )));
426
427 if attempt < self.retry_config.max_retries {
428 let delay = self.get_retry_delay(
429 retry_after.as_deref(),
430 retry_after_ms.as_deref(),
431 attempt,
432 );
433 warn!(
434 request_id = %request_id,
435 status,
436 attempt = attempt + 1,
437 max = self.retry_config.max_retries,
438 "Streaming request retryable status {status}, backing off for {:.1}s",
439 delay.as_secs_f64()
440 );
441 self.interruptible_sleep(delay, cancel).await?;
442 continue;
443 }
444 warn!(
445 request_id = %request_id,
446 status,
447 "Streaming request exhausted {} retries",
448 self.retry_config.max_retries
449 );
450 } else if status >= 400 {
451 let body = resp.text().await.unwrap_or_default();
453 let error_msg = serde_json::from_str::<serde_json::Value>(&body)
454 .ok()
455 .and_then(|v| {
456 v.get("error")
457 .and_then(|e| e.get("message"))
458 .and_then(|m| m.as_str())
459 .map(String::from)
460 })
461 .unwrap_or_else(|| format!("HTTP {status}"));
462 warn!(request_id = %request_id, status, error = %error_msg, "Streaming request failed");
463 self.cb_record_failure();
464 return Err(HttpError::Other(format!(
465 "[request_id={request_id}] {error_msg}"
466 )));
467 } else {
468 self.cb_record_success();
469 return Ok(resp);
470 }
471 }
472 Err(e) if is_retryable_error(&e) => {
473 warn!(error = %e, attempt = attempt + 1, max = self.retry_config.max_retries, "Streaming request transport error");
474 last_error = Some(HttpError::Request(e));
475 if attempt < self.retry_config.max_retries {
476 let delay = self.get_retry_delay(None, None, attempt);
477 warn!(
478 "Streaming request backing off for {:.1}s",
479 delay.as_secs_f64()
480 );
481 self.interruptible_sleep(delay, cancel).await?;
482 continue;
483 }
484 }
485 Err(e) => {
486 self.cb_record_failure();
488 return Err(e.into());
489 }
490 }
491 }
492
493 self.cb_record_failure();
494 Err(last_error.unwrap_or_else(|| HttpError::Other("Streaming retries exhausted".into())))
495 }
496
497 pub fn api_url(&self) -> &str {
499 &self.api_url
500 }
501}
502
503impl std::fmt::Debug for HttpClient {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 let mut s = f.debug_struct("HttpClient");
506 s.field("api_url", &self.api_url)
507 .field("retry_config", &self.retry_config);
508 if let Some(cb) = &self.circuit_breaker {
509 s.field("circuit_breaker", cb);
510 }
511 s.finish()
512 }
513}
514
515fn is_retryable_error(err: &reqwest::Error) -> bool {
517 err.is_connect() || err.is_timeout() || err.is_request()
518}
519
520#[cfg(test)]
521#[path = "client_tests.rs"]
522mod tests;