use std::time::Duration;
use tokio_postgres::{Client, NoTls};
use tracing::debug;
use crate::config::PoolConfig;
use crate::errors::{MCPError, Result as MCPResult};
use crate::lockfree_pool::{
BoxFuture, CreateFn, LockFreePool, PoolConfig as LFPoolConfig, PoolError, PooledConnection,
ValidateFn,
};
/// PostgreSQL connection pool backed by the crate's `LockFreePool`.
///
/// Holds the underlying pool of `tokio_postgres::Client` handles together
/// with the configured maximum size so the limit can be reported without
/// querying the inner pool.
pub struct ConnectionPool {
/// Underlying lock-free pool that owns the `Client` instances.
inner: LockFreePool<Client>,
/// Maximum pool size, copied from `PoolConfig::max_size` at construction.
max_size: u32,
}
impl ConnectionPool {
pub async fn new(connection_string: &str, config: PoolConfig) -> anyhow::Result<Self> {
debug!(
"Creating lock-free connection pool: max_size={}",
config.max_size
);
let conn_string = connection_string.to_string();
let create_timeout = Duration::from_secs(5);
let create = {
let cs = conn_string.clone();
Box::new(move || {
let cs = cs.clone();
Box::pin(async move {
let (client, connection) = tokio_postgres::connect(&cs, NoTls)
.await
.map_err(|e| e.to_string())?;
tokio::spawn(connection);
Ok(client)
}) as BoxFuture<'static, Result<Client, String>>
}) as CreateFn<Client>
};
let validate = Box::new(|client: &Client| !client.is_closed()) as ValidateFn<Client>;
let lf_config = LFPoolConfig {
max_size: config.max_size,
create_timeout,
wait_timeout: config.queue_timeout,
};
let pool = LockFreePool::new(create, validate, &lf_config);
let test_conn = pool
.acquire()
.await
.map_err(|e| anyhow::anyhow!("Failed to establish database connection: {e}"))?;
drop(test_conn);
Ok(Self {
inner: pool,
max_size: config.max_size,
})
}
pub async fn acquire(&self) -> MCPResult<PooledConnection<Client>> {
self.inner.acquire().await.map_err(|e| match e {
PoolError::Timeout => {
MCPError::PoolError("Connection pool timeout: no connection available".into())
}
PoolError::Closed => MCPError::PoolError("Connection pool is closed".into()),
PoolError::CreateFailed(msg) => {
MCPError::PoolError(format!("Failed to create connection: {msg}"))
}
})
}
pub fn release(&self, _conn: PooledConnection<Client>) {
}
pub fn active_count(&self) -> u32 {
self.inner.status().size
}
pub const fn max_size(&self) -> u32 {
self.max_size
}
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
}
pub fn close(&self) {
self.inner.close();
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::sleep;

    /// A hand-built configuration keeps `min_size` at or below `max_size`.
    #[test]
    fn test_config() {
        let settings = PoolConfig {
            min_size: 2,
            max_size: 10,
            queue_timeout: Duration::from_secs(10),
        };
        assert!(settings.min_size <= settings.max_size);
    }

    /// Round-trips a connection through a real pool.
    ///
    /// Skipped unless `DATABASE_URL` or `PGHOST` is set in the environment.
    #[tokio::test]
    async fn test_pool_create_and_acquire() {
        let from_env = std::env::var("DATABASE_URL");
        let database_configured = from_env.is_ok() || std::env::var("PGHOST").is_ok();
        if !database_configured {
            eprintln!("Skipping: no database available");
            return;
        }

        // Fall back to a local default when only `PGHOST` is present.
        let dsn = match from_env {
            Ok(value) => value,
            Err(_) => "postgres://postgres:postgres@localhost:5432/postgres".to_string(),
        };

        let settings = PoolConfig {
            min_size: 1,
            max_size: 5,
            queue_timeout: Duration::from_secs(5),
        };
        let pool = ConnectionPool::new(&dsn, settings).await.unwrap();
        assert_eq!(pool.max_size(), 5);

        let handle = pool.acquire().await.unwrap();
        assert!(!handle.is_closed());
        pool.release(handle);

        // Give the pool a moment before inspecting its size.
        sleep(Duration::from_millis(50)).await;
        let live = pool.active_count();
        assert!(live > 0);
    }
}