mcp_postgres/actions/
data_io.rs1use crate::errors::Result as MCPResult;
2use crate::validation::quote_ident;
3use futures::SinkExt;
4use futures::StreamExt;
5use serde_json::{Value, json};
6use tokio_postgres::Client;
7
8pub async fn import_from_url(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
9 let url = params
10 .as_ref()
11 .and_then(|p| p.get("url").and_then(|v| v.as_str()))
12 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'url' parameter".into()))?;
13 let table = params
14 .as_ref()
15 .and_then(|p| p.get("table").and_then(|v| v.as_str()))
16 .ok_or_else(|| {
17 crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into())
18 })?;
19 let schema = params
20 .as_ref()
21 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
22 .unwrap_or("public");
23 let delimiter = params
24 .as_ref()
25 .and_then(|p| p.get("delimiter").and_then(|v| v.as_str()))
26 .unwrap_or(",");
27 let header = params
28 .as_ref()
29 .and_then(|p| p.get("header").and_then(|v| v.as_bool()))
30 .unwrap_or(true);
31 let truncate = params
32 .as_ref()
33 .and_then(|p| p.get("truncate").and_then(|v| v.as_bool()))
34 .unwrap_or(false);
35 let columns = params
36 .as_ref()
37 .and_then(|p| p.get("columns").and_then(|v| v.as_str()));
38
39 let qualified = format!("{}.{}", quote_ident(schema), quote_ident(table));
40
41 if truncate {
42 client
43 .execute(&format!("TRUNCATE {}", qualified), &[])
44 .await?;
45 }
46
47 let resp = reqwest::get(url).await.map_err(|e| {
48 crate::errors::MCPError::InvalidParams(format!("Failed to fetch URL: {}", e))
49 })?;
50 let status = resp.status();
51 if !status.is_success() {
52 return Err(crate::errors::MCPError::InvalidParams(format!(
53 "URL returned HTTP {}",
54 status
55 )));
56 }
57 let content = resp.bytes().await.map_err(|e| {
58 crate::errors::MCPError::InvalidParams(format!("Failed to read response body: {}", e))
59 })?;
60
61 let col_clause = columns.map(|c| format!(" ({})", c)).unwrap_or_default();
62 let copy_sql = format!(
63 "COPY {} FROM STDIN (FORMAT csv, HEADER {}, DELIMITER '{}'){}",
64 qualified,
65 if header { "true" } else { "false" },
66 delimiter.replace('\'', "''"),
67 col_clause,
68 );
69
70 let mut sink = Box::pin(client.copy_in(©_sql).await?);
71 sink.as_mut().send(content.clone()).await?;
72 sink.as_mut().close().await?;
73
74 let count = 0i64;
75
76 Ok(json!({
77 "success": true,
78 "table": table,
79 "schema": schema,
80 "rows_imported": count,
81 "source": url,
82 }))
83}
84
85pub async fn export_csv(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
86 let query = params
87 .as_ref()
88 .and_then(|p| p.get("query").and_then(|v| v.as_str()));
89 let table = params
90 .as_ref()
91 .and_then(|p| p.get("table").and_then(|v| v.as_str()));
92 let schema = params
93 .as_ref()
94 .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
95 .unwrap_or("public");
96 let header = params
97 .as_ref()
98 .and_then(|p| p.get("header").and_then(|v| v.as_bool()))
99 .unwrap_or(true);
100 let delimiter = params
101 .as_ref()
102 .and_then(|p| p.get("delimiter").and_then(|v| v.as_str()))
103 .unwrap_or(",");
104 let limit = params
105 .as_ref()
106 .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
107 .unwrap_or(10000)
108 .min(100000);
109
110 let sql = match (query, table) {
111 (Some(q), _) => {
112 let trimmed = q.trim();
113 if trimmed.to_uppercase().starts_with("SELECT") {
114 format!("({}) AS _export", trimmed.trim_end_matches(';'))
115 } else {
116 return Err(crate::errors::MCPError::InvalidParams(
117 "Query must be a SELECT statement".into(),
118 ));
119 }
120 }
121 (None, Some(t)) => format!("{}.{}", quote_ident(schema), quote_ident(t)),
122 (None, None) => {
123 return Err(crate::errors::MCPError::InvalidParams(
124 "Either 'query' or 'table' is required".into(),
125 ));
126 }
127 };
128
129 let copy_sql = format!(
130 "COPY {} TO STDOUT (FORMAT csv, HEADER {}, DELIMITER '{}', LIMIT {})",
131 sql,
132 if header { "true" } else { "false" },
133 delimiter.replace('\'', "''"),
134 limit,
135 );
136
137 let stream = client.copy_out(©_sql).await?;
138 let mut stream = Box::pin(stream);
139 let mut output = Vec::new();
140 while let Some(chunk) = stream.next().await {
141 let chunk = chunk?;
142 output.extend_from_slice(&chunk);
143 }
144
145 let csv_text = String::from_utf8(output).map_err(|e| {
146 crate::errors::MCPError::InvalidParams(format!("Output is not valid UTF-8: {}", e))
147 })?;
148
149 Ok(json!({
150 "csv": csv_text,
151 "row_count": csv_text.lines().count().saturating_sub(if header { 1 } else { 0 }),
152 "format": "csv",
153 }))
154}