database_replicator/commands/
status.rs

1// ABOUTME: Status command implementation - Check replication health
2// ABOUTME: Displays real-time replication lag and subscription status
3
4use crate::replication::{get_replication_lag, get_subscription_status, is_replication_caught_up};
5use crate::{migration, postgres::connect};
6use anyhow::{Context, Result};
7
/// Render a millisecond count as a compact, human-readable duration.
///
/// The unit is picked by magnitude:
/// - below one second (including negative values): whole milliseconds, e.g. `"500ms"`
/// - below one minute: seconds with one decimal place, e.g. `"1.5s"`
/// - below one hour: whole minutes and seconds, e.g. `"2m 15s"`
/// - otherwise: whole hours and minutes, e.g. `"1h 1m"`
///
/// Sub-minute values are rounded by the float formatter, so `59_999` renders
/// as `"60.0s"`.
fn format_duration(ms: i64) -> String {
    const MS_PER_SECOND: i64 = 1_000;
    const MS_PER_MINUTE: i64 = 60 * MS_PER_SECOND;
    const MS_PER_HOUR: i64 = 60 * MS_PER_MINUTE;

    match ms {
        sub_second if sub_second < MS_PER_SECOND => format!("{}ms", sub_second),
        sub_minute if sub_minute < MS_PER_MINUTE => {
            format!("{:.1}s", sub_minute as f64 / 1000.0)
        }
        sub_hour if sub_hour < MS_PER_HOUR => {
            let whole_minutes = sub_hour / MS_PER_MINUTE;
            let leftover_seconds = (sub_hour % MS_PER_MINUTE) / MS_PER_SECOND;
            format!("{}m {}s", whole_minutes, leftover_seconds)
        }
        longer => {
            let whole_hours = longer / MS_PER_HOUR;
            let leftover_minutes = (longer % MS_PER_HOUR) / MS_PER_MINUTE;
            format!("{}h {}m", whole_hours, leftover_minutes)
        }
    }
}
24
25/// Check replication status and display health information
26///
27/// This command performs Phase 4 of the migration process:
28/// 1. Discovers databases and filters them based on criteria
29/// 2. For each filtered database:
30///    - Queries pg_stat_replication on source for replication lag
31///    - Queries pg_stat_subscription on target for subscription status
32///    - Displays health information in human-readable format
33///
34/// Provides real-time monitoring of replication health including:
35/// - Replication lag (write/flush/replay) per database
36/// - Subscription state per database
37/// - Whether each database is caught up with source
38///
39/// # Arguments
40///
41/// * `source_url` - PostgreSQL connection string for source database
42/// * `target_url` - PostgreSQL connection string for target (Seren) database
43/// * `filter` - Optional replication filter for database selection
44///
45/// # Returns
46///
47/// Returns `Ok(())` after displaying status information.
48///
49/// # Errors
50///
51/// This function will return an error if:
52/// - Cannot connect to source or target database
53/// - Cannot discover databases on source
54/// - Cannot query replication statistics
55/// - Cannot query subscription status
56///
57/// # Examples
58///
59/// ```no_run
60/// # use anyhow::Result;
61/// # use database_replicator::commands::status;
62/// # use database_replicator::filters::ReplicationFilter;
63/// # async fn example() -> Result<()> {
64/// // Show status for all databases
65/// status(
66///     "postgresql://user:pass@source.example.com/postgres",
67///     "postgresql://user:pass@target.example.com/postgres",
68///     None
69/// ).await?;
70///
71/// // Show status for specific databases only
72/// let filter = ReplicationFilter::new(
73///     Some(vec!["mydb".to_string(), "analytics".to_string()]),
74///     None,
75///     None,
76///     None,
77/// )?;
78/// status(
79///     "postgresql://user:pass@source.example.com/postgres",
80///     "postgresql://user:pass@target.example.com/postgres",
81///     Some(filter)
82/// ).await?;
83/// # Ok(())
84/// # }
85/// ```
86pub async fn status(
87    source_url: &str,
88    target_url: &str,
89    filter: Option<crate::filters::ReplicationFilter>,
90) -> Result<()> {
91    let filter = filter.unwrap_or_else(crate::filters::ReplicationFilter::empty);
92    let sub_name_template = "seren_migration_sub";
93
94    tracing::info!("Checking replication status...");
95    tracing::info!("");
96
97    // Ensure source and target are different
98    crate::utils::validate_source_target_different(source_url, target_url)
99        .context("Source and target validation failed")?;
100    tracing::info!("✓ Verified source and target are different databases");
101    tracing::info!("");
102
103    // Connect to source database
104    tracing::info!("Connecting to source database...");
105    let source_client = connect(source_url)
106        .await
107        .context("Failed to connect to source database")?;
108
109    // Discover and filter databases
110    tracing::info!("Discovering databases on source...");
111    let all_databases = migration::list_databases(&source_client)
112        .await
113        .context("Failed to list databases on source")?;
114
115    // Apply filtering rules
116    let databases: Vec<_> = all_databases
117        .into_iter()
118        .filter(|db| filter.should_replicate_database(&db.name))
119        .collect();
120
121    if databases.is_empty() {
122        tracing::warn!("⚠ No databases matched the filter criteria");
123        tracing::warn!("  No replication status to show");
124        return Ok(());
125    }
126
127    tracing::info!("Found {} database(s) to check:", databases.len());
128    for db in &databases {
129        tracing::info!("  - {}", db.name);
130    }
131    tracing::info!("");
132
133    // Connect to target database
134    tracing::info!("Connecting to target database...");
135    let target_client = connect(target_url)
136        .await
137        .context("Failed to connect to target database")?;
138    tracing::info!("");
139
140    // Check status for each database
141    tracing::info!("========================================");
142    tracing::info!("Replication Status Report");
143    tracing::info!("========================================");
144    tracing::info!("");
145
146    let mut all_caught_up = true;
147    let mut any_active = false;
148
149    for db in &databases {
150        // Build subscription name for this database
151        let sub_name = if databases.len() == 1 {
152            // Single database - use template name as-is
153            sub_name_template.to_string()
154        } else {
155            // Multiple databases - append database name
156            format!("{}_{}", sub_name_template, db.name)
157        };
158
159        tracing::info!("Database: '{}'", db.name);
160        tracing::info!("Subscription: '{}'", sub_name);
161        tracing::info!("");
162
163        // Query replication lag from source
164        let source_stats = get_replication_lag(&source_client, Some(&sub_name))
165            .await
166            .context(format!(
167                "Failed to query replication lag for database '{}'",
168                db.name
169            ))?;
170
171        // Query subscription status from target
172        let target_stats = get_subscription_status(&target_client, Some(&sub_name))
173            .await
174            .context(format!(
175                "Failed to query subscription status for database '{}'",
176                db.name
177            ))?;
178
179        // Check if caught up
180        let caught_up = is_replication_caught_up(&source_client, Some(&sub_name))
181            .await
182            .unwrap_or(false);
183
184        if source_stats.is_empty() {
185            tracing::warn!("⚠ No active replication found for this database");
186            tracing::warn!("  Subscription '{}' may not be set up yet", sub_name);
187            tracing::info!("");
188            all_caught_up = false;
189        } else {
190            any_active = true;
191            for stat in &source_stats {
192                tracing::info!("Source Replication Slot:");
193                tracing::info!("  Application: {}", stat.application_name);
194                tracing::info!("  State: {}", stat.state);
195                tracing::info!("  Sent LSN: {}", stat.sent_lsn);
196                tracing::info!("  Write LSN: {}", stat.write_lsn);
197                tracing::info!("  Flush LSN: {}", stat.flush_lsn);
198                tracing::info!("  Replay LSN: {}", stat.replay_lsn);
199
200                if let Some(lag) = stat.replay_lag_ms {
201                    tracing::info!("  Replay Lag: {}", format_duration(lag));
202                } else {
203                    tracing::info!("  Replay Lag: N/A");
204                }
205
206                if let Some(lag) = stat.flush_lag_ms {
207                    tracing::info!("  Flush Lag: {}", format_duration(lag));
208                }
209
210                if let Some(lag) = stat.write_lag_ms {
211                    tracing::info!("  Write Lag: {}", format_duration(lag));
212                }
213
214                tracing::info!("");
215            }
216        }
217
218        if target_stats.is_empty() {
219            tracing::warn!("⚠ No subscription found on target");
220            tracing::warn!("  Subscription '{}' may not exist", sub_name);
221            tracing::info!("");
222            all_caught_up = false;
223        } else {
224            for stat in &target_stats {
225                tracing::info!("Target Subscription:");
226                tracing::info!("  Name: {}", stat.subscription_name);
227
228                let state_str = match stat.state.as_str() {
229                    "i" => "Initializing",
230                    "d" => "Copying data",
231                    "s" => "Syncing",
232                    "r" => "Ready (streaming)",
233                    _ => &stat.state,
234                };
235                tracing::info!("  State: {}", state_str);
236
237                if let Some(pid) = stat.pid {
238                    tracing::info!("  Worker PID: {}", pid);
239                } else {
240                    tracing::info!("  Worker PID: Not running");
241                }
242
243                if let Some(lsn) = &stat.received_lsn {
244                    tracing::info!("  Received LSN: {}", lsn);
245                }
246
247                if let Some(lsn) = &stat.latest_end_lsn {
248                    tracing::info!("  Latest End LSN: {}", lsn);
249                }
250
251                tracing::info!("");
252            }
253        }
254
255        // Per-database summary
256        if caught_up {
257            tracing::info!("✓ Database '{}' is CAUGHT UP", db.name);
258        } else {
259            tracing::warn!("⚠ Database '{}' is LAGGING or NOT ACTIVE", db.name);
260            all_caught_up = false;
261        }
262
263        tracing::info!("");
264        tracing::info!("----------------------------------------");
265        tracing::info!("");
266    }
267
268    // Overall health summary
269    tracing::info!("========================================");
270    tracing::info!("Overall Status Summary");
271    tracing::info!("========================================");
272    if all_caught_up && any_active {
273        tracing::info!("✓ All databases are CAUGHT UP (lag < 1s)");
274        tracing::info!("  Your target databases are fully in sync!");
275    } else if !any_active {
276        tracing::warn!("✗ Replication is NOT ACTIVE");
277        tracing::warn!("  Run 'sync' command to set up replication");
278    } else {
279        tracing::warn!("⚠ Some databases are LAGGING or NOT ACTIVE");
280        tracing::warn!("  Wait for replication to catch up before cutover");
281    }
282    tracing::info!("========================================");
283
284    Ok(())
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Read a required environment variable, panicking with a message that
    /// names the missing variable.
    fn required_env(name: &str) -> String {
        std::env::var(name)
            .unwrap_or_else(|_| panic!("{} must be set to run this integration test", name))
    }

    #[test]
    fn test_format_duration() {
        assert_eq!(format_duration(0), "0ms");
        assert_eq!(format_duration(500), "500ms");
        assert_eq!(format_duration(999), "999ms");
        assert_eq!(format_duration(1000), "1.0s");
        assert_eq!(format_duration(1500), "1.5s");
        // 59.999s is rounded to one decimal place by the formatter.
        assert_eq!(format_duration(59999), "60.0s");
        assert_eq!(format_duration(60000), "1m 0s");
        assert_eq!(format_duration(65000), "1m 5s");
        assert_eq!(format_duration(135000), "2m 15s");
        assert_eq!(format_duration(3600000), "1h 0m");
        assert_eq!(format_duration(3660000), "1h 1m");
    }

    #[tokio::test]
    #[ignore]
    async fn test_status_command() {
        // This test requires both source and target databases with active replication
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        match status(&source_url, &target_url, None).await {
            Ok(()) => println!("✓ Status command completed successfully"),
            Err(e) => {
                println!("Error in status command: {:?}", e);
                // It's okay if replication is not set up yet. Inspect the full
                // error chain (`{:#}`): the plain `Display` output only shows
                // the outermost context, never the underlying cause.
                let message = format!("{:#}", e);
                assert!(
                    message.contains("not supported") || message.contains("permission"),
                    "Unexpected error: {:?}",
                    e
                );
            }
        }
    }

    #[tokio::test]
    #[ignore]
    async fn test_status_with_defaults() {
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        let result = status(&source_url, &target_url, None).await;

        match &result {
            Ok(_) => println!("✓ Status with defaults completed successfully"),
            Err(e) => {
                println!("Error in status with defaults: {:?}", e);
            }
        }

        assert!(result.is_ok(), "Status with defaults failed: {:?}", result);
    }

    #[tokio::test]
    #[ignore]
    async fn test_status_with_database_filter() {
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        // Create filter that includes only postgres database
        let filter = crate::filters::ReplicationFilter::new(
            Some(vec!["postgres".to_string()]),
            None,
            None,
            None,
        )
        .expect("Failed to create filter");

        let result = status(&source_url, &target_url, Some(filter)).await;

        match &result {
            Ok(_) => println!("✓ Status with database filter completed successfully"),
            Err(e) => {
                println!("Error in status with database filter: {:?}", e);
            }
        }

        assert!(
            result.is_ok(),
            "Status with database filter failed: {:?}",
            result
        );
    }

    #[tokio::test]
    #[ignore]
    async fn test_status_with_no_matching_databases() {
        let source_url = required_env("TEST_SOURCE_URL");
        let target_url = required_env("TEST_TARGET_URL");

        // Create filter that matches no databases
        let filter = crate::filters::ReplicationFilter::new(
            Some(vec!["nonexistent_database".to_string()]),
            None,
            None,
            None,
        )
        .expect("Failed to create filter");

        let result = status(&source_url, &target_url, Some(filter)).await;

        // Should succeed but show no status (early return)
        assert!(result.is_ok(), "Status should succeed even with no matches");
    }
}