1use anyhow::Result;
2use deadpool_postgres::{Pool, Config as DeadpoolConfig, PoolConfig as DeadpoolPoolConfig, Runtime, Object};
3use tokio_postgres::NoTls;
4use tracing::debug;
5use std::sync::Arc;
6
7use crate::config::PoolConfig;
8use crate::errors::{MCPError, Result as MCPResult};
9
10pub struct ConnectionPool {
12 pool: Pool,
13 max_size: u32,
14}
15
16impl ConnectionPool {
17 pub async fn new(connection_string: &str, config: PoolConfig) -> Result<Self> {
18 debug!("Creating connection pool with config: {:?}", config);
19
20 let cfg = DeadpoolConfig {
21 url: Some(connection_string.to_string()),
22 pool: Some(DeadpoolPoolConfig {
23 max_size: config.max_size as usize,
24 timeouts: deadpool_postgres::Timeouts {
25 wait: Some(config.queue_timeout),
26 create: Some(std::time::Duration::from_secs(5)),
27 recycle: Some(std::time::Duration::from_secs(300)), },
29 queue_mode: deadpool::managed::QueueMode::Lifo,
30 }),
31 ..Default::default()
32 };
33
34 let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls)?;
35
36 let _conn = pool.get().await
38 .map_err(|e| anyhow::anyhow!("Failed to establish database connection: {}", e))?;
39
40 Ok(Self {
41 pool,
42 max_size: config.max_size,
43 })
44 }
45
46 pub async fn acquire(&self) -> MCPResult<Arc<Object>> {
49 self.pool
50 .get()
51 .await
52 .map(Arc::new)
53 .map_err(|_| MCPError::PoolError("Connection pool exhausted".into()))
54 }
55
56 pub fn release(&self, _conn: Arc<Object>) {
58 }
60
61 pub fn active_count(&self) -> u32 {
62 self.pool.status().size as u32
63 }
64
65 pub fn max_size(&self) -> u32 {
66 self.max_size
67 }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Sanity check: a hand-built `PoolConfig` keeps `min_size` at or below `max_size`.
    #[test]
    fn test_config() {
        let queue_timeout = Duration::from_secs(10);
        let pool_config = PoolConfig {
            min_size: 2,
            max_size: 10,
            queue_timeout,
        };

        assert!(pool_config.min_size <= pool_config.max_size);
    }
}