1use std::sync::Arc;
4use std::time::Duration;
5
6use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod, Runtime};
7use tokio_postgres::NoTls;
8use tracing::{debug, info};
9
10use crate::config::PgConfig;
11use crate::connection::PgConnection;
12use crate::error::{PgError, PgResult};
13use crate::statement::PreparedStatementCache;
14
15#[derive(Clone)]
17pub struct PgPool {
18 inner: Pool,
19 config: Arc<PgConfig>,
20 statement_cache: Arc<PreparedStatementCache>,
21}
22
23impl PgPool {
24 pub async fn new(config: PgConfig) -> PgResult<Self> {
26 Self::with_pool_config(config, PoolConfig::default()).await
27 }
28
29 pub async fn with_pool_config(config: PgConfig, pool_config: PoolConfig) -> PgResult<Self> {
31 let pg_config = config.to_pg_config();
32
33 let mgr_config = ManagerConfig {
34 recycling_method: RecyclingMethod::Fast,
35 };
36
37 let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
38
39 let mut builder = Pool::builder(mgr).max_size(pool_config.max_connections);
41
42 if let Some(timeout) = pool_config.connection_timeout {
44 builder = builder
45 .wait_timeout(Some(timeout))
46 .create_timeout(Some(timeout));
47 }
48 if let Some(timeout) = pool_config.idle_timeout {
49 builder = builder.recycle_timeout(Some(timeout));
50 }
51
52 builder = builder.runtime(Runtime::Tokio1);
54
55 let pool = builder
56 .build()
57 .map_err(|e| PgError::config(format!("failed to create pool: {}", e)))?;
58
59 info!(
60 host = %config.host,
61 port = %config.port,
62 database = %config.database,
63 max_connections = %pool_config.max_connections,
64 "PostgreSQL connection pool created"
65 );
66
67 Ok(Self {
68 inner: pool,
69 config: Arc::new(config),
70 statement_cache: Arc::new(PreparedStatementCache::new(
71 pool_config.statement_cache_size,
72 )),
73 })
74 }
75
76 pub async fn get(&self) -> PgResult<PgConnection> {
78 debug!("Acquiring connection from pool");
79 let client = self.inner.get().await?;
80 Ok(PgConnection::new(client, self.statement_cache.clone()))
81 }
82
83 pub(crate) fn inner(&self) -> &Pool {
91 &self.inner
92 }
93
94 pub fn status(&self) -> PoolStatus {
96 let status = self.inner.status();
97 PoolStatus {
98 available: status.available,
99 size: status.size,
100 max_size: status.max_size,
101 waiting: status.waiting,
102 }
103 }
104
105 pub fn config(&self) -> &PgConfig {
107 &self.config
108 }
109
110 pub async fn is_healthy(&self) -> bool {
112 match self.inner.get().await {
113 Ok(client) => {
114 client.query_one("SELECT 1", &[]).await.is_ok()
116 }
117 Err(_) => false,
118 }
119 }
120
121 pub fn close(&self) {
123 self.inner.close();
124 info!("PostgreSQL connection pool closed");
125 }
126
127 pub fn builder() -> PgPoolBuilder {
129 PgPoolBuilder::new()
130 }
131
132 pub async fn warmup(&self, count: usize) -> PgResult<()> {
150 info!(count = count, "Warming up connection pool");
151
152 let count = count.min(self.inner.status().max_size);
153 let mut connections = Vec::with_capacity(count);
154
155 for i in 0..count {
157 match self.inner.get().await {
158 Ok(conn) => {
159 if let Err(e) = conn.query_one("SELECT 1", &[]).await {
161 debug!(error = %e, "Warmup connection {} failed validation", i);
162 } else {
163 debug!("Warmup connection {} established", i);
164 connections.push(conn);
165 }
166 }
167 Err(e) => {
168 debug!(error = %e, "Failed to establish warmup connection {}", i);
169 }
170 }
171 }
172
173 let established = connections.len();
175 drop(connections);
176
177 info!(
178 established = established,
179 requested = count,
180 "Connection pool warmup complete"
181 );
182
183 Ok(())
184 }
185
186 pub async fn warmup_with_statements(&self, count: usize, statements: &[&str]) -> PgResult<()> {
191 info!(
192 count = count,
193 statements = statements.len(),
194 "Warming up connection pool with prepared statements"
195 );
196
197 let count = count.min(self.inner.status().max_size);
198 let mut connections = Vec::with_capacity(count);
199
200 for i in 0..count {
201 match self.inner.get().await {
202 Ok(conn) => {
203 for sql in statements {
205 if let Err(e) = conn.prepare_cached(sql).await {
206 debug!(error = %e, sql = %sql, "Failed to prepare statement");
207 }
208 }
209 debug!(
210 connection = i,
211 statements = statements.len(),
212 "Prepared statements on connection"
213 );
214 connections.push(conn);
215 }
216 Err(e) => {
217 debug!(error = %e, "Failed to establish warmup connection {}", i);
218 }
219 }
220 }
221
222 let established = connections.len();
223 drop(connections);
224
225 info!(
226 established = established,
227 "Connection pool warmup with statements complete"
228 );
229
230 Ok(())
231 }
232}
233
/// Point-in-time counters describing a connection pool.
///
/// All fields are plain `usize` values, so the type is `Copy` and can be
/// compared directly with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PoolStatus {
    /// Number of objects currently available in the pool.
    pub available: usize,
    /// Current size of the pool.
    pub size: usize,
    /// Maximum size the pool may reach.
    pub max_size: usize,
    /// Number of callers currently waiting for a connection.
    pub waiting: usize,
}
246
/// Tunable settings for a connection pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PoolConfig {
    /// Upper bound on pooled connections; used as the pool's `max_size`.
    pub max_connections: usize,
    /// Desired minimum number of connections.
    ///
    /// NOTE(review): `PgPool::with_pool_config` does not read this field.
    pub min_connections: usize,
    /// When set, applied as both the pool's wait timeout and create timeout.
    pub connection_timeout: Option<Duration>,
    /// When set, passed to the pool builder as its recycle timeout.
    pub idle_timeout: Option<Duration>,
    /// Intended maximum lifetime of a connection.
    ///
    /// NOTE(review): `PgPool::with_pool_config` does not read this field.
    pub max_lifetime: Option<Duration>,
    /// Size passed to `PreparedStatementCache::new`.
    pub statement_cache_size: usize,
}

impl Default for PoolConfig {
    /// Defaults: 10 max / 1 min connections, 30 s connection timeout,
    /// 600 s idle timeout, 1800 s max lifetime, statement cache size 100.
    fn default() -> Self {
        Self {
            max_connections: 10,
            min_connections: 1,
            connection_timeout: Some(Duration::from_secs(30)),
            idle_timeout: Some(Duration::from_secs(600)),
            max_lifetime: Some(Duration::from_secs(1800)),
            statement_cache_size: 100,
        }
    }
}
276
277#[derive(Debug, Default)]
279pub struct PgPoolBuilder {
280 config: Option<PgConfig>,
281 url: Option<String>,
282 pool_config: PoolConfig,
283}
284
285impl PgPoolBuilder {
286 pub fn new() -> Self {
288 Self {
289 config: None,
290 url: None,
291 pool_config: PoolConfig::default(),
292 }
293 }
294
295 pub fn url(mut self, url: impl Into<String>) -> Self {
297 self.url = Some(url.into());
298 self
299 }
300
301 pub fn config(mut self, config: PgConfig) -> Self {
303 self.config = Some(config);
304 self
305 }
306
307 pub fn max_connections(mut self, n: usize) -> Self {
309 self.pool_config.max_connections = n;
310 self
311 }
312
313 pub fn min_connections(mut self, n: usize) -> Self {
315 self.pool_config.min_connections = n;
316 self
317 }
318
319 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
321 self.pool_config.connection_timeout = Some(timeout);
322 self
323 }
324
325 pub fn idle_timeout(mut self, timeout: Duration) -> Self {
327 self.pool_config.idle_timeout = Some(timeout);
328 self
329 }
330
331 pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
333 self.pool_config.max_lifetime = Some(lifetime);
334 self
335 }
336
337 pub fn statement_cache_size(mut self, size: usize) -> Self {
339 self.pool_config.statement_cache_size = size;
340 self
341 }
342
343 pub async fn build(self) -> PgResult<PgPool> {
345 let config = if let Some(config) = self.config {
346 config
347 } else if let Some(url) = self.url {
348 PgConfig::from_url(url)?
349 } else {
350 return Err(PgError::config("no database URL or config provided"));
351 };
352
353 PgPool::with_pool_config(config, self.pool_config).await
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    /// The default pool settings expose the expected sizes.
    #[test]
    fn test_pool_config_default() {
        let PoolConfig {
            max_connections,
            min_connections,
            statement_cache_size,
            ..
        } = PoolConfig::default();

        assert_eq!(max_connections, 10);
        assert_eq!(min_connections, 1);
        assert_eq!(statement_cache_size, 100);
    }

    /// Builder setters record the URL and override pool settings.
    #[test]
    fn test_pool_builder() {
        let configured = PgPoolBuilder::new()
            .url("postgresql://localhost/test")
            .max_connections(20)
            .statement_cache_size(200);

        let PgPoolBuilder {
            url, pool_config, ..
        } = configured;

        assert!(url.is_some());
        assert_eq!(pool_config.max_connections, 20);
        assert_eq!(pool_config.statement_cache_size, 200);
    }
}