1#![cfg_attr(
2 not(any(feature = "mariadb", feature = "postgres", feature = "sqlite")),
3 allow(unreachable_code, unused_variables)
4)]
5
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant};
8use std::{future::Future, pin::Pin};
9
10use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
11use tokio::time::timeout;
12
13use crate::connection::{
14 ManagedConnection, commit_inner, connect_managed, prepare_inner, query_inner, rollback_inner,
15 start_transaction_inner,
16};
17use crate::{
18 BoundStatement, ConnectOptions, Driver, Encode, Error, ExecResult, Executor,
19 MysqlConnectOptions, ParamSource, PostgresConnectOptions, PreparedStatement, Result, Rows,
20 SqliteConnectOptions, Statement,
21};
22
23pub struct Connection {
31 pub(crate) conn: ManagedConnection,
32}
33
34pub trait IntoConnectOptions {
43 fn into_connect_options(self) -> Result<ConnectOptions>;
45}
46
47impl<T> IntoConnectOptions for T
48where
49 T: AsRef<str>,
50{
51 fn into_connect_options(self) -> Result<ConnectOptions> {
52 ConnectOptions::from_url(self.as_ref())
53 }
54}
55
56impl IntoConnectOptions for SqliteConnectOptions {
57 fn into_connect_options(self) -> Result<ConnectOptions> {
58 Ok(ConnectOptions::from(self))
59 }
60}
61
62impl IntoConnectOptions for PostgresConnectOptions {
63 fn into_connect_options(self) -> Result<ConnectOptions> {
64 Ok(ConnectOptions::from(self))
65 }
66}
67
68impl IntoConnectOptions for MysqlConnectOptions {
69 fn into_connect_options(self) -> Result<ConnectOptions> {
70 Ok(ConnectOptions::from(self))
71 }
72}
73
74impl IntoConnectOptions for ConnectOptions {
75 fn into_connect_options(self) -> Result<ConnectOptions> {
76 Ok(self)
77 }
78}
79
80impl Connection {
81 pub async fn connect(options: impl IntoConnectOptions) -> Result<Self> {
83 let options = options.into_connect_options()?;
84 Ok(Self {
85 conn: connect_managed(options).await?,
86 })
87 }
88
89 pub fn driver(&self) -> Driver {
91 self.conn.driver
92 }
93
94 pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
98 query_inner(&mut self.conn.inner, sql).await
99 }
100
101 pub async fn prepare(&mut self, sql: &str) -> Result<Statement<'_>> {
103 prepare_inner(&mut self.conn.inner, sql).await
104 }
105
106 pub async fn begin(&mut self) -> Result<Transaction<'_>> {
111 Transaction::begin(&mut self.conn).await
112 }
113
114 #[cfg(feature = "postgres")]
116 pub async fn listen(&mut self, channel: &str) -> Result<()> {
117 match &mut self.conn.inner {
118 crate::connection::ConnectionInner::Postgres(conn) => conn.listen(channel).await?,
119 _ => {
120 return Err(Error::Unsupported(
121 "postgres notifications require a postgres connection".into(),
122 ));
123 }
124 }
125 Ok(())
126 }
127
128 #[cfg(feature = "postgres")]
130 pub async fn unlisten(&mut self, channel: &str) -> Result<()> {
131 match &mut self.conn.inner {
132 crate::connection::ConnectionInner::Postgres(conn) => {
133 conn.unlisten(channel).await.map_err(Error::from)?
134 }
135 _ => {
136 return Err(Error::Unsupported(
137 "postgres notifications require a postgres connection".into(),
138 ));
139 }
140 }
141 Ok(())
142 }
143
144 #[cfg(feature = "postgres")]
146 pub async fn unlisten_all(&mut self) -> Result<()> {
147 match &mut self.conn.inner {
148 crate::connection::ConnectionInner::Postgres(conn) => {
149 conn.unlisten_all().await.map_err(Error::from)?
150 }
151 _ => {
152 return Err(Error::Unsupported(
153 "postgres notifications require a postgres connection".into(),
154 ));
155 }
156 }
157 Ok(())
158 }
159
160 #[cfg(feature = "postgres")]
162 pub async fn notify(&mut self, channel: &str, payload: Option<&str>) -> Result<()> {
163 match &mut self.conn.inner {
164 crate::connection::ConnectionInner::Postgres(conn) => {
165 conn.notify(channel, payload).await.map_err(Error::from)?
166 }
167 _ => {
168 return Err(Error::Unsupported(
169 "postgres notifications require a postgres connection".into(),
170 ));
171 }
172 }
173 Ok(())
174 }
175
176 #[cfg(feature = "postgres")]
178 pub async fn try_recv_notification(&mut self) -> Result<Option<crate::PostgresNotification>> {
179 match &mut self.conn.inner {
180 crate::connection::ConnectionInner::Postgres(conn) => {
181 conn.try_recv_notification().await.map_err(Error::from)
182 }
183 _ => Err(Error::Unsupported(
184 "postgres notifications require a postgres connection".into(),
185 )),
186 }
187 }
188
189 #[cfg(feature = "postgres")]
191 pub async fn wait_for_notification(&mut self) -> Result<crate::PostgresNotification> {
192 match &mut self.conn.inner {
193 crate::connection::ConnectionInner::Postgres(conn) => {
194 conn.wait_for_notification().await.map_err(Error::from)
195 }
196 _ => Err(Error::Unsupported(
197 "postgres notifications require a postgres connection".into(),
198 )),
199 }
200 }
201}
202
203impl Executor for &mut Connection {
204 type Rows<'a>
205 = Rows<'a>
206 where
207 Self: 'a;
208 type Statement<'a>
209 = Statement<'a>
210 where
211 Self: 'a;
212
213 fn driver(&self) -> Driver {
214 Connection::driver(self)
215 }
216
217 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
218 Connection::query(*self, sql).await
219 }
220
221 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
222 where
223 P: ParamSource + ?Sized,
224 {
225 let mut stmt = Connection::prepare(*self, sql).await?;
226 let rows = stmt.execute_source(params).await?;
227 Ok(rows.into_lifetime())
228 }
229
230 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
231 Connection::prepare(*self, sql).await
232 }
233}
234
235impl Executor for Connection {
236 type Rows<'a>
237 = Rows<'a>
238 where
239 Self: 'a;
240 type Statement<'a>
241 = Statement<'a>
242 where
243 Self: 'a;
244
245 fn driver(&self) -> Driver {
246 self.driver()
247 }
248
249 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
250 Connection::query(self, sql).await
251 }
252
253 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
254 where
255 P: ParamSource + ?Sized,
256 {
257 let mut stmt = Connection::prepare(self, sql).await?;
258 let rows = stmt.execute_source(params).await?;
259 Ok(rows.into_lifetime())
260 }
261
262 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
263 Connection::prepare(self, sql).await
264 }
265}
266
267pub struct Transaction<'a> {
274 inner: Option<TransactionInner<'a>>,
275}
276
277enum TransactionInner<'a> {
278 #[cfg(feature = "mariadb")]
279 Mysql(quex_driver::mysql::Transaction<'a>),
280 #[cfg(feature = "postgres")]
281 Postgres(quex_driver::postgres::Transaction<'a>),
282 #[cfg(feature = "sqlite")]
283 Sqlite(quex_driver::sqlite::Transaction<'a>),
284 _Marker(std::marker::PhantomData<&'a ()>),
285}
286
287impl<'a> Transaction<'a> {
288 async fn begin(conn: &'a mut ManagedConnection) -> Result<Self> {
289 let inner = match &mut conn.inner {
290 #[cfg(feature = "mariadb")]
291 crate::connection::ConnectionInner::Mysql(conn) => {
292 TransactionInner::Mysql(conn.begin().await?)
293 }
294 #[cfg(feature = "postgres")]
295 crate::connection::ConnectionInner::Postgres(conn) => {
296 TransactionInner::Postgres(conn.begin().await?)
297 }
298 #[cfg(feature = "sqlite")]
299 crate::connection::ConnectionInner::Sqlite(conn) => {
300 TransactionInner::Sqlite(conn.begin().await?)
301 }
302 crate::connection::ConnectionInner::_Disabled => {
303 unreachable!("disabled backend placeholder")
304 }
305 };
306 Ok(Self { inner: Some(inner) })
307 }
308
309 fn inner_mut(&mut self) -> &mut TransactionInner<'a> {
310 self.inner.as_mut().expect("transaction missing")
311 }
312}
313
314pub struct PoolTransaction {
323 conn: PooledConnection,
324 finished: bool,
325}
326
327impl Transaction<'_> {
328 pub fn driver(&self) -> Driver {
330 match self.inner.as_ref().expect("transaction missing") {
331 #[cfg(feature = "mariadb")]
332 TransactionInner::Mysql(_) => Driver::Mysql,
333 #[cfg(feature = "postgres")]
334 TransactionInner::Postgres(_) => Driver::Pgsql,
335 #[cfg(feature = "sqlite")]
336 TransactionInner::Sqlite(_) => Driver::Sqlite,
337 TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
338 }
339 }
340
341 pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
343 match self.inner_mut() {
344 #[cfg(feature = "mariadb")]
345 TransactionInner::Mysql(tx) => Ok(Rows::mysql(tx.connection().query(sql).await?)),
346 #[cfg(feature = "postgres")]
347 TransactionInner::Postgres(tx) => Ok(Rows::postgres(tx.connection().query(sql).await?)),
348 #[cfg(feature = "sqlite")]
349 TransactionInner::Sqlite(tx) => Ok(Rows::sqlite(tx.connection().query(sql).await?)),
350 TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
351 }
352 }
353
354 pub async fn prepare(&mut self, sql: &str) -> Result<Statement<'_>> {
356 match self.inner_mut() {
357 #[cfg(feature = "mariadb")]
358 TransactionInner::Mysql(tx) => {
359 Ok(Statement::Mysql(tx.connection().prepare_cached(sql).await?))
360 }
361 #[cfg(feature = "postgres")]
362 TransactionInner::Postgres(tx) => {
363 let sql = crate::connection::rewrite_postgres_placeholders(sql);
364 Ok(Statement::Postgres(
365 tx.connection().prepare_cached(&sql).await?,
366 ))
367 }
368 #[cfg(feature = "sqlite")]
369 TransactionInner::Sqlite(tx) => Ok(Statement::Sqlite(
370 tx.connection().prepare_cached(sql).await?,
371 )),
372 TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
373 }
374 }
375
376 pub async fn commit(mut self) -> Result<()> {
378 match self.inner.take().expect("transaction missing") {
379 #[cfg(feature = "mariadb")]
380 TransactionInner::Mysql(tx) => tx.commit().await.map_err(Into::into),
381 #[cfg(feature = "postgres")]
382 TransactionInner::Postgres(tx) => tx.commit().await.map_err(Into::into),
383 #[cfg(feature = "sqlite")]
384 TransactionInner::Sqlite(tx) => tx.commit().await.map_err(Into::into),
385 TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
386 }
387 }
388
389 pub async fn rollback(mut self) -> Result<()> {
391 match self.inner.take().expect("transaction missing") {
392 #[cfg(feature = "mariadb")]
393 TransactionInner::Mysql(tx) => tx.rollback().await.map_err(Into::into),
394 #[cfg(feature = "postgres")]
395 TransactionInner::Postgres(tx) => tx.rollback().await.map_err(Into::into),
396 #[cfg(feature = "sqlite")]
397 TransactionInner::Sqlite(tx) => tx.rollback().await.map_err(Into::into),
398 TransactionInner::_Marker(_) => unreachable!("disabled backend placeholder"),
399 }
400 }
401}
402
403impl PoolTransaction {
404 pub fn driver(&self) -> Driver {
406 self.conn.driver()
407 }
408
409 pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
411 self.conn.query(sql).await
412 }
413
414 pub async fn prepare(&mut self, sql: &str) -> Result<PooledStatement<'_>> {
416 self.conn.prepare(sql).await
417 }
418
419 pub async fn commit(mut self) -> Result<()> {
421 self.finished = true;
422 match commit_inner(&mut self.conn.conn_mut().inner).await {
423 Ok(()) => Ok(()),
424 Err(err) => {
425 self.conn.mark_broken();
426 Err(err)
427 }
428 }
429 }
430
431 pub async fn rollback(mut self) -> Result<()> {
433 self.finished = true;
434 match rollback_inner(&mut self.conn.conn_mut().inner).await {
435 Ok(()) => Ok(()),
436 Err(err) => {
437 self.conn.mark_broken();
438 Err(err)
439 }
440 }
441 }
442}
443
444impl Executor for &mut Transaction<'_> {
445 type Rows<'a>
446 = Rows<'a>
447 where
448 Self: 'a;
449 type Statement<'a>
450 = Statement<'a>
451 where
452 Self: 'a;
453
454 fn driver(&self) -> Driver {
455 Transaction::driver(self)
456 }
457
458 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
459 Transaction::query(*self, sql).await
460 }
461
462 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
463 where
464 P: ParamSource + ?Sized,
465 {
466 let mut stmt = Transaction::prepare(*self, sql).await?;
467 let rows = stmt.execute_source(params).await?;
468 Ok(rows.into_lifetime())
469 }
470
471 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
472 Transaction::prepare(*self, sql).await
473 }
474}
475
476impl Executor for Transaction<'_> {
477 type Rows<'a>
478 = Rows<'a>
479 where
480 Self: 'a;
481 type Statement<'a>
482 = Statement<'a>
483 where
484 Self: 'a;
485
486 fn driver(&self) -> Driver {
487 self.driver()
488 }
489
490 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
491 Transaction::query(self, sql).await
492 }
493
494 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
495 where
496 P: ParamSource + ?Sized,
497 {
498 let mut stmt = Transaction::prepare(self, sql).await?;
499 let rows = stmt.execute_source(params).await?;
500 Ok(rows.into_lifetime())
501 }
502
503 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
504 Transaction::prepare(self, sql).await
505 }
506}
507
508impl Drop for Transaction<'_> {
509 fn drop(&mut self) {
510 let _ = self.inner.take();
511 }
512}
513
514impl Executor for &mut PoolTransaction {
515 type Rows<'a>
516 = Rows<'a>
517 where
518 Self: 'a;
519 type Statement<'a>
520 = PooledStatement<'a>
521 where
522 Self: 'a;
523
524 fn driver(&self) -> Driver {
525 PoolTransaction::driver(self)
526 }
527
528 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
529 PoolTransaction::query(*self, sql).await
530 }
531
532 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
533 where
534 P: ParamSource + ?Sized,
535 {
536 let mut stmt = PoolTransaction::prepare(*self, sql).await?;
537 let rows = stmt.execute_source(params).await?;
538 Ok(rows.into_lifetime())
539 }
540
541 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
542 PoolTransaction::prepare(*self, sql).await
543 }
544}
545
546impl Executor for PoolTransaction {
547 type Rows<'a>
548 = Rows<'a>
549 where
550 Self: 'a;
551 type Statement<'a>
552 = PooledStatement<'a>
553 where
554 Self: 'a;
555
556 fn driver(&self) -> Driver {
557 self.driver()
558 }
559
560 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
561 PoolTransaction::query(self, sql).await
562 }
563
564 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
565 where
566 P: ParamSource + ?Sized,
567 {
568 let mut stmt = PoolTransaction::prepare(self, sql).await?;
569 let rows = stmt.execute_source(params).await?;
570 Ok(rows.into_lifetime())
571 }
572
573 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
574 PoolTransaction::prepare(self, sql).await
575 }
576}
577
578impl Drop for PoolTransaction {
579 fn drop(&mut self) {
580 if !self.finished {
581 self.conn.mark_broken();
582 }
583 }
584}
585
586#[derive(Clone)]
613pub struct Pool {
614 inner: Arc<PoolInner>,
615 hooks: Option<Arc<Hooks>>,
616}
617
618struct PoolInner {
619 driver: Driver,
620 max_size: usize,
621 permits: Arc<Semaphore>,
622 idle: Mutex<Vec<IdleConnection>>,
623 options: Option<ConnectOptions>,
624 acquire_timeout: Option<Duration>,
625 idle_timeout: Option<Duration>,
626 max_lifetime: Option<Duration>,
627}
628
629struct IdleConnection {
630 conn: ManagedConnection,
631 created_at: Instant,
632 idle_since: Instant,
633 on_connect_ran: bool,
634}
635
636type HookFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'a>>;
637
638trait OnConnectHook: Send + Sync {
639 fn call<'a>(&'a self, conn: &'a mut PooledConnection) -> HookFuture<'a, ()>;
640}
641
642impl<F> OnConnectHook for F
643where
644 F: Send + Sync + 'static + for<'a> Fn(&'a mut PooledConnection) -> HookFuture<'a, ()>,
645{
646 fn call<'a>(&'a self, conn: &'a mut PooledConnection) -> HookFuture<'a, ()> {
647 Box::pin(self(conn))
648 }
649}
650
651trait BeforeAcquireHook: Send + Sync {
652 fn call<'a>(&'a self, conn: &'a mut PooledConnection) -> HookFuture<'a, AcquireDecision>;
653}
654
655impl<F> BeforeAcquireHook for F
656where
657 F: Send
658 + Sync
659 + 'static
660 + for<'a> Fn(&'a mut PooledConnection) -> HookFuture<'a, AcquireDecision>,
661{
662 fn call<'a>(&'a self, conn: &'a mut PooledConnection) -> HookFuture<'a, AcquireDecision> {
663 Box::pin(self(conn))
664 }
665}
666
667#[derive(Debug, Clone, Copy, PartialEq, Eq)]
669pub enum AcquireDecision {
670 Accept,
672 Retry,
674}
675
676#[derive(Default)]
706pub struct Hooks {
707 on_connect: Option<Arc<dyn OnConnectHook>>,
708 before_acquire: Option<Arc<dyn BeforeAcquireHook>>,
709}
710
711impl Hooks {
712 pub fn new() -> Self {
714 Self::default()
715 }
716
717 pub fn on_connect<F>(mut self, hook: F) -> Self
723 where
724 F: Send + Sync + 'static + for<'a> Fn(&'a mut PooledConnection) -> HookFuture<'a, ()>,
725 {
726 self.on_connect = Some(Arc::new(hook));
727 self
728 }
729
730 pub fn before_acquire<F>(mut self, hook: F) -> Self
739 where
740 F: Send
741 + Sync
742 + 'static
743 + for<'a> Fn(&'a mut PooledConnection) -> HookFuture<'a, AcquireDecision>,
744 {
745 self.before_acquire = Some(Arc::new(hook));
746 self
747 }
748}
749
750impl Pool {
751 pub fn connect(options: impl IntoConnectOptions) -> Result<PoolBuilder> {
753 PoolBuilder::new().connect(options)
754 }
755
756 pub fn with_hooks(mut self, hooks: Hooks) -> Self {
761 self.hooks = Some(Arc::new(hooks));
762 self
763 }
764
765 pub async fn acquire(&self) -> Result<PooledConnection> {
777 let deadline = self
778 .inner
779 .acquire_timeout
780 .map(|timeout| Instant::now() + timeout);
781 let permit = self.acquire_permit(deadline).await?;
782 self.checkout(permit, deadline).await
783 }
784
785 pub async fn try_acquire(&self) -> Result<PooledConnection> {
794 let permit = self
795 .inner
796 .permits
797 .clone()
798 .try_acquire_owned()
799 .map_err(|err| match err {
800 TryAcquireError::NoPermits => Error::PoolExhausted,
801 TryAcquireError::Closed => Error::PoolClosed,
802 })?;
803
804 self.checkout(permit, None).await
805 }
806
807 async fn acquire_permit(&self, deadline: Option<Instant>) -> Result<OwnedSemaphorePermit> {
808 match deadline {
809 Some(deadline) => timeout(
810 self.remaining_timeout(deadline)?,
811 self.inner.permits.clone().acquire_owned(),
812 )
813 .await
814 .map_err(|_| Error::PoolTimedOut)?
815 .map_err(|_| Error::PoolClosed),
816 None => self
817 .inner
818 .permits
819 .clone()
820 .acquire_owned()
821 .await
822 .map_err(|_| Error::PoolClosed),
823 }
824 }
825
826 fn remaining_timeout(&self, deadline: Instant) -> Result<Duration> {
827 deadline
828 .checked_duration_since(Instant::now())
829 .ok_or(Error::PoolTimedOut)
830 }
831
832 async fn checkout(
833 &self,
834 permit: OwnedSemaphorePermit,
835 deadline: Option<Instant>,
836 ) -> Result<PooledConnection> {
837 let mut permit = Some(permit);
838 loop {
839 let (conn, created_at, on_connect_ran, fresh) = match self.take_idle_connection() {
840 Some(entry) => (entry.conn, entry.created_at, entry.on_connect_ran, false),
841 None => {
842 let options = self.inner.options.clone().ok_or_else(|| {
843 Error::Unsupported("pool cannot reopen this connection".into())
844 })?;
845 let conn = match deadline {
846 Some(deadline) => {
847 timeout(self.remaining_timeout(deadline)?, connect_managed(options))
848 .await
849 .map_err(|_| Error::PoolTimedOut)??
850 }
851 None => connect_managed(options).await?,
852 };
853 (conn, Instant::now(), false, true)
854 }
855 };
856
857 let mut pooled = PooledConnection {
858 pool: Arc::clone(&self.inner),
859 permit: Some(permit.take().expect("pool permit missing")),
860 conn: Some(conn),
861 reusable: true,
862 created_at,
863 on_connect_ran,
864 };
865
866 if !pooled.on_connect_ran {
867 if let Some(hook) = self
868 .hooks
869 .as_ref()
870 .and_then(|hooks| hooks.on_connect.as_ref())
871 {
872 let result = match deadline {
873 Some(deadline) => {
874 timeout(self.remaining_timeout(deadline)?, hook.call(&mut pooled))
875 .await
876 .map_err(|_| Error::PoolTimedOut)?
877 }
878 None => hook.call(&mut pooled).await,
879 };
880 if let Err(err) = result {
881 pooled.mark_broken();
882 return Err(err);
883 }
884 pooled.on_connect_ran = true;
885 }
886 }
887
888 if let Some(hook) = self
889 .hooks
890 .as_ref()
891 .and_then(|hooks| hooks.before_acquire.as_ref())
892 {
893 let decision = match deadline {
894 Some(deadline) => {
895 timeout(self.remaining_timeout(deadline)?, hook.call(&mut pooled))
896 .await
897 .map_err(|_| Error::PoolTimedOut)?
898 }
899 None => hook.call(&mut pooled).await,
900 };
901 match decision {
902 Ok(AcquireDecision::Accept) => return Ok(pooled),
903 Ok(AcquireDecision::Retry) => {
904 pooled.mark_broken();
905 if fresh && deadline.is_none() {
906 return Err(Error::Unsupported(
907 "before_acquire rejected a fresh connection".into(),
908 ));
909 }
910 permit = Some(pooled.permit.take().expect("pool permit missing"));
911 drop(pooled);
912 continue;
913 }
914 Err(err) => {
915 pooled.mark_broken();
916 return Err(err);
917 }
918 }
919 }
920
921 return Ok(pooled);
922 }
923 }
924
925 fn take_idle_connection(&self) -> Option<IdleConnection> {
926 let now = Instant::now();
927 let mut idle = self.inner.idle.lock().expect("pool mutex poisoned");
928 while let Some(entry) = idle.pop() {
929 if self
930 .inner
931 .idle_timeout
932 .is_some_and(|limit| now.duration_since(entry.idle_since) >= limit)
933 {
934 continue;
935 }
936 if self
937 .inner
938 .max_lifetime
939 .is_some_and(|limit| now.duration_since(entry.created_at) >= limit)
940 {
941 continue;
942 }
943 return Some(entry);
944 }
945 None
946 }
947
948 pub fn max_size(&self) -> usize {
950 self.inner.max_size
951 }
952
953 pub fn driver(&self) -> Driver {
955 self.inner.driver
956 }
957
958 pub fn idle_count(&self) -> usize {
960 self.inner.idle.lock().expect("pool mutex poisoned").len()
961 }
962
963 pub fn available_permits(&self) -> usize {
965 self.inner.permits.available_permits()
966 }
967
968 pub fn is_closed(&self) -> bool {
970 self.inner.permits.is_closed()
971 }
972
973 pub fn close(&self) {
979 self.inner.permits.close();
980 self.inner.idle.lock().expect("pool mutex poisoned").clear();
981 }
982
983 pub async fn query(&self, sql: &str) -> Result<Rows<'_>> {
985 let mut conn = self.acquire().await?;
986 let rows = conn.query(sql).await?;
987 Ok(rows.into_lifetime())
988 }
989
990 pub async fn prepare(&self, sql: &str) -> Result<PoolStatement> {
995 Ok(PoolStatement::new(self.clone(), sql))
996 }
997
998 pub async fn begin(&self) -> Result<PoolTransaction> {
1004 let mut conn = self.acquire().await?;
1005 start_transaction_inner(&mut conn.conn_mut().inner).await?;
1006 Ok(PoolTransaction {
1007 conn,
1008 finished: false,
1009 })
1010 }
1011
1012 pub async fn try_begin(&self) -> Result<PoolTransaction> {
1018 let mut conn = self.try_acquire().await?;
1019 start_transaction_inner(&mut conn.conn_mut().inner).await?;
1020 Ok(PoolTransaction {
1021 conn,
1022 finished: false,
1023 })
1024 }
1025}
1026
1027impl Executor for &Pool {
1028 type Rows<'a>
1029 = Rows<'a>
1030 where
1031 Self: 'a;
1032 type Statement<'a>
1033 = PoolStatement
1034 where
1035 Self: 'a;
1036
1037 fn driver(&self) -> Driver {
1038 Pool::driver(self)
1039 }
1040
1041 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
1042 Pool::query(self, sql).await
1043 }
1044
1045 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
1046 where
1047 P: ParamSource + ?Sized,
1048 {
1049 let mut stmt = Pool::prepare(self, sql).await?;
1050 let rows = stmt.execute_source(params).await?;
1051 Ok(rows.into_lifetime())
1052 }
1053
1054 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
1055 Pool::prepare(self, sql).await
1056 }
1057}
1058
1059impl Executor for &mut Pool {
1060 type Rows<'a>
1061 = Rows<'a>
1062 where
1063 Self: 'a;
1064 type Statement<'a>
1065 = PoolStatement
1066 where
1067 Self: 'a;
1068
1069 fn driver(&self) -> Driver {
1070 Pool::driver(self)
1071 }
1072
1073 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
1074 Pool::query(self, sql).await
1075 }
1076
1077 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
1078 where
1079 P: ParamSource + ?Sized,
1080 {
1081 let mut stmt = Pool::prepare(self, sql).await?;
1082 let rows = stmt.execute_source(params).await?;
1083 Ok(rows.into_lifetime())
1084 }
1085
1086 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
1087 Pool::prepare(self, sql).await
1088 }
1089}
1090
1091impl Executor for Pool {
1092 type Rows<'a>
1093 = Rows<'a>
1094 where
1095 Self: 'a;
1096 type Statement<'a>
1097 = PoolStatement
1098 where
1099 Self: 'a;
1100
1101 fn driver(&self) -> Driver {
1102 self.driver()
1103 }
1104
1105 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
1106 Pool::query(self, sql).await
1107 }
1108
1109 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
1110 where
1111 P: ParamSource + ?Sized,
1112 {
1113 let mut stmt = Pool::prepare(self, sql).await?;
1114 let rows = stmt.execute_source(params).await?;
1115 Ok(rows.into_lifetime())
1116 }
1117
1118 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
1119 Pool::prepare(self, sql).await
1120 }
1121}
1122
1123pub struct PoolBuilder {
1128 max_size: usize,
1129 min_connections: usize,
1130 acquire_timeout: Option<Duration>,
1131 idle_timeout: Option<Duration>,
1132 max_lifetime: Option<Duration>,
1133 options: Option<ConnectOptions>,
1134}
1135
1136impl Default for PoolBuilder {
1137 fn default() -> Self {
1138 Self::new()
1139 }
1140}
1141
1142impl PoolBuilder {
1143 pub fn new() -> Self {
1145 Self {
1146 max_size: 10,
1147 min_connections: 0,
1148 acquire_timeout: None,
1149 idle_timeout: None,
1150 max_lifetime: None,
1151 options: None,
1152 }
1153 }
1154
1155 pub fn max_size(mut self, value: usize) -> Self {
1159 self.max_size = value;
1160 self
1161 }
1162
1163 pub fn min_connections(mut self, value: usize) -> Self {
1167 self.min_connections = value;
1168 self
1169 }
1170
1171 pub fn acquire_timeout(mut self, value: Duration) -> Self {
1177 self.acquire_timeout = Some(value);
1178 self
1179 }
1180
1181 pub fn idle_timeout(mut self, value: Duration) -> Self {
1186 self.idle_timeout = Some(value);
1187 self
1188 }
1189
1190 pub fn max_lifetime(mut self, value: Duration) -> Self {
1195 self.max_lifetime = Some(value);
1196 self
1197 }
1198
1199 pub fn connect(mut self, url: impl IntoConnectOptions) -> Result<Self> {
1203 self.options = Some(url.into_connect_options()?);
1204 Ok(self)
1205 }
1206
1207 pub async fn build(self) -> Result<Pool> {
1213 if self.max_size == 0 {
1214 return Err(Error::Unsupported(
1215 "pool max_size must be greater than zero".into(),
1216 ));
1217 }
1218 if self.min_connections > self.max_size {
1219 return Err(Error::Unsupported(
1220 "pool min_connections cannot be greater than max_size".into(),
1221 ));
1222 }
1223
1224 let options = self
1225 .options
1226 .ok_or_else(|| Error::Unsupported("pool requires connection options".into()))?;
1227 let driver = options
1228 .driver
1229 .ok_or_else(|| Error::Unsupported("pool requires a driver".into()))?;
1230
1231 let now = Instant::now();
1232 let mut idle = Vec::with_capacity(self.min_connections);
1233 for _ in 0..self.min_connections {
1234 idle.push(IdleConnection {
1235 conn: connect_managed(options.clone()).await?,
1236 created_at: now,
1237 idle_since: now,
1238 on_connect_ran: false,
1239 });
1240 }
1241
1242 Ok(Pool {
1243 inner: Arc::new(PoolInner {
1244 driver,
1245 max_size: self.max_size,
1246 permits: Arc::new(Semaphore::new(self.max_size)),
1247 idle: Mutex::new(idle),
1248 options: Some(options),
1249 acquire_timeout: self.acquire_timeout,
1250 idle_timeout: self.idle_timeout,
1251 max_lifetime: self.max_lifetime,
1252 }),
1253 hooks: None,
1254 })
1255 }
1256}
1257
1258pub struct PooledConnection {
1266 pool: Arc<PoolInner>,
1267 permit: Option<OwnedSemaphorePermit>,
1268 conn: Option<ManagedConnection>,
1269 reusable: bool,
1270 created_at: Instant,
1271 on_connect_ran: bool,
1272}
1273
1274impl PooledConnection {
1275 fn conn_mut(&mut self) -> &mut ManagedConnection {
1276 self.conn.as_mut().expect("pooled connection missing")
1277 }
1278
1279 fn mark_broken(&mut self) {
1280 self.reusable = false;
1281 }
1282
1283 pub fn driver(&self) -> Driver {
1285 self.conn
1286 .as_ref()
1287 .expect("pooled connection missing")
1288 .driver
1289 }
1290
1291 pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
1293 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1294 match query_inner(
1295 &mut conn.as_mut().expect("pooled connection missing").inner,
1296 sql,
1297 )
1298 .await
1299 {
1300 Ok(rows) => Ok(rows),
1301 Err(err) => {
1302 *reusable = false;
1303 Err(err)
1304 }
1305 }
1306 }
1307
1308 pub async fn prepare(&mut self, sql: &str) -> Result<PooledStatement<'_>> {
1310 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1311 match prepare_inner(
1312 &mut conn.as_mut().expect("pooled connection missing").inner,
1313 sql,
1314 )
1315 .await
1316 {
1317 Ok(stmt) => Ok(PooledStatement { stmt, reusable }),
1318 Err(err) => {
1319 *reusable = false;
1320 Err(err)
1321 }
1322 }
1323 }
1324
1325 pub async fn begin(&mut self) -> Result<PooledTransaction<'_>> {
1330 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1331 match start_transaction_inner(&mut conn.as_mut().expect("pooled connection missing").inner)
1332 .await
1333 {
1334 Ok(()) => Ok(PooledTransaction {
1335 pooled: self,
1336 finished: false,
1337 }),
1338 Err(err) => {
1339 *reusable = false;
1340 Err(err)
1341 }
1342 }
1343 }
1344
1345 #[cfg(feature = "postgres")]
1347 pub async fn listen(&mut self, channel: &str) -> Result<()> {
1348 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1349 match &mut conn.as_mut().expect("pooled connection missing").inner {
1350 crate::connection::ConnectionInner::Postgres(conn) => {
1351 match conn.listen(channel).await {
1352 Ok(()) => Ok(()),
1353 Err(err) => {
1354 *reusable = false;
1355 Err(err.into())
1356 }
1357 }
1358 }
1359 _ => Err(Error::Unsupported(
1360 "postgres notifications require a postgres connection".into(),
1361 )),
1362 }
1363 }
1364
1365 #[cfg(feature = "postgres")]
1367 pub async fn unlisten(&mut self, channel: &str) -> Result<()> {
1368 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1369 match &mut conn.as_mut().expect("pooled connection missing").inner {
1370 crate::connection::ConnectionInner::Postgres(conn) => {
1371 match conn.unlisten(channel).await {
1372 Ok(()) => Ok(()),
1373 Err(err) => {
1374 *reusable = false;
1375 Err(err.into())
1376 }
1377 }
1378 }
1379 _ => Err(Error::Unsupported(
1380 "postgres notifications require a postgres connection".into(),
1381 )),
1382 }
1383 }
1384
1385 #[cfg(feature = "postgres")]
1387 pub async fn unlisten_all(&mut self) -> Result<()> {
1388 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1389 match &mut conn.as_mut().expect("pooled connection missing").inner {
1390 crate::connection::ConnectionInner::Postgres(conn) => match conn.unlisten_all().await {
1391 Ok(()) => Ok(()),
1392 Err(err) => {
1393 *reusable = false;
1394 Err(err.into())
1395 }
1396 },
1397 _ => Err(Error::Unsupported(
1398 "postgres notifications require a postgres connection".into(),
1399 )),
1400 }
1401 }
1402
1403 #[cfg(feature = "postgres")]
1405 pub async fn notify(&mut self, channel: &str, payload: Option<&str>) -> Result<()> {
1406 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1407 match &mut conn.as_mut().expect("pooled connection missing").inner {
1408 crate::connection::ConnectionInner::Postgres(conn) => {
1409 match conn.notify(channel, payload).await {
1410 Ok(()) => Ok(()),
1411 Err(err) => {
1412 *reusable = false;
1413 Err(err.into())
1414 }
1415 }
1416 }
1417 _ => Err(Error::Unsupported(
1418 "postgres notifications require a postgres connection".into(),
1419 )),
1420 }
1421 }
1422
1423 #[cfg(feature = "postgres")]
1425 pub async fn try_recv_notification(&mut self) -> Result<Option<crate::PostgresNotification>> {
1426 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1427 match &mut conn.as_mut().expect("pooled connection missing").inner {
1428 crate::connection::ConnectionInner::Postgres(conn) => {
1429 match conn.try_recv_notification().await {
1430 Ok(notification) => Ok(notification),
1431 Err(err) => {
1432 *reusable = false;
1433 Err(err.into())
1434 }
1435 }
1436 }
1437 _ => Err(Error::Unsupported(
1438 "postgres notifications require a postgres connection".into(),
1439 )),
1440 }
1441 }
1442
1443 #[cfg(feature = "postgres")]
1445 pub async fn wait_for_notification(&mut self) -> Result<crate::PostgresNotification> {
1446 let (conn, reusable) = (&mut self.conn, &mut self.reusable);
1447 match &mut conn.as_mut().expect("pooled connection missing").inner {
1448 crate::connection::ConnectionInner::Postgres(conn) => {
1449 match conn.wait_for_notification().await {
1450 Ok(notification) => Ok(notification),
1451 Err(err) => {
1452 *reusable = false;
1453 Err(err.into())
1454 }
1455 }
1456 }
1457 _ => Err(Error::Unsupported(
1458 "postgres notifications require a postgres connection".into(),
1459 )),
1460 }
1461 }
1462}
1463
1464pub struct PoolStatement {
1473 pool: Pool,
1474 sql: String,
1475}
1476
1477impl PoolStatement {
1478 fn new(pool: Pool, sql: &str) -> Self {
1479 Self {
1480 pool,
1481 sql: sql.into(),
1482 }
1483 }
1484
1485 pub fn bind<T>(&mut self, value: T) -> BoundStatement<'_, Self>
1487 where
1488 T: Encode,
1489 {
1490 BoundStatement::new(self).bind(value)
1491 }
1492
1493 pub async fn execute_source<P>(&mut self, params: &P) -> Result<Rows<'_>>
1495 where
1496 P: ParamSource + ?Sized,
1497 {
1498 let mut conn = self.pool.acquire().await?;
1499 let mut stmt = prepare_inner(&mut conn.conn_mut().inner, &self.sql).await?;
1500 let rows = stmt.execute_source(params).await?;
1501 Ok(rows.into_lifetime())
1502 }
1503
1504 pub async fn exec_source<P>(&mut self, params: &P) -> Result<ExecResult>
1506 where
1507 P: ParamSource + ?Sized,
1508 {
1509 let mut conn = self.pool.acquire().await?;
1510 let mut stmt = prepare_inner(&mut conn.conn_mut().inner, &self.sql).await?;
1511 stmt.exec_source(params).await
1512 }
1513}
1514
1515impl PreparedStatement for PoolStatement {
1516 type Rows<'a>
1517 = Rows<'a>
1518 where
1519 Self: 'a;
1520
1521 async fn execute_source<P>(&mut self, params: &P) -> Result<Self::Rows<'_>>
1522 where
1523 P: ParamSource + ?Sized,
1524 {
1525 PoolStatement::execute_source(self, params).await
1526 }
1527
1528 async fn exec_source<P>(&mut self, params: &P) -> Result<ExecResult>
1529 where
1530 P: ParamSource + ?Sized,
1531 {
1532 PoolStatement::exec_source(self, params).await
1533 }
1534}
1535
1536impl Executor for &mut PooledConnection {
1537 type Rows<'a>
1538 = Rows<'a>
1539 where
1540 Self: 'a;
1541 type Statement<'a>
1542 = PooledStatement<'a>
1543 where
1544 Self: 'a;
1545
1546 fn driver(&self) -> Driver {
1547 PooledConnection::driver(self)
1548 }
1549
1550 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
1551 PooledConnection::query(*self, sql).await
1552 }
1553
1554 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
1555 where
1556 P: ParamSource + ?Sized,
1557 {
1558 let mut stmt = PooledConnection::prepare(*self, sql).await?;
1559 let rows = stmt.execute_source(params).await?;
1560 Ok(rows.into_lifetime())
1561 }
1562
1563 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
1564 PooledConnection::prepare(*self, sql).await
1565 }
1566}
1567
1568impl Executor for PooledConnection {
1569 type Rows<'a>
1570 = Rows<'a>
1571 where
1572 Self: 'a;
1573 type Statement<'a>
1574 = PooledStatement<'a>
1575 where
1576 Self: 'a;
1577
1578 fn driver(&self) -> Driver {
1579 self.driver()
1580 }
1581
1582 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
1583 PooledConnection::query(self, sql).await
1584 }
1585
1586 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
1587 where
1588 P: ParamSource + ?Sized,
1589 {
1590 let mut stmt = PooledConnection::prepare(self, sql).await?;
1591 let rows = stmt.execute_source(params).await?;
1592 Ok(rows.into_lifetime())
1593 }
1594
1595 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
1596 PooledConnection::prepare(self, sql).await
1597 }
1598}
1599
1600impl Drop for PooledConnection {
1601 fn drop(&mut self) {
1602 let conn = self.conn.take();
1603 if let Some(conn) = conn {
1604 let now = Instant::now();
1605 let expired_by_lifetime = self
1606 .pool
1607 .max_lifetime
1608 .is_some_and(|limit| now.duration_since(self.created_at) >= limit);
1609 if self.reusable && !self.pool.permits.is_closed() && !expired_by_lifetime {
1610 self.pool
1611 .idle
1612 .lock()
1613 .expect("pool mutex poisoned")
1614 .push(IdleConnection {
1615 conn,
1616 created_at: self.created_at,
1617 idle_since: now,
1618 on_connect_ran: self.on_connect_ran,
1619 });
1620 }
1621 }
1622 self.permit.take();
1623 }
1624}
1625
1626pub struct PooledTransaction<'a> {
1632 pooled: &'a mut PooledConnection,
1633 finished: bool,
1634}
1635
1636impl<'a> PooledTransaction<'a> {
1637 pub fn driver(&self) -> Driver {
1639 self.pooled.driver()
1640 }
1641
1642 pub async fn query(&mut self, sql: &str) -> Result<Rows<'_>> {
1644 self.pooled.query(sql).await
1645 }
1646
1647 pub async fn prepare(&mut self, sql: &str) -> Result<PooledStatement<'_>> {
1649 self.pooled.prepare(sql).await
1650 }
1651
1652 pub async fn commit(mut self) -> Result<()> {
1654 self.finished = true;
1655 match commit_inner(&mut self.pooled.conn_mut().inner).await {
1656 Ok(()) => Ok(()),
1657 Err(err) => {
1658 self.pooled.mark_broken();
1659 Err(err)
1660 }
1661 }
1662 }
1663
1664 pub async fn rollback(mut self) -> Result<()> {
1666 self.finished = true;
1667 match rollback_inner(&mut self.pooled.conn_mut().inner).await {
1668 Ok(()) => Ok(()),
1669 Err(err) => {
1670 self.pooled.mark_broken();
1671 Err(err)
1672 }
1673 }
1674 }
1675
1676 pub fn connection(&mut self) -> &mut PooledConnection {
1678 self.pooled
1679 }
1680}
1681
1682impl Executor for &mut PooledTransaction<'_> {
1683 type Rows<'a>
1684 = Rows<'a>
1685 where
1686 Self: 'a;
1687 type Statement<'a>
1688 = PooledStatement<'a>
1689 where
1690 Self: 'a;
1691
1692 fn driver(&self) -> Driver {
1693 PooledTransaction::driver(self)
1694 }
1695
1696 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
1697 PooledTransaction::query(*self, sql).await
1698 }
1699
1700 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
1701 where
1702 P: ParamSource + ?Sized,
1703 {
1704 let mut stmt = PooledTransaction::prepare(*self, sql).await?;
1705 let rows = stmt.execute_source(params).await?;
1706 Ok(rows.into_lifetime())
1707 }
1708
1709 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
1710 PooledTransaction::prepare(*self, sql).await
1711 }
1712}
1713
1714impl Executor for PooledTransaction<'_> {
1715 type Rows<'a>
1716 = Rows<'a>
1717 where
1718 Self: 'a;
1719 type Statement<'a>
1720 = PooledStatement<'a>
1721 where
1722 Self: 'a;
1723
1724 fn driver(&self) -> Driver {
1725 self.driver()
1726 }
1727
1728 async fn query(&mut self, sql: &str) -> Result<Self::Rows<'_>> {
1729 PooledTransaction::query(self, sql).await
1730 }
1731
1732 async fn query_prepared_source<P>(&mut self, sql: &str, params: &P) -> Result<Self::Rows<'_>>
1733 where
1734 P: ParamSource + ?Sized,
1735 {
1736 let mut stmt = PooledTransaction::prepare(self, sql).await?;
1737 let rows = stmt.execute_source(params).await?;
1738 Ok(rows.into_lifetime())
1739 }
1740
1741 async fn prepare(&mut self, sql: &str) -> Result<Self::Statement<'_>> {
1742 PooledTransaction::prepare(self, sql).await
1743 }
1744}
1745
1746impl Drop for PooledTransaction<'_> {
1747 fn drop(&mut self) {
1748 if !self.finished {
1749 self.pooled.mark_broken();
1750 }
1751 }
1752}
1753
1754pub struct PooledStatement<'a> {
1758 stmt: Statement<'a>,
1759 reusable: &'a mut bool,
1760}
1761
1762impl PooledStatement<'_> {
1763 pub fn bind<T>(&mut self, value: T) -> BoundStatement<'_, Self>
1765 where
1766 T: Encode,
1767 {
1768 BoundStatement::new(self).bind(value)
1769 }
1770
1771 pub async fn execute_source<P>(&mut self, params: &P) -> Result<Rows<'_>>
1773 where
1774 P: ParamSource + ?Sized,
1775 {
1776 match self.stmt.execute_source(params).await {
1777 Ok(rows) => Ok(rows),
1778 Err(err) => {
1779 *self.reusable = false;
1780 Err(err)
1781 }
1782 }
1783 }
1784
1785 pub async fn exec_source<P>(&mut self, params: &P) -> Result<ExecResult>
1787 where
1788 P: ParamSource + ?Sized,
1789 {
1790 match self.stmt.exec_source(params).await {
1791 Ok(result) => Ok(result),
1792 Err(err) => {
1793 *self.reusable = false;
1794 Err(err)
1795 }
1796 }
1797 }
1798}
1799
1800impl PreparedStatement for PooledStatement<'_> {
1801 type Rows<'a>
1802 = Rows<'a>
1803 where
1804 Self: 'a;
1805
1806 async fn execute_source<P>(&mut self, params: &P) -> Result<Self::Rows<'_>>
1807 where
1808 P: ParamSource + ?Sized,
1809 {
1810 PooledStatement::execute_source(self, params).await
1811 }
1812
1813 async fn exec_source<P>(&mut self, params: &P) -> Result<ExecResult>
1814 where
1815 P: ParamSource + ?Sized,
1816 {
1817 PooledStatement::exec_source(self, params).await
1818 }
1819}