Skip to main content

quex/
pool.rs

1#![cfg_attr(
2    not(any(feature = "mariadb", feature = "postgres", feature = "sqlite")),
3    allow(unreachable_code, unused_variables)
4)]
5
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant};
8use std::{future::Future, pin::Pin};
9
10use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
11use tokio::time::timeout;
12
13use crate::connection::{
14    ManagedConnection, commit_inner, connect_managed, prepare_inner, query_inner, rollback_inner,
15    start_transaction_inner,
16};
17use crate::{
18    BoundStatement, ConnectOptions, Driver, Encode, Error, ExecResult, Executor,
19    MysqlConnectOptions, ParamSource, PostgresConnectOptions, PreparedStatement, Result, Rows,
20    SqliteConnectOptions, Statement,
21};
22
/// A simple handle for one database connection.
///
/// Use `Connection` when one connection is enough. Use [`Pool`] when many
/// tasks need to acquire their own connection.
///
/// `Connection` owns one connection for its whole lifetime. It does not do
/// pooling or background connection management.
pub struct Connection {
    /// The managed driver connection that every method on this handle
    /// forwards to.
    pub(crate) conn: ManagedConnection,
}
33
/// Converts a value into generic connection options.
///
/// Strings are parsed as database URLs. Typed option builders are copied into
/// the generic option type.
///
/// This trait is mostly a convenience for constructors like
/// [`Connection::connect`]
/// and [`Pool::connect`].
pub trait IntoConnectOptions {
    /// Converts into [`ConnectOptions`].
    ///
    /// # Errors
    ///
    /// Returns an error when the value cannot be turned into options. The
    /// string implementation fails when [`ConnectOptions::from_url`] rejects
    /// the URL; the typed-option implementations in this file always succeed.
    fn into_connect_options(self) -> Result<ConnectOptions>;
}
46
47impl<T> IntoConnectOptions for T
48where
49    T: AsRef<str>,
50{
51    fn into_connect_options(self) -> Result<ConnectOptions> {
52        ConnectOptions::from_url(self.as_ref())
53    }
54}
55
56impl IntoConnectOptions for SqliteConnectOptions {
57    fn into_connect_options(self) -> Result<ConnectOptions> {
58        Ok(ConnectOptions::from(self))
59    }
60}
61
62impl IntoConnectOptions for PostgresConnectOptions {
63    fn into_connect_options(self) -> Result<ConnectOptions> {
64        Ok(ConnectOptions::from(self))
65    }
66}
67
68impl IntoConnectOptions for MysqlConnectOptions {
69    fn into_connect_options(self) -> Result<ConnectOptions> {
70        Ok(ConnectOptions::from(self))
71    }
72}
73
/// Identity conversion so already-generic options can be passed directly.
impl IntoConnectOptions for ConnectOptions {
    /// Returns `self` unchanged; never fails.
    fn into_connect_options(self) -> Result<ConnectOptions> {
        Ok(self)
    }
}
79
80impl Connection {
81    /// Connects from a database URL or typed connection options.
82    pub async fn connect(options: impl IntoConnectOptions) -> Result<Self> {
83        let options = options.into_connect_options()?;
84        Ok(Self {
85            conn: connect_managed(options).await?,
86        })
87    }
88
89    /// Returns the selected driver.
90    pub fn driver(&self) -> Driver {
91        self.conn.driver
92    }
93
94    /// Runs SQL and returns rows.
95    ///
96    /// For statements with parameters, prefer [`crate::query`].
97    pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
98        query_inner(&mut self.conn.inner, sql).await
99    }
100
101    /// Prepares a reusable statement.
102    pub async fn prepare(&mut self, sql: &str) -> Result<Statement<'_>> {
103        prepare_inner(&mut self.conn.inner, sql).await
104    }
105
106    /// Starts a transaction.
107    ///
108    /// Dropping the transaction without committing or rolling back follows the
109    /// behavior of the selected driver.
110    pub async fn begin(&mut self) -> Result<Transaction<'_>> {
111        Transaction::begin(&mut self.conn).await
112    }
113
114    /// Starts listening on one Postgres notification channel.
115    #[cfg(feature = "postgres")]
116    pub async fn listen(&mut self, channel: &str) -> Result<()> {
117        match &mut self.conn.inner {
118            crate::connection::ConnectionInner::Postgres(conn) => conn.listen(channel).await?,
119            _ => {
120                return Err(Error::Unsupported(
121                    "postgres notifications require a postgres connection".into(),
122                ));
123            }
124        }
125        Ok(())
126    }
127
128    /// Stops listening on one Postgres notification channel.
129    #[cfg(feature = "postgres")]
130    pub async fn unlisten(&mut self, channel: &str) -> Result<()> {
131        match &mut self.conn.inner {
132            crate::connection::ConnectionInner::Postgres(conn) => {
133                conn.unlisten(channel).await.map_err(Error::from)?
134            }
135            _ => {
136                return Err(Error::Unsupported(
137                    "postgres notifications require a postgres connection".into(),
138                ));
139            }
140        }
141        Ok(())
142    }
143
144    /// Stops listening on all Postgres notification channels.
145    #[cfg(feature = "postgres")]
146    pub async fn unlisten_all(&mut self) -> Result<()> {
147        match &mut self.conn.inner {
148            crate::connection::ConnectionInner::Postgres(conn) => {
149                conn.unlisten_all().await.map_err(Error::from)?
150            }
151            _ => {
152                return Err(Error::Unsupported(
153                    "postgres notifications require a postgres connection".into(),
154                ));
155            }
156        }
157        Ok(())
158    }
159
160    /// Sends one Postgres notification.
161    #[cfg(feature = "postgres")]
162    pub async fn notify(&mut self, channel: &str, payload: Option<&str>) -> Result<()> {
163        match &mut self.conn.inner {
164            crate::connection::ConnectionInner::Postgres(conn) => {
165                conn.notify(channel, payload).await.map_err(Error::from)?
166            }
167            _ => {
168                return Err(Error::Unsupported(
169                    "postgres notifications require a postgres connection".into(),
170                ));
171            }
172        }
173        Ok(())
174    }
175
176    /// Returns one pending Postgres notification if one is already buffered.
177    #[cfg(feature = "postgres")]
178    pub async fn try_recv_notification(&mut self) -> Result<Option<crate::PostgresNotification>> {
179        match &mut self.conn.inner {
180            crate::connection::ConnectionInner::Postgres(conn) => {
181                conn.try_recv_notification().await.map_err(Error::from)
182            }
183            _ => Err(Error::Unsupported(
184                "postgres notifications require a postgres connection".into(),
185            )),
186        }
187    }
188
189    /// Waits for the next Postgres notification on this connection.
190    #[cfg(feature = "postgres")]
191    pub async fn wait_for_notification(&mut self) -> Result<crate::PostgresNotification> {
192        match &mut self.conn.inner {
193            crate::connection::ConnectionInner::Postgres(conn) => {
194                conn.wait_for_notification().await.map_err(Error::from)
195            }
196            _ => Err(Error::Unsupported(
197                "postgres notifications require a postgres connection".into(),
198            )),
199        }
200    }
201}
202
/// Lets a `&mut Connection` be passed wherever an [`Executor`] is expected.
///
/// Every method reborrows the connection and forwards to the inherent
/// [`Connection`] method of the same name.
impl Executor for &mut Connection {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = Statement<'a>
    where
        Self: 'a;

    /// Returns the driver of the borrowed connection.
    fn driver(&self) -> Driver {
        Connection::driver(self)
    }

    /// Runs `sql` on the borrowed connection and returns its rows.
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        // `self` is `&mut &mut Connection`; `*self` reborrows the inner handle.
        Connection::query(*self, sql).await
    }

    /// Prepares `sql`, executes it once with `params`, and returns the rows.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = Connection::prepare(*self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // NOTE(review): `into_lifetime` presumably re-ties the rows to the
        // executor borrow instead of the local `stmt` — confirm against `Rows`.
        Ok(rows.into_lifetime())
    }

    /// Prepares `sql` on the borrowed connection.
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        Connection::prepare(*self, sql).await
    }
}
234
/// Lets an owned [`Connection`] act as an [`Executor`].
///
/// Every method forwards to the inherent [`Connection`] method of the same
/// name.
impl Executor for Connection {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = Statement<'a>
    where
        Self: 'a;

    /// Returns the selected driver.
    fn driver(&self) -> Driver {
        // Method resolution prefers the inherent `Connection::driver`, so this
        // call does not recurse into the trait method.
        self.driver()
    }

    /// Runs `sql` and returns its rows.
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        Connection::query(self, sql).await
    }

    /// Prepares `sql`, executes it once with `params`, and returns the rows.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = Connection::prepare(self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // NOTE(review): `into_lifetime` presumably re-ties the rows to the
        // executor borrow instead of the local `stmt` — confirm against `Rows`.
        Ok(rows.into_lifetime())
    }

    /// Prepares `sql` as a reusable statement.
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        Connection::prepare(self, sql).await
    }
}
266
/// A transaction on a [`Connection`] connection.
///
/// Dropping a transaction before commit or rollback follows the behavior of the
/// selected driver.
///
/// While the transaction is alive, it keeps the `Connection` borrowed.
pub struct Transaction<'a> {
    /// The driver-level transaction. `Some` while the transaction is open;
    /// taken out by `commit`, `rollback`, and `Drop`.
    inner: Option<TransactionInner<'a>>,
}
276
/// Driver-specific transaction handle behind [`Transaction`].
///
/// Each backend variant only exists when its cargo feature is enabled.
enum TransactionInner<'a> {
    /// Transaction on a MySQL/MariaDB connection.
    #[cfg(feature = "mariadb")]
    Mysql(quex_driver::mysql::Transaction<'a>),
    /// Transaction on a Postgres connection.
    #[cfg(feature = "postgres")]
    Postgres(quex_driver::postgres::Transaction<'a>),
    /// Transaction on a SQLite connection.
    #[cfg(feature = "sqlite")]
    Sqlite(quex_driver::sqlite::Transaction<'a>),
    /// Placeholder that keeps `'a` in use when no backend feature is enabled.
    /// `Transaction::begin` never constructs it, and every match in this file
    /// treats it as unreachable.
    _Marker(std::marker::PhantomData<&'a ()>),
}
286
287impl<'a> Transaction<'a> {
288    async fn begin(conn: &'a mut ManagedConnection) -> Result<Self> {
289        let inner = match &mut conn.inner {
290            #[cfg(feature = "mariadb")]
291            crate::connection::ConnectionInner::Mysql(conn) => {
292                TransactionInner::Mysql(conn.begin().await?)
293            }
294            #[cfg(feature = "postgres")]
295            crate::connection::ConnectionInner::Postgres(conn) => {
296                TransactionInner::Postgres(conn.begin().await?)
297            }
298            #[cfg(feature = "sqlite")]
299            crate::connection::ConnectionInner::Sqlite(conn) => {
300                TransactionInner::Sqlite(conn.begin().await?)
301            }
302            crate::connection::ConnectionInner::_Disabled => {
303                unreachable!("disabled backend placeholder")
304            }
305        };
306        Ok(Self { inner: Some(inner) })
307    }
308
309    fn inner_mut(&mut self) -> &mut TransactionInner<'a> {
310        self.inner.as_mut().expect("transaction missing")
311    }
312}
313
/// A transaction started from a [`Pool`].
///
/// This holds one checked-out connection for the life of the transaction.
/// Dropping it before commit or rollback marks that connection as broken so it
/// will not be reused.
///
/// Use this when you want a transaction directly from the pool without first
/// calling [`Pool::acquire`].
pub struct PoolTransaction {
    /// The checked-out connection the transaction runs on.
    conn: PooledConnection,
    /// Set by `commit` and `rollback`. When it is still `false` at drop time,
    /// `Drop` marks the connection as broken.
    finished: bool,
}
326
327impl Transaction<'_> {
328    /// Returns the selected driver.
329    pub fn driver(&self) -> Driver {
330        match self.inner.as_ref().expect("transaction missing") {
331            #[cfg(feature = "mariadb")]
332            TransactionInner::Mysql(_) => Driver::Mysql,
333            #[cfg(feature = "postgres")]
334            TransactionInner::Postgres(_) => Driver::Pgsql,
335            #[cfg(feature = "sqlite")]
336            TransactionInner::Sqlite(_) => Driver::Sqlite,
337            TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
338        }
339    }
340
341    /// Runs SQL inside the transaction.
342    pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
343        match self.inner_mut() {
344            #[cfg(feature = "mariadb")]
345            TransactionInner::Mysql(tx) => Ok(Rows::mysql(tx.connection().query(sql).await?)),
346            #[cfg(feature = "postgres")]
347            TransactionInner::Postgres(tx) => Ok(Rows::postgres(tx.connection().query(sql).await?)),
348            #[cfg(feature = "sqlite")]
349            TransactionInner::Sqlite(tx) => Ok(Rows::sqlite(tx.connection().query(sql).await?)),
350            TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
351        }
352    }
353
354    /// Prepares SQL inside the transaction.
355    pub async fn prepare(&mut self, sql: &str) -> Result<Statement<'_>> {
356        match self.inner_mut() {
357            #[cfg(feature = "mariadb")]
358            TransactionInner::Mysql(tx) => {
359                Ok(Statement::Mysql(tx.connection().prepare_cached(sql).await?))
360            }
361            #[cfg(feature = "postgres")]
362            TransactionInner::Postgres(tx) => {
363                let sql = crate::connection::rewrite_postgres_placeholders(sql);
364                Ok(Statement::Postgres(
365                    tx.connection().prepare_cached(&sql).await?,
366                ))
367            }
368            #[cfg(feature = "sqlite")]
369            TransactionInner::Sqlite(tx) => Ok(Statement::Sqlite(
370                tx.connection().prepare_cached(sql).await?,
371            )),
372            TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
373        }
374    }
375
376    /// Commits the transaction.
377    pub async fn commit(mut self) -> Result<()> {
378        match self.inner.take().expect("transaction missing") {
379            #[cfg(feature = "mariadb")]
380            TransactionInner::Mysql(tx) => tx.commit().await.map_err(Into::into),
381            #[cfg(feature = "postgres")]
382            TransactionInner::Postgres(tx) => tx.commit().await.map_err(Into::into),
383            #[cfg(feature = "sqlite")]
384            TransactionInner::Sqlite(tx) => tx.commit().await.map_err(Into::into),
385            TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
386        }
387    }
388
389    /// Rolls the transaction back.
390    pub async fn rollback(mut self) -> Result<()> {
391        match self.inner.take().expect("transaction missing") {
392            #[cfg(feature = "mariadb")]
393            TransactionInner::Mysql(tx) => tx.rollback().await.map_err(Into::into),
394            #[cfg(feature = "postgres")]
395            TransactionInner::Postgres(tx) => tx.rollback().await.map_err(Into::into),
396            #[cfg(feature = "sqlite")]
397            TransactionInner::Sqlite(tx) => tx.rollback().await.map_err(Into::into),
398            TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
399        }
400    }
401}
402
403impl PoolTransaction {
404    /// Returns the selected driver.
405    pub fn driver(&self) -> Driver {
406        self.conn.driver()
407    }
408
409    /// Runs SQL inside the transaction.
410    pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
411        self.conn.query(sql).await
412    }
413
414    /// Prepares SQL inside the transaction.
415    pub async fn prepare(&mut self, sql: &str) -> Result<PooledStatement<'_>> {
416        self.conn.prepare(sql).await
417    }
418
419    /// Commits the transaction.
420    pub async fn commit(mut self) -> Result<()> {
421        self.finished = true;
422        match commit_inner(&mut self.conn.conn_mut().inner).await {
423            Ok(()) => Ok(()),
424            Err(err) => {
425                self.conn.mark_broken();
426                Err(err)
427            }
428        }
429    }
430
431    /// Rolls the transaction back.
432    pub async fn rollback(mut self) -> Result<()> {
433        self.finished = true;
434        match rollback_inner(&mut self.conn.conn_mut().inner).await {
435            Ok(()) => Ok(()),
436            Err(err) => {
437                self.conn.mark_broken();
438                Err(err)
439            }
440        }
441    }
442}
443
/// Lets a `&mut Transaction` be passed wherever an [`Executor`] is expected.
///
/// Every method reborrows the transaction and forwards to the inherent
/// [`Transaction`] method of the same name.
impl Executor for &mut Transaction<'_> {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = Statement<'a>
    where
        Self: 'a;

    /// Returns the driver of the borrowed transaction.
    fn driver(&self) -> Driver {
        Transaction::driver(self)
    }

    /// Runs `sql` inside the borrowed transaction.
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        // `self` is `&mut &mut Transaction`; `*self` reborrows the inner handle.
        Transaction::query(*self, sql).await
    }

    /// Prepares `sql`, executes it once with `params`, and returns the rows.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = Transaction::prepare(*self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // NOTE(review): `into_lifetime` presumably re-ties the rows to the
        // executor borrow instead of the local `stmt` — confirm against `Rows`.
        Ok(rows.into_lifetime())
    }

    /// Prepares `sql` inside the borrowed transaction.
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        Transaction::prepare(*self, sql).await
    }
}
475
/// Lets an owned [`Transaction`] act as an [`Executor`].
///
/// Every method forwards to the inherent [`Transaction`] method of the same
/// name.
impl Executor for Transaction<'_> {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = Statement<'a>
    where
        Self: 'a;

    /// Returns the selected driver.
    fn driver(&self) -> Driver {
        // Method resolution prefers the inherent `Transaction::driver`, so
        // this call does not recurse into the trait method.
        self.driver()
    }

    /// Runs `sql` inside the transaction.
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        Transaction::query(self, sql).await
    }

    /// Prepares `sql`, executes it once with `params`, and returns the rows.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = Transaction::prepare(self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // NOTE(review): `into_lifetime` presumably re-ties the rows to the
        // executor borrow instead of the local `stmt` — confirm against `Rows`.
        Ok(rows.into_lifetime())
    }

    /// Prepares `sql` inside the transaction.
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        Transaction::prepare(self, sql).await
    }
}
507
508impl Drop for Transaction<'_> {
509    fn drop(&mut self) {
510        let _ = self.inner.take();
511    }
512}
513
/// Lets a `&mut PoolTransaction` be passed wherever an [`Executor`] is
/// expected.
///
/// Every method reborrows the transaction and forwards to the inherent
/// [`PoolTransaction`] method of the same name.
impl Executor for &mut PoolTransaction {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PooledStatement<'a>
    where
        Self: 'a;

    /// Returns the driver of the borrowed transaction.
    fn driver(&self) -> Driver {
        PoolTransaction::driver(self)
    }

    /// Runs `sql` inside the borrowed transaction.
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        // `self` is `&mut &mut PoolTransaction`; `*self` reborrows the inner handle.
        PoolTransaction::query(*self, sql).await
    }

    /// Prepares `sql`, executes it once with `params`, and returns the rows.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = PoolTransaction::prepare(*self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // NOTE(review): `into_lifetime` presumably re-ties the rows to the
        // executor borrow instead of the local `stmt` — confirm against `Rows`.
        Ok(rows.into_lifetime())
    }

    /// Prepares `sql` inside the borrowed transaction.
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        PoolTransaction::prepare(*self, sql).await
    }
}
545
/// Lets an owned [`PoolTransaction`] act as an [`Executor`].
///
/// Every method forwards to the inherent [`PoolTransaction`] method of the
/// same name.
impl Executor for PoolTransaction {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PooledStatement<'a>
    where
        Self: 'a;

    /// Returns the selected driver.
    fn driver(&self) -> Driver {
        // Method resolution prefers the inherent `PoolTransaction::driver`, so
        // this call does not recurse into the trait method.
        self.driver()
    }

    /// Runs `sql` inside the transaction.
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        PoolTransaction::query(self, sql).await
    }

    /// Prepares `sql`, executes it once with `params`, and returns the rows.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = PoolTransaction::prepare(self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // NOTE(review): `into_lifetime` presumably re-ties the rows to the
        // executor borrow instead of the local `stmt` — confirm against `Rows`.
        Ok(rows.into_lifetime())
    }

    /// Prepares `sql` inside the transaction.
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        PoolTransaction::prepare(self, sql).await
    }
}
577
578impl Drop for PoolTransaction {
579    fn drop(&mut self) {
580        if !self.finished {
581            self.conn.mark_broken();
582        }
583    }
584}
585
/// A cloneable pool of database connections.
///
/// Use `Pool` when many tasks need database access at the same time. Cloning
/// the pool gives you another handle to the same shared pool state.
///
/// `Pool` is lazy by default. Building it does not open a connection or check
/// that the database is reachable unless [`PoolBuilder::min_connections`] asks
/// it to warm some connections up first. After that, [`Pool::acquire`],
/// [`Pool::query`], [`Pool::prepare`], and [`Pool::begin`] open a connection
/// only when the pool has no idle one ready to reuse.
///
/// The pool keeps up to `max_size` open connections. When that many
/// connections are already checked out, [`Pool::acquire`] waits until one is
/// returned. Idle connections stay in the pool and are reused by later
/// callers.
///
/// There is no background opener, health checker, or maintenance task. Idle
/// timeout and max lifetime limits are enforced lazily when a connection is
/// checked out or returned to the pool. Failed operations and unfinished pool
/// transactions mark that checked-out connection as broken so it will not be
/// reused.
///
/// Use [`Pool::with_hooks`] when the pool should run setup on fresh
/// connections or validate a connection before handing it out.
///
/// Use [`Connection`] when one long-lived connection is enough.
#[derive(Clone)]
pub struct Pool {
    /// Pool state shared by every clone of this handle.
    inner: Arc<PoolInner>,
    /// Hooks attached to this handle by [`Pool::with_hooks`].
    ///
    /// They live on the handle rather than in the shared state: clones made
    /// from this handle share them, while handles that existed before
    /// `with_hooks` was called keep their own setting.
    hooks: Option<Arc<Hooks>>,
}
617
/// State shared by all handles of one [`Pool`].
struct PoolInner {
    /// Driver reported by `Pool::driver`.
    driver: Driver,
    /// Value reported by `Pool::max_size`.
    max_size: usize,
    /// Semaphore that gates checkouts; each checked-out connection holds one
    /// owned permit.
    // NOTE(review): presumably created with `max_size` permits — confirm in
    // the builder, which is outside this view.
    permits: Arc<Semaphore>,
    /// Connections waiting to be reused. Checkout pops from the back.
    idle: Mutex<Vec<IdleConnection>>,
    /// Options cloned to open a new connection. When `None`, checkout fails
    /// with `Error::Unsupported` once no idle connection is left.
    options: Option<ConnectOptions>,
    /// Upper bound on the time `Pool::acquire` may spend; `None` waits
    /// without a limit.
    acquire_timeout: Option<Duration>,
    /// Idle connections that have been unused for at least this long are
    /// discarded at checkout.
    idle_timeout: Option<Duration>,
    /// Idle connections that are at least this old are discarded at checkout.
    max_lifetime: Option<Duration>,
}
628
/// One connection parked in the pool's idle list.
struct IdleConnection {
    /// The parked connection.
    conn: ManagedConnection,
    /// When the connection was opened; compared against `max_lifetime`.
    created_at: Instant,
    /// When the connection became idle; compared against `idle_timeout`.
    idle_since: Instant,
    /// Whether the `on_connect` hook has already run on this connection, so
    /// checkout can skip it.
    on_connect_ran: bool,
}
635
/// Boxed, `Send` future returned by pool hooks; resolves to `Result<T>` and
/// may borrow for `'a`.
type HookFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'a>>;
637
/// Object-safe form of the `on_connect` callback stored in [`Hooks`].
trait OnConnectHook: Send + Sync {
    /// Runs the hook against `conn` and returns its future.
    fn call<'a>(&'a self, conn: &'a mut PooledConnection) -> HookFuture<'a, ()>;
}
641
642impl<F> OnConnectHook for F
643where
644    F: Send + Sync + 'static + for<'a> Fn(&'a mut PooledConnection) -> HookFuture<'a, ()>,
645{
646    fn call<'a>(&'a self, conn: &'a mut PooledConnection) -> HookFuture<'a, ()> {
647        Box::pin(self(conn))
648    }
649}
650
/// Object-safe form of the `before_acquire` callback stored in [`Hooks`].
trait BeforeAcquireHook: Send + Sync {
    /// Runs the hook against `conn` and returns a future resolving to the
    /// hook's [`AcquireDecision`].
    fn call<'a>(&'a self, conn: &'a mut PooledConnection) -> HookFuture<'a, AcquireDecision>;
}
654
655impl<F> BeforeAcquireHook for F
656where
657    F: Send
658        + Sync
659        + 'static
660        + for<'a> Fn(&'a mut PooledConnection) -> HookFuture<'a, AcquireDecision>,
661{
662    fn call<'a>(&'a self, conn: &'a mut PooledConnection) -> HookFuture<'a, AcquireDecision> {
663        Box::pin(self(conn))
664    }
665}
666
/// Decides what the pool should do with a connection after `before_acquire`.
///
/// Returned by the hook registered with [`Hooks::before_acquire`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcquireDecision {
    /// Hand the connection to the caller.
    Accept,
    /// Drop this connection and try another one.
    ///
    /// The rejected connection is marked broken. When the rejected connection
    /// was freshly opened and the checkout has no deadline, the checkout
    /// fails with `Error::Unsupported` instead of retrying.
    Retry,
}
675
/// Optional pool hooks.
///
/// Use this with [`Pool::with_hooks`] when you want connection setup or
/// validation around pool checkout.
///
/// This stays empty by default:
///
/// ```rust
/// # let hooks =
/// quex::Hooks::new();
/// # let _ = hooks;
/// ```
///
/// A more typical setup looks like this:
///
/// ```rust
/// # let hooks =
/// quex::Hooks::new()
///     .on_connect(|conn| Box::pin(async move {
///         conn.query("create table if not exists users(id integer primary key)")
///             .await?;
///         Ok(())
///     }))
///     .before_acquire(|conn| Box::pin(async move {
///         conn.query("select 1").await?;
///         Ok(quex::AcquireDecision::Accept)
///     }));
/// # let _ = hooks;
/// ```
#[derive(Default)]
pub struct Hooks {
    /// Callback set by [`Hooks::on_connect`]; `None` when not configured.
    on_connect: Option<Arc<dyn OnConnectHook>>,
    /// Callback set by [`Hooks::before_acquire`]; `None` when not configured.
    before_acquire: Option<Arc<dyn BeforeAcquireHook>>,
}
710
711impl Hooks {
712    /// Creates an empty hook set.
713    pub fn new() -> Self {
714        Self::default()
715    }
716
717    /// Runs after the pool opens a fresh connection.
718    ///
719    /// Use this for setup that should happen once per new connection.
720    ///
721    /// This does not run when the pool reuses an idle connection.
722    pub fn on_connect<F>(mut self, hook: F) -> Self
723    where
724        F: Send + Sync + 'static + for<'a> Fn(&'a mut PooledConnection) -> HookFuture<'a, ()>,
725    {
726        self.on_connect = Some(Arc::new(hook));
727        self
728    }
729
730    /// Runs before the pool returns a checked-out connection.
731    ///
732    /// Use this to validate the connection and decide whether the pool should
733    /// hand it out or discard it and try again.
734    ///
735    /// Return [`AcquireDecision::Accept`] to hand the connection to the
736    /// caller, or [`AcquireDecision::Retry`] to discard this candidate and let
737    /// the pool try another one.
738    pub fn before_acquire<F>(mut self, hook: F) -> Self
739    where
740        F: Send
741            + Sync
742            + 'static
743            + for<'a> Fn(&'a mut PooledConnection) -> HookFuture<'a, AcquireDecision>,
744    {
745        self.before_acquire = Some(Arc::new(hook));
746        self
747    }
748}
749
750impl Pool {
    /// Starts building a pool for the given connection options.
    ///
    /// This is shorthand for creating a [`PoolBuilder`] and calling its
    /// `connect` method with `options`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `PoolBuilder::connect` reports for `options`.
    pub fn connect(options: impl IntoConnectOptions) -> Result<PoolBuilder> {
        PoolBuilder::new().connect(options)
    }
755
756    /// Returns a pool handle with hooks enabled.
757    ///
758    /// Use [`Hooks`] when new connections need setup work or checked-out
759    /// connections need validation before they are returned.
760    pub fn with_hooks(mut self, hooks: Hooks) -> Self {
761        self.hooks = Some(Arc::new(hooks));
762        self
763    }
764
765    /// Acquires one connection from the pool.
766    ///
767    /// This first tries to reuse an idle connection. If none is available and
768    /// the pool is still below `max_size`, it opens a new one from the stored
769    /// connection options.
770    ///
771    /// When all pooled connections are already checked out, this waits until
772    /// one is returned.
773    ///
774    /// If hooks are configured, fresh connections run [`Hooks::on_connect`]
775    /// and every candidate runs [`Hooks::before_acquire`] before this returns.
776    pub async fn acquire(&self) -> Result<PooledConnection> {
777        let deadline = self
778            .inner
779            .acquire_timeout
780            .map(|timeout| Instant::now() + timeout);
781        let permit = self.acquire_permit(deadline).await?;
782        self.checkout(permit, deadline).await
783    }
784
785    /// Tries to acquire one connection from the pool without waiting.
786    ///
787    /// This follows the same reuse rules as [`Pool::acquire`], but returns
788    /// [`Error::PoolExhausted`] immediately when all pooled connections are
789    /// already checked out.
790    ///
791    /// Once the pool has a candidate connection, configured hooks still run
792    /// before it is returned.
793    pub async fn try_acquire(&self) -> Result<PooledConnection> {
794        let permit = self
795            .inner
796            .permits
797            .clone()
798            .try_acquire_owned()
799            .map_err(|err| match err {
800                TryAcquireError::NoPermits => Error::PoolExhausted,
801                TryAcquireError::Closed => Error::PoolClosed,
802            })?;
803
804        self.checkout(permit, None).await
805    }
806
807    async fn acquire_permit(&self, deadline: Option<Instant>) -> Result<OwnedSemaphorePermit> {
808        match deadline {
809            Some(deadline) => timeout(
810                self.remaining_timeout(deadline)?,
811                self.inner.permits.clone().acquire_owned(),
812            )
813            .await
814            .map_err(|_| Error::PoolTimedOut)?
815            .map_err(|_| Error::PoolClosed),
816            None => self
817                .inner
818                .permits
819                .clone()
820                .acquire_owned()
821                .await
822                .map_err(|_| Error::PoolClosed),
823        }
824    }
825
826    fn remaining_timeout(&self, deadline: Instant) -> Result<Duration> {
827        deadline
828            .checked_duration_since(Instant::now())
829            .ok_or(Error::PoolTimedOut)
830    }
831
832    async fn checkout(
833        &self,
834        permit: OwnedSemaphorePermit,
835        deadline: Option<Instant>,
836    ) -> Result<PooledConnection> {
837        let mut permit = Some(permit);
838        loop {
839            let (conn, created_at, on_connect_ran, fresh) = match self.take_idle_connection() {
840                Some(entry) => (entry.conn, entry.created_at, entry.on_connect_ran, false),
841                None => {
842                    let options = self.inner.options.clone().ok_or_else(|| {
843                        Error::Unsupported("pool cannot reopen this connection".into())
844                    })?;
845                    let conn = match deadline {
846                        Some(deadline) => {
847                            timeout(self.remaining_timeout(deadline)?, connect_managed(options))
848                                .await
849                                .map_err(|_| Error::PoolTimedOut)??
850                        }
851                        None => connect_managed(options).await?,
852                    };
853                    (conn, Instant::now(), false, true)
854                }
855            };
856
857            let mut pooled = PooledConnection {
858                pool: Arc::clone(&self.inner),
859                permit: Some(permit.take().expect("pool permit missing")),
860                conn: Some(conn),
861                reusable: true,
862                created_at,
863                on_connect_ran,
864            };
865
866            if !pooled.on_connect_ran {
867                if let Some(hook) = self
868                    .hooks
869                    .as_ref()
870                    .and_then(|hooks| hooks.on_connect.as_ref())
871                {
872                    let result = match deadline {
873                        Some(deadline) => {
874                            timeout(self.remaining_timeout(deadline)?, hook.call(&mut pooled))
875                                .await
876                                .map_err(|_| Error::PoolTimedOut)?
877                        }
878                        None => hook.call(&mut pooled).await,
879                    };
880                    if let Err(err) = result {
881                        pooled.mark_broken();
882                        return Err(err);
883                    }
884                    pooled.on_connect_ran = true;
885                }
886            }
887
888            if let Some(hook) = self
889                .hooks
890                .as_ref()
891                .and_then(|hooks| hooks.before_acquire.as_ref())
892            {
893                let decision = match deadline {
894                    Some(deadline) => {
895                        timeout(self.remaining_timeout(deadline)?, hook.call(&mut pooled))
896                            .await
897                            .map_err(|_| Error::PoolTimedOut)?
898                    }
899                    None => hook.call(&mut pooled).await,
900                };
901                match decision {
902                    Ok(AcquireDecision::Accept) => return Ok(pooled),
903                    Ok(AcquireDecision::Retry) => {
904                        pooled.mark_broken();
905                        if fresh && deadline.is_none() {
906                            return Err(Error::Unsupported(
907                                "before_acquire rejected a fresh connection".into(),
908                            ));
909                        }
910                        permit = Some(pooled.permit.take().expect("pool permit missing"));
911                        drop(pooled);
912                        continue;
913                    }
914                    Err(err) => {
915                        pooled.mark_broken();
916                        return Err(err);
917                    }
918                }
919            }
920
921            return Ok(pooled);
922        }
923    }
924
925    fn take_idle_connection(&self) -> Option<IdleConnection> {
926        let now = Instant::now();
927        let mut idle = self.inner.idle.lock().expect("pool mutex poisoned");
928        while let Some(entry) = idle.pop() {
929            if self
930                .inner
931                .idle_timeout
932                .is_some_and(|limit| now.duration_since(entry.idle_since) >= limit)
933            {
934                continue;
935            }
936            if self
937                .inner
938                .max_lifetime
939                .is_some_and(|limit| now.duration_since(entry.created_at) >= limit)
940            {
941                continue;
942            }
943            return Some(entry);
944        }
945        None
946    }
947
    /// Returns the maximum number of open connections.
    ///
    /// This is the `max_size` value stored in the pool's shared state.
    pub fn max_size(&self) -> usize {
        self.inner.max_size
    }
952
    /// Returns the selected driver.
    ///
    /// The value is read from the pool's shared state; no connection is
    /// acquired.
    pub fn driver(&self) -> Driver {
        self.inner.driver
    }
957
958    /// Returns the number of idle connections currently kept by the pool.
959    pub fn idle_count(&self) -> usize {
960        self.inner.idle.lock().expect("pool mutex poisoned").len()
961    }
962
    /// Returns the number of connections that can be acquired without waiting.
    ///
    /// The value is the number of permits currently available on the pool's
    /// permit semaphore.
    pub fn available_permits(&self) -> usize {
        self.inner.permits.available_permits()
    }
967
    /// Returns whether the pool has been closed.
    ///
    /// The pool counts as closed once its permit semaphore has been closed,
    /// which is what [`Pool::close`] does.
    pub fn is_closed(&self) -> bool {
        self.inner.permits.is_closed()
    }
972
973    /// Closes the pool to new acquire and begin calls.
974    ///
975    /// Idle connections are dropped immediately. Checked-out connections stay
976    /// usable until they are dropped, and are discarded instead of being
977    /// returned to the pool.
978    pub fn close(&self) {
979        self.inner.permits.close();
980        self.inner.idle.lock().expect("pool mutex poisoned").clear();
981    }
982
983    /// Acquires a connection, runs SQL, and returns rows.
984    pub async fn query(&self, sql: &str) -> Result<Rows<'_>> {
985        let mut conn = self.acquire().await?;
986        let rows = conn.query(sql).await?;
987        Ok(rows.into_lifetime())
988    }
989
    /// Creates a pool-owned prepared statement handle.
    ///
    /// Each execution acquires a connection and prepares the SQL on that
    /// connection.
    ///
    /// This call only clones the pool handle and copies `sql` into the new
    /// [`PoolStatement`]; it always returns `Ok`.
    pub async fn prepare(&self, sql: &str) -> Result<PoolStatement> {
        Ok(PoolStatement::new(self.clone(), sql))
    }
997
998    /// Starts a transaction on a checked-out connection.
999    ///
1000    /// This is a convenience for calling [`Pool::acquire`] and then
1001    /// [`PooledConnection::begin`]. The transaction keeps that connection
1002    /// checked out until commit, rollback, or drop.
1003    pub async fn begin(&self) -> Result<PoolTransaction> {
1004        let mut conn = self.acquire().await?;
1005        start_transaction_inner(&mut conn.conn_mut().inner).await?;
1006        Ok(PoolTransaction {
1007            conn,
1008            finished: false,
1009        })
1010    }
1011
1012    /// Tries to start a transaction without waiting for a connection.
1013    ///
1014    /// This is the non-blocking counterpart to [`Pool::begin`]. It returns
1015    /// [`Error::PoolExhausted`] immediately when all pooled connections are
1016    /// already checked out.
1017    pub async fn try_begin(&self) -> Result<PoolTransaction> {
1018        let mut conn = self.try_acquire().await?;
1019        start_transaction_inner(&mut conn.conn_mut().inner).await?;
1020        Ok(PoolTransaction {
1021            conn,
1022            finished: false,
1023        })
1024    }
1025}
1026
/// [`Executor`] for a shared pool reference.
///
/// Every method forwards to the inherent [`Pool`] methods. The calls are
/// written in path form (`Pool::method(self, ..)`) so that the inherent
/// method is the one being called.
impl Executor for &Pool {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PoolStatement
    where
        Self: 'a;

    /// Forwards to [`Pool::driver`].
    fn driver(&self) -> Driver {
        Pool::driver(self)
    }

    /// Forwards to [`Pool::query`].
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        Pool::query(self, sql).await
    }

    /// Builds a [`PoolStatement`] for `sql` and executes it once with
    /// `params`.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = Pool::prepare(self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // Detach the rows from the local statement borrow before returning.
        Ok(rows.into_lifetime())
    }

    /// Forwards to [`Pool::prepare`].
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        Pool::prepare(self, sql).await
    }
}
1058
/// [`Executor`] for an exclusive pool reference.
///
/// Every method forwards to the inherent [`Pool`] methods. The calls are
/// written in path form (`Pool::method(self, ..)`) so that the inherent
/// method is the one being called.
impl Executor for &mut Pool {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PoolStatement
    where
        Self: 'a;

    /// Forwards to [`Pool::driver`].
    fn driver(&self) -> Driver {
        Pool::driver(self)
    }

    /// Forwards to [`Pool::query`].
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        Pool::query(self, sql).await
    }

    /// Builds a [`PoolStatement`] for `sql` and executes it once with
    /// `params`.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = Pool::prepare(self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // Detach the rows from the local statement borrow before returning.
        Ok(rows.into_lifetime())
    }

    /// Forwards to [`Pool::prepare`].
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        Pool::prepare(self, sql).await
    }
}
1090
/// [`Executor`] for an owned pool handle.
///
/// Every method forwards to the inherent [`Pool`] methods.
impl Executor for Pool {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PoolStatement
    where
        Self: 'a;

    /// Forwards to the inherent [`Pool::driver`].
    fn driver(&self) -> Driver {
        self.driver()
    }

    /// Forwards to [`Pool::query`].
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        Pool::query(self, sql).await
    }

    /// Builds a [`PoolStatement`] for `sql` and executes it once with
    /// `params`.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = Pool::prepare(self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // Detach the rows from the local statement borrow before returning.
        Ok(rows.into_lifetime())
    }

    /// Forwards to [`Pool::prepare`].
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        Pool::prepare(self, sql).await
    }
}
1122
/// Builder for [`Pool`].
///
/// Start with [`Pool::connect`], adjust any settings you need, then call
/// [`PoolBuilder::build`].
pub struct PoolBuilder {
    /// Upper bound on open connections; `build` rejects `0`.
    max_size: usize,
    /// Number of connections `build` opens before returning the pool.
    min_connections: usize,
    /// How long an acquire may wait; `None` means no timeout.
    acquire_timeout: Option<Duration>,
    /// How long a connection may sit idle; `None` means no idle limit.
    idle_timeout: Option<Duration>,
    /// Maximum age of a pooled connection; `None` means no lifetime limit.
    max_lifetime: Option<Duration>,
    /// Connection options stored by `connect`; `build` fails while unset.
    options: Option<ConnectOptions>,
}
1135
/// The default builder is the one returned by [`PoolBuilder::new`].
impl Default for PoolBuilder {
    /// Delegates to [`PoolBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
1141
1142impl PoolBuilder {
1143    /// Creates a builder with a default maximum size of 10.
1144    pub fn new() -> Self {
1145        Self {
1146            max_size: 10,
1147            min_connections: 0,
1148            acquire_timeout: None,
1149            idle_timeout: None,
1150            max_lifetime: None,
1151            options: None,
1152        }
1153    }
1154
1155    /// Sets the maximum number of open connections.
1156    ///
1157    /// `0` is rejected by [`Self::build`].
1158    pub fn max_size(mut self, value: usize) -> Self {
1159        self.max_size = value;
1160        self
1161    }
1162
1163    /// Sets how many connections to open before the pool is returned.
1164    ///
1165    /// The default is `0`, which keeps pool creation lazy.
1166    pub fn min_connections(mut self, value: usize) -> Self {
1167        self.min_connections = value;
1168        self
1169    }
1170
1171    /// Sets how long [`Pool::acquire`] and [`Pool::begin`] may wait.
1172    ///
1173    /// When the timeout is reached, they return [`Error::PoolTimedOut`].
1174    ///
1175    /// The default is no timeout.
1176    pub fn acquire_timeout(mut self, value: Duration) -> Self {
1177        self.acquire_timeout = Some(value);
1178        self
1179    }
1180
1181    /// Sets how long an unused idle connection may stay in the pool.
1182    ///
1183    /// Expired idle connections are dropped lazily on the next checkout or
1184    /// return. The default is no idle timeout.
1185    pub fn idle_timeout(mut self, value: Duration) -> Self {
1186        self.idle_timeout = Some(value);
1187        self
1188    }
1189
1190    /// Sets the maximum lifetime of a pooled connection.
1191    ///
1192    /// Expired connections are dropped lazily on the next checkout or return.
1193    /// The default is no lifetime limit.
1194    pub fn max_lifetime(mut self, value: Duration) -> Self {
1195        self.max_lifetime = Some(value);
1196        self
1197    }
1198
1199    /// Stores connection options.
1200    ///
1201    /// Strings are parsed as database URLs.
1202    pub fn connect(mut self, url: impl IntoConnectOptions) -> Result<Self> {
1203        self.options = Some(url.into_connect_options()?);
1204        Ok(self)
1205    }
1206
1207    /// Builds a pool.
1208    ///
1209    /// When `min_connections` is `0`, this only stores the configuration and
1210    /// stays lazy. When it is greater than `0`, this opens that many
1211    /// connections before returning.
1212    pub async fn build(self) -> Result<Pool> {
1213        if self.max_size == 0 {
1214            return Err(Error::Unsupported(
1215                "pool max_size must be greater than zero".into(),
1216            ));
1217        }
1218        if self.min_connections > self.max_size {
1219            return Err(Error::Unsupported(
1220                "pool min_connections cannot be greater than max_size".into(),
1221            ));
1222        }
1223
1224        let options = self
1225            .options
1226            .ok_or_else(|| Error::Unsupported("pool requires connection options".into()))?;
1227        let driver = options
1228            .driver
1229            .ok_or_else(|| Error::Unsupported("pool requires a driver".into()))?;
1230
1231        let now = Instant::now();
1232        let mut idle = Vec::with_capacity(self.min_connections);
1233        for _ in 0..self.min_connections {
1234            idle.push(IdleConnection {
1235                conn: connect_managed(options.clone()).await?,
1236                created_at: now,
1237                idle_since: now,
1238                on_connect_ran: false,
1239            });
1240        }
1241
1242        Ok(Pool {
1243            inner: Arc::new(PoolInner {
1244                driver,
1245                max_size: self.max_size,
1246                permits: Arc::new(Semaphore::new(self.max_size)),
1247                idle: Mutex::new(idle),
1248                options: Some(options),
1249                acquire_timeout: self.acquire_timeout,
1250                idle_timeout: self.idle_timeout,
1251                max_lifetime: self.max_lifetime,
1252            }),
1253            hooks: None,
1254        })
1255    }
1256}
1257
/// A connection checked out from a [`Pool`].
///
/// The connection returns to the pool on drop unless an operation failed or a
/// transaction was dropped without being finished.
///
/// Keep this when you need one connection for several operations or want to
/// prepare a statement that stays on the same connection.
pub struct PooledConnection {
    /// Shared pool state whose idle list receives the connection on drop.
    pool: Arc<PoolInner>,
    /// Pool permit held while checked out; taken and released in `Drop`.
    permit: Option<OwnedSemaphorePermit>,
    /// The wrapped connection; taken out of the slot in `Drop`.
    conn: Option<ManagedConnection>,
    /// Cleared when an operation fails; `Drop` only re-pools while `true`.
    reusable: bool,
    /// Timestamp compared against the pool's `max_lifetime`.
    created_at: Instant,
    /// Whether the `on_connect` hook already ran; carried into the idle entry.
    on_connect_ran: bool,
}
1273
1274impl PooledConnection {
1275    fn conn_mut(&mut self) -> &mut ManagedConnection {
1276        self.conn.as_mut().expect("pooled connection missing")
1277    }
1278
1279    fn mark_broken(&mut self) {
1280        self.reusable = false;
1281    }
1282
1283    /// Returns the selected driver.
1284    pub fn driver(&self) -> Driver {
1285        self.conn
1286            .as_ref()
1287            .expect("pooled connection missing")
1288            .driver
1289    }
1290
1291    /// Runs SQL on this checked-out connection.
1292    pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
1293        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1294        match query_inner(
1295            &mut conn.as_mut().expect("pooled connection missing").inner,
1296            sql,
1297        )
1298        .await
1299        {
1300            Ok(rows) => Ok(rows),
1301            Err(err) => {
1302                *reusable = false;
1303                Err(err)
1304            }
1305        }
1306    }
1307
1308    /// Prepares SQL on this checked-out connection.
1309    pub async fn prepare(&mut self, sql: &str) -> Result<PooledStatement<'_>> {
1310        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1311        match prepare_inner(
1312            &mut conn.as_mut().expect("pooled connection missing").inner,
1313            sql,
1314        )
1315        .await
1316        {
1317            Ok(stmt) => Ok(PooledStatement { stmt, reusable }),
1318            Err(err) => {
1319                *reusable = false;
1320                Err(err)
1321            }
1322        }
1323    }
1324
1325    /// Starts a transaction on this checked-out connection.
1326    ///
1327    /// The connection is kept until the transaction is committed, rolled back,
1328    /// or dropped.
1329    pub async fn begin(&mut self) -> Result<PooledTransaction<'_>> {
1330        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1331        match start_transaction_inner(&mut conn.as_mut().expect("pooled connection missing").inner)
1332            .await
1333        {
1334            Ok(()) => Ok(PooledTransaction {
1335                pooled: self,
1336                finished: false,
1337            }),
1338            Err(err) => {
1339                *reusable = false;
1340                Err(err)
1341            }
1342        }
1343    }
1344
1345    /// Starts listening on one Postgres notification channel.
1346    #[cfg(feature = "postgres")]
1347    pub async fn listen(&mut self, channel: &str) -> Result<()> {
1348        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1349        match &mut conn.as_mut().expect("pooled connection missing").inner {
1350            crate::connection::ConnectionInner::Postgres(conn) => {
1351                match conn.listen(channel).await {
1352                    Ok(()) => Ok(()),
1353                    Err(err) => {
1354                        *reusable = false;
1355                        Err(err.into())
1356                    }
1357                }
1358            }
1359            _ => Err(Error::Unsupported(
1360                "postgres notifications require a postgres connection".into(),
1361            )),
1362        }
1363    }
1364
1365    /// Stops listening on one Postgres notification channel.
1366    #[cfg(feature = "postgres")]
1367    pub async fn unlisten(&mut self, channel: &str) -> Result<()> {
1368        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1369        match &mut conn.as_mut().expect("pooled connection missing").inner {
1370            crate::connection::ConnectionInner::Postgres(conn) => {
1371                match conn.unlisten(channel).await {
1372                    Ok(()) => Ok(()),
1373                    Err(err) => {
1374                        *reusable = false;
1375                        Err(err.into())
1376                    }
1377                }
1378            }
1379            _ => Err(Error::Unsupported(
1380                "postgres notifications require a postgres connection".into(),
1381            )),
1382        }
1383    }
1384
1385    /// Stops listening on all Postgres notification channels.
1386    #[cfg(feature = "postgres")]
1387    pub async fn unlisten_all(&mut self) -> Result<()> {
1388        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1389        match &mut conn.as_mut().expect("pooled connection missing").inner {
1390            crate::connection::ConnectionInner::Postgres(conn) => match conn.unlisten_all().await {
1391                Ok(()) => Ok(()),
1392                Err(err) => {
1393                    *reusable = false;
1394                    Err(err.into())
1395                }
1396            },
1397            _ => Err(Error::Unsupported(
1398                "postgres notifications require a postgres connection".into(),
1399            )),
1400        }
1401    }
1402
1403    /// Sends one Postgres notification.
1404    #[cfg(feature = "postgres")]
1405    pub async fn notify(&mut self, channel: &str, payload: Option<&str>) -> Result<()> {
1406        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1407        match &mut conn.as_mut().expect("pooled connection missing").inner {
1408            crate::connection::ConnectionInner::Postgres(conn) => {
1409                match conn.notify(channel, payload).await {
1410                    Ok(()) => Ok(()),
1411                    Err(err) => {
1412                        *reusable = false;
1413                        Err(err.into())
1414                    }
1415                }
1416            }
1417            _ => Err(Error::Unsupported(
1418                "postgres notifications require a postgres connection".into(),
1419            )),
1420        }
1421    }
1422
1423    /// Returns one pending Postgres notification if one is already buffered.
1424    #[cfg(feature = "postgres")]
1425    pub async fn try_recv_notification(&mut self) -> Result<Option<crate::PostgresNotification>> {
1426        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1427        match &mut conn.as_mut().expect("pooled connection missing").inner {
1428            crate::connection::ConnectionInner::Postgres(conn) => {
1429                match conn.try_recv_notification().await {
1430                    Ok(notification) => Ok(notification),
1431                    Err(err) => {
1432                        *reusable = false;
1433                        Err(err.into())
1434                    }
1435                }
1436            }
1437            _ => Err(Error::Unsupported(
1438                "postgres notifications require a postgres connection".into(),
1439            )),
1440        }
1441    }
1442
1443    /// Waits for the next Postgres notification on this checked-out connection.
1444    #[cfg(feature = "postgres")]
1445    pub async fn wait_for_notification(&mut self) -> Result<crate::PostgresNotification> {
1446        let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1447        match &mut conn.as_mut().expect("pooled connection missing").inner {
1448            crate::connection::ConnectionInner::Postgres(conn) => {
1449                match conn.wait_for_notification().await {
1450                    Ok(notification) => Ok(notification),
1451                    Err(err) => {
1452                        *reusable = false;
1453                        Err(err.into())
1454                    }
1455                }
1456            }
1457            _ => Err(Error::Unsupported(
1458                "postgres notifications require a postgres connection".into(),
1459            )),
1460        }
1461    }
1462}
1463
/// A statement that belongs to a pool.
///
/// Each execution acquires a connection from the pool. Use
/// [`PooledConnection::prepare`] when repeated executions must stay on the same
/// connection.
///
/// This is a good fit when the same SQL runs more than once but does not need
/// to stay pinned to one checked-out connection between calls.
pub struct PoolStatement {
    /// Pool handle used to acquire a connection for every execution.
    pool: Pool,
    /// SQL text that is prepared again on each execution.
    sql: String,
}
1476
1477impl PoolStatement {
1478    fn new(pool: Pool, sql: &str) -> Self {
1479        Self {
1480            pool,
1481            sql: sql.into(),
1482        }
1483    }
1484
1485    /// Starts binding parameters for this statement.
1486    pub fn bind<T>(&mut self, value: T) -> BoundStatement<'_, Self>
1487    where
1488        T: Encode,
1489    {
1490        BoundStatement::new(self).bind(value)
1491    }
1492
1493    /// Runs the statement and returns rows.
1494    pub async fn execute_source<P>(&mut self, params: &P) -> Result<Rows<'_>>
1495    where
1496        P: ParamSource + ?Sized,
1497    {
1498        let mut conn = self.pool.acquire().await?;
1499        let mut stmt = prepare_inner(&mut conn.conn_mut().inner, &self.sql).await?;
1500        let rows = stmt.execute_source(params).await?;
1501        Ok(rows.into_lifetime())
1502    }
1503
1504    /// Runs the statement when no rows are expected.
1505    pub async fn exec_source<P>(&mut self, params: &P) -> Result<ExecResult>
1506    where
1507        P: ParamSource + ?Sized,
1508    {
1509        let mut conn = self.pool.acquire().await?;
1510        let mut stmt = prepare_inner(&mut conn.conn_mut().inner, &self.sql).await?;
1511        stmt.exec_source(params).await
1512    }
1513}
1514
/// [`PreparedStatement`] for [`PoolStatement`]; both methods forward to the
/// inherent methods of the same name.
impl PreparedStatement for PoolStatement {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;

    /// Forwards to [`PoolStatement::execute_source`].
    async fn execute_source<P>(&mut self, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        PoolStatement::execute_source(self, params).await
    }

    /// Forwards to [`PoolStatement::exec_source`].
    async fn exec_source<P>(&mut self, params: &P) -> Result<ExecResult>
    where
        P: ParamSource + ?Sized,
    {
        PoolStatement::exec_source(self, params).await
    }
}
1535
/// [`Executor`] for an exclusive reference to a checked-out connection.
///
/// Every method forwards to the inherent [`PooledConnection`] methods, so all
/// work stays on the same connection.
impl Executor for &mut PooledConnection {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PooledStatement<'a>
    where
        Self: 'a;

    /// Forwards to [`PooledConnection::driver`].
    fn driver(&self) -> Driver {
        PooledConnection::driver(self)
    }

    /// Forwards to [`PooledConnection::query`].
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        PooledConnection::query(*self, sql).await
    }

    /// Prepares `sql` on this connection and executes it once with `params`.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = PooledConnection::prepare(*self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // Detach the rows from the local statement borrow before returning.
        Ok(rows.into_lifetime())
    }

    /// Forwards to [`PooledConnection::prepare`].
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        PooledConnection::prepare(*self, sql).await
    }
}
1567
/// [`Executor`] for an owned checked-out connection.
///
/// Every method forwards to the inherent [`PooledConnection`] methods, so all
/// work stays on the same connection.
impl Executor for PooledConnection {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PooledStatement<'a>
    where
        Self: 'a;

    /// Forwards to the inherent [`PooledConnection::driver`].
    fn driver(&self) -> Driver {
        self.driver()
    }

    /// Forwards to [`PooledConnection::query`].
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        PooledConnection::query(self, sql).await
    }

    /// Prepares `sql` on this connection and executes it once with `params`.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = PooledConnection::prepare(self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // Detach the rows from the local statement borrow before returning.
        Ok(rows.into_lifetime())
    }

    /// Forwards to [`PooledConnection::prepare`].
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        PooledConnection::prepare(self, sql).await
    }
}
1599
1600impl Drop for PooledConnection {
1601    fn drop(&mut self) {
1602        let conn = self.conn.take();
1603        if let Some(conn) = conn {
1604            let now = Instant::now();
1605            let expired_by_lifetime = self
1606                .pool
1607                .max_lifetime
1608                .is_some_and(|limit| now.duration_since(self.created_at) >= limit);
1609            if self.reusable && !self.pool.permits.is_closed() && !expired_by_lifetime {
1610                self.pool
1611                    .idle
1612                    .lock()
1613                    .expect("pool mutex poisoned")
1614                    .push(IdleConnection {
1615                        conn,
1616                        created_at: self.created_at,
1617                        idle_since: now,
1618                        on_connect_ran: self.on_connect_ran,
1619                    });
1620            }
1621        }
1622        self.permit.take();
1623    }
1624}
1625
/// A transaction borrowed from a [`PooledConnection`].
///
/// Dropping it without commit or rollback marks the connection as broken.
///
/// While this value exists, the underlying [`PooledConnection`] stays borrowed.
pub struct PooledTransaction<'a> {
    /// The checked-out connection the transaction runs on.
    pooled: &'a mut PooledConnection,
    /// Set by `commit` and `rollback`; `Drop` marks the connection broken
    /// while this is still `false`.
    finished: bool,
}
1635
1636impl<'a> PooledTransaction<'a> {
1637    /// Returns the selected driver.
1638    pub fn driver(&self) -> Driver {
1639        self.pooled.driver()
1640    }
1641
1642    /// Runs SQL inside the transaction.
1643    pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
1644        self.pooled.query(sql).await
1645    }
1646
1647    /// Prepares SQL inside the transaction.
1648    pub async fn prepare(&mut self, sql: &str) -> Result<PooledStatement<'_>> {
1649        self.pooled.prepare(sql).await
1650    }
1651
1652    /// Commits the transaction.
1653    pub async fn commit(mut self) -> Result<()> {
1654        self.finished = true;
1655        match commit_inner(&mut self.pooled.conn_mut().inner).await {
1656            Ok(()) => Ok(()),
1657            Err(err) => {
1658                self.pooled.mark_broken();
1659                Err(err)
1660            }
1661        }
1662    }
1663
1664    /// Rolls the transaction back.
1665    pub async fn rollback(mut self) -> Result<()> {
1666        self.finished = true;
1667        match rollback_inner(&mut self.pooled.conn_mut().inner).await {
1668            Ok(()) => Ok(()),
1669            Err(err) => {
1670                self.pooled.mark_broken();
1671                Err(err)
1672            }
1673        }
1674    }
1675
1676    /// Returns the underlying checked-out connection.
1677    pub fn connection(&mut self) -> &mut PooledConnection {
1678        self.pooled
1679    }
1680}
1681
/// [`Executor`] for an exclusive reference to a pooled transaction.
///
/// Every method forwards to the inherent [`PooledTransaction`] methods.
impl Executor for &mut PooledTransaction<'_> {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PooledStatement<'a>
    where
        Self: 'a;

    /// Forwards to [`PooledTransaction::driver`].
    fn driver(&self) -> Driver {
        PooledTransaction::driver(self)
    }

    /// Forwards to [`PooledTransaction::query`].
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        PooledTransaction::query(*self, sql).await
    }

    /// Prepares `sql` inside the transaction and executes it once with
    /// `params`.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = PooledTransaction::prepare(*self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // Detach the rows from the local statement borrow before returning.
        Ok(rows.into_lifetime())
    }

    /// Forwards to [`PooledTransaction::prepare`].
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        PooledTransaction::prepare(*self, sql).await
    }
}
1713
/// [`Executor`] for an owned pooled transaction.
///
/// Every method forwards to the inherent [`PooledTransaction`] methods.
impl Executor for PooledTransaction<'_> {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;
    type Statement<'a>
        = PooledStatement<'a>
    where
        Self: 'a;

    /// Forwards to the inherent [`PooledTransaction::driver`].
    fn driver(&self) -> Driver {
        self.driver()
    }

    /// Forwards to [`PooledTransaction::query`].
    async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
        PooledTransaction::query(self, sql).await
    }

    /// Prepares `sql` inside the transaction and executes it once with
    /// `params`.
    async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        let mut stmt = PooledTransaction::prepare(self, sql).await?;
        let rows = stmt.execute_source(params).await?;
        // Detach the rows from the local statement borrow before returning.
        Ok(rows.into_lifetime())
    }

    /// Forwards to [`PooledTransaction::prepare`].
    async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
        PooledTransaction::prepare(self, sql).await
    }
}
1745
1746impl Drop for PooledTransaction<'_> {
1747    fn drop(&mut self) {
1748        if !self.finished {
1749            self.pooled.mark_broken();
1750        }
1751    }
1752}
1753
/// A statement prepared on a checked-out connection.
///
/// The statement stays tied to that connection for as long as it is borrowed.
pub struct PooledStatement<'a> {
    /// The prepared statement itself.
    stmt: Statement<'a>,
    /// Borrow of the owning connection's `reusable` flag; cleared when an
    /// execution fails.
    reusable: &'a mut bool,
}
1761
1762impl PooledStatement<'_> {
1763    /// Starts binding parameters for this statement.
1764    pub fn bind<T>(&mut self, value: T) -> BoundStatement<'_, Self>
1765    where
1766        T: Encode,
1767    {
1768        BoundStatement::new(self).bind(value)
1769    }
1770
1771    /// Runs the statement and returns rows.
1772    pub async fn execute_source<P>(&mut self, params: &P) -> Result<Rows<'_>>
1773    where
1774        P: ParamSource + ?Sized,
1775    {
1776        match self.stmt.execute_source(params).await {
1777            Ok(rows) => Ok(rows),
1778            Err(err) => {
1779                *self.reusable = false;
1780                Err(err)
1781            }
1782        }
1783    }
1784
1785    /// Runs the statement when no rows are expected.
1786    pub async fn exec_source<P>(&mut self, params: &P) -> Result<ExecResult>
1787    where
1788        P: ParamSource + ?Sized,
1789    {
1790        match self.stmt.exec_source(params).await {
1791            Ok(result) => Ok(result),
1792            Err(err) => {
1793                *self.reusable = false;
1794                Err(err)
1795            }
1796        }
1797    }
1798}
1799
/// [`PreparedStatement`] for [`PooledStatement`]; both methods forward to the
/// inherent methods of the same name.
impl PreparedStatement for PooledStatement<'_> {
    type Rows<'a>
        = Rows<'a>
    where
        Self: 'a;

    /// Forwards to [`PooledStatement::execute_source`].
    async fn execute_source<P>(&mut self, params: &P) -> Result<Self::Rows<'_>>
    where
        P: ParamSource + ?Sized,
    {
        PooledStatement::execute_source(self, params).await
    }

    /// Forwards to [`PooledStatement::exec_source`].
    async fn exec_source<P>(&mut self, params: &P) -> Result<ExecResult>
    where
        P: ParamSource + ?Sized,
    {
        PooledStatement::exec_source(self, params).await
    }
}