1use std::collections::VecDeque;
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40use tokio::sync::{Mutex, RwLock, Semaphore};
41use tokio::time::sleep;
42
43#[derive(Debug, Clone)]
48pub struct PoolConfig {
49 max_connections: usize,
51 idle_timeout: Duration,
53 connect_timeout: Duration,
55 request_timeout: Duration,
57 max_retries: usize,
59 retry_base_delay: Duration,
61 retry_max_delay: Duration,
63 tcp_keepalive: bool,
65 tcp_keepalive_interval: Duration,
67}
68
69impl Default for PoolConfig {
70 #[inline]
71 fn default() -> Self {
72 Self {
73 max_connections: 10,
74 idle_timeout: Duration::from_secs(60),
75 connect_timeout: Duration::from_secs(10),
76 request_timeout: Duration::from_secs(30),
77 max_retries: 3,
78 retry_base_delay: Duration::from_millis(100),
79 retry_max_delay: Duration::from_secs(30),
80 tcp_keepalive: true,
81 tcp_keepalive_interval: Duration::from_secs(60),
82 }
83 }
84}
85
86impl PoolConfig {
87 #[must_use]
89 #[inline]
90 pub fn new() -> Self {
91 Self::default()
92 }
93
94 #[must_use]
96 #[inline]
97 pub fn with_max_connections(mut self, max: usize) -> Self {
98 self.max_connections = max;
99 self
100 }
101
102 #[must_use]
104 #[inline]
105 pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
106 self.idle_timeout = timeout;
107 self
108 }
109
110 #[must_use]
112 #[inline]
113 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
114 self.connect_timeout = timeout;
115 self
116 }
117
118 #[must_use]
120 #[inline]
121 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
122 self.request_timeout = timeout;
123 self
124 }
125
126 #[must_use]
128 #[inline]
129 pub fn with_max_retries(mut self, retries: usize) -> Self {
130 self.max_retries = retries;
131 self
132 }
133
134 #[must_use]
136 #[inline]
137 pub fn with_retry_base_delay(mut self, delay: Duration) -> Self {
138 self.retry_base_delay = delay;
139 self
140 }
141
142 #[must_use]
144 #[inline]
145 pub fn with_tcp_keepalive(mut self, enabled: bool) -> Self {
146 self.tcp_keepalive = enabled;
147 self
148 }
149
150 #[must_use]
152 #[inline]
153 pub const fn max_connections(&self) -> usize {
154 self.max_connections
155 }
156
157 #[must_use]
159 #[inline]
160 pub const fn idle_timeout(&self) -> Duration {
161 self.idle_timeout
162 }
163
164 #[must_use]
166 #[inline]
167 pub const fn connect_timeout(&self) -> Duration {
168 self.connect_timeout
169 }
170
171 #[must_use]
173 #[inline]
174 pub const fn request_timeout(&self) -> Duration {
175 self.request_timeout
176 }
177
178 #[must_use]
180 #[inline]
181 pub const fn max_retries(&self) -> usize {
182 self.max_retries
183 }
184}
185
186#[derive(Debug)]
188struct PooledConnection {
189 #[allow(dead_code)]
191 id: usize,
192 last_used: Instant,
194 requests_served: u64,
196 in_use: bool,
198 client: reqwest::Client,
200}
201
202impl PooledConnection {
203 #[must_use]
205 fn new(id: usize, config: &PoolConfig) -> Self {
206 let mut builder = reqwest::Client::builder()
207 .timeout(config.request_timeout)
208 .connect_timeout(config.connect_timeout)
209 .pool_max_idle_per_host(1)
210 .pool_idle_timeout(config.idle_timeout);
211
212 if config.tcp_keepalive {
213 builder = builder.tcp_keepalive(Some(config.tcp_keepalive_interval));
214 }
215
216 let client = builder.build().unwrap_or_else(|_| reqwest::Client::new());
217
218 Self {
219 id,
220 last_used: Instant::now(),
221 requests_served: 0,
222 in_use: false,
223 client,
224 }
225 }
226
227 #[inline]
229 fn mark_used(&mut self) {
230 self.last_used = Instant::now();
231 self.requests_served += 1;
232 self.in_use = true;
233 }
234
235 #[inline]
237 fn release(&mut self) {
238 self.in_use = false;
239 self.last_used = Instant::now();
240 }
241
242 #[must_use]
244 #[inline]
245 fn is_idle(&self, idle_timeout: Duration) -> bool {
246 !self.in_use && self.last_used.elapsed() > idle_timeout
247 }
248
249 #[must_use]
251 #[inline]
252 #[allow(dead_code)]
253 const fn id(&self) -> usize {
254 self.id
255 }
256
257 #[must_use]
259 #[inline]
260 #[allow(dead_code)]
261 const fn requests_served(&self) -> u64 {
262 self.requests_served
263 }
264}
265
266#[derive(Debug, Clone, Default)]
268pub struct PoolStats {
269 pub total_requests: u64,
271 pub successful_requests: u64,
273 pub failed_requests: u64,
275 pub retried_requests: u64,
277 pub active_connections: usize,
279 pub idle_connections: usize,
281 pub total_connections_created: u64,
283 pub total_connections_closed: u64,
285 pub avg_requests_per_connection: f64,
287}
288
289pub struct ConnectionPool {
291 base_url: String,
293 config: PoolConfig,
295 connections: Arc<RwLock<Vec<PooledConnection>>>,
297 #[allow(dead_code)]
299 request_queue: Arc<Mutex<VecDeque<PendingRequest>>>,
300 connection_semaphore: Arc<Semaphore>,
302 stats: Arc<RwLock<PoolStats>>,
304 next_connection_id: Arc<Mutex<usize>>,
306}
307
308#[derive(Debug)]
310#[allow(dead_code)]
311struct PendingRequest {
312 method: String,
313 path: String,
314 body: Vec<u8>,
315 response_tx: tokio::sync::oneshot::Sender<Result<Vec<u8>, ConnectionError>>,
316}
317
318#[derive(Debug, Clone, thiserror::Error)]
320pub enum ConnectionError {
321 #[error("Request timed out")]
323 Timeout,
324 #[error("Connection failed: {0}")]
326 ConnectionFailed(String),
327 #[error("Request failed after {0} retries")]
329 RetriesExhausted(usize),
330 #[error("Invalid configuration: {0}")]
332 InvalidConfig(String),
333 #[error("HTTP error: {status}")]
335 HttpError { status: u16 },
336 #[error("Response channel closed")]
338 ChannelClosed,
339}
340
341impl ConnectionPool {
342 #[must_use]
344 pub fn new(base_url: impl Into<String>, config: PoolConfig) -> Self {
345 let max_connections = config.max_connections;
346
347 Self {
348 base_url: base_url.into(),
349 config,
350 connections: Arc::new(RwLock::new(Vec::new())),
351 request_queue: Arc::new(Mutex::new(VecDeque::new())),
352 connection_semaphore: Arc::new(Semaphore::new(max_connections)),
353 stats: Arc::new(RwLock::new(PoolStats::default())),
354 next_connection_id: Arc::new(Mutex::new(0)),
355 }
356 }
357
358 pub async fn request(
360 &self,
361 method: &str,
362 path: &str,
363 body: &[u8],
364 ) -> Result<Vec<u8>, ConnectionError> {
365 {
367 let mut stats = self.stats.write().await;
368 stats.total_requests += 1;
369 }
370
371 let mut attempts = 0;
373 let mut last_error = None;
374
375 while attempts <= self.config.max_retries {
376 if attempts > 0 {
377 let mut stats = self.stats.write().await;
379 stats.retried_requests += 1;
380
381 let delay = self.calculate_backoff_delay(attempts);
383 sleep(delay).await;
384 }
385
386 match self.execute_request(method, path, body).await {
387 Ok(response) => {
388 let mut stats = self.stats.write().await;
390 stats.successful_requests += 1;
391
392 return Ok(response);
393 }
394 Err(e) => {
395 last_error = Some(e.clone());
396
397 if !self.should_retry(&e) {
399 break;
400 }
401
402 attempts += 1;
403 }
404 }
405 }
406
407 let mut stats = self.stats.write().await;
409 stats.failed_requests += 1;
410
411 Err(last_error.unwrap_or(ConnectionError::RetriesExhausted(attempts)))
412 }
413
414 async fn execute_request(
416 &self,
417 method: &str,
418 path: &str,
419 body: &[u8],
420 ) -> Result<Vec<u8>, ConnectionError> {
421 let connection = self.acquire_connection().await?;
423
424 let url = format!("{}{}", self.base_url, path);
426
427 let result = tokio::time::timeout(
429 self.config.request_timeout,
430 connection
431 .client
432 .request(method.parse().unwrap_or(reqwest::Method::GET), &url)
433 .body(body.to_vec())
434 .send(),
435 )
436 .await;
437
438 self.release_connection(connection).await;
440
441 match result {
443 Ok(Ok(response)) => {
444 let status = response.status();
445 if status.is_success() {
446 response
447 .bytes()
448 .await
449 .map(|b| b.to_vec())
450 .map_err(|e| ConnectionError::ConnectionFailed(e.to_string()))
451 } else {
452 Err(ConnectionError::HttpError {
453 status: status.as_u16(),
454 })
455 }
456 }
457 Ok(Err(e)) => Err(ConnectionError::ConnectionFailed(e.to_string())),
458 Err(_) => Err(ConnectionError::Timeout),
459 }
460 }
461
462 async fn acquire_connection(&self) -> Result<PooledConnection, ConnectionError> {
464 {
466 let mut connections = self.connections.write().await;
467 if let Some(pos) = connections.iter().position(|c| !c.in_use) {
468 connections[pos].mark_used();
469 return Ok(connections.remove(pos));
470 }
471 }
472
473 if let Ok(_permit) = self.connection_semaphore.try_acquire() {
475 let mut next_id = self.next_connection_id.lock().await;
476 let id = *next_id;
477 *next_id += 1;
478
479 let connection = PooledConnection::new(id, &self.config);
480
481 let mut stats = self.stats.write().await;
482 stats.total_connections_created += 1;
483 stats.active_connections += 1;
484
485 return Ok(connection);
486 }
487
488 let _permit = self
490 .connection_semaphore
491 .acquire()
492 .await
493 .map_err(|_| ConnectionError::InvalidConfig("Semaphore closed".to_string()))?;
494
495 let mut next_id = self.next_connection_id.lock().await;
496 let id = *next_id;
497 *next_id += 1;
498
499 let connection = PooledConnection::new(id, &self.config);
500
501 let mut stats = self.stats.write().await;
502 stats.total_connections_created += 1;
503 stats.active_connections += 1;
504
505 Ok(connection)
506 }
507
508 async fn release_connection(&self, mut connection: PooledConnection) {
510 connection.release();
511
512 if connection.is_idle(self.config.idle_timeout) {
514 let mut stats = self.stats.write().await;
515 stats.total_connections_closed += 1;
516 stats.active_connections = stats.active_connections.saturating_sub(1);
517 return;
518 }
519
520 let mut connections = self.connections.write().await;
522 connections.push(connection);
523 }
524
525 #[must_use]
527 #[inline]
528 fn calculate_backoff_delay(&self, attempt: usize) -> Duration {
529 let delay_ms = self.config.retry_base_delay.as_millis() as u64 * 2u64.pow(attempt as u32);
530 let delay = Duration::from_millis(delay_ms);
531 delay.min(self.config.retry_max_delay)
532 }
533
534 #[must_use]
536 #[inline]
537 fn should_retry(&self, error: &ConnectionError) -> bool {
538 matches!(
539 error,
540 ConnectionError::Timeout | ConnectionError::ConnectionFailed(_)
541 )
542 }
543
544 pub async fn stats(&self) -> PoolStats {
546 let mut stats = self.stats.read().await.clone();
547
548 let connections = self.connections.read().await;
550 stats.active_connections = connections.iter().filter(|c| c.in_use).count();
551 stats.idle_connections = connections.iter().filter(|c| !c.in_use).count();
552
553 if stats.total_connections_created > 0 {
555 stats.avg_requests_per_connection =
556 stats.total_requests as f64 / stats.total_connections_created as f64;
557 }
558
559 stats
560 }
561
562 pub async fn close_idle_connections(&self) {
564 let mut connections = self.connections.write().await;
565 let idle_timeout = self.config.idle_timeout;
566
567 let closed_count = connections
568 .iter()
569 .filter(|conn| conn.is_idle(idle_timeout))
570 .count();
571
572 connections.retain(|conn| !conn.is_idle(idle_timeout));
573
574 if closed_count > 0 {
576 let mut stats = self.stats.write().await;
577 stats.total_connections_closed += closed_count as u64;
578 stats.active_connections = stats.active_connections.saturating_sub(closed_count);
579 }
580 }
581
582 #[must_use]
584 #[inline]
585 pub fn config(&self) -> &PoolConfig {
586 &self.config
587 }
588
589 #[must_use]
591 #[inline]
592 pub fn base_url(&self) -> &str {
593 &self.base_url
594 }
595}
596
597#[cfg(test)]
598mod tests {
599 use super::*;
600
601 #[test]
602 fn test_pool_config_default() {
603 let config = PoolConfig::default();
604 assert_eq!(config.max_connections(), 10);
605 assert_eq!(config.idle_timeout(), Duration::from_secs(60));
606 assert_eq!(config.connect_timeout(), Duration::from_secs(10));
607 assert_eq!(config.request_timeout(), Duration::from_secs(30));
608 assert_eq!(config.max_retries(), 3);
609 }
610
611 #[test]
612 fn test_pool_config_builder() {
613 let config = PoolConfig::new()
614 .with_max_connections(20)
615 .with_idle_timeout(Duration::from_secs(120))
616 .with_connect_timeout(Duration::from_secs(5))
617 .with_request_timeout(Duration::from_secs(60))
618 .with_max_retries(5)
619 .with_tcp_keepalive(false);
620
621 assert_eq!(config.max_connections(), 20);
622 assert_eq!(config.idle_timeout(), Duration::from_secs(120));
623 assert_eq!(config.connect_timeout(), Duration::from_secs(5));
624 assert_eq!(config.request_timeout(), Duration::from_secs(60));
625 assert_eq!(config.max_retries(), 5);
626 }
627
628 #[test]
629 fn test_pooled_connection_creation() {
630 let config = PoolConfig::default();
631 let conn = PooledConnection::new(0, &config);
632 assert_eq!(conn.id(), 0);
633 assert_eq!(conn.requests_served(), 0);
634 assert!(!conn.in_use);
635 }
636
637 #[test]
638 fn test_pooled_connection_mark_used() {
639 let config = PoolConfig::default();
640 let mut conn = PooledConnection::new(0, &config);
641 conn.mark_used();
642 assert!(conn.in_use);
643 assert_eq!(conn.requests_served(), 1);
644 }
645
646 #[test]
647 fn test_pooled_connection_release() {
648 let config = PoolConfig::default();
649 let mut conn = PooledConnection::new(0, &config);
650 conn.mark_used();
651 conn.release();
652 assert!(!conn.in_use);
653 assert_eq!(conn.requests_served(), 1);
654 }
655
656 #[test]
657 fn test_pooled_connection_idle() {
658 let config = PoolConfig::default();
659 let conn = PooledConnection::new(0, &config);
660 assert!(!conn.is_idle(Duration::from_millis(1)));
662 }
663
664 #[test]
665 fn test_calculate_backoff_delay() {
666 let config = PoolConfig::default();
667 let pool = ConnectionPool::new("http://localhost", config);
668
669 let delay0 = pool.calculate_backoff_delay(0);
670 let delay1 = pool.calculate_backoff_delay(1);
671 let delay2 = pool.calculate_backoff_delay(2);
672
673 assert_eq!(delay0, Duration::from_millis(100)); assert_eq!(delay1, Duration::from_millis(200)); assert_eq!(delay2, Duration::from_millis(400)); }
677
678 #[test]
679 fn test_should_retry() {
680 let config = PoolConfig::default();
681 let pool = ConnectionPool::new("http://localhost", config);
682
683 assert!(pool.should_retry(&ConnectionError::Timeout));
684 assert!(pool.should_retry(&ConnectionError::ConnectionFailed("test".to_string())));
685 assert!(!pool.should_retry(&ConnectionError::HttpError { status: 400 }));
686 }
687
688 #[tokio::test]
689 async fn test_pool_creation() {
690 let config = PoolConfig::default();
691 let pool = ConnectionPool::new("http://localhost:8080", config);
692 assert_eq!(pool.base_url(), "http://localhost:8080");
693 }
694
695 #[tokio::test]
696 async fn test_pool_stats_initial() {
697 let config = PoolConfig::default();
698 let pool = ConnectionPool::new("http://localhost:8080", config);
699 let stats = pool.stats().await;
700 assert_eq!(stats.total_requests, 0);
701 assert_eq!(stats.successful_requests, 0);
702 assert_eq!(stats.failed_requests, 0);
703 assert_eq!(stats.active_connections, 0);
704 assert_eq!(stats.idle_connections, 0);
705 }
706
707 #[tokio::test]
708 async fn test_pool_close_idle_connections() {
709 let config = PoolConfig::default().with_idle_timeout(Duration::from_millis(10));
710 let pool = ConnectionPool::new("http://localhost:8080", config);
711
712 let stats = pool.stats().await;
714 assert_eq!(stats.idle_connections, 0);
715
716 pool.close_idle_connections().await;
718
719 let stats = pool.stats().await;
720 assert_eq!(stats.idle_connections, 0);
721 }
722}