1use std::collections::VecDeque;
16use std::sync::{Arc, Condvar, Mutex};
17use std::time::{Duration, Instant};
18
/// Owns a resource together with the callback that hands it back.
///
/// The callback runs at most once: either when [`ResourceGuard::return_resource`] is called or,
/// failing that, when the guard is dropped. [`ResourceGuard::take_ownership`] extracts the
/// resource and discards the callback without running it.
pub struct ResourceGuard<T> {
    /// The guarded value; `None` once it has been handed back or taken.
    resource: Option<T>,
    /// The hand-back callback; `None` once it has run or been discarded.
    return_fn: Option<Box<dyn FnOnce(T) + Send>>,
}

impl<T> ResourceGuard<T> {
    /// Wraps `resource`, arranging for `return_fn` to receive it when the guard is finished.
    pub fn new<F>(resource: T, return_fn: F) -> Self
    where
        F: FnOnce(T) + Send + 'static,
    {
        let callback: Box<dyn FnOnce(T) + Send> = Box::new(return_fn);
        Self {
            resource: Some(resource),
            return_fn: Some(callback),
        }
    }

    /// Borrows the guarded resource.
    ///
    /// # Panics
    ///
    /// Panics if the resource slot is empty. Every method that empties the slot consumes the
    /// guard, so this is not reachable through the public API.
    pub fn resource(&self) -> &T {
        let held = self.resource.as_ref();
        held.expect("Resource already returned")
    }

    /// Mutably borrows the guarded resource.
    ///
    /// # Panics
    ///
    /// Panics under the same (normally unreachable) condition as [`ResourceGuard::resource`].
    pub fn resource_mut(&mut self) -> &mut T {
        let held = self.resource.as_mut();
        held.expect("Resource already returned")
    }

    /// Hands the resource to the callback now instead of waiting for the guard to drop.
    ///
    /// # Errors
    ///
    /// Returns an error if the resource or the callback is no longer present.
    pub fn return_resource(mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if self.hand_back() {
            Ok(())
        } else {
            Err("Resource already returned".into())
        }
    }

    /// Extracts the resource and discards the callback without invoking it.
    ///
    /// # Panics
    ///
    /// Panics if the resource slot is empty (normally unreachable, see
    /// [`ResourceGuard::resource`]).
    pub fn take_ownership(mut self) -> T {
        // Discard the callback first so the `Drop` impl finds nothing left to run.
        drop(self.return_fn.take());
        self.resource.take().expect("Resource already taken")
    }

    /// Empties both slots and, if both were filled, runs the callback on the resource.
    ///
    /// Returns whether the callback ran.
    fn hand_back(&mut self) -> bool {
        match self.resource.take().zip(self.return_fn.take()) {
            Some((resource, return_fn)) => {
                return_fn(resource);
                true
            }
            None => false,
        }
    }
}

impl<T> Drop for ResourceGuard<T> {
    /// Hands the resource back unless that already happened or ownership was taken.
    fn drop(&mut self) {
        self.hand_back();
    }
}
79
/// Tuning parameters for a `ResourcePool`.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on the number of resources the pool will create.
    pub max_size: usize,

    /// Number of resources created eagerly when the pool is constructed (never more than
    /// `max_size`).
    pub min_size: usize,

    /// How long `acquire` keeps trying before it gives up with a timeout error.
    pub acquire_timeout: Duration,

    /// Idle resources whose last use is older than this are discarded instead of being reused.
    pub max_idle_time: Duration,

    /// Intended spacing between health checks.
    ///
    /// NOTE(review): nothing in this file reads this field; presumably the caller schedules
    /// `ResourcePool::health_check` with it — confirm.
    pub health_check_interval: Duration,
}

impl Default for PoolConfig {
    /// Returns a configuration of 1 to 10 resources, a 30 s acquire timeout, a 10 min idle
    /// limit and a 60 s health-check interval.
    fn default() -> Self {
        let secs = Duration::from_secs;

        Self {
            min_size: 1,
            max_size: 10,
            acquire_timeout: secs(30),
            max_idle_time: secs(600),
            health_check_interval: secs(60),
        }
    }
}
110
111pub struct ResourcePool<T> {
116 inner: Arc<Mutex<PoolInner<T>>>,
117 not_empty: Arc<Condvar>,
118 config: PoolConfig,
119 factory: Arc<dyn Fn() -> Result<T, Box<dyn std::error::Error + Send + Sync>> + Send + Sync>,
120 health_checker: Option<Arc<dyn Fn(&T) -> bool + Send + Sync>>,
121}
122
123struct PoolInner<T> {
124 resources: VecDeque<PooledResource<T>>,
125 total_count: usize,
126 active_count: usize,
127}
128
129struct PooledResource<T> {
130 resource: T,
131 created_at: Instant,
132 last_used: Instant,
133}
134
135impl<T> ResourcePool<T>
136where
137 T: Send + 'static,
138{
139 pub fn new<F>(
155 config: PoolConfig,
156 factory: F,
157 ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>>
158 where
159 F: Fn() -> Result<T, Box<dyn std::error::Error + Send + Sync>> + Send + Sync + 'static,
160 {
161 let pool = Self {
162 inner: Arc::new(Mutex::new(PoolInner {
163 resources: VecDeque::new(),
164 total_count: 0,
165 active_count: 0,
166 })),
167 not_empty: Arc::new(Condvar::new()),
168 config: config.clone(),
169 factory: Arc::new(factory),
170 health_checker: None,
171 };
172
173 pool.ensure_min_resources()?;
175
176 Ok(pool)
177 }
178
179 pub fn with_health_checker<F>(mut self, health_checker: F) -> Self
184 where
185 F: Fn(&T) -> bool + Send + Sync + 'static,
186 {
187 self.health_checker = Some(Arc::new(health_checker));
188 self
189 }
190
191 pub fn acquire(&self) -> Result<ResourceGuard<T>, Box<dyn std::error::Error + Send + Sync>> {
200 let start_time = Instant::now();
201
202 loop {
203 if let Some(resource) = self.try_acquire_existing()? {
205 return Ok(resource);
206 }
207
208 if let Some(resource) = self.try_create_new()? {
210 return Ok(resource);
211 }
212
213 if start_time.elapsed() >= self.config.acquire_timeout {
215 return Err("Timeout waiting for resource".into());
216 }
217
218 let inner = self.inner.lock().unwrap();
220 let _guard = self
221 .not_empty
222 .wait_timeout(inner, Duration::from_millis(100))
223 .unwrap();
224 }
225 }
226
227 fn try_acquire_existing(
229 &self,
230 ) -> Result<Option<ResourceGuard<T>>, Box<dyn std::error::Error + Send + Sync>> {
231 let mut inner = self.inner.lock().unwrap();
232
233 while let Some(mut pooled) = inner.resources.pop_front() {
234 if let Some(ref health_checker) = self.health_checker {
236 if !health_checker(&pooled.resource) {
237 inner.total_count -= 1;
238 continue; }
240 }
241
242 if pooled.last_used.elapsed() > self.config.max_idle_time {
244 inner.total_count -= 1;
245 continue; }
247
248 pooled.last_used = Instant::now();
250 inner.active_count += 1;
251
252 let pool_ref = Arc::clone(&self.inner);
253 let not_empty_ref = Arc::clone(&self.not_empty);
254
255 let guard = ResourceGuard::new(pooled.resource, move |resource| {
256 let mut inner = pool_ref.lock().unwrap();
257 inner.resources.push_back(PooledResource {
258 resource,
259 created_at: pooled.created_at,
260 last_used: Instant::now(),
261 });
262 inner.active_count -= 1;
263 not_empty_ref.notify_one();
264 });
265
266 return Ok(Some(guard));
267 }
268
269 Ok(None)
270 }
271
272 fn try_create_new(
274 &self,
275 ) -> Result<Option<ResourceGuard<T>>, Box<dyn std::error::Error + Send + Sync>> {
276 let inner = self.inner.lock().unwrap();
277
278 if inner.total_count >= self.config.max_size {
279 return Ok(None);
280 }
281
282 drop(inner);
284 let resource = (self.factory)()?;
285
286 let mut inner = self.inner.lock().unwrap();
288 inner.total_count += 1;
289 inner.active_count += 1;
290
291 let pool_ref = Arc::clone(&self.inner);
292 let not_empty_ref = Arc::clone(&self.not_empty);
293 let created_at = Instant::now();
294
295 let guard = ResourceGuard::new(resource, move |resource| {
296 let mut inner = pool_ref.lock().unwrap();
297 inner.resources.push_back(PooledResource {
298 resource,
299 created_at,
300 last_used: Instant::now(),
301 });
302 inner.active_count -= 1;
303 not_empty_ref.notify_one();
304 });
305
306 Ok(Some(guard))
307 }
308
309 fn ensure_min_resources(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
311 loop {
312 let need_more = {
314 let inner = self.inner.lock().unwrap();
315 if inner.total_count >= self.config.max_size {
316 false
317 } else {
318 inner.resources.len() + inner.active_count < self.config.min_size
319 }
320 };
321
322 if !need_more {
323 break;
324 }
325
326 let resource = (self.factory)()?;
328
329 {
331 let mut inner = self.inner.lock().unwrap();
332 inner.resources.push_back(PooledResource {
333 resource,
334 created_at: Instant::now(),
335 last_used: Instant::now(),
336 });
337 inner.total_count += 1;
338 }
339 }
340
341 Ok(())
342 }
343
344 pub fn stats(&self) -> PoolStats {
346 let inner = self.inner.lock().unwrap();
347 PoolStats {
348 total_resources: inner.total_count,
349 available_resources: inner.resources.len(),
350 active_resources: inner.active_count,
351 max_size: self.config.max_size,
352 min_size: self.config.min_size,
353 }
354 }
355
356 pub fn health_check(&self) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
358 let Some(ref health_checker) = self.health_checker else {
359 return Ok(0);
360 };
361
362 let mut inner = self.inner.lock().unwrap();
363 let mut removed_count = 0;
364 let mut healthy_resources = VecDeque::new();
365
366 while let Some(pooled) = inner.resources.pop_front() {
367 if health_checker(&pooled.resource)
368 && pooled.last_used.elapsed() <= self.config.max_idle_time
369 {
370 healthy_resources.push_back(pooled);
371 } else {
372 removed_count += 1;
373 inner.total_count -= 1;
374 }
375 }
376
377 inner.resources = healthy_resources;
378
379 Ok(removed_count)
380 }
381}
382
/// Point-in-time snapshot of a pool's counters and configured bounds.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Resources the pool currently accounts for, idle and checked out.
    pub total_resources: usize,

    /// Resources sitting idle in the pool.
    pub available_resources: usize,

    /// Resources currently checked out.
    pub active_resources: usize,

    /// Configured upper bound on `total_resources`.
    pub max_size: usize,

    /// Configured lower bound the pool is created with.
    pub min_size: usize,
}

impl PoolStats {
    /// Returns `active_resources / max_size`, or `0.0` when `max_size` is zero.
    pub fn utilization(&self) -> f64 {
        match self.max_size {
            0 => 0.0,
            capacity => self.active_resources as f64 / capacity as f64,
        }
    }

    /// Returns `true` when the pool holds at least `max_size` resources.
    pub fn is_at_capacity(&self) -> bool {
        self.max_size <= self.total_resources
    }

    /// Returns `true` when the pool holds fewer than `min_size` resources.
    pub fn is_below_minimum(&self) -> bool {
        self.min_size > self.total_resources
    }
}
422
423pub fn managed_resource<T, F>(resource: T, cleanup: F) -> ResourceGuard<T>
428where
429 F: FnOnce(T) + Send + 'static,
430{
431 ResourceGuard::new(resource, cleanup)
432}
433
434pub trait ManagedResource: Sized + Send {
436 type Error: std::error::Error + Send + Sync + 'static;
437
438 fn create() -> Result<Self, Self::Error>;
440
441 fn is_healthy(&self) -> bool {
443 true
444 }
445
446 fn cleanup(self) -> Result<(), Self::Error> {
448 Ok(())
449 }
450
451 fn managed(self) -> ResourceGuard<Self> {
453 managed_resource(self, |resource| {
454 let _ = resource.cleanup();
455 })
456 }
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test double whose destructor bumps a shared counter, making drops observable.
    #[derive(Debug)]
    struct TestResource {
        _id: usize,
        closed: Arc<AtomicUsize>,
    }

    impl TestResource {
        fn new(id: usize, closed: Arc<AtomicUsize>) -> Self {
            Self { _id: id, closed }
        }
    }

    impl Drop for TestResource {
        fn drop(&mut self) {
            self.closed.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_resource_guard() {
        let cleanup_called = Arc::new(AtomicUsize::new(0));
        let cleanup_called_clone = Arc::clone(&cleanup_called);

        {
            let guard = ResourceGuard::new(42, move |value| {
                cleanup_called_clone.store(value, Ordering::SeqCst);
            });

            assert_eq!(*guard.resource(), 42);
            // Nothing may run before the guard goes out of scope.
            assert_eq!(cleanup_called.load(Ordering::SeqCst), 0);
        }

        assert_eq!(cleanup_called.load(Ordering::SeqCst), 42);
    }

    #[test]
    fn test_resource_guard_return_resource() {
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_clone = Arc::clone(&calls);

        let mut guard = ResourceGuard::new(1usize, move |value| {
            calls_clone.fetch_add(value, Ordering::SeqCst);
        });
        *guard.resource_mut() += 4;

        assert!(guard.return_resource().is_ok());
        // The callback saw the mutated value, and ran once rather than again on drop.
        assert_eq!(calls.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_resource_guard_take_ownership() {
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_clone = Arc::clone(&calls);

        let guard = ResourceGuard::new(String::from("owned"), move |_| {
            calls_clone.fetch_add(1, Ordering::SeqCst);
        });

        assert_eq!(guard.take_ownership(), "owned");
        assert_eq!(calls.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn test_resource_pool_basic() {
        let closed_count = Arc::new(AtomicUsize::new(0));
        let next_id = Arc::new(AtomicUsize::new(0));
        let factory_closed = Arc::clone(&closed_count);
        let factory_ids = Arc::clone(&next_id);

        let pool = ResourcePool::new(
            PoolConfig {
                max_size: 3,
                min_size: 1,
                ..Default::default()
            },
            move || {
                let id = factory_ids.fetch_add(1, Ordering::SeqCst);
                Ok(TestResource::new(id, Arc::clone(&factory_closed)))
            },
        )
        .unwrap();

        {
            let _resource1 = pool.acquire().unwrap();
            let _resource2 = pool.acquire().unwrap();

            let stats = pool.stats();
            assert_eq!(stats.active_resources, 2);
            assert!(stats.available_resources <= 1);
        }

        let stats = pool.stats();
        assert_eq!(stats.active_resources, 0);
        assert!(stats.available_resources > 0);

        // Returned resources are kept for reuse, not destroyed...
        assert_eq!(closed_count.load(Ordering::SeqCst), 0);

        // ...until the pool itself goes away, at which point every one is dropped.
        let created = next_id.load(Ordering::SeqCst);
        drop(pool);
        assert_eq!(closed_count.load(Ordering::SeqCst), created);
    }

    #[test]
    fn test_resource_pool_capacity() {
        let pool = ResourcePool::new(
            PoolConfig {
                max_size: 2,
                min_size: 0,
                acquire_timeout: Duration::from_millis(100),
                ..Default::default()
            },
            || Ok("resource".to_string()),
        )
        .unwrap();

        let _r1 = pool.acquire().unwrap();
        let _r2 = pool.acquire().unwrap();

        // The pool is exhausted, so the third request has to time out.
        let result = pool.acquire();
        assert!(result.is_err());
    }

    #[test]
    fn test_resource_pool_health_check() {
        let pool = ResourcePool::new(
            PoolConfig {
                max_size: 5,
                min_size: 2,
                ..Default::default()
            },
            || Ok("resource".to_string()),
        )
        .unwrap()
        .with_health_checker(|_| false);

        let stats_before = pool.stats();
        assert_eq!(stats_before.total_resources, 2);
        assert_eq!(stats_before.available_resources, 2);

        let removed = pool.health_check().unwrap();
        assert_eq!(removed, 2);

        let stats_after = pool.stats();
        assert_eq!(stats_after.total_resources, 0);
        assert_eq!(stats_after.available_resources, 0);
        assert!(stats_after.is_below_minimum());
    }

    #[test]
    fn test_managed_resource() {
        let cleanup_called = Arc::new(AtomicUsize::new(0));
        let cleanup_called_clone = Arc::clone(&cleanup_called);

        {
            let _guard = managed_resource(42, move |value| {
                cleanup_called_clone.store(value, Ordering::SeqCst);
            });
        }

        assert_eq!(cleanup_called.load(Ordering::SeqCst), 42);
    }

    #[test]
    fn test_pool_stats() {
        let pool = ResourcePool::new(
            PoolConfig {
                max_size: 10,
                min_size: 2,
                ..Default::default()
            },
            || Ok("resource".to_string()),
        )
        .unwrap();

        let stats = pool.stats();
        assert_eq!(stats.max_size, 10);
        assert_eq!(stats.min_size, 2);
        assert_eq!(stats.total_resources, 2);
        assert!(stats.utilization() <= 1.0);
        assert!(!stats.is_at_capacity());
        assert!(!stats.is_below_minimum());
    }
}