graphos_common/memory/
pool.rs1use parking_lot::Mutex;
7use std::ops::{Deref, DerefMut};
8
9pub struct ObjectPool<T> {
15 pool: Mutex<Vec<T>>,
17 factory: Box<dyn Fn() -> T + Send + Sync>,
19 reset: Option<Box<dyn Fn(&mut T) + Send + Sync>>,
21 max_size: usize,
23}
24
25impl<T> ObjectPool<T> {
26 pub fn new<F>(factory: F) -> Self
28 where
29 F: Fn() -> T + Send + Sync + 'static,
30 {
31 Self {
32 pool: Mutex::new(Vec::new()),
33 factory: Box::new(factory),
34 reset: None,
35 max_size: 1024,
36 }
37 }
38
39 pub fn with_reset<F, R>(factory: F, reset: R) -> Self
44 where
45 F: Fn() -> T + Send + Sync + 'static,
46 R: Fn(&mut T) + Send + Sync + 'static,
47 {
48 Self {
49 pool: Mutex::new(Vec::new()),
50 factory: Box::new(factory),
51 reset: Some(Box::new(reset)),
52 max_size: 1024,
53 }
54 }
55
56 #[must_use]
60 pub fn with_max_size(mut self, max_size: usize) -> Self {
61 self.max_size = max_size;
62 self
63 }
64
65 pub fn prefill(&self, count: usize) {
67 let mut pool = self.pool.lock();
68 let to_add = count.saturating_sub(pool.len()).min(self.max_size - pool.len());
69 for _ in 0..to_add {
70 pool.push((self.factory)());
71 }
72 }
73
74 pub fn get(&self) -> Pooled<'_, T> {
79 let value = self.pool.lock().pop().unwrap_or_else(|| (self.factory)());
80 Pooled { pool: self, value: Some(value) }
81 }
82
83 pub fn take(&self) -> T {
87 self.pool.lock().pop().unwrap_or_else(|| (self.factory)())
88 }
89
90 pub fn put(&self, mut value: T) {
94 if let Some(ref reset) = self.reset {
95 reset(&mut value);
96 }
97
98 let mut pool = self.pool.lock();
99 if pool.len() < self.max_size {
100 pool.push(value);
101 }
102 }
104
105 #[must_use]
107 pub fn available(&self) -> usize {
108 self.pool.lock().len()
109 }
110
111 #[must_use]
113 pub fn max_size(&self) -> usize {
114 self.max_size
115 }
116
117 pub fn clear(&self) {
119 self.pool.lock().clear();
120 }
121}
122
123pub struct Pooled<'a, T> {
125 pool: &'a ObjectPool<T>,
126 value: Option<T>,
127}
128
129impl<T> Pooled<'_, T> {
130 pub fn take(mut self) -> T {
132 self.value.take().expect("Value already taken")
133 }
134}
135
136impl<T> Deref for Pooled<'_, T> {
137 type Target = T;
138
139 fn deref(&self) -> &Self::Target {
140 self.value.as_ref().expect("Value already taken")
141 }
142}
143
144impl<T> DerefMut for Pooled<'_, T> {
145 fn deref_mut(&mut self) -> &mut Self::Target {
146 self.value.as_mut().expect("Value already taken")
147 }
148}
149
150impl<T> Drop for Pooled<'_, T> {
151 fn drop(&mut self) {
152 if let Some(value) = self.value.take() {
153 self.pool.put(value);
154 }
155 }
156}
157
158pub type VecPool<T> = ObjectPool<Vec<T>>;
160
161impl<T: 'static> VecPool<T> {
162 pub fn new_vec_pool() -> Self {
164 ObjectPool::with_reset(Vec::new, |v| v.clear())
165 }
166
167 pub fn new_vec_pool_with_capacity(capacity: usize) -> Self {
169 ObjectPool::with_reset(move || Vec::with_capacity(capacity), |v| v.clear())
170 }
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176
177 #[test]
178 fn test_pool_basic() {
179 let pool: ObjectPool<Vec<u8>> = ObjectPool::new(Vec::new);
180
181 let mut obj = pool.get();
183 obj.push(1);
184 obj.push(2);
185 assert_eq!(&*obj, &[1, 2]);
186
187 drop(obj);
189 assert_eq!(pool.available(), 1);
190
191 let obj2 = pool.get();
193 assert_eq!(pool.available(), 0);
194
195 assert_eq!(&*obj2, &[1, 2]);
197 }
198
199 #[test]
200 fn test_pool_with_reset() {
201 let pool: ObjectPool<Vec<u8>> = ObjectPool::with_reset(Vec::new, |v| v.clear());
202
203 let mut obj = pool.get();
204 obj.push(1);
205 obj.push(2);
206
207 drop(obj);
208
209 let obj2 = pool.get();
211 assert!(obj2.is_empty());
212 }
213
214 #[test]
215 fn test_pool_prefill() {
216 let pool: ObjectPool<String> = ObjectPool::new(String::new);
217
218 pool.prefill(10);
219 assert_eq!(pool.available(), 10);
220
221 let _obj = pool.get();
224 assert_eq!(pool.available(), 9);
225 }
226
227 #[test]
228 fn test_pool_max_size() {
229 let pool: ObjectPool<u64> = ObjectPool::new(|| 0).with_max_size(3);
230
231 pool.prefill(10);
232 assert_eq!(pool.available(), 3);
234
235 let o1 = pool.take();
237 let o2 = pool.take();
238 let o3 = pool.take();
239
240 assert_eq!(pool.available(), 0);
241
242 pool.put(o1);
243 pool.put(o2);
244 pool.put(o3);
245 pool.put(99); assert_eq!(pool.available(), 3);
248 }
249
250 #[test]
251 fn test_pool_take_ownership() {
252 let pool: ObjectPool<String> = ObjectPool::new(String::new);
253
254 let mut obj = pool.get();
255 obj.push_str("hello");
256
257 let owned = obj.take();
259 assert_eq!(owned, "hello");
260 assert_eq!(pool.available(), 0);
261 }
262
263 #[test]
264 fn test_pool_clear() {
265 let pool: ObjectPool<u64> = ObjectPool::new(|| 0);
266
267 pool.prefill(10);
268 assert_eq!(pool.available(), 10);
269
270 pool.clear();
271 assert_eq!(pool.available(), 0);
272 }
273
274 #[test]
275 fn test_vec_pool() {
276 let pool: VecPool<u8> = VecPool::new_vec_pool();
277
278 let mut v = pool.get();
279 v.extend_from_slice(&[1, 2, 3]);
280
281 drop(v);
282
283 let v2 = pool.get();
284 assert!(v2.is_empty()); }
286
287 #[test]
288 fn test_vec_pool_with_capacity() {
289 let pool: VecPool<u8> = VecPool::new_vec_pool_with_capacity(100);
290
291 let v = pool.get();
292 assert!(v.capacity() >= 100);
293 }
294
295 #[test]
296 fn test_pool_thread_safety() {
297 use std::sync::Arc;
298 use std::thread;
299
300 let pool: Arc<ObjectPool<Vec<u8>>> =
301 Arc::new(ObjectPool::with_reset(Vec::new, |v| v.clear()));
302
303 let handles: Vec<_> = (0..4)
304 .map(|_| {
305 let pool = Arc::clone(&pool);
306 thread::spawn(move || {
307 for _ in 0..100 {
308 let mut v = pool.get();
309 v.push(42);
310 }
312 })
313 })
314 .collect();
315
316 for h in handles {
317 h.join().unwrap();
318 }
319
320 assert!(pool.available() > 0);
322 }
323}