Skip to main content

mcp_postgres/
pool.rs

1//! PostgreSQL connection pool — lock-free implementation.
2//!
3//! Uses `LockFreePool<tokio_postgres::Client>` internally.  No mutexes,
4//! no semaphores, no kernel transitions on the hot path — only CAS loops
5//! on `crossbeam::queue::ArrayQueue` and atomic size tracking.
6//!
7//! The `acquire()` method returns a `PooledConnection` which auto-returns
8//! to the pool on `Drop`.  There is no explicit `release()` needed.
9
10use std::time::Duration;
11use tokio_postgres::{Client, NoTls};
12use tracing::debug;
13
14use crate::config::PoolConfig;
15use crate::errors::{MCPError, Result as MCPResult};
16use crate::lockfree_pool::{
17    BoxFuture, CreateFn, LockFreePool, PoolConfig as LFPoolConfig, PoolError, PooledConnection,
18    ValidateFn,
19};
20
21/// Wrapper around the lock-free connection pool.
22pub struct ConnectionPool {
23    inner: LockFreePool<Client>,
24    max_size: u32,
25}
26
27impl ConnectionPool {
28    pub async fn new(connection_string: &str, config: PoolConfig) -> anyhow::Result<Self> {
29        debug!(
30            "Creating lock-free connection pool: max_size={}",
31            config.max_size
32        );
33
34        let conn_string = connection_string.to_string();
35        let create_timeout = Duration::from_secs(5);
36
37        let create = {
38            let cs = conn_string.clone();
39            Box::new(move || {
40                let cs = cs.clone();
41                Box::pin(async move {
42                    let (client, connection) = tokio_postgres::connect(&cs, NoTls)
43                        .await
44                        .map_err(|e| e.to_string())?;
45                    tokio::spawn(connection);
46                    Ok(client)
47                }) as BoxFuture<'static, Result<Client, String>>
48            }) as CreateFn<Client>
49        };
50
51        let validate = Box::new(|client: &Client| !client.is_closed()) as ValidateFn<Client>;
52
53        let lf_config = LFPoolConfig {
54            max_size: config.max_size,
55            create_timeout,
56            wait_timeout: config.queue_timeout,
57        };
58
59        let pool = LockFreePool::new(create, validate, &lf_config);
60
61        // Test the pool by acquiring a connection
62        let test_conn = pool
63            .acquire()
64            .await
65            .map_err(|e| anyhow::anyhow!("Failed to establish database connection: {e}"))?;
66        drop(test_conn);
67
68        Ok(Self {
69            inner: pool,
70            max_size: config.max_size,
71        })
72    }
73
74    /// Acquire a connection from the pool.
75    ///
76    /// Returns a `PooledConnection<Client>` which implements `Deref<Target = Client>`
77    /// and automatically returns to the pool when dropped.
78    pub async fn acquire(&self) -> MCPResult<PooledConnection<Client>> {
79        self.inner.acquire().await.map_err(|e| match e {
80            PoolError::Timeout => {
81                MCPError::PoolError("Connection pool timeout: no connection available".into())
82            }
83            PoolError::Closed => MCPError::PoolError("Connection pool is closed".into()),
84            PoolError::CreateFailed(msg) => {
85                MCPError::PoolError(format!("Failed to create connection: {msg}"))
86            }
87        })
88    }
89
90    /// Release a connection back to the pool.
91    ///
92    /// With `PooledConnection`, this is automatic on `Drop`.  This method
93    /// exists for backward compatibility with existing callers.
94    pub fn release(&self, _conn: PooledConnection<Client>) {
95        // Connection auto-returns to pool on Drop
96    }
97
98    pub fn active_count(&self) -> u32 {
99        self.inner.status().size
100    }
101
102    pub const fn max_size(&self) -> u32 {
103        self.max_size
104    }
105
106    pub fn is_closed(&self) -> bool {
107        self.inner.is_closed()
108    }
109
110    /// Close the pool, dropping all idle connections.
111    pub fn close(&self) {
112        self.inner.close();
113    }
114}
115
116#[cfg(test)]
117mod tests {
118    use super::*;
119    use tokio::time::sleep;
120
121    #[test]
122    fn test_config() {
123        let cfg = PoolConfig {
124            min_size: 2,
125            max_size: 10,
126            queue_timeout: Duration::from_secs(10),
127        };
128        assert!(cfg.max_size >= cfg.min_size);
129    }
130
131    #[tokio::test]
132    async fn test_pool_create_and_acquire() {
133        // This test requires a real PostgreSQL instance.
134        // It's a no-op if DATABASE_URL is not set.
135        if std::env::var("DATABASE_URL").is_err() && std::env::var("PGHOST").is_err() {
136            eprintln!("Skipping: no database available");
137            return;
138        }
139        let url = std::env::var("DATABASE_URL")
140            .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/postgres".to_string());
141        let config = PoolConfig {
142            min_size: 1,
143            max_size: 5,
144            queue_timeout: Duration::from_secs(5),
145        };
146        let pool = ConnectionPool::new(&url, config).await.unwrap();
147        assert_eq!(pool.max_size(), 5);
148        let conn = pool.acquire().await.unwrap();
149        assert!(!conn.is_closed());
150        pool.release(conn);
151        sleep(Duration::from_millis(50)).await;
152        assert!(pool.active_count() > 0);
153    }
154}