Skip to main content

mcp_postgres/actions/
schema.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::{MCPError, Result as MCPResult};
4use crate::validation::validate_identifier;
5
6/// 1. List all tables
7pub async fn list_tables(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
8    let rows = client
9        .query(
10            "SELECT table_schema, table_name, table_type
11             FROM information_schema.tables
12             WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
13             ORDER BY table_schema, table_name",
14            &[],
15        )
16        .await?;
17
18    let tables: Vec<Value> = rows
19        .iter()
20        .map(|row| {
21            json!({
22                "schema": row.get::<_, String>(0),
23                "name": row.get::<_, String>(1),
24                "type": row.get::<_, String>(2),
25            })
26        })
27        .collect();
28
29    Ok(json!({ "tables": tables }))
30}
31
32/// 2. Describe table structure
33pub async fn describe_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
34    let table_name = params
35        .as_ref()
36        .and_then(|p| p.get("table"))
37        .and_then(|v| v.as_str())
38        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into()))?;
39
40    validate_identifier(table_name, "table")?;
41
42    let rows = client
43        .query(
44            "SELECT column_name, data_type, is_nullable, column_default, ordinal_position
45             FROM information_schema.columns
46             WHERE table_name = $1
47             ORDER BY ordinal_position",
48            &[&table_name],
49        )
50        .await?;
51
52    let columns: Vec<Value> = rows
53        .iter()
54        .map(|row| {
55            json!({
56                "name": row.get::<_, String>(0),
57                "type": row.get::<_, String>(1),
58                "nullable": row.get::<_, String>(2),
59                "default": row.get::<_, Option<String>>(3),
60                "position": row.get::<_, i32>(4),
61            })
62        })
63        .collect();
64
65    Ok(json!({ "columns": columns }))
66}
67
68/// 3. List all indexes
69pub async fn list_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
70    let rows = client
71        .query(
72            "SELECT schemaname, tablename, indexname, indexdef
73             FROM pg_indexes
74             WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
75             ORDER BY schemaname, tablename, indexname",
76            &[],
77        )
78        .await?;
79
80    let indexes: Vec<Value> = rows
81        .iter()
82        .map(|row| {
83            json!({
84                "schema": row.get::<_, String>(0),
85                "table": row.get::<_, String>(1),
86                "name": row.get::<_, String>(2),
87                "definition": row.get::<_, String>(3),
88            })
89        })
90        .collect();
91
92    Ok(json!({ "indexes": indexes }))
93}
94
95/// 4. List schemas
96pub async fn list_schemas(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
97    let rows = client
98        .query(
99            "SELECT schema_name, schema_owner
100             FROM information_schema.schemata
101             WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
102             ORDER BY schema_name",
103            &[],
104        )
105        .await?;
106
107    let schemas: Vec<Value> = rows
108        .iter()
109        .map(|row| {
110            json!({
111                "name": row.get::<_, String>(0),
112                "owner": row.get::<_, String>(1),
113            })
114        })
115        .collect();
116
117    Ok(json!({ "schemas": schemas }))
118}
119
120/// 5. Show constraints
121pub async fn show_constraints(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
122    let rows = client
123        .query(
124            "SELECT table_schema, table_name, constraint_name, constraint_type
125             FROM information_schema.table_constraints
126             WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
127             ORDER BY table_schema, table_name, constraint_name",
128            &[],
129        )
130        .await?;
131
132    let constraints: Vec<Value> = rows
133        .iter()
134        .map(|row| {
135            json!({
136                "schema": row.get::<_, String>(0),
137                "table": row.get::<_, String>(1),
138                "name": row.get::<_, String>(2),
139                "type": row.get::<_, String>(3),
140            })
141        })
142        .collect();
143
144    Ok(json!({ "constraints": constraints }))
145}
146
147/// 5b. Get detailed object info (columns, constraints, indexes, FKs, descriptions)
148pub async fn get_object_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
149    let schema_name = params
150        .as_ref()
151        .and_then(|p| p.get("schema"))
152        .and_then(|v| v.as_str())
153        .unwrap_or("public");
154
155    validate_identifier(schema_name, "schema")?;
156
157    let table_name = params
158        .as_ref()
159        .and_then(|p| p.get("table"))
160        .and_then(|v| v.as_str())
161        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
162
163    validate_identifier(table_name, "table")?;
164
165    let columns = client
166        .query(
167            "SELECT c.column_name::text, c.data_type::text, c.is_nullable::text,
168                    c.column_default::text, c.ordinal_position,
169                    COALESCE(pgd.description, '')::text AS column_description,
170                    CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END AS is_pk,
171                    CASE WHEN uc.column_name IS NOT NULL THEN true ELSE false END AS is_unique
172             FROM information_schema.columns c
173             LEFT JOIN pg_catalog.pg_statio_all_tables st
174                 ON st.relname = c.table_name AND st.schemaname = c.table_schema
175             LEFT JOIN pg_catalog.pg_description pgd
176                 ON pgd.objoid = st.relid AND pgd.objsubid = c.ordinal_position
177             LEFT JOIN (
178                 SELECT ku.column_name, tc.table_schema, tc.table_name
179                 FROM information_schema.table_constraints tc
180                 JOIN information_schema.key_column_usage ku
181                     ON tc.constraint_name = ku.constraint_name
182                     AND tc.table_schema = ku.table_schema
183                 WHERE tc.constraint_type = 'PRIMARY KEY'
184             ) pk ON pk.column_name = c.column_name
185                 AND pk.table_schema = c.table_schema
186                 AND pk.table_name = c.table_name
187             LEFT JOIN (
188                 SELECT ku.column_name, tc.table_schema, tc.table_name
189                 FROM information_schema.table_constraints tc
190                 JOIN information_schema.key_column_usage ku
191                     ON tc.constraint_name = ku.constraint_name
192                     AND tc.table_schema = ku.table_schema
193                 WHERE tc.constraint_type = 'UNIQUE'
194             ) uc ON uc.column_name = c.column_name
195                 AND uc.table_schema = c.table_schema
196                 AND uc.table_name = c.table_name
197             WHERE c.table_schema = $1 AND c.table_name = $2
198             ORDER BY c.ordinal_position",
199            &[&schema_name, &table_name],
200        )
201        .await?;
202
203    let cols: Vec<Value> = columns.iter().map(|row| {
204        json!({
205            "name": row.get::<_, String>(0),
206            "type": row.get::<_, String>(1),
207            "nullable": row.get::<_, String>(2) == "YES",
208            "default": row.get::<_, Option<String>>(3),
209            "position": row.get::<_, i32>(4),
210            "description": row.get::<_, String>(5),
211            "is_primary_key": row.get::<_, bool>(6),
212            "is_unique": row.get::<_, bool>(7),
213        })
214    }).collect();
215
216    let indexes = client
217        .query(
218            "SELECT indexname::text, indexdef::text
219             FROM pg_indexes
220             WHERE schemaname = $1 AND tablename = $2
221             ORDER BY indexname",
222            &[&schema_name, &table_name],
223        )
224        .await?;
225
226    let idxs: Vec<Value> = indexes.iter().map(|row| {
227        json!({
228            "name": row.get::<_, String>(0),
229            "definition": row.get::<_, String>(1),
230        })
231    }).collect();
232
233    let foreign_keys = client
234        .query(
235            "SELECT kcu.column_name::text,
236                    ccu.table_schema::text AS foreign_schema,
237                    ccu.table_name::text AS foreign_table,
238                    ccu.column_name::text AS foreign_column,
239                    rc.update_rule::text, rc.delete_rule::text
240             FROM information_schema.table_constraints tc
241             JOIN information_schema.key_column_usage kcu
242                 ON tc.constraint_name = kcu.constraint_name
243                 AND tc.table_schema = kcu.table_schema
244             JOIN information_schema.constraint_column_usage ccu
245                 ON tc.constraint_name = ccu.constraint_name
246                 AND tc.table_schema = ccu.table_schema
247             JOIN information_schema.referential_constraints rc
248                 ON tc.constraint_name = rc.constraint_name
249                 AND tc.table_schema = rc.constraint_schema
250             WHERE tc.constraint_type = 'FOREIGN KEY'
251                 AND tc.table_schema = $1 AND tc.table_name = $2
252             ORDER BY kcu.ordinal_position",
253            &[&schema_name, &table_name],
254        )
255        .await?;
256
257    let fks: Vec<Value> = foreign_keys.iter().map(|row| {
258        json!({
259            "column": row.get::<_, String>(0),
260            "references_schema": row.get::<_, String>(1),
261            "references_table": row.get::<_, String>(2),
262            "references_column": row.get::<_, String>(3),
263            "on_update": row.get::<_, String>(4),
264            "on_delete": row.get::<_, String>(5),
265        })
266    }).collect();
267
268    let constraints = client
269        .query(
270            "SELECT constraint_name::text, constraint_type::text
271             FROM information_schema.table_constraints
272             WHERE table_schema = $1 AND table_name = $2
273             ORDER BY constraint_name",
274            &[&schema_name, &table_name],
275        )
276        .await?;
277
278    let cons: Vec<Value> = constraints.iter().map(|row| {
279        json!({
280            "name": row.get::<_, String>(0),
281            "type": row.get::<_, String>(1),
282        })
283    }).collect();
284
285    let row_estimate = client
286        .query_one(
287            "SELECT n_live_tup FROM pg_stat_user_tables
288             WHERE schemaname = $1 AND relname = $2",
289            &[&schema_name, &table_name],
290        )
291        .await
292        .map(|r| r.get::<_, Option<f64>>(0))
293        .unwrap_or(None);
294
295    let table_size = client
296        .query_one(
297            "SELECT pg_size_pretty(pg_total_relation_size($1::regclass))",
298            &[&format!("{}.{}", schema_name, table_name)],
299        )
300        .await
301        .map(|r| r.get::<_, Option<String>>(0))
302        .unwrap_or(None);
303
304    Ok(json!({
305        "table": table_name,
306        "schema": schema_name,
307        "columns": cols,
308        "indexes": idxs,
309        "foreign_keys": fks,
310        "constraints": cons,
311        "estimated_rows": row_estimate,
312        "total_size": table_size,
313    }))
314}
315
316/// 6. List triggers
317pub async fn list_triggers(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
318    let table = params
319        .as_ref()
320        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
321        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
322
323    let schema = params
324        .as_ref()
325        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
326        .unwrap_or("public");
327
328    let limit = params
329        .as_ref()
330        .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
331        .unwrap_or(1000);
332
333    validate_identifier(table, "table")?;
334    validate_identifier(schema, "schema")?;
335
336    if !((1..=10000).contains(&limit)) {
337        return Err(MCPError::InvalidParams(
338            format!("'limit' must be between 1 and 10000 (got {})", limit)
339        ));
340    }
341
342    let rows = client
343        .query(
344            "SELECT trigger_name, event_object_table, event_manipulation,
345                    action_timing, action_statement, trigger_schema
346             FROM information_schema.triggers
347             WHERE event_object_table = $1 AND trigger_schema = $2
348             ORDER BY trigger_name
349             LIMIT $3",
350            &[&table, &schema, &limit],
351        )
352        .await?;
353
354    let triggers: Vec<Value> = rows
355        .iter()
356        .map(|row| {
357            json!({
358                "name": row.get::<_, String>(0),
359                "table": row.get::<_, String>(1),
360                "event": row.get::<_, String>(2),
361                "timing": row.get::<_, String>(3),
362                "statement": row.get::<_, String>(4),
363                "schema": row.get::<_, String>(5),
364            })
365        })
366        .collect();
367
368    Ok(json!({
369        "table": table,
370        "schema": schema,
371        "trigger_count": triggers.len(),
372        "triggers": triggers
373    }))
374}
375
376/// 7. Create index
377pub async fn create_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
378    let index_name = params
379        .as_ref()
380        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
381        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
382
383    let table = params
384        .as_ref()
385        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
386        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
387
388    let columns = params
389        .as_ref()
390        .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
391        .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
392
393    if columns.is_empty() {
394        return Err(MCPError::InvalidParams("'columns' array must not be empty".into()));
395    }
396
397    validate_identifier(index_name, "index_name")?;
398    validate_identifier(table, "table")?;
399
400    let mut column_list = Vec::new();
401    for col in columns {
402        let col_name = col.as_str()
403            .ok_or_else(|| MCPError::InvalidParams("Column names must be strings".into()))?;
404        validate_identifier(col_name, "column")?;
405        column_list.push(col_name.to_string());
406    }
407
408    let unique = params
409        .as_ref()
410        .and_then(|p| p.get("unique").and_then(|v| v.as_bool()))
411        .unwrap_or(false);
412
413    let concurrent = params
414        .as_ref()
415        .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
416        .unwrap_or(false);
417
418    let unique_str = if unique { "UNIQUE " } else { "" };
419    let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
420    let columns_str = column_list.join(", ");
421
422    let sql = format!(
423        "CREATE {}INDEX {} {} ON {}({})",
424        unique_str,
425        concurrent_str,
426        index_name,
427        table,
428        columns_str
429    );
430
431    client.execute(&sql, &[]).await?;
432
433    Ok(json!({
434        "status": "success",
435        "action": "CREATE INDEX",
436        "index_name": index_name,
437        "table": table,
438        "columns": column_list,
439        "unique": unique,
440        "concurrent": concurrent
441    }))
442}
443
444/// 8. Drop index
445pub async fn drop_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
446    let index_name = params
447        .as_ref()
448        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
449        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
450
451    validate_identifier(index_name, "index_name")?;
452
453    let if_exists = params
454        .as_ref()
455        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
456        .unwrap_or(false);
457
458    let concurrent = params
459        .as_ref()
460        .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
461        .unwrap_or(false);
462
463    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
464    let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
465
466    let sql = format!(
467        "DROP INDEX {}{}{}",
468        if_exists_str,
469        concurrent_str,
470        index_name
471    );
472
473    client.execute(&sql, &[]).await?;
474
475    Ok(json!({
476        "status": "success",
477        "action": "DROP INDEX",
478        "index_name": index_name,
479        "if_exists": if_exists,
480        "concurrent": concurrent
481    }))
482}
483
484/// 9. Create partition
485pub async fn create_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
486    let table = params
487        .as_ref()
488        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
489        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
490
491    let partition_name = params
492        .as_ref()
493        .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
494        .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
495
496    let partition_type = params
497        .as_ref()
498        .and_then(|p| p.get("partition_type").and_then(|v| v.as_str()))
499        .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_type' parameter (RANGE/LIST/HASH)".into()))?;
500
501    validate_identifier(table, "table")?;
502    validate_identifier(partition_name, "partition_name")?;
503
504    let partition_type_upper = partition_type.to_uppercase();
505    if !["RANGE", "LIST", "HASH"].contains(&partition_type_upper.as_str()) {
506        return Err(MCPError::InvalidParams(
507            format!("'partition_type' must be RANGE, LIST, or HASH (got {})", partition_type)
508        ));
509    }
510
511    let column = params
512        .as_ref()
513        .and_then(|p| p.get("column").and_then(|v| v.as_str()))
514        .ok_or_else(|| MCPError::InvalidParams("Missing 'column' parameter".into()))?;
515
516    validate_identifier(column, "column")?;
517
518    let values = params
519        .as_ref()
520        .and_then(|p| p.get("values").and_then(|v| v.as_str()))
521        .ok_or_else(|| MCPError::InvalidParams(
522            "Missing 'values' parameter (FOR RANGE: 'FROM (x) TO (y)' or FOR LIST: 'IN (values)' or FOR HASH: 'MODULUS n REMAINDER r')".into()
523        ))?;
524
525    if values.contains(';') || values.contains("--") {
526        return Err(MCPError::InvalidParams(
527            "Invalid 'values' parameter: semicolons and SQL comments not allowed".into()
528        ));
529    }
530
531    let sql = format!(
532        "CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
533        partition_name,
534        table,
535        values
536    );
537
538    client.execute(&sql, &[]).await?;
539
540    Ok(json!({
541        "status": "success",
542        "action": "CREATE TABLE PARTITION",
543        "table": table,
544        "partition_name": partition_name,
545        "partition_type": partition_type,
546        "column": column
547    }))
548}
549
550/// 10. Drop partition
551pub async fn drop_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
552    let table = params
553        .as_ref()
554        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
555        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
556
557    let partition_name = params
558        .as_ref()
559        .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
560        .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
561
562    validate_identifier(partition_name, "partition_name")?;
563
564    let if_exists = params
565        .as_ref()
566        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
567        .unwrap_or(false);
568
569    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
570
571    let sql = format!(
572        "DROP TABLE {}{}",
573        if_exists_str,
574        partition_name
575    );
576
577    client.execute(&sql, &[]).await?;
578
579    Ok(json!({
580        "status": "success",
581        "action": "DROP TABLE PARTITION",
582        "table": table,
583        "partition_name": partition_name,
584        "if_exists": if_exists
585    }))
586}
587
588/// 11. Create table
589pub async fn create_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
590    let table = params
591        .as_ref()
592        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
593        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
594
595    let columns = params
596        .as_ref()
597        .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
598        .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
599
600    if columns.is_empty() {
601        return Err(MCPError::InvalidParams("'columns' array must not be empty".into()));
602    }
603
604    validate_identifier(table, "table")?;
605
606    let mut column_defs = Vec::new();
607    for (idx, col) in columns.iter().enumerate() {
608        let col_def = col.as_str()
609            .ok_or_else(|| MCPError::InvalidParams(format!("Column {} must be a string with format: 'name TYPE [constraints]'", idx)))?;
610
611        if col_def.is_empty() {
612            return Err(MCPError::InvalidParams(format!("Column {} definition cannot be empty", idx)));
613        }
614
615        if col_def.contains(';') || col_def.contains("--") {
616            return Err(MCPError::InvalidParams(
617                format!("Column {} definition contains dangerous SQL patterns", idx)
618            ));
619        }
620
621        column_defs.push(col_def.to_string());
622    }
623
624    let columns_str = column_defs.join(", ");
625    let sql = format!("CREATE TABLE {} ({})", table, columns_str);
626
627    client.execute(&sql, &[]).await?;
628
629    Ok(json!({
630        "status": "success",
631        "action": "CREATE TABLE",
632        "table": table,
633        "column_count": columns.len()
634    }))
635}
636
637/// 12. Drop table
638pub async fn drop_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
639    let table = params
640        .as_ref()
641        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
642        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
643
644    validate_identifier(table, "table")?;
645
646    let if_exists = params
647        .as_ref()
648        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
649        .unwrap_or(false);
650
651    let cascade = params
652        .as_ref()
653        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
654        .unwrap_or(false);
655
656    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
657    let cascade_str = if cascade { " CASCADE" } else { "" };
658
659    let sql = format!("DROP TABLE {}{}{}", if_exists_str, table, cascade_str);
660
661    client.execute(&sql, &[]).await?;
662
663    Ok(json!({
664        "status": "success",
665        "action": "DROP TABLE",
666        "table": table,
667        "if_exists": if_exists,
668        "cascade": cascade
669    }))
670}
671
672/// 13. Create view
673pub async fn create_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
674    let view_name = params
675        .as_ref()
676        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
677        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
678
679    let query = params
680        .as_ref()
681        .and_then(|p| p.get("query").and_then(|v| v.as_str()))
682        .ok_or_else(|| MCPError::InvalidParams("Missing 'query' parameter".into()))?;
683
684    validate_identifier(view_name, "view_name")?;
685
686    if query.trim().is_empty() {
687        return Err(MCPError::InvalidParams("'query' cannot be empty".into()));
688    }
689
690    let materialized = params
691        .as_ref()
692        .and_then(|p| p.get("materialized").and_then(|v| v.as_bool()))
693        .unwrap_or(false);
694
695    let or_replace = params
696        .as_ref()
697        .and_then(|p| p.get("or_replace").and_then(|v| v.as_bool()))
698        .unwrap_or(false);
699
700    let materialized_str = if materialized { "MATERIALIZED " } else { "" };
701    let or_replace_str = if or_replace { "OR REPLACE " } else { "" };
702
703    let sql = format!("CREATE {}{}VIEW {} AS {}", or_replace_str, materialized_str, view_name, query);
704
705    client.execute(&sql, &[]).await?;
706
707    Ok(json!({
708        "status": "success",
709        "action": "CREATE VIEW",
710        "view_name": view_name,
711        "materialized": materialized,
712        "or_replace": or_replace
713    }))
714}
715
716/// 14. Drop view
717pub async fn drop_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
718    let view_name = params
719        .as_ref()
720        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
721        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
722
723    validate_identifier(view_name, "view_name")?;
724
725    let if_exists = params
726        .as_ref()
727        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
728        .unwrap_or(false);
729
730    let cascade = params
731        .as_ref()
732        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
733        .unwrap_or(false);
734
735    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
736    let cascade_str = if cascade { " CASCADE" } else { "" };
737
738    let sql = format!("DROP VIEW {}{}{}", if_exists_str, view_name, cascade_str);
739
740    client.execute(&sql, &[]).await?;
741
742    Ok(json!({
743        "status": "success",
744        "action": "DROP VIEW",
745        "view_name": view_name,
746        "if_exists": if_exists,
747        "cascade": cascade
748    }))
749}
750
751/// 15. Alter view
752pub async fn alter_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
753    let view_name = params
754        .as_ref()
755        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
756        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
757
758    validate_identifier(view_name, "view_name")?;
759
760    let rename_to = params
761        .as_ref()
762        .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
763
764    let set_schema = params
765        .as_ref()
766        .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
767
768    if rename_to.is_none() && set_schema.is_none() {
769        return Err(MCPError::InvalidParams(
770            "Must provide either 'rename_to' or 'set_schema' parameter".into()
771        ));
772    }
773
774    let mut action_desc = Vec::new();
775
776    if let Some(new_name) = rename_to {
777        validate_identifier(new_name, "rename_to")?;
778        let sql = format!("ALTER VIEW {} RENAME TO {}", view_name, new_name);
779        client.execute(&sql, &[]).await?;
780        action_desc.push(format!("renamed to {}", new_name));
781    }
782
783    if let Some(schema) = set_schema {
784        validate_identifier(schema, "set_schema")?;
785        let sql = format!("ALTER VIEW {} SET SCHEMA {}", view_name, schema);
786        client.execute(&sql, &[]).await?;
787        action_desc.push(format!("moved to schema {}", schema));
788    }
789
790    Ok(json!({
791        "status": "success",
792        "action": "ALTER VIEW",
793        "view_name": view_name,
794        "changes": action_desc
795    }))
796}
797
798/// 16. Create schema
799pub async fn create_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
800    let schema_name = params
801        .as_ref()
802        .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
803        .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
804
805    validate_identifier(schema_name, "schema_name")?;
806
807    let if_not_exists = params
808        .as_ref()
809        .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
810        .unwrap_or(false);
811
812    let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
813
814    let sql = format!("CREATE SCHEMA {}{}", if_not_exists_str, schema_name);
815
816    client.execute(&sql, &[]).await?;
817
818    Ok(json!({
819        "status": "success",
820        "action": "CREATE SCHEMA",
821        "schema_name": schema_name,
822        "if_not_exists": if_not_exists
823    }))
824}
825
826/// 17. Drop schema
827pub async fn drop_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
828    let schema_name = params
829        .as_ref()
830        .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
831        .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
832
833    validate_identifier(schema_name, "schema_name")?;
834
835    let if_exists = params
836        .as_ref()
837        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
838        .unwrap_or(false);
839
840    let cascade = params
841        .as_ref()
842        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
843        .unwrap_or(false);
844
845    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
846    let cascade_str = if cascade { " CASCADE" } else { "" };
847
848    let sql = format!("DROP SCHEMA {}{}{}", if_exists_str, schema_name, cascade_str);
849
850    client.execute(&sql, &[]).await?;
851
852    Ok(json!({
853        "status": "success",
854        "action": "DROP SCHEMA",
855        "schema_name": schema_name,
856        "if_exists": if_exists,
857        "cascade": cascade
858    }))
859}
860
861/// 18. Create sequence
862pub async fn create_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
863    let sequence_name = params
864        .as_ref()
865        .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
866        .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
867
868    validate_identifier(sequence_name, "sequence_name")?;
869
870    let if_not_exists = params
871        .as_ref()
872        .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
873        .unwrap_or(false);
874
875    let start = params
876        .as_ref()
877        .and_then(|p| p.get("start").and_then(|v| v.as_i64()))
878        .unwrap_or(1);
879
880    let increment = params
881        .as_ref()
882        .and_then(|p| p.get("increment").and_then(|v| v.as_i64()))
883        .unwrap_or(1);
884
885    let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
886
887    let sql = format!(
888        "CREATE SEQUENCE {}{} START {} INCREMENT {}",
889        if_not_exists_str,
890        sequence_name,
891        start,
892        increment
893    );
894
895    client.execute(&sql, &[]).await?;
896
897    Ok(json!({
898        "status": "success",
899        "action": "CREATE SEQUENCE",
900        "sequence_name": sequence_name,
901        "start": start,
902        "increment": increment,
903        "if_not_exists": if_not_exists
904    }))
905}
906
907/// 19. Drop sequence
908pub async fn drop_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
909    let sequence_name = params
910        .as_ref()
911        .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
912        .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
913
914    validate_identifier(sequence_name, "sequence_name")?;
915
916    let if_exists = params
917        .as_ref()
918        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
919        .unwrap_or(false);
920
921    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
922
923    let sql = format!("DROP SEQUENCE {}{}", if_exists_str, sequence_name);
924
925    client.execute(&sql, &[]).await?;
926
927    Ok(json!({
928        "status": "success",
929        "action": "DROP SEQUENCE",
930        "sequence_name": sequence_name,
931        "if_exists": if_exists
932    }))
933}
934
935/// 20. Alter index
936pub async fn alter_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
937    let index_name = params
938        .as_ref()
939        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
940        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
941
942    validate_identifier(index_name, "index_name")?;
943
944    let rename_to = params
945        .as_ref()
946        .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
947
948    let set_schema = params
949        .as_ref()
950        .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
951
952    if rename_to.is_none() && set_schema.is_none() {
953        return Err(MCPError::InvalidParams(
954            "Must provide either 'rename_to' or 'set_schema' parameter".into()
955        ));
956    }
957
958    let mut action_desc = Vec::new();
959
960    if let Some(new_name) = rename_to {
961        validate_identifier(new_name, "rename_to")?;
962        let sql = format!("ALTER INDEX {} RENAME TO {}", index_name, new_name);
963        client.execute(&sql, &[]).await?;
964        action_desc.push(format!("renamed to {}", new_name));
965    }
966
967    if let Some(schema) = set_schema {
968        validate_identifier(schema, "set_schema")?;
969        let sql = format!("ALTER INDEX {} SET SCHEMA {}", index_name, schema);
970        client.execute(&sql, &[]).await?;
971        action_desc.push(format!("moved to schema {}", schema));
972    }
973
974    Ok(json!({
975        "status": "success",
976        "action": "ALTER INDEX",
977        "index_name": index_name,
978        "changes": action_desc
979    }))
980}
981
982/// 21. List partitions
983pub async fn list_partitions(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
984    let table = params
985        .as_ref()
986        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
987        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
988
989    validate_identifier(table, "table")?;
990
991    let rows = client
992        .query(
993            "SELECT child.relname as partition_name, pg_size_pretty(pg_relation_size(child.oid)) as size
994             FROM pg_inherits
995             JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
996             JOIN pg_class child ON pg_inherits.inhrelid = child.oid
997             JOIN pg_namespace ON child.relnamespace = pg_namespace.oid
998             WHERE parent.relname = $1
999             ORDER BY child.relname",
1000            &[&table],
1001        )
1002        .await?;
1003
1004    let partitions: Vec<Value> = rows
1005        .iter()
1006        .map(|row| {
1007            json!({
1008                "partition_name": row.get::<_, String>(0),
1009                "size": row.get::<_, String>(1),
1010            })
1011        })
1012        .collect();
1013
1014    Ok(json!({
1015        "table": table,
1016        "partition_count": partitions.len(),
1017        "partitions": partitions
1018    }))
1019}
1020
1021/// 22. Backup table
1022pub async fn backup_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1023    let table = params
1024        .as_ref()
1025        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1026        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1027
1028    validate_identifier(table, "table")?;
1029
1030    let backup_name = format!("backup_{}", table);
1031    validate_identifier(&backup_name, "backup_name")?;
1032
1033    // Verify source table exists
1034    let _: (i32,) = client
1035        .query_one(
1036            "SELECT 1 FROM information_schema.tables
1037             WHERE table_name = $1 AND table_schema NOT IN ('pg_catalog', 'information_schema')",
1038            &[&table],
1039        )
1040        .await
1041        .map_err(|_| MCPError::InvalidParams(format!("Table '{}' does not exist", table)))?
1042        .try_get(0)
1043        .map(|val| (val,))
1044        .map_err(|_| MCPError::InvalidParams("Could not verify table existence".into()))?;
1045
1046    // Check if backup already exists
1047    let backup_exists = client
1048        .query_opt(
1049            "SELECT 1 FROM information_schema.tables WHERE table_name = $1",
1050            &[&backup_name],
1051        )
1052        .await?
1053        .is_some();
1054
1055    if backup_exists {
1056        return Err(MCPError::InvalidParams(
1057            format!("Backup table '{}' already exists. Drop it first or use a different table.", backup_name)
1058        ));
1059    }
1060
1061    // Create backup table with all columns and structure
1062    let create_backup_sql = format!("CREATE TABLE {} AS SELECT * FROM {}", backup_name, table);
1063    client.execute(&create_backup_sql, &[]).await?;
1064
1065    // Get column info for the original table
1066    let columns: Vec<(String, String, String)> = client
1067        .query(
1068            "SELECT column_name, data_type, is_nullable
1069             FROM information_schema.columns
1070             WHERE table_name = $1
1071             ORDER BY ordinal_position",
1072            &[&table],
1073        )
1074        .await?
1075        .iter()
1076        .map(|row| {
1077            (
1078                row.get::<_, String>(0),
1079                row.get::<_, String>(1),
1080                row.get::<_, String>(2),
1081            )
1082        })
1083        .collect();
1084
1085    // Get and copy indexes
1086    let indexes: Vec<String> = client
1087        .query(
1088            "SELECT indexname FROM pg_indexes WHERE tablename = $1 AND schemaname NOT IN ('pg_catalog')",
1089            &[&table],
1090        )
1091        .await?
1092        .iter()
1093        .map(|row| row.get::<_, String>(0))
1094        .collect();
1095
1096    let mut indexes_created = 0;
1097    for idx_name in indexes {
1098        let new_idx_name = format!("{}_on_{}", idx_name, backup_name);
1099        let idx_def: String = client
1100            .query_one(
1101                "SELECT indexdef FROM pg_indexes WHERE indexname = $1",
1102                &[&idx_name],
1103            )
1104            .await?
1105            .get(0);
1106
1107        let updated_def = idx_def
1108            .replace(&format!("ON {}", table), &format!("ON {}", backup_name))
1109            .replace(&idx_name, &new_idx_name);
1110
1111        if client.execute(&updated_def, &[]).await.is_ok() {
1112            indexes_created += 1;
1113        }
1114    }
1115
1116    // Get row count
1117    let row_count: (i64,) = client
1118        .query_one(&format!("SELECT COUNT(*) FROM {}", backup_name), &[])
1119        .await?
1120        .try_get(0)
1121        .map(|val| (val,))
1122        .map_err(|_| MCPError::InvalidParams("Could not count rows".into()))?;
1123
1124    Ok(json!({
1125        "status": "success",
1126        "action": "BACKUP TABLE",
1127        "original_table": table,
1128        "backup_table": backup_name,
1129        "rows_copied": row_count.0,
1130        "columns_copied": columns.len(),
1131        "indexes_created": indexes_created,
1132        "message": format!("Table '{}' backed up to '{}' with {} rows", table, backup_name, row_count.0)
1133    }))
1134}