reqwest_management_pool/
lib.rs1use reqwest::Client;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicUsize, Ordering};
44use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc};
45use tokio::time::{self, Duration, Instant};
46
47const MIN_IDLE_SIZE: usize = 8; const MAX_POOL_SIZE: usize = 100; const IDLE_TIMEOUT: Duration = Duration::from_secs(120); const CLEANUP_TIMEOUT: Duration = Duration::from_secs(90); const MONITOR_INTERVAL: Duration = Duration::from_secs(150); struct IdleClient {
55 client: Client,
56 last_used: Instant,
57}
58
59#[derive(Clone)]
61pub struct ClientPool {
62 semaphore: Arc<Semaphore>,
63 idle_rx: Arc<Mutex<mpsc::Receiver<IdleClient>>>,
64 release_tx: mpsc::Sender<IdleClient>,
65
66 idle_count: Arc<AtomicUsize>, total_count: Arc<AtomicUsize>, }
70
71pub struct PooledClientInner {
73 client: Option<Client>,
74 _permit: OwnedSemaphorePermit, release_tx: mpsc::Sender<IdleClient>,
76 idle_count: Arc<AtomicUsize>,
77}
78
79impl PooledClientInner {
80 pub fn get(&self) -> &Client {
82 self.client.as_ref().expect("Client should be present")
83 }
84}
85
86impl Drop for PooledClientInner {
87 fn drop(&mut self) {
89 if let Some(client) = self.client.take() {
90 let tx = self.release_tx.clone();
91 let count_ref = self.idle_count.clone();
92 let idle = IdleClient {
93 client,
94 last_used: Instant::now(),
95 };
96
97 tokio::spawn(async move {
98 if let Ok(_) = tx.send(idle).await {
99 count_ref.fetch_add(1, Ordering::Relaxed);
101 }
102 });
103 }
104 }
105}
106
107impl ClientPool {
108 pub fn new() -> Self {
110 let (tx, rx) = mpsc::channel(MAX_POOL_SIZE);
111 let idle_count = Arc::new(AtomicUsize::new(0));
112 let total_count = Arc::new(AtomicUsize::new(0));
113
114 for _ in 0..MIN_IDLE_SIZE {
116 let _ = tx.try_send(IdleClient {
117 client: Client::new(),
118 last_used: Instant::now(),
119 });
120 idle_count.fetch_add(1, Ordering::Relaxed);
121 total_count.fetch_add(1, Ordering::Relaxed);
122 }
123
124 let pool = Self {
125 semaphore: Arc::new(Semaphore::new(MAX_POOL_SIZE)),
126 idle_rx: Arc::new(Mutex::new(rx)),
127 release_tx: tx,
128 idle_count,
129 total_count,
130 };
131
132 pool.start_cleanup_task();
134 pool.start_monitor_reporter(MONITOR_INTERVAL);
135
136 pool
137 }
138
139 fn start_cleanup_task(&self) {
141 let idle_rx = self.idle_rx.clone();
142 let release_tx = self.release_tx.clone();
143 let idle_count = self.idle_count.clone();
144 let total_count = self.total_count.clone();
145
146 tokio::spawn(async move {
147 let mut interval = time::interval(CLEANUP_TIMEOUT);
148 loop {
149 interval.tick().await;
150 let mut rx = idle_rx.lock().await;
151 let mut kept = Vec::new();
152
153 while let Ok(idle) = rx.try_recv() {
155 idle_count.fetch_sub(1, Ordering::Relaxed);
156
157 if idle.last_used.elapsed() < IDLE_TIMEOUT || kept.len() < MIN_IDLE_SIZE {
158 kept.push(idle);
159 } else {
160 total_count.fetch_sub(1, Ordering::Relaxed);
162 }
163 }
164
165 for item in kept {
167 let _ = release_tx.send(item).await;
168 idle_count.fetch_add(1, Ordering::Relaxed);
169 }
170 }
171 });
172 }
173
174 pub fn start_monitor_reporter(&self, interval_dur: Duration) {
176 let pool = self.clone();
177 tokio::spawn(async move {
178 let mut interval = time::interval(interval_dur);
179 loop {
180 interval.tick().await;
181 let total = pool.total_count();
182 let idle = pool.idle_count();
183 let working = pool.working_count();
184
185 println!(
186 "[POOL MONITOR] Total: {}, Idle: {}, Working: {}, Load: {:.1}%",
187 total,
188 idle,
189 working,
190 if total > 0 {
191 (working as f64 / total as f64) * 100.0
192 } else {
193 0.0
194 }
195 );
196 }
197 });
198 }
199
200 pub async fn malloc(&self) -> Option<PooledClientInner> {
202 let permit = self.semaphore.clone().acquire_owned().await.ok()?;
204
205 let mut rx = self.idle_rx.lock().await;
206 let client = match rx.try_recv() {
207 Ok(idle) => {
208 self.idle_count.fetch_sub(1, Ordering::Relaxed);
210 idle.client
211 }
212 Err(_) => {
213 self.total_count.fetch_add(1, Ordering::Relaxed);
215 Client::new()
216 }
217 };
218
219 Some(PooledClientInner {
220 client: Some(client),
221 _permit: permit,
222 release_tx: self.release_tx.clone(),
223 idle_count: self.idle_count.clone(),
224 })
225 }
226
227 pub fn idle_count(&self) -> usize {
231 self.idle_count.load(Ordering::Relaxed)
232 }
233
234 pub fn total_count(&self) -> usize {
236 self.total_count.load(Ordering::Relaxed)
237 }
238
239 pub fn working_count(&self) -> usize {
241 self.total_count().saturating_sub(self.idle_count())
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::sleep;

    /// Manual soak run: applies a short burst of load, then prints counter
    /// snapshots while the pool is left idle.
    ///
    /// This makes no assertions and takes roughly four minutes of wall-clock
    /// time; it exists to eyeball the scale-down behaviour in the output.
    #[tokio::test]
    async fn test() {
        /// Number of concurrent checkout tasks started during the load phase.
        const LOAD_TASKS: u32 = 40;
        /// Seconds each task holds its client.
        const HOLD_SECS: u64 = 2;
        /// Seconds to observe the idle pool; longer than the idle timeout
        /// (120s) so a later 90s-spaced cleanup pass sees the clients as stale.
        const OBSERVE_SECS: u64 = 240;

        let pool = ClientPool::new();
        println!(
            ">>> Test Started: Simulating High Load ({} tasks, {}s each) <<<",
            LOAD_TASKS, HOLD_SECS
        );

        for i in 0..LOAD_TASKS {
            let p = pool.clone();
            tokio::spawn(async move {
                if let Some(_client) = p.malloc().await {
                    sleep(Duration::from_secs(HOLD_SECS)).await;
                    if i % 10 == 0 {
                        println!(" [Task {}] Request completed.", i);
                    }
                }
            });
            // Stagger the task starts.
            sleep(Duration::from_millis(50)).await;
        }

        println!(
            ">>> Load Phase Ended: Waiting for Scale-Down ({}s) <<<",
            OBSERVE_SECS
        );
        for sec in 1..=OBSERVE_SECS {
            sleep(Duration::from_secs(1)).await;
            if sec % 5 == 0 {
                println!(
                    " Snapshot at {}s: Working={}, Idle={}",
                    sec,
                    pool.working_count(),
                    pool.idle_count()
                );
            }
        }

        println!(">>> Test Finished <<<");
    }
}