sqlx_core_guts/pool/
options.rs

1use crate::connection::Connection;
2use crate::database::Database;
3use crate::error::Error;
4use crate::pool::inner::PoolInner;
5use crate::pool::Pool;
6use futures_core::future::BoxFuture;
7use std::fmt::{self, Debug, Formatter};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11/// Configuration options for [`Pool`][super::Pool].
12///
13/// ### Callback Functions: Why Do I Need `Box::pin()`?
14/// Essentially, because it's impossible to write generic bounds that describe a closure
15/// with a higher-ranked lifetime parameter, returning a future with that same lifetime.
16///
17/// Ideally, you could define it like this:
18/// ```rust,ignore
19/// async fn takes_foo_callback(f: impl for<'a> Fn(&'a mut Foo) -> impl Future<'a, Output = ()>)
20/// ```
21///
22/// However, the compiler does not allow using `impl Trait` in the return type of an `impl Fn`.
23///
24/// And if you try to do it like this:
25/// ```rust,ignore
26/// async fn takes_foo_callback<F, Fut>(f: F)
27/// where
28///     F: for<'a> Fn(&'a mut Foo) -> Fut,
29///     Fut: for<'a> Future<Output = ()> + 'a
30/// ```
31///
32/// There's no way to tell the compiler that those two `'a`s should be the same lifetime.
33///
34/// It's possible to make this work with a custom trait, but it's fiddly and requires naming
35///  the type of the closure parameter.
36///
37/// Having the closure return `BoxFuture` allows us to work around this, as all the type information
38/// fits into a single generic parameter.
39///
40/// We still need to `Box` the future internally to give it a concrete type to avoid leaking a type
41/// parameter everywhere, and `Box` is in the prelude so it doesn't need to be manually imported,
42/// so having the closure return `Pin<Box<dyn Future>` directly is the path of least resistance from
43/// the perspectives of both API designer and consumer.
44#[derive(Clone)]
45pub struct PoolOptions<DB: Database> {
46    pub(crate) test_before_acquire: bool,
47    pub(crate) after_connect: Option<
48        Arc<
49            dyn Fn(&mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'_, Result<(), Error>>
50                + 'static
51                + Send
52                + Sync,
53        >,
54    >,
55    pub(crate) before_acquire: Option<
56        Arc<
57            dyn Fn(
58                    &mut DB::Connection,
59                    PoolConnectionMetadata,
60                ) -> BoxFuture<'_, Result<bool, Error>>
61                + 'static
62                + Send
63                + Sync,
64        >,
65    >,
66    pub(crate) after_release: Option<
67        Arc<
68            dyn Fn(
69                    &mut DB::Connection,
70                    PoolConnectionMetadata,
71                ) -> BoxFuture<'_, Result<bool, Error>>
72                + 'static
73                + Send
74                + Sync,
75        >,
76    >,
77    pub(crate) max_connections: u32,
78    pub(crate) acquire_timeout: Duration,
79    pub(crate) min_connections: u32,
80    pub(crate) max_lifetime: Option<Duration>,
81    pub(crate) idle_timeout: Option<Duration>,
82    pub(crate) fair: bool,
83}
84
85/// Metadata for the connection being processed by a [`PoolOptions`] callback.
86#[derive(Debug)] // Don't want to commit to any other trait impls yet.
87#[non_exhaustive] // So we can safely add fields in the future.
88pub struct PoolConnectionMetadata {
89    /// The duration since the connection was first opened.
90    ///
91    /// For [`after_connect`][PoolOptions::after_connect], this is [`Duration::ZERO`].
92    pub age: Duration,
93
94    /// The duration that the connection spent in the idle queue.
95    ///
96    /// Only relevant for [`before_acquire`][PoolOptions::before_acquire].
97    /// For other callbacks, this is [`Duration::ZERO`].
98    pub idle_for: Duration,
99}
100
101impl<DB: Database> Default for PoolOptions<DB> {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl<DB: Database> PoolOptions<DB> {
108    /// Returns a default "sane" configuration, suitable for testing or light-duty applications.
109    ///
110    /// Production applications will likely want to at least modify
111    /// [`max_connections`][Self::max_connections].
112    ///
113    /// See the source of this method for the current default values.
114    pub fn new() -> Self {
115        Self {
116            // User-specifiable routines
117            after_connect: None,
118            before_acquire: None,
119            after_release: None,
120            test_before_acquire: true,
121            // A production application will want to set a higher limit than this.
122            max_connections: 10,
123            min_connections: 0,
124            acquire_timeout: Duration::from_secs(30),
125            idle_timeout: Some(Duration::from_secs(10 * 60)),
126            max_lifetime: Some(Duration::from_secs(30 * 60)),
127            fair: true,
128        }
129    }
130
131    /// Set the maximum number of connections that this pool should maintain.
132    ///
133    /// Be mindful of the connection limits for your database as well as other applications
134    /// which may want to connect to the same database (or even multiple instances of the same
135    /// application in high-availability deployments).
136    pub fn max_connections(mut self, max: u32) -> Self {
137        self.max_connections = max;
138        self
139    }
140
141    /// Set the minimum number of connections to maintain at all times.
142    ///
143    /// When the pool is built, this many connections will be automatically spun up.
144    ///
145    /// If any connection is reaped by [`max_lifetime`] or [`idle_timeout`], or explicitly closed,
146    /// and it brings the connection count below this amount, a new connection will be opened to
147    /// replace it.
148    ///
149    /// This is only done on a best-effort basis, however. The routine that maintains this value
150    /// has a deadline so it doesn't wait forever if the database is being slow or returning errors.
151    ///
152    /// This value is clamped internally to not exceed [`max_connections`].
153    ///
154    /// We've chosen not to assert `min_connections <= max_connections` anywhere
155    /// because it shouldn't break anything internally if the condition doesn't hold,
156    /// and if the application allows either value to be dynamically set
157    /// then it should be checking this condition itself and returning
158    /// a nicer error than a panic anyway.
159    ///
160    /// [`max_lifetime`]: Self::max_lifetime
161    /// [`idle_timeout`]: Self::idle_timeout
162    /// [`max_connections`]: Self::max_connections
163    pub fn min_connections(mut self, min: u32) -> Self {
164        self.min_connections = min;
165        self
166    }
167
168    /// Set the maximum amount of time to spend waiting for a connection in [`Pool::acquire()`].
169    ///
170    /// Caps the total amount of time `Pool::acquire()` can spend waiting across multiple phases:
171    ///
172    /// * First, it may need to wait for a permit from the semaphore, which grants it the privilege
173    ///   of opening a connection or popping one from the idle queue.
174    /// * If an existing idle connection is acquired, by default it will be checked for liveness
175    ///   and integrity before being returned, which may require executing a command on the
176    ///   connection. This can be disabled with [`test_before_acquire(false)`][Self::test_before_acquire].
177    ///     * If [`before_acquire`][Self::before_acquire] is set, that will also be executed.
178    /// * If a new connection needs to be opened, that will obviously require I/O, handshaking,
179    ///   and initialization commands.
180    ///     * If [`after_connect`][Self::after_connect] is set, that will also be executed.
181    pub fn acquire_timeout(mut self, timeout: Duration) -> Self {
182        self.acquire_timeout = timeout;
183        self
184    }
185
186    /// Set the maximum lifetime of individual connections.
187    ///
188    /// Any connection with a lifetime greater than this will be closed.
189    ///
190    /// When set to `None`, all connections live until either reaped by [`idle_timeout`]
191    /// or explicitly disconnected.
192    ///
193    /// Infinite connections are not recommended due to the unfortunate reality of memory/resource
194    /// leaks on the database-side. It is better to retire connections periodically
195    /// (even if only once daily) to allow the database the opportunity to clean up data structures
196    /// (parse trees, query metadata caches, thread-local storage, etc.) that are associated with a
197    /// session.
198    ///
199    /// [`idle_timeout`]: Self::idle_timeout
200    pub fn max_lifetime(mut self, lifetime: impl Into<Option<Duration>>) -> Self {
201        self.max_lifetime = lifetime.into();
202        self
203    }
204
205    /// Set a maximum idle duration for individual connections.
206    ///
207    /// Any connection that remains in the idle queue longer than this will be closed.
208    ///
209    /// For usage-based database server billing, this can be a cost saver.
210    pub fn idle_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
211        self.idle_timeout = timeout.into();
212        self
213    }
214
215    /// If true, the health of a connection will be verified by a call to [`Connection::ping`]
216    /// before returning the connection.
217    ///
218    /// Defaults to `true`.
219    pub fn test_before_acquire(mut self, test: bool) -> Self {
220        self.test_before_acquire = test;
221        self
222    }
223
224    /// If set to `true`, calls to `acquire()` are fair and connections  are issued
225    /// in first-come-first-serve order. If `false`, "drive-by" tasks may steal idle connections
226    /// ahead of tasks that have been waiting.
227    ///
228    /// According to `sqlx-bench/benches/pg_pool` this may slightly increase time
229    /// to `acquire()` at low pool contention but at very high contention it helps
230    /// avoid tasks at the head of the waiter queue getting repeatedly preempted by
231    /// these "drive-by" tasks and tasks further back in the queue timing out because
232    /// the queue isn't moving.
233    ///
234    /// Currently only exposed for benchmarking; `fair = true` seems to be the superior option
235    /// in most cases.
236    #[doc(hidden)]
237    pub fn __fair(mut self, fair: bool) -> Self {
238        self.fair = fair;
239        self
240    }
241
242    /// Perform an asynchronous action after connecting to the database.
243    ///
244    /// If the operation returns with an error then the error is logged, the connection is closed
245    /// and a new one is opened in its place and the callback is invoked again.
246    ///
247    /// This occurs in a backoff loop to avoid high CPU usage and spamming logs during a transient
248    /// error condition.
249    ///
250    /// Note that this may be called for internally opened connections, such as when maintaining
251    /// [`min_connections`][Self::min_connections], that are then immediately returned to the pool
252    /// without invoking [`after_release`][Self::after_release].
253    ///
254    /// # Example: Additional Parameters
255    /// This callback may be used to set additional configuration parameters
256    /// that are not exposed by the database's `ConnectOptions`.
257    ///
258    /// This example is written for PostgreSQL but can likely be adapted to other databases.
259    ///
260    /// ```no_run
261    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
262    /// use sqlx::Executor;
263    /// use sqlx::postgres::PgPoolOptions;
264    ///
265    /// let pool = PgPoolOptions::new()
266    ///     .after_connect(|conn, _meta| Box::pin(async move {
267    ///         // When directly invoking `Executor` methods,
268    ///         // it is possible to execute multiple statements with one call.
269    ///         conn.execute("SET application_name = 'your_app'; SET search_path = 'my_schema';")
270    ///             .await?;
271    ///
272    ///         Ok(())
273    ///     }))
274    ///     .connect("postgres:// …").await?;
275    /// # Ok(())
276    /// # }
277    /// ```
278    ///
279    /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self].
280    pub fn after_connect<F>(mut self, callback: F) -> Self
281    where
282        // We're passing the `PoolConnectionMetadata` here mostly for future-proofing.
283        // `age` and `idle_for` are obviously not useful for fresh connections.
284        for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<(), Error>>
285            + 'static
286            + Send
287            + Sync,
288    {
289        self.after_connect = Some(Arc::new(callback));
290        self
291    }
292
293    /// Perform an asynchronous action on a previously idle connection before giving it out.
294    ///
295    /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains
296    /// potentially useful information such as the connection's age and the duration it was
297    /// idle.
298    ///
299    /// If the operation returns `Ok(true)`, the connection is returned to the task that called
300    /// [`Pool::acquire`].
301    ///
302    /// If the operation returns `Ok(false)` or an error, the error is logged (if applicable)
303    /// and then the connection is closed and [`Pool::acquire`] tries again with another idle
304    /// connection. If it runs out of idle connections, it opens a new connection instead.
305    ///
306    /// This is *not* invoked for new connections. Use [`after_connect`][Self::after_connect]
307    /// for those.
308    ///
309    /// # Example: Custom `test_before_acquire` Logic
310    /// If you only want to ping connections if they've been idle a certain amount of time,
311    /// you can implement your own logic here:
312    ///
313    /// This example is written for Postgres but should be trivially adaptable to other databases.
314    /// ```no_run
315    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
316    /// use sqlx::{Connection, Executor};
317    /// use sqlx::postgres::PgPoolOptions;
318    ///
319    /// let pool = PgPoolOptions::new()
320    ///     .test_before_acquire(false)
321    ///     .before_acquire(|conn, meta| Box::pin(async move {
322    ///         // One minute
323    ///         if meta.idle_for.as_secs() > 60 {
324    ///             conn.ping().await?;
325    ///         }
326    ///
327    ///         Ok(true)
328    ///     }))
329    ///     .connect("postgres:// …").await?;
330    /// # Ok(())
331    /// # }
332    ///```
333    ///
334    /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self].
335    pub fn before_acquire<F>(mut self, callback: F) -> Self
336    where
337        for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<bool, Error>>
338            + 'static
339            + Send
340            + Sync,
341    {
342        self.before_acquire = Some(Arc::new(callback));
343        self
344    }
345
346    /// Perform an asynchronous action on a connection before it is returned to the pool.
347    ///
348    /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains
349    /// potentially useful information such as the connection's age.
350    ///
351    /// If the operation returns `Ok(true)`, the connection is returned to the pool's idle queue.
352    /// If the operation returns `Ok(false)` or an error, the error is logged (if applicable)
353    /// and the connection is closed, allowing a task waiting on [`Pool::acquire`] to
354    /// open a new one in its place.
355    ///
356    /// # Example (Postgres): Close Memory-Hungry Connections
357    /// Instead of relying on [`max_lifetime`][Self::max_lifetime] to close connections,
358    /// we can monitor their memory usage directly and close any that have allocated too much.
359    ///
360    /// Note that this is purely an example showcasing a possible use for this callback
361    /// and may be flawed as it has not been tested.
362    ///
363    /// This example queries [`pg_backend_memory_contexts`](https://www.postgresql.org/docs/current/view-pg-backend-memory-contexts.html)
364    /// which is only allowed for superusers.
365    ///
366    /// ```no_run
367    /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
368    /// use sqlx::{Connection, Executor};
369    /// use sqlx::postgres::PgPoolOptions;
370    ///
371    /// let pool = PgPoolOptions::new()
372    ///     // Let connections live as long as they want.
373    ///     .max_lifetime(None)
374    ///     .after_release(|conn, meta| Box::pin(async move {
375    ///         // Only check connections older than 6 hours.
376    ///         if meta.age.as_secs() < 6 * 60 * 60 {
377    ///             return Ok(true);
378    ///         }
379    ///
380    ///         let total_memory_usage: i64 = sqlx::query_scalar(
381    ///             "select sum(used_bytes) from pg_backend_memory_contexts"
382    ///         )
383    ///         .fetch_one(conn)
384    ///         .await?;
385    ///
386    ///         // Close the connection if the backend memory usage exceeds 256 MiB.
387    ///         Ok(total_memory_usage <= (2 << 28))
388    ///     }))
389    ///     .connect("postgres:// …").await?;
390    /// # Ok(())
391    /// # }
392    pub fn after_release<F>(mut self, callback: F) -> Self
393    where
394        for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<bool, Error>>
395            + 'static
396            + Send
397            + Sync,
398    {
399        self.after_release = Some(Arc::new(callback));
400        self
401    }
402
403    /// Create a new pool from this `PoolOptions` and immediately open at least one connection.
404    ///
405    /// This ensures the configuration is correct.
406    ///
407    /// The total number of connections opened is <code>min(1, [min_connections][Self::min_connections])</code>.
408    ///
409    /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
410    ///
411    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
412    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
413    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
414    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
415    pub async fn connect(self, url: &str) -> Result<Pool<DB>, Error> {
416        self.connect_with(url.parse()?).await
417    }
418
419    /// Create a new pool from this `PoolOptions` and immediately open at least one connection.
420    ///
421    /// This ensures the configuration is correct.
422    ///
423    /// The total number of connections opened is <code>min(1, [min_connections][Self::min_connections])</code>.
424    pub async fn connect_with(
425        self,
426        options: <DB::Connection as Connection>::Options,
427    ) -> Result<Pool<DB>, Error> {
428        // Don't take longer than `acquire_timeout` starting from when this is called.
429        let deadline = Instant::now() + self.acquire_timeout;
430
431        let inner = PoolInner::new_arc(self, options);
432
433        if inner.options.min_connections > 0 {
434            // If the idle reaper is spawned then this will race with the call from that task
435            // and may not report any connection errors.
436            inner.try_min_connections(deadline).await?;
437        }
438
439        // If `min_connections` is nonzero then we'll likely just pull a connection
440        // from the idle queue here, but it should at least get tested first.
441        let conn = inner.acquire().await?;
442        inner.release(conn);
443
444        Ok(Pool(inner))
445    }
446
447    /// Create a new pool from this `PoolOptions`, but don't open any connections right now.
448    ///
449    /// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
450    /// optimistically establish that many connections for the pool.
451    ///
452    /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
453    ///
454    /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
455    /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
456    /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
457    /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
458    pub fn connect_lazy(self, url: &str) -> Result<Pool<DB>, Error> {
459        Ok(self.connect_lazy_with(url.parse()?))
460    }
461
462    /// Create a new pool from this `PoolOptions`, but don't open any connections right now.
463    ///
464    /// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
465    /// optimistically establish that many connections for the pool.
466    pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
467        // `min_connections` is guaranteed by the idle reaper now.
468        Pool(PoolInner::new_arc(self, options))
469    }
470}
471
472impl<DB: Database> Debug for PoolOptions<DB> {
473    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
474        f.debug_struct("PoolOptions")
475            .field("max_connections", &self.max_connections)
476            .field("min_connections", &self.min_connections)
477            .field("connect_timeout", &self.acquire_timeout)
478            .field("max_lifetime", &self.max_lifetime)
479            .field("idle_timeout", &self.idle_timeout)
480            .field("test_before_acquire", &self.test_before_acquire)
481            .finish()
482    }
483}