prax_postgres/
pool.rs

1//! Connection pool for PostgreSQL.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
7use tokio_postgres::NoTls;
8use tracing::{debug, info};
9
10use crate::config::PgConfig;
11use crate::connection::PgConnection;
12use crate::error::{PgError, PgResult};
13use crate::statement::PreparedStatementCache;
14
15/// A connection pool for PostgreSQL.
16#[derive(Clone)]
17pub struct PgPool {
18    inner: Pool,
19    config: Arc<PgConfig>,
20    statement_cache: Arc<PreparedStatementCache>,
21}
22
23impl PgPool {
24    /// Create a new connection pool from configuration.
25    pub async fn new(config: PgConfig) -> PgResult<Self> {
26        Self::with_pool_config(config, PoolConfig::default()).await
27    }
28
29    /// Create a new connection pool with custom pool configuration.
30    pub async fn with_pool_config(config: PgConfig, pool_config: PoolConfig) -> PgResult<Self> {
31        let pg_config = config.to_pg_config();
32
33        let mgr_config = ManagerConfig {
34            recycling_method: RecyclingMethod::Fast,
35        };
36
37        let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
38
39        let pool = Pool::builder(mgr)
40            .max_size(pool_config.max_connections)
41            .wait_timeout(pool_config.connection_timeout)
42            .create_timeout(pool_config.connection_timeout)
43            .recycle_timeout(pool_config.idle_timeout)
44            .build()
45            .map_err(|e| PgError::config(format!("failed to create pool: {}", e)))?;
46
47        info!(
48            host = %config.host,
49            port = %config.port,
50            database = %config.database,
51            max_connections = %pool_config.max_connections,
52            "PostgreSQL connection pool created"
53        );
54
55        Ok(Self {
56            inner: pool,
57            config: Arc::new(config),
58            statement_cache: Arc::new(PreparedStatementCache::new(
59                pool_config.statement_cache_size,
60            )),
61        })
62    }
63
64    /// Get a connection from the pool.
65    pub async fn get(&self) -> PgResult<PgConnection> {
66        debug!("Acquiring connection from pool");
67        let client = self.inner.get().await?;
68        Ok(PgConnection::new(client, self.statement_cache.clone()))
69    }
70
71    /// Get the current pool status.
72    pub fn status(&self) -> PoolStatus {
73        let status = self.inner.status();
74        PoolStatus {
75            available: status.available,
76            size: status.size,
77            max_size: status.max_size,
78            waiting: status.waiting,
79        }
80    }
81
82    /// Get the pool configuration.
83    pub fn config(&self) -> &PgConfig {
84        &self.config
85    }
86
87    /// Check if the pool is healthy by attempting to get a connection.
88    pub async fn is_healthy(&self) -> bool {
89        match self.inner.get().await {
90            Ok(client) => {
91                // Try a simple query to verify the connection is actually working
92                client.query_one("SELECT 1", &[]).await.is_ok()
93            }
94            Err(_) => false,
95        }
96    }
97
98    /// Close the pool and all connections.
99    pub fn close(&self) {
100        self.inner.close();
101        info!("PostgreSQL connection pool closed");
102    }
103
104    /// Create a builder for configuring the pool.
105    pub fn builder() -> PgPoolBuilder {
106        PgPoolBuilder::new()
107    }
108
109    /// Warm up the connection pool by pre-establishing connections.
110    ///
111    /// This eliminates the latency of establishing connections on the first queries.
112    /// The `count` parameter specifies how many connections to pre-establish.
113    ///
114    /// # Example
115    ///
116    /// ```rust,ignore
117    /// let pool = PgPool::builder()
118    ///     .url("postgresql://localhost/db")
119    ///     .max_connections(10)
120    ///     .build()
121    ///     .await?;
122    ///
123    /// // Pre-establish 5 connections
124    /// pool.warmup(5).await?;
125    /// ```
126    pub async fn warmup(&self, count: usize) -> PgResult<()> {
127        info!(count = count, "Warming up connection pool");
128
129        let count = count.min(self.inner.status().max_size);
130        let mut connections = Vec::with_capacity(count);
131
132        // Acquire connections to force establishment
133        for i in 0..count {
134            match self.inner.get().await {
135                Ok(conn) => {
136                    // Validate the connection with a simple query
137                    if let Err(e) = conn.query_one("SELECT 1", &[]).await {
138                        debug!(error = %e, "Warmup connection {} failed validation", i);
139                    } else {
140                        debug!("Warmup connection {} established", i);
141                        connections.push(conn);
142                    }
143                }
144                Err(e) => {
145                    debug!(error = %e, "Failed to establish warmup connection {}", i);
146                }
147            }
148        }
149
150        // Connections are returned to pool when dropped
151        let established = connections.len();
152        drop(connections);
153
154        info!(
155            established = established,
156            requested = count,
157            "Connection pool warmup complete"
158        );
159
160        Ok(())
161    }
162
163    /// Warm up with common prepared statements.
164    ///
165    /// This pre-prepares common SQL statements on warmed connections,
166    /// eliminating the prepare latency on first use.
167    pub async fn warmup_with_statements(&self, count: usize, statements: &[&str]) -> PgResult<()> {
168        info!(
169            count = count,
170            statements = statements.len(),
171            "Warming up connection pool with prepared statements"
172        );
173
174        let count = count.min(self.inner.status().max_size);
175        let mut connections = Vec::with_capacity(count);
176
177        for i in 0..count {
178            match self.inner.get().await {
179                Ok(conn) => {
180                    // Pre-prepare all statements
181                    for sql in statements {
182                        if let Err(e) = conn.prepare_cached(sql).await {
183                            debug!(error = %e, sql = %sql, "Failed to prepare statement");
184                        }
185                    }
186                    debug!(
187                        connection = i,
188                        statements = statements.len(),
189                        "Prepared statements on connection"
190                    );
191                    connections.push(conn);
192                }
193                Err(e) => {
194                    debug!(error = %e, "Failed to establish warmup connection {}", i);
195                }
196            }
197        }
198
199        let established = connections.len();
200        drop(connections);
201
202        info!(
203            established = established,
204            "Connection pool warmup with statements complete"
205        );
206
207        Ok(())
208    }
209}
210
211/// Pool status information.
212#[derive(Debug, Clone)]
213pub struct PoolStatus {
214    /// Number of available (idle) connections.
215    pub available: usize,
216    /// Current total size of the pool.
217    pub size: usize,
218    /// Maximum size of the pool.
219    pub max_size: usize,
220    /// Number of tasks waiting for a connection.
221    pub waiting: usize,
222}
223
224/// Configuration for the connection pool.
225#[derive(Debug, Clone)]
226pub struct PoolConfig {
227    /// Maximum number of connections in the pool.
228    pub max_connections: usize,
229    /// Minimum number of connections to keep alive.
230    pub min_connections: usize,
231    /// Maximum time to wait for a connection.
232    pub connection_timeout: Option<Duration>,
233    /// Maximum idle time before a connection is closed.
234    pub idle_timeout: Option<Duration>,
235    /// Maximum lifetime of a connection.
236    pub max_lifetime: Option<Duration>,
237    /// Size of the prepared statement cache per connection.
238    pub statement_cache_size: usize,
239}
240
241impl Default for PoolConfig {
242    fn default() -> Self {
243        Self {
244            max_connections: 10,
245            min_connections: 1,
246            connection_timeout: Some(Duration::from_secs(30)),
247            idle_timeout: Some(Duration::from_secs(600)), // 10 minutes
248            max_lifetime: Some(Duration::from_secs(1800)), // 30 minutes
249            statement_cache_size: 100,
250        }
251    }
252}
253
254/// Builder for creating a connection pool.
255#[derive(Debug, Default)]
256pub struct PgPoolBuilder {
257    config: Option<PgConfig>,
258    url: Option<String>,
259    pool_config: PoolConfig,
260}
261
262impl PgPoolBuilder {
263    /// Create a new pool builder.
264    pub fn new() -> Self {
265        Self {
266            config: None,
267            url: None,
268            pool_config: PoolConfig::default(),
269        }
270    }
271
272    /// Set the database URL.
273    pub fn url(mut self, url: impl Into<String>) -> Self {
274        self.url = Some(url.into());
275        self
276    }
277
278    /// Set the configuration.
279    pub fn config(mut self, config: PgConfig) -> Self {
280        self.config = Some(config);
281        self
282    }
283
284    /// Set the maximum number of connections.
285    pub fn max_connections(mut self, n: usize) -> Self {
286        self.pool_config.max_connections = n;
287        self
288    }
289
290    /// Set the minimum number of connections.
291    pub fn min_connections(mut self, n: usize) -> Self {
292        self.pool_config.min_connections = n;
293        self
294    }
295
296    /// Set the connection timeout.
297    pub fn connection_timeout(mut self, timeout: Duration) -> Self {
298        self.pool_config.connection_timeout = Some(timeout);
299        self
300    }
301
302    /// Set the idle timeout.
303    pub fn idle_timeout(mut self, timeout: Duration) -> Self {
304        self.pool_config.idle_timeout = Some(timeout);
305        self
306    }
307
308    /// Set the maximum connection lifetime.
309    pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
310        self.pool_config.max_lifetime = Some(lifetime);
311        self
312    }
313
314    /// Set the prepared statement cache size.
315    pub fn statement_cache_size(mut self, size: usize) -> Self {
316        self.pool_config.statement_cache_size = size;
317        self
318    }
319
320    /// Build the connection pool.
321    pub async fn build(self) -> PgResult<PgPool> {
322        let config = if let Some(config) = self.config {
323            config
324        } else if let Some(url) = self.url {
325            PgConfig::from_url(url)?
326        } else {
327            return Err(PgError::config("no database URL or config provided"));
328        };
329
330        PgPool::with_pool_config(config, self.pool_config).await
331    }
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337
338    #[test]
339    fn test_pool_config_default() {
340        let config = PoolConfig::default();
341        assert_eq!(config.max_connections, 10);
342        assert_eq!(config.min_connections, 1);
343        assert_eq!(config.statement_cache_size, 100);
344    }
345
346    #[test]
347    fn test_pool_builder() {
348        let builder = PgPoolBuilder::new()
349            .url("postgresql://localhost/test")
350            .max_connections(20)
351            .statement_cache_size(200);
352
353        assert!(builder.url.is_some());
354        assert_eq!(builder.pool_config.max_connections, 20);
355        assert_eq!(builder.pool_config.statement_cache_size, 200);
356    }
357}