Skip to main content

mcp_postgres/actions/
replication.rs

1use crate::errors::Result as MCPResult;
2use serde_json::{Value, json};
3use tokio_postgres::Client;
4
5/// 36. Show replication status
6pub async fn show_replication_status(
7    client: &Client,
8    _params: &Option<&Value>,
9) -> MCPResult<Value> {
10    let (in_recovery,): (bool,) = client
11        .query_one("SELECT pg_is_in_recovery()", &[])
12        .await
13        .map(|r| (r.get(0),))?;
14
15    if !in_recovery {
16        return Ok(json!({
17            "is_wal_replay_paused": false,
18            "last_wal_receive_lsn": null,
19            "last_wal_replay_lsn": null,
20            "uptime": null,
21            "in_recovery": false,
22            "hint": "Server is a primary, not a replica"
23        }));
24    }
25
26    let rows = client
27        .query(
28            "SELECT pg_is_wal_replay_paused(), pg_last_wal_receive_lsn(),
29                    pg_last_wal_replay_lsn(), now() - pg_postmaster_start_time() as uptime",
30            &[],
31        )
32        .await?;
33
34    let row = &rows[0];
35
36    Ok(json!({
37        "is_wal_replay_paused": row.get::<_, bool>(0),
38        "last_wal_receive_lsn": row.get::<_, Option<String>>(1),
39        "last_wal_replay_lsn": row.get::<_, Option<String>>(2),
40        "uptime": row.get::<_, Option<String>>(3),
41        "in_recovery": true,
42    }))
43}
44
45/// 37. List replication slots
46pub async fn list_replication_slots(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
47    let rows = client
48        .query(
49            "SELECT slot_name, slot_type, database::text, active, restart_lsn::text, confirmed_flush_lsn::text
50             FROM pg_replication_slots
51             ORDER BY slot_name",
52            &[],
53        )
54        .await?;
55
56    let slots: Vec<Value> = rows
57        .iter()
58        .map(|row| {
59            json!({
60                "slot_name": row.get::<_, String>(0),
61                "slot_type": row.get::<_, String>(1),
62                "database": row.get::<_, Option<String>>(2),
63                "active": row.get::<_, bool>(3),
64                "restart_lsn": row.get::<_, Option<String>>(4),
65                "confirmed_flush_lsn": row.get::<_, Option<String>>(5),
66            })
67        })
68        .collect();
69
70    Ok(json!({ "replication_slots": slots }))
71}
72
73/// 38. List standby servers
74pub async fn list_standby_servers(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
75    let rows = client
76        .query(
77            "SELECT client_addr, client_port, state, sync_state, write_lag, flush_lag, replay_lag
78             FROM pg_stat_replication
79             ORDER BY client_addr, client_port",
80            &[],
81        )
82        .await?;
83
84    let standbys: Vec<Value> = rows
85        .iter()
86        .map(|row| {
87            json!({
88                "client_address": row.get::<_, Option<String>>(0),
89                "client_port": row.get::<_, Option<i32>>(1),
90                "state": row.get::<_, String>(2),
91                "sync_state": row.get::<_, String>(3),
92                "write_lag": row.get::<_, Option<String>>(4),
93                "flush_lag": row.get::<_, Option<String>>(5),
94                "replay_lag": row.get::<_, Option<String>>(6),
95            })
96        })
97        .collect();
98
99    Ok(json!({ "standbys": standbys }))
100}
101
102/// 39. Show WAL info
103pub async fn show_wal_info(client: &Client, _params: &Option<&Value>) -> MCPResult<Value> {
104    let (in_recovery,): (bool,) = client
105        .query_one("SELECT pg_is_in_recovery()", &[])
106        .await
107        .map(|r| (r.get(0),))?;
108
109    let wal_replay_paused = if in_recovery {
110        let r = client
111            .query_one("SELECT pg_is_wal_replay_paused()", &[])
112            .await?;
113        Some(r.get::<_, bool>(0))
114    } else {
115        None
116    };
117
118    let rows = client
119        .query(
120            "SELECT pg_current_wal_lsn()::text, pg_current_wal_insert_lsn()::text,
121                    pg_wal_lsn_diff(pg_current_wal_lsn(), '0/0')::bigint as bytes",
122            &[],
123        )
124        .await?;
125
126    let row = &rows[0];
127
128    Ok(json!({
129        "current_wal_lsn": row.get::<_, String>(0),
130        "current_wal_insert_lsn": row.get::<_, String>(1),
131        "wal_replay_paused": wal_replay_paused,
132        "wal_size_bytes": row.get::<_, i64>(2),
133        "in_recovery": in_recovery,
134    }))
135}
136
137/// 40. Show base backup progress
138pub async fn show_base_backup_progress(
139    client: &Client,
140    _params: &Option<&Value>,
141) -> MCPResult<Value> {
142    // `pg_stat_progress_basebackup` was added in PostgreSQL 13.
143    // No other view name exists for this purpose.
144    let query = match client
145        .query_one(
146            "SELECT count(*) FROM pg_class WHERE relname = 'pg_stat_progress_basebackup'",
147            &[],
148        )
149        .await
150    {
151        Ok(r) if r.get::<_, i64>(0) > 0 => {
152            "SELECT phase, backup_total, backup_streamed, tablespaces_total, tablespaces_streamed
153             FROM pg_stat_progress_basebackup WHERE phase IS NOT NULL"
154        }
155        _ => {
156            // PG < 13 does not have progress reporting for base backups
157            return Ok(json!({
158                "status": "unavailable",
159                "message": "Base backup progress requires PostgreSQL 13+ (pg_stat_progress_basebackup view not found)"
160            }));
161        }
162    };
163    let rows = client.query(query, &[]).await;
164
165    match rows {
166        Ok(r) => {
167            if r.is_empty() {
168                return Ok(json!({
169                    "status": "no_backup",
170                    "message": "No base backup in progress"
171                }));
172            }
173
174            let row = &r[0];
175
176            Ok(json!({
177                "phase": row.get::<_, String>(0),
178                "backup_total": row.get::<_, Option<i64>>(1),
179                "backup_streamed": row.get::<_, Option<i64>>(2),
180                "tablespaces_total": row.get::<_, i64>(3),
181                "tablespaces_streamed": row.get::<_, i64>(4),
182            }))
183        }
184        Err(_) => Ok(json!({
185            "status": "unavailable",
186            "message": "Base backup progress not available on this PostgreSQL version"
187        })),
188    }
189}