1use anyhow::{Context, Result};
4use async_trait::async_trait;
5use futures::StreamExt;
6use std::env;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::time::Duration;
10
11pub struct HttpResponse {
13 pub status: u16,
14 pub body: String,
15}
16
17pub struct StreamingHttpResponse {
19 pub status: u16,
20 pub retry_after: Option<String>,
22 pub byte_stream: Pin<Box<dyn futures::Stream<Item = Result<bytes::Bytes>> + Send>>,
24 pub error_body: String,
26}
27
28#[async_trait]
32pub trait HttpClient: Send + Sync {
33 async fn post(
35 &self,
36 url: &str,
37 headers: Vec<(&str, &str)>,
38 body: &serde_json::Value,
39 ) -> Result<HttpResponse>;
40
41 async fn post_streaming(
43 &self,
44 url: &str,
45 headers: Vec<(&str, &str)>,
46 body: &serde_json::Value,
47 ) -> Result<StreamingHttpResponse>;
48}
49
50pub struct ReqwestHttpClient {
52 client: reqwest::Client,
53}
54
55impl ReqwestHttpClient {
56 pub fn new() -> Self {
57 Self {
58 client: build_reqwest_client(None, None).expect("failed to build default HTTP client"),
59 }
60 }
61}
62
63impl Default for ReqwestHttpClient {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69#[async_trait]
70impl HttpClient for ReqwestHttpClient {
71 async fn post(
72 &self,
73 url: &str,
74 headers: Vec<(&str, &str)>,
75 body: &serde_json::Value,
76 ) -> Result<HttpResponse> {
77 tracing::debug!(
78 "HTTP POST to {}: {}",
79 url,
80 serde_json::to_string_pretty(body)?
81 );
82
83 let mut request = self.client.post(url);
84 for (key, value) in headers {
85 request = request.header(key, value);
86 }
87 request = request.json(body);
88
89 let response = request
90 .send()
91 .await
92 .context(format!("Failed to send request to {}", url))?;
93
94 let status = response.status().as_u16();
95 let body = response.text().await?;
96
97 Ok(HttpResponse { status, body })
98 }
99
100 async fn post_streaming(
101 &self,
102 url: &str,
103 headers: Vec<(&str, &str)>,
104 body: &serde_json::Value,
105 ) -> Result<StreamingHttpResponse> {
106 let mut request = self.client.post(url);
107 for (key, value) in headers {
108 request = request.header(key, value);
109 }
110 request = request.json(body);
111
112 let response = request
113 .send()
114 .await
115 .context(format!("Failed to send streaming request to {}", url))?;
116
117 let status = response.status().as_u16();
118 let retry_after = response
119 .headers()
120 .get("retry-after")
121 .and_then(|v| v.to_str().ok())
122 .map(String::from);
123
124 if (200..300).contains(&status) {
125 let byte_stream = response
126 .bytes_stream()
127 .map(|r| r.map_err(|e| anyhow::anyhow!("Stream error: {}", e)));
128 Ok(StreamingHttpResponse {
129 status,
130 retry_after,
131 byte_stream: Box::pin(byte_stream),
132 error_body: String::new(),
133 })
134 } else {
135 let error_body = response.text().await.unwrap_or_default();
136 let empty: futures::stream::Empty<Result<bytes::Bytes>> = futures::stream::empty();
138 Ok(StreamingHttpResponse {
139 status,
140 retry_after,
141 byte_stream: Box::pin(empty),
142 error_body,
143 })
144 }
145 }
146}
147
148pub fn default_http_client() -> Arc<dyn HttpClient> {
150 Arc::new(ReqwestHttpClient::new())
151}
152
153#[derive(Debug, Clone, Default, PartialEq, Eq)]
154struct ExplicitProxyConfig {
155 http: Option<String>,
156 https: Option<String>,
157}
158
159pub(crate) fn build_reqwest_client(
166 timeout: Option<Duration>,
167 default_headers: Option<reqwest::header::HeaderMap>,
168) -> Result<reqwest::Client> {
169 let mut builder = reqwest::Client::builder().no_proxy();
170
171 if let Some(timeout) = timeout {
172 builder = builder.timeout(timeout);
173 }
174
175 if let Some(default_headers) = default_headers {
176 builder = builder.default_headers(default_headers);
177 }
178
179 let proxy_config = explicit_proxy_config_from_env();
180 if let Some(http_proxy) = proxy_config.http.as_deref() {
181 builder = builder.proxy(
182 reqwest::Proxy::http(http_proxy)
183 .with_context(|| format!("Invalid HTTP proxy URL: {http_proxy}"))?,
184 );
185 }
186 if let Some(https_proxy) = proxy_config.https.as_deref() {
187 builder = builder.proxy(
188 reqwest::Proxy::https(https_proxy)
189 .with_context(|| format!("Invalid HTTPS proxy URL: {https_proxy}"))?,
190 );
191 }
192
193 builder.build().context("Failed to build reqwest client")
194}
195
196fn explicit_proxy_config_from_env() -> ExplicitProxyConfig {
197 let http = first_non_empty_env(&["http_proxy", "HTTP_PROXY"]);
198 let https = first_non_empty_env(&["https_proxy", "HTTPS_PROXY"]).or_else(|| http.clone());
199
200 ExplicitProxyConfig { http, https }
201}
202
203fn first_non_empty_env(keys: &[&str]) -> Option<String> {
204 keys.iter().find_map(|key| {
205 env::var(key)
206 .ok()
207 .map(|value| value.trim().to_string())
208 .filter(|value| !value.is_empty())
209 })
210}
211
212pub(crate) fn normalize_base_url(base_url: &str) -> String {
214 base_url
215 .trim_end_matches('/')
216 .trim_end_matches("/v1")
217 .trim_end_matches('/')
218 .to_string()
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use std::sync::{Mutex, OnceLock};
225
226 fn proxy_env_lock() -> &'static Mutex<()> {
227 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
228 LOCK.get_or_init(|| Mutex::new(()))
229 }
230
231 fn clear_proxy_env() {
232 for key in ["http_proxy", "HTTP_PROXY", "https_proxy", "HTTPS_PROXY"] {
233 unsafe { env::remove_var(key) };
234 }
235 }
236
237 #[test]
238 fn test_normalize_base_url() {
239 assert_eq!(
240 normalize_base_url("https://api.example.com"),
241 "https://api.example.com"
242 );
243 assert_eq!(
244 normalize_base_url("https://api.example.com/"),
245 "https://api.example.com"
246 );
247 assert_eq!(
248 normalize_base_url("https://api.example.com/v1"),
249 "https://api.example.com"
250 );
251 assert_eq!(
252 normalize_base_url("https://api.example.com/v1/"),
253 "https://api.example.com"
254 );
255 }
256
257 #[test]
258 fn test_normalize_base_url_edge_cases() {
259 assert_eq!(
260 normalize_base_url("http://localhost:8080/v1"),
261 "http://localhost:8080"
262 );
263 assert_eq!(
264 normalize_base_url("http://localhost:8080"),
265 "http://localhost:8080"
266 );
267 assert_eq!(
268 normalize_base_url("https://api.example.com/v1/"),
269 "https://api.example.com"
270 );
271 }
272
273 #[test]
274 fn test_normalize_base_url_multiple_trailing_slashes() {
275 assert_eq!(
276 normalize_base_url("https://api.example.com//"),
277 "https://api.example.com"
278 );
279 }
280
281 #[test]
282 fn test_normalize_base_url_with_port() {
283 assert_eq!(
284 normalize_base_url("http://localhost:11434/v1/"),
285 "http://localhost:11434"
286 );
287 }
288
289 #[test]
290 fn test_normalize_base_url_already_normalized() {
291 assert_eq!(
292 normalize_base_url("https://api.openai.com"),
293 "https://api.openai.com"
294 );
295 }
296
297 #[test]
298 fn test_normalize_base_url_empty_string() {
299 assert_eq!(normalize_base_url(""), "");
300 }
301
302 #[test]
303 fn test_default_http_client_creation() {
304 let _client = default_http_client();
305 }
306
307 #[test]
308 fn test_explicit_proxy_config_from_env_prefers_lowercase_vars() {
309 let _guard = proxy_env_lock().lock().unwrap();
310 clear_proxy_env();
311 unsafe {
312 env::set_var("http_proxy", "http://lower-http:3128");
313 env::set_var("HTTP_PROXY", "http://upper-http:3128");
314 env::set_var("https_proxy", "http://lower-https:3128");
315 env::set_var("HTTPS_PROXY", "http://upper-https:3128");
316 }
317
318 let proxy_config = explicit_proxy_config_from_env();
319
320 assert_eq!(
321 proxy_config,
322 ExplicitProxyConfig {
323 http: Some("http://lower-http:3128".to_string()),
324 https: Some("http://lower-https:3128".to_string()),
325 }
326 );
327 clear_proxy_env();
328 }
329
330 #[test]
331 fn test_explicit_proxy_config_from_env_falls_back_to_http_for_https() {
332 let _guard = proxy_env_lock().lock().unwrap();
333 clear_proxy_env();
334 unsafe {
335 env::set_var("HTTP_PROXY", "http://proxy.example:3128");
336 }
337
338 let proxy_config = explicit_proxy_config_from_env();
339
340 assert_eq!(
341 proxy_config,
342 ExplicitProxyConfig {
343 http: Some("http://proxy.example:3128".to_string()),
344 https: Some("http://proxy.example:3128".to_string()),
345 }
346 );
347 clear_proxy_env();
348 }
349
350 #[test]
351 fn test_build_reqwest_client_accepts_proxy_env_urls() {
352 let _guard = proxy_env_lock().lock().unwrap();
353 clear_proxy_env();
354 unsafe {
355 env::set_var("http_proxy", "http://127.0.0.1:3128");
356 env::set_var("https_proxy", "http://127.0.0.1:3128");
357 }
358
359 let client = build_reqwest_client(None, None);
360 assert!(client.is_ok());
361 clear_proxy_env();
362 }
363}