concurrent_pool/pool.rs
1use std::cmp::max;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::sync::atomic::{AtomicBool, AtomicUsize};
5
6use crossbeam_queue::ArrayQueue;
7
8use crate::entry::Prc;
9use crate::{Entry, OwnedEntry};
10
11/// A concurrent object pool.
12///
13/// # Examples
14///
15/// ```rust
16/// use concurrent_pool::Pool;
17/// use std::sync::{Arc, mpsc};
18///
19/// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(10));
20///
21/// let (tx, rx) = mpsc::channel();
22/// let clone_pool = pool.clone();
23/// let tx1 = tx.clone();
24/// let sender1 = std::thread::spawn(move || {
25/// let item = clone_pool.pull_owned_with(|x| *x = 1).unwrap();
26/// tx1.send((1, item)).unwrap();
27/// });
28///
29/// let clone_pool = pool.clone();
30/// let sender2 = std::thread::spawn(move || {
31/// let item = clone_pool.pull_owned_with(|x| *x = 2).unwrap();
32/// tx.send((2, item)).unwrap();
33/// });
34///
35/// let receiver = std::thread::spawn(move || {
36/// for _ in 0..2 {
37/// let (id, item) = rx.recv().unwrap();
38/// if id == 1 {
39/// assert_eq!(*item, 1);
40/// } else {
41/// assert_eq!(*item, 2);
42/// }
43/// }
44/// });
45///
46/// sender1.join().unwrap();
47/// sender2.join().unwrap();
48/// receiver.join().unwrap();
49/// ```
50pub struct Pool<T: Default> {
51 /// Configuration of the pool.
52 config: Config<T>,
53 /// Inner queue holding the pooled items.
54 queue: ArrayQueue<Prc<T>>,
55 /// Number of items currently allocated.
56 allocated: AtomicUsize,
57 /// Number of currently continues `fast-pull` times
58 fastpulls: AtomicUsize,
59 /// Whether an additional item has been allocated beyond the preallocated items.
60 additional_allocated: AtomicBool,
61}
62
63impl<T: Default> Drop for Pool<T> {
64 fn drop(&mut self) {
65 while let Some(item) = self.queue.pop() {
66 unsafe { item.drop_slow() };
67 }
68 }
69}
70
71impl<T: Default> Pool<T> {
72 /// Create a new pool with the given preallocation and capacity.
73 ///
74 /// # Example
75 ///
76 /// ```rust
77 /// use concurrent_pool::Pool;
78 ///
79 /// let pool: Pool<u32> = Pool::new(2, 5);
80 /// assert_eq!(pool.available(), 5);
81 /// assert_eq!(pool.available_noalloc(), 2);
82 /// let item = pool.pull().unwrap();
83 /// assert_eq!(pool.available_noalloc(), 1);
84 /// ```
85 pub fn new(prealloc: usize, capacity: usize) -> Self {
86 Self::with_config(Config {
87 capacity,
88 prealloc,
89 ..Default::default()
90 })
91 }
92
93 /// Create a new pool with the given capacity.
94 ///
95 /// # Example
96 ///
97 /// ```rust
98 /// use concurrent_pool::Pool;
99 ///
100 /// let pool: Pool<u32> = Pool::with_capacity(10);
101 /// assert_eq!(pool.available(), 10);
102 /// assert_eq!(pool.available_noalloc(), 10);
103 /// let item = pool.pull().unwrap();
104 /// assert_eq!(pool.available(), 9);
105 /// ```
106 pub fn with_capacity(capacity: usize) -> Self {
107 Self::new(capacity, capacity)
108 }
109
110 /// Create a new pool with half of the capacity preallocated.
111 ///
112 /// # Example
113 ///
114 /// ```rust
115 /// use concurrent_pool::Pool;
116 ///
117 /// let pool: Pool<u32> = Pool::with_capacity_half_prealloc(10);
118 /// assert_eq!(pool.available(), 10);
119 /// assert_eq!(pool.available_noalloc(), 5);
120 /// let item = pool.pull().unwrap();
121 /// assert_eq!(pool.available_noalloc(), 4);
122 /// assert_eq!(pool.in_use(), 1);
123 /// ```
124 pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
125 Self::new(capacity / 2, capacity)
126 }
127
128 /// Create a new pool with the given configuration.
129 ///
130 /// # Example
131 ///
132 /// ```rust
133 /// use concurrent_pool::{Pool, Config};
134 ///
135 /// fn clear_func(x: &mut String) {
136 /// x.clear();
137 /// }
138 ///
139 /// let mut config = Config::default();
140 /// config.capacity = 1;
141 /// config.clear_func = Some(clear_func);
142 /// let pool: Pool<String> = Pool::with_config(config);
143 /// let item = pool.pull_with(|s| s.push_str("Hello, World!")).unwrap();
144 /// assert_eq!(&*item, "Hello, World!");
145 /// drop(item);
146 /// let item2 = pool.pull().unwrap();
147 /// assert_eq!(&*item2, "");
148 /// ```
149 pub fn with_config(mut config: Config<T>) -> Self {
150 config.post_process();
151 let prealloc = config.prealloc;
152 assert!(
153 prealloc <= config.capacity,
154 "prealloc must be less than or equal to capacity"
155 );
156
157 let pool = Self {
158 queue: ArrayQueue::new(config.capacity),
159 allocated: AtomicUsize::new(prealloc),
160 fastpulls: AtomicUsize::new(0),
161 additional_allocated: AtomicBool::new(false),
162 config,
163 };
164 let mut items = Vec::with_capacity(prealloc);
165 for _ in 0..prealloc {
166 items.push(T::default());
167 }
168 while let Some(item) = items.pop() {
169 let _ = pool.queue.push(Prc::new_zero(item));
170 }
171 pool
172 }
173
174 /// Get in used items count.
175 ///
176 /// # Example
177 ///
178 /// ```rust
179 /// use concurrent_pool::Pool;
180 ///
181 /// let pool: Pool<u32> = Pool::with_capacity(10);
182 /// assert_eq!(pool.in_use(), 0);
183 /// let item = pool.pull().unwrap();
184 /// assert_eq!(pool.in_use(), 1);
185 /// let item2 = pool.pull().unwrap();
186 /// assert_eq!(pool.in_use(), 2);
187 /// ```
188 pub fn in_use(&self) -> usize {
189 self.allocated.load(Relaxed) - self.queue.len()
190 }
191
192 /// Get allocated items count.
193 ///
194 /// # Example
195 ///
196 /// ```rust
197 /// use concurrent_pool::Pool;
198 ///
199 /// let pool = Pool::<usize>::new(2, 5);
200 /// assert_eq!(pool.allocated(), 2);
201 /// ```
202 pub fn allocated(&self) -> usize {
203 self.allocated.load(Acquire)
204 }
205
206 /// Get available items count.
207 ///
208 /// # Example
209 ///
210 /// ```rust
211 /// use concurrent_pool::Pool;
212 ///
213 /// let pool: Pool<u32> = Pool::with_capacity(10);
214 /// assert_eq!(pool.available(), 10);
215 /// let item = pool.pull().unwrap();
216 /// assert_eq!(pool.available(), 9);
217 /// ```
218 pub fn available(&self) -> usize {
219 self.config.capacity - self.in_use()
220 }
221
222 /// Get available items count without allocation.
223 ///
224 /// # Example
225 ///
226 /// ```rust
227 /// use concurrent_pool::Pool;
228 ///
229 /// let pool: Pool<u32> = Pool::new(2, 5);
230 /// assert_eq!(pool.available_noalloc(), 2);
231 /// let item = pool.pull().unwrap();
232 /// assert_eq!(pool.available_noalloc(), 1);
233 /// let item2 = pool.pull().unwrap();
234 /// assert_eq!(pool.available_noalloc(), 0);
235 /// let item3 = pool.pull().unwrap();
236 /// assert_eq!(pool.available_noalloc(), 0);
237 /// drop(item);
238 /// assert_eq!(pool.available_noalloc(), 1);
239 /// ```
240 pub fn available_noalloc(&self) -> usize {
241 self.queue.len()
242 }
243
244 /// Check if the pool is empty.
245 ///
246 /// # Example
247 ///
248 /// ```rust
249 /// use concurrent_pool::Pool;
250 ///
251 /// let pool: Pool<u32> = Pool::with_capacity(2);
252 /// assert!(!pool.is_empty());
253 /// let item1 = pool.pull().unwrap();
254 /// assert!(!pool.is_empty());
255 /// let item2 = pool.pull().unwrap();
256 /// assert!(pool.is_empty());
257 /// drop(item1);
258 /// assert!(!pool.is_empty());
259 /// ```
260 pub fn is_empty(&self) -> bool {
261 self.available() == 0
262 }
263
264 /// Get the capacity of the pool.
265 ///
266 /// # Example
267 ///
268 /// ```rust
269 /// use concurrent_pool::Pool;
270 ///
271 /// let pool: Pool<u32> = Pool::with_capacity(10);
272 /// assert_eq!(pool.capacity(), 10);
273 /// ```
274 pub fn capacity(&self) -> usize {
275 self.config.capacity
276 }
277
278 /// Pull an item from the pool. Return `None` if the pool is empty.
279 ///
280 /// # Example
281 ///
282 /// ```rust
283 /// use concurrent_pool::Pool;
284 ///
285 /// let pool: Pool<u32> = Pool::with_capacity(2);
286 /// let item1 = pool.pull().unwrap();
287 /// assert_eq!(*item1, 0);
288 /// ```
289 pub fn pull(&self) -> Option<Entry<'_, T>> {
290 self.pull_inner().map(|item| Entry {
291 item: Some(item),
292 pool: self,
293 })
294 }
295
296 /// Pull an item from the pool and apply a function to it. Return `None` if the pool is empty.
297 ///
298 /// # Example
299 ///
300 /// ```rust
301 /// use concurrent_pool::Pool;
302 ///
303 /// let pool: Pool<u32> = Pool::with_capacity(2);
304 /// let item1 = pool.pull_with(|x| *x = 42).unwrap();
305 /// assert_eq!(*item1, 42);
306 /// ```
307 pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
308 where
309 F: FnOnce(&mut T),
310 {
311 self.pull().map(|mut entry| {
312 func(unsafe { entry.get_mut_unchecked() });
313 entry
314 })
315 }
316
317 /// Pull an owned item from the pool. Return `None` if the pool is empty.
318 ///
319 /// # Example
320 ///
321 /// ```rust
322 /// use concurrent_pool::Pool;
323 /// use std::sync::Arc;
324 ///
325 /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
326 /// let item1 = pool.pull_owned().unwrap();
327 /// assert_eq!(*item1, 0);
328 /// ```
329 pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
330 self.pull_inner().map(|item| crate::OwnedEntry {
331 item: Some(item),
332 pool: self.clone(),
333 })
334 }
335
336 /// Pull an owned item from the pool and apply a function to it. Return `None` if the pool is empty.
337 ///
338 /// # Example
339 ///
340 /// ```rust
341 /// use concurrent_pool::Pool;
342 /// use std::sync::Arc;
343 ///
344 /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
345 /// let item1 = pool.pull_owned_with(|x| *x = 42).unwrap();
346 /// assert_eq!(*item1, 42);
347 /// ```
348 pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
349 where
350 F: FnOnce(&mut T),
351 {
352 self.pull_owned().map(|mut entry| {
353 func(unsafe { entry.get_mut_unchecked() });
354 entry
355 })
356 }
357
358 /// Internal method to pull an item from the pool.
359 fn pull_inner(&self) -> Option<Prc<T>> {
360 match self.queue.pop() {
361 None => {
362 if !self.additional_allocated.load(Relaxed) {
363 self.additional_allocated.store(true, Relaxed);
364 }
365 if self.config.need_process_reclamation {
366 self.fastpulls.store(0, SeqCst);
367 }
368 if self.allocated.fetch_add(1, Relaxed) + 1 <= self.config.capacity {
369 Some(Prc::new(T::default()))
370 } else {
371 None
372 }
373 }
374 Some(item) => {
375 if self.config.need_process_reclamation {
376 let left = self.queue.len();
377 if left >= self.config.idle_threshold_for_fastpull {
378 let fastpulls = self.fastpulls.fetch_add(1, Relaxed) + 1;
379 if fastpulls >= self.config.fastpull_threshold_for_reclaim
380 && self.additional_allocated.load(Relaxed)
381 {
382 self.reclaim();
383 }
384 } else {
385 self.fastpulls.store(0, Relaxed);
386 }
387 }
388 item.inc_ref();
389 Some(item)
390 }
391 }
392 }
393
394 /// Reclaim an item from the pool to reduce memory usage.
395 fn reclaim(&self) {
396 if let Some(item) = self.queue.pop() {
397 unsafe { item.drop_slow() };
398 let current = self.allocated.fetch_sub(1, Release) - 1;
399 if self.config.need_process_reclamation && current <= self.config.prealloc {
400 if self.additional_allocated.load(Relaxed) {
401 self.additional_allocated.store(false, Relaxed);
402 }
403 }
404 }
405 }
406
407 /// Recycle an item back into the pool.
408 pub(crate) fn recycle(&self, mut item: Prc<T>) {
409 if let Some(func) = &self.config.clear_func {
410 func(unsafe { Prc::get_mut_unchecked(&mut item) })
411 }
412 if self.queue.push(item).is_err() {
413 panic!("It is imposible that the pool is full when recycling an item");
414 }
415 }
416}
417
418/// Configuration for the pool.
419pub struct Config<T: Default> {
420 /// Maximum capacity of the pool.
421 pub capacity: usize,
422 /// Number of items to preallocate.
423 pub prealloc: usize,
424 /// Whether to automatically reclaim allocated items and free them to reduce memory usage.
425 pub auto_reclaim: bool,
426 /// Threshold of `fast-pull` continuous occurrence to trigger reclamation
427 /// when `auto_reclaim` is enabled.
428 pub fastpull_threshold_for_reclaim: usize,
429 /// Threshold for idle items to judge as a fast-pull when `auto_reclaim` is enabled.
430 pub idle_threshold_for_fastpull: usize,
431 /// Optional function to clear or reset an item before it is reused.
432 pub clear_func: Option<fn(&mut T)>,
433 /// Internal flag to indicate if the pool needs to process reclamation.
434 need_process_reclamation: bool,
435}
436
437impl<T: Default> Default for Config<T> {
438 fn default() -> Self {
439 Self {
440 capacity: 1024,
441 prealloc: 0,
442 auto_reclaim: false,
443 clear_func: None,
444 fastpull_threshold_for_reclaim: 0,
445 idle_threshold_for_fastpull: 0,
446 need_process_reclamation: false,
447 }
448 }
449}
450
451impl<T: Default> Config<T> {
452 pub(crate) fn post_process(&mut self) {
453 if self.idle_threshold_for_fastpull == 0 {
454 self.idle_threshold_for_fastpull = max(1, self.capacity / 20);
455 }
456
457 if self.auto_reclaim && self.prealloc != self.capacity {
458 self.need_process_reclamation = true;
459 } else {
460 self.need_process_reclamation = false;
461 }
462 }
463}