Skip to main content

database_mcp_postgres/
operations.rs

1//! `PostgreSQL` database query operations.
2
3use database_mcp_backend::error::AppError;
4use database_mcp_backend::identifier::validate_identifier;
5use serde_json::{Value, json};
6use sqlx::postgres::PgRow;
7use sqlx_to_json::RowExt;
8
9use super::PostgresBackend;
10
11impl PostgresBackend {
12    // `list_databases` uses the default pool intentionally — `pg_database`
13    // is a server-wide catalog that returns all databases regardless of
14    // which database the connection targets.
15    /// Lists all accessible databases.
16    ///
17    /// # Errors
18    ///
19    /// Returns [`AppError`] if the query fails.
20    pub async fn list_databases(&self) -> Result<Vec<String>, AppError> {
21        let pool = self.get_pool(None).await?;
22        let rows: Vec<(String,)> =
23            sqlx::query_as("SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname")
24                .fetch_all(&pool)
25                .await
26                .map_err(|e| AppError::Query(e.to_string()))?;
27        Ok(rows.into_iter().map(|r| r.0).collect())
28    }
29
30    /// Lists all tables in a database.
31    ///
32    /// # Errors
33    ///
34    /// Returns [`AppError`] if the identifier is invalid or the query fails.
35    pub async fn list_tables(&self, database: &str) -> Result<Vec<String>, AppError> {
36        let db = if database.is_empty() { None } else { Some(database) };
37        let pool = self.get_pool(db).await?;
38        let rows: Vec<(String,)> =
39            sqlx::query_as("SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename")
40                .fetch_all(&pool)
41                .await
42                .map_err(|e| AppError::Query(e.to_string()))?;
43        Ok(rows.into_iter().map(|r| r.0).collect())
44    }
45
46    /// Executes a SQL query and returns rows as JSON.
47    ///
48    /// # Errors
49    ///
50    /// Returns [`AppError`] if the query fails.
51    pub async fn execute_query(&self, sql: &str, database: Option<&str>) -> Result<Value, AppError> {
52        let pool = self.get_pool(database).await?;
53        let rows: Vec<PgRow> = sqlx::query(sql)
54            .fetch_all(&pool)
55            .await
56            .map_err(|e| AppError::Query(e.to_string()))?;
57        Ok(Value::Array(rows.iter().map(RowExt::to_json).collect()))
58    }
59
60    /// Creates a database if it doesn't exist.
61    ///
62    /// # Errors
63    ///
64    /// Returns [`AppError`] if read-only or the query fails.
65    pub async fn create_database(&self, name: &str) -> Result<Value, AppError> {
66        if self.read_only {
67            return Err(AppError::ReadOnlyViolation);
68        }
69        validate_identifier(name)?;
70
71        let pool = self.get_pool(None).await?;
72
73        // PostgreSQL CREATE DATABASE can't use parameterized queries
74        sqlx::query(&format!("CREATE DATABASE {}", Self::quote_identifier(name)))
75            .execute(&pool)
76            .await
77            .map_err(|e| {
78                let msg = e.to_string();
79                if msg.contains("already exists") {
80                    return AppError::Query(format!("Database '{name}' already exists."));
81                }
82                AppError::Query(msg)
83            })?;
84
85        Ok(json!({
86            "status": "success",
87            "message": format!("Database '{name}' created successfully."),
88            "database_name": name,
89        }))
90    }
91}