qusql_mysql/pool.rs
1//! Implements a pool of connections to Mariadb/Mysql
2//!
3//! Example:
4//! --------
5//! ```no_run
6//! use qusql_mysql::connection::{ConnectionOptions, ConnectionError, ExecutorExt};
7//! use qusql_mysql::pool::{Pool, PoolOptions};
8//!
9//! async fn test() -> Result<(), ConnectionError> {
10//! let pool = Pool::connect(
11//! ConnectionOptions::new()
12//! .address("127.0.0.1:3307").unwrap()
13//! .user("user")
14//! .password("pw")
15//! .database("test"),
16//! PoolOptions::new().max_connections(10)
17//! ).await?;
18//!
19//! let mut conn = pool.acquire().await?;
20//!
21//! let row: Option<(i64,)> = conn.fetch_optional(
22//! "SELECT `number` FROM `table` WHERE `id`=?",
23//! (42,)
24//! ).await?;
25//!
26//! if let Some((id,)) = row {
27//! println!("Found id {}", id);
28//! }
29//!
30//! Ok(())
31//! }
32//! ```
33use std::{
34 mem::ManuallyDrop,
35 ops::{Deref, DerefMut},
36 sync::{Arc, Mutex},
37 time::{Duration, Instant},
38};
39
40use crate::{
41 Executor,
42 connection::{Connection, ConnectionOptions, ConnectionResult},
43 handle_drop::HandleDrop,
44};
45
/// Options used for a connection pool
///
/// Every option has a default value, see the [`Default`] implementation.
#[derive(Debug, Clone)]
pub struct PoolOptions {
    /// Wait this long for cleanup when a connection is dropped while it is performing a query.
    ///
    /// After the timeout the connection is closed, and a new connection may then be opened
    clean_timeout: Duration,
    /// Wait this long before attempting to connect again if we fail to connect
    reconnect_time: Duration,
    /// The maximum number of concurrent connections allowed
    max_connections: usize,
    /// When acquiring a connection from the pool that has been idle for longer than this,
    /// ping it first to ensure that it is still good
    stale_connection_time: Duration,
    /// When pinging a stale connection only wait this long
    ping_timeout: Duration,
}
62
63impl PoolOptions {
64 /// New default pool options
65 pub fn new() -> Self {
66 PoolOptions::default()
67 }
68
69 /// With this long when a connection is dropped while it is performing a query.
70 ///
71 /// After timeout the connection is closed. And a new connection may then be opened
72 pub fn clean_timeout(self, duration: Duration) -> Self {
73 PoolOptions {
74 clean_timeout: duration,
75 ..self
76 }
77 }
78
79 /// Wait this long to attempt to connection again if we fail to connect
80 pub fn reconnect_time(self, duration: Duration) -> Self {
81 PoolOptions {
82 reconnect_time: duration,
83 ..self
84 }
85 }
86
87 /// The maximum number of concurrent connections allowed
88 pub fn max_connections(self, connection: usize) -> Self {
89 PoolOptions {
90 max_connections: connection,
91 ..self
92 }
93 }
94}
95
96impl Default for PoolOptions {
97 fn default() -> Self {
98 Self {
99 clean_timeout: Duration::from_millis(200),
100 reconnect_time: Duration::from_secs(2),
101 stale_connection_time: Duration::from_secs(10 * 60),
102 ping_timeout: Duration::from_millis(200),
103 max_connections: 5,
104 }
105 }
106}
107
108/// Part of pool state protected by a mutex
109struct PoolProtected {
110 /// Current free transactions
111 connections: Vec<(Connection, Instant)>,
112 /// Number of transactions we are still allowed to allocate
113 unallocated_connections: usize,
114}
115
116/// Inner state of a pool
117struct PoolInner {
118 /// Part of state protected by a mutex
119 protected: Mutex<PoolProtected>,
120 /// The pool options given at creation time
121 pool_options: PoolOptions,
122 /// The connection options given at creation time
123 connection_options: ConnectionOptions<'static>,
124 /// Notify this when a connection becomes available
125 connection_available: tokio::sync::Notify,
126}
127
128/// A pool of shared connections that can be acquired
129#[derive(Clone)]
130pub struct Pool(Arc<PoolInner>);
131
132impl Pool {
133 /// Establish a new pool with at least one connection
134 pub async fn connect(
135 connection_options: ConnectionOptions<'static>,
136 pool_options: PoolOptions,
137 ) -> ConnectionResult<Self> {
138 let connection = Connection::connect(&connection_options).await?;
139 Ok(Pool(Arc::new(PoolInner {
140 protected: Mutex::new(PoolProtected {
141 connections: vec![(connection, std::time::Instant::now())],
142 unallocated_connections: pool_options.max_connections - 1,
143 }),
144 pool_options,
145 connection_options,
146 connection_available: tokio::sync::Notify::new(),
147 })))
148 }
149
150 /// Acquire a free connection from the pool.
151 ///
152 /// If there is no free connection wait for one to become available
153 ///
154 /// The returned future is drop safe
155 pub async fn acquire(&self) -> ConnectionResult<PoolConnection> {
156 enum Res<N, R> {
157 /// Wait for a connection to become available
158 Wait,
159 /// Establish a new connection
160 New(N),
161 /// Reuse an existing connection
162 Reuse(R),
163 }
164 loop {
165 let res = {
166 let mut inner = self.0.protected.lock().unwrap();
167 if let Some((connection, last_use)) = inner.connections.pop() {
168 Res::Reuse(HandleDrop::new(
169 (connection, last_use, self.clone()),
170 |(connection, last_use, pool)| {
171 let mut inner = pool.0.protected.lock().unwrap();
172 inner.connections.push((connection, last_use));
173 },
174 ))
175 } else if inner.unallocated_connections == 0 {
176 Res::Wait
177 } else {
178 inner.unallocated_connections -= 1;
179 Res::New(HandleDrop::new(self.clone(), |pool| {
180 pool.connection_dropped();
181 }))
182 }
183 };
184
185 match res {
186 Res::Wait => {
187 // Safety cancel: We are not holding any resources
188 self.0.connection_available.notified().await
189 }
190 Res::New(handle) => {
191 // Safety cancel: This is cancel safe since the handle will increment the unallocated_connections when dropped
192 let r = Connection::connect(&self.0.connection_options).await;
193 match r {
194 Ok(connection) => {
195 let pool = handle.release();
196 return Ok(PoolConnection {
197 pool,
198 connection: ManuallyDrop::new(connection),
199 });
200 }
201 Err(e) => {
202 // Wait a bit with releasing the handle, since the next acquire will probably run into the same failure
203 tokio::task::spawn(async move {
204 tokio::time::sleep((*handle).0.pool_options.reconnect_time).await;
205 std::mem::drop(handle);
206 });
207 return Err(e);
208 }
209 }
210 }
211 Res::Reuse(mut handle) => {
212 let (connection, last_use, pool) = &mut *handle;
213 if last_use.elapsed() > pool.0.pool_options.stale_connection_time {
214 // Safety cancel: This is cancel safe since the handle will put the connection back into the pool
215 match tokio::time::timeout(
216 pool.0.pool_options.ping_timeout,
217 connection.ping(),
218 )
219 .await
220 {
221 Ok(Ok(())) => (),
222 Err(_) | Ok(Err(_)) => {
223 // Ping failed or time outed. Lets drop the connection and create a new one
224 let (connection, _, pool) = handle.release();
225 std::mem::drop(connection);
226 pool.connection_dropped();
227 continue;
228 }
229 }
230 }
231 let (connection, _, pool) = handle.release();
232 let connection = PoolConnection {
233 pool,
234 connection: ManuallyDrop::new(connection),
235 };
236 return Ok(connection);
237 }
238 }
239 }
240 }
241
242 /// A connection has been dropped, allow new connections to be established
243 fn connection_dropped(&self) {
244 let mut inner = self.0.protected.lock().unwrap();
245 inner.unallocated_connections += 1;
246 self.0.connection_available.notify_one();
247 }
248
249 /// Put a connection back into the pool
250 fn release(&self, connection: Connection) {
251 let mut inner = self.0.protected.lock().unwrap();
252 self.0.connection_available.notify_one();
253 inner
254 .connections
255 .push((connection, std::time::Instant::now()));
256 }
257}
258
259/// A connection borrowed from the pool
260pub struct PoolConnection {
261 /// The pool the connection is borrowed from
262 pool: Pool,
263 /// The borrowed connection
264 connection: ManuallyDrop<Connection>,
265}
266
267impl Deref for PoolConnection {
268 type Target = Connection;
269
270 fn deref(&self) -> &Self::Target {
271 &self.connection
272 }
273}
274
275impl DerefMut for PoolConnection {
276 fn deref_mut(&mut self) -> &mut Self::Target {
277 &mut self.connection
278 }
279}
280
281impl Drop for PoolConnection {
282 /// Drop the connection, if we are in the middle of a request wait a bit for it to finish
283 fn drop(&mut self) {
284 // Safety: I will not access self.connection after this
285 let mut connection = unsafe { ManuallyDrop::take(&mut self.connection) };
286 if connection.is_clean() {
287 self.pool.release(connection);
288 } else {
289 // The connection is not clean, lets try to clean it up for a bit
290 let pool = self.pool.clone();
291 tokio::spawn(async move {
292 match tokio::time::timeout(pool.0.pool_options.clean_timeout, connection.cleanup())
293 .await
294 {
295 Ok(Ok(())) => {
296 pool.release(connection);
297 }
298 Ok(Err(_)) => {
299 // Connection error during cleaning, lets just close the connection
300 std::mem::drop(connection);
301 pool.connection_dropped();
302 }
303 Err(_) => {
304 // Timeout during cleaning
305 std::mem::drop(connection);
306 pool.connection_dropped();
307 }
308 }
309 });
310 }
311 }
312}