// mcp-postgres 1.2.2
//
// High-performance MCP server for PostgreSQL with CPU-aware connection pooling and optimized buffers.
// Documentation
use anyhow::Result;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;
use tokio_postgres::{connect, Client, NoTls};
use tracing::{debug, error, warn};

use crate::config::PoolConfig;
use crate::errors::{MCPError, Result as MCPResult};

/// Connection pool built on a lock-free idle queue, with
/// `tokio::sync::Notify` used to park callers while the pool is at capacity.
///
/// Design:
///   - Idle connections live in a `crossbeam` `SegQueue`.
///   - An `AtomicU32` tracks total connections (idle + borrowed).
///   - `Notify` wakes a waiter when a connection is released.
///   - The pool itself holds no mutex or semaphore.
pub struct ConnectionPool {
    /// Pool settings; this file reads `min_size`, `max_size` and `queue_timeout`.
    config: PoolConfig,
    /// Connection string used for every connection the pool opens.
    connection_string: String,
    /// Connections currently sitting idle, ready to be handed out.
    idle_connections: crossbeam::queue::SegQueue<Arc<Client>>,
    /// Total connections owned by the pool: idle plus borrowed, despite the name.
    active_connections: AtomicU32,
    /// Signalled on release so a task waiting in `acquire` can retry.
    notify: Notify,
}

impl ConnectionPool {
    /// Builds a pool and eagerly opens up to `config.min_size` connections.
    ///
    /// Each successful connection gets its driver future spawned onto the
    /// tokio runtime and is placed in the idle queue. Individual failures are
    /// logged and skipped.
    ///
    /// # Errors
    ///
    /// Returns an error when `config.min_size` is greater than zero and none
    /// of the eager connection attempts succeeded. A `min_size` of zero yields
    /// an empty pool whose connections are created lazily by
    /// [`ConnectionPool::acquire`].
    pub async fn new(connection_string: &str, config: PoolConfig) -> Result<Self> {
        debug!("Creating connection pool with config: {:?}", config);

        let idle_queue = crossbeam::queue::SegQueue::new();
        let mut created = 0u32;

        for _ in 0..config.min_size {
            match connect(connection_string, NoTls).await {
                Ok((client, connection)) => {
                    // The connection object drives the socket I/O; it must be
                    // polled for the client to make progress.
                    tokio::spawn(async move {
                        if let Err(e) = connection.await {
                            error!("Connection error: {}", e);
                        }
                    });
                    idle_queue.push(Arc::new(client));
                    created += 1;
                }
                Err(e) => {
                    warn!("Failed to create initial connection: {}", e);
                }
            }
        }

        // Only treat "nothing connected" as fatal when at least one eager
        // connection was requested; with min_size == 0 no attempt was made,
        // so there is no failure to report.
        if created == 0 && config.min_size > 0 {
            return Err(anyhow::anyhow!(
                "Failed to establish any database connection. Check DATABASE_URL and ensure PostgreSQL is running."
            ));
        }

        Ok(Self {
            config,
            connection_string: connection_string.to_string(),
            idle_connections: idle_queue,
            active_connections: AtomicU32::new(created),
            notify: Notify::new(),
        })
    }

    /// Acquire a connection from the pool.
    ///
    /// Fast path: pop a live connection from the idle queue.
    /// Slow path: open a new connection while the pool is below
    /// `config.max_size`, otherwise wait for a release notification.
    ///
    /// # Errors
    ///
    /// Returns `MCPError::PoolError` when
    /// - the pool stays exhausted for `config.queue_timeout`, measured across
    ///   the whole call, or
    /// - opening a new connection fails on three attempts within this call.
    pub async fn acquire(&self) -> MCPResult<Arc<Client>> {
        // Cap on failed connection attempts within one call, so an
        // unreachable database surfaces as an error instead of an endless loop.
        const MAX_CONNECT_ATTEMPTS: u32 = 3;

        // The wait budget is shared by every trip through the loop; a waiter
        // that is woken but loses the race must not get a fresh full timeout.
        let started = tokio::time::Instant::now();
        let mut failed_connects = 0u32;

        loop {
            // Fast path: return idle connection immediately
            if let Some(conn) = self.idle_connections.pop() {
                if is_connection_alive(&conn) {
                    return Ok(conn);
                }
                // Dead idle connection: drop it and free its slot in the count.
                self.active_connections.fetch_sub(1, Ordering::Relaxed);
                continue;
            }

            // No idle connection available. Speculatively claim a slot.
            let prev = self.active_connections.fetch_add(1, Ordering::Relaxed);

            if prev < self.config.max_size {
                // We have room to create a new connection
                match connect(&self.connection_string, NoTls).await {
                    Ok((client, connection)) => {
                        tokio::spawn(async move {
                            if let Err(e) = connection.await {
                                error!("Lazy connection error: {}", e);
                            }
                        });
                        return Ok(Arc::new(client));
                    }
                    Err(e) => {
                        error!("Failed to create lazy connection: {}", e);
                        self.active_connections.fetch_sub(1, Ordering::Relaxed);
                        // The slot we held is free again; a task that saw the
                        // pool as full while we held it may now proceed.
                        self.notify.notify_one();

                        failed_connects += 1;
                        if failed_connects >= MAX_CONNECT_ATTEMPTS {
                            return Err(MCPError::PoolError(
                                format!(
                                    "Failed to create database connection after {} attempts: {}",
                                    failed_connects, e
                                )
                                .into(),
                            ));
                        }

                        // Possibly transient: loop back and check the idle
                        // queue again before the next attempt.
                        continue;
                    }
                }
            } else {
                // At capacity — undo our speculative increment and wait.
                self.active_connections.fetch_sub(1, Ordering::Relaxed);

                // Wait for a release signal with whatever is left of the
                // overall timeout budget (zero once the budget is spent).
                let remaining = self
                    .config
                    .queue_timeout
                    .saturating_sub(started.elapsed());
                tokio::time::timeout(remaining, self.notify.notified())
                    .await
                    .map_err(|_| MCPError::PoolError("Connection pool exhausted".into()))?;

                // Loop back and try again
            }
        }
    }

    /// Release a connection back to the pool.
    ///
    /// A live connection is pushed onto the idle queue; a closed one is
    /// dropped and its slot removed from the connection count. In both cases
    /// one waiter (if any) is woken so it can either pop the idle connection
    /// or open a replacement.
    pub fn release(&self, conn: Arc<Client>) {
        if is_connection_alive(&conn) {
            self.idle_connections.push(conn);
        } else {
            self.active_connections.fetch_sub(1, Ordering::Relaxed);
        }
        // Wake one waiter (if any) — they'll try to pop from idle
        self.notify.notify_one();
        debug!("Connection released back to pool");
    }

    /// Returns the pool's current connection count (idle plus borrowed).
    pub fn active_count(&self) -> u32 {
        self.active_connections.load(Ordering::Relaxed)
    }

    /// Returns the configured upper bound on pool connections.
    pub fn max_size(&self) -> u32 {
        self.config.max_size
    }
}

/// Reports whether `conn` is still usable, i.e. the client does not report
/// itself as closed.
fn is_connection_alive(conn: &Client) -> bool {
    let closed = conn.is_closed();
    !closed
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Sanity check: a hand-built `PoolConfig` keeps `min_size` at or below
    /// `max_size`.
    #[test]
    fn test_config() {
        let queue_timeout = Duration::from_secs(10);
        let pool_config = PoolConfig {
            min_size: 2,
            max_size: 10,
            queue_timeout,
        };
        assert!(pool_config.min_size <= pool_config.max_size);
    }
}