postgres-mcp 0.3.2

A PostgreSQL MCP (Model Context Protocol) server implementation for building AI agents
Documentation
# Postgres MCP

Postgres MCP is a MCP (Model Context Protocol) implementation for Postgres. It allows AI agents to interact with Postgres databases.

## APIs

### pg_mcp register <conn_str> ✅

Register a new Postgres connection pool. AI agents can use this id to query the database.

```shell
pg_mcp register "postgres://postgres:postgres@localhost:5432/postgres"
123e4567-e89b-12d3-a456-426614174000
```

### pg_mcp unregister <conn_id> ✅

Unregister a Postgres connection. The connection pool will be closed and the connection id can't be used again.

```shell
pg_mcp unregister 123e4567-e89b-12d3-a456-426614174000
```

### pg_mcp query <conn_id> <query_sql> ✅

Query the database with a SQL statement. It must be a valid "SELECT" statement. We will use sqlparser to parse the statement, validate it is a valid "SELECT" statement, and then generate the SQL statement again. The newly generated SQL statement will be executed against the database.

```shell
pg_mcp query 123e4567-e89b-12d3-a456-426614174000 "SELECT * FROM users"
```

### pg_mcp insert <conn_id> <query_sql> ✅

Insert a new row into the database. It must be a valid "INSERT" statement. We will use sqlparser to parse the statement, validate it is a valid "INSERT" statement, and then generate the SQL statement again. The newly generated SQL statement will be executed against the database.

```shell
pg_mcp insert 123e4567-e89b-12d3-a456-426614174000 "INSERT INTO users (name, email) VALUES ('John Doe', 'john.doe@example.com')"
```

### pg_mcp update <conn_id> <query_sql> ✅

Update a row in the database. It must be a valid "UPDATE" statement. We will use sqlparser to parse the statement, validate it is a valid "UPDATE" statement, and then generate the SQL statement again. The newly generated SQL statement will be executed against the database.

### pg_mcp delete <conn_id> <query_sql> ✅

Delete a row in the database. It must be a valid "DELETE" statement. We will use sqlparser to parse the statement, validate it is a valid "DELETE" statement, and then generate the SQL statement again. The newly generated SQL statement will be executed against the database.

```shell
pg_mcp delete 123e4567-e89b-12d3-a456-426614174000 "DELETE FROM users WHERE id = 1"
```

### pg_mcp describe <conn_id> <table_name> ✅

Describe a table in the database. We will generate the SQL statement and execute it against the database.

```shell
pg_mcp describe 123e4567-e89b-12d3-a456-426614174000 "users"
```

### pg_mcp create_table <conn_id> <create_sql> ✅

Create a new table in the database. It must be a valid "CREATE TABLE" statement. We will use sqlparser to parse the statement, validate it is a valid "CREATE TABLE" statement, and then generate the SQL statement again. The newly generated SQL statement will be executed against the database.

```shell
pg_mcp create_table 123e4567-e89b-12d3-a456-426614174000 "CREATE TABLE users (id SERIAL PRIMARY KEY, name VARCHAR(255), email VARCHAR(255))"
```

### pg_mcp drop_table <conn_id> <table_name> ✅

Drop a table in the database. We will generate the SQL statement and execute it against the database.

```shell
pg_mcp drop_table 123e4567-e89b-12d3-a456-426614174000 "users"
```

### pg_mcp create_index <conn_id> <create_index_sql> ✅

Create an index on a table. It must be a valid "CREATE INDEX" statement. We will use sqlparser to parse the statement, validate it is a valid "CREATE INDEX" statement, and then generate the SQL statement again. The newly generated SQL statement will be executed against the database.

```shell
pg_mcp create_index 123e4567-e89b-12d3-a456-426614174000 "CREATE INDEX idx_users_name ON users (name)"
```

### pg_mcp drop_index <conn_id> <index_name> ✅

Drop an index on a table. We will generate the SQL statement and execute it against the database.

### pg_mcp list_tables <conn_id> <schema> ✅

List all tables in a given schema. If schema is not provided, it will use the current schema.

```shell
pg_mcp list_tables 123e4567-e89b-12d3-a456-426614174000 "public"
```

### pg_mcp create_type <conn_id> <create_type_sql> ✅

Create a new type in the database. It must be a valid "CREATE TYPE" statement. We will use sqlparser to parse the statement, validate it is a valid "CREATE TYPE" statement, and then generate the SQL statement again. The newly generated SQL statement will be executed against the database.

```shell
pg_mcp create_type 123e4567-e89b-12d3-a456-426614174000 "CREATE TYPE users AS ENUM ('admin', 'user')"
```

### pg_mcp create_schema <conn_id> <schema_name> ✅

Create a new schema in the database. We will generate the SQL statement and execute it against the database.

```shell
pg_mcp create_schema 123e4567-e89b-12d3-a456-426614174000 "hr"
```

## Implementations

### Data structure

```rust

#[derive(Debug, Clone)]
pub(crate) struct Conn {
  pub(crate) id: String,
  pub(crate) conn_str: String,
  pub(crate) pool: PgPool,
}

#[derive(Debug, Clone)]
pub(crate) struct Conns {
  pub(crate) inner: Arc<ArcSwap<HashMap<String, Conn>>>,
}

#[derive(Debug, Clone)]
pub PgMcp {
    conns: Conns,
}

#[derive(Debug, FromRow, Serialize, Deserialize)]
struct JsonRow(serde_json::Value);

impl Conns {
  pub(crate) fn new() -> Self {
    ...
  }

  // return the conn id (uuid) if success
  pub(crate) async fn register(&self, conn_str: String) -> Result<String, Error> {
    let mut conns = self.inner.load();
    // use arc_swap to update the inner map
    ...
  }

  pub(crate) fn unregister(&self, id: String) -> Result<(), Error> {
    let mut conns = self.inner.load();
    // use arc_swap to update the inner map
    ...
  }

  // return the result as a json string (serialize Vec<JsonRow>)
  pub(crate) async fn query(&self, id: &str, query: &str) -> Result<String, Error> { ... }

  // return "success, rows_affected: <rows_affected>" if success
  pub(crate) async fn insert(&self, id: &str, query: &str) -> Result<String, Error> { ... }

  // return "success, rows_affected: <rows_affected>" if success
  pub(crate) async fn update(&self, id: &str, query: &str) -> Result<String, Error> { ... }

  // return "success, rows_affected: <rows_affected>" if success
  pub(crate) async fn delete(&self, id: &str, table: &str, pk: &str) -> Result<String, Error> { ... }

  // return "success" if success
  pub(crate) async fn create_table(&self, id: &str, query: &str) -> Result<String, Error> { ... }

  // return "success" if success
  pub(crate) async fn drop_table(&self, id: &str, table: &str) -> Result<String, Error> { ... }

  // return "success" if success
  pub(crate) async fn create_index(&self, id: &str, query: &str) -> Result<String, Error> { ... }

  // return "success" if success
  pub(crate) async fn drop_index(&self, id: &str, index: &str) -> Result<String, Error> { ... }

  // return the result as a json string (serialize Vec<JsonRow>), table name is "schema.table" or "table" if public schema
  pub(crate) async fn describe(&self, id: &str, table: &str) -> Result<String, Error> { ... }

  // return the result as a json string (serialize Vec<JsonRow>)
  pub(crate) async fn list_tables(&self, id: &str) -> Result<String, Error> { ... }

}
```

### Folder structure

src/
├── main.rs: The main entry point of the server
├── lib.rs: core data structure (Conn, Conns, PgMcp) and imports
├── pg.rs: core postgres related logic, e.g. implementation of Conns
└── mcp.rs: implement of PgMcp (follow Calculator example of rmcp) and mcp server implementation

### Dependencies

anyhow: 1.0

arc-swap: 1.7

sqlx: 0.8 with "runtime-tokio", "tls-rustls-aws-lc-rs", "postgres" features

Example:

```rust
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://postgres:password@localhost/test").await?;

    let row: (i64,) = sqlx::query_as("SELECT $1")
        .bind(150_i64)
        .fetch_one(&pool).await?;

    assert_eq!(row.0, 150);

    Ok(())
}
```

rmcp: 0.1 with "server", "transport-sse-server", "transport-io" features

Example:

```rust
use anyhow::Result;
use tracing_subscriber::{self, EnvFilter};
use rmcp::{
    ServerHandler, ServiceExt, transport::stdio
    model::{ServerCapabilities, ServerInfo},
    schemars, tool, Error as McpError, model::{CallToolResult, Content, ServerCapabilities, ServerInfo},
};

#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
pub struct SumRequest {
    #[schemars(description = "the left hand side number")]
    pub a: i32,
    pub b: i32,
}
#[derive(Debug, Clone)]
pub struct Calculator;
#[tool(tool_box)]
impl Calculator {
    #[tool(description = "Calculate the sum of two numbers")]
    fn sum(&self, #[tool(aggr)] SumRequest { a, b }: SumRequest) -> Result<CallToolResult, McpError> {
        Ok(CallToolResult::new(Content::Text((a + b).to_string())))
    }

    #[tool(description = "Calculate the sum of two numbers")]
    fn sub(
        &self,
        #[tool(param)]
        #[schemars(description = "the left hand side number")]
        a: i32,
        #[tool(param)]
        #[schemars(description = "the left hand side number")]
        b: i32,
    ) -> String {
        (a - b).to_string()
    }
}

#[tool(tool_box)]
impl ServerHandler for Calculator {
    fn get_info(&self) -> ServerInfo {
        ServerInfo {
            instructions: Some("A simple calculator".into()),
            capabilities: ServerCapabilities::builder().enable_tools().build(),
            ..Default::default()
        }
    }
}

/// to test mcp server, run `npx @modelcontextprotocol/inspector cargo run`
#[tokio::main]
async fn main() -> Result<()> {
    // Initialize the tracing subscriber with file and stdout logging
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env().add_directive(tracing::Level::DEBUG.into()))
        .with_writer(std::io::stderr)
        .with_ansi(false)
        .init();

    tracing::info!("Starting MCP server");

    // Create an instance of our Calculator router
    let service = Calculator::new().serve(stdio()).await.inspect_err(|e| {
        tracing::error!("serving error: {:?}", e);
    })?;

    service.waiting().await?;
    Ok(())
}
```

schemars: 0.8

Example:

```rust
use schemars::{schema_for, JsonSchema};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct MyStruct {
    #[serde(rename = "myNumber")]
    pub my_int: i32,
    pub my_bool: bool,
    #[serde(default)]
    pub my_nullable_enum: Option<MyEnum>,
}

#[derive(Deserialize, Serialize, JsonSchema)]
#[serde(untagged)]
pub enum MyEnum {
    StringNewType(String),
    StructVariant { floats: Vec<f32> },
}

let schema = schema_for!(MyStruct);
println!("{}", serde_json::to_string_pretty(&schema).unwrap());
```

sqlparser: 0.55

Doc: https://docs.rs/sqlparser/latest/sqlparser/

Example:

```rust
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;

let dialect = PostgreSqlDialect {};

let sql = "SELECT a, b, 123, myfunc(b) \
           FROM table_1 \
           WHERE a > b AND b < 100 \
           ORDER BY a DESC, b";

let ast = Parser::parse_sql(&dialect, sql).unwrap();
```

tokio: 1.44