Skip to main content

a3s_code_core/llm/
http.rs

1//! HTTP utilities and abstraction for LLM API calls
2
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use futures::StreamExt;
6use std::env;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio_util::sync::CancellationToken;
11
/// Result of a non-streaming POST: the status code plus the fully buffered body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpResponse {
    /// HTTP status code of the response.
    pub status: u16,
    /// Response body, decoded as text.
    pub body: String,
}
17
18/// HTTP response from a streaming POST request
19pub struct StreamingHttpResponse {
20    pub status: u16,
21    /// Retry-After header value (if present)
22    pub retry_after: Option<String>,
23    /// Byte stream (valid when status is 2xx)
24    pub byte_stream: Pin<Box<dyn futures::Stream<Item = Result<bytes::Bytes>> + Send>>,
25    /// Error body (populated when status is not 2xx)
26    pub error_body: String,
27}
28
/// Summary of one HTTP request, handed to the registered metrics callback.
#[derive(Debug, Clone)]
pub struct HttpMetricsRecord {
    /// URL the request was sent to.
    pub url: String,
    /// HTTP method; LLM calls currently always use `"POST"`.
    pub method: String,
    /// HTTP status code returned by the server.
    pub status: u16,
    /// Elapsed time in milliseconds (for streaming requests, measured up to the
    /// point the response was received, before the body stream is consumed).
    pub duration_ms: f64,
    /// Length in bytes of the serialized request body.
    pub request_bytes: u64,
    /// Length in bytes of the response body; 0 when unknown (streaming).
    pub response_bytes: u64,
    /// `true` when the request went through the streaming code path.
    pub streaming: bool,
}
47
48/// Callback function type for HTTP metrics collection.
49/// The callback is called after each HTTP request completes.
50pub type HttpMetricsCallback = Arc<dyn Fn(HttpMetricsRecord) + Send + Sync>;
51
52/// Global HTTP metrics callback registry.
53///
54/// Set this to enable HTTP metrics collection for LLM API calls.
55/// The callback will be invoked after each HTTP request completes.
56static HTTP_METRICS_CALLBACK: std::sync::RwLock<Option<HttpMetricsCallback>> =
57    std::sync::RwLock::new(None);
58
59/// Register a global HTTP metrics callback.
60/// The callback will be invoked after each HTTP request completes.
61pub fn set_http_metrics_callback(callback: HttpMetricsCallback) {
62    *HTTP_METRICS_CALLBACK.write().unwrap() = Some(callback);
63}
64
65/// Clear the global HTTP metrics callback.
66pub fn clear_http_metrics_callback() {
67    *HTTP_METRICS_CALLBACK.write().unwrap() = None;
68}
69
70fn maybe_record_metrics(record: HttpMetricsRecord) {
71    if let Some(callback) = HTTP_METRICS_CALLBACK.read().unwrap().as_ref() {
72        callback(record);
73    }
74}
75
76/// Abstraction over HTTP POST requests for LLM API calls.
77///
78/// Enables dependency injection for testing without hitting real HTTP endpoints.
79#[async_trait]
80pub trait HttpClient: Send + Sync {
81    /// Make a POST request and return status + body
82    async fn post(
83        &self,
84        url: &str,
85        headers: Vec<(&str, &str)>,
86        body: &serde_json::Value,
87        cancel_token: CancellationToken,
88    ) -> Result<HttpResponse>;
89
90    /// Make a POST request and return a streaming response.
91    /// If cancel_token is cancelled during the request, the HTTP connection is aborted.
92    async fn post_streaming(
93        &self,
94        url: &str,
95        headers: Vec<(&str, &str)>,
96        body: &serde_json::Value,
97        cancel_token: CancellationToken,
98    ) -> Result<StreamingHttpResponse>;
99}
100
101/// Default HTTP client backed by reqwest
102pub struct ReqwestHttpClient {
103    client: reqwest::Client,
104}
105
106impl ReqwestHttpClient {
107    pub fn new() -> Self {
108        Self {
109            client: build_reqwest_client(None, None).expect("failed to build default HTTP client"),
110        }
111    }
112}
113
114impl Default for ReqwestHttpClient {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120#[async_trait]
121impl HttpClient for ReqwestHttpClient {
122    async fn post(
123        &self,
124        url: &str,
125        headers: Vec<(&str, &str)>,
126        body: &serde_json::Value,
127        cancel_token: CancellationToken,
128    ) -> Result<HttpResponse> {
129        let start = std::time::Instant::now();
130        let request_body = serde_json::to_string(body).unwrap_or_default();
131        let request_bytes = request_body.len() as u64;
132
133        tracing::debug!(
134            "HTTP POST to {}: {}",
135            url,
136            serde_json::to_string_pretty(body)?
137        );
138
139        let mut request = self.client.post(url);
140        for (key, value) in headers {
141            request = request.header(key, value);
142        }
143        request = request.json(body);
144
145        let response = tokio::select! {
146            _ = cancel_token.cancelled() => {
147                anyhow::bail!("HTTP request cancelled");
148            }
149            result = request.send() => {
150                result.context(format!("Failed to send request to {}", url))?
151            }
152        };
153
154        let status = response.status().as_u16();
155        let response_body = response.text().await?;
156        let response_bytes = response_body.len() as u64;
157        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
158
159        maybe_record_metrics(HttpMetricsRecord {
160            url: url.to_string(),
161            method: "POST".to_string(),
162            status,
163            duration_ms,
164            request_bytes,
165            response_bytes,
166            streaming: false,
167        });
168
169        Ok(HttpResponse {
170            status,
171            body: response_body,
172        })
173    }
174
175    async fn post_streaming(
176        &self,
177        url: &str,
178        headers: Vec<(&str, &str)>,
179        body: &serde_json::Value,
180        cancel_token: CancellationToken,
181    ) -> Result<StreamingHttpResponse> {
182        let start = std::time::Instant::now();
183        let request_body = serde_json::to_string(body).unwrap_or_default();
184        let request_bytes = request_body.len() as u64;
185
186        let mut request = self.client.post(url);
187        for (key, value) in headers {
188            request = request.header(key, value);
189        }
190        request = request.json(body);
191
192        let response = tokio::select! {
193            _ = cancel_token.cancelled() => {
194                anyhow::bail!("HTTP streaming request cancelled");
195            }
196            result = request.send() => {
197                result.context(format!("Failed to send streaming request to {}", url))?
198            }
199        };
200
201        let status = response.status().as_u16();
202        let retry_after = response
203            .headers()
204            .get("retry-after")
205            .and_then(|v| v.to_str().ok())
206            .map(String::from);
207
208        // For streaming, we record metrics after sending but before consuming the stream
209        // Note: response_bytes is estimated as we can't know the full stream size upfront
210        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
211        maybe_record_metrics(HttpMetricsRecord {
212            url: url.to_string(),
213            method: "POST".to_string(),
214            status,
215            duration_ms,
216            request_bytes,
217            response_bytes: 0, // Unknown for streaming
218            streaming: true,
219        });
220
221        if (200..300).contains(&status) {
222            let byte_stream = response
223                .bytes_stream()
224                .map(|r| r.map_err(|e| anyhow::anyhow!("Stream error: {}", e)));
225            Ok(StreamingHttpResponse {
226                status,
227                retry_after,
228                byte_stream: Box::pin(byte_stream),
229                error_body: String::new(),
230            })
231        } else {
232            let error_body = response.text().await.unwrap_or_default();
233            // Return an empty stream for error responses
234            let empty: futures::stream::Empty<Result<bytes::Bytes>> = futures::stream::empty();
235            Ok(StreamingHttpResponse {
236                status,
237                retry_after,
238                byte_stream: Box::pin(empty),
239                error_body,
240            })
241        }
242    }
243}
244
245/// Create a default HTTP client
246pub fn default_http_client() -> Arc<dyn HttpClient> {
247    Arc::new(ReqwestHttpClient::new())
248}
249
/// Proxy URLs resolved from the environment, one per target scheme.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct ExplicitProxyConfig {
    /// Proxy used for `http://` targets, if any.
    http: Option<String>,
    /// Proxy used for `https://` targets, if any.
    https: Option<String>,
}
255
256/// Build a reqwest client without consulting system proxy settings.
257///
258/// On macOS test runners, the system proxy lookup path can panic inside the
259/// `system-configuration` crate when no dynamic store is available. Disabling
260/// implicit proxy discovery keeps client construction deterministic while still
261/// honoring standard proxy environment variables explicitly.
262pub(crate) fn build_reqwest_client(
263    timeout: Option<Duration>,
264    default_headers: Option<reqwest::header::HeaderMap>,
265) -> Result<reqwest::Client> {
266    let mut builder = reqwest::Client::builder().no_proxy();
267
268    if let Some(timeout) = timeout {
269        builder = builder.timeout(timeout);
270    }
271
272    if let Some(default_headers) = default_headers {
273        builder = builder.default_headers(default_headers);
274    }
275
276    let proxy_config = explicit_proxy_config_from_env();
277    if let Some(http_proxy) = proxy_config.http.as_deref() {
278        builder = builder.proxy(
279            reqwest::Proxy::http(http_proxy)
280                .with_context(|| format!("Invalid HTTP proxy URL: {http_proxy}"))?,
281        );
282    }
283    if let Some(https_proxy) = proxy_config.https.as_deref() {
284        builder = builder.proxy(
285            reqwest::Proxy::https(https_proxy)
286                .with_context(|| format!("Invalid HTTPS proxy URL: {https_proxy}"))?,
287        );
288    }
289
290    builder.build().context("Failed to build reqwest client")
291}
292
293fn explicit_proxy_config_from_env() -> ExplicitProxyConfig {
294    let http = first_non_empty_env(&["http_proxy", "HTTP_PROXY"]);
295    let https = first_non_empty_env(&["https_proxy", "HTTPS_PROXY"]).or_else(|| http.clone());
296
297    ExplicitProxyConfig { http, https }
298}
299
/// Return the trimmed value of the first variable in `keys` that is set,
/// valid Unicode, and not blank; later keys are not consulted after a hit.
fn first_non_empty_env(keys: &[&str]) -> Option<String> {
    for key in keys {
        if let Ok(raw) = env::var(key) {
            let trimmed = raw.trim();
            if !trimmed.is_empty() {
                return Some(trimmed.to_string());
            }
        }
    }
    None
}
308
/// Normalize an API base URL by removing trailing slashes and trailing `/v1`
/// segments (e.g. `https://host/v1/` becomes `https://host`).
pub(crate) fn normalize_base_url(base_url: &str) -> String {
    let without_slash = base_url.trim_end_matches('/');
    let without_version = without_slash.trim_end_matches("/v1");
    without_version.trim_end_matches('/').to_owned()
}
317
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};

    /// Every proxy-related environment variable these tests set or must clear.
    const PROXY_ENV_KEYS: [&str; 6] = [
        "http_proxy",
        "HTTP_PROXY",
        "https_proxy",
        "HTTPS_PROXY",
        "no_proxy",
        "NO_PROXY",
    ];

    fn proxy_env_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }

    fn clear_proxy_env() {
        for key in PROXY_ENV_KEYS {
            // SAFETY: only called while `proxy_env_lock` is held, so no other
            // test in this module touches the environment concurrently. Code
            // outside this module is not covered by the lock.
            unsafe { env::remove_var(key) };
        }
    }

    /// Exclusive, self-cleaning access to the proxy environment variables.
    ///
    /// The variables are cleared on acquisition and again on drop, so a
    /// failing assertion cannot leak proxy settings into later tests.
    struct ProxyEnv {
        _lock: MutexGuard<'static, ()>,
    }

    impl ProxyEnv {
        fn acquire() -> Self {
            // A test that panicked while holding the lock poisons it. The `()`
            // payload carries no state, so recovering is sound and prevents one
            // failure from cascading into every later test.
            let lock = proxy_env_lock()
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            clear_proxy_env();
            Self { _lock: lock }
        }

        fn set(&self, key: &str, value: &str) {
            // SAFETY: `self` holds `proxy_env_lock`; see `clear_proxy_env`.
            unsafe { env::set_var(key, value) };
        }
    }

    impl Drop for ProxyEnv {
        fn drop(&mut self) {
            // `Drop::drop` runs before the `_lock` field is released.
            clear_proxy_env();
        }
    }

    #[test]
    fn test_normalize_base_url() {
        assert_eq!(
            normalize_base_url("https://api.example.com"),
            "https://api.example.com"
        );
        assert_eq!(
            normalize_base_url("https://api.example.com/"),
            "https://api.example.com"
        );
        assert_eq!(
            normalize_base_url("https://api.example.com/v1"),
            "https://api.example.com"
        );
        assert_eq!(
            normalize_base_url("https://api.example.com/v1/"),
            "https://api.example.com"
        );
    }

    #[test]
    fn test_normalize_base_url_edge_cases() {
        assert_eq!(
            normalize_base_url("http://localhost:8080/v1"),
            "http://localhost:8080"
        );
        assert_eq!(
            normalize_base_url("http://localhost:8080"),
            "http://localhost:8080"
        );
        assert_eq!(
            normalize_base_url("https://api.example.com/v1/"),
            "https://api.example.com"
        );
    }

    #[test]
    fn test_normalize_base_url_multiple_trailing_slashes() {
        assert_eq!(
            normalize_base_url("https://api.example.com//"),
            "https://api.example.com"
        );
    }

    #[test]
    fn test_normalize_base_url_with_port() {
        assert_eq!(
            normalize_base_url("http://localhost:11434/v1/"),
            "http://localhost:11434"
        );
    }

    #[test]
    fn test_normalize_base_url_already_normalized() {
        assert_eq!(
            normalize_base_url("https://api.openai.com"),
            "https://api.openai.com"
        );
    }

    #[test]
    fn test_normalize_base_url_empty_string() {
        assert_eq!(normalize_base_url(""), "");
    }

    #[test]
    fn test_default_http_client_creation() {
        // Client construction reads the proxy variables, so it must not race
        // with the tests below that modify them.
        let _proxy_env = ProxyEnv::acquire();
        let _client = default_http_client();
    }

    #[test]
    fn test_explicit_proxy_config_from_env_prefers_lowercase_vars() {
        let proxy_env = ProxyEnv::acquire();
        proxy_env.set("http_proxy", "http://lower-http:3128");
        proxy_env.set("HTTP_PROXY", "http://upper-http:3128");
        proxy_env.set("https_proxy", "http://lower-https:3128");
        proxy_env.set("HTTPS_PROXY", "http://upper-https:3128");

        assert_eq!(
            explicit_proxy_config_from_env(),
            ExplicitProxyConfig {
                http: Some("http://lower-http:3128".to_string()),
                https: Some("http://lower-https:3128".to_string()),
            }
        );
    }

    #[test]
    fn test_explicit_proxy_config_from_env_falls_back_to_http_for_https() {
        let proxy_env = ProxyEnv::acquire();
        proxy_env.set("HTTP_PROXY", "http://proxy.example:3128");

        assert_eq!(
            explicit_proxy_config_from_env(),
            ExplicitProxyConfig {
                http: Some("http://proxy.example:3128".to_string()),
                https: Some("http://proxy.example:3128".to_string()),
            }
        );
    }

    #[test]
    fn test_explicit_proxy_config_from_env_skips_blank_and_trims_values() {
        let proxy_env = ProxyEnv::acquire();
        proxy_env.set("HTTP_PROXY", "  http://proxy.example:3128 ");
        proxy_env.set("https_proxy", "   ");

        assert_eq!(
            explicit_proxy_config_from_env(),
            ExplicitProxyConfig {
                http: Some("http://proxy.example:3128".to_string()),
                https: Some("http://proxy.example:3128".to_string()),
            }
        );
    }

    #[test]
    fn test_build_reqwest_client_accepts_proxy_env_urls() {
        let proxy_env = ProxyEnv::acquire();
        proxy_env.set("http_proxy", "http://127.0.0.1:3128");
        proxy_env.set("https_proxy", "http://127.0.0.1:3128");

        assert!(build_reqwest_client(None, None).is_ok());
    }

    #[test]
    fn test_build_reqwest_client_accepts_no_proxy_env() {
        let proxy_env = ProxyEnv::acquire();
        proxy_env.set("https_proxy", "http://127.0.0.1:3128");
        proxy_env.set("NO_PROXY", "localhost,127.0.0.1");

        assert!(build_reqwest_client(None, None).is_ok());
    }
}