1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5pub async fn show_replication_status(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
7 let (in_recovery,): (bool,) = client
8 .query_one("SELECT pg_is_in_recovery()", &[])
9 .await
10 .map(|r| (r.get(0),))?;
11
12 if !in_recovery {
13 return Ok(json!({
14 "is_wal_replay_paused": false,
15 "last_wal_receive_lsn": null,
16 "last_wal_replay_lsn": null,
17 "uptime": null,
18 "in_recovery": false,
19 "hint": "Server is a primary, not a replica"
20 }));
21 }
22
23 let rows = client
24 .query(
25 "SELECT pg_is_wal_replay_paused(), pg_last_wal_receive_lsn(),
26 pg_last_wal_replay_lsn(), now() - pg_postmaster_start_time() as uptime",
27 &[],
28 )
29 .await?;
30
31 let row = &rows[0];
32
33 Ok(json!({
34 "is_wal_replay_paused": row.get::<_, bool>(0),
35 "last_wal_receive_lsn": row.get::<_, Option<String>>(1),
36 "last_wal_replay_lsn": row.get::<_, Option<String>>(2),
37 "uptime": row.get::<_, Option<String>>(3),
38 "in_recovery": true,
39 }))
40}
41
42pub async fn list_replication_slots(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
44 let rows = client
45 .query(
46 "SELECT slot_name, slot_type, database::text, active, restart_lsn::text, confirmed_flush_lsn::text
47 FROM pg_replication_slots
48 ORDER BY slot_name",
49 &[],
50 )
51 .await?;
52
53 let slots: Vec<Value> = rows
54 .iter()
55 .map(|row| {
56 json!({
57 "slot_name": row.get::<_, String>(0),
58 "slot_type": row.get::<_, String>(1),
59 "database": row.get::<_, Option<String>>(2),
60 "active": row.get::<_, bool>(3),
61 "restart_lsn": row.get::<_, Option<String>>(4),
62 "confirmed_flush_lsn": row.get::<_, Option<String>>(5),
63 })
64 })
65 .collect();
66
67 Ok(json!({ "replication_slots": slots }))
68}
69
70pub async fn list_standby_servers(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
72 let rows = client
73 .query(
74 "SELECT client_addr, client_port, state, sync_state, write_lag, flush_lag, replay_lag
75 FROM pg_stat_replication
76 ORDER BY client_addr, client_port",
77 &[],
78 )
79 .await?;
80
81 let standbys: Vec<Value> = rows
82 .iter()
83 .map(|row| {
84 json!({
85 "client_address": row.get::<_, Option<String>>(0),
86 "client_port": row.get::<_, Option<i32>>(1),
87 "state": row.get::<_, String>(2),
88 "sync_state": row.get::<_, String>(3),
89 "write_lag": row.get::<_, Option<String>>(4),
90 "flush_lag": row.get::<_, Option<String>>(5),
91 "replay_lag": row.get::<_, Option<String>>(6),
92 })
93 })
94 .collect();
95
96 Ok(json!({ "standbys": standbys }))
97}
98
99pub async fn show_wal_info(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
101 let (in_recovery,): (bool,) = client
102 .query_one("SELECT pg_is_in_recovery()", &[])
103 .await
104 .map(|r| (r.get(0),))?;
105
106 let wal_replay_paused = if in_recovery {
107 let r = client.query_one("SELECT pg_is_wal_replay_paused()", &[]).await?;
108 Some(r.get::<_, bool>(0))
109 } else {
110 None
111 };
112
113 let rows = client
114 .query(
115 "SELECT pg_current_wal_lsn()::text, pg_current_wal_insert_lsn()::text,
116 pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')::bigint as bytes",
117 &[],
118 )
119 .await?;
120
121 let row = &rows[0];
122
123 Ok(json!({
124 "current_wal_lsn": row.get::<_, String>(0),
125 "current_wal_insert_lsn": row.get::<_, String>(1),
126 "wal_replay_paused": wal_replay_paused,
127 "wal_size_bytes": row.get::<_, i64>(2),
128 "in_recovery": in_recovery,
129 }))
130}
131
132pub async fn show_base_backup_progress(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
134 let query = match client.query_one(
137 "SELECT count(*) FROM pg_class WHERE relname = 'pg_stat_progress_basebackup'", &[]
138 ).await {
139 Ok(r) if r.get::<_, i64>(0) > 0 => {
140 "SELECT phase, backup_total, backup_streamed, tablespaces_total, tablespaces_streamed
141 FROM pg_stat_progress_basebackup WHERE phase IS NOT NULL"
142 }
143 _ => {
144 return Ok(json!({
146 "status": "unavailable",
147 "message": "Base backup progress requires PostgreSQL 13+ (pg_stat_progress_basebackup view not found)"
148 }));
149 }
150 };
151 let rows = client
152 .query(query, &[])
153 .await;
154
155 match rows {
156 Ok(r) => {
157 if r.is_empty() {
158 return Ok(json!({
159 "status": "no_backup",
160 "message": "No base backup in progress"
161 }));
162 }
163
164 let row = &r[0];
165
166 Ok(json!({
167 "phase": row.get::<_, String>(0),
168 "backup_total": row.get::<_, Option<i64>>(1),
169 "backup_streamed": row.get::<_, Option<i64>>(2),
170 "tablespaces_total": row.get::<_, i64>(3),
171 "tablespaces_streamed": row.get::<_, i64>(4),
172 }))
173 }
174 Err(_) => {
175 Ok(json!({
176 "status": "unavailable",
177 "message": "Base backup progress not available on this PostgreSQL version"
178 }))
179 }
180 }
181}