Skip to main content

mcp_postgres/actions/
data_io.rs

1use crate::errors::Result as MCPResult;
2use crate::validation::quote_ident;
3use futures::SinkExt;
4use futures::StreamExt;
5use serde_json::{Value, json};
6use tokio_postgres::Client;
7
8pub async fn import_from_url(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
9    let url = params
10        .as_ref()
11        .and_then(|p| p.get("url").and_then(|v| v.as_str()))
12        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'url' parameter".into()))?;
13    let table = params
14        .as_ref()
15        .and_then(|p| p.get("table").and_then(|v| v.as_str()))
16        .ok_or_else(|| {
17            crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into())
18        })?;
19    let schema = params
20        .as_ref()
21        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
22        .unwrap_or("public");
23    let delimiter = params
24        .as_ref()
25        .and_then(|p| p.get("delimiter").and_then(|v| v.as_str()))
26        .unwrap_or(",");
27    let header = params
28        .as_ref()
29        .and_then(|p| p.get("header").and_then(|v| v.as_bool()))
30        .unwrap_or(true);
31    let truncate = params
32        .as_ref()
33        .and_then(|p| p.get("truncate").and_then(|v| v.as_bool()))
34        .unwrap_or(false);
35    let columns = params
36        .as_ref()
37        .and_then(|p| p.get("columns").and_then(|v| v.as_str()));
38
39    let qualified = format!("{}.{}", quote_ident(schema), quote_ident(table));
40
41    if truncate {
42        client
43            .execute(&format!("TRUNCATE {}", qualified), &[])
44            .await?;
45    }
46
47    let resp = reqwest::get(url).await.map_err(|e| {
48        crate::errors::MCPError::InvalidParams(format!("Failed to fetch URL: {}", e))
49    })?;
50    let status = resp.status();
51    if !status.is_success() {
52        return Err(crate::errors::MCPError::InvalidParams(format!(
53            "URL returned HTTP {}",
54            status
55        )));
56    }
57    let content = resp.bytes().await.map_err(|e| {
58        crate::errors::MCPError::InvalidParams(format!("Failed to read response body: {}", e))
59    })?;
60
61    let col_clause = columns.map(|c| format!(" ({})", c)).unwrap_or_default();
62    let copy_sql = format!(
63        "COPY {} FROM STDIN (FORMAT csv, HEADER {}, DELIMITER '{}'){}",
64        qualified,
65        if header { "true" } else { "false" },
66        delimiter.replace('\'', "''"),
67        col_clause,
68    );
69
70    let mut sink = Box::pin(client.copy_in(&copy_sql).await?);
71    sink.as_mut().send(content.clone()).await?;
72    sink.as_mut().close().await?;
73
74    let count = 0i64;
75
76    Ok(json!({
77        "success": true,
78        "table": table,
79        "schema": schema,
80        "rows_imported": count,
81        "source": url,
82    }))
83}
84
85pub async fn export_csv(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
86    let query = params
87        .as_ref()
88        .and_then(|p| p.get("query").and_then(|v| v.as_str()));
89    let table = params
90        .as_ref()
91        .and_then(|p| p.get("table").and_then(|v| v.as_str()));
92    let schema = params
93        .as_ref()
94        .and_then(|p| p.get("schema").and_then(|v| v.as_str()))
95        .unwrap_or("public");
96    let header = params
97        .as_ref()
98        .and_then(|p| p.get("header").and_then(|v| v.as_bool()))
99        .unwrap_or(true);
100    let delimiter = params
101        .as_ref()
102        .and_then(|p| p.get("delimiter").and_then(|v| v.as_str()))
103        .unwrap_or(",");
104    let limit = params
105        .as_ref()
106        .and_then(|p| p.get("limit").and_then(|v| v.as_i64()))
107        .unwrap_or(10000)
108        .min(100000);
109
110    let sql = match (query, table) {
111        (Some(q), _) => {
112            let trimmed = q.trim();
113            if trimmed.to_uppercase().starts_with("SELECT") {
114                format!("({}) AS _export", trimmed.trim_end_matches(';'))
115            } else {
116                return Err(crate::errors::MCPError::InvalidParams(
117                    "Query must be a SELECT statement".into(),
118                ));
119            }
120        }
121        (None, Some(t)) => format!("{}.{}", quote_ident(schema), quote_ident(t)),
122        (None, None) => {
123            return Err(crate::errors::MCPError::InvalidParams(
124                "Either 'query' or 'table' is required".into(),
125            ));
126        }
127    };
128
129    let copy_sql = format!(
130        "COPY {} TO STDOUT (FORMAT csv, HEADER {}, DELIMITER '{}', LIMIT {})",
131        sql,
132        if header { "true" } else { "false" },
133        delimiter.replace('\'', "''"),
134        limit,
135    );
136
137    let stream = client.copy_out(&copy_sql).await?;
138    let mut stream = Box::pin(stream);
139    let mut output = Vec::new();
140    while let Some(chunk) = stream.next().await {
141        let chunk = chunk?;
142        output.extend_from_slice(&chunk);
143    }
144
145    let csv_text = String::from_utf8(output).map_err(|e| {
146        crate::errors::MCPError::InvalidParams(format!("Output is not valid UTF-8: {}", e))
147    })?;
148
149    Ok(json!({
150        "csv": csv_text,
151        "row_count": csv_text.lines().count().saturating_sub(if header { 1 } else { 0 }),
152        "format": "csv",
153    }))
154}