// trueno/brick/resource_pool.rs

//! Resource pool guarded by a semaphore-style permit counter.
//!
//! AWP-05: Bounds how many resources may be checked out at once; acquisition is non-blocking.
4
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::Mutex;
7
8// ----------------------------------------------------------------------------
9// AWP-05: Resource Pool with Semaphore
10// ----------------------------------------------------------------------------
11
/// A bounded pool of reusable `T` values gated by a permit counter.
///
/// At most `max_resources` values can be checked out at any one time. Values
/// are built on demand by the factory closure and are handed back to the pool
/// for reuse when their [`PooledResource`] guard is dropped.
///
/// # Example
/// ```rust
/// use trueno::brick::ResourcePool;
///
/// let pool: ResourcePool<Vec<u8>> = ResourcePool::new(4, || Vec::with_capacity(1024));
///
/// // Check out every permit the pool has
/// let r1 = pool.try_acquire().unwrap();
/// let r2 = pool.try_acquire().unwrap();
/// let r3 = pool.try_acquire().unwrap();
/// let r4 = pool.try_acquire().unwrap();
///
/// // No permits left
/// assert!(pool.try_acquire().is_none());
///
/// // Returning one guard frees its permit
/// drop(r1);
///
/// // ...so the next attempt succeeds
/// assert!(pool.try_acquire().is_some());
/// ```
pub struct ResourcePool<T> {
    /// Upper bound on simultaneously checked-out resources.
    max_resources: usize,
    /// Permits that are currently free; each successful acquire takes one.
    available: AtomicUsize,
    /// Idle resources waiting to be reused.
    resources: Mutex<Vec<T>>,
    /// Closure that builds a fresh resource when no idle one exists.
    factory: Box<dyn Fn() -> T + Send + Sync>,
}
45
46impl<T> ResourcePool<T> {
47    /// Create a new resource pool.
48    pub fn new(max_resources: usize, factory: impl Fn() -> T + Send + Sync + 'static) -> Self {
49        Self {
50            max_resources,
51            available: AtomicUsize::new(max_resources),
52            resources: Mutex::new(Vec::with_capacity(max_resources)),
53            factory: Box::new(factory),
54        }
55    }
56
57    /// Get the maximum number of resources.
58    #[must_use]
59    pub fn max_resources(&self) -> usize {
60        self.max_resources
61    }
62
63    /// Get the number of available permits.
64    #[must_use]
65    pub fn available(&self) -> usize {
66        self.available.load(Ordering::Acquire)
67    }
68
69    /// Try to acquire a resource (non-blocking).
70    pub fn try_acquire(&self) -> Option<PooledResource<'_, T>> {
71        // Try to get a permit
72        loop {
73            let current = self.available.load(Ordering::Acquire);
74            if current == 0 {
75                return None;
76            }
77            if self
78                .available
79                .compare_exchange(current, current - 1, Ordering::AcqRel, Ordering::Relaxed)
80                .is_ok()
81            {
82                break;
83            }
84        }
85
86        // Get or create resource
87        let resource = {
88            let mut pool = self.resources.lock().unwrap_or_else(|e| e.into_inner());
89            pool.pop().unwrap_or_else(|| (self.factory)())
90        };
91
92        Some(PooledResource { resource: Some(resource), pool: self })
93    }
94
95    fn release(&self, resource: T) {
96        {
97            let mut pool = self.resources.lock().unwrap_or_else(|e| e.into_inner());
98            if pool.len() < self.max_resources {
99                pool.push(resource);
100            }
101            // else: drop resource (pool is full)
102        }
103        self.available.fetch_add(1, Ordering::Release);
104    }
105}
106
107impl<T: std::fmt::Debug> std::fmt::Debug for ResourcePool<T> {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct("ResourcePool")
110            .field("max_resources", &self.max_resources)
111            .field("available", &self.available())
112            .finish()
113    }
114}
115
116/// A resource acquired from a pool.
117pub struct PooledResource<'a, T> {
118    resource: Option<T>,
119    pool: &'a ResourcePool<T>,
120}
121
122impl<T> std::ops::Deref for PooledResource<'_, T> {
123    type Target = T;
124    fn deref(&self) -> &T {
125        self.resource
126            .as_ref()
127            .expect("PooledResource accessed after take (bug: Drop ran before Deref)")
128    }
129}
130
131impl<T> std::ops::DerefMut for PooledResource<'_, T> {
132    fn deref_mut(&mut self) -> &mut T {
133        self.resource
134            .as_mut()
135            .expect("PooledResource accessed after take (bug: Drop ran before DerefMut)")
136    }
137}
138
139impl<T> Drop for PooledResource<'_, T> {
140    fn drop(&mut self) {
141        if let Some(resource) = self.resource.take() {
142            self.pool.release(resource);
143        }
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// A fresh pool reports its capacity and has every permit free.
    #[test]
    fn test_resource_pool_new() {
        let pool: ResourcePool<Vec<u8>> = ResourcePool::new(4, Vec::new);
        assert_eq!(pool.max_resources(), 4);
        assert_eq!(pool.available(), 4);
    }

    /// Permits are consumed by acquire, refused when exhausted, and restored
    /// when guards are dropped.
    #[test]
    fn test_resource_pool_acquire_release() {
        let pool: ResourcePool<u32> = ResourcePool::new(2, || 0);

        let r1 = pool.try_acquire();
        assert!(r1.is_some());
        assert_eq!(pool.available(), 1);

        let r2 = pool.try_acquire();
        assert!(r2.is_some());
        assert_eq!(pool.available(), 0);

        // Pool exhausted; a failed attempt must not change the permit count.
        assert!(pool.try_acquire().is_none());
        assert_eq!(pool.available(), 0);

        // Release one
        drop(r1);
        assert_eq!(pool.available(), 1);

        // Can acquire again
        let r4 = pool.try_acquire();
        assert!(r4.is_some());
        assert_eq!(pool.available(), 0);

        // Releasing everything restores full capacity.
        drop(r2);
        drop(r4);
        assert_eq!(pool.available(), 2);
    }

    /// The factory runs once per resource that has to be created.
    #[test]
    fn test_resource_pool_factory_called() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let cc = Arc::clone(&call_count);
        let pool: ResourcePool<usize> =
            ResourcePool::new(2, move || cc.fetch_add(1, Ordering::SeqCst));

        // First acquire creates a resource
        let r1 = pool.try_acquire().unwrap();
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
        assert_eq!(*r1, 0);

        // Second acquire creates another resource
        let r2 = pool.try_acquire().unwrap();
        assert_eq!(call_count.load(Ordering::SeqCst), 2);
        assert_eq!(*r2, 1);
    }

    /// A released resource is handed out again instead of building a new one.
    #[test]
    fn test_resource_pool_reuse() {
        let call_count = Arc::new(AtomicUsize::new(0));
        let cc = Arc::clone(&call_count);
        let pool: ResourcePool<usize> =
            ResourcePool::new(2, move || cc.fetch_add(1, Ordering::SeqCst));

        // Acquire and release
        let r1 = pool.try_acquire().unwrap();
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
        drop(r1);

        // Acquire again - should reuse the existing resource
        let r2 = pool.try_acquire().unwrap();
        assert_eq!(call_count.load(Ordering::SeqCst), 1); // No new resource created
        assert_eq!(*r2, 0);
    }

    #[test]
    fn test_pooled_resource_deref() {
        let pool: ResourcePool<Vec<u8>> = ResourcePool::new(1, || vec![1, 2, 3]);

        let resource = pool.try_acquire().unwrap();
        assert_eq!(resource.len(), 3);
        assert_eq!(&*resource, &[1, 2, 3]);
    }

    #[test]
    fn test_pooled_resource_deref_mut() {
        let pool: ResourcePool<Vec<u8>> = ResourcePool::new(1, Vec::new);

        let mut resource = pool.try_acquire().unwrap();
        resource.push(42);
        assert_eq!(resource.len(), 1);
    }

    #[test]
    fn test_resource_pool_debug() {
        let pool: ResourcePool<u32> = ResourcePool::new(4, || 0);
        let debug = format!("{:?}", pool);
        assert!(debug.contains("ResourcePool"));
        assert!(debug.contains("max_resources: 4"));
        assert!(debug.contains("available: 4"));
    }

    /// FALSIFICATION TEST: Verify pool never exceeds max resources
    ///
    /// Even under concurrent pressure, the pool must never hand out
    /// more resources than max_resources.
    #[test]
    fn test_falsify_pool_never_exceeds_max() {
        const MAX: usize = 5;

        let pool = Arc::new(ResourcePool::<u32>::new(MAX, || 0));
        // Guards currently held across all threads. It is incremented only
        // after a successful acquire and decremented before the guard is
        // released, so it can never over-count the real number of holders.
        let outstanding = Arc::new(AtomicUsize::new(0));
        let mut handles = vec![];

        // Spawn threads that aggressively try to acquire
        for _ in 0..10 {
            let pool_clone = Arc::clone(&pool);
            let outstanding = Arc::clone(&outstanding);
            handles.push(std::thread::spawn(move || {
                let mut acquired = vec![];
                for _ in 0..100 {
                    if let Some(r) = pool_clone.try_acquire() {
                        let before = outstanding.fetch_add(1, Ordering::SeqCst);
                        assert!(
                            before < MAX,
                            "FALSIFICATION FAILED: {} guards outstanding (max {})",
                            before + 1,
                            MAX
                        );
                        acquired.push(r);
                    }
                    // Cap what each thread holds so permits keep circulating.
                    if acquired.len() > 2 {
                        outstanding.fetch_sub(1, Ordering::SeqCst);
                        acquired.pop();
                    }
                    // Give other threads a chance to interleave.
                    std::thread::yield_now();
                }
                // Account for the guards still held, then release them.
                outstanding.fetch_sub(acquired.len(), Ordering::SeqCst);
                drop(acquired);
            }));
        }

        // Propagate worker panics so in-thread assertions cannot be lost.
        for handle in handles {
            handle.join().expect("worker thread panicked");
        }

        // Every guard has been dropped, so every permit must be back.
        assert_eq!(outstanding.load(Ordering::SeqCst), 0);
        let final_available = pool.available();
        assert_eq!(
            final_available, MAX,
            "FALSIFICATION FAILED: available ({}) != max ({}) after all guards dropped",
            final_available, MAX
        );
    }

    /// FALSIFICATION TEST: Verify CAS prevents double-acquire
    ///
    /// The atomic permit update must prevent two threads from
    /// acquiring the same permit.
    #[test]
    fn test_falsify_cas_prevents_double_acquire() {
        let pool = Arc::new(ResourcePool::<u32>::new(1, || 0));
        let acquired_count = Arc::new(AtomicUsize::new(0));
        let mut handles = vec![];

        // Multiple threads try to acquire the single resource simultaneously
        for _ in 0..10 {
            let pool_clone = Arc::clone(&pool);
            let count_clone = Arc::clone(&acquired_count);
            handles.push(std::thread::spawn(move || {
                for _ in 0..100 {
                    if let Some(_r) = pool_clone.try_acquire() {
                        let prev = count_clone.fetch_add(1, Ordering::SeqCst);
                        // Only one thread should hold the resource at a time
                        assert!(
                            prev == 0,
                            "FALSIFICATION FAILED: Multiple threads acquired simultaneously"
                        );
                        std::thread::yield_now();
                        count_clone.fetch_sub(1, Ordering::SeqCst);
                    }
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // The single permit must be free again once all threads are done.
        assert_eq!(pool.available(), 1);
    }
}