concurrent_pool/pool.rs
1use std::cmp::max;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::sync::atomic::{AtomicBool, AtomicUsize};
5
6use crossbeam_queue::ArrayQueue;
7
8use crate::entry::Prc;
9use crate::{Entry, OwnedEntry};
10
11/// A concurrent object pool.
12///
13/// # Examples
14///
15/// ```rust
16/// use concurrent_pool::{Pool, Builder};
17/// use std::sync::{Arc, mpsc};
18///
19/// let mut builder = Builder::new();
20///
21/// let pool: Arc<Pool<String>> = Arc::new(builder.capacity(10).clear_func(String::clear).build());
22///
23///
24/// let (tx, rx) = mpsc::channel();
25/// let clone_pool = pool.clone();
26/// let tx1 = tx.clone();
27/// let sender1 = std::thread::spawn(move || {
28/// let item = clone_pool.pull_owned_with(|x| x.push_str("1")).unwrap();
29/// tx1.send((1, item)).unwrap();
30/// });
31///
32/// let clone_pool = pool.clone();
33/// let sender2 = std::thread::spawn(move || {
34/// let item = clone_pool.pull_owned_with(|x| x.push_str("2")).unwrap();
35/// tx.send((2, item)).unwrap();
36/// });
37///
38/// let receiver = std::thread::spawn(move || {
39/// for _ in 0..2 {
40/// let (id, item) = rx.recv().unwrap();
41/// if id == 1 {
42/// assert_eq!(*item, "1");
43/// } else {
44/// assert_eq!(*item, "2");
45/// }
46/// }
47/// });
48///
49/// sender1.join().unwrap();
50/// sender2.join().unwrap();
51/// receiver.join().unwrap();
52/// ```
53#[derive(Debug)]
54pub struct Pool<T: Default> {
55 /// Configuration of the pool.
56 config: Config<T>,
57 /// Inner queue holding the pooled items.
58 queue: ArrayQueue<Prc<T>>,
59 /// Number of items currently allocated.
60 allocated: AtomicUsize,
61 /// Number of currently continues `surplus-pull` times
62 surpluspulls: AtomicUsize,
63 /// Whether an additional item has been allocated beyond the preallocated items.
64 additional_allocated: AtomicBool,
65}
66
67impl<T: Default> Drop for Pool<T> {
68 fn drop(&mut self) {
69 while let Some(item) = self.queue.pop() {
70 unsafe { item.drop_slow() };
71 }
72 }
73}
74
75impl<T: Default> Pool<T> {
76 /// Create a new pool with the given preallocation and capacity.
77 ///
78 /// # Example
79 ///
80 /// ```rust
81 /// use concurrent_pool::Pool;
82 ///
83 /// let pool: Pool<u32> = Pool::new(2, 5);
84 /// assert_eq!(pool.available(), 5);
85 /// assert_eq!(pool.available_noalloc(), 2);
86 /// let item = pool.pull().unwrap();
87 /// assert_eq!(pool.available_noalloc(), 1);
88 /// ```
89 pub fn new(prealloc: usize, capacity: usize) -> Self {
90 Self::with_config(Config {
91 capacity,
92 prealloc,
93 ..Default::default()
94 })
95 }
96
97 /// Create a new pool with the given capacity.
98 ///
99 /// # Example
100 ///
101 /// ```rust
102 /// use concurrent_pool::Pool;
103 ///
104 /// let pool: Pool<u32> = Pool::with_capacity(10);
105 /// assert_eq!(pool.available(), 10);
106 /// assert_eq!(pool.available_noalloc(), 10);
107 /// let item = pool.pull().unwrap();
108 /// assert_eq!(pool.available(), 9);
109 /// ```
110 pub fn with_capacity(capacity: usize) -> Self {
111 Self::new(capacity, capacity)
112 }
113
114 /// Create a new pool with half of the capacity preallocated.
115 ///
116 /// # Example
117 ///
118 /// ```rust
119 /// use concurrent_pool::Pool;
120 ///
121 /// let pool: Pool<u32> = Pool::with_capacity_half_prealloc(10);
122 /// assert_eq!(pool.available(), 10);
123 /// assert_eq!(pool.available_noalloc(), 5);
124 /// let item = pool.pull().unwrap();
125 /// assert_eq!(pool.available_noalloc(), 4);
126 /// assert_eq!(pool.in_use(), 1);
127 /// ```
128 pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
129 Self::new(capacity / 2, capacity)
130 }
131
132 /// Create a new pool with the given configuration.
133 ///
134 /// # Example
135 ///
136 /// ```rust
137 /// use concurrent_pool::{Pool, Config};
138 ///
139 /// fn clear_func(x: &mut String) {
140 /// x.clear();
141 /// }
142 ///
143 /// let mut config = Config::default();
144 /// config.capacity = 1;
145 /// config.clear_func = Some(clear_func);
146 /// let pool: Pool<String> = Pool::with_config(config);
147 /// let item = pool.pull_with(|s| s.push_str("Hello, World!")).unwrap();
148 /// assert_eq!(&*item, "Hello, World!");
149 /// drop(item);
150 /// let item2 = pool.pull().unwrap();
151 /// assert_eq!(&*item2, "");
152 /// ```
153 pub fn with_config(mut config: Config<T>) -> Self {
154 config.post_process();
155 let prealloc = config.prealloc;
156 assert!(
157 prealloc <= config.capacity,
158 "prealloc must be less than or equal to capacity"
159 );
160
161 let queue_len = max(1, config.capacity);
162 let pool = Self {
163 queue: ArrayQueue::new(queue_len),
164 allocated: AtomicUsize::new(prealloc),
165 surpluspulls: AtomicUsize::new(0),
166 additional_allocated: AtomicBool::new(false),
167 config,
168 };
169 let mut items = Vec::with_capacity(prealloc);
170 for _ in 0..prealloc {
171 items.push(T::default());
172 }
173 while let Some(item) = items.pop() {
174 let _ = pool.queue.push(Prc::new_zero(item));
175 }
176 pool
177 }
178
179 /// Get in used items count.
180 ///
181 /// # Example
182 ///
183 /// ```rust
184 /// use concurrent_pool::Pool;
185 ///
186 /// let pool: Pool<u32> = Pool::with_capacity(10);
187 /// assert_eq!(pool.in_use(), 0);
188 /// let item = pool.pull().unwrap();
189 /// assert_eq!(pool.in_use(), 1);
190 /// let item2 = pool.pull().unwrap();
191 /// assert_eq!(pool.in_use(), 2);
192 /// ```
193 pub fn in_use(&self) -> usize {
194 self.allocated.load(Relaxed) - self.queue.len()
195 }
196
197 /// Get allocated items count.
198 ///
199 /// # Example
200 ///
201 /// ```rust
202 /// use concurrent_pool::Pool;
203 ///
204 /// let pool = Pool::<usize>::new(2, 5);
205 /// assert_eq!(pool.allocated(), 2);
206 /// ```
207 pub fn allocated(&self) -> usize {
208 self.allocated.load(Acquire)
209 }
210
211 /// Get available items count.
212 ///
213 /// # Example
214 ///
215 /// ```rust
216 /// use concurrent_pool::Pool;
217 ///
218 /// let pool: Pool<u32> = Pool::with_capacity(10);
219 /// assert_eq!(pool.available(), 10);
220 /// let item = pool.pull().unwrap();
221 /// assert_eq!(pool.available(), 9);
222 /// ```
223 pub fn available(&self) -> usize {
224 self.config.capacity - self.in_use()
225 }
226
227 /// Get available items count without allocation.
228 ///
229 /// # Example
230 ///
231 /// ```rust
232 /// use concurrent_pool::Pool;
233 ///
234 /// let pool: Pool<u32> = Pool::new(2, 5);
235 /// assert_eq!(pool.available_noalloc(), 2);
236 /// let item = pool.pull().unwrap();
237 /// assert_eq!(pool.available_noalloc(), 1);
238 /// let item2 = pool.pull().unwrap();
239 /// assert_eq!(pool.available_noalloc(), 0);
240 /// let item3 = pool.pull().unwrap();
241 /// assert_eq!(pool.available_noalloc(), 0);
242 /// drop(item);
243 /// assert_eq!(pool.available_noalloc(), 1);
244 /// ```
245 pub fn available_noalloc(&self) -> usize {
246 self.queue.len()
247 }
248
249 /// Check if the pool is empty.
250 ///
251 /// # Example
252 ///
253 /// ```rust
254 /// use concurrent_pool::Pool;
255 ///
256 /// let pool: Pool<u32> = Pool::with_capacity(2);
257 /// assert!(!pool.is_empty());
258 /// let item1 = pool.pull().unwrap();
259 /// assert!(!pool.is_empty());
260 /// let item2 = pool.pull().unwrap();
261 /// assert!(pool.is_empty());
262 /// drop(item1);
263 /// assert!(!pool.is_empty());
264 /// ```
265 pub fn is_empty(&self) -> bool {
266 self.available() == 0
267 }
268
269 /// Get the capacity of the pool.
270 ///
271 /// # Example
272 ///
273 /// ```rust
274 /// use concurrent_pool::Pool;
275 ///
276 /// let pool: Pool<u32> = Pool::with_capacity(10);
277 /// assert_eq!(pool.capacity(), 10);
278 /// ```
279 pub fn capacity(&self) -> usize {
280 self.config.capacity
281 }
282
283 /// Pull an item from the pool. Return `None` if the pool is empty.
284 ///
285 /// # Example
286 ///
287 /// ```rust
288 /// use concurrent_pool::Pool;
289 ///
290 /// let pool: Pool<u32> = Pool::with_capacity(2);
291 /// let item1 = pool.pull().unwrap();
292 /// assert_eq!(*item1, 0);
293 /// ```
294 pub fn pull(&self) -> Option<Entry<'_, T>> {
295 self.pull_inner().map(|item| Entry {
296 item: Some(item),
297 pool: self,
298 })
299 }
300
301 /// Pull an item from the pool and apply a function to it. Return `None` if the pool is empty.
302 ///
303 /// # Example
304 ///
305 /// ```rust
306 /// use concurrent_pool::Pool;
307 ///
308 /// let pool: Pool<u32> = Pool::with_capacity(2);
309 /// let item1 = pool.pull_with(|x| *x = 42).unwrap();
310 /// assert_eq!(*item1, 42);
311 /// ```
312 pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
313 where
314 F: FnOnce(&mut T),
315 {
316 self.pull().map(|mut entry| {
317 func(unsafe { entry.get_mut_unchecked() });
318 entry
319 })
320 }
321
322 /// Pull an owned item from the pool. Return `None` if the pool is empty.
323 ///
324 /// # Example
325 ///
326 /// ```rust
327 /// use concurrent_pool::Pool;
328 /// use std::sync::Arc;
329 ///
330 /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
331 /// let item1 = pool.pull_owned().unwrap();
332 /// assert_eq!(*item1, 0);
333 /// ```
334 pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
335 self.pull_inner().map(|item| crate::OwnedEntry {
336 item: Some(item),
337 pool: self.clone(),
338 })
339 }
340
341 /// Pull an owned item from the pool and apply a function to it. Return `None` if the pool is empty.
342 ///
343 /// # Example
344 ///
345 /// ```rust
346 /// use concurrent_pool::Pool;
347 /// use std::sync::Arc;
348 ///
349 /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
350 /// let item1 = pool.pull_owned_with(|x| *x = 42).unwrap();
351 /// assert_eq!(*item1, 42);
352 /// ```
353 pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
354 where
355 F: FnOnce(&mut T),
356 {
357 self.pull_owned().map(|mut entry| {
358 func(unsafe { entry.get_mut_unchecked() });
359 entry
360 })
361 }
362
363 /// Internal method to pull an item from the pool.
364 fn pull_inner(&self) -> Option<Prc<T>> {
365 match self.queue.pop() {
366 None => {
367 if !self.additional_allocated.load(Relaxed) {
368 self.additional_allocated.store(true, Relaxed);
369 }
370 if self.config.need_process_reclamation {
371 self.surpluspulls.store(0, SeqCst);
372 }
373 if self.allocated.load(Acquire) < self.config.capacity {
374 self.allocated.fetch_add(1, Relaxed);
375 Some(Prc::new(T::default()))
376 } else {
377 None
378 }
379 }
380 Some(item) => {
381 if self.config.need_process_reclamation {
382 let left = self.queue.len();
383 if left >= self.config.idle_threshold_for_surpluspull {
384 let surpluspulls = self.surpluspulls.fetch_add(1, Relaxed) + 1;
385 if surpluspulls >= self.config.surpluspull_threshold_for_reclaim
386 && self.additional_allocated.load(Relaxed)
387 {
388 self.reclaim();
389 }
390 } else {
391 self.surpluspulls.store(0, Relaxed);
392 }
393 }
394 item.inc_ref();
395 Some(item)
396 }
397 }
398 }
399
400 /// Reclaim an item from the pool to reduce memory usage.
401 fn reclaim(&self) {
402 if let Some(item) = self.queue.pop() {
403 unsafe { item.drop_slow() };
404 let current = self.allocated.fetch_sub(1, Release) - 1;
405 if self.config.need_process_reclamation && current <= self.config.prealloc {
406 if self.additional_allocated.load(Relaxed) {
407 self.additional_allocated.store(false, Relaxed);
408 }
409 }
410 }
411 }
412
413 /// Recycle an item back into the pool.
414 pub(crate) fn recycle(&self, mut item: Prc<T>) {
415 if let Some(func) = &self.config.clear_func {
416 func(unsafe { Prc::get_mut_unchecked(&mut item) })
417 }
418 if self.queue.push(item).is_err() {
419 panic!("It is imposible that the pool is full when recycling an item");
420 }
421 }
422}
423
/// Settings used to build a pool.
#[derive(Debug)]
pub struct Config<T: Default> {
    /// Upper bound on the number of items the pool may hold.
    pub capacity: usize,
    /// Number of items built up front when the pool is created.
    pub prealloc: usize,
    /// Whether items allocated on demand should be freed again automatically
    /// to reduce memory usage.
    pub auto_reclaim: bool,
    /// Number of consecutive surplus pulls that triggers a reclamation when
    /// `auto_reclaim` is enabled. `0` means "derive it from `capacity`"
    /// (`max(2, capacity / 100)`).
    pub surpluspull_threshold_for_reclaim: usize,
    /// Number of idle items at or above which a pull counts as a surplus pull
    /// when `auto_reclaim` is enabled. `0` means "derive it from `capacity`"
    /// (`max(1, capacity / 20)`).
    pub idle_threshold_for_surpluspull: usize,
    /// Optional function that clears or resets an item before it is reused.
    pub clear_func: Option<fn(&mut T)>,
    /// Internal flag, computed by `post_process`, telling the pool whether it
    /// has to do reclamation bookkeeping at all.
    need_process_reclamation: bool,
}

impl<T: Default> Default for Config<T> {
    /// A capacity of 1024, nothing preallocated, no reclamation, no clear
    /// function; both thresholds are left at `0` so they get derived later.
    fn default() -> Self {
        Config {
            capacity: 1024,
            prealloc: 0,
            auto_reclaim: false,
            surpluspull_threshold_for_reclaim: 0,
            idle_threshold_for_surpluspull: 0,
            clear_func: None,
            need_process_reclamation: false,
        }
    }
}

impl<T: Default> Config<T> {
    /// Fill in derived values: thresholds left at `0` are computed from
    /// `capacity`, and the internal reclamation flag is recomputed.
    pub(crate) fn post_process(&mut self) {
        let capacity = self.capacity;

        if self.idle_threshold_for_surpluspull == 0 {
            self.idle_threshold_for_surpluspull = max(1, capacity / 20);
        }
        if self.surpluspull_threshold_for_reclaim == 0 {
            self.surpluspull_threshold_for_reclaim = max(2, capacity / 100);
        }

        // Reclamation only makes sense when the pool can grow beyond its
        // preallocated size.
        self.need_process_reclamation = self.auto_reclaim && self.prealloc != capacity;
    }
}