Skip to main content

foundation_utils/
resource.rs

1//! Resource management patterns
2//!
3//! This module provides patterns for safe resource acquisition, management, and release.
4//! Includes resource pools, connection management, and other resource patterns common
5//! in distributed systems and agent architectures.
6//!
7//! ## Key Features
8//!
9//! - **Resource Pools**: Thread-safe resource pooling with automatic cleanup
10//! - **Resource Guards**: RAII-based resource acquisition and release
11//! - **Lease Management**: Time-based and usage-based resource leasing
12//! - **Health Checking**: Automatic resource health validation
13//! - **Graceful Degradation**: Fallback patterns when resources are unavailable
14
15use std::collections::VecDeque;
16use std::sync::{Arc, Condvar, Mutex};
17use std::time::{Duration, Instant};
18
/// Owns a resource together with the action that releases it
///
/// The release action (`return_fn`) receives the resource by value. It runs
/// at most once: either when [`ResourceGuard::return_resource`] is called or
/// when the guard is dropped. [`ResourceGuard::take_ownership`] discards the
/// action without running it.
pub struct ResourceGuard<T> {
    // Both fields are `Some` for a live guard; the consuming methods and
    // `Drop` empty them so the release action can never run twice.
    resource: Option<T>,
    return_fn: Option<Box<dyn FnOnce(T) + Send>>,
}

impl<T> ResourceGuard<T> {
    /// Wrap `resource` so that `return_fn` is invoked with it on release
    ///
    /// # Arguments
    /// - `resource`: The value to hand out through the guard
    /// - `return_fn`: Release action; called with the resource when the guard
    ///   is returned or dropped
    pub fn new<F>(resource: T, return_fn: F) -> Self
    where
        F: FnOnce(T) + Send + 'static,
    {
        let release: Box<dyn FnOnce(T) + Send> = Box::new(return_fn);
        ResourceGuard {
            resource: Some(resource),
            return_fn: Some(release),
        }
    }

    /// Borrow the guarded resource
    ///
    /// # Panics
    /// Panics if the resource slot is empty. Every method that empties the
    /// slot also consumes the guard, so this is not expected to happen.
    pub fn resource(&self) -> &T {
        let held = self.resource.as_ref();
        held.expect("Resource already returned")
    }

    /// Mutably borrow the guarded resource
    ///
    /// # Panics
    /// Panics under the same condition as [`ResourceGuard::resource`].
    pub fn resource_mut(&mut self) -> &mut T {
        let held = self.resource.as_mut();
        held.expect("Resource already returned")
    }

    /// Run the release action now instead of waiting for the guard to drop
    ///
    /// # Errors
    /// Returns an error if either the resource or the release action is
    /// already gone.
    pub fn return_resource(mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Empty both slots first so the `Drop` impl that runs when `self`
        // goes out of scope finds nothing left to do.
        let resource = self.resource.take();
        let release = self.return_fn.take();

        resource
            .zip(release)
            .map(|(resource, release)| release(resource))
            .ok_or_else(|| "Resource already returned".into())
    }

    /// Extract the resource and discard the release action without running it
    ///
    /// # Panics
    /// Panics if the resource slot is empty.
    pub fn take_ownership(mut self) -> T {
        // Clearing the action first guarantees `Drop` will not call it.
        self.return_fn = None;
        self.resource.take().expect("Resource already taken")
    }
}

impl<T> Drop for ResourceGuard<T> {
    /// Runs the release action if the guard still holds both it and the resource
    fn drop(&mut self) {
        let resource = self.resource.take();
        let release = self.return_fn.take();

        if let Some((resource, release)) = resource.zip(release) {
            release(resource);
        }
    }
}
79
/// Limits and timings that govern a resource pool
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on the number of resources the pool may own
    pub max_size: usize,

    /// Number of resources the pool tries to keep on hand
    pub min_size: usize,

    /// Longest time a caller waits for a resource before giving up
    pub acquire_timeout: Duration,

    /// Idle time after which a pooled resource counts as expired
    pub max_idle_time: Duration,

    /// Intended interval between health checks of idle resources
    // NOTE(review): nothing in this file reads this field; scheduling appears
    // to be left to the caller - confirm.
    pub health_check_interval: Duration,
}

impl Default for PoolConfig {
    /// Defaults: 1..=10 resources, 30 s acquire timeout, 10 min idle limit,
    /// 1 min health-check interval
    fn default() -> Self {
        const SECS_PER_MINUTE: u64 = 60;

        PoolConfig {
            max_size: 10,
            min_size: 1,
            acquire_timeout: Duration::from_secs(30),
            max_idle_time: Duration::from_secs(10 * SECS_PER_MINUTE),
            health_check_interval: Duration::from_secs(SECS_PER_MINUTE),
        }
    }
}
110
111/// A resource pool that manages the lifecycle of expensive resources
112///
113/// This pool provides thread-safe access to resources with automatic
114/// cleanup, health checking, and graceful degradation.
115pub struct ResourcePool<T> {
116    inner: Arc<Mutex<PoolInner<T>>>,
117    not_empty: Arc<Condvar>,
118    config: PoolConfig,
119    factory: Arc<dyn Fn() -> Result<T, Box<dyn std::error::Error + Send + Sync>> + Send + Sync>,
120    health_checker: Option<Arc<dyn Fn(&T) -> bool + Send + Sync>>,
121}
122
123struct PoolInner<T> {
124    resources: VecDeque<PooledResource<T>>,
125    total_count: usize,
126    active_count: usize,
127}
128
129struct PooledResource<T> {
130    resource: T,
131    created_at: Instant,
132    last_used: Instant,
133}
134
135impl<T> ResourcePool<T>
136where
137    T: Send + 'static,
138{
139    /// Create a new resource pool
140    ///
141    /// # Arguments
142    /// - `config`: Pool configuration
143    /// - `factory`: Function to create new resources
144    ///
145    /// # Example
146    /// ```rust
147    /// use foundation_utils::resource::{ResourcePool, PoolConfig};
148    ///
149    /// let pool = ResourcePool::new(
150    ///     PoolConfig::default(),
151    ///     || Ok("new_resource".to_string())
152    /// ).unwrap();
153    /// ```
154    pub fn new<F>(
155        config: PoolConfig,
156        factory: F,
157    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>>
158    where
159        F: Fn() -> Result<T, Box<dyn std::error::Error + Send + Sync>> + Send + Sync + 'static,
160    {
161        let pool = Self {
162            inner: Arc::new(Mutex::new(PoolInner {
163                resources: VecDeque::new(),
164                total_count: 0,
165                active_count: 0,
166            })),
167            not_empty: Arc::new(Condvar::new()),
168            config: config.clone(),
169            factory: Arc::new(factory),
170            health_checker: None,
171        };
172
173        // Pre-populate with minimum number of resources
174        pool.ensure_min_resources()?;
175
176        Ok(pool)
177    }
178
179    /// Set a health checker function for resources
180    ///
181    /// The health checker is called periodically to validate that
182    /// pooled resources are still healthy.
183    pub fn with_health_checker<F>(mut self, health_checker: F) -> Self
184    where
185        F: Fn(&T) -> bool + Send + Sync + 'static,
186    {
187        self.health_checker = Some(Arc::new(health_checker));
188        self
189    }
190
191    /// Acquire a resource from the pool
192    ///
193    /// This method will either return an existing resource from the pool
194    /// or create a new one if the pool is not at capacity.
195    ///
196    /// # Returns
197    /// A `ResourceGuard` that automatically returns the resource to the pool
198    /// when dropped.
199    pub fn acquire(&self) -> Result<ResourceGuard<T>, Box<dyn std::error::Error + Send + Sync>> {
200        let start_time = Instant::now();
201
202        loop {
203            // Try to get a resource from the pool
204            if let Some(resource) = self.try_acquire_existing()? {
205                return Ok(resource);
206            }
207
208            // Try to create a new resource if under capacity
209            if let Some(resource) = self.try_create_new()? {
210                return Ok(resource);
211            }
212
213            // Wait for a resource to become available
214            if start_time.elapsed() >= self.config.acquire_timeout {
215                return Err("Timeout waiting for resource".into());
216            }
217
218            // Wait for notification that a resource is available
219            let inner = self.inner.lock().unwrap();
220            let _guard = self
221                .not_empty
222                .wait_timeout(inner, Duration::from_millis(100))
223                .unwrap();
224        }
225    }
226
227    /// Try to acquire an existing resource from the pool
228    fn try_acquire_existing(
229        &self,
230    ) -> Result<Option<ResourceGuard<T>>, Box<dyn std::error::Error + Send + Sync>> {
231        let mut inner = self.inner.lock().unwrap();
232
233        while let Some(mut pooled) = inner.resources.pop_front() {
234            // Check if resource is still healthy
235            if let Some(ref health_checker) = self.health_checker {
236                if !health_checker(&pooled.resource) {
237                    inner.total_count -= 1;
238                    continue; // Try next resource
239                }
240            }
241
242            // Check if resource is expired
243            if pooled.last_used.elapsed() > self.config.max_idle_time {
244                inner.total_count -= 1;
245                continue; // Try next resource
246            }
247
248            // Resource is good, update usage time and return it
249            pooled.last_used = Instant::now();
250            inner.active_count += 1;
251
252            let pool_ref = Arc::clone(&self.inner);
253            let not_empty_ref = Arc::clone(&self.not_empty);
254
255            let guard = ResourceGuard::new(pooled.resource, move |resource| {
256                let mut inner = pool_ref.lock().unwrap();
257                inner.resources.push_back(PooledResource {
258                    resource,
259                    created_at: pooled.created_at,
260                    last_used: Instant::now(),
261                });
262                inner.active_count -= 1;
263                not_empty_ref.notify_one();
264            });
265
266            return Ok(Some(guard));
267        }
268
269        Ok(None)
270    }
271
272    /// Try to create a new resource if under capacity
273    fn try_create_new(
274        &self,
275    ) -> Result<Option<ResourceGuard<T>>, Box<dyn std::error::Error + Send + Sync>> {
276        let inner = self.inner.lock().unwrap();
277
278        if inner.total_count >= self.config.max_size {
279            return Ok(None);
280        }
281
282        // Create new resource outside the lock
283        drop(inner);
284        let resource = (self.factory)()?;
285
286        // Re-acquire lock and update counters
287        let mut inner = self.inner.lock().unwrap();
288        inner.total_count += 1;
289        inner.active_count += 1;
290
291        let pool_ref = Arc::clone(&self.inner);
292        let not_empty_ref = Arc::clone(&self.not_empty);
293        let created_at = Instant::now();
294
295        let guard = ResourceGuard::new(resource, move |resource| {
296            let mut inner = pool_ref.lock().unwrap();
297            inner.resources.push_back(PooledResource {
298                resource,
299                created_at,
300                last_used: Instant::now(),
301            });
302            inner.active_count -= 1;
303            not_empty_ref.notify_one();
304        });
305
306        Ok(Some(guard))
307    }
308
309    /// Ensure the pool has at least the minimum number of resources
310    fn ensure_min_resources(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
311        loop {
312            // Check if we need more resources
313            let need_more = {
314                let inner = self.inner.lock().unwrap();
315                if inner.total_count >= self.config.max_size {
316                    false
317                } else {
318                    inner.resources.len() + inner.active_count < self.config.min_size
319                }
320            };
321
322            if !need_more {
323                break;
324            }
325
326            // Create resource outside the lock
327            let resource = (self.factory)()?;
328
329            // Add resource to pool
330            {
331                let mut inner = self.inner.lock().unwrap();
332                inner.resources.push_back(PooledResource {
333                    resource,
334                    created_at: Instant::now(),
335                    last_used: Instant::now(),
336                });
337                inner.total_count += 1;
338            }
339        }
340
341        Ok(())
342    }
343
344    /// Get pool statistics
345    pub fn stats(&self) -> PoolStats {
346        let inner = self.inner.lock().unwrap();
347        PoolStats {
348            total_resources: inner.total_count,
349            available_resources: inner.resources.len(),
350            active_resources: inner.active_count,
351            max_size: self.config.max_size,
352            min_size: self.config.min_size,
353        }
354    }
355
356    /// Perform health check on all idle resources
357    pub fn health_check(&self) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
358        let Some(ref health_checker) = self.health_checker else {
359            return Ok(0);
360        };
361
362        let mut inner = self.inner.lock().unwrap();
363        let mut removed_count = 0;
364        let mut healthy_resources = VecDeque::new();
365
366        while let Some(pooled) = inner.resources.pop_front() {
367            if health_checker(&pooled.resource)
368                && pooled.last_used.elapsed() <= self.config.max_idle_time
369            {
370                healthy_resources.push_back(pooled);
371            } else {
372                removed_count += 1;
373                inner.total_count -= 1;
374            }
375        }
376
377        inner.resources = healthy_resources;
378
379        Ok(removed_count)
380    }
381}
382
/// Point-in-time counters describing a resource pool
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Resources owned by the pool (active + available)
    pub total_resources: usize,

    /// Idle resources ready to be handed out
    pub available_resources: usize,

    /// Resources currently handed out
    pub active_resources: usize,

    /// Configured upper bound on pool size
    pub max_size: usize,

    /// Configured lower bound on pool size
    pub min_size: usize,
}

impl PoolStats {
    /// Fraction of `max_size` that is currently active
    ///
    /// Yields `0.0` when `max_size` is zero; otherwise the ratio
    /// `active_resources / max_size`.
    pub fn utilization(&self) -> f64 {
        match self.max_size {
            0 => 0.0,
            capacity => self.active_resources as f64 / capacity as f64,
        }
    }

    /// Whether the pool already owns `max_size` resources (or more)
    pub fn is_at_capacity(&self) -> bool {
        self.max_size <= self.total_resources
    }

    /// Whether the pool owns fewer than `min_size` resources
    pub fn is_below_minimum(&self) -> bool {
        self.min_size > self.total_resources
    }
}
422
423/// Convenience function to create a simple resource guard
424///
425/// This function creates a guard that will call the cleanup function
426/// when the resource is no longer needed.
427pub fn managed_resource<T, F>(resource: T, cleanup: F) -> ResourceGuard<T>
428where
429    F: FnOnce(T) + Send + 'static,
430{
431    ResourceGuard::new(resource, cleanup)
432}
433
434/// Trait for types that can be managed as resources
435pub trait ManagedResource: Sized + Send {
436    type Error: std::error::Error + Send + Sync + 'static;
437
438    /// Create a new instance of this resource
439    fn create() -> Result<Self, Self::Error>;
440
441    /// Check if this resource is healthy
442    fn is_healthy(&self) -> bool {
443        true
444    }
445
446    /// Clean up this resource (called when resource is dropped)
447    fn cleanup(self) -> Result<(), Self::Error> {
448        Ok(())
449    }
450
451    /// Create a managed resource guard
452    fn managed(self) -> ResourceGuard<Self> {
453        managed_resource(self, |resource| {
454            let _ = resource.cleanup();
455        })
456    }
457}
458
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Resource stand-in that bumps a shared counter when it is dropped
    #[derive(Debug)]
    struct TestResource {
        _id: usize,
        closed: Arc<AtomicUsize>,
    }

    impl TestResource {
        fn new(id: usize, closed: Arc<AtomicUsize>) -> Self {
            Self { _id: id, closed }
        }
    }

    impl Drop for TestResource {
        fn drop(&mut self) {
            self.closed.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[test]
    fn test_resource_guard() {
        let cleanup_called = Arc::new(AtomicUsize::new(0));
        let cleanup_called_clone = cleanup_called.clone();

        {
            let guard = ResourceGuard::new(42, move |value| {
                cleanup_called_clone.store(value, Ordering::Relaxed);
            });

            assert_eq!(*guard.resource(), 42);
            // The release action must not run while the guard is alive
            assert_eq!(cleanup_called.load(Ordering::Relaxed), 0);
        } // Guard drops here

        assert_eq!(cleanup_called.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn test_resource_pool_basic() {
        let closed_count = Arc::new(AtomicUsize::new(0));
        let closed_count_clone = closed_count.clone();

        let pool = ResourcePool::new(
            PoolConfig {
                max_size: 3,
                min_size: 1,
                ..Default::default()
            },
            move || Ok(TestResource::new(1, closed_count_clone.clone())),
        )
        .unwrap();

        // Test acquiring and releasing resources
        {
            let _resource1 = pool.acquire().unwrap();
            let _resource2 = pool.acquire().unwrap();

            // One pre-populated resource was reused and one was created
            let stats = pool.stats();
            assert_eq!(stats.active_resources, 2);
            assert_eq!(stats.available_resources, 0);
            assert_eq!(stats.total_resources, 2);
        } // Resources should be returned to pool

        let stats = pool.stats();
        assert_eq!(stats.active_resources, 0);
        assert_eq!(stats.available_resources, 2);

        // Returned resources stay alive inside the pool...
        assert_eq!(closed_count.load(Ordering::Relaxed), 0);

        // ...and are destroyed together with it
        drop(pool);
        assert_eq!(closed_count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_resource_pool_capacity() {
        let pool = ResourcePool::new(
            PoolConfig {
                max_size: 2,
                min_size: 0,
                acquire_timeout: Duration::from_millis(100),
                ..Default::default()
            },
            || Ok("resource".to_string()),
        )
        .unwrap();

        // Acquire all resources
        let _r1 = pool.acquire().unwrap();
        let _r2 = pool.acquire().unwrap();

        // This should timeout since pool is at capacity
        let error = pool.acquire().err().expect("pool is at capacity");
        assert_eq!(error.to_string(), "Timeout waiting for resource");
        assert!(pool.stats().is_at_capacity());
    }

    #[test]
    fn test_resource_pool_health_check() {
        let pool = ResourcePool::new(
            PoolConfig {
                max_size: 5,
                min_size: 2, // Pre-populate with 2 resources
                ..Default::default()
            },
            || Ok("resource".to_string()),
        )
        .unwrap()
        .with_health_checker(|_| false); // Mark all resources as unhealthy

        let stats_before = pool.stats();
        assert_eq!(stats_before.available_resources, 2);
        assert_eq!(stats_before.total_resources, 2);

        // Perform health check - should remove unhealthy resources
        let removed = pool.health_check().unwrap();
        assert_eq!(removed, 2); // Should have removed the 2 initial resources

        let stats_after = pool.stats();
        assert_eq!(stats_after.available_resources, 0);
        assert_eq!(stats_after.total_resources, 0);
    }

    #[test]
    fn test_managed_resource() {
        let cleanup_called = Arc::new(AtomicUsize::new(0));
        let cleanup_called_clone = cleanup_called.clone();

        {
            let _guard = managed_resource(42, move |value| {
                cleanup_called_clone.store(value, Ordering::Relaxed);
            });
        }

        assert_eq!(cleanup_called.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn test_pool_stats() {
        let pool = ResourcePool::new(
            PoolConfig {
                max_size: 10,
                min_size: 2,
                ..Default::default()
            },
            || Ok("resource".to_string()),
        )
        .unwrap();

        let stats = pool.stats();
        assert_eq!(stats.max_size, 10);
        assert_eq!(stats.min_size, 2);
        assert_eq!(stats.total_resources, 2);
        assert_eq!(stats.available_resources, 2);
        assert_eq!(stats.utilization(), 0.0);
        assert!(!stats.is_at_capacity());
        assert!(!stats.is_below_minimum());
    }
}