concurrent_pool/
pool.rs

1use std::cmp::max;
2use std::sync::Arc;
3use std::sync::atomic::Ordering::*;
4use std::sync::atomic::{AtomicBool, AtomicUsize};
5
6use crossbeam_queue::ArrayQueue;
7
8use crate::entry::Prc;
9use crate::{Entry, OwnedEntry};
10
11/// A concurrent object pool.
12///
13/// # Examples
14///
15/// ```rust
16/// use concurrent_pool::{Pool, Builder};
17/// use std::sync::{Arc, mpsc};
18///
19/// let mut builder = Builder::new();
20///
21/// let pool: Arc<Pool<String>> = Arc::new(builder.capacity(10).clear_func(String::clear).build());
22///
23///
24/// let (tx, rx) = mpsc::channel();
25/// let clone_pool = pool.clone();
26/// let tx1 = tx.clone();
27/// let sender1 = std::thread::spawn(move || {
28///     let item = clone_pool.pull_owned_with(|x| x.push_str("1")).unwrap();
29///     tx1.send((1, item)).unwrap();
30/// });
31///
32/// let clone_pool = pool.clone();
33/// let sender2 = std::thread::spawn(move || {
34///     let item = clone_pool.pull_owned_with(|x| x.push_str("2")).unwrap();
35///     tx.send((2, item)).unwrap();
36/// });
37///
38/// let receiver = std::thread::spawn(move || {
39///     for _ in 0..2 {
40///         let (id, item) = rx.recv().unwrap();
41///         if id == 1 {
42///             assert_eq!(*item, "1");
43///         } else {
44///             assert_eq!(*item, "2");
45///         }
46///     }
47/// });
48///
49/// sender1.join().unwrap();
50/// sender2.join().unwrap();
51/// receiver.join().unwrap();
52/// ```
#[derive(Debug)]
pub struct Pool<T: Default> {
    /// Configuration of the pool (already post-processed by `with_config`).
    config: Config<T>,
    /// Inner queue holding the pooled items that are currently idle.
    /// Items are popped when pulled and pushed back when recycled.
    queue: ArrayQueue<Prc<T>>,
    /// Number of items currently allocated (idle in the queue or handed out).
    /// Starts at `prealloc`, grows when a pull has to allocate and shrinks
    /// when an idle item is reclaimed.
    allocated: AtomicUsize,
    /// Number of consecutive `surplus-pull`s, i.e. pulls that still left at
    /// least `idle_threshold_for_surpluspull` idle items in the queue.
    /// Reset to zero by any pull that does not qualify.
    surpluspulls: AtomicUsize,
    /// Whether an additional item has been allocated beyond the preallocated items.
    /// Set when a pull finds the queue empty; cleared by reclamation once the
    /// allocated count is back at or below `prealloc`.
    additional_allocated: AtomicBool,
}
66
67impl<T: Default> Drop for Pool<T> {
68    fn drop(&mut self) {
69        while let Some(item) = self.queue.pop() {
70            unsafe { item.drop_slow() };
71        }
72    }
73}
74
75impl<T: Default> Pool<T> {
76    /// Create a new pool with the given preallocation and capacity.
77    ///
78    /// # Example
79    ///
80    /// ```rust
81    /// use concurrent_pool::Pool;
82    ///
83    /// let pool: Pool<u32> = Pool::new(2, 5);
84    /// assert_eq!(pool.available(), 5);
85    /// assert_eq!(pool.available_noalloc(), 2);
86    /// let item = pool.pull().unwrap();
87    /// assert_eq!(pool.available_noalloc(), 1);
88    /// ```
89    pub fn new(prealloc: usize, capacity: usize) -> Self {
90        Self::with_config(Config {
91            capacity,
92            prealloc,
93            ..Default::default()
94        })
95    }
96
97    /// Create a new pool with the given capacity.
98    ///
99    /// # Example
100    ///
101    /// ```rust
102    /// use concurrent_pool::Pool;
103    ///
104    /// let pool: Pool<u32> = Pool::with_capacity(10);
105    /// assert_eq!(pool.available(), 10);
106    /// assert_eq!(pool.available_noalloc(), 10);
107    /// let item = pool.pull().unwrap();
108    /// assert_eq!(pool.available(), 9);
109    /// ```
110    pub fn with_capacity(capacity: usize) -> Self {
111        Self::new(capacity, capacity)
112    }
113
114    /// Create a new pool with half of the capacity preallocated.
115    ///
116    /// # Example
117    ///
118    /// ```rust
119    /// use concurrent_pool::Pool;
120    ///
121    /// let pool: Pool<u32> = Pool::with_capacity_half_prealloc(10);
122    /// assert_eq!(pool.available(), 10);
123    /// assert_eq!(pool.available_noalloc(), 5);
124    /// let item = pool.pull().unwrap();
125    /// assert_eq!(pool.available_noalloc(), 4);
126    /// assert_eq!(pool.in_use(), 1);
127    /// ```
128    pub fn with_capacity_half_prealloc(capacity: usize) -> Self {
129        Self::new(capacity / 2, capacity)
130    }
131
132    /// Create a new pool with the given configuration.
133    ///
134    /// # Example
135    ///
136    /// ```rust
137    /// use concurrent_pool::{Pool, Config};
138    ///
139    /// fn clear_func(x: &mut String) {
140    ///     x.clear();
141    /// }
142    ///
143    /// let mut config = Config::default();
144    /// config.capacity = 1;
145    /// config.clear_func = Some(clear_func);
146    /// let pool: Pool<String> = Pool::with_config(config);
147    /// let item = pool.pull_with(|s| s.push_str("Hello, World!")).unwrap();
148    /// assert_eq!(&*item, "Hello, World!");
149    /// drop(item);
150    /// let item2 = pool.pull().unwrap();
151    /// assert_eq!(&*item2, "");
152    /// ```
153    pub fn with_config(mut config: Config<T>) -> Self {
154        config.post_process();
155        let prealloc = config.prealloc;
156        assert!(
157            prealloc <= config.capacity,
158            "prealloc must be less than or equal to capacity"
159        );
160
161        let queue_len = max(1, config.capacity);
162        let pool = Self {
163            queue: ArrayQueue::new(queue_len),
164            allocated: AtomicUsize::new(prealloc),
165            surpluspulls: AtomicUsize::new(0),
166            additional_allocated: AtomicBool::new(false),
167            config,
168        };
169        let mut items = Vec::with_capacity(prealloc);
170        for _ in 0..prealloc {
171            items.push(T::default());
172        }
173        while let Some(item) = items.pop() {
174            let _ = pool.queue.push(Prc::new_zero(item));
175        }
176        pool
177    }
178
179    /// Get in used items count.
180    ///
181    /// # Example
182    ///
183    /// ```rust
184    /// use concurrent_pool::Pool;
185    ///
186    /// let pool: Pool<u32> = Pool::with_capacity(10);
187    /// assert_eq!(pool.in_use(), 0);
188    /// let item = pool.pull().unwrap();
189    /// assert_eq!(pool.in_use(), 1);
190    /// let item2 = pool.pull().unwrap();
191    /// assert_eq!(pool.in_use(), 2);
192    /// ```
193    pub fn in_use(&self) -> usize {
194        self.allocated.load(Relaxed) - self.queue.len()
195    }
196
197    /// Get allocated items count.
198    ///
199    /// # Example
200    ///
201    /// ```rust
202    /// use concurrent_pool::Pool;
203    ///
204    /// let pool = Pool::<usize>::new(2, 5);
205    /// assert_eq!(pool.allocated(), 2);
206    /// ```
207    pub fn allocated(&self) -> usize {
208        self.allocated.load(Acquire)
209    }
210
211    /// Get available items count.
212    ///
213    /// # Example
214    ///
215    /// ```rust
216    /// use concurrent_pool::Pool;
217    ///
218    /// let pool: Pool<u32> = Pool::with_capacity(10);
219    /// assert_eq!(pool.available(), 10);
220    /// let item = pool.pull().unwrap();
221    /// assert_eq!(pool.available(), 9);
222    /// ```
223    pub fn available(&self) -> usize {
224        self.config.capacity - self.in_use()
225    }
226
227    /// Get available items count without allocation.
228    ///
229    /// # Example
230    ///
231    /// ```rust
232    /// use concurrent_pool::Pool;
233    ///
234    /// let pool: Pool<u32> = Pool::new(2, 5);
235    /// assert_eq!(pool.available_noalloc(), 2);
236    /// let item = pool.pull().unwrap();
237    /// assert_eq!(pool.available_noalloc(), 1);
238    /// let item2 = pool.pull().unwrap();
239    /// assert_eq!(pool.available_noalloc(), 0);
240    /// let item3 = pool.pull().unwrap();
241    /// assert_eq!(pool.available_noalloc(), 0);
242    /// drop(item);
243    /// assert_eq!(pool.available_noalloc(), 1);
244    /// ```
245    pub fn available_noalloc(&self) -> usize {
246        self.queue.len()
247    }
248
249    /// Check if the pool is empty.
250    ///
251    /// # Example
252    ///
253    /// ```rust
254    /// use concurrent_pool::Pool;
255    ///
256    /// let pool: Pool<u32> = Pool::with_capacity(2);
257    /// assert!(!pool.is_empty());
258    /// let item1 = pool.pull().unwrap();
259    /// assert!(!pool.is_empty());
260    /// let item2 = pool.pull().unwrap();
261    /// assert!(pool.is_empty());
262    /// drop(item1);
263    /// assert!(!pool.is_empty());
264    /// ```
265    pub fn is_empty(&self) -> bool {
266        self.available() == 0
267    }
268
269    /// Get the capacity of the pool.
270    ///
271    /// # Example
272    ///
273    /// ```rust
274    /// use concurrent_pool::Pool;
275    ///
276    /// let pool: Pool<u32> = Pool::with_capacity(10);
277    /// assert_eq!(pool.capacity(), 10);
278    /// ```
279    pub fn capacity(&self) -> usize {
280        self.config.capacity
281    }
282
283    /// Pull an item from the pool. Return `None` if the pool is empty.
284    ///
285    /// # Example
286    ///
287    /// ```rust
288    /// use concurrent_pool::Pool;
289    ///
290    /// let pool: Pool<u32> = Pool::with_capacity(2);
291    /// let item1 = pool.pull().unwrap();
292    /// assert_eq!(*item1, 0);
293    /// ```
294    pub fn pull(&self) -> Option<Entry<'_, T>> {
295        self.pull_inner().map(|item| Entry {
296            item: Some(item),
297            pool: self,
298        })
299    }
300
301    /// Pull an item from the pool and apply a function to it. Return `None` if the pool is empty.
302    ///
303    /// # Example
304    ///
305    /// ```rust
306    /// use concurrent_pool::Pool;
307    ///
308    /// let pool: Pool<u32> = Pool::with_capacity(2);
309    /// let item1 = pool.pull_with(|x| *x = 42).unwrap();
310    /// assert_eq!(*item1, 42);
311    /// ```
312    pub fn pull_with<F>(&self, func: F) -> Option<Entry<'_, T>>
313    where
314        F: FnOnce(&mut T),
315    {
316        self.pull().map(|mut entry| {
317            func(unsafe { entry.get_mut_unchecked() });
318            entry
319        })
320    }
321
322    /// Pull an owned item from the pool. Return `None` if the pool is empty.
323    ///
324    /// # Example
325    ///
326    /// ```rust
327    /// use concurrent_pool::Pool;
328    /// use std::sync::Arc;
329    ///
330    /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
331    /// let item1 = pool.pull_owned().unwrap();
332    /// assert_eq!(*item1, 0);
333    /// ```
334    pub fn pull_owned(self: &Arc<Self>) -> Option<OwnedEntry<T>> {
335        self.pull_inner().map(|item| crate::OwnedEntry {
336            item: Some(item),
337            pool: self.clone(),
338        })
339    }
340
341    /// Pull an owned item from the pool and apply a function to it. Return `None` if the pool is empty.
342    ///
343    /// # Example
344    ///
345    /// ```rust
346    /// use concurrent_pool::Pool;
347    /// use std::sync::Arc;
348    ///
349    /// let pool: Arc<Pool<u32>> = Arc::new(Pool::with_capacity(2));
350    /// let item1 = pool.pull_owned_with(|x| *x = 42).unwrap();
351    /// assert_eq!(*item1, 42);
352    /// ```
353    pub fn pull_owned_with<F>(self: &Arc<Self>, func: F) -> Option<OwnedEntry<T>>
354    where
355        F: FnOnce(&mut T),
356    {
357        self.pull_owned().map(|mut entry| {
358            func(unsafe { entry.get_mut_unchecked() });
359            entry
360        })
361    }
362
363    /// Internal method to pull an item from the pool.
364    fn pull_inner(&self) -> Option<Prc<T>> {
365        match self.queue.pop() {
366            None => {
367                if !self.additional_allocated.load(Relaxed) {
368                    self.additional_allocated.store(true, Relaxed);
369                }
370                if self.config.need_process_reclamation {
371                    self.surpluspulls.store(0, SeqCst);
372                }
373                if self.allocated.load(Acquire) < self.config.capacity {
374                    self.allocated.fetch_add(1, Relaxed);
375                    Some(Prc::new(T::default()))
376                } else {
377                    None
378                }
379            }
380            Some(item) => {
381                if self.config.need_process_reclamation {
382                    let left = self.queue.len();
383                    if left >= self.config.idle_threshold_for_surpluspull {
384                        let surpluspulls = self.surpluspulls.fetch_add(1, Relaxed) + 1;
385                        if surpluspulls >= self.config.surpluspull_threshold_for_reclaim
386                            && self.additional_allocated.load(Relaxed)
387                        {
388                            self.reclaim();
389                        }
390                    } else {
391                        self.surpluspulls.store(0, Relaxed);
392                    }
393                }
394                item.inc_ref();
395                Some(item)
396            }
397        }
398    }
399
400    /// Reclaim an item from the pool to reduce memory usage.
401    fn reclaim(&self) {
402        if let Some(item) = self.queue.pop() {
403            unsafe { item.drop_slow() };
404            let current = self.allocated.fetch_sub(1, Release) - 1;
405            if self.config.need_process_reclamation && current <= self.config.prealloc {
406                if self.additional_allocated.load(Relaxed) {
407                    self.additional_allocated.store(false, Relaxed);
408                }
409            }
410        }
411    }
412
413    /// Recycle an item back into the pool.
414    pub(crate) fn recycle(&self, mut item: Prc<T>) {
415        if let Some(func) = &self.config.clear_func {
416            func(unsafe { Prc::get_mut_unchecked(&mut item) })
417        }
418        if self.queue.push(item).is_err() {
419            panic!("It is imposible that the pool is full when recycling an item");
420        }
421    }
422}
423
/// Settings that control how a pool is sized and how it reclaims memory.
#[derive(Debug)]
pub struct Config<T: Default> {
    /// Maximum capacity of the pool.
    pub capacity: usize,
    /// Number of items to preallocate.
    pub prealloc: usize,
    /// Whether to automatically reclaim allocated items and free them to reduce memory usage.
    pub auto_reclaim: bool,
    /// Number of consecutive `surplus-pull`s that triggers reclamation when
    /// `auto_reclaim` is enabled. `0` means "derive from the capacity".
    pub surpluspull_threshold_for_reclaim: usize,
    /// Number of idle items at or above which a pull counts as a `surplus-pull`
    /// when `auto_reclaim` is enabled. `0` means "derive from the capacity".
    pub idle_threshold_for_surpluspull: usize,
    /// Optional function to clear or reset an item before it is reused.
    pub clear_func: Option<fn(&mut T)>,
    /// Internal flag, computed by `post_process`, telling the pool whether it
    /// has to do any reclamation bookkeeping.
    need_process_reclamation: bool,
}

impl<T: Default> Default for Config<T> {
    /// A capacity of 1024, nothing preallocated, reclamation off, no clear
    /// function, and both thresholds left at `0` (derived later).
    fn default() -> Self {
        Self {
            capacity: 1024,
            prealloc: 0,
            auto_reclaim: false,
            surpluspull_threshold_for_reclaim: 0,
            idle_threshold_for_surpluspull: 0,
            clear_func: None,
            need_process_reclamation: false,
        }
    }
}

impl<T: Default> Config<T> {
    /// Resolves derived settings before the configuration is used by a pool.
    ///
    /// Thresholds left at `0` are replaced by values derived from the capacity
    /// (1/20 of it, at least 1, for the idle threshold; 1/100 of it, at least
    /// 2, for the reclaim threshold). The internal reclamation flag is set
    /// only when `auto_reclaim` is on and the pool is not fully preallocated.
    pub(crate) fn post_process(&mut self) {
        let capacity = self.capacity;

        if self.idle_threshold_for_surpluspull == 0 {
            self.idle_threshold_for_surpluspull = (capacity / 20).max(1);
        }

        if self.surpluspull_threshold_for_reclaim == 0 {
            self.surpluspull_threshold_for_reclaim = (capacity / 100).max(2);
        }

        // A fully preallocated pool never allocates extra items, so there is
        // nothing to reclaim.
        self.need_process_reclamation = self.auto_reclaim && self.prealloc != capacity;
    }
}