Skip to main content

mcp_postgres/
pool.rs

1use anyhow::Result;
2use std::sync::atomic::{AtomicU32, Ordering};
3use std::sync::Arc;
4use tokio::sync::Notify;
5use tokio_postgres::{connect, Client, NoTls};
6use tracing::{debug, error, warn};
7
8use crate::config::PoolConfig;
9use crate::errors::{MCPError, Result as MCPResult};
10
/// Connection pool for PostgreSQL clients built on a `SegQueue` of idle
/// connections and a `tokio::sync::Notify` used to park callers while the
/// pool is at capacity.
///
/// Design:
///   - Idle connections are kept in a crossbeam `SegQueue` (described by its
///     authors as lock-free).
///   - An `AtomicU32` tracks the total number of connections owned by the
///     pool (idle + borrowed).
///   - `Notify` wakes a waiter when a connection is released.
///   - The pool itself holds no mutexes and does no spin-waiting.
pub struct ConnectionPool {
    /// Pool sizing and timeout settings; `acquire` reads `max_size` and
    /// `queue_timeout`, `new` reads `min_size`.
    config: PoolConfig,
    /// Connection string passed to `tokio_postgres::connect` whenever a
    /// connection is opened lazily.
    connection_string: String,
    /// Connections that are currently not borrowed by any caller.
    idle_connections: crossbeam::queue::SegQueue<Arc<Client>>,
    /// Total number of connections owned by the pool. Despite the name this
    /// counts idle connections as well as borrowed ones: it starts at the
    /// number of connections opened in `new` and is only decremented when a
    /// closed connection is discarded.
    active_connections: AtomicU32,
    /// Signalled by `release` so that a caller waiting in `acquire` retries.
    notify: Notify,
}
26
27impl ConnectionPool {
28    pub async fn new(connection_string: &str, config: PoolConfig) -> Result<Self> {
29        debug!("Creating connection pool with config: {:?}", config);
30
31        let idle_queue = crossbeam::queue::SegQueue::new();
32        let mut created = 0u32;
33
34        for _ in 0..config.min_size {
35            match connect(connection_string, NoTls).await {
36                Ok((client, connection)) => {
37                    tokio::spawn(async move {
38                        if let Err(e) = connection.await {
39                            error!("Connection error: {}", e);
40                        }
41                    });
42                    idle_queue.push(Arc::new(client));
43                    created += 1;
44                }
45                Err(e) => {
46                    warn!("Failed to create initial connection: {}", e);
47                }
48            }
49        }
50
51        if created == 0 {
52            return Err(anyhow::anyhow!(
53                "Failed to establish any database connection. Check DATABASE_URL and ensure PostgreSQL is running."
54            ));
55        }
56
57        Ok(Self {
58            config,
59            connection_string: connection_string.to_string(),
60            idle_connections: idle_queue,
61            active_connections: AtomicU32::new(created),
62            notify: Notify::new(),
63        })
64    }
65
66    /// Acquire a connection from the pool.
67    ///
68    /// Fast path: pop from idle queue (lock-free, ~20ns).
69    /// Slow path: create new connection (up to max_size), or block via Notify.
70    pub async fn acquire(&self) -> MCPResult<Arc<Client>> {
71        loop {
72            // Fast path: return idle connection immediately
73            if let Some(conn) = self.idle_connections.pop() {
74                if is_connection_alive(&conn) {
75                    return Ok(conn);
76                }
77                self.active_connections.fetch_sub(1, Ordering::Relaxed);
78                continue;
79            }
80
81            // No idle connection available. Try to create a new one.
82            let prev = self.active_connections.fetch_add(1, Ordering::Relaxed);
83
84            if prev < self.config.max_size {
85                // We have room to create a new connection
86                match connect(&self.connection_string, NoTls).await {
87                    Ok((client, connection)) => {
88                        tokio::spawn(async move {
89                            if let Err(e) = connection.await {
90                                error!("Lazy connection error: {}", e);
91                            }
92                        });
93                        return Ok(Arc::new(client));
94                    }
95                    Err(e) => {
96                        error!("Failed to create lazy connection: {}", e);
97                        self.active_connections.fetch_sub(1, Ordering::Relaxed);
98
99                        // If we haven't hit the retry limit, loop back
100                        // (might be transient, try idle again first)
101                        continue;
102                    }
103                }
104            } else {
105                // At capacity — undo our speculative increment and wait.
106                self.active_connections.fetch_sub(1, Ordering::Relaxed);
107
108                // Wait for a release signal with timeout
109                tokio::time::timeout(self.config.queue_timeout, self.notify.notified())
110                    .await
111                    .map_err(|_| MCPError::PoolError("Connection pool exhausted".into()))?;
112
113                // Loop back and try again
114            }
115        }
116    }
117
118    /// Release a connection back to the pool.
119    pub fn release(&self, conn: Arc<Client>) {
120        if is_connection_alive(&conn) {
121            self.idle_connections.push(conn);
122        } else {
123            self.active_connections.fetch_sub(1, Ordering::Relaxed);
124        }
125        // Wake one waiter (if any) — they'll try to pop from idle
126        self.notify.notify_one();
127        debug!("Connection released back to pool");
128    }
129
130    pub fn active_count(&self) -> u32 {
131        self.active_connections.load(Ordering::Relaxed)
132    }
133
134    pub fn max_size(&self) -> u32 {
135        self.config.max_size
136    }
137}
138
139fn is_connection_alive(conn: &Client) -> bool {
140    !conn.is_closed()
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A hand-built configuration keeps `min_size` at or below `max_size`.
    #[test]
    fn test_config() {
        let queue_timeout = Duration::from_secs(10);
        let pool_config = PoolConfig {
            min_size: 2,
            max_size: 10,
            queue_timeout,
        };

        assert!(pool_config.max_size >= pool_config.min_size);
    }
}
157}