Skip to main content

mcp_postgres/
pool.rs

1use anyhow::Result;
2use deadpool_postgres::{Pool, Config as DeadpoolConfig, PoolConfig as DeadpoolPoolConfig, Runtime, Object};
3use tokio_postgres::NoTls;
4use tracing::debug;
5use std::sync::Arc;
6
7use crate::config::PoolConfig;
8use crate::errors::{MCPError, Result as MCPResult};
9
10/// Connection pool wrapper using deadpool-postgres
11pub struct ConnectionPool {
12    pool: Pool,
13    max_size: u32,
14}
15
16impl ConnectionPool {
17    pub async fn new(connection_string: &str, config: PoolConfig) -> Result<Self> {
18        debug!("Creating connection pool with config: {:?}", config);
19
20        let cfg = DeadpoolConfig {
21            url: Some(connection_string.to_string()),
22            pool: Some(DeadpoolPoolConfig {
23                max_size: config.max_size as usize,
24                timeouts: deadpool_postgres::Timeouts {
25                    wait: Some(config.queue_timeout),
26                    create: Some(std::time::Duration::from_secs(5)),
27                    recycle: Some(std::time::Duration::from_secs(300)), // Recycle connections after 5 minutes
28                },
29                queue_mode: deadpool::managed::QueueMode::Lifo,
30            }),
31            ..Default::default()
32        };
33
34        let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls)?;
35
36        // Test the pool by acquiring a connection
37        let _conn = pool.get().await
38            .map_err(|e| anyhow::anyhow!("Failed to establish database connection: {}", e))?;
39
40        Ok(Self {
41            pool,
42            max_size: config.max_size,
43        })
44    }
45
46    /// Acquire a connection from the pool
47    /// Returns Arc<Object> which dereferences to Client
48    pub async fn acquire(&self) -> MCPResult<Arc<Object>> {
49        self.pool
50            .get()
51            .await
52            .map(Arc::new)
53            .map_err(|_| MCPError::PoolError("Connection pool exhausted".into()))
54    }
55
56    /// Release a connection back to the pool (handled automatically by deadpool)
57    pub fn release(&self, _conn: Arc<Object>) {
58        // deadpool automatically returns connections to the pool
59    }
60
61    pub fn active_count(&self) -> u32 {
62        self.pool.status().size as u32
63    }
64
65    pub fn max_size(&self) -> u32 {
66        self.max_size
67    }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A hand-built `PoolConfig` keeps its minimum size at or below its maximum.
    #[test]
    fn test_config() {
        let queue_timeout = Duration::from_secs(10);
        let pool_config = PoolConfig {
            min_size: 2,
            max_size: 10,
            queue_timeout,
        };

        let (lower, upper) = (pool_config.min_size, pool_config.max_size);
        assert!(upper >= lower);
    }
}
84}