1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::{MCPError, Result as MCPResult};
4
5const MAX_IDENTIFIER_LEN: usize = 255;
6
7fn validate_identifier(name: &str, label: &str) -> std::result::Result<(), MCPError> {
8 if name.is_empty() {
9 return Err(MCPError::InvalidParams(format!("'{label}' must not be empty")));
10 }
11 if name.len() > MAX_IDENTIFIER_LEN {
12 return Err(MCPError::InvalidParams(
13 format!("'{label}' exceeds maximum length of {MAX_IDENTIFIER_LEN} characters (got {})", name.len())
14 ));
15 }
16 Ok(())
17}
18
19pub async fn list_tables(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
21 let rows = client
22 .query(
23 "SELECT table_schema, table_name, table_type
24 FROM information_schema.tables
25 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
26 ORDER BY table_schema, table_name",
27 &[],
28 )
29 .await?;
30
31 let tables: Vec<Value> = rows
32 .iter()
33 .map(|row| {
34 json!({
35 "schema": row.get::<_, String>(0),
36 "name": row.get::<_, String>(1),
37 "type": row.get::<_, String>(2),
38 })
39 })
40 .collect();
41
42 Ok(json!({ "tables": tables }))
43}
44
45pub async fn describe_table(client: &Client, params: &Option<Value>) -> MCPResult<Value> {
47 let table_name = params
48 .as_ref()
49 .and_then(|p| p.get("table"))
50 .and_then(|v| v.as_str())
51 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'table' parameter".into()))?;
52
53 validate_identifier(table_name, "table")?;
54
55 let rows = client
56 .query(
57 "SELECT column_name, data_type, is_nullable, column_default, ordinal_position
58 FROM information_schema.columns
59 WHERE table_name = $1
60 ORDER BY ordinal_position",
61 &[&table_name],
62 )
63 .await?;
64
65 let columns: Vec<Value> = rows
66 .iter()
67 .map(|row| {
68 json!({
69 "name": row.get::<_, String>(0),
70 "type": row.get::<_, String>(1),
71 "nullable": row.get::<_, String>(2),
72 "default": row.get::<_, Option<String>>(3),
73 "position": row.get::<_, i32>(4),
74 })
75 })
76 .collect();
77
78 Ok(json!({ "columns": columns }))
79}
80
81pub async fn list_indexes(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
83 let rows = client
84 .query(
85 "SELECT schemaname, tablename, indexname, indexdef
86 FROM pg_indexes
87 WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
88 ORDER BY schemaname, tablename, indexname",
89 &[],
90 )
91 .await?;
92
93 let indexes: Vec<Value> = rows
94 .iter()
95 .map(|row| {
96 json!({
97 "schema": row.get::<_, String>(0),
98 "table": row.get::<_, String>(1),
99 "name": row.get::<_, String>(2),
100 "definition": row.get::<_, String>(3),
101 })
102 })
103 .collect();
104
105 Ok(json!({ "indexes": indexes }))
106}
107
108pub async fn list_schemas(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
110 let rows = client
111 .query(
112 "SELECT schema_name, schema_owner
113 FROM information_schema.schemata
114 WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
115 ORDER BY schema_name",
116 &[],
117 )
118 .await?;
119
120 let schemas: Vec<Value> = rows
121 .iter()
122 .map(|row| {
123 json!({
124 "name": row.get::<_, String>(0),
125 "owner": row.get::<_, String>(1),
126 })
127 })
128 .collect();
129
130 Ok(json!({ "schemas": schemas }))
131}
132
133pub async fn show_constraints(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
135 let rows = client
136 .query(
137 "SELECT table_schema, table_name, constraint_name, constraint_type
138 FROM information_schema.table_constraints
139 WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
140 ORDER BY table_schema, table_name, constraint_name",
141 &[],
142 )
143 .await?;
144
145 let constraints: Vec<Value> = rows
146 .iter()
147 .map(|row| {
148 json!({
149 "schema": row.get::<_, String>(0),
150 "table": row.get::<_, String>(1),
151 "name": row.get::<_, String>(2),
152 "type": row.get::<_, String>(3),
153 })
154 })
155 .collect();
156
157 Ok(json!({ "constraints": constraints }))
158}
159
160pub async fn get_object_details(client: &Client, params: &Option<Value>) -> MCPResult<Value> {
162 let schema_name = params
163 .as_ref()
164 .and_then(|p| p.get("schema"))
165 .and_then(|v| v.as_str())
166 .unwrap_or("public");
167
168 if schema_name.len() > MAX_IDENTIFIER_LEN {
169 return Err(MCPError::InvalidParams(
170 format!("'schema' exceeds maximum length of {MAX_IDENTIFIER_LEN} characters (got {})", schema_name.len())
171 ));
172 }
173
174 let table_name = params
175 .as_ref()
176 .and_then(|p| p.get("table"))
177 .and_then(|v| v.as_str())
178 .ok_or_else(|| MCPError::InvalidParams("Missing 'table' parameter".into()))?;
179
180 validate_identifier(table_name, "table")?;
181
182 let columns = client
183 .query(
184 "SELECT c.column_name::text, c.data_type::text, c.is_nullable::text,
185 c.column_default::text, c.ordinal_position,
186 COALESCE(pgd.description, '')::text AS column_description,
187 CASE WHEN pk.column_name IS NOT NULL THEN true ELSE false END AS is_pk,
188 CASE WHEN uc.column_name IS NOT NULL THEN true ELSE false END AS is_unique
189 FROM information_schema.columns c
190 LEFT JOIN pg_catalog.pg_statio_all_tables st
191 ON st.relname = c.table_name AND st.schemaname = c.table_schema
192 LEFT JOIN pg_catalog.pg_description pgd
193 ON pgd.objoid = st.relid AND pgd.objsubid = c.ordinal_position
194 LEFT JOIN (
195 SELECT ku.column_name, tc.table_schema, tc.table_name
196 FROM information_schema.table_constraints tc
197 JOIN information_schema.key_column_usage ku
198 ON tc.constraint_name = ku.constraint_name
199 AND tc.table_schema = ku.table_schema
200 WHERE tc.constraint_type = 'PRIMARY KEY'
201 ) pk ON pk.column_name = c.column_name
202 AND pk.table_schema = c.table_schema
203 AND pk.table_name = c.table_name
204 LEFT JOIN (
205 SELECT ku.column_name, tc.table_schema, tc.table_name
206 FROM information_schema.table_constraints tc
207 JOIN information_schema.key_column_usage ku
208 ON tc.constraint_name = ku.constraint_name
209 AND tc.table_schema = ku.table_schema
210 WHERE tc.constraint_type = 'UNIQUE'
211 ) uc ON uc.column_name = c.column_name
212 AND uc.table_schema = c.table_schema
213 AND uc.table_name = c.table_name
214 WHERE c.table_schema = $1 AND c.table_name = $2
215 ORDER BY c.ordinal_position",
216 &[&schema_name, &table_name],
217 )
218 .await?;
219
220 let cols: Vec<Value> = columns.iter().map(|row| {
221 json!({
222 "name": row.get::<_, String>(0),
223 "type": row.get::<_, String>(1),
224 "nullable": row.get::<_, String>(2) == "YES",
225 "default": row.get::<_, Option<String>>(3),
226 "position": row.get::<_, i32>(4),
227 "description": row.get::<_, String>(5),
228 "is_primary_key": row.get::<_, bool>(6),
229 "is_unique": row.get::<_, bool>(7),
230 })
231 }).collect();
232
233 let indexes = client
234 .query(
235 "SELECT indexname::text, indexdef::text
236 FROM pg_indexes
237 WHERE schemaname = $1 AND tablename = $2
238 ORDER BY indexname",
239 &[&schema_name, &table_name],
240 )
241 .await?;
242
243 let idxs: Vec<Value> = indexes.iter().map(|row| {
244 json!({
245 "name": row.get::<_, String>(0),
246 "definition": row.get::<_, String>(1),
247 })
248 }).collect();
249
250 let foreign_keys = client
251 .query(
252 "SELECT kcu.column_name::text,
253 ccu.table_schema::text AS foreign_schema,
254 ccu.table_name::text AS foreign_table,
255 ccu.column_name::text AS foreign_column,
256 rc.update_rule::text, rc.delete_rule::text
257 FROM information_schema.table_constraints tc
258 JOIN information_schema.key_column_usage kcu
259 ON tc.constraint_name = kcu.constraint_name
260 AND tc.table_schema = kcu.table_schema
261 JOIN information_schema.constraint_column_usage ccu
262 ON tc.constraint_name = ccu.constraint_name
263 AND tc.table_schema = ccu.table_schema
264 JOIN information_schema.referential_constraints rc
265 ON tc.constraint_name = rc.constraint_name
266 AND tc.table_schema = rc.constraint_schema
267 WHERE tc.constraint_type = 'FOREIGN KEY'
268 AND tc.table_schema = $1 AND tc.table_name = $2
269 ORDER BY kcu.ordinal_position",
270 &[&schema_name, &table_name],
271 )
272 .await?;
273
274 let fks: Vec<Value> = foreign_keys.iter().map(|row| {
275 json!({
276 "column": row.get::<_, String>(0),
277 "references_schema": row.get::<_, String>(1),
278 "references_table": row.get::<_, String>(2),
279 "references_column": row.get::<_, String>(3),
280 "on_update": row.get::<_, String>(4),
281 "on_delete": row.get::<_, String>(5),
282 })
283 }).collect();
284
285 let constraints = client
286 .query(
287 "SELECT constraint_name::text, constraint_type::text
288 FROM information_schema.table_constraints
289 WHERE table_schema = $1 AND table_name = $2
290 ORDER BY constraint_name",
291 &[&schema_name, &table_name],
292 )
293 .await?;
294
295 let cons: Vec<Value> = constraints.iter().map(|row| {
296 json!({
297 "name": row.get::<_, String>(0),
298 "type": row.get::<_, String>(1),
299 })
300 }).collect();
301
302 let row_estimate = client
303 .query_one(
304 "SELECT n_live_tup FROM pg_stat_user_tables
305 WHERE schemaname = $1 AND relname = $2",
306 &[&schema_name, &table_name],
307 )
308 .await
309 .map(|r| r.get::<_, Option<f64>>(0))
310 .unwrap_or(None);
311
312 let table_size = client
313 .query_one(
314 "SELECT pg_size_pretty(pg_total_relation_size($1::regclass))",
315 &[&format!("{}.{}", schema_name, table_name)],
316 )
317 .await
318 .map(|r| r.get::<_, Option<String>>(0))
319 .unwrap_or(None);
320
321 Ok(json!({
322 "table": table_name,
323 "schema": schema_name,
324 "columns": cols,
325 "indexes": idxs,
326 "foreign_keys": fks,
327 "constraints": cons,
328 "estimated_rows": row_estimate,
329 "total_size": table_size,
330 }))
331}