trueno/brick/
resource_pool.rs1use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::Mutex;
7
/// Boxed constructor used to build a resource when no idle one can be reused.
type ResourceFactory<T> = Box<dyn Fn() -> T + Send + Sync>;

/// A bounded pool of reusable resources of type `T`.
///
/// At most `max_resources` resources can be checked out at any one time.
/// Returned resources are kept in an idle list and handed out again before
/// the factory is asked for a new one.
pub struct ResourcePool<T> {
    /// Maximum number of resources that may be checked out simultaneously;
    /// also the cap on how many idle resources are retained.
    max_resources: usize,
    /// Number of resources that can still be acquired right now.
    available: AtomicUsize,
    /// Idle resources waiting to be reused.
    resources: Mutex<Vec<T>>,
    /// Builds a fresh resource when the idle list is empty.
    factory: ResourceFactory<T>,
}
45
46impl<T> ResourcePool<T> {
47 pub fn new(max_resources: usize, factory: impl Fn() -> T + Send + Sync + 'static) -> Self {
49 Self {
50 max_resources,
51 available: AtomicUsize::new(max_resources),
52 resources: Mutex::new(Vec::with_capacity(max_resources)),
53 factory: Box::new(factory),
54 }
55 }
56
57 #[must_use]
59 pub fn max_resources(&self) -> usize {
60 self.max_resources
61 }
62
63 #[must_use]
65 pub fn available(&self) -> usize {
66 self.available.load(Ordering::Acquire)
67 }
68
69 pub fn try_acquire(&self) -> Option<PooledResource<'_, T>> {
71 loop {
73 let current = self.available.load(Ordering::Acquire);
74 if current == 0 {
75 return None;
76 }
77 if self
78 .available
79 .compare_exchange(current, current - 1, Ordering::AcqRel, Ordering::Relaxed)
80 .is_ok()
81 {
82 break;
83 }
84 }
85
86 let resource = {
88 let mut pool = self.resources.lock().unwrap_or_else(|e| e.into_inner());
89 pool.pop().unwrap_or_else(|| (self.factory)())
90 };
91
92 Some(PooledResource { resource: Some(resource), pool: self })
93 }
94
95 fn release(&self, resource: T) {
96 {
97 let mut pool = self.resources.lock().unwrap_or_else(|e| e.into_inner());
98 if pool.len() < self.max_resources {
99 pool.push(resource);
100 }
101 }
103 self.available.fetch_add(1, Ordering::Release);
104 }
105}
106
107impl<T: std::fmt::Debug> std::fmt::Debug for ResourcePool<T> {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct("ResourcePool")
110 .field("max_resources", &self.max_resources)
111 .field("available", &self.available())
112 .finish()
113 }
114}
115
116pub struct PooledResource<'a, T> {
118 resource: Option<T>,
119 pool: &'a ResourcePool<T>,
120}
121
122impl<T> std::ops::Deref for PooledResource<'_, T> {
123 type Target = T;
124 fn deref(&self) -> &T {
125 self.resource
126 .as_ref()
127 .expect("PooledResource accessed after take (bug: Drop ran before Deref)")
128 }
129}
130
131impl<T> std::ops::DerefMut for PooledResource<'_, T> {
132 fn deref_mut(&mut self) -> &mut T {
133 self.resource
134 .as_mut()
135 .expect("PooledResource accessed after take (bug: Drop ran before DerefMut)")
136 }
137}
138
139impl<T> Drop for PooledResource<'_, T> {
140 fn drop(&mut self) {
141 if let Some(resource) = self.resource.take() {
142 self.pool.release(resource);
143 }
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Builds a `u32` pool whose factory counts how often it is invoked.
    ///
    /// Each created resource carries the call index it was created with.
    fn counting_pool(max: usize) -> (ResourcePool<u32>, Arc<AtomicUsize>) {
        let calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&calls);
        let pool = ResourcePool::new(max, move || counter.fetch_add(1, Ordering::SeqCst) as u32);
        (pool, calls)
    }

    #[test]
    fn test_resource_pool_new() {
        let pool: ResourcePool<Vec<u8>> = ResourcePool::new(4, Vec::new);
        assert_eq!(pool.max_resources(), 4);
        assert_eq!(pool.available(), 4);
    }

    #[test]
    fn test_resource_pool_acquire_release() {
        let pool: ResourcePool<u32> = ResourcePool::new(2, || 0);

        let r1 = pool.try_acquire();
        assert!(r1.is_some());
        assert_eq!(pool.available(), 1);

        let r2 = pool.try_acquire();
        assert!(r2.is_some());
        assert_eq!(pool.available(), 0);

        // The pool is exhausted, so a third acquire must fail.
        let r3 = pool.try_acquire();
        assert!(r3.is_none());

        // Dropping a guard frees exactly one slot.
        drop(r1);
        assert_eq!(pool.available(), 1);

        // The freed slot can be acquired again.
        let r4 = pool.try_acquire();
        assert!(r4.is_some());

        // Once every guard is gone the pool is back at full capacity.
        drop(r2);
        drop(r4);
        assert_eq!(pool.available(), 2);
    }

    #[test]
    fn test_resource_pool_factory_called() {
        let (pool, calls) = counting_pool(2);

        let _r1 = pool.try_acquire().unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        // Nothing has been returned yet, so a second resource must be built.
        let _r2 = pool.try_acquire().unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn test_resource_pool_reuse() {
        let (pool, calls) = counting_pool(2);

        let r1 = pool.try_acquire().unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);
        drop(r1);

        // The returned resource is reused instead of calling the factory again.
        let _r2 = pool.try_acquire().unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_pooled_resource_deref() {
        let pool: ResourcePool<Vec<u8>> = ResourcePool::new(1, || vec![1, 2, 3]);

        let resource = pool.try_acquire().unwrap();
        assert_eq!(resource.len(), 3);
        assert_eq!(&*resource, &[1, 2, 3]);
    }

    #[test]
    fn test_pooled_resource_deref_mut() {
        let pool: ResourcePool<Vec<u8>> = ResourcePool::new(1, Vec::new);

        let mut resource = pool.try_acquire().unwrap();
        resource.push(42);
        assert_eq!(resource.len(), 1);
        assert_eq!(resource[0], 42);
    }

    #[test]
    fn test_resource_pool_debug() {
        let pool: ResourcePool<u32> = ResourcePool::new(4, || 0);
        let debug = format!("{:?}", pool);
        assert!(debug.contains("ResourcePool"));
        assert!(debug.contains("max_resources: 4"));
        assert!(debug.contains("available: 4"));
    }

    /// Hammers the pool from several threads and checks that the number of
    /// live guards never exceeds the configured maximum, and that every
    /// permit is back once all threads have finished.
    #[test]
    fn test_falsify_pool_never_exceeds_max() {
        const MAX: usize = 5;

        let pool = Arc::new(ResourcePool::<u32>::new(MAX, || 0));
        // Counts guards currently held across all threads. It is incremented
        // after a successful acquire and decremented before the guard is
        // dropped, so it can never over-report the number of held permits.
        let live = Arc::new(AtomicUsize::new(0));

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let pool = Arc::clone(&pool);
                let live = Arc::clone(&live);
                std::thread::spawn(move || {
                    let mut held = Vec::new();
                    for _ in 0..100 {
                        if let Some(guard) = pool.try_acquire() {
                            let now_live = live.fetch_add(1, Ordering::SeqCst) + 1;
                            assert!(
                                now_live <= MAX,
                                "FALSIFICATION FAILED: live guards ({}) > max ({})",
                                now_live,
                                MAX
                            );
                            held.push(guard);
                        }
                        if held.len() > 2 {
                            // Give one back to create churn.
                            live.fetch_sub(1, Ordering::SeqCst);
                            held.pop();
                        }
                    }
                    // Account for the guards released when `held` is dropped.
                    live.fetch_sub(held.len(), Ordering::SeqCst);
                })
            })
            .collect();

        // Propagate worker panics instead of silently discarding them.
        for worker in workers {
            worker.join().expect("worker thread panicked");
        }

        assert_eq!(live.load(Ordering::SeqCst), 0);
        let final_available = pool.available();
        assert_eq!(
            final_available, MAX,
            "FALSIFICATION FAILED: available ({}) != max ({}) after all guards were dropped",
            final_available, MAX
        );
    }

    /// With a single-slot pool, no two threads may hold a guard at once.
    #[test]
    fn test_falsify_cas_prevents_double_acquire() {
        let pool = Arc::new(ResourcePool::<u32>::new(1, || 0));
        let acquired_count = Arc::new(AtomicUsize::new(0));

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let pool = Arc::clone(&pool);
                let holders = Arc::clone(&acquired_count);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        if let Some(_guard) = pool.try_acquire() {
                            let prev = holders.fetch_add(1, Ordering::SeqCst);
                            assert!(
                                prev == 0,
                                "FALSIFICATION FAILED: Multiple threads acquired simultaneously"
                            );
                            // Widen the window in which a second holder could appear.
                            std::thread::yield_now();
                            holders.fetch_sub(1, Ordering::SeqCst);
                        }
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("worker thread panicked");
        }

        assert_eq!(pool.available(), 1);
    }
}