concurrent_pool/
pool.rs

1use std::cmp::max;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::sync::atomic::{AtomicBool, AtomicUsize};
5
6use crossbeam_queue::ArrayQueue;
7
8use crate::entry::Prc;
9use crate::{Entry, OwnedEntry};
10
11/// A concurrent object pool.
12///
13/// # Examples
14///
15/// ```rust
16/// use concurrent_pool::{Pool, Builder};
17/// use std::sync::{Arc, mpsc};
18///
19/// let mut builder = Builder::new();
20///
21/// let pool: Arc<Pool<String>> = Arc::new(builder.capacity(10).clear_func(String::clear).build());
22///
23///
24/// let (tx, rx) = mpsc::channel();
25/// let clone_pool = pool.clone();
26/// let tx1 = tx.clone();
27/// let sender1 = std::thread::spawn(move || {
28///     let item = clone_pool.pull_owned_with(|x| x.push_str("1")).unwrap();
29///     tx1.send((1, item)).unwrap();
30/// });
31///
32/// let clone_pool = pool.clone();
33/// let sender2 = std::thread::spawn(move || {
34///     let item = clone_pool.pull_owned_with(|x| x.push_str("2")).unwrap();
35///     tx.send((2, item)).unwrap();
36/// });
37///
38/// let receiver = std::thread::spawn(move || {
39///     for _ in 0..2 {
40///         let (id, item) = rx.recv().unwrap();
41///         if id == 1 {
42///             assert_eq!(*item, "1");
43///         } else {
44///             assert_eq!(*item, "2");
45///         }
46///     }
47/// });
48///
49/// sender1.join().unwrap();
50/// sender2.join().unwrap();
51/// receiver.join().unwrap();
52/// ```
53#[derive(Debug)]
54pub struct Pool<T: Default> {
55    /// Configuration of the pool.
56    config: Config<T>,
57    /// Inner queue holding the pooled items.
58    queue: ArrayQueue<Prc<T>>,
59    /// Number of items currently allocated.
60    allocated: AtomicUsize,
61    /// Number of currently continues `surplus-pull` times
62    surpluspulls: AtomicUsize,
63    /// Whether an additional item has been allocated beyond the preallocated items.
64    additional_allocated: AtomicBool,
65}
66
67impl<T: Default> Drop for Pool<T> {
68    fn drop(&mut self) {
69        while let Some(item) = self.queue.pop() {
70            unsafe { item.drop_slow() };
71        }
72    }
73}
74
75impl<T: Default> Pool<T> {
76    /// Create a new pool with the given preallocation and capacity.
77    ///
78    /// # Example
79    ///
80    /// ```rust
81    /// use concurrent_pool::Pool;
82    ///
83    /// let pool: Pool<u32> = Pool::new(2, 5);
84    /// assert_eq!(pool.available(), 5);
85    /// assert_eq!(pool.available_noalloc(), 2);
86    /// let item = pool.pull().unwrap();
87    /// assert_eq!(pool.available_noalloc(), 1);
88    /// ```
89    pub fn new(prealloc: usize, capacity: usize) -> Self {
90        Self::with_config(Config {
91            capacity,
92            prealloc,
93            ..Default::default()
94        })
95    }
96
97    /// Create a new pool with the given capacity.
98    ///
99    /// # Example
100    ///
101    /// ```rust
102    /// use concurrent_pool::Pool;
103    ///
104    /// let pool: Pool<u32> = Pool::with_capacity(10);
105    /// assert_eq!(pool.available(), 10);
106    /// assert_eq!(pool.available_noalloc(), 10);
107    /// let item = pool.pull().unwrap();
108    /// assert_eq!(pool.available(), 9);
109    /// ```
110    pub fn with_capacity(capacity: usize) -> Self {
111        Self::new(capacity, capacity)
112    }
113
114    /// Create a new pool with half of the capacity preallocated.
115    ///
116    /// # Example
117    ///
118    /// ```rust
119    /// use concurrent_pool::Pool;
120    ///
121    /// let pool: Pool<u32> = Pool::with_capacity_half_prealloc(10);
122    /// assert_eq!(pool.available(), 10);
123    /// assert_eq!(pool.available_noalloc(), 5);
124    /// let item = pool.pull().unwrap();
125    /// assert_eq!(pool.available_noalloc(), 4);
126    /// assert_eq!(pool.in_use(), 1);
127    /// ```
128    pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
129        Self::new(capacity / 2, capacity)
130    }
131
132    /// Create a new pool with the given configuration.
133    ///
134    /// # Example
135    ///
136    /// ```rust
137    /// use concurrent_pool::{Pool, Config};
138    ///
139    /// fn clear_func(x: &mut String) {
140    ///     x.clear();
141    /// }
142    ///
143    /// let mut config = Config::default();
144    /// config.capacity = 1;
145    /// config.clear_func = Some(clear_func);
146    /// let pool: Pool<String> = Pool::with_config(config);
147    /// let item = pool.pull_with(|s| s.push_str("Hello, World!")).unwrap();
148    /// assert_eq!(&*item, "Hello, World!");
149    /// drop(item);
150    /// let item2 = pool.pull().unwrap();
151    /// assert_eq!(&*item2, "");
152    /// ```
153    pub fn with_config(mut config: Config<T>) -> Self {
154        config.post_process();
155        let prealloc = config.prealloc;
156        assert!(
157            prealloc <= config.capacity,
158            "prealloc must be less than or equal to capacity"
159        );
160
161        let queue_len = max(1, config.capacity);
162        let pool = Self {
163            queue: ArrayQueue::new(queue_len),
164            allocated: AtomicUsize::new(prealloc),
165            surpluspulls: AtomicUsize::new(0),
166            additional_allocated: AtomicBool::new(false),
167            config,
168        };
169        let mut items = Vec::with_capacity(prealloc);
170        for _ in 0..prealloc {
171            items.push(T::default());
172        }
173        while let Some(item) = items.pop() {
174            let _ = pool.queue.push(Prc::new_zero(item));
175        }
176        pool
177    }
178
179    /// Enable automatic reclamation of allocated items to reduce memory usage.
180    pub fn enable_auto_reclaim(&mut self) {
181        self.config.auto_reclaim = true;
182        self.config.post_process();
183    }
184
185    /// Get in used items count.
186    ///
187    /// # Example
188    ///
189    /// ```rust
190    /// use concurrent_pool::Pool;
191    ///
192    /// let pool: Pool<u32> = Pool::with_capacity(10);
193    /// assert_eq!(pool.in_use(), 0);
194    /// let item = pool.pull().unwrap();
195    /// assert_eq!(pool.in_use(), 1);
196    /// let item2 = pool.pull().unwrap();
197    /// assert_eq!(pool.in_use(), 2);
198    /// ```
199    pub fn in_use(&self) -> usize {
200        self.allocated.load(Relaxed) - self.queue.len()
201    }
202
203    /// Get allocated items count.
204    ///
205    /// # Example
206    ///
207    /// ```rust
208    /// use concurrent_pool::Pool;
209    ///
210    /// let pool = Pool::<usize>::new(2, 5);
211    /// assert_eq!(pool.allocated(), 2);
212    /// ```
213    pub fn allocated(&self) -> usize {
214        self.allocated.load(Acquire)
215    }
216
217    /// Get available items count.
218    ///
219    /// # Example
220    ///
221    /// ```rust
222    /// use concurrent_pool::Pool;
223    ///
224    /// let pool: Pool<u32> = Pool::with_capacity(10);
225    /// assert_eq!(pool.available(), 10);
226    /// let item = pool.pull().unwrap();
227    /// assert_eq!(pool.available(), 9);
228    /// ```
229    pub fn available(&self) -> usize {
230        self.config.capacity - self.in_use()
231    }
232
233    /// Get available items count without allocation.
234    ///
235    /// # Example
236    ///
237    /// ```rust
238    /// use concurrent_pool::Pool;
239    ///
240    /// let pool: Pool<u32> = Pool::new(2, 5);
241    /// assert_eq!(pool.available_noalloc(), 2);
242    /// let item = pool.pull().unwrap();
243    /// assert_eq!(pool.available_noalloc(), 1);
244    /// let item2 = pool.pull().unwrap();
245    /// assert_eq!(pool.available_noalloc(), 0);
246    /// let item3 = pool.pull().unwrap();
247    /// assert_eq!(pool.available_noalloc(), 0);
248    /// drop(item);
249    /// assert_eq!(pool.available_noalloc(), 1);
250    /// ```
251    pub fn available_noalloc(&self) -> usize {
252        self.queue.len()
253    }
254
255    /// Check if the pool is empty.
256    ///
257    /// # Example
258    ///
259    /// ```rust
260    /// use concurrent_pool::Pool;
261    ///
262    /// let pool: Pool<u32> = Pool::with_capacity(2);
263    /// assert!(!pool.is_empty());
264    /// let item1 = pool.pull().unwrap();
265    /// assert!(!pool.is_empty());
266    /// let item2 = pool.pull().unwrap();
267    /// assert!(pool.is_empty());
268    /// drop(item1);
269    /// assert!(!pool.is_empty());
270    /// ```
271    pub fn is_empty(&self) -> bool {
272        self.available() == 0
273    }
274
275    /// Get the capacity of the pool.
276    ///
277    /// # Example
278    ///
279    /// ```rust
280    /// use concurrent_pool::Pool;
281    ///
282    /// let pool: Pool<u32> = Pool::with_capacity(10);
283    /// assert_eq!(pool.capacity(), 10);
284    /// ```
285    pub fn capacity(&self) -> usize {
286        self.config.capacity
287    }
288
289    /// Pull an item from the pool. Return `None` if the pool is empty.
290    ///
291    /// # Example
292    ///
293    /// ```rust
294    /// use concurrent_pool::Pool;
295    ///
296    /// let pool: Pool<u32> = Pool::with_capacity(2);
297    /// let item1 = pool.pull().unwrap();
298    /// assert_eq!(*item1, 0);
299    /// ```
300    pub fn pull(&self) -> Option<Entry<'_, T>> {
301        self.pull_inner().map(|item| Entry {
302            item: Some(item),
303            pool: self,
304        })
305    }
306
307    /// Pull an item from the pool and apply a function to it. Return `None` if the pool is empty.
308    ///
309    /// # Example
310    ///
311    /// ```rust
312    /// use concurrent_pool::Pool;
313    ///
314    /// let pool: Pool<u32> = Pool::with_capacity(2);
315    /// let item1 = pool.pull_with(|x| *x = 42).unwrap();
316    /// assert_eq!(*item1, 42);
317    /// ```
318    pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
319    where
320        F: FnOnce(&mut T),
321    {
322        self.pull().map(|mut entry| {
323            func(unsafe { entry.get_mut_unchecked() });
324            entry
325        })
326    }
327
328    /// Pull an owned item from the pool. Return `None` if the pool is empty.
329    ///
330    /// # Example
331    ///
332    /// ```rust
333    /// use concurrent_pool::Pool;
334    /// use std::sync::Arc;
335    ///
336    /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
337    /// let item1 = pool.pull_owned().unwrap();
338    /// assert_eq!(*item1, 0);
339    /// ```
340    pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
341        self.pull_inner().map(|item| crate::OwnedEntry {
342            item: Some(item),
343            pool: self.clone(),
344        })
345    }
346
347    /// Pull an owned item from the pool and apply a function to it. Return `None` if the pool is empty.
348    ///
349    /// # Example
350    ///
351    /// ```rust
352    /// use concurrent_pool::Pool;
353    /// use std::sync::Arc;
354    ///
355    /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
356    /// let item1 = pool.pull_owned_with(|x| *x = 42).unwrap();
357    /// assert_eq!(*item1, 42);
358    /// ```
359    pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
360    where
361        F: FnOnce(&mut T),
362    {
363        self.pull_owned().map(|mut entry| {
364            func(unsafe { entry.get_mut_unchecked() });
365            entry
366        })
367    }
368
369    /// Internal method to pull an item from the pool.
370    fn pull_inner(&self) -> Option<Prc<T>> {
371        match self.queue.pop() {
372            None => {
373                if !self.additional_allocated.load(Relaxed) {
374                    self.additional_allocated.store(true, Relaxed);
375                }
376                if self.config.need_process_reclamation {
377                    self.surpluspulls.store(0, SeqCst);
378                }
379
380                match self.allocated.fetch_update(AcqRel, Acquire, |current| {
381                    match current < self.config.capacity {
382                        true => Some(current + 1),
383                        false => None,
384                    }
385                }) {
386                    Ok(_) => Some(Prc::new(T::default())),
387                    Err(_) => None,
388                }
389            }
390            Some(item) => {
391                if self.config.need_process_reclamation {
392                    let left = self.queue.len();
393                    if left >= self.config.idle_threshold_for_surpluspull {
394                        let surpluspulls = self.surpluspulls.fetch_add(1, Relaxed) + 1;
395                        if surpluspulls >= self.config.surpluspull_threshold_for_reclaim
396                            && self.additional_allocated.load(Relaxed)
397                        {
398                            self.reclaim();
399                        }
400                    } else {
401                        self.surpluspulls.store(0, Relaxed);
402                    }
403                }
404                item.inc_ref();
405                Some(item)
406            }
407        }
408    }
409
410    /// Reclaim an item from the pool to reduce memory usage.
411    fn reclaim(&self) {
412        if let Some(item) = self.queue.pop() {
413            unsafe { item.drop_slow() };
414            let current = self.allocated.fetch_sub(1, Release) - 1;
415            if self.config.need_process_reclamation && current <= self.config.prealloc {
416                if self.additional_allocated.load(Relaxed) {
417                    self.additional_allocated.store(false, Relaxed);
418                }
419            }
420        }
421    }
422
423    /// Recycle an item back into the pool.
424    pub(crate) fn recycle(&self, mut item: Prc<T>) {
425        if let Some(func) = &self.config.clear_func {
426            func(unsafe { Prc::get_mut_unchecked(&mut item) })
427        }
428        if self.queue.push(item).is_err() {
429            panic!("It is imposible that the pool is full when recycling an item");
430        }
431    }
432}
433
434/// Configuration for the pool.
435#[derive(Debug)]
436pub struct Config<T: Default> {
437    /// Maximum capacity of the pool.
438    pub capacity: usize,
439    /// Number of items to preallocate.
440    pub prealloc: usize,
441    /// Whether to automatically reclaim allocated items and free them to reduce memory usage.
442    pub auto_reclaim: bool,
443    /// Threshold of `surplus-pull` continuous occurrence to trigger reclamation
444    /// when `auto_reclaim` is enabled.
445    pub surpluspull_threshold_for_reclaim: usize,
446    /// Threshold for idle items to judge as a surplus-pull when `auto_reclaim` is enabled.
447    pub idle_threshold_for_surpluspull: usize,
448    /// Optional function to clear or reset an item before it is reused.
449    pub clear_func: Option<fn(&mut T)>,
450    /// Internal flag to indicate if the pool needs to process reclamation.
451    need_process_reclamation: bool,
452}
453
454impl<T: Default> Default for Config<T> {
455    fn default() -> Self {
456        Self {
457            capacity: 1024,
458            prealloc: 0,
459            auto_reclaim: false,
460            clear_func: None,
461            surpluspull_threshold_for_reclaim: 0,
462            idle_threshold_for_surpluspull: 0,
463            need_process_reclamation: false,
464        }
465    }
466}
467
468impl<T: Default> Config<T> {
469    pub(crate) fn post_process(&mut self) {
470        if self.idle_threshold_for_surpluspull == 0 {
471            self.idle_threshold_for_surpluspull = max(1, self.capacity / 20);
472        }
473
474        if self.surpluspull_threshold_for_reclaim == 0 {
475            self.surpluspull_threshold_for_reclaim = max(2, self.capacity / 100);
476        }
477
478        if self.auto_reclaim && self.prealloc != self.capacity {
479            self.need_process_reclamation = true;
480        } else {
481            self.need_process_reclamation = false;
482        }
483    }
484}