concurrent_pool/
pool.rs

1use std::cmp::max;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::sync::atomic::{AtomicBool, AtomicUsize};
5
6use crossbeam_queue::ArrayQueue;
7
8use crate::entry::Prc;
9use crate::{Entry, OwnedEntry};
10
11/// A concurrent object pool.
12///
13/// # Examples
14///
15/// ```rust
16/// use concurrent_pool::Pool;
17/// use std::sync::{Arc, mpsc};
18///
19/// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(10));
20///
21/// let (tx, rx) = mpsc::channel();
22/// let clone_pool = pool.clone();
23/// let tx1 = tx.clone();
24/// let sender1 = std::thread::spawn(move || {
25///     let item = clone_pool.pull_owned_with(|x| *x = 1).unwrap();
26///     tx1.send((1, item)).unwrap();
27/// });
28///
29/// let clone_pool = pool.clone();
30/// let sender2 = std::thread::spawn(move || {
31///     let item = clone_pool.pull_owned_with(|x| *x = 2).unwrap();
32///     tx.send((2, item)).unwrap();
33/// });
34///
35/// let receiver = std::thread::spawn(move || {
36///     for _ in 0..2 {
37///         let (id, item) = rx.recv().unwrap();
38///         if id == 1 {
39///             assert_eq!(*item, 1);
40///         } else {
41///             assert_eq!(*item, 2);
42///         }
43///     }
44/// });
45///
46/// sender1.join().unwrap();
47/// sender2.join().unwrap();
48/// receiver.join().unwrap();
49/// ```
50pub struct Pool<T: Default> {
51    /// Configuration of the pool.
52    config: Config<T>,
53    /// Inner queue holding the pooled items.
54    queue: ArrayQueue<Prc<T>>,
55    /// Number of items currently allocated.
56    allocated: AtomicUsize,
57    /// Number of currently continues `fast-pull` times
58    fastpulls: AtomicUsize,
59    /// Whether an additional item has been allocated beyond the preallocated items.
60    additional_allocated: AtomicBool,
61}
62
63impl<T: Default> Drop for Pool<T> {
64    fn drop(&mut self) {
65        while let Some(item) = self.queue.pop() {
66            unsafe { item.drop_slow() };
67        }
68    }
69}
70
71impl<T: Default> Pool<T> {
72    /// Create a new pool with the given preallocation and capacity.
73    ///
74    /// # Example
75    ///
76    /// ```rust
77    /// use concurrent_pool::Pool;
78    ///
79    /// let pool: Pool<u32> = Pool::new(2, 5);
80    /// assert_eq!(pool.available(), 5);
81    /// assert_eq!(pool.available_noalloc(), 2);
82    /// let item = pool.pull().unwrap();
83    /// assert_eq!(pool.available_noalloc(), 1);
84    /// ```
85    pub fn new(prealloc: usize, capacity: usize) -> Self {
86        Self::with_config(Config {
87            capacity,
88            prealloc,
89            ..Default::default()
90        })
91    }
92
93    /// Create a new pool with the given capacity.
94    ///
95    /// # Example
96    ///
97    /// ```rust
98    /// use concurrent_pool::Pool;
99    ///
100    /// let pool: Pool<u32> = Pool::with_capacity(10);
101    /// assert_eq!(pool.available(), 10);
102    /// assert_eq!(pool.available_noalloc(), 10);
103    /// let item = pool.pull().unwrap();
104    /// assert_eq!(pool.available(), 9);
105    /// ```
106    pub fn with_capacity(capacity: usize) -> Self {
107        Self::new(capacity, capacity)
108    }
109
110    /// Create a new pool with half of the capacity preallocated.
111    ///
112    /// # Example
113    ///
114    /// ```rust
115    /// use concurrent_pool::Pool;
116    ///
117    /// let pool: Pool<u32> = Pool::with_capacity_half_prealloc(10);
118    /// assert_eq!(pool.available(), 10);
119    /// assert_eq!(pool.available_noalloc(), 5);
120    /// let item = pool.pull().unwrap();
121    /// assert_eq!(pool.available_noalloc(), 4);
122    /// assert_eq!(pool.in_use(), 1);
123    /// ```
124    pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
125        Self::new(capacity / 2, capacity)
126    }
127
128    /// Create a new pool with the given configuration.
129    ///
130    /// # Example
131    ///
132    /// ```rust
133    /// use concurrent_pool::{Pool, Config};
134    ///
135    /// fn clear_func(x: &mut String) {
136    ///     x.clear();
137    /// }
138    ///
139    /// let mut config = Config::default();
140    /// config.capacity = 1;
141    /// config.clear_func = Some(clear_func);
142    /// let pool: Pool<String> = Pool::with_config(config);
143    /// let item = pool.pull_with(|s| s.push_str("Hello, World!")).unwrap();
144    /// assert_eq!(&*item, "Hello, World!");
145    /// drop(item);
146    /// let item2 = pool.pull().unwrap();
147    /// assert_eq!(&*item2, "");
148    /// ```
149    pub fn with_config(mut config: Config<T>) -> Self {
150        config.post_process();
151        let prealloc = config.prealloc;
152        assert!(
153            prealloc <= config.capacity,
154            "prealloc must be less than or equal to capacity"
155        );
156
157        let pool = Self {
158            queue: ArrayQueue::new(config.capacity),
159            allocated: AtomicUsize::new(prealloc),
160            fastpulls: AtomicUsize::new(0),
161            additional_allocated: AtomicBool::new(false),
162            config,
163        };
164        let mut items = Vec::with_capacity(prealloc);
165        for _ in 0..prealloc {
166            items.push(T::default());
167        }
168        while let Some(item) = items.pop() {
169            let _ = pool.queue.push(Prc::new_zero(item));
170        }
171        pool
172    }
173
174    /// Get in used items count.
175    ///
176    /// # Example
177    ///
178    /// ```rust
179    /// use concurrent_pool::Pool;
180    ///
181    /// let pool: Pool<u32> = Pool::with_capacity(10);
182    /// assert_eq!(pool.in_use(), 0);
183    /// let item = pool.pull().unwrap();
184    /// assert_eq!(pool.in_use(), 1);
185    /// let item2 = pool.pull().unwrap();
186    /// assert_eq!(pool.in_use(), 2);
187    /// ```
188    pub fn in_use(&self) -> usize {
189        self.allocated.load(Relaxed) - self.queue.len()
190    }
191
192    /// Get allocated items count.
193    ///
194    /// # Example
195    ///
196    /// ```rust
197    /// use concurrent_pool::Pool;
198    ///
199    /// let pool = Pool::<usize>::new(2, 5);
200    /// assert_eq!(pool.allocated(), 2);
201    /// ```
202    pub fn allocated(&self) -> usize {
203        self.allocated.load(Acquire)
204    }
205
206    /// Get available items count.
207    ///
208    /// # Example
209    ///
210    /// ```rust
211    /// use concurrent_pool::Pool;
212    ///
213    /// let pool: Pool<u32> = Pool::with_capacity(10);
214    /// assert_eq!(pool.available(), 10);
215    /// let item = pool.pull().unwrap();
216    /// assert_eq!(pool.available(), 9);
217    /// ```
218    pub fn available(&self) -> usize {
219        self.config.capacity - self.in_use()
220    }
221
222    /// Get available items count without allocation.
223    ///
224    /// # Example
225    ///
226    /// ```rust
227    /// use concurrent_pool::Pool;
228    ///
229    /// let pool: Pool<u32> = Pool::new(2, 5);
230    /// assert_eq!(pool.available_noalloc(), 2);
231    /// let item = pool.pull().unwrap();
232    /// assert_eq!(pool.available_noalloc(), 1);
233    /// let item2 = pool.pull().unwrap();
234    /// assert_eq!(pool.available_noalloc(), 0);
235    /// let item3 = pool.pull().unwrap();
236    /// assert_eq!(pool.available_noalloc(), 0);
237    /// drop(item);
238    /// assert_eq!(pool.available_noalloc(), 1);
239    /// ```
240    pub fn available_noalloc(&self) -> usize {
241        self.queue.len()
242    }
243
244    /// Check if the pool is empty.
245    ///
246    /// # Example
247    ///
248    /// ```rust
249    /// use concurrent_pool::Pool;
250    ///
251    /// let pool: Pool<u32> = Pool::with_capacity(2);
252    /// assert!(!pool.is_empty());
253    /// let item1 = pool.pull().unwrap();
254    /// assert!(!pool.is_empty());
255    /// let item2 = pool.pull().unwrap();
256    /// assert!(pool.is_empty());
257    /// drop(item1);
258    /// assert!(!pool.is_empty());
259    /// ```
260    pub fn is_empty(&self) -> bool {
261        self.available() == 0
262    }
263
264    /// Get the capacity of the pool.
265    ///
266    /// # Example
267    ///
268    /// ```rust
269    /// use concurrent_pool::Pool;
270    ///
271    /// let pool: Pool<u32> = Pool::with_capacity(10);
272    /// assert_eq!(pool.capacity(), 10);
273    /// ```
274    pub fn capacity(&self) -> usize {
275        self.config.capacity
276    }
277
278    /// Pull an item from the pool. Return `None` if the pool is empty.
279    ///
280    /// # Example
281    ///
282    /// ```rust
283    /// use concurrent_pool::Pool;
284    ///
285    /// let pool: Pool<u32> = Pool::with_capacity(2);
286    /// let item1 = pool.pull().unwrap();
287    /// assert_eq!(*item1, 0);
288    /// ```
289    pub fn pull(&self) -> Option<Entry<'_, T>> {
290        self.pull_inner().map(|item| Entry {
291            item: Some(item),
292            pool: self,
293        })
294    }
295
296    /// Pull an item from the pool and apply a function to it. Return `None` if the pool is empty.
297    ///
298    /// # Example
299    ///
300    /// ```rust
301    /// use concurrent_pool::Pool;
302    ///
303    /// let pool: Pool<u32> = Pool::with_capacity(2);
304    /// let item1 = pool.pull_with(|x| *x = 42).unwrap();
305    /// assert_eq!(*item1, 42);
306    /// ```
307    pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
308    where
309        F: FnOnce(&mut T),
310    {
311        self.pull().map(|mut entry| {
312            func(unsafe { entry.get_mut_unchecked() });
313            entry
314        })
315    }
316
317    /// Pull an owned item from the pool. Return `None` if the pool is empty.
318    ///
319    /// # Example
320    ///
321    /// ```rust
322    /// use concurrent_pool::Pool;
323    /// use std::sync::Arc;
324    ///
325    /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
326    /// let item1 = pool.pull_owned().unwrap();
327    /// assert_eq!(*item1, 0);
328    /// ```
329    pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
330        self.pull_inner().map(|item| crate::OwnedEntry {
331            item: Some(item),
332            pool: self.clone(),
333        })
334    }
335
336    /// Pull an owned item from the pool and apply a function to it. Return `None` if the pool is empty.
337    ///
338    /// # Example
339    ///
340    /// ```rust
341    /// use concurrent_pool::Pool;
342    /// use std::sync::Arc;
343    ///
344    /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
345    /// let item1 = pool.pull_owned_with(|x| *x = 42).unwrap();
346    /// assert_eq!(*item1, 42);
347    /// ```
348    pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
349    where
350        F: FnOnce(&mut T),
351    {
352        self.pull_owned().map(|mut entry| {
353            func(unsafe { entry.get_mut_unchecked() });
354            entry
355        })
356    }
357
358    /// Internal method to pull an item from the pool.
359    fn pull_inner(&self) -> Option<Prc<T>> {
360        match self.queue.pop() {
361            None => {
362                if !self.additional_allocated.load(Relaxed) {
363                    self.additional_allocated.store(true, Relaxed);
364                }
365                if self.config.need_process_reclamation {
366                    self.fastpulls.store(0, SeqCst);
367                }
368                if self.allocated.fetch_add(1, Relaxed) + 1 <= self.config.capacity {
369                    Some(Prc::new(T::default()))
370                } else {
371                    None
372                }
373            }
374            Some(item) => {
375                if self.config.need_process_reclamation {
376                    let left = self.queue.len();
377                    if left >= self.config.idle_threshold_for_fastpull {
378                        let fastpulls = self.fastpulls.fetch_add(1, Relaxed) + 1;
379                        if fastpulls >= self.config.fastpull_threshold_for_reclaim
380                            && self.additional_allocated.load(Relaxed)
381                        {
382                            self.reclaim();
383                        }
384                    } else {
385                        self.fastpulls.store(0, Relaxed);
386                    }
387                }
388                item.inc_ref();
389                Some(item)
390            }
391        }
392    }
393
394    /// Reclaim an item from the pool to reduce memory usage.
395    fn reclaim(&self) {
396        if let Some(item) = self.queue.pop() {
397            unsafe { item.drop_slow() };
398            let current = self.allocated.fetch_sub(1, Release) - 1;
399            if self.config.need_process_reclamation && current <= self.config.prealloc {
400                if self.additional_allocated.load(Relaxed) {
401                    self.additional_allocated.store(false, Relaxed);
402                }
403            }
404        }
405    }
406
407    /// Recycle an item back into the pool.
408    pub(crate) fn recycle(&self, mut item: Prc<T>) {
409        if let Some(func) = &self.config.clear_func {
410            func(unsafe { Prc::get_mut_unchecked(&mut item) })
411        }
412        if self.queue.push(item).is_err() {
413            panic!("It is imposible that the pool is full when recycling an item");
414        }
415    }
416}
417
418/// Configuration for the pool.
419pub struct Config<T: Default> {
420    /// Maximum capacity of the pool.
421    pub capacity: usize,
422    /// Number of items to preallocate.
423    pub prealloc: usize,
424    /// Whether to automatically reclaim allocated items and free them to reduce memory usage.
425    pub auto_reclaim: bool,
426    /// Threshold of `fast-pull` continuous occurrence to trigger reclamation
427    /// when `auto_reclaim` is enabled.
428    pub fastpull_threshold_for_reclaim: usize,
429    /// Threshold for idle items to judge as a fast-pull when `auto_reclaim` is enabled.
430    pub idle_threshold_for_fastpull: usize,
431    /// Optional function to clear or reset an item before it is reused.
432    pub clear_func: Option<fn(&mut T)>,
433    /// Internal flag to indicate if the pool needs to process reclamation.
434    need_process_reclamation: bool,
435}
436
437impl<T: Default> Default for Config<T> {
438    fn default() -> Self {
439        Self {
440            capacity: 1024,
441            prealloc: 0,
442            auto_reclaim: false,
443            clear_func: None,
444            fastpull_threshold_for_reclaim: 0,
445            idle_threshold_for_fastpull: 0,
446            need_process_reclamation: false,
447        }
448    }
449}
450
451impl<T: Default> Config<T> {
452    pub(crate) fn post_process(&mut self) {
453        if self.idle_threshold_for_fastpull == 0 {
454            self.idle_threshold_for_fastpull = max(1, self.capacity / 20);
455        }
456
457        if self.auto_reclaim && self.prealloc != self.capacity {
458            self.need_process_reclamation = true;
459        } else {
460            self.need_process_reclamation = false;
461        }
462    }
463}