mcp_postgres/actions/
connections.rs1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5const MAX_PID: i64 = 4_000_000;
6
7pub async fn list_connections(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
9 let rows = client
10 .query(
11 "SELECT pid, usename::text, application_name, state,
12 state_change::text, backend_start::text, query_start::text
13 FROM pg_stat_activity
14 WHERE pid != pg_backend_pid()
15 ORDER BY backend_start DESC",
16 &[],
17 )
18 .await?;
19
20 let connections: Vec<Value> = rows
21 .iter()
22 .map(|row| {
23 json!({
24 "pid": row.get::<_, i32>(0),
25 "user": row.get::<_, Option<String>>(1),
26 "application": row.get::<_, Option<String>>(2),
27 "state": row.get::<_, Option<String>>(3),
28 "state_change": row.get::<_, Option<String>>(4),
29 "backend_start": row.get::<_, Option<String>>(5),
30 "query_start": row.get::<_, Option<String>>(6),
31 })
32 })
33 .collect();
34
35 Ok(json!({ "connections": connections }))
36}
37
38pub async fn kill_connection(client: &Client, params: &Option<Value>) -> MCPResult<Value> {
40 let pid = params
41 .as_ref()
42 .and_then(|p| p.get("pid").and_then(|v| v.as_i64()))
43 .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'pid' parameter".into()))?;
44
45 if pid <= 0 || pid > MAX_PID {
46 return Err(crate::errors::MCPError::InvalidParams(
47 format!("'pid' must be between 1 and {MAX_PID}")
48 ));
49 }
50
51 let rows = client
52 .query("SELECT pg_terminate_backend($1)", &[&(pid as i32)])
53 .await?;
54
55 let terminated = rows[0].get::<_, bool>(0);
56
57 Ok(json!({
58 "pid": pid,
59 "terminated": terminated
60 }))
61}
62
63pub async fn show_current_user(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
65 let rows = client
66 .query("SELECT current_user, current_database(), version()", &[])
67 .await?;
68
69 let row = &rows[0];
70
71 Ok(json!({
72 "user": row.get::<_, String>(0),
73 "database": row.get::<_, String>(1),
74 "version": row.get::<_, String>(2),
75 }))
76}
77
78pub async fn show_running_queries(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
80 let rows = client
81 .query(
82 "SELECT pid, usename, application_name, state, query, query_start
83 FROM pg_stat_activity
84 WHERE state != 'idle' AND pid != pg_backend_pid()
85 ORDER BY query_start DESC",
86 &[],
87 )
88 .await?;
89
90 let queries: Vec<Value> = rows
91 .iter()
92 .map(|row| {
93 json!({
94 "pid": row.get::<_, i32>(0),
95 "user": row.get::<_, String>(1),
96 "application": row.get::<_, Option<String>>(2),
97 "state": row.get::<_, String>(3),
98 "query": row.get::<_, Option<String>>(4),
99 "query_start": row.get::<_, Option<String>>(5),
100 })
101 })
102 .collect();
103
104 Ok(json!({ "queries": queries }))
105}
106
107pub async fn show_connection_summary(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
109 let rows = client
110 .query(
111 "SELECT state, count(*) as count
112 FROM pg_stat_activity
113 GROUP BY state
114 ORDER BY count DESC",
115 &[],
116 )
117 .await?;
118
119 let summary: Vec<Value> = rows
120 .iter()
121 .map(|row| {
122 json!({
123 "state": row.get::<_, Option<String>>(0).unwrap_or_else(|| "unknown".to_string()),
124 "count": row.get::<_, i64>(1),
125 })
126 })
127 .collect();
128
129 Ok(json!({ "summary": summary }))
130}