Skip to main content

mcp_postgres/actions/
transactions.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5/// 41. Show active transactions
6pub async fn show_active_transactions(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
7    let rows = client
8        .query(
9            "SELECT pid, usename, application_name, state, xact_start, query_start, query
10             FROM pg_stat_activity
11             WHERE xact_start IS NOT NULL AND pid != pg_backend_pid()
12             ORDER BY xact_start ASC",
13            &[],
14        )
15        .await?;
16
17    let transactions: Vec<Value> = rows
18        .iter()
19        .map(|row| {
20            json!({
21                "pid": row.get::<_, i32>(0),
22                "user": row.get::<_, String>(1),
23                "application": row.get::<_, Option<String>>(2),
24                "state": row.get::<_, String>(3),
25                "xact_start": row.get::<_, String>(4),
26                "query_start": row.get::<_, String>(5),
27                "query": row.get::<_, Option<String>>(6),
28            })
29        })
30        .collect();
31
32    Ok(json!({ "transactions": transactions }))
33}
34
35/// 42. Show locks
36pub async fn show_locks(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
37    let rows = client
38        .query(
39            "SELECT l.pid, a.usename, a.application_name, l.mode, l.granted, l.fastpath,
40                    a.query_start, a.query
41             FROM pg_locks l
42             JOIN pg_stat_activity a ON l.pid = a.pid
43             WHERE l.pid != pg_backend_pid()
44             ORDER BY l.pid, l.mode",
45            &[],
46        )
47        .await?;
48
49    let locks: Vec<Value> = rows
50        .iter()
51        .map(|row| {
52            json!({
53                "pid": row.get::<_, i32>(0),
54                "user": row.get::<_, String>(1),
55                "application": row.get::<_, Option<String>>(2),
56                "lock_type": row.get::<_, String>(3),
57                "granted": row.get::<_, bool>(4),
58                "fastpath": row.get::<_, bool>(5),
59                "query_start": row.get::<_, Option<String>>(6),
60                "query": row.get::<_, Option<String>>(7),
61            })
62        })
63        .collect();
64
65    Ok(json!({ "locks": locks }))
66}
67
68/// 43. Show waiting locks
69pub async fn show_waiting_locks(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
70    let rows = client
71        .query(
72            "SELECT l.pid, a.usename, l.mode, a.query_start, a.query
73             FROM pg_locks l
74             JOIN pg_stat_activity a ON l.pid = a.pid
75             WHERE NOT l.granted AND l.pid != pg_backend_pid()
76             ORDER BY a.query_start ASC",
77            &[],
78        )
79        .await?;
80
81    let waiting: Vec<Value> = rows
82        .iter()
83        .map(|row| {
84            json!({
85                "pid": row.get::<_, i32>(0),
86                "user": row.get::<_, String>(1),
87                "lock_type": row.get::<_, String>(2),
88                "query_start": row.get::<_, String>(3),
89                "query": row.get::<_, Option<String>>(4),
90            })
91        })
92        .collect();
93
94    Ok(json!({ "waiting_locks": waiting }))
95}
96
97/// 44. Begin transaction
98pub async fn begin_transaction(client: &Client, params: &Option<Value>) -> MCPResult<Value> {
99    let isolation_level = params
100        .as_ref()
101        .and_then(|p| p.get("isolation_level").and_then(|v| v.as_str()).map(|s| s.to_string()))
102        .unwrap_or_else(|| "READ COMMITTED".to_string());
103
104    let valid_levels = vec!["SERIALIZABLE", "REPEATABLE READ", "READ COMMITTED", "READ UNCOMMITTED"];
105    if !valid_levels.contains(&isolation_level.as_str()) {
106        return Err(crate::errors::MCPError::InvalidParams(
107            format!("Invalid isolation level: {}", isolation_level)
108        ));
109    }
110
111    let sql = format!("BEGIN ISOLATION LEVEL {}", isolation_level);
112    client.execute(&sql, &[]).await?;
113
114    Ok(json!({
115        "status": "success",
116        "action": "BEGIN",
117        "isolation_level": isolation_level
118    }))
119}
120
121/// 45. Commit transaction
122pub async fn commit_transaction(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
123    client.execute("COMMIT", &[]).await?;
124
125    Ok(json!({
126        "status": "success",
127        "action": "COMMIT"
128    }))
129}
130
131/// 46. Rollback transaction
132pub async fn rollback_transaction(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
133    client.execute("ROLLBACK", &[]).await?;
134
135    Ok(json!({
136        "status": "success",
137        "action": "ROLLBACK"
138    }))
139}
140
141/// 47. Show transaction isolation levels
142pub async fn show_transaction_isolation(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
143    let rows = client
144        .query("SHOW transaction_isolation", &[])
145        .await?;
146
147    let level = rows[0].get::<_, String>(0);
148
149    Ok(json!({
150        "isolation_level": level,
151        "available_levels": ["serializable", "repeatable read", "read committed", "read uncommitted"]
152    }))
153}
154
155/// 48. Show deadlocks
156pub async fn show_deadlocks(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
157    let rows = client
158        .query(
159            "SELECT pid, usename, application_name, state, query_start, query
160             FROM pg_stat_activity
161             WHERE state = 'disabled' OR wait_event = 'ProcArrayLock'
162             ORDER BY query_start ASC",
163            &[],
164        )
165        .await?;
166
167    let deadlocks: Vec<Value> = rows
168        .iter()
169        .map(|row| {
170            json!({
171                "pid": row.get::<_, i32>(0),
172                "user": row.get::<_, String>(1),
173                "application": row.get::<_, Option<String>>(2),
174                "state": row.get::<_, String>(3),
175                "query_start": row.get::<_, String>(4),
176                "query": row.get::<_, Option<String>>(5),
177            })
178        })
179        .collect();
180
181    Ok(json!({ "potential_deadlocks": deadlocks }))
182}
183
184/// 49. Show auto commit status
185///
186/// Note: PostgreSQL 17+ removed the `autocommit` GUC.
187/// Autocommit is always-on in the wire protocol and cannot be disabled.
188/// For PG < 17, we query `SHOW autocommit`; for PG >= 17, we return `true`.
189pub async fn show_autocommit_status(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
190    let autocommit = match client.query("SHOW autocommit", &[]).await {
191        Ok(rows) => rows[0].get::<_, String>(0) == "on",
192        Err(_) => true, // PG 17+ removed the setting; always-on
193    };
194
195    Ok(json!({
196        "autocommit": autocommit,
197        "value": if autocommit { "on" } else { "off" }
198    }))
199}
200
201/// 50. Show transaction timeout
202pub async fn show_transaction_timeout(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
203    let rows = client
204        .query("SHOW statement_timeout", &[])
205        .await?;
206
207    let timeout = rows[0].get::<_, String>(0);
208
209    Ok(json!({
210        "statement_timeout": timeout
211    }))
212}