Skip to main content

pool_mod/
pool.rs

1//! The pool itself: [`Pool`] and its [`Builder`].
2
3use std::collections::VecDeque;
4use std::sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, Weak};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crate::config::PoolConfig;
9use crate::error::Error;
10use crate::manager::Manager;
11use crate::object::Pooled;
12use crate::status::Status;
13
/// Lock `mutex` and hand back its guard, ignoring poisoning.
///
/// A poisoned mutex only records that an earlier holder panicked. Everything the
/// pool does under a lock is plain bookkeeping on counters and a queue — no user
/// code runs there — so the protected data is still coherent at every unlock
/// point. Propagating the poison would instead turn one unrelated panic into a
/// pool that can never be used again, which is the worse outcome.
#[inline]
pub(crate) fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        // Poisoned: take the guard anyway; the data behind it is still valid.
        Err(poisoned) => poisoned.into_inner(),
    }
}
24
/// One entry of the idle set: a parked resource plus the two timestamps the
/// pool consults to enforce `idle_timeout` and `max_lifetime`.
pub(crate) struct Idle<R> {
    /// The parked resource itself.
    pub(crate) resource: R,
    /// When the resource was created; measured against `max_lifetime`.
    pub(crate) created_at: Instant,
    /// When the resource last entered the idle set; measured against
    /// `idle_timeout`.
    pub(crate) last_used: Instant,
}
32
33/// The mutable inner state guarded by the pool's mutex.
34struct State<R> {
35    idle: VecDeque<Idle<R>>,
36    /// Resources the pool currently owns: idle, checked out, or mid-creation.
37    total: usize,
38    /// Threads currently blocked in [`PoolInner::acquire`] waiting on `available`.
39    /// Maintained under the lock so the wakers can skip the condvar entirely when
40    /// no one is waiting — the common, uncontended case.
41    waiters: usize,
42    closed: bool,
43}
44
45/// The decision reached while holding the lock, carried out once it is released.
46///
47/// Only outcomes that require running user code — and therefore must not hold the
48/// lock — escape the locked region; waiting and immediate errors are handled in
49/// place.
50enum Action<R> {
51    Reuse(Idle<R>),
52    Create,
53}
54
55/// The stop signal shared between the pool and its background reaper thread.
56///
57/// Held by both sides through an [`Arc`] so it outlives the [`PoolInner`] it
58/// guards: when the last pool handle drops, `PoolInner`'s destructor sets the
59/// flag and wakes the reaper, which then exits.
60struct Shutdown {
61    stop: Mutex<bool>,
62    wake: Condvar,
63}
64
65impl Shutdown {
66    fn new() -> Self {
67        Shutdown {
68            stop: Mutex::new(false),
69            wake: Condvar::new(),
70        }
71    }
72
73    /// Signal the reaper to stop and wake it immediately.
74    fn signal(&self) {
75        let mut stop = lock(&self.stop);
76        *stop = true;
77        drop(stop);
78        self.wake.notify_all();
79    }
80}
81
82/// Shared pool state behind an [`Arc`]. Every [`Pool`] handle and every live
83/// [`Pooled`] guard holds one of these.
84pub(crate) struct PoolInner<M: Manager> {
85    pub(crate) manager: M,
86    config: PoolConfig,
87    state: Mutex<State<M::Resource>>,
88    available: Condvar,
89    shutdown: Arc<Shutdown>,
90}
91
92impl<M: Manager> Drop for PoolInner<M> {
93    fn drop(&mut self) {
94        // The last handle is gone. Stop the reaper (if one is running) so it does
95        // not outlive the pool it maintains.
96        self.shutdown.signal();
97    }
98}
99
100impl<M: Manager> PoolInner<M> {
101    /// Take a resource out of the pool, blocking until one is free, a slot opens
102    /// for a fresh one, or the deadline passes.
103    fn acquire(
104        &self,
105        deadline: Option<Instant>,
106    ) -> Result<(M::Resource, Instant), Error<M::Error>> {
107        loop {
108            let action = {
109                let mut state = lock(&self.state);
110                loop {
111                    if state.closed {
112                        return Err(Error::Closed);
113                    }
114                    if let Some(idle) = state.idle.pop_front() {
115                        break Action::Reuse(idle);
116                    }
117                    if state.total < self.config.max_size {
118                        state.total += 1;
119                        break Action::Create;
120                    }
121                    // Saturated: wait for a check-in or a close, then re-evaluate.
122                    // `waiters` is bumped immediately before sleeping and dropped
123                    // the moment we wake, both under the lock, so a waker can tell
124                    // whether a signal is needed at all.
125                    match deadline {
126                        None => {
127                            state.waiters += 1;
128                            state = self
129                                .available
130                                .wait(state)
131                                .unwrap_or_else(PoisonError::into_inner);
132                            state.waiters -= 1;
133                        }
134                        Some(dl) => {
135                            let now = Instant::now();
136                            if now >= dl {
137                                return Err(Error::Timeout);
138                            }
139                            state.waiters += 1;
140                            let (guard, _) = self
141                                .available
142                                .wait_timeout(state, dl - now)
143                                .unwrap_or_else(PoisonError::into_inner);
144                            state = guard;
145                            state.waiters -= 1;
146                        }
147                    }
148                }
149            };
150
151            match action {
152                Action::Reuse(idle) => {
153                    if let Some(prepared) = self.prepare(idle) {
154                        return Ok(prepared);
155                    }
156                    // The idle resource was stale or invalid and has been dropped;
157                    // release its slot and try again from the top.
158                    self.release_slot();
159                }
160                Action::Create => match self.manager.create() {
161                    Ok(resource) => return Ok((resource, Instant::now())),
162                    Err(source) => {
163                        self.release_slot();
164                        return Err(Error::Backend(source));
165                    }
166                },
167            }
168        }
169    }
170
171    /// Apply lifetime, idle-timeout, and validation checks to an idle resource.
172    ///
173    /// Returns the resource and its original creation time on success; returns
174    /// `None` (dropping the resource) when it is too old, has sat idle too long,
175    /// or fails validation.
176    fn prepare(&self, mut idle: Idle<M::Resource>) -> Option<(M::Resource, Instant)> {
177        if self.is_time_expired(&idle, Instant::now()) {
178            return None;
179        }
180        if !self.manager.validate(&mut idle.resource) {
181            return None;
182        }
183        Some((idle.resource, idle.created_at))
184    }
185
186    /// Whether an idle resource has outlived `max_lifetime` or `idle_timeout`.
187    ///
188    /// This is the time-based half of expiry, shared by checkout
189    /// ([`prepare`](Self::prepare)) and the background [`reap`](Self::reap); it
190    /// deliberately does not call [`Manager::validate`], which is a checkout-time
191    /// concern.
192    fn is_time_expired(&self, idle: &Idle<M::Resource>, now: Instant) -> bool {
193        if let Some(max_lifetime) = self.config.max_lifetime {
194            if now.saturating_duration_since(idle.created_at) >= max_lifetime {
195                return true;
196            }
197        }
198        if let Some(idle_timeout) = self.config.idle_timeout {
199            if now.saturating_duration_since(idle.last_used) >= idle_timeout {
200                return true;
201            }
202        }
203        false
204    }
205
206    /// Prune idle resources that have outlived their `idle_timeout` or
207    /// `max_lifetime`. Called on each tick of the background reaper.
208    ///
209    /// Expired resources are removed under the lock but dropped outside it, so a
210    /// slow destructor does not stall checkouts. The reaper never creates
211    /// resources; pruning shrinks the idle set, and on-demand growth refills it.
212    fn reap(&self) {
213        if self.config.idle_timeout.is_none() && self.config.max_lifetime.is_none() {
214            return;
215        }
216        let now = Instant::now();
217        let mut expired = Vec::new();
218        let waiters;
219        {
220            let mut state = lock(&self.state);
221            if state.closed {
222                return;
223            }
224            let mut kept = VecDeque::with_capacity(state.idle.len());
225            while let Some(idle) = state.idle.pop_front() {
226                if self.is_time_expired(&idle, now) {
227                    expired.push(idle.resource);
228                } else {
229                    kept.push_back(idle);
230                }
231            }
232            state.total = state.total.saturating_sub(expired.len());
233            state.idle = kept;
234            waiters = state.waiters;
235        }
236        drop(expired); // destructors run here, outside the lock
237                       // Freed slots may unblock threads waiting to create a resource.
238        self.wake_all(waiters);
239    }
240
241    /// Give back a reserved slot and wake one waiter, if any, so it can claim it.
242    fn release_slot(&self) {
243        let mut state = lock(&self.state);
244        state.total = state.total.saturating_sub(1);
245        let waiters = state.waiters;
246        drop(state);
247        self.wake_one(waiters);
248    }
249
250    /// Return a borrowed resource to the pool. Called from [`Pooled`]'s `Drop`.
251    pub(crate) fn checkin(&self, mut resource: M::Resource, created_at: Instant) {
252        let recycled = self.manager.recycle(&mut resource);
253        let mut state = lock(&self.state);
254        let waiters = state.waiters;
255        if state.closed || recycled.is_err() {
256            state.total = state.total.saturating_sub(1);
257            drop(state);
258            self.wake_one(waiters);
259            // `resource` is dropped here, outside the lock.
260        } else {
261            let last_used = Instant::now();
262            state.idle.push_back(Idle {
263                resource,
264                created_at,
265                last_used,
266            });
267            drop(state);
268            self.wake_one(waiters);
269        }
270    }
271
272    /// Wake a single blocked acquirer, but only if one is actually waiting.
273    ///
274    /// `waiters` is the count read under the lock by the caller; skipping the
275    /// condvar when it is zero keeps the uncontended checkout/return path free of
276    /// needless signaling.
277    #[inline]
278    fn wake_one(&self, waiters: usize) {
279        if waiters > 0 {
280            self.available.notify_one();
281        }
282    }
283
284    /// Wake every blocked acquirer, but only if any are waiting.
285    #[inline]
286    fn wake_all(&self, waiters: usize) {
287        if waiters > 0 {
288            self.available.notify_all();
289        }
290    }
291}
292
293/// The body of the background reaper thread.
294///
295/// Sleeps for `interval` between sweeps, waking early when the shutdown signal
296/// fires. It holds only a [`Weak`] reference to the pool, so it never keeps the
297/// pool alive; once every handle is dropped (`upgrade` returns `None`) or the
298/// stop flag is set, it returns and the thread ends.
299fn reaper_loop<M: Manager>(pool: Weak<PoolInner<M>>, shutdown: Arc<Shutdown>, interval: Duration) {
300    loop {
301        {
302            let stop = lock(&shutdown.stop);
303            if *stop {
304                return;
305            }
306            let (stop, _timed_out) = shutdown
307                .wake
308                .wait_timeout(stop, interval)
309                .unwrap_or_else(PoisonError::into_inner);
310            if *stop {
311                return;
312            }
313        }
314        match pool.upgrade() {
315            Some(inner) => inner.reap(),
316            None => return,
317        }
318    }
319}
320
321/// A thread-safe pool of reusable resources.
322///
323/// A `Pool<M>` lends out resources built by a [`Manager`], reclaiming and
324/// recycling each one when its [`Pooled`] guard is dropped. It is cheap to clone
325/// — every clone is a handle onto the same shared pool — so share it across
326/// threads by cloning rather than wrapping it in another `Arc`.
327///
328/// The pool is runtime-agnostic and carries no async dependency.
329/// [`get`](Pool::get) blocks the calling thread until a resource is available; in
330/// an async context, acquire on a blocking-friendly executor thread (for example
331/// `tokio::task::spawn_blocking`). The returned guard is `Send`, so it can be
332/// held across `.await` points.
333///
334/// # Examples
335///
336/// ```
337/// use pool_mod::{Manager, Pool};
338/// use std::convert::Infallible;
339///
340/// struct Connections;
341/// impl Manager for Connections {
342///     type Resource = String;
343///     type Error = Infallible;
344///     fn create(&self) -> Result<String, Infallible> { Ok(String::new()) }
345///     fn recycle(&self, c: &mut String) -> Result<(), Infallible> { c.clear(); Ok(()) }
346/// }
347///
348/// let pool = Pool::builder(Connections).max_size(4).build()
349///     .expect("configuration is valid");
350///
351/// let mut conn = pool.get().expect("a connection is available");
352/// conn.push_str("SELECT 1");
353/// assert_eq!(pool.status().in_use, 1);
354/// drop(conn);
355/// assert_eq!(pool.status().in_use, 0);
356/// ```
357pub struct Pool<M: Manager>(Arc<PoolInner<M>>);
358
359impl<M: Manager> Clone for Pool<M> {
360    fn clone(&self) -> Self {
361        Pool(Arc::clone(&self.0))
362    }
363}
364
365impl<M: Manager> Pool<M> {
366    /// Start building a pool for `manager` with the default configuration.
367    ///
368    /// # Examples
369    ///
370    /// ```
371    /// use pool_mod::{Manager, Pool};
372    /// use std::convert::Infallible;
373    /// # struct M;
374    /// # impl Manager for M {
375    /// #   type Resource = u32; type Error = Infallible;
376    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
377    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
378    /// # }
379    /// let pool = Pool::builder(M).max_size(8).min_idle(2).build()
380    ///     .expect("configuration is valid");
381    /// assert_eq!(pool.status().max_size, 8);
382    /// ```
383    pub fn builder(manager: M) -> Builder<M> {
384        Builder::new(manager)
385    }
386
387    /// Build a pool for `manager` with the [default configuration](PoolConfig::default).
388    ///
389    /// A shortcut for `Pool::builder(manager).build()`.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`Error::Backend`] if pre-creating the initial resources fails.
394    /// (With the default `min_idle` of 0, no resources are created up front, so
395    /// the default-configured pool only fails to build if you have customized the
396    /// configuration through [`Pool::builder`] instead.)
397    ///
398    /// # Examples
399    ///
400    /// ```
401    /// use pool_mod::{Manager, Pool};
402    /// use std::convert::Infallible;
403    /// # struct M;
404    /// # impl Manager for M {
405    /// #   type Resource = u32; type Error = Infallible;
406    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
407    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
408    /// # }
409    /// let pool = Pool::new(M).expect("configuration is valid");
410    /// assert_eq!(pool.status().max_size, 10); // the default
411    /// ```
412    pub fn new(manager: M) -> Result<Self, Error<M::Error>> {
413        Builder::new(manager).build()
414    }
415
416    /// Borrow a resource, waiting up to the configured
417    /// [`create_timeout`](PoolConfig::create_timeout) if the pool is saturated.
418    ///
419    /// Reuses an idle resource when one is available (after validation), grows the
420    /// pool toward `max_size` when it is not, and otherwise blocks until a
421    /// resource is returned or the timeout elapses.
422    ///
423    /// # Errors
424    ///
425    /// - [`Error::Backend`] if the manager fails to create a resource.
426    /// - [`Error::Timeout`] if the pool stays saturated past `create_timeout`.
427    /// - [`Error::Closed`] if the pool has been closed.
428    ///
429    /// # Examples
430    ///
431    /// ```
432    /// use pool_mod::{Manager, Pool};
433    /// use std::convert::Infallible;
434    /// # struct M;
435    /// # impl Manager for M {
436    /// #   type Resource = u32; type Error = Infallible;
437    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(7) }
438    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
439    /// # }
440    /// let pool = Pool::builder(M).max_size(2).build().expect("valid");
441    /// let resource = pool.get().expect("available");
442    /// assert_eq!(*resource, 7);
443    /// ```
444    pub fn get(&self) -> Result<Pooled<M>, Error<M::Error>> {
445        let deadline = self
446            .0
447            .config
448            .create_timeout
449            .map(|timeout| Instant::now() + timeout);
450        self.acquire(deadline)
451    }
452
453    /// Borrow a resource, waiting at most `timeout` regardless of the configured
454    /// [`create_timeout`](PoolConfig::create_timeout).
455    ///
456    /// A `timeout` of [`Duration::ZERO`] makes this a non-blocking try: it returns
457    /// [`Error::Timeout`] at once if no resource can be handed out immediately.
458    ///
459    /// # Errors
460    ///
461    /// - [`Error::Backend`] if the manager fails to create a resource.
462    /// - [`Error::Timeout`] if no resource becomes available within `timeout`.
463    /// - [`Error::Closed`] if the pool has been closed.
464    ///
465    /// # Examples
466    ///
467    /// ```
468    /// use std::time::Duration;
469    /// use pool_mod::{Error, Manager, Pool};
470    /// use std::convert::Infallible;
471    /// # struct M;
472    /// # impl Manager for M {
473    /// #   type Resource = u32; type Error = Infallible;
474    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
475    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
476    /// # }
477    /// let pool = Pool::builder(M).max_size(1).build().expect("valid");
478    /// let held = pool.get().expect("first checkout");
479    /// // The single slot is taken, so an immediate retry times out.
480    /// assert!(matches!(pool.get_timeout(Duration::ZERO), Err(Error::Timeout)));
481    /// ```
482    pub fn get_timeout(&self, timeout: Duration) -> Result<Pooled<M>, Error<M::Error>> {
483        self.acquire(Some(Instant::now() + timeout))
484    }
485
486    /// Borrow a resource without ever blocking.
487    ///
488    /// Returns a resource if one can be handed out immediately — an idle resource
489    /// is ready, or the pool has room to create one — and otherwise returns
490    /// [`Error::Timeout`] at once. Equivalent to
491    /// [`get_timeout(Duration::ZERO)`](Pool::get_timeout).
492    ///
493    /// # Errors
494    ///
495    /// - [`Error::Backend`] if the manager fails to create a resource.
496    /// - [`Error::Timeout`] if no resource is immediately available.
497    /// - [`Error::Closed`] if the pool has been closed.
498    ///
499    /// # Examples
500    ///
501    /// ```
502    /// use pool_mod::{Error, Manager, Pool};
503    /// use std::convert::Infallible;
504    /// # struct M;
505    /// # impl Manager for M {
506    /// #   type Resource = u32; type Error = Infallible;
507    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
508    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
509    /// # }
510    /// let pool = Pool::builder(M).max_size(1).build().expect("valid");
511    /// let first = pool.try_get().expect("room to create one");
512    /// // The only slot is taken, so the next try fails immediately.
513    /// assert!(matches!(pool.try_get(), Err(Error::Timeout)));
514    /// ```
515    pub fn try_get(&self) -> Result<Pooled<M>, Error<M::Error>> {
516        self.acquire(Some(Instant::now()))
517    }
518
519    fn acquire(&self, deadline: Option<Instant>) -> Result<Pooled<M>, Error<M::Error>> {
520        let (resource, created_at) = self.0.acquire(deadline)?;
521        Ok(Pooled::new(Arc::clone(&self.0), resource, created_at))
522    }
523
524    /// Take a snapshot of the pool's current occupancy.
525    ///
526    /// # Examples
527    ///
528    /// ```
529    /// use pool_mod::{Manager, Pool};
530    /// use std::convert::Infallible;
531    /// # struct M;
532    /// # impl Manager for M {
533    /// #   type Resource = (); type Error = Infallible;
534    /// #   fn create(&self) -> Result<(), Infallible> { Ok(()) }
535    /// #   fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
536    /// # }
537    /// let pool = Pool::builder(M).max_size(4).min_idle(1).build().expect("valid");
538    /// let status = pool.status();
539    /// assert_eq!(status.idle, 1);
540    /// assert_eq!(status.max_size, 4);
541    /// ```
542    pub fn status(&self) -> Status {
543        let state = lock(&self.0.state);
544        let idle = state.idle.len();
545        let size = state.total;
546        Status {
547            size,
548            idle,
549            in_use: size.saturating_sub(idle),
550            max_size: self.0.config.max_size,
551        }
552    }
553
554    /// Close the pool: discard every idle resource and reject all future
555    /// checkouts with [`Error::Closed`].
556    ///
557    /// Resources currently checked out are unaffected and are simply dropped
558    /// (not returned to the idle set) when their guards fall. Closing is
559    /// idempotent. Idle resources are dropped outside the pool's lock, so a slow
560    /// resource destructor does not block other threads.
561    ///
562    /// # Examples
563    ///
564    /// ```
565    /// use pool_mod::{Error, Manager, Pool};
566    /// use std::convert::Infallible;
567    /// # struct M;
568    /// # impl Manager for M {
569    /// #   type Resource = (); type Error = Infallible;
570    /// #   fn create(&self) -> Result<(), Infallible> { Ok(()) }
571    /// #   fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
572    /// # }
573    /// let pool = Pool::builder(M).max_size(2).min_idle(2).build().expect("valid");
574    /// pool.close();
575    /// assert!(pool.is_closed());
576    /// assert!(matches!(pool.get(), Err(Error::Closed)));
577    /// ```
578    pub fn close(&self) {
579        let mut state = lock(&self.0.state);
580        let drained = std::mem::take(&mut state.idle);
581        state.total = state.total.saturating_sub(drained.len());
582        state.closed = true;
583        let waiters = state.waiters;
584        drop(state);
585        self.0.wake_all(waiters); // wake blocked acquirers so they observe the close
586        self.0.shutdown.signal(); // stop the reaper; a closed pool needs no upkeep
587        drop(drained); // resource destructors run here, outside the lock
588    }
589
590    /// Report whether the pool has been [closed](Pool::close).
591    #[must_use]
592    pub fn is_closed(&self) -> bool {
593        lock(&self.0.state).closed
594    }
595}
596
597/// A fluent builder for a [`Pool`].
598///
599/// Created by [`Pool::builder`]. Each setter consumes and returns the builder, so
600/// calls chain; [`build`](Builder::build) validates the configuration and
601/// pre-creates the `min_idle` resources.
602///
603/// # Examples
604///
605/// ```
606/// use std::time::Duration;
607/// use pool_mod::{Manager, Pool};
608/// use std::convert::Infallible;
609/// # struct M;
610/// # impl Manager for M {
611/// #   type Resource = u32; type Error = Infallible;
612/// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
613/// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
614/// # }
615/// let pool = Pool::builder(M)
616///     .max_size(32)
617///     .min_idle(4)
618///     .idle_timeout(Some(Duration::from_secs(600)))
619///     .max_lifetime(Some(Duration::from_secs(3600)))
620///     .build()
621///     .expect("configuration is valid");
622/// assert_eq!(pool.status().idle, 4);
623/// ```
624#[must_use = "a Builder does nothing until `.build()` is called"]
625pub struct Builder<M: Manager> {
626    manager: M,
627    config: PoolConfig,
628}
629
630impl<M: Manager> Builder<M> {
631    /// Create a builder for `manager` seeded with the default configuration.
632    pub fn new(manager: M) -> Self {
633        Builder {
634            manager,
635            config: PoolConfig::default(),
636        }
637    }
638
639    /// Set the maximum number of resources the pool may own at once.
640    pub fn max_size(mut self, max_size: usize) -> Self {
641        self.config.max_size = max_size;
642        self
643    }
644
645    /// Set how many resources to create up front and keep ready.
646    pub fn min_idle(mut self, min_idle: usize) -> Self {
647        self.config.min_idle = min_idle;
648        self
649    }
650
651    /// Set how long [`Pool::get`] waits when the pool is saturated. `None` waits
652    /// indefinitely.
653    pub fn create_timeout(mut self, timeout: Option<Duration>) -> Self {
654        self.config.create_timeout = timeout;
655        self
656    }
657
658    /// Set the idle-expiry window. `None` disables idle expiry.
659    pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
660        self.config.idle_timeout = timeout;
661        self
662    }
663
664    /// Set the maximum resource lifetime. `None` disables lifetime expiry.
665    pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
666        self.config.max_lifetime = lifetime;
667        self
668    }
669
670    /// Set how often a background thread prunes expired idle resources. `None`
671    /// (the default) runs no background thread, applying expiry lazily on borrow.
672    ///
673    /// Has no effect unless `idle_timeout` or `max_lifetime` is also set.
674    pub fn reap_interval(mut self, interval: Option<Duration>) -> Self {
675        self.config.reap_interval = interval;
676        self
677    }
678
679    /// Replace the entire configuration with `config`.
680    ///
681    /// Useful when the configuration is loaded from a file rather than assembled
682    /// setter by setter.
683    pub fn config(mut self, config: PoolConfig) -> Self {
684        self.config = config;
685        self
686    }
687
688    /// Validate the configuration, build the pool, and pre-create `min_idle`
689    /// resources.
690    ///
691    /// # Errors
692    ///
693    /// - [`Error::InvalidConfig`] if `max_size` is zero or `min_idle` exceeds
694    ///   `max_size`.
695    /// - [`Error::Backend`] if creating one of the `min_idle` resources fails;
696    ///   any already-created resources are dropped before returning.
697    ///
698    /// # Examples
699    ///
700    /// ```
701    /// use pool_mod::{Error, Manager, Pool};
702    /// use std::convert::Infallible;
703    /// # struct M;
704    /// # impl Manager for M {
705    /// #   type Resource = (); type Error = Infallible;
706    /// #   fn create(&self) -> Result<(), Infallible> { Ok(()) }
707    /// #   fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
708    /// # }
709    /// let invalid = Pool::builder(M).max_size(0).build();
710    /// assert!(matches!(invalid, Err(Error::InvalidConfig(_))));
711    /// ```
712    pub fn build(self) -> Result<Pool<M>, Error<M::Error>> {
713        if self.config.max_size == 0 {
714            return Err(Error::InvalidConfig("max_size must be at least 1"));
715        }
716        if self.config.min_idle > self.config.max_size {
717            return Err(Error::InvalidConfig("min_idle must not exceed max_size"));
718        }
719
720        let pool = Pool(Arc::new(PoolInner {
721            manager: self.manager,
722            config: self.config,
723            state: Mutex::new(State {
724                idle: VecDeque::with_capacity(self.config.max_size),
725                total: 0,
726                waiters: 0,
727                closed: false,
728            }),
729            available: Condvar::new(),
730            shutdown: Arc::new(Shutdown::new()),
731        }));
732
733        for _ in 0..pool.0.config.min_idle {
734            match pool.0.manager.create() {
735                Ok(resource) => {
736                    let now = Instant::now();
737                    let mut state = lock(&pool.0.state);
738                    state.idle.push_back(Idle {
739                        resource,
740                        created_at: now,
741                        last_used: now,
742                    });
743                    state.total += 1;
744                }
745                Err(source) => return Err(Error::Backend(source)),
746            }
747        }
748
749        pool.spawn_reaper();
750
751        Ok(pool)
752    }
753}
754
755impl<M: Manager> Pool<M> {
756    /// Start the background reaper if `reap_interval` is configured.
757    ///
758    /// The reaper holds only a [`Weak`] handle, so it never keeps the pool alive;
759    /// it stops when every handle is dropped or the pool is closed. If the OS
760    /// refuses a new thread, the pool keeps working with expiry applied lazily on
761    /// checkout — the reaper is an optimization, not a correctness requirement.
762    fn spawn_reaper(&self) {
763        let Some(interval) = self.0.config.reap_interval else {
764            return;
765        };
766        let pool = Arc::downgrade(&self.0);
767        let shutdown = Arc::clone(&self.0.shutdown);
768        match thread::Builder::new()
769            .name("pool-mod-reaper".to_owned())
770            .spawn(move || reaper_loop(pool, shutdown, interval))
771        {
772            // Detach the handle; the reaper self-terminates via the shutdown
773            // signal and its `Weak` reference.
774            Ok(handle) => drop(handle),
775            // Spawn failed: fall back to lazy, checkout-time expiry.
776            Err(_) => self.0.shutdown.signal(),
777        }
778    }
779}
780
781#[cfg(test)]
782// Justification: in test code a failed setup or checkout has no meaningful
783// recovery — unwinding with a clear panic is the correct outcome. REPS permits
784// `unwrap`/`expect` in test modules for exactly this reason.
785#[allow(clippy::unwrap_used, clippy::expect_used)]
786mod tests {
787    use super::*;
788    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
789
790    #[derive(Debug, PartialEq, Eq)]
791    struct TestError(&'static str);
792
793    impl std::fmt::Display for TestError {
794        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
795            f.write_str(self.0)
796        }
797    }
798
799    impl std::error::Error for TestError {}
800
801    /// A manager whose behaviour is steerable through atomics so individual
802    /// lifecycle paths can be exercised deterministically.
803    struct Steerable {
804        created: AtomicUsize,
805        recycled: AtomicUsize,
806        validated: AtomicUsize,
807        create_fails: AtomicBool,
808        recycle_fails: AtomicBool,
809        valid: AtomicBool,
810    }
811
812    impl Steerable {
813        fn new() -> Self {
814            Steerable {
815                created: AtomicUsize::new(0),
816                recycled: AtomicUsize::new(0),
817                validated: AtomicUsize::new(0),
818                create_fails: AtomicBool::new(false),
819                recycle_fails: AtomicBool::new(false),
820                valid: AtomicBool::new(true),
821            }
822        }
823    }
824
825    impl Manager for Steerable {
826        type Resource = usize;
827        type Error = TestError;
828
829        fn create(&self) -> Result<usize, TestError> {
830            if self.create_fails.load(Ordering::SeqCst) {
831                return Err(TestError("create failed"));
832            }
833            Ok(self.created.fetch_add(1, Ordering::SeqCst))
834        }
835
836        fn recycle(&self, _resource: &mut usize) -> Result<(), TestError> {
837            let _ = self.recycled.fetch_add(1, Ordering::SeqCst);
838            if self.recycle_fails.load(Ordering::SeqCst) {
839                return Err(TestError("recycle failed"));
840            }
841            Ok(())
842        }
843
844        fn validate(&self, _resource: &mut usize) -> bool {
845            let _ = self.validated.fetch_add(1, Ordering::SeqCst);
846            self.valid.load(Ordering::SeqCst)
847        }
848    }
849
850    fn pool(builder: impl FnOnce(Builder<Steerable>) -> Builder<Steerable>) -> Pool<Steerable> {
851        builder(Pool::builder(Steerable::new())).build().unwrap()
852    }
853
854    #[test]
855    fn test_build_min_idle_precreates_resources() {
856        let p = pool(|b| b.max_size(4).min_idle(2));
857        assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
858        let status = p.status();
859        assert_eq!(status.idle, 2);
860        assert_eq!(status.size, 2);
861        assert_eq!(status.in_use, 0);
862    }
863
864    #[test]
865    fn test_get_then_drop_reuses_same_resource() {
866        let p = pool(|b| b.max_size(4));
867        {
868            let first = p.get().unwrap();
869            assert_eq!(*first, 0);
870        }
871        let second = p.get().unwrap();
872        assert_eq!(*second, 0); // same resource id, reused
873        assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 1);
874        assert_eq!(p.0.manager.recycled.load(Ordering::SeqCst), 1);
875    }
876
877    #[test]
878    fn test_in_use_tracks_outstanding_guards() {
879        let p = pool(|b| b.max_size(2));
880        let a = p.get().unwrap();
881        let b = p.get().unwrap();
882        assert_eq!(p.status().in_use, 2);
883        assert_eq!(p.status().idle, 0);
884        drop(a);
885        drop(b);
886        assert_eq!(p.status().in_use, 0);
887        assert_eq!(p.status().idle, 2);
888    }
889
890    #[test]
891    fn test_saturated_pool_times_out() {
892        let p = pool(|b| b.max_size(1));
893        let _held = p.get().unwrap();
894        let result = p.get_timeout(Duration::ZERO);
895        assert!(matches!(result, Err(Error::Timeout)));
896    }
897
898    #[test]
899    fn test_invalid_resource_is_discarded_and_replaced() {
900        let p = pool(|b| b.max_size(4).min_idle(1));
901        assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 1);
902        p.0.manager.valid.store(false, Ordering::SeqCst);
903
904        // The single idle resource fails validation, so it is dropped and a fresh
905        // one is created. (The fresh resource is not itself re-validated.)
906        let resource = p.get().unwrap();
907        assert_eq!(*resource, 1);
908        assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
909        assert!(p.0.manager.validated.load(Ordering::SeqCst) >= 1);
910    }
911
912    #[test]
913    fn test_max_lifetime_forces_replacement() {
914        let p = pool(|b| b.max_size(4).min_idle(1).max_lifetime(Some(Duration::ZERO)));
915        // Zero lifetime means any idle resource is always too old on checkout.
916        let resource = p.get().unwrap();
917        assert_eq!(*resource, 1);
918        assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
919    }
920
921    #[test]
922    fn test_idle_timeout_forces_replacement() {
923        let p = pool(|b| b.max_size(4).min_idle(1).idle_timeout(Some(Duration::ZERO)));
924        let resource = p.get().unwrap();
925        assert_eq!(*resource, 1);
926        assert_eq!(p.0.manager.created.load(Ordering::SeqCst), 2);
927    }
928
929    #[test]
930    fn test_recycle_failure_drops_resource() {
931        let p = pool(|b| b.max_size(2));
932        p.0.manager.recycle_fails.store(true, Ordering::SeqCst);
933        {
934            let _resource = p.get().unwrap();
935            assert_eq!(p.status().size, 1);
936        }
937        // Recycle failed on return, so the resource was discarded, not pooled.
938        assert_eq!(p.status().size, 0);
939        assert_eq!(p.status().idle, 0);
940    }
941
942    #[test]
943    fn test_create_failure_surfaces_and_frees_slot() {
944        let p = pool(|b| b.max_size(2));
945        p.0.manager.create_fails.store(true, Ordering::SeqCst);
946        let result = p.get();
947        assert!(matches!(
948            result,
949            Err(Error::Backend(TestError("create failed")))
950        ));
951        // The reserved slot was released, so the pool did not shrink.
952        assert_eq!(p.status().size, 0);
953    }
954
955    #[test]
956    fn test_closed_pool_rejects_checkout() {
957        let p = pool(|b| b.max_size(2).min_idle(1));
958        p.close();
959        assert!(p.is_closed());
960        assert!(matches!(p.get(), Err(Error::Closed)));
961        assert_eq!(p.status().idle, 0); // idle resources were dropped on close
962    }
963
964    #[test]
965    fn test_close_is_idempotent() {
966        let p = pool(|b| b.max_size(2).min_idle(2));
967        p.close();
968        p.close();
969        assert!(p.is_closed());
970    }
971
972    #[test]
973    fn test_build_rejects_zero_max_size() {
974        let result = Pool::builder(Steerable::new()).max_size(0).build();
975        assert!(matches!(result, Err(Error::InvalidConfig(_))));
976    }
977
978    #[test]
979    fn test_build_rejects_min_idle_above_max_size() {
980        let result = Pool::builder(Steerable::new())
981            .max_size(2)
982            .min_idle(3)
983            .build();
984        assert!(matches!(result, Err(Error::InvalidConfig(_))));
985    }
986
987    #[test]
988    fn test_try_get_does_not_block_when_saturated() {
989        let p = pool(|b| b.max_size(1));
990        let _held = p.try_get().unwrap();
991        assert!(matches!(p.try_get(), Err(Error::Timeout)));
992    }
993
994    #[test]
995    fn test_reap_prunes_time_expired_idle() {
996        // A zero idle_timeout makes every idle resource immediately expired, so
997        // `reap` is deterministic without a background thread or sleeps.
998        let p = pool(|b| b.max_size(4).min_idle(2).idle_timeout(Some(Duration::ZERO)));
999        assert_eq!(p.status().idle, 2);
1000        p.0.reap();
1001        assert_eq!(p.status().idle, 0);
1002        assert_eq!(p.status().size, 0);
1003    }
1004
1005    #[test]
1006    fn test_reap_is_noop_without_expiry_policy() {
1007        let p = pool(|b| b.max_size(4).min_idle(2));
1008        p.0.reap();
1009        assert_eq!(p.status().idle, 2); // no idle_timeout/max_lifetime: nothing to prune
1010    }
1011
1012    #[test]
1013    fn test_clone_shares_one_pool() {
1014        let p = pool(|b| b.max_size(1));
1015        let clone = p.clone();
1016        let _held = p.get().unwrap();
1017        // The clone sees the same exhausted pool.
1018        assert!(matches!(
1019            clone.get_timeout(Duration::ZERO),
1020            Err(Error::Timeout)
1021        ));
1022    }
1023}