Skip to main content

prax_postgres/
pool.rs

1//! Connection pool for PostgreSQL.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod, Runtime};
7use tokio_postgres::NoTls;
8use tracing::{debug, info};
9
10use crate::config::PgConfig;
11use crate::connection::PgConnection;
12use crate::error::{PgError, PgResult};
13use crate::statement::PreparedStatementCache;
14
15/// A connection pool for PostgreSQL.
16#[derive(Clone)]
17pub struct PgPool {
18    inner: Pool,
19    config: Arc<PgConfig>,
20    statement_cache: Arc<PreparedStatementCache>,
21}
22
23impl PgPool {
24    /// Create a new connection pool from configuration.
25    pub async fn new(config: PgConfig) -> PgResult<Self> {
26        Self::with_pool_config(config, PoolConfig::default()).await
27    }
28
29    /// Create a new connection pool with custom pool configuration.
30    pub async fn with_pool_config(config: PgConfig, pool_config: PoolConfig) -> PgResult<Self> {
31        let pg_config = config.to_pg_config();
32
33        let mgr_config = ManagerConfig {
34            recycling_method: RecyclingMethod::Fast,
35        };
36
37        let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
38
39        // Build pool - set runtime to tokio for timeout support
40        let mut builder = Pool::builder(mgr).max_size(pool_config.max_connections);
41
42        // Only set timeouts if they are configured
43        if let Some(timeout) = pool_config.connection_timeout {
44            builder = builder
45                .wait_timeout(Some(timeout))
46                .create_timeout(Some(timeout));
47        }
48        if let Some(timeout) = pool_config.idle_timeout {
49            builder = builder.recycle_timeout(Some(timeout));
50        }
51
52        // Set runtime for timeout support
53        builder = builder.runtime(Runtime::Tokio1);
54
55        let pool = builder
56            .build()
57            .map_err(|e| PgError::config(format!("failed to create pool: {}", e)))?;
58
59        info!(
60            host = %config.host,
61            port = %config.port,
62            database = %config.database,
63            max_connections = %pool_config.max_connections,
64            "PostgreSQL connection pool created"
65        );
66
67        Ok(Self {
68            inner: pool,
69            config: Arc::new(config),
70            statement_cache: Arc::new(PreparedStatementCache::new(
71                pool_config.statement_cache_size,
72            )),
73        })
74    }
75
76    /// Get a connection from the pool.
77    pub async fn get(&self) -> PgResult<PgConnection> {
78        debug!("Acquiring connection from pool");
79        let client = self.inner.get().await?;
80        Ok(PgConnection::new(client, self.statement_cache.clone()))
81    }
82
83    /// Borrow the underlying `deadpool_postgres::Pool`.
84    ///
85    /// Reserved for intra-crate paths that need a raw `Object` (e.g.
86    /// [`crate::engine::PgEngine::transaction`], which pins a single
87    /// connection for the lifetime of an in-flight transaction). The
88    /// standard path is [`PgPool::get`], which returns a
89    /// cache-wrapped [`PgConnection`].
90    pub(crate) fn inner(&self) -> &Pool {
91        &self.inner
92    }
93
94    /// Get the current pool status.
95    pub fn status(&self) -> PoolStatus {
96        let status = self.inner.status();
97        PoolStatus {
98            available: status.available,
99            size: status.size,
100            max_size: status.max_size,
101            waiting: status.waiting,
102        }
103    }
104
105    /// Get the pool configuration.
106    pub fn config(&self) -> &PgConfig {
107        &self.config
108    }
109
110    /// Check if the pool is healthy by attempting to get a connection.
111    pub async fn is_healthy(&self) -> bool {
112        match self.inner.get().await {
113            Ok(client) => {
114                // Try a simple query to verify the connection is actually working
115                client.query_one("SELECT 1", &[]).await.is_ok()
116            }
117            Err(_) => false,
118        }
119    }
120
121    /// Close the pool and all connections.
122    pub fn close(&self) {
123        self.inner.close();
124        info!("PostgreSQL connection pool closed");
125    }
126
127    /// Create a builder for configuring the pool.
128    pub fn builder() -> PgPoolBuilder {
129        PgPoolBuilder::new()
130    }
131
132    /// Warm up the connection pool by pre-establishing connections.
133    ///
134    /// This eliminates the latency of establishing connections on the first queries.
135    /// The `count` parameter specifies how many connections to pre-establish.
136    ///
137    /// # Example
138    ///
139    /// ```rust,ignore
140    /// let pool = PgPool::builder()
141    ///     .url("postgresql://localhost/db")
142    ///     .max_connections(10)
143    ///     .build()
144    ///     .await?;
145    ///
146    /// // Pre-establish 5 connections
147    /// pool.warmup(5).await?;
148    /// ```
149    pub async fn warmup(&self, count: usize) -> PgResult<()> {
150        info!(count = count, "Warming up connection pool");
151
152        let count = count.min(self.inner.status().max_size);
153        let mut connections = Vec::with_capacity(count);
154
155        // Acquire connections to force establishment
156        for i in 0..count {
157            match self.inner.get().await {
158                Ok(conn) => {
159                    // Validate the connection with a simple query
160                    if let Err(e) = conn.query_one("SELECT 1", &[]).await {
161                        debug!(error = %e, "Warmup connection {} failed validation", i);
162                    } else {
163                        debug!("Warmup connection {} established", i);
164                        connections.push(conn);
165                    }
166                }
167                Err(e) => {
168                    debug!(error = %e, "Failed to establish warmup connection {}", i);
169                }
170            }
171        }
172
173        // Connections are returned to pool when dropped
174        let established = connections.len();
175        drop(connections);
176
177        info!(
178            established = established,
179            requested = count,
180            "Connection pool warmup complete"
181        );
182
183        Ok(())
184    }
185
186    /// Warm up with common prepared statements.
187    ///
188    /// This pre-prepares common SQL statements on warmed connections,
189    /// eliminating the prepare latency on first use.
190    pub async fn warmup_with_statements(&self, count: usize, statements: &[&str]) -> PgResult<()> {
191        info!(
192            count = count,
193            statements = statements.len(),
194            "Warming up connection pool with prepared statements"
195        );
196
197        let count = count.min(self.inner.status().max_size);
198        let mut connections = Vec::with_capacity(count);
199
200        for i in 0..count {
201            match self.inner.get().await {
202                Ok(conn) => {
203                    // Pre-prepare all statements
204                    for sql in statements {
205                        if let Err(e) = conn.prepare_cached(sql).await {
206                            debug!(error = %e, sql = %sql, "Failed to prepare statement");
207                        }
208                    }
209                    debug!(
210                        connection = i,
211                        statements = statements.len(),
212                        "Prepared statements on connection"
213                    );
214                    connections.push(conn);
215                }
216                Err(e) => {
217                    debug!(error = %e, "Failed to establish warmup connection {}", i);
218                }
219            }
220        }
221
222        let established = connections.len();
223        drop(connections);
224
225        info!(
226            established = established,
227            "Connection pool warmup with statements complete"
228        );
229
230        Ok(())
231    }
232}
233
234/// Pool status information.
235#[derive(Debug, Clone)]
236pub struct PoolStatus {
237    /// Number of available (idle) connections.
238    pub available: usize,
239    /// Current total size of the pool.
240    pub size: usize,
241    /// Maximum size of the pool.
242    pub max_size: usize,
243    /// Number of tasks waiting for a connection.
244    pub waiting: usize,
245}
246
247/// Configuration for the connection pool.
248#[derive(Debug, Clone)]
249pub struct PoolConfig {
250    /// Maximum number of connections in the pool.
251    pub max_connections: usize,
252    /// Minimum number of connections to keep alive.
253    pub min_connections: usize,
254    /// Maximum time to wait for a connection.
255    pub connection_timeout: Option<Duration>,
256    /// Maximum idle time before a connection is closed.
257    pub idle_timeout: Option<Duration>,
258    /// Maximum lifetime of a connection.
259    pub max_lifetime: Option<Duration>,
260    /// Size of the prepared statement cache per connection.
261    pub statement_cache_size: usize,
262}
263
264impl Default for PoolConfig {
265    fn default() -> Self {
266        Self {
267            max_connections: 10,
268            min_connections: 1,
269            connection_timeout: Some(Duration::from_secs(30)),
270            idle_timeout: Some(Duration::from_secs(600)), // 10 minutes
271            max_lifetime: Some(Duration::from_secs(1800)), // 30 minutes
272            statement_cache_size: 100,
273        }
274    }
275}
276
277/// Builder for creating a connection pool.
278#[derive(Debug, Default)]
279pub struct PgPoolBuilder {
280    config: Option<PgConfig>,
281    url: Option<String>,
282    pool_config: PoolConfig,
283}
284
285impl PgPoolBuilder {
286    /// Create a new pool builder.
287    pub fn new() -> Self {
288        Self {
289            config: None,
290            url: None,
291            pool_config: PoolConfig::default(),
292        }
293    }
294
295    /// Set the database URL.
296    pub fn url(mut self, url: impl Into<String>) -> Self {
297        self.url = Some(url.into());
298        self
299    }
300
301    /// Set the configuration.
302    pub fn config(mut self, config: PgConfig) -> Self {
303        self.config = Some(config);
304        self
305    }
306
307    /// Set the maximum number of connections.
308    pub fn max_connections(mut self, n: usize) -> Self {
309        self.pool_config.max_connections = n;
310        self
311    }
312
313    /// Set the minimum number of connections.
314    pub fn min_connections(mut self, n: usize) -> Self {
315        self.pool_config.min_connections = n;
316        self
317    }
318
319    /// Set the connection timeout.
320    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
321        self.pool_config.connection_timeout = Some(timeout);
322        self
323    }
324
325    /// Set the idle timeout.
326    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
327        self.pool_config.idle_timeout = Some(timeout);
328        self
329    }
330
331    /// Set the maximum connection lifetime.
332    pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
333        self.pool_config.max_lifetime = Some(lifetime);
334        self
335    }
336
337    /// Set the prepared statement cache size.
338    pub fn statement_cache_size(mut self, size: usize) -> Self {
339        self.pool_config.statement_cache_size = size;
340        self
341    }
342
343    /// Build the connection pool.
344    pub async fn build(self) -> PgResult<PgPool> {
345        let config = if let Some(config) = self.config {
346            config
347        } else if let Some(url) = self.url {
348            PgConfig::from_url(url)?
349        } else {
350            return Err(PgError::config("no database URL or config provided"));
351        };
352
353        PgPool::with_pool_config(config, self.pool_config).await
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360
361    #[test]
362    fn test_pool_config_default() {
363        let config = PoolConfig::default();
364        assert_eq!(config.max_connections, 10);
365        assert_eq!(config.min_connections, 1);
366        assert_eq!(config.statement_cache_size, 100);
367    }
368
369    #[test]
370    fn test_pool_builder() {
371        let builder = PgPoolBuilder::new()
372            .url("postgresql://localhost/test")
373            .max_connections(20)
374            .statement_cache_size(200);
375
376        assert!(builder.url.is_some());
377        assert_eq!(builder.pool_config.max_connections, 20);
378        assert_eq!(builder.pool_config.statement_cache_size, 200);
379    }
380}