// sentinel_driver/pool/mod.rs

1pub mod config;
2pub mod health;
3
4use std::collections::VecDeque;
5use std::ops::{Deref, DerefMut};
6use std::sync::Arc;
7
8use tokio::sync::{Mutex, Semaphore};
9use tracing::debug;
10
11use crate::config::Config;
12use crate::error::{Error, Result};
13use crate::pool::config::PoolConfig;
14use crate::pool::health::{ConnectionMeta, HealthCheckStrategy};
15use crate::Connection;
16
/// An idle connection in the pool, with its metadata.
///
/// Entries of this type live in `PoolState::idle` between a release and the
/// next checkout.
struct IdleConnection {
    /// The parked connection itself.
    conn: Connection,
    /// Bookkeeping consulted on checkout (broken flag, idle/lifetime expiry).
    meta: ConnectionMeta,
}
22
/// Shared inner state of the pool, protected by a Mutex.
struct PoolState {
    /// Connections waiting to be reused; released connections are pushed to
    /// the back and checkouts pop from the front.
    idle: VecDeque<IdleConnection>,
    /// Number of connections the pool currently accounts for: incremented
    /// when a connection is created, decremented when one is discarded.
    total_count: usize,
}
28
/// Shared data that lives behind an Arc, so PooledConnection can own a clone.
struct PoolShared {
    /// Connection settings; cloned for every new `Connection::connect` call.
    config: Config,
    /// Pool tuning knobs (limits, timeouts, health check, lifecycle callbacks).
    pool_config: PoolConfig,
    /// Created with `max_connections` permits; `acquire` must obtain a permit
    /// (bounded by `acquire_timeout`) before it checks out a connection.
    semaphore: Semaphore,
    /// Idle queue and connection counter, guarded by an async mutex.
    state: Mutex<PoolState>,
}
36
/// Snapshot of pool statistics.
///
/// Cheap to produce — reads from pool state under a single lock. The values
/// describe one instant; the pool may already have changed by the time the
/// caller inspects them.
#[derive(Debug, Clone, Copy)]
pub struct PoolMetrics {
    /// Number of connections currently checked out by users
    /// (computed as `total - idle`, saturating at zero).
    pub active: usize,
    /// Number of idle connections available for checkout.
    pub idle: usize,
    /// Total connections (active + idle).
    pub total: usize,
    /// Maximum allowed connections (`PoolConfig::max_connections`).
    pub max: usize,
}
51
/// A connection pool for PostgreSQL.
///
/// Cheaply cloneable (internally Arc'd). Uses a semaphore to limit max
/// connections and a mutex-protected deque for idle connection management.
/// Designed for <0.5μs checkout latency.
///
/// # Lifecycle Callbacks
///
/// Three optional callbacks control connection lifecycle:
/// - `after_connect` — runs once per new connection (session setup)
/// - `before_acquire` — runs before handing out a connection (validation)
/// - `after_release` — runs when a connection returns to the pool (cleanup)
///
/// # Example
///
/// ```rust,no_run
/// use sentinel_driver::{Config, pool::{Pool, config::PoolConfig}};
/// use std::time::Duration;
///
/// # async fn example() -> sentinel_driver::Result<()> {
/// let config = Config::parse("postgres://user:pass@localhost/db")?;
/// let pool = Pool::new(config, PoolConfig::new().max_connections(10));
///
/// let conn = pool.acquire().await?;
/// // use conn...
/// // conn is returned to pool on drop
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct Pool {
    /// State shared by every clone of this pool and by checked-out connections.
    shared: Arc<PoolShared>,
    /// Instrumentation sink for pool events; stored per handle, so clones
    /// made before `with_instrumentation` keep their previous sink.
    pool_instrumentation: Arc<dyn crate::Instrumentation>,
}
86
87impl Pool {
88    /// Create a new connection pool. No connections are opened until `acquire()`.
89    pub fn new(config: Config, pool_config: PoolConfig) -> Self {
90        let pool_instrumentation = config
91            .instrumentation
92            .clone()
93            .unwrap_or_else(crate::instrumentation::noop);
94        let shared = Arc::new(PoolShared {
95            semaphore: Semaphore::new(pool_config.max_connections),
96            config,
97            pool_config,
98            state: Mutex::new(PoolState {
99                idle: VecDeque::new(),
100                total_count: 0,
101            }),
102        });
103
104        Self {
105            shared,
106            pool_instrumentation,
107        }
108    }
109
110    /// Create a pool that defers all connection establishment until the
111    /// first `acquire()` call.
112    ///
113    /// This is identical to `new()` — both are lazy. Provided for API
114    /// compatibility with connection pools that eagerly open connections.
115    ///
116    /// ```rust,no_run
117    /// # use sentinel_driver::{Config, pool::{Pool, config::PoolConfig}};
118    /// # fn example() -> sentinel_driver::Result<()> {
119    /// let config = Config::parse("postgres://user:pass@localhost/db")?;
120    /// let pool = Pool::connect_lazy(config, PoolConfig::new());
121    /// // No connections opened yet — first acquire() will connect.
122    /// # Ok(())
123    /// # }
124    /// ```
125    pub fn connect_lazy(config: Config, pool_config: PoolConfig) -> Self {
126        Self::new(config, pool_config)
127    }
128
129    /// Install an `Instrumentation` impl. Replaces whatever was inherited
130    /// from `Config::with_instrumentation`. Affects this `Pool` handle and
131    /// any `Pool::clone()` made after this call; existing clones keep the
132    /// previous instrumentation.
133    pub fn with_instrumentation(mut self, instr: Arc<dyn crate::Instrumentation>) -> Self {
134        self.pool_instrumentation = instr;
135        self
136    }
137
138    /// Acquire a connection from the pool.
139    ///
140    /// If an idle connection is available, it's returned immediately.
141    /// Otherwise, a new connection is created (up to `max_connections`).
142    /// If the pool is full, waits up to `acquire_timeout`.
143    pub async fn acquire(&self) -> Result<PooledConnection> {
144        let pending = {
145            let state = self.shared.state.lock().await;
146            state.total_count.saturating_sub(state.idle.len())
147        };
148        self.pool_instrumentation
149            .on_event(&crate::Event::PoolAcquireStart { pending });
150        let started = std::time::Instant::now();
151        let res = self.acquire_inner().await;
152        let wait = started.elapsed();
153        let outcome = match &res {
154            Ok(_) => crate::AcquireOutcome::Ok,
155            Err(crate::Error::Pool(msg)) if msg.contains("timeout") => {
156                crate::AcquireOutcome::Timeout
157            }
158            Err(crate::Error::Pool(msg)) if msg.contains("closed") => {
159                crate::AcquireOutcome::PoolClosed
160            }
161            Err(_) => crate::AcquireOutcome::Error,
162        };
163        self.pool_instrumentation
164            .on_event(&crate::Event::PoolAcquireFinish { wait, outcome });
165        res
166    }
167
168    async fn acquire_inner(&self) -> Result<PooledConnection> {
169        let permit = tokio::time::timeout(
170            self.shared.pool_config.acquire_timeout,
171            self.shared.semaphore.acquire(),
172        )
173        .await
174        .map_err(|_| Error::Pool("acquire timeout: pool exhausted".into()))?
175        .map_err(|_| Error::Pool("pool closed".into()))?;
176
177        // Release semaphore permit immediately — we track count ourselves.
178        // The semaphore just rate-limits concurrent acquires.
179        drop(permit);
180
181        // Try to get an idle connection
182        let idle_conn = {
183            let mut state = self.shared.state.lock().await;
184            state.idle.pop_front()
185        };
186
187        if let Some(idle) = idle_conn {
188            if self.is_fresh(&idle.meta) {
189                let mut conn = idle.conn;
190                // If Query strategy, verify connection is alive
191                if self.shared.pool_config.health_check == HealthCheckStrategy::Query
192                    && !health::check_alive(conn.pg_connection_mut()).await
193                {
194                    debug!("idle connection failed health check, creating new one");
195                    self.decrement_count().await;
196                    let (conn, meta) = self.create_connection().await?;
197                    return Ok(self.wrap(conn, meta));
198                }
199
200                // Run before_acquire callback
201                if let Some(ref cb) = self.shared.pool_config.before_acquire {
202                    match cb(&mut conn).await {
203                        Ok(true) => { /* connection accepted */ }
204                        Ok(false) => {
205                            debug!("before_acquire rejected connection");
206                            self.decrement_count().await;
207                            let (conn, meta) = self.create_connection().await?;
208                            return Ok(self.wrap(conn, meta));
209                        }
210                        Err(_) => {
211                            debug!("before_acquire callback error, discarding connection");
212                            self.decrement_count().await;
213                            let (conn, meta) = self.create_connection().await?;
214                            return Ok(self.wrap(conn, meta));
215                        }
216                    }
217                }
218
219                debug!("reusing idle connection");
220                Ok(self.wrap(conn, idle.meta))
221            } else {
222                debug!("idle connection expired, creating new one");
223                self.decrement_count().await;
224                let (conn, meta) = self.create_connection().await?;
225                Ok(self.wrap(conn, meta))
226            }
227        } else {
228            let (conn, meta) = self.create_connection().await?;
229            Ok(self.wrap(conn, meta))
230        }
231    }
232
233    /// Number of idle connections.
234    pub async fn idle_count(&self) -> usize {
235        self.shared.state.lock().await.idle.len()
236    }
237
238    /// Total number of connections (idle + in use).
239    pub async fn total_count(&self) -> usize {
240        self.shared.state.lock().await.total_count
241    }
242
243    /// Maximum number of connections allowed.
244    pub fn max_connections(&self) -> usize {
245        self.shared.pool_config.max_connections
246    }
247
248    /// Get a snapshot of pool metrics.
249    pub async fn metrics(&self) -> PoolMetrics {
250        let state = self.shared.state.lock().await;
251        let idle = state.idle.len();
252        let total = state.total_count;
253        PoolMetrics {
254            active: total.saturating_sub(idle),
255            idle,
256            total,
257            max: self.shared.pool_config.max_connections,
258        }
259    }
260
261    // ── Internal ─────────────────────────────────────
262
263    /// Wrap a freshly-acquired `Connection` into a `PooledConnection`,
264    /// propagating the pool's instrumentation to the connection before
265    /// returning it to the caller.
266    fn wrap(&self, mut conn: Connection, meta: ConnectionMeta) -> PooledConnection {
267        conn.set_instrumentation(self.pool_instrumentation.clone());
268        PooledConnection {
269            conn: Some(conn),
270            meta,
271            shared: Arc::clone(&self.shared),
272            pool_instrumentation: self.pool_instrumentation.clone(),
273        }
274    }
275
276    async fn create_connection(&self) -> Result<(Connection, ConnectionMeta)> {
277        let mut conn = Connection::connect(self.shared.config.clone()).await?;
278
279        // Run after_connect callback
280        if let Some(ref cb) = self.shared.pool_config.after_connect {
281            if let Err(e) = cb(&mut conn).await {
282                debug!(?e, "after_connect callback failed, discarding connection");
283                return Err(e);
284            }
285        }
286
287        let meta = ConnectionMeta::new();
288
289        let mut state = self.shared.state.lock().await;
290        state.total_count += 1;
291        debug!(total = state.total_count, "created new connection");
292
293        Ok((conn, meta))
294    }
295
296    async fn decrement_count(&self) {
297        let mut state = self.shared.state.lock().await;
298        state.total_count = state.total_count.saturating_sub(1);
299    }
300
301    fn is_fresh(&self, meta: &ConnectionMeta) -> bool {
302        if meta.is_broken {
303            return false;
304        }
305
306        if let Some(timeout) = self.shared.pool_config.idle_timeout {
307            if meta.is_idle_expired(timeout) {
308                return false;
309            }
310        }
311
312        if let Some(lifetime) = self.shared.pool_config.max_lifetime {
313            if meta.is_lifetime_expired(lifetime) {
314                return false;
315            }
316        }
317
318        true
319    }
320}
321
322/// A connection checked out from the pool.
323///
324/// When dropped, the connection is automatically returned to the pool
325/// (unless it has been marked as broken). The `after_release` callback
326/// runs before the connection re-enters the idle queue.
327pub struct PooledConnection {
328    conn: Option<Connection>,
329    meta: ConnectionMeta,
330    shared: Arc<PoolShared>,
331    pool_instrumentation: Arc<dyn crate::Instrumentation>,
332}
333
334impl PooledConnection {
335    /// Mark this connection as broken. It will be discarded on drop
336    /// instead of being returned to the pool.
337    pub fn mark_broken(&mut self) {
338        self.meta.is_broken = true;
339    }
340}
341
342impl Deref for PooledConnection {
343    type Target = Connection;
344
345    #[allow(clippy::expect_used)]
346    fn deref(&self) -> &Self::Target {
347        self.conn
348            .as_ref()
349            .expect("PooledConnection used after drop")
350    }
351}
352
353impl DerefMut for PooledConnection {
354    #[allow(clippy::expect_used)]
355    fn deref_mut(&mut self) -> &mut Self::Target {
356        self.conn
357            .as_mut()
358            .expect("PooledConnection used after drop")
359    }
360}
361
362impl Drop for PooledConnection {
363    fn drop(&mut self) {
364        if let Some(conn) = self.conn.take() {
365            // Emit PoolRelease synchronously; the rest happens async.
366            self.pool_instrumentation
367                .on_event(&crate::Event::PoolRelease);
368
369            let shared = Arc::clone(&self.shared);
370
371            if self.meta.is_broken {
372                tokio::spawn(async move {
373                    drop(conn);
374                    let mut state = shared.state.lock().await;
375                    state.total_count = state.total_count.saturating_sub(1);
376                    debug!("discarded broken connection");
377                });
378            } else {
379                let created_at = self.meta.created_at;
380                let after_release = self.shared.pool_config.after_release.clone();
381
382                tokio::spawn(async move {
383                    let mut conn = conn;
384
385                    // Run after_release callback
386                    if let Some(cb) = after_release {
387                        match cb(&mut conn).await {
388                            Ok(true) => { /* return to pool */ }
389                            Ok(false) => {
390                                debug!("after_release rejected connection, discarding");
391                                let mut state = shared.state.lock().await;
392                                state.total_count = state.total_count.saturating_sub(1);
393                                return;
394                            }
395                            Err(_) => {
396                                debug!("after_release callback error, discarding connection");
397                                let mut state = shared.state.lock().await;
398                                state.total_count = state.total_count.saturating_sub(1);
399                                return;
400                            }
401                        }
402                    }
403
404                    let mut meta = ConnectionMeta::new();
405                    meta.created_at = created_at;
406                    meta.touch();
407
408                    let mut state = shared.state.lock().await;
409                    state.idle.push_back(IdleConnection { conn, meta });
410                });
411            }
412        }
413    }
414}