1use std::sync::Arc;
4use std::time::Duration;
5
6use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
7use tokio_postgres::NoTls;
8use tracing::{debug, info};
9
10use crate::config::PgConfig;
11use crate::connection::PgConnection;
12use crate::error::{PgError, PgResult};
13use crate::statement::PreparedStatementCache;
14
15#[derive(Clone)]
17pub struct PgPool {
18 inner: Pool,
19 config: Arc<PgConfig>,
20 statement_cache: Arc<PreparedStatementCache>,
21}
22
23impl PgPool {
24 pub async fn new(config: PgConfig) -> PgResult<Self> {
26 Self::with_pool_config(config, PoolConfig::default()).await
27 }
28
29 pub async fn with_pool_config(config: PgConfig, pool_config: PoolConfig) -> PgResult<Self> {
31 let pg_config = config.to_pg_config();
32
33 let mgr_config = ManagerConfig {
34 recycling_method: RecyclingMethod::Fast,
35 };
36
37 let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
38
39 let pool = Pool::builder(mgr)
40 .max_size(pool_config.max_connections)
41 .wait_timeout(pool_config.connection_timeout)
42 .create_timeout(pool_config.connection_timeout)
43 .recycle_timeout(pool_config.idle_timeout)
44 .build()
45 .map_err(|e| PgError::config(format!("failed to create pool: {}", e)))?;
46
47 info!(
48 host = %config.host,
49 port = %config.port,
50 database = %config.database,
51 max_connections = %pool_config.max_connections,
52 "PostgreSQL connection pool created"
53 );
54
55 Ok(Self {
56 inner: pool,
57 config: Arc::new(config),
58 statement_cache: Arc::new(PreparedStatementCache::new(
59 pool_config.statement_cache_size,
60 )),
61 })
62 }
63
64 pub async fn get(&self) -> PgResult<PgConnection> {
66 debug!("Acquiring connection from pool");
67 let client = self.inner.get().await?;
68 Ok(PgConnection::new(client, self.statement_cache.clone()))
69 }
70
71 pub fn status(&self) -> PoolStatus {
73 let status = self.inner.status();
74 PoolStatus {
75 available: status.available,
76 size: status.size,
77 max_size: status.max_size,
78 waiting: status.waiting,
79 }
80 }
81
82 pub fn config(&self) -> &PgConfig {
84 &self.config
85 }
86
87 pub async fn is_healthy(&self) -> bool {
89 match self.inner.get().await {
90 Ok(client) => {
91 client.query_one("SELECT 1", &[]).await.is_ok()
93 }
94 Err(_) => false,
95 }
96 }
97
98 pub fn close(&self) {
100 self.inner.close();
101 info!("PostgreSQL connection pool closed");
102 }
103
104 pub fn builder() -> PgPoolBuilder {
106 PgPoolBuilder::new()
107 }
108
109 pub async fn warmup(&self, count: usize) -> PgResult<()> {
127 info!(count = count, "Warming up connection pool");
128
129 let count = count.min(self.inner.status().max_size);
130 let mut connections = Vec::with_capacity(count);
131
132 for i in 0..count {
134 match self.inner.get().await {
135 Ok(conn) => {
136 if let Err(e) = conn.query_one("SELECT 1", &[]).await {
138 debug!(error = %e, "Warmup connection {} failed validation", i);
139 } else {
140 debug!("Warmup connection {} established", i);
141 connections.push(conn);
142 }
143 }
144 Err(e) => {
145 debug!(error = %e, "Failed to establish warmup connection {}", i);
146 }
147 }
148 }
149
150 let established = connections.len();
152 drop(connections);
153
154 info!(
155 established = established,
156 requested = count,
157 "Connection pool warmup complete"
158 );
159
160 Ok(())
161 }
162
163 pub async fn warmup_with_statements(&self, count: usize, statements: &[&str]) -> PgResult<()> {
168 info!(
169 count = count,
170 statements = statements.len(),
171 "Warming up connection pool with prepared statements"
172 );
173
174 let count = count.min(self.inner.status().max_size);
175 let mut connections = Vec::with_capacity(count);
176
177 for i in 0..count {
178 match self.inner.get().await {
179 Ok(conn) => {
180 for sql in statements {
182 if let Err(e) = conn.prepare_cached(sql).await {
183 debug!(error = %e, sql = %sql, "Failed to prepare statement");
184 }
185 }
186 debug!(
187 connection = i,
188 statements = statements.len(),
189 "Prepared statements on connection"
190 );
191 connections.push(conn);
192 }
193 Err(e) => {
194 debug!(error = %e, "Failed to establish warmup connection {}", i);
195 }
196 }
197 }
198
199 let established = connections.len();
200 drop(connections);
201
202 info!(
203 established = established,
204 "Connection pool warmup with statements complete"
205 );
206
207 Ok(())
208 }
209}
210
211#[derive(Debug, Clone)]
213pub struct PoolStatus {
214 pub available: usize,
216 pub size: usize,
218 pub max_size: usize,
220 pub waiting: usize,
222}
223
224#[derive(Debug, Clone)]
226pub struct PoolConfig {
227 pub max_connections: usize,
229 pub min_connections: usize,
231 pub connection_timeout: Option<Duration>,
233 pub idle_timeout: Option<Duration>,
235 pub max_lifetime: Option<Duration>,
237 pub statement_cache_size: usize,
239}
240
241impl Default for PoolConfig {
242 fn default() -> Self {
243 Self {
244 max_connections: 10,
245 min_connections: 1,
246 connection_timeout: Some(Duration::from_secs(30)),
247 idle_timeout: Some(Duration::from_secs(600)), max_lifetime: Some(Duration::from_secs(1800)), statement_cache_size: 100,
250 }
251 }
252}
253
254#[derive(Debug, Default)]
256pub struct PgPoolBuilder {
257 config: Option<PgConfig>,
258 url: Option<String>,
259 pool_config: PoolConfig,
260}
261
262impl PgPoolBuilder {
263 pub fn new() -> Self {
265 Self {
266 config: None,
267 url: None,
268 pool_config: PoolConfig::default(),
269 }
270 }
271
272 pub fn url(mut self, url: impl Into<String>) -> Self {
274 self.url = Some(url.into());
275 self
276 }
277
278 pub fn config(mut self, config: PgConfig) -> Self {
280 self.config = Some(config);
281 self
282 }
283
284 pub fn max_connections(mut self, n: usize) -> Self {
286 self.pool_config.max_connections = n;
287 self
288 }
289
290 pub fn min_connections(mut self, n: usize) -> Self {
292 self.pool_config.min_connections = n;
293 self
294 }
295
296 pub fn connection_timeout(mut self, timeout: Duration) -> Self {
298 self.pool_config.connection_timeout = Some(timeout);
299 self
300 }
301
302 pub fn idle_timeout(mut self, timeout: Duration) -> Self {
304 self.pool_config.idle_timeout = Some(timeout);
305 self
306 }
307
308 pub fn max_lifetime(mut self, lifetime: Duration) -> Self {
310 self.pool_config.max_lifetime = Some(lifetime);
311 self
312 }
313
314 pub fn statement_cache_size(mut self, size: usize) -> Self {
316 self.pool_config.statement_cache_size = size;
317 self
318 }
319
320 pub async fn build(self) -> PgResult<PgPool> {
322 let config = if let Some(config) = self.config {
323 config
324 } else if let Some(url) = self.url {
325 PgConfig::from_url(url)?
326 } else {
327 return Err(PgError::config("no database URL or config provided"));
328 };
329
330 PgPool::with_pool_config(config, self.pool_config).await
331 }
332}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337
338 #[test]
339 fn test_pool_config_default() {
340 let config = PoolConfig::default();
341 assert_eq!(config.max_connections, 10);
342 assert_eq!(config.min_connections, 1);
343 assert_eq!(config.statement_cache_size, 100);
344 }
345
346 #[test]
347 fn test_pool_builder() {
348 let builder = PgPoolBuilder::new()
349 .url("postgresql://localhost/test")
350 .max_connections(20)
351 .statement_cache_size(200);
352
353 assert!(builder.url.is_some());
354 assert_eq!(builder.pool_config.max_connections, 20);
355 assert_eq!(builder.pool_config.statement_cache_size, 200);
356 }
357}