1use crate::errors::Result as MCPResult;
2use serde_json::{Value, json};
3use tokio_postgres::Client;
4
5pub async fn show_replication_status(
7 client: &Client,
8 _params: &Option<&Value>,
9) -> MCPResult<Value> {
10 let (in_recovery,): (bool,) = client
11 .query_one("SELECT pg_is_in_recovery()", &[])
12 .await
13 .map(|r| (r.get(0),))?;
14
15 if !in_recovery {
16 return Ok(json!({
17 "is_wal_replay_paused": false,
18 "last_wal_receive_lsn": null,
19 "last_wal_replay_lsn": null,
20 "uptime": null,
21 "in_recovery": false,
22 "hint": "Server is a primary, not a replica"
23 }));
24 }
25
26 let rows = client
27 .query(
28 "SELECT pg_is_wal_replay_paused(), pg_last_wal_receive_lsn(),
29 pg_last_wal_replay_lsn(), now() - pg_postmaster_start_time() as uptime",
30 &[],
31 )
32 .await?;
33
34 let row = &rows[0];
35
36 Ok(json!({
37 "is_wal_replay_paused": row.get::<_, bool>(0),
38 "last_wal_receive_lsn": row.get::<_, Option<String>>(1),
39 "last_wal_replay_lsn": row.get::<_, Option<String>>(2),
40 "uptime": row.get::<_, Option<String>>(3),
41 "in_recovery": true,
42 }))
43}
44
45pub async fn list_replication_slots(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
47 let rows = client
48 .query(
49 "SELECT slot_name, slot_type, database::text, active, restart_lsn::text, confirmed_flush_lsn::text
50 FROM pg_replication_slots
51 ORDER BY slot_name",
52 &[],
53 )
54 .await?;
55
56 let slots: Vec<Value> = rows
57 .iter()
58 .map(|row| {
59 json!({
60 "slot_name": row.get::<_, String>(0),
61 "slot_type": row.get::<_, String>(1),
62 "database": row.get::<_, Option<String>>(2),
63 "active": row.get::<_, bool>(3),
64 "restart_lsn": row.get::<_, Option<String>>(4),
65 "confirmed_flush_lsn": row.get::<_, Option<String>>(5),
66 })
67 })
68 .collect();
69
70 Ok(json!({ "replication_slots": slots }))
71}
72
73pub async fn list_standby_servers(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
75 let rows = client
76 .query(
77 "SELECT client_addr, client_port, state, sync_state, write_lag, flush_lag, replay_lag
78 FROM pg_stat_replication
79 ORDER BY client_addr, client_port",
80 &[],
81 )
82 .await?;
83
84 let standbys: Vec<Value> = rows
85 .iter()
86 .map(|row| {
87 json!({
88 "client_address": row.get::<_, Option<String>>(0),
89 "client_port": row.get::<_, Option<i32>>(1),
90 "state": row.get::<_, String>(2),
91 "sync_state": row.get::<_, String>(3),
92 "write_lag": row.get::<_, Option<String>>(4),
93 "flush_lag": row.get::<_, Option<String>>(5),
94 "replay_lag": row.get::<_, Option<String>>(6),
95 })
96 })
97 .collect();
98
99 Ok(json!({ "standbys": standbys }))
100}
101
102pub async fn show_wal_info(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
104 let (in_recovery,): (bool,) = client
105 .query_one("SELECT pg_is_in_recovery()", &[])
106 .await
107 .map(|r| (r.get(0),))?;
108
109 let wal_replay_paused = if in_recovery {
110 let r = client
111 .query_one("SELECT pg_is_wal_replay_paused()", &[])
112 .await?;
113 Some(r.get::<_, bool>(0))
114 } else {
115 None
116 };
117
118 let rows = client
119 .query(
120 "SELECT pg_current_wal_lsn()::text, pg_current_wal_insert_lsn()::text,
121 pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')::bigint as bytes",
122 &[],
123 )
124 .await?;
125
126 let row = &rows[0];
127
128 Ok(json!({
129 "current_wal_lsn": row.get::<_, String>(0),
130 "current_wal_insert_lsn": row.get::<_, String>(1),
131 "wal_replay_paused": wal_replay_paused,
132 "wal_size_bytes": row.get::<_, i64>(2),
133 "in_recovery": in_recovery,
134 }))
135}
136
137pub async fn show_base_backup_progress(
139 client: &Client,
140 _params: &Option<&Value>,
141) -> MCPResult<Value> {
142 let query = match client
145 .query_one(
146 "SELECT count(*) FROM pg_class WHERE relname = 'pg_stat_progress_basebackup'",
147 &[],
148 )
149 .await
150 {
151 Ok(r) if r.get::<_, i64>(0) > 0 => {
152 "SELECT phase, backup_total, backup_streamed, tablespaces_total, tablespaces_streamed
153 FROM pg_stat_progress_basebackup WHERE phase IS NOT NULL"
154 }
155 _ => {
156 return Ok(json!({
158 "status": "unavailable",
159 "message": "Base backup progress requires PostgreSQL 13+ (pg_stat_progress_basebackup view not found)"
160 }));
161 }
162 };
163 let rows = client.query(query, &[]).await;
164
165 match rows {
166 Ok(r) => {
167 if r.is_empty() {
168 return Ok(json!({
169 "status": "no_backup",
170 "message": "No base backup in progress"
171 }));
172 }
173
174 let row = &r[0];
175
176 Ok(json!({
177 "phase": row.get::<_, String>(0),
178 "backup_total": row.get::<_, Option<i64>>(1),
179 "backup_streamed": row.get::<_, Option<i64>>(2),
180 "tablespaces_total": row.get::<_, i64>(3),
181 "tablespaces_streamed": row.get::<_, i64>(4),
182 }))
183 }
184 Err(_) => Ok(json!({
185 "status": "unavailable",
186 "message": "Base backup progress not available on this PostgreSQL version"
187 })),
188 }
189}