1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn show_active_transactions(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
7 let rows = client
8 .query(
9 "SELECT pid, usename, application_name, state, xact_start, query_start, query
10 FROM pg_stat_activity
11 WHERE xact_start IS NOT NULL AND pid != pg_backend_pid()
12 ORDER BY xact_start ASC",
13 &[],
14 )
15 .await?;
16
17 let transactions: Vec<Value> = rows
18 .iter()
19 .map(|row| {
20 json!({
21 "pid": row.get::<_, i32>(0),
22 "user": row.get::<_, String>(1),
23 "application": row.get::<_, Option<String>>(2),
24 "state": row.get::<_, String>(3),
25 "xact_start": row.get::<_, String>(4),
26 "query_start": row.get::<_, String>(5),
27 "query": row.get::<_, Option<String>>(6),
28 })
29 })
30 .collect();
31
32 Ok(json!({ "transactions": transactions }))
33}
34
35pub async fn show_locks(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
37 let rows = client
38 .query(
39 "SELECT l.pid, a.usename, a.application_name, l.mode, l.granted, l.fastpath,
40 a.query_start, a.query
41 FROM pg_locks l
42 JOIN pg_stat_activity a ON l.pid = a.pid
43 WHERE l.pid != pg_backend_pid()
44 ORDER BY l.pid, l.mode",
45 &[],
46 )
47 .await?;
48
49 let locks: Vec<Value> = rows
50 .iter()
51 .map(|row| {
52 json!({
53 "pid": row.get::<_, i32>(0),
54 "user": row.get::<_, String>(1),
55 "application": row.get::<_, Option<String>>(2),
56 "lock_type": row.get::<_, String>(3),
57 "granted": row.get::<_, bool>(4),
58 "fastpath": row.get::<_, bool>(5),
59 "query_start": row.get::<_, Option<String>>(6),
60 "query": row.get::<_, Option<String>>(7),
61 })
62 })
63 .collect();
64
65 Ok(json!({ "locks": locks }))
66}
67
68pub async fn show_waiting_locks(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
70 let rows = client
71 .query(
72 "SELECT l.pid, a.usename, l.mode, a.query_start, a.query
73 FROM pg_locks l
74 JOIN pg_stat_activity a ON l.pid = a.pid
75 WHERE NOT l.granted AND l.pid != pg_backend_pid()
76 ORDER BY a.query_start ASC",
77 &[],
78 )
79 .await?;
80
81 let waiting: Vec<Value> = rows
82 .iter()
83 .map(|row| {
84 json!({
85 "pid": row.get::<_, i32>(0),
86 "user": row.get::<_, String>(1),
87 "lock_type": row.get::<_, String>(2),
88 "query_start": row.get::<_, String>(3),
89 "query": row.get::<_, Option<String>>(4),
90 })
91 })
92 .collect();
93
94 Ok(json!({ "waiting_locks": waiting }))
95}
96
97pub async fn begin_transaction(client: &Client, params: &Option<Value>) -> MCPResult<Value> {
99 let isolation_level = params
100 .as_ref()
101 .and_then(|p| p.get("isolation_level").and_then(|v| v.as_str()).map(|s| s.to_string()))
102 .unwrap_or_else(|| "READ COMMITTED".to_string());
103
104 let valid_levels = vec!["SERIALIZABLE", "REPEATABLE READ", "READ COMMITTED", "READ UNCOMMITTED"];
105 if !valid_levels.contains(&isolation_level.as_str()) {
106 return Err(crate::errors::MCPError::InvalidParams(
107 format!("Invalid isolation level: {}", isolation_level)
108 ));
109 }
110
111 let sql = format!("BEGIN ISOLATION LEVEL {}", isolation_level);
112 client.execute(&sql, &[]).await?;
113
114 Ok(json!({
115 "status": "success",
116 "action": "BEGIN",
117 "isolation_level": isolation_level
118 }))
119}
120
121pub async fn commit_transaction(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
123 client.execute("COMMIT", &[]).await?;
124
125 Ok(json!({
126 "status": "success",
127 "action": "COMMIT"
128 }))
129}
130
131pub async fn rollback_transaction(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
133 client.execute("ROLLBACK", &[]).await?;
134
135 Ok(json!({
136 "status": "success",
137 "action": "ROLLBACK"
138 }))
139}
140
141pub async fn show_transaction_isolation(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
143 let rows = client
144 .query("SHOW transaction_isolation", &[])
145 .await?;
146
147 let level = rows[0].get::<_, String>(0);
148
149 Ok(json!({
150 "isolation_level": level,
151 "available_levels": ["serializable", "repeatable read", "read committed", "read uncommitted"]
152 }))
153}
154
155pub async fn show_deadlocks(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
157 let rows = client
158 .query(
159 "SELECT pid, usename, application_name, state, query_start, query
160 FROM pg_stat_activity
161 WHERE state = 'disabled' OR wait_event = 'ProcArrayLock'
162 ORDER BY query_start ASC",
163 &[],
164 )
165 .await?;
166
167 let deadlocks: Vec<Value> = rows
168 .iter()
169 .map(|row| {
170 json!({
171 "pid": row.get::<_, i32>(0),
172 "user": row.get::<_, String>(1),
173 "application": row.get::<_, Option<String>>(2),
174 "state": row.get::<_, String>(3),
175 "query_start": row.get::<_, String>(4),
176 "query": row.get::<_, Option<String>>(5),
177 })
178 })
179 .collect();
180
181 Ok(json!({ "potential_deadlocks": deadlocks }))
182}
183
184pub async fn show_autocommit_status(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
190 let autocommit = match client.query("SHOW autocommit", &[]).await {
191 Ok(rows) => rows[0].get::<_, String>(0) == "on",
192 Err(_) => true, };
194
195 Ok(json!({
196 "autocommit": autocommit,
197 "value": if autocommit { "on" } else { "off" }
198 }))
199}
200
201pub async fn show_transaction_timeout(client: &Client, _params: &Option<Value>) -> MCPResult<Value> {
203 let rows = client
204 .query("SHOW statement_timeout", &[])
205 .await?;
206
207 let timeout = rows[0].get::<_, String>(0);
208
209 Ok(json!({
210 "statement_timeout": timeout
211 }))
212}