1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::{MCPError, Result as MCPResult};
4use crate::validation::validate_identifier;
5
6pub async fn list_tables(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
8 let rows = client
9 .query(
10 "SELECT table_schema, table_name, table_type
11 FROM information_schema.tables
12 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
13 ORDER BY table_schema, table_name",
14 &[],
15 )
16 .await?;
17
18 let tables: Vec<Value> = rows
19 .iter()
20 .map(|row| {
21 json!({
22 "schema": row.get::<_, String>(0),
23 "name": row.get::<_, String>(1),
24 "type": row.get::<_, String>(2),
25 })
26 })
27 .collect();
28
29 Ok(json!({ "tables": tables }))
30}
31
32pub async fn describe_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
34 let table_name = params
35 .as_ref()
36 .and_then(|p| p.get("table"))
37 .and_then(|v| v.as_str())
38 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into()))?;
39
40 validate_identifier(table_name, "table")?;
41
42 let rows = client
43 .query(
44 "SELECT column_name, data_type, is_nullable, column_default, ordinal_position
45 FROM information_schema.columns
46 WHERE table_name = $1
47 ORDER BY ordinal_position",
48 &[&table_name],
49 )
50 .await?;
51
52 let columns: Vec<Value> = rows
53 .iter()
54 .map(|row| {
55 json!({
56 "name": row.get::<_, String>(0),
57 "type": row.get::<_, String>(1),
58 "nullable": row.get::<_, String>(2),
59 "default": row.get::<_, Option<String>>(3),
60 "position": row.get::<_, i32>(4),
61 })
62 })
63 .collect();
64
65 Ok(json!({ "columns": columns }))
66}
67
68pub async fn list_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
70 let rows = client
71 .query(
72 "SELECT schemaname, tablename, indexname, indexdef
73 FROM pg_indexes
74 WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
75 ORDER BY schemaname, tablename, indexname",
76 &[],
77 )
78 .await?;
79
80 let indexes: Vec<Value> = rows
81 .iter()
82 .map(|row| {
83 json!({
84 "schema": row.get::<_, String>(0),
85 "table": row.get::<_, String>(1),
86 "name": row.get::<_, String>(2),
87 "definition": row.get::<_, String>(3),
88 })
89 })
90 .collect();
91
92 Ok(json!({ "indexes": indexes }))
93}
94
95pub async fn list_schemas(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
97 let rows = client
98 .query(
99 "SELECT schema_name, schema_owner
100 FROM information_schema.schemata
101 WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
102 ORDER BY schema_name",
103 &[],
104 )
105 .await?;
106
107 let schemas: Vec<Value> = rows
108 .iter()
109 .map(|row| {
110 json!({
111 "name": row.get::<_, String>(0),
112 "owner": row.get::<_, String>(1),
113 })
114 })
115 .collect();
116
117 Ok(json!({ "schemas": schemas }))
118}
119
120pub async fn show_constraints(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
122 let rows = client
123 .query(
124 "SELECT table_schema, table_name, constraint_name, constraint_type
125 FROM information_schema.table_constraints
126 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
127 ORDER BY table_schema, table_name, constraint_name",
128 &[],
129 )
130 .await?;
131
132 let constraints: Vec<Value> = rows
133 .iter()
134 .map(|row| {
135 json!({
136 "schema": row.get::<_, String>(0),
137 "table": row.get::<_, String>(1),
138 "name": row.get::<_, String>(2),
139 "type": row.get::<_, String>(3),
140 })
141 })
142 .collect();
143
144 Ok(json!({ "constraints": constraints }))
145}
146
147pub async fn get_object_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
149 let schema_name = params
150 .as_ref()
151 .and_then(|p| p.get("schema"))
152 .and_then(|v| v.as_str())
153 .unwrap_or("public");
154
155 validate_identifier(schema_name, "schema")?;
156
157 let table_name = params
158 .as_ref()
159 .and_then(|p| p.get("table"))
160 .and_then(|v| v.as_str())
161 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
162
163 validate_identifier(table_name, "table")?;
164
165 let columns = client
166 .query(
167 "SELECT c.column_name::text, c.data_type::text, c.is_nullable::text,
168 c.column_default::text, c.ordinal_position,
169 COALESCE(pgd.description, '')::text AS column_description,
170 CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END AS is_pk,
171 CASE WHEN uc.column_name IS NOT NULL THEN true ELSE false END AS is_unique
172 FROM information_schema.columns c
173 LEFT JOIN pg_catalog.pg_statio_all_tables st
174 ON st.relname = c.table_name AND st.schemaname = c.table_schema
175 LEFT JOIN pg_catalog.pg_description pgd
176 ON pgd.objoid = st.relid AND pgd.objsubid = c.ordinal_position
177 LEFT JOIN (
178 SELECT ku.column_name, tc.table_schema, tc.table_name
179 FROM information_schema.table_constraints tc
180 JOIN information_schema.key_column_usage ku
181 ON tc.constraint_name = ku.constraint_name
182 AND tc.table_schema = ku.table_schema
183 WHERE tc.constraint_type = 'PRIMARY KEY'
184 ) pk ON pk.column_name = c.column_name
185 AND pk.table_schema = c.table_schema
186 AND pk.table_name = c.table_name
187 LEFT JOIN (
188 SELECT ku.column_name, tc.table_schema, tc.table_name
189 FROM information_schema.table_constraints tc
190 JOIN information_schema.key_column_usage ku
191 ON tc.constraint_name = ku.constraint_name
192 AND tc.table_schema = ku.table_schema
193 WHERE tc.constraint_type = 'UNIQUE'
194 ) uc ON uc.column_name = c.column_name
195 AND uc.table_schema = c.table_schema
196 AND uc.table_name = c.table_name
197 WHERE c.table_schema = $1 AND c.table_name = $2
198 ORDER BY c.ordinal_position",
199 &[&schema_name, &table_name],
200 )
201 .await?;
202
203 let cols: Vec<Value> = columns.iter().map(|row| {
204 json!({
205 "name": row.get::<_, String>(0),
206 "type": row.get::<_, String>(1),
207 "nullable": row.get::<_, String>(2) == "YES",
208 "default": row.get::<_, Option<String>>(3),
209 "position": row.get::<_, i32>(4),
210 "description": row.get::<_, String>(5),
211 "is_primary_key": row.get::<_, bool>(6),
212 "is_unique": row.get::<_, bool>(7),
213 })
214 }).collect();
215
216 let indexes = client
217 .query(
218 "SELECT indexname::text, indexdef::text
219 FROM pg_indexes
220 WHERE schemaname = $1 AND tablename = $2
221 ORDER BY indexname",
222 &[&schema_name, &table_name],
223 )
224 .await?;
225
226 let idxs: Vec<Value> = indexes.iter().map(|row| {
227 json!({
228 "name": row.get::<_, String>(0),
229 "definition": row.get::<_, String>(1),
230 })
231 }).collect();
232
233 let foreign_keys = client
234 .query(
235 "SELECT kcu.column_name::text,
236 ccu.table_schema::text AS foreign_schema,
237 ccu.table_name::text AS foreign_table,
238 ccu.column_name::text AS foreign_column,
239 rc.update_rule::text, rc.delete_rule::text
240 FROM information_schema.table_constraints tc
241 JOIN information_schema.key_column_usage kcu
242 ON tc.constraint_name = kcu.constraint_name
243 AND tc.table_schema = kcu.table_schema
244 JOIN information_schema.constraint_column_usage ccu
245 ON tc.constraint_name = ccu.constraint_name
246 AND tc.table_schema = ccu.table_schema
247 JOIN information_schema.referential_constraints rc
248 ON tc.constraint_name = rc.constraint_name
249 AND tc.table_schema = rc.constraint_schema
250 WHERE tc.constraint_type = 'FOREIGN KEY'
251 AND tc.table_schema = $1 AND tc.table_name = $2
252 ORDER BY kcu.ordinal_position",
253 &[&schema_name, &table_name],
254 )
255 .await?;
256
257 let fks: Vec<Value> = foreign_keys.iter().map(|row| {
258 json!({
259 "column": row.get::<_, String>(0),
260 "references_schema": row.get::<_, String>(1),
261 "references_table": row.get::<_, String>(2),
262 "references_column": row.get::<_, String>(3),
263 "on_update": row.get::<_, String>(4),
264 "on_delete": row.get::<_, String>(5),
265 })
266 }).collect();
267
268 let constraints = client
269 .query(
270 "SELECT constraint_name::text, constraint_type::text
271 FROM information_schema.table_constraints
272 WHERE table_schema = $1 AND table_name = $2
273 ORDER BY constraint_name",
274 &[&schema_name, &table_name],
275 )
276 .await?;
277
278 let cons: Vec<Value> = constraints.iter().map(|row| {
279 json!({
280 "name": row.get::<_, String>(0),
281 "type": row.get::<_, String>(1),
282 })
283 }).collect();
284
285 let row_estimate = client
286 .query_one(
287 "SELECT n_live_tup FROM pg_stat_user_tables
288 WHERE schemaname = $1 AND relname = $2",
289 &[&schema_name, &table_name],
290 )
291 .await
292 .map(|r| r.get::<_, Option<f64>>(0))
293 .unwrap_or(None);
294
295 let table_size = client
296 .query_one(
297 "SELECT pg_size_pretty(pg_total_relation_size($1::regclass))",
298 &[&format!("{}.{}", schema_name, table_name)],
299 )
300 .await
301 .map(|r| r.get::<_, Option<String>>(0))
302 .unwrap_or(None);
303
304 Ok(json!({
305 "table": table_name,
306 "schema": schema_name,
307 "columns": cols,
308 "indexes": idxs,
309 "foreign_keys": fks,
310 "constraints": cons,
311 "estimated_rows": row_estimate,
312 "total_size": table_size,
313 }))
314}
315
316pub async fn list_triggers(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
318 let table = params
319 .as_ref()
320 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
321 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
322
323 let schema = params
324 .as_ref()
325 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
326 .unwrap_or("public");
327
328 let limit = params
329 .as_ref()
330 .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
331 .unwrap_or(1000);
332
333 validate_identifier(table, "table")?;
334 validate_identifier(schema, "schema")?;
335
336 if !((1..=10000).contains(&limit)) {
337 return Err(MCPError::InvalidParams(
338 format!("'limit' must be between 1 and 10000 (got {})", limit)
339 ));
340 }
341
342 let rows = client
343 .query(
344 "SELECT trigger_name, event_object_table, event_manipulation,
345 action_timing, action_statement, trigger_schema
346 FROM information_schema.triggers
347 WHERE event_object_table = $1 AND trigger_schema = $2
348 ORDER BY trigger_name
349 LIMIT $3",
350 &[&table, &schema, &limit],
351 )
352 .await?;
353
354 let triggers: Vec<Value> = rows
355 .iter()
356 .map(|row| {
357 json!({
358 "name": row.get::<_, String>(0),
359 "table": row.get::<_, String>(1),
360 "event": row.get::<_, String>(2),
361 "timing": row.get::<_, String>(3),
362 "statement": row.get::<_, String>(4),
363 "schema": row.get::<_, String>(5),
364 })
365 })
366 .collect();
367
368 Ok(json!({
369 "table": table,
370 "schema": schema,
371 "trigger_count": triggers.len(),
372 "triggers": triggers
373 }))
374}
375
376pub async fn create_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
378 let index_name = params
379 .as_ref()
380 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
381 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
382
383 let table = params
384 .as_ref()
385 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
386 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
387
388 let columns = params
389 .as_ref()
390 .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
391 .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
392
393 if columns.is_empty() {
394 return Err(MCPError::InvalidParams("'columns' array must not be empty".into()));
395 }
396
397 validate_identifier(index_name, "index_name")?;
398 validate_identifier(table, "table")?;
399
400 let mut column_list = Vec::new();
401 for col in columns {
402 let col_name = col.as_str()
403 .ok_or_else(|| MCPError::InvalidParams("Column names must be strings".into()))?;
404 validate_identifier(col_name, "column")?;
405 column_list.push(col_name.to_string());
406 }
407
408 let unique = params
409 .as_ref()
410 .and_then(|p| p.get("unique").and_then(|v| v.as_bool()))
411 .unwrap_or(false);
412
413 let concurrent = params
414 .as_ref()
415 .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
416 .unwrap_or(false);
417
418 let unique_str = if unique { "UNIQUE " } else { "" };
419 let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
420 let columns_str = column_list.join(", ");
421
422 let sql = format!(
423 "CREATE {}INDEX {} {} ON {}({})",
424 unique_str,
425 concurrent_str,
426 index_name,
427 table,
428 columns_str
429 );
430
431 client.execute(&sql, &[]).await?;
432
433 Ok(json!({
434 "status": "success",
435 "action": "CREATE INDEX",
436 "index_name": index_name,
437 "table": table,
438 "columns": column_list,
439 "unique": unique,
440 "concurrent": concurrent
441 }))
442}
443
444pub async fn drop_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
446 let index_name = params
447 .as_ref()
448 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
449 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
450
451 validate_identifier(index_name, "index_name")?;
452
453 let if_exists = params
454 .as_ref()
455 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
456 .unwrap_or(false);
457
458 let concurrent = params
459 .as_ref()
460 .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
461 .unwrap_or(false);
462
463 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
464 let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
465
466 let sql = format!(
467 "DROP INDEX {}{}{}",
468 if_exists_str,
469 concurrent_str,
470 index_name
471 );
472
473 client.execute(&sql, &[]).await?;
474
475 Ok(json!({
476 "status": "success",
477 "action": "DROP INDEX",
478 "index_name": index_name,
479 "if_exists": if_exists,
480 "concurrent": concurrent
481 }))
482}
483
484pub async fn create_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
486 let table = params
487 .as_ref()
488 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
489 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
490
491 let partition_name = params
492 .as_ref()
493 .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
494 .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
495
496 let partition_type = params
497 .as_ref()
498 .and_then(|p| p.get("partition_type").and_then(|v| v.as_str()))
499 .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_type' parameter (RANGE/LIST/HASH)".into()))?;
500
501 validate_identifier(table, "table")?;
502 validate_identifier(partition_name, "partition_name")?;
503
504 let partition_type_upper = partition_type.to_uppercase();
505 if !["RANGE", "LIST", "HASH"].contains(&partition_type_upper.as_str()) {
506 return Err(MCPError::InvalidParams(
507 format!("'partition_type' must be RANGE, LIST, or HASH (got {})", partition_type)
508 ));
509 }
510
511 let column = params
512 .as_ref()
513 .and_then(|p| p.get("column").and_then(|v| v.as_str()))
514 .ok_or_else(|| MCPError::InvalidParams("Missing 'column' parameter".into()))?;
515
516 validate_identifier(column, "column")?;
517
518 let values = params
519 .as_ref()
520 .and_then(|p| p.get("values").and_then(|v| v.as_str()))
521 .ok_or_else(|| MCPError::InvalidParams(
522 "Missing 'values' parameter (FOR RANGE: 'FROM (x) TO (y)' or FOR LIST: 'IN (values)' or FOR HASH: 'MODULUS n REMAINDER r')".into()
523 ))?;
524
525 if values.contains(';') || values.contains("--") {
526 return Err(MCPError::InvalidParams(
527 "Invalid 'values' parameter: semicolons and SQL comments not allowed".into()
528 ));
529 }
530
531 let sql = format!(
532 "CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
533 partition_name,
534 table,
535 values
536 );
537
538 client.execute(&sql, &[]).await?;
539
540 Ok(json!({
541 "status": "success",
542 "action": "CREATE TABLE PARTITION",
543 "table": table,
544 "partition_name": partition_name,
545 "partition_type": partition_type,
546 "column": column
547 }))
548}
549
550pub async fn drop_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
552 let table = params
553 .as_ref()
554 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
555 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
556
557 let partition_name = params
558 .as_ref()
559 .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
560 .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
561
562 validate_identifier(partition_name, "partition_name")?;
563
564 let if_exists = params
565 .as_ref()
566 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
567 .unwrap_or(false);
568
569 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
570
571 let sql = format!(
572 "DROP TABLE {}{}",
573 if_exists_str,
574 partition_name
575 );
576
577 client.execute(&sql, &[]).await?;
578
579 Ok(json!({
580 "status": "success",
581 "action": "DROP TABLE PARTITION",
582 "table": table,
583 "partition_name": partition_name,
584 "if_exists": if_exists
585 }))
586}
587
588pub async fn create_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
590 let table = params
591 .as_ref()
592 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
593 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
594
595 let columns = params
596 .as_ref()
597 .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
598 .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
599
600 if columns.is_empty() {
601 return Err(MCPError::InvalidParams("'columns' array must not be empty".into()));
602 }
603
604 validate_identifier(table, "table")?;
605
606 let mut column_defs = Vec::new();
607 for (idx, col) in columns.iter().enumerate() {
608 let col_def = col.as_str()
609 .ok_or_else(|| MCPError::InvalidParams(format!("Column {} must be a string with format: 'name TYPE [constraints]'", idx)))?;
610
611 if col_def.is_empty() {
612 return Err(MCPError::InvalidParams(format!("Column {} definition cannot be empty", idx)));
613 }
614
615 if col_def.contains(';') || col_def.contains("--") {
616 return Err(MCPError::InvalidParams(
617 format!("Column {} definition contains dangerous SQL patterns", idx)
618 ));
619 }
620
621 column_defs.push(col_def.to_string());
622 }
623
624 let columns_str = column_defs.join(", ");
625 let sql = format!("CREATE TABLE {} ({})", table, columns_str);
626
627 client.execute(&sql, &[]).await?;
628
629 Ok(json!({
630 "status": "success",
631 "action": "CREATE TABLE",
632 "table": table,
633 "column_count": columns.len()
634 }))
635}
636
637pub async fn drop_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
639 let table = params
640 .as_ref()
641 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
642 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
643
644 validate_identifier(table, "table")?;
645
646 let if_exists = params
647 .as_ref()
648 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
649 .unwrap_or(false);
650
651 let cascade = params
652 .as_ref()
653 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
654 .unwrap_or(false);
655
656 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
657 let cascade_str = if cascade { " CASCADE" } else { "" };
658
659 let sql = format!("DROP TABLE {}{}{}", if_exists_str, table, cascade_str);
660
661 client.execute(&sql, &[]).await?;
662
663 Ok(json!({
664 "status": "success",
665 "action": "DROP TABLE",
666 "table": table,
667 "if_exists": if_exists,
668 "cascade": cascade
669 }))
670}
671
672pub async fn create_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
674 let view_name = params
675 .as_ref()
676 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
677 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
678
679 let query = params
680 .as_ref()
681 .and_then(|p| p.get("query").and_then(|v| v.as_str()))
682 .ok_or_else(|| MCPError::InvalidParams("Missing 'query' parameter".into()))?;
683
684 validate_identifier(view_name, "view_name")?;
685
686 if query.trim().is_empty() {
687 return Err(MCPError::InvalidParams("'query' cannot be empty".into()));
688 }
689
690 let materialized = params
691 .as_ref()
692 .and_then(|p| p.get("materialized").and_then(|v| v.as_bool()))
693 .unwrap_or(false);
694
695 let or_replace = params
696 .as_ref()
697 .and_then(|p| p.get("or_replace").and_then(|v| v.as_bool()))
698 .unwrap_or(false);
699
700 let materialized_str = if materialized { "MATERIALIZED " } else { "" };
701 let or_replace_str = if or_replace { "OR REPLACE " } else { "" };
702
703 let sql = format!("CREATE {}{}VIEW {} AS {}", or_replace_str, materialized_str, view_name, query);
704
705 client.execute(&sql, &[]).await?;
706
707 Ok(json!({
708 "status": "success",
709 "action": "CREATE VIEW",
710 "view_name": view_name,
711 "materialized": materialized,
712 "or_replace": or_replace
713 }))
714}
715
716pub async fn drop_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
718 let view_name = params
719 .as_ref()
720 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
721 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
722
723 validate_identifier(view_name, "view_name")?;
724
725 let if_exists = params
726 .as_ref()
727 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
728 .unwrap_or(false);
729
730 let cascade = params
731 .as_ref()
732 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
733 .unwrap_or(false);
734
735 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
736 let cascade_str = if cascade { " CASCADE" } else { "" };
737
738 let sql = format!("DROP VIEW {}{}{}", if_exists_str, view_name, cascade_str);
739
740 client.execute(&sql, &[]).await?;
741
742 Ok(json!({
743 "status": "success",
744 "action": "DROP VIEW",
745 "view_name": view_name,
746 "if_exists": if_exists,
747 "cascade": cascade
748 }))
749}
750
751pub async fn alter_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
753 let view_name = params
754 .as_ref()
755 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
756 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
757
758 validate_identifier(view_name, "view_name")?;
759
760 let rename_to = params
761 .as_ref()
762 .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
763
764 let set_schema = params
765 .as_ref()
766 .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
767
768 if rename_to.is_none() && set_schema.is_none() {
769 return Err(MCPError::InvalidParams(
770 "Must provide either 'rename_to' or 'set_schema' parameter".into()
771 ));
772 }
773
774 let mut action_desc = Vec::new();
775
776 if let Some(new_name) = rename_to {
777 validate_identifier(new_name, "rename_to")?;
778 let sql = format!("ALTER VIEW {} RENAME TO {}", view_name, new_name);
779 client.execute(&sql, &[]).await?;
780 action_desc.push(format!("renamed to {}", new_name));
781 }
782
783 if let Some(schema) = set_schema {
784 validate_identifier(schema, "set_schema")?;
785 let sql = format!("ALTER VIEW {} SET SCHEMA {}", view_name, schema);
786 client.execute(&sql, &[]).await?;
787 action_desc.push(format!("moved to schema {}", schema));
788 }
789
790 Ok(json!({
791 "status": "success",
792 "action": "ALTER VIEW",
793 "view_name": view_name,
794 "changes": action_desc
795 }))
796}
797
798pub async fn create_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
800 let schema_name = params
801 .as_ref()
802 .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
803 .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
804
805 validate_identifier(schema_name, "schema_name")?;
806
807 let if_not_exists = params
808 .as_ref()
809 .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
810 .unwrap_or(false);
811
812 let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
813
814 let sql = format!("CREATE SCHEMA {}{}", if_not_exists_str, schema_name);
815
816 client.execute(&sql, &[]).await?;
817
818 Ok(json!({
819 "status": "success",
820 "action": "CREATE SCHEMA",
821 "schema_name": schema_name,
822 "if_not_exists": if_not_exists
823 }))
824}
825
826pub async fn drop_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
828 let schema_name = params
829 .as_ref()
830 .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
831 .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
832
833 validate_identifier(schema_name, "schema_name")?;
834
835 let if_exists = params
836 .as_ref()
837 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
838 .unwrap_or(false);
839
840 let cascade = params
841 .as_ref()
842 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
843 .unwrap_or(false);
844
845 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
846 let cascade_str = if cascade { " CASCADE" } else { "" };
847
848 let sql = format!("DROP SCHEMA {}{}{}", if_exists_str, schema_name, cascade_str);
849
850 client.execute(&sql, &[]).await?;
851
852 Ok(json!({
853 "status": "success",
854 "action": "DROP SCHEMA",
855 "schema_name": schema_name,
856 "if_exists": if_exists,
857 "cascade": cascade
858 }))
859}
860
861pub async fn create_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
863 let sequence_name = params
864 .as_ref()
865 .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
866 .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
867
868 validate_identifier(sequence_name, "sequence_name")?;
869
870 let if_not_exists = params
871 .as_ref()
872 .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
873 .unwrap_or(false);
874
875 let start = params
876 .as_ref()
877 .and_then(|p| p.get("start").and_then(|v| v.as_i64()))
878 .unwrap_or(1);
879
880 let increment = params
881 .as_ref()
882 .and_then(|p| p.get("increment").and_then(|v| v.as_i64()))
883 .unwrap_or(1);
884
885 let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
886
887 let sql = format!(
888 "CREATE SEQUENCE {}{} START {} INCREMENT {}",
889 if_not_exists_str,
890 sequence_name,
891 start,
892 increment
893 );
894
895 client.execute(&sql, &[]).await?;
896
897 Ok(json!({
898 "status": "success",
899 "action": "CREATE SEQUENCE",
900 "sequence_name": sequence_name,
901 "start": start,
902 "increment": increment,
903 "if_not_exists": if_not_exists
904 }))
905}
906
907pub async fn drop_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
909 let sequence_name = params
910 .as_ref()
911 .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
912 .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
913
914 validate_identifier(sequence_name, "sequence_name")?;
915
916 let if_exists = params
917 .as_ref()
918 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
919 .unwrap_or(false);
920
921 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
922
923 let sql = format!("DROP SEQUENCE {}{}", if_exists_str, sequence_name);
924
925 client.execute(&sql, &[]).await?;
926
927 Ok(json!({
928 "status": "success",
929 "action": "DROP SEQUENCE",
930 "sequence_name": sequence_name,
931 "if_exists": if_exists
932 }))
933}
934
935pub async fn alter_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
937 let index_name = params
938 .as_ref()
939 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
940 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
941
942 validate_identifier(index_name, "index_name")?;
943
944 let rename_to = params
945 .as_ref()
946 .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
947
948 let set_schema = params
949 .as_ref()
950 .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
951
952 if rename_to.is_none() && set_schema.is_none() {
953 return Err(MCPError::InvalidParams(
954 "Must provide either 'rename_to' or 'set_schema' parameter".into()
955 ));
956 }
957
958 let mut action_desc = Vec::new();
959
960 if let Some(new_name) = rename_to {
961 validate_identifier(new_name, "rename_to")?;
962 let sql = format!("ALTER INDEX {} RENAME TO {}", index_name, new_name);
963 client.execute(&sql, &[]).await?;
964 action_desc.push(format!("renamed to {}", new_name));
965 }
966
967 if let Some(schema) = set_schema {
968 validate_identifier(schema, "set_schema")?;
969 let sql = format!("ALTER INDEX {} SET SCHEMA {}", index_name, schema);
970 client.execute(&sql, &[]).await?;
971 action_desc.push(format!("moved to schema {}", schema));
972 }
973
974 Ok(json!({
975 "status": "success",
976 "action": "ALTER INDEX",
977 "index_name": index_name,
978 "changes": action_desc
979 }))
980}
981
982pub async fn list_partitions(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
984 let table = params
985 .as_ref()
986 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
987 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
988
989 validate_identifier(table, "table")?;
990
991 let rows = client
992 .query(
993 "SELECT child.relname as partition_name, pg_size_pretty(pg_relation_size(child.oid)) as size
994 FROM pg_inherits
995 JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
996 JOIN pg_class child ON pg_inherits.inhrelid = child.oid
997 JOIN pg_namespace ON child.relnamespace = pg_namespace.oid
998 WHERE parent.relname = $1
999 ORDER BY child.relname",
1000 &[&table],
1001 )
1002 .await?;
1003
1004 let partitions: Vec<Value> = rows
1005 .iter()
1006 .map(|row| {
1007 json!({
1008 "partition_name": row.get::<_, String>(0),
1009 "size": row.get::<_, String>(1),
1010 })
1011 })
1012 .collect();
1013
1014 Ok(json!({
1015 "table": table,
1016 "partition_count": partitions.len(),
1017 "partitions": partitions
1018 }))
1019}
1020
1021pub async fn backup_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1023 let table = params
1024 .as_ref()
1025 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1026 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1027
1028 validate_identifier(table, "table")?;
1029
1030 let backup_name = format!("backup_{}", table);
1031 validate_identifier(&backup_name, "backup_name")?;
1032
1033 let _: (i32,) = client
1035 .query_one(
1036 "SELECT 1 FROM information_schema.tables
1037 WHERE table_name = $1 AND table_schema NOT IN ('pg_catalog', 'information_schema')",
1038 &[&table],
1039 )
1040 .await
1041 .map_err(|_| MCPError::InvalidParams(format!("Table '{}' does not exist", table)))?
1042 .try_get(0)
1043 .map(|val| (val,))
1044 .map_err(|_| MCPError::InvalidParams("Could not verify table existence".into()))?;
1045
1046 let backup_exists = client
1048 .query_opt(
1049 "SELECT 1 FROM information_schema.tables WHERE table_name = $1",
1050 &[&backup_name],
1051 )
1052 .await?
1053 .is_some();
1054
1055 if backup_exists {
1056 return Err(MCPError::InvalidParams(
1057 format!("Backup table '{}' already exists. Drop it first or use a different table.", backup_name)
1058 ));
1059 }
1060
1061 let create_backup_sql = format!("CREATE TABLE {} AS SELECT * FROM {}", backup_name, table);
1063 client.execute(&create_backup_sql, &[]).await?;
1064
1065 let columns: Vec<(String, String, String)> = client
1067 .query(
1068 "SELECT column_name, data_type, is_nullable
1069 FROM information_schema.columns
1070 WHERE table_name = $1
1071 ORDER BY ordinal_position",
1072 &[&table],
1073 )
1074 .await?
1075 .iter()
1076 .map(|row| {
1077 (
1078 row.get::<_, String>(0),
1079 row.get::<_, String>(1),
1080 row.get::<_, String>(2),
1081 )
1082 })
1083 .collect();
1084
1085 let indexes: Vec<String> = client
1087 .query(
1088 "SELECT indexname FROM pg_indexes WHERE tablename = $1 AND schemaname NOT IN ('pg_catalog')",
1089 &[&table],
1090 )
1091 .await?
1092 .iter()
1093 .map(|row| row.get::<_, String>(0))
1094 .collect();
1095
1096 let mut indexes_created = 0;
1097 for idx_name in indexes {
1098 let new_idx_name = format!("{}_on_{}", idx_name, backup_name);
1099 let idx_def: String = client
1100 .query_one(
1101 "SELECT indexdef FROM pg_indexes WHERE indexname = $1",
1102 &[&idx_name],
1103 )
1104 .await?
1105 .get(0);
1106
1107 let updated_def = idx_def
1108 .replace(&format!("ON {}", table), &format!("ON {}", backup_name))
1109 .replace(&idx_name, &new_idx_name);
1110
1111 if client.execute(&updated_def, &[]).await.is_ok() {
1112 indexes_created += 1;
1113 }
1114 }
1115
1116 let row_count: (i64,) = client
1118 .query_one(&format!("SELECT COUNT(*) FROM {}", backup_name), &[])
1119 .await?
1120 .try_get(0)
1121 .map(|val| (val,))
1122 .map_err(|_| MCPError::InvalidParams("Could not count rows".into()))?;
1123
1124 Ok(json!({
1125 "status": "success",
1126 "action": "BACKUP TABLE",
1127 "original_table": table,
1128 "backup_table": backup_name,
1129 "rows_copied": row_count.0,
1130 "columns_copied": columns.len(),
1131 "indexes_created": indexes_created,
1132 "message": format!("Table '{}' backed up to '{}' with {} rows", table, backup_name, row_count.0)
1133 }))
1134}