use serde_json::{json, Value};
use tokio_postgres::Client;
use crate::errors::Result as MCPResult;
pub async fn execute_query(client: &Client, params: Option<Value>) -> MCPResult<Value> {
let sql = params
.as_ref()
.and_then(|p| p.get("sql"))
.and_then(|v| v.as_str())
.ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
let first_word = sql.trim_start().split_whitespace().next().unwrap_or("").to_uppercase();
if first_word != "SELECT" {
return Err(crate::errors::MCPError::InvalidParams("Only SELECT queries allowed".into()));
}
let rows = client.query(sql, &[]).await?;
let results: Vec<Value> = rows
.iter()
.map(|row| {
let values: Vec<Value> = (0..row.len())
.map(|i| {
match row.try_get::<_, String>(i) {
Ok(v) => Value::String(v),
Err(_) => Value::Null,
}
})
.collect();
Value::Array(values)
})
.collect();
Ok(json!({ "rows": results }))
}
pub async fn execute_insert(client: &Client, params: Option<Value>) -> MCPResult<Value> {
let sql = params
.as_ref()
.and_then(|p| p.get("sql"))
.and_then(|v| v.as_str())
.ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
let first_word = sql.trim_start().split_whitespace().next().unwrap_or("").to_uppercase();
if first_word != "INSERT" {
return Err(crate::errors::MCPError::InvalidParams("Invalid INSERT query".into()));
}
let rows_affected = client.execute(sql, &[]).await?;
Ok(json!({ "rows_affected": rows_affected }))
}
pub async fn execute_update(client: &Client, params: Option<Value>) -> MCPResult<Value> {
let sql = params
.as_ref()
.and_then(|p| p.get("sql"))
.and_then(|v| v.as_str())
.ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
let first_word = sql.trim_start().split_whitespace().next().unwrap_or("").to_uppercase();
if first_word != "UPDATE" {
return Err(crate::errors::MCPError::InvalidParams("Invalid UPDATE query".into()));
}
let rows_affected = client.execute(sql, &[]).await?;
Ok(json!({ "rows_affected": rows_affected }))
}
pub async fn execute_delete(client: &Client, params: Option<Value>) -> MCPResult<Value> {
let sql = params
.as_ref()
.and_then(|p| p.get("sql"))
.and_then(|v| v.as_str())
.ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
let first_word = sql.trim_start().split_whitespace().next().unwrap_or("").to_uppercase();
if first_word != "DELETE" {
return Err(crate::errors::MCPError::InvalidParams("Invalid DELETE query".into()));
}
let rows_affected = client.execute(sql, &[]).await?;
Ok(json!({ "rows_affected": rows_affected }))
}
pub async fn explain_query(client: &Client, params: Option<Value>) -> MCPResult<Value> {
let sql = params
.as_ref()
.and_then(|p| p.get("sql"))
.and_then(|v| v.as_str())
.ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
let mut explain_sql = String::with_capacity(sql.len() + 24);
explain_sql.push_str("EXPLAIN (FORMAT JSON) ");
explain_sql.push_str(sql);
let rows = client.query(&explain_sql, &[]).await?;
if rows.is_empty() {
return Ok(json!({ "plan": null }));
}
let plan: String = rows[0].get(0);
Ok(json!({ "plan": serde_json::from_str::<Value>(&plan)? }))
}