// reqwest_management_pool/lib.rs
//! # ReqwestManagementPool
//!
//! A client management pool based on Reqwest and Tokio that ensures
//! clients with different configurations can be reused, reducing
//! memory consumption.
//!
//! ## Features
//!
//! - Automatically manages the reqwest client connection pool
//! - Supports connection reuse, improving performance
//! - Automatically releases and cleans up idle connections
//! - Thread-safe, supporting concurrent access
//! - Automatically expands and shrinks the pool size
//!
//! ## Usage Examples
//!
//! ```no_run
//! use reqwest_management_pool::ClientPool;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Create and initialize the connection pool
//!     let pool = ClientPool::new();
//!
//!     // Get a client from the pool
//!     if let Some(client) = pool.malloc().await {
//!         // Send a request using the borrowed client.
//!         let response = client.get()
//!             .get("https://example.com")
//!             .send()
//!             .await?;
//!
//!         // The client is automatically released back into the pool when it goes out of scope.
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Configuration
//!
//! Pool sizing and timing behavior are controlled by the module-level
//! constants below (`MIN_IDLE_SIZE`, `MAX_POOL_SIZE`, `IDLE_TIMEOUT`,
//! `CLEANUP_TIMEOUT`, `MONITOR_INTERVAL`).
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use reqwest::Client;
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc};
use tokio::time::{self, Duration, Instant};
46
// Configuration constants
const MIN_IDLE_SIZE: usize = 8; // Minimum number of clients to keep in the pool
const MAX_POOL_SIZE: usize = 100; // Maximum total clients (active + idle)
const IDLE_TIMEOUT: Duration = Duration::from_secs(120); // Time before an idle client is retired
const CLEANUP_TIMEOUT: Duration = Duration::from_secs(90); // Time between idle client cleanup runs
const MONITOR_INTERVAL: Duration = Duration::from_secs(150); // Time between pool metrics reports
53/// Internal wrapper for an idle client with a timestamp
54struct IdleClient {
55    client: Client,
56    last_used: Instant,
57}
58
59/// The main structure for the HTTP client management pool
60#[derive(Clone)]
61pub struct ClientPool {
62    semaphore: Arc<Semaphore>,
63    idle_rx: Arc<Mutex<mpsc::Receiver<IdleClient>>>,
64    release_tx: mpsc::Sender<IdleClient>,
65
66    // Thread-safe counters for monitoring
67    idle_count: Arc<AtomicUsize>,  // Current clients sitting in the pool
68    total_count: Arc<AtomicUsize>, // Total clients managed (idle + working)
69}
70
71/// A smart wrapper for a borrowed client. Returns to pool automatically on Drop.
72pub struct PooledClientInner {
73    client: Option<Client>,
74    _permit: OwnedSemaphorePermit, // Holds the slot in the semaphore
75    release_tx: mpsc::Sender<IdleClient>,
76    idle_count: Arc<AtomicUsize>,
77}
78
79impl PooledClientInner {
80    /// Get a reference to the inner reqwest Client
81    pub fn get(&self) -> &Client {
82        self.client.as_ref().expect("Client should be present")
83    }
84}
85
86impl Drop for PooledClientInner {
87    /// When the user is done with the client, send it back to the idle queue
88    fn drop(&mut self) {
89        if let Some(client) = self.client.take() {
90            let tx = self.release_tx.clone();
91            let count_ref = self.idle_count.clone();
92            let idle = IdleClient {
93                client,
94                last_used: Instant::now(),
95            };
96
97            tokio::spawn(async move {
98                if let Ok(_) = tx.send(idle).await {
99                    // Successfully returned to pool, increment idle count
100                    count_ref.fetch_add(1, Ordering::Relaxed);
101                }
102            });
103        }
104    }
105}
106
107impl ClientPool {
108    /// Create a new ClientPool and initialize core clients
109    pub fn new() -> Self {
110        let (tx, rx) = mpsc::channel(MAX_POOL_SIZE);
111        let idle_count = Arc::new(AtomicUsize::new(0));
112        let total_count = Arc::new(AtomicUsize::new(0));
113
114        // Pre-fill the pool with the minimum idle clients
115        for _ in 0..MIN_IDLE_SIZE {
116            let _ = tx.try_send(IdleClient {
117                client: Client::new(),
118                last_used: Instant::now(),
119            });
120            idle_count.fetch_add(1, Ordering::Relaxed);
121            total_count.fetch_add(1, Ordering::Relaxed);
122        }
123
124        let pool = Self {
125            semaphore: Arc::new(Semaphore::new(MAX_POOL_SIZE)),
126            idle_rx: Arc::new(Mutex::new(rx)),
127            release_tx: tx,
128            idle_count,
129            total_count,
130        };
131
132        // Start background maintenance tasks
133        pool.start_cleanup_task();
134        pool.start_monitor_reporter(MONITOR_INTERVAL);
135
136        pool
137    }
138
139    /// Background task to clean up expired idle clients (Scale Down)
140    fn start_cleanup_task(&self) {
141        let idle_rx = self.idle_rx.clone();
142        let release_tx = self.release_tx.clone();
143        let idle_count = self.idle_count.clone();
144        let total_count = self.total_count.clone();
145
146        tokio::spawn(async move {
147            let mut interval = time::interval(CLEANUP_TIMEOUT);
148            loop {
149                interval.tick().await;
150                let mut rx = idle_rx.lock().await;
151                let mut kept = Vec::new();
152
153                // Drain the channel to inspect all idle clients
154                while let Ok(idle) = rx.try_recv() {
155                    idle_count.fetch_sub(1, Ordering::Relaxed);
156
157                    if idle.last_used.elapsed() < IDLE_TIMEOUT || kept.len() < MIN_IDLE_SIZE {
158                        kept.push(idle);
159                    } else {
160                        // Retired: Decrement the total count of clients in the system
161                        total_count.fetch_sub(1, Ordering::Relaxed);
162                    }
163                }
164
165                // Push kept clients back into the channel
166                for item in kept {
167                    let _ = release_tx.send(item).await;
168                    idle_count.fetch_add(1, Ordering::Relaxed);
169                }
170            }
171        });
172    }
173
174    /// Periodically prints pool metrics to the console
175    pub fn start_monitor_reporter(&self, interval_dur: Duration) {
176        let pool = self.clone();
177        tokio::spawn(async move {
178            let mut interval = time::interval(interval_dur);
179            loop {
180                interval.tick().await;
181                let total = pool.total_count();
182                let idle = pool.idle_count();
183                let working = pool.working_count();
184
185                println!(
186                    "[POOL MONITOR] Total: {}, Idle: {}, Working: {}, Load: {:.1}%",
187                    total,
188                    idle,
189                    working,
190                    if total > 0 {
191                        (working as f64 / total as f64) * 100.0
192                    } else {
193                        0.0
194                    }
195                );
196            }
197        });
198    }
199
200    /// Acquire a client from the pool or create a new one if permitted
201    pub async fn malloc(&self) -> Option<PooledClientInner> {
202        // Wait for a permit from the semaphore (limit global concurrency)
203        let permit = self.semaphore.clone().acquire_owned().await.ok()?;
204
205        let mut rx = self.idle_rx.lock().await;
206        let client = match rx.try_recv() {
207            Ok(idle) => {
208                // Reuse existing client
209                self.idle_count.fetch_sub(1, Ordering::Relaxed);
210                idle.client
211            }
212            Err(_) => {
213                // Pool is empty but semaphore allowed it: create new client
214                self.total_count.fetch_add(1, Ordering::Relaxed);
215                Client::new()
216            }
217        };
218
219        Some(PooledClientInner {
220            client: Some(client),
221            _permit: permit,
222            release_tx: self.release_tx.clone(),
223            idle_count: self.idle_count.clone(),
224        })
225    }
226
227    // --- Metric API ---
228
229    /// Returns the number of clients currently available in the pool
230    pub fn idle_count(&self) -> usize {
231        self.idle_count.load(Ordering::Relaxed)
232    }
233
234    /// Returns the total number of clients managed by this pool
235    pub fn total_count(&self) -> usize {
236        self.total_count.load(Ordering::Relaxed)
237    }
238
239    /// Returns the number of clients currently borrowed and in use
240    pub fn working_count(&self) -> usize {
241        self.total_count().saturating_sub(self.idle_count())
242    }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::sleep;

    /// End-to-end smoke test: drive the pool under load, then watch the
    /// cleanup task scale the idle set back down. This is a long-running
    /// observational test (several minutes), not a fast unit test.
    #[tokio::test]
    async fn test() {
        // 1. Initialize the pool (metrics printed by the monitor reporter)
        let pool = ClientPool::new();
        println!(">>> Test Started: Simulating High Load <<<");

        // 2. Spawn concurrent tasks to consume clients
        for i in 0..40 {
            let p = pool.clone();
            tokio::spawn(async move {
                if let Some(_client) = p.malloc().await {
                    // Use the client for a simulated request
                    sleep(Duration::from_secs(2)).await;
                    if i % 10 == 0 {
                        println!("   [Task {}] Request completed.", i);
                    }
                }
            });
            // Stagger task starts
            sleep(Duration::from_millis(50)).await;
        }

        // 3. Observe the Scaling Down phase. 240s covers both the cleanup
        //    interval (90s) and the idle timeout (120s).
        println!(">>> Load Phase Ended: Waiting for Scale-Down (240s) <<<");
        for sec in 1..=240 {
            sleep(Duration::from_secs(1)).await;
            if sec % 5 == 0 {
                println!(
                    "   Snapshot at {}s: Working={}, Idle={}",
                    sec,
                    pool.working_count(),
                    pool.idle_count()
                );
            }
        }

        println!(">>> Test Finished <<<");
    }
}