1use crate::errors::{MCPError, Result as MCPResult};
2use crate::validation::validate_identifier;
3use serde_json::{Value, json};
4use tokio_postgres::Client;
5
6pub async fn list_tables(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
8 let rows = client
9 .query(
10 "SELECT table_schema, table_name, table_type
11 FROM information_schema.tables
12 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
13 ORDER BY table_schema, table_name",
14 &[],
15 )
16 .await?;
17
18 let tables: Vec<Value> = rows
19 .iter()
20 .map(|row| {
21 json!({
22 "schema": row.get::<_, String>(0),
23 "name": row.get::<_, String>(1),
24 "type": row.get::<_, String>(2),
25 })
26 })
27 .collect();
28
29 Ok(json!({ "tables": tables }))
30}
31
32pub async fn describe_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
34 let table_name = params
35 .as_ref()
36 .and_then(|p| p.get("table"))
37 .and_then(|v| v.as_str())
38 .ok_or_else(|| {
39 crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into())
40 })?;
41
42 validate_identifier(table_name, "table")?;
43
44 let rows = client
45 .query(
46 "SELECT column_name, data_type, is_nullable, column_default, ordinal_position
47 FROM information_schema.columns
48 WHERE table_name = $1
49 ORDER BY ordinal_position",
50 &[&table_name],
51 )
52 .await?;
53
54 let columns: Vec<Value> = rows
55 .iter()
56 .map(|row| {
57 json!({
58 "name": row.get::<_, String>(0),
59 "type": row.get::<_, String>(1),
60 "nullable": row.get::<_, String>(2),
61 "default": row.get::<_, Option<String>>(3),
62 "position": row.get::<_, i32>(4),
63 })
64 })
65 .collect();
66
67 Ok(json!({ "columns": columns }))
68}
69
70pub async fn list_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
72 let rows = client
73 .query(
74 "SELECT schemaname, tablename, indexname, indexdef
75 FROM pg_indexes
76 WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
77 ORDER BY schemaname, tablename, indexname",
78 &[],
79 )
80 .await?;
81
82 let indexes: Vec<Value> = rows
83 .iter()
84 .map(|row| {
85 json!({
86 "schema": row.get::<_, String>(0),
87 "table": row.get::<_, String>(1),
88 "name": row.get::<_, String>(2),
89 "definition": row.get::<_, String>(3),
90 })
91 })
92 .collect();
93
94 Ok(json!({ "indexes": indexes }))
95}
96
97pub async fn list_schemas(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
99 let rows = client
100 .query(
101 "SELECT schema_name, schema_owner
102 FROM information_schema.schemata
103 WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
104 ORDER BY schema_name",
105 &[],
106 )
107 .await?;
108
109 let schemas: Vec<Value> = rows
110 .iter()
111 .map(|row| {
112 json!({
113 "name": row.get::<_, String>(0),
114 "owner": row.get::<_, String>(1),
115 })
116 })
117 .collect();
118
119 Ok(json!({ "schemas": schemas }))
120}
121
122pub async fn show_constraints(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
124 let rows = client
125 .query(
126 "SELECT table_schema, table_name, constraint_name, constraint_type
127 FROM information_schema.table_constraints
128 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
129 ORDER BY table_schema, table_name, constraint_name",
130 &[],
131 )
132 .await?;
133
134 let constraints: Vec<Value> = rows
135 .iter()
136 .map(|row| {
137 json!({
138 "schema": row.get::<_, String>(0),
139 "table": row.get::<_, String>(1),
140 "name": row.get::<_, String>(2),
141 "type": row.get::<_, String>(3),
142 })
143 })
144 .collect();
145
146 Ok(json!({ "constraints": constraints }))
147}
148
149pub async fn get_object_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
151 let schema_name = params
152 .as_ref()
153 .and_then(|p| p.get("schema"))
154 .and_then(|v| v.as_str())
155 .unwrap_or("public");
156
157 validate_identifier(schema_name, "schema")?;
158
159 let table_name = params
160 .as_ref()
161 .and_then(|p| p.get("table"))
162 .and_then(|v| v.as_str())
163 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
164
165 validate_identifier(table_name, "table")?;
166
167 let columns = client
168 .query(
169 "SELECT c.column_name::text, c.data_type::text, c.is_nullable::text,
170 c.column_default::text, c.ordinal_position,
171 COALESCE(pgd.description, '')::text AS column_description,
172 CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END AS is_pk,
173 CASE WHEN uc.column_name IS NOT NULL THEN true ELSE false END AS is_unique
174 FROM information_schema.columns c
175 LEFT JOIN pg_catalog.pg_statio_all_tables st
176 ON st.relname = c.table_name AND st.schemaname = c.table_schema
177 LEFT JOIN pg_catalog.pg_description pgd
178 ON pgd.objoid = st.relid AND pgd.objsubid = c.ordinal_position
179 LEFT JOIN (
180 SELECT ku.column_name, tc.table_schema, tc.table_name
181 FROM information_schema.table_constraints tc
182 JOIN information_schema.key_column_usage ku
183 ON tc.constraint_name = ku.constraint_name
184 AND tc.table_schema = ku.table_schema
185 WHERE tc.constraint_type = 'PRIMARY KEY'
186 ) pk ON pk.column_name = c.column_name
187 AND pk.table_schema = c.table_schema
188 AND pk.table_name = c.table_name
189 LEFT JOIN (
190 SELECT ku.column_name, tc.table_schema, tc.table_name
191 FROM information_schema.table_constraints tc
192 JOIN information_schema.key_column_usage ku
193 ON tc.constraint_name = ku.constraint_name
194 AND tc.table_schema = ku.table_schema
195 WHERE tc.constraint_type = 'UNIQUE'
196 ) uc ON uc.column_name = c.column_name
197 AND uc.table_schema = c.table_schema
198 AND uc.table_name = c.table_name
199 WHERE c.table_schema = $1 AND c.table_name = $2
200 ORDER BY c.ordinal_position",
201 &[&schema_name, &table_name],
202 )
203 .await?;
204
205 let cols: Vec<Value> = columns
206 .iter()
207 .map(|row| {
208 json!({
209 "name": row.get::<_, String>(0),
210 "type": row.get::<_, String>(1),
211 "nullable": row.get::<_, String>(2) == "YES",
212 "default": row.get::<_, Option<String>>(3),
213 "position": row.get::<_, i32>(4),
214 "description": row.get::<_, String>(5),
215 "is_primary_key": row.get::<_, bool>(6),
216 "is_unique": row.get::<_, bool>(7),
217 })
218 })
219 .collect();
220
221 let indexes = client
222 .query(
223 "SELECT indexname::text, indexdef::text
224 FROM pg_indexes
225 WHERE schemaname = $1 AND tablename = $2
226 ORDER BY indexname",
227 &[&schema_name, &table_name],
228 )
229 .await?;
230
231 let idxs: Vec<Value> = indexes
232 .iter()
233 .map(|row| {
234 json!({
235 "name": row.get::<_, String>(0),
236 "definition": row.get::<_, String>(1),
237 })
238 })
239 .collect();
240
241 let foreign_keys = client
242 .query(
243 "SELECT kcu.column_name::text,
244 ccu.table_schema::text AS foreign_schema,
245 ccu.table_name::text AS foreign_table,
246 ccu.column_name::text AS foreign_column,
247 rc.update_rule::text, rc.delete_rule::text
248 FROM information_schema.table_constraints tc
249 JOIN information_schema.key_column_usage kcu
250 ON tc.constraint_name = kcu.constraint_name
251 AND tc.table_schema = kcu.table_schema
252 JOIN information_schema.constraint_column_usage ccu
253 ON tc.constraint_name = ccu.constraint_name
254 AND tc.table_schema = ccu.table_schema
255 JOIN information_schema.referential_constraints rc
256 ON tc.constraint_name = rc.constraint_name
257 AND tc.table_schema = rc.constraint_schema
258 WHERE tc.constraint_type = 'FOREIGN KEY'
259 AND tc.table_schema = $1 AND tc.table_name = $2
260 ORDER BY kcu.ordinal_position",
261 &[&schema_name, &table_name],
262 )
263 .await?;
264
265 let fks: Vec<Value> = foreign_keys
266 .iter()
267 .map(|row| {
268 json!({
269 "column": row.get::<_, String>(0),
270 "references_schema": row.get::<_, String>(1),
271 "references_table": row.get::<_, String>(2),
272 "references_column": row.get::<_, String>(3),
273 "on_update": row.get::<_, String>(4),
274 "on_delete": row.get::<_, String>(5),
275 })
276 })
277 .collect();
278
279 let constraints = client
280 .query(
281 "SELECT constraint_name::text, constraint_type::text
282 FROM information_schema.table_constraints
283 WHERE table_schema = $1 AND table_name = $2
284 ORDER BY constraint_name",
285 &[&schema_name, &table_name],
286 )
287 .await?;
288
289 let cons: Vec<Value> = constraints
290 .iter()
291 .map(|row| {
292 json!({
293 "name": row.get::<_, String>(0),
294 "type": row.get::<_, String>(1),
295 })
296 })
297 .collect();
298
299 let row_estimate = client
300 .query_one(
301 "SELECT n_live_tup FROM pg_stat_user_tables
302 WHERE schemaname = $1 AND relname = $2",
303 &[&schema_name, &table_name],
304 )
305 .await
306 .map(|r| r.get::<_, Option<f64>>(0))
307 .unwrap_or(None);
308
309 let table_size = client
310 .query_one(
311 "SELECT pg_size_pretty(pg_total_relation_size($1::regclass))",
312 &[&format!("{}.{}", schema_name, table_name)],
313 )
314 .await
315 .map(|r| r.get::<_, Option<String>>(0))
316 .unwrap_or(None);
317
318 Ok(json!({
319 "table": table_name,
320 "schema": schema_name,
321 "columns": cols,
322 "indexes": idxs,
323 "foreign_keys": fks,
324 "constraints": cons,
325 "estimated_rows": row_estimate,
326 "total_size": table_size,
327 }))
328}
329
330pub async fn list_triggers(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
332 let table = params
333 .as_ref()
334 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
335 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
336
337 let schema = params
338 .as_ref()
339 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
340 .unwrap_or("public");
341
342 let limit = params
343 .as_ref()
344 .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
345 .unwrap_or(1000);
346
347 validate_identifier(table, "table")?;
348 validate_identifier(schema, "schema")?;
349
350 if !((1..=10000).contains(&limit)) {
351 return Err(MCPError::InvalidParams(format!(
352 "'limit' must be between 1 and 10000 (got {})",
353 limit
354 )));
355 }
356
357 let rows = client
358 .query(
359 "SELECT trigger_name, event_object_table, event_manipulation,
360 action_timing, action_statement, trigger_schema
361 FROM information_schema.triggers
362 WHERE event_object_table = $1 AND trigger_schema = $2
363 ORDER BY trigger_name
364 LIMIT $3",
365 &[&table, &schema, &limit],
366 )
367 .await?;
368
369 let triggers: Vec<Value> = rows
370 .iter()
371 .map(|row| {
372 json!({
373 "name": row.get::<_, String>(0),
374 "table": row.get::<_, String>(1),
375 "event": row.get::<_, String>(2),
376 "timing": row.get::<_, String>(3),
377 "statement": row.get::<_, String>(4),
378 "schema": row.get::<_, String>(5),
379 })
380 })
381 .collect();
382
383 Ok(json!({
384 "table": table,
385 "schema": schema,
386 "trigger_count": triggers.len(),
387 "triggers": triggers
388 }))
389}
390
391pub async fn create_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
393 let index_name = params
394 .as_ref()
395 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
396 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
397
398 let table = params
399 .as_ref()
400 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
401 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
402
403 let columns = params
404 .as_ref()
405 .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
406 .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
407
408 if columns.is_empty() {
409 return Err(MCPError::InvalidParams(
410 "'columns' array must not be empty".into(),
411 ));
412 }
413
414 validate_identifier(index_name, "index_name")?;
415 validate_identifier(table, "table")?;
416
417 let mut column_list = Vec::new();
418 for col in columns {
419 let col_name = col
420 .as_str()
421 .ok_or_else(|| MCPError::InvalidParams("Column names must be strings".into()))?;
422 validate_identifier(col_name, "column")?;
423 column_list.push(col_name.to_string());
424 }
425
426 let unique = params
427 .as_ref()
428 .and_then(|p| p.get("unique").and_then(|v| v.as_bool()))
429 .unwrap_or(false);
430
431 let concurrent = params
432 .as_ref()
433 .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
434 .unwrap_or(false);
435
436 let unique_str = if unique { "UNIQUE " } else { "" };
437 let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
438 let columns_str = column_list.join(", ");
439
440 let sql = format!(
441 "CREATE {}INDEX {} {} ON {}({})",
442 unique_str, concurrent_str, index_name, table, columns_str
443 );
444
445 client.execute(&sql, &[]).await?;
446
447 Ok(json!({
448 "status": "success",
449 "action": "CREATE INDEX",
450 "index_name": index_name,
451 "table": table,
452 "columns": column_list,
453 "unique": unique,
454 "concurrent": concurrent
455 }))
456}
457
458pub async fn drop_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
460 let index_name = params
461 .as_ref()
462 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
463 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
464
465 validate_identifier(index_name, "index_name")?;
466
467 let if_exists = params
468 .as_ref()
469 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
470 .unwrap_or(false);
471
472 let concurrent = params
473 .as_ref()
474 .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
475 .unwrap_or(false);
476
477 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
478 let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
479
480 let sql = format!(
481 "DROP INDEX {}{}{}",
482 if_exists_str, concurrent_str, index_name
483 );
484
485 client.execute(&sql, &[]).await?;
486
487 Ok(json!({
488 "status": "success",
489 "action": "DROP INDEX",
490 "index_name": index_name,
491 "if_exists": if_exists,
492 "concurrent": concurrent
493 }))
494}
495
496pub async fn create_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
498 let table = params
499 .as_ref()
500 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
501 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
502
503 let partition_name = params
504 .as_ref()
505 .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
506 .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
507
508 let partition_type = params
509 .as_ref()
510 .and_then(|p| p.get("partition_type").and_then(|v| v.as_str()))
511 .ok_or_else(|| {
512 MCPError::InvalidParams("Missing 'partition_type' parameter (RANGE/LIST/HASH)".into())
513 })?;
514
515 validate_identifier(table, "table")?;
516 validate_identifier(partition_name, "partition_name")?;
517
518 let partition_type_upper = partition_type.to_uppercase();
519 if !["RANGE", "LIST", "HASH"].contains(&partition_type_upper.as_str()) {
520 return Err(MCPError::InvalidParams(format!(
521 "'partition_type' must be RANGE, LIST, or HASH (got {})",
522 partition_type
523 )));
524 }
525
526 let column = params
527 .as_ref()
528 .and_then(|p| p.get("column").and_then(|v| v.as_str()))
529 .ok_or_else(|| MCPError::InvalidParams("Missing 'column' parameter".into()))?;
530
531 validate_identifier(column, "column")?;
532
533 let values = params
534 .as_ref()
535 .and_then(|p| p.get("values").and_then(|v| v.as_str()))
536 .ok_or_else(|| MCPError::InvalidParams(
537 "Missing 'values' parameter (FOR RANGE: 'FROM (x) TO (y)' or FOR LIST: 'IN (values)' or FOR HASH: 'MODULUS n REMAINDER r')".into()
538 ))?;
539
540 if values.contains(';') || values.contains("--") {
541 return Err(MCPError::InvalidParams(
542 "Invalid 'values' parameter: semicolons and SQL comments not allowed".into(),
543 ));
544 }
545
546 let sql = format!(
547 "CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
548 partition_name, table, values
549 );
550
551 client.execute(&sql, &[]).await?;
552
553 Ok(json!({
554 "status": "success",
555 "action": "CREATE TABLE PARTITION",
556 "table": table,
557 "partition_name": partition_name,
558 "partition_type": partition_type,
559 "column": column
560 }))
561}
562
563pub async fn drop_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
565 let table = params
566 .as_ref()
567 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
568 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
569
570 let partition_name = params
571 .as_ref()
572 .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
573 .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
574
575 validate_identifier(partition_name, "partition_name")?;
576
577 let if_exists = params
578 .as_ref()
579 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
580 .unwrap_or(false);
581
582 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
583
584 let sql = format!("DROP TABLE {}{}", if_exists_str, partition_name);
585
586 client.execute(&sql, &[]).await?;
587
588 Ok(json!({
589 "status": "success",
590 "action": "DROP TABLE PARTITION",
591 "table": table,
592 "partition_name": partition_name,
593 "if_exists": if_exists
594 }))
595}
596
597pub async fn create_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
599 let table = params
600 .as_ref()
601 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
602 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
603
604 let columns = params
605 .as_ref()
606 .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
607 .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
608
609 if columns.is_empty() {
610 return Err(MCPError::InvalidParams(
611 "'columns' array must not be empty".into(),
612 ));
613 }
614
615 validate_identifier(table, "table")?;
616
617 let mut column_defs = Vec::new();
618 for (idx, col) in columns.iter().enumerate() {
619 let col_def = col.as_str().ok_or_else(|| {
620 MCPError::InvalidParams(format!(
621 "Column {} must be a string with format: 'name TYPE [constraints]'",
622 idx
623 ))
624 })?;
625
626 if col_def.is_empty() {
627 return Err(MCPError::InvalidParams(format!(
628 "Column {} definition cannot be empty",
629 idx
630 )));
631 }
632
633 if col_def.contains(';') || col_def.contains("--") {
634 return Err(MCPError::InvalidParams(format!(
635 "Column {} definition contains dangerous SQL patterns",
636 idx
637 )));
638 }
639
640 column_defs.push(col_def.to_string());
641 }
642
643 let columns_str = column_defs.join(", ");
644 let sql = format!("CREATE TABLE {} ({})", table, columns_str);
645
646 client.execute(&sql, &[]).await?;
647
648 Ok(json!({
649 "status": "success",
650 "action": "CREATE TABLE",
651 "table": table,
652 "column_count": columns.len()
653 }))
654}
655
656pub async fn drop_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
658 let table = params
659 .as_ref()
660 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
661 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
662
663 validate_identifier(table, "table")?;
664
665 let if_exists = params
666 .as_ref()
667 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
668 .unwrap_or(false);
669
670 let cascade = params
671 .as_ref()
672 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
673 .unwrap_or(false);
674
675 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
676 let cascade_str = if cascade { " CASCADE" } else { "" };
677
678 let sql = format!("DROP TABLE {}{}{}", if_exists_str, table, cascade_str);
679
680 client.execute(&sql, &[]).await?;
681
682 Ok(json!({
683 "status": "success",
684 "action": "DROP TABLE",
685 "table": table,
686 "if_exists": if_exists,
687 "cascade": cascade
688 }))
689}
690
691pub async fn create_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
693 let view_name = params
694 .as_ref()
695 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
696 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
697
698 let query = params
699 .as_ref()
700 .and_then(|p| p.get("query").and_then(|v| v.as_str()))
701 .ok_or_else(|| MCPError::InvalidParams("Missing 'query' parameter".into()))?;
702
703 validate_identifier(view_name, "view_name")?;
704
705 if query.trim().is_empty() {
706 return Err(MCPError::InvalidParams("'query' cannot be empty".into()));
707 }
708
709 let materialized = params
710 .as_ref()
711 .and_then(|p| p.get("materialized").and_then(|v| v.as_bool()))
712 .unwrap_or(false);
713
714 let or_replace = params
715 .as_ref()
716 .and_then(|p| p.get("or_replace").and_then(|v| v.as_bool()))
717 .unwrap_or(false);
718
719 let materialized_str = if materialized { "MATERIALIZED " } else { "" };
720 let or_replace_str = if or_replace { "OR REPLACE " } else { "" };
721
722 let sql = format!(
723 "CREATE {}{}VIEW {} AS {}",
724 or_replace_str, materialized_str, view_name, query
725 );
726
727 client.execute(&sql, &[]).await?;
728
729 Ok(json!({
730 "status": "success",
731 "action": "CREATE VIEW",
732 "view_name": view_name,
733 "materialized": materialized,
734 "or_replace": or_replace
735 }))
736}
737
738pub async fn drop_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
740 let view_name = params
741 .as_ref()
742 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
743 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
744
745 validate_identifier(view_name, "view_name")?;
746
747 let if_exists = params
748 .as_ref()
749 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
750 .unwrap_or(false);
751
752 let cascade = params
753 .as_ref()
754 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
755 .unwrap_or(false);
756
757 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
758 let cascade_str = if cascade { " CASCADE" } else { "" };
759
760 let sql = format!("DROP VIEW {}{}{}", if_exists_str, view_name, cascade_str);
761
762 client.execute(&sql, &[]).await?;
763
764 Ok(json!({
765 "status": "success",
766 "action": "DROP VIEW",
767 "view_name": view_name,
768 "if_exists": if_exists,
769 "cascade": cascade
770 }))
771}
772
773pub async fn alter_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
775 let view_name = params
776 .as_ref()
777 .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
778 .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
779
780 validate_identifier(view_name, "view_name")?;
781
782 let rename_to = params
783 .as_ref()
784 .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
785
786 let set_schema = params
787 .as_ref()
788 .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
789
790 if rename_to.is_none() && set_schema.is_none() {
791 return Err(MCPError::InvalidParams(
792 "Must provide either 'rename_to' or 'set_schema' parameter".into(),
793 ));
794 }
795
796 let mut action_desc = Vec::new();
797
798 if let Some(new_name) = rename_to {
799 validate_identifier(new_name, "rename_to")?;
800 let sql = format!("ALTER VIEW {} RENAME TO {}", view_name, new_name);
801 client.execute(&sql, &[]).await?;
802 action_desc.push(format!("renamed to {}", new_name));
803 }
804
805 if let Some(schema) = set_schema {
806 validate_identifier(schema, "set_schema")?;
807 let sql = format!("ALTER VIEW {} SET SCHEMA {}", view_name, schema);
808 client.execute(&sql, &[]).await?;
809 action_desc.push(format!("moved to schema {}", schema));
810 }
811
812 Ok(json!({
813 "status": "success",
814 "action": "ALTER VIEW",
815 "view_name": view_name,
816 "changes": action_desc
817 }))
818}
819
820pub async fn create_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
822 let schema_name = params
823 .as_ref()
824 .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
825 .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
826
827 validate_identifier(schema_name, "schema_name")?;
828
829 let if_not_exists = params
830 .as_ref()
831 .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
832 .unwrap_or(false);
833
834 let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
835
836 let sql = format!("CREATE SCHEMA {}{}", if_not_exists_str, schema_name);
837
838 client.execute(&sql, &[]).await?;
839
840 Ok(json!({
841 "status": "success",
842 "action": "CREATE SCHEMA",
843 "schema_name": schema_name,
844 "if_not_exists": if_not_exists
845 }))
846}
847
848pub async fn drop_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
850 let schema_name = params
851 .as_ref()
852 .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
853 .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
854
855 validate_identifier(schema_name, "schema_name")?;
856
857 let if_exists = params
858 .as_ref()
859 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
860 .unwrap_or(false);
861
862 let cascade = params
863 .as_ref()
864 .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
865 .unwrap_or(false);
866
867 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
868 let cascade_str = if cascade { " CASCADE" } else { "" };
869
870 let sql = format!(
871 "DROP SCHEMA {}{}{}",
872 if_exists_str, schema_name, cascade_str
873 );
874
875 client.execute(&sql, &[]).await?;
876
877 Ok(json!({
878 "status": "success",
879 "action": "DROP SCHEMA",
880 "schema_name": schema_name,
881 "if_exists": if_exists,
882 "cascade": cascade
883 }))
884}
885
886pub async fn create_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
888 let sequence_name = params
889 .as_ref()
890 .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
891 .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
892
893 validate_identifier(sequence_name, "sequence_name")?;
894
895 let if_not_exists = params
896 .as_ref()
897 .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
898 .unwrap_or(false);
899
900 let start = params
901 .as_ref()
902 .and_then(|p| p.get("start").and_then(|v| v.as_i64()))
903 .unwrap_or(1);
904
905 let increment = params
906 .as_ref()
907 .and_then(|p| p.get("increment").and_then(|v| v.as_i64()))
908 .unwrap_or(1);
909
910 let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
911
912 let sql = format!(
913 "CREATE SEQUENCE {}{} START {} INCREMENT {}",
914 if_not_exists_str, sequence_name, start, increment
915 );
916
917 client.execute(&sql, &[]).await?;
918
919 Ok(json!({
920 "status": "success",
921 "action": "CREATE SEQUENCE",
922 "sequence_name": sequence_name,
923 "start": start,
924 "increment": increment,
925 "if_not_exists": if_not_exists
926 }))
927}
928
929pub async fn drop_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
931 let sequence_name = params
932 .as_ref()
933 .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
934 .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
935
936 validate_identifier(sequence_name, "sequence_name")?;
937
938 let if_exists = params
939 .as_ref()
940 .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
941 .unwrap_or(false);
942
943 let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
944
945 let sql = format!("DROP SEQUENCE {}{}", if_exists_str, sequence_name);
946
947 client.execute(&sql, &[]).await?;
948
949 Ok(json!({
950 "status": "success",
951 "action": "DROP SEQUENCE",
952 "sequence_name": sequence_name,
953 "if_exists": if_exists
954 }))
955}
956
957pub async fn alter_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
959 let index_name = params
960 .as_ref()
961 .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
962 .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
963
964 validate_identifier(index_name, "index_name")?;
965
966 let rename_to = params
967 .as_ref()
968 .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
969
970 let set_schema = params
971 .as_ref()
972 .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
973
974 if rename_to.is_none() && set_schema.is_none() {
975 return Err(MCPError::InvalidParams(
976 "Must provide either 'rename_to' or 'set_schema' parameter".into(),
977 ));
978 }
979
980 let mut action_desc = Vec::new();
981
982 if let Some(new_name) = rename_to {
983 validate_identifier(new_name, "rename_to")?;
984 let sql = format!("ALTER INDEX {} RENAME TO {}", index_name, new_name);
985 client.execute(&sql, &[]).await?;
986 action_desc.push(format!("renamed to {}", new_name));
987 }
988
989 if let Some(schema) = set_schema {
990 validate_identifier(schema, "set_schema")?;
991 let sql = format!("ALTER INDEX {} SET SCHEMA {}", index_name, schema);
992 client.execute(&sql, &[]).await?;
993 action_desc.push(format!("moved to schema {}", schema));
994 }
995
996 Ok(json!({
997 "status": "success",
998 "action": "ALTER INDEX",
999 "index_name": index_name,
1000 "changes": action_desc
1001 }))
1002}
1003
1004pub async fn list_partitions(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1006 let table = params
1007 .as_ref()
1008 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1009 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1010
1011 validate_identifier(table, "table")?;
1012
1013 let rows = client
1014 .query(
1015 "SELECT child.relname as partition_name, pg_size_pretty(pg_relation_size(child.oid)) as size
1016 FROM pg_inherits
1017 JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
1018 JOIN pg_class child ON pg_inherits.inhrelid = child.oid
1019 JOIN pg_namespace ON child.relnamespace = pg_namespace.oid
1020 WHERE parent.relname = $1
1021 ORDER BY child.relname",
1022 &[&table],
1023 )
1024 .await?;
1025
1026 let partitions: Vec<Value> = rows
1027 .iter()
1028 .map(|row| {
1029 json!({
1030 "partition_name": row.get::<_, String>(0),
1031 "size": row.get::<_, String>(1),
1032 })
1033 })
1034 .collect();
1035
1036 Ok(json!({
1037 "table": table,
1038 "partition_count": partitions.len(),
1039 "partitions": partitions
1040 }))
1041}
1042
1043pub async fn backup_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1045 let table = params
1046 .as_ref()
1047 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1048 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1049
1050 validate_identifier(table, "table")?;
1051
1052 let backup_name = format!("backup_{}", table);
1053 validate_identifier(&backup_name, "backup_name")?;
1054
1055 let _: (i32,) = client
1057 .query_one(
1058 "SELECT 1 FROM information_schema.tables
1059 WHERE table_name = $1 AND table_schema NOT IN ('pg_catalog', 'information_schema')",
1060 &[&table],
1061 )
1062 .await
1063 .map_err(|_| MCPError::InvalidParams(format!("Table '{}' does not exist", table)))?
1064 .try_get(0)
1065 .map(|val| (val,))
1066 .map_err(|_| MCPError::InvalidParams("Could not verify table existence".into()))?;
1067
1068 let backup_exists = client
1070 .query_opt(
1071 "SELECT 1 FROM information_schema.tables WHERE table_name = $1",
1072 &[&backup_name],
1073 )
1074 .await?
1075 .is_some();
1076
1077 if backup_exists {
1078 return Err(MCPError::InvalidParams(format!(
1079 "Backup table '{}' already exists. Drop it first or use a different table.",
1080 backup_name
1081 )));
1082 }
1083
1084 let create_backup_sql = format!("CREATE TABLE {} AS SELECT * FROM {}", backup_name, table);
1086 client.execute(&create_backup_sql, &[]).await?;
1087
1088 let columns: Vec<(String, String, String)> = client
1090 .query(
1091 "SELECT column_name, data_type, is_nullable
1092 FROM information_schema.columns
1093 WHERE table_name = $1
1094 ORDER BY ordinal_position",
1095 &[&table],
1096 )
1097 .await?
1098 .iter()
1099 .map(|row| {
1100 (
1101 row.get::<_, String>(0),
1102 row.get::<_, String>(1),
1103 row.get::<_, String>(2),
1104 )
1105 })
1106 .collect();
1107
1108 let indexes: Vec<String> = client
1110 .query(
1111 "SELECT indexname FROM pg_indexes WHERE tablename = $1 AND schemaname NOT IN ('pg_catalog')",
1112 &[&table],
1113 )
1114 .await?
1115 .iter()
1116 .map(|row| row.get::<_, String>(0))
1117 .collect();
1118
1119 let mut indexes_created = 0;
1120 for idx_name in indexes {
1121 let new_idx_name = format!("{}_on_{}", idx_name, backup_name);
1122 let idx_def: String = client
1123 .query_one(
1124 "SELECT indexdef FROM pg_indexes WHERE indexname = $1",
1125 &[&idx_name],
1126 )
1127 .await?
1128 .get(0);
1129
1130 let updated_def = idx_def
1131 .replace(&format!("ON {}", table), &format!("ON {}", backup_name))
1132 .replace(&idx_name, &new_idx_name);
1133
1134 if client.execute(&updated_def, &[]).await.is_ok() {
1135 indexes_created += 1;
1136 }
1137 }
1138
1139 let row_count: (i64,) = client
1141 .query_one(&format!("SELECT COUNT(*) FROM {}", backup_name), &[])
1142 .await?
1143 .try_get(0)
1144 .map(|val| (val,))
1145 .map_err(|_| MCPError::InvalidParams("Could not count rows".into()))?;
1146
1147 Ok(json!({
1148 "status": "success",
1149 "action": "BACKUP TABLE",
1150 "original_table": table,
1151 "backup_table": backup_name,
1152 "rows_copied": row_count.0,
1153 "columns_copied": columns.len(),
1154 "indexes_created": indexes_created,
1155 "message": format!("Table '{}' backed up to '{}' with {} rows", table, backup_name, row_count.0)
1156 }))
1157}