database_replicator/commands/
sync.rs1use crate::migration;
5use crate::postgres::connect;
6use crate::replication::{
7 create_publication, create_subscription, detect_subscription_state, drop_subscription,
8 wait_for_sync, SubscriptionState,
9};
10use anyhow::{Context, Result};
11
12pub async fn sync(
89 source_url: &str,
90 target_url: &str,
91 filter: Option<crate::filters::ReplicationFilter>,
92 publication_name: Option<&str>,
93 subscription_name: Option<&str>,
94 sync_timeout_secs: Option<u64>,
95 force: bool,
96) -> Result<()> {
97 let pub_name_template = publication_name.unwrap_or("seren_migration_pub");
98 let sub_name_template = subscription_name.unwrap_or("seren_migration_sub");
99 let timeout = sync_timeout_secs.unwrap_or(300); let filter = filter.unwrap_or_else(crate::filters::ReplicationFilter::empty);
101
102 tracing::info!("Starting logical replication setup...");
103
104 crate::utils::validate_source_target_different(source_url, target_url)
106 .context("Source and target validation failed")?;
107 tracing::info!("✓ Verified source and target are different databases");
108
109 tracing::info!("Checking target wal_level for logical replication...");
111 let target_client = connect(target_url)
112 .await
113 .context("Failed to connect to target database")?;
114 let target_wal_level = crate::postgres::check_wal_level(&target_client).await?;
115
116 if target_wal_level != "logical" {
117 anyhow::bail!(
118 "Target database wal_level is set to '{}', but 'logical' is required for logical replication\n\
119 \n\
120 To fix this:\n\
121 \n\
122 Option 1: Set wal_level in postgresql.conf\n\
123 1. Edit postgresql.conf: wal_level = logical\n\
124 2. Restart PostgreSQL server\n\
125 3. Re-run this command\n\
126 \n\
127 Option 2: Skip continuous sync (snapshot only)\n\
128 Use the init command with --no-sync flag to perform initial snapshot without setting up logical replication\n\
129 \n\
130 Note: Some managed PostgreSQL services may require configuring wal_level through their control panel.",
131 target_wal_level
132 );
133 }
134 tracing::info!("✓ Target wal_level is set to 'logical' (logical replication supported)");
135
136 tracing::info!("Connecting to source database...");
138 let source_client = connect(source_url)
139 .await
140 .context("Failed to connect to source database")?;
141 tracing::info!("✓ Connected to source");
142
143 tracing::info!("Discovering databases on source...");
145 let all_databases = migration::list_databases(&source_client)
146 .await
147 .context("Failed to list databases on source")?;
148
149 let databases: Vec<_> = all_databases
151 .into_iter()
152 .filter(|db| filter.should_replicate_database(&db.name))
153 .collect();
154
155 if databases.is_empty() {
156 if filter.is_empty() {
157 tracing::warn!("⚠ No user databases found on source");
158 tracing::warn!(" This is unusual - the source database appears empty");
159 tracing::warn!(" Only template databases exist");
160 } else {
161 tracing::warn!("⚠ No databases matched the filter criteria");
162 tracing::warn!(" Check your --include-databases or --exclude-databases settings");
163 }
164 tracing::info!("✅ Logical replication setup complete (no databases to replicate)");
165 return Ok(());
166 }
167
168 tracing::info!(
169 "Found {} database(s) to replicate: {}",
170 databases.len(),
171 databases
172 .iter()
173 .map(|db| db.name.as_str())
174 .collect::<Vec<_>>()
175 .join(", ")
176 );
177
178 for db in &databases {
180 tracing::info!("");
181 tracing::info!(
182 "========================================\nDatabase: '{}'\n========================================",
183 db.name
184 );
185
186 let source_db_url = replace_database_in_url(source_url, &db.name).context(format!(
188 "Failed to build source URL for database '{}'",
189 db.name
190 ))?;
191 let target_db_url = replace_database_in_url(target_url, &db.name).context(format!(
192 "Failed to build target URL for database '{}'",
193 db.name
194 ))?;
195
196 let pub_name = if databases.len() == 1 {
198 pub_name_template.to_string()
200 } else {
201 format!("{}_{}", pub_name_template, db.name)
203 };
204
205 let sub_name = if databases.len() == 1 {
206 sub_name_template.to_string()
208 } else {
209 format!("{}_{}", sub_name_template, db.name)
211 };
212
213 tracing::info!("Publication: '{}'", pub_name);
214 tracing::info!("Subscription: '{}'", sub_name);
215
216 tracing::info!("Connecting to source database '{}'...", db.name);
218 let source_db_client = connect(&source_db_url).await.context(format!(
219 "Failed to connect to source database '{}'",
220 db.name
221 ))?;
222 tracing::info!("✓ Connected to source");
223
224 tracing::info!("Connecting to target database '{}'...", db.name);
225 let target_db_client = connect(&target_db_url).await.context(format!(
226 "Failed to connect to target database '{}'",
227 db.name
228 ))?;
229 tracing::info!("✓ Connected to target");
230
231 tracing::info!("Creating publication on source database...");
233 create_publication(&source_db_client, &db.name, &pub_name, &filter)
234 .await
235 .context(format!(
236 "Failed to create publication on source database '{}'",
237 db.name
238 ))?;
239
240 tracing::info!("Checking subscription state...");
242 let sub_state = detect_subscription_state(&target_db_client, &sub_name)
243 .await
244 .context(format!(
245 "Failed to detect subscription state for '{}'",
246 sub_name
247 ))?;
248
249 match sub_state {
250 SubscriptionState::Streaming => {
251 if force {
252 tracing::info!(
253 "⚠ Subscription '{}' is already streaming, but --force flag is set",
254 sub_name
255 );
256 tracing::info!("Dropping existing subscription...");
257 drop_subscription(&target_db_client, &sub_name)
258 .await
259 .context(format!("Failed to drop subscription '{}'", sub_name))?;
260 tracing::info!("Creating new subscription...");
261 create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
262 .await
263 .context(format!(
264 "Failed to create subscription on target database '{}'",
265 db.name
266 ))?;
267 tracing::info!(
268 "Waiting for initial sync to complete (timeout: {}s)...",
269 timeout
270 );
271 wait_for_sync(&target_db_client, &sub_name, timeout)
272 .await
273 .context(format!(
274 "Failed to wait for initial sync on database '{}'",
275 db.name
276 ))?;
277 } else {
278 tracing::info!(
279 "✓ Subscription '{}' is already streaming and healthy",
280 sub_name
281 );
282 tracing::info!(" Skipping subscription creation (use --force to recreate)");
283 }
284 }
285 SubscriptionState::Initializing
286 | SubscriptionState::Copying
287 | SubscriptionState::Syncing => {
288 tracing::info!(
289 "ℹ Subscription '{}' already exists and is syncing (state: {:?})",
290 sub_name,
291 sub_state
292 );
293 tracing::info!(
294 "Waiting for existing sync to complete (timeout: {}s)...",
295 timeout
296 );
297 wait_for_sync(&target_db_client, &sub_name, timeout)
298 .await
299 .context(format!(
300 "Failed to wait for existing sync on database '{}'",
301 db.name
302 ))?;
303 }
304 SubscriptionState::Error(err_state) => {
305 tracing::warn!(
306 "⚠ Subscription '{}' is in error state: {}",
307 sub_name,
308 err_state
309 );
310 if force {
311 tracing::info!("Dropping failed subscription and recreating...");
312 drop_subscription(&target_db_client, &sub_name)
313 .await
314 .context(format!("Failed to drop subscription '{}'", sub_name))?;
315 tracing::info!("Creating new subscription...");
316 create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
317 .await
318 .context(format!(
319 "Failed to create subscription on target database '{}'",
320 db.name
321 ))?;
322 tracing::info!(
323 "Waiting for initial sync to complete (timeout: {}s)...",
324 timeout
325 );
326 wait_for_sync(&target_db_client, &sub_name, timeout)
327 .await
328 .context(format!(
329 "Failed to wait for initial sync on database '{}'",
330 db.name
331 ))?;
332 } else {
333 anyhow::bail!(
334 "Subscription '{}' is in error state: {}.\n\
335 Use --force flag to drop and recreate the subscription.",
336 sub_name,
337 err_state
338 );
339 }
340 }
341 SubscriptionState::NotFound => {
342 tracing::info!("Creating subscription on target database...");
343 create_subscription(&target_db_client, &sub_name, &source_db_url, &pub_name)
344 .await
345 .context(format!(
346 "Failed to create subscription on target database '{}'",
347 db.name
348 ))?;
349 tracing::info!(
350 "Waiting for initial sync to complete (timeout: {}s)...",
351 timeout
352 );
353 wait_for_sync(&target_db_client, &sub_name, timeout)
354 .await
355 .context(format!(
356 "Failed to wait for initial sync on database '{}'",
357 db.name
358 ))?;
359 }
360 }
361
362 tracing::info!("✓ Replication active for database '{}'", db.name);
363 }
364
365 tracing::info!("");
366 tracing::info!("========================================");
367 tracing::info!("✓ Logical replication is now active!");
368 tracing::info!("========================================");
369 tracing::info!("");
370 tracing::info!(
371 "Changes on {} source database(s) will now continuously",
372 databases.len()
373 );
374 tracing::info!("replicate to the target.");
375 tracing::info!("");
376 tracing::info!("Next steps:");
377 tracing::info!(" 1. Run 'status' to monitor replication lag");
378 tracing::info!(" 2. Run 'verify' to validate data integrity");
379 tracing::info!(" 3. When ready, cutover to the target database");
380
381 Ok(())
382}
383
384fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
395 let parts: Vec<&str> = url.splitn(2, '?').collect();
397 let base_url = parts[0];
398 let query_params = parts.get(1);
399
400 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
402
403 if url_parts.len() != 2 {
404 anyhow::bail!("Invalid connection URL format: cannot replace database name");
405 }
406
407 let new_url = if let Some(params) = query_params {
409 format!("{}/{}?{}", url_parts[1], new_db_name, params)
410 } else {
411 format!("{}/{}", url_parts[1], new_db_name)
412 };
413
414 Ok(new_url)
415}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420
421 #[tokio::test]
422 #[ignore]
423 async fn test_sync_command() {
424 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
426 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
427
428 let pub_name = "test_sync_pub";
429 let sub_name = "test_sync_sub";
430 let timeout = 60; let result = sync(
433 &source_url,
434 &target_url,
435 None,
436 Some(pub_name),
437 Some(sub_name),
438 Some(timeout),
439 false,
440 )
441 .await;
442
443 match &result {
444 Ok(_) => println!("✓ Sync command completed successfully"),
445 Err(e) => {
446 println!("Error in sync command: {:?}", e);
447 if e.to_string().contains("not supported") || e.to_string().contains("permission") {
449 println!("Skipping test - database might not support logical replication");
450 return;
451 }
452 }
453 }
454
455 assert!(result.is_ok(), "Sync command failed: {:?}", result);
456
457 let target_client = connect(&target_url).await.unwrap();
459 crate::replication::drop_subscription(&target_client, sub_name)
460 .await
461 .unwrap();
462
463 let source_client = connect(&source_url).await.unwrap();
464 crate::replication::drop_publication(&source_client, pub_name)
465 .await
466 .unwrap();
467 }
468
469 #[tokio::test]
470 #[ignore]
471 async fn test_sync_with_defaults() {
472 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
473 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
474
475 let result = sync(&source_url, &target_url, None, None, None, Some(60), false).await;
476
477 match &result {
478 Ok(_) => println!("✓ Sync with defaults completed successfully"),
479 Err(e) => {
480 println!("Error in sync with defaults: {:?}", e);
481 if e.to_string().contains("not supported") || e.to_string().contains("permission") {
482 println!("Skipping test - database might not support logical replication");
483 return;
484 }
485 }
486 }
487
488 assert!(result.is_ok(), "Sync with defaults failed: {:?}", result);
489
490 let target_client = connect(&target_url).await.unwrap();
492 crate::replication::drop_subscription(&target_client, "seren_migration_sub")
493 .await
494 .unwrap();
495
496 let source_client = connect(&source_url).await.unwrap();
497 crate::replication::drop_publication(&source_client, "seren_migration_pub")
498 .await
499 .unwrap();
500 }
501
502 #[test]
503 fn test_replace_database_in_url() {
504 let url = "postgresql://user:pass@localhost:5432/olddb";
506 let new_url = replace_database_in_url(url, "newdb").unwrap();
507 assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");
508
509 let url = "postgresql://user:pass@localhost:5432/olddb?sslmode=require";
511 let new_url = replace_database_in_url(url, "newdb").unwrap();
512 assert_eq!(
513 new_url,
514 "postgresql://user:pass@localhost:5432/newdb?sslmode=require"
515 );
516
517 let url = "postgresql://user:pass@localhost/olddb";
519 let new_url = replace_database_in_url(url, "newdb").unwrap();
520 assert_eq!(new_url, "postgresql://user:pass@localhost/newdb");
521 }
522
523 #[tokio::test]
524 #[ignore]
525 async fn test_sync_with_database_filter() {
526 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
527 let target_url = std::env::var("TEST_TARGET_URL").unwrap();
528
529 println!("Testing sync command with database filter...");
530 println!("⚠ WARNING: This will set up replication for filtered databases!");
531
532 let filter = crate::filters::ReplicationFilter::new(
534 Some(vec!["postgres".to_string()]), None,
536 None,
537 None,
538 )
539 .expect("Failed to create filter");
540
541 let result = sync(
542 &source_url,
543 &target_url,
544 Some(filter),
545 None,
546 None,
547 Some(60),
548 false,
549 )
550 .await;
551
552 match &result {
553 Ok(_) => {
554 println!("✓ Sync with database filter completed successfully");
555 }
556 Err(e) => {
557 println!("Sync with database filter failed: {:?}", e);
558 if e.to_string().contains("not supported") || e.to_string().contains("permission") {
559 println!("Skipping test - database might not support logical replication");
560 return;
561 }
562 }
563 }
564
565 assert!(result.is_ok(), "Sync with database filter failed");
566
567 let db_url = replace_database_in_url(&target_url, "postgres").unwrap();
569 let target_client = connect(&db_url).await.unwrap();
570 crate::replication::drop_subscription(&target_client, "seren_migration_sub")
571 .await
572 .unwrap();
573
574 let source_url_db = replace_database_in_url(&source_url, "postgres").unwrap();
575 let source_client = connect(&source_url_db).await.unwrap();
576 crate::replication::drop_publication(&source_client, "seren_migration_pub")
577 .await
578 .unwrap();
579 }
580}