// mcp-postgres 4.0.3
//
// High-performance MCP server for PostgreSQL with CPU-aware connection pooling and optimized buffers.
use serde_json::{json, Value};
use tokio_postgres::Client;
use crate::errors::Result as MCPResult;

pub async fn create_hypertable(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let time_column = params.as_ref().and_then(|p| p.get("time_column").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'time_column'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
    let chunk_time = params.as_ref().and_then(|p| p.get("chunk_time_interval").and_then(|v| v.as_str()));

    let mut sql = format!("SELECT create_hypertable('{}.{}', '{}'",
        crate::validation::quote_ident(schema), crate::validation::quote_ident(table), time_column);
    if let Some(ct) = chunk_time {
        sql.push_str(&format!(", chunk_time_interval => INTERVAL '{}'", ct));
    }
    sql.push(')');

    let rows = client.query(&sql, &[]).await?;
    let created: bool = rows[0].get(0);

    Ok(json!({ "success": created, "hypertable": format!("{}.{}", schema, table), "sql": sql }))
}

pub async fn show_hypertable_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");

    let rows = client
        .query(
            "SELECT hypertable_name, hypertable_schema, owner,
                    num_dimensions, chunk_target_size,
                    compression_state, tablespaces
             FROM timescaledb_information.hypertables
             WHERE hypertable_name = $1 AND hypertable_schema = $2",
            &[&table, &schema],
        )
        .await?;

    if rows.is_empty() {
        return Ok(json!({ "table": format!("{}.{}", schema, table), "is_hypertable": false }));
    }

    let row = &rows[0];
    Ok(json!({
        "table": row.get::<_, String>(0),
        "schema": row.get::<_, String>(1),
        "owner": row.get::<_, Option<String>>(2),
        "dimensions": row.get::<_, Option<i32>>(3),
        "chunk_target_size": row.get::<_, Option<String>>(4),
        "compression": row.get::<_, Option<String>>(5),
        "tablespaces": row.get::<_, Option<String>>(6),
        "is_hypertable": true,
    }))
}

pub async fn show_chunks(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");

    let rows = client
        .query(
            "SELECT chunk_name, chunk_schema, table_name, table_schema,
                    range_start::text, range_end::text, is_compressed::text,
                    disk_size::text
             FROM timescaledb_information.chunks
             WHERE table_name = $1 AND table_schema = $2
             ORDER BY range_start",
            &[&table, &schema],
        )
        .await?;

    let chunks: Vec<Value> = rows.iter().map(|row| {
        json!({
            "chunk_name": row.get::<_, String>(0),
            "chunk_schema": row.get::<_, String>(1),
            "range_start": row.get::<_, String>(3),
            "range_end": row.get::<_, String>(4),
            "compressed": row.get::<_, String>(5),
            "disk_size": row.get::<_, Option<String>>(6),
        })
    }).collect();

    Ok(json!({ "chunks": chunks, "count": chunks.len() }))
}

pub async fn add_retention_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let drop_after = params.as_ref().and_then(|p| p.get("drop_after").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'drop_after' (e.g. '90 days')".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");

    let sql = format!(
        "SELECT add_retention_policy('{}.{}', INTERVAL '{}')",
        crate::validation::quote_ident(schema),
        crate::validation::quote_ident(table),
        drop_after
    );

    let rows = client.query(&sql, &[]).await?;
    let job_id: i32 = rows[0].get(0);

    Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
}

pub async fn add_compression_policy(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table'".into()))?;
    let compress_after = params.as_ref().and_then(|p| p.get("compress_after").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'compress_after' (e.g. '7 days')".into()))?;
    let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");

    let sql = format!(
        "SELECT add_compression_policy('{}.{}', INTERVAL '{}')",
        crate::validation::quote_ident(schema),
        crate::validation::quote_ident(table),
        compress_after
    );

    let rows = client.query(&sql, &[]).await?;
    let job_id: i32 = rows[0].get(0);

    Ok(json!({ "success": true, "job_id": job_id, "sql": sql }))
}

pub async fn compress_chunk(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let chunk_name = params.as_ref().and_then(|p| p.get("chunk_name").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'chunk_name'".into()))?;
    let chunk_schema = params.as_ref().and_then(|p| p.get("chunk_schema").and_then(|v| v.as_str())).unwrap_or("_hyper");

    let sql = format!("SELECT compress_chunk('{}.{}')", crate::validation::quote_ident(chunk_schema), crate::validation::quote_ident(chunk_name));
    let rows = client.query(&sql, &[]).await?;
    let result: String = rows[0].get(0);

    Ok(json!({ "success": true, "chunk": format!("{}.{}", chunk_schema, chunk_name), "result": result }))
}

pub async fn add_continuous_aggregate(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
    let name = params.as_ref().and_then(|p| p.get("name").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'name'".into()))?;
    let query = params.as_ref().and_then(|p| p.get("query").and_then(|v| v.as_str()))
        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'query'".into()))?;
    let refresh_interval = params.as_ref().and_then(|p| p.get("refresh_interval").and_then(|v| v.as_str()));

    let q_name = crate::validation::quote_ident(name);
    let mut sql = format!("CREATE MATERIALIZED VIEW {q_name}");
    sql.push_str("\nWITH (timescaledb.continuous) AS\n");
    sql.push_str(query);
    if let Some(ri) = refresh_interval {
        sql.push_str(&format!("\nWITH DATA;\nSELECT add_continuous_aggregate_policy('{q_name}', INTERVAL '{ri}', INTERVAL '{ri}')"));
    }

    client.execute(&sql, &[]).await?;
    Ok(json!({ "success": true, "name": name, "sql": sql }))
}