database_replicator/replication/
monitor.rs1use anyhow::{Context, Result};
5use tokio_postgres::Client;
6
/// Replication progress of one WAL sender connection, as read from `pg_stat_replication`
/// on the source server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceReplicationStats {
    /// `application_name` of the connected client; for a logical-replication subscriber this
    /// is normally the subscription name.
    pub application_name: String,
    /// WAL sender state text as reported by PostgreSQL (for example `streaming`).
    pub state: String,
    /// Last WAL location sent on this connection, in textual `pg_lsn` form.
    pub sent_lsn: String,
    /// Last WAL location written by the receiver, in textual `pg_lsn` form.
    pub write_lsn: String,
    /// Last WAL location flushed by the receiver, in textual `pg_lsn` form.
    pub flush_lsn: String,
    /// Last WAL location replayed/applied by the receiver, in textual `pg_lsn` form.
    pub replay_lsn: String,
    /// Write lag in milliseconds; `None` when PostgreSQL reports no value.
    pub write_lag_ms: Option<i64>,
    /// Flush lag in milliseconds; `None` when PostgreSQL reports no value.
    pub flush_lag_ms: Option<i64>,
    /// Replay lag in milliseconds; `None` when PostgreSQL reports no value.
    pub replay_lag_ms: Option<i64>,
}
20
/// Status of one subscription worker row, as read from `pg_stat_subscription` on the
/// target server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionStats {
    /// Name of the subscription this row belongs to.
    pub subscription_name: String,
    /// Process ID of the worker, or `None` when no worker process is running.
    pub pid: Option<i32>,
    /// Last WAL location received by the worker, in textual `pg_lsn` form, if any.
    pub received_lsn: Option<String>,
    /// Last WAL location reported back to the publisher, in textual `pg_lsn` form, if any.
    pub latest_end_lsn: Option<String>,
    /// Textual state label produced by the status query for this row.
    pub state: String,
}
30
31pub async fn get_replication_lag(
34 client: &Client,
35 subscription_name: Option<&str>,
36) -> Result<Vec<SourceReplicationStats>> {
37 if let Some(name) = subscription_name {
38 crate::utils::validate_postgres_identifier(name).context("Invalid subscription name")?;
39 }
40
41 let rows = if let Some(sub_name) = subscription_name {
42 client
43 .query(
44 "SELECT
45 application_name,
46 state,
47 sent_lsn::text,
48 write_lsn::text,
49 flush_lsn::text,
50 replay_lsn::text,
51 EXTRACT(EPOCH FROM write_lag) * 1000 as write_lag_ms,
52 EXTRACT(EPOCH FROM flush_lag) * 1000 as flush_lag_ms,
53 EXTRACT(EPOCH FROM replay_lag) * 1000 as replay_lag_ms
54 FROM pg_stat_replication
55 WHERE application_name = $1",
56 &[&sub_name],
57 )
58 .await
59 } else {
60 client
61 .query(
62 "SELECT
63 application_name,
64 state,
65 sent_lsn::text,
66 write_lsn::text,
67 flush_lsn::text,
68 replay_lsn::text,
69 EXTRACT(EPOCH FROM write_lag) * 1000 as write_lag_ms,
70 EXTRACT(EPOCH FROM flush_lag) * 1000 as flush_lag_ms,
71 EXTRACT(EPOCH FROM replay_lag) * 1000 as replay_lag_ms
72 FROM pg_stat_replication",
73 &[],
74 )
75 .await
76 }
77 .context("Failed to query replication statistics")?;
78
79 let mut stats = Vec::new();
80 for row in rows {
81 stats.push(SourceReplicationStats {
82 application_name: row.get(0),
83 state: row.get(1),
84 sent_lsn: row.get(2),
85 write_lsn: row.get(3),
86 flush_lsn: row.get(4),
87 replay_lsn: row.get(5),
88 write_lag_ms: row.get(6),
89 flush_lag_ms: row.get(7),
90 replay_lag_ms: row.get(8),
91 });
92 }
93
94 Ok(stats)
95}
96
97pub async fn get_subscription_status(
100 client: &Client,
101 subscription_name: Option<&str>,
102) -> Result<Vec<SubscriptionStats>> {
103 if let Some(name) = subscription_name {
104 crate::utils::validate_postgres_identifier(name).context("Invalid subscription name")?;
105 }
106
107 let rows = if let Some(sub_name) = subscription_name {
108 client
109 .query(
110 "SELECT
111 subname,
112 pid,
113 received_lsn::text,
114 latest_end_lsn::text,
115 srsubstate
116 FROM pg_stat_subscription
117 WHERE subname = $1",
118 &[&sub_name],
119 )
120 .await
121 } else {
122 client
123 .query(
124 "SELECT
125 subname,
126 pid,
127 received_lsn::text,
128 latest_end_lsn::text,
129 srsubstate
130 FROM pg_stat_subscription",
131 &[],
132 )
133 .await
134 }
135 .context("Failed to query subscription statistics")?;
136
137 let mut stats = Vec::new();
138 for row in rows {
139 stats.push(SubscriptionStats {
140 subscription_name: row.get(0),
141 pid: row.get(1),
142 received_lsn: row.get(2),
143 latest_end_lsn: row.get(3),
144 state: row.get(4),
145 });
146 }
147
148 Ok(stats)
149}
150
151pub async fn is_replication_caught_up(
154 client: &Client,
155 subscription_name: Option<&str>,
156) -> Result<bool> {
157 let stats = get_replication_lag(client, subscription_name).await?;
158
159 if stats.is_empty() {
160 return Ok(false);
162 }
163
164 for stat in stats {
165 if let Some(lag_ms) = stat.replay_lag_ms {
167 if lag_ms > 1000 {
168 return Ok(false);
169 }
170 } else {
171 return Ok(false);
173 }
174 }
175
176 Ok(true)
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::postgres::connect;

    /// Returns `true` when `e` stems from the test environment (a missing relation or
    /// insufficient privileges) rather than from the code under test.
    ///
    /// `{:#}` renders the whole anyhow context chain. `to_string()` would only yield the
    /// outermost context ("Failed to query ..."), which never contains the underlying cause.
    fn is_environment_error(e: &anyhow::Error) -> bool {
        let full_message = format!("{:#}", e);
        full_message.contains("relation") || full_message.contains("permission")
    }

    /// Reads a required connection URL from the environment.
    ///
    /// # Panics
    ///
    /// Panics, naming the variable, when it is unset or not valid Unicode.
    fn required_env(var: &str) -> String {
        std::env::var(var).unwrap_or_else(|_| panic!("{} must be set to run this test", var))
    }

    #[tokio::test]
    #[ignore]
    async fn test_get_replication_lag() {
        let client = connect(&required_env("TEST_SOURCE_URL")).await.unwrap();

        match get_replication_lag(&client, None).await {
            Ok(stats) => {
                println!("✓ Replication lag query succeeded");
                println!("Found {} replication connections", stats.len());
                for stat in &stats {
                    println!(
                        " - {}: {} (replay lag: {:?}ms)",
                        stat.application_name, stat.state, stat.replay_lag_ms
                    );
                }
            }
            // The environment cannot run the query: skip rather than fail.
            Err(e) if is_environment_error(&e) => {
                println!("Skipping, cannot query replication lag here: {:?}", e);
            }
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_get_subscription_status() {
        let client = connect(&required_env("TEST_TARGET_URL")).await.unwrap();

        match get_subscription_status(&client, None).await {
            Ok(stats) => {
                println!("✓ Subscription status query succeeded");
                println!("Found {} subscription workers", stats.len());
                for stat in &stats {
                    println!(
                        " - {}: state={} (pid: {:?})",
                        stat.subscription_name, stat.state, stat.pid
                    );
                }
            }
            // The environment cannot run the query: skip rather than fail.
            Err(e) if is_environment_error(&e) => {
                println!("Skipping, cannot query subscription status here: {:?}", e);
            }
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_is_replication_caught_up() {
        let client = connect(&required_env("TEST_SOURCE_URL")).await.unwrap();

        match is_replication_caught_up(&client, None).await {
            Ok(caught_up) => println!("✓ Caught up check succeeded: {}", caught_up),
            // The environment cannot run the query: skip rather than fail.
            Err(e) if is_environment_error(&e) => {
                println!("Skipping, cannot check replication progress here: {:?}", e);
            }
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_get_replication_lag_with_name() {
        let client = connect(&required_env("TEST_SOURCE_URL")).await.unwrap();

        match get_replication_lag(&client, Some("seren_migration_sub")).await {
            Ok(stats) => {
                println!("✓ Named replication lag query succeeded");
                println!("Found {} matching replication connections", stats.len());
            }
            // The environment cannot run the query: skip rather than fail.
            Err(e) if is_environment_error(&e) => {
                println!("Skipping, cannot query named replication lag here: {:?}", e);
            }
            Err(e) => panic!("Unexpected error: {:?}", e),
        }
    }
}
285}