mcp-postgres 4.0.2

High-performance MCP server for PostgreSQL with CPU-aware connection pooling and optimized buffers
Documentation
use serde_json::{json, Value};
use tokio_postgres::Client;
use crate::errors::Result as MCPResult;

const MAX_IDENTIFIER_LEN: usize = 255;

fn validate_identifier(val: &str, label: &str) -> Result<(), crate::errors::MCPError> {
    if val.is_empty() || val.len() > MAX_IDENTIFIER_LEN {
        return Err(crate::errors::MCPError::InvalidParams(
            format!("'{label}' must be 1-{MAX_IDENTIFIER_LEN} characters")
        ));
    }
    Ok(())
}

fn qi(ident: &str) -> String {
    crate::validation::quote_ident(ident)
}

pub async fn add_column(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
    let data_type = params.as_ref().and_then(|p| p.get("data_type").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'data_type'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
    let not_null = params.as_ref().and_then(|p| p.get("not_null").and_then(|v| v.as_bool())).unwrap_or(false);
    let default = params.as_ref().and_then(|p| p.get("default").and_then(|v| v.as_str()));

    validate_identifier(table, "table")?;
    validate_identifier(column, "column")?;
    validate_identifier(schema, "schema")?;

    let mut sql = format!("ALTER TABLE {}.{} ADD COLUMN {}", qi(schema), qi(table), qi(column));
    sql.push_str(&format!(" {}", data_type));
    if let Some(d) = default {
        sql.push_str(&format!(" DEFAULT {}", d));
    }
    if not_null {
        sql.push_str(" NOT NULL");
    }

    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn drop_column(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
    let cascade = params.as_ref().and_then(|p| p.get("cascade").and_then(|v| v.as_bool())).unwrap_or(false);

    validate_identifier(table, "table")?;
    validate_identifier(column, "column")?;
    validate_identifier(schema, "schema")?;

    let mut sql = format!("ALTER TABLE {}.{} DROP COLUMN {}", qi(schema), qi(table), qi(column));
    if cascade { sql.push_str(" CASCADE"); }

    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn rename_column(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
    let new_name = params.as_ref().and_then(|p| p.get("new_name").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'new_name'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");

    validate_identifier(table, "table")?;
    validate_identifier(column, "column")?;
    validate_identifier(new_name, "new_name")?;
    validate_identifier(schema, "schema")?;

    let sql = format!("ALTER TABLE {}.{} RENAME COLUMN {} TO {}", qi(schema), qi(table), qi(column), qi(new_name));
    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn alter_column_type(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let column = params.as_ref().and_then(|p| p.get("column").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'column'".into()))?;
    let data_type = params.as_ref().and_then(|p| p.get("data_type").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'data_type'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
    let using = params.as_ref().and_then(|p| p.get("using").and_then(|v| v.as_str()));

    validate_identifier(table, "table")?;
    validate_identifier(column, "column")?;
    validate_identifier(schema, "schema")?;

    let mut sql = format!("ALTER TABLE {}.{} ALTER COLUMN {} TYPE {}", qi(schema), qi(table), qi(column), data_type);
    if let Some(expr) = using {
        sql.push_str(&format!(" USING {}", expr));
    }

    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn rename_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let new_name = params.as_ref().and_then(|p| p.get("new_name").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'new_name'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");

    validate_identifier(table, "table")?;
    validate_identifier(new_name, "new_name")?;
    validate_identifier(schema, "schema")?;

    let sql = format!("ALTER TABLE {}.{} RENAME TO {}", qi(schema), qi(table), qi(new_name));
    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn rename_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let index = params.as_ref().and_then(|p| p.get("index").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'index'".into()))?;
    let new_name = params.as_ref().and_then(|p| p.get("new_name").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'new_name'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");

    validate_identifier(index, "index")?;
    validate_identifier(new_name, "new_name")?;
    validate_identifier(schema, "schema")?;

    let sql = format!("ALTER INDEX {}.{} RENAME TO {}", qi(schema), qi(index), qi(new_name));
    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn rename_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'schema'".into()))?;
    let new_name = params.as_ref().and_then(|p| p.get("new_name").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'new_name'".into()))?;

    validate_identifier(schema, "schema")?;
    validate_identifier(new_name, "new_name")?;

    let sql = format!("ALTER SCHEMA {} RENAME TO {}", qi(schema), qi(new_name));
    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn add_foreign_key(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let columns = params.as_ref().and_then(|p| p.get("columns").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'columns'".into()))?;
    let ref_table = params.as_ref().and_then(|p| p.get("ref_table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'ref_table'".into()))?;
    let ref_columns = params.as_ref().and_then(|p| p.get("ref_columns").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'ref_columns'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
    let constraint_name = params.as_ref().and_then(|p| p.get("constraint_name").and_then(|v| v.as_str()));
    let on_delete = params.as_ref().and_then(|p| p.get("on_delete").and_then(|v| v.as_str()));
    let on_update = params.as_ref().and_then(|p| p.get("on_update").and_then(|v| v.as_str()));

    validate_identifier(table, "table")?;
    validate_identifier(ref_table, "ref_table")?;
    validate_identifier(schema, "schema")?;

    let mut sql = format!("ALTER TABLE {}.{} ADD", qi(schema), qi(table));
    if let Some(cname) = constraint_name {
        sql.push_str(&format!(" CONSTRAINT {}", qi(cname)));
    }
    sql.push_str(&format!(" FOREIGN KEY ({}) REFERENCES {}.{} ({})", columns, qi(schema), qi(ref_table), ref_columns));
    if let Some(od) = on_delete {
        sql.push_str(&format!(" ON DELETE {}", od));
    }
    if let Some(ou) = on_update {
        sql.push_str(&format!(" ON UPDATE {}", ou));
    }

    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn drop_foreign_key(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let constraint = params.as_ref().and_then(|p| p.get("constraint").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'constraint'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
    let cascade = params.as_ref().and_then(|p| p.get("cascade").and_then(|v| v.as_bool())).unwrap_or(false);

    validate_identifier(table, "table")?;
    validate_identifier(constraint, "constraint")?;
    validate_identifier(schema, "schema")?;

    let mut sql = format!("ALTER TABLE {}.{} DROP CONSTRAINT {}", qi(schema), qi(table), qi(constraint));
    if cascade { sql.push_str(" CASCADE"); }

    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn add_unique_constraint(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let columns = params.as_ref().and_then(|p| p.get("columns").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'columns'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
    let constraint_name = params.as_ref().and_then(|p| p.get("constraint_name").and_then(|v| v.as_str()));

    validate_identifier(table, "table")?;
    validate_identifier(schema, "schema")?;

    let mut sql = format!("ALTER TABLE {}.{} ADD", qi(schema), qi(table));
    if let Some(cname) = constraint_name {
        validate_identifier(cname, "constraint_name")?;
        sql.push_str(&format!(" CONSTRAINT {}", qi(cname)));
    }
    sql.push_str(&format!(" UNIQUE ({})", columns));

    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}

pub async fn drop_constraint(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let constraint = params.as_ref().and_then(|p| p.get("constraint").and_then(|v| v.as_str())).ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'constraint'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
    let cascade = params.as_ref().and_then(|p| p.get("cascade").and_then(|v| v.as_bool())).unwrap_or(false);

    validate_identifier(table, "table")?;
    validate_identifier(constraint, "constraint")?;
    validate_identifier(schema, "schema")?;

    let mut sql = format!("ALTER TABLE {}.{} DROP CONSTRAINT {}", qi(schema), qi(table), qi(constraint));
    if cascade { sql.push_str(" CASCADE"); }

    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "sql": sql }))
}