Skip to main content

opendev_http/
client.rs

1//! HTTP client with retry logic and cancellation support.
2
3use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue};
4use std::time::Duration;
5use tokio_util::sync::CancellationToken;
6use tracing::{debug, warn};
7use uuid::Uuid;
8
9use crate::models::{HttpError, HttpResult, RetryConfig};
10
/// Per-client timeout settings consumed by `HttpClient::new`.
#[derive(Clone, Debug)]
pub struct TimeoutConfig {
    /// Upper bound on establishing a connection (reqwest `connect_timeout`).
    pub connect: Duration,
    /// Passed to reqwest as the overall request timeout (`ClientBuilder::timeout`).
    pub read: Duration,
    /// Write timeout; `HttpClient::new` does not currently apply this value.
    pub write: Duration,
}
18
19impl Default for TimeoutConfig {
20    fn default() -> Self {
21        Self {
22            connect: Duration::from_secs(10),
23            read: Duration::from_secs(300),
24            write: Duration::from_secs(10),
25        }
26    }
27}
28
29/// Async HTTP client with retry and cancellation support.
30///
31/// Wraps reqwest with:
32/// - Exponential backoff retries on 429/503
33/// - Respect for `Retry-After` headers
34/// - Cancellation via `CancellationToken` (checked between retries and via `tokio::select!`)
35pub struct HttpClient {
36    client: reqwest::Client,
37    api_url: String,
38    retry_config: RetryConfig,
39    circuit_breaker: Option<std::sync::Arc<crate::circuit_breaker::CircuitBreaker>>,
40}
41
42impl HttpClient {
43    /// Create a new HTTP client.
44    pub fn new(
45        api_url: impl Into<String>,
46        headers: HeaderMap,
47        timeout: Option<TimeoutConfig>,
48    ) -> Result<Self, HttpError> {
49        let timeout = timeout.unwrap_or_default();
50        let client = reqwest::Client::builder()
51            .default_headers(headers)
52            .connect_timeout(timeout.connect)
53            .timeout(timeout.read)
54            .build()?;
55
56        Ok(Self {
57            client,
58            api_url: api_url.into(),
59            retry_config: RetryConfig::default(),
60            circuit_breaker: None,
61        })
62    }
63
64    /// Create a client with custom retry configuration.
65    pub fn with_retry_config(mut self, config: RetryConfig) -> Self {
66        self.retry_config = config;
67        self
68    }
69
70    /// Attach a circuit breaker to this client.
71    ///
72    /// When set, every request is gated by the circuit breaker. Successful
73    /// responses close the circuit; failures (transport-level or 5xx) open it.
74    pub fn with_circuit_breaker(
75        mut self,
76        cb: std::sync::Arc<crate::circuit_breaker::CircuitBreaker>,
77    ) -> Self {
78        self.circuit_breaker = Some(cb);
79        self
80    }
81
82    /// POST JSON with retry logic and optional cancellation.
83    ///
84    /// On 429/503 responses, retries with exponential backoff. Respects
85    /// `Retry-After` headers. Checks the cancellation token between attempts
86    /// and races it against each request via `tokio::select!`.
87    ///
88    /// When a circuit breaker is attached, requests are rejected immediately
89    /// if the circuit is open.
90    pub async fn post_json(
91        &self,
92        payload: &serde_json::Value,
93        cancel: Option<&CancellationToken>,
94    ) -> Result<HttpResult, HttpError> {
95        // Check circuit breaker before attempting any request.
96        if let Some(cb) = &self.circuit_breaker {
97            cb.check()?;
98        }
99
100        let mut last_result: Option<HttpResult> = None;
101
102        for attempt in 0..=self.retry_config.max_retries {
103            // Check cancellation before each attempt
104            if let Some(token) = cancel
105                && token.is_cancelled()
106            {
107                return Ok(HttpResult::interrupted());
108            }
109
110            let result = self.execute_request(payload, cancel).await;
111
112            match result {
113                Ok(hr) if hr.success => {
114                    // Check if status is retryable (429/503 with a body)
115                    if let Some(status) = hr.status
116                        && self.retry_config.is_retryable_status(status)
117                    {
118                        let delay = self.get_retry_delay(
119                            hr.retry_after.as_deref(),
120                            hr.retry_after_ms.as_deref(),
121                            attempt,
122                        );
123                        last_result = Some(hr);
124                        if attempt < self.retry_config.max_retries {
125                            warn!(
126                                status,
127                                attempt = attempt + 1,
128                                max = self.retry_config.max_retries,
129                                "Retryable HTTP status, backing off for {:.1}s",
130                                delay.as_secs_f64()
131                            );
132                            self.interruptible_sleep(delay, cancel).await?;
133                            continue;
134                        }
135                        warn!(
136                            status,
137                            "Exhausted {} retries", self.retry_config.max_retries
138                        );
139                        self.cb_record_failure();
140                        return Ok(last_result.unwrap_or_else(|| {
141                            HttpResult::fail("Unexpected retry exhaustion", false)
142                        }));
143                    }
144                    self.cb_record_success();
145                    return Ok(hr);
146                }
147                Ok(hr) if hr.retryable => {
148                    let retry_after = hr.retry_after.clone();
149                    let retry_after_ms = hr.retry_after_ms.clone();
150                    last_result = Some(hr);
151                    if attempt < self.retry_config.max_retries {
152                        let delay = self.get_retry_delay(
153                            retry_after.as_deref(),
154                            retry_after_ms.as_deref(),
155                            attempt,
156                        );
157                        warn!(
158                            error = last_result.as_ref().and_then(|r| r.error.as_deref()),
159                            attempt = attempt + 1,
160                            max = self.retry_config.max_retries,
161                            "Retryable error, backing off for {:.1}s",
162                            delay.as_secs_f64()
163                        );
164                        self.interruptible_sleep(delay, cancel).await?;
165                        continue;
166                    }
167                    warn!("Exhausted {} retries", self.retry_config.max_retries);
168                    self.cb_record_failure();
169                    return Ok(last_result.unwrap_or_else(|| {
170                        HttpResult::fail("Unexpected retry exhaustion", false)
171                    }));
172                }
173                Ok(hr) => {
174                    if hr.success {
175                        self.cb_record_success();
176                    } else {
177                        self.cb_record_failure();
178                    }
179                    return Ok(hr);
180                }
181                Err(e) => {
182                    self.cb_record_failure();
183                    return Err(e);
184                }
185            }
186        }
187
188        self.cb_record_failure();
189        Ok(last_result.unwrap_or_else(|| HttpResult::fail("Unexpected retry exhaustion", false)))
190    }
191
192    /// Record a success on the circuit breaker (if attached).
193    fn cb_record_success(&self) {
194        if let Some(cb) = &self.circuit_breaker {
195            cb.record_success();
196        }
197    }
198
199    /// Record a failure on the circuit breaker (if attached).
200    fn cb_record_failure(&self) {
201        if let Some(cb) = &self.circuit_breaker {
202            cb.record_failure();
203        }
204    }
205
206    /// Execute a single POST request, racing against cancellation.
207    ///
208    /// Each request is tagged with a unique `X-Request-Id` header and
209    /// logged via a tracing span for end-to-end observability.
210    async fn execute_request(
211        &self,
212        payload: &serde_json::Value,
213        cancel: Option<&CancellationToken>,
214    ) -> Result<HttpResult, HttpError> {
215        let request_id = Uuid::new_v4().to_string();
216        debug!(request_id = %request_id, api_url = %self.api_url, "Sending LLM request");
217
218        let request = self
219            .client
220            .post(&self.api_url)
221            .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
222            .header(
223                HeaderName::from_static("x-request-id"),
224                HeaderValue::from_str(&request_id)
225                    .unwrap_or_else(|_| HeaderValue::from_static("unknown")),
226            )
227            .json(payload)
228            .send();
229
230        let response = match cancel {
231            Some(token) => {
232                tokio::select! {
233                    resp = request => resp,
234                    _ = token.cancelled() => {
235                        return Ok(HttpResult::interrupted()
236                            .with_request_id(request_id));
237                    }
238                }
239            }
240            None => request.await,
241        };
242
243        match response {
244            Ok(resp) => {
245                let status = resp.status().as_u16();
246                debug!(request_id = %request_id, status, "LLM response received");
247                if self.retry_config.is_retryable_status(status) {
248                    // Extract Retry-After and retry-after-ms headers
249                    let retry_after = resp
250                        .headers()
251                        .get("retry-after")
252                        .and_then(|v| v.to_str().ok())
253                        .map(String::from);
254                    let retry_after_ms = resp
255                        .headers()
256                        .get("retry-after-ms")
257                        .and_then(|v| v.to_str().ok())
258                        .map(String::from);
259                    let body = resp.json::<serde_json::Value>().await.ok();
260                    let mut result = HttpResult::retryable_status(status, body, retry_after)
261                        .with_request_id(request_id);
262                    result.retry_after_ms = retry_after_ms;
263                    return Ok(result);
264                }
265                let body = resp.json::<serde_json::Value>().await?;
266                if status >= 400 {
267                    let error_msg = body
268                        .get("error")
269                        .and_then(|e| e.get("message"))
270                        .and_then(|m| m.as_str())
271                        .map(|s| s.to_string())
272                        .unwrap_or_else(|| format!("HTTP {status}"));
273                    warn!(request_id = %request_id, status, error = %error_msg, "LLM request failed");
274                    return Ok(HttpResult {
275                        success: false,
276                        status: Some(status),
277                        body: Some(body),
278                        error: Some(format!("[request_id={}] {}", request_id, error_msg)),
279                        interrupted: false,
280                        retryable: false,
281                        request_id: Some(request_id),
282                        retry_after: None,
283                        retry_after_ms: None,
284                    });
285                }
286                Ok(HttpResult::ok(status, body).with_request_id(request_id))
287            }
288            Err(e) if is_retryable_error(&e) => {
289                warn!(request_id = %request_id, error = %e, "LLM request retryable error");
290                Ok(
291                    HttpResult::fail(format!("[request_id={}] {}", request_id, e), true)
292                        .with_request_id(request_id),
293                )
294            }
295            Err(e) => {
296                warn!(request_id = %request_id, error = %e, "LLM request error");
297                Ok(
298                    HttpResult::fail(format!("[request_id={}] {}", request_id, e), false)
299                        .with_request_id(request_id),
300                )
301            }
302        }
303    }
304
305    /// Determine retry delay from Retry-After/retry-after-ms headers or default backoff.
306    fn get_retry_delay(
307        &self,
308        retry_after: Option<&str>,
309        retry_after_ms: Option<&str>,
310        attempt: u32,
311    ) -> Duration {
312        if let Some(parsed) = crate::models::parse_retry_after(retry_after, retry_after_ms) {
313            // Cap server-requested delay at max_delay_ms
314            let max = Duration::from_millis(self.retry_config.max_delay_ms);
315            return parsed.min(max);
316        }
317        self.retry_config.delay_for_attempt(attempt)
318    }
319
320    /// Sleep that can be interrupted by cancellation.
321    async fn interruptible_sleep(
322        &self,
323        duration: Duration,
324        cancel: Option<&CancellationToken>,
325    ) -> Result<(), HttpError> {
326        match cancel {
327            Some(token) => {
328                tokio::select! {
329                    _ = tokio::time::sleep(duration) => Ok(()),
330                    _ = token.cancelled() => Err(HttpError::Interrupted),
331                }
332            }
333            None => {
334                tokio::time::sleep(duration).await;
335                Ok(())
336            }
337        }
338    }
339
340    /// Send a POST request and return the raw response for streaming.
341    ///
342    /// Unlike `post_json`, this does NOT read the response body. The caller
343    /// is responsible for consuming the response (e.g., reading SSE lines).
344    ///
345    /// Retries on transport errors and retryable HTTP status codes (429/503)
346    /// before any response body has been consumed. Once a successful response
347    /// is returned to the caller, no further retries are attempted.
348    pub async fn send_streaming_request(
349        &self,
350        url: &str,
351        payload: &serde_json::Value,
352        cancel: Option<&CancellationToken>,
353    ) -> Result<reqwest::Response, HttpError> {
354        // Check circuit breaker
355        if let Some(cb) = &self.circuit_breaker {
356            cb.check()?;
357        }
358
359        let mut last_error: Option<HttpError> = None;
360
361        for attempt in 0..=self.retry_config.max_retries {
362            // Check cancellation before each attempt
363            if let Some(token) = cancel
364                && token.is_cancelled()
365            {
366                return Err(HttpError::Interrupted);
367            }
368
369            let request_id = Uuid::new_v4().to_string();
370            debug!(request_id = %request_id, api_url = %url, attempt, "Sending streaming LLM request");
371
372            let request = self
373                .client
374                .post(url)
375                .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
376                .header(
377                    HeaderName::from_static("x-request-id"),
378                    HeaderValue::from_str(&request_id)
379                        .unwrap_or_else(|_| HeaderValue::from_static("unknown")),
380                )
381                .json(payload)
382                .send();
383
384            let response = match cancel {
385                Some(token) => {
386                    tokio::select! {
387                        resp = request => resp,
388                        _ = token.cancelled() => {
389                            return Err(HttpError::Interrupted);
390                        }
391                    }
392                }
393                None => request.await,
394            };
395
396            match response {
397                Ok(resp) => {
398                    let status = resp.status().as_u16();
399
400                    if self.retry_config.is_retryable_status(status) {
401                        // Extract Retry-After headers before consuming the body
402                        let retry_after = resp
403                            .headers()
404                            .get("retry-after")
405                            .and_then(|v| v.to_str().ok())
406                            .map(String::from);
407                        let retry_after_ms = resp
408                            .headers()
409                            .get("retry-after-ms")
410                            .and_then(|v| v.to_str().ok())
411                            .map(String::from);
412                        let body = resp.text().await.unwrap_or_default();
413                        let error_msg = serde_json::from_str::<serde_json::Value>(&body)
414                            .ok()
415                            .and_then(|v| {
416                                v.get("error")
417                                    .and_then(|e| e.get("message"))
418                                    .and_then(|m| m.as_str())
419                                    .map(String::from)
420                            })
421                            .unwrap_or_else(|| format!("HTTP {status}"));
422
423                        last_error = Some(HttpError::Other(format!(
424                            "[request_id={request_id}] {error_msg}"
425                        )));
426
427                        if attempt < self.retry_config.max_retries {
428                            let delay = self.get_retry_delay(
429                                retry_after.as_deref(),
430                                retry_after_ms.as_deref(),
431                                attempt,
432                            );
433                            warn!(
434                                request_id = %request_id,
435                                status,
436                                attempt = attempt + 1,
437                                max = self.retry_config.max_retries,
438                                "Streaming request retryable status {status}, backing off for {:.1}s",
439                                delay.as_secs_f64()
440                            );
441                            self.interruptible_sleep(delay, cancel).await?;
442                            continue;
443                        }
444                        warn!(
445                            request_id = %request_id,
446                            status,
447                            "Streaming request exhausted {} retries",
448                            self.retry_config.max_retries
449                        );
450                    } else if status >= 400 {
451                        // Non-retryable error — fail immediately
452                        let body = resp.text().await.unwrap_or_default();
453                        let error_msg = serde_json::from_str::<serde_json::Value>(&body)
454                            .ok()
455                            .and_then(|v| {
456                                v.get("error")
457                                    .and_then(|e| e.get("message"))
458                                    .and_then(|m| m.as_str())
459                                    .map(String::from)
460                            })
461                            .unwrap_or_else(|| format!("HTTP {status}"));
462                        warn!(request_id = %request_id, status, error = %error_msg, "Streaming request failed");
463                        self.cb_record_failure();
464                        return Err(HttpError::Other(format!(
465                            "[request_id={request_id}] {error_msg}"
466                        )));
467                    } else {
468                        self.cb_record_success();
469                        return Ok(resp);
470                    }
471                }
472                Err(e) if is_retryable_error(&e) => {
473                    warn!(error = %e, attempt = attempt + 1, max = self.retry_config.max_retries, "Streaming request transport error");
474                    last_error = Some(HttpError::Request(e));
475                    if attempt < self.retry_config.max_retries {
476                        let delay = self.get_retry_delay(None, None, attempt);
477                        warn!(
478                            "Streaming request backing off for {:.1}s",
479                            delay.as_secs_f64()
480                        );
481                        self.interruptible_sleep(delay, cancel).await?;
482                        continue;
483                    }
484                }
485                Err(e) => {
486                    // Non-retryable transport error — fail immediately
487                    self.cb_record_failure();
488                    return Err(e.into());
489                }
490            }
491        }
492
493        self.cb_record_failure();
494        Err(last_error.unwrap_or_else(|| HttpError::Other("Streaming retries exhausted".into())))
495    }
496
497    /// Get the configured API URL.
498    pub fn api_url(&self) -> &str {
499        &self.api_url
500    }
501}
502
503impl std::fmt::Debug for HttpClient {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        let mut s = f.debug_struct("HttpClient");
506        s.field("api_url", &self.api_url)
507            .field("retry_config", &self.retry_config);
508        if let Some(cb) = &self.circuit_breaker {
509            s.field("circuit_breaker", cb);
510        }
511        s.finish()
512    }
513}
514
515/// Check if a reqwest error is transient and worth retrying.
516fn is_retryable_error(err: &reqwest::Error) -> bool {
517    err.is_connect() || err.is_timeout() || err.is_request()
518}
519
// Unit tests live in the sibling file `client_tests.rs` and are compiled only
// for `cfg(test)` builds.
#[cfg(test)]
#[path = "client_tests.rs"]
mod tests;