sqlx_build_trust_core/pool/options.rs
1use crate::connection::Connection;
2use crate::database::Database;
3use crate::error::Error;
4use crate::pool::inner::PoolInner;
5use crate::pool::Pool;
6use futures_core::future::BoxFuture;
7use std::fmt::{self, Debug, Formatter};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10
11/// Configuration options for [`Pool`][super::Pool].
12///
13/// ### Callback Functions: Why Do I Need `Box::pin()`?
14/// Essentially, because it's impossible to write generic bounds that describe a closure
15/// with a higher-ranked lifetime parameter, returning a future with that same lifetime.
16///
17/// Ideally, you could define it like this:
18/// ```rust,ignore
19/// async fn takes_foo_callback(f: impl for<'a> Fn(&'a mut Foo) -> impl Future<'a, Output = ()>)
20/// ```
21///
22/// However, the compiler does not allow using `impl Trait` in the return type of an `impl Fn`.
23///
24/// And if you try to do it like this:
25/// ```rust,ignore
26/// async fn takes_foo_callback<F, Fut>(f: F)
27/// where
28/// F: for<'a> Fn(&'a mut Foo) -> Fut,
29/// Fut: for<'a> Future<Output = ()> + 'a
30/// ```
31///
32/// There's no way to tell the compiler that those two `'a`s should be the same lifetime.
33///
34/// It's possible to make this work with a custom trait, but it's fiddly and requires naming
35/// the type of the closure parameter.
36///
37/// Having the closure return `BoxFuture` allows us to work around this, as all the type information
38/// fits into a single generic parameter.
39///
40/// We still need to `Box` the future internally to give it a concrete type to avoid leaking a type
41/// parameter everywhere, and `Box` is in the prelude so it doesn't need to be manually imported,
42/// so having the closure return `Pin<Box<dyn Future>` directly is the path of least resistance from
43/// the perspectives of both API designer and consumer.
44pub struct PoolOptions<DB: Database> {
45 pub(crate) test_before_acquire: bool,
46 pub(crate) after_connect: Option<
47 Arc<
48 dyn Fn(&mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'_, Result<(), Error>>
49 + 'static
50 + Send
51 + Sync,
52 >,
53 >,
54 pub(crate) before_acquire: Option<
55 Arc<
56 dyn Fn(
57 &mut DB::Connection,
58 PoolConnectionMetadata,
59 ) -> BoxFuture<'_, Result<bool, Error>>
60 + 'static
61 + Send
62 + Sync,
63 >,
64 >,
65 pub(crate) after_release: Option<
66 Arc<
67 dyn Fn(
68 &mut DB::Connection,
69 PoolConnectionMetadata,
70 ) -> BoxFuture<'_, Result<bool, Error>>
71 + 'static
72 + Send
73 + Sync,
74 >,
75 >,
76 pub(crate) max_connections: u32,
77 pub(crate) acquire_timeout: Duration,
78 pub(crate) min_connections: u32,
79 pub(crate) max_lifetime: Option<Duration>,
80 pub(crate) idle_timeout: Option<Duration>,
81 pub(crate) fair: bool,
82
83 pub(crate) parent_pool: Option<Pool<DB>>,
84}
85
86// Manually implement `Clone` to avoid a trait bound issue.
87//
88// See: https://github.com/launchbadge/sqlx/issues/2548
89impl<DB: Database> Clone for PoolOptions<DB> {
90 fn clone(&self) -> Self {
91 PoolOptions {
92 test_before_acquire: self.test_before_acquire,
93 after_connect: self.after_connect.clone(),
94 before_acquire: self.before_acquire.clone(),
95 after_release: self.after_release.clone(),
96 max_connections: self.max_connections,
97 acquire_timeout: self.acquire_timeout,
98 min_connections: self.min_connections,
99 max_lifetime: self.max_lifetime,
100 idle_timeout: self.idle_timeout,
101 fair: self.fair,
102 parent_pool: self.parent_pool.as_ref().map(Pool::clone),
103 }
104 }
105}
106
/// Information about the connection currently being handed to a [`PoolOptions`] callback.
#[non_exhaustive] // Leaves room to add fields later without breaking callers.
#[derive(Debug)] // Deliberately the only trait impl we commit to for now.
pub struct PoolConnectionMetadata {
    /// How long ago the connection was first opened.
    ///
    /// Always [`Duration::ZERO`] inside [`after_connect`][PoolOptions::after_connect].
    pub age: Duration,

    /// How long the connection sat in the idle queue.
    ///
    /// This is only meaningful inside [`before_acquire`][PoolOptions::before_acquire];
    /// every other callback sees [`Duration::ZERO`].
    pub idle_for: Duration,
}
122
123impl<DB: Database> Default for PoolOptions<DB> {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129impl<DB: Database> PoolOptions<DB> {
130 /// Returns a default "sane" configuration, suitable for testing or light-duty applications.
131 ///
132 /// Production applications will likely want to at least modify
133 /// [`max_connections`][Self::max_connections].
134 ///
135 /// See the source of this method for the current default values.
136 pub fn new() -> Self {
137 Self {
138 // User-specifiable routines
139 after_connect: None,
140 before_acquire: None,
141 after_release: None,
142 test_before_acquire: true,
143 // A production application will want to set a higher limit than this.
144 max_connections: 10,
145 min_connections: 0,
146 acquire_timeout: Duration::from_secs(30),
147 idle_timeout: Some(Duration::from_secs(10 * 60)),
148 max_lifetime: Some(Duration::from_secs(30 * 60)),
149 fair: true,
150 parent_pool: None,
151 }
152 }
153
154 /// Set the maximum number of connections that this pool should maintain.
155 ///
156 /// Be mindful of the connection limits for your database as well as other applications
157 /// which may want to connect to the same database (or even multiple instances of the same
158 /// application in high-availability deployments).
159 pub fn max_connections(mut self, max: u32) -> Self {
160 self.max_connections = max;
161 self
162 }
163
164 /// Get the maximum number of connections that this pool should maintain
165 pub fn get_max_connections(&self) -> u32 {
166 self.max_connections
167 }
168
169 /// Set the minimum number of connections to maintain at all times.
170 ///
171 /// When the pool is built, this many connections will be automatically spun up.
172 ///
173 /// If any connection is reaped by [`max_lifetime`] or [`idle_timeout`], or explicitly closed,
174 /// and it brings the connection count below this amount, a new connection will be opened to
175 /// replace it.
176 ///
177 /// This is only done on a best-effort basis, however. The routine that maintains this value
178 /// has a deadline so it doesn't wait forever if the database is being slow or returning errors.
179 ///
180 /// This value is clamped internally to not exceed [`max_connections`].
181 ///
182 /// We've chosen not to assert `min_connections <= max_connections` anywhere
183 /// because it shouldn't break anything internally if the condition doesn't hold,
184 /// and if the application allows either value to be dynamically set
185 /// then it should be checking this condition itself and returning
186 /// a nicer error than a panic anyway.
187 ///
188 /// [`max_lifetime`]: Self::max_lifetime
189 /// [`idle_timeout`]: Self::idle_timeout
190 /// [`max_connections`]: Self::max_connections
191 pub fn min_connections(mut self, min: u32) -> Self {
192 self.min_connections = min;
193 self
194 }
195
196 /// Get the minimum number of connections to maintain at all times.
197 pub fn get_min_connections(&self) -> u32 {
198 self.min_connections
199 }
200
201 /// Set the maximum amount of time to spend waiting for a connection in [`Pool::acquire()`].
202 ///
203 /// Caps the total amount of time `Pool::acquire()` can spend waiting across multiple phases:
204 ///
205 /// * First, it may need to wait for a permit from the semaphore, which grants it the privilege
206 /// of opening a connection or popping one from the idle queue.
207 /// * If an existing idle connection is acquired, by default it will be checked for liveness
208 /// and integrity before being returned, which may require executing a command on the
209 /// connection. This can be disabled with [`test_before_acquire(false)`][Self::test_before_acquire].
210 /// * If [`before_acquire`][Self::before_acquire] is set, that will also be executed.
211 /// * If a new connection needs to be opened, that will obviously require I/O, handshaking,
212 /// and initialization commands.
213 /// * If [`after_connect`][Self::after_connect] is set, that will also be executed.
214 pub fn acquire_timeout(mut self, timeout: Duration) -> Self {
215 self.acquire_timeout = timeout;
216 self
217 }
218
219 /// Get the maximum amount of time to spend waiting for a connection in [`Pool::acquire()`].
220 pub fn get_acquire_timeout(&self) -> Duration {
221 self.acquire_timeout
222 }
223
224 /// Set the maximum lifetime of individual connections.
225 ///
226 /// Any connection with a lifetime greater than this will be closed.
227 ///
228 /// When set to `None`, all connections live until either reaped by [`idle_timeout`]
229 /// or explicitly disconnected.
230 ///
231 /// Infinite connections are not recommended due to the unfortunate reality of memory/resource
232 /// leaks on the database-side. It is better to retire connections periodically
233 /// (even if only once daily) to allow the database the opportunity to clean up data structures
234 /// (parse trees, query metadata caches, thread-local storage, etc.) that are associated with a
235 /// session.
236 ///
237 /// [`idle_timeout`]: Self::idle_timeout
238 pub fn max_lifetime(mut self, lifetime: impl Into<Option<Duration>>) -> Self {
239 self.max_lifetime = lifetime.into();
240 self
241 }
242
243 /// Get the maximum lifetime of individual connections.
244 pub fn get_max_lifetime(&self) -> Option<Duration> {
245 self.max_lifetime
246 }
247
248 /// Set a maximum idle duration for individual connections.
249 ///
250 /// Any connection that remains in the idle queue longer than this will be closed.
251 ///
252 /// For usage-based database server billing, this can be a cost saver.
253 pub fn idle_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
254 self.idle_timeout = timeout.into();
255 self
256 }
257
258 /// Get the maximum idle duration for individual connections.
259 pub fn get_idle_timeout(&self) -> Option<Duration> {
260 self.idle_timeout
261 }
262
263 /// If true, the health of a connection will be verified by a call to [`Connection::ping`]
264 /// before returning the connection.
265 ///
266 /// Defaults to `true`.
267 pub fn test_before_acquire(mut self, test: bool) -> Self {
268 self.test_before_acquire = test;
269 self
270 }
271
272 /// Get's whether `test_before_acquire` is currently set.
273 pub fn get_test_before_acquire(&self) -> bool {
274 self.test_before_acquire
275 }
276
277 /// If set to `true`, calls to `acquire()` are fair and connections are issued
278 /// in first-come-first-serve order. If `false`, "drive-by" tasks may steal idle connections
279 /// ahead of tasks that have been waiting.
280 ///
281 /// According to `sqlx-bench/benches/pg_pool` this may slightly increase time
282 /// to `acquire()` at low pool contention but at very high contention it helps
283 /// avoid tasks at the head of the waiter queue getting repeatedly preempted by
284 /// these "drive-by" tasks and tasks further back in the queue timing out because
285 /// the queue isn't moving.
286 ///
287 /// Currently only exposed for benchmarking; `fair = true` seems to be the superior option
288 /// in most cases.
289 #[doc(hidden)]
290 pub fn __fair(mut self, fair: bool) -> Self {
291 self.fair = fair;
292 self
293 }
294
295 /// Perform an asynchronous action after connecting to the database.
296 ///
297 /// If the operation returns with an error then the error is logged, the connection is closed
298 /// and a new one is opened in its place and the callback is invoked again.
299 ///
300 /// This occurs in a backoff loop to avoid high CPU usage and spamming logs during a transient
301 /// error condition.
302 ///
303 /// Note that this may be called for internally opened connections, such as when maintaining
304 /// [`min_connections`][Self::min_connections], that are then immediately returned to the pool
305 /// without invoking [`after_release`][Self::after_release].
306 ///
307 /// # Example: Additional Parameters
308 /// This callback may be used to set additional configuration parameters
309 /// that are not exposed by the database's `ConnectOptions`.
310 ///
311 /// This example is written for PostgreSQL but can likely be adapted to other databases.
312 ///
313 /// ```no_run
314 /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
315 /// use sqlx::Executor;
316 /// use sqlx::postgres::PgPoolOptions;
317 ///
318 /// let pool = PgPoolOptions::new()
319 /// .after_connect(|conn, _meta| Box::pin(async move {
320 /// // When directly invoking `Executor` methods,
321 /// // it is possible to execute multiple statements with one call.
322 /// conn.execute("SET application_name = 'your_app'; SET search_path = 'my_schema';")
323 /// .await?;
324 ///
325 /// Ok(())
326 /// }))
327 /// .connect("postgres:// …").await?;
328 /// # Ok(())
329 /// # }
330 /// ```
331 ///
332 /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self].
333 pub fn after_connect<F>(mut self, callback: F) -> Self
334 where
335 // We're passing the `PoolConnectionMetadata` here mostly for future-proofing.
336 // `age` and `idle_for` are obviously not useful for fresh connections.
337 for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<(), Error>>
338 + 'static
339 + Send
340 + Sync,
341 {
342 self.after_connect = Some(Arc::new(callback));
343 self
344 }
345
346 /// Perform an asynchronous action on a previously idle connection before giving it out.
347 ///
348 /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains
349 /// potentially useful information such as the connection's age and the duration it was
350 /// idle.
351 ///
352 /// If the operation returns `Ok(true)`, the connection is returned to the task that called
353 /// [`Pool::acquire`].
354 ///
355 /// If the operation returns `Ok(false)` or an error, the error is logged (if applicable)
356 /// and then the connection is closed and [`Pool::acquire`] tries again with another idle
357 /// connection. If it runs out of idle connections, it opens a new connection instead.
358 ///
359 /// This is *not* invoked for new connections. Use [`after_connect`][Self::after_connect]
360 /// for those.
361 ///
362 /// # Example: Custom `test_before_acquire` Logic
363 /// If you only want to ping connections if they've been idle a certain amount of time,
364 /// you can implement your own logic here:
365 ///
366 /// This example is written for Postgres but should be trivially adaptable to other databases.
367 /// ```no_run
368 /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
369 /// use sqlx::{Connection, Executor};
370 /// use sqlx::postgres::PgPoolOptions;
371 ///
372 /// let pool = PgPoolOptions::new()
373 /// .test_before_acquire(false)
374 /// .before_acquire(|conn, meta| Box::pin(async move {
375 /// // One minute
376 /// if meta.idle_for.as_secs() > 60 {
377 /// conn.ping().await?;
378 /// }
379 ///
380 /// Ok(true)
381 /// }))
382 /// .connect("postgres:// …").await?;
383 /// # Ok(())
384 /// # }
385 ///```
386 ///
387 /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self].
388 pub fn before_acquire<F>(mut self, callback: F) -> Self
389 where
390 for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<bool, Error>>
391 + 'static
392 + Send
393 + Sync,
394 {
395 self.before_acquire = Some(Arc::new(callback));
396 self
397 }
398
399 /// Perform an asynchronous action on a connection before it is returned to the pool.
400 ///
401 /// Alongside the connection, the closure gets [`PoolConnectionMetadata`] which contains
402 /// potentially useful information such as the connection's age.
403 ///
404 /// If the operation returns `Ok(true)`, the connection is returned to the pool's idle queue.
405 /// If the operation returns `Ok(false)` or an error, the error is logged (if applicable)
406 /// and the connection is closed, allowing a task waiting on [`Pool::acquire`] to
407 /// open a new one in its place.
408 ///
409 /// # Example (Postgres): Close Memory-Hungry Connections
410 /// Instead of relying on [`max_lifetime`][Self::max_lifetime] to close connections,
411 /// we can monitor their memory usage directly and close any that have allocated too much.
412 ///
413 /// Note that this is purely an example showcasing a possible use for this callback
414 /// and may be flawed as it has not been tested.
415 ///
416 /// This example queries [`pg_backend_memory_contexts`](https://www.postgresql.org/docs/current/view-pg-backend-memory-contexts.html)
417 /// which is only allowed for superusers.
418 ///
419 /// ```no_run
420 /// # async fn f() -> Result<(), Box<dyn std::error::Error>> {
421 /// use sqlx::{Connection, Executor};
422 /// use sqlx::postgres::PgPoolOptions;
423 ///
424 /// let pool = PgPoolOptions::new()
425 /// // Let connections live as long as they want.
426 /// .max_lifetime(None)
427 /// .after_release(|conn, meta| Box::pin(async move {
428 /// // Only check connections older than 6 hours.
429 /// if meta.age.as_secs() < 6 * 60 * 60 {
430 /// return Ok(true);
431 /// }
432 ///
433 /// let total_memory_usage: i64 = sqlx::query_scalar(
434 /// "select sum(used_bytes) from pg_backend_memory_contexts"
435 /// )
436 /// .fetch_one(conn)
437 /// .await?;
438 ///
439 /// // Close the connection if the backend memory usage exceeds 256 MiB.
440 /// Ok(total_memory_usage <= (2 << 28))
441 /// }))
442 /// .connect("postgres:// …").await?;
443 /// # Ok(())
444 /// # }
445 pub fn after_release<F>(mut self, callback: F) -> Self
446 where
447 for<'c> F: Fn(&'c mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'c, Result<bool, Error>>
448 + 'static
449 + Send
450 + Sync,
451 {
452 self.after_release = Some(Arc::new(callback));
453 self
454 }
455
456 /// Set the parent `Pool` from which the new pool will inherit its semaphore.
457 ///
458 /// This is currently an internal-only API.
459 ///
460 /// ### Panics
461 /// If `self.max_connections` is greater than the setting the given pool was created with,
462 /// or `self.fair` differs from the setting the given pool was created with.
463 #[doc(hidden)]
464 pub fn parent(mut self, pool: Pool<DB>) -> Self {
465 self.parent_pool = Some(pool);
466 self
467 }
468
469 /// Create a new pool from this `PoolOptions` and immediately open at least one connection.
470 ///
471 /// This ensures the configuration is correct.
472 ///
473 /// The total number of connections opened is <code>max(1, [min_connections][Self::min_connections])</code>.
474 ///
475 /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
476 ///
477 /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
478 /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
479 /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
480 /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
481 pub async fn connect(self, url: &str) -> Result<Pool<DB>, Error> {
482 self.connect_with(url.parse()?).await
483 }
484
485 /// Create a new pool from this `PoolOptions` and immediately open at least one connection.
486 ///
487 /// This ensures the configuration is correct.
488 ///
489 /// The total number of connections opened is <code>max(1, [min_connections][Self::min_connections])</code>.
490 pub async fn connect_with(
491 self,
492 options: <DB::Connection as Connection>::Options,
493 ) -> Result<Pool<DB>, Error> {
494 // Don't take longer than `acquire_timeout` starting from when this is called.
495 let deadline = Instant::now() + self.acquire_timeout;
496
497 let inner = PoolInner::new_arc(self, options);
498
499 if inner.options.min_connections > 0 {
500 // If the idle reaper is spawned then this will race with the call from that task
501 // and may not report any connection errors.
502 inner.try_min_connections(deadline).await?;
503 }
504
505 // If `min_connections` is nonzero then we'll likely just pull a connection
506 // from the idle queue here, but it should at least get tested first.
507 let conn = inner.acquire().await?;
508 inner.release(conn);
509
510 Ok(Pool(inner))
511 }
512
513 /// Create a new pool from this `PoolOptions`, but don't open any connections right now.
514 ///
515 /// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
516 /// optimistically establish that many connections for the pool.
517 ///
518 /// Refer to the relevant `ConnectOptions` impl for your database for the expected URL format:
519 ///
520 /// * Postgres: [`PgConnectOptions`][crate::postgres::PgConnectOptions]
521 /// * MySQL: [`MySqlConnectOptions`][crate::mysql::MySqlConnectOptions]
522 /// * SQLite: [`SqliteConnectOptions`][crate::sqlite::SqliteConnectOptions]
523 /// * MSSQL: [`MssqlConnectOptions`][crate::mssql::MssqlConnectOptions]
524 pub fn connect_lazy(self, url: &str) -> Result<Pool<DB>, Error> {
525 Ok(self.connect_lazy_with(url.parse()?))
526 }
527
528 /// Create a new pool from this `PoolOptions`, but don't open any connections right now.
529 ///
530 /// If [`min_connections`][Self::min_connections] is set, a background task will be spawned to
531 /// optimistically establish that many connections for the pool.
532 pub fn connect_lazy_with(self, options: <DB::Connection as Connection>::Options) -> Pool<DB> {
533 // `min_connections` is guaranteed by the idle reaper now.
534 Pool(PoolInner::new_arc(self, options))
535 }
536}
537
538impl<DB: Database> Debug for PoolOptions<DB> {
539 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
540 f.debug_struct("PoolOptions")
541 .field("max_connections", &self.max_connections)
542 .field("min_connections", &self.min_connections)
543 .field("connect_timeout", &self.acquire_timeout)
544 .field("max_lifetime", &self.max_lifetime)
545 .field("idle_timeout", &self.idle_timeout)
546 .field("test_before_acquire", &self.test_before_acquire)
547 .finish()
548 }
549}