1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::{MCPError, Result as MCPResult};
4
5const MAX_IDENTIFIER_LEN: usize = 255;
6
7fn validate_identifier(name: &str, label: &str) -> std::result::Result<(), MCPError> {
8 if name.is_empty() {
9 return Err(MCPError::InvalidParams(format!("'{label}' must not be empty")));
10 }
11 if name.len() > MAX_IDENTIFIER_LEN {
12 return Err(MCPError::InvalidParams(
13 format!("'{label}' exceeds maximum length of {MAX_IDENTIFIER_LEN} characters (got {})", name.len())
14 ));
15 }
16 Ok(())
17}
18
19pub async fn list_tables(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
21 let rows = client
22 .query(
23 "SELECT table_schema, table_name, table_type
24 FROM information_schema.tables
25 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
26 ORDER BY table_schema, table_name",
27 &[],
28 )
29 .await?;
30
31 let tables: Vec<Value> = rows
32 .iter()
33 .map(|row| {
34 json!({
35 "schema": row.get::<_, String>(0),
36 "name": row.get::<_, String>(1),
37 "type": row.get::<_, String>(2),
38 })
39 })
40 .collect();
41
42 Ok(json!({ "tables": tables }))
43}
44
45pub async fn describe_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
47 let table_name = params
48 .as_ref()
49 .and_then(|p| p.get("table"))
50 .and_then(|v| v.as_str())
51 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into()))?;
52
53 validate_identifier(table_name, "table")?;
54
55 let rows = client
56 .query(
57 "SELECT column_name, data_type, is_nullable, column_default, ordinal_position
58 FROM information_schema.columns
59 WHERE table_name = $1
60 ORDER BY ordinal_position",
61 &[&table_name],
62 )
63 .await?;
64
65 let columns: Vec<Value> = rows
66 .iter()
67 .map(|row| {
68 json!({
69 "name": row.get::<_, String>(0),
70 "type": row.get::<_, String>(1),
71 "nullable": row.get::<_, String>(2),
72 "default": row.get::<_, Option<String>>(3),
73 "position": row.get::<_, i32>(4),
74 })
75 })
76 .collect();
77
78 Ok(json!({ "columns": columns }))
79}
80
81pub async fn list_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
83 let rows = client
84 .query(
85 "SELECT schemaname, tablename, indexname, indexdef
86 FROM pg_indexes
87 WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
88 ORDER BY schemaname, tablename, indexname",
89 &[],
90 )
91 .await?;
92
93 let indexes: Vec<Value> = rows
94 .iter()
95 .map(|row| {
96 json!({
97 "schema": row.get::<_, String>(0),
98 "table": row.get::<_, String>(1),
99 "name": row.get::<_, String>(2),
100 "definition": row.get::<_, String>(3),
101 })
102 })
103 .collect();
104
105 Ok(json!({ "indexes": indexes }))
106}
107
108pub async fn list_schemas(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
110 let rows = client
111 .query(
112 "SELECT schema_name, schema_owner
113 FROM information_schema.schemata
114 WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
115 ORDER BY schema_name",
116 &[],
117 )
118 .await?;
119
120 let schemas: Vec<Value> = rows
121 .iter()
122 .map(|row| {
123 json!({
124 "name": row.get::<_, String>(0),
125 "owner": row.get::<_, String>(1),
126 })
127 })
128 .collect();
129
130 Ok(json!({ "schemas": schemas }))
131}
132
133pub async fn show_constraints(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
135 let rows = client
136 .query(
137 "SELECT table_schema, table_name, constraint_name, constraint_type
138 FROM information_schema.table_constraints
139 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
140 ORDER BY table_schema, table_name, constraint_name",
141 &[],
142 )
143 .await?;
144
145 let constraints: Vec<Value> = rows
146 .iter()
147 .map(|row| {
148 json!({
149 "schema": row.get::<_, String>(0),
150 "table": row.get::<_, String>(1),
151 "name": row.get::<_, String>(2),
152 "type": row.get::<_, String>(3),
153 })
154 })
155 .collect();
156
157 Ok(json!({ "constraints": constraints }))
158}
159
160pub async fn get_object_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
162 let schema_name = params
163 .as_ref()
164 .and_then(|p| p.get("schema"))
165 .and_then(|v| v.as_str())
166 .unwrap_or("public");
167
168 if schema_name.len() > MAX_IDENTIFIER_LEN {
169 return Err(MCPError::InvalidParams(
170 format!("'schema' exceeds maximum length of {MAX_IDENTIFIER_LEN} characters (got {})", schema_name.len())
171 ));
172 }
173
174 let table_name = params
175 .as_ref()
176 .and_then(|p| p.get("table"))
177 .and_then(|v| v.as_str())
178 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
179
180 validate_identifier(table_name, "table")?;
181
182 let columns = client
183 .query(
184 "SELECT c.column_name::text, c.data_type::text, c.is_nullable::text,
185 c.column_default::text, c.ordinal_position,
186 COALESCE(pgd.description, '')::text AS column_description,
187 CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END AS is_pk,
188 CASE WHEN uc.column_name IS NOT NULL THEN true ELSE false END AS is_unique
189 FROM information_schema.columns c
190 LEFT JOIN pg_catalog.pg_statio_all_tables st
191 ON st.relname = c.table_name AND st.schemaname = c.table_schema
192 LEFT JOIN pg_catalog.pg_description pgd
193 ON pgd.objoid = st.relid AND pgd.objsubid = c.ordinal_position
194 LEFT JOIN (
195 SELECT ku.column_name, tc.table_schema, tc.table_name
196 FROM information_schema.table_constraints tc
197 JOIN information_schema.key_column_usage ku
198 ON tc.constraint_name = ku.constraint_name
199 AND tc.table_schema = ku.table_schema
200 WHERE tc.constraint_type = 'PRIMARY KEY'
201 ) pk ON pk.column_name = c.column_name
202 AND pk.table_schema = c.table_schema
203 AND pk.table_name = c.table_name
204 LEFT JOIN (
205 SELECT ku.column_name, tc.table_schema, tc.table_name
206 FROM information_schema.table_constraints tc
207 JOIN information_schema.key_column_usage ku
208 ON tc.constraint_name = ku.constraint_name
209 AND tc.table_schema = ku.table_schema
210 WHERE tc.constraint_type = 'UNIQUE'
211 ) uc ON uc.column_name = c.column_name
212 AND uc.table_schema = c.table_schema
213 AND uc.table_name = c.table_name
214 WHERE c.table_schema = $1 AND c.table_name = $2
215 ORDER BY c.ordinal_position",
216 &[&schema_name, &table_name],
217 )
218 .await?;
219
220 let cols: Vec<Value> = columns.iter().map(|row| {
221 json!({
222 "name": row.get::<_, String>(0),
223 "type": row.get::<_, String>(1),
224 "nullable": row.get::<_, String>(2) == "YES",
225 "default": row.get::<_, Option<String>>(3),
226 "position": row.get::<_, i32>(4),
227 "description": row.get::<_, String>(5),
228 "is_primary_key": row.get::<_, bool>(6),
229 "is_unique": row.get::<_, bool>(7),
230 })
231 }).collect();
232
233 let indexes = client
234 .query(
235 "SELECT indexname::text, indexdef::text
236 FROM pg_indexes
237 WHERE schemaname = $1 AND tablename = $2
238 ORDER BY indexname",
239 &[&schema_name, &table_name],
240 )
241 .await?;
242
243 let idxs: Vec<Value> = indexes.iter().map(|row| {
244 json!({
245 "name": row.get::<_, String>(0),
246 "definition": row.get::<_, String>(1),
247 })
248 }).collect();
249
250 let foreign_keys = client
251 .query(
252 "SELECT kcu.column_name::text,
253 ccu.table_schema::text AS foreign_schema,
254 ccu.table_name::text AS foreign_table,
255 ccu.column_name::text AS foreign_column,
256 rc.update_rule::text, rc.delete_rule::text
257 FROM information_schema.table_constraints tc
258 JOIN information_schema.key_column_usage kcu
259 ON tc.constraint_name = kcu.constraint_name
260 AND tc.table_schema = kcu.table_schema
261 JOIN information_schema.constraint_column_usage ccu
262 ON tc.constraint_name = ccu.constraint_name
263 AND tc.table_schema = ccu.table_schema
264 JOIN information_schema.referential_constraints rc
265 ON tc.constraint_name = rc.constraint_name
266 AND tc.table_schema = rc.constraint_schema
267 WHERE tc.constraint_type = 'FOREIGN KEY'
268 AND tc.table_schema = $1 AND tc.table_name = $2
269 ORDER BY kcu.ordinal_position",
270 &[&schema_name, &table_name],
271 )
272 .await?;
273
274 let fks: Vec<Value> = foreign_keys.iter().map(|row| {
275 json!({
276 "column": row.get::<_, String>(0),
277 "references_schema": row.get::<_, String>(1),
278 "references_table": row.get::<_, String>(2),
279 "references_column": row.get::<_, String>(3),
280 "on_update": row.get::<_, String>(4),
281 "on_delete": row.get::<_, String>(5),
282 })
283 }).collect();
284
285 let constraints = client
286 .query(
287 "SELECT constraint_name::text, constraint_type::text
288 FROM information_schema.table_constraints
289 WHERE table_schema = $1 AND table_name = $2
290 ORDER BY constraint_name",
291 &[&schema_name, &table_name],
292 )
293 .await?;
294
295 let cons: Vec<Value> = constraints.iter().map(|row| {
296 json!({
297 "name": row.get::<_, String>(0),
298 "type": row.get::<_, String>(1),
299 })
300 }).collect();
301
302 let row_estimate = client
303 .query_one(
304 "SELECT n_live_tup FROM pg_stat_user_tables
305 WHERE schemaname = $1 AND relname = $2",
306 &[&schema_name, &table_name],
307 )
308 .await
309 .map(|r| r.get::<_, Option<f64>>(0))
310 .unwrap_or(None);
311
312 let table_size = client
313 .query_one(
314 "SELECT pg_size_pretty(pg_total_relation_size($1::regclass))",
315 &[&format!("{}.{}", schema_name, table_name)],
316 )
317 .await
318 .map(|r| r.get::<_, Option<String>>(0))
319 .unwrap_or(None);
320
321 Ok(json!({
322 "table": table_name,
323 "schema": schema_name,
324 "columns": cols,
325 "indexes": idxs,
326 "foreign_keys": fks,
327 "constraints": cons,
328 "estimated_rows": row_estimate,
329 "total_size": table_size,
330 }))
331}
332
333pub async fn list_triggers(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
335 let table = params
336 .as_ref()
337 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
338 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
339
340 let schema = params
341 .as_ref()
342 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
343 .unwrap_or("public");
344
345 let limit = params
346 .as_ref()
347 .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
348 .unwrap_or(1000);
349
350 validate_identifier(table, "table")?;
351 validate_identifier(schema, "schema")?;
352
353 if !((1..=10000).contains(&limit)) {
354 return Err(MCPError::InvalidParams(
355 format!("'limit' must be between 1 and 10000 (got {})", limit)
356 ));
357 }
358
359 let rows = client
360 .query(
361 "SELECT trigger_name, event_object_table, event_manipulation,
362 action_timing, action_statement, trigger_schema
363 FROM information_schema.triggers
364 WHERE event_object_table = $1 AND trigger_schema = $2
365 ORDER BY trigger_name
366 LIMIT $3",
367 &[&table, &schema, &limit],
368 )
369 .await?;
370
371 let triggers: Vec<Value> = rows
372 .iter()
373 .map(|row| {
374 json!({
375 "name": row.get::<_, String>(0),
376 "table": row.get::<_, String>(1),
377 "event": row.get::<_, String>(2),
378 "timing": row.get::<_, String>(3),
379 "statement": row.get::<_, String>(4),
380 "schema": row.get::<_, String>(5),
381 })
382 })
383 .collect();
384
385 Ok(json!({
386 "table": table,
387 "schema": schema,
388 "trigger_count": triggers.len(),
389 "triggers": triggers
390 }))
391}
392
393pub async fn create_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
395 let index_name = params
396 .as_ref()
397 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
398 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
399
400 let table = params
401 .as_ref()
402 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
403 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
404
405 let columns = params
406 .as_ref()
407 .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
408 .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
409
410 if columns.is_empty() {
411 return Err(MCPError::InvalidParams("'columns' array must not be empty".into()));
412 }
413
414 validate_identifier(index_name, "index_name")?;
415 validate_identifier(table, "table")?;
416
417 let mut column_list = Vec::new();
418 for col in columns {
419 let col_name = col.as_str()
420 .ok_or_else(|| MCPError::InvalidParams("Column names must be strings".into()))?;
421 validate_identifier(col_name, "column")?;
422 column_list.push(col_name.to_string());
423 }
424
425 let unique = params
426 .as_ref()
427 .and_then(|p| p.get("unique").and_then(|v| v.as_bool()))
428 .unwrap_or(false);
429
430 let concurrent = params
431 .as_ref()
432 .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
433 .unwrap_or(false);
434
435 let unique_str = if unique { "UNIQUE " } else { "" };
436 let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
437 let columns_str = column_list.join(", ");
438
439 let sql = format!(
440 "CREATE {}INDEX {}{}ON {}({})",
441 unique_str,
442 concurrent_str,
443 index_name,
444 table,
445 columns_str
446 );
447
448 client.execute(&sql, &[]).await?;
449
450 Ok(json!({
451 "status": "success",
452 "action": "CREATE INDEX",
453 "index_name": index_name,
454 "table": table,
455 "columns": column_list,
456 "unique": unique,
457 "concurrent": concurrent
458 }))
459}
460
461pub async fn drop_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
463 let index_name = params
464 .as_ref()
465 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
466 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
467
468 validate_identifier(index_name, "index_name")?;
469
470 let if_exists = params
471 .as_ref()
472 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
473 .unwrap_or(false);
474
475 let concurrent = params
476 .as_ref()
477 .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
478 .unwrap_or(false);
479
480 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
481 let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
482
483 let sql = format!(
484 "DROP INDEX {}{}{}",
485 if_exists_str,
486 concurrent_str,
487 index_name
488 );
489
490 client.execute(&sql, &[]).await?;
491
492 Ok(json!({
493 "status": "success",
494 "action": "DROP INDEX",
495 "index_name": index_name,
496 "if_exists": if_exists,
497 "concurrent": concurrent
498 }))
499}
500
501pub async fn create_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
503 let table = params
504 .as_ref()
505 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
506 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
507
508 let partition_name = params
509 .as_ref()
510 .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
511 .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
512
513 let partition_type = params
514 .as_ref()
515 .and_then(|p| p.get("partition_type").and_then(|v| v.as_str()))
516 .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_type' parameter (RANGE/LIST/HASH)".into()))?;
517
518 validate_identifier(table, "table")?;
519 validate_identifier(partition_name, "partition_name")?;
520
521 let partition_type_upper = partition_type.to_uppercase();
522 if !["RANGE", "LIST", "HASH"].contains(&partition_type_upper.as_str()) {
523 return Err(MCPError::InvalidParams(
524 format!("'partition_type' must be RANGE, LIST, or HASH (got {})", partition_type)
525 ));
526 }
527
528 let column = params
529 .as_ref()
530 .and_then(|p| p.get("column").and_then(|v| v.as_str()))
531 .ok_or_else(|| MCPError::InvalidParams("Missing 'column' parameter".into()))?;
532
533 validate_identifier(column, "column")?;
534
535 let values = params
536 .as_ref()
537 .and_then(|p| p.get("values").and_then(|v| v.as_str()))
538 .ok_or_else(|| MCPError::InvalidParams(
539 "Missing 'values' parameter (FOR RANGE: 'FROM (x) TO (y)' or FOR LIST: 'IN (values)' or FOR HASH: 'MODULUS n REMAINDER r')".into()
540 ))?;
541
542 if values.contains(';') || values.contains("--") {
543 return Err(MCPError::InvalidParams(
544 "Invalid 'values' parameter: semicolons and SQL comments not allowed".into()
545 ));
546 }
547
548 let sql = format!(
549 "CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
550 partition_name,
551 table,
552 values
553 );
554
555 client.execute(&sql, &[]).await?;
556
557 Ok(json!({
558 "status": "success",
559 "action": "CREATE TABLE PARTITION",
560 "table": table,
561 "partition_name": partition_name,
562 "partition_type": partition_type,
563 "column": column
564 }))
565}
566
567pub async fn drop_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
569 let table = params
570 .as_ref()
571 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
572 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
573
574 let partition_name = params
575 .as_ref()
576 .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
577 .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
578
579 validate_identifier(partition_name, "partition_name")?;
580
581 let if_exists = params
582 .as_ref()
583 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
584 .unwrap_or(false);
585
586 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
587
588 let sql = format!(
589 "DROP TABLE {}{}",
590 if_exists_str,
591 partition_name
592 );
593
594 client.execute(&sql, &[]).await?;
595
596 Ok(json!({
597 "status": "success",
598 "action": "DROP TABLE PARTITION",
599 "table": table,
600 "partition_name": partition_name,
601 "if_exists": if_exists
602 }))
603}
604
605pub async fn create_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
607 let table = params
608 .as_ref()
609 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
610 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
611
612 let columns = params
613 .as_ref()
614 .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
615 .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
616
617 if columns.is_empty() {
618 return Err(MCPError::InvalidParams("'columns' array must not be empty".into()));
619 }
620
621 validate_identifier(table, "table")?;
622
623 let mut column_defs = Vec::new();
624 for (idx, col) in columns.iter().enumerate() {
625 let col_def = col.as_str()
626 .ok_or_else(|| MCPError::InvalidParams(format!("Column {} must be a string with format: 'name TYPE [constraints]'", idx)))?;
627
628 if col_def.is_empty() {
629 return Err(MCPError::InvalidParams(format!("Column {} definition cannot be empty", idx)));
630 }
631
632 if col_def.contains(';') || col_def.contains("--") {
633 return Err(MCPError::InvalidParams(
634 format!("Column {} definition contains dangerous SQL patterns", idx)
635 ));
636 }
637
638 column_defs.push(col_def.to_string());
639 }
640
641 let columns_str = column_defs.join(", ");
642 let sql = format!("CREATE TABLE {} ({})", table, columns_str);
643
644 client.execute(&sql, &[]).await?;
645
646 Ok(json!({
647 "status": "success",
648 "action": "CREATE TABLE",
649 "table": table,
650 "column_count": columns.len()
651 }))
652}
653
654pub async fn drop_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
656 let table = params
657 .as_ref()
658 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
659 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
660
661 validate_identifier(table, "table")?;
662
663 let if_exists = params
664 .as_ref()
665 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
666 .unwrap_or(false);
667
668 let cascade = params
669 .as_ref()
670 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
671 .unwrap_or(false);
672
673 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
674 let cascade_str = if cascade { " CASCADE" } else { "" };
675
676 let sql = format!("DROP TABLE {}{}{}", if_exists_str, table, cascade_str);
677
678 client.execute(&sql, &[]).await?;
679
680 Ok(json!({
681 "status": "success",
682 "action": "DROP TABLE",
683 "table": table,
684 "if_exists": if_exists,
685 "cascade": cascade
686 }))
687}
688
689pub async fn create_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
691 let view_name = params
692 .as_ref()
693 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
694 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
695
696 let query = params
697 .as_ref()
698 .and_then(|p| p.get("query").and_then(|v| v.as_str()))
699 .ok_or_else(|| MCPError::InvalidParams("Missing 'query' parameter".into()))?;
700
701 validate_identifier(view_name, "view_name")?;
702
703 if query.trim().is_empty() {
704 return Err(MCPError::InvalidParams("'query' cannot be empty".into()));
705 }
706
707 let materialized = params
708 .as_ref()
709 .and_then(|p| p.get("materialized").and_then(|v| v.as_bool()))
710 .unwrap_or(false);
711
712 let or_replace = params
713 .as_ref()
714 .and_then(|p| p.get("or_replace").and_then(|v| v.as_bool()))
715 .unwrap_or(false);
716
717 let materialized_str = if materialized { "MATERIALIZED " } else { "" };
718 let or_replace_str = if or_replace { "OR REPLACE " } else { "" };
719
720 let sql = format!("CREATE {}{}VIEW {} AS {}", or_replace_str, materialized_str, view_name, query);
721
722 client.execute(&sql, &[]).await?;
723
724 Ok(json!({
725 "status": "success",
726 "action": "CREATE VIEW",
727 "view_name": view_name,
728 "materialized": materialized,
729 "or_replace": or_replace
730 }))
731}
732
733pub async fn drop_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
735 let view_name = params
736 .as_ref()
737 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
738 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
739
740 validate_identifier(view_name, "view_name")?;
741
742 let if_exists = params
743 .as_ref()
744 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
745 .unwrap_or(false);
746
747 let cascade = params
748 .as_ref()
749 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
750 .unwrap_or(false);
751
752 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
753 let cascade_str = if cascade { " CASCADE" } else { "" };
754
755 let sql = format!("DROP VIEW {}{}{}", if_exists_str, view_name, cascade_str);
756
757 client.execute(&sql, &[]).await?;
758
759 Ok(json!({
760 "status": "success",
761 "action": "DROP VIEW",
762 "view_name": view_name,
763 "if_exists": if_exists,
764 "cascade": cascade
765 }))
766}
767
768pub async fn alter_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
770 let view_name = params
771 .as_ref()
772 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
773 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
774
775 validate_identifier(view_name, "view_name")?;
776
777 let rename_to = params
778 .as_ref()
779 .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
780
781 let set_schema = params
782 .as_ref()
783 .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
784
785 if rename_to.is_none() && set_schema.is_none() {
786 return Err(MCPError::InvalidParams(
787 "Must provide either 'rename_to' or 'set_schema' parameter".into()
788 ));
789 }
790
791 let mut action_desc = Vec::new();
792
793 if let Some(new_name) = rename_to {
794 validate_identifier(new_name, "rename_to")?;
795 let sql = format!("ALTER VIEW {} RENAME TO {}", view_name, new_name);
796 client.execute(&sql, &[]).await?;
797 action_desc.push(format!("renamed to {}", new_name));
798 }
799
800 if let Some(schema) = set_schema {
801 validate_identifier(schema, "set_schema")?;
802 let sql = format!("ALTER VIEW {} SET SCHEMA {}", view_name, schema);
803 client.execute(&sql, &[]).await?;
804 action_desc.push(format!("moved to schema {}", schema));
805 }
806
807 Ok(json!({
808 "status": "success",
809 "action": "ALTER VIEW",
810 "view_name": view_name,
811 "changes": action_desc
812 }))
813}
814
815pub async fn create_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
817 let schema_name = params
818 .as_ref()
819 .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
820 .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
821
822 validate_identifier(schema_name, "schema_name")?;
823
824 let if_not_exists = params
825 .as_ref()
826 .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
827 .unwrap_or(false);
828
829 let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
830
831 let sql = format!("CREATE SCHEMA {}{}", if_not_exists_str, schema_name);
832
833 client.execute(&sql, &[]).await?;
834
835 Ok(json!({
836 "status": "success",
837 "action": "CREATE SCHEMA",
838 "schema_name": schema_name,
839 "if_not_exists": if_not_exists
840 }))
841}
842
843pub async fn drop_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
845 let schema_name = params
846 .as_ref()
847 .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
848 .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
849
850 validate_identifier(schema_name, "schema_name")?;
851
852 let if_exists = params
853 .as_ref()
854 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
855 .unwrap_or(false);
856
857 let cascade = params
858 .as_ref()
859 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
860 .unwrap_or(false);
861
862 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
863 let cascade_str = if cascade { " CASCADE" } else { "" };
864
865 let sql = format!("DROP SCHEMA {}{}{}", if_exists_str, schema_name, cascade_str);
866
867 client.execute(&sql, &[]).await?;
868
869 Ok(json!({
870 "status": "success",
871 "action": "DROP SCHEMA",
872 "schema_name": schema_name,
873 "if_exists": if_exists,
874 "cascade": cascade
875 }))
876}
877
878pub async fn create_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
880 let sequence_name = params
881 .as_ref()
882 .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
883 .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
884
885 validate_identifier(sequence_name, "sequence_name")?;
886
887 let if_not_exists = params
888 .as_ref()
889 .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
890 .unwrap_or(false);
891
892 let start = params
893 .as_ref()
894 .and_then(|p| p.get("start").and_then(|v| v.as_i64()))
895 .unwrap_or(1);
896
897 let increment = params
898 .as_ref()
899 .and_then(|p| p.get("increment").and_then(|v| v.as_i64()))
900 .unwrap_or(1);
901
902 let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
903
904 let sql = format!(
905 "CREATE SEQUENCE {}{}START {} INCREMENT {}",
906 if_not_exists_str,
907 sequence_name,
908 start,
909 increment
910 );
911
912 client.execute(&sql, &[]).await?;
913
914 Ok(json!({
915 "status": "success",
916 "action": "CREATE SEQUENCE",
917 "sequence_name": sequence_name,
918 "start": start,
919 "increment": increment,
920 "if_not_exists": if_not_exists
921 }))
922}
923
924pub async fn drop_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
926 let sequence_name = params
927 .as_ref()
928 .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
929 .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
930
931 validate_identifier(sequence_name, "sequence_name")?;
932
933 let if_exists = params
934 .as_ref()
935 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
936 .unwrap_or(false);
937
938 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
939
940 let sql = format!("DROP SEQUENCE {}{}", if_exists_str, sequence_name);
941
942 client.execute(&sql, &[]).await?;
943
944 Ok(json!({
945 "status": "success",
946 "action": "DROP SEQUENCE",
947 "sequence_name": sequence_name,
948 "if_exists": if_exists
949 }))
950}
951
952pub async fn alter_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
954 let index_name = params
955 .as_ref()
956 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
957 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
958
959 validate_identifier(index_name, "index_name")?;
960
961 let rename_to = params
962 .as_ref()
963 .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
964
965 let set_schema = params
966 .as_ref()
967 .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
968
969 if rename_to.is_none() && set_schema.is_none() {
970 return Err(MCPError::InvalidParams(
971 "Must provide either 'rename_to' or 'set_schema' parameter".into()
972 ));
973 }
974
975 let mut action_desc = Vec::new();
976
977 if let Some(new_name) = rename_to {
978 validate_identifier(new_name, "rename_to")?;
979 let sql = format!("ALTER INDEX {} RENAME TO {}", index_name, new_name);
980 client.execute(&sql, &[]).await?;
981 action_desc.push(format!("renamed to {}", new_name));
982 }
983
984 if let Some(schema) = set_schema {
985 validate_identifier(schema, "set_schema")?;
986 let sql = format!("ALTER INDEX {} SET SCHEMA {}", index_name, schema);
987 client.execute(&sql, &[]).await?;
988 action_desc.push(format!("moved to schema {}", schema));
989 }
990
991 Ok(json!({
992 "status": "success",
993 "action": "ALTER INDEX",
994 "index_name": index_name,
995 "changes": action_desc
996 }))
997}
998
999pub async fn list_partitions(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1001 let table = params
1002 .as_ref()
1003 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1004 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1005
1006 validate_identifier(table, "table")?;
1007
1008 let rows = client
1009 .query(
1010 "SELECT inhrelname as partition_name, pg_size_pretty(pg_relation_size(inhrelid)) as size
1011 FROM pg_inherits
1012 JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
1013 JOIN pg_class child ON pg_inherits.inhrelid = child.oid
1014 JOIN pg_namespace ON child.relnamespace = pg_namespace.oid
1015 WHERE parent.relname = $1
1016 ORDER BY child.relname",
1017 &[&table],
1018 )
1019 .await?;
1020
1021 let partitions: Vec<Value> = rows
1022 .iter()
1023 .map(|row| {
1024 json!({
1025 "partition_name": row.get::<_, String>(0),
1026 "size": row.get::<_, String>(1),
1027 })
1028 })
1029 .collect();
1030
1031 Ok(json!({
1032 "table": table,
1033 "partition_count": partitions.len(),
1034 "partitions": partitions
1035 }))
1036}
1037
1038pub async fn backup_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1040 let table = params
1041 .as_ref()
1042 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1043 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1044
1045 validate_identifier(table, "table")?;
1046
1047 let backup_name = format!("backup_{}", table);
1048 validate_identifier(&backup_name, "backup_name")?;
1049
1050 let _: (i64,) = client
1052 .query_one(
1053 "SELECT 1 FROM information_schema.tables
1054 WHERE table_name = $1 AND table_schema NOT IN ('pg_catalog', 'information_schema')",
1055 &[&table],
1056 )
1057 .await
1058 .map_err(|_| MCPError::InvalidParams(format!("Table '{}' does not exist", table)))?
1059 .try_get(0)
1060 .map(|val| (val,))
1061 .map_err(|_| MCPError::InvalidParams("Could not verify table existence".into()))?;
1062
1063 let backup_exists = client
1065 .query_opt(
1066 "SELECT 1 FROM information_schema.tables WHERE table_name = $1",
1067 &[&backup_name],
1068 )
1069 .await?
1070 .is_some();
1071
1072 if backup_exists {
1073 return Err(MCPError::InvalidParams(
1074 format!("Backup table '{}' already exists. Drop it first or use a different table.", backup_name)
1075 ));
1076 }
1077
1078 let create_backup_sql = format!("CREATE TABLE {} AS SELECT * FROM {}", backup_name, table);
1080 client.execute(&create_backup_sql, &[]).await?;
1081
1082 let columns: Vec<(String, String, String)> = client
1084 .query(
1085 "SELECT column_name, data_type, is_nullable
1086 FROM information_schema.columns
1087 WHERE table_name = $1
1088 ORDER BY ordinal_position",
1089 &[&table],
1090 )
1091 .await?
1092 .iter()
1093 .map(|row| {
1094 (
1095 row.get::<_, String>(0),
1096 row.get::<_, String>(1),
1097 row.get::<_, String>(2),
1098 )
1099 })
1100 .collect();
1101
1102 let indexes: Vec<String> = client
1104 .query(
1105 "SELECT indexname FROM pg_indexes WHERE tablename = $1 AND schemaname NOT IN ('pg_catalog')",
1106 &[&table],
1107 )
1108 .await?
1109 .iter()
1110 .map(|row| row.get::<_, String>(0))
1111 .collect();
1112
1113 let mut indexes_created = 0;
1114 for idx_name in indexes {
1115 let new_idx_name = format!("{}_on_{}", idx_name, backup_name);
1116 let idx_def: String = client
1117 .query_one(
1118 "SELECT indexdef FROM pg_indexes WHERE indexname = $1",
1119 &[&idx_name],
1120 )
1121 .await?
1122 .get(0);
1123
1124 let updated_def = idx_def
1125 .replace(&format!("ON {}", table), &format!("ON {}", backup_name))
1126 .replace(&idx_name, &new_idx_name);
1127
1128 if client.execute(&updated_def, &[]).await.is_ok() {
1129 indexes_created += 1;
1130 }
1131 }
1132
1133 let row_count: (i64,) = client
1135 .query_one(&format!("SELECT COUNT(*) FROM {}", backup_name), &[])
1136 .await?
1137 .try_get(0)
1138 .map(|val| (val,))
1139 .map_err(|_| MCPError::InvalidParams("Could not count rows".into()))?;
1140
1141 Ok(json!({
1142 "status": "success",
1143 "action": "BACKUP TABLE",
1144 "original_table": table,
1145 "backup_table": backup_name,
1146 "rows_copied": row_count.0,
1147 "columns_copied": columns.len(),
1148 "indexes_created": indexes_created,
1149 "message": format!("Table '{}' backed up to '{}' with {} rows", table, backup_name, row_count.0)
1150 }))
1151}