Skip to main content

qusql_mysql/
pool.rs

1//! Implements a pool of connections to Mariadb/Mysql
2//!
3//! Example:
4//! --------
5//! ```no_run
6//! use qusql_mysql::connection::{ConnectionOptions, ConnectionError, ExecutorExt};
7//! use qusql_mysql::pool::{Pool, PoolOptions};
8//!
9//! async fn test() -> Result<(), ConnectionError> {
10//!     let pool = Pool::connect(
11//!         ConnectionOptions::new()
12//!             .address("127.0.0.1:3307").unwrap()
13//!             .user("user")
14//!             .password("pw")
15//!             .database("test"),
16//!         PoolOptions::new().max_connections(10)
17//!     ).await?;
18//!
19//!     let mut conn = pool.acquire().await?;
20//!
21//!     let row: Option<(i64,)> = conn.fetch_optional(
22//!         "SELECT `number` FROM `table` WHERE `id`=?",
23//!         (42,)
24//!     ).await?;
25//!
26//!     if let Some((id,)) = row {
27//!         println!("Found id {}", id);
28//!     }
29//!
30//!     Ok(())
31//! }
32//! ```
33use std::{
34    mem::ManuallyDrop,
35    ops::{Deref, DerefMut},
36    sync::{Arc, Mutex},
37    time::{Duration, Instant},
38};
39
40use crate::{
41    Executor,
42    connection::{Connection, ConnectionOptions, ConnectionResult},
43    handle_drop::HandleDrop,
44};
45
46/// Options used for connection pool
47pub struct PoolOptions {
48    /// With this long when a connection is dropped while it is performing a query.
49    ///
50    /// After timeout the connection is closed. And a new connection may then be opened
51    clean_timeout: Duration,
52    /// Wait this long to attempt to connection again if we fail to connect
53    reconnect_time: Duration,
54    /// The maximum number of concurrent connections allowed
55    max_connections: usize,
56    /// When acquiring a connection from the pool that is older than this, ping it first
57    /// to ensure that it is still good
58    stale_connection_time: Duration,
59    /// When pinning a stale connection only wait this long
60    ping_timeout: Duration,
61}
62
63impl PoolOptions {
64    /// New default pool options
65    pub fn new() -> Self {
66        PoolOptions::default()
67    }
68
69    /// With this long when a connection is dropped while it is performing a query.
70    ///
71    /// After timeout the connection is closed. And a new connection may then be opened
72    pub fn clean_timeout(self, duration: Duration) -> Self {
73        PoolOptions {
74            clean_timeout: duration,
75            ..self
76        }
77    }
78
79    /// Wait this long to attempt to connection again if we fail to connect
80    pub fn reconnect_time(self, duration: Duration) -> Self {
81        PoolOptions {
82            reconnect_time: duration,
83            ..self
84        }
85    }
86
87    /// The maximum number of concurrent connections allowed
88    pub fn max_connections(self, connection: usize) -> Self {
89        PoolOptions {
90            max_connections: connection,
91            ..self
92        }
93    }
94}
95
96impl Default for PoolOptions {
97    fn default() -> Self {
98        Self {
99            clean_timeout: Duration::from_millis(200),
100            reconnect_time: Duration::from_secs(2),
101            stale_connection_time: Duration::from_secs(10 * 60),
102            ping_timeout: Duration::from_millis(200),
103            max_connections: 5,
104        }
105    }
106}
107
108/// Part of pool state protected by a mutex
109struct PoolProtected {
110    /// Current free transactions
111    connections: Vec<(Connection, Instant)>,
112    /// Number of transactions we are still allowed to allocate
113    unallocated_connections: usize,
114}
115
116/// Inner state of a pool
117struct PoolInner {
118    /// Part of state protected by a mutex
119    protected: Mutex<PoolProtected>,
120    /// The pool options given at creation time
121    pool_options: PoolOptions,
122    /// The connection options given at creation time
123    connection_options: ConnectionOptions<'static>,
124    /// Notify this when a connection becomes available
125    connection_available: tokio::sync::Notify,
126}
127
128/// A pool of shared connections that can be acquired
129#[derive(Clone)]
130pub struct Pool(Arc<PoolInner>);
131
132impl Pool {
133    /// Establish a new pool with at least one connection
134    pub async fn connect(
135        connection_options: ConnectionOptions<'static>,
136        pool_options: PoolOptions,
137    ) -> ConnectionResult<Self> {
138        let connection = Connection::connect(&connection_options).await?;
139        Ok(Pool(Arc::new(PoolInner {
140            protected: Mutex::new(PoolProtected {
141                connections: vec![(connection, std::time::Instant::now())],
142                unallocated_connections: pool_options.max_connections - 1,
143            }),
144            pool_options,
145            connection_options,
146            connection_available: tokio::sync::Notify::new(),
147        })))
148    }
149
150    /// Acquire a free connection from the pool.
151    ///
152    /// If there is no free connection wait for one to become available
153    ///
154    /// The returned future is drop safe
155    pub async fn acquire(&self) -> ConnectionResult<PoolConnection> {
156        enum Res<N, R> {
157            /// Wait for a connection to become available
158            Wait,
159            /// Establish a new connection
160            New(N),
161            /// Reuse an existing connection
162            Reuse(R),
163        }
164        loop {
165            let res = {
166                let mut inner = self.0.protected.lock().unwrap();
167                if let Some((connection, last_use)) = inner.connections.pop() {
168                    Res::Reuse(HandleDrop::new(
169                        (connection, last_use, self.clone()),
170                        |(connection, last_use, pool)| {
171                            let mut inner = pool.0.protected.lock().unwrap();
172                            inner.connections.push((connection, last_use));
173                        },
174                    ))
175                } else if inner.unallocated_connections == 0 {
176                    Res::Wait
177                } else {
178                    inner.unallocated_connections -= 1;
179                    Res::New(HandleDrop::new(self.clone(), |pool| {
180                        pool.connection_dropped();
181                    }))
182                }
183            };
184
185            match res {
186                Res::Wait => {
187                    // Safety cancel: We are not holding any resources
188                    self.0.connection_available.notified().await
189                }
190                Res::New(handle) => {
191                    // Safety cancel: This is cancel safe since the handle will increment the unallocated_connections when dropped
192                    let r = Connection::connect(&self.0.connection_options).await;
193                    match r {
194                        Ok(connection) => {
195                            let pool = handle.release();
196                            return Ok(PoolConnection {
197                                pool,
198                                connection: ManuallyDrop::new(connection),
199                            });
200                        }
201                        Err(e) => {
202                            // Wait a bit with releasing the handle, since the next acquire will probably run into the same failure
203                            tokio::task::spawn(async move {
204                                tokio::time::sleep((*handle).0.pool_options.reconnect_time).await;
205                                std::mem::drop(handle);
206                            });
207                            return Err(e);
208                        }
209                    }
210                }
211                Res::Reuse(mut handle) => {
212                    let (connection, last_use, pool) = &mut *handle;
213                    if last_use.elapsed() > pool.0.pool_options.stale_connection_time {
214                        // Safety cancel: This is cancel safe since the handle will put the connection back into the pool
215                        match tokio::time::timeout(
216                            pool.0.pool_options.ping_timeout,
217                            connection.ping(),
218                        )
219                        .await
220                        {
221                            Ok(Ok(())) => (),
222                            Err(_) | Ok(Err(_)) => {
223                                // Ping failed or time outed. Lets drop the connection and create a new one
224                                let (connection, _, pool) = handle.release();
225                                std::mem::drop(connection);
226                                pool.connection_dropped();
227                                continue;
228                            }
229                        }
230                    }
231                    let (connection, _, pool) = handle.release();
232                    let connection = PoolConnection {
233                        pool,
234                        connection: ManuallyDrop::new(connection),
235                    };
236                    return Ok(connection);
237                }
238            }
239        }
240    }
241
242    /// A connection has been dropped, allow new connections to be established
243    fn connection_dropped(&self) {
244        let mut inner = self.0.protected.lock().unwrap();
245        inner.unallocated_connections += 1;
246        self.0.connection_available.notify_one();
247    }
248
249    /// Put a connection back into the pool
250    fn release(&self, connection: Connection) {
251        let mut inner = self.0.protected.lock().unwrap();
252        self.0.connection_available.notify_one();
253        inner
254            .connections
255            .push((connection, std::time::Instant::now()));
256    }
257}
258
259/// A connection borrowed from the pool
260pub struct PoolConnection {
261    /// The pool the connection is borrowed from
262    pool: Pool,
263    /// The borrowed connection
264    connection: ManuallyDrop<Connection>,
265}
266
267impl Deref for PoolConnection {
268    type Target = Connection;
269
270    fn deref(&self) -> &Self::Target {
271        &self.connection
272    }
273}
274
275impl DerefMut for PoolConnection {
276    fn deref_mut(&mut self) -> &mut Self::Target {
277        &mut self.connection
278    }
279}
280
281impl Drop for PoolConnection {
282    /// Drop the connection, if we are in the middle of a request wait a bit for it to finish
283    fn drop(&mut self) {
284        // Safety: I will not access self.connection after this
285        let mut connection = unsafe { ManuallyDrop::take(&mut self.connection) };
286        if connection.is_clean() {
287            self.pool.release(connection);
288        } else {
289            // The connection is not clean, lets try to clean it up for a bit
290            let pool = self.pool.clone();
291            tokio::spawn(async move {
292                match tokio::time::timeout(pool.0.pool_options.clean_timeout, connection.cleanup())
293                    .await
294                {
295                    Ok(Ok(())) => {
296                        pool.release(connection);
297                    }
298                    Ok(Err(_)) => {
299                        // Connection error during cleaning, lets just close the connection
300                        std::mem::drop(connection);
301                        pool.connection_dropped();
302                    }
303                    Err(_) => {
304                        // Timeout during cleaning
305                        std::mem::drop(connection);
306                        pool.connection_dropped();
307                    }
308                }
309            });
310        }
311    }
312}