fastpool/bounded.rs
1// Copyright 2025 FastLabs Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bounded object pools.
16//!
//! A bounded pool creates and recycles objects with full management. You _cannot_ put an object
//! into the pool manually.
19//!
20//! The pool is bounded by the `max_size` config option of [`PoolConfig`]. If the pool reaches the
21//! maximum size, it will block all the [`Pool::get`] calls until an object is returned to the pool
22//! or an object is detached from the pool.
23//!
24//! Typically, a bounded pool is used wrapped in an [`Arc`] in order to call [`Pool::get`].
25//! This is intended so that users can leverage [`Arc::downgrade`] for running background
26//! maintenance tasks (e.g., [`Pool::retain`]).
27//!
28//! Bounded pools are useful for pooling database connections.
29//!
30//! ## Examples
31//!
32//! Read the following simple demo or more complex examples in the examples directory.
33//!
34//! ```
35//! use std::future::Future;
36//!
37//! use fastpool::ManageObject;
38//! use fastpool::ObjectStatus;
39//! use fastpool::bounded::Pool;
40//! use fastpool::bounded::PoolConfig;
41//!
42//! struct Compute;
43//! impl Compute {
44//! async fn do_work(&self) -> i32 {
45//! 42
46//! }
47//! }
48//!
49//! struct Manager;
50//! impl ManageObject for Manager {
51//! type Object = Compute;
52//! type Error = ();
53//!
54//! async fn create(&self) -> Result<Self::Object, Self::Error> {
55//! Ok(Compute)
56//! }
57//!
58//! async fn is_recyclable(
59//! &self,
60//! o: &mut Self::Object,
61//! status: &ObjectStatus,
62//! ) -> Result<(), Self::Error> {
63//! Ok(())
64//! }
65//! }
66//!
67//! # #[tokio::main]
68//! # async fn main() {
69//! let pool = Pool::new(PoolConfig::new(16), Manager);
70//! let o = pool.get().await.unwrap();
71//! assert_eq!(o.do_work().await, 42);
72//! # }
73//! ```
74
75use std::collections::VecDeque;
76use std::ops::Deref;
77use std::ops::DerefMut;
78use std::sync::Arc;
79use std::sync::Weak;
80use std::sync::atomic::AtomicUsize;
81use std::sync::atomic::Ordering;
82
83use mea::semaphore::OwnedSemaphorePermit;
84use mea::semaphore::Semaphore;
85
86use crate::ManageObject;
87use crate::ObjectStatus;
88use crate::QueueStrategy;
89use crate::RetainResult;
90use crate::mutex::Mutex;
91use crate::retain_spec;
92
/// The configuration of [`Pool`].
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct PoolConfig {
    /// Maximum size of the [`Pool`].
    ///
    /// Bounds the total number of objects (idle + obtained) managed by the pool.
    pub max_size: usize,

    /// Queue strategy of the [`Pool`].
    ///
    /// Determines the order of objects being queued and dequeued (FIFO or LIFO).
    pub queue_strategy: QueueStrategy,
}
105
106impl PoolConfig {
107 /// Creates a new [`PoolConfig`].
108 pub fn new(max_size: usize) -> Self {
109 Self {
110 max_size,
111 queue_strategy: QueueStrategy::default(),
112 }
113 }
114
115 /// Returns a new [`PoolConfig`] with the specified queue strategy.
116 pub fn with_queue_strategy(mut self, queue_strategy: QueueStrategy) -> Self {
117 self.queue_strategy = queue_strategy;
118 self
119 }
120}
121
/// The current pool status.
///
/// See [`Pool::status`].
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct PoolStatus {
    /// The maximum size of the pool.
    pub max_size: usize,

    /// The current size of the pool (idle + obtained objects).
    pub current_size: usize,

    /// The number of idle objects in the pool.
    ///
    /// NOTE: derived from counters that are read without synchronization, so this
    /// is only an approximation under concurrent load.
    pub idle_count: usize,

    /// The number of futures waiting for an object.
    ///
    /// NOTE: derived from counters that are read without synchronization, so this
    /// is only an approximation under concurrent load.
    pub wait_count: usize,
}
140
/// Generic runtime-agnostic object pool with a maximum size.
///
/// See the [module level documentation](self) for more.
pub struct Pool<M: ManageObject> {
    config: PoolConfig,
    manager: M,

    /// A counter that tracks the sum of waiters + obtained objects.
    ///
    /// Compared against `slots.current_size` in [`Pool::status`] to derive the
    /// approximate idle and wait counts.
    users: AtomicUsize,
    /// A semaphore that limits the maximum of users of the pool.
    ///
    /// Initialized with `config.max_size` permits; each obtained object holds one
    /// permit until it is dropped or detached.
    permits: Arc<Semaphore>,
    /// A deque that holds the objects.
    slots: Mutex<PoolDeque<ObjectState<M::Object>>>,
}
155
/// The mutex-guarded interior of a [`Pool`]: idle objects plus size bookkeeping.
#[derive(Debug)]
struct PoolDeque<T> {
    /// Idle objects available for reuse.
    deque: VecDeque<T>,
    /// Total number of live objects (idle + obtained); invariant: `<= max_size`.
    current_size: usize,
    /// Upper bound copied from [`PoolConfig::max_size`].
    max_size: usize,
}
162
163impl<M> std::fmt::Debug for Pool<M>
164where
165 M: ManageObject,
166 M::Object: std::fmt::Debug,
167{
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("Pool")
170 .field("slots", &self.slots)
171 .field("config", &self.config)
172 .field("users", &self.users)
173 .field("permits", &self.permits)
174 .finish()
175 }
176}
177
impl<M: ManageObject> Pool<M> {
    /// Creates a new [`Pool`].
    ///
    /// The pool is returned wrapped in an [`Arc`] because [`Pool::get`] takes
    /// `self: &Arc<Self>` and hands out weak references to pooled objects.
    pub fn new(config: PoolConfig, manager: M) -> Arc<Self> {
        let users = AtomicUsize::new(0);
        // One permit per potential object: waiters block once `max_size` permits
        // are held by obtained objects.
        let permits = Arc::new(Semaphore::new(config.max_size));
        let slots = Mutex::new(PoolDeque {
            deque: VecDeque::with_capacity(config.max_size),
            current_size: 0,
            max_size: config.max_size,
        });

        Arc::new(Self {
            config,
            manager,
            users,
            permits,
            slots,
        })
    }

    /// Replenishes the pool with at most `most` number of new objects:
    ///
    /// 1. If the pool has fewer slots to fill than `most`, narrow `most` to the number of slots.
    /// 2. If there is already any idle object in the pool, decrease `most` by the number of idle
    ///    objects.
    /// 3. If [`ManageObject::create`] returns `Err`, reduces `most` by 1 and continues to the next.
    ///
    /// Returns the number of objects that are actually replenished to the pool. This method is
    /// suitable to implement functionalities like minimal idle connections in a connection
    /// pool.
    pub async fn replenish(&self, most: usize) -> usize {
        // Grab as many permits as possible, up to `most`, without blocking.
        // `try_acquire(n)` fails when fewer than `n` permits are free, so shrink
        // `n` to the currently available count and retry until it succeeds
        // (acquiring 0 permits always succeeds).
        let mut permit = {
            let mut n = most;
            loop {
                match self.permits.try_acquire(n) {
                    Some(permit) => break permit,
                    None => {
                        n = n.min(self.permits.available_permits());
                        continue;
                    }
                }
            }
        };

        // Pool is fully occupied; nothing to replenish.
        if permit.permits() == 0 {
            return 0;
        }

        // Compute how many objects to create: acquired permits minus existing
        // idle objects (rule 2 of the doc comment above).
        let gap = {
            let idles = self.slots.lock().deque.len();
            if idles >= permit.permits() {
                return 0;
            }

            match permit.split(idles) {
                None => unreachable!(
                    "idles ({}) should be less than permits ({})",
                    idles,
                    permit.permits()
                ),
                Some(p) => {
                    // reduced by existing idle objects and release the corresponding permits
                    drop(p);
                }
            }

            permit.permits()
        };

        let mut replenished = 0;
        for _ in 0..gap {
            // Creation failures are swallowed on purpose: each failed attempt
            // simply reduces the replenished count (rule 3 above).
            if let Ok(o) = self.manager.create().await {
                let status = ObjectStatus::default();
                let state = ObjectState { o, status };

                let mut slots = self.slots.lock();
                slots.current_size += 1;
                slots.deque.push_back(state);
                drop(slots);

                replenished += 1;
            }

            match permit.split(1) {
                None => unreachable!("permit must be greater than 0 at this point"),
                Some(p) => {
                    // always release one permit to unblock other waiters
                    drop(p);
                }
            }
        }

        replenished
    }

    /// Retrieves an [`Object`] from this [`Pool`].
    ///
    /// This method should be called with a pool wrapped in an [`Arc`]. If the pool reaches the
    /// maximum size, this method would block until an object is returned to the pool or an object
    /// is detached from the pool.
    ///
    /// # Errors
    ///
    /// Returns `M::Error` if [`ManageObject::create`] fails while making a new object.
    pub async fn get(self: &Arc<Self>) -> Result<Object<M>, M::Error> {
        // Count this caller as a user immediately so `status()` can report waiters.
        self.users.fetch_add(1, Ordering::Relaxed);
        // Undo the increment if we leave early via the `?` below; on success the
        // guard is defused and the decrement happens when the object is returned
        // or detached.
        let guard = scopeguard::guard((), |()| {
            self.users.fetch_sub(1, Ordering::Relaxed);
        });

        // Waits here when the pool is at `max_size` with no free permit.
        let permit = self.permits.clone().acquire_owned(1).await;

        let object = loop {
            // Pop an idle object according to the configured queue strategy.
            let existing = match self.config.queue_strategy {
                QueueStrategy::Fifo => self.slots.lock().deque.pop_front(),
                QueueStrategy::Lifo => self.slots.lock().deque.pop_back(),
            };

            match existing {
                None => {
                    // No idle object; create a fresh one and grow the pool.
                    let object = self.manager.create().await?;
                    let state = ObjectState {
                        o: object,
                        status: ObjectStatus::default(),
                    };
                    self.slots.lock().current_size += 1;
                    break Object {
                        state: Some(state),
                        permit,
                        pool: Arc::downgrade(self),
                    };
                }
                Some(object) => {
                    // Wrap in `UnreadyObject` so that, if the recycle check fails,
                    // dropping it detaches the object from the pool automatically.
                    let mut unready_object = UnreadyObject {
                        state: Some(object),
                        pool: Arc::downgrade(self),
                    };

                    let state = unready_object.state();
                    let status = state.status;
                    if self
                        .manager
                        .is_recyclable(&mut state.o, &status)
                        .await
                        .is_ok()
                    {
                        state.status.recycle_count += 1;
                        state.status.recycled = Some(std::time::Instant::now());
                        break unready_object.ready(permit);
                    }
                    // Not recyclable: `unready_object` drops here, shrinking the
                    // pool, and the loop tries the next idle object.
                }
            };
        };

        // Success: defuse the guard so `users` stays incremented while the
        // object is held; `push_back`/`detach_object` decrement it later.
        scopeguard::ScopeGuard::into_inner(guard);
        Ok(object)
    }

    /// Retains only the objects that pass the given predicate.
    ///
    /// This function blocks the entire pool. Therefore, the given function should not block.
    ///
    /// The following example starts a background task that runs every 30 seconds and removes
    /// objects from the pool that have not been used for more than one minute. The task will
    /// terminate if the pool is dropped.
    ///
    /// ```rust,ignore
    /// let interval = Duration::from_secs(30);
    /// let max_age = Duration::from_secs(60);
    ///
    /// let weak_pool = Arc::downgrade(&pool);
    /// tokio::spawn(async move {
    ///     loop {
    ///         tokio::time::sleep(interval).await;
    ///         if let Some(pool) = weak_pool.upgrade() {
    ///             pool.retain(|_, status| status.last_used().elapsed() < max_age);
    ///         } else {
    ///             break;
    ///         }
    ///     }
    /// });
    /// ```
    pub fn retain(
        &self,
        f: impl FnMut(&mut M::Object, ObjectStatus) -> bool,
    ) -> RetainResult<M::Object> {
        let mut slots = self.slots.lock();
        let result = retain_spec::do_vec_deque_retain(&mut slots.deque, f);
        // Removed objects no longer count toward the pool size.
        slots.current_size -= result.removed.len();
        result
    }

    /// Returns the current status of the pool.
    ///
    /// The status returned by the pool is not guaranteed to be consistent.
    ///
    /// While this feature provides [eventual consistency], the numbers will be
    /// off when accessing the status of a pool under heavy load. These numbers
    /// are meant for an overall insight.
    ///
    /// [eventual consistency]: https://en.wikipedia.org/wiki/Eventual_consistency
    pub fn status(&self) -> PoolStatus {
        let slots = self.slots.lock();
        let (current_size, max_size) = (slots.current_size, slots.max_size);
        drop(slots);

        // `users` counts waiters + holders; comparing it with `current_size`
        // yields either an idle surplus or a waiting deficit, never both.
        let users = self.users.load(Ordering::Relaxed);
        let (idle_count, wait_count) = if users < current_size {
            (current_size - users, 0)
        } else {
            (0, users - current_size)
        };

        PoolStatus {
            max_size,
            current_size,
            idle_count,
            wait_count,
        }
    }

    /// Returns an object to the idle deque and releases its user slot.
    ///
    /// Called from `Object::drop`.
    fn push_back(&self, o: ObjectState<M::Object>) {
        let mut slots = self.slots.lock();

        assert!(
            slots.current_size <= slots.max_size,
            "invariant broken: current_size <= max_size (actual: {} <= {})",
            slots.current_size,
            slots.max_size,
        );

        slots.deque.push_back(o);
        drop(slots);

        self.users.fetch_sub(1, Ordering::Relaxed);
    }

    /// Removes an object from the pool's accounting and notifies the manager.
    ///
    /// `ready` indicates whether the object was held as a ready [`Object`]
    /// (decrement `users` here) or as an [`UnreadyObject`] inside `Pool::get`
    /// (the scopeguard in `get` performs the decrement instead).
    fn detach_object(&self, o: &mut M::Object, ready: bool) {
        let mut slots = self.slots.lock();

        assert!(
            slots.current_size <= slots.max_size,
            "invariant broken: current_size <= max_size (actual: {} <= {})",
            slots.current_size,
            slots.max_size,
        );

        slots.current_size -= 1;
        drop(slots);

        if ready {
            self.users.fetch_sub(1, Ordering::Relaxed);
        } else {
            // if the object is not ready, users count decrement is handled in the caller side,
            // that is, on exiting the `Pool::get` method.
        }
        self.manager.on_detached(o);
    }
}
433
/// A wrapper of the actual pooled object.
///
/// This object implements [`Deref`] and [`DerefMut`]. You can use it as if it was of type
/// `M::Object`.
///
/// This object implements [`Drop`] that returns the underlying object to the pool on drop. You may
/// call [`Object::detach`] to detach the object from the pool before dropping it.
pub struct Object<M: ManageObject> {
    /// Always `Some` while the object is owned; taken by `detach` or `drop`.
    state: Option<ObjectState<M::Object>>,
    /// Semaphore permit held for the lifetime of this object; released on drop,
    /// unblocking a waiter in [`Pool::get`].
    permit: OwnedSemaphorePermit,
    /// Weak reference back to the owning pool; `Weak` so a dangling object does
    /// not keep the pool alive.
    pool: Weak<Pool<M>>,
}
446
447impl<M> std::fmt::Debug for Object<M>
448where
449 M: ManageObject,
450 M::Object: std::fmt::Debug,
451{
452 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
453 f.debug_struct("Object")
454 .field("state", &self.state)
455 .field("permit", &self.permit)
456 .finish()
457 }
458}
459
460impl<M: ManageObject> Drop for Object<M> {
461 fn drop(&mut self) {
462 if let Some(state) = self.state.take() {
463 if let Some(pool) = self.pool.upgrade() {
464 pool.push_back(state);
465 }
466 }
467 }
468}
469
470impl<M: ManageObject> Deref for Object<M> {
471 type Target = M::Object;
472 fn deref(&self) -> &M::Object {
473 // SAFETY: `state` is always `Some` when `Object` is owned.
474 &self.state.as_ref().unwrap().o
475 }
476}
477
478impl<M: ManageObject> DerefMut for Object<M> {
479 fn deref_mut(&mut self) -> &mut Self::Target {
480 // SAFETY: `state` is always `Some` when `Object` is owned.
481 &mut self.state.as_mut().unwrap().o
482 }
483}
484
485impl<M: ManageObject> AsRef<M::Object> for Object<M> {
486 fn as_ref(&self) -> &M::Object {
487 self
488 }
489}
490
491impl<M: ManageObject> AsMut<M::Object> for Object<M> {
492 fn as_mut(&mut self) -> &mut M::Object {
493 self
494 }
495}
496
497impl<M: ManageObject> Object<M> {
498 /// Detaches the object from the [`Pool`].
499 ///
500 /// This reduces the size of the pool by one.
501 pub fn detach(mut self) -> M::Object {
502 // SAFETY: `state` is always `Some` when `Object` is owned.
503 let mut o = self.state.take().unwrap().o;
504 if let Some(pool) = self.pool.upgrade() {
505 pool.detach_object(&mut o, true);
506 }
507 o
508 }
509
510 /// Returns the status of the object.
511 pub fn status(&self) -> ObjectStatus {
512 // SAFETY: `state` is always `Some` when `Object` is owned.
513 self.state.as_ref().unwrap().status
514 }
515}
516
/// A wrapper of an object's state that detaches the object from the pool when dropped.
///
/// Used inside [`Pool::get`] while an idle object's recyclability is checked:
/// if the check fails and this wrapper is dropped, the object is removed from
/// the pool's accounting; calling `ready` converts it into an [`Object`] instead.
struct UnreadyObject<M: ManageObject> {
    // Always `Some` until `ready` or `drop` takes it.
    state: Option<ObjectState<M::Object>>,
    pool: Weak<Pool<M>>,
}
522
523impl<M: ManageObject> Drop for UnreadyObject<M> {
524 fn drop(&mut self) {
525 if let Some(mut state) = self.state.take() {
526 if let Some(pool) = self.pool.upgrade() {
527 pool.detach_object(&mut state.o, false);
528 }
529 }
530 }
531}
532
533impl<M: ManageObject> UnreadyObject<M> {
534 fn ready(mut self, permit: OwnedSemaphorePermit) -> Object<M> {
535 // SAFETY: `state` is always `Some` when `UnreadyObject` is owned.
536 let state = Some(self.state.take().unwrap());
537 let pool = self.pool.clone();
538 Object {
539 state,
540 permit,
541 pool,
542 }
543 }
544
545 fn state(&mut self) -> &mut ObjectState<M::Object> {
546 // SAFETY: `state` is always `Some` when `UnreadyObject` is owned.
547 self.state.as_mut().unwrap()
548 }
549}
550
/// A pooled object paired with its bookkeeping status.
#[derive(Debug)]
struct ObjectState<T> {
    // The managed object itself.
    o: T,
    // Recycle count / timestamps tracked by the pool.
    status: ObjectStatus,
}
556
/// Adapter so `retain_spec::do_vec_deque_retain` can operate on pool slots.
impl<T> retain_spec::SealedState for ObjectState<T> {
    type Object = T;

    // Copies out the status (`ObjectStatus` is `Copy`, see `Object::status`).
    fn status(&self) -> ObjectStatus {
        self.status
    }

    // Borrows the object mutably for the retain predicate.
    fn mut_object(&mut self) -> &mut Self::Object {
        &mut self.o
    }

    // Consumes the state, yielding the bare object (used for removed entries).
    fn take_object(self) -> Self::Object {
        self.o
    }
}