Skip to main content

mcp_postgres/actions/
schema.rs

1use crate::errors::{MCPError, Result as MCPResult};
2use crate::validation::validate_identifier;
3use serde_json::{Value, json};
4use tokio_postgres::Client;
5
6/// 1. List all tables
7pub async fn list_tables(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
8    let rows = client
9        .query(
10            "SELECT table_schema, table_name, table_type
11             FROM information_schema.tables
12             WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
13             ORDER BY table_schema, table_name",
14            &[],
15        )
16        .await?;
17
18    let tables: Vec<Value> = rows
19        .iter()
20        .map(|row| {
21            json!({
22                "schema": row.get::<_, String>(0),
23                "name": row.get::<_, String>(1),
24                "type": row.get::<_, String>(2),
25            })
26        })
27        .collect();
28
29    Ok(json!({ "tables": tables }))
30}
31
32/// 2. Describe table structure
33pub async fn describe_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
34    let table_name = params
35        .as_ref()
36        .and_then(|p| p.get("table"))
37        .and_then(|v| v.as_str())
38        .ok_or_else(|| {
39            crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into())
40        })?;
41
42    validate_identifier(table_name, "table")?;
43
44    let rows = client
45        .query(
46            "SELECT column_name, data_type, is_nullable, column_default, ordinal_position
47             FROM information_schema.columns
48             WHERE table_name = $1
49             ORDER BY ordinal_position",
50            &[&table_name],
51        )
52        .await?;
53
54    let columns: Vec<Value> = rows
55        .iter()
56        .map(|row| {
57            json!({
58                "name": row.get::<_, String>(0),
59                "type": row.get::<_, String>(1),
60                "nullable": row.get::<_, String>(2),
61                "default": row.get::<_, Option<String>>(3),
62                "position": row.get::<_, i32>(4),
63            })
64        })
65        .collect();
66
67    Ok(json!({ "columns": columns }))
68}
69
70/// 3. List all indexes
71pub async fn list_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
72    let rows = client
73        .query(
74            "SELECT schemaname, tablename, indexname, indexdef
75             FROM pg_indexes
76             WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
77             ORDER BY schemaname, tablename, indexname",
78            &[],
79        )
80        .await?;
81
82    let indexes: Vec<Value> = rows
83        .iter()
84        .map(|row| {
85            json!({
86                "schema": row.get::<_, String>(0),
87                "table": row.get::<_, String>(1),
88                "name": row.get::<_, String>(2),
89                "definition": row.get::<_, String>(3),
90            })
91        })
92        .collect();
93
94    Ok(json!({ "indexes": indexes }))
95}
96
97/// 4. List schemas
98pub async fn list_schemas(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
99    let rows = client
100        .query(
101            "SELECT schema_name, schema_owner
102             FROM information_schema.schemata
103             WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
104             ORDER BY schema_name",
105            &[],
106        )
107        .await?;
108
109    let schemas: Vec<Value> = rows
110        .iter()
111        .map(|row| {
112            json!({
113                "name": row.get::<_, String>(0),
114                "owner": row.get::<_, String>(1),
115            })
116        })
117        .collect();
118
119    Ok(json!({ "schemas": schemas }))
120}
121
122/// 5. Show constraints
123pub async fn show_constraints(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
124    let rows = client
125        .query(
126            "SELECT table_schema, table_name, constraint_name, constraint_type
127             FROM information_schema.table_constraints
128             WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
129             ORDER BY table_schema, table_name, constraint_name",
130            &[],
131        )
132        .await?;
133
134    let constraints: Vec<Value> = rows
135        .iter()
136        .map(|row| {
137            json!({
138                "schema": row.get::<_, String>(0),
139                "table": row.get::<_, String>(1),
140                "name": row.get::<_, String>(2),
141                "type": row.get::<_, String>(3),
142            })
143        })
144        .collect();
145
146    Ok(json!({ "constraints": constraints }))
147}
148
149/// 5b. Get detailed object info (columns, constraints, indexes, FKs, descriptions)
150pub async fn get_object_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
151    let schema_name = params
152        .as_ref()
153        .and_then(|p| p.get("schema"))
154        .and_then(|v| v.as_str())
155        .unwrap_or("public");
156
157    validate_identifier(schema_name, "schema")?;
158
159    let table_name = params
160        .as_ref()
161        .and_then(|p| p.get("table"))
162        .and_then(|v| v.as_str())
163        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
164
165    validate_identifier(table_name, "table")?;
166
167    let columns = client
168        .query(
169            "SELECT c.column_name::text, c.data_type::text, c.is_nullable::text,
170                    c.column_default::text, c.ordinal_position,
171                    COALESCE(pgd.description, '')::text AS column_description,
172                    CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END AS is_pk,
173                    CASE WHEN uc.column_name IS NOT NULL THEN true ELSE false END AS is_unique
174             FROM information_schema.columns c
175             LEFT JOIN pg_catalog.pg_statio_all_tables st
176                 ON st.relname = c.table_name AND st.schemaname = c.table_schema
177             LEFT JOIN pg_catalog.pg_description pgd
178                 ON pgd.objoid = st.relid AND pgd.objsubid = c.ordinal_position
179             LEFT JOIN (
180                 SELECT ku.column_name, tc.table_schema, tc.table_name
181                 FROM information_schema.table_constraints tc
182                 JOIN information_schema.key_column_usage ku
183                     ON tc.constraint_name = ku.constraint_name
184                     AND tc.table_schema = ku.table_schema
185                 WHERE tc.constraint_type = 'PRIMARY KEY'
186             ) pk ON pk.column_name = c.column_name
187                 AND pk.table_schema = c.table_schema
188                 AND pk.table_name = c.table_name
189             LEFT JOIN (
190                 SELECT ku.column_name, tc.table_schema, tc.table_name
191                 FROM information_schema.table_constraints tc
192                 JOIN information_schema.key_column_usage ku
193                     ON tc.constraint_name = ku.constraint_name
194                     AND tc.table_schema = ku.table_schema
195                 WHERE tc.constraint_type = 'UNIQUE'
196             ) uc ON uc.column_name = c.column_name
197                 AND uc.table_schema = c.table_schema
198                 AND uc.table_name = c.table_name
199             WHERE c.table_schema = $1 AND c.table_name = $2
200             ORDER BY c.ordinal_position",
201            &[&schema_name, &table_name],
202        )
203        .await?;
204
205    let cols: Vec<Value> = columns
206        .iter()
207        .map(|row| {
208            json!({
209                "name": row.get::<_, String>(0),
210                "type": row.get::<_, String>(1),
211                "nullable": row.get::<_, String>(2) == "YES",
212                "default": row.get::<_, Option<String>>(3),
213                "position": row.get::<_, i32>(4),
214                "description": row.get::<_, String>(5),
215                "is_primary_key": row.get::<_, bool>(6),
216                "is_unique": row.get::<_, bool>(7),
217            })
218        })
219        .collect();
220
221    let indexes = client
222        .query(
223            "SELECT indexname::text, indexdef::text
224             FROM pg_indexes
225             WHERE schemaname = $1 AND tablename = $2
226             ORDER BY indexname",
227            &[&schema_name, &table_name],
228        )
229        .await?;
230
231    let idxs: Vec<Value> = indexes
232        .iter()
233        .map(|row| {
234            json!({
235                "name": row.get::<_, String>(0),
236                "definition": row.get::<_, String>(1),
237            })
238        })
239        .collect();
240
241    let foreign_keys = client
242        .query(
243            "SELECT kcu.column_name::text,
244                    ccu.table_schema::text AS foreign_schema,
245                    ccu.table_name::text AS foreign_table,
246                    ccu.column_name::text AS foreign_column,
247                    rc.update_rule::text, rc.delete_rule::text
248             FROM information_schema.table_constraints tc
249             JOIN information_schema.key_column_usage kcu
250                 ON tc.constraint_name = kcu.constraint_name
251                 AND tc.table_schema = kcu.table_schema
252             JOIN information_schema.constraint_column_usage ccu
253                 ON tc.constraint_name = ccu.constraint_name
254                 AND tc.table_schema = ccu.table_schema
255             JOIN information_schema.referential_constraints rc
256                 ON tc.constraint_name = rc.constraint_name
257                 AND tc.table_schema = rc.constraint_schema
258             WHERE tc.constraint_type = 'FOREIGN KEY'
259                 AND tc.table_schema = $1 AND tc.table_name = $2
260             ORDER BY kcu.ordinal_position",
261            &[&schema_name, &table_name],
262        )
263        .await?;
264
265    let fks: Vec<Value> = foreign_keys
266        .iter()
267        .map(|row| {
268            json!({
269                "column": row.get::<_, String>(0),
270                "references_schema": row.get::<_, String>(1),
271                "references_table": row.get::<_, String>(2),
272                "references_column": row.get::<_, String>(3),
273                "on_update": row.get::<_, String>(4),
274                "on_delete": row.get::<_, String>(5),
275            })
276        })
277        .collect();
278
279    let constraints = client
280        .query(
281            "SELECT constraint_name::text, constraint_type::text
282             FROM information_schema.table_constraints
283             WHERE table_schema = $1 AND table_name = $2
284             ORDER BY constraint_name",
285            &[&schema_name, &table_name],
286        )
287        .await?;
288
289    let cons: Vec<Value> = constraints
290        .iter()
291        .map(|row| {
292            json!({
293                "name": row.get::<_, String>(0),
294                "type": row.get::<_, String>(1),
295            })
296        })
297        .collect();
298
299    let row_estimate = client
300        .query_one(
301            "SELECT n_live_tup FROM pg_stat_user_tables
302             WHERE schemaname = $1 AND relname = $2",
303            &[&schema_name, &table_name],
304        )
305        .await
306        .map(|r| r.get::<_, Option<f64>>(0))
307        .unwrap_or(None);
308
309    let table_size = client
310        .query_one(
311            "SELECT pg_size_pretty(pg_total_relation_size($1::regclass))",
312            &[&format!("{}.{}", schema_name, table_name)],
313        )
314        .await
315        .map(|r| r.get::<_, Option<String>>(0))
316        .unwrap_or(None);
317
318    Ok(json!({
319        "table": table_name,
320        "schema": schema_name,
321        "columns": cols,
322        "indexes": idxs,
323        "foreign_keys": fks,
324        "constraints": cons,
325        "estimated_rows": row_estimate,
326        "total_size": table_size,
327    }))
328}
329
330/// 6. List triggers
331pub async fn list_triggers(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
332    let table = params
333        .as_ref()
334        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
335        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
336
337    let schema = params
338        .as_ref()
339        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
340        .unwrap_or("public");
341
342    let limit = params
343        .as_ref()
344        .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
345        .unwrap_or(1000);
346
347    validate_identifier(table, "table")?;
348    validate_identifier(schema, "schema")?;
349
350    if !((1..=10000).contains(&limit)) {
351        return Err(MCPError::InvalidParams(format!(
352            "'limit' must be between 1 and 10000 (got {})",
353            limit
354        )));
355    }
356
357    let rows = client
358        .query(
359            "SELECT trigger_name, event_object_table, event_manipulation,
360                    action_timing, action_statement, trigger_schema
361             FROM information_schema.triggers
362             WHERE event_object_table = $1 AND trigger_schema = $2
363             ORDER BY trigger_name
364             LIMIT $3",
365            &[&table, &schema, &limit],
366        )
367        .await?;
368
369    let triggers: Vec<Value> = rows
370        .iter()
371        .map(|row| {
372            json!({
373                "name": row.get::<_, String>(0),
374                "table": row.get::<_, String>(1),
375                "event": row.get::<_, String>(2),
376                "timing": row.get::<_, String>(3),
377                "statement": row.get::<_, String>(4),
378                "schema": row.get::<_, String>(5),
379            })
380        })
381        .collect();
382
383    Ok(json!({
384        "table": table,
385        "schema": schema,
386        "trigger_count": triggers.len(),
387        "triggers": triggers
388    }))
389}
390
391/// 7. Create index
392pub async fn create_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
393    let index_name = params
394        .as_ref()
395        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
396        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
397
398    let table = params
399        .as_ref()
400        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
401        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
402
403    let columns = params
404        .as_ref()
405        .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
406        .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
407
408    if columns.is_empty() {
409        return Err(MCPError::InvalidParams(
410            "'columns' array must not be empty".into(),
411        ));
412    }
413
414    validate_identifier(index_name, "index_name")?;
415    validate_identifier(table, "table")?;
416
417    let mut column_list = Vec::new();
418    for col in columns {
419        let col_name = col
420            .as_str()
421            .ok_or_else(|| MCPError::InvalidParams("Column names must be strings".into()))?;
422        validate_identifier(col_name, "column")?;
423        column_list.push(col_name.to_string());
424    }
425
426    let unique = params
427        .as_ref()
428        .and_then(|p| p.get("unique").and_then(|v| v.as_bool()))
429        .unwrap_or(false);
430
431    let concurrent = params
432        .as_ref()
433        .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
434        .unwrap_or(false);
435
436    let unique_str = if unique { "UNIQUE " } else { "" };
437    let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
438    let columns_str = column_list.join(", ");
439
440    let sql = format!(
441        "CREATE {}INDEX {} {} ON {}({})",
442        unique_str, concurrent_str, index_name, table, columns_str
443    );
444
445    client.execute(&sql, &[]).await?;
446
447    Ok(json!({
448        "status": "success",
449        "action": "CREATE INDEX",
450        "index_name": index_name,
451        "table": table,
452        "columns": column_list,
453        "unique": unique,
454        "concurrent": concurrent
455    }))
456}
457
458/// 8. Drop index
459pub async fn drop_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
460    let index_name = params
461        .as_ref()
462        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
463        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
464
465    validate_identifier(index_name, "index_name")?;
466
467    let if_exists = params
468        .as_ref()
469        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
470        .unwrap_or(false);
471
472    let concurrent = params
473        .as_ref()
474        .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
475        .unwrap_or(false);
476
477    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
478    let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
479
480    let sql = format!(
481        "DROP INDEX {}{}{}",
482        if_exists_str, concurrent_str, index_name
483    );
484
485    client.execute(&sql, &[]).await?;
486
487    Ok(json!({
488        "status": "success",
489        "action": "DROP INDEX",
490        "index_name": index_name,
491        "if_exists": if_exists,
492        "concurrent": concurrent
493    }))
494}
495
496/// 9. Create partition
497pub async fn create_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
498    let table = params
499        .as_ref()
500        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
501        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
502
503    let partition_name = params
504        .as_ref()
505        .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
506        .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
507
508    let partition_type = params
509        .as_ref()
510        .and_then(|p| p.get("partition_type").and_then(|v| v.as_str()))
511        .ok_or_else(|| {
512            MCPError::InvalidParams("Missing 'partition_type' parameter (RANGE/LIST/HASH)".into())
513        })?;
514
515    validate_identifier(table, "table")?;
516    validate_identifier(partition_name, "partition_name")?;
517
518    let partition_type_upper = partition_type.to_uppercase();
519    if !["RANGE", "LIST", "HASH"].contains(&partition_type_upper.as_str()) {
520        return Err(MCPError::InvalidParams(format!(
521            "'partition_type' must be RANGE, LIST, or HASH (got {})",
522            partition_type
523        )));
524    }
525
526    let column = params
527        .as_ref()
528        .and_then(|p| p.get("column").and_then(|v| v.as_str()))
529        .ok_or_else(|| MCPError::InvalidParams("Missing 'column' parameter".into()))?;
530
531    validate_identifier(column, "column")?;
532
533    let values = params
534        .as_ref()
535        .and_then(|p| p.get("values").and_then(|v| v.as_str()))
536        .ok_or_else(|| MCPError::InvalidParams(
537            "Missing 'values' parameter (FOR RANGE: 'FROM (x) TO (y)' or FOR LIST: 'IN (values)' or FOR HASH: 'MODULUS n REMAINDER r')".into()
538        ))?;
539
540    if values.contains(';') || values.contains("--") {
541        return Err(MCPError::InvalidParams(
542            "Invalid 'values' parameter: semicolons and SQL comments not allowed".into(),
543        ));
544    }
545
546    let sql = format!(
547        "CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
548        partition_name, table, values
549    );
550
551    client.execute(&sql, &[]).await?;
552
553    Ok(json!({
554        "status": "success",
555        "action": "CREATE TABLE PARTITION",
556        "table": table,
557        "partition_name": partition_name,
558        "partition_type": partition_type,
559        "column": column
560    }))
561}
562
563/// 10. Drop partition
564pub async fn drop_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
565    let table = params
566        .as_ref()
567        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
568        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
569
570    let partition_name = params
571        .as_ref()
572        .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
573        .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
574
575    validate_identifier(partition_name, "partition_name")?;
576
577    let if_exists = params
578        .as_ref()
579        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
580        .unwrap_or(false);
581
582    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
583
584    let sql = format!("DROP TABLE {}{}", if_exists_str, partition_name);
585
586    client.execute(&sql, &[]).await?;
587
588    Ok(json!({
589        "status": "success",
590        "action": "DROP TABLE PARTITION",
591        "table": table,
592        "partition_name": partition_name,
593        "if_exists": if_exists
594    }))
595}
596
597/// 11. Create table
598pub async fn create_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
599    let table = params
600        .as_ref()
601        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
602        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
603
604    let columns = params
605        .as_ref()
606        .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
607        .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
608
609    if columns.is_empty() {
610        return Err(MCPError::InvalidParams(
611            "'columns' array must not be empty".into(),
612        ));
613    }
614
615    validate_identifier(table, "table")?;
616
617    let mut column_defs = Vec::new();
618    for (idx, col) in columns.iter().enumerate() {
619        let col_def = col.as_str().ok_or_else(|| {
620            MCPError::InvalidParams(format!(
621                "Column {} must be a string with format: 'name TYPE [constraints]'",
622                idx
623            ))
624        })?;
625
626        if col_def.is_empty() {
627            return Err(MCPError::InvalidParams(format!(
628                "Column {} definition cannot be empty",
629                idx
630            )));
631        }
632
633        if col_def.contains(';') || col_def.contains("--") {
634            return Err(MCPError::InvalidParams(format!(
635                "Column {} definition contains dangerous SQL patterns",
636                idx
637            )));
638        }
639
640        column_defs.push(col_def.to_string());
641    }
642
643    let columns_str = column_defs.join(", ");
644    let sql = format!("CREATE TABLE {} ({})", table, columns_str);
645
646    client.execute(&sql, &[]).await?;
647
648    Ok(json!({
649        "status": "success",
650        "action": "CREATE TABLE",
651        "table": table,
652        "column_count": columns.len()
653    }))
654}
655
656/// 12. Drop table
657pub async fn drop_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
658    let table = params
659        .as_ref()
660        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
661        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
662
663    validate_identifier(table, "table")?;
664
665    let if_exists = params
666        .as_ref()
667        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
668        .unwrap_or(false);
669
670    let cascade = params
671        .as_ref()
672        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
673        .unwrap_or(false);
674
675    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
676    let cascade_str = if cascade { " CASCADE" } else { "" };
677
678    let sql = format!("DROP TABLE {}{}{}", if_exists_str, table, cascade_str);
679
680    client.execute(&sql, &[]).await?;
681
682    Ok(json!({
683        "status": "success",
684        "action": "DROP TABLE",
685        "table": table,
686        "if_exists": if_exists,
687        "cascade": cascade
688    }))
689}
690
691/// 13. Create view
692pub async fn create_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
693    let view_name = params
694        .as_ref()
695        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
696        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
697
698    let query = params
699        .as_ref()
700        .and_then(|p| p.get("query").and_then(|v| v.as_str()))
701        .ok_or_else(|| MCPError::InvalidParams("Missing 'query' parameter".into()))?;
702
703    validate_identifier(view_name, "view_name")?;
704
705    if query.trim().is_empty() {
706        return Err(MCPError::InvalidParams("'query' cannot be empty".into()));
707    }
708
709    let materialized = params
710        .as_ref()
711        .and_then(|p| p.get("materialized").and_then(|v| v.as_bool()))
712        .unwrap_or(false);
713
714    let or_replace = params
715        .as_ref()
716        .and_then(|p| p.get("or_replace").and_then(|v| v.as_bool()))
717        .unwrap_or(false);
718
719    let materialized_str = if materialized { "MATERIALIZED " } else { "" };
720    let or_replace_str = if or_replace { "OR REPLACE " } else { "" };
721
722    let sql = format!(
723        "CREATE {}{}VIEW {} AS {}",
724        or_replace_str, materialized_str, view_name, query
725    );
726
727    client.execute(&sql, &[]).await?;
728
729    Ok(json!({
730        "status": "success",
731        "action": "CREATE VIEW",
732        "view_name": view_name,
733        "materialized": materialized,
734        "or_replace": or_replace
735    }))
736}
737
738/// 14. Drop view
739pub async fn drop_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
740    let view_name = params
741        .as_ref()
742        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
743        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
744
745    validate_identifier(view_name, "view_name")?;
746
747    let if_exists = params
748        .as_ref()
749        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
750        .unwrap_or(false);
751
752    let cascade = params
753        .as_ref()
754        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
755        .unwrap_or(false);
756
757    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
758    let cascade_str = if cascade { " CASCADE" } else { "" };
759
760    let sql = format!("DROP VIEW {}{}{}", if_exists_str, view_name, cascade_str);
761
762    client.execute(&sql, &[]).await?;
763
764    Ok(json!({
765        "status": "success",
766        "action": "DROP VIEW",
767        "view_name": view_name,
768        "if_exists": if_exists,
769        "cascade": cascade
770    }))
771}
772
773/// 15. Alter view
774pub async fn alter_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
775    let view_name = params
776        .as_ref()
777        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
778        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
779
780    validate_identifier(view_name, "view_name")?;
781
782    let rename_to = params
783        .as_ref()
784        .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
785
786    let set_schema = params
787        .as_ref()
788        .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
789
790    if rename_to.is_none() && set_schema.is_none() {
791        return Err(MCPError::InvalidParams(
792            "Must provide either 'rename_to' or 'set_schema' parameter".into(),
793        ));
794    }
795
796    let mut action_desc = Vec::new();
797
798    if let Some(new_name) = rename_to {
799        validate_identifier(new_name, "rename_to")?;
800        let sql = format!("ALTER VIEW {} RENAME TO {}", view_name, new_name);
801        client.execute(&sql, &[]).await?;
802        action_desc.push(format!("renamed to {}", new_name));
803    }
804
805    if let Some(schema) = set_schema {
806        validate_identifier(schema, "set_schema")?;
807        let sql = format!("ALTER VIEW {} SET SCHEMA {}", view_name, schema);
808        client.execute(&sql, &[]).await?;
809        action_desc.push(format!("moved to schema {}", schema));
810    }
811
812    Ok(json!({
813        "status": "success",
814        "action": "ALTER VIEW",
815        "view_name": view_name,
816        "changes": action_desc
817    }))
818}
819
820/// 16. Create schema
821pub async fn create_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
822    let schema_name = params
823        .as_ref()
824        .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
825        .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
826
827    validate_identifier(schema_name, "schema_name")?;
828
829    let if_not_exists = params
830        .as_ref()
831        .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
832        .unwrap_or(false);
833
834    let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
835
836    let sql = format!("CREATE SCHEMA {}{}", if_not_exists_str, schema_name);
837
838    client.execute(&sql, &[]).await?;
839
840    Ok(json!({
841        "status": "success",
842        "action": "CREATE SCHEMA",
843        "schema_name": schema_name,
844        "if_not_exists": if_not_exists
845    }))
846}
847
848/// 17. Drop schema
849pub async fn drop_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
850    let schema_name = params
851        .as_ref()
852        .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
853        .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
854
855    validate_identifier(schema_name, "schema_name")?;
856
857    let if_exists = params
858        .as_ref()
859        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
860        .unwrap_or(false);
861
862    let cascade = params
863        .as_ref()
864        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
865        .unwrap_or(false);
866
867    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
868    let cascade_str = if cascade { " CASCADE" } else { "" };
869
870    let sql = format!(
871        "DROP SCHEMA {}{}{}",
872        if_exists_str, schema_name, cascade_str
873    );
874
875    client.execute(&sql, &[]).await?;
876
877    Ok(json!({
878        "status": "success",
879        "action": "DROP SCHEMA",
880        "schema_name": schema_name,
881        "if_exists": if_exists,
882        "cascade": cascade
883    }))
884}
885
886/// 18. Create sequence
887pub async fn create_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
888    let sequence_name = params
889        .as_ref()
890        .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
891        .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
892
893    validate_identifier(sequence_name, "sequence_name")?;
894
895    let if_not_exists = params
896        .as_ref()
897        .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
898        .unwrap_or(false);
899
900    let start = params
901        .as_ref()
902        .and_then(|p| p.get("start").and_then(|v| v.as_i64()))
903        .unwrap_or(1);
904
905    let increment = params
906        .as_ref()
907        .and_then(|p| p.get("increment").and_then(|v| v.as_i64()))
908        .unwrap_or(1);
909
910    let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
911
912    let sql = format!(
913        "CREATE SEQUENCE {}{} START {} INCREMENT {}",
914        if_not_exists_str, sequence_name, start, increment
915    );
916
917    client.execute(&sql, &[]).await?;
918
919    Ok(json!({
920        "status": "success",
921        "action": "CREATE SEQUENCE",
922        "sequence_name": sequence_name,
923        "start": start,
924        "increment": increment,
925        "if_not_exists": if_not_exists
926    }))
927}
928
929/// 19. Drop sequence
930pub async fn drop_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
931    let sequence_name = params
932        .as_ref()
933        .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
934        .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
935
936    validate_identifier(sequence_name, "sequence_name")?;
937
938    let if_exists = params
939        .as_ref()
940        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
941        .unwrap_or(false);
942
943    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
944
945    let sql = format!("DROP SEQUENCE {}{}", if_exists_str, sequence_name);
946
947    client.execute(&sql, &[]).await?;
948
949    Ok(json!({
950        "status": "success",
951        "action": "DROP SEQUENCE",
952        "sequence_name": sequence_name,
953        "if_exists": if_exists
954    }))
955}
956
957/// 20. Alter index
958pub async fn alter_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
959    let index_name = params
960        .as_ref()
961        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
962        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
963
964    validate_identifier(index_name, "index_name")?;
965
966    let rename_to = params
967        .as_ref()
968        .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
969
970    let set_schema = params
971        .as_ref()
972        .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
973
974    if rename_to.is_none() && set_schema.is_none() {
975        return Err(MCPError::InvalidParams(
976            "Must provide either 'rename_to' or 'set_schema' parameter".into(),
977        ));
978    }
979
980    let mut action_desc = Vec::new();
981
982    if let Some(new_name) = rename_to {
983        validate_identifier(new_name, "rename_to")?;
984        let sql = format!("ALTER INDEX {} RENAME TO {}", index_name, new_name);
985        client.execute(&sql, &[]).await?;
986        action_desc.push(format!("renamed to {}", new_name));
987    }
988
989    if let Some(schema) = set_schema {
990        validate_identifier(schema, "set_schema")?;
991        let sql = format!("ALTER INDEX {} SET SCHEMA {}", index_name, schema);
992        client.execute(&sql, &[]).await?;
993        action_desc.push(format!("moved to schema {}", schema));
994    }
995
996    Ok(json!({
997        "status": "success",
998        "action": "ALTER INDEX",
999        "index_name": index_name,
1000        "changes": action_desc
1001    }))
1002}
1003
1004/// 21. List partitions
1005pub async fn list_partitions(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1006    let table = params
1007        .as_ref()
1008        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1009        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1010
1011    validate_identifier(table, "table")?;
1012
1013    let rows = client
1014        .query(
1015            "SELECT child.relname as partition_name, pg_size_pretty(pg_relation_size(child.oid)) as size
1016             FROM pg_inherits
1017             JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
1018             JOIN pg_class child ON pg_inherits.inhrelid = child.oid
1019             JOIN pg_namespace ON child.relnamespace = pg_namespace.oid
1020             WHERE parent.relname = $1
1021             ORDER BY child.relname",
1022            &[&table],
1023        )
1024        .await?;
1025
1026    let partitions: Vec<Value> = rows
1027        .iter()
1028        .map(|row| {
1029            json!({
1030                "partition_name": row.get::<_, String>(0),
1031                "size": row.get::<_, String>(1),
1032            })
1033        })
1034        .collect();
1035
1036    Ok(json!({
1037        "table": table,
1038        "partition_count": partitions.len(),
1039        "partitions": partitions
1040    }))
1041}
1042
1043/// 22. Backup table
1044pub async fn backup_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1045    let table = params
1046        .as_ref()
1047        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1048        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1049
1050    validate_identifier(table, "table")?;
1051
1052    let backup_name = format!("backup_{}", table);
1053    validate_identifier(&backup_name, "backup_name")?;
1054
1055    // Verify source table exists
1056    let _: (i32,) = client
1057        .query_one(
1058            "SELECT 1 FROM information_schema.tables
1059             WHERE table_name = $1 AND table_schema NOT IN ('pg_catalog', 'information_schema')",
1060            &[&table],
1061        )
1062        .await
1063        .map_err(|_| MCPError::InvalidParams(format!("Table '{}' does not exist", table)))?
1064        .try_get(0)
1065        .map(|val| (val,))
1066        .map_err(|_| MCPError::InvalidParams("Could not verify table existence".into()))?;
1067
1068    // Check if backup already exists
1069    let backup_exists = client
1070        .query_opt(
1071            "SELECT 1 FROM information_schema.tables WHERE table_name = $1",
1072            &[&backup_name],
1073        )
1074        .await?
1075        .is_some();
1076
1077    if backup_exists {
1078        return Err(MCPError::InvalidParams(format!(
1079            "Backup table '{}' already exists. Drop it first or use a different table.",
1080            backup_name
1081        )));
1082    }
1083
1084    // Create backup table with all columns and structure
1085    let create_backup_sql = format!("CREATE TABLE {} AS SELECT * FROM {}", backup_name, table);
1086    client.execute(&create_backup_sql, &[]).await?;
1087
1088    // Get column info for the original table
1089    let columns: Vec<(String, String, String)> = client
1090        .query(
1091            "SELECT column_name, data_type, is_nullable
1092             FROM information_schema.columns
1093             WHERE table_name = $1
1094             ORDER BY ordinal_position",
1095            &[&table],
1096        )
1097        .await?
1098        .iter()
1099        .map(|row| {
1100            (
1101                row.get::<_, String>(0),
1102                row.get::<_, String>(1),
1103                row.get::<_, String>(2),
1104            )
1105        })
1106        .collect();
1107
1108    // Get and copy indexes
1109    let indexes: Vec<String> = client
1110        .query(
1111            "SELECT indexname FROM pg_indexes WHERE tablename = $1 AND schemaname NOT IN ('pg_catalog')",
1112            &[&table],
1113        )
1114        .await?
1115        .iter()
1116        .map(|row| row.get::<_, String>(0))
1117        .collect();
1118
1119    let mut indexes_created = 0;
1120    for idx_name in indexes {
1121        let new_idx_name = format!("{}_on_{}", idx_name, backup_name);
1122        let idx_def: String = client
1123            .query_one(
1124                "SELECT indexdef FROM pg_indexes WHERE indexname = $1",
1125                &[&idx_name],
1126            )
1127            .await?
1128            .get(0);
1129
1130        let updated_def = idx_def
1131            .replace(&format!("ON {}", table), &format!("ON {}", backup_name))
1132            .replace(&idx_name, &new_idx_name);
1133
1134        if client.execute(&updated_def, &[]).await.is_ok() {
1135            indexes_created += 1;
1136        }
1137    }
1138
1139    // Get row count
1140    let row_count: (i64,) = client
1141        .query_one(&format!("SELECT COUNT(*) FROM {}", backup_name), &[])
1142        .await?
1143        .try_get(0)
1144        .map(|val| (val,))
1145        .map_err(|_| MCPError::InvalidParams("Could not count rows".into()))?;
1146
1147    Ok(json!({
1148        "status": "success",
1149        "action": "BACKUP TABLE",
1150        "original_table": table,
1151        "backup_table": backup_name,
1152        "rows_copied": row_count.0,
1153        "columns_copied": columns.len(),
1154        "indexes_created": indexes_created,
1155        "message": format!("Table '{}' backed up to '{}' with {} rows", table, backup_name, row_count.0)
1156    }))
1157}