// agent_air_runtime/client/http/mod.rs

use std::pin::Pin;
use std::time::Duration;

use async_stream::stream;
use futures::Stream;
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::{Method, Request, StatusCode};
use hyper_rustls::HttpsConnectorBuilder;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;

use crate::client::error::LlmError;

15type HttpsClient = Client<
16 hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>,
17 Full<Bytes>,
18>;
19
/// How many times a rate-limited (429/529) request is retried; the request is therefore
/// sent at most `MAX_RETRIES + 1` times.
const MAX_RETRIES: u32 = 5;

/// Backoff before the first retry, in milliseconds; it doubles with every further attempt.
const BASE_DELAY_MS: u64 = 1_000;

/// Ceiling, in milliseconds, applied to the exponential backoff before jitter is added.
const MAX_DELAY_MS: u64 = 60_000;

29#[derive(Clone)]
31pub struct HttpClient {
32 client: HttpsClient,
33}
34
35fn calculate_backoff_delay(attempt: u32, response_text: &str) -> Duration {
38 if let Some(seconds) = extract_retry_after(response_text) {
41 return Duration::from_secs(seconds);
42 }
43
44 let exponential_delay = BASE_DELAY_MS * (1 << attempt);
46 let capped_delay = exponential_delay.min(MAX_DELAY_MS);
47
48 let jitter = (capped_delay as f64 * 0.25 * rand_factor()) as u64;
50 Duration::from_millis(capped_delay + jitter)
51}
52
/// Looks for a server-provided retry delay, in whole seconds, in a rate-limit response body.
///
/// Two markers are searched for, case-insensitively and in this order:
///
/// * prose such as `"Please retry after 30 seconds"` (`retry after <n>`), and
/// * a JSON-style field such as `{"retry_after": 30}`.
///
/// For a marker that is present, the delay is the run of ASCII digits following it (leading
/// whitespace allowed). Whatever comes after the digits is ignored: a unit (`30s`),
/// punctuation, a closing quote, or the end of the text. This means hints embedded in JSON
/// error messages are recognised. A non-zero fractional part (`1.5`) is rounded *up*, so a
/// retry is never scheduled before the server asked. If a marker is present but is not
/// followed by a number, the next marker is tried.
///
/// Returns `None` when no usable hint is found, including numbers too large for `u64`.
fn extract_retry_after(response_text: &str) -> Option<u64> {
    /// Parses the whole-second count at the start of `text`, rounding any fraction up.
    fn leading_seconds(text: &str) -> Option<u64> {
        let text = text.trim_start();
        let int_end = text
            .find(|c: char| !c.is_ascii_digit())
            .unwrap_or(text.len());
        let whole: u64 = text[..int_end].parse().ok()?;
        let has_fraction = text[int_end..].strip_prefix('.').is_some_and(|frac| {
            frac.chars()
                .take_while(char::is_ascii_digit)
                .any(|c| c != '0')
        });
        Some(if has_fraction {
            whole.saturating_add(1)
        } else {
            whole
        })
    }

    const MARKERS: [&str; 2] = ["retry after ", "\"retry_after\":"];

    let lower = response_text.to_lowercase();
    MARKERS.iter().find_map(|&marker| {
        // `find` returns a byte offset of an ASCII marker, so the slice start is a char boundary.
        let start = lower.find(marker)? + marker.len();
        leading_seconds(&lower[start..])
    })
}

/// Returns a pseudo-random factor uniformly distributed in `[0, 1)`, used for backoff jitter.
///
/// The original implementation used `subsec_nanos() % 1000`. That collapses on wall clocks
/// coarser than a nanosecond: with a microsecond-resolution clock it is always 0, so jitter
/// disappears entirely. This version stays std-only and instead hashes nothing with a
/// freshly keyed `RandomState`, which gives a different 64-bit value on each call. The top
/// 53 bits are then mapped onto the `f64` unit interval.
fn rand_factor() -> f64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    let bits = RandomState::new().build_hasher().finish();
    // 53 bits fit exactly in an f64 mantissa, so the result is < 1.0 and evenly spaced.
    (bits >> 11) as f64 / (1u64 << 53) as f64
}

96impl HttpClient {
97 pub fn new() -> Result<Self, LlmError> {
99 let https = HttpsConnectorBuilder::new()
100 .with_native_roots()
101 .map_err(|e| {
102 LlmError::new(
103 "TLS_INIT_FAILED",
104 format!("failed to load native TLS roots: {}", e),
105 )
106 })?
107 .https_or_http()
108 .enable_http1()
109 .build();
110
111 let client = Client::builder(TokioExecutor::new()).build(https);
112 Ok(Self { client })
113 }
114
115 pub async fn get(&self, uri: &str) -> Result<String, LlmError> {
117 let uri: hyper::Uri = uri
118 .parse()
119 .map_err(|e| LlmError::new("HTTP_INVALID_URI", format!("{}", e)))?;
120
121 let request = Request::builder()
122 .method(Method::GET)
123 .uri(uri)
124 .body(Full::new(Bytes::new()))
125 .map_err(|e| LlmError::new("HTTP_REQUEST_BUILD", format!("{}", e)))?;
126
127 let res = self
128 .client
129 .request(request)
130 .await
131 .map_err(|e| LlmError::new("HTTP_REQUEST_FAILED", format!("{}", e)))?;
132
133 let body = res
134 .collect()
135 .await
136 .map_err(|e| LlmError::new("HTTP_BODY_READ", format!("{}", e)))?
137 .to_bytes();
138
139 String::from_utf8(body.to_vec())
140 .map_err(|e| LlmError::new("HTTP_INVALID_UTF8", format!("{}", e)))
141 }
142
143 pub async fn post(
147 &self,
148 uri: &str,
149 headers: &[(&str, &str)],
150 body: &str,
151 ) -> Result<String, LlmError> {
152 let parsed_uri: hyper::Uri = uri
153 .parse()
154 .map_err(|e| LlmError::new("HTTP_INVALID_URI", format!("{}", e)))?;
155
156 let mut last_error = None;
157
158 for attempt in 0..=MAX_RETRIES {
159 let mut builder = Request::builder()
160 .method(Method::POST)
161 .uri(parsed_uri.clone());
162
163 for (key, value) in headers {
164 builder = builder.header(*key, *value);
165 }
166
167 let request = builder
168 .body(Full::new(Bytes::from(body.to_string())))
169 .map_err(|e| LlmError::new("HTTP_REQUEST_BUILD", format!("{}", e)))?;
170
171 let res = self
172 .client
173 .request(request)
174 .await
175 .map_err(|e| LlmError::new("HTTP_REQUEST_FAILED", format!("{}", e)))?;
176
177 let status = res.status();
178
179 let response_body = res
180 .collect()
181 .await
182 .map_err(|e| LlmError::new("HTTP_BODY_READ", format!("{}", e)))?
183 .to_bytes();
184
185 let response_text = String::from_utf8(response_body.to_vec())
186 .map_err(|e| LlmError::new("HTTP_INVALID_UTF8", format!("{}", e)))?;
187
188 if (status == StatusCode::TOO_MANY_REQUESTS || status.as_u16() == 529)
190 && attempt < MAX_RETRIES
191 {
192 let delay = calculate_backoff_delay(attempt, &response_text);
193 tracing::warn!(
194 status = %status,
195 attempt = attempt + 1,
196 max_retries = MAX_RETRIES,
197 delay_ms = delay.as_millis(),
198 "Rate limited, retrying after delay"
199 );
200 tokio::time::sleep(delay).await;
201 last_error = Some(LlmError::new(
202 format!("HTTP_{}", status.as_u16()),
203 response_text,
204 ));
205 continue;
206 }
207
208 return Ok(response_text);
210 }
211
212 Err(last_error.unwrap_or_else(|| {
214 LlmError::new("RATE_LIMIT_EXHAUSTED", "Rate limit retries exhausted")
215 }))
216 }
217
218 pub async fn post_stream(
220 &self,
221 uri: &str,
222 headers: &[(&str, &str)],
223 body: &str,
224 ) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, LlmError>> + Send>>, LlmError> {
225 let parsed_uri: hyper::Uri = uri
226 .parse()
227 .map_err(|e| LlmError::new("HTTP_INVALID_URI", format!("{}", e)))?;
228
229 let mut last_error = None;
230
231 for attempt in 0..=MAX_RETRIES {
232 let mut builder = Request::builder()
233 .method(Method::POST)
234 .uri(parsed_uri.clone());
235
236 for (key, value) in headers {
237 builder = builder.header(*key, *value);
238 }
239
240 let request = builder
241 .body(Full::new(Bytes::from(body.to_string())))
242 .map_err(|e| LlmError::new("HTTP_REQUEST_BUILD", format!("{}", e)))?;
243
244 let res = self
245 .client
246 .request(request)
247 .await
248 .map_err(|e| LlmError::new("HTTP_REQUEST_FAILED", format!("{}", e)))?;
249
250 let status = res.status();
251
252 if status == StatusCode::TOO_MANY_REQUESTS || status.as_u16() == 529 {
254 let error_body = res
255 .collect()
256 .await
257 .map_err(|e| LlmError::new("HTTP_BODY_READ", format!("{}", e)))?
258 .to_bytes();
259 let error_text = String::from_utf8_lossy(&error_body).to_string();
260
261 if attempt < MAX_RETRIES {
262 let delay = calculate_backoff_delay(attempt, &error_text);
263 tracing::warn!(
264 status = %status,
265 attempt = attempt + 1,
266 max_retries = MAX_RETRIES,
267 delay_ms = delay.as_millis(),
268 "Rate limited on stream request, retrying after delay"
269 );
270 tokio::time::sleep(delay).await;
271 last_error = Some(LlmError::new(
272 format!("HTTP_{}", status.as_u16()),
273 error_text,
274 ));
275 continue;
276 }
277
278 return Err(LlmError::new(
280 format!("HTTP_{}", status.as_u16()),
281 error_text,
282 ));
283 }
284
285 if !status.is_success() {
287 let error_body = res
288 .collect()
289 .await
290 .map_err(|e| LlmError::new("HTTP_BODY_READ", format!("{}", e)))?
291 .to_bytes();
292 let error_text = String::from_utf8_lossy(&error_body);
293 return Err(LlmError::new(
294 format!("HTTP_{}", status.as_u16()),
295 error_text.to_string(),
296 ));
297 }
298
299 let response_body = res.into_body();
301 let byte_stream = stream! {
302 use http_body_util::BodyExt;
303 let mut body = response_body;
304 while let Some(frame_result) = body.frame().await {
305 match frame_result {
306 Ok(frame) => {
307 if let Some(data) = frame.data_ref() {
308 yield Ok(data.clone());
309 }
310 }
311 Err(e) => {
312 yield Err(LlmError::new("HTTP_STREAM_ERROR", format!("{}", e)));
313 break;
314 }
315 }
316 }
317 };
318
319 return Ok(Box::pin(byte_stream)
320 as Pin<Box<dyn Stream<Item = Result<Bytes, LlmError>> + Send>>);
321 }
322
323 Err(last_error.unwrap_or_else(|| {
325 LlmError::new("RATE_LIMIT_EXHAUSTED", "Rate limit retries exhausted")
326 }))
327 }
328}