Skip to main content

mcp_postgres/actions/
session_mgmt.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn cancel_query(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
6    let pid = params.as_ref().and_then(|p| p.get("pid").and_then(|v| v.as_i64()))
7        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'pid' parameter".into()))?;
8
9    let rows = client.query("SELECT pg_cancel_backend($1) AS cancelled", &[&(pid as i32)]).await?;
10    let cancelled: bool = rows[0].get(0);
11
12    Ok(json!({
13        "pid": pid,
14        "cancelled": cancelled,
15        "message": if cancelled { "Query cancellation sent" } else { "No active query found for this PID" }
16    }))
17}
18
19pub async fn terminate_connection(client: &Client, params: &Option<&Value>) -> MCPResult<Value> {
20    let pid = params.as_ref().and_then(|p| p.get("pid").and_then(|v| v.as_i64()))
21        .ok_or_else(|| crate::errors::MCPError::InvalidParams("Missing 'pid' parameter".into()))?;
22
23    let rows = client.query("SELECT pg_terminate_backend($1) AS terminated", &[&(pid as i32)]).await?;
24    let terminated: bool = rows[0].get(0);
25
26    Ok(json!({
27        "pid": pid,
28        "terminated": terminated,
29        "message": if terminated { "Connection terminated" } else { "No connection found with this PID" }
30    }))
31}
32
33pub async fn show_blocked_queries(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
34    let rows = client
35        .query(
36            "SELECT
37                blocked.pid AS blocked_pid,
38                blocked.usename AS blocked_user,
39                blocked.query AS blocked_query,
40                blocked.query_start AS blocked_start,
41                blocking.pid AS blocking_pid,
42                blocking.usename AS blocking_user,
43                blocking.query AS blocking_query,
44                blocking.query_start AS blocking_start,
45                pg_blocking_pids(blocked.pid) AS blocking_pids
46             FROM pg_stat_activity blocked
47             JOIN pg_stat_activity blocking ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
48             WHERE blocked.state != 'idle'
49             ORDER BY blocked.query_start",
50            &[],
51        )
52        .await?;
53
54    let blocks: Vec<Value> = rows.iter().map(|row| {
55        json!({
56            "blocked_pid": row.get::<_, i32>(0),
57            "blocked_user": row.get::<_, Option<String>>(1),
58            "blocked_query": row.get::<_, Option<String>>(2),
59            "blocked_start": row.get::<_, Option<String>>(3),
60            "blocking_pid": row.get::<_, i32>(4),
61            "blocking_user": row.get::<_, Option<String>>(5),
62            "blocking_query": row.get::<_, Option<String>>(6),
63            "blocking_start": row.get::<_, Option<String>>(7),
64        })
65    }).collect();
66
67    Ok(json!({ "blocked_queries": blocks }))
68}