// mcp_postgres/actions/query.rs
use serde_json::{json, Value};
use tokio_postgres::Client;
use crate::errors::Result as MCPResult;

5const MAX_SQL_LEN: usize = 10_000;
6
7fn validate_sql(sql: &str, allowed_prefix: &str, label: &str) -> std::result::Result<(), crate::errors::MCPError> {
8 if sql.is_empty() {
9 return Err(crate::errors::MCPError::InvalidParams("'sql' parameter must not be empty".into()));
10 }
11 if sql.len() > MAX_SQL_LEN {
12 return Err(crate::errors::MCPError::InvalidParams(
13 format!("SQL exceeds maximum length of {MAX_SQL_LEN} characters (got {})", sql.len())
14 ));
15 }
16 let first_word = sql.split_whitespace().next().unwrap_or("").to_uppercase();
17 if first_word != allowed_prefix {
18 return Err(crate::errors::MCPError::InvalidParams(
19 format!("Invalid {label} query: expected '{allowed_prefix}'")
20 ));
21 }
22 Ok(())
23}
24
25pub async fn execute_query(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
27 let sql = params
28 .as_ref()
29 .and_then(|p| p.get("sql"))
30 .and_then(|v| v.as_str())
31 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
32
33 validate_sql(sql, "SELECT", "SELECT")?;
34
35 let rows = client.query(sql, &[]).await?;
36
37 let results: Vec<Value> = rows
38 .iter()
39 .map(|row| {
40 let values: Vec<Value> = (0..row.len())
41 .map(|i| {
42 if let Ok(v) = row.try_get::<_, bool>(i) {
44 json!(v)
45 } else if let Ok(v) = row.try_get::<_, i32>(i) {
46 json!(v)
47 } else if let Ok(v) = row.try_get::<_, i64>(i) {
48 json!(v)
49 } else if let Ok(v) = row.try_get::<_, f32>(i) {
50 json!(v)
51 } else if let Ok(v) = row.try_get::<_, f64>(i) {
52 json!(v)
53 } else if let Ok(v) = row.try_get::<_, String>(i) {
54 Value::String(v)
55 } else if let Ok(v) = row.try_get::<_, Option<String>>(i) {
56 v.map(Value::String).unwrap_or(Value::Null)
57 } else {
58 Value::Null
59 }
60 })
61 .collect();
62 Value::Array(values)
63 })
64 .collect();
65
66 Ok(json!({ "rows": results }))
67}
68
69pub async fn execute_insert(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
71 let sql = params
72 .as_ref()
73 .and_then(|p| p.get("sql"))
74 .and_then(|v| v.as_str())
75 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
76
77 validate_sql(sql, "INSERT", "INSERT")?;
78
79 let rows_affected = client.execute(sql, &[]).await?;
80
81 Ok(json!({ "rows_affected": rows_affected }))
82}
83
84pub async fn execute_update(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
86 let sql = params
87 .as_ref()
88 .and_then(|p| p.get("sql"))
89 .and_then(|v| v.as_str())
90 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
91
92 validate_sql(sql, "UPDATE", "UPDATE")?;
93
94 let rows_affected = client.execute(sql, &[]).await?;
95
96 Ok(json!({ "rows_affected": rows_affected }))
97}
98
99pub async fn execute_delete(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
101 let sql = params
102 .as_ref()
103 .and_then(|p| p.get("sql"))
104 .and_then(|v| v.as_str())
105 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
106
107 validate_sql(sql, "DELETE", "DELETE")?;
108
109 let rows_affected = client.execute(sql, &[]).await?;
110
111 Ok(json!({ "rows_affected": rows_affected }))
112}
113
114pub async fn explain_query(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
119 let sql = params
120 .as_ref()
121 .and_then(|p| p.get("sql"))
122 .and_then(|v| v.as_str())
123 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
124
125 validate_sql(sql, "SELECT", "SELECT")?;
126
127 let analyze = params
128 .as_ref()
129 .and_then(|p| p.get("analyze"))
130 .and_then(|v| v.as_bool())
131 .unwrap_or(false);
132
133 let buffers = params
134 .as_ref()
135 .and_then(|p| p.get("buffers"))
136 .and_then(|v| v.as_bool())
137 .unwrap_or(false);
138
139 let format = params
140 .as_ref()
141 .and_then(|p| p.get("format"))
142 .and_then(|v| v.as_str())
143 .unwrap_or("json");
144
145 let format_upper = format.to_uppercase();
146 if format_upper == "XML" {
147 return Err(crate::errors::MCPError::InvalidParams(
148 "XML format is not supported — use TEXT, YAML, or JSON".into()
149 ));
150 }
151
152 let mut opts = Vec::new();
153 opts.push(format!("FORMAT {}", format_upper));
154 if analyze {
155 opts.push("ANALYZE".to_string());
156 }
157 if buffers {
158 opts.push("BUFFERS".to_string());
159 }
160
161 let mut explain_sql = String::with_capacity(sql.len() + 64);
162 explain_sql.push_str("EXPLAIN (");
163 explain_sql.push_str(&opts.join(", "));
164 explain_sql.push_str(") ");
165 explain_sql.push_str(sql);
166
167 let rows = client.query(&explain_sql, &[]).await?;
168
169 if rows.is_empty() {
170 return Ok(json!({ "plan": null }));
171 }
172
173 if format.eq_ignore_ascii_case("json") {
174 let plan: serde_json::Value = rows[0].get(0);
175 Ok(json!({
176 "plan": plan,
177 "options": { "analyze": analyze, "buffers": buffers, "format": format }
178 }))
179 } else {
180 let lines: Vec<String> = rows.iter().map(|r| r.get::<_, String>(0)).collect();
181 Ok(json!({
182 "plan": lines.join("\n"),
183 "options": { "analyze": analyze, "buffers": buffers, "format": format }
184 }))
185 }
186}
187
188pub async fn async_execute_insert(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
195 let sql = params
196 .as_ref()
197 .and_then(|p| p.get("sql"))
198 .and_then(|v| v.as_str())
199 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
200
201 validate_sql(sql, "INSERT", "INSERT")?;
202
203 client.execute("SET synchronous_commit = OFF", &[]).await?;
204 let rows_affected = client.execute(sql, &[]).await?;
205 client.execute("SET synchronous_commit = ON", &[]).await?;
206
207 Ok(json!({ "rows_affected": rows_affected }))
208}
209
210pub async fn async_execute_update(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
218 let sql = params
219 .as_ref()
220 .and_then(|p| p.get("sql"))
221 .and_then(|v| v.as_str())
222 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
223
224 validate_sql(sql, "UPDATE", "UPDATE")?;
225
226 client.execute("SET synchronous_commit = OFF", &[]).await?;
227 let rows_affected = client.execute(sql, &[]).await?;
228 client.execute("SET synchronous_commit = ON", &[]).await?;
229
230 Ok(json!({ "rows_affected": rows_affected }))
231}
232
233pub async fn async_execute_delete(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
241 let sql = params
242 .as_ref()
243 .and_then(|p| p.get("sql"))
244 .and_then(|v| v.as_str())
245 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'sql' parameter".into()))?;
246
247 validate_sql(sql, "DELETE", "DELETE")?;
248
249 client.execute("SET synchronous_commit = OFF", &[]).await?;
250 let rows_affected = client.execute(sql, &[]).await?;
251 client.execute("SET synchronous_commit = ON", &[]).await?;
252
253 Ok(json!({ "rows_affected": rows_affected }))
254}