Skip to main content

mcp_postgres/actions/
session_mgmt.rs

1use crate::errors::Result as MCPResult;
2use serde_json::{Value, json};
3use tokio_postgres::Client;
4
5pub async fn cancel_query(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
6    let pid = params
7        .as_ref()
8        .and_then(|p| p.get("pid").and_then(|v| v.as_i64()))
9        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'pid' parameter".into()))?;
10
11    let rows = client
12        .query(
13            "SELECT pg_cancel_backend($1) AS cancelled",
14            &[&(pid as i32)],
15        )
16        .await?;
17    let cancelled: bool = rows[0].get(0);
18
19    Ok(json!({
20        "pid": pid,
21        "cancelled": cancelled,
22        "message": if cancelled { "Query cancellation sent" } else { "No active query found for this PID" }
23    }))
24}
25
26pub async fn terminate_connection(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
27    let pid = params
28        .as_ref()
29        .and_then(|p| p.get("pid").and_then(|v| v.as_i64()))
30        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'pid' parameter".into()))?;
31
32    let rows = client
33        .query(
34            "SELECT pg_terminate_backend($1) AS terminated",
35            &[&(pid as i32)],
36        )
37        .await?;
38    let terminated: bool = rows[0].get(0);
39
40    Ok(json!({
41        "pid": pid,
42        "terminated": terminated,
43        "message": if terminated { "Connection terminated" } else { "No connection found with this PID" }
44    }))
45}
46
47pub async fn show_blocked_queries(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
48    let rows = client
49        .query(
50            "SELECT
51                blocked.pid AS blocked_pid,
52                blocked.usename AS blocked_user,
53                blocked.query AS blocked_query,
54                blocked.query_start AS blocked_start,
55                blocking.pid AS blocking_pid,
56                blocking.usename AS blocking_user,
57                blocking.query AS blocking_query,
58                blocking.query_start AS blocking_start,
59                pg_blocking_pids(blocked.pid) AS blocking_pids
60             FROM pg_stat_activity blocked
61             JOIN pg_stat_activity blocking ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
62             WHERE blocked.state != 'idle'
63             ORDER BY blocked.query_start",
64            &[],
65        )
66        .await?;
67
68    let blocks: Vec<Value> = rows
69        .iter()
70        .map(|row| {
71            json!({
72                "blocked_pid": row.get::<_, i32>(0),
73                "blocked_user": row.get::<_, Option<String>>(1),
74                "blocked_query": row.get::<_, Option<String>>(2),
75                "blocked_start": row.get::<_, Option<String>>(3),
76                "blocking_pid": row.get::<_, i32>(4),
77                "blocking_user": row.get::<_, Option<String>>(5),
78                "blocking_query": row.get::<_, Option<String>>(6),
79                "blocking_start": row.get::<_, Option<String>>(7),
80            })
81        })
82        .collect();
83
84    Ok(json!({ "blocked_queries": blocks }))
85}