Skip to main content

mcp_postgres/actions/
query.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5const MAX_SQL_LEN: usize = 10_000;
6
7fn validate_sql(sql: &str, allowed_prefix: &str, label: &str) -> std::result::Result<(), crate::errors::MCPError> {
8    if sql.is_empty() {
9        return Err(crate::errors::MCPError::InvalidParams("'sql' parameter must not be empty".into()));
10    }
11    if sql.len() > MAX_SQL_LEN {
12        return Err(crate::errors::MCPError::InvalidParams(
13            format!("SQL exceeds maximum length of {MAX_SQL_LEN} characters (got {})", sql.len())
14        ));
15    }
16    let first_word = sql.split_whitespace().next().unwrap_or("").to_uppercase();
17    if first_word != allowed_prefix {
18        return Err(crate::errors::MCPError::InvalidParams(
19            format!("Invalid {label} query: expected '{allowed_prefix}'")
20        ));
21    }
22    Ok(())
23}
24
25/// 6. Execute query
26pub async fn execute_query(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
27    let sql = params
28        .as_ref()
29        .and_then(|p| p.get("sql"))
30        .and_then(|v| v.as_str())
31        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
32
33    validate_sql(sql, "SELECT", "SELECT")?;
34
35    let rows = client.query(sql, &[]).await?;
36
37    let results: Vec<Value> = rows
38        .iter()
39        .map(|row| {
40            let values: Vec<Value> = (0..row.len())
41                .map(|i| {
42                    // Try type inference: prefer native JSON types over raw strings
43                    if let Ok(v) = row.try_get::<_, bool>(i) {
44                        json!(v)
45                    } else if let Ok(v) = row.try_get::<_, i32>(i) {
46                        json!(v)
47                    } else if let Ok(v) = row.try_get::<_, i64>(i) {
48                        json!(v)
49                    } else if let Ok(v) = row.try_get::<_, f32>(i) {
50                        json!(v)
51                    } else if let Ok(v) = row.try_get::<_, f64>(i) {
52                        json!(v)
53                    } else if let Ok(v) = row.try_get::<_, String>(i) {
54                        Value::String(v)
55                    } else if let Ok(v) = row.try_get::<_, Option<String>>(i) {
56                        v.map(Value::String).unwrap_or(Value::Null)
57                    } else {
58                        Value::Null
59                    }
60                })
61                .collect();
62            Value::Array(values)
63        })
64        .collect();
65
66    Ok(json!({ "rows": results }))
67}
68
69/// 7. Execute insert
70pub async fn execute_insert(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
71    let sql = params
72        .as_ref()
73        .and_then(|p| p.get("sql"))
74        .and_then(|v| v.as_str())
75        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
76
77    validate_sql(sql, "INSERT", "INSERT")?;
78
79    let rows_affected = client.execute(sql, &[]).await?;
80
81    Ok(json!({ "rows_affected": rows_affected }))
82}
83
84/// 8. Execute update
85pub async fn execute_update(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
86    let sql = params
87        .as_ref()
88        .and_then(|p| p.get("sql"))
89        .and_then(|v| v.as_str())
90        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
91
92    validate_sql(sql, "UPDATE", "UPDATE")?;
93
94    let rows_affected = client.execute(sql, &[]).await?;
95
96    Ok(json!({ "rows_affected": rows_affected }))
97}
98
99/// 9. Execute delete
100pub async fn execute_delete(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
101    let sql = params
102        .as_ref()
103        .and_then(|p| p.get("sql"))
104        .and_then(|v| v.as_str())
105        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
106
107    validate_sql(sql, "DELETE", "DELETE")?;
108
109    let rows_affected = client.execute(sql, &[]).await?;
110
111    Ok(json!({ "rows_affected": rows_affected }))
112}
113
114/// 10. Explain query
115///
116/// Supports EXPLAIN with optional ANALYZE, BUFFERS, and FORMAT options.
117/// Only SELECT queries can be explained.
118pub async fn explain_query(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
119    let sql = params
120        .as_ref()
121        .and_then(|p| p.get("sql"))
122        .and_then(|v| v.as_str())
123        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
124
125    validate_sql(sql, "SELECT", "SELECT")?;
126
127    let analyze = params
128        .as_ref()
129        .and_then(|p| p.get("analyze"))
130        .and_then(|v| v.as_bool())
131        .unwrap_or(false);
132
133    let buffers = params
134        .as_ref()
135        .and_then(|p| p.get("buffers"))
136        .and_then(|v| v.as_bool())
137        .unwrap_or(false);
138
139    let format = params
140        .as_ref()
141        .and_then(|p| p.get("format"))
142        .and_then(|v| v.as_str())
143        .unwrap_or("json");
144
145    let format_upper = format.to_uppercase();
146    if format_upper == "XML" {
147        return Err(crate::errors::MCPError::InvalidParams(
148            "XML format is not supported — use TEXT, YAML, or JSON".into()
149        ));
150    }
151
152    let mut opts = Vec::new();
153    opts.push(format!("FORMAT {}", format_upper));
154    if analyze {
155        opts.push("ANALYZE".to_string());
156    }
157    if buffers {
158        opts.push("BUFFERS".to_string());
159    }
160
161    let mut explain_sql = String::with_capacity(sql.len() + 64);
162    explain_sql.push_str("EXPLAIN (");
163    explain_sql.push_str(&opts.join(", "));
164    explain_sql.push_str(") ");
165    explain_sql.push_str(sql);
166
167    let rows = client.query(&explain_sql, &[]).await?;
168
169    if rows.is_empty() {
170        return Ok(json!({ "plan": null }));
171    }
172
173    if format.eq_ignore_ascii_case("json") {
174        let plan: serde_json::Value = rows[0].get(0);
175        Ok(json!({
176            "plan": plan,
177            "options": { "analyze": analyze, "buffers": buffers, "format": format }
178        }))
179    } else {
180        let lines: Vec<String> = rows.iter().map(|r| r.get::<_, String>(0)).collect();
181        Ok(json!({
182            "plan": lines.join("\n"),
183            "options": { "analyze": analyze, "buffers": buffers, "format": format }
184        }))
185    }
186}
187
188/// 26. Async execute insert (with synchronous_commit=off for high-volume operations)
189///
190/// High-performance insert for WHERE predicate affecting more than 100 rows.
191/// Disables synchronous_commit temporarily for maximum throughput.
192/// Significant performance benefit when WHERE condition matches > 100 rows.
193/// Returns rows affected count.
194pub async fn async_execute_insert(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
195    let sql = params
196        .as_ref()
197        .and_then(|p| p.get("sql"))
198        .and_then(|v| v.as_str())
199        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
200
201    validate_sql(sql, "INSERT", "INSERT")?;
202
203    client.execute("SET synchronous_commit = OFF", &[]).await?;
204    let rows_affected = client.execute(sql, &[]).await?;
205    client.execute("SET synchronous_commit = ON", &[]).await?;
206
207    Ok(json!({ "rows_affected": rows_affected }))
208}
209
210/// 27. Async execute update (with synchronous_commit=off for high-volume operations)
211///
212/// High-performance update for WHERE predicate affecting more than 100 rows.
213/// Disables synchronous_commit temporarily for maximum throughput.
214/// Significant performance benefit when WHERE condition matches > 100 rows.
215/// Always include WHERE clause to prevent accidental updates.
216/// Returns rows affected count.
217pub async fn async_execute_update(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
218    let sql = params
219        .as_ref()
220        .and_then(|p| p.get("sql"))
221        .and_then(|v| v.as_str())
222        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
223
224    validate_sql(sql, "UPDATE", "UPDATE")?;
225
226    client.execute("SET synchronous_commit = OFF", &[]).await?;
227    let rows_affected = client.execute(sql, &[]).await?;
228    client.execute("SET synchronous_commit = ON", &[]).await?;
229
230    Ok(json!({ "rows_affected": rows_affected }))
231}
232
233/// 28. Async execute delete (with synchronous_commit=off for high-volume operations)
234///
235/// High-performance delete for WHERE predicate affecting more than 100 rows.
236/// Disables synchronous_commit temporarily for maximum throughput.
237/// Significant performance benefit when WHERE condition matches > 100 rows.
238/// Always include WHERE clause - deleting without one removes all rows.
239/// Returns rows affected count.
240pub async fn async_execute_delete(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
241    let sql = params
242        .as_ref()
243        .and_then(|p| p.get("sql"))
244        .and_then(|v| v.as_str())
245        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
246
247    validate_sql(sql, "DELETE", "DELETE")?;
248
249    client.execute("SET synchronous_commit = OFF", &[]).await?;
250    let rows_affected = client.execute(sql, &[]).await?;
251    client.execute("SET synchronous_commit = ON", &[]).await?;
252
253    Ok(json!({ "rows_affected": rows_affected }))
254}