use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::pool::inner::SharedPool;
use crate::pool::Pool;
use futures_core::future::BoxFuture;
use sqlx_rt::spawn;
use std::cmp;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use std::time::{Duration, Instant};
pub struct PoolOptions<DB: Database> {
pub(crate) test_before_acquire: bool,
pub(crate) after_connect: Option<
Box<
dyn Fn(&mut DB::Connection) -> BoxFuture<'_, Result<(), Error>> + 'static + Send + Sync,
>,
>,
pub(crate) before_acquire: Option<
Box<
dyn Fn(&mut DB::Connection) -> BoxFuture<'_, Result<bool, Error>>
+ 'static
+ Send
+ Sync,
>,
>,
pub(crate) after_release:
Option<Box<dyn Fn(&mut DB::Connection) -> bool + 'static + Send + Sync>>,
pub(crate) max_connections: u32,
pub(crate) connect_timeout: Duration,
pub(crate) min_connections: u32,
pub(crate) max_lifetime: Option<Duration>,
pub(crate) idle_timeout: Option<Duration>,
pub(crate) fair: bool,
}
impl<DB: Database> Default for PoolOptions<DB> {
fn default() -> Self {
Self::new()
}
}
impl<DB: Database> PoolOptions<DB> {
pub fn new() -> Self {
Self {
after_connect: None,
test_before_acquire: true,
before_acquire: None,
after_release: None,
max_connections: 10,
min_connections: 0,
connect_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
fair: true,
}
}
/// Set the maximum number of connections that this pool should maintain.
pub fn max_connections(mut self, max: u32) -> Self {
self.max_connections = max;
self
}
/// Set the amount of time to attempt connecting to the database.
///
/// If this timeout elapses, [`Pool::acquire`] will return an error.
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = timeout;
self
}
/// Set the minimum number of connections to maintain at all times.
///
/// When the pool is built, this many connections will be automatically spun up.
///
/// If any connection is reaped by [`max_lifetime`] or [`idle_timeout`] and it brings
/// the connection count below this amount, a new connection will be opened to replace it.
///
/// [`max_lifetime`]: Self::max_lifetime
/// [`idle_timeout`]: Self::idle_timeout
pub fn min_connections(mut self, min: u32) -> Self {
self.min_connections = min;
self
}
/// Set the maximum lifetime of individual connections.
///
/// Any connection with a lifetime greater than this will be closed.
///
/// When set to `None`, all connections live until either reaped by [`idle_timeout`]
/// or explicitly disconnected.
///
/// Infinite connections are not recommended due to the unfortunate reality of memory/resource
/// leaks on the database-side. It is better to retire connections periodically
/// (even if only once daily) to allow the database the opportunity to clean up data structures
/// (parse trees, query metadata caches, thread-local storage, etc.) that are associated with a
/// session.
///
/// [`idle_timeout`]: Self::idle_timeout
pub fn max_lifetime(mut self, lifetime: impl Into<Option<Duration>>) -> Self {
self.max_lifetime = lifetime.into();
self
}
/// Set a maximum idle duration for individual connections.
///
/// Any connection with an idle duration longer than this will be closed.
///
/// For usage-based database server billing, this can be a cost saver.
pub fn idle_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
self.idle_timeout = timeout.into();
self
}
/// If true, the health of a connection will be verified by a call to [`Connection::ping`]
/// before returning the connection.
///
/// Defaults to `true`.
pub fn test_before_acquire(mut self, test: bool) -> Self {
self.test_before_acquire = test;
self
}
/// If set to `true`, calls to `acquire()` are fair and connections are issued
/// in first-come-first-serve order. If `false`, "drive-by" tasks may steal idle connections
/// ahead of tasks that have been waiting.
///
/// According to `sqlx-bench/benches/pg_pool` this may slightly increase time
/// to `acquire()` at low pool contention but at very high contention it helps
/// avoid tasks at the head of the waiter queue getting repeatedly preempted by
/// these "drive-by" tasks and tasks further back in the queue timing out because
/// the queue isn't moving.
///
/// Currently only exposed for benchmarking; `fair = true` seems to be the superior option
/// in most cases.
#[doc(hidden)]
pub fn __fair(mut self, fair: bool) -> Self {
self.fair = fair;
self
}
/// Perform an action after connecting to the database.
///
/// # Example
///
/// ```no_run
/// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
/// use sqlx_core::executor::Executor;
/// use sqlx_core::postgres::PgPoolOptions;
/// // PostgreSQL
/// let pool = PgPoolOptions::new()
/// .after_connect(|conn| Box::pin(async move {
/// conn.execute("SET application_name = 'your_app';").await?;
/// conn.execute("SET search_path = 'my_schema';").await?;
///
/// Ok(())
/// }))
/// .connect("postgres:// …").await?;
/// # Ok(())
/// # }
/// ```
pub fn after_connect<F>(mut self, callback: F) -> Self
where
for<'c> F:
Fn(&'c mut DB::Connection) -> BoxFuture<'c, Result<(), Error>> + 'static + Send + Sync,
{
self.after_connect = Some(Box::new(callback));
self
}
pub fn before_acquire<F>(mut self, callback: F) -> Self
where
for<'c> F: Fn(&'c mut DB::Connection) -> BoxFuture<'c, Result<bool, Error>>
+ 'static
+ Send
+ Sync,
{
self.before_acquire = Some(Box::new(callback));
self
}
pub fn after_release<F>(mut self, callback: F) -> Self
where
F: Fn(&mut DB::Connection) -> bool + 'static + Send + Sync,
{
self.after_release = Some(Box::new(callback));
self
}
/// Creates a new pool from this configuration and immediately establishes one connection.
pub async fn connect(self, uri: &str) -> Result<Pool<DB>, Error> {
self.connect_with(uri.parse()?).await
}
/// Creates a new pool from this configuration and immediately establishes one connection.
pub async fn connect_with(
self,
options: <DB::Connection as Connection>::Options,
) -> Result<Pool<DB>, Error> {
let shared = SharedPool::new_arc(self, options);
init_min_connections(&shared).await?;
Ok(Pool(shared))
}
/// Creates a new pool from this configuration and will establish a connections as the pool
/// starts to be used.
pub fn connect_lazy(self, uri: &str) -> Result<Pool<DB>, Error> {
Ok(self.connect_lazy_with(uri.parse()?))
}
/// Creates a new pool from this configuration and will establish a connections as the pool
/// starts to be used.
pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
let shared = SharedPool::new_arc(self, options);
let _ = spawn({
let shared = Arc::clone(&shared);
async move {
let _ = init_min_connections(&shared).await;
}
});
Pool(shared)
}
}
/// Opens the pool's initial connections and releases each one into the pool.
///
/// Makes `max(min_connections, 1)` attempts, one after another. Every attempt gets its own
/// deadline of `connect_timeout` from the moment the attempt starts, which is before the
/// semaphore permit is awaited.
///
/// # Errors
///
/// Stops at, and returns, the first error reported while establishing a connection.
async fn init_min_connections<DB: Database>(pool: &SharedPool<DB>) -> Result<(), Error> {
    let attempts = cmp::max(pool.options.min_connections, 1);
    let timeout = pool.options.connect_timeout;

    for _ in 0..attempts {
        let deadline = Instant::now() + timeout;
        let permit = pool.semaphore.acquire(1).await;

        // Growing the size through this guard keeps the pool from exceeding
        // `max_connections`; when the pool refuses to grow, the attempt is simply skipped.
        if let Ok(size_guard) = pool.try_increment_size(permit) {
            // Connecting is expected to fail with an error once `deadline` has passed.
            let connection = pool.connection(deadline, size_guard).await?;
            pool.release(connection);
        }
    }

    Ok(())
}
/// Debug output lists the plain-data limits and timeouts plus `test_before_acquire`.
///
/// The callbacks and the `fair` flag are not printed.
impl<DB: Database> Debug for PoolOptions<DB> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("PoolOptions");

        out.field("max_connections", &self.max_connections);
        out.field("min_connections", &self.min_connections);
        out.field("connect_timeout", &self.connect_timeout);
        out.field("max_lifetime", &self.max_lifetime);
        out.field("idle_timeout", &self.idle_timeout);
        out.field("test_before_acquire", &self.test_before_acquire);

        out.finish()
    }
}