Skip to main content

mcp_postgres/
pool.rs

//! PostgreSQL connection pool — lock-free implementation.
//!
//! Uses `LockFreePool<tokio_postgres::Client>` internally.  No mutexes,
//! no semaphores, no kernel transitions on the hot path — only CAS loops
//! on `crossbeam::queue::ArrayQueue` and atomic size tracking.
//!
//! The `acquire()` method returns a `PooledConnection`, which is returned
//! to the pool automatically on `Drop`.  No explicit `release()` call is needed.
9
10use std::time::Duration;
11use tokio_postgres::{Client, NoTls};
12use tracing::debug;
13
14use crate::config::PoolConfig;
15use crate::errors::{MCPError, Result as MCPResult};
16use crate::lockfree_pool::{BoxFuture, CreateFn, LockFreePool, PoolConfig as LFPoolConfig, PooledConnection, PoolError, ValidateFn};
17
18/// Wrapper around the lock-free connection pool.
19pub struct ConnectionPool {
20    inner: LockFreePool<Client>,
21    max_size: u32,
22}
23
24impl ConnectionPool {
25    pub async fn new(connection_string: &str, config: PoolConfig) -> anyhow::Result<Self> {
26        debug!("Creating lock-free connection pool: max_size={}", config.max_size);
27
28        let conn_string = connection_string.to_string();
29        let create_timeout = Duration::from_secs(5);
30
31        let create = {
32            let cs = conn_string.clone();
33            Box::new(move || {
34                let cs = cs.clone();
35                Box::pin(async move {
36                    let (client, connection) = tokio_postgres::connect(&cs, NoTls)
37                        .await
38                        .map_err(|e| e.to_string())?;
39                    tokio::spawn(connection);
40                    Ok(client)
41                }) as BoxFuture<'static, Result<Client, String>>
42            }) as CreateFn<Client>
43        };
44
45        let validate = Box::new(|client: &Client| {
46            !client.is_closed()
47        }) as ValidateFn<Client>;
48
49        let lf_config = LFPoolConfig {
50            max_size: config.max_size,
51            create_timeout,
52            wait_timeout: config.queue_timeout,
53        };
54
55        let pool = LockFreePool::new(create, validate, lf_config);
56
57        // Test the pool by acquiring a connection
58        let test_conn = pool.acquire().await.map_err(|e| {
59            anyhow::anyhow!("Failed to establish database connection: {e}")
60        })?;
61        drop(test_conn);
62
63        Ok(Self {
64            inner: pool,
65            max_size: config.max_size,
66        })
67    }
68
69    /// Acquire a connection from the pool.
70    ///
71    /// Returns a `PooledConnection<Client>` which implements `Deref<Target = Client>`
72    /// and automatically returns to the pool when dropped.
73    pub async fn acquire(&self) -> MCPResult<PooledConnection<Client>> {
74        self.inner.acquire().await.map_err(|e| match e {
75            PoolError::Timeout => {
76                MCPError::PoolError("Connection pool timeout: no connection available".into())
77            }
78            PoolError::Closed => {
79                MCPError::PoolError("Connection pool is closed".into())
80            }
81            PoolError::CreateFailed(msg) => {
82                MCPError::PoolError(format!("Failed to create connection: {msg}"))
83            }
84        })
85    }
86
87    /// Release a connection back to the pool.
88    ///
89    /// With `PooledConnection`, this is automatic on `Drop`.  This method
90    /// exists for backward compatibility with existing callers.
91    pub fn release(&self, _conn: PooledConnection<Client>) {
92        // Connection auto-returns to pool on Drop
93    }
94
95    pub fn active_count(&self) -> u32 {
96        self.inner.status().size
97    }
98
99    pub fn max_size(&self) -> u32 {
100        self.max_size
101    }
102
103    pub fn is_closed(&self) -> bool {
104        self.inner.is_closed()
105    }
106
107    /// Close the pool, dropping all idle connections.
108    pub fn close(&self) {
109        self.inner.close();
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::sleep;

    /// A hand-built `PoolConfig` keeps `min_size` at or below `max_size`.
    #[test]
    fn test_config() {
        let pool_config = PoolConfig {
            min_size: 2,
            max_size: 10,
            queue_timeout: Duration::from_secs(10),
        };
        assert!(pool_config.min_size <= pool_config.max_size);
    }

    /// End-to-end smoke test against a real PostgreSQL instance.
    ///
    /// Skipped (returns early) when neither `DATABASE_URL` nor `PGHOST` is set.
    /// `PGHOST` only gates the test: when `DATABASE_URL` is missing, the fixed
    /// localhost URL below is used regardless of the value of `PGHOST`.
    #[tokio::test]
    async fn test_pool_create_and_acquire() {
        let url = match std::env::var("DATABASE_URL") {
            Ok(explicit) => explicit,
            Err(_) if std::env::var("PGHOST").is_ok() => {
                "postgres://postgres:postgres@localhost:5432/postgres".to_string()
            }
            Err(_) => {
                eprintln!("Skipping: no database available");
                return;
            }
        };

        let pool_config = PoolConfig {
            min_size: 1,
            max_size: 5,
            queue_timeout: Duration::from_secs(5),
        };
        let pool = ConnectionPool::new(&url, pool_config).await.unwrap();
        assert_eq!(pool.max_size(), 5);

        let client = pool.acquire().await.unwrap();
        assert!(!client.is_closed());
        pool.release(client);

        // Give the pool a moment before inspecting its size after the release.
        sleep(Duration::from_millis(50)).await;
        assert!(pool.active_count() > 0);
    }
}