pool_mod/pool.rs
1//! The pool itself: [`Pool`] and its [`Builder`].
2
3use std::collections::VecDeque;
4use std::sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, Weak};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crate::config::PoolConfig;
9use crate::error::Error;
10use crate::manager::Manager;
11use crate::object::Pooled;
12use crate::status::Status;
13
14/// Acquire a mutex guard, recovering the data even if a previous holder panicked.
15///
16/// The pool only mutates plain counters and a queue while the lock is held, never
17/// running user code, so the protected state is always consistent at an unlock
18/// point. Honouring poison would convert an unrelated panic elsewhere into a
19/// permanent, unrecoverable pool outage, which is the worse failure mode.
20#[inline]
21pub(crate) fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
22 mutex.lock().unwrap_or_else(PoisonError::into_inner)
23}
24
25/// A resource resting in the idle set, tagged with the timestamps the pool uses
26/// to enforce `idle_timeout` and `max_lifetime`.
27pub(crate) struct Idle<R> {
28 pub(crate) resource: R,
29 pub(crate) created_at: Instant,
30 pub(crate) last_used: Instant,
31}
32
33/// The mutable inner state guarded by the pool's mutex.
34struct State<R> {
35 idle: VecDeque<Idle<R>>,
36 /// Resources the pool currently owns: idle, checked out, or mid-creation.
37 total: usize,
38 /// Threads currently blocked in [`PoolInner::acquire`] waiting on `available`.
39 /// Maintained under the lock so the wakers can skip the condvar entirely when
40 /// no one is waiting — the common, uncontended case.
41 waiters: usize,
42 closed: bool,
43}
44
45/// The decision reached while holding the lock, carried out once it is released.
46///
47/// Only outcomes that require running user code — and therefore must not hold the
48/// lock — escape the locked region; waiting and immediate errors are handled in
49/// place.
50enum Action<R> {
51 Reuse(Idle<R>),
52 Create,
53}
54
55/// The stop signal shared between the pool and its background reaper thread.
56///
57/// Held by both sides through an [`Arc`] so it outlives the [`PoolInner`] it
58/// guards: when the last pool handle drops, `PoolInner`'s destructor sets the
59/// flag and wakes the reaper, which then exits.
60struct Shutdown {
61 stop: Mutex<bool>,
62 wake: Condvar,
63}
64
65impl Shutdown {
66 fn new() -> Self {
67 Shutdown {
68 stop: Mutex::new(false),
69 wake: Condvar::new(),
70 }
71 }
72
73 /// Signal the reaper to stop and wake it immediately.
74 fn signal(&self) {
75 let mut stop = lock(&self.stop);
76 *stop = true;
77 drop(stop);
78 self.wake.notify_all();
79 }
80}
81
82/// Shared pool state behind an [`Arc`]. Every [`Pool`] handle and every live
83/// [`Pooled`] guard holds one of these.
84pub(crate) struct PoolInner<M: Manager> {
85 pub(crate) manager: M,
86 config: PoolConfig,
87 state: Mutex<State<M::Resource>>,
88 available: Condvar,
89 shutdown: Arc<Shutdown>,
90}
91
92impl<M: Manager> Drop for PoolInner<M> {
93 fn drop(&mut self) {
94 // The last handle is gone. Stop the reaper (if one is running) so it does
95 // not outlive the pool it maintains.
96 self.shutdown.signal();
97 }
98}
99
100impl<M: Manager> PoolInner<M> {
101 /// Take a resource out of the pool, blocking until one is free, a slot opens
102 /// for a fresh one, or the deadline passes.
103 fn acquire(
104 &self,
105 deadline: Option<Instant>,
106 ) -> Result<(M::Resource, Instant), Error<M::Error>> {
107 loop {
108 let action = {
109 let mut state = lock(&self.state);
110 loop {
111 if state.closed {
112 return Err(Error::Closed);
113 }
114 if let Some(idle) = state.idle.pop_front() {
115 break Action::Reuse(idle);
116 }
117 if state.total < self.config.max_size {
118 state.total += 1;
119 break Action::Create;
120 }
121 // Saturated: wait for a check-in or a close, then re-evaluate.
122 // `waiters` is bumped immediately before sleeping and dropped
123 // the moment we wake, both under the lock, so a waker can tell
124 // whether a signal is needed at all.
125 match deadline {
126 None => {
127 state.waiters += 1;
128 state = self
129 .available
130 .wait(state)
131 .unwrap_or_else(PoisonError::into_inner);
132 state.waiters -= 1;
133 }
134 Some(dl) => {
135 let now = Instant::now();
136 if now >= dl {
137 return Err(Error::Timeout);
138 }
139 state.waiters += 1;
140 let (guard, _) = self
141 .available
142 .wait_timeout(state, dl - now)
143 .unwrap_or_else(PoisonError::into_inner);
144 state = guard;
145 state.waiters -= 1;
146 }
147 }
148 }
149 };
150
151 match action {
152 Action::Reuse(idle) => {
153 if let Some(prepared) = self.prepare(idle) {
154 return Ok(prepared);
155 }
156 // The idle resource was stale or invalid and has been dropped;
157 // release its slot and try again from the top.
158 self.release_slot();
159 }
160 Action::Create => match self.manager.create() {
161 Ok(resource) => return Ok((resource, Instant::now())),
162 Err(source) => {
163 self.release_slot();
164 return Err(Error::Backend(source));
165 }
166 },
167 }
168 }
169 }
170
171 /// Apply lifetime, idle-timeout, and validation checks to an idle resource.
172 ///
173 /// Returns the resource and its original creation time on success; returns
174 /// `None` (dropping the resource) when it is too old, has sat idle too long,
175 /// or fails validation.
176 fn prepare(&self, mut idle: Idle<M::Resource>) -> Option<(M::Resource, Instant)> {
177 if self.is_time_expired(&idle, Instant::now()) {
178 return None;
179 }
180 if !self.manager.validate(&mut idle.resource) {
181 return None;
182 }
183 Some((idle.resource, idle.created_at))
184 }
185
186 /// Whether an idle resource has outlived `max_lifetime` or `idle_timeout`.
187 ///
188 /// This is the time-based half of expiry, shared by checkout
189 /// ([`prepare`](Self::prepare)) and the background [`reap`](Self::reap); it
190 /// deliberately does not call [`Manager::validate`], which is a checkout-time
191 /// concern.
192 fn is_time_expired(&self, idle: &Idle<M::Resource>, now: Instant) -> bool {
193 if let Some(max_lifetime) = self.config.max_lifetime {
194 if now.saturating_duration_since(idle.created_at) >= max_lifetime {
195 return true;
196 }
197 }
198 if let Some(idle_timeout) = self.config.idle_timeout {
199 if now.saturating_duration_since(idle.last_used) >= idle_timeout {
200 return true;
201 }
202 }
203 false
204 }
205
206 /// Prune idle resources that have outlived their `idle_timeout` or
207 /// `max_lifetime`. Called on each tick of the background reaper.
208 ///
209 /// Expired resources are removed under the lock but dropped outside it, so a
210 /// slow destructor does not stall checkouts. The reaper never creates
211 /// resources; pruning shrinks the idle set, and on-demand growth refills it.
212 fn reap(&self) {
213 if self.config.idle_timeout.is_none() && self.config.max_lifetime.is_none() {
214 return;
215 }
216 let now = Instant::now();
217 let mut expired = Vec::new();
218 let waiters;
219 {
220 let mut state = lock(&self.state);
221 if state.closed {
222 return;
223 }
224 let mut kept = VecDeque::with_capacity(state.idle.len());
225 while let Some(idle) = state.idle.pop_front() {
226 if self.is_time_expired(&idle, now) {
227 expired.push(idle.resource);
228 } else {
229 kept.push_back(idle);
230 }
231 }
232 state.total = state.total.saturating_sub(expired.len());
233 state.idle = kept;
234 waiters = state.waiters;
235 }
236 drop(expired); // destructors run here, outside the lock
237 // Freed slots may unblock threads waiting to create a resource.
238 self.wake_all(waiters);
239 }
240
241 /// Give back a reserved slot and wake one waiter, if any, so it can claim it.
242 fn release_slot(&self) {
243 let mut state = lock(&self.state);
244 state.total = state.total.saturating_sub(1);
245 let waiters = state.waiters;
246 drop(state);
247 self.wake_one(waiters);
248 }
249
250 /// Return a borrowed resource to the pool. Called from [`Pooled`]'s `Drop`.
251 pub(crate) fn checkin(&self, mut resource: M::Resource, created_at: Instant) {
252 let recycled = self.manager.recycle(&mut resource);
253 let mut state = lock(&self.state);
254 let waiters = state.waiters;
255 if state.closed || recycled.is_err() {
256 state.total = state.total.saturating_sub(1);
257 drop(state);
258 self.wake_one(waiters);
259 // `resource` is dropped here, outside the lock.
260 } else {
261 let last_used = Instant::now();
262 state.idle.push_back(Idle {
263 resource,
264 created_at,
265 last_used,
266 });
267 drop(state);
268 self.wake_one(waiters);
269 }
270 }
271
272 /// Wake a single blocked acquirer, but only if one is actually waiting.
273 ///
274 /// `waiters` is the count read under the lock by the caller; skipping the
275 /// condvar when it is zero keeps the uncontended checkout/return path free of
276 /// needless signaling.
277 #[inline]
278 fn wake_one(&self, waiters: usize) {
279 if waiters > 0 {
280 self.available.notify_one();
281 }
282 }
283
284 /// Wake every blocked acquirer, but only if any are waiting.
285 #[inline]
286 fn wake_all(&self, waiters: usize) {
287 if waiters > 0 {
288 self.available.notify_all();
289 }
290 }
291}
292
293/// The body of the background reaper thread.
294///
295/// Sleeps for `interval` between sweeps, waking early when the shutdown signal
296/// fires. It holds only a [`Weak`] reference to the pool, so it never keeps the
297/// pool alive; once every handle is dropped (`upgrade` returns `None`) or the
298/// stop flag is set, it returns and the thread ends.
299fn reaper_loop<M: Manager>(pool: Weak<PoolInner<M>>, shutdown: Arc<Shutdown>, interval: Duration) {
300 loop {
301 {
302 let stop = lock(&shutdown.stop);
303 if *stop {
304 return;
305 }
306 let (stop, _timed_out) = shutdown
307 .wake
308 .wait_timeout(stop, interval)
309 .unwrap_or_else(PoisonError::into_inner);
310 if *stop {
311 return;
312 }
313 }
314 match pool.upgrade() {
315 Some(inner) => inner.reap(),
316 None => return,
317 }
318 }
319}
320
321/// A thread-safe pool of reusable resources.
322///
323/// A `Pool<M>` lends out resources built by a [`Manager`], reclaiming and
324/// recycling each one when its [`Pooled`] guard is dropped. It is cheap to clone
325/// — every clone is a handle onto the same shared pool — so share it across
326/// threads by cloning rather than wrapping it in another `Arc`.
327///
328/// The pool is runtime-agnostic and carries no async dependency.
329/// [`get`](Pool::get) blocks the calling thread until a resource is available; in
330/// an async context, acquire on a blocking-friendly executor thread (for example
331/// `tokio::task::spawn_blocking`). The returned guard is `Send`, so it can be
332/// held across `.await` points.
333///
334/// # Examples
335///
336/// ```
337/// use pool_mod::{Manager, Pool};
338/// use std::convert::Infallible;
339///
340/// struct Connections;
341/// impl Manager for Connections {
342/// type Resource = String;
343/// type Error = Infallible;
344/// fn create(&self) -> Result<String, Infallible> { Ok(String::new()) }
345/// fn recycle(&self, c: &mut String) -> Result<(), Infallible> { c.clear(); Ok(()) }
346/// }
347///
348/// let pool = Pool::builder(Connections).max_size(4).build()
349/// .expect("configuration is valid");
350///
351/// let mut conn = pool.get().expect("a connection is available");
352/// conn.push_str("SELECT 1");
353/// assert_eq!(pool.status().in_use, 1);
354/// drop(conn);
355/// assert_eq!(pool.status().in_use, 0);
356/// ```
357pub struct Pool<M: Manager>(Arc<PoolInner<M>>);
358
359impl<M: Manager> Clone for Pool<M> {
360 fn clone(&self) -> Self {
361 Pool(Arc::clone(&self.0))
362 }
363}
364
365impl<M: Manager> Pool<M> {
366 /// Start building a pool for `manager` with the default configuration.
367 ///
368 /// # Examples
369 ///
370 /// ```
371 /// use pool_mod::{Manager, Pool};
372 /// use std::convert::Infallible;
373 /// # struct M;
374 /// # impl Manager for M {
375 /// # type Resource = u32; type Error = Infallible;
376 /// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
377 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
378 /// # }
379 /// let pool = Pool::builder(M).max_size(8).min_idle(2).build()
380 /// .expect("configuration is valid");
381 /// assert_eq!(pool.status().max_size, 8);
382 /// ```
383 pub fn builder(manager: M) -> Builder<M> {
384 Builder::new(manager)
385 }
386
387 /// Build a pool for `manager` with the [default configuration](PoolConfig::default).
388 ///
389 /// A shortcut for `Pool::builder(manager).build()`.
390 ///
391 /// # Errors
392 ///
393 /// Returns [`Error::Backend`] if pre-creating the initial resources fails.
394 /// (With the default `min_idle` of 0, no resources are created up front, so
395 /// the default-configured pool only fails to build if you have customized the
396 /// configuration through [`Pool::builder`] instead.)
397 ///
398 /// # Examples
399 ///
400 /// ```
401 /// use pool_mod::{Manager, Pool};
402 /// use std::convert::Infallible;
403 /// # struct M;
404 /// # impl Manager for M {
405 /// # type Resource = u32; type Error = Infallible;
406 /// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
407 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
408 /// # }
409 /// let pool = Pool::new(M).expect("configuration is valid");
410 /// assert_eq!(pool.status().max_size, 10); // the default
411 /// ```
412 pub fn new(manager: M) -> Result<Self, Error<M::Error>> {
413 Builder::new(manager).build()
414 }
415
416 /// Borrow a resource, waiting up to the configured
417 /// [`create_timeout`](PoolConfig::create_timeout) if the pool is saturated.
418 ///
419 /// Reuses an idle resource when one is available (after validation), grows the
420 /// pool toward `max_size` when it is not, and otherwise blocks until a
421 /// resource is returned or the timeout elapses.
422 ///
423 /// # Errors
424 ///
425 /// - [`Error::Backend`] if the manager fails to create a resource.
426 /// - [`Error::Timeout`] if the pool stays saturated past `create_timeout`.
427 /// - [`Error::Closed`] if the pool has been closed.
428 ///
429 /// # Examples
430 ///
431 /// ```
432 /// use pool_mod::{Manager, Pool};
433 /// use std::convert::Infallible;
434 /// # struct M;
435 /// # impl Manager for M {
436 /// # type Resource = u32; type Error = Infallible;
437 /// # fn create(&self) -> Result<u32, Infallible> { Ok(7) }
438 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
439 /// # }
440 /// let pool = Pool::builder(M).max_size(2).build().expect("valid");
441 /// let resource = pool.get().expect("available");
442 /// assert_eq!(*resource, 7);
443 /// ```
444 pub fn get(&self) -> Result<Pooled<M>, Error<M::Error>> {
445 let deadline = self
446 .0
447 .config
448 .create_timeout
449 .map(|timeout| Instant::now() + timeout);
450 self.acquire(deadline)
451 }
452
453 /// Borrow a resource, waiting at most `timeout` regardless of the configured
454 /// [`create_timeout`](PoolConfig::create_timeout).
455 ///
456 /// A `timeout` of [`Duration::ZERO`] makes this a non-blocking try: it returns
457 /// [`Error::Timeout`] at once if no resource can be handed out immediately.
458 ///
459 /// # Errors
460 ///
461 /// - [`Error::Backend`] if the manager fails to create a resource.
462 /// - [`Error::Timeout`] if no resource becomes available within `timeout`.
463 /// - [`Error::Closed`] if the pool has been closed.
464 ///
465 /// # Examples
466 ///
467 /// ```
468 /// use std::time::Duration;
469 /// use pool_mod::{Error, Manager, Pool};
470 /// use std::convert::Infallible;
471 /// # struct M;
472 /// # impl Manager for M {
473 /// # type Resource = u32; type Error = Infallible;
474 /// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
475 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
476 /// # }
477 /// let pool = Pool::builder(M).max_size(1).build().expect("valid");
478 /// let held = pool.get().expect("first checkout");
479 /// // The single slot is taken, so an immediate retry times out.
480 /// assert!(matches!(pool.get_timeout(Duration::ZERO), Err(Error::Timeout)));
481 /// ```
482 pub fn get_timeout(&self, timeout: Duration) -> Result<Pooled<M>, Error<M::Error>> {
483 self.acquire(Some(Instant::now() + timeout))
484 }
485
486 /// Borrow a resource without ever blocking.
487 ///
488 /// Returns a resource if one can be handed out immediately — an idle resource
489 /// is ready, or the pool has room to create one — and otherwise returns
490 /// [`Error::Timeout`] at once. Equivalent to
491 /// [`get_timeout(Duration::ZERO)`](Pool::get_timeout).
492 ///
493 /// # Errors
494 ///
495 /// - [`Error::Backend`] if the manager fails to create a resource.
496 /// - [`Error::Timeout`] if no resource is immediately available.
497 /// - [`Error::Closed`] if the pool has been closed.
498 ///
499 /// # Examples
500 ///
501 /// ```
502 /// use pool_mod::{Error, Manager, Pool};
503 /// use std::convert::Infallible;
504 /// # struct M;
505 /// # impl Manager for M {
506 /// # type Resource = u32; type Error = Infallible;
507 /// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
508 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
509 /// # }
510 /// let pool = Pool::builder(M).max_size(1).build().expect("valid");
511 /// let first = pool.try_get().expect("room to create one");
512 /// // The only slot is taken, so the next try fails immediately.
513 /// assert!(matches!(pool.try_get(), Err(Error::Timeout)));
514 /// ```
515 pub fn try_get(&self) -> Result<Pooled<M>, Error<M::Error>> {
516 self.acquire(Some(Instant::now()))
517 }
518
519 fn acquire(&self, deadline: Option<Instant>) -> Result<Pooled<M>, Error<M::Error>> {
520 let (resource, created_at) = self.0.acquire(deadline)?;
521 Ok(Pooled::new(Arc::clone(&self.0), resource, created_at))
522 }
523
524 /// Take a snapshot of the pool's current occupancy.
525 ///
526 /// # Examples
527 ///
528 /// ```
529 /// use pool_mod::{Manager, Pool};
530 /// use std::convert::Infallible;
531 /// # struct M;
532 /// # impl Manager for M {
533 /// # type Resource = (); type Error = Infallible;
534 /// # fn create(&self) -> Result<(), Infallible> { Ok(()) }
535 /// # fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
536 /// # }
537 /// let pool = Pool::builder(M).max_size(4).min_idle(1).build().expect("valid");
538 /// let status = pool.status();
539 /// assert_eq!(status.idle, 1);
540 /// assert_eq!(status.max_size, 4);
541 /// ```
542 pub fn status(&self) -> Status {
543 let state = lock(&self.0.state);
544 let idle = state.idle.len();
545 let size = state.total;
546 Status {
547 size,
548 idle,
549 in_use: size.saturating_sub(idle),
550 max_size: self.0.config.max_size,
551 }
552 }
553
554 /// Close the pool: discard every idle resource and reject all future
555 /// checkouts with [`Error::Closed`].
556 ///
557 /// Resources currently checked out are unaffected and are simply dropped
558 /// (not returned to the idle set) when their guards fall. Closing is
559 /// idempotent. Idle resources are dropped outside the pool's lock, so a slow
560 /// resource destructor does not block other threads.
561 ///
562 /// # Examples
563 ///
564 /// ```
565 /// use pool_mod::{Error, Manager, Pool};
566 /// use std::convert::Infallible;
567 /// # struct M;
568 /// # impl Manager for M {
569 /// # type Resource = (); type Error = Infallible;
570 /// # fn create(&self) -> Result<(), Infallible> { Ok(()) }
571 /// # fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
572 /// # }
573 /// let pool = Pool::builder(M).max_size(2).min_idle(2).build().expect("valid");
574 /// pool.close();
575 /// assert!(pool.is_closed());
576 /// assert!(matches!(pool.get(), Err(Error::Closed)));
577 /// ```
578 pub fn close(&self) {
579 let mut state = lock(&self.0.state);
580 let drained = std::mem::take(&mut state.idle);
581 state.total = state.total.saturating_sub(drained.len());
582 state.closed = true;
583 let waiters = state.waiters;
584 drop(state);
585 self.0.wake_all(waiters); // wake blocked acquirers so they observe the close
586 self.0.shutdown.signal(); // stop the reaper; a closed pool needs no upkeep
587 drop(drained); // resource destructors run here, outside the lock
588 }
589
590 /// Report whether the pool has been [closed](Pool::close).
591 #[must_use]
592 pub fn is_closed(&self) -> bool {
593 lock(&self.0.state).closed
594 }
595}
596
597/// A fluent builder for a [`Pool`].
598///
599/// Created by [`Pool::builder`]. Each setter consumes and returns the builder, so
600/// calls chain; [`build`](Builder::build) validates the configuration and
601/// pre-creates the `min_idle` resources.
602///
603/// # Examples
604///
605/// ```
606/// use std::time::Duration;
607/// use pool_mod::{Manager, Pool};
608/// use std::convert::Infallible;
609/// # struct M;
610/// # impl Manager for M {
611/// # type Resource = u32; type Error = Infallible;
612/// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
613/// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
614/// # }
615/// let pool = Pool::builder(M)
616/// .max_size(32)
617/// .min_idle(4)
618/// .idle_timeout(Some(Duration::from_secs(600)))
619/// .max_lifetime(Some(Duration::from_secs(3600)))
620/// .build()
621/// .expect("configuration is valid");
622/// assert_eq!(pool.status().idle, 4);
623/// ```
624#[must_use = "a Builder does nothing until `.build()` is called"]
625pub struct Builder<M: Manager> {
626 manager: M,
627 config: PoolConfig,
628}
629
630impl<M: Manager> Builder<M> {
631 /// Create a builder for `manager` seeded with the default configuration.
632 pub fn new(manager: M) -> Self {
633 Builder {
634 manager,
635 config: PoolConfig::default(),
636 }
637 }
638
639 /// Set the maximum number of resources the pool may own at once.
640 pub fn max_size(mut self, max_size: usize) -> Self {
641 self.config.max_size = max_size;
642 self
643 }
644
645 /// Set how many resources to create up front and keep ready.
646 pub fn min_idle(mut self, min_idle: usize) -> Self {
647 self.config.min_idle = min_idle;
648 self
649 }
650
651 /// Set how long [`Pool::get`] waits when the pool is saturated. `None` waits
652 /// indefinitely.
653 pub fn create_timeout(mut self, timeout: Option<Duration>) -> Self {
654 self.config.create_timeout = timeout;
655 self
656 }
657
658 /// Set the idle-expiry window. `None` disables idle expiry.
659 pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
660 self.config.idle_timeout = timeout;
661 self
662 }
663
664 /// Set the maximum resource lifetime. `None` disables lifetime expiry.
665 pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
666 self.config.max_lifetime = lifetime;
667 self
668 }
669
670 /// Set how often a background thread prunes expired idle resources. `None`
671 /// (the default) runs no background thread, applying expiry lazily on borrow.
672 ///
673 /// Has no effect unless `idle_timeout` or `max_lifetime` is also set.
674 pub fn reap_interval(mut self, interval: Option<Duration>) -> Self {
675 self.config.reap_interval = interval;
676 self
677 }
678
679 /// Replace the entire configuration with `config`.
680 ///
681 /// Useful when the configuration is loaded from a file rather than assembled
682 /// setter by setter.
683 pub fn config(mut self, config: PoolConfig) -> Self {
684 self.config = config;
685 self
686 }
687
688 /// Validate the configuration, build the pool, and pre-create `min_idle`
689 /// resources.
690 ///
691 /// # Errors
692 ///
693 /// - [`Error::InvalidConfig`] if `max_size` is zero or `min_idle` exceeds
694 /// `max_size`.
695 /// - [`Error::Backend`] if creating one of the `min_idle` resources fails;
696 /// any already-created resources are dropped before returning.
697 ///
698 /// # Examples
699 ///
700 /// ```
701 /// use pool_mod::{Error, Manager, Pool};
702 /// use std::convert::Infallible;
703 /// # struct M;
704 /// # impl Manager for M {
705 /// # type Resource = (); type Error = Infallible;
706 /// # fn create(&self) -> Result<(), Infallible> { Ok(()) }
707 /// # fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
708 /// # }
709 /// let invalid = Pool::builder(M).max_size(0).build();
710 /// assert!(matches!(invalid, Err(Error::InvalidConfig(_))));
711 /// ```
712 pub fn build(self) -> Result<Pool<M>, Error<M::Error>> {
713 if self.config.max_size == 0 {
714 return Err(Error::InvalidConfig("max_size must be at least 1"));
715 }
716 if self.config.min_idle > self.config.max_size {
717 return Err(Error::InvalidConfig("min_idle must not exceed max_size"));
718 }
719
720 let pool = Pool(Arc::new(PoolInner {
721 manager: self.manager,
722 config: self.config,
723 state: Mutex::new(State {
724 idle: VecDeque::with_capacity(self.config.max_size),
725 total: 0,
726 waiters: 0,
727 closed: false,
728 }),
729 available: Condvar::new(),
730 shutdown: Arc::new(Shutdown::new()),
731 }));
732
733 for _ in 0..pool.0.config.min_idle {
734 match pool.0.manager.create() {
735 Ok(resource) => {
736 let now = Instant::now();
737 let mut state = lock(&pool.0.state);
738 state.idle.push_back(Idle {
739 resource,
740 created_at: now,
741 last_used: now,
742 });
743 state.total += 1;
744 }
745 Err(source) => return Err(Error::Backend(source)),
746 }
747 }
748
749 pool.spawn_reaper();
750
751 Ok(pool)
752 }
753}
754
755impl<M: Manager> Pool<M> {
756 /// Start the background reaper if `reap_interval` is configured.
757 ///
758 /// The reaper holds only a [`Weak`] handle, so it never keeps the pool alive;
759 /// it stops when every handle is dropped or the pool is closed. If the OS
760 /// refuses a new thread, the pool keeps working with expiry applied lazily on
761 /// checkout — the reaper is an optimization, not a correctness requirement.
762 fn spawn_reaper(&self) {
763 let Some(interval) = self.0.config.reap_interval else {
764 return;
765 };
766 let pool = Arc::downgrade(&self.0);
767 let shutdown = Arc::clone(&self.0.shutdown);
768 match thread::Builder::new()
769 .name("pool-mod-reaper".to_owned())
770 .spawn(move || reaper_loop(pool, shutdown, interval))
771 {
772 // Detach the handle; the reaper self-terminates via the shutdown
773 // signal and its `Weak` reference.
774 Ok(handle) => drop(handle),
775 // Spawn failed: fall back to lazy, checkout-time expiry.
776 Err(_) => self.0.shutdown.signal(),
777 }
778 }
779}
780
781#[cfg(test)]
782// Justification: in test code a failed setup or checkout has no meaningful
783// recovery — unwinding with a clear panic is the correct outcome. REPS permits
784// `unwrap`/`expect` in test modules for exactly this reason.
785#[allow(clippy::unwrap_used, clippy::expect_used)]
786mod tests {
787 use super::*;
788 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
789
790 #[derive(Debug, PartialEq, Eq)]
791 struct TestError(&'static str);
792
793 impl std::fmt::Display for TestError {
794 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
795 f.write_str(self.0)
796 }
797 }
798
799 impl std::error::Error for TestError {}
800
801 /// A manager whose behaviour is steerable through atomics so individual
802 /// lifecycle paths can be exercised deterministically.
803 struct Steerable {
804 created: AtomicUsize,
805 recycled: AtomicUsize,
806 validated: AtomicUsize,
807 create_fails: AtomicBool,
808 recycle_fails: AtomicBool,
809 valid: AtomicBool,
810 }
811
812 impl Steerable {
813 fn new() -> Self {
814 Steerable {
815 created: AtomicUsize::new(0),
816 recycled: AtomicUsize::new(0),
817 validated: AtomicUsize::new(0),
818 create_fails: AtomicBool::new(false),
819 recycle_fails: AtomicBool::new(false),
820 valid: AtomicBool::new(true),
821 }
822 }
823 }
824
825 impl Manager for Steerable {
826 type Resource = usize;
827 type Error = TestError;
828
829 fn create(&self) -> Result<usize, TestError> {
830 if self.create_fails.load(Ordering::SeqCst) {
831 return Err(TestError("create failed"));
832 }
833 Ok(self.created.fetch_add(1, Ordering::SeqCst))
834 }
835
836 fn recycle(&self, _resource: &mut usize) -> Result<(), TestError> {
837 let _ = self.recycled.fetch_add(1, Ordering::SeqCst);
838 if self.recycle_fails.load(Ordering::SeqCst) {
839 return Err(TestError("recycle failed"));
840 }
841 Ok(())
842 }
843
844 fn validate(&self, _resource: &mut usize) -> bool {
845 let _ = self.validated.fetch_add(1, Ordering::SeqCst);
846 self.valid.load(Ordering::SeqCst)
847 }
848 }
849
850 fn pool(builder: impl FnOnce(Builder<Steerable>) -> Builder<Steerable>) -> Pool<Steerable> {
851 builder(Pool::builder(Steerable::new())).build().unwrap()
852 }
853
854 #[test]
855 fn test_build_min_idle_precreates_resources() {
856 let p = pool(|b| b.max_size(4).min_idle(2));
857 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
858 let status = p.status();
859 assert_eq!(status.idle, 2);
860 assert_eq!(status.size, 2);
861 assert_eq!(status.in_use, 0);
862 }
863
864 #[test]
865 fn test_get_then_drop_reuses_same_resource() {
866 let p = pool(|b| b.max_size(4));
867 {
868 let first = p.get().unwrap();
869 assert_eq!(*first, 0);
870 }
871 let second = p.get().unwrap();
872 assert_eq!(*second, 0); // same resource id, reused
873 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 1);
874 assert_eq!(p.0.manager.recycled.load(Ordering::SeqCst), 1);
875 }
876
877 #[test]
878 fn test_in_use_tracks_outstanding_guards() {
879 let p = pool(|b| b.max_size(2));
880 let a = p.get().unwrap();
881 let b = p.get().unwrap();
882 assert_eq!(p.status().in_use, 2);
883 assert_eq!(p.status().idle, 0);
884 drop(a);
885 drop(b);
886 assert_eq!(p.status().in_use, 0);
887 assert_eq!(p.status().idle, 2);
888 }
889
890 #[test]
891 fn test_saturated_pool_times_out() {
892 let p = pool(|b| b.max_size(1));
893 let _held = p.get().unwrap();
894 let result = p.get_timeout(Duration::ZERO);
895 assert!(matches!(result, Err(Error::Timeout)));
896 }
897
898 #[test]
899 fn test_invalid_resource_is_discarded_and_replaced() {
900 let p = pool(|b| b.max_size(4).min_idle(1));
901 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 1);
902 p.0.manager.valid.store(false, Ordering::SeqCst);
903
904 // The single idle resource fails validation, so it is dropped and a fresh
905 // one is created. (The fresh resource is not itself re-validated.)
906 let resource = p.get().unwrap();
907 assert_eq!(*resource, 1);
908 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
909 assert!(p.0.manager.validated.load(Ordering::SeqCst) >= 1);
910 }
911
912 #[test]
913 fn test_max_lifetime_forces_replacement() {
914 let p = pool(|b| b.max_size(4).min_idle(1).max_lifetime(Some(Duration::ZERO)));
915 // Zero lifetime means any idle resource is always too old on checkout.
916 let resource = p.get().unwrap();
917 assert_eq!(*resource, 1);
918 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
919 }
920
921 #[test]
922 fn test_idle_timeout_forces_replacement() {
923 let p = pool(|b| b.max_size(4).min_idle(1).idle_timeout(Some(Duration::ZERO)));
924 let resource = p.get().unwrap();
925 assert_eq!(*resource, 1);
926 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
927 }
928
929 #[test]
930 fn test_recycle_failure_drops_resource() {
931 let p = pool(|b| b.max_size(2));
932 p.0.manager.recycle_fails.store(true, Ordering::SeqCst);
933 {
934 let _resource = p.get().unwrap();
935 assert_eq!(p.status().size, 1);
936 }
937 // Recycle failed on return, so the resource was discarded, not pooled.
938 assert_eq!(p.status().size, 0);
939 assert_eq!(p.status().idle, 0);
940 }
941
942 #[test]
943 fn test_create_failure_surfaces_and_frees_slot() {
944 let p = pool(|b| b.max_size(2));
945 p.0.manager.create_fails.store(true, Ordering::SeqCst);
946 let result = p.get();
947 assert!(matches!(
948 result,
949 Err(Error::Backend(TestError("create failed")))
950 ));
951 // The reserved slot was released, so the pool did not shrink.
952 assert_eq!(p.status().size, 0);
953 }
954
955 #[test]
956 fn test_closed_pool_rejects_checkout() {
957 let p = pool(|b| b.max_size(2).min_idle(1));
958 p.close();
959 assert!(p.is_closed());
960 assert!(matches!(p.get(), Err(Error::Closed)));
961 assert_eq!(p.status().idle, 0); // idle resources were dropped on close
962 }
963
964 #[test]
965 fn test_close_is_idempotent() {
966 let p = pool(|b| b.max_size(2).min_idle(2));
967 p.close();
968 p.close();
969 assert!(p.is_closed());
970 }
971
972 #[test]
973 fn test_build_rejects_zero_max_size() {
974 let result = Pool::builder(Steerable::new()).max_size(0).build();
975 assert!(matches!(result, Err(Error::InvalidConfig(_))));
976 }
977
978 #[test]
979 fn test_build_rejects_min_idle_above_max_size() {
980 let result = Pool::builder(Steerable::new())
981 .max_size(2)
982 .min_idle(3)
983 .build();
984 assert!(matches!(result, Err(Error::InvalidConfig(_))));
985 }
986
987 #[test]
988 fn test_try_get_does_not_block_when_saturated() {
989 let p = pool(|b| b.max_size(1));
990 let _held = p.try_get().unwrap();
991 assert!(matches!(p.try_get(), Err(Error::Timeout)));
992 }
993
994 #[test]
995 fn test_reap_prunes_time_expired_idle() {
996 // A zero idle_timeout makes every idle resource immediately expired, so
997 // `reap` is deterministic without a background thread or sleeps.
998 let p = pool(|b| b.max_size(4).min_idle(2).idle_timeout(Some(Duration::ZERO)));
999 assert_eq!(p.status().idle, 2);
1000 p.0.reap();
1001 assert_eq!(p.status().idle, 0);
1002 assert_eq!(p.status().size, 0);
1003 }
1004
1005 #[test]
1006 fn test_reap_is_noop_without_expiry_policy() {
1007 let p = pool(|b| b.max_size(4).min_idle(2));
1008 p.0.reap();
1009 assert_eq!(p.status().idle, 2); // no idle_timeout/max_lifetime: nothing to prune
1010 }
1011
1012 #[test]
1013 fn test_clone_shares_one_pool() {
1014 let p = pool(|b| b.max_size(1));
1015 let clone = p.clone();
1016 let _held = p.get().unwrap();
1017 // The clone sees the same exhausted pool.
1018 assert!(matches!(
1019 clone.get_timeout(Duration::ZERO),
1020 Err(Error::Timeout)
1021 ));
1022 }
1023}