1use anyhow::{Context, Result};
4use async_trait::async_trait;
5use futures::StreamExt;
6use std::env;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio_util::sync::CancellationToken;
11
12pub struct HttpResponse {
14 pub status: u16,
15 pub body: String,
16}
17
18pub struct StreamingHttpResponse {
20 pub status: u16,
21 pub retry_after: Option<String>,
23 pub byte_stream: Pin<Box<dyn futures::Stream<Item = Result<bytes::Bytes>> + Send>>,
25 pub error_body: String,
27}
28
29#[derive(Debug, Clone)]
31pub struct HttpMetricsRecord {
32 pub url: String,
34 pub method: String,
36 pub status: u16,
38 pub duration_ms: f64,
40 pub request_bytes: u64,
42 pub response_bytes: u64,
44 pub streaming: bool,
46}
47
48pub type HttpMetricsCallback = Arc<dyn Fn(HttpMetricsRecord) + Send + Sync>;
51
52static HTTP_METRICS_CALLBACK: std::sync::RwLock<Option<HttpMetricsCallback>> =
57 std::sync::RwLock::new(None);
58
59pub fn set_http_metrics_callback(callback: HttpMetricsCallback) {
62 *HTTP_METRICS_CALLBACK.write().unwrap() = Some(callback);
63}
64
65pub fn clear_http_metrics_callback() {
67 *HTTP_METRICS_CALLBACK.write().unwrap() = None;
68}
69
70fn maybe_record_metrics(record: HttpMetricsRecord) {
71 if let Some(callback) = HTTP_METRICS_CALLBACK.read().unwrap().as_ref() {
72 callback(record);
73 }
74}
75
76#[async_trait]
80pub trait HttpClient: Send + Sync {
81 async fn post(
83 &self,
84 url: &str,
85 headers: Vec<(&str, &str)>,
86 body: &serde_json::Value,
87 cancel_token: CancellationToken,
88 ) -> Result<HttpResponse>;
89
90 async fn post_streaming(
93 &self,
94 url: &str,
95 headers: Vec<(&str, &str)>,
96 body: &serde_json::Value,
97 cancel_token: CancellationToken,
98 ) -> Result<StreamingHttpResponse>;
99}
100
101pub struct ReqwestHttpClient {
103 client: reqwest::Client,
104}
105
106impl ReqwestHttpClient {
107 pub fn new() -> Self {
108 Self {
109 client: build_reqwest_client(None, None).expect("failed to build default HTTP client"),
110 }
111 }
112}
113
114impl Default for ReqwestHttpClient {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120#[async_trait]
121impl HttpClient for ReqwestHttpClient {
122 async fn post(
123 &self,
124 url: &str,
125 headers: Vec<(&str, &str)>,
126 body: &serde_json::Value,
127 cancel_token: CancellationToken,
128 ) -> Result<HttpResponse> {
129 let start = std::time::Instant::now();
130 let request_body = serde_json::to_string(body).unwrap_or_default();
131 let request_bytes = request_body.len() as u64;
132
133 tracing::debug!(
134 "HTTP POST to {}: {}",
135 url,
136 serde_json::to_string_pretty(body)?
137 );
138
139 let mut request = self.client.post(url);
140 for (key, value) in headers {
141 request = request.header(key, value);
142 }
143 request = request.json(body);
144
145 let response = tokio::select! {
146 _ = cancel_token.cancelled() => {
147 anyhow::bail!("HTTP request cancelled");
148 }
149 result = request.send() => {
150 result.context(format!("Failed to send request to {}", url))?
151 }
152 };
153
154 let status = response.status().as_u16();
155 let response_body = response.text().await?;
156 let response_bytes = response_body.len() as u64;
157 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
158
159 maybe_record_metrics(HttpMetricsRecord {
160 url: url.to_string(),
161 method: "POST".to_string(),
162 status,
163 duration_ms,
164 request_bytes,
165 response_bytes,
166 streaming: false,
167 });
168
169 Ok(HttpResponse {
170 status,
171 body: response_body,
172 })
173 }
174
175 async fn post_streaming(
176 &self,
177 url: &str,
178 headers: Vec<(&str, &str)>,
179 body: &serde_json::Value,
180 cancel_token: CancellationToken,
181 ) -> Result<StreamingHttpResponse> {
182 let start = std::time::Instant::now();
183 let request_body = serde_json::to_string(body).unwrap_or_default();
184 let request_bytes = request_body.len() as u64;
185
186 let mut request = self.client.post(url);
187 for (key, value) in headers {
188 request = request.header(key, value);
189 }
190 request = request.json(body);
191
192 let response = tokio::select! {
193 _ = cancel_token.cancelled() => {
194 anyhow::bail!("HTTP streaming request cancelled");
195 }
196 result = request.send() => {
197 result.context(format!("Failed to send streaming request to {}", url))?
198 }
199 };
200
201 let status = response.status().as_u16();
202 let retry_after = response
203 .headers()
204 .get("retry-after")
205 .and_then(|v| v.to_str().ok())
206 .map(String::from);
207
208 let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
211 maybe_record_metrics(HttpMetricsRecord {
212 url: url.to_string(),
213 method: "POST".to_string(),
214 status,
215 duration_ms,
216 request_bytes,
217 response_bytes: 0, streaming: true,
219 });
220
221 if (200..300).contains(&status) {
222 let byte_stream = response
223 .bytes_stream()
224 .map(|r| r.map_err(|e| anyhow::anyhow!("Stream error: {}", e)));
225 Ok(StreamingHttpResponse {
226 status,
227 retry_after,
228 byte_stream: Box::pin(byte_stream),
229 error_body: String::new(),
230 })
231 } else {
232 let error_body = response.text().await.unwrap_or_default();
233 let empty: futures::stream::Empty<Result<bytes::Bytes>> = futures::stream::empty();
235 Ok(StreamingHttpResponse {
236 status,
237 retry_after,
238 byte_stream: Box::pin(empty),
239 error_body,
240 })
241 }
242 }
243}
244
245pub fn default_http_client() -> Arc<dyn HttpClient> {
247 Arc::new(ReqwestHttpClient::new())
248}
249
250#[derive(Debug, Clone, Default, PartialEq, Eq)]
251struct ExplicitProxyConfig {
252 http: Option<String>,
253 https: Option<String>,
254}
255
256pub(crate) fn build_reqwest_client(
263 timeout: Option<Duration>,
264 default_headers: Option<reqwest::header::HeaderMap>,
265) -> Result<reqwest::Client> {
266 let mut builder = reqwest::Client::builder().no_proxy();
267
268 if let Some(timeout) = timeout {
269 builder = builder.timeout(timeout);
270 }
271
272 if let Some(default_headers) = default_headers {
273 builder = builder.default_headers(default_headers);
274 }
275
276 let proxy_config = explicit_proxy_config_from_env();
277 if let Some(http_proxy) = proxy_config.http.as_deref() {
278 builder = builder.proxy(
279 reqwest::Proxy::http(http_proxy)
280 .with_context(|| format!("Invalid HTTP proxy URL: {http_proxy}"))?,
281 );
282 }
283 if let Some(https_proxy) = proxy_config.https.as_deref() {
284 builder = builder.proxy(
285 reqwest::Proxy::https(https_proxy)
286 .with_context(|| format!("Invalid HTTPS proxy URL: {https_proxy}"))?,
287 );
288 }
289
290 builder.build().context("Failed to build reqwest client")
291}
292
293fn explicit_proxy_config_from_env() -> ExplicitProxyConfig {
294 let http = first_non_empty_env(&["http_proxy", "HTTP_PROXY"]);
295 let https = first_non_empty_env(&["https_proxy", "HTTPS_PROXY"]).or_else(|| http.clone());
296
297 ExplicitProxyConfig { http, https }
298}
299
300fn first_non_empty_env(keys: &[&str]) -> Option<String> {
301 keys.iter().find_map(|key| {
302 env::var(key)
303 .ok()
304 .map(|value| value.trim().to_string())
305 .filter(|value| !value.is_empty())
306 })
307}
308
309pub(crate) fn normalize_base_url(base_url: &str) -> String {
311 base_url
312 .trim_end_matches('/')
313 .trim_end_matches("/v1")
314 .trim_end_matches('/')
315 .to_string()
316}
317
318#[cfg(test)]
319mod tests {
320 use super::*;
321 use std::sync::{Mutex, OnceLock};
322
323 fn proxy_env_lock() -> &'static Mutex<()> {
324 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
325 LOCK.get_or_init(|| Mutex::new(()))
326 }
327
328 fn clear_proxy_env() {
329 for key in ["http_proxy", "HTTP_PROXY", "https_proxy", "HTTPS_PROXY"] {
330 unsafe { env::remove_var(key) };
331 }
332 }
333
334 #[test]
335 fn test_normalize_base_url() {
336 assert_eq!(
337 normalize_base_url("https://api.example.com"),
338 "https://api.example.com"
339 );
340 assert_eq!(
341 normalize_base_url("https://api.example.com/"),
342 "https://api.example.com"
343 );
344 assert_eq!(
345 normalize_base_url("https://api.example.com/v1"),
346 "https://api.example.com"
347 );
348 assert_eq!(
349 normalize_base_url("https://api.example.com/v1/"),
350 "https://api.example.com"
351 );
352 }
353
354 #[test]
355 fn test_normalize_base_url_edge_cases() {
356 assert_eq!(
357 normalize_base_url("http://localhost:8080/v1"),
358 "http://localhost:8080"
359 );
360 assert_eq!(
361 normalize_base_url("http://localhost:8080"),
362 "http://localhost:8080"
363 );
364 assert_eq!(
365 normalize_base_url("https://api.example.com/v1/"),
366 "https://api.example.com"
367 );
368 }
369
370 #[test]
371 fn test_normalize_base_url_multiple_trailing_slashes() {
372 assert_eq!(
373 normalize_base_url("https://api.example.com//"),
374 "https://api.example.com"
375 );
376 }
377
378 #[test]
379 fn test_normalize_base_url_with_port() {
380 assert_eq!(
381 normalize_base_url("http://localhost:11434/v1/"),
382 "http://localhost:11434"
383 );
384 }
385
386 #[test]
387 fn test_normalize_base_url_already_normalized() {
388 assert_eq!(
389 normalize_base_url("https://api.openai.com"),
390 "https://api.openai.com"
391 );
392 }
393
394 #[test]
395 fn test_normalize_base_url_empty_string() {
396 assert_eq!(normalize_base_url(""), "");
397 }
398
399 #[test]
400 fn test_default_http_client_creation() {
401 let _client = default_http_client();
402 }
403
404 #[test]
405 fn test_explicit_proxy_config_from_env_prefers_lowercase_vars() {
406 let _guard = proxy_env_lock().lock().unwrap();
407 clear_proxy_env();
408 unsafe {
409 env::set_var("http_proxy", "http://lower-http:3128");
410 env::set_var("HTTP_PROXY", "http://upper-http:3128");
411 env::set_var("https_proxy", "http://lower-https:3128");
412 env::set_var("HTTPS_PROXY", "http://upper-https:3128");
413 }
414
415 let proxy_config = explicit_proxy_config_from_env();
416
417 assert_eq!(
418 proxy_config,
419 ExplicitProxyConfig {
420 http: Some("http://lower-http:3128".to_string()),
421 https: Some("http://lower-https:3128".to_string()),
422 }
423 );
424 clear_proxy_env();
425 }
426
427 #[test]
428 fn test_explicit_proxy_config_from_env_falls_back_to_http_for_https() {
429 let _guard = proxy_env_lock().lock().unwrap();
430 clear_proxy_env();
431 unsafe {
432 env::set_var("HTTP_PROXY", "http://proxy.example:3128");
433 }
434
435 let proxy_config = explicit_proxy_config_from_env();
436
437 assert_eq!(
438 proxy_config,
439 ExplicitProxyConfig {
440 http: Some("http://proxy.example:3128".to_string()),
441 https: Some("http://proxy.example:3128".to_string()),
442 }
443 );
444 clear_proxy_env();
445 }
446
447 #[test]
448 fn test_build_reqwest_client_accepts_proxy_env_urls() {
449 let _guard = proxy_env_lock().lock().unwrap();
450 clear_proxy_env();
451 unsafe {
452 env::set_var("http_proxy", "http://127.0.0.1:3128");
453 env::set_var("https_proxy", "http://127.0.0.1:3128");
454 }
455
456 let client = build_reqwest_client(None, None);
457 assert!(client.is_ok());
458 clear_proxy_env();
459 }
460}