1use std::time::Duration;
11use tokio_postgres::{Client, NoTls};
12use tracing::debug;
13
14use crate::config::PoolConfig;
15use crate::errors::{MCPError, Result as MCPResult};
16use crate::lockfree_pool::{BoxFuture, CreateFn, LockFreePool, PoolConfig as LFPoolConfig, PooledConnection, PoolError, ValidateFn};
17
18pub struct ConnectionPool {
20 inner: LockFreePool<Client>,
21 max_size: u32,
22}
23
24impl ConnectionPool {
25 pub async fn new(connection_string: &str, config: PoolConfig) -> anyhow::Result<Self> {
26 debug!("Creating lock-free connection pool: max_size={}", config.max_size);
27
28 let conn_string = connection_string.to_string();
29 let create_timeout = Duration::from_secs(5);
30
31 let create = {
32 let cs = conn_string.clone();
33 Box::new(move || {
34 let cs = cs.clone();
35 Box::pin(async move {
36 let (client, connection) = tokio_postgres::connect(&cs, NoTls)
37 .await
38 .map_err(|e| e.to_string())?;
39 tokio::spawn(connection);
40 Ok(client)
41 }) as BoxFuture<'static, Result<Client, String>>
42 }) as CreateFn<Client>
43 };
44
45 let validate = Box::new(|client: &Client| {
46 !client.is_closed()
47 }) as ValidateFn<Client>;
48
49 let lf_config = LFPoolConfig {
50 max_size: config.max_size,
51 create_timeout,
52 wait_timeout: config.queue_timeout,
53 };
54
55 let pool = LockFreePool::new(create, validate, lf_config);
56
57 let test_conn = pool.acquire().await.map_err(|e| {
59 anyhow::anyhow!("Failed to establish database connection: {e}")
60 })?;
61 drop(test_conn);
62
63 Ok(Self {
64 inner: pool,
65 max_size: config.max_size,
66 })
67 }
68
69 pub async fn acquire(&self) -> MCPResult<PooledConnection<Client>> {
74 self.inner.acquire().await.map_err(|e| match e {
75 PoolError::Timeout => {
76 MCPError::PoolError("Connection pool timeout: no connection available".into())
77 }
78 PoolError::Closed => {
79 MCPError::PoolError("Connection pool is closed".into())
80 }
81 PoolError::CreateFailed(msg) => {
82 MCPError::PoolError(format!("Failed to create connection: {msg}"))
83 }
84 })
85 }
86
87 pub fn release(&self, _conn: PooledConnection<Client>) {
92 }
94
95 pub fn active_count(&self) -> u32 {
96 self.inner.status().size
97 }
98
99 pub fn max_size(&self) -> u32 {
100 self.max_size
101 }
102
103 pub fn is_closed(&self) -> bool {
104 self.inner.is_closed()
105 }
106
107 pub fn close(&self) {
109 self.inner.close();
110 }
111}
112
113#[cfg(test)]
114mod tests {
115 use super::*;
116 use tokio::time::sleep;
117
118 #[test]
119 fn test_config() {
120 let cfg = PoolConfig {
121 min_size: 2,
122 max_size: 10,
123 queue_timeout: Duration::from_secs(10),
124 };
125 assert!(cfg.max_size >= cfg.min_size);
126 }
127
128 #[tokio::test]
129 async fn test_pool_create_and_acquire() {
130 if std::env::var("DATABASE_URL").is_err() && std::env::var("PGHOST").is_err() {
133 eprintln!("Skipping: no database available");
134 return;
135 }
136 let url = std::env::var("DATABASE_URL")
137 .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/postgres".to_string());
138 let config = PoolConfig {
139 min_size: 1,
140 max_size: 5,
141 queue_timeout: Duration::from_secs(5),
142 };
143 let pool = ConnectionPool::new(&url, config).await.unwrap();
144 assert_eq!(pool.max_size(), 5);
145 let conn = pool.acquire().await.unwrap();
146 assert!(!conn.is_closed());
147 pool.release(conn);
148 sleep(Duration::from_millis(50)).await;
149 assert!(pool.active_count() > 0);
150 }
151}