quaint_forked/
pooled.rs

1//! # A connection pool to a SQL database.
2//!
3//! A pool is created through the [`builder`] method, starting from a connection
4//! string that allows some of the parameters be delivered by the user.
5//!
6//! A connection string has the following structure:
7//!
8//! `connector_type://user:password@host/database?parameters`
9//!
10//! Connector type can be one of the following:
11//!
12//! - `file` opens an SQLite connection.
13//! - `mysql` opens a MySQL connection.
14//! - `postgres`/`postgresql` opens a PostgreSQL connection.
15//!
16//! All parameters should be given in the query string format:
17//! `?key1=val1&key2=val2`. All parameters are optional.
18//!
19//! As a special case, Microsoft SQL Server connections use the JDBC URI
20//! format:
21//!
22//! `jdbc:sqlserver://host\instance:port;key1=val1;key2=val2;`
23//!
24//! ## Common parameters
25//!
26//! - `connection_limit` defines the maximum number of connections opened to the
27//!   database.
28//!
29//! ## SQLite
30//!
31//! - `user`/`password` do not do anything and can be emitted.
32//! - `host` should point to the database file.
33//! - `db_name` parameter should give a name to the database attached for
34//!   query namespacing.
35//! - `socket_timeout` defined in seconds. Acts as the busy timeout in
36//!   SQLite. When set, queries that are waiting for a lock to be released
37//!   will return the `Timeout` error after the defined value.
38//!
39//! ## PostgreSQL
40//!
41//! - `sslmode` either `disable`, `prefer` or `require`. [Read more](https://docs.rs/tokio-postgres/0.5.0-alpha.1/tokio_postgres/config/enum.SslMode.html)
42//! - `sslcert` should point to a PEM certificate file.
43//! - `sslidentity` should point to a PKCS12 certificate database.
44//! - `sslpassword` the password to open the PKCS12 database.
45//! - `sslaccept` either `strict` or `accept_invalid_certs`. If strict, the
46//!   certificate needs to be valid and in the CA certificates.
47//!   `accept_invalid_certs` accepts any certificate from the server and can
48//!   lead to weakened security. Defaults to `accept_invalid_certs`.
49//! - `schema` the default search path.
50//! - `host` additionally the host can be given as a parameter, typically in
51//!   cases when connectiong to the database through a unix socket to
52//!   separate the database name from the database path, such as
53//!   `postgresql:///dbname?host=/var/run/postgresql`.
54//! - `socket_timeout` defined in seconds. If set, a query will return a
55//!   `Timeout` error if it fails to resolve before given time.
56//! - `connect_timeout` defined in seconds. Connecting to a
57//!   database will return a `ConnectTimeout` error if taking more than the
58//!   defined value. Defaults to 5 seconds, if set to 0, no timeout.
59//! - `pool_timeout` defined in seconds. If all connections are in use, the
60//!   database will return a `PoolTimeout` error after waiting for the given time.
61//!   If set to zero, no timeout.
62//! - `pgbouncer` either `true` or `false`. If set, allows usage with the
63//!   pgBouncer connection pool in transaction mode. Additionally a transaction
64//!   is required for every query for the mode to work. When starting a new
65//!   transaction, a deallocation query `DEALLOCATE ALL` is executed right after
66//!   `BEGIN` to avoid possible collisions with statements created in other
67//!   sessions.
68//! - `statement_cache_size`, number of prepared statements kept cached.
69//!   Defaults to 500. If `pgbouncer` mode is enabled, caching is always off.
70//! - `options` Specifies command-line options to send to the server at connection start. [Read more](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNECT-OPTIONS)
71//!
72//! ## MySQL
73//!
74//! - `sslcert` should point to a PEM certificate file.
75//! - `sslidentity` should point to a PKCS12 certificate database.
76//! - `sslpassword` the password to open the PKCS12 database.
77//! - `sslaccept` either `strict` or `accept_invalid_certs`. If strict, the
78//!   certificate needs to be valid and in the CA certificates.
79//!   `accept_invalid_certs` accepts any certificate from the server and can
80//!   lead to weakened security. Defaults to `strict`.
81//! - `socket` needed when connecting to MySQL database through a unix
82//!   socket. When set, the host parameter is dismissed.
83//! - `socket_timeout` defined in seconds. If set, a query will return a
84//!   `Timeout` error if it fails to resolve before given time.
85//! - `connect_timeout` defined in seconds. Connecting to a
86//!   database will return a `ConnectTimeout` error if taking more than the
87//!   defined value. Defaults to 5 seconds, if set to 0, no timeout.
88//! - `pool_timeout` defined in seconds. If all connections are in use, the
89//!   database will return a `PoolTimeout` error after waiting for the given time.
90//!   If set to zero, no timeout.
91//! - `statement_cache_size`, number of prepared statements kept cached.
92//!   Defaults to 1000. Set to 0 to disable caching.
93//!
94//! ## Microsoft SQL Server
95//!
96//! - `encrypt` if set to `true` encrypts all traffic over TLS. If `false`, only
97//!   the login details are encrypted. A special value `DANGER_PLAINTEXT` will
98//!   disable TLS completely, including sending login credentials as plaintext.
99//! - `user` sets the login name.
100//! - `password` sets the login password.
101//! - `database` sets the database to connect to.
102//! - `trustServerCertificate` if set to `true`, accepts any kind of certificate
103//!   from the server.
104//! - `trustServerCertificateCA` sets the path to a custom certificate file.
105//!   Needs to be in pem, crt or der format. Cannot be used together with
106//!   `trustServerCertificate` parameter.
107//! - `socketTimeout` defined in seconds. If set, a query will return a
108//!   `Timeout` error if it fails to resolve before given time.
109//! - `connectTimeout` defined in seconds (default: 5). Connecting to a
110//!   database will return a `ConnectTimeout` error if taking more than the
111//!   defined value. Defaults to 5 seconds, disabled if set to zero.
112//! - `poolTimeout` defined in seconds. If all connections are in use, the
113//!   database will return a `Timeout` error after waiting for the given time.
114//!   If set to zero, no timeout.
115//! - `connectionLimit` defines the maximum number of connections opened to the
116//!   database.
117//! - `schema` the name of the lookup schema. Only stored to the connection,
118//!   must be used in every query to be effective.
119//! - `isolationLevel` the transaction isolation level. Possible values:
120//!   `READ UNCOMMITTED`, `READ COMMITTED`, `REPEATABLE READ`, `SNAPSHOT`,
121//!   `SERIALIZABLE`.
122//!
123//! To create a new `Quaint` pool connecting to a PostgreSQL database:
124//!
125//! ``` no_run
126//! use quaint::{prelude::*, pooled::Quaint};
127//! use std::time::Duration;
128//!
129//! #[tokio::main]
130//! async fn main() -> Result<(), quaint::error::Error> {
131//!     let mut builder = Quaint::builder("postgresql://postgres:password@localhost:5432/postgres")?;
132//!     builder.connection_limit(5);
133//!     builder.max_idle_lifetime(Duration::from_secs(300));
134//!     builder.test_on_check_out(true);
135//!
136//!     let pool = builder.build();
137//!     let conn = pool.check_out().await?;
138//!     let result = conn.select(Select::default().value(1)).await?;
139//!
140//!     assert_eq!(
141//!         Some(1),
142//!         result.into_iter().nth(0).and_then(|row| row[0].as_i64()),
143//!     );
144//!
145//!     Ok(())
146//! }
147//! ```
148//!
149//! [`builder`]: struct.Quaint.html#method.builder
150
151mod manager;
152
153pub use manager::*;
154
155use crate::{
156    connector::ConnectionInfo,
157    error::{Error, ErrorKind},
158};
159use mobc_forked::Pool;
160use std::{sync::Arc, time::Duration};
161
162#[cfg(feature = "sqlite")]
163use std::convert::TryFrom;
164
165/// The main entry point and an abstraction over database connections and
166/// connection handling.
167#[derive(Clone)]
168pub struct Quaint {
169    pub(crate) inner: Pool<QuaintManager>,
170    connection_info: Arc<ConnectionInfo>,
171    pool_timeout: Option<Duration>,
172}
173
174/// A `Builder` to construct an instance of a [`Quaint`] pool.
175///
176/// [`Quaint`]: pooled.Quaint
177pub struct Builder {
178    manager: QuaintManager,
179    connection_info: ConnectionInfo,
180    connection_limit: usize,
181    max_idle: Option<u64>,
182    max_idle_lifetime: Option<Duration>,
183    max_lifetime: Option<Duration>,
184    health_check_interval: Option<Duration>,
185    test_on_check_out: bool,
186    pool_timeout: Option<Duration>,
187}
188
189impl Builder {
190    fn new(url: &str, manager: QuaintManager) -> crate::Result<Self> {
191        let connection_limit = num_cpus::get_physical() * 2 + 1;
192        let connection_info = ConnectionInfo::from_url(url)?;
193
194        Ok(Self {
195            manager,
196            connection_info,
197            connection_limit,
198            max_idle: None,
199            max_idle_lifetime: None,
200            max_lifetime: None,
201            health_check_interval: None,
202            test_on_check_out: false,
203            pool_timeout: None,
204        })
205    }
206
207    /// The maximum number of connections in the pool.
208    ///
209    /// - Defaults to two times the number of physical cores plus one.
210    pub fn connection_limit(&mut self, connection_limit: usize) {
211        self.connection_limit = connection_limit;
212    }
213
214    /// The maximum number of idle connections the pool can contain at the same time. If a
215    /// connection goes idle (a query returns) and there are already this number of idle connections
216    /// in the pool, a connection will be closed immediately. Consider using `max_idle_lifetime` to
217    /// close idle connections less aggressively.
218    ///
219    /// - Defaults to the same value as `connection_limit`.
220    pub fn max_idle(&mut self, max_idle: u64) {
221        self.max_idle = Some(max_idle);
222    }
223
224    /// A timeout for acquiring a connection with the [`check_out`] method. If
225    /// not set, the method never times out.
226    ///
227    /// # Panics
228    ///
229    /// Panics if `pool_timeout` is zero.
230    ///
231    /// [`check_out`]: struct.Quaint.html#method.check_out
232    pub fn pool_timeout(&mut self, pool_timeout: Duration) {
233        assert_ne!(pool_timeout, Duration::from_secs(0), "pool_timeout must be positive");
234
235        self.pool_timeout = Some(pool_timeout);
236    }
237
238    /// A time how long a connection can be kept in the pool before
239    /// replaced with a new one. The reconnect happens in the next
240    /// [`check_out`].
241    ///
242    /// - Defaults to not set, meaning connections are kept forever.
243    ///
244    /// # Panics
245    ///
246    /// Panics if `max_lifetime` is zero.
247    ///
248    /// [`check_out`]: struct.Quaint.html#method.check_out
249    pub fn max_lifetime(&mut self, max_lifetime: Duration) {
250        self.max_lifetime = Some(max_lifetime);
251    }
252
253    /// A time how long an idling connection can be kept in the pool before
254    /// replaced with a new one. The reconnect happens in the next
255    /// [`check_out`].
256    ///
257    /// - Defaults to 300 seconds
258    ///
259    /// # Panics
260    ///
261    /// Panics if `max_idle_lifetime` is zero.
262    ///
263    /// [`check_out`]: struct.Quaint.html#method.check_out
264    pub fn max_idle_lifetime(&mut self, max_idle_lifetime: Duration) {
265        self.max_idle_lifetime = Some(max_idle_lifetime);
266    }
267
268    /// Perform a health check before returning a connection from the
269    /// [`check_out`]. If the health check fails, a few reconnects are tried
270    /// before returning the error and dropping the broken connection from the
271    /// pool.
272    ///
273    /// - Defaults to `false`, meaning connections are never tested on
274    /// `check_out`.
275    ///
276    /// [`check_out`]: struct.Quaint.html#method.check_out
277    pub fn test_on_check_out(&mut self, test_on_check_out: bool) {
278        self.test_on_check_out = test_on_check_out;
279    }
280
281    /// Sets the interval how often a connection health will be tested when
282    /// checking out from the pool. Must be used together with
283    /// [`test_on_check_out`] set to `true`, otherwise does nothing.
284    ///
285    /// - Defaults to not set, meaning a test is performed on every `check_out`.
286    ///
287    /// # Panics
288    ///
289    /// Panics if `health_check_interval` is zero.
290    ///
291    /// [`test_on_check_out`]: #method.test_on_check_out
292    pub fn health_check_interval(&mut self, health_check_interval: Duration) {
293        self.health_check_interval = Some(health_check_interval);
294    }
295
296    /// Consume the builder and create a new instance of a pool.
297    pub fn build(self) -> Quaint {
298        let connection_info = Arc::new(self.connection_info);
299        Self::log_start(&connection_info, self.connection_limit);
300
301        let inner = Pool::builder()
302            .max_open(if let Some(file_path) = connection_info.file_path() {
303                if file_path == ":memory:" {
304                    1
305                } else {
306                    self.connection_limit as u64
307                }
308            } else {
309                self.connection_limit as u64
310            })
311            .max_idle(
312                if let Some(file_path) = connection_info.file_path() {
313                    if file_path == ":memory:" {
314                        1
315                    } else {
316                        self.max_idle.unwrap_or(self.connection_limit as u64)
317                    }
318                } else {
319                    self.max_idle.unwrap_or(self.connection_limit as u64)
320                }
321            )
322            .max_idle_lifetime(self.max_idle_lifetime)
323            .max_lifetime(self.max_lifetime)
324            .get_timeout(None) // we handle timeouts here
325            .health_check_interval(self.health_check_interval)
326            .test_on_check_out(self.test_on_check_out)
327            .build(self.manager);
328
329        Quaint {
330            inner,
331            connection_info,
332            pool_timeout: self.pool_timeout,
333        }
334    }
335
336    fn log_start(info: &ConnectionInfo, connection_limit: usize) {
337        let family = info.sql_family();
338        let pg_bouncer = if info.pg_bouncer() { " in PgBouncer mode" } else { "" };
339
340        tracing::info!(
341            "Starting a {} pool with {} connections{}.",
342            family,
343            connection_limit,
344            pg_bouncer
345        );
346    }
347}
348
349impl Quaint {
350    /// Creates a new builder for a Quaint connection pool with the given
351    /// connection string. See the [module level documentation] for details.
352    ///
353    /// [module level documentation]: index.html
354    pub fn builder(url_str: &str) -> crate::Result<Builder> {
355        match url_str {
356            #[cfg(feature = "sqlite")]
357            s if s.starts_with("file") || s.starts_with("sqlite") => {
358                let params = crate::connector::SqliteParams::try_from(s)?;
359
360                let manager = QuaintManager::Sqlite {
361                    url: s.to_string(),
362                    db_name: params.db_name,
363                };
364
365                let mut builder = Builder::new(s, manager)?;
366
367                if let Some(limit) = params.connection_limit {
368                    builder.connection_limit(limit);
369                }
370
371                if let Some(max_lifetime) = params.max_connection_lifetime {
372                    builder.max_lifetime(max_lifetime);
373                }
374
375                if let Some(max_idle_lifetime) = params.max_idle_connection_lifetime {
376                    builder.max_idle_lifetime(max_idle_lifetime);
377                }
378
379                Ok(builder)
380            }
381            #[cfg(feature = "mysql")]
382            s if s.starts_with("mysql") => {
383                let url = crate::connector::MysqlUrl::new(url::Url::parse(s)?)?;
384                let connection_limit = url.connection_limit();
385                let pool_timeout = url.pool_timeout();
386                let max_connection_lifetime = url.max_connection_lifetime();
387                let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
388
389                let manager = QuaintManager::Mysql { url };
390                let mut builder = Builder::new(s, manager)?;
391
392                if let Some(limit) = connection_limit {
393                    builder.connection_limit(limit);
394                }
395
396                if let Some(timeout) = pool_timeout {
397                    builder.pool_timeout(timeout);
398                }
399
400                if let Some(max_lifetime) = max_connection_lifetime {
401                    builder.max_lifetime(max_lifetime);
402                }
403
404                if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
405                    builder.max_idle_lifetime(max_idle_lifetime);
406                }
407
408                Ok(builder)
409            }
410            #[cfg(feature = "postgresql")]
411            s if s.starts_with("postgres") || s.starts_with("postgresql") => {
412                let url = crate::connector::PostgresUrl::new(url::Url::parse(s)?)?;
413                let connection_limit = url.connection_limit();
414                let pool_timeout = url.pool_timeout();
415                let max_connection_lifetime = url.max_connection_lifetime();
416                let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
417
418                let manager = QuaintManager::Postgres { url };
419                let mut builder = Builder::new(s, manager)?;
420
421                if let Some(limit) = connection_limit {
422                    builder.connection_limit(limit);
423                }
424
425                if let Some(timeout) = pool_timeout {
426                    builder.pool_timeout(timeout);
427                }
428
429                if let Some(max_lifetime) = max_connection_lifetime {
430                    builder.max_lifetime(max_lifetime);
431                }
432
433                if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
434                    builder.max_idle_lifetime(max_idle_lifetime);
435                }
436
437                Ok(builder)
438            }
439            #[cfg(feature = "mssql")]
440            s if s.starts_with("jdbc:sqlserver") || s.starts_with("sqlserver") => {
441                let url = crate::connector::MssqlUrl::new(s)?;
442                let connection_limit = url.connection_limit();
443                let pool_timeout = url.pool_timeout();
444                let max_connection_lifetime = url.max_connection_lifetime();
445                let max_idle_connection_lifetime = url.max_idle_connection_lifetime();
446
447                let manager = QuaintManager::Mssql { url };
448                let mut builder = Builder::new(s, manager)?;
449
450                if let Some(limit) = connection_limit {
451                    builder.connection_limit(limit);
452                }
453
454                if let Some(timeout) = pool_timeout {
455                    builder.pool_timeout(timeout);
456                }
457
458                if let Some(max_lifetime) = max_connection_lifetime {
459                    builder.max_lifetime(max_lifetime);
460                }
461
462                if let Some(max_idle_lifetime) = max_idle_connection_lifetime {
463                    builder.max_idle_lifetime(max_idle_lifetime);
464                }
465
466                Ok(builder)
467            }
468            _ => unimplemented!("Supported url schemes: file or sqlite, mysql, postgres or postgresql."),
469        }
470    }
471
472    /// The number of connections in the pool.
473    pub async fn capacity(&self) -> u32 {
474        self.inner.state().await.max_open as u32
475    }
476
477    /// Reserve a connection from the pool.
478    pub async fn check_out(&self) -> crate::Result<PooledConnection> {
479        let res = match self.pool_timeout {
480            Some(duration) => crate::connector::metrics::check_out(self.inner.get_timeout(duration)).await,
481            None => crate::connector::metrics::check_out(self.inner.get()).await,
482        };
483
484        let inner = match res {
485            Ok(conn) => conn,
486            Err(mobc_forked::Error::PoolClosed) => return Err(Error::builder(ErrorKind::PoolClosed {}).build()),
487            Err(mobc_forked::Error::Timeout) => {
488                let state = self.inner.state().await;
489                // We can use unwrap here because a pool timeout has to be set to use a connection pool
490                let timeout_duration = self.pool_timeout.unwrap();
491                return Err(
492                    Error::builder(ErrorKind::pool_timeout(state.max_open, state.in_use, timeout_duration)).build(),
493                );
494            }
495            Err(mobc_forked::Error::Inner(e)) => return Err(e),
496            Err(e @ mobc_forked::Error::BadConn) => {
497                let error = Error::builder(ErrorKind::ConnectionError(Box::new(e))).build();
498                return Err(error);
499            }
500        };
501
502        Ok(PooledConnection { inner })
503    }
504
505    /// Info about the connection and underlying database.
506    pub fn connection_info(&self) -> &ConnectionInfo {
507        &self.connection_info
508    }
509}