Skip to main content

mcp_postgres/actions/
replication.rs

1use serde_json::{json, Value};
2use tokio_postgres::Client;
3use crate::errors::Result as MCPResult;
4
5/// 36. Show replication status
6pub async fn show_replication_status(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
7    let (in_recovery,): (bool,) = client
8        .query_one("SELECT pg_is_in_recovery()", &[])
9        .await
10        .map(|r| (r.get(0),))?;
11
12    if !in_recovery {
13        return Ok(json!({
14            "is_wal_replay_paused": false,
15            "last_wal_receive_lsn": null,
16            "last_wal_replay_lsn": null,
17            "uptime": null,
18            "in_recovery": false,
19            "hint": "Server is a primary, not a replica"
20        }));
21    }
22
23    let rows = client
24        .query(
25            "SELECT pg_is_wal_replay_paused(), pg_last_wal_receive_lsn(),
26                    pg_last_wal_replay_lsn(), now() - pg_postmaster_start_time() as uptime",
27            &[],
28        )
29        .await?;
30
31    let row = &rows[0];
32
33    Ok(json!({
34        "is_wal_replay_paused": row.get::<_, bool>(0),
35        "last_wal_receive_lsn": row.get::<_, Option<String>>(1),
36        "last_wal_replay_lsn": row.get::<_, Option<String>>(2),
37        "uptime": row.get::<_, Option<String>>(3),
38        "in_recovery": true,
39    }))
40}
41
42/// 37. List replication slots
43pub async fn list_replication_slots(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
44    let rows = client
45        .query(
46            "SELECT slot_name, slot_type, database::text, active, restart_lsn::text, confirmed_flush_lsn::text
47             FROM pg_replication_slots
48             ORDER BY slot_name",
49            &[],
50        )
51        .await?;
52
53    let slots: Vec<Value> = rows
54        .iter()
55        .map(|row| {
56            json!({
57                "slot_name": row.get::<_, String>(0),
58                "slot_type": row.get::<_, String>(1),
59                "database": row.get::<_, Option<String>>(2),
60                "active": row.get::<_, bool>(3),
61                "restart_lsn": row.get::<_, Option<String>>(4),
62                "confirmed_flush_lsn": row.get::<_, Option<String>>(5),
63            })
64        })
65        .collect();
66
67    Ok(json!({ "replication_slots": slots }))
68}
69
70/// 38. List standby servers
71pub async fn list_standby_servers(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
72    let rows = client
73        .query(
74            "SELECT client_addr, client_port, state, sync_state, write_lag, flush_lag, replay_lag
75             FROM pg_stat_replication
76             ORDER BY client_addr, client_port",
77            &[],
78        )
79        .await?;
80
81    let standbys: Vec<Value> = rows
82        .iter()
83        .map(|row| {
84            json!({
85                "client_address": row.get::<_, Option<String>>(0),
86                "client_port": row.get::<_, Option<i32>>(1),
87                "state": row.get::<_, String>(2),
88                "sync_state": row.get::<_, String>(3),
89                "write_lag": row.get::<_, Option<String>>(4),
90                "flush_lag": row.get::<_, Option<String>>(5),
91                "replay_lag": row.get::<_, Option<String>>(6),
92            })
93        })
94        .collect();
95
96    Ok(json!({ "standbys": standbys }))
97}
98
99/// 39. Show WAL info
100pub async fn show_wal_info(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
101    let (in_recovery,): (bool,) = client
102        .query_one("SELECT pg_is_in_recovery()", &[])
103        .await
104        .map(|r| (r.get(0),))?;
105
106    let wal_replay_paused = if in_recovery {
107        let r = client.query_one("SELECT pg_is_wal_replay_paused()", &[]).await?;
108        Some(r.get::<_, bool>(0))
109    } else {
110        None
111    };
112
113    let rows = client
114        .query(
115            "SELECT pg_current_wal_lsn()::text, pg_current_wal_insert_lsn()::text,
116                    pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')::bigint as bytes",
117            &[],
118        )
119        .await?;
120
121    let row = &rows[0];
122
123    Ok(json!({
124        "current_wal_lsn": row.get::<_, String>(0),
125        "current_wal_insert_lsn": row.get::<_, String>(1),
126        "wal_replay_paused": wal_replay_paused,
127        "wal_size_bytes": row.get::<_, i64>(2),
128        "in_recovery": in_recovery,
129    }))
130}
131
132/// 40. Show base backup progress
133pub async fn show_base_backup_progress(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
134    // `pg_stat_progress_basebackup` was added in PostgreSQL 13.
135    // No other view name exists for this purpose.
136    let query = match client.query_one(
137        "SELECT count(*) FROM pg_class WHERE relname = 'pg_stat_progress_basebackup'", &[]
138    ).await {
139        Ok(r) if r.get::<_, i64>(0) > 0 => {
140            "SELECT phase, backup_total, backup_streamed, tablespaces_total, tablespaces_streamed
141             FROM pg_stat_progress_basebackup WHERE phase IS NOT NULL"
142        }
143        _ => {
144            // PG < 13 does not have progress reporting for base backups
145            return Ok(json!({
146                "status": "unavailable",
147                "message": "Base backup progress requires PostgreSQL 13+ (pg_stat_progress_basebackup view not found)"
148            }));
149        }
150    };
151    let rows = client
152        .query(query, &[])
153        .await;
154
155    match rows {
156        Ok(r) => {
157            if r.is_empty() {
158                return Ok(json!({
159                    "status": "no_backup",
160                    "message": "No base backup in progress"
161                }));
162            }
163
164            let row = &r[0];
165
166            Ok(json!({
167                "phase": row.get::<_, String>(0),
168                "backup_total": row.get::<_, Option<i64>>(1),
169                "backup_streamed": row.get::<_, Option<i64>>(2),
170                "tablespaces_total": row.get::<_, i64>(3),
171                "tablespaces_streamed": row.get::<_, i64>(4),
172            }))
173        }
174        Err(_) => {
175            Ok(json!({
176                "status": "unavailable",
177                "message": "Base backup progress not available on this PostgreSQL version"
178            }))
179        }
180    }
181}