avx_http/
client.rs

//! HTTP client implementation.
2
3use crate::error::{Error, Result};
4use crate::common;
5use crate::pool::{ConnectionPool, PoolConfig};
6use crate::interceptors::{Interceptors, RequestData, ResponseData};
7use bytes::Bytes;
8use http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::net::TcpStream;
13
14/// HTTP Client for making requests
15#[derive(Clone)]
16pub struct Client {
17    /// Client configuration
18    pub config: ClientConfig,
19    /// Connection pool
20    pool: Arc<ConnectionPool>,
21    /// Request and response interceptors
22    interceptors: Arc<Interceptors>,
23}
24
25/// Client configuration
26#[derive(Clone)]
27pub struct ClientConfig {
28    /// Request timeout
29    pub timeout: Duration,
30    /// Default headers
31    pub default_headers: HeaderMap,
32    /// AVL Platform auth token
33    pub avl_auth: Option<String>,
34    /// Preferred region
35    pub region: Option<String>,
36    /// Enable compression
37    pub compression: bool,
38    /// Maximum redirects
39    pub max_redirects: usize,
40    /// Connection pool configuration
41    pub pool_config: PoolConfig,
42}
43
44impl Default for ClientConfig {
45    fn default() -> Self {
46        Self {
47            timeout: common::DEFAULT_TIMEOUT,
48            default_headers: HeaderMap::new(),
49            avl_auth: None,
50            region: None,
51            compression: false,
52            max_redirects: 5,
53            pool_config: PoolConfig::default(),
54        }
55    }
56}
57
58impl Client {
59    /// Create a new client with default configuration
60    pub fn new() -> Self {
61        let config = ClientConfig::default();
62        let pool = Arc::new(ConnectionPool::with_config(config.pool_config.clone()));
63        Self {
64            config,
65            pool,
66            interceptors: Arc::new(Interceptors::new()),
67        }
68    }
69
70    /// Create a client builder for custom configuration
71    pub fn builder() -> ClientBuilder {
72        ClientBuilder::new()
73    }
74
75    /// Make a GET request
76    pub fn get(&self, url: impl Into<String>) -> RequestBuilder {
77        self.request(Method::GET, url)
78    }
79
80    /// Make a POST request
81    pub fn post(&self, url: impl Into<String>) -> RequestBuilder {
82        self.request(Method::POST, url)
83    }
84
85    /// Make a PUT request
86    pub fn put(&self, url: impl Into<String>) -> RequestBuilder {
87        self.request(Method::PUT, url)
88    }
89
90    /// Make a DELETE request
91    pub fn delete(&self, url: impl Into<String>) -> RequestBuilder {
92        self.request(Method::DELETE, url)
93    }
94
95    /// Make a PATCH request
96    pub fn patch(&self, url: impl Into<String>) -> RequestBuilder {
97        self.request(Method::PATCH, url)
98    }
99
100    /// Make a HEAD request
101    pub fn head(&self, url: impl Into<String>) -> RequestBuilder {
102        self.request(Method::HEAD, url)
103    }
104
105    /// Create a request with custom method
106    pub fn request(&self, method: Method, url: impl Into<String>) -> RequestBuilder {
107        RequestBuilder {
108            client: self.clone(),
109            method,
110            url: url.into(),
111            headers: self.config.default_headers.clone(),
112            body: None,
113            query_params: Vec::new(),
114            timeout: Some(self.config.timeout),
115        }
116    }
117
118    /// Get connection pool statistics
119    pub async fn pool_stats(&self) -> crate::pool::PoolStats {
120        self.pool.stats().await
121    }
122
123    /// Clean up expired connections from the pool
124    pub async fn cleanup_pool(&self) {
125        self.pool.cleanup_expired().await
126    }
127
128    /// Add a request interceptor
129    ///
130    /// The interceptor will be called before each request is sent
131    pub fn on_request<F>(&mut self, interceptor: F)
132    where
133        F: Fn(&mut RequestData) + Send + Sync + 'static,
134    {
135        Arc::get_mut(&mut self.interceptors)
136            .expect("Cannot modify interceptors while client is cloned")
137            .add_request(interceptor);
138    }
139
140    /// Add a response interceptor
141    ///
142    /// The interceptor will be called after each response is received
143    pub fn on_response<F>(&mut self, interceptor: F)
144    where
145        F: Fn(&ResponseData) + Send + Sync + 'static,
146    {
147        Arc::get_mut(&mut self.interceptors)
148            .expect("Cannot modify interceptors while client is cloned")
149            .add_response(interceptor);
150    }
151}
152
153impl Default for Client {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159/// Builder for configuring HTTP client
160pub struct ClientBuilder {
161    config: ClientConfig,
162    interceptors: Interceptors,
163}
164
165impl ClientBuilder {
166    /// Create a new client builder
167    pub fn new() -> Self {
168        Self {
169            config: ClientConfig::default(),
170            interceptors: Interceptors::new(),
171        }
172    }
173
174    /// Set request timeout
175    pub fn timeout(mut self, timeout: Duration) -> Self {
176        self.config.timeout = timeout;
177        self
178    }
179
180    /// Add a default header for all requests
181    pub fn default_header(mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
182        let name = HeaderName::from_bytes(name.as_ref().as_bytes())
183            .map_err(|_| Error::InvalidHeader {
184                name: name.as_ref().to_string(),
185                value: value.as_ref().to_string(),
186            })?;
187        let value = HeaderValue::from_str(value.as_ref())
188            .map_err(|_| Error::InvalidHeader {
189                name: name.to_string(),
190                value: value.as_ref().to_string(),
191            })?;
192        self.config.default_headers.insert(name, value);
193        Ok(self)
194    }
195
196    /// Set AVL Platform authentication token
197    pub fn avl_auth(mut self, token: impl Into<String>) -> Self {
198        self.config.avl_auth = Some(token.into());
199        self
200    }
201
202    /// Set preferred region for regional routing
203    pub fn region(mut self, region: impl Into<String>) -> Self {
204        self.config.region = Some(region.into());
205        self
206    }
207
208    /// Enable automatic compression
209    pub fn compression(mut self, enabled: bool) -> Self {
210        self.config.compression = enabled;
211        self
212    }
213
214    /// Set maximum number of redirects to follow
215    pub fn max_redirects(mut self, max: usize) -> Self {
216        self.config.max_redirects = max;
217        self
218    }
219
220    /// Set connection pool max connections per host
221    pub fn pool_max_connections(mut self, max: usize) -> Self {
222        self.config.pool_config.max_connections_per_host = max;
223        self
224    }
225
226    /// Set connection pool idle timeout
227    pub fn pool_idle_timeout(mut self, timeout: Duration) -> Self {
228        self.config.pool_config.idle_timeout = timeout;
229        self
230    }
231
232    /// Set connection pool connection timeout
233    pub fn pool_connection_timeout(mut self, timeout: Duration) -> Self {
234        self.config.pool_config.connection_timeout = timeout;
235        self
236    }
237
238    /// Enable or disable connection keep-alive
239    pub fn pool_keep_alive(mut self, enabled: bool) -> Self {
240        self.config.pool_config.keep_alive = enabled;
241        self
242    }
243
244    /// Build the client
245    pub fn build(self) -> Result<Client> {
246        let pool = Arc::new(ConnectionPool::with_config(self.config.pool_config.clone()));
247        Ok(Client {
248            config: self.config,
249            pool,
250            interceptors: Arc::new(self.interceptors),
251        })
252    }
253
254    /// Add a request interceptor
255    pub fn on_request<F>(mut self, interceptor: F) -> Self
256    where
257        F: Fn(&mut RequestData) + Send + Sync + 'static,
258    {
259        self.interceptors.add_request(interceptor);
260        self
261    }
262
263    /// Add a response interceptor
264    pub fn on_response<F>(mut self, interceptor: F) -> Self
265    where
266        F: Fn(&ResponseData) + Send + Sync + 'static,
267    {
268        self.interceptors.add_response(interceptor);
269        self
270    }
271}
272
273impl Default for ClientBuilder {
274    fn default() -> Self {
275        Self::new()
276    }
277}
278
279/// Builder for constructing HTTP requests
280pub struct RequestBuilder {
281    client: Client,
282    /// HTTP method
283    pub method: Method,
284    url: String,
285    headers: HeaderMap,
286    body: Option<Bytes>,
287    query_params: Vec<(String, String)>,
288    timeout: Option<Duration>,
289}
290
291impl RequestBuilder {
292    /// Add a header to the request
293    pub fn header(mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
294        let name = HeaderName::from_bytes(name.as_ref().as_bytes())
295            .map_err(|_| Error::InvalidHeader {
296                name: name.as_ref().to_string(),
297                value: value.as_ref().to_string(),
298            })?;
299        let value = HeaderValue::from_str(value.as_ref())
300            .map_err(|_| Error::InvalidHeader {
301                name: name.to_string(),
302                value: value.as_ref().to_string(),
303            })?;
304        self.headers.insert(name, value);
305        Ok(self)
306    }
307
308    /// Set request body
309    pub fn body(mut self, body: impl Into<Bytes>) -> Self {
310        self.body = Some(body.into());
311        self
312    }
313
314    /// Set JSON body
315    pub fn json<T: serde::Serialize>(mut self, json: &T) -> Result<Self> {
316        let json_str = serde_json::to_string(json)
317            .map_err(|e| Error::JsonError { source: e.to_string() })?;
318        self.body = Some(Bytes::from(json_str));
319        self = self.header("Content-Type", "application/json")?;
320        Ok(self)
321    }
322
323    /// Add query parameter
324    pub fn query(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
325        self.query_params.push((key.into(), value.into()));
326        self
327    }
328
329    /// Set request timeout
330    pub fn timeout(mut self, timeout: Duration) -> Self {
331        self.timeout = Some(timeout);
332        self
333    }
334
335    /// Send the request
336    pub async fn send(mut self) -> Result<Response> {
337        let start_time = Instant::now();
338
339        // Build full URL with query params
340        let mut full_url = self.url.clone();
341        if !self.query_params.is_empty() {
342            let query_string = self.query_params
343                .iter()
344                .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
345                .collect::<Vec<_>>()
346                .join("&");
347            full_url = format!("{}?{}", full_url, query_string);
348        }
349
350        // Apply request interceptors
351        let mut request_data = RequestData::new(
352            self.method.clone(),
353            full_url.clone(),
354            self.headers.clone(),
355            self.body.clone(),
356        );
357        self.client.interceptors.apply_request(&mut request_data);
358
359        // Update request with interceptor changes
360        self.headers = request_data.headers;
361        self.body = request_data.body;
362
363        // Parse URL
364        let (host, port, _is_https) = common::parse_url(&full_url)?;
365
366        // Get connection from pool
367        let mut stream = self.client.pool.get_connection(&host, port).await?;
368
369        // Build HTTP request
370        let path = full_url
371            .find("://")
372            .and_then(|pos| full_url[pos + 3..].find('/'))
373            .map(|pos| &full_url[full_url.find("://").unwrap() + 3 + pos..])
374            .unwrap_or("/");
375
376        let mut request = format!("{} {} HTTP/1.1\r\n", self.method, path);
377        request.push_str(&format!("Host: {}\r\n", host));
378        request.push_str("Connection: keep-alive\r\n");
379
380        // Add headers
381        for (name, value) in self.headers.iter() {
382            request.push_str(&format!("{}: {}\r\n", name, value.to_str().unwrap_or("")));
383        }
384
385        // Add AVL auth if configured
386        if let Some(auth) = &self.client.config.avl_auth {
387            request.push_str(&format!("Authorization: Bearer {}\r\n", auth));
388        }
389
390        // Add body
391        if let Some(body) = &self.body {
392            request.push_str(&format!("Content-Length: {}\r\n", body.len()));
393            request.push_str("\r\n");
394        } else {
395            request.push_str("\r\n");
396        }
397
398        // Send request
399        stream.write_all(request.as_bytes()).await?;
400        if let Some(body) = &self.body {
401            stream.write_all(body).await?;
402        }
403
404        // Read response with connection pooling support
405        let response = read_response_with_pool(&mut stream).await?;
406
407        // Return connection to pool for reuse
408        self.client.pool.return_connection(&host, port, stream).await;
409
410        // Apply response interceptors
411        let duration_ms = start_time.elapsed().as_millis() as u64;
412        let response_data = ResponseData::new(
413            response.status.as_u16(),
414            response.headers.clone(),
415            response.body.len(),
416            duration_ms,
417        );
418        self.client.interceptors.apply_response(&response_data);
419
420        Ok(response)
421    }
422}
423
424/// HTTP Response
425pub struct Response {
426    status: StatusCode,
427    headers: HeaderMap,
428    body: Bytes,
429}
430
431impl Response {
432    /// Get response status code
433    pub fn status(&self) -> StatusCode {
434        self.status
435    }
436
437    /// Get response headers
438    pub fn headers(&self) -> &HeaderMap {
439        &self.headers
440    }
441
442    /// Get response body as bytes
443    pub fn bytes(&self) -> &Bytes {
444        &self.body
445    }
446
447    /// Get response body as text
448    pub async fn text(self) -> Result<String> {
449        String::from_utf8(self.body.to_vec())
450            .map_err(|e| Error::Internal { message: e.to_string() })
451    }
452
453    /// Parse response body as JSON
454    pub async fn json<T: serde::de::DeserializeOwned>(self) -> Result<T> {
455        serde_json::from_slice(&self.body)
456            .map_err(|e| Error::JsonError { source: e.to_string() })
457    }
458
459    /// Check if response status is success (2xx)
460    pub fn is_success(&self) -> bool {
461        self.status.is_success()
462    }
463}
464
465async fn read_response_with_pool(stream: &mut TcpStream) -> Result<Response> {
466    // Read status line and headers
467    let mut headers_buf = Vec::new();
468    let mut byte_buf = [0u8; 1];
469
470    // Read until we find \r\n\r\n (end of headers)
471    loop {
472        stream.read_exact(&mut byte_buf).await?;
473        headers_buf.push(byte_buf[0]);
474
475        // Check for \r\n\r\n pattern
476        let len = headers_buf.len();
477        if len >= 4 {
478            if &headers_buf[len - 4..] == b"\r\n\r\n" {
479                break;
480            }
481        }
482
483        // Prevent infinite loop
484        if headers_buf.len() > 8192 {
485            return Err(Error::Internal {
486                message: "Headers too large".to_string(),
487            });
488        }
489    }
490
491    let header_str = String::from_utf8_lossy(&headers_buf[..headers_buf.len() - 4]);
492    let mut lines = header_str.lines();
493
494    // Parse status line
495    let status_line = lines.next().ok_or_else(|| Error::Internal {
496        message: "Empty response".to_string(),
497    })?;
498
499    let status_code = status_line
500        .split_whitespace()
501        .nth(1)
502        .and_then(|s| s.parse::<u16>().ok())
503        .ok_or_else(|| Error::Internal {
504            message: format!("Invalid status line: {}", status_line),
505        })?;
506
507    let status = StatusCode::from_u16(status_code).map_err(|_| Error::Internal {
508        message: format!("Invalid status code: {}", status_code),
509    })?;
510
511    // Parse headers
512    let mut headers = HeaderMap::new();
513    let mut content_length: Option<usize> = None;
514
515    for line in lines {
516        if line.is_empty() {
517            break;
518        }
519
520        if let Some(pos) = line.find(':') {
521            let name = line[..pos].trim();
522            let value = line[pos + 1..].trim();
523
524            // Check for Content-Length
525            if name.eq_ignore_ascii_case("content-length") {
526                content_length = value.parse().ok();
527            }
528
529            if let (Ok(name), Ok(value)) = (
530                HeaderName::from_bytes(name.as_bytes()),
531                HeaderValue::from_str(value),
532            ) {
533                headers.insert(name, value);
534            }
535        }
536    }
537
538    // Read body based on Content-Length
539    let body = if let Some(length) = content_length {
540        if length > 0 {
541            let mut body_buf = vec![0u8; length];
542            stream.read_exact(&mut body_buf).await?;
543            Bytes::from(body_buf)
544        } else {
545            Bytes::new()
546        }
547    } else {
548        // No Content-Length, read until connection closes (fallback)
549        let mut body_buf = Vec::new();
550        let _ = stream.read_to_end(&mut body_buf).await;
551        Bytes::from(body_buf)
552    };
553
554    Ok(Response {
555        status,
556        headers,
557        body,
558    })
559}
560
561
562
563/// Request type (re-export for convenience)
564pub type Request = RequestBuilder;
565
#[cfg(test)]
mod tests {
    use super::*;

    /// Values passed to the builder must end up in the client's configuration.
    #[test]
    fn test_client_builder() {
        let timeout = Duration::from_secs(10);

        let built = Client::builder()
            .timeout(timeout)
            .avl_auth("test-token")
            .region("br-saopaulo-1")
            .compression(true)
            .build();
        let client = built.unwrap();
        let cfg = &client.config;

        assert_eq!(cfg.timeout, timeout);
        assert_eq!(cfg.avl_auth, Some(String::from("test-token")));
        assert_eq!(cfg.region, Some(String::from("br-saopaulo-1")));
        assert!(cfg.compression);
    }

    /// Each verb helper must produce a builder carrying that verb.
    #[test]
    fn test_request_builder_methods() {
        let client = Client::new();
        let url = "https://example.com";

        assert_eq!(client.get(url).method, Method::GET);
        assert_eq!(client.post(url).method, Method::POST);
    }

    /// Query parameters are stored in insertion order.
    #[test]
    fn test_request_with_query_params() {
        let req = Client::new()
            .get("https://api.example.com/data")
            .query("limit", "100")
            .query("offset", "0");

        assert_eq!(req.query_params.len(), 2);

        let (key, value) = &req.query_params[0];
        assert_eq!(key.as_str(), "limit");
        assert_eq!(value.as_str(), "100");
    }
}