1use std::time::Duration;
11use tokio_postgres::{Client, NoTls};
12use tracing::debug;
13
14use crate::config::PoolConfig;
15use crate::errors::{MCPError, Result as MCPResult};
16use crate::lockfree_pool::{
17 BoxFuture, CreateFn, LockFreePool, PoolConfig as LFPoolConfig, PoolError, PooledConnection,
18 ValidateFn,
19};
20
21pub struct ConnectionPool {
23 inner: LockFreePool<Client>,
24 max_size: u32,
25}
26
27impl ConnectionPool {
28 pub async fn new(connection_string: &str, config: PoolConfig) -> anyhow::Result<Self> {
29 debug!(
30 "Creating lock-free connection pool: max_size={}",
31 config.max_size
32 );
33
34 let conn_string = connection_string.to_string();
35 let create_timeout = Duration::from_secs(5);
36
37 let create = {
38 let cs = conn_string.clone();
39 Box::new(move || {
40 let cs = cs.clone();
41 Box::pin(async move {
42 let (client, connection) = tokio_postgres::connect(&cs, NoTls)
43 .await
44 .map_err(|e| e.to_string())?;
45 tokio::spawn(connection);
46 Ok(client)
47 }) as BoxFuture<'static, Result<Client, String>>
48 }) as CreateFn<Client>
49 };
50
51 let validate = Box::new(|client: &Client| !client.is_closed()) as ValidateFn<Client>;
52
53 let lf_config = LFPoolConfig {
54 max_size: config.max_size,
55 create_timeout,
56 wait_timeout: config.queue_timeout,
57 };
58
59 let pool = LockFreePool::new(create, validate, &lf_config);
60
61 let test_conn = pool
63 .acquire()
64 .await
65 .map_err(|e| anyhow::anyhow!("Failed to establish database connection: {e}"))?;
66 drop(test_conn);
67
68 Ok(Self {
69 inner: pool,
70 max_size: config.max_size,
71 })
72 }
73
74 pub async fn acquire(&self) -> MCPResult<PooledConnection<Client>> {
79 self.inner.acquire().await.map_err(|e| match e {
80 PoolError::Timeout => {
81 MCPError::PoolError("Connection pool timeout: no connection available".into())
82 }
83 PoolError::Closed => MCPError::PoolError("Connection pool is closed".into()),
84 PoolError::CreateFailed(msg) => {
85 MCPError::PoolError(format!("Failed to create connection: {msg}"))
86 }
87 })
88 }
89
90 pub fn release(&self, _conn: PooledConnection<Client>) {
95 }
97
98 pub fn active_count(&self) -> u32 {
99 self.inner.status().size
100 }
101
102 pub const fn max_size(&self) -> u32 {
103 self.max_size
104 }
105
106 pub fn is_closed(&self) -> bool {
107 self.inner.is_closed()
108 }
109
110 pub fn close(&self) {
112 self.inner.close();
113 }
114}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119 use tokio::time::sleep;
120
121 #[test]
122 fn test_config() {
123 let cfg = PoolConfig {
124 min_size: 2,
125 max_size: 10,
126 queue_timeout: Duration::from_secs(10),
127 };
128 assert!(cfg.max_size >= cfg.min_size);
129 }
130
131 #[tokio::test]
132 async fn test_pool_create_and_acquire() {
133 if std::env::var("DATABASE_URL").is_err() && std::env::var("PGHOST").is_err() {
136 eprintln!("Skipping: no database available");
137 return;
138 }
139 let url = std::env::var("DATABASE_URL")
140 .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/postgres".to_string());
141 let config = PoolConfig {
142 min_size: 1,
143 max_size: 5,
144 queue_timeout: Duration::from_secs(5),
145 };
146 let pool = ConnectionPool::new(&url, config).await.unwrap();
147 assert_eq!(pool.max_size(), 5);
148 let conn = pool.acquire().await.unwrap();
149 assert!(!conn.is_closed());
150 pool.release(conn);
151 sleep(Duration::from_millis(50)).await;
152 assert!(pool.active_count() > 0);
153 }
154}