database_replicator/replication/
monitor.rs

1// ABOUTME: Replication monitoring utilities
2// ABOUTME: Queries replication status and lag from source and target databases
3
4use anyhow::{Context, Result};
5use tokio_postgres::Client;
6
/// Replication statistics from the source database (publisher).
///
/// One value corresponds to one row of `pg_stat_replication` as selected by
/// `get_replication_lag`. LSN columns are carried in their textual form and the
/// lag intervals are converted to milliseconds by the query.
///
/// `PartialEq`/`Eq` are derived so snapshots can be compared directly (for
/// example to detect whether anything changed between two polls).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceReplicationStats {
    /// `application_name` column of the replication connection.
    pub application_name: String,
    /// `state` column of the replication connection.
    pub state: String,
    /// `sent_lsn` column, rendered as text.
    pub sent_lsn: String,
    /// `write_lsn` column, rendered as text.
    pub write_lsn: String,
    /// `flush_lsn` column, rendered as text.
    pub flush_lsn: String,
    /// `replay_lsn` column, rendered as text.
    pub replay_lsn: String,
    /// `write_lag` in milliseconds; `None` when the column is NULL.
    pub write_lag_ms: Option<i64>,
    /// `flush_lag` in milliseconds; `None` when the column is NULL.
    pub flush_lag_ms: Option<i64>,
    /// `replay_lag` in milliseconds; `None` when the column is NULL.
    pub replay_lag_ms: Option<i64>,
}
20
/// Subscription statistics from the target database (subscriber).
///
/// One value corresponds to one row returned by `get_subscription_status`.
///
/// `PartialEq`/`Eq` are derived so snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionStats {
    /// Name of the subscription (`subname` column).
    pub subscription_name: String,
    /// `pid` column; `None` when the column is NULL.
    pub pid: Option<i32>,
    /// `received_lsn` column rendered as text; `None` when the column is NULL.
    pub received_lsn: Option<String>,
    /// `latest_end_lsn` column rendered as text; `None` when the column is NULL.
    pub latest_end_lsn: Option<String>,
    /// State value that the status query selects as `srsubstate`.
    pub state: String,
}
30
31/// Get replication statistics from the source database
32/// Queries pg_stat_replication to see what's being replicated to subscribers
33pub async fn get_replication_lag(
34    client: &Client,
35    subscription_name: Option<&str>,
36) -> Result<Vec<SourceReplicationStats>> {
37    if let Some(name) = subscription_name {
38        crate::utils::validate_postgres_identifier(name).context("Invalid subscription name")?;
39    }
40
41    let rows = if let Some(sub_name) = subscription_name {
42        client
43            .query(
44                "SELECT
45                    application_name,
46                    state,
47                    sent_lsn::text,
48                    write_lsn::text,
49                    flush_lsn::text,
50                    replay_lsn::text,
51                    EXTRACT(EPOCH FROM write_lag) * 1000 as write_lag_ms,
52                    EXTRACT(EPOCH FROM flush_lag) * 1000 as flush_lag_ms,
53                    EXTRACT(EPOCH FROM replay_lag) * 1000 as replay_lag_ms
54                FROM pg_stat_replication
55                WHERE application_name = $1",
56                &[&sub_name],
57            )
58            .await
59    } else {
60        client
61            .query(
62                "SELECT
63                    application_name,
64                    state,
65                    sent_lsn::text,
66                    write_lsn::text,
67                    flush_lsn::text,
68                    replay_lsn::text,
69                    EXTRACT(EPOCH FROM write_lag) * 1000 as write_lag_ms,
70                    EXTRACT(EPOCH FROM flush_lag) * 1000 as flush_lag_ms,
71                    EXTRACT(EPOCH FROM replay_lag) * 1000 as replay_lag_ms
72                FROM pg_stat_replication",
73                &[],
74            )
75            .await
76    }
77    .context("Failed to query replication statistics")?;
78
79    let mut stats = Vec::new();
80    for row in rows {
81        stats.push(SourceReplicationStats {
82            application_name: row.get(0),
83            state: row.get(1),
84            sent_lsn: row.get(2),
85            write_lsn: row.get(3),
86            flush_lsn: row.get(4),
87            replay_lsn: row.get(5),
88            write_lag_ms: row.get(6),
89            flush_lag_ms: row.get(7),
90            replay_lag_ms: row.get(8),
91        });
92    }
93
94    Ok(stats)
95}
96
97/// Get subscription status from the target database
98/// Queries pg_stat_subscription to see subscription state and progress
99pub async fn get_subscription_status(
100    client: &Client,
101    subscription_name: Option<&str>,
102) -> Result<Vec<SubscriptionStats>> {
103    if let Some(name) = subscription_name {
104        crate::utils::validate_postgres_identifier(name).context("Invalid subscription name")?;
105    }
106
107    let rows = if let Some(sub_name) = subscription_name {
108        client
109            .query(
110                "SELECT
111                    subname,
112                    pid,
113                    received_lsn::text,
114                    latest_end_lsn::text,
115                    srsubstate
116                FROM pg_stat_subscription
117                WHERE subname = $1",
118                &[&sub_name],
119            )
120            .await
121    } else {
122        client
123            .query(
124                "SELECT
125                    subname,
126                    pid,
127                    received_lsn::text,
128                    latest_end_lsn::text,
129                    srsubstate
130                FROM pg_stat_subscription",
131                &[],
132            )
133            .await
134    }
135    .context("Failed to query subscription statistics")?;
136
137    let mut stats = Vec::new();
138    for row in rows {
139        stats.push(SubscriptionStats {
140            subscription_name: row.get(0),
141            pid: row.get(1),
142            received_lsn: row.get(2),
143            latest_end_lsn: row.get(3),
144            state: row.get(4),
145        });
146    }
147
148    Ok(stats)
149}
150
151/// Check if replication is caught up (no lag)
152/// Returns true if all replication slots have < 1 second of replay lag
153pub async fn is_replication_caught_up(
154    client: &Client,
155    subscription_name: Option<&str>,
156) -> Result<bool> {
157    let stats = get_replication_lag(client, subscription_name).await?;
158
159    if stats.is_empty() {
160        // No active replication
161        return Ok(false);
162    }
163
164    for stat in stats {
165        // Check if replay lag is > 1000ms (1 second)
166        if let Some(lag_ms) = stat.replay_lag_ms {
167            if lag_ms > 1000 {
168                return Ok(false);
169            }
170        } else {
171            // If lag is NULL, it might be too far behind or not streaming yet
172            return Ok(false);
173        }
174    }
175
176    Ok(true)
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::postgres::connect;

    /// Returns true when the error's display text mentions a missing relation
    /// or a permission problem.
    ///
    /// NOTE(review): `to_string()` on an `anyhow::Error` renders only the
    /// outermost context message, and every test below still asserts
    /// `result.is_ok()`, so a "tolerated" error fails the test anyway. Confirm
    /// whether these errors are really meant to be tolerated.
    fn is_tolerated(err: &anyhow::Error) -> bool {
        let message = err.to_string();
        message.contains("relation") || message.contains("permission")
    }

    /// Requires a source database with active replication (`TEST_SOURCE_URL`).
    #[tokio::test]
    #[ignore]
    async fn test_get_replication_lag() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let source = connect(&url).await.unwrap();

        let result = get_replication_lag(&source, None).await;
        match &result {
            Err(err) => {
                println!("Error querying replication lag: {:?}", err);
                // It's okay if no replication is active
                if !is_tolerated(err) {
                    panic!("Unexpected error: {:?}", err);
                }
            }
            Ok(entries) => {
                println!("✓ Replication lag query succeeded");
                println!("Found {} replication slots", entries.len());
                entries.iter().for_each(|entry| {
                    println!(
                        "  - {}: {} (replay lag: {:?}ms)",
                        entry.application_name, entry.state, entry.replay_lag_ms
                    )
                });
            }
        }
        assert!(result.is_ok());
    }

    /// Requires a target database with an active subscription (`TEST_TARGET_URL`).
    #[tokio::test]
    #[ignore]
    async fn test_get_subscription_status() {
        let url = std::env::var("TEST_TARGET_URL").unwrap();
        let target = connect(&url).await.unwrap();

        let result = get_subscription_status(&target, None).await;
        match &result {
            Err(err) => {
                println!("Error querying subscription status: {:?}", err);
                // It's okay if no subscriptions exist
                if !is_tolerated(err) {
                    panic!("Unexpected error: {:?}", err);
                }
            }
            Ok(entries) => {
                println!("✓ Subscription status query succeeded");
                println!("Found {} subscriptions", entries.len());
                entries.iter().for_each(|entry| {
                    println!(
                        "  - {}: state={} (pid: {:?})",
                        entry.subscription_name, entry.state, entry.pid
                    )
                });
            }
        }
        assert!(result.is_ok());
    }

    /// Requires a source database (`TEST_SOURCE_URL`).
    #[tokio::test]
    #[ignore]
    async fn test_is_replication_caught_up() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let source = connect(&url).await.unwrap();

        let result = is_replication_caught_up(&source, None).await;
        match &result {
            Err(err) => {
                println!("Error checking if caught up: {:?}", err);
                // It's okay if no replication is active
                if !is_tolerated(err) {
                    panic!("Unexpected error: {:?}", err);
                }
            }
            Ok(flag) => {
                println!("✓ Caught up check succeeded: {}", flag);
            }
        }
        assert!(result.is_ok());
    }

    /// Requires a source database (`TEST_SOURCE_URL`); filters by a fixed
    /// subscription name.
    #[tokio::test]
    #[ignore]
    async fn test_get_replication_lag_with_name() {
        let url = std::env::var("TEST_SOURCE_URL").unwrap();
        let source = connect(&url).await.unwrap();

        // Query for a specific subscription name
        let result = get_replication_lag(&source, Some("seren_migration_sub")).await;
        match &result {
            Err(err) => {
                println!("Error querying named replication lag: {:?}", err);
            }
            Ok(entries) => {
                println!("✓ Named replication lag query succeeded");
                println!("Found {} matching replication slots", entries.len());
            }
        }
        assert!(result.is_ok());
    }
}