quaint_forked/pooled.rs
1//! # A connection pool to a SQL database.
2//!
3//! A pool is created through the [`builder`] method, starting from a connection
4//! string that allows some of the parameters be delivered by the user.
5//!
6//! A connection string has the following structure:
7//!
8//! `connector_type://user:password@host/database?parameters`
9//!
10//! Connector type can be one of the following:
11//!
12//! - `file` opens an SQLite connection.
13//! - `mysql` opens a MySQL connection.
14//! - `postgres`/`postgresql` opens a PostgreSQL connection.
15//!
16//! All parameters should be given in the query string format:
17//! `?key1=val1&key2=val2`. All parameters are optional.
18//!
19//! As a special case, Microsoft SQL Server connections use the JDBC URI
20//! format:
21//!
22//! `jdbc:sqlserver://host\instance:port;key1=val1;key2=val2;`
23//!
24//! ## Common parameters
25//!
26//! - `connection_limit` defines the maximum number of connections opened to the
27//! database.
28//!
29//! ## SQLite
30//!
31//! - `user`/`password` do not do anything and can be emitted.
32//! - `host` should point to the database file.
33//! - `db_name` parameter should give a name to the database attached for
34//! query namespacing.
35//! - `socket_timeout` defined in seconds. Acts as the busy timeout in
36//! SQLite. When set, queries that are waiting for a lock to be released
37//! will return the `Timeout` error after the defined value.
38//!
39//! ## PostgreSQL
40//!
41//! - `sslmode` either `disable`, `prefer` or `require`. [Read more](https://docs.rs/tokio-postgres/0.5.0-alpha.1/tokio_postgres/config/enum.SslMode.html)
42//! - `sslcert` should point to a PEM certificate file.
43//! - `sslidentity` should point to a PKCS12 certificate database.
44//! - `sslpassword` the password to open the PKCS12 database.
45//! - `sslaccept` either `strict` or `accept_invalid_certs`. If strict, the
46//! certificate needs to be valid and in the CA certificates.
47//! `accept_invalid_certs` accepts any certificate from the server and can
48//! lead to weakened security. Defaults to `accept_invalid_certs`.
49//! - `schema` the default search path.
50//! - `host` additionally the host can be given as a parameter, typically in
51//! cases when connectiong to the database through a unix socket to
52//! separate the database name from the database path, such as
53//! `postgresql:///dbname?host=/var/run/postgresql`.
54//! - `socket_timeout` defined in seconds. If set, a query will return a
55//! `Timeout` error if it fails to resolve before given time.
56//! - `connect_timeout` defined in seconds. Connecting to a
57//! database will return a `ConnectTimeout` error if taking more than the
58//! defined value. Defaults to 5 seconds, if set to 0, no timeout.
59//! - `pool_timeout` defined in seconds. If all connections are in use, the
60//! database will return a `PoolTimeout` error after waiting for the given time.
61//! If set to zero, no timeout.
62//! - `pgbouncer` either `true` or `false`. If set, allows usage with the
63//! pgBouncer connection pool in transaction mode. Additionally a transaction
64//! is required for every query for the mode to work. When starting a new
65//! transaction, a deallocation query `DEALLOCATE ALL` is executed right after
66//! `BEGIN` to avoid possible collisions with statements created in other
67//! sessions.
68//! - `statement_cache_size`, number of prepared statements kept cached.
69//! Defaults to 500. If `pgbouncer` mode is enabled, caching is always off.
70//! - `options` Specifies command-line options to send to the server at connection start. [Read more](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNECT-OPTIONS)
71//!
72//! ## MySQL
73//!
74//! - `sslcert` should point to a PEM certificate file.
75//! - `sslidentity` should point to a PKCS12 certificate database.
76//! - `sslpassword` the password to open the PKCS12 database.
77//! - `sslaccept` either `strict` or `accept_invalid_certs`. If strict, the
78//! certificate needs to be valid and in the CA certificates.
79//! `accept_invalid_certs` accepts any certificate from the server and can
80//! lead to weakened security. Defaults to `strict`.
81//! - `socket` needed when connecting to MySQL database through a unix
82//! socket. When set, the host parameter is dismissed.
83//! - `socket_timeout` defined in seconds. If set, a query will return a
84//! `Timeout` error if it fails to resolve before given time.
85//! - `connect_timeout` defined in seconds. Connecting to a
86//! database will return a `ConnectTimeout` error if taking more than the
87//! defined value. Defaults to 5 seconds, if set to 0, no timeout.
88//! - `pool_timeout` defined in seconds. If all connections are in use, the
89//! database will return a `PoolTimeout` error after waiting for the given time.
90//! If set to zero, no timeout.
91//! - `statement_cache_size`, number of prepared statements kept cached.
92//! Defaults to 1000. Set to 0 to disable caching.
93//!
94//! ## Microsoft SQL Server
95//!
96//! - `encrypt` if set to `true` encrypts all traffic over TLS. If `false`, only
97//! the login details are encrypted. A special value `DANGER_PLAINTEXT` will
98//! disable TLS completely, including sending login credentials as plaintext.
99//! - `user` sets the login name.
100//! - `password` sets the login password.
101//! - `database` sets the database to connect to.
102//! - `trustServerCertificate` if set to `true`, accepts any kind of certificate
103//! from the server.
104//! - `trustServerCertificateCA` sets the path to a custom certificate file.
105//! Needs to be in pem, crt or der format. Cannot be used together with
106//! `trustServerCertificate` parameter.
107//! - `socketTimeout` defined in seconds. If set, a query will return a
108//! `Timeout` error if it fails to resolve before given time.
109//! - `connectTimeout` defined in seconds (default: 5). Connecting to a
110//! database will return a `ConnectTimeout` error if taking more than the
111//! defined value. Defaults to 5 seconds, disabled if set to zero.
112//! - `poolTimeout` defined in seconds. If all connections are in use, the
113//! database will return a `Timeout` error after waiting for the given time.
114//! If set to zero, no timeout.
115//! - `connectionLimit` defines the maximum number of connections opened to the
116//! database.
117//! - `schema` the name of the lookup schema. Only stored to the connection,
118//! must be used in every query to be effective.
119//! - `isolationLevel` the transaction isolation level. Possible values:
120//! `READ UNCOMMITTED`, `READ COMMITTED`, `REPEATABLE READ`, `SNAPSHOT`,
121//! `SERIALIZABLE`.
122//!
123//! To create a new `Quaint` pool connecting to a PostgreSQL database:
124//!
125//! ``` no_run
126//! use quaint::{prelude::*, pooled::Quaint};
127//! use std::time::Duration;
128//!
129//! #[tokio::main]
130//! async fn main() -> Result<(), quaint::error::Error> {
131//! let mut builder = Quaint::builder("postgresql://postgres:password@localhost:5432/postgres")?;
132//! builder.connection_limit(5);
133//! builder.max_idle_lifetime(Duration::from_secs(300));
134//! builder.test_on_check_out(true);
135//!
136//! let pool = builder.build();
137//! let conn = pool.check_out().await?;
138//! let result = conn.select(Select::default().value(1)).await?;
139//!
140//! assert_eq!(
141//! Some(1),
142//! result.into_iter().nth(0).and_then(|row| row[0].as_i64()),
143//! );
144//!
145//! Ok(())
146//! }
147//! ```
148//!
149//! [`builder`]: struct.Quaint.html#method.builder
150
151mod manager;
152
153pub use manager::*;
154
155use crate::{
156 connector::ConnectionInfo,
157 error::{Error, ErrorKind},
158};
159use mobc_forked::Pool;
160use std::{sync::Arc, time::Duration};
161
162#[cfg(feature = "sqlite")]
163use std::convert::TryFrom;
164
165/// The main entry point and an abstraction over database connections and
166/// connection handling.
167#[derive(Clone)]
168pub struct Quaint {
169 pub(crate) inner: Pool<QuaintManager>,
170 connection_info: Arc<ConnectionInfo>,
171 pool_timeout: Option<Duration>,
172}
173
174/// A `Builder` to construct an instance of a [`Quaint`] pool.
175///
176/// [`Quaint`]: pooled.Quaint
177pub struct Builder {
178 manager: QuaintManager,
179 connection_info: ConnectionInfo,
180 connection_limit: usize,
181 max_idle: Option<u64>,
182 max_idle_lifetime: Option<Duration>,
183 max_lifetime: Option<Duration>,
184 health_check_interval: Option<Duration>,
185 test_on_check_out: bool,
186 pool_timeout: Option<Duration>,
187}
188
189impl Builder {
190 fn new(url: &str, manager: QuaintManager) -> crate::Result<Self> {
191 let connection_limit = num_cpus::get_physical() * 2 + 1;
192 let connection_info = ConnectionInfo::from_url(url)?;
193
194 Ok(Self {
195 manager,
196 connection_info,
197 connection_limit,
198 max_idle: None,
199 max_idle_lifetime: None,
200 max_lifetime: None,
201 health_check_interval: None,
202 test_on_check_out: false,
203 pool_timeout: None,
204 })
205 }
206
207 /// The maximum number of connections in the pool.
208 ///
209 /// - Defaults to two times the number of physical cores plus one.
210 pub fn connection_limit(&mut self, connection_limit: usize) {
211 self.connection_limit = connection_limit;
212 }
213
214 /// The maximum number of idle connections the pool can contain at the same time. If a
215 /// connection goes idle (a query returns) and there are already this number of idle connections
216 /// in the pool, a connection will be closed immediately. Consider using `max_idle_lifetime` to
217 /// close idle connections less aggressively.
218 ///
219 /// - Defaults to the same value as `connection_limit`.
220 pub fn max_idle(&mut self, max_idle: u64) {
221 self.max_idle = Some(max_idle);
222 }
223
224 /// A timeout for acquiring a connection with the [`check_out`] method. If
225 /// not set, the method never times out.
226 ///
227 /// # Panics
228 ///
229 /// Panics if `pool_timeout` is zero.
230 ///
231 /// [`check_out`]: struct.Quaint.html#method.check_out
232 pub fn pool_timeout(&mut self, pool_timeout: Duration) {
233 assert_ne!(pool_timeout, Duration::from_secs(0), "pool_timeout must be positive");
234
235 self.pool_timeout = Some(pool_timeout);
236 }
237
238 /// A time how long a connection can be kept in the pool before
239 /// replaced with a new one. The reconnect happens in the next
240 /// [`check_out`].
241 ///
242 /// - Defaults to not set, meaning connections are kept forever.
243 ///
244 /// # Panics
245 ///
246 /// Panics if `max_lifetime` is zero.
247 ///
248 /// [`check_out`]: struct.Quaint.html#method.check_out
249 pub fn max_lifetime(&mut self, max_lifetime: Duration) {
250 self.max_lifetime = Some(max_lifetime);
251 }
252
253 /// A time how long an idling connection can be kept in the pool before
254 /// replaced with a new one. The reconnect happens in the next
255 /// [`check_out`].
256 ///
257 /// - Defaults to 300 seconds
258 ///
259 /// # Panics
260 ///
261 /// Panics if `max_idle_lifetime` is zero.
262 ///
263 /// [`check_out`]: struct.Quaint.html#method.check_out
264 pub fn max_idle_lifetime(&mut self, max_idle_lifetime: Duration) {
265 self.max_idle_lifetime = Some(max_idle_lifetime);
266 }
267
268 /// Perform a health check before returning a connection from the
269 /// [`check_out`]. If the health check fails, a few reconnects are tried
270 /// before returning the error and dropping the broken connection from the
271 /// pool.
272 ///
273 /// - Defaults to `false`, meaning connections are never tested on
274 /// `check_out`.
275 ///
276 /// [`check_out`]: struct.Quaint.html#method.check_out
277 pub fn test_on_check_out(&mut self, test_on_check_out: bool) {
278 self.test_on_check_out = test_on_check_out;
279 }
280
281 /// Sets the interval how often a connection health will be tested when
282 /// checking out from the pool. Must be used together with
283 /// [`test_on_check_out`] set to `true`, otherwise does nothing.
284 ///
285 /// - Defaults to not set, meaning a test is performed on every `check_out`.
286 ///
287 /// # Panics
288 ///
289 /// Panics if `health_check_interval` is zero.
290 ///
291 /// [`test_on_check_out`]: #method.test_on_check_out
292 pub fn health_check_interval(&mut self, health_check_interval: Duration) {
293 self.health_check_interval = Some(health_check_interval);
294 }
295
296 /// Consume the builder and create a new instance of a pool.
297 pub fn build(self) -> Quaint {
298 let connection_info = Arc::new(self.connection_info);
299 Self::log_start(&connection_info, self.connection_limit);
300
301 let inner = Pool::builder()
302 .max_open(if let Some(file_path) = connection_info.file_path() {
303 if file_path == ":memory:" {
304 1
305 } else {
306 self.connection_limit as u64
307 }
308 } else {
309 self.connection_limit as u64
310 })
311 .max_idle(
312 if let Some(file_path) = connection_info.file_path() {
313 if file_path == ":memory:" {
314 1
315 } else {
316 self.max_idle.unwrap_or(self.connection_limit as u64)
317 }
318 } else {
319 self.max_idle.unwrap_or(self.connection_limit as u64)
320 }
321 )
322 .max_idle_lifetime(self.max_idle_lifetime)
323 .max_lifetime(self.max_lifetime)
324 .get_timeout(None) // we handle timeouts here
325 .health_check_interval(self.health_check_interval)
326 .test_on_check_out(self.test_on_check_out)
327 .build(self.manager);
328
329 Quaint {
330 inner,
331 connection_info,
332 pool_timeout: self.pool_timeout,
333 }
334 }
335
336 fn log_start(info: &ConnectionInfo, connection_limit: usize) {
337 let family = info.sql_family();
338 let pg_bouncer = if info.pg_bouncer() { " in PgBouncer mode" } else { "" };
339
340 tracing::info!(
341 "Starting a {} pool with {} connections{}.",
342 family,
343 connection_limit,
344 pg_bouncer
345 );
346 }
347}
348
349impl Quaint {
350 /// Creates a new builder for a Quaint connection pool with the given
351 /// connection string. See the [module level documentation] for details.
352 ///
353 /// [module level documentation]: index.html
354 pub fn builder(url_str: &str) -> crate::Result<Builder> {
355 match url_str {
356 #[cfg(feature = "sqlite")]
357 s if s.starts_with("file") || s.starts_with("sqlite") => {
358 let params = crate::connector::SqliteParams::try_from(s)?;
359
360 let manager = QuaintManager::Sqlite {
361 url: s.to_string(),
362 db_name: params.db_name,
363 };
364
365 let mut builder = Builder::new(s, manager)?;
366
367 if let Some(limit) = params.connection_limit {
368 builder.connection_limit(limit);
369 }
370
371 if let Some(max_lifetime) = params.max_connection_lifetime {
372 builder.max_lifetime(max_lifetime);
373 }
374
375 if let Some(max_idle_lifetime) = params.max_idle_connection_lifetime {
376 builder.max_idle_lifetime(max_idle_lifetime);
377 }
378
379 Ok(builder)
380 }
381 #[cfg(feature = "mysql")]
382 s if s.starts_with("mysql") => {
383 let url = crate::connector::MysqlUrl::new(url::Url::parse(s)?)?;
384 let connection_limit = url.connection_limit();
385 let pool_timeout = url.pool_timeout();
386 let max_connection_lifetime = url.max_connection_lifetime();
387 let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
388
389 let manager = QuaintManager::Mysql { url };
390 let mut builder = Builder::new(s, manager)?;
391
392 if let Some(limit) = connection_limit {
393 builder.connection_limit(limit);
394 }
395
396 if let Some(timeout) = pool_timeout {
397 builder.pool_timeout(timeout);
398 }
399
400 if let Some(max_lifetime) = max_connection_lifetime {
401 builder.max_lifetime(max_lifetime);
402 }
403
404 if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
405 builder.max_idle_lifetime(max_idle_lifetime);
406 }
407
408 Ok(builder)
409 }
410 #[cfg(feature = "postgresql")]
411 s if s.starts_with("postgres") || s.starts_with("postgresql") => {
412 let url = crate::connector::PostgresUrl::new(url::Url::parse(s)?)?;
413 let connection_limit = url.connection_limit();
414 let pool_timeout = url.pool_timeout();
415 let max_connection_lifetime = url.max_connection_lifetime();
416 let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
417
418 let manager = QuaintManager::Postgres { url };
419 let mut builder = Builder::new(s, manager)?;
420
421 if let Some(limit) = connection_limit {
422 builder.connection_limit(limit);
423 }
424
425 if let Some(timeout) = pool_timeout {
426 builder.pool_timeout(timeout);
427 }
428
429 if let Some(max_lifetime) = max_connection_lifetime {
430 builder.max_lifetime(max_lifetime);
431 }
432
433 if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
434 builder.max_idle_lifetime(max_idle_lifetime);
435 }
436
437 Ok(builder)
438 }
439 #[cfg(feature = "mssql")]
440 s if s.starts_with("jdbc:sqlserver") || s.starts_with("sqlserver") => {
441 let url = crate::connector::MssqlUrl::new(s)?;
442 let connection_limit = url.connection_limit();
443 let pool_timeout = url.pool_timeout();
444 let max_connection_lifetime = url.max_connection_lifetime();
445 let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
446
447 let manager = QuaintManager::Mssql { url };
448 let mut builder = Builder::new(s, manager)?;
449
450 if let Some(limit) = connection_limit {
451 builder.connection_limit(limit);
452 }
453
454 if let Some(timeout) = pool_timeout {
455 builder.pool_timeout(timeout);
456 }
457
458 if let Some(max_lifetime) = max_connection_lifetime {
459 builder.max_lifetime(max_lifetime);
460 }
461
462 if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
463 builder.max_idle_lifetime(max_idle_lifetime);
464 }
465
466 Ok(builder)
467 }
468 _ => unimplemented!("Supported url schemes: file or sqlite, mysql, postgres or postgresql."),
469 }
470 }
471
472 /// The number of connections in the pool.
473 pub async fn capacity(&self) -> u32 {
474 self.inner.state().await.max_open as u32
475 }
476
477 /// Reserve a connection from the pool.
478 pub async fn check_out(&self) -> crate::Result<PooledConnection> {
479 let res = match self.pool_timeout {
480 Some(duration) => crate::connector::metrics::check_out(self.inner.get_timeout(duration)).await,
481 None => crate::connector::metrics::check_out(self.inner.get()).await,
482 };
483
484 let inner = match res {
485 Ok(conn) => conn,
486 Err(mobc_forked::Error::PoolClosed) => return Err(Error::builder(ErrorKind::PoolClosed {}).build()),
487 Err(mobc_forked::Error::Timeout) => {
488 let state = self.inner.state().await;
489 // We can use unwrap here because a pool timeout has to be set to use a connection pool
490 let timeout_duration = self.pool_timeout.unwrap();
491 return Err(
492 Error::builder(ErrorKind::pool_timeout(state.max_open, state.in_use, timeout_duration)).build(),
493 );
494 }
495 Err(mobc_forked::Error::Inner(e)) => return Err(e),
496 Err(e @ mobc_forked::Error::BadConn) => {
497 let error = Error::builder(ErrorKind::ConnectionError(Box::new(e))).build();
498 return Err(error);
499 }
500 };
501
502 Ok(PooledConnection { inner })
503 }
504
505 /// Info about the connection and underlying database.
506 pub fn connection_info(&self) -> &ConnectionInfo {
507 &self.connection_info
508 }
509}