mcp_postgres/actions/
data_io.rs1use futures::SinkExt;
2use futures::StreamExt;
3use serde_json::{json, Value};
4use tokio_postgres::Client;
5use crate::errors::Result as MCPResult;
6use crate::validation::quote_ident;
7
8pub async fn import_from_url(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
9 let url = params.as_ref().and_then(|p| p.get("url").and_then(|v| v.as_str()))
10 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'url' parameter".into()))?;
11 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()))
12 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into()))?;
13 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
14 let delimiter = params.as_ref().and_then(|p| p.get("delimiter").and_then(|v| v.as_str())).unwrap_or(",");
15 let header = params.as_ref().and_then(|p| p.get("header").and_then(|v| v.as_bool())).unwrap_or(true);
16 let truncate = params.as_ref().and_then(|p| p.get("truncate").and_then(|v| v.as_bool())).unwrap_or(false);
17 let columns = params.as_ref().and_then(|p| p.get("columns").and_then(|v| v.as_str()));
18
19 let qualified = format!("{}.{}", quote_ident(schema), quote_ident(table));
20
21 if truncate {
22 client.execute(&format!("TRUNCATE {}", qualified), &[]).await?;
23 }
24
25 let resp = reqwest::get(url).await
26 .map_err(|e| crate::errors::MCPError::InvalidParams(format!("Failed to fetch URL: {}", e)))?;
27 let status = resp.status();
28 if !status.is_success() {
29 return Err(crate::errors::MCPError::InvalidParams(format!("URL returned HTTP {}", status)));
30 }
31 let content = resp.bytes().await
32 .map_err(|e| crate::errors::MCPError::InvalidParams(format!("Failed to read response body: {}", e)))?;
33
34 let col_clause = columns.map(|c| format!(" ({})", c)).unwrap_or_default();
35 let copy_sql = format!(
36 "COPY {} FROM STDIN (FORMAT csv, HEADER {}, DELIMITER '{}'){}",
37 qualified,
38 if header { "true" } else { "false" },
39 delimiter.replace('\'', "''"),
40 col_clause,
41 );
42
43 let mut sink = Box::pin(client.copy_in(©_sql).await?);
44 sink.as_mut().send(content.clone()).await?;
45 sink.as_mut().close().await?;
46
47 let count = 0i64;
48
49 Ok(json!({
50 "success": true,
51 "table": table,
52 "schema": schema,
53 "rows_imported": count,
54 "source": url,
55 }))
56}
57
58pub async fn export_csv(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
59 let query = params.as_ref().and_then(|p| p.get("query").and_then(|v| v.as_str()));
60 let table = params.as_ref().and_then(|p| p.get("table").and_then(|v| v.as_str()));
61 let schema = params.as_ref().and_then(|p| p.get("schema").and_then(|v| v.as_str())).unwrap_or("public");
62 let header = params.as_ref().and_then(|p| p.get("header").and_then(|v| v.as_bool())).unwrap_or(true);
63 let delimiter = params.as_ref().and_then(|p| p.get("delimiter").and_then(|v| v.as_str())).unwrap_or(",");
64 let limit = params.as_ref().and_then(|p| p.get("limit").and_then(|v| v.as_i64())).unwrap_or(10000).min(100000);
65
66 let sql = match (query, table) {
67 (Some(q), _) => {
68 let trimmed = q.trim();
69 if trimmed.to_uppercase().starts_with("SELECT") {
70 format!("({}) AS _export", trimmed.trim_end_matches(';'))
71 } else {
72 return Err(crate::errors::MCPError::InvalidParams("Query must be a SELECT statement".into()));
73 }
74 }
75 (None, Some(t)) => format!("{}.{}", quote_ident(schema), quote_ident(t)),
76 (None, None) => return Err(crate::errors::MCPError::InvalidParams("Either 'query' or 'table' is required".into())),
77 };
78
79 let copy_sql = format!(
80 "COPY {} TO STDOUT (FORMAT csv, HEADER {}, DELIMITER '{}', LIMIT {})",
81 sql,
82 if header { "true" } else { "false" },
83 delimiter.replace('\'', "''"),
84 limit,
85 );
86
87 let stream = client.copy_out(©_sql).await?;
88 let mut stream = Box::pin(stream);
89 let mut output = Vec::new();
90 while let Some(chunk) = stream.next().await {
91 let chunk = chunk?;
92 output.extend_from_slice(&chunk);
93 }
94
95 let csv_text = String::from_utf8(output)
96 .map_err(|e| crate::errors::MCPError::InvalidParams(format!("Output is not valid UTF-8: {}", e)))?;
97
98 Ok(json!({
99 "csv": csv_text,
100 "row_count": csv_text.lines().count().saturating_sub(if header { 1 } else { 0 }),
101 "format": "csv",
102 }))
103}