1use anyhow::Result;
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4use tokio::sync::Notify;
5use tokio_postgres::{connect, Client, NoTls};
6use tracing::{debug, error, warn};
7
8use crate::config::PoolConfig;
9use crate::errors::{MCPError, Result as MCPResult};
10
/// A pool of PostgreSQL client connections shared between async tasks.
///
/// Connections are handed out by `acquire` and given back with `release`.
pub struct ConnectionPool {
    /// Pool settings; the code reads `min_size`, `max_size` and `queue_timeout`.
    config: PoolConfig,
    /// Connection string passed to `connect` whenever a new connection is opened.
    connection_string: String,
    /// Clients that are currently parked in the pool and not handed out.
    idle_connections: crossbeam::queue::SegQueue<Arc<Client>>,
    /// Number of connections the pool currently accounts for. It starts at the
    /// number opened by `new` (all of which are idle at that point), grows when
    /// `acquire` opens a connection, and shrinks when a closed connection is
    /// discarded, so it covers idle as well as handed-out connections.
    active_connections: AtomicU32,
    /// Signalled by `release`; awaited by `acquire` when the pool is at `max_size`.
    notify: Notify,
}
26
27impl ConnectionPool {
28 pub async fn new(connection_string: &str, config: PoolConfig) -> Result<Self> {
29 debug!("Creating connection pool with config: {:?}", config);
30
31 let idle_queue = crossbeam::queue::SegQueue::new();
32 let mut created = 0u32;
33
34 for _ in 0..config.min_size {
35 match connect(connection_string, NoTls).await {
36 Ok((client, connection)) => {
37 tokio::spawn(async move {
38 if let Err(e) = connection.await {
39 error!("Connection error: {}", e);
40 }
41 });
42 idle_queue.push(Arc::new(client));
43 created += 1;
44 }
45 Err(e) => {
46 warn!("Failed to create initial connection: {}", e);
47 }
48 }
49 }
50
51 if created == 0 {
52 return Err(anyhow::anyhow!(
53 "Failed to establish any database connection. Check DATABASE_URL and ensure PostgreSQL is running."
54 ));
55 }
56
57 Ok(Self {
58 config,
59 connection_string: connection_string.to_string(),
60 idle_connections: idle_queue,
61 active_connections: AtomicU32::new(created),
62 notify: Notify::new(),
63 })
64 }
65
66 pub async fn acquire(&self) -> MCPResult<Arc<Client>> {
71 loop {
72 if let Some(conn) = self.idle_connections.pop() {
74 if is_connection_alive(&conn) {
75 return Ok(conn);
76 }
77 self.active_connections.fetch_sub(1, Ordering::Relaxed);
78 continue;
79 }
80
81 let prev = self.active_connections.fetch_add(1, Ordering::Relaxed);
83
84 if prev < self.config.max_size {
85 match connect(&self.connection_string, NoTls).await {
87 Ok((client, connection)) => {
88 tokio::spawn(async move {
89 if let Err(e) = connection.await {
90 error!("Lazy connection error: {}", e);
91 }
92 });
93 return Ok(Arc::new(client));
94 }
95 Err(e) => {
96 error!("Failed to create lazy connection: {}", e);
97 self.active_connections.fetch_sub(1, Ordering::Relaxed);
98
99 continue;
102 }
103 }
104 } else {
105 self.active_connections.fetch_sub(1, Ordering::Relaxed);
107
108 tokio::time::timeout(self.config.queue_timeout, self.notify.notified())
110 .await
111 .map_err(|_| MCPError::PoolError("Connection pool exhausted".into()))?;
112
113 }
115 }
116 }
117
118 pub fn release(&self, conn: Arc<Client>) {
120 if is_connection_alive(&conn) {
121 self.idle_connections.push(conn);
122 } else {
123 self.active_connections.fetch_sub(1, Ordering::Relaxed);
124 }
125 self.notify.notify_one();
127 debug!("Connection released back to pool");
128 }
129
130 pub fn active_count(&self) -> u32 {
131 self.active_connections.load(Ordering::Relaxed)
132 }
133
134 pub fn max_size(&self) -> u32 {
135 self.config.max_size
136 }
137}
138
139fn is_connection_alive(conn: &Client) -> bool {
140 !conn.is_closed()
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a `PoolConfig` by hand and checks that `min_size` does not exceed `max_size`.
    #[test]
    fn test_config() {
        let queue_timeout = Duration::from_secs(10);
        let pool_config = PoolConfig {
            min_size: 2,
            max_size: 10,
            queue_timeout,
        };
        assert!(pool_config.min_size <= pool_config.max_size);
    }
}