Skip to main content

pool_mod/
pool.rs

1//! The pool itself: [`Pool`] and its [`Builder`].
2
3use std::collections::VecDeque;
4use std::sync::{Arc, Condvar, Mutex, MutexGuard, PoisonError, Weak};
5use std::thread;
6use std::time::{Duration, Instant};
7
8use crate::config::PoolConfig;
9use crate::error::Error;
10use crate::manager::Manager;
11use crate::object::Pooled;
12use crate::status::Status;
13
/// Lock `mutex`, ignoring poisoning.
///
/// The guard is returned whether or not an earlier holder panicked. Inside the
/// pool, locked regions only touch plain counters and a queue and never call
/// user code, so the guarded data is consistent at every unlock point. Treating
/// poison as fatal would turn an unrelated panic somewhere else into a
/// permanent pool outage, which is the worse of the two failure modes.
#[inline]
pub(crate) fn lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        // A poisoned lock still owns valid data; take the guard out of the error.
        Err(poisoned) => poisoned.into_inner(),
    }
}
24
/// One entry of the idle set: a resource plus the two timestamps that expiry is
/// measured from.
pub(crate) struct Idle<R> {
    /// The pooled resource itself.
    pub(crate) resource: R,
    /// When the resource was created; compared against `max_lifetime`.
    pub(crate) created_at: Instant,
    /// When the resource last entered the idle set; compared against
    /// `idle_timeout`.
    pub(crate) last_used: Instant,
}
32
33/// The mutable inner state guarded by the pool's mutex.
34struct State<R> {
35    idle: VecDeque<Idle<R>>,
36    /// Resources the pool currently owns: idle, checked out, or mid-creation.
37    total: usize,
38    closed: bool,
39}
40
41/// The decision reached while holding the lock, carried out once it is released.
42///
43/// Only outcomes that require running user code — and therefore must not hold the
44/// lock — escape the locked region; waiting and immediate errors are handled in
45/// place.
46enum Action<R> {
47    Reuse(Idle<R>),
48    Create,
49}
50
/// Stop signal shared by the pool and its background reaper thread.
///
/// Both sides hold it through an [`Arc`], so it outlives the [`PoolInner`] it
/// belongs to: `PoolInner`'s destructor raises the flag and wakes the reaper,
/// which then exits.
struct Shutdown {
    /// `true` once the reaper has been asked to stop.
    stop: Mutex<bool>,
    /// Notified when `stop` is raised, cutting the reaper's sleep short.
    wake: Condvar,
}
60
61impl Shutdown {
62    fn new() -> Self {
63        Shutdown {
64            stop: Mutex::new(false),
65            wake: Condvar::new(),
66        }
67    }
68
69    /// Signal the reaper to stop and wake it immediately.
70    fn signal(&self) {
71        let mut stop = lock(&self.stop);
72        *stop = true;
73        drop(stop);
74        self.wake.notify_all();
75    }
76}
77
78/// Shared pool state behind an [`Arc`]. Every [`Pool`] handle and every live
79/// [`Pooled`] guard holds one of these.
80pub(crate) struct PoolInner<M: Manager> {
81    pub(crate) manager: M,
82    config: PoolConfig,
83    state: Mutex<State<M::Resource>>,
84    available: Condvar,
85    shutdown: Arc<Shutdown>,
86}
87
88impl<M: Manager> Drop for PoolInner<M> {
89    fn drop(&mut self) {
90        // The last handle is gone. Stop the reaper (if one is running) so it does
91        // not outlive the pool it maintains.
92        self.shutdown.signal();
93    }
94}
95
96impl<M: Manager> PoolInner<M> {
97    /// Take a resource out of the pool, blocking until one is free, a slot opens
98    /// for a fresh one, or the deadline passes.
99    fn acquire(
100        &self,
101        deadline: Option<Instant>,
102    ) -> Result<(M::Resource, Instant), Error<M::Error>> {
103        loop {
104            let action = {
105                let mut state = lock(&self.state);
106                loop {
107                    if state.closed {
108                        return Err(Error::Closed);
109                    }
110                    if let Some(idle) = state.idle.pop_front() {
111                        break Action::Reuse(idle);
112                    }
113                    if state.total < self.config.max_size {
114                        state.total += 1;
115                        break Action::Create;
116                    }
117                    // Saturated: wait for a check-in or a close, then re-evaluate.
118                    match deadline {
119                        None => {
120                            state = self
121                                .available
122                                .wait(state)
123                                .unwrap_or_else(PoisonError::into_inner);
124                        }
125                        Some(dl) => {
126                            let now = Instant::now();
127                            if now >= dl {
128                                return Err(Error::Timeout);
129                            }
130                            let (guard, _) = self
131                                .available
132                                .wait_timeout(state, dl - now)
133                                .unwrap_or_else(PoisonError::into_inner);
134                            state = guard;
135                        }
136                    }
137                }
138            };
139
140            match action {
141                Action::Reuse(idle) => {
142                    if let Some(prepared) = self.prepare(idle) {
143                        return Ok(prepared);
144                    }
145                    // The idle resource was stale or invalid and has been dropped;
146                    // release its slot and try again from the top.
147                    self.release_slot();
148                }
149                Action::Create => match self.manager.create() {
150                    Ok(resource) => return Ok((resource, Instant::now())),
151                    Err(source) => {
152                        self.release_slot();
153                        return Err(Error::Backend(source));
154                    }
155                },
156            }
157        }
158    }
159
160    /// Apply lifetime, idle-timeout, and validation checks to an idle resource.
161    ///
162    /// Returns the resource and its original creation time on success; returns
163    /// `None` (dropping the resource) when it is too old, has sat idle too long,
164    /// or fails validation.
165    fn prepare(&self, mut idle: Idle<M::Resource>) -> Option<(M::Resource, Instant)> {
166        if self.is_time_expired(&idle, Instant::now()) {
167            return None;
168        }
169        if !self.manager.validate(&mut idle.resource) {
170            return None;
171        }
172        Some((idle.resource, idle.created_at))
173    }
174
175    /// Whether an idle resource has outlived `max_lifetime` or `idle_timeout`.
176    ///
177    /// This is the time-based half of expiry, shared by checkout
178    /// ([`prepare`](Self::prepare)) and the background [`reap`](Self::reap); it
179    /// deliberately does not call [`Manager::validate`], which is a checkout-time
180    /// concern.
181    fn is_time_expired(&self, idle: &Idle<M::Resource>, now: Instant) -> bool {
182        if let Some(max_lifetime) = self.config.max_lifetime {
183            if now.saturating_duration_since(idle.created_at) >= max_lifetime {
184                return true;
185            }
186        }
187        if let Some(idle_timeout) = self.config.idle_timeout {
188            if now.saturating_duration_since(idle.last_used) >= idle_timeout {
189                return true;
190            }
191        }
192        false
193    }
194
195    /// Prune idle resources that have outlived their `idle_timeout` or
196    /// `max_lifetime`. Called on each tick of the background reaper.
197    ///
198    /// Expired resources are removed under the lock but dropped outside it, so a
199    /// slow destructor does not stall checkouts. The reaper never creates
200    /// resources; pruning shrinks the idle set, and on-demand growth refills it.
201    fn reap(&self) {
202        if self.config.idle_timeout.is_none() && self.config.max_lifetime.is_none() {
203            return;
204        }
205        let now = Instant::now();
206        let mut expired = Vec::new();
207        {
208            let mut state = lock(&self.state);
209            if state.closed {
210                return;
211            }
212            let mut kept = VecDeque::with_capacity(state.idle.len());
213            while let Some(idle) = state.idle.pop_front() {
214                if self.is_time_expired(&idle, now) {
215                    expired.push(idle.resource);
216                } else {
217                    kept.push_back(idle);
218                }
219            }
220            state.total = state.total.saturating_sub(expired.len());
221            state.idle = kept;
222        }
223        let freed = expired.len();
224        drop(expired); // destructors run here, outside the lock
225        if freed > 0 {
226            // Freed slots may unblock threads waiting to create a resource.
227            self.available.notify_all();
228        }
229    }
230
231    /// Give back a reserved slot and wake one waiter so it can claim it.
232    fn release_slot(&self) {
233        let mut state = lock(&self.state);
234        state.total = state.total.saturating_sub(1);
235        drop(state);
236        self.available.notify_one();
237    }
238
239    /// Return a borrowed resource to the pool. Called from [`Pooled`]'s `Drop`.
240    pub(crate) fn checkin(&self, mut resource: M::Resource, created_at: Instant) {
241        let recycled = self.manager.recycle(&mut resource);
242        let mut state = lock(&self.state);
243        if state.closed || recycled.is_err() {
244            state.total = state.total.saturating_sub(1);
245            drop(state);
246            self.available.notify_one();
247            // `resource` is dropped here, outside the lock.
248        } else {
249            let last_used = Instant::now();
250            state.idle.push_back(Idle {
251                resource,
252                created_at,
253                last_used,
254            });
255            drop(state);
256            self.available.notify_one();
257        }
258    }
259}
260
261/// The body of the background reaper thread.
262///
263/// Sleeps for `interval` between sweeps, waking early when the shutdown signal
264/// fires. It holds only a [`Weak`] reference to the pool, so it never keeps the
265/// pool alive; once every handle is dropped (`upgrade` returns `None`) or the
266/// stop flag is set, it returns and the thread ends.
267fn reaper_loop<M: Manager>(pool: Weak<PoolInner<M>>, shutdown: Arc<Shutdown>, interval: Duration) {
268    loop {
269        {
270            let stop = lock(&shutdown.stop);
271            if *stop {
272                return;
273            }
274            let (stop, _timed_out) = shutdown
275                .wake
276                .wait_timeout(stop, interval)
277                .unwrap_or_else(PoisonError::into_inner);
278            if *stop {
279                return;
280            }
281        }
282        match pool.upgrade() {
283            Some(inner) => inner.reap(),
284            None => return,
285        }
286    }
287}
288
289/// A thread-safe pool of reusable resources.
290///
291/// A `Pool<M>` lends out resources built by a [`Manager`], reclaiming and
292/// recycling each one when its [`Pooled`] guard is dropped. It is cheap to clone
293/// — every clone is a handle onto the same shared pool — so share it across
294/// threads by cloning rather than wrapping it in another `Arc`.
295///
296/// The pool is runtime-agnostic and carries no async dependency.
297/// [`get`](Pool::get) blocks the calling thread until a resource is available; in
298/// an async context, acquire on a blocking-friendly executor thread (for example
299/// `tokio::task::spawn_blocking`). The returned guard is `Send`, so it can be
300/// held across `.await` points.
301///
302/// # Examples
303///
304/// ```
305/// use pool_mod::{Manager, Pool};
306/// use std::convert::Infallible;
307///
308/// struct Connections;
309/// impl Manager for Connections {
310///     type Resource = String;
311///     type Error = Infallible;
312///     fn create(&self) -> Result<String, Infallible> { Ok(String::new()) }
313///     fn recycle(&self, c: &mut String) -> Result<(), Infallible> { c.clear(); Ok(()) }
314/// }
315///
316/// let pool = Pool::builder(Connections).max_size(4).build()
317///     .expect("configuration is valid");
318///
319/// let mut conn = pool.get().expect("a connection is available");
320/// conn.push_str("SELECT 1");
321/// assert_eq!(pool.status().in_use, 1);
322/// drop(conn);
323/// assert_eq!(pool.status().in_use, 0);
324/// ```
325pub struct Pool<M: Manager>(Arc<PoolInner<M>>);
326
327impl<M: Manager> Clone for Pool<M> {
328    fn clone(&self) -> Self {
329        Pool(Arc::clone(&self.0))
330    }
331}
332
333impl<M: Manager> Pool<M> {
334    /// Start building a pool for `manager` with the default configuration.
335    ///
336    /// # Examples
337    ///
338    /// ```
339    /// use pool_mod::{Manager, Pool};
340    /// use std::convert::Infallible;
341    /// # struct M;
342    /// # impl Manager for M {
343    /// #   type Resource = u32; type Error = Infallible;
344    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
345    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
346    /// # }
347    /// let pool = Pool::builder(M).max_size(8).min_idle(2).build()
348    ///     .expect("configuration is valid");
349    /// assert_eq!(pool.status().max_size, 8);
350    /// ```
351    pub fn builder(manager: M) -> Builder<M> {
352        Builder::new(manager)
353    }
354
355    /// Build a pool for `manager` with the [default configuration](PoolConfig::default).
356    ///
357    /// A shortcut for `Pool::builder(manager).build()`.
358    ///
359    /// # Errors
360    ///
361    /// Returns [`Error::Backend`] if pre-creating the initial resources fails.
362    /// (With the default `min_idle` of 0, no resources are created up front, so
363    /// the default-configured pool only fails to build if you have customized the
364    /// configuration through [`Pool::builder`] instead.)
365    ///
366    /// # Examples
367    ///
368    /// ```
369    /// use pool_mod::{Manager, Pool};
370    /// use std::convert::Infallible;
371    /// # struct M;
372    /// # impl Manager for M {
373    /// #   type Resource = u32; type Error = Infallible;
374    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
375    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
376    /// # }
377    /// let pool = Pool::new(M).expect("configuration is valid");
378    /// assert_eq!(pool.status().max_size, 10); // the default
379    /// ```
380    pub fn new(manager: M) -> Result<Self, Error<M::Error>> {
381        Builder::new(manager).build()
382    }
383
384    /// Borrow a resource, waiting up to the configured
385    /// [`create_timeout`](PoolConfig::create_timeout) if the pool is saturated.
386    ///
387    /// Reuses an idle resource when one is available (after validation), grows the
388    /// pool toward `max_size` when it is not, and otherwise blocks until a
389    /// resource is returned or the timeout elapses.
390    ///
391    /// # Errors
392    ///
393    /// - [`Error::Backend`] if the manager fails to create a resource.
394    /// - [`Error::Timeout`] if the pool stays saturated past `create_timeout`.
395    /// - [`Error::Closed`] if the pool has been closed.
396    ///
397    /// # Examples
398    ///
399    /// ```
400    /// use pool_mod::{Manager, Pool};
401    /// use std::convert::Infallible;
402    /// # struct M;
403    /// # impl Manager for M {
404    /// #   type Resource = u32; type Error = Infallible;
405    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(7) }
406    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
407    /// # }
408    /// let pool = Pool::builder(M).max_size(2).build().expect("valid");
409    /// let resource = pool.get().expect("available");
410    /// assert_eq!(*resource, 7);
411    /// ```
412    pub fn get(&self) -> Result<Pooled<M>, Error<M::Error>> {
413        let deadline = self
414            .0
415            .config
416            .create_timeout
417            .map(|timeout| Instant::now() + timeout);
418        self.acquire(deadline)
419    }
420
421    /// Borrow a resource, waiting at most `timeout` regardless of the configured
422    /// [`create_timeout`](PoolConfig::create_timeout).
423    ///
424    /// A `timeout` of [`Duration::ZERO`] makes this a non-blocking try: it returns
425    /// [`Error::Timeout`] at once if no resource can be handed out immediately.
426    ///
427    /// # Errors
428    ///
429    /// - [`Error::Backend`] if the manager fails to create a resource.
430    /// - [`Error::Timeout`] if no resource becomes available within `timeout`.
431    /// - [`Error::Closed`] if the pool has been closed.
432    ///
433    /// # Examples
434    ///
435    /// ```
436    /// use std::time::Duration;
437    /// use pool_mod::{Error, Manager, Pool};
438    /// use std::convert::Infallible;
439    /// # struct M;
440    /// # impl Manager for M {
441    /// #   type Resource = u32; type Error = Infallible;
442    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
443    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
444    /// # }
445    /// let pool = Pool::builder(M).max_size(1).build().expect("valid");
446    /// let held = pool.get().expect("first checkout");
447    /// // The single slot is taken, so an immediate retry times out.
448    /// assert!(matches!(pool.get_timeout(Duration::ZERO), Err(Error::Timeout)));
449    /// ```
450    pub fn get_timeout(&self, timeout: Duration) -> Result<Pooled<M>, Error<M::Error>> {
451        self.acquire(Some(Instant::now() + timeout))
452    }
453
454    /// Borrow a resource without ever blocking.
455    ///
456    /// Returns a resource if one can be handed out immediately — an idle resource
457    /// is ready, or the pool has room to create one — and otherwise returns
458    /// [`Error::Timeout`] at once. Equivalent to
459    /// [`get_timeout(Duration::ZERO)`](Pool::get_timeout).
460    ///
461    /// # Errors
462    ///
463    /// - [`Error::Backend`] if the manager fails to create a resource.
464    /// - [`Error::Timeout`] if no resource is immediately available.
465    /// - [`Error::Closed`] if the pool has been closed.
466    ///
467    /// # Examples
468    ///
469    /// ```
470    /// use pool_mod::{Error, Manager, Pool};
471    /// use std::convert::Infallible;
472    /// # struct M;
473    /// # impl Manager for M {
474    /// #   type Resource = u32; type Error = Infallible;
475    /// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
476    /// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
477    /// # }
478    /// let pool = Pool::builder(M).max_size(1).build().expect("valid");
479    /// let first = pool.try_get().expect("room to create one");
480    /// // The only slot is taken, so the next try fails immediately.
481    /// assert!(matches!(pool.try_get(), Err(Error::Timeout)));
482    /// ```
483    pub fn try_get(&self) -> Result<Pooled<M>, Error<M::Error>> {
484        self.acquire(Some(Instant::now()))
485    }
486
487    fn acquire(&self, deadline: Option<Instant>) -> Result<Pooled<M>, Error<M::Error>> {
488        let (resource, created_at) = self.0.acquire(deadline)?;
489        Ok(Pooled::new(Arc::clone(&self.0), resource, created_at))
490    }
491
492    /// Take a snapshot of the pool's current occupancy.
493    ///
494    /// # Examples
495    ///
496    /// ```
497    /// use pool_mod::{Manager, Pool};
498    /// use std::convert::Infallible;
499    /// # struct M;
500    /// # impl Manager for M {
501    /// #   type Resource = (); type Error = Infallible;
502    /// #   fn create(&self) -> Result<(), Infallible> { Ok(()) }
503    /// #   fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
504    /// # }
505    /// let pool = Pool::builder(M).max_size(4).min_idle(1).build().expect("valid");
506    /// let status = pool.status();
507    /// assert_eq!(status.idle, 1);
508    /// assert_eq!(status.max_size, 4);
509    /// ```
510    pub fn status(&self) -> Status {
511        let state = lock(&self.0.state);
512        let idle = state.idle.len();
513        let size = state.total;
514        Status {
515            size,
516            idle,
517            in_use: size.saturating_sub(idle),
518            max_size: self.0.config.max_size,
519        }
520    }
521
522    /// Close the pool: discard every idle resource and reject all future
523    /// checkouts with [`Error::Closed`].
524    ///
525    /// Resources currently checked out are unaffected and are simply dropped
526    /// (not returned to the idle set) when their guards fall. Closing is
527    /// idempotent. Idle resources are dropped outside the pool's lock, so a slow
528    /// resource destructor does not block other threads.
529    ///
530    /// # Examples
531    ///
532    /// ```
533    /// use pool_mod::{Error, Manager, Pool};
534    /// use std::convert::Infallible;
535    /// # struct M;
536    /// # impl Manager for M {
537    /// #   type Resource = (); type Error = Infallible;
538    /// #   fn create(&self) -> Result<(), Infallible> { Ok(()) }
539    /// #   fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
540    /// # }
541    /// let pool = Pool::builder(M).max_size(2).min_idle(2).build().expect("valid");
542    /// pool.close();
543    /// assert!(pool.is_closed());
544    /// assert!(matches!(pool.get(), Err(Error::Closed)));
545    /// ```
546    pub fn close(&self) {
547        let mut state = lock(&self.0.state);
548        let drained = std::mem::take(&mut state.idle);
549        state.total = state.total.saturating_sub(drained.len());
550        state.closed = true;
551        drop(state);
552        self.0.available.notify_all();
553        self.0.shutdown.signal(); // stop the reaper; a closed pool needs no upkeep
554        drop(drained); // resource destructors run here, outside the lock
555    }
556
557    /// Report whether the pool has been [closed](Pool::close).
558    #[must_use]
559    pub fn is_closed(&self) -> bool {
560        lock(&self.0.state).closed
561    }
562}
563
564/// A fluent builder for a [`Pool`].
565///
566/// Created by [`Pool::builder`]. Each setter consumes and returns the builder, so
567/// calls chain; [`build`](Builder::build) validates the configuration and
568/// pre-creates the `min_idle` resources.
569///
570/// # Examples
571///
572/// ```
573/// use std::time::Duration;
574/// use pool_mod::{Manager, Pool};
575/// use std::convert::Infallible;
576/// # struct M;
577/// # impl Manager for M {
578/// #   type Resource = u32; type Error = Infallible;
579/// #   fn create(&self) -> Result<u32, Infallible> { Ok(0) }
580/// #   fn recycle(&self, _r: &mut u32) -> Result<(), Infallible> { Ok(()) }
581/// # }
582/// let pool = Pool::builder(M)
583///     .max_size(32)
584///     .min_idle(4)
585///     .idle_timeout(Some(Duration::from_secs(600)))
586///     .max_lifetime(Some(Duration::from_secs(3600)))
587///     .build()
588///     .expect("configuration is valid");
589/// assert_eq!(pool.status().idle, 4);
590/// ```
591#[must_use = "a Builder does nothing until `.build()` is called"]
592pub struct Builder<M: Manager> {
593    manager: M,
594    config: PoolConfig,
595}
596
597impl<M: Manager> Builder<M> {
598    /// Create a builder for `manager` seeded with the default configuration.
599    pub fn new(manager: M) -> Self {
600        Builder {
601            manager,
602            config: PoolConfig::default(),
603        }
604    }
605
606    /// Set the maximum number of resources the pool may own at once.
607    pub fn max_size(mut self, max_size: usize) -> Self {
608        self.config.max_size = max_size;
609        self
610    }
611
612    /// Set how many resources to create up front and keep ready.
613    pub fn min_idle(mut self, min_idle: usize) -> Self {
614        self.config.min_idle = min_idle;
615        self
616    }
617
618    /// Set how long [`Pool::get`] waits when the pool is saturated. `None` waits
619    /// indefinitely.
620    pub fn create_timeout(mut self, timeout: Option<Duration>) -> Self {
621        self.config.create_timeout = timeout;
622        self
623    }
624
625    /// Set the idle-expiry window. `None` disables idle expiry.
626    pub fn idle_timeout(mut self, timeout: Option<Duration>) -> Self {
627        self.config.idle_timeout = timeout;
628        self
629    }
630
631    /// Set the maximum resource lifetime. `None` disables lifetime expiry.
632    pub fn max_lifetime(mut self, lifetime: Option<Duration>) -> Self {
633        self.config.max_lifetime = lifetime;
634        self
635    }
636
637    /// Set how often a background thread prunes expired idle resources. `None`
638    /// (the default) runs no background thread, applying expiry lazily on borrow.
639    ///
640    /// Has no effect unless `idle_timeout` or `max_lifetime` is also set.
641    pub fn reap_interval(mut self, interval: Option<Duration>) -> Self {
642        self.config.reap_interval = interval;
643        self
644    }
645
646    /// Replace the entire configuration with `config`.
647    ///
648    /// Useful when the configuration is loaded from a file rather than assembled
649    /// setter by setter.
650    pub fn config(mut self, config: PoolConfig) -> Self {
651        self.config = config;
652        self
653    }
654
655    /// Validate the configuration, build the pool, and pre-create `min_idle`
656    /// resources.
657    ///
658    /// # Errors
659    ///
660    /// - [`Error::InvalidConfig`] if `max_size` is zero or `min_idle` exceeds
661    ///   `max_size`.
662    /// - [`Error::Backend`] if creating one of the `min_idle` resources fails;
663    ///   any already-created resources are dropped before returning.
664    ///
665    /// # Examples
666    ///
667    /// ```
668    /// use pool_mod::{Error, Manager, Pool};
669    /// use std::convert::Infallible;
670    /// # struct M;
671    /// # impl Manager for M {
672    /// #   type Resource = (); type Error = Infallible;
673    /// #   fn create(&self) -> Result<(), Infallible> { Ok(()) }
674    /// #   fn recycle(&self, _r: &mut ()) -> Result<(), Infallible> { Ok(()) }
675    /// # }
676    /// let invalid = Pool::builder(M).max_size(0).build();
677    /// assert!(matches!(invalid, Err(Error::InvalidConfig(_))));
678    /// ```
679    pub fn build(self) -> Result<Pool<M>, Error<M::Error>> {
680        if self.config.max_size == 0 {
681            return Err(Error::InvalidConfig("max_size must be at least 1"));
682        }
683        if self.config.min_idle > self.config.max_size {
684            return Err(Error::InvalidConfig("min_idle must not exceed max_size"));
685        }
686
687        let pool = Pool(Arc::new(PoolInner {
688            manager: self.manager,
689            config: self.config,
690            state: Mutex::new(State {
691                idle: VecDeque::with_capacity(self.config.max_size),
692                total: 0,
693                closed: false,
694            }),
695            available: Condvar::new(),
696            shutdown: Arc::new(Shutdown::new()),
697        }));
698
699        for _ in 0..pool.0.config.min_idle {
700            match pool.0.manager.create() {
701                Ok(resource) => {
702                    let now = Instant::now();
703                    let mut state = lock(&pool.0.state);
704                    state.idle.push_back(Idle {
705                        resource,
706                        created_at: now,
707                        last_used: now,
708                    });
709                    state.total += 1;
710                }
711                Err(source) => return Err(Error::Backend(source)),
712            }
713        }
714
715        pool.spawn_reaper();
716
717        Ok(pool)
718    }
719}
720
721impl<M: Manager> Pool<M> {
722    /// Start the background reaper if `reap_interval` is configured.
723    ///
724    /// The reaper holds only a [`Weak`] handle, so it never keeps the pool alive;
725    /// it stops when every handle is dropped or the pool is closed. If the OS
726    /// refuses a new thread, the pool keeps working with expiry applied lazily on
727    /// checkout — the reaper is an optimization, not a correctness requirement.
728    fn spawn_reaper(&self) {
729        let Some(interval) = self.0.config.reap_interval else {
730            return;
731        };
732        let pool = Arc::downgrade(&self.0);
733        let shutdown = Arc::clone(&self.0.shutdown);
734        match thread::Builder::new()
735            .name("pool-mod-reaper".to_owned())
736            .spawn(move || reaper_loop(pool, shutdown, interval))
737        {
738            // Detach the handle; the reaper self-terminates via the shutdown
739            // signal and its `Weak` reference.
740            Ok(handle) => drop(handle),
741            // Spawn failed: fall back to lazy, checkout-time expiry.
742            Err(_) => self.0.shutdown.signal(),
743        }
744    }
745}
746
747#[cfg(test)]
748// Justification: in test code a failed setup or checkout has no meaningful
749// recovery — unwinding with a clear panic is the correct outcome. REPS permits
750// `unwrap`/`expect` in test modules for exactly this reason.
751#[allow(clippy::unwrap_used, clippy::expect_used)]
752mod tests {
753    use super::*;
754    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
755
756    #[derive(Debug, PartialEq, Eq)]
757    struct TestError(&'static str);
758
759    impl std::fmt::Display for TestError {
760        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
761            f.write_str(self.0)
762        }
763    }
764
765    impl std::error::Error for TestError {}
766
767    /// A manager whose behaviour is steerable through atomics so individual
768    /// lifecycle paths can be exercised deterministically.
769    struct Steerable {
770        created: AtomicUsize,
771        recycled: AtomicUsize,
772        validated: AtomicUsize,
773        create_fails: AtomicBool,
774        recycle_fails: AtomicBool,
775        valid: AtomicBool,
776    }
777
778    impl Steerable {
779        fn new() -> Self {
780            Steerable {
781                created: AtomicUsize::new(0),
782                recycled: AtomicUsize::new(0),
783                validated: AtomicUsize::new(0),
784                create_fails: AtomicBool::new(false),
785                recycle_fails: AtomicBool::new(false),
786                valid: AtomicBool::new(true),
787            }
788        }
789    }
790
791    impl Manager for Steerable {
792        type Resource = usize;
793        type Error = TestError;
794
795        fn create(&self) -> Result<usize, TestError> {
796            if self.create_fails.load(Ordering::SeqCst) {
797                return Err(TestError("create failed"));
798            }
799            Ok(self.created.fetch_add(1, Ordering::SeqCst))
800        }
801
802        fn recycle(&self, _resource: &mut usize) -> Result<(), TestError> {
803            let _ = self.recycled.fetch_add(1, Ordering::SeqCst);
804            if self.recycle_fails.load(Ordering::SeqCst) {
805                return Err(TestError("recycle failed"));
806            }
807            Ok(())
808        }
809
810        fn validate(&self, _resource: &mut usize) -> bool {
811            let _ = self.validated.fetch_add(1, Ordering::SeqCst);
812            self.valid.load(Ordering::SeqCst)
813        }
814    }
815
816    fn pool(builder: impl FnOnce(Builder<Steerable>) -> Builder<Steerable>) -> Pool<Steerable> {
817        builder(Pool::builder(Steerable::new())).build().unwrap()
818    }
819
/// Building with `min_idle(2)` leaves two created, idle resources behind.
#[test]
fn test_build_min_idle_precreates_resources() {
    let warm = pool(|cfg| cfg.max_size(4).min_idle(2));

    // The manager was asked for exactly two resources during the build.
    let created_so_far = warm.0.manager.created.load(Ordering::SeqCst);
    assert_eq!(created_so_far, 2);

    // Both are counted and idle; none is checked out.
    let snapshot = warm.status();
    assert_eq!(snapshot.idle, 2);
    assert_eq!(snapshot.size, 2);
    assert_eq!(snapshot.in_use, 0);
}
829
/// A returned resource is handed out again instead of a new one being created.
#[test]
fn test_get_then_drop_reuses_same_resource() {
    let shared = pool(|cfg| cfg.max_size(4));

    // First checkout creates id 0; dropping the guard gives it back.
    let initial = shared.get().unwrap();
    assert_eq!(*initial, 0);
    drop(initial);

    // The second checkout yields id 0 again ...
    let again = shared.get().unwrap();
    assert_eq!(*again, 0);

    // ... with one creation and one recycle recorded by the manager.
    let manager = &shared.0.manager;
    assert_eq!(manager.created.load(Ordering::SeqCst), 1);
    assert_eq!(manager.recycled.load(Ordering::SeqCst), 1);
}
842
/// `status()` moves resources between `in_use` and `idle` as guards come and go.
#[test]
fn test_in_use_tracks_outstanding_guards() {
    let shared = pool(|cfg| cfg.max_size(2));

    // Two live guards: everything is in use, nothing idle.
    let first = shared.get().unwrap();
    let second = shared.get().unwrap();
    assert_eq!(shared.status().in_use, 2);
    assert_eq!(shared.status().idle, 0);

    // Returning both reverses the counts.
    drop(first);
    drop(second);
    assert_eq!(shared.status().in_use, 0);
    assert_eq!(shared.status().idle, 2);
}
855
/// A zero-length wait on a full pool reports `Error::Timeout`.
#[test]
fn test_saturated_pool_times_out() {
    let single = pool(|cfg| cfg.max_size(1));
    // Keep the only slot occupied for the rest of the test.
    let _occupant = single.get().unwrap();
    assert!(matches!(
        single.get_timeout(Duration::ZERO),
        Err(Error::Timeout)
    ));
}
863
/// An idle resource that fails validation is not handed out; a new one is made.
#[test]
fn test_invalid_resource_is_discarded_and_replaced() {
    let shared = pool(|cfg| cfg.max_size(4).min_idle(1));
    let manager = &shared.0.manager;
    assert_eq!(manager.created.load(Ordering::SeqCst), 1);

    // From here on, every validation reports the resource as bad.
    manager.valid.store(false, Ordering::SeqCst);

    // Checkout yields id 1 (a second creation) rather than the pre-created
    // id 0, and the validator was consulted at least once along the way.
    let replacement = shared.get().unwrap();
    assert_eq!(*replacement, 1);
    assert_eq!(manager.created.load(Ordering::SeqCst), 2);
    assert!(manager.validated.load(Ordering::SeqCst) >= 1);
}
877
/// With a zero `max_lifetime`, checkout skips the pre-created resource.
#[test]
fn test_max_lifetime_forces_replacement() {
    let shared = pool(|cfg| {
        cfg.max_size(4)
            .min_idle(1)
            .max_lifetime(Some(Duration::ZERO))
    });

    // The pre-created id 0 is not returned; a second resource (id 1) is.
    let replacement = shared.get().unwrap();
    assert_eq!(*replacement, 1);
    assert_eq!(shared.0.manager.created.load(Ordering::SeqCst), 2);
}
886
/// With a zero `idle_timeout`, checkout skips the pre-created resource.
#[test]
fn test_idle_timeout_forces_replacement() {
    let shared = pool(|cfg| {
        cfg.max_size(4)
            .min_idle(1)
            .idle_timeout(Some(Duration::ZERO))
    });

    // The pre-created id 0 is not returned; a second resource (id 1) is.
    let replacement = shared.get().unwrap();
    assert_eq!(*replacement, 1);
    assert_eq!(shared.0.manager.created.load(Ordering::SeqCst), 2);
}
894
/// A resource whose recycle fails on return is discarded rather than pooled.
#[test]
fn test_recycle_failure_drops_resource() {
    let shared = pool(|cfg| cfg.max_size(2));
    shared.0.manager.recycle_fails.store(true, Ordering::SeqCst);

    // While the guard is alive the pool owns exactly one resource.
    let guard = shared.get().unwrap();
    assert_eq!(shared.status().size, 1);
    drop(guard);

    // After the failed recycle nothing is left: not owned, not idle.
    assert_eq!(shared.status().size, 0);
    assert_eq!(shared.status().idle, 0);
}
907
/// A failing `create` reaches the caller as `Error::Backend` and leaves the
/// pool's size at zero.
#[test]
fn test_create_failure_surfaces_and_frees_slot() {
    let shared = pool(|cfg| cfg.max_size(2));
    shared.0.manager.create_fails.store(true, Ordering::SeqCst);

    // The manager's own error is passed through unchanged.
    assert!(matches!(
        shared.get(),
        Err(Error::Backend(TestError("create failed")))
    ));

    // No slot stays accounted for after the failed attempt.
    assert_eq!(shared.status().size, 0);
}
920
/// After `close`, checkouts fail with `Error::Closed` and no idle resource remains.
#[test]
fn test_closed_pool_rejects_checkout() {
    let shared = pool(|cfg| cfg.max_size(2).min_idle(1));
    shared.close();
    assert!(shared.is_closed());

    let refused = shared.get();
    assert!(matches!(refused, Err(Error::Closed)));

    // The resource pre-created by `min_idle(1)` is no longer reported as idle.
    assert_eq!(shared.status().idle, 0);
}
929
/// Calling `close` a second time is harmless and the pool stays closed.
#[test]
fn test_close_is_idempotent() {
    let shared = pool(|cfg| cfg.max_size(2).min_idle(2));
    for _ in 0..2 {
        shared.close();
    }
    assert!(shared.is_closed());
}
937
/// A `max_size` of zero is refused at build time with `Error::InvalidConfig`.
#[test]
fn test_build_rejects_zero_max_size() {
    assert!(matches!(
        Pool::builder(Steerable::new()).max_size(0).build(),
        Err(Error::InvalidConfig(_))
    ));
}
943
/// Asking for more idle resources than `max_size` allows is refused at build
/// time with `Error::InvalidConfig`.
#[test]
fn test_build_rejects_min_idle_above_max_size() {
    assert!(matches!(
        Pool::builder(Steerable::new())
            .max_size(2)
            .min_idle(3)
            .build(),
        Err(Error::InvalidConfig(_))
    ));
}
952
/// `try_get` on a full pool returns `Error::Timeout`.
#[test]
fn test_try_get_does_not_block_when_saturated() {
    let single = pool(|cfg| cfg.max_size(1));
    // The first attempt takes the only slot and keeps it.
    let _occupant = single.try_get().unwrap();

    let second_attempt = single.try_get();
    assert!(matches!(second_attempt, Err(Error::Timeout)));
}
959
/// `reap` removes idle resources once their idle timeout has elapsed.
#[test]
fn test_reap_prunes_time_expired_idle() {
    // With a zero timeout every idle resource counts as expired straight away,
    // which keeps the test deterministic: no reaper thread, no sleeping.
    let expire_immediately = Some(Duration::ZERO);
    let shared = pool(|cfg| cfg.max_size(4).min_idle(2).idle_timeout(expire_immediately));
    assert_eq!(shared.status().idle, 2);

    shared.0.reap();

    assert_eq!(shared.status().idle, 0);
    assert_eq!(shared.status().size, 0);
}
970
/// Without `idle_timeout` or `max_lifetime` configured, `reap` leaves the idle
/// set untouched.
#[test]
fn test_reap_is_noop_without_expiry_policy() {
    let shared = pool(|cfg| cfg.max_size(4).min_idle(2));
    shared.0.reap();
    // Both pre-created resources are still idle.
    assert_eq!(shared.status().idle, 2);
}
977
/// A cloned handle refers to the same pool, not to a copy with its own capacity.
#[test]
fn test_clone_shares_one_pool() {
    let original = pool(|cfg| cfg.max_size(1));
    let twin = original.clone();

    // Exhaust the pool through the first handle ...
    let _occupant = original.get().unwrap();

    // ... and the second handle finds it just as full.
    let via_twin = twin.get_timeout(Duration::ZERO);
    assert!(matches!(via_twin, Err(Error::Timeout)));
}
989}