1use crate::error::{Error, Result};
4use crate::common;
5use crate::pool::{ConnectionPool, PoolConfig};
6use crate::interceptors::{Interceptors, RequestData, ResponseData};
7use bytes::Bytes;
8use http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::net::TcpStream;
13
14#[derive(Clone)]
16pub struct Client {
17 pub config: ClientConfig,
19 pool: Arc<ConnectionPool>,
21 interceptors: Arc<Interceptors>,
23}
24
25#[derive(Clone)]
27pub struct ClientConfig {
28 pub timeout: Duration,
30 pub default_headers: HeaderMap,
32 pub avl_auth: Option<String>,
34 pub region: Option<String>,
36 pub compression: bool,
38 pub max_redirects: usize,
40 pub pool_config: PoolConfig,
42}
43
44impl Default for ClientConfig {
45 fn default() -> Self {
46 Self {
47 timeout: common::DEFAULT_TIMEOUT,
48 default_headers: HeaderMap::new(),
49 avl_auth: None,
50 region: None,
51 compression: false,
52 max_redirects: 5,
53 pool_config: PoolConfig::default(),
54 }
55 }
56}
57
58impl Client {
59 pub fn new() -> Self {
61 let config = ClientConfig::default();
62 let pool = Arc::new(ConnectionPool::with_config(config.pool_config.clone()));
63 Self {
64 config,
65 pool,
66 interceptors: Arc::new(Interceptors::new()),
67 }
68 }
69
70 pub fn builder() -> ClientBuilder {
72 ClientBuilder::new()
73 }
74
75 pub fn get(&self, url: impl Into<String>) -> RequestBuilder {
77 self.request(Method::GET, url)
78 }
79
80 pub fn post(&self, url: impl Into<String>) -> RequestBuilder {
82 self.request(Method::POST, url)
83 }
84
85 pub fn put(&self, url: impl Into<String>) -> RequestBuilder {
87 self.request(Method::PUT, url)
88 }
89
90 pub fn delete(&self, url: impl Into<String>) -> RequestBuilder {
92 self.request(Method::DELETE, url)
93 }
94
95 pub fn patch(&self, url: impl Into<String>) -> RequestBuilder {
97 self.request(Method::PATCH, url)
98 }
99
100 pub fn head(&self, url: impl Into<String>) -> RequestBuilder {
102 self.request(Method::HEAD, url)
103 }
104
105 pub fn request(&self, method: Method, url: impl Into<String>) -> RequestBuilder {
107 RequestBuilder {
108 client: self.clone(),
109 method,
110 url: url.into(),
111 headers: self.config.default_headers.clone(),
112 body: None,
113 query_params: Vec::new(),
114 timeout: Some(self.config.timeout),
115 }
116 }
117
118 pub async fn pool_stats(&self) -> crate::pool::PoolStats {
120 self.pool.stats().await
121 }
122
123 pub async fn cleanup_pool(&self) {
125 self.pool.cleanup_expired().await
126 }
127
128 pub fn on_request<F>(&mut self, interceptor: F)
132 where
133 F: Fn(&mut RequestData) + Send + Sync + 'static,
134 {
135 Arc::get_mut(&mut self.interceptors)
136 .expect("Cannot modify interceptors while client is cloned")
137 .add_request(interceptor);
138 }
139
140 pub fn on_response<F>(&mut self, interceptor: F)
144 where
145 F: Fn(&ResponseData) + Send + Sync + 'static,
146 {
147 Arc::get_mut(&mut self.interceptors)
148 .expect("Cannot modify interceptors while client is cloned")
149 .add_response(interceptor);
150 }
151}
152
153impl Default for Client {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
159pub struct ClientBuilder {
161 config: ClientConfig,
162 interceptors: Interceptors,
163}
164
165impl ClientBuilder {
166 pub fn new() -> Self {
168 Self {
169 config: ClientConfig::default(),
170 interceptors: Interceptors::new(),
171 }
172 }
173
174 pub fn timeout(mut self, timeout: Duration) -> Self {
176 self.config.timeout = timeout;
177 self
178 }
179
180 pub fn default_header(mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
182 let name = HeaderName::from_bytes(name.as_ref().as_bytes())
183 .map_err(|_| Error::InvalidHeader {
184 name: name.as_ref().to_string(),
185 value: value.as_ref().to_string(),
186 })?;
187 let value = HeaderValue::from_str(value.as_ref())
188 .map_err(|_| Error::InvalidHeader {
189 name: name.to_string(),
190 value: value.as_ref().to_string(),
191 })?;
192 self.config.default_headers.insert(name, value);
193 Ok(self)
194 }
195
196 pub fn avl_auth(mut self, token: impl Into<String>) -> Self {
198 self.config.avl_auth = Some(token.into());
199 self
200 }
201
202 pub fn region(mut self, region: impl Into<String>) -> Self {
204 self.config.region = Some(region.into());
205 self
206 }
207
208 pub fn compression(mut self, enabled: bool) -> Self {
210 self.config.compression = enabled;
211 self
212 }
213
214 pub fn max_redirects(mut self, max: usize) -> Self {
216 self.config.max_redirects = max;
217 self
218 }
219
220 pub fn pool_max_connections(mut self, max: usize) -> Self {
222 self.config.pool_config.max_connections_per_host = max;
223 self
224 }
225
226 pub fn pool_idle_timeout(mut self, timeout: Duration) -> Self {
228 self.config.pool_config.idle_timeout = timeout;
229 self
230 }
231
232 pub fn pool_connection_timeout(mut self, timeout: Duration) -> Self {
234 self.config.pool_config.connection_timeout = timeout;
235 self
236 }
237
238 pub fn pool_keep_alive(mut self, enabled: bool) -> Self {
240 self.config.pool_config.keep_alive = enabled;
241 self
242 }
243
244 pub fn build(self) -> Result<Client> {
246 let pool = Arc::new(ConnectionPool::with_config(self.config.pool_config.clone()));
247 Ok(Client {
248 config: self.config,
249 pool,
250 interceptors: Arc::new(self.interceptors),
251 })
252 }
253
254 pub fn on_request<F>(mut self, interceptor: F) -> Self
256 where
257 F: Fn(&mut RequestData) + Send + Sync + 'static,
258 {
259 self.interceptors.add_request(interceptor);
260 self
261 }
262
263 pub fn on_response<F>(mut self, interceptor: F) -> Self
265 where
266 F: Fn(&ResponseData) + Send + Sync + 'static,
267 {
268 self.interceptors.add_response(interceptor);
269 self
270 }
271}
272
273impl Default for ClientBuilder {
274 fn default() -> Self {
275 Self::new()
276 }
277}
278
279pub struct RequestBuilder {
281 client: Client,
282 pub method: Method,
284 url: String,
285 headers: HeaderMap,
286 body: Option<Bytes>,
287 query_params: Vec<(String, String)>,
288 timeout: Option<Duration>,
289}
290
291impl RequestBuilder {
292 pub fn header(mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> Result<Self> {
294 let name = HeaderName::from_bytes(name.as_ref().as_bytes())
295 .map_err(|_| Error::InvalidHeader {
296 name: name.as_ref().to_string(),
297 value: value.as_ref().to_string(),
298 })?;
299 let value = HeaderValue::from_str(value.as_ref())
300 .map_err(|_| Error::InvalidHeader {
301 name: name.to_string(),
302 value: value.as_ref().to_string(),
303 })?;
304 self.headers.insert(name, value);
305 Ok(self)
306 }
307
308 pub fn body(mut self, body: impl Into<Bytes>) -> Self {
310 self.body = Some(body.into());
311 self
312 }
313
314 pub fn json<T: serde::Serialize>(mut self, json: &T) -> Result<Self> {
316 let json_str = serde_json::to_string(json)
317 .map_err(|e| Error::JsonError { source: e.to_string() })?;
318 self.body = Some(Bytes::from(json_str));
319 self = self.header("Content-Type", "application/json")?;
320 Ok(self)
321 }
322
323 pub fn query(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
325 self.query_params.push((key.into(), value.into()));
326 self
327 }
328
329 pub fn timeout(mut self, timeout: Duration) -> Self {
331 self.timeout = Some(timeout);
332 self
333 }
334
335 pub async fn send(mut self) -> Result<Response> {
337 let start_time = Instant::now();
338
339 let mut full_url = self.url.clone();
341 if !self.query_params.is_empty() {
342 let query_string = self.query_params
343 .iter()
344 .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
345 .collect::<Vec<_>>()
346 .join("&");
347 full_url = format!("{}?{}", full_url, query_string);
348 }
349
350 let mut request_data = RequestData::new(
352 self.method.clone(),
353 full_url.clone(),
354 self.headers.clone(),
355 self.body.clone(),
356 );
357 self.client.interceptors.apply_request(&mut request_data);
358
359 self.headers = request_data.headers;
361 self.body = request_data.body;
362
363 let (host, port, _is_https) = common::parse_url(&full_url)?;
365
366 let mut stream = self.client.pool.get_connection(&host, port).await?;
368
369 let path = full_url
371 .find("://")
372 .and_then(|pos| full_url[pos + 3..].find('/'))
373 .map(|pos| &full_url[full_url.find("://").unwrap() + 3 + pos..])
374 .unwrap_or("/");
375
376 let mut request = format!("{} {} HTTP/1.1\r\n", self.method, path);
377 request.push_str(&format!("Host: {}\r\n", host));
378 request.push_str("Connection: keep-alive\r\n");
379
380 for (name, value) in self.headers.iter() {
382 request.push_str(&format!("{}: {}\r\n", name, value.to_str().unwrap_or("")));
383 }
384
385 if let Some(auth) = &self.client.config.avl_auth {
387 request.push_str(&format!("Authorization: Bearer {}\r\n", auth));
388 }
389
390 if let Some(body) = &self.body {
392 request.push_str(&format!("Content-Length: {}\r\n", body.len()));
393 request.push_str("\r\n");
394 } else {
395 request.push_str("\r\n");
396 }
397
398 stream.write_all(request.as_bytes()).await?;
400 if let Some(body) = &self.body {
401 stream.write_all(body).await?;
402 }
403
404 let response = read_response_with_pool(&mut stream).await?;
406
407 self.client.pool.return_connection(&host, port, stream).await;
409
410 let duration_ms = start_time.elapsed().as_millis() as u64;
412 let response_data = ResponseData::new(
413 response.status.as_u16(),
414 response.headers.clone(),
415 response.body.len(),
416 duration_ms,
417 );
418 self.client.interceptors.apply_response(&response_data);
419
420 Ok(response)
421 }
422}
423
424pub struct Response {
426 status: StatusCode,
427 headers: HeaderMap,
428 body: Bytes,
429}
430
431impl Response {
432 pub fn status(&self) -> StatusCode {
434 self.status
435 }
436
437 pub fn headers(&self) -> &HeaderMap {
439 &self.headers
440 }
441
442 pub fn bytes(&self) -> &Bytes {
444 &self.body
445 }
446
447 pub async fn text(self) -> Result<String> {
449 String::from_utf8(self.body.to_vec())
450 .map_err(|e| Error::Internal { message: e.to_string() })
451 }
452
453 pub async fn json<T: serde::de::DeserializeOwned>(self) -> Result<T> {
455 serde_json::from_slice(&self.body)
456 .map_err(|e| Error::JsonError { source: e.to_string() })
457 }
458
459 pub fn is_success(&self) -> bool {
461 self.status.is_success()
462 }
463}
464
465async fn read_response_with_pool(stream: &mut TcpStream) -> Result<Response> {
466 let mut headers_buf = Vec::new();
468 let mut byte_buf = [0u8; 1];
469
470 loop {
472 stream.read_exact(&mut byte_buf).await?;
473 headers_buf.push(byte_buf[0]);
474
475 let len = headers_buf.len();
477 if len >= 4 {
478 if &headers_buf[len - 4..] == b"\r\n\r\n" {
479 break;
480 }
481 }
482
483 if headers_buf.len() > 8192 {
485 return Err(Error::Internal {
486 message: "Headers too large".to_string(),
487 });
488 }
489 }
490
491 let header_str = String::from_utf8_lossy(&headers_buf[..headers_buf.len() - 4]);
492 let mut lines = header_str.lines();
493
494 let status_line = lines.next().ok_or_else(|| Error::Internal {
496 message: "Empty response".to_string(),
497 })?;
498
499 let status_code = status_line
500 .split_whitespace()
501 .nth(1)
502 .and_then(|s| s.parse::<u16>().ok())
503 .ok_or_else(|| Error::Internal {
504 message: format!("Invalid status line: {}", status_line),
505 })?;
506
507 let status = StatusCode::from_u16(status_code).map_err(|_| Error::Internal {
508 message: format!("Invalid status code: {}", status_code),
509 })?;
510
511 let mut headers = HeaderMap::new();
513 let mut content_length: Option<usize> = None;
514
515 for line in lines {
516 if line.is_empty() {
517 break;
518 }
519
520 if let Some(pos) = line.find(':') {
521 let name = line[..pos].trim();
522 let value = line[pos + 1..].trim();
523
524 if name.eq_ignore_ascii_case("content-length") {
526 content_length = value.parse().ok();
527 }
528
529 if let (Ok(name), Ok(value)) = (
530 HeaderName::from_bytes(name.as_bytes()),
531 HeaderValue::from_str(value),
532 ) {
533 headers.insert(name, value);
534 }
535 }
536 }
537
538 let body = if let Some(length) = content_length {
540 if length > 0 {
541 let mut body_buf = vec![0u8; length];
542 stream.read_exact(&mut body_buf).await?;
543 Bytes::from(body_buf)
544 } else {
545 Bytes::new()
546 }
547 } else {
548 let mut body_buf = Vec::new();
550 let _ = stream.read_to_end(&mut body_buf).await;
551 Bytes::from(body_buf)
552 };
553
554 Ok(Response {
555 status,
556 headers,
557 body,
558 })
559}
560
561
562
563pub type Request = RequestBuilder;
565
#[cfg(test)]
mod tests {
    use super::*;

    /// Values passed to the builder must show up in the built client's config.
    #[test]
    fn test_client_builder() {
        let ten_seconds = Duration::from_secs(10);

        let built = Client::builder()
            .timeout(ten_seconds)
            .avl_auth("test-token")
            .region("br-saopaulo-1")
            .compression(true)
            .build()
            .unwrap();

        let settings = &built.config;
        assert_eq!(settings.timeout, ten_seconds);
        assert_eq!(settings.avl_auth, Some(String::from("test-token")));
        assert_eq!(settings.region, Some(String::from("br-saopaulo-1")));
        assert!(settings.compression);
    }

    /// The per-method shorthands must set the matching HTTP method.
    #[test]
    fn test_request_builder_methods() {
        let client = Client::new();
        let target = "https://example.com";

        assert_eq!(client.get(target).method, Method::GET);
        assert_eq!(client.post(target).method, Method::POST);
    }

    /// Query parameters must be recorded in insertion order.
    #[test]
    fn test_request_with_query_params() {
        let request = Client::new()
            .get("https://api.example.com/data")
            .query("limit", "100")
            .query("offset", "0");

        let expected_first = (String::from("limit"), String::from("100"));
        assert_eq!(request.query_params.len(), 2);
        assert_eq!(request.query_params[0], expected_first);
    }
}