concurrent_pool/pool.rs
1use std::cmp::max;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::sync::atomic::{AtomicBool, AtomicUsize};
5
6use crossbeam_queue::ArrayQueue;
7
8use crate::entry::Prc;
9use crate::{Entry, OwnedEntry};
10
/// A concurrent object pool.
///
/// Idle items wait in a bounded queue. Pulling hands out an `Entry` that
/// borrows the pool, or an `OwnedEntry` that holds an `Arc` of it. When the
/// queue is empty, a new item is created with `T::default()` as long as fewer
/// than `capacity` items are allocated.
///
/// # Examples
///
/// ```rust
/// use concurrent_pool::{Pool, Builder};
/// use std::sync::{Arc, mpsc};
///
/// let mut builder = Builder::new();
///
/// let pool: Arc<Pool<String>> = Arc::new(builder.capacity(10).clear_func(String::clear).build());
///
///
/// let (tx, rx) = mpsc::channel();
/// let clone_pool = pool.clone();
/// let tx1 = tx.clone();
/// let sender1 = std::thread::spawn(move || {
///     let item = clone_pool.pull_owned_with(|x| x.push_str("1")).unwrap();
///     tx1.send((1, item)).unwrap();
/// });
///
/// let clone_pool = pool.clone();
/// let sender2 = std::thread::spawn(move || {
///     let item = clone_pool.pull_owned_with(|x| x.push_str("2")).unwrap();
///     tx.send((2, item)).unwrap();
/// });
///
/// let receiver = std::thread::spawn(move || {
///     for _ in 0..2 {
///         let (id, item) = rx.recv().unwrap();
///         if id == 1 {
///             assert_eq!(*item, "1");
///         } else {
///             assert_eq!(*item, "2");
///         }
///     }
/// });
///
/// sender1.join().unwrap();
/// sender2.join().unwrap();
/// receiver.join().unwrap();
/// ```
pub struct Pool<T: Default> {
    /// Configuration of the pool, after `Config::post_process` has filled in
    /// the derived values.
    config: Config<T>,
    /// Queue holding the idle items that can be handed out without allocating.
    queue: ArrayQueue<Prc<T>>,
    /// Number of items currently allocated (idle in the queue plus in use).
    allocated: AtomicUsize,
    /// Number of consecutive "surplus pulls": pulls that still left at least
    /// `idle_threshold_for_surpluspull` items idle in the queue. Reset to zero
    /// by a pull that leaves fewer idle items or finds the queue empty.
    surpluspulls: AtomicUsize,
    /// Set when a pull found the queue empty; cleared by reclamation once the
    /// allocated count is back at or below `prealloc`.
    additional_allocated: AtomicBool,
}
65
66impl<T: Default> Drop for Pool<T> {
67 fn drop(&mut self) {
68 while let Some(item) = self.queue.pop() {
69 unsafe { item.drop_slow() };
70 }
71 }
72}
73
74impl<T: Default> Pool<T> {
75 /// Create a new pool with the given preallocation and capacity.
76 ///
77 /// # Example
78 ///
79 /// ```rust
80 /// use concurrent_pool::Pool;
81 ///
82 /// let pool: Pool<u32> = Pool::new(2, 5);
83 /// assert_eq!(pool.available(), 5);
84 /// assert_eq!(pool.available_noalloc(), 2);
85 /// let item = pool.pull().unwrap();
86 /// assert_eq!(pool.available_noalloc(), 1);
87 /// ```
88 pub fn new(prealloc: usize, capacity: usize) -> Self {
89 Self::with_config(Config {
90 capacity,
91 prealloc,
92 ..Default::default()
93 })
94 }
95
96 /// Create a new pool with the given capacity.
97 ///
98 /// # Example
99 ///
100 /// ```rust
101 /// use concurrent_pool::Pool;
102 ///
103 /// let pool: Pool<u32> = Pool::with_capacity(10);
104 /// assert_eq!(pool.available(), 10);
105 /// assert_eq!(pool.available_noalloc(), 10);
106 /// let item = pool.pull().unwrap();
107 /// assert_eq!(pool.available(), 9);
108 /// ```
109 pub fn with_capacity(capacity: usize) -> Self {
110 Self::new(capacity, capacity)
111 }
112
113 /// Create a new pool with half of the capacity preallocated.
114 ///
115 /// # Example
116 ///
117 /// ```rust
118 /// use concurrent_pool::Pool;
119 ///
120 /// let pool: Pool<u32> = Pool::with_capacity_half_prealloc(10);
121 /// assert_eq!(pool.available(), 10);
122 /// assert_eq!(pool.available_noalloc(), 5);
123 /// let item = pool.pull().unwrap();
124 /// assert_eq!(pool.available_noalloc(), 4);
125 /// assert_eq!(pool.in_use(), 1);
126 /// ```
127 pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
128 Self::new(capacity / 2, capacity)
129 }
130
131 /// Create a new pool with the given configuration.
132 ///
133 /// # Example
134 ///
135 /// ```rust
136 /// use concurrent_pool::{Pool, Config};
137 ///
138 /// fn clear_func(x: &mut String) {
139 /// x.clear();
140 /// }
141 ///
142 /// let mut config = Config::default();
143 /// config.capacity = 1;
144 /// config.clear_func = Some(clear_func);
145 /// let pool: Pool<String> = Pool::with_config(config);
146 /// let item = pool.pull_with(|s| s.push_str("Hello, World!")).unwrap();
147 /// assert_eq!(&*item, "Hello, World!");
148 /// drop(item);
149 /// let item2 = pool.pull().unwrap();
150 /// assert_eq!(&*item2, "");
151 /// ```
152 pub fn with_config(mut config: Config<T>) -> Self {
153 config.post_process();
154 let prealloc = config.prealloc;
155 assert!(
156 prealloc <= config.capacity,
157 "prealloc must be less than or equal to capacity"
158 );
159
160 let queue_len = max(1, config.capacity);
161 let pool = Self {
162 queue: ArrayQueue::new(queue_len),
163 allocated: AtomicUsize::new(prealloc),
164 surpluspulls: AtomicUsize::new(0),
165 additional_allocated: AtomicBool::new(false),
166 config,
167 };
168 let mut items = Vec::with_capacity(prealloc);
169 for _ in 0..prealloc {
170 items.push(T::default());
171 }
172 while let Some(item) = items.pop() {
173 let _ = pool.queue.push(Prc::new_zero(item));
174 }
175 pool
176 }
177
178 /// Get in used items count.
179 ///
180 /// # Example
181 ///
182 /// ```rust
183 /// use concurrent_pool::Pool;
184 ///
185 /// let pool: Pool<u32> = Pool::with_capacity(10);
186 /// assert_eq!(pool.in_use(), 0);
187 /// let item = pool.pull().unwrap();
188 /// assert_eq!(pool.in_use(), 1);
189 /// let item2 = pool.pull().unwrap();
190 /// assert_eq!(pool.in_use(), 2);
191 /// ```
192 pub fn in_use(&self) -> usize {
193 self.allocated.load(Relaxed) - self.queue.len()
194 }
195
196 /// Get allocated items count.
197 ///
198 /// # Example
199 ///
200 /// ```rust
201 /// use concurrent_pool::Pool;
202 ///
203 /// let pool = Pool::<usize>::new(2, 5);
204 /// assert_eq!(pool.allocated(), 2);
205 /// ```
206 pub fn allocated(&self) -> usize {
207 self.allocated.load(Acquire)
208 }
209
210 /// Get available items count.
211 ///
212 /// # Example
213 ///
214 /// ```rust
215 /// use concurrent_pool::Pool;
216 ///
217 /// let pool: Pool<u32> = Pool::with_capacity(10);
218 /// assert_eq!(pool.available(), 10);
219 /// let item = pool.pull().unwrap();
220 /// assert_eq!(pool.available(), 9);
221 /// ```
222 pub fn available(&self) -> usize {
223 self.config.capacity - self.in_use()
224 }
225
226 /// Get available items count without allocation.
227 ///
228 /// # Example
229 ///
230 /// ```rust
231 /// use concurrent_pool::Pool;
232 ///
233 /// let pool: Pool<u32> = Pool::new(2, 5);
234 /// assert_eq!(pool.available_noalloc(), 2);
235 /// let item = pool.pull().unwrap();
236 /// assert_eq!(pool.available_noalloc(), 1);
237 /// let item2 = pool.pull().unwrap();
238 /// assert_eq!(pool.available_noalloc(), 0);
239 /// let item3 = pool.pull().unwrap();
240 /// assert_eq!(pool.available_noalloc(), 0);
241 /// drop(item);
242 /// assert_eq!(pool.available_noalloc(), 1);
243 /// ```
244 pub fn available_noalloc(&self) -> usize {
245 self.queue.len()
246 }
247
248 /// Check if the pool is empty.
249 ///
250 /// # Example
251 ///
252 /// ```rust
253 /// use concurrent_pool::Pool;
254 ///
255 /// let pool: Pool<u32> = Pool::with_capacity(2);
256 /// assert!(!pool.is_empty());
257 /// let item1 = pool.pull().unwrap();
258 /// assert!(!pool.is_empty());
259 /// let item2 = pool.pull().unwrap();
260 /// assert!(pool.is_empty());
261 /// drop(item1);
262 /// assert!(!pool.is_empty());
263 /// ```
264 pub fn is_empty(&self) -> bool {
265 self.available() == 0
266 }
267
268 /// Get the capacity of the pool.
269 ///
270 /// # Example
271 ///
272 /// ```rust
273 /// use concurrent_pool::Pool;
274 ///
275 /// let pool: Pool<u32> = Pool::with_capacity(10);
276 /// assert_eq!(pool.capacity(), 10);
277 /// ```
278 pub fn capacity(&self) -> usize {
279 self.config.capacity
280 }
281
282 /// Pull an item from the pool. Return `None` if the pool is empty.
283 ///
284 /// # Example
285 ///
286 /// ```rust
287 /// use concurrent_pool::Pool;
288 ///
289 /// let pool: Pool<u32> = Pool::with_capacity(2);
290 /// let item1 = pool.pull().unwrap();
291 /// assert_eq!(*item1, 0);
292 /// ```
293 pub fn pull(&self) -> Option<Entry<'_, T>> {
294 self.pull_inner().map(|item| Entry {
295 item: Some(item),
296 pool: self,
297 })
298 }
299
300 /// Pull an item from the pool and apply a function to it. Return `None` if the pool is empty.
301 ///
302 /// # Example
303 ///
304 /// ```rust
305 /// use concurrent_pool::Pool;
306 ///
307 /// let pool: Pool<u32> = Pool::with_capacity(2);
308 /// let item1 = pool.pull_with(|x| *x = 42).unwrap();
309 /// assert_eq!(*item1, 42);
310 /// ```
311 pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
312 where
313 F: FnOnce(&mut T),
314 {
315 self.pull().map(|mut entry| {
316 func(unsafe { entry.get_mut_unchecked() });
317 entry
318 })
319 }
320
321 /// Pull an owned item from the pool. Return `None` if the pool is empty.
322 ///
323 /// # Example
324 ///
325 /// ```rust
326 /// use concurrent_pool::Pool;
327 /// use std::sync::Arc;
328 ///
329 /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
330 /// let item1 = pool.pull_owned().unwrap();
331 /// assert_eq!(*item1, 0);
332 /// ```
333 pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
334 self.pull_inner().map(|item| crate::OwnedEntry {
335 item: Some(item),
336 pool: self.clone(),
337 })
338 }
339
340 /// Pull an owned item from the pool and apply a function to it. Return `None` if the pool is empty.
341 ///
342 /// # Example
343 ///
344 /// ```rust
345 /// use concurrent_pool::Pool;
346 /// use std::sync::Arc;
347 ///
348 /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
349 /// let item1 = pool.pull_owned_with(|x| *x = 42).unwrap();
350 /// assert_eq!(*item1, 42);
351 /// ```
352 pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
353 where
354 F: FnOnce(&mut T),
355 {
356 self.pull_owned().map(|mut entry| {
357 func(unsafe { entry.get_mut_unchecked() });
358 entry
359 })
360 }
361
362 /// Internal method to pull an item from the pool.
363 fn pull_inner(&self) -> Option<Prc<T>> {
364 match self.queue.pop() {
365 None => {
366 if !self.additional_allocated.load(Relaxed) {
367 self.additional_allocated.store(true, Relaxed);
368 }
369 if self.config.need_process_reclamation {
370 self.surpluspulls.store(0, SeqCst);
371 }
372 if self.allocated.load(Acquire) < self.config.capacity {
373 self.allocated.fetch_add(1, Relaxed);
374 Some(Prc::new(T::default()))
375 } else {
376 None
377 }
378 }
379 Some(item) => {
380 if self.config.need_process_reclamation {
381 let left = self.queue.len();
382 if left >= self.config.idle_threshold_for_surpluspull {
383 let surpluspulls = self.surpluspulls.fetch_add(1, Relaxed) + 1;
384 if surpluspulls >= self.config.surpluspull_threshold_for_reclaim
385 && self.additional_allocated.load(Relaxed)
386 {
387 self.reclaim();
388 }
389 } else {
390 self.surpluspulls.store(0, Relaxed);
391 }
392 }
393 item.inc_ref();
394 Some(item)
395 }
396 }
397 }
398
399 /// Reclaim an item from the pool to reduce memory usage.
400 fn reclaim(&self) {
401 if let Some(item) = self.queue.pop() {
402 unsafe { item.drop_slow() };
403 let current = self.allocated.fetch_sub(1, Release) - 1;
404 if self.config.need_process_reclamation && current <= self.config.prealloc {
405 if self.additional_allocated.load(Relaxed) {
406 self.additional_allocated.store(false, Relaxed);
407 }
408 }
409 }
410 }
411
412 /// Recycle an item back into the pool.
413 pub(crate) fn recycle(&self, mut item: Prc<T>) {
414 if let Some(func) = &self.config.clear_func {
415 func(unsafe { Prc::get_mut_unchecked(&mut item) })
416 }
417 if self.queue.push(item).is_err() {
418 panic!("It is imposible that the pool is full when recycling an item");
419 }
420 }
421}
422
/// Settings used to build a pool.
///
/// The public fields can be set freely. `post_process` derives the remaining
/// values before the pool is built.
pub struct Config<T: Default> {
    /// Maximum number of items the pool may allocate.
    pub capacity: usize,
    /// Number of items created up front.
    pub prealloc: usize,
    /// Whether idle items are automatically reclaimed and freed to reduce
    /// memory usage.
    pub auto_reclaim: bool,
    /// Number of consecutive surplus pulls that triggers reclamation when
    /// `auto_reclaim` is enabled. `0` means "derive from `capacity`".
    pub surpluspull_threshold_for_reclaim: usize,
    /// Number of idle items from which a pull counts as a surplus pull when
    /// `auto_reclaim` is enabled. `0` means "derive from `capacity`".
    pub idle_threshold_for_surpluspull: usize,
    /// Optional function that clears or resets an item before it is reused.
    pub clear_func: Option<fn(&mut T)>,
    /// Derived flag: whether the pool has to run the reclamation bookkeeping.
    need_process_reclamation: bool,
}

impl<T: Default> Default for Config<T> {
    /// A capacity of 1024 with nothing preallocated, reclamation disabled, no
    /// clear function, and both thresholds left at `0` (derived later).
    fn default() -> Self {
        Self {
            capacity: 1024,
            prealloc: 0,
            auto_reclaim: false,
            surpluspull_threshold_for_reclaim: 0,
            idle_threshold_for_surpluspull: 0,
            clear_func: None,
            need_process_reclamation: false,
        }
    }
}

impl<T: Default> Config<T> {
    /// Fill in the derived settings.
    ///
    /// * An idle threshold of `0` becomes `capacity / 20`, but at least 1.
    /// * A surplus-pull threshold of `0` becomes `capacity / 100`, but at
    ///   least 2.
    /// * Reclamation bookkeeping is enabled only when `auto_reclaim` is set
    ///   and the pool is not fully preallocated.
    pub(crate) fn post_process(&mut self) {
        let capacity = self.capacity;

        if self.idle_threshold_for_surpluspull == 0 {
            self.idle_threshold_for_surpluspull = (capacity / 20).max(1);
        }
        if self.surpluspull_threshold_for_reclaim == 0 {
            self.surpluspull_threshold_for_reclaim = (capacity / 100).max(2);
        }

        let fully_preallocated = self.prealloc == capacity;
        self.need_process_reclamation = self.auto_reclaim && !fully_preallocated;
    }
}