pool_mod/pool.rs
1//! The pool itself: [`Pool`] and its [`Builder`].
2
3use std::collections::VecDeque;
4use std::sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, Weak};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crate::config::PoolConfig;
9use crate::error::Error;
10use crate::manager::Manager;
11use crate::object::Pooled;
12use crate::status::Status;
13
14/// Acquire a mutex guard, recovering the data even if a previous holder panicked.
15///
16/// The pool only mutates plain counters and a queue while the lock is held, never
17/// running user code, so the protected state is always consistent at an unlock
18/// point. Honouring poison would convert an unrelated panic elsewhere into a
19/// permanent, unrecoverable pool outage, which is the worse failure mode.
20#[inline]
21pub(crate) fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
22 mutex.lock().unwrap_or_else(PoisonError::into_inner)
23}
24
25/// A resource resting in the idle set, tagged with the timestamps the pool uses
26/// to enforce `idle_timeout` and `max_lifetime`.
27pub(crate) struct Idle<R> {
28 pub(crate) resource: R,
29 pub(crate) created_at: Instant,
30 pub(crate) last_used: Instant,
31}
32
33/// The mutable inner state guarded by the pool's mutex.
34struct State<R> {
35 idle: VecDeque<Idle<R>>,
36 /// Resources the pool currently owns: idle, checked out, or mid-creation.
37 total: usize,
38 closed: bool,
39}
40
41/// The decision reached while holding the lock, carried out once it is released.
42///
43/// Only outcomes that require running user code — and therefore must not hold the
44/// lock — escape the locked region; waiting and immediate errors are handled in
45/// place.
46enum Action<R> {
47 Reuse(Idle<R>),
48 Create,
49}
50
51/// The stop signal shared between the pool and its background reaper thread.
52///
53/// Held by both sides through an [`Arc`] so it outlives the [`PoolInner`] it
54/// guards: when the last pool handle drops, `PoolInner`'s destructor sets the
55/// flag and wakes the reaper, which then exits.
56struct Shutdown {
57 stop: Mutex<bool>,
58 wake: Condvar,
59}
60
61impl Shutdown {
62 fn new() -> Self {
63 Shutdown {
64 stop: Mutex::new(false),
65 wake: Condvar::new(),
66 }
67 }
68
69 /// Signal the reaper to stop and wake it immediately.
70 fn signal(&self) {
71 let mut stop = lock(&self.stop);
72 *stop = true;
73 drop(stop);
74 self.wake.notify_all();
75 }
76}
77
78/// Shared pool state behind an [`Arc`]. Every [`Pool`] handle and every live
79/// [`Pooled`] guard holds one of these.
80pub(crate) struct PoolInner<M: Manager> {
81 pub(crate) manager: M,
82 config: PoolConfig,
83 state: Mutex<State<M::Resource>>,
84 available: Condvar,
85 shutdown: Arc<Shutdown>,
86}
87
88impl<M: Manager> Drop for PoolInner<M> {
89 fn drop(&mut self) {
90 // The last handle is gone. Stop the reaper (if one is running) so it does
91 // not outlive the pool it maintains.
92 self.shutdown.signal();
93 }
94}
95
96impl<M: Manager> PoolInner<M> {
97 /// Take a resource out of the pool, blocking until one is free, a slot opens
98 /// for a fresh one, or the deadline passes.
99 fn acquire(
100 &self,
101 deadline: Option<Instant>,
102 ) -> Result<(M::Resource, Instant), Error<M::Error>> {
103 loop {
104 let action = {
105 let mut state = lock(&self.state);
106 loop {
107 if state.closed {
108 return Err(Error::Closed);
109 }
110 if let Some(idle) = state.idle.pop_front() {
111 break Action::Reuse(idle);
112 }
113 if state.total < self.config.max_size {
114 state.total += 1;
115 break Action::Create;
116 }
117 // Saturated: wait for a check-in or a close, then re-evaluate.
118 match deadline {
119 None => {
120 state = self
121 .available
122 .wait(state)
123 .unwrap_or_else(PoisonError::into_inner);
124 }
125 Some(dl) => {
126 let now = Instant::now();
127 if now >= dl {
128 return Err(Error::Timeout);
129 }
130 let (guard, _) = self
131 .available
132 .wait_timeout(state, dl - now)
133 .unwrap_or_else(PoisonError::into_inner);
134 state = guard;
135 }
136 }
137 }
138 };
139
140 match action {
141 Action::Reuse(idle) => {
142 if let Some(prepared) = self.prepare(idle) {
143 return Ok(prepared);
144 }
145 // The idle resource was stale or invalid and has been dropped;
146 // release its slot and try again from the top.
147 self.release_slot();
148 }
149 Action::Create => match self.manager.create() {
150 Ok(resource) => return Ok((resource, Instant::now())),
151 Err(source) => {
152 self.release_slot();
153 return Err(Error::Backend(source));
154 }
155 },
156 }
157 }
158 }
159
160 /// Apply lifetime, idle-timeout, and validation checks to an idle resource.
161 ///
162 /// Returns the resource and its original creation time on success; returns
163 /// `None` (dropping the resource) when it is too old, has sat idle too long,
164 /// or fails validation.
165 fn prepare(&self, mut idle: Idle<M::Resource>) -> Option<(M::Resource, Instant)> {
166 if self.is_time_expired(&idle, Instant::now()) {
167 return None;
168 }
169 if !self.manager.validate(&mut idle.resource) {
170 return None;
171 }
172 Some((idle.resource, idle.created_at))
173 }
174
175 /// Whether an idle resource has outlived `max_lifetime` or `idle_timeout`.
176 ///
177 /// This is the time-based half of expiry, shared by checkout
178 /// ([`prepare`](Self::prepare)) and the background [`reap`](Self::reap); it
179 /// deliberately does not call [`Manager::validate`], which is a checkout-time
180 /// concern.
181 fn is_time_expired(&self, idle: &Idle<M::Resource>, now: Instant) -> bool {
182 if let Some(max_lifetime) = self.config.max_lifetime {
183 if now.saturating_duration_since(idle.created_at) >= max_lifetime {
184 return true;
185 }
186 }
187 if let Some(idle_timeout) = self.config.idle_timeout {
188 if now.saturating_duration_since(idle.last_used) >= idle_timeout {
189 return true;
190 }
191 }
192 false
193 }
194
195 /// Prune idle resources that have outlived their `idle_timeout` or
196 /// `max_lifetime`. Called on each tick of the background reaper.
197 ///
198 /// Expired resources are removed under the lock but dropped outside it, so a
199 /// slow destructor does not stall checkouts. The reaper never creates
200 /// resources; pruning shrinks the idle set, and on-demand growth refills it.
201 fn reap(&self) {
202 if self.config.idle_timeout.is_none() && self.config.max_lifetime.is_none() {
203 return;
204 }
205 let now = Instant::now();
206 let mut expired = Vec::new();
207 {
208 let mut state = lock(&self.state);
209 if state.closed {
210 return;
211 }
212 let mut kept = VecDeque::with_capacity(state.idle.len());
213 while let Some(idle) = state.idle.pop_front() {
214 if self.is_time_expired(&idle, now) {
215 expired.push(idle.resource);
216 } else {
217 kept.push_back(idle);
218 }
219 }
220 state.total = state.total.saturating_sub(expired.len());
221 state.idle = kept;
222 }
223 let freed = expired.len();
224 drop(expired); // destructors run here, outside the lock
225 if freed > 0 {
226 // Freed slots may unblock threads waiting to create a resource.
227 self.available.notify_all();
228 }
229 }
230
231 /// Give back a reserved slot and wake one waiter so it can claim it.
232 fn release_slot(&self) {
233 let mut state = lock(&self.state);
234 state.total = state.total.saturating_sub(1);
235 drop(state);
236 self.available.notify_one();
237 }
238
239 /// Return a borrowed resource to the pool. Called from [`Pooled`]'s `Drop`.
240 pub(crate) fn checkin(&self, mut resource: M::Resource, created_at: Instant) {
241 let recycled = self.manager.recycle(&mut resource);
242 let mut state = lock(&self.state);
243 if state.closed || recycled.is_err() {
244 state.total = state.total.saturating_sub(1);
245 drop(state);
246 self.available.notify_one();
247 // `resource` is dropped here, outside the lock.
248 } else {
249 let last_used = Instant::now();
250 state.idle.push_back(Idle {
251 resource,
252 created_at,
253 last_used,
254 });
255 drop(state);
256 self.available.notify_one();
257 }
258 }
259}
260
261/// The body of the background reaper thread.
262///
263/// Sleeps for `interval` between sweeps, waking early when the shutdown signal
264/// fires. It holds only a [`Weak`] reference to the pool, so it never keeps the
265/// pool alive; once every handle is dropped (`upgrade` returns `None`) or the
266/// stop flag is set, it returns and the thread ends.
267fn reaper_loop<M: Manager>(pool: Weak<PoolInner<M>>, shutdown: Arc<Shutdown>, interval: Duration) {
268 loop {
269 {
270 let stop = lock(&shutdown.stop);
271 if *stop {
272 return;
273 }
274 let (stop, _timed_out) = shutdown
275 .wake
276 .wait_timeout(stop, interval)
277 .unwrap_or_else(PoisonError::into_inner);
278 if *stop {
279 return;
280 }
281 }
282 match pool.upgrade() {
283 Some(inner) => inner.reap(),
284 None => return,
285 }
286 }
287}
288
289/// A thread-safe pool of reusable resources.
290///
291/// A `Pool<M>` lends out resources built by a [`Manager`], reclaiming and
292/// recycling each one when its [`Pooled`] guard is dropped. It is cheap to clone
293/// — every clone is a handle onto the same shared pool — so share it across
294/// threads by cloning rather than wrapping it in another `Arc`.
295///
296/// The pool is runtime-agnostic and carries no async dependency.
297/// [`get`](Pool::get) blocks the calling thread until a resource is available; in
298/// an async context, acquire on a blocking-friendly executor thread (for example
299/// `tokio::task::spawn_blocking`). The returned guard is `Send`, so it can be
300/// held across `.await` points.
301///
302/// # Examples
303///
304/// ```
305/// use pool_mod::{Manager, Pool};
306/// use std::convert::Infallible;
307///
308/// struct Connections;
309/// impl Manager for Connections {
310/// type Resource = String;
311/// type Error = Infallible;
312/// fn create(&self) -> Result<String, Infallible> { Ok(String::new()) }
313/// fn recycle(&self, c: &mut String) -> Result<(), Infallible> { c.clear(); Ok(()) }
314/// }
315///
316/// let pool = Pool::builder(Connections).max_size(4).build()
317/// .expect("configuration is valid");
318///
319/// let mut conn = pool.get().expect("a connection is available");
320/// conn.push_str("SELECT 1");
321/// assert_eq!(pool.status().in_use, 1);
322/// drop(conn);
323/// assert_eq!(pool.status().in_use, 0);
324/// ```
325pub struct Pool<M: Manager>(Arc<PoolInner<M>>);
326
327impl<M: Manager> Clone for Pool<M> {
328 fn clone(&self) -> Self {
329 Pool(Arc::clone(&self.0))
330 }
331}
332
333impl<M: Manager> Pool<M> {
334 /// Start building a pool for `manager` with the default configuration.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// use pool_mod::{Manager, Pool};
340 /// use std::convert::Infallible;
341 /// # struct M;
342 /// # impl Manager for M {
343 /// # type Resource = u32; type Error = Infallible;
344 /// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
345 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
346 /// # }
347 /// let pool = Pool::builder(M).max_size(8).min_idle(2).build()
348 /// .expect("configuration is valid");
349 /// assert_eq!(pool.status().max_size, 8);
350 /// ```
351 pub fn builder(manager: M) -> Builder<M> {
352 Builder::new(manager)
353 }
354
355 /// Build a pool for `manager` with the [default configuration](PoolConfig::default).
356 ///
357 /// A shortcut for `Pool::builder(manager).build()`.
358 ///
359 /// # Errors
360 ///
361 /// Returns [`Error::Backend`] if pre-creating the initial resources fails.
362 /// (With the default `min_idle` of 0, no resources are created up front, so
363 /// the default-configured pool only fails to build if you have customized the
364 /// configuration through [`Pool::builder`] instead.)
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// use pool_mod::{Manager, Pool};
370 /// use std::convert::Infallible;
371 /// # struct M;
372 /// # impl Manager for M {
373 /// # type Resource = u32; type Error = Infallible;
374 /// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
375 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
376 /// # }
377 /// let pool = Pool::new(M).expect("configuration is valid");
378 /// assert_eq!(pool.status().max_size, 10); // the default
379 /// ```
380 pub fn new(manager: M) -> Result<Self, Error<M::Error>> {
381 Builder::new(manager).build()
382 }
383
384 /// Borrow a resource, waiting up to the configured
385 /// [`create_timeout`](PoolConfig::create_timeout) if the pool is saturated.
386 ///
387 /// Reuses an idle resource when one is available (after validation), grows the
388 /// pool toward `max_size` when it is not, and otherwise blocks until a
389 /// resource is returned or the timeout elapses.
390 ///
391 /// # Errors
392 ///
393 /// - [`Error::Backend`] if the manager fails to create a resource.
394 /// - [`Error::Timeout`] if the pool stays saturated past `create_timeout`.
395 /// - [`Error::Closed`] if the pool has been closed.
396 ///
397 /// # Examples
398 ///
399 /// ```
400 /// use pool_mod::{Manager, Pool};
401 /// use std::convert::Infallible;
402 /// # struct M;
403 /// # impl Manager for M {
404 /// # type Resource = u32; type Error = Infallible;
405 /// # fn create(&self) -> Result<u32, Infallible> { Ok(7) }
406 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
407 /// # }
408 /// let pool = Pool::builder(M).max_size(2).build().expect("valid");
409 /// let resource = pool.get().expect("available");
410 /// assert_eq!(*resource, 7);
411 /// ```
412 pub fn get(&self) -> Result<Pooled<M>, Error<M::Error>> {
413 let deadline = self
414 .0
415 .config
416 .create_timeout
417 .map(|timeout| Instant::now() + timeout);
418 self.acquire(deadline)
419 }
420
421 /// Borrow a resource, waiting at most `timeout` regardless of the configured
422 /// [`create_timeout`](PoolConfig::create_timeout).
423 ///
424 /// A `timeout` of [`Duration::ZERO`] makes this a non-blocking try: it returns
425 /// [`Error::Timeout`] at once if no resource can be handed out immediately.
426 ///
427 /// # Errors
428 ///
429 /// - [`Error::Backend`] if the manager fails to create a resource.
430 /// - [`Error::Timeout`] if no resource becomes available within `timeout`.
431 /// - [`Error::Closed`] if the pool has been closed.
432 ///
433 /// # Examples
434 ///
435 /// ```
436 /// use std::time::Duration;
437 /// use pool_mod::{Error, Manager, Pool};
438 /// use std::convert::Infallible;
439 /// # struct M;
440 /// # impl Manager for M {
441 /// # type Resource = u32; type Error = Infallible;
442 /// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
443 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
444 /// # }
445 /// let pool = Pool::builder(M).max_size(1).build().expect("valid");
446 /// let held = pool.get().expect("first checkout");
447 /// // The single slot is taken, so an immediate retry times out.
448 /// assert!(matches!(pool.get_timeout(Duration::ZERO), Err(Error::Timeout)));
449 /// ```
450 pub fn get_timeout(&self, timeout: Duration) -> Result<Pooled<M>, Error<M::Error>> {
451 self.acquire(Some(Instant::now() + timeout))
452 }
453
454 /// Borrow a resource without ever blocking.
455 ///
456 /// Returns a resource if one can be handed out immediately — an idle resource
457 /// is ready, or the pool has room to create one — and otherwise returns
458 /// [`Error::Timeout`] at once. Equivalent to
459 /// [`get_timeout(Duration::ZERO)`](Pool::get_timeout).
460 ///
461 /// # Errors
462 ///
463 /// - [`Error::Backend`] if the manager fails to create a resource.
464 /// - [`Error::Timeout`] if no resource is immediately available.
465 /// - [`Error::Closed`] if the pool has been closed.
466 ///
467 /// # Examples
468 ///
469 /// ```
470 /// use pool_mod::{Error, Manager, Pool};
471 /// use std::convert::Infallible;
472 /// # struct M;
473 /// # impl Manager for M {
474 /// # type Resource = u32; type Error = Infallible;
475 /// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
476 /// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
477 /// # }
478 /// let pool = Pool::builder(M).max_size(1).build().expect("valid");
479 /// let first = pool.try_get().expect("room to create one");
480 /// // The only slot is taken, so the next try fails immediately.
481 /// assert!(matches!(pool.try_get(), Err(Error::Timeout)));
482 /// ```
483 pub fn try_get(&self) -> Result<Pooled<M>, Error<M::Error>> {
484 self.acquire(Some(Instant::now()))
485 }
486
487 fn acquire(&self, deadline: Option<Instant>) -> Result<Pooled<M>, Error<M::Error>> {
488 let (resource, created_at) = self.0.acquire(deadline)?;
489 Ok(Pooled::new(Arc::clone(&self.0), resource, created_at))
490 }
491
492 /// Take a snapshot of the pool's current occupancy.
493 ///
494 /// # Examples
495 ///
496 /// ```
497 /// use pool_mod::{Manager, Pool};
498 /// use std::convert::Infallible;
499 /// # struct M;
500 /// # impl Manager for M {
501 /// # type Resource = (); type Error = Infallible;
502 /// # fn create(&self) -> Result<(), Infallible> { Ok(()) }
503 /// # fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
504 /// # }
505 /// let pool = Pool::builder(M).max_size(4).min_idle(1).build().expect("valid");
506 /// let status = pool.status();
507 /// assert_eq!(status.idle, 1);
508 /// assert_eq!(status.max_size, 4);
509 /// ```
510 pub fn status(&self) -> Status {
511 let state = lock(&self.0.state);
512 let idle = state.idle.len();
513 let size = state.total;
514 Status {
515 size,
516 idle,
517 in_use: size.saturating_sub(idle),
518 max_size: self.0.config.max_size,
519 }
520 }
521
522 /// Close the pool: discard every idle resource and reject all future
523 /// checkouts with [`Error::Closed`].
524 ///
525 /// Resources currently checked out are unaffected and are simply dropped
526 /// (not returned to the idle set) when their guards fall. Closing is
527 /// idempotent. Idle resources are dropped outside the pool's lock, so a slow
528 /// resource destructor does not block other threads.
529 ///
530 /// # Examples
531 ///
532 /// ```
533 /// use pool_mod::{Error, Manager, Pool};
534 /// use std::convert::Infallible;
535 /// # struct M;
536 /// # impl Manager for M {
537 /// # type Resource = (); type Error = Infallible;
538 /// # fn create(&self) -> Result<(), Infallible> { Ok(()) }
539 /// # fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
540 /// # }
541 /// let pool = Pool::builder(M).max_size(2).min_idle(2).build().expect("valid");
542 /// pool.close();
543 /// assert!(pool.is_closed());
544 /// assert!(matches!(pool.get(), Err(Error::Closed)));
545 /// ```
546 pub fn close(&self) {
547 let mut state = lock(&self.0.state);
548 let drained = std::mem::take(&mut state.idle);
549 state.total = state.total.saturating_sub(drained.len());
550 state.closed = true;
551 drop(state);
552 self.0.available.notify_all();
553 self.0.shutdown.signal(); // stop the reaper; a closed pool needs no upkeep
554 drop(drained); // resource destructors run here, outside the lock
555 }
556
557 /// Report whether the pool has been [closed](Pool::close).
558 #[must_use]
559 pub fn is_closed(&self) -> bool {
560 lock(&self.0.state).closed
561 }
562}
563
564/// A fluent builder for a [`Pool`].
565///
566/// Created by [`Pool::builder`]. Each setter consumes and returns the builder, so
567/// calls chain; [`build`](Builder::build) validates the configuration and
568/// pre-creates the `min_idle` resources.
569///
570/// # Examples
571///
572/// ```
573/// use std::time::Duration;
574/// use pool_mod::{Manager, Pool};
575/// use std::convert::Infallible;
576/// # struct M;
577/// # impl Manager for M {
578/// # type Resource = u32; type Error = Infallible;
579/// # fn create(&self) -> Result<u32, Infallible> { Ok(0) }
580/// # fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
581/// # }
582/// let pool = Pool::builder(M)
583/// .max_size(32)
584/// .min_idle(4)
585/// .idle_timeout(Some(Duration::from_secs(600)))
586/// .max_lifetime(Some(Duration::from_secs(3600)))
587/// .build()
588/// .expect("configuration is valid");
589/// assert_eq!(pool.status().idle, 4);
590/// ```
591#[must_use = "a Builder does nothing until `.build()` is called"]
592pub struct Builder<M: Manager> {
593 manager: M,
594 config: PoolConfig,
595}
596
597impl<M: Manager> Builder<M> {
598 /// Create a builder for `manager` seeded with the default configuration.
599 pub fn new(manager: M) -> Self {
600 Builder {
601 manager,
602 config: PoolConfig::default(),
603 }
604 }
605
606 /// Set the maximum number of resources the pool may own at once.
607 pub fn max_size(mut self, max_size: usize) -> Self {
608 self.config.max_size = max_size;
609 self
610 }
611
612 /// Set how many resources to create up front and keep ready.
613 pub fn min_idle(mut self, min_idle: usize) -> Self {
614 self.config.min_idle = min_idle;
615 self
616 }
617
618 /// Set how long [`Pool::get`] waits when the pool is saturated. `None` waits
619 /// indefinitely.
620 pub fn create_timeout(mut self, timeout: Option<Duration>) -> Self {
621 self.config.create_timeout = timeout;
622 self
623 }
624
625 /// Set the idle-expiry window. `None` disables idle expiry.
626 pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
627 self.config.idle_timeout = timeout;
628 self
629 }
630
631 /// Set the maximum resource lifetime. `None` disables lifetime expiry.
632 pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
633 self.config.max_lifetime = lifetime;
634 self
635 }
636
637 /// Set how often a background thread prunes expired idle resources. `None`
638 /// (the default) runs no background thread, applying expiry lazily on borrow.
639 ///
640 /// Has no effect unless `idle_timeout` or `max_lifetime` is also set.
641 pub fn reap_interval(mut self, interval: Option<Duration>) -> Self {
642 self.config.reap_interval = interval;
643 self
644 }
645
646 /// Replace the entire configuration with `config`.
647 ///
648 /// Useful when the configuration is loaded from a file rather than assembled
649 /// setter by setter.
650 pub fn config(mut self, config: PoolConfig) -> Self {
651 self.config = config;
652 self
653 }
654
655 /// Validate the configuration, build the pool, and pre-create `min_idle`
656 /// resources.
657 ///
658 /// # Errors
659 ///
660 /// - [`Error::InvalidConfig`] if `max_size` is zero or `min_idle` exceeds
661 /// `max_size`.
662 /// - [`Error::Backend`] if creating one of the `min_idle` resources fails;
663 /// any already-created resources are dropped before returning.
664 ///
665 /// # Examples
666 ///
667 /// ```
668 /// use pool_mod::{Error, Manager, Pool};
669 /// use std::convert::Infallible;
670 /// # struct M;
671 /// # impl Manager for M {
672 /// # type Resource = (); type Error = Infallible;
673 /// # fn create(&self) -> Result<(), Infallible> { Ok(()) }
674 /// # fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
675 /// # }
676 /// let invalid = Pool::builder(M).max_size(0).build();
677 /// assert!(matches!(invalid, Err(Error::InvalidConfig(_))));
678 /// ```
679 pub fn build(self) -> Result<Pool<M>, Error<M::Error>> {
680 if self.config.max_size == 0 {
681 return Err(Error::InvalidConfig("max_size must be at least 1"));
682 }
683 if self.config.min_idle > self.config.max_size {
684 return Err(Error::InvalidConfig("min_idle must not exceed max_size"));
685 }
686
687 let pool = Pool(Arc::new(PoolInner {
688 manager: self.manager,
689 config: self.config,
690 state: Mutex::new(State {
691 idle: VecDeque::with_capacity(self.config.max_size),
692 total: 0,
693 closed: false,
694 }),
695 available: Condvar::new(),
696 shutdown: Arc::new(Shutdown::new()),
697 }));
698
699 for _ in 0..pool.0.config.min_idle {
700 match pool.0.manager.create() {
701 Ok(resource) => {
702 let now = Instant::now();
703 let mut state = lock(&pool.0.state);
704 state.idle.push_back(Idle {
705 resource,
706 created_at: now,
707 last_used: now,
708 });
709 state.total += 1;
710 }
711 Err(source) => return Err(Error::Backend(source)),
712 }
713 }
714
715 pool.spawn_reaper();
716
717 Ok(pool)
718 }
719}
720
721impl<M: Manager> Pool<M> {
722 /// Start the background reaper if `reap_interval` is configured.
723 ///
724 /// The reaper holds only a [`Weak`] handle, so it never keeps the pool alive;
725 /// it stops when every handle is dropped or the pool is closed. If the OS
726 /// refuses a new thread, the pool keeps working with expiry applied lazily on
727 /// checkout — the reaper is an optimization, not a correctness requirement.
728 fn spawn_reaper(&self) {
729 let Some(interval) = self.0.config.reap_interval else {
730 return;
731 };
732 let pool = Arc::downgrade(&self.0);
733 let shutdown = Arc::clone(&self.0.shutdown);
734 match thread::Builder::new()
735 .name("pool-mod-reaper".to_owned())
736 .spawn(move || reaper_loop(pool, shutdown, interval))
737 {
738 // Detach the handle; the reaper self-terminates via the shutdown
739 // signal and its `Weak` reference.
740 Ok(handle) => drop(handle),
741 // Spawn failed: fall back to lazy, checkout-time expiry.
742 Err(_) => self.0.shutdown.signal(),
743 }
744 }
745}
746
747#[cfg(test)]
748// Justification: in test code a failed setup or checkout has no meaningful
749// recovery — unwinding with a clear panic is the correct outcome. REPS permits
750// `unwrap`/`expect` in test modules for exactly this reason.
751#[allow(clippy::unwrap_used, clippy::expect_used)]
752mod tests {
753 use super::*;
754 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
755
756 #[derive(Debug, PartialEq, Eq)]
757 struct TestError(&'static str);
758
759 impl std::fmt::Display for TestError {
760 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
761 f.write_str(self.0)
762 }
763 }
764
765 impl std::error::Error for TestError {}
766
767 /// A manager whose behaviour is steerable through atomics so individual
768 /// lifecycle paths can be exercised deterministically.
769 struct Steerable {
770 created: AtomicUsize,
771 recycled: AtomicUsize,
772 validated: AtomicUsize,
773 create_fails: AtomicBool,
774 recycle_fails: AtomicBool,
775 valid: AtomicBool,
776 }
777
778 impl Steerable {
779 fn new() -> Self {
780 Steerable {
781 created: AtomicUsize::new(0),
782 recycled: AtomicUsize::new(0),
783 validated: AtomicUsize::new(0),
784 create_fails: AtomicBool::new(false),
785 recycle_fails: AtomicBool::new(false),
786 valid: AtomicBool::new(true),
787 }
788 }
789 }
790
791 impl Manager for Steerable {
792 type Resource = usize;
793 type Error = TestError;
794
795 fn create(&self) -> Result<usize, TestError> {
796 if self.create_fails.load(Ordering::SeqCst) {
797 return Err(TestError("create failed"));
798 }
799 Ok(self.created.fetch_add(1, Ordering::SeqCst))
800 }
801
802 fn recycle(&self, _resource: &mut usize) -> Result<(), TestError> {
803 let _ = self.recycled.fetch_add(1, Ordering::SeqCst);
804 if self.recycle_fails.load(Ordering::SeqCst) {
805 return Err(TestError("recycle failed"));
806 }
807 Ok(())
808 }
809
810 fn validate(&self, _resource: &mut usize) -> bool {
811 let _ = self.validated.fetch_add(1, Ordering::SeqCst);
812 self.valid.load(Ordering::SeqCst)
813 }
814 }
815
816 fn pool(builder: impl FnOnce(Builder<Steerable>) -> Builder<Steerable>) -> Pool<Steerable> {
817 builder(Pool::builder(Steerable::new())).build().unwrap()
818 }
819
820 #[test]
821 fn test_build_min_idle_precreates_resources() {
822 let p = pool(|b| b.max_size(4).min_idle(2));
823 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
824 let status = p.status();
825 assert_eq!(status.idle, 2);
826 assert_eq!(status.size, 2);
827 assert_eq!(status.in_use, 0);
828 }
829
830 #[test]
831 fn test_get_then_drop_reuses_same_resource() {
832 let p = pool(|b| b.max_size(4));
833 {
834 let first = p.get().unwrap();
835 assert_eq!(*first, 0);
836 }
837 let second = p.get().unwrap();
838 assert_eq!(*second, 0); // same resource id, reused
839 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 1);
840 assert_eq!(p.0.manager.recycled.load(Ordering::SeqCst), 1);
841 }
842
843 #[test]
844 fn test_in_use_tracks_outstanding_guards() {
845 let p = pool(|b| b.max_size(2));
846 let a = p.get().unwrap();
847 let b = p.get().unwrap();
848 assert_eq!(p.status().in_use, 2);
849 assert_eq!(p.status().idle, 0);
850 drop(a);
851 drop(b);
852 assert_eq!(p.status().in_use, 0);
853 assert_eq!(p.status().idle, 2);
854 }
855
856 #[test]
857 fn test_saturated_pool_times_out() {
858 let p = pool(|b| b.max_size(1));
859 let _held = p.get().unwrap();
860 let result = p.get_timeout(Duration::ZERO);
861 assert!(matches!(result, Err(Error::Timeout)));
862 }
863
864 #[test]
865 fn test_invalid_resource_is_discarded_and_replaced() {
866 let p = pool(|b| b.max_size(4).min_idle(1));
867 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 1);
868 p.0.manager.valid.store(false, Ordering::SeqCst);
869
870 // The single idle resource fails validation, so it is dropped and a fresh
871 // one is created. (The fresh resource is not itself re-validated.)
872 let resource = p.get().unwrap();
873 assert_eq!(*resource, 1);
874 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
875 assert!(p.0.manager.validated.load(Ordering::SeqCst) >= 1);
876 }
877
878 #[test]
879 fn test_max_lifetime_forces_replacement() {
880 let p = pool(|b| b.max_size(4).min_idle(1).max_lifetime(Some(Duration::ZERO)));
881 // Zero lifetime means any idle resource is always too old on checkout.
882 let resource = p.get().unwrap();
883 assert_eq!(*resource, 1);
884 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
885 }
886
887 #[test]
888 fn test_idle_timeout_forces_replacement() {
889 let p = pool(|b| b.max_size(4).min_idle(1).idle_timeout(Some(Duration::ZERO)));
890 let resource = p.get().unwrap();
891 assert_eq!(*resource, 1);
892 assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
893 }
894
895 #[test]
896 fn test_recycle_failure_drops_resource() {
897 let p = pool(|b| b.max_size(2));
898 p.0.manager.recycle_fails.store(true, Ordering::SeqCst);
899 {
900 let _resource = p.get().unwrap();
901 assert_eq!(p.status().size, 1);
902 }
903 // Recycle failed on return, so the resource was discarded, not pooled.
904 assert_eq!(p.status().size, 0);
905 assert_eq!(p.status().idle, 0);
906 }
907
908 #[test]
909 fn test_create_failure_surfaces_and_frees_slot() {
910 let p = pool(|b| b.max_size(2));
911 p.0.manager.create_fails.store(true, Ordering::SeqCst);
912 let result = p.get();
913 assert!(matches!(
914 result,
915 Err(Error::Backend(TestError("create failed")))
916 ));
917 // The reserved slot was released, so the pool did not shrink.
918 assert_eq!(p.status().size, 0);
919 }
920
921 #[test]
922 fn test_closed_pool_rejects_checkout() {
923 let p = pool(|b| b.max_size(2).min_idle(1));
924 p.close();
925 assert!(p.is_closed());
926 assert!(matches!(p.get(), Err(Error::Closed)));
927 assert_eq!(p.status().idle, 0); // idle resources were dropped on close
928 }
929
930 #[test]
931 fn test_close_is_idempotent() {
932 let p = pool(|b| b.max_size(2).min_idle(2));
933 p.close();
934 p.close();
935 assert!(p.is_closed());
936 }
937
938 #[test]
939 fn test_build_rejects_zero_max_size() {
940 let result = Pool::builder(Steerable::new()).max_size(0).build();
941 assert!(matches!(result, Err(Error::InvalidConfig(_))));
942 }
943
944 #[test]
945 fn test_build_rejects_min_idle_above_max_size() {
946 let result = Pool::builder(Steerable::new())
947 .max_size(2)
948 .min_idle(3)
949 .build();
950 assert!(matches!(result, Err(Error::InvalidConfig(_))));
951 }
952
953 #[test]
954 fn test_try_get_does_not_block_when_saturated() {
955 let p = pool(|b| b.max_size(1));
956 let _held = p.try_get().unwrap();
957 assert!(matches!(p.try_get(), Err(Error::Timeout)));
958 }
959
960 #[test]
961 fn test_reap_prunes_time_expired_idle() {
962 // A zero idle_timeout makes every idle resource immediately expired, so
963 // `reap` is deterministic without a background thread or sleeps.
964 let p = pool(|b| b.max_size(4).min_idle(2).idle_timeout(Some(Duration::ZERO)));
965 assert_eq!(p.status().idle, 2);
966 p.0.reap();
967 assert_eq!(p.status().idle, 0);
968 assert_eq!(p.status().size, 0);
969 }
970
971 #[test]
972 fn test_reap_is_noop_without_expiry_policy() {
973 let p = pool(|b| b.max_size(4).min_idle(2));
974 p.0.reap();
975 assert_eq!(p.status().idle, 2); // no idle_timeout/max_lifetime: nothing to prune
976 }
977
978 #[test]
979 fn test_clone_shares_one_pool() {
980 let p = pool(|b| b.max_size(1));
981 let clone = p.clone();
982 let _held = p.get().unwrap();
983 // The clone sees the same exhausted pool.
984 assert!(matches!(
985 clone.get_timeout(Duration::ZERO),
986 Err(Error::Timeout)
987 ));
988 }
989}