Skip to main content

mcp_postgres/actions/
schema.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::{MCPError, Result as MCPResult};
4
5const MAX_IDENTIFIER_LEN: usize = 255;
6
7fn validate_identifier(name: &str, label: &str) -> std::result::Result<(), MCPError> {
8    if name.is_empty() {
9        return Err(MCPError::InvalidParams(format!("'{label}' must not be empty")));
10    }
11    if name.len() > MAX_IDENTIFIER_LEN {
12        return Err(MCPError::InvalidParams(
13            format!("'{label}' exceeds maximum length of {MAX_IDENTIFIER_LEN} characters (got {})", name.len())
14        ));
15    }
16    Ok(())
17}
18
19/// 1. List all tables
20pub async fn list_tables(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
21    let rows = client
22        .query(
23            "SELECT table_schema, table_name, table_type
24             FROM information_schema.tables
25             WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
26             ORDER BY table_schema, table_name",
27            &[],
28        )
29        .await?;
30
31    let tables: Vec<Value> = rows
32        .iter()
33        .map(|row| {
34            json!({
35                "schema": row.get::<_, String>(0),
36                "name": row.get::<_, String>(1),
37                "type": row.get::<_, String>(2),
38            })
39        })
40        .collect();
41
42    Ok(json!({ "tables": tables }))
43}
44
45/// 2. Describe table structure
46pub async fn describe_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
47    let table_name = params
48        .as_ref()
49        .and_then(|p| p.get("table"))
50        .and_then(|v| v.as_str())
51        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into()))?;
52
53    validate_identifier(table_name, "table")?;
54
55    let rows = client
56        .query(
57            "SELECT column_name, data_type, is_nullable, column_default, ordinal_position
58             FROM information_schema.columns
59             WHERE table_name = $1
60             ORDER BY ordinal_position",
61            &[&table_name],
62        )
63        .await?;
64
65    let columns: Vec<Value> = rows
66        .iter()
67        .map(|row| {
68            json!({
69                "name": row.get::<_, String>(0),
70                "type": row.get::<_, String>(1),
71                "nullable": row.get::<_, String>(2),
72                "default": row.get::<_, Option<String>>(3),
73                "position": row.get::<_, i32>(4),
74            })
75        })
76        .collect();
77
78    Ok(json!({ "columns": columns }))
79}
80
81/// 3. List all indexes
82pub async fn list_indexes(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
83    let rows = client
84        .query(
85            "SELECT schemaname, tablename, indexname, indexdef
86             FROM pg_indexes
87             WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
88             ORDER BY schemaname, tablename, indexname",
89            &[],
90        )
91        .await?;
92
93    let indexes: Vec<Value> = rows
94        .iter()
95        .map(|row| {
96            json!({
97                "schema": row.get::<_, String>(0),
98                "table": row.get::<_, String>(1),
99                "name": row.get::<_, String>(2),
100                "definition": row.get::<_, String>(3),
101            })
102        })
103        .collect();
104
105    Ok(json!({ "indexes": indexes }))
106}
107
108/// 4. List schemas
109pub async fn list_schemas(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
110    let rows = client
111        .query(
112            "SELECT schema_name, schema_owner
113             FROM information_schema.schemata
114             WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
115             ORDER BY schema_name",
116            &[],
117        )
118        .await?;
119
120    let schemas: Vec<Value> = rows
121        .iter()
122        .map(|row| {
123            json!({
124                "name": row.get::<_, String>(0),
125                "owner": row.get::<_, String>(1),
126            })
127        })
128        .collect();
129
130    Ok(json!({ "schemas": schemas }))
131}
132
133/// 5. Show constraints
134pub async fn show_constraints(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
135    let rows = client
136        .query(
137            "SELECT table_schema, table_name, constraint_name, constraint_type
138             FROM information_schema.table_constraints
139             WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
140             ORDER BY table_schema, table_name, constraint_name",
141            &[],
142        )
143        .await?;
144
145    let constraints: Vec<Value> = rows
146        .iter()
147        .map(|row| {
148            json!({
149                "schema": row.get::<_, String>(0),
150                "table": row.get::<_, String>(1),
151                "name": row.get::<_, String>(2),
152                "type": row.get::<_, String>(3),
153            })
154        })
155        .collect();
156
157    Ok(json!({ "constraints": constraints }))
158}
159
160/// 5b. Get detailed object info (columns, constraints, indexes, FKs, descriptions)
161pub async fn get_object_details(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
162    let schema_name = params
163        .as_ref()
164        .and_then(|p| p.get("schema"))
165        .and_then(|v| v.as_str())
166        .unwrap_or("public");
167
168    if schema_name.len() > MAX_IDENTIFIER_LEN {
169        return Err(MCPError::InvalidParams(
170            format!("'schema' exceeds maximum length of {MAX_IDENTIFIER_LEN} characters (got {})", schema_name.len())
171        ));
172    }
173
174    let table_name = params
175        .as_ref()
176        .and_then(|p| p.get("table"))
177        .and_then(|v| v.as_str())
178        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
179
180    validate_identifier(table_name, "table")?;
181
182    let columns = client
183        .query(
184            "SELECT c.column_name::text, c.data_type::text, c.is_nullable::text,
185                    c.column_default::text, c.ordinal_position,
186                    COALESCE(pgd.description, '')::text AS column_description,
187                    CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END AS is_pk,
188                    CASE WHEN uc.column_name IS NOT NULL THEN true ELSE false END AS is_unique
189             FROM information_schema.columns c
190             LEFT JOIN pg_catalog.pg_statio_all_tables st
191                 ON st.relname = c.table_name AND st.schemaname = c.table_schema
192             LEFT JOIN pg_catalog.pg_description pgd
193                 ON pgd.objoid = st.relid AND pgd.objsubid = c.ordinal_position
194             LEFT JOIN (
195                 SELECT ku.column_name, tc.table_schema, tc.table_name
196                 FROM information_schema.table_constraints tc
197                 JOIN information_schema.key_column_usage ku
198                     ON tc.constraint_name = ku.constraint_name
199                     AND tc.table_schema = ku.table_schema
200                 WHERE tc.constraint_type = 'PRIMARY KEY'
201             ) pk ON pk.column_name = c.column_name
202                 AND pk.table_schema = c.table_schema
203                 AND pk.table_name = c.table_name
204             LEFT JOIN (
205                 SELECT ku.column_name, tc.table_schema, tc.table_name
206                 FROM information_schema.table_constraints tc
207                 JOIN information_schema.key_column_usage ku
208                     ON tc.constraint_name = ku.constraint_name
209                     AND tc.table_schema = ku.table_schema
210                 WHERE tc.constraint_type = 'UNIQUE'
211             ) uc ON uc.column_name = c.column_name
212                 AND uc.table_schema = c.table_schema
213                 AND uc.table_name = c.table_name
214             WHERE c.table_schema = $1 AND c.table_name = $2
215             ORDER BY c.ordinal_position",
216            &[&schema_name, &table_name],
217        )
218        .await?;
219
220    let cols: Vec<Value> = columns.iter().map(|row| {
221        json!({
222            "name": row.get::<_, String>(0),
223            "type": row.get::<_, String>(1),
224            "nullable": row.get::<_, String>(2) == "YES",
225            "default": row.get::<_, Option<String>>(3),
226            "position": row.get::<_, i32>(4),
227            "description": row.get::<_, String>(5),
228            "is_primary_key": row.get::<_, bool>(6),
229            "is_unique": row.get::<_, bool>(7),
230        })
231    }).collect();
232
233    let indexes = client
234        .query(
235            "SELECT indexname::text, indexdef::text
236             FROM pg_indexes
237             WHERE schemaname = $1 AND tablename = $2
238             ORDER BY indexname",
239            &[&schema_name, &table_name],
240        )
241        .await?;
242
243    let idxs: Vec<Value> = indexes.iter().map(|row| {
244        json!({
245            "name": row.get::<_, String>(0),
246            "definition": row.get::<_, String>(1),
247        })
248    }).collect();
249
250    let foreign_keys = client
251        .query(
252            "SELECT kcu.column_name::text,
253                    ccu.table_schema::text AS foreign_schema,
254                    ccu.table_name::text AS foreign_table,
255                    ccu.column_name::text AS foreign_column,
256                    rc.update_rule::text, rc.delete_rule::text
257             FROM information_schema.table_constraints tc
258             JOIN information_schema.key_column_usage kcu
259                 ON tc.constraint_name = kcu.constraint_name
260                 AND tc.table_schema = kcu.table_schema
261             JOIN information_schema.constraint_column_usage ccu
262                 ON tc.constraint_name = ccu.constraint_name
263                 AND tc.table_schema = ccu.table_schema
264             JOIN information_schema.referential_constraints rc
265                 ON tc.constraint_name = rc.constraint_name
266                 AND tc.table_schema = rc.constraint_schema
267             WHERE tc.constraint_type = 'FOREIGN KEY'
268                 AND tc.table_schema = $1 AND tc.table_name = $2
269             ORDER BY kcu.ordinal_position",
270            &[&schema_name, &table_name],
271        )
272        .await?;
273
274    let fks: Vec<Value> = foreign_keys.iter().map(|row| {
275        json!({
276            "column": row.get::<_, String>(0),
277            "references_schema": row.get::<_, String>(1),
278            "references_table": row.get::<_, String>(2),
279            "references_column": row.get::<_, String>(3),
280            "on_update": row.get::<_, String>(4),
281            "on_delete": row.get::<_, String>(5),
282        })
283    }).collect();
284
285    let constraints = client
286        .query(
287            "SELECT constraint_name::text, constraint_type::text
288             FROM information_schema.table_constraints
289             WHERE table_schema = $1 AND table_name = $2
290             ORDER BY constraint_name",
291            &[&schema_name, &table_name],
292        )
293        .await?;
294
295    let cons: Vec<Value> = constraints.iter().map(|row| {
296        json!({
297            "name": row.get::<_, String>(0),
298            "type": row.get::<_, String>(1),
299        })
300    }).collect();
301
302    let row_estimate = client
303        .query_one(
304            "SELECT n_live_tup FROM pg_stat_user_tables
305             WHERE schemaname = $1 AND relname = $2",
306            &[&schema_name, &table_name],
307        )
308        .await
309        .map(|r| r.get::<_, Option<f64>>(0))
310        .unwrap_or(None);
311
312    let table_size = client
313        .query_one(
314            "SELECT pg_size_pretty(pg_total_relation_size($1::regclass))",
315            &[&format!("{}.{}", schema_name, table_name)],
316        )
317        .await
318        .map(|r| r.get::<_, Option<String>>(0))
319        .unwrap_or(None);
320
321    Ok(json!({
322        "table": table_name,
323        "schema": schema_name,
324        "columns": cols,
325        "indexes": idxs,
326        "foreign_keys": fks,
327        "constraints": cons,
328        "estimated_rows": row_estimate,
329        "total_size": table_size,
330    }))
331}
332
333/// 6. List triggers
334pub async fn list_triggers(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
335    let table = params
336        .as_ref()
337        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
338        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
339
340    let schema = params
341        .as_ref()
342        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
343        .unwrap_or("public");
344
345    let limit = params
346        .as_ref()
347        .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
348        .unwrap_or(1000);
349
350    validate_identifier(table, "table")?;
351    validate_identifier(schema, "schema")?;
352
353    if !((1..=10000).contains(&limit)) {
354        return Err(MCPError::InvalidParams(
355            format!("'limit' must be between 1 and 10000 (got {})", limit)
356        ));
357    }
358
359    let rows = client
360        .query(
361            "SELECT trigger_name, event_object_table, event_manipulation,
362                    action_timing, action_statement, trigger_schema
363             FROM information_schema.triggers
364             WHERE event_object_table = $1 AND trigger_schema = $2
365             ORDER BY trigger_name
366             LIMIT $3",
367            &[&table, &schema, &limit],
368        )
369        .await?;
370
371    let triggers: Vec<Value> = rows
372        .iter()
373        .map(|row| {
374            json!({
375                "name": row.get::<_, String>(0),
376                "table": row.get::<_, String>(1),
377                "event": row.get::<_, String>(2),
378                "timing": row.get::<_, String>(3),
379                "statement": row.get::<_, String>(4),
380                "schema": row.get::<_, String>(5),
381            })
382        })
383        .collect();
384
385    Ok(json!({
386        "table": table,
387        "schema": schema,
388        "trigger_count": triggers.len(),
389        "triggers": triggers
390    }))
391}
392
393/// 7. Create index
394pub async fn create_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
395    let index_name = params
396        .as_ref()
397        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
398        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
399
400    let table = params
401        .as_ref()
402        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
403        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
404
405    let columns = params
406        .as_ref()
407        .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
408        .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
409
410    if columns.is_empty() {
411        return Err(MCPError::InvalidParams("'columns' array must not be empty".into()));
412    }
413
414    validate_identifier(index_name, "index_name")?;
415    validate_identifier(table, "table")?;
416
417    let mut column_list = Vec::new();
418    for col in columns {
419        let col_name = col.as_str()
420            .ok_or_else(|| MCPError::InvalidParams("Column names must be strings".into()))?;
421        validate_identifier(col_name, "column")?;
422        column_list.push(col_name.to_string());
423    }
424
425    let unique = params
426        .as_ref()
427        .and_then(|p| p.get("unique").and_then(|v| v.as_bool()))
428        .unwrap_or(false);
429
430    let concurrent = params
431        .as_ref()
432        .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
433        .unwrap_or(false);
434
435    let unique_str = if unique { "UNIQUE " } else { "" };
436    let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
437    let columns_str = column_list.join(", ");
438
439    let sql = format!(
440        "CREATE {}INDEX {}{}ON {}({})",
441        unique_str,
442        concurrent_str,
443        index_name,
444        table,
445        columns_str
446    );
447
448    client.execute(&sql, &[]).await?;
449
450    Ok(json!({
451        "status": "success",
452        "action": "CREATE INDEX",
453        "index_name": index_name,
454        "table": table,
455        "columns": column_list,
456        "unique": unique,
457        "concurrent": concurrent
458    }))
459}
460
461/// 8. Drop index
462pub async fn drop_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
463    let index_name = params
464        .as_ref()
465        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
466        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
467
468    validate_identifier(index_name, "index_name")?;
469
470    let if_exists = params
471        .as_ref()
472        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
473        .unwrap_or(false);
474
475    let concurrent = params
476        .as_ref()
477        .and_then(|p| p.get("concurrent").and_then(|v| v.as_bool()))
478        .unwrap_or(false);
479
480    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
481    let concurrent_str = if concurrent { "CONCURRENTLY " } else { "" };
482
483    let sql = format!(
484        "DROP INDEX {}{}{}",
485        if_exists_str,
486        concurrent_str,
487        index_name
488    );
489
490    client.execute(&sql, &[]).await?;
491
492    Ok(json!({
493        "status": "success",
494        "action": "DROP INDEX",
495        "index_name": index_name,
496        "if_exists": if_exists,
497        "concurrent": concurrent
498    }))
499}
500
501/// 9. Create partition
502pub async fn create_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
503    let table = params
504        .as_ref()
505        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
506        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
507
508    let partition_name = params
509        .as_ref()
510        .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
511        .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
512
513    let partition_type = params
514        .as_ref()
515        .and_then(|p| p.get("partition_type").and_then(|v| v.as_str()))
516        .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_type' parameter (RANGE/LIST/HASH)".into()))?;
517
518    validate_identifier(table, "table")?;
519    validate_identifier(partition_name, "partition_name")?;
520
521    let partition_type_upper = partition_type.to_uppercase();
522    if !["RANGE", "LIST", "HASH"].contains(&partition_type_upper.as_str()) {
523        return Err(MCPError::InvalidParams(
524            format!("'partition_type' must be RANGE, LIST, or HASH (got {})", partition_type)
525        ));
526    }
527
528    let column = params
529        .as_ref()
530        .and_then(|p| p.get("column").and_then(|v| v.as_str()))
531        .ok_or_else(|| MCPError::InvalidParams("Missing 'column' parameter".into()))?;
532
533    validate_identifier(column, "column")?;
534
535    let values = params
536        .as_ref()
537        .and_then(|p| p.get("values").and_then(|v| v.as_str()))
538        .ok_or_else(|| MCPError::InvalidParams(
539            "Missing 'values' parameter (FOR RANGE: 'FROM (x) TO (y)' or FOR LIST: 'IN (values)' or FOR HASH: 'MODULUS n REMAINDER r')".into()
540        ))?;
541
542    if values.contains(';') || values.contains("--") {
543        return Err(MCPError::InvalidParams(
544            "Invalid 'values' parameter: semicolons and SQL comments not allowed".into()
545        ));
546    }
547
548    let sql = format!(
549        "CREATE TABLE {} PARTITION OF {} FOR VALUES {}",
550        partition_name,
551        table,
552        values
553    );
554
555    client.execute(&sql, &[]).await?;
556
557    Ok(json!({
558        "status": "success",
559        "action": "CREATE TABLE PARTITION",
560        "table": table,
561        "partition_name": partition_name,
562        "partition_type": partition_type,
563        "column": column
564    }))
565}
566
567/// 10. Drop partition
568pub async fn drop_partition(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
569    let table = params
570        .as_ref()
571        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
572        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
573
574    let partition_name = params
575        .as_ref()
576        .and_then(|p| p.get("partition_name").and_then(|v| v.as_str()))
577        .ok_or_else(|| MCPError::InvalidParams("Missing 'partition_name' parameter".into()))?;
578
579    validate_identifier(partition_name, "partition_name")?;
580
581    let if_exists = params
582        .as_ref()
583        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
584        .unwrap_or(false);
585
586    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
587
588    let sql = format!(
589        "DROP TABLE {}{}",
590        if_exists_str,
591        partition_name
592    );
593
594    client.execute(&sql, &[]).await?;
595
596    Ok(json!({
597        "status": "success",
598        "action": "DROP TABLE PARTITION",
599        "table": table,
600        "partition_name": partition_name,
601        "if_exists": if_exists
602    }))
603}
604
605/// 11. Create table
606pub async fn create_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
607    let table = params
608        .as_ref()
609        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
610        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
611
612    let columns = params
613        .as_ref()
614        .and_then(|p| p.get("columns").and_then(|v| v.as_array()))
615        .ok_or_else(|| MCPError::InvalidParams("Missing 'columns' parameter (array)".into()))?;
616
617    if columns.is_empty() {
618        return Err(MCPError::InvalidParams("'columns' array must not be empty".into()));
619    }
620
621    validate_identifier(table, "table")?;
622
623    let mut column_defs = Vec::new();
624    for (idx, col) in columns.iter().enumerate() {
625        let col_def = col.as_str()
626            .ok_or_else(|| MCPError::InvalidParams(format!("Column {} must be a string with format: 'name TYPE [constraints]'", idx)))?;
627
628        if col_def.is_empty() {
629            return Err(MCPError::InvalidParams(format!("Column {} definition cannot be empty", idx)));
630        }
631
632        if col_def.contains(';') || col_def.contains("--") {
633            return Err(MCPError::InvalidParams(
634                format!("Column {} definition contains dangerous SQL patterns", idx)
635            ));
636        }
637
638        column_defs.push(col_def.to_string());
639    }
640
641    let columns_str = column_defs.join(", ");
642    let sql = format!("CREATE TABLE {} ({})", table, columns_str);
643
644    client.execute(&sql, &[]).await?;
645
646    Ok(json!({
647        "status": "success",
648        "action": "CREATE TABLE",
649        "table": table,
650        "column_count": columns.len()
651    }))
652}
653
654/// 12. Drop table
655pub async fn drop_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
656    let table = params
657        .as_ref()
658        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
659        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
660
661    validate_identifier(table, "table")?;
662
663    let if_exists = params
664        .as_ref()
665        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
666        .unwrap_or(false);
667
668    let cascade = params
669        .as_ref()
670        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
671        .unwrap_or(false);
672
673    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
674    let cascade_str = if cascade { " CASCADE" } else { "" };
675
676    let sql = format!("DROP TABLE {}{}{}", if_exists_str, table, cascade_str);
677
678    client.execute(&sql, &[]).await?;
679
680    Ok(json!({
681        "status": "success",
682        "action": "DROP TABLE",
683        "table": table,
684        "if_exists": if_exists,
685        "cascade": cascade
686    }))
687}
688
689/// 13. Create view
690pub async fn create_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
691    let view_name = params
692        .as_ref()
693        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
694        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
695
696    let query = params
697        .as_ref()
698        .and_then(|p| p.get("query").and_then(|v| v.as_str()))
699        .ok_or_else(|| MCPError::InvalidParams("Missing 'query' parameter".into()))?;
700
701    validate_identifier(view_name, "view_name")?;
702
703    if query.trim().is_empty() {
704        return Err(MCPError::InvalidParams("'query' cannot be empty".into()));
705    }
706
707    let materialized = params
708        .as_ref()
709        .and_then(|p| p.get("materialized").and_then(|v| v.as_bool()))
710        .unwrap_or(false);
711
712    let or_replace = params
713        .as_ref()
714        .and_then(|p| p.get("or_replace").and_then(|v| v.as_bool()))
715        .unwrap_or(false);
716
717    let materialized_str = if materialized { "MATERIALIZED " } else { "" };
718    let or_replace_str = if or_replace { "OR REPLACE " } else { "" };
719
720    let sql = format!("CREATE {}{}VIEW {} AS {}", or_replace_str, materialized_str, view_name, query);
721
722    client.execute(&sql, &[]).await?;
723
724    Ok(json!({
725        "status": "success",
726        "action": "CREATE VIEW",
727        "view_name": view_name,
728        "materialized": materialized,
729        "or_replace": or_replace
730    }))
731}
732
733/// 14. Drop view
734pub async fn drop_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
735    let view_name = params
736        .as_ref()
737        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
738        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
739
740    validate_identifier(view_name, "view_name")?;
741
742    let if_exists = params
743        .as_ref()
744        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
745        .unwrap_or(false);
746
747    let cascade = params
748        .as_ref()
749        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
750        .unwrap_or(false);
751
752    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
753    let cascade_str = if cascade { " CASCADE" } else { "" };
754
755    let sql = format!("DROP VIEW {}{}{}", if_exists_str, view_name, cascade_str);
756
757    client.execute(&sql, &[]).await?;
758
759    Ok(json!({
760        "status": "success",
761        "action": "DROP VIEW",
762        "view_name": view_name,
763        "if_exists": if_exists,
764        "cascade": cascade
765    }))
766}
767
768/// 15. Alter view
769pub async fn alter_view(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
770    let view_name = params
771        .as_ref()
772        .and_then(|p| p.get("view_name").and_then(|v| v.as_str()))
773        .ok_or_else(|| MCPError::InvalidParams("Missing 'view_name' parameter".into()))?;
774
775    validate_identifier(view_name, "view_name")?;
776
777    let rename_to = params
778        .as_ref()
779        .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
780
781    let set_schema = params
782        .as_ref()
783        .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
784
785    if rename_to.is_none() && set_schema.is_none() {
786        return Err(MCPError::InvalidParams(
787            "Must provide either 'rename_to' or 'set_schema' parameter".into()
788        ));
789    }
790
791    let mut action_desc = Vec::new();
792
793    if let Some(new_name) = rename_to {
794        validate_identifier(new_name, "rename_to")?;
795        let sql = format!("ALTER VIEW {} RENAME TO {}", view_name, new_name);
796        client.execute(&sql, &[]).await?;
797        action_desc.push(format!("renamed to {}", new_name));
798    }
799
800    if let Some(schema) = set_schema {
801        validate_identifier(schema, "set_schema")?;
802        let sql = format!("ALTER VIEW {} SET SCHEMA {}", view_name, schema);
803        client.execute(&sql, &[]).await?;
804        action_desc.push(format!("moved to schema {}", schema));
805    }
806
807    Ok(json!({
808        "status": "success",
809        "action": "ALTER VIEW",
810        "view_name": view_name,
811        "changes": action_desc
812    }))
813}
814
815/// 16. Create schema
816pub async fn create_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
817    let schema_name = params
818        .as_ref()
819        .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
820        .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
821
822    validate_identifier(schema_name, "schema_name")?;
823
824    let if_not_exists = params
825        .as_ref()
826        .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
827        .unwrap_or(false);
828
829    let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
830
831    let sql = format!("CREATE SCHEMA {}{}", if_not_exists_str, schema_name);
832
833    client.execute(&sql, &[]).await?;
834
835    Ok(json!({
836        "status": "success",
837        "action": "CREATE SCHEMA",
838        "schema_name": schema_name,
839        "if_not_exists": if_not_exists
840    }))
841}
842
843/// 17. Drop schema
844pub async fn drop_schema(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
845    let schema_name = params
846        .as_ref()
847        .and_then(|p| p.get("schema_name").and_then(|v| v.as_str()))
848        .ok_or_else(|| MCPError::InvalidParams("Missing 'schema_name' parameter".into()))?;
849
850    validate_identifier(schema_name, "schema_name")?;
851
852    let if_exists = params
853        .as_ref()
854        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
855        .unwrap_or(false);
856
857    let cascade = params
858        .as_ref()
859        .and_then(|p| p.get("cascade").and_then(|v| v.as_bool()))
860        .unwrap_or(false);
861
862    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
863    let cascade_str = if cascade { " CASCADE" } else { "" };
864
865    let sql = format!("DROP SCHEMA {}{}{}", if_exists_str, schema_name, cascade_str);
866
867    client.execute(&sql, &[]).await?;
868
869    Ok(json!({
870        "status": "success",
871        "action": "DROP SCHEMA",
872        "schema_name": schema_name,
873        "if_exists": if_exists,
874        "cascade": cascade
875    }))
876}
877
878/// 18. Create sequence
879pub async fn create_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
880    let sequence_name = params
881        .as_ref()
882        .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
883        .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
884
885    validate_identifier(sequence_name, "sequence_name")?;
886
887    let if_not_exists = params
888        .as_ref()
889        .and_then(|p| p.get("if_not_exists").and_then(|v| v.as_bool()))
890        .unwrap_or(false);
891
892    let start = params
893        .as_ref()
894        .and_then(|p| p.get("start").and_then(|v| v.as_i64()))
895        .unwrap_or(1);
896
897    let increment = params
898        .as_ref()
899        .and_then(|p| p.get("increment").and_then(|v| v.as_i64()))
900        .unwrap_or(1);
901
902    let if_not_exists_str = if if_not_exists { "IF NOT EXISTS " } else { "" };
903
904    let sql = format!(
905        "CREATE SEQUENCE {}{}START {} INCREMENT {}",
906        if_not_exists_str,
907        sequence_name,
908        start,
909        increment
910    );
911
912    client.execute(&sql, &[]).await?;
913
914    Ok(json!({
915        "status": "success",
916        "action": "CREATE SEQUENCE",
917        "sequence_name": sequence_name,
918        "start": start,
919        "increment": increment,
920        "if_not_exists": if_not_exists
921    }))
922}
923
924/// 19. Drop sequence
925pub async fn drop_sequence(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
926    let sequence_name = params
927        .as_ref()
928        .and_then(|p| p.get("sequence_name").and_then(|v| v.as_str()))
929        .ok_or_else(|| MCPError::InvalidParams("Missing 'sequence_name' parameter".into()))?;
930
931    validate_identifier(sequence_name, "sequence_name")?;
932
933    let if_exists = params
934        .as_ref()
935        .and_then(|p| p.get("if_exists").and_then(|v| v.as_bool()))
936        .unwrap_or(false);
937
938    let if_exists_str = if if_exists { "IF EXISTS " } else { "" };
939
940    let sql = format!("DROP SEQUENCE {}{}", if_exists_str, sequence_name);
941
942    client.execute(&sql, &[]).await?;
943
944    Ok(json!({
945        "status": "success",
946        "action": "DROP SEQUENCE",
947        "sequence_name": sequence_name,
948        "if_exists": if_exists
949    }))
950}
951
952/// 20. Alter index
953pub async fn alter_index(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
954    let index_name = params
955        .as_ref()
956        .and_then(|p| p.get("index_name").and_then(|v| v.as_str()))
957        .ok_or_else(|| MCPError::InvalidParams("Missing 'index_name' parameter".into()))?;
958
959    validate_identifier(index_name, "index_name")?;
960
961    let rename_to = params
962        .as_ref()
963        .and_then(|p| p.get("rename_to").and_then(|v| v.as_str()));
964
965    let set_schema = params
966        .as_ref()
967        .and_then(|p| p.get("set_schema").and_then(|v| v.as_str()));
968
969    if rename_to.is_none() && set_schema.is_none() {
970        return Err(MCPError::InvalidParams(
971            "Must provide either 'rename_to' or 'set_schema' parameter".into()
972        ));
973    }
974
975    let mut action_desc = Vec::new();
976
977    if let Some(new_name) = rename_to {
978        validate_identifier(new_name, "rename_to")?;
979        let sql = format!("ALTER INDEX {} RENAME TO {}", index_name, new_name);
980        client.execute(&sql, &[]).await?;
981        action_desc.push(format!("renamed to {}", new_name));
982    }
983
984    if let Some(schema) = set_schema {
985        validate_identifier(schema, "set_schema")?;
986        let sql = format!("ALTER INDEX {} SET SCHEMA {}", index_name, schema);
987        client.execute(&sql, &[]).await?;
988        action_desc.push(format!("moved to schema {}", schema));
989    }
990
991    Ok(json!({
992        "status": "success",
993        "action": "ALTER INDEX",
994        "index_name": index_name,
995        "changes": action_desc
996    }))
997}
998
999/// 21. List partitions
1000pub async fn list_partitions(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1001    let table = params
1002        .as_ref()
1003        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1004        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1005
1006    validate_identifier(table, "table")?;
1007
1008    let rows = client
1009        .query(
1010            "SELECT inhrelname as partition_name, pg_size_pretty(pg_relation_size(inhrelid)) as size
1011             FROM pg_inherits
1012             JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
1013             JOIN pg_class child ON pg_inherits.inhrelid = child.oid
1014             JOIN pg_namespace ON child.relnamespace = pg_namespace.oid
1015             WHERE parent.relname = $1
1016             ORDER BY child.relname",
1017            &[&table],
1018        )
1019        .await?;
1020
1021    let partitions: Vec<Value> = rows
1022        .iter()
1023        .map(|row| {
1024            json!({
1025                "partition_name": row.get::<_, String>(0),
1026                "size": row.get::<_, String>(1),
1027            })
1028        })
1029        .collect();
1030
1031    Ok(json!({
1032        "table": table,
1033        "partition_count": partitions.len(),
1034        "partitions": partitions
1035    }))
1036}
1037
1038/// 22. Backup table
1039pub async fn backup_table(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
1040    let table = params
1041        .as_ref()
1042        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
1043        .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
1044
1045    validate_identifier(table, "table")?;
1046
1047    let backup_name = format!("backup_{}", table);
1048    validate_identifier(&backup_name, "backup_name")?;
1049
1050    // Verify source table exists
1051    let _: (i64,) = client
1052        .query_one(
1053            "SELECT 1 FROM information_schema.tables
1054             WHERE table_name = $1 AND table_schema NOT IN ('pg_catalog', 'information_schema')",
1055            &[&table],
1056        )
1057        .await
1058        .map_err(|_| MCPError::InvalidParams(format!("Table '{}' does not exist", table)))?
1059        .try_get(0)
1060        .map(|val| (val,))
1061        .map_err(|_| MCPError::InvalidParams("Could not verify table existence".into()))?;
1062
1063    // Check if backup already exists
1064    let backup_exists = client
1065        .query_opt(
1066            "SELECT 1 FROM information_schema.tables WHERE table_name = $1",
1067            &[&backup_name],
1068        )
1069        .await?
1070        .is_some();
1071
1072    if backup_exists {
1073        return Err(MCPError::InvalidParams(
1074            format!("Backup table '{}' already exists. Drop it first or use a different table.", backup_name)
1075        ));
1076    }
1077
1078    // Create backup table with all columns and structure
1079    let create_backup_sql = format!("CREATE TABLE {} AS SELECT * FROM {}", backup_name, table);
1080    client.execute(&create_backup_sql, &[]).await?;
1081
1082    // Get column info for the original table
1083    let columns: Vec<(String, String, String)> = client
1084        .query(
1085            "SELECT column_name, data_type, is_nullable
1086             FROM information_schema.columns
1087             WHERE table_name = $1
1088             ORDER BY ordinal_position",
1089            &[&table],
1090        )
1091        .await?
1092        .iter()
1093        .map(|row| {
1094            (
1095                row.get::<_, String>(0),
1096                row.get::<_, String>(1),
1097                row.get::<_, String>(2),
1098            )
1099        })
1100        .collect();
1101
1102    // Get and copy indexes
1103    let indexes: Vec<String> = client
1104        .query(
1105            "SELECT indexname FROM pg_indexes WHERE tablename = $1 AND schemaname NOT IN ('pg_catalog')",
1106            &[&table],
1107        )
1108        .await?
1109        .iter()
1110        .map(|row| row.get::<_, String>(0))
1111        .collect();
1112
1113    let mut indexes_created = 0;
1114    for idx_name in indexes {
1115        let new_idx_name = format!("{}_on_{}", idx_name, backup_name);
1116        let idx_def: String = client
1117            .query_one(
1118                "SELECT indexdef FROM pg_indexes WHERE indexname = $1",
1119                &[&idx_name],
1120            )
1121            .await?
1122            .get(0);
1123
1124        let updated_def = idx_def
1125            .replace(&format!("ON {}", table), &format!("ON {}", backup_name))
1126            .replace(&idx_name, &new_idx_name);
1127
1128        if client.execute(&updated_def, &[]).await.is_ok() {
1129            indexes_created += 1;
1130        }
1131    }
1132
1133    // Get row count
1134    let row_count: (i64,) = client
1135        .query_one(&format!("SELECT COUNT(*) FROM {}", backup_name), &[])
1136        .await?
1137        .try_get(0)
1138        .map(|val| (val,))
1139        .map_err(|_| MCPError::InvalidParams("Could not count rows".into()))?;
1140
1141    Ok(json!({
1142        "status": "success",
1143        "action": "BACKUP TABLE",
1144        "original_table": table,
1145        "backup_table": backup_name,
1146        "rows_copied": row_count.0,
1147        "columns_copied": columns.len(),
1148        "indexes_created": indexes_created,
1149        "message": format!("Table '{}' backed up to '{}' with {} rows", table, backup_name, row_count.0)
1150    }))
1151}