Skip to main content

sentinel_driver/pool/
mod.rs

1pub mod config;
2pub mod health;
3
4use std::collections::VecDeque;
5use std::ops::{Deref, DerefMut};
6use std::sync::Arc;
7
8use tokio::sync::{Mutex, Semaphore};
9use tracing::debug;
10
11use crate::config::Config;
12use crate::error::{Error, Result};
13use crate::pool::config::PoolConfig;
14use crate::pool::health::{ConnectionMeta, HealthCheckStrategy};
15use crate::Connection;
16
17/// An idle connection in the pool, with its metadata.
18struct IdleConnection {
19    conn: Connection,
20    meta: ConnectionMeta,
21}
22
23/// Shared inner state of the pool, protected by a Mutex.
24struct PoolState {
25    idle: VecDeque<IdleConnection>,
26    total_count: usize,
27}
28
29/// Shared data that lives behind an Arc, so PooledConnection can own a clone.
30struct PoolShared {
31    config: Config,
32    pool_config: PoolConfig,
33    semaphore: Semaphore,
34    state: Mutex<PoolState>,
35}
36
37/// Snapshot of pool statistics.
38///
39/// Cheap to produce — reads from pool state under a single lock.
40#[derive(Debug, Clone, Copy)]
41pub struct PoolMetrics {
42    /// Number of connections currently checked out by users.
43    pub active: usize,
44    /// Number of idle connections available for checkout.
45    pub idle: usize,
46    /// Total connections (active + idle).
47    pub total: usize,
48    /// Maximum allowed connections.
49    pub max: usize,
50}
51
52/// A connection pool for PostgreSQL.
53///
54/// Cheaply cloneable (internally Arc'd). Uses a semaphore to limit max
55/// connections and a mutex-protected deque for idle connection management.
56/// Designed for <0.5μs checkout latency.
57///
58/// # Lifecycle Callbacks
59///
60/// Three optional callbacks control connection lifecycle:
61/// - `after_connect` — runs once per new connection (session setup)
62/// - `before_acquire` — runs before handing out a connection (validation)
63/// - `after_release` — runs when a connection returns to the pool (cleanup)
64///
65/// # Example
66///
67/// ```rust,no_run
68/// use sentinel_driver::{Config, pool::{Pool, config::PoolConfig}};
69/// use std::time::Duration;
70///
71/// # async fn example() -> sentinel_driver::Result<()> {
72/// let config = Config::parse("postgres://user:pass@localhost/db")?;
73/// let pool = Pool::new(config, PoolConfig::new().max_connections(10));
74///
75/// let conn = pool.acquire().await?;
76/// // use conn...
77/// // conn is returned to pool on drop
78/// # Ok(())
79/// # }
80/// ```
81#[derive(Clone)]
82pub struct Pool {
83    shared: Arc<PoolShared>,
84}
85
86impl Pool {
87    /// Create a new connection pool. No connections are opened until `acquire()`.
88    pub fn new(config: Config, pool_config: PoolConfig) -> Self {
89        let shared = Arc::new(PoolShared {
90            semaphore: Semaphore::new(pool_config.max_connections),
91            config,
92            pool_config,
93            state: Mutex::new(PoolState {
94                idle: VecDeque::new(),
95                total_count: 0,
96            }),
97        });
98
99        Self { shared }
100    }
101
102    /// Create a pool that defers all connection establishment until the
103    /// first `acquire()` call.
104    ///
105    /// This is identical to `new()` — both are lazy. Provided for API
106    /// compatibility with connection pools that eagerly open connections.
107    ///
108    /// ```rust,no_run
109    /// # use sentinel_driver::{Config, pool::{Pool, config::PoolConfig}};
110    /// # fn example() -> sentinel_driver::Result<()> {
111    /// let config = Config::parse("postgres://user:pass@localhost/db")?;
112    /// let pool = Pool::connect_lazy(config, PoolConfig::new());
113    /// // No connections opened yet — first acquire() will connect.
114    /// # Ok(())
115    /// # }
116    /// ```
117    pub fn connect_lazy(config: Config, pool_config: PoolConfig) -> Self {
118        Self::new(config, pool_config)
119    }
120
121    /// Acquire a connection from the pool.
122    ///
123    /// If an idle connection is available, it's returned immediately.
124    /// Otherwise, a new connection is created (up to `max_connections`).
125    /// If the pool is full, waits up to `acquire_timeout`.
126    pub async fn acquire(&self) -> Result<PooledConnection> {
127        let permit = tokio::time::timeout(
128            self.shared.pool_config.acquire_timeout,
129            self.shared.semaphore.acquire(),
130        )
131        .await
132        .map_err(|_| Error::Pool("acquire timeout: pool exhausted".into()))?
133        .map_err(|_| Error::Pool("pool closed".into()))?;
134
135        // Release semaphore permit immediately — we track count ourselves.
136        // The semaphore just rate-limits concurrent acquires.
137        drop(permit);
138
139        // Try to get an idle connection
140        let idle_conn = {
141            let mut state = self.shared.state.lock().await;
142            state.idle.pop_front()
143        };
144
145        if let Some(idle) = idle_conn {
146            if self.is_fresh(&idle.meta) {
147                let mut conn = idle.conn;
148                // If Query strategy, verify connection is alive
149                if self.shared.pool_config.health_check == HealthCheckStrategy::Query
150                    && !health::check_alive(conn.pg_connection_mut()).await
151                {
152                    debug!("idle connection failed health check, creating new one");
153                    self.decrement_count().await;
154                    let (conn, meta) = self.create_connection().await?;
155                    return Ok(PooledConnection {
156                        conn: Some(conn),
157                        meta,
158                        shared: Arc::clone(&self.shared),
159                    });
160                }
161
162                // Run before_acquire callback
163                if let Some(ref cb) = self.shared.pool_config.before_acquire {
164                    match cb(&mut conn).await {
165                        Ok(true) => { /* connection accepted */ }
166                        Ok(false) => {
167                            debug!("before_acquire rejected connection");
168                            self.decrement_count().await;
169                            let (conn, meta) = self.create_connection().await?;
170                            return Ok(PooledConnection {
171                                conn: Some(conn),
172                                meta,
173                                shared: Arc::clone(&self.shared),
174                            });
175                        }
176                        Err(_) => {
177                            debug!("before_acquire callback error, discarding connection");
178                            self.decrement_count().await;
179                            let (conn, meta) = self.create_connection().await?;
180                            return Ok(PooledConnection {
181                                conn: Some(conn),
182                                meta,
183                                shared: Arc::clone(&self.shared),
184                            });
185                        }
186                    }
187                }
188
189                debug!("reusing idle connection");
190                Ok(PooledConnection {
191                    conn: Some(conn),
192                    meta: idle.meta,
193                    shared: Arc::clone(&self.shared),
194                })
195            } else {
196                debug!("idle connection expired, creating new one");
197                self.decrement_count().await;
198                let (conn, meta) = self.create_connection().await?;
199                Ok(PooledConnection {
200                    conn: Some(conn),
201                    meta,
202                    shared: Arc::clone(&self.shared),
203                })
204            }
205        } else {
206            let (conn, meta) = self.create_connection().await?;
207            Ok(PooledConnection {
208                conn: Some(conn),
209                meta,
210                shared: Arc::clone(&self.shared),
211            })
212        }
213    }
214
215    /// Number of idle connections.
216    pub async fn idle_count(&self) -> usize {
217        self.shared.state.lock().await.idle.len()
218    }
219
220    /// Total number of connections (idle + in use).
221    pub async fn total_count(&self) -> usize {
222        self.shared.state.lock().await.total_count
223    }
224
225    /// Maximum number of connections allowed.
226    pub fn max_connections(&self) -> usize {
227        self.shared.pool_config.max_connections
228    }
229
230    /// Get a snapshot of pool metrics.
231    pub async fn metrics(&self) -> PoolMetrics {
232        let state = self.shared.state.lock().await;
233        let idle = state.idle.len();
234        let total = state.total_count;
235        PoolMetrics {
236            active: total.saturating_sub(idle),
237            idle,
238            total,
239            max: self.shared.pool_config.max_connections,
240        }
241    }
242
243    // ── Internal ─────────────────────────────────────
244
245    async fn create_connection(&self) -> Result<(Connection, ConnectionMeta)> {
246        let mut conn = Connection::connect(self.shared.config.clone()).await?;
247
248        // Run after_connect callback
249        if let Some(ref cb) = self.shared.pool_config.after_connect {
250            if let Err(e) = cb(&mut conn).await {
251                debug!(?e, "after_connect callback failed, discarding connection");
252                return Err(e);
253            }
254        }
255
256        let meta = ConnectionMeta::new();
257
258        let mut state = self.shared.state.lock().await;
259        state.total_count += 1;
260        debug!(total = state.total_count, "created new connection");
261
262        Ok((conn, meta))
263    }
264
265    async fn decrement_count(&self) {
266        let mut state = self.shared.state.lock().await;
267        state.total_count = state.total_count.saturating_sub(1);
268    }
269
270    fn is_fresh(&self, meta: &ConnectionMeta) -> bool {
271        if meta.is_broken {
272            return false;
273        }
274
275        if let Some(timeout) = self.shared.pool_config.idle_timeout {
276            if meta.is_idle_expired(timeout) {
277                return false;
278            }
279        }
280
281        if let Some(lifetime) = self.shared.pool_config.max_lifetime {
282            if meta.is_lifetime_expired(lifetime) {
283                return false;
284            }
285        }
286
287        true
288    }
289}
290
291/// A connection checked out from the pool.
292///
293/// When dropped, the connection is automatically returned to the pool
294/// (unless it has been marked as broken). The `after_release` callback
295/// runs before the connection re-enters the idle queue.
296pub struct PooledConnection {
297    conn: Option<Connection>,
298    meta: ConnectionMeta,
299    shared: Arc<PoolShared>,
300}
301
302impl PooledConnection {
303    /// Mark this connection as broken. It will be discarded on drop
304    /// instead of being returned to the pool.
305    pub fn mark_broken(&mut self) {
306        self.meta.is_broken = true;
307    }
308}
309
310impl Deref for PooledConnection {
311    type Target = Connection;
312
313    #[allow(clippy::expect_used)]
314    fn deref(&self) -> &Self::Target {
315        self.conn
316            .as_ref()
317            .expect("PooledConnection used after drop")
318    }
319}
320
321impl DerefMut for PooledConnection {
322    #[allow(clippy::expect_used)]
323    fn deref_mut(&mut self) -> &mut Self::Target {
324        self.conn
325            .as_mut()
326            .expect("PooledConnection used after drop")
327    }
328}
329
330impl Drop for PooledConnection {
331    fn drop(&mut self) {
332        if let Some(conn) = self.conn.take() {
333            let shared = Arc::clone(&self.shared);
334
335            if self.meta.is_broken {
336                tokio::spawn(async move {
337                    drop(conn);
338                    let mut state = shared.state.lock().await;
339                    state.total_count = state.total_count.saturating_sub(1);
340                    debug!("discarded broken connection");
341                });
342            } else {
343                let created_at = self.meta.created_at;
344                let after_release = self.shared.pool_config.after_release.clone();
345
346                tokio::spawn(async move {
347                    let mut conn = conn;
348
349                    // Run after_release callback
350                    if let Some(cb) = after_release {
351                        match cb(&mut conn).await {
352                            Ok(true) => { /* return to pool */ }
353                            Ok(false) => {
354                                debug!("after_release rejected connection, discarding");
355                                let mut state = shared.state.lock().await;
356                                state.total_count = state.total_count.saturating_sub(1);
357                                return;
358                            }
359                            Err(_) => {
360                                debug!("after_release callback error, discarding connection");
361                                let mut state = shared.state.lock().await;
362                                state.total_count = state.total_count.saturating_sub(1);
363                                return;
364                            }
365                        }
366                    }
367
368                    let mut meta = ConnectionMeta::new();
369                    meta.created_at = created_at;
370                    meta.touch();
371
372                    let mut state = shared.state.lock().await;
373                    state.idle.push_back(IdleConnection { conn, meta });
374                });
375            }
376        }
377    }
378}