Skip to main content

agent_air_runtime/client/http/
mod.rs

1use std::pin::Pin;
2use std::time::Duration;
3
4use async_stream::stream;
5use futures::Stream;
6use http_body_util::{BodyExt, Full};
7use hyper::body::Bytes;
8use hyper::{Method, Request, StatusCode};
9use hyper_rustls::HttpsConnectorBuilder;
10use hyper_util::client::legacy::Client;
11use hyper_util::rt::TokioExecutor;
12
13use crate::client::error::LlmError;
14
15type HttpsClient = Client<
16    hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
17    Full<Bytes>,
18>;
19
/// Maximum number of retries after a rate-limit (429) or overloaded (529)
/// response; a request is therefore attempted at most `MAX_RETRIES + 1` times.
const MAX_RETRIES: u32 = 5;

/// Base delay for exponential backoff, in milliseconds (the un-jittered delay
/// before the first retry).
const BASE_DELAY_MS: u64 = 1_000;

/// Cap applied to the exponential backoff delay before jitter is added, in
/// milliseconds.
const MAX_DELAY_MS: u64 = 60_000;
28
29/// HTTP client with TLS support and automatic retry logic.
30#[derive(Clone)]
31pub struct HttpClient {
32    client: HttpsClient,
33}
34
35/// Calculate exponential backoff delay with jitter.
36/// Also checks for Retry-After header in response body (Anthropic includes it in error message).
37fn calculate_backoff_delay(attempt: u32, response_text: &str) -> Duration {
38    // Try to extract retry-after from Anthropic's error response
39    // Format: "... Please retry after X seconds."
40    if let Some(seconds) = extract_retry_after(response_text) {
41        return Duration::from_secs(seconds);
42    }
43
44    // Exponential backoff: base * 2^attempt + jitter
45    let exponential_delay = BASE_DELAY_MS * (1 << attempt);
46    let capped_delay = exponential_delay.min(MAX_DELAY_MS);
47
48    // Add random jitter (0-25% of delay)
49    let jitter = (capped_delay as f64 * 0.25 * rand_factor()) as u64;
50    Duration::from_millis(capped_delay + jitter)
51}
52
/// Extract a retry delay in whole seconds from an error response body.
///
/// Matching is case-insensitive. Two markers are tried in order, and the
/// first one that is followed by a parseable number wins:
///
/// 1. `retry after X ...` (prose, e.g. "Please retry after 30 seconds.")
/// 2. `"retry_after": X` (JSON field)
///
/// Only the first occurrence of each marker is examined. Returns `None` when
/// neither marker is followed by a non-negative integer that fits in `u64`.
fn extract_retry_after(response_text: &str) -> Option<u64> {
    const MARKERS: [&str; 2] = ["retry after ", "\"retry_after\":"];

    let lower = response_text.to_lowercase();
    MARKERS.iter().find_map(|&marker| {
        let value_start = lower.find(marker)? + marker.len();
        leading_seconds(&lower[value_start..])
    })
}

/// Parse the run of ASCII digits at the start of `text` (after optional
/// leading whitespace). The number may be followed by anything, including
/// punctuation or the end of the input.
fn leading_seconds(text: &str) -> Option<u64> {
    let text = text.trim_start();
    let digits_end = text
        .find(|c: char| !c.is_ascii_digit())
        .unwrap_or(text.len());
    // An empty digit run or a value too large for u64 fails to parse.
    text[..digits_end].parse().ok()
}
84
/// Simple pseudo-random factor in `[0.0, 1.0)`, in steps of 0.001.
///
/// Not cryptographically secure; it only needs to spread retry delays.
/// The clock's sub-second nanoseconds are mixed through a randomly keyed
/// std hasher instead of being used directly: when the system clock is
/// coarser than one nanosecond, `nanos % 1000` on its own collapses to a
/// handful of values (possibly always 0), which would defeat the jitter.
fn rand_factor() -> f64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::SystemTime;

    // A clock set before the Unix epoch yields an error; fall back to 0 and
    // let the hasher's random keys provide the variation.
    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0);

    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u32(nanos);
    (hasher.finish() % 1000) as f64 / 1000.0
}
95
96impl HttpClient {
97    /// Create a new HTTP client with native TLS roots.
98    pub fn new() -> Result<Self, LlmError> {
99        let https = HttpsConnectorBuilder::new()
100            .with_native_roots()
101            .map_err(|e| {
102                LlmError::new(
103                    "TLS_INIT_FAILED",
104                    format!("failed to load native TLS roots: {}", e),
105                )
106            })?
107            .https_or_http()
108            .enable_http1()
109            .build();
110
111        let client = Client::builder(TokioExecutor::new()).build(https);
112        Ok(Self { client })
113    }
114
115    /// Send a GET request and return the response body as a string.
116    pub async fn get(&self, uri: &str) -> Result<String, LlmError> {
117        let uri: hyper::Uri = uri
118            .parse()
119            .map_err(|e| LlmError::new("HTTP_INVALID_URI", format!("{}", e)))?;
120
121        let request = Request::builder()
122            .method(Method::GET)
123            .uri(uri)
124            .body(Full::new(Bytes::new()))
125            .map_err(|e| LlmError::new("HTTP_REQUEST_BUILD", format!("{}", e)))?;
126
127        let res = self
128            .client
129            .request(request)
130            .await
131            .map_err(|e| LlmError::new("HTTP_REQUEST_FAILED", format!("{}", e)))?;
132
133        let body = res
134            .collect()
135            .await
136            .map_err(|e| LlmError::new("HTTP_BODY_READ", format!("{}", e)))?
137            .to_bytes();
138
139        String::from_utf8(body.to_vec())
140            .map_err(|e| LlmError::new("HTTP_INVALID_UTF8", format!("{}", e)))
141    }
142
143    /// Send a POST request with automatic retry on rate limits.
144    ///
145    /// Retries up to 3 times on 429 or 529 status codes with exponential backoff.
146    pub async fn post(
147        &self,
148        uri: &str,
149        headers: &[(&str, &str)],
150        body: &str,
151    ) -> Result<String, LlmError> {
152        let parsed_uri: hyper::Uri = uri
153            .parse()
154            .map_err(|e| LlmError::new("HTTP_INVALID_URI", format!("{}", e)))?;
155
156        let mut last_error = None;
157
158        for attempt in 0..=MAX_RETRIES {
159            let mut builder = Request::builder()
160                .method(Method::POST)
161                .uri(parsed_uri.clone());
162
163            for (key, value) in headers {
164                builder = builder.header(*key, *value);
165            }
166
167            let request = builder
168                .body(Full::new(Bytes::from(body.to_string())))
169                .map_err(|e| LlmError::new("HTTP_REQUEST_BUILD", format!("{}", e)))?;
170
171            let res = self
172                .client
173                .request(request)
174                .await
175                .map_err(|e| LlmError::new("HTTP_REQUEST_FAILED", format!("{}", e)))?;
176
177            let status = res.status();
178
179            let response_body = res
180                .collect()
181                .await
182                .map_err(|e| LlmError::new("HTTP_BODY_READ", format!("{}", e)))?
183                .to_bytes();
184
185            let response_text = String::from_utf8(response_body.to_vec())
186                .map_err(|e| LlmError::new("HTTP_INVALID_UTF8", format!("{}", e)))?;
187
188            // Check for rate limit (429) or overloaded (529)
189            if (status == StatusCode::TOO_MANY_REQUESTS || status.as_u16() == 529)
190                && attempt < MAX_RETRIES
191            {
192                let delay = calculate_backoff_delay(attempt, &response_text);
193                tracing::warn!(
194                    status = %status,
195                    attempt = attempt + 1,
196                    max_retries = MAX_RETRIES,
197                    delay_ms = delay.as_millis(),
198                    "Rate limited, retrying after delay"
199                );
200                tokio::time::sleep(delay).await;
201                last_error = Some(LlmError::new(
202                    format!("HTTP_{}", status.as_u16()),
203                    response_text,
204                ));
205                continue;
206            }
207
208            // Return the response body (caller parses for API errors)
209            return Ok(response_text);
210        }
211
212        // All retries exhausted
213        Err(last_error.unwrap_or_else(|| {
214            LlmError::new("RATE_LIMIT_EXHAUSTED", "Rate limit retries exhausted")
215        }))
216    }
217
218    /// POST request that returns a stream of bytes for SSE handling.
219    pub async fn post_stream(
220        &self,
221        uri: &str,
222        headers: &[(&str, &str)],
223        body: &str,
224    ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, LlmError>> + Send>>, LlmError> {
225        let parsed_uri: hyper::Uri = uri
226            .parse()
227            .map_err(|e| LlmError::new("HTTP_INVALID_URI", format!("{}", e)))?;
228
229        let mut last_error = None;
230
231        for attempt in 0..=MAX_RETRIES {
232            let mut builder = Request::builder()
233                .method(Method::POST)
234                .uri(parsed_uri.clone());
235
236            for (key, value) in headers {
237                builder = builder.header(*key, *value);
238            }
239
240            let request = builder
241                .body(Full::new(Bytes::from(body.to_string())))
242                .map_err(|e| LlmError::new("HTTP_REQUEST_BUILD", format!("{}", e)))?;
243
244            let res = self
245                .client
246                .request(request)
247                .await
248                .map_err(|e| LlmError::new("HTTP_REQUEST_FAILED", format!("{}", e)))?;
249
250            let status = res.status();
251
252            // Check for rate limit (429) or overloaded (529)
253            if status == StatusCode::TOO_MANY_REQUESTS || status.as_u16() == 529 {
254                let error_body = res
255                    .collect()
256                    .await
257                    .map_err(|e| LlmError::new("HTTP_BODY_READ", format!("{}", e)))?
258                    .to_bytes();
259                let error_text = String::from_utf8_lossy(&error_body).to_string();
260
261                if attempt < MAX_RETRIES {
262                    let delay = calculate_backoff_delay(attempt, &error_text);
263                    tracing::warn!(
264                        status = %status,
265                        attempt = attempt + 1,
266                        max_retries = MAX_RETRIES,
267                        delay_ms = delay.as_millis(),
268                        "Rate limited on stream request, retrying after delay"
269                    );
270                    tokio::time::sleep(delay).await;
271                    last_error = Some(LlmError::new(
272                        format!("HTTP_{}", status.as_u16()),
273                        error_text,
274                    ));
275                    continue;
276                }
277
278                // Max retries exceeded
279                return Err(LlmError::new(
280                    format!("HTTP_{}", status.as_u16()),
281                    error_text,
282                ));
283            }
284
285            // Check for other error status codes (no retry)
286            if !status.is_success() {
287                let error_body = res
288                    .collect()
289                    .await
290                    .map_err(|e| LlmError::new("HTTP_BODY_READ", format!("{}", e)))?
291                    .to_bytes();
292                let error_text = String::from_utf8_lossy(&error_body);
293                return Err(LlmError::new(
294                    format!("HTTP_{}", status.as_u16()),
295                    error_text.to_string(),
296                ));
297            }
298
299            // Success - return the stream
300            let response_body = res.into_body();
301            let byte_stream = stream! {
302                use http_body_util::BodyExt;
303                let mut body = response_body;
304                while let Some(frame_result) = body.frame().await {
305                    match frame_result {
306                        Ok(frame) => {
307                            if let Some(data) = frame.data_ref() {
308                                yield Ok(data.clone());
309                            }
310                        }
311                        Err(e) => {
312                            yield Err(LlmError::new("HTTP_STREAM_ERROR", format!("{}", e)));
313                            break;
314                        }
315                    }
316                }
317            };
318
319            return Ok(Box::pin(byte_stream)
320                as Pin<Box<dyn Stream<Item = Result<Bytes, LlmError>> + Send>>);
321        }
322
323        // All retries exhausted
324        Err(last_error.unwrap_or_else(|| {
325            LlmError::new("RATE_LIMIT_EXHAUSTED", "Rate limit retries exhausted")
326        }))
327    }
328}