sentinel_driver/pool/
mod.rs1pub mod config;
2pub mod health;
3
4use std::collections::VecDeque;
5use std::ops::{Deref, DerefMut};
6use std::sync::Arc;
7
8use tokio::sync::{Mutex, Semaphore};
9use tracing::debug;
10
11use crate::config::Config;
12use crate::error::{Error, Result};
13use crate::pool::config::PoolConfig;
14use crate::pool::health::{ConnectionMeta, HealthCheckStrategy};
15use crate::Connection;
16
17struct IdleConnection {
19 conn: Connection,
20 meta: ConnectionMeta,
21}
22
23struct PoolState {
25 idle: VecDeque<IdleConnection>,
26 total_count: usize,
27}
28
29struct PoolShared {
31 config: Config,
32 pool_config: PoolConfig,
33 semaphore: Semaphore,
34 state: Mutex<PoolState>,
35}
36
/// Point-in-time snapshot of pool usage, as returned by `Pool::metrics`.
///
/// The type is a plain value: it is `Copy`, and two snapshots compare equal
/// when all four counters match.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PoolMetrics {
    /// Connections counted by the pool that are not sitting in the idle queue.
    pub active: usize,
    /// Connections currently sitting in the idle queue.
    pub idle: usize,
    /// Value of the pool's connection counter when the snapshot was taken.
    pub total: usize,
    /// Configured `max_connections` limit.
    pub max: usize,
}
51
52#[derive(Clone)]
82pub struct Pool {
83 shared: Arc<PoolShared>,
84}
85
86impl Pool {
87 pub fn new(config: Config, pool_config: PoolConfig) -> Self {
89 let shared = Arc::new(PoolShared {
90 semaphore: Semaphore::new(pool_config.max_connections),
91 config,
92 pool_config,
93 state: Mutex::new(PoolState {
94 idle: VecDeque::new(),
95 total_count: 0,
96 }),
97 });
98
99 Self { shared }
100 }
101
102 pub fn connect_lazy(config: Config, pool_config: PoolConfig) -> Self {
118 Self::new(config, pool_config)
119 }
120
121 pub async fn acquire(&self) -> Result<PooledConnection> {
127 let permit = tokio::time::timeout(
128 self.shared.pool_config.acquire_timeout,
129 self.shared.semaphore.acquire(),
130 )
131 .await
132 .map_err(|_| Error::Pool("acquire timeout: pool exhausted".into()))?
133 .map_err(|_| Error::Pool("pool closed".into()))?;
134
135 drop(permit);
138
139 let idle_conn = {
141 let mut state = self.shared.state.lock().await;
142 state.idle.pop_front()
143 };
144
145 if let Some(idle) = idle_conn {
146 if self.is_fresh(&idle.meta) {
147 let mut conn = idle.conn;
148 if self.shared.pool_config.health_check == HealthCheckStrategy::Query
150 && !health::check_alive(conn.pg_connection_mut()).await
151 {
152 debug!("idle connection failed health check, creating new one");
153 self.decrement_count().await;
154 let (conn, meta) = self.create_connection().await?;
155 return Ok(PooledConnection {
156 conn: Some(conn),
157 meta,
158 shared: Arc::clone(&self.shared),
159 });
160 }
161
162 if let Some(ref cb) = self.shared.pool_config.before_acquire {
164 match cb(&mut conn).await {
165 Ok(true) => { }
166 Ok(false) => {
167 debug!("before_acquire rejected connection");
168 self.decrement_count().await;
169 let (conn, meta) = self.create_connection().await?;
170 return Ok(PooledConnection {
171 conn: Some(conn),
172 meta,
173 shared: Arc::clone(&self.shared),
174 });
175 }
176 Err(_) => {
177 debug!("before_acquire callback error, discarding connection");
178 self.decrement_count().await;
179 let (conn, meta) = self.create_connection().await?;
180 return Ok(PooledConnection {
181 conn: Some(conn),
182 meta,
183 shared: Arc::clone(&self.shared),
184 });
185 }
186 }
187 }
188
189 debug!("reusing idle connection");
190 Ok(PooledConnection {
191 conn: Some(conn),
192 meta: idle.meta,
193 shared: Arc::clone(&self.shared),
194 })
195 } else {
196 debug!("idle connection expired, creating new one");
197 self.decrement_count().await;
198 let (conn, meta) = self.create_connection().await?;
199 Ok(PooledConnection {
200 conn: Some(conn),
201 meta,
202 shared: Arc::clone(&self.shared),
203 })
204 }
205 } else {
206 let (conn, meta) = self.create_connection().await?;
207 Ok(PooledConnection {
208 conn: Some(conn),
209 meta,
210 shared: Arc::clone(&self.shared),
211 })
212 }
213 }
214
215 pub async fn idle_count(&self) -> usize {
217 self.shared.state.lock().await.idle.len()
218 }
219
220 pub async fn total_count(&self) -> usize {
222 self.shared.state.lock().await.total_count
223 }
224
225 pub fn max_connections(&self) -> usize {
227 self.shared.pool_config.max_connections
228 }
229
230 pub async fn metrics(&self) -> PoolMetrics {
232 let state = self.shared.state.lock().await;
233 let idle = state.idle.len();
234 let total = state.total_count;
235 PoolMetrics {
236 active: total.saturating_sub(idle),
237 idle,
238 total,
239 max: self.shared.pool_config.max_connections,
240 }
241 }
242
243 async fn create_connection(&self) -> Result<(Connection, ConnectionMeta)> {
246 let mut conn = Connection::connect(self.shared.config.clone()).await?;
247
248 if let Some(ref cb) = self.shared.pool_config.after_connect {
250 if let Err(e) = cb(&mut conn).await {
251 debug!(?e, "after_connect callback failed, discarding connection");
252 return Err(e);
253 }
254 }
255
256 let meta = ConnectionMeta::new();
257
258 let mut state = self.shared.state.lock().await;
259 state.total_count += 1;
260 debug!(total = state.total_count, "created new connection");
261
262 Ok((conn, meta))
263 }
264
265 async fn decrement_count(&self) {
266 let mut state = self.shared.state.lock().await;
267 state.total_count = state.total_count.saturating_sub(1);
268 }
269
270 fn is_fresh(&self, meta: &ConnectionMeta) -> bool {
271 if meta.is_broken {
272 return false;
273 }
274
275 if let Some(timeout) = self.shared.pool_config.idle_timeout {
276 if meta.is_idle_expired(timeout) {
277 return false;
278 }
279 }
280
281 if let Some(lifetime) = self.shared.pool_config.max_lifetime {
282 if meta.is_lifetime_expired(lifetime) {
283 return false;
284 }
285 }
286
287 true
288 }
289}
290
291pub struct PooledConnection {
297 conn: Option<Connection>,
298 meta: ConnectionMeta,
299 shared: Arc<PoolShared>,
300}
301
302impl PooledConnection {
303 pub fn mark_broken(&mut self) {
306 self.meta.is_broken = true;
307 }
308}
309
310impl Deref for PooledConnection {
311 type Target = Connection;
312
313 #[allow(clippy::expect_used)]
314 fn deref(&self) -> &Self::Target {
315 self.conn
316 .as_ref()
317 .expect("PooledConnection used after drop")
318 }
319}
320
321impl DerefMut for PooledConnection {
322 #[allow(clippy::expect_used)]
323 fn deref_mut(&mut self) -> &mut Self::Target {
324 self.conn
325 .as_mut()
326 .expect("PooledConnection used after drop")
327 }
328}
329
330impl Drop for PooledConnection {
331 fn drop(&mut self) {
332 if let Some(conn) = self.conn.take() {
333 let shared = Arc::clone(&self.shared);
334
335 if self.meta.is_broken {
336 tokio::spawn(async move {
337 drop(conn);
338 let mut state = shared.state.lock().await;
339 state.total_count = state.total_count.saturating_sub(1);
340 debug!("discarded broken connection");
341 });
342 } else {
343 let created_at = self.meta.created_at;
344 let after_release = self.shared.pool_config.after_release.clone();
345
346 tokio::spawn(async move {
347 let mut conn = conn;
348
349 if let Some(cb) = after_release {
351 match cb(&mut conn).await {
352 Ok(true) => { }
353 Ok(false) => {
354 debug!("after_release rejected connection, discarding");
355 let mut state = shared.state.lock().await;
356 state.total_count = state.total_count.saturating_sub(1);
357 return;
358 }
359 Err(_) => {
360 debug!("after_release callback error, discarding connection");
361 let mut state = shared.state.lock().await;
362 state.total_count = state.total_count.saturating_sub(1);
363 return;
364 }
365 }
366 }
367
368 let mut meta = ConnectionMeta::new();
369 meta.created_at = created_at;
370 meta.touch();
371
372 let mut state = shared.state.lock().await;
373 state.idle.push_back(IdleConnection { conn, meta });
374 });
375 }
376 }
377 }
378}