fastpool/
bounded.rs

1// Copyright 2025 FastLabs Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bounded object pools.
16//!
17//! A bounded pool creates and recycles objects with full management. You _cannot_ put an object to
18//! the pool manually.
19//!
20//! The pool is bounded by the `max_size` config option of [`PoolConfig`]. If the pool reaches the
21//! maximum size, it will block all the [`Pool::get`] calls until an object is returned to the pool
22//! or an object is detached from the pool.
23//!
24//! Typically, a bounded pool is used wrapped in an [`Arc`] in order to call [`Pool::get`].
25//! This is intended so that users can leverage [`Arc::downgrade`] for running background
26//! maintenance tasks (e.g., [`Pool::retain`]).
27//!
28//! Bounded pools are useful for pooling database connections.
29//!
30//! ## Examples
31//!
32//! Read the following simple demo or more complex examples in the examples directory.
33//!
34//! ```
35//! use std::future::Future;
36//!
37//! use fastpool::ManageObject;
38//! use fastpool::ObjectStatus;
39//! use fastpool::bounded::Pool;
40//! use fastpool::bounded::PoolConfig;
41//!
42//! struct Compute;
43//! impl Compute {
44//!     async fn do_work(&self) -> i32 {
45//!         42
46//!     }
47//! }
48//!
49//! struct Manager;
50//! impl ManageObject for Manager {
51//!     type Object = Compute;
52//!     type Error = ();
53//!
54//!     async fn create(&self) -> Result<Self::Object, Self::Error> {
55//!         Ok(Compute)
56//!     }
57//!
58//!     async fn is_recyclable(
59//!         &self,
60//!         o: &mut Self::Object,
61//!         status: &ObjectStatus,
62//!     ) -> Result<(), Self::Error> {
63//!         Ok(())
64//!     }
65//! }
66//!
67//! # #[tokio::main]
68//! # async fn main() {
69//! let pool = Pool::new(PoolConfig::new(16), Manager);
70//! let o = pool.get().await.unwrap();
71//! assert_eq!(o.do_work().await, 42);
72//! # }
73//! ```
74
75use std::collections::VecDeque;
76use std::ops::Deref;
77use std::ops::DerefMut;
78use std::sync::Arc;
79use std::sync::Weak;
80use std::sync::atomic::AtomicUsize;
81use std::sync::atomic::Ordering;
82
83use mea::semaphore::OwnedSemaphorePermit;
84use mea::semaphore::Semaphore;
85
86use crate::ManageObject;
87use crate::ObjectStatus;
88use crate::QueueStrategy;
89use crate::RetainResult;
90use crate::mutex::Mutex;
91use crate::retain_spec;
92
93/// The configuration of [`Pool`].
94#[derive(Clone, Copy, Debug)]
95#[non_exhaustive]
96pub struct PoolConfig {
97    /// Maximum size of the [`Pool`].
98    pub max_size: usize,
99
100    /// Queue strategy of the [`Pool`].
101    ///
102    /// Determines the order of objects being queued and dequeued.
103    pub queue_strategy: QueueStrategy,
104}
105
106impl PoolConfig {
107    /// Creates a new [`PoolConfig`].
108    pub fn new(max_size: usize) -> Self {
109        Self {
110            max_size,
111            queue_strategy: QueueStrategy::default(),
112        }
113    }
114
115    /// Returns a new [`PoolConfig`] with the specified queue strategy.
116    pub fn with_queue_strategy(mut self, queue_strategy: QueueStrategy) -> Self {
117        self.queue_strategy = queue_strategy;
118        self
119    }
120}
121
122/// The current pool status.
123///
124/// See [`Pool::status`].
125#[derive(Clone, Copy, Debug)]
126#[non_exhaustive]
127pub struct PoolStatus {
128    /// The maximum size of the pool.
129    pub max_size: usize,
130
131    /// The current size of the pool.
132    pub current_size: usize,
133
134    /// The number of idle objects in the pool.
135    pub idle_count: usize,
136
137    /// The number of futures waiting for an object.
138    pub wait_count: usize,
139}
140
141/// Generic runtime-agnostic object pool with a maximum size.
142///
143/// See the [module level documentation](self) for more.
144pub struct Pool<M: ManageObject> {
145    config: PoolConfig,
146    manager: M,
147
148    /// A counter that tracks the sum of waiters + obtained objects.
149    users: AtomicUsize,
150    /// A semaphore that limits the maximum of users of the pool.
151    permits: Arc<Semaphore>,
152    /// A deque that holds the objects.
153    slots: Mutex<PoolDeque<ObjectState<M::Object>>>,
154}
155
156#[derive(Debug)]
157struct PoolDeque<T> {
158    deque: VecDeque<T>,
159    current_size: usize,
160    max_size: usize,
161}
162
163impl<M> std::fmt::Debug for Pool<M>
164where
165    M: ManageObject,
166    M::Object: std::fmt::Debug,
167{
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        f.debug_struct("Pool")
170            .field("slots", &self.slots)
171            .field("config", &self.config)
172            .field("users", &self.users)
173            .field("permits", &self.permits)
174            .finish()
175    }
176}
177
178impl<M: ManageObject> Pool<M> {
179    /// Creates a new [`Pool`].
180    pub fn new(config: PoolConfig, manager: M) -> Arc<Self> {
181        let users = AtomicUsize::new(0);
182        let permits = Arc::new(Semaphore::new(config.max_size));
183        let slots = Mutex::new(PoolDeque {
184            deque: VecDeque::with_capacity(config.max_size),
185            current_size: 0,
186            max_size: config.max_size,
187        });
188
189        Arc::new(Self {
190            config,
191            manager,
192            users,
193            permits,
194            slots,
195        })
196    }
197
198    /// Replenishes the pool with at most `most` number of new objects:
199    ///
200    /// 1. If the pool has fewer slots to fill than `most`, narrow `most` to the number of slots.
201    /// 2. If there is already any idle object in the pool, decrease `most` by the number of idle
202    ///    objects.
203    /// 3. If [`ManageObject::create`] returns `Err`, reduces `most` by 1 and continues to the next.
204    ///
205    /// Returns the number of objects that are actually replenished to the pool. This method is
206    /// suitable to implement functionalities like minimal idle connections in a connection
207    /// pool.
208    pub async fn replenish(&self, most: usize) -> usize {
209        let mut permit = {
210            let mut n = most;
211            loop {
212                match self.permits.try_acquire(n) {
213                    Some(permit) => break permit,
214                    None => {
215                        n = n.min(self.permits.available_permits());
216                        continue;
217                    }
218                }
219            }
220        };
221
222        if permit.permits() == 0 {
223            return 0;
224        }
225
226        let gap = {
227            let idles = self.slots.lock().deque.len();
228            if idles >= permit.permits() {
229                return 0;
230            }
231
232            match permit.split(idles) {
233                None => unreachable!(
234                    "idles ({}) should be less than permits ({})",
235                    idles,
236                    permit.permits()
237                ),
238                Some(p) => {
239                    // reduced by existing idle objects and release the corresponding permits
240                    drop(p);
241                }
242            }
243
244            permit.permits()
245        };
246
247        let mut replenished = 0;
248        for _ in 0..gap {
249            if let Ok(o) = self.manager.create().await {
250                let status = ObjectStatus::default();
251                let state = ObjectState { o, status };
252
253                let mut slots = self.slots.lock();
254                slots.current_size += 1;
255                slots.deque.push_back(state);
256                drop(slots);
257
258                replenished += 1;
259            }
260
261            match permit.split(1) {
262                None => unreachable!("permit must be greater than 0 at this point"),
263                Some(p) => {
264                    // always release one permit to unblock other waiters
265                    drop(p);
266                }
267            }
268        }
269
270        replenished
271    }
272
273    /// Retrieves an [`Object`] from this [`Pool`].
274    ///
275    /// This method should be called with a pool wrapped in an [`Arc`]. If the pool reaches the
276    /// maximum size, this method would block until an object is returned to the pool or an object
277    /// is detached from the pool.
278    pub async fn get(self: &Arc<Self>) -> Result<Object<M>, M::Error> {
279        self.users.fetch_add(1, Ordering::Relaxed);
280        let guard = scopeguard::guard((), |()| {
281            self.users.fetch_sub(1, Ordering::Relaxed);
282        });
283
284        let permit = self.permits.clone().acquire_owned(1).await;
285
286        let object = loop {
287            let existing = match self.config.queue_strategy {
288                QueueStrategy::Fifo => self.slots.lock().deque.pop_front(),
289                QueueStrategy::Lifo => self.slots.lock().deque.pop_back(),
290            };
291
292            match existing {
293                None => {
294                    let object = self.manager.create().await?;
295                    let state = ObjectState {
296                        o: object,
297                        status: ObjectStatus::default(),
298                    };
299                    self.slots.lock().current_size += 1;
300                    break Object {
301                        state: Some(state),
302                        permit,
303                        pool: Arc::downgrade(self),
304                    };
305                }
306                Some(object) => {
307                    let mut unready_object = UnreadyObject {
308                        state: Some(object),
309                        pool: Arc::downgrade(self),
310                    };
311
312                    let state = unready_object.state();
313                    let status = state.status;
314                    if self
315                        .manager
316                        .is_recyclable(&mut state.o, &status)
317                        .await
318                        .is_ok()
319                    {
320                        state.status.recycle_count += 1;
321                        state.status.recycled = Some(std::time::Instant::now());
322                        break unready_object.ready(permit);
323                    }
324                }
325            };
326        };
327
328        scopeguard::ScopeGuard::into_inner(guard);
329        Ok(object)
330    }
331
332    /// Retains only the objects that pass the given predicate.
333    ///
334    /// This function blocks the entire pool. Therefore, the given function should not block.
335    ///
336    /// The following example starts a background task that runs every 30 seconds and removes
337    /// objects from the pool that have not been used for more than one minute. The task will
338    /// terminate if the pool is dropped.
339    ///
340    /// ```rust,ignore
341    /// let interval = Duration::from_secs(30);
342    /// let max_age = Duration::from_secs(60);
343    ///
344    /// let weak_pool = Arc::downgrade(&pool);
345    /// tokio::spawn(async move {
346    ///     loop {
347    ///         tokio::time::sleep(interval).await;
348    ///         if let Some(pool) = weak_pool.upgrade() {
349    ///             pool.retain(|_, status| status.last_used().elapsed() < max_age);
350    ///         } else {
351    ///             break;
352    ///         }
353    ///     }
354    /// });
355    /// ```
356    pub fn retain(
357        &self,
358        f: impl FnMut(&mut M::Object, ObjectStatus) -> bool,
359    ) -> RetainResult<M::Object> {
360        let mut slots = self.slots.lock();
361        let result = retain_spec::do_vec_deque_retain(&mut slots.deque, f);
362        slots.current_size -= result.removed.len();
363        result
364    }
365
366    /// Returns the current status of the pool.
367    ///
368    /// The status returned by the pool is not guaranteed to be consistent.
369    ///
370    /// While this features provides [eventual consistency], the numbers will be
371    /// off when accessing the status of a pool under heavy load. These numbers
372    /// are meant for an overall insight.
373    ///
374    /// [eventual consistency]: (https://en.wikipedia.org/wiki/Eventual_consistency)
375    pub fn status(&self) -> PoolStatus {
376        let slots = self.slots.lock();
377        let (current_size, max_size) = (slots.current_size, slots.max_size);
378        drop(slots);
379
380        let users = self.users.load(Ordering::Relaxed);
381        let (idle_count, wait_count) = if users < current_size {
382            (current_size - users, 0)
383        } else {
384            (0, users - current_size)
385        };
386
387        PoolStatus {
388            max_size,
389            current_size,
390            idle_count,
391            wait_count,
392        }
393    }
394
395    fn push_back(&self, o: ObjectState<M::Object>) {
396        let mut slots = self.slots.lock();
397
398        assert!(
399            slots.current_size <= slots.max_size,
400            "invariant broken: current_size <= max_size (actual: {} <= {})",
401            slots.current_size,
402            slots.max_size,
403        );
404
405        slots.deque.push_back(o);
406        drop(slots);
407
408        self.users.fetch_sub(1, Ordering::Relaxed);
409    }
410
411    fn detach_object(&self, o: &mut M::Object, ready: bool) {
412        let mut slots = self.slots.lock();
413
414        assert!(
415            slots.current_size <= slots.max_size,
416            "invariant broken: current_size <= max_size (actual: {} <= {})",
417            slots.current_size,
418            slots.max_size,
419        );
420
421        slots.current_size -= 1;
422        drop(slots);
423
424        if ready {
425            self.users.fetch_sub(1, Ordering::Relaxed);
426        } else {
427            // if the object is not ready, users count decrement is handled in the caller side,
428            // that is, on exiting the `Pool::get` method.
429        }
430        self.manager.on_detached(o);
431    }
432}
433
434/// A wrapper of the actual pooled object.
435///
436/// This object implements [`Deref`] and [`DerefMut`]. You can use it as if it was of type
437/// `M::Object`.
438///
439/// This object implements [`Drop`] that returns the underlying object to the pool on drop. You may
440/// call [`Object::detach`] to detach the object from the pool before dropping it.
441pub struct Object<M: ManageObject> {
442    state: Option<ObjectState<M::Object>>,
443    permit: OwnedSemaphorePermit,
444    pool: Weak<Pool<M>>,
445}
446
447impl<M> std::fmt::Debug for Object<M>
448where
449    M: ManageObject,
450    M::Object: std::fmt::Debug,
451{
452    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453        f.debug_struct("Object")
454            .field("state", &self.state)
455            .field("permit", &self.permit)
456            .finish()
457    }
458}
459
460impl<M: ManageObject> Drop for Object<M> {
461    fn drop(&mut self) {
462        if let Some(state) = self.state.take() {
463            if let Some(pool) = self.pool.upgrade() {
464                pool.push_back(state);
465            }
466        }
467    }
468}
469
470impl<M: ManageObject> Deref for Object<M> {
471    type Target = M::Object;
472    fn deref(&self) -> &M::Object {
473        // SAFETY: `state` is always `Some` when `Object` is owned.
474        &self.state.as_ref().unwrap().o
475    }
476}
477
478impl<M: ManageObject> DerefMut for Object<M> {
479    fn deref_mut(&mut self) -> &mut Self::Target {
480        // SAFETY: `state` is always `Some` when `Object` is owned.
481        &mut self.state.as_mut().unwrap().o
482    }
483}
484
485impl<M: ManageObject> AsRef<M::Object> for Object<M> {
486    fn as_ref(&self) -> &M::Object {
487        self
488    }
489}
490
491impl<M: ManageObject> AsMut<M::Object> for Object<M> {
492    fn as_mut(&mut self) -> &mut M::Object {
493        self
494    }
495}
496
497impl<M: ManageObject> Object<M> {
498    /// Detaches the object from the [`Pool`].
499    ///
500    /// This reduces the size of the pool by one.
501    pub fn detach(mut self) -> M::Object {
502        // SAFETY: `state` is always `Some` when `Object` is owned.
503        let mut o = self.state.take().unwrap().o;
504        if let Some(pool) = self.pool.upgrade() {
505            pool.detach_object(&mut o, true);
506        }
507        o
508    }
509
510    /// Returns the status of the object.
511    pub fn status(&self) -> ObjectStatus {
512        // SAFETY: `state` is always `Some` when `Object` is owned.
513        self.state.as_ref().unwrap().status
514    }
515}
516
517/// A wrapper of ObjectStatus that detaches the object from the pool when dropped.
518struct UnreadyObject<M: ManageObject> {
519    state: Option<ObjectState<M::Object>>,
520    pool: Weak<Pool<M>>,
521}
522
523impl<M: ManageObject> Drop for UnreadyObject<M> {
524    fn drop(&mut self) {
525        if let Some(mut state) = self.state.take() {
526            if let Some(pool) = self.pool.upgrade() {
527                pool.detach_object(&mut state.o, false);
528            }
529        }
530    }
531}
532
533impl<M: ManageObject> UnreadyObject<M> {
534    fn ready(mut self, permit: OwnedSemaphorePermit) -> Object<M> {
535        // SAFETY: `state` is always `Some` when `UnreadyObject` is owned.
536        let state = Some(self.state.take().unwrap());
537        let pool = self.pool.clone();
538        Object {
539            state,
540            permit,
541            pool,
542        }
543    }
544
545    fn state(&mut self) -> &mut ObjectState<M::Object> {
546        // SAFETY: `state` is always `Some` when `UnreadyObject` is owned.
547        self.state.as_mut().unwrap()
548    }
549}
550
551#[derive(Debug)]
552struct ObjectState<T> {
553    o: T,
554    status: ObjectStatus,
555}
556
557impl<T> retain_spec::SealedState for ObjectState<T> {
558    type Object = T;
559
560    fn status(&self) -> ObjectStatus {
561        self.status
562    }
563
564    fn mut_object(&mut self) -> &mut Self::Object {
565        &mut self.o
566    }
567
568    fn take_object(self) -> Self::Object {
569        self.o
570    }
571}