concurrent_pool/pool.rs
1use std::cmp::max;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::sync::atomic::{AtomicBool, AtomicUsize};
5
6use crossbeam_queue::ArrayQueue;
7
8use crate::entry::Prc;
9use crate::{Entry, OwnedEntry};
10
11/// A concurrent object pool.
12///
13/// # Examples
14///
15/// ```rust
16/// use concurrent_pool::{Pool, Builder};
17/// use std::sync::{Arc, mpsc};
18///
19/// let mut builder = Builder::new();
20///
21/// let pool: Arc<Pool<String>> = Arc::new(builder.capacity(10).clear_func(String::clear).build());
22///
23///
24/// let (tx, rx) = mpsc::channel();
25/// let clone_pool = pool.clone();
26/// let tx1 = tx.clone();
27/// let sender1 = std::thread::spawn(move || {
28/// let item = clone_pool.pull_owned_with(|x| x.push_str("1")).unwrap();
29/// tx1.send((1, item)).unwrap();
30/// });
31///
32/// let clone_pool = pool.clone();
33/// let sender2 = std::thread::spawn(move || {
34/// let item = clone_pool.pull_owned_with(|x| x.push_str("2")).unwrap();
35/// tx.send((2, item)).unwrap();
36/// });
37///
38/// let receiver = std::thread::spawn(move || {
39/// for _ in 0..2 {
40/// let (id, item) = rx.recv().unwrap();
41/// if id == 1 {
42/// assert_eq!(*item, "1");
43/// } else {
44/// assert_eq!(*item, "2");
45/// }
46/// }
47/// });
48///
49/// sender1.join().unwrap();
50/// sender2.join().unwrap();
51/// receiver.join().unwrap();
52/// ```
53#[derive(Debug)]
54pub struct Pool<T: Default> {
55 /// Configuration of the pool.
56 config: Config<T>,
57 /// Inner queue holding the pooled items.
58 queue: ArrayQueue<Prc<T>>,
59 /// Number of items currently allocated.
60 allocated: AtomicUsize,
61 /// Number of currently continues `surplus-pull` times
62 surpluspulls: AtomicUsize,
63 /// Whether an additional item has been allocated beyond the preallocated items.
64 additional_allocated: AtomicBool,
65}
66
67impl<T: Default> Drop for Pool<T> {
68 fn drop(&mut self) {
69 while let Some(item) = self.queue.pop() {
70 unsafe { item.drop_slow() };
71 }
72 }
73}
74
75impl<T: Default> Pool<T> {
76 /// Create a new pool with the given preallocation and capacity.
77 ///
78 /// # Example
79 ///
80 /// ```rust
81 /// use concurrent_pool::Pool;
82 ///
83 /// let pool: Pool<u32> = Pool::new(2, 5);
84 /// assert_eq!(pool.available(), 5);
85 /// assert_eq!(pool.available_noalloc(), 2);
86 /// let item = pool.pull().unwrap();
87 /// assert_eq!(pool.available_noalloc(), 1);
88 /// ```
89 pub fn new(prealloc: usize, capacity: usize) -> Self {
90 Self::with_config(Config {
91 capacity,
92 prealloc,
93 ..Default::default()
94 })
95 }
96
97 /// Create a new pool with the given capacity.
98 ///
99 /// # Example
100 ///
101 /// ```rust
102 /// use concurrent_pool::Pool;
103 ///
104 /// let pool: Pool<u32> = Pool::with_capacity(10);
105 /// assert_eq!(pool.available(), 10);
106 /// assert_eq!(pool.available_noalloc(), 10);
107 /// let item = pool.pull().unwrap();
108 /// assert_eq!(pool.available(), 9);
109 /// ```
110 pub fn with_capacity(capacity: usize) -> Self {
111 Self::new(capacity, capacity)
112 }
113
114 /// Create a new pool with half of the capacity preallocated.
115 ///
116 /// # Example
117 ///
118 /// ```rust
119 /// use concurrent_pool::Pool;
120 ///
121 /// let pool: Pool<u32> = Pool::with_capacity_half_prealloc(10);
122 /// assert_eq!(pool.available(), 10);
123 /// assert_eq!(pool.available_noalloc(), 5);
124 /// let item = pool.pull().unwrap();
125 /// assert_eq!(pool.available_noalloc(), 4);
126 /// assert_eq!(pool.in_use(), 1);
127 /// ```
128 pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
129 Self::new(capacity / 2, capacity)
130 }
131
132 /// Create a new pool with the given configuration.
133 ///
134 /// # Example
135 ///
136 /// ```rust
137 /// use concurrent_pool::{Pool, Config};
138 ///
139 /// fn clear_func(x: &mut String) {
140 /// x.clear();
141 /// }
142 ///
143 /// let mut config = Config::default();
144 /// config.capacity = 1;
145 /// config.clear_func = Some(clear_func);
146 /// let pool: Pool<String> = Pool::with_config(config);
147 /// let item = pool.pull_with(|s| s.push_str("Hello, World!")).unwrap();
148 /// assert_eq!(&*item, "Hello, World!");
149 /// drop(item);
150 /// let item2 = pool.pull().unwrap();
151 /// assert_eq!(&*item2, "");
152 /// ```
153 pub fn with_config(mut config: Config<T>) -> Self {
154 config.post_process();
155 let prealloc = config.prealloc;
156 assert!(
157 prealloc <= config.capacity,
158 "prealloc must be less than or equal to capacity"
159 );
160
161 let queue_len = max(1, config.capacity);
162 let pool = Self {
163 queue: ArrayQueue::new(queue_len),
164 allocated: AtomicUsize::new(prealloc),
165 surpluspulls: AtomicUsize::new(0),
166 additional_allocated: AtomicBool::new(false),
167 config,
168 };
169 let mut items = Vec::with_capacity(prealloc);
170 for _ in 0..prealloc {
171 items.push(T::default());
172 }
173 while let Some(item) = items.pop() {
174 let _ = pool.queue.push(Prc::new_zero(item));
175 }
176 pool
177 }
178
179 /// Enable automatic reclamation of allocated items to reduce memory usage.
180 pub fn enable_auto_reclaim(&mut self) {
181 self.config.auto_reclaim = true;
182 self.config.post_process();
183 }
184
185 /// Get in used items count.
186 ///
187 /// # Example
188 ///
189 /// ```rust
190 /// use concurrent_pool::Pool;
191 ///
192 /// let pool: Pool<u32> = Pool::with_capacity(10);
193 /// assert_eq!(pool.in_use(), 0);
194 /// let item = pool.pull().unwrap();
195 /// assert_eq!(pool.in_use(), 1);
196 /// let item2 = pool.pull().unwrap();
197 /// assert_eq!(pool.in_use(), 2);
198 /// ```
199 pub fn in_use(&self) -> usize {
200 self.allocated.load(Relaxed) - self.queue.len()
201 }
202
203 /// Get allocated items count.
204 ///
205 /// # Example
206 ///
207 /// ```rust
208 /// use concurrent_pool::Pool;
209 ///
210 /// let pool = Pool::<usize>::new(2, 5);
211 /// assert_eq!(pool.allocated(), 2);
212 /// ```
213 pub fn allocated(&self) -> usize {
214 self.allocated.load(Acquire)
215 }
216
217 /// Get available items count.
218 ///
219 /// # Example
220 ///
221 /// ```rust
222 /// use concurrent_pool::Pool;
223 ///
224 /// let pool: Pool<u32> = Pool::with_capacity(10);
225 /// assert_eq!(pool.available(), 10);
226 /// let item = pool.pull().unwrap();
227 /// assert_eq!(pool.available(), 9);
228 /// ```
229 pub fn available(&self) -> usize {
230 self.config.capacity - self.in_use()
231 }
232
233 /// Get available items count without allocation.
234 ///
235 /// # Example
236 ///
237 /// ```rust
238 /// use concurrent_pool::Pool;
239 ///
240 /// let pool: Pool<u32> = Pool::new(2, 5);
241 /// assert_eq!(pool.available_noalloc(), 2);
242 /// let item = pool.pull().unwrap();
243 /// assert_eq!(pool.available_noalloc(), 1);
244 /// let item2 = pool.pull().unwrap();
245 /// assert_eq!(pool.available_noalloc(), 0);
246 /// let item3 = pool.pull().unwrap();
247 /// assert_eq!(pool.available_noalloc(), 0);
248 /// drop(item);
249 /// assert_eq!(pool.available_noalloc(), 1);
250 /// ```
251 pub fn available_noalloc(&self) -> usize {
252 self.queue.len()
253 }
254
255 /// Check if the pool is empty.
256 ///
257 /// # Example
258 ///
259 /// ```rust
260 /// use concurrent_pool::Pool;
261 ///
262 /// let pool: Pool<u32> = Pool::with_capacity(2);
263 /// assert!(!pool.is_empty());
264 /// let item1 = pool.pull().unwrap();
265 /// assert!(!pool.is_empty());
266 /// let item2 = pool.pull().unwrap();
267 /// assert!(pool.is_empty());
268 /// drop(item1);
269 /// assert!(!pool.is_empty());
270 /// ```
271 pub fn is_empty(&self) -> bool {
272 self.available() == 0
273 }
274
275 /// Get the capacity of the pool.
276 ///
277 /// # Example
278 ///
279 /// ```rust
280 /// use concurrent_pool::Pool;
281 ///
282 /// let pool: Pool<u32> = Pool::with_capacity(10);
283 /// assert_eq!(pool.capacity(), 10);
284 /// ```
285 pub fn capacity(&self) -> usize {
286 self.config.capacity
287 }
288
289 /// Pull an item from the pool. Return `None` if the pool is empty.
290 ///
291 /// # Example
292 ///
293 /// ```rust
294 /// use concurrent_pool::Pool;
295 ///
296 /// let pool: Pool<u32> = Pool::with_capacity(2);
297 /// let item1 = pool.pull().unwrap();
298 /// assert_eq!(*item1, 0);
299 /// ```
300 pub fn pull(&self) -> Option<Entry<'_, T>> {
301 self.pull_inner().map(|item| Entry {
302 item: Some(item),
303 pool: self,
304 })
305 }
306
307 /// Pull an item from the pool and apply a function to it. Return `None` if the pool is empty.
308 ///
309 /// # Example
310 ///
311 /// ```rust
312 /// use concurrent_pool::Pool;
313 ///
314 /// let pool: Pool<u32> = Pool::with_capacity(2);
315 /// let item1 = pool.pull_with(|x| *x = 42).unwrap();
316 /// assert_eq!(*item1, 42);
317 /// ```
318 pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
319 where
320 F: FnOnce(&mut T),
321 {
322 self.pull().map(|mut entry| {
323 func(unsafe { entry.get_mut_unchecked() });
324 entry
325 })
326 }
327
328 /// Pull an owned item from the pool. Return `None` if the pool is empty.
329 ///
330 /// # Example
331 ///
332 /// ```rust
333 /// use concurrent_pool::Pool;
334 /// use std::sync::Arc;
335 ///
336 /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
337 /// let item1 = pool.pull_owned().unwrap();
338 /// assert_eq!(*item1, 0);
339 /// ```
340 pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
341 self.pull_inner().map(|item| crate::OwnedEntry {
342 item: Some(item),
343 pool: self.clone(),
344 })
345 }
346
347 /// Pull an owned item from the pool and apply a function to it. Return `None` if the pool is empty.
348 ///
349 /// # Example
350 ///
351 /// ```rust
352 /// use concurrent_pool::Pool;
353 /// use std::sync::Arc;
354 ///
355 /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
356 /// let item1 = pool.pull_owned_with(|x| *x = 42).unwrap();
357 /// assert_eq!(*item1, 42);
358 /// ```
359 pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
360 where
361 F: FnOnce(&mut T),
362 {
363 self.pull_owned().map(|mut entry| {
364 func(unsafe { entry.get_mut_unchecked() });
365 entry
366 })
367 }
368
369 /// Internal method to pull an item from the pool.
370 fn pull_inner(&self) -> Option<Prc<T>> {
371 match self.queue.pop() {
372 None => {
373 if !self.additional_allocated.load(Relaxed) {
374 self.additional_allocated.store(true, Relaxed);
375 }
376 if self.config.need_process_reclamation {
377 self.surpluspulls.store(0, SeqCst);
378 }
379
380 match self.allocated.fetch_update(AcqRel, Acquire, |current| {
381 match current < self.config.capacity {
382 true => Some(current + 1),
383 false => None,
384 }
385 }) {
386 Ok(_) => Some(Prc::new(T::default())),
387 Err(_) => None,
388 }
389 }
390 Some(item) => {
391 if self.config.need_process_reclamation {
392 let left = self.queue.len();
393 if left >= self.config.idle_threshold_for_surpluspull {
394 let surpluspulls = self.surpluspulls.fetch_add(1, Relaxed) + 1;
395 if surpluspulls >= self.config.surpluspull_threshold_for_reclaim
396 && self.additional_allocated.load(Relaxed)
397 {
398 self.reclaim();
399 }
400 } else {
401 self.surpluspulls.store(0, Relaxed);
402 }
403 }
404 item.inc_ref();
405 Some(item)
406 }
407 }
408 }
409
410 /// Reclaim an item from the pool to reduce memory usage.
411 fn reclaim(&self) {
412 if let Some(item) = self.queue.pop() {
413 unsafe { item.drop_slow() };
414 let current = self.allocated.fetch_sub(1, Release) - 1;
415 if self.config.need_process_reclamation && current <= self.config.prealloc {
416 if self.additional_allocated.load(Relaxed) {
417 self.additional_allocated.store(false, Relaxed);
418 }
419 }
420 }
421 }
422
423 /// Recycle an item back into the pool.
424 pub(crate) fn recycle(&self, mut item: Prc<T>) {
425 if let Some(func) = &self.config.clear_func {
426 func(unsafe { Prc::get_mut_unchecked(&mut item) })
427 }
428 if self.queue.push(item).is_err() {
429 panic!("It is imposible that the pool is full when recycling an item");
430 }
431 }
432}
433
/// Settings that control how a pool is sized and how it gives memory back.
#[derive(Debug)]
pub struct Config<T: Default> {
    /// Upper bound on the number of items the pool may hold at once.
    pub capacity: usize,
    /// How many items are created eagerly when the pool is built.
    pub prealloc: usize,
    /// When `true`, idle items may be freed automatically to reduce memory usage.
    pub auto_reclaim: bool,
    /// How many `surplus-pull`s must happen in a row before an idle item is
    /// reclaimed (only relevant with `auto_reclaim`). `0` means "derive from the
    /// capacity".
    pub surpluspull_threshold_for_reclaim: usize,
    /// A pull counts as a `surplus-pull` when at least this many items are still
    /// idle afterwards (only relevant with `auto_reclaim`). `0` means "derive from
    /// the capacity".
    pub idle_threshold_for_surpluspull: usize,
    /// Optional hook that clears or resets an item so it can be reused.
    pub clear_func: Option<fn(&mut T)>,
    /// Derived flag telling the pool whether it has to do reclamation bookkeeping.
    need_process_reclamation: bool,
}
453
454impl<T: Default> Default for Config<T> {
455 fn default() -> Self {
456 Self {
457 capacity: 1024,
458 prealloc: 0,
459 auto_reclaim: false,
460 clear_func: None,
461 surpluspull_threshold_for_reclaim: 0,
462 idle_threshold_for_surpluspull: 0,
463 need_process_reclamation: false,
464 }
465 }
466}
467
468impl<T: Default> Config<T> {
469 pub(crate) fn post_process(&mut self) {
470 if self.idle_threshold_for_surpluspull == 0 {
471 self.idle_threshold_for_surpluspull = max(1, self.capacity / 20);
472 }
473
474 if self.surpluspull_threshold_for_reclaim == 0 {
475 self.surpluspull_threshold_for_reclaim = max(2, self.capacity / 100);
476 }
477
478 if self.auto_reclaim && self.prealloc != self.capacity {
479 self.need_process_reclamation = true;
480 } else {
481 self.need_process_reclamation = false;
482 }
483 }
484}