1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
//! Provides the connection pool for asynchronous SQLx connections.
//!
//! Opening a database connection for each and every operation to the database can quickly
//! become expensive. Furthermore, sharing a database connection between threads and functions
//! can be difficult to express in Rust.
//!
//! A connection pool is a standard technique that can manage opening and re-using connections.
//! Normally it also enforces a maximum number of connections as these are an expensive resource
//! on the database server.
//!
//! SQLx provides a canonical connection pool implementation intended to satisfy the majority
//! of use cases.
//!
//! # Opening a connection pool
//!
//! A new connection pool with a default configuration can be created by supplying `Pool`
//! with the database driver and a connection string.
//!
//! ```rust,ignore
//! use sqlx::Pool;
//! use sqlx::postgres::Postgres;
//!
//! let pool = Pool::<Postgres>::connect("postgres://").await?;
//! ```
//!
//! For convenience, database-specific type aliases are provided:
//!
//! ```rust,ignore
//! use sqlx::mssql::MssqlPool;
//!
//! let pool = MssqlPool::connect("mssql://").await?;
//! ```
//!
//! # Using a connection pool
//!
//! A connection pool implements [`Executor`](../trait.Executor.html) and can be used directly
//! when executing a query. Notice that only an immutable reference (`&Pool`) is needed.
//!
//! ```rust,ignore
//! sqlx::query("DELETE FROM articles").execute(&pool).await?;
//! ```
//!
//! A connection or transaction may also be manually acquired with
//! [`Pool::acquire`](struct.Pool.html#method.acquire) or
//! [`Pool::begin`](struct.Pool.html#method.begin).
//!
use self::inner::SharedPool;
use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::transaction::Transaction;
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};

#[macro_use]
mod executor;

#[macro_use]
mod maybe;

mod connection;
mod inner;
mod options;

pub use self::connection::PoolConnection;
pub(crate) use self::maybe::MaybePoolConnection;
pub use self::options::PoolOptions;

/// An asynchronous pool of SQLx database connections.
///
/// This is a thin handle around a reference-counted `SharedPool<DB>`; every
/// method below forwards to that shared state.
pub struct Pool<DB: Database>(pub(crate) Arc<SharedPool<DB>>);

impl<DB: Database> Pool<DB> {
    /// Creates a new connection pool with a default pool configuration and
    /// the given connection URI, and immediately establishes one connection.
    ///
    /// Equivalent to `PoolOptions::<DB>::new().connect(uri)`.
    pub async fn connect(uri: &str) -> Result<Self, Error> {
        PoolOptions::<DB>::new()
            .connect(uri)
            .await
    }

    /// Creates a new connection pool with a default pool configuration and
    /// the given connection options, and immediately establishes one connection.
    ///
    /// Equivalent to `PoolOptions::<DB>::new().connect_with(options)`.
    pub async fn connect_with(
        options: <DB::Connection as Connection>::Options,
    ) -> Result<Self, Error> {
        PoolOptions::<DB>::new()
            .connect_with(options)
            .await
    }

    /// Creates a new connection pool with a default pool configuration and
    /// the given connection URI. Connections are established as the pool
    /// starts to be used.
    ///
    /// Equivalent to `PoolOptions::<DB>::new().connect_lazy(uri)`.
    pub fn connect_lazy(uri: &str) -> Result<Self, Error> {
        PoolOptions::<DB>::new()
            .connect_lazy(uri)
    }

    /// Creates a new connection pool with a default pool configuration and
    /// the given connection options. Connections are established as the pool
    /// starts to be used.
    ///
    /// Equivalent to `PoolOptions::<DB>::new().connect_lazy_with(options)`.
    pub fn connect_lazy_with(options: <DB::Connection as Connection>::Options) -> Self {
        PoolOptions::<DB>::new()
            .connect_lazy_with(options)
    }

    /// Retrieves a connection from the pool.
    ///
    /// Waits for at most the configured connection timeout before returning an error.
    ///
    /// The returned future owns its own handle to the shared pool state, so it
    /// does not borrow from `self` (hence the `'static` bound).
    pub fn acquire(&self) -> impl Future<Output = Result<PoolConnection<DB>, Error>> + 'static {
        let pool = Arc::clone(&self.0);

        async move {
            let acquired = pool.acquire().await;
            // Bind the freshly acquired connection back to the pool it came from.
            acquired.map(|floating| floating.attach(&pool))
        }
    }

    /// Attempts to retrieve a connection from the pool if there is one available.
    ///
    /// Returns `None` immediately if there are no idle connections available in the pool.
    pub fn try_acquire(&self) -> Option<PoolConnection<DB>> {
        let floating = self.0.try_acquire()?;
        Some(floating.attach(&self.0))
    }

    /// Retrieves a new connection and immediately begins a new transaction.
    ///
    /// Fails if either acquiring the connection or starting the transaction fails.
    pub async fn begin(&self) -> Result<Transaction<'static, DB>, Error> {
        let conn = self.acquire().await?;
        Transaction::begin(MaybePoolConnection::PoolConnection(conn)).await
    }

    /// Attempts to retrieve a new connection and immediately begins a new transaction if there
    /// is one available.
    ///
    /// Resolves to `Ok(None)` when [`try_acquire`](Pool::try_acquire) yields no connection.
    pub async fn try_begin(&self) -> Result<Option<Transaction<'static, DB>>, Error> {
        if let Some(conn) = self.try_acquire() {
            let tx = Transaction::begin(MaybePoolConnection::PoolConnection(conn)).await?;
            Ok(Some(tx))
        } else {
            Ok(None)
        }
    }

    /// Ends the use of a connection pool. Prevents any new connections
    /// and will close all active connections when they are returned to the pool.
    ///
    /// Does not resolve until all connections are closed.
    pub async fn close(&self) {
        self.0.close().await;
    }

    /// Returns `true` if [`.close()`][Pool::close] has been called on the pool, `false` otherwise.
    pub fn is_closed(&self) -> bool {
        self.0.is_closed()
    }
}

/// Returns a new [Pool] tied to the same shared connection pool.
impl<DB: Database> Clone for Pool<DB> { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } } impl<DB: Database> fmt::Debug for Pool<DB> { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Pool") .field("size", &self.0.size()) .field("num_idle", &self.0.num_idle()) .field("is_closed", &self.0.is_closed()) .field("options", &self.0.options) .finish() } } /// get the time between the deadline and now and use that as our timeout /// /// returns `Error::PoolTimedOut` if the deadline is in the past fn deadline_as_timeout<DB: Database>(deadline: Instant) -> Result<Duration, Error> { deadline .checked_duration_since(Instant::now()) .ok_or(Error::PoolTimedOut) } #[test] #[allow(dead_code)] fn assert_pool_traits() { fn assert_send_sync<T: Send + Sync>() {} fn assert_clone<T: Clone>() {} fn assert_pool<DB: Database>() { assert_send_sync::<Pool<DB>>(); assert_clone::<Pool<DB>>(); } }