database_replicator/commands/
status.rs1use crate::replication::{get_replication_lag, get_subscription_status, is_replication_caught_up};
5use crate::{migration, postgres::connect};
6use anyhow::{Context, Result};
7
8fn format_duration(ms: i64) -> String {
10 if ms < 1000 {
11 format!("{}ms", ms)
12 } else if ms < 60_000 {
13 format!("{:.1}s", ms as f64 / 1000.0)
14 } else if ms < 3_600_000 {
15 let minutes = ms / 60_000;
16 let seconds = (ms % 60_000) / 1000;
17 format!("{}m {}s", minutes, seconds)
18 } else {
19 let hours = ms / 3_600_000;
20 let minutes = (ms % 3_600_000) / 60_000;
21 format!("{}h {}m", hours, minutes)
22 }
23}
24
25pub async fn status(
87 source_url: &str,
88 target_url: &str,
89 filter: Option<crate::filters::ReplicationFilter>,
90) -> Result<()> {
91 let filter = filter.unwrap_or_else(crate::filters::ReplicationFilter::empty);
92 let sub_name_template = "seren_migration_sub";
93
94 tracing::info!("Checking replication status...");
95 tracing::info!("");
96
97 crate::utils::validate_source_target_different(source_url, target_url)
99 .context("Source and target validation failed")?;
100 tracing::info!("✓ Verified source and target are different databases");
101 tracing::info!("");
102
103 tracing::info!("Connecting to source database...");
105 let source_client = connect(source_url)
106 .await
107 .context("Failed to connect to source database")?;
108
109 tracing::info!("Discovering databases on source...");
111 let all_databases = migration::list_databases(&source_client)
112 .await
113 .context("Failed to list databases on source")?;
114
115 let databases: Vec<_> = all_databases
117 .into_iter()
118 .filter(|db| filter.should_replicate_database(&db.name))
119 .collect();
120
121 if databases.is_empty() {
122 tracing::warn!("⚠ No databases matched the filter criteria");
123 tracing::warn!(" No replication status to show");
124 return Ok(());
125 }
126
127 tracing::info!("Found {} database(s) to check:", databases.len());
128 for db in &databases {
129 tracing::info!(" - {}", db.name);
130 }
131 tracing::info!("");
132
133 tracing::info!("Connecting to target database...");
135 let target_client = connect(target_url)
136 .await
137 .context("Failed to connect to target database")?;
138 tracing::info!("");
139
140 tracing::info!("========================================");
142 tracing::info!("Replication Status Report");
143 tracing::info!("========================================");
144 tracing::info!("");
145
146 let mut all_caught_up = true;
147 let mut any_active = false;
148
149 for db in &databases {
150 let sub_name = if databases.len() == 1 {
152 sub_name_template.to_string()
154 } else {
155 format!("{}_{}", sub_name_template, db.name)
157 };
158
159 tracing::info!("Database: '{}'", db.name);
160 tracing::info!("Subscription: '{}'", sub_name);
161 tracing::info!("");
162
163 let source_stats = get_replication_lag(&source_client, Some(&sub_name))
165 .await
166 .context(format!(
167 "Failed to query replication lag for database '{}'",
168 db.name
169 ))?;
170
171 let target_stats = get_subscription_status(&target_client, Some(&sub_name))
173 .await
174 .context(format!(
175 "Failed to query subscription status for database '{}'",
176 db.name
177 ))?;
178
179 let caught_up = is_replication_caught_up(&source_client, Some(&sub_name))
181 .await
182 .unwrap_or(false);
183
184 if source_stats.is_empty() {
185 tracing::warn!("⚠ No active replication found for this database");
186 tracing::warn!(" Subscription '{}' may not be set up yet", sub_name);
187 tracing::info!("");
188 all_caught_up = false;
189 } else {
190 any_active = true;
191 for stat in &source_stats {
192 tracing::info!("Source Replication Slot:");
193 tracing::info!(" Application: {}", stat.application_name);
194 tracing::info!(" State: {}", stat.state);
195 tracing::info!(" Sent LSN: {}", stat.sent_lsn);
196 tracing::info!(" Write LSN: {}", stat.write_lsn);
197 tracing::info!(" Flush LSN: {}", stat.flush_lsn);
198 tracing::info!(" Replay LSN: {}", stat.replay_lsn);
199
200 if let Some(lag) = stat.replay_lag_ms {
201 tracing::info!(" Replay Lag: {}", format_duration(lag));
202 } else {
203 tracing::info!(" Replay Lag: N/A");
204 }
205
206 if let Some(lag) = stat.flush_lag_ms {
207 tracing::info!(" Flush Lag: {}", format_duration(lag));
208 }
209
210 if let Some(lag) = stat.write_lag_ms {
211 tracing::info!(" Write Lag: {}", format_duration(lag));
212 }
213
214 tracing::info!("");
215 }
216 }
217
218 if target_stats.is_empty() {
219 tracing::warn!("⚠ No subscription found on target");
220 tracing::warn!(" Subscription '{}' may not exist", sub_name);
221 tracing::info!("");
222 all_caught_up = false;
223 } else {
224 for stat in &target_stats {
225 tracing::info!("Target Subscription:");
226 tracing::info!(" Name: {}", stat.subscription_name);
227
228 let state_str = match stat.state.as_str() {
229 "i" => "Initializing",
230 "d" => "Copying data",
231 "s" => "Syncing",
232 "r" => "Ready (streaming)",
233 _ => &stat.state,
234 };
235 tracing::info!(" State: {}", state_str);
236
237 if let Some(pid) = stat.pid {
238 tracing::info!(" Worker PID: {}", pid);
239 } else {
240 tracing::info!(" Worker PID: Not running");
241 }
242
243 if let Some(lsn) = &stat.received_lsn {
244 tracing::info!(" Received LSN: {}", lsn);
245 }
246
247 if let Some(lsn) = &stat.latest_end_lsn {
248 tracing::info!(" Latest End LSN: {}", lsn);
249 }
250
251 tracing::info!("");
252 }
253 }
254
255 if caught_up {
257 tracing::info!("✓ Database '{}' is CAUGHT UP", db.name);
258 } else {
259 tracing::warn!("⚠ Database '{}' is LAGGING or NOT ACTIVE", db.name);
260 all_caught_up = false;
261 }
262
263 tracing::info!("");
264 tracing::info!("----------------------------------------");
265 tracing::info!("");
266 }
267
268 tracing::info!("========================================");
270 tracing::info!("Overall Status Summary");
271 tracing::info!("========================================");
272 if all_caught_up && any_active {
273 tracing::info!("✓ All databases are CAUGHT UP (lag < 1s)");
274 tracing::info!(" Your target databases are fully in sync!");
275 } else if !any_active {
276 tracing::warn!("✗ Replication is NOT ACTIVE");
277 tracing::warn!(" Run 'sync' command to set up replication");
278 } else {
279 tracing::warn!("⚠ Some databases are LAGGING or NOT ACTIVE");
280 tracing::warn!(" Wait for replication to catch up before cutover");
281 }
282 tracing::info!("========================================");
283
284 Ok(())
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290
291 #[test]
292 fn test_format_duration() {
293 assert_eq!(format_duration(0), "0ms");
294 assert_eq!(format_duration(500), "500ms");
295 assert_eq!(format_duration(999), "999ms");
296 assert_eq!(format_duration(1000), "1.0s");
297 assert_eq!(format_duration(1500), "1.5s");
298 assert_eq!(format_duration(59999), "60.0s");
299 assert_eq!(format_duration(60000), "1m 0s");
300 assert_eq!(format_duration(65000), "1m 5s");
301 assert_eq!(format_duration(135000), "2m 15s");
302 assert_eq!(format_duration(3600000), "1h 0m");
303 assert_eq!(format_duration(3660000), "1h 1m");
304 }
305
306 #[tokio::test]
307 #[ignore]
308 async fn test_status_command() {
309 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
311 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
312
313 let result = status(&source_url, &target_url, None).await;
314
315 match &result {
316 Ok(_) => println!("✓ Status command completed successfully"),
317 Err(e) => {
318 println!("Error in status command: {:?}", e);
319 if !e.to_string().contains("not supported") && !e.to_string().contains("permission")
321 {
322 panic!("Unexpected error: {:?}", e);
323 }
324 }
325 }
326
327 assert!(result.is_ok(), "Status command failed: {:?}", result);
328 }
329
330 #[tokio::test]
331 #[ignore]
332 async fn test_status_with_defaults() {
333 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
334 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
335
336 let result = status(&source_url, &target_url, None).await;
337
338 match &result {
339 Ok(_) => println!("✓ Status with defaults completed successfully"),
340 Err(e) => {
341 println!("Error in status with defaults: {:?}", e);
342 }
343 }
344
345 assert!(result.is_ok(), "Status with defaults failed: {:?}", result);
346 }
347
348 #[tokio::test]
349 #[ignore]
350 async fn test_status_with_database_filter() {
351 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
352 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
353
354 let filter = crate::filters::ReplicationFilter::new(
356 Some(vec!["postgres".to_string()]),
357 None,
358 None,
359 None,
360 )
361 .expect("Failed to create filter");
362
363 let result = status(&source_url, &target_url, Some(filter)).await;
364
365 match &result {
366 Ok(_) => println!("✓ Status with database filter completed successfully"),
367 Err(e) => {
368 println!("Error in status with database filter: {:?}", e);
369 }
370 }
371
372 assert!(
373 result.is_ok(),
374 "Status with database filter failed: {:?}",
375 result
376 );
377 }
378
379 #[tokio::test]
380 #[ignore]
381 async fn test_status_with_no_matching_databases() {
382 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
383 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
384
385 let filter = crate::filters::ReplicationFilter::new(
387 Some(vec!["nonexistent_database".to_string()]),
388 None,
389 None,
390 None,
391 )
392 .expect("Failed to create filter");
393
394 let result = status(&source_url, &target_url, Some(filter)).await;
395
396 assert!(result.is_ok(), "Status should succeed even with no matches");
398 }
399}