concurrent_pool/
pool.rs

1use std::cmp::max;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::sync::atomic::{AtomicBool, AtomicUsize};
5
6use crossbeam_queue::ArrayQueue;
7
8use crate::entry::Prc;
9use crate::{Entry, OwnedEntry};
10
11/// A concurrent object pool.
12///
13/// # Examples
14///
15/// ```rust
16/// use concurrent_pool::{Pool, Builder};
17/// use std::sync::{Arc, mpsc};
18///
19/// let mut builder = Builder::new();
20///
21/// let pool: Arc<Pool<String>> = Arc::new(builder.capacity(10).clear_func(String::clear).build());
22///
23///
24/// let (tx, rx) = mpsc::channel();
25/// let clone_pool = pool.clone();
26/// let tx1 = tx.clone();
27/// let sender1 = std::thread::spawn(move || {
28///     let item = clone_pool.pull_owned_with(|x| x.push_str("1")).unwrap();
29///     tx1.send((1, item)).unwrap();
30/// });
31///
32/// let clone_pool = pool.clone();
33/// let sender2 = std::thread::spawn(move || {
34///     let item = clone_pool.pull_owned_with(|x| x.push_str("2")).unwrap();
35///     tx.send((2, item)).unwrap();
36/// });
37///
38/// let receiver = std::thread::spawn(move || {
39///     for _ in 0..2 {
40///         let (id, item) = rx.recv().unwrap();
41///         if id == 1 {
42///             assert_eq!(*item, "1");
43///         } else {
44///             assert_eq!(*item, "2");
45///         }
46///     }
47/// });
48///
49/// sender1.join().unwrap();
50/// sender2.join().unwrap();
51/// receiver.join().unwrap();
52/// ```
53pub struct Pool<T: Default> {
54    /// Configuration of the pool.
55    config: Config<T>,
56    /// Inner queue holding the pooled items.
57    queue: ArrayQueue<Prc<T>>,
58    /// Number of items currently allocated.
59    allocated: AtomicUsize,
60    /// Number of currently continues `surplus-pull` times
61    surpluspulls: AtomicUsize,
62    /// Whether an additional item has been allocated beyond the preallocated items.
63    additional_allocated: AtomicBool,
64}
65
66impl<T: Default> Drop for Pool<T> {
67    fn drop(&mut self) {
68        while let Some(item) = self.queue.pop() {
69            unsafe { item.drop_slow() };
70        }
71    }
72}
73
74impl<T: Default> Pool<T> {
75    /// Create a new pool with the given preallocation and capacity.
76    ///
77    /// # Example
78    ///
79    /// ```rust
80    /// use concurrent_pool::Pool;
81    ///
82    /// let pool: Pool<u32> = Pool::new(2, 5);
83    /// assert_eq!(pool.available(), 5);
84    /// assert_eq!(pool.available_noalloc(), 2);
85    /// let item = pool.pull().unwrap();
86    /// assert_eq!(pool.available_noalloc(), 1);
87    /// ```
88    pub fn new(prealloc: usize, capacity: usize) -> Self {
89        Self::with_config(Config {
90            capacity,
91            prealloc,
92            ..Default::default()
93        })
94    }
95
96    /// Create a new pool with the given capacity.
97    ///
98    /// # Example
99    ///
100    /// ```rust
101    /// use concurrent_pool::Pool;
102    ///
103    /// let pool: Pool<u32> = Pool::with_capacity(10);
104    /// assert_eq!(pool.available(), 10);
105    /// assert_eq!(pool.available_noalloc(), 10);
106    /// let item = pool.pull().unwrap();
107    /// assert_eq!(pool.available(), 9);
108    /// ```
109    pub fn with_capacity(capacity: usize) -> Self {
110        Self::new(capacity, capacity)
111    }
112
113    /// Create a new pool with half of the capacity preallocated.
114    ///
115    /// # Example
116    ///
117    /// ```rust
118    /// use concurrent_pool::Pool;
119    ///
120    /// let pool: Pool<u32> = Pool::with_capacity_half_prealloc(10);
121    /// assert_eq!(pool.available(), 10);
122    /// assert_eq!(pool.available_noalloc(), 5);
123    /// let item = pool.pull().unwrap();
124    /// assert_eq!(pool.available_noalloc(), 4);
125    /// assert_eq!(pool.in_use(), 1);
126    /// ```
127    pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
128        Self::new(capacity / 2, capacity)
129    }
130
131    /// Create a new pool with the given configuration.
132    ///
133    /// # Example
134    ///
135    /// ```rust
136    /// use concurrent_pool::{Pool, Config};
137    ///
138    /// fn clear_func(x: &mut String) {
139    ///     x.clear();
140    /// }
141    ///
142    /// let mut config = Config::default();
143    /// config.capacity = 1;
144    /// config.clear_func = Some(clear_func);
145    /// let pool: Pool<String> = Pool::with_config(config);
146    /// let item = pool.pull_with(|s| s.push_str("Hello, World!")).unwrap();
147    /// assert_eq!(&*item, "Hello, World!");
148    /// drop(item);
149    /// let item2 = pool.pull().unwrap();
150    /// assert_eq!(&*item2, "");
151    /// ```
152    pub fn with_config(mut config: Config<T>) -> Self {
153        config.post_process();
154        let prealloc = config.prealloc;
155        assert!(
156            prealloc <= config.capacity,
157            "prealloc must be less than or equal to capacity"
158        );
159
160        let queue_len = max(1, config.capacity);
161        let pool = Self {
162            queue: ArrayQueue::new(queue_len),
163            allocated: AtomicUsize::new(prealloc),
164            surpluspulls: AtomicUsize::new(0),
165            additional_allocated: AtomicBool::new(false),
166            config,
167        };
168        let mut items = Vec::with_capacity(prealloc);
169        for _ in 0..prealloc {
170            items.push(T::default());
171        }
172        while let Some(item) = items.pop() {
173            let _ = pool.queue.push(Prc::new_zero(item));
174        }
175        pool
176    }
177
178    /// Get in used items count.
179    ///
180    /// # Example
181    ///
182    /// ```rust
183    /// use concurrent_pool::Pool;
184    ///
185    /// let pool: Pool<u32> = Pool::with_capacity(10);
186    /// assert_eq!(pool.in_use(), 0);
187    /// let item = pool.pull().unwrap();
188    /// assert_eq!(pool.in_use(), 1);
189    /// let item2 = pool.pull().unwrap();
190    /// assert_eq!(pool.in_use(), 2);
191    /// ```
192    pub fn in_use(&self) -> usize {
193        self.allocated.load(Relaxed) - self.queue.len()
194    }
195
196    /// Get allocated items count.
197    ///
198    /// # Example
199    ///
200    /// ```rust
201    /// use concurrent_pool::Pool;
202    ///
203    /// let pool = Pool::<usize>::new(2, 5);
204    /// assert_eq!(pool.allocated(), 2);
205    /// ```
206    pub fn allocated(&self) -> usize {
207        self.allocated.load(Acquire)
208    }
209
210    /// Get available items count.
211    ///
212    /// # Example
213    ///
214    /// ```rust
215    /// use concurrent_pool::Pool;
216    ///
217    /// let pool: Pool<u32> = Pool::with_capacity(10);
218    /// assert_eq!(pool.available(), 10);
219    /// let item = pool.pull().unwrap();
220    /// assert_eq!(pool.available(), 9);
221    /// ```
222    pub fn available(&self) -> usize {
223        self.config.capacity - self.in_use()
224    }
225
226    /// Get available items count without allocation.
227    ///
228    /// # Example
229    ///
230    /// ```rust
231    /// use concurrent_pool::Pool;
232    ///
233    /// let pool: Pool<u32> = Pool::new(2, 5);
234    /// assert_eq!(pool.available_noalloc(), 2);
235    /// let item = pool.pull().unwrap();
236    /// assert_eq!(pool.available_noalloc(), 1);
237    /// let item2 = pool.pull().unwrap();
238    /// assert_eq!(pool.available_noalloc(), 0);
239    /// let item3 = pool.pull().unwrap();
240    /// assert_eq!(pool.available_noalloc(), 0);
241    /// drop(item);
242    /// assert_eq!(pool.available_noalloc(), 1);
243    /// ```
244    pub fn available_noalloc(&self) -> usize {
245        self.queue.len()
246    }
247
248    /// Check if the pool is empty.
249    ///
250    /// # Example
251    ///
252    /// ```rust
253    /// use concurrent_pool::Pool;
254    ///
255    /// let pool: Pool<u32> = Pool::with_capacity(2);
256    /// assert!(!pool.is_empty());
257    /// let item1 = pool.pull().unwrap();
258    /// assert!(!pool.is_empty());
259    /// let item2 = pool.pull().unwrap();
260    /// assert!(pool.is_empty());
261    /// drop(item1);
262    /// assert!(!pool.is_empty());
263    /// ```
264    pub fn is_empty(&self) -> bool {
265        self.available() == 0
266    }
267
268    /// Get the capacity of the pool.
269    ///
270    /// # Example
271    ///
272    /// ```rust
273    /// use concurrent_pool::Pool;
274    ///
275    /// let pool: Pool<u32> = Pool::with_capacity(10);
276    /// assert_eq!(pool.capacity(), 10);
277    /// ```
278    pub fn capacity(&self) -> usize {
279        self.config.capacity
280    }
281
282    /// Pull an item from the pool. Return `None` if the pool is empty.
283    ///
284    /// # Example
285    ///
286    /// ```rust
287    /// use concurrent_pool::Pool;
288    ///
289    /// let pool: Pool<u32> = Pool::with_capacity(2);
290    /// let item1 = pool.pull().unwrap();
291    /// assert_eq!(*item1, 0);
292    /// ```
293    pub fn pull(&self) -> Option<Entry<'_, T>> {
294        self.pull_inner().map(|item| Entry {
295            item: Some(item),
296            pool: self,
297        })
298    }
299
300    /// Pull an item from the pool and apply a function to it. Return `None` if the pool is empty.
301    ///
302    /// # Example
303    ///
304    /// ```rust
305    /// use concurrent_pool::Pool;
306    ///
307    /// let pool: Pool<u32> = Pool::with_capacity(2);
308    /// let item1 = pool.pull_with(|x| *x = 42).unwrap();
309    /// assert_eq!(*item1, 42);
310    /// ```
311    pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
312    where
313        F: FnOnce(&mut T),
314    {
315        self.pull().map(|mut entry| {
316            func(unsafe { entry.get_mut_unchecked() });
317            entry
318        })
319    }
320
321    /// Pull an owned item from the pool. Return `None` if the pool is empty.
322    ///
323    /// # Example
324    ///
325    /// ```rust
326    /// use concurrent_pool::Pool;
327    /// use std::sync::Arc;
328    ///
329    /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
330    /// let item1 = pool.pull_owned().unwrap();
331    /// assert_eq!(*item1, 0);
332    /// ```
333    pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
334        self.pull_inner().map(|item| crate::OwnedEntry {
335            item: Some(item),
336            pool: self.clone(),
337        })
338    }
339
340    /// Pull an owned item from the pool and apply a function to it. Return `None` if the pool is empty.
341    ///
342    /// # Example
343    ///
344    /// ```rust
345    /// use concurrent_pool::Pool;
346    /// use std::sync::Arc;
347    ///
348    /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
349    /// let item1 = pool.pull_owned_with(|x| *x = 42).unwrap();
350    /// assert_eq!(*item1, 42);
351    /// ```
352    pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
353    where
354        F: FnOnce(&mut T),
355    {
356        self.pull_owned().map(|mut entry| {
357            func(unsafe { entry.get_mut_unchecked() });
358            entry
359        })
360    }
361
362    /// Internal method to pull an item from the pool.
363    fn pull_inner(&self) -> Option<Prc<T>> {
364        match self.queue.pop() {
365            None => {
366                if !self.additional_allocated.load(Relaxed) {
367                    self.additional_allocated.store(true, Relaxed);
368                }
369                if self.config.need_process_reclamation {
370                    self.surpluspulls.store(0, SeqCst);
371                }
372                if self.allocated.load(Acquire) < self.config.capacity {
373                    self.allocated.fetch_add(1, Relaxed);
374                    Some(Prc::new(T::default()))
375                } else {
376                    None
377                }
378            }
379            Some(item) => {
380                if self.config.need_process_reclamation {
381                    let left = self.queue.len();
382                    if left >= self.config.idle_threshold_for_surpluspull {
383                        let surpluspulls = self.surpluspulls.fetch_add(1, Relaxed) + 1;
384                        if surpluspulls >= self.config.surpluspull_threshold_for_reclaim
385                            && self.additional_allocated.load(Relaxed)
386                        {
387                            self.reclaim();
388                        }
389                    } else {
390                        self.surpluspulls.store(0, Relaxed);
391                    }
392                }
393                item.inc_ref();
394                Some(item)
395            }
396        }
397    }
398
399    /// Reclaim an item from the pool to reduce memory usage.
400    fn reclaim(&self) {
401        if let Some(item) = self.queue.pop() {
402            unsafe { item.drop_slow() };
403            let current = self.allocated.fetch_sub(1, Release) - 1;
404            if self.config.need_process_reclamation && current <= self.config.prealloc {
405                if self.additional_allocated.load(Relaxed) {
406                    self.additional_allocated.store(false, Relaxed);
407                }
408            }
409        }
410    }
411
412    /// Recycle an item back into the pool.
413    pub(crate) fn recycle(&self, mut item: Prc<T>) {
414        if let Some(func) = &self.config.clear_func {
415            func(unsafe { Prc::get_mut_unchecked(&mut item) })
416        }
417        if self.queue.push(item).is_err() {
418            panic!("It is imposible that the pool is full when recycling an item");
419        }
420    }
421}
422
/// Settings used to build a pool.
///
/// Start from `Config::default()` and override the public fields as needed.
pub struct Config<T>
where
    T: Default,
{
    /// Upper bound on the number of items the pool may hold.
    pub capacity: usize,
    /// How many items are created eagerly when the pool is built.
    pub prealloc: usize,
    /// When `true`, the pool frees surplus items on its own to reduce memory usage.
    pub auto_reclaim: bool,
    /// Number of consecutive surplus pulls that triggers a reclamation
    /// (only relevant with `auto_reclaim`). `0` means "derive from `capacity`".
    pub surpluspull_threshold_for_reclaim: usize,
    /// Minimum number of idle items left after a pull for that pull to count as
    /// a surplus pull (only relevant with `auto_reclaim`). `0` means "derive
    /// from `capacity`".
    pub idle_threshold_for_surpluspull: usize,
    /// Optional hook that clears or resets an item before it is reused.
    pub clear_func: Option<fn(&mut T)>,
    /// Internal flag, computed by `post_process`: whether the pool has to run
    /// its reclamation bookkeeping at all.
    need_process_reclamation: bool,
}
441
442impl<T: Default> Default for Config<T> {
443    fn default() -> Self {
444        Self {
445            capacity: 1024,
446            prealloc: 0,
447            auto_reclaim: false,
448            clear_func: None,
449            surpluspull_threshold_for_reclaim: 0,
450            idle_threshold_for_surpluspull: 0,
451            need_process_reclamation: false,
452        }
453    }
454}
455
456impl<T: Default> Config<T> {
457    pub(crate) fn post_process(&mut self) {
458        if self.idle_threshold_for_surpluspull == 0 {
459            self.idle_threshold_for_surpluspull = max(1, self.capacity / 20);
460        }
461
462        if self.surpluspull_threshold_for_reclaim == 0 {
463            self.surpluspull_threshold_for_reclaim = max(2, self.capacity / 100);
464        }
465
466        if self.auto_reclaim && self.prealloc != self.capacity {
467            self.need_process_reclamation = true;
468        } else {
469            self.need_process_reclamation = false;
470        }
471    }
472}